db_schema.py

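"""Dump an Autoline database schema via ODBC.

Reads table names from tables.txt, collects column and primary-key
metadata through pyodbc, caches the results as JSON, and emits one
SQL Server CREATE TABLE script per table under ../sql_load/.
"""
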
import pyodbc
import json


def connect():
    """Open a fresh ODBC connection and return a cursor."""
    c = pyodbc.connect("DSN=Autoline_direkt64;UID=kcc;PWD=kcc123")
    return c.cursor()


def convert_desc(col):
    """Map a pyodbc column-description row to a T-SQL column definition."""
    nullable = "NULL" if col.nullable == 1 else "NOT NULL"
    if col.type_name == "CHAR":
        return f" [{col.column_name}] [varchar]({col.length}) {nullable}"
    if col.type_name == "HEX":
        return f" [{col.column_name}] [binary]({col.length}) {nullable}"
    if col.type_name == "INTEGER":
        return f" [{col.column_name}] [int] {nullable}"
    if col.type_name == "DATE":
        return f" [{col.column_name}] [datetime] {nullable}"
    if col.type_name == "NUMERIC":
        return f" [{col.column_name}] [decimal]({col.length},{col.scale}) {nullable}"
    if col.type_name == "BIT":
        # SQL Server has no [boolean] type; [bit] is the valid equivalent.
        return f" [{col.column_name}] [bit] {nullable}"
    if col.type_name == "MEMO":
        return f" [{col.column_name}] [text] {nullable}"
    # Unknown type: fall back to a raw dump of the description row.
    return ", ".join(list(map(str, col)))
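
# For example, a CHAR column of length 10 that allows NULLs (hypothetical
# column name MAKE) comes back as:
#   " [MAKE] [varchar](10) NULL"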


def convert_desc2(col):
    """Render any metadata row as a comma-separated string (used for foreign keys)."""
    return ", ".join(list(map(str, col)))


# table_name = [x[2] for x in crsr.tables(tableType='VIEW')]
# open("views.txt", "w").write("\n".join(table_name))

tables = []  # table names, filled by schema() from tables.txt
res = {}  # table name -> list of column/constraint definition lines


def tables_cols():
    """Fetch the column list for every table and dump it to schema.json."""
    crsr = connect()
    for t in tables:
        try:
            cols = crsr.columns(table=t)
            # print([x[0] for x in crsr.description])
            res[t] = [convert_desc(c) for c in cols]
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != "":
                res[t] = []
            # The connection is unusable after an error; reconnect.
            crsr = connect()
    json.dump(res, open("schema.json", "w"), indent=2)
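
# schema.json then maps each table to its definition lines, e.g. with a
# hypothetical table VEHICLES:
#   {
#     "VEHICLES": [
#       " [MAKE] [varchar](10) NULL",
#       " [PRICE] [decimal](10,2) NOT NULL"
#     ]
#   }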


def pkeys():
    """Append a PRIMARY KEY CLUSTERED constraint per table and dump pkeys.json."""
    crsr = connect()
    for t in tables:
        try:
            cols = crsr.primaryKeys(table=t)
            # print([x[0] for x in crsr.description])
            if res.get(t) is None:
                res[t] = []
            res[t].append(
                f" CONSTRAINT [{t}$0] PRIMARY KEY CLUSTERED (["
                + "], [".join([c.column_name for c in cols])
                + "])"
            )
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != "":
                res[t] = []
            crsr = connect()
    json.dump(res, open("pkeys.json", "w"), indent=2)
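
# A two-column key on a hypothetical table VEHICLES renders as:
#   " CONSTRAINT [VEHICLES$0] PRIMARY KEY CLUSTERED ([MAKE], [MODEL])"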


def fkeys():
    """Collect raw foreign-key metadata rows (currently unused, see schema())."""
    crsr = connect()
    for t in tables:
        try:
            cols = crsr.foreignKeys(table=t)
            print([x[0] for x in crsr.description])
            if res.get(t) is None:
                res[t] = []
            # Note: this appends a list of raw rows, not a DDL string, so the
            # result cannot be joined by tables_create() as-is.
            res[t].append([convert_desc2(c) for c in cols])
            crsr.cancel()
        except pyodbc.Error as e:
            print(e)
            if t != "":
                res[t] = []
            crsr = connect()


def tables_create():
    """Write one CREATE TABLE script per table for the [import] schema."""
    for t, cols in res.items():
        with open("../sql_load/schema/AUTOLINE/tables/import." + t + ".sql", "w") as wh:
            wh.write(f"CREATE TABLE [import].[{t}] (\n")
            wh.write(",\n".join(cols))
            wh.write(")\n\nGO\n")
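
# For the hypothetical VEHICLES table above, import.VEHICLES.sql would contain:
#   CREATE TABLE [import].[VEHICLES] (
#    [MAKE] [varchar](10) NULL,
#    CONSTRAINT [VEHICLES$0] PRIMARY KEY CLUSTERED ([MAKE]))
#
#   GO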


def schema():
    """Run the full export: read table names, then dump columns, keys and DDL."""
    with open("tables.txt", "r") as rh:
        tables.extend(rh.read().split("\n"))
    tables_cols()
    pkeys()
    # fkeys()
    tables_create()
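

# Entry point is an assumption: the original file defines schema() without
# calling it, so it may have been driven from elsewhere.
if __name__ == "__main__":
    schema()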