import os
import logging
from osgeo import ogr, osr
from itertools import permutations
from django.apps import apps
from django.conf import settings
logger = logging.getLogger(__name__)
# Encoding read from the Django settings; UTF-8 is used when the setting is
# missing, None or empty (a plain attribute access would raise if it is absent).
ENCODING = getattr(settings, "ENCODING", None) or "utf-8"
# Depth given to every list field that is declared as hierarchic below.
HIERARCHIC_LEVELS = 5

# Field names mapped to HIERARCHIC_LEVELS in LIST_FIELDS.
_HIERARCHIC_KEYS = (
    "conservatory_states",
    "identifications",
    "material_types",
    "material_type",
    "object_types",
    "period",
    "periods",
    "source_type",
    "unit",
    "museum_collection_entry_mode",
    "shooting_angle",
    "technical_processes",
    "structures",
    "textures",
    "inclusions",
    "colors",
    "development_type",
    "monitoring_justification",
    "documentations",
    "excavation_technics",
    "treatment_types",
)

# Field names mapped to a depth of 0 in LIST_FIELDS.
_FLAT_KEYS = (
    "discovery_method",
    "discovery_status",
    "current_status",
    "nature_of_site",
    "interpretation_level",
    "museum_inventory_marking_presence",
    "museum_marking_type",
    "museum_collection",
    "batch",
    "preservation_to_considers",
    "integrities",
    "remarkabilities",
    "checked_type",
    "material_type_quality",
    "object_type_quality",
    "communicabilities",
    "alterations",
    "alteration_causes",
    "treatment_emergency",
    "cultural_attributions",
    "remains",
    "dating_type",
    "quality",
    "operation_type",
    "report_processing",
    "record_quality_type",
    "data_type",
    "origin",
    "provider",
    "activity",
    "person_types",
    "relation_types",
)

# key: hierarchic depth. Insertion order is significant.
LIST_FIELDS = {
    **dict.fromkeys(_HIERARCHIC_KEYS, HIERARCHIC_LEVELS),
    **dict.fromkeys(_FLAT_KEYS, 0),
    # keep it at the end to not mess with other types
    "types": HIERARCHIC_LEVELS,
}

# Every key of LIST_FIELDS, in declaration order.
HIERARCHIC_FIELDS = list(LIST_FIELDS)
def _create_gpkg_layer(driver, path, layer_name, srs, geom_type):
    """
    :param driver: OGR driver used to create the file
    :param path: Path of the geopackage to create
    :param layer_name: Name of the layer to create in the new geopackage
    :param srs: Spatial reference given to the layer
    :param geom_type: OGR geometry type of the layer
    :function: Delete the file at ``path`` if it already exists, then create the geopackage and its layer
    :return datasource: Datasource of the new geopackage (released by the caller with ``= None``)
    :return layer: Newly created layer
    """
    if os.path.exists(path):
        os.remove(path)
    datasource = driver.CreateDataSource(path)
    layer = datasource.CreateLayer(layer_name, srs, geom_type)
    return datasource, layer


def gpkg_creation(model, root, table_cols, col_names, datas):
    """
    :param model: Table from the query
    :param root: Path to the folder to create the geopackage
    :param table_cols: List of the columns used in the query
    :param col_names: Name of the columns in the new geopackage
    :param datas: Data from the query
    :function: Creation of the Finds and Context_Records geopackages when the query come from one of these two tables
    :return finds: Path of the Finds geopackage ('' when the model is neither Find nor ContextRecord)
    :return cr: Path of the Context_Records geopackage ('' when the model is neither Find nor ContextRecord)
    :return list_ope: List of the different operations linked to the Finds and Context_Records
    :return list_crea: Two-item list ``[tag, names]`` where tag is 'cr' or 'finds' and names are the default
    attributes created for the layer that was NOT built from the query (empty list for an unsupported model)
    """
    # Preparation of important values and parameters for the geopackages
    finds = ''
    cr = ''
    list_ope = []
    list_crea = []
    driver = ogr.GetDriverByName('GPKG')
    srs = osr.SpatialReference()
    srs.ImportFromEPSG(4326)
    model_label = str(model._meta)
    # I. Case where the extraction come from Finds
    if model_label == 'archaeological_finds.find':
        # 1) Finds geopackage: attributes and features come from the query
        finds = os.path.join(root, 'export', 'Finds.gpkg')
        datasource, layer = _create_gpkg_layer(driver, finds, 'Finds', srs, ogr.wkbPoint25D)
        layer = attributes_creation_finds_query(layer, col_names, table_cols)
        list_cr = populating_layer_finds_query(layer, table_cols, col_names, datas)
        # Release the layer then the datasource before opening the next file
        layer = None
        datasource = None
        # 2) Context_Records geopackage: default attributes, filled from the
        #    Context_Records of the extracted finds
        cr = os.path.join(root, 'export', 'Context_records.gpkg')
        datasource, layer = _create_gpkg_layer(driver, cr, 'Context_records', srs, ogr.wkbMultiPolygon)
        list_crea = ['Unité_Enregistrement', 'Opération', 'INSEE_Commune', 'Type', 'Interprétation',
                     'Description', 'Localisation', 'Document_associe', 'Media', 'Periode', 'Type_Activité',
                     'Type_Identification', 'Commentaires', 'WKT', 'Infos_Parcelles_UE']
        layer = attributes_creation_cr_default(layer, list_crea)
        list_ope = populating_layer_cr_default(layer, list_crea, list_cr)
        layer = None
        datasource = None
        # 3) Preparation of a list of the attributes names for the style modifications
        list_crea = ['cr', list_crea]
    # II. Case where the extraction come from Context_Records
    elif model_label == 'archaeological_context_records.contextrecord':
        # 1) Context_Records geopackage: attributes and features come from the query
        cr = os.path.join(root, 'export', 'Context_records.gpkg')
        datasource, layer = _create_gpkg_layer(driver, cr, 'Context_records', srs, ogr.wkbMultiPolygon)
        layer = attributes_creation_cr_query(layer, col_names, table_cols)
        list_ope, list_cr = populating_layer_cr_query(layer, table_cols, col_names, datas)
        layer = None
        datasource = None
        # 2) Finds geopackage: default attributes, filled from the finds of the
        #    extracted Context_Records
        finds = os.path.join(root, 'export', 'Finds.gpkg')
        datasource, layer = _create_gpkg_layer(driver, finds, 'Finds', srs, ogr.wkbPoint25D)
        list_crea = ['Identifiant', 'UE', 'Date', 'X', 'Y', 'Z', 'Matériaux', 'Type_objets', 'Description', 'Media',
                     'Commentaires', 'WKT_point', 'Infos_Parcelle_Mobilier']
        attributes_creation_finds_default(layer, list_crea)
        populating_layer_finds_default(layer, list_crea, list_cr)
        # Released explicitly like every other datasource of this function
        layer = None
        datasource = None
        # 3) Preparation of a list of the attributes names for the style modifications
        list_crea = ['finds', list_crea]
    return finds, cr, list_ope, list_crea
