Config, Lots of docs, Format

Added a config validator and documented code that was previously
undocumented, for the plot_generator scripts, documented them.
This commit is contained in:
2025-06-16 16:36:18 +02:00
parent 3a6cd6af3d
commit 7905cb851a
14 changed files with 436 additions and 89 deletions

View File

@@ -13,25 +13,31 @@
# ──────────────────────────────────────────────────────────────────── # ────────────────────────────────────────────────────────────────────
# Load the config file # Load the config file
import configparser
import time import time
from lib.config import read_config, set_verbosity, str_to_bool
config = configparser.ConfigParser() verbose = str_to_bool(read_config("Dev", "verbose", "False", type_to_validate="bool"))
config.read("./config.ini") verbose = verbose if verbose != None else False
# Introducing tariffs to Python imports. # Introducing tariffs to Python imports.
# It was too funny of an idea to miss out on # It was too funny of an idea to miss out on
# You can enable or disable this in the config. # You can enable or disable this in the config.
# It is disabled by default # It is disabled by default
if config["Tariffs"]["impose_tariffs"] == "True": if str_to_bool(
read_config("Tariffs", "impose_tariffs", "False", type_to_validate="bool")
):
try: try:
import tariff import tariff
tariff.set( tariff.set(
{ {
"kivy": int(config["Tariffs"]["kivy_rate"]), "kivy": int(
"serial": int(config["Tariffs"]["pyserial_rate"]), read_config("Tariffs", "kivy_rate", "0", type_to_validate="int")
),
"serial": int(
read_config("Tariffs", "pyserial_rate", "0", type_to_validate="int")
),
} }
) )
except Exception as e: except Exception as e:
@@ -49,7 +55,7 @@ import lib.test.com
# Load config and disable kivy log if necessary # Load config and disable kivy log if necessary
if config["Dev"]["verbose"] == "True": if verbose:
pass pass
else: else:
os.environ["KIVY_NO_CONSOLELOG"] = "1" os.environ["KIVY_NO_CONSOLELOG"] = "1"
@@ -62,7 +68,10 @@ from kivymd.app import MDApp
# Set Window size # Set Window size
Window.size = (int(config["UI"]["width"]), int(config["UI"]["height"])) Window.size = (
int(int(read_config("UI", "width", "800", type_to_validate="int"))),
int(int(read_config("UI", "height", "600", type_to_validate="int"))),
)
# ╭────────────────────────────────────────────────╮ # ╭────────────────────────────────────────────────╮
@@ -79,6 +88,29 @@ from gui.main.main import MainScreen
# │ Screen Manager │ # │ Screen Manager │
# ╰────────────────────────────────────────────────╯ # ╰────────────────────────────────────────────────╯
# Kivy uses a screen manager to manage pages in the application # Kivy uses a screen manager to manage pages in the application
colors = [
"Red",
"Pink",
"Purple",
"DeepPurple",
"Indigo",
"Blue",
"LightBlue",
"Cyan",
"Teal",
"Green",
"LightGreen",
"Lime",
"Yellow",
"Amber",
"Orange",
"DeepOrange",
"Brown",
"Gray",
"BlueGray",
]
class BiogasControllerApp(MDApp): class BiogasControllerApp(MDApp):
def __init__(self, **kwargs): def __init__(self, **kwargs):
super().__init__(**kwargs) super().__init__(**kwargs)
@@ -87,35 +119,49 @@ class BiogasControllerApp(MDApp):
@override @override
def build(self): def build(self):
# Configure com # Configure com
conn = config["Connection"] filters = [
filters = [x for x in conn["filters"].split(",")] x
com: ComSuperClass = Com( for x in read_config(
int(conn["baudrate"]) if conn["baudrate"] != None else 19200, filters "Connection",
"filters",
"USB-Serial Controller, Prolific USB-Serial Controller",
).split(",")
]
baudrate = int(
read_config("Connection", "baudrate", "19200", type_to_validate="int")
) )
if config["Dev"]["use_test_library"] == "True":
com = lib.test.com.Com( com: ComSuperClass = Com(
int(config["Dev"]["fail_sim"]), baudrate,
int(conn["baudrate"]) if conn["baudrate"] != None else 19200,
filters, filters,
) )
com.set_port_override(conn["baudrate"])
self.theme_cls.theme_style = ( if str_to_bool(
"Dark" if config["UI"]["theme"] == None else config["UI"]["theme"] read_config("Dev", "use_test_library", "False", type_to_validate="bool")
):
com = lib.test.com.Com(
int(read_config("Dev", "fail_sim", "20", type_to_validate="int")),
baudrate,
filters,
)
com.set_port_override(read_config("Connection", "port_override", "None"))
self.theme_cls.theme_style = read_config(
"UI", "theme", "Dark", ["Dark", "Light"]
) )
self.theme_cls.material_style = "M3" self.theme_cls.material_style = "M3"
self.theme_cls.primary_palette = ( self.theme_cls.primary_palette = read_config(
"Green" "UI", "primary_color", "Green", colors
if config["UI"]["primary_color"] == None
else config["UI"]["primary_color"]
) )
self.theme_cls.accent_palette = ( self.theme_cls.accent_palette = read_config(
"Lime" "UI", "accent_color", "Lime", colors
if config["UI"]["accent_color"] == None
else config["UI"]["accent_color"]
) )
self.theme_cls.theme_style_switch_animation = False self.theme_cls.theme_style_switch_animation = False
if verbose:
print("\n", "-" * 20, "\n")
self.icon = "./BiogasControllerAppLogo.png" self.icon = "./BiogasControllerAppLogo.png"
self.title = "BiogasControllerApp-V3.1.0" self.title = "BiogasControllerApp-V3.1.0"
self.screen_manager.add_widget(HomeScreen(com, name="home")) self.screen_manager.add_widget(HomeScreen(com, name="home"))
@@ -148,5 +194,6 @@ if __name__ == "__main__":
=> Initializing.... => Initializing....
""" """
) )
set_verbosity(verbose)
BiogasControllerApp().run() BiogasControllerApp().run()
print("\n => Exiting!") print("\n => Exiting!")

