From d3e66f45b24ff00f858ab7389a358bbe80483937 Mon Sep 17 00:00:00 2001 From: Michael Woods Date: Sun, 28 Dec 2025 11:53:42 -0500 Subject: [PATCH] Made some changes to the jobs system so it responds faster when jobs are created or change status. --- packetserver/runner/__init__.py | 17 +++++++++++--- packetserver/runner/podman.py | 11 ++++++--- packetserver/server/__init__.py | 1 + packetserver/server/jobs.py | 41 ++++++++++++++++++++++++++++++--- packetserver/server/objects.py | 2 +- 5 files changed, 62 insertions(+), 10 deletions(-) diff --git a/packetserver/runner/__init__.py b/packetserver/runner/__init__.py index e6bc8ee..95ec17c 100644 --- a/packetserver/runner/__init__.py +++ b/packetserver/runner/__init__.py @@ -1,5 +1,5 @@ """Package runs arbitrary commands/jobs via different mechanisms.""" -from typing import Union,Optional,Iterable,Self +from typing import Union, Optional, Iterable, Self, Callable, List from enum import Enum import datetime from uuid import UUID, uuid4 @@ -73,8 +73,8 @@ class Runner: """Abstract class to take arguments and run a job and track the status and results.""" def __init__(self, username: str, args: Union[str, list[str]], job_id: int, environment: Optional[dict] = None, timeout_secs: str = 300, labels: Optional[list] = None, - files: list[RunnerFile] = None): - self.files = [] + files: list[RunnerFile] = None, notify_function: Callable = None): + self.files: List[RunnerFile] = [] if files is not None: for f in files: self.files.append(f) @@ -87,6 +87,7 @@ class Runner: self.finished_at = None self._result = (0,(b'', b'')) self._artifact_archive = b'' + self.notify_function = notify_function if environment: for key in environment: self.env[key] = environment[key] @@ -98,6 +99,10 @@ class Runner: self.timeout_seconds = timeout_secs self.created_at = datetime.datetime.now(datetime.UTC) + def notify(self): + if self.notify_function: + self.notify_function() + def __repr__(self): return f"<{type(self).__name__}: 
{self.username}[{self.job_id}] - {self.status.name}>" @@ -150,6 +155,7 @@ class Orchestrator: def __init__(self): self.runners = [] self.runner_lock = Lock() + self.listeners = [] def get_finished_runners(self) -> list[Runner]: return [r for r in self.runners if r.is_finished()] @@ -178,6 +184,11 @@ class Orchestrator: files: list[RunnerFile] = None) -> Runner: pass + def notify_listeners(self): + """If any runners change status, call all listener functions.""" + for func in self.listeners: + func() + def manage_lifecycle(self): """When called, updates runner statuses and performs any housekeeping.""" pass diff --git a/packetserver/runner/podman.py b/packetserver/runner/podman.py index b203f0d..397d2cc 100644 --- a/packetserver/runner/podman.py +++ b/packetserver/runner/podman.py @@ -1,5 +1,6 @@ """Uses podman to run jobs in containers.""" import time +from collections.abc import Callable from ZEO import client @@ -24,6 +25,7 @@ from packetserver import VERSION as packetserver_version import re from threading import Thread from io import BytesIO +from typing import Callable env_splitter_rex = '''([a-zA-Z0-9]+)=([a-zA-Z0-9]*)''' @@ -33,9 +35,9 @@ PodmanOptions = namedtuple("PodmanOptions", ["default_timeout", "max_timeout", " class PodmanRunner(Runner): def __init__(self, username: str, args: Union[str, list[str]], job_id: int, container: Container, environment: Optional[dict] = None, timeout_secs: str = 300, labels: Optional[list] = None, - files: list[RunnerFile] = None): + files: list[RunnerFile] = None, notify_function: Callable = None): super().__init__(username, args, job_id, environment=environment, timeout_secs=timeout_secs, - labels=labels, files=files) + labels=labels, files=files, notify_function=notify_function) self._artifact_archive = b'' if not container.inspect()['State']['Running']: raise ValueError(f"Container {container} is not in state Running.") @@ -48,6 +50,7 @@ class PodmanRunner(Runner): def thread_runner(self): self.status = 
RunnerStatus.RUNNING logging.debug(f"Thread for runner {self.job_id} started. Command for {(type(self.args))}:\n{self.args}") + self.notify() # run the exec call if type(self.args) is str: logging.debug(f"Running string: {self.args}") @@ -60,6 +63,7 @@ class PodmanRunner(Runner): logging.debug(str(res)) # cleanup housekeeping self.status = RunnerStatus.STOPPING + self.notify() self._result = res # run cleanup script logging.debug(f"Running cleanup script for {self.job_id}") @@ -87,6 +91,7 @@ class PodmanRunner(Runner): self.status = RunnerStatus.SUCCESSFUL else: self.status = RunnerStatus.FAILED + self.notify() @property def has_artifacts(self) -> bool: @@ -411,7 +416,7 @@ class PodmanOrchestrator(Orchestrator): self.touch_user_container(username) logging.debug(f"Queuing a runner on container {con}, with command '{args}' of type '{type(args)}'") runner = PodmanRunner(username, args, job_id, con, environment=environment, timeout_secs=timeout_secs, - labels=labels, files=files) + labels=labels, files=files, notify_function=lambda : self.notify_listeners()) self.runners.append(runner) runner.start() return runner diff --git a/packetserver/server/__init__.py b/packetserver/server/__init__.py index 9b80515..0eb6a4d 100644 --- a/packetserver/server/__init__.py +++ b/packetserver/server/__init__.py @@ -112,6 +112,7 @@ class Server: if val in ['podman']: logging.debug(f"Enabling {val} orchestrator") self.orchestrator = get_orchestrator_from_config(conn.root.config['jobs_config']) + self.orchestrator.listeners.append(lambda : self.ping_job_queue()) self.app = pe.app.Application() PacketServerConnection.receive_subscribers.append(lambda x: self.server_receiver(x)) diff --git a/packetserver/server/jobs.py b/packetserver/server/jobs.py index a5771be..40d7f05 100644 --- a/packetserver/server/jobs.py +++ b/packetserver/server/jobs.py @@ -5,15 +5,17 @@ import persistent import persistent.list from persistent.mapping import PersistentMapping import datetime -from typing import 
Self,Union,Optional,Tuple +from typing import Self,Union,Optional,Tuple,List from traceback import format_exc from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response from packetserver.common.constants import no_values, yes_values from packetserver.server.db import get_user_db_json import ZODB +from ZODB.Connection import Connection from persistent.list import PersistentList import logging -from packetserver.server.users import user_authorized +from packetserver.server.users import user_authorized, User +from packetserver.server.objects import Object import gzip import tarfile import time @@ -23,6 +25,7 @@ from packetserver.runner import Orchestrator, Runner, RunnerStatus, RunnerFile from enum import Enum from io import BytesIO import base64 +from uuid import UUID class JobStatus(Enum): CREATED = 1 @@ -58,6 +61,33 @@ def get_new_job_id(root: PersistentMapping) -> int: root['job_counter'] = current + 1 return current +def add_object_to_file_list(object_id: Union[str,UUID], file_list: List[RunnerFile], username: str, conn: Connection): + if type(object_id) is str: + object_id = UUID(object_id) + + root = conn.root() + obj = Object.get_object_by_uuid(object_id, root) + if obj is None: + raise KeyError(f"Object '{object_id}' does not exist.") + if obj.private: + owner_uuid = obj.owner + owner = User.get_user_by_uuid(owner_uuid, root) + if not (owner.username.lower() == username.lower()): + raise PermissionError(f"Specified object {object_id} not public and not owned by user.") + unique_path = obj.name + runner_paths = [] + for i in file_list: + runner_paths.append(i.destination_path) + suffix = 1 + while unique_path in runner_paths: + unique_path = obj.name + f"_{suffix}" + suffix = suffix + 1 + + rf = RunnerFile(unique_path,data=obj.data_bytes) + file_list.append(rf) + + + class Job(persistent.Persistent): @classmethod def update_job_from_runner(cls, runner: Runner, db_root: PersistentMapping) -> True: 
@@ -297,7 +327,7 @@ def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB) if type(req.payload['cmd']) not in [str, list]: send_blank_response(conn, req, 401, "job post must contain cmd key containing str or list[str]") return - files = [] + files: List[RunnerFile] = [] if 'db' in req.payload: logging.debug(f"Fetching a user db as requested.") try: @@ -313,6 +343,11 @@ def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB) val = req.payload['files'][key] if type(val) is bytes: files.append(RunnerFile(key, data=val)) + if 'objs' in req.payload: + if type(req.payload['objs']) is list: + with db.transaction() as db_connection: + for obj in req.payload['objs']: + add_object_to_file_list(obj, files, username, db_connection) env = {} if 'env' in req.payload: if type(req.payload['env']) is dict: diff --git a/packetserver/server/objects.py b/packetserver/server/objects.py index c4470aa..411958f 100644 --- a/packetserver/server/objects.py +++ b/packetserver/server/objects.py @@ -120,7 +120,7 @@ class Object(persistent.Persistent): raise KeyError(f"User '{un}' not found.") @classmethod - def get_object_by_uuid(cls, obj: UUID, db_root: PersistentMapping): + def get_object_by_uuid(cls, obj: UUID, db_root: PersistentMapping) -> Union[None,Self]: return db_root['objects'].get(obj) @classmethod