summaryrefslogtreecommitdiff
path: root/ishtar_common/utils.py
diff options
context:
space:
mode:
authorÉtienne Loks <etienne.loks@iggdrasil.net>2019-01-27 13:21:12 +0100
committerÉtienne Loks <etienne.loks@iggdrasil.net>2019-01-27 13:21:12 +0100
commit45180c1f4f2e75b38b57d581f454b170ca68cfe1 (patch)
tree62b31735a99e8e60655e3bbe1bc0956294a4652c /ishtar_common/utils.py
parentab651277d8f70c9930481b1b578a762c7eedd1b3 (diff)
downloadIshtar-45180c1f4f2e75b38b57d581f454b170ca68cfe1.tar.bz2
Ishtar-45180c1f4f2e75b38b57d581f454b170ca68cfe1.zip
Tools to manage media files.
* clean unused * find missing * rename and simplify
Diffstat (limited to 'ishtar_common/utils.py')
-rw-r--r--ishtar_common/utils.py251
1 files changed, 251 insertions, 0 deletions
diff --git a/ishtar_common/utils.py b/ishtar_common/utils.py
index f2fe34631..2156d4f95 100644
--- a/ishtar_common/utils.py
+++ b/ishtar_common/utils.py
@@ -24,12 +24,16 @@ from itertools import chain
import hashlib
import os
import random
+import re
import requests
import shutil
+import six
import subprocess
+import sys
import tempfile
from django import forms
+from django.apps import apps
from django.conf import settings
from django.conf.urls import url
from django.contrib.contenttypes.models import ContentType
@@ -37,7 +41,9 @@ from django.contrib.gis.geos import GEOSGeometry
from django.contrib.sessions.backends.db import SessionStore
from django.core.cache import cache
from django.core.files import File
+from django.core.validators import EMPTY_VALUES
from django.core.urlresolvers import reverse
+from django.db import models
from django.http import HttpResponseRedirect
from django.utils.datastructures import MultiValueDict as BaseMultiValueDict
from django.utils.safestring import mark_safe
@@ -966,3 +972,248 @@ def max_size_help():
settings.MAX_UPLOAD_SIZE
)
return msg
+
+
+def find_all_symlink(dirname):
+ for name in os.listdir(dirname):
+ if name not in (os.curdir, os.pardir):
+ full = os.path.join(dirname, name)
+ if os.path.islink(full):
+ yield full, os.readlink(full)
+
+
+MEDIA_RE = [
+ re.compile(r"_[a-zA-Z0-9]{7}\-[0-9]"),
+ re.compile(r"_[a-zA-Z0-9]{7}"),
+]
+
+
+def simplify_name(full_path_name, check_existing=False):
+ """
+ Simplify a file name by removing auto save suffixes
+
+ :param full_path_name: full path name
+ :param check_existing: prevent to give name of an existing file
+ :return:
+ """
+ name_exp = full_path_name.split(os.sep)
+ path = os.sep.join(name_exp[0:-1])
+ name = name_exp[-1]
+ current_name = name[:]
+ ext = ""
+ if u"." in name: # remove extension if have one
+ names = name.split(u".")
+ name = u".".join(names[0:-1])
+ ext = u"." + names[-1]
+
+ while u"_" in name and len(name) > 15:
+ oldname = name[:]
+ for regex in MEDIA_RE:
+ match = None
+ for m in regex.finditer(name): # get the last match
+ match = m
+ if match:
+ new_name = name.replace(match.group(), '')
+ full_new_name = os.sep.join([path, new_name + ext])
+ if not check_existing or not os.path.isfile(full_new_name):
+ # do not take the place of another file
+ name = new_name[:]
+ break
+ if oldname == name:
+ break
+ return path, current_name, name + ext
+
+
+def rename_and_simplify_media_name(full_path_name, rename=True):
+ """
+ Simplify the name if possible
+ :param full_path_name: full path name
+ :param rename: rename file if True (default: True)
+ :return: new full path name (or old if not changed), modified
+ """
+ if not os.path.exists(full_path_name) or not os.path.isfile(full_path_name):
+ return full_path_name, False
+
+ path, current_name, name = simplify_name(full_path_name,
+ check_existing=True)
+ if current_name == name:
+ return full_path_name, False
+
+ full_new_name = os.sep.join([path, name])
+ if rename:
+ os.rename(full_path_name, full_new_name)
+ return full_new_name, True
+
+
+def get_file_fields():
+ """
+ Get all fields which are inherited from FileField
+ """
+ all_models = apps.get_models()
+ fields = []
+ for model in all_models:
+ for field in model._meta.get_fields():
+ if isinstance(field, models.FileField):
+ fields.append(field)
+ return fields
+
+
+def get_used_media(exclude=None, limit=None,
+ return_object_and_field=False, debug=False):
+ """
+ Get media which are still used in models
+
+ :param exclude: exclude fields, ex: ['ishtar_common.Import.imported_file',
+ 'ishtar_common.Import.imported_images']
+ :param limit: limit to some fields
+ :param return_object_and_field: return associated object and field name
+ :return: list of media filename or if return_object_and_field is set to
+ True return (object, file field name, media filename)
+ """
+
+ if return_object_and_field:
+ media = []
+ else:
+ media = set()
+ for field in get_file_fields():
+ if exclude and unicode(field) in exclude:
+ continue
+ if limit and unicode(field) not in limit:
+ continue
+ is_null = {'%s__isnull' % field.name: True}
+ is_empty = {'%s' % field.name: ''}
+
+ storage = field.storage
+ if debug:
+ print("")
+ q = field.model.objects.values('id', field.name)\
+ .exclude(**is_empty).exclude(**is_null)
+ ln = q.count()
+ for idx, res in enumerate(q):
+ value = res[field.name]
+ if debug:
+ sys.stdout.write("* get_used_media {}: {}/{}\r".format(
+ field, idx, ln))
+ sys.stdout.flush()
+ if value not in EMPTY_VALUES:
+ if return_object_and_field:
+ media.append((
+ field.model.objects.get(pk=res['id']),
+ field.name,
+ storage.path(value)
+ ))
+ else:
+ media.add(storage.path(value))
+ return media
+
+
+def get_all_media(exclude=None, debug=False):
+ """
+ Get all media from MEDIA_ROOT
+ """
+
+ if not exclude:
+ exclude = []
+
+ media = set()
+ full_dirs = list(os.walk(six.text_type(settings.MEDIA_ROOT)))
+ ln_full = len(full_dirs)
+ for idx_main, full_dir in enumerate(full_dirs):
+ root, dirs, files = full_dir
+ ln = len(files)
+ if debug:
+ print("")
+ for idx, name in enumerate(files):
+ if debug:
+ sys.stdout.write("* get_all_media {} ({}/{}): {}/{}\r".format(
+ root.encode('utf-8'), idx_main, ln_full, idx, ln))
+ sys.stdout.flush()
+ path = os.path.abspath(os.path.join(root, name))
+ relpath = os.path.relpath(path, settings.MEDIA_ROOT)
+ in_exclude = False
+ for e in exclude:
+ if re.match(r'^%s$' % re.escape(e).replace('\\*', '.*'),
+ relpath):
+ in_exclude = True
+ break
+
+ if not in_exclude:
+ media.add(path)
+ else:
+ if debug:
+ sys.stdout.write("* get_all_media {} ({}/{})\r".format(
+ root.encode('utf-8'), idx_main, ln_full))
+ return media
+
+
+def get_unused_media(exclude=None):
+ """
+ Get media which are not used in models
+ """
+
+ if not exclude:
+ exclude = []
+
+ all_media = get_all_media(exclude)
+ used_media = get_used_media()
+
+ return [x for x in all_media if x not in used_media]
+
+
+def remove_unused_media():
+ """
+ Remove unused media
+ """
+ remove_media(get_unused_media())
+
+
+def remove_media(files):
+ """
+ Delete file from media dir
+ """
+ for filename in files:
+ os.remove(os.path.join(settings.MEDIA_ROOT, filename))
+
+
+def remove_empty_dirs(path=None):
+ """
+ Recursively delete empty directories; return True if everything was deleted.
+ """
+
+ if not path:
+ path = settings.MEDIA_ROOT
+
+ if not os.path.isdir(path):
+ return False
+
+ listdir = [os.path.join(path, filename) for filename in os.listdir(path)]
+
+ if all(list(map(remove_empty_dirs, listdir))):
+ os.rmdir(path)
+ return True
+ else:
+ return False
+
+
+def try_fix_file(filename, make_copy=True):
+ """
+ Try to find a file with a similar name on the same dir.
+
+ :param filename: filename (full path)
+ :param make_copy: make the copy of the similar file found
+ :return: name of the similar file found or None
+ """
+ path, current_name, simplified_ref_name = simplify_name(
+ filename, check_existing=False)
+
+ # check existing files in the path
+ for file in sorted(list(os.listdir(path))):
+ full_file = os.sep.join([path, file])
+ if not os.path.isfile(full_file): # must be a file
+ continue
+ _, _, name = simplify_name(full_file, check_existing=False)
+ if simplified_ref_name.lower() == name.lower():
+ # a candidate is found
+ if make_copy:
+ shutil.copy2(full_file, filename)
+ return file