|
@@ -0,0 +1,408 @@
|
|
|
+import contextlib
|
|
|
+import enum
|
|
|
+import os
|
|
|
+import pathlib
|
|
|
+import sys
|
|
|
+import time
|
|
|
+
|
|
|
+from dataclasses import dataclass
|
|
|
+from threading import Thread, Event
|
|
|
+from typing import Union, TypeVar
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class Size:
|
|
|
+ ''' A Python-version of the SIZE struct in WinApi '''
|
|
|
+ x: int
|
|
|
+ y: int
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class Color:
|
|
|
+ ''' Representation of an RGB color '''
|
|
|
+ red: int
|
|
|
+ green: int
|
|
|
+ blue: int
|
|
|
+
|
|
|
+class _LogTailer(Thread):
|
|
|
+ ''' A Thread that can follow/print new lines in the pydeskband log file (which is written by the DLL) '''
|
|
|
+ LOG_PATH = pathlib.Path(os.path.expandvars('%TEMP%/pydeskband.log'))
|
|
|
+ def __init__(self):
|
|
|
+ self.exit_event = Event()
|
|
|
+
|
|
|
+ if not _LogTailer.LOG_PATH.is_file():
|
|
|
+ raise FileNotFoundError("PyDeskband log was not found")
|
|
|
+
|
|
|
+ self.starting_offset = _LogTailer.LOG_PATH.stat().st_size
|
|
|
+
|
|
|
+ Thread.__init__(self, daemon=True)
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ ''' Ran in the other thread. Will effectively 'tail -f' the log file and print to stderr the new lines '''
|
|
|
+ try:
|
|
|
+ with open(_LogTailer.LOG_PATH, 'rb') as log_file:
|
|
|
+ log_file.seek(self.starting_offset)
|
|
|
+ while not self.exit_event.is_set():
|
|
|
+ line = log_file.readline().rstrip().decode()
|
|
|
+ if line:
|
|
|
+ print(line, file=sys.stderr)
|
|
|
+ time.sleep(.01)
|
|
|
+ except KeyboardInterrupt:
|
|
|
+ pass
|
|
|
+
|
|
|
+class ControlPipe:
|
|
|
+ ''' The mechanism for controlling PyDeskband.'''
|
|
|
+ def __init__(self):
|
|
|
+ ''' Note that this may raise if PyDeskband is not in use '''
|
|
|
+ try:
|
|
|
+ self.pipe = open('\\\\.\\pipe\\PyDeskbandControlPipe', 'r+b', buffering=0)
|
|
|
+ except FileNotFoundError as ex:
|
|
|
+ raise FileNotFoundError(f"The PyDeskbandControlPipe is not available. Is the deskband enabled?.. {str(ex)}")
|
|
|
+ self._log_tailer = None
|
|
|
+
|
|
|
+ def __enter__(self):
|
|
|
+ ''' For use as a contextmanager '''
|
|
|
+ return self
|
|
|
+
|
|
|
+ def __exit__(self, type, value, traceback):
|
|
|
+ ''' For use as a contextmanager... Closes the handle to the pipe '''
|
|
|
+ self.pipe.close()
|
|
|
+
|
|
|
+ def send_command(self, cmd:Union[list, tuple, str], check_ok:bool=True) -> list:
|
|
|
+ '''
|
|
|
+ The main entry point to go from Python to/from the C++ code. It is very unlikely that a regular user
|
|
|
+ would want to call this directly. If something is done incorrectly here, PyDeskband will likely crash...
|
|
|
+ and that will lead to Windows Explorer restarting.
|
|
|
+
|
|
|
+ Arguments:
|
|
|
+ cmd: Either a list of command keywords or a string of a full command
|
|
|
+ check_ok: If True, raise ValueError if C++ does not give back "OK" as the return status.
|
|
|
+ If set, will remove OK from the return list.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ A list of return fields.
|
|
|
+ '''
|
|
|
+ if isinstance(cmd, (list, tuple)):
|
|
|
+ cmd = ','.join([str(c) for c in cmd])
|
|
|
+
|
|
|
+ cmd = cmd.encode()
|
|
|
+
|
|
|
+ bytes_written = self.pipe.write(cmd)
|
|
|
+ if bytes_written != len(cmd):
|
|
|
+ raise RuntimeError(f"Unable to write all the bytes down the pipe. Wrote: {bytes_written} instead of {len(cmd)}")
|
|
|
+
|
|
|
+ response = self.pipe.readline().strip().decode().split(',')
|
|
|
+
|
|
|
+ if not response:
|
|
|
+ raise ValueError("Response was empty.")
|
|
|
+
|
|
|
+ if check_ok:
|
|
|
+ if response[0] != 'OK':
|
|
|
+ raise ValueError(f"Response was not OK. It was: {response[0]}")
|
|
|
+ response = response[1:]
|
|
|
+
|
|
|
+ return response
|
|
|
+
|
|
|
+ def get_width(self) -> int:
|
|
|
+ ''' Get the current width (in pixels) of the deskband '''
|
|
|
+ return int(self.send_command(['GET', 'WIDTH'])[0])
|
|
|
+
|
|
|
+ def get_height(self) -> int:
|
|
|
+ ''' Get the current height (in pixels) of the deskband '''
|
|
|
+ return int(self.send_command(['GET', 'HEIGHT'])[0])
|
|
|
+
|
|
|
+ def get_text_info_count(self) -> int:
|
|
|
+ ''' Get the count of TextInfos currently saved '''
|
|
|
+ return int(self.send_command(['GET', 'TEXTINFOCOUNT'])[0])
|
|
|
+
|
|
|
+ def add_new_text_info(self, text:str, x:int=0, y:int=0, red:int=255, green:int=255, blue:int=255) -> None:
|
|
|
+ ''' Creates a new TextInfo with the given text,x/y, and rgb text color '''
|
|
|
+ self.send_command('NEW_TEXTINFO')
|
|
|
+ self._set_color(red, green, blue)
|
|
|
+ self._set_coordinates(x, y)
|
|
|
+ self._set_text(text)
|
|
|
+ idx = (self.get_text_info_count() - 1)
|
|
|
+ return TextInfo(self, idx)
|
|
|
+
|
|
|
+ def get_text_size(self, text:str) -> Size:
|
|
|
+ ''' Gets a Size object corresponding with the x,y size this text would be (likely in pixels) '''
|
|
|
+ x, y = self.send_command([
|
|
|
+ 'GET', 'TEXTSIZE', self._verify_input_text(text)
|
|
|
+ ])[:2]
|
|
|
+ return Size(int(x), int(y))
|
|
|
+
|
|
|
+ def paint(self) -> None:
|
|
|
+ ''' Requests that PyDeskband repaint all TextInfos now '''
|
|
|
+ self.send_command('PAINT')
|
|
|
+
|
|
|
+ def clear(self, reset_target_textinfo:bool=True) -> None:
|
|
|
+ '''
|
|
|
+ Clears all TextInfos and re-paints.
|
|
|
+
|
|
|
+ If reset_target_textinfo is set, will also reset the current TextInfo target.
|
|
|
+ '''
|
|
|
+ self.send_command('CLEAR')
|
|
|
+ self._set_textinfo_target()
|
|
|
+
|
|
|
+ def set_logging(self, enabled:bool, tail:bool=False) -> None:
|
|
|
+ '''
|
|
|
+ Enables/disables logging in the C++ module. Logging goes to %TEMP%/pydeskband.log.
|
|
|
+ If tail is True, will attempt to tail the output to stderr in Python.
|
|
|
+ '''
|
|
|
+ self.send_command([
|
|
|
+ 'SET', 'LOGGING_ENABLED', 1 if enabled else 0
|
|
|
+ ])
|
|
|
+
|
|
|
+ def _stop_log_tailer():
|
|
|
+ if self._log_tailer:
|
|
|
+ self._log_tailer.exit_event.set()
|
|
|
+ self._log_tailer.join()
|
|
|
+ self._log_tailer = None
|
|
|
+
|
|
|
+ if tail:
|
|
|
+ _stop_log_tailer()
|
|
|
+ self._log_tailer = _LogTailer()
|
|
|
+ self._log_tailer.start()
|
|
|
+ else:
|
|
|
+ _stop_log_tailer()
|
|
|
+
|
|
|
+ def get_transport_version(self) -> int:
|
|
|
+ '''
|
|
|
+ Gets the current transport version from the DLL.
|
|
|
+ '''
|
|
|
+ return int(self.send_command([
|
|
|
+ 'GET', 'TRANSPORT_VERSION'
|
|
|
+ ])[0])
|
|
|
+
|
|
|
+ def set_windows_message_handle_shell_cmd(self, msg_id:int, shell_cmd:str=None) -> None:
|
|
|
+ ''' Tell PyDeskband that if msg_id is sent to the form, run this shell command. If shell_cmd is None, clear existing handling of the msg_id. '''
|
|
|
+ if shell_cmd is not None:
|
|
|
+ return self.send_command([
|
|
|
+ 'SET', 'WIN_MSG', msg_id, self._verify_input_text(shell_cmd)
|
|
|
+ ])
|
|
|
+ else:
|
|
|
+ return self.send_command([
|
|
|
+ 'SET', 'WIN_MSG', msg_id
|
|
|
+ ])
|
|
|
+
|
|
|
+ def _send_message(self, msg:int) -> None:
|
|
|
+ ''' Likely only useful for debugging. Send a WM_... message with the given id to our hwnd.'''
|
|
|
+ self.send_command([
|
|
|
+ 'SENDMESSAGE', str(msg)
|
|
|
+ ])
|
|
|
+
|
|
|
+ def _verify_input_text(self, text) -> str:
|
|
|
+ ''' Helper function. Verifies that the delimiter is not in the given text. Returns the text if not found. Otherwise raises. '''
|
|
|
+ if ',' in text:
|
|
|
+ raise ValueError(f"text cannot contain a ',' sign. Text: {text}")
|
|
|
+ return text
|
|
|
+
|
|
|
+ def _set_text(self, text:str) -> str:
|
|
|
+ ''' Call to SET TEXT in the DLL '''
|
|
|
+ return self.send_command([
|
|
|
+ 'SET', 'TEXT', self._verify_input_text(text)
|
|
|
+ ])
|
|
|
+
|
|
|
+ def _set_color(self, red:int=255, green:int=255, blue:int=255) -> str:
|
|
|
+ ''' Call to SET RGB in the DLL '''
|
|
|
+ return self.send_command([
|
|
|
+ 'SET', 'RGB', red, green, blue
|
|
|
+ ])
|
|
|
+
|
|
|
+ def _set_coordinates(self, x:int=0, y:int=0) -> str:
|
|
|
+ ''' Call to SET XY in the DLL '''
|
|
|
+ if x < 0:
|
|
|
+ raise ValueError(f"x cannot be less than 0. It was set to: {x}")
|
|
|
+ if y < 0:
|
|
|
+ raise ValueError(f"y cannot be less than 0. It was set to: {y}")
|
|
|
+
|
|
|
+ return self.send_command([
|
|
|
+ 'SET', 'XY', x, y
|
|
|
+ ])
|
|
|
+
|
|
|
+ def _set_textinfo_target(self, idx:Union[int, None]=None) -> str:
|
|
|
+ ''' Call to SET TEXTINFO_TARGET in the DLL. Passing an index of None will lead to the last TextInfo being targeted '''
|
|
|
+ if idx is None:
|
|
|
+ return self.send_command(["SET", "TEXTINFO_TARGET"])
|
|
|
+ else:
|
|
|
+ return self.send_command(["SET", "TEXTINFO_TARGET", str(idx)])
|
|
|
+
|
|
|
+ def _get_text(self) -> str:
|
|
|
+ ''' Call to GET TEXT in the DLL '''
|
|
|
+ return self.send_command(["GET", "TEXT"])[0]
|
|
|
+
|
|
|
+ def _get_color(self) -> Color:
|
|
|
+ ''' Call to GET RGB in the DLL '''
|
|
|
+ r, g, b = self.send_command(["GET", "RGB"])[:3]
|
|
|
+ return Color(int(r), int(g), int(b))
|
|
|
+
|
|
|
+ def _get_coordinates(self) -> Size:
|
|
|
+ ''' Call to GET XY in the DLL '''
|
|
|
+ x, y = self.send_command(["GET", "XY"])[:2]
|
|
|
+ return Size(int(x), int(y))
|
|
|
+
|
|
|
+ def _get_textinfo_target(self) -> Union[int, None]:
|
|
|
+ ''' Call to GET TEXTINFO_TARGET in the DLL. A return of None, means that the current target is the last TextInfo.'''
|
|
|
+ # Cheap use of eval. It can be 'None' or an int.
|
|
|
+ return eval(self.send_command(["GET", "TEXTINFO_TARGET"])[0])
|
|
|
+
|
|
|
+ def _test(self, sleep_time:int=1):
|
|
|
+ ''' a test... :) '''
|
|
|
+ import psutil, time
|
|
|
+
|
|
|
+ def get_mbps_down():
|
|
|
+ last_timestamp = getattr(get_mbps_down, 'last_timestamp', time.time())
|
|
|
+ last_bytes = getattr(get_mbps_down, 'last_bytes', 0)
|
|
|
+
|
|
|
+ get_mbps_down.last_bytes = psutil.net_io_counters().bytes_recv
|
|
|
+
|
|
|
+ now = time.time()
|
|
|
+ mbps = (get_mbps_down.last_bytes - float(last_bytes)) / 125000.0 / max(.0000001, now - last_timestamp)
|
|
|
+ get_mbps_down.last_timestamp = now
|
|
|
+ return mbps
|
|
|
+
|
|
|
+ def get_mbps_up():
|
|
|
+ last_timestamp = getattr(get_mbps_up, 'last_timestamp', time.time())
|
|
|
+ last_bytes = getattr(get_mbps_up, 'last_bytes', 0)
|
|
|
+
|
|
|
+ get_mbps_up.last_bytes = psutil.net_io_counters().bytes_sent
|
|
|
+
|
|
|
+ now = time.time()
|
|
|
+ mbps = (get_mbps_up.last_bytes - float(last_bytes)) / 125000.0 / max(.0000001, now - last_timestamp)
|
|
|
+ get_mbps_up.last_timestamp = now
|
|
|
+ return mbps
|
|
|
+
|
|
|
+ def get_cpu_used_percent():
|
|
|
+ return psutil.cpu_percent()
|
|
|
+
|
|
|
+ self.clear()
|
|
|
+
|
|
|
+ # Left click: Open task manager
|
|
|
+ self.set_windows_message_handle_shell_cmd(0x0201, r'start C:\Windows\System32\Taskmgr.exe')
|
|
|
+ cpuTextInfo = self.add_new_text_info('CPU:')
|
|
|
+ cpuValueTextInfo = self.add_new_text_info('0%')
|
|
|
+ netDownTextInfo = self.add_new_text_info('')
|
|
|
+ netDownTextInfo.justify_this_with_respect_to_that(cpuTextInfo, Justification.BELOW)
|
|
|
+
|
|
|
+ while True:
|
|
|
+ cpu = get_cpu_used_percent()
|
|
|
+ if cpu < 40:
|
|
|
+ cpuValueTextInfo.set_color(0, 250, 0)
|
|
|
+ elif cpu < 80:
|
|
|
+ cpuValueTextInfo.set_color(0, 250, 150)
|
|
|
+ elif cpu < 90:
|
|
|
+ cpuValueTextInfo.set_color(252, 148, 57)
|
|
|
+ else:
|
|
|
+ cpuValueTextInfo.set_color(250, 0, 0)
|
|
|
+
|
|
|
+ cpuValueTextInfo.set_text(f'{cpu:.02f}%')
|
|
|
+ cpuValueTextInfo.justify_this_with_respect_to_that(cpuTextInfo, Justification.RIGHT_OF)
|
|
|
+ netDownTextInfo.set_text(f'Net: {get_mbps_down():.02f}/{get_mbps_up():.02f} Mbps')
|
|
|
+ self.paint()
|
|
|
+ time.sleep(sleep_time)
|
|
|
+
|
|
|
+class Justification(enum.Enum):
|
|
|
+ LEFT_OF = 'Left of'
|
|
|
+ RIGHT_OF = 'Right of'
|
|
|
+ BELOW = 'Below'
|
|
|
+ ABOVE = 'Above'
|
|
|
+
|
|
|
+class TextInfo:
|
|
|
+ '''
|
|
|
+ Represents a reference to a TextInfo object in the DLL.
|
|
|
+
|
|
|
+ A TextInfo is a specific line/piece of text with a specific X/Y, RGB color, and text.
|
|
|
+ '''
|
|
|
+ def __init__(self, control_pipe:ControlPipe, idx:int):
|
|
|
+ self.controlPipe = control_pipe
|
|
|
+ self._idx = idx
|
|
|
+
|
|
|
+ @contextlib.contextmanager
|
|
|
+ def targeting_this_textinfo(self):
|
|
|
+ previous_target = self.controlPipe._get_textinfo_target()
|
|
|
+ self.controlPipe._set_textinfo_target(self._idx)
|
|
|
+ try:
|
|
|
+ yield
|
|
|
+ finally:
|
|
|
+ self.controlPipe._set_textinfo_target(previous_target)
|
|
|
+
|
|
|
+ def set_text(self, text:str) -> None:
|
|
|
+ ''' Sets the text of this TextInfo '''
|
|
|
+ with self.targeting_this_textinfo():
|
|
|
+ self.controlPipe._set_text(text)
|
|
|
+
|
|
|
+ def set_color(self, red:int=255, green:int=255, blue:int=255) -> None:
|
|
|
+ ''' Sets the color of this TextInfo '''
|
|
|
+ with self.targeting_this_textinfo():
|
|
|
+ self.controlPipe._set_color(red, green, blue)
|
|
|
+
|
|
|
+ def set_coordinates(self, x:int=0, y:int=0) -> None:
|
|
|
+ ''' Sets the X/Y coordinates of this TextInfo '''
|
|
|
+ with self.targeting_this_textinfo():
|
|
|
+ self.controlPipe._set_coordinates(x, y)
|
|
|
+
|
|
|
+ def get_text(self) -> str:
|
|
|
+ ''' Gets the text of this TextInfo '''
|
|
|
+ with self.targeting_this_textinfo():
|
|
|
+ return self.controlPipe._get_text()
|
|
|
+
|
|
|
+ def get_color(self) -> Color:
|
|
|
+ ''' Gets the color of this TextInfo '''
|
|
|
+ with self.targeting_this_textinfo():
|
|
|
+ return self.controlPipe._get_color()
|
|
|
+
|
|
|
+ def get_coordinates(self) -> Size:
|
|
|
+ ''' Gets the X/Y coordinates of this TextInfo '''
|
|
|
+ with self.targeting_this_textinfo():
|
|
|
+ return self.controlPipe._get_coordinates()
|
|
|
+
|
|
|
+ def get_text_size(self) -> Size:
|
|
|
+ ''' Gets the pixel size of the text within this TextInfo '''
|
|
|
+ text = self.get_text()
|
|
|
+ return self.controlPipe.get_text_size(text)
|
|
|
+
|
|
|
+ def justify_this_with_respect_to_that(self, that_text_info:TypeVar('TextInfo'), justify:Justification=Justification.RIGHT_OF, gap:Union[None, int]=None):
|
|
|
+ '''
|
|
|
+ Put this TextInfo to the <justify> of that TextInfo.
|
|
|
+
|
|
|
+ Gap is the distance in pixels between text_infos. If it is None, will be the size of a space (' ') character.
|
|
|
+
|
|
|
+ Only self (this) moves. that_text_info does not move.
|
|
|
+
|
|
|
+ Note that if a coordinate is calculated to be negative (in order to be in the correct spot) it will be set to 0.
|
|
|
+ '''
|
|
|
+ if gap is None:
|
|
|
+ gap = self.controlPipe.get_text_size(' ').x
|
|
|
+
|
|
|
+ that_coordinates = that_text_info.get_coordinates()
|
|
|
+ # that_text_info DOES NOT move. Only self (this) does.
|
|
|
+
|
|
|
+ # Right now things can wind up out of bounds in postive x and y directions.
|
|
|
+ # We could fix that later if desired.
|
|
|
+
|
|
|
+ if justify == Justification.RIGHT_OF:
|
|
|
+ # THIS THAT
|
|
|
+ that_text_size = that_text_info.get_text_size()
|
|
|
+ new_x = that_coordinates.x + that_text_size.x + gap
|
|
|
+ self.set_coordinates(max(0, new_x), that_coordinates.y)
|
|
|
+ elif justify == Justification.LEFT_OF:
|
|
|
+ # THAT THIS
|
|
|
+ this_text_size = self.get_text_size()
|
|
|
+
|
|
|
+ new_x = that_coordinates.x - this_text_size.x - gap
|
|
|
+ self.set_coordinates(max(0, new_x), that_coordinates.y)
|
|
|
+ elif justify == Justification.BELOW:
|
|
|
+ # THIS
|
|
|
+ # THAT
|
|
|
+ that_text_size = that_text_info.get_text_size()
|
|
|
+ new_y = that_coordinates.y + that_text_size.y + gap
|
|
|
+
|
|
|
+ self.set_coordinates(that_coordinates.x, max(0, new_y))
|
|
|
+
|
|
|
+ elif justify == Justification.ABOVE:
|
|
|
+ # THAT
|
|
|
+ # THIS
|
|
|
+ this_text_size = self.get_text_size()
|
|
|
+ new_y = that_coordinates.y - this_text_size.y - gap
|
|
|
+
|
|
|
+ self.set_coordinates(that_coordinates.x, max(0, new_y))
|
|
|
+ else:
|
|
|
+ raise ValueError("justify must be defined in the Justification enum")
|