From 3c9b7d0c555f8ea76abc18bd246cf19a3ece5706 Mon Sep 17 00:00:00 2001 From: Michael Woods Date: Fri, 14 Feb 2025 22:53:43 -0500 Subject: [PATCH] Adding job type to server db. --- src/packetserver/__init__.py | 2 +- src/packetserver/runner/__init__.py | 16 ++- src/packetserver/runner/constants.py | 2 +- src/packetserver/runner/podman.py | 16 ++- src/packetserver/server/__init__.py | 9 +- src/packetserver/server/db.py | 5 +- src/packetserver/server/jobs.py | 171 +++++++++++++++++++++++++++ src/packetserver/server/requests.py | 7 +- 8 files changed, 219 insertions(+), 9 deletions(-) diff --git a/src/packetserver/__init__.py b/src/packetserver/__init__.py index d5c8727..bc58dac 100644 --- a/src/packetserver/__init__.py +++ b/src/packetserver/__init__.py @@ -1 +1 @@ -VERSION="0.1-alpha" \ No newline at end of file +VERSION="0.2-alpha" \ No newline at end of file diff --git a/src/packetserver/runner/__init__.py b/src/packetserver/runner/__init__.py index 1f18976..e95ae3d 100644 --- a/src/packetserver/runner/__init__.py +++ b/src/packetserver/runner/__init__.py @@ -83,7 +83,8 @@ class Runner: self.args = args self.job_id = int(job_id) self.env = {} - self.started = datetime.datetime.now() + self.started_at = datetime.datetime.now() + self.finished_at = None self._result = (0,(b'', b'')) if environment: for key in environment: @@ -119,8 +120,15 @@ class Runner: def output(self) -> bytes: raise RuntimeError("Attempting to interact with an abstract class.") + def output_str(self) -> str: + raise RuntimeError("Attempting to interact with an abstract class.") + @property - def errors(self) -> str: + def errors(self) -> bytes: + raise RuntimeError("Attempting to interact with an abstract class.") + + @property + def errors_str(self) -> str: raise RuntimeError("Attempting to interact with an abstract class.") @property @@ -131,6 +139,10 @@ class Runner: def artifacts(self) -> TarFileExtractor: raise RuntimeError("Attempting to interact with an abstract class.") + @property + 
def has_artifacts(self) -> bool: + raise RuntimeError("Abstract method called.") + class Orchestrator: """Abstract class holds configuration and also tracks runners through their lifecycle. Prepares environments to run jobs in runners.""" diff --git a/src/packetserver/runner/constants.py b/src/packetserver/runner/constants.py index 7d396b8..90add08 100644 --- a/src/packetserver/runner/constants.py +++ b/src/packetserver/runner/constants.py @@ -40,7 +40,7 @@ rm -rfv "${PACKETSERVER_JOB_DIR}" podman_bash_start = """ echo 'waiting for /root/scripts/container_run_script.sh to exist' while ! [ -f '/root/scripts/container_run_script.sh' ]; do - tail /dev/null + sleep .1 done echo 'entering /root/scripts/container_run_script.sh ...' bash /root/scripts/container_run_script.sh diff --git a/src/packetserver/runner/podman.py b/src/packetserver/runner/podman.py index 424450f..c769601 100644 --- a/src/packetserver/runner/podman.py +++ b/src/packetserver/runner/podman.py @@ -81,6 +81,7 @@ class PodmanRunner(Runner): except: logging.warning(f"Error retrieving artifacts for {self.job_id}:\n{format_exc()}") self._artifact_archive = b'' + self.finished_at = datetime.datetime.now() # set final status to FAILED or SUCCEEDED if self.return_code == 0: self.status = RunnerStatus.SUCCESSFUL @@ -96,14 +97,17 @@ class PodmanRunner(Runner): @property def artifacts(self) -> TarFileExtractor: - return TarFileExtractor(gzip.GzipFile(fileobj=BytesIO(self._artifact_archive))) + if self._artifact_archive == b'': + return TarFileExtractor(BytesIO(b'')) + else: + return TarFileExtractor(gzip.GzipFile(fileobj=BytesIO(self._artifact_archive))) @property def output(self) -> bytes: return self._result[1][0] @property - def str_output(self) -> str: + def output_str(self) -> str: try: output = self.output.decode() except: @@ -112,6 +116,10 @@ class PodmanRunner(Runner): @property def errors(self) -> str: + return self._result[1][1] + + @property + def errors_str(self) -> str: return 
self._result[1][1].decode() @property @@ -424,6 +432,10 @@ class PodmanOrchestrator(Orchestrator): self.manager_thread = Thread(target=self.manager) self.manager_thread.start() + def __del__(self): + if self.started: + self.stop() + def stop(self): logging.debug("Stopping podman orchestrator.") self.started = False diff --git a/src/packetserver/server/__init__.py b/src/packetserver/server/__init__.py index f277da3..ba70dc8 100644 --- a/src/packetserver/server/__init__.py +++ b/src/packetserver/server/__init__.py @@ -26,7 +26,6 @@ def init_bulletins(root: PersistentMapping): if 'bulletin_counter' not in root: root['bulletin_counter'] = 0 - class Server: def __init__(self, pe_server: str, port: int, server_callsign: str, data_dir: str = None, zeo: bool = True): if not ax25.Address.valid_call(server_callsign): @@ -79,6 +78,13 @@ class Server: if 'objects' not in conn.root(): logging.debug("objects bucket missing, creating") conn.root.objects = OOBTree() + if 'jobs' not in conn.root(): + logging.debug("jobs bucket missing, creating") + conn.root.jobs = OOBTree() + if 'job_queue' not in conn.root(): + conn.root.job_queue = PersistentList() + if 'user_jobs' not in conn.root(): + conn.root.user_jobs = PersistentMapping() init_bulletins(conn.root()) self.app = pe.app.Application() PacketServerConnection.receive_subscribers.append(lambda x: self.server_receiver(x)) @@ -179,6 +185,7 @@ class Server: if not self.started: return # Add things to do here: + # TODO Queue jobs if applicable. 
def run_worker(self): """Intended to be running as a thread.""" diff --git a/src/packetserver/server/db.py b/src/packetserver/server/db.py index d61257f..3e41c30 100644 --- a/src/packetserver/server/db.py +++ b/src/packetserver/server/db.py @@ -9,7 +9,8 @@ def get_user_db(username: str, db: ZODB.DB) -> dict: "objects": {}, "messages": [], "user": {}, - "bulletins": [] + "bulletins": [], + "jobs": [] } username = username.strip().upper() with db.transaction() as db_conn: @@ -31,6 +32,8 @@ def get_user_db(username: str, db: ZODB.DB) -> dict: for b in db_conn.root.bulletins: udb['bulletins'].append(b.to_dict()) + # TODO pack jobs into output + return udb def get_user_db_json(username: str, db: ZODB.DB, gzip_output=True) -> bytes: diff --git a/src/packetserver/server/jobs.py b/src/packetserver/server/jobs.py index e69de29..60c5103 100644 --- a/src/packetserver/server/jobs.py +++ b/src/packetserver/server/jobs.py @@ -0,0 +1,171 @@ +import ax25 +import persistent +import persistent.list +from persistent.mapping import PersistentMapping +import datetime +from typing import Self,Union,Optional,Tuple +from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response +import ZODB +from persistent.list import PersistentList +import logging +from packetserver.server.users import user_authorized +import gzip +import tarfile +import json +from packetserver.runner.podman import TarFileExtractor +from enum import Enum +from io import BytesIO +import base64 + +class JobStatus(Enum): + CREATED = 1 + QUEUED = 2 + STARTING = 3 + RUNNING = 4 + STOPPING = 5 + SUCCESSFUL = 6 + FAILED = 7 + TIMED_OUT = 8 + +def get_new_job_id(root: PersistentMapping) -> int: + if 'job_counter' not in root: + root['job_counter'] = 1 + return 0 + else: + current = root['job_counter'] + root['job_counter'] = current + 1 + return current + +class Job(persistent.Persistent): + @classmethod + def get_job_by_id(cls, jid: int, db_root: PersistentMapping) -> 
Optional[Self]: + if jid in db_root['jobs']: + return db_root['jobs'][jid] + return None + + @classmethod + def get_jobs_by_username(cls, username:str, db_root: PersistentMapping) -> list[Self]: + un = username.strip().upper() + if un in db_root['user_jobs']: + l = [] + for j in db_root['user_jobs'][un]: + l.append(Job.get_job_by_id(j, db_root)) + return l + else: + return [] + + @classmethod + def num_jobs_queued(cls, db_root: PersistentMapping) -> int: + return len(db_root['job_queue']) + + @classmethod + def jobs_in_queue(cls, db_root: PersistentMapping) -> bool: + if Job.num_jobs_queued(db_root) > 0: + return True + else: + return False + + @classmethod + def get_next_queued_job(cls, db_root: PersistentMapping) -> Self: + return db_root['job_queue'][0] + + def __init__(self, cmd: Union[list[str], str], owner: Optional[str] = None, timeout: int = 300): + self.owner = None + if owner is not None: + self.owner = str(owner).upper().strip() + self.cmd = cmd + self.created_at = datetime.datetime.now(datetime.UTC) + self.started_at = None + self.finished_at = None + self._artifact_archive = b'' + self.output = b'' + self.errors = b'' + self.return_code = 0 + self.id = None + self.status = JobStatus.CREATED + + @property + def is_finished(self) -> bool: + if self.finished_at is None: + return False + else: + return True + + @property + def output_str(self) -> str: + return self.output.decode() + + @property + def errors_str(self) -> str: + return self.errors.decode() + + @property + def artifacts(self) -> TarFileExtractor: + if self._artifact_archive == b'': + return TarFileExtractor(BytesIO(b'')) + else: + return TarFileExtractor(gzip.GzipFile(fileobj=BytesIO(self._artifact_archive))) + + @property + def num_artifacts(self) -> int: + return len(list(self.artifacts)) + + def __repr__(self) -> str: + return f"<Job {self.id}: {self.status.name}>" + + def artifact(self, index: int) -> Tuple[str, bytes]: + artifacts = list(self.artifacts) + if (index + 1) > len(artifacts): + raise IndexError(f"Index 
{index} out of bounds.") + else: + return artifacts[index][0], artifacts[index][1].read() + + def queue(self, db_root: PersistentMapping) -> int: + if self.owner is None or (str(self.owner).strip() == ""): + raise ValueError("Job must have an owner to be queued.") + if self.id is None: + self.id = get_new_job_id(db_root) + owner = self.owner.upper().strip() + if owner not in db_root['user_jobs']: + db_root['user_jobs'][owner] = PersistentList() + db_root['jobs'][self.id] = self + db_root['job_queue'].append(self.id) + return self.id + + def to_dict(self, include_data: bool = True, binary_safe: bool = False): + started_at = None + finished_at = None + if self.started_at is not None: + started_at = self.started_at.isoformat() + if self.finished_at is not None: + finished_at = self.finished_at.isoformat() + output = { + "cmd": self.cmd, + "owner": self.owner, + "created_at": self.created_at.isoformat(), + "started_at": started_at, + "finished_at": finished_at, + "output": '', + "errors": '', + "return_code": self.return_code, + "artifacts": [], + "status": self.status.name, + "id": self.id + } + if include_data: + if binary_safe: + output['output'] = base64.b64encode(self.output).decode() + output['errors'] = base64.b64encode(self.errors).decode() + else: + output['output'] = self.output + output['errors'] = self.errors + + for a in self.artifacts: + if binary_safe: + output['artifacts'].append((a[0], base64.b64encode(a[1].read()).decode())) + else: + output['artifacts'].append((a[0], a[1].read())) + return output + + def json(self, include_data: bool = True) -> str: + return json.dumps(self.to_dict(include_data=include_data, binary_safe=True)) diff --git a/src/packetserver/server/requests.py b/src/packetserver/server/requests.py index 3432be1..153a917 100644 --- a/src/packetserver/server/requests.py +++ b/src/packetserver/server/requests.py @@ -17,11 +17,14 @@ def handle_root_get(req: Request, conn: PacketServerConnection, response.compression = 
Message.CompressionType.BZIP2 operator = "" motd = "" + jobs_enabled = False with db.transaction() as storage: if 'motd' in storage.root.config: motd = storage.root.config['motd'] if 'operator' in storage.root.config: operator = storage.root.config['operator'] + if 'jobs_enabled' in storage.root.config: + jobs_enabled = storage.root.config['jobs_enabled'] logging.debug(f"Root handler retrieved config. {operator} - {motd}") logging.debug("Running user_authorized") if user_authorized(conn, db): @@ -32,8 +35,10 @@ def handle_root_get(req: Request, conn: PacketServerConnection, response.payload = { 'operator': operator, 'motd': motd, - 'user': user_message + 'user': user_message, + 'accepts_jobs': jobs_enabled } + logging.debug(f"Sending response {response}") send_response(conn, response, req) logging.debug("Sent reesponse.")