diff options
Diffstat (limited to 'ishtar_common/models_common.py')
-rw-r--r-- | ishtar_common/models_common.py | 680 |
1 files changed, 363 insertions, 317 deletions
diff --git a/ishtar_common/models_common.py b/ishtar_common/models_common.py index bc4a0549b..cacb5664e 100644 --- a/ishtar_common/models_common.py +++ b/ishtar_common/models_common.py @@ -2056,7 +2056,11 @@ class DocumentItem: except AttributeError: actions = [] - if not hasattr(self, "SLUG"): + if not hasattr(self, "SLUG") or not hasattr(self, "can_do"): + return actions + + if not hasattr(self, "can_do"): + print(f"**WARNING** can_do not implemented for {self.__class__}") return actions can_add_doc = self.can_do(request, "add_document") @@ -3092,12 +3096,343 @@ class GeographicItem(models.Model): return lst +class SerializeItem: + SERIALIZE_EXCLUDE = ["search_vector"] + SERIALIZE_PROPERTIES = [ + "external_id", + "multi_polygon_geojson", + "point_2d_geojson", + "images_number", + "json_sections", + ] + SERIALIZE_CALL = {} + SERIALIZE_DATES = [] + SERIALIZATION_FILES = [] + SERIALIZE_STRING = [] + + def full_serialize(self, search_model=None, recursion=False) -> dict: + """ + API serialization + :return: data dict + """ + full_result = {} + serialize_fields = [] + + exclude = [] + if search_model: + exclude = [sf.key for sf in search_model.sheet_filters.distinct().all()] + + no_geodata = False + for prop in ("main_geodata", "geodata", "geodata_list"): + if prop in self.SERIALIZE_EXCLUDE or prop in exclude: + no_geodata = True + break + + for field in self._meta.get_fields(): + field_name = field.name + if field_name in self.SERIALIZE_EXCLUDE or field_name in exclude: + continue + if field.many_to_one or field.one_to_one: + try: + value = getattr(self, field_name) + except (MultipleObjectsReturned, ObjectDoesNotExist): + value = None + if value: + if ( + field_name not in self.SERIALIZE_STRING + and hasattr(value, "full_serialize") + and not recursion + ): + # print(field.name, self.__class__, self) + if field_name == "main_geodata" and no_geodata: + continue + value = value.full_serialize(search_model, recursion=True) + elif field_name in self.SERIALIZATION_FILES: + try: + value = {"url": value.url} + except ValueError: + value = None + else: + value = str(value) + else: + value = None + full_result[field_name] = value + if field_name == "main_geodata": + full_result["geodata_list"] = [value] + elif field.many_to_many: + values = getattr(self, field_name) + if values.count(): + first_value = values.all()[0] + if ( + field_name not in self.SERIALIZE_STRING + and hasattr(first_value, "full_serialize") + and not recursion + ): + # print(field.name, self.__class__, self) + values = [ + v.full_serialize(search_model, recursion=True) + for v in values.all() + ] + else: + if first_value in self.SERIALIZATION_FILES: + values = [] + for v in values: + try: + values.append({"url": v.url}) + except ValueError: + pass + else: + values = [str(v) for v in values.all()] + else: + values = [] + full_result[field_name] = values + else: + if field_name in self.SERIALIZATION_FILES: + value = getattr(self, field_name) + try: + value = {"url": value.url} + except ValueError: + value = None + full_result[field.name] = value + else: + serialize_fields.append(field_name) + + result = json.loads(serialize("json", [self], fields=serialize_fields)) + full_result.update(result[0]["fields"]) + for prop in self.SERIALIZE_PROPERTIES: + if prop in self.SERIALIZE_EXCLUDE or prop in exclude: + continue + if hasattr(self, prop) and prop not in full_result: + full_result[prop] = getattr(self, prop) + if "point_2d_geojson" in full_result: + full_result["point_2d"] = True + if "multi_polygon_geojson" in full_result: + full_result["multi_polygon"] = True + for prop in self.SERIALIZE_DATES: + if prop in self.SERIALIZE_EXCLUDE or prop in exclude: + continue + dt = getattr(self, prop) or "" + if dt: + dt = human_date(dt) + full_result[prop] = dt + for k in self.SERIALIZE_CALL: + if k in self.SERIALIZE_EXCLUDE or k in exclude: + continue + full_result[k] = getattr(self, self.SERIALIZE_CALL[k])() + full_result["SLUG"] = self.SLUG + full_result["pk"] = f"external_{self.pk}" + full_result["id"] = f"external_{self.id}" + return full_result + + def get_associated_main_item_list(self, attr, model) -> list: + items = getattr(self, attr) + if not items.count(): + return [] + lst = [] + table_cols = model.TABLE_COLS + if callable(table_cols): + table_cols = table_cols() + for colname in table_cols: + if colname in model.COL_LABELS: + lst.append(str(model.COL_LABELS[colname])) + else: + lst.append(model._meta.get_field(colname).verbose_name) + lst = [lst] + for values in items.values_list(*table_cols): + lst.append(["-" if v is None else v for v in values]) + return lst + + +class ShortMenuItem: + """ + Item available in the short menu + """ + + UP_MODEL_QUERY = {} + + @classmethod + def get_short_menu_class(cls, pk): + return "" + + @property + def short_class_name(self): + return "" + + +class MainItem(ShortMenuItem, SerializeItem): + """ + Item with quick actions available from tables + Extra actions are available from sheets + Manage cascade updated, has_changed and no_post_process + """ + + QUICK_ACTIONS = [] + SLUG = "" + DOWN_MODEL_UPDATE = [] + INITIAL_VALUES = [] # list of field checkable if changed on save + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._initial_values = {} + for field_name in self.INITIAL_VALUES: + self._initial_values[field_name] = getattr(self, field_name) + + def has_changed(self): + """ + Check which field have a changed value between INITIAL_VALUES + :return: list of changed fields + """ + changed = [] + for field_name in self._initial_values: + value = getattr(self, field_name) + if self._initial_values[field_name] != value: + changed.append(field_name) + self._initial_values[field_name] = value + return changed + + def cascade_update(self, changed=True): + if not changed: + return + for down_model in self.DOWN_MODEL_UPDATE: + if not settings.USE_BACKGROUND_TASK: + rel = getattr(self, down_model) + if hasattr(rel.model, "need_update"): + rel.update(need_update=True) + continue + for item in getattr(self, down_model).all(): + item.cached_label_changed() + if hasattr(item, "main_geodata"): + item.post_save_geo() + + def no_post_process(self): + self.skip_history_when_saving = True + self._cached_label_checked = True + self._post_saved_geo = True + self._external_id_changed = False + self._search_updated = True + self._no_move = True + + @classmethod + def app_label(cls): + return cls._meta.app_label + + @classmethod + def model_name(cls): + return cls._meta.model_name + + @classmethod + def class_verbose_name(cls): + return cls._meta.verbose_name + + @classmethod + def get_columns(cls, table_cols_attr="TABLE_COLS", dict_col_labels=True): + """ + :param table_cols_attr: "TABLE_COLS" if not specified + :param dict_col_labels: (default: True) if set to False return list matching + with table_cols list + :return: (table_cols, table_col_labels) + """ + return get_columns_from_class(cls, table_cols_attr=table_cols_attr, + dict_col_labels=dict_col_labels) + + def get_search_url(self): + if self.SLUG: + try: + return reverse(self.SLUG + "_search") + except NoReverseMatch: + pass + + @classmethod + def get_quick_actions(cls, user, session=None, obj=None): + """ + Get a list of (url, title, icon, target) actions for an user + """ + qas = [] + for action in cls.QUICK_ACTIONS: + if not action.is_available(user, session=session, obj=obj): + continue + qas.append( + [ + action.base_url, + mark_safe(action.text), + mark_safe(action.rendered_icon), + action.target or "", + action.is_popup, + ] + ) + return qas + + @classmethod + def get_quick_action_by_url(cls, url): + for action in cls.QUICK_ACTIONS: + if action.url == url: + return action + + def regenerate_external_id(self): + if not hasattr(self, "external_id"): + return + self.skip_history_when_saving = True + self._no_move = True + self.external_id = "" + self.auto_external_id = True + self.save() + + def cached_label_changed(self): + self.no_post_process() + self._cached_label_checked = False + cached_label_changed(self.__class__, instance=self, created=False) + + def post_save_geo(self): + self.no_post_process() + self._post_saved_geo = False + post_save_geo(self.__class__, instance=self, created=False) + + def external_id_changed(self): + self.no_post_process() + self._external_id_changed = False + external_id_changed(self.__class__, instance=self, created=False) + + def can_do(self, request, action_name): + """ + Check permission availability for the current object. + :param request: request object + :param action_name: action name eg: "change_find" + :return: boolean + """ + # overload with OwnPerm when _own_ is relevant + if not getattr(request.user, "ishtaruser", None): + return False + user = request.user + return user.ishtaruser.has_right(action_name, request.session)\ + + def get_extra_actions(self, request): + if not hasattr(self, "SLUG"): + return [] + + actions = [] + if request.user.is_superuser and hasattr(self, "auto_external_id"): + actions += [ + ( + reverse("regenerate-external-id") + + "?{}={}".format(self.SLUG, self.pk), + _("Regenerate ID"), + "fa fa-key", + _("regen."), + "btn-info", + True, + 200, + ) + ] + + return actions + + class TownManager(models.Manager): def get_by_natural_key(self, numero_insee, year): return self.get(numero_insee=numero_insee, year=year) -class Town(GeographicItem, Imported, DocumentItem, models.Model): +class Town(GeographicItem, Imported, DocumentItem, MainItem, models.Model): SLUG = "town" name = models.CharField(_("Name"), max_length=100) surface = models.IntegerField(_("Surface (m2)"), blank=True, null=True) @@ -3163,6 +3498,20 @@ class Town(GeographicItem, Imported, DocumentItem, models.Model): _("Name"), "Code commune (numéro INSEE)", _("Cached name") ) + @property + def surface_ha(self): + if not self.surface: + return 0 + return self.surface / 10000.0 + + def get_filename(self): + if self.numero_insee: + return f"{self.numero_insee} - {slugify(self.name)}" + return slugify(self.name) + + def associated_filename(self): + return self.get_filename() + def get_values(self, prefix="", **kwargs): return { prefix or "label": str(self), @@ -3289,6 +3638,18 @@ class Town(GeographicItem, Imported, DocumentItem, models.Model): cached_label += " ({})".format(self.year) return cached_label + def get_extra_actions(self, request): + """ + For sheet template + """ + # url, base_text, icon, extra_text, extra css class, is a quick action + actions = super().get_extra_actions(request) + profile = get_current_profile() + can_add_geo = profile.mapping and self.can_do(request, "add_geovectordata") + if can_add_geo: + actions.append(self.get_add_geo_action()) + return actions + def post_save_town(sender, **kwargs): cached_label_changed(sender, **kwargs) @@ -4249,318 +4610,3 @@ class SearchVectorConfig: if not isinstance(value, list): return [value] return value - - -class ShortMenuItem: - """ - Item available in the short menu - """ - - UP_MODEL_QUERY = {} - - @classmethod - def get_short_menu_class(cls, pk): - return "" - - @property - def short_class_name(self): - return "" - - -class SerializeItem: - SERIALIZE_EXCLUDE = ["search_vector"] - SERIALIZE_PROPERTIES = [ - "external_id", - "multi_polygon_geojson", - "point_2d_geojson", - "images_number", - "json_sections", - ] - SERIALIZE_CALL = {} - SERIALIZE_DATES = [] - SERIALIZATION_FILES = [] - SERIALIZE_STRING = [] - - def full_serialize(self, search_model=None, recursion=False) -> dict: - """ - API serialization - :return: data dict - """ - full_result = {} - serialize_fields = [] - - exclude = [] - if search_model: - exclude = [sf.key for sf in search_model.sheet_filters.distinct().all()] - - no_geodata = False - for prop in ("main_geodata", "geodata", "geodata_list"): - if prop in self.SERIALIZE_EXCLUDE or prop in exclude: - no_geodata = True - break - - for field in self._meta.get_fields(): - field_name = field.name - if field_name in self.SERIALIZE_EXCLUDE or field_name in exclude: - continue - if field.many_to_one or field.one_to_one: - try: - value = getattr(self, field_name) - except (MultipleObjectsReturned, ObjectDoesNotExist): - value = None - if value: - if ( - field_name not in self.SERIALIZE_STRING - and hasattr(value, "full_serialize") - and not recursion - ): - # print(field.name, self.__class__, self) - if field_name == "main_geodata" and no_geodata: - continue - value = value.full_serialize(search_model, recursion=True) - elif field_name in self.SERIALIZATION_FILES: - try: - value = {"url": value.url} - except ValueError: - value = None - else: - value = str(value) - else: - value = None - full_result[field_name] = value - if field_name == "main_geodata": - full_result["geodata_list"] = [value] - elif field.many_to_many: - values = getattr(self, field_name) - if values.count(): - first_value = values.all()[0] - if ( - field_name not in self.SERIALIZE_STRING - and hasattr(first_value, "full_serialize") - and not recursion - ): - # print(field.name, self.__class__, self) - values = [ - v.full_serialize(search_model, recursion=True) - for v in values.all() - ] - else: - if first_value in self.SERIALIZATION_FILES: - values = [] - for v in values: - try: - values.append({"url": v.url}) - except ValueError: - pass - else: - values = [str(v) for v in values.all()] - else: - values = [] - full_result[field_name] = values - else: - if field_name in self.SERIALIZATION_FILES: - value = getattr(self, field_name) - try: - value = {"url": value.url} - except ValueError: - value = None - full_result[field.name] = value - else: - serialize_fields.append(field_name) - - result = json.loads(serialize("json", [self], fields=serialize_fields)) - full_result.update(result[0]["fields"]) - for prop in self.SERIALIZE_PROPERTIES: - if prop in self.SERIALIZE_EXCLUDE or prop in exclude: - continue - if hasattr(self, prop) and prop not in full_result: - full_result[prop] = getattr(self, prop) - if "point_2d_geojson" in full_result: - full_result["point_2d"] = True - if "multi_polygon_geojson" in full_result: - full_result["multi_polygon"] = True - for prop in self.SERIALIZE_DATES: - if prop in self.SERIALIZE_EXCLUDE or prop in exclude: - continue - dt = getattr(self, prop) or "" - if dt: - dt = human_date(dt) - full_result[prop] = dt - for k in self.SERIALIZE_CALL: - if k in self.SERIALIZE_EXCLUDE or k in exclude: - continue - full_result[k] = getattr(self, self.SERIALIZE_CALL[k])() - full_result["SLUG"] = self.SLUG - full_result["pk"] = f"external_{self.pk}" - full_result["id"] = f"external_{self.id}" - return full_result - - def get_associated_main_item_list(self, attr, model) -> list: - items = getattr(self, attr) - if not items.count(): - return [] - lst = [] - table_cols = model.TABLE_COLS - if callable(table_cols): - table_cols = table_cols() - for colname in table_cols: - if colname in model.COL_LABELS: - lst.append(str(model.COL_LABELS[colname])) - else: - lst.append(model._meta.get_field(colname).verbose_name) - lst = [lst] - for values in items.values_list(*table_cols): - lst.append(["-" if v is None else v for v in values]) - return lst - - -class MainItem(ShortMenuItem, SerializeItem): - """ - Item with quick actions available from tables - Extra actions are available from sheets - Manage cascade updated, has_changed and no_post_process - """ - - QUICK_ACTIONS = [] - SLUG = "" - DOWN_MODEL_UPDATE = [] - INITIAL_VALUES = [] # list of field checkable if changed on save - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._initial_values = {} - for field_name in self.INITIAL_VALUES: - self._initial_values[field_name] = getattr(self, field_name) - - def has_changed(self): - """ - Check which field have a changed value between INITIAL_VALUES - :return: list of changed fields - """ - changed = [] - for field_name in self._initial_values: - value = getattr(self, field_name) - if self._initial_values[field_name] != value: - changed.append(field_name) - self._initial_values[field_name] = value - return changed - - def cascade_update(self, changed=True): - if not changed: - return - for down_model in self.DOWN_MODEL_UPDATE: - if not settings.USE_BACKGROUND_TASK: - rel = getattr(self, down_model) - if hasattr(rel.model, "need_update"): - rel.update(need_update=True) - continue - for item in getattr(self, down_model).all(): - item.cached_label_changed() - if hasattr(item, "main_geodata"): - item.post_save_geo() - - def no_post_process(self): - self.skip_history_when_saving = True - self._cached_label_checked = True - self._post_saved_geo = True - self._external_id_changed = False - self._search_updated = True - self._no_move = True - - @classmethod - def app_label(cls): - return cls._meta.app_label - - @classmethod - def model_name(cls): - return cls._meta.model_name - - @classmethod - def class_verbose_name(cls): - return cls._meta.verbose_name - - @classmethod - def get_columns(cls, table_cols_attr="TABLE_COLS", dict_col_labels=True): - """ - :param table_cols_attr: "TABLE_COLS" if not specified - :param dict_col_labels: (default: True) if set to False return list matching - with table_cols list - :return: (table_cols, table_col_labels) - """ - return get_columns_from_class(cls, table_cols_attr=table_cols_attr, - dict_col_labels=dict_col_labels) - - def get_search_url(self): - if self.SLUG: - return reverse(self.SLUG + "_search") - - @classmethod - def get_quick_actions(cls, user, session=None, obj=None): - """ - Get a list of (url, title, icon, target) actions for an user - """ - qas = [] - for action in cls.QUICK_ACTIONS: - if not action.is_available(user, session=session, obj=obj): - continue - qas.append( - [ - action.base_url, - mark_safe(action.text), - mark_safe(action.rendered_icon), - action.target or "", - action.is_popup, - ] - ) - return qas - - @classmethod - def get_quick_action_by_url(cls, url): - for action in cls.QUICK_ACTIONS: - if action.url == url: - return action - - def regenerate_external_id(self): - if not hasattr(self, "external_id"): - return - self.skip_history_when_saving = True - self._no_move = True - self.external_id = "" - self.auto_external_id = True - self.save() - - def cached_label_changed(self): - self.no_post_process() - self._cached_label_checked = False - cached_label_changed(self.__class__, instance=self, created=False) - - def post_save_geo(self): - self.no_post_process() - self._post_saved_geo = False - post_save_geo(self.__class__, instance=self, created=False) - - def external_id_changed(self): - self.no_post_process() - self._external_id_changed = False - external_id_changed(self.__class__, instance=self, created=False) - - def get_extra_actions(self, request): - if not hasattr(self, "SLUG"): - return [] - - actions = [] - if request.user.is_superuser and hasattr(self, "auto_external_id"): - actions += [ - ( - reverse("regenerate-external-id") - + "?{}={}".format(self.SLUG, self.pk), - _("Regenerate ID"), - "fa fa-key", - _("regen."), - "btn-info", - True, - 200, - ) - ] - - return actions |