View File

@@ -1,5 +1,5 @@
[Connection] [Connection]
override_port = None port_override = None
baudrate = 19200 baudrate = 19200
# List the names as which the adapter cable will show up separated by commas below # List the names as which the adapter cable will show up separated by commas below
# For ENATECH, the below is likely correct. # For ENATECH, the below is likely correct.
@@ -16,6 +16,7 @@ accent_color = Lime
[Dev] [Dev]
verbose = False verbose = False
use_test_library = False use_test_library = False
# One time out of how many (plus one) it should fail
fail_sim = 10 fail_sim = 10
[Tariffs] [Tariffs]

5
gui/README.md Normal file
View File

@@ -0,0 +1,5 @@
# GUI
This folder contains all files that are used for the GUI of the app.
It is written in KivyMD, so if you don't know what that is and you don't want to learn it,
there isn't much of use in here for you! - Just so you're warned

View File

@@ -4,25 +4,35 @@ from kivymd.uix.button import MDFlatButton
from kivy.lang import Builder from kivy.lang import Builder
import webbrowser import webbrowser
# Simple about screen
class AboutScreen(Screen): class AboutScreen(Screen):
def __init__(self, **kw): def __init__(self, **kw):
# Prepare dialog
self.opened_web_browser_dialog = MDDialog( self.opened_web_browser_dialog = MDDialog(
title="Open Link", title="Open Link",
text="Your webbrowser has been opened. Continue there", text="Your webbrowser has been opened. Continue there",
buttons=[ buttons=[
MDFlatButton(text="Ok", on_release=lambda _: self.opened_web_browser_dialog.dismiss()), MDFlatButton(
text="Ok",
on_release=lambda _: self.opened_web_browser_dialog.dismiss(),
),
], ],
) )
super().__init__(**kw) super().__init__(**kw)
def goto(self, loc: str): def goto(self, loc: str):
# Open web browser with links
if loc == "wiki": if loc == "wiki":
webbrowser.open('https://github.com/janishutz/BiogasControllerApp/wiki', new=2) webbrowser.open(
"https://github.com/janishutz/BiogasControllerApp/wiki", new=2
)
elif loc == "issues": elif loc == "issues":
webbrowser.open('https://github.com/janishutz/BiogasControllerApp/issues', new=2) webbrowser.open(
"https://github.com/janishutz/BiogasControllerApp/issues", new=2
)
elif loc == "repo": elif loc == "repo":
webbrowser.open('https://github.com/janishutz/BiogasControllerApp', new=2) webbrowser.open("https://github.com/janishutz/BiogasControllerApp", new=2)
self.opened_web_browser_dialog.open() self.opened_web_browser_dialog.open()
Builder.load_file('./gui/about/about.kv')
Builder.load_file("./gui/about/about.kv")

View File

