Adding a better testing method without needing TNC or radio.

This commit is contained in:
Michael Woods
2025-03-16 00:58:25 -04:00
parent b9a6f35582
commit f0e5bb288d
6 changed files with 503 additions and 30 deletions

View File

@@ -1,7 +1,7 @@
import datetime
import pe.app
from ZEO.asyncio.server import new_connection
from packetserver.common.testing import SimpleDirectoryConnection
from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response
import ax25
import logging
@@ -63,7 +63,7 @@ class Client:
return self.connections[key]
return None
def connection_for(self, callsign: str):
def connection_for(self, callsign: str) -> Union[PacketServerConnection, SimpleDirectoryConnection]:
if not ax25.Address.valid_call(callsign):
raise ValueError("Must supply a valid callsign.")
callsign = callsign.upper().strip()
@@ -90,7 +90,7 @@ class Client:
for key in cm._connections.keys():
cm._connections[key].close()
def new_connection(self, dest: str) -> PacketServerConnection:
def new_connection(self, dest: str) -> Union[PacketServerConnection, SimpleDirectoryConnection]:
if not self.started:
raise RuntimeError("Must start client before creating connections.")
if not ax25.Address.valid_call(dest):
@@ -113,7 +113,28 @@ class Client:
time.sleep(8)
return conn
def send_and_receive(self, req: Request, conn: PacketServerConnection, timeout: int = 300) -> Optional[Response]:
def receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection], timeout: int = 300):
cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
logging.debug(f"{datetime.datetime.now()}: Request timeout date is {cutoff_date}")
while datetime.datetime.now() < cutoff_date:
if conn.state.name != "CONNECTED":
logging.error(f"Connection {conn} disconnected.")
if self.keep_log:
self.request_log.append((req, None))
return None
try:
unpacked = conn.data.unpack()
except:
time.sleep(.1)
continue
msg = Message.partial_unpack(unpacked)
resp = Response(msg)
return resp
logging.warning(f"{datetime.datetime.now()}: Request {req} timed out.")
return None
def send_and_receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection],
timeout: int = 300) -> Optional[Response]:
if conn.state.name != "CONNECTED":
raise RuntimeError("Connection is not connected.")
logging.debug(f"Sending request {req}")
@@ -124,27 +145,9 @@ class Client:
with self._connection_locks[dest]:
conn.data = Unpacker()
conn.send_data(req.pack())
cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
logging.debug(f"{datetime.datetime.now()}: Request timeout date is {cutoff_date}")
while datetime.datetime.now() < cutoff_date:
if conn.state.name != "CONNECTED":
logging.error(f"Connection {conn} disconnected.")
if self.keep_log:
self.request_log.append((req,None))
return None
try:
unpacked = conn.data.unpack()
except:
time.sleep(.1)
continue
msg = Message.partial_unpack(unpacked)
resp = Response(msg)
if self.keep_log:
self.request_log.append((req, resp))
return resp
logging.warning(f"{datetime.datetime.now()}: Request {req} timed out.")
self.request_log.append((req, None))
return None
resp = self.receive(req, conn, timeout=timeout)
self.request_log.append((req, resp))
return resp
def send_receive_callsign(self, req: Request, callsign: str, timeout: int = 300) -> Optional[Response]:
return self.send_and_receive(req, self.connection_for(callsign), timeout=timeout)

View File

@@ -45,7 +45,7 @@ def get(ctx, job_id, all_jobs, no_data): # TODO decide what to do with output an
jobs_out = [get_job_id(client,ctx.obj['bbs'], get_data=fetch_data)]
dicts_out = []
for j in jobs_out:
pass
except Exception as e:
click.echo(str(e), err=True)

View File

