import json
import os
from itertools import count
from pathlib import Path

from cognos7.csv_column_types import get_column_types

# Maps schema.ini column types to the dbo.* conversion functions used in the
# generated export views.
col_type_convert = {
    "varchar(20)": "to_varchar20",
    "varchar(50)": "to_varchar50",
    "varchar(100)": "to_varchar100",
    "varchar(255)": "to_varchar255",
    "decimal(18,8)": "to_decimal",
    "int": "to_int",
    "datetime": "to_datetime",
    "date": "to_datetime",
}


def schema_convert(base_dir: str) -> None:
    base_dir = Path(base_dir).absolute()
    schema_file = base_dir / "schema.ini"
    with open(schema_file, "r", encoding="latin-1") as frh:
        schema = frh.read()
    # schema = schema.replace('\n\n', '\n').replace('\n\n', '\n')
    tables = dict([convert_table_info(t) for t in schema[1:].split("\n[")])

    tables_with_columns = {}
    for table, details in tables.items():
        table_file = base_dir / table
        if not table or "schema.ini" in table or not table_file.exists():
            continue
        col_names = [c.split(" ")[0] for c in details["columns"].values()]
        tables_with_columns[table] = col_names
    create_format_xml_files(str(base_dir), tables_with_columns)


def convert_table_info(table_info: str) -> tuple[str, dict]:
    info = table_info.split("]\n")
    if len(info) < 2:
        return ("", {"columns": {}})
    details: dict = {"columns": {}}
    for key, value in [row.split("=") for row in info[1].split("\n") if "=" in row]:
        # Skip "ColNameHeader=..."; keep the "ColN=name type" entries.
        if key.lower() != "colnameheader" and key.lower()[:3] == "col":
            details["columns"][key[3:]] = value
    return (info[0], details)


def create_format_xml_files(base_dir: str, tables_with_columns: dict[str, list[str]]) -> None:
    for table, columns in tables_with_columns.items():
        table_file = table
        if table[-4:] != ".csv":
            table_file = f"{table}.csv"
        format_file = f"{base_dir}\\{table_file}.xml"

        record = []
        row = []
        for i, col_name in enumerate(columns, 1):
            # Standard BCP XML format-file markup; MAX_LENGTH="255" is an
            # assumption, the original attribute values were lost.
            record.append(f'  <FIELD ID="{i}" xsi:type="CharTerm" TERMINATOR=";" MAX_LENGTH="255"/>')
            col_name_escape = (
                col_name.replace("&", "&amp;")
                .encode("ascii", "xmlcharrefreplace")
                .decode()
                .replace("<", "&lt;")
                .replace(">", "&gt;")
            )
            row.append(f'  <COLUMN SOURCE="{i}" NAME="{col_name_escape}" xsi:type="SQLVARYCHAR"/>')
        # The last field is terminated by the line break instead of ";".
        record[-1] = record[-1].replace(";", "\\r\\n")

        with open(format_file, "w") as fwh:
            fwh.write(
                '<?xml version="1.0"?>\n'
                '<BCPFORMAT xmlns="http://schemas.microsoft.com/sqlserver/2004/bulkload/format" '
                'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">\n'
                " <RECORD>\n"
            )
            fwh.write("\n".join(record))
            fwh.write("\n </RECORD>\n <ROW>\n")
            fwh.write("\n".join(row))
            fwh.write("\n </ROW>\n</BCPFORMAT>")


def create_format_xml_from_folder(base_dir: str) -> None:
    system = Path(base_dir).parent.name
    format_folder = f"{base_dir}\\Format"
    os.makedirs(format_folder, exist_ok=True)
    sql_folder = f"{base_dir}\\SQL"
    os.makedirs(f"{sql_folder}\\views_export", exist_ok=True)
    os.makedirs(f"{sql_folder}\\views_load", exist_ok=True)
    os.makedirs(f"{sql_folder}\\exec_drop_create", exist_ok=True)
    os.makedirs(f"{sql_folder}\\exec_update", exist_ok=True)

    tables_with_columns = get_tables_with_columns_from_folder(base_dir)
    with open(f"{format_folder}\\tables.json", "w") as fwh:
        json.dump(tables_with_columns, fwh, indent=2)

    create_format_xml_files(format_folder, tables_with_columns)
    create_openrowset_sql_files(base_dir, format_folder, sql_folder, tables_with_columns)
    create_load_sql_files(sql_folder, tables_with_columns)
    create_drop_create_sql_files(sql_folder, tables_with_columns, system)
    create_update_sql_files(sql_folder, tables_with_columns, system)
    create_sql_bat_files(sql_folder)


def create_openrowset_sql_files(
    base_dir: str, format_folder: str, sql_folder: str, tables_with_columns: dict[str, list[str]]
) -> None:
    for table, columns in tables_with_columns.items():
        csv_file = f"{base_dir}\\{table}.csv"
        format_file = f"{format_folder}\\{table}.csv.xml"
        sql_file = f"{sql_folder}\\views_export\\export_csv.{table}.sql"
        col_types = get_column_types(csv_file, columns)
        select_fields = [get_select_statement(col, col_type) for col, col_type in col_types.items()]
        query = (
            "SELECT "
            + ",\n\t".join(select_fields)
            + "\nFROM OPENROWSET(\n"
            + f"\tBULK '{csv_file}',\n"
            + f"\tFORMATFILE = '{format_file}',\n"
            + "\tFIRSTROW = 2\n"
            + ") AS T1"
        )
        create_view = get_create_view("export_csv", table, query)
        with open(sql_file, "w", encoding="latin-1") as fwh:
            fwh.write(create_view)


def get_create_view(schema: str, table: str, query: str) -> str:
    create_view = (
        "SET QUOTED_IDENTIFIER ON\nGO\n\nSET ANSI_NULLS ON\nGO\n\n"
        + f"CREATE\n\tOR\n\nALTER VIEW [{schema}].[{table}]\nAS\n{query}\n"
        + "GO\n\nSET QUOTED_IDENTIFIER OFF\nGO\n\nSET ANSI_NULLS OFF\nGO\n\n\nGO\n\n\n"
    )
    return create_view


def get_select_statement(col: str, col_type: str) -> str:
    convert = col_type_convert.get(col_type)
    if convert:
        return f"dbo.{convert}(T1.[{col}]) AS [{col}]"
    return f"T1.[{col}]"


def create_load_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]]) -> None:
    for table, columns in tables_with_columns.items():
        sql_file = f"{sql_folder}\\views_load\\load.{table}.sql"
        cols = [f"[{c}]" for c in columns]
        query = "SELECT " + ",\n\t".join(cols) + f"\nFROM [export_csv].[{table}]"
        create_view = get_create_view("load", table, query)
        with open(sql_file, "w", encoding="latin-1") as fwh:
            fwh.write(create_view)


def create_drop_create_sql_files(
    sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA"
) -> None:
    for table in tables_with_columns.keys():
        sql_file = f"{sql_folder}\\exec_drop_create\\{table}.sql"
        query = (
            f"USE [GC]\nGO\n\nDROP TABLE IF EXISTS [{system.lower()}].[{table}]\nGO\n\n"
            + f"SELECT *\nINTO [{system.lower()}].[{table}]\nFROM [{system}].[load].[{table}]"
        )
        with open(sql_file, "w", encoding="latin-1") as fwh:
            fwh.write(query)


def create_update_sql_files(
    sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA"
) -> None:
    for table in tables_with_columns.keys():
        sql_file = f"{sql_folder}\\exec_update\\{table}.sql"
        query = (
            f"USE [GC]\nGO\n\nTRUNCATE TABLE [{system.lower()}].[{table}]\nGO\n\n"
            + f"INSERT INTO [{system.lower()}].[{table}]\n"
            + f"SELECT *\nFROM [{system}].[load].[{table}]"
        )
        with open(sql_file, "w", encoding="latin-1") as fwh:
            fwh.write(query)


def get_tables_with_columns_from_folder(base_dir: str) -> dict[str, list[str]]:
    tables_with_columns = {}
    for csv_file in Path(base_dir).glob("*.csv"):
        table_name = csv_file.stem
        with open(csv_file, "r", encoding="latin-1") as frh:
            cols = frh.readline().strip("\n").split(";")

        # De-duplicate header names case-insensitively by appending _1, _2, ...
        cols_unique = []
        cols_unique_lower = []
        for c in cols:
            c1 = c.strip('"')
            if c1.lower() not in cols_unique_lower:
                cols_unique.append(c1)
                cols_unique_lower.append(c1.lower())
                continue
            for i in count(1):
                c2 = f"{c1}_{i}"
                if c2 not in cols and c2.lower() not in cols_unique_lower:
                    cols_unique.append(c2)
                    cols_unique_lower.append(c2.lower())
                    break
        tables_with_columns[table_name] = cols_unique
    return tables_with_columns


def create_sql_bat_files(sql_folder: str) -> None:
    tasks_dir = Path(sql_folder).parent.parent.parent.parent / "Tasks" / "scripts"
    header = f'@call "{tasks_dir}\\config.bat" 0 > nul'
    folder_list = [
        "views_export",
        "views_load",
        "exec_drop_create",
        "exec_update",
    ]
    for f in folder_list:
        folder = f"{sql_folder}\\{f}"
        bat_file = f"{folder}\\{f}.bat"
        with open(bat_file, "w", encoding="cp850") as fwh:
            fwh.write(header + "\n\n")
            fwh.write(f"echo {folder}\n")
            for sql_file in Path(folder).glob("*.sql"):
                fwh.write(f" call sqlexec2.bat {sql_file}\n")
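# ---------------------------------------------------------------------------
# Illustrative output, not authoritative: for a hypothetical "invoices.csv"
# whose header line is 'id;text', create_format_xml_files() writes an
# invoices.csv.xml along the lines of the sketch below (standard BCP XML
# format-file layout; attribute values such as MAX_LENGTH are assumptions,
# see the note in the function):
#
#   <?xml version="1.0"?>
#   <BCPFORMAT xmlns="http://schemas.microsoft.com/sqlserver/2004/bulkload/format"
#              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
#    <RECORD>
#     <FIELD ID="1" xsi:type="CharTerm" TERMINATOR=";" MAX_LENGTH="255"/>
#     <FIELD ID="2" xsi:type="CharTerm" TERMINATOR="\r\n" MAX_LENGTH="255"/>
#    </RECORD>
#    <ROW>
#     <COLUMN SOURCE="1" NAME="id" xsi:type="SQLVARYCHAR"/>
#     <COLUMN SOURCE="2" NAME="text" xsi:type="SQLVARYCHAR"/>
#    </ROW>
#   </BCPFORMAT>
# ---------------------------------------------------------------------------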
if __name__ == "__main__":
    # schema_convert("C:\\GlobalCube_LOCOSOFT\\GCStruct_SKR51\\Kontenrahmen")
    create_format_xml_from_folder("C:\\GlobalCube_LOCOSOFT\\System\\LOCOSOFT\\Export")
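# ---------------------------------------------------------------------------
# Illustrative output, not authoritative: for the same hypothetical "invoices"
# table, assuming get_column_types() reports {"id": "int", "text": "varchar(50)"},
# create_openrowset_sql_files() writes SQL\views_export\export_csv.invoices.sql
# containing a view of this shape (wrapped in the SET QUOTED_IDENTIFIER /
# SET ANSI_NULLS / GO batches emitted by get_create_view()):
#
#   CREATE
#       OR
#
#   ALTER VIEW [export_csv].[invoices]
#   AS
#   SELECT dbo.to_int(T1.[id]) AS [id],
#       dbo.to_varchar50(T1.[text]) AS [text]
#   FROM OPENROWSET(
#       BULK '...\invoices.csv',
#       FORMATFILE = '...\Format\invoices.csv.xml',
#       FIRSTROW = 2
#   ) AS T1
# ---------------------------------------------------------------------------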