Refactored to remove some circular dependencies, fixed bug in packing method. Added a basic compression decision.
This commit is contained in:
@@ -113,9 +113,10 @@ class Message:
|
|||||||
def pack(self) -> bytes:
|
def pack(self) -> bytes:
|
||||||
output = {'t': self.type.value, 'c': self.compression.value}
|
output = {'t': self.type.value, 'c': self.compression.value}
|
||||||
data_bytes = self.data_bytes
|
data_bytes = self.data_bytes
|
||||||
|
logging.debug("Packing Message")
|
||||||
if (self.compression is self.CompressionType.NONE) or (len(data_bytes) < 30):
|
if (self.compression is self.CompressionType.NONE) or (len(data_bytes) < 30):
|
||||||
output['d'] = data_bytes
|
output['d'] = data_bytes
|
||||||
|
output['c'] = self.CompressionType.NONE.value
|
||||||
return packb(output)
|
return packb(output)
|
||||||
|
|
||||||
if self.compression is self.CompressionType.BZIP2:
|
if self.compression is self.CompressionType.BZIP2:
|
||||||
@@ -282,3 +283,34 @@ class Response(Message):
|
|||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f"<Response: {self.status_code}>"
|
return f"<Response: {self.status_code}>"
|
||||||
|
|
||||||
|
def send_response(conn: PacketServerConnection, response: Response, original_request: Request,
|
||||||
|
compression: Message.CompressionType = Message.CompressionType.BZIP2):
|
||||||
|
if conn.state.name == "CONNECTED" and not conn.closing:
|
||||||
|
|
||||||
|
# figure out compression setting based on request
|
||||||
|
comp = compression
|
||||||
|
|
||||||
|
if 'C' in original_request.vars:
|
||||||
|
val = original_request.vars['C']
|
||||||
|
for i in Message.CompressionType:
|
||||||
|
if str(val).strip().upper() == i.name:
|
||||||
|
comp = i
|
||||||
|
break
|
||||||
|
try:
|
||||||
|
if int(val) == i.value:
|
||||||
|
comp = i
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
response.compression = comp
|
||||||
|
|
||||||
|
logging.debug(f"sending response: {response}, {response.compression}, {response.payload}")
|
||||||
|
conn.send_data(response.pack())
|
||||||
|
logging.debug("response sent successfully")
|
||||||
|
|
||||||
|
def send_blank_response(conn: PacketServerConnection, original_request: Request, status_code: int = 200,
|
||||||
|
payload: Union[bytes, bytearray, str, dict] = ""):
|
||||||
|
response = Response.blank()
|
||||||
|
response.status_code = status_code
|
||||||
|
response.payload = payload
|
||||||
|
send_response(conn, response, original_request)
|
||||||
@@ -1,7 +1,6 @@
|
|||||||
import pe.app
|
import pe.app
|
||||||
import packetserver.common
|
from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response
|
||||||
from packetserver.server.constants import default_server_config
|
from packetserver.server.constants import default_server_config
|
||||||
from packetserver.server.bulletin import init_bulletins
|
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
import ax25
|
import ax25
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
@@ -9,14 +8,20 @@ import ZODB, ZODB.FileStorage
|
|||||||
from BTrees.OOBTree import OOBTree
|
from BTrees.OOBTree import OOBTree
|
||||||
from persistent.mapping import PersistentMapping
|
from persistent.mapping import PersistentMapping
|
||||||
from persistent.list import PersistentList
|
from persistent.list import PersistentList
|
||||||
from packetserver.server.requests import process_incoming_data
|
|
||||||
from packetserver.server.requests import standard_handlers
|
from packetserver.server.requests import standard_handlers
|
||||||
import logging
|
import logging
|
||||||
import signal
|
import signal
|
||||||
import time
|
import time
|
||||||
from typing import Callable
|
from msgpack.exceptions import OutOfData
|
||||||
|
from typing import Callable, Self, Union
|
||||||
|
|
||||||
|
|
||||||
|
def init_bulletins(root: PersistentMapping):
|
||||||
|
if 'bulletins' not in root:
|
||||||
|
root['bulletins'] = PersistentList()
|
||||||
|
if 'bulletin_counter' not in root:
|
||||||
|
root['bulletin_counter'] = 0
|
||||||
|
|
||||||
class Server:
|
class Server:
|
||||||
def __init__(self, pe_server: str, port: int, server_callsign: str, data_dir: str = None):
|
def __init__(self, pe_server: str, port: int, server_callsign: str, data_dir: str = None):
|
||||||
if not ax25.Address.valid_call(server_callsign):
|
if not ax25.Address.valid_call(server_callsign):
|
||||||
@@ -50,8 +55,8 @@ class Server:
|
|||||||
conn.root.users = OOBTree()
|
conn.root.users = OOBTree()
|
||||||
init_bulletins(conn.root())
|
init_bulletins(conn.root())
|
||||||
self.app = pe.app.Application()
|
self.app = pe.app.Application()
|
||||||
packetserver.common.PacketServerConnection.receive_subscribers.append(lambda x: self.server_receiver(x))
|
PacketServerConnection.receive_subscribers.append(lambda x: self.server_receiver(x))
|
||||||
packetserver.common.PacketServerConnection.connection_subscribers.append(lambda x: self.server_connection_bouncer(x))
|
PacketServerConnection.connection_subscribers.append(lambda x: self.server_connection_bouncer(x))
|
||||||
signal.signal(signal.SIGINT, self.exit_gracefully)
|
signal.signal(signal.SIGINT, self.exit_gracefully)
|
||||||
signal.signal(signal.SIGTERM, self.exit_gracefully)
|
signal.signal(signal.SIGTERM, self.exit_gracefully)
|
||||||
|
|
||||||
@@ -60,7 +65,7 @@ class Server:
|
|||||||
def data_file(self) -> str:
|
def data_file(self) -> str:
|
||||||
return str(Path(self.home_dir).joinpath('data.zopedb'))
|
return str(Path(self.home_dir).joinpath('data.zopedb'))
|
||||||
|
|
||||||
def server_connection_bouncer(self, conn: packetserver.common.PacketServerConnection):
|
def server_connection_bouncer(self, conn: PacketServerConnection):
|
||||||
logging.debug("new connection bouncer checking for blacklist")
|
logging.debug("new connection bouncer checking for blacklist")
|
||||||
# blacklist check
|
# blacklist check
|
||||||
blacklisted = False
|
blacklisted = False
|
||||||
@@ -82,9 +87,46 @@ class Server:
|
|||||||
break
|
break
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
def server_receiver(self, conn: packetserver.common.PacketServerConnection):
|
def handle_request(self, req: Request, conn: PacketServerConnection):
|
||||||
|
"""Handles a proper request by handing off to the appropriate function depending on method and Path."""
|
||||||
|
logging.debug(f"asked to handle request: {req}")
|
||||||
|
if conn.closing:
|
||||||
|
logging.debug("Connection marked as closing. Ignoring it.")
|
||||||
|
return
|
||||||
|
req_root_path = req.path.split("/")[0]
|
||||||
|
if req_root_path in self.handlers:
|
||||||
|
logging.debug(f"found handler for req {req}")
|
||||||
|
self.handlers[req_root_path](req, conn, self.db)
|
||||||
|
return
|
||||||
|
logging.warning(f"unhandled request found: {req}")
|
||||||
|
send_blank_response(conn, req, status_code=404)
|
||||||
|
|
||||||
|
def process_incoming_data(self, connection: PacketServerConnection):
|
||||||
|
"""Handles incoming data."""
|
||||||
|
logging.debug("Running process_incoming_data on connection")
|
||||||
|
with connection.data_lock:
|
||||||
|
logging.debug("Data lock acquired")
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
msg = Message.partial_unpack(connection.data.unpack())
|
||||||
|
logging.debug(f"parsed a Message from data received")
|
||||||
|
except OutOfData:
|
||||||
|
logging.debug("no complete message yet, done until more data arrives")
|
||||||
|
break
|
||||||
|
except ValueError:
|
||||||
|
connection.send_data(b"BAD REQUEST. COULD NOT PARSE INCOMING DATA AS PACKETSERVER MESSAGE")
|
||||||
|
try:
|
||||||
|
request = Request(msg)
|
||||||
|
logging.debug(f"parsed Message into request {request}")
|
||||||
|
except ValueError:
|
||||||
|
connection.send_data(b"BAD REQUEST. DID NOT RECEIVE A REQUEST MESSAGE.")
|
||||||
|
logging.debug(f"attempting to handle request {request}")
|
||||||
|
self.handle_request(request, connection)
|
||||||
|
logging.debug("request handled")
|
||||||
|
|
||||||
|
def server_receiver(self, conn: PacketServerConnection):
|
||||||
logging.debug("running server receiver")
|
logging.debug("running server receiver")
|
||||||
process_incoming_data(conn, self)
|
self.process_incoming_data(conn)
|
||||||
|
|
||||||
def register_path_handler(self, path_root: str, fn: Callable):
|
def register_path_handler(self, path_root: str, fn: Callable):
|
||||||
self.handlers[path_root.strip().lower()] = fn
|
self.handlers[path_root.strip().lower()] = fn
|
||||||
|
|||||||
@@ -1,20 +1,14 @@
|
|||||||
|
import ax25
|
||||||
import persistent
|
import persistent
|
||||||
import persistent.list
|
import persistent.list
|
||||||
from persistent.mapping import PersistentMapping
|
from persistent.mapping import PersistentMapping
|
||||||
import datetime
|
import datetime
|
||||||
from typing import Self,Union,Optional
|
from typing import Self,Union,Optional
|
||||||
from packetserver.common import PacketServerConnection, Request, Response, Message
|
from packetserver.common import PacketServerConnection, Request, Response, Message
|
||||||
from packetserver.server import Server
|
from packetserver.server.requests import send_response, send_blank_response
|
||||||
from packetserver.server.requests import send_404
|
|
||||||
import ZODB
|
import ZODB
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
def init_bulletins(root: PersistentMapping):
|
|
||||||
if 'bulletins' not in root:
|
|
||||||
root['bulletins'] = persistent.list.PersistentList()
|
|
||||||
if 'bulletin_counter' not in root:
|
|
||||||
root['bulletin_counter'] = 0
|
|
||||||
|
|
||||||
def get_new_bulletin_id(root: PersistentMapping) -> int:
|
def get_new_bulletin_id(root: PersistentMapping) -> int:
|
||||||
if 'bulletin_counter' not in root:
|
if 'bulletin_counter' not in root:
|
||||||
root['bulletin_counter'] = 1
|
root['bulletin_counter'] = 1
|
||||||
@@ -32,6 +26,17 @@ class Bulletin(persistent.Persistent):
|
|||||||
return bull
|
return bull
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def get_recent_bulletins(cls, db_root: PersistentMapping, limit: int = None) -> list:
|
||||||
|
all_bulletins = sorted(db_root['bulletins'], key=lambda bulletin: bulletin.updated_at, reverse=True)
|
||||||
|
if not limit:
|
||||||
|
return all_bulletins
|
||||||
|
else:
|
||||||
|
if len(all_bulletins) < limit:
|
||||||
|
return all_bulletins
|
||||||
|
else:
|
||||||
|
return all_bulletins[:limit]
|
||||||
|
|
||||||
def __init__(self, author: str, subject: str, text: str):
|
def __init__(self, author: str, subject: str, text: str):
|
||||||
self.author = author
|
self.author = author
|
||||||
self.subject = subject
|
self.subject = subject
|
||||||
@@ -69,33 +74,74 @@ class Bulletin(persistent.Persistent):
|
|||||||
"updated_at": self.updated_at.isoformat()
|
"updated_at": self.updated_at.isoformat()
|
||||||
}
|
}
|
||||||
|
|
||||||
def handle_bulletin_get(req: Request, conn: PacketServerConnection, server: Server):
|
|
||||||
response = Response.blank()
|
|
||||||
with server.db.transaction() as db:
|
|
||||||
pass
|
|
||||||
return response
|
|
||||||
|
|
||||||
def handle_bulletin_post(req: Request, conn: PacketServerConnection, server: Server):
|
def handle_bulletin_get(req: Request, conn: PacketServerConnection, db: ZODB.DB):
|
||||||
response = Response.blank()
|
response = Response.blank()
|
||||||
with server.db.transaction() as db:
|
sp = req.path.split("/")
|
||||||
pass
|
bid = None
|
||||||
return response
|
limit = None
|
||||||
|
if 'limit' in req.vars:
|
||||||
|
try:
|
||||||
|
limit = int(req.vars['limit'])
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
if 'id' in req.vars:
|
||||||
|
try:
|
||||||
|
bid = int(req.vars['id'])
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
if len(sp) > 2:
|
||||||
|
try:
|
||||||
|
bid = int(sp[2].strip())
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
def handle_bulletin_update(req: Request, conn: PacketServerConnection, server: Server):
|
with db.transaction() as db:
|
||||||
|
if bid:
|
||||||
|
bull = Bulletin.get_bulletin_by_id(bid, db.root())
|
||||||
|
if bull:
|
||||||
|
response.payload = bull.to_dict()
|
||||||
|
response.status_code = 200
|
||||||
|
else:
|
||||||
|
response.status_code = 404
|
||||||
|
else:
|
||||||
|
bulls = Bulletin.get_recent_bulletins(db.root(), limit=limit)
|
||||||
|
response.payload = [bulletin.to_dict() for bulletin in bulls]
|
||||||
|
response.status_code = 200
|
||||||
|
|
||||||
|
send_response(conn, response, req)
|
||||||
|
|
||||||
|
def handle_bulletin_post(req: Request, conn: PacketServerConnection, db: ZODB.DB):
|
||||||
|
author = ax25.Address(conn.remote_callsign).call
|
||||||
|
if type(req.payload) is not dict:
|
||||||
|
send_blank_response(conn, req, 400, payload="Include dict in payload with subject and body")
|
||||||
|
if 'subject' not in req.payload:
|
||||||
|
send_blank_response(conn, req, 400, payload="Include dict in payload with subject and body")
|
||||||
|
if 'body' not in req.payload:
|
||||||
|
send_blank_response(conn, req, 400, payload="Include dict in payload with subject and body")
|
||||||
|
b = Bulletin(author, str(req.payload['subject']), str(req.payload['body']))
|
||||||
response = Response.blank()
|
response = Response.blank()
|
||||||
with server.db.transaction() as db:
|
with db.transaction() as db:
|
||||||
pass
|
b.write_new(db.root())
|
||||||
return response
|
send_blank_response(conn, req, status_code=201)
|
||||||
|
|
||||||
def handle_bulletin_delete(req: Request, conn: PacketServerConnection, server: Server):
|
def handle_bulletin_update(req: Request, conn: PacketServerConnection, db: ZODB.DB):
|
||||||
response = Response.blank()
|
response = Response.blank()
|
||||||
with server.db.transaction() as db:
|
with db.transaction() as db:
|
||||||
pass
|
pass
|
||||||
return response
|
send_response(conn, response, req)
|
||||||
|
|
||||||
def bulletin_root_handler(req: Request, conn: PacketServerConnection, server: Server):
|
def handle_bulletin_delete(req: Request, conn: PacketServerConnection, db: ZODB.DB):
|
||||||
|
response = Response.blank()
|
||||||
|
with db.transaction() as db:
|
||||||
|
pass
|
||||||
|
send_response(conn, response, req)
|
||||||
|
|
||||||
|
def bulletin_root_handler(req: Request, conn: PacketServerConnection, db: ZODB.DB):
|
||||||
logging.debug(f"{req} being processed by bulletin_root_handler")
|
logging.debug(f"{req} being processed by bulletin_root_handler")
|
||||||
if req.method is Request.Method.GET:
|
if req.method is Request.Method.GET:
|
||||||
handle_bulletin_get(req, conn, server)
|
handle_bulletin_get(req, conn, db)
|
||||||
|
elif req.method is Request.Method.POST:
|
||||||
|
handle_bulletin_post(req, conn, db)
|
||||||
else:
|
else:
|
||||||
send_404(conn)
|
send_blank_response(conn, req, status_code=404)
|
||||||
|
|||||||
@@ -1,26 +1,20 @@
|
|||||||
"""Module for handling requests as they arrive to connection objects and servers."""
|
"""Module for handling requests as they arrive to connection objects and servers."""
|
||||||
|
|
||||||
from msgpack.exceptions import OutOfData
|
from msgpack.exceptions import OutOfData
|
||||||
from packetserver.common import Message, Request, Response, PacketServerConnection
|
from packetserver.common import Message, Request, Response, PacketServerConnection, send_response, send_blank_response
|
||||||
from .bulletin import bulletin_root_handler
|
from .bulletin import bulletin_root_handler
|
||||||
import logging
|
import logging
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
import ZODB
|
||||||
def send_404(conn: PacketServerConnection, payload: Union[bytes, bytearray, str, dict] = ""):
|
|
||||||
response_404 = Response.blank()
|
|
||||||
response_404.status_code = 404
|
|
||||||
response_404.payload = payload
|
|
||||||
if conn.state.name == "CONNECTED":
|
|
||||||
conn.send_data(response_404.pack())
|
|
||||||
|
|
||||||
def handle_root_get(req: Request, conn: PacketServerConnection,
|
def handle_root_get(req: Request, conn: PacketServerConnection,
|
||||||
server: 'packetserver.server.Server'):
|
db: ZODB.DB):
|
||||||
logging.debug(f"Received request: {req}")
|
logging.debug(f"Root get handler received request: {req}")
|
||||||
response = Response.blank()
|
response = Response.blank()
|
||||||
response.compression = Message.CompressionType.BZIP2
|
response.compression = Message.CompressionType.BZIP2
|
||||||
operator = ""
|
operator = ""
|
||||||
motd = ""
|
motd = ""
|
||||||
with server.db.transaction() as storage:
|
with db.transaction() as storage:
|
||||||
if 'motd' in storage.root.config:
|
if 'motd' in storage.root.config:
|
||||||
motd = storage.root.config['motd']
|
motd = storage.root.config['motd']
|
||||||
if 'operator' in storage.root.config:
|
if 'operator' in storage.root.config:
|
||||||
@@ -31,76 +25,20 @@ def handle_root_get(req: Request, conn: PacketServerConnection,
|
|||||||
'motd': motd
|
'motd': motd
|
||||||
}
|
}
|
||||||
|
|
||||||
if conn.state.name == "CONNECTED" and not conn.closing:
|
send_response(conn, response, req)
|
||||||
logging.debug(f"sending response: {response}, {response.compression}, {response.payload}")
|
|
||||||
conn.send_data(response.pack())
|
|
||||||
logging.debug("response sent successfully")
|
|
||||||
|
|
||||||
def root_root_handler(req: Request, conn: PacketServerConnection,
|
def root_root_handler(req: Request, conn: PacketServerConnection,
|
||||||
server: 'packetserver.server.Server'):
|
db: ZODB.DB):
|
||||||
logging.debug(f"{req} got to root_root_handler")
|
logging.debug(f"{req} got to root_root_handler")
|
||||||
if req.method is Request.Method.GET:
|
if req.method is Request.Method.GET:
|
||||||
handle_root_get(req, conn, server)
|
handle_root_get(req, conn, db)
|
||||||
else:
|
else:
|
||||||
logging.warning(f"unhandled request found: {req}")
|
logging.warning(f"unhandled request found: {req}")
|
||||||
response_404 = Response.blank()
|
send_blank_response(conn, req, status_code=404)
|
||||||
response_404.status_code = 404
|
|
||||||
if (conn.state.name == "CONNECTED") and not conn.closing:
|
|
||||||
conn.send_data(response_404.pack())
|
|
||||||
logging.debug(f"Sent 404 in response to {req}")
|
|
||||||
|
|
||||||
standard_handlers = {
|
standard_handlers = {
|
||||||
"": root_root_handler,
|
"": root_root_handler,
|
||||||
"bulletin": bulletin_root_handler
|
"bulletin": bulletin_root_handler
|
||||||
}
|
}
|
||||||
|
|
||||||
def handle_request(req: Request, conn: PacketServerConnection,
|
|
||||||
server: 'packetserver.server.Server'):
|
|
||||||
"""Handles a proper request by handing off to the appropriate function depending on method and Path."""
|
|
||||||
logging.debug(f"asked to handle request: {req}")
|
|
||||||
if conn.closing:
|
|
||||||
logging.debug("Connection marked as closing. Ignoring it.")
|
|
||||||
return
|
|
||||||
req_root_path = req.path.split("/")[0]
|
|
||||||
if req_root_path in server.handlers:
|
|
||||||
logging.debug(f"found handler for req {req}")
|
|
||||||
server.handlers[req_root_path](req, conn, server)
|
|
||||||
return
|
|
||||||
logging.warning(f"unhandled request found: {req}")
|
|
||||||
response_404 = Response.blank()
|
|
||||||
response_404.status_code = 404
|
|
||||||
if conn.state.name == "CONNECTED":
|
|
||||||
conn.send_data(response_404.pack())
|
|
||||||
logging.debug(f"Sent 404 in response to {req}")
|
|
||||||
|
|
||||||
def process_incoming_data(connection: 'packetserver.common.PacketServerConnection',
|
|
||||||
server: 'packetserver.server.Server'):
|
|
||||||
"""Handles incoming data."""
|
|
||||||
logging.debug("Running process_incoming_data on connection")
|
|
||||||
with connection.data_lock:
|
|
||||||
logging.debug("Data lock acquired")
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
msg = Message.partial_unpack(connection.data.unpack())
|
|
||||||
logging.debug(f"parsed a Message from data received")
|
|
||||||
except OutOfData:
|
|
||||||
logging.debug("no complete message yet, done until more data arrives")
|
|
||||||
break
|
|
||||||
except ValueError:
|
|
||||||
r = Response.blank()
|
|
||||||
r.status_code = 400
|
|
||||||
r.payload = "BAD REQUEST. COULD NOT PARSE INCOMING DATA AS PACKETSERVER MESSAGE"
|
|
||||||
connection.send_data(r.pack())
|
|
||||||
connection.send_data(b"BAD REQUEST. COULD NOT PARSE INCOMING DATA AS PACKETSERVER MESSAGE")
|
|
||||||
try:
|
|
||||||
request = Request(msg)
|
|
||||||
logging.debug(f"parsed Message into request {request}")
|
|
||||||
except ValueError:
|
|
||||||
r = Response.blank()
|
|
||||||
r.status_code = 400
|
|
||||||
r.payload = "BAD REQUEST. DID NOT RECEIVE A REQUEST MESSAGE."
|
|
||||||
connection.send_data(r.pack())
|
|
||||||
connection.send_data(b"BAD REQUEST. DID NOT RECEIVE A REQUEST MESSAGE.")
|
|
||||||
logging.debug(f"attempting to handle request {request}")
|
|
||||||
handle_request(request, connection, server)
|
|
||||||
logging.debug("request handled")
|
|
||||||
Reference in New Issue
Block a user