def gpkg_creation_sites(model, root, table_cols, col_names, datas):
    """
    :param model: Table from the query
    :param root: Path to the folder to create the geopackage
    :param table_cols: List of the columns used in the query
    :param col_names: Name of the columns in the new geopackage
    :param datas: Data from the query
    :function: Specific version for the creation of the needed geopackages when the query come from the
    Archaeological_Sites table (or from the Operations table, in which case the Sites are searched from them)
    :return sites: Path of the Sites geopackage
    :return finds: Path of the Finds geopackage
    :return cr: Path of the Context_Records geopackage
    :return list_ope: List of the different operations linked to the Sites
    :return list_crea: Names of the default attributes created (Sites ones included only for an Operation query)
    """
    # Preparation of important values and parameters for the geopackages
    driver = ogr.GetDriverByName('GPKG')
    srs = osr.SpatialReference()
    srs.ImportFromEPSG(4326)
    # Specific case if the query come from the Operations, and all element linked to them must be searched
    from_operations = str(model._meta) == 'archaeological_operations.operation'
    # 1) Creation of the sites layer
    sites = os.path.join(root, 'export', 'Sites.gpkg')
    if os.path.exists(sites):
        os.remove(sites)
    datasource = driver.CreateDataSource(sites)
    layer = datasource.CreateLayer('Sites', srs, ogr.wkbPoint)
    list_s = []
    if from_operations:
        # Default attributes, features searched from the Operations of the query
        list_s = ['Référence', 'Nom', 'Operation', 'Commune', 'Période', 'Type', 'X', 'Y', 'Commentaires', 'WKT',
                  'Infos_Parcelles_Sites']
        layer = attributes_creation_sites_default(layer, list_s)
        list_ope, list_cr = populating_layer_sites_default(layer, list_s, table_cols, datas)
    else:
        # Attributes and features come from the query
        layer = attributes_creation_sites_query(layer, col_names, table_cols)
        list_ope, list_cr = populating_layer_sites_query(layer, table_cols, col_names, datas)
    # Release the layer then the datasource before opening the next file
    layer = None
    datasource = None
    # 2) Creation of the Context_Records layer
    cr = os.path.join(root, 'export', 'Context_records.gpkg')
    # Verification to delete it if already existing
    if os.path.exists(cr):
        os.remove(cr)
    datasource = driver.CreateDataSource(cr)
    layer = datasource.CreateLayer('Context_records', srs, ogr.wkbMultiPolygon)
    list_a = ['Unité_Enregistrement', 'Opération', 'INSEE_Commune', 'Type', 'Interprétation', 'Description',
              'Localisation', 'Document_associe', 'Media', 'Periode', 'Type_Activité', 'Type_Identification',
              'Commentaires', 'WKT', 'Infos_Parcelles_UE']
    layer = attributes_creation_cr_default(layer, list_a)
    populating_layer_cr_default(layer, list_a, list_cr)
    layer = None
    datasource = None
    # 3) Creation of the finds layer
    finds = os.path.join(root, 'export', 'Finds.gpkg')
    # Verification to delete it if already existing
    if os.path.exists(finds):
        os.remove(finds)
    datasource = driver.CreateDataSource(finds)
    layer = datasource.CreateLayer('Finds', srs, ogr.wkbPoint25D)
    list_b = ['Identifiant', 'UE', 'Date', 'X', 'Y', 'Z', 'Matériaux', 'Type_objets', 'Description', 'Media',
              'Commentaires', 'WKT_point', 'Infos_Parcelles_Mobilier']
    layer = attributes_creation_finds_default(layer, list_b)
    populating_layer_finds_default(layer, list_b, list_cr)
    layer = None
    datasource = None
    # Recuperation of all created attributes (list_s is empty unless the
    # query came from the Operations)
    list_crea = list_s + list_a + list_b
    return sites, finds, cr, list_ope, list_crea
def attributes_creation_finds_query(layer, col_names, table_cols):
    """
    :param layer: Finds layer from the linked geopackage
    :param col_names: Name of the columns in the new layer
    :param table_cols: List of the columns used in the query
    :function: Creation of the attributes of the Finds layer with the information from the exporter. The OGR type
    of each attribute is chosen from markers found in the name of the queried column
    :return layer: Finds layer with attributes
    """
    integer_markers = ('index', 'order', 'quantity', 'taq', 'tpq', 'year')
    real_markers = ('_x', '_y', '_z', 'circumference', 'cost', 'depth', 'diameter', 'height', 'length', 'number',
                    'surface', 'side', 'thickness', 'value', 'volume', 'weight', 'width')
    # NOTE(review): boolean-looking columns are declared as OFTBinary, as in
    # the original code - confirm this is the intended OGR type.
    binary_markers = ('large_area_prescription', 'is_complete', 'executed')
    # Looping on all the attributes
    for idx, col_name in enumerate(col_names):
        column = table_cols[idx]
        # Prevent missing values (case in some .gpkg)
        if column == '':
            continue
        # Gestion of specific formats of attributes
        if any(marker in column for marker in integer_markers):
            field_type = ogr.OFTInteger64
        elif any(marker in column for marker in real_markers):
            field_type = ogr.OFTReal
        elif '_datetime' in column:
            # Tested before '_date': '_date' is a substring of '_datetime', so
            # the other order made this branch unreachable
            field_type = ogr.OFTDateTime
        elif '_date' in column:
            field_type = ogr.OFTDate
        elif any(marker in column for marker in binary_markers):
            field_type = ogr.OFTBinary
        else:
            field_type = ogr.OFTString
        layer.CreateField(ogr.FieldDefn(col_name, field_type))
    return layer
