path_info.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. from datetime import datetime
  2. import os
  3. import zipfile
  4. from pathlib import Path
  5. from os import path
  6. import psutil
  7. class PathInfo:
  8. root_dir = ''
  9. ignore = []
  10. file_list = []
  11. opened_files = {}
  12. def ignore_list(self):
  13. gitignore = self.root_dir + '\\.gitignore'
  14. if not path.exists(gitignore):
  15. pass
  16. with open(gitignore, 'r') as f:
  17. for line in f.readlines():
  18. line = line.strip().replace('/', '\\').lower()
  19. if line[:1] == '*':
  20. if line[-1] == '*':
  21. line = line[1:-1]
  22. else:
  23. line = line[1:] + '\\'
  24. else:
  25. line = '\\' + line + '\\'
  26. self.ignore.append(line)
  27. def ignored(self, filename):
  28. rel_filename = '\\' + str(filename).replace(self.root_dir, '').lower() + '\\'
  29. for e in self.ignore:
  30. if e in rel_filename:
  31. return True
  32. return False
  33. def check_dir(self, current_dir):
  34. if self.root_dir == '':
  35. self.root_dir = current_dir
  36. self.opened_files = self.process_handles()
  37. current_dir = Path(current_dir)
  38. for entry in current_dir.glob('*'):
  39. if entry.is_dir():
  40. self.check_dir(entry)
  41. elif not self.ignored(entry):
  42. self.file_list.append(self.file_info(entry))
  43. def file_info(self, filename: Path):
  44. st = filename.stat()
  45. readable = 'J' if os.access(filename, os.R_OK) else 'N'
  46. writable = 'J' if os.access(filename, os.W_OK) else 'N'
  47. handle = self.opened_files.get(str(filename), '')
  48. blocked = 'J' if handle != '' else 'N'
  49. file_info = [str(filename), str(st.st_size),
  50. datetime.fromtimestamp(st.st_ctime).isoformat(timespec='seconds'),
  51. datetime.fromtimestamp(st.st_mtime).isoformat(timespec='seconds'),
  52. readable, writable, blocked, handle]
  53. return file_info
  54. def write_logfile(self, logfile):
  55. with open(logfile, 'w') as fwh:
  56. infos = [';'.join(line) for line in self.file_list]
  57. fwh.write('name;size;ctime;mtime;read;write;blocked;process')
  58. fwh.write('\n'.join(infos))
  59. def zip_to_file(self, zip_file):
  60. with zipfile.ZipFile(zip_file, 'w', compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zip:
  61. for e in self.backup_list:
  62. zip.write(e)
  63. def process_handles(self):
  64. files = {}
  65. for proc in psutil.process_iter():
  66. try:
  67. for item in proc.open_files():
  68. files[item.path] = proc.name()
  69. except Exception:
  70. pass
  71. return files
  72. if __name__ == '__main__':
  73. ti = PathInfo()
  74. ti.check_dir('C:\\GlobalCube')
  75. ti.write_logfile('C:\\GlobalCube\\Tasks\\logs\\path_info.csv')
  76. # print(backup_list[:10])
  77. # ti.zip_to_file('C:\\GAPS_Autosys\\Test.zip')