Skip to content

Commit d8b8255

Browse files
add asynchronous response capabilities to the uart (OpenwaterHealth#75)
* added asynchronous response capabilities to the uart * more explicitly catching the value error when parsing
1 parent 15f8e49 commit d8b8255

1 file changed

Lines changed: 44 additions & 5 deletions

File tree

src/openlifu/io/LIFUUart.py

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import asyncio
22
import logging
3+
import queue
34
import threading
45
import time
56

67
import serial
78
import serial.tools.list_ports
89

9-
from openlifu.io.LIFUConfig import OW_ACK, OW_CMD_NOP
10+
from openlifu.io.LIFUConfig import OW_ACK, OW_CMD_NOP, OW_ERROR, OW_RESP
1011
from openlifu.io.LIFUSignal import LIFUSignal
1112

1213
# Packet structure constants
@@ -182,6 +183,8 @@ def __init__(self, vid, pid, baudrate=921600, timeout=10, align=0, desc="VCP", d
182183

183184
if async_mode:
184185
self.loop = asyncio.get_event_loop()
186+
self.response_queues = {} # Dictionary to map packet IDs to response queues
187+
self.response_lock = threading.Lock() # Lock for thread-safe access to response_queues
185188

186189
def connect(self):
187190
"""Open the serial port."""
@@ -295,7 +298,7 @@ def _read_data(self, timeout=20):
295298
data = self.demo_responses.pop(0)
296299
log.info("Demo mode: Simulated data received: %s", data)
297300
self.signal_data_received.emit(self.descriptor, "Demo Response")
298-
time.sleep(1) # Simulate delay (1 second)
301+
time.sleep(10) # Simulate delay (10 second)
299302
return
300303

301304
# In async mode, run the reading loop in a thread
@@ -305,7 +308,24 @@ def _read_data(self, timeout=20):
305308
data = self.serial.read(self.serial.in_waiting)
306309
self.read_buffer.extend(data)
307310
log.info("Data received on %s: %s", self.descriptor, data)
308-
self.signal_data_received.emit(self.descriptor, "Command Response")
311+
# Attempt to parse a complete packet from read_buffer.
312+
try:
313+
# Note: Depending on your protocol, you might need to check for start/end bytes
314+
# and possibly handle partial packets.
315+
packet = UartPacket(buffer=bytes(self.read_buffer))
316+
# Clear the buffer after a successful parse.
317+
self.read_buffer = []
318+
if self.asyncMode:
319+
with self.response_lock:
320+
# Check if a queue is waiting for this packet ID.
321+
if packet.id in self.response_queues:
322+
self.response_queues[packet.id].put(packet)
323+
else:
324+
log.warning("Received an unsolicited packet with ID %d", packet.id)
325+
else:
326+
self.signal_data_received.emit(self.descriptor, packet)
327+
except ValueError as ve:
328+
log.error("Error parsing packet: %s", ve)
309329
else:
310330
time.sleep(0.05) # Brief sleep to avoid a busy loop
311331
except serial.SerialException as e:
@@ -405,10 +425,29 @@ def send_packet(self, id=None, packetType=OW_ACK, command=OW_CMD_NOP, addr=0, re
405425

406426
self._tx(packet)
407427

408-
if not self.running:
428+
if not self.asyncMode:
409429
return self.read_packet(timeout=timeout)
410430
else:
411-
return None
431+
response_queue = queue.Queue()
432+
with self.response_lock:
433+
self.response_queues[id] = response_queue
434+
435+
try:
436+
# Wait for a response that matches the packet ID.
437+
response = response_queue.get(timeout=timeout)
438+
# Optionally, check that the response has the expected type and command.
439+
if response.packet_type == OW_RESP and response.command == command:
440+
return response
441+
else:
442+
log.error("Received unexpected response: %s", response)
443+
return response
444+
except queue.Empty:
445+
log.error("Timeout waiting for response to packet ID %d", id)
446+
return None
447+
finally:
448+
with self.response_lock:
449+
# Clean up the queue entry regardless of outcome.
450+
self.response_queues.pop(id, None)
412451

413452
except ValueError as ve:
414453
log.error("Validation error in send_packet: %s", ve)

0 commit comments

Comments
 (0)