49 Commits

Author SHA1 Message Date
Janis Hutz
1c7b758a11 Delete .github/ISSUE_TEMPLATE/custom.md 2025-06-16 15:01:00 +00:00
Janis Hutz
44822e1cc4 Update LICENSE
Remove placeholder
2025-06-16 14:59:41 +00:00
4588caf974 Update README 2025-06-16 16:58:45 +02:00
223ab40bf8 Update README 2025-06-16 16:53:34 +02:00
7905cb851a Config, Lots of docs, Format
Added a config validator and documented code that was previously
undocumented, for the plot_generator scripts, documented them.
2025-06-16 16:36:18 +02:00
3a6cd6af3d Redesign app, prepare for 3.1.0 release 2025-06-16 12:21:45 +02:00
d6a5e90b3c Design temporary save 2025-06-15 19:54:19 +02:00
2b8f3c8aad Start reworking design 2025-06-15 15:33:11 +02:00
d875119071 Start design rework 2025-06-15 12:12:12 +02:00
b01232b552 Get test library running 2025-06-15 12:01:59 +02:00
b00466c5dd Fix script errors 2025-06-11 08:35:17 +02:00
38a7bec5fd Also add dist/ dir to ignore 2025-06-10 17:54:00 +02:00
5705095d94 Add build dir to ignore 2025-06-10 17:49:10 +02:00
Janis Hutz
6d845cb328 Merge pull request #8 from janishutz/dev
Update to version 3.0.0
2025-06-10 15:44:19 +00:00
d02d6edb43 Reorder readme 2025-06-10 17:42:24 +02:00
a8340f4931 Update README.md 2025-06-10 17:41:39 +02:00
0d54a8e7a3 Update README.md 2025-06-10 17:38:12 +02:00
8da74a2853 Update SECURITY.md 2025-06-10 17:37:31 +02:00
822380a658 Complete restructure 2025-06-10 17:35:35 +02:00
e423add6a0 Restructure for better usability 2025-06-10 17:15:39 +02:00
af4b697e01 Remove build utilities, add install script 2025-06-10 17:15:05 +02:00
bf244c7dab Add tariffs as optional feature 2025-05-26 17:48:25 +02:00
f560c24574 fit.py: Update to improve syntax 2025-05-26 17:16:31 +02:00
Janis Hutz
096003ffa9 Fix inconsistent syntax in com.py 2025-05-23 09:11:42 +00:00
36471564cc [Build] Add comment 2025-05-22 11:23:03 +02:00
66bbb7a1a2 Fix incorrect typing 2025-05-20 17:41:20 +02:00
d4e64a3cec [Build] Add build ressources 2025-05-20 11:23:06 +02:00
a77cc41b60 Testing: Start rewrite 2025-05-15 17:27:05 +02:00
987b0016c3 Start design update 2025-05-15 17:26:51 +02:00
001d4f2bdb Fix up hooking process for main 2025-05-13 15:51:40 +02:00
1fe48f2494 Testing: More features, but probably not going to finish 2025-05-13 15:51:24 +02:00
986d887587 UI Tweaks 2025-05-12 16:28:28 +02:00
b694d9d086 More test features 2025-05-12 16:28:23 +02:00
f4fe3dd34c Improve error handling of com 2025-05-12 16:28:09 +02:00
cfa0afd622 Add launch output 2025-05-12 16:27:56 +02:00
0729fed5c2 Improve README, add requirements.txt 2025-05-12 16:27:48 +02:00
a8ad40148f Improve Com class, continue writing test 2025-05-09 11:03:49 +02:00
e71f9e6d02 Main, Program Screen basically done, UI Tweaks, backend fixes, start writing testing library 2025-05-08 18:12:26 +02:00
92836fe427 Fix some naming 2025-04-14 16:25:06 +02:00
e0a54ac2bd Popups done, Readout Screen prepared, Small fixes 2025-04-09 17:15:05 +02:00
36a3079040 Some fixes for update 2025-03-11 15:12:36 +01:00
92fcc4a6e7 App launching, some porting work complete 2025-03-05 14:07:58 +01:00
ffd75d94dc Begin rewrite 2025-03-05 11:20:01 +01:00
d26e91db31 Add gitignore 2025-03-05 10:12:02 +01:00
7f11d4b3af Prepare for full rewrite 2025-03-05 10:11:25 +01:00
Janis Hutz
833938e1d6 Update README.md 2024-06-05 12:02:35 +00:00
Janis Hutz
379a3876f9 Update README.md 2023-05-31 08:51:48 +02:00
Janis Hutz
6afada30be Redo README
I have to a major extent rewritten the readme files to reflect recent design changes I made to my README files
2023-05-31 08:49:41 +02:00
simplePCBuilding
e69c058345 Delete BiogasControllerApp-V2.1-Installer-V2.exe
why even was that in here in the first place... probably stupid earlier me uploaded this by accident
2022-10-02 19:55:39 +00:00
56 changed files with 2268 additions and 2176 deletions

2
.gitattributes vendored
View File

@@ -1,2 +0,0 @@
# Auto detect text files and perform LF normalization
* text=auto

View File

@@ -1,10 +0,0 @@
---
name: Custom issue template
about: Describe this issue template's purpose here.
title: ''
labels: ''
assignees: ''
---

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
__pycache__
build/
dist/

3
.idea/.gitignore generated vendored
View File

@@ -1,3 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@@ -1,12 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

View File

@@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

4
.idea/misc.xml generated
View File

@@ -1,4 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10" project-jdk-type="Python SDK" />
</project>

8
.idea/modules.xml generated
View File

@@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/BiogasControllerApp.iml" filepath="$PROJECT_DIR$/.idea/BiogasControllerApp.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated
View File

@@ -1,6 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

View File

@@ -1,3 +0,0 @@
# Default ignored files
/shelf/
/workspace.xml

View File

@@ -1 +0,0 @@
biogascontrollerapp.py

View File

@@ -1,10 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.8" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

View File

@@ -1,6 +0,0 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

View File

@@ -1,4 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.8" project-jdk-type="Python SDK" />
</project>

View File

@@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/ENATECH.iml" filepath="$PROJECT_DIR$/.idea/ENATECH.iml" />
</modules>
</component>
</project>

Binary file not shown.

Before

Width:  |  Height:  |  Size: 118 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

View File

@@ -1,694 +0,0 @@
RootScreen:
HomeScreen:
ReadoutScreen:
ReadData:
ProgramTemp:
Program:
Credits:
Modify:
<InfoPU>:
title: "NOTICE"
size_hint: 0.7, 0.5
auto_dismiss: True
GridLayout:
cols:1
Label:
text: "THIS SOFTWARE IS FREE SOFTWARE LICENSED UNDER THE GNU GENERAL PUBLIC LICENSE V3 (GPL V3) AND AS SUCH COMES WITH ABSOLUTELY NO WARRANTY! \n\nmore info under Settings > Credits"
text_size: self.width, None
GridLayout:
cols: 2
Button:
text: "Don't show anymore"
on_release:
root.notshowanymore()
Button:
text: "ok"
on_release:
root.dismiss()
<QuitPU>:
title: "BiogasControllerApp"
font_size: 50
size_hint: 0.5, 0.4
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "Are you sure you want to leave?"
font_size: 20
GridLayout:
cols:2
Button:
text: "Yes"
font_size: 15
on_release:
root.quitapp()
app.stop()
Button:
text: "No"
font_size: 15
on_press:
root.dismiss()
<NoConnection>:
title: "WARNING!"
font_size: 50
size_hint: 0.5, 0.4
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "Unable to open Serial Port"
font_size: 20
GridLayout:
cols:2
Button:
text: "Details"
on_release:
root.details()
Button:
text:"Ok"
on_release:
root.dismiss()
<ConnectionFail>:
title: "WARNING!"
font_size: 50
size_hint: 0.7, 0.6
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "Unable to communicate"
font_size: 20
Label:
text: "Possible ways to resolve this problem:\n- Try again\n- Restart the PIC16F877 or reset the program\n- Check the cable / connect one"
font_size: 14
Button:
text:"Ok"
on_release:
root.dismiss()
<DetailInfo>:
on_open: self.update_details = root.infos()
title: "DETAILS"
font_size: 50
size_hint: 1, 0.7
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "Unable to open Serial Port"
font_size: 20
Label:
id: errormessage
text: root.infos()
font_size: 13
Label:
text: root.error_tips()
font_size: 13
Button:
text:"Ok"
on_release:
root.dismiss()
<Modeswitch>:
title: "NOTICE!"
font_size: 50
size_hint: 0.5, 0.4
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "Mode Switched!"
font_size: 30
Button:
text:"Ok"
on_release:
root.dismiss()
<SaveConf>:
title: "NOTICE!"
font_size: 50
size_hint: 0.5, 0.4
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "SAVED!"
font_size: 30
Button:
text:"Ok"
on_release:
root.dismiss()
<Connecting_PU>:
title: "NOTICE!"
font_size: 50
size_hint: 0.5, 0.4
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "Establishing connection with PIC16F877"
font_size: 18
Label:
text: "This Process may take a while..."
font_size: 15
Button:
text:"Ok"
on_release:
root.dismiss()
<MissingFieldsError>:
title: "WARNING!"
font_size: 50
size_hint: 0.5, 0.4
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "Missing Information!"
font_size: 18
Label:
text: "Check your entry"
font_size: 15
Button:
text:"Ok"
on_release:
root.dismiss()
<Disconnecting_PU>:
title: "NOTICE!"
font_size: 50
size_hint: 0.5, 0.4
auto_dismiss: False
GridLayout:
cols:1
Label:
text: "Connection with PIC16F877 terminated"
font_size: 18
Label:
text: "The connection to the Microcontroller\nhas been terminated successfully"
font_size: 15
Button:
text:"Ok"
on_release:
root.dismiss()
######################################
# SCREENS
######################################
<HomeScreen>:
name: "HomeS"
canvas.before:
Color:
rgba: (50,50,50,0.2)
Rectangle:
size: self.size
pos: self.pos
GridLayout:
cols:1
Label:
text: "BiogasanlageControllerApp"
font_size: 50
color: (0, 113, 0, 1)
bold:True
italic:True
FloatLayout:
GridLayout:
cols: 2
size_hint: 0.8, 0.8
pos_hint: {"x": 0.1, "y": 0.1}
Button:
text: "Start"
background_color: (255, 0, 0, 0.6)
font_size: 30
on_release:
root.tryconnection()
Button:
text: "Quit"
background_color: (255, 0, 0, 0.6)
font_size: 30
on_release:
root.exitapp()
Label:
text: root.reset()
id: app_version
font_size: 13
pos_hint: {"y": -0.45, "x":0.05}
Button:
text: "Settings"
font_size: 13
size_hint: 0.07, 0.06
pos_hint: {"x":0.01, "y":0.01}
background_color: (50, 0, 0, 0.2)
on_release:
app.root.current = "Settings"
root.manager.transition.direction = "down"
<ReadoutScreen>:
on_pre_enter: self.reset_screen = root.resscreen()
name: "Readout"
canvas.before:
Color:
rgba: (50,50,50,0.2)
Rectangle:
size: self.size
pos: self.pos
GridLayout:
FloatLayout:
Label:
pos_hint: {"y":0.4}
text: "READOUT"
font_size: 40
color: (0, 113, 0, 1)
bold: True
GridLayout:
cols:4
size_hint: 0.8, 0.3
pos_hint: {"x":0.1, "y":0.4}
Label:
text: "SENSOR 1: "
font_size: 20
Label:
id: sonde1
text: ""
Label:
text: "SENSOR 2: "
font_size: 20
Label:
id: sonde2
text: ""
Label:
text: "SENSOR 3: "
font_size: 20
Label:
id: sonde3
text: ""
Label:
text: "SENSOR 4: "
font_size: 20
Label:
id: sonde4
text: ""
Button:
text: "Start communication"
size_hint: 0.2, 0.1
pos_hint: {"x": 0.5, "y": 0.05}
background_color: (255, 0, 0, 0.6)
on_release:
root.start_com()
Button:
text: "End communication"
size_hint: 0.2, 0.1
pos_hint: {"x": 0.7, "y": 0.05}
background_color: (255, 0, 0, 0.6)
on_release:
root.end_com()
Button:
text: "Back"
size_hint: 0.3, 0.1
pos_hint: {"x":0.05, "y":0.05}
background_color: (255, 0, 0, 0.6)
on_release:
root.leave_screen()
app.root.current = "HomeS"
root.manager.transition.direction = "left"
ToggleButton:
id: mode_sel
size_hint: 0.15, 0.1
pos_hint: {"x":0.1, "y":0.2}
text: "Normal Mode" if self.state == "normal" else "Fast Mode"
on_text: root.switch_mode(mode_sel.text)
background_color: (255,0,0,0.6) if self.state == "normal" else (0,0,255,0.6)
Button:
text: "Read Data"
size_hint: 0.15, 0.1
pos_hint: {"x":0.3, "y":0.2}
background_color: (255, 0, 0, 0.6)
on_release:
root.leave_screen()
app.root.current = "RD"
root.manager.transition.direction = "down"
Button:
text: "Temperature"
size_hint: 0.15, 0.1
pos_hint: {"x":0.5, "y":0.2}
background_color: (255, 0, 0, 0.6)
on_release:
root.leave_screen()
app.root.current = "PT"
root.manager.transition.direction = "down"
Button:
text: "Change all Data"
size_hint: 0.15, 0.1
pos_hint: {"x":0.7, "y":0.2}
background_color: (255, 0, 0, 0.6)
on_release:
root.leave_screen()
app.root.current = "PR"
root.manager.transition.direction = "down"
Label:
id: frequency
text: "Frequency will appear here"
font_size: 10
pos_hint: {"x":0.4, "y": 0.3}
<ReadData>:
name: "RD"
canvas.before:
Color:
rgba: (50,50,50,0.2)
Rectangle:
size: self.size
pos: self.pos
GridLayout:
FloatLayout:
Label:
text: "Read Data"
font_size: 40
color: (0, 113, 0, 1)
bold: True
pos_hint: {"y":0.4}
Button:
text: "Start Readout"
size_hint: 0.2, 0.1
pos_hint: {"x":0.4, "y":0.1}
on_release:
root.read_data()
Button:
text: "Back"
size_hint: 0.2, 0.1
pos_hint: {"x":0.1, "y":0.1}
background_color: (255, 0, 0, 0.6)
on_release:
app.root.current = "Readout"
root.manager.transition.direction = "up"
GridLayout:
cols:4
size_hint: 0.8, 0.4
pos_hint: {"x":0.1, "y":0.3}
Label:
text: "Sonde 1"
font_size: 20
Label:
id: inf_sonde1
text: ""
Label:
text: "Sonde 2"
font_size: 20
Label:
id: inf_sonde2
text: ""
Label:
text: "Sonde 3"
font_size: 20
Label:
id: inf_sonde3
text: ""
Label:
text: "Sonde 4"
font_size: 20
Label:
id: inf_sonde4
text: ""
<ProgramTemp>:
on_pre_enter: self.check_config = root.read_config()
name: "PT"
canvas.before:
Color:
rgba: (50,50,50,0.2)
Rectangle:
size: self.size
pos: self.pos
FloatLayout:
Label:
text: "Change Temperature"
pos_hint: {"y":0.4}
font_size: 40
color: (0, 113, 0, 1)
bold: True
GridLayout:
size_hint: 0.8, 0.4
pos_hint: {"x": 0.1, "y":0.3}
cols:2
Label:
text: "Temperature Sensor 1: "
TextInput:
id: temp_s1
multiline: False
input_filter: "float"
Label:
text: "Temperature Sensor 2: "
TextInput:
id: temp_s2
multiline: False
input_filter: "float"
Label:
text: "Temperature Sensor 3: "
TextInput:
id: temp_s3
multiline: False
input_filter: "float"
Label:
text: "Temperature Sensor 4: "
TextInput:
id: temp_s4
multiline: False
input_filter: "float"
Button:
text: "Back"
size_hint: 0.1, 0.1
pos_hint: {"x":0.1, "y":0.1}
background_color: (255, 0, 0, 0.6)
on_release:
app.root.current = "Readout"
root.manager.transition.direction = "up"
ToggleButton:
id: prsel
size_hint: 0.2, 0.1
pos_hint: {"x":0.35, "y": 0.1}
text: "Full\nreprogramming" if self.state == "normal" else "Partial\nreprogramming"
on_release: root.change_mode()
background_color: (255,0,0,0.6) if self.state == "normal" else (0,0,255,0.6)
Button:
text: "Save"
size_hint: 0.2, 0.1
pos_hint: {"x":0.6, "y":0.1}
background_color: (255, 0, 0, 0.6)
on_release:
root.send_data()
<Program>:
name: "PR"
on_pre_enter: self.check_config = root.read_config()
canvas.before:
Color:
rgba: (50,50,50,0.2)
Rectangle:
size: self.size
pos: self.pos
FloatLayout:
Label:
text: "Change all Data"
font_size: 40
color: (0, 113, 0, 1)
bold: True
pos_hint: {"y":0.4}
GridLayout:
size_hint: 0.8, 0.5
pos_hint: {"x":0.1, "y":0.2}
cols: 4
Label:
text: "Sensor 1, a:"
TextInput:
id: s1_a
multiline: False
input_filter: "float"
Label:
text: "Sensor 1, b:"
TextInput:
id: s1_b
multiline: False
input_filter: "float"
Label:
text: "Sensor 1, c:"
TextInput:
id: s1_c
multiline: False
input_filter: "float"
Label:
text: "Sensor 1, Temp:"
TextInput:
id: s1_t
multiline: False
input_filter: "float"
Label:
text: "Sensor 2, a:"
TextInput:
id: s2_a
multiline: False
input_filter: "float"
Label:
text: "Sensor 2, b:"
TextInput:
id: s2_b
multiline: False
input_filter: "float"
Label:
text: "Sensor 2, c:"
TextInput:
id: s2_c
multiline: False
input_filter: "float"
Label:
text: "Sensor 2, Temp:"
TextInput:
id: s2_t
multiline: False
input_filter: "float"
Label:
text: "Sensor 3, a:"
TextInput:
id: s3_a
multiline: False
input_filter: "float"
Label:
text: "Sensor 3, b:"
TextInput:
id: s3_b
multiline: False
input_filter: "float"
Label:
text: "Sensor 3, c:"
TextInput:
id: s3_c
multiline: False
input_filter: "float"
Label:
text: "Sensor 3, Temp:"
TextInput:
id: s3_t
multiline: False
input_filter: "float"
Label:
text: "Sensor 4, a:"
TextInput:
id: s4_a
multiline: False
input_filter: "float"
Label:
text: "Sensor 4, b:"
TextInput:
id: s4_b
multiline: False
input_filter: "float"
Label:
text: "Sensor 4, c:"
TextInput:
id: s4_c
multiline: False
input_filter: "float"
Label:
text: "Sensor 4, Temp:"
TextInput:
id: s4_t
multiline: False
input_filter: "float"
Button:
text: "Back"
size_hint: 0.1, 0.1
pos_hint: {"x":0.1, "y":0.1}
background_color: (255, 0, 0, 0.6)
on_release:
app.root.current = "Readout"
root.manager.transition.direction = "up"
ToggleButton:
id: prsel
size_hint: 0.2, 0.1
pos_hint: {"x":0.35, "y": 0.1}
text: "Full\nreprogramming" if self.state == "normal" else "Partial\nreprogramming"
on_release: root.change_mode()
background_color: (255,0,0,0.6) if self.state == "normal" else (0,0,255,0.6)
Button:
text: "Save"
size_hint: 0.2, 0.1
pos_hint: {"x":0.6, "y":0.1}
background_color: (255, 0, 0, 0.6)
on_release:
root.send_data()
<Credits>:
name: "Credits"
canvas.before:
Color:
rgba: (50,50,50,0.2)
Rectangle:
size: self.size
pos: self.pos
FloatLayout:
Button:
text: "back"
size_hint: 0.4, 0.2
pos_hint: {"x":0.3, "y":0.1}
on_release:
app.root.current = "Settings"
root.manager.transition.direction = "right"
GridLayout:
cols:1
pos_hint:{"x":0.05, "y":0.35}
size_hint: 0.9, 0.5
Label:
text: "This is a rework of the BiogasControllerApp V1, that was originally programmed by S. Reichmuth."
Label:
text: "Written by: Janis Hutz\nDesigned by: Janis Hutz\nDesign language: Kivy"
Label:
text: "This software is free Software licensed under the GPL V3 (GNU General Public License) and as such comes with absolutely no warranty. In return, you can use, modify, distribute or use any of the code of this software in your own project, if you reuse the same license. For more infos, you can find a copy of this license in the project folder."
text_size: self.width, None
<Modify>:
on_pre_enter: self.config = root.read_config()
name: "Settings"
canvas.before:
Color:
rgba: (50,50,50,0.2)
Rectangle:
size: self.size
pos: self.pos
GridLayout:
cols: 1
Label:
text: "Settings"
font_size: 40
color: (0, 113, 0, 1)
bold: True
FloatLayout:
GridLayout:
pos_hint: {"x":0.05, "y":0.05}
size_hint: 0.9, 0.9
cols: 4
Button:
text: "Back"
background_color: (255,0,0,0.6)
on_release:
app.root.current = "HomeS"
root.manager.transition.direction = "up"
Button:
text: "Report a\nBug"
background_color: (255,0,0,0.6)
on_release:
root.issue_reporting()
ToggleButton:
id: prsel
text: "Full\nreprogramming" if self.state == "normal" else "Partial\nreprogramming"
on_release: root.change_programming()
background_color: (255,0,0,0.6) if self.state == "normal" else (0,0,255,0.6)
Button:
text: "Credits"
background_color: (255,0,0,0.6)
on_release:
app.root.current = "Credits"
root.manager.transition.direction = "left"

