1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import re
- from dataclasses import dataclass
- from pathlib import Path
- from datetime import datetime
@dataclass
class BulkcopyResult:
    """Counters collected from one bulk-copy (bcp export/import) run of a table."""

    filename: str
    timestamp: datetime
    imported: int = 0
    exported: int = 0
    ignored: int = 0
    import_duration: float = 0.0
    export_duration: float = 0.0

    @property
    def missing(self) -> int:
        """Rows that were exported but neither imported nor ignored."""
        return self.exported - self.imported - self.ignored

    def to_csv(self):
        """Render this result as a single semicolon-separated CSV row."""
        stamp = self.timestamp.strftime('%d.%m.%Y %H:%M:%S')
        fields = [self.filename, stamp,
                  self.exported, self.imported, self.ignored, self.missing,
                  self.export_duration, self.import_duration]
        return ';'.join(str(field) for field in fields)

    def __str__(self) -> str:
        """Human-readable multi-line summary of the run."""
        lines = [f'Filename: {self.filename}',
                 f"Last run: {self.timestamp.strftime('%d.%m.%Y %H:%M')}",
                 '',
                 f'Exported: {self.exported:>7}',
                 f'Imported: {self.imported:>7}',
                 f'Ignored: {self.ignored:>7}',
                 f'Missing: {self.missing:>7}',
                 '',
                 f'Duration: {self.export_duration:>11} s',
                 f' {self.import_duration:>11} s']
        return '\n'.join(lines)
def check_logfiles(prefix, base_dir):
    """Collect export/import statistics for one table from its bcp log files.

    Reads three files under *base_dir*:
      * ``<prefix>.in.log``   - one line per ignored row
      * ``<prefix>.bcp1.log`` - bcp export log (cp850-encoded, German messages)
      * ``<prefix>.bcp2.log`` - bcp import log (same format as bcp1)

    Returns a BulkcopyResult whose timestamp is the mtime of ``<prefix>.in.log``.
    Raises FileNotFoundError if any of the three files is missing.
    """
    base = Path(base_dir)
    in_log = base / f'{prefix}.in.log'
    ts = datetime.fromtimestamp(in_log.stat().st_mtime)
    result = BulkcopyResult(filename=prefix, timestamp=ts)

    # Each line of the .in.log corresponds to one ignored row.
    with open(in_log, 'r') as frh:
        result.ignored = len(frh.readlines())

    # bcp1 = export, bcp2 = import; both use the same German bcp log format,
    # so the parsing is shared in _parse_bcp_log().
    result.exported, result.export_duration = _parse_bcp_log(base / f'{prefix}.bcp1.log')
    result.imported, result.import_duration = _parse_bcp_log(base / f'{prefix}.bcp2.log')
    return result


def _parse_bcp_log(logfile):
    """Return (row_count, duration_in_seconds) parsed from one bcp log file.

    Missing counters default to 0 so a malformed/empty log never raises.
    """
    with open(logfile, 'r', encoding='cp850', errors='ignore') as frh:
        raw_logs = frh.read()
    # e.g. "1000 Zeilen kopiert." -> number of rows copied
    # (dot escaped: the original pattern let '.' match any character)
    match = re.search(r'(\d+) Zeilen kopiert\.', raw_logs)
    rows = int(match.group(1)) if match else 0
    # e.g. "Zeit ... gesamt: 1234" -> total time, reported in milliseconds
    match = re.search(r'Zeit .* gesamt: (\d+)', raw_logs)
    duration = int(match.group(1)) / 1000 if match else 0
    return rows, duration
def check_directory(base_dir):
    """Summarise all bulk-copy runs in *base_dir* into ``<base_dir>/info.log``.

    Every ``*.bcp1.log`` file marks one table run; one CSV row per run is
    produced by BulkcopyResult.to_csv().
    """
    res = []
    for filename in Path(base_dir).glob('*.bcp1.log'):
        # Strip the '.bcp1.log' suffix (9 chars) to recover the table prefix.
        stem = filename.name[:-9]
        res.append(check_logfiles(stem, base_dir).to_csv())
    with open(Path(base_dir) / 'info.log', 'w') as fwh:
        # Bug fix: the old header ('filename;timestamp;imported;exported;
        # ignored;import_duration;export_duration') had 7 columns in the wrong
        # order, while to_csv() emits 8 columns. Header now matches the rows.
        fwh.write('filename;timestamp;exported;imported;ignored;missing;'
                  'export_duration;import_duration\n')
        fwh.write('\n'.join(res))
if __name__ == '__main__':
    # Ad-hoc entry point: summarise all bcp logs found in the local temp dir.
    # NOTE(review): hard-coded developer path — consider argparse/sys.argv.
    check_directory('/home/robert/projekte/python/dbtools/SQL/temp')
    # check_logfiles('ORDER_LINE_1')
|