123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110 |
- import pyodbc
- import json
def connect(connection_string="DSN=Autoline_direkt64;UID=kcc;PWD=kcc123"):
    """Open an ODBC connection and return a cursor on it.

    Args:
        connection_string: ODBC connection string passed to
            ``pyodbc.connect``. The default is the DSN and login this
            script has always used, so plain ``connect()`` calls behave
            exactly as before.

    Returns:
        A cursor created from the new connection. Only the cursor is
        returned, not the connection object.
    """
    # NOTE(review): the default value embeds a user name and password in the
    # source file; callers should prefer passing the string in from
    # configuration instead of relying on this default.
    return pyodbc.connect(connection_string).cursor()
def convert_desc(col):
    """Render one column-metadata row as a T-SQL column definition line.

    Args:
        col: A row exposing ``column_name``, ``type_name`` and ``nullable``
            attributes, plus ``length`` (for CHAR, HEX, NUMERIC) and
            ``scale`` (for NUMERIC). It must also be iterable, because
            unknown types fall back to dumping every field.

    Returns:
        A string such as ``" [NAME] [varchar](30) NULL"``. For a
        ``type_name`` that is not handled here, the raw field values of the
        row joined by ``", "`` are returned instead.
    """
    null_clause = "NULL" if col.nullable == 1 else "NOT NULL"
    source_type = col.type_name

    # Types that map to a SQL type without size arguments.
    # BIT maps to [bit]: SQL Server has no [boolean] column type.
    unsized_types = {
        "INTEGER": "int",
        "DATE": "datetime",
        "BIT": "bit",
        "MEMO": "text",
    }

    if source_type == "CHAR":
        sql_type = f"[varchar]({col.length})"
    elif source_type == "HEX":
        sql_type = f"[binary]({col.length})"
    elif source_type == "NUMERIC":
        sql_type = f"[decimal]({col.length},{col.scale})"
    elif source_type in unsized_types:
        sql_type = f"[{unsized_types[source_type]}]"
    else:
        # Unknown type: emit the raw row so it shows up in the output.
        return ", ".join(map(str, col))

    return f" [{col.column_name}] {sql_type} {null_clause}"
def convert_desc2(col):
    """Return every field of *col* converted to text and joined by ``", "``."""
    fields = [str(value) for value in col]
    return ", ".join(fields)
# Commented-out snippet that wrote the names of all views to views.txt:
# table_name = [x[2] for x in crsr.tables(tableType='VIEW')]
# open("views.txt", "w").write("\n".join(table_name))

# Names to process, one per line of tables.txt. Splitting on "\n" keeps
# blank lines (e.g. after a trailing newline) as empty-string entries.
with open("tables.txt") as table_list_file:
    tables = table_list_file.read().split("\n")

# Shared accumulator keyed by table name; filled in by the functions below.
res = dict()
def tables_cols():
    """Collect column definitions for every table and dump them to schema.json.

    For each name in the module-level ``tables`` list, the column metadata
    is fetched with ``cursor.columns`` and each row is converted by
    ``convert_desc``; the resulting lines are stored in the module-level
    ``res`` dict under the table name. If the driver raises
    ``pyodbc.Error``, the error is printed, a non-blank table name is
    recorded with an empty list, and a new cursor is obtained from
    ``connect()`` before the loop continues. After the loop, ``res`` is
    written to ``schema.json``.
    """
    crsr = connect()
    for t in tables:
        try:
            res[t] = [convert_desc(c) for c in crsr.columns(table=t)]
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != "":
                res[t] = []
            # NOTE(review): reconnecting on every error (not only for
            # non-blank names) is assumed here - confirm against the
            # original file, whose indentation was not available.
            crsr = connect()

    # Use a context manager so the output file is flushed and closed
    # instead of relying on garbage collection of an anonymous handle.
    with open("schema.json", "w") as fh:
        json.dump(res, fh, indent=2)
def pkeys():
    """Append a primary-key constraint line per table and dump to pkeys.json.

    For each name in the module-level ``tables`` list, the primary-key
    columns are read with ``cursor.primaryKeys`` and a
    ``CONSTRAINT [<table>$0] PRIMARY KEY CLUSTERED (...)`` line is appended
    to that table's list in the module-level ``res`` dict. After the loop,
    ``res`` is written to ``pkeys.json``.

    Differences from a naive implementation, all deliberate:
      * A table that reports no key columns gets no constraint line, since
        ``PRIMARY KEY CLUSTERED ([])`` is not valid SQL.
      * On ``pyodbc.Error`` the existing entries for the table are kept;
        only a missing entry is initialised to an empty list. Resetting the
        list would throw away column lines gathered earlier.
    """
    crsr = connect()
    for t in tables:
        try:
            key_columns = [c.column_name for c in crsr.primaryKeys(table=t)]
            entries = res.setdefault(t, [])
            if key_columns:
                entries.append(
                    f" CONSTRAINT [{t}$0] PRIMARY KEY CLUSTERED (["
                    + "], [".join(key_columns)
                    + "])"
                )
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != "":
                # Keep whatever was already collected for this table.
                res.setdefault(t, [])
            # NOTE(review): reconnecting on every error (not only for
            # non-blank names) is assumed here - confirm against the
            # original file, whose indentation was not available.
            crsr = connect()

    # Use a context manager so the output file is flushed and closed.
    with open("pkeys.json", "w") as fh:
        json.dump(res, fh, indent=2)
def fkeys():
    """Append the foreign-key metadata of every table to ``res``.

    For each name in the module-level ``tables`` list, this prints the
    column names of the ``cursor.foreignKeys`` result set and appends one
    list (each row rendered by ``convert_desc2``) to the table's entry in
    ``res``. On ``pyodbc.Error`` the error is printed, a non-blank table
    name is reset to an empty list, and a new cursor is obtained.

    Note: the appended element is itself a list, not a string.
    """
    cursor = connect()
    for name in tables:
        try:
            rows = cursor.foreignKeys(table=name)
            header = [field[0] for field in cursor.description]
            print(header)
            bucket = res.get(name)
            if bucket is None:
                bucket = res[name] = []
            bucket.append(list(map(convert_desc2, rows)))
            cursor.cancel()
        except pyodbc.Error as err:
            print(err)
            if name:
                res[name] = []
            # NOTE(review): reconnect placement reconstructed from
            # unindented source - confirm it is meant to run on every error.
            cursor = connect()
def tables_create():
    """Write one ``CREATE TABLE`` script per entry of the ``res`` dict.

    Each script is written to
    ``../sql_load/schema/AUTOLINE/tables/import.<table>.sql`` and contains
    the table's collected lines joined by ``",\\n"``, followed by ``GO``.
    """
    for name in res:
        script_path = f"../sql_load/schema/AUTOLINE/tables/import.{name}.sql"
        body_lines = res[name]
        with open(script_path, "w") as script:
            script.write(f"CREATE TABLE [import].[{name}] (\n")
            script.write(",\n".join(body_lines))
            script.write(")\n\nGO\n")
def schema():
    """Run the export steps in order: columns, primary keys, script files."""
    # fkeys() is intentionally not part of the pipeline.
    for step in (tables_cols, pkeys, tables_create):
        step()
|