View File

@@ -1,96 +0,0 @@
import bin.lib.lib
com = bin.lib.lib.Com()
class Communication:
def __init__(self):
self.__x = 0
self.__data_recieve = 0
self.__output = ""
def change_temp(self, data, special_port):
com.connect(19200, special_port)
com.send("PT")
self.go = 0
while True:
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "P":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "T":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.go = 1
break
else:
pass
else:
pass
else:
pass
else:
pass
if self.go == 1:
self.data = data
while len(self.data) > 0:
self.__data_recieve = com.receive(3)
if self.__data_recieve != "":
com.send_float(float(self.data.pop(0)))
else:
print("error")
break
else:
print("Error")
com.quitcom()
def change_all(self, data, special_port):
com.connect(19200, special_port)
com.send("PR")
self.go = 0
while True:
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "P":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "R":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.go = 1
break
else:
pass
else:
pass
else:
pass
else:
pass
if self.go == 1:
self.data = data
while len(self.data) > 0:
self.__data_recieve = com.receive(3)
if self.__data_recieve != "":
com.send_float(float(self.data.pop(0)))
else:
print("error")
break
else:
print("Error")
com.quitcom()
class SwitchMode:
def __init__(self):
pass
def enable_fastmode(self, special_port):
com.connect(19200, special_port)
com.send("FM")
com.quitcom()
def disable_fastmode(self, special_port):
com.connect(19200, special_port)
com.send("NM")
com.quitcom()

View File

@@ -1,22 +0,0 @@
import serial.tools.list_ports
class ComportService:
def __init__(self):
self.__comport = []
self.__import = []
self.__working = []
def get_comport(self, special_port):
self.__comport = [comport.device for comport in serial.tools.list_ports.comports()]
self.__pos = 0
if special_port != "":
self.__working = special_port
else:
while self.__working == []:
self.__com_name = serial.tools.list_ports.comports()[self.__pos]
if "USB-Serial Controller" or "Prolific USB-Serial Controller" in self.__com_name:
self.__working = self.__comport.pop(self.__pos)
else:
self.__pos += 1
return self.__working

View File