def populating_layer_finds_query(layer, table_cols, col_names, datas):
    """
    :param layer: Finds layer from the linked geopackage with attributes
    :param table_cols: List of the columns used in the query
    :param col_names: Name of the columns in the new layer
    :param datas: Data from the query
    :function: Population of the Finds layer using all the data from the query. The geometry is read from the
    BaseFind having the same label; the database is only read (a label that matches no or several BaseFind gives a
    feature without geometry)
    :return list_cr: List of all the Context_Records linked to the Finds from the query
    """
    BaseFind = apps.get_model('archaeological_finds', 'BaseFind')

    def build_point(base_find):
        """Return the 3D point of a BaseFind, or None when nothing usable is stored."""
        try:
            x, y = float(base_find.x), float(base_find.y)
        except Exception:
            # Extreme case if no coordinates but the WKT exists
            try:
                return ogr.CreateGeometryFromWkt(str(base_find.point_3d).split(';')[-1])
            except Exception:
                return None
        try:
            z = float(base_find.z)
        except Exception:
            # Case if the Z coordinates doesn't exist
            z = 0.0
        point = ogr.Geometry(ogr.wkbPoint25D)
        point.AddPoint(x, y, z)
        return point

    nb_cols = len(col_names)
    list_cr = []
    # Looping on all the datas extracted to create features
    for data in datas:
        feature = ogr.Feature(layer.GetLayerDefn())
        # Looping on the attributes to add them to the feature
        for idx in range(nb_cols):
            col_name = col_names[idx]
            if col_name == '':
                continue
            # +1 because the first value in the attributes is ''
            value = data[idx + 1]
            column = table_cols[idx]
            # NOTE(review): this only matches a column named exactly '_date'
            # or '_datetime' (unchanged from the original); a substring test
            # was probably intended - confirm before changing it.
            if column in ('_date', '_datetime'):
                try:
                    # First version if it has all the data necessary for an ogr.OFTDateTime
                    feature.SetField(col_name, value)
                except Exception:
                    # Second version if some values are missing
                    feature.SetField(col_name, value.year, value.month, value.day, 0, 0, 0)
                continue
            # Completing the list of the Context_Records
            if 'context_record__label' in column and value not in list_cr:
                list_cr.append(value)
            feature.SetField(col_name, str(value))
        # Gestion of the geometry
        try:
            id_label = table_cols.index('label')
        except ValueError:
            # Other functions of this module look columns up as one-item
            # lists: accept that form too
            id_label = table_cols.index(['label'])
        name = data[id_label + 1]
        # Searching for the element itself, especially if the geometry was not
        # exported. Read-only lookup: an export must not create BaseFind rows.
        try:
            base_find = BaseFind.objects.get(label=name)
        except (BaseFind.DoesNotExist, BaseFind.MultipleObjectsReturned):
            base_find = None
        point = None
        if base_find is not None:
            # Completing the list of the Context_Records
            try:
                cr_label = str(base_find.context_record).split(' | ')[-1]
            except Exception:
                cr_label = None
            if cr_label is not None and cr_label not in list_cr:
                list_cr.append(cr_label)
            point = build_point(base_find)
        if point is not None:
            feature.SetGeometry(point)
        # Addition of the new feature
        layer.CreateFeature(feature)
        feature = None
    return list_cr
def attributes_creation_finds_default(layer, list_crea):
    """
    :param layer: Finds layer from the linked geopackage
    :param list_crea: Name of the columns by default
    :function: Creation of the default attributes of the Finds layer: 'Date' is a date, 'X', 'Y' and 'Z' are reals,
    everything else is a string
    :return layer: Finds layer with attributes
    """
    # Attributes that are not plain strings
    typed_fields = {
        'Date': ogr.OFTDate,
        'X': ogr.OFTReal,
        'Y': ogr.OFTReal,
        'Z': ogr.OFTReal,
    }
    for field_name in list_crea:
        field_type = typed_fields.get(field_name, ogr.OFTString)
        layer.CreateField(ogr.FieldDefn(field_name, field_type))
    return layer
def populating_layer_finds_default(layer, list_crea, list_cr):
    """
    :param layer: Finds layer from the linked geopackage with attributes
    :param list_crea: Name of the columns by default
    :param list_cr: List of all the Context_Records linked to the Finds from the query
    :function: Population of the Finds layer with every BaseFind of the listed Context_Records. The database is only
    read: a label matching no or several Context_Records is skipped with a warning, and a BaseFind that has no Find
    or that cannot be written is skipped
    :return layer: Populated Finds layer
    """
    log = logging.getLogger(__name__)
    # Preparations for the queries
    ContextRecord = apps.get_model('archaeological_context_records', 'ContextRecord')
    BaseFind = apps.get_model('archaeological_finds', 'BaseFind')
    Find = apps.get_model('archaeological_finds', 'Find')

    def safe(getter, default=''):
        """Return getter(), or default when reading the value fails."""
        try:
            return getter()
        except Exception:
            return default

    def build_point(elem):
        """Return the 3D point of a BaseFind, or None when nothing usable is stored."""
        try:
            x, y = float(elem.x), float(elem.y)
        except Exception:
            # Extreme case if no coordinates but the WKT exists
            try:
                return ogr.CreateGeometryFromWkt(str(elem.point_3d).split(';')[-1])
            except Exception:
                return None
        try:
            z = float(elem.z)
        except Exception:
            # Case if the Z coordinates doesn't exist
            z = 0.0
        point = ogr.Geometry(ogr.wkbPoint25D)
        point.AddPoint(x, y, z)
        return point

    # Looping on the different Context_Records names to add them
    for name in list_cr:
        # Read-only lookup: an export must not create Context_Records
        try:
            cr = ContextRecord.objects.get(label=name)
        except (ContextRecord.DoesNotExist, ContextRecord.MultipleObjectsReturned):
            log.warning("Context record %r not found or ambiguous: its finds are not exported", name)
            continue
        # Looping on all the BaseFind linked to the searched Context_Record
        for elem in BaseFind.objects.filter(context_record=cr.id):
            try:
                # Recuperation of the information of the corresponding Find for some attributes
                find = Find.objects.filter(base_finds=elem).first()
                if find is None:
                    continue
                try:
                    wkt = str(elem.point_3d).split(';')[-1]
                except Exception:
                    wkt = 'Point Z ({} {} {})'.format(elem.x, elem.y, elem.z)
                # Values in the same order as the default attributes
                list_attributes = [
                    safe(lambda: elem.label),
                    safe(lambda: str(elem.context_record).split(' | ')[-1]),
                    safe(lambda: elem.discovery_date),
                    safe(lambda: elem.x),
                    safe(lambda: elem.y),
                    safe(lambda: elem.z),
                    safe(lambda: find.cached_materials),
                    safe(lambda: find.cached_object_types),
                    safe(lambda: find.description),
                    safe(lambda: find.document.image),
                    safe(lambda: find.comment),
                    wkt,
                    safe(lambda: elem.parcel.external_id),
                ]
                # Creation of a new feature
                feature = ogr.Feature(layer.GetLayerDefn())
                for idx, field_name in enumerate(list_crea):
                    value = list_attributes[idx]
                    if idx == 2:
                        # Gestion of the dates
                        try:
                            # First version if it has all the data necessary for an ogr.OFTDateTime
                            feature.SetField(field_name, value)
                        except Exception:
                            # Second version if some values are missing
                            feature.SetField(field_name, int(value.year), int(value.month), int(value.day),
                                             0, 0, 0.0, 0)
                    elif idx in (3, 4, 5):
                        # Gestion of the coordinates: left unset when not numeric
                        try:
                            feature.SetField(field_name, float(value))
                        except Exception:
                            pass
                    else:
                        feature.SetField(field_name, str(value))
                point = build_point(elem)
                if point is not None:
                    feature.SetGeometry(point)
                layer.CreateFeature(feature)
                feature = None
            except Exception:
                # Best effort: one bad find must not abort the export
                log.warning("Find %r could not be exported", elem, exc_info=True)
    return layer
