summaryrefslogtreecommitdiff
path: root/ishtar_common/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r--ishtar_common/utils.py808
1 files changed, 465 insertions, 343 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index b4542688b..fa24b3dcb 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (C) 2013-2016 Étienne Loks <etienne.loks_AT_peacefrogsDOTnet>
@@ -62,11 +62,19 @@ from django.template.defaultfilters import slugify
if settings.USE_TRANSLATION_OVERLOAD:
- from overload_translation.utils import ugettext_lazy, ugettext, \
- pgettext_lazy, pgettext
+ from overload_translation.utils import (
+ ugettext_lazy,
+ ugettext,
+ pgettext_lazy,
+ pgettext,
+ )
else:
- from django.utils.translation import ugettext_lazy, ugettext, \
- pgettext_lazy, pgettext
+ from django.utils.translation import (
+ ugettext_lazy,
+ ugettext,
+ pgettext_lazy,
+ pgettext,
+ )
_ = ugettext_lazy
@@ -87,6 +95,7 @@ def debug_line_no():
def fake_task(*args):
def fake(func):
return func
+
return fake
@@ -94,6 +103,7 @@ task = fake_task
if settings.USE_BACKGROUND_TASK:
try:
from celery import shared_task
+
task = shared_task
except ModuleNotFoundError:
pass
@@ -103,23 +113,24 @@ class BColors:
"""
Bash colors. Don't forget to finish your colored string with ENDC.
"""
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
+
+ HEADER = "\033[95m"
+ OKBLUE = "\033[94m"
+ OKGREEN = "\033[92m"
+ WARNING = "\033[93m"
+ FAIL = "\033[91m"
+ ENDC = "\033[0m"
+ BOLD = "\033[1m"
+ UNDERLINE = "\033[4m"
class Round(models.Func):
- function = 'ROUND'
+ function = "ROUND"
arity = 2
- arg_joiner = '::numeric, '
+ arg_joiner = "::numeric, "
-CSV_OPTIONS = {'delimiter': ',', 'quotechar': '"', 'quoting': QUOTE_ALL}
+CSV_OPTIONS = {"delimiter": ",", "quotechar": '"', "quoting": QUOTE_ALL}
def is_safe_path(basedir, path, follow_symlinks=True):
@@ -134,22 +145,23 @@ def import_class(full_path_classname):
"""
Return the model class from the full path
"""
- mods = full_path_classname.split('.')
+ mods = full_path_classname.split(".")
if len(mods) == 1:
- mods = ['ishtar_common', 'models', mods[0]]
- elif 'models' not in mods and 'models_finds' not in mods \
- and 'models_treatments' not in mods:
- raise SuspiciousOperation(
- "Try to import a non model from a string")
- module = import_module('.'.join(mods[:-1]))
+ mods = ["ishtar_common", "models", mods[0]]
+ elif (
+ "models" not in mods
+ and "models_finds" not in mods
+ and "models_treatments" not in mods
+ ):
+ raise SuspiciousOperation("Try to import a non model from a string")
+ module = import_module(".".join(mods[:-1]))
model = getattr(module, mods[-1])
if not issubclass(model, models.Model):
- raise SuspiciousOperation(
- "Try to import a non model from a string")
+ raise SuspiciousOperation("Try to import a non model from a string")
return model
-def check_rights(rights=None, redirect_url='/'):
+def check_rights(rights=None, redirect_url="/"):
"""
Decorator that checks the rights to access the view.
"""
@@ -158,25 +170,25 @@ def check_rights(rights=None, redirect_url='/'):
def _wrapped_view(request, *args, **kwargs):
if not rights:
return view_func(request, *args, **kwargs)
- if hasattr(request.user, 'ishtaruser'):
- if request.user.ishtaruser.has_right('administrator',
- request.session):
- kwargs['current_right'] = 'administrator'
+ if hasattr(request.user, "ishtaruser"):
+ if request.user.ishtaruser.has_right("administrator", request.session):
+ kwargs["current_right"] = "administrator"
return view_func(request, *args, **kwargs)
for right in rights:
# be careful to put the more permissive rights first
# if granted it can allow more
- if request.user.ishtaruser.has_right(right,
- request.session):
- kwargs['current_right'] = right
+ if request.user.ishtaruser.has_right(right, request.session):
+ kwargs["current_right"] = right
return view_func(request, *args, **kwargs)
put_session_message(
request.session.session_key,
_("You don't have sufficient permissions to do this action."),
- 'warning'
+ "warning",
)
return HttpResponseRedirect(redirect_url)
+
return _wrapped_view
+
return decorator
@@ -184,14 +196,16 @@ def check_rights_condition(rights):
"""
To be used to check in wizard condition_dict
"""
+
def func(self):
request = self.request
- if request.user.ishtaruser.has_right('administrator', request.session):
+ if request.user.ishtaruser.has_right("administrator", request.session):
return True
for right in rights:
if request.user.ishtaruser.has_right(right, request.session):
return True
return False
+
return func
@@ -211,13 +225,15 @@ def check_model_access_control(request, model, available_perms=None):
return allowed, own
if not available_perms:
- available_perms = ['view_' + model.__name__.lower(),
- 'view_own_' + model.__name__.lower()]
+ available_perms = [
+ "view_" + model.__name__.lower(),
+ "view_own_" + model.__name__.lower(),
+ ]
try:
ishtaruser = request.user.ishtaruser
except request.user._meta.model.ishtaruser.RelatedObjectDoesNotExist:
return False, True
- if ishtaruser.has_right('administrator', session=request.session):
+ if ishtaruser.has_right("administrator", session=request.session):
allowed = True
own = False
return allowed, own
@@ -266,12 +282,12 @@ def move_dict_data(data, key1, key2):
"data__")
:return: result data
"""
- keys1 = key1.split('__')
- keys2 = key2.split('__')
+ keys1 = key1.split("__")
+ keys2 = key2.split("__")
value = data
for idx, key in enumerate(keys1):
if not idx:
- if key != 'data':
+ if key != "data":
return data
continue
if key not in value:
@@ -284,7 +300,7 @@ def move_dict_data(data, key1, key2):
new_value = data
for idx, key in enumerate(keys2):
if not idx:
- if key != 'data':
+ if key != "data":
return data
continue
if idx == (len(keys2) - 1): # last
@@ -333,10 +349,10 @@ def is_downloadable(url):
"""
h = requests.head(url, allow_redirects=True)
header = h.headers
- content_type = header.get('content-type')
- if 'text' in content_type.lower():
+ content_type = header.get("content-type")
+ if "text" in content_type.lower():
return False
- if 'html' in content_type.lower():
+ if "html" in content_type.lower():
return False
return True
@@ -348,21 +364,21 @@ def get_current_year():
def get_cache(cls, extra_args=tuple(), app_label=None):
if not app_label:
app_label = cls._meta.app_label
- cache_key = "{}-{}-{}".format(
- settings.PROJECT_SLUG, app_label, cls.__name__)
+ cache_key = "{}-{}-{}".format(settings.PROJECT_SLUG, app_label, cls.__name__)
for arg in extra_args:
if not arg:
- cache_key += '-0'
+ cache_key += "-0"
else:
if type(arg) == dict:
- cache_key += '-' + "_".join([str(arg[k]) for k in arg])
+ cache_key += "-" + "_".join([str(arg[k]) for k in arg])
elif type(arg) in (list, tuple):
- cache_key += '-' + "_".join([str(v) for v in arg])
+ cache_key += "-" + "_".join([str(v) for v in arg])
else:
- cache_key += '-' + str(arg)
+ cache_key += "-" + str(arg)
cache_key = slugify(cache_key)
- if not cache_key.endswith('_current_keys') \
- and hasattr(cls, '_add_cache_key_to_refresh'):
+ if not cache_key.endswith("_current_keys") and hasattr(
+ cls, "_add_cache_key_to_refresh"
+ ):
cls._add_cache_key_to_refresh(extra_args)
if len(cache_key) >= 250:
m = hashlib.md5()
@@ -372,9 +388,9 @@ def get_cache(cls, extra_args=tuple(), app_label=None):
def force_cached_label_changed(sender, **kwargs):
- if not kwargs.get('instance'):
+ if not kwargs.get("instance"):
return
- kwargs['instance']._cached_label_checked = False
+ kwargs["instance"]._cached_label_checked = False
cached_label_changed(sender, **kwargs)
@@ -384,10 +400,10 @@ class SecretaryRenderer(MainSecretaryRenderer):
Overload _pack_document: obsolete files can be referenced - continue
on null content for files
"""
- self.log.debug('packing document')
+ self.log.debug("packing document")
zip_file = io.BytesIO()
- zipdoc = zipfile.ZipFile(zip_file, 'a')
+ zipdoc = zipfile.ZipFile(zip_file, "a")
for fname, content in files.items():
if isinstance(content, UndefinedSilently):
continue
@@ -395,21 +411,20 @@ class SecretaryRenderer(MainSecretaryRenderer):
zipdoc.writestr(fname, content, zipfile.ZIP_DEFLATED)
else:
zipdoc.writestr(fname, content)
- self.log.debug('Document packing completed')
+ self.log.debug("Document packing completed")
return zip_file
def serialize_args_for_tasks(sender, instance, kwargs, extra_kwargs=None):
- if 'instance' in kwargs:
- kwargs['instance'] = kwargs["instance"].pk
+ if "instance" in kwargs:
+ kwargs["instance"] = kwargs["instance"].pk
sender = (sender._meta.app_label, sender._meta.object_name)
if extra_kwargs:
for kw in extra_kwargs:
if getattr(instance, kw, None):
kwargs[kw] = getattr(instance, kw)
for k in list(kwargs.keys()):
- if k in ["model", "signal",
- "_cached_labels_bulk_update"] or kwargs[k] is None:
+ if k in ["model", "signal", "_cached_labels_bulk_update"] or kwargs[k] is None:
kwargs.pop(k)
continue
if isinstance(kwargs[k], set):
@@ -429,9 +444,9 @@ def deserialize_args_for_tasks(sender, kwargs, extra_kwargs=None):
# waiting for it
while not instance and retried < 6:
if retried:
- time.sleep(.5)
- if sender.objects.filter(pk=kwargs['instance']).count():
- instance = sender.objects.get(pk=kwargs['instance'])
+ time.sleep(0.5)
+ if sender.objects.filter(pk=kwargs["instance"]).count():
+ instance = sender.objects.get(pk=kwargs["instance"])
else:
retried += 1
if not instance:
@@ -444,8 +459,12 @@ def deserialize_args_for_tasks(sender, kwargs, extra_kwargs=None):
EXTRA_KWARGS_TRIGGER = [
- "_cascade_change", "_cached_labels_bulk_update", "skip_history_when_saving",
- "_post_saved_geo", "_search_updated", "_cached_label_checked"
+ "_cascade_change",
+ "_cached_labels_bulk_update",
+ "skip_history_when_saving",
+ "_post_saved_geo",
+ "_search_updated",
+ "_cached_label_checked",
]
@@ -455,11 +474,11 @@ def cached_label_and_geo_changed(sender, **kwargs):
def cached_label_changed(sender, **kwargs):
- if not kwargs.get('instance'):
+ if not kwargs.get("instance"):
return
- instance = kwargs.get('instance')
+ instance = kwargs.get("instance")
- if hasattr(instance, 'test_obj'):
+ if hasattr(instance, "test_obj"):
instance.test_obj.reached(sender, **kwargs)
# cache_key, value = get_cache(
@@ -470,8 +489,11 @@ def cached_label_changed(sender, **kwargs):
# return
# cache.set(cache_key, True, settings.CACHE_TASK_TIMEOUT)
- if not settings.USE_BACKGROUND_TASK or not instance.pk \
- or not sender.objects.filter(pk=instance.pk).count():
+ if (
+ not settings.USE_BACKGROUND_TASK
+ or not instance.pk
+ or not sender.objects.filter(pk=instance.pk).count()
+ ):
# no background task or not yet fully saved
return _cached_label_changed(sender, **kwargs)
@@ -479,34 +501,34 @@ def cached_label_changed(sender, **kwargs):
kwargs["cascade_change"] = True
sender, kwargs = serialize_args_for_tasks(
- sender, instance, kwargs, EXTRA_KWARGS_TRIGGER)
+ sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
+ )
return _cached_label_changed.delay(sender, **kwargs)
@task()
def _cached_label_changed(sender, **kwargs):
- sender, instance = deserialize_args_for_tasks(sender, kwargs,
- EXTRA_KWARGS_TRIGGER)
+ sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER)
if not instance:
return
- force_update = kwargs.get('force_update', False)
+ force_update = kwargs.get("force_update", False)
if hasattr(instance, "need_update") and instance.need_update:
force_update = True
instance.skip_history_when_saving = True
- if not force_update and getattr(instance, '_cached_label_checked', False):
+ if not force_update and getattr(instance, "_cached_label_checked", False):
return
if hasattr(instance, "refresh_cache"):
instance.refresh_cache()
instance._cached_label_checked = True
- cached_labels = ['cached_label']
- if hasattr(instance, 'CACHED_LABELS'):
+ cached_labels = ["cached_label"]
+ if hasattr(instance, "CACHED_LABELS"):
cached_labels = instance.CACHED_LABELS
changed = []
for cached_label in cached_labels:
- gen_func = '_generate_' + cached_label
+ gen_func = "_generate_" + cached_label
if not hasattr(instance, gen_func):
continue
lbl = getattr(instance, gen_func)()
@@ -520,26 +542,23 @@ def _cached_label_changed(sender, **kwargs):
if changed:
instance._search_updated = False
- if hasattr(instance, '_cascade_change') and instance._cascade_change:
+ if hasattr(instance, "_cascade_change") and instance._cascade_change:
instance.skip_history_when_saving = True
- instance.__class__.objects.filter(pk=instance.pk).update(
- **dict(changed))
+ instance.__class__.objects.filter(pk=instance.pk).update(**dict(changed))
if (changed or not cached_labels) and hasattr(instance, "cascade_update"):
instance.cascade_update()
updated = False
- if force_update or hasattr(instance, 'update_search_vector'):
+ if force_update or hasattr(instance, "update_search_vector"):
updated = instance.update_search_vector()
- if hasattr(instance, '_cached_labels_bulk_update'):
+ if hasattr(instance, "_cached_labels_bulk_update"):
updated = instance._cached_labels_bulk_update() or updated
- if not updated and hasattr(instance, '_get_associated_cached_labels'):
+ if not updated and hasattr(instance, "_get_associated_cached_labels"):
for item in instance._get_associated_cached_labels():
item._cascade_change = True
- if hasattr(instance, 'test_obj'):
+ if hasattr(instance, "test_obj"):
item.test_obj = instance.test_obj
cached_label_changed(item.__class__, instance=item)
- cache_key, __ = get_cache(
- sender, ["cached_label_changed", instance.pk]
- )
+ cache_key, __ = get_cache(sender, ["cached_label_changed", instance.pk])
cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT)
if cached_labels:
return getattr(instance, cached_labels[0], "")
@@ -560,10 +579,10 @@ def regenerate_all_cached_labels(model):
def shortify(lbl, number=20):
SHORTIFY_STR = ugettext(" (...)")
if not lbl:
- lbl = ''
+ lbl = ""
if len(lbl) <= number:
return lbl
- return lbl[:number - len(SHORTIFY_STR)] + SHORTIFY_STR
+ return lbl[: number - len(SHORTIFY_STR)] + SHORTIFY_STR
def mode(array):
@@ -578,9 +597,10 @@ def disable_for_loaddata(signal_handler):
@wraps(signal_handler)
def wrapper(*args, **kwargs):
- if kwargs.get('raw'):
+ if kwargs.get("raw"):
return
signal_handler(*args, **kwargs)
+
return wrapper
@@ -588,8 +608,7 @@ def _get_image_link(doc):
from ishtar_common.models import IshtarSiteProfile
# manage missing images
- if not doc.thumbnail or not doc.thumbnail.url or not doc.image \
- or not doc.image.url:
+ if not doc.thumbnail or not doc.thumbnail.url or not doc.image or not doc.image.url:
return ""
item = None
@@ -607,7 +626,8 @@ def _get_image_link(doc):
if item.__class__.__name__ == "ArchaeologicalSite":
item_class_name = str(IshtarSiteProfile.get_default_site_label())
- return mark_safe("""
+ return mark_safe(
+ """
<div class="col col-lg-3">
<div class="card">
<div id="lightgallery-rand-img">
@@ -630,34 +650,38 @@ def _get_image_link(doc):
<script type="text/javascript">
lightGallery(document.getElementById('lightgallery-rand-img'));
</script>""".format(
- doc.image.url,
- doc.thumbnail.url,
- item_class_name,
- str(item),
- reverse(item.SHOW_URL, args=[item.pk, '']),
- str(_("Information")),
- str(_("Load another random image?"))))
+ doc.image.url,
+ doc.thumbnail.url,
+ item_class_name,
+ str(item),
+ reverse(item.SHOW_URL, args=[item.pk, ""]),
+ str(_("Information")),
+ str(_("Load another random image?")),
+ )
+ )
def get_random_item_image_link(request):
from ishtar_common.models import Document
- if not hasattr(request.user, 'ishtaruser'):
- return ''
+ if not hasattr(request.user, "ishtaruser"):
+ return ""
ishtar_user = request.user.ishtaruser
- if not ishtar_user.has_right('ishtar_common.view_document',
- session=request.session):
- return ''
+ if not ishtar_user.has_right(
+ "ishtar_common.view_document", session=request.session
+ ):
+ return ""
- q = Document.objects.filter(
- thumbnail__isnull=False,
- image__isnull=False
- ).exclude(thumbnail='').exclude(image='')
+ q = (
+ Document.objects.filter(thumbnail__isnull=False, image__isnull=False)
+ .exclude(thumbnail="")
+ .exclude(image="")
+ )
total = q.count()
if not total:
- return ''
+ return ""
image_nb = random.randint(0, total - 1)
return _get_image_link(q.all()[image_nb])
@@ -665,9 +689,9 @@ def get_random_item_image_link(request):
def convert_coordinates_to_point(x, y, z=None, srid=4326):
if z:
- geom = GEOSGeometry('POINT({} {} {})'.format(x, y, z), srid=srid)
+ geom = GEOSGeometry("POINT({} {} {})".format(x, y, z), srid=srid)
else:
- geom = GEOSGeometry('POINT({} {})'.format(x, y), srid=srid)
+ geom = GEOSGeometry("POINT({} {})".format(x, y), srid=srid)
if not geom.valid:
raise forms.ValidationError(geom.valid_reason)
return geom
@@ -675,13 +699,13 @@ def convert_coordinates_to_point(x, y, z=None, srid=4326):
def get_srid_obj_from_point(point):
from ishtar_common.models import SpatialReferenceSystem
+
try:
- return SpatialReferenceSystem.objects.get(
- srid=int(point.srid))
+ return SpatialReferenceSystem.objects.get(srid=int(point.srid))
except SpatialReferenceSystem.DoesNotExist:
return SpatialReferenceSystem.objects.create(
srid=int(point.srid),
- auth_name='EPSG',
+ auth_name="EPSG",
label="EPSG-{}".format(point.srid),
txt_idx="epsg-{}".format(point.srid),
)
@@ -691,7 +715,7 @@ def post_save_geo(sender, **kwargs):
"""
Convert raw x, y, z point to real geo field
"""
- if not kwargs.get('instance'):
+ if not kwargs.get("instance"):
return
# cache_key, value = get_cache(
# sender, ["post_save_geo", kwargs['instance'].pk])
@@ -700,13 +724,14 @@ def post_save_geo(sender, **kwargs):
# return
# cache.set(cache_key, True, settings.CACHE_TASK_TIMEOUT)
- instance = kwargs.get('instance')
+ instance = kwargs.get("instance")
if hasattr(instance, "_no_geo_check") and instance._no_geo_check:
return
if not settings.USE_BACKGROUND_TASK:
return _post_save_geo(sender, **kwargs)
- sender, kwargs = serialize_args_for_tasks(sender, instance,
- kwargs, EXTRA_KWARGS_TRIGGER)
+ sender, kwargs = serialize_args_for_tasks(
+ sender, instance, kwargs, EXTRA_KWARGS_TRIGGER
+ )
return _post_save_geo.delay(sender, **kwargs)
@@ -718,18 +743,18 @@ def _post_save_geo(sender, **kwargs):
profile = get_current_profile()
if not profile.mapping:
return
- sender, instance = deserialize_args_for_tasks(sender, kwargs,
- EXTRA_KWARGS_TRIGGER)
+ sender, instance = deserialize_args_for_tasks(sender, kwargs, EXTRA_KWARGS_TRIGGER)
if not instance:
return
kls_name = instance.__class__.__name__
- if not profile.locate_warehouses and ("Container" in kls_name
- or "Warehouse" in kls_name):
+ if not profile.locate_warehouses and (
+ "Container" in kls_name or "Warehouse" in kls_name
+ ):
return
- if getattr(instance, '_post_saved_geo', False):
+ if getattr(instance, "_post_saved_geo", False):
return
# print(sender, "post_save_geo")
@@ -739,24 +764,27 @@ def _post_save_geo(sender, **kwargs):
current_source = str(instance.__class__._meta.verbose_name)
modified = False
- if hasattr(instance, 'multi_polygon') and not getattr(
- instance, "DISABLE_POLYGONS", False):
- if instance.multi_polygon_source_item and \
- instance.multi_polygon_source_item != current_source: # refetch
+ if hasattr(instance, "multi_polygon") and not getattr(
+ instance, "DISABLE_POLYGONS", False
+ ):
+ if (
+ instance.multi_polygon_source_item
+ and instance.multi_polygon_source_item != current_source
+ ): # refetch
instance.multi_polygon = None
instance.multi_polygon_source = None
modified = True
if instance.multi_polygon and not instance.multi_polygon_source:
# should be a db source
- instance.multi_polygon_source = 'P'
+ instance.multi_polygon_source = "P"
instance.multi_polygon_source_item = current_source
- elif instance.multi_polygon_source != 'P':
+ elif instance.multi_polygon_source != "P":
precise_poly = instance.get_precise_polygons()
if precise_poly:
poly, source_item = precise_poly
instance.multi_polygon = poly
- instance.multi_polygon_source = 'P'
+ instance.multi_polygon_source = "P"
instance.multi_polygon_source_item = source_item
modified = True
elif profile.use_town_for_geo:
@@ -765,21 +793,24 @@ def _post_save_geo(sender, **kwargs):
poly, poly_source = poly
if poly != instance.multi_polygon:
instance.multi_polygon_source_item = poly_source
- instance.multi_polygon_source = 'T' # town
+ instance.multi_polygon_source = "T" # town
try:
instance.multi_polygon = poly
modified = True
except TypeError:
print(instance, instance.pk)
- if (instance.point_source_item and
- instance.point_source_item != current_source) or (
- instance.point_source == 'M'): # refetch
+ if (
+ instance.point_source_item and instance.point_source_item != current_source
+ ) or (
+ instance.point_source == "M"
+ ): # refetch
csrs = instance.spatial_reference_system
if instance.x and instance.y:
new_point = GEOSGeometry(
- 'POINT({} {})'.format(instance.x, instance.y), srid=csrs.srid)
+ "POINT({} {})".format(instance.x, instance.y), srid=csrs.srid
+ )
proj_point = instance.point_2d.transform(csrs.srid, clone=True)
if new_point.distance(proj_point) < 0.01:
instance.x, instance.y = None, None
@@ -789,8 +820,9 @@ def _post_save_geo(sender, **kwargs):
point = instance.point
point_2d = instance.point_2d
- if (point or point_2d) and instance.x is None and not \
- instance.point_source: # db source
+ if (
+ (point or point_2d) and instance.x is None and not instance.point_source
+ ): # db source
if point:
current_point = point
instance.z = point.z
@@ -800,41 +832,48 @@ def _post_save_geo(sender, **kwargs):
instance.y = current_point.y
srs = get_srid_obj_from_point(current_point)
instance.spatial_reference_system = srs
- instance.point_source = 'P'
+ instance.point_source = "P"
instance.point_source_item = current_source
if not point_2d:
instance.point_2d = convert_coordinates_to_point(
- instance.point.x, instance.point.y,
- srid=current_point.srid)
+ instance.point.x, instance.point.y, srid=current_point.srid
+ )
modified = True
- elif instance.x and instance.y and \
- instance.spatial_reference_system and \
- instance.spatial_reference_system.auth_name == 'EPSG' and \
- instance.spatial_reference_system.srid != 0:
+ elif (
+ instance.x
+ and instance.y
+ and instance.spatial_reference_system
+ and instance.spatial_reference_system.auth_name == "EPSG"
+ and instance.spatial_reference_system.srid != 0
+ ):
# form input or already precise
try:
point_2d = convert_coordinates_to_point(
- instance.x, instance.y,
- srid=instance.spatial_reference_system.srid)
+ instance.x, instance.y, srid=instance.spatial_reference_system.srid
+ )
except forms.ValidationError:
return # irrelevant data in DB
distance = 1 # arbitrary
if point_2d and instance.point_2d:
- distance = point_2d.transform(
- 4326, clone=True).distance(
- instance.point_2d.transform(4326, clone=True))
+ distance = point_2d.transform(4326, clone=True).distance(
+ instance.point_2d.transform(4326, clone=True)
+ )
if instance.z:
point = convert_coordinates_to_point(
- instance.x, instance.y, instance.z,
- srid=instance.spatial_reference_system.srid)
+ instance.x,
+ instance.y,
+ instance.z,
+ srid=instance.spatial_reference_system.srid,
+ )
# no change if distance inf to 1 mm
- if distance >= 0.0001 and (point_2d != instance.point_2d
- or point != instance.point):
+ if distance >= 0.0001 and (
+ point_2d != instance.point_2d or point != instance.point
+ ):
instance.point = point
instance.point_2d = point_2d
- instance.point_source = 'P'
+ instance.point_source = "P"
instance.point_source_item = current_source
modified = True
else:
@@ -845,7 +884,7 @@ def _post_save_geo(sender, **kwargs):
point_2d, point, source_item = precise_points
instance.point_2d = point_2d
instance.point = point
- instance.point_source = 'P'
+ instance.point_source = "P"
instance.point_source_item = source_item
instance.x = point_2d.x
instance.y = point_2d.y
@@ -856,16 +895,16 @@ def _post_save_geo(sender, **kwargs):
modified = True
else:
centroid, source, point_source = None, None, None
- if instance.multi_polygon and instance.multi_polygon_source == 'P':
+ if instance.multi_polygon and instance.multi_polygon_source == "P":
source = current_source
centroid = instance.multi_polygon.centroid
- point_source = 'M'
+ point_source = "M"
if not centroid and profile.use_town_for_geo: # try to get from
# parent
town_centroid = instance.get_town_centroid()
if town_centroid:
centroid, source = town_centroid
- point_source = 'T'
+ point_source = "T"
if centroid:
instance.point_2d, instance.point_source_item = centroid, source
instance.point = None
@@ -892,34 +931,37 @@ def _post_save_geo(sender, **kwargs):
instance.save()
if hasattr(instance, "cascade_update"):
instance.cascade_update()
- cache_key, __ = get_cache(
- sender, ["post_save_geo", instance.pk]
- )
+ cache_key, __ = get_cache(sender, ["post_save_geo", instance.pk])
cache.set(cache_key, None, settings.CACHE_TASK_TIMEOUT)
return
-def create_slug(model, name, slug_attr='slug', max_length=100):
+def create_slug(model, name, slug_attr="slug", max_length=100):
base_slug = slugify(name)
slug = base_slug[:max_length]
final_slug = None
idx = 1
while not final_slug:
- if slug and not model.objects.filter(**{slug_attr:slug}).exists():
+ if slug and not model.objects.filter(**{slug_attr: slug}).exists():
final_slug = slug
break
- slug = base_slug[:(max_length - 1 - len(str(idx)))] + "-" + str(idx)
+ slug = base_slug[: (max_length - 1 - len(str(idx)))] + "-" + str(idx)
idx += 1
return final_slug
def get_all_field_names(model):
- return list(set(chain.from_iterable(
- (field.name, field.attname) if hasattr(field, 'attname') else (
- field.name,)
- for field in model._meta.get_fields()
- if not (field.many_to_one and field.related_model is None)
- )))
+ return list(
+ set(
+ chain.from_iterable(
+ (field.name, field.attname)
+ if hasattr(field, "attname")
+ else (field.name,)
+ for field in model._meta.get_fields()
+ if not (field.many_to_one and field.related_model is None)
+ )
+ )
+ )
def get_all_related_m2m_objects_with_model(model):
@@ -932,16 +974,17 @@ def get_all_related_m2m_objects_with_model(model):
def get_all_related_many_to_many_objects(model):
return [
- f for f in model._meta.get_fields(include_hidden=True)
+ f
+ for f in model._meta.get_fields(include_hidden=True)
if f.many_to_many and f.auto_created
]
def get_all_related_objects(model):
return [
- f for f in model._meta.get_fields()
- if (f.one_to_many or f.one_to_one)
- and f.auto_created and not f.concrete
+ f
+ for f in model._meta.get_fields()
+ if (f.one_to_many or f.one_to_one) and f.auto_created and not f.concrete
]
@@ -972,24 +1015,23 @@ def merge_tsvectors(vectors):
current_position = max_position
for dct_member in vector.split(" "):
- splitted = dct_member.split(':')
+ splitted = dct_member.split(":")
key = ":".join(splitted[:-1])
positions = splitted[-1]
key = key[1:-1] # remove quotes
- positions = [int(pos) + current_position
- for pos in positions.split(',')]
+ positions = [int(pos) + current_position for pos in positions.split(",")]
if key in result_dict:
result_dict[key] += positions
else:
result_dict[key] = positions
# {'lamelie': [1, 42, 5]} => {'lamelie': "1,42,5"}
- result_dict = {k: ",".join([str(val) for val in result_dict[k]])
- for k in result_dict}
+ result_dict = {
+ k: ",".join([str(val) for val in result_dict[k]]) for k in result_dict
+ }
# {'lamelie': "1,5", "hagarde": "2", "regarde": "4"} =>
# "'lamelie':1,5 'hagarde':2 'regarde':4"
- result = " ".join(["'{}':{}".format(k, result_dict[k])
- for k in result_dict])
+ result = " ".join(["'{}':{}".format(k, result_dict[k]) for k in result_dict])
return result
@@ -997,10 +1039,10 @@ def merge_tsvectors(vectors):
def put_session_message(session_key, message, message_type):
session = SessionStore(session_key=session_key)
messages = []
- if 'messages' in session:
- messages = session['messages'][:]
+ if "messages" in session:
+ messages = session["messages"][:]
messages.append((str(message), message_type))
- session['messages'] = messages
+ session["messages"] = messages
session.save()
@@ -1019,7 +1061,7 @@ def get_session_var(session_key, key):
def clean_session_cache(session):
# clean session cache
- cache_key_list = 'sessionlist-{}'.format(session.session_key)
+ cache_key_list = "sessionlist-{}".format(session.session_key)
key_list = cache.get(cache_key_list, [])
for key in key_list:
cache.set(key, None, settings.CACHE_TIMEOUT)
@@ -1039,7 +1081,7 @@ def get_field_labels_from_path(model, path):
except:
labels.append(key)
continue
- if hasattr(field, 'verbose_name'):
+ if hasattr(field, "verbose_name"):
labels.append(field.verbose_name)
else:
labels.append(key)
@@ -1051,19 +1093,20 @@ def create_default_areas(models=None, verbose=False):
if not models:
from ishtar_common.models import Area, Town, Department, State
else:
- Area = models['area']
- Town = models['town']
- Department = models['department']
- State = models['state']
+ Area = models["area"]
+ Town = models["town"]
+ Department = models["department"]
+ State = models["state"]
areas = {}
idx = 0
for state in State.objects.all():
- slug = 'state-' + slugify(state.label)
+ slug = "state-" + slugify(state.label)
area, created = Area.objects.get_or_create(
- txt_idx=slug, defaults={'label': state.label})
- areas['state-{}'.format(state.pk)] = area
+ txt_idx=slug, defaults={"label": state.label}
+ )
+ areas["state-{}".format(state.pk)] = area
if created:
idx += 1
if verbose:
@@ -1071,15 +1114,16 @@ def create_default_areas(models=None, verbose=False):
idx, idx2 = 0, 0
for dep in Department.objects.all():
- slug = 'dep-' + slugify(dep.label)
+ slug = "dep-" + slugify(dep.label)
area, created = Area.objects.get_or_create(
- txt_idx=slug, defaults={'label': dep.label})
- areas['dep-' + dep.number] = area
+ txt_idx=slug, defaults={"label": dep.label}
+ )
+ areas["dep-" + dep.number] = area
if created:
idx += 1
if not dep.state_id:
continue
- state_slug = 'state-{}'.format(dep.state_id)
+ state_slug = "state-{}".format(dep.state_id)
if state_slug not in areas:
continue
if area.parent and area.parent.pk == areas[state_slug].pk:
@@ -1090,15 +1134,16 @@ def create_default_areas(models=None, verbose=False):
if verbose:
print(
"* {} department areas added with {} associations to state".format(
- idx, idx2)
+ idx, idx2
+ )
)
idx = 0
for town in Town.objects.all():
if not town.numero_insee or len(town.numero_insee) != 5:
continue
- code_dep = 'dep-' + town.numero_insee[:2]
- code_dep_dom = 'dep-' + town.numero_insee[:3]
+ code_dep = "dep-" + town.numero_insee[:2]
+ code_dep_dom = "dep-" + town.numero_insee[:3]
if code_dep in areas:
if not areas[code_dep].towns.filter(pk=town.pk).count():
areas[code_dep].towns.add(town)
@@ -1112,9 +1157,17 @@ def create_default_areas(models=None, verbose=False):
print("* {} town associated to department area".format(idx))
-def get_relations_for_graph(rel_model, obj_pk, above_relations=None,
- equal_relations=None, treated=None, styles=None,
- render_above=True, render_below=True, full=False):
+def get_relations_for_graph(
+ rel_model,
+ obj_pk,
+ above_relations=None,
+ equal_relations=None,
+ treated=None,
+ styles=None,
+ render_above=True,
+ render_below=True,
+ full=False,
+):
"""
Get all above and equal relations of an object (get all child and parent
relations)
@@ -1144,37 +1197,53 @@ def get_relations_for_graph(rel_model, obj_pk, above_relations=None,
treated.append(obj_pk)
for q, inverse in (
- (rel_model.objects.filter(
- left_record_id=obj_pk,
- relation_type__logical_relation__isnull=False), False),
- (rel_model.objects.filter(
- right_record_id=obj_pk,
- relation_type__logical_relation__isnull=False), True)):
- q = q.values("left_record_id",'right_record_id',
- 'relation_type__logical_relation')
+ (
+ rel_model.objects.filter(
+ left_record_id=obj_pk, relation_type__logical_relation__isnull=False
+ ),
+ False,
+ ),
+ (
+ rel_model.objects.filter(
+ right_record_id=obj_pk, relation_type__logical_relation__isnull=False
+ ),
+ True,
+ ),
+ ):
+ q = q.values(
+ "left_record_id", "right_record_id", "relation_type__logical_relation"
+ )
get_above, get_below = render_above, render_below
if inverse and (not render_above or not render_below):
get_above, get_below = not render_above, not render_below
for relation in q.all():
- logical_relation = relation['relation_type__logical_relation']
- left_record = relation['left_record_id']
- right_record = relation['right_record_id']
+ logical_relation = relation["relation_type__logical_relation"]
+ left_record = relation["left_record_id"]
+ right_record = relation["right_record_id"]
is_above, is_below = False, False
if not logical_relation:
continue
- elif get_below and logical_relation == 'above' and \
- (left_record, right_record) not in above_relations:
+ elif (
+ get_below
+ and logical_relation == "above"
+ and (left_record, right_record) not in above_relations
+ ):
above_relations.append((left_record, right_record))
is_below = True
- elif get_above and logical_relation == 'below' and \
- (right_record, left_record) not in above_relations:
+ elif (
+ get_above
+ and logical_relation == "below"
+ and (right_record, left_record) not in above_relations
+ ):
above_relations.append((right_record, left_record))
is_above = True
- elif logical_relation == 'equal' and \
- (right_record, left_record) not in equal_relations and \
- (left_record, right_record) not in equal_relations:
+ elif (
+ logical_relation == "equal"
+ and (right_record, left_record) not in equal_relations
+ and (left_record, right_record) not in equal_relations
+ ):
equal_relations.append((left_record, right_record))
else:
continue
@@ -1183,27 +1252,40 @@ def get_relations_for_graph(rel_model, obj_pk, above_relations=None,
other_record = left_record
else:
other_record = right_record
- if get_above and get_below and not full and (is_below or
- is_above):
+ if get_above and get_below and not full and (is_below or is_above):
if (is_above and not inverse) or (is_below and inverse):
ar, er, substyles = get_relations_for_graph(
- rel_model, other_record, above_relations,
- equal_relations, treated, styles,
+ rel_model,
+ other_record,
+ above_relations,
+ equal_relations,
+ treated,
+ styles,
render_above=True,
- render_below=False
+ render_below=False,
)
else:
ar, er, substyles = get_relations_for_graph(
- rel_model, other_record, above_relations,
- equal_relations, treated, styles,
+ rel_model,
+ other_record,
+ above_relations,
+ equal_relations,
+ treated,
+ styles,
render_above=False,
- render_below=True
+ render_below=True,
)
else:
ar, er, substyles = get_relations_for_graph(
- rel_model, other_record, above_relations, equal_relations,
- treated, styles, render_above=render_above,
- render_below=render_below, full=full
+ rel_model,
+ other_record,
+ above_relations,
+ equal_relations,
+ treated,
+ styles,
+ render_above=render_above,
+ render_below=render_below,
+ full=full,
)
styles.update(substyles)
error_style = "color=red"
@@ -1234,19 +1316,28 @@ def get_relations_for_graph(rel_model, obj_pk, above_relations=None,
return above_relations, equal_relations, styles
-def generate_relation_graph(obj, highlight_current=True,
- render_above=True, render_below=True,
- full=False, debug=False):
+def generate_relation_graph(
+ obj,
+ highlight_current=True,
+ render_above=True,
+ render_below=True,
+ full=False,
+ debug=False,
+):
if not settings.DOT_BINARY:
return
model = obj.__class__
- rel_model = model._meta.get_field('right_relations').related_model
+ rel_model = model._meta.get_field("right_relations").related_model
# get relations
above_relations, equal_relations, styles = get_relations_for_graph(
- rel_model, obj.pk, render_above=render_above,
- render_below=render_below, full=full)
+ rel_model,
+ obj.pk,
+ render_above=render_above,
+ render_below=render_below,
+ full=full,
+ )
# generate dotfile
dot_str = "digraph relations {\nnode [shape=box];\n"
@@ -1258,13 +1349,10 @@ def generate_relation_graph(obj, highlight_current=True,
if highlight_current:
style += ',style=filled,fillcolor="#C6C0C0"'
dot_str += 'item{}[{},href="{}"];\n'.format(
- obj.pk, style,
- reverse('display-item',
- args=[model.SLUG, obj.pk])
+ obj.pk, style, reverse("display-item", args=[model.SLUG, obj.pk])
)
rel_str += "}\n"
- for list, directed in ((above_relations, True),
- (equal_relations, False)):
+ for list, directed in ((above_relations, True), (equal_relations, False)):
if directed:
rel_str += "subgraph Dir {\n"
else:
@@ -1277,8 +1365,7 @@ def generate_relation_graph(obj, highlight_current=True,
if left.pk == obj.pk and highlight_current:
style += ',style=filled,fillcolor="#C6C0C0"'
dot_str += 'item{}[{},href="{}"];\n'.format(
- left.pk, style,
- reverse('display-item', args=[model.SLUG, left.pk])
+ left.pk, style, reverse("display-item", args=[model.SLUG, left.pk])
)
if right_pk not in described:
described.append(right_pk)
@@ -1287,22 +1374,24 @@ def generate_relation_graph(obj, highlight_current=True,
if right.pk == obj.pk and highlight_current:
style += ',style=filled,fillcolor="#C6C0C0"'
dot_str += 'item{}[{},href="{}"];\n'.format(
- right.pk, style,
- reverse('display-item', args=[model.SLUG, right.pk])
+ right.pk,
+ style,
+ reverse("display-item", args=[model.SLUG, right.pk]),
)
if not directed: # on the same level
rel_str += "{{rank = same; item{}; item{};}}\n".format(
- left_pk, right_pk)
+ left_pk, right_pk
+ )
style = ""
if (left_pk, right_pk) in styles:
style = " [{}]".format(", ".join(styles[(left_pk, right_pk)]))
- rel_str += 'item{} -> item{}{};\n'.format(left_pk, right_pk, style)
+ rel_str += "item{} -> item{}{};\n".format(left_pk, right_pk, style)
rel_str += "}\n"
dot_str += rel_str + "\n}"
tempdir = tempfile.mkdtemp("-ishtardot")
dot_name = tempdir + os.path.sep + "relations.dot"
- with open(dot_name, 'w') as dot_file:
+ with open(dot_name, "w") as dot_file:
dot_file.write(dot_str)
if not render_above:
@@ -1312,8 +1401,7 @@ def generate_relation_graph(obj, highlight_current=True,
else:
suffix = ""
- if full and obj.MAIN_UP_MODEL_QUERY and getattr(obj,
- obj.MAIN_UP_MODEL_QUERY):
+ if full and obj.MAIN_UP_MODEL_QUERY and getattr(obj, obj.MAIN_UP_MODEL_QUERY):
obj = getattr(obj, obj.MAIN_UP_MODEL_QUERY)
with open(dot_name, "r") as dot_file:
@@ -1353,8 +1441,7 @@ def generate_relation_graph(obj, highlight_current=True,
png_name = tempdir + os.path.sep + "relations.png"
with open(png_name, "wb") as png_file:
- svg2png(open(svg_tmp_name, 'rb').read(),
- write_to=png_file)
+ svg2png(open(svg_tmp_name, "rb").read(), write_to=png_file)
with open(png_name, "rb") as png_file:
django_file = File(png_file)
attr = "relation_bitmap_image" + suffix
@@ -1391,58 +1478,79 @@ def create_default_json_fields(model):
content_type = ContentType.objects.get_for_model(model)
for key in keys:
JsonDataField.objects.get_or_create(
- content_type=content_type, key=key,
+ content_type=content_type,
+ key=key,
defaults={
- 'name': " ".join(key.split('__')).capitalize(),
- 'value_type': 'T',
- 'display': False
- }
+ "name": " ".join(key.split("__")).capitalize(),
+ "value_type": "T",
+ "display": False,
+ },
)
-def get_urls_for_model(model, views, own=False, autocomplete=False,
- ):
+def get_urls_for_model(
+ model,
+ views,
+ own=False,
+ autocomplete=False,
+):
"""
Generate get and show url for a model
"""
urls = [
- url(r'show-{}(?:/(?P<pk>.+))?/(?P<type>.+)?$'.format(model.SLUG),
- check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])(
- getattr(views, 'show_' + model.SLUG)),
- name="show-" + model.SLUG),
- url(r'^display-{}/(?P<pk>.+)/$'.format(model.SLUG),
- check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])(
- getattr(views, 'display_' + model.SLUG)),
- name='display-' + model.SLUG),
+ url(
+ r"show-{}(?:/(?P<pk>.+))?/(?P<type>.+)?$".format(model.SLUG),
+ check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])(
+ getattr(views, "show_" + model.SLUG)
+ ),
+ name="show-" + model.SLUG,
+ ),
+ url(
+ r"^display-{}/(?P<pk>.+)/$".format(model.SLUG),
+ check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])(
+ getattr(views, "display_" + model.SLUG)
+ ),
+ name="display-" + model.SLUG,
+ ),
]
if own:
urls += [
- url(r'get-{}/own/(?P<type>.+)?$'.format(model.SLUG),
- check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])(
- getattr(views, 'get_' + model.SLUG)),
- name="get-own-" + model.SLUG, kwargs={'force_own': True}),
+ url(
+ r"get-{}/own/(?P<type>.+)?$".format(model.SLUG),
+ check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])(
+ getattr(views, "get_" + model.SLUG)
+ ),
+ name="get-own-" + model.SLUG,
+ kwargs={"force_own": True},
+ ),
]
urls += [
- url(r'get-{}/(?P<type>.+)?$'.format(model.SLUG),
- check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])(
- getattr(views, 'get_' + model.SLUG)),
- name="get-" + model.SLUG),
+ url(
+ r"get-{}/(?P<type>.+)?$".format(model.SLUG),
+ check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])(
+ getattr(views, "get_" + model.SLUG)
+ ),
+ name="get-" + model.SLUG,
+ ),
]
if autocomplete:
urls += [
- url(r'autocomplete-{}/$'.format(model.SLUG),
- check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])(
- getattr(views, 'autocomplete_' + model.SLUG)),
- name='autocomplete-' + model.SLUG),
+ url(
+ r"autocomplete-{}/$".format(model.SLUG),
+ check_rights(["view_" + model.SLUG, "view_own_" + model.SLUG])(
+ getattr(views, "autocomplete_" + model.SLUG)
+ ),
+ name="autocomplete-" + model.SLUG,
+ ),
]
return urls
def m2m_historization_changed(sender, **kwargs):
- obj = kwargs.get('instance', None)
+ obj = kwargs.get("instance", None)
if not obj:
return
hist_values = obj.history_m2m or {}
@@ -1454,11 +1562,11 @@ def m2m_historization_changed(sender, **kwargs):
values.append(value.history_compress())
hist_values[attr] = values
obj.history_m2m = hist_values
- if getattr(obj, 'skip_history_when_saving', False):
+ if getattr(obj, "skip_history_when_saving", False):
# assume the last modifier is good...
q = obj.history.filter(
history_modifier_id=obj.history_modifier_id,
- ).order_by('-history_date', '-history_id')
+ ).order_by("-history_date", "-history_id")
if q.count():
hist = q.all()[0]
hist.history_m2m = hist_values
@@ -1507,8 +1615,7 @@ def get_broken_links(path):
target_path = os.readlink(path)
# resolve relative symlinks
if not os.path.isabs(target_path):
- target_path = os.path.join(os.path.dirname(path),
- target_path)
+ target_path = os.path.join(os.path.dirname(path), target_path)
if not os.path.exists(target_path):
yield path
@@ -1539,12 +1646,12 @@ def simplify_name(full_path_name, check_existing=False, min_len=15):
for m in regex.finditer(name): # get the last match
match = m
if match:
- new_name = name.replace(match.group(), '')
+ new_name = name.replace(match.group(), "")
full_new_name = os.sep.join([path, new_name + ext])
try:
is_file = os.path.isfile(full_new_name)
except UnicodeEncodeError:
- is_file = os.path.isfile(full_new_name.encode('utf-8'))
+ is_file = os.path.isfile(full_new_name.encode("utf-8"))
if not check_existing or not is_file:
# do not take the place of another file
name = new_name[:]
@@ -1565,14 +1672,13 @@ def rename_and_simplify_media_name(full_path_name, rename=True):
exists = os.path.exists(full_path_name)
is_file = os.path.isfile(full_path_name)
except UnicodeEncodeError:
- full_path_name_unicode = full_path_name.encode('utf-8')
+ full_path_name_unicode = full_path_name.encode("utf-8")
exists = os.path.exists(full_path_name_unicode)
is_file = os.path.isfile(full_path_name_unicode)
if not exists or not is_file:
return full_path_name, False
- path, current_name, name = simplify_name(full_path_name,
- check_existing=True)
+ path, current_name, name = simplify_name(full_path_name, check_existing=True)
if current_name == name:
return full_path_name, False
@@ -1595,8 +1701,9 @@ def get_file_fields():
return fields
-def get_used_media(exclude=None, limit=None,
- return_object_and_field=False, debug=False):
+def get_used_media(
+ exclude=None, limit=None, return_object_and_field=False, debug=False
+):
"""
Get media which are still used in models
@@ -1617,28 +1724,32 @@ def get_used_media(exclude=None, limit=None,
continue
if limit and str(field) not in limit:
continue
- is_null = {'%s__isnull' % field.name: True}
- is_empty = {'%s' % field.name: ''}
+ is_null = {"%s__isnull" % field.name: True}
+ is_empty = {"%s" % field.name: ""}
storage = field.storage
if debug:
print("")
- q = field.model.objects.values('id', field.name)\
- .exclude(**is_empty).exclude(**is_null)
+ q = (
+ field.model.objects.values("id", field.name)
+ .exclude(**is_empty)
+ .exclude(**is_null)
+ )
ln = q.count()
for idx, res in enumerate(q):
value = res[field.name]
if debug:
- sys.stdout.write("* get_used_media {}: {}/{}\r".format(
- field, idx, ln))
+ sys.stdout.write("* get_used_media {}: {}/{}\r".format(field, idx, ln))
sys.stdout.flush()
if value not in EMPTY_VALUES:
if return_object_and_field:
- media.append((
- field.model.objects.get(pk=res['id']),
- field.name,
- storage.path(value)
- ))
+ media.append(
+ (
+ field.model.objects.get(pk=res["id"]),
+ field.name,
+ storage.path(value),
+ )
+ )
else:
media.add(storage.path(value))
return media
@@ -1662,15 +1773,17 @@ def get_all_media(exclude=None, debug=False):
print("")
for idx, name in enumerate(files):
if debug:
- sys.stdout.write("* get_all_media {} ({}/{}): {}/{}\r".format(
- root.encode('utf-8'), idx_main, ln_full, idx, ln))
+ sys.stdout.write(
+ "* get_all_media {} ({}/{}): {}/{}\r".format(
+ root.encode("utf-8"), idx_main, ln_full, idx, ln
+ )
+ )
sys.stdout.flush()
path = os.path.abspath(os.path.join(root, name))
relpath = os.path.relpath(path, settings.MEDIA_ROOT)
in_exclude = False
for e in exclude:
- if re.match(r'^%s$' % re.escape(e).replace('\\*', '.*'),
- relpath):
+ if re.match(r"^%s$" % re.escape(e).replace("\\*", ".*"), relpath):
in_exclude = True
break
@@ -1678,8 +1791,11 @@ def get_all_media(exclude=None, debug=False):
media.add(path)
else:
if debug:
- sys.stdout.write("* get_all_media {} ({}/{})\r".format(
- root.encode('utf-8'), idx_main, ln_full))
+ sys.stdout.write(
+ "* get_all_media {} ({}/{})\r".format(
+ root.encode("utf-8"), idx_main, ln_full
+ )
+ )
return media
@@ -1763,24 +1879,27 @@ def try_fix_file(filename, make_copy=True, hard=False):
"""
filename = os.path.abspath(filename)
path, current_name, simplified_ref_name = simplify_name(
- filename, check_existing=False, min_len=10)
+ filename, check_existing=False, min_len=10
+ )
try:
dirs = list(sorted(os.listdir(path)))
except UnicodeDecodeError:
- dirs = list(sorted(os.listdir(path.encode('utf-8'))))
+ dirs = list(sorted(os.listdir(path.encode("utf-8"))))
# check existing files in the path
for f in dirs:
- result = _try_copy(path, f, filename, simplified_ref_name, make_copy,
- min_len=10)
+ result = _try_copy(
+ path, f, filename, simplified_ref_name, make_copy, min_len=10
+ )
if result:
return result
if not hard:
return
for path, __, files in os.walk(settings.MEDIA_ROOT):
for f in files:
- result = _try_copy(path, f, filename, simplified_ref_name,
- make_copy, min_len=10)
+ result = _try_copy(
+ path, f, filename, simplified_ref_name, make_copy, min_len=10
+ )
if result:
return result
@@ -1797,34 +1916,34 @@ PARSE_JINJA_IF = re.compile("{% if ([^}]*)}")
def _deduplicate(value):
new_values = []
- for v in value.split('-'):
+ for v in value.split("-"):
if v not in new_values:
new_values.append(v)
- return '-'.join(new_values)
+ return "-".join(new_values)
FORMULA_FILTERS = {
- 'upper': lambda x: x.upper(),
- 'lower': lambda x: x.lower(),
- 'capitalize': lambda x: x.capitalize(),
- 'slug': lambda x: slugify(x),
- 'deduplicate': _deduplicate
+ "upper": lambda x: x.upper(),
+ "lower": lambda x: x.lower(),
+ "capitalize": lambda x: x.capitalize(),
+ "slug": lambda x: slugify(x),
+ "deduplicate": _deduplicate,
}
def _update_gen_id_dct(item, dct, initial_key, fkey=None, filters=None):
if not fkey:
fkey = initial_key[:]
- if fkey.startswith('settings__'):
- dct[fkey] = getattr(settings, fkey[len('settings__'):]) or ''
+ if fkey.startswith("settings__"):
+ dct[fkey] = getattr(settings, fkey[len("settings__") :]) or ""
return
obj = item
- for k in fkey.split('__'):
+ for k in fkey.split("__"):
try:
obj = getattr(obj, k)
except (ObjectDoesNotExist, AttributeError):
obj = None
- if hasattr(obj, 'all') and hasattr(obj, 'count'): # query manager
+ if hasattr(obj, "all") and hasattr(obj, "count"): # query manager
if not obj.count():
break
obj = obj.all()[0]
@@ -1833,7 +1952,7 @@ def _update_gen_id_dct(item, dct, initial_key, fkey=None, filters=None):
if obj is None:
break
if obj is None:
- dct[initial_key] = ''
+ dct[initial_key] = ""
else:
dct[initial_key] = str(obj)
if filters:
@@ -1867,13 +1986,14 @@ def get_generated_id(key, item):
new_keys = []
for key in key_list:
if key.startswith("not "):
- key = key[len("not "):].strip()
+ key = key[len("not ") :].strip()
key = key.split(".")[0]
if " % " in key:
keys = key.split(" % ")[1]
keys = [
i.replace("(", "").replace(")", "").split("|")[0].strip()
- for i in keys.split(",")]
+ for i in keys.split(",")
+ ]
else:
keys = [key]
new_keys += keys
@@ -1884,7 +2004,7 @@ def get_generated_id(key, item):
return tpl.render(dct)
for fkey in PARSE_FORMULA.findall(formula):
- filtered = fkey.split('|')
+ filtered = fkey.split("|")
initial_key = fkey[:]
fkey = filtered[0]
filters = []
@@ -1892,17 +2012,17 @@ def get_generated_id(key, item):
if filtr in FORMULA_FILTERS:
filters.append(FORMULA_FILTERS[filtr])
_update_gen_id_dct(item, dct, initial_key, fkey, filters=filters)
- values = formula.format(**dct).split('||')
+ values = formula.format(**dct).split("||")
value = values[0]
for filtr in values[1:]:
if filtr not in FORMULA_FILTERS:
- value += '||' + filtr
+ value += "||" + filtr
continue
value = FORMULA_FILTERS[filtr](value)
return value
-PRIVATE_FIELDS = ('id', 'history_modifier', 'order', 'uuid')
+PRIVATE_FIELDS = ("id", "history_modifier", "order", "uuid")
def duplicate_item(item, user=None, data=None):
@@ -1924,8 +2044,11 @@ def duplicate_item(item, user=None, data=None):
new.save()
# m2m fields
- m2m = [field.name for field in model._meta.many_to_many
- if field.name not in PRIVATE_FIELDS]
+ m2m = [
+ field.name
+ for field in model._meta.many_to_many
+ if field.name not in PRIVATE_FIELDS
+ ]
for field in m2m:
for val in getattr(item, field).all():
if val not in getattr(new, field).all():
@@ -1935,8 +2058,7 @@ def duplicate_item(item, user=None, data=None):
def get_image_path(instance, filename):
# when using migrations instance is not a real ImageModel instance
- if not hasattr(instance, '_get_image_path'):
+ if not hasattr(instance, "_get_image_path"):
n = datetime.datetime.now()
- return "upload/{}/{:02d}/{:02d}/{}".format(
- n.year, n.month, n.day, filename)
+ return "upload/{}/{:02d}/{:02d}/{}".format(n.year, n.month, n.day, filename)
return instance._get_image_path(filename)