diff --git a/src/packetserver/client/__init__.py b/src/packetserver/client/__init__.py
index 5892ab1..432d9ca 100644
--- a/src/packetserver/client/__init__.py
+++ b/src/packetserver/client/__init__.py
@@ -1,2 +1,88 @@
+import datetime
+import pe.app
+from ZEO.asyncio.server import new_connection  # FIXME: looks like an accidental IDE auto-import; unused here
+
+from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response, \
+    DummyPacketServerConnection
+import ax25
+import logging
+import signal
+import time
+from msgpack.exceptions import OutOfData
+from typing import Callable, Self, Union, Optional
+from traceback import format_exc
+from os import linesep
+from shutil import rmtree
+from threading import Thread
+
 class Client:
-    pass
\ No newline at end of file
+    def __init__(self, pe_server: str, port: int, client_callsign: str):
+        if not ax25.Address.valid_call(client_callsign):
+            raise ValueError(f"Provided callsign '{client_callsign}' is invalid.")
+        self.pe_server = pe_server
+        self.pe_port = port
+        self.callsign = client_callsign
+        self.app = pe.app.Application()
+        self.started = False
+        signal.signal(signal.SIGINT, self.stop)
+        signal.signal(signal.SIGTERM, self.stop)
+
+    # signum/frame defaults let this double as a signal handler and a plain method.
+    def stop(self, signum=None, frame=None):
+        self.started = False
+        self.clear_connections()
+        self.app.stop()
+
+    def start(self):
+        self.app.start(self.pe_server, self.pe_port)
+        self.app.register_callsigns(self.callsign)
+        self.started = True
+
+    def clear_connections(self):
+        for key in cm._connections.keys():  # FIXME: 'cm' is undefined (NameError) — bind the connection map before release
+            cm._connections[key].close()
+
+    def new_connection(self, dest: str):
+        if not self.started:
+            raise RuntimeError("Must start client before creating connections.")
+        if not ax25.Address.valid_call(dest):
+            raise ValueError(f"Provided destination callsign '{dest}' is invalid.")
+        return self.app.open_connection(0, self.callsign, dest)
+
+    def send_and_receive(self, req: Request, conn: PacketServerConnection, timeout: int = 300) -> Optional[Response]:
+        if conn.state.name != "CONNECTED":
+            raise RuntimeError("Connection is not connected.")
+        logging.debug(f"Sending request {req}")
+        conn.send_data(req.pack())
+        cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
+        while datetime.datetime.now() < cutoff_date:
+            if conn.state.name != "CONNECTED":
+                logging.error(f"Connection {conn} disconnected.")
+                return None
+            try:
+                unpacked = conn.data.unpack()
+            except OutOfData:
+                time.sleep(.1)
+                continue
+            msg = Message.partial_unpack(unpacked)
+            return Response(msg)
+        return None
+
+    def single_connect_send_receive(self, dest: str, req: Request, timeout: int = 300) -> Optional[Response]:
+        conn = self.new_connection(dest)
+        logging.debug("Waiting for connection to be ready.")
+        cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
+
+        while (datetime.datetime.now() < cutoff_date) and (conn.state.name != "CONNECTED"):
+            if conn.state.name in ["DISCONNECTED", "DISCONNECTING"]:
+                logging.error(f"Connection {conn} disconnected.")
+                return None
+            time.sleep(.1)
+
+        logging.debug("Allowing connection to stabilize for 3 seconds")
+        time.sleep(3)
+        remaining_time = int((cutoff_date - datetime.datetime.now()).total_seconds()) + 1
+        if remaining_time <= 0:
+            logging.debug("Connection attempt timed out.")
+            conn.close()
+            return None
+        return self.send_and_receive(req, conn, timeout=int(remaining_time))
diff --git a/src/packetserver/runner/__init__.py b/src/packetserver/runner/__init__.py
index e95ae3d..01854e9 100644
--- a/src/packetserver/runner/__init__.py
+++ b/src/packetserver/runner/__init__.py
@@ -86,6 +86,7 @@ class Runner:
         self.started_at = datetime.datetime.now()
         self.finished_at = None
         self._result = (0,(b'', b''))
