Improve Com class, continue writing test

This commit is contained in:
2025-05-09 11:03:49 +02:00
parent e71f9e6d02
commit a8ad40148f
10 changed files with 125 additions and 52 deletions

View File

@@ -16,7 +16,8 @@ import os
import configparser import configparser
from typing import override from typing import override
from lib.com import Com from lib.com import Com, ComSuperClass
import lib.test.com
# Load the config file # Load the config file
@@ -24,7 +25,7 @@ config = configparser.ConfigParser()
config.read("./config.ini") config.read("./config.ini")
# Load config and disable kivy log if necessary # Load config and disable kivy log if necessary
if config["Dev Settings"]["verbose"] == "True": if config["Dev"]["verbose"] == "True":
pass pass
else: else:
os.environ["KIVY_NO_CONSOLELOG"] = "1" os.environ["KIVY_NO_CONSOLELOG"] = "1"
@@ -63,7 +64,10 @@ class BiogasControllerApp(App):
@override @override
def build(self): def build(self):
com = Com() com: ComSuperClass = Com()
if config["Dev"]["use_test_library"] == "True":
com = lib.test.com.Com()
self.icon = "./BiogasControllerAppLogo.png" self.icon = "./BiogasControllerAppLogo.png"
self.title = "BiogasControllerApp-" + app_version self.title = "BiogasControllerApp-" + app_version
self.screen_manager.add_widget(HomeScreen(com, name="home")) self.screen_manager.add_widget(HomeScreen(com, name="home"))

View File

@@ -1,17 +1,14 @@
[Port Settings] [Ports]
specificport = None specificport = None
[UI Config] [UI]
sizeh = 600 sizeh = 600
sizew = 800 sizew = 800
[Dev Settings] [Dev]
verbose = True verbose = True
log_level = DEBUG log_level = DEBUG
disableconnectioncheck = True use_test_library = True
[License]
show = 1
[Info] [Info]
version = V2.3.0 version = V2.3.0

View File

@@ -1,32 +1,26 @@
from kivy.uix.screenmanager import Screen from kivy.uix.screenmanager import Screen
from kivy.lang import Builder from kivy.lang import Builder
from gui.popups.popups import QuitPopup, TwoActionPopup from gui.popups.popups import QuitPopup, TwoActionPopup
from lib.com import Com from lib.com import ComSuperClass
import configparser import configparser
config = configparser.ConfigParser()
config.read('./config.ini')
# This is the launch screen, i.e. what you see when you start up the app # This is the launch screen, i.e. what you see when you start up the app
class HomeScreen(Screen): class HomeScreen(Screen):
def __init__(self, com: Com, **kw): def __init__(self, com: ComSuperClass, **kw):
self._com = com; self._com = com;
super().__init__(**kw) super().__init__(**kw)
# Go to the main screen if we can establish connection or the check was disabled # Go to the main screen if we can establish connection or the check was disabled
# in the configs # in the configs
def start(self): def start(self):
if config[ 'Dev Settings' ][ 'disableconnectioncheck' ] != "True": if self._com.connect():
if self._com.connect():
self.manager.current = 'main'
self.manager.transition.direction = 'right'
else:
TwoActionPopup().open('Failed to connect', 'Details', self.open_details_popup)
print('ERROR connecting')
else:
self.manager.current = 'main' self.manager.current = 'main'
self.manager.transition.direction = 'right' self.manager.transition.direction = 'right'
else:
TwoActionPopup().open('Failed to connect', 'Details', self.open_details_popup)
print('ERROR connecting')
# Open popup for details as to why the connection failed # Open popup for details as to why the connection failed
def open_details_popup(self): def open_details_popup(self):

View File

