From ffd75d94dc2ef8481915836ae3bad113a1fa7a3d Mon Sep 17 00:00:00 2001 From: Janis Hutz Date: Wed, 5 Mar 2025 11:20:01 +0100 Subject: [PATCH] Begin rewrite --- biogascontrollerapp/gui/credits.kv | 0 biogascontrollerapp/gui/home.kv | 0 biogascontrollerapp/gui/popups.kv | 0 biogascontrollerapp/gui/program.kv | 0 biogascontrollerapp/gui/settings.kv | 0 biogascontrollerapp/lib/com.py | 80 +++++++++++++++++++++++++ biogascontrollerapp/lib/decoder.py | 17 ++++++ biogascontrollerapp/lib/instructions.py | 22 +++++++ biogascontrollerapp/lib/test/com.py | 36 +++++++++++ 9 files changed, 155 insertions(+) create mode 100644 biogascontrollerapp/gui/credits.kv create mode 100644 biogascontrollerapp/gui/home.kv create mode 100644 biogascontrollerapp/gui/popups.kv create mode 100644 biogascontrollerapp/gui/program.kv create mode 100644 biogascontrollerapp/gui/settings.kv create mode 100644 biogascontrollerapp/lib/com.py create mode 100644 biogascontrollerapp/lib/decoder.py create mode 100644 biogascontrollerapp/lib/instructions.py create mode 100644 biogascontrollerapp/lib/test/com.py diff --git a/biogascontrollerapp/gui/credits.kv b/biogascontrollerapp/gui/credits.kv new file mode 100644 index 0000000..e69de29 diff --git a/biogascontrollerapp/gui/home.kv b/biogascontrollerapp/gui/home.kv new file mode 100644 index 0000000..e69de29 diff --git a/biogascontrollerapp/gui/popups.kv b/biogascontrollerapp/gui/popups.kv new file mode 100644 index 0000000..e69de29 diff --git a/biogascontrollerapp/gui/program.kv b/biogascontrollerapp/gui/program.kv new file mode 100644 index 0000000..e69de29 diff --git a/biogascontrollerapp/gui/settings.kv b/biogascontrollerapp/gui/settings.kv new file mode 100644 index 0000000..e69de29 diff --git a/biogascontrollerapp/lib/com.py b/biogascontrollerapp/lib/com.py new file mode 100644 index 0000000..f884688 --- /dev/null +++ b/biogascontrollerapp/lib/com.py @@ -0,0 +1,80 @@ +from typing import Optional +import serial +import struct +import serial.tools.list_ports + + +class Com: + def __init__(self, filters: Optional[list[str]] = None) -> None: + self._serial: Optional[serial.Serial] = None + self._filters = filters if filters != None else [ 'USB-Serial Controller', 'Prolific USB-Serial Controller' ] + self._port_override = '' + + def set_port_override(self, override: str) -> None: + """Set the port override, to disable port search""" + self._port_override = override + + def get_comport(self) -> str: + """Find the comport the microcontroller has attached to""" + if self._port_override != '': + return self._port_override + + # Catch all errors and simply return an empty string if search unsuccessful + try: + # Get an array of all used comports + ports = [comport.device for comport in serial.tools.list_ports.comports()] + + # Filter for specific controller + for comport in ports: + for filter in self._filters: + if ( filter in comport ): + return comport + except: + pass + + return '' + + def connect(self, baud_rate: int, port_override: Optional[str] = None) -> bool: + """Try to find a comport and connect to the microcontroller. Returns the success as a boolean""" + comport = self.get_comport() + + # Comport search returns empty string if search unsuccessful + if comport == '': + try: + self._serial = serial.Serial(comport, baud_rate, timeout=5) + except: + return False + return True + else: + return False + + def close(self) -> None: + """Close the serial connection, if possible""" + if self._serial != None: + try: + self._serial.close() + except: + pass + + def receive(self, byte_count: int) -> bytes: + """Recieve bytes from microcontroller over serial. Returns bytes. Might want to decode using functions from lib.tools""" + if self._serial == None: + self.connect(19200) + if self._serial != None: + return self._serial.read(byte_count) + else: + raise Exception('ERR_CONNECTION') + + def send(self, msg: str) -> None: + """Send a string over serial connection.""" + if self._serial == None: + self.connect(19200) + if self._serial != None: + self._serial.write(msg.encode()) + + def send_float(self, msg: float) -> None: + """Send a float number over serial connection""" + if self._serial == None: + self.connect(19200) + if self._serial != None: + self._serial.write(bytearray(struct.pack('>f', msg))[0:3]) diff --git a/biogascontrollerapp/lib/decoder.py b/biogascontrollerapp/lib/decoder.py new file mode 100644 index 0000000..b8f1b2f --- /dev/null +++ b/biogascontrollerapp/lib/decoder.py @@ -0,0 +1,17 @@ +import struct + +class Decoder: + def decode_ascii(self, value: bytes) -> str: + try: + return value.decode() + except: + return 'Error' + + def decode_float(self, value: bytes) -> float: + return struct.unpack('>f', bytes.fromhex(str(value, 'ascii') + '00'))[0] + + def decode_float_long(self, value: bytes) -> float: + return struct.unpack('>f', bytes.fromhex(str(value, 'ascii') + '0000'))[0] + + def decode_int(self, value: bytes) -> int: + return int(value, base=16) diff --git a/biogascontrollerapp/lib/instructions.py b/biogascontrollerapp/lib/instructions.py new file mode 100644 index 0000000..182bde2 --- /dev/null +++ b/biogascontrollerapp/lib/instructions.py @@ -0,0 +1,22 @@ +from typing import Optional +import lib.com +import lib.decoder + +# TODO: Load filters (for comport search) +com = lib.com.Com() +decoder = lib.decoder.Decoder() + +class Instructions: + def __init__(self) -> None: + pass + + def _hook(self, instruction: str, sequence: list[str]) -> bool: + return False + + def change_temperature(self, new_temps: list[float]) -> None: + pass + + def change_config(self, new_config: list[float]) -> None: + pass + + diff --git a/biogascontrollerapp/lib/test/com.py b/biogascontrollerapp/lib/test/com.py new file mode 100644 index 0000000..4875b21 --- /dev/null +++ b/biogascontrollerapp/lib/test/com.py @@ -0,0 +1,36 @@ +""" +Library to be used in standalone mode (without microcontroller, for testing functionality) +""" + +from typing import Optional +import queue + + +class Com: + def __init__(self) -> None: + # Initialize queue with values to be sent on call of recieve (add like three or so at a time) + self._port_override = '' + + def set_port_override(self, override: str) -> None: + """Set the port override, to disable port search""" + self._port_override = override + + def get_comport(self) -> str: + return 'test' if self._port_override != '' else self._port_override + + def connect(self, baud_rate: int, port_override: Optional[str] = None) -> bool: + return True # TODO: For testing, make cases where there is no successful connection, i.e. we return false + + def close(self) -> None: + pass + + def receive(self, byte_count: int) -> None: + # TODO: Make it return simulated data + pass + + def send(self, msg: str) -> None: + # TODO: Use LUT to find what should be added to the queue for read + pass + + def send_float(self, msg: float) -> None: + pass