Main, Program Screen basically done, UI Tweaks, backend fixes, start writing testing library

This commit is contained in:
2025-05-08 18:12:26 +02:00
parent 92836fe427
commit e71f9e6d02
13 changed files with 592 additions and 105 deletions

View File

@@ -5,11 +5,11 @@ import serial.tools.list_ports
class Com:
def __init__(self, filters: Optional[list[str]] = None) -> None:
def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None:
self._serial: Optional[serial.Serial] = None
self._filters = filters if filters != None else [ 'USB-Serial Controller', 'Prolific USB-Serial Controller' ]
self._port_override = ''
self._baudrate = 19200
self._baudrate = baudrate
self._err = None
def set_port_override(self, override: str) -> None:
@@ -47,7 +47,7 @@ class Com:
except:
pass
return ''
return ""
def _open(self) -> bool:
comport = self.get_comport()
@@ -63,9 +63,8 @@ class Com:
else:
return False
def connect(self, baud_rate: int) -> bool:
def connect(self) -> bool:
"""Try to find a comport and connect to the microcontroller. Returns the success as a boolean"""
self._baudrate = baud_rate
return self._connection_check()
def close(self) -> None:

View File

@@ -1,61 +1,100 @@
import lib.com
from lib.com import Com
import lib.decoder
import time
# TODO: Load filters (for comport search)
com = lib.com.Com()
decoder = lib.decoder.Decoder()
class Instructions:
def set_port_override(self, override: str) -> None:
com.set_port_override(override)
def _hook(self, instruction: str, sequence: list[str]) -> bool:
# Class that supports sending instructions to the microcontroller,
# as well as hooking to data stream according to protocol
class Instructions:
def __init__(self, com: Com) -> None:
self._com = com
# Set a port override (to use a specific COM port)
def set_port_override(self, override: str) -> None:
self._com.set_port_override(override)
# Helper method to hook to the data stream according to protocol.
# You can specify the sequence that the program listens to to sync up,
# as an array of strings, that should each be of length one and only contain
# ascii characters
def hook(self, instruction: str, sequence: list[str]) -> bool:
# Add protection: If we cannot establish connection, refuse to run
if not self._com.connect():
return False
# Send instruction to microcontroller to start hooking process
com.send(instruction)
# If instruction is an empty string, do not send instruction
if instruction != "":
self._com.send(instruction)
# Record start time to respond to timeout
start = time.time()
# Check for timeout
# The pointer below points to the element in the array which is the next expected character to be received
pointer = 0
sequence_max = len(sequence) - 1
# Simply the length of the sequence, since it is both cheaper and cleaner to calculate it once
sequence_max = len(sequence)
# Only run for a limited amount of time
while time.time() - start < 5:
if ( decoder.decode_ascii( com.receive(1) ) ) == sequence[pointer]:
# If the decoded ascii character is equal to the next expected character, move pointer right by one
# If not, jump back to start
if (decoder.decode_ascii(self._com.receive(1))) == sequence[pointer]:
pointer += 1
else:
pointer = 0
# If the pointer has reached the end of the sequence, return True, as now the hook was successful
if pointer == sequence_max:
return True
# If we time out, which is the only way in which this code can be reached, return False
return False
def _change_data(self, instruction: str, readback: list[str], data: list[float], readback_length: int) -> None:
# Private helper method to transmit data using the necessary protocols
def _change_data(
self,
instruction: str,
readback: list[str],
data: list[float],
readback_length: int,
) -> None:
# Hook to stream
if self._hook(instruction, readback):
if self.hook(instruction, readback):
# Transmit data
while len(data) > 0:
if com.receive(readback_length) != '':
com.send_float(data.pop(0))
# If we received data back, we can send more data, i.e. from this we know
# the controller has received the data
# If not, we close the connection and create an exception
if self._com.receive(readback_length) != "":
self._com.send_float(data.pop(0))
else:
com.close()
raise Exception('Failed to transmit data. No response from controller')
com.close()
self._com.close()
raise Exception(
"Failed to transmit data. No response from controller"
)
self._com.close()
else:
com.close()
raise ConnectionError('Failed to hook to controller data stream. No fitting response received')
self._com.close()
raise ConnectionError(
"Failed to hook to controller data stream. No fitting response received"
)
# Abstraction of the _change_data method specifically designed to change the entire config
def change_config(self, new_config: list[float]) -> None:
self._change_data('PR', ['\n', 'P', 'R', '\n'], new_config, 3)
try:
self._change_data("PR", ["\n", "P", "R", "\n"], new_config, 3)
except Exception as e:
raise e
# Abstraction of the _change_data method specifically designed to change only the configured temperature
def change_temperature(self, temperatures: list[float]) -> None:
self._change_data('PT', ['\n', 'P', 'T', '\n'], temperatures, 3)
def enable_fastmode(self) -> None:
com.send('FM')
com.close()
def disable_fastmode(self) -> None:
com.send('NM')
com.close()
try:
self._change_data("PT", ["\n", "P", "T", "\n"], temperatures, 3)
except Exception as e:
raise e

View File

@@ -1,36 +1,55 @@
"""
Library to be used in standalone mode (without microcontroller, for testing functionality)
It simulates the behviour of an actual microcontroller being connected
"""
from typing import Optional
import queue
import random
# This file contains a Com class that can be used to test the functionality
# even without a microcontroller. It is not documented in a particularly
# beginner-friendly way, nor is the code written with beginner-friendliness
# in mind. It is the most complicated piece of code of the entire application
# All double __ prefixed properties are not available in the actual one
class Com:
def __init__(self) -> None:
# Initialize queue with values to be sent on call of recieve (add like three or so at a time)
self._port_override = ''
self._port_override = ""
self.__mode = ""
self.__simulated_data = queue.Queue()
def set_port_override(self, override: str) -> None:
"""Set the port override, to disable port search"""
self._port_override = override
def get_comport(self) -> str:
return 'test' if self._port_override != '' else self._port_override
return "test" if self._port_override != "" else self._port_override
def connect(self, baud_rate: int, port_override: Optional[str] = None) -> bool:
return True # TODO: For testing, make cases where there is no successful connection, i.e. we return false
def connect(self) -> bool:
# TODO: For testing, make cases where there is no successful connection, i.e. we return false
# Randomly return false
if random.randint(0, 20):
return False
return True
def close(self) -> None:
pass
def receive(self, byte_count: int) -> None:
def receive(self, byte_count: int) -> bytes:
# TODO: Make it return simulated data
pass
return bytes("A", "ascii")
def send(self, msg: str) -> None:
# TODO: Use LUT to find what should be added to the queue for read
# Using LUT to reference
pass
def send_float(self, msg: float) -> None:
pass
def _generate_random_value(self, precision: int) -> bytes:
return bytes(str(round(random.random() * precision) / precision), "ascii")