def attributes_creation_cr_default(layer, list_crea):
    """
    :param layer: Context_Records layer from the linked geopackage
    :param list_crea: Name of the columns by default
    :function: Creation of the default attributes of the Context_Records layer, all of them as strings
    :return layer: Context_Records layer with attributes
    """
    for field_name in list_crea:
        definition = ogr.FieldDefn(field_name, ogr.OFTString)
        layer.CreateField(definition)
    return layer
def populating_layer_cr_default(layer, list_crea, list_cr):
    """
    :param layer: Context_Records layer from the linked geopackage with attributes
    :param list_crea: Name of the columns by default
    :param list_cr: List of all the Context_Records linked to the Finds from the query
    :function: Population of the Context_Records layer using all the data from a specific query. The database is
    only read; a Context_Record that is not found, is ambiguous or cannot be written is skipped with a warning
    :return list_ope: List of all the Operations linked to the Context_Records from the query
    """
    log = logging.getLogger(__name__)
    # Query in the DataBase to get information on the Context_Records of the Finds exported
    ContextRecord = apps.get_model('archaeological_context_records', 'ContextRecord')

    def safe(getter, default=''):
        """Return getter(), or default when reading the value fails."""
        try:
            return getter()
        except Exception:
            return default

    list_ope = []
    for name in list_cr:
        try:
            # Read-only lookup: an export must not create Context_Records
            cr = ContextRecord.objects.get(label=name)
            # Values in the same order as the default attributes
            list_attributes = [
                safe(lambda: cr.label),
                safe(lambda: str(cr.operation.code_patriarche)),
                safe(lambda: cr.town.numero_insee),
                safe(lambda: str(cr.unit)),
                safe(lambda: cr.interpretation),
                safe(lambda: cr.description),
                safe(lambda: cr.location),
                safe(lambda: cr.documentations.values('label')[0]['label']),
                safe(lambda: cr.documents.image),
                safe(lambda: cr.cached_periods, 'Non-renseigné'),
                safe(lambda: cr.activity),
                safe(lambda: cr.identifications.values('label')[0]['label']),
                safe(lambda: cr.comment),
                safe(lambda: str(cr.main_geodata.multi_polygon).split(';')[-1]),
                safe(lambda: cr.parcel.external_id),
            ]
            # Creation of a new feature
            feature = ogr.Feature(layer.GetLayerDefn())
            for idx, field_name in enumerate(list_crea):
                try:
                    feature.SetField(field_name, str(list_attributes[idx]))
                except Exception:
                    pass
            # Completion of the list of Operations linked to the exported
            # Context_Records (a record whose operation cannot be read is
            # skipped, as before)
            code = cr.operation.code_patriarche
            if code not in list_ope:
                list_ope.append(code)
            # Gestion of the geometry
            try:
                geom = ogr.CreateGeometryFromWkt(str(cr.main_geodata.multi_polygon).split(';')[-1])
                feature.SetGeometry(geom)
            except Exception:
                pass
            layer.CreateFeature(feature)
            feature = None
        except Exception:
            # Best effort: one bad record must not abort the export
            log.warning("Context record %r could not be exported", name, exc_info=True)
    return list_ope
def attributes_creation_cr_query(layer, col_names, table_cols):
    """
    :param layer: Context_Records layer from the linked geopackage
    :param col_names: Name of the columns in the new layer
    :param table_cols: List of the columns used in the query
    :function: Creation of one string attribute per exported column of the Context_Records layer, skipping the
    positions where the queried column is ''
    :return layer: Layer with attributes
    """
    for position, field_name in enumerate(col_names):
        if table_cols[position] == '':
            continue
        layer.CreateField(ogr.FieldDefn(field_name, ogr.OFTString))
    return layer
