"""Convert Cognos Transformer ``.mdl`` model files into JSON and SQL views.

For every model file the script writes, next to the source file:

* ``<model>_ori.json``     - the grouped model including all IDs,
* ``<model>.json``         - the same structure with IDs removed/resolved,
* ``<model>_queries.sql``  - one ``CREATE OR ALTER VIEW`` per data source,
* ``<cube or model>_ids.json`` - mapping ``dimension//category`` -> label,

and one ``<datasource>_<model>.sql`` file per data source in a sibling
``SQL`` directory (``<model dir>/../SQL``).
"""
import json
import os
import re
from pathlib import Path, PureWindowsPath

# Keywords that start a new block when they appear at the beginning of a line.
CONVERSION = [
    "Name",
    "CognosSource",
    "CognosPackageDatasourceConnection",
    "DataSource",
    "OrgName",
    "Dimension",
    "Root",
    "Drill",
    "Levels",
    "Associations",
    "Category",
    "SpecialCategory",
    "MapDrills",
    "ViewName",
    "Measure",
    "Signon",
    "Cube",
    "CustomView",
    "CustomViewChildList",
    "SecurityNameSpace",
    "SecurityObject",
    "AllocationAdd",
]

# Maps block ID -> converted block of the model currently being processed.
# Filled by group_mdl_blocks(), read by remove_ids().
id_lookup = {}

# A "word" is either a double-quoted string or a run of word characters;
# it only counts when it is followed by a single space.
find_words = re.compile(r'("[^"]+"|\w+) ')


def convert_block(block: str) -> dict:
    """Parse one MDL block into a dict.

    The block is tokenised with ``find_words``. The first three words become
    ``Type``, ``ID`` and ``Name``; the remaining words are read as
    ``key value`` pairs, with special handling for keys that carry more than
    one value or a free-form expression.

    Args:
        block: Text of a single block (newlines are removed, not replaced by
            spaces, before tokenising).

    Returns:
        ``{"Type": <first token>}`` when fewer than three words are found,
        otherwise the parsed properties plus empty containers for the
        children that ``group_mdl_blocks`` attaches later.

    Raises:
        IndexError: if a multi-value key is truncated at the end of a block.
    """
    block = block.replace("\n", "")
    block_type = block.split(" ")[0]
    words = find_words.findall(block)

    if len(words) < 3:
        return {"Type": block_type}

    result = {"Type": words[0], "ID": words[1], "Name": words[2].strip('"')}

    # ``i`` advances two words per step (key + value); ``offset`` accumulates
    # the extra words consumed by keys that carry more than one value.
    offset = 0
    for i in range(3, len(words), 2):
        pos = i + offset
        if len(words) < pos + 2:
            break
        key = words[pos]

        if key in ("PackageReportSource", "Database"):
            result[key] = {
                "ID": words[pos + 1],
                "Name": words[pos + 2].strip('"'),
            }
            offset += 1
        elif key == "DimensionView":
            result.setdefault("DimensionViews", []).append(
                {"ID": words[pos + 1], "Name": words[pos + 2].strip('"')}
            )
            offset += 1
        elif key == "MeasureInclude":
            result.setdefault("MeasureIncludes", []).append(
                {"ID": words[pos + 1], "Include": words[pos + 2]}
            )
            offset += 1
        elif key == "Calc":
            # The expression runs up to the next terminator keyword or the
            # last word of the block.
            # NOTE(review): when no terminator is found the final word is not
            # part of the expression - confirm this is intended.
            for j in range(pos + 1, len(words)):
                if words[j] in ("Sign", "Format", "Filtered") or j == len(words) - 1:
                    result["Calc"] = " ".join(words[pos + 1 : j])
                    offset = j - i - 1
                    break
        elif key == "EncryptedPW":
            result["EncryptedPW"] = words[pos + 1].strip('"')
            result["Salt"] = words[pos + 2].strip('"')
            offset += 1
        elif key == "AllocationAdd":
            result.setdefault("AllocationAdds", []).append(
                {"Measure": words[pos + 2], "Type": words[pos + 4]}
            )
            offset += 3
        elif key in (
            "CustomViewList",
            "DrillThrough",
            "DeployLocations",
            "PowerCubeCustomViewList",
            "StartList",
            "TransientLevelList",
        ):
            # List-valued keys: everything up to "EndList" is the value.
            for j in range(pos + 1, len(words)):
                if words[j] == "EndList":
                    result[key] = " ".join(words[pos + 1 : j])
                    offset = j - i - 1
                    break
        else:
            result[key] = words[pos + 1].strip('"')

    # Pre-create the containers that group_mdl_blocks() fills with children.
    if block_type == "DataSource":
        result["Columns"] = []
    if block_type in ("OrgName", "Levels", "Measure"):
        result["Associations"] = []
    if block_type == "Dimension":
        result["Root"] = {}
        result["Levels"] = []
        result["Categories"] = []
        result["SpecialCategories"] = []
    if block_type == "Root":
        result["Drill"] = {}
    if block_type == "Associations":
        result["Parent"] = 0
    if block_type == "CustomView":
        result["ChildList"] = {}
    if block_type == "SecurityNameSpace":
        result["Objects"] = []
    return result


