Added some connection client locks to make sure that nothing interrupts a connection send/receive routine.
This commit is contained in:
@@ -7,6 +7,7 @@ import ax25
|
|||||||
import logging
|
import logging
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
|
from threading import Lock
|
||||||
from msgpack.exceptions import OutOfData
|
from msgpack.exceptions import OutOfData
|
||||||
from typing import Callable, Self, Union, Optional
|
from typing import Callable, Self, Union, Optional
|
||||||
from traceback import format_exc
|
from traceback import format_exc
|
||||||
@@ -23,6 +24,8 @@ class Client:
|
|||||||
self.callsign = client_callsign
|
self.callsign = client_callsign
|
||||||
self.app = pe.app.Application()
|
self.app = pe.app.Application()
|
||||||
self.started = False
|
self.started = False
|
||||||
|
self._connection_locks = {}
|
||||||
|
self.lock_locker = Lock()
|
||||||
signal.signal(signal.SIGINT, self.exit_gracefully)
|
signal.signal(signal.SIGINT, self.exit_gracefully)
|
||||||
signal.signal(signal.SIGTERM, self.exit_gracefully)
|
signal.signal(signal.SIGTERM, self.exit_gracefully)
|
||||||
|
|
||||||
@@ -85,6 +88,9 @@ class Client:
|
|||||||
raise RuntimeError("Must start client before creating connections.")
|
raise RuntimeError("Must start client before creating connections.")
|
||||||
if not ax25.Address.valid_call(dest):
|
if not ax25.Address.valid_call(dest):
|
||||||
raise ValueError(f"Provided destination callsign '{dest}' is invalid.")
|
raise ValueError(f"Provided destination callsign '{dest}' is invalid.")
|
||||||
|
with self.lock_locker:
|
||||||
|
if dest.upper() not in self._connection_locks:
|
||||||
|
self._connection_locks[dest.upper()] = Lock()
|
||||||
conn = self.connection_callsign(dest.upper())
|
conn = self.connection_callsign(dest.upper())
|
||||||
if conn is not None:
|
if conn is not None:
|
||||||
return conn
|
return conn
|
||||||
@@ -103,20 +109,25 @@ class Client:
|
|||||||
if conn.state.name != "CONNECTED":
|
if conn.state.name != "CONNECTED":
|
||||||
raise RuntimeError("Connection is not connected.")
|
raise RuntimeError("Connection is not connected.")
|
||||||
logging.debug(f"Sending request {req}")
|
logging.debug(f"Sending request {req}")
|
||||||
conn.send_data(req.pack())
|
dest = conn.remote_callsign.upper()
|
||||||
cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
|
with self.lock_locker:
|
||||||
while datetime.datetime.now() < cutoff_date:
|
if dest not in self._connection_locks:
|
||||||
if conn.state.name != "CONNECTED":
|
self._connection_locks[dest] = Lock()
|
||||||
logging.error(f"Connection {conn} disconnected.")
|
with self._connection_locks[dest]:
|
||||||
return None
|
conn.send_data(req.pack())
|
||||||
try:
|
cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
|
||||||
unpacked = conn.data.unpack()
|
while datetime.datetime.now() < cutoff_date:
|
||||||
except:
|
if conn.state.name != "CONNECTED":
|
||||||
time.sleep(.1)
|
logging.error(f"Connection {conn} disconnected.")
|
||||||
continue
|
return None
|
||||||
msg = Message.partial_unpack(unpacked)
|
try:
|
||||||
return Response(msg)
|
unpacked = conn.data.unpack()
|
||||||
return None
|
except:
|
||||||
|
time.sleep(.1)
|
||||||
|
continue
|
||||||
|
msg = Message.partial_unpack(unpacked)
|
||||||
|
return Response(msg)
|
||||||
|
return None
|
||||||
|
|
||||||
def send_receive_callsign(self, req: Request, callsign: str, timeout: int = 300) -> Optional[Response]:
|
def send_receive_callsign(self, req: Request, callsign: str, timeout: int = 300) -> Optional[Response]:
|
||||||
return self.send_and_receive(req, self.connection_for(callsign), timeout=timeout)
|
return self.send_and_receive(req, self.connection_for(callsign), timeout=timeout)
|
||||||
|
|||||||
Reference in New Issue
Block a user