def populating_layer_cr_query(layer, table_cols, col_names, datas):
    """
    :param layer: Context_Records layer from the linked geopackage with attributes
    :param table_cols: List of the columns used in the query
    :param col_names: Name of the columns in the new layer
    :param datas: Data from the query
    :function: Population of the Context_Records layer using all the data from the query. The geometry is read from
    the Context_Record having the same label; the database is only read (a label that matches no or several
    Context_Records gives a feature without geometry)
    :return list_ope: List of all the Operations linked to the Context_Records from the query
    :return list_cr: List of the labels of the Context_Records from the query (one entry per data row)
    """
    ContextRecord = apps.get_model('archaeological_context_records', 'ContextRecord')
    list_ope = []
    list_cr = []
    nb_cols = len(col_names)
    # Looping on all the datas extracted to create features
    for data in datas:
        feature = ogr.Feature(layer.GetLayerDefn())
        for idx in range(nb_cols):
            col_name = col_names[idx]
            if col_name == '':
                continue
            # +1 because the first value in the attributes is ''
            value = data[idx + 1]
            # Completing the list of the Operations
            if 'operation__code_patriarche' in table_cols[idx] and value not in list_ope:
                list_ope.append(value)
            feature.SetField(col_name, str(value))
        # Getting the name of the Context_Records
        try:
            id_label = table_cols.index(['label'])
        except ValueError:
            # The Finds variant looks the column up as a plain string: accept
            # that form too
            id_label = table_cols.index('label')
        name = data[id_label + 1]
        list_cr.append(name)
        # Searching for the element itself, especially if the geometry was not
        # exported. Read-only lookup: an export must not create Context_Records.
        try:
            cr = ContextRecord.objects.get(label=name)
        except (ContextRecord.DoesNotExist, ContextRecord.MultipleObjectsReturned):
            cr = None
        # Gestion of the geometry
        if cr is not None:
            try:
                geom = ogr.CreateGeometryFromWkt(str(cr.main_geodata.multi_polygon).split(';')[-1])
                feature.SetGeometry(geom)
            except Exception:
                pass
        layer.CreateFeature(feature)
        feature = None
    return list_ope, list_cr
def attributes_creation_sites_query(layer, col_names, table_cols):
    """
    :param layer: Sites layer from the linked geopackage
    :param col_names: Name of the columns in the new layer
    :param table_cols: List of the columns used in the query
    :function: Creation of one attribute per exported column of the Sites layer: real for 'geodata__x' and
    'geodata__y', string otherwise; positions where the queried column is '' are skipped
    :return layer: Layer with attributes
    """
    # NOTE(review): columns are compared to plain strings here while
    # populating_layer_sites_query reads them as one-item lists - confirm
    # which form table_cols really has.
    real_columns = ['geodata__x', 'geodata__y']
    for position, field_name in enumerate(col_names):
        column = table_cols[position]
        if column == '':
            continue
        # Gestion of the attribute's type
        field_type = ogr.OFTReal if column in real_columns else ogr.OFTString
        layer.CreateField(ogr.FieldDefn(field_name, field_type))
    return layer
def populating_layer_sites_query(layer, table_cols, col_names, datas):
    """
    :param layer: Sites layer from the linked geopackage with attributes
    :param table_cols: List of the columns used in the query (each column is read as a one-item list)
    :param col_names: Name of the columns in the new layer
    :param datas: Data from the query
    :function: Population of the Sites layer using all the data from the query. The database is only read: an
    Operation code that matches no or several Operations is skipped with a warning when the Context_Records are
    searched
    :return list_ope: List of all the Operations linked to the Sites from the query
    :return list_cr: List of all the Context_Records linked to these Operations
    """
    log = logging.getLogger(__name__)
    ArchaeologicalSite = apps.get_model("archaeological_operations", "ArchaeologicalSite")
    Operation = apps.get_model("archaeological_operations", "Operation")
    ContextRecord = apps.get_model('archaeological_context_records', 'ContextRecord')

    def build_point(site):
        """Return the 2D point of a Site, or None when nothing usable is stored."""
        try:
            x, y = float(site.main_geodata.x), float(site.main_geodata.y)
        except Exception:
            # Extreme case if no coordinates but the WKT exists
            try:
                return ogr.CreateGeometryFromWkt(str(site.main_geodata.point_2d).split(';')[-1])
            except Exception:
                return None
        point = ogr.Geometry(ogr.wkbPoint)
        point.AddPoint(x, y)
        return point

    nb_cols = len(col_names)
    list_ope = []
    # Looping on all the datas extracted to create features
    for data in datas:
        feature = ogr.Feature(layer.GetLayerDefn())
        # Looping on the attributes to add them to the feature
        for idx in range(nb_cols):
            col_name = col_names[idx]
            if col_name == '':
                continue
            column = table_cols[idx]
            # +1 because the first value in the attributes is ''
            value = data[idx + 1]
            if column == ['operations__code_patriarche']:
                # Completing the list of the Operations
                if value not in list_ope:
                    list_ope.append(value)
                text = str(value)
            elif any(marker in column[0] for marker in ('_x', '_y')):
                # Gestion of the information linked to the geometry
                text = str(value).split(' & ')[-1]
            elif '_point_2d' in column[0]:
                text = str(value).split(';')[-1]
            else:
                text = str(value)
            feature.SetField(col_name, text)
        # Recuperation of the information of the searched Sites
        try:
            # First version with the name of the Site
            id_label = table_cols.index(['name'])
            label = data[id_label + 1]
            site = ArchaeologicalSite.objects.filter(name=label)[0]
        except Exception:
            # Second version with the reference of the Site
            id_label = table_cols.index(['reference'])
            label = data[id_label + 1]
            site = ArchaeologicalSite.objects.filter(reference=label)[0]
        # Completion of the list of Operations
        # NOTE(review): the code is parsed out of the text representation of
        # the list of Operations (unchanged from the original): only the last
        # one is kept and a Site without Operation yields '[]' - confirm
        # against Operation.__str__.
        try:
            ope = str(list(site.operations.iterator())).split(' | OA')[-1].replace('>]', '')
            if ope not in list_ope:
                list_ope.append(ope)
        except Exception:
            pass
        point = build_point(site)
        if point is not None:
            feature.SetGeometry(point)
        # Addition of the new feature
        layer.CreateFeature(feature)
        feature = None
    # Completion of the list of Context_Records linked to the extracted Sites
    list_cr = []
    for name in list_ope:
        # Read-only lookup: an export must not create Operations
        try:
            ope = Operation.objects.get(code_patriarche=name)
        except (Operation.DoesNotExist, Operation.MultipleObjectsReturned):
            log.warning("Operation %r not found or ambiguous: its context records are ignored", name)
            continue
        for cr in ContextRecord.objects.filter(operation=ope):
            if cr.label not in list_cr:
                list_cr.append(cr.label)
    return list_ope, list_cr
