From f0e5bb288d2c3452a3660d1e1ccb4dd36f707049 Mon Sep 17 00:00:00 2001 From: Michael Woods Date: Sun, 16 Mar 2025 00:58:25 -0400 Subject: [PATCH] Adding a better testing method without needing TNC or radio. --- examples/misc/dirtester.py | 84 +++++++++ src/packetserver/client/__init__.py | 53 +++--- src/packetserver/client/cli/job.py | 2 +- src/packetserver/client/testing.py | 63 +++++++ src/packetserver/common/testing.py | 249 +++++++++++++++++++++++++- src/packetserver/server/testserver.py | 82 ++++++++- 6 files changed, 503 insertions(+), 30 deletions(-) create mode 100644 examples/misc/dirtester.py create mode 100644 src/packetserver/client/testing.py diff --git a/examples/misc/dirtester.py b/examples/misc/dirtester.py new file mode 100644 index 0000000..104c453 --- /dev/null +++ b/examples/misc/dirtester.py @@ -0,0 +1,84 @@ +"""Example file for using a directory test server to test without a TNC or radio.""" + +import msgpack + +from packetserver.common import Request, Response, Message +from packetserver.common.testing import DirectoryTestServerConnection, SimpleDirectoryConnection +from packetserver.server.testserver import DirectoryTestServer +from packetserver.server.objects import Object +from packetserver.server.messages import Message as Mail +from packetserver.server.messages import Attachment +import time +import logging +import json +import os +import os.path +from shutil import rmtree + +logging.basicConfig(level=logging.DEBUG) + +server_callsign = "KQ4PEC" +client_callsign = 'KQ4PEC-7' +#client_callsign = "TEST1" +conn_dir = "/tmp/ts_conn_dir" +data_dir = "/tmp/tmp_ps_data" + +if os.path.isdir(conn_dir): + rmtree(conn_dir) + os.mkdir(conn_dir) +else: + os.mkdir(conn_dir) + +if not os.path.isdir(data_dir): + os.mkdir(data_dir) + +ts = DirectoryTestServer(server_callsign, connection_directory=os.path.abspath(conn_dir), + data_dir=os.path.abspath(data_dir), zeo=True) +ts.start() + +time.sleep(1) +print("creating connection") +new_conn_dir = os.path.join(conn_dir,f"{client_callsign}--{server_callsign}") +os.mkdir(new_conn_dir) + +conn = SimpleDirectoryConnection.create_directory_connection(client_callsign, new_conn_dir) +print(conn.remote_callsign) +print(conn.call_to) +print(conn.call_from) + +req = Request.blank() + +#req.set_var('fetch_attachments', 1) +req.path = "" +req.method = Request.Method.GET + +#req.method=Request.Method.POST +#attach = [Attachment("test.txt", "Hello sir, I hope that this message finds you well. The other day..")] +#req.payload = Mail("Hi there from a test user!", "KQ4PEC", attachments=attach).to_dict() + + +#req.payload = Object(name="test.txt", data="hello there").to_dict() + +print("sending request") +conn.send_data(req.pack()) +print("Waiting on response.") + +data = None +while data is None: + conn.check_for_data() + try: + data = conn.data.unpack() + except msgpack.OutOfData: + pass + time.sleep(.5) + +ts.stop() +print("Waiting for server to stop.") +time.sleep(2) +#print(f"Got some data: {data}") +msg = data +print(f"msg: {msg}") +response = Response(Message.partial_unpack(msg)) +#print(type(response.payload)) +#print(f"Response: {response}: {response.payload}") +print(json.dumps(response.payload, indent=4)) diff --git a/src/packetserver/client/__init__.py b/src/packetserver/client/__init__.py index 232401c..b59ddaa 100644 --- a/src/packetserver/client/__init__.py +++ b/src/packetserver/client/__init__.py @@ -1,7 +1,7 @@ import datetime import pe.app from ZEO.asyncio.server import new_connection - +from packetserver.common.testing import SimpleDirectoryConnection from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response import ax25 import logging @@ -63,7 +63,7 @@ class Client: return self.connections[key] return None - def connection_for(self, callsign: str): + def connection_for(self, callsign: str) -> Union[PacketServerConnection, SimpleDirectoryConnection]: if not ax25.Address.valid_call(callsign): raise ValueError("Must supply a valid callsign.") callsign = callsign.upper().strip() @@ -90,7 +90,7 @@ class Client: for key in cm._connections.keys(): cm._connections[key].close() - def new_connection(self, dest: str) -> PacketServerConnection: + def new_connection(self, dest: str) -> Union[PacketServerConnection, SimpleDirectoryConnection]: if not self.started: raise RuntimeError("Must start client before creating connections.") if not ax25.Address.valid_call(dest): @@ -113,7 +113,28 @@ class Client: time.sleep(8) return conn - def send_and_receive(self, req: Request, conn: PacketServerConnection, timeout: int = 300) -> Optional[Response]: + def receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection], timeout: int = 300): + cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout) + logging.debug(f"{datetime.datetime.now()}: Request timeout date is {cutoff_date}") + while datetime.datetime.now() < cutoff_date: + if conn.state.name != "CONNECTED": + logging.error(f"Connection {conn} disconnected.") + if self.keep_log: + self.request_log.append((req, None)) + return None + try: + unpacked = conn.data.unpack() + except: + time.sleep(.1) + continue + msg = Message.partial_unpack(unpacked) + resp = Response(msg) + return resp + logging.warning(f"{datetime.datetime.now()}: Request {req} timed out.") + return None + + def send_and_receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection], + timeout: int = 300) -> Optional[Response]: if conn.state.name != "CONNECTED": raise RuntimeError("Connection is not connected.") logging.debug(f"Sending request {req}") @@ -124,27 +145,9 @@ class Client: with self._connection_locks[dest]: conn.data = Unpacker() conn.send_data(req.pack()) - cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout) - logging.debug(f"{datetime.datetime.now()}: Request timeout date is {cutoff_date}") - while datetime.datetime.now() < cutoff_date: - if conn.state.name != "CONNECTED": - logging.error(f"Connection {conn} disconnected.") - if self.keep_log: - self.request_log.append((req,None)) - return None - try: - unpacked = conn.data.unpack() - except: - time.sleep(.1) - continue - msg = Message.partial_unpack(unpacked) - resp = Response(msg) - if self.keep_log: - self.request_log.append((req, resp)) - return resp - logging.warning(f"{datetime.datetime.now()}: Request {req} timed out.") - self.request_log.append((req, None)) - return None + resp = self.receive(req, conn, timeout=timeout) + self.request_log.append((req, resp)) + return resp def send_receive_callsign(self, req: Request, callsign: str, timeout: int = 300) -> Optional[Response]: return self.send_and_receive(req, self.connection_for(callsign), timeout=timeout) diff --git a/src/packetserver/client/cli/job.py b/src/packetserver/client/cli/job.py index d446dc6..5b8798a 100644 --- a/src/packetserver/client/cli/job.py +++ b/src/packetserver/client/cli/job.py @@ -45,7 +45,7 @@ def get(ctx, job_id, all_jobs, no_data): # TODO decide what to do with output an jobs_out = [get_job_id(client,ctx.obj['bbs'], get_data=fetch_data)] dicts_out = [] for j in jobs_out: - + pass except Exception as e: click.echo(str(e), err=True) diff --git a/src/packetserver/client/testing.py b/src/packetserver/client/testing.py new file mode 100644 index 0000000..6fd54e2 --- /dev/null +++ b/src/packetserver/client/testing.py @@ -0,0 +1,63 @@ +from typing import Union + +from packetserver.common import Request, PacketServerConnection +from packetserver.common.testing import SimpleDirectoryConnection +from packetserver.client import Client +import ax25 + +class TestClient(Client): + def __init__(self, conn_dir: str, callsign: str, keep_log: bool = True): + super().__init__('', 0, callsign, keep_log=keep_log) + self._connections = {} + + @property + def connections(self) -> dict: + return self._connections + + def connection_exists(self, callsign: str): + if not ax25.Address.valid_call(callsign): + raise ValueError("Must supply a valid callsign.") + callsign = callsign.upper().strip() + for key in self.connections.keys(): + if key.split(":")[1] == callsign: + return True + return False + + def new_connection(self, dest: str) -> SimpleDirectoryConnection: + if not self.started: + raise RuntimeError("Must start client before creating connections.") + if not ax25.Address.valid_call(dest): + raise ValueError(f"Provided destination callsign '{dest}' is invalid.") + with self.lock_locker: + if dest.upper() not in self._connection_locks: + self._connection_locks[dest.upper()] = Lock() + with self._connection_locks[dest.upper()]: + conn = self.connection_callsign(dest.upper()) + if conn is not None: + return conn + + def connection_for(self, callsign: str): + if not ax25.Address.valid_call(callsign): + raise ValueError("Must supply a valid callsign.") + callsign = callsign.upper().strip() + if self.connection_exists(callsign): + return self.connection_callsign(callsign) + else: + return self.new_connection(callsign) + + def receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection], timeout: int = 300): + if type(conn) is SimpleDirectoryConnection: + conn.check_for_data() + return super().receive(req, conn, timeout=timeout) + + def clear_connections(self): + if self.app._engine is not None: + cm = self.app._engine._active_handler._handlers[1]._connection_map + for key in cm._connections.keys(): + cm._connections[key].close() + + def start(self): + pass + + def stop(self): + pass \ No newline at end of file diff --git a/src/packetserver/common/testing.py b/src/packetserver/common/testing.py index dd93d44..21d963a 100644 --- a/src/packetserver/common/testing.py +++ b/src/packetserver/common/testing.py @@ -1,8 +1,12 @@ +import msgpack + from . import PacketServerConnection from pe.connect import ConnectionState from msgpack import Unpacker -from typing import Union, Self +from typing import Union, Self, Optional +import os.path import logging +import ax25 class DummyPacketServerConnection(PacketServerConnection): @@ -17,4 +21,245 @@ class DummyPacketServerConnection(PacketServerConnection): def send_data(self, data: Union[bytes, bytearray]): self.sent_data.feed(data) - logging.debug(f"Sender added {data} to self.sent_data.feed") \ No newline at end of file + logging.debug(f"Sender added {data} to self.sent_data.feed") + +class DirectoryTestServerConnection(PacketServerConnection): + """Monitors a directory for messages in msgpack format.""" + def __init__(self, call_from: str, call_to: str, directory: str, incoming=False): + super().__init__(0, call_from, call_to, incoming=incoming) + self._state = ConnectionState.CONNECTED + if not os.path.isdir(directory): + raise FileNotFoundError(f"No such directory as {directory}") + self._directory = directory + self._sent_data = Unpacker() + self._pid = 1 + self.closing = False + + @classmethod + def create_directory_connection(cls, self_callsign: str, directory: str) -> Self: + + if not ax25.Address.valid_call(self_callsign): + raise ValueError("self_callsign must be a valid callsign.") + + if not os.path.isdir(directory): + raise NotADirectoryError(f"{directory} is not a directory or doesn't exist.") + + spl = os.path.basename(directory).split('--') + if len(spl) != 2: + raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.") + + src = spl[0] + dst = spl[1] + + if not ax25.Address.valid_call(src): + raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.") + + if not ax25.Address.valid_call(dst): + raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.") + + if dst.upper() == self_callsign.upper(): + incoming = True + else: + incoming = False + + return DirectoryTestServerConnection(src, dst, directory, incoming=incoming) + + @property + def pid(self) -> int: + old = self._pid + self._pid = self._pid + 1 + return old + + @property + def directory(self) -> str: + return self._directory + + @property + def state(self): + return self._state + + @property + def file_path(self) -> str: + file_name = f"{self.local_callsign}.msg" + file_path = os.path.join(self._directory, file_name) + return file_path + + @property + def remote_file_path(self) -> str: + file_name = f"{self.remote_callsign}.msg" + file_path = os.path.join(self._directory, file_name) + return file_path + + def check_closed(self): + if self.closing: + self._state = ConnectionState.DISCONNECTED + if self._state is not ConnectionState.CONNECTED: + return True + if not os.path.isdir(self._directory): + self._state = ConnectionState.DISCONNECTED + self.disconnected() + return True + return False + + def write_out(self, data: bytes): + if self.check_closed(): + raise RuntimeError("Connection is closed. Cannot send.") + + if os.path.exists(self.file_path): + raise RuntimeError("The outgoing message file already exists. State is wrong for sending.") + + if os.path.exists(self.file_path+".tmp"): + os.remove(self.file_path+".tmp") + + open(self.file_path+".tmp", 'wb').write(data) + os.rename(self.file_path+".tmp", self.file_path) + + def send_data(self, data: Union[bytes, bytearray]): + if self.check_closed(): + raise RuntimeError("Connection is closed. Cannot send.") + self._sent_data.feed(data) + logging.debug(f"Sender added {data} to self.sent_data.feed") + try: + obj = self._sent_data.unpack() + self.write_out(msgpack.packb(obj)) + logging.debug(f"Wrote complete binary message to {self.file_path}") + except msgpack.OutOfData as e: + pass + + def check_for_data(self): + """Monitors connection directory for data.""" + if self.closing: + self._state = ConnectionState.DISCONNECTED + if self.check_closed(): + return + + if os.path.isfile(self.remote_file_path): + logging.debug(f"{self.local_callsign} Found that the remote file path '{self.remote_file_path}' exists now.") + data = open(self.remote_file_path, 'rb').read() + self.data_received(self.pid, bytearray(data)) + os.remove(self.remote_file_path) + logging.debug(f"{self.local_callsign} detected data from {self.remote_callsign}: {msgpack.unpackb(data)}") + + +class SimpleDirectoryConnection: + def __init__(self, call_from: str, call_to: str, directory: str, incoming=False): + self._state = ConnectionState.CONNECTED + if not os.path.isdir(directory): + raise FileNotFoundError(f"No such directory as {directory}") + self._directory = directory + self._sent_data = Unpacker() + self.data = Unpacker() + self._pid = 1 + self.call_to = call_to + self.call_from = call_from + self.incoming = incoming + self._incoming = incoming + self.closing = False + if incoming: + self.local_callsign = call_to + self.remote_callsign = call_from + else: + self.local_callsign = call_from + self.remote_callsign = call_to + + @classmethod + def create_directory_connection(cls, self_callsign: str, directory: str) -> Self: + + if not ax25.Address.valid_call(self_callsign): + raise ValueError("self_callsign must be a valid callsign.") + + if not os.path.isdir(directory): + raise NotADirectoryError(f"{directory} is not a directory or doesn't exist.") + + spl = os.path.basename(directory).split('--') + if len(spl) != 2: + raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.") + + src = spl[0] + dst = spl[1] + + if not ax25.Address.valid_call(src): + raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.") + + if not ax25.Address.valid_call(dst): + raise ValueError(f"Directory {directory} has the wrong name to be a connection dir.") + + if dst.upper() == self_callsign.upper(): + incoming = True + else: + incoming = False + + return SimpleDirectoryConnection(src, dst, directory, incoming=incoming) + + @property + def pid(self) -> int: + old = self._pid + self._pid = self._pid + 1 + return old + + @property + def directory(self) -> str: + return self._directory + + @property + def state(self): + return self._state + + @property + def file_path(self) -> str: + file_name = f"{self.local_callsign}.msg" + file_path = os.path.join(self._directory, file_name) + return file_path + + @property + def remote_file_path(self) -> str: + file_name = f"{self.remote_callsign}.msg" + file_path = os.path.join(self._directory, file_name) + return file_path + + def check_closed(self): + if self.closing: + self._state = ConnectionState.DISCONNECTED + if self._state is not ConnectionState.CONNECTED: + return True + if not os.path.isdir(self._directory): + self._state = ConnectionState.DISCONNECTED + return True + return False + + def write_out(self, data: bytes): + if self.check_closed(): + raise RuntimeError("[SIMPLE] Connection is closed. Cannot send.") + + if os.path.exists(self.file_path): + raise RuntimeError("[SIMPLE] The outgoing message file already exists. State is wrong for sending.") + + if os.path.exists(self.file_path+".tmp"): + os.remove(self.file_path+".tmp") + + open(self.file_path+".tmp", 'wb').write(data) + os.rename(self.file_path+".tmp", self.file_path) + + def send_data(self, data: Union[bytes, bytearray]): + if self.check_closed(): + raise RuntimeError("[SIMPLE] Connection is closed. Cannot send.") + self._sent_data.feed(data) + logging.debug(f"[SIMPLE] Sender added {data} to self.sent_data.feed") + try: + obj = self._sent_data.unpack() + self.write_out(msgpack.packb(obj)) + logging.debug(f"[SIMPLE] Wrote complete binary message to {self.file_path}") + except msgpack.OutOfData as e: + pass + + def check_for_data(self): + """Monitors connection directory for data.""" + if self.closing: + self._state = ConnectionState.DISCONNECTED + if self.check_closed(): + return + if os.path.isfile(self.remote_file_path): + data = open(self.remote_file_path, 'rb').read() + os.remove(self.remote_file_path) + logging.debug(f"[SIMPLE] {self.local_callsign} detected data from {self.remote_callsign}: {data}") + self.data.feed(data) diff --git a/src/packetserver/server/testserver.py b/src/packetserver/server/testserver.py index e35c332..767f61b 100644 --- a/src/packetserver/server/testserver.py +++ b/src/packetserver/server/testserver.py @@ -1,9 +1,15 @@ import tempfile -from packetserver.common import (Response, Message, Request, send_response, send_blank_response, - DummyPacketServerConnection) +from packetserver.common import Response, Message, Request, send_response, send_blank_response +from packetserver.common.testing import DirectoryTestServerConnection, DummyPacketServerConnection +from pe.connect import ConnectionState from shutil import rmtree from threading import Thread from . import Server +import os +import os.path +import time +import logging +from traceback import format_exc class TestServer(Server): def __init__(self, server_callsign: str, data_dir: str = None, zeo: bool = True): @@ -35,3 +41,75 @@ class TestServer(Server): def send_test_data(self, conn: DummyPacketServerConnection, data: bytearray): conn.data_received(self.data_pid(), data) self.server_receiver(conn) + + +class DirectoryTestServer(Server): + def __init__(self, server_callsign: str, connection_directory: str, data_dir: str = None, zeo: bool = True): + super().__init__('localhost', 8000, server_callsign, data_dir=data_dir, zeo=zeo) + if not os.path.isdir(connection_directory): + raise NotADirectoryError(f"{connection_directory} is not a directory or doesn't exist.") + self._file_traffic_dir = os.path.abspath(connection_directory) + self._dir_connections = [] + + def check_connection_directories(self): + logging.debug(f"Server checking connection directory {self._file_traffic_dir}") + if not os.path.isdir(self._file_traffic_dir): + raise NotADirectoryError(f"{self._file_traffic_dir} is not a directory or doesn't exist.") + + for path in os.listdir(self._file_traffic_dir): + dir_path = os.path.join(self._file_traffic_dir, path) + logging.debug(f"Checking directory {dir_path}") + if not os.path.isdir(dir_path): + logging.debug(f"Server: {dir_path} is not a directory; skipping") + continue + + conn_exists = False + for conn in self._dir_connections: + if os.path.abspath(conn.directory) == dir_path: + conn_exists = True + break + + if conn_exists: + continue + + try: + conn = DirectoryTestServerConnection.create_directory_connection(self.callsign, dir_path) + logging.debug(f"New connection detected from {conn.remote_callsign}") + self._dir_connections.append(conn) + self.server_connection_bouncer(conn) + except ValueError: + logging.debug(format_exc()) + pass + + closed = [] + + for conn in self._dir_connections: + conn.check_for_data() + if conn.state is not ConnectionState.CONNECTED: + closed.append(conn) + + for conn in closed: + if conn in self._dir_connections: + self._dir_connections.remove(conn) + + def dir_worker(self): + """Intended to be running as a thread.""" + logging.info("Starting worker thread.") + while self.started: + self.server_worker() + self.check_connection_directories() + time.sleep(.5) + + def start(self): + if self.orchestrator is not None: + self.orchestrator.start() + self.start_db() + self.started = True + self.worker_thread = Thread(target=self.dir_worker) + self.worker_thread.start() + + def stop(self): + self.started = False + if self.orchestrator is not None: + self.orchestrator.stop() + self.stop_db()