diff options
author | Étienne Loks <etienne.loks@iggdrasil.net> | 2019-01-27 13:21:12 +0100 |
---|---|---|
committer | Étienne Loks <etienne.loks@iggdrasil.net> | 2019-01-27 13:21:12 +0100 |
commit | 45180c1f4f2e75b38b57d581f454b170ca68cfe1 (patch) | |
tree | 62b31735a99e8e60655e3bbe1bc0956294a4652c /ishtar_common/utils.py | |
parent | ab651277d8f70c9930481b1b578a762c7eedd1b3 (diff) | |
download | Ishtar-45180c1f4f2e75b38b57d581f454b170ca68cfe1.tar.bz2 Ishtar-45180c1f4f2e75b38b57d581f454b170ca68cfe1.zip |
Tools to manage media files.
* clean unused
* find missing
* rename and simplify
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r-- | ishtar_common/utils.py | 251 |
1 files changed, 251 insertions, 0 deletions
# Media file helpers added to ishtar_common/utils.py
# (diff index f2fe34631..2156d4f95, hunk @@ -966,3 +972,248 @@).
# The same patch adds these imports to the module header: re, six, sys,
# django.apps.apps, django.core.validators.EMPTY_VALUES, django.db.models.
#
# Django and six are imported inside the functions that need them so the
# pure filesystem helpers below can be imported without a Django project.
import os
import re
import shutil
import sys


def find_all_symlink(dirname):
    """
    Yield the symbolic links found directly inside a directory.

    :param dirname: directory to scan (not recursive)
    :return: generator of (full path of the link, target of the link)
    """
    # os.listdir never returns "." or "..": no need to filter them out
    for name in os.listdir(dirname):
        full = os.path.join(dirname, name)
        if os.path.islink(full):
            yield full, os.readlink(full)


# Suffixes stripped by simplify_name, most specific first:
# "_" + 7 alphanumeric characters + "-" + a digit, then "_" + 7 alphanumeric
# characters alone.
MEDIA_RE = [
    re.compile(r"_[a-zA-Z0-9]{7}\-[0-9]"),
    re.compile(r"_[a-zA-Z0-9]{7}"),
]


def simplify_name(full_path_name, check_existing=False):
    """
    Simplify a file name by removing auto save suffixes

    Suffixes matching MEDIA_RE are removed one at a time while the name
    (without extension) contains "_" and is longer than 15 characters.

    :param full_path_name: full path name
    :param check_existing: prevent to give name of an existing file
    :return: (path, current file name, simplified file name); path is
        everything before the last os.sep ("" when there is none)
    """
    path, _, current_name = full_path_name.rpartition(os.sep)
    # "path" is "" for both "/name" and "name": rely on dirname to probe
    # the right directory instead of the filesystem root
    directory = os.path.dirname(full_path_name)

    name, ext = current_name, ""
    if u"." in current_name:  # remove extension if have one
        name, _, last = current_name.rpartition(u".")
        ext = u"." + last

    while u"_" in name and len(name) > 15:
        previous = name
        for regex in MEDIA_RE:
            matches = list(regex.finditer(name))
            if not matches:
                continue
            # the text of the last match is removed wherever it appears
            new_name = name.replace(matches[-1].group(), '')
            if check_existing and os.path.isfile(
                    os.path.join(directory, new_name + ext)):
                # do not take the place of another file
                continue
            name = new_name
            break
        if name == previous:  # nothing left to strip
            break
    return path, current_name, name + ext


def rename_and_simplify_media_name(full_path_name, rename=True):
    """
    Simplify the name if possible

    :param full_path_name: full path name
    :param rename: rename file if True (default: True)
    :return: new full path name (or old if not changed), modified
    """
    if not os.path.isfile(full_path_name):
        return full_path_name, False

    _, current_name, name = simplify_name(full_path_name,
                                          check_existing=True)
    if current_name == name:
        return full_path_name, False

    # keep the new file next to the old one, also for relative paths
    full_new_name = os.path.join(os.path.dirname(full_path_name), name)
    if rename:
        os.rename(full_path_name, full_new_name)
    return full_new_name, True


def get_file_fields():
    """
    Get all fields which are inherited from FileField

    :return: list of field instances from every model returned by
        apps.get_models()
    """
    from django.apps import apps
    from django.db import models

    return [
        field
        for model in apps.get_models()
        for field in model._meta.get_fields()
        if isinstance(field, models.FileField)
    ]


