123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301 |
- import json
- from pathlib import Path
- import re
- CONVERSION = [
- "Name",
- "CognosSource",
- "CognosPackageDatasourceConnection",
- "DataSource",
- "OrgName",
- "Dimension",
- "Root",
- "Drill",
- "Levels",
- "Associations",
- "Category",
- "SpecialCategory",
- "MapDrills",
- "ViewName",
- "Measure",
- "Signon",
- "Cube",
- "CustomView",
- "CustomViewChildList",
- "SecurityNameSpace",
- "SecurityObject",
- ]
- id_lookup = {}
- find_words = re.compile(r'("[^"]+"|\w+) ')
- def convert_block(block):
- block = block.replace("\n", "")
- block_type = block.split(" ")[0]
- words = find_words.findall(block)
- if len(words) < 3:
- return {"Type": block_type}
- result = {"Type": words[0], "ID": words[1], "Name": words[2].strip('"')}
- offset = 0
- for i in range(3, len(words), 2):
- if len(words) < i + offset + 2:
- break
- key = words[i + offset]
- if key in ["PackageReportSource", "Database"]:
- result[key] = {
- "ID": words[i + offset + 1],
- "Name": words[i + offset + 2].strip('"'),
- }
- offset += 1
- elif key in ["DimensionView"]:
- if key + "s" not in result:
- result[key + "s"] = []
- result[key + "s"].append(
- {"ID": words[i + offset + 1], "Name": words[i + offset + 2].strip('"')}
- )
- offset += 1
- elif key in ["MeasureInclude"]:
- if key + "s" not in result:
- result[key + "s"] = []
- result[key + "s"].append(
- {"ID": words[i + offset + 1], "Include": words[i + offset + 2]}
- )
- offset += 1
- elif key == "Calc":
- for j in range(i + offset + 1, len(words)):
- if words[j] in ["Sign", "Format", "Filtered"] or j == len(words) - 1:
- result["Calc"] = " ".join(words[i + offset + 1 : j])
- offset = j - i - 1
- break # for
- elif key == "EncryptedPW":
- result["EncryptedPW"] = words[i + offset + 1].strip('"')
- result["Salt"] = words[i + offset + 2].strip('"')
- offset += 1
- elif key == "AllocationAdd":
- if key + "s" not in result:
- result[key + "s"] = []
- result[key + "s"].append(
- {"Measure": words[i + offset + 2], "Type": words[i + offset + 4]}
- )
- offset += 3
- elif key in [
- "CustomViewList",
- "DrillThrough",
- "DeployLocations",
- "PowerCubeCustomViewList",
- "StartList",
- "TransientLevelList",
- ]:
- for j in range(i + offset + 1, len(words)):
- if words[j] in ["EndList"]:
- result[key] = " ".join(words[i + offset + 1 : j])
- offset = j - i - 1
- break # for
- # elif words[i + offset].isnumeric() or words[i + offset].startswith('"'):
- # offset += 1
- else:
- result[key] = words[i + offset + 1].strip('"')
- if block_type == "DataSource":
- result["Columns"] = []
- if block_type in ["OrgName", "Levels", "Measure"]:
- result["Associations"] = []
- if block_type == "Dimension":
- result["Root"] = {}
- result["Levels"] = []
- result["Categories"] = []
- result["SpecialCategories"] = []
- if block_type == "Root":
- result["Drill"] = {}
- if block_type == "Associations":
- result["Parent"] = 0
- if block_type == "CustomView":
- result["ChildList"] = {}
- if block_type == "SecurityNameSpace":
- result["Objects"] = []
- return result
- def remove_ids(nested):
- nested.pop("ID", "")
- nested.pop("DateDrill", "")
- nested.pop("Primary", "")
- nested.pop("Lastuse", "")
- nested.pop("AssociationContext", "")
- if (
- nested.get("Type", "") == "SpecialCategory"
- and "Label" in nested
- and "20" in nested["Label"]
- ):
- nested.pop("Label", "")
- for col in ["Parent", "Levels", "CustomViewList"]:
- if col not in nested:
- continue
- if col == "Levels" and (
- isinstance(nested["Levels"], list) or nested["Levels"] == "0"
- ):
- continue
- nested[col] = id_lookup.get(nested[col], {}).get("Name", "undefined")
- for child in nested.values():
- if isinstance(child, dict):
- remove_ids(child)
- if isinstance(child, list):
- for entry in child:
- remove_ids(entry)
- return nested
- def prepare_mdl_str(mdl_str):
- mdl_str = re.sub(r"\n+", "\n", mdl_str)
- mdl_str = re.sub(r"^\n?Name ", "ModelName 1 ", mdl_str)
- mdl_str = re.sub(r'\nLevels (\d+ [^"])', r"Levels \1", mdl_str)
- mdl_str = re.sub(r" Associations ", " \nAssociations ", mdl_str)
- mdl_str = re.sub(r'([^ ])""', r"\1'", mdl_str)
- mdl_str = re.sub(r'""([^ ])', r"'\1", mdl_str)
- tags = "|".join(CONVERSION)
- mdl_str = re.sub(r"\n(" + tags + r") ", r"\n\n\1 ", mdl_str)
- return mdl_str
- def group_mdl_blocks(converted):
- result = {
- "Model": {},
- "Connections": [],
- "DataSources": [],
- "Dimensions": [],
- "Measures": [],
- "Signons": [],
- "CustomViews": [],
- "Security": [],
- "Cubes": [],
- }
- types = [c["Type"] for c in converted]
- ids = [c.get("ID", "0") for c in converted]
- id_lookup.update(dict(zip(ids, converted)))
- current = None
- level_ids = []
- for c, t in zip(converted, types):
- if t in [""]:
- continue
- if (
- t in ["Category", "SpecialCategory"]
- and result["Dimensions"][-1]["Name"] == "Zeit"
- ):
- if t == "Category" or c["Name"][0].isnumeric():
- continue
- if t in ["ModelName"]:
- result["Model"] = c
- elif t in ["CognosSource", "CognosPackageDatasourceConnection"]:
- result["Connections"].append(c)
- elif t in ["DataSource"]:
- result["DataSources"].append(c)
- elif t in ["OrgName"]:
- result["DataSources"][-1]["Columns"].append(c)
- elif t in ["Dimension"]:
- level_ids = []
- result["Dimensions"].append(c)
- elif t in ["Root"]:
- result["Dimensions"][-1]["Root"] = c
- elif t in ["Drill"]:
- result["Dimensions"][-1]["Root"]["Drill"] = c
- elif t in ["Levels"]:
- current = c
- level_ids.append(c["ID"])
- result["Dimensions"][-1]["Levels"].append(c)
- elif t in ["Category"]:
- if c["Levels"] in level_ids[0:2]:
- result["Dimensions"][-1]["Categories"].append(c)
- elif t in ["SpecialCategory"]:
- result["Dimensions"][-1]["SpecialCategories"].append(c)
- elif t in ["Measure"]:
- current = c
- result["Measures"].append(c)
- elif t in ["Associations"]:
- c["Parent"] = current["ID"]
- current["Associations"].append(c)
- for ds in result["DataSources"]:
- for col in ds["Columns"]:
- if col["Column"] == c["AssociationReferenced"]:
- col["Associations"].append(c)
- elif t in ["Signon"]:
- result["Signons"].append(c)
- elif t in ["Cube"]:
- result["Cubes"].append(c)
- elif t in ["CustomView"]:
- result["CustomViews"].append(c)
- elif t in ["CustomViewChildList"]:
- for cv in result["CustomViews"]:
- if cv["ID"] == c["ID"]:
- cv["ChildList"] = c
- elif t in ["SecurityNameSpace"]:
- result["Security"].append(c)
- elif t in ["SecurityObject"]:
- result["Security"][-1]["Objects"].append(c)
- # else:
- # print(t, c)
- return result
- def build_query(datasource):
- table = datasource["Name"]
- suffix = "_fm" if datasource["SourceType"] == "CognosSourceQuery" else "_imr"
- table_name = f"[staging].[{table}{suffix}]"
- view_name = f"[load].[{table}]"
- columns = ",\n ".join([extract_column(c) for c in datasource["Columns"]])
- return f"CREATE VIEW {view_name}\nAS\nSELECT {columns} \nFROM {table_name}\nGO\n\n"
- def extract_column(col):
- name = col["Name"]
- if "]." in name:
- name = name.split("].")[-1]
- alias = col["Column"]
- is_used = "" if len(col["Associations"]) > 0 else "--"
- return f'{is_used}{name} AS "{alias}"'
- def convert_file(filename):
- with open(filename, "r", encoding="latin-1") as frh:
- mdl_str = frh.read()
- mdl_str = prepare_mdl_str(mdl_str)
- mdl_blocks = mdl_str.split("\n\n")
- converted = [convert_block(b) for b in mdl_blocks]
- grouped = group_mdl_blocks(converted)
- with open(filename[:-4] + "_ori.json", "w") as fwh:
- json.dump(grouped, fwh, indent=2)
- # yaml.safe_dump(result, open(filename[:-4] + ".yaml", "w"))
- without_ids = remove_ids(grouped)
- with open(filename[:-4] + ".json", "w") as fwh:
- json.dump(without_ids, fwh, indent=2)
- queries = [build_query(ds) for ds in grouped["DataSources"]]
- with open(filename[:-4] + "_queries.txt", "w", encoding="latin-1") as fwh:
- fwh.writelines(queries)
- def convert_folder(base_dir):
- files = sorted([(f.stat().st_mtime, f) for f in Path(base_dir).rglob("*.mdl")])
- for timestamp, filename in files:
- convert_file(str(filename))
- if __name__ == "__main__":
- # convert_file("data/S_Offene_Auftraege.mdl")
- # convert_file("data/F_Belege_SKR_SKR_Boettche.mdl")
- convert_folder("data/")
|