diff --git a/biogascontrollerapp/lib/test/com.py b/biogascontrollerapp/lib/test/com.py index b6a0d65..8f30ac6 100644 --- a/biogascontrollerapp/lib/test/com.py +++ b/biogascontrollerapp/lib/test/com.py @@ -7,6 +7,7 @@ from typing import Optional import queue import random import serial +import time from lib.com import ComSuperClass @@ -17,23 +18,28 @@ from lib.com import ComSuperClass # All double __ prefixed properties and methods are not available in the actual one -instruction_lut = { - "PR": "\nPR\n", - "PT": "\nPT\n", - "RD": "\nRD\n", - "NM": "\nNM\n", - "FM": "\nFM\n", +instruction_lut: dict[str, list[str]] = { + "PR": ["\n", "P", "R", "\n"], + "PT": ["\n", "P", "T", "\n"], + "RD": ["\n", "R", "D", "\n"], + "NM": ["\n", "N", "M", "\n"], + "FM": ["\n", "F", "M", "\n"], } +class SimulationError(Exception): + pass + + class Com(ComSuperClass): def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None: # Calling the constructor of the super class to assign defaults - print("WARNING: Using testing library for communication!") + print("\n\nWARNING: Using testing library for communication!\n\n") super().__init__(baudrate, filters); # Initialize queue with values to be sent on call of recieve self.__simulated_data: queue.Queue[int] = queue.Queue() + self.__simulated_data_remaining = 0 # Keep track of the number of bytes sent to fulfil protocol self.__bytes_sent: int = 0 @@ -62,19 +68,35 @@ class Com(ComSuperClass): pass def receive(self, byte_count: int) -> bytes: - # TODO: Make it return simulated data + # TODO: Make it return simulated data -> Refill if queue length is smaller than requested byte_count data = [] + # If queue is too short, refill it + if self.__simulated_data_remaining < byte_count: + self.__fill_queue() + for i in range(byte_count): - data.append(self.__simulated_data.get_nowait()) - return bytes("A", "ascii") + if self.__mode == "NM": + time.sleep( 0.001 ); + try: + data.append(self.__simulated_data.get_nowait()) + except Exception as e: + print("ERROR: Simulation could not continue") + raise SimulationError("Simulation encountered an error with the simulation queue. The error encountered: \n" + str(e)) + return bytes(data) def send(self, msg: str) -> None: # TODO: Use LUT to find what should be added to the queue for read # Using LUT to reference - pass + readback = instruction_lut.get(msg) + if readback != None: + for i in range(len(readback)): + self.__simulated_data.put(ord(readback[i])) def send_float(self, msg: float) -> None: pass - def __generate_random_value(self, precision: int) -> bytes: - return bytes(str(round(random.random() * precision) / precision), "ascii") + def __add_random_float(self): + pass + + def __fill_queue(self): + pass