diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2021-03-19 11:05:22 +0100 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2021-03-19 11:05:22 +0100 |
commit | 3039fae5124c00a67283c9b707e4a411149d93b1 (patch) | |
tree | 5d7fde3628825aebeeef3d85d2dfcf09a52116de /ishtar_common/utils.py | |
parent | b38e35ad05ae5b7d1c3d45436921f573bc9e5ba6 (diff) | |
download | Ishtar-3039fae5124c00a67283c9b707e4a411149d93b1.tar.bz2 Ishtar-3039fae5124c00a67283c9b707e4a411149d93b1.zip |
Format - black: ishtar_common
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r-- | ishtar_common/utils.py | 808 |
1 files changed, 465 insertions, 343 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py index b4542688b..fa24b3dcb 100644 --- a/ishtar_common/utils.py +++ b/ishtar_common/utils.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # Copyright (C) 2013-2016 Étienne Loks <etienne.loks_AT_peacefrogsDOTnet> @@ -62,11 +62,19 @@ from django.template.defaultfilters import slugify if settings.USE_TRANSLATION_OVERLOAD: - from overload_translation.utils import ugettext_lazy, ugettext, \ - pgettext_lazy, pgettext + from overload_translation.utils import ( + ugettext_lazy, + ugettext, + pgettext_lazy, + pgettext, + ) else: - from django.utils.translation import ugettext_lazy, ugettext, \ - pgettext_lazy, pgettext + from django.utils.translation import ( + ugettext_lazy, + ugettext, + pgettext_lazy, + pgettext, + ) _ = ugettext_lazy @@ -87,6 +95,7 @@ def debug_line_no(): def fake_task(*args): def fake(func): return func + return fake @@ -94,6 +103,7 @@ task = fake_task if settings.USE_BACKGROUND_TASK: try: from celery import shared_task + task = shared_task except ModuleNotFoundError: pass @@ -103,23 +113,24 @@ class BColors: """ Bash colors. Don't forget to finish your colored string with ENDC. """ - HEADER = '\033[95m' - OKBLUE = '\033[94m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - BOLD = '\033[1m' - UNDERLINE = '\033[4m' + + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" class Round(models.Func): - function = 'ROUND' + function = "ROUND" arity = 2 - arg_joiner = '::numeric, ' + arg_joiner = "::numeric, " -CSV_OPTIONS = {'delimiter': ',', 'quotechar': '"', 'quoting': QUOTE_ALL} +CSV_OPTIONS = {"delimiter": ",", "quotechar": '"', "quoting": QUOTE_ALL} def is_safe_path(basedir, path, follow_symlinks=True): @@ -134,22 +145,23 @@ def import_class(full_path_classname): """ Return the model class from the full path """ - mods = full_path_classname.split('.') + mods = full_path_classname.split(".") if len(mods) == 1: - mods = ['ishtar_common', 'models', mods[0]] - elif 'models' not in mods and 'models_finds' not in mods \ - and 'models_treatments' not in mods: - raise SuspiciousOperation( - "Try to import a non model from a string") - module = import_module('.'.join(mods[:-1])) + mods = ["ishtar_common", "models", mods[0]] + elif ( + "models" not in mods + and "models_finds" not in mods + and "models_treatments" not in mods + ): + raise SuspiciousOperation("Try to import a non model from a string") + module = import_module(".".join(mods[:-1])) model = getattr(module, mods[-1]) if not issubclass(model, models.Model): - raise SuspiciousOperation( - "Try to import a non model from a string") + raise SuspiciousOperation("Try to import a non model from a string") return model -def check_rights(rights=None, redirect_url='/'): +def check_rights(rights=None, redirect_url="/"): """ Decorator that checks the rights to access the view. """ @@ -158,25 +170,25 @@ def check_rights(rights=None, redirect_url='/'): def _wrapped_view(request, *args, **kwargs): if not rights: return view_func(request, *args, **kwargs) - if hasattr(request.user, 'ishtaruser'): - if request.user.ishtaruser.has_right('administrator', - request.session): - kwargs['current_right'] = 'administrator' + if hasattr(request.user, "ishtaruser"): + if request.user.ishtaruser.has_right("administrator", request.session): + kwargs["current_right"] = "administrator" return view_func(request, *args, **kwargs) for right in rights: # be careful to put the more permissive rights first # if granted it can allow more - if request.user.ishtaruser.has_right(right, - request.session): - kwargs['current_right'] = right + if request.user.ishtaruser.has_right(right, request.session): + kwargs["current_right"] = right return view_func(request, *args, **kwargs) put_session_message( request.session.session_key, _("You don't have sufficient permissions to do this action."), - 'warning' + "warning", ) return HttpResponseRedirect(redirect_url) + return _wrapped_view + return decorator @@ -184,14 +196,16 @@ def check_rights_condition(rights): """ To be used to check in wizard condition_dict """ + def func(self): request = self.request - if request.user.ishtaruser.has_right('administrator', request.session): + if request.user.ishtaruser.has_right("administrator", request.session): return True for right in rights: if request.user.ishtaruser.has_right(right, request.session): return True return False + return func @@ -211,13 +225,15 @@ def check_model_access_control(request, model, available_perms=None): return allowed, own if not available_perms: - available_perms = ['view_' + model.__name__.lower(), - 'view_own_' + model.__name__.lower()] + available_perms = [ + "view_" + model.__name__.lower(), + "view_own_" + model.__name__.lower(), + ] try: ishtaruser = request.user.ishtaruser except request.user._meta.model.ishtaruser.RelatedObjectDoesNotExist: return False, True - if ishtaruser.has_right('administrator', session=request.session): + if ishtaruser.has_right("administrator", session=request.session): allowed = True own = False return allowed, own @@ -266,12 +282,12 @@ def move_dict_data(data, key1, key2): "data__") :return: result data """ - keys1 = key1.split('__') - keys2 = key2.split('__') + keys1 = key1.split("__") + keys2 = key2.split("__") value = data for idx, key in enumerate(keys1): if not idx: - if key != 'data': + if key != "data": return data continue if key not in value: @@ -284,7 +300,7 @@ def move_dict_data(data, key1, key2): new_value = data for idx, key in enumerate(keys2): if not idx: - if key != 'data': + if key != "data": return data continue if idx == (len(keys2) - 1): # last @@ -333,10 +349,10 @@ def is_downloadable(url): """ h = requests.head(url, allow_redirects=True) header = h.headers - content_type = header.get('content-type') - if 'text' in content_type.lower(): + content_type = header.get("content-type") + if "text" in content_type.lower(): return False - if 'html' in content_type.lower(): + if "html" in content_type.lower(): return False return True @@ -348,21 +364,21 @@ def get_current_year(): def get_cache(cls, extra_args=tuple(), app_label=None): if not app_label: app_label = cls._meta.app_label - cache_key = "{}-{}-{}".format( - settings.PROJECT_SLUG, app_label, cls.__name__) + cache_key = "{}-{}-{}".format(settings.PROJECT_SLUG, app_label, cls.__name__) for arg in extra_args: if not arg: - cache_key += '-0' + cache_key += "-0" else: if type(arg) == dict: - cache_key += '-' + "_".join([str(arg[k]) for k in arg]) + cache_key += "-" + "_".join([str(arg[k]) for k in arg]) elif type(arg) in (list, tuple): - cache_key += '-' + "_".join([str(v) for v in arg]) + cache_key += "-" + "_".join([str(v) for v in arg]) else: - cache_key += '-' + str(arg) + cache_key += "-" + str(arg) cache_key = slugify(cache_key) - if not cache_key.endswith('_current_keys') \ - and hasattr(cls, '_add_cache_key_to_refresh'): + if not cache_key.endswith("_current_keys") and hasattr( + cls, "_add_cache_key_to_refresh" + ): cls._add_cache_key_to_refresh(extra_args) if len(cache_key) >= 250: m = hashlib.md5() @@ -372,9 +388,9 @@ def get_cache(cls, extra_args=tuple(), app_label=None): def force_cached_label_changed(sender, **kwargs): - if not kwargs.get('instance'): + if not kwargs.get("instance"): return - kwargs['instance']._cached_label_checked = False + kwargs["instance"]._cached_label_checked = False cached_label_changed(sender, **kwargs) @@ -384,10 +400,10 @@ class SecretaryRenderer(MainSecretaryRenderer): Overload _pack_document: obsolete files can be referenced - continue on null content for files """ - self.log.debug('packing document') + self.log.debug("packing document") zip_file = io.BytesIO() - zipdoc = zipfile.ZipFile(zip_file, 'a') + zipdoc = zipfile.ZipFile(zip_file, "a") for fname, content in files.items(): if isinstance(content, UndefinedSilently): continue @@ -395,21 +411,20 @@ class SecretaryRenderer(MainSecretaryRenderer): zipdoc.writestr(fname, content, zipfile.ZIP_DEFLATED) else: zipdoc.writestr(fname, content) - self.log.debug('Document packing completed') + self.log.debug("Document packing completed") return zip_file def serialize_args_for_tasks(sender, instance, kwargs, extra_kwargs=None): - if 'instance' in kwargs: - kwargs['instance'] = kwargs["instance"].pk + if "instance" in kwargs: + kwargs["instance"] = kwargs["instance"].pk sender = (sender._meta.app_label, sender._meta.object_name) if extra_kwargs: for kw in extra_kwargs: if getattr(instance, kw, None): kwargs[kw] = getattr(instance, kw) for k in list(kwargs.keys()): - if k in ["model", "signal", - "_cached_labels_bulk_update"] or kwargs[k] is None: + if k in ["model", "signal", "_cached_labels_bulk_update"] or kwargs[k] is None: kwargs.pop(k) continue if isinstance(kwargs[k], set): @@ -429,9 +444,9 @@ def deserialize_args_for_tasks(sender, kwargs, extra_kwargs=None): # waiting for it while not instance and retried < 6: if retried: - time.sleep(.5) - if sender.objects.filter(pk=kwargs['instance']).count(): - instance = sender.objects.get(pk=kwargs['instance']) + time.sleep(0.5) + if sender.objects.filter(pk=kwargs["instance"]).count(): + instance = sender.objects.get(pk=kwargs["instance"]) else: retried += 1 if not instance: @@ -444,8 +459,12 @@ def deserialize_args_for_tasks(sender, kwargs, extra_kwargs=None): EXTRA_KWARGS_TRIGGER = [ - "_cascade_change", "_cached_labels_bulk_update", "skip_history_when_saving", - "_post_saved_geo", "_search_updated", "_cached_label_checked" + "_cascade_change", + "_cached_labels_bulk_update", + "skip_history_when_saving", + "_post_saved_geo", + "_search_updated", + "_cached_label_checked", ] @@ -455,11 +474,11 @@ def cached_label_and_geo_changed(sender, **kwargs): def cached_label_changed(sender, **kwargs): - if not kwargs.get('instance'): + if not kwargs.get("instance"): return - instance = kwargs.get('instance') + instance = kwargs.get("instance") - if hasattr(instance, 'test_obj'): + if hasattr(instance, "test_obj"): instance.test_obj.reached(sender, **kwargs) # cache_key, value = get_cache( @@ -470,8 +489,11 @@ def cached_label_changed(sender, **kwargs): # return # cache.set(cache_key, True, settings.CACHE_TASK_TIMEOUT) - if not settings.USE_BACKGROUND_TASK or not instance.pk \ - or not sender.objects.filter(pk=instance.pk).count(): + if ( + not settings.USE_BACKGROUND_TASK + or not instance.pk + or not sender.objects.filter(pk=instance.pk).count() + ): # no background task or not yet fully saved return _cached_label_changed(sender, **kwargs) @@ -479,34 +501,34 @@ def cached_label_changed(sender, **kwargs): kwargs["cascade_change"] = True sender, kwargs = serialize_args_for_tasks( - sender, instance, kwargs, EXTRA_KWARGS_TRIGGER) + sender, instance, kwargs, EXTRA_KWARGS_TRIGGER + ) return _cached_label_changed.delay(sender, **kwargs) @task() def _cached_label_changed(sender, **kwargs): - sender, instance = deserialize_args_for_tasks(sender, kwargs, - EXTRA_KWARGS_TRIGGER) + sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER) if not instance: return - force_update = kwargs.get('force_update', False) + force_update = kwargs.get("force_update", False) if hasattr(instance, "need_update") and instance.need_update: force_update = True instance.skip_history_when_saving = True - if not force_update and getattr(instance, '_cached_label_checked', False): + if not force_update and getattr(instance, "_cached_label_checked", False): return if hasattr(instance, "refresh_cache"): instance.refresh_cache() instance._cached_label_checked = True - cached_labels = ['cached_label'] - if hasattr(instance, 'CACHED_LABELS'): + cached_labels = ["cached_label"] + if hasattr(instance, "CACHED_LABELS"): cached_labels = instance.CACHED_LABELS changed = [] for cached_label in cached_labels: - gen_func = '_generate_' + cached_label + gen_func = "_generate_" + cached_label if not hasattr(instance, gen_func): continue lbl = getattr(instance, gen_func)() @@ -520,26 +542,23 @@ def _cached_label_changed(sender, **kwargs): if changed: instance._search_updated = False - if hasattr(instance, '_cascade_change') and instance._cascade_change: + if hasattr(instance, "_cascade_change") and instance._cascade_change: instance.skip_history_when_saving = True - instance.__class__.objects.filter(pk=instance.pk).update( - **dict(changed)) + instance.__class__.objects.filter(pk=instance.pk).update(**dict(changed)) if (changed or not cached_labels) and hasattr(instance, "cascade_update"): instance.cascade_update() updated = False - if force_update or hasattr(instance, 'update_search_vector'): + if force_update or hasattr(instance, "update_search_vector"): updated = instance.update_search_vector() - if hasattr(instance, '_cached_labels_bulk_update'): + if hasattr(instance, "_cached_labels_bulk_update"): updated = instance._cached_labels_bulk_update() or updated - if not updated and hasattr(instance, '_get_associated_cached_labels'): + if not updated and hasattr(instance, "_get_associated_cached_labels"): for item in instance._get_associated_cached_labels(): item._cascade_change = True - if hasattr(instance, 'test_obj'): + if hasattr(instance, "test_obj"): item.test_obj = instance.test_obj cached_label_changed(item.__class__, instance=item) - cache_key, __ = get_cache( - sender, ["cached_label_changed", instance.pk] - ) + cache_key, __ = get_cache(sender, ["cached_label_changed", instance.pk]) cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) if cached_labels: return getattr(instance, cached_labels[0], "") @@ -560,10 +579,10 @@ def regenerate_all_cached_labels(model): def shortify(lbl, number=20): SHORTIFY_STR = ugettext(" (...)") if not lbl: - lbl = '' + lbl = "" if len(lbl) <= number: return lbl - return lbl[:number - len(SHORTIFY_STR)] + SHORTIFY_STR + return lbl[: number - len(SHORTIFY_STR)] + SHORTIFY_STR def mode(array): @@ -578,9 +597,10 @@ def disable_for_loaddata(signal_handler): @wraps(signal_handler) def wrapper(*args, **kwargs): - if kwargs.get('raw'): + if kwargs.get("raw"): return signal_handler(*args, **kwargs) + return wrapper @@ -588,8 +608,7 @@ def _get_image_link(doc): from ishtar_common.models import IshtarSiteProfile # manage missing images - if not doc.thumbnail or not doc.thumbnail.url or not doc.image \ - or not doc.image.url: + if not doc.thumbnail or not doc.thumbnail.url or not doc.image or not doc.image.url: return "" item = None @@ -607,7 +626,8 @@ def _get_image_link(doc): if item.__class__.__name__ == "ArchaeologicalSite": item_class_name = str(IshtarSiteProfile.get_default_site_label()) - return mark_safe(""" + return mark_safe( + """ <div class="col col-lg-3"> <div class="card"> <div id="lightgallery-rand-img"> @@ -630,34 +650,38 @@ def _get_image_link(doc): <script type="text/javascript"> lightGallery(document.getElementById('lightgallery-rand-img')); </script>""".format( - doc.image.url, - doc.thumbnail.url, - item_class_name, - str(item), - reverse(item.SHOW_URL, args=[item.pk, '']), - str(_("Information")), - str(_("Load another random image?")))) + doc.image.url, + doc.thumbnail.url, + item_class_name, + str(item), + reverse(item.SHOW_URL, args=[item.pk, ""]), + str(_("Information")), + str(_("Load another random image?")), + ) + ) def get_random_item_image_link(request): from ishtar_common.models import Document - if not hasattr(request.user, 'ishtaruser'): - return '' + if not hasattr(request.user, "ishtaruser"): + return "" ishtar_user = request.user.ishtaruser - if not ishtar_user.has_right('ishtar_common.view_document', - session=request.session): - return '' + if not ishtar_user.has_right( + "ishtar_common.view_document", session=request.session + ): + return "" - q = Document.objects.filter( - thumbnail__isnull=False, - image__isnull=False - ).exclude(thumbnail='').exclude(image='') + q = ( + Document.objects.filter(thumbnail__isnull=False, image__isnull=False) + .exclude(thumbnail="") + .exclude(image="") + ) total = q.count() if not total: - return '' + return "" image_nb = random.randint(0, total - 1) return _get_image_link(q.all()[image_nb]) @@ -665,9 +689,9 @@ def get_random_item_image_link(request): def convert_coordinates_to_point(x, y, z=None, srid=4326): if z: - geom = GEOSGeometry('POINT({} {} {})'.format(x, y, z), srid=srid) + geom = GEOSGeometry("POINT({} {} {})".format(x, y, z), srid=srid) else: - geom = GEOSGeometry('POINT({} {})'.format(x, y), srid=srid) + geom = GEOSGeometry("POINT({} {})".format(x, y), srid=srid) if not geom.valid: raise forms.ValidationError(geom.valid_reason) return geom @@ -675,13 +699,13 @@ def convert_coordinates_to_point(x, y, z=None, srid=4326): def get_srid_obj_from_point(point): from ishtar_common.models import SpatialReferenceSystem + try: - return SpatialReferenceSystem.objects.get( - srid=int(point.srid)) + return SpatialReferenceSystem.objects.get(srid=int(point.srid)) except SpatialReferenceSystem.DoesNotExist: return SpatialReferenceSystem.objects.create( srid=int(point.srid), - auth_name='EPSG', + auth_name="EPSG", label="EPSG-{}".format(point.srid), txt_idx="epsg-{}".format(point.srid), ) @@ -691,7 +715,7 @@ def post_save_geo(sender, **kwargs): """ Convert raw x, y, z point to real geo field """ - if not kwargs.get('instance'): + if not kwargs.get("instance"): return # cache_key, value = get_cache( # sender, ["post_save_geo", kwargs['instance'].pk]) @@ -700,13 +724,14 @@ def post_save_geo(sender, **kwargs): # return # cache.set(cache_key, True, settings.CACHE_TASK_TIMEOUT) - instance = kwargs.get('instance') + instance = kwargs.get("instance") if hasattr(instance, "_no_geo_check") and instance._no_geo_check: return if not settings.USE_BACKGROUND_TASK: return _post_save_geo(sender, **kwargs) - sender, kwargs = serialize_args_for_tasks(sender, instance, - kwargs, EXTRA_KWARGS_TRIGGER) + sender, kwargs = serialize_args_for_tasks( + sender, instance, kwargs, EXTRA_KWARGS_TRIGGER + ) return _post_save_geo.delay(sender, **kwargs) @@ -718,18 +743,18 @@ def _post_save_geo(sender, **kwargs): profile = get_current_profile() if not profile.mapping: return - sender, instance = deserialize_args_for_tasks(sender, kwargs, - EXTRA_KWARGS_TRIGGER) + sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER) if not instance: return kls_name = instance.__class__.__name__ - if not profile.locate_warehouses and ("Container" in kls_name - or "Warehouse" in kls_name): + if not profile.locate_warehouses and ( + "Container" in kls_name or "Warehouse" in kls_name + ): return - if getattr(instance, '_post_saved_geo', False): + if getattr(instance, "_post_saved_geo", False): return # print(sender, "post_save_geo") @@ -739,24 +764,27 @@ def _post_save_geo(sender, **kwargs): current_source = str(instance.__class__._meta.verbose_name) modified = False - if hasattr(instance, 'multi_polygon') and not getattr( - instance, "DISABLE_POLYGONS", False): - if instance.multi_polygon_source_item and \ - instance.multi_polygon_source_item != current_source: # refetch + if hasattr(instance, "multi_polygon") and not getattr( + instance, "DISABLE_POLYGONS", False + ): + if ( + instance.multi_polygon_source_item + and instance.multi_polygon_source_item != current_source + ): # refetch instance.multi_polygon = None instance.multi_polygon_source = None modified = True if instance.multi_polygon and not instance.multi_polygon_source: # should be a db source - instance.multi_polygon_source = 'P' + instance.multi_polygon_source = "P" instance.multi_polygon_source_item = current_source - elif instance.multi_polygon_source != 'P': + elif instance.multi_polygon_source != "P": precise_poly = instance.get_precise_polygons() if precise_poly: poly, source_item = precise_poly instance.multi_polygon = poly - instance.multi_polygon_source = 'P' + instance.multi_polygon_source = "P" instance.multi_polygon_source_item = source_item modified = True elif profile.use_town_for_geo: @@ -765,21 +793,24 @@ def _post_save_geo(sender, **kwargs): poly, poly_source = poly if poly != instance.multi_polygon: instance.multi_polygon_source_item = poly_source - instance.multi_polygon_source = 'T' # town + instance.multi_polygon_source = "T" # town try: instance.multi_polygon = poly modified = True except TypeError: print(instance, instance.pk) - if (instance.point_source_item and - instance.point_source_item != current_source) or ( - instance.point_source == 'M'): # refetch + if ( + instance.point_source_item and instance.point_source_item != current_source + ) or ( + instance.point_source == "M" + ): # refetch csrs = instance.spatial_reference_system if instance.x and instance.y: new_point = GEOSGeometry( - 'POINT({} {})'.format(instance.x, instance.y), srid=csrs.srid) + "POINT({} {})".format(instance.x, instance.y), srid=csrs.srid + ) proj_point = instance.point_2d.transform(csrs.srid, clone=True) if new_point.distance(proj_point) < 0.01: instance.x, instance.y = None, None @@ -789,8 +820,9 @@ def _post_save_geo(sender, **kwargs): point = instance.point point_2d = instance.point_2d - if (point or point_2d) and instance.x is None and not \ - instance.point_source: # db source + if ( + (point or point_2d) and instance.x is None and not instance.point_source + ): # db source if point: current_point = point instance.z = point.z @@ -800,41 +832,48 @@ def _post_save_geo(sender, **kwargs): instance.y = current_point.y srs = get_srid_obj_from_point(current_point) instance.spatial_reference_system = srs - instance.point_source = 'P' + instance.point_source = "P" instance.point_source_item = current_source if not point_2d: instance.point_2d = convert_coordinates_to_point( - instance.point.x, instance.point.y, - srid=current_point.srid) + instance.point.x, instance.point.y, srid=current_point.srid + ) modified = True - elif instance.x and instance.y and \ - instance.spatial_reference_system and \ - instance.spatial_reference_system.auth_name == 'EPSG' and \ - instance.spatial_reference_system.srid != 0: + elif ( + instance.x + and instance.y + and instance.spatial_reference_system + and instance.spatial_reference_system.auth_name == "EPSG" + and instance.spatial_reference_system.srid != 0 + ): # form input or already precise try: point_2d = convert_coordinates_to_point( - instance.x, instance.y, - srid=instance.spatial_reference_system.srid) + instance.x, instance.y, srid=instance.spatial_reference_system.srid + ) except forms.ValidationError: return # irrelevant data in DB distance = 1 # arbitrary if point_2d and instance.point_2d: - distance = point_2d.transform( - 4326, clone=True).distance( - instance.point_2d.transform(4326, clone=True)) + distance = point_2d.transform(4326, clone=True).distance( + instance.point_2d.transform(4326, clone=True) + ) if instance.z: point = convert_coordinates_to_point( - instance.x, instance.y, instance.z, - srid=instance.spatial_reference_system.srid) + instance.x, + instance.y, + instance.z, + srid=instance.spatial_reference_system.srid, + ) # no change if distance inf to 1 mm - if distance >= 0.0001 and (point_2d != instance.point_2d - or point != instance.point): + if distance >= 0.0001 and ( + point_2d != instance.point_2d or point != instance.point + ): instance.point = point instance.point_2d = point_2d - instance.point_source = 'P' + instance.point_source = "P" instance.point_source_item = current_source modified = True else: @@ -845,7 +884,7 @@ def _post_save_geo(sender, **kwargs): point_2d, point, source_item = precise_points instance.point_2d = point_2d instance.point = point - instance.point_source = 'P' + instance.point_source = "P" instance.point_source_item = source_item instance.x = point_2d.x instance.y = point_2d.y @@ -856,16 +895,16 @@ def _post_save_geo(sender, **kwargs): modified = True else: centroid, source, point_source = None, None, None - if instance.multi_polygon and instance.multi_polygon_source == 'P': + if instance.multi_polygon and instance.multi_polygon_source == "P": source = current_source centroid = instance.multi_polygon.centroid - point_source = 'M' + point_source = "M" if not centroid and profile.use_town_for_geo: # try to get from # parent town_centroid = instance.get_town_centroid() if town_centroid: centroid, source = town_centroid - point_source = 'T' + point_source = "T" if centroid: instance.point_2d, instance.point_source_item = centroid, source instance.point = None @@ -892,34 +931,37 @@ def _post_save_geo(sender, **kwargs): instance.save() if hasattr(instance, "cascade_update"): instance.cascade_update() - cache_key, __ = get_cache( - sender, ["post_save_geo", instance.pk] - ) + cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk]) cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT) return -def create_slug(model, name, slug_attr='slug', max_length=100): +def create_slug(model, name, slug_attr="slug", max_length=100): base_slug = slugify(name) slug = base_slug[:max_length] final_slug = None idx = 1 while not final_slug: - if slug and not model.objects.filter(**{slug_attr:slug}).exists(): + if slug and not model.objects.filter(**{slug_attr: slug}).exists(): final_slug = slug break - slug = base_slug[:(max_length - 1 - len(str(idx)))] + "-" + str(idx) + slug = base_slug[: (max_length - 1 - len(str(idx)))] + "-" + str(idx) idx += 1 return final_slug def get_all_field_names(model): - return list(set(chain.from_iterable( - (field.name, field.attname) if hasattr(field, 'attname') else ( - field.name,) - for field in model._meta.get_fields() - if not (field.many_to_one and field.related_model is None) - ))) + return list( + set( + chain.from_iterable( + (field.name, field.attname) + if hasattr(field, "attname") + else (field.name,) + for field in model._meta.get_fields() + if not (field.many_to_one and field.related_model is None) + ) + ) + ) def get_all_related_m2m_objects_with_model(model): @@ -932,16 +974,17 @@ def get_all_related_m2m_objects_with_model(model): def get_all_related_many_to_many_objects(model): return [ - f for f in model._meta.get_fields(include_hidden=True) + f + for f in model._meta.get_fields(include_hidden=True) if f.many_to_many and f.auto_created ] def get_all_related_objects(model): return [ - f for f in model._meta.get_fields() - if (f.one_to_many or f.one_to_one) - and f.auto_created and not f.concrete + f + for f in model._meta.get_fields() + if (f.one_to_many or f.one_to_one) and f.auto_created and not f.concrete ] @@ -972,24 +1015,23 @@ def merge_tsvectors(vectors): current_position = max_position for dct_member in vector.split(" "): - splitted = dct_member.split(':') + splitted = dct_member.split(":") key = ":".join(splitted[:-1]) positions = splitted[-1] key = key[1:-1] # remove quotes - positions = [int(pos) + current_position - for pos in positions.split(',')] + positions = [int(pos) + current_position for pos in positions.split(",")] if key in result_dict: result_dict[key] += positions else: result_dict[key] = positions # {'lamelie': [1, 42, 5]} => {'lamelie': "1,42,5"} - result_dict = {k: ",".join([str(val) for val in result_dict[k]]) - for k in result_dict} + result_dict = { + k: ",".join([str(val) for val in result_dict[k]]) for k in result_dict + } # {'lamelie': "1,5", "hagarde": "2", "regarde": "4"} => # "'lamelie':1,5 'hagarde':2 'regarde':4" - result = " ".join(["'{}':{}".format(k, result_dict[k]) - for k in result_dict]) + result = " ".join(["'{}':{}".format(k, result_dict[k]) for k in result_dict]) return result @@ -997,10 +1039,10 @@ def merge_tsvectors(vectors): def put_session_message(session_key, message, message_type): session = SessionStore(session_key=session_key) messages = [] - if 'messages' in session: - messages = session['messages'][:] + if "messages" in session: + messages = session["messages"][:] messages.append((str(message), message_type)) - session['messages'] = messages + session["messages"] = messages session.save() @@ -1019,7 +1061,7 @@ def get_session_var(session_key, key): def clean_session_cache(session): # clean session cache - cache_key_list = 'sessionlist-{}'.format(session.session_key) + cache_key_list = "sessionlist-{}".format(session.session_key) key_list = cache.get(cache_key_list, []) for key in key_list: cache.set(key, None, settings.CACHE_TIMEOUT) @@ -1039,7 +1081,7 @@ def get_field_labels_from_path(model, path): except: labels.append(key) continue - if hasattr(field, 'verbose_name'): + if hasattr(field, "verbose_name"): labels.append(field.verbose_name) else: labels.append(key) @@ -1051,19 +1093,20 @@ def create_default_areas(models=None, verbose=False): if not models: from ishtar_common.models import Area, Town, Department, State else: - Area = models['area'] - Town = models['town'] - Department = models['department'] - State = models['state'] + Area = models["area"] + Town = models["town"] + Department = models["department"] + State = models["state"] areas = {} idx = 0 for state in State.objects.all(): - slug = 'state-' + slugify(state.label) + slug = "state-" + slugify(state.label) area, created = Area.objects.get_or_create( - txt_idx=slug, defaults={'label': state.label}) - areas['state-{}'.format(state.pk)] = area + txt_idx=slug, defaults={"label": state.label} + ) + areas["state-{}".format(state.pk)] = area if created: idx += 1 if verbose: @@ -1071,15 +1114,16 @@ def create_default_areas(models=None, verbose=False): idx, idx2 = 0, 0 for dep in Department.objects.all(): - slug = 'dep-' + slugify(dep.label) + slug = "dep-" + slugify(dep.label) area, created = Area.objects.get_or_create( - txt_idx=slug, defaults={'label': dep.label}) - areas['dep-' + dep.number] = area + txt_idx=slug, defaults={"label": dep.label} + ) + areas["dep-" + dep.number] = area if created: idx += 1 if not dep.state_id: continue - state_slug = 'state-{}'.format(dep.state_id) + state_slug = "state-{}".format(dep.state_id) if state_slug not in areas: continue if area.parent and area.parent.pk == areas[state_slug].pk: @@ -1090,15 +1134,16 @@ def create_default_areas(models=None, verbose=False): if verbose: print( "* {} department areas added with {} associations to state".format( - idx, idx2) + idx, idx2 + ) ) idx = 0 for town in Town.objects.all(): if not town.numero_insee or len(town.numero_insee) != 5: continue - code_dep = 'dep-' + town.numero_insee[:2] - code_dep_dom = 'dep-' + town.numero_insee[:3] + code_dep = "dep-" + town.numero_insee[:2] + code_dep_dom = "dep-" + town.numero_insee[:3] if code_dep in areas: if not areas[code_dep].towns.filter(pk=town.pk).count(): areas[code_dep].towns.add(town) @@ -1112,9 +1157,17 @@ def create_default_areas(models=None, verbose=False): print("* {} town associated to department area".format(idx)) -def get_relations_for_graph(rel_model, obj_pk, above_relations=None, - equal_relations=None, treated=None, styles=None, - render_above=True, render_below=True, full=False): +def get_relations_for_graph( + rel_model, + obj_pk, + above_relations=None, + equal_relations=None, + treated=None, + styles=None, + render_above=True, + render_below=True, + full=False, +): """ Get all above and equal relations of an object (get all child and parent relations) @@ -1144,37 +1197,53 @@ def get_relations_for_graph(rel_model, obj_pk, above_relations=None, treated.append(obj_pk) for q, inverse in ( - (rel_model.objects.filter( - left_record_id=obj_pk, - relation_type__logical_relation__isnull=False), False), - (rel_model.objects.filter( - right_record_id=obj_pk, - relation_type__logical_relation__isnull=False), True)): - q = q.values("left_record_id",'right_record_id', - 'relation_type__logical_relation') + ( + rel_model.objects.filter( + left_record_id=obj_pk, relation_type__logical_relation__isnull=False + ), + False, + ), + ( + rel_model.objects.filter( + right_record_id=obj_pk, relation_type__logical_relation__isnull=False + ), + True, + ), + ): + q = q.values( + "left_record_id", "right_record_id", "relation_type__logical_relation" + ) get_above, get_below = render_above, render_below if inverse and (not render_above or not render_below): get_above, get_below = not render_above, not render_below for relation in q.all(): - logical_relation = relation['relation_type__logical_relation'] - left_record = relation['left_record_id'] - right_record = relation['right_record_id'] + logical_relation = relation["relation_type__logical_relation"] + left_record = relation["left_record_id"] + right_record = relation["right_record_id"] is_above, is_below = False, False if not logical_relation: continue - elif get_below and logical_relation == 'above' and \ - (left_record, right_record) not in above_relations: + elif ( + get_below + and logical_relation == "above" + and (left_record, right_record) not in above_relations + ): above_relations.append((left_record, right_record)) is_below = True - elif get_above and logical_relation == 'below' and \ - (right_record, left_record) not in above_relations: + elif ( + get_above + and logical_relation == "below" + and (right_record, left_record) not in above_relations + ): above_relations.append((right_record, left_record)) is_above = True - elif logical_relation == 'equal' and \ - (right_record, left_record) not in equal_relations and \ - (left_record, right_record) not in equal_relations: + elif ( + logical_relation == "equal" + and (right_record, left_record) not in equal_relations + and (left_record, right_record) not in equal_relations + ): equal_relations.append((left_record, right_record)) else: continue @@ -1183,27 +1252,40 @@ def get_relations_for_graph(rel_model, obj_pk, above_relations=None, other_record = left_record else: other_record = right_record - if get_above and get_below and not full and (is_below or - is_above): + if get_above and get_below and not full and (is_below or is_above): if (is_above and not inverse) or (is_below and inverse): ar, er, substyles = get_relations_for_graph( - rel_model, other_record, above_relations, - equal_relations, treated, styles, + rel_model, + other_record, + above_relations, + equal_relations, + treated, + styles, render_above=True, - render_below=False + render_below=False, ) else: ar, er, substyles = get_relations_for_graph( - rel_model, other_record, above_relations, - equal_relations, treated, styles, + rel_model, + other_record, + above_relations, + equal_relations, + treated, + styles, render_above=False, - render_below=True + render_below=True, ) else: ar, er, substyles = get_relations_for_graph( - rel_model, other_record, above_relations, equal_relations, - treated, styles, render_above=render_above, - render_below=render_below, full=full + rel_model, + other_record, + above_relations, + equal_relations, + treated, + styles, + render_above=render_above, + render_below=render_below, + full=full, ) styles.update(substyles) error_style = "color=red" @@ -1234,19 +1316,28 @@ def get_relations_for_graph(rel_model, obj_pk, above_relations=None, return above_relations, equal_relations, styles -def generate_relation_graph(obj, highlight_current=True, - render_above=True, render_below=True, - full=False, debug=False): +def generate_relation_graph( + obj, + highlight_current=True, + render_above=True, + render_below=True, + full=False, + debug=False, +): if not settings.DOT_BINARY: return model = obj.__class__ - rel_model = model._meta.get_field('right_relations').related_model + rel_model = model._meta.get_field("right_relations").related_model # get relations above_relations, equal_relations, styles = get_relations_for_graph( - rel_model, obj.pk, render_above=render_above, - render_below=render_below, full=full) + rel_model, + obj.pk, + render_above=render_above, + render_below=render_below, + full=full, + ) # generate dotfile dot_str = "digraph relations {\nnode [shape=box];\n" @@ -1258,13 +1349,10 @@ def generate_relation_graph(obj, highlight_current=True, if highlight_current: style += ',style=filled,fillcolor="#C6C0C0"' dot_str += 'item{}[{},href="{}"];\n'.format( - obj.pk, style, - reverse('display-item', - args=[model.SLUG, obj.pk]) + obj.pk, style, reverse("display-item", args=[model.SLUG, obj.pk]) ) rel_str += "}\n" - for list, directed in ((above_relations, True), - (equal_relations, False)): + for list, directed in ((above_relations, True), (equal_relations, False)): if directed: rel_str += "subgraph Dir {\n" else: @@ -1277,8 +1365,7 @@ def generate_relation_graph(obj, highlight_current=True, if left.pk == obj.pk and highlight_current: style += ',style=filled,fillcolor="#C6C0C0"' dot_str += 'item{}[{},href="{}"];\n'.format( - left.pk, style, - reverse('display-item', args=[model.SLUG, left.pk]) + left.pk, style, reverse("display-item", args=[model.SLUG, left.pk]) ) if right_pk not in described: described.append(right_pk) @@ -1287,22 +1374,24 @@ def generate_relation_graph(obj, highlight_current=True, if right.pk == obj.pk and highlight_current: style += ',style=filled,fillcolor="#C6C0C0"' dot_str += 'item{}[{},href="{}"];\n'.format( - right.pk, style, - reverse('display-item', args=[model.SLUG, right.pk]) + right.pk, + style, + reverse("display-item", args=[model.SLUG, right.pk]), ) if not directed: # on the same level rel_str += "{{rank = same; item{}; item{};}}\n".format( - left_pk, right_pk) + left_pk, right_pk + ) style = "" if (left_pk, right_pk) in styles: style = " [{}]".format(", ".join(styles[(left_pk, right_pk)])) - rel_str += 'item{} -> item{}{};\n'.format(left_pk, right_pk, style) + rel_str += "item{} -> item{}{};\n".format(left_pk, right_pk, style) rel_str += "}\n" dot_str += rel_str + "\n}" tempdir = tempfile.mkdtemp("-ishtardot") dot_name = tempdir + os.path.sep + "relations.dot" - with open(dot_name, 'w') as dot_file: + with open(dot_name, "w") as dot_file: dot_file.write(dot_str) if not render_above: @@ -1312,8 +1401,7 @@ def generate_relation_graph(obj, highlight_current=True, else: suffix = "" - if full and obj.MAIN_UP_MODEL_QUERY and getattr(obj, - obj.MAIN_UP_MODEL_QUERY): + if full and obj.MAIN_UP_MODEL_QUERY and getattr(obj, obj.MAIN_UP_MODEL_QUERY): obj = getattr(obj, obj.MAIN_UP_MODEL_QUERY) with open(dot_name, "r") as dot_file: @@ -1353,8 +1441,7 @@ def generate_relation_graph(obj, highlight_current=True, png_name = tempdir + os.path.sep + "relations.png" with open(png_name, "wb") as png_file: - svg2png(open(svg_tmp_name, 'rb').read(), - write_to=png_file) + svg2png(open(svg_tmp_name, "rb").read(), write_to=png_file) with open(png_name, "rb") as png_file: django_file = File(png_file) attr = "relation_bitmap_image" + suffix @@ -1391,58 +1478,79 @@ def create_default_json_fields(model): content_type = ContentType.objects.get_for_model(model) for key in keys: JsonDataField.objects.get_or_create( - content_type=content_type, key=key, + content_type=content_type, + key=key, defaults={ - 'name': " ".join(key.split('__')).capitalize(), - 'value_type': 'T', - 'display': False - } + "name": " ".join(key.split("__")).capitalize(), + "value_type": "T", + "display": False, + }, ) -def get_urls_for_model(model, views, own=False, autocomplete=False, - ): +def get_urls_for_model( + model, + views, + own=False, + autocomplete=False, +): """ Generate get and show url for a model """ urls = [ - url(r'show-{}(?:/(?P<pk>.+))?/(?P<type>.+)?$'.format(model.SLUG), - check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])( - getattr(views, 'show_' + model.SLUG)), - name="show-" + model.SLUG), - url(r'^display-{}/(?P<pk>.+)/$'.format(model.SLUG), - check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])( - getattr(views, 'display_' + model.SLUG)), - name='display-' + model.SLUG), + url( + r"show-{}(?:/(?P<pk>.+))?/(?P<type>.+)?$".format(model.SLUG), + check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])( + getattr(views, "show_" + model.SLUG) + ), + name="show-" + model.SLUG, + ), + url( + r"^display-{}/(?P<pk>.+)/$".format(model.SLUG), + check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])( + getattr(views, "display_" + model.SLUG) + ), + name="display-" + model.SLUG, + ), ] if own: urls += [ - url(r'get-{}/own/(?P<type>.+)?$'.format(model.SLUG), - check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])( - getattr(views, 'get_' + model.SLUG)), - name="get-own-" + model.SLUG, kwargs={'force_own': True}), + url( + r"get-{}/own/(?P<type>.+)?$".format(model.SLUG), + check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])( + getattr(views, "get_" + model.SLUG) + ), + name="get-own-" + model.SLUG, + kwargs={"force_own": True}, + ), ] urls += [ - url(r'get-{}/(?P<type>.+)?$'.format(model.SLUG), - check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])( - getattr(views, 'get_' + model.SLUG)), - name="get-" + model.SLUG), + url( + r"get-{}/(?P<type>.+)?$".format(model.SLUG), + check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])( + getattr(views, "get_" + model.SLUG) + ), + name="get-" + model.SLUG, + ), ] if autocomplete: urls += [ - url(r'autocomplete-{}/$'.format(model.SLUG), - check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])( - getattr(views, 'autocomplete_' + model.SLUG)), - name='autocomplete-' + model.SLUG), + url( + r"autocomplete-{}/$".format(model.SLUG), + check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])( + getattr(views, "autocomplete_" + model.SLUG) + ), + name="autocomplete-" + model.SLUG, + ), ] return urls def m2m_historization_changed(sender, **kwargs): - obj = kwargs.get('instance', None) + obj = kwargs.get("instance", None) if not obj: return hist_values = obj.history_m2m or {} @@ -1454,11 +1562,11 @@ def m2m_historization_changed(sender, **kwargs): values.append(value.history_compress()) hist_values[attr] = values obj.history_m2m = hist_values - if getattr(obj, 'skip_history_when_saving', False): + if getattr(obj, "skip_history_when_saving", False): # assume the last modifier is good... q = obj.history.filter( history_modifier_id=obj.history_modifier_id, - ).order_by('-history_date', '-history_id') + ).order_by("-history_date", "-history_id") if q.count(): hist = q.all()[0] hist.history_m2m = hist_values @@ -1507,8 +1615,7 @@ def get_broken_links(path): target_path = os.readlink(path) # resolve relative symlinks if not os.path.isabs(target_path): - target_path = os.path.join(os.path.dirname(path), - target_path) + target_path = os.path.join(os.path.dirname(path), target_path) if not os.path.exists(target_path): yield path @@ -1539,12 +1646,12 @@ def simplify_name(full_path_name, check_existing=False, min_len=15): for m in regex.finditer(name): # get the last match match = m if match: - new_name = name.replace(match.group(), '') + new_name = name.replace(match.group(), "") full_new_name = os.sep.join([path, new_name + ext]) try: is_file = os.path.isfile(full_new_name) except UnicodeEncodeError: - is_file = os.path.isfile(full_new_name.encode('utf-8')) + is_file = os.path.isfile(full_new_name.encode("utf-8")) if not check_existing or not is_file: # do not take the place of another file name = new_name[:] @@ -1565,14 +1672,13 @@ def rename_and_simplify_media_name(full_path_name, rename=True): exists = os.path.exists(full_path_name) is_file = os.path.isfile(full_path_name) except UnicodeEncodeError: - full_path_name_unicode = full_path_name.encode('utf-8') + full_path_name_unicode = full_path_name.encode("utf-8") exists = os.path.exists(full_path_name_unicode) is_file = os.path.isfile(full_path_name_unicode) if not exists or not is_file: return full_path_name, False - path, current_name, name = simplify_name(full_path_name, - check_existing=True) + path, current_name, name = simplify_name(full_path_name, check_existing=True) if current_name == name: return full_path_name, False @@ -1595,8 +1701,9 @@ def get_file_fields(): return fields -def get_used_media(exclude=None, limit=None, - return_object_and_field=False, debug=False): +def get_used_media( + exclude=None, limit=None, return_object_and_field=False, debug=False +): """ Get media which are still used in models @@ -1617,28 +1724,32 @@ def get_used_media(exclude=None, limit=None, continue if limit and str(field) not in limit: continue - is_null = {'%s__isnull' % field.name: True} - is_empty = {'%s' % field.name: ''} + is_null = {"%s__isnull" % field.name: True} + is_empty = {"%s" % field.name: ""} storage = field.storage if debug: print("") - q = field.model.objects.values('id', field.name)\ - .exclude(**is_empty).exclude(**is_null) + q = ( + field.model.objects.values("id", field.name) + .exclude(**is_empty) + .exclude(**is_null) + ) ln = q.count() for idx, res in enumerate(q): value = res[field.name] if debug: - sys.stdout.write("* get_used_media {}: {}/{}\r".format( - field, idx, ln)) + sys.stdout.write("* get_used_media {}: {}/{}\r".format(field, idx, ln)) sys.stdout.flush() if value not in EMPTY_VALUES: if return_object_and_field: - media.append(( - field.model.objects.get(pk=res['id']), - field.name, - storage.path(value) - )) + media.append( + ( + field.model.objects.get(pk=res["id"]), + field.name, + storage.path(value), + ) + ) else: media.add(storage.path(value)) return media @@ -1662,15 +1773,17 @@ def get_all_media(exclude=None, debug=False): print("") for idx, name in enumerate(files): if debug: - sys.stdout.write("* get_all_media {} ({}/{}): {}/{}\r".format( - root.encode('utf-8'), idx_main, ln_full, idx, ln)) + sys.stdout.write( + "* get_all_media {} ({}/{}): {}/{}\r".format( + root.encode("utf-8"), idx_main, ln_full, idx, ln + ) + ) sys.stdout.flush() path = os.path.abspath(os.path.join(root, name)) relpath = os.path.relpath(path, settings.MEDIA_ROOT) in_exclude = False for e in exclude: - if re.match(r'^%s$' % re.escape(e).replace('\\*', '.*'), - relpath): + if re.match(r"^%s$" % re.escape(e).replace("\\*", ".*"), relpath): in_exclude = True break @@ -1678,8 +1791,11 @@ def get_all_media(exclude=None, debug=False): media.add(path) else: if debug: - sys.stdout.write("* get_all_media {} ({}/{})\r".format( - root.encode('utf-8'), idx_main, ln_full)) + sys.stdout.write( + "* get_all_media {} ({}/{})\r".format( + root.encode("utf-8"), idx_main, ln_full + ) + ) return media @@ -1763,24 +1879,27 @@ def try_fix_file(filename, make_copy=True, hard=False): """ filename = os.path.abspath(filename) path, current_name, simplified_ref_name = simplify_name( - filename, check_existing=False, min_len=10) + filename, check_existing=False, min_len=10 + ) try: dirs = list(sorted(os.listdir(path))) except UnicodeDecodeError: - dirs = list(sorted(os.listdir(path.encode('utf-8')))) + dirs = list(sorted(os.listdir(path.encode("utf-8")))) # check existing files in the path for f in dirs: - result = _try_copy(path, f, filename, simplified_ref_name, make_copy, - min_len=10) + result = _try_copy( + path, f, filename, simplified_ref_name, make_copy, min_len=10 + ) if result: return result if not hard: return for path, __, files in os.walk(settings.MEDIA_ROOT): for f in files: - result = _try_copy(path, f, filename, simplified_ref_name, - make_copy, min_len=10) + result = _try_copy( + path, f, filename, simplified_ref_name, make_copy, min_len=10 + ) if result: return result @@ -1797,34 +1916,34 @@ PARSE_JINJA_IF = re.compile("{% if ([^}]*)}") def _deduplicate(value): new_values = [] - for v in value.split('-'): + for v in value.split("-"): if v not in new_values: new_values.append(v) - return '-'.join(new_values) + return "-".join(new_values) FORMULA_FILTERS = { - 'upper': lambda x: x.upper(), - 'lower': lambda x: x.lower(), - 'capitalize': lambda x: x.capitalize(), - 'slug': lambda x: slugify(x), - 'deduplicate': _deduplicate + "upper": lambda x: x.upper(), + "lower": lambda x: x.lower(), + "capitalize": lambda x: x.capitalize(), + "slug": lambda x: slugify(x), + "deduplicate": _deduplicate, } def _update_gen_id_dct(item, dct, initial_key, fkey=None, filters=None): if not fkey: fkey = initial_key[:] - if fkey.startswith('settings__'): - dct[fkey] = getattr(settings, fkey[len('settings__'):]) or '' + if fkey.startswith("settings__"): + dct[fkey] = getattr(settings, fkey[len("settings__") :]) or "" return obj = item - for k in fkey.split('__'): + for k in fkey.split("__"): try: obj = getattr(obj, k) except (ObjectDoesNotExist, AttributeError): obj = None - if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager + if hasattr(obj, "all") and hasattr(obj, "count"): # query manager if not obj.count(): break obj = obj.all()[0] @@ -1833,7 +1952,7 @@ def _update_gen_id_dct(item, dct, initial_key, fkey=None, filters=None): if obj is None: break if obj is None: - dct[initial_key] = '' + dct[initial_key] = "" else: dct[initial_key] = str(obj) if filters: @@ -1867,13 +1986,14 @@ def get_generated_id(key, item): new_keys = [] for key in key_list: if key.startswith("not "): - key = key[len("not "):].strip() + key = key[len("not ") :].strip() key = key.split(".")[0] if " % " in key: keys = key.split(" % ")[1] keys = [ i.replace("(", "").replace(")", "").split("|")[0].strip() - for i in keys.split(",")] + for i in keys.split(",") + ] else: keys = [key] new_keys += keys @@ -1884,7 +2004,7 @@ def get_generated_id(key, item): return tpl.render(dct) for fkey in PARSE_FORMULA.findall(formula): - filtered = fkey.split('|') + filtered = fkey.split("|") initial_key = fkey[:] fkey = filtered[0] filters = [] @@ -1892,17 +2012,17 @@ def get_generated_id(key, item): if filtr in FORMULA_FILTERS: filters.append(FORMULA_FILTERS[filtr]) _update_gen_id_dct(item, dct, initial_key, fkey, filters=filters) - values = formula.format(**dct).split('||') + values = formula.format(**dct).split("||") value = values[0] for filtr in values[1:]: if filtr not in FORMULA_FILTERS: - value += '||' + filtr + value += "||" + filtr continue value = FORMULA_FILTERS[filtr](value) return value -PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid') +PRIVATE_FIELDS = ("id", "history_modifier", "order", "uuid") def duplicate_item(item, user=None, data=None): @@ -1924,8 +2044,11 @@ def duplicate_item(item, user=None, data=None): new.save() # m2m fields - m2m = [field.name for field in model._meta.many_to_many - if field.name not in PRIVATE_FIELDS] + m2m = [ + field.name + for field in model._meta.many_to_many + if field.name not in PRIVATE_FIELDS + ] for field in m2m: for val in getattr(item, field).all(): if val not in getattr(new, field).all(): @@ -1935,8 +2058,7 @@ def duplicate_item(item, user=None, data=None): def get_image_path(instance, filename): # when using migrations instance is not a real ImageModel instance - if not hasattr(instance, '_get_image_path'): + if not hasattr(instance, "_get_image_path"): n = datetime.datetime.now() - return "upload/{}/{:02d}/{:02d}/{}".format( - n.year, n.month, n.day, filename) + return "upload/{}/{:02d}/{:02d}/{}".format(n.year, n.month, n.day, filename) return instance._get_image_path(filename) |