import csv
import json
import os
import shutil
import tempfile
import zipfile
from io import BytesIO

from ajax_select import make_ajax_form
from django.contrib import admin, messages
from django.http import HttpResponse
from django.shortcuts import HttpResponseRedirect, reverse
from django.utils.html import format_html
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext_lazy as _
from pygments import highlight
from pygments.formatters.html import HtmlFormatter
from pygments.lexers.data import JsonLexer

from commcrawler import models
from commonnet.admin_site import admin_site
# Column names, in output order, shared by the crawl result CSV exports
# defined in this module.
RESULT_FIELDNAMES = (
    "target_id",
    "target__name",
    "target__url",
    "nb_external_link",
    "nb_internal_link",
    "nb_images",
    "nb_facebook",
    "nb_twitter",
    "nb_instagram",
    "nb_youtube",
    "nb_dailymotion",
    "nb_vimeo",
    "nb_video",
    "nb_audio",
    "nb_internal_pdf",
    "nb_external_pdf",
    "nb_internal_office",
    "nb_external_office",
    "is_online",
    "redirection",
    "error",
)
def export_as_csv_action(field_names,
                         description=_("Export selected as CSV file"),
                         header=True):
    """
    Build an admin action that returns the selected rows as a CSV download.

    :param field_names: names passed to ``queryset.values()``; they are also
        the CSV columns, in the given order.
    :param description: label stored as the action's ``short_description``.
    :param header: when true, the first CSV row holds the field names.
    :return: a function with the ``(modeladmin, request, queryset)``
        signature of an admin action.
    """

    def export_as_csv(modeladmin, request, queryset):
        """
        Write ``queryset`` to a ``text/csv`` attachment, ordered by pk.
        Based on http://djangosnippets.org/snippets/1697/
        """
        # The download is named after the model options,
        # with dots replaced by underscores.
        download_name = str(modeladmin.model._meta).replace('.', '_')
        response = HttpResponse(content_type='text/csv')
        response['Content-Disposition'] = (
            'attachment; filename=%s.csv' % download_name)

        csv_out = csv.writer(response)
        if header:
            csv_out.writerow(list(field_names))
        records = queryset.values(*field_names).order_by('pk')
        for record in records:
            # None becomes an empty cell; anything else is stringified.
            csv_out.writerow([
                "" if record[column] is None else str(record[column])
                for column in field_names
            ])
        return response

    export_as_csv.short_description = description
    return export_as_csv
