123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231 |
- import json
- import os
- from itertools import count
- from pathlib import Path
- from cognos7.csv_column_types import get_column_types
# Maps a detected SQL column type to the name of the scalar conversion
# function that get_select_statement() wraps around the column as
# ``dbo.<function>(...)``. Types missing from this table are selected as-is.
col_type_convert = {
    **{f"varchar({size})": f"to_varchar{size}" for size in (20, 50, 100, 255)},
    "decimal(18,8)": "to_decimal",
    "int": "to_int",
    # Both temporal types share one converter.
    "datetime": "to_datetime",
    "date": "to_datetime",
}
def schema_convert(base_dir: str) -> None:
    """Create BCP format files for every table described in ``schema.ini``.

    Reads ``<base_dir>/schema.ini`` (latin-1), splits it into ``[table]``
    sections, and for each section whose table file exists inside *base_dir*
    collects the column names. The result is handed to
    ``create_format_xml_files`` which writes one format file per table into
    *base_dir*.

    Args:
        base_dir: Folder containing ``schema.ini`` and the table files.

    Raises:
        OSError: If ``schema.ini`` cannot be read.
    """
    base_path = Path(base_dir).absolute()
    schema = (base_path / "schema.ini").read_text(encoding="latin-1")

    # Every section starts with "[" at the beginning of a line. Prepending a
    # newline lets the first section be found the same way as all others and
    # discards anything (blank lines, stray text) in front of it.
    sections = ("\n" + schema).split("\n[")[1:]
    tables = dict(convert_table_info(section) for section in sections)

    tables_with_columns = {}
    for table, details in tables.items():
        # Sections that could not be parsed come back with an empty name
        # and/or empty details; "base_path / ''" is the folder itself and
        # would otherwise pass the existence check below.
        if not table or not details or "schema.ini" in table:
            continue
        if not (base_path / table).exists():
            continue
        # The column name is taken as the first space-separated token of the
        # column definition.
        # NOTE(review): quoted names containing spaces would be truncated
        # here - confirm against real schema.ini files.
        col_names = [c.split(" ")[0] for c in details["columns"].values()]
        tables_with_columns[table] = col_names
    create_format_xml_files(str(base_path), tables_with_columns)
def convert_table_info(table_info: str) -> tuple[str, dict]:
    """Parse one ``schema.ini`` section into ``(table_name, details)``.

    *table_info* is the section text without its leading ``[``, i.e.
    ``"name]\\nKey=Value\\n..."``. Only ``ColN=...`` entries are kept; they are
    stored in ``details["columns"]`` keyed by the text following ``Col``
    (``"Col1"`` -> ``"1"``). ``ColNameHeader`` is not a column and is skipped.

    Args:
        table_info: Raw section text.

    Returns:
        ``(name, {"columns": {...}})``, or ``("", {})`` when the text has no
        ``"]"`` line terminating the section name.
    """
    # Split on the FIRST "]\n" only: a later line ending in "]" must not cut
    # the section body short.
    name, separator, body = table_info.partition("]\n")
    if not separator:
        return ("", {})

    columns = {}
    for row in body.split("\n"):
        # Split on the first "=" only so values may themselves contain "=".
        key, equals, value = row.partition("=")
        if not equals:
            continue
        key_lower = key.lower()
        if key_lower.startswith("col") and key_lower != "colnameheader":
            columns[key[3:]] = value
    return (name, {"columns": columns})
