#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2013-2025 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. from cairosvg import svg2png from csv import QUOTE_ALL import datetime import feedparser from functools import wraps from guardian.exceptions import WrongAppError from itertools import chain from inspect import currentframe import json import logging import hashlib from importlib import import_module import io from jinja2 import Template from jinja2.filters import FILTERS, environmentfilter import locale import math import numpy import os import pytz import random import re import requests from secretary import Renderer as MainSecretaryRenderer, UndefinedSilently import shutil import string # nosec: no user input import subprocess # nosec import sys import tempfile import time import uuid import xmltodict import zipfile from django import forms from django.apps import apps from django.conf import settings from django.conf.urls import url from django.contrib.auth.models import Permission, User, Group from django.contrib.auth.hashers import Argon2PasswordHasher as BaseArgon2PasswordHasher from django.contrib.contenttypes.models import ContentType from django.contrib.gis.geos import GEOSGeometry from django.contrib.sessions.backends.db import SessionStore from django.core.cache import cache from django.core.exceptions import SuspiciousOperation, ObjectDoesNotExist, \ ValidationError from django.core.files import File from django.core.files.storage import FileSystemStorage from django.core.validators import EMPTY_VALUES, MaxValueValidator from django.db import models from django.db.models import Q from django.db.models.functions import Length from django.http import HttpResponseRedirect from django.urls import reverse, NoReverseMatch from django.utils.crypto import get_random_string from django.utils.datastructures import MultiValueDict as BaseMultiValueDict from django.utils.formats import date_format from django.utils.safestring import mark_safe from django.template.defaultfilters import slugify from .jinja_filters import capfirst_filter, capitalize_filter, \ euro_format, float_format, human_date_filter, lowerfirst_filter, \ number_to_words, replace_line_breaks if settings.USE_TRANSLATION_OVERLOAD: from overload_translation.utils import ( ugettext_lazy, ugettext, pgettext_lazy, pgettext, ) else: from django.utils.translation import ( ugettext_lazy, ugettext, pgettext_lazy, pgettext, ) _ = ugettext_lazy logger = logging.getLogger(__name__) def dict_to_tuple(dct): values = [] for k, v in dct.items(): if isinstance(v, dict): v = dict_to_tuple(v) values.append((k, v)) return tuple(values) def debug_line_no(): return currentframe().f_back.f_lineno class InlineClass: """ Dynamic class used in templates """ def __init__(self, dct): for k in dct: setattr(self, k, dct[k]) def toJSON(self): return json.dumps( self, default=lambda o: o.__dict__, sort_keys=True, indent=4 ) def json_used_equipments(value): res = [] for gp in value: res.append(gp[:4] + [cost.toJSON() for cost in gp[-1]]) return res JSON_SERIALIZATION = { "used_equipments": json_used_equipments } def fake_task(*args): def fake(func): return func return fake task = fake_task celery_app = None if settings.USE_BACKGROUND_TASK: try: from celery import shared_task task = shared_task celery_app = getattr( import_module(settings.ROOT_PATH.split("/")[-2] + ".celery_app"), "app" ) except ModuleNotFoundError: pass @task() def test_task(arg): print("Running task with priority: {}".format(arg)) time.sleep(3) class BColors: """ Bash colors. Don't forget to finish your colored string with ENDC. """ HEADER = "\033[95m" OKBLUE = "\033[94m" OKGREEN = "\033[92m" WARNING = "\033[93m" FAIL = "\033[91m" ENDC = "\033[0m" BOLD = "\033[1m" UNDERLINE = "\033[4m" @classmethod def format(cls, color, value): if not hasattr(cls, color): return value return f"{getattr(cls, color)}{value}{cls.ENDC}" class Round(models.Func): function = "ROUND" arity = 2 arg_joiner = "::numeric, " CSV_OPTIONS = {"delimiter": ",", "quotechar": '"', "quoting": QUOTE_ALL} def is_safe_path(basedir, path, follow_symlinks=True): # resolves symbolic links if follow_symlinks: return os.path.realpath(path).startswith(basedir) return os.path.abspath(path).startswith(basedir) def import_class(full_path_classname): """ Return the model class from the full path """ mods = full_path_classname.split(".") if len(mods) == 1: mods = ["ishtar_common", "models", mods[0]] elif ( "models" not in mods and "models_finds" not in mods and "models_treatments" not in mods ): raise SuspiciousOperation("Try to import a non model from a string") module = import_module(".".join(mods[:-1])) model = getattr(module, mods[-1], None) if not model: return if not issubclass(model, models.Model): raise SuspiciousOperation("Try to import a non model from a string") return model def check_permissions(permissions=None, redirect_url="/"): """ Decorator that checks the rights to access the view. """ def decorator(view_func): def _wrapped_view(request, *args, **kwargs): if not permissions: return view_func(request, *args, **kwargs) if hasattr(request.user, "ishtaruser"): ishtaruser = request.user.ishtaruser if ishtaruser.has_permission("ishtaradmin"): kwargs["current_right"] = "administrator" return view_func(request, *args, **kwargs) for permission in permissions: # be careful to put the more permissive rights first # if granted it can allow more if ishtaruser.has_permission(permission): kwargs["current_right"] = permission return view_func(request, *args, **kwargs) msg = str(_("You don't have sufficient permissions to do this action.")) groups = Group.objects.filter(permissions__codename__in=[ perm.split(".")[-1] for perm in permissions ]) if groups.count(): msg += "
" + str( _("If this is unexpected, the profile type attached to yours account have to add one of this group: {}. Ask the administrator of your instance.") ).format( ", ".join(["" + str(g) + "" for g in groups]) ) put_session_message( request.session.session_key, msg, "warning", ) return HttpResponseRedirect(redirect_url) return _wrapped_view return decorator def check_permissions_condition(permissions): """ To be used to check in wizard condition_dict """ def func(self): request = self.request ishtaruser = request.user.ishtaruser if ishtaruser.has_permission("ishtaradmin"): return True for permission in permissions: if ishtaruser.has_permission(permission): return True return False return func def check_model_access_control(request, model, available_perms=None): """ Check access control to a model for a specific request :param request: the current request :param model: the concerned model :param available_perms: specific permissions to check if not specified "view" and "view_own" will be checked :return: (allowed, own) tuple """ own = True # more restrictive by default allowed = False if not request.user.is_authenticated: return allowed, own if not available_perms: available_perms = [ "view_" + model.__name__.lower(), "view_own_" + model.__name__.lower(), ] try: ishtaruser = request.user.ishtaruser except request.user._meta.model.ishtaruser.RelatedObjectDoesNotExist: return False, True if ishtaruser.has_permission("ishtaradmin"): allowed = True own = False return allowed, own app_name = model._meta.app_label for perm in available_perms: if ishtaruser.has_permission(f"{app_name}.{perm}"): allowed = True if "_own_" not in perm: own = False break # max right reach return allowed, own class SheetItem: SHOW_URL = "" SHEET_ALTERNATIVES = [] # list tuple: (key checked in profile, sheet name) def get_show_url(self): show_url = self.SHOW_URL if not show_url: show_url = "show-" + self.__class__.__name__.lower() try: return reverse(show_url, args=[self.pk, ""]) except NoReverseMatch: return def get_current_item_keys(): return ( ("file", apps.get_model("archaeological_files", "File")), ("operation", apps.get_model("archaeological_operations", "Operation")), ("site", apps.get_model("archaeological_operations", "ArchaeologicalSite")), ("contextrecord", apps.get_model("archaeological_context_records", "ContextRecord")), ("warehouse", apps.get_model("archaeological_warehouse", "Warehouse")), ("container", apps.get_model("archaeological_warehouse", "Container")), ("find", apps.get_model("archaeological_finds", "Find")), ("findbasket", apps.get_model("archaeological_finds", "FindBasket")), ("treatmentfile", apps.get_model("archaeological_finds", "TreatmentFile")), ("treatment", apps.get_model("archaeological_finds", "Treatment")), ("administrativeact", apps.get_model("archaeological_operations", "AdministrativeAct")), ("administrativeactop", apps.get_model("archaeological_operations", "AdministrativeAct")), ("administrativeactfile", apps.get_model("archaeological_operations", "AdministrativeAct")), ("administrativeacttreatment", apps.get_model("archaeological_operations", "AdministrativeAct")), ("administrativeacttreatmentfile", apps.get_model("archaeological_operations", "AdministrativeAct")), ) def get_current_item_keys_dict(): return dict(get_current_item_keys()) API_APP_CONTENT_TYPES = [ ("archaeological_operations", "operation"), ("archaeological_context_records", "contextrecord"), ("archaeological_finds", "find"), ("archaeological_warehouse", "warehouse"), ("archaeological_files", "file"), ] API_MAIN_CONTENT_TYPES = API_APP_CONTENT_TYPES + [ ("archaeological_operations", "archaeologicalsite"), ("archaeological_warehouse", "container"), ] API_MAIN_MODELS = dict( [(model_name, app_name) for app_name, model_name in API_MAIN_CONTENT_TYPES] ) class HistoryError(Exception): def __init__(self, value): self.value = value def __str__(self): return repr(self.value) class SearchAltName(object): def __init__( self, search_key, search_query, extra_query=None, distinct_query=False, related_name=None ): self.search_key = search_key self.search_query = search_query self.extra_query = extra_query or {} self.distinct_query = distinct_query self.related_name = related_name GENERAL_TYPE_PREFIX = { "prefix": "│ ", "prefix_empty": "  ", "prefix_medium": "├ ", "prefix_last": "└ ", "prefix_codes": ["\u2502", "\u251C", "\u2514"] } class OwnPerms: """ Manage special permissions for object's owner """ UPPER_PERMISSIONS = [] @classmethod def _has_permission_query_for_upper_permissions( cls, base_permissions, model, user_id): ProfileType = apps.get_model("ishtar_common", "ProfileType") permissions = list(set([ "_".join(permission.codename.split("_")[:-1]) + f"_{model._meta.model_name}" for permission in base_permissions ])) q = ProfileType.objects.filter( user_profiles__person__ishtaruser=user_id, groups__permissions__codename__in=permissions ) return q, permissions @classmethod def get_ids_from_upper_permissions(cls, user_id, content_type): if not cls.UPPER_PERMISSIONS: return [] UserObjectPermission = apps.get_model( "guardian", "UserObjectPermission" ) item_ids = [] try: full_permission = Permission.objects.get( codename=f"view_{content_type.model}", content_type=content_type ) except Permission.DoesNotExist: full_permission = None try: base_permission = Permission.objects.get( codename=f"view_own_{content_type.model}", content_type=content_type ) except Permission.DoesNotExist: base_permission = None for model, attr in cls.UPPER_PERMISSIONS: if isinstance(model, tuple): app_label, model_name = model model = apps.get_model(app_label, model_name) # check if has full permission q_full, __ = cls._has_permission_query_for_upper_permissions( [full_permission], model, user_id ) has_full_permission = bool(q_full.count()) if has_full_permission: if attr.startswith("q_"): # use a property item_ids += getattr(cls, f"has_{attr}")().values_list( "pk", flat=True ) else: item_ids += cls.objects.filter( **{f"{attr}__isnull": False} ).values_list("pk", flat=True) continue q, permissions = cls._has_permission_query_for_upper_permissions( [base_permission], model, user_id ) lst = [] if not q.count(): # no permissions associated for upstream model get direct attachement lst = model.objects.filter( ishtar_users__pk=user_id ).values_list("pk", flat=True) else: perms = [] for codename in permissions: perms += [ perm for perm in Permission.objects.filter( codename=codename).all() ] lst = [] for permission in perms: lst += list( UserObjectPermission.objects.filter( permission=permission, user_id=user_id ).values_list("object_pk", flat=True) ) if attr.startswith("q_"): # use a property item_ids += getattr(cls, attr)(lst).values_list( "pk", flat=True ) else: item_ids += cls.objects.filter( **{f"{attr}__in": lst} ).values_list("pk", flat=True) return list(set(item_ids)) @classmethod def get_query_owns(cls, ishtaruser): """ Query object to get own items """ return None # implement for each object def can_add(self, request): meta = self.__class__._meta return self.can_do( request, "add", app=meta.app_label, model_name=meta.model_name ) def can_view(self, request): meta = self.__class__._meta return self.can_do( request, "view", app=meta.app_label, model_name=meta.model_name ) def can_change(self, request): return self.can_edit(request) def can_edit(self, request): meta = self.__class__._meta return self.can_do( request, "change", app=meta.app_label, model_name=meta.model_name ) def can_delete(self, request): meta = self.__class__._meta return self.can_do( request, "delete", app=meta.app_label, model_name=meta.model_name ) def can_do(self, request, permission, app=None, model_name=None): """ Check permission availability for the current object. :param request: request object :param permission: action name eg: "archaelogical_finds.change_find" - "own" variation is checked - can provide only simple permission (e.g. "change") if app and model_name are provided :param app: application name (if permission not fully provided) :param model_name: model name (if permission not fully provided) :return: boolean """ if not getattr(request.user, "ishtaruser", None): return False if (app and not model_name) or (not app and model_name): return False if not app: app, perm = permission.split(".") p = perm.split("_") permission = p[0] model_name = ('_').join(p[1:]) if model_name == "findbasket": model_name = "find" ishtaruser = request.user.ishtaruser full_permission = f"{app}.{permission}_{model_name}" if ishtaruser.has_permission(full_permission): return True own = f"{app}.{permission}_own_{model_name}" try: return ishtaruser.has_permission(own, self) except WrongAppError: # normaly occurs when, for instance, add doc permission is required # for an item with document attached but the item is not a document. # own permission is irrelevant: return False return False def is_own(self, user, alt_query_own=None): """ Check if the current object is owned by the user """ print("ishtar_common/utils.py - 377 - DELETE") raise IshtarUser = apps.get_model("ishtar_common", "IshtarUser") if isinstance(user, IshtarUser): ishtaruser = user elif hasattr(user, "ishtaruser"): ishtaruser = user.ishtaruser else: return False if not alt_query_own: query = self.get_query_owns(ishtaruser) else: query = getattr(self, alt_query_own)(ishtaruser) if not query: return False query &= Q(pk=self.pk) return self.__class__.objects.filter(query).exists() @classmethod def has_item_of(cls, user): """ Check if the user own some items """ print("ishtar_common/utils.py - 392 - DELETE") IshtarUser = apps.get_model("ishtar_common", "IshtarUser") if isinstance(user, IshtarUser): ishtaruser = user elif hasattr(user, "ishtaruser"): ishtaruser = user.ishtaruser else: return False query = cls.get_query_owns(ishtaruser) if not query: return False return cls.objects.filter(query).count() @classmethod def _return_get_owns( cls, owns, values, get_short_menu_class, label_key="cached_label" ): if not owns: return [] sorted_values = [] if hasattr(cls, "BASKET_MODEL"): owns_len = len(owns) for idx, item in enumerate(reversed(owns)): if get_short_menu_class: item = item[0] if type(item) == cls.BASKET_MODEL: basket = owns.pop(owns_len - idx - 1) sorted_values.append(basket) sorted_values = list(reversed(sorted_values)) if not values: if not get_short_menu_class: return sorted_values + list( sorted(owns, key=lambda x: getattr(x, label_key) or "") ) return sorted_values + list( sorted(owns, key=lambda x: getattr(x[0], label_key) or "") ) if not get_short_menu_class: return sorted_values + list(sorted(owns, key=lambda x: x[label_key] or "")) return sorted_values + list(sorted(owns, key=lambda x: x[0][label_key] or "")) @classmethod def get_owns( cls, user, replace_query=None, limit=None, values=None, get_short_menu_class=False, menu_filtr=None, no_auth_check=False, query=False ): """ Get Own items """ return_query = query query = None if not replace_query: replace_query = {} if hasattr(user, "is_authenticated") and not user.is_authenticated \ and not no_auth_check: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned IshtarUser = apps.get_model("ishtar_common", "IshtarUser") if isinstance(user, User): try: ishtaruser = IshtarUser.objects.get(user_ptr=user) except IshtarUser.DoesNotExist: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned elif isinstance(user, IshtarUser): ishtaruser = user else: if values: return [] return cls.objects.filter(pk__isnull=True) items = [] if hasattr(cls, "BASKET_MODEL"): items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all()) query = cls.get_query_owns(ishtaruser) if not query and not replace_query: returned = cls.objects.filter(pk__isnull=True) if values: returned = [] return returned if return_query: if query: return query return replace_query if query: q = cls.objects.filter(query) else: # replace_query q = cls.objects.filter(replace_query) if values: q = q.values(*values) if limit: items += list(q.order_by("-pk")[:limit]) else: items += list(q.order_by(*cls._meta.ordering).all()) if get_short_menu_class: if values: if "id" not in values: raise NotImplementedError( "Call of get_owns with get_short_menu_class option and" " no 'id' in values is not implemented" ) my_items = [] for i in items: if hasattr(cls, "BASKET_MODEL") and type(i) == cls.BASKET_MODEL: dct = dict([(k, getattr(i, k)) for k in values]) my_items.append( (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)) ) else: my_items.append((i, cls.get_short_menu_class(i["id"]))) items = my_items else: items = [(i, cls.get_short_menu_class(i.pk)) for i in items] return items @classmethod def _get_query_owns_dicts(cls, ishtaruser): """ List of query own dict to construct the query. Each dict is joined with an AND operator, each dict key, values are joined with OR operator """ return [] @classmethod def _construct_query_own(cls, model, prefix, dct_list): q = None for subquery_dict in dct_list: subquery = None for k in subquery_dict: subsubquery = Q(**{prefix + k: subquery_dict[k]}) # slice in multiple queries in order to prevent too complicated queries subsubquery = Q(id__in=( model.objects.filter(subsubquery).values_list("id", flat=True) ) ) if subquery: subquery |= subsubquery else: subquery = subsubquery if not subquery: continue if q: q &= subquery else: q = subquery return q def update_data(data, new_data, merge=False): """ Update a data directory taking account of key detail """ res = {} if not isinstance(data, dict) or not isinstance(new_data, dict): if new_data and not data: return new_data if not merge: if new_data: return new_data return data if new_data and new_data != data: return data + " ; " + new_data return data for k in data: if k not in new_data: res[k] = data[k] else: res[k] = update_data(data[k], new_data[k], merge=merge) for k in new_data: if k not in data: res[k] = new_data[k] return res def generate_dict_from_data_string(key: str, value: str) -> dict: """ "data__key1__key2", value -> {"key1": {"key2": value}} """ full_dct = dct = {} keys = key[len("data__"):].split("__") for k in keys[:-1]: # do not get the last key dct[k] = {} dct = dct[k] dct[keys[-1]] = value return full_dct def generate_dict_from_list(lst: list, value) -> dict: """ ("key1", "key2", "key3"), value -> {"key1": {"key2": {"key3": value}}} """ dct = {} for key in reversed(lst): if not dct: dct = {key: value} else: dct = {key: dct} return dct def move_dict_data(data, key1, key2): """ Move key1 value to key2 value in a data dict :param data: data dict (with subdicts) :param key1: key to move (with __ notation for hierarchy - begining with "data__") :param key2: target key (with __ notation for hierarchy - begining with "data__") :return: result data """ keys1 = key1.split("__") keys2 = key2.split("__") value = data for idx, key in enumerate(keys1): if not idx: if key != "data": return data continue if key not in value: return data if idx == (len(keys1) - 1): # last value = value.pop(key) # remove from data else: value = value[key] new_value = data for idx, key in enumerate(keys2): if not idx: if key != "data": return data continue if idx == (len(keys2) - 1): # last new_value[key] = value else: if key not in new_value: new_value[key] = {} new_value = new_value[key] return data def clean_empty_data(data): """ Clean empty branches of a data dict """ for key in data.keys(): if data[key] in [{}, None, ""]: data.pop(key) continue if isinstance(data[key], dict): data[key] = clean_empty_data(data[key]) return data class MultiValueDict(BaseMultiValueDict): def get(self, *args, **kwargs): v = super(MultiValueDict, self).getlist(*args, **kwargs) if callable(v): v = v() if type(v) in (list, tuple) and len(v) > 1: v = ",".join(v) elif type(v) not in (int, str): v = super(MultiValueDict, self).get(*args, **kwargs) return v def getlist(self, *args, **kwargs): lst = super(MultiValueDict, self).getlist(*args, **kwargs) if type(lst) not in (tuple, list): lst = [lst] return lst def is_downloadable(curl): """ Does the url contain a downloadable resource """ h = requests.head(curl, allow_redirects=True) header = h.headers content_type = header.get("content-type") if "text" in content_type.lower(): return False if "html" in content_type.lower(): return False return True def get_file_from_link(file_link): """ return filename and temp_file object from a web link """ try: request = requests.get(file_link, stream=True) except requests.exceptions.RequestException: raise ValueError() ntf = tempfile.NamedTemporaryFile() for block in request.iter_content(1024 * 8): if not block: break ntf.write(block) file_name = file_link.split("/")[-1] return file_name, ntf def get_current_year(): return datetime.datetime.now().year def get_cache(cls, extra_args=tuple(), app_label=None): if not app_label: app_label = cls._meta.app_label cache_key = "{}-{}-{}".format(settings.PROJECT_SLUG, app_label, cls.__name__) for arg in extra_args: if not arg: cache_key += "-0" else: if type(arg) == dict: cache_key += "-" + "_".join([str(arg[k]) for k in arg]) elif type(arg) in (list, tuple): cache_key += "-" + "_".join([str(v) for v in arg]) else: cache_key += "-" + str(arg) cache_key = slugify(cache_key) if not cache_key.endswith("_current_keys") and hasattr( cls, "_add_cache_key_to_refresh" ): cls._add_cache_key_to_refresh(extra_args) if len(cache_key) >= 250: # nosec: used for a cache key no consequence if predictable m = hashlib.md5() # nosec m.update(cache_key.encode("utf-8")) cache_key = m.hexdigest() return cache_key, cache.get(cache_key) def force_cached_label_changed(sender, **kwargs): if not kwargs.get("instance"): return kwargs["instance"]._cached_label_checked = False cached_label_changed(sender, **kwargs) class SecretaryRenderer(MainSecretaryRenderer): def _pack_document(self, files): """ Overload _pack_document: obsolete files can be referenced - continue on null content for files """ self.log.debug("packing document") zip_file = io.BytesIO() zipdoc = zipfile.ZipFile(zip_file, "a") for fname, content in files.items(): if isinstance(content, UndefinedSilently): continue if sys.version_info >= (2, 7): zipdoc.writestr(fname, content, zipfile.ZIP_DEFLATED) else: zipdoc.writestr(fname, content) self.log.debug("Document packing completed") return zip_file def serialize_args_for_tasks(sender, instance, kwargs, extra_kwargs=None): if "instance" in kwargs: kwargs["instance"] = kwargs["instance"].pk sender = (sender._meta.app_label, sender._meta.object_name) if extra_kwargs: for kw in extra_kwargs: if getattr(instance, kw, None): kwargs[kw] = getattr(instance, kw) for k in list(kwargs.keys()): if k in ["model", "signal", "_cached_labels_bulk_update"] or kwargs[k] is None: kwargs.pop(k) continue if isinstance(kwargs[k], set): kwargs[k] = list(kwargs[k]) return sender, kwargs def deserialize_args_for_tasks(sender, kwargs, extra_kwargs=None): if "instance" not in kwargs: return sender, None if not isinstance(sender, (tuple, list)): # not task return sender, kwargs["instance"] sender = apps.get_model(*sender) instance = None retried = 0 # object can be in cache of another thread but not yet commited # waiting for it while not instance and retried < 6: if retried: time.sleep(0.5) if sender.objects.filter(pk=kwargs["instance"]).count(): instance = sender.objects.get(pk=kwargs["instance"]) else: retried += 1 if not instance: return sender, None # object is not here anymore if extra_kwargs: for kw in extra_kwargs: if kw in kwargs: setattr(instance, kw, kwargs[kw]) return sender, instance def get_ishtaruser_gdpr_log(view_name, request, data_type, queryset, slice_query=None): if not settings.GDPR_LOGGING: return Person = apps.get_model("ishtar_common", "Person") queryset = Person.objects.filter( ishtaruser__pk__in=queryset.values_list("pk", flat=True) ) return get_person_gdpr_log(view_name, request, data_type, queryset, slice_query) def get_person_gdpr_log(view_name, request, data_type, queryset, slice_query=None): if not settings.GDPR_LOGGING: return if view_name == "get_item": activity = "DE" if data_type == "csv" else "DC" elif view_name == "show_item": activity = "PV" if not data_type else "PE" elif view_name in ("new_qa_item", "new_item"): activity = "PC" elif view_name in ("modify_qa_item", "modify_item"): activity = "PM" elif view_name == "delete_item": activity = "PD" elif view_name == "merge_person": activity = "Pm" elif view_name == "admin_person_consultation": activity = "AC" elif view_name == "admin_person_view": activity = "AV" elif view_name == "admin_person_modify": activity = "AM" elif view_name == "admin_person_delete": activity = "AD" else: return GDPRLog = apps.get_model("ishtar_common", "GDPRLog") GDPRLog.create_log(request, activity, queryset, slice_query) EXTRA_KWARGS_TRIGGER = [ "_cascade_change", "_cached_labels_bulk_update", "skip_history_when_saving", "_post_saved_geo", "_search_updated", "_cached_label_checked", "_timestamp", ] def cached_label_and_geo_changed(sender, **kwargs): instance = kwargs["instance"] if getattr(instance, "_no_post_save", False): return cached_label_changed(sender=sender, **kwargs) post_save_geo(sender=sender, **kwargs) def revoke_old_task(kwargs, action_name, task_id, instance_cls): kwargs["action"] = action_name key, old_task_id = get_cache( instance_cls, tuple(f"{k}:{v}" for k, v in kwargs.items()) ) if old_task_id: try: celery_app.control.revoke(old_task_id) except ConnectionResetError: # task already revoked or done pass cache.set(key, task_id, settings.CACHE_TIMEOUT * 4) def load_task(task_func, task_name, checks, sender, queue=None, **kwargs): if not queue: queue = settings.CELERY_DEFAULT_QUEUE instance = kwargs.get("instance", None) if not instance: return if hasattr(instance, "test_obj"): instance.test_obj.reached(sender, **kwargs) if checks: for check in checks: if getattr(instance, check, None): return if ( not settings.USE_BACKGROUND_TASK or not instance.pk or not sender.objects.filter(pk=instance.pk).count() ): # no background task or not yet fully saved return task_func(sender, **kwargs) if getattr(instance, "_cascade_change", False): kwargs["cascade_change"] = True sender, kwargs = serialize_args_for_tasks( sender, instance, kwargs, EXTRA_KWARGS_TRIGGER ) kwargs["queue"] = queue task_item = task_func.apply_async([sender], kwargs, queue=queue) revoke_old_task(kwargs, task_name, task_item.id, instance.__class__) return task_item def cached_label_changed(sender, **kwargs): if "instance" not in kwargs: return instance = kwargs["instance"] if not instance: return if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label") and ( instance.timestamp_label or 0) >= (instance._timestamp or 0): return queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) if hasattr(instance, "external_id") and hasattr(instance, "auto_external_id") \ and hasattr(instance, "SLUG") and not getattr(instance, "_external_id_checked", None): load_task(_external_id_changed, "external_id_changed", None, sender, queue=queue, **kwargs) force_update = kwargs.get("force_update", False) if getattr(instance, "need_update", False): force_update = True if not force_update and getattr(instance, "_cached_label_checked", False): return return load_task(_cached_label_changed, "cached_label_changed", None, sender, queue=queue, **kwargs) @task() def _cached_label_changed(sender, **kwargs): sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER) if not instance: return force_update = kwargs.get("force_update", False) if hasattr(instance, "need_update") and instance.need_update: force_update = True instance.skip_history_when_saving = True if not force_update and getattr(instance, "_cached_label_checked", False): return if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_label"): if (instance.timestamp_label or 0) >= (instance._timestamp or 0): return instance.__class__.objects.filter(pk=instance.pk).update(timestamp_label=instance._timestamp) instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._cached_label_changed - {instance.__class__.__name__} - {instance.pk} - {instance}") if hasattr(instance, "refresh_cache"): instance.refresh_cache() instance._cached_label_checked = True cached_labels = [] if hasattr(instance, "CACHED_LABELS"): cached_labels = instance.CACHED_LABELS if hasattr(instance, "cached_label") and "cached_label" not in cached_labels: cached_labels.append("cached_label") changed = [] for cached_label in cached_labels: gen_func = "_generate_" + cached_label if not hasattr(instance, gen_func): continue lbl = getattr(instance, gen_func)() if lbl != getattr(instance, cached_label): changed.append((cached_label, lbl)) setattr(instance, cached_label, lbl) # update for cache if hasattr(instance, "need_update") and instance.need_update: changed.append(("need_update", False)) instance.need_update = False if changed: instance._search_updated = False if hasattr(instance, "_cascade_change") and instance._cascade_change: instance.skip_history_when_saving = True instance.__class__.objects.filter(pk=instance.pk).update(**dict(changed)) if ((getattr(instance, "check_cascade_update", False) and instance.check_cascade_update()) or changed or not cached_labels) and hasattr(instance, "cascade_update"): instance.cascade_update() updated = False if force_update or hasattr(instance, "update_search_vector"): updated = instance.update_search_vector() if not updated and hasattr(instance, "_get_associated_cached_labels"): for item in instance._get_associated_cached_labels(): item._cascade_change = True if hasattr(instance, "test_obj"): item.test_obj = instance.test_obj if instance.timestamp_label: item._timestamp = instance.timestamp_label cached_label_changed(item.__class__, instance=item) cache_key, __ = get_cache(sender, ["cached_label_changed", instance.pk]) cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) if cached_labels: return getattr(instance, cached_labels[0], "") return "" def regenerate_all_cached_labels(model): """ When the rule for generating cached label change. Regeneration of all label has to be done. :param model: model class concerned """ for item in model.all(): item.skip_history_when_saving = True cached_label_changed(model, instance=item) def external_id_changed(sender, **kwargs): if "instance" not in kwargs: return instance = kwargs["instance"] if not instance or not hasattr(instance, "external_id") \ or not hasattr(instance, "auto_external_id") \ or not hasattr(instance, "SLUG"): return if getattr(instance, "_external_id_checked", None): return queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_external_id_changed, "external_id_changed", ["_external_id_changed"], sender, queue=queue, **kwargs) @task() def _external_id_changed(sender, **kwargs): sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER) if not instance or not hasattr(instance, "external_id") \ or not hasattr(instance, "auto_external_id") \ or not hasattr(instance, "SLUG"): return if getattr(instance, "_external_id_checked", None): return instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) updated = {} if not instance.external_id or instance.auto_external_id: if hasattr(instance, "update_external_id"): external_id = instance.update_external_id(save=False, no_set=True) else: external_id = get_generated_id(instance.SLUG + "_external_id", instance) if external_id and external_id != instance.external_id: updated = {"auto_external_id": True, "external_id": external_id} instance.auto_external_id = True instance.external_id = external_id if hasattr(instance, "regenerate_all_ids"): updated.update(instance.regenerate_all_ids(save=False)) instance._external_id_checked = True if updated: if instance.pk: instance.__class__.objects.filter(pk=instance.pk).update(**updated) else: if settings.USE_BACKGROUND_TASK and hasattr(instance, "no_post_process"): instance.no_post_process() else: instance._no_move = True instance.skip_history_when_saving = True instance.save() return True def shortify(lbl, number=20): SHORTIFY_STR = ugettext(" (...)") if not lbl: lbl = "" if len(lbl) <= number: return lbl return lbl[: number - len(SHORTIFY_STR)] + SHORTIFY_STR def mode(array): most = max(list(map(array.count, array))) return list(set(filter(lambda x: array.count(x) == most, array))) def disable_for_loaddata(signal_handler): """ Decorator that turns off signal handlers when loading fixture data. """ @wraps(signal_handler) def wrapper(*args, **kwargs): if kwargs.get("raw"): return signal_handler(*args, **kwargs) return wrapper def _get_image_link(doc): from ishtar_common.models import IshtarSiteProfile # manage missing images if not doc.thumbnail or not doc.thumbnail.url or not doc.image or not doc.image.url: return "" item = None for related_key in doc.__class__.RELATED_MODELS: q = getattr(doc, related_key) if q.count(): item = q.all()[0] break if not item: # image attached to nothing... return "" item_class_name = str(item.__class__._meta.verbose_name) if item.__class__.__name__ == "ArchaeologicalSite": item_class_name = str(IshtarSiteProfile.get_default_site_label()) if not getattr(item, "SHOW_URL", ""): return "" return mark_safe( """