@@ -10,7 +10,7 @@ import threading
# Load utilities # Load utilities
from lib.instructions import Instructions from lib.instructions import Instructions
from lib.com import Com from lib.com import ComSuperClass
from lib.decoder import Decoder from lib.decoder import Decoder
@@ -26,13 +26,13 @@ synced_queue: queue.Queue[List[str]] = queue.Queue()
# ╰────────────────────────────────────────────────╯ # ╰────────────────────────────────────────────────╯
# Using a Thread to run this in parallel to the UI to improve responsiveness # Using a Thread to run this in parallel to the UI to improve responsiveness
class ReaderThread(threading.Thread): class ReaderThread(threading.Thread):
_com: Com _com: ComSuperClass
_decoder: Decoder _decoder: Decoder
_instructions: Instructions _instructions: Instructions
# This method allows the user to set Com object to be used. # This method allows the user to set Com object to be used.
# The point of this is to allow for the use of a single Com object to not waste resources # The point of this is to allow for the use of a single Com object to not waste resources
def set_com(self, com: Com): def set_com(self, com: ComSuperClass):
"""Set the Com object to be used in this """Set the Com object to be used in this
Args: Args:
@@ -41,6 +41,7 @@ class ReaderThread(threading.Thread):
self._com = com self._com = com
self._run = True self._run = True
self._decoder = Decoder() self._decoder = Decoder()
self._instructions = Instructions(com)
# This method is given by the Thread class and has to be overriden to change # This method is given by the Thread class and has to be overriden to change
# what is executed when the thread starts # what is executed when the thread starts
@@ -98,7 +99,7 @@ class MainScreen(Screen):
# The constructor if this class takes a Com object to share one between all screens # The constructor if this class takes a Com object to share one between all screens
# to preserve resources and make handling better # to preserve resources and make handling better
def __init__(self, com: Com, **kw): def __init__(self, com: ComSuperClass, **kw):
# Set some variables # Set some variables
self._com = com self._com = com
self._event = None self._event = None
@@ -117,9 +118,11 @@ class MainScreen(Screen):
def start(self): def start(self):
self.ids.status.text = "Connecting..." self.ids.status.text = "Connecting..."
if self._com.connect(): if self._com.connect():
print("Acquired connection")
self._has_connected = True self._has_connected = True
# Start communication # Start communication
self._reader.start() self._reader.start()
print("Reader has started")
Clock.schedule_interval(self._update_screen, 0.5) Clock.schedule_interval(self._update_screen, 0.5)
else: else:
self.ids.status.text = "Connection failed" self.ids.status.text = "Connection failed"
@@ -145,10 +148,18 @@ class MainScreen(Screen):
pass pass
self._com.close() self._com.close()
self.ids.status.text = "Connection terminated" self.ids.status.text = "Connection terminated"
print("Connection terminated")
# A helper function to update the screen. Is called on an interval # A helper function to update the screen. Is called on an interval
def _update_screen(self): def _update_screen(self, dt):
update = synced_queue.get() update = []
try:
update = synced_queue.get_nowait()
except:
pass
if len(update) == 0:
# There are no updates to process, don't block and simply try again next time
return
if len(update) == 1: if len(update) == 1:
if update[0] == "ERR_HOOK": if update[0] == "ERR_HOOK":
self.ids.status.text = "Hook failed" self.ids.status.text = "Hook failed"

View File

@@ -2,7 +2,7 @@ from typing import Callable
from kivy.uix.popup import Popup from kivy.uix.popup import Popup
from kivy.lang import Builder from kivy.lang import Builder
from lib.com import Com from lib.com import ComSuperClass
# Just an empty function # Just an empty function
@@ -14,7 +14,7 @@ def empty_func():
# ╰────────────────────────────────────────────────╯ # ╰────────────────────────────────────────────────╯
# Below, you can find various popups with various designs that can be used in the app # Below, you can find various popups with various designs that can be used in the app
class QuitPopup(Popup): class QuitPopup(Popup):
def __init__(self, com: Com, **kw): def __init__(self, com: ComSuperClass, **kw):
self._com = com; self._com = com;
super().__init__(**kw) super().__init__(**kw)

View File

@@ -1,6 +1,6 @@
<ProgramScreen>: <ProgramScreen>:
name: "program" name: "program"
on_pre_enter: self.config_loader = root.load_config() on_enter: self.config_loader = root.load_config()
canvas.before: canvas.before:
Color: Color:
rgba: (50,50,50,0.2) rgba: (50,50,50,0.2)

View File

