# path_info.py
  1. from datetime import datetime
  2. import os
  3. import zipfile
  4. from pathlib import Path
  5. from os import path
  6. import psutil
  7. class PathInfo:
  8. root_dir = ""
  9. ignore = []
  10. file_list = []
  11. opened_files = {}
  12. def ignore_list(self):
  13. gitignore = self.root_dir + "\\.gitignore"
  14. if not path.exists(gitignore):
  15. pass
  16. with open(gitignore, "r") as f:
  17. for line in f.readlines():
  18. line = line.strip().replace("/", "\\").lower()
  19. if line[:1] == "*":
  20. if line[-1] == "*":
  21. line = line[1:-1]
  22. else:
  23. line = line[1:] + "\\"
  24. else:
  25. line = "\\" + line + "\\"
  26. self.ignore.append(line)
  27. def ignored(self, filename):
  28. rel_filename = "\\" + str(filename).replace(self.root_dir, "").lower() + "\\"
  29. for e in self.ignore:
  30. if e in rel_filename:
  31. return True
  32. return False
  33. def check_dir(self, current_dir):
  34. if self.root_dir == "":
  35. self.root_dir = current_dir
  36. self.opened_files = self.process_handles()
  37. current_dir = Path(current_dir)
  38. for entry in current_dir.glob("*"):
  39. if entry.is_dir():
  40. if entry.name.lower() not in [".git", "python", "php"]:
  41. self.check_dir(entry)
  42. elif not self.ignored(entry):
  43. self.file_list.append(self.file_info(entry))
  44. def file_info(self, filename: Path):
  45. st = filename.stat()
  46. readable = "J" if os.access(filename, os.R_OK) else "N"
  47. writable = "J" if os.access(filename, os.W_OK) else "N"
  48. handle = self.opened_files.get(str(filename), "")
  49. blocked = "J" if handle != "" else "N"
  50. file_info = [
  51. str(filename),
  52. str(st.st_size),
  53. datetime.fromtimestamp(st.st_ctime).isoformat(timespec="seconds"),
  54. datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds"),
  55. readable,
  56. writable,
  57. blocked,
  58. handle,
  59. ]
  60. return file_info
  61. def write_logfile(self, logfile):
  62. with open(logfile, "w") as fwh:
  63. infos = [";".join(line) for line in self.file_list]
  64. fwh.write("name;size;ctime;mtime;read;write;blocked;process\n")
  65. fwh.write("\n".join(infos))
  66. def zip_to_file(self, zip_file):
  67. with zipfile.ZipFile(zip_file, "w", compression=zipfile.ZIP_DEFLATED, compresslevel=9) as zip:
  68. for e in self.backup_list:
  69. zip.write(e)
  70. def process_handles(self):
  71. files = {}
  72. for proc in psutil.process_iter():
  73. try:
  74. for item in proc.open_files():
  75. files[item.path] = proc.name()
  76. except Exception:
  77. pass
  78. return files
  79. if __name__ == "__main__":
  80. ti = PathInfo()
  81. ti.check_dir("C:\\GlobalCube")
  82. ti.write_logfile("C:\\GlobalCube\\Tasks\\logs\\path_info.csv")
  83. # print(backup_list[:10])
  84. # ti.zip_to_file('C:\\GAPS_Autosys\\Test.zip')