+        self._artifact_archive = b''
         if environment:
             for key in environment:
                 self.env[key] = environment[key]
diff --git a/src/packetserver/server/__init__.py b/src/packetserver/server/__init__.py
index ba70dc8..1408bb0 100644
--- a/src/packetserver/server/__init__.py
+++ b/src/packetserver/server/__init__.py
@@ -1,3 +1,6 @@
+import datetime
+import tempfile
+
 import pe.app
 from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response, \
     DummyPacketServerConnection
@@ -18,7 +21,10 @@ from msgpack.exceptions import OutOfData
 from typing import Callable, Self, Union
 from traceback import format_exc
 from os import linesep
+from shutil import rmtree
 from threading import Thread
+from packetserver.server.jobs import get_orchestrator_from_config, Job, JobStatus
+from packetserver.runner import RunnerStatus, RunnerFile, Orchestrator, Runner
 
 def init_bulletins(root: PersistentMapping):
     if 'bulletins' not in root:
@@ -86,6 +92,13 @@ class Server:
         if 'user_jobs' not in conn.root():
             conn.root.user_jobs = PersistentMapping()
         init_bulletins(conn.root())
+
+        if 'jobs_enabled' in conn.root.config:
+            if 'runner' in conn.root.config['jobs_config']:
+                val = str(conn.root.config['jobs_config']['runner']).lower().strip()
+                if val in ['podman']:
+                    self.orchestrator = get_orchestrator_from_config(conn.root.config['jobs_config'])
+
         self.app = pe.app.Application()
         PacketServerConnection.receive_subscribers.append(lambda x: self.server_receiver(x))
         PacketServerConnection.connection_subscribers.append(lambda x: self.server_connection_bouncer(x))
@@ -185,14 +198,52 @@ class Server:
         if not self.started:
             return
         # Add things to do here:
-        # TODO Queue jobs if applicable.
+
+        # Guard: orchestrator is None when jobs are disabled on this server.
+        if self.orchestrator is not None and self.orchestrator.started:
+            with self.db.transaction() as storage:
+                # Queue as many jobs as possible; stop when either runners or queued jobs run out.
+                while self.orchestrator.runners_available() and len(storage.root.job_queue) > 0:
+                    jid = storage.root.job_queue[0]
+                    try:
+                        logging.info(f"Starting job {jid}")
+                        job = Job.get_job_by_id(jid, storage.root())
+                    except Exception:
+                        logging.error(f"Error retrieving job {jid}")
+                        break
+                    runner = self.orchestrator.new_runner(job.owner, job.cmd, jid)
+                    if runner is not None:
+                        storage.root.job_queue.remove(jid)
+                        job.status = JobStatus.RUNNING
+                        job.started_at = datetime.datetime.now()
+                        logging.info(f"Started job {job}")
+                    else:
+                        break
+
+        if self.orchestrator is not None and self.orchestrator.started:
+            finished_runners = []
+            for runner in self.orchestrator.runners:
+                if runner.is_finished():
+                    logging.debug(f"Finishing runner {runner}")
+                    with self.db.transaction() as storage:
+                        try:
+                            if Job.update_job_from_runner(runner, storage.root()):
+                                finished_runners.append(runner)
+                                logging.info(f"Runner {runner} successfully synced with jobs.")
+                            else:
+                                logging.error("update_job_from_runner returned False.")
+                        except Exception:
+                            logging.error(f"Error while finishing runner and updating job status {runner}\n:{format_exc()}")
+            for runner in finished_runners:
+                logging.info(f"Removing completed runner {runner}")
+                with self.orchestrator.runner_lock:
+                    self.orchestrator.runners.remove(runner)
 
     def run_worker(self):
         """Intended to be running as a thread."""
         logging.info("Starting worker thread.")
         while self.started:
             self.server_worker()