def create_format_xml_files(base_dir: str, tables_with_columns: dict[str, list[str]]) -> None:
    """Write one SQL Server BCP XML format file per table into *base_dir*.

    The file is named ``<table>.csv.xml`` (``.csv`` is appended only when the
    table name does not already end with it). Every column becomes a
    ``CharTerm`` field terminated by ``;``; the last field of a record is
    terminated by ``\\r\\n`` instead. Column names are XML-escaped and
    non-ASCII characters are written as numeric character references.

    Args:
        base_dir: Existing folder that receives the format files.
        tables_with_columns: Table name -> ordered list of column names.

    Raises:
        OSError: If a format file cannot be written.
    """
    # Local import keeps this block self-contained (stdlib only).
    from xml.sax.saxutils import escape

    for table, columns in tables_with_columns.items():
        # Append ".csv" only if it is really missing (compare the END of the
        # name; extension case does not matter).
        table_file = table if table.lower().endswith(".csv") else f"{table}.csv"
        format_file = Path(base_dir) / f"{table_file}.xml"

        last_id = len(columns)
        record = []
        row = []
        for i, col_name in enumerate(columns, 1):
            # Fields are ";"-separated, the final one ends at the line break.
            # The terminator is written as the literal text \r\n.
            terminator = "\\r\\n" if i == last_id else ";"
            record.append(
                f' <FIELD ID="{i}" xsi:type="CharTerm" TERMINATOR="{terminator}" '
                'MAX_LENGTH="255" COLLATION="SQL_Latin1_General_CP1_CI_AS"/>'
            )
            # Escape &, <, > and the attribute delimiter first, then turn any
            # remaining non-ASCII character into &#NNN; so the file is ASCII.
            col_name_escape = (
                escape(col_name, {'"': "&quot;"}).encode("ascii", "xmlcharrefreplace").decode("ascii")
            )
            row.append(f' <COLUMN SOURCE="{i}" NAME="{col_name_escape}" xsi:type="SQLVARYCHAR"/>')

        # All content is ASCII at this point, so the encoding only pins the
        # result independent of the machine's locale.
        with open(format_file, "w", encoding="utf-8") as fwh:
            fwh.write(
                '<?xml version="1.0"?>\n<BCPFORMAT xmlns="http://schemas.microsoft.com/sqlserver/2004/bulkload/format" '
            )
            fwh.write('xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">\n <RECORD>\n')
            fwh.write("\n".join(record))
            fwh.write("\n </RECORD>\n <ROW>\n")
            fwh.write("\n".join(row))
            fwh.write("\n </ROW>\n</BCPFORMAT>")
def create_format_xml_from_folder(base_dir: str) -> None:
    """Generate format files, SQL scripts and batch files for a CSV folder.

    Creates ``Format`` and ``SQL`` sub-folders below *base_dir*, reads the
    column headers of all CSV files in *base_dir*, stores them as
    ``Format\\tables.json`` and then runs every generator of this module.
    The name of the parent folder of *base_dir* is used as the system name
    passed to the drop/create and update script generators.

    Args:
        base_dir: Folder containing the exported ``*.csv`` files.
    """
    system = Path(base_dir).parent.name
    format_folder = f"{base_dir}\\Format"
    sql_folder = f"{base_dir}\\SQL"

    # Output folders: one for the format files, four for the SQL scripts.
    os.makedirs(format_folder, exist_ok=True)
    for sub_folder in ("views_export", "views_load", "exec_drop_create", "exec_update"):
        os.makedirs(f"{sql_folder}\\{sub_folder}", exist_ok=True)

    tables_with_columns = get_tables_with_columns_from_folder(base_dir)
    tables_json = f"{format_folder}\\tables.json"
    with open(tables_json, "w") as json_out:
        json.dump(tables_with_columns, json_out, indent=2)

    # Generators, in dependency order of the produced SQL objects.
    create_format_xml_files(format_folder, tables_with_columns)
    create_openrowset_sql_files(base_dir, format_folder, sql_folder, tables_with_columns)
    create_load_sql_files(sql_folder, tables_with_columns)
    create_drop_create_sql_files(sql_folder, tables_with_columns, system)
    create_update_sql_files(sql_folder, tables_with_columns, system)
    create_sql_bat_files(sql_folder)
