Moved testserver to another module and removed the import for dummypacketserverconnection.

This commit is contained in:
Michael Woods
2025-02-15 15:16:29 -05:00
parent 7f295e8fbb
commit ed02b86e79
4 changed files with 54 additions and 53 deletions

View File

@@ -82,6 +82,22 @@ class PacketServerConnection(Connection):
def query_accept(cls, port, call_from, call_to): def query_accept(cls, port, call_from, call_to):
return True return True
class DummyPacketServerConnection(PacketServerConnection):
def __init__(self, call_from: str, call_to: str, incoming=False):
super().__init__(0, call_from, call_to, incoming=incoming)
self.sent_data = Unpacker()
self._state = ConnectionState.CONNECTED
@property
def state(self):
return self._state
def send_data(self, data: Union[bytes, bytearray]):
self.sent_data.feed(data)
logging.debug(f"Sender added {data} to self.sent_data.feed")
class Message: class Message:
"""Base class for communication encapsulated in msgpack objects.""" """Base class for communication encapsulated in msgpack objects."""

View File

@@ -1,21 +0,0 @@
from . import PacketServerConnection
from msgpack import Unpacker
from msgpack import packb, unpackb
from pe.connect import Connection, ConnectionState
import logging
from typing import Union
class DummyPacketServerConnection(PacketServerConnection):
def __init__(self, call_from: str, call_to: str, incoming=False):
super().__init__(0, call_from, call_to, incoming=incoming)
self.sent_data = Unpacker()
self._state = ConnectionState.CONNECTED
@property
def state(self):
return self._state
def send_data(self, data: Union[bytes, bytearray]):
self.sent_data.feed(data)
logging.debug(f"Sender added {data} to self.sent_data.feed")

View File

@@ -2,8 +2,7 @@ import datetime
import tempfile import tempfile
import pe.app import pe.app
from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response, \ from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response
DummyPacketServerConnection
from packetserver.server.constants import default_server_config from packetserver.server.constants import default_server_config
from packetserver.server.users import User from packetserver.server.users import User
from copy import deepcopy from copy import deepcopy
@@ -300,33 +299,3 @@ class Server:
self.stop_db() self.stop_db()
class TestServer(Server):
def __init__(self, server_callsign: str, data_dir: str = None, zeo: bool = True):
super().__init__('localhost', 8000, server_callsign, data_dir=data_dir, zeo=zeo)
self._data_pid = 1
self._file_traffic_dir = tempfile.mkdtemp()
self._file_traffic_thread = None
def start(self):
if self.orchestrator is not None:
self.orchestrator.start()
self.start_db()
self.started = True
self.worker_thread = Thread(target=self.run_worker)
self.worker_thread.start()
def stop(self):
self.started = False
if self.orchestrator is not None:
self.orchestrator.stop()
self.stop_db()
rmtree(self._file_traffic_dir)
def data_pid(self) -> int:
old = self._data_pid
self._data_pid = self._data_pid + 1
return old
def send_test_data(self, conn: DummyPacketServerConnection, data: bytearray):
conn.data_received(self.data_pid(), data)
self.server_receiver(conn)

View File

@@ -0,0 +1,37 @@
import tempfile
from packetserver.common import (Response, Message, Request, send_response, send_blank_response,
DummyPacketServerConnection)
from shutil import rmtree
from threading import Thread
from . import Server
class TestServer(Server):
def __init__(self, server_callsign: str, data_dir: str = None, zeo: bool = True):
super().__init__('localhost', 8000, server_callsign, data_dir=data_dir, zeo=zeo)
self._data_pid = 1
self._file_traffic_dir = tempfile.mkdtemp()
self._file_traffic_thread = None
def start(self):
if self.orchestrator is not None:
self.orchestrator.start()
self.start_db()
self.started = True
self.worker_thread = Thread(target=self.run_worker)
self.worker_thread.start()
def stop(self):
self.started = False
if self.orchestrator is not None:
self.orchestrator.stop()
self.stop_db()
rmtree(self._file_traffic_dir)
def data_pid(self) -> int:
old = self._data_pid
self._data_pid = self._data_pid + 1
return old
def send_test_data(self, conn: DummyPacketServerConnection, data: bytearray):
conn.data_received(self.data_pid(), data)
self.server_receiver(conn)