-            time.sleep(1)
+            time.sleep(.5)
 
     def __del__(self):
         self.stop()
@@ -221,6 +272,7 @@ class Server:
         self.app.register_callsigns(self.callsign)
         self.started = True
         if self.orchestrator is not None:
+            logging.info(f"Starting orchestrator {self.orchestrator}")
             self.orchestrator.start()
         self.worker_thread = Thread(target=self.run_worker)
         self.worker_thread.start()
@@ -250,12 +302,23 @@ class TestServer(Server):
     def __init__(self, server_callsign: str, data_dir: str = None, zeo: bool = True):
         super().__init__('localhost', 8000, server_callsign, data_dir=data_dir, zeo=zeo)
         self._data_pid = 1
+        self._file_traffic_dir = tempfile.mkdtemp()
+        self._file_traffic_thread = None
 
     def start(self):
+        if self.orchestrator is not None:
+            self.orchestrator.start()
         self.start_db()
+        self.started = True
+        self.worker_thread = Thread(target=self.run_worker)
+        self.worker_thread.start()
 
     def stop(self):
+        self.started = False
+        if self.orchestrator is not None:
+            self.orchestrator.stop()
         self.stop_db()
+        rmtree(self._file_traffic_dir)
 
     def data_pid(self) -> int:
         old = self._data_pid
diff --git a/src/packetserver/server/jobs.py b/src/packetserver/server/jobs.py
index 60c5103..839e1bd 100644
--- a/src/packetserver/server/jobs.py
+++ b/src/packetserver/server/jobs.py
@@ -1,10 +1,14 @@
+import re
+
 import ax25
 import persistent
 import persistent.list
 from persistent.mapping import PersistentMapping
 import datetime
 from typing import Self,Union,Optional,Tuple
+from traceback import format_exc
 from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response
+from packetserver.common.constants import no_values
 import ZODB
 from persistent.list import PersistentList
 import logging
@@ -12,7 +16,8 @@ from packetserver.server.users import user_authorized
 import gzip
 import tarfile
 import json
-from packetserver.runner.podman import TarFileExtractor
+from packetserver.runner.podman import TarFileExtractor, PodmanOrchestrator, PodmanRunner, PodmanOptions
+from packetserver.runner import Orchestrator, Runner, RunnerStatus
 from enum import Enum
 from io import BytesIO
 import base64
@@ -27,6 +32,16 @@ class JobStatus(Enum):
     FAILED = 7
     TIMED_OUT = 8
 
+def get_orchestrator_from_config(cfg: dict) -> Union[Orchestrator, PodmanOrchestrator]:
+    if 'runner' in cfg:
+        val = cfg['runner'].lower().strip()
+        if val == "podman":
+            return PodmanOrchestrator()
+        else:
+            raise RuntimeError("Other orchestrators not implemented yet.")
+    else:
+        raise RuntimeError("Runners not configured in root.config.jobs_config")
+
 def get_new_job_id(root: PersistentMapping) -> int:
     if 'job_counter' not in root:
         root['job_counter'] = 1
@@ -37,6 +52,25 @@ def get_new_job_id(root: PersistentMapping) -> int:
     return current
 
 class Job(persistent.Persistent):
+    @classmethod
+    def update_job_from_runner(cls, runner: Runner, db_root: PersistentMapping) -> bool:
+        job = Job.get_job_by_id(runner.job_id, db_root)
+        if job is None:
+            logging.warning(f"Couldn't match runner {runner} with a job by id.")
+            return False
+        if not runner.is_finished():
+            return False
+        job.finished_at = datetime.datetime.now()
+        job.output = runner.output
+        job.errors = runner.errors
+        job.return_code = runner.return_code
+        job._artifact_archive = runner._artifact_archive
+        if runner.status == RunnerStatus.SUCCESSFUL:
+            job.status = JobStatus.SUCCESSFUL
+        else:
+            job.status = JobStatus.FAILED
+        return True
+
     @classmethod
     def get_job_by_id(cls, jid: int, db_root: PersistentMapping) -> Optional[Self]:
         if jid in db_root['jobs']:
