From f2cd1c1326d5fbfbd39a7b2a30a278dd66883c3c Mon Sep 17 00:00:00 2001 From: Étienne Loks Date: Mon, 24 Apr 2023 11:21:01 +0200 Subject: Document -> Town/Area: sheet --- ishtar_common/models_common.py | 2660 ++++++++++++++++++++-------------------- 1 file changed, 1353 insertions(+), 1307 deletions(-) (limited to 'ishtar_common/models_common.py') diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index bc4a0549b..cacb5664e 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -2056,7 +2056,11 @@ class DocumentItem: except AttributeError: actions = [] - if not hasattr(self, "SLUG"): + if not hasattr(self, "SLUG") or not hasattr(self, "can_do"): + return actions + + if not hasattr(self, "can_do"): + print(f"**WARNING** can_do not implemented for {self.__class__}") return actions can_add_doc = self.can_do(request, "add_document") @@ -3092,1475 +3096,1517 @@ class GeographicItem(models.Model): return lst -class TownManager(models.Manager): - def get_by_natural_key(self, numero_insee, year): - return self.get(numero_insee=numero_insee, year=year) +class SerializeItem: + SERIALIZE_EXCLUDE = ["search_vector"] + SERIALIZE_PROPERTIES = [ + "external_id", + "multi_polygon_geojson", + "point_2d_geojson", + "images_number", + "json_sections", + ] + SERIALIZE_CALL = {} + SERIALIZE_DATES = [] + SERIALIZATION_FILES = [] + SERIALIZE_STRING = [] + def full_serialize(self, search_model=None, recursion=False) -> dict: + """ + API serialization + :return: data dict + """ + full_result = {} + serialize_fields = [] -class Town(GeographicItem, Imported, DocumentItem, models.Model): - SLUG = "town" - name = models.CharField(_("Name"), max_length=100) - surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) - center = models.PointField( - _("Localisation"), srid=settings.SRID, blank=True, null=True - ) - limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) - numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120) - departement = models.ForeignKey( - Department, - verbose_name=_("Department"), - on_delete=models.SET_NULL, - null=True, - blank=True, - ) - year = models.IntegerField( - _("Year of creation"), - null=True, - blank=True, - help_text=_( - "Filling this field is relevant to distinguish old towns " "from new towns." - ), - ) - children = models.ManyToManyField( - "Town", verbose_name=_("Town children"), blank=True, related_name="parents" - ) - cached_label = models.CharField( - _("Cached name"), max_length=500, null=True, blank=True, db_index=True - ) - documents = models.ManyToManyField( - "Document", related_name="towns", verbose_name=_("Documents"), blank=True - ) - main_image = models.ForeignKey( - "Document", - related_name="main_image_towns", - on_delete=models.SET_NULL, - verbose_name=_("Main image"), - blank=True, - null=True, - ) - objects = TownManager() + exclude = [] + if search_model: + exclude = [sf.key for sf in search_model.sheet_filters.distinct().all()] - class Meta: - verbose_name = _("Town") - verbose_name_plural = _("Towns") - if settings.COUNTRY == "fr": - ordering = ["numero_insee"] - unique_together = (("numero_insee", "year"),) - ADMIN_SECTION = _("Geography") + no_geodata = False + for prop in ("main_geodata", "geodata", "geodata_list"): + if prop in self.SERIALIZE_EXCLUDE or prop in exclude: + no_geodata = True + break - def natural_key(self): - return (self.numero_insee, self.year) + for field in self._meta.get_fields(): + field_name = field.name + if field_name in self.SERIALIZE_EXCLUDE or field_name in exclude: + continue + if field.many_to_one or field.one_to_one: + try: + value = getattr(self, field_name) + except (MultipleObjectsReturned, ObjectDoesNotExist): + value = None + if value: + if ( + field_name not in self.SERIALIZE_STRING + and hasattr(value, "full_serialize") + and not recursion + ): + # print(field.name, self.__class__, self) + if field_name == "main_geodata" and no_geodata: + continue + value = value.full_serialize(search_model, recursion=True) + elif field_name in self.SERIALIZATION_FILES: + try: + value = {"url": value.url} + except ValueError: + value = None + else: + value = str(value) + else: + value = None + full_result[field_name] = value + if field_name == "main_geodata": + full_result["geodata_list"] = [value] + elif field.many_to_many: + values = getattr(self, field_name) + if values.count(): + first_value = values.all()[0] + if ( + field_name not in self.SERIALIZE_STRING + and hasattr(first_value, "full_serialize") + and not recursion + ): + # print(field.name, self.__class__, self) + values = [ + v.full_serialize(search_model, recursion=True) + for v in values.all() + ] + else: + if first_value in self.SERIALIZATION_FILES: + values = [] + for v in values: + try: + values.append({"url": v.url}) + except ValueError: + pass + else: + values = [str(v) for v in values.all()] + else: + values = [] + full_result[field_name] = values + else: + if field_name in self.SERIALIZATION_FILES: + value = getattr(self, field_name) + try: + value = {"url": value.url} + except ValueError: + value = None + full_result[field.name] = value + else: + serialize_fields.append(field_name) - def history_compress(self): - return {"numero_insee": self.numero_insee, "year": self.year or ""} + result = json.loads(serialize("json", [self], fields=serialize_fields)) + full_result.update(result[0]["fields"]) + for prop in self.SERIALIZE_PROPERTIES: + if prop in self.SERIALIZE_EXCLUDE or prop in exclude: + continue + if hasattr(self, prop) and prop not in full_result: + full_result[prop] = getattr(self, prop) + if "point_2d_geojson" in full_result: + full_result["point_2d"] = True + if "multi_polygon_geojson" in full_result: + full_result["multi_polygon"] = True + for prop in self.SERIALIZE_DATES: + if prop in self.SERIALIZE_EXCLUDE or prop in exclude: + continue + dt = getattr(self, prop) or "" + if dt: + dt = human_date(dt) + full_result[prop] = dt + for k in self.SERIALIZE_CALL: + if k in self.SERIALIZE_EXCLUDE or k in exclude: + continue + full_result[k] = getattr(self, self.SERIALIZE_CALL[k])() + full_result["SLUG"] = self.SLUG + full_result["pk"] = f"external_{self.pk}" + full_result["id"] = f"external_{self.id}" + return full_result - @classmethod - def get_documentation_string(cls): - """ - Used for automatic documentation generation - """ - return "**name** {}, **numero_insee** {}, **cached_label** {}".format( - _("Name"), "Code commune (numéro INSEE)", _("Cached name") - ) + def get_associated_main_item_list(self, attr, model) -> list: + items = getattr(self, attr) + if not items.count(): + return [] + lst = [] + table_cols = model.TABLE_COLS + if callable(table_cols): + table_cols = table_cols() + for colname in table_cols: + if colname in model.COL_LABELS: + lst.append(str(model.COL_LABELS[colname])) + else: + lst.append(model._meta.get_field(colname).verbose_name) + lst = [lst] + for values in items.values_list(*table_cols): + lst.append(["-" if v is None else v for v in values]) + return lst - def get_values(self, prefix="", **kwargs): - return { - prefix or "label": str(self), - prefix + "name": self.name, - prefix + "numero_insee": self.numero_insee, - } - @classmethod - def history_decompress(cls, full_value, create=False): - if not full_value: - return [] - res = [] - for value in full_value: - try: - res.append( - cls.objects.get( - numero_insee=value["numero_insee"], year=value["year"] or None - ) - ) - except cls.DoesNotExist: - continue - return res +class ShortMenuItem: + """ + Item available in the short menu + """ - def __str__(self): - return self.cached_label or "" + UP_MODEL_QUERY = {} - def geodata_child_item_queries(self): - return [self.sites, self.operations] + @classmethod + def get_short_menu_class(cls, pk): + return "" @property - def label_with_areas(self): - label = [self.name] - if self.numero_insee: - label.append("({})".format(self.numero_insee)) - for area in self.areas.all(): - label.append(" - ") - label.append(area.full_label) - return " ".join(label) + def short_class_name(self): + return "" - def generate_geo(self, force=False): - force = self.generate_limit(force=force) - self.generate_center(force=force) - self.generate_area(force=force) - def generate_limit(self, force=False): - if not force and self.limit: - return - parents = None - if not self.parents.count(): - return - for parent in self.parents.all(): - if not parent.limit: - return - if not parents: - parents = parent.limit - else: - parents = parents.union(parent.limit) - # if union is a simple polygon make it a multi - if "MULTI" not in parents.wkt: - parents = parents.wkt.replace("POLYGON", "MULTIPOLYGON(") + ")" - if not parents: - return - self.limit = parents - self.save() - return True +class MainItem(ShortMenuItem, SerializeItem): + """ + Item with quick actions available from tables + Extra actions are available from sheets + Manage cascade updated, has_changed and no_post_process + """ - def generate_center(self, force=False): - if not force and (self.center or not self.limit): - return - self.center = self.limit.centroid - if not self.center: - return False - self.save() - return True + QUICK_ACTIONS = [] + SLUG = "" + DOWN_MODEL_UPDATE = [] + INITIAL_VALUES = [] # list of field checkable if changed on save - def generate_area(self, force=False): - if not force and (self.surface or not self.limit): - return - try: - surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area - except GDALException: - return False - if surface > 214748364 or not surface: - return False - self.surface = surface - self.save() - return True + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._initial_values = {} + for field_name in self.INITIAL_VALUES: + self._initial_values[field_name] = getattr(self, field_name) - def update_town_code(self): - if not self.numero_insee or not self.children.count() or not self.year: + def has_changed(self): + """ + Check which field have a changed value between INITIAL_VALUES + :return: list of changed fields + """ + changed = [] + for field_name in self._initial_values: + value = getattr(self, field_name) + if self._initial_values[field_name] != value: + changed.append(field_name) + self._initial_values[field_name] = value + return changed + + def cascade_update(self, changed=True): + if not changed: return - old_num = self.numero_insee[:] - numero = old_num.split("-")[0] - base_insee = "{}-{}".format(numero, self.year) - self.numero_insee = base_insee - idx = 0 - while Town.objects.filter( - year=self.year, numero_insee=self.numero_insee).exclude( - pk=self.pk).count(): - idx += 1 - self.numero_insee = base_insee + "-" + str(idx) - if self.numero_insee != old_num: - return True + for down_model in self.DOWN_MODEL_UPDATE: + if not settings.USE_BACKGROUND_TASK: + rel = getattr(self, down_model) + if hasattr(rel.model, "need_update"): + rel.update(need_update=True) + continue + for item in getattr(self, down_model).all(): + item.cached_label_changed() + if hasattr(item, "main_geodata"): + item.post_save_geo() - def _get_base_image_path(self): - if self.numero_insee and len(self.numero_insee) == 5: - prefix = self.numero_insee[:2] - return f"{self.SLUG}/{prefix}" - return self.SLUG + def no_post_process(self): + self.skip_history_when_saving = True + self._cached_label_checked = True + self._post_saved_geo = True + self._external_id_changed = False + self._search_updated = True + self._no_move = True - def _generate_cached_label(self): - cached_label = self.name - if settings.COUNTRY == "fr" and self.numero_insee: - dpt_len = 2 - if ( - self.numero_insee.startswith("97") - or self.numero_insee.startswith("98") - or self.numero_insee[0] - not in ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") - ): - dpt_len = 3 - cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) - if self.year and self.children.count(): - cached_label += " ({})".format(self.year) - return cached_label + @classmethod + def app_label(cls): + return cls._meta.app_label + @classmethod + def model_name(cls): + return cls._meta.model_name -def post_save_town(sender, **kwargs): - cached_label_changed(sender, **kwargs) - town = kwargs["instance"] - town.generate_geo() - if town.update_town_code(): - town.save() + @classmethod + def class_verbose_name(cls): + return cls._meta.verbose_name + @classmethod + def get_columns(cls, table_cols_attr="TABLE_COLS", dict_col_labels=True): + """ + :param table_cols_attr: "TABLE_COLS" if not specified + :param dict_col_labels: (default: True) if set to False return list matching + with table_cols list + :return: (table_cols, table_col_labels) + """ + return get_columns_from_class(cls, table_cols_attr=table_cols_attr, + dict_col_labels=dict_col_labels) -post_save.connect(post_save_town, sender=Town) -m2m_changed.connect(geodata_attached_changed, sender=Town.geodata.through) -m2m_changed.connect(document_attached_changed, sender=Town.documents.through) + def get_search_url(self): + if self.SLUG: + try: + return reverse(self.SLUG + "_search") + except NoReverseMatch: + pass + @classmethod + def get_quick_actions(cls, user, session=None, obj=None): + """ + Get a list of (url, title, icon, target) actions for an user + """ + qas = [] + for action in cls.QUICK_ACTIONS: + if not action.is_available(user, session=session, obj=obj): + continue + qas.append( + [ + action.base_url, + mark_safe(action.text), + mark_safe(action.rendered_icon), + action.target or "", + action.is_popup, + ] + ) + return qas -def town_child_changed(sender, **kwargs): - town = kwargs["instance"] - if town.update_town_code(): - town.save() + @classmethod + def get_quick_action_by_url(cls, url): + for action in cls.QUICK_ACTIONS: + if action.url == url: + return action + def regenerate_external_id(self): + if not hasattr(self, "external_id"): + return + self.skip_history_when_saving = True + self._no_move = True + self.external_id = "" + self.auto_external_id = True + self.save() -m2m_changed.connect(town_child_changed, sender=Town.children.through) + def cached_label_changed(self): + self.no_post_process() + self._cached_label_checked = False + cached_label_changed(self.__class__, instance=self, created=False) + def post_save_geo(self): + self.no_post_process() + self._post_saved_geo = False + post_save_geo(self.__class__, instance=self, created=False) -def geotown_attached_changed(sender, **kwargs): - # manage associated geoitem - profile = get_current_profile() - if not profile.mapping: - return - instance = kwargs.get("instance", None) - model = kwargs.get("model", None) - pk_set = kwargs.get("pk_set", None) - action = kwargs.get("action", None) - if not instance or not model or not hasattr(instance, "post_save_geo"): - return + def external_id_changed(self): + self.no_post_process() + self._external_id_changed = False + external_id_changed(self.__class__, instance=self, created=False) - instance._post_save_geo_ok = False - if action in ("post_add", "post_remove", "post_clear"): - instance.post_save_geo(save=True) + def can_do(self, request, action_name): + """ + Check permission availability for the current object. + :param request: request object + :param action_name: action name eg: "change_find" + :return: boolean + """ + # overload with OwnPerm when _own_ is relevant + if not getattr(request.user, "ishtaruser", None): + return False + user = request.user + return user.ishtaruser.has_right(action_name, request.session)\ + def get_extra_actions(self, request): + if not hasattr(self, "SLUG"): + return [] -class Address(BaseHistorizedItem): - FIELDS = ( - "address", - "address_complement", - "postal_code", - "town", - "precise_town_id", - "country", - "alt_address", - "alt_address_complement", - "alt_postal_code", - "alt_town", - "alt_country", - "phone", - "phone_desc", - "phone2", - "phone_desc2", - "phone3", - "phone_desc3", - "raw_phone", - "mobile_phone", - "email", - "alt_address_is_prefered", - ) - address = models.TextField(_("Address"), blank=True, default="") - address_complement = models.TextField( - _("Address complement"), blank=True, default="" - ) - postal_code = models.CharField( - _("Postal code"), max_length=10, null=True, blank=True - ) - town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True) - precise_town_id = models.PositiveIntegerField( - verbose_name=_("Town (precise)"), - null=True, - blank=True, - ) - country = models.CharField(_("Country"), max_length=30, null=True, blank=True) - alt_address = models.TextField(_("Other address: address"), blank=True, default="") - alt_address_complement = models.TextField( - _("Other address: address complement"), blank=True, default="" - ) - alt_postal_code = models.CharField( - _("Other address: postal code"), max_length=10, null=True, blank=True - ) - alt_town = models.CharField( - _("Other address: town"), max_length=70, null=True, blank=True - ) - alt_country = models.CharField( - _("Other address: country"), max_length=30, null=True, blank=True + actions = [] + if request.user.is_superuser and hasattr(self, "auto_external_id"): + actions += [ + ( + reverse("regenerate-external-id") + + "?{}={}".format(self.SLUG, self.pk), + _("Regenerate ID"), + "fa fa-key", + _("regen."), + "btn-info", + True, + 200, + ) + ] + + return actions + + +class TownManager(models.Manager): + def get_by_natural_key(self, numero_insee, year): + return self.get(numero_insee=numero_insee, year=year) + + +class Town(GeographicItem, Imported, DocumentItem, MainItem, models.Model): + SLUG = "town" + name = models.CharField(_("Name"), max_length=100) + surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) + center = models.PointField( + _("Localisation"), srid=settings.SRID, blank=True, null=True ) - phone = models.CharField(_("Phone"), max_length=32, null=True, blank=True) - phone_desc = models.CharField( - _("Phone description"), max_length=300, null=True, blank=True + limit = models.MultiPolygonField(_("Limit"), blank=True, null=True) + numero_insee = models.CharField("Code commune (numéro INSEE)", max_length=120) + departement = models.ForeignKey( + Department, + verbose_name=_("Department"), + on_delete=models.SET_NULL, + null=True, + blank=True, ) - phone2 = models.CharField( - _("Phone description 2"), max_length=32, null=True, blank=True + year = models.IntegerField( + _("Year of creation"), + null=True, + blank=True, + help_text=_( + "Filling this field is relevant to distinguish old towns " "from new towns." + ), ) - phone_desc2 = models.CharField( - _("Phone description 2"), max_length=300, null=True, blank=True + children = models.ManyToManyField( + "Town", verbose_name=_("Town children"), blank=True, related_name="parents" ) - phone3 = models.CharField(_("Phone 3"), max_length=32, null=True, blank=True) - phone_desc3 = models.CharField( - _("Phone description 3"), max_length=300, null=True, blank=True + cached_label = models.CharField( + _("Cached name"), max_length=500, null=True, blank=True, db_index=True ) - raw_phone = models.TextField(_("Raw phone"), blank=True, default="") - mobile_phone = models.CharField( - _("Mobile phone"), max_length=32, null=True, blank=True + documents = models.ManyToManyField( + "Document", related_name="towns", verbose_name=_("Documents"), blank=True ) - email = models.EmailField(_("Email"), max_length=300, blank=True, null=True) - alt_address_is_prefered = models.BooleanField( - _("Alternative address is prefered"), default=False + main_image = models.ForeignKey( + "Document", + related_name="main_image_towns", + on_delete=models.SET_NULL, + verbose_name=_("Main image"), + blank=True, + null=True, ) - history = HistoricalRecords(inherit=True) - SUB_ADDRESSES = [] + objects = TownManager() class Meta: - abstract = True + verbose_name = _("Town") + verbose_name_plural = _("Towns") + if settings.COUNTRY == "fr": + ordering = ["numero_insee"] + unique_together = (("numero_insee", "year"),) + ADMIN_SECTION = _("Geography") - @property - def precise_town(self): - if hasattr(self, "_precise_town"): - return self._precise_town - if not self.precise_town_id: - self._precise_town = None - else: - try: - self._precise_town = Town.objects.get(id=self.precise_town_id) - except Town.DoesNotExist: - self._precise_town = None - return self._precise_town + def natural_key(self): + return (self.numero_insee, self.year) - @property - def precise_town_name(self): - if not self.precise_town: - return "" - return self.precise_town.name + def history_compress(self): + return {"numero_insee": self.numero_insee, "year": self.year or ""} - @post_importer_action - def set_town_by_code(self, context, value): - try: - town = Town.objects.get(numero_insee=value) - except Town.DoesNotExist: - raise ImporterError( - str(_("Town with code: {} does not exists")).format(value) - ) - self.precise_town_id = town.pk - self.skip_history_when_saving = True - self.save() - return self - set_town_by_code.post_save = True + @classmethod + def get_documentation_string(cls): + """ + Used for automatic documentation generation + """ + return "**name** {}, **numero_insee** {}, **cached_label** {}".format( + _("Name"), "Code commune (numéro INSEE)", _("Cached name") + ) - def get_short_html_items(self): - items = [] - if self.address: - items.append("""{}""".format(self.address)) - if self.address_complement: - items.append( - """{}""".format( - self.address_complement - ) - ) - if self.postal_code: - items.append( - """{}""".format(self.postal_code) - ) - if self.precise_town: - items.append( - """{}""".format(self.precise_town.name) - ) - elif self.town: - items.append("""{}""".format(self.town)) - if self.country: - items.append("""{}""".format(self.country)) - return items + @property + def surface_ha(self): + if not self.surface: + return 0 + return self.surface / 10000.0 - def get_short_html_detail(self): - html = """
""" - items = self.get_short_html_items() - if not items: - items = [ - "{}".format(_("No associated address")) - ] - html += "".join(items) - html += """
""" - return html + def get_filename(self): + if self.numero_insee: + return f"{self.numero_insee} - {slugify(self.name)}" + return slugify(self.name) - def get_town_centroid(self): - if self.precise_town: - return self.precise_town.center, self._meta.verbose_name - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if sub_item and sub_item.precise_town: - return sub_item.precise_town.center, sub_item._meta.verbose_name + def associated_filename(self): + return self.get_filename() - def get_town_polygons(self): - if self.precise_town: - return self.precise_town.limit, self._meta.verbose_name - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if sub_item and sub_item.precise_town: - return sub_item.precise_town.limit, sub_item._meta.verbose_name + def get_values(self, prefix="", **kwargs): + return { + prefix or "label": str(self), + prefix + "name": self.name, + prefix + "numero_insee": self.numero_insee, + } - def get_attribute(self, attr): - if self.town or self.precise_town: - return getattr(self, attr) - for sub_address in self.SUB_ADDRESSES: - sub_item = getattr(self, sub_address) - if not sub_item: + @classmethod + def history_decompress(cls, full_value, create=False): + if not full_value: + return [] + res = [] + for value in full_value: + try: + res.append( + cls.objects.get( + numero_insee=value["numero_insee"], year=value["year"] or None + ) + ) + except cls.DoesNotExist: continue - if sub_item.town or sub_item.precise_town: - return getattr(sub_item, attr) - return getattr(self, attr) - - def get_address(self): - return self.get_attribute("address") - - def get_address_complement(self): - return self.get_attribute("address_complement") - - def get_postal_code(self): - return self.get_attribute("postal_code") + return res - def get_town(self): - return self.get_attribute("town") + def __str__(self): + return self.cached_label or "" - def get_precise_town(self): - return self.get_attribute("precise_town") + def geodata_child_item_queries(self): + return [self.sites, self.operations] - def get_country(self): - return self.get_attribute("country") + @property + def label_with_areas(self): + label = [self.name] + if self.numero_insee: + label.append("({})".format(self.numero_insee)) + for area in self.areas.all(): + label.append(" - ") + label.append(area.full_label) + return " ".join(label) - def simple_lbl(self): - return str(self) + def generate_geo(self, force=False): + force = self.generate_limit(force=force) + self.generate_center(force=force) + self.generate_area(force=force) - def full_address(self): - lbl = self.simple_lbl() - if lbl: - lbl += "\n" - lbl += self.address_lbl() - return lbl - - def address_lbl(self, list=False): - lbls = [] - prefix = "" - if self.alt_address_is_prefered: - prefix = "alt_" - if getattr(self, prefix + "address"): - lbls.append(( - getattr(self, prefix + "address"), - _("Address") - )) - if getattr(self, prefix + "address_complement"): - lbls.append(( - getattr(self, prefix + "address_complement"), - _("Address complement") - )) - postal_code = getattr(self, prefix + "postal_code") - town = getattr(self, prefix + "town") - if postal_code or town: - lbls.append(( - " ".join([postal_code, town]), - _("Postal code - Town") - )) - if self.phone: - lbls.append(( - self.phone, - _("Phone") - )) - if self.mobile_phone: - lbls.append(( - self.mobile_phone, - _("Mobile") - )) - if self.email: - lbls.append(( - self.email, - _("Email") - )) - if list: - return lbls - return "\n".join([ - value for value, lbl in lbls - ]) - - def address_lbl_list(self): - return self.address_lbl(list=True) - - -class Merge(models.Model): - merge_key = models.TextField(_("Merge key"), blank=True, null=True) - merge_candidate = models.ManyToManyField("self", blank=True) - merge_exclusion = models.ManyToManyField("self", blank=True) - archived = models.NullBooleanField(default=False, blank=True, null=True) - # 1 for one word similarity, 2 for two word similarity, etc. - MERGE_CLEMENCY = None - EMPTY_MERGE_KEY = "--" - MERGE_ATTRIBUTE = "name" - - class Meta: - abstract = True + def generate_limit(self, force=False): + if not force and self.limit: + return + parents = None + if not self.parents.count(): + return + for parent in self.parents.all(): + if not parent.limit: + return + if not parents: + parents = parent.limit + else: + parents = parents.union(parent.limit) + # if union is a simple polygon make it a multi + if "MULTI" not in parents.wkt: + parents = parents.wkt.replace("POLYGON", "MULTIPOLYGON(") + ")" + if not parents: + return + self.limit = parents + self.save() + return True - def generate_merge_key(self): - if self.archived: + def generate_center(self, force=False): + if not force and (self.center or not self.limit): return - merge_attr = getattr(self, self.MERGE_ATTRIBUTE) - self.merge_key = slugify(merge_attr if merge_attr else "") - if not self.merge_key: - self.merge_key = self.EMPTY_MERGE_KEY - self.merge_key = self.merge_key + self.center = self.limit.centroid + if not self.center: + return False + self.save() + return True - def generate_merge_candidate(self): - if self.archived: + def generate_area(self, force=False): + if not force and (self.surface or not self.limit): return - if not self.merge_key: - self.generate_merge_key() - self.save(merge_key_generated=True) - if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: + try: + surface = self.limit.transform(settings.SURFACE_SRID, clone=True).area + except GDALException: + return False + if surface > 214748364 or not surface: + return False + self.surface = surface + self.save() + return True + + def update_town_code(self): + if not self.numero_insee or not self.children.count() or not self.year: return - q = ( - self.__class__.objects.exclude(pk=self.pk) - .exclude(merge_exclusion=self) - .exclude(merge_candidate=self) - .exclude(archived=True) - ) - if not self.MERGE_CLEMENCY: - q = q.filter(merge_key=self.merge_key) - else: - subkeys_front = "-".join(self.merge_key.split("-")[: self.MERGE_CLEMENCY]) - subkeys_back = "-".join(self.merge_key.split("-")[-self.MERGE_CLEMENCY :]) - q = q.filter( - Q(merge_key__istartswith=subkeys_front) - | Q(merge_key__iendswith=subkeys_back) - ) - for item in q.all(): - self.merge_candidate.add(item) + old_num = self.numero_insee[:] + numero = old_num.split("-")[0] + base_insee = "{}-{}".format(numero, self.year) + self.numero_insee = base_insee + idx = 0 + while Town.objects.filter( + year=self.year, numero_insee=self.numero_insee).exclude( + pk=self.pk).count(): + idx += 1 + self.numero_insee = base_insee + "-" + str(idx) + if self.numero_insee != old_num: + return True - def save(self, *args, **kwargs): - # prevent circular save - merge_key_generated = False - if "merge_key_generated" in kwargs: - merge_key_generated = kwargs.pop("merge_key_generated") - self.generate_merge_key() - item = super(Merge, self).save(*args, **kwargs) - if not merge_key_generated: - self.merge_candidate.clear() - self.generate_merge_candidate() - return item + def _get_base_image_path(self): + if self.numero_insee and len(self.numero_insee) == 5: + prefix = self.numero_insee[:2] + return f"{self.SLUG}/{prefix}" + return self.SLUG - def archive(self): - self.archived = True - self.save() - self.merge_candidate.clear() - self.merge_exclusion.clear() + def _generate_cached_label(self): + cached_label = self.name + if settings.COUNTRY == "fr" and self.numero_insee: + dpt_len = 2 + if ( + self.numero_insee.startswith("97") + or self.numero_insee.startswith("98") + or self.numero_insee[0] + not in ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9") + ): + dpt_len = 3 + cached_label = "%s - %s" % (self.name, self.numero_insee[:dpt_len]) + if self.year and self.children.count(): + cached_label += " ({})".format(self.year) + return cached_label - def merge(self, item, keep_old=False, exclude_fields=None): - merge_model_objects( - self, item, keep_old=keep_old, exclude_fields=exclude_fields - ) - self.generate_merge_candidate() + def get_extra_actions(self, request): + """ + For sheet template + """ + # url, base_text, icon, extra_text, extra css class, is a quick action + actions = super().get_extra_actions(request) + profile = get_current_profile() + can_add_geo = profile.mapping and self.can_do(request, "add_geovectordata") + if can_add_geo: + actions.append(self.get_add_geo_action()) + return actions -def __get_stats_cache_values(model_name, model_pk): - StatsCache = apps.get_model("ishtar_common", "StatsCache") - q = StatsCache.objects.filter(model=model_name, model_pk=model_pk) - nb = q.count() - if nb >= 1: - sc = q.all()[0] - for extra in q.order_by("-id").all()[1:]: - extra.delete() - else: - sc = StatsCache.objects.create(model=model_name, model_pk=model_pk) - values = sc.values - if not values: - values = {} - return sc, values +def post_save_town(sender, **kwargs): + cached_label_changed(sender, **kwargs) + town = kwargs["instance"] + town.generate_geo() + if town.update_town_code(): + town.save() -@task() -def _update_stats(app, model, model_pk, funcname): - model_name = app + "." + model - model = apps.get_model(app, model) - try: - item = model.objects.get(pk=model_pk) - except model.DoesNotExist: - return - value = getattr(item, funcname)() - sc, current_values = __get_stats_cache_values(model_name, model_pk) - current_values[funcname] = value - sc.values = current_values - sc.update_requested = None - sc.updated = datetime.datetime.now() - sc.save() +post_save.connect(post_save_town, sender=Town) +m2m_changed.connect(geodata_attached_changed, sender=Town.geodata.through) +m2m_changed.connect(document_attached_changed, sender=Town.documents.through) -def update_stats(statscache, item, funcname): - if not settings.USE_BACKGROUND_TASK: - current_values = statscache.values - if not current_values: - current_values = {} - value = getattr(item, funcname)() - current_values[funcname] = value - statscache.values = current_values - statscache.updated = datetime.datetime.now() - statscache.save() - return current_values +def town_child_changed(sender, **kwargs): + town = kwargs["instance"] + if town.update_town_code(): + town.save() - now = datetime.datetime.now() - app_name = item._meta.app_label - model_name = item._meta.model_name - statscache.update_requested = now.isoformat() - statscache.save() - _update_stats.delay(app_name, model_name, item.pk, funcname) - return statscache.values +m2m_changed.connect(town_child_changed, sender=Town.children.through) -class DashboardFormItem: - """ - Provide methods to manage statistics - """ - def last_stats_update(self): - model_name = self._meta.app_label + "." + self._meta.model_name - StatsCache = apps.get_model("ishtar_common", "StatsCache") - q = StatsCache.objects.filter(model=model_name, model_pk=self.pk).order_by( - "-updated" - ) - if not q.count(): - return - return q.all()[0].updated - - def _get_or_set_stats(self, funcname, update=False, expected_type=None): - model_name = self._meta.app_label + "." + self._meta.model_name - StatsCache = apps.get_model("ishtar_common", "StatsCache") - sc, __ = StatsCache.objects.get_or_create(model=model_name, model_pk=self.pk) - if not update: - values = sc.values - if funcname not in values: - if expected_type is not None: - return expected_type() - return 0 - else: - values = update_stats(sc, self, funcname) - if funcname in values: - values = values[funcname] - else: - values = 0 - if expected_type is not None and not isinstance(values, expected_type): - return expected_type() - return values - - @classmethod - def get_periods(cls, slice="month", fltr={}, date_source="creation"): - date_var = date_source + "_date" - q = cls.objects.filter(**{date_var + "__isnull": False}) - if fltr: - q = q.filter(**fltr) - if slice == "year": - return [ - res[date_var].year - for res in list(q.values(date_var).annotate(Count("id")).order_by()) - ] - elif slice == "month": - return [ - (res[date_var].year, res[date_var].month) - for res in list(q.values(date_var).annotate(Count("id")).order_by()) - ] - return [] - - @classmethod - def get_by_year(cls, year, fltr={}, date_source="creation"): - date_var = date_source + "_date" - q = cls.objects.filter(**{date_var + "__isnull": False}) - if fltr: - q = q.filter(**fltr) - return q.filter(**{date_var + "__year": year}).order_by("pk").distinct("pk") - - @classmethod - def get_by_month(cls, year, month, fltr={}, date_source="creation"): - date_var = date_source + "_date" - q = cls.objects.filter(**{date_var + "__isnull": False}) - if fltr: - q = q.filter(**fltr) - q = q.filter(**{date_var + "__year": year, date_var + "__month": month}) - return q.order_by("pk").distinct("pk") - - @classmethod - def get_total_number(cls, fltr=None): - q = cls.objects - if fltr: - q = q.filter(**fltr) - return q.order_by("pk").distinct("pk").count() - - -class QuickAction: - """ - Quick action available from tables - """ +def geotown_attached_changed(sender, **kwargs): + # manage associated geoitem + profile = get_current_profile() + if not profile.mapping: + return + instance = kwargs.get("instance", None) + model = kwargs.get("model", None) + pk_set = kwargs.get("pk_set", None) + action = kwargs.get("action", None) + if not instance or not model or not hasattr(instance, "post_save_geo"): + return - def __init__( - self, - url, - icon_class="", - text="", - target=None, - rights=None, - module=None, - is_popup=True, - ): - self.url = url - self.icon_class = icon_class - self.text = text - self.rights = rights - self.target = target - self.module = module - self.is_popup = is_popup - if self.target not in ("one", "many", None): - raise AttributeError("target must be one, many or None") + instance._post_save_geo_ok = False + if action in ("post_add", "post_remove", "post_clear"): + instance.post_save_geo(save=True) - def is_available(self, user, session=None, obj=None): - if self.module and not getattr(get_current_profile(), self.module): - return False - if not self.rights: # no restriction - return True - if not user or not hasattr(user, "ishtaruser") or not user.ishtaruser: - return False - user = user.ishtaruser - for right in self.rights: - if user.has_perm(right, session=session, obj=obj): - return True - return False +class Address(BaseHistorizedItem): + FIELDS = ( + "address", + "address_complement", + "postal_code", + "town", + "precise_town_id", + "country", + "alt_address", + "alt_address_complement", + "alt_postal_code", + "alt_town", + "alt_country", + "phone", + "phone_desc", + "phone2", + "phone_desc2", + "phone3", + "phone_desc3", + "raw_phone", + "mobile_phone", + "email", + "alt_address_is_prefered", + ) + address = models.TextField(_("Address"), blank=True, default="") + address_complement = models.TextField( + _("Address complement"), blank=True, default="" + ) + postal_code = models.CharField( + _("Postal code"), max_length=10, null=True, blank=True + ) + town = models.CharField(_("Town (freeform)"), max_length=150, null=True, blank=True) + precise_town_id = models.PositiveIntegerField( + verbose_name=_("Town (precise)"), + null=True, + blank=True, + ) + country = models.CharField(_("Country"), max_length=30, null=True, blank=True) + alt_address = models.TextField(_("Other address: address"), blank=True, default="") + alt_address_complement = models.TextField( + _("Other address: address complement"), blank=True, default="" + ) + alt_postal_code = models.CharField( + _("Other address: postal code"), max_length=10, null=True, blank=True + ) + alt_town = models.CharField( + _("Other address: town"), max_length=70, null=True, blank=True + ) + alt_country = models.CharField( + _("Other address: country"), max_length=30, null=True, blank=True + ) + phone = models.CharField(_("Phone"), max_length=32, null=True, blank=True) + phone_desc = models.CharField( + _("Phone description"), max_length=300, null=True, blank=True + ) + phone2 = models.CharField( + _("Phone description 2"), max_length=32, null=True, blank=True + ) + phone_desc2 = models.CharField( + _("Phone description 2"), max_length=300, null=True, blank=True + ) + phone3 = models.CharField(_("Phone 3"), max_length=32, null=True, blank=True) + phone_desc3 = models.CharField( + _("Phone description 3"), max_length=300, null=True, blank=True + ) + raw_phone = models.TextField(_("Raw phone"), blank=True, default="") + mobile_phone = models.CharField( + _("Mobile phone"), max_length=32, null=True, blank=True + ) + email = models.EmailField(_("Email"), max_length=300, blank=True, null=True) + alt_address_is_prefered = models.BooleanField( + _("Alternative address is prefered"), default=False + ) + history = HistoricalRecords(inherit=True) + SUB_ADDRESSES = [] - @property - def rendered_icon(self): - if not self.icon_class: - return "" - return "".format(self.icon_class) + class Meta: + abstract = True @property - def base_url(self): - if self.target is None: - url = reverse(self.url) + def precise_town(self): + if hasattr(self, "_precise_town"): + return self._precise_town + if not self.precise_town_id: + self._precise_town = None else: - # put arbitrary pk for the target - url = reverse(self.url, args=[0]) - url = url[:-2] # all quick action url have to finish with the - # pk of the selected item and a "/" - return url - + try: + self._precise_town = Town.objects.get(id=self.precise_town_id) + except Town.DoesNotExist: + self._precise_town = None + return self._precise_town -class DynamicRequest: - def __init__( - self, - label, - app_name, - model_name, - form_key, - search_key, - type_query, - search_query, - ): - self.label = label - self.form_key = form_key - self.search_key = search_key - self.app_name = app_name - self.model_name = model_name - self.type_query = type_query - self.search_query = search_query + @property + def precise_town_name(self): + if not self.precise_town: + return "" + return self.precise_town.name - def get_all_types(self): - model = apps.get_app_config(self.app_name).get_model(self.model_name) - return model.objects.filter(available=True) + @post_importer_action + def set_town_by_code(self, context, value): + try: + town = Town.objects.get(numero_insee=value) + except Town.DoesNotExist: + raise ImporterError( + str(_("Town with code: {} does not exists")).format(value) + ) + self.precise_town_id = town.pk + self.skip_history_when_saving = True + self.save() + return self + set_town_by_code.post_save = True - def get_form_fields(self): - fields = {} - for item in self.get_all_types().all(): - fields[self.form_key + "-" + item.txt_idx] = forms.CharField( - label=str(self.label) + " " + str(item), required=False + def get_short_html_items(self): + items = [] + if self.address: + items.append("""{}""".format(self.address)) + if self.address_complement: + items.append( + """{}""".format( + self.address_complement + ) ) - return fields + if self.postal_code: + items.append( + """{}""".format(self.postal_code) + ) + if self.precise_town: + items.append( + """{}""".format(self.precise_town.name) + ) + elif self.town: + items.append("""{}""".format(self.town)) + if self.country: + items.append("""{}""".format(self.country)) + return items - def get_extra_query(self, slug): - return {self.type_query: slug} + def get_short_html_detail(self): + html = """
""" + items = self.get_short_html_items() + if not items: + items = [ + "{}".format(_("No associated address")) + ] + html += "".join(items) + html += """
""" + return html - def get_alt_names(self): - alt_names = {} - for item in self.get_all_types().all(): - alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( - self.search_key + "-" + item.txt_idx, - self.search_query, - self.get_extra_query(item.txt_idx), - distinct_query=True, - ) - return alt_names + def get_town_centroid(self): + if self.precise_town: + return self.precise_town.center, self._meta.verbose_name + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if sub_item and sub_item.precise_town: + return sub_item.precise_town.center, sub_item._meta.verbose_name + + def get_town_polygons(self): + if self.precise_town: + return self.precise_town.limit, self._meta.verbose_name + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if sub_item and sub_item.precise_town: + return sub_item.precise_town.limit, sub_item._meta.verbose_name + def get_attribute(self, attr): + if self.town or self.precise_town: + return getattr(self, attr) + for sub_address in self.SUB_ADDRESSES: + sub_item = getattr(self, sub_address) + if not sub_item: + continue + if sub_item.town or sub_item.precise_town: + return getattr(sub_item, attr) + return getattr(self, attr) -class GeoItem(GeographicItem): - # gis - to be removed - GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon"))) + def get_address(self): + return self.get_attribute("address") - x = models.FloatField(_("X"), blank=True, null=True) - y = models.FloatField(_("Y"), blank=True, null=True) - z = models.FloatField(_("Z"), blank=True, null=True) - estimated_error_x = models.FloatField( - _("Estimated error for X"), blank=True, null=True - ) - estimated_error_y = models.FloatField( - _("Estimated error for Y"), blank=True, null=True - ) - estimated_error_z = models.FloatField( - _("Estimated error for Z"), blank=True, null=True - ) - spatial_reference_system = models.ForeignKey( - SpatialReferenceSystem, - verbose_name=_("Spatial Reference System"), - blank=True, - null=True, - on_delete=models.PROTECT, - ) - point = models.PointField(_("Point"), blank=True, null=True, dim=3) - point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) - point_source = models.CharField( - _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True - ) - point_source_item = models.CharField( - _("Point source item"), max_length=100, blank=True, null=True - ) - multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True) - multi_polygon_source = models.CharField( - _("Multi-polygon source"), - choices=GEO_SOURCE, - max_length=1, - blank=True, - null=True, - ) - multi_polygon_source_item = models.CharField( - _("Multi polygon source item"), max_length=100, blank=True, null=True - ) + def get_address_complement(self): + return self.get_attribute("address_complement") - GEO_LABEL = "" + def get_postal_code(self): + return self.get_attribute("postal_code") - class Meta: - abstract = True + def get_town(self): + return self.get_attribute("town") - def get_town_centroid(self): - raise NotImplementedError + def get_precise_town(self): + return self.get_attribute("precise_town") - def get_town_polygons(self): - raise NotImplementedError + def get_country(self): + return self.get_attribute("country") - @property - def X(self): - """x coordinates using the default SRS""" - coord = self.display_coordinates - if not coord: - return - return coord[0] + def simple_lbl(self): + return str(self) - @property - def Y(self): - """y coordinates using the default SRS""" - coord = self.display_coordinates - if not coord: - return - return coord[1] + def full_address(self): + lbl = self.simple_lbl() + if lbl: + lbl += "\n" + lbl += self.address_lbl() + return lbl - @property - def display_coordinates(self, rounded=True): - if not self.main_geodata: - return "" - return self.main_geodata.display_coordinates(rounded=rounded) + def address_lbl(self, list=False): + lbls = [] + prefix = "" + if self.alt_address_is_prefered: + prefix = "alt_" + if getattr(self, prefix + "address"): + lbls.append(( + getattr(self, prefix + "address"), + _("Address") + )) + if getattr(self, prefix + "address_complement"): + lbls.append(( + getattr(self, prefix + "address_complement"), + _("Address complement") + )) + postal_code = getattr(self, prefix + "postal_code") + town = getattr(self, prefix + "town") + if postal_code or town: + lbls.append(( + " ".join([postal_code, town]), + _("Postal code - Town") + )) + if self.phone: + lbls.append(( + self.phone, + _("Phone") + )) + if self.mobile_phone: + lbls.append(( + self.mobile_phone, + _("Mobile") + )) + if self.email: + lbls.append(( + self.email, + _("Email") + )) + if list: + return lbls + return "\n".join([ + value for value, lbl in lbls + ]) - @property - def display_spatial_reference_system(self): - profile = get_current_profile() - if not profile.display_srs or not profile.display_srs.srid: - return self.spatial_reference_system - return profile.display_srs + def address_lbl_list(self): + return self.address_lbl(list=True) - def get_precise_points(self): - if self.point_source == "P" and self.point_2d: - return self.point_2d, self.point, self.point_source_item - def get_precise_polygons(self): - if self.multi_polygon_source == "P" and self.multi_polygon: - return self.multi_polygon, self.multi_polygon_source_item +class Merge(models.Model): + merge_key = models.TextField(_("Merge key"), blank=True, null=True) + merge_candidate = models.ManyToManyField("self", blank=True) + merge_exclusion = models.ManyToManyField("self", blank=True) + archived = models.NullBooleanField(default=False, blank=True, null=True) + # 1 for one word similarity, 2 for two word similarity, etc. + MERGE_CLEMENCY = None + EMPTY_MERGE_KEY = "--" + MERGE_ATTRIBUTE = "name" - def get_geo_items(self, rounded=5): - if not self.main_geodata: - return {} - return self.main_geodata.get_geo_items(rounded) + class Meta: + abstract = True - def convert_coordinates(self, point_2d, rounded): - profile = get_current_profile() - if ( - not profile.display_srs - or not profile.display_srs.srid - or ( - profile.display_srs == self.spatial_reference_system - and point_2d.x - and point_2d.y - ) - ): - x, y = point_2d.x, point_2d.y - else: - point = point_2d.transform(profile.display_srs.srid, clone=True) - x, y = point.x, point.y - if rounded: - return [round(x, 5), round(y, 5)] - return [x, y] + def generate_merge_key(self): + if self.archived: + return + merge_attr = getattr(self, self.MERGE_ATTRIBUTE) + self.merge_key = slugify(merge_attr if merge_attr else "") + if not self.merge_key: + self.merge_key = self.EMPTY_MERGE_KEY + self.merge_key = self.merge_key - def _geojson_serialize(self, geom_attr): - if not hasattr(self, geom_attr): - return "" - cached_label_key = "cached_label" - if self.GEO_LABEL: - cached_label_key = self.GEO_LABEL - if getattr(self, "CACHED_LABELS", None): - cached_label_key = self.CACHED_LABELS[-1] - geojson = serialize( - "geojson", - self.__class__.objects.filter(pk=self.pk), - geometry_field=geom_attr, - fields=(cached_label_key,), + def generate_merge_candidate(self): + if self.archived: + return + if not self.merge_key: + self.generate_merge_key() + self.save(merge_key_generated=True) + if not self.pk or self.merge_key == self.EMPTY_MERGE_KEY: + return + q = ( + self.__class__.objects.exclude(pk=self.pk) + .exclude(merge_exclusion=self) + .exclude(merge_candidate=self) + .exclude(archived=True) ) - geojson_dct = json.loads(geojson) - profile = get_current_profile() - precision = profile.point_precision + if not self.MERGE_CLEMENCY: + q = q.filter(merge_key=self.merge_key) + else: + subkeys_front = "-".join(self.merge_key.split("-")[: self.MERGE_CLEMENCY]) + subkeys_back = "-".join(self.merge_key.split("-")[-self.MERGE_CLEMENCY :]) + q = q.filter( + Q(merge_key__istartswith=subkeys_front) + | Q(merge_key__iendswith=subkeys_back) + ) + for item in q.all(): + self.merge_candidate.add(item) + + def save(self, *args, **kwargs): + # prevent circular save + merge_key_generated = False + if "merge_key_generated" in kwargs: + merge_key_generated = kwargs.pop("merge_key_generated") + self.generate_merge_key() + item = super(Merge, self).save(*args, **kwargs) + if not merge_key_generated: + self.merge_candidate.clear() + self.generate_merge_candidate() + return item + + def archive(self): + self.archived = True + self.save() + self.merge_candidate.clear() + self.merge_exclusion.clear() - features = geojson_dct.pop("features") - for idx in range(len(features)): - feature = features[idx] - lbl = feature["properties"].pop(cached_label_key) - feature["properties"]["name"] = lbl - feature["properties"]["id"] = self.pk - if precision is not None: - geom_type = feature["geometry"].get("type", None) - if geom_type == "Point": - feature["geometry"]["coordinates"] = [ - round(coord, precision) - for coord in feature["geometry"]["coordinates"] - ] - geojson_dct["features"] = features - geojson_dct["link_template"] = simple_link_to_window(self).replace( - "999999", "" + def merge(self, item, keep_old=False, exclude_fields=None): + merge_model_objects( + self, item, keep_old=keep_old, exclude_fields=exclude_fields ) - geojson = json.dumps(geojson_dct) - return geojson + self.generate_merge_candidate() - @property - def point_2d_geojson(self): - return self._geojson_serialize("point_2d") - @property - def multi_polygon_geojson(self): - return self._geojson_serialize("multi_polygon") +def __get_stats_cache_values(model_name, model_pk): + StatsCache = apps.get_model("ishtar_common", "StatsCache") + q = StatsCache.objects.filter(model=model_name, model_pk=model_pk) + nb = q.count() + if nb >= 1: + sc = q.all()[0] + for extra in q.order_by("-id").all()[1:]: + extra.delete() + else: + sc = StatsCache.objects.create(model=model_name, model_pk=model_pk) + values = sc.values + if not values: + values = {} + return sc, values -class ImageContainerModel: - def _get_image_path(self, filename): - return "{}/{}".format(self._get_base_image_path(), filename) +@task() +def _update_stats(app, model, model_pk, funcname): + model_name = app + "." + model + model = apps.get_model(app, model) + try: + item = model.objects.get(pk=model_pk) + except model.DoesNotExist: + return + value = getattr(item, funcname)() + sc, current_values = __get_stats_cache_values(model_name, model_pk) + current_values[funcname] = value + sc.values = current_values + sc.update_requested = None + sc.updated = datetime.datetime.now() + sc.save() - def _get_base_image_path(self): - n = datetime.datetime.now() - return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) +def update_stats(statscache, item, funcname): + if not settings.USE_BACKGROUND_TASK: + current_values = statscache.values + if not current_values: + current_values = {} + value = getattr(item, funcname)() + current_values[funcname] = value + statscache.values = current_values + statscache.updated = datetime.datetime.now() + statscache.save() + return current_values -class CompleteIdentifierItem(models.Model, ImageContainerModel): - HAS_QR_CODE = True - cached_label = models.TextField( - _("Cached name"), - blank=True, - default="", - db_index=True, - help_text=_("Generated automatically - do not edit"), - ) - complete_identifier = models.TextField( - _("Complete identifier"), blank=True, default="" - ) - custom_index = models.IntegerField("Custom index", blank=True, null=True) - qrcode = models.ImageField( - upload_to=get_image_path, blank=True, null=True, max_length=255 - ) + now = datetime.datetime.now() + app_name = item._meta.app_label + model_name = item._meta.model_name + statscache.update_requested = now.isoformat() + statscache.save() + _update_stats.delay(app_name, model_name, item.pk, funcname) + return statscache.values - class Meta: - abstract = True - @property - def qrcode_path(self): - if not self.qrcode: - self.generate_qrcode() - if not self.qrcode: # error on qrcode generation - return "" - return self.qrcode.path +class DashboardFormItem: + """ + Provide methods to manage statistics + """ - def _profile_generate_cached_label(self): - slug = getattr(self, "SLUG", None) - if not slug: + def last_stats_update(self): + model_name = self._meta.app_label + "." + self._meta.model_name + StatsCache = apps.get_model("ishtar_common", "StatsCache") + q = StatsCache.objects.filter(model=model_name, model_pk=self.pk).order_by( + "-updated" + ) + if not q.count(): return - return get_generated_id(slug + "_cached_label", self) - - def _generate_cached_label(self): - label = self._profile_generate_cached_label() - if not label: - # to be eventually overloaded by parent class - return str(self) - return label + return q.all()[0].updated - def generate_qrcode(self, request=None, secure=True, tmpdir=None): - url = self.get_absolute_url() - site = Site.objects.get_current() - if request: - scheme = request.scheme + def _get_or_set_stats(self, funcname, update=False, expected_type=None): + model_name = self._meta.app_label + "." + self._meta.model_name + StatsCache = apps.get_model("ishtar_common", "StatsCache") + sc, __ = StatsCache.objects.get_or_create(model=model_name, model_pk=self.pk) + if not update: + values = sc.values + if funcname not in values: + if expected_type is not None: + return expected_type() + return 0 else: - if secure: - scheme = "https" - else: - scheme = "http" - url = scheme + "://" + site.domain + url - TinyUrl = apps.get_model("ishtar_common", "TinyUrl") - tiny_url = TinyUrl() - tiny_url.link = url - tiny_url.save() - short_url = ( - scheme - + "://" - + site.domain - + reverse("tiny-redirect", args=[tiny_url.get_short_id()]) - ) - qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) - tmpdir_created = False - if not tmpdir: - tmpdir = tempfile.mkdtemp("-qrcode") - tmpdir_created = True - filename = tmpdir + os.sep + "qrcode.png" - qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) - self.skip_history_when_saving = True - self._no_move = True - self._no_geo_check = True - with open(filename, "rb") as qrfile: - self.qrcode.save("qrcode.png", File(qrfile)) - self.save() - if tmpdir_created: - shutil.rmtree(tmpdir) + values = update_stats(sc, self, funcname) + if funcname in values: + values = values[funcname] + else: + values = 0 + if expected_type is not None and not isinstance(values, expected_type): + return expected_type() + return values - def generate_complete_identifier(self): - SLUG = getattr(self, "SLUG", None) - if not SLUG: - return "" - complete_identifier = get_generated_id(SLUG + "_complete_identifier", self) - if complete_identifier: - return complete_identifier - cached_label_key = "cached_label" - if getattr(self, "GEO_LABEL", None): - cached_label_key = getattr(self, "GEO_LABEL", None) - if hasattr(self, "CACHED_COMPLETE_ID"): - cached_label_key = self.CACHED_COMPLETE_ID - if not cached_label_key: - return - complete_identifier = getattr(self, cached_label_key) - return complete_identifier + @classmethod + def get_periods(cls, slice="month", fltr={}, date_source="creation"): + date_var = date_source + "_date" + q = cls.objects.filter(**{date_var + "__isnull": False}) + if fltr: + q = q.filter(**fltr) + if slice == "year": + return [ + res[date_var].year + for res in list(q.values(date_var).annotate(Count("id")).order_by()) + ] + elif slice == "month": + return [ + (res[date_var].year, res[date_var].month) + for res in list(q.values(date_var).annotate(Count("id")).order_by()) + ] + return [] + + @classmethod + def get_by_year(cls, year, fltr={}, date_source="creation"): + date_var = date_source + "_date" + q = cls.objects.filter(**{date_var + "__isnull": False}) + if fltr: + q = q.filter(**fltr) + return q.filter(**{date_var + "__year": year}).order_by("pk").distinct("pk") + + @classmethod + def get_by_month(cls, year, month, fltr={}, date_source="creation"): + date_var = date_source + "_date" + q = cls.objects.filter(**{date_var + "__isnull": False}) + if fltr: + q = q.filter(**fltr) + q = q.filter(**{date_var + "__year": year, date_var + "__month": month}) + return q.order_by("pk").distinct("pk") + + @classmethod + def get_total_number(cls, fltr=None): + q = cls.objects + if fltr: + q = q.filter(**fltr) + return q.order_by("pk").distinct("pk").count() + + +class QuickAction: + """ + Quick action available from tables + """ + + def __init__( + self, + url, + icon_class="", + text="", + target=None, + rights=None, + module=None, + is_popup=True, + ): + self.url = url + self.icon_class = icon_class + self.text = text + self.rights = rights + self.target = target + self.module = module + self.is_popup = is_popup + if self.target not in ("one", "many", None): + raise AttributeError("target must be one, many or None") - def get_index_whole_db(self): - q = self.__class__.objects.exclude(custom_index__isnull=True) - q = q.order_by("-custom_index") - if q.count(): - current_index = q.values_list("custom_index", flat=True).all()[0] - return current_index + 1 - return 1 + def is_available(self, user, session=None, obj=None): + if self.module and not getattr(get_current_profile(), self.module): + return False + if not self.rights: # no restriction + return True + if not user or not hasattr(user, "ishtaruser") or not user.ishtaruser: + return False + user = user.ishtaruser - def generate_custom_index(self, force=False): - if not self.pk: - return - if self.custom_index and not force: - return self.custom_index - SLUG = getattr(self, "SLUG", None) - if not SLUG: - return - k = SLUG + "_custom_index" - profile = get_current_profile() - if not hasattr(profile, k): - return - key = getattr(profile, k) - if not key or not key.strip(): - return - keys = key.strip().split(";") - if len(keys) == 1 and hasattr(self, "get_index_" + keys[0]): - # custom index generation - return getattr(self, "get_index_" + key)() - model = self.__class__ - try: - self_keys = set(list(model.objects.filter(pk=self.pk).values_list(*keys))) - except Exception: # bad settings - not managed here - print("Bad settings for custom_index {}".format(";".join(keys))) - return - if len(self_keys) != 1: # key is not distinct - return - self_key = self_keys.pop() - return self._get_index(keys, self_key) + for right in self.rights: + if user.has_perm(right, session=session, obj=obj): + return True + return False - def _get_index(self, keys: list, self_keys: list): - model = self.__class__ - q = model.objects - if self.pk: - q = model.objects.exclude(pk=self.pk) - for idx, key in enumerate(keys): - q = q.filter(**{key: self_keys[idx]}) - try: - r = q.aggregate(max_index=Max("custom_index")) - except Exception: # bad settings - return - if not r["max_index"]: - return 1 - return r["max_index"] + 1 + @property + def rendered_icon(self): + if not self.icon_class: + return "" + return "".format(self.icon_class) - def save(self, *args, **kwargs): - super(CompleteIdentifierItem, self).save(*args, **kwargs) - self.regenerate_all_ids() + @property + def base_url(self): + if self.target is None: + url = reverse(self.url) + else: + # put arbitrary pk for the target + url = reverse(self.url, args=[0]) + url = url[:-2] # all quick action url have to finish with the + # pk of the selected item and a "/" + return url - def regenerate_all_ids(self, save=True): - if getattr(self, "_prevent_loop", False): - return - modified = False - custom_index = self.generate_custom_index() - if custom_index != self.custom_index: - modified = True - self.custom_index = custom_index - complete_id = self.generate_complete_identifier() - if complete_id: - modified = True - self.complete_identifier = complete_id - if modified: - self._prevent_loop = True - self.skip_history_when_saving = True - if save: - self.save() - return modified +class DynamicRequest: + def __init__( + self, + label, + app_name, + model_name, + form_key, + search_key, + type_query, + search_query, + ): + self.label = label + self.form_key = form_key + self.search_key = search_key + self.app_name = app_name + self.model_name = model_name + self.type_query = type_query + self.search_query = search_query + + def get_all_types(self): + model = apps.get_app_config(self.app_name).get_model(self.model_name) + return model.objects.filter(available=True) -class SearchVectorConfig: - def __init__(self, key, language=None, func=None): - self.key = key - if language: - self.language = language - if language == "local": - self.language = settings.ISHTAR_SEARCH_LANGUAGE - else: - self.language = "simple" - self.func = func + def get_form_fields(self): + fields = {} + for item in self.get_all_types().all(): + fields[self.form_key + "-" + item.txt_idx] = forms.CharField( + label=str(self.label) + " " + str(item), required=False + ) + return fields - def format(self, value): - if value == "None": - value = "" - if not self.func: - return [value] - value = self.func(value) - if not isinstance(value, list): - return [value] - return value + def get_extra_query(self, slug): + return {self.type_query: slug} + def get_alt_names(self): + alt_names = {} + for item in self.get_all_types().all(): + alt_names[self.form_key + "-" + item.txt_idx] = SearchAltName( + self.search_key + "-" + item.txt_idx, + self.search_query, + self.get_extra_query(item.txt_idx), + distinct_query=True, + ) + return alt_names -class ShortMenuItem: - """ - Item available in the short menu - """ - UP_MODEL_QUERY = {} +class GeoItem(GeographicItem): + # gis - to be removed + GEO_SOURCE = (("T", _("Town")), ("P", _("Precise")), ("M", _("Polygon"))) - @classmethod - def get_short_menu_class(cls, pk): - return "" + x = models.FloatField(_("X"), blank=True, null=True) + y = models.FloatField(_("Y"), blank=True, null=True) + z = models.FloatField(_("Z"), blank=True, null=True) + estimated_error_x = models.FloatField( + _("Estimated error for X"), blank=True, null=True + ) + estimated_error_y = models.FloatField( + _("Estimated error for Y"), blank=True, null=True + ) + estimated_error_z = models.FloatField( + _("Estimated error for Z"), blank=True, null=True + ) + spatial_reference_system = models.ForeignKey( + SpatialReferenceSystem, + verbose_name=_("Spatial Reference System"), + blank=True, + null=True, + on_delete=models.PROTECT, + ) + point = models.PointField(_("Point"), blank=True, null=True, dim=3) + point_2d = models.PointField(_("Point (2D)"), blank=True, null=True) + point_source = models.CharField( + _("Point source"), choices=GEO_SOURCE, max_length=1, blank=True, null=True + ) + point_source_item = models.CharField( + _("Point source item"), max_length=100, blank=True, null=True + ) + multi_polygon = models.MultiPolygonField(_("Multi polygon"), blank=True, null=True) + multi_polygon_source = models.CharField( + _("Multi-polygon source"), + choices=GEO_SOURCE, + max_length=1, + blank=True, + null=True, + ) + multi_polygon_source_item = models.CharField( + _("Multi polygon source item"), max_length=100, blank=True, null=True + ) - @property - def short_class_name(self): - return "" + GEO_LABEL = "" + class Meta: + abstract = True -class SerializeItem: - SERIALIZE_EXCLUDE = ["search_vector"] - SERIALIZE_PROPERTIES = [ - "external_id", - "multi_polygon_geojson", - "point_2d_geojson", - "images_number", - "json_sections", - ] - SERIALIZE_CALL = {} - SERIALIZE_DATES = [] - SERIALIZATION_FILES = [] - SERIALIZE_STRING = [] + def get_town_centroid(self): + raise NotImplementedError - def full_serialize(self, search_model=None, recursion=False) -> dict: - """ - API serialization - :return: data dict - """ - full_result = {} - serialize_fields = [] + def get_town_polygons(self): + raise NotImplementedError - exclude = [] - if search_model: - exclude = [sf.key for sf in search_model.sheet_filters.distinct().all()] + @property + def X(self): + """x coordinates using the default SRS""" + coord = self.display_coordinates + if not coord: + return + return coord[0] - no_geodata = False - for prop in ("main_geodata", "geodata", "geodata_list"): - if prop in self.SERIALIZE_EXCLUDE or prop in exclude: - no_geodata = True - break + @property + def Y(self): + """y coordinates using the default SRS""" + coord = self.display_coordinates + if not coord: + return + return coord[1] - for field in self._meta.get_fields(): - field_name = field.name - if field_name in self.SERIALIZE_EXCLUDE or field_name in exclude: - continue - if field.many_to_one or field.one_to_one: - try: - value = getattr(self, field_name) - except (MultipleObjectsReturned, ObjectDoesNotExist): - value = None - if value: - if ( - field_name not in self.SERIALIZE_STRING - and hasattr(value, "full_serialize") - and not recursion - ): - # print(field.name, self.__class__, self) - if field_name == "main_geodata" and no_geodata: - continue - value = value.full_serialize(search_model, recursion=True) - elif field_name in self.SERIALIZATION_FILES: - try: - value = {"url": value.url} - except ValueError: - value = None - else: - value = str(value) - else: - value = None - full_result[field_name] = value - if field_name == "main_geodata": - full_result["geodata_list"] = [value] - elif field.many_to_many: - values = getattr(self, field_name) - if values.count(): - first_value = values.all()[0] - if ( - field_name not in self.SERIALIZE_STRING - and hasattr(first_value, "full_serialize") - and not recursion - ): - # print(field.name, self.__class__, self) - values = [ - v.full_serialize(search_model, recursion=True) - for v in values.all() - ] - else: - if first_value in self.SERIALIZATION_FILES: - values = [] - for v in values: - try: - values.append({"url": v.url}) - except ValueError: - pass - else: - values = [str(v) for v in values.all()] - else: - values = [] - full_result[field_name] = values - else: - if field_name in self.SERIALIZATION_FILES: - value = getattr(self, field_name) - try: - value = {"url": value.url} - except ValueError: - value = None - full_result[field.name] = value - else: - serialize_fields.append(field_name) + @property + def display_coordinates(self, rounded=True): + if not self.main_geodata: + return "" + return self.main_geodata.display_coordinates(rounded=rounded) - result = json.loads(serialize("json", [self], fields=serialize_fields)) - full_result.update(result[0]["fields"]) - for prop in self.SERIALIZE_PROPERTIES: - if prop in self.SERIALIZE_EXCLUDE or prop in exclude: - continue - if hasattr(self, prop) and prop not in full_result: - full_result[prop] = getattr(self, prop) - if "point_2d_geojson" in full_result: - full_result["point_2d"] = True - if "multi_polygon_geojson" in full_result: - full_result["multi_polygon"] = True - for prop in self.SERIALIZE_DATES: - if prop in self.SERIALIZE_EXCLUDE or prop in exclude: - continue - dt = getattr(self, prop) or "" - if dt: - dt = human_date(dt) - full_result[prop] = dt - for k in self.SERIALIZE_CALL: - if k in self.SERIALIZE_EXCLUDE or k in exclude: - continue - full_result[k] = getattr(self, self.SERIALIZE_CALL[k])() - full_result["SLUG"] = self.SLUG - full_result["pk"] = f"external_{self.pk}" - full_result["id"] = f"external_{self.id}" - return full_result + @property + def display_spatial_reference_system(self): + profile = get_current_profile() + if not profile.display_srs or not profile.display_srs.srid: + return self.spatial_reference_system + return profile.display_srs - def get_associated_main_item_list(self, attr, model) -> list: - items = getattr(self, attr) - if not items.count(): - return [] - lst = [] - table_cols = model.TABLE_COLS - if callable(table_cols): - table_cols = table_cols() - for colname in table_cols: - if colname in model.COL_LABELS: - lst.append(str(model.COL_LABELS[colname])) - else: - lst.append(model._meta.get_field(colname).verbose_name) - lst = [lst] - for values in items.values_list(*table_cols): - lst.append(["-" if v is None else v for v in values]) - return lst + def get_precise_points(self): + if self.point_source == "P" and self.point_2d: + return self.point_2d, self.point, self.point_source_item + def get_precise_polygons(self): + if self.multi_polygon_source == "P" and self.multi_polygon: + return self.multi_polygon, self.multi_polygon_source_item -class MainItem(ShortMenuItem, SerializeItem): - """ - Item with quick actions available from tables - Extra actions are available from sheets - Manage cascade updated, has_changed and no_post_process - """ + def get_geo_items(self, rounded=5): + if not self.main_geodata: + return {} + return self.main_geodata.get_geo_items(rounded) - QUICK_ACTIONS = [] - SLUG = "" - DOWN_MODEL_UPDATE = [] - INITIAL_VALUES = [] # list of field checkable if changed on save + def convert_coordinates(self, point_2d, rounded): + profile = get_current_profile() + if ( + not profile.display_srs + or not profile.display_srs.srid + or ( + profile.display_srs == self.spatial_reference_system + and point_2d.x + and point_2d.y + ) + ): + x, y = point_2d.x, point_2d.y + else: + point = point_2d.transform(profile.display_srs.srid, clone=True) + x, y = point.x, point.y + if rounded: + return [round(x, 5), round(y, 5)] + return [x, y] - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._initial_values = {} - for field_name in self.INITIAL_VALUES: - self._initial_values[field_name] = getattr(self, field_name) + def _geojson_serialize(self, geom_attr): + if not hasattr(self, geom_attr): + return "" + cached_label_key = "cached_label" + if self.GEO_LABEL: + cached_label_key = self.GEO_LABEL + if getattr(self, "CACHED_LABELS", None): + cached_label_key = self.CACHED_LABELS[-1] + geojson = serialize( + "geojson", + self.__class__.objects.filter(pk=self.pk), + geometry_field=geom_attr, + fields=(cached_label_key,), + ) + geojson_dct = json.loads(geojson) + profile = get_current_profile() + precision = profile.point_precision - def has_changed(self): - """ - Check which field have a changed value between INITIAL_VALUES - :return: list of changed fields - """ - changed = [] - for field_name in self._initial_values: - value = getattr(self, field_name) - if self._initial_values[field_name] != value: - changed.append(field_name) - self._initial_values[field_name] = value - return changed + features = geojson_dct.pop("features") + for idx in range(len(features)): + feature = features[idx] + lbl = feature["properties"].pop(cached_label_key) + feature["properties"]["name"] = lbl + feature["properties"]["id"] = self.pk + if precision is not None: + geom_type = feature["geometry"].get("type", None) + if geom_type == "Point": + feature["geometry"]["coordinates"] = [ + round(coord, precision) + for coord in feature["geometry"]["coordinates"] + ] + geojson_dct["features"] = features + geojson_dct["link_template"] = simple_link_to_window(self).replace( + "999999", "" + ) + geojson = json.dumps(geojson_dct) + return geojson - def cascade_update(self, changed=True): - if not changed: - return - for down_model in self.DOWN_MODEL_UPDATE: - if not settings.USE_BACKGROUND_TASK: - rel = getattr(self, down_model) - if hasattr(rel.model, "need_update"): - rel.update(need_update=True) - continue - for item in getattr(self, down_model).all(): - item.cached_label_changed() - if hasattr(item, "main_geodata"): - item.post_save_geo() + @property + def point_2d_geojson(self): + return self._geojson_serialize("point_2d") - def no_post_process(self): - self.skip_history_when_saving = True - self._cached_label_checked = True - self._post_saved_geo = True - self._external_id_changed = False - self._search_updated = True - self._no_move = True + @property + def multi_polygon_geojson(self): + return self._geojson_serialize("multi_polygon") - @classmethod - def app_label(cls): - return cls._meta.app_label - @classmethod - def model_name(cls): - return cls._meta.model_name +class ImageContainerModel: + def _get_image_path(self, filename): + return "{}/{}".format(self._get_base_image_path(), filename) - @classmethod - def class_verbose_name(cls): - return cls._meta.verbose_name + def _get_base_image_path(self): + n = datetime.datetime.now() + return "upload/{}/{:02d}/{:02d}".format(n.year, n.month, n.day) - @classmethod - def get_columns(cls, table_cols_attr="TABLE_COLS", dict_col_labels=True): - """ - :param table_cols_attr: "TABLE_COLS" if not specified - :param dict_col_labels: (default: True) if set to False return list matching - with table_cols list - :return: (table_cols, table_col_labels) - """ - return get_columns_from_class(cls, table_cols_attr=table_cols_attr, - dict_col_labels=dict_col_labels) - def get_search_url(self): - if self.SLUG: - return reverse(self.SLUG + "_search") +class CompleteIdentifierItem(models.Model, ImageContainerModel): + HAS_QR_CODE = True + cached_label = models.TextField( + _("Cached name"), + blank=True, + default="", + db_index=True, + help_text=_("Generated automatically - do not edit"), + ) + complete_identifier = models.TextField( + _("Complete identifier"), blank=True, default="" + ) + custom_index = models.IntegerField("Custom index", blank=True, null=True) + qrcode = models.ImageField( + upload_to=get_image_path, blank=True, null=True, max_length=255 + ) - @classmethod - def get_quick_actions(cls, user, session=None, obj=None): - """ - Get a list of (url, title, icon, target) actions for an user - """ - qas = [] - for action in cls.QUICK_ACTIONS: - if not action.is_available(user, session=session, obj=obj): - continue - qas.append( - [ - action.base_url, - mark_safe(action.text), - mark_safe(action.rendered_icon), - action.target or "", - action.is_popup, - ] - ) - return qas + class Meta: + abstract = True - @classmethod - def get_quick_action_by_url(cls, url): - for action in cls.QUICK_ACTIONS: - if action.url == url: - return action + @property + def qrcode_path(self): + if not self.qrcode: + self.generate_qrcode() + if not self.qrcode: # error on qrcode generation + return "" + return self.qrcode.path - def regenerate_external_id(self): - if not hasattr(self, "external_id"): + def _profile_generate_cached_label(self): + slug = getattr(self, "SLUG", None) + if not slug: return + return get_generated_id(slug + "_cached_label", self) + + def _generate_cached_label(self): + label = self._profile_generate_cached_label() + if not label: + # to be eventually overloaded by parent class + return str(self) + return label + + def generate_qrcode(self, request=None, secure=True, tmpdir=None): + url = self.get_absolute_url() + site = Site.objects.get_current() + if request: + scheme = request.scheme + else: + if secure: + scheme = "https" + else: + scheme = "http" + url = scheme + "://" + site.domain + url + TinyUrl = apps.get_model("ishtar_common", "TinyUrl") + tiny_url = TinyUrl() + tiny_url.link = url + tiny_url.save() + short_url = ( + scheme + + "://" + + site.domain + + reverse("tiny-redirect", args=[tiny_url.get_short_id()]) + ) + qr = pyqrcode.create(short_url, version=settings.ISHTAR_QRCODE_VERSION) + tmpdir_created = False + if not tmpdir: + tmpdir = tempfile.mkdtemp("-qrcode") + tmpdir_created = True + filename = tmpdir + os.sep + "qrcode.png" + qr.png(filename, scale=settings.ISHTAR_QRCODE_SCALE) self.skip_history_when_saving = True self._no_move = True - self.external_id = "" - self.auto_external_id = True + self._no_geo_check = True + with open(filename, "rb") as qrfile: + self.qrcode.save("qrcode.png", File(qrfile)) self.save() + if tmpdir_created: + shutil.rmtree(tmpdir) - def cached_label_changed(self): - self.no_post_process() - self._cached_label_checked = False - cached_label_changed(self.__class__, instance=self, created=False) + def generate_complete_identifier(self): + SLUG = getattr(self, "SLUG", None) + if not SLUG: + return "" + complete_identifier = get_generated_id(SLUG + "_complete_identifier", self) + if complete_identifier: + return complete_identifier + cached_label_key = "cached_label" + if getattr(self, "GEO_LABEL", None): + cached_label_key = getattr(self, "GEO_LABEL", None) + if hasattr(self, "CACHED_COMPLETE_ID"): + cached_label_key = self.CACHED_COMPLETE_ID + if not cached_label_key: + return + complete_identifier = getattr(self, cached_label_key) + return complete_identifier - def post_save_geo(self): - self.no_post_process() - self._post_saved_geo = False - post_save_geo(self.__class__, instance=self, created=False) + def get_index_whole_db(self): + q = self.__class__.objects.exclude(custom_index__isnull=True) + q = q.order_by("-custom_index") + if q.count(): + current_index = q.values_list("custom_index", flat=True).all()[0] + return current_index + 1 + return 1 - def external_id_changed(self): - self.no_post_process() - self._external_id_changed = False - external_id_changed(self.__class__, instance=self, created=False) + def generate_custom_index(self, force=False): + if not self.pk: + return + if self.custom_index and not force: + return self.custom_index + SLUG = getattr(self, "SLUG", None) + if not SLUG: + return + k = SLUG + "_custom_index" + profile = get_current_profile() + if not hasattr(profile, k): + return + key = getattr(profile, k) + if not key or not key.strip(): + return + keys = key.strip().split(";") + if len(keys) == 1 and hasattr(self, "get_index_" + keys[0]): + # custom index generation + return getattr(self, "get_index_" + key)() + model = self.__class__ + try: + self_keys = set(list(model.objects.filter(pk=self.pk).values_list(*keys))) + except Exception: # bad settings - not managed here + print("Bad settings for custom_index {}".format(";".join(keys))) + return + if len(self_keys) != 1: # key is not distinct + return + self_key = self_keys.pop() + return self._get_index(keys, self_key) - def get_extra_actions(self, request): - if not hasattr(self, "SLUG"): - return [] + def _get_index(self, keys: list, self_keys: list): + model = self.__class__ + q = model.objects + if self.pk: + q = model.objects.exclude(pk=self.pk) + for idx, key in enumerate(keys): + q = q.filter(**{key: self_keys[idx]}) + try: + r = q.aggregate(max_index=Max("custom_index")) + except Exception: # bad settings + return + if not r["max_index"]: + return 1 + return r["max_index"] + 1 - actions = [] - if request.user.is_superuser and hasattr(self, "auto_external_id"): - actions += [ - ( - reverse("regenerate-external-id") - + "?{}={}".format(self.SLUG, self.pk), - _("Regenerate ID"), - "fa fa-key", - _("regen."), - "btn-info", - True, - 200, - ) - ] + def save(self, *args, **kwargs): + super(CompleteIdentifierItem, self).save(*args, **kwargs) + self.regenerate_all_ids() - return actions + def regenerate_all_ids(self, save=True): + if getattr(self, "_prevent_loop", False): + return + modified = False + custom_index = self.generate_custom_index() + if custom_index != self.custom_index: + modified = True + self.custom_index = custom_index + complete_id = self.generate_complete_identifier() + if complete_id: + modified = True + self.complete_identifier = complete_id + if modified: + self._prevent_loop = True + self.skip_history_when_saving = True + if save: + self.save() + return modified + + +class SearchVectorConfig: + def __init__(self, key, language=None, func=None): + self.key = key + if language: + self.language = language + if language == "local": + self.language = settings.ISHTAR_SEARCH_LANGUAGE + else: + self.language = "simple" + self.func = func + + def format(self, value): + if value == "None": + value = "" + if not self.func: + return [value] + value = self.func(value) + if not isinstance(value, list): + return [value] + return value -- cgit v1.2.3