def remove_ids(nested: dict, _seen=None) -> dict:
    """Strip IDs and volatile keys from ``nested`` in place, recursively.

    ``Parent``, ``Levels`` and ``CustomViewList`` references are replaced by
    the ``Name`` of the referenced block from ``id_lookup`` (``"undefined"``
    when the ID is unknown). A ``Levels`` value that is a list or ``"0"`` is
    left untouched.

    Args:
        nested: Dict to clean; nested dicts and lists of dicts are processed
            too.
        _seen: Internal set of ``id()`` values of dicts already processed in
            this traversal. Callers should not pass it.

    Returns:
        The same ``nested`` object.
    """
    if _seen is None:
        _seen = set()
    # The same dict can be reachable via several parents (an association is
    # stored both on its level/measure and on the data source column).
    # Processing it twice would look up the already resolved name as if it
    # were an ID and overwrite it with "undefined".
    if id(nested) in _seen:
        return nested
    _seen.add(id(nested))

    for volatile in ("ID", "DateDrill", "Primary", "Lastuse", "AssociationContext"):
        nested.pop(volatile, "")

    if nested.get("Type", "") == "SpecialCategory" and "Label" in nested and "20" in nested["Label"]:
        nested.pop("Label", "")

    for col in ("Parent", "Levels", "CustomViewList"):
        if col not in nested:
            continue
        if col == "Levels" and (isinstance(nested["Levels"], list) or nested["Levels"] == "0"):
            continue
        nested[col] = id_lookup.get(nested[col], {}).get("Name", "undefined")

    for child in nested.values():
        if isinstance(child, dict):
            remove_ids(child, _seen)
        if isinstance(child, list):
            for entry in child:
                remove_ids(entry, _seen)
    return nested


def prepare_mdl_str(mdl_str: str) -> str:
    """Normalise raw MDL text so that blocks are separated by blank lines.

    Args:
        mdl_str: Complete text of an ``.mdl`` file.

    Returns:
        The text with every line that starts with one of the ``CONVERSION``
        keywords preceded by an empty line, ready for ``split("\\n\\n")``.
    """
    # Collapse runs of newlines so existing blank lines do not create blocks.
    mdl_str = re.sub(r"\n+", "\n", mdl_str)
    # The leading "Name ..." line becomes a "ModelName" block with ID 1.
    mdl_str = re.sub(r"^\n?Name ", "ModelName 1 ", mdl_str)
    # Join a "Levels <digits> <non-quote>" line onto the previous line
    # (presumably a reference property rather than a block header).
    mdl_str = re.sub(r'\nLevels (\d+ [^"])', r"Levels \1", mdl_str)
    # Inline " Associations " always starts on its own line.
    mdl_str = re.sub(r" Associations ", " \nAssociations ", mdl_str)
    # Doubled double-quotes adjacent to a non-space become an apostrophe so
    # they do not break the quoted-string tokenizer.
    mdl_str = re.sub(r'([^ ])""', r"\1'", mdl_str)
    mdl_str = re.sub(r'""([^ ])', r"'\1", mdl_str)
    tags = "|".join(CONVERSION)
    mdl_str = re.sub(r"\n(" + tags + r") ", r"\n\n\1 ", mdl_str)
    return mdl_str