@@ -71,7 +105,7 @@ class Job(persistent.Persistent):
 
     def __init__(self, cmd: Union[list[str], str], owner: Optional[str] = None, timeout: int = 300):
         self.owner = None
-        if self.owner is not None:
+        if owner is not None:
             self.owner = str(owner).upper().strip()
         self.cmd = cmd
         self.created_at = datetime.datetime.now(datetime.UTC)
@@ -121,8 +155,10 @@ class Job(persistent.Persistent):
         return artifacts[index][0], artifacts[index][1].read()
 
     def queue(self, db_root: PersistentMapping) -> int:
+        logging.debug(f"Attempting to queue job {self}")
         if self.owner is None or (str(self.owner).strip() == ""):
             raise ValueError("Job must have an owner to be queued.")
+
         if self.id is None:
             self.id = get_new_job_id(db_root)
         owner = self.owner.upper().strip()
@@ -149,7 +185,7 @@ class Job(persistent.Persistent):
             "errors": b'',
             "return_code": self.return_code,
             "artifacts": [],
-            "status": self.status,
+            "status": self.status.name,
             "id": self.id
         }
         if include_data:
@@ -169,3 +205,92 @@ class Job(persistent.Persistent):
 
     def json(self, include_data: bool = True) -> str:
         return json.dumps(self.to_dict(include_data=include_data, binary_safe=True))
+
+def handle_job_get_id(req: Request, conn: PacketServerConnection, db: ZODB.DB, jid: int):
+    username = ax25.Address(conn.remote_callsign).call.upper().strip()
+    value = "y"
+    include_data = True
+    for key in req.vars:
+        if key.lower().strip() == "data":
+            value = req.vars[key].lower().strip()
+            if value in no_values:
+                include_data = False
+
+    with db.transaction() as storage:
+        try:
+            job = Job.get_job_by_id(jid, storage.root())
+            if job is None:
+                send_blank_response(conn, req, 404)
+                return
+            if job.owner != username:
+                send_blank_response(conn, req, 401)
+                return
+            send_blank_response(conn, req, 200, job.to_dict(include_data=include_data))
+            return
+        except Exception:
+            logging.error(f"Error looking up job {jid}:\n{format_exc()}")
+            send_blank_response(conn, req, 500, payload="unknown server error")
+
+def handle_job_get_user(req: Request, conn: PacketServerConnection, db: ZODB.DB):
+    username = ax25.Address(conn.remote_callsign).call.upper().strip()
+    # TODO finish user job lookup
+    send_blank_response(conn, req, 404)
+
+def handle_job_get(req: Request, conn: PacketServerConnection, db: ZODB.DB):
+    spl = [x for x in req.path.split("/") if x.strip() != ""]
+    if (len(spl) == 2) and (spl[1].isdigit()):
+        handle_job_get_id(req, conn, db, int(spl[1]))
+    elif (len(spl) == 2) and (spl[1].lower() == "user"):
+        handle_job_get_user(req, conn, db)
+    else:
+        send_blank_response(conn, req, status_code=404)
+
+def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB):
+    username = ax25.Address(conn.remote_callsign).call.upper().strip()
+    if 'cmd' not in req.payload:
+        logging.info(f"request {req} did not contain job command (cmd) key")
+        send_blank_response(conn, req, 400, "job post must contain cmd key containing str or list[str]")
+        return
+    if type(req.payload['cmd']) not in [str, list]:
+        send_blank_response(conn, req, 400, "job post must contain cmd key containing str or list[str]")
+        return
+    job = Job(req.payload['cmd'], owner=username)
+    with db.transaction() as storage:
+        try:
+            new_jid = job.queue(storage.root())
+            logging.info(f"New job created with id {new_jid}")
+        except Exception:
+            logging.error(f"Failed to queue new job {job}:\n{format_exc()}")
+            send_blank_response(conn, req, 500, "unknown server error while queuing job")
+            return
+    send_blank_response(conn, req, 201, {'job_id': new_jid})
+
+def handle_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB):
+    spl = [x for x in req.path.split("/") if x.strip() != ""]
+
+    if len(spl) == 1:
+        handle_new_job_post(req, conn, db)
+    else:
+        send_blank_response(conn, req, status_code=404)
+
+def job_root_handler(req: Request, conn: PacketServerConnection, db: ZODB.DB):
+    logging.debug(f"{req} being processed by job_root_handler")
+    if not user_authorized(conn, db):
+        logging.debug(f"user {conn.remote_callsign} not authorized")
+        send_blank_response(conn, req, status_code=401)
+        return
+    logging.debug("user is authorized")
+    with db.transaction() as storage:
+        if 'jobs_enabled' in storage.root.config:
+            jobs_enabled = storage.root.config['jobs_enabled']
+        else:
+            jobs_enabled = False
+        if not jobs_enabled:
+            send_blank_response(conn, req, 400, payload="jobs not enabled on this server")
+            return
+        if req.method is Request.Method.GET:
+            handle_job_get(req, conn, db)
+        elif req.method is Request.Method.POST:
+            handle_job_post(req, conn, db)
+        else:
+            send_blank_response(conn, req, status_code=404)
\ No newline at end of file
diff --git a/src/packetserver/server/messages.py b/src/packetserver/server/messages.py
index bd0b86e..97f6028 100644
--- a/src/packetserver/server/messages.py
+++ b/src/packetserver/server/messages.py
@@ -21,7 +21,7 @@ from traceback import format_exc
 from collections import namedtuple
 import re