@@ -0,0 +1,63 @@
from typing import Union
from packetserver.common import Request, PacketServerConnection
from packetserver.common.testing import SimpleDirectoryConnection
from packetserver.client import Client
import ax25
class TestClient(Client):
def __init__(self, conn_dir: str, callsign: str, keep_log: bool = True):
super().__init__('', 0, callsign, keep_log=keep_log)
self._connections = {}
@property
def connections(self) -> dict:
return self._connections
def connection_exists(self, callsign: str):
if not ax25.Address.valid_call(callsign):
raise ValueError("Must supply a valid callsign.")
callsign = callsign.upper().strip()
for key in self.connections.keys():
if key.split(":")[1] == callsign:
return True
return False
def new_connection(self, dest: str) -> SimpleDirectoryConnection:
if not self.started:
raise RuntimeError("Must start client before creating connections.")
if not ax25.Address.valid_call(dest):
raise ValueError(f"Provided destination callsign '{dest}' is invalid.")
with self.lock_locker:
if dest.upper() not in self._connection_locks:
self._connection_locks[dest.upper()] = Lock()
with self._connection_locks[dest.upper()]:
conn = self.connection_callsign(dest.upper())
if conn is not None:
return conn
def connection_for(self, callsign: str):
if not ax25.Address.valid_call(callsign):
raise ValueError("Must supply a valid callsign.")
callsign = callsign.upper().strip()
if self.connection_exists(callsign):
return self.connection_callsign(callsign)
else:
return self.new_connection(callsign)
def receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection], timeout: int = 300):
if type(conn) is SimpleDirectoryConnection:
conn.check_for_data()
return super().receive(req, conn, timeout=timeout)
def clear_connections(self):
if self.app._engine is not None:
cm = self.app._engine._active_handler._handlers[1]._connection_map
for key in cm._connections.keys():
cm._connections[key].close()
def start(self):
pass
def stop(self):
pass

View File

