From bd97355378d0f82ed41735d8f895868f691b2440 Mon Sep 17 00:00:00 2001 From: Michael Woods Date: Wed, 5 Feb 2025 00:06:36 -0500 Subject: [PATCH] Filling in runners and podman stuff. --- src/packetserver/runner/__init__.py | 46 +++++++++++++++- src/packetserver/runner/podman.py | 82 ++++++++++++++++++++++++++++- src/packetserver/server/__init__.py | 24 ++++++--- 3 files changed, 142 insertions(+), 10 deletions(-) diff --git a/src/packetserver/runner/__init__.py b/src/packetserver/runner/__init__.py index f9bfe0b..f825f1e 100644 --- a/src/packetserver/runner/__init__.py +++ b/src/packetserver/runner/__init__.py @@ -13,6 +13,7 @@ class RunnerStatus(Enum): STOPPING = 5 SUCCESSFUL = 6 FAILED = 7 + TIMED_OUT = 8 class Runner: """Abstract class to take arguments and run a job and track the status and results.""" @@ -22,6 +23,7 @@ class Runner: self.username = username.strip().lower() self.args = args self.env = {} + self.started = datetime.datetime.now() if environment: for key in environment: self.env[key] = environment[key] @@ -33,12 +35,22 @@ class Runner: self.refresh_db = refresh_db self.created_at = datetime.datetime.now(datetime.UTC) + def is_finished(self): + if self.status in [RunnerStatus.TIMED_OUT, RunnerStatus.SUCCESSFUL, RunnerStatus.FAILED]: + return True + return False + def start(self): - raise RuntimeError("Attempting to start an abstract class.") + self.started = datetime.datetime.now() def stop(self): raise RuntimeError("Attempting to stop an abstract class.") + def heartbeat(self): + """Does any housekeeping while the underlying task is running. When the task is finished, + update status and do any cleanup activities.""" + pass + @property def output(self) -> str: raise RuntimeError("Attempting to interact with an abstract class.") @@ -62,10 +74,40 @@ class Orchestrator: self.runners = [] self.runner_lock = Lock() + def get_finished_runners(self) -> list[Runner]: + return [r for r in self.runners if r.is_finished()] + + def remove_runner(self, job_id: int): + runner_object = None + for r in self.runners: + if r.job_id == job_id: + runner_object = r + break + + if runner_object is not None: + self.runners.remove(runner_object) + + def get_runner_by_id(self, job_id: int) -> Optional[Runner]: + for r in self.runners: + if r.job_id == job_id: + return r + + def runners_available(self) -> bool: + """Abstract. True if a runner can be started. False, if queue is full or orchestrator not ready.""" + pass + def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None, timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Runner: pass def manage_lifecycle(self): - """When called, starts any pending runners if queue allows, looks for finished runners and updates statuses.""" + """When called, updates runner statuses and performs any housekeeping.""" + pass + + def start(self): + """Do any setup and then be ready to operate""" + pass + + def stop(self): + """Do any cleanup needed.""" pass \ No newline at end of file diff --git a/src/packetserver/runner/podman.py b/src/packetserver/runner/podman.py index d23675e..6440d32 100644 --- a/src/packetserver/runner/podman.py +++ b/src/packetserver/runner/podman.py @@ -1,13 +1,14 @@ """Uses podman to run jobs in containers.""" -from . import Runner, Orchestrator +from . import Runner, Orchestrator, RunnerStatus from collections import namedtuple -from typing import Optional +from typing import Optional, Iterable import subprocess import podman import os import os.path import logging import ZODB +import datetime PodmanOptions = namedtuple("PodmanOptions", ["default_timeout", "max_timeout", "image_name", "max_active_jobs", "container_keepalive", "name_prefix"]) @@ -16,9 +17,17 @@ class PodmanRunner(Runner): def __init__(self, username): pass + def start(self): + super().start() + + def heartbeat(self): + pass + class PodmanOrchestrator(Orchestrator): def __init__(self, uri: Optional[str] = None, options: Optional[PodmanOptions] = None): super().__init__() + self.started = False + self.user_containers = {} if uri: self.uri = uri else: @@ -41,3 +50,72 @@ class PodmanOrchestrator(Orchestrator): return podman.PodmanClient(base_url=self.uri) def refresh_user_db(self, username: str, db: ZODB.DB): + pass + + def podman_start_user_container(self, username: str): + pass + + def podman_stop_user_container + + def podman_container_exists(self, container_name: str) -> bool: + return False + + def clean_orphaned_containers(self): + pass + + def get_container_name(self, username: str) -> str: + return self.opts.name_prefix + username.lower().strip() + + def touch_user_container(self, username: str): + self.user_containers[self.get_container_name(username)] = datetime.datetime.now() + + def start_user_container(self, username: str): + if not self.podman_container_exists(self.get_container_name(username)): + self.podman_start_user_container(username) + self.touch_user_container(username) + + def clean_containers(self): + """Checks running containers and stops them if they have been running too long.""" + for c in self.user_containers: + if (datetime.datetime.now() - self.user_containers[c]) > self.opts.container_keepalive: + # stop the container TODO + del self.user_containers[c] + + def runners_in_process(self) -> int: + count = 0 + for r in self.runners: + if not r.is_finished(): + count = count + 1 + return count + + def runners_available(self) -> bool: + if not self.started: + return False + + if self.runners_in_process() < self.opts.max_active_jobs: + return True + + return False + + def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None, + timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Optional[Runner]: + with self.runner_lock: + if not self.runners_available(): + return None + pass + + def manage_lifecycle(self): + with self.runner_lock: + for r in self.runners: + if r.status is RunnerStatus.RUNNING: + r.heartbeat() + if not r.is_finished(): + self.touch_user_container(r.username) + self.clean_containers() + self.clean_orphaned_containers() + + def start(self): + self.started = True + + def stop(self): + self.started = False \ No newline at end of file diff --git a/src/packetserver/server/__init__.py b/src/packetserver/server/__init__.py index 3f5ab85..b38d86d 100644 --- a/src/packetserver/server/__init__.py +++ b/src/packetserver/server/__init__.py @@ -18,6 +18,7 @@ from msgpack.exceptions import OutOfData from typing import Callable, Self, Union from traceback import format_exc from os import linesep +from threading import Thread def init_bulletins(root: PersistentMapping): if 'bulletins' not in root: @@ -25,6 +26,7 @@ def init_bulletins(root: PersistentMapping): if 'bulletin_counter' not in root: root['bulletin_counter'] = 0 + class Server: def __init__(self, pe_server: str, port: int, server_callsign: str, data_dir: str = None, zeo: bool = True): if not ax25.Address.valid_call(server_callsign): @@ -37,6 +39,8 @@ class Server: self.zeo_stop = None self.zeo = zeo self.started = False + self.orchestrator = None + self.worker_thread = None if data_dir: data_path = Path(data_dir) else: @@ -84,7 +88,6 @@ class Server: self.db.close() self.storage.close() - @property def data_file(self) -> str: return str(Path(self.home_dir).joinpath('data.zopedb')) @@ -176,7 +179,18 @@ class Server: if not self.started: return # Add things to do here: - pass + if self.orchestrator is not None: + self.orchestrator.manage_lifecycle() + + def run_worker(self): + """Intended to be running as a thread.""" + logging.info("Starting worker thread.") + while self.started: + self.server_worker() + time.sleep(1) + + def __del__(self): + self.stop() def start_db(self): if not self.zeo: @@ -201,10 +215,7 @@ class Server: self.app.start(self.pe_server, self.pe_port) self.app.register_callsigns(self.callsign) self.started = True - while self.started: - self.server_worker() - time.sleep(5) - + self.worker_thread = Thread(target=self.run_worker) def exit_gracefully(self, signum, frame): self.stop() @@ -217,6 +228,7 @@ class Server: self.zeo_stop() def stop(self): + self.started = False cm = self.app._engine._active_handler._handlers[1]._connection_map for key in cm._connections.keys(): cm._connections[key].close()