@@ -4,17 +4,17 @@ from kivy.lang import Builder
from lib.decoder import Decoder from lib.decoder import Decoder
from lib.instructions import Instructions from lib.instructions import Instructions
from gui.popups.popups import SingleRowPopup, TwoActionPopup, empty_func from gui.popups.popups import SingleRowPopup, TwoActionPopup, empty_func
from lib.com import Com from lib.com import ComSuperClass
from kivy.clock import Clock from kivy.clock import Clock
# The below list maps 0, 1, 2, 3 to a, b, c and t respectively # The below list maps 0, 1, 2, 3 to a, b, c and t respectively
# This is used to set and read values of the UI # This is used to set and read values of the UI
name_map = [ "a", "b", "c", "t" ] name_map = ["a", "b", "c", "t"]
class ProgramScreen(Screen): class ProgramScreen(Screen):
def __init__(self, com: Com, **kw): def __init__(self, com: ComSuperClass, **kw):
self._com = com self._com = com
self._instructions = Instructions(com) self._instructions = Instructions(com)
self._decoder = Decoder() self._decoder = Decoder()
@@ -31,7 +31,18 @@ class ProgramScreen(Screen):
# Load config for all four sensors # Load config for all four sensors
for _ in range(4): for _ in range(4):
# Receive 28 bytes of data # Receive 28 bytes of data
received = self._com.receive(28) received = bytes()
try:
received = self._com.receive(28)
except:
TwoActionPopup().open(
"Failed to connect to micro-controller, retry?",
"Cancel",
empty_func,
"Retry",
lambda: self._load(0),
)
return
# Create a list of strings to store the config for the sensor # Create a list of strings to store the config for the sensor
# This list has the following elements: a, b, c, temperature # This list has the following elements: a, b, c, temperature
@@ -39,7 +50,9 @@ class ProgramScreen(Screen):
# Create the list # Create the list
for j in range(4): for j in range(4):
config_sensor_i.append(str(self._decoder.decode_float(received[7 * j:7 * j + 6]))) config_sensor_i.append(
str(self._decoder.decode_float(received[7 * j : 7 * j + 6]))
)
# Add it to the config # Add it to the config
config.append(config_sensor_i) config.append(config_sensor_i)
@@ -56,7 +69,9 @@ class ProgramScreen(Screen):
def _set_ui(self, config: List[List[str]]): def _set_ui(self, config: List[List[str]]):
for sensor_id in range(4): for sensor_id in range(4):
for property in range(4): for property in range(4):
self.ids[f"s{sensor_id + 1}_{name_map[property]}"].text = config[sensor_id][property] self.ids[f"s{sensor_id + 1}_{name_map[property]}"].text = config[
sensor_id
][property]
# Read values from the UI. Returns the values as a list or None if the check was infringed # Read values from the UI. Returns the values as a list or None if the check was infringed
def _read_ui(self, enforce_none_empty: bool = True) -> List[float] | None: def _read_ui(self, enforce_none_empty: bool = True) -> List[float] | None:

View File

@@ -1,10 +1,11 @@
from abc import ABC, abstractmethod
from typing import Optional from typing import Optional
import serial import serial
import struct import struct
import serial.tools.list_ports import serial.tools.list_ports
class Com: class ComSuperClass(ABC):
def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None: def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None:
self._serial: Optional[serial.Serial] = None self._serial: Optional[serial.Serial] = None
self._filters = filters if filters != None else [ 'USB-Serial Controller', 'Prolific USB-Serial Controller' ] self._filters = filters if filters != None else [ 'USB-Serial Controller', 'Prolific USB-Serial Controller' ]
@@ -19,6 +20,32 @@ class Com:
def get_error(self) -> serial.SerialException | None: def get_error(self) -> serial.SerialException | None:
return self._err return self._err
@abstractmethod
def get_comport(self) -> str:
pass
@abstractmethod
def connect(self) -> bool:
pass
@abstractmethod
def close(self) -> None:
pass
@abstractmethod
def receive(self, byte_count: int) -> bytes:
pass
@abstractmethod
def send(self, msg: str) -> None:
pass
@abstractmethod
def send_float(self, msg: float) -> None:
pass
class Com(ComSuperClass):
def _connection_check(self) -> bool: def _connection_check(self) -> bool:
if self._serial == None: if self._serial == None:
return self._open() return self._open()