@@ -1,122 +0,0 @@
"""@package docstring
This is a simplification of the csv module"""
import csv
class CsvRead:
"""This is a class that reads csv files and depending on the module selected does do different things with it"""
def __init__(self):
self.__imp = ""
self.__raw = ""
self.__raw_list = ""
def importing(self, path):
"""Returns a list of the imported csv-file, requires path, either direct system path or relative path"""
self.__imp = open(path)
self.__raw = csv.reader(self.__imp, delimiter=',')
self.__raw_list = list(self.__raw)
self.__imp.close()
return self.__raw_list
class CsvWrite:
"""This is a class that modifies csv files"""
def __init__(self):
self.__impl = []
self.__strpop = []
self.__removed = []
self.__removing = 0
self.__change = 0
self.__appending = 0
self.__imp = []
self.__raw = []
def rem_str(self, path, row):
"""Opens the csv-file in write mode which is specified as an argument either as direct or relative path"""
self.__imp = open(path)
self.__raw = csv.reader(self.__imp, delimiter=',')
self.__impl = list(self.__raw)
self.__removed = self.__impl.pop(row + 1)
with open(path, "w") as removedata:
self.__removing = csv.writer(removedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__removing.writerow(self.__impl.pop(0))
while len(self.__impl) > 0:
with open(path, "a") as removedata:
self.__removing = csv.writer(removedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__removing.writerow(self.__impl.pop(0))
self.__imp.close()
removedata.close()
def chg_str(self, path, row, pos, new_value):
"""Opens the csv-file in write mode to change a value, e.g. if a recipes is changed."""
self.__imp = open(path)
self.__raw = csv.reader(self.__imp, delimiter=',')
self.__impl = list(self.__raw)
self.__strpop = self.__impl.pop(row)
self.__strpop.pop(pos)
self.__strpop.insert(pos, new_value)
self.__impl.insert(row, self.__strpop)
with open(path, "w") as changedata:
self.__change = csv.writer(changedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__change.writerow(self.__impl.pop(0))
while len(self.__impl) > 0:
with open(path, "a") as changedata:
self.__removing = csv.writer(changedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__removing.writerow(self.__impl.pop(0))
self.__imp.close()
changedata.close()
def chg_str_rem(self, path, row, pos):
"""Opens the csv-file in write mode to change a value, e.g. if a recipes is changed."""
self.__imp = open(path)
self.__raw = csv.reader(self.__imp, delimiter=',')
self.__impl = list(self.__raw)
self.__strpop = self.__impl.pop(row)
self.__strpop.pop(pos)
self.__strpop.pop(pos)
self.__impl.insert(row, self.__strpop)
with open(path, "w") as changedata:
self.__change = csv.writer(changedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__change.writerow(self.__impl.pop(0))
while len(self.__impl) > 0:
with open(path, "a") as changedata:
self.__removing = csv.writer(changedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__removing.writerow(self.__impl.pop(0))
self.__imp.close()
changedata.close()
def chg_str_add(self, path, row, new_value1, new_value2):
"""Opens the csv-file in write mode to change a value, e.g. if a recipes is changed."""
self.__imp = open(path)
self.__raw = csv.reader(self.__imp, delimiter=',')
self.__impl = list(self.__raw)
self.__strpop = self.__impl.pop(row)
self.__strpop.append(new_value1)
self.__strpop.append(new_value2)
self.__impl.insert(row, self.__strpop)
with open(path, "w") as changedata:
self.__change = csv.writer(changedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__change.writerow(self.__impl.pop(0))
while len(self.__impl) > 0:
with open(path, "a") as changedata:
self.__removing = csv.writer(changedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__removing.writerow(self.__impl.pop(0))
self.__imp.close()
changedata.close()
def app_str(self, path, value):
"""Opens the csv-file in append mode and writes given input. CsvWrite.app_str(path, value).
Path can be specified both as direct or relative. value is a list. Will return an error if type of value is
not a list."""
with open(path, "a") as appenddata:
self.__appending = csv.writer(appenddata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__appending.writerow(value)
appenddata.close()
def write_str(self, path, value):
with open(path, "w") as writedata:
self.__change = csv.writer(writedata, delimiter=',', quoting=csv.QUOTE_MINIMAL)
self.__change.writerow(value)
writedata.close()

View File

@@ -1,73 +0,0 @@
import serial
import struct
import bin.lib.comport_search
"""@package docstring
This package can communicate with a microcontroller"""
coms = bin.lib.comport_search.ComportService()
class Com:
def __init__(self):
self.xr = ""
self.output = ""
self.str_input = ""
self.str_get_input = ""
self.xs = ""
self.__comport = '/dev/ttyUSB0'
def connect(self, baudrate, special_port):
try:
self.__comport = coms.get_comport(special_port)
except:
pass
self.ser = serial.Serial(self.__comport, baudrate=baudrate, timeout=5)
def quitcom(self):
try:
self.ser.close()
except:
pass
def receive(self, amount_bytes):
self.xr = self.ser.read(amount_bytes)
return self.xr
def decode_ascii(self, value):
try:
self.output = value.decode()
except:
self.output = "Error"
return self.output
def check_value(self, value_check, checked_value):
if value_check == checked_value:
return 1
else:
return 0
def decode_int(self, value):
self.i = int(value, base=16)
return self.i
def decode_float(self, value):
self.fs = str(value, 'ascii') + '00'
self.f = struct.unpack('>f', bytes.fromhex(self.fs))
return str(self.f[0])
def decode_float_2(self, value):
self.fs = str(value, 'ascii') + '0000'
self.f = struct.unpack('>f', bytes.fromhex(self.fs))
return str(self.f[0])
def get_input(self):
self.str_get_input = input("please enter a character to send: ")
return self.str_get_input
def send(self, str_input):
self.xs = str_input.encode()
self.ser.write(self.xs)
def send_float(self, float_input):
ba = bytearray(struct.pack('>f', float_input))
self.ser.write(ba[0:3])

View File

@@ -1,893 +0,0 @@
import os
import configparser
import serial
config = configparser.ConfigParser()
config.read('./config/settings.ini')
co = config['Dev Settings']['verbose']
if co == "True":
pass
else:
os.environ["KIVY_NO_CONSOLELOG"] = "1"
import threading
import platform
import webbrowser
from kivy.uix.screenmanager import Screen, ScreenManager
from kivy.core.window import Window
from kivy.uix.popup import Popup
from kivy.app import App
from kivy.lang import Builder
from kivy.clock import mainthread, Clock
import bin.lib.lib
import bin.lib.communication
import bin.lib.comport_search
import bin.lib.csv_parsers
import logging
import datetime
import time
version_app = f"{config['Info']['version']}{config['Info']['subVersion']}"
################################################################
# LOGGER SETUP
##################
logging.basicConfig(level=logging.DEBUG, filename="./log/main_log.log", filemode="w")
logs = f"./log/{datetime.datetime.now()}-log-main.log"
logger = logging.getLogger(__name__)
handler = logging.FileHandler(logs)
formatter = logging.Formatter("%(levelname)s - %(asctime)s - %(name)s: %(message)s -- %(lineno)d")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(config['Dev Settings']['log_level'])
logger.info(f"Logger initialized, app is running Version: {version_app}")
#################################################################
if config['Port Settings']['specificPort'] == "None" or "\"\"":
special_port = ""
else:
special_port = config['Port Settings']['specificPort']
cvr = bin.lib.csv_parsers.CsvRead()
cvw = bin.lib.csv_parsers.CsvWrite()
com = bin.lib.lib.Com()
#################################################################
# Settings Handler
#########################
class SettingsHandler:
def __init__(self):
self.ports = None
self.window_sizeh = 600
self.window_sizew = 800
def settingshandler(self):
self.ports = config['Port Settings']['specificPort']
self.window_sizeh = config['UI Config']['sizeH']
self.window_sizew = config['UI Config']['sizeW']
Window.size = (int(self.window_sizew), int(self.window_sizeh))
#################################################################
logger.info("Started modules")
##################################################################
# Popups
##################################################################
class QuitPU(Popup):
def quitapp(self):
com.quitcom()
logger.debug("App stopped")
class NoConnection(Popup):
def details(self):
self.detailsinfo = DetailInfo()
self.detailsinfo.open()
class DetailInfo(Popup):
update_details = ""
def infos(self):
self.err = ""
try:
com.connect(19200, special_port)
com.quitcom()
except Exception as err:
self.err += "Errormessage:\n"
self.err += str(err)
self.err += "\n-------------------------------------------------------------------------------------------------------------------------------------------------------------\n"
return str(self.err)
def error_tips(self):
self.err_tip = ""
try:
com.connect(19200, special_port)
com.quitcom()
except Exception as err:
self.err_tip += "Possible way to resolve the issue: \n\n"
if str(err)[0:10] == "[Errno 13]":
if platform.system() == "Linux":
self.err_tip += f"Open a terminal and type in: sudo chmod 777 {bin.lib.comport_search.ComportService().get_comport(special_port)}"
elif platform.system() == "Macintosh":
self.err_tip += "Give permission to access the cable"
elif platform.system() == "Windows":
self.err_tip += "Try a different cable or install another driver"
else:
self.err_tip += "Unknown OS"
elif str(err)[0:10] == "[Errno 2] ":
if platform.system() == "Linux":
self.err_tip += "Connect a cable, open a terminal and type in: sudo chmod 777 /dev/ttyUSB0"
elif platform.system() == "Macintosh":
self.err_tip += "Give permission to access the cable"
elif platform.system() == "Windows":
self.err_tip += "Try a different cable or install another driver"
else:
self.err_tip += "Unknown OS"
elif str(err)[0:34] == "could not open port '/dev/ttyUSB0'":
self.err_tip += "Please connect the PC with the microcontroller!"
elif str(err)[0:26] == f"could not open port '{bin.lib.comport_search.ComportService().get_comport(special_port)}'":
self.err_tip += "Try using a different cable or close all monitoring software (like MSI Afterburner)"
else:
self.err_tip += "Special Error, consult the manual of Serial"
return str(self.err_tip)
class Modeswitch(Popup):
pass
class Connecting_PU(Popup):
pass
class Disconnecting_PU(Popup):
pass
class MissingFieldsError(Popup):
pass
class ConnectionFail(Popup):
pass
class SaveConf(Popup):
pass
class InfoPU(Popup):
def notshowanymore(self):
config.set("License", "show", "0")
with open("./config/settings.ini", "w") as configfile:
config.write(configfile)
self.dismiss()
####################################################################
# SCREENS
####################################################################
class HomeScreen(Screen):
def reset(self):
logger.info("HomeScreen initialised")
SettingsHandler().settingshandler()
self.connected = 1
self.info = f"You are currently running Version {version_app} - If you encounter a bug, please report it!"
try:
com.connect(19200, special_port)
com.quitcom()
except Exception as e:
self.connected = 0
logger.error(e)
return self.info
def openlicensepu(self):
self.licensepu = InfoPU()
self.licensepu.open()
def tryconnection(self):
if config["License"]["show"] == "1":
self.openlicensepu()
logger.info("Showing License info")
else:
pass
try:
com.connect(19200, special_port)
com.quitcom()
self.connected = 1
self.manager.current = "Readout"
self.manager.transition.direction = "right"
except Exception as ex:
if config['Dev Settings']['disableConnectionCheck'] == "True":
self.connected = 1
self.manager.current = "Readout"
self.manager.transition.direction = "right"
else:
self.connected = 0
logger.error(f"COM_error: {ex}")
self.open_popup()
def open_popup(self):
self.popups = NoConnection()
self.popups.open()
def exitapp(self):
self.pup = QuitPU()
self.pup.open()
class ReadoutScreen(Screen):
go = 1
def start_com(self):
self.comstart(1)
logger.info("Trying to start COM")
def comstart(self, pu_on):
try:
com.connect(19200, special_port)
self.go = 1
except Exception as e:
self.go = 0
logger.error(f"COM_error: {e}")
if self.go == 1:
logger.debug("COM start success")
self.parent.current = "Readout"
if pu_on == 1:
self.openstartpu()
else:
pass
self.communication = threading.Thread(name="communication", target=self.start_coms)
self.communication.start()
else:
self.openconnectionfailpu()
def end_com(self):
self.stopcom(1)
def stopcom(self, pu_on):
self.go = 0
try:
self.communication.join()
except Exception as e:
logger.warning(f"COM_Close_Error: {e}")
if pu_on == 1:
self.openendpu()
else:
pass
def start_coms(self):
self.check = 1
self.__level = 0
self.__distance = 0
self.__x = ""
self.__begin = time.time()
self.go = 1
logger.info("Starting COM_Hook")
while self.__x != "\n":
if time.time() - self.__begin > 5:
self.go = 0
break
else:
self.__x = com.decode_ascii(com.receive(1))
if self.go == 1:
logger.info("COM_Hook 1 success")
while self.__level < 3:
self.__x = com.decode_ascii(com.receive(1))
if self.__x == " ":
if self.__distance == 4:
self.__level += 1
else:
pass
self.__distance = 0
else:
if self.__distance > 4:
self.__level = 0
self.__distance = 0
else:
self.__distance += 1
self.check = 1
logger.info("COM_Hook successful")
com.receive(5)
else:
self.check = 0
while self.go == 1:
self.__starttime = time.time()
self.__output = ""
self.__data_recieve = com.receive(68)
self.__output += "Tadc: "
self.__output += str(com.decode_int(self.__data_recieve[0:4]))
self.__output += "\nTemperatur: "
self.__output += com.decode_float(self.__data_recieve[5:11])
self.__output += f"\nDuty-Cycle: {(float(com.decode_float_2(self.__data_recieve[48:52])) / 65535) * 100}%"
self.change_screen(1, self.__output)
self.__output = "Tadc: "
self.__output += str(com.decode_int(self.__data_recieve[12:16]))
self.__output += "\nTemperatur: "
self.__output += com.decode_float(self.__data_recieve[17:23])
self.__output += f"\nDuty-Cycle: {(float(com.decode_float_2(self.__data_recieve[53:57])) / 65535) * 100}%"
self.change_screen(2, self.__output)
self.__output = "Tadc: "
self.__output += str(com.decode_int(self.__data_recieve[24:28]))
self.__output += "\nTemperatur: "
self.__output += com.decode_float(self.__data_recieve[29:35])
self.__output += f"\nDuty-Cycle: {(float(com.decode_float_2(self.__data_recieve[58:62])) / 65535) * 100}%"
self.change_screen(3, self.__output)
self.__output = "Tadc: "
self.__output += str(com.decode_int(self.__data_recieve[36:40]))
self.__output += "\nTemperatur: "
self.__output += com.decode_float(self.__data_recieve[41:47])
self.__output += "\nDuty-Cycle: "
self.__output += f"\nDuty-Cycle: {(float(com.decode_float_2(self.__data_recieve[63:67])) / 65535) * 100}%"
self.change_screen(4, self.__output)
self.change_screen(5, f"F={1 / (time.time() - self.__starttime)}")
self.change_screen(6, "")
com.quitcom()
def switch_mode(self, text):
self.go = 0
try:
self.communication.join()
com.quitcom()
self.com_ok = 1
logger.info("Mode_Switch successful")
except Exception as e:
if e == serial.SerialException:
logger.info("No running process found, continuing")
else:
logger.fatal(f"FATAL ERROR OCCURED, APP WILL LEAVE NOW: {e}")
self.com_ok = 0
if self.com_ok == 1:
if text == "Normal Mode":
bin.lib.communication.SwitchMode().disable_fastmode(special_port)
else:
bin.lib.communication.SwitchMode().enable_fastmode(special_port)
logger.info("Switched mode, restarting COM")
self.openpupups()
self.comstart(0)
logger.info("COM restarted successfully")
else:
self.check = 1
self.ids.mode_sel.state = "normal"
self.openconnectionfailpu()
@mainthread
def change_screen(self, pos, value):
if pos == 1:
self.ids.sonde1.text = value
elif pos == 2:
self.ids.sonde2.text = value
elif pos == 3:
self.ids.sonde3.text = value
elif pos == 4:
self.ids.sonde4.text = value
elif pos == 6:
logger.error("COM_fail")
self.openconnectionfailpu()
else:
self.ids.frequency.text = value
def openpupups(self):
self.popup = Modeswitch()
self.popup.open()
def openendpu(self):
self.pu = Disconnecting_PU()
self.pu.open()
def openstartpu(self):
self.pup = Connecting_PU()
self.pup.open()
def openconnectionfailpu(self):
if self.check == 0:
self.cfpu = ConnectionFail()
self.cfpu.open()
else:
pass
def leave_screen(self):
logger.info("Stopping COM")
self.stopcom(0)
def resscreen(self):
logger.info("Screen reset")
self.ids.sonde1.text = ""
self.ids.sonde2.text = ""
self.ids.sonde3.text = ""
self.ids.sonde4.text = ""
self.ids.frequency.text = ""
class Program(Screen):
def read_config(self):
logger.debug("Reading config")
self.config_imp = []
self.__export = []
self.config_imp = cvr.importing("./config/config.csv")
self.__export = self.config_imp.pop(0)
self.__extracted = self.__export.pop(0)
logger.debug(f"config {self.__extracted}")
if self.__extracted == "1":
self.ids.prsel.state = "normal"
self.ids.s1_a.text = ""
self.ids.s1_b.text = ""
self.ids.s1_c.text = ""
self.ids.s1_t.text = ""
self.ids.s2_a.text = ""
self.ids.s2_b.text = ""
self.ids.s2_c.text = ""
self.ids.s2_t.text = ""
self.ids.s3_a.text = ""
self.ids.s3_b.text = ""
self.ids.s3_c.text = ""
self.ids.s3_t.text = ""
self.ids.s4_a.text = ""
self.ids.s4_b.text = ""
self.ids.s4_c.text = ""
self.ids.s4_t.text = ""
self.__mode = 1
else:
self.ids.prsel.state = "down"
Clock.schedule_once(self.read_data, 1)
self.__mode = 2
def change_mode(self):
logger.info("Changing mode")
logger.debug(f"mode was: {self.__mode}")
if self.__mode == 1:
logger.debug("Sending instruction to read info")
Clock.schedule_once(self.read_data, 1)
self.__mode = 2
else:
self.ids.s1_a.text = ""
self.ids.s1_b.text = ""
self.ids.s1_c.text = ""
self.ids.s1_t.text = ""
self.ids.s2_a.text = ""
self.ids.s2_b.text = ""
self.ids.s2_c.text = ""
self.ids.s2_t.text = ""
self.ids.s3_a.text = ""
self.ids.s3_b.text = ""
self.ids.s3_c.text = ""
self.ids.s3_t.text = ""
self.ids.s4_a.text = ""
self.ids.s4_b.text = ""
self.ids.s4_c.text = ""
self.ids.s4_t.text = ""
self.__mode = 1
def read_data(self, dt):
logger.debug("Starting to read data from the microcontroller")
try:
com.connect(19200, special_port)
self.go = 1
except Exception as e:
self.go = 0
logger.error(f"COM_error: {e}")
if self.go == 1:
logger.info("Sending instructions")
com.send("RD")
self.__pos = 1
self.__beginning = time.time()
logger.info("Awaiting confirmation from the microcontroller for hook")
while True:
if time.time() - self.__beginning < 5:
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "R":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "D":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.go = 1
logger.info("Hook successful")
break
else:
pass
else:
pass
else:
pass
else:
pass
else:
self.go = 0
logger.error("Microcontroller not available, stopping connection")
break
if self.go == 1:
for i in range(4):
self.__x = com.receive(28)
self.__a = str(com.decode_float(self.__x[0:6]))
self.__b = str(com.decode_float(self.__x[7:13]))
self.__c = str(com.decode_float(self.__x[14:20]))
self.__temp = str(com.decode_float(self.__x[21:27]))
if self.__pos == 1:
self.ids.s1_a.text = self.__a
self.ids.s1_b.text = self.__b
self.ids.s1_c.text = self.__c
self.ids.s1_t.text = self.__temp
elif self.__pos == 2:
self.ids.s2_a.text = self.__a
self.ids.s2_b.text = self.__b
self.ids.s2_c.text = self.__c
self.ids.s2_t.text = self.__temp
elif self.__pos == 3:
self.ids.s3_a.text = self.__a
self.ids.s3_b.text = self.__b
self.ids.s3_c.text = self.__c
self.ids.s3_t.text = self.__temp
elif self.__pos == 4:
self.ids.s4_a.text = self.__a
self.ids.s4_b.text = self.__b
self.ids.s4_c.text = self.__c
self.ids.s4_t.text = self.__temp
self.__pos += 1
logger.info("Recieved info from microcontroller")
else:
self.open_confail_pu()
com.quitcom()
else:
self.open_confail_pu()
def create_com(self):
self.coms = bin.lib.communication.Communication()
def send_data(self):
try:
self.create_com()
self.go = 1
except Exception as e:
self.go = 0
logger.critical(f"TRANSMISSION_Error: {e}")
if self.go == 1:
logger.info("Preparing data to be sent")
self.__transmit = []
if self.ids.s1_a.text != "" and self.ids.s1_b.text != "" and self.ids.s1_c.text != "" and self.ids.s1_t.text != "" and self.ids.s2_a.text != "" and self.ids.s2_b.text != "" and self.ids.s2_c.text != "" and self.ids.s2_t.text != "" and self.ids.s3_a.text != "" and self.ids.s3_b.text != "" and self.ids.s3_c.text != "" and self.ids.s3_t.text != "" and self.ids.s4_a.text != "" and self.ids.s4_b.text != "" and self.ids.s4_c.text != "" and self.ids.s4_t.text != "":
self.__transmit.append(self.ids.s1_a.text)
self.__transmit.append(self.ids.s1_b.text)
self.__transmit.append(self.ids.s1_c.text)
self.__transmit.append(self.ids.s1_t.text)
self.__transmit.append(self.ids.s2_a.text)
self.__transmit.append(self.ids.s2_b.text)
self.__transmit.append(self.ids.s2_c.text)
self.__transmit.append(self.ids.s2_t.text)
self.__transmit.append(self.ids.s3_a.text)
self.__transmit.append(self.ids.s3_b.text)
self.__transmit.append(self.ids.s3_c.text)
self.__transmit.append(self.ids.s3_t.text)
self.__transmit.append(self.ids.s4_a.text)
self.__transmit.append(self.ids.s4_b.text)
self.__transmit.append(self.ids.s4_c.text)
self.__transmit.append(self.ids.s4_t.text)
logger.debug("trying to send...")
try:
self.coms.change_all(self.__transmit, special_port)
logger.info("Transmission successful")
logger.debug("purging fields...")
self.ids.s1_a.text = ""
self.ids.s1_b.text = ""
self.ids.s1_c.text = ""
self.ids.s1_t.text = ""
self.ids.s2_a.text = ""
self.ids.s2_b.text = ""
self.ids.s2_c.text = ""
self.ids.s2_t.text = ""
self.ids.s3_a.text = ""
self.ids.s3_b.text = ""
self.ids.s3_c.text = ""
self.ids.s3_t.text = ""
self.ids.s4_a.text = ""
self.ids.s4_b.text = ""
self.ids.s4_c.text = ""
self.ids.s4_t.text = ""
self.openconfpu()
except Exception as e:
self.open_confail_pu()
logger.critical(f"TRANSMITION_Error: {e}")
else:
self.openerrorpu()
else:
self.open_confail_pu()
def openerrorpu(self):
self.pu = MissingFieldsError()
self.pu.open()
def open_confail_pu(self):
self.cfpu = ConnectionFail()
self.cfpu.open()
def openconfpu(self):
self.confpus = SaveConf()
self.confpus.open()
class ProgramTemp(Screen):
def read_config(self):
logger.debug("Reading config")
self.config_imp = []
self.__export = []
self.config_imp = cvr.importing("./config/config.csv")
self.__export = self.config_imp.pop(0)
self.__extracted = self.__export.pop(0)
logger.debug(f"Mode set is: {self.__extracted}")
if self.__extracted == "1":
self.ids.prsel.state = "normal"
self.ids.temp_s1.text = ""
self.ids.temp_s2.text = ""
self.ids.temp_s3.text = ""
self.ids.temp_s4.text = ""
self.__mode = 1
else:
self.ids.prsel.state = "down"
Clock.schedule_once(self.read_data, 1)
self.__mode = 2
def change_mode(self):
logger.info("Changing mode")
logger.debug(f"Mode was: {self.__mode}")
if self.__mode == 1:
logger.info("starting sub-thread")
Clock.schedule_once(self.read_data, 1)
self.__mode = 2
else:
logger.info("clearing screen")
self.ids.temp_s1.text = ""
self.ids.temp_s2.text = ""
self.ids.temp_s3.text = ""
self.ids.temp_s4.text = ""
self.__mode = 1
def read_data(self, dt):
logger.info("Trying to establish connection...")
try:
com.connect(19200, special_port)
self.go = 1
except Exception as e:
self.go = 0
logger.error(f"COM_Error: {e}")
if self.go == 1:
logger.info("Sending instructions to microcontroller...")
com.send("RD")
self.__pos = 1
self.__beginning = time.time()
self.go = 1
logger.info("Awaiting confirmation from the microcontroller for hook")
while True:
if time.time() - self.__beginning < 5:
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "R":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "D":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.go = 1
logger.info("Hook successful")
break
else:
pass
else:
pass
else:
pass
else:
pass
else:
self.go = 0
logger.error("Microcontroller not available, stopping connection")
break
if self.go == 1:
logger.info("Receiving data...")
for i in range(4):
self.__x = com.receive(28)
self.__output = str(com.decode_float(self.__x[21:27]))
if self.__pos == 1:
self.ids.temp_s1.text = self.__output
elif self.__pos == 2:
self.ids.temp_s2.text = self.__output
elif self.__pos == 3:
self.ids.temp_s3.text = self.__output
elif self.__pos == 4:
self.ids.temp_s4.text = self.__output
self.__pos += 1
logger.info("Recieved data")
com.quitcom()
else:
self.open_confail_pu()
def create_com(self):
self.coms = bin.lib.communication.Communication()
def send_data(self):
try:
self.create_com()
self.go = 1
except Exception as e:
self.go = 0
logger.critical(f"COM_Error: Microcontroller unavailable: {e}")
if self.go == 1:
logger.info("Preparing transmission...")
self.__transmit = []
if self.ids.temp_s1.text != "" and self.ids.temp_s2.text != "" and self.ids.temp_s3.text != "" and self.ids.temp_s4.text != "":
self.__transmit.append(self.ids.temp_s1.text)
self.__transmit.append(self.ids.temp_s2.text)
self.__transmit.append(self.ids.temp_s3.text)
self.__transmit.append(self.ids.temp_s4.text)
logger.debug("Transmitting...")
self.coms.change_temp(self.__transmit, special_port)
self.ids.temp_s1.text = ""
self.ids.temp_s2.text = ""
self.ids.temp_s3.text = ""
self.ids.temp_s4.text = ""
self.openconfpu()
else:
self.openerrorpu()
logger.debug("Missing fields")
else:
self.open_confail_pu()
def openerrorpu(self):
self.pu = MissingFieldsError()
self.pu.open()
def openconfpu(self):
self.confpu = SaveConf()
self.confpu.open()
def open_confail_pu(self):
self.cfpu = ConnectionFail()
self.cfpu.open()
class ReadData(Screen):
def read_data(self):
logger.info("Trying to connect to the microcontroller")
try:
com.connect(19200, special_port)
self.go = 1
except Exception as e:
self.go = 0
logger.error(f"COM_Error: {e}")
if self.go == 1:
logger.info("Sending instructions to the microcontroller...")
com.send("RD")
self.__pos = 1
self.__beginning = time.time()
self.go = 1
logger.info("Awaiting confirmation from the microcontroller for hook")
while True:
if time.time() - self.__beginning < 5:
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "R":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "D":
self.__data_recieve = com.decode_ascii(com.receive(1))
if self.__data_recieve == "\n":
self.go = 1
logger.info("Hook successful")
break
else:
pass
else:
pass
else:
pass
else:
pass
else:
self.go = 0
logger.error("Microcontroller not available, stopping connection")
break
if self.go == 1:
logger.info("Receiving data")
for i in range(4):
self.__x = com.receive(28)
self.__output = "a: "
self.__output += str(com.decode_float(self.__x[0:6]))
self.__output += f"\nb: {str(com.decode_float(self.__x[7:13]))}"
self.__output += f"\nc: {str(com.decode_float(self.__x[14:20]))}"
self.__output += f"\nTemp: {str(com.decode_float(self.__x[21:27]))}"
if self.__pos == 1:
self.ids.inf_sonde1.text = self.__output
elif self.__pos == 2:
self.ids.inf_sonde2.text = self.__output
elif self.__pos == 3:
self.ids.inf_sonde3.text = self.__output
elif self.__pos == 4:
self.ids.inf_sonde4.text = self.__output
self.__pos += 1
logger.info("Received data")
else:
self.open_confail_pu()
com.quitcom()
else:
self.open_confail_pu()
def open_confail_pu(self):
self.cfpu = ConnectionFail()
self.cfpu.open()
class Credits(Screen):
pass
class Modify(Screen):
def read_config(self):
logger.debug("Reading config")
self.config_imp = []
self.__export = []
self.config_imp = cvr.importing("./config/config.csv")
self.__export = self.config_imp.pop(0)
self.__extracted = self.__export.pop(0)
logger.debug(f"Mode at: {self.__extracted}")
if self.__extracted == "1":
self.ids.prsel.state = "normal"
else:
self.ids.prsel.state = "down"
def issue_reporting(self):
logger.info("Clicked error reporting button")
webbrowser.open("https://github.com/simplePCBuilding/BiogasControllerApp/issues", new=2)
def change_programming(self):
logger.info("Switching programming mode")
self.csv_import = []
self.csv_import = cvr.importing("./config/config.csv")
self.csv_import.pop(0)
if self.ids.prsel.text == "Full\nreprogramming":
self.csv_import.insert(0, 1)
else:
self.csv_import.insert(0, 2)
logger.debug(f"Mode now: {self.csv_import}")
cvw.write_str("./config/config.csv", self.csv_import)
########################################################
# Screenmanager
########################################################
class RootScreen(ScreenManager):
pass
class BiogasControllerApp(App):
def build(self):
self.icon = "./BiogasControllerAppLogo.png"
self.title = "BiogasControllerApp"
return Builder.load_file("./bin/gui/gui.kv")
logger.info("Init finished, starting UI")
try:
if __name__ == "__main__":
bga = BiogasControllerApp()
bga.run()
except Exception as e:
logger.critical(e, exc_info=True)

View File

@@ -1 +0,0 @@
2
1 2

View File

@@ -1,19 +0,0 @@
[Port Settings]
specificport = None
[UI Config]
sizeh = 600
sizew = 800
[Dev Settings]
verbose = False
log_level = DEBUG
disableconnectioncheck = False
[License]
show = 1
[Info]
version = V2.3.0
subversion =

View File

@@ -1,5 +0,0 @@
What is getting logged?
Generally this app logs how and when you interact with the app and sometimes, which values you enter.
No logs are being sent to anybody automatically, you can choose to attach the log file to the bug report.
This helps the devs a lot, as they can better understand the state of the app as it crashed. The logs are
all found in this folder here.

View File

@@ -19,6 +19,7 @@ a = Analysis(
cipher=block_cipher, cipher=block_cipher,
noarchive=False, noarchive=False,
) )
pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher) pyz = PYZ(a.pure, a.zipped_data, cipher=block_cipher)
exe = EXE( exe = EXE(
@@ -28,7 +29,7 @@ exe = EXE(
a.zipfiles, a.zipfiles,
a.datas, a.datas,
[], [],
name='BiogasControllerApp-V2.3-stable', name='BiogasControllerApp',
debug=False, debug=False,
bootloader_ignore_signals=False, bootloader_ignore_signals=False,
strip=False, strip=False,
@@ -41,14 +42,17 @@ exe = EXE(
target_arch=None, target_arch=None,
codesign_identity=None, codesign_identity=None,
entitlements_file=None, entitlements_file=None,
icon='BiogasControllerAppLogo-V2.3.ico', icon='BiogasControllerAppLogo.ico',
) )
coll = COLLECT(exe, Tree('C:\\BiogasControllerApp-V2.3\\'), coll = COLLECT(
a.binaries, exe,
a.zipfiles, Tree('.'),
a.datas, a.binaries,
*[Tree(p) for p in (sdl2.dep_bins + glew.dep_bins)], a.zipfiles,
strip=False, a.datas,
upx=True, *[Tree(p) for p in (sdl2.dep_bins + glew.dep_bins)],
name='touchtracer') strip=False,
upx=True,
name='biogascontrollerapp'
)

BIN
BiogasControllerAppLogo.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 65 KiB

BIN
BiogasControllerAppLogo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

BIN
BiogasControllerAppLogo.xcf Executable file

Binary file not shown.

53
LICENSE
View File

@@ -619,56 +619,3 @@ Program, unless a warranty or assumption of liability accompanies a
copy of the Program in return for a fee. copy of the Program in return for a fee.
END OF TERMS AND CONDITIONS END OF TERMS AND CONDITIONS
How to Apply These Terms to Your New Programs
If you develop a new program, and you want it to be of the greatest
possible use to the public, the best way to achieve this is to make it
free software which everyone can redistribute and change under these terms.
To do so, attach the following notices to the program. It is safest
to attach them to the start of each source file to most effectively
state the exclusion of warranty; and each file should have at least
the "copyright" line and a pointer to where the full notice is found.
<one line to give the program's name and a brief idea of what it does.>
Copyright (C) <year> <name of author>
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
Also add information on how to contact you by electronic and paper mail.
If the program does terminal interaction, make it output a short
notice like this when it starts in an interactive mode:
<program> Copyright (C) <year> <name of author>
This program comes with ABSOLUTELY NO WARRANTY; for details type `show w'.
This is free software, and you are welcome to redistribute it
under certain conditions; type `show c' for details.
The hypothetical commands `show w' and `show c' should show the appropriate
parts of the General Public License. Of course, your program's commands
might be different; for a GUI interface, you would use an "about box".
You should also get your employer (if you work as a programmer) or school,
if any, to sign a "copyright disclaimer" for the program, if necessary.
For more information on this, and how to apply and follow the GNU GPL, see
<https://www.gnu.org/licenses/>.
The GNU General Public License does not permit incorporating your program
into proprietary programs. If your program is a subroutine library, you
may consider it more useful to permit linking proprietary applications with
the library. If this is what you want to do, use the GNU Lesser General
Public License instead of this License. But first, please read
<https://www.gnu.org/licenses/why-not-lgpl.html>.

106
README.md
View File

@@ -1,55 +1,81 @@
<div id="title" align="center">
<img src="./BiogasControllerAppLogo.png" width="300">
<h1>BiogasControllerApp</h1>
</div>
<div id="badges" align="center">
<img src="https://img.shields.io/github/license/janishutz/BiogasControllerApp.svg">
<img src="https://img.shields.io/github/repo-size/janishutz/BiogasControllerApp.svg">
<img src="https://img.shields.io/github/languages/top/janishutz/BiogasControllerApp">
<img src="https://img.shields.io/github/directory-file-count/janishutz/BiogasControllerApp.svg">
<br>
<img alt="GitHub Repo stars" src="https://img.shields.io/github/stars/janishutz/BiogasControllerApp">
<img alt="GitHub watchers" src="https://img.shields.io/github/watchers/janishutz/BiogasControllerApp">
<img src="https://img.shields.io/github/issues-pr-raw/janishutz/BiogasControllerApp">
<img alt="GitHub forks" src="https://img.shields.io/github/forks/janishutz/BiogasControllerApp">
<img alt="GitHub commit activity" src="https://img.shields.io/github/commit-activity/m/janishutz/BiogasControllerApp">
<br>
<img alt="GitHub downloads all releases" src="https://img.shields.io/github/downloads/janishutz/BiogasControllerApp/total?label=Downloads (total)">
<img alt="GitHub downloads release (latest release)" src="https://img.shields.io/github/downloads/janishutz/BiogasControllerApp/latest/total?label=Downloads (latest)">
<img src="https://img.shields.io/github/release/janishutz/BiogasControllerApp.svg">
</div>
# **BiogasControllerApp V2.3.0** <div id="donate" align="center">
<a href="https://store.janishutz.com/donate" target="_blank"><img src="https://store-cdn.janishutz.com/static/support-me.jpg" width="150px"></a>
</div>
THIS SOFWARE FALLS UNDER THE GPL V3 LICENSE AND AS SUCH COMES WITH BiogasControllerApp has just received a major rewrite, where I focused on code-readability, documentation and stability. The documentation in the code is aimed at beginners and does contain some unnecessary extra comments
ABSOLUTELY NO WARRANTY!
***LOOKING FOR A MacOS BUILD MAINTAINER! You may follow the official build instructions on the kivy.org website. All other materials should already be included in this repository*** If you are here to read the code, the files you are most likely looking for can be found in the `lib` folder. If you want to understand and have a look at all of the application, start with the `biogascontrollerapp.py` file
Thank you for downloading the new Version of the BiogasControllerApp! You are greeted with # Installation
lots of new features, including a new and redesigned Graphical User Interface (later "GUI") To install it, navigate to the releases tab on the right hand side. Click the current release, scroll down to assets and select the version appropriate for your operating system.
and an automatic assignment of the comport on all supported Operating systems (see supported OS section).
That means:
- on Windows, select BiogasControllerApp-Windows.zip
- on Linux, you may download the tarball or you may also download the `install-linux.sh` script to automatically install it for you. Just note: You need to enable execute permissions for the file!
Compared to older versions, the new BiogasControllerApp doesn't install itself as an app and only resides in a folder where you can launch it using the executable or the `launch.sh` script.
## *FEATURE LIST* ## Troubleshooting
- Easily read out the data the Microcontroller used in ENATECH sends If you get a warning from Windows, the reason for this is that this app bundle is unsigned (since a signing certificate is about USD 350/year), so it might warn you about that. You can safely click "Run anyway" or the like to bypass that problem.
- Easily change the coefficients for the temperature sonds
- Easily change the temperature that is set for the controller to heat to If this makes you uncomfortable, you may simply install python and install the necessary dependencies (see below) and run the app using Python.
- Easy to navigate menus and submenus for better organisation
- (Almost) Bugfree # Features
- Detailed and still private logging (not sending any information to anybody except you do) - Read data the microcontroller in ENATECH sends
- Configure the microcontroller (Coefficients & Temperature). Old settings will be pre-loaded
- Focus on code quality and readability as well as stability
- Tips to resolve errors directly in the app
- The app is still maintained and as such known issues will be resolved - The app is still maintained and as such known issues will be resolved
- Highly detailed error resolving instructions directly inside the app - Clean UI focusing on ease of use
- Easy to run: No extra Software required (e.g. Python or similar) (currently Windows only) - Documented code so you can more easily understand what is happening
- Easy to install and uninstall as it has an installer and uninstaller
- Some settings are available through the /config/settings.ini file
# Issues
If you encounter any bugs or other weird behaviour, please open an issue on this GitHub repository, contact me on my [support page](https://support.janishutz.com) or send me an [email](mailto:development@janishutz.com)
# Documentation
You may find documentation for this project in its wiki here on GitHub. The code is also documented with explanations what it does
*Officially Supported OS* # Officially Supported OS
- Microsoft Windows 10, 11 (through my installer, may though support older Versions but this is not verified. Open an issue if you have managed to run it on an older version of Windows) - Microsoft Windows 10, 11 (through the provided compiled package, might work on older versions as well)
- Microsoft Windows XP, Vista, 7, 8, 10, 11 (through running the package with Python yourself) - Microsoft Windows XP, Vista, 7, 8, 10, 11 (through running the package with Python yourself)
- MacOS 10.9 (Mavericks) or later (required by Python) - GNU/Linux: All distros that support Python 3.8 or later (use `install-linux.sh` to install and `launch.sh` to launch for convenience)
- GNU/Linux: All distros that support Python 3.8 or later - FreeBSD: If you have Pyhton 3.8 or later installed
- FreeBSD: Works with slight modification of the source code
## Dependencies
Only needed if you run with python directly
*Required packages/programs to install the app yourself* - Python 3.10 - latest (only tested on this version, but should work down to at least 3.8)
- Python 3.8 - 3.10 (only tested on these versions) - kivy[base]==2.3.1
- kivy - kivymd==1.1.1
- pyserial - pyserial==3.5
To install them, run `pip install -r requirements.txt`
DEVELOPMENT:
- BiogasControllerApp V2.x: simplePCBuilding # Contributing
If you wish to contribute to this project, please fork this repository, create a new branch in your fork, make your changes and open a pull request in this repo.
COPYRIGHT 2022 simplePCBuilding
<div id="donate" align="center">
<a href="https://store.janishutz.com/donate" target="_blank"><img src="https://store-cdn.janishutz.com/static/support-me.jpg" width="150px"></a>
</div>

View File

@@ -4,13 +4,15 @@
Currently only the newest versions get security updates as security updates are also part of a release. Currently only the newest versions get security updates as security updates are also part of a release.
The support for Version 2.2.0 and below will end with the next Version (2.4.0)! Only Version 3.1 and later are supported due to the poor code quality of V2.3.0 and different UI before.
| Version | Supported | | Version | Supported |
| ------- | ------------------ | | ------- | ------------------ |
|2.3.0 | ✅ | | 3.1.X | ✅ |
| 2.2.0 | ✅ | | 3.0.X | ✅ |
| 2.1.0 | | | 2.3.0 | |
| 2.2.0 | ❎ |
| 2.1.0 | ❎ |
| 1.0.0 | ❎ | | 1.0.0 | ❎ |
## Reporting a Vulnerability ## Reporting a Vulnerability

199
biogascontrollerapp.py Normal file
View File

@@ -0,0 +1,199 @@
# ────────────────────────────────────────────────────────────────────
# ╭────────────────────────────────────────────────╮
# │ BiogasControllerApp │
# ╰────────────────────────────────────────────────╯
#
# So you would like to read the source code? Nice!
# Just be warned, this application uses Thread and a UI Toolkit called
# Kivy to run. If you are unsure of what functions do, consider
# checking out the kivy docs at https://kivy.org/doc.
# It also uses the pyserial library for communication with the micro-
# controller with RS232
#
# ────────────────────────────────────────────────────────────────────
# Load the config file
import time
from lib.config import read_config, set_verbosity, str_to_bool
verbose = str_to_bool(read_config("Dev", "verbose", "False", type_to_validate="bool"))
verbose = verbose if verbose != None else False
# Introducing tariffs to Python imports.
# It was too funny of an idea to miss out on
# You can enable or disable this in the config.
# It is disabled by default
if str_to_bool(
read_config("Tariffs", "impose_tariffs", "False", type_to_validate="bool")
):
try:
import tariff
tariff.set(
{
"kivy": int(
read_config("Tariffs", "kivy_rate", "0", type_to_validate="int")
),
"serial": int(
read_config("Tariffs", "pyserial_rate", "0", type_to_validate="int")
),
}
)
except Exception as e:
print(e)
print(
"You cannot evade the tariffs. I will impose impose a tariff of 1000000% on the launch of this app!"
)
time.sleep(2000000)
import os
from typing import override
from lib.com import Com, ComSuperClass
import lib.test.com
# Load config and disable kivy log if necessary
if verbose:
pass
else:
os.environ["KIVY_NO_CONSOLELOG"] = "1"
# Load kivy modules. Kivy is the UI framework used. See https://kivy.org
from kivy.core.window import Window
from kivy.uix.screenmanager import ScreenManager
from kivymd.app import MDApp
# Set Window size
Window.size = (
int(int(read_config("UI", "width", "800", type_to_validate="int"))),
int(int(read_config("UI", "height", "600", type_to_validate="int"))),
)
# ╭────────────────────────────────────────────────╮
# │ Screens │
# ╰────────────────────────────────────────────────╯
# Import all the screens (= pages) used in the app
from gui.home.home import HomeScreen
from gui.program.program import ProgramScreen
from gui.about.about import AboutScreen
from gui.main.main import MainScreen
# ╭────────────────────────────────────────────────╮
# │ Screen Manager │
# ╰────────────────────────────────────────────────╯
# Kivy uses a screen manager to manage pages in the application
colors = [
"Red",
"Pink",
"Purple",
"DeepPurple",
"Indigo",
"Blue",
"LightBlue",
"Cyan",
"Teal",
"Green",
"LightGreen",
"Lime",
"Yellow",
"Amber",
"Orange",
"DeepOrange",
"Brown",
"Gray",
"BlueGray",
]
class BiogasControllerApp(MDApp):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.screen_manager = ScreenManager()
@override
def build(self):
# Configure com
filters = [
x
for x in read_config(
"Connection",
"filters",
"USB-Serial Controller, Prolific USB-Serial Controller",
).split(",")
]
baudrate = int(
read_config("Connection", "baudrate", "19200", type_to_validate="int")
)
com: ComSuperClass = Com(
baudrate,
filters,
)
if str_to_bool(
read_config("Dev", "use_test_library", "False", type_to_validate="bool")
):
com = lib.test.com.Com(
int(read_config("Dev", "fail_sim", "20", type_to_validate="int")),
baudrate,
filters,
)
com.set_port_override(read_config("Connection", "port_override", "None"))
self.theme_cls.theme_style = read_config(
"UI", "theme", "Dark", ["Dark", "Light"]
)
self.theme_cls.material_style = "M3"
self.theme_cls.primary_palette = read_config(
"UI", "primary_color", "Green", colors
)
self.theme_cls.accent_palette = read_config(
"UI", "accent_color", "Lime", colors
)
self.theme_cls.theme_style_switch_animation = False
if verbose:
print("\n", "-" * 20, "\n")
self.icon = "./BiogasControllerAppLogo.png"
self.title = "BiogasControllerApp-V3.1.0"
self.screen_manager.add_widget(HomeScreen(com, name="home"))
self.screen_manager.add_widget(MainScreen(com, name="main"))
self.screen_manager.add_widget(ProgramScreen(com, name="program"))
self.screen_manager.add_widget(AboutScreen(name="about"))
return self.screen_manager
def change_theme(self):
self.theme_cls.theme_style = (
"Dark" if self.theme_cls.theme_style == "Light" else "Light"
)
# Disallow this file to be imported
if __name__ == "__main__":
print(
"""
┏━━┓━━━━━━━━━━━━━━━━━━━━┏━━━┓━━━━━━━━━┏┓━━━━━━━━┏┓━┏┓━━━━━━━━┏━━━┓━━━━━━━━
┃┏┓┃━━━━━━━━━━━━━━━━━━━━┃┏━┓┃━━━━━━━━┏┛┗┓━━━━━━━┃┃━┃┃━━━━━━━━┃┏━┓┃━━━━━━━━
┃┗┛┗┓┏┓┏━━┓┏━━┓┏━━┓━┏━━┓┃┃━┗┛┏━━┓┏━┓━┗┓┏┛┏━┓┏━━┓┃┃━┃┃━┏━━┓┏━┓┃┃━┃┃┏━━┓┏━━┓
┃┏━┓┃┣┫┃┏┓┃┃┏┓┃┗━┓┃━┃━━┫┃┃━┏┓┃┏┓┃┃┏┓┓━┃┃━┃┏┛┃┏┓┃┃┃━┃┃━┃┏┓┃┃┏┛┃┗━┛┃┃┏┓┃┃┏┓┃
┃┗━┛┃┃┃┃┗┛┃┃┗┛┃┃┗┛┗┓┣━━┃┃┗━┛┃┃┗┛┃┃┃┃┃━┃┗┓┃┃━┃┗┛┃┃┗┓┃┗┓┃┃━┫┃┃━┃┏━┓┃┃┗┛┃┃┗┛┃
┗━━━┛┗┛┗━━┛┗━┓┃┗━━━┛┗━━┛┗━━━┛┗━━┛┗┛┗┛━┗━┛┗┛━┗━━┛┗━┛┗━┛┗━━┛┗┛━┗┛━┗┛┃┏━┛┃┏━┛
━━━━━━━━━━━┏━┛┃━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┃┃━━┃┃━━
━━━━━━━━━━━┗━━┛━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┗┛━━┗┛━━
Version 3.1.0
=> Initializing....
"""
)
set_verbosity(verbose)
BiogasControllerApp().run()
print("\n => Exiting!")

View File

@@ -1,4 +1,30 @@
***CHANGELOG*** ***CHANGELOG***
V3.1.0
- Completely redesigned User Interface using KivyMD
- Added config option for themes
V3.0.1
- Install script fixes
- Packaging fixes
V3.0.0
- Small UI fixes
- Consolidated multiple previously separate screens
- Completely rewritten backend
- Improved stability
- Cleaned, documented code
- Reduced overhead of connecting
- Improved hooking reliability
- Removed installer, simpler setup now possible
- Removed official MacOS support as it didn't really work before anyway
- Added additional config options
- Improved linguistics
- Bugfixes
OLD VERSIONS
------------
DEVELOPMENT VERSIONS DEVELOPMENT VERSIONS
@@ -104,3 +130,4 @@ V2.3
- ADDS some settings through a config file - ADDS some settings through a config file
- CHANGED License from NONE to GPL V3 - CHANGED License from NONE to GPL V3
- BUGFIXES - BUGFIXES

25
config.ini Normal file
View File

@@ -0,0 +1,25 @@
[Connection]
port_override = None
baudrate = 19200
# List the names as which the adapter cable will show up separated by commas below
# For ENATECH, the below is likely correct.
filters = USB-Serial Controller, Prolific USB-Serial Controller
[UI]
height = 600
width = 800
# Can be Dark or Light
theme = Dark
primary_color = Green
accent_color = Lime
[Dev]
verbose = False
use_test_library = False
# One time out of how many (plus one) it should fail
fail_sim = 20
[Tariffs]
impose_tariffs = False
kivy_rate = 0
pyserial_rate = 0

5
gui/README.md Normal file
View File

@@ -0,0 +1,5 @@
# GUI
This folder contains all files that are used for the GUI of the app.
It is written in KivyMD, so if you don't know what that is and you don't want to learn it,
there isn't much of use in here for you! - Just so you're warned

52
gui/about/about.kv Normal file
View File

@@ -0,0 +1,52 @@
<AboutScreen>:
name: "about"
MDFloatLayout:
Image:
source: "BiogasControllerAppLogo.png"
pos_hint: {"top": 0.9}
size_hint_y: .3
radius: 36, 36, 0, 0
allow_stretch: True
keep_ratio: True
MDGridLayout:
cols: 1
MDLabel:
text: "About"
font_size: 40
halign: 'center'
valign: 'center'
bold: True
italic: True
theme_text_color: 'Secondary'
pos_hint: {'center_x': 0, 'center_y': 0}
MDFillRoundFlatButton:
pos_hint: {'x': 0.1, 'y': 0.05}
text: "Back"
on_release:
app.root.current = "home"
root.manager.transition.direction = "up"
MDFillRoundFlatButton:
pos_hint: {'right': 0.9, 'y': 0.05}
text: "Report a Bug"
on_release:
root.goto("issues")
MDFillRoundFlatButton:
pos_hint: {'right': 0.48, 'y': 0.05}
text: "Wiki"
on_release:
root.goto("wiki")
MDFillRoundFlatButton:
pos_hint: {'x': 0.52, 'y': 0.05}
text: "Repo"
on_release:
root.goto("repo")
Label:
text: "This is a simple controller application that allows you to read data from and configure the microcontroller used in ENATECH at KSWO. It is written in Python using KivyMD as its UI framework.\n\nThis software is free Software licensed under the GNU General Public License Version 3 and as such comes with absolutely no warranty."
pos_hint: {'x': 0.05, 'top': 0.42}
text_size: self.width, None
size_hint: 0.9, None

38
gui/about/about.py Normal file
View File

@@ -0,0 +1,38 @@
from kivy.uix.screenmanager import Screen
from kivymd.uix.dialog import MDDialog
from kivymd.uix.button import MDFlatButton
from kivy.lang import Builder
import webbrowser
# Simple about screen
class AboutScreen(Screen):
def __init__(self, **kw):
# Prepare dialog
self.opened_web_browser_dialog = MDDialog(
title="Open Link",
text="Your webbrowser has been opened. Continue there",
buttons=[
MDFlatButton(
text="Ok",
on_release=lambda _: self.opened_web_browser_dialog.dismiss(),
),
],
)
super().__init__(**kw)
def goto(self, loc: str):
# Open web browser with links
if loc == "wiki":
webbrowser.open(
"https://github.com/janishutz/BiogasControllerApp/wiki", new=2
)
elif loc == "issues":
webbrowser.open(
"https://github.com/janishutz/BiogasControllerApp/issues", new=2
)
elif loc == "repo":
webbrowser.open("https://github.com/janishutz/BiogasControllerApp", new=2)
self.opened_web_browser_dialog.open()
Builder.load_file("./gui/about/about.kv")

62
gui/home/home.kv Normal file
View File

@@ -0,0 +1,62 @@
<HomeScreen>:
name: "home"
MDFloatLayout:
Image:
source: "BiogasControllerAppLogo.png"
pos_hint: {"top": 0.9}
size_hint_y: .3
radius: 36, 36, 0, 0
allow_stretch: True
keep_ratio: True
MDGridLayout:
cols: 1
MDLabel:
text: "BiogasControllerApp"
font_size: 50
halign: 'center'
valign: 'center'
bold: True
italic: True
theme_text_color: 'Secondary'
pos_hint: {'center_x': 0, 'center_y': 0}
MDGridLayout:
spacing: 20
size_hint: None, None
size: self.minimum_size
cols: 2
pos_hint: {'center_x': 0.5, 'center_y': 0.3 }
MDFillRoundFlatButton:
font_size: 30
text: "Start"
on_release: root.start()
MDFillRoundFlatButton:
text: "Quit"
font_size: 30
pos_hint: {"x": 0.7, "center_y": 0}
on_release: root.quit()
MDLabel:
text: "You are running version V3.1.0"
font_size: 13
pos_hint: {"y": -0.45, "x":0}
halign: 'center'
MDFlatButton:
text: "About"
font_size: 13
size_hint: 0.07, 0.06
pos_hint: {"x":0.01, "y":0.01}
on_release:
root.to_about()
# MDFlatButton:
# text: "Change Theme"
# font_size: 13
# size_hint: 0.07, 0.06
# pos_hint: {"right":0.99, "y":0.01}
# on_release:
# app.change_theme()

125
gui/home/home.py Normal file
View File

@@ -0,0 +1,125 @@
from kivy.base import Clock
from kivymd.app import MDApp
from kivymd.uix.button import MDFlatButton
from kivymd.uix.dialog import MDDialog
from kivymd.uix.screen import MDScreen
from kivy.lang import Builder
from lib.com import ComSuperClass
import platform
# Information for errors encountered when using pyserial
information = {
"Windows": {
"2": "Un- and replug the cable and ensure you have the required driver(s) installed",
"13": "You are probably missing a required driver or your cable doesn't work. Consult the wiki for more information",
"NO_COM": "Could not find a microcontroller. Please ensure you have one connected and the required driver(s) installed",
},
"Linux": {
"2": "Un- and replug the cable, or if you haven't plugged a controller in yet, do that",
"13": "Incorrect permissions at /dev/ttyUSB0. Open a terminal and type: sudo chmod 777 /dev/ttyUSB0",
"NO_COM": "Could not find a microcontroller. Please ensure you have one connected",
},
}
# This is the launch screen, i.e. what you see when you start up the app
class HomeScreen(MDScreen):
def __init__(self, com: ComSuperClass, **kw):
self._com = com
self.connection_error_dialog = MDDialog(
title="Connection",
text="Failed to connect. See Details for more information and troubleshooting guide",
buttons=[
MDFlatButton(
text="Cancel",
on_release=lambda _: self.connection_error_dialog.dismiss(),
),
MDFlatButton(
text="Details", on_release=lambda _: self.open_details_popup()
),
],
)
self.quit_dialog = MDDialog(
title="Exit BiogasControllerApp",
text="Do you really want to exit BiogasControllerApp?",
buttons=[
MDFlatButton(
text="Cancel",
on_release=lambda _: self.quit_dialog.dismiss(),
),
MDFlatButton(text="Quit", on_release=lambda _: self._quit()),
],
)
super().__init__(**kw)
def _quit(self):
self._com.close()
MDApp.get_running_app().stop()
def start(self):
Clock.schedule_once(lambda _: self._start())
# Go to the main screen if we can establish connection or the check was disabled
# in the configs
def _start(self):
if self._com.connect():
self.manager.current = "main"
self.manager.transition.direction = "right"
else:
self.connection_error_dialog.open()
print("[ COM ] Connection failed!")
# Open popup for details as to why the connection failed
def open_details_popup(self):
self.connection_error_dialog.dismiss()
self.details_dialog = MDDialog(
title="Troubleshooting",
text=self._generate_help(),
buttons=[
MDFlatButton(
text="Ok", on_release=lambda _: self.details_dialog.dismiss()
)
],
)
self.details_dialog.open()
def _generate_help(self) -> str:
operating_system = platform.system()
if operating_system == "Windows" or operating_system == "Linux":
port = self._com.get_comport()
if port == "Sim":
return "Running in simulator, so this error is just simulated"
information["Linux"][
"13"
] = f"Incorrect permissions at {port}. Resolve by running 'sudo chmod 777 {port}'"
if port == "":
return information[operating_system]["NO_COM"]
err = self._com.get_error()
if err != None:
return information[operating_system][str(err.errno)]
else:
return "No error message available"
else:
return (
"You are running on an unsupported Operating System. No help available"
)
# Helper to open a Popup to ask user whether to quit or not
def quit(self):
self.quit_dialog.open()
# Switch to about screen
def to_about(self):
self.manager.current = "about"
self.manager.transition.direction = "down"
# Load the design file for this screen (.kv files)
# The path has to be relative to root of the app, i.e. where the biogascontrollerapp.py
# file is located
Builder.load_file("./gui/home/home.kv")

124
gui/main/main.kv Normal file
View File

@@ -0,0 +1,124 @@
<MainScreen>:
on_pre_enter: root.reset()
name: "main"
MDFloatLayout:
MDGridLayout:
cols: 1
pos_hint: {'x': 0, 'y': 0.4}
MDLabel:
text: "READOUT"
font_size: 40
halign: 'center'
valign: 'center'
pos_hint: {'center_x': 0, 'center_y': 0}
bold: True
MDGridLayout:
cols:4
size_hint: 0.8, 0.3
pos_hint: {"x":0.1, "y":0.4}
MDLabel:
text: "Sensor 1: "
font_size: 20
MDLabel:
id: sensor1
text: ""
size_hint: 1, 1
halign: 'left'
text_size: self.size
MDLabel:
text: "Sensor 2: "
font_size: 20
MDLabel:
id: sensor2
text: ""
size_hint: 1, 1
halign: 'left'
text_size: self.size
MDLabel:
text: "Sensor 3: "
font_size: 20
MDLabel:
id: sensor3
text: ""
size_hint: 1, 1
halign: 'left'
text_size: self.size
MDLabel:
text: "Sensor 4: "
font_size: 20
MDLabel:
id: sensor4
text: ""
size_hint: 1, 1
halign: 'left'
text_size: self.size
MDFillRoundFlatButton:
text: "Connect"
size_hint: 0.15, 0.09
pos_hint: {"x": 0.03, "y": 0.05}
on_release:
root.start()
MDFillRoundFlatButton:
text: "Disconnect"
size_hint: 0.15, 0.09
pos_hint: {"x": 0.2, "y": 0.05}
on_release:
root.end()
MDFillRoundFlatButton:
text: "Back"
size_hint: 0.15, 0.09
pos_hint: {"right": 0.95, "y":0.05}
md_bg_color: app.theme_cls.primary_dark
on_release:
root.end()
app.root.current = "home"
root.manager.transition.direction = "left"
MDGridLayout:
cols: 2
size_hint: 0.15, 0.1
pos_hint: {"x":0.1, "y":0.15}
MDLabel:
text: "Fast Mode"
valign: "center"
MDSwitch:
id: mode_selector
on_active: root.switch_mode()
icon_active: "check"
MDFillRoundFlatButton:
text: "Configuration"
size_hint: 0.1, 0.07
pos_hint: {"x":0.45, "y":0.06}
md_bg_color: app.theme_cls.accent_dark
on_release:
root.end()
app.root.current = "program"
root.manager.transition.direction = "down"
MDGridLayout:
size_hint: 0.2, None
spacing: 0
padding: 0
cols: 1
pos_hint: {'right': 0.95, 'top': 0.95}
MDLabel:
id: status
text: "Status will appear here"
font_size: 10
halign: 'right'
MDGridLayout:
size_hint: None, None
spacing: 0
padding: 0
cols: 1
pos_hint: {'right': 0.95, 'top': 0.925}
MDLabel:
id: port
text: "Port: Not connected"
font_size: 10
halign: 'right'

283
gui/main/main.py Normal file
View File

@@ -0,0 +1,283 @@
from ctypes import ArgumentError
from time import time
from typing import List, override
from kivymd.uix.screen import MDScreen
from kivy.lang import Builder
from kivy.clock import Clock, ClockEvent
from kivymd.uix.button import MDFlatButton
from kivymd.uix.dialog import MDDialog
import queue
import threading
# Load utilities
from lib.instructions import Instructions
from lib.com import ComSuperClass
from lib.decoder import Decoder
# TODO: Consider consolidating start and stop button
# Queue with data that is used to synchronize
synced_queue: queue.Queue[List[str]] = queue.Queue()
# ╭────────────────────────────────────────────────╮
# │ Data Reading Thread Helper │
# ╰────────────────────────────────────────────────╯
# Using a Thread to run this in parallel to the UI to improve responsiveness
class ReaderThread(threading.Thread):
_com: ComSuperClass
_decoder: Decoder
_instructions: Instructions
# This method allows the user to set Com object to be used.
# The point of this is to allow for the use of a single Com object to not waste resources
def set_com(self, com: ComSuperClass):
"""Set the Com object to be used in this
Args:
com: The com object to be used
"""
self._com = com
self._run = True
self._decoder = Decoder()
self._instructions = Instructions(com)
# This method is given by the Thread class and has to be overriden to change
# what is executed when the thread starts
@override
def run(self) -> None:
self._run = True
if self._com == None:
raise ArgumentError("Com object not passed in (do using set_com)")
# Hook to output stream
if self._instructions.hook_main():
# We are now hooked to the stream (i.e. data is synced)
synced_queue.put(["HOOK", self._com.get_comport()])
# making it exit using the stop function
while self._run:
# Take note of the time before reading the data to deduce frequency of updates
start_time = time()
# We need to read 68 bytes of data, given by the program running on the controller
received = self._com.receive(68)
# Store the data in a list of strings
data: List[str] = []
# For all sensors connected, execute the same thing
for i in range(4):
# The slicing that happens here uses offsets automatically calculated from the sensor id
# This allows for short code
try:
data.append(
f"Tadc: {
self._decoder.decode_int(received[12 * i:12 * i + 4])
}\nTemp: {
round(self._decoder.decode_float(received[12 * i + 5:12 * i + 11]) * 1000) / 1000
}°C\nDC: {
round((self._decoder.decode_float_long(received[48 + 5 * i: 52 + 5 * i]) / 65535.0 * 100) * 1000) / 1000
}%"
)
except:
data.append("Bad data")
# Calculate the frequency of updates
data.append(
str(round((1 / (time() - start_time)) * 1000) / 1000) + " Hz"
)
synced_queue.put(data)
else:
# Send error message to the UI updater
synced_queue.put(["ERR_HOOK"])
return
def stop(self) -> None:
self._run = False
# ╭────────────────────────────────────────────────╮
# │ Main App Screen │
# ╰────────────────────────────────────────────────╯
# This is the main screen, where you can read out data
class MainScreen(MDScreen):
_event: ClockEvent
# The constructor if this class takes a Com object to share one between all screens
# to preserve resources and make handling better
def __init__(self, com: ComSuperClass, **kw):
# Set some variables
self._com = com
self._event = None
self._fast_mode = False
# Set up Dialog for erros
self.connection_error_dialog = MDDialog(
title="Connection",
text="Failed to connect. Do you wish to retry?",
buttons=[
MDFlatButton(
text="Cancel",
on_release=lambda _: self.connection_error_dialog.dismiss(),
),
MDFlatButton(text="Retry", on_release=lambda _: self.start()),
],
)
self.mode_switch_error_dialog = MDDialog(
title="Mode Switch",
text="Failed to change mode. Please try again",
buttons=[
MDFlatButton(
text="Ok",
on_release=lambda _: self.mode_switch_error_dialog.dismiss(),
),
],
)
# Prepare the reader thread
self._prepare_reader()
self._has_run = False
self._has_connected = False
# Call the constructor for the Screen class
super().__init__(**kw)
def _prepare_reader(self):
# Prepares the reader thread
self._reader = ReaderThread()
self._reader.daemon = True
self._reader.set_com(self._com)
# Small helper function that makes the UI not freeze by offloading
def start(self):
Clock.schedule_once(lambda _: self._start())
# Start the connection to the micro-controller to read data from it.
# This also starts the reader thread to continuously read out data
def _start(self):
# Prevent running multiple times
self.connection_error_dialog.dismiss()
if self._has_connected:
return
# Some UI config
self.ids.status.text = "Connecting..."
if self._com.connect():
print("[ COM ] Connection Acquired")
# Prevent multiple connections
self._has_connected = True
self._has_run = True
if self._has_run:
self._prepare_reader()
# Start communication
self._reader.start()
print("[ COM ] Reader has started")
# Schedule UI updates
self._event = Clock.schedule_interval(self._update_screen, 0.5)
else:
self.ids.status.text = "Connection failed"
self.connection_error_dialog.open()
# End connection to micro-controller and set it back to normal mode
def end(self, set_msg: bool = True):
# Set micro-controller back to Normal Mode when ending communication
# to make sure temperature control will work
if self._has_connected:
if self._event != None:
self._event.cancel()
self._reader.stop()
# Join the thread to end it safely
try:
self._reader.join()
except:
pass
# Go back to Normal Mode on the Controller
# This is so you don't accidentally forget!
try:
self._com.send("NM")
except:
pass
self._com.close()
if set_msg:
self.ids.status.text = "Connection terminated"
self.ids.port.text = "Port: Not connected"
self._has_connected = False
print("Connection terminated")
# A helper function to update the screen. Is called on an interval
def _update_screen(self, _):
update = []
try:
update = synced_queue.get_nowait()
except:
pass
if len(update) == 0:
# There are no updates to process, don't block and simply try again next time
return
if len(update) == 1:
# Sync errors
if update[0] == "ERR_HOOK":
self.ids.status.text = "Hook failed"
self.end(False)
if len(update) == 2:
# Connection successful
if update[0] == "HOOK":
self.ids.status.text = "Connected to controller"
self.ids.port.text = "Port: " + update[1]
else:
# Update the UI
self.ids.sensor1.text = update[0]
self.ids.sensor2.text = update[1]
self.ids.sensor3.text = update[2]
self.ids.sensor4.text = update[3]
self.ids.status.text = "Connected, f = " + update[4]
# Reset the screen when the screen is entered
def reset(self):
self.ids.sensor1.text = ""
self.ids.sensor2.text = ""
self.ids.sensor3.text = ""
self.ids.sensor4.text = ""
self.ids.status.text = "Status will appear here"
self.ids.port.text = "Port: Not connected"
# Switch the mode for the micro-controller
def switch_mode(self):
# Store if we have been connected to the micro-controller before mode was switched
was_connected = self._has_connected
# Disconnect from the micro-controller
self.end()
self.ids.status.text = "Setting mode..."
# Try to set the new mode
try:
if self._fast_mode:
self._com.send("NM")
else:
self._com.send("FM")
except:
self.mode_switch_error_dialog.open()
return
self.ids.status.text = "Mode set"
# If we have been connected, reconnect
if was_connected:
self.start()
# Load the design file for this screen (.kv files)
# The path has to be relative to root of the app, i.e. where the biogascontrollerapp.py
# file is located
Builder.load_file("./gui/main/main.kv")

123
gui/program/program.kv Normal file
View File

@@ -0,0 +1,123 @@
<ProgramScreen>:
name: "program"
on_enter: self.config_loader = root.load_config()
FloatLayout:
MDGridLayout:
cols: 1
pos_hint: {'x': 0, 'y': 0.4}
MDLabel:
text: "Configuration"
font_size: 40
halign: 'center'
valign: 'center'
pos_hint: {'center_x': 0, 'center_y': 0}
bold: True
MDGridLayout:
cols: 1
pos_hint: {'x': 0, 'y': 0.33}
MDLabel:
text: "Change the configuration of the microcontroller"
font_size: 18
halign: 'center'
valign: 'center'
pos_hint: {'center_x': 0, 'center_y': 0}
italic: True
MDGridLayout:
cols: 1
pos_hint: {'x': 0, 'y': 0.25}
MDLabel:
id: status
text: "Loading..."
font_size: 17
halign: 'center'
bold: True
MDGridLayout:
size_hint: 0.9, 0.5
spacing: 10
pos_hint: {"x":0.05, "y":0.2}
cols: 4
MDTextField:
id: s1_a
hint_text: 'Sensor 1 a'
on_text: root.validate_float(self)
MDTextField:
id: s1_b
hint_text: 'Sensor 1 b'
on_text: root.validate_float(self)
MDTextField:
id: s1_c
hint_text: 'Sensor 1 c'
on_text: root.validate_float(self)
MDTextField:
id: s1_t
hint_text: 'Sensor 1 Temperature'
on_text: root.validate_float(self)
MDTextField:
id: s2_a
hint_text: 'Sensor 2 a'
on_text: root.validate_float(self)
MDTextField:
id: s2_b
hint_text: 'Sensor 2 b'
on_text: root.validate_float(self)
MDTextField:
id: s2_c
hint_text: 'Sensor 2 c'
on_text: root.validate_float(self)
MDTextField:
id: s2_t
hint_text: 'Sensor 2 Temperature'
on_text: root.validate_float(self)
MDTextField:
id: s3_a
hint_text: 'Sensor 3 a'
on_text: root.validate_float(self)
MDTextField:
id: s3_b
hint_text: 'Sensor 3 b'
on_text: root.validate_float(self)
MDTextField:
id: s3_c
hint_text: 'Sensor 3 c'
on_text: root.validate_float(self)
MDTextField:
id: s3_t
hint_text: 'Sensor 3 Temperature'
on_text: root.validate_float(self)
MDTextField:
id: s4_a
hint_text: 'Sensor 4 a'
on_text: root.validate_float(self)
MDTextField:
id: s4_b
hint_text: 'Sensor 4 b'
on_text: root.validate_float(self)
MDTextField:
id: s4_c
hint_text: 'Sensor 4 c'
on_text: root.validate_float(self)
MDTextField:
id: s4_t
hint_text: 'Sensor 4 Temperature'
on_text: root.validate_float(self)
MDFillRoundFlatButton:
size_hint: 0.1, 0.07
text: "Back"
pos_hint: {"x":0.1, "y":0.1}
background_color: (255, 0, 0, 0.6)
on_release:
app.root.current = "main"
root.manager.transition.direction = "up"
MDFillRoundFlatButton:
size_hint: 0.15, 0.09
text: "Save"
pos_hint: {"x":0.6, "y":0.1}
on_release:
root.save()

177
gui/program/program.py Normal file
View File

@@ -0,0 +1,177 @@
from typing import List
from kivymd.uix.screen import MDScreen
from kivy.lang import Builder
from lib.decoder import Decoder
from lib.instructions import Instructions
from kivymd.uix.button import MDFlatButton
from kivymd.uix.dialog import MDDialog
from lib.com import ComSuperClass
from kivy.clock import Clock
# The below list maps 0, 1, 2, 3 to a, b, c and t respectively
# This is used to set and read values of the UI
name_map = ["a", "b", "c", "t"]
class ProgramScreen(MDScreen):
def __init__(self, com: ComSuperClass, **kw):
self._com = com
self._instructions = Instructions(com)
self._decoder = Decoder()
# Configure Dialog
self.connection_error_dialog = MDDialog(
title="Connection",
text="Failed to connect. Do you wish to retry?",
buttons=[
MDFlatButton(
text="Cancel",
on_release=lambda _: self.connection_error_dialog.dismiss(),
),
MDFlatButton(text="Retry", on_release=lambda _: self.load_config()),
],
)
self.missing_fields_error_dialog = MDDialog(
title="Save",
text="Some fields are missing entries. Please fill them out and try again",
buttons=[
MDFlatButton(
text="Ok",
on_release=lambda _: self.missing_fields_error_dialog.dismiss(),
),
],
)
self.save_error_dialog = MDDialog(
title="Save",
text="Failed to save data. Please try again",
buttons=[
MDFlatButton(
text="Ok",
on_release=lambda _: self.save_error_dialog.dismiss(),
),
],
)
self.save_success_dialog = MDDialog(
title="Save",
text="Data saved successfully!",
buttons=[
MDFlatButton(
text="Ok",
on_release=lambda _: self.save_success_dialog.dismiss(),
),
],
)
super().__init__(**kw)
# Load the config (async to not freeze the UI)
def load_config(self):
Clock.schedule_once(lambda _: self._load())
# Load the current configuration from the micro-controller
def _load(self):
self.ids.status.text = "Loading..."
# Hook to the microcontroller's data stream (i.e. sync up with it)
if self._instructions.hook("RD", ["\n", "R", "D", "\n"]):
config: List[List[str]] = []
# Load config for all four sensors
for _ in range(4):
# Receive 28 bytes of data
received = bytes()
try:
received = self._com.receive(28)
except:
# Open error popup
self.connection_error_dialog.open()
return
# Create a list of strings to store the config for the sensor
# This list has the following elements: a, b, c, temperature
config_sensor_i: List[str] = []
# Create the list
for j in range(4):
config_sensor_i.append(
str(self._decoder.decode_float(received[7 * j : 7 * j + 6]))
)
# Add it to the config
config.append(config_sensor_i)
self.ids.status.text = ""
self._set_ui(config)
else:
self.connection_error_dialog.open()
# Set the elements of the UI to the values of the config
def _set_ui(self, config: List[List[str]]):
for sensor_id in range(4):
for property in range(4):
self.ids[f"s{sensor_id + 1}_{name_map[property]}"].text = config[
sensor_id
][property]
# Read values from the UI. Returns the values as a list or None if the check was infringed
def _read_ui(self, enforce_none_empty: bool = True) -> List[float] | None:
data: List[float] = []
# Iterate over all sensor config input fields and collect the data
for sensor_id in range(4):
for property in range(4):
value = self.ids[f"s{sensor_id + 1}_{name_map[property]}"].text
# If requested (by setting enforce_none_empty to True, which is the default)
# test if the cells are not empty and if we find an empty cell return None
if enforce_none_empty and value == "":
return
data.append(float(value))
return data
def save(self):
Clock.schedule_once(lambda _: self._save())
# Transmit the changed data to the micro-controller to reconfigure it
def _save(self):
self.ids.status.text = "Saving..."
data = self._read_ui()
if data == None:
self.missing_fields_error_dialog()
else:
try:
self._instructions.change_config(data)
except:
self.save_error_dialog.open()
return
self.save_success_dialog.open()
self.ids.status.text = "Saved!"
Clock.schedule_once(self.reset_update, 5)
def reset_update(self, _):
self.ids.status.text = ""
def validate_float(self, instance):
text = instance.text
# Allow only digits and one dot
if text.count(".") > 1 or any(c not in "0123456789." for c in text):
# Remove invalid characters
clean_text = "".join(c for c in text if c in "0123456789.")
# Remove extra dots
if clean_text.count(".") > 1:
first_dot = clean_text.find(".")
clean_text = clean_text[: first_dot + 1] + clean_text[
first_dot + 1 :
].replace(".", "")
instance.text = clean_text
# Load the design file for this screen (.kv files)
# The path has to be relative to root of the app, i.e. where the biogascontrollerapp.py
# file is located
Builder.load_file("./gui/program/program.kv")

93
install-linux.sh Executable file
View File

@@ -0,0 +1,93 @@
#!/bin/sh
# Create virtual environment to not clutter up local python install
echo "
___ ___ _ _ _ _____
( _ \ _ ( _ \ ( )_ (_ )(_ ) ( _ )
| (_) )_) _ __ _ _ ___| ( (_) _ ___ | _)_ __ _ | | | | __ _ __| (_) |_ _ _ _
| _ (| |/ _ \ / _ \/ _ ) __) | _ / _ \/ _ \ | ( __)/ _ \ | | | | / __ \ __) _ ) _ \( _ \
| (_) ) | (_) ) (_) | (_| |__ \ (_( ) (_) ) ( ) | |_| | ( (_) )| | | |( ___/ | | | | | (_) ) (_) )
(____/(_)\___/ \__ |\__ _)____/____/ \___/(_) (_)\__)_) \___/(___)___)\____)_) (_) (_) __/| __/
( )_) | | | | |
\___/ (_) (_)
WELCOME! This script will automatically install BiogasControllerApp for you!
We first have to ask a few questions. If you are unsure what they mean,
simply press enter to use default options, which are designed to make
uninstalling much easier. The default option is highlighted using capital
letters.
Please ensure you have wget installed. The script will verify and tell you
if you do not have it installed
If this script is not inside a full copy of the BiogasControllerApp repo,
the repo will be automatically downloaded for you.
"
use_venv=""
read -p "Install dependencies in a virtual environment? (Y/n) " use_venv
use_venv=$(echo "$use_venv" | tr '[:upper:]' '[:lower:]')
echo "
=> Checking for repo...
"
if [[ -f ./biogascontrollerapp.py ]]; then
echo " -> Data found, not downloading"
else
do_download=""
read -p " -> Data not found, okay to download? (Y/n) " do_download
do_download=$(echo "$do_download" | tr '[:upper:]' '[:lower:]')
if [[ "$do_download" == "y" || "$do_download" == "" ]]; then
# Check if wget is installed
if !command -v wget >/dev/null 2>&1; then
echo "wget unavailable. Please install using your distribution's package manager or manually download the repo from GitHub releases"
echo 1
fi
# Download the latest release package
wget https://github.com/janishutz/BiogasControllerApp/releases/latest/download/biogascontrollerapp-linux.tar.gz
# Extract the tar (as tar is basically standard on all distros)
tar -xf ./biogascontrollerapp-linux.tar.gz
# Remove tarball (to keep it clean)
rm ./biogascontrollerapp-linux.tar.gz
mv dist biogascontrollerapp-linux
cd biogascontrollerapp-linux/
else
echo "Please download the repo manually and execute the script inside the downloaded repo from GitHub releases"
exit 1
fi
fi
# We are now guaranteed to be in the base directory of the repo
# Set up venv if selected
if [[ "$use_venv" == "y" || "$use_venv" == "" ]]; then
python -m venv .venv
if [[ "$SHELL" == "fish" ]]; then
source ./.venv/bin/activate.fish
elif [[ "$SHELL" == "csh" ]]; then
source ./.venv/bin/activate.csh
else
source ./.venv/bin/activate
fi
if !command -v deactivate >/dev/null 2>&1; then
echo "Virtual environment could not be activated.
You may install the dependencies by changing to the biogascontrollerapp directory and running
pip install -r requirements.txt"
exit 1
fi
fi
pip install -r requirements.txt
echo "
==> Installation complete!
"

