diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2024-01-18 11:36:19 +0100 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2024-02-10 14:45:20 +0100 |
commit | 13cff2b11a06c9ed334c6bad96ce14a7e3d629d8 (patch) | |
tree | 97fa08bfda014a128ce7fe161695e8123b221cea | |
parent | 7293985b9d83bed4130a677521ef72d4759202c3 (diff) | |
download | Ishtar-13cff2b11a06c9ed334c6bad96ce14a7e3d629d8.tar.bz2 Ishtar-13cff2b11a06c9ed334c6bad96ce14a7e3d629d8.zip |
✨ GDPR: manage merge action, admin consultation, edition, delete, ✅ GDPR tests
-rw-r--r-- | ishtar_common/admin.py | 24 | ||||
-rw-r--r-- | ishtar_common/forms_common.py | 18 | ||||
-rw-r--r-- | ishtar_common/models.py | 9 | ||||
-rw-r--r-- | ishtar_common/tests.py | 87 | ||||
-rw-r--r-- | ishtar_common/utils.py | 35 | ||||
-rw-r--r-- | ishtar_common/views.py | 65 | ||||
-rw-r--r-- | ishtar_common/views_item.py | 9 | ||||
-rw-r--r-- | ishtar_common/wizards.py | 17 |
8 files changed, 219 insertions, 45 deletions
diff --git a/ishtar_common/admin.py b/ishtar_common/admin.py index 62faf3b77..50b6d05b3 100644 --- a/ishtar_common/admin.py +++ b/ishtar_common/admin.py @@ -72,7 +72,7 @@ from django import forms from ishtar_common import models, models_common, models_rest from ishtar_common.apps import admin_site from ishtar_common.model_merging import merge_model_objects -from ishtar_common.utils import get_cache, create_slug +from ishtar_common.utils import get_cache, create_slug, get_person_gdpr_log from ishtar_common import forms as common_forms, forms_common as other_common_forms from ishtar_common.serializers import restore_serialized, IMPORT_MODEL_LIST @@ -707,6 +707,28 @@ class PersonAdmin(HistorizedObjectAdmin): model = models.Person inlines = [ProfileInline] + def get_search_results(self, request, queryset, search_term): + page = int(request.GET.get("p", 0)) + slice = page * self.list_per_page, (page + 1) * self.list_per_page + get_person_gdpr_log("admin_person_consultation", request, None, queryset, slice) + return super().get_search_results(request, queryset, search_term) + + def response_change(self, request, obj): + get_person_gdpr_log("admin_person_modify", request, None, + self.model.objects.filter(pk=obj.pk)) + return super().response_change(request, obj) + + def changeform_view(self, request, object_id=None, form_url='', extra_context=None): + if request.method == "GET": + get_person_gdpr_log("admin_person_view", request, None, + self.model.objects.filter(pk=object_id)) + return super().changeform_view(request, object_id, form_url, extra_context) + + def delete_model(self, request, obj): + get_person_gdpr_log("admin_person_delete", request, None, + self.model.objects.filter(pk=obj.pk)) + super().delete_model(request, obj) + admin_site.register(models.Person, PersonAdmin) diff --git a/ishtar_common/forms_common.py b/ishtar_common/forms_common.py index 45b43000f..7f37e86ce 100644 --- a/ishtar_common/forms_common.py +++ b/ishtar_common/forms_common.py @@ -931,12 +931,13 @@ class MergeIntoForm(forms.Form): ) ) - def merge(self): + def merge(self, callback=None, request=None): model = self.associated_model try: main_item = model.objects.get(pk=self.cleaned_data["main_item"]) except model.DoesNotExist: return + items, items_pk = [], [main_item.pk] for pk in self.items: if pk == str(main_item.pk): continue @@ -944,6 +945,12 @@ class MergeIntoForm(forms.Form): item = model.objects.get(pk=pk) except model.DoesNotExist: continue + items.append(item) + items_pk.append(item.pk) + if callback: + queryset = models.Person.objects.filter(pk__in=items_pk) + callback("merge_person", request, "", queryset) + for item in items: main_item.merge(item) return main_item @@ -1725,9 +1732,9 @@ class MergeFormSet(BaseModelFormSet): self._cached_list = [] super(MergeFormSet, self).__init__(*args, **kwargs) - def merge(self): + def merge(self, callback=None, request=None): for form in self.initial_forms: - form.merge() + form.merge(callback=callback, request=request) def initial_form_count(self): """ @@ -1804,6 +1811,11 @@ class MergeForm(forms.ModelForm): from_item = getattr(self.instance, self.FROM_KEY) except ObjectDoesNotExist: return + callback = kwargs.get("callback", None) + if callback: + request = kwargs.get("request") + queryset = models.Person.objects.filter(pk__in=[to_item.pk, from_item.pk]) + callback("merge_person", request, "", queryset) if self.cleaned_data.get("a_is_duplicate_b"): to_item.merge(from_item) elif self.cleaned_data.get("b_is_duplicate_a"): diff --git a/ishtar_common/models.py b/ishtar_common/models.py index b9944e4bb..c978b087b 100644 --- a/ishtar_common/models.py +++ b/ishtar_common/models.py @@ -3211,11 +3211,12 @@ GDPR_ACTIVITY = ( ("PE", _("Exporting a person's notice")), ("PC", _("Person creation")), ("PM", _("Person modification")), + ("Pm", _("Person merge")), ("PD", _("Person deletion")), ("AC", _("Admin - Directory consultation")), - ("AE", _("Admin - Directory export")), ("AV", _("Admin - Person view")), ("AM", _("Admin - Person modification")), + ("AD", _("Admin - Person deletion")), ) GDPR_ACTIVITY_DICT = dict(GDPR_ACTIVITY) @@ -3299,8 +3300,10 @@ class GDPRLog(models.Model): if start is not None: values = values[start:end] for pk in values: - gdpr_persons.append(cls.persons.through(gdprperson_id=pk, gdprlog_id=log.pk)) - cls.persons.through.objects.bulk_create(gdpr_persons) + if pk: + gdpr_persons.append(cls.persons.through(gdprperson_id=pk, gdprlog_id=log.pk)) + if gdpr_persons: + cls.persons.through.objects.bulk_create(gdpr_persons) class ProfileType(GeneralType): diff --git a/ishtar_common/tests.py b/ishtar_common/tests.py index 4398ed762..6a2e9d3b0 100644 --- a/ishtar_common/tests.py +++ b/ishtar_common/tests.py @@ -2068,6 +2068,7 @@ class GDPRTest(TestCase): self.user = User.objects.create_superuser( self.username, "nomail@nomail.com", self.password ) + self.user.user_permissions.add(Permission.objects.get(codename="change_person")) self.person_1 = models.Person.objects.create( name="Boule", surname=" ", @@ -2104,19 +2105,81 @@ class GDPRTest(TestCase): def test_views(self): nb = models.GDPRLog.objects.count() - c = Client() - c.login(username=self.username, password=self.password) - self.settings(GDPR_LOGGING=True) - c.get(reverse("get-person")) - self.assertEqual(models.GDPRLog.objects.count(), nb + 1) - self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "DC") - c.get(reverse("get-person", args=["csv"])) - self.assertEqual(models.GDPRLog.objects.count(), nb + 2) - self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "DE") - response = c.get(reverse("show-person", args=[self.person_1.pk, ""])) - self.assertEqual(models.GDPRLog.objects.count(), nb + 3) - self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PC") + with self.settings(GDPR_LOGGING=False): + c = Client() + c.login(username=self.username, password=self.password) + c.get(reverse("get-person")) + self.assertEqual(models.GDPRLog.objects.count(), nb) + with self.settings(GDPR_LOGGING=True): + c = Client() + c.login(username=self.username, password=self.password) + c.get(reverse("get-person")) + self.assertEqual(models.GDPRLog.objects.count(), nb + 1) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "DC") + c.get(reverse("get-person", args=["csv"])) + self.assertEqual(models.GDPRLog.objects.count(), nb + 2) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "DE") + c.get(reverse("show-person", args=[self.person_1.pk, ""])) + self.assertEqual(models.GDPRLog.objects.count(), nb + 3) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PV") + c.get(reverse("show-person", args=[self.person_1.pk, "pdf"])) + self.assertEqual(models.GDPRLog.objects.count(), nb + 4) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PE") + title = models.TitleType.objects.create(label="Mr", txt_idx='mr') + c.post( + reverse("person-qa-bulk-update-confirm", args=[str(self.person_1.pk)]), + {"qa_title": title.pk}, + ) + self.assertEqual(models.GDPRLog.objects.count(), nb + 5) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PM") + # deletion + url = "/person_deletion/" + step = "selec-person_deletion" + data = { + "person_deletionperson_deletion_wizard-current_step": [step], + f"{step}-pks": [str(self.person_3.pk)] + } + c.post(url, data) + data = { + "person_deletionperson_deletion_wizard-current_step": ["final-person_deletion"], + } + c.post(url, data, follow=True) + self.assertEqual(models.GDPRLog.objects.count(), nb + 6) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "PD") + # auto-merge + url = f"/person-manual-merge-items/{self.person_1.pk}_{self.person_2.pk}/" + c.post(url, {"main_item": self.person_1.pk}, follow=True) + self.assertEqual(models.GDPRLog.objects.count(), nb + 7) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "Pm") + # merge + merge_model = models.Person.merge_candidate.through + ln = merge_model.objects.count() + self.person_4 = models.Person.objects.create( + name="George", history_modifier=self.user, + ) + self.person_3.save() + self.person_4.save() + self.assertTrue(merge_model.objects.count(), ln + 1) + data = { + "form-0-b_is_duplicate_a": True, + "form-0-id": list(merge_model.objects.all())[-1].pk, + "form-TOTAL_FORMS": 1, + "form-INITIAL_FORMS": 1 + } + c.post("/person-merge/", data) + self.assertEqual(models.GDPRLog.objects.count(), nb + 8) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "Pm") + # admin consultation + c.get("/admin/ishtar_common/person/") + self.assertEqual(models.GDPRLog.objects.count(), nb + 9) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "AC") + c.get(f"/admin/ishtar_common/person/{self.person_1.pk}/change/") + self.assertEqual(models.GDPRLog.objects.count(), nb + 10) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "AV") + c.post(f"/admin/ishtar_common/person/{self.person_1.pk}/delete/", {"post": "yes"}) + self.assertEqual(models.GDPRLog.objects.count(), nb + 11) + self.assertEqual(models.GDPRLog.objects.order_by('-pk')[0].activity, "AD") def test_create_log_performance(self): persons = [] diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index cf56ea705..3081d488c 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -507,8 +507,6 @@ class OwnPerms: return q - - def update_data(data, new_data, merge=False): """ Update a data directory taking account of key detail @@ -619,11 +617,11 @@ class MultiValueDict(BaseMultiValueDict): return lst -def is_downloadable(url): +def is_downloadable(curl): """ Does the url contain a downloadable resource """ - h = requests.head(url, allow_redirects=True) + h = requests.head(curl, allow_redirects=True) header = h.headers content_type = header.get("content-type") if "text" in content_type.lower(): @@ -735,6 +733,35 @@ def deserialize_args_for_tasks(sender, kwargs, extra_kwargs=None): return sender, instance +def get_person_gdpr_log(view_name, request, data_type, queryset, slice_query=None): + if not settings.GDPR_LOGGING: + return + if view_name == "get_item": + activity = "DE" if data_type == "csv" else "DC" + elif view_name == "show_item": + activity = "PV" if not data_type else "PE" + elif view_name in ("new_qa_item", "new_item"): + activity = "PC" + elif view_name in ("modify_qa_item", "modify_item"): + activity = "PM" + elif view_name == "delete_item": + activity = "PD" + elif view_name == "merge_person": + activity = "Pm" + elif view_name == "admin_person_consultation": + activity = "AC" + elif view_name == "admin_person_view": + activity = "AV" + elif view_name == "admin_person_modify": + activity = "AM" + elif view_name == "admin_person_delete": + activity = "AD" + else: + return + GDPRLog = apps.get_model("ishtar_common", "GDPRLog") + GDPRLog.create_log(request, activity, queryset, slice_query) + + EXTRA_KWARGS_TRIGGER = [ "_cascade_change", "_cached_labels_bulk_update", diff --git a/ishtar_common/views.py b/ishtar_common/views.py index c5ab88efa..d5b2f369f 100644 --- a/ishtar_common/views.py +++ b/ishtar_common/views.py @@ -77,6 +77,7 @@ from ishtar_common.utils import ( clean_session_cache, CSV_OPTIONS, get_field_labels_from_path, + get_person_gdpr_log, get_random_item_image_link, shortify, dict_to_tuple, @@ -1092,11 +1093,13 @@ def autocomplete_author(request): return HttpResponse(data, content_type="text/plain") -new_person = new_qa_item(models.Person, forms.PersonForm, page_name=_("New person")) -modify_person = modify_qa_item(models.Person, forms.PersonForm) +new_person = new_qa_item(models.Person, forms.PersonForm, page_name=_("New person"), + callback=get_person_gdpr_log) +modify_person = modify_qa_item(models.Person, forms.PersonForm, callback=get_person_gdpr_log) detail_person = get_short_html_detail(models.Person) new_person_noorga = new_qa_item( - models.Person, forms.NoOrgaPersonForm, page_name=_("New person") + models.Person, forms.NoOrgaPersonForm, page_name=_("New person"), + callback=get_person_gdpr_log ) new_organization = new_qa_item( models.Organization, forms.OrganizationForm, page_name=_("New organization") @@ -1107,17 +1110,6 @@ modify_organization = modify_qa_item(models.Organization, forms.OrganizationForm detail_organization = get_short_html_detail(models.Organization) new_author = new_qa_item(models.Author, forms.AuthorForm, page_name=_("New author")) - -def get_person_gdpr_log(view_name, request, data_type, queryset, slice_query=None): - if view_name == "get_item": - activity = "DE" if data_type == "csv" else "DC" - elif view_name == "show_item": - activity = "PC" if not data_type else "PC" - else: - return - models.GDPRLog.create_log(request, activity, queryset, slice_query) - - show_person = show_item(models.Person, "person", callback=get_person_gdpr_log) get_person = get_item(models.Person, "get_person", "person", callback=get_person_gdpr_log) @@ -1170,7 +1162,7 @@ def reset_wizards(request): ITEM_PER_PAGE = 20 -def merge_action(model, form, key, name_key="name"): +def merge_action(model, form, key, name_key="name", callback=None): def merge(request, page=1): current_url = key + "_merge" if not page: @@ -1207,7 +1199,7 @@ def merge_action(model, form, key, name_key="name"): if request.method == "POST": context["formset"] = FormSet(request.POST, queryset=queryset) if context["formset"].is_valid(): - context["formset"].merge() + context["formset"].merge(callback=callback, request=request) return redirect(reverse(current_url, kwargs={"page": page})) else: context["formset"] = FormSet(queryset=queryset) @@ -1234,7 +1226,7 @@ def regenerate_external_id(request): return HttpResponseRedirect(reverse("success")) -person_merge = merge_action(models.Person, forms.MergePersonForm, "person") +person_merge = merge_action(models.Person, forms.MergePersonForm, "person", callback=get_person_gdpr_log) organization_merge = merge_action( models.Organization, forms.MergeOrganizationForm, "organization" ) @@ -2380,6 +2372,12 @@ class PersonCreate(LoginRequiredMixin, CreateView): form_class = forms.BasePersonForm template_name = "ishtar/person_form.html" + def form_valid(self, form): + returned = super().form_valid(form) + get_person_gdpr_log("new_item", self.request, None, + self.model.objects.filter(pk=self.object.pk)) + return returned + def get_success_url(self): return reverse("person_edit", args=[self.object.pk]) @@ -2389,6 +2387,12 @@ class PersonEdit(LoginRequiredMixin, UpdateView): form_class = forms.BasePersonForm template_name = "ishtar/person_form.html" + def form_valid(self, form): + returned = super().form_valid(form) + get_person_gdpr_log("modify_item", self.request, None, + self.model.objects.filter(pk=self.object.pk)) + return returned + def get_success_url(self): return reverse("person_edit", args=[self.object.pk]) @@ -2412,14 +2416,19 @@ class PersonManualMerge(ManualMergeMixin, IshtarMixin, LoginRequiredMixin, FormV redir_url = "person_manual_merge_items" -class ManualMergeItemsMixin(object): +class ManualMergeItemsMixin: + item_type = None + def get_form_kwargs(self): kwargs = super(ManualMergeItemsMixin, self).get_form_kwargs() kwargs["items"] = self.kwargs["pks"].split("_") return kwargs def form_valid(self, form): - self.item = form.merge() + callback = None + if self.item_type: + callback = get_person_gdpr_log + self.item = form.merge(callback=callback, request=self.request) return super(ManualMergeItemsMixin, self).form_valid(form) def get_success_url(self): @@ -2499,6 +2508,12 @@ class OrganizationPersonCreate(LoginRequiredMixin, CreateView): def get_success_url(self): return reverse("organization_person_edit", args=[self.object.pk]) + def form_valid(self, form): + returned = super().form_valid(form) + get_person_gdpr_log("new_item", self.request, None, + self.model.objects.filter(pk=self.object.pk)) + return returned + class OrganizationPersonEdit(LoginRequiredMixin, UpdateView): model = models.Person @@ -2514,6 +2529,12 @@ class OrganizationPersonEdit(LoginRequiredMixin, UpdateView): def get_success_url(self): return reverse("organization_person_edit", args=[self.object.pk]) + def form_valid(self, form): + returned = super().form_valid(form) + get_person_gdpr_log("modify_item", self.request, None, + self.model.objects.filter(pk=self.object.pk)) + return returned + # documents @@ -3033,6 +3054,12 @@ class QAPersonForm(QAItemEditForm): model = models.Person form_class = forms.QAPersonFormMulti + def form_save(self, form): + get_person_gdpr_log("modify_item", self.request, None, + self.model.objects.filter(pk__in=[item.pk for item in self.items])) + form.save(self.items, self.request.user) + return HttpResponseRedirect(reverse("success")) + class QADocumentForm(QAItemEditForm): model = models.Document diff --git a/ishtar_common/views_item.py b/ishtar_common/views_item.py index a56cd1ad6..5d29a9850 100644 --- a/ishtar_common/views_item.py +++ b/ishtar_common/views_item.py @@ -170,7 +170,8 @@ def check_permission(request, action_slug, obj_id=None): def new_qa_item( - model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name="" + model, frm, many=False, template="ishtar/forms/qa_new_item.html", page_name="", + callback=None ): def func(request, parent_name, limits=""): model_name = model._meta.object_name @@ -204,6 +205,8 @@ def new_qa_item( if dct["parent_pk"] and "_select_" in dct["parent_pk"]: parents = dct["parent_pk"].split("_") dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) + if callback: + callback("new_qa_item", request, None, model.objects.filter(pk=new_item.pk)) return render(request, template, dct) else: dct["form"] = frm(limits=limits) @@ -228,7 +231,7 @@ def get_short_html_detail(model): return func -def modify_qa_item(model, frm): +def modify_qa_item(model, frm, callback=None): def func(request, parent_name="", pk=None): template = "ishtar/forms/qa_new_item.html" model_name = model._meta.object_name @@ -263,6 +266,8 @@ def modify_qa_item(model, frm): if dct["parent_pk"] and "_select_" in dct["parent_pk"]: parents = dct["parent_pk"].split("_") dct["parent_pk"] = "_".join([parents[0]] + parents[2:]) + if callback: + callback("modify_qa_item", request, None, model.objects.filter(pk=new_item.pk)) return render(request, template, dct) else: data = model_to_dict(item) diff --git a/ishtar_common/wizards.py b/ishtar_common/wizards.py index e8be78932..a3f6689dc 100644 --- a/ishtar_common/wizards.py +++ b/ishtar_common/wizards.py @@ -50,7 +50,8 @@ from django.utils.safestring import mark_safe from ishtar_common import models, models_rest from ishtar_common.forms import CustomForm, reverse_lazy -from ishtar_common.utils import get_all_field_names, MultiValueDict, put_session_message +from ishtar_common.utils import get_all_field_names, get_person_gdpr_log, MultiValueDict, \ + put_session_message logger = logging.getLogger(__name__) @@ -1938,6 +1939,15 @@ class PersonWizard(Wizard): wizard_done_window = reverse_lazy("show-person") redirect_url = "person_modification" + def save_model(self, dct, m2m, whole_associated_models, form_list, return_object): + obj = self.get_current_saved_object() + creation = not obj + returned = super().save_model(dct, m2m, whole_associated_models, form_list, return_object) + q = self.model.objects.filter(pk=self.current_object.pk) + action = "new_item" if creation else "modify_item" + get_person_gdpr_log(action, self.request, "", q) + return returned + class PersonModifWizard(PersonWizard): modification = True @@ -1954,6 +1964,11 @@ class PersonDeletionWizard(MultipleDeletionWizard): "final-person_deletion": "ishtar/wizard/wizard_person_deletion.html" } + def done(self, form_list, **kwargs): + q = self.model.objects.filter(pk__in=[o.pk for o in self.get_current_objects()]) + get_person_gdpr_log("delete_item", self.request, "", q) + return super().done(form_list, **kwargs) + class IshtarUserDeletionWizard(MultipleDeletionWizard): model = models.IshtarUser |