def attributes_creation_sites_default(layer, list_crea):
    """
    :param layer: Sites layer from the linked geopackage
    :param list_crea: Name of the columns by default
    :function: Creation of the default attributes of the Sites layer: 'X' and 'Y' are reals, everything else is a
    string
    :return layer: Sites layer with attributes
    """
    coordinate_fields = ['X', 'Y']
    for field_name in list_crea:
        # Gestion of specific types of attributes for the default values
        field_type = ogr.OFTReal if field_name in coordinate_fields else ogr.OFTString
        layer.CreateField(ogr.FieldDefn(field_name, field_type))
    return layer
def populating_layer_sites_default(layer, list_crea, table_cols, datas):
    """
    :param layer: Sites layer from the linked geopackage with attributes
    :param list_crea: Name of the columns by default
    :param table_cols: List of the columns used in the query (each column is read as a one-item list)
    :param datas: Data from the query
    :function: Population of the Sites layer with every Site of the Operations found in the query. The database is
    only read: an Operation code that matches no or several Operations is skipped with a warning
    :return list_ope: List of all the Operations codes found in the query
    :return list_cr: List of all the Context_Records linked to these Operations
    """
    log = logging.getLogger(__name__)
    Operation = apps.get_model("archaeological_operations", "Operation")
    ArchaeologicalSite = apps.get_model("archaeological_operations", "ArchaeologicalSite")
    ContextRecord = apps.get_model('archaeological_context_records', 'ContextRecord')

    def safe(getter, default=''):
        """Return getter(), or default when reading the value fails."""
        try:
            return getter()
        except Exception:
            return default

    def site_wkt(site):
        """Return the WKT of the Site point, rebuilt from X and Y when the point is missing."""
        if str(site.point) == 'None':
            return 'Point ({} {})'.format(site.main_geodata.x, site.main_geodata.y)
        return str(site.point).split(';')[-1]

    def build_point(site):
        """Return the point of a Site from its WKT, else from X and Y, else None."""
        try:
            point = ogr.CreateGeometryFromWkt(str(site.main_geodata.point).split(';')[-1])
        except Exception:
            point = None
        if point is None:
            # Also reached when the WKT parser returns None instead of
            # raising, which previously skipped the coordinates fallback
            try:
                x, y = float(site.main_geodata.x), float(site.main_geodata.y)
            except Exception:
                return None
            point = ogr.Geometry(ogr.wkbPoint)
            point.AddPoint(x, y)
        return point

    list_ope = []
    list_cr = []
    # Looping on all the datas to get the code_patriarche of the Operations
    for data in datas:
        for idx, col in enumerate(table_cols):
            if 'code_patriarche' in col[0]:
                # +1 because the first value in the attributes is ''
                list_ope.append(data[idx + 1])
    for name in list_ope:
        # Read-only lookup: an export must not create Operations
        try:
            ope = Operation.objects.get(code_patriarche=name)
        except (Operation.DoesNotExist, Operation.MultipleObjectsReturned):
            log.warning("Operation %r not found or ambiguous: its sites are not exported", name)
            continue
        # Recuperation of the information of the searched Sites
        for site in ArchaeologicalSite.objects.filter(operations=ope.id):
            # Values in the same order as the default attributes
            list_attributes = [
                safe(lambda: site.reference),
                safe(lambda: site.name),
                name,
                safe(lambda: str(site.cached_towns_label).split('(')[-1].replace(')', '')),
                safe(lambda: site.cached_periods),
                safe(lambda: site.cached_remains),
                safe(lambda: site.main_geodata.x),
                safe(lambda: site.main_geodata.y),
                safe(lambda: site.comment),
                # Guarded like the other attributes so that one Site without
                # point nor geodata does not abort the export
                safe(lambda: site_wkt(site)),
            ]
            # Creation of a new feature
            feature = ogr.Feature(layer.GetLayerDefn())
            for idx, value in enumerate(list_attributes):
                if idx in (6, 7):
                    # Gestion of the coordinates: left unset when not numeric
                    try:
                        feature.SetField(list_crea[idx], float(value))
                    except Exception:
                        pass
                else:
                    feature.SetField(list_crea[idx], str(value))
            point = build_point(site)
            if point is not None:
                feature.SetGeometry(point)
            layer.CreateFeature(feature)
            feature = None
        # Completion of the list of Context_Records linked to the Operation
        for cr in ContextRecord.objects.filter(operation=ope):
            if cr.label not in list_cr:
                list_cr.append(cr.label)
    return list_ope, list_cr
def format_identification(default_value, text):
    """
    :param default_value: Name of the default value of the targeted list
    :param text: Text version of the .qgs
    :function: Search the text for the value/name/type parameters of the default entry, in any order
    :return old_text: Entry to replace when one ordering is found in the text, or 'Fail' if none is found
    """
    # All parameters of an entry in a list for a QGIS attribute
    bricks = (
        'value="{}"'.format(default_value),
        'name="{}"'.format(default_value),
        'type="QString"',
    )
    # Trying every ordering of the parameters to be sure to find it in the .qgs
    for ordering in permutations(bricks, 3):
        candidate = ' '.join(ordering)
        if candidate in text:
            # NOTE(review): this template has no '{}' placeholder, so the
            # matched parameters are not part of the returned text - it looks
            # like the surrounding markup was lost; confirm against the .qgs.
            return ' \n \n'.format(candidate)
    return 'Fail'
def list_recuperation(col, old_name, default_value, base, model, table_cols, col_names, text):
    """
    :param col: Column that will have a list
    :param old_name: Default_name of the attribute in the .qgs
    :param default_value: Name of the default value of the targeted list
    :param base: Name of the database (Django application label) to query
    :param model: Name of the specific model to query
    :param table_cols: List of the columns used in the query
    :param col_names: Name of the columns in the new layer
    :param text: Text version of the .qgs
    :function: Rename the attribute in the .qgs, then swap its default list entry for one entry per object
        of the queried model
    :return text: Modified QGIS project
    """
    # The attribute takes the name found in col_names at the position of the column in table_cols
    layer_name = col_names[table_cols.index(col)]
    text = text.replace(old_name, layer_name)
    # Objects of the model (default manager) supplying the possible values
    entries = apps.get_model(base, model).objects.all()
    # Entry of the .qgs to swap; left untouched when it cannot be located
    placeholder = format_identification(default_value, text)
    if placeholder == 'Fail':
        return text
    # NOTE(review): this template holds no "{}" placeholder, so the objects are not written in the
    # entries - confirm the markup of the entry has not been lost
    options = ''.join(' \n \n'.format(entry, entry) for entry in entries)
    return text.replace(placeholder, options)
