Source code for pyarchinit_mini.services.import_export_service

"""
PyArchInit Import/Export Service

Handles data import and export between PyArchInit (full version)
and PyArchInit-Mini databases.

Supports:
- Site data
- US (Stratigraphic Units) data with relationship mapping
- Inventario Materiali data
- Periodizzazione data
- Thesaurus data

Database support: SQLite and PostgreSQL (both source and target)
"""

import ast
import json
import shutil
import os
from datetime import datetime, date
from typing import Dict, List, Optional, Tuple, Any
import logging
from sqlalchemy import create_engine, text, inspect
from sqlalchemy.orm import sessionmaker, Session
from sqlalchemy.exc import SQLAlchemyError

# Setup logging
# NOTE(review): basicConfig() runs as an import-time side effect and attaches a
# handler to the root logger when none is configured yet; applications that
# embed this module may prefer to configure logging themselves.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


[docs] class ImportExportService: """Service for importing and exporting data between PyArchInit and PyArchInit-Mini""" def __init__(self, mini_db_connection: str, source_db_connection: Optional[str] = None): """ Initialize ImportExport service Args: mini_db_connection: Connection string for PyArchInit-Mini database source_db_connection: Connection string for source PyArchInit database (for import) """ self.mini_engine = create_engine(mini_db_connection) self.mini_session_maker = sessionmaker(bind=self.mini_engine) self.source_engine = None self.source_session_maker = None self._backup_created = False # Track if backup was already created for this session self._backup_path = None # Store backup path if source_db_connection: self.source_engine = create_engine(source_db_connection) self.source_session_maker = sessionmaker(bind=self.source_engine) def set_source_database(self, source_db_connection: str): """Set or change the source database connection""" self.source_engine = create_engine(source_db_connection) self.source_session_maker = sessionmaker(bind=self.source_engine) def _backup_source_database(self) -> Optional[str]: """ Create a backup of the source database before migration For SQLite: Copies the database file with timestamp For PostgreSQL: Uses pg_dump to create SQL backup Returns: Path to backup file, or None if backup failed """ if not self.source_engine: logger.warning("No source database configured, skipping backup") return None connection_string = str(self.source_engine.url) # SQLite backup if connection_string.startswith('sqlite:///'): # Extract file path from connection string # Format: sqlite:///path/to/file.db or sqlite:////absolute/path/to/file.db db_path = connection_string.replace('sqlite:///', '') # Handle absolute paths (start with /) if not db_path.startswith('/'): db_path = '/' + db_path if not os.path.exists(db_path): logger.error(f"Source database file not found: {db_path}") return None # Create backup with timestamp timestamp = 
datetime.now().strftime('%Y%m%d_%H%M%S') backup_path = f"{db_path}.backup_{timestamp}" try: shutil.copy2(db_path, backup_path) file_size = os.path.getsize(backup_path) / (1024 * 1024) # MB logger.info(f"✓ Database backup created: {backup_path} ({file_size:.2f} MB)") return backup_path except Exception as e: logger.error(f"Failed to create backup: {e}") return None # PostgreSQL backup elif connection_string.startswith('postgresql'): try: import subprocess # Extract connection details url = self.source_engine.url host = url.host or 'localhost' port = url.port or 5432 database = url.database user = url.username password = url.password # Create backup file path timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') backup_path = f"{database}_backup_{timestamp}.sql" # Set password environment variable env = os.environ.copy() if password: env['PGPASSWORD'] = password # Run pg_dump cmd = [ 'pg_dump', '-h', host, '-p', str(port), '-U', user, '-F', 'p', # Plain SQL format '-f', backup_path, database ] result = subprocess.run(cmd, env=env, capture_output=True, text=True) if result.returncode == 0: file_size = os.path.getsize(backup_path) / (1024 * 1024) # MB logger.info(f"✓ Database backup created: {backup_path} ({file_size:.2f} MB)") return backup_path else: logger.error(f"pg_dump failed: {result.stderr}") return None except Exception as e: logger.error(f"Failed to create PostgreSQL backup: {e}") return None else: logger.warning(f"Unsupported database type for backup: {connection_string}") return None def _check_i18n_columns_exist(self, table_name: str) -> Dict[str, bool]: """ Check which i18n columns exist in the source database table Args: table_name: Name of the table to check Returns: Dictionary with column_name: exists mapping """ if not self.source_engine: raise ValueError("Source database not configured") inspector = inspect(self.source_engine) columns = [col['name'] for col in inspector.get_columns(table_name)] # Define i18n columns for each table i18n_columns = { 
'site_table': [ 'definizione_sito_en', 'descrizione_en' ], 'us_table': [ 'd_stratigrafica_en', 'd_interpretativa_en', 'descrizione_en', 'interpretazione_en', 'formazione_en', 'stato_di_conservazione_en', 'colore_en', 'consistenza_en', 'struttura_en', 'inclusi_en', 'campioni_en', 'documentazione_en', 'osservazioni_en' ], 'inventario_materiali_table': [ 'tipo_reperto_en', 'definizione_reperto_en', 'descrizione_en', 'tecnologia_en', 'forma_en', 'stato_conservazione_en', 'osservazioni_en' ] } result = {} for col in i18n_columns.get(table_name, []): result[col] = col in columns return result def _add_missing_i18n_columns(self, table_name: str) -> Dict[str, Any]: """ Add missing i18n (_en) columns to source database table Args: table_name: Name of the table to migrate Returns: Dictionary with migration statistics """ if not self.source_engine: raise ValueError("Source database not configured") stats = {'columns_added': 0, 'columns_skipped': 0, 'errors': []} # Check which columns are missing missing_columns = {k: v for k, v in self._check_i18n_columns_exist(table_name).items() if not v} if not missing_columns: logger.info(f"Table {table_name} already has all i18n columns") return stats logger.info(f"Adding {len(missing_columns)} missing i18n columns to {table_name}: {list(missing_columns.keys())}") with self.source_engine.begin() as conn: for col_name in missing_columns.keys(): try: # Add TEXT column with NULL default sql = text(f"ALTER TABLE {table_name} ADD COLUMN {col_name} TEXT") conn.execute(sql) stats['columns_added'] += 1 logger.info(f"Added column {col_name} to {table_name}") except Exception as e: error_msg = f"Failed to add column {col_name} to {table_name}: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) stats['columns_skipped'] += 1 return stats def migrate_source_database(self, tables: Optional[List[str]] = None, auto_backup: bool = True) -> Dict[str, Any]: """ Migrate source PyArchInit database to add i18n columns This is called 
automatically before import to ensure compatibility. Args: tables: List of tables to migrate (None = all tables) auto_backup: If True, create automatic backup before migration Returns: Dictionary with migration statistics including backup_path """ if not self.source_engine: raise ValueError("Source database not configured") all_tables = ['site_table', 'us_table', 'inventario_materiali_table'] tables_to_migrate = tables if tables else all_tables total_stats = { 'tables_migrated': 0, 'columns_added': 0, 'errors': [], 'backup_path': None } # Create backup before migration if requested (only once per session) if auto_backup and not self._backup_created: logger.info("Creating database backup before migration...") backup_path = self._backup_source_database() self._backup_path = backup_path self._backup_created = True if backup_path: logger.info(f"✓ Backup created successfully: {backup_path}") else: logger.warning("⚠ Backup failed, but continuing with migration...") total_stats['backup_path'] = self._backup_path for table in tables_to_migrate: try: logger.info(f"Migrating table: {table}") stats = self._add_missing_i18n_columns(table) total_stats['columns_added'] += stats['columns_added'] total_stats['errors'].extend(stats['errors']) if stats['columns_added'] > 0: total_stats['tables_migrated'] += 1 except Exception as e: error_msg = f"Failed to migrate table {table}: {str(e)}" logger.error(error_msg) total_stats['errors'].append(error_msg) logger.info(f"Migration complete: {total_stats['tables_migrated']} tables migrated, {total_stats['columns_added']} columns added") return total_stats # ============================================================================ # SITE TABLE IMPORT/EXPORT # ============================================================================ def import_sites(self, sito_filter: Optional[List[str]] = None, auto_migrate: bool = True, auto_backup: bool = True) -> Dict[str, Any]: """ Import sites from PyArchInit to PyArchInit-Mini Args: sito_filter: 
List of site names to import (None = import all) auto_migrate: If True, automatically add missing i18n columns to source database auto_backup: If True, create backup before database migration Returns: Dictionary with import statistics including backup_path """ if not self.source_engine: raise ValueError("Source database not configured") # Auto-migrate source database to add i18n columns if needed if auto_migrate: logger.info("Checking source database for missing i18n columns...") migration_stats = self.migrate_source_database(tables=['site_table'], auto_backup=auto_backup) if migration_stats['columns_added'] > 0: logger.info(f"Added {migration_stats['columns_added']} i18n columns to source database") if migration_stats.get('backup_path'): logger.info(f"Database backup: {migration_stats['backup_path']}") stats = {'imported': 0, 'updated': 0, 'skipped': 0, 'errors': []} source_session = self.source_session_maker() mini_session = self.mini_session_maker() try: # Query sites from PyArchInit query = "SELECT * FROM site_table" if sito_filter: placeholders = ','.join([f"'{s}'" for s in sito_filter]) query += f" WHERE sito IN ({placeholders})" result = source_session.execute(text(query)) source_sites = result.fetchall() for site_row in source_sites: try: site_data = dict(site_row._mapping) # Check if site already exists existing = mini_session.execute( text("SELECT id_sito FROM site_table WHERE sito = :sito"), {'sito': site_data['sito']} ).fetchone() if existing: # Update existing site update_query = text(""" UPDATE site_table SET nazione = :nazione, regione = :regione, comune = :comune, provincia = :provincia, definizione_sito = :definizione_sito, descrizione = :descrizione, sito_path = :sito_path, find_check = :find_check, updated_at = :updated_at WHERE sito = :sito """) mini_session.execute(update_query, { 'sito': site_data['sito'], 'nazione': site_data.get('nazione'), 'regione': site_data.get('regione'), 'comune': site_data.get('comune'), 'provincia': 
site_data.get('provincia'), 'definizione_sito': site_data.get('definizione_sito'), 'descrizione': site_data.get('descrizione'), 'sito_path': site_data.get('sito_path'), 'find_check': site_data.get('find_check', 0), 'updated_at': datetime.now() }) stats['updated'] += 1 else: # Insert new site insert_query = text(""" INSERT INTO site_table (sito, nazione, regione, comune, provincia, definizione_sito, descrizione, sito_path, find_check, created_at, updated_at) VALUES (:sito, :nazione, :regione, :comune, :provincia, :definizione_sito, :descrizione, :sito_path, :find_check, :created_at, :updated_at) """) mini_session.execute(insert_query, { 'sito': site_data['sito'], 'nazione': site_data.get('nazione'), 'regione': site_data.get('regione'), 'comune': site_data.get('comune'), 'provincia': site_data.get('provincia'), 'definizione_sito': site_data.get('definizione_sito'), 'descrizione': site_data.get('descrizione'), 'sito_path': site_data.get('sito_path'), 'find_check': site_data.get('find_check', 0), 'created_at': datetime.now(), 'updated_at': datetime.now() }) stats['imported'] += 1 mini_session.commit() except Exception as e: mini_session.rollback() error_msg = f"Error importing site {site_data.get('sito', 'unknown')}: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) stats['skipped'] += 1 return stats except Exception as e: logger.error(f"Import sites failed: {str(e)}") raise finally: source_session.close() mini_session.close() def export_sites(self, target_db_connection: str, sito_filter: Optional[List[str]] = None) -> Dict[str, Any]: """ Export sites from PyArchInit-Mini to PyArchInit Args: target_db_connection: Connection string for target PyArchInit database sito_filter: List of site names to export (None = export all) Returns: Dictionary with export statistics """ stats = {'exported': 0, 'updated': 0, 'skipped': 0, 'errors': []} target_engine = create_engine(target_db_connection) target_session = sessionmaker(bind=target_engine)() mini_session = 
self.mini_session_maker() try: # Query sites from PyArchInit-Mini query = "SELECT * FROM site_table" if sito_filter: placeholders = ','.join([f"'{s}'" for s in sito_filter]) query += f" WHERE sito IN ({placeholders})" result = mini_session.execute(text(query)) mini_sites = result.fetchall() for site_row in mini_sites: try: site_data = dict(site_row._mapping) # Check if site exists in target existing = target_session.execute( text("SELECT id_sito FROM site_table WHERE sito = :sito"), {'sito': site_data['sito']} ).fetchone() if existing: # Update existing update_query = text(""" UPDATE site_table SET nazione = :nazione, regione = :regione, comune = :comune, provincia = :provincia, definizione_sito = :definizione_sito, descrizione = :descrizione, sito_path = :sito_path, find_check = :find_check WHERE sito = :sito """) target_session.execute(update_query, { 'sito': site_data['sito'], 'nazione': site_data.get('nazione'), 'regione': site_data.get('regione'), 'comune': site_data.get('comune'), 'provincia': site_data.get('provincia'), 'definizione_sito': site_data.get('definizione_sito'), 'descrizione': site_data.get('descrizione'), 'sito_path': site_data.get('sito_path'), 'find_check': site_data.get('find_check', 0) }) stats['updated'] += 1 else: # Insert new insert_query = text(""" INSERT INTO site_table (sito, nazione, regione, comune, provincia, definizione_sito, descrizione, sito_path, find_check) VALUES (:sito, :nazione, :regione, :comune, :provincia, :definizione_sito, :descrizione, :sito_path, :find_check) """) target_session.execute(insert_query, { 'sito': site_data['sito'], 'nazione': site_data.get('nazione'), 'regione': site_data.get('regione'), 'comune': site_data.get('comune'), 'provincia': site_data.get('provincia'), 'definizione_sito': site_data.get('definizione_sito'), 'descrizione': site_data.get('descrizione'), 'sito_path': site_data.get('sito_path'), 'find_check': site_data.get('find_check', 0) }) stats['exported'] += 1 target_session.commit() except 
Exception as e: target_session.rollback() error_msg = f"Error exporting site {site_data.get('sito', 'unknown')}: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) stats['skipped'] += 1 return stats except Exception as e: logger.error(f"Export sites failed: {str(e)}") raise finally: target_session.close() mini_session.close() # ============================================================================ # US TABLE IMPORT/EXPORT WITH RELATIONSHIP MAPPING # ============================================================================ def _parse_pyarchinit_rapporti(self, rapporti_str: str) -> List[Tuple[str, str]]: """ Parse PyArchInit rapporti field (list of lists format) Args: rapporti_str: String like "[['Copre', '2'], ['Copre', '8']]" Returns: List of tuples: [(relationship_type, us_number), ...] """ if not rapporti_str or rapporti_str == '[]': return [] try: # Parse the string as Python literal rapporti_list = ast.literal_eval(rapporti_str) # Extract only relationship type and US number (ignore area and site) relationships = [] for item in rapporti_list: if isinstance(item, list) and len(item) >= 2: rel_type = item[0] # e.g., 'Copre', 'Coperto da' us_num = str(item[1]) # US number relationships.append((rel_type, us_num)) return relationships except (ValueError, SyntaxError) as e: logger.warning(f"Failed to parse rapporti: {rapporti_str} - {str(e)}") return [] def _convert_relationships_to_pyarchinit_format(self, sito: str, us: str, mini_session: Session) -> str: """ Convert PyArchInit-Mini us_relationships_table to PyArchInit rapporti format Args: sito: Site name us: US number mini_session: Session for PyArchInit-Mini database Returns: String in PyArchInit format: "[['Copre', '2'], ['Coperto da', '3']]" """ try: # Get relationships from PyArchInit-Mini query = text(""" SELECT relationship_type, us_to FROM us_relationships_table WHERE sito = :sito AND us_from = :us """) result = mini_session.execute(query, {'sito': sito, 'us': int(us)}) 
relationships = result.fetchall() if not relationships: return '[]' # Convert to PyArchInit format (list of lists) rapporti_list = [[rel.relationship_type, str(rel.us_to)] for rel in relationships] return str(rapporti_list) except Exception as e: logger.error(f"Error converting relationships: {str(e)}") return '[]' def import_us(self, sito_filter: Optional[List[str]] = None, import_relationships: bool = True, auto_migrate: bool = True, auto_backup: bool = True) -> Dict[str, Any]: """ Import US (Stratigraphic Units) from PyArchInit to PyArchInit-Mini Args: sito_filter: List of site names to import (None = import all) import_relationships: If True, parse rapporti field and create relationships auto_migrate: If True, automatically add missing i18n columns to source database auto_backup: If True, create backup before database migration Returns: Dictionary with import statistics including backup_path """ if not self.source_engine: raise ValueError("Source database not configured") # Auto-migrate source database to add i18n columns if needed if auto_migrate: logger.info("Checking source database for missing i18n columns...") migration_stats = self.migrate_source_database(tables=['us_table'], auto_backup=auto_backup) if migration_stats['columns_added'] > 0: logger.info(f"Added {migration_stats['columns_added']} i18n columns to source database") if migration_stats.get('backup_path'): logger.info(f"Database backup: {migration_stats['backup_path']}") stats = { 'imported': 0, 'updated': 0, 'skipped': 0, 'relationships_created': 0, 'errors': [] } source_session = self.source_session_maker() mini_session = self.mini_session_maker() try: # Query US from PyArchInit query = "SELECT * FROM us_table" if sito_filter: placeholders = ','.join([f"'{s}'" for s in sito_filter]) query += f" WHERE sito IN ({placeholders})" result = source_session.execute(text(query)) source_us_list = result.fetchall() for us_row in source_us_list: try: us_data = dict(us_row._mapping) # Check if US already 
exists using raw SQL (avoids ORM metadata issues) existing = mini_session.execute( text("SELECT id_us FROM us_table WHERE sito = :sito AND us = :us LIMIT 1"), {'sito': us_data['sito'], 'us': us_data['us']} ).fetchone() # Map fields from PyArchInit to PyArchInit-Mini mapped_data = self._map_us_fields_import(us_data) if existing: # Update existing US self._update_us_mini(mini_session, mapped_data) stats['updated'] += 1 else: # Insert new US self._insert_us_mini(mini_session, mapped_data) stats['imported'] += 1 # Handle relationships if import_relationships: rapporti_field = us_data.get('rapporti') if rapporti_field: logger.info(f"Processing relationships for US {us_data['sito']}/{us_data['us']}: {rapporti_field}") relationships = self._parse_pyarchinit_rapporti(rapporti_field) logger.info(f"Parsed {len(relationships)} relationships: {relationships}") for rel_type, us_to in relationships: try: # Check if relationship already exists existing_rel = mini_session.execute( text("""SELECT id_relationship FROM us_relationships_table WHERE sito = :sito AND us_from = :us_from AND us_to = :us_to AND relationship_type = :rel_type"""), { 'sito': us_data['sito'], 'us_from': int(us_data['us']), 'us_to': int(us_to), 'rel_type': rel_type } ).fetchone() if existing_rel: logger.debug(f"Relationship already exists: {us_data['sito']} US {us_data['us']} -{rel_type}-> {us_to}") continue # Insert relationship rel_query = text(""" INSERT INTO us_relationships_table (sito, us_from, us_to, relationship_type, created_at, updated_at) VALUES (:sito, :us_from, :us_to, :rel_type, :created_at, :updated_at) """) mini_session.execute(rel_query, { 'sito': us_data['sito'], 'us_from': int(us_data['us']), 'us_to': int(us_to), 'rel_type': rel_type, 'created_at': datetime.now(), 'updated_at': datetime.now() }) stats['relationships_created'] += 1 logger.info(f"Created relationship: {us_data['sito']} US {us_data['us']} -{rel_type}-> {us_to}") except Exception as e: logger.warning(f"Failed to create 
relationship {us_data['sito']} US {us_data['us']} -{rel_type}-> {us_to}: {str(e)}") else: logger.debug(f"No rapporti field for US {us_data['sito']}/{us_data['us']}") mini_session.commit() except Exception as e: mini_session.rollback() error_msg = f"Error importing US {us_data.get('sito')}/{us_data.get('us')}: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) stats['skipped'] += 1 return stats except Exception as e: logger.error(f"Import US failed: {str(e)}") raise finally: source_session.close() mini_session.close() def _convert_rapporti_to_mini_format(self, rapporti_str: str) -> str: """ Convert PyArchInit rapporti format to PyArchInit-Mini format PyArchInit: [['Copre', '3', '1', 'Scavo archeologico'], ['Copre', '11', '1', 'Scavo archeologico']] PyArchInit-Mini: Copre 3, Copre 11 """ if not rapporti_str or rapporti_str == '[]': return '' try: relationships = self._parse_pyarchinit_rapporti(rapporti_str) # Format as "Relationship US, Relationship US, ..." formatted = ', '.join([f"{rel_type} {us_num}" for rel_type, us_num in relationships]) return formatted except Exception as e: logger.warning(f"Failed to convert rapporti format: {rapporti_str} - {str(e)}") return rapporti_str # Return original if conversion fails def _map_us_fields_import(self, source_data: Dict[str, Any]) -> Dict[str, Any]: """Map US fields from PyArchInit to PyArchInit-Mini format""" # Handle date conversion data_schedatura = source_data.get('data_schedatura') if data_schedatura and isinstance(data_schedatura, str): # Try to parse date string (common formats: YYYY-MM-DD, DD/MM/YYYY) for fmt in ['%Y-%m-%d', '%d/%m/%Y', '%Y/%m/%d', '%d-%m-%Y']: try: data_schedatura = datetime.strptime(data_schedatura, fmt).date() break except (ValueError, AttributeError): continue else: # If parsing fails, set to None data_schedatura = None elif not isinstance(data_schedatura, (type(None), date)): # If it's not None or date, set to None data_schedatura = None mapped = { # Core fields 'sito': 
source_data.get('sito'), 'area': source_data.get('area'), 'us': source_data.get('us'), 'd_stratigrafica': source_data.get('d_stratigrafica'), 'd_interpretativa': source_data.get('d_interpretativa'), 'descrizione': source_data.get('descrizione'), 'interpretazione': source_data.get('interpretazione'), # Period fields 'periodo_iniziale': source_data.get('periodo_iniziale'), 'fase_iniziale': source_data.get('fase_iniziale'), 'periodo_finale': source_data.get('periodo_finale'), 'fase_finale': source_data.get('fase_finale'), # Excavation fields 'scavato': source_data.get('scavato'), 'attivita': source_data.get('attivita'), 'anno_scavo': source_data.get('anno_scavo'), 'metodo_di_scavo': source_data.get('metodo_di_scavo'), 'data_schedatura': data_schedatura, 'schedatore': source_data.get('schedatore'), # Physical description 'formazione': source_data.get('formazione'), 'stato_di_conservazione': source_data.get('stato_di_conservazione'), 'colore': source_data.get('colore'), 'consistenza': source_data.get('consistenza'), 'struttura': source_data.get('struttura'), # Text fields 'inclusi': source_data.get('inclusi'), 'campioni': source_data.get('campioni'), 'rapporti': self._convert_rapporti_to_mini_format(source_data.get('rapporti', '')), # Convert to readable format 'documentazione': source_data.get('documentazione'), 'cont_per': source_data.get('cont_per'), # Administrative 'order_layer': source_data.get('order_layer'), 'unita_tipo': source_data.get('unita_tipo', 'US'), 'settore': source_data.get('settore'), 'quad_par': source_data.get('quad_par'), 'ambient': source_data.get('ambient'), 'saggio': source_data.get('saggio'), 'n_catalogo_generale': source_data.get('n_catalogo_generale'), 'n_catalogo_interno': source_data.get('n_catalogo_interno'), 'n_catalogo_internazionale': source_data.get('n_catalogo_internazionale'), 'soprintendenza': source_data.get('soprintendenza'), # Measurements 'quota_relativa': source_data.get('quota_relativa'), 'quota_abs': 
source_data.get('quota_abs'), 'lunghezza_max': source_data.get('lunghezza_max'), 'altezza_max': source_data.get('altezza_max'), 'altezza_min': source_data.get('altezza_min'), 'profondita_max': source_data.get('profondita_max'), 'profondita_min': source_data.get('profondita_min'), 'larghezza_media': source_data.get('larghezza_media'), # Additional 'osservazioni': source_data.get('osservazioni'), 'datazione': source_data.get('datazione'), 'flottazione': source_data.get('flottazione'), 'setacciatura': source_data.get('setacciatura'), 'affidabilita': source_data.get('affidabilita'), 'direttore_us': source_data.get('direttore_us'), 'responsabile_us': source_data.get('responsabile_us'), # Timestamps 'created_at': datetime.now(), 'updated_at': datetime.now() } return mapped def _insert_us_mini(self, session: Session, data: Dict[str, Any]): """Insert US into PyArchInit-Mini database""" # Generate next id_us (VARCHAR field, sequential) max_id_result = session.execute(text("SELECT MAX(CAST(id_us AS INTEGER)) FROM us_table")).fetchone() next_id = (max_id_result[0] or 0) + 1 if max_id_result else 1 data['id_us'] = str(next_id) # Build INSERT with all fields including id_us fields = list(data.keys()) placeholders = [f':{k}' for k in fields] query = text(f""" INSERT INTO us_table ({', '.join(fields)}) VALUES ({', '.join(placeholders)}) """) session.execute(query, data) def _update_us_mini(self, session: Session, data: Dict[str, Any]): """Update US in PyArchInit-Mini database using raw SQL""" # Build UPDATE query dynamically based on provided fields # Exclude identity fields (sito, us, id_us) from update update_fields = {k: v for k, v in data.items() if k not in ['sito', 'us', 'id_us']} if update_fields: set_clause = ', '.join([f"{k} = :{k}" for k in update_fields.keys()]) query = text(f""" UPDATE us_table SET {set_clause} WHERE sito = :sito AND us = :us """) # Combine update fields with identity fields for WHERE clause params = {**update_fields, 'sito': data['sito'], 'us': 
data['us']} session.execute(query, params) def export_us(self, target_db_connection: str, sito_filter: Optional[List[str]] = None, export_relationships: bool = True) -> Dict[str, Any]: """ Export US from PyArchInit-Mini to PyArchInit Args: target_db_connection: Connection string for target PyArchInit database sito_filter: List of site names to export (None = export all) export_relationships: If True, convert relationships to rapporti format Returns: Dictionary with export statistics """ stats = {'exported': 0, 'updated': 0, 'skipped': 0, 'errors': []} target_engine = create_engine(target_db_connection) target_session = sessionmaker(bind=target_engine)() mini_session = self.mini_session_maker() try: # Query US from PyArchInit-Mini query = "SELECT * FROM us_table" if sito_filter: placeholders = ','.join([f"'{s}'" for s in sito_filter]) query += f" WHERE sito IN ({placeholders})" result = mini_session.execute(text(query)) mini_us_list = result.fetchall() for us_row in mini_us_list: try: us_data = dict(us_row._mapping) # Convert relationships if needed rapporti_str = '[]' if export_relationships: rapporti_str = self._convert_relationships_to_pyarchinit_format( us_data['sito'], us_data['us'], mini_session ) # Map fields from PyArchInit-Mini to PyArchInit mapped_data = self._map_us_fields_export(us_data, rapporti_str) # Check if US exists in target existing = target_session.execute( text("SELECT id_us FROM us_table WHERE sito = :sito AND us = :us"), {'sito': us_data['sito'], 'us': us_data['us']} ).fetchone() if existing: self._update_us_pyarchinit(target_session, mapped_data) stats['updated'] += 1 else: self._insert_us_pyarchinit(target_session, mapped_data) stats['exported'] += 1 target_session.commit() except Exception as e: target_session.rollback() error_msg = f"Error exporting US {us_data.get('sito')}/{us_data.get('us')}: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) stats['skipped'] += 1 return stats except Exception as e: 
logger.error(f"Export US failed: {str(e)}") raise finally: target_session.close() mini_session.close() def _map_us_fields_export(self, source_data: Dict[str, Any], rapporti: str) -> Dict[str, Any]: """Map US fields from PyArchInit-Mini to PyArchInit format""" return { 'sito': source_data.get('sito'), 'area': source_data.get('area'), 'us': source_data.get('us'), 'd_stratigrafica': source_data.get('d_stratigrafica'), 'd_interpretativa': source_data.get('d_interpretativa'), 'descrizione': source_data.get('descrizione'), 'interpretazione': source_data.get('interpretazione'), 'periodo_iniziale': source_data.get('periodo_iniziale'), 'fase_iniziale': source_data.get('fase_iniziale'), 'periodo_finale': source_data.get('periodo_finale'), 'fase_finale': source_data.get('fase_finale'), 'scavato': source_data.get('scavato'), 'attivita': source_data.get('attivita'), 'anno_scavo': source_data.get('anno_scavo'), 'metodo_di_scavo': source_data.get('metodo_di_scavo'), 'data_schedatura': source_data.get('data_schedatura'), 'schedatore': source_data.get('schedatore'), 'formazione': source_data.get('formazione'), 'stato_di_conservazione': source_data.get('stato_di_conservazione'), 'colore': source_data.get('colore'), 'consistenza': source_data.get('consistenza'), 'struttura': source_data.get('struttura'), 'inclusi': source_data.get('inclusi'), 'campioni': source_data.get('campioni'), 'rapporti': rapporti, # Converted relationships 'documentazione': source_data.get('documentazione'), 'cont_per': source_data.get('cont_per'), 'order_layer': source_data.get('order_layer'), 'unita_tipo': source_data.get('unita_tipo', 'US'), 'settore': source_data.get('settore'), 'quad_par': source_data.get('quad_par'), 'ambient': source_data.get('ambient'), 'saggio': source_data.get('saggio'), 'n_catalogo_generale': source_data.get('n_catalogo_generale'), 'n_catalogo_interno': source_data.get('n_catalogo_interno'), 'n_catalogo_internazionale': source_data.get('n_catalogo_internazionale'), 'soprintendenza': 
source_data.get('soprintendenza'), 'quota_relativa': source_data.get('quota_relativa'), 'quota_abs': source_data.get('quota_abs'), 'lunghezza_max': source_data.get('lunghezza_max'), 'altezza_max': source_data.get('altezza_max'), 'altezza_min': source_data.get('altezza_min'), 'profondita_max': source_data.get('profondita_max'), 'profondita_min': source_data.get('profondita_min'), 'larghezza_media': source_data.get('larghezza_media'), 'osservazioni': source_data.get('osservazioni'), 'datazione': source_data.get('datazione'), 'flottazione': source_data.get('flottazione'), 'setacciatura': source_data.get('setacciatura'), 'affidabilita': source_data.get('affidabilita'), 'direttore_us': source_data.get('direttore_us'), 'responsabile_us': source_data.get('responsabile_us') } def _insert_us_pyarchinit(self, session: Session, data: Dict[str, Any]): """Insert US into PyArchInit database""" query = text(""" INSERT INTO us_table (sito, area, us, d_stratigrafica, d_interpretativa, descrizione, interpretazione, periodo_iniziale, fase_iniziale, periodo_finale, fase_finale, scavato, attivita, anno_scavo, metodo_di_scavo, data_schedatura, schedatore, formazione, stato_di_conservazione, colore, consistenza, struttura, inclusi, campioni, rapporti, documentazione, cont_per, order_layer, unita_tipo, settore, quad_par, ambient, saggio, n_catalogo_generale, n_catalogo_interno, n_catalogo_internazionale, soprintendenza, quota_relativa, quota_abs, lunghezza_max, altezza_max, altezza_min, profondita_max, profondita_min, larghezza_media, osservazioni, datazione, flottazione, setacciatura, affidabilita, direttore_us, responsabile_us) VALUES (:sito, :area, :us, :d_stratigrafica, :d_interpretativa, :descrizione, :interpretazione, :periodo_iniziale, :fase_iniziale, :periodo_finale, :fase_finale, :scavato, :attivita, :anno_scavo, :metodo_di_scavo, :data_schedatura, :schedatore, :formazione, :stato_di_conservazione, :colore, :consistenza, :struttura, :inclusi, :campioni, :rapporti, 
:documentazione, :cont_per, :order_layer, :unita_tipo, :settore, :quad_par, :ambient, :saggio, :n_catalogo_generale, :n_catalogo_interno, :n_catalogo_internazionale, :soprintendenza, :quota_relativa, :quota_abs, :lunghezza_max, :altezza_max, :altezza_min, :profondita_max, :profondita_min, :larghezza_media, :osservazioni, :datazione, :flottazione, :setacciatura, :affidabilita, :direttore_us, :responsabile_us) """) session.execute(query, data) def _update_us_pyarchinit(self, session: Session, data: Dict[str, Any]): """Update US in PyArchInit database""" query = text(""" UPDATE us_table SET area = :area, d_stratigrafica = :d_stratigrafica, d_interpretativa = :d_interpretativa, descrizione = :descrizione, interpretazione = :interpretazione, periodo_iniziale = :periodo_iniziale, fase_iniziale = :fase_iniziale, periodo_finale = :periodo_finale, fase_finale = :fase_finale, scavato = :scavato, attivita = :attivita, anno_scavo = :anno_scavo, metodo_di_scavo = :metodo_di_scavo, data_schedatura = :data_schedatura, schedatore = :schedatore, formazione = :formazione, stato_di_conservazione = :stato_di_conservazione, colore = :colore, consistenza = :consistenza, struttura = :struttura, inclusi = :inclusi, campioni = :campioni, rapporti = :rapporti, documentazione = :documentazione, cont_per = :cont_per, order_layer = :order_layer, unita_tipo = :unita_tipo, settore = :settore, quad_par = :quad_par, ambient = :ambient, saggio = :saggio, n_catalogo_generale = :n_catalogo_generale, n_catalogo_interno = :n_catalogo_interno, n_catalogo_internazionale = :n_catalogo_internazionale, soprintendenza = :soprintendenza, quota_relativa = :quota_relativa, quota_abs = :quota_abs, lunghezza_max = :lunghezza_max, altezza_max = :altezza_max, altezza_min = :altezza_min, profondita_max = :profondita_max, profondita_min = :profondita_min, larghezza_media = :larghezza_media, osservazioni = :osservazioni, datazione = :datazione, flottazione = :flottazione, setacciatura = :setacciatura, affidabilita = 
:affidabilita, direttore_us = :direttore_us, responsabile_us = :responsabile_us WHERE sito = :sito AND us = :us """) session.execute(query, data) # ============================================================================ # INVENTARIO MATERIALI IMPORT/EXPORT # ============================================================================ def import_inventario(self, sito_filter: Optional[List[str]] = None, auto_migrate: bool = True, auto_backup: bool = True) -> Dict[str, Any]: """ Import Inventario Materiali from PyArchInit to PyArchInit-Mini Args: sito_filter: List of site names to import (None = import all) auto_migrate: If True, automatically add missing i18n columns to source database auto_backup: If True, create backup before database migration Returns: Dictionary with import statistics including backup_path """ if not self.source_engine: raise ValueError("Source database not configured") # Auto-migrate source database to add i18n columns if needed if auto_migrate: logger.info("Checking source database for missing i18n columns...") migration_stats = self.migrate_source_database(tables=['inventario_materiali_table'], auto_backup=auto_backup) if migration_stats['columns_added'] > 0: logger.info(f"Added {migration_stats['columns_added']} i18n columns to source database") if migration_stats.get('backup_path'): logger.info(f"Database backup: {migration_stats['backup_path']}") stats = {'imported': 0, 'updated': 0, 'skipped': 0, 'errors': []} source_session = self.source_session_maker() mini_session = self.mini_session_maker() try: # Find correct table name (might be backup table) inspector = inspect(self.source_engine) tables = inspector.get_table_names() inv_table = None for table in tables: if 'inventario_materiali_table' in table and 'backup' in table: # Use most recent backup if inv_table is None or table > inv_table: inv_table = table if inv_table is None: # Try without backup inv_table = 'inventario_materiali_table_toimp' if 'inventario_materiali_table_toimp' 
in tables else None if inv_table is None: raise ValueError("Could not find inventario_materiali table in source database") # Query inventario from PyArchInit query = f"SELECT * FROM {inv_table}" if sito_filter: placeholders = ','.join([f"'{s}'" for s in sito_filter]) query += f" WHERE sito IN ({placeholders})" result = source_session.execute(text(query)) source_inv_list = result.fetchall() for inv_row in source_inv_list: try: inv_data = dict(inv_row._mapping) # Check if record exists existing = mini_session.execute( text("""SELECT id_invmat FROM inventario_materiali_table WHERE sito = :sito AND numero_inventario = :numero_inventario"""), {'sito': inv_data['sito'], 'numero_inventario': inv_data['numero_inventario']} ).fetchone() # Map fields mapped_data = self._map_inventario_fields(inv_data) if existing: self._update_inventario_mini(mini_session, mapped_data) stats['updated'] += 1 else: self._insert_inventario_mini(mini_session, mapped_data) stats['imported'] += 1 mini_session.commit() except Exception as e: mini_session.rollback() error_msg = f"Error importing inventario {inv_data.get('sito')}/{inv_data.get('numero_inventario')}: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) stats['skipped'] += 1 return stats except Exception as e: logger.error(f"Import inventario failed: {str(e)}") raise finally: source_session.close() mini_session.close() def _map_inventario_fields(self, source_data: Dict[str, Any]) -> Dict[str, Any]: """Map inventario fields from PyArchInit to PyArchInit-Mini""" return { 'sito': source_data.get('sito'), 'numero_inventario': source_data.get('numero_inventario'), 'tipo_reperto': source_data.get('tipo_reperto'), 'criterio_schedatura': source_data.get('criterio_schedatura'), 'definizione': source_data.get('definizione'), 'descrizione': source_data.get('descrizione'), 'area': source_data.get('area'), 'us': source_data.get('us'), 'lavato': source_data.get('lavato'), 'nr_cassa': source_data.get('nr_cassa'), 'luogo_conservazione': 
source_data.get('luogo_conservazione'), 'stato_conservazione': source_data.get('stato_conservazione'), 'datazione_reperto': source_data.get('datazione_reperto'), 'elementi_reperto': source_data.get('elementi_reperto'), 'misurazioni': source_data.get('misurazioni'), 'rif_biblio': source_data.get('rif_biblio'), 'tecnologie': source_data.get('tecnologie'), 'forme_minime': source_data.get('forme_minime'), 'forme_massime': source_data.get('forme_massime'), 'totale_frammenti': source_data.get('totale_frammenti'), 'corpo_ceramico': source_data.get('corpo_ceramico'), 'rivestimento': source_data.get('rivestimento'), 'diametro_orlo': source_data.get('diametro_orlo'), 'peso': source_data.get('peso'), 'tipo': source_data.get('tipo'), 'eve_orlo': source_data.get('eve_orlo'), 'repertato': source_data.get('repertato'), 'diagnostico': source_data.get('diagnostico'), 'n_reperto': source_data.get('n_reperto'), 'tipo_contenitore': source_data.get('tipo_contenitore'), 'struttura': source_data.get('struttura'), 'years': source_data.get('years'), 'created_at': datetime.now(), 'updated_at': datetime.now() } def _insert_inventario_mini(self, session: Session, data: Dict[str, Any]): """Insert inventario into PyArchInit-Mini database""" query = text(""" INSERT INTO inventario_materiali_table (sito, numero_inventario, tipo_reperto, criterio_schedatura, definizione, descrizione, area, us, lavato, nr_cassa, luogo_conservazione, stato_conservazione, datazione_reperto, elementi_reperto, misurazioni, rif_biblio, tecnologie, forme_minime, forme_massime, totale_frammenti, corpo_ceramico, rivestimento, diametro_orlo, peso, tipo, eve_orlo, repertato, diagnostico, n_reperto, tipo_contenitore, struttura, years, created_at, updated_at) VALUES (:sito, :numero_inventario, :tipo_reperto, :criterio_schedatura, :definizione, :descrizione, :area, :us, :lavato, :nr_cassa, :luogo_conservazione, :stato_conservazione, :datazione_reperto, :elementi_reperto, :misurazioni, :rif_biblio, :tecnologie, :forme_minime, 
:forme_massime, :totale_frammenti, :corpo_ceramico, :rivestimento, :diametro_orlo, :peso, :tipo, :eve_orlo, :repertato, :diagnostico, :n_reperto, :tipo_contenitore, :struttura, :years, :created_at, :updated_at) """) session.execute(query, data) def _update_inventario_mini(self, session: Session, data: Dict[str, Any]): """Update inventario in PyArchInit-Mini database""" query = text(""" UPDATE inventario_materiali_table SET tipo_reperto = :tipo_reperto, criterio_schedatura = :criterio_schedatura, definizione = :definizione, descrizione = :descrizione, area = :area, us = :us, lavato = :lavato, nr_cassa = :nr_cassa, luogo_conservazione = :luogo_conservazione, stato_conservazione = :stato_conservazione, datazione_reperto = :datazione_reperto, elementi_reperto = :elementi_reperto, misurazioni = :misurazioni, rif_biblio = :rif_biblio, tecnologie = :tecnologie, forme_minime = :forme_minime, forme_massime = :forme_massime, totale_frammenti = :totale_frammenti, corpo_ceramico = :corpo_ceramico, rivestimento = :rivestimento, diametro_orlo = :diametro_orlo, peso = :peso, tipo = :tipo, eve_orlo = :eve_orlo, repertato = :repertato, diagnostico = :diagnostico, n_reperto = :n_reperto, tipo_contenitore = :tipo_contenitore, struttura = :struttura, years = :years, updated_at = :updated_at WHERE sito = :sito AND numero_inventario = :numero_inventario """) session.execute(query, data) # ============================================================================ # PERIODIZZAZIONE IMPORT/EXPORT # ============================================================================ def import_periodizzazione(self, sito_filter: Optional[List[str]] = None) -> Dict[str, Any]: """ Import Periodizzazione from PyArchInit to PyArchInit-Mini Args: sito_filter: List of site names to import (None = import all) Returns: Dictionary with import statistics """ if not self.source_engine: raise ValueError("Source database not configured") stats = {'imported': 0, 'updated': 0, 'skipped': 0, 'errors': []} 
source_session = self.source_session_maker() mini_session = self.mini_session_maker() try: # Query periodizzazione from PyArchInit query = "SELECT * FROM periodizzazione_table" if sito_filter: placeholders = ','.join([f"'{s}'" for s in sito_filter]) query += f" WHERE sito IN ({placeholders})" result = source_session.execute(text(query)) source_period_list = result.fetchall() for period_row in source_period_list: try: period_data = dict(period_row._mapping) # Map and insert/update mapped_data = { 'sito': period_data.get('sito'), 'area': period_data.get('area'), 'us': period_data.get('us'), 'periodo_iniziale': period_data.get('periodo'), 'fase_iniziale': period_data.get('fase'), 'datazione_estesa': period_data.get('datazione_estesa'), 'created_at': datetime.now(), 'updated_at': datetime.now() } # Check if exists existing = mini_session.execute( text("""SELECT id_periodizzazione FROM periodizzazione_table WHERE sito = :sito AND us = :us"""), {'sito': mapped_data['sito'], 'us': mapped_data.get('us')} ).fetchone() if not existing: insert_query = text(""" INSERT INTO periodizzazione_table (sito, area, us, periodo_iniziale, fase_iniziale, datazione_estesa, created_at, updated_at) VALUES (:sito, :area, :us, :periodo_iniziale, :fase_iniziale, :datazione_estesa, :created_at, :updated_at) """) mini_session.execute(insert_query, mapped_data) stats['imported'] += 1 else: stats['skipped'] += 1 mini_session.commit() except Exception as e: mini_session.rollback() error_msg = f"Error importing periodizzazione: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) stats['skipped'] += 1 return stats except Exception as e: logger.error(f"Import periodizzazione failed: {str(e)}") raise finally: source_session.close() mini_session.close() # ============================================================================ # THESAURUS IMPORT/EXPORT # ============================================================================ def import_thesaurus(self) -> Dict[str, Any]: """ 
Import Thesaurus from PyArchInit to PyArchInit-Mini Returns: Dictionary with import statistics """ if not self.source_engine: raise ValueError("Source database not configured") stats = {'imported': 0, 'updated': 0, 'skipped': 0, 'errors': []} source_session = self.source_session_maker() mini_session = self.mini_session_maker() try: # Query thesaurus from PyArchInit result = source_session.execute(text("SELECT * FROM pyarchinit_thesaurus_sigle")) source_thesaurus_list = result.fetchall() for thesaurus_row in source_thesaurus_list: try: thes_data = dict(thesaurus_row._mapping) # Check if exists existing = mini_session.execute( text("""SELECT id_thesaurus_sigle FROM pyarchinit_thesaurus_sigle WHERE nome_tabella = :nome_tabella AND sigla = :sigla"""), {'nome_tabella': thes_data['nome_tabella'], 'sigla': thes_data['sigla']} ).fetchone() if existing: stats['skipped'] += 1 continue # Insert new thesaurus entry insert_query = text(""" INSERT INTO pyarchinit_thesaurus_sigle (nome_tabella, sigla, sigla_estesa, descrizione, tipologia_sigla, lingua, created_at, updated_at) VALUES (:nome_tabella, :sigla, :sigla_estesa, :descrizione, :tipologia_sigla, :lingua, :created_at, :updated_at) """) mini_session.execute(insert_query, { 'nome_tabella': thes_data.get('nome_tabella'), 'sigla': thes_data.get('sigla'), 'sigla_estesa': thes_data.get('sigla_estesa'), 'descrizione': thes_data.get('descrizione'), 'tipologia_sigla': thes_data.get('tipologia_sigla'), 'lingua': thes_data.get('lingua', ''), 'created_at': datetime.now(), 'updated_at': datetime.now() }) stats['imported'] += 1 mini_session.commit() except Exception as e: mini_session.rollback() error_msg = f"Error importing thesaurus: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) stats['skipped'] += 1 return stats except Exception as e: logger.error(f"Import thesaurus failed: {str(e)}") raise finally: source_session.close() mini_session.close() # 
# ============================================================================
# UTILITY METHODS
# ============================================================================

def get_available_sites_in_source(self) -> List[str]:
    """Get list of available sites in source database

    Returns:
        Distinct ``sito`` values of the source ``site_table``, ordered by name.

    Raises:
        ValueError: If no source database is configured.
    """
    if not self.source_engine:
        raise ValueError("Source database not configured")

    session = self.source_session_maker()
    try:
        result = session.execute(text("SELECT DISTINCT sito FROM site_table ORDER BY sito"))
        return [row[0] for row in result.fetchall()]
    finally:
        session.close()

def validate_database_connection(self, connection_string: str) -> bool:
    """Validate database connection string

    Opens a connection with a temporary engine and runs ``SELECT 1``.

    Args:
        connection_string: SQLAlchemy connection URL to test.

    Returns:
        True if the probe query succeeds, False otherwise (the failure is
        logged, never raised).
    """
    engine = None
    try:
        engine = create_engine(connection_string)
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
        return True
    except Exception as e:
        logger.error(f"Database validation failed: {str(e)}")
        return False
    finally:
        # The engine exists only for this probe; release its connection pool
        # instead of leaving it to the garbage collector.
        if engine is not None:
            engine.dispose()

def sync_datazioni_from_periodizzazione(self) -> Dict[str, Any]:
    """
    Synchronize datazioni_table from unique periodo values in periodizzazione_table

    This creates entries in datazioni_table for each unique periodo found in
    periodizzazione, if they don't already exist. ``datazione_estesa`` is used
    as the description of a newly created entry.

    Returns:
        Dictionary with sync statistics (``created``, ``skipped``, ``errors``)
    """
    stats = {'created': 0, 'skipped': 0, 'errors': []}
    mini_session = self.mini_session_maker()

    try:
        # Get unique periodo values from periodizzazione_table
        result = mini_session.execute(text("""
            SELECT DISTINCT periodo_iniziale, datazione_estesa
            FROM periodizzazione_table
            WHERE periodo_iniziale IS NOT NULL AND periodo_iniziale != ''
            ORDER BY periodo_iniziale
        """))
        unique_periodi = result.fetchall()

        logger.info(f"Found {len(unique_periodi)} unique periodi in periodizzazione")

        for periodo_row in unique_periodi:
            periodo_nome = periodo_row[0]
            datazione_estesa = periodo_row[1] or ''

            try:
                # Check if datazione already exists. The same periodo can
                # appear with several datazione_estesa values; only the first
                # one creates an entry, later ones are counted as skipped.
                existing = mini_session.execute(
                    text("SELECT id_datazione FROM datazioni_table WHERE nome_datazione = :nome"),
                    {'nome': periodo_nome}
                ).fetchone()

                if not existing:
                    # Insert new datazione
                    mini_session.execute(text("""
                        INSERT INTO datazioni_table (nome_datazione, descrizione, created_at, updated_at)
                        VALUES (:nome, :descrizione, :created, :updated)
                    """), {
                        'nome': periodo_nome,
                        'descrizione': datazione_estesa,
                        'created': datetime.now(),
                        'updated': datetime.now()
                    })
                    mini_session.commit()
                    stats['created'] += 1
                    logger.info(f"Created datazione: {periodo_nome}")
                else:
                    stats['skipped'] += 1

            except Exception as e:
                mini_session.rollback()
                error_msg = f"Error syncing datazione '{periodo_nome}': {str(e)}"
                logger.error(error_msg)
                stats['errors'].append(error_msg)

        return stats

    except Exception as e:
        logger.error(f"Sync datazioni failed: {str(e)}")
        raise
    finally:
        mini_session.close()
def sync_datazioni_from_us_values(self) -> Dict[str, Any]:
    """
    Populate datazioni_table with the distinct datazione values used in us_table.

    Every non-empty ``datazione`` found in ``us_table`` that has no matching
    ``nome_datazione`` in ``datazioni_table`` is inserted, using the value
    itself as description. Each insert is committed on its own; a failing
    value is rolled back and recorded in ``errors``.

    Returns:
        Dictionary with sync statistics (``created``, ``skipped``, ``errors``)
    """
    stats = {'created': 0, 'skipped': 0, 'errors': []}
    mini_session = self.mini_session_maker()

    try:
        # Distinct, non-empty datazione values currently used by US records
        rows = mini_session.execute(text("""
            SELECT DISTINCT datazione
            FROM us_table
            WHERE datazione IS NOT NULL AND datazione != ''
            ORDER BY datazione
        """)).fetchall()

        logger.info(f"Found {len(rows)} unique datazione values in us_table")

        for (datazione_value, *_rest) in rows:
            try:
                already_there = mini_session.execute(
                    text("SELECT id_datazione FROM datazioni_table WHERE nome_datazione = :nome"),
                    {'nome': datazione_value}
                ).fetchone()

                if already_there:
                    stats['skipped'] += 1
                    continue

                # Not known yet: create it, description mirrors the name
                new_entry = {
                    'nome': datazione_value,
                    'descrizione': datazione_value,
                    'created': datetime.now(),
                    'updated': datetime.now()
                }
                mini_session.execute(text("""
                    INSERT INTO datazioni_table (nome_datazione, descrizione, created_at, updated_at)
                    VALUES (:nome, :descrizione, :created, :updated)
                """), new_entry)
                mini_session.commit()
                stats['created'] += 1
                logger.info(f"Created datazione from US: {datazione_value}")

            except Exception as e:
                mini_session.rollback()
                error_msg = f"Error syncing datazione '{datazione_value}': {str(e)}"
                logger.error(error_msg)
                stats['errors'].append(error_msg)

        return stats

    except Exception as e:
        logger.error(f"Sync datazioni from US failed: {str(e)}")
        raise
    finally:
        mini_session.close()
def update_us_datazione_from_periodizzazione(self, sito_filter: Optional[List[str]] = None) -> Dict[str, Any]:
    """
    Update datazione field in us_table based on periodizzazione_table data

    For each US, looks up one ``periodo_iniziale`` in periodizzazione_table
    with the same ``sito`` and ``us`` and, when it is non-empty, writes it to
    the ``datazione`` column of us_table. Each update is committed on its own.

    Args:
        sito_filter: List of site names to update (None = update all)

    Returns:
        Dictionary with update statistics (``updated``, ``skipped``, ``errors``)
    """
    stats = {'updated': 0, 'skipped': 0, 'errors': []}
    mini_session = self.mini_session_maker()

    try:
        # Site names are caller-supplied, so they are bound as parameters
        # rather than quoted into the statement (avoids SQL injection and
        # breakage on names containing a quote).
        us_query = "SELECT sito, us FROM us_table"
        filter_params: Dict[str, Any] = {}
        if sito_filter:
            filter_params = {f"sito_{idx}": name for idx, name in enumerate(sito_filter)}
            placeholders = ', '.join(f":{key}" for key in filter_params)
            us_query += f" WHERE sito IN ({placeholders})"

        us_list = mini_session.execute(text(us_query), filter_params).fetchall()

        logger.info(f"Updating datazione for {len(us_list)} US from periodizzazione")

        for us_row in us_list:
            sito = us_row[0]
            us_num = us_row[1]

            try:
                # Get periodo_iniziale from periodizzazione for this US
                periodo_row = mini_session.execute(text("""
                    SELECT periodo_iniziale
                    FROM periodizzazione_table
                    WHERE sito = :sito AND us = :us
                    LIMIT 1
                """), {'sito': sito, 'us': us_num}).fetchone()

                if periodo_row and periodo_row[0]:
                    # Update us_table datazione field
                    mini_session.execute(text("""
                        UPDATE us_table
                        SET datazione = :datazione, updated_at = :updated
                        WHERE sito = :sito AND us = :us
                    """), {
                        'datazione': periodo_row[0],
                        'updated': datetime.now(),
                        'sito': sito,
                        'us': us_num
                    })
                    mini_session.commit()
                    stats['updated'] += 1
                else:
                    stats['skipped'] += 1

            except Exception as e:
                mini_session.rollback()
                error_msg = f"Error updating US {sito}/{us_num}: {str(e)}"
                logger.error(error_msg)
                stats['errors'].append(error_msg)

        logger.info(f"Updated {stats['updated']} US with datazione from periodizzazione")
        return stats

    except Exception as e:
        logger.error(f"Update US datazione failed: {str(e)}")
        raise
    finally:
        mini_session.close()

#
============================================================================ # DATABASE MIGRATION (SQLite ↔ PostgreSQL) # ============================================================================ @staticmethod def migrate_database(source_db_url: str, target_db_url: str, create_target: bool = True, overwrite_target: bool = False, auto_backup: bool = True, backup_dir: Optional[str] = None, merge_strategy: str = 'skip') -> Dict[str, Any]: """ Migrate all data from source database to target database Supports: - SQLite → PostgreSQL - PostgreSQL → SQLite - SQLite → SQLite (copy/backup) - PostgreSQL → PostgreSQL (copy/backup) Args: source_db_url: Source database connection string target_db_url: Target database connection string create_target: If True, create target database with schema if it doesn't exist overwrite_target: If True and create_target=True, overwrite existing target database auto_backup: If True, automatically create a backup of source database before migration backup_dir: Optional directory for backups (defaults to same dir as source) merge_strategy: How to handle ID conflicts: 'skip', 'overwrite', or 'renumber' - 'skip': Skip records with conflicting IDs (default) - 'overwrite': Update existing records with new data - 'renumber': Generate new IDs for conflicting records Returns: Dictionary with migration statistics including backup info """ stats = { 'success': False, 'tables_migrated': 0, 'total_rows_copied': 0, 'rows_per_table': {}, 'errors': [], 'duration_seconds': 0, 'backup_created': False, 'backup_path': None, 'backup_size_mb': 0.0 } import time start_time = time.time() try: # Create backup of source database if requested if auto_backup: logger.info("Creating backup of source database...") backup_result = ImportExportService._create_backup(source_db_url, backup_dir) if backup_result['success']: stats['backup_created'] = True stats['backup_path'] = backup_result['path'] stats['backup_size_mb'] = backup_result['size_mb'] logger.info(f"✓ Backup 
created: {backup_result['path']}") else: logger.warning(f"⚠ Backup failed: {backup_result['message']}") stats['errors'].append(f"Backup warning: {backup_result['message']}") # Continue with migration even if backup fails (user chose auto_backup=True) else: logger.info("Skipping backup (auto_backup=False)") # Create target database with schema if requested if create_target: logger.info(f"Creating target database with schema...") from pyarchinit_mini.database.database_creator import create_empty_database # Determine target database type if target_db_url.startswith('sqlite:///'): # Extract path from SQLite URL db_path = target_db_url.replace('sqlite:///', '') if not db_path.startswith('/'): db_path = '/' + db_path create_result = create_empty_database('sqlite', db_path, overwrite=overwrite_target) if not create_result['success']: raise RuntimeError(f"Failed to create target SQLite database: {create_result['message']}") elif target_db_url.startswith('postgresql'): # Parse PostgreSQL URL: postgresql://user:pass@host:port/database from sqlalchemy.engine.url import make_url url = make_url(target_db_url) pg_config = { 'host': url.host or 'localhost', 'port': url.port or 5432, 'database': url.database, 'username': url.username, 'password': url.password or '' } create_result = create_empty_database('postgresql', pg_config, overwrite=overwrite_target) if not create_result['success']: raise RuntimeError(f"Failed to create target PostgreSQL database: {create_result['message']}") else: raise ValueError(f"Unsupported target database type: {target_db_url}") logger.info(f"Target database created with {create_result['tables_created']} tables") # Connect to both databases source_engine = create_engine(source_db_url) target_engine = create_engine(target_db_url) source_session_maker = sessionmaker(bind=source_engine) target_session_maker = sessionmaker(bind=target_engine) # Get list of tables to migrate (in correct order to handle foreign keys) # Order matters: create tables without 
dependencies first tables_order = [ 'users_table', 'site_table', 'datazioni_table', 'us_table', 'us_relationships_table', 'periodizzazione_table', 'inventario_materiali_table', 'pyarchinit_thesaurus_sigle', 'media_table', 'harris_matrix_table', 'periods_table', 'extended_matrix_nodes_table' ] # Migrate each table for table_name in tables_order: try: rows_copied = ImportExportService._migrate_table( table_name, source_session_maker, target_session_maker, merge_strategy=merge_strategy ) if rows_copied > 0: stats['tables_migrated'] += 1 stats['total_rows_copied'] += rows_copied stats['rows_per_table'][table_name] = rows_copied logger.info(f"✓ Migrated {table_name}: {rows_copied} rows") else: logger.info(f"○ Skipped {table_name}: empty or not found") except Exception as e: error_msg = f"Error migrating table {table_name}: {str(e)}" logger.error(error_msg) stats['errors'].append(error_msg) # Continue with other tables even if one fails # Reset PostgreSQL sequences after migration if target_db_url.startswith('postgresql'): logger.info("Resetting PostgreSQL sequences...") sequence_stats = ImportExportService._reset_postgresql_sequences(target_engine) logger.info(f"Reset {sequence_stats['sequences_reset']} sequences") if sequence_stats['errors']: stats['errors'].extend(sequence_stats['errors']) stats['success'] = stats['tables_migrated'] > 0 stats['duration_seconds'] = time.time() - start_time logger.info(f"Migration complete: {stats['tables_migrated']} tables, {stats['total_rows_copied']} rows in {stats['duration_seconds']:.2f}s") return stats except Exception as e: logger.error(f"Database migration failed: {str(e)}") stats['errors'].append(str(e)) stats['duration_seconds'] = time.time() - start_time return stats @staticmethod def _create_backup(db_url: str, backup_dir: Optional[str] = None) -> Dict[str, Any]: """ Create a backup of a database before migration Args: db_url: Database connection string backup_dir: Optional directory for backup (defaults to same dir as 
source) Returns: Dictionary with backup info: {'success': bool, 'path': str, 'size_mb': float, 'message': str} """ result = { 'success': False, 'path': None, 'size_mb': 0.0, 'message': '' } try: # SQLite backup if db_url.startswith('sqlite:///'): # Extract file path from connection string db_path = db_url.replace('sqlite:///', '') # Handle absolute paths (start with /) if not db_path.startswith('/'): db_path = '/' + db_path if not os.path.exists(db_path): result['message'] = f"Source database file not found: {db_path}" return result # Create backup with timestamp timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') if backup_dir: os.makedirs(backup_dir, exist_ok=True) backup_filename = os.path.basename(db_path) backup_path = os.path.join(backup_dir, f"{backup_filename}.backup_{timestamp}") else: backup_path = f"{db_path}.backup_{timestamp}" shutil.copy2(db_path, backup_path) file_size = os.path.getsize(backup_path) / (1024 * 1024) # MB result['success'] = True result['path'] = backup_path result['size_mb'] = round(file_size, 2) result['message'] = f"SQLite backup created ({result['size_mb']} MB)" logger.info(f"✓ Database backup: {backup_path} ({result['size_mb']} MB)") # PostgreSQL backup elif db_url.startswith('postgresql'): import subprocess from sqlalchemy.engine.url import make_url # Parse connection URL url = make_url(db_url) host = url.host or 'localhost' port = url.port or 5432 database = url.database user = url.username password = url.password # Create backup file path timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') if backup_dir: os.makedirs(backup_dir, exist_ok=True) backup_path = os.path.join(backup_dir, f"{database}_backup_{timestamp}.sql") else: backup_path = f"{database}_backup_{timestamp}.sql" # Set password environment variable env = os.environ.copy() if password: env['PGPASSWORD'] = password # Run pg_dump cmd = [ 'pg_dump', '-h', host, '-p', str(port), '-U', user, '-F', 'p', # Plain SQL format '-f', backup_path, database ] pg_result = 
subprocess.run(cmd, env=env, capture_output=True, text=True) if pg_result.returncode == 0: file_size = os.path.getsize(backup_path) / (1024 * 1024) # MB result['success'] = True result['path'] = backup_path result['size_mb'] = round(file_size, 2) result['message'] = f"PostgreSQL backup created ({result['size_mb']} MB)" logger.info(f"✓ PostgreSQL backup: {backup_path} ({result['size_mb']} MB)") else: result['message'] = f"pg_dump failed: {pg_result.stderr}" logger.error(result['message']) else: result['message'] = f"Unsupported database type for backup: {db_url}" except Exception as e: result['message'] = f"Backup failed: {str(e)}" logger.error(result['message']) return result @staticmethod def _detect_conflicts(source_db_url: str, target_db_url: str) -> Dict[str, Any]: """ Detect conflicts between source and target databases before migration Analyzes both databases to find: - Duplicate IDs (primary key conflicts) - New records that don't exist in target - Records that exist in both databases Args: source_db_url: Source database connection string target_db_url: Target database connection string Returns: Dictionary with conflict analysis: { 'has_conflicts': bool, 'total_conflicts': int, 'total_new_records': int, 'tables': { 'table_name': { 'conflicts': int, 'new_records': int, 'conflicting_ids': [list of IDs], 'exists_in_target': bool } } } """ result = { 'has_conflicts': False, 'total_conflicts': 0, 'total_new_records': 0, 'tables': {}, 'errors': [] } try: # Connect to both databases source_engine = create_engine(source_db_url) target_engine = create_engine(target_db_url) source_session_maker = sessionmaker(bind=source_engine) target_session_maker = sessionmaker(bind=target_engine) # Tables to check (in same order as migration) tables_order = [ 'users_table', 'site_table', 'datazioni_table', 'us_table', 'us_relationships_table', 'periodizzazione_table', 'inventario_materiali_table', 'pyarchinit_thesaurus_sigle', 'media_table', 'harris_matrix_table', 'periods_table', 
'extended_matrix_nodes_table' ] for table_name in tables_order: source_session = source_session_maker() target_session = target_session_maker() try: # Get primary key column name for this table inspector = inspect(source_engine) pk_columns = inspector.get_pk_constraint(table_name).get('constrained_columns', []) if not pk_columns: logger.warning(f"No primary key found for {table_name}, skipping") continue pk_column = pk_columns[0] # Use first PK column # Check if table exists in source try: source_result = source_session.execute(text(f"SELECT {pk_column} FROM {table_name}")) source_ids = set(row[0] for row in source_result.fetchall()) except Exception: # Table doesn't exist in source or has no data continue if not source_ids: continue # Skip empty tables # Check if table exists in target target_ids = set() table_exists_in_target = False try: target_result = target_session.execute(text(f"SELECT {pk_column} FROM {table_name}")) target_ids = set(row[0] for row in target_result.fetchall()) table_exists_in_target = True except Exception: # Table doesn't exist in target or is empty - all records are new table_exists_in_target = False # Find conflicts (IDs that exist in both) conflicting_ids = source_ids & target_ids new_ids = source_ids - target_ids # Store results for this table result['tables'][table_name] = { 'conflicts': len(conflicting_ids), 'new_records': len(new_ids), 'conflicting_ids': sorted(list(conflicting_ids)), 'exists_in_target': table_exists_in_target, 'total_source_records': len(source_ids) } # Update totals result['total_conflicts'] += len(conflicting_ids) result['total_new_records'] += len(new_ids) if len(conflicting_ids) > 0: result['has_conflicts'] = True logger.info(f"⚠️ {table_name}: {len(conflicting_ids)} conflicts, {len(new_ids)} new") else: logger.info(f"✓ {table_name}: {len(new_ids)} new records, no conflicts") except Exception as e: error_msg = f"Error analyzing {table_name}: {str(e)}" logger.error(error_msg) result['errors'].append(error_msg) 
finally: source_session.close() target_session.close() logger.info(f"Conflict detection complete: {result['total_conflicts']} conflicts, {result['total_new_records']} new records") except Exception as e: error_msg = f"Conflict detection failed: {str(e)}" logger.error(error_msg) result['errors'].append(error_msg) return result @staticmethod def _reset_postgresql_sequences(target_engine) -> Dict[str, Any]: """ Reset PostgreSQL sequences to max(id) + 1 after migration This fixes the issue where sequences are not updated during data migration, causing IntegrityError when inserting new records. Args: target_engine: Target database engine Returns: Dictionary with reset statistics """ stats = {'sequences_reset': 0, 'errors': []} # Only reset if target is PostgreSQL if not str(target_engine.url).startswith('postgresql'): return stats # Define tables with auto-increment primary keys (SERIAL columns) # Format: (table_name, id_column_name, sequence_name) sequences = [ ('site_table', 'id_sito', 'site_table_id_sito_seq'), ('inventario_materiali_table', 'id_invmat', 'inventario_materiali_table_id_invmat_seq'), ('media_table', 'id_media', 'media_table_id_media_seq'), ('users_table', 'id_user', 'users_table_id_user_seq'), ('datazioni_table', 'id_datazione', 'datazioni_table_id_datazione_seq'), ('periodizzazione_table', 'id_periodizzazione', 'periodizzazione_table_id_periodizzazione_seq'), ('us_relationships_table', 'id_relationship', 'us_relationships_table_id_relationship_seq'), ('pyarchinit_thesaurus_sigle', 'id_thesaurus_sigle', 'pyarchinit_thesaurus_sigle_id_thesaurus_sigle_seq'), ('harris_matrix_table', 'id_matrix', 'harris_matrix_table_id_matrix_seq'), ('periods_table', 'id', 'periods_table_id_seq'), ('extended_matrix_nodes_table', 'id', 'extended_matrix_nodes_table_id_seq'), ] with target_engine.connect() as conn: for table_name, id_column, sequence_name in sequences: try: # Get max ID from table result = conn.execute(text(f"SELECT MAX({id_column}) FROM {table_name}")) 
max_id = result.scalar() if max_id is not None: # Reset sequence to max_id + 1 conn.execute(text(f"SELECT setval('{sequence_name}', :max_id, true)"), {'max_id': max_id}) conn.commit() stats['sequences_reset'] += 1 logger.info(f"Reset sequence {sequence_name} to {max_id + 1}") except Exception as e: error_msg = f"Failed to reset sequence for {table_name}: {str(e)}" logger.warning(error_msg) stats['errors'].append(error_msg) # Continue with other sequences return stats @staticmethod def _convert_boolean_fields(table_name: str, row_data: Dict[str, Any], target_engine) -> Dict[str, Any]: """ Convert integer boolean values (0/1) to Python boolean (False/True) for PostgreSQL Args: table_name: Name of the table row_data: Row data dictionary target_engine: Target database engine Returns: Row data with converted boolean fields """ # Only convert if target is PostgreSQL if not str(target_engine.url).startswith('postgresql'): return row_data # Define boolean columns for each table boolean_columns = { 'site_table': ['find_check'], 'media_table': ['is_primary', 'is_public'], 'harris_matrix_table': ['is_final', 'is_public'], 'users_table': ['is_active', 'is_superuser'] } # Get boolean columns for this table bool_cols = boolean_columns.get(table_name, []) # Convert integer values to boolean converted_data = row_data.copy() for col in bool_cols: if col in converted_data and converted_data[col] is not None: # Convert 0/1 to False/True if isinstance(converted_data[col], int): converted_data[col] = bool(converted_data[col]) return converted_data @staticmethod def _migrate_table(table_name: str, source_session_maker, target_session_maker, merge_strategy: str = 'skip') -> int: """ Migrate data from one table to another with conflict resolution Args: table_name: Name of the table to migrate source_session_maker: Source database session maker target_session_maker: Target database session maker merge_strategy: How to handle ID conflicts: 'skip', 'overwrite', or 'renumber' Returns: Number 
of rows copied/updated """ source_session = source_session_maker() target_session = target_session_maker() rows_processed = 0 rows_skipped = 0 rows_updated = 0 rows_renumbered = 0 try: # Check if table exists in source try: source_session.execute(text(f"SELECT 1 FROM {table_name} LIMIT 1")) except Exception: # Table doesn't exist in source, skip it return 0 # Get primary key column for this table from sqlalchemy import inspect source_engine = source_session.get_bind() inspector = inspect(source_engine) pk_columns = inspector.get_pk_constraint(table_name).get('constrained_columns', []) if not pk_columns: logger.warning(f"No primary key found for {table_name}, using simple INSERT strategy") pk_column = None else: pk_column = pk_columns[0] # Use first PK column logger.debug(f"Using primary key column '{pk_column}' for {table_name}") # Get all rows from source table result = source_session.execute(text(f"SELECT * FROM {table_name}")) rows = result.fetchall() if not rows: return 0 # Get column names column_names = list(rows[0]._mapping.keys()) # Get target engine for boolean conversion target_engine = target_session.get_bind() # Get existing IDs in target (for conflict detection) existing_ids = set() if pk_column: try: existing_result = target_session.execute( text(f"SELECT {pk_column} FROM {table_name}") ) existing_ids = {row[0] for row in existing_result.fetchall()} logger.debug(f"Found {len(existing_ids)} existing records in target {table_name}") except Exception: # Target table might be empty or not exist existing_ids = set() # Find max ID in target (for renumber strategy) max_id = 0 if pk_column and merge_strategy == 'renumber': try: max_result = target_session.execute( text(f"SELECT MAX({pk_column}) FROM {table_name}") ) max_id = max_result.scalar() or 0 logger.debug(f"Max ID in target {table_name}: {max_id}") except Exception: max_id = 0 # Process each row for row in rows: row_data = dict(row._mapping) # Convert boolean fields if needed (SQLite -> PostgreSQL) 
row_data = ImportExportService._convert_boolean_fields( table_name, row_data, target_engine ) # Check for conflict record_id = row_data.get(pk_column) if pk_column else None has_conflict = pk_column and record_id in existing_ids try: if has_conflict: # Handle conflict based on strategy if merge_strategy == 'skip': # Skip this record rows_skipped += 1 logger.debug(f"Skipping {table_name} ID {record_id} (already exists)") continue elif merge_strategy == 'overwrite': # Update existing record set_clause = ', '.join([f"{col} = :{col}" for col in column_names if col != pk_column]) update_query = text(f""" UPDATE {table_name} SET {set_clause} WHERE {pk_column} = :_pk_value """) # Add PK value for WHERE clause update_params = row_data.copy() update_params['_pk_value'] = record_id target_session.execute(update_query, update_params) rows_updated += 1 rows_processed += 1 logger.debug(f"Updated {table_name} ID {record_id}") elif merge_strategy == 'renumber': # Generate new ID and insert max_id += 1 row_data[pk_column] = max_id existing_ids.add(max_id) # Track new ID # Build INSERT query columns = ', '.join(column_names) placeholders = ', '.join([f':{col}' for col in column_names]) insert_query = text(f""" INSERT INTO {table_name} ({columns}) VALUES ({placeholders}) """) target_session.execute(insert_query, row_data) rows_renumbered += 1 rows_processed += 1 logger.debug(f"Renumbered {table_name} ID {record_id} -> {max_id}") else: # No conflict, insert normally columns = ', '.join(column_names) placeholders = ', '.join([f':{col}' for col in column_names]) insert_query = text(f""" INSERT INTO {table_name} ({columns}) VALUES ({placeholders}) """) target_session.execute(insert_query, row_data) rows_processed += 1 # Track new ID if applicable if pk_column and record_id: existing_ids.add(record_id) except Exception as e: # Log error but continue with other rows logger.warning(f"Failed to process row in {table_name}: {str(e)}") target_session.rollback() continue # Commit all changes 
for this table target_session.commit() # Log summary if rows_skipped > 0 or rows_updated > 0 or rows_renumbered > 0: logger.info( f"{table_name} merge summary: " f"{rows_processed} processed, " f"{rows_skipped} skipped, " f"{rows_updated} updated, " f"{rows_renumbered} renumbered" ) return rows_processed except Exception as e: target_session.rollback() logger.error(f"Error migrating table {table_name}: {str(e)}") raise finally: source_session.close() target_session.close()