Test client done and working. Added an env var for the cli client to enable testmode by passing a connection directory to use instead of TNC.

This commit is contained in:
Michael Woods
2025-03-16 16:52:46 -04:00
parent dd17042008
commit 412eec1d6a
6 changed files with 119 additions and 11 deletions

View File

@@ -0,0 +1,24 @@
"""Example file for using a directory test server to test without a TNC or radio."""
from packetserver.server.testserver import DirectoryTestServer
import os.path
from shutil import rmtree
import logging
logging.basicConfig(level=logging.DEBUG)
server_callsign = "KQ4PEC"
conn_dir = "/tmp/ts_conn_dir"
data_dir = "/tmp/tmp_ps_data"
if os.path.isdir(conn_dir):
rmtree(conn_dir)
os.mkdir(conn_dir)
else:
os.mkdir(conn_dir)
if not os.path.isdir(data_dir):
os.mkdir(data_dir)
ts = DirectoryTestServer(server_callsign, connection_directory=os.path.abspath(conn_dir),
data_dir=os.path.abspath(data_dir), zeo=True)
ts.start()

View File

@@ -0,0 +1,69 @@
"""Example file for using a directory test server to test without a TNC or radio."""
import msgpack
from packetserver.common import Request, Response, Message
from packetserver.common.testing import DirectoryTestServerConnection, SimpleDirectoryConnection
from packetserver.server.testserver import DirectoryTestServer
from packetserver.client.testing import TestClient
from packetserver.server.objects import Object
from packetserver.server.messages import Message as Mail
from packetserver.server.messages import Attachment
import time
import logging
import json
import os
import os.path
from shutil import rmtree
#logging.basicConfig(level=logging.DEBUG)
server_callsign = "KQ4PEC"
client_callsign = 'KQ4PEC-7'
#client_callsign = "TEST1"
conn_dir = "/tmp/ts_conn_dir"
data_dir = "/tmp/tmp_ps_data"
if os.path.isdir(conn_dir):
rmtree(conn_dir)
os.mkdir(conn_dir)
else:
os.mkdir(conn_dir)
if not os.path.isdir(data_dir):
os.mkdir(data_dir)
tc = TestClient(os.path.abspath(conn_dir), client_callsign)
ts = DirectoryTestServer(server_callsign, connection_directory=os.path.abspath(conn_dir),
data_dir=os.path.abspath(data_dir), zeo=True)
ts.start()
tc.start()
print("creating connection")
conn = tc.connection_for(server_callsign)
print(conn.remote_callsign)
print(conn.call_to)
print(conn.call_from)
time.sleep(1)
req = Request.blank()
#req.set_var('fetch_attachments', 1)
req.path = ""
req.method = Request.Method.GET
#req.method=Request.Method.POST
#attach = [Attachment("test.txt", "Hello sir, I hope that this message finds you well. The other day..")]
#req.payload = Mail("Hi there from a test user!", "KQ4PEC", attachments=attach).to_dict()
#req.payload = Object(name="test.txt", data="hello there").to_dict()
print("sending request")
resp = tc.send_receive_callsign(req, server_callsign)
ts.stop()
print("Waiting for server to stop.")
time.sleep(1)
response = resp
#print(type(response.payload))
#print(f"Response: {response}: {response.payload}")
print(json.dumps(response.payload, indent=4))

View File

@@ -83,6 +83,10 @@ def cli(ctx, conf, server, agwpe, port, callsign, keep_log):
storage = ZODB.FileStorage.FileStorage(os.path.join(cfg['cli']['directory'], DEFAULT_DB_FILE)) storage = ZODB.FileStorage.FileStorage(os.path.join(cfg['cli']['directory'], DEFAULT_DB_FILE))
db = ZODB.DB(storage) db = ZODB.DB(storage)
if 'TEST_SERVER_DIR' in os.environ:
from packetserver.client.testing import TestClient
client = TestClient(os.environ['TEST_SERVER_DIR'], ctx.obj['callsign'])
else:
client = Client(ctx.obj['agwpe_server'], ctx.obj['port'], ctx.obj['callsign'], keep_log=ctx.obj['keep_log']) client = Client(ctx.obj['agwpe_server'], ctx.obj['port'], ctx.obj['callsign'], keep_log=ctx.obj['keep_log'])
try: try:
client.start() client.start()

