From a8ad40148f0c995266b52398815115b7ea6a43e0 Mon Sep 17 00:00:00 2001 From: Janis Hutz Date: Fri, 9 May 2025 11:03:49 +0200 Subject: [PATCH] Improve Com class, continue writing test --- biogascontrollerapp/biogascontrollerapp.py | 10 +++-- biogascontrollerapp/config.ini | 11 ++--- biogascontrollerapp/gui/home/home.py | 18 +++------ biogascontrollerapp/gui/main/main.py | 23 ++++++++--- biogascontrollerapp/gui/popups/popups.py | 4 +- biogascontrollerapp/gui/program/program.kv | 2 +- biogascontrollerapp/gui/program/program.py | 29 +++++++++---- biogascontrollerapp/lib/com.py | 29 ++++++++++++- biogascontrollerapp/lib/instructions.py | 4 +- biogascontrollerapp/lib/test/com.py | 47 +++++++++++++++++----- 10 files changed, 125 insertions(+), 52 deletions(-) diff --git a/biogascontrollerapp/biogascontrollerapp.py b/biogascontrollerapp/biogascontrollerapp.py index f3e4a2a..4be0a04 100644 --- a/biogascontrollerapp/biogascontrollerapp.py +++ b/biogascontrollerapp/biogascontrollerapp.py @@ -16,7 +16,8 @@ import os import configparser from typing import override -from lib.com import Com +from lib.com import Com, ComSuperClass +import lib.test.com # Load the config file @@ -24,7 +25,7 @@ config = configparser.ConfigParser() config.read("./config.ini") # Load config and disable kivy log if necessary -if config["Dev Settings"]["verbose"] == "True": +if config["Dev"]["verbose"] == "True": pass else: os.environ["KIVY_NO_CONSOLELOG"] = "1" @@ -63,7 +64,10 @@ class BiogasControllerApp(App): @override def build(self): - com = Com() + com: ComSuperClass = Com() + if config["Dev"]["use_test_library"] == "True": + com = lib.test.com.Com() + self.icon = "./BiogasControllerAppLogo.png" self.title = "BiogasControllerApp-" + app_version self.screen_manager.add_widget(HomeScreen(com, name="home")) diff --git a/biogascontrollerapp/config.ini b/biogascontrollerapp/config.ini index f92e8a9..faede0d 100644 --- a/biogascontrollerapp/config.ini +++ b/biogascontrollerapp/config.ini @@ -1,17 +1,14 @@ -[Port Settings] +[Ports] specificport = None -[UI Config] +[UI] sizeh = 600 sizew = 800 -[Dev Settings] +[Dev] verbose = True log_level = DEBUG -disableconnectioncheck = True - -[License] -show = 1 +use_test_library = True [Info] version = V2.3.0 diff --git a/biogascontrollerapp/gui/home/home.py b/biogascontrollerapp/gui/home/home.py index 6393e78..86630e2 100644 --- a/biogascontrollerapp/gui/home/home.py +++ b/biogascontrollerapp/gui/home/home.py @@ -1,32 +1,26 @@ from kivy.uix.screenmanager import Screen from kivy.lang import Builder from gui.popups.popups import QuitPopup, TwoActionPopup -from lib.com import Com +from lib.com import ComSuperClass import configparser -config = configparser.ConfigParser() -config.read('./config.ini') # This is the launch screen, i.e. what you see when you start up the app class HomeScreen(Screen): - def __init__(self, com: Com, **kw): + def __init__(self, com: ComSuperClass, **kw): self._com = com; super().__init__(**kw) # Go to the main screen if we can establish connection or the check was disabled # in the configs def start(self): - if config[ 'Dev Settings' ][ 'disableconnectioncheck' ] != "True": - if self._com.connect(): - self.manager.current = 'main' - self.manager.transition.direction = 'right' - else: - TwoActionPopup().open('Failed to connect', 'Details', self.open_details_popup) - print('ERROR connecting') - else: + if self._com.connect(): self.manager.current = 'main' self.manager.transition.direction = 'right' + else: + TwoActionPopup().open('Failed to connect', 'Details', self.open_details_popup) + print('ERROR connecting') # Open popup for details as to why the connection failed def open_details_popup(self): diff --git a/biogascontrollerapp/gui/main/main.py b/biogascontrollerapp/gui/main/main.py index ed25ce9..e15e816 100644 --- a/biogascontrollerapp/gui/main/main.py +++ b/biogascontrollerapp/gui/main/main.py @@ -10,7 +10,7 @@ import threading # Load utilities from lib.instructions import Instructions -from lib.com import Com +from lib.com import ComSuperClass from lib.decoder import Decoder @@ -26,13 +26,13 @@ synced_queue: queue.Queue[List[str]] = queue.Queue() # ╰────────────────────────────────────────────────╯ # Using a Thread to run this in parallel to the UI to improve responsiveness class ReaderThread(threading.Thread): - _com: Com + _com: ComSuperClass _decoder: Decoder _instructions: Instructions # This method allows the user to set Com object to be used. # The point of this is to allow for the use of a single Com object to not waste resources - def set_com(self, com: Com): + def set_com(self, com: ComSuperClass): """Set the Com object to be used in this Args: @@ -41,6 +41,7 @@ class ReaderThread(threading.Thread): self._com = com self._run = True self._decoder = Decoder() + self._instructions = Instructions(com) # This method is given by the Thread class and has to be overriden to change # what is executed when the thread starts @@ -98,7 +99,7 @@ class MainScreen(Screen): # The constructor if this class takes a Com object to share one between all screens # to preserve resources and make handling better - def __init__(self, com: Com, **kw): + def __init__(self, com: ComSuperClass, **kw): # Set some variables self._com = com self._event = None @@ -117,9 +118,11 @@ class MainScreen(Screen): def start(self): self.ids.status.text = "Connecting..." if self._com.connect(): + print("Acquired connection") self._has_connected = True # Start communication self._reader.start() + print("Reader has started") Clock.schedule_interval(self._update_screen, 0.5) else: self.ids.status.text = "Connection failed" @@ -145,10 +148,18 @@ class MainScreen(Screen): pass self._com.close() self.ids.status.text = "Connection terminated" + print("Connection terminated") # A helper function to update the screen. Is called on an interval - def _update_screen(self): - update = synced_queue.get() + def _update_screen(self, dt): + update = [] + try: + update = synced_queue.get_nowait() + except: + pass + if len(update) == 0: + # There are no updates to process, don't block and simply try again next time + return if len(update) == 1: if update[0] == "ERR_HOOK": self.ids.status.text = "Hook failed" diff --git a/biogascontrollerapp/gui/popups/popups.py b/biogascontrollerapp/gui/popups/popups.py index 350d2de..f366274 100644 --- a/biogascontrollerapp/gui/popups/popups.py +++ b/biogascontrollerapp/gui/popups/popups.py @@ -2,7 +2,7 @@ from typing import Callable from kivy.uix.popup import Popup from kivy.lang import Builder -from lib.com import Com +from lib.com import ComSuperClass # Just an empty function @@ -14,7 +14,7 @@ def empty_func(): # ╰────────────────────────────────────────────────╯ # Below, you can find various popups with various designs that can be used in the app class QuitPopup(Popup): - def __init__(self, com: Com, **kw): + def __init__(self, com: ComSuperClass, **kw): self._com = com; super().__init__(**kw) diff --git a/biogascontrollerapp/gui/program/program.kv b/biogascontrollerapp/gui/program/program.kv index 7d61e0d..cd50188 100644 --- a/biogascontrollerapp/gui/program/program.kv +++ b/biogascontrollerapp/gui/program/program.kv @@ -1,6 +1,6 @@ : name: "program" - on_pre_enter: self.config_loader = root.load_config() + on_enter: self.config_loader = root.load_config() canvas.before: Color: rgba: (50,50,50,0.2) diff --git a/biogascontrollerapp/gui/program/program.py b/biogascontrollerapp/gui/program/program.py index 479e25e..04d01ae 100644 --- a/biogascontrollerapp/gui/program/program.py +++ b/biogascontrollerapp/gui/program/program.py @@ -4,17 +4,17 @@ from kivy.lang import Builder from lib.decoder import Decoder from lib.instructions import Instructions from gui.popups.popups import SingleRowPopup, TwoActionPopup, empty_func -from lib.com import Com +from lib.com import ComSuperClass from kivy.clock import Clock # The below list maps 0, 1, 2, 3 to a, b, c and t respectively # This is used to set and read values of the UI -name_map = [ "a", "b", "c", "t" ] +name_map = ["a", "b", "c", "t"] class ProgramScreen(Screen): - def __init__(self, com: Com, **kw): + def __init__(self, com: ComSuperClass, **kw): self._com = com self._instructions = Instructions(com) self._decoder = Decoder() @@ -31,7 +31,18 @@ class ProgramScreen(Screen): # Load config for all four sensors for _ in range(4): # Receive 28 bytes of data - received = self._com.receive(28) + received = bytes() + try: + received = self._com.receive(28) + except: + TwoActionPopup().open( + "Failed to connect to micro-controller, retry?", + "Cancel", + empty_func, + "Retry", + lambda: self._load(0), + ) + return # Create a list of strings to store the config for the sensor # This list has the following elements: a, b, c, temperature @@ -39,7 +50,9 @@ class ProgramScreen(Screen): # Create the list for j in range(4): - config_sensor_i.append(str(self._decoder.decode_float(received[7 * j:7 * j + 6]))) + config_sensor_i.append( + str(self._decoder.decode_float(received[7 * j : 7 * j + 6])) + ) # Add it to the config config.append(config_sensor_i) @@ -56,12 +69,14 @@ class ProgramScreen(Screen): def _set_ui(self, config: List[List[str]]): for sensor_id in range(4): for property in range(4): - self.ids[f"s{sensor_id + 1}_{name_map[property]}"].text = config[sensor_id][property] + self.ids[f"s{sensor_id + 1}_{name_map[property]}"].text = config[ + sensor_id + ][property] # Read values from the UI. Returns the values as a list or None if the check was infringed def _read_ui(self, enforce_none_empty: bool = True) -> List[float] | None: data: List[float] = [] - + # Iterate over all sensor config input fields and collect the data for sensor_id in range(4): for property in range(4): diff --git a/biogascontrollerapp/lib/com.py b/biogascontrollerapp/lib/com.py index 4048b2a..da97ba8 100644 --- a/biogascontrollerapp/lib/com.py +++ b/biogascontrollerapp/lib/com.py @@ -1,10 +1,11 @@ +from abc import ABC, abstractmethod from typing import Optional import serial import struct import serial.tools.list_ports -class Com: +class ComSuperClass(ABC): def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None: self._serial: Optional[serial.Serial] = None self._filters = filters if filters != None else [ 'USB-Serial Controller', 'Prolific USB-Serial Controller' ] @@ -19,6 +20,32 @@ class Com: def get_error(self) -> serial.SerialException | None: return self._err + @abstractmethod + def get_comport(self) -> str: + pass + + @abstractmethod + def connect(self) -> bool: + pass + + @abstractmethod + def close(self) -> None: + pass + + @abstractmethod + def receive(self, byte_count: int) -> bytes: + pass + + @abstractmethod + def send(self, msg: str) -> None: + pass + + @abstractmethod + def send_float(self, msg: float) -> None: + pass + + +class Com(ComSuperClass): def _connection_check(self) -> bool: if self._serial == None: return self._open() diff --git a/biogascontrollerapp/lib/instructions.py b/biogascontrollerapp/lib/instructions.py index 892b452..4b2bbc2 100644 --- a/biogascontrollerapp/lib/instructions.py +++ b/biogascontrollerapp/lib/instructions.py @@ -1,4 +1,4 @@ -from lib.com import Com +from lib.com import ComSuperClass import lib.decoder import time @@ -9,7 +9,7 @@ decoder = lib.decoder.Decoder() # Class that supports sending instructions to the microcontroller, # as well as hooking to data stream according to protocol class Instructions: - def __init__(self, com: Com) -> None: + def __init__(self, com: ComSuperClass) -> None: self._com = com # Set a port override (to use a specific COM port) diff --git a/biogascontrollerapp/lib/test/com.py b/biogascontrollerapp/lib/test/com.py index a7cc2f2..b6a0d65 100644 --- a/biogascontrollerapp/lib/test/com.py +++ b/biogascontrollerapp/lib/test/com.py @@ -6,33 +6,55 @@ It simulates the behviour of an actual microcontroller being connected from typing import Optional import queue import random +import serial + +from lib.com import ComSuperClass # This file contains a Com class that can be used to test the functionality # even without a microcontroller. It is not documented in a particularly # beginner-friendly way, nor is the code written with beginner-friendliness # in mind. It is the most complicated piece of code of the entire application -# All double __ prefixed properties are not available in the actual one +# All double __ prefixed properties and methods are not available in the actual one + +instruction_lut = { + "PR": "\nPR\n", + "PT": "\nPT\n", + "RD": "\nRD\n", + "NM": "\nNM\n", + "FM": "\nFM\n", +} -class Com: - def __init__(self) -> None: - # Initialize queue with values to be sent on call of recieve (add like three or so at a time) - self._port_override = "" - self.__mode = "" - self.__simulated_data = queue.Queue() +class Com(ComSuperClass): + def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None: + # Calling the constructor of the super class to assign defaults + print("WARNING: Using testing library for communication!") + super().__init__(baudrate, filters); + + # Initialize queue with values to be sent on call of recieve + self.__simulated_data: queue.Queue[int] = queue.Queue() + + # Keep track of the number of bytes sent to fulfil protocol + self.__bytes_sent: int = 0 + + # Initially, we are in normal mode (which leads to slower data intervals) + self.__mode = "NM" def set_port_override(self, override: str) -> None: """Set the port override, to disable port search""" self._port_override = override + def get_error(self) -> serial.SerialException | None: + pass + def get_comport(self) -> str: return "test" if self._port_override != "" else self._port_override def connect(self) -> bool: - # TODO: For testing, make cases where there is no successful connection, i.e. we return false - # Randomly return false - if random.randint(0, 20): + # Randomly return false in 1 in 20 ish cases + if random.randint(0, 20) == 1: + print("Simulating error to connect") return False return True @@ -41,6 +63,9 @@ class Com: def receive(self, byte_count: int) -> bytes: # TODO: Make it return simulated data + data = [] + for i in range(byte_count): + data.append(self.__simulated_data.get_nowait()) return bytes("A", "ascii") def send(self, msg: str) -> None: @@ -51,5 +76,5 @@ class Com: def send_float(self, msg: float) -> None: pass - def _generate_random_value(self, precision: int) -> bytes: + def __generate_random_value(self, precision: int) -> bytes: return bytes(str(round(random.random() * precision) / precision), "ascii")