def export_result_as_zip(description=_("Export all results as ZIP file"),
                         header=True):
    """
    Build an admin action exporting a single crawl as a ZIP archive.

    The archive holds two CSV files: a "results" file with one row per
    target of the crawl (columns from ``RESULT_FIELDNAMES``) and a
    "relations" file with one row per ``CrawlRelation`` of the crawl.

    :param description: label stored as the action's ``short_description``.
    :param header: when true, each CSV starts with a row of column names.
    :return: a function with the ``(modeladmin, request, queryset)``
        signature of an admin action.
    """

    def open_csv(path):
        # newline="" leaves line endings to the csv module, as its
        # documentation requires; the explicit encoding keeps the output
        # independent of the server locale.
        return open(path, "w", newline="", encoding="utf-8")

    def stringify(values):
        # None becomes an empty cell; anything else is stringified.
        return ["" if value is None else str(value) for value in values]

    def write_result_csv(crawl, result_filename):
        """Write one row per target of ``crawl`` into ``result_filename``."""
        field_names = list(RESULT_FIELDNAMES)
        # "error" is not queried: it is filled in below ("unreachable" when
        # the target has no result, empty otherwise).
        error_pos = None
        if "error" in field_names:
            error_pos = field_names.index("error")
            field_names.pop(error_pos)

        # For each queried column, the Target attribute to read when the
        # target has no result (None for columns unrelated to the target).
        target_attrs = []
        for col in field_names:
            if col.startswith("target__"):
                target_attrs.append(col[len("target__"):])
            elif col.startswith("target_"):
                target_attrs.append(col[len("target_"):])
            else:
                target_attrs.append(None)

        with open_csv(result_filename) as result_file:
            writer = csv.writer(result_file)
            if header:
                writer.writerow(list(RESULT_FIELDNAMES))
            for target in models.Target.objects.filter(crawl=crawl):
                q = models.CrawlResult.objects.filter(crawl=crawl,
                                                      target=target)
                # A single sliced query replaces the former count() + [0].
                rows = list(q.values_list(*field_names)[:1])
                if rows:
                    result = list(rows[0])
                    error = ""
                else:
                    result = [
                        None if attr is None else getattr(target, attr)
                        for attr in target_attrs
                    ]
                    error = "unreachable"
                # Compare with None: position 0 is a valid column index.
                if error_pos is not None:
                    result.insert(error_pos, error)
                writer.writerow(stringify(result))

    def write_relations_csv(crawl, filename):
        """Write the relations of ``crawl``, ordered by pk, to ``filename``."""
        queryset = models.CrawlRelation.objects.filter(crawl=crawl)
        field_names = ("source_id", "destination_id", "number")
        with open_csv(filename) as fle:
            writer = csv.writer(fle)
            if header:
                writer.writerow(list(field_names))
            for obj in queryset.values(*field_names).order_by('pk'):
                writer.writerow(
                    stringify(obj[field] for field in field_names))

    def export_as_zip(modeladmin, request, queryset):
        """
        Return the ZIP download for the selected crawl.

        When the selection is not exactly one item, an error message is
        queued and the user is redirected to the changelist.
        """
        if queryset.count() != 1:
            url = reverse(
                'admin:%s_%s_changelist' % (
                    modeladmin.model._meta.app_label,
                    modeladmin.model._meta.model_name)
            )
            messages.add_message(
                request, messages.ERROR,
                str(_("Select only one item.")),
            )
            return HttpResponseRedirect(url)

        crawl = queryset.all()[0]
        name = crawl.name
        # NOTE(review): this raises if ``started`` can be None (crawl not
        # started yet) - confirm against the model.
        date = crawl.started.strftime('%Y-%m-%d')

        tmpdir = tempfile.mkdtemp(prefix="comm-on-net")
        try:
            result_filename = os.path.join(
                tmpdir, "{}-results-{}.csv".format(name, date))
            write_result_csv(crawl, result_filename)
            relation_filename = os.path.join(
                tmpdir, "{}-relations-{}.csv".format(name, date))
            write_relations_csv(crawl, relation_filename)

            s = BytesIO()
            with zipfile.ZipFile(s, "w") as zf:
                for filepath in (result_filename, relation_filename):
                    # Store files at the archive root, without the tmp path.
                    zf.write(filepath, os.path.basename(filepath))
        finally:
            # Remove the temporary files even when the export fails.
            shutil.rmtree(tmpdir)

        response = HttpResponse(s.getvalue(), content_type='application/zip')
        response['Content-Disposition'] = \
            'attachment; filename={}-{}.zip'.format(name, date)
        return response

    export_as_zip.short_description = description
    return export_as_zip
class CrawlAdmin(admin.ModelAdmin):
    """
    Admin configuration for ``models.Crawl``: list columns, status filter,
    ajax-select form for the targets and the ZIP export action.
    """
    model = models.Crawl

    # Changelist columns and filters.
    list_display = (
        "name",
        "status",
        "get_target_nb",
        "time_out",
        "created",
        "started",
        "crawl_ended",
        "ended",
        "progress",
    )
    list_filter = ("status",)

    # Fields left out of the edit form.
    exclude = (
        "progression",
        "created",
        "started",
        "pre_crawl_ended",
        "crawl_ended",
        "ended",
    )
    readonly_fields = ()

    form = make_ajax_form(model, {'targets': 'target'})
    actions = [export_result_as_zip()]

    def get_target_nb(self, obj):
        """Expose ``obj.target_nb`` as a changelist column."""
        return obj.target_nb

    get_target_nb.short_description = _("Target number")


