pydeskband.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. import contextlib
  2. import enum
  3. import os
  4. import pathlib
  5. import sys
  6. import time
  7. from dataclasses import dataclass
  8. from threading import Thread, Event
  9. from typing import Union, TypeVar
  10. @dataclass
  11. class Size:
  12. ''' A Python-version of the SIZE struct in WinApi '''
  13. x: int
  14. y: int
  15. @dataclass
  16. class Color:
  17. ''' Representation of an RGB color '''
  18. red: int
  19. green: int
  20. blue: int
  21. class _LogTailer(Thread):
  22. ''' A Thread that can follow/print new lines in the pydeskband log file (which is written by the DLL) '''
  23. LOG_PATH = pathlib.Path(os.path.expandvars('%TEMP%/pydeskband.log'))
  24. def __init__(self):
  25. self.exit_event = Event()
  26. if not _LogTailer.LOG_PATH.is_file():
  27. raise FileNotFoundError("PyDeskband log was not found")
  28. self.starting_offset = _LogTailer.LOG_PATH.stat().st_size
  29. Thread.__init__(self, daemon=True)
  30. def run(self):
  31. ''' Ran in the other thread. Will effectively 'tail -f' the log file and print to stderr the new lines '''
  32. try:
  33. with open(_LogTailer.LOG_PATH, 'rb') as log_file:
  34. log_file.seek(self.starting_offset)
  35. while not self.exit_event.is_set():
  36. line = log_file.readline().rstrip().decode()
  37. if line:
  38. print(line, file=sys.stderr)
  39. time.sleep(.01)
  40. except KeyboardInterrupt:
  41. pass
  42. class ControlPipe:
  43. ''' The mechanism for controlling PyDeskband.'''
  44. def __init__(self):
  45. ''' Note that this may raise if PyDeskband is not in use '''
  46. try:
  47. self.pipe = open('\\\\.\\pipe\\PyDeskbandControlPipe', 'r+b', buffering=0)
  48. except FileNotFoundError as ex:
  49. raise FileNotFoundError(f"The PyDeskbandControlPipe is not available. Is the deskband enabled?.. {str(ex)}")
  50. self._log_tailer = None
  51. def __enter__(self):
  52. ''' For use as a contextmanager '''
  53. return self
  54. def __exit__(self, type, value, traceback):
  55. ''' For use as a contextmanager... Closes the handle to the pipe '''
  56. self.pipe.close()
  57. def send_command(self, cmd:Union[list, tuple, str], check_ok:bool=True) -> list:
  58. '''
  59. The main entry point to go from Python to/from the C++ code. It is very unlikely that a regular user
  60. would want to call this directly. If something is done incorrectly here, PyDeskband will likely crash...
  61. and that will lead to Windows Explorer restarting.
  62. Arguments:
  63. cmd: Either a list of command keywords or a string of a full command
  64. check_ok: If True, raise ValueError if C++ does not give back "OK" as the return status.
  65. If set, will remove OK from the return list.
  66. Returns:
  67. A list of return fields.
  68. '''
  69. if isinstance(cmd, (list, tuple)):
  70. cmd = ','.join([str(c) for c in cmd])
  71. cmd = cmd.encode()
  72. bytes_written = self.pipe.write(cmd)
  73. if bytes_written != len(cmd):
  74. raise RuntimeError(f"Unable to write all the bytes down the pipe. Wrote: {bytes_written} instead of {len(cmd)}")
  75. response = self.pipe.readline().strip().decode().split(',')
  76. if not response:
  77. raise ValueError("Response was empty.")
  78. if check_ok:
  79. if response[0] != 'OK':
  80. raise ValueError(f"Response was not OK. It was: {response[0]}")
  81. response = response[1:]
  82. return response
  83. def get_width(self) -> int:
  84. ''' Get the current width (in pixels) of the deskband '''
  85. return int(self.send_command(['GET', 'WIDTH'])[0])
  86. def get_height(self) -> int:
  87. ''' Get the current height (in pixels) of the deskband '''
  88. return int(self.send_command(['GET', 'HEIGHT'])[0])
  89. def get_text_info_count(self) -> int:
  90. ''' Get the count of TextInfos currently saved '''
  91. return int(self.send_command(['GET', 'TEXTINFOCOUNT'])[0])
  92. def add_new_text_info(self, text:str, x:int=0, y:int=0, red:int=255, green:int=255, blue:int=255) -> None:
  93. ''' Creates a new TextInfo with the given text,x/y, and rgb text color '''
  94. self.send_command('NEW_TEXTINFO')
  95. self._set_color(red, green, blue)
  96. self._set_coordinates(x, y)
  97. self._set_text(text)
  98. idx = (self.get_text_info_count() - 1)
  99. return TextInfo(self, idx)
  100. def get_text_size(self, text:str) -> Size:
  101. ''' Gets a Size object corresponding with the x,y size this text would be (likely in pixels) '''
  102. x, y = self.send_command([
  103. 'GET', 'TEXTSIZE', self._verify_input_text(text)
  104. ])[:2]
  105. return Size(int(x), int(y))
  106. def paint(self) -> None:
  107. ''' Requests that PyDeskband repaint all TextInfos now '''
  108. self.send_command('PAINT')
  109. def clear(self, reset_target_textinfo:bool=True) -> None:
  110. '''
  111. Clears all TextInfos and re-paints.
  112. If reset_target_textinfo is set, will also reset the current TextInfo target.
  113. '''
  114. self.send_command('CLEAR')
  115. self._set_textinfo_target()
  116. def set_logging(self, enabled:bool, tail:bool=False) -> None:
  117. '''
  118. Enables/disables logging in the C++ module. Logging goes to %TEMP%/pydeskband.log.
  119. If tail is True, will attempt to tail the output to stderr in Python.
  120. '''
  121. self.send_command([
  122. 'SET', 'LOGGING_ENABLED', 1 if enabled else 0
  123. ])
  124. def _stop_log_tailer():
  125. if self._log_tailer:
  126. self._log_tailer.exit_event.set()
  127. self._log_tailer.join()
  128. self._log_tailer = None
  129. if tail:
  130. _stop_log_tailer()
  131. self._log_tailer = _LogTailer()
  132. self._log_tailer.start()
  133. else:
  134. _stop_log_tailer()
  135. def get_transport_version(self) -> int:
  136. '''
  137. Gets the current transport version from the DLL.
  138. '''
  139. return int(self.send_command([
  140. 'GET', 'TRANSPORT_VERSION'
  141. ])[0])
  142. def set_windows_message_handle_shell_cmd(self, msg_id:int, shell_cmd:str=None) -> None:
  143. ''' Tell PyDeskband that if msg_id is sent to the form, run this shell command. If shell_cmd is None, clear existing handling of the msg_id. '''
  144. if shell_cmd is not None:
  145. return self.send_command([
  146. 'SET', 'WIN_MSG', msg_id, self._verify_input_text(shell_cmd)
  147. ])
  148. else:
  149. return self.send_command([
  150. 'SET', 'WIN_MSG', msg_id
  151. ])
  152. def _send_message(self, msg:int) -> None:
  153. ''' Likely only useful for debugging. Send a WM_... message with the given id to our hwnd.'''
  154. self.send_command([
  155. 'SENDMESSAGE', str(msg)
  156. ])
  157. def _verify_input_text(self, text) -> str:
  158. ''' Helper function. Verifies that the delimiter is not in the given text. Returns the text if not found. Otherwise raises. '''
  159. if ',' in text:
  160. raise ValueError(f"text cannot contain a ',' sign. Text: {text}")
  161. return text
  162. def _set_text(self, text:str) -> str:
  163. ''' Call to SET TEXT in the DLL '''
  164. return self.send_command([
  165. 'SET', 'TEXT', self._verify_input_text(text)
  166. ])
  167. def _set_color(self, red:int=255, green:int=255, blue:int=255) -> str:
  168. ''' Call to SET RGB in the DLL '''
  169. return self.send_command([
  170. 'SET', 'RGB', red, green, blue
  171. ])
  172. def _set_coordinates(self, x:int=0, y:int=0) -> str:
  173. ''' Call to SET XY in the DLL '''
  174. if x < 0:
  175. raise ValueError(f"x cannot be less than 0. It was set to: {x}")
  176. if y < 0:
  177. raise ValueError(f"y cannot be less than 0. It was set to: {y}")
  178. return self.send_command([
  179. 'SET', 'XY', x, y
  180. ])
  181. def _set_textinfo_target(self, idx:Union[int, None]=None) -> str:
  182. ''' Call to SET TEXTINFO_TARGET in the DLL. Passing an index of None will lead to the last TextInfo being targeted '''
  183. if idx is None:
  184. return self.send_command(["SET", "TEXTINFO_TARGET"])
  185. else:
  186. return self.send_command(["SET", "TEXTINFO_TARGET", str(idx)])
  187. def _get_text(self) -> str:
  188. ''' Call to GET TEXT in the DLL '''
  189. return self.send_command(["GET", "TEXT"])[0]
  190. def _get_color(self) -> Color:
  191. ''' Call to GET RGB in the DLL '''
  192. r, g, b = self.send_command(["GET", "RGB"])[:3]
  193. return Color(int(r), int(g), int(b))
  194. def _get_coordinates(self) -> Size:
  195. ''' Call to GET XY in the DLL '''
  196. x, y = self.send_command(["GET", "XY"])[:2]
  197. return Size(int(x), int(y))
  198. def _get_textinfo_target(self) -> Union[int, None]:
  199. ''' Call to GET TEXTINFO_TARGET in the DLL. A return of None, means that the current target is the last TextInfo.'''
  200. # Cheap use of eval. It can be 'None' or an int.
  201. return eval(self.send_command(["GET", "TEXTINFO_TARGET"])[0])
  202. def _test(self, sleep_time:int=1):
  203. ''' a test... :) '''
  204. import psutil, time
  205. def get_mbps_down():
  206. last_timestamp = getattr(get_mbps_down, 'last_timestamp', time.time())
  207. last_bytes = getattr(get_mbps_down, 'last_bytes', 0)
  208. get_mbps_down.last_bytes = psutil.net_io_counters().bytes_recv
  209. now = time.time()
  210. mbps = (get_mbps_down.last_bytes - float(last_bytes)) / 125000.0 / max(.0000001, now - last_timestamp)
  211. get_mbps_down.last_timestamp = now
  212. return mbps
  213. def get_mbps_up():
  214. last_timestamp = getattr(get_mbps_up, 'last_timestamp', time.time())
  215. last_bytes = getattr(get_mbps_up, 'last_bytes', 0)
  216. get_mbps_up.last_bytes = psutil.net_io_counters().bytes_sent
  217. now = time.time()
  218. mbps = (get_mbps_up.last_bytes - float(last_bytes)) / 125000.0 / max(.0000001, now - last_timestamp)
  219. get_mbps_up.last_timestamp = now
  220. return mbps
  221. def get_cpu_used_percent():
  222. return psutil.cpu_percent()
  223. self.clear()
  224. # Left click: Open task manager
  225. self.set_windows_message_handle_shell_cmd(0x0201, r'start C:\Windows\System32\Taskmgr.exe')
  226. cpuTextInfo = self.add_new_text_info('CPU:')
  227. cpuValueTextInfo = self.add_new_text_info('0%')
  228. netDownTextInfo = self.add_new_text_info('')
  229. netDownTextInfo.justify_this_with_respect_to_that(cpuTextInfo, Justification.BELOW)
  230. while True:
  231. cpu = get_cpu_used_percent()
  232. if cpu < 40:
  233. cpuValueTextInfo.set_color(0, 250, 0)
  234. elif cpu < 80:
  235. cpuValueTextInfo.set_color(0, 250, 150)
  236. elif cpu < 90:
  237. cpuValueTextInfo.set_color(252, 148, 57)
  238. else:
  239. cpuValueTextInfo.set_color(250, 0, 0)
  240. cpuValueTextInfo.set_text(f'{cpu:.02f}%')
  241. cpuValueTextInfo.justify_this_with_respect_to_that(cpuTextInfo, Justification.RIGHT_OF)
  242. netDownTextInfo.set_text(f'Net: {get_mbps_down():.02f}/{get_mbps_up():.02f} Mbps')
  243. self.paint()
  244. time.sleep(sleep_time)
  245. class Justification(enum.Enum):
  246. LEFT_OF = 'Left of'
  247. RIGHT_OF = 'Right of'
  248. BELOW = 'Below'
  249. ABOVE = 'Above'
  250. class TextInfo:
  251. '''
  252. Represents a reference to a TextInfo object in the DLL.
  253. A TextInfo is a specific line/piece of text with a specific X/Y, RGB color, and text.
  254. '''
  255. def __init__(self, control_pipe:ControlPipe, idx:int):
  256. self.controlPipe = control_pipe
  257. self._idx = idx
  258. @contextlib.contextmanager
  259. def targeting_this_textinfo(self):
  260. previous_target = self.controlPipe._get_textinfo_target()
  261. self.controlPipe._set_textinfo_target(self._idx)
  262. try:
  263. yield
  264. finally:
  265. self.controlPipe._set_textinfo_target(previous_target)
  266. def set_text(self, text:str) -> None:
  267. ''' Sets the text of this TextInfo '''
  268. with self.targeting_this_textinfo():
  269. self.controlPipe._set_text(text)
  270. def set_color(self, red:int=255, green:int=255, blue:int=255) -> None:
  271. ''' Sets the color of this TextInfo '''
  272. with self.targeting_this_textinfo():
  273. self.controlPipe._set_color(red, green, blue)
  274. def set_coordinates(self, x:int=0, y:int=0) -> None:
  275. ''' Sets the X/Y coordinates of this TextInfo '''
  276. with self.targeting_this_textinfo():
  277. self.controlPipe._set_coordinates(x, y)
  278. def get_text(self) -> str:
  279. ''' Gets the text of this TextInfo '''
  280. with self.targeting_this_textinfo():
  281. return self.controlPipe._get_text()
  282. def get_color(self) -> Color:
  283. ''' Gets the color of this TextInfo '''
  284. with self.targeting_this_textinfo():
  285. return self.controlPipe._get_color()
  286. def get_coordinates(self) -> Size:
  287. ''' Gets the X/Y coordinates of this TextInfo '''
  288. with self.targeting_this_textinfo():
  289. return self.controlPipe._get_coordinates()
  290. def get_text_size(self) -> Size:
  291. ''' Gets the pixel size of the text within this TextInfo '''
  292. text = self.get_text()
  293. return self.controlPipe.get_text_size(text)
  294. def justify_this_with_respect_to_that(self, that_text_info:TypeVar('TextInfo'), justify:Justification=Justification.RIGHT_OF, gap:Union[None, int]=None):
  295. '''
  296. Put this TextInfo to the <justify> of that TextInfo.
  297. Gap is the distance in pixels between text_infos. If it is None, will be the size of a space (' ') character.
  298. Only self (this) moves. that_text_info does not move.
  299. Note that if a coordinate is calculated to be negative (in order to be in the correct spot) it will be set to 0.
  300. '''
  301. if gap is None:
  302. gap = self.controlPipe.get_text_size(' ').x
  303. that_coordinates = that_text_info.get_coordinates()
  304. # that_text_info DOES NOT move. Only self (this) does.
  305. # Right now things can wind up out of bounds in postive x and y directions.
  306. # We could fix that later if desired.
  307. if justify == Justification.RIGHT_OF:
  308. # THIS THAT
  309. that_text_size = that_text_info.get_text_size()
  310. new_x = that_coordinates.x + that_text_size.x + gap
  311. self.set_coordinates(max(0, new_x), that_coordinates.y)
  312. elif justify == Justification.LEFT_OF:
  313. # THAT THIS
  314. this_text_size = self.get_text_size()
  315. new_x = that_coordinates.x - this_text_size.x - gap
  316. self.set_coordinates(max(0, new_x), that_coordinates.y)
  317. elif justify == Justification.BELOW:
  318. # THIS
  319. # THAT
  320. that_text_size = that_text_info.get_text_size()
  321. new_y = that_coordinates.y + that_text_size.y + gap
  322. self.set_coordinates(that_coordinates.x, max(0, new_y))
  323. elif justify == Justification.ABOVE:
  324. # THAT
  325. # THIS
  326. this_text_size = self.get_text_size()
  327. new_y = that_coordinates.y - this_text_size.y - gap
  328. self.set_coordinates(that_coordinates.x, max(0, new_y))
  329. else:
  330. raise ValueError("justify must be defined in the Justification enum")