18
launch.sh Executable file
View File

@@ -0,0 +1,18 @@
#!/bin/sh
use_venv="y"
if [[ -f ./.venv/bin/activate ]]; then
if [[ "$SHELL" == "fish" ]]; then
source ./.venv/bin/activate.fish
elif [[ "$SHELL" == "csh" ]]; then
source ./.venv/bin/activate.csh
else
source ./.venv/bin/activate
fi
if !command -v deactivate >/dev/null 2>&1; then
echo "Virtual environment could not be activated. Trying to run anyway"
fi
fi
python biogascontrollerapp.py

187
lib/com.py Normal file
View File

@@ -0,0 +1,187 @@
from abc import ABC, abstractmethod
from typing import Optional
import serial
import struct
import serial.tools.list_ports
# The below class is abstract to have a consistent, targetable interface
# for both the real connection module and the simulation module
#
# If you are unaware of what classes are, you can mostly ignore the ComSuperClass
#
# For the interested, a quick rundown of what the benefits of doing it this way is:
# This class provides a way to have two wholly different implementations that have
# the same function interface (i.e. all functions take the same arguments)
#
# Another benefit of having classes is that we can pass a single instance around to
# various components and have one shared instance that all can modify, reducing some
# overhead.
#
# The actual implementation of most functions (called methods in OOP) are implemented
# in the Com class below.
class ComSuperClass(ABC):
def __init__(
self, baudrate: Optional[int] = 19200, filters: Optional[list[str]] = None
) -> None:
self._serial: Optional[serial.Serial] = None
self._filters = (
filters
if filters != None
else ["USB-Serial Controller", "Prolific USB-Serial Controller"]
)
self._port_override = ""
self._baudrate = baudrate if baudrate != None else 19200
self._err = None
def set_port_override(self, override: str) -> None:
"""Set the port override, to disable port search"""
if override != "" and override != "None":
self._port_override = override
def get_error(self) -> serial.SerialException | None:
return self._err
@abstractmethod
def get_comport(self) -> str:
pass
@abstractmethod
def connect(self) -> bool:
pass
@abstractmethod
def close(self) -> None:
pass
@abstractmethod
def receive(self, byte_count: int) -> bytes:
pass
@abstractmethod
def send(self, msg: str) -> None:
pass
@abstractmethod
def send_float(self, msg: float) -> None:
pass
# ┌ ┐
# │ Main Com Class Implementation │
# └ ┘
# Below you can find what you were most likely looking for. This is the implementation of the communication with the microcontroller.
# You may also be interested in the decoder.py and instructions.py file, as the decoding and the hooking / syncing process are
# implemented there. It is recommended that you do NOT read the test/com.py file, as that one is only there for simulation purposes
# and is much more complicated than this here, if you are not well versed with Python or are struggling with the basics
class Com(ComSuperClass):
def _connection_check(self) -> bool:
if self._serial == None:
return self._open()
if self._serial != None:
if not self._serial.is_open:
self._serial.open()
return True
else:
return False
def get_comport(self) -> str:
"""Find the comport the microcontroller has attached to"""
if self._port_override != "":
return self._port_override
# Catch all errors and simply return an empty string if search unsuccessful
try:
# Get an array of all used comports
ports = [comport.device for comport in serial.tools.list_ports.comports()]
# Filter for specific controller
for comport in ports:
for filter in self._filters:
if filter in comport:
return comport
except Exception as e:
self._err = e
return ""
def _open(self) -> bool:
"""Open the connection. Internal function, not to be called directly
Returns:
Boolean indicates if connection was successful or not
"""
# Get the com port the controller has connected to
comport = self.get_comport()
# Comport search returns empty string if search unsuccessful
if comport == "":
# Try to generate a new Serial object with the configuration of this class
# self._baudrate contains the baud rate and defaults to 19200
try:
self._serial = serial.Serial(comport, self._baudrate, timeout=5)
except serial.SerialException as e:
# If an error occurs, catch it, handle it and store the error
# for the UI and return False to indicate failed connection
self._err = e
return False
# Connection succeeded, return True
return True
else:
# Haven't found a comport
return False
def connect(self) -> bool:
"""Try to find a comport and connect to the microcontroller. Returns the success as a boolean"""
return self._connection_check()
def close(self) -> None:
"""Close the serial connection, if possible"""
if self._serial != None:
try:
self._serial.close()
except:
pass
def receive(self, byte_count: int) -> bytes:
"""Receive bytes from microcontroller over serial. Returns bytes. Might want to decode using functions from lib.decoder"""
# Check connection
self._connection_check()
# Ignore this boilerplate (extra code), the body of the if is the only thing important.
# The reason for the boilerplate is that the type checker will notice that self._serial can be
# None, thus showing errors.
if self._serial != None:
return self._serial.read(byte_count)
else:
raise Exception("ERR_CONNECTING")
def send(self, msg: str) -> None:
"""Send a string over serial connection. Will open a connection if none is available"""
# Check connection
self._connection_check()
# Ignore this boilerplate (extra code), the body of the if is the only thing important.
# The reason for the boilerplate is that the type checker will notice that self._serial can be
# None, thus showing errors.
if self._serial != None:
self._serial.write(msg.encode())
else:
raise Exception("ERR_CONNECTING")
def send_float(self, msg: float) -> None:
"""Send a float number over serial connection"""
# Check connection
self._connection_check()
# Ignore this boilerplate (extra code), the body of the if is the only thing important.
# The reason for the boilerplate is that the type checker will notice that self._serial can be
# None, thus showing errors.
if self._serial != None:
self._serial.write(bytearray(struct.pack(">f", msg))[0:3])
else:
raise Exception("ERR_CONNECTING")

