diff --git a/biogascontrollerapp.py b/biogascontrollerapp.py index 1918f27..76d20e4 100644 --- a/biogascontrollerapp.py +++ b/biogascontrollerapp.py @@ -13,25 +13,31 @@ # ──────────────────────────────────────────────────────────────────── # Load the config file -import configparser import time +from lib.config import read_config, set_verbosity, str_to_bool -config = configparser.ConfigParser() -config.read("./config.ini") +verbose = str_to_bool(read_config("Dev", "verbose", "False", type_to_validate="bool")) +verbose = verbose if verbose != None else False # Introducing tariffs to Python imports. # It was too funny of an idea to miss out on # You can enable or disable this in the config. # It is disabled by default -if config["Tariffs"]["impose_tariffs"] == "True": +if str_to_bool( + read_config("Tariffs", "impose_tariffs", "False", type_to_validate="bool") +): try: import tariff tariff.set( { - "kivy": int(config["Tariffs"]["kivy_rate"]), - "serial": int(config["Tariffs"]["pyserial_rate"]), + "kivy": int( + read_config("Tariffs", "kivy_rate", "0", type_to_validate="int") + ), + "serial": int( + read_config("Tariffs", "pyserial_rate", "0", type_to_validate="int") + ), } ) except Exception as e: @@ -49,7 +55,7 @@ import lib.test.com # Load config and disable kivy log if necessary -if config["Dev"]["verbose"] == "True": +if verbose: pass else: os.environ["KIVY_NO_CONSOLELOG"] = "1" @@ -62,7 +68,10 @@ from kivymd.app import MDApp # Set Window size -Window.size = (int(config["UI"]["width"]), int(config["UI"]["height"])) +Window.size = ( + int(int(read_config("UI", "width", "800", type_to_validate="int"))), + int(int(read_config("UI", "height", "600", type_to_validate="int"))), +) # ╭────────────────────────────────────────────────╮ @@ -79,6 +88,29 @@ from gui.main.main import MainScreen # │ Screen Manager │ # ╰────────────────────────────────────────────────╯ # Kivy uses a screen manager to manage pages in the application +colors = [ + "Red", + "Pink", + "Purple", + "DeepPurple", + "Indigo", + "Blue", + "LightBlue", + "Cyan", + "Teal", + "Green", + "LightGreen", + "Lime", + "Yellow", + "Amber", + "Orange", + "DeepOrange", + "Brown", + "Gray", + "BlueGray", +] + + class BiogasControllerApp(MDApp): def __init__(self, **kwargs): super().__init__(**kwargs) @@ -87,35 +119,49 @@ class BiogasControllerApp(MDApp): @override def build(self): # Configure com - conn = config["Connection"] - filters = [x for x in conn["filters"].split(",")] - com: ComSuperClass = Com( - int(conn["baudrate"]) if conn["baudrate"] != None else 19200, filters + filters = [ + x + for x in read_config( + "Connection", + "filters", + "USB-Serial Controller, Prolific USB-Serial Controller", + ).split(",") + ] + + baudrate = int( + read_config("Connection", "baudrate", "19200", type_to_validate="int") ) - if config["Dev"]["use_test_library"] == "True": + + com: ComSuperClass = Com( + baudrate, + filters, + ) + + if str_to_bool( + read_config("Dev", "use_test_library", "False", type_to_validate="bool") + ): com = lib.test.com.Com( - int(config["Dev"]["fail_sim"]), - int(conn["baudrate"]) if conn["baudrate"] != None else 19200, + int(read_config("Dev", "fail_sim", "20", type_to_validate="int")), + baudrate, filters, ) - com.set_port_override(conn["baudrate"]) + com.set_port_override(read_config("Connection", "port_override", "None")) - self.theme_cls.theme_style = ( - "Dark" if config["UI"]["theme"] == None else config["UI"]["theme"] + self.theme_cls.theme_style = read_config( + "UI", "theme", "Dark", ["Dark", "Light"] ) self.theme_cls.material_style = "M3" - self.theme_cls.primary_palette = ( - "Green" - if config["UI"]["primary_color"] == None - else config["UI"]["primary_color"] + self.theme_cls.primary_palette = read_config( + "UI", "primary_color", "Green", colors ) - self.theme_cls.accent_palette = ( - "Lime" - if config["UI"]["accent_color"] == None - else config["UI"]["accent_color"] + self.theme_cls.accent_palette = read_config( + "UI", "accent_color", "Lime", colors ) self.theme_cls.theme_style_switch_animation = False + if verbose: + print("\n", "-" * 20, "\n") + self.icon = "./BiogasControllerAppLogo.png" self.title = "BiogasControllerApp-V3.1.0" self.screen_manager.add_widget(HomeScreen(com, name="home")) @@ -148,5 +194,6 @@ if __name__ == "__main__": => Initializing.... """ ) + set_verbosity(verbose) BiogasControllerApp().run() print("\n => Exiting!") diff --git a/config.ini b/config.ini index 7d90dbd..960d51e 100644 --- a/config.ini +++ b/config.ini @@ -1,5 +1,5 @@ [Connection] -override_port = None +port_override = None baudrate = 19200 # List the names as which the adapter cable will show up separated by commas below # For ENATECH, the below is likely correct. @@ -16,6 +16,7 @@ accent_color = Lime [Dev] verbose = False use_test_library = False +# One time out of how many (plus one) it should fail fail_sim = 10 [Tariffs] diff --git a/gui/README.md b/gui/README.md new file mode 100644 index 0000000..68ebde2 --- /dev/null +++ b/gui/README.md @@ -0,0 +1,5 @@ +# GUI +This folder contains all files that are used for the GUI of the app. + +It is written in KivyMD, so if you don't know what that is and you don't want to learn it, +there isn't much of use in here for you! - Just so you're warned diff --git a/gui/about/about.py b/gui/about/about.py index 645cf60..437928d 100644 --- a/gui/about/about.py +++ b/gui/about/about.py @@ -4,25 +4,35 @@ from kivymd.uix.button import MDFlatButton from kivy.lang import Builder import webbrowser - +# Simple about screen class AboutScreen(Screen): def __init__(self, **kw): + # Prepare dialog self.opened_web_browser_dialog = MDDialog( title="Open Link", text="Your webbrowser has been opened. Continue there", buttons=[ - MDFlatButton(text="Ok", on_release=lambda _: self.opened_web_browser_dialog.dismiss()), + MDFlatButton( + text="Ok", + on_release=lambda _: self.opened_web_browser_dialog.dismiss(), + ), ], ) super().__init__(**kw) def goto(self, loc: str): + # Open web browser with links if loc == "wiki": - webbrowser.open('https://github.com/janishutz/BiogasControllerApp/wiki', new=2) + webbrowser.open( + "https://github.com/janishutz/BiogasControllerApp/wiki", new=2 + ) elif loc == "issues": - webbrowser.open('https://github.com/janishutz/BiogasControllerApp/issues', new=2) + webbrowser.open( + "https://github.com/janishutz/BiogasControllerApp/issues", new=2 + ) elif loc == "repo": - webbrowser.open('https://github.com/janishutz/BiogasControllerApp', new=2) + webbrowser.open("https://github.com/janishutz/BiogasControllerApp", new=2) self.opened_web_browser_dialog.open() -Builder.load_file('./gui/about/about.kv') + +Builder.load_file("./gui/about/about.kv") diff --git a/gui/home/home.py b/gui/home/home.py index bd9fb2c..c4620cb 100644 --- a/gui/home/home.py +++ b/gui/home/home.py @@ -1,3 +1,4 @@ +from kivy.base import Clock from kivymd.app import MDApp from kivymd.uix.button import MDFlatButton from kivymd.uix.dialog import MDDialog @@ -48,9 +49,7 @@ class HomeScreen(MDScreen): text="Cancel", on_release=lambda _: self.quit_dialog.dismiss(), ), - MDFlatButton( - text="Quit", on_release=lambda _: self._quit() - ), + MDFlatButton(text="Quit", on_release=lambda _: self._quit()), ], ) super().__init__(**kw) @@ -59,9 +58,12 @@ class HomeScreen(MDScreen): self._com.close() MDApp.get_running_app().stop() + def start(self): + Clock.schedule_once(lambda _: self._start()) + # Go to the main screen if we can establish connection or the check was disabled # in the configs - def start(self): + def _start(self): if self._com.connect(): self.manager.current = "main" self.manager.transition.direction = "right" @@ -94,7 +96,6 @@ class HomeScreen(MDScreen): "13" ] = f"Incorrect permissions at {port}. Resolve by running 'sudo chmod 777 {port}'" - if port == "": return information[operating_system]["NO_COM"] diff --git a/gui/main/main.py b/gui/main/main.py index 9e7715c..bafbae4 100644 --- a/gui/main/main.py +++ b/gui/main/main.py @@ -112,6 +112,7 @@ class MainScreen(MDScreen): self._event = None self._fast_mode = False + # Set up Dialog for erros self.connection_error_dialog = MDDialog( title="Connection", text="Failed to connect. Do you wish to retry?", @@ -144,28 +145,39 @@ class MainScreen(MDScreen): super().__init__(**kw) def _prepare_reader(self): + # Prepares the reader thread self._reader = ReaderThread() self._reader.daemon = True self._reader.set_com(self._com) - # Start the connection to the micro-controller to read data from it. - # This also now starts the reader thread to continuously read out data + # Small helper function that makes the UI not freeze by offloading def start(self): + Clock.schedule_once(lambda _: self._start()) + + # Start the connection to the micro-controller to read data from it. + # This also starts the reader thread to continuously read out data + def _start(self): # Prevent running multiple times self.connection_error_dialog.dismiss() if self._has_connected: return + # Some UI config self.ids.status.text = "Connecting..." if self._com.connect(): print("[ COM ] Connection Acquired") + + # Prevent multiple connections self._has_connected = True self._has_run = True if self._has_run: self._prepare_reader() + # Start communication self._reader.start() print("[ COM ] Reader has started") + + # Schedule UI updates self._event = Clock.schedule_interval(self._update_screen, 0.5) else: self.ids.status.text = "Connection failed" @@ -179,15 +191,20 @@ class MainScreen(MDScreen): if self._event != None: self._event.cancel() self._reader.stop() + + # Join the thread to end it safely try: self._reader.join() except: pass + # Go back to Normal Mode on the Controller + # This is so you don't accidentally forget! try: self._com.send("NM") except: pass + self._com.close() if set_msg: self.ids.status.text = "Connection terminated" @@ -202,18 +219,24 @@ class MainScreen(MDScreen): update = synced_queue.get_nowait() except: pass + if len(update) == 0: # There are no updates to process, don't block and simply try again next time return + if len(update) == 1: + # Sync errors if update[0] == "ERR_HOOK": self.ids.status.text = "Hook failed" self.end(False) + if len(update) == 2: + # Connection successful if update[0] == "HOOK": self.ids.status.text = "Connected to controller" self.ids.port.text = "Port: " + update[1] else: + # Update the UI self.ids.sensor1.text = update[0] self.ids.sensor2.text = update[1] self.ids.sensor3.text = update[2] diff --git a/gui/program/program.py b/gui/program/program.py index 73fb772..c2fd360 100644 --- a/gui/program/program.py +++ b/gui/program/program.py @@ -20,15 +20,16 @@ class ProgramScreen(MDScreen): self._instructions = Instructions(com) self._decoder = Decoder() + # Configure Dialog self.connection_error_dialog = MDDialog( title="Connection", text="Failed to connect. Do you wish to retry?", buttons=[ MDFlatButton( text="Cancel", - on_release=lambda dt: self.connection_error_dialog.dismiss(), + on_release=lambda _: self.connection_error_dialog.dismiss(), ), - MDFlatButton(text="Retry", on_release=lambda dt: self._load()), + MDFlatButton(text="Retry", on_release=lambda _: self.load_config()), ], ) @@ -38,7 +39,7 @@ class ProgramScreen(MDScreen): buttons=[ MDFlatButton( text="Ok", - on_release=lambda dt: self.missing_fields_error_dialog.dismiss(), + on_release=lambda _: self.missing_fields_error_dialog.dismiss(), ), ], ) @@ -49,7 +50,7 @@ class ProgramScreen(MDScreen): buttons=[ MDFlatButton( text="Ok", - on_release=lambda dt: self.save_error_dialog.dismiss(), + on_release=lambda _: self.save_error_dialog.dismiss(), ), ], ) @@ -60,15 +61,16 @@ class ProgramScreen(MDScreen): buttons=[ MDFlatButton( text="Ok", - on_release=lambda dt: self.save_success_dialog.dismiss(), + on_release=lambda _: self.save_success_dialog.dismiss(), ), ], ) super().__init__(**kw) + # Load the config (async to not freeze the UI) def load_config(self): - Clock.schedule_once(lambda dt: self._load()) + Clock.schedule_once(lambda _: self._load()) # Load the current configuration from the micro-controller def _load(self): @@ -131,8 +133,11 @@ class ProgramScreen(MDScreen): return data - # Transmit the changed data to the micro-controller to reconfigure it def save(self): + Clock.schedule_once(lambda _: self._save()) + + # Transmit the changed data to the micro-controller to reconfigure it + def _save(self): self.ids.status.text = "Saving..." data = self._read_ui() if data == None: @@ -140,14 +145,14 @@ class ProgramScreen(MDScreen): else: try: self._instructions.change_config(data) - except Exception as e: + except: self.save_error_dialog.open() return self.save_success_dialog.open() self.ids.status.text = "Saved!" Clock.schedule_once(self.reset_update, 5) - def reset_update(self, dt): + def reset_update(self, _): self.ids.status.text = "" def validate_float(self, instance): diff --git a/lib/com.py b/lib/com.py index 54e2efd..f3f1d71 100644 --- a/lib/com.py +++ b/lib/com.py @@ -4,6 +4,22 @@ import serial import struct import serial.tools.list_ports +# The below class is abstract to have a consistent, targetable interface +# for both the real connection module and the simulation module +# +# If you are unaware of what classes are, you can mostly ignore the ComSuperClass +# +# For the interested, a quick rundown of what the benefits of doing it this way is: +# This class provides a way to have two wholly different implementations that have +# the same function interface (i.e. all functions take the same arguments) +# +# Another benefit of having classes is that we can pass a single instance around to +# various components and have one shared instance that all can modify, reducing some +# overhead. +# +# The actual implementation of most functions (called methods in OOP) are implemented +# in the Com class below. + class ComSuperClass(ABC): def __init__( @@ -52,6 +68,15 @@ class ComSuperClass(ABC): pass +# ┌ ┐ +# │ Main Com Class Implementation │ +# └ ┘ +# Below you can find what you were most likely looking for. This is the implementation of the communication with the microcontroller. +# You may also be interested in the decoder.py and instructions.py file, as the decoding and the hooking / syncing process are +# implemented there. It is recommended that you do NOT read the test/com.py file, as that one is only there for simulation purposes +# and is much more complicated than this here, if you are not well versed with Python or are struggling with the basics + + class Com(ComSuperClass): def _connection_check(self) -> bool: if self._serial == None: @@ -84,17 +109,30 @@ class Com(ComSuperClass): return "" def _open(self) -> bool: + """Open the connection. Internal function, not to be called directly + + Returns: + Boolean indicates if connection was successful or not + """ + # Get the com port the controller has connected to comport = self.get_comport() # Comport search returns empty string if search unsuccessful if comport == "": + # Try to generate a new Serial object with the configuration of this class + # self._baudrate contains the baud rate and defaults to 19200 try: self._serial = serial.Serial(comport, self._baudrate, timeout=5) except serial.SerialException as e: + # If an error occurs, catch it, handle it and store the error + # for the UI and return False to indicate failed connection self._err = e return False + + # Connection succeeded, return True return True else: + # Haven't found a comport return False def connect(self) -> bool: @@ -110,8 +148,13 @@ class Com(ComSuperClass): pass def receive(self, byte_count: int) -> bytes: - """Recieve bytes from microcontroller over serial. Returns bytes. Might want to decode using functions from lib.tools""" + """Receive bytes from microcontroller over serial. Returns bytes. Might want to decode using functions from lib.decoder""" + # Check connection self._connection_check() + + # Ignore this boilerplate (extra code), the body of the if is the only thing important. + # The reason for the boilerplate is that the type checker will notice that self._serial can be + # None, thus showing errors. if self._serial != None: return self._serial.read(byte_count) else: @@ -119,7 +162,12 @@ class Com(ComSuperClass): def send(self, msg: str) -> None: """Send a string over serial connection. Will open a connection if none is available""" + # Check connection self._connection_check() + + # Ignore this boilerplate (extra code), the body of the if is the only thing important. + # The reason for the boilerplate is that the type checker will notice that self._serial can be + # None, thus showing errors. if self._serial != None: self._serial.write(msg.encode()) else: @@ -127,7 +175,12 @@ class Com(ComSuperClass): def send_float(self, msg: float) -> None: """Send a float number over serial connection""" + # Check connection self._connection_check() + + # Ignore this boilerplate (extra code), the body of the if is the only thing important. + # The reason for the boilerplate is that the type checker will notice that self._serial can be + # None, thus showing errors. if self._serial != None: self._serial.write(bytearray(struct.pack(">f", msg))[0:3]) else: diff --git a/lib/config.py b/lib/config.py new file mode 100644 index 0000000..1c4b431 --- /dev/null +++ b/lib/config.py @@ -0,0 +1,144 @@ +import configparser +from typing import List + +# Load the config +config = configparser.ConfigParser() +config.read("./config.ini") + +global first_error +first_error = True + +global is_verbose +is_verbose = True + + +def set_verbosity(verbose: bool): + global is_verbose + is_verbose = verbose + + print("\n", "-" * 20, "\nValidating configuration...\n") + + +def str_to_bool(val: str) -> bool | None: + """Convert a string to boolean, converting "True" and "true" to True, same for False + + Args: + val: The value to try to convert + + Returns: + Returns either a boolean if conversion was successful, or None if not a boolean + """ + return {"True": True, "true": True, "False": False, "false": False}.get(val, None) + + +def read_config( + key_0: str, + key_1: str, + default: str, + valid_entries: List[str] = [], + type_to_validate: str = "", +) -> str: + """Read the configuration, report potential configuration issues and validate each entry + + Args: + key_0: The first key (top level) + key_1: The second key (where the actual key-value pair is) + default: The default value to return if the check fails + valid_entries: [Optiona] The entries that are valid ones to check against + type_to_validate: [Optional] Data type to validate + + Returns: + [TODO:return] + """ + # Try loading the keys + tmp = {} + try: + tmp = config[key_0] + except KeyError: + print_config_error(key_0, key_1, "", default, "unknown", index=1) + return default + + value = "" + try: + value = tmp[key_1] + except KeyError: + print_config_error(key_0, key_1, "", default, "unknown") + return default + + if len(value) == 0: + print_config_error(key_0, key_1, value, default, "not_empty") + + # Validate input + if type_to_validate != "": + # Need to validate + if type_to_validate == "int": + try: + int(value) + except ValueError: + print_config_error(key_0, key_1, value, default, "int") + return default + if type_to_validate == "float": + try: + float(value) + except ValueError: + print_config_error(key_0, key_1, value, default, "float") + return default + + if type_to_validate == "bool": + if str_to_bool(value) == None: + print_config_error(key_0, key_1, value, default, "bool") + return default + + if len(valid_entries) > 0: + # Need to validate the names + try: + valid_entries.index(value) + except ValueError: + print_config_error( + key_0, key_1, value, default, "oneof", valid_entries=valid_entries + ) + return default + + return value + + +def print_config_error( + key_0: str, + key_1: str, + value: str, + default: str, + expected: str, + valid_entries: List[str] = [], + msg: str = "", + index: int = 1, +): + """Print configuration errors to the shell + + Args: + key_0: The first key (top level) + key_1: The second key (where the actual value is to be found) + expected: The data type expected. If unknown key, set to "unknown" and set index; If should be one of, use "oneof" and set valid_entries list + msg: The message to print + index: The index in the chain (i.e. if key_0 or key_1) + """ + if not is_verbose: + return + + print(f" ==> Using default setting ({default}) for {key_0}.{key_1}") + + if expected == "unknown": + # The field was unknown + print(f' -> Unknown field "{key_0 if index == 0 else key_1}"') + elif expected == "oneof": + print( + f' -> Invalid name "{value}". Has to be one of', ", ".join(valid_entries) + ) + elif expected == "not_empty": + print(" -> Property is unexpectedly None") + elif expected == "bool": + print(f' -> Boolean property expected, but instead found "{value}".') + else: + print(f" -> Expected a config option of type {expected}.") + + if msg != "": + print(msg) diff --git a/lib/decoder.py b/lib/decoder.py index dc37975..b8eeebb 100644 --- a/lib/decoder.py +++ b/lib/decoder.py @@ -1,18 +1,24 @@ import struct + +# Decoder to decode various sent values from the microcontroller class Decoder: + # Decode an ascii character def decode_ascii(self, value: bytes) -> str: try: return value.decode() except: - return 'Error' + return "Error" + # Decode a float (6 bits) def decode_float(self, value: bytes) -> float: - return struct.unpack('>f', bytes.fromhex(str(value, 'ascii') + '00'))[0] + return struct.unpack(">f", bytes.fromhex(str(value, "ascii") + "00"))[0] + # Decode a float, but with additional offsets def decode_float_long(self, value: bytes) -> float: - return struct.unpack('>f', bytes.fromhex(str(value, 'ascii') + '0000'))[0] + return struct.unpack(">f", bytes.fromhex(str(value, "ascii") + "0000"))[0] + # Decode an int def decode_int(self, value: bytes) -> int: # return int.from_bytes(value, 'big') return int(value, base=16) diff --git a/lib/instructions.py b/lib/instructions.py index 9ae3c8c..f9f094e 100644 --- a/lib/instructions.py +++ b/lib/instructions.py @@ -2,7 +2,6 @@ from lib.com import ComSuperClass import lib.decoder import time -# TODO: Load filters (for comport search) decoder = lib.decoder.Decoder() @@ -35,12 +34,13 @@ class Instructions: # Only run for a limited amount of time while time.time() - start < 5: - # If the decoded ascii character is equal to the next expected character, move pointer right by one - # If not, jump back to start - data = decoder.decode_ascii(self._com.receive(1)); + # Receive and decode a single byte and decode as ASCII + data = decoder.decode_ascii(self._com.receive(1)) if data == sequence[pointer]: + # Increment the pointer (move to next element in the List) pointer += 1 else: + # Jump back to start pointer = 0 # If the pointer has reached the end of the sequence, return True, as now the hook was successful @@ -58,16 +58,26 @@ class Instructions: # Wait to find a CR character (enter) char = decoder.decode_ascii(self._com.receive(1)) while char != "\n": + # Check for timeout if time.time() - start > 3: return False + + # Set the next character by receiving and decoding it as ASCII char = decoder.decode_ascii(self._com.receive(1)) # Store the position in the hooking process state = 0 distance = 0 + # While we haven't timed out and have not reached the last state execute + # The last state indicates that the sync was successful while time.time() - start < 5 and state < 3: + # Receive the next char and decode it as ASCII char = decoder.decode_ascii(self._com.receive(1)) + + # The character we look for when syncing is Space (ASCII char 32 (decimal)) + # It is sent every 4 bits. If we have received 3 with the correct distance from + # the previous in a row, we are synced if char == " ": if distance == 4: state += 1 @@ -79,6 +89,7 @@ class Instructions: else: distance += 1 + # Read 5 more bits to correctly sync up self._com.receive(5) return state == 3 diff --git a/lib/test/com.py b/lib/test/com.py index eca0d32..057dc70 100644 --- a/lib/test/com.py +++ b/lib/test/com.py @@ -11,12 +11,17 @@ import struct from lib.com import ComSuperClass +# ┌ ┐ +# │ Testing Module For Com │ +# └ ┘ # This file contains a Com class that can be used to test the functionality # even without a microcontroller. It is not documented in a particularly # beginner-friendly way, nor is the code written with beginner-friendliness # in mind. It is the most complicated piece of code of the entire application -# All double __ prefixed properties and methods are not available in the actual one +# ──────────────────────────────────────────────────────────────────── + +# All double __ prefixed properties and methods are not available in the actual impl instruction_lut: dict[str, list[str]] = { "PR": ["\n", "P", "R", "\n"], @@ -162,6 +167,7 @@ class Com(ComSuperClass): self.__add_ascii_char("\n") def __fill_queue(self): + # Simulate a full cycle for _ in range(4): self.__add_integer_as_hex(self.__generate_random_int(200)) self.__simulated_data.put(bytes(" ", "ascii")) diff --git a/plot_generator/fit.py b/plot_generator/fit.py index b5234ff..810504c 100644 --- a/plot_generator/fit.py +++ b/plot_generator/fit.py @@ -6,53 +6,85 @@ n = int(input("Sensor number to be printed: ")) file = "" + def generate_plot(): - reader = csv.reader(file, delimiter=',') + # Read data using the CSV library + reader = csv.reader(file, delimiter=",") + + # Create a list from the data data = list(reader) - data.sort(key=lambda imp: float(imp[2])) - lenght = len(data) + + # Sort the list using a lambda sort descriptor + # A lambda function is an anonymous function (= an unnamed function), + # which makes it convenient. A sort descriptor is a function that + # (usually, but not here) returns a value indicating which of two values + # come before or after in the ordering. + # Here, instead we simply return a floating point value for each data point + data.sort(key=lambda data_point: float(data_point[2])) + + # Store the x and y coordinates in two arrays x = [] y = [] - for _ in range(lenght): - extract = data.pop(0) - sensor = int(extract.pop(0)) + for _ in range(len(data)): + # Extract the data point + data_point = data.pop(0) + sensor = int(data_point.pop(0)) if sensor == n: - ye = extract.pop(0) - xe = extract.pop(0) - y.append(float(ye)) - x.append(float(xe)) + y.append(float(data_point.pop(0))) + x.append(float(data_point.pop(0))) + # Use Numpy's polyfit function to fit a 2nd degree polynomial to the points using quadratic regression + # This function returns an array with the coefficients fit = np.polyfit(x, y, 2) + # The formula to output to the plot formula = f"F(U) = {round(float(fit[0]), 4)}U^2+{round(float(fit[1]), 4)}U+{round(float(fit[2]), 4)}" - fit_fn = np.poly1d(fit) + # Create a fit function from the previously determined coefficients + fit_fn = np.poly1d(fit) # Returns a function that takes a list of x-coordinate as argument + # Plot the line on the graph plt.plot(x, fit_fn(x), color="BLUE", label="T(U)") + # Scatter Plot the data points that we have plt.scatter(x, y, color="MAGENTA", marker="o", label="Data") + + # Label the graph plt.ylabel("Temperature") plt.xlabel("Voltage") - title = 'Sensor MCP9701A #{}'.format(n) - plt.title(title) + plt.title("Sensor MCP9701A #{}".format(n)) + + # Scale the axis appropriately plt.axis((0.6, 2.0, 15.0, 70.0)) + + # Print a legend and set the graph to be annotated plt.legend(loc="lower right") plt.annotate(formula, xy=(0.85, 60)) + + # Enable the background grid plt.grid(True) + + # Finally, show the graph plt.show() + # Get user input whether to save the plot or not saveit = input("Do you wish to save the plot? (y/N) ").lower() if saveit == "y": - plt.savefig("Sensor"+str(n)+".png") - plt.savefig("Sensor"+str(n)+".pdf", format="pdf") - plt.savefig("Sensor"+str(n)+".svg", format="svg") + # Save the plot as Sensor[Number] (e.g. Sensor9) as png, pdf and svg + plt.savefig("Sensor" + str(n) + ".png") + plt.savefig("Sensor" + str(n) + ".pdf", format="pdf") + plt.savefig("Sensor" + str(n) + ".svg", format="svg") print("==> Images saved") else: print("==> Images discarded") + +# Since we have defined a function above as a function, this here is executed first filename = input("Please enter a file path to the csv file to be plotted: ") + +# Try to open the file try: file = open(filename, "r") generate_plot() diff --git a/plot_generator/plot_generator.py b/plot_generator/plot_generator.py index 9072f4a..da2b61f 100644 --- a/plot_generator/plot_generator.py +++ b/plot_generator/plot_generator.py @@ -4,29 +4,36 @@ import matplotlib.pyplot as plt import csv import os +# Get user input for various data path = input("Path to csv-file to be plotted: ") +print("For the below, it is recommended to enter data in this format: yyyy-mm-dd-hh-mm") date = input("Date & time at which the measurement was taken (approx.): ") group = input("Group-name: ") saveit = input("Should the graph be saved? (y/n) ").lower() imp = open(path, "r") -reader = csv.reader(imp, delimiter=',') -rohdaten = list(reader) -lenght = len(rohdaten) +reader = csv.reader(imp, delimiter=",") +data = list(reader) x = [] y = [] -for i in range(lenght): - extract = rohdaten.pop(0) +for i in range(len(data)): + # Extract the data + extract = data.pop(0) x.append(float(extract.pop(0))) y.append(float(extract.pop(0))) +# Set up plot plt.plot(x, y, color="MAGENTA") plt.xlabel("Time") plt.ylabel("Voltage") -title = f"GC - Biogasanlage {date}" -plt.title(title) + +plt.title(f"GC - Biogasanlage {date}") plt.grid(True) -if saveit == "y": + +# Check if user wants to save the image +if saveit == "n": + print("didn't save images") +else: pos = 0 for letter in path[::-1]: if letter == "/": @@ -40,11 +47,7 @@ if saveit == "y": os.mkdir(save_path) except FileExistsError: pass - plt.savefig(save_path) - os.rename(f"{save_path}/.png", f"{save_path}/GC-{date}-{group}.png") - print(f"saved images to {save_path}") -else: - print("didn't save images") + plt.savefig(f"{save_path}/GC-{date}-{group}.png") + + print(f"Saved images to {save_path}") plt.show() - -