View File

@@ -1,4 +1,4 @@
from lib.com import Com from lib.com import ComSuperClass
import lib.decoder import lib.decoder
import time import time
@@ -9,7 +9,7 @@ decoder = lib.decoder.Decoder()
# Class that supports sending instructions to the microcontroller, # Class that supports sending instructions to the microcontroller,
# as well as hooking to data stream according to protocol # as well as hooking to data stream according to protocol
class Instructions: class Instructions:
def __init__(self, com: Com) -> None: def __init__(self, com: ComSuperClass) -> None:
self._com = com self._com = com
# Set a port override (to use a specific COM port) # Set a port override (to use a specific COM port)

View File

@@ -6,33 +6,55 @@ It simulates the behviour of an actual microcontroller being connected
from typing import Optional from typing import Optional
import queue import queue
import random import random
import serial
from lib.com import ComSuperClass
# This file contains a Com class that can be used to test the functionality # This file contains a Com class that can be used to test the functionality
# even without a microcontroller. It is not documented in a particularly # even without a microcontroller. It is not documented in a particularly
# beginner-friendly way, nor is the code written with beginner-friendliness # beginner-friendly way, nor is the code written with beginner-friendliness
# in mind. It is the most complicated piece of code of the entire application # in mind. It is the most complicated piece of code of the entire application
# All double __ prefixed properties are not available in the actual one # All double __ prefixed properties and methods are not available in the actual one
instruction_lut = {
"PR": "\nPR\n",
"PT": "\nPT\n",
"RD": "\nRD\n",
"NM": "\nNM\n",
"FM": "\nFM\n",
}
class Com: class Com(ComSuperClass):
def __init__(self) -> None: def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None:
# Initialize queue with values to be sent on call of recieve (add like three or so at a time) # Calling the constructor of the super class to assign defaults
self._port_override = "" print("WARNING: Using testing library for communication!")
self.__mode = "" super().__init__(baudrate, filters);
self.__simulated_data = queue.Queue()
# Initialize queue with values to be sent on call of recieve
self.__simulated_data: queue.Queue[int] = queue.Queue()
# Keep track of the number of bytes sent to fulfil protocol
self.__bytes_sent: int = 0
# Initially, we are in normal mode (which leads to slower data intervals)
self.__mode = "NM"
def set_port_override(self, override: str) -> None: def set_port_override(self, override: str) -> None:
"""Set the port override, to disable port search""" """Set the port override, to disable port search"""
self._port_override = override self._port_override = override
def get_error(self) -> serial.SerialException | None:
pass
def get_comport(self) -> str: def get_comport(self) -> str:
return "test" if self._port_override != "" else self._port_override return "test" if self._port_override != "" else self._port_override
def connect(self) -> bool: def connect(self) -> bool:
# TODO: For testing, make cases where there is no successful connection, i.e. we return false # Randomly return false in 1 in 20 ish cases
# Randomly return false if random.randint(0, 20) == 1:
if random.randint(0, 20): print("Simulating error to connect")
return False return False
return True return True
@@ -41,6 +63,9 @@ class Com:
def receive(self, byte_count: int) -> bytes: def receive(self, byte_count: int) -> bytes:
# TODO: Make it return simulated data # TODO: Make it return simulated data
data = []
for i in range(byte_count):
data.append(self.__simulated_data.get_nowait())
return bytes("A", "ascii") return bytes("A", "ascii")
def send(self, msg: str) -> None: def send(self, msg: str) -> None:
@@ -51,5 +76,5 @@ class Com:
def send_float(self, msg: float) -> None: def send_float(self, msg: float) -> None:
pass pass
def _generate_random_value(self, precision: int) -> bytes: def __generate_random_value(self, precision: int) -> bytes:
return bytes(str(round(random.random() * precision) / precision), "ascii") return bytes(str(round(random.random() * precision) / precision), "ascii")