admin_site.register(models.Crawl, CrawlAdmin)
class CrawlResultAdmin(admin.ModelAdmin):
    """
    Admin configuration for ``models.CrawlResult``: counters are shown
    read-only, the raw ``crawl_result`` is replaced by a highlighted JSON
    rendering and the selection can be exported as CSV.
    """
    model = models.CrawlResult
    list_display = (
        "short_name", "open_link", "crawl", "started", "duration", "status",
        "is_online", "bad_ssl", "nb_external_link", "nb_internal_link",
        "nb_images", "nb_facebook", "nb_twitter", "nb_instagram",
        "nb_youtube", "nb_dailymotion", "nb_vimeo", "nb_video", "nb_audio",
        "nb_internal_pdf", "nb_external_pdf", "nb_internal_office",
        "nb_external_office"
    )
    list_filter = ("status", "crawl", "is_online", "bad_ssl")
    search_fields = ("target__name",)
    readonly_fields = (
        "started", "duration", "status", "nb_external_link",
        "nb_internal_link", "nb_images", "nb_facebook", "nb_twitter",
        "nb_instagram", "nb_youtube", "nb_dailymotion", "nb_vimeo",
        "nb_video", "nb_audio", "nb_internal_pdf", "nb_external_pdf",
        "nb_internal_office", "nb_external_office", "is_online",
        "redirection", "crawl_result_prettified"
    )
    # The raw field is hidden; "crawl_result_prettified" is shown instead.
    exclude = ("crawl_result",)
    form = make_ajax_form(model, {'target': 'target'})
    actions = [
        export_as_csv_action(RESULT_FIELDNAMES)
    ]

    def open_link(self, obj):
        """Return ``obj.url()`` as an HTML link, or "-" when it is empty."""
        url = obj.url()
        if not url:
            return "-"
        # format_html escapes the URL instead of trusting it as safe markup.
        # NOTE(review): the previous literal had a single placeholder for
        # two arguments, so the anchor markup had evidently been lost; the
        # target/rel attributes are an assumption - confirm.
        return format_html(
            '<a href="{}" target="_blank" rel="noopener noreferrer">{}</a>',
            url, url)

    def crawl_result_prettified(self, instance):
        """Render ``instance.crawl_result`` as syntax-highlighted JSON."""
        response = json.dumps(instance.crawl_result, sort_keys=True, indent=2)
        formatter = HtmlFormatter(style='colorful')
        response = highlight(response, JsonLexer(), formatter)
        # Ship the formatter's CSS rules together with the markup it emits.
        # NOTE(review): this literal was unterminated; rebuilt as a <style>
        # element around the formatter's style definitions - confirm.
        style = "<style>" + formatter.get_style_defs() + "</style>"
        return mark_safe(style + response)

    crawl_result_prettified.short_description = _("Crawl result")


admin_site.register(models.CrawlResult, CrawlResultAdmin)
class CrawlRelationAdmin(admin.ModelAdmin):
    """
    Admin configuration for ``models.CrawlRelation``: filter by crawl,
    search on both endpoint names and export the selection as CSV.
    """
    model = models.CrawlRelation

    # Changelist columns, filter and search.
    list_display = (
        "crawl",
        "source",
        "destination",
        "number",
    )
    list_filter = ("crawl",)
    search_fields = [
        "source__name",
        "destination__name",
    ]

    # Both endpoints use the 'target' ajax-select channel.
    form = make_ajax_form(
        model,
        {'source': 'target', 'destination': 'target'},
    )
    actions = [
        export_as_csv_action(("source_id", "destination_id", "number")),
    ]


admin_site.register(models.CrawlRelation, CrawlRelationAdmin)
class ExcludedDomainAdmin(admin.ModelAdmin):
    """Admin listing excluded domains, searchable by domain."""
    search_fields = ('domain',)
    list_display = ('domain',)


admin_site.register(models.ExludedDomains, ExcludedDomainAdmin)