144
lib/config.py Normal file
View File

@@ -0,0 +1,144 @@
import configparser
from typing import List
# Load the config
config = configparser.ConfigParser()
config.read("./config.ini")
global first_error
first_error = True
global is_verbose
is_verbose = True
def set_verbosity(verbose: bool):
global is_verbose
is_verbose = verbose
print("\n", "-" * 20, "\nValidating configuration...\n")
def str_to_bool(val: str) -> bool | None:
"""Convert a string to boolean, converting "True" and "true" to True, same for False
Args:
val: The value to try to convert
Returns:
Returns either a boolean if conversion was successful, or None if not a boolean
"""
return {"True": True, "true": True, "False": False, "false": False}.get(val, None)
def read_config(
key_0: str,
key_1: str,
default: str,
valid_entries: List[str] = [],
type_to_validate: str = "",
) -> str:
"""Read the configuration, report potential configuration issues and validate each entry
Args:
key_0: The first key (top level)
key_1: The second key (where the actual key-value pair is)
default: The default value to return if the check fails
valid_entries: [Optiona] The entries that are valid ones to check against
type_to_validate: [Optional] Data type to validate
Returns:
[TODO:return]
"""
# Try loading the keys
tmp = {}
try:
tmp = config[key_0]
except KeyError:
print_config_error(key_0, key_1, "", default, "unknown", index=1)
return default
value = ""
try:
value = tmp[key_1]
except KeyError:
print_config_error(key_0, key_1, "", default, "unknown")
return default
if len(value) == 0:
print_config_error(key_0, key_1, value, default, "not_empty")
# Validate input
if type_to_validate != "":
# Need to validate
if type_to_validate == "int":
try:
int(value)
except ValueError:
print_config_error(key_0, key_1, value, default, "int")
return default
if type_to_validate == "float":
try:
float(value)
except ValueError:
print_config_error(key_0, key_1, value, default, "float")
return default
if type_to_validate == "bool":
if str_to_bool(value) == None:
print_config_error(key_0, key_1, value, default, "bool")
return default
if len(valid_entries) > 0:
# Need to validate the names
try:
valid_entries.index(value)
except ValueError:
print_config_error(
key_0, key_1, value, default, "oneof", valid_entries=valid_entries
)
return default
return value
def print_config_error(
key_0: str,
key_1: str,
value: str,
default: str,
expected: str,
valid_entries: List[str] = [],
msg: str = "",
index: int = 1,
):
"""Print configuration errors to the shell
Args:
key_0: The first key (top level)
key_1: The second key (where the actual value is to be found)
expected: The data type expected. If unknown key, set to "unknown" and set index; If should be one of, use "oneof" and set valid_entries list
msg: The message to print
index: The index in the chain (i.e. if key_0 or key_1)
"""
if not is_verbose:
return
print(f" ==> Using default setting ({default}) for {key_0}.{key_1}")
if expected == "unknown":
# The field was unknown
print(f' -> Unknown field "{key_0 if index == 0 else key_1}"')
elif expected == "oneof":
print(
f' -> Invalid name "{value}". Has to be one of', ", ".join(valid_entries)
)
elif expected == "not_empty":
print(" -> Property is unexpectedly None")
elif expected == "bool":
print(f' -> Boolean property expected, but instead found "{value}".')
else:
print(f" -> Expected a config option of type {expected}.")
if msg != "":
print(msg)

