schema_ini_convert.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. import json
  2. import os
  3. from itertools import count
  4. from pathlib import Path
  5. from csv_column_types import get_column_types
  6. col_type_convert = {
  7. "varchar(20)": "to_varchar20",
  8. "varchar(50)": "to_varchar50",
  9. "varchar(100)": "to_varchar100",
  10. "varchar(255)": "to_varchar255",
  11. "decimal(18,8)": "to_decimal",
  12. "int": "to_int",
  13. "datetime": "to_datetime",
  14. "date": "to_datetime",
  15. }
  16. def schema_convert(base_dir: str) -> None:
  17. base_dir = Path(base_dir).absolute()
  18. schema_file = base_dir / "schema.ini"
  19. with open(schema_file, "r", encoding="latin-1") as frh:
  20. schema = frh.read()
  21. # schema = schema.replace('\n\n', '\n').replace('\n\n', '\n')
  22. tables = dict([convert_table_info(t) for t in schema[1:].split("\n[")])
  23. tables_with_columns = {}
  24. for table, details in tables.items():
  25. table_file = base_dir / table
  26. if "schema.ini" in table or not table_file.exists():
  27. continue
  28. col_names = [c.split(" ")[0] for c in details["columns"].values()]
  29. tables_with_columns[table] = col_names
  30. create_format_xml_files(str(base_dir), tables_with_columns)
  31. def convert_table_info(table_info: str) -> tuple[str, dict]:
  32. info = table_info.split("]\n")
  33. if len(info) < 2:
  34. return ("", "")
  35. details = {}
  36. details["columns"] = {}
  37. for key, value in [row.split("=") for row in info[1].split("\n") if "=" in row]:
  38. if key.lower() != "colnameheader" and key.lower()[:3] == "col":
  39. details["columns"][key[3:]] = value
  40. return (info[0], details)
  41. def create_format_xml_files(base_dir: str, tables_with_columns: dict[str, list[str]]) -> None:
  42. for table, columns in tables_with_columns.items():
  43. table_file = table
  44. if table[:-4] != ".csv":
  45. table_file = f"{table}.csv"
  46. format_file = f"{base_dir}\\{table_file}.xml"
  47. record = []
  48. row = []
  49. for i, col_name in enumerate(columns, 1):
  50. record.append(
  51. f' <FIELD ID="{i}" xsi:type="CharTerm" TERMINATOR=";" MAX_LENGTH="255" COLLATION="SQL_Latin1_General_CP1_CI_AS"/>'
  52. )
  53. col_name_escape = col_name.encode("ascii", "xmlcharrefreplace").decode()
  54. row.append(f' <COLUMN SOURCE="{i}" NAME="{col_name_escape}" xsi:type="SQLVARYCHAR"/>')
  55. record[-1] = record[-1].replace(";", "\\r\\n")
  56. with open(format_file, "w") as fwh:
  57. fwh.write(
  58. '<?xml version="1.0"?>\n<BCPFORMAT xmlns="http://schemas.microsoft.com/sqlserver/2004/bulkload/format" '
  59. )
  60. fwh.write('xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">\n <RECORD>\n')
  61. fwh.write("\n".join(record))
  62. fwh.write("\n </RECORD>\n <ROW>\n")
  63. fwh.write("\n".join(row))
  64. fwh.write("\n </ROW>\n</BCPFORMAT>")
  65. def create_format_xml_from_folder(base_dir: str) -> None:
  66. format_folder = f"{base_dir}\\Format"
  67. os.makedirs(format_folder, exist_ok=True)
  68. sql_folder = f"{base_dir}\\SQL"
  69. os.makedirs(f"{sql_folder}\\views_export", exist_ok=True)
  70. os.makedirs(f"{sql_folder}\\views_load", exist_ok=True)
  71. os.makedirs(f"{sql_folder}\\exec_drop_create", exist_ok=True)
  72. os.makedirs(f"{sql_folder}\\exec_update", exist_ok=True)
  73. tables_with_columns = get_tables_with_columns_from_folder(base_dir)
  74. with open(f"{format_folder}\\tables.json", "w") as fwh:
  75. json.dump(tables_with_columns, fwh, indent=2)
  76. create_format_xml_files(format_folder, tables_with_columns)
  77. create_openrowset_sql_files(base_dir, format_folder, sql_folder, tables_with_columns)
  78. create_load_sql_files(sql_folder, tables_with_columns)
  79. create_drop_create_sql_files(sql_folder, tables_with_columns)
  80. create_update_sql_files(sql_folder, tables_with_columns)
  81. def create_openrowset_sql_files(
  82. base_dir: str, format_folder: str, sql_folder: str, tables_with_columns: dict[str, list[str]]
  83. ) -> None:
  84. for table, columns in tables_with_columns.items():
  85. csv_file = f"{base_dir}\\{table}.csv"
  86. format_file = f"{format_folder}\\{table}.csv.xml"
  87. sql_file = f"{sql_folder}\\views_export\\export_csv.{table}.sql"
  88. col_types = get_column_types(csv_file, columns)
  89. select_fields = [get_select_statement(col, col_type) for col, col_type in col_types.items()]
  90. query = (
  91. "SELECT "
  92. + ",\n\t".join(select_fields)
  93. + "\nFROM OPENROWSET(\n"
  94. + f"\tBULK '{csv_file}',\n"
  95. + f"\tFORMATFILE = '{format_file}',\n"
  96. + "\tFIRSTROW = 2\n"
  97. + ") AS T1"
  98. )
  99. create_view = get_create_view("export_csv", table, query)
  100. with open(sql_file, "w", encoding="latin-1") as fwh:
  101. fwh.write(create_view)
  102. def get_create_view(schema: str, table: str, query: str) -> str:
  103. create_view = (
  104. "SET QUOTED_IDENTIFIER ON\nGO\n\nSET ANSI_NULLS ON\nGO\n\n"
  105. + f"CREATE\n\tOR\n\nALTER VIEW [{schema}].[{table}]\nAS\n{query}\n"
  106. + "GO\n\nSET QUOTED_IDENTIFIER OFF\nGO\n\nSET ANSI_NULLS OFF\nGO\n\n\nGO\n\n\n"
  107. )
  108. return create_view
  109. def get_select_statement(col: str, col_type: str) -> str:
  110. convert = col_type_convert.get(col_type)
  111. if convert:
  112. return f"dbo.{convert}(T1.[{col}]) AS [{col}]"
  113. return f"T1.[{col}]"
  114. def create_load_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]]):
  115. for table, columns in tables_with_columns.items():
  116. sql_file = f"{sql_folder}\\views_load\\load.{table}.sql"
  117. cols = [f"[{c}]" for c in columns]
  118. query = "SELECT " + ",\n\t".join(cols) + f"\nFROM [export_csv].[{table}]"
  119. create_view = get_create_view("load", table, query)
  120. with open(sql_file, "w", encoding="latin-1") as fwh:
  121. fwh.write(create_view)
  122. def create_drop_create_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA"):
  123. for table in tables_with_columns.keys():
  124. sql_file = f"{sql_folder}\\exec_drop_create\\{table}.sql"
  125. query = (
  126. f"USE [GC]\nGO\n\nDROP TABLE IF EXISTS [{system.lower()}].[{table}]\nGO\n\n"
  127. + f"SELECT *\nINTO [{system.lower()}].[{table}]\nFROM [{system}].[load].[{table}]"
  128. )
  129. with open(sql_file, "w", encoding="latin-1") as fwh:
  130. fwh.write(query)
  131. def create_update_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA"):
  132. for table in tables_with_columns.keys():
  133. sql_file = f"{sql_folder}\\exec_update\\{table}.sql"
  134. query = (
  135. f"USE [GC]\nGO\n\nTRUNCATE TABLE [{system.lower()}].[{table}]\nGO\n\n"
  136. + f"INSERT INTO [{system.lower()}].[{table}]\n"
  137. + f"SELECT *\nFROM [{system}].[load].[{table}]"
  138. )
  139. with open(sql_file, "w", encoding="latin-1") as fwh:
  140. fwh.write(query)
  141. def get_tables_with_columns_from_folder(base_dir: str) -> dict[str, list[str]]:
  142. tables_with_columns = {}
  143. for csv_file in Path(base_dir).glob("*.csv"):
  144. table_name = csv_file.stem
  145. with open(csv_file, "r", encoding="latin-1") as frh:
  146. cols = frh.readline().strip("\n").split(";")
  147. cols_unique = []
  148. for c in cols:
  149. c1 = c.strip('"')
  150. if c1 not in cols_unique:
  151. cols_unique.append(c1)
  152. continue
  153. for i in count(1):
  154. c2 = f"{c1}_{i}"
  155. if c2 not in cols and c2 not in cols_unique:
  156. cols_unique.append(c2)
  157. break
  158. tables_with_columns[table_name] = cols_unique
  159. return tables_with_columns
  160. if __name__ == "__main__":
  161. # schema_convert("C:\\GlobalCube_LOCOSOFT\\GCStruct_SKR51\\Kontenrahmen")
  162. create_format_xml_from_folder("C:\\GlobalCube_LOCOSOFT\\System\\LOCOSOFT\\Export")