|
@@ -19,7 +19,7 @@ class BulkcopyResult:
|
|
|
def missing(self) -> int:
|
|
|
return self.exported - self.imported - self.ignored
|
|
|
|
|
|
- def to_csv(self):
|
|
|
+ def to_csv(self) -> str:
|
|
|
return (
|
|
|
f"{self.file_name};{self.timestamp.strftime('%d.%m.%Y %H:%M:%S')};"
|
|
|
+ f"{self.exported};{self.imported};{self.ignored};{self.missing};"
|
|
@@ -51,7 +51,7 @@ def check_logfiles(prefix: str, base_dir: str) -> BulkcopyResult:
|
|
|
result = BulkcopyResult(file_name=prefix, timestamp=ts)
|
|
|
|
|
|
with open(f"{base_dir}/{prefix}.in.log", "r") as frh:
|
|
|
- result.ignored = len(frh.readlines()) / 2
|
|
|
+ result.ignored = len(frh.readlines()) // 2
|
|
|
|
|
|
# info output of export
|
|
|
with open(f"{base_dir}/{prefix}.bcp1.log", "r", encoding="cp850", errors="ignore") as frh:
|
|
@@ -71,7 +71,7 @@ def check_logfiles(prefix: str, base_dir: str) -> BulkcopyResult:
|
|
|
return result
|
|
|
|
|
|
|
|
|
-def rows_copied(raw_logs):
|
|
|
+def rows_copied(raw_logs: str) -> int:
|
|
|
match = re.search(r"(\d+) Zeilen kopiert.", raw_logs)
|
|
|
if match:
|
|
|
return int(match.group(1))
|
|
@@ -81,17 +81,17 @@ def rows_copied(raw_logs):
|
|
|
return -1
|
|
|
|
|
|
|
|
|
-def total_time(raw_logs):
|
|
|
+def total_time(raw_logs: str) -> float:
|
|
|
match = re.search(r"Zeit .* gesamt\s*: (\d+)", raw_logs)
|
|
|
if match:
|
|
|
return int(match.group(1)) / 1000
|
|
|
match = re.search(r"Clock Time .* Total\s*: (\d+)", raw_logs)
|
|
|
if match:
|
|
|
return int(match.group(1)) / 1000
|
|
|
- return 0
|
|
|
+ return 0.0
|
|
|
|
|
|
|
|
|
-def check_directory(base_dir, res=None):
|
|
|
+def check_directory(base_dir: str, res: list[BulkcopyResult] | None = None) -> list[BulkcopyResult]:
|
|
|
if res is None:
|
|
|
res = []
|
|
|
for folder in Path(base_dir).glob("*"):
|
|
@@ -105,14 +105,14 @@ def check_directory(base_dir, res=None):
|
|
|
return res
|
|
|
|
|
|
|
|
|
-def export_log_csv(res, output_file):
|
|
|
+def export_log_csv(res: list[BulkcopyResult], output_file: str):
|
|
|
with open(output_file, "w") as fwh:
|
|
|
fwh.write("filename;timestamp;imported;exported;ignored;missing;import_duration;export_duration;file_size\n")
|
|
|
for log in res:
|
|
|
fwh.write(log.to_csv() + "\n")
|
|
|
|
|
|
|
|
|
-def bcp_log(logs_dir, output_file):
|
|
|
+def bcp_log(logs_dir: str, output_file: str):
|
|
|
res = check_directory(logs_dir)
|
|
|
export_log_csv(res, output_file)
|
|
|
|