def create_openrowset_sql_files(
    base_dir: str, format_folder: str, sql_folder: str, tables_with_columns: dict[str, list[str]]
) -> None:
    """Write one ``export_csv`` view script per table.

    Each view selects from ``OPENROWSET(BULK ...)`` over the table's CSV file
    using the matching format file and skipping the header row
    (``FIRSTROW = 2``). Scripts go to ``<sql_folder>\\views_export``.

    Args:
        base_dir: Folder containing the CSV files.
        format_folder: Folder containing the ``<table>.csv.xml`` format files.
        sql_folder: Root folder for generated SQL scripts.
        tables_with_columns: Table name -> ordered list of column names.
    """
    for table, columns in tables_with_columns.items():
        csv_file = f"{base_dir}\\{table}.csv"
        format_file = f"{format_folder}\\{table}.csv.xml"

        # NOTE(review): get_column_types is assumed to return a mapping of
        # column name -> SQL type (it is iterated via .items()) - confirm.
        col_types = get_column_types(csv_file, columns)
        select_list = ",\n\t".join(
            get_select_statement(name, sql_type) for name, sql_type in col_types.items()
        )

        query = "\n".join(
            [
                "SELECT " + select_list,
                "FROM OPENROWSET(",
                f"\tBULK '{csv_file}',",
                f"\tFORMATFILE = '{format_file}',",
                "\tFIRSTROW = 2",
                ") AS T1",
            ]
        )

        view_sql = get_create_view("export_csv", table, query)
        target = f"{sql_folder}\\views_export\\export_csv.{table}.sql"
        with open(target, "w", encoding="latin-1") as out:
            out.write(view_sql)
def get_create_view(schema: str, table: str, query: str) -> str:
    """Return a T-SQL batch script that creates or alters ``[schema].[table]``.

    The view body is *query*. The statement is framed by batches that switch
    ``QUOTED_IDENTIFIER`` and ``ANSI_NULLS`` on before and off after it.
    """
    preamble = "SET QUOTED_IDENTIFIER ON\nGO\n\nSET ANSI_NULLS ON\nGO\n\n"
    statement = f"CREATE\n\tOR\n\nALTER VIEW [{schema}].[{table}]\nAS\n{query}\n"
    epilogue = "GO\n\nSET QUOTED_IDENTIFIER OFF\nGO\n\nSET ANSI_NULLS OFF\nGO\n\n\nGO\n\n\n"
    return "".join((preamble, statement, epilogue))
def get_select_statement(col: str, col_type: str) -> str:
    """Return the select-list expression for one column of alias ``T1``.

    If *col_type* has an entry in ``col_type_convert`` the column is wrapped
    in the corresponding ``dbo`` function and aliased back to its own name;
    otherwise the bare column reference is returned.
    """
    column_ref = f"T1.[{col}]"
    converter = col_type_convert.get(col_type)
    if not converter:
        return column_ref
    return f"dbo.{converter}({column_ref}) AS [{col}]"
def create_load_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]]):
    """Write one ``load`` view script per table.

    Each view selects all listed columns from ``[export_csv].[<table>]``.
    Scripts go to ``<sql_folder>\\views_load\\load.<table>.sql``.

    Args:
        sql_folder: Root folder for generated SQL scripts.
        tables_with_columns: Table name -> ordered list of column names.
    """
    for table, columns in tables_with_columns.items():
        select_list = ",\n\t".join(f"[{name}]" for name in columns)
        query = "SELECT " + select_list + f"\nFROM [export_csv].[{table}]"
        view_sql = get_create_view("load", table, query)

        target = f"{sql_folder}\\views_load\\load.{table}.sql"
        with open(target, "w", encoding="latin-1") as out:
            out.write(view_sql)
def create_drop_create_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA"):
    """Write one drop-and-recreate script per table.

    Each script switches to database ``GC``, drops ``[<system lower>].[<table>]``
    if it exists and recreates it via ``SELECT * INTO`` from
    ``[<system>].[load].[<table>]``. Scripts go to
    ``<sql_folder>\\exec_drop_create\\<table>.sql``.

    Args:
        sql_folder: Root folder for generated SQL scripts.
        tables_with_columns: Mapping whose keys are the table names.
        system: Source database name; its lower-case form is the target schema.
    """
    for table in tables_with_columns:
        target_schema = system.lower()
        script = "".join(
            [
                "USE [GC]\nGO\n\n",
                f"DROP TABLE IF EXISTS [{target_schema}].[{table}]\nGO\n\n",
                f"SELECT *\nINTO [{target_schema}].[{table}]\n",
                f"FROM [{system}].[load].[{table}]",
            ]
        )
        target = f"{sql_folder}\\exec_drop_create\\{table}.sql"
        with open(target, "w", encoding="latin-1") as out:
            out.write(script)