24
lib/decoder.py Normal file
View File

@@ -0,0 +1,24 @@
import struct
# Decoder to decode various sent values from the microcontroller
class Decoder:
# Decode an ascii character
def decode_ascii(self, value: bytes) -> str:
try:
return value.decode()
except:
return "Error"
# Decode a float (6 bits)
def decode_float(self, value: bytes) -> float:
return struct.unpack(">f", bytes.fromhex(str(value, "ascii") + "00"))[0]
# Decode a float, but with additional offsets
def decode_float_long(self, value: bytes) -> float:
return struct.unpack(">f", bytes.fromhex(str(value, "ascii") + "0000"))[0]
# Decode an int
def decode_int(self, value: bytes) -> int:
# return int.from_bytes(value, 'big')
return int(value, base=16)

138
lib/instructions.py Normal file
View File

@@ -0,0 +1,138 @@
from lib.com import ComSuperClass
import lib.decoder
import time
decoder = lib.decoder.Decoder()
# Class that supports sending instructions to the microcontroller,
# as well as hooking to data stream according to protocol
class Instructions:
def __init__(self, com: ComSuperClass) -> None:
self._com = com
# Helper method to hook to the data stream according to protocol.
# You can specify the sequence that the program listens to to sync up,
# as an array of strings, that should each be of length one and only contain
# ascii characters
def hook(self, instruction: str, sequence: list[str]) -> bool:
# Add protection: If we cannot establish connection, refuse to run
if not self._com.connect():
return False
# Send instruction to microcontroller to start hooking process
self._com.send(instruction)
# Record start time to respond to timeout
start = time.time()
# The pointer below points to the element in the array which is the next expected character to be received
pointer = 0
# Simply the length of the sequence, since it is both cheaper and cleaner to calculate it once
sequence_max = len(sequence)
# Only run for a limited amount of time
while time.time() - start < 5:
# Receive and decode a single byte and decode as ASCII
data = decoder.decode_ascii(self._com.receive(1))
if data == sequence[pointer]:
# Increment the pointer (move to next element in the List)
pointer += 1
else:
# Jump back to start
pointer = 0
# If the pointer has reached the end of the sequence, return True, as now the hook was successful
if pointer == sequence_max:
return True
# If we time out, which is the only way in which this code can be reached, return False
return False
# Used to hook to the main data stream, as that hooking mechanism is different
def hook_main(self) -> bool:
# Record start time to respond to timeout
start = time.time()
# Wait to find a CR character (enter)
char = decoder.decode_ascii(self._com.receive(1))
while char != "\n":
# Check for timeout
if time.time() - start > 3:
return False
# Set the next character by receiving and decoding it as ASCII
char = decoder.decode_ascii(self._com.receive(1))
# Store the position in the hooking process
state = 0
distance = 0
# While we haven't timed out and have not reached the last state execute
# The last state indicates that the sync was successful
while time.time() - start < 5 and state < 3:
# Receive the next char and decode it as ASCII
char = decoder.decode_ascii(self._com.receive(1))
# The character we look for when syncing is Space (ASCII char 32 (decimal))
# It is sent every 4 bits. If we have received 3 with the correct distance from
# the previous in a row, we are synced
if char == " ":
if distance == 4:
state += 1
distance = 0
else:
if distance > 4:
state = 0
distance = 0
else:
distance += 1
# Read 5 more bits to correctly sync up
self._com.receive(5)
return state == 3
# Private helper method to transmit data using the necessary protocols
def _change_data(
self,
instruction: str,
readback: list[str],
data: list[float],
readback_length: int,
) -> None:
# Hook to stream
if self.hook(instruction, readback):
# Transmit data
while len(data) > 0:
# If we received data back, we can send more data, i.e. from this we know
# the controller has received the data
# If not, we close the connection and create an exception
if self._com.receive(readback_length) != "":
self._com.send_float(data.pop(0))
else:
self._com.close()
raise Exception(
"Failed to transmit data. No response from controller"
)
self._com.close()
else:
self._com.close()
raise ConnectionError(
"Failed to hook to controller data stream. No fitting response received"
)
# Abstraction of the _change_data method specifically designed to change the entire config
def change_config(self, new_config: list[float]) -> None:
try:
self._change_data("PR", ["\n", "P", "R", "\n"], new_config, 3)
except Exception as e:
raise e
# Abstraction of the _change_data method specifically designed to change only the configured temperature
def change_temperature(self, temperatures: list[float]) -> None:
try:
self._change_data("PT", ["\n", "P", "T", "\n"], temperatures, 3)
except Exception as e:
raise e