View File

@@ -9,6 +9,7 @@ import ax25
from threading import Lock from threading import Lock
import logging import logging
import os.path import os.path
import datetime
from shutil import rmtree from shutil import rmtree
class TestClient(Client): class TestClient(Client):
@@ -55,7 +56,12 @@ class TestClient(Client):
def receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection], timeout: int = 300): def receive(self, req: Request, conn: Union[PacketServerConnection,SimpleDirectoryConnection], timeout: int = 300):
if type(conn) is SimpleDirectoryConnection: if type(conn) is SimpleDirectoryConnection:
conn.check_for_data() time.sleep(1)
cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
while datetime.datetime.now() < cutoff_date:
logging.debug(f"Client {self.callsign} checking for connection conn {conn}")
if conn.check_for_data():
break
return super().receive(req, conn, timeout=timeout) return super().receive(req, conn, timeout=timeout)
def clear_connections(self): def clear_connections(self):
@@ -71,8 +77,9 @@ class TestClient(Client):
time.sleep(.5) time.sleep(.5)
pass pass
def start(self): # TODO def start(self):
pass self.started = True
def stop(self): # TODO def stop(self):
pass self.clear_connections()
self.started = False

View File

@@ -252,14 +252,17 @@ class SimpleDirectoryConnection:
except msgpack.OutOfData as e: except msgpack.OutOfData as e:
pass pass
def check_for_data(self): def check_for_data(self) -> bool:
"""Monitors connection directory for data.""" """Monitors connection directory for data."""
if self.closing: if self.closing:
self._state = ConnectionState.DISCONNECTED self._state = ConnectionState.DISCONNECTED
if self.check_closed(): if self.check_closed():
return return False
if os.path.isfile(self.remote_file_path): if os.path.isfile(self.remote_file_path):
data = open(self.remote_file_path, 'rb').read() data = open(self.remote_file_path, 'rb').read()
os.remove(self.remote_file_path) os.remove(self.remote_file_path)
logging.debug(f"[SIMPLE] {self.local_callsign} detected data from {self.remote_callsign}: {data}") logging.debug(f"[SIMPLE] {self.local_callsign} detected data from {self.remote_callsign}: {data}")
self.data.feed(data) self.data.feed(data)
return True
else:
return False

View File

@@ -1,5 +1,5 @@
"""Module for handling requests as they arrive to connection objects and servers.""" """Module for handling requests as they arrive to connection objects and servers."""
import ax25
from msgpack.exceptions import OutOfData from msgpack.exceptions import OutOfData
from packetserver.common import Message, Request, Response, PacketServerConnection, send_response, send_blank_response from packetserver.common import Message, Request, Response, PacketServerConnection, send_response, send_blank_response
from .bulletin import bulletin_root_handler from .bulletin import bulletin_root_handler
@@ -28,10 +28,11 @@ def handle_root_get(req: Request, conn: PacketServerConnection,
jobs_enabled = storage.root.config['jobs_enabled'] jobs_enabled = storage.root.config['jobs_enabled']
logging.debug(f"Root handler retrieved config. {operator} - {motd}") logging.debug(f"Root handler retrieved config. {operator} - {motd}")
logging.debug("Running user_authorized") logging.debug("Running user_authorized")
base = ax25.Address(conn.remote_callsign).call
if user_authorized(conn, db): if user_authorized(conn, db):
user_message = f"User {conn.remote_callsign} is enabled." user_message = f"User {base} is enabled."
else: else:
user_message = f"User {conn.remote_callsign} is not enabled." user_message = f"User {base} is not enabled."
logging.debug(f"User authorized: {user_message}") logging.debug(f"User authorized: {user_message}")
response.payload = { response.payload = {
'operator': operator, 'operator': operator,