@@ -1,3 +1,4 @@
from kivy.base import Clock
from kivymd.app import MDApp from kivymd.app import MDApp
from kivymd.uix.button import MDFlatButton from kivymd.uix.button import MDFlatButton
from kivymd.uix.dialog import MDDialog from kivymd.uix.dialog import MDDialog
@@ -48,9 +49,7 @@ class HomeScreen(MDScreen):
text="Cancel", text="Cancel",
on_release=lambda _: self.quit_dialog.dismiss(), on_release=lambda _: self.quit_dialog.dismiss(),
), ),
MDFlatButton( MDFlatButton(text="Quit", on_release=lambda _: self._quit()),
text="Quit", on_release=lambda _: self._quit()
),
], ],
) )
super().__init__(**kw) super().__init__(**kw)
@@ -59,9 +58,12 @@ class HomeScreen(MDScreen):
self._com.close() self._com.close()
MDApp.get_running_app().stop() MDApp.get_running_app().stop()
def start(self):
Clock.schedule_once(lambda _: self._start())
# Go to the main screen if we can establish connection or the check was disabled # Go to the main screen if we can establish connection or the check was disabled
# in the configs # in the configs
def start(self): def _start(self):
if self._com.connect(): if self._com.connect():
self.manager.current = "main" self.manager.current = "main"
self.manager.transition.direction = "right" self.manager.transition.direction = "right"
@@ -94,7 +96,6 @@ class HomeScreen(MDScreen):
"13" "13"
] = f"Incorrect permissions at {port}. Resolve by running 'sudo chmod 777 {port}'" ] = f"Incorrect permissions at {port}. Resolve by running 'sudo chmod 777 {port}'"
if port == "": if port == "":
return information[operating_system]["NO_COM"] return information[operating_system]["NO_COM"]

View File