237
lib/test/com.py Normal file
View File

@@ -0,0 +1,237 @@
"""
Library to be used in standalone mode (without microcontroller, for testing functionality)
It simulates the behviour of an actual microcontroller being connected
"""
from typing import List, Optional
import queue
import random
import time
import struct
from lib.com import ComSuperClass
# ┌ ┐
# │ Testing Module For Com │
# └ ┘
# This file contains a Com class that can be used to test the functionality
# even without a microcontroller. It is not documented in a particularly
# beginner-friendly way, nor is the code written with beginner-friendliness
# in mind. It is the most complicated piece of code of the entire application
# ────────────────────────────────────────────────────────────────────
# All double __ prefixed properties and methods are not available in the actual impl
instruction_lut: dict[str, list[str]] = {
"PR": ["\n", "P", "R", "\n"],
"PT": ["\n", "P", "T", "\n"],
"RD": ["\n", "R", "D", "\n"],
"NM": ["\n", "N", "M", "\n"],
"FM": ["\n", "F", "M", "\n"],
}
reconfig = ["a", "b", "c", "t"]
class SimulationError(Exception):
pass
class SensorConfig:
a: float
b: float
c: float
t: float
def __init__(
self, a: float = 20, b: float = 30, c: float = 10, t: float = 55
) -> None:
self.a = a
self.b = b
self.c = c
self.t = t
class Com(ComSuperClass):
def __init__(
self, fail_sim: int, baudrate: int = 19200, filters: Optional[list[str]] = None
) -> None:
# Calling the constructor of the super class to assign defaults
print("\n\nWARNING: Using testing library for communication!\n\n")
super().__init__(baudrate, filters)
# Initialize queue with values to be sent on call of recieve
self.__simulated_data: queue.Queue[bytes] = queue.Queue()
self.__simulated_data_remaining = 0
self.__reconf_sensor = 0
self.__reconf_step = 0
self.__fail_sim = fail_sim
self.__config: List[SensorConfig] = [
SensorConfig(),
SensorConfig(),
SensorConfig(),
SensorConfig(),
]
# Initially, we are in normal mode (which leads to slower data intervals)
self.__mode = "NM"
def set_port_override(self, override: str) -> None:
"""Set the port override, to disable port search"""
self._port_override = override
def get_comport(self) -> str:
return "Sim" if self._port_override == "" else self._port_override
def connect(self) -> bool:
# Randomly return false in 1 in fail_sim ish cases
if random.randint(0, self.__fail_sim) == 0:
print("Simulating error to connect")
return False
return True
def close(self) -> None:
pass
def receive(self, byte_count: int) -> bytes:
data = []
# If queue is too short, refill it
if self.__simulated_data_remaining < byte_count:
self.__fill_queue()
for _ in range(byte_count):
if self.__mode == "NM":
time.sleep(0.005)
try:
data.append(self.__simulated_data.get_nowait())
self.__simulated_data_remaining -= 1
except Exception as e:
print("ERROR: Simulation could not continue")
raise SimulationError(
"Simulation encountered an error with the simulation queue. The error encountered: \n"
+ str(e)
)
return b"".join(data)
def send(self, msg: str) -> None:
# Using LUT to reference
readback = instruction_lut.get(msg)
if readback != None:
for i in range(len(readback)):
self.__add_ascii_char(readback[i])
if msg == "RD":
self.__set_read_data_data()
elif msg == "PR":
self.__reconf_sensor = 0
self.__reconf_step = 0
self.__add_ascii_char("a")
self.__add_ascii_char("0")
self.__add_ascii_char("\n")
def __set_read_data_data(self) -> None:
# Send data for all four sensors
for i in range(4):
self.__add_float_as_hex(self.__config[i].a)
self.__add_ascii_char(" ")
self.__add_float_as_hex(self.__config[i].b)
self.__add_ascii_char(" ")
self.__add_float_as_hex(self.__config[i].c)
self.__add_ascii_char(" ")
self.__add_float_as_hex(self.__config[i].t)
self.__add_ascii_char("\n")
def send_float(self, msg: float) -> None:
if self.__reconf_step == 0:
self.__config[self.__reconf_sensor].a = msg
elif self.__reconf_step == 1:
self.__config[self.__reconf_sensor].b = msg
elif self.__reconf_step == 2:
self.__config[self.__reconf_sensor].c = msg
elif self.__reconf_step == 3:
self.__config[self.__reconf_sensor].t = msg
if self.__reconf_step == 3:
self.__reconf_step = 0
self.__reconf_sensor += 1
else:
self.__reconf_step += 1
if self.__reconf_sensor == 4:
return
self.__add_ascii_char(reconfig[self.__reconf_step])
self.__add_ascii_char(str(self.__reconf_sensor))
self.__add_ascii_char("\n")
def __fill_queue(self):
# Simulate a full cycle
for _ in range(4):
self.__add_integer_as_hex(self.__generate_random_int(200))
self.__simulated_data.put(bytes(" ", "ascii"))
self.__add_float_as_hex(self.__generate_random_float(50))
self.__simulated_data.put(bytes(" ", "ascii"))
self.__simulated_data_remaining += 2
for _ in range(3):
self.__add_integer_as_hex(self.__generate_random_int(65535))
self.__simulated_data.put(bytes(" ", "ascii"))
self.__simulated_data_remaining += 1
self.__add_integer_as_hex(self.__generate_random_int(65535))
self.__simulated_data.put(bytes("\n", "ascii"))
self.__simulated_data_remaining += 1
def __generate_random_int(self, max: int) -> int:
return random.randint(0, max)
def __generate_random_float(self, max: int) -> float:
return random.random() * max
def __add_ascii_char(self, ascii_string: str):
self.__simulated_data.put(ord(ascii_string).to_bytes(1))
self.__simulated_data_remaining += 1
def __add_two_byte_value(self, c: int):
"""putchhex
Args:
c: The char (as integer)
"""
# First nibble (high)
high_nibble = (c >> 4) & 0x0F
high_char = chr(high_nibble + 48 if high_nibble < 10 else high_nibble + 55)
self.__simulated_data.put(high_char.encode())
# Second nibble (low)
low_nibble = c & 0x0F
low_char = chr(low_nibble + 48 if low_nibble < 10 else low_nibble + 55)
self.__simulated_data.put(low_char.encode())
self.__simulated_data_remaining += 2
def __add_integer_as_hex(self, c: int):
"""Writes the hexadecimal representation of the high and low bytes of integer `c` (16-bit) to the simulated serial port."""
if not (0 <= c <= 0xFFFF):
raise ValueError("Input must be a 16-bit integer (065535)")
# Get high byte (most significant byte)
hi_byte = (c >> 8) & 0xFF
# Get low byte (least significant byte)
lo_byte = c & 0xFF
# Call putchhex for the high byte and low byte
self.__add_two_byte_value(hi_byte)
self.__add_two_byte_value(lo_byte)
def __add_float_as_hex(self, f: float):
"""Converts a float to its byte representation and sends the bytes using putchhex."""
# Pack the float into bytes (IEEE 754 format)
packed = struct.pack(">f", f) # Big-endian format (network byte order)
# Unpack the bytes into 3 bytes: high, mid, low
high, mid, low = packed[0], packed[1], packed[2]
# Send each byte as hex
self.__add_two_byte_value(high)
self.__add_two_byte_value(mid)
self.__add_two_byte_value(low)

