summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ishtar_common/admin.py24
-rw-r--r--ishtar_common/forms_common.py18
-rw-r--r--ishtar_common/models.py9
-rw-r--r--ishtar_common/tests.py87
-rw-r--r--ishtar_common/utils.py35
-rw-r--r--ishtar_common/views.py65
-rw-r--r--ishtar_common/views_item.py9
-rw-r--r--ishtar_common/wizards.py17
8 files changed, 219 insertions, 45 deletions
diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py
index 62faf3b77..50b6d05b3 100644
--- a/ishtar_common/admin.py
+++ b/ishtar_common/admin.py
@@ -72,7 +72,7 @@ from django import forms
from ishtar_common import models, models_common, models_rest
from ishtar_common.apps import admin_site
from ishtar_common.model_merging import merge_model_objects
-from ishtar_common.utils import get_cache, create_slug
+from ishtar_common.utils import get_cache, create_slug, get_person_gdpr_log
from ishtar_common import forms as common_forms, forms_common as other_common_forms
from ishtar_common.serializers import restore_serialized, IMPORT_MODEL_LIST
@@ -707,6 +707,28 @@ class PersonAdmin(HistorizedObjectAdmin):
model = models.Person
inlines = [ProfileInline]
+ def get_search_results(self, request, queryset, search_term):
+ page = int(request.GET.get("p", 0))
+ slice = page * self.list_per_page, (page + 1) * self.list_per_page
+ get_person_gdpr_log("admin_person_consultation", request, None, queryset, slice)
+ return super().get_search_results(request, queryset, search_term)
+
+ def response_change(self, request, obj):
+ get_person_gdpr_log("admin_person_modify", request, None,
+ self.model.objects.filter(pk=obj.pk))
+ return super().response_change(request, obj)
+
+ def changeform_view(self, request, object_id=None, form_url='', extra_context=None):
+ if request.method == "GET":
+ get_person_gdpr_log("admin_person_view", request, None,
+ self.model.objects.filter(pk=object_id))
+ return super().changeform_view(request, object_id, form_url, extra_context)
+
+ def delete_model(self, request, obj):
+ get_person_gdpr_log("admin_person_delete", request, None,
+ self.model.objects.filter(pk=obj.pk))
+ super().delete_model(request, obj)
+
admin_site.register(models.Person, PersonAdmin)
diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py
index 45b43000f..7f37e86ce 100644
--- a/ishtar_common/forms_common.py
+++ b/ishtar_common/forms_common.py
@@ -931,12 +931,13 @@ class MergeIntoForm(forms.Form):
)
)
- def merge(self):
+ def merge(self, callback=None, request=None):
model = self.associated_model
try:
main_item = model.objects.get(pk=self.cleaned_data["main_item"])
except model.DoesNotExist:
return
+ items, items_pk = [], [main_item.pk]
for pk in self.items:
if pk == str(main_item.pk):
continue
@@ -944,6 +945,12 @@ class MergeIntoForm(forms.Form):
item = model.objects.get(pk=pk)
except model.DoesNotExist:
continue
+ items.append(item)
+ items_pk.append(item.pk)
+ if callback:
+ queryset = models.Person.objects.filter(pk__in=items_pk)
+ callback("merge_person", request, "", queryset)
+ for item in items:
main_item.merge(item)
return main_item
@@ -1725,9 +1732,9 @@ class MergeFormSet(BaseModelFormSet):
self._cached_list = []
super(MergeFormSet, self).__init__(*args, **kwargs)
- def merge(self):
+ def merge(self, callback=None, request=None):
for form in self.initial_forms:
- form.merge()
+ form.merge(callback=callback, request=request)
def initial_form_count(self):
"""
@@ -1804,6 +1811,11 @@ class MergeForm(forms.ModelForm):
from_item = getattr(self.instance, self.FROM_KEY)
except ObjectDoesNotExist:
return
+ callback = kwargs.get("callback", None)
+ if callback:
+ request = kwargs.get("request")
+ queryset = models.Person.objects.filter(pk__in=[to_item.pk, from_item.pk])
+ callback("merge_person", request, "", queryset)
if self.cleaned_data.get("a_is_duplicate_b"):
to_item.merge(from_item)
elif self.cleaned_data.get("b_is_duplicate_a"):
diff --git a/ishtar_common/models.py b/ishtar_common/models.py
index b9944e4bb..c978b087b 100644
--- a/ishtar_common/models.py
+++ b/ishtar_common/models.py
@@ -3211,11 +3211,12 @@ GDPR_ACTIVITY = (
("PE", _("Exporting a person's notice")),
("PC", _("Person creation")),
("PM", _("Person modification")),
+ ("Pm", _("Person merge")),
("PD", _("Person deletion")),
("AC", _("Admin - Directory consultation")),
- ("AE", _("Admin - Directory export")),
("AV", _("Admin - Person view")),
("AM", _("Admin - Person modification")),
+ ("AD", _("Admin - Person deletion")),
)
GDPR_ACTIVITY_DICT = dict(GDPR_ACTIVITY)
@@ -3299,8 +3300,10 @@ class GDPRLog(models.Model):
if start is not None:
values = values[start:end]
for pk in values:
- gdpr_persons.append(cls.persons.through(gdprperson_id=pk, gdprlog_id=log.pk))
- cls.persons.through.objects.bulk_create(gdpr_persons)
+ if pk:
+ gdpr_persons.append(cls.persons.through(gdprperson_id=pk, gdprlog_id=log.pk))
+ if gdpr_persons:
+ cls.persons.through.objects.bulk_create(gdpr_persons)
class ProfileType(GeneralType):
diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py
index 4398ed762..6a2e9d3b0 100644
--- a/ishtar_common/tests.py
+++ b/ishtar_common/tests.py
@@ -2068,6 +2068,7 @@ class GDPRTest(TestCase):
self.user = User.objects.create_superuser(
self.username, "nomail@nomail.com", self.password
)
+ self.user.user_permissions.add(Permission.objects.get(codename="change_person"))
self.person_1 = models.Person.objects.create(
name="Boule",
surname=" ",
@@ -2104,19 +2105,81 @@ class GDPRTest(TestCase):
def test_views(self):
nb = models.GDPRLog.objects.count()
- c = Client()
- c.login(username=self.username, password=self.password)
- self.settings(GDPR_LOGGING=True)
- c.get(reverse("get-person"))
- self.assertEqual(models.GDPRLog.objects.count(), nb + 1)
- self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "DC")
- c.get(reverse("get-person", args=["csv"]))
- self.assertEqual(models.GDPRLog.objects.count(), nb + 2)
- self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "DE")
- response = c.get(reverse("show-person", args=[self.person_1.pk, ""]))
- self.assertEqual(models.GDPRLog.objects.count(), nb + 3)
- self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PC")
+ with self.settings(GDPR_LOGGING=False):
+ c = Client()
+ c.login(username=self.username, password=self.password)
+ c.get(reverse("get-person"))
+ self.assertEqual(models.GDPRLog.objects.count(), nb)
+ with self.settings(GDPR_LOGGING=True):
+ c = Client()
+ c.login(username=self.username, password=self.password)
+ c.get(reverse("get-person"))
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 1)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "DC")
+ c.get(reverse("get-person", args=["csv"]))
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 2)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "DE")
+ c.get(reverse("show-person", args=[self.person_1.pk, ""]))
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 3)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PV")
+ c.get(reverse("show-person", args=[self.person_1.pk, "pdf"]))
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 4)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PE")
+ title = models.TitleType.objects.create(label="Mr", txt_idx='mr')
+ c.post(
+ reverse("person-qa-bulk-update-confirm", args=[str(self.person_1.pk)]),
+ {"qa_title": title.pk},
+ )
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 5)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PM")
+ # deletion
+ url = "/person_deletion/"
+ step = "selec-person_deletion"
+ data = {
+ "person_deletionperson_deletion_wizard-current_step": [step],
+ f"{step}-pks": [str(self.person_3.pk)]
+ }
+ c.post(url, data)
+ data = {
+ "person_deletionperson_deletion_wizard-current_step": ["final-person_deletion"],
+ }
+ c.post(url, data, follow=True)
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 6)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PD")
+ # auto-merge
+ url = f"/person-manual-merge-items/{self.person_1.pk}_{self.person_2.pk}/"
+ c.post(url, {"main_item": self.person_1.pk}, follow=True)
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 7)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "Pm")
+ # merge
+ merge_model = models.Person.merge_candidate.through
+ ln = merge_model.objects.count()
+ self.person_4 = models.Person.objects.create(
+ name="George", history_modifier=self.user,
+ )
+ self.person_3.save()
+ self.person_4.save()
+ self.assertTrue(merge_model.objects.count(), ln + 1)
+ data = {
+ "form-0-b_is_duplicate_a": True,
+ "form-0-id": list(merge_model.objects.all())[-1].pk,
+ "form-TOTAL_FORMS": 1,
+ "form-INITIAL_FORMS": 1
+ }
+ c.post("/person-merge/", data)
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 8)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "Pm")
+ # admin consultation
+ c.get("/admin/ishtar_common/person/")
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 9)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "AC")
+ c.get(f"/admin/ishtar_common/person/{self.person_1.pk}/change/")
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 10)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "AV")
+ c.post(f"/admin/ishtar_common/person/{self.person_1.pk}/delete/", {"post": "yes"})
+ self.assertEqual(models.GDPRLog.objects.count(), nb + 11)
+ self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "AD")
def test_create_log_performance(self):
persons = []
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index 35289145c..0a6926d9d 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -508,8 +508,6 @@ class OwnPerms:
return q
-
-
def update_data(data, new_data, merge=False):
"""
Update a data directory taking account of key detail
@@ -620,11 +618,11 @@ class MultiValueDict(BaseMultiValueDict):
return lst
-def is_downloadable(url):
+def is_downloadable(curl):
"""
Does the url contain a downloadable resource
"""
- h = requests.head(url, allow_redirects=True)
+ h = requests.head(curl, allow_redirects=True)
header = h.headers
content_type = header.get("content-type")
if "text" in content_type.lower():
@@ -736,6 +734,35 @@ def deserialize_args_for_tasks(sender, kwargs, extra_kwargs=None):
return sender, instance
+def get_person_gdpr_log(view_name, request, data_type, queryset, slice_query=None):
+ if not settings.GDPR_LOGGING:
+ return
+ if view_name == "get_item":
+ activity = "DE" if data_type == "csv" else "DC"
+ elif view_name == "show_item":
+ activity = "PV" if not data_type else "PE"
+ elif view_name in ("new_qa_item", "new_item"):
+ activity = "PC"
+ elif view_name in ("modify_qa_item", "modify_item"):
+ activity = "PM"
+ elif view_name == "delete_item":
+ activity = "PD"
+ elif view_name == "merge_person":
+ activity = "Pm"
+ elif view_name == "admin_person_consultation":
+ activity = "AC"
+ elif view_name == "admin_person_view":
+ activity = "AV"
+ elif view_name == "admin_person_modify":
+ activity = "AM"
+ elif view_name == "admin_person_delete":
+ activity = "AD"
+ else:
+ return
+ GDPRLog = apps.get_model("ishtar_common", "GDPRLog")
+ GDPRLog.create_log(request, activity, queryset, slice_query)
+
+
EXTRA_KWARGS_TRIGGER = [
"_cascade_change",
"_cached_labels_bulk_update",
diff --git a/ishtar_common/views.py b/ishtar_common/views.py
index c5ab88efa..d5b2f369f 100644
--- a/ishtar_common/views.py
+++ b/ishtar_common/views.py
@@ -77,6 +77,7 @@ from ishtar_common.utils import (
clean_session_cache,
CSV_OPTIONS,
get_field_labels_from_path,
+ get_person_gdpr_log,
get_random_item_image_link,
shortify,
dict_to_tuple,
@@ -1092,11 +1093,13 @@ def autocomplete_author(request):
return HttpResponse(data, content_type="text/plain")
-new_person = new_qa_item(models.Person, forms.PersonForm, page_name=_("New person"))
-modify_person = modify_qa_item(models.Person, forms.PersonForm)
+new_person = new_qa_item(models.Person, forms.PersonForm, page_name=_("New person"),
+ callback=get_person_gdpr_log)
+modify_person = modify_qa_item(models.Person, forms.PersonForm, callback=get_person_gdpr_log)
detail_person = get_short_html_detail(models.Person)
new_person_noorga = new_qa_item(
- models.Person, forms.NoOrgaPersonForm, page_name=_("New person")
+ models.Person, forms.NoOrgaPersonForm, page_name=_("New person"),
+ callback=get_person_gdpr_log
)
new_organization = new_qa_item(
models.Organization, forms.OrganizationForm, page_name=_("New organization")
@@ -1107,17 +1110,6 @@ modify_organization = modify_qa_item(models.Organization, forms.OrganizationForm
detail_organization = get_short_html_detail(models.Organization)
new_author = new_qa_item(models.Author, forms.AuthorForm, page_name=_("New author"))
-
-def get_person_gdpr_log(view_name, request, data_type, queryset, slice_query=None):
- if view_name == "get_item":
- activity = "DE" if data_type == "csv" else "DC"
- elif view_name == "show_item":
- activity = "PC" if not data_type else "PC"
- else:
- return
- models.GDPRLog.create_log(request, activity, queryset, slice_query)
-
-
show_person = show_item(models.Person, "person", callback=get_person_gdpr_log)
get_person = get_item(models.Person, "get_person", "person",
callback=get_person_gdpr_log)
@@ -1170,7 +1162,7 @@ def reset_wizards(request):
ITEM_PER_PAGE = 20
-def merge_action(model, form, key, name_key="name"):
+def merge_action(model, form, key, name_key="name", callback=None):
def merge(request, page=1):
current_url = key + "_merge"
if not page:
@@ -1207,7 +1199,7 @@ def merge_action(model, form, key, name_key="name"):
if request.method == "POST":
context["formset"] = FormSet(request.POST, queryset=queryset)
if context["formset"].is_valid():
- context["formset"].merge()
+ context["formset"].merge(callback=callback, request=request)
return redirect(reverse(current_url, kwargs={"page": page}))
else:
context["formset"] = FormSet(queryset=queryset)
@@ -1234,7 +1226,7 @@ def regenerate_external_id(request):
return HttpResponseRedirect(reverse("success"))
-person_merge = merge_action(models.Person, forms.MergePersonForm, "person")
+person_merge = merge_action(models.Person, forms.MergePersonForm, "person", callback=get_person_gdpr_log)
organization_merge = merge_action(
models.Organization, forms.MergeOrganizationForm, "organization"
)
@@ -2380,6 +2372,12 @@ class PersonCreate(LoginRequiredMixin, CreateView):
form_class = forms.BasePersonForm
template_name = "ishtar/person_form.html"
+ def form_valid(self, form):
+ returned = super().form_valid(form)
+ get_person_gdpr_log("new_item", self.request, None,
+ self.model.objects.filter(pk=self.object.pk))
+ return returned
+
def get_success_url(self):
return reverse("person_edit", args=[self.object.pk])
@@ -2389,6 +2387,12 @@ class PersonEdit(LoginRequiredMixin, UpdateView):
form_class = forms.BasePersonForm
template_name = "ishtar/person_form.html"
+ def form_valid(self, form):
+ returned = super().form_valid(form)
+ get_person_gdpr_log("modify_item", self.request, None,
+ self.model.objects.filter(pk=self.object.pk))
+ return returned
+
def get_success_url(self):
return reverse("person_edit", args=[self.object.pk])
@@ -2412,14 +2416,19 @@ class PersonManualMerge(ManualMergeMixin, IshtarMixin, LoginRequiredMixin, FormV
redir_url = "person_manual_merge_items"
-class ManualMergeItemsMixin(object):
+class ManualMergeItemsMixin:
+ item_type = None
+
def get_form_kwargs(self):
kwargs = super(ManualMergeItemsMixin, self).get_form_kwargs()
kwargs["items"] = self.kwargs["pks"].split("_")
return kwargs
def form_valid(self, form):
- self.item = form.merge()
+ callback = None
+ if self.item_type:
+ callback = get_person_gdpr_log
+ self.item = form.merge(callback=callback, request=self.request)
return super(ManualMergeItemsMixin, self).form_valid(form)
def get_success_url(self):
@@ -2499,6 +2508,12 @@ class OrganizationPersonCreate(LoginRequiredMixin, CreateView):
def get_success_url(self):
return reverse("organization_person_edit", args=[self.object.pk])
+ def form_valid(self, form):
+ returned = super().form_valid(form)
+ get_person_gdpr_log("new_item", self.request, None,
+ self.model.objects.filter(pk=self.object.pk))
+ return returned
+
class OrganizationPersonEdit(LoginRequiredMixin, UpdateView):
model = models.Person
@@ -2514,6 +2529,12 @@ class OrganizationPersonEdit(LoginRequiredMixin, UpdateView):
def get_success_url(self):
return reverse("organization_person_edit", args=[self.object.pk])
+ def form_valid(self, form):
+ returned = super().form_valid(form)
+ get_person_gdpr_log("modify_item", self.request, None,
+ self.model.objects.filter(pk=self.object.pk))
+ return returned
+
# documents
@@ -3033,6 +3054,12 @@ class QAPersonForm(QAItemEditForm):
model = models.Person
form_class = forms.QAPersonFormMulti
+ def form_save(self, form):
+ get_person_gdpr_log("modify_item", self.request, None,
+ self.model.objects.filter(pk__in=[item.pk for item in self.items]))
+ form.save(self.items, self.request.user)
+ return HttpResponseRedirect(reverse("success"))
+
class QADocumentForm(QAItemEditForm):
model = models.Document
diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py
index 25ffe582e..8c1c0b734 100644
--- a/ishtar_common/views_item.py
+++ b/ishtar_common/views_item.py
@@ -170,7 +170,8 @@ def check_permission(request, action_slug, obj_id=None):
def new_qa_item(
- model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name=""
+ model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name="",
+ callback=None
):
def func(request, parent_name, limits=""):
model_name = model._meta.object_name
@@ -204,6 +205,8 @@ def new_qa_item(
if dct["parent_pk"] and "_select_" in dct["parent_pk"]:
parents = dct["parent_pk"].split("_")
dct["parent_pk"] = "_".join([parents[0]] + parents[2:])
+ if callback:
+ callback("new_qa_item", request, None, model.objects.filter(pk=new_item.pk))
return render(request, template, dct)
else:
dct["form"] = frm(limits=limits)
@@ -228,7 +231,7 @@ def get_short_html_detail(model):
return func
-def modify_qa_item(model, frm):
+def modify_qa_item(model, frm, callback=None):
def func(request, parent_name="", pk=None):
template = "ishtar/forms/qa_new_item.html"
model_name = model._meta.object_name
@@ -263,6 +266,8 @@ def modify_qa_item(model, frm):
if dct["parent_pk"] and "_select_" in dct["parent_pk"]:
parents = dct["parent_pk"].split("_")
dct["parent_pk"] = "_".join([parents[0]] + parents[2:])
+ if callback:
+ callback("modify_qa_item", request, None, model.objects.filter(pk=new_item.pk))
return render(request, template, dct)
else:
data = model_to_dict(item)
diff --git a/ishtar_common/wizards.py b/ishtar_common/wizards.py
index e8be78932..a3f6689dc 100644
--- a/ishtar_common/wizards.py
+++ b/ishtar_common/wizards.py
@@ -50,7 +50,8 @@ from django.utils.safestring import mark_safe
from ishtar_common import models, models_rest
from ishtar_common.forms import CustomForm, reverse_lazy
-from ishtar_common.utils import get_all_field_names, MultiValueDict, put_session_message
+from ishtar_common.utils import get_all_field_names, get_person_gdpr_log, MultiValueDict, \
+ put_session_message
logger = logging.getLogger(__name__)
@@ -1938,6 +1939,15 @@ class PersonWizard(Wizard):
wizard_done_window = reverse_lazy("show-person")
redirect_url = "person_modification"
+ def save_model(self, dct, m2m, whole_associated_models, form_list, return_object):
+ obj = self.get_current_saved_object()
+ creation = not obj
+ returned = super().save_model(dct, m2m, whole_associated_models, form_list, return_object)
+ q = self.model.objects.filter(pk=self.current_object.pk)
+ action = "new_item" if creation else "modify_item"
+ get_person_gdpr_log(action, self.request, "", q)
+ return returned
+
class PersonModifWizard(PersonWizard):
modification = True
@@ -1954,6 +1964,11 @@ class PersonDeletionWizard(MultipleDeletionWizard):
"final-person_deletion": "ishtar/wizard/wizard_person_deletion.html"
}
+ def done(self, form_list, **kwargs):
+ q = self.model.objects.filter(pk__in=[o.pk for o in self.get_current_objects()])
+ get_person_gdpr_log("delete_item", self.request, "", q)
+ return super().done(form_list, **kwargs)
+
class IshtarUserDeletionWizard(MultipleDeletionWizard):
model = models.IshtarUser