import json
import os
from itertools import count
from pathlib import Path
from cognos7.csv_column_types import get_column_types
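# Maps schema.ini/CSV column types to dbo.to_* conversion functions,
# which are assumed to exist in the target SQL Server database.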
col_type_convert = {
"varchar(20)": "to_varchar20",
"varchar(50)": "to_varchar50",
"varchar(100)": "to_varchar100",
"varchar(255)": "to_varchar255",
"decimal(18,8)": "to_decimal",
"int": "to_int",
"datetime": "to_datetime",
"date": "to_datetime",
}
def schema_convert(base_dir: str) -> None:
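    """Parse <base_dir>/schema.ini and write a BCP XML format file for each
    table whose CSV file exists in base_dir."""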
base_dir = Path(base_dir).absolute()
schema_file = base_dir / "schema.ini"
with open(schema_file, "r", encoding="latin-1") as frh:
schema = frh.read()
# schema = schema.replace('\n\n', '\n').replace('\n\n', '\n')
tables = dict([convert_table_info(t) for t in schema[1:].split("\n[")])
tables_with_columns = {}
for table, details in tables.items():
table_file = base_dir / table
        if not table or "schema.ini" in table or not table_file.exists():
continue
col_names = [c.split(" ")[0] for c in details["columns"].values()]
tables_with_columns[table] = col_names
create_format_xml_files(str(base_dir), tables_with_columns)
def convert_table_info(table_info: str) -> tuple[str, dict]:
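    """Split one "[table]" block of schema.ini into (table_name, details),
    collecting the ColN=... entries under details["columns"]."""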
info = table_info.split("]\n")
    if len(info) < 2:
        return ("", {})
details = {}
details["columns"] = {}
    for key, value in [row.split("=", 1) for row in info[1].split("\n") if "=" in row]:
if key.lower() != "colnameheader" and key.lower()[:3] == "col":
details["columns"][key[3:]] = value
return (info[0], details)
def create_format_xml_files(base_dir: str, tables_with_columns: dict[str, list[str]]) -> None:
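    """Write one SQL Server XML format file (<table>.csv.xml) per table:
    a FIELD entry per ";"-terminated input field plus a matching COLUMN
    entry, with the final field terminated by the CRLF row delimiter."""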
for table, columns in tables_with_columns.items():
table_file = table
        if not table.endswith(".csv"):
            table_file = f"{table}.csv"
format_file = f"{base_dir}\\{table_file}.xml"
record = []
row = []
for i, col_name in enumerate(columns, 1):
            # One FIELD per ";"-terminated input field (minimal attribute set)
            record.append(
                f'  <FIELD ID="{i}" xsi:type="CharTerm" TERMINATOR=";"/>'
            )
            # Escape characters that are invalid inside an XML attribute value
            col_name_escape = (
                col_name.replace("&", "&amp;")
                .encode("ascii", "xmlcharrefreplace")
                .decode()
                .replace("<", "&lt;")
                .replace(">", "&gt;")
            )
            row.append(f'  <COLUMN SOURCE="{i}" NAME="{col_name_escape}" xsi:type="SQLVARYCHAR"/>')
        # The last field is terminated by the row delimiter "\r\n" instead of ";"
        record[-1] = record[-1].replace(";", "\\r\\n")
        with open(format_file, "w") as fwh:
            fwh.write(
                '<?xml version="1.0"?>\n'
                '<BCPFORMAT xmlns="http://schemas.microsoft.com/sqlserver/2004/bulkload/format"'
                ' xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">\n'
                ' <RECORD>\n'
            )
            fwh.write("\n".join(record))
            fwh.write("\n </RECORD>\n <ROW>\n")
            fwh.write("\n".join(row))
            fwh.write("\n </ROW>\n</BCPFORMAT>\n")
def create_format_xml_from_folder(base_dir: str) -> None:
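    """Scan base_dir for CSV files and generate everything needed to load
    them: format files, export/load views, drop-create and update scripts,
    and batch files to execute the SQL."""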
system = Path(base_dir).parent.name
format_folder = f"{base_dir}\\Format"
os.makedirs(format_folder, exist_ok=True)
sql_folder = f"{base_dir}\\SQL"
os.makedirs(f"{sql_folder}\\views_export", exist_ok=True)
os.makedirs(f"{sql_folder}\\views_load", exist_ok=True)
os.makedirs(f"{sql_folder}\\exec_drop_create", exist_ok=True)
os.makedirs(f"{sql_folder}\\exec_update", exist_ok=True)
tables_with_columns = get_tables_with_columns_from_folder(base_dir)
with open(f"{format_folder}\\tables.json", "w") as fwh:
json.dump(tables_with_columns, fwh, indent=2)
create_format_xml_files(format_folder, tables_with_columns)
create_openrowset_sql_files(base_dir, format_folder, sql_folder, tables_with_columns)
create_load_sql_files(sql_folder, tables_with_columns)
create_drop_create_sql_files(sql_folder, tables_with_columns, system)
create_update_sql_files(sql_folder, tables_with_columns, system)
create_sql_bat_files(sql_folder)
def create_openrowset_sql_files(
base_dir: str, format_folder: str, sql_folder: str, tables_with_columns: dict[str, list[str]]
) -> None:
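    """Create an [export_csv] view per table that reads its CSV via
    OPENROWSET(BULK ...) with the matching format file, converting columns
    through the dbo.to_* helpers where a type mapping exists."""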
for table, columns in tables_with_columns.items():
csv_file = f"{base_dir}\\{table}.csv"
format_file = f"{format_folder}\\{table}.csv.xml"
sql_file = f"{sql_folder}\\views_export\\export_csv.{table}.sql"
col_types = get_column_types(csv_file, columns)
select_fields = [get_select_statement(col, col_type) for col, col_type in col_types.items()]
query = (
"SELECT "
+ ",\n\t".join(select_fields)
+ "\nFROM OPENROWSET(\n"
+ f"\tBULK '{csv_file}',\n"
+ f"\tFORMATFILE = '{format_file}',\n"
+ "\tFIRSTROW = 2\n"
+ ") AS T1"
)
create_view = get_create_view("export_csv", table, query)
with open(sql_file, "w", encoding="latin-1") as fwh:
fwh.write(create_view)
def get_create_view(schema: str, table: str, query: str) -> str:
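    """Wrap a query in a CREATE OR ALTER VIEW batch, with the
    QUOTED_IDENTIFIER/ANSI_NULLS settings SQL Server expects for views."""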
create_view = (
"SET QUOTED_IDENTIFIER ON\nGO\n\nSET ANSI_NULLS ON\nGO\n\n"
        + f"CREATE OR ALTER VIEW [{schema}].[{table}]\nAS\n{query}\n"
        + "GO\n\nSET QUOTED_IDENTIFIER OFF\nGO\n\nSET ANSI_NULLS OFF\nGO\n"
)
return create_view
def get_select_statement(col: str, col_type: str) -> str:
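    """Build one select-list expression: wrap the column in its dbo.to_*
    conversion function if the type is mapped, else select it as-is."""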
convert = col_type_convert.get(col_type)
if convert:
return f"dbo.{convert}(T1.[{col}]) AS [{col}]"
return f"T1.[{col}]"
def create_load_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]]) -> None:
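    """Create a [load] view per table that selects all columns from the
    corresponding [export_csv] view."""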
for table, columns in tables_with_columns.items():
sql_file = f"{sql_folder}\\views_load\\load.{table}.sql"
cols = [f"[{c}]" for c in columns]
query = "SELECT " + ",\n\t".join(cols) + f"\nFROM [export_csv].[{table}]"
create_view = get_create_view("load", table, query)
with open(sql_file, "w", encoding="latin-1") as fwh:
fwh.write(create_view)
def create_drop_create_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA") -> None:
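    """Write one script per table that drops and re-creates the target
    table in the GC database via SELECT ... INTO from the [load] view."""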
    for table in tables_with_columns:
sql_file = f"{sql_folder}\\exec_drop_create\\{table}.sql"
query = (
f"USE [GC]\nGO\n\nDROP TABLE IF EXISTS [{system.lower()}].[{table}]\nGO\n\n"
+ f"SELECT *\nINTO [{system.lower()}].[{table}]\nFROM [{system}].[load].[{table}]"
)
with open(sql_file, "w", encoding="latin-1") as fwh:
fwh.write(query)
def create_update_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA") -> None:
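    """Write one script per table that truncates the target table in the
    GC database and re-fills it from the [load] view."""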
    for table in tables_with_columns:
sql_file = f"{sql_folder}\\exec_update\\{table}.sql"
query = (
f"USE [GC]\nGO\n\nTRUNCATE TABLE [{system.lower()}].[{table}]\nGO\n\n"
+ f"INSERT INTO [{system.lower()}].[{table}]\n"
+ f"SELECT *\nFROM [{system}].[load].[{table}]"
)
with open(sql_file, "w", encoding="latin-1") as fwh:
fwh.write(query)
def get_tables_with_columns_from_folder(base_dir: str) -> dict[str, list[str]]:
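    """Read the header line of every CSV in base_dir (";"-separated) and
    return {table: columns}, de-duplicating column names case-insensitively
    by appending _1, _2, ... to repeated names."""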
tables_with_columns = {}
for csv_file in Path(base_dir).glob("*.csv"):
table_name = csv_file.stem
with open(csv_file, "r", encoding="latin-1") as frh:
cols = frh.readline().strip("\n").split(";")
cols_unique = []
cols_unique_lower = []
for c in cols:
c1 = c.strip('"')
if c1.lower() not in cols_unique_lower:
cols_unique.append(c1)
cols_unique_lower.append(c1.lower())
continue
for i in count(1):
c2 = f"{c1}_{i}"
if c2 not in cols and c2.lower() not in cols_unique_lower:
cols_unique.append(c2)
cols_unique_lower.append(c2.lower())
break
tables_with_columns[table_name] = cols_unique
return tables_with_columns
def create_sql_bat_files(sql_folder: str) -> None:
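    """Write a .bat file per SQL subfolder that sources config.bat from the
    Tasks/scripts folder and then runs sqlexec2.bat for every .sql file."""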
tasks_dir = Path(sql_folder).parent.parent.parent.parent / "Tasks" / "scripts"
header = f'@call "{tasks_dir}\\config.bat" 0 > nul'
folder_list = [
"views_export",
"views_load",
"exec_drop_create",
"exec_update",
]
for f in folder_list:
folder = f"{sql_folder}\\{f}"
bat_file = f"{folder}\\{f}.bat"
with open(bat_file, "w", encoding="cp850") as fwh:
fwh.write(header + "\n\n")
fwh.write(f"echo {folder}\n")
for sql_file in Path(folder).glob("*.sql"):
fwh.write(f" call sqlexec2.bat {sql_file}\n")
if __name__ == "__main__":
# schema_convert("C:\\GlobalCube_LOCOSOFT\\GCStruct_SKR51\\Kontenrahmen")
create_format_xml_from_folder("C:\\GlobalCube_LOCOSOFT\\System\\LOCOSOFT\\Export")