@@ -112,6 +112,7 @@ class MainScreen(MDScreen):
self._event = None self._event = None
self._fast_mode = False self._fast_mode = False
# Set up Dialog for erros
self.connection_error_dialog = MDDialog( self.connection_error_dialog = MDDialog(
title="Connection", title="Connection",
text="Failed to connect. Do you wish to retry?", text="Failed to connect. Do you wish to retry?",
@@ -144,28 +145,39 @@ class MainScreen(MDScreen):
super().__init__(**kw) super().__init__(**kw)
def _prepare_reader(self): def _prepare_reader(self):
# Prepares the reader thread
self._reader = ReaderThread() self._reader = ReaderThread()
self._reader.daemon = True self._reader.daemon = True
self._reader.set_com(self._com) self._reader.set_com(self._com)
# Start the connection to the micro-controller to read data from it. # Small helper function that makes the UI not freeze by offloading
# This also now starts the reader thread to continuously read out data
def start(self): def start(self):
Clock.schedule_once(lambda _: self._start())
# Start the connection to the micro-controller to read data from it.
# This also starts the reader thread to continuously read out data
def _start(self):
# Prevent running multiple times # Prevent running multiple times
self.connection_error_dialog.dismiss() self.connection_error_dialog.dismiss()
if self._has_connected: if self._has_connected:
return return
# Some UI config
self.ids.status.text = "Connecting..." self.ids.status.text = "Connecting..."
if self._com.connect(): if self._com.connect():
print("[ COM ] Connection Acquired") print("[ COM ] Connection Acquired")
# Prevent multiple connections
self._has_connected = True self._has_connected = True
self._has_run = True self._has_run = True
if self._has_run: if self._has_run:
self._prepare_reader() self._prepare_reader()
# Start communication # Start communication
self._reader.start() self._reader.start()
print("[ COM ] Reader has started") print("[ COM ] Reader has started")
# Schedule UI updates
self._event = Clock.schedule_interval(self._update_screen, 0.5) self._event = Clock.schedule_interval(self._update_screen, 0.5)
else: else:
self.ids.status.text = "Connection failed" self.ids.status.text = "Connection failed"
@@ -179,15 +191,20 @@ class MainScreen(MDScreen):
if self._event != None: if self._event != None:
self._event.cancel() self._event.cancel()
self._reader.stop() self._reader.stop()
# Join the thread to end it safely
try: try:
self._reader.join() self._reader.join()
except: except:
pass pass
# Go back to Normal Mode on the Controller
# This is so you don't accidentally forget!
try: try:
self._com.send("NM") self._com.send("NM")
except: except:
pass pass
self._com.close() self._com.close()
if set_msg: if set_msg:
self.ids.status.text = "Connection terminated" self.ids.status.text = "Connection terminated"
@@ -202,18 +219,24 @@ class MainScreen(MDScreen):
update = synced_queue.get_nowait() update = synced_queue.get_nowait()
except: except:
pass pass
if len(update) == 0: if len(update) == 0:
# There are no updates to process, don't block and simply try again next time # There are no updates to process, don't block and simply try again next time
return return
if len(update) == 1: if len(update) == 1:
# Sync errors
if update[0] == "ERR_HOOK": if update[0] == "ERR_HOOK":
self.ids.status.text = "Hook failed" self.ids.status.text = "Hook failed"
self.end(False) self.end(False)
if len(update) == 2: if len(update) == 2:
# Connection successful
if update[0] == "HOOK": if update[0] == "HOOK":
self.ids.status.text = "Connected to controller" self.ids.status.text = "Connected to controller"
self.ids.port.text = "Port: " + update[1] self.ids.port.text = "Port: " + update[1]
else: else:
# Update the UI
self.ids.sensor1.text = update[0] self.ids.sensor1.text = update[0]
self.ids.sensor2.text = update[1] self.ids.sensor2.text = update[1]
self.ids.sensor3.text = update[2] self.ids.sensor3.text = update[2]

View File

@@ -20,15 +20,16 @@ class ProgramScreen(MDScreen):
self._instructions = Instructions(com) self._instructions = Instructions(com)
self._decoder = Decoder() self._decoder = Decoder()
# Configure Dialog
self.connection_error_dialog = MDDialog( self.connection_error_dialog = MDDialog(
title="Connection", title="Connection",
text="Failed to connect. Do you wish to retry?", text="Failed to connect. Do you wish to retry?",
buttons=[ buttons=[
MDFlatButton( MDFlatButton(
text="Cancel", text="Cancel",
on_release=lambda dt: self.connection_error_dialog.dismiss(), on_release=lambda _: self.connection_error_dialog.dismiss(),
), ),
MDFlatButton(text="Retry", on_release=lambda dt: self._load()), MDFlatButton(text="Retry", on_release=lambda _: self.load_config()),
], ],
) )
@@ -38,7 +39,7 @@ class ProgramScreen(MDScreen):
buttons=[ buttons=[
MDFlatButton( MDFlatButton(
text="Ok", text="Ok",
on_release=lambda dt: self.missing_fields_error_dialog.dismiss(), on_release=lambda _: self.missing_fields_error_dialog.dismiss(),
), ),
], ],
) )
@@ -49,7 +50,7 @@ class ProgramScreen(MDScreen):
buttons=[ buttons=[
MDFlatButton( MDFlatButton(
text="Ok", text="Ok",
on_release=lambda dt: self.save_error_dialog.dismiss(), on_release=lambda _: self.save_error_dialog.dismiss(),
), ),
], ],
) )
@@ -60,15 +61,16 @@ class ProgramScreen(MDScreen):
buttons=[ buttons=[
MDFlatButton( MDFlatButton(
text="Ok", text="Ok",
on_release=lambda dt: self.save_success_dialog.dismiss(), on_release=lambda _: self.save_success_dialog.dismiss(),
), ),
], ],
) )
super().__init__(**kw) super().__init__(**kw)
# Load the config (async to not freeze the UI)
def load_config(self): def load_config(self):
Clock.schedule_once(lambda dt: self._load()) Clock.schedule_once(lambda _: self._load())
# Load the current configuration from the micro-controller # Load the current configuration from the micro-controller
def _load(self): def _load(self):
@@ -131,8 +133,11 @@ class ProgramScreen(MDScreen):
return data return data
# Transmit the changed data to the micro-controller to reconfigure it
def save(self): def save(self):
Clock.schedule_once(lambda _: self._save())
# Transmit the changed data to the micro-controller to reconfigure it
def _save(self):
self.ids.status.text = "Saving..." self.ids.status.text = "Saving..."
data = self._read_ui() data = self._read_ui()
if data == None: if data == None:
@@ -140,14 +145,14 @@ class ProgramScreen(MDScreen):
else: else:
try: try:
self._instructions.change_config(data) self._instructions.change_config(data)
except Exception as e: except:
self.save_error_dialog.open() self.save_error_dialog.open()
return return
self.save_success_dialog.open() self.save_success_dialog.open()
self.ids.status.text = "Saved!" self.ids.status.text = "Saved!"
Clock.schedule_once(self.reset_update, 5) Clock.schedule_once(self.reset_update, 5)
def reset_update(self, dt): def reset_update(self, _):
self.ids.status.text = "" self.ids.status.text = ""
def validate_float(self, instance): def validate_float(self, instance):

View File