def create_update_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA"):
    """Write one truncate-and-reload script per table.

    Each script switches to database ``GC``, truncates
    ``[<system lower>].[<table>]`` and refills it with ``SELECT *`` from
    ``[<system>].[load].[<table>]``. Scripts go to
    ``<sql_folder>\\exec_update\\<table>.sql``.

    Args:
        sql_folder: Root folder for generated SQL scripts.
        tables_with_columns: Mapping whose keys are the table names.
        system: Source database name; its lower-case form is the target schema.
    """
    for table in tables_with_columns:
        target_schema = system.lower()
        script = "".join(
            [
                "USE [GC]\nGO\n\n",
                f"TRUNCATE TABLE [{target_schema}].[{table}]\nGO\n\n",
                f"INSERT INTO [{target_schema}].[{table}]\n",
                f"SELECT *\nFROM [{system}].[load].[{table}]",
            ]
        )
        target = f"{sql_folder}\\exec_update\\{table}.sql"
        with open(target, "w", encoding="latin-1") as out:
            out.write(script)
def get_tables_with_columns_from_folder(base_dir: str) -> dict[str, list[str]]:
    """Read the header row of every ``*.csv`` file directly in *base_dir*.

    The first line of each file (latin-1) is split on ``;`` and surrounding
    double quotes are removed from each name. Names are made unique
    case-insensitively: a repeated name gets the suffix ``_1``, ``_2``, ...
    choosing the first suffix that collides neither with a name already
    emitted nor with any other name present in the header.

    Args:
        base_dir: Folder to scan (not recursive).

    Returns:
        File stem -> ordered list of unique column names. Files are
        processed in sorted order so the result is deterministic.
    """
    tables_with_columns = {}
    for csv_file in sorted(Path(base_dir).glob("*.csv")):
        with open(csv_file, "r", encoding="latin-1") as frh:
            header = frh.readline().strip("\n")

        names = [cell.strip('"') for cell in header.split(";")]
        # Every name that occurs in the header, compared the same way the
        # uniqueness check compares them (unquoted, case-insensitive), so a
        # generated suffix never steals the name of a real later column.
        reserved = {name.lower() for name in names}

        cols_unique = []
        seen = set()
        for name in names:
            key = name.lower()
            if key not in seen:
                cols_unique.append(name)
                seen.add(key)
                continue
            for i in count(1):
                candidate = f"{name}_{i}"
                candidate_key = candidate.lower()
                if candidate_key not in reserved and candidate_key not in seen:
                    cols_unique.append(candidate)
                    seen.add(candidate_key)
                    break
        tables_with_columns[csv_file.stem] = cols_unique
    return tables_with_columns
def create_sql_bat_files(sql_folder: str) -> None:
    """Write one batch file per SQL sub-folder that executes its scripts.

    For each of the four script folders a ``<folder>\\<folder>.bat`` (cp850)
    is written. It first calls ``config.bat`` from the ``Tasks\\scripts``
    folder located four levels above *sql_folder*, echoes the folder path and
    then calls ``sqlexec2.bat`` once for every ``*.sql`` file in that folder.

    Args:
        sql_folder: Root folder containing the generated SQL sub-folders.
    """
    # Walk four levels up step by step (stops at the filesystem root).
    root = Path(sql_folder)
    for _ in range(4):
        root = root.parent
    tasks_dir = root / "Tasks" / "scripts"
    header = f'@call "{tasks_dir}\\config.bat" 0 > nul'

    for name in ("views_export", "views_load", "exec_drop_create", "exec_update"):
        folder = f"{sql_folder}\\{name}"
        with open(f"{folder}\\{name}.bat", "w", encoding="cp850") as bat:
            bat.write(header + "\n\n")
            bat.write(f"echo {folder}\n")
            bat.writelines(
                f" call sqlexec2.bat {sql_file}\n" for sql_file in Path(folder).glob("*.sql")
            )
if __name__ == "__main__":
    # Alternative entry point that works from a schema.ini instead of CSV headers:
    # schema_convert("C:\\GlobalCube_LOCOSOFT\\GCStruct_SKR51\\Kontenrahmen")
    export_dir = "C:\\GlobalCube_LOCOSOFT\\System\\LOCOSOFT\\Export"
    create_format_xml_from_folder(export_dir)
|