{}

{}

{}
""".format( doc.image.url, doc.thumbnail.url, item_class_name, str(item), reverse(item.SHOW_URL, args=[item.pk, ""]), str(_("Information")), str(_("Load another random image?")), ) ) def get_random_item_image_link(request): from ishtar_common.models import Document if not hasattr(request.user, "ishtaruser"): return "" ishtar_user = request.user.ishtaruser if not ishtar_user.has_permission("ishtar_common.view_document"): return "" q = ( Document.objects.filter(thumbnail__isnull=False, image__isnull=False) .exclude(thumbnail="") .exclude(image="") ) total = q.count() if not total: return "" # nosec: not used for security/cryptographic purposes image_nb = random.randint(0, total - 1) # nosec return _get_image_link(q.all()[image_nb]) class BSMessage: def __init__(self, message, message_type="info", icon=None, no_dismiss=False): self.message = message self.type = message_type self.icon = icon self.no_dismiss = no_dismiss def convert_coordinates_to_point(x, y, z=None, srid=4326): if z: geom = GEOSGeometry("POINT({} {} {})".format(x, y, z), srid=srid) else: geom = GEOSGeometry("POINT({} {})".format(x, y), srid=srid) if not geom.valid: raise forms.ValidationError(geom.valid_reason) return geom def get_srid_obj_from_point(point): from ishtar_common.models import SpatialReferenceSystem try: return SpatialReferenceSystem.objects.get(srid=int(point.srid)) except SpatialReferenceSystem.DoesNotExist: return SpatialReferenceSystem.objects.create( srid=int(point.srid), auth_name="EPSG", label="EPSG-{}".format(point.srid), txt_idx="epsg-{}".format(point.srid), ) def post_save_geodata(sender, **kwargs): instance = kwargs["instance"] if not instance: return queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geodata, "post_save_geodata", ["_no_geo_check"], sender, queue=queue, **kwargs) @task() def _post_save_geodata(sender, **kwargs): """ Save cached_x, cached_y, cached_z using display srid """ sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER) if not instance: return if getattr(instance, "_post_saved_geo", False): return instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) modified = False if getattr(instance, "post_save_geo", False): # TODO: geovectordata -> no post_save_geo: delete? modified = instance.post_save_geo(save=False) logger.debug(f"[ishtar] ishtar_common.utils._post_save_geodata - {instance.__class__.__name__} - {instance.pk} - {instance}") # managed cached coordinates cached_x, cached_y, cached_z = None, None, None coords = instance.display_coordinates(rounded=False, dim=3, srid=4326, cache=False) if coords and coords != [None, None, None]: cached_x, cached_y, cached_z = coords else: coords = instance.display_coordinates(rounded=False, dim=2, srid=4326, cache=False) if coords: cached_x, cached_y = coords if instance.cached_x != cached_x or instance.cached_y != cached_y \ or instance.cached_z != cached_z: modified = True instance.cached_x = cached_x instance.cached_y = cached_y instance.cached_z = cached_z if hasattr(instance, "need_update") and instance.need_update: instance.need_update = False modified = True if modified: instance._post_saved_geo = True instance._no_move = True instance.skip_history_when_saving = True instance.save() cache_key, __ = get_cache(sender, ("post_save_geodata", instance.pk)) cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) return def post_save_geo(sender, **kwargs): """ Convert raw x, y, z point to real geo field """ if "instance" not in kwargs: return instance = kwargs["instance"] if not instance: return if getattr(instance, "_post_saved_geo", False): return queue = getattr(instance, "_queue", settings.CELERY_DEFAULT_QUEUE) return load_task(_post_save_geo, "post_save_geo", ["_no_geo_check"], sender, queue=queue, **kwargs) @task() def _post_save_geo(sender, **kwargs): """ Convert raw x, y, z point to real geo field """ profile = get_current_profile() if not profile.mapping: return sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER) if not instance: return if getattr(instance, "_post_saved_geo", False): return if hasattr(instance, "_timestamp") and hasattr(instance, "timestamp_geo"): if (instance.timestamp_label or 0) >= (instance._timestamp or 0): return instance.__class__.objects.filter(pk=instance.pk).update(timestamp_geo=instance._timestamp) instance._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) logger.debug(f"[ishtar] ishtar_common.utils._post_save_geo - {instance.__class__.__name__} - {instance.pk} - {instance}") instance._post_saved_geo = True modified = False if getattr(instance, "post_save_geo", False): modified = instance.post_save_geo(save=False) if hasattr(instance, "need_update") and instance.need_update: instance.need_update = False modified = True if modified: if hasattr(instance, "no_post_process"): instance.no_post_process() else: instance.skip_history_when_saving = True instance._post_saved_geo = True instance._cached_label_checked = False instance.save() if hasattr(instance, "cascade_update"): instance.cascade_update() cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk]) cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) return def np_format_int_float(values): """ Numpy array: format integer with no "." """ new_values = [] for value in values: if numpy.isnan(value): new_values.append("") elif int(value) == value: new_values.append(str(int(value))) else: new_values.append(value) return new_values def max_value_current_year(value): return MaxValueValidator(datetime.date.today().year)(value) def create_slug(model, name, slug_attr="slug", max_length=100, pk=None): base_slug = slugify(name) slug = base_slug[:max_length] final_slug = None idx = 1 while not final_slug: q = model.objects.filter(**{slug_attr: slug}) if pk: q = q.exclude(pk=pk) if slug and not q.exists(): final_slug = slug break slug = base_slug[: (max_length - 1 - len(str(idx)))] + "-" + str(idx) idx += 1 return final_slug def _get_parse_string(trunc_number=None): def parse_string(value): value = value.strip() if value == "#EMPTY": value = "" value = value.replace(" ", " ") if trunc_number: value = value[:trunc_number] return value return parse_string parse_string = _get_parse_string() def parse_insee(value): value = parse_string(value) values = [] while len(value) > 4: values.append(value[:5]) value = value[5:] towns = [] Town = apps.get_model("ishtar_common", "Town") for value in values: try: town = Town.objects.get(numero_insee=value) except Town.DoesNotExist: # sys.stderr.write('Numero INSEE : %s non existant en base' # % value) continue towns.append(town) return towns PARCEL_YEAR_REGEXP = re.compile(r"^([0-9]{4})[ :]+") PARCEL_SECTION_REGEXP = re.compile( r"(?: )*(?:[Ss]ection(?:s)?)?(?: )*([A-Z][A-Z0-9]{0,3})[ :]*" r"((?:(?: |;|,|[Pp]arcelle(?:s)?|n°|et|à|to)*[0-9]+[p]?)+)" ) PARCEL_NB_RANGE_REGEXP = re.compile(r"([0-9]+[p]?) (?:à|to) ([0-9]+[p]?)") PARCEL_NB_REGEXP = re.compile(r"(?: |;|,|[Pp]arcelle(?:s)?|n°|et|à|to)*([0-9]+[p]?)") def parse_parcels(parcel_str, insee_code=None, owner=None): parcels, town = [], None if insee_code: town = parse_insee(insee_code) # manage only one town at a time if len(town) >= 2 or not town: return parcels town = town[0] parcel_str = parcel_str.strip().replace( "\ufe50", ",").replace("\uff0c", ",").replace("\n", " ") parcel_str = re.sub(r'\s+', ' ', parcel_str) parcel_str = parcel_str.replace("à", "_aaaa_").replace("n°", "_nnnn_") parcel_str = parcel_str.encode("ascii", "ignore").decode("utf-8") parcel_str = parcel_str.replace("_aaaa_", "à").replace("_nnnn_", "n°") m = PARCEL_YEAR_REGEXP.match(parcel_str) year = None if m: year = m.groups()[0] parcel_str = parcel_str[m.span()[1]:] for parcel in PARCEL_SECTION_REGEXP.findall(parcel_str): sector, nums = parcel[0], parcel[1] for num in PARCEL_NB_REGEXP.findall(nums): if len(str(num)) > 6: continue dct = {"year": year, "section": sector, "parcel_number": num} if town: dct["town"] = town if owner: dct["history_modifier"] = owner parcels.append(dct) for parcel_ranges in PARCEL_NB_RANGE_REGEXP.findall(nums): lower_range, higher_range = parcel_ranges try: # the lower range itself has been already kept lower_range = int(lower_range) + 1 higher_range = int(higher_range) except ValueError: continue for num in range(lower_range, higher_range): dct = {"year": year, "section": sector, "parcel_number": str(num)} if town: dct["town"] = town if owner: dct["history_modifier"] = owner parcels.append(dct) return parcels def get_all_field_names(model): return list( set( chain.from_iterable( (field.name, field.attname) if hasattr(field, "attname") else (field.name,) for field in model._meta.get_fields() if not (field.many_to_one and field.related_model is None) ) ) ) def get_all_related_m2m_objects_with_model(model, related_name=None): return [ (f, f.model if f.model != model else None) for f in model._meta.get_fields(include_hidden=True) if f.many_to_many and f.auto_created and (not related_name or f.related_name.startswith(related_name)) ] def get_all_related_many_to_many_objects(model): return [ f for f in model._meta.get_fields(include_hidden=True) if f.many_to_many and f.auto_created ] def get_all_related_objects(model): return [ f for f in model._meta.get_fields() if (f.one_to_many or f.one_to_one) and f.auto_created and not f.concrete ] def num2col(n): strng = "" while n > 0: n, remainder = divmod(n - 1, 26) strng = chr(65 + remainder) + strng return strng RE_TSVECTOR = re.compile(r"('[^']+':\d+(?:,\d+)*)") def merge_tsvectors(vectors): """ Parse tsvector to merge them in one string :param vectors: list of tsvector string :return: merged tsvector """ result_dict = {} for vector in vectors: if not vector: continue current_position = 0 if result_dict: for key in result_dict: max_position = max(result_dict[key]) if max_position > current_position: current_position = max_position for dct_member in RE_TSVECTOR.findall(vector): splitted = dct_member.split(":") key = ":".join(splitted[:-1]) key = key[1:-1] # remove quotes result_dict[key] = [1] """ # position is not used today - simplify positions = splitted[-1] positions = [int(pos) + current_position for pos in positions.split(",")] if key in result_dict: result_dict[key] += positions else: result_dict[key] = positions """ # {'lamelie': [1, 42, 5]} => {'lamelie': "1,42,5"} result_dict = { k: ",".join([str(val) for val in result_dict[k]]) for k in result_dict } # {'lamelie': "1,5", "hagarde": "2", "regarde": "4"} => # "'lamelie':1,5 'hagarde':2 'regarde':4" result = " ".join(["'{}':{}".format(k, result_dict[k]) for k in result_dict]) return result def put_session_message(session_key, message, message_type): session = SessionStore(session_key=session_key) messages = [] if "messages" in session: messages = session["messages"][:] messages.append((str(message), message_type)) session["messages"] = messages session.save() def put_session_var(session_key, key, value): session = SessionStore(session_key=session_key) session[key] = value session.save() def get_session_var(session_key, key): session = SessionStore(session_key=session_key) if key not in session: return return session[key] def clean_session_cache(session): # clean session cache cache_key_list = "sessionlist-{}".format(session.session_key) key_list = cache.get(cache_key_list, []) for key in key_list: cache.set(key, None, settings.CACHE_TIMEOUT) cache.set(cache_key_list, [], settings.CACHE_TIMEOUT) def sanitize_filepath(filepath): #TODO: python3-pathvalidate # from pathvalidate import sanitize_filepath # filepath -> sanitize_filepath(filepath) keep_characters = (" ", ".", "_", "-", "/") return "".join(c for c in filepath if c.isalnum() or c in keep_characters).rstrip() def get_field_labels_from_path(model, path): """ :param model: base model :param path: list of attribute starting from the base model :return: list of labels """ labels = [] for key in path: try: field = model._meta.get_field(key) except: labels.append(key) continue if hasattr(field, "verbose_name"): labels.append(field.verbose_name) else: labels.append(key) return labels def get_columns_from_class(cls, table_cols_attr="TABLE_COLS", dict_col_labels=True): """ Get table columns and table label from a model :param table_cols_attr: "TABLE_COLS" if not specified :param dict_col_labels: (default: True) if set to False return list matching with table_cols list :return: (table_cols, table_col_labels) """ col_labels = {} slug = getattr(cls, "SLUG", None) if hasattr(cls, "COL_LABELS"): col_labels = cls.COL_LABELS if slug in settings.COL_LABELS: col_labels.update(settings.COL_LABELS[slug]) tb_key = (slug, table_cols_attr) if tb_key in settings.TABLE_COLS: table_cols = settings.TABLE_COLS[tb_key] else: table_cols = getattr(cls, table_cols_attr) if callable(table_cols): table_cols = table_cols() if dict_col_labels: return table_cols, col_labels table_cols_label = [] for c in table_cols: if c in col_labels: table_cols_label.append(str(col_labels[c])) else: field_verbose_name, field_name = "", "" field = cls keys = c.split("__") if "." in c: keys = c.split(".") for key in keys: if hasattr(field, "remote_field") and field.remote_field: field = field.remote_field.model try: field = field._meta.get_field(key) field_verbose_name = field.verbose_name except (field.FieldDoesNotExist, AttributeError): if hasattr(field, key + "_lbl"): field_verbose_name = getattr(field, key + "_lbl") else: continue table_cols_label.append(str(field_verbose_name)) return table_cols, table_cols_label def create_default_areas(models=None, verbose=False): # can be used on migrations if models are provided if not models: from ishtar_common.models import Area, Town, Department, State else: Area = models["area"] Town = models["town"] Department = models["department"] State = models["state"] areas = {} created = 0 q = State.objects total = q.count() for idx, state in enumerate(q.all()): if verbose: sys.stdout.write(f"State \t{idx + 1}/{total}\r") slug = "state-" + state.number area, create = Area.objects.get_or_create( txt_idx=slug, defaults={"label": state.label} ) areas[f"state-{state.pk}"] = area if create: created += 1 if verbose: sys.stdout.write(f"* {created} state areas added\n") created, association, association2 = 0, 0, 0 q = Department.objects total = q.count() for idx, dep in enumerate(q.all()): if verbose: sys.stdout.write(f"Department \t{idx + 1}/{total}\r") slug = f"dep-{dep.number}" area, create = Area.objects.get_or_create( txt_idx=slug, defaults={"label": dep.label} ) areas["dep-" + dep.number] = area if create: created += 1 if not dep.state_id: continue state_slug = "state-{}".format(dep.state_id) if state_slug in areas and (not area.parent or area.parent.pk != areas[state_slug].pk): association += 1 area.parent = areas[state_slug] area.save() area.towns.clear() q2 = Town.objects.annotate(insee_len=Length('numero_insee')).filter( numero_insee__startswith=dep.number, insee_len=5) area.towns.add(*list(q2.all())) if verbose: sys.stdout.write( f"* {created} department areas added with {association} associations to state\n" ) sys.stdout.write(f"* {association} town associated to department area") def get_relations_for_graph( rel_model, obj_pk, above_relations=None, equal_relations=None, treated=None, styles=None, render_above=True, render_below=True, full=False, ): """ Get all above and equal relations of an object (get all child and parent relations) :param rel_model: the relation model concerned :param obj_pk: id of an object with relations :param above_relations: list of current above_relations :param equal_relations: list of current equal_relations :param treated: treated relation list to prevent circular call :param styles: current styles :param render_above: render relation above the current object :param render_below: render relation below the current object :param full: render the full graph :return: above and equal relations list (each containing lists of two members) """ if not above_relations: above_relations = [] if not equal_relations: equal_relations = [] if not treated: treated = [] if not styles: styles = {} if obj_pk in treated: return above_relations, equal_relations, styles treated.append(obj_pk) for q, inverse in ( ( rel_model.objects.filter( left_record_id=obj_pk, relation_type__logical_relation__isnull=False ), False, ), ( rel_model.objects.filter( right_record_id=obj_pk, relation_type__logical_relation__isnull=False ), True, ), ): q = q.values( "left_record_id", "right_record_id", "relation_type__logical_relation" ) get_above, get_below = render_above, render_below if inverse and (not render_above or not render_below): get_above, get_below = not render_above, not render_below for relation in q.all(): logical_relation = relation["relation_type__logical_relation"] left_record = relation["left_record_id"] right_record = relation["right_record_id"] is_above, is_below = False, False if not logical_relation: continue elif ( get_below and logical_relation == "above" and (left_record, right_record) not in above_relations ): above_relations.append((left_record, right_record)) is_below = True elif ( get_above and logical_relation == "below" and (right_record, left_record) not in above_relations ): above_relations.append((right_record, left_record)) is_above = True elif ( logical_relation == "equal" and (right_record, left_record) not in equal_relations and (left_record, right_record) not in equal_relations ): equal_relations.append((left_record, right_record)) else: continue if right_record == obj_pk: other_record = left_record else: other_record = right_record if get_above and get_below and not full and (is_below or is_above): if (is_above and not inverse) or (is_below and inverse): ar, er, substyles = get_relations_for_graph( rel_model, other_record, above_relations, equal_relations, treated, styles, render_above=True, render_below=False, ) else: ar, er, substyles = get_relations_for_graph( rel_model, other_record, above_relations, equal_relations, treated, styles, render_above=False, render_below=True, ) else: ar, er, substyles = get_relations_for_graph( rel_model, other_record, above_relations, equal_relations, treated, styles, render_above=render_above, render_below=render_below, full=full, ) styles.update(substyles) error_style = "color=red" for r in ar: if r not in above_relations: above_relations.append(r) reverse_rel = tuple(reversed(r)) if reverse_rel in above_relations: # circular if r not in styles: styles[r] = [] if reverse_rel not in styles: styles[reverse_rel] = [] if error_style not in styles[r]: styles[r].append(error_style) if error_style not in styles[reverse_rel]: styles[reverse_rel].append(error_style) if r[0] == r[1]: # same entity if r not in styles: styles[r] = [] if error_style not in styles[r]: styles[r].append("color=red") for r in er: if r not in equal_relations: equal_relations.append(r) return above_relations, equal_relations, styles def generate_relation_graph( obj, highlight_current=True, render_above=True, render_below=True, full=False, debug=False, ): if not settings.DOT_BINARY: return model = obj.__class__ rel_model = model._meta.get_field("right_relations").related_model # get relations above_relations, equal_relations, styles = get_relations_for_graph( rel_model, obj.pk, render_above=render_above, render_below=render_below, full=full, ) # generate dotfile dot_str = "digraph relations {\nnode [shape=box];\n" rel_str = "" described = [] if not above_relations and not equal_relations: rel_str += "subgraph NoDir {\nedge [dir=none,style=dashed];\n" style = 'label="{}"'.format(obj.relation_label) if highlight_current: style += ',style=filled,fillcolor="#C6C0C0"' dot_str += 'item{}[{},href="{}"];\n'.format( obj.pk, style, reverse("display-item", args=[model.SLUG, obj.pk]) ) rel_str += "}\n" for list, directed in ((above_relations, True), (equal_relations, False)): if directed: rel_str += "subgraph Dir {\n" else: rel_str += "subgraph NoDir {\nedge [dir=none,style=dashed];\n" for left_pk, right_pk in list: if left_pk not in described: described.append(left_pk) left = model.objects.get(pk=left_pk) style = 'label="{}"'.format(left.relation_label) if left.pk == obj.pk and highlight_current: style += ',style=filled,fillcolor="#C6C0C0"' dot_str += 'item{}[{},href="{}"];\n'.format( left.pk, style, reverse("display-item", args=[model.SLUG, left.pk]) ) if right_pk not in described: described.append(right_pk) right = model.objects.get(pk=right_pk) style = 'label="{}"'.format(right.relation_label) if right.pk == obj.pk and highlight_current: style += ',style=filled,fillcolor="#C6C0C0"' dot_str += 'item{}[{},href="{}"];\n'.format( right.pk, style, reverse("display-item", args=[model.SLUG, right.pk]), ) if not directed: # on the same level rel_str += "{{rank = same; item{}; item{};}}\n".format( left_pk, right_pk ) style = "" if (left_pk, right_pk) in styles: style = " [{}]".format(", ".join(styles[(left_pk, right_pk)])) rel_str += "item{} -> item{}{};\n".format(left_pk, right_pk, style) rel_str += "}\n" dot_str += rel_str + "\n}" tempdir = tempfile.mkdtemp("-ishtardot") dot_name = tempdir + os.path.sep + "relations.dot" with open(dot_name, "w") as dot_file: dot_file.write(dot_str) if not render_above: suffix = "_below" elif not render_below: suffix = "_above" else: suffix = "" if full and obj.MAIN_UP_MODEL_QUERY and getattr(obj, obj.MAIN_UP_MODEL_QUERY): obj = getattr(obj, obj.MAIN_UP_MODEL_QUERY) with open(dot_name, "r") as dot_file: django_file = File(dot_file) attr = "relation_dot" + suffix getattr(obj, attr).save("relations.dot", django_file, save=True) # execute dot program args = (settings.DOT_BINARY, "-Tsvg", dot_name) svg_tmp_name = tempdir + os.path.sep + "relations.svg" with open(svg_tmp_name, "w") as svg_file: # nosec: no user input popen = subprocess.Popen(args, stdout=svg_file) # nosec popen.wait() # scale image if necessary with open(svg_tmp_name, "r") as svg_file: doc = xmltodict.parse(svg_file.read()) width = doc["svg"]["@width"] if width.endswith("pt"): width = float(width[:-2]) if width > 600: doc["svg"].pop("@height") doc["svg"].pop("@width") doc["svg"]["@preserveAspectRatio"] = "xMinYMin meet" with open(svg_tmp_name, "w") as svg_file: svg_file.write(xmltodict.unparse(doc)) with open(svg_tmp_name, "r") as svg_file: django_file = File(svg_file) attr = "relation_image" + suffix getattr(obj, attr).save("relations.svg", django_file, save=True) png_name = tempdir + os.path.sep + "relations.png" with open(png_name, "wb") as png_file: svg2png(open(svg_tmp_name, "rb").read(), write_to=png_file) with open(png_name, "rb") as png_file: django_file = File(png_file) attr = "relation_bitmap_image" + suffix getattr(obj, attr).save("relations.png", django_file, save=True) if debug: print("DOT file: {}. Tmp SVG file: {}.".format(dot_name, svg_tmp_name)) return shutil.rmtree(tempdir) def create_default_json_fields(model): """ Create default json field configuration in existing database :param model: model concerned """ from ishtar_common.models import JsonDataField def _get_keys(data, current_path=""): keys = [] for key in data.keys(): if type(data[key]) == dict: keys += _get_keys(data[key], current_path + key + "__") continue keys.append(current_path + key) return keys keys = [] for item in model.objects.all(): for key in _get_keys(item.data): if key not in keys: keys.append(key) content_type = ContentType.objects.get_for_model(model) for key in keys: JsonDataField.objects.get_or_create( content_type=content_type, key=key, defaults={ "name": " ".join(key.split("__")).capitalize(), "value_type": "T", "display": False, }, ) def get_urls_for_model( model, views, own=False, autocomplete=False, ): """ Generate get and show url for a model """ app_label = model._meta.app_label model_name = model._meta.model_name urls = [ url( r"show-{}(?:/(?P.+))?/(?P.+)?$".format(model.SLUG), check_permissions( [f"{app_label}.view_{model_name}", f"{app_label}.view_own_{model_name}"])( getattr(views, "show_" + model.SLUG) ), name="show-" + model.SLUG, ), url( r"^display-{}/(?P.+)/$".format(model.SLUG), check_permissions( [f"{app_label}.view_{model_name}", f"{app_label}.view_own_{model_name}"])( getattr(views, "display_" + model.SLUG) ), name="display-" + model.SLUG, ), ] if own: urls += [ url( r"get-{}/own/(?P.+)?$".format(model.SLUG), check_permissions( [f"{app_label}.view_{model_name}", f"{app_label}.view_own_{model_name}"])( getattr(views, "get_" + model.SLUG) ), name="get-own-" + model.SLUG, kwargs={"force_own": True}, ), ] urls += [ url( r"get-{}/(?P.+)?$".format(model.SLUG), check_permissions( [f"{app_label}.view_{model_name}", f"{app_label}.view_own_{model_name}"])( getattr(views, "get_" + model.SLUG) ), name="get-" + model.SLUG, ), ] if autocomplete: urls += [ url( r"autocomplete-{}/$".format(model.SLUG), check_permissions( [f"{app_label}.view_{model_name}", f"{app_label}.view_own_{model_name}"])( getattr(views, "autocomplete_" + model.SLUG) ), name="autocomplete-" + model.SLUG, ), ] return urls def m2m_historization_changed(sender, **kwargs): obj = kwargs.get("instance", None) if not obj: return obj._queue = kwargs.get("queue", settings.CELERY_DEFAULT_QUEUE) hist_values = obj.history_m2m or {} for attr in obj.HISTORICAL_M2M: values = [] for value in getattr(obj, attr).all(): if not hasattr(value, "history_compress"): continue values.append(value.history_compress()) hist_values[attr] = values obj.history_m2m = hist_values if getattr(obj, "skip_history_when_saving", False): # assume the last modifier is good... q = obj.history.filter( history_modifier_id=obj.history_modifier_id, ).order_by("-history_date", "-history_id") if q.count(): hist = q.all()[0] hist.history_m2m = hist_values hist.history_date = hist.last_modified = datetime.datetime.now() hist.save() obj.skip_history_when_saving = True elif not obj.history_modifier: obj.skip_history_when_saving = True obj.save() def max_size_help(help_for_doc=False): max_size = settings.MAX_UPLOAD_SIZE if help_for_doc: max_size = '`MAX_UPLOAD_SIZE`' msg = str(_("The maximum supported file size is {} Mo.")).format(max_size) return msg def find_all_symlink(dirname): for name in os.listdir(dirname): if name not in (os.curdir, os.pardir): full = os.path.join(dirname, name) if os.path.islink(full): yield full, os.readlink(full) def get_model_by_slug(slug): all_models = apps.get_models() for model in all_models: if hasattr(model, "SLUG") and model.SLUG == slug: return model return MEDIA_RE = [ re.compile(r"_[a-zA-Z0-9]{7}\-[0-9]"), re.compile(r"_[a-zA-Z0-9]{7}"), ] def get_broken_links(path): for root, dirs, files in os.walk(path): for filename in files: path = os.path.join(root, filename) if os.path.islink(path): target_path = os.readlink(path) # resolve relative symlinks if not os.path.isabs(target_path): target_path = os.path.join(os.path.dirname(path), target_path) if not os.path.exists(target_path): yield path def simplify_name(full_path_name, check_existing=False, min_len=15): """ Simplify a file name by removing auto save suffixes :param full_path_name: full path name :param check_existing: prevent to give name of an existing file :param min_len: minimum lenght of the base filename :return: """ name_exp = full_path_name.split(os.sep) path = os.sep.join(name_exp[0:-1]) name = name_exp[-1] current_name = name[:] ext = "" if "." in name: # remove extension if have one names = name.split(".") name = ".".join(names[0:-1]) ext = "." + names[-1] while "_" in name and len(name) > min_len: oldname = name[:] for regex in MEDIA_RE: match = None for m in regex.finditer(name): # get the last match match = m if match: new_name = name.replace(match.group(), "") full_new_name = os.sep.join([path, new_name + ext]) try: is_file = os.path.isfile(full_new_name) except UnicodeEncodeError: is_file = os.path.isfile(full_new_name.encode("utf-8")) if not check_existing or not is_file: # do not take the place of another file name = new_name[:] break if oldname == name: break return path, current_name, name + ext def rename_and_simplify_media_name(full_path_name, rename=True): """ Simplify the name if possible :param full_path_name: full path name :param rename: rename file if True (default: True) :return: new full path name (or old if not changed), modified """ try: exists = os.path.exists(full_path_name) is_file = os.path.isfile(full_path_name) except UnicodeEncodeError: full_path_name_unicode = full_path_name.encode("utf-8") exists = os.path.exists(full_path_name_unicode) is_file = os.path.isfile(full_path_name_unicode) if not exists or not is_file: return full_path_name, False path, current_name, name = simplify_name(full_path_name, check_existing=True) if current_name == name: return full_path_name, False full_new_name = os.sep.join([path, name]) if rename: os.rename(full_path_name, full_new_name) return full_new_name, True def get_file_fields(): """ Get all fields which are inherited from FileField """ all_models = apps.get_models() fields = [] for model in all_models: for field in model._meta.get_fields(): if isinstance(field, models.FileField): fields.append(field) return fields def get_used_media( exclude=None, limit=None, return_object_and_field=False, debug=False ): """ Get media which are still used in models :param exclude: exclude fields, ex: ['ishtar_common.Import.imported_file', 'ishtar_common.Import.imported_images'] :param limit: limit to some fields :param return_object_and_field: return associated object and field name :return: list of media filename or if return_object_and_field is set to True return (object, file field name, media filename) """ if return_object_and_field: media = [] else: media = set() for field in get_file_fields(): if exclude and str(field) in exclude: continue if limit and str(field) not in limit: continue is_null = {"%s__isnull" % field.name: True} is_empty = {"%s" % field.name: ""} storage = field.storage if debug: print("") q = ( field.model.objects.values("id", field.name) .exclude(**is_empty) .exclude(**is_null) ) ln = q.count() for idx, res in enumerate(q): value = res[field.name] if debug: sys.stdout.write("* get_used_media {}: {}/{}\r".format(field, idx, ln)) sys.stdout.flush() if value not in EMPTY_VALUES: if return_object_and_field: media.append( ( field.model.objects.get(pk=res["id"]), field.name, storage.path(value), ) ) else: media.add(storage.path(value)) return media def get_all_media(exclude=None, debug=False): """ Get all media from MEDIA_ROOT """ if not exclude: exclude = [] media = set() full_dirs = list(os.walk(settings.MEDIA_ROOT)) ln_full = len(full_dirs) for idx_main, full_dir in enumerate(full_dirs): root, dirs, files = full_dir ln = len(files) if debug: print("") for idx, name in enumerate(files): if debug: sys.stdout.write( "* get_all_media {} ({}/{}): {}/{}\r".format( root.encode("utf-8"), idx_main, ln_full, idx, ln ) ) sys.stdout.flush() path = os.path.abspath(os.path.join(root, name)) relpath = os.path.relpath(path, settings.MEDIA_ROOT) in_exclude = False for e in exclude: if re.match(r"^%s$" % re.escape(e).replace("\\*", ".*"), relpath): in_exclude = True break if not in_exclude: media.add(path) else: if debug: sys.stdout.write( "* get_all_media {} ({}/{})\r".format( root.encode("utf-8"), idx_main, ln_full ) ) return media def get_unused_media(exclude=None): """ Get media which are not used in models """ if not exclude: exclude = [] all_media = get_all_media(exclude) used_media = get_used_media() return [x for x in all_media if x not in used_media] def remove_unused_media(): """ Remove unused media """ remove_media(get_unused_media()) def remove_media(files): """ Delete file from media dir """ for filename in files: os.remove(os.path.join(settings.MEDIA_ROOT, filename)) def remove_empty_dirs(path=None): """ Recursively delete empty directories; return True if everything was deleted. """ if not path: path = settings.MEDIA_ROOT if not os.path.isdir(path): return False listdir = [os.path.join(path, filename) for filename in os.listdir(path)] if all(list(map(remove_empty_dirs, listdir))): os.rmdir(path) return True else: return False def _try_copy(path, f, dest, simplified_ref_name, make_copy, min_len): full_file = os.path.abspath(os.sep.join([path, f])) # must be a file if not os.path.isfile(full_file) or full_file == dest: return _, _, name = simplify_name(full_file, check_existing=False, min_len=min_len) if simplified_ref_name.lower() == name.lower(): # a candidate is found if make_copy: try: os.remove(dest) except OSError: pass try: shutil.copy2(full_file, dest) except OSError: return return f def try_fix_file(filename, make_copy=True, hard=False): """ Try to find a file with a similar name on the same dir. :param filename: filename (full path) :param make_copy: make the copy of the similar file found :param hard: search on the whole media dir :return: name of the similar file found or None """ filename = os.path.abspath(filename) path, current_name, simplified_ref_name = simplify_name( filename, check_existing=False, min_len=10 ) try: dirs = list(sorted(os.listdir(path))) except UnicodeDecodeError: dirs = list(sorted(os.listdir(path.encode("utf-8")))) # check existing files in the path for f in dirs: result = _try_copy( path, f, filename, simplified_ref_name, make_copy, min_len=10 ) if result: return result if not hard: return for path, __, files in os.walk(settings.MEDIA_ROOT): for f in files: result = _try_copy( path, f, filename, simplified_ref_name, make_copy, min_len=10 ) if result: return result def get_current_profile(force=False): IshtarSiteProfile = apps.get_model("ishtar_common", "IshtarSiteProfile") return IshtarSiteProfile.get_current_profile(force=force) PARSE_FORMULA = re.compile(r"{([^}^\\:]*)(?::[^}]*)?}") PARSE_JINJA = re.compile("{{([^}]*)}") PARSE_JINJA_IF = re.compile("{% if ([^}]*)}") @environmentfilter def _deduplicate(*args): value = args[0] if len(args) == 1 else args[1] # jinja simple filter new_values = [] for v in value.split("-"): if v not in new_values: new_values.append(v) return "-".join(new_values) def _padding(formt, args): value = args[0] if len(args) == 1 else args[1] # jinja simple filter try: value = int(value) except ValueError: return value return formt.format(value) @environmentfilter def _padding03(*args): return _padding("{:0>3}", args) @environmentfilter def _padding04(*args): return _padding("{:0>4}", args) @environmentfilter def _padding05(*args): return _padding("{:0>5}", args) @environmentfilter def _padding06(*args): return _padding( "{:0>6}", args) @environmentfilter def _padding07(*args): return _padding( "{:0>7}", args) @environmentfilter def _padding08(*args): return _padding( "{:0>8}", args) @environmentfilter def _upper(*args): value = args[0] if len(args) == 1 else args[1] # jinja simple filter return value.upper() @environmentfilter def _lower(*args): value = args[0] if len(args) == 1 else args[1] # jinja simple filter return value.lower() @environmentfilter def _capitalize(*args): value = args[0] if len(args) == 1 else args[1] # jinja simple filter return value.capitalize() @environmentfilter def _slug(*args): value = args[0] if len(args) == 1 else args[1] # jinja simple filter return slugify(value) FORMULA_FILTERS = { "upper": _upper, "lower": _lower, "capitalize": _capitalize, "slug": _slug, "deduplicate": _deduplicate, "padding03": _padding03, "padding04": _padding04, "padding05": _padding05, "padding06": _padding06, "padding07": _padding07, "padding08": _padding08, } EXTRA_JINJA_FILTERS = { "human_date": human_date_filter, "capfirst": capfirst_filter, "lowerfirst": lowerfirst_filter, "capitalize": capitalize_filter, "float_format": float_format, "euro_format": euro_format, "number_to_words": number_to_words, "replace_line_breaks": replace_line_breaks, } def _update_gen_id_dct(item, dct, initial_key, fkey=None, filters=None): if not fkey: fkey = initial_key[:] if fkey.startswith("settings__"): dct[fkey] = getattr(settings, fkey[len("settings__"):]) or "" return obj = item for k in fkey.split("__"): if isinstance(obj, dict): if k not in obj: obj = None break obj = obj[k] else: try: obj = getattr(obj, k) except (ObjectDoesNotExist, AttributeError): obj = None if hasattr(obj, "all") and hasattr(obj, "count"): # query manager if not obj.count(): break obj = obj.all()[0] elif callable(obj): obj = obj() if obj is None: break if obj is None: dct[initial_key] = "" else: dct[initial_key] = str(obj) if filters: for filtr in filters: dct[initial_key] = filtr(dct[initial_key]) def get_generated_id(key, item): profile = get_current_profile() if not hasattr(profile, key): return formula = getattr(profile, key, None) if not formula: return "" dct = {} # jinja2 style if "{{" in formula or "{%" in formula: # naive parse - only simple jinja2 is managed key_list = [] for key in PARSE_JINJA.findall(formula): key = key.strip().split("|")[0] key_list.append(key) for keys in PARSE_JINJA_IF.findall(formula): sub_key_list = keys.split(" or ") res = [] for keys2 in sub_key_list: res += keys2.split(" and ") key_list += [k.split(" ")[0] for k in res] key_list = map(lambda x: x.strip(), key_list) new_keys = [] for key in key_list: if key.startswith("not "): key = key[len("not "):].strip() key = key.split(".")[0] if " % " in key: keys = key.split(" % ")[1] keys = [ i.replace("(", "").replace(")", "").split("|")[0].strip() for i in keys.split(",") ] else: keys = [key] new_keys += keys key_list = new_keys for key in set(key_list): _update_gen_id_dct(item, dct, key) for key in FORMULA_FILTERS: if key not in FILTERS: FILTERS[key] = FORMULA_FILTERS[key] tpl = Template(formula) values = tpl.render(dct).split("||") else: for fkey in PARSE_FORMULA.findall(formula): filtered = fkey.split("|") initial_key = fkey[:] fkey = filtered[0] filters = [] for filtr in filtered[1:]: if filtr in FORMULA_FILTERS: filters.append(FORMULA_FILTERS[filtr]) _update_gen_id_dct(item, dct, initial_key, fkey, filters=filters) values = formula.format(**dct).split("||") value = values[0] for filtr in values[1:]: if filtr not in FORMULA_FILTERS: value += "||" + filtr continue value = FORMULA_FILTERS[filtr](value) return value def jinja_evaluation(formula, values): for key in FORMULA_FILTERS: if key not in FILTERS: FILTERS[key] = FORMULA_FILTERS[key] for key in EXTRA_JINJA_FILTERS: if key not in FILTERS: FILTERS[key] = EXTRA_JINJA_FILTERS[key] tpl = Template(formula) return tpl.render(values) PRIVATE_FIELDS = ("id", "history_modifier", "order", "uuid") def duplicate_item(item, user=None, data=None): model = item.__class__ new = model.objects.get(pk=item.pk) data = data or {} for field in model._meta.fields: # pk is in PRIVATE_FIELDS so: new.pk = None and a new # item will be created on save if field.name == "uuid": new.uuid = uuid.uuid4() elif field.name in PRIVATE_FIELDS: setattr(new, field.name, None) if user: new.history_user = user for k in data: setattr(new, k, data[k]) exclude_fields = getattr(model, "DUPLICATE_EXCLUDE", []) for k in exclude_fields: setattr(new, k, None) new.save() if hasattr(user, "user_ptr"): if hasattr(new, "history_creator"): new.history_creator = user.user_ptr if hasattr(new, "history_modifier"): new.history_modifier = user.user_ptr new.save() # m2m fields m2m = [ field.name for field in model._meta.many_to_many if field.name not in PRIVATE_FIELDS ] for field in m2m: for val in getattr(item, field).all(): if val not in getattr(new, field).all(): getattr(new, field).add(val) return new def get_image_path(instance, filename): # when using migrations instance is not a real ImageModel instance if not hasattr(instance, "_get_image_path"): n = datetime.datetime.now() return "upload/{}/{:02d}/{:02d}/{}".format(n.year, n.month, n.day, filename) return instance._get_image_path(filename) def human_date(value): language_code = settings.LANGUAGE_CODE.split("-") language_code = language_code[0] + "_" + language_code[1].upper() for language_suffix in (".utf8", ""): try: locale.setlocale(locale.LC_TIME, language_code + language_suffix) break except locale.Error: pass return value.strftime(settings.DATE_FORMAT) class StrJSONEncoder(json.JSONEncoder): def default(self, o): try: return super().default(o) except TypeError: s = f"<{o.__class__.__name__}> " if hasattr(o, "pk"): s += f"[{o.pk}] " s += str(o) return s class IshtarFileSystemStorage(FileSystemStorage): def exists(self, name): path_name = self.path(name) if os.path.islink(path_name): if not os.path.exists(os.readlink(path_name)): os.remove(path_name) return os.path.exists(path_name) def generate_pdf_preview(item, save=True, tempdir=None, page_number=None): if not settings.PDFTOPPM_BINARY: return preview_tmp_name = None if not item.image and item.source and item.source_page_range: tempdir = tempfile.mkdtemp("-ishtarppmtopdf") try: page_number = int( item.source_page_range.split("-")[0].split(";")[0].split(",")[0]) except ValueError: page_number = 1 returned = generate_pdf_preview(item.source, save=False, tempdir=tempdir, page_number=page_number) if not returned: return tempdir, preview_tmp_name = returned elif (not page_number and item.image) \ or not item.associated_file \ or not item.associated_file.path \ or not item.associated_file.path.lower().endswith(".pdf"): return if not page_number: page_number = 1 if not tempdir: tempdir = tempfile.mkdtemp("-ishtarppmtopdf") if not preview_tmp_name: preview_tmp_name = tempdir + os.path.sep + "preview" args = (settings.PDFTOPPM_BINARY, "-singlefile", "-jpeg", "-f", str(page_number), item.associated_file.path, preview_tmp_name) try: # nosec: no user input popen = subprocess.Popen(args) # nosec popen.wait(timeout=5) except subprocess.SubprocessError: return if not save: return tempdir, preview_tmp_name with open(preview_tmp_name + ".jpg", "rb") as preview: django_file = File(preview) getattr(item, "image").save("page.jpg", django_file, save=True) shutil.rmtree(tempdir) def get_create_merge_town(insee, default): from ishtar_common.model_merging import merge_model_objects Town = apps.get_model("ishtar_common", "Town") q = Town.objects.filter(numero_insee=insee) if q.count() > 1: town = None for t in q.all(): if not town: town = t else: merge_model_objects(town, t) else: town, __ = Town.objects.get_or_create( numero_insee=insee, defaults=default ) return town OSM_URL = "http://polygons.openstreetmap.fr/get_geojson.py?id={}" OSM_REFRESH_URL = "http://polygons.openstreetmap.fr/?id={}" OSM_SIMPLIFY = "¶ms=0.000800-0.000200-0.000200" def create_osm_town(rel_id, name, numero_insee=None): """ Create town from OSM relation :param rel_id: OSM relation IS :param name: "town" name :param numero_insee: town reference by default OSM[rel_id] :return: town """ geojson, retry = None, 0 while not geojson and retry < 5: response = requests.get(OSM_URL.format(rel_id)) retry += 1 try: geojson = response.json() except requests.JSONDecodeError: requests.get(OSM_REFRESH_URL.format(rel_id)) time.sleep(3) if not geojson: print("\nError on: " + OSM_URL.format(rel_id)) return if len(geojson) > 50000: response = requests.get(OSM_URL.format(rel_id) + OSM_SIMPLIFY) try: geojson_simplify = response.json() geojson = geojson_simplify except requests.JSONDecodeError: pass default = {"name": name} if not numero_insee: numero_insee = f"OSM{rel_id}" town = get_create_merge_town(numero_insee, default) geom = GEOSGeometry(str(geojson)) if geom.geom_type == "GeometryCollection": geom = geom[0] GeoVectorData = apps.get_model("ishtar_common", "GeoVectorData") GeoDataType = apps.get_model("ishtar_common", "GeoDataType") GeoProviderType = apps.get_model("ishtar_common", "GeoProviderType") town_content_type = ContentType.objects.get(app_label="ishtar_common", model="town") data_type, __ = GeoDataType.objects.get_or_create( txt_idx="town-limit", defaults={"label": "Limites commune"} ) provider, __ = GeoProviderType.objects.get_or_create( txt_idx="openstreetmap", defaults={"label": "OpenStreetMap"} ) data = GeoVectorData.objects.create( name="Limite de " + town.name, source_content_type=town_content_type, multi_polygon=geom, source_id=town.pk, data_type=data_type, provider=provider ) town.main_geodata = data town._post_save_geo_ok = False town.save() return town def format_date(value, frmat="SHORT_DATE_FORMAT"): formats = ["%Y-%m-%d", "%d/%m/%Y"] date = None for forma in formats: try: date = datetime.datetime.strptime(value, forma).date() break except ValueError: continue if not date: return value return date_format(date, format=frmat, use_l10n=True) def get_percent(current, total): return f"{(current + 1) / total * 100:.1f}".rjust(4, "0") + "%" def get_log_time(): return datetime.datetime.now().isoformat().split(".")[0] def get_eta(current, total, base_time, current_time): if current < 5: return "-" elapsed_time = current_time - base_time eta = elapsed_time.seconds / current * (total - current) if eta < 1: return "-" return f"{int(eta // 3600):02d}:{int(eta % 3600 // 60):02d}:{int(eta % 60):02d}" def get_progress(base_lbl, idx, total, ref_time): """ Output progress for a long task. - base_lbl: label to display - idx: current item number - total: number of items - ref_time: time the task has been started """ lbl = f"\r{BColors.OKBLUE}[{get_percent(idx, total)}] {base_lbl} {idx + 1}/{total}" lbl += f" ({get_eta(idx, total, ref_time, datetime.datetime.now())} left){BColors.ENDC}" return lbl def fast_line_count(filename): """ Efficient line counter for a file """ CHUNK_SIZE = 1024 * 1024 def _count(reader): b = reader(CHUNK_SIZE) while b: yield b b = reader(CHUNK_SIZE) with open(filename, 'rb') as fp: count = sum(buffer.count(b"\n") for buffer in _count(fp.raw.read)) return count + 1 RE_NUMBER = r"[+-]?\d+(?:\.\d*)?" RE_COORDS = r"(" + RE_NUMBER + r") (" + RE_NUMBER + r")" def reverse_coordinates(wkt): return re.sub(RE_COORDS, r"\2 \1", wkt) def reverse_list_coordinates(lst): return list(reversed(lst)) def add_business_days(from_date: datetime.datetime, business_days_to_add: int) -> datetime.datetime: """ Add days to a date with exclusion of saturday and sunday. """ num_whole_weeks = business_days_to_add // 5 extra_days = num_whole_weeks * 2 # manage the last week first_weekday = from_date.weekday() remainder_days = business_days_to_add % 5 if (first_weekday + remainder_days) > 4: if first_weekday == 5: extra_days += 1 elif first_weekday != 6: extra_days += 2 return from_date + datetime.timedelta(business_days_to_add + extra_days) class EachCharacterTypeValidator: def __init__(self, character_types=None): if not character_types: character_types = ( (_("uppercase letter"), string.ascii_uppercase), (_("lowercase letter"), string.ascii_lowercase), (_("number"), [str(i) for i in range(10)]), (_("punctuation sign"), string.punctuation) ) self.character_types = character_types def validate(self, password, user=None): ok = set() for letter in password: for idx, character_type in enumerate(self.character_types): __, character_type = character_type if idx in ok: continue if letter in character_type: ok.add(idx) missing = [ str(character_type[0]) for idx, character_type in enumerate(self.character_types) if idx not in ok] if not missing: return msg = str(_("This password must contain one of this character: ")) + ", ".join( missing) + str(_(".")) raise ValidationError(msg, code='missing_character_types') def get_help_text(self): return str( _("Your password must contain each of these characters: ") ) + ", ".join( [str(character_type[0]) for character_type in self.character_types] ) + str(_(".")) def get_news_feed(): cache_key = f"{settings.PROJECT_SLUG}-news_feed" news_feed = cache.get(cache_key) if news_feed is None: news_feed = update_news_feed() # "" could be a temporary unavailability of the forum re-test 5 minutes later timeout = settings.CACHE_TIMEOUT if news_feed != "" else 60 * 5 cache.set(cache_key, news_feed, timeout=timeout) return news_feed def update_news_feed(): try: response = requests.get(settings.ISHTAR_FEED_URL, timeout=5) except (requests.ReadTimeout, requests.ConnectionError): logger.warning(f"Timeout when reading RSS {settings.ISHTAR_FEED_URL}") return "" # Put it to memory stream object universal feedparser content = io.BytesIO(response.content) feed = feedparser.parse(content) news_feed = [] if "entries" in feed: for entry in feed["entries"][:5]: published = entry.published try: locale.setlocale(locale.LC_TIME, "en_GB") d = datetime.datetime.strptime(published, "%a, %d %b %Y %H:%M:%S %z") d_aware = d.replace(tzinfo=pytz.timezone(settings.TIME_ZONE)) lang_code = settings.LANGUAGE_CODE.split("-") lang_code = lang_code[0] + "_" + lang_code[1].upper() locale.setlocale(locale.LC_TIME, lang_code) published = d_aware.strftime("%d %b %Y %H:%M") except ValueError: published = "–" desc = f"""
{published}
""" news_feed.append(desc) return "".join(news_feed) # picked from Django 3.2 to assure 128 bites salt - should be removed on upgrade class Argon2PasswordHasher(BaseArgon2PasswordHasher): salt_entropy = 128 RANDOM_STRING_CHARS = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789' def salt(self): """ Generate a cryptographically secure nonce salt in ASCII with an entropy of at least `salt_entropy` bits. """ char_count = math.ceil(self.salt_entropy / math.log2(len(self.RANDOM_STRING_CHARS))) return get_random_string(char_count, allowed_chars=self.RANDOM_STRING_CHARS)