diff --git a/README.md b/README.md index 3bcf3cb..a20b32e 100644 --- a/README.md +++ b/README.md @@ -25,7 +25,13 @@ - + +BiogasControllerApp has just received a major rewrite, where I focused on code-readability, documentation and stability. The documentation in the code is aimed at beginners and does contain some unnecessary extra comments + +If you are here to read the code, the files you are most likely looking for can be found in the `biogascontrollerapp/lib` folder. If you want to understand and have a look at all of the application, start with the `biogascontrollerapp.py` file in the `biogascontrollerapp` folder + +# Features + ***LOOKING FOR A MacOS BUILD MAINTAINER! You may follow the official build instructions on the kivy.org website. All other materials should already be included in this repository*** ## FEATURES diff --git a/biogascontrollerapp/biogascontrollerapp.py b/biogascontrollerapp/biogascontrollerapp.py index c2fc2c4..f3e4a2a 100644 --- a/biogascontrollerapp/biogascontrollerapp.py +++ b/biogascontrollerapp/biogascontrollerapp.py @@ -1,42 +1,61 @@ +# ──────────────────────────────────────────────────────────────────── +# ╭────────────────────────────────────────────────╮ +# │ BiogasControllerApp │ +# ╰────────────────────────────────────────────────╯ +# +# So you would like to read the source code? Nice! +# Just be warned, this application uses Thread and a UI Toolkit called +# Kivy to run. If you are unsure of what functions do, consider +# checking out the kivy docs at https://kivy.org/doc. +# It also uses the pyserial library for communication with the micro- +# controller with RS232 +# +# ──────────────────────────────────────────────────────────────────── + import os import configparser from typing import override from lib.com import Com + +# Load the config file config = configparser.ConfigParser() -config.read('./config.ini') +config.read("./config.ini") # Load config and disable kivy log if necessary -if config['Dev Settings']['verbose'] == "True": +if config["Dev Settings"]["verbose"] == "True": pass else: os.environ["KIVY_NO_CONSOLELOG"] = "1" -# Load kivy modules +# Load kivy modules. Kivy is the UI framework used. See https://kivy.org # from kivy.core.window import Window, Config from kivy.uix.screenmanager import ScreenManager from kivy.app import App -# Load other libraries -# import threading # Store the current app version app_version = f"{config['Info']['version']}{config['Info']['subVersion']}" -#---------# -# Screens # -#---------# +# ╭────────────────────────────────────────────────╮ +# │ Screens │ +# ╰────────────────────────────────────────────────╯ +# Import all the screens (= pages) used in the app from gui.home.home import HomeScreen from gui.credits.credits import CreditsScreen +from gui.program.program import ProgramScreen from gui.about.about import AboutScreen from gui.main.main import MainScreen -#----------------# -# Screen Manager # -#----------------# + + +# ╭────────────────────────────────────────────────╮ +# │ Screen Manager │ +# ╰────────────────────────────────────────────────╯ +# Kivy uses a screen manager to manage pages in the application class BiogasControllerApp(App): def __init__(self, **kwargs): super().__init__(**kwargs) @@ -44,14 +63,17 @@ class BiogasControllerApp(App): @override def build(self): - com = Com(); - self.icon = './BiogasControllerAppLogo.png' - self.title = 'BiogasControllerApp-' + app_version - self.screen_manager.add_widget(HomeScreen(com, name='home')) - self.screen_manager.add_widget(MainScreen(com, name='main')) - self.screen_manager.add_widget(CreditsScreen(name='credits')) - self.screen_manager.add_widget(AboutScreen(name='about')) + com = Com() + self.icon = "./BiogasControllerAppLogo.png" + self.title = "BiogasControllerApp-" + app_version + self.screen_manager.add_widget(HomeScreen(com, name="home")) + self.screen_manager.add_widget(MainScreen(com, name="main")) + self.screen_manager.add_widget(ProgramScreen(com, name="program")) + self.screen_manager.add_widget(CreditsScreen(name="credits")) + self.screen_manager.add_widget(AboutScreen(name="about")) return self.screen_manager + +# Disallow this file to be imported if __name__ == "__main__": BiogasControllerApp().run() diff --git a/biogascontrollerapp/config.ini b/biogascontrollerapp/config.ini index 467137c..f92e8a9 100644 --- a/biogascontrollerapp/config.ini +++ b/biogascontrollerapp/config.ini @@ -8,7 +8,7 @@ sizew = 800 [Dev Settings] verbose = True log_level = DEBUG -disableconnectioncheck = False +disableconnectioncheck = True [License] show = 1 diff --git a/biogascontrollerapp/gui/home/home.py b/biogascontrollerapp/gui/home/home.py index 87b01f1..6393e78 100644 --- a/biogascontrollerapp/gui/home/home.py +++ b/biogascontrollerapp/gui/home/home.py @@ -1,30 +1,49 @@ from kivy.uix.screenmanager import Screen from kivy.lang import Builder -from gui.popups.popups import QuitPopup, SingleRowPopup, TwoActionPopup +from gui.popups.popups import QuitPopup, TwoActionPopup from lib.com import Com +import configparser + +config = configparser.ConfigParser() +config.read('./config.ini') + +# This is the launch screen, i.e. what you see when you start up the app class HomeScreen(Screen): def __init__(self, com: Com, **kw): self._com = com; super().__init__(**kw) + # Go to the main screen if we can establish connection or the check was disabled + # in the configs def start(self): - if self._com.connect(19200): + if config[ 'Dev Settings' ][ 'disableconnectioncheck' ] != "True": + if self._com.connect(): + self.manager.current = 'main' + self.manager.transition.direction = 'right' + else: + TwoActionPopup().open('Failed to connect', 'Details', self.open_details_popup) + print('ERROR connecting') + else: self.manager.current = 'main' self.manager.transition.direction = 'right' - else: - TwoActionPopup().open('Failed to connect', 'Details', self.open_details_popup) - print('ERROR connecting') + # Open popup for details as to why the connection failed def open_details_popup(self): + # TODO: Finish print( 'Details' ) + # Helper to open a Popup to ask user whether to quit or not def quit(self): QuitPopup(self._com).open() + # Switch to about screen def to_about(self): self.manager.current = 'about' self.manager.transition.direction = 'down' +# Load the design file for this screen (.kv files) +# The path has to be relative to root of the app, i.e. where the biogascontrollerapp.py +# file is located Builder.load_file('./gui/home/home.kv') diff --git a/biogascontrollerapp/gui/main/main.kv b/biogascontrollerapp/gui/main/main.kv index 9e19e1d..4e714b4 100644 --- a/biogascontrollerapp/gui/main/main.kv +++ b/biogascontrollerapp/gui/main/main.kv @@ -74,25 +74,7 @@ on_text: root.switch_mode(mode_selector.text) background_color: (255,0,0,0.6) if self.state == "normal" else (0,0,255,0.6) Button: - text: "Read Data" - size_hint: 0.15, 0.1 - pos_hint: {"x":0.3, "y":0.2} - background_color: (255, 0, 0, 0.6) - on_release: - root.end() - app.root.current = "read" - root.manager.transition.direction = "down" - Button: - text: "Temperature" - size_hint: 0.15, 0.1 - pos_hint: {"x":0.5, "y":0.2} - background_color: (255, 0, 0, 0.6) - on_release: - root.end() - app.root.current = "temperature" - root.manager.transition.direction = "down" - Button: - text: "Change all Data" + text: "Configuration" size_hint: 0.15, 0.1 pos_hint: {"x":0.7, "y":0.2} background_color: (255, 0, 0, 0.6) @@ -101,7 +83,7 @@ app.root.current = "program" root.manager.transition.direction = "down" Label: - id: frequency - text: "Frequency will appear here" + id: status + text: "Status will appear here" font_size: 10 pos_hint: {"x":0.4, "y": 0.3} diff --git a/biogascontrollerapp/gui/main/main.py b/biogascontrollerapp/gui/main/main.py index ccb8679..ed25ce9 100644 --- a/biogascontrollerapp/gui/main/main.py +++ b/biogascontrollerapp/gui/main/main.py @@ -1,25 +1,200 @@ +from ctypes import ArgumentError +from time import time +from typing import List, override from kivy.uix.screenmanager import Screen from kivy.lang import Builder +from gui.popups.popups import SingleRowPopup, TwoActionPopup, empty_func +from kivy.clock import Clock, ClockEvent +import queue +import threading +# Load utilities +from lib.instructions import Instructions from lib.com import Com +from lib.decoder import Decoder +# TODO: Consider consolidating start and stop button + + +# Queue with data that is used to synchronize +synced_queue: queue.Queue[List[str]] = queue.Queue() + + +# ╭────────────────────────────────────────────────╮ +# │ Data Reading Thread Helper │ +# ╰────────────────────────────────────────────────╯ +# Using a Thread to run this in parallel to the UI to improve responsiveness +class ReaderThread(threading.Thread): + _com: Com + _decoder: Decoder + _instructions: Instructions + + # This method allows the user to set Com object to be used. + # The point of this is to allow for the use of a single Com object to not waste resources + def set_com(self, com: Com): + """Set the Com object to be used in this + + Args: + com: The com object to be used + """ + self._com = com + self._run = True + self._decoder = Decoder() + + # This method is given by the Thread class and has to be overriden to change + # what is executed when the thread starts + @override + def run(self) -> None: + self._run = True + if self._com == None: + raise ArgumentError("Com object not passed in (do using set_com)") + # Hook to output stream + if self._instructions.hook("", ["\n", " ", " ", " "]): + # We are now hooked to the stream (i.e. data is synced) + synced_queue.put(["HOOK"]) + + # making it exit using the stop function + while self._run: + # Take note of the time before reading the data to deduce frequency of updates + start_time = time() + + # We need to read 68 bytes of data, given by the program running on the controller + received = self._com.receive(68) + + # Store the data in a list of strings + data: List[str] = [] + + # For all sensors connected, execute the same thing + for i in range(4): + # The slicing that happens here uses offsets automatically calculated from the sensor id + # This allows for short code + data.append( + f"Tadc: { + self._decoder.decode_float(received[12 * i:12 * i + 4]) + }\nTemperature: { + self._decoder.decode_float(received[12 * i + 5:12 * i + 11]) + }\nDuty-Cycle: { + self._decoder.decode_float_long(received[48 + 5 * i: 52 + 5 * i]) / 65535.0 * 100 + }%" + ) + # Calculate the frequency of updates + data.append(str(1 / (time() - start_time))) + else: + # Send error message to the UI updater + synced_queue.put(["ERR_HOOK"]) + return + + def stop(self) -> None: + self._run = False + + +# ╭────────────────────────────────────────────────╮ +# │ Main App Screen │ +# ╰────────────────────────────────────────────────╯ +# This is the main screen, where you can read out data class MainScreen(Screen): + _event: ClockEvent + + # The constructor if this class takes a Com object to share one between all screens + # to preserve resources and make handling better def __init__(self, com: Com, **kw): - self._com = com; + # Set some variables + self._com = com + self._event = None + + # Prepare the reader thread + self._reader = ReaderThread() + self._reader.setDaemon(True) + self._reader.set_com(com) + self._has_connected = False + + # Call the constructor for the Screen class super().__init__(**kw) + # Start the connection to the micro-controller to read data from it. + # This also now starts the reader thread to continuously read out data def start(self): - pass + self.ids.status.text = "Connecting..." + if self._com.connect(): + self._has_connected = True + # Start communication + self._reader.start() + Clock.schedule_interval(self._update_screen, 0.5) + else: + self.ids.status.text = "Connection failed" + TwoActionPopup().open( + "Failed to connect. Do you want to retry?", + "Cancel", + empty_func, + "Retry", + self.start, + ) + # End connection to micro-controller and set it back to normal mode def end(self): - pass + # Set micro-controller back to Normal Mode when ending communication + # to make sure temperature control will work + if self._has_connected: + if self._event != None: + self._event.cancel() + self._reader.stop() + try: + self._com.send("NM") + except: + pass + self._com.close() + self.ids.status.text = "Connection terminated" + # A helper function to update the screen. Is called on an interval + def _update_screen(self): + update = synced_queue.get() + if len(update) == 1: + if update[0] == "ERR_HOOK": + self.ids.status.text = "Hook failed" + self.end() + elif update[0] == "HOOK": + self.ids.status.text = "Connected to controller" + else: + self.ids.sensor1.text = update[0] + self.ids.sensor2.text = update[1] + self.ids.sensor3.text = update[2] + self.ids.sensor4.text = update[3] + self.ids.status.text = "Connected, f = " + update[4] + + # Reset the screen when the screen is entered def reset(self): - pass + self.ids.sensor1.text = "" + self.ids.sensor2.text = "" + self.ids.sensor3.text = "" + self.ids.sensor4.text = "" + self.ids.status.text = "Status will appear here" - def back(self): - pass + # Switch the mode for the micro-controller + def switch_mode(self, new_mode: str): + # Store if we have been connected to the micro-controller before mode was switched + was_connected = self._reader.is_alive + + # Disconnect from the micro-controller + self.end() + self.ids.status.text = "Setting mode..." + + # Try to set the new mode + try: + if new_mode == "Normal Mode": + self._com.send("NM") + else: + self._com.send("FM") + except: + SingleRowPopup().open("Failed to switch modes") + return + + # If we have been connected, reconnect + if was_connected: + self.start() -Builder.load_file('./gui/main/main.kv') +# Load the design file for this screen (.kv files) +# The path has to be relative to root of the app, i.e. where the biogascontrollerapp.py +# file is located +Builder.load_file("./gui/main/main.kv") diff --git a/biogascontrollerapp/gui/popups/popups.py b/biogascontrollerapp/gui/popups/popups.py index 41d5d16..350d2de 100644 --- a/biogascontrollerapp/gui/popups/popups.py +++ b/biogascontrollerapp/gui/popups/popups.py @@ -4,9 +4,15 @@ from kivy.lang import Builder from lib.com import Com + +# Just an empty function def empty_func(): pass +# ╭────────────────────────────────────────────────╮ +# │ Popups │ +# ╰────────────────────────────────────────────────╯ +# Below, you can find various popups with various designs that can be used in the app class QuitPopup(Popup): def __init__(self, com: Com, **kw): self._com = com; @@ -50,4 +56,8 @@ class TwoActionPopup(Popup): self.action_two = action_two return super().open(*_args, **kwargs) + +# Load the design file for this screen (.kv files) +# The path has to be relative to root of the app, i.e. where the biogascontrollerapp.py +# file is located Builder.load_file('./gui/popups/popups.kv') diff --git a/biogascontrollerapp/gui/program/program.kv b/biogascontrollerapp/gui/program/program.kv index e69de29..7d61e0d 100644 --- a/biogascontrollerapp/gui/program/program.kv +++ b/biogascontrollerapp/gui/program/program.kv @@ -0,0 +1,131 @@ +: + name: "program" + on_pre_enter: self.config_loader = root.load_config() + canvas.before: + Color: + rgba: (50,50,50,0.2) + Rectangle: + size: self.size + pos: self.pos + FloatLayout: + Label: + text: "Configuration" + font_size: 40 + color: (0, 113, 0, 1) + bold: True + pos_hint: {"y":0.4} + GridLayout: + size_hint: 0.8, 0.5 + pos_hint: {"x":0.1, "y":0.2} + cols: 4 + Label: + text: "Sensor 1, a:" + TextInput: + id: s1_a + multiline: False + input_filter: "float" + Label: + text: "Sensor 1, b:" + TextInput: + id: s1_b + multiline: False + input_filter: "float" + Label: + text: "Sensor 1, c:" + TextInput: + id: s1_c + multiline: False + input_filter: "float" + Label: + text: "Sensor 1, Temp:" + TextInput: + id: s1_t + multiline: False + input_filter: "float" + Label: + text: "Sensor 2, a:" + TextInput: + id: s2_a + multiline: False + input_filter: "float" + Label: + text: "Sensor 2, b:" + TextInput: + id: s2_b + multiline: False + input_filter: "float" + Label: + text: "Sensor 2, c:" + TextInput: + id: s2_c + multiline: False + input_filter: "float" + Label: + text: "Sensor 2, Temp:" + TextInput: + id: s2_t + multiline: False + input_filter: "float" + Label: + text: "Sensor 3, a:" + TextInput: + id: s3_a + multiline: False + input_filter: "float" + Label: + text: "Sensor 3, b:" + TextInput: + id: s3_b + multiline: False + input_filter: "float" + Label: + text: "Sensor 3, c:" + TextInput: + id: s3_c + multiline: False + input_filter: "float" + Label: + text: "Sensor 3, Temp:" + TextInput: + id: s3_t + multiline: False + input_filter: "float" + Label: + text: "Sensor 4, a:" + TextInput: + id: s4_a + multiline: False + input_filter: "float" + Label: + text: "Sensor 4, b:" + TextInput: + id: s4_b + multiline: False + input_filter: "float" + Label: + text: "Sensor 4, c:" + TextInput: + id: s4_c + multiline: False + input_filter: "float" + Label: + text: "Sensor 4, Temp:" + TextInput: + id: s4_t + multiline: False + input_filter: "float" + Button: + text: "Back" + size_hint: 0.1, 0.1 + pos_hint: {"x":0.1, "y":0.1} + background_color: (255, 0, 0, 0.6) + on_release: + app.root.current = "main" + root.manager.transition.direction = "up" + Button: + text: "Save" + size_hint: 0.2, 0.1 + pos_hint: {"x":0.6, "y":0.1} + background_color: (255, 0, 0, 0.6) + on_release: + root.save() diff --git a/biogascontrollerapp/gui/program/program.py b/biogascontrollerapp/gui/program/program.py index 3669d49..479e25e 100644 --- a/biogascontrollerapp/gui/program/program.py +++ b/biogascontrollerapp/gui/program/program.py @@ -1,20 +1,94 @@ +from typing import List from kivy.uix.screenmanager import Screen from kivy.lang import Builder +from lib.decoder import Decoder +from lib.instructions import Instructions +from gui.popups.popups import SingleRowPopup, TwoActionPopup, empty_func +from lib.com import Com +from kivy.clock import Clock -class HomeScreen(Screen): +# The below list maps 0, 1, 2, 3 to a, b, c and t respectively +# This is used to set and read values of the UI +name_map = [ "a", "b", "c", "t" ] + + +class ProgramScreen(Screen): def __init__(self, com: Com, **kw): - self._com = com; + self._com = com + self._instructions = Instructions(com) + self._decoder = Decoder() super().__init__(**kw) - def start(self): - pass + def load_config(self): + Clock.schedule_once(self._load) - def quit(self): - pass + # Load the current configuration from the micro-controller + def _load(self, dt: float): + if self._instructions.hook("RD", ["\n", "R", "D", "\n"]): + config: List[List[str]] = [] - def to_settings(self): - pass + # Load config for all four sensors + for _ in range(4): + # Receive 28 bytes of data + received = self._com.receive(28) + + # Create a list of strings to store the config for the sensor + # This list has the following elements: a, b, c, temperature + config_sensor_i: List[str] = [] + + # Create the list + for j in range(4): + config_sensor_i.append(str(self._decoder.decode_float(received[7 * j:7 * j + 6]))) + + # Add it to the config + config.append(config_sensor_i) + else: + TwoActionPopup().open( + "Failed to connect to micro-controller, retry?", + "Cancel", + empty_func, + "Retry", + lambda: self._load(0), + ) + + # Set the elements of the UI to the values of the config + def _set_ui(self, config: List[List[str]]): + for sensor_id in range(4): + for property in range(4): + self.ids[f"s{sensor_id + 1}_{name_map[property]}"].text = config[sensor_id][property] + + # Read values from the UI. Returns the values as a list or None if the check was infringed + def _read_ui(self, enforce_none_empty: bool = True) -> List[float] | None: + data: List[float] = [] + + # Iterate over all sensor config input fields and collect the data + for sensor_id in range(4): + for property in range(4): + value = self.ids[f"s{sensor_id + 1}_{name_map[property]}"].text + + # If requested (by setting enforce_none_empty to True, which is the default) + # test if the cells are not empty and if we find an empty cell return None + if enforce_none_empty and value == "": + return + data.append(float(value)) + + return data + + # Transmit the changed data to the micro-controller to reconfigure it + def save(self): + data = self._read_ui() + if data == None: + SingleRowPopup().open("Some fields are missing values!") + else: + try: + self._instructions.change_config(data) + except: + SingleRowPopup().open("Could not save data!") + SingleRowPopup().open("Data saved successfully") -Builder.load_file('./gui/home/home.kv') +# Load the design file for this screen (.kv files) +# The path has to be relative to root of the app, i.e. where the biogascontrollerapp.py +# file is located +Builder.load_file("./gui/program/program.kv") diff --git a/biogascontrollerapp/lib/com.py b/biogascontrollerapp/lib/com.py index 747ca10..4048b2a 100644 --- a/biogascontrollerapp/lib/com.py +++ b/biogascontrollerapp/lib/com.py @@ -5,11 +5,11 @@ import serial.tools.list_ports class Com: - def __init__(self, filters: Optional[list[str]] = None) -> None: + def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None: self._serial: Optional[serial.Serial] = None self._filters = filters if filters != None else [ 'USB-Serial Controller', 'Prolific USB-Serial Controller' ] self._port_override = '' - self._baudrate = 19200 + self._baudrate = baudrate self._err = None def set_port_override(self, override: str) -> None: @@ -47,7 +47,7 @@ class Com: except: pass - return '' + return "" def _open(self) -> bool: comport = self.get_comport() @@ -63,9 +63,8 @@ class Com: else: return False - def connect(self, baud_rate: int) -> bool: + def connect(self) -> bool: """Try to find a comport and connect to the microcontroller. Returns the success as a boolean""" - self._baudrate = baud_rate return self._connection_check() def close(self) -> None: diff --git a/biogascontrollerapp/lib/instructions.py b/biogascontrollerapp/lib/instructions.py index 0db6b8a..892b452 100644 --- a/biogascontrollerapp/lib/instructions.py +++ b/biogascontrollerapp/lib/instructions.py @@ -1,61 +1,100 @@ -import lib.com +from lib.com import Com import lib.decoder import time # TODO: Load filters (for comport search) -com = lib.com.Com() decoder = lib.decoder.Decoder() -class Instructions: - def set_port_override(self, override: str) -> None: - com.set_port_override(override) - def _hook(self, instruction: str, sequence: list[str]) -> bool: +# Class that supports sending instructions to the microcontroller, +# as well as hooking to data stream according to protocol +class Instructions: + def __init__(self, com: Com) -> None: + self._com = com + + # Set a port override (to use a specific COM port) + def set_port_override(self, override: str) -> None: + self._com.set_port_override(override) + + # Helper method to hook to the data stream according to protocol. + # You can specify the sequence that the program listens to to sync up, + # as an array of strings, that should each be of length one and only contain + # ascii characters + def hook(self, instruction: str, sequence: list[str]) -> bool: + # Add protection: If we cannot establish connection, refuse to run + if not self._com.connect(): + return False + # Send instruction to microcontroller to start hooking process - com.send(instruction) - + # If instruction is an empty string, do not send instruction + + if instruction != "": + self._com.send(instruction) + # Record start time to respond to timeout start = time.time() - # Check for timeout + # The pointer below points to the element in the array which is the next expected character to be received pointer = 0 - sequence_max = len(sequence) - 1 + + # Simply the length of the sequence, since it is both cheaper and cleaner to calculate it once + sequence_max = len(sequence) + + # Only run for a limited amount of time while time.time() - start < 5: - if ( decoder.decode_ascii( com.receive(1) ) ) == sequence[pointer]: + # If the decoded ascii character is equal to the next expected character, move pointer right by one + # If not, jump back to start + if (decoder.decode_ascii(self._com.receive(1))) == sequence[pointer]: pointer += 1 else: pointer = 0 + # If the pointer has reached the end of the sequence, return True, as now the hook was successful if pointer == sequence_max: return True + # If we time out, which is the only way in which this code can be reached, return False return False - def _change_data(self, instruction: str, readback: list[str], data: list[float], readback_length: int) -> None: + # Private helper method to transmit data using the necessary protocols + def _change_data( + self, + instruction: str, + readback: list[str], + data: list[float], + readback_length: int, + ) -> None: # Hook to stream - if self._hook(instruction, readback): + if self.hook(instruction, readback): + # Transmit data while len(data) > 0: - if com.receive(readback_length) != '': - com.send_float(data.pop(0)) + # If we received data back, we can send more data, i.e. from this we know + # the controller has received the data + # If not, we close the connection and create an exception + if self._com.receive(readback_length) != "": + self._com.send_float(data.pop(0)) else: - com.close() - raise Exception('Failed to transmit data. No response from controller') - com.close() + self._com.close() + raise Exception( + "Failed to transmit data. No response from controller" + ) + self._com.close() else: - com.close() - raise ConnectionError('Failed to hook to controller data stream. No fitting response received') + self._com.close() + raise ConnectionError( + "Failed to hook to controller data stream. No fitting response received" + ) + # Abstraction of the _change_data method specifically designed to change the entire config def change_config(self, new_config: list[float]) -> None: - self._change_data('PR', ['\n', 'P', 'R', '\n'], new_config, 3) + try: + self._change_data("PR", ["\n", "P", "R", "\n"], new_config, 3) + except Exception as e: + raise e + # Abstraction of the _change_data method specifically designed to change only the configured temperature def change_temperature(self, temperatures: list[float]) -> None: - self._change_data('PT', ['\n', 'P', 'T', '\n'], temperatures, 3) - - def enable_fastmode(self) -> None: - com.send('FM') - com.close() - - def disable_fastmode(self) -> None: - com.send('NM') - com.close() - + try: + self._change_data("PT", ["\n", "P", "T", "\n"], temperatures, 3) + except Exception as e: + raise e diff --git a/biogascontrollerapp/lib/test/com.py b/biogascontrollerapp/lib/test/com.py index 4875b21..a7cc2f2 100644 --- a/biogascontrollerapp/lib/test/com.py +++ b/biogascontrollerapp/lib/test/com.py @@ -1,36 +1,55 @@ """ Library to be used in standalone mode (without microcontroller, for testing functionality) +It simulates the behviour of an actual microcontroller being connected """ from typing import Optional import queue +import random + +# This file contains a Com class that can be used to test the functionality +# even without a microcontroller. It is not documented in a particularly +# beginner-friendly way, nor is the code written with beginner-friendliness +# in mind. It is the most complicated piece of code of the entire application + +# All double __ prefixed properties are not available in the actual one class Com: def __init__(self) -> None: # Initialize queue with values to be sent on call of recieve (add like three or so at a time) - self._port_override = '' + self._port_override = "" + self.__mode = "" + self.__simulated_data = queue.Queue() def set_port_override(self, override: str) -> None: """Set the port override, to disable port search""" self._port_override = override def get_comport(self) -> str: - return 'test' if self._port_override != '' else self._port_override + return "test" if self._port_override != "" else self._port_override - def connect(self, baud_rate: int, port_override: Optional[str] = None) -> bool: - return True # TODO: For testing, make cases where there is no successful connection, i.e. we return false + def connect(self) -> bool: + # TODO: For testing, make cases where there is no successful connection, i.e. we return false + # Randomly return false + if random.randint(0, 20): + return False + return True def close(self) -> None: pass - def receive(self, byte_count: int) -> None: + def receive(self, byte_count: int) -> bytes: # TODO: Make it return simulated data - pass + return bytes("A", "ascii") def send(self, msg: str) -> None: # TODO: Use LUT to find what should be added to the queue for read + # Using LUT to reference pass def send_float(self, msg: float) -> None: pass + + def _generate_random_value(self, precision: int) -> bytes: + return bytes(str(round(random.random() * precision) / precision), "ascii") diff --git a/changelog b/changelog index 4c9d366..d40fc1a 100644 --- a/changelog +++ b/changelog @@ -1,5 +1,15 @@ ***CHANGELOG*** +V3.0-beta +- Redesigned GUI +- Consolidated multiple previously separate screens +- Completely rewritten backend +- Improved stability +- Cleaned, documented code + +OLD VERSIONS +------------ + DEVELOPMENT VERSIONS dev-V2rev1: @@ -103,4 +113,5 @@ V2.3 - ADDS logging (you can include the logs in a bugreport so the devs can pin-point the exact cause and replicate the error) - ADDS some settings through a config file - CHANGED License from NONE to GPL V3 -- BUGFIXES \ No newline at end of file +- BUGFIXES +