Testing client close to done.

This commit is contained in:
Michael Woods
2025-03-16 01:13:49 -04:00
parent f0e5bb288d
commit dd17042008

View File

@@ -1,14 +1,23 @@
import os.path
import time
from typing import Union
from packetserver.common import Request, PacketServerConnection
from packetserver.common.testing import SimpleDirectoryConnection
from packetserver.client import Client
import ax25
from threading import Lock
import logging
import os.path
from shutil import rmtree
class TestClient(Client):
def __init__(self, conn_dir: str, callsign: str, keep_log: bool = True):
super().__init__('', 0, callsign, keep_log=keep_log)
self._connections = {}
if not os.path.isdir(conn_dir):
raise NotADirectoryError(f"Conn dir {conn_dir} does not exist.")
self._connection_directory = os.path.abspath(conn_dir)
@property
def connections(self) -> dict:
@@ -36,14 +45,13 @@ class TestClient(Client):
if conn is not None:
return conn
def connection_for(self, callsign: str):
if not ax25.Address.valid_call(callsign):
raise ValueError("Must supply a valid callsign.")
callsign = callsign.upper().strip()
if self.connection_exists(callsign):
return self.connection_callsign(callsign)
else:
return self.new_connection(callsign)
conn_dir = os.path.join(self._connection_directory, f"{self.callsign.upper()}--{dest.upper()}")
if not os.path.isdir(conn_dir):
os.mkdir(conn_dir)
conn = SimpleDirectoryConnection.create_directory_connection(self.callsign, conn_dir)
self.connections[f"{dest.upper()}:{self.callsign.upper()}"] = conn
logging.debug(f"Connection to {dest} ready.")
return conn
def receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection], timeout: int = 300):
if type(conn) is SimpleDirectoryConnection:
@@ -51,13 +59,20 @@ class TestClient(Client):
return super().receive(req, conn, timeout=timeout)
def clear_connections(self):
if self.app._engine is not None:
cm = self.app._engine._active_handler._handlers[1]._connection_map
for key in cm._connections.keys():
cm._connections[key].close()
def start(self):
closing = [x for x in self.connections]
for key in closing:
conn = self.connections[key]
conn.closing = True
conn.check_closed()
while os.path.exists(conn.directory):
try:
rmtree(conn.directory)
except:
time.sleep(.5)
pass
def stop(self):
def start(self): # TODO
pass
def stop(self): # TODO
pass