def modification_style(qgs_path, table_cols, col_names, list_ope, list_crea):
    """
    :param qgs_path: Path to the QGIS project, containing the layers style
    :param table_cols: List of the columns used in the query to spot specific ones
    :param col_names: Name of the columns in the new layer to add their name to the style of the layer
    :param list_ope: List of the Operations linked to the entities from the query, to add them as a list
    :param list_crea: Created attributes for the Finds or Context_Records layers: list_crea[0] is compared
        to 'finds' / 'cr' to select the treatment, list_crea[1] holds the names replacing the default ones
    :function: Modification of the QGIS project style to assure the autocompletion/automations for some attributes
    :return text: Modified QGIS project
    """
    # Default names in the style, paired by position with the fragment searched in the queried columns
    list_ref = ['finds_date', 'finds_time', 'finds_x', 'finds_y', 'finds_z', 'finds_cr', 'finds_parcel',
                'cr_operation', 'cr_insee', 'cr_section', 'cr_parcel', 'cr_full_parcel', 'cr_wkt']
    list_search = ['_date', '_datetime', '__x', '__y', '__z', 'context_record__label', 'parcel__external_id',
                   'operation__code_patriarche', 'town__numero_insee', 'parcel__section', 'parcel__parcel_number',
                   'parcel__external_id', 'geodata__multi_polygon']
    # Opening of the style; the context manager closes the file once read
    with open(qgs_path, encoding='utf-8') as qgs_file:
        text = qgs_file.read()
    # Adding the different Operations linked to the Context Records and/or Finds exported to a list of
    # possible values
    old_text = format_identification("Default_value", text)
    if list_ope:
        # Same guard as list_recuperation: when the entry is not found, format_identification returns
        # 'Fail' and replacing it would alter every literal "Fail" of the project
        if old_text != 'Fail':
            # NOTE(review): this template holds no "{}" placeholder, so the operations are not written
            # in the entries - confirm the markup of the entry has not been lost
            new_text = ''.join(' \n \n'.format(ope, ope) for ope in list_ope)
            text = text.replace(old_text, new_text)
    else:
        text = text.replace("Test_choice", "Null")
    # Verification of all the possible attributes with lists, and application of modification if present
    # (column as stored in table_cols, default name, default value, application, model)
    queried_lists = [
        ('material_types', 'finds_material', 'Default_material', "archaeological_finds", "MaterialType"),
        ('conservatory_states', 'finds_conservation', 'Default_conserv', "archaeological_finds",
         "ConservatoryState"),
        ('object_types', 'finds_object', 'Default_object', "archaeological_finds", "ObjectType"),
        (['unit'], 'cr_type', 'Default_unit', "archaeological_context_records", "Unit"),
        (['activity'], 'cr_activity', 'Default_activity', "archaeological_context_records", "ActivityType"),
        (['identifications'], 'cr_identif', 'Default_identif', "archaeological_context_records",
         "IdentificationType"),
        (['documentations'], 'cr_doc', 'Default_doc', "archaeological_context_records", "DocumentationType"),
        (['datings__period'], 'cr_periode', 'Default_periode', "archaeological_operations", "Period"),
    ]
    for column, default_name, default_value, base, model_name in queried_lists:
        if column in table_cols:
            text = list_recuperation(column, default_name, default_value, base, model_name,
                                     table_cols, col_names, text)
    if list_crea[0] == 'finds':
        # Per the original comment: specific modifications if the data don't come from Finds
        for ref, search in zip(list_ref, list_search):
            for col in table_cols:
                # Here the searched fragment is tested against the first item of the column
                if col != '' and search in col[0]:
                    text = text.replace(ref, col_names[table_cols.index(col)])
        # Default names in the style, paired by position with the names of list_crea[1]
        list_corr = ['finds_id', 'finds_cr', 'finds_date', 'finds_x', 'finds_y', 'finds_z', 'finds_material',
                     'finds_object', 'finds_description', 'finds_media', 'finds_comment', 'finds_wkt_modif',
                     'finds_parcel']
        # Gestion of the link between the Finds and Context Records layers
        id_label = table_cols.index(['label'])
        text = text.replace("cr_name", col_names[id_label])
        if ['documents__image'] in table_cols:
            id_media = table_cols.index(['documents__image'])
            text = text.replace("cr_media", col_names[id_media])
        # Modification of the default attributes with lists
        created_lists = [
            ('finds_material', 'Default_material', "archaeological_finds", "MaterialType"),
            ('finds_object', 'Default_object', "archaeological_finds", "ObjectType"),
        ]
        for default_name, default_value, base, model_name in created_lists:
            text = list_recuperation(default_name, default_name, default_value, base, model_name,
                                     list_corr, list_crea[1], text)
        # Replacement of the default names used for the Finds by the created ones
        for position, created_name in enumerate(list_crea[1]):
            text = text.replace(list_corr[position], created_name)
    elif list_crea[0] == 'cr':
        # Per the original comment: specific modifications if the data don't come from Context_Records
        for ref, search in zip(list_ref, list_search):
            for col in table_cols:
                if col != '' and search in col:
                    text = text.replace(ref, col_names[table_cols.index(col)])
        # Default names in the style, paired by position with the names of list_crea[1]
        list_corr = ['cr_name', 'cr_operation', 'cr_insee', 'cr_type', 'cr_occupation', 'cr_description',
                     'cr_localisation', 'cr_doc', 'cr_media', 'cr_periode', 'cr_activity', 'cr_identif',
                     'cr_comment', 'cr_wkt', 'cr_full_parcel']
        # The column may be stored either as a one-item list or as a plain string
        try:
            id_label = table_cols.index(['label'])
        except ValueError:
            id_label = table_cols.index('label')
        # Gestion of the link between the Finds and Context Records layers
        text = text.replace('finds_id', col_names[id_label])
        if 'documents__image' in table_cols:
            try:
                id_media = table_cols.index(['documents__image'])
            except ValueError:
                id_media = table_cols.index('documents__image')
            text = text.replace("finds_media", col_names[id_media])
        # Modification of the default attributes with lists
        created_lists = [
            ('cr_type', 'Default_unit', "archaeological_context_records", "Unit"),
            ('cr_activity', 'Default_activity', "archaeological_context_records", "ActivityType"),
            ('cr_periode', 'Default_periode', "archaeological_operations", "Period"),
            ('cr_identif', 'Default_identif', "archaeological_context_records", "IdentificationType"),
            ('cr_doc', 'Default_doc', "archaeological_context_records", "DocumentationType"),
        ]
        for default_name, default_value, base, model_name in created_lists:
            text = list_recuperation(default_name, default_name, default_value, base, model_name,
                                     list_corr, list_crea[1], text)
        # Specific case to assure the good registration of the z coordinate
        if any('__point_3d' in elem for elem in table_cols):
            id_new = [i for i, element in enumerate(table_cols) if 'geodata__point_3d' in element][0]
            # The default name to replace depends on the presence of a z column in the query
            if any('__z' in elem for elem in table_cols):
                ref = "finds_wkt_modif"
            else:
                ref = "finds_wkt_simple"
            text = text.replace(ref, col_names[id_new])
        # Replacement of the default names used for the Context Records by the created ones
        for position, created_name in enumerate(list_crea[1]):
            text = text.replace(list_corr[position], created_name)
    return text
