print(""" ===================== BIOGASCONTROLLERAPP ---------- Version 2.2 Copyright 2022 J.Hutz""") import time import threading import platform import os import webbrowser os.environ["KIVY_NO_CONSOLELOG"] = "1" from kivy.uix.screenmanager import Screen, ScreenManager from kivy.uix.popup import Popup from kivy.app import App from kivy.lang import Builder from kivy.clock import mainthread import bin.lib.lib import bin.lib.communication import bin.lib.comport_search import bin.lib.csv_parsers cvr = bin.lib.csv_parsers.CsvRead() cvw = bin.lib.csv_parsers.CsvWrite() com = bin.lib.lib.Com() ################################################################## # Popups ################################################################## class QuitPU(Popup): def quitapp(self): com.quitcom() class NoConnection(Popup): def details(self): self.detailsinfo = DetailInfo() self.detailsinfo.open() class DetailInfo(Popup): update_details = "" def infos(self): self.err = "" try: com.connect(19200, "") com.quitcom() except Exception as err: self.err += "Errormessage:\n" self.err += str(err) self.err += "\n-------------------------------------------------------------------------------------------------------------------------------------------------------------\n" return str(self.err) def error_tips(self): self.err_tip = "" try: com.connect(19200, "") com.quitcom() except Exception as err: self.err_tip += "Possible way to resolve the issue: \n\n" if str(err)[0:10] == "[Errno 13]": if platform.system() == "Linux": self.err_tip += f"Open a terminal and type in: sudo chmod 777 {bin.lib.comport_search.ComportService().get_comport()}" elif platform.system() == "Macintosh": self.err_tip += "Give permission to access the cable" elif platform.system() == "Windows": self.err_tip += "Try a different cable or install another driver" else: self.err_tip += "Unknown OS" elif str(err)[0:10] == "[Errno 2] ": if platform.system() == "Linux": self.err_tip += "Connect a cable, open a terminal and type in: sudo chmod 777 /dev/ttyUSB0" elif platform.system() == "Macintosh": self.err_tip += "Give permission to access the cable" elif platform.system() == "Windows": self.err_tip += "Try a different cable or install another driver" else: self.err_tip += "Unknown OS" elif str(err)[0:34] == "could not open port '/dev/ttyUSB0'": self.err_tip += "Please connect the PC with the microcontroller!" elif str(err)[0:26] == f"could not open port '{bin.lib.comport_search.ComportService().get_comport()}'": self.err_tip += "Try using a different cable or close all monitoring software (like MSI Afterburner)" else: self.err_tip += "Special Error, consult the manual of Serial" return str(self.err_tip) class Modeswitch(Popup): pass class Connecting_PU(Popup): pass class Disconnecting_PU(Popup): pass class MissingFieldsError(Popup): pass class ConnectionFail(Popup): pass class SaveConf(Popup): pass #################################################################### # SCREENS #################################################################### class HomeScreen(Screen): connected = 1 try: com.connect(19200, "") com.quitcom() except: connected = 0 def tryconnection(self): try: com.connect(19200, "") com.quitcom() self.connected = 1 self.manager.current = "Readout" self.manager.transition.direction = "right" except: self.connected = 0 self.open_popup() def open_popup(self): self.popups = NoConnection() self.popups.open() def exitapp(self): self.pup = QuitPU() self.pup.open() class ReadoutScreen(Screen): go = 1 def start_com(self): self.comstart(1) def comstart(self, pu_on): try: com.connect(19200, "") self.go = 1 except: self.go = 0 if self.go == 1: self.parent.current = "Readout" if pu_on == 1: self.openstartpu() else: pass self.communication = threading.Thread(name="communication", target=self.start_coms) self.communication.start() else: self.openconnectionfailpu() def end_com(self): self.stopcom(1) def stopcom(self, pu_on): self.go = 0 try: self.communication.join() except: pass if pu_on == 1: self.openendpu() else: pass def start_coms(self): self.check = 1 self.__level = 0 self.__distance = 0 self.__x = "" self.__begin = time.time() self.go = 1 while self.__x != "\n": if time.time() - self.__begin > 5: self.go = 0 break else: self.__x = com.decode_ascii(com.receive(1)) if self.go == 1: while self.__level < 3: self.__x = com.decode_ascii(com.receive(1)) if self.__x == " ": if self.__distance == 4: self.__level += 1 else: pass self.__distance = 0 else: if self.__distance > 4: self.__level = 0 self.__distance = 0 else: self.__distance += 1 self.check = 1 com.receive(5) else: self.go = 0 self.check = 0 while self.go == 1: self.__starttime = time.time() self.__output = "" self.__data_recieve = com.receive(68) self.__output += "Tadc: " self.__output += str(com.decode_int(self.__data_recieve[0:4])) self.__output += "\nTemperatur: " self.__output += com.decode_float(self.__data_recieve[5:11]) self.__output += f"\nDuty-Cycle: {(float(com.decode_float_2(self.__data_recieve[48:52])) / 65535) * 100}%" self.change_screen(1, self.__output) self.__output = "Tadc: " self.__output += str(com.decode_int(self.__data_recieve[12:16])) self.__output += "\nTemperatur: " self.__output += com.decode_float(self.__data_recieve[17:23]) self.__output += f"\nDuty-Cycle: {(float(com.decode_float_2(self.__data_recieve[53:57])) / 65535) * 100}%" self.change_screen(2, self.__output) self.__output = "Tadc: " self.__output += str(com.decode_int(self.__data_recieve[24:28])) self.__output += "\nTemperatur: " self.__output += com.decode_float(self.__data_recieve[29:35]) self.__output += f"\nDuty-Cycle: {(float(com.decode_float_2(self.__data_recieve[58:62])) / 65535) * 100}%" self.change_screen(3, self.__output) self.__output = "Tadc: " self.__output += str(com.decode_int(self.__data_recieve[36:40])) self.__output += "\nTemperatur: " self.__output += com.decode_float(self.__data_recieve[41:47]) self.__output += "\nDuty-Cycle: " self.__output += f"\nDuty-Cycle: {(float(com.decode_float_2(self.__data_recieve[63:67])) / 65535) * 100}%" self.change_screen(4, self.__output) self.change_screen(5, f"F={1 / (time.time() - self.__starttime)}") self.change_screen(6, "") com.quitcom() def switch_mode(self, text): self.go = 0 try: self.communication.join() com.quitcom() except: pass if text == "Normal Mode": bin.lib.communication.SwitchMode().disable_fastmode() else: bin.lib.communication.SwitchMode().enable_fastmode() self.openpupups() self.comstart(0) @mainthread def change_screen(self, pos, value): if pos == 1: self.ids.sonde1.text = value elif pos == 2: self.ids.sonde2.text = value elif pos == 3: self.ids.sonde3.text = value elif pos == 4: self.ids.sonde4.text = value elif pos == 6: self.openconnectionfailpu() else: self.ids.frequency.text = value def openpupups(self): self.popup = Modeswitch() self.popup.open() def openendpu(self): self.pu = Disconnecting_PU() self.pu.open() def openstartpu(self): self.pup = Connecting_PU() self.pup.open() def openconnectionfailpu(self): if self.check == 0: self.cfpu = ConnectionFail() self.cfpu.open() else: pass def leave_screen(self): self.stopcom(0) def resscreen(self): self.ids.sonde1.text = "" self.ids.sonde2.text = "" self.ids.sonde3.text = "" self.ids.sonde4.text = "" self.ids.frequency.text = "" class Program(Screen): def read_config(self): self.config_imp = [] self.__export = [] self.config_imp = cvr.importing("./config/config.csv") self.__export = self.config_imp.pop(0) self.__extracted = self.__export.pop(0) if self.__extracted == "1": self.ids.prsel.state = "normal" self.ids.s1_a.text = "" self.ids.s1_b.text = "" self.ids.s1_c.text = "" self.ids.s1_t.text = "" self.ids.s2_a.text = "" self.ids.s2_b.text = "" self.ids.s2_c.text = "" self.ids.s2_t.text = "" self.ids.s3_a.text = "" self.ids.s3_b.text = "" self.ids.s3_c.text = "" self.ids.s3_t.text = "" self.ids.s4_a.text = "" self.ids.s4_b.text = "" self.ids.s4_c.text = "" self.ids.s4_t.text = "" self.__mode = 1 else: self.ids.prsel.state = "down" self.read_data() self.__mode = 2 def change_mode(self): if self.__mode == "1": self.read_data() self.__mode = 2 else: self.ids.s1_a.text = "" self.ids.s1_b.text = "" self.ids.s1_c.text = "" self.ids.s1_t.text = "" self.ids.s2_a.text = "" self.ids.s2_b.text = "" self.ids.s2_c.text = "" self.ids.s2_t.text = "" self.ids.s3_a.text = "" self.ids.s3_b.text = "" self.ids.s3_c.text = "" self.ids.s3_t.text = "" self.ids.s4_a.text = "" self.ids.s4_b.text = "" self.ids.s4_c.text = "" self.ids.s4_t.text = "" self.__mode = 1 def read_data(self): try: com.connect(19200, "") self.go = 1 except: self.go = 0 if self.go == 1: com.send("RD") self.__pos = 1 self.__beginning = time.time() self.go = 1 while True: if time.time() - self.__beginning < 5: self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "\n": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "R": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "D": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "\n": self.go = 1 break else: pass else: pass else: pass else: pass else: self.go = 0 break if self.go == 1: for i in range(4): self.__x = com.receive(28) self.__a = str(com.decode_float(self.__x[0:6])) self.__b += str(com.decode_float(self.__x[7:13])) self.__c += str(com.decode_float(self.__x[14:20])) self.__temp += str(com.decode_float(self.__x[21:27])) if self.__pos == 1: self.ids.s1_a.text = self.__a self.ids.s1_b.text = self.__b self.ids.s1_c.text = self.__c self.ids.s1_t.text = self.__temp elif self.__pos == 2: self.ids.s2_a.text = self.__a self.ids.s2_b.text = self.__b self.ids.s2_c.text = self.__c self.ids.s2_t.text = self.__temp elif self.__pos == 3: self.ids.s3_a.text = self.__a self.ids.s3_b.text = self.__b self.ids.s3_c.text = self.__c self.ids.s3_t.text = self.__temp elif self.__pos == 4: self.ids.s4_a.text = self.__a self.ids.s4_b.text = self.__b self.ids.s4_c.text = self.__c self.ids.s4_t.text = self.__temp self.__pos += 1 else: self.open_confail_pu() com.quitcom() else: self.open_confail_pu() def create_com(self): self.coms = bin.lib.communication.Communication() def send_data(self): try: self.create_com() self.go = 1 except: self.go = 0 if self.go == 1: self.__transmit = [] if self.ids.s1_a.text != "" and self.ids.s1_b.text != "" and self.ids.s1_c.text != "" and self.ids.s1_t.text != "" and self.ids.s2_a.text != "" and self.ids.s2_b.text != "" and self.ids.s2_c.text != "" and self.ids.s2_t.text != "" and self.ids.s3_a.text != "" and self.ids.s3_b.text != "" and self.ids.s3_c.text != "" and self.ids.s3_t.text != "" and self.ids.s4_a.text != "" and self.ids.s4_b.text != "" and self.ids.s4_c.text != "" and self.ids.s4_t.text != "": self.__transmit.append(self.ids.s1_a.text) self.__transmit.append(self.ids.s1_b.text) self.__transmit.append(self.ids.s1_c.text) self.__transmit.append(self.ids.s1_t.text) self.__transmit.append(self.ids.s2_a.text) self.__transmit.append(self.ids.s2_b.text) self.__transmit.append(self.ids.s2_c.text) self.__transmit.append(self.ids.s2_t.text) self.__transmit.append(self.ids.s3_a.text) self.__transmit.append(self.ids.s3_b.text) self.__transmit.append(self.ids.s3_c.text) self.__transmit.append(self.ids.s3_t.text) self.__transmit.append(self.ids.s4_a.text) self.__transmit.append(self.ids.s4_b.text) self.__transmit.append(self.ids.s4_c.text) self.__transmit.append(self.ids.s4_t.text) try: self.coms.change_all(self.__transmit, "") self.ids.s1_a.text = "" self.ids.s1_b.text = "" self.ids.s1_c.text = "" self.ids.s1_t.text = "" self.ids.s2_a.text = "" self.ids.s2_b.text = "" self.ids.s2_c.text = "" self.ids.s2_t.text = "" self.ids.s3_a.text = "" self.ids.s3_b.text = "" self.ids.s3_c.text = "" self.ids.s3_t.text = "" self.ids.s4_a.text = "" self.ids.s4_b.text = "" self.ids.s4_c.text = "" self.ids.s4_t.text = "" self.openconfpu() except: self.open_confail_pu() else: self.openerrorpu() else: self.open_confail_pu() def openerrorpu(self): self.pu = MissingFieldsError() self.pu.open() def open_confail_pu(self): self.cfpu = ConnectionFail() self.cfpu.open() def openconfpu(self): self.confpus = SaveConf() self.confpus.open() class ProgramTemp(Screen): def read_config(self): self.config_imp = [] self.__export = [] self.config_imp = cvr.importing("./config/config.csv") self.__export = self.config_imp.pop(0) self.__extracted = self.__export.pop(0) if self.__extracted == "1": self.ids.prsel.state = "normal" self.ids.temp_s1.text = "" self.ids.temp_s2.text = "" self.ids.temp_s3.text = "" self.ids.temp_s4.text = "" self.__mode = 1 else: self.ids.prsel.state = "down" self.read_data() self.__mode = 2 def change_mode(self): if self.__mode == "1": self.read_data() self.__mode = 2 else: self.ids.temp_s1.text = "" self.ids.temp_s2.text = "" self.ids.temp_s3.text = "" self.ids.temp_s4.text = "" self.__mode = 1 def read_data(self): try: com.connect(19200, "") self.go = 1 except: self.go = 0 if self.go == 1: com.send("RD") self.__pos = 1 self.__beginning = time.time() self.go = 1 while True: if time.time() - self.__beginning < 5: self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "\n": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "R": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "D": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "\n": self.go = 1 break else: pass else: pass else: pass else: pass else: self.go = 0 break if self.go == 1: for i in range(4): self.__x = com.receive(28) self.__output = str(com.decode_float(self.__x[21:27])) if self.__pos == 1: self.ids.temp_s1.text = self.__output elif self.__pos == 2: self.ids.temp_s2.text = self.__output elif self.__pos == 3: self.ids.temp_s3.text = self.__output elif self.__pos == 4: self.ids.temp_s4.text = self.__output self.__pos += 1 else: self.open_confail_pu() com.quitcom() else: self.open_confail_pu() def create_com(self): self.coms = bin.lib.communication.Communication() def send_data(self): try: self.create_com() self.go = 1 except: self.go = 0 if self.go == 1: self.__transmit = [] if self.ids.temp_s1.text != "" and self.ids.temp_s2.text != "" and self.ids.temp_s3.text != "" and self.ids.temp_s4.text != "": self.__transmit.append(self.ids.temp_s1.text) self.__transmit.append(self.ids.temp_s2.text) self.__transmit.append(self.ids.temp_s3.text) self.__transmit.append(self.ids.temp_s4.text) self.coms.change_temp(self.__transmit, "") self.ids.temp_s1.text = "" self.ids.temp_s2.text = "" self.ids.temp_s3.text = "" self.ids.temp_s4.text = "" self.openconfpu() else: self.openerrorpu() else: self.open_confail_pu() def openerrorpu(self): self.pu = MissingFieldsError() self.pu.open() def openconfpu(self): self.confpu = SaveConf() self.confpu.open() def open_confail_pu(self): self.cfpu = ConnectionFail() self.cfpu.open() class ReadData(Screen): def read_data(self): try: com.connect(19200, "") self.go = 1 except: self.go = 0 if self.go == 1: com.send("RD") self.__pos = 1 self.__beginning = time.time() self.go = 1 while True: if time.time() - self.__beginning < 5: self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "\n": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "R": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "D": self.__data_recieve = com.decode_ascii(com.receive(1)) if self.__data_recieve == "\n": self.go = 1 break else: pass else: pass else: pass else: pass else: self.go = 0 break if self.go == 1: for i in range(4): self.__x = com.receive(28) self.__output = "a: " self.__output += str(com.decode_float(self.__x[0:6])) self.__output += f"\nb: {str(com.decode_float(self.__x[7:13]))}" self.__output += f"\nc: {str(com.decode_float(self.__x[14:20]))}" self.__output += f"\nTemp: {str(com.decode_float(self.__x[21:27]))}" if self.__pos == 1: self.ids.inf_sonde1.text = self.__output elif self.__pos == 2: self.ids.inf_sonde2.text = self.__output elif self.__pos == 3: self.ids.inf_sonde3.text = self.__output elif self.__pos == 4: self.ids.inf_sonde4.text = self.__output self.__pos += 1 else: self.open_confail_pu() com.quitcom() else: self.open_confail_pu() def open_confail_pu(self): self.cfpu = ConnectionFail() self.cfpu.open() class Credits(Screen): pass class Modify(Screen): def read_config(self): self.config_imp = [] self.__export = [] self.config_imp = cvr.importing("./config/config.csv") self.__export = self.config_imp.pop(0) self.__extracted = self.__export.pop(0) if self.__extracted == "1": self.ids.prsel.state = "normal" else: self.ids.prsel.state = "down" def issue_reporting(self): webbrowser.open("https://github.com/simplePCBuilding/BiogasControllerApp/issues", new=2) def change_programming(self): self.csv_import = [] self.csv_import = cvr.importing("./config/config.csv") self.csv_import.pop(0) if self.ids.prsel.text == "Easy\nreprogramming": self.csv_import.insert(0, 1) else: self.csv_import.insert(0, 2) cvw.write_str("./config/config.csv", self.csv_import) ######################################################## # Screenmanager ######################################################## class RootScreen(ScreenManager): pass kv = Builder.load_file("./bin/gui/gui.kv") class BiogasControllerApp(App): def build(self): self.icon = "./BiogasControllerAppLogo.png" return kv if __name__ == "__main__": BiogasControllerApp().run()