#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Copyright (C) 2013-2016 Étienne Loks

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# See the file COPYING for details.

from csv import QUOTE_ALL
import datetime
from functools import wraps
from itertools import chain
import hashlib
import os
import random
import re
import requests
import shutil
import six
import subprocess
import sys
import tempfile

from django import forms
from django.apps import apps
from django.conf import settings
from django.conf.urls import url
from django.contrib.contenttypes.models import ContentType
from django.contrib.gis.geos import GEOSGeometry
from django.contrib.sessions.backends.db import SessionStore
from django.core.cache import cache
from django.core.files import File
from django.core.validators import EMPTY_VALUES
from django.core.urlresolvers import reverse
from django.db import models
from django.http import HttpResponseRedirect
from django.utils.datastructures import MultiValueDict as BaseMultiValueDict
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext_lazy as _, ugettext
from django.template.defaultfilters import slugify


class BColors:
    """
    Bash colors. Don't forget to finish your colored string with ENDC.
    """
    HEADER = '\033[95m'
    OKBLUE = '\033[94m'
    OKGREEN = '\033[92m'
    WARNING = '\033[93m'
    FAIL = '\033[91m'
    ENDC = '\033[0m'
    BOLD = '\033[1m'
    UNDERLINE = '\033[4m'


CSV_OPTIONS = {'delimiter': ',', 'quotechar': '"', 'quoting': QUOTE_ALL}


def check_rights(rights=None, redirect_url='/'):
    """
    Decorator that checks the rights to access the view.
    """
    def decorator(view_func):
        def _wrapped_view(request, *args, **kwargs):
            if not rights:
                return view_func(request, *args, **kwargs)
            if hasattr(request.user, 'ishtaruser'):
                if request.user.ishtaruser.has_right('administrator',
                                                     request.session):
                    kwargs['current_right'] = 'administrator'
                    return view_func(request, *args, **kwargs)
                for right in rights:
                    # be careful to put the more permissive rights first
                    # if granted it can allow more
                    if request.user.ishtaruser.has_right(right,
                                                         request.session):
                        kwargs['current_right'] = right
                        return view_func(request, *args, **kwargs)
            put_session_message(
                request.session.session_key,
                _(u"You don't have sufficient permissions to do this action."),
                'warning'
            )
            return HttpResponseRedirect(redirect_url)
        return _wrapped_view
    return decorator


def check_rights_condition(rights):
    """
    To be used to check in wizard condition_dict
    """
    def func(self):
        request = self.request
        if request.user.ishtaruser.has_right('administrator',
                                             request.session):
            return True
        for right in rights:
            if request.user.ishtaruser.has_right(right, request.session):
                return True
        return False
    return func


def check_model_access_control(request, model, available_perms=None):
    """
    Check access control to a model for a specific request

    :param request: the current request
    :param model: the concerned model
    :param available_perms: specific permissions to check; if not specified
    "view" and "view_own" will be checked
    :return: (allowed, own) tuple
    """
    own = True  # more restrictive by default
    allowed = False
    if not request.user.is_authenticated():
        return allowed, own

    if not available_perms:
        available_perms = ['view_' + model.__name__.lower(),
                           'view_own_' + model.__name__.lower()]
    try:
        ishtaruser = request.user.ishtaruser
    except request.user._meta.model.ishtaruser.RelatedObjectDoesNotExist:
        return False, True
    if ishtaruser.has_right('administrator', session=request.session):
        allowed = True
        own = False
        return allowed, own
    for perm, lbl in model._meta.permissions:
        if perm not in available_perms:
            continue
        if ishtaruser.person.has_right(perm, session=request.session):
            allowed = True
            if "_own_" not in perm:
                own = False
                break  # max right reached
    return allowed, own


def update_data(data_1, data_2, merge=False):
    """
    Update a data dictionary taking account of key detail
    """
    res = {}
    if not isinstance(data_1, dict) or not isinstance(data_2, dict):
        if data_2 and not data_1:
            return data_2
        if not merge:
            return data_1
        if data_2 and data_2 != data_1:
            return data_1 + u" ; " + data_2
        return data_1
    for k in data_1:
        if k not in data_2:
            res[k] = data_1[k]
        else:
            res[k] = update_data(data_1[k], data_2[k], merge=merge)
    for k in data_2:
        if k not in data_1:
            res[k] = data_2[k]
    return res


def move_dict_data(data, key1, key2):
    """
    Move key1 value to key2 value in a data dict

    :param data: data dict (with sub-dicts)
    :param key1: key to move (with __ notation for hierarchy - beginning
    with "data__")
    :param key2: target key (with __ notation for hierarchy - beginning
    with "data__")
    :return: result data
    """
    keys1 = key1.split('__')
    keys2 = key2.split('__')
    value = data
    for idx, key in enumerate(keys1):
        if not idx:
            if key != 'data':
                return data
            continue
        if key not in value:
            return data
        if idx == (len(keys1) - 1):  # last
            value = value.pop(key)  # remove from data
        else:
            value = value[key]

    new_value = data
    for idx, key in enumerate(keys2):
        if not idx:
            if key != 'data':
                return data
            continue
        if idx == (len(keys2) - 1):  # last
            new_value[key] = value
        else:
            if key not in new_value:
                new_value[key] = {}
            new_value = new_value[key]
    return data

def clean_empty_data(data):
    """
    Clean empty branches of a data dict
    """
    for key in data.keys():
        if data[key] in [{}, None, u""]:
            data.pop(key)
            continue
        if isinstance(data[key], dict):
            data[key] = clean_empty_data(data[key])
    return data
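
# Illustrative behaviour of the data-dict helpers above (a sketch, the
# literal dicts are made up for the example):
#
#   update_data({"a": {"b": "x"}}, {"a": {"b": "y"}, "c": "z"}, merge=True)
#   # => {"a": {"b": "x ; y"}, "c": "z"}
#
#   move_dict_data({"old": {"k": 1}}, "data__old__k", "data__new__k")
#   # => {"old": {}, "new": {"k": 1}} - the leading "data__" is notation
#   # only, the first key segment is never dereferenced
#
#   clean_empty_data({"a": {}, "b": u"", "c": "kept"})
#   # => {"c": "kept"}
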
class MultiValueDict(BaseMultiValueDict):
    def get(self, *args, **kwargs):
        v = super(MultiValueDict, self).getlist(*args, **kwargs)
        if callable(v):
            v = v()
        if type(v) in (list, tuple) and len(v) > 1:
            v = ",".join(v)
        elif type(v) not in (int, unicode):
            v = super(MultiValueDict, self).get(*args, **kwargs)
        return v

    def getlist(self, *args, **kwargs):
        lst = super(MultiValueDict, self).getlist(*args, **kwargs)
        if type(lst) not in (tuple, list):
            lst = [lst]
        return lst


def is_downloadable(url):
    """
    Does the url contain a downloadable resource
    """
    h = requests.head(url, allow_redirects=True)
    header = h.headers
    content_type = header.get('content-type')
    if 'text' in content_type.lower():
        return False
    if 'html' in content_type.lower():
        return False
    return True


def get_current_year():
    return datetime.datetime.now().year


def get_cache(cls, extra_args=tuple(), app_label=None):
    if not app_label:
        app_label = cls._meta.app_label
    cache_key = u"{}-{}-{}".format(
        settings.PROJECT_SLUG, app_label, cls.__name__)
    for arg in extra_args:
        if not arg:
            cache_key += '-0'
        else:
            if type(arg) == dict:
                cache_key += '-' + "_".join([unicode(arg[k]) for k in arg])
            elif type(arg) in (list, tuple):
                cache_key += '-' + "_".join([unicode(v) for v in arg])
            else:
                cache_key += '-' + unicode(arg)
    cache_key = slugify(cache_key)
    if not cache_key.endswith('_current_keys') \
            and hasattr(cls, '_add_cache_key_to_refresh'):
        cls._add_cache_key_to_refresh(extra_args)
    if len(cache_key) >= 250:
        m = hashlib.md5()
        m.update(cache_key)
        cache_key = m.hexdigest()
    return cache_key, cache.get(cache_key)


def force_cached_label_changed(sender, **kwargs):
    if not kwargs.get('instance'):
        return
    kwargs['instance']._cached_label_checked = False
    cached_label_changed(sender, **kwargs)


def cached_label_changed(sender, **kwargs):
    if not kwargs.get('instance'):
        return
    instance = kwargs.get('instance')
    force_update = kwargs.get('force_update', False)
    if hasattr(instance, 'test_obj'):
        instance.test_obj.reached(sender, **kwargs)
    if not force_update and hasattr(instance, '_cached_label_checked') \
            and instance._cached_label_checked:
        return
    instance._cached_label_checked = True
    cached_labels = ['cached_label']
    if hasattr(sender, 'CACHED_LABELS'):
        cached_labels = sender.CACHED_LABELS
    changed = False
    for cached_label in cached_labels:
        lbl = getattr(instance, '_generate_' + cached_label)()
        if lbl != getattr(instance, cached_label):
            setattr(instance, cached_label, lbl)
            changed = True
    if changed:
        instance._search_updated = False
        if hasattr(instance, '_cascade_change') and instance._cascade_change:
            instance.skip_history_when_saving = True
        instance.save()
    updated = False
    if force_update or hasattr(instance, 'update_search_vector'):
        updated = instance.update_search_vector()
    if hasattr(instance, '_cached_labels_bulk_update'):
        updated = instance._cached_labels_bulk_update() or updated
    if not updated and hasattr(instance, '_get_associated_cached_labels'):
        for item in instance._get_associated_cached_labels():
            item._cascade_change = True
            if hasattr(instance, 'test_obj'):
                item.test_obj = instance.test_obj
            cached_label_changed(item.__class__, instance=item)
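
# cached_label_changed and force_cached_label_changed have the signature of
# post_save receivers. Illustrative wiring (a sketch - "MyItem" is a
# placeholder model, not something defined by this module):
#
#   from django.db.models.signals import post_save
#   post_save.connect(cached_label_changed, sender=MyItem)
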

def regenerate_all_cached_labels(model):
    """
    When the rule for generating the cached label changes, all labels have
    to be regenerated.

    :param model: model class concerned
    """
    for item in model.objects.all():
        item.skip_history_when_saving = True
        cached_label_changed(model, instance=item)


SHORTIFY_STR = ugettext(" (...)")


def shortify(lbl, number=20):
    if not lbl:
        lbl = ''
    if len(lbl) <= number:
        return lbl
    return lbl[:number - len(SHORTIFY_STR)] + SHORTIFY_STR


def mode(array):
    most = max(list(map(array.count, array)))
    return list(set(filter(lambda x: array.count(x) == most, array)))


def disable_for_loaddata(signal_handler):
    """
    Decorator that turns off signal handlers when loading fixture data.
    """
    @wraps(signal_handler)
    def wrapper(*args, **kwargs):
        if kwargs.get('raw'):
            return
        signal_handler(*args, **kwargs)
    return wrapper


def _get_image_link(doc):
    from ishtar_common.models import IshtarSiteProfile

    # manage missing images
    if not doc.thumbnail or not doc.thumbnail.url or not doc.image \
            or not doc.image.url:
        return ""

    item = None
    for related_key in doc.__class__.RELATED_MODELS:
        q = getattr(doc, related_key)
        if q.count():
            item = q.all()[0]
            break
    if not item:
        # image attached to nothing...
        return ""

    item_class_name = unicode(item.__class__._meta.verbose_name)
    if item.__class__.__name__ == "ArchaeologicalSite":
        item_class_name = unicode(IshtarSiteProfile.get_default_site_label())

    # minimal markup (not the original template); placeholders in order:
    # full size image url, thumbnail url, item class name, item label,
    # item link, "Information" label, "Load another random image?" label
    return mark_safe(u"""
    <div class="random-image">
        <a href="{}"><img src="{}" alt=""/></a>
        <p class="caption">{} - {}</p>
        <p><a href="{}">{}</a></p>
        <p><a href="#" class="reload-image">{}</a></p>
    </div>
    """.format(
        doc.image.url, doc.thumbnail.url, item_class_name, unicode(item),
        reverse(item.SHOW_URL, args=[item.pk, '']),
        unicode(_(u"Information")),
        unicode(_(u"Load another random image?"))))


def get_random_item_image_link(request):
    from ishtar_common.models import Document

    if not hasattr(request.user, 'ishtaruser'):
        return ''
    ishtar_user = request.user.ishtaruser
    if not ishtar_user.has_right('ishtar_common.view_document',
                                 session=request.session):
        return ''

    q = Document.objects.filter(
        thumbnail__isnull=False, image__isnull=False
    ).exclude(thumbnail='').exclude(image='')
    total = q.count()
    if not total:
        return ''
    image_nb = random.randint(0, total - 1)
    return _get_image_link(q.all()[image_nb])


def convert_coordinates_to_point(x, y, z=None, srid=4326):
    if z:
        geom = GEOSGeometry('POINT({} {} {})'.format(x, y, z), srid=srid)
    else:
        geom = GEOSGeometry('POINT({} {})'.format(x, y), srid=srid)
    if not geom.valid:
        raise forms.ValidationError(geom.valid_reason)
    return geom


def post_save_point(sender, **kwargs):
    """
    Convert raw x, y, z point to real geo field
    """
    if not kwargs.get('instance'):
        return
    instance = kwargs.get('instance')
    point = None
    point_2d = None
    if instance.x and instance.y and \
            instance.spatial_reference_system and \
            instance.spatial_reference_system.auth_name == 'EPSG' and \
            instance.spatial_reference_system.srid != 0:
        point_2d = convert_coordinates_to_point(
            instance.x, instance.y,
            srid=instance.spatial_reference_system.srid)
        if instance.z:
            point = convert_coordinates_to_point(
                instance.x, instance.y, instance.z,
                srid=instance.spatial_reference_system.srid)
    if point_2d != instance.point_2d or point != instance.point:
        instance.point = point
        instance.point_2d = point_2d
        instance.skip_history_when_saving = True
        instance.save()
    return


def create_slug(model, name, slug_attr='slug', max_length=100):
    base_slug = slugify(name)
    slug = base_slug[:max_length]
    final_slug = None
    idx = 1
    while not final_slug:
        if slug and not model.objects.filter(**{slug_attr: slug}).exists():
            final_slug = slug
            break
        slug = base_slug[:(max_length - 1 - len(str(idx)))] + "-" + str(idx)
        idx += 1
    return final_slug


def get_all_field_names(model):
    return list(set(chain.from_iterable(
        (field.name, field.attname) if hasattr(field, 'attname') else (
            field.name,)
        for field in model._meta.get_fields()
        if not (field.many_to_one and field.related_model is None)
    )))


def get_all_related_m2m_objects_with_model(model):
    return [
        (f, f.model if f.model != model else None)
        for f in model._meta.get_fields(include_hidden=True)
        if f.many_to_many and f.auto_created
    ]


def get_all_related_many_to_many_objects(model):
    return [
        f for f in model._meta.get_fields(include_hidden=True)
        if f.many_to_many and f.auto_created
    ]


def get_all_related_objects(model):
    return [
        f for f in model._meta.get_fields()
        if (f.one_to_many or f.one_to_one)
        and f.auto_created and not f.concrete
    ]


def num2col(n):
    string = ""
    while n > 0:
        n, remainder = divmod(n - 1, 26)
        string = chr(65 + remainder) + string
    return string


def merge_tsvectors(vectors):
    """
    Parse tsvectors to merge them in one string

    :param vectors: list of tsvector strings
    :return: merged tsvector
    """
    result_dict = {}
    for vector in vectors:
        if not vector:
            continue

        current_position = 0
        if result_dict:
            for key in result_dict:
                max_position = max(result_dict[key])
                if max_position > current_position:
                    current_position = max_position

        for dct_member in vector.split(" "):
            splitted = dct_member.split(':')
            key = ":".join(splitted[:-1])
            positions = splitted[-1]
            key = key[1:-1]  # remove quotes
            positions = [int(pos) + current_position
                         for pos in positions.split(',')]
            if key in result_dict:
                result_dict[key] += positions
            else:
                result_dict[key] = positions

    # {'lamelie': [1, 42, 5]} => {'lamelie': "1,42,5"}
    result_dict = {k: ",".join([str(val) for val in result_dict[k]])
                   for k in result_dict}
    # {'lamelie': "1,5", "hagarde": "2", "regarde": "4"} =>
    # "'lamelie':1,5 'hagarde':2 'regarde':4"
    result = " ".join(["'{}':{}".format(k, result_dict[k])
                       for k in result_dict])
    return result
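
# Illustrative expected values for the two helpers above (made-up inputs):
#
#   num2col(1)   # => "A"
#   num2col(27)  # => "AA"
#
#   merge_tsvectors(["'alpha':1 'beta':2", "'beta':1 'gamma':3"])
#   # => "'alpha':1 'beta':2,3 'gamma':5" (key order may vary; the positions
#   # of each following vector are shifted past the previous maximum)
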
def put_session_message(session_key, message, message_type):
    session = SessionStore(session_key=session_key)
    messages = []
    if 'messages' in session:
        messages = session['messages'][:]
    messages.append((unicode(message), message_type))
    session['messages'] = messages
    session.save()


def put_session_var(session_key, key, value):
    session = SessionStore(session_key=session_key)
    session[key] = value
    session.save()


def get_session_var(session_key, key):
    session = SessionStore(session_key=session_key)
    if key not in session:
        return
    return session[key]


def clean_session_cache(session):
    # clean session cache
    cache_key_list = 'sessionlist-{}'.format(session.session_key)
    key_list = cache.get(cache_key_list, [])
    for key in key_list:
        cache.set(key, None, settings.CACHE_TIMEOUT)
    cache.set(cache_key_list, [], settings.CACHE_TIMEOUT)


def get_field_labels_from_path(model, path):
    """
    :param model: base model
    :param path: list of attributes starting from the base model
    :return: list of labels
    """
    labels = []
    for key in path:
        try:
            field = model._meta.get_field(key)
        except:
            labels.append(key)
            continue
        if hasattr(field, 'verbose_name'):
            labels.append(field.verbose_name)
        else:
            labels.append(key)
    return labels


def create_default_areas(models=None, verbose=False):
    # can be used on migrations if models are provided
    if not models:
        from ishtar_common.models import Area, Town, Department, State
    else:
        Area = models['area']
        Town = models['town']
        Department = models['department']
        State = models['state']

    areas = {}
    idx = 0
    for state in State.objects.all():
        slug = 'state-' + slugify(state.label)
        area, created = Area.objects.get_or_create(
            txt_idx=slug, defaults={'label': state.label})
        areas['state-{}'.format(state.pk)] = area
        if created:
            idx += 1
    if verbose:
        print("\n* {} state areas added".format(idx))

    idx, idx2 = 0, 0
    for dep in Department.objects.all():
        slug = 'dep-' + slugify(dep.label)
        area, created = Area.objects.get_or_create(
            txt_idx=slug, defaults={'label': dep.label})
        areas['dep-' + dep.number] = area
        if created:
            idx += 1
        if not dep.state_id:
            continue
        state_slug = 'state-{}'.format(dep.state_id)
        if state_slug not in areas:
            continue
        if area.parent and area.parent.pk == areas[state_slug].pk:
            continue
        idx2 += 1
        area.parent = areas[state_slug]
        area.save()
    if verbose:
        print(
            "* {} department areas added with {} associations to state".format(
                idx, idx2)
        )

    idx = 0
    for town in Town.objects.all():
        if not town.numero_insee or len(town.numero_insee) != 5:
            continue
        code_dep = 'dep-' + town.numero_insee[:2]
        code_dep_dom = 'dep-' + town.numero_insee[:3]
        if code_dep in areas:
            if not areas[code_dep].towns.filter(pk=town.pk).count():
                areas[code_dep].towns.add(town)
                idx += 1
        elif code_dep_dom in areas:
            if not areas[code_dep_dom].towns.filter(pk=town.pk).count():
                areas[code_dep_dom].towns.add(town)
                idx += 1
    if verbose:
        print("* {} towns associated to department areas".format(idx))


def get_relations_for_graph(rel_model, obj_pk, above_relations=None,
                            equal_relations=None, treated=None, styles=None):
    """
    Get all above and equal relations of an object (get all child and parent
    relations)

    :param rel_model: the relation model concerned
    :param obj_pk: id of an object with relations
    :param above_relations: list of current above_relations
    :param equal_relations: list of current equal_relations
    :param treated: treated relation list to prevent circular call
    :param styles: current styles
    :return: above and equal relations list (each containing lists of two
    members)
    """
    if not above_relations:
        above_relations = []
    if not equal_relations:
        equal_relations = []
    if not treated:
        treated = []
    if not styles:
        styles = {}
    if obj_pk in treated:
        return above_relations, equal_relations, styles
    treated.append(obj_pk)

    q = rel_model.objects.filter(
        left_record_id=obj_pk,
        relation_type__logical_relation__isnull=False
    ).values('right_record_id', 'relation_type__logical_relation')
    if not q.count():
        return above_relations, equal_relations, styles

    for relation in q.all():
        logical_relation = relation['relation_type__logical_relation']
        right_record = relation['right_record_id']
        if not logical_relation:
            continue
        elif logical_relation == 'above' and \
                (obj_pk, right_record) not in above_relations:
            above_relations.append((obj_pk, right_record))
        elif logical_relation == 'bellow' and \
                (right_record, obj_pk) not in above_relations:
            above_relations.append((right_record, obj_pk))
        elif logical_relation == 'equal' and \
                (right_record, obj_pk) not in equal_relations and \
                (obj_pk, right_record) not in equal_relations:
            equal_relations.append((obj_pk, right_record))
        else:
            continue
        ar, er, substyles = get_relations_for_graph(
            rel_model, right_record, above_relations, equal_relations,
            treated, styles
        )
        styles.update(substyles)
        error_style = "color=red"
        for r in ar:
            if r not in above_relations:
                above_relations.append(r)
            reverse_rel = tuple(reversed(r))
            if reverse_rel in above_relations:
                # circular
                if r not in styles:
                    styles[r] = []
                if reverse_rel not in styles:
                    styles[reverse_rel] = []
                if error_style not in styles[r]:
                    styles[r].append(error_style)
                if error_style not in styles[reverse_rel]:
                    styles[reverse_rel].append(error_style)
            if r[0] == r[1]:
                # same entity
                if r not in styles:
                    styles[r] = []
                if error_style not in styles[r]:
                    styles[r].append("color=red")
        for r in er:
            if r not in equal_relations:
                equal_relations.append(r)
    return above_relations, equal_relations, styles
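
# Shape of the value returned by get_relations_for_graph (illustrative,
# with made-up ids): for a record 1 recorded as above 2, 2 as above 1
# (a cycle) and 1 as equal to 3, the call would give something like:
#
#   ([(1, 2), (2, 1)],        # above relations (left is above right)
#    [(1, 3)],                # equal relations
#    {(1, 2): ["color=red"],  # DOT edge styles flagging the cycle
#     (2, 1): ["color=red"]})
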
def generate_relation_graph(obj, debug=False):
    if not settings.DOT_BINARY:
        return

    model = obj.__class__
    rel_model = model._meta.get_field('right_relations').related_model

    # get relations
    above_relations, equal_relations, styles = get_relations_for_graph(
        rel_model, obj.pk)
    if debug:
        print(styles)
    if not above_relations and not equal_relations:
        obj.relation_image = None
        obj.save()
        return

    # generate dotfile
    dot_str = "digraph relations {\nnode [shape=box];\n"
    rel_str = ""
    described = []
    for rel_list, directed in ((above_relations, True),
                               (equal_relations, False)):
        if directed:
            rel_str += "subgraph Dir {\n"
        else:
            rel_str += "subgraph NoDir {\nedge [dir=none,style=dashed];\n"
        for left_pk, right_pk in rel_list:
            if left_pk not in described:
                described.append(left_pk)
                left = model.objects.get(pk=left_pk)
                style = 'label="{}"'.format(left.relation_label)
                if left.pk == obj.pk:
                    style += ',style=filled,fillcolor="#C6C0C0"'
                dot_str += u'item{}[{}];\n'.format(left.pk, style)
            if right_pk not in described:
                described.append(right_pk)
                right = model.objects.get(pk=right_pk)
                style = 'label="{}"'.format(right.relation_label)
                if right.pk == obj.pk:
                    style += ',style=filled,fillcolor="#C6C0C0"'
                dot_str += u'item{}[{}];\n'.format(right.pk, style)
            if not directed:
                # on the same level
                rel_str += u"{{rank = same; item{}; item{};}}\n".format(
                    left_pk, right_pk)
            style = ""
            if (left_pk, right_pk) in styles:
                style = " [{}]".format(
                    ", ".join(styles[(left_pk, right_pk)]))
            rel_str += u'item{} -> item{}{};\n'.format(left_pk, right_pk,
                                                       style)
        rel_str += "}\n"
    dot_str += rel_str + "\n}"

    tempdir = tempfile.mkdtemp("-ishtardot")
    dot_name = tempdir + os.path.sep + "relations.dot"
    with open(dot_name, 'w') as dot_file:
        dot_file.write(dot_str)

    # execute dot program
    args = (settings.DOT_BINARY, "-Tsvg", dot_name)
    svg_tmp_name = tempdir + os.path.sep + "relations.svg"
    with open(svg_tmp_name, "w") as svg_file:
        popen = subprocess.Popen(args, stdout=svg_file)
        popen.wait()
    with open(svg_tmp_name, "r") as svg_file:
        django_file = File(svg_file)
        obj.relation_image.save("relations.svg", django_file, save=True)
    if debug:
        print(u"DOT file: {}. Tmp SVG file: {}.".format(dot_name,
                                                        svg_tmp_name))
        return
    shutil.rmtree(tempdir)


def create_default_json_fields(model):
    """
    Create default json field configuration in existing database

    :param model: model concerned
    """
    from ishtar_common.models import JsonDataField

    def _get_keys(data, current_path=""):
        keys = []
        for key in data.keys():
            if type(data[key]) == dict:
                keys += _get_keys(data[key], current_path + key + "__")
                continue
            keys.append(current_path + key)
        return keys

    keys = []
    for item in model.objects.all():
        for key in _get_keys(item.data):
            if key not in keys:
                keys.append(key)

    content_type = ContentType.objects.get_for_model(model)
    for key in keys:
        JsonDataField.objects.get_or_create(
            content_type=content_type, key=key,
            defaults={
                'name': u" ".join(key.split('__')).capitalize(),
                'value_type': 'T',
                'display': False
            }
        )


def get_urls_for_model(model, views, own=False, autocomplete=False):
    """
    Generate get and show urls for a model
    """
    # <pk> and <type> group names are assumed; they must match the keyword
    # arguments of the associated views
    urls = [
        url(r'show-{}(?:/(?P<pk>.+))?/(?P<type>.+)?$'.format(model.SLUG),
            check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])(
                getattr(views, 'show_' + model.SLUG)),
            name="show-" + model.SLUG),
        url(r'^display-{}/(?P<pk>.+)/$'.format(model.SLUG),
            check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])(
                getattr(views, 'display_' + model.SLUG)),
            name='display-' + model.SLUG),
    ]
    if own:
        urls += [
            url(r'get-{}/own/(?P<type>.+)?$'.format(model.SLUG),
                check_rights(['view_' + model.SLUG,
                              'view_own_' + model.SLUG])(
                    getattr(views, 'get_' + model.SLUG)),
                name="get-own-" + model.SLUG,
                kwargs={'force_own': True}),
        ]
    urls += [
        url(r'get-{}/(?P<type>.+)?$'.format(model.SLUG),
            check_rights(['view_' + model.SLUG, 'view_own_' + model.SLUG])(
                getattr(views, 'get_' + model.SLUG)),
            name="get-" + model.SLUG),
    ]
    if autocomplete:
        urls += [
            url(r'autocomplete-{}/$'.format(model.SLUG),
                check_rights(['view_' + model.SLUG,
                              'view_own_' + model.SLUG])(
                    getattr(views, 'autocomplete_' + model.SLUG)),
                name='autocomplete-' + model.SLUG),
        ]
    return urls
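
# Illustrative wiring in a urls.py (a sketch - the module and model names
# are placeholders, not implied by this function):
#
#   from my_app import models, views
#   urlpatterns = get_urls_for_model(models.MyItem, views, own=True,
#                                    autocomplete=True)
#
# The model is expected to provide a SLUG attribute and the views module
# the matching show_/display_/get_/autocomplete_ callables.
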
def m2m_historization_changed(sender, **kwargs):
    obj = kwargs.get('instance', None)
    if not obj:
        return
    hist_values = obj.history_m2m or {}
    for attr in obj.HISTORICAL_M2M:
        values = []
        for value in getattr(obj, attr).all():
            if not hasattr(value, "history_compress"):
                continue
            values.append(value.history_compress())
        hist_values[attr] = values
    obj.history_m2m = hist_values

    if getattr(obj, 'skip_history_when_saving', False):
        # assume the last modifier is good...
        q = obj.history.filter(
            history_modifier_id=obj.history_modifier_id,
        ).order_by('-history_date', '-history_id')
        if q.count():
            hist = q.all()[0]
            hist.history_m2m = hist_values
            hist.history_date = hist.last_modified = datetime.datetime.now()
            hist.save()
        obj.skip_history_when_saving = True
    elif not obj.history_modifier:
        obj.skip_history_when_saving = True
    obj.save()


def max_size_help():
    msg = unicode(_(u"The maximum supported file size is {} Mo.")).format(
        settings.MAX_UPLOAD_SIZE
    )
    return msg


def find_all_symlink(dirname):
    for name in os.listdir(dirname):
        if name not in (os.curdir, os.pardir):
            full = os.path.join(dirname, name)
            if os.path.islink(full):
                yield full, os.readlink(full)


MEDIA_RE = [
    re.compile(r"_[a-zA-Z0-9]{7}\-[0-9]"),
    re.compile(r"_[a-zA-Z0-9]{7}"),
]


def simplify_name(full_path_name, check_existing=False):
    """
    Simplify a file name by removing auto save suffixes

    :param full_path_name: full path name
    :param check_existing: prevent giving the name of an existing file
    :return: (path, current name, simplified name)
    """
    name_exp = full_path_name.split(os.sep)
    path = os.sep.join(name_exp[0:-1])
    name = name_exp[-1]
    current_name = name[:]
    ext = ""
    if u"." in name:
        # remove the extension if there is one
        names = name.split(u".")
        name = u".".join(names[0:-1])
        ext = u"." + names[-1]
    while u"_" in name and len(name) > 15:
        oldname = name[:]
        for regex in MEDIA_RE:
            match = None
            for m in regex.finditer(name):
                # get the last match
                match = m
            if match:
                new_name = name.replace(match.group(), '')
                full_new_name = os.sep.join([path, new_name + ext])
                if not check_existing or not os.path.isfile(full_new_name):
                    # do not take the place of another file
                    name = new_name[:]
                    break
        if oldname == name:
            break
    return path, current_name, name + ext


def rename_and_simplify_media_name(full_path_name, rename=True):
    """
    Simplify the name if possible

    :param full_path_name: full path name
    :param rename: rename the file if True (default: True)
    :return: new full path name (or old if not changed), modified
    """
    if not os.path.exists(full_path_name) \
            or not os.path.isfile(full_path_name):
        return full_path_name, False
    path, current_name, name = simplify_name(full_path_name,
                                             check_existing=True)
    if current_name == name:
        return full_path_name, False
    full_new_name = os.sep.join([path, name])
    if rename:
        os.rename(full_path_name, full_new_name)
    return full_new_name, True
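
# Illustrative effect of simplify_name on a Django-style duplicate upload
# suffix (made-up POSIX path):
#
#   simplify_name("/srv/media/report_a1B2c3D-4.pdf")
#   # => ("/srv/media", "report_a1B2c3D-4.pdf", "report.pdf")
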
def get_file_fields():
    """
    Get all fields which are inherited from FileField
    """
    all_models = apps.get_models()
    fields = []
    for model in all_models:
        for field in model._meta.get_fields():
            if isinstance(field, models.FileField):
                fields.append(field)
    return fields


def get_used_media(exclude=None, limit=None, return_object_and_field=False,
                   debug=False):
    """
    Get media which are still used in models

    :param exclude: exclude fields, for instance:
    ['ishtar_common.Import.imported_file',
     'ishtar_common.Import.imported_images']
    :param limit: limit to some fields
    :param return_object_and_field: return the associated object and field
    name
    :return: list of media filenames or, if return_object_and_field is set
    to True, list of (object, file field name, media filename)
    """
    if return_object_and_field:
        media = []
    else:
        media = set()
    for field in get_file_fields():
        if exclude and unicode(field) in exclude:
            continue
        if limit and unicode(field) not in limit:
            continue
        is_null = {'%s__isnull' % field.name: True}
        is_empty = {'%s' % field.name: ''}
        storage = field.storage
        if debug:
            print("")
        q = field.model.objects.values('id', field.name)\
            .exclude(**is_empty).exclude(**is_null)
        ln = q.count()
        for idx, res in enumerate(q):
            value = res[field.name]
            if debug:
                sys.stdout.write("* get_used_media {}: {}/{}\r".format(
                    field, idx, ln))
                sys.stdout.flush()
            if value not in EMPTY_VALUES:
                if return_object_and_field:
                    media.append((
                        field.model.objects.get(pk=res['id']),
                        field.name,
                        storage.path(value)
                    ))
                else:
                    media.add(storage.path(value))
    return media


def get_all_media(exclude=None, debug=False):
    """
    Get all media from MEDIA_ROOT
    """
    if not exclude:
        exclude = []
    media = set()
    full_dirs = list(os.walk(six.text_type(settings.MEDIA_ROOT)))
    ln_full = len(full_dirs)
    for idx_main, full_dir in enumerate(full_dirs):
        root, dirs, files = full_dir
        ln = len(files)
        if debug:
            print("")
        for idx, name in enumerate(files):
            if debug:
                sys.stdout.write(
                    "* get_all_media {} ({}/{}): {}/{}\r".format(
                        root.encode('utf-8'), idx_main, ln_full, idx, ln))
                sys.stdout.flush()
            path = os.path.abspath(os.path.join(root, name))
            relpath = os.path.relpath(path, settings.MEDIA_ROOT)
            in_exclude = False
            for e in exclude:
                if re.match(r'^%s$' % re.escape(e).replace('\\*', '.*'),
                            relpath):
                    in_exclude = True
                    break
            if not in_exclude:
                media.add(path)
            elif debug:
                sys.stdout.write("* get_all_media {} ({}/{})\r".format(
                    root.encode('utf-8'), idx_main, ln_full))
    return media


def get_unused_media(exclude=None):
    """
    Get media which are not used in models
    """
    if not exclude:
        exclude = []
    all_media = get_all_media(exclude)
    used_media = get_used_media()
    return [x for x in all_media if x not in used_media]


def remove_unused_media():
    """
    Remove unused media
    """
    remove_media(get_unused_media())


def remove_media(files):
    """
    Delete files from the media dir
    """
    for filename in files:
        os.remove(os.path.join(settings.MEDIA_ROOT, filename))


def remove_empty_dirs(path=None):
    """
    Recursively delete empty directories; return True if everything was
    deleted.
    """
    if not path:
        path = settings.MEDIA_ROOT
    if not os.path.isdir(path):
        return False

    listdir = [os.path.join(path, filename) for filename in os.listdir(path)]
    if all(list(map(remove_empty_dirs, listdir))):
        os.rmdir(path)
        return True
    else:
        return False


def try_fix_file(filename, make_copy=True):
    """
    Try to find a file with a similar name in the same directory.

    :param filename: filename (full path)
    :param make_copy: make a copy of the similar file found
    :return: name of the similar file found or None
    """
    path, current_name, simplified_ref_name = simplify_name(
        filename, check_existing=False)
    # check existing files in the path
    for file in sorted(list(os.listdir(path))):
        full_file = os.sep.join([path, file])
        if not os.path.isfile(full_file):
            # must be a file
            continue
        _, _, name = simplify_name(full_file, check_existing=False)
        if simplified_ref_name.lower() == name.lower():
            # a candidate is found
            if make_copy:
                shutil.copy2(full_file, filename)
            return file
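
# Illustrative cleanup pass built from the helpers above (a sketch - run it
# against a backed up MEDIA_ROOT first; the exclude pattern is made up):
#
#   orphans = get_unused_media(exclude=['imports/*'])
#   remove_media(orphans)
#   remove_empty_dirs()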