# path_info.py (3.0 KB)
  1. from datetime import datetime
  2. import os
  3. import zipfile
  4. from pathlib import Path
  5. from os import path
  6. import psutil
  7. class PathInfo:
  8. root_dir = ""
  9. ignore = []
  10. file_list = []
  11. opened_files = {}
  12. def ignore_list(self):
  13. gitignore = self.root_dir + "\\.gitignore"
  14. if not path.exists(gitignore):
  15. pass
  16. with open(gitignore, "r") as f:
  17. for line in f.readlines():
  18. line = line.strip().replace("/", "\\").lower()
  19. if line[:1] == "*":
  20. if line[-1] == "*":
  21. line = line[1:-1]
  22. else:
  23. line = line[1:] + "\\"
  24. else:
  25. line = "\\" + line + "\\"
  26. self.ignore.append(line)
  27. def ignored(self, filename):
  28. rel_filename = "\\" + str(filename).replace(self.root_dir, "").lower() + "\\"
  29. for e in self.ignore:
  30. if e in rel_filename:
  31. return True
  32. return False
  33. def check_dir(self, current_dir):
  34. if self.root_dir == "":
  35. self.root_dir = current_dir
  36. self.opened_files = self.process_handles()
  37. current_dir = Path(current_dir)
  38. for entry in current_dir.glob("*"):
  39. if entry.is_dir():
  40. self.check_dir(entry)
  41. elif not self.ignored(entry):
  42. self.file_list.append(self.file_info(entry))
  43. def file_info(self, filename: Path):
  44. st = filename.stat()
  45. readable = "J" if os.access(filename, os.R_OK) else "N"
  46. writable = "J" if os.access(filename, os.W_OK) else "N"
  47. handle = self.opened_files.get(str(filename), "")
  48. blocked = "J" if handle != "" else "N"
  49. file_info = [
  50. str(filename),
  51. str(st.st_size),
  52. datetime.fromtimestamp(st.st_ctime).isoformat(timespec="seconds"),
  53. datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds"),
  54. readable,
  55. writable,
  56. blocked,
  57. handle,
  58. ]
  59. return file_info
  60. def write_logfile(self, logfile):
  61. with open(logfile, "w") as fwh:
  62. infos = [";".join(line) for line in self.file_list]
  63. fwh.write("name;size;ctime;mtime;read;write;blocked;process\n")
  64. fwh.write("\n".join(infos))
  65. def zip_to_file(self, zip_file):
  66. with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zip:
  67. for e in self.backup_list:
  68. zip.write(e)
  69. def process_handles(self):
  70. files = {}
  71. for proc in psutil.process_iter():
  72. try:
  73. for item in proc.open_files():
  74. files[item.path] = proc.name()
  75. except Exception:
  76. pass
  77. return files
  78. if __name__ == "__main__":
  79. ti = PathInfo()
  80. ti.check_dir("C:\\GlobalCube")
  81. ti.write_logfile("C:\\GlobalCube\\Tasks\\logs\\path_info.csv")
  82. # print(backup_list[:10])
  83. # ti.zip_to_file('C:\\GAPS_Autosys\\Test.zip')