@@ -4,6 +4,22 @@ import serial
import struct import struct
import serial.tools.list_ports import serial.tools.list_ports
# The below class is abstract to have a consistent, targetable interface
# for both the real connection module and the simulation module
#
# If you are unaware of what classes are, you can mostly ignore the ComSuperClass
#
# For the interested, a quick rundown of what the benefits of doing it this way is:
# This class provides a way to have two wholly different implementations that have
# the same function interface (i.e. all functions take the same arguments)
#
# Another benefit of having classes is that we can pass a single instance around to
# various components and have one shared instance that all can modify, reducing some
# overhead.
#
# The actual implementation of most functions (called methods in OOP) are implemented
# in the Com class below.
class ComSuperClass(ABC): class ComSuperClass(ABC):
def __init__( def __init__(
@@ -52,6 +68,15 @@ class ComSuperClass(ABC):
pass pass
# ┌ ┐
# │ Main Com Class Implementation │
# └ ┘
# Below you can find what you were most likely looking for. This is the implementation of the communication with the microcontroller.
# You may also be interested in the decoder.py and instructions.py file, as the decoding and the hooking / syncing process are
# implemented there. It is recommended that you do NOT read the test/com.py file, as that one is only there for simulation purposes
# and is much more complicated than this here, if you are not well versed with Python or are struggling with the basics
class Com(ComSuperClass): class Com(ComSuperClass):
def _connection_check(self) -> bool: def _connection_check(self) -> bool:
if self._serial == None: if self._serial == None:
@@ -84,17 +109,30 @@ class Com(ComSuperClass):
return "" return ""
def _open(self) -> bool: def _open(self) -> bool:
"""Open the connection. Internal function, not to be called directly
Returns:
Boolean indicates if connection was successful or not
"""
# Get the com port the controller has connected to
comport = self.get_comport() comport = self.get_comport()
# Comport search returns empty string if search unsuccessful # Comport search returns empty string if search unsuccessful
if comport == "": if comport == "":
# Try to generate a new Serial object with the configuration of this class
# self._baudrate contains the baud rate and defaults to 19200
try: try:
self._serial = serial.Serial(comport, self._baudrate, timeout=5) self._serial = serial.Serial(comport, self._baudrate, timeout=5)
except serial.SerialException as e: except serial.SerialException as e:
# If an error occurs, catch it, handle it and store the error
# for the UI and return False to indicate failed connection
self._err = e self._err = e
return False return False
# Connection succeeded, return True
return True return True
else: else:
# Haven't found a comport
return False return False
def connect(self) -> bool: def connect(self) -> bool:
@@ -110,8 +148,13 @@ class Com(ComSuperClass):
pass pass
def receive(self, byte_count: int) -> bytes: def receive(self, byte_count: int) -> bytes:
"""Recieve bytes from microcontroller over serial. Returns bytes. Might want to decode using functions from lib.tools""" """Receive bytes from microcontroller over serial. Returns bytes. Might want to decode using functions from lib.decoder"""
# Check connection
self._connection_check() self._connection_check()
# Ignore this boilerplate (extra code), the body of the if is the only thing important.
# The reason for the boilerplate is that the type checker will notice that self._serial can be
# None, thus showing errors.
if self._serial != None: if self._serial != None:
return self._serial.read(byte_count) return self._serial.read(byte_count)
else: else:
@@ -119,7 +162,12 @@ class Com(ComSuperClass):
def send(self, msg: str) -> None: def send(self, msg: str) -> None:
"""Send a string over serial connection. Will open a connection if none is available""" """Send a string over serial connection. Will open a connection if none is available"""
# Check connection
self._connection_check() self._connection_check()
# Ignore this boilerplate (extra code), the body of the if is the only thing important.
# The reason for the boilerplate is that the type checker will notice that self._serial can be
# None, thus showing errors.
if self._serial != None: if self._serial != None:
self._serial.write(msg.encode()) self._serial.write(msg.encode())
else: else:
@@ -127,7 +175,12 @@ class Com(ComSuperClass):
def send_float(self, msg: float) -> None: def send_float(self, msg: float) -> None:
"""Send a float number over serial connection""" """Send a float number over serial connection"""
# Check connection
self._connection_check() self._connection_check()
# Ignore this boilerplate (extra code), the body of the if is the only thing important.
# The reason for the boilerplate is that the type checker will notice that self._serial can be
# None, thus showing errors.
if self._serial != None: if self._serial != None:
self._serial.write(bytearray(struct.pack(">f", msg))[0:3]) self._serial.write(bytearray(struct.pack(">f", msg))[0:3])
else: else:

144
lib/config.py Normal file
View File

@@ -0,0 +1,144 @@
import configparser
from typing import List
# Load the config
config = configparser.ConfigParser()
config.read("./config.ini")
global first_error
first_error = True
global is_verbose
is_verbose = True
def set_verbosity(verbose: bool):
global is_verbose
is_verbose = verbose
print("\n", "-" * 20, "\nValidating configuration...\n")
def str_to_bool(val: str) -> bool | None:
"""Convert a string to boolean, converting "True" and "true" to True, same for False
Args:
val: The value to try to convert
Returns:
Returns either a boolean if conversion was successful, or None if not a boolean
"""
return {"True": True, "true": True, "False": False, "false": False}.get(val, None)
def read_config(
key_0: str,
key_1: str,
default: str,
valid_entries: List[str] = [],
type_to_validate: str = "",
) -> str:
"""Read the configuration, report potential configuration issues and validate each entry
Args:
key_0: The first key (top level)
key_1: The second key (where the actual key-value pair is)
default: The default value to return if the check fails
valid_entries: [Optiona] The entries that are valid ones to check against
type_to_validate: [Optional] Data type to validate
Returns:
[TODO:return]
"""
# Try loading the keys
tmp = {}
try:
tmp = config[key_0]
except KeyError:
print_config_error(key_0, key_1, "", default, "unknown", index=1)
return default
value = ""
try:
value = tmp[key_1]
except KeyError:
print_config_error(key_0, key_1, "", default, "unknown")
return default
if len(value) == 0:
print_config_error(key_0, key_1, value, default, "not_empty")
# Validate input
if type_to_validate != "":
# Need to validate
if type_to_validate == "int":
try:
int(value)
except ValueError:
print_config_error(key_0, key_1, value, default, "int")
return default
if type_to_validate == "float":
try:
float(value)
except ValueError:
print_config_error(key_0, key_1, value, default, "float")
return default
if type_to_validate == "bool":
if str_to_bool(value) == None:
print_config_error(key_0, key_1, value, default, "bool")
return default
if len(valid_entries) > 0:
# Need to validate the names
try:
valid_entries.index(value)
except ValueError:
print_config_error(
key_0, key_1, value, default, "oneof", valid_entries=valid_entries
)
return default
return value
def print_config_error(
key_0: str,
key_1: str,
value: str,
default: str,
expected: str,
valid_entries: List[str] = [],
msg: str = "",
index: int = 1,
):
"""Print configuration errors to the shell
Args:
key_0: The first key (top level)
key_1: The second key (where the actual value is to be found)
expected: The data type expected. If unknown key, set to "unknown" and set index; If should be one of, use "oneof" and set valid_entries list
msg: The message to print
index: The index in the chain (i.e. if key_0 or key_1)
"""
if not is_verbose:
return
print(f" ==> Using default setting ({default}) for {key_0}.{key_1}")
if expected == "unknown":
# The field was unknown
print(f' -> Unknown field "{key_0 if index == 0 else key_1}"')
elif expected == "oneof":
print(
f' -> Invalid name "{value}". Has to be one of', ", ".join(valid_entries)
)
elif expected == "not_empty":
print(" -> Property is unexpectedly None")
elif expected == "bool":
print(f' -> Boolean property expected, but instead found "{value}".')
else:
print(f" -> Expected a config option of type {expected}.")
if msg != "":
print(msg)

View File

@@ -1,18 +1,24 @@
import struct import struct
# Decoder to decode various sent values from the microcontroller
class Decoder: class Decoder:
# Decode an ascii character
def decode_ascii(self, value: bytes) -> str: def decode_ascii(self, value: bytes) -> str:
try: try:
return value.decode() return value.decode()
except: except:
return 'Error' return "Error"
# Decode a float (6 bits)
def decode_float(self, value: bytes) -> float: def decode_float(self, value: bytes) -> float:
return struct.unpack('>f', bytes.fromhex(str(value, 'ascii') + '00'))[0] return struct.unpack(">f", bytes.fromhex(str(value, "ascii") + "00"))[0]
# Decode a float, but with additional offsets
def decode_float_long(self, value: bytes) -> float: def decode_float_long(self, value: bytes) -> float:
return struct.unpack('>f', bytes.fromhex(str(value, 'ascii') + '0000'))[0] return struct.unpack(">f", bytes.fromhex(str(value, "ascii") + "0000"))[0]
# Decode an int
def decode_int(self, value: bytes) -> int: def decode_int(self, value: bytes) -> int:
# return int.from_bytes(value, 'big') # return int.from_bytes(value, 'big')
return int(value, base=16) return int(value, base=16)

View File

@@ -2,7 +2,6 @@ from lib.com import ComSuperClass
import lib.decoder import lib.decoder
import time import time
# TODO: Load filters (for comport search)
decoder = lib.decoder.Decoder() decoder = lib.decoder.Decoder()
@@ -35,12 +34,13 @@ class Instructions:
# Only run for a limited amount of time # Only run for a limited amount of time
while time.time() - start < 5: while time.time() - start < 5:
# If the decoded ascii character is equal to the next expected character, move pointer right by one # Receive and decode a single byte and decode as ASCII
# If not, jump back to start data = decoder.decode_ascii(self._com.receive(1))
data = decoder.decode_ascii(self._com.receive(1));
if data == sequence[pointer]: if data == sequence[pointer]:
# Increment the pointer (move to next element in the List)
pointer += 1 pointer += 1
else: else:
# Jump back to start
pointer = 0 pointer = 0
# If the pointer has reached the end of the sequence, return True, as now the hook was successful # If the pointer has reached the end of the sequence, return True, as now the hook was successful
@@ -58,16 +58,26 @@ class Instructions:
# Wait to find a CR character (enter) # Wait to find a CR character (enter)
char = decoder.decode_ascii(self._com.receive(1)) char = decoder.decode_ascii(self._com.receive(1))
while char != "\n": while char != "\n":
# Check for timeout
if time.time() - start > 3: if time.time() - start > 3:
return False return False
# Set the next character by receiving and decoding it as ASCII
char = decoder.decode_ascii(self._com.receive(1)) char = decoder.decode_ascii(self._com.receive(1))
# Store the position in the hooking process # Store the position in the hooking process
state = 0 state = 0
distance = 0 distance = 0
# While we haven't timed out and have not reached the last state execute
# The last state indicates that the sync was successful
while time.time() - start < 5 and state < 3: while time.time() - start < 5 and state < 3:
# Receive the next char and decode it as ASCII
char = decoder.decode_ascii(self._com.receive(1)) char = decoder.decode_ascii(self._com.receive(1))
# The character we look for when syncing is Space (ASCII char 32 (decimal))
# It is sent every 4 bits. If we have received 3 with the correct distance from
# the previous in a row, we are synced
if char == " ": if char == " ":
if distance == 4: if distance == 4:
state += 1 state += 1
@@ -79,6 +89,7 @@ class Instructions:
else: else:
distance += 1 distance += 1
# Read 5 more bits to correctly sync up
self._com.receive(5) self._com.receive(5)
return state == 3 return state == 3

View File

@@ -11,12 +11,17 @@ import struct
from lib.com import ComSuperClass from lib.com import ComSuperClass
# ┌ ┐
# │ Testing Module For Com │
# └ ┘
# This file contains a Com class that can be used to test the functionality # This file contains a Com class that can be used to test the functionality
# even without a microcontroller. It is not documented in a particularly # even without a microcontroller. It is not documented in a particularly
# beginner-friendly way, nor is the code written with beginner-friendliness # beginner-friendly way, nor is the code written with beginner-friendliness
# in mind. It is the most complicated piece of code of the entire application # in mind. It is the most complicated piece of code of the entire application
# All double __ prefixed properties and methods are not available in the actual one # ────────────────────────────────────────────────────────────────────
# All double __ prefixed properties and methods are not available in the actual impl
instruction_lut: dict[str, list[str]] = { instruction_lut: dict[str, list[str]] = {
"PR": ["\n", "P", "R", "\n"], "PR": ["\n", "P", "R", "\n"],
@@ -162,6 +167,7 @@ class Com(ComSuperClass):
self.__add_ascii_char("\n") self.__add_ascii_char("\n")
def __fill_queue(self): def __fill_queue(self):
# Simulate a full cycle
for _ in range(4): for _ in range(4):
self.__add_integer_as_hex(self.__generate_random_int(200)) self.__add_integer_as_hex(self.__generate_random_int(200))
self.__simulated_data.put(bytes(" ", "ascii")) self.__simulated_data.put(bytes(" ", "ascii"))

View File

@@ -6,45 +6,73 @@ n = int(input("Sensor number to be printed: "))
file = "" file = ""
def generate_plot(): def generate_plot():
reader = csv.reader(file, delimiter=',') # Read data using the CSV library
reader = csv.reader(file, delimiter=",")
# Create a list from the data
data = list(reader) data = list(reader)
data.sort(key=lambda imp: float(imp[2]))
lenght = len(data) # Sort the list using a lambda sort descriptor
# A lambda function is an anonymous function (= an unnamed function),
# which makes it convenient. A sort descriptor is a function that
# (usually, but not here) returns a value indicating which of two values
# come before or after in the ordering.
# Here, instead we simply return a floating point value for each data point
data.sort(key=lambda data_point: float(data_point[2]))
# Store the x and y coordinates in two arrays
x = [] x = []
y = [] y = []
for _ in range(lenght): for _ in range(len(data)):
extract = data.pop(0) # Extract the data point
sensor = int(extract.pop(0)) data_point = data.pop(0)
sensor = int(data_point.pop(0))
if sensor == n: if sensor == n:
ye = extract.pop(0) y.append(float(data_point.pop(0)))
xe = extract.pop(0) x.append(float(data_point.pop(0)))
y.append(float(ye))
x.append(float(xe))
# Use Numpy's polyfit function to fit a 2nd degree polynomial to the points using quadratic regression
# This function returns an array with the coefficients
fit = np.polyfit(x, y, 2) fit = np.polyfit(x, y, 2)
# The formula to output to the plot
formula = f"F(U) = {round(float(fit[0]), 4)}U^2+{round(float(fit[1]), 4)}U+{round(float(fit[2]), 4)}" formula = f"F(U) = {round(float(fit[0]), 4)}U^2+{round(float(fit[1]), 4)}U+{round(float(fit[2]), 4)}"
fit_fn = np.poly1d(fit) # Create a fit function from the previously determined coefficients
fit_fn = np.poly1d(fit) # Returns a function that takes a list of x-coordinate as argument
# Plot the line on the graph
plt.plot(x, fit_fn(x), color="BLUE", label="T(U)") plt.plot(x, fit_fn(x), color="BLUE", label="T(U)")
# Scatter Plot the data points that we have
plt.scatter(x, y, color="MAGENTA", marker="o", label="Data") plt.scatter(x, y, color="MAGENTA", marker="o", label="Data")
# Label the graph
plt.ylabel("Temperature") plt.ylabel("Temperature")
plt.xlabel("Voltage") plt.xlabel("Voltage")
title = 'Sensor MCP9701A #{}'.format(n) plt.title("Sensor MCP9701A #{}".format(n))
plt.title(title)
# Scale the axis appropriately
plt.axis((0.6, 2.0, 15.0, 70.0)) plt.axis((0.6, 2.0, 15.0, 70.0))
# Print a legend and set the graph to be annotated
plt.legend(loc="lower right") plt.legend(loc="lower right")
plt.annotate(formula, xy=(0.85, 60)) plt.annotate(formula, xy=(0.85, 60))
# Enable the background grid
plt.grid(True) plt.grid(True)
# Finally, show the graph
plt.show() plt.show()
# Get user input whether to save the plot or not
saveit = input("Do you wish to save the plot? (y/N) ").lower() saveit = input("Do you wish to save the plot? (y/N) ").lower()
if saveit == "y": if saveit == "y":
# Save the plot as Sensor[Number] (e.g. Sensor9) as png, pdf and svg
plt.savefig("Sensor" + str(n) + ".png") plt.savefig("Sensor" + str(n) + ".png")
plt.savefig("Sensor" + str(n) + ".pdf", format="pdf") plt.savefig("Sensor" + str(n) + ".pdf", format="pdf")
plt.savefig("Sensor" + str(n) + ".svg", format="svg") plt.savefig("Sensor" + str(n) + ".svg", format="svg")
@@ -52,7 +80,11 @@ def generate_plot():
else: else:
print("==> Images discarded") print("==> Images discarded")
# Since we have defined a function above as a function, this here is executed first
filename = input("Please enter a file path to the csv file to be plotted: ") filename = input("Please enter a file path to the csv file to be plotted: ")
# Try to open the file
try: try:
file = open(filename, "r") file = open(filename, "r")
generate_plot() generate_plot()

View File

@@ -4,29 +4,36 @@ import matplotlib.pyplot as plt
import csv import csv
import os import os
# Get user input for various data
path = input("Path to csv-file to be plotted: ") path = input("Path to csv-file to be plotted: ")
print("For the below, it is recommended to enter data in this format: yyyy-mm-dd-hh-mm")
date = input("Date & time at which the measurement was taken (approx.): ") date = input("Date & time at which the measurement was taken (approx.): ")
group = input("Group-name: ") group = input("Group-name: ")
saveit = input("Should the graph be saved? (y/n) ").lower() saveit = input("Should the graph be saved? (y/n) ").lower()
imp = open(path, "r") imp = open(path, "r")
reader = csv.reader(imp, delimiter=',') reader = csv.reader(imp, delimiter=",")
rohdaten = list(reader) data = list(reader)
lenght = len(rohdaten)
x = [] x = []
y = [] y = []
for i in range(lenght): for i in range(len(data)):
extract = rohdaten.pop(0) # Extract the data
extract = data.pop(0)
x.append(float(extract.pop(0))) x.append(float(extract.pop(0)))
y.append(float(extract.pop(0))) y.append(float(extract.pop(0)))
# Set up plot
plt.plot(x, y, color="MAGENTA") plt.plot(x, y, color="MAGENTA")
plt.xlabel("Time") plt.xlabel("Time")
plt.ylabel("Voltage") plt.ylabel("Voltage")
title = f"GC - Biogasanlage {date}"
plt.title(title) plt.title(f"GC - Biogasanlage {date}")
plt.grid(True) plt.grid(True)
if saveit == "y":
# Check if user wants to save the image
if saveit == "n":
print("didn't save images")
else:
pos = 0 pos = 0
for letter in path[::-1]: for letter in path[::-1]:
if letter == "/": if letter == "/":
@@ -40,11 +47,7 @@ if saveit == "y":
os.mkdir(save_path) os.mkdir(save_path)
except FileExistsError: except FileExistsError:
pass pass
plt.savefig(save_path) plt.savefig(f"{save_path}/GC-{date}-{group}.png")
os.rename(f"{save_path}/.png", f"{save_path}/GC-{date}-{group}.png")
print(f"saved images to {save_path}") print(f"Saved images to {save_path}")
else:
print("didn't save images")
plt.show() plt.show()