@@ -1,7 +1,23 @@
+import json
+import os
+from itertools import count
 from pathlib import Path
 
+from csv_column_types import get_column_types
 
-def schema_convert(base_dir: str):
+col_type_convert = {
+    "varchar(20)": "to_varchar20",
+    "varchar(50)": "to_varchar50",
+    "varchar(100)": "to_varchar100",
+    "varchar(255)": "to_varchar255",
+    "decimal(18,8)": "to_decimal",
+    "int": "to_int",
+    "datetime": "to_datetime",
+    "date": "to_datetime",
+}
+
+
+def schema_convert(base_dir: str) -> None:
     base_dir = Path(base_dir).absolute()
     schema_file = base_dir / "schema.ini"
     with open(schema_file, "r", encoding="latin-1") as frh:
@@ -19,10 +35,10 @@ def schema_convert(base_dir: str):
         col_names = [c.split(" ")[0] for c in details["columns"].values()]
         tables_with_columns[table] = col_names
 
-    create_format_xml_files(base_dir, tables_with_columns)
+    create_format_xml_files(str(base_dir), tables_with_columns)
 
 
-def convert_table_info(table_info):
+def convert_table_info(table_info: str) -> tuple[str, dict]:
     info = table_info.split("]\n")
     if len(info) < 2:
         return ("", "")
@@ -34,9 +50,12 @@ def convert_table_info(table_info):
     return (info[0], details)
 
 
-def create_format_xml_files(base_dir, tables_with_columns: dict[str, list[str]]):
+def create_format_xml_files(base_dir: str, tables_with_columns: dict[str, list[str]]) -> None:
     for table, columns in tables_with_columns.items():
-        format_file = base_dir / (table + ".xml")
+        table_file = table
+        if not table.endswith(".csv"):
+            table_file = f"{table}.csv"
+        format_file = f"{base_dir}\\{table_file}.xml"
 
         record = []
         row = []
@@ -45,7 +64,8 @@ def create_format_xml_files(base_dir, tables_with_columns: dict[str, list[str]])
             record.append(
                 f' <FIELD ID="{i}" xsi:type="CharTerm" TERMINATOR=";" MAX_LENGTH="255" COLLATION="SQL_Latin1_General_CP1_CI_AS"/>'
             )
-            row.append(f' <COLUMN SOURCE="{i}" NAME="{col_name}" xsi:type="SQLVARYCHAR"/>')
+            col_name_escape = col_name.encode("ascii", "xmlcharrefreplace").decode()
+            row.append(f' <COLUMN SOURCE="{i}" NAME="{col_name_escape}" xsi:type="SQLVARYCHAR"/>')
         record[-1] = record[-1].replace(";", "\\r\\n")
 
         with open(format_file, "w") as fwh:
@@ -59,5 +79,126 @@ def create_format_xml_files(base_dir, tables_with_columns: dict[str, list[str]])
             fwh.write("\n </ROW>\n</BCPFORMAT>")
 
 
+def create_format_xml_from_folder(base_dir: str) -> None:
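+    # Build the Format and SQL output folders, then write format XML files and SQL view scripts for every CSV in base_dir.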
+    format_folder = f"{base_dir}\\Format"
+    os.makedirs(format_folder, exist_ok=True)
+    sql_folder = f"{base_dir}\\SQL"
+    os.makedirs(f"{sql_folder}\\views_export", exist_ok=True)
+    os.makedirs(f"{sql_folder}\\views_load", exist_ok=True)
+    os.makedirs(f"{sql_folder}\\exec_drop_create", exist_ok=True)
+    os.makedirs(f"{sql_folder}\\exec_update", exist_ok=True)
+    tables_with_columns = get_tables_with_columns_from_folder(base_dir)
+    with open(f"{format_folder}\\tables.json", "w") as fwh:
+        json.dump(tables_with_columns, fwh, indent=2)
+    create_format_xml_files(format_folder, tables_with_columns)
+    create_openrowset_sql_files(base_dir, format_folder, sql_folder, tables_with_columns)
+    create_load_sql_files(sql_folder, tables_with_columns)
+    create_drop_create_sql_files(sql_folder, tables_with_columns)
+    create_update_sql_files(sql_folder, tables_with_columns)
+
+
+def create_openrowset_sql_files(
+    base_dir: str, format_folder: str, sql_folder: str, tables_with_columns: dict[str, list[str]]
+) -> None:
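+    # One export view per CSV: SELECT with per-column conversions over OPENROWSET(BULK ...) using the matching format file.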
+    for table, columns in tables_with_columns.items():
+        csv_file = f"{base_dir}\\{table}.csv"
+        format_file = f"{format_folder}\\{table}.csv.xml"
+        sql_file = f"{sql_folder}\\views_export\\export_csv.{table}.sql"
+
+        col_types = get_column_types(csv_file, columns)
+        select_fields = [get_select_statement(col, col_type) for col, col_type in col_types.items()]
+        query = (
+            "SELECT "
+            + ",\n\t".join(select_fields)
+            + "\nFROM OPENROWSET(\n"
+            + f"\tBULK '{csv_file}',\n"
+            + f"\tFORMATFILE = '{format_file}',\n"
+            + "\tFIRSTROW = 2\n"
+            + ") AS T1"
+        )
+
+        create_view = get_create_view("export_csv", table, query)
+        with open(sql_file, "w", encoding="latin-1") as fwh:
+            fwh.write(create_view)
+
+
+def get_create_view(schema: str, table: str, query: str) -> str:
+    create_view = (
+        "SET QUOTED_IDENTIFIER ON\nGO\n\nSET ANSI_NULLS ON\nGO\n\n"
+        + f"CREATE\n\tOR\n\nALTER VIEW [{schema}].[{table}]\nAS\n{query}\n"
+        + "GO\n\nSET QUOTED_IDENTIFIER OFF\nGO\n\nSET ANSI_NULLS OFF\nGO\n\n\nGO\n\n\n"
+    )
+    return create_view
+
+
+def get_select_statement(col: str, col_type: str) -> str:
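+    # Wrap the column in its dbo.* conversion function when its detected type has an entry in col_type_convert.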
+    convert = col_type_convert.get(col_type)
+    if convert:
+        return f"dbo.{convert}(T1.[{col}]) AS [{col}]"
+    return f"T1.[{col}]"
+
+
+def create_load_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]]):
+    for table, columns in tables_with_columns.items():
+        sql_file = f"{sql_folder}\\views_load\\load.{table}.sql"
+        cols = [f"[{c}]" for c in columns]
+        query = "SELECT " + ",\n\t".join(cols) + f"\nFROM [export_csv].[{table}]"
+        create_view = get_create_view("load", table, query)
+        with open(sql_file, "w", encoding="latin-1") as fwh:
+            fwh.write(create_view)
+
+
+def create_drop_create_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA"):
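+    # One script per table: drop the target table and recreate it from the corresponding load view via SELECT ... INTO.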
+    for table in tables_with_columns.keys():
+        sql_file = f"{sql_folder}\\exec_drop_create\\{table}.sql"
+        query = (
+            f"USE [GC]\nGO\n\nDROP TABLE IF EXISTS [{system.lower()}].[{table}]\nGO\n\n"
+            + f"SELECT *\nINTO [{system.lower()}].[{table}]\nFROM [{system}].[load].[{table}]"
+        )
+        with open(sql_file, "w", encoding="latin-1") as fwh:
+            fwh.write(query)
+
+
+def create_update_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA"):
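+    # One script per table: truncate the target table and refill it from the corresponding load view.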
+    for table in tables_with_columns.keys():
+        sql_file = f"{sql_folder}\\exec_update\\{table}.sql"
+        query = (
+            f"USE [GC]\nGO\n\nTRUNCATE TABLE [{system.lower()}].[{table}]\nGO\n\n"
+            + f"INSERT INTO [{system.lower()}].[{table}]\n"
+            + f"SELECT *\nFROM [{system}].[load].[{table}]"
+        )
+        with open(sql_file, "w", encoding="latin-1") as fwh:
+            fwh.write(query)
+
+
+def get_tables_with_columns_from_folder(base_dir: str) -> dict[str, list[str]]:
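+    # Read each CSV header row (semicolon-separated) and de-duplicate column names by appending _1, _2, ...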
+    tables_with_columns = {}
+
+    for csv_file in Path(base_dir).glob("*.csv"):
+        table_name = csv_file.stem
+        with open(csv_file, "r", encoding="latin-1") as frh:
+            cols = frh.readline().strip("\n").split(";")
+        cols_unique = []
+        for c in cols:
+            c1 = c.strip('"')
+            if c1 not in cols_unique:
+                cols_unique.append(c1)
+                continue
+            for i in count(1):
+                c2 = f"{c1}_{i}"
+                if c2 not in cols and c2 not in cols_unique:
+                    cols_unique.append(c2)
+                    break
+        tables_with_columns[table_name] = cols_unique
+    return tables_with_columns
+
+
 if __name__ == "__main__":
-    schema_convert("C:\\GlobalCube_LOCOSOFT\\GCStruct_SKR51\\Kontenrahmen")
+    # schema_convert("C:\\GlobalCube_LOCOSOFT\\GCStruct_SKR51\\Kontenrahmen")
+    create_format_xml_from_folder("C:\\GlobalCube_LOCOSOFT\\System\\LOCOSOFT\\Export")