View File

@@ -1,61 +1,92 @@
import csv import csv
import numpy as np import numpy as np
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
go = 0
n = int(input("Aktuelle Sondennummer: ")) n = int(input("Sensor number to be printed: "))
try: file = ""
imp = open("Sonden2021.csv", "r")
go = 1
except FileNotFoundError:
print("Failed to open file (non-existent or corrupted?)")
go = 0
if go == 1:
reader = csv.reader(imp, delimiter=',') def generate_plot():
rohdaten = list(reader) # Read data using the CSV library
rohdaten.sort(key=lambda imp: float(imp[2])) reader = csv.reader(file, delimiter=",")
lenght = len(rohdaten)
# Create a list from the data
data = list(reader)
# Sort the list using a lambda sort descriptor
# A lambda function is an anonymous function (= an unnamed function),
# which makes it convenient. A sort descriptor is a function that
# (usually, but not here) returns a value indicating which of two values
# come before or after in the ordering.
# Here, instead we simply return a floating point value for each data point
data.sort(key=lambda data_point: float(data_point[2]))
# Store the x and y coordinates in two arrays
x = [] x = []
y = [] y = []
for i in range(lenght): for _ in range(len(data)):
extract = rohdaten.pop(0) # Extract the data point
sondennummer = int(extract.pop(0)) data_point = data.pop(0)
if sondennummer == n: sensor = int(data_point.pop(0))
ye = extract.pop(0) if sensor == n:
xe = extract.pop(0) y.append(float(data_point.pop(0)))
y.append(float(ye)) x.append(float(data_point.pop(0)))
x.append(float(xe))
# Use Numpy's polyfit function to fit a 2nd degree polynomial to the points using quadratic regression
# This function returns an array with the coefficients
fit = np.polyfit(x, y, 2) fit = np.polyfit(x, y, 2)
print(fit)
# The formula to output to the plot
formula = f"F(U) = {round(float(fit[0]), 4)}U^2+{round(float(fit[1]), 4)}U+{round(float(fit[2]), 4)}" formula = f"F(U) = {round(float(fit[0]), 4)}U^2+{round(float(fit[1]), 4)}U+{round(float(fit[2]), 4)}"
fit_fn = np.poly1d(fit) # Create a fit function from the previously determined coefficients
fit_fn = np.poly1d(fit) # Returns a function that takes a list of x-coordinate as argument
# Plot the line on the graph
plt.plot(x, fit_fn(x), color="BLUE", label="T(U)") plt.plot(x, fit_fn(x), color="BLUE", label="T(U)")
plt.scatter(x, y, color="MAGENTA", marker="o", label="Messsdaten") # Scatter Plot the data points that we have
plt.ylabel("Temperatur") plt.scatter(x, y, color="MAGENTA", marker="o", label="Data")
plt.xlabel("Spannung")
titel = 'Temperatursonde MCP9701A Nummer: {}'.format(n) # Label the graph
plt.title(titel) plt.ylabel("Temperature")
plt.axis([0.6, 2, 15, 70]) plt.xlabel("Voltage")
plt.title("Sensor MCP9701A #{}".format(n))
# Scale the axis appropriately
plt.axis((0.6, 2.0, 15.0, 70.0))
# Print a legend and set the graph to be annotated
plt.legend(loc="lower right") plt.legend(loc="lower right")
plt.annotate(formula, xy=(0.85, 60)) plt.annotate(formula, xy=(0.85, 60))
# Enable the background grid
plt.grid(True) plt.grid(True)
# Finally, show the graph
plt.show() plt.show()
saveit = input("Soll der Graph gespeichert werden? (y/n) ").lower() # Get user input whether to save the plot or not
saveit = input("Do you wish to save the plot? (y/N) ").lower()
if saveit == "y": if saveit == "y":
plt.savefig("Sonde"+str(n)+".png") # Save the plot as Sensor[Number] (e.g. Sensor9) as png, pdf and svg
plt.savefig("Sonde"+str(n)+".pdf", format="pdf") plt.savefig("Sensor" + str(n) + ".png")
plt.savefig("Sonde"+str(n)+".svg", format="svg") plt.savefig("Sensor" + str(n) + ".pdf", format="pdf")
print("saved images") plt.savefig("Sensor" + str(n) + ".svg", format="svg")
print("==> Images saved")
else: else:
print("discarded images") print("==> Images discarded")
# Since we have defined a function above as a function, this here is executed first
filename = input("Please enter a file path to the csv file to be plotted: ")
# Try to open the file
try:
file = open(filename, "r")
generate_plot()
except FileNotFoundError:
print("Failed to open file (non-existent or corrupted?)")

View File

@@ -4,29 +4,36 @@ import matplotlib.pyplot as plt
import csv import csv
import os import os
# Get user input for various data
path = input("Path to csv-file to be plotted: ") path = input("Path to csv-file to be plotted: ")
print("For the below, it is recommended to enter data in this format: yyyy-mm-dd-hh-mm")
date = input("Date & time at which the measurement was taken (approx.): ") date = input("Date & time at which the measurement was taken (approx.): ")
group = input("Group-name: ") group = input("Group-name: ")
saveit = input("Should the graph be saved? (y/n) ").lower() saveit = input("Should the graph be saved? (y/n) ").lower()
imp = open(path, "r") imp = open(path, "r")
reader = csv.reader(imp, delimiter=',') reader = csv.reader(imp, delimiter=",")
rohdaten = list(reader) data = list(reader)
lenght = len(rohdaten)
x = [] x = []
y = [] y = []
for i in range(lenght): for i in range(len(data)):
extract = rohdaten.pop(0) # Extract the data
extract = data.pop(0)
x.append(float(extract.pop(0))) x.append(float(extract.pop(0)))
y.append(float(extract.pop(0))) y.append(float(extract.pop(0)))
# Set up plot
plt.plot(x, y, color="MAGENTA") plt.plot(x, y, color="MAGENTA")
plt.xlabel("Time") plt.xlabel("Time")
plt.ylabel("Voltage") plt.ylabel("Voltage")
title = f"GC - Biogasanlage {date}"
plt.title(title) plt.title(f"GC - Biogasanlage {date}")
plt.grid(True) plt.grid(True)
if saveit == "y":
# Check if user wants to save the image
if saveit == "n":
print("didn't save images")
else:
pos = 0 pos = 0
for letter in path[::-1]: for letter in path[::-1]:
if letter == "/": if letter == "/":
@@ -40,11 +47,7 @@ if saveit == "y":
os.mkdir(save_path) os.mkdir(save_path)
except FileExistsError: except FileExistsError:
pass pass
plt.savefig(save_path) plt.savefig(f"{save_path}/GC-{date}-{group}.png")
os.rename(f"{save_path}/.png", f"{save_path}/GC-{date}-{group}.png")
print(f"saved images to {save_path}") print(f"Saved images to {save_path}")
else:
print("didn't save images")
plt.show() plt.show()

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
kivy[base]==2.3.1
kivymd==1.1.1
pyserial==3.5