def group_mdl_blocks(converted: list) -> dict:
    """Arrange the flat list of converted blocks into a nested model dict.

    Blocks are attached to the most recently seen parent of the matching
    kind (columns to the last data source, levels to the last dimension,
    associations to the last level or measure, ...). All blocks are also
    registered in the module-level ``id_lookup`` under their ``ID``
    (``"0"`` for blocks without one).

    Args:
        converted: Block dicts as returned by ``convert_block``, in file
            order.

    Returns:
        Dict with the keys ``Model``, ``Connections``, ``DataSources``,
        ``Dimensions``, ``Measures``, ``Signons``, ``CustomViews``,
        ``Security`` and ``Cubes``.
    """
    result = {
        "Model": {},
        "Connections": [],
        "DataSources": [],
        "Dimensions": [],
        "Measures": [],
        "Signons": [],
        "CustomViews": [],
        "Security": [],
        "Cubes": [],
    }
    types = [c["Type"] for c in converted]
    ids = [c.get("ID", "0") for c in converted]
    id_lookup.update(zip(ids, converted))

    current = None  # last Levels or Measure block; parent of Associations
    level_ids = []  # IDs of the levels of the current dimension

    for c, t in zip(converted, types):
        if t == "":
            continue
        # In the "Zeit" dimension all categories and the special categories
        # whose name starts with a digit are dropped.
        if t in ("Category", "SpecialCategory") and result["Dimensions"][-1]["Name"] == "Zeit":
            if t == "Category" or c["Name"][0].isnumeric():
                continue

        if t == "ModelName":
            result["Model"] = c
        elif t in ("CognosSource", "CognosPackageDatasourceConnection"):
            result["Connections"].append(c)
        elif t == "DataSource":
            result["DataSources"].append(c)
        elif t == "OrgName":
            result["DataSources"][-1]["Columns"].append(c)
        elif t == "Dimension":
            level_ids = []
            result["Dimensions"].append(c)
        elif t == "Root":
            result["Dimensions"][-1]["Root"] = c
        elif t == "Drill":
            result["Dimensions"][-1]["Root"]["Drill"] = c
        elif t == "Levels":
            current = c
            level_ids.append(c["ID"])
            result["Dimensions"][-1]["Levels"].append(c)
        elif t == "Category":
            # Only categories of the first two levels are kept.
            if c.get("Levels", "") in level_ids[0:2]:
                result["Dimensions"][-1]["Categories"].append(c)
        elif t == "SpecialCategory":
            result["Dimensions"][-1]["SpecialCategories"].append(c)
        elif t == "Measure":
            current = c
            result["Measures"].append(c)
        elif t == "Associations":
            c["Parent"] = current["ID"]
            current["Associations"].append(c)
            # Also register the association on every data source column it
            # references, so unused columns can be detected later.
            for ds in result["DataSources"]:
                for col in ds["Columns"]:
                    if col["Column"] == c["AssociationReferenced"]:
                        col["Associations"].append(c)
        elif t == "Signon":
            result["Signons"].append(c)
        elif t == "Cube":
            result["Cubes"].append(c)
        elif t == "CustomView":
            result["CustomViews"].append(c)
        elif t == "CustomViewChildList":
            for cv in result["CustomViews"]:
                if cv["ID"] == c["ID"]:
                    cv["ChildList"] = c
        elif t == "SecurityNameSpace":
            result["Security"].append(c)
        elif t == "SecurityObject":
            result["Security"][-1]["Objects"].append(c)
    return result


