diff --git a/biogascontrollerapp/lib/decoder.py b/biogascontrollerapp/lib/decoder.py index b8f1b2f..dc37975 100644 --- a/biogascontrollerapp/lib/decoder.py +++ b/biogascontrollerapp/lib/decoder.py @@ -14,4 +14,5 @@ class Decoder: return struct.unpack('>f', bytes.fromhex(str(value, 'ascii') + '0000'))[0] def decode_int(self, value: bytes) -> int: + # return int.from_bytes(value, 'big') return int(value, base=16) diff --git a/biogascontrollerapp/lib/instructions.py b/biogascontrollerapp/lib/instructions.py index 4b2bbc2..0c96eef 100644 --- a/biogascontrollerapp/lib/instructions.py +++ b/biogascontrollerapp/lib/instructions.py @@ -26,10 +26,7 @@ class Instructions: return False # Send instruction to microcontroller to start hooking process - # If instruction is an empty string, do not send instruction - - if instruction != "": - self._com.send(instruction) + self._com.send(instruction) # Record start time to respond to timeout start = time.time() @@ -44,7 +41,7 @@ class Instructions: while time.time() - start < 5: # If the decoded ascii character is equal to the next expected character, move pointer right by one # If not, jump back to start - if (decoder.decode_ascii(self._com.receive(1))) == sequence[pointer]: + if decoder.decode_ascii(self._com.receive(1)) == sequence[pointer]: pointer += 1 else: pointer = 0 @@ -56,6 +53,39 @@ class Instructions: # If we time out, which is the only way in which this code can be reached, return False return False + # Used to hook to the main data stream, as that hooking mechanism is differen + def hook_main(self) -> bool: + # Record start time to respond to timeout + start = time.time() + + # Wait to find a CR character (enter) + char = decoder.decode_ascii(self._com.receive(1)) + while char != "\n": + if time.time() - start > 3: + return False + char = decoder.decode_ascii(self._com.receive(1)) + + # Store the position in the hooking process + state = 0 + distance = 0 + + while time.time() - start < 5 and state < 3: + char = decoder.decode_ascii(self._com.receive(1)) + if char == " ": + if distance == 4: + state += 1 + distance = 0 + else: + if distance > 4: + state = 0 + distance = 0 + else: + distance += 1 + + self._com.receive(5) + + return state == 3 + # Private helper method to transmit data using the necessary protocols def _change_data( self, diff --git a/biogascontrollerapp/lib/test/com.py b/biogascontrollerapp/lib/test/com.py index 8f30ac6..9b4ca9e 100644 --- a/biogascontrollerapp/lib/test/com.py +++ b/biogascontrollerapp/lib/test/com.py @@ -8,6 +8,7 @@ import queue import random import serial import time +import struct from lib.com import ComSuperClass @@ -32,18 +33,17 @@ class SimulationError(Exception): class Com(ComSuperClass): - def __init__(self, baudrate: int = 19200, filters: Optional[list[str]] = None) -> None: + def __init__( + self, baudrate: int = 19200, filters: Optional[list[str]] = None + ) -> None: # Calling the constructor of the super class to assign defaults print("\n\nWARNING: Using testing library for communication!\n\n") - super().__init__(baudrate, filters); + super().__init__(baudrate, filters) # Initialize queue with values to be sent on call of recieve - self.__simulated_data: queue.Queue[int] = queue.Queue() + self.__simulated_data: queue.Queue[bytes] = queue.Queue() self.__simulated_data_remaining = 0 - # Keep track of the number of bytes sent to fulfil protocol - self.__bytes_sent: int = 0 - # Initially, we are in normal mode (which leads to slower data intervals) self.__mode = "NM" @@ -68,35 +68,73 @@ class Com(ComSuperClass): pass def receive(self, byte_count: int) -> bytes: - # TODO: Make it return simulated data -> Refill if queue length is smaller than requested byte_count data = [] # If queue is too short, refill it if self.__simulated_data_remaining < byte_count: self.__fill_queue() - for i in range(byte_count): + for _ in range(byte_count): if self.__mode == "NM": - time.sleep( 0.001 ); + time.sleep(0.001) try: data.append(self.__simulated_data.get_nowait()) + self.__simulated_data_remaining -= 1 except Exception as e: print("ERROR: Simulation could not continue") - raise SimulationError("Simulation encountered an error with the simulation queue. The error encountered: \n" + str(e)) - return bytes(data) + raise SimulationError( + "Simulation encountered an error with the simulation queue. The error encountered: \n" + + str(e) + ) + return b''.join(data) def send(self, msg: str) -> None: - # TODO: Use LUT to find what should be added to the queue for read # Using LUT to reference readback = instruction_lut.get(msg) if readback != None: for i in range(len(readback)): - self.__simulated_data.put(ord(readback[i])) + self.__simulated_data.put(bytes(readback[i], "ascii")) + if msg == "RD": + # Handle ReadData readback + # self.__simulated_data.put(ord("")) + pass def send_float(self, msg: float) -> None: - pass - - def __add_random_float(self): - pass + # Encode float as 8 bytes (64 bit) + ba = struct.pack("d", msg) + for byte in ba: + self.__simulated_data.put(byte.to_bytes()) def __fill_queue(self): - pass + # Add some dummy data. The data is randomized and is *not* + # an accurate simulation of what the microcontroller will return + # It only serves to check if the protocol handling works as expected + for _ in range(4): + self.__add_to_queue(self.__generate_int_as_bytes(200)) + self.__simulated_data.put(bytes(" ", "ascii")) + self.__add_to_queue(self.__generate_float_as_bytes(size = 6)) + self.__simulated_data.put(bytes(" ", "ascii")) + self.__simulated_data_remaining += 2 + for _ in range(3): + self.__add_to_queue(self.__generate_int_as_bytes(65535)) + self.__simulated_data.put(bytes(" ", "ascii")) + self.__add_to_queue(self.__generate_int_as_bytes(65535)) + self.__simulated_data.put(bytes("\n", "ascii")) + self.__simulated_data_remaining += 4 + print("Length:", self.__simulated_data_remaining) + + def __generate_int_as_bytes(self, upper_limit: int) -> list[bytes]: + byte_array = random.randint(0, upper_limit).to_bytes(4, "big") + return [byte.to_bytes() for byte in byte_array] + + def __generate_float_as_bytes(self, upper_limit: int = 200, size: int = 4) -> list[bytes]: + random_float = random.random() * upper_limit + byte_data = struct.pack(">f", random_float) + data = [byte.to_bytes() for byte in byte_data] + for _ in range(size - len(data)): + data.append(b'\x00') + return data + + def __add_to_queue(self, data: list[bytes]): + for value in data: + self.__simulated_data_remaining += 1 + self.__simulated_data.put(value)