Get test library running

This commit is contained in:
2025-06-15 12:01:59 +02:00
parent b00466c5dd
commit b01232b552
5 changed files with 165 additions and 48 deletions

View File

@@ -25,24 +25,36 @@
Label:
id: sensor1
text: ""
size_hint: 1, 1
halign: 'left'
text_size: self.size
Label:
text: "Sensor 2: "
font_size: 20
Label:
id: sensor2
text: ""
size_hint: 1, 1
halign: 'left'
text_size: self.size
Label:
text: "Sensor 3: "
font_size: 20
Label:
id: sensor3
text: ""
size_hint: 1, 1
halign: 'left'
text_size: self.size
Label:
text: "Sensor 4: "
font_size: 20
Label:
id: sensor4
text: ""
size_hint: 1, 1
halign: 'left'
text_size: self.size
Button:
text: "Connect"
size_hint: 0.2, 0.1

View File

@@ -1,5 +1,6 @@
from ctypes import ArgumentError
from time import time
from types import prepare_class
from typing import List, override
from kivy.uix.screenmanager import Screen
from kivy.lang import Builder
@@ -73,14 +74,15 @@ class ReaderThread(threading.Thread):
data.append(
f"Tadc: {
self._decoder.decode_int(received[12 * i:12 * i + 4])
}\nTemperature: {
self._decoder.decode_float(received[12 * i + 5:12 * i + 11])
}\nDuty-Cycle: {
self._decoder.decode_float_long(received[48 + 5 * i: 52 + 5 * i]) / 65535.0 * 100
}\nTemp: {
round(self._decoder.decode_float(received[12 * i + 5:12 * i + 11]) * 1000) / 1000
}°C\nDC: {
round((self._decoder.decode_float_long(received[48 + 5 * i: 52 + 5 * i]) / 65535.0 * 100) * 1000) / 1000
}%"
)
# Calculate the frequency of updates
data.append(str(1 / (time() - start_time)))
data.append(str(round((1 / (time() - start_time)) * 1000) / 1000) + " Hz")
synced_queue.put(data)
else:
# Send error message to the UI updater
synced_queue.put(["ERR_HOOK"])
@@ -105,25 +107,36 @@ class MainScreen(Screen):
self._event = None
# Prepare the reader thread
self._reader = ReaderThread()
self._reader.setDaemon(True)
self._reader.set_com(com)
self._prepare_reader()
self._has_run = False
self._has_connected = False
# Call the constructor for the Screen class
super().__init__(**kw)
def _prepare_reader(self):
self._reader = ReaderThread()
self._reader.setDaemon(True)
self._reader.set_com(self._com)
# Start the connection to the micro-controller to read data from it.
# This also now starts the reader thread to continuously read out data
def start(self):
# Prevent running multiple times
if self._has_connected:
return
self.ids.status.text = "Connecting..."
if self._com.connect():
print("Acquired connection")
self._has_connected = True
self._has_run = True
if self._has_run:
self._prepare_reader()
# Start communication
self._reader.start()
print("Reader has started")
Clock.schedule_interval(self._update_screen, 0.5)
self._event = Clock.schedule_interval(self._update_screen, 0.5)
else:
self.ids.status.text = "Connection failed"
TwoActionPopup().open(
@@ -142,6 +155,11 @@ class MainScreen(Screen):
if self._event != None:
self._event.cancel()
self._reader.stop()
try:
self._reader.join()
except:
pass
try:
self._com.send("NM")
except:
@@ -185,7 +203,7 @@ class MainScreen(Screen):
# Switch the mode for the micro-controller
def switch_mode(self, new_mode: str):
# Store if we have been connected to the micro-controller before mode was switched
was_connected = self._reader.is_alive
was_connected = self._has_connected
# Disconnect from the micro-controller
self.end()
@@ -201,6 +219,7 @@ class MainScreen(Screen):
SingleRowPopup().open("Failed to switch modes")
return
self.ids.status.text = "Mode set"
# If we have been connected, reconnect
if was_connected:
self.start()

View File

@@ -58,6 +58,8 @@ class ProgramScreen(Screen):
# Add it to the config
config.append(config_sensor_i)
self._set_ui(config)
else:
TwoActionPopup().open(
"Failed to connect to micro-controller, retry?",
@@ -100,8 +102,9 @@ class ProgramScreen(Screen):
else:
try:
self._instructions.change_config(data)
except:
except Exception as e:
SingleRowPopup().open("Could not save data!")
return
SingleRowPopup().open("Data saved successfully")

View File

@@ -41,7 +41,8 @@ class Instructions:
while time.time() - start < 5:
# If the decoded ascii character is equal to the next expected character, move pointer right by one
# If not, jump back to start
if decoder.decode_ascii(self._com.receive(1)) == sequence[pointer]:
data = decoder.decode_ascii(self._com.receive(1));
if data == sequence[pointer]:
pointer += 1
else:
pointer = 0
@@ -53,7 +54,7 @@ class Instructions:
# If we time out, which is the only way in which this code can be reached, return False
return False
# Used to hook to the main data stream, as that hooking mechanism is differen
# Used to hook to the main data stream, as that hooking mechanism is different
def hook_main(self) -> bool:
# Record start time to respond to timeout
start = time.time()

View File

@@ -3,7 +3,7 @@ Library to be used in standalone mode (without microcontroller, for testing func
It simulates the behviour of an actual microcontroller being connected
"""
from typing import Optional
from typing import List, Optional
import queue
import random
import time
@@ -26,11 +26,28 @@ instruction_lut: dict[str, list[str]] = {
"FM": ["\n", "F", "M", "\n"],
}
reconfig = ["a", "b", "c", "t"]
class SimulationError(Exception):
pass
class SensorConfig:
a: float
b: float
c: float
t: float
def __init__(
self, a: float = 20, b: float = 30, c: float = 10, t: float = 55
) -> None:
self.a = a
self.b = b
self.c = c
self.t = t
class Com(ComSuperClass):
def __init__(
self, baudrate: int = 19200, filters: Optional[list[str]] = None
@@ -43,6 +60,16 @@ class Com(ComSuperClass):
self.__simulated_data: queue.Queue[bytes] = queue.Queue()
self.__simulated_data_remaining = 0
self.__reconf_sensor = 0
self.__reconf_step = 0
self.__config: List[SensorConfig] = [
SensorConfig(),
SensorConfig(),
SensorConfig(),
SensorConfig(),
]
# Initially, we are in normal mode (which leads to slower data intervals)
self.__mode = "NM"
@@ -71,7 +98,7 @@ class Com(ComSuperClass):
for _ in range(byte_count):
if self.__mode == "NM":
time.sleep(0.001)
time.sleep(0.005)
try:
data.append(self.__simulated_data.get_nowait())
self.__simulated_data_remaining -= 1
@@ -81,41 +108,57 @@ class Com(ComSuperClass):
"Simulation encountered an error with the simulation queue. The error encountered: \n"
+ str(e)
)
return b''.join(data)
return b"".join(data)
def send(self, msg: str) -> None:
# Using LUT to reference
readback = instruction_lut.get(msg)
if readback != None:
for i in range(len(readback)):
self.__simulated_data.put(bytes(readback[i], "ascii"))
self.__add_ascii_char(readback[i])
if msg == "RD":
# Handle ReadData readback
# self.__simulated_data.put(ord(""))
pass
self.__set_read_data_data()
elif msg == "PR":
self.__reconf_sensor = 0
self.__reconf_step = 0
self.__add_ascii_char("a")
self.__add_ascii_char("0")
self.__add_ascii_char("\n")
def __set_read_data_data(self) -> None:
# Send data for all four sensors
for i in range(4):
self.__add_float_as_hex(self.__config[i].a)
self.__add_ascii_char(" ")
self.__add_float_as_hex(self.__config[i].b)
self.__add_ascii_char(" ")
self.__add_float_as_hex(self.__config[i].c)
self.__add_ascii_char(" ")
self.__add_float_as_hex(self.__config[i].t)
self.__add_ascii_char("\n")
def send_float(self, msg: float) -> None:
# Encode float as 8 bytes (64 bit)
ba = struct.pack("d", msg)
for byte in ba:
self.__simulated_data.put(byte.to_bytes())
if self.__reconf_step == 0:
self.__config[self.__reconf_sensor].a = msg
elif self.__reconf_step == 1:
self.__config[self.__reconf_sensor].b = msg
elif self.__reconf_step == 2:
self.__config[self.__reconf_sensor].c = msg
elif self.__reconf_step == 3:
self.__config[self.__reconf_sensor].t = msg
def __fill_queue_alternative(self):
for _ in range(4):
for _ in range(4):
self.__simulated_data.put(random.randbytes(1))
self.__simulated_data.put(bytes(" ", "ascii"))
for _ in range(6):
self.__simulated_data.put(random.randbytes(1))
self.__simulated_data.put(bytes(" ", "ascii"))
for _ in range(3):
for _ in range(4):
self.__simulated_data.put(random.randbytes(1))
self.__simulated_data.put(bytes(" ", "ascii"))
for _ in range(4):
self.__simulated_data.put(random.randbytes(1))
self.__simulated_data.put(bytes("\n", "ascii"))
self.__simulated_data_remaining = 68
if self.__reconf_step == 3:
self.__reconf_step = 0
self.__reconf_sensor += 1
else:
self.__reconf_step += 1
if self.__reconf_sensor == 4:
return
self.__add_ascii_char(reconfig[self.__reconf_step])
self.__add_ascii_char(str(self.__reconf_sensor))
self.__add_ascii_char("\n")
def __fill_queue(self):
for _ in range(4):
@@ -127,10 +170,10 @@ class Com(ComSuperClass):
for _ in range(3):
self.__add_integer_as_hex(self.__generate_random_int(65535))
self.__simulated_data.put(bytes(" ", "ascii"))
self.__simulated_data_remaining += 1
self.__add_integer_as_hex(self.__generate_random_int(65535))
self.__simulated_data.put(bytes("\n", "ascii"))
self.__simulated_data_remaining += 4
print("Length:", self.__simulated_data_remaining)
self.__simulated_data_remaining += 1
def __generate_random_int(self, max: int) -> int:
return random.randint(0, max)
@@ -138,11 +181,50 @@ class Com(ComSuperClass):
def __generate_random_float(self, max: int) -> float:
return random.random() * max
def __add_character_as_hex(self, data: str):
pass
def __add_ascii_char(self, ascii_string: str):
self.__simulated_data.put(ord(ascii_string).to_bytes(1))
self.__simulated_data_remaining += 1
def __add_integer_as_hex(self, data: int):
pass
def __add_two_byte_value(self, c: int):
"""putchhex
def __add_float_as_hex(self, data: float):
pass
Args:
c: The char (as integer)
"""
# First nibble (high)
high_nibble = (c >> 4) & 0x0F
high_char = chr(high_nibble + 48 if high_nibble < 10 else high_nibble + 55)
self.__simulated_data.put(high_char.encode())
# Second nibble (low)
low_nibble = c & 0x0F
low_char = chr(low_nibble + 48 if low_nibble < 10 else low_nibble + 55)
self.__simulated_data.put(low_char.encode())
self.__simulated_data_remaining += 2
def __add_integer_as_hex(self, c: int):
"""Writes the hexadecimal representation of the high and low bytes of integer `c` (16-bit) to the simulated serial port."""
if not (0 <= c <= 0xFFFF):
raise ValueError("Input must be a 16-bit integer (065535)")
# Get high byte (most significant byte)
hi_byte = (c >> 8) & 0xFF
# Get low byte (least significant byte)
lo_byte = c & 0xFF
# Call putchhex for the high byte and low byte
self.__add_two_byte_value(hi_byte)
self.__add_two_byte_value(lo_byte)
def __add_float_as_hex(self, f: float):
"""Converts a float to its byte representation and sends the bytes using putchhex."""
# Pack the float into bytes (IEEE 754 format)
packed = struct.pack(">f", f) # Big-endian format (network byte order)
# Unpack the bytes into 3 bytes: high, mid, low
high, mid, low = packed[0], packed[1], packed[2]
# Send each byte as hex
self.__add_two_byte_value(high)
self.__add_two_byte_value(mid)
self.__add_two_byte_value(low)