From 071cd14477648237766bd9eef069669e87214147 Mon Sep 17 00:00:00 2001 From: Michael Woods Date: Sun, 16 Feb 2025 11:11:53 -0500 Subject: [PATCH] Added some connection client locks to make sure that nothing interrupts a connection send/receive routine. --- src/packetserver/client/__init__.py | 39 ++++++++++++++++++----------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/src/packetserver/client/__init__.py b/src/packetserver/client/__init__.py index dbd6cd9..f5bfb29 100644 --- a/src/packetserver/client/__init__.py +++ b/src/packetserver/client/__init__.py @@ -7,6 +7,7 @@ import ax25 import logging import signal import time +from threading import Lock from msgpack.exceptions import OutOfData from typing import Callable, Self, Union, Optional from traceback import format_exc @@ -23,6 +24,8 @@ class Client: self.callsign = client_callsign self.app = pe.app.Application() self.started = False + self._connection_locks = {} + self.lock_locker = Lock() signal.signal(signal.SIGINT, self.exit_gracefully) signal.signal(signal.SIGTERM, self.exit_gracefully) @@ -85,6 +88,9 @@ class Client: raise RuntimeError("Must start client before creating connections.") if not ax25.Address.valid_call(dest): raise ValueError(f"Provided destination callsign '{dest}' is invalid.") + with self.lock_locker: + if dest.upper() not in self._connection_locks: + self._connection_locks[dest.upper()] = Lock() conn = self.connection_callsign(dest.upper()) if conn is not None: return conn @@ -103,20 +109,25 @@ class Client: if conn.state.name != "CONNECTED": raise RuntimeError("Connection is not connected.") logging.debug(f"Sending request {req}") - conn.send_data(req.pack()) - cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout) - while datetime.datetime.now() < cutoff_date: - if conn.state.name != "CONNECTED": - logging.error(f"Connection {conn} disconnected.") - return None - try: - unpacked = conn.data.unpack() - except: - time.sleep(.1) - continue - msg = Message.partial_unpack(unpacked) - return Response(msg) - return None + dest = conn.remote_callsign.upper() + with self.lock_locker: + if dest not in self._connection_locks: + self._connection_locks[dest] = Lock() + with self._connection_locks[dest]: + conn.send_data(req.pack()) + cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout) + while datetime.datetime.now() < cutoff_date: + if conn.state.name != "CONNECTED": + logging.error(f"Connection {conn} disconnected.") + return None + try: + unpacked = conn.data.unpack() + except: + time.sleep(.1) + continue + msg = Message.partial_unpack(unpacked) + return Response(msg) + return None def send_receive_callsign(self, req: Request, callsign: str, timeout: int = 300) -> Optional[Response]: return self.send_and_receive(req, self.connection_for(callsign), timeout=timeout)