123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- import pyodbc
- import json
def connect():
    """Open a pyodbc connection to the Locosoft DSN and return a cursor on it."""
    connection = pyodbc.connect("DSN=Locosoft;UID=loco_auswertung_benutzer;PWD=loco")
    cursor = connection.cursor()
    return cursor
def convert_desc(col):
    """Render one column-metadata row as a bracket-quoted column definition.

    Args:
        col: Row exposing ``column_name``, ``type_name``, ``length``,
            ``scale`` and ``nullable`` attributes (in this file the rows
            come from ``Cursor.columns``). It must also be iterable, because
            unknown types fall back to joining all of its values.

    Returns:
        The column definition string for known source types, or the
        comma-separated raw row values when the type is not recognised.
    """
    nullable = "NULL" if col.nullable == 1 else "NOT NULL"
    type_name = col.type_name
    if type_name in ('CHAR', 'varchar'):
        return f' [{col.column_name}] [varchar]({convert_len(col.length)}) {nullable}'
    # Equality, not ``in ('HEX')``: the latter is a substring test against
    # the string 'HEX' and would also match '', 'E', 'HE', ...
    if type_name == 'HEX':
        return f' [{col.column_name}] [binary]({col.length}) {nullable}'
    if type_name in ('INTEGER', 'int4'):
        return f' [{col.column_name}] [int] {nullable}'
    # 'INTEGER' used to be listed here as well, but it is always caught by
    # the [int] branch above, so it was dead and has been dropped.
    if type_name in ('int8', 'timestamp'):
        return f' [{col.column_name}] [bigint] {nullable}'
    if type_name in ('DATE', 'date'):
        return f' [{col.column_name}] [datetime] {nullable}'
    if type_name in ('NUMERIC', 'numeric'):
        return f' [{col.column_name}] [numeric]({col.length},{col.scale}) {nullable}'
    if type_name in ('BIT', 'bool'):
        return f' [{col.column_name}] [smallint] {nullable}'
    if type_name in ('MEMO', 'text'):
        return f' [{col.column_name}] [varchar](100) {nullable}'
    # Unknown type: emit the raw row so it shows up visibly in the output.
    return ", ".join(str(value) for value in col)
def convert_len(length):
    """Round a source column length up to one of the fixed widths 10, 50, 100 or 255."""
    # (exclusive upper bound on the source length, width to use)
    buckets = ((8, 10), (38, 50), (88, 100))
    for upper_bound, width in buckets:
        if length < upper_bound:
            return width
    return 255
def convert_desc2(col):
    """Return every value of the row ``col`` converted to ``str`` and joined with ", "."""
    parts = [str(value) for value in col]
    return ", ".join(parts)
- # table_name = [x[2] for x in crsr.tables(tableType='VIEW')]
- # open("views.txt", "w").write("\n".join(table_name))
# Shared result mapping: table name -> list of generated definition lines.
# Filled in place by tables_cols(), pkeys() and fkeys().
res = {}

# Table names to process, one per line of tables.txt. Splitting on "\n"
# keeps empty entries (e.g. from a trailing newline); the functions below
# check for '' explicitly.
with open("tables.txt", "r") as table_list:
    tables = table_list.read().split("\n")
-
def tables_cols():
    """Collect column definitions for every table and dump them to schema.json.

    For each name in the module-level ``tables`` list the column metadata is
    read via ``Cursor.columns`` and converted with ``convert_desc``; a fixed
    ``client_db`` column is appended. Results are stored in the module-level
    ``res`` dict, which is then written to ``schema.json``.

    A ``pyodbc.Error`` for a table is printed, the table gets an empty
    column list (unless its name is the empty string), and a fresh cursor
    is obtained before continuing with the next table.
    """
    crsr = connect()
    for t in tables:
        try:
            cols = crsr.columns(table=t)
            res[t] = [convert_desc(c) for c in cols]
            res[t].append(" [client_db] [varchar](20) NOT NULL")
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != '':
                res[t] = []
            # Continue with a new cursor after a failed metadata call.
            crsr = connect()

    # Use a context manager so the output file is flushed and closed
    # deterministically instead of relying on garbage collection.
    with open("schema.json", "w") as wh:
        json.dump(res, wh, indent=2)
-
def pkeys():
    """Append a primary-key constraint line per table and dump ``res`` to pkeys.json.

    For each name in the module-level ``tables`` list the primary-key
    columns are read via ``Cursor.primaryKeys``; ``client_db`` is always
    added as the last key column. The resulting ``CONSTRAINT`` line is
    appended to the table's entry in the module-level ``res`` dict, which is
    then written to ``pkeys.json``.

    A ``pyodbc.Error`` for a table is printed together with the table name,
    the table's entry is reset to an empty list (unless its name is the
    empty string), and a fresh cursor is obtained before continuing.
    """
    crsr = connect()
    for t in tables:
        try:
            cols = crsr.primaryKeys(table=t)
            if res.get(t) is None:
                res[t] = []

            key_columns = [c.column_name for c in cols] + ['client_db']
            res[t].append(f"\n CONSTRAINT [{t}$0] PRIMARY KEY CLUSTERED ([" + "], [".join(key_columns) + "])")
            crsr.cancel()
        except pyodbc.Error as e:
            print(t)
            print(e)
            if t != '':
                # NOTE(review): this also throws away any column definitions
                # already stored for this table - confirm that is intended.
                res[t] = []
            # Continue with a new cursor after a failed metadata call.
            crsr = connect()

    # Use a context manager so the output file is flushed and closed
    # deterministically instead of relying on garbage collection.
    with open("pkeys.json", "w") as wh:
        json.dump(res, wh, indent=2)
-
def fkeys():
    """Append the foreign-key rows of every table, as one nested list, to ``res``.

    For each name in the module-level ``tables`` list the foreign-key
    metadata is read via ``Cursor.foreignKeys``, the result column names are
    printed, and the rows (stringified with ``convert_desc2``) are appended
    as a single list to the table's entry in ``res``. On ``pyodbc.Error``
    the error is printed, the entry is reset to an empty list (unless the
    name is the empty string) and a new cursor is obtained.
    """
    cursor = connect()
    for table in tables:
        try:
            fk_rows = cursor.foreignKeys(table=table)
            print([column[0] for column in cursor.description])
            if res.get(table) is None:
                res[table] = []
            entry = res[table]
            entry.append(list(map(convert_desc2, fk_rows)))
            cursor.cancel()
        except pyodbc.Error as err:
            print(err)
            if table != '':
                res[table] = []
            cursor = connect()
-
-
def tables_create():
    """Write one ``<table>.sql`` file with a CREATE TABLE statement per entry in ``res``.

    The lines stored for a table are joined with ",\\n" to form the body of
    the statement; each file ends with a ``GO`` line.
    """
    target_dir = "../../System/LOCOSOFT/SQL/schema/LOCOSOFT/sql_load/"
    for table_name in res:
        definition_lines = res[table_name]
        with open(target_dir + table_name + ".sql", "w") as sql_file:
            sql_file.write(f"CREATE TABLE [dbo].[{table_name}] (\n")
            sql_file.write(",\n".join(definition_lines))
            sql_file.write("\n)\n\nGO\n")
if __name__ == '__main__':
    # Pipeline: collect columns, add primary keys, then write the .sql files.
    # fkeys() is currently not part of the run.
    for step in (tables_cols, pkeys, tables_create):
        step()
|