import json

import pyodbc


def connect():
    c = pyodbc.connect("DSN=Autoline_direkt64;UID=kcc;PWD=kcc123")
    return c.cursor()
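
# Map Autoline (kbase) ODBC type names to T-SQL column definitions.
# Unknown types fall through to a raw comma-joined dump of the ODBC
# description row so they are easy to spot in the generated schema.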
def convert_desc(col):
    nullable = "NULL" if col.nullable == 1 else "NOT NULL"
    if col.type_name == "CHAR":
        return f" [{col.column_name}] [varchar]({col.length}) {nullable}"
    if col.type_name == "HEX":
        return f" [{col.column_name}] [binary]({col.length}) {nullable}"
    if col.type_name == "INTEGER":
        return f" [{col.column_name}] [int] {nullable}"
    if col.type_name == "DATE":
        return f" [{col.column_name}] [datetime] {nullable}"
    if col.type_name == "NUMERIC":
        return f" [{col.column_name}] [decimal]({col.length},{col.scale}) {nullable}"
    if col.type_name == "BIT":
        # T-SQL has no boolean column type; bit is the SQL Server equivalent.
        return f" [{col.column_name}] [bit] {nullable}"
    if col.type_name == "MEMO":
        return f" [{col.column_name}] [text] {nullable}"
    return ", ".join(map(str, col))
def convert_desc_sql_anywhere(col):
    nullable = "NULL" if col.nullable == 1 else "NOT NULL"
    if col.type_name in (
        "char",
        "varchar",
        "long varchar",
        "long nvarchar",
        "nvarchar",
    ):
        sizes = [20, 50, 100, 255]
        length = "MAX"
        for s in sizes:
            if col.length < s:
                length = s
                break
        return f" [{col.column_name}] [varchar]({length}) {nullable}"
    if col.type_name in (
        "binary",
        "long binary",
        "varbinary",
    ):
        return f" [{col.column_name}] [binary]({col.length}) {nullable}"
    if col.type_name in (
        "bit",
        "tinyint",
    ):
        return f" [{col.column_name}] [tinyint] {nullable}"
    if col.type_name in (
        "integer",
        "smallint",
        "unsigned smallint",
        "unsigned int",
    ):
        return f" [{col.column_name}] [int] {nullable}"
    if col.type_name in (
        "bigint",
        "unsigned bigint",
        "long varbit",
    ):
        return f" [{col.column_name}] [bigint] {nullable}"
    if col.type_name in (
        "date",
        "timestamp",
        "timestamp with time zone",
    ):
        return f" [{col.column_name}] [datetime] {nullable}"
    if col.type_name in (
        "decimal",
        "float",
        "double",
        "numeric",
    ):
        return f" [{col.column_name}] [decimal](28,8) {nullable}"
    if col.type_name == "time":
        return f" [{col.column_name}] [time] {nullable}"
    return ", ".join(map(str, col))
def convert_desc2(col):
    return ", ".join(map(str, col))


# table_name = [x[2] for x in crsr.tables(tableType='VIEW')]
# open("views.txt", "w").write("\n".join(table_name))

tables = []
res = {}
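
# Collect column definitions for every table in `tables` into `res` and
# write the result to schema.json. On an ODBC error the connection is
# re-opened and the table (if named) is recorded with an empty column list.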
def tables_cols():
    crsr = connect()
    for t in tables:
        try:
            cols = crsr.columns(table=t)
            # print([x[0] for x in crsr.description])
            res[t] = [convert_desc(c) for c in cols]
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != "":
                res[t] = []
            crsr = connect()
    with open("schema.json", "w") as wh:
        json.dump(res, wh, indent=2)
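
# Append a clustered primary-key constraint to each table's definition,
# built from the driver's primaryKeys() metadata, and dump the combined
# result to pkeys.json.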
def pkeys():
    crsr = connect()
    for t in tables:
        try:
            cols = crsr.primaryKeys(table=t)
            # print([x[0] for x in crsr.description])
            if res.get(t) is None:
                res[t] = []
            res[t].append(
                f" CONSTRAINT [{t}$0] PRIMARY KEY CLUSTERED (["
                + "], [".join([c.column_name for c in cols])
                + "])"
            )
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != "":
                res[t] = []
            crsr = connect()
    with open("pkeys.json", "w") as wh:
        json.dump(res, wh, indent=2)
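
# Foreign-key extraction; currently unused (the call in schema() is
# commented out) and the collected rows are not written to a file.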
def fkeys():
    crsr = connect()
    for t in tables:
        try:
            cols = crsr.foreignKeys(table=t)
            print([x[0] for x in crsr.description])
            if res.get(t) is None:
                res[t] = []
            res[t].append([convert_desc2(c) for c in cols])
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != "":
                res[t] = []
            crsr = connect()
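
# Emit one CREATE TABLE script per table into the sql_load schema tree.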
def tables_create():
    for t, cols in res.items():
        with open("../sql_load/schema/AUTOLINE/tables/import." + t + ".sql", "w") as wh:
            wh.write(f"CREATE TABLE [import].[{t}] (\n")
            wh.write(",\n".join(cols))
            wh.write(")\n\nGO\n")
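
# Driver: read the table list from tables.txt, then collect columns and
# primary keys and write the CREATE TABLE scripts.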
def schema():
    with open("tables.txt", "r") as rh:
        tables.extend(rh.read().split("\n"))
    tables_cols()
    pkeys()
    # fkeys()
    tables_create()
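
# Assumed entry point (not part of the original listing): run the full
# schema export when the script is executed directly.
if __name__ == "__main__":
    schema()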
|