#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright (C) 2015-2017 Étienne Loks # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . # See the file COPYING for details. from bs4 import BeautifulSoup as Soup import csv import datetime import importlib import io import json import os import shutil import tempfile import zipfile from io import StringIO from django.apps import apps from django.conf import settings from django.contrib.auth.models import User, Permission from django.contrib.contenttypes.models import ContentType from django.core.cache import cache from django.core.exceptions import ValidationError from django.core.files import File as DjangoFile from django.core.files.uploadedfile import SimpleUploadedFile from django.core.management import call_command from django.core.urlresolvers import reverse from django.db.models.fields import BooleanField from django.db.models.fields.related import ForeignKey from django.template.defaultfilters import slugify from django.test import TestCase as BaseTestCase from django.test.client import Client from django.test.runner import DiscoverRunner from ishtar_common import models, models_common from ishtar_common import views from ishtar_common.apps import admin_site from ishtar_common.serializers import type_serialization, \ SERIALIZATION_VERSION, \ restore_serialized, conf_serialization, CONF_MODEL_LIST, \ importer_serialization, IMPORT_MODEL_LIST, geo_serialization, \ GEO_MODEL_LIST, directory_serialization, DIRECTORY_MODEL_LIST, \ document_serialization, get_type_models, full_serialization from archaeological_operations.serializers import OPERATION_MODEL_LIST from archaeological_context_records.serializers import CR_MODEL_LIST from archaeological_finds.serializers import FIND_MODEL_LIST from archaeological_warehouse.serializers import WAREHOUSE_MODEL_LIST from ishtar_common.serializers_utils import serialization_info from ishtar_common.utils import post_save_geo, update_data, move_dict_data, \ rename_and_simplify_media_name, try_fix_file from ishtar_common.tasks import launch_export COMMON_FIXTURES = [ settings.ROOT_PATH + '../fixtures/initial_data-auth-fr.json', settings.ROOT_PATH + '../ishtar_common/fixtures/initial_data-fr.json', settings.ROOT_PATH + '../ishtar_common/fixtures/initial_spatialrefsystem-fr.json', settings.ROOT_PATH + '../ishtar_common/fixtures/initial_importtypes-fr.json', ] OPERATION_FIXTURES = COMMON_FIXTURES + [ settings.ROOT_PATH + '../archaeological_operations/fixtures/initial_data-fr.json', settings.ROOT_PATH + '../archaeological_operations/fixtures/initial_data_relation_type_norel-fr.json', settings.ROOT_PATH + '../archaeological_operations/fixtures/initial_data_relation_type-fr.json', ] OPERATION_TOWNS_FIXTURES = \ OPERATION_FIXTURES + \ [settings.ROOT_PATH + '../ishtar_common/fixtures/test_towns.json'] FILE_FIXTURES = OPERATION_FIXTURES + [ settings.ROOT_PATH + '../archaeological_files/fixtures/initial_data-fr.json'] FILE_TOWNS_FIXTURES = OPERATION_TOWNS_FIXTURES + [ settings.ROOT_PATH + '../archaeological_files/fixtures/initial_data-fr.json'] CONTEXT_RECORD_FIXTURES = FILE_FIXTURES + [ settings.ROOT_PATH + '../archaeological_context_records/fixtures/initial_data-fr.json', settings.ROOT_PATH + '../archaeological_context_records/fixtures/initial_data_relation_type_norel-fr.json', settings.ROOT_PATH + '../archaeological_context_records/fixtures/initial_data_relation_type-fr.json', ] CONTEXT_RECORD_TOWNS_FIXTURES = FILE_TOWNS_FIXTURES + [ settings.ROOT_PATH + '../archaeological_context_records/fixtures/initial_data-fr.json', settings.ROOT_PATH + '../archaeological_context_records/fixtures/initial_data_relation_type_norel-fr.json', settings.ROOT_PATH + '../archaeological_context_records/fixtures/initial_data_relation_type-fr.json', ] FIND_FIXTURES = CONTEXT_RECORD_FIXTURES + [ settings.ROOT_PATH + '../archaeological_finds/fixtures/initial_data-fr.json', ] FIND_TOWNS_FIXTURES = CONTEXT_RECORD_TOWNS_FIXTURES + [ settings.ROOT_PATH + '../archaeological_finds/fixtures/initial_data-fr.json', ] WAREHOUSE_FIXTURES = FIND_FIXTURES + [ settings.ROOT_PATH + '../archaeological_warehouse/fixtures/initial_data-fr.json', ] def create_superuser(): username = 'username4277' password = 'dcbqj756456!@%' q = User.objects.filter(username=username) if q.count(): return username, password, q.all()[0] user = User.objects.create_superuser(username, "nomail@nomail.com", password) user.set_password(password) user.save() return username, password, user def create_user(username='username678', password='dcbqj756aaa456!@%'): q = User.objects.filter(username=username) if q.count(): return username, password, q.all()[0] user = User.objects.create_user(username, email="nomail2@nomail.com") user.set_password(password) user.save() return username, password, user class TestCase(BaseTestCase): pass class UtilsTest(TestCase): def test_update_data(self): data_1 = { 'old': {'youpi': {'plouf': u'tralalalère'}} } data_2 = { 'tsoin_tsoin': 'hop', 'old': {'hoppy': 'hop'} } expected = { 'tsoin_tsoin': 'hop', 'old': {'youpi': {'plouf': u'tralalalère'}, 'hoppy': 'hop'} } res = update_data(data_1, data_2) self.assertEqual(res, expected) def test_move_dict_data(self): data = { 'old': {u'daté': "value"} } expected = { 'old': {'date': "value"} } res = move_dict_data( data, u'data__old__daté', u"data__old__date") self.assertEqual(res, expected) data = { '': {'hop': "value"} } expected = { 'hop': "value", '': {} } res = move_dict_data(data, u'data____hop', u"data__hop") self.assertEqual(res, expected) data = { 'old': { 'traitement': { u'constat_état': 'aaa' } } } expected = { 'old': { 'traitement': { 'constat_etat': 'aaa' } } } res = move_dict_data(data, u'data__old__traitement__constat_état', u'data__old__traitement__constat_etat') self.assertEqual(res, expected) def test_tinyfy_url(self): for idx in range(65): # > 62 to test with 2 letters base_url = "https://example.com#{}".format(idx) tiny_url = models.TinyUrl() tiny_url.link = base_url tiny_url.save() short_id = tiny_url.get_short_id() db_id = models.TinyUrl.decode_id(short_id) self.assertEqual(tiny_url.pk, db_id) ty = models.TinyUrl.objects.get(id=db_id) self.assertEqual(base_url, ty.link) c = Client() response = c.get(reverse('tiny-redirect', args=[short_id])) self.assertEqual(response['Location'], base_url) self.assertEqual(response.status_code, 302) class SearchText: SEARCH_URL = '' def _test_search(self, client, result, context=""): assert self.SEARCH_URL for q, expected_result in result: search = {'search_vector': q} response = client.get(reverse(self.SEARCH_URL), search) self.assertEqual(response.status_code, 200) res = json.loads(response.content.decode()) msg = "{} result(s) where expected for search: {} - found {}" \ "".format(expected_result, q, res['recordsTotal']) if context: msg = context + " - " + msg self.assertEqual(res['recordsTotal'], expected_result, msg=msg) class CommandsTestCase(TestCase): fixtures = [settings.ROOT_PATH + '../ishtar_common/fixtures/test_towns.json'] def test_clean_ishtar(self): """ Clean ishtar db """ from archaeological_operations.models import Parcel p = Parcel.objects.create( town=models.Town.objects.create(name='test', numero_insee='25000'), ) parcel_nb = Parcel.objects.count() out = StringIO() call_command('clean_ishtar', stdout=out) # no operation or file attached - the parcel should have disappear self.assertEqual(parcel_nb - 1, Parcel.objects.count()) self.assertEqual(Parcel.objects.filter(pk=p.pk).count(), 0) def test_import_geofla(self): q = models.Town.objects town_nb = q.count() out = StringIO() call_command('import_geofla_csv', settings.ROOT_PATH + '../ishtar_common/tests/geofla-test.csv', '--quiet', stdout=out) self.assertEqual(town_nb + 9, models.Town.objects.count()) call_command('import_geofla_csv', settings.ROOT_PATH + '../ishtar_common/tests/geofla-test.csv', '--quiet', stdout=out) # no new town self.assertEqual(town_nb + 9, models.Town.objects.count()) def test_import_insee(self): q = models.Town.objects town_nb = q.count() first, union_start, union_end = '', '', [] new_nums = [] for idx, town in enumerate(q.all()): x1 = float(idx) / 10 if not x1: x1 = 0 x2 = float(idx) / 10 + 0.1 l = 'MULTIPOLYGON((({x1} 0.1,{x2} 0.1,{x2} 0,{x1} 0,' \ '{x1} 0.1)))'.format(x1=x1, x2=x2) if union_start: union_start += ", " else: first = '{x1} 0.1'.format(x1=x1) union_start += '{x2} 0.1'.format(x1=x1, x2=x2) union_end.append('{x2} 0'.format(x1=x1, x2=x2)) town.limit = l town.year = 1792 new_nums.append("{}-{}".format(town.numero_insee, town.year)) town.save() union = 'MULTIPOLYGON (((' + first + ", " + union_start + \ ", " + ", ".join(reversed(union_end)) + ", 0 0, " + first + ")))" out = StringIO() call_command('import_insee_comm_csv', settings.ROOT_PATH + '../ishtar_common/tests/insee-test.csv', '--quiet', stdout=out) self.assertEqual(town_nb + 1, models.Town.objects.count()) new = models.Town.objects.order_by('-pk').all()[0] self.assertEqual(new.parents.count(), 2) self.assertEqual(new.limit.wkt, union) call_command('import_insee_comm_csv', '--quiet', settings.ROOT_PATH + '../ishtar_common/tests/insee-test.csv', stdout=out) # no new town self.assertEqual(town_nb + 1, models.Town.objects.count()) for parent_town in new.parents.all(): self.assertIn(parent_town.numero_insee, new_nums) class WizardTestFormData(object): """ Test set to simulate wizard steps """ def __init__(self, name, form_datas=None, ignored=None, pre_tests=None, extra_tests=None, error_expected=None): """ :param name: explicit name of the test :param form_datas: dict with data for each step - dict key are wizard step name :param ignored: steps to be ignored in wizard processing :param pre_tests: list of function to be executed before the wizard :param extra_tests: list of extra tests. These tests must be functions accepting two parameters: the current test object and the final step response :param error_expected: step where an error is expected """ self.name = name self.form_datas = form_datas or {} self.ignored = ignored[:] if ignored else [] self.pre_tests = pre_tests or [] self.extra_tests = extra_tests or [] self.error_expected = error_expected def set(self, form_name, field_name, value): """ Set data value. :param form_name: form name without wizard name :param field_name: field name :param value: value :return: None """ if form_name not in self.form_datas: self.form_datas[form_name] = {} self.form_datas[form_name][field_name] = value def append(self, form_name, value): """ Add data value to formset. :param form_name: form name without wizard name :param value: value :return: None """ if form_name not in self.form_datas: self.form_datas[form_name] = [] self.form_datas[form_name].append(value) def inits(self, test_object): """ Initialisations before the wizard. """ suffix = '-' + test_object.url_name # if form names are defined without url_name fix it for form_name in list(self.form_datas.keys()): if suffix in form_name: continue self.form_datas[form_name + suffix] = self.form_datas.pop(form_name) if self.error_expected and suffix not in self.error_expected: self.error_expected += suffix for pre in self.pre_tests: pre(test_object) def tests(self, test_object, final_step_response): """ Specific tests for these data. Raise Exception if not OK. """ for test in self.extra_tests: test(test_object, final_step_response) class ManagedModelTestRunner(DiscoverRunner): """ Test runner that automatically makes all unmanaged models in your Django project managed for the duration of the test run, so that one doesn't need to execute the SQL manually to create them. """ def setup_test_environment(self, *args, **kwargs): from django.apps import apps self.unmanaged_models = [m for m in apps.get_models() if not m._meta.managed] for m in self.unmanaged_models: m._meta.managed = True super(ManagedModelTestRunner, self).setup_test_environment(*args, **kwargs) def teardown_test_environment(self, *args, **kwargs): super(ManagedModelTestRunner, self).teardown_test_environment(*args, **kwargs) # reset unmanaged models for m in self.unmanaged_models: m._meta.managed = False class WizardTest(object): url_name = None url_uri = None wizard_name = '' steps = None condition_dict = None form_datas = [] test_back = True redirect_url = None model = None current_id_key = "pk" def setUp(self): self.username, self.password, self.user = create_superuser() def pre_wizard(self): self.client.login(**{'username': self.username, 'password': self.password}) def post_wizard(self): pass def pass_test(self): return False def check_response(self, response, current_step, data_idx): if "errorlist" in response.content.decode(): soup = Soup(response.content.decode(), "lxml") errorlist = soup.findAll( "ul", {"class": "errorlist"}) errors = [] for li in errorlist: lbl = li.findParent().findParent().findChild().text errors.append("{} - {}".format(lbl, li.text)) raise ValidationError("Errors: {} on {} - dataset {}.".format( " ".join(errors), current_step, data_idx + 1)) @classmethod def wizard_post(cls, client, url, current_step, form_data=None, follow=True, extra_data=None): if not url: url = reverse(cls.url_name) data = { '{}{}-current_step'.format(cls.url_name, cls.wizard_name): [current_step], } if not form_data: form_data = [] # reconstruct a POST request if type(form_data) in (list, tuple): # is a formset for d_idx, item in enumerate(form_data): for k in item: data['{}-{}-{}'.format( current_step, d_idx, k)] = item[k] else: for k in form_data: data['{}-{}'.format(current_step, k)] = form_data[k] if extra_data: data.update(extra_data) try: response = client.post(url, data, follow=follow) except ValidationError as e: msg = "Errors: {} on {}. On \"ManagementForm data is " \ "missing or...\" error verify the wizard_name or " \ "step name".format(" - ".join(e.messages), current_step) raise ValidationError(msg) return response def wizard_post_test(self, idx, data_idx, url, current_step, form_data, test_form_data, ignored): next_form_is_checked = len(self.steps) > idx + 1 and \ self.steps[idx + 1][0] not in ignored data = [] if current_step in form_data: data = form_data[current_step] response = self.wizard_post( self.client, url, current_step, data, not next_form_is_checked) if current_step == test_form_data.error_expected: with self.assertRaises(ValidationError): self.check_response(response, current_step, data_idx) else: self.check_response(response, current_step, data_idx) if next_form_is_checked: next_form = self.steps[idx + 1][0] self.assertRedirects( response, '/{}/{}'.format(self.url_uri, next_form), msg_prefix="Dataset n{} Redirection to {} has failed - " "Error on previous form ({})?".format( data_idx + 1, next_form, current_step) ) if idx == len(self.steps) - 1: # last form if not self.redirect_url: redirect_url = '/{}/done'.format(self.url_uri) else: dct = { "url_name": self.url_name, "url_uri": self.url_uri } form_key = 'selec-' + self.url_name if form_key in form_data and self.current_id_key in form_data[ form_key]: dct["current_id"] = form_data[form_key][self.current_id_key] if self.model: q = self.model.objects if q.count(): dct["last_id"] = q.order_by("-pk").all()[0].pk redirect_url = self.redirect_url.format(**dct) self.assertRedirects(response, redirect_url) return response def test_wizard(self): if self.pass_test(): return url = reverse(self.url_name) if not self.url_uri: self.url_uri = self.url_name self.pre_wizard() for data_idx, test_form_data in enumerate(self.form_datas): test_form_data.inits(self) form_data = test_form_data.form_datas ignored = test_form_data.ignored previous_step, back_tested, response = None, False, None for idx, step in enumerate(self.steps): current_step, current_form = step if current_step in ignored: continue if not previous_step: previous_step = idx elif self.test_back and not back_tested: # test going back on a form response = self.wizard_post( self.client, url, current_step, None, extra_data={"form_prev_step": previous_step} ) self.assertEqual(response.status_code, 200) back_tested = True response = self.wizard_post_test( idx, data_idx, url, current_step, form_data, test_form_data, ignored) test_form_data.tests(self, response) self.post_wizard() class CacheTest(TestCase): fixtures = [settings.ROOT_PATH + '../fixtures/initial_data-auth-fr.json', settings.ROOT_PATH + '../ishtar_common/fixtures/initial_data-fr.json',] def test_add(self): models.OrganizationType.refresh_cache() cached = models.OrganizationType.get_cache('test') self.assertEqual(cached, None) orga = models.OrganizationType.objects.create( txt_idx='test', label='testy') cached = models.OrganizationType.get_cache('test') self.assertEqual(cached.pk, orga.pk) orga.txt_idx = 'testy' orga.save() cached = models.OrganizationType.get_cache('testy') self.assertEqual(cached.pk, orga.pk) def test_list(self): models.OrganizationType.refresh_cache() types = models.OrganizationType.get_types() # only empty self.assertTrue(len(types), 1) org = models.OrganizationType.objects.create( txt_idx='test', label='testy') types = [ str(lbl) for idx, lbl in models.OrganizationType.get_types()] self.assertTrue('testy' in types) org.delete() types = [ str(lbl) for idx, lbl in models.OrganizationType.get_types()] self.assertFalse('testy' in types) def test_menu_cache(self): admin_username = 'username4277' admin_password = 'dcbqj756456!@%' User.objects.create_superuser(admin_username, "nomail@nomail.com", admin_password) readonly_user = 'username4456' readonly_password = 'xshqu744456!@%' user = User.objects.create_user(readonly_user, "nomail@nomail.com", readonly_password) ishtuser = models.IshtarUser.objects.get(user_ptr=user) models.UserProfile.objects.get_or_create( current=True, person=ishtuser.person, profile_type=models.ProfileType.objects.get( txt_idx='reader_access')) c = Client() c.login(username=admin_username, password=admin_password) response = c.get("/operation_search/general-operation_search") self.assertIn(b'href="/operation_modification/', response.content) # the cache menu must be updated... c2 = Client() c2.login(username=readonly_user, password=readonly_password) response = c2.get("/operation_search/general-operation_search") self.assertNotIn(b'href="/operation_modification/', response.content) # ... only on a session basis response = c.get("/operation_search/general-operation_search") self.assertIn(b'href="/operation_modification/', response.content) class GenericSerializationTest: def create_document_default(self): image_path = os.path.join(settings.ROOT_PATH, "..", "ishtar_common", "tests", "test.png") self.documents = [] for idx in range(12): self.documents.append(models.Document.objects.create( title="Test{}".format(idx), associated_file=SimpleUploadedFile( 'test.txt', b'no real content'), image=SimpleUploadedFile( name='test.png', content=open(image_path, 'rb').read(), content_type='image/png'))) def generic_serialization_test(self, serialize, no_test=False, kwargs=None): if not kwargs: kwargs = {} json_result = serialize(**kwargs) if no_test: return json_result for key in json_result.keys(): __, k = key module_name, model_name = k.split("__") if module_name == "django": if model_name in ("Group", "Permission"): module = importlib.import_module( "django.contrib.auth.models") elif model_name in ("ContentType",): module = importlib.import_module( "django.contrib.contenttypes.models") else: return else: module = importlib.import_module(module_name + ".models") model = getattr(module, model_name) if getattr(model, "TO_BE_DELETED", False): continue current_count = model.objects.count() result = json.loads(json_result[key]) serialization_count = len(result) # all serialization have to be tested self.assertTrue(serialization_count, msg="No data to test for {}".format(key)) # only "natural" serialization self.assertNotIn( "pk", result[0], msg="Serialization for {} do not use natural keys".format(key)) self.assertNotIn( "id", result[0], msg="Serialization for {} do not use natural keys".format(key)) # has to be at least equal (can be superior for model with # recursion) self.assertTrue( serialization_count >= current_count, msg="Serialization for model {}.{} failed. {} serialized {} " "expected".format(module.__name__, model_name, serialization_count, current_count)) return json_result def generic_restore_test_genzip(self, model_list, serialization, kwargs=None): current_number = {} for model in model_list: current_number[(model.__module__, model.__name__)] = \ model.objects.count() if not kwargs: kwargs = {} kwargs["archive"] = True zip_filename = serialization(**kwargs) return current_number, zip_filename def generic_restore_test(self, zip_filename, current_number, model_list, release_locks=False, delete_existing=True): restore_serialized(zip_filename, delete_existing=delete_existing, release_locks=release_locks) for model in model_list: previous_nb = current_number[(model.__module__, model.__name__)] current_nb = model.objects.count() self.assertEqual( previous_nb, current_nb, msg="Restore for model {}.{} failed. Initial: {}, restored: " "{}.".format(model.__module__, model.__name__, previous_nb, current_nb)) class SerializationTest(GenericSerializationTest, TestCase): fixtures = COMMON_FIXTURES + WAREHOUSE_FIXTURES def create_types(self): from archaeological_finds.models import MaterialTypeQualityType, \ ObjectTypeQualityType, AlterationType, AlterationCauseType, \ TreatmentEmergencyType, CommunicabilityType from archaeological_operations.models import CulturalAttributionType for model in (models.LicenseType, MaterialTypeQualityType, ObjectTypeQualityType, AlterationType, AlterationCauseType, TreatmentEmergencyType, CommunicabilityType, CulturalAttributionType): model.objects.create(txt_idx="test", label="Test") def test_type_serialization(self): self.create_types() self.generic_serialization_test(type_serialization) def create_default_conf(self): values = {} models.get_current_profile(force=True) # create a default profile models.GlobalVar.objects.create(slug="test") cform = models.CustomForm.objects.create( name="Test", form='ishtar_common.forms.TestForm') models.ExcludedField.objects.create(custom_form=cform, field="ExcludedField") CT = ContentType.objects.get_for_model(models.OrganizationType) models.JsonDataSection.objects.create( content_type=CT, name="Test", ) JF = models.JsonDataField.objects.create( name="Test", content_type=CT, key="test" ) models.CustomFormJsonField.objects.create( custom_form=cform, json_field=JF, label="test" ) mod = models.ImporterModel.objects.get( klass="ishtar_common.models.Organization" ) values["document_template"] = models.DocumentTemplate.objects.create( name="Test", slug="test", associated_model=mod, template=SimpleUploadedFile('test.txt', b'no real content') ) return values def test_conf_serialization(self): self.create_default_conf() self.generic_serialization_test(conf_serialization) def create_default_importer(self): models.ValueFormater.objects.create( name="Test", slug="test", format_string="HOPS-{}" ) def test_importer_serialization(self): self.create_default_importer() self.generic_serialization_test(importer_serialization) def create_geo_default(self): s = models_common.State.objects.create(label="test", number="999") d = models.Department.objects.create(label="test", number="999", state=s) t1 = models.Town.objects.create( name="Test town", center="SRID=4326;POINT(-44.3 60.1)", numero_insee="12345", departement=d ) t2 = models.Town.objects.create( name="Test town 2", center="SRID=4326;POINT(-44.2 60.2)", numero_insee="12346", departement=d ) t2.children.add(t1) a = models.Area.objects.create(label="Test", txt_idx='test') a.towns.add(t1) def test_geo_serialization(self): self.create_geo_default() self.generic_serialization_test(geo_serialization) def create_directory_default(self): org = models.Organization.objects.create( name="Test", organization_type=models.OrganizationType.objects.all()[0]) person = models.Person.objects.create( name="Test", attached_to=org ) models.Author.objects.create( person=person, author_type=models.AuthorType.objects.all()[0]) def test_directory_serialization(self): self.create_directory_default() self.generic_serialization_test(directory_serialization) def create_document_default(self): super(SerializationTest, self).create_document_default() from archaeological_operations.models import Operation, \ ArchaeologicalSite, OperationType from archaeological_context_records.models import ContextRecord from archaeological_finds.models import Find, BaseFind from archaeological_warehouse.models import Warehouse, Container, \ ContainerLocalisation, WarehouseDivision, WarehouseDivisionLink, \ WarehouseType, ContainerType operation_type = OperationType.objects.all()[0] dct = {'year': 2010, 'operation_type_id': operation_type.pk, "code_patriarche": "66666"} operation1 = Operation.objects.create(**dct) operation1.documents.add(self.documents[0]) dct["code_patriarche"] = "66667" operation2 = Operation.objects.create(**dct) operation2.documents.add(self.documents[1]) site1 = ArchaeologicalSite.objects.create(reference="3333", name="test") operation1.archaeological_sites.add(site1) site1.documents.add(self.documents[2]) site2 = ArchaeologicalSite.objects.create(reference="444", name="test2") operation2.archaeological_sites.add(site2) site2.documents.add(self.documents[3]) dct = {'label': "Context record1", "operation": operation1} cr1 = ContextRecord.objects.create(**dct) cr1.documents.add(self.documents[4]) dct = {'label': "Context record2", "operation": operation2} cr2 = ContextRecord.objects.create(**dct) cr2.documents.add(self.documents[5]) dct = {'label': "Base find", "context_record": cr1} base_find1 = BaseFind.objects.create(**dct) dct = {'label': "Base find2", "context_record": cr2} base_find2 = BaseFind.objects.create(**dct) dct = {'label': "Find1"} find1 = Find.objects.create(**dct) find1.documents.add(self.documents[6]) find1.base_finds.add(base_find1) dct = {'label': "Find2"} find2 = Find.objects.create(**dct) find2.documents.add(self.documents[7]) find2.base_finds.add(base_find2) w1 = Warehouse.objects.create( name="Test1", external_id="test", warehouse_type=WarehouseType.objects.all()[0], ) w1.documents.add(self.documents[8]) w2 = Warehouse.objects.create( name="Test2", external_id="test2", warehouse_type=WarehouseType.objects.all()[0], ) w2.documents.add(self.documents[9]) self.warehouses = [w1, w2] c1 = Container.objects.create( location=w1, responsible=w1, container_type=ContainerType.objects.all()[0], reference="Réf1", index=1, external_id="ref1-1" ) c1.documents.add(self.documents[10]) c2 = Container.objects.create( location=w2, responsible=w2, container_type=ContainerType.objects.all()[0], reference="Réf2", index=2, external_id="ref2-2" ) c2.documents.add(self.documents[11]) find1.container = c1 find1.container_ref = c1 find1.save() find2.container = c2 find2.container_ref = c2 find2.save() wd1 = WarehouseDivision.objects.create( label="Étagère", txt_idx="etagere" ) wd2 = WarehouseDivision.objects.create( label="Allée", txt_idx="allee" ) wl1 = WarehouseDivisionLink.objects.create( warehouse=w1, container_type=ContainerType.objects.all()[0], ) wl2 = WarehouseDivisionLink.objects.create( warehouse=w2, container_type = ContainerType.objects.all()[1], ) ContainerLocalisation.objects.create( container=c1, division=wl1, reference="A1" ) ContainerLocalisation.objects.create( container=c2, division=wl2, reference="A2" ) def test_base_document_serialization(self): self.create_document_default() self.generic_serialization_test(document_serialization) def test_document_serialization(self): self.create_document_default() res = self.generic_serialization_test( document_serialization) docs = json.loads( res[('documents', 'ishtar_common__Document')] ) self.assertEqual(len(docs), 12) from archaeological_operations.models import Operation, \ ArchaeologicalSite result_queryset = Operation.objects.filter( code_patriarche="66666") res = self.generic_serialization_test( document_serialization, no_test=True, kwargs={"operation_queryset": result_queryset} ) docs = json.loads( res[('documents', 'ishtar_common__Document')] ) self.assertEqual(len(docs), 6) result_queryset = ArchaeologicalSite.objects.filter( id=ArchaeologicalSite.objects.all()[0].id) res = self.generic_serialization_test( document_serialization, no_test=True, kwargs={"site_queryset": result_queryset} ) docs = json.loads( res[('documents', 'ishtar_common__Document')] ) self.assertEqual(len(docs), 6) from archaeological_context_records.models import ContextRecord result_queryset = ContextRecord.objects.filter( id=ContextRecord.objects.all()[0].id) res = self.generic_serialization_test( document_serialization, no_test=True, kwargs={"cr_queryset": result_queryset} ) docs = json.loads( res[('documents', 'ishtar_common__Document')] ) self.assertEqual(len(docs), 6) from archaeological_finds.models import Find result_queryset = Find.objects.filter( id=Find.objects.all()[0].id) res = self.generic_serialization_test( document_serialization, no_test=True, kwargs={"find_queryset": result_queryset} ) docs = json.loads( res[('documents', 'ishtar_common__Document')] ) self.assertEqual(len(docs), 6) from archaeological_warehouse.models import Warehouse result_queryset = Warehouse.objects.filter( id=Warehouse.objects.all()[0].id) res = self.generic_serialization_test( document_serialization, no_test=True, kwargs={"warehouse_queryset": result_queryset} ) docs = json.loads( res[('documents', 'ishtar_common__Document')] ) self.assertEqual(len(docs), 6) def test_serialization_zip(self): zip_filename = type_serialization(archive=True) # only check the validity of the zip, the type content is tested above self.assertTrue(zipfile.is_zipfile(zip_filename)) with zipfile.ZipFile(zip_filename, "r") as zip_file: self.assertIsNone(zip_file.testzip()) info = json.loads(zip_file.read("info.json").decode("utf-8")) self.assertEqual(info["serialize-version"], SERIALIZATION_VERSION) def test_restore_version(self): zip_filename = type_serialization(archive=True) with tempfile.TemporaryDirectory() as tmpdirname: with zipfile.ZipFile(zip_filename, "w") as zip_file: base_filename = "info.json" filename = tmpdirname + os.sep + base_filename with open(filename, "w") as json_file: info = serialization_info() info["serialize-version"] = "-42" json_file.write(json.dumps(info, indent=2)) zip_file.write(filename, arcname=base_filename) with self.assertRaises(ValueError): restore_serialized(zip_filename) def test_type_restore(self): from archaeological_context_records.models import RelationType as CRRT from archaeological_operations.models import RelationType as OperationRT cr_rel_type_nb = CRRT.objects.count() ope_rel_type_nb = OperationRT.objects.count() self.create_types() models.AuthorType.objects.create(label="Test", txt_idx="test") zip_filename = type_serialization(archive=True) models.AuthorType.objects.create( label="Am I still here", txt_idx="am-i-still-here") models.AuthorType.objects.filter(txt_idx="test").delete() restore_serialized(zip_filename) self.assertEqual( models.AuthorType.objects.filter(txt_idx="test").count(), 1) self.assertEqual( models.AuthorType.objects.filter(txt_idx="am-i-still-here").count(), 1) self.assertEqual(cr_rel_type_nb, CRRT.objects.count()) self.assertEqual(ope_rel_type_nb, OperationRT.objects.count()) self.assertTrue(OperationRT.objects.filter( inverse_relation__isnull=False).count()) models.AuthorType.objects.filter(txt_idx="am-i-still-here").delete() zip_filename = type_serialization(archive=True) models.AuthorType.objects.create( label="Am I still here", txt_idx="am-i-still-here") models.AuthorType.objects.filter(txt_idx="test").delete() restore_serialized(zip_filename, delete_existing=True) self.assertEqual( models.AuthorType.objects.filter(txt_idx="test").count(), 1) self.assertEqual( models.AuthorType.objects.filter(txt_idx="am-i-still-here").count(), 0) self.assertEqual(cr_rel_type_nb, CRRT.objects.count()) self.assertEqual(ope_rel_type_nb, OperationRT.objects.count()) self.assertTrue(OperationRT.objects.filter( inverse_relation__isnull=False).count()) def test_conf_restore(self): values = self.create_default_conf() current_number, zip_filename = self.generic_restore_test_genzip( CONF_MODEL_LIST, conf_serialization) os.remove(values["document_template"].template.path) self.generic_restore_test(zip_filename, current_number, CONF_MODEL_LIST) self.assertTrue( os.path.isfile(values["document_template"].template.path) ) def test_importer_restore(self): self.create_default_importer() current_number, zip_filename = self.generic_restore_test_genzip( IMPORT_MODEL_LIST, importer_serialization) self.generic_restore_test(zip_filename, current_number, IMPORT_MODEL_LIST) def test_geo_restore(self): self.create_geo_default() self.assertTrue(models.Town.objects.get(numero_insee="12345").center) current_number, zip_filename = self.generic_restore_test_genzip( GEO_MODEL_LIST, geo_serialization) self.generic_restore_test(zip_filename, current_number, GEO_MODEL_LIST) # no geo restore self.assertFalse(models.Town.objects.get(numero_insee="12345").center) def test_directory_restore(self): self.create_directory_default() current_number, zip_filename = self.generic_restore_test_genzip( DIRECTORY_MODEL_LIST, directory_serialization) self.generic_restore_test(zip_filename, current_number, DIRECTORY_MODEL_LIST) def test_document_restore(self): self.create_document_default() current_number, zip_filename = self.generic_restore_test_genzip( [models.Document], document_serialization) self.generic_restore_test(zip_filename, current_number, [models.Document]) def full_create(self): self.create_types() self.create_default_conf() self.create_default_importer() self.create_geo_default() self.create_directory_default() self.create_document_default() def test_full_restore(self): self.full_create() model_list = get_type_models() + CONF_MODEL_LIST + IMPORT_MODEL_LIST + \ GEO_MODEL_LIST + DIRECTORY_MODEL_LIST + OPERATION_MODEL_LIST + \ CR_MODEL_LIST + FIND_MODEL_LIST + WAREHOUSE_MODEL_LIST current_number, zip_filename = self.generic_restore_test_genzip( model_list, full_serialization) self.generic_restore_test(zip_filename, current_number, model_list) def test_export_action(self): self.full_create() model_list = get_type_models() + CONF_MODEL_LIST + IMPORT_MODEL_LIST + \ GEO_MODEL_LIST + DIRECTORY_MODEL_LIST + OPERATION_MODEL_LIST + \ CR_MODEL_LIST + FIND_MODEL_LIST + WAREHOUSE_MODEL_LIST task = models.ExportTask.objects.create(state="S") launch_export(task.pk) task = models.ExportTask.objects.get(pk=task.pk) current_number = {} for model in model_list: current_number[(model.__module__, model.__name__)] = \ model.objects.count() self.generic_restore_test(task.result.path, current_number, model_list) task = models.ExportTask.objects.create( filter_type="O", filter_text="66666", state="S") current_number.update( { ("archaeological_operations.models", "ArchaeologicalSite"): 1, ("archaeological_operations.models", "Operation"): 1, ("archaeological_context_records.models", "ContextRecord"): 1, ("archaeological_finds.models_finds", "BaseFind"): 1, ("archaeological_finds.models_finds", "Find"): 1, ("archaeological_warehouse.models", "Warehouse"): 1, ("archaeological_warehouse.models", "Container"): 1, ("archaeological_warehouse.models", "WarehouseDivisionLink"): 1, ("archaeological_warehouse.models", "ContainerLocalisation"): 1, } ) launch_export(task.pk) task = models.ExportTask.objects.get(pk=task.pk) self.generic_restore_test(task.result.path, current_number, model_list) class AccessControlTest(TestCase): def test_administrator(self): admin, created = models.PersonType.objects.get_or_create( txt_idx='administrator', defaults={'label': 'Admin'}) user, created = User.objects.get_or_create(username='myusername') user.is_superuser = True user.save() ishtar_user = models.IshtarUser.objects.get( user_ptr__username='myusername') self.assertEqual( models.UserProfile.objects.filter( person__ishtaruser=ishtar_user, profile_type__txt_idx='administrator' ).count(), 1 ) user = ishtar_user.user_ptr user.is_superuser = False user.save() self.assertEqual( models.UserProfile.objects.filter( person__ishtaruser=ishtar_user, profile_type__txt_idx='administrator' ).count(), 1 ) # no more automatic deletion of profile for admin self.assertEqual( models.UserProfile.objects.filter( person__ishtaruser=ishtar_user, profile_type__txt_idx='administrator' ).count(), 1 ) def test_django_admin(self): username, password = "myusername", "mypassword" __, __, user = create_user(username=username, password=password) user.is_superuser = False user.is_staff = False user.save() client = Client() url = "/admin/" client.login(username=username, password=password) response = client.get(url) self.assertRedirects(response, "/admin/login/?next={}".format(url)) User.objects.filter(username='myusername').update(is_staff=True) client.logout() client.login(username=username, password=password) response = client.get(url) self.assertEqual(response.status_code, 200) url += "ishtar_common/persontype/" response = client.get(url) self.assertEqual(response.status_code, 403) user.user_permissions.add(Permission.objects.get( codename='change_persontype')) client.logout() client.login(username=username, password=password) response = client.get(url) self.assertEqual(response.status_code, 200) class UserProfileTest(TestCase): fixtures = OPERATION_FIXTURES def setUp(self): self.password = 'mypassword' self.username = "myuser" self.user = User.objects.create_superuser( self.username, 'myemail@test.com', self.password) self.user.set_password(self.password) self.user.save() self.client = Client() self.client.login(username=self.username, password=self.password) def test_profile_edit(self): base_url = '/profile/' base_profile = self.user.ishtaruser.current_profile response = self.client.get(base_url) self.assertEqual(response.status_code, 200) response = self.client.post( base_url, {'name': "New name", "current_profile": base_profile.pk}) self.assertEqual(response.status_code, 302) base_profile = models.UserProfile.objects.get(pk=base_profile.pk) self.assertEqual( base_profile.name, "New name" ) self.client.post( base_url, {'delete_profile': True, 'name': "New name", "current_profile": base_profile.pk}) self.assertEqual(response.status_code, 302) # cannot delete a profile it is the last of his kind self.assertEqual( self.user.ishtaruser.person.profiles.count(), 1 ) self.client.post( base_url, {'name': "New name", 'duplicate_profile': True, "current_profile": base_profile.pk}) self.assertEqual(response.status_code, 302) # duplicate self.assertEqual( self.user.ishtaruser.person.profiles.count(), 2 ) # new current profile is the duplicated new_profile = self.user.ishtaruser.current_profile base_profile = models.UserProfile.objects.get(pk=base_profile.pk) self.assertNotEqual(base_profile.pk, new_profile.pk) self.assertNotEqual(base_profile.name, new_profile.name) response = self.client.post( base_url, {'name': "New name", "current_profile": new_profile.pk}) self.assertIn( b"errorlist nonfield", response.content, msg="An error should be isplayed as this name is already taken" ) # the deletion can now occurs self.client.post( base_url, {'delete_profile': True, "current_profile": base_profile.pk}) self.assertEqual( self.user.ishtaruser.person.profiles.count(), 1 ) class AcItem: def __init__(self, model, url, lbl_key=None, prepare_func=None, id_key="pk", one_word_search=False, default_values=None): self.model, self.url, self.lbl_key = model, url, lbl_key self.prepare_func, self.id_key = prepare_func, id_key self.one_word_search = one_word_search self.default_values = default_values if not lbl_key and not prepare_func: self.lbl_key = "label" class AutocompleteTestBase: def setUp(self): self.password = 'mypassword' self.username = "myuser" user = User.objects.create_superuser( self.username, 'myemail@test.com', self.password) user.set_password(self.password) user.save() self.user = user self.client = Client() self.client.login(username=self.username, password=self.password) def test_autocomplete(self): for mdl in self.models: model, url, lbl_key = mdl.model, mdl.url, mdl.lbl_key prepare_func = mdl.prepare_func url = reverse(url) base_name = "tralala lere" search_term = "trala" if prepare_func: item, extra_search = getattr(self, prepare_func)(base_name) if extra_search: search_term += " " + extra_search else: create_dict = {lbl_key: base_name} if mdl.default_values: create_dict.update(mdl.default_values) item, __ = model.objects.get_or_create(**create_dict) response = self.client.get(url, {"term": search_term}) self.assertEqual( response.status_code, 200, msg="Status code != 200 - {}".format(url)) data = json.loads(response.content.decode()) self.assertEqual( len(data), 1, msg="{} result for '{}' expected 1 - {}".format( len(data), search_term, url)) self.assertEqual( data[0]['id'], getattr(item, mdl.id_key), msg="id: {} expected {} for '{}' - {}".format( data[0]['id'], item.pk, search_term, url)) if mdl.one_word_search: continue search_term = "ler " + search_term response = self.client.get(url, {"term": search_term}) self.assertEqual( response.status_code, 200, msg="Status code != 200 when reaching {}".format(url)) data = json.loads(response.content.decode()) self.assertEqual( len(data), 1, msg="{} result for '{}' expected 1 - {}".format( len(data), search_term, url)) self.assertEqual( data[0]['id'], getattr(item, mdl.id_key), msg="id: {} expected {} for '{}' - {}".format( data[0]['id'], item.pk, search_term, url)) class AutocompleteTest(AutocompleteTestBase, TestCase): fixtures = OPERATION_FIXTURES models = [ AcItem(models.User, 'autocomplete-user', "username"), AcItem(models.User, 'autocomplete-ishtaruser', "username"), AcItem(models.Person, 'autocomplete-person', "name"), AcItem(models.Person, 'autocomplete-person-permissive', "name"), AcItem(models.Town, 'autocomplete-town', "name"), AcItem(models.Town, 'autocomplete-advanced-town', prepare_func="create_advanced_town"), AcItem(models.Department, "autocomplete-department", "label"), AcItem(models.Author, "autocomplete-author", prepare_func="create_author"), AcItem(models.Organization, 'autocomplete-organization', prepare_func="create_orga"), ] def create_advanced_town(self, base_name): town, __ = models.Town.objects.get_or_create(name=base_name) dep, __ = models.Department.objects.get_or_create(label="Mydepartment", number=999) town.departement = dep town.save() return town, "Mydepart" def create_author(self, base_name): person, __ = models.Person.objects.get_or_create(name=base_name) author, __ = models.Author.objects.get_or_create( person=person, author_type=models.AuthorType.objects.all()[0]) return author, None def create_orga(self, base_name): orga, __ = models.Organization.objects.get_or_create( name=base_name, organization_type=models.OrganizationType.objects.all()[0]) return orga, None class AdminGenTypeTest(TestCase): fixtures = OPERATION_FIXTURES gen_models = [ models.OrganizationType, models.PersonType, models.TitleType, models.AuthorType, models.SourceType, models.OperationType, models.SpatialReferenceSystem, models.Format, models.SupportType, ] models_with_data = gen_models + [models.ImporterModel] models = models_with_data module_name = 'ishtar_common' ishtar_apps = [ 'ishtar_common', 'archaeological_files', 'archaeological_operations', 'archaeological_context_records', 'archaeological_warehouse', 'archaeological_finds' ] readonly_models = ['archaeological_finds.Property', 'archaeological_finds.Treatment', 'ishtar_common.ProfileTypeSummary'] def setUp(self): self.password = 'mypassword' self.username = "myuser" user = User.objects.create_superuser( self.username, 'myemail@test.com', self.password) user.set_password(self.password) user.save() self.client = Client() self.client.login(username=self.username, password=self.password) def test_listing_and_detail(self): models = [] for app in self.ishtar_apps: app_models = apps.get_app_config(app).get_models() for model in app_models: if model in admin_site._registry: models.append((app, model)) for app, model in models: # quick test to verify basic access to listing base_url = '/admin/{}/{}/'.format(app, model.__name__.lower()) url = base_url response = self.client.get(url) self.assertEqual( response.status_code, 200, msg="Can not access admin list for {}.".format(model)) nb = model.objects.count() url = base_url + "add/" response = self.client.get(url) if app + "." + model.__name__ in self.readonly_models: continue self.assertEqual( response.status_code, 200, msg="Can not access admin add page for {}.".format(model)) self.assertEqual( nb, model.objects.count(), msg="A ghost object have been created on access to add page " "for {}.".format(model)) if nb: url = base_url + "{}/change/".format(model.objects.all()[0].pk) response = self.client.get(url) self.assertEqual( response.status_code, 200, msg="Can not access admin detail for {}.".format(model)) def test_csv_export(self): for model in self.gen_models: url = '/admin/{}/{}/'.format(self.module_name, model.__name__.lower()) q = model.objects if not q.count(): continue response = self.client.post( url, {'action': 'export_as_csv', '_selected_action': [str(o.pk) for o in q.all()]}) self.assertEqual( response.status_code, 200, msg="Can not export as CSV for {}.".format(model)) try: f = io.StringIO(response.content.decode()) reader = csv.DictReader(f) for row in reader: if 'txt_idx' in row: slug_name = 'txt_idx' elif 'slug' in row: slug_name = 'slug' else: continue obj = model.objects.get(**{slug_name: row[slug_name]}) for k in row: current_value = getattr(obj, k) if not row[k]: self.assertIn(current_value, [None, ''], msg="Export CSV for {} - {}: CSV value is " "null whereas value for object is {}" ".".format(model, k, current_value)) continue field = model._meta.get_field(k) if isinstance(field, BooleanField): if current_value: self.assertEqual( row[k], 'True', msg="Export CSV for {} - {}: CSV value is " "{} whereas value for " "object is True.".format(model, k, row[k])) continue else: self.assertEqual( row[k], 'False', msg="Export CSV for {} - {}: CSV value is " "{} whereas value for " "object is False.".format( model, k, row[k])) continue elif isinstance(field, ForeignKey): fmodel = field.rel.to try: value = fmodel.objects.get( **{slug_name: row[k]} ) except fmodel.DoesNotExist: msg = "Export CSV for {} - {}: CSV value is "\ "{} but it is not a vaild slug for {}" \ ".".format(model, k, row[k], fmodel) raise ValidationError(msg) self.assertEqual( value, current_value, msg="Export CSV for {} - {}: CSV value is " "{} whereas value for " "object is {}.".format( model, k, value, current_value)) elif type(current_value) == float: self.assertEqual( float(row[k]), current_value, msg="Export CSV for {} - {}: CSV value is " "{} whereas value for " "object is {}.".format( model, k, row[k], current_value)) elif type(current_value) == int: self.assertEqual( int(row[k]), current_value, msg="Export CSV for {} - {}: CSV value is " "{} whereas value for " "object is {}.".format( model, k, row[k], current_value)) finally: f.close() def test_csv_import(self): for model in self.gen_models: q = model.objects nb = q.count() if not nb: continue base_url = '/admin/{}/{}/'.format(self.module_name, model.__name__.lower()) response = self.client.post( base_url, {'action': 'export_as_csv', '_selected_action': [str(o.pk) for o in q.all()]}) self.assertEqual( response.status_code, 200, msg="Can not export as CSV for {}.".format(model)) for obj in q.all(): obj.delete() url = base_url + 'import-from-csv/' try: f = io.BytesIO(response.content) response = self.client.post(url, {'csv_file': f, 'apply': True}) self.assertEqual(response.status_code, 302) self.assertRedirects(response, base_url) self.assertEqual(nb, model.objects.count()) finally: f.close() def test_json_export(self): for model in self.gen_models: url = '/admin/{}/{}/'.format(self.module_name, model.__name__.lower()) q = model.objects if not q.count(): continue response = self.client.post( url, {'action': '_serialize_action', '_selected_action': [str(o.pk) for o in q.all()]}) self.assertEqual( response.status_code, 200, msg="Can not export as JSON for {}.".format(model)) # json content already tested on full export def test_json_import(self): for model in self.gen_models: q = model.objects nb = q.count() if not nb: continue base_url = '/admin/{}/{}/'.format(self.module_name, model.__name__.lower()) response = self.client.post( base_url, {'action': '_serialize_action', '_selected_action': [str(o.pk) for o in q.all()]}) self.assertEqual( response.status_code, 200, msg="Can not export as CSV for {}.".format(model)) for obj in q.all(): obj.delete() url = base_url + 'import-from-json/' try: f = io.BytesIO(response.content) response = self.client.post(url, {'json_file': f, 'apply': True}) self.assertEqual(response.status_code, 302) self.assertRedirects(response, base_url) self.assertEqual(nb, model.objects.count()) finally: f.close() def test_importer_type_duplicate(self): model = models.ImporterType base_url = '/admin/{}/{}/'.format(self.module_name, model.__name__.lower()) ref = model.objects.all()[0] nb = model.objects.count() response = self.client.post( base_url, {'action': 'duplicate_importertype', '_selected_action': [str(ref.pk)]}) self.assertEqual(response.status_code, 302) self.assertEqual(nb + 1, model.objects.count()) duplicate = model.objects.order_by('-pk').all()[0] self.assertEqual(duplicate.columns.count(), ref.columns.count()) def test_importer_column_duplicate(self): model = models.ImporterColumn base_url = '/admin/{}/{}/'.format(self.module_name, model.__name__.lower()) ref = model.objects.all()[0] nb = model.objects.count() response = self.client.post( base_url, {'action': 'duplicate_importercolumn', '_selected_action': [str(ref.pk)]}) self.assertEqual(response.status_code, 302) self.assertEqual(nb + 1, model.objects.count()) duplicate = model.objects.order_by('-pk').all()[0] self.assertEqual(duplicate.duplicate_fields.count(), ref.duplicate_fields.count()) self.assertEqual(duplicate.targets.count(), ref.targets.count()) def test_importer_column_shift(self): model = models.ImporterColumn importer_type = models.ImporterType.objects.get( slug='ishtar-operations') # col in fixture should be well ordered for idx, col in enumerate( importer_type.columns.order_by('col_number').all()): self.assertEqual(col.col_number, idx + 1) base_url = '/admin/{}/{}/'.format(self.module_name, model.__name__.lower()) response = self.client.post( base_url, { 'action': 'shift_right', '_selected_action': [ str(c.pk) for c in importer_type.columns.all() ]}) self.assertEqual(response.status_code, 302) # col shifted to the right for idx, col in enumerate( importer_type.columns.order_by('col_number').all()): self.assertEqual(col.col_number, idx + 2) response = self.client.post( base_url, { 'action': 'shift_left', '_selected_action': [ str(c.pk) for c in importer_type.columns.all() ]}) self.assertEqual(response.status_code, 302) # col shifted back to the left for idx, col in enumerate( importer_type.columns.order_by('col_number').all()): self.assertEqual(col.col_number, idx + 1) def test_str(self): # test __str__ for model in self.models_with_data: self.assertTrue(str(model.objects.all()[0])) def test_user_creation(self): url = '/admin/auth/user/add/' password = 'ishtar is the queen' response = self.client.post( url, {'username': 'test', 'password1': password, 'password2': password}) self.assertEqual(response.status_code, 302) self.assertTrue(self.client.login(username='test', password=password)) class MergeTest(TestCase): def setUp(self): self.user, created = User.objects.get_or_create(username='username') self.organisation_types = \ models.OrganizationType.create_default_for_test() self.person_types = [models.PersonType.objects.create(label='Admin'), models.PersonType.objects.create(label='User')] self.author_types = [models.AuthorType.objects.create(label='1'), models.AuthorType.objects.create(label='2')] self.company_1 = models.Organization.objects.create( history_modifier=self.user, name='Franquin Comp.', organization_type=self.organisation_types[0]) self.person_1 = models.Person.objects.create( name='Boule', surname=' ', history_modifier=self.user, attached_to=self.company_1) self.person_1.person_types.add(self.person_types[0]) self.author_1_pk = models.Author.objects.create( person=self.person_1, author_type=self.author_types[0]).pk self.title = models.TitleType.objects.create(label='Test') self.company_2 = models.Organization.objects.create( history_modifier=self.user, name='Goscinny Corp.', organization_type=self.organisation_types[1]) self.person_2 = models.Person.objects.create( name='Bill', history_modifier=self.user, surname='Peyo', title=self.title, attached_to=self.company_2) self.user.ishtaruser.person = self.person_2 self.user.ishtaruser.save() models.UserProfile.objects.create( profile_type=models.ProfileType.objects.all()[0], person=self.person_2 ) self.person_2.person_types.add(self.person_types[1]) self.author_2_pk = models.Author.objects.create( person=self.person_2, author_type=self.author_types[1]).pk self.person_3 = models.Person.objects.create( name='George', history_modifier=self.user, attached_to=self.company_1) def test_person_merge(self): self.person_1.merge(self.person_2) # preserve existing fields self.assertEqual(self.person_1.name, 'Boule') # fill missing fields self.assertEqual(self.person_1.title, self.title) # string field with only spaces is an empty field self.assertEqual(self.person_1.surname, 'Peyo') # preserve one to one field user = User.objects.get(username='username') self.assertEqual(self.person_1, user.ishtaruser.person) # preserve existing foreign key self.assertEqual(self.person_1.attached_to, self.company_1) # preserve existing many to many self.assertTrue(self.person_types[0] in self.person_1.person_types.all()) # add new many to many self.assertTrue(self.person_types[1] in self.person_1.person_types.all()) # update reverse foreign key association and do not break the existing self.assertEqual(models.Author.objects.get(pk=self.author_1_pk).person, self.person_1) self.assertEqual(models.Author.objects.get(pk=self.author_2_pk).person, self.person_1) self.person_3.merge(self.person_1) # manage well empty many to many fields self.assertTrue(self.person_types[1] in self.person_3.person_types.all()) def test_person_with_use_account_merge(self): # bug: merge when the target is not the one having a Ishtar user account self.person_1.merge(self.person_2) def test_person_merge_candidate(self): init_mc = self.person_1.merge_candidate.count() person = models.Person.objects.create( name=self.person_1.name, surname=self.person_1.surname, history_modifier=self.user, attached_to=self.person_1.attached_to) self.assertEqual(self.person_1.merge_candidate.count(), init_mc + 1) person.archive() self.assertEqual(self.person_1.merge_candidate.count(), init_mc) class ShortMenuTest(TestCase): def setUp(self): self.username = 'username666' self.password = 'dcbqj7xnjkxnjsknx!@%' self.user = User.objects.create_superuser( self.username, "nomail@nomail.com", self.password) self.other_user = User.objects.create_superuser( 'John', "nomail@nomail.com", self.password) profile = models.get_current_profile() profile.files = True profile.context_record = True profile.find = True profile.warehouse = True profile.save() def _create_ope(self, user=None): if not user: user = self.other_user from archaeological_operations.models import Operation, OperationType ope_type, created = OperationType.objects.get_or_create(label="test") idx = 1 while Operation.objects.filter(code_patriarche=str(idx)).count(): idx += 1 return Operation.objects.create( operation_type=ope_type, history_modifier=user, year=2042, operation_code=54, code_patriarche=str(idx) ) def test_not_connected(self): c = Client() response = c.get(reverse('shortcut-menu')) # no content if not logged self.assertFalse(b"shortcut-menu" in response.content) c = Client() c.login(username=self.username, password=self.password) # no content because the user owns no object response = c.get(reverse('shortcut-menu')) self.assertFalse(b"shortcut-menu" in response.content) self._create_ope(user=self.user) # content is here response = c.get(reverse('shortcut-menu')) self.assertTrue(b"shortcut-menu" in response.content) def test_operation(self): c = Client() c.login(username=self.username, password=self.password) ope = self._create_ope() # not available at first response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(ope.cached_label) in response.content.decode()) # available because is the creator ope.history_creator = self.user ope.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(ope.cached_label) in response.content.decode()) # available because is in charge ope.history_creator = self.other_user ope.in_charge = self.user.ishtaruser.person ope.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(ope.cached_label) in response.content.decode()) # available because is the scientist ope.history_creator = self.other_user ope.in_charge = None ope.scientist = self.user.ishtaruser.person ope.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(ope.cached_label) in response.content.decode()) # end date is reached - no more available ope.end_date = datetime.date(1900, 1, 1) ope.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(ope.cached_label) in response.content.decode()) # test current is not owned ope.end_date = None ope.history_creator = self.other_user ope.in_charge = None ope.scientist = None ope.save() session = c.session session[ope.SLUG] = ope.pk session.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(ope.cached_label) in response.content.decode()) def testFile(self): from archaeological_files.models import File, FileType c = Client() c.login(username=self.username, password=self.password) file_type = FileType.objects.create() fle = File.objects.create( file_type=file_type, history_modifier=self.other_user, year=2043, ) # not available at first response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(fle.cached_label) in response.content.decode()) # available because is the creator fle.history_creator = self.user fle.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(fle.cached_label) in response.content.decode()) # available because is in charge fle.history_creator = self.other_user fle.in_charge = self.user.ishtaruser.person fle.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(fle.cached_label) in response.content.decode()) # end date is reached - no more available fle.end_date = datetime.date(1900, 1, 1) fle.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(fle.cached_label) in response.content.decode()) def _create_cr(self): from archaeological_context_records.models import ContextRecord from archaeological_operations.models import Parcel ope = self._create_ope() town = models.Town.objects.create() parcel = Parcel.objects.create( operation=ope, town=town, section="AA", parcel_number=42 ) return ContextRecord.objects.create( parcel=parcel, operation=ope, history_modifier=self.other_user, ) def testContextRecord(self): c = Client() c.login(username=self.username, password=self.password) cr = self._create_cr() # not available at first response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(cr.cached_label) in response.content.decode()) # available because is the creator cr.history_creator = self.user cr.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(cr.cached_label) in response.content.decode()) # available because is in charge cr.history_creator = self.other_user cr.save() cr.operation.in_charge = self.user.ishtaruser.person cr.operation.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(cr.cached_label) in response.content.decode()) # available because is the scientist cr.history_creator = self.other_user cr.save() cr.operation.in_charge = None cr.operation.scientist = self.user.ishtaruser.person cr.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(cr.cached_label) in response.content.decode()) def _create_find(self): from archaeological_finds.models import BaseFind, Find cr = self._create_cr() base_find = BaseFind.objects.create( context_record=cr ) find = Find.objects.create( label="Where is my find?" ) find.base_finds.add(base_find) return base_find, find def testFind(self): c = Client() c.login(username=self.username, password=self.password) base_find, find = self._create_find() # not available at first response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(find.cached_label) in response.content.decode()) # available because is the creator find.history_creator = self.user find.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(find.cached_label) in response.content.decode()) # available because is in charge find.history_creator = self.other_user find.save() base_find.context_record.operation.in_charge = \ self.user.ishtaruser.person base_find.context_record.operation.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(find.cached_label) in response.content.decode()) # available because is the scientist find.history_creator = self.other_user find.save() base_find.context_record.operation.in_charge = None base_find.context_record.operation.scientist = \ self.user.ishtaruser.person base_find.context_record.operation.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(find.cached_label) in response.content.decode()) def testBasket(self): c = Client() c.login(username=self.username, password=self.password) from archaeological_finds.models import FindBasket basket = FindBasket.objects.create( label="My basket", ) # not available at first response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(basket.label) in response.content.decode()) # available because is the owner basket.user = self.user.ishtaruser basket.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(basket.label) in response.content.decode()) def test_treatment_file(self): c = Client() c.login(username=self.username, password=self.password) from archaeological_finds.models import TreatmentFile, \ TreatmentFileType tf = TreatmentFile.objects.create( type=TreatmentFileType.objects.create(), year=2050 ) # not available at first response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(tf.cached_label) in response.content.decode()) # available because is the creator tf.history_creator = self.user tf.save() tf = TreatmentFile.objects.get(pk=tf.pk) response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(tf.cached_label) in response.content.decode()) # available because is in charge tf.history_creator = self.other_user tf.in_charge = self.user.ishtaruser.person tf.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(tf.cached_label) in response.content.decode()) # end date is reached - no more available tf.end_date = datetime.date(1900, 1, 1) tf.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(tf.cached_label) in response.content.decode()) def _create_treatment(self): from archaeological_finds.models import Treatment, TreatmentState completed, created = TreatmentState.objects.get_or_create( txt_idx='completed', defaults={"executed": True, "label": u"Done"} ) return Treatment.objects.create( label="My treatment", year=2052, treatment_state=completed ) def test_treatment(self): c = Client() c.login(username=self.username, password=self.password) treat = self._create_treatment() # not available at first response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(treat.cached_label) in response.content.decode()) # available because is the creator treat.history_creator = self.user treat.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(treat.cached_label) in response.content.decode()) # available because is in charge treat.history_creator = self.other_user treat.person = self.user.ishtaruser.person treat.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertTrue(str(treat.cached_label) in response.content.decode()) # end date is reached - no more available treat.end_date = datetime.date(1900, 1, 1) treat.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self.assertFalse(str(treat.cached_label) in response.content.decode()) def test_pin_search(self): c = Client() c.login(username=self.username, password=self.password) base_find, find = self._create_find() response = c.post( reverse('pin-search', args=['find']), {'value': 'Where is my find'}, **{'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'} ) self.assertEqual(response.status_code, 200) # the selected find search is pined self.assertEqual(c.session['pin-search-find'], 'Where is my find') # empty search save means empty dependant search c.get( reverse('pin', args=['contextrecord', str(base_find.context_record.pk)]) ) response = c.post( reverse('pin-search', args=['find']), {'value': ''}, **{'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest'} ) self.assertEqual(response.status_code, 200) self.assertEqual(c.session['pin-search-find'], '') self.assertEqual(c.session['contextrecord'], '') def test_update_current_item(self): c = Client() c.login(username=self.username, password=self.password) base_find, find = self._create_find() response = c.get(reverse('pin', args=['find', find.pk])) self.assertEqual(response.status_code, 200) # the selected find is pined self.assertEqual(c.session['find'], str(find.pk)) # dependant items are also pined self.assertEqual(c.session['contextrecord'], str(base_find.context_record.pk)) self.assertEqual(c.session['operation'], str(base_find.context_record.operation.pk)) # pin another operation - dependant items are nullify ope = self._create_ope() response = c.get(reverse('pin', args=['operation', ope.pk])) self.assertEqual(response.status_code, 200) self.assertFalse(c.session['find']) self.assertFalse(c.session['contextrecord']) # current find is set as an integer session = c.session session['find'] = find.id session.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) self._create_treatment() def test_basket_is_current_item(self): c = Client() c.login(username=self.username, password=self.password) from archaeological_finds.models import FindBasket basket = FindBasket.objects.create( label="My basket", user=self.user.ishtaruser ) session = c.session session['find'] = 'basket-{}'.format(basket.pk) session.save() response = c.get(reverse('shortcut-menu')) self.assertEqual(response.status_code, 200) response = c.get(reverse('get-document')) self.assertEqual(response.status_code, 200) class ImportTest(TestCase): def create_import(self): create_user() imp_model = models.ImporterModel.objects.create( klass='ishtar_common.models.Person', name='Person') importer_type = models.ImporterType.objects.create( associated_models=imp_model) dest = os.path.join(settings.MEDIA_ROOT, "MCC-operations-example.csv") shutil.copy( settings.ROOT_PATH + '../archaeological_operations/tests/MCC-operations-example.csv', dest ) with open(dest, 'rb') as f: mcc_operation_file = DjangoFile(f) imprt = models.Import.objects.create( user=models.IshtarUser.objects.all()[0], importer_type=importer_type, imported_file=mcc_operation_file) return imprt def test_delete_related(self): town = models.Town.objects.create(name='my-test') self.assertEqual(models.Town.objects.filter(name='my-test').count(), 1) imprt = self.create_import() town.imports.add(imprt) imprt.delete() # town should be deleted self.assertEqual(models.Town.objects.filter(name='my-test').count(), 0) def test_keys(self): content_type = ContentType.objects.get_for_model( models.OrganizationType) # creation label = u"Ploufé" ot = models.OrganizationType.objects.create(label=label) self.assertEqual(models.ItemKey.objects.filter( object_id=ot.pk, key=slugify(label), content_type=content_type).count(), 1) label_2 = u"Plif" ot_2 = models.OrganizationType.objects.create(label=label_2) self.assertEqual(models.ItemKey.objects.filter( object_id=ot_2.pk, key=slugify(label_2), content_type=content_type).count(), 1) # replace key ot_2.add_key(slugify(label), force=True) # one key point to only one item self.assertEqual(models.ItemKey.objects.filter( key=slugify(label), content_type=content_type).count(), 1) # this key point to the right item self.assertEqual(models.ItemKey.objects.filter( object_id=ot_2.pk, key=slugify(label), content_type=content_type).count(), 1) # modification label_3 = "Yop" ot_2.label = label_3 ot_2.txt_idx = slugify(label_3) ot_2.save() # old label not referenced anymore self.assertEqual(models.ItemKey.objects.filter( object_id=ot_2.pk, key=slugify(label_2), content_type=content_type).count(), 0) # # forced key association is always here # new key is here self.assertEqual(models.ItemKey.objects.filter( object_id=ot_2.pk, key=slugify(label), content_type=content_type).count(), 1) self.assertEqual(models.ItemKey.objects.filter( object_id=ot_2.pk, key=slugify(label_3), content_type=content_type).count(), 1) class IshtarSiteProfileTest(TestCase): fixtures = [settings.ROOT_PATH + '../fixtures/initial_data-auth-fr.json', settings.ROOT_PATH + '../ishtar_common/fixtures/initial_data-fr.json',] def testRelevance(self): cache.set('default-ishtarsiteprofile-is-current-profile', None, settings.CACHE_TIMEOUT) profile = models.get_current_profile() default_slug = profile.slug profile2 = models.IshtarSiteProfile.objects.create( label="Test profile 2", slug='test-profile-2') profile2.save() # when no profile is the current, activate by default the first created self.assertTrue(profile.active and not profile2.active) profile2.active = True profile2 = profile2.save() # only one profile active at a time profile = models.IshtarSiteProfile.objects.get(slug=default_slug) self.assertTrue(profile2.active and not profile.active) # activate find active automatically context records self.assertFalse(profile.context_record) profile.find = True profile = profile.save() self.assertTrue(profile.context_record) # activate warehouse active automatically context records and finds self.assertFalse(profile2.context_record or profile2.find) profile2.warehouse = True profile2 = profile2.save() self.assertTrue(profile2.context_record and profile2.find) def testDefaultProfile(self): cache.set('default-ishtar_common-IshtarSiteProfile', None, settings.CACHE_TIMEOUT) self.assertFalse(models.IshtarSiteProfile.objects.count()) profile = models.get_current_profile(force=True) self.assertTrue(profile) self.assertEqual(models.IshtarSiteProfile.objects.count(), 1) def test_menu_filtering(self): cache.set('default-ishtarsiteprofile-is-current-profile', None, settings.CACHE_TIMEOUT) username = 'username4277' password = 'dcbqj756456!@%' User.objects.create_superuser(username, "nomail@nomail.com", password) c = Client() c.login(username=username, password=password) response = c.get(reverse('start')) self.assertNotIn(b'href="/file_search/"', response.content) profile = models.get_current_profile() profile.files = True profile.save() response = c.get(reverse('start')) self.assertIn(b'href="/file_search/"', response.content) def testExternalKey(self): profile = models.get_current_profile() p = models.Person.objects.create(name='plouf', surname=u'Tégada') self.assertEqual(p.raw_name, u"PLOUF Tégada") profile.person_raw_name = u'{surname|slug} {name}' profile.save() p.raw_name = '' p.save() self.assertEqual(p.raw_name, u"tegada plouf") class IshtarBasicTest(TestCase): def setUp(self): self.password = 'mypassword' self.my_admin = User.objects.create_superuser( 'myuser', 'myemail@test.com', self.password) self.client = Client() self.client.login(username=self.my_admin.username, password=self.password) def test_status(self): response = self.client.get(reverse('status')) self.assertEqual(response.status_code, 200) def test_person_rawname(self): person = models.Person.objects.create(name="Weasley", surname="Bill") self.assertEqual(person.raw_name, "WEASLEY Bill") person.surname = "George" person.save() self.assertEqual(person.raw_name, "WEASLEY George") def test_show(self): person = models.Person.objects.create(name="Weasley", surname="Bill") orga_type = models.OrganizationType.objects.create( txt_idx='test', label='testy') company = models.Organization.objects.create( history_modifier=self.my_admin, name='Franquin Comp.', organization_type=orga_type) c = Client() response = c.get(reverse('show-person', kwargs={'pk': person.pk})) self.assertEqual(response.status_code, 200) # empty content when not allowed self.assertEqual(response.content, b"") response = c.get(reverse('show-organization', kwargs={'pk': company.pk})) self.assertEqual(response.status_code, 200) # empty content when not allowed self.assertEqual(response.content, b"") c.login(username=self.my_admin.username, password=self.password) response = c.get(reverse('show-person', kwargs={'pk': person.pk})) self.assertEqual(response.status_code, 200) self.assertIn(b'class="card sheet"', response.content) response = c.get(reverse('show-organization', kwargs={'pk': company.pk})) self.assertEqual(response.status_code, 200) self.assertIn(b'class="card sheet"', response.content) def test_town_cache(self): models.Town.objects.create(name="Sin City", numero_insee="99999") town = models.Town.objects.get(numero_insee="99999") self.assertEqual(town.cached_label, "Sin City - 99") town.year = 2050 town.save() town = models.Town.objects.get(numero_insee="99999") self.assertEqual(town.cached_label, "Sin City - 99") models.Town.objects.create(name="Mega City", numero_insee="99999", year=2051) mega_city = models.Town.objects.get(numero_insee="99999", year=2051) town.children.add(mega_city) town = models.Town.objects.get(numero_insee="99999-2050") self.assertEqual(town.cached_label, "Sin City - 99 (2050)") class GeomaticTest(TestCase): def test_post_save_point(self): class FakeGeomaticObject(object): _meta = models.GeoItem._meta def __init__(self, x, y, z, spatial_reference_system, point=None, point_2d=None): self.x = x self.y = y self.z = z self.spatial_reference_system = spatial_reference_system self.point_source = 'P' self.point_source_item = "" self.point = point self.point_2d = point_2d self.pk = 42 def save(self, *args, **kwargs): pass profile = models.get_current_profile() profile.mapping = True profile.save() srs = models.SpatialReferenceSystem.objects.create( label='WGS84', txt_idx='wgs84', srid=4326 ) obj = FakeGeomaticObject( x=2, y=3, z=4, spatial_reference_system=srs) self.assertIsNone(obj.point_2d) post_save_geo(FakeGeomaticObject, instance=obj) self.assertIsNotNone(obj.point_2d) self.assertIsNotNone(obj.point) class NewItems(TestCase): fixtures = COMMON_FIXTURES def setUp(self): self.username, self.password, self.user = create_superuser() def test_new_author(self): url = 'new-author' person = models.Person.objects.create( name="Hop", surname="Oups" ) c = Client() # TODO # response = c.get(reverse(url)) # self.assertEqual(response.status_code, 404) c.login(username=self.username, password=self.password) response = c.get(reverse(url)) self.assertEqual(response.status_code, 200) response = c.post( reverse(url), {"person": person.id, "author_type": models.AuthorType.objects.all()[0].pk} ) self.assertEqual(response.status_code, 200) self.assertEqual(person.author.count(), 1) class AccountWizardTest(WizardTest, TestCase): fixtures = [settings.ROOT_PATH + '../fixtures/initial_data-auth-fr.json', settings.ROOT_PATH + '../ishtar_common/fixtures/initial_data-fr.json',] url_name = 'account_management' wizard_name = 'account_wizard' steps = views.account_wizard_steps form_datas = [ WizardTestFormData( "Add an account", form_datas={ 'account': { 'username': "My username", 'email': "test@example.com", 'hidden_password': "my_pass", 'hidden_password_confirm': "my_pass", } }, ), ] def pre_wizard(self): self.person = models.Person.objects.create( name='Boule', surname=' ', ) self.form_datas[0].set('selec', 'pk', self.person.pk) self.form_datas[0].set('account', 'pk', self.person.pk) self.account_number = models.IshtarUser.objects.count() super(AccountWizardTest, self).pre_wizard() def post_wizard(self): person = models.Person.objects.get(pk=self.person.pk) user = person.ishtaruser.user_ptr self.assertEqual(user.username, "My username") self.assertEqual(user.email, "test@example.com") self.assertEqual(models.IshtarUser.objects.count(), self.account_number + 1) class DashboardTest(TestCase): def setUp(self): self.username, self.password, self.user = create_superuser() profile, created = models.IshtarSiteProfile.objects.get_or_create( slug='default', active=True) profile.files = True profile.context_record = True profile.find = True profile.warehouse = True profile.save() def test_dashboard(self): c = Client() c.login(username=self.username, password=self.password) url = 'dashboard-main-detail' response = c.get(reverse(url, kwargs={'item_name': "zorglub"})) self.assertEqual( response.status_code, 404) for item in ['users', 'files', 'treatmentfiles', 'treatments', 'operations', 'contextrecords', 'finds']: response = c.get(reverse(url, kwargs={'item_name': item})) self.assertEqual( response.status_code, 200, "Reaching dashboard for item: {} return an error.".format(url)) class CleanMedia(TestCase): def test_rename(self): test_names = [ ("éofficier2-12-02-04.93_gvK3hAr-1_2m7zZPn-1_nKhh2S2-1_"\ "ONmUhfD-1_ymA3gGJ-1_XzJyRx3-1_PhvRcO8-1-thumb_ZwWMKBd.jpg", "éofficier2-12-02-04.93-thumb.jpg"), ("a_ZwWMKBd.jpg", False), # no rename because too short ("hoplala_gvK3hAr_2m7zZPn_nKhh2S2_ZwWMKBd.jpg", "hoplala_gvK3hAr_2m7zZPn_nKhh2S2.jpg",), # stop before because # another file exists ] base_dir = os.sep.join([settings.ROOT_PATH, u"..", u"ishtar_common", u"tests", u"rename"]) for name, expected in test_names: name = os.sep.join([base_dir, name]) new_name, modif = rename_and_simplify_media_name(name, rename=False) if expected: self.assertTrue(modif) self.assertEqual(new_name, os.sep.join([base_dir, expected])) else: self.assertFalse(modif) def test_try_fix(self): test_names = [ (u"hoplala_gvK3hAr_2m7zZPn_nKhh2S2_ZwWMKBd_ZwWMKBd.jpg", # non existing file u"hoplala_gvK3hAr_2m7zZPn.jpg",), ] base_dir = os.sep.join([settings.ROOT_PATH, u"..", u"ishtar_common", u"tests", u"rename"]) for name, expected in test_names: name = os.sep.join([base_dir, name]) found = try_fix_file(name, make_copy=False) self.assertEqual(found, expected) class PersonQATest(TestCase): model = models.Person def setUp(self): self.username, self.password, self.user = create_superuser() self.user.user_permissions.add(Permission.objects.get( codename='change_person')) self.title_1 = models.TitleType.objects.create(label="T1", txt_idx="t1") self.title_2 = models.TitleType.objects.create(label="T2", txt_idx="t2") self.person_1 = models.Person.objects.create(title=self.title_1) self.person_2 = models.Person.objects.create(title=self.title_1) def test_bulk_update(self): c = Client() pks = u"{}-{}".format(self.person_1.pk, self.person_2.pk) response = c.get(reverse('person-qa-bulk-update', args=[pks])) self.assertRedirects(response, '/') c = Client() c.login(username=self.username, password=self.password) response = c.get(reverse('person-qa-bulk-update', args=[pks])) self.assertEqual(response.status_code, 200) self.assertNotEqual(self.person_1.title, self.title_2) self.assertNotEqual(self.person_2.title, self.title_2) response = c.post( reverse('person-qa-bulk-update-confirm', args=[pks]), {'qa_title': self.title_2.pk} ) if response.status_code != 200: self.assertRedirects(response, '/success/') self.assertEqual( models.Person.objects.get(pk=self.person_1.pk).title, self.title_2 ) self.assertEqual( models.Person.objects.get(pk=self.person_2.pk).title, self.title_2 ) class DocumentTest(TestCase): def test_custom_index(self): Operation = apps.get_model("archaeological_operations", "Operation") ContextRecord = apps.get_model("archaeological_context_records", "ContextRecord") Unit = apps.get_model("archaeological_context_records", "Unit") BaseFind = apps.get_model("archaeological_finds", "BaseFind") Find = apps.get_model("archaeological_finds", "Find") operation_type, __ = models.OperationType.objects.get_or_create( txt_idx="arch_diagnostic", label="Diagnostic") ope1 = Operation.objects.create( code_patriarche="001", operation_type_id=operation_type.pk) ope2 = Operation.objects.create( code_patriarche="002", operation_type_id=operation_type.pk) su, __ = Unit.objects.get_or_create( txt_idx='stratigraphic-unit', label="Stratigraphic unit", order=1) cr1 = ContextRecord.objects.create(operation=ope1, unit=su) cr2 = ContextRecord.objects.create(operation=ope2, unit=su) bf1 = BaseFind.objects.create(context_record=cr1) bf2 = BaseFind.objects.create(context_record=cr2) find1 = Find.objects.create() find1.base_finds.add(bf1) find2 = Find.objects.create() find2.base_finds.add(bf2) st1 = models.SourceType.objects.create(label="Report", code="REP") st2 = models.SourceType.objects.create(label="Illustration", code="ILL") profile, created = models.IshtarSiteProfile.objects.get_or_create( slug='default', active=True) profile.document_complete_identifier = \ "{operation_codes}-{source_type__code}-{custom_index}" profile.document_custom_index = "operation" profile.save() doc = models.Document.objects.create(source_type=st1, title="Operation report") doc.operations.add(ope1) doc = models.Document.objects.get(pk=doc.pk) self.assertEqual(doc.complete_identifier, "001-REP-1") doc2 = models.Document.objects.create(source_type=st2, title="Illustration CR") doc2.context_records.add(cr1) doc2 = models.Document.objects.get(pk=doc2.pk) self.assertEqual(doc2.complete_identifier, "001-ILL-2") doc3 = models.Document.objects.create(source_type=st1, title="Operation report 2") doc3.operations.add(ope2) doc3 = models.Document.objects.get(pk=doc3.pk) self.assertEqual(doc3.complete_identifier, "002-REP-1") doc3.operations.add(ope1) doc3.custom_index = None doc3.save() doc3 = models.Document.objects.get(pk=doc3.pk) self.assertEqual(doc3.custom_index, None) # 2 operations - no index self.assertEqual(doc3.complete_identifier, '001|002-REP-') # complex jinja identifier profile.document_complete_identifier = \ "{% if custom_index %}{{operation_codes}}-{{source_type__code}}-" \ "{{ \"%03d\" % (custom_index|int)}}{% else %}no-code{% endif %}" profile.save() doc3.operations.remove(ope1) doc3.custom_index = None doc3.complete_identifier = "" doc3.save() doc3 = models.Document.objects.get(pk=doc3.pk) self.assertEqual(doc3.complete_identifier, '002-REP-001') doc3.operations.remove(ope2) doc3.custom_index = None doc3.complete_identifier = "" doc3.save() doc3 = models.Document.objects.get(pk=doc3.pk) self.assertEqual(doc3.complete_identifier, 'no-code')