summaryrefslogtreecommitdiff
path: root/ishtar_common/models_common.py
diff options
context:
space:
mode:
Diffstat (limited to 'ishtar_common/models_common.py')
-rw-r--r--ishtar_common/models_common.py1390
1 files changed, 799 insertions, 591 deletions
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py
index e0685c064..7a0397c36 100644
--- a/ishtar_common/models_common.py
+++ b/ishtar_common/models_common.py
@@ -37,20 +37,30 @@ from django.db.models.signals import post_save, post_delete, m2m_changed
from django.template.defaultfilters import slugify
from django.utils.safestring import SafeText, mark_safe
from django.utils.translation import activate, deactivate
-from ishtar_common.utils import ugettext_lazy as _, \
- pgettext_lazy, get_image_path
+from ishtar_common.utils import ugettext_lazy as _, pgettext_lazy, get_image_path
from simple_history.models import HistoricalRecords as BaseHistoricalRecords
-from simple_history.signals import post_create_historical_record, \
- pre_create_historical_record
+from simple_history.signals import (
+ post_create_historical_record,
+ pre_create_historical_record,
+)
from unidecode import unidecode
from ishtar_common.model_managers import TypeManager
from ishtar_common.model_merging import merge_model_objects
from ishtar_common.models_imports import Import
from ishtar_common.templatetags.link_to_window import simple_link_to_window
-from ishtar_common.utils import get_cache, disable_for_loaddata, \
- get_all_field_names, merge_tsvectors, cached_label_changed, post_save_geo, \
- task, duplicate_item, get_generated_id, get_current_profile
+from ishtar_common.utils import (
+ get_cache,
+ disable_for_loaddata,
+ get_all_field_names,
+ merge_tsvectors,
+ cached_label_changed,
+ post_save_geo,
+ task,
+ duplicate_item,
+ get_generated_id,
+ get_current_profile,
+)
logger = logging.getLogger(__name__)
@@ -73,7 +83,7 @@ class CachedGen(object):
@classmethod
def _add_cache_key_to_refresh(cls, keys):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
if type(current_keys) != list:
current_keys = []
if keys not in current_keys:
@@ -82,17 +92,17 @@ class CachedGen(object):
class Cached(CachedGen):
- slug_field = 'txt_idx'
+ slug_field = "txt_idx"
@classmethod
def refresh_cache(cls):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
if not current_keys:
return
for keys in current_keys:
- if len(keys) == 2 and keys[0] == '__slug':
+ if len(keys) == 2 and keys[0] == "__slug":
cls.get_cache(keys[1], force=True)
- elif keys[0] == '__get_types':
+ elif keys[0] == "__get_types":
default = None
empty_first = True
exclude = []
@@ -102,14 +112,17 @@ class Cached(CachedGen):
empty_first = bool(keys.pop())
exclude = keys[1:]
cls.get_types(
- exclude=exclude, empty_first=empty_first, default=default,
- force=True)
- elif keys[0] == '__get_help':
+ exclude=exclude,
+ empty_first=empty_first,
+ default=default,
+ force=True,
+ )
+ elif keys[0] == "__get_help":
cls.get_help(force=True)
@classmethod
def _add_cache_key_to_refresh(cls, keys):
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
if type(current_keys) != list:
current_keys = []
if keys not in current_keys:
@@ -118,7 +131,7 @@ class Cached(CachedGen):
@classmethod
def get_cache(cls, slug, force=False):
- cache_key, value = get_cache(cls, ['__slug', slug])
+ cache_key, value = get_cache(cls, ["__slug", slug])
if not force and value:
return value
try:
@@ -140,14 +153,18 @@ class GeneralType(Cached, models.Model):
"""
Abstract class for "types"
"""
+
label = models.TextField(_("Label"))
txt_idx = models.TextField(
- _("Textual ID"), validators=[validate_slug],
+ _("Textual ID"),
+ validators=[validate_slug],
unique=True,
help_text=_(
"The slug is the standardized version of the name. It contains "
"only lowercase letters, numbers and hyphens. Each slug must "
- "be unique."))
+ "be unique."
+ ),
+ )
comment = models.TextField(_("Comment"), blank=True, default="")
available = models.BooleanField(_("Available"), default=True)
HELP_TEXT = ""
@@ -170,16 +187,20 @@ class GeneralType(Cached, models.Model):
"""
Used for automatic documentation generation
"""
- s = "**label** {}, **txt_idx** {}".format(str(_("Label")),
- str(_("Textual ID")))
+ s = "**label** {}, **txt_idx** {}".format(str(_("Label")), str(_("Textual ID")))
if hasattr(cls, "extra_documentation_string"):
s += cls.extra_documentation_string()
return s
@classmethod
def admin_url(cls):
- return str(reverse('admin:{}_{}_changelist'.format(
- cls._meta.app_label, cls._meta.model_name)))
+ return str(
+ reverse(
+ "admin:{}_{}_changelist".format(
+ cls._meta.app_label, cls._meta.model_name
+ )
+ )
+ )
@classmethod
def history_decompress(cls, value, create=False):
@@ -199,7 +220,7 @@ class GeneralType(Cached, models.Model):
@classmethod
def create_default_for_test(cls):
- return [cls.objects.create(label='Test %d' % i) for i in range(5)]
+ return [cls.objects.create(label="Test %d" % i) for i in range(5)]
@property
def short_label(self):
@@ -210,7 +231,7 @@ class GeneralType(Cached, models.Model):
return self.label
@classmethod
- def get_or_create(cls, slug, label=''):
+ def get_or_create(cls, slug, label=""):
"""
Get or create a new item.
@@ -225,7 +246,8 @@ class GeneralType(Cached, models.Model):
if item:
return item
item, created = cls.objects.get_or_create(
- txt_idx=slug, defaults={'label': label})
+ txt_idx=slug, defaults={"label": label}
+ )
return item
@classmethod
@@ -260,9 +282,9 @@ class GeneralType(Cached, models.Model):
dct = {}
if not exclude:
exclude = []
- keys = ['__get_help']
+ keys = ["__get_help"]
keys += ["{}".format(ex) for ex in exclude]
- keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
+ keys += ["{}-{}".format(str(k), dct[k]) for k in dct]
cache_key, value = get_cache(cls, keys)
if value and not force:
return mark_safe(value)
@@ -270,11 +292,11 @@ class GeneralType(Cached, models.Model):
c_rank = -1
help_items = "\n"
for item in cls.get_types(dct=dct, instances=True, exclude=exclude):
- if hasattr(item, '__iter__'):
+ if hasattr(item, "__iter__"):
pk = item[0]
item = cls.objects.get(pk=pk)
item.rank = c_rank + 1
- if hasattr(item, 'parent'):
+ if hasattr(item, "parent"):
c_item = item
parents = []
while c_item.parent:
@@ -291,11 +313,13 @@ class GeneralType(Cached, models.Model):
help_items += "<dl>\n"
c_rank = item.rank
help_items += "<dt>%s</dt><dd>%s</dd>" % (
- item.label, "<br/>".join(item.comment.split('\n')))
+ item.label,
+ "<br/>".join(item.comment.split("\n")),
+ )
c_rank += 1
if c_rank:
help_items += c_rank * "</dl>"
- if help_text or help_items != '\n':
+ if help_text or help_items != "\n":
help_text = help_text + help_items
else:
help_text = ""
@@ -327,19 +351,27 @@ class GeneralType(Cached, models.Model):
return new_vals
@classmethod
- def get_types(cls, dct=None, instances=False, exclude=None,
- empty_first=True, default=None, initial=None, force=False,
- full_hierarchy=False):
+ def get_types(
+ cls,
+ dct=None,
+ instances=False,
+ exclude=None,
+ empty_first=True,
+ default=None,
+ initial=None,
+ force=False,
+ full_hierarchy=False,
+ ):
if not dct:
dct = {}
if not exclude:
exclude = []
types = []
if not instances and empty_first and not default:
- types = [('', '--')]
- types += cls._pre_get_types(dct, instances, exclude,
- default, force,
- get_full_hierarchy=full_hierarchy)
+ types = [("", "--")]
+ types += cls._pre_get_types(
+ dct, instances, exclude, default, force, get_full_hierarchy=full_hierarchy
+ )
if not initial:
return types
new_vals = cls._get_initial_types(initial, [idx for idx, lbl in types])
@@ -347,8 +379,15 @@ class GeneralType(Cached, models.Model):
return types
@classmethod
- def _pre_get_types(cls, dct=None, instances=False, exclude=None,
- default=None, force=False, get_full_hierarchy=False):
+ def _pre_get_types(
+ cls,
+ dct=None,
+ instances=False,
+ exclude=None,
+ default=None,
+ force=False,
+ get_full_hierarchy=False,
+ ):
if not dct:
dct = {}
if not exclude:
@@ -356,31 +395,42 @@ class GeneralType(Cached, models.Model):
# cache
cache_key = None
if not instances:
- keys = ['__get_types']
- keys += ["{}".format(ex) for ex in exclude] + \
- ["{}".format(default)]
- keys += ['{}-{}'.format(str(k), dct[k]) for k in dct]
+ keys = ["__get_types"]
+ keys += ["{}".format(ex) for ex in exclude] + ["{}".format(default)]
+ keys += ["{}-{}".format(str(k), dct[k]) for k in dct]
cache_key, value = get_cache(cls, keys)
if value and not force:
return value
base_dct = dct.copy()
- if hasattr(cls, 'parent'):
+ if hasattr(cls, "parent"):
if not cache_key:
return cls._get_parent_types(
- base_dct, instances, exclude=exclude,
- default=default, get_full_hierarchy=get_full_hierarchy)
- vals = [v for v in cls._get_parent_types(
- base_dct, instances, exclude=exclude,
- default=default, get_full_hierarchy=get_full_hierarchy)]
+ base_dct,
+ instances,
+ exclude=exclude,
+ default=default,
+ get_full_hierarchy=get_full_hierarchy,
+ )
+ vals = [
+ v
+ for v in cls._get_parent_types(
+ base_dct,
+ instances,
+ exclude=exclude,
+ default=default,
+ get_full_hierarchy=get_full_hierarchy,
+ )
+ ]
cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
return vals
if not cache_key:
- return cls._get_types(base_dct, instances, exclude=exclude,
- default=default)
+ return cls._get_types(base_dct, instances, exclude=exclude, default=default)
vals = [
- v for v in cls._get_types(base_dct, instances, exclude=exclude,
- default=default)
+ v
+ for v in cls._get_types(
+ base_dct, instances, exclude=exclude, default=default
+ )
]
cache.set(cache_key, vals, settings.CACHE_TIMEOUT)
return vals
@@ -391,7 +441,7 @@ class GeneralType(Cached, models.Model):
dct = {}
if not exclude:
exclude = []
- dct['available'] = True
+ dct["available"] = True
if default:
try:
default = cls.objects.get(txt_idx=default)
@@ -400,7 +450,7 @@ class GeneralType(Cached, models.Model):
pass
items = cls.objects.filter(**dct)
if default and default != "None":
- if hasattr(default, 'txt_idx'):
+ if hasattr(default, "txt_idx"):
exclude.append(default.txt_idx)
else:
exclude.append(default)
@@ -411,7 +461,7 @@ class GeneralType(Cached, models.Model):
item.rank = 0
yield item
else:
- yield (item.pk, _(str(item)) if item and str(item) else '')
+ yield (item.pk, _(str(item)) if item and str(item) else "")
@classmethod
def _get_childs_list(cls, dct=None, exclude=None, instances=False):
@@ -419,13 +469,13 @@ class GeneralType(Cached, models.Model):
dct = {}
if not exclude:
exclude = []
- if 'parent' in dct:
- dct.pop('parent')
+ if "parent" in dct:
+ dct.pop("parent")
childs = cls.objects.filter(**dct)
if exclude:
childs = childs.exclude(txt_idx__in=exclude)
- if hasattr(cls, 'order'):
- childs = childs.order_by('order')
+ if hasattr(cls, "order"):
+ childs = childs.order_by("order")
res = {}
if instances:
for item in childs.all():
@@ -450,8 +500,16 @@ class GeneralType(Cached, models.Model):
PREFIX_CODES = ["\u2502", "\u251C", "\u2514"]
@classmethod
- def _get_childs(cls, item, child_list, prefix=0, instances=False,
- is_last=False, last_of=None, get_full_hierarchy=False):
+ def _get_childs(
+ cls,
+ item,
+ child_list,
+ prefix=0,
+ instances=False,
+ is_last=False,
+ last_of=None,
+ get_full_hierarchy=False,
+ ):
if not last_of:
last_of = []
@@ -465,7 +523,7 @@ class GeneralType(Cached, models.Model):
full_hierarchy_initial = get_full_hierarchy
for idx, child in enumerate(current_child_lst):
mylast_of = last_of[:]
- p = ''
+ p = ""
if instances:
child.rank = prefix
lst.append(child)
@@ -495,9 +553,7 @@ class GeneralType(Cached, models.Model):
p += cls.PREFIX_EMPTY
else:
p += cls.PREFIX
- lst.append((
- child[0], SafeText(p + str(_(child[1])))
- ))
+ lst.append((child[0], SafeText(p + str(_(child[1])))))
clast_of = last_of[:]
clast_of.append(idx + 1 == total)
if instances:
@@ -512,20 +568,31 @@ class GeneralType(Cached, models.Model):
else:
get_full_hierarchy = child[1]
for sub_child in cls._get_childs(
- child_id, child_list, prefix, instances,
- is_last=((idx + 1) == total), last_of=clast_of,
- get_full_hierarchy=get_full_hierarchy):
+ child_id,
+ child_list,
+ prefix,
+ instances,
+ is_last=((idx + 1) == total),
+ last_of=clast_of,
+ get_full_hierarchy=get_full_hierarchy,
+ ):
lst.append(sub_child)
return lst
@classmethod
- def _get_parent_types(cls, dct=None, instances=False, exclude=None,
- default=None, get_full_hierarchy=False):
+ def _get_parent_types(
+ cls,
+ dct=None,
+ instances=False,
+ exclude=None,
+ default=None,
+ get_full_hierarchy=False,
+ ):
if not dct:
dct = {}
if not exclude:
exclude = []
- dct['available'] = True
+ dct["available"] = True
child_list = cls._get_childs_list(dct, exclude, instances)
if 0 in child_list:
@@ -540,8 +607,11 @@ class GeneralType(Cached, models.Model):
if get_full_hierarchy:
get_full_hierarchy = item[1]
for child in cls._get_childs(
- item_id, child_list, instances=instances,
- get_full_hierarchy=get_full_hierarchy):
+ item_id,
+ child_list,
+ instances=instances,
+ get_full_hierarchy=get_full_hierarchy,
+ ):
yield child
def save(self, *args, **kwargs):
@@ -551,8 +621,7 @@ class GeneralType(Cached, models.Model):
if isinstance(txt_idx, list):
txt_idx = txt_idx[0]
self.txt_idx = txt_idx
- self.label = " ".join(" ".join(self.txt_idx.split('-'))
- .split('_')).title()
+ self.label = " ".join(" ".join(self.txt_idx.split("-")).split("_")).title()
if not self.txt_idx:
self.txt_idx = slugify(self.label)[:100]
@@ -562,34 +631,36 @@ class GeneralType(Cached, models.Model):
content_type = ContentType.objects.get_for_model(self.__class__)
if slugify(self.label) != slugify(old.label):
ItemKey.objects.filter(
- object_id=self.pk, key=slugify(old.label),
- content_type=content_type).delete()
+ object_id=self.pk, key=slugify(old.label), content_type=content_type
+ ).delete()
if self.txt_idx != old.txt_idx:
ItemKey.objects.filter(
- object_id=self.pk, key=old.txt_idx,
- content_type=content_type).delete()
+ object_id=self.pk, key=old.txt_idx, content_type=content_type
+ ).delete()
obj = super(GeneralType, self).save(*args, **kwargs)
self.generate_key(force=True)
return obj
- def add_key(self, key, force=False, importer=None, group=None,
- user=None):
+ def add_key(self, key, force=False, importer=None, group=None, user=None):
ItemKey = apps.get_model("ishtar_common", "ItemKey")
content_type = ContentType.objects.get_for_model(self.__class__)
- if not importer and not force and ItemKey.objects.filter(
- key=key, content_type=content_type).count():
+ if (
+ not importer
+ and not force
+ and ItemKey.objects.filter(key=key, content_type=content_type).count()
+ ):
return
- filtr = {'key': key, 'content_type': content_type}
+ filtr = {"key": key, "content_type": content_type}
if group:
- filtr['group'] = group
+ filtr["group"] = group
elif user:
- filtr['user'] = user
+ filtr["user"] = user
else:
- filtr['importer'] = importer
+ filtr["importer"] = importer
if force:
ItemKey.objects.filter(**filtr).exclude(object_id=self.pk).delete()
- filtr['object_id'] = self.pk
+ filtr["object_id"] = self.pk
ItemKey.objects.get_or_create(**filtr)
def generate_key(self, force=False):
@@ -601,16 +672,14 @@ class GeneralType(Cached, models.Model):
keys = [self.txt_idx]
content_type = ContentType.objects.get_for_model(self.__class__)
base_q = Q(content_type=content_type, object_id=self.pk)
- subquery = Q(importer__isnull=True, user__isnull=True,
- group__isnull=True)
- subquery |= Q(user__isnull=True, group__isnull=True,
- importer=importer)
+ subquery = Q(importer__isnull=True, user__isnull=True, group__isnull=True)
+ subquery |= Q(user__isnull=True, group__isnull=True, importer=importer)
if importer.user:
- subquery |= Q(user=importer.user, group__isnull=True,
- importer=importer)
+ subquery |= Q(user=importer.user, group__isnull=True, importer=importer)
if importer.associated_group:
- subquery |= Q(user__isnull=True, group=importer.associated_group,
- importer=importer)
+ subquery |= Q(
+ user__isnull=True, group=importer.associated_group, importer=importer
+ )
q = ItemKey.objects.filter(base_q & subquery)
for ik in q.exclude(key=self.txt_idx).all():
keys.append(ik.key)
@@ -624,9 +693,13 @@ class GeneralType(Cached, models.Model):
class HierarchicalType(GeneralType):
- parent = models.ForeignKey('self', blank=True, null=True,
- on_delete=models.SET_NULL,
- verbose_name=_("Parent"))
+ parent = models.ForeignKey(
+ "self",
+ blank=True,
+ null=True,
+ on_delete=models.SET_NULL,
+ verbose_name=_("Parent"),
+ )
class Meta:
abstract = True
@@ -665,16 +738,13 @@ class StatisticItem:
class TemplateItem:
@classmethod
def _label_templates_q(cls):
- model_name = "{}.{}".format(
- cls.__module__, cls.__name__)
- q = Q(associated_model__klass=model_name,
- for_labels=True, available=True)
- alt_model_name = model_name.replace(
- "models_finds", "models").replace(
- "models_treatments", "models")
+ model_name = "{}.{}".format(cls.__module__, cls.__name__)
+ q = Q(associated_model__klass=model_name, for_labels=True, available=True)
+ alt_model_name = model_name.replace("models_finds", "models").replace(
+ "models_treatments", "models"
+ )
if alt_model_name != model_name:
- q |= Q(associated_model__klass=model_name,
- for_labels=True, available=True)
+ q |= Q(associated_model__klass=model_name, for_labels=True, available=True)
DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
return DocumentTemplate.objects.filter(q)
@@ -695,33 +765,35 @@ class TemplateItem:
if "models_finds" in name or "models_treatments" in name:
names = [
name,
- name.replace("models_finds", "models"
- ).replace("models_treatments", "models")
+ name.replace("models_finds", "models").replace(
+ "models_treatments", "models"
+ ),
]
else:
- names = [name, name.replace("models", "models_finds"),
- name.replace("models", "models_treatments")]
+ names = [
+ name,
+ name.replace("models", "models_finds"),
+ name.replace("models", "models_treatments"),
+ ]
else:
names = [name]
- model_names = [
- "{}.{}".format(module, name) for name in names
- ]
+ model_names = ["{}.{}".format(module, name) for name in names]
DocumentTemplate = apps.get_model("ishtar_common", "DocumentTemplate")
q = DocumentTemplate.objects.filter(
- associated_model__klass__in=model_names,
- for_labels=False, available=True)
+ associated_model__klass__in=model_names, for_labels=False, available=True
+ )
for template in q.all():
urlname = "generate-document"
templates.append(
- (template.name, reverse(
- urlname, args=[template.slug, self.pk]))
+ (template.name, reverse(urlname, args=[template.slug, self.pk]))
)
return templates
class FullSearch(models.Model):
- search_vector = SearchVectorField(_("Search vector"), blank=True, null=True,
- help_text=_("Auto filled at save"))
+ search_vector = SearchVectorField(
+ _("Search vector"), blank=True, null=True, help_text=_("Auto filled at save")
+ )
EXTRA_REQUEST_KEYS = {}
DYNAMIC_REQUESTS = {}
@@ -742,7 +814,7 @@ class FullSearch(models.Model):
def general_types(cls):
for k in get_all_field_names(cls):
field = cls._meta.get_field(k)
- if not hasattr(field, 'rel') or not field.rel:
+ if not hasattr(field, "rel") or not field.rel:
continue
rel_model = field.rel.to
if issubclass(rel_model, (GeneralType, HierarchicalType)):
@@ -768,8 +840,9 @@ class FullSearch(models.Model):
def _update_search_field(self, search_vector_conf, search_vectors, data):
for value in search_vector_conf.format(data):
with connection.cursor() as cursor:
- cursor.execute("SELECT to_tsvector(%s, %s)", [
- search_vector_conf.language, value])
+ cursor.execute(
+ "SELECT to_tsvector(%s, %s)", [search_vector_conf.language, value]
+ )
row = cursor.fetchone()
search_vectors.append(row[0])
@@ -782,20 +855,22 @@ class FullSearch(models.Model):
:param save: True if you want to save the object immediately
:return: True if modified
"""
- if not hasattr(self, 'search_vector'):
+ if not hasattr(self, "search_vector"):
return
if not self.pk:
# logger.warning("Cannot update search vector before save or "
# "after deletion.")
return
- if not self.BASE_SEARCH_VECTORS and not self.M2M_SEARCH_VECTORS \
- and not self.INT_SEARCH_VECTORS \
- and not self.PROPERTY_SEARCH_VECTORS \
- and not self.PARENT_SEARCH_VECTORS:
- logger.warning("No search_vectors defined for {}".format(
- self.__class__))
+ if (
+ not self.BASE_SEARCH_VECTORS
+ and not self.M2M_SEARCH_VECTORS
+ and not self.INT_SEARCH_VECTORS
+ and not self.PROPERTY_SEARCH_VECTORS
+ and not self.PARENT_SEARCH_VECTORS
+ ):
+ logger.warning("No search_vectors defined for {}".format(self.__class__))
return
- if getattr(self, '_search_updated', None):
+ if getattr(self, "_search_updated", None):
return
JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
self._search_updated = True
@@ -808,30 +883,29 @@ class FullSearch(models.Model):
# many to many have to be queried one by one otherwise only one is fetch
for m2m_search_vector in self.M2M_SEARCH_VECTORS:
- key = m2m_search_vector.key.split('__')[0]
+ key = m2m_search_vector.key.split("__")[0]
rel_key = getattr(self, key)
- for item in rel_key.values('pk').all():
- query_dct = {key + "__pk": item['pk']}
+ for item in rel_key.values("pk").all():
+ query_dct = {key + "__pk": item["pk"]}
q = copy.copy(base_q).filter(**query_dct)
q = q.annotate(
search=SearchVector(
- m2m_search_vector.key,
- config=m2m_search_vector.language)
- ).values('search')
- search_vectors.append(q.all()[0]['search'])
+ m2m_search_vector.key, config=m2m_search_vector.language
+ )
+ ).values("search")
+ search_vectors.append(q.all()[0]["search"])
# int/float are not well managed by the SearchVector
for int_search_vector in self.INT_SEARCH_VECTORS:
q = base_q.values(int_search_vector.key)
- for val in int_search_vector.format(
- q.all()[0][int_search_vector.key]):
+ for val in int_search_vector.format(q.all()[0][int_search_vector.key]):
self._update_search_number_field(search_vectors, val)
if not exclude_parent:
# copy parent vector fields
for PARENT_SEARCH_VECTOR in self.PARENT_SEARCH_VECTORS:
parent = getattr(self, PARENT_SEARCH_VECTOR)
- if hasattr(parent, 'all'): # m2m
+ if hasattr(parent, "all"): # m2m
for p in parent.all():
search_vectors.append(p.search_vector)
elif parent:
@@ -839,7 +913,7 @@ class FullSearch(models.Model):
for PARENT_ONLY_SEARCH_VECTOR in self.PARENT_ONLY_SEARCH_VECTORS:
parent = getattr(self, PARENT_ONLY_SEARCH_VECTOR)
- if hasattr(parent, 'all'): # m2m
+ if hasattr(parent, "all"): # m2m
for p in parent.all():
search_vectors.append(
p.update_search_vector(save=False, exclude_parent=True)
@@ -856,8 +930,7 @@ class FullSearch(models.Model):
for base_search_vector in self.BASE_SEARCH_VECTORS:
data = res[base_search_vector.key]
data = unidecode(str(data))
- self._update_search_field(base_search_vector,
- search_vectors, data)
+ self._update_search_field(base_search_vector, search_vectors, data)
if self.PROPERTY_SEARCH_VECTORS:
for property_search_vector in self.PROPERTY_SEARCH_VECTORS:
@@ -867,17 +940,16 @@ class FullSearch(models.Model):
if not data:
continue
data = str(data)
- self._update_search_field(property_search_vector,
- search_vectors, data)
+ self._update_search_field(property_search_vector, search_vectors, data)
- if hasattr(self, 'data') and self.data:
+ if hasattr(self, "data") and self.data:
content_type = ContentType.objects.get_for_model(self)
for json_field in JsonDataField.objects.filter(
- content_type=content_type,
- search_index=True).all():
+ content_type=content_type, search_index=True
+ ).all():
data = copy.deepcopy(self.data)
no_data = False
- for key in json_field.key.split('__'):
+ for key in json_field.key.split("__"):
if key not in data:
no_data = True
break
@@ -885,22 +957,21 @@ class FullSearch(models.Model):
if no_data or not data:
continue
- if json_field.value_type == 'B':
+ if json_field.value_type == "B":
if data is True:
data = json_field.name
else:
continue
- elif json_field.value_type in ('I', 'F'):
+ elif json_field.value_type in ("I", "F"):
self._update_search_number_field(search_vectors, data)
continue
- elif json_field.value_type == 'D':
+ elif json_field.value_type == "D":
# only index year
self._update_search_number_field(search_vectors, data.year)
continue
for lang in ("simple", settings.ISHTAR_SEARCH_LANGUAGE):
with connection.cursor() as cursor:
- cursor.execute("SELECT to_tsvector(%s, %s)",
- [lang, data])
+ cursor.execute("SELECT to_tsvector(%s, %s)", [lang, data])
row = cursor.fetchone()
search_vectors.append(row[0])
new_search_vector = merge_tsvectors(search_vectors)
@@ -908,7 +979,8 @@ class FullSearch(models.Model):
self.search_vector = new_search_vector
if save and changed:
self.__class__.objects.filter(pk=self.pk).update(
- search_vector=new_search_vector)
+ search_vector=new_search_vector
+ )
elif not save:
return new_search_vector
return changed
@@ -916,8 +988,8 @@ class FullSearch(models.Model):
class Imported(models.Model):
imports = models.ManyToManyField(
- Import, blank=True,
- related_name="imported_%(app_label)s_%(class)s")
+ Import, blank=True, related_name="imported_%(app_label)s_%(class)s"
+ )
class Meta:
abstract = True
@@ -941,18 +1013,24 @@ class JsonData(models.Model, CachedGen):
except ContentType.DoesNotExists:
return sections
JsonDataField = apps.get_model("ishtar_common", "JsonDataField")
- fields = list(JsonDataField.objects.filter(
- content_type=content_type, display=True, section__isnull=True
- ).all()) # no section fields
-
- fields += list(JsonDataField.objects.filter(
- content_type=content_type, display=True, section__isnull=False
- ).order_by('section__order', 'order').all())
+ fields = list(
+ JsonDataField.objects.filter(
+ content_type=content_type, display=True, section__isnull=True
+ ).all()
+ ) # no section fields
+
+ fields += list(
+ JsonDataField.objects.filter(
+ content_type=content_type, display=True, section__isnull=False
+ )
+ .order_by("section__order", "order")
+ .all()
+ )
for field in fields:
value = None
data = self.data.copy()
- for key in field.key.split('__'):
+ for key in field.key.split("__"):
if key in data:
value = copy.copy(data[key])
data = data[key]
@@ -972,14 +1050,14 @@ class JsonData(models.Model, CachedGen):
@classmethod
def refresh_cache(cls):
- __, refreshed = get_cache(cls, ['cache_refreshed'])
+ __, refreshed = get_cache(cls, ["cache_refreshed"])
if refreshed and time.time() - refreshed < 1:
return
- cache_ckey, current_keys = get_cache(cls, ['_current_keys'])
+ cache_ckey, current_keys = get_cache(cls, ["_current_keys"])
if not current_keys:
return
for keys in current_keys:
- if keys[0] == '__get_dynamic_choices':
+ if keys[0] == "__get_dynamic_choices":
cls._get_dynamic_choices(keys[1], force=True)
@classmethod
@@ -990,18 +1068,19 @@ class JsonData(models.Model, CachedGen):
:param force: if set to True do not use cache
:return: tuple of choices (id, value)
"""
- cache_key, value = get_cache(cls, ['__get_dynamic_choices', key])
+ cache_key, value = get_cache(cls, ["__get_dynamic_choices", key])
if not force and value:
return value
choices = set()
- splitted_key = key[len('data__'):].split('__')
- q = cls.objects.filter(
- data__has_key=key[len('data__'):]).values_list('data', flat=True)
+ splitted_key = key[len("data__") :].split("__")
+ q = cls.objects.filter(data__has_key=key[len("data__") :]).values_list(
+ "data", flat=True
+ )
for value in q.all():
for k in splitted_key:
value = value[k]
choices.add(value)
- choices = [('', '')] + [(v, v) for v in sorted(list(choices))]
+ choices = [("", "")] + [(v, v) for v in sorted(list(choices))]
cache.set(cache_key, choices, settings.CACHE_SMALLTIMEOUT)
return choices
@@ -1022,8 +1101,9 @@ class FixAssociated:
expected_values = [expected_values]
if hasattr(ctype, "txt_idx"):
try:
- expected_values = [ctype.objects.get(txt_idx=v)
- for v in expected_values]
+ expected_values = [
+ ctype.objects.get(txt_idx=v) for v in expected_values
+ ]
except ctype.DoesNotExist:
# type not yet initialized
return
@@ -1066,8 +1146,9 @@ class CascasdeUpdate:
class SearchAltName(object):
- def __init__(self, search_key, search_query, extra_query=None,
- distinct_query=False):
+ def __init__(
+ self, search_key, search_query, extra_query=None, distinct_query=False
+ ):
self.search_key = search_key
self.search_query = search_query
self.extra_query = extra_query or {}
@@ -1083,8 +1164,17 @@ class HistoryError(Exception):
class HistoricalRecords(BaseHistoricalRecords):
- def _save_historic(self, manager, instance, history_date, history_type,
- history_user, history_change_reason, using, attrs):
+ def _save_historic(
+ self,
+ manager,
+ instance,
+ history_date,
+ history_type,
+ history_user,
+ history_change_reason,
+ using,
+ attrs,
+ ):
history_instance = manager.model(
history_date=history_date,
history_type=history_type,
@@ -1117,98 +1207,132 @@ class HistoricalRecords(BaseHistoricalRecords):
def create_historical_record(self, instance, history_type, using=None):
try:
- history_modifier = getattr(instance, 'history_modifier', None)
+ history_modifier = getattr(instance, "history_modifier", None)
assert history_modifier
except (User.DoesNotExist, AssertionError):
# on batch removing of users, user could have disappeared
return
- history_date = getattr(instance, "_history_date",
- datetime.datetime.now())
+ history_date = getattr(instance, "_history_date", datetime.datetime.now())
history_change_reason = getattr(instance, "changeReason", None)
force = getattr(instance, "_force_history", False)
manager = getattr(instance, self.manager_name)
attrs = {}
for field in instance._meta.fields:
attrs[field.attname] = getattr(instance, field.attname)
- q_history = instance.history \
- .filter(history_modifier_id=history_modifier.pk) \
- .order_by('-history_date', '-history_id')
+ q_history = instance.history.filter(
+ history_modifier_id=history_modifier.pk
+ ).order_by("-history_date", "-history_id")
# instance.skip_history_when_saving = True
if not q_history.count():
if force:
- delattr(instance, '_force_history')
+ delattr(instance, "_force_history")
self._save_historic(
- manager, instance, history_date, history_type, history_modifier,
- history_change_reason, using, attrs)
+ manager,
+ instance,
+ history_date,
+ history_type,
+ history_modifier,
+ history_change_reason,
+ using,
+ attrs,
+ )
return
old_instance = q_history.all()[0]
# multiple saving by the same user in a very short time are generaly
# caused by post_save signals it is not relevant to keep them
- min_history_date = datetime.datetime.now() \
- - datetime.timedelta(seconds=5)
- q = q_history.filter(history_date__isnull=False,
- history_date__gt=min_history_date) \
- .order_by('-history_date', '-history_id')
+ min_history_date = datetime.datetime.now() - datetime.timedelta(seconds=5)
+ q = q_history.filter(
+ history_date__isnull=False, history_date__gt=min_history_date
+ ).order_by("-history_date", "-history_id")
if not force and q.count():
return
if force:
- delattr(instance, '_force_history')
+ delattr(instance, "_force_history")
# record a new version only if data have been changed
for field in instance._meta.fields:
if getattr(old_instance, field.attname) != attrs[field.attname]:
- self._save_historic(manager, instance, history_date,
- history_type, history_modifier,
- history_change_reason, using, attrs)
+ self._save_historic(
+ manager,
+ instance,
+ history_date,
+ history_type,
+ history_modifier,
+ history_change_reason,
+ using,
+ attrs,
+ )
return
-class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
- JsonData, FixAssociated, CascasdeUpdate):
+class BaseHistorizedItem(
+ StatisticItem,
+ TemplateItem,
+ FullSearch,
+ Imported,
+ JsonData,
+ FixAssociated,
+ CascasdeUpdate,
+):
"""
Historized item with external ID management.
All historized items are searchable and have a data json field.
Historized items can be "locked" for edition.
"""
+
IS_BASKET = False
SHOW_URL = None
- EXTERNAL_ID_KEY = ''
+ EXTERNAL_ID_KEY = ""
EXTERNAL_ID_DEPENDENCIES = []
HISTORICAL_M2M = []
history_modifier = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Last editor"), blank=True, null=True)
+ User,
+ related_name="+",
+ on_delete=models.SET_NULL,
+ verbose_name=_("Last editor"),
+ blank=True,
+ null=True,
+ )
history_creator = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Creator"), blank=True, null=True)
+ User,
+ related_name="+",
+ on_delete=models.SET_NULL,
+ verbose_name=_("Creator"),
+ blank=True,
+ null=True,
+ )
last_modified = models.DateTimeField(auto_now=True)
history_m2m = JSONField(default={}, blank=True)
- need_update = models.BooleanField(
- verbose_name=_("Need update"), default=False)
+ need_update = models.BooleanField(verbose_name=_("Need update"), default=False)
locked = models.BooleanField(
- verbose_name=_("Item locked for edition"), default=False)
+ verbose_name=_("Item locked for edition"), default=False
+ )
lock_user = models.ForeignKey(
- User, related_name='+', on_delete=models.SET_NULL,
- verbose_name=_("Locked by"), blank=True, null=True)
+ User,
+ related_name="+",
+ on_delete=models.SET_NULL,
+ verbose_name=_("Locked by"),
+ blank=True,
+ null=True,
+ )
ALT_NAMES = {
- 'history_creator': SearchAltName(
+ "history_creator": SearchAltName(
pgettext_lazy("key for text search", "created-by"),
- 'history_creator__ishtaruser__person__cached_label__iexact'
+ "history_creator__ishtaruser__person__cached_label__iexact",
),
- 'history_modifier': SearchAltName(
+ "history_modifier": SearchAltName(
pgettext_lazy("key for text search", "modified-by"),
- 'history_modifier__ishtaruser__person__cached_label__iexact'
+ "history_modifier__ishtaruser__person__cached_label__iexact",
),
- 'modified_before': SearchAltName(
+ "modified_before": SearchAltName(
pgettext_lazy("key for text search", "modified-before"),
- 'last_modified__lte'
+ "last_modified__lte",
),
- 'modified_after': SearchAltName(
- pgettext_lazy("key for text search", "modified-after"),
- 'last_modified__gte'
+ "modified_after": SearchAltName(
+ pgettext_lazy("key for text search", "modified-after"), "last_modified__gte"
),
}
@@ -1235,8 +1359,8 @@ class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
def update_external_id(self, save=False):
if not self.EXTERNAL_ID_KEY or (
- self.external_id and
- not getattr(self, 'auto_external_id', False)):
+ self.external_id and not getattr(self, "auto_external_id", False)
+ ):
return
external_id = get_generated_id(self.EXTERNAL_ID_KEY, self)
if external_id == self.external_id:
@@ -1250,10 +1374,10 @@ class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
return external_id
def get_last_history_date(self):
- q = self.history.values("history_date").order_by('-history_date')
+ q = self.history.values("history_date").order_by("-history_date")
if not q.count():
return
- return q.all()[0]['history_date']
+ return q.all()[0]["history_date"]
def get_previous(self, step=None, date=None, strict=False):
"""
@@ -1288,11 +1412,11 @@ class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
model = self.__class__
for k in get_all_field_names(model):
field = model._meta.get_field(k)
- if hasattr(field, 'rel') and field.rel:
- if not hasattr(item, k + '_id'):
+ if hasattr(field, "rel") and field.rel:
+ if not hasattr(item, k + "_id"):
setattr(item, k, getattr(self, k))
continue
- val = getattr(item, k + '_id')
+ val = getattr(item, k + "_id")
if not val:
setattr(item, k, None)
continue
@@ -1301,8 +1425,9 @@ class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
setattr(item, k, val)
except ObjectDoesNotExist:
if strict:
- raise HistoryError("The class %s has no pk %d" % (
- str(field.rel.to), val))
+ raise HistoryError(
+ "The class %s has no pk %d" % (str(field.rel.to), val)
+ )
setattr(item, k, None)
item.pk = self.pk
return item
@@ -1310,14 +1435,14 @@ class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
@property
def last_edition_date(self):
try:
- return self.history.order_by('-history_date').all()[0].history_date
+ return self.history.order_by("-history_date").all()[0].history_date
except (AttributeError, IndexError):
return
@property
def history_creation_date(self):
try:
- return self.history.order_by('history_date').all()[0].history_date
+ return self.history.order_by("history_date").all()[0].history_date
except (AttributeError, IndexError):
return
@@ -1336,14 +1461,15 @@ class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
try:
field_keys = [f.name for f in self._meta.fields]
for k in field_keys:
- if k != 'id' and hasattr(self, k):
+ if k != "id" and hasattr(self, k):
if not hasattr(new_item, k):
k = k + "_id"
setattr(self, k, getattr(new_item, k))
try:
self.history_modifier = User.objects.get(
- pk=new_item.history_modifier_id)
+ pk=new_item.history_modifier_id
+ )
except User.ObjectDoesNotExist:
pass
self.save()
@@ -1373,51 +1499,64 @@ class BaseHistorizedItem(StatisticItem, TemplateItem, FullSearch, Imported,
values = {}
for f in self._meta.fields:
k = f.name
- if k != 'id':
+ if k != "id":
values[k] = getattr(self, k)
return values
def get_absolute_url(self):
try:
- return reverse('display-item', args=[self.SLUG, self.pk])
+ return reverse("display-item", args=[self.SLUG, self.pk])
except NoReverseMatch:
return
def get_show_url(self):
show_url = self.SHOW_URL
if not show_url:
- show_url = 'show-' + self.__class__.__name__.lower()
+ show_url = "show-" + self.__class__.__name__.lower()
try:
- return reverse(show_url, args=[self.pk, ''])
+ return reverse(show_url, args=[self.pk, ""])
except NoReverseMatch:
return
@property
def associated_filename(self):
- if [True for attr in ('get_town_label', 'get_department', 'reference',
- 'short_class_name') if not hasattr(self, attr)]:
- return ''
- items = [slugify(self.get_department()),
- slugify(self.get_town_label()).upper(),
- slugify(self.short_class_name),
- slugify(self.reference),
- slugify(self.name or '').replace('-', '_').capitalize()]
+ if [
+ True
+ for attr in (
+ "get_town_label",
+ "get_department",
+ "reference",
+ "short_class_name",
+ )
+ if not hasattr(self, attr)
+ ]:
+ return ""
+ items = [
+ slugify(self.get_department()),
+ slugify(self.get_town_label()).upper(),
+ slugify(self.short_class_name),
+ slugify(self.reference),
+ slugify(self.name or "").replace("-", "_").capitalize(),
+ ]
last_edition_date = self.last_edition_date
if last_edition_date:
- items.append(last_edition_date.strftime('%Y%m%d'))
+ items.append(last_edition_date.strftime("%Y%m%d"))
else:
- items.append('00000000')
+ items.append("00000000")
return "-".join([str(item) for item in items])
def save(self, *args, **kwargs):
created = not self.pk
- if not getattr(self, 'skip_history_when_saving', False):
- assert hasattr(self, 'history_modifier')
+ if not getattr(self, "skip_history_when_saving", False):
+ assert hasattr(self, "history_modifier")
if created:
self.history_creator = self.history_modifier
# external ID can have related item not available before save
- external_id_updated = kwargs.pop('external_id_updated') \
- if 'external_id_updated' in kwargs else False
+ external_id_updated = (
+ kwargs.pop("external_id_updated")
+ if "external_id_updated" in kwargs
+ else False
+ )
if not created and not external_id_updated:
self.update_external_id()
super(BaseHistorizedItem, self).save(*args, **kwargs)
@@ -1471,16 +1610,17 @@ class OwnPerms(object):
checked
:return: boolean
"""
- if not getattr(request.user, 'ishtaruser', None):
+ if not getattr(request.user, "ishtaruser", None):
return False
- splited = action_name.split('_')
- action_own_name = splited[0] + '_own_' + '_'.join(splited[1:])
+ splited = action_name.split("_")
+ action_own_name = splited[0] + "_own_" + "_".join(splited[1:])
user = request.user
if action_own_name == "view_own_findbasket":
action_own_name = "view_own_find"
- return user.ishtaruser.has_right(action_name, request.session) or \
- (user.ishtaruser.has_right(action_own_name, request.session)
- and self.is_own(user.ishtaruser))
+ return user.ishtaruser.has_right(action_name, request.session) or (
+ user.ishtaruser.has_right(action_own_name, request.session)
+ and self.is_own(user.ishtaruser)
+ )
def is_own(self, user, alt_query_own=None):
"""
@@ -1489,7 +1629,7 @@ class OwnPerms(object):
IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
if isinstance(user, IshtarUser):
ishtaruser = user
- elif hasattr(user, 'ishtaruser'):
+ elif hasattr(user, "ishtaruser"):
ishtaruser = user.ishtaruser
else:
return False
@@ -1510,7 +1650,7 @@ class OwnPerms(object):
IshtarUser = apps.get_model("ishtar_common", "IshtarUser")
if isinstance(user, IshtarUser):
ishtaruser = user
- elif hasattr(user, 'ishtaruser'):
+ elif hasattr(user, "ishtaruser"):
ishtaruser = user.ishtaruser
else:
return False
@@ -1520,12 +1660,13 @@ class OwnPerms(object):
return cls.objects.filter(query).count()
@classmethod
- def _return_get_owns(cls, owns, values, get_short_menu_class,
- label_key='cached_label'):
+ def _return_get_owns(
+ cls, owns, values, get_short_menu_class, label_key="cached_label"
+ ):
if not owns:
return []
sorted_values = []
- if hasattr(cls, 'BASKET_MODEL'):
+ if hasattr(cls, "BASKET_MODEL"):
owns_len = len(owns)
for idx, item in enumerate(reversed(owns)):
if get_short_menu_class:
@@ -1537,24 +1678,31 @@ class OwnPerms(object):
if not values:
if not get_short_menu_class:
return sorted_values + list(
- sorted(owns, key=lambda x: getattr(x, label_key) or ""))
+ sorted(owns, key=lambda x: getattr(x, label_key) or "")
+ )
return sorted_values + list(
- sorted(owns, key=lambda x: getattr(x[0], label_key) or ""))
+ sorted(owns, key=lambda x: getattr(x[0], label_key) or "")
+ )
if not get_short_menu_class:
- return sorted_values + list(
- sorted(owns, key=lambda x: x[label_key] or ""))
- return sorted_values + list(
- sorted(owns, key=lambda x: x[0][label_key] or ""))
+ return sorted_values + list(sorted(owns, key=lambda x: x[label_key] or ""))
+ return sorted_values + list(sorted(owns, key=lambda x: x[0][label_key] or ""))
@classmethod
- def get_owns(cls, user, replace_query=None, limit=None, values=None,
- get_short_menu_class=False, menu_filtr=None):
+ def get_owns(
+ cls,
+ user,
+ replace_query=None,
+ limit=None,
+ values=None,
+ get_short_menu_class=False,
+ menu_filtr=None,
+ ):
"""
Get Own items
"""
if not replace_query:
replace_query = {}
- if hasattr(user, 'is_authenticated') and not user.is_authenticated():
+ if hasattr(user, "is_authenticated") and not user.is_authenticated():
returned = cls.objects.filter(pk__isnull=True)
if values:
returned = []
@@ -1575,7 +1723,7 @@ class OwnPerms(object):
return []
return cls.objects.filter(pk__isnull=True)
items = []
- if hasattr(cls, 'BASKET_MODEL'):
+ if hasattr(cls, "BASKET_MODEL"):
items = list(cls.BASKET_MODEL.objects.filter(user=ishtaruser).all())
query = cls.get_query_owns(ishtaruser)
if not query and not replace_query:
@@ -1590,24 +1738,25 @@ class OwnPerms(object):
if values:
q = q.values(*values)
if limit:
- items += list(q.order_by('-pk')[:limit])
+ items += list(q.order_by("-pk")[:limit])
else:
items += list(q.order_by(*cls._meta.ordering).all())
if get_short_menu_class:
if values:
- if 'id' not in values:
+ if "id" not in values:
raise NotImplementedError(
"Call of get_owns with get_short_menu_class option and"
- " no 'id' in values is not implemented")
+ " no 'id' in values is not implemented"
+ )
my_items = []
for i in items:
- if hasattr(cls, 'BASKET_MODEL') and \
- type(i) == cls.BASKET_MODEL:
+ if hasattr(cls, "BASKET_MODEL") and type(i) == cls.BASKET_MODEL:
dct = dict([(k, getattr(i, k)) for k in values])
my_items.append(
- (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk)))
+ (dct, cls.BASKET_MODEL.get_short_menu_class(i.pk))
+ )
else:
- my_items.append((i, cls.get_short_menu_class(i['id'])))
+ my_items.append((i, cls.get_short_menu_class(i["id"])))
items = my_items
else:
items = [(i, cls.get_short_menu_class(i.pk)) for i in items]
@@ -1654,7 +1803,7 @@ class State(models.Model):
class Meta:
verbose_name = _("State")
- ordering = ['number']
+ ordering = ["number"]
def __str__(self):
return self.label
@@ -1667,7 +1816,10 @@ class Department(models.Model):
label = models.CharField(_("Label"), max_length=30)
number = models.CharField(_("Number"), unique=True, max_length=3)
state = models.ForeignKey(
- 'State', verbose_name=_("State"), blank=True, null=True,
+ "State",
+ verbose_name=_("State"),
+ blank=True,
+ null=True,
on_delete=models.SET_NULL,
)
objects = NumberManager()
@@ -1675,7 +1827,7 @@ class Department(models.Model):
class Meta:
verbose_name = _("Department")
verbose_name_plural = _("Departments")
- ordering = ['number']
+ ordering = ["number"]
def __str__(self):
return self.label
@@ -1709,12 +1861,10 @@ class Arrondissement(models.Model):
class Canton(models.Model):
name = models.CharField("Nom", max_length=30)
- arrondissement = models.ForeignKey(Arrondissement,
- verbose_name="Arrondissement")
+ arrondissement = models.ForeignKey(Arrondissement, verbose_name="Arrondissement")
def __str__(self):
- return settings.JOINT.join(
- (self.name, str(self.arrondissement)))
+ return settings.JOINT.join((self.name, str(self.arrondissement)))
class TownManager(models.GeoManager):
@@ -1725,37 +1875,46 @@ class TownManager(models.GeoManager):
class Town(Imported, models.Model):
name = models.CharField(_("Name"), max_length=100)
surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True)
- center = models.PointField(_("Localisation"), srid=settings.SRID,
- blank=True, null=True)
+ center = models.PointField(
+ _("Localisation"), srid=settings.SRID, blank=True, null=True
+ )
limit = models.MultiPolygonField(_("Limit"), blank=True, null=True)
- numero_insee = models.CharField("Code commune (numéro INSEE)",
- max_length=120)
+ numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120)
departement = models.ForeignKey(
- Department, verbose_name=_("Department"),
- on_delete=models.SET_NULL, null=True, blank=True)
+ Department,
+ verbose_name=_("Department"),
+ on_delete=models.SET_NULL,
+ null=True,
+ blank=True,
+ )
year = models.IntegerField(
- _("Year of creation"), null=True, blank=True,
- help_text=_("Filling this field is relevant to distinguish old towns "
- "from new towns."))
+ _("Year of creation"),
+ null=True,
+ blank=True,
+ help_text=_(
+ "Filling this field is relevant to distinguish old towns " "from new towns."
+ ),
+ )
children = models.ManyToManyField(
- 'Town', verbose_name=_("Town children"), blank=True,
- related_name='parents')
- cached_label = models.CharField(_("Cached name"), max_length=500,
- null=True, blank=True, db_index=True)
+ "Town", verbose_name=_("Town children"), blank=True, related_name="parents"
+ )
+ cached_label = models.CharField(
+ _("Cached name"), max_length=500, null=True, blank=True, db_index=True
+ )
objects = TownManager()
class Meta:
verbose_name = _("Town")
verbose_name_plural = _("Towns")
- if settings.COUNTRY == 'fr':
- ordering = ['numero_insee']
- unique_together = (('numero_insee', 'year'),)
+ if settings.COUNTRY == "fr":
+ ordering = ["numero_insee"]
+ unique_together = (("numero_insee", "year"),)
def natural_key(self):
return (self.numero_insee, self.year)
def history_compress(self):
- return {'numero_insee': self.numero_insee, 'year': self.year or ""}
+ return {"numero_insee": self.numero_insee, "year": self.year or ""}
@classmethod
def get_documentation_string(cls):
@@ -1763,13 +1922,14 @@ class Town(Imported, models.Model):
Used for automatic documentation generation
"""
return "**name** {}, **numero_insee** {}, **cached_label** {}".format(
- _("Name"), "Code commune (numéro INSEE)", _("Cached name"))
+ _("Name"), "Code commune (numéro INSEE)", _("Cached name")
+ )
- def get_values(self, prefix='', **kwargs):
+ def get_values(self, prefix="", **kwargs):
return {
prefix or "label": str(self),
prefix + "name": self.name,
- prefix + "numero_insee": self.numero_insee
+ prefix + "numero_insee": self.numero_insee,
}
@classmethod
@@ -1780,8 +1940,10 @@ class Town(Imported, models.Model):
for value in full_value:
try:
res.append(
- cls.objects.get(numero_insee=value['numero_insee'],
- year=value['year'] or None))
+ cls.objects.get(
+ numero_insee=value["numero_insee"], year=value["year"] or None
+ )
+ )
except cls.DoesNotExist:
continue
return res
@@ -1818,8 +1980,8 @@ class Town(Imported, models.Model):
else:
parents = parents.union(parent.limit)
# if union is a simple polygon make it a multi
- if 'MULTI' not in parents.wkt:
- parents = parents.wkt.replace('POLYGON', 'MULTIPOLYGON(') + ")"
+ if "MULTI" not in parents.wkt:
+ parents = parents.wkt.replace("POLYGON", "MULTIPOLYGON(") + ")"
if not parents:
return
self.limit = parents
@@ -1838,8 +2000,7 @@ class Town(Imported, models.Model):
def generate_area(self, force=False):
if not force and (self.surface or not self.limit):
return
- surface = self.limit.transform(settings.SURFACE_SRID,
- clone=True).area
+ surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area
if surface > 214748364 or not surface:
return False
self.surface = surface
@@ -1850,7 +2011,7 @@ class Town(Imported, models.Model):
if not self.numero_insee or not self.children.count() or not self.year:
return
old_num = self.numero_insee[:]
- numero = old_num.split('-')[0]
+ numero = old_num.split("-")[0]
self.numero_insee = "{}-{}".format(numero, self.year)
if self.numero_insee != old_num:
return True
@@ -1859,10 +2020,12 @@ class Town(Imported, models.Model):
cached_label = self.name
if settings.COUNTRY == "fr" and self.numero_insee:
dpt_len = 2
- if self.numero_insee.startswith('97') or \
- self.numero_insee.startswith('98') or \
- self.numero_insee[0] not in ('0', '1', '2', '3', '4', '5',
- '6', '7', '8', '9'):
+ if (
+ self.numero_insee.startswith("97")
+ or self.numero_insee.startswith("98")
+ or self.numero_insee[0]
+ not in ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
+ ):
dpt_len = 3
cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len])
if self.year and self.children.count():
@@ -1872,7 +2035,7 @@ class Town(Imported, models.Model):
def post_save_town(sender, **kwargs):
cached_label_changed(sender, **kwargs)
- town = kwargs['instance']
+ town = kwargs["instance"]
town.generate_geo()
if town.update_town_code():
town.save()
@@ -1882,7 +2045,7 @@ post_save.connect(post_save_town, sender=Town)
def town_child_changed(sender, **kwargs):
- town = kwargs['instance']
+ town = kwargs["instance"]
if town.update_town_code():
town.save()
@@ -1892,53 +2055,75 @@ m2m_changed.connect(town_child_changed, sender=Town.children.through)
class Address(BaseHistorizedItem):
FIELDS = (
- "address", "address_complement", "postal_code", "town",
- "precise_town", "country",
- "alt_address", "alt_address_complement", "alt_postal_code", "alt_town",
+ "address",
+ "address_complement",
+ "postal_code",
+ "town",
+ "precise_town",
+ "country",
+ "alt_address",
+ "alt_address_complement",
+ "alt_postal_code",
+ "alt_town",
"alt_country",
- "phone", "phone_desc", "phone2", "phone_desc2", "phone3", "phone_desc3",
- "raw_phone", "mobile_phone", "email", "alt_address_is_prefered"
+ "phone",
+ "phone_desc",
+ "phone2",
+ "phone_desc2",
+ "phone3",
+ "phone_desc3",
+ "raw_phone",
+ "mobile_phone",
+ "email",
+ "alt_address_is_prefered",
)
address = models.TextField(_("Address"), blank=True, default="")
address_complement = models.TextField(
- _("Address complement"), blank=True, default="")
- postal_code = models.CharField(_("Postal code"), max_length=10, null=True,
- blank=True)
- town = models.CharField(_("Town (freeform)"), max_length=150, null=True,
- blank=True)
+ _("Address complement"), blank=True, default=""
+ )
+ postal_code = models.CharField(
+ _("Postal code"), max_length=10, null=True, blank=True
+ )
+ town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True)
precise_town = models.ForeignKey(
- Town, verbose_name=_("Town (precise)"), null=True,
- blank=True)
- country = models.CharField(_("Country"), max_length=30, null=True,
- blank=True)
- alt_address = models.TextField(
- _("Other address: address"), blank=True, default="")
+ Town, verbose_name=_("Town (precise)"), null=True, blank=True
+ )
+ country = models.CharField(_("Country"), max_length=30, null=True, blank=True)
+ alt_address = models.TextField(_("Other address: address"), blank=True, default="")
alt_address_complement = models.TextField(
- _("Other address: address complement"), blank=True, default="")
- alt_postal_code = models.CharField(_("Other address: postal code"),
- max_length=10, null=True, blank=True)
- alt_town = models.CharField(_("Other address: town"), max_length=70,
- null=True, blank=True)
- alt_country = models.CharField(_("Other address: country"),
- max_length=30, null=True, blank=True)
+ _("Other address: address complement"), blank=True, default=""
+ )
+ alt_postal_code = models.CharField(
+ _("Other address: postal code"), max_length=10, null=True, blank=True
+ )
+ alt_town = models.CharField(
+ _("Other address: town"), max_length=70, null=True, blank=True
+ )
+ alt_country = models.CharField(
+ _("Other address: country"), max_length=30, null=True, blank=True
+ )
phone = models.CharField(_("Phone"), max_length=18, null=True, blank=True)
- phone_desc = models.CharField(_("Phone description"), max_length=300,
- null=True, blank=True)
- phone2 = models.CharField(_("Phone description 2"), max_length=18,
- null=True, blank=True)
- phone_desc2 = models.CharField(_("Phone description 2"), max_length=300,
- null=True, blank=True)
- phone3 = models.CharField(_("Phone 3"), max_length=18, null=True,
- blank=True)
- phone_desc3 = models.CharField(_("Phone description 3"), max_length=300,
- null=True, blank=True)
+ phone_desc = models.CharField(
+ _("Phone description"), max_length=300, null=True, blank=True
+ )
+ phone2 = models.CharField(
+ _("Phone description 2"), max_length=18, null=True, blank=True
+ )
+ phone_desc2 = models.CharField(
+ _("Phone description 2"), max_length=300, null=True, blank=True
+ )
+ phone3 = models.CharField(_("Phone 3"), max_length=18, null=True, blank=True)
+ phone_desc3 = models.CharField(
+ _("Phone description 3"), max_length=300, null=True, blank=True
+ )
raw_phone = models.TextField(_("Raw phone"), blank=True, default="")
- mobile_phone = models.CharField(_("Mobile phone"), max_length=18,
- null=True, blank=True)
- email = models.EmailField(
- _("Email"), max_length=300, blank=True, null=True)
+ mobile_phone = models.CharField(
+ _("Mobile phone"), max_length=18, null=True, blank=True
+ )
+ email = models.EmailField(_("Email"), max_length=300, blank=True, null=True)
alt_address_is_prefered = models.BooleanField(
- _("Alternative address is prefered"), default=False)
+ _("Alternative address is prefered"), default=False
+ )
history = HistoricalRecords(inherit=True)
SUB_ADDRESSES = []
@@ -1948,28 +2133,25 @@ class Address(BaseHistorizedItem):
def get_short_html_items(self):
items = []
if self.address:
- items.append(
- """<span class="subadress">{}</span>""".format(self.address))
+ items.append("""<span class="subadress">{}</span>""".format(self.address))
if self.address_complement:
items.append(
"""<span class="subadress-complement">{}</span>""".format(
- self.address_complement))
+ self.address_complement
+ )
+ )
if self.postal_code:
items.append(
- """<span class="postal-code">{}</span>""".format(
- self.postal_code))
+ """<span class="postal-code">{}</span>""".format(self.postal_code)
+ )
if self.precise_town:
items.append(
- """<span class="town">{}</span>""".format(
- self.precise_town.name))
+ """<span class="town">{}</span>""".format(self.precise_town.name)
+ )
elif self.town:
- items.append(
- """<span class="town">{}</span>""".format(
- self.town))
+ items.append("""<span class="town">{}</span>""".format(self.town))
if self.country:
- items.append(
- """<span class="country">{}</span>""".format(
- self.country))
+ items.append("""<span class="country">{}</span>""".format(self.country))
return items
def get_short_html_detail(self):
@@ -1977,9 +2159,7 @@ class Address(BaseHistorizedItem):
items = self.get_short_html_items()
if not items:
items = [
- "<span class='no-address'>{}</span>".format(
- _("No associated address")
- )
+ "<span class='no-address'>{}</span>".format(_("No associated address"))
]
html += "".join(items)
html += """</div>"""
@@ -2041,25 +2221,24 @@ class Address(BaseHistorizedItem):
return lbl
def address_lbl(self):
- lbl = ''
- prefix = ''
+ lbl = ""
+ prefix = ""
if self.alt_address_is_prefered:
- prefix = 'alt_'
- if getattr(self, prefix + 'address'):
- lbl += getattr(self, prefix + 'address')
- if getattr(self, prefix + 'address_complement'):
+ prefix = "alt_"
+ if getattr(self, prefix + "address"):
+ lbl += getattr(self, prefix + "address")
+ if getattr(self, prefix + "address_complement"):
if lbl:
lbl += "\n"
- lbl += getattr(self, prefix + 'address_complement')
- postal_code = getattr(self, prefix + 'postal_code')
- town = getattr(self, prefix + 'town')
+ lbl += getattr(self, prefix + "address_complement")
+ postal_code = getattr(self, prefix + "postal_code")
+ town = getattr(self, prefix + "town")
if postal_code or town:
if lbl:
lbl += "\n"
lbl += "{}{}{}".format(
- postal_code or '',
- " " if postal_code and town else '',
- town or '')
+ postal_code or "", " " if postal_code and town else "", town or ""
+ )
if self.phone:
if lbl:
lbl += "\n"
@@ -2079,11 +2258,10 @@ class Merge(models.Model):
merge_key = models.TextField(_("Merge key"), blank=True, null=True)
merge_candidate = models.ManyToManyField("self", blank=True)
merge_exclusion = models.ManyToManyField("self", blank=True)
- archived = models.NullBooleanField(default=False,
- blank=True, null=True)
+ archived = models.NullBooleanField(default=False, blank=True, null=True)
# 1 for one word similarity, 2 for two word similarity, etc.
MERGE_CLEMENCY = None
- EMPTY_MERGE_KEY = '--'
+ EMPTY_MERGE_KEY = "--"
MERGE_ATTRIBUTE = "name"
class Meta:
@@ -2093,7 +2271,7 @@ class Merge(models.Model):
if self.archived:
return
merge_attr = getattr(self, self.MERGE_ATTRIBUTE)
- self.merge_key = slugify(merge_attr if merge_attr else '')
+ self.merge_key = slugify(merge_attr if merge_attr else "")
if not self.merge_key:
self.merge_key = self.EMPTY_MERGE_KEY
self.merge_key = self.merge_key
@@ -2106,28 +2284,29 @@ class Merge(models.Model):
self.save(merge_key_generated=True)
if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY:
return
- q = self.__class__.objects \
- .exclude(pk=self.pk) \
- .exclude(merge_exclusion=self) \
- .exclude(merge_candidate=self) \
+ q = (
+ self.__class__.objects.exclude(pk=self.pk)
+ .exclude(merge_exclusion=self)
+ .exclude(merge_candidate=self)
.exclude(archived=True)
+ )
if not self.MERGE_CLEMENCY:
q = q.filter(merge_key=self.merge_key)
else:
- subkeys_front = "-".join(
- self.merge_key.split('-')[:self.MERGE_CLEMENCY])
- subkeys_back = "-".join(
- self.merge_key.split('-')[-self.MERGE_CLEMENCY:])
- q = q.filter(Q(merge_key__istartswith=subkeys_front) |
- Q(merge_key__iendswith=subkeys_back))
+ subkeys_front = "-".join(self.merge_key.split("-")[: self.MERGE_CLEMENCY])
+ subkeys_back = "-".join(self.merge_key.split("-")[-self.MERGE_CLEMENCY :])
+ q = q.filter(
+ Q(merge_key__istartswith=subkeys_front)
+ | Q(merge_key__iendswith=subkeys_back)
+ )
for item in q.all():
self.merge_candidate.add(item)
def save(self, *args, **kwargs):
# prevent circular save
merge_key_generated = False
- if 'merge_key_generated' in kwargs:
- merge_key_generated = kwargs.pop('merge_key_generated')
+ if "merge_key_generated" in kwargs:
+ merge_key_generated = kwargs.pop("merge_key_generated")
self.generate_merge_key()
item = super(Merge, self).save(*args, **kwargs)
if not merge_key_generated:
@@ -2142,26 +2321,22 @@ class Merge(models.Model):
self.merge_exclusion.clear()
def merge(self, item, keep_old=False, exclude_fields=None):
- merge_model_objects(self, item, keep_old=keep_old,
- exclude_fields=exclude_fields)
+ merge_model_objects(
+ self, item, keep_old=keep_old, exclude_fields=exclude_fields
+ )
self.generate_merge_candidate()
-
def __get_stats_cache_values(model_name, model_pk):
StatsCache = apps.get_model("ishtar_common", "StatsCache")
- q = StatsCache.objects.filter(
- model=model_name, model_pk=model_pk
- )
+ q = StatsCache.objects.filter(model=model_name, model_pk=model_pk)
nb = q.count()
if nb >= 1:
sc = q.all()[0]
for extra in q.order_by("-id").all()[1:]:
extra.delete()
else:
- sc = StatsCache.objects.create(
- model=model_name, model_pk=model_pk
- )
+ sc = StatsCache.objects.create(model=model_name, model_pk=model_pk)
values = sc.values
if not values:
values = {}
@@ -2184,6 +2359,7 @@ def _update_stats(app, model, model_pk, funcname):
sc.updated = datetime.datetime.now()
sc.save()
+
def update_stats(statscache, item, funcname):
if not settings.USE_BACKGROUND_TASK:
current_values = statscache.values
@@ -2213,19 +2389,17 @@ class DashboardFormItem:
def last_stats_update(self):
model_name = self._meta.app_label + "." + self._meta.model_name
StatsCache = apps.get_model("ishtar_common", "StatsCache")
- q = StatsCache.objects.filter(
- model=model_name, model_pk=self.pk).order_by("-updated")
+ q = StatsCache.objects.filter(model=model_name, model_pk=self.pk).order_by(
+ "-updated"
+ )
if not q.count():
return
return q.all()[0].updated
- def _get_or_set_stats(self, funcname, update=False,
- expected_type=None):
+ def _get_or_set_stats(self, funcname, update=False, expected_type=None):
model_name = self._meta.app_label + "." + self._meta.model_name
StatsCache = apps.get_model("ishtar_common", "StatsCache")
- sc, __ = StatsCache.objects.get_or_create(
- model=model_name, model_pk=self.pk
- )
+ sc, __ = StatsCache.objects.get_or_create(model=model_name, model_pk=self.pk)
if not update:
values = sc.values
if funcname not in values:
@@ -2243,62 +2417,62 @@ class DashboardFormItem:
return values
@classmethod
- def get_periods(cls, slice='month', fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
+ def get_periods(cls, slice="month", fltr={}, date_source="creation"):
+ date_var = date_source + "_date"
+ q = cls.objects.filter(**{date_var + "__isnull": False})
if fltr:
q = q.filter(**fltr)
- if slice == 'year':
- return [res[date_var].year for res in list(q.values(date_var)
- .annotate(
- Count("id")).order_by())]
- elif slice == 'month':
- return [(res[date_var].year, res[date_var].month)
- for res in list(q.values(date_var)
- .annotate(Count("id")).order_by())]
+ if slice == "year":
+ return [
+ res[date_var].year
+ for res in list(q.values(date_var).annotate(Count("id")).order_by())
+ ]
+ elif slice == "month":
+ return [
+ (res[date_var].year, res[date_var].month)
+ for res in list(q.values(date_var).annotate(Count("id")).order_by())
+ ]
return []
@classmethod
- def get_by_year(cls, year, fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
+ def get_by_year(cls, year, fltr={}, date_source="creation"):
+ date_var = date_source + "_date"
+ q = cls.objects.filter(**{date_var + "__isnull": False})
if fltr:
q = q.filter(**fltr)
- return q.filter(
- **{date_var + '__year': year}).order_by('pk').distinct('pk')
+ return q.filter(**{date_var + "__year": year}).order_by("pk").distinct("pk")
@classmethod
- def get_by_month(cls, year, month, fltr={}, date_source='creation'):
- date_var = date_source + '_date'
- q = cls.objects.filter(**{date_var + '__isnull': False})
+ def get_by_month(cls, year, month, fltr={}, date_source="creation"):
+ date_var = date_source + "_date"
+ q = cls.objects.filter(**{date_var + "__isnull": False})
if fltr:
q = q.filter(**fltr)
- q = q.filter(
- **{date_var + '__year': year, date_var + '__month': month})
- return q.order_by('pk').distinct('pk')
+ q = q.filter(**{date_var + "__year": year, date_var + "__month": month})
+ return q.order_by("pk").distinct("pk")
@classmethod
def get_total_number(cls, fltr=None):
q = cls.objects
if fltr:
q = q.filter(**fltr)
- return q.order_by('pk').distinct('pk').count()
+ return q.order_by("pk").distinct("pk").count()
class DocumentItem:
ALT_NAMES = {
- 'documents__image__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-image"),
- 'documents__image__isnull'),
- 'documents__associated_url__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-url"),
- 'documents__associated_url__isnull'),
- 'documents__associated_file__isnull':
- SearchAltName(
- pgettext_lazy("key for text search", "has-attached-file"),
- 'documents__associated_file__isnull'),
+ "documents__image__isnull": SearchAltName(
+ pgettext_lazy("key for text search", "has-image"),
+ "documents__image__isnull",
+ ),
+ "documents__associated_url__isnull": SearchAltName(
+ pgettext_lazy("key for text search", "has-url"),
+ "documents__associated_url__isnull",
+ ),
+ "documents__associated_file__isnull": SearchAltName(
+ pgettext_lazy("key for text search", "has-attached-file"),
+ "documents__associated_file__isnull",
+ ),
}
def public_representation(self):
@@ -2313,29 +2487,35 @@ class DocumentItem:
@property
def images(self):
- if not hasattr(self, 'documents'):
+ if not hasattr(self, "documents"):
Document = apps.get_model("ishtar_common", "Document")
return Document.objects.none()
- return self.documents.filter(
- image__isnull=False).exclude(image="").order_by("pk")
+ return (
+ self.documents.filter(image__isnull=False).exclude(image="").order_by("pk")
+ )
@property
def images_without_main_image(self):
- if not hasattr(self, 'main_image') or not hasattr(self, 'documents'):
+ if not hasattr(self, "main_image") or not hasattr(self, "documents"):
return self.images
if not self.main_image:
- return self.documents.filter(
- image__isnull=False).exclude(
- image="").order_by("pk")
- return self.documents.filter(
- image__isnull=False).exclude(
- image="").exclude(pk=self.main_image.pk).order_by("pk")
+ return (
+ self.documents.filter(image__isnull=False)
+ .exclude(image="")
+ .order_by("pk")
+ )
+ return (
+ self.documents.filter(image__isnull=False)
+ .exclude(image="")
+ .exclude(pk=self.main_image.pk)
+ .order_by("pk")
+ )
@property
def pdf_attached(self):
for document in self.documents.filter(
- Q(associated_file__isnull=False) |
- Q(source__associated_file__isnull=False)).all():
+ Q(associated_file__isnull=False) | Q(source__associated_file__isnull=False)
+ ).all():
return document.pdf_attached
def get_extra_actions(self, request):
@@ -2348,22 +2528,21 @@ class DocumentItem:
except AttributeError:
actions = []
- if not hasattr(self, 'SLUG'):
+ if not hasattr(self, "SLUG"):
return actions
- can_add_doc = self.can_do(request, 'add_document')
+ can_add_doc = self.can_do(request, "add_document")
if can_add_doc and (
- not hasattr(self, "is_locked") or
- not self.is_locked(request.user)):
+ not hasattr(self, "is_locked") or not self.is_locked(request.user)
+ ):
actions += [
(
- reverse("create-document") + "?{}={}".format(
- self.SLUG, self.pk),
+ reverse("create-document") + "?{}={}".format(self.SLUG, self.pk),
_("Add document/image"),
"fa fa-plus",
_("doc./image"),
"",
- False
+ False,
)
]
return actions
@@ -2378,27 +2557,27 @@ def clean_duplicate_association(document, related_item, action):
return
if class_name == "Find":
for cr in document.context_records.filter(
- base_finds__find__pk=related_item.pk).all():
+ base_finds__find__pk=related_item.pk
+ ).all():
document.context_records.remove(cr)
for ope in document.operations.filter(
- context_record__base_finds__find__pk=related_item.pk).all():
+ context_record__base_finds__find__pk=related_item.pk
+ ).all():
document.operations.remove(ope)
return
if class_name == "ContextRecord":
- for ope in document.operations.filter(
- context_record__pk=related_item.pk).all():
+ for ope in document.operations.filter(context_record__pk=related_item.pk).all():
document.operations.remove(ope)
- if document.finds.filter(
- base_finds__context_record=related_item.pk).count():
+ if document.finds.filter(base_finds__context_record=related_item.pk).count():
document.context_records.remove(related_item)
return
if class_name == "Operation":
- if document.context_records.filter(
- operation=related_item.pk).count():
+ if document.context_records.filter(operation=related_item.pk).count():
document.operations.remove(related_item)
return
if document.finds.filter(
- base_finds__context_record__operation=related_item.pk).count():
+ base_finds__context_record__operation=related_item.pk
+ ).count():
document.operations.remove(related_item)
return
@@ -2422,12 +2601,10 @@ def document_attached_changed(sender, **kwargs):
return
for item in items:
- clean_duplicate_association(instance, item,
- kwargs.get("action", None))
+ clean_duplicate_association(instance, item, kwargs.get("action", None))
for doc in item.documents.all():
doc.regenerate_all_ids()
- q = item.documents.filter(
- image__isnull=False).exclude(image='')
+ q = item.documents.filter(image__isnull=False).exclude(image="")
if item.main_image:
if q.filter(pk=item.main_image.pk).count():
return
@@ -2438,7 +2615,7 @@ def document_attached_changed(sender, **kwargs):
if not q.count():
return
# by default get the lowest pk
- item.main_image = q.order_by('pk').all()[0]
+ item.main_image = q.order_by("pk").all()[0]
item.skip_history_when_saving = True
item.save()
@@ -2448,22 +2625,23 @@ class QuickAction:
Quick action available from tables
"""
- def __init__(self, url, icon_class='', text='', target=None, rights=None,
- module=None):
+ def __init__(
+ self, url, icon_class="", text="", target=None, rights=None, module=None
+ ):
self.url = url
self.icon_class = icon_class
self.text = text
self.rights = rights
self.target = target
self.module = module
- assert self.target in ('one', 'many', None)
+ assert self.target in ("one", "many", None)
def is_available(self, user, session=None, obj=None):
if self.module and not getattr(get_current_profile(), self.module):
return False
if not self.rights: # no restriction
return True
- if not user or not hasattr(user, 'ishtaruser') or not user.ishtaruser:
+ if not user or not hasattr(user, "ishtaruser") or not user.ishtaruser:
return False
user = user.ishtaruser
@@ -2491,8 +2669,16 @@ class QuickAction:
class DynamicRequest:
- def __init__(self, label, app_name, model_name, form_key, search_key,
- type_query, search_query):
+ def __init__(
+ self,
+ label,
+ app_name,
+ model_name,
+ form_key,
+ search_key,
+ type_query,
+ search_query,
+ ):
self.label = label
self.form_key = form_key
self.search_key = search_key
@@ -2509,36 +2695,34 @@ class DynamicRequest:
fields = {}
for item in self.get_all_types().all():
fields[self.form_key + "-" + item.txt_idx] = forms.CharField(
- label=str(self.label) + " " + str(item),
- required=False
+ label=str(self.label) + " " + str(item), required=False
)
return fields
def get_extra_query(self, slug):
- return {
- self.type_query: slug
- }
+ return {self.type_query: slug}
def get_alt_names(self):
alt_names = {}
for item in self.get_all_types().all():
alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName(
- self.search_key + "-" + item.txt_idx, self.search_query,
- self.get_extra_query(item.txt_idx), distinct_query=True
+ self.search_key + "-" + item.txt_idx,
+ self.search_query,
+ self.get_extra_query(item.txt_idx),
+ distinct_query=True,
)
return alt_names
class SpatialReferenceSystem(GeneralType):
order = models.IntegerField(_("Order"), default=10)
- auth_name = models.CharField(
- _("Authority name"), default='EPSG', max_length=256)
+ auth_name = models.CharField(_("Authority name"), default="EPSG", max_length=256)
srid = models.IntegerField(_("Authority SRID"))
class Meta:
verbose_name = _("Spatial reference system")
verbose_name_plural = _("Spatial reference systems")
- ordering = ('label',)
+ ordering = ("label",)
@classmethod
def get_documentation_string(cls):
@@ -2557,37 +2741,46 @@ post_delete.connect(post_save_cache, sender=SpatialReferenceSystem)
class GeoItem(models.Model):
- GEO_SOURCE = (
- ('T', _("Town")), ('P', _("Precise")), ('M', _("Polygon"))
- )
+ GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon")))
# gis
- x = models.FloatField(_('X'), blank=True, null=True)
- y = models.FloatField(_('Y'), blank=True, null=True)
- z = models.FloatField(_('Z'), blank=True, null=True)
- estimated_error_x = models.FloatField(_('Estimated error for X'),
- blank=True, null=True)
- estimated_error_y = models.FloatField(_('Estimated error for Y'),
- blank=True, null=True)
- estimated_error_z = models.FloatField(_('Estimated error for Z'),
- blank=True, null=True)
+ x = models.FloatField(_("X"), blank=True, null=True)
+ y = models.FloatField(_("Y"), blank=True, null=True)
+ z = models.FloatField(_("Z"), blank=True, null=True)
+ estimated_error_x = models.FloatField(
+ _("Estimated error for X"), blank=True, null=True
+ )
+ estimated_error_y = models.FloatField(
+ _("Estimated error for Y"), blank=True, null=True
+ )
+ estimated_error_z = models.FloatField(
+ _("Estimated error for Z"), blank=True, null=True
+ )
spatial_reference_system = models.ForeignKey(
- SpatialReferenceSystem, verbose_name=_("Spatial Reference System"),
- blank=True, null=True)
+ SpatialReferenceSystem,
+ verbose_name=_("Spatial Reference System"),
+ blank=True,
+ null=True,
+ )
point = models.PointField(_("Point"), blank=True, null=True, dim=3)
point_2d = models.PointField(_("Point (2D)"), blank=True, null=True)
point_source = models.CharField(
- _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True,
- null=True)
+ _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True
+ )
point_source_item = models.CharField(
- _("Point source item"), max_length=100, blank=True, null=True)
- multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True,
- null=True)
+ _("Point source item"), max_length=100, blank=True, null=True
+ )
+ multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True)
multi_polygon_source = models.CharField(
- _("Multi-polygon source"), choices=GEO_SOURCE, max_length=1,
- blank=True, null=True)
+ _("Multi-polygon source"),
+ choices=GEO_SOURCE,
+ max_length=1,
+ blank=True,
+ null=True,
+ )
multi_polygon_source_item = models.CharField(
- _("Multi polygon source item"), max_length=100, blank=True, null=True)
+ _("Multi polygon source item"), max_length=100, blank=True, null=True
+ )
GEO_LABEL = ""
@@ -2621,13 +2814,18 @@ class GeoItem(models.Model):
if not self.point_2d:
return ""
profile = get_current_profile()
- if not profile.display_srs or not profile.display_srs.srid or (
+ if (
+ not profile.display_srs
+ or not profile.display_srs.srid
+ or (
profile.display_srs == self.spatial_reference_system
- and self.x and self.y):
+ and self.x
+ and self.y
+ )
+ ):
x, y = self.x, self.y
else:
- point = self.point_2d.transform(profile.display_srs.srid,
- clone=True)
+ point = self.point_2d.transform(profile.display_srs.srid, clone=True)
x, y = point.x, point.y
if rounded:
return round(x, 5), round(y, 5)
@@ -2641,40 +2839,38 @@ class GeoItem(models.Model):
return profile.display_srs
def get_precise_points(self):
- if self.point_source == 'P' and self.point_2d:
+ if self.point_source == "P" and self.point_2d:
return self.point_2d, self.point, self.point_source_item
def get_precise_polygons(self):
- if self.multi_polygon_source == 'P' and self.multi_polygon:
+ if self.multi_polygon_source == "P" and self.multi_polygon:
return self.multi_polygon, self.multi_polygon_source_item
def most_precise_geo(self):
- if self.point_source == 'M':
- return 'multi_polygon'
+ if self.point_source == "M":
+ return "multi_polygon"
current_source = str(self.__class__._meta.verbose_name)
- if self.multi_polygon_source_item == current_source \
- and (self.multi_polygon_source == "P" or
- (self.point_source_item != current_source and
- self.point_source != "P")):
- return 'multi_polygon'
- if self.point_source_item == current_source \
- and self.point_source == 'P':
- return 'point'
- if self.multi_polygon_source == 'P':
- return 'multi_polygon'
- if self.point_source == 'P':
- return 'point'
+ if self.multi_polygon_source_item == current_source and (
+ self.multi_polygon_source == "P"
+ or (self.point_source_item != current_source and self.point_source != "P")
+ ):
+ return "multi_polygon"
+ if self.point_source_item == current_source and self.point_source == "P":
+ return "point"
+ if self.multi_polygon_source == "P":
+ return "multi_polygon"
+ if self.point_source == "P":
+ return "point"
if self.multi_polygon:
- return 'multi_polygon'
+ return "multi_polygon"
if self.point_2d:
- return 'point'
+ return "point"
def geo_point_source(self):
if not self.point_source:
return ""
return "{} - {}".format(
- dict(self.GEO_SOURCE)[self.point_source],
- self.point_source_item
+ dict(self.GEO_SOURCE)[self.point_source], self.point_source_item
)
def geo_polygon_source(self):
@@ -2682,31 +2878,33 @@ class GeoItem(models.Model):
return ""
return "{} - {}".format(
dict(self.GEO_SOURCE)[self.multi_polygon_source],
- self.multi_polygon_source_item
+ self.multi_polygon_source_item,
)
def _geojson_serialize(self, geom_attr):
if not hasattr(self, geom_attr):
return ""
- cached_label_key = 'cached_label'
+ cached_label_key = "cached_label"
if self.GEO_LABEL:
cached_label_key = self.GEO_LABEL
if getattr(self, "CACHED_LABELS", None):
cached_label_key = self.CACHED_LABELS[-1]
geojson = serialize(
- 'geojson',
+ "geojson",
self.__class__.objects.filter(pk=self.pk),
- geometry_field=geom_attr, fields=(cached_label_key,))
+ geometry_field=geom_attr,
+ fields=(cached_label_key,),
+ )
geojson_dct = json.loads(geojson)
profile = get_current_profile()
precision = profile.point_precision
- features = geojson_dct.pop('features')
+ features = geojson_dct.pop("features")
for idx in range(len(features)):
feature = features[idx]
- lbl = feature['properties'].pop(cached_label_key)
- feature['properties']['name'] = lbl
- feature['properties']['id'] = self.pk
+ lbl = feature["properties"].pop(cached_label_key)
+ feature["properties"]["name"] = lbl
+ feature["properties"]["id"] = self.pk
if precision is not None:
geom_type = feature["geometry"].get("type", None)
if geom_type == "Point":
@@ -2714,20 +2912,20 @@ class GeoItem(models.Model):
round(coord, precision)
for coord in feature["geometry"]["coordinates"]
]
- geojson_dct['features'] = features
- geojson_dct['link_template'] = simple_link_to_window(self).replace(
- '999999', '<pk>'
+ geojson_dct["features"] = features
+ geojson_dct["link_template"] = simple_link_to_window(self).replace(
+ "999999", "<pk>"
)
geojson = json.dumps(geojson_dct)
return geojson
@property
def point_2d_geojson(self):
- return self._geojson_serialize('point_2d')
+ return self._geojson_serialize("point_2d")
@property
def multi_polygon_geojson(self):
- return self._geojson_serialize('multi_polygon')
+ return self._geojson_serialize("multi_polygon")
class ImageContainerModel:
@@ -2742,10 +2940,12 @@ class ImageContainerModel:
class CompleteIdentifierItem(models.Model, ImageContainerModel):
HAS_QR_CODE = True
complete_identifier = models.TextField(
- _("Complete identifier"), blank=True, default="")
+ _("Complete identifier"), blank=True, default=""
+ )
custom_index = models.IntegerField("Custom index", blank=True, null=True)
- qrcode = models.ImageField(upload_to=get_image_path, blank=True, null=True,
- max_length=255)
+ qrcode = models.ImageField(
+ upload_to=get_image_path, blank=True, null=True, max_length=255
+ )
class Meta:
abstract = True
@@ -2773,16 +2973,20 @@ class CompleteIdentifierItem(models.Model, ImageContainerModel):
tiny_url = TinyUrl()
tiny_url.link = url
tiny_url.save()
- short_url = scheme + "://" + site.domain + reverse(
- 'tiny-redirect', args=[tiny_url.get_short_id()])
+ short_url = (
+ scheme
+ + "://"
+ + site.domain
+ + reverse("tiny-redirect", args=[tiny_url.get_short_id()])
+ )
qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION)
tmpdir_created = False
if not tmpdir:
tmpdir = tempfile.mkdtemp("-qrcode")
tmpdir_created = True
- filename = tmpdir + os.sep + 'qrcode.png'
+ filename = tmpdir + os.sep + "qrcode.png"
qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE)
- with open(filename, 'rb') as qrfile:
+ with open(filename, "rb") as qrfile:
self.qrcode.save("qrcode.png", File(qrfile))
self.skip_history_when_saving = True
self._no_move = True
@@ -2794,13 +2998,12 @@ class CompleteIdentifierItem(models.Model, ImageContainerModel):
SLUG = getattr(self, "SLUG", None)
if not SLUG:
return ""
- complete_identifier = get_generated_id(
- SLUG + "_complete_identifier", self)
+ complete_identifier = get_generated_id(SLUG + "_complete_identifier", self)
if complete_identifier:
return complete_identifier
- cached_label_key = 'cached_label'
- if getattr(self, 'GEO_LABEL', None):
- cached_label_key = getattr(self, 'GEO_LABEL', None)
+ cached_label_key = "cached_label"
+ if getattr(self, "GEO_LABEL", None):
+ cached_label_key = getattr(self, "GEO_LABEL", None)
if hasattr(self, "CACHED_COMPLETE_ID"):
cached_label_key = self.CACHED_COMPLETE_ID
if not cached_label_key:
@@ -2829,8 +3032,7 @@ class CompleteIdentifierItem(models.Model, ImageContainerModel):
return getattr(self, "get_index_" + key)()
model = self.__class__
try:
- self_keys = set(
- list(model.objects.filter(pk=self.pk).values_list(*keys)))
+ self_keys = set(list(model.objects.filter(pk=self.pk).values_list(*keys)))
except Exception: # bad settings - not managed here
return
if len(self_keys) != 1: # key is not distinct
@@ -2846,12 +3048,12 @@ class CompleteIdentifierItem(models.Model, ImageContainerModel):
for idx, key in enumerate(keys):
q = q.filter(**{key: self_keys[idx]})
try:
- r = q.aggregate(max_index=Max('custom_index'))
+ r = q.aggregate(max_index=Max("custom_index"))
except Exception: # bad settings
return
- if not r['max_index']:
+ if not r["max_index"]:
return 1
- return r['max_index'] + 1
+ return r["max_index"] + 1
def save(self, *args, **kwargs):
super(CompleteIdentifierItem, self).save(*args, **kwargs)
@@ -2887,8 +3089,8 @@ class SearchVectorConfig:
self.func = func
def format(self, value):
- if value == 'None':
- value = ''
+ if value == "None":
+ value = ""
if not self.func:
return [value]
return self.func(value)
@@ -2898,11 +3100,12 @@ class ShortMenuItem:
"""
Item available in the short menu
"""
+
UP_MODEL_QUERY = {}
@classmethod
def get_short_menu_class(cls, pk):
- return ''
+ return ""
@property
def short_class_name(self):
@@ -2914,6 +3117,7 @@ class MainItem(ShortMenuItem):
Item with quick actions available from tables
Extra actions are available from sheets
"""
+
QUICK_ACTIONS = []
@classmethod
@@ -2925,10 +3129,14 @@ class MainItem(ShortMenuItem):
for action in cls.QUICK_ACTIONS:
if not action.is_available(user, session=session, obj=obj):
continue
- qas.append([action.base_url,
- mark_safe(action.text),
- mark_safe(action.rendered_icon),
- action.target or ""])
+ qas.append(
+ [
+ action.base_url,
+ mark_safe(action.text),
+ mark_safe(action.rendered_icon),
+ action.target or "",
+ ]
+ )
return qas
@classmethod
@@ -2947,21 +3155,21 @@ class MainItem(ShortMenuItem):
self.save()
def get_extra_actions(self, request):
- if not hasattr(self, 'SLUG'):
+ if not hasattr(self, "SLUG"):
return []
actions = []
if request.user.is_superuser and hasattr(self, "auto_external_id"):
actions += [
(
- reverse("regenerate-external-id") + "?{}={}".format(
- self.SLUG, self.pk),
+ reverse("regenerate-external-id")
+ + "?{}={}".format(self.SLUG, self.pk),
_("Regenerate ID"),
"fa fa-key",
_("regen."),
"btn-info",
True,
- 200
+ 200,
)
]