def modification_style_sites(qgs_path, table_cols, col_names, list_ope, list_crea):
    """
    :param qgs_path: Path to the QGIS project, containing the layers style
    :param table_cols: List of the columns used in the query to spot specific ones
    :param col_names: Name of the columns in the new layer to add their name to the style of the layer
    :param list_ope: List of the Operations linked to the entities from the query, to add them as a list
    :param list_crea: List of created attributes for the Finds and Context_Records layers; when its first
        name is 'Référence' the names of the Sites attributes come first
    :function: Modification of the QGIS project style to assure the autocompletion/automations for some attributes
    :return text: Modified QGIS project
    """
    # Default names in the style, paired by position with the fragment searched in the queried columns
    list_ref = ['sites_operation', 'sites_parcel', 'sites_insee', 'sites_x', 'sites_y', 'sites_wkt']
    list_search = ['operations__code_patriarche', 'parcel__external_id', 'towns__numero_insee', 'geodata__x',
                   'geodata__y', 'geodata__point_2d']
    # Opening of the style; the context manager closes the file once read
    with open(qgs_path, encoding='utf-8') as qgs_file:
        text = qgs_file.read()
    # Adding the different Operations linked to the Context Records and/or Finds exported to a list of
    # possible values
    old_text = format_identification("Default_value", text)
    if list_ope:
        # Same guard as list_recuperation: when the entry is not found, format_identification returns
        # 'Fail' and replacing it would alter every literal "Fail" of the project
        if old_text != 'Fail':
            # NOTE(review): this template holds no "{}" placeholder, so the operations are not written
            # in the entries - confirm the markup of the entry has not been lost
            new_text = ''.join(' \n \n'.format(ope, ope) for ope in list_ope)
            text = text.replace(old_text, new_text)
    else:
        text = text.replace("Test_choice", "Default")
    # Verification of all the possible attributes with lists, and application of modification if present
    if ['periods'] in table_cols:
        text = list_recuperation(['periods'], 'sites_periodes', 'Default_speriode',
                                 "archaeological_operations", "Period", table_cols, col_names, text)
    if ['remains'] in table_cols:
        text = list_recuperation(['remains'], 'sites_type', 'Default_rtype',
                                 "archaeological_operations", "RemainType", table_cols, col_names, text)
    # Only loop and modify the name of attributes if the query was done on the Sites, not the Operations
    # If from Operations, the first name of default columns is the Reference of the Site
    from_operations = list_crea[0] == 'Référence'
    if not from_operations:
        for ref, search in zip(list_ref, list_search):
            for col in table_cols:
                if col != '' and search in col:
                    text = text.replace(ref, col_names[table_cols.index(col)])
    # Default names of the common attributes to modify, paired by position with list_crea
    # NOTE(review): 'find_comment' differs from the 'finds_*' pattern of its neighbours - confirm against
    # the default names of the .qgs before changing it
    list_ref = ['cr_name', 'cr_operation', 'cr_insee', 'cr_type', 'cr_occupation', 'cr_description',
                'cr_localisation', 'cr_doc', 'cr_media', 'cr_periode', 'cr_activity', 'cr_identif', 'cr_comment',
                'cr_wkt', 'cr_full_parcel', 'finds_id', 'finds_cr', 'finds_date', 'finds_x', 'finds_y', 'finds_z',
                'finds_material', 'finds_object', 'finds_description', 'finds_media', 'find_comment',
                'finds_wkt_modif', 'finds_parcel']
    # Modification of the default attributes with lists
    if from_operations:
        # Adding the names of the specific attributes for the sites
        list_ref = ['sites_reference', 'sites_name', 'sites_operation', 'sites_insee', 'sites_periodes',
                    'sites_type', 'sites_x', 'sites_y', 'sites_comment', 'sites_wkt', 'sites_parcel'] + list_ref
        text = list_recuperation('sites_periodes', 'sites_periodes', 'Default_speriode',
                                 "archaeological_operations", "Period", list_ref, list_crea, text)
        text = list_recuperation('sites_type', 'sites_type', 'Default_rtype',
                                 "archaeological_operations", "RemainType", list_ref, list_crea, text)
    # Lists of the Finds and Context Records attributes (default name, default value, application, model)
    created_lists = [
        ('finds_material', 'Default_material', "archaeological_finds", "MaterialType"),
        ('finds_object', 'Default_object', "archaeological_finds", "ObjectType"),
        ('cr_type', 'Default_unit', "archaeological_context_records", "Unit"),
        ('cr_activity', 'Default_activity', "archaeological_context_records", "ActivityType"),
        ('cr_identif', 'Default_identif', "archaeological_context_records", "IdentificationType"),
        ('cr_doc', 'Default_doc', "archaeological_context_records", "DocumentationType"),
        ('cr_periode', 'Default_periode', "archaeological_operations", "Period"),
    ]
    for default_name, default_value, base, model_name in created_lists:
        text = list_recuperation(default_name, default_name, default_value, base, model_name,
                                 list_ref, list_crea, text)
    # Modification of the other attributes names
    for position, created_name in enumerate(list_crea):
        text = text.replace(list_ref[position], created_name)
    return text