@@ -1,8 +1,12 @@
import msgpack
from . import PacketServerConnection
from pe.connect import ConnectionState
from msgpack import Unpacker
from typing import Union, Self
from typing import Union, Self, Optional
import os.path
import logging
import ax25
class DummyPacketServerConnection(PacketServerConnection):
@@ -17,4 +21,245 @@ class DummyPacketServerConnection(PacketServerConnection):
def send_data(self, data: Union[bytes, bytearray]):
self.sent_data.feed(data)
logging.debug(f"Sender added {data} to self.sent_data.feed")
logging.debug(f"Sender added {data} to self.sent_data.feed")
class DirectoryTestServerConnection(PacketServerConnection):
"""Monitors a directory for messages in msgpack format."""
def __init__(self, call_from: str, call_to: str, directory: str, incoming=False):
super().__init__(0, call_from, call_to, incoming=incoming)
self._state = ConnectionState.CONNECTED
if not os.path.isdir(directory):
raise FileNotFoundError(f"No such directory as {directory}")
self._directory = directory
self._sent_data = Unpacker()
self._pid = 1
self.closing = False
@classmethod
def create_directory_connection(cls, self_callsign: str, directory: str) -> Self:
if not ax25.Address.valid_call(self_callsign):
raise ValueError("self_callsign must be a valid callsign.")
if not os.path.isdir(directory):
raise NotADirectoryError(f"{directory} is not a directory or doesn't exist.")
spl = os.path.basename(directory).split('--')
if len(spl) != 2:
raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.")
src = spl[0]
dst = spl[1]
if not ax25.Address.valid_call(src):
raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.")
if not ax25.Address.valid_call(dst):
raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.")
if dst.upper() == self_callsign.upper():
incoming = True
else:
incoming = False
return DirectoryTestServerConnection(src, dst, directory, incoming=incoming)
@property
def pid(self) -> int:
old = self._pid
self._pid = self._pid + 1
return old
@property
def directory(self) -> str:
return self._directory
@property
def state(self):
return self._state
@property
def file_path(self) -> str:
file_name = f"{self.local_callsign}.msg"
file_path = os.path.join(self._directory, file_name)
return file_path
@property
def remote_file_path(self) -> str:
file_name = f"{self.remote_callsign}.msg"
file_path = os.path.join(self._directory, file_name)
return file_path
def check_closed(self):
if self.closing:
self._state = ConnectionState.DISCONNECTED
if self._state is not ConnectionState.CONNECTED:
return True
if not os.path.isdir(self._directory):
self._state = ConnectionState.DISCONNECTED
self.disconnected()
return True
return False
def write_out(self, data: bytes):
if self.check_closed():
raise RuntimeError("Connection is closed. Cannot send.")
if os.path.exists(self.file_path):
raise RuntimeError("The outgoing message file already exists. State is wrong for sending.")
if os.path.exists(self.file_path+".tmp"):
os.remove(self.file_path+".tmp")
open(self.file_path+".tmp", 'wb').write(data)
os.rename(self.file_path+".tmp", self.file_path)
def send_data(self, data: Union[bytes, bytearray]):
if self.check_closed():
raise RuntimeError("Connection is closed. Cannot send.")
self._sent_data.feed(data)
logging.debug(f"Sender added {data} to self.sent_data.feed")
try:
obj = self._sent_data.unpack()
self.write_out(msgpack.packb(obj))
logging.debug(f"Wrote complete binary message to {self.file_path}")
except msgpack.OutOfData as e:
pass
def check_for_data(self):
"""Monitors connection directory for data."""
if self.closing:
self._state = ConnectionState.DISCONNECTED
if self.check_closed():
return
if os.path.isfile(self.remote_file_path):
logging.debug(f"{self.local_callsign} Found that the remote file path '{self.remote_file_path}' exists now.")
data = open(self.remote_file_path, 'rb').read()
self.data_received(self.pid, bytearray(data))
os.remove(self.remote_file_path)
logging.debug(f"{self.local_callsign} detected data from {self.remote_callsign}: {msgpack.unpackb(data)}")
class SimpleDirectoryConnection:
def __init__(self, call_from: str, call_to: str, directory: str, incoming=False):
self._state = ConnectionState.CONNECTED
if not os.path.isdir(directory):
raise FileNotFoundError(f"No such directory as {directory}")
self._directory = directory
self._sent_data = Unpacker()
self.data = Unpacker()
self._pid = 1
self.call_to = call_to
self.call_from = call_from
self.incoming = incoming
self._incoming = incoming
self.closing = False
if incoming:
self.local_callsign = call_to
self.remote_callsign = call_from
else:
self.local_callsign = call_from
self.remote_callsign = call_to
@classmethod
def create_directory_connection(cls, self_callsign: str, directory: str) -> Self:
if not ax25.Address.valid_call(self_callsign):
raise ValueError("self_callsign must be a valid callsign.")
if not os.path.isdir(directory):
raise NotADirectoryError(f"{directory} is not a directory or doesn't exist.")
spl = os.path.basename(directory).split('--')
if len(spl) != 2:
raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.")
src = spl[0]
dst = spl[1]
if not ax25.Address.valid_call(src):
raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.")
if not ax25.Address.valid_call(dst):
raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.")
if dst.upper() == self_callsign.upper():
incoming = True
else:
incoming = False
return SimpleDirectoryConnection(src, dst, directory, incoming=incoming)
@property
def pid(self) -> int:
old = self._pid
self._pid = self._pid + 1
return old
@property
def directory(self) -> str:
return self._directory
@property
def state(self):
return self._state
@property
def file_path(self) -> str:
file_name = f"{self.local_callsign}.msg"
file_path = os.path.join(self._directory, file_name)
return file_path
@property
def remote_file_path(self) -> str:
file_name = f"{self.remote_callsign}.msg"
file_path = os.path.join(self._directory, file_name)
return file_path
def check_closed(self):
if self.closing:
self._state = ConnectionState.DISCONNECTED
if self._state is not ConnectionState.CONNECTED:
return True
if not os.path.isdir(self._directory):
self._state = ConnectionState.DISCONNECTED
return True
return False
def write_out(self, data: bytes):
if self.check_closed():
raise RuntimeError("[SIMPLE] Connection is closed. Cannot send.")
if os.path.exists(self.file_path):
raise RuntimeError("[SIMPLE] The outgoing message file already exists. State is wrong for sending.")
if os.path.exists(self.file_path+".tmp"):
os.remove(self.file_path+".tmp")
open(self.file_path+".tmp", 'wb').write(data)
os.rename(self.file_path+".tmp", self.file_path)
def send_data(self, data: Union[bytes, bytearray]):
if self.check_closed():
raise RuntimeError("[SIMPLE] Connection is closed. Cannot send.")
self._sent_data.feed(data)
logging.debug(f"[SIMPLE] Sender added {data} to self.sent_data.feed")
try:
obj = self._sent_data.unpack()
self.write_out(msgpack.packb(obj))
logging.debug(f"[SIMPLE] Wrote complete binary message to {self.file_path}")
except msgpack.OutOfData as e:
pass
def check_for_data(self):
"""Monitors connection directory for data."""
if self.closing:
self._state = ConnectionState.DISCONNECTED
if self.check_closed():
return
if os.path.isfile(self.remote_file_path):
data = open(self.remote_file_path, 'rb').read()
os.remove(self.remote_file_path)
logging.debug(f"[SIMPLE] {self.local_callsign} detected data from {self.remote_callsign}: {data}")
self.data.feed(data)