-since_regex = '''^message\/since\/(\d+)$'''
+since_regex = r"^message/since/(\d+)$"
 
 def mailbox_create(username: str, db_root: PersistentMapping):
     un = username.upper().strip()
@@ -401,7 +401,7 @@ def handle_message_post(req: Request, conn: PacketServerConnection, db: ZODB.DB)
     send_blank_response(conn, req, status_code=201, payload={"successes": send_counter, "failed": failed})
 
 def message_root_handler(req: Request, conn: PacketServerConnection, db: ZODB.DB):
-    logging.debug(f"{req} being processed by user_root_handler")
+    logging.debug(f"{req} being processed by message_root_handler")
     if not user_authorized(conn, db):
         logging.debug(f"user {conn.remote_callsign} not authorized")
         send_blank_response(conn, req, status_code=401)
@@ -409,7 +409,7 @@ def message_root_handler(req: Request, conn: PacketServerConnection, db: ZODB.DB
     logging.debug("user is authorized")
     if req.method is Request.Method.GET:
         handle_message_get(req, conn, db)
-    if req.method is Request.Method.POST:
+    elif req.method is Request.Method.POST:
         handle_message_post(req, conn, db)
     else:
         send_blank_response(conn, req, status_code=404)
diff --git a/src/packetserver/server/requests.py b/src/packetserver/server/requests.py
index 153a917..39b5fb7 100644
--- a/src/packetserver/server/requests.py
+++ b/src/packetserver/server/requests.py
@@ -6,6 +6,7 @@ from .bulletin import bulletin_root_handler
 from .users import user_root_handler, user_authorized
 from .objects import object_root_handler
 from .messages import message_root_handler
+from .jobs import job_root_handler
 import logging
 from typing import Union
 import ZODB
@@ -57,7 +58,8 @@ standard_handlers = {
     "bulletin": bulletin_root_handler,
     "user": user_root_handler,
     "object": object_root_handler,
-    "message": message_root_handler
+    "message": message_root_handler,
+    "job": job_root_handler
 }