def get_used_media(exclude=None, limit=None,
                   return_object_and_field=False, debug=False):
    """
    Get media which are still used in models

    :param exclude: exclude fields, ex: ['ishtar_common.Import.imported_file',
                    'ishtar_common.Import.imported_images']
    :param limit: limit to some fields
    :param return_object_and_field: return associated object and field name
    :param debug: write progress on stdout
    :return: set of media filename or if return_object_and_field is set to
             True a list of (object, file field name, media filename)
    """
    import six
    from django.core.validators import EMPTY_VALUES

    media = [] if return_object_and_field else set()
    for field in get_file_fields():
        # six.text_type instead of the Python 2 only "unicode" builtin
        field_label = six.text_type(field)
        if exclude and field_label in exclude:
            continue
        if limit and field_label not in limit:
            continue
        is_null = {'%s__isnull' % field.name: True}
        is_empty = {'%s' % field.name: ''}

        storage = field.storage
        if debug:
            print("")
        q = field.model.objects.values('id', field.name)\
            .exclude(**is_empty).exclude(**is_null)
        # the total is only displayed in debug mode: spare the extra query
        ln = q.count() if debug else None
        for idx, res in enumerate(q):
            value = res[field.name]
            if debug:
                sys.stdout.write("* get_used_media {}: {}/{}\r".format(
                    field, idx, ln))
                sys.stdout.flush()
            if value in EMPTY_VALUES:
                continue
            if return_object_and_field:
                media.append((
                    field.model.objects.get(pk=res['id']),
                    field.name,
                    storage.path(value)
                ))
            else:
                media.add(storage.path(value))
    return media


def get_all_media(exclude=None, debug=False):
    """
    Get all media from MEDIA_ROOT

    :param exclude: patterns matched against the path relative to
        MEDIA_ROOT; "*" stands for any sequence of characters
    :param debug: write progress on stdout
    :return: set of absolute file paths
    """
    import six
    from django.conf import settings

    # compile each exclusion pattern once instead of once per file
    exclude_res = [
        re.compile(r'^%s$' % re.escape(e).replace('\\*', '.*'))
        for e in exclude or []
    ]

    media = set()
    full_dirs = list(os.walk(six.text_type(settings.MEDIA_ROOT)))
    ln_full = len(full_dirs)
    for idx_main, (root, dirs, files) in enumerate(full_dirs):
        ln = len(files)
        if debug:
            print("")
        for idx, name in enumerate(files):
            if debug:
                sys.stdout.write("* get_all_media {} ({}/{}): {}/{}\r".format(
                    root.encode('utf-8'), idx_main, ln_full, idx, ln))
                sys.stdout.flush()
            path = os.path.abspath(os.path.join(root, name))
            relpath = os.path.relpath(path, settings.MEDIA_ROOT)
            if not any(regex.match(relpath) for regex in exclude_res):
                media.add(path)
        # NOTE(review): indentation was lost in the diff this was rebuilt
        # from; this is assumed to be the "else" of the loop over files
        # (always run, the loop has no break) - confirm against the file.
        if debug:
            sys.stdout.write("* get_all_media {} ({}/{})\r".format(
                root.encode('utf-8'), idx_main, ln_full))
    return media


def get_unused_media(exclude=None):
    """
    Get media which are not used in models

    :param exclude: patterns forwarded to get_all_media
    :return: list of paths returned by get_all_media and not by
        get_used_media
    """
    all_media = get_all_media(exclude or [])
    used_media = get_used_media()

    return [x for x in all_media if x not in used_media]


def remove_unused_media():
    """
    Remove unused media
    """
    remove_media(get_unused_media())


def remove_media(files):
    """
    Delete file from media dir

    :param files: file names; each one is joined to MEDIA_ROOT (an
        absolute name is therefore used as is)
    """
    from django.conf import settings

    for filename in files:
        os.remove(os.path.join(settings.MEDIA_ROOT, filename))


def remove_empty_dirs(path=None):
    """
    Recursively delete empty directories; return True if everything was
    deleted.

    Symbolic links are never followed nor removed.

    :param path: directory to clean (default: MEDIA_ROOT)
    :return: True if path was a directory and has been removed
    """
    if not path:
        from django.conf import settings
        path = settings.MEDIA_ROOT

    # os.rmdir fails on a link and its target may be outside of the tree
    if os.path.islink(path) or not os.path.isdir(path):
        return False

    # a list (not a generator): every child has to be processed even
    # when a previous one could not be removed
    removed = [
        remove_empty_dirs(os.path.join(path, filename))
        for filename in os.listdir(path)
    ]
    if all(removed):
        os.rmdir(path)
        return True
    return False


def try_fix_file(filename, make_copy=True):
    """
    Try to find a file with a similar name on the same dir.

    Two names are similar when their simplified names (see simplify_name)
    are equal, ignoring case. The file itself is never a candidate.

    :param filename: filename (full path)
    :param make_copy: make the copy of the similar file found
    :return: name of the similar file found or None
    """
    # a bare file name is searched in the current directory
    directory = os.path.dirname(filename) or os.curdir
    _, _, simplified_ref_name = simplify_name(filename, check_existing=False)
    wanted = simplified_ref_name.lower()
    target = os.path.abspath(filename)

    # check existing files in the path
    for candidate in sorted(os.listdir(directory)):
        full_candidate = os.path.join(directory, candidate)
        if not os.path.isfile(full_candidate):  # must be a file
            continue
        if os.path.abspath(full_candidate) == target:
            # copying a file onto itself raises an error
            continue
        _, _, name = simplify_name(full_candidate, check_existing=False)
        if name.lower() == wanted:
            # a candidate is found
            if make_copy:
                shutil.copy2(full_candidate, filename)
            return candidate
    return None