View File

@@ -1,9 +1,15 @@
import tempfile
from packetserver.common import (Response, Message, Request, send_response, send_blank_response,
DummyPacketServerConnection)
from packetserver.common import Response, Message, Request, send_response, send_blank_response
from packetserver.common.testing import DirectoryTestServerConnection, DummyPacketServerConnection
from pe.connect import ConnectionState
from shutil import rmtree
from threading import Thread
from . import Server
import os
import os.path
import time
import logging
from traceback import format_exc
class TestServer(Server):
def __init__(self, server_callsign: str, data_dir: str = None, zeo: bool = True):
@@ -35,3 +41,75 @@ class TestServer(Server):
def send_test_data(self, conn: DummyPacketServerConnection, data: bytearray):
conn.data_received(self.data_pid(), data)
self.server_receiver(conn)
class DirectoryTestServer(Server):
def __init__(self, server_callsign: str, connection_directory: str, data_dir: str = None, zeo: bool = True):
super().__init__('localhost', 8000, server_callsign, data_dir=data_dir, zeo=zeo)
if not os.path.isdir(connection_directory):
raise NotADirectoryError(f"{connection_directory} is not a directory or doesn't exist.")
self._file_traffic_dir = os.path.abspath(connection_directory)
self._dir_connections = []
def check_connection_directories(self):
logging.debug(f"Server checking connection directory {self._file_traffic_dir}")
if not os.path.isdir(self._file_traffic_dir):
raise NotADirectoryError(f"{self._file_traffic_dir} is not a directory or doesn't exist.")
for path in os.listdir(self._file_traffic_dir):
dir_path = os.path.join(self._file_traffic_dir, path)
logging.debug(f"Checking directory {dir_path}")
if not os.path.isdir(dir_path):
logging.debug(f"Server: {dir_path} is not a directory; skipping")
continue
conn_exists = False
for conn in self._dir_connections:
if os.path.abspath(conn.directory) == dir_path:
conn_exists = True
break
if conn_exists:
continue
try:
conn = DirectoryTestServerConnection.create_directory_connection(self.callsign, dir_path)
logging.debug(f"New connection detected from {conn.remote_callsign}")
self._dir_connections.append(conn)
self.server_connection_bouncer(conn)
except ValueError:
logging.debug(format_exc())
pass
closed = []
for conn in self._dir_connections:
conn.check_for_data()
if conn.state is not ConnectionState.CONNECTED:
closed.append(conn)
for conn in closed:
if conn in self._dir_connections:
self._dir_connections.remove(conn)
def dir_worker(self):
"""Intended to be running as a thread."""
logging.info("Starting worker thread.")
while self.started:
self.server_worker()
self.check_connection_directories()
time.sleep(.5)
def start(self):
if self.orchestrator is not None:
self.orchestrator.start()
self.start_db()
self.started = True
self.worker_thread = Thread(target=self.dir_worker)
self.worker_thread.start()
def stop(self):
self.started = False
if self.orchestrator is not None:
self.orchestrator.stop()
self.stop_db()