schema_ini_convert.py

import json
import os
from itertools import count
from pathlib import Path

from cognos7.csv_column_types import get_column_types

col_type_convert = {
    "varchar(20)": "to_varchar20",
    "varchar(50)": "to_varchar50",
    "varchar(100)": "to_varchar100",
    "varchar(255)": "to_varchar255",
    "decimal(18,8)": "to_decimal",
    "int": "to_int",
    "datetime": "to_datetime",
    "date": "to_datetime",
}
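
# The dbo.to_* names above are assumed to be scalar conversion functions that
# already exist in the target SQL Server database; this script only emits
# calls to them, it does not create them.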


def schema_convert(base_dir: str) -> None:
    base_dir = Path(base_dir).absolute()
    schema_file = base_dir / "schema.ini"
    with open(schema_file, "r", encoding="latin-1") as frh:
        schema = frh.read()
    # schema = schema.replace('\n\n', '\n').replace('\n\n', '\n')
    tables = dict([convert_table_info(t) for t in schema[1:].split("\n[")])
    tables_with_columns = {}
    for table, details in tables.items():
        table_file = base_dir / table
        # Skip malformed sections, schema.ini itself and sections without a CSV file.
        if not table or "schema.ini" in table or not table_file.exists():
            continue
        col_names = [c.split(" ")[0] for c in details["columns"].values()]
        tables_with_columns[table] = col_names
    create_format_xml_files(str(base_dir), tables_with_columns)
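
# Illustration for schema_convert(): it expects a Jet/ACE-style schema.ini next
# to the CSV files. A hypothetical section might look like:
#
#   [invoices.csv]
#   ColNameHeader=True
#   Format=Delimited(;)
#   Col1=invoice_no Char Width 20
#   Col2=amount Float
#
# Only the ColN entries are used; the first token of each value becomes the
# column name.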


def convert_table_info(table_info: str) -> tuple[str, dict]:
    info = table_info.split("]\n")
    if len(info) < 2:
        return ("", {})
    details = {}
    details["columns"] = {}
    for key, value in [row.split("=", 1) for row in info[1].split("\n") if "=" in row]:
        if key.lower() != "colnameheader" and key.lower()[:3] == "col":
            details["columns"][key[3:]] = value
    return (info[0], details)
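
# For the hypothetical section above, convert_table_info() would return:
#   ("invoices.csv", {"columns": {"1": "invoice_no Char Width 20",
#                                 "2": "amount Float"}})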


def create_format_xml_files(base_dir: str, tables_with_columns: dict[str, list[str]]) -> None:
    for table, columns in tables_with_columns.items():
        table_file = table
        if not table.endswith(".csv"):
            table_file = f"{table}.csv"
        format_file = f"{base_dir}\\{table_file}.xml"
        record = []
        row = []
        for i, col_name in enumerate(columns, 1):
            record.append(
                f' <FIELD ID="{i}" xsi:type="CharTerm" TERMINATOR=";" MAX_LENGTH="255" COLLATION="SQL_Latin1_General_CP1_CI_AS"/>'
            )
            col_name_escape = (
                col_name.replace("&", "&#38;")
                .encode("ascii", "xmlcharrefreplace")
                .decode()
                .replace("<", "&#60;")
                .replace(">", "&#62;")
            )
            row.append(f' <COLUMN SOURCE="{i}" NAME="{col_name_escape}" xsi:type="SQLVARYCHAR"/>')
        # The last field is terminated by the row terminator, not the column separator.
        record[-1] = record[-1].replace(";", "\\r\\n")
        with open(format_file, "w") as fwh:
            fwh.write(
                '<?xml version="1.0"?>\n<BCPFORMAT xmlns="http://schemas.microsoft.com/sqlserver/2004/bulkload/format" '
            )
            fwh.write('xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">\n <RECORD>\n')
            fwh.write("\n".join(record))
            fwh.write("\n </RECORD>\n <ROW>\n")
            fwh.write("\n".join(row))
            fwh.write("\n </ROW>\n</BCPFORMAT>")
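
# For a two-column table ["a", "b"], create_format_xml_files() writes a BCP
# format file like this (the last FIELD's terminator is the literal text \r\n):
#
#   <?xml version="1.0"?>
#   <BCPFORMAT xmlns="http://schemas.microsoft.com/sqlserver/2004/bulkload/format" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
#    <RECORD>
#    <FIELD ID="1" xsi:type="CharTerm" TERMINATOR=";" MAX_LENGTH="255" COLLATION="SQL_Latin1_General_CP1_CI_AS"/>
#    <FIELD ID="2" xsi:type="CharTerm" TERMINATOR="\r\n" MAX_LENGTH="255" COLLATION="SQL_Latin1_General_CP1_CI_AS"/>
#    </RECORD>
#    <ROW>
#    <COLUMN SOURCE="1" NAME="a" xsi:type="SQLVARYCHAR"/>
#    <COLUMN SOURCE="2" NAME="b" xsi:type="SQLVARYCHAR"/>
#    </ROW>
#   </BCPFORMAT>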


def create_format_xml_from_folder(base_dir: str) -> None:
    system = Path(base_dir).parent.name
    format_folder = f"{base_dir}\\Format"
    os.makedirs(format_folder, exist_ok=True)
    sql_folder = f"{base_dir}\\SQL"
    os.makedirs(f"{sql_folder}\\views_export", exist_ok=True)
    os.makedirs(f"{sql_folder}\\views_load", exist_ok=True)
    os.makedirs(f"{sql_folder}\\exec_drop_create", exist_ok=True)
    os.makedirs(f"{sql_folder}\\exec_update", exist_ok=True)
    tables_with_columns = get_tables_with_columns_from_folder(base_dir)
    with open(f"{format_folder}\\tables.json", "w") as fwh:
        json.dump(tables_with_columns, fwh, indent=2)
    create_format_xml_files(format_folder, tables_with_columns)
    create_openrowset_sql_files(base_dir, format_folder, sql_folder, tables_with_columns)
    create_load_sql_files(sql_folder, tables_with_columns)
    create_drop_create_sql_files(sql_folder, tables_with_columns, system)
    create_update_sql_files(sql_folder, tables_with_columns, system)
    create_sql_bat_files(sql_folder)


def create_openrowset_sql_files(
    base_dir: str, format_folder: str, sql_folder: str, tables_with_columns: dict[str, list[str]]
) -> None:
    for table, columns in tables_with_columns.items():
        csv_file = f"{base_dir}\\{table}.csv"
        format_file = f"{format_folder}\\{table}.csv.xml"
        sql_file = f"{sql_folder}\\views_export\\export_csv.{table}.sql"
        col_types = get_column_types(csv_file, columns)
        select_fields = [get_select_statement(col, col_type) for col, col_type in col_types.items()]
        query = (
            "SELECT "
            + ",\n\t".join(select_fields)
            + "\nFROM OPENROWSET(\n"
            + f"\tBULK '{csv_file}',\n"
            + f"\tFORMATFILE = '{format_file}',\n"
            + "\tFIRSTROW = 2\n"
            + ") AS T1"
        )
        create_view = get_create_view("export_csv", table, query)
        with open(sql_file, "w", encoding="latin-1") as fwh:
            fwh.write(create_view)
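
# Sketch of one generated query (the column types are hypothetical; they come
# from cognos7.csv_column_types.get_column_types at runtime):
#
#   SELECT dbo.to_varchar20(T1.[invoice_no]) AS [invoice_no],
#       dbo.to_decimal(T1.[amount]) AS [amount]
#   FROM OPENROWSET(
#       BULK 'C:\...\invoices.csv',
#       FORMATFILE = 'C:\...\Format\invoices.csv.xml',
#       FIRSTROW = 2
#   ) AS T1
#
# get_create_view() below wraps this query in a CREATE OR ALTER VIEW statement.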


def get_create_view(schema: str, table: str, query: str) -> str:
    create_view = (
        "SET QUOTED_IDENTIFIER ON\nGO\n\nSET ANSI_NULLS ON\nGO\n\n"
        + f"CREATE\n\tOR\n\nALTER VIEW [{schema}].[{table}]\nAS\n{query}\n"
        + "GO\n\nSET QUOTED_IDENTIFIER OFF\nGO\n\nSET ANSI_NULLS OFF\nGO\n\n\nGO\n\n\n"
    )
    return create_view


def get_select_statement(col: str, col_type: str) -> str:
    convert = col_type_convert.get(col_type)
    if convert:
        return f"dbo.{convert}(T1.[{col}]) AS [{col}]"
    return f"T1.[{col}]"
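
# Example: get_select_statement("amount", "decimal(18,8)") returns
# "dbo.to_decimal(T1.[amount]) AS [amount]"; types missing from
# col_type_convert fall through to a plain "T1.[amount]".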


def create_load_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]]) -> None:
    for table, columns in tables_with_columns.items():
        sql_file = f"{sql_folder}\\views_load\\load.{table}.sql"
        cols = [f"[{c}]" for c in columns]
        query = "SELECT " + ",\n\t".join(cols) + f"\nFROM [export_csv].[{table}]"
        create_view = get_create_view("load", table, query)
        with open(sql_file, "w", encoding="latin-1") as fwh:
            fwh.write(create_view)


def create_drop_create_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA") -> None:
    for table in tables_with_columns.keys():
        sql_file = f"{sql_folder}\\exec_drop_create\\{table}.sql"
        query = (
            f"USE [GC]\nGO\n\nDROP TABLE IF EXISTS [{system.lower()}].[{table}]\nGO\n\n"
            + f"SELECT *\nINTO [{system.lower()}].[{table}]\nFROM [{system}].[load].[{table}]"
        )
        with open(sql_file, "w", encoding="latin-1") as fwh:
            fwh.write(query)
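
# Example output for system="LOCOSOFT" and a hypothetical table "invoices":
#
#   USE [GC]
#   GO
#
#   DROP TABLE IF EXISTS [locosoft].[invoices]
#   GO
#
#   SELECT *
#   INTO [locosoft].[invoices]
#   FROM [LOCOSOFT].[load].[invoices]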


def create_update_sql_files(sql_folder: str, tables_with_columns: dict[str, list[str]], system: str = "OPTIMA") -> None:
    for table in tables_with_columns.keys():
        sql_file = f"{sql_folder}\\exec_update\\{table}.sql"
        query = (
            f"USE [GC]\nGO\n\nTRUNCATE TABLE [{system.lower()}].[{table}]\nGO\n\n"
            + f"INSERT INTO [{system.lower()}].[{table}]\n"
            + f"SELECT *\nFROM [{system}].[load].[{table}]"
        )
        with open(sql_file, "w", encoding="latin-1") as fwh:
            fwh.write(query)


def get_tables_with_columns_from_folder(base_dir: str) -> dict[str, list[str]]:
    tables_with_columns = {}
    for csv_file in Path(base_dir).glob("*.csv"):
        table_name = csv_file.stem
        with open(csv_file, "r", encoding="latin-1") as frh:
            cols = frh.readline().strip("\n").split(";")
        cols_unique = []
        cols_unique_lower = []
        for c in cols:
            c1 = c.strip('"')
            if c1.lower() not in cols_unique_lower:
                cols_unique.append(c1)
                cols_unique_lower.append(c1.lower())
                continue
            for i in count(1):
                c2 = f"{c1}_{i}"
                if c2 not in cols and c2.lower() not in cols_unique_lower:
                    cols_unique.append(c2)
                    cols_unique_lower.append(c2.lower())
                    break
        tables_with_columns[table_name] = cols_unique
    return tables_with_columns
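
# Duplicate headers are deduplicated case-insensitively by appending a counter:
# a header row 'Name;NAME;Name' yields ["Name", "NAME_1", "Name_2"].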


def create_sql_bat_files(sql_folder: str) -> None:
    tasks_dir = Path(sql_folder).parent.parent.parent.parent / "Tasks" / "scripts"
    header = f'@call "{tasks_dir}\\config.bat" 0 > nul'
    folder_list = [
        "views_export",
        "views_load",
        "exec_drop_create",
        "exec_update",
    ]
    for f in folder_list:
        folder = f"{sql_folder}\\{f}"
        bat_file = f"{folder}\\{f}.bat"
        with open(bat_file, "w", encoding="cp850") as fwh:
            fwh.write(header + "\n\n")
            fwh.write(f"echo {folder}\n")
            for sql_file in Path(folder).glob("*.sql"):
                fwh.write(f" call sqlexec2.bat {sql_file}\n")
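
# For the base_dir used in __main__ below, each generated .bat looks roughly
# like this (the .sql file name is hypothetical):
#
#   @call "C:\GlobalCube_LOCOSOFT\Tasks\scripts\config.bat" 0 > nul
#
#   echo C:\GlobalCube_LOCOSOFT\System\LOCOSOFT\Export\SQL\views_export
#    call sqlexec2.bat C:\GlobalCube_LOCOSOFT\System\LOCOSOFT\Export\SQL\views_export\export_csv.invoices.sql
#
# config.bat and sqlexec2.bat are assumed to be provided by the surrounding
# GlobalCube installation.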


if __name__ == "__main__":
    # schema_convert("C:\\GlobalCube_LOCOSOFT\\GCStruct_SKR51\\Kontenrahmen")
    create_format_xml_from_folder("C:\\GlobalCube_LOCOSOFT\\System\\LOCOSOFT\\Export")