From b01232b5529fd0aac1d978657663f3512e25429e Mon Sep 17 00:00:00 2001 From: Janis Hutz Date: Sun, 15 Jun 2025 12:01:59 +0200 Subject: [PATCH] Get test library running --- gui/main/main.kv | 12 ++++ gui/main/main.py | 39 ++++++++--- gui/program/program.py | 5 +- lib/instructions.py | 5 +- lib/test/com.py | 152 +++++++++++++++++++++++++++++++---------- 5 files changed, 165 insertions(+), 48 deletions(-) diff --git a/gui/main/main.kv b/gui/main/main.kv index ccff974..6623a61 100644 --- a/gui/main/main.kv +++ b/gui/main/main.kv @@ -25,24 +25,36 @@ Label: id: sensor1 text: "" + size_hint: 1, 1 + halign: 'left' + text_size: self.size Label: text: "Sensor 2: " font_size: 20 Label: id: sensor2 text: "" + size_hint: 1, 1 + halign: 'left' + text_size: self.size Label: text: "Sensor 3: " font_size: 20 Label: id: sensor3 text: "" + size_hint: 1, 1 + halign: 'left' + text_size: self.size Label: text: "Sensor 4: " font_size: 20 Label: id: sensor4 text: "" + size_hint: 1, 1 + halign: 'left' + text_size: self.size Button: text: "Connect" size_hint: 0.2, 0.1 diff --git a/gui/main/main.py b/gui/main/main.py index 15f49a7..7fe1554 100644 --- a/gui/main/main.py +++ b/gui/main/main.py @@ -1,5 +1,6 @@ from ctypes import ArgumentError from time import time +from types import prepare_class from typing import List, override from kivy.uix.screenmanager import Screen from kivy.lang import Builder @@ -73,14 +74,15 @@ class ReaderThread(threading.Thread): data.append( f"Tadc: { self._decoder.decode_int(received[12 * i:12 * i + 4]) - }\nTemperature: { - self._decoder.decode_float(received[12 * i + 5:12 * i + 11]) - }\nDuty-Cycle: { - self._decoder.decode_float_long(received[48 + 5 * i: 52 + 5 * i]) / 65535.0 * 100 + }\nTemp: { + round(self._decoder.decode_float(received[12 * i + 5:12 * i + 11]) * 1000) / 1000 + }°C\nDC: { + round((self._decoder.decode_float_long(received[48 + 5 * i: 52 + 5 * i]) / 65535.0 * 100) * 1000) / 1000 }%" ) # Calculate the frequency of updates - data.append(str(1 / (time() - start_time))) + data.append(str(round((1 / (time() - start_time)) * 1000) / 1000) + " Hz") + synced_queue.put(data) else: # Send error message to the UI updater synced_queue.put(["ERR_HOOK"]) @@ -105,25 +107,36 @@ class MainScreen(Screen): self._event = None # Prepare the reader thread - self._reader = ReaderThread() - self._reader.setDaemon(True) - self._reader.set_com(com) + self._prepare_reader() + self._has_run = False self._has_connected = False # Call the constructor for the Screen class super().__init__(**kw) + def _prepare_reader(self): + self._reader = ReaderThread() + self._reader.setDaemon(True) + self._reader.set_com(self._com) + # Start the connection to the micro-controller to read data from it. # This also now starts the reader thread to continuously read out data def start(self): + # Prevent running multiple times + if self._has_connected: + return + self.ids.status.text = "Connecting..." if self._com.connect(): print("Acquired connection") self._has_connected = True + self._has_run = True + if self._has_run: + self._prepare_reader() # Start communication self._reader.start() print("Reader has started") - Clock.schedule_interval(self._update_screen, 0.5) + self._event = Clock.schedule_interval(self._update_screen, 0.5) else: self.ids.status.text = "Connection failed" TwoActionPopup().open( @@ -142,6 +155,11 @@ class MainScreen(Screen): if self._event != None: self._event.cancel() self._reader.stop() + try: + self._reader.join() + except: + pass + try: self._com.send("NM") except: @@ -185,7 +203,7 @@ class MainScreen(Screen): # Switch the mode for the micro-controller def switch_mode(self, new_mode: str): # Store if we have been connected to the micro-controller before mode was switched - was_connected = self._reader.is_alive + was_connected = self._has_connected # Disconnect from the micro-controller self.end() @@ -201,6 +219,7 @@ class MainScreen(Screen): SingleRowPopup().open("Failed to switch modes") return + self.ids.status.text = "Mode set" # If we have been connected, reconnect if was_connected: self.start() diff --git a/gui/program/program.py b/gui/program/program.py index baa7d1b..ce074fb 100644 --- a/gui/program/program.py +++ b/gui/program/program.py @@ -58,6 +58,8 @@ class ProgramScreen(Screen): # Add it to the config config.append(config_sensor_i) + + self._set_ui(config) else: TwoActionPopup().open( "Failed to connect to micro-controller, retry?", @@ -100,8 +102,9 @@ class ProgramScreen(Screen): else: try: self._instructions.change_config(data) - except: + except Exception as e: SingleRowPopup().open("Could not save data!") + return SingleRowPopup().open("Data saved successfully") diff --git a/lib/instructions.py b/lib/instructions.py index 0c96eef..f10fbf1 100644 --- a/lib/instructions.py +++ b/lib/instructions.py @@ -41,7 +41,8 @@ class Instructions: while time.time() - start < 5: # If the decoded ascii character is equal to the next expected character, move pointer right by one # If not, jump back to start - if decoder.decode_ascii(self._com.receive(1)) == sequence[pointer]: + data = decoder.decode_ascii(self._com.receive(1)); + if data == sequence[pointer]: pointer += 1 else: pointer = 0 @@ -53,7 +54,7 @@ class Instructions: # If we time out, which is the only way in which this code can be reached, return False return False - # Used to hook to the main data stream, as that hooking mechanism is differen + # Used to hook to the main data stream, as that hooking mechanism is different def hook_main(self) -> bool: # Record start time to respond to timeout start = time.time() diff --git a/lib/test/com.py b/lib/test/com.py index 9c4da87..fded0ba 100644 --- a/lib/test/com.py +++ b/lib/test/com.py @@ -3,7 +3,7 @@ Library to be used in standalone mode (without microcontroller, for testing func It simulates the behviour of an actual microcontroller being connected """ -from typing import Optional +from typing import List, Optional import queue import random import time @@ -26,11 +26,28 @@ instruction_lut: dict[str, list[str]] = { "FM": ["\n", "F", "M", "\n"], } +reconfig = ["a", "b", "c", "t"] + class SimulationError(Exception): pass +class SensorConfig: + a: float + b: float + c: float + t: float + + def __init__( + self, a: float = 20, b: float = 30, c: float = 10, t: float = 55 + ) -> None: + self.a = a + self.b = b + self.c = c + self.t = t + + class Com(ComSuperClass): def __init__( self, baudrate: int = 19200, filters: Optional[list[str]] = None @@ -43,6 +60,16 @@ class Com(ComSuperClass): self.__simulated_data: queue.Queue[bytes] = queue.Queue() self.__simulated_data_remaining = 0 + self.__reconf_sensor = 0 + self.__reconf_step = 0 + + self.__config: List[SensorConfig] = [ + SensorConfig(), + SensorConfig(), + SensorConfig(), + SensorConfig(), + ] + # Initially, we are in normal mode (which leads to slower data intervals) self.__mode = "NM" @@ -71,7 +98,7 @@ class Com(ComSuperClass): for _ in range(byte_count): if self.__mode == "NM": - time.sleep(0.001) + time.sleep(0.005) try: data.append(self.__simulated_data.get_nowait()) self.__simulated_data_remaining -= 1 @@ -81,41 +108,57 @@ class Com(ComSuperClass): "Simulation encountered an error with the simulation queue. The error encountered: \n" + str(e) ) - return b''.join(data) + return b"".join(data) def send(self, msg: str) -> None: # Using LUT to reference readback = instruction_lut.get(msg) if readback != None: for i in range(len(readback)): - self.__simulated_data.put(bytes(readback[i], "ascii")) + self.__add_ascii_char(readback[i]) if msg == "RD": - # Handle ReadData readback - # self.__simulated_data.put(ord("")) - pass + self.__set_read_data_data() + elif msg == "PR": + self.__reconf_sensor = 0 + self.__reconf_step = 0 + self.__add_ascii_char("a") + self.__add_ascii_char("0") + self.__add_ascii_char("\n") + + def __set_read_data_data(self) -> None: + # Send data for all four sensors + for i in range(4): + self.__add_float_as_hex(self.__config[i].a) + self.__add_ascii_char(" ") + self.__add_float_as_hex(self.__config[i].b) + self.__add_ascii_char(" ") + self.__add_float_as_hex(self.__config[i].c) + self.__add_ascii_char(" ") + self.__add_float_as_hex(self.__config[i].t) + self.__add_ascii_char("\n") def send_float(self, msg: float) -> None: - # Encode float as 8 bytes (64 bit) - ba = struct.pack("d", msg) - for byte in ba: - self.__simulated_data.put(byte.to_bytes()) + if self.__reconf_step == 0: + self.__config[self.__reconf_sensor].a = msg + elif self.__reconf_step == 1: + self.__config[self.__reconf_sensor].b = msg + elif self.__reconf_step == 2: + self.__config[self.__reconf_sensor].c = msg + elif self.__reconf_step == 3: + self.__config[self.__reconf_sensor].t = msg - def __fill_queue_alternative(self): - for _ in range(4): - for _ in range(4): - self.__simulated_data.put(random.randbytes(1)) - self.__simulated_data.put(bytes(" ", "ascii")) - for _ in range(6): - self.__simulated_data.put(random.randbytes(1)) - self.__simulated_data.put(bytes(" ", "ascii")) - for _ in range(3): - for _ in range(4): - self.__simulated_data.put(random.randbytes(1)) - self.__simulated_data.put(bytes(" ", "ascii")) - for _ in range(4): - self.__simulated_data.put(random.randbytes(1)) - self.__simulated_data.put(bytes("\n", "ascii")) - self.__simulated_data_remaining = 68 + if self.__reconf_step == 3: + self.__reconf_step = 0 + self.__reconf_sensor += 1 + else: + self.__reconf_step += 1 + + if self.__reconf_sensor == 4: + return + + self.__add_ascii_char(reconfig[self.__reconf_step]) + self.__add_ascii_char(str(self.__reconf_sensor)) + self.__add_ascii_char("\n") def __fill_queue(self): for _ in range(4): @@ -127,10 +170,10 @@ class Com(ComSuperClass): for _ in range(3): self.__add_integer_as_hex(self.__generate_random_int(65535)) self.__simulated_data.put(bytes(" ", "ascii")) + self.__simulated_data_remaining += 1 self.__add_integer_as_hex(self.__generate_random_int(65535)) self.__simulated_data.put(bytes("\n", "ascii")) - self.__simulated_data_remaining += 4 - print("Length:", self.__simulated_data_remaining) + self.__simulated_data_remaining += 1 def __generate_random_int(self, max: int) -> int: return random.randint(0, max) @@ -138,11 +181,50 @@ class Com(ComSuperClass): def __generate_random_float(self, max: int) -> float: return random.random() * max - def __add_character_as_hex(self, data: str): - pass + def __add_ascii_char(self, ascii_string: str): + self.__simulated_data.put(ord(ascii_string).to_bytes(1)) + self.__simulated_data_remaining += 1 - def __add_integer_as_hex(self, data: int): - pass + def __add_two_byte_value(self, c: int): + """putchhex - def __add_float_as_hex(self, data: float): - pass + Args: + c: The char (as integer) + """ + # First nibble (high) + high_nibble = (c >> 4) & 0x0F + high_char = chr(high_nibble + 48 if high_nibble < 10 else high_nibble + 55) + self.__simulated_data.put(high_char.encode()) + + # Second nibble (low) + low_nibble = c & 0x0F + low_char = chr(low_nibble + 48 if low_nibble < 10 else low_nibble + 55) + self.__simulated_data.put(low_char.encode()) + self.__simulated_data_remaining += 2 + + def __add_integer_as_hex(self, c: int): + """Writes the hexadecimal representation of the high and low bytes of integer `c` (16-bit) to the simulated serial port.""" + if not (0 <= c <= 0xFFFF): + raise ValueError("Input must be a 16-bit integer (0–65535)") + + # Get high byte (most significant byte) + hi_byte = (c >> 8) & 0xFF + # Get low byte (least significant byte) + lo_byte = c & 0xFF + + # Call putchhex for the high byte and low byte + self.__add_two_byte_value(hi_byte) + self.__add_two_byte_value(lo_byte) + + def __add_float_as_hex(self, f: float): + """Converts a float to its byte representation and sends the bytes using putchhex.""" + # Pack the float into bytes (IEEE 754 format) + packed = struct.pack(">f", f) # Big-endian format (network byte order) + + # Unpack the bytes into 3 bytes: high, mid, low + high, mid, low = packed[0], packed[1], packed[2] + + # Send each byte as hex + self.__add_two_byte_value(high) + self.__add_two_byte_value(mid) + self.__add_two_byte_value(low)