def build_query(datasource: dict) -> str:
    """Return a ``CREATE OR ALTER VIEW`` statement for one data source.

    The view ``[load].[<name>]`` selects the data source's columns from
    ``[export_csv].[<name>]``.
    """
    table = datasource["Name"]
    table_name = f"[export_csv].[{table}]"
    view_name = f"[load].[{table}]"
    columns = ",\n\t".join(extract_column(c) for c in datasource["Columns"])
    return f"CREATE\n\tOR\n\nALTER VIEW {view_name}\nAS\nSELECT {columns} \nFROM {table_name}\nGO\n\n"


def extract_column(col: dict) -> str:
    """Return the select-list entry ``[name] AS [alias]`` for a column.

    Only the part of the name after the last ``].`` is used. Columns without
    associations are prefixed with ``--`` so they are commented out.
    """
    name = col["Name"]
    if "]." in name:
        name = name.split("].")[-1]
    alias = col["Column"]
    is_used = "" if col["Associations"] else "--"
    return f"{is_used}[{name}] AS [{alias}]"


def convert_file(filename: str) -> None:
    """Convert one ``.mdl`` file and write the JSON and SQL outputs.

    Args:
        filename: Path of the model file; it is read as latin-1.
    """
    with open(filename, "r", encoding="latin-1") as frh:
        mdl_str = frh.read()

    mdl_str = prepare_mdl_str(mdl_str)
    mdl_blocks = mdl_str.split("\n\n")
    converted = [convert_block(b) for b in mdl_blocks]

    # IDs are only meaningful within one model; drop the entries of any
    # previously converted file so they cannot resolve here.
    id_lookup.clear()
    grouped = group_mdl_blocks(converted)

    base_name = os.path.splitext(filename)[0]
    with open(base_name + "_ori.json", "w", encoding="utf-8") as fwh:
        json.dump(grouped, fwh, indent=2)

    # remove_ids() works in place: from here on ``grouped`` has no IDs.
    without_ids = remove_ids(grouped)
    with open(base_name + ".json", "w", encoding="utf-8") as fwh:
        json.dump(without_ids, fwh, indent=2)

    queries = {ds["Name"]: build_query(ds) for ds in grouped["DataSources"]}
    with open(base_name + "_queries.sql", "w", encoding="latin-1") as fwh:
        fwh.writelines(queries.values())

    sql_dir = Path(filename).parent.parent / "SQL"
    os.makedirs(sql_dir, exist_ok=True)
    model = Path(filename).stem
    for ds, query in queries.items():
        # Join with Path instead of a literal backslash so the file ends up
        # inside the SQL directory on every platform.
        with open(sql_dir / f"{ds}_{model}.sql", "w", encoding="latin-1") as fwh:
            fwh.write(query)

    cat_name_to_label = {
        d["Name"] + "//" + c["Name"]: c.get("Label", c.get("SourceValue", ""))
        for d in grouped["Dimensions"]
        for c in d["Categories"]
    }

    filename_ids = base_name + "_ids.json"
    if grouped["Cubes"]:
        # PureWindowsPath accepts both "\\" and "/" as separators, so the
        # cube file name is extracted correctly on any platform.
        cube_name = PureWindowsPath(grouped["Cubes"][0]["MdcFile"]).stem
        filename_ids = str(Path(filename).parent / cube_name) + "_ids.json"
    with open(filename_ids, "w", encoding="utf-8") as fwh:
        json.dump(cat_name_to_label, fwh, indent=2)


def convert_folder(base_dir: str) -> None:
    """Convert every ``*.mdl`` file below ``base_dir``, oldest first."""
    files = sorted((f.stat().st_mtime, f) for f in Path(base_dir).rglob("*.mdl"))
    for _, filename in files:
        convert_file(str(filename))


if __name__ == "__main__":
    convert_folder("cognos7/data/mdl/")