Added testclasses to use for testing the API without a TNC and radio.

This commit is contained in:
Michael Woods
2025-01-08 22:00:08 -05:00
parent 34f3b4242e
commit 50ab5290f5
4 changed files with 94 additions and 11 deletions

View File

@@ -1,4 +1,4 @@
from pe.connect import Connection
from pe.connect import Connection, ConnectionState
from threading import Lock
from msgpack import Unpacker
from msgpack import packb, unpackb
@@ -83,6 +83,22 @@ class PacketServerConnection(Connection):
return True
class DummyPacketServerConnection(PacketServerConnection):
def __init__(self, call_from: str, call_to: str, incoming=False):
super().__init__(0, call_from, call_to, incoming=incoming)
self.sent_data = Unpacker()
self._state = ConnectionState.CONNECTED
@property
def state(self):
return self._state
def send_data(self, data: Union[bytes, bytearray]):
self.sent_data.feed(data)
logging.debug(f"Sender added {data} to self.sent_data.feed")
class Message:
"""Base class for communication encapsulated in msgpack objects."""
@@ -328,6 +344,8 @@ def send_response(conn: PacketServerConnection, response: Response, original_req
logging.debug(f"sending response: {response}, {response.compression}, {response.payload}")
conn.send_data(response.pack())
logging.debug("response sent successfully")
else:
logging.warning(f"Attempted to send data, but connection state is {conn.state.name}")
def send_blank_response(conn: PacketServerConnection, original_request: Request, status_code: int = 200,
payload: Union[bytes, bytearray, str, dict] = ""):

View File

@@ -1,5 +1,6 @@
import pe.app
from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response
from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response, \
DummyPacketServerConnection
from packetserver.server.constants import default_server_config
from packetserver.server.users import User
from copy import deepcopy
@@ -165,7 +166,7 @@ class Server:
def register_path_handler(self, path_root: str, fn: Callable):
self.handlers[path_root.strip().lower()] = fn
def start(self):
def start_db(self):
if not self.zeo:
self.storage = ZODB.FileStorage.FileStorage(self.data_file)
self.db = ZODB.DB(self.storage)
@@ -182,19 +183,46 @@ class Server:
logging.info(f"Wrote ZEO server info to '{zeo_address_file}'")
except:
logging.warning(f"Couldn't write ZEO server info to '{zeo_address_file}'\n{format_exc()}")
def start(self):
self.start_db()
self.app.start(self.pe_server, self.pe_port)
self.app.register_callsigns(self.callsign)
def exit_gracefully(self, signum, frame):
self.stop()
def stop(self):
cm = self.app._engine._active_handler._handlers[1]._connection_map
for key in cm._connections.keys():
cm._connections[key].close()
self.app.stop()
def stop_db(self):
self.storage.close()
self.db.close()
if self.zeo:
logging.info("Stopping ZEO.")
self.zeo_stop()
def stop(self):
cm = self.app._engine._active_handler._handlers[1]._connection_map
for key in cm._connections.keys():
cm._connections[key].close()
self.app.stop()
self.stop_db()
class TestServer(Server):
def __init__(self, server_callsign: str, data_dir: str = None, zeo: bool = True):
super().__init__('localhost', 8000, server_callsign, data_dir=data_dir, zeo=zeo)
self._data_pid = 1
def start(self):
self.start_db()
def stop(self):
self.stop_db()
def data_pid(self) -> int:
old = self._data_pid
self._data_pid = self._data_pid + 1
return old
def send_test_data(self, conn: DummyPacketServerConnection, data: bytearray):
conn.data_received(self.data_pid(), data)
self.server_receiver(conn)

View File

@@ -21,19 +21,21 @@ def handle_root_get(req: Request, conn: PacketServerConnection,
motd = storage.root.config['motd']
if 'operator' in storage.root.config:
operator = storage.root.config['operator']
logging.debug(f"Root handler retrieved config. {operator} - {motd}")
logging.debug("Running user_authorized")
if user_authorized(conn, db):
user_message = f"User {conn.remote_callsign} is enabled."
else:
user_message = f"User {conn.remote_callsign} is not enabled."
logging.debug(f"User authorized: {user_message}")
response.payload = {
'operator': operator,
'motd': motd,
'user': user_message
}
logging.debug(f"Sending response {response}")
send_response(conn, response, req)
logging.debug("Sent reesponse.")
def root_root_handler(req: Request, conn: PacketServerConnection,
db: ZODB.DB):