Filling in runners and podman stuff.

This commit is contained in:
Michael Woods
2025-02-05 00:06:36 -05:00
parent 2bfa42f6da
commit bd97355378
3 changed files with 142 additions and 10 deletions

View File

@@ -13,6 +13,7 @@ class RunnerStatus(Enum):
STOPPING = 5
SUCCESSFUL = 6
FAILED = 7
TIMED_OUT = 8
class Runner:
"""Abstract class to take arguments and run a job and track the status and results."""
@@ -22,6 +23,7 @@ class Runner:
self.username = username.strip().lower()
self.args = args
self.env = {}
self.started = datetime.datetime.now()
if environment:
for key in environment:
self.env[key] = environment[key]
@@ -33,12 +35,22 @@ class Runner:
self.refresh_db = refresh_db
self.created_at = datetime.datetime.now(datetime.UTC)
def is_finished(self):
if self.status in [RunnerStatus.TIMED_OUT, RunnerStatus.SUCCESSFUL, RunnerStatus.FAILED]:
return True
return False
def start(self):
raise RuntimeError("Attempting to start an abstract class.")
self.started = datetime.datetime.now()
def stop(self):
raise RuntimeError("Attempting to stop an abstract class.")
def heartbeat(self):
"""Does any housekeeping while the underlying task is running. When the task is finished,
update status and do any cleanup activities."""
pass
@property
def output(self) -> str:
raise RuntimeError("Attempting to interact with an abstract class.")
@@ -62,10 +74,40 @@ class Orchestrator:
self.runners = []
self.runner_lock = Lock()
def get_finished_runners(self) -> list[Runner]:
return [r for r in self.runners if r.is_finished()]
def remove_runner(self, job_id: int):
runner_object = None
for r in self.runners:
if r.job_id == job_id:
runner_object = r
break
if runner_object is not None:
self.runners.remove(runner_object)
def get_runner_by_id(self, job_id: int) -> Optional[Runner]:
for r in self.runners:
if r.job_id == job_id:
return r
def runners_available(self) -> bool:
"""Abstract. True if a runner can be started. False, if queue is full or orchestrator not ready."""
pass
def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Runner:
pass
def manage_lifecycle(self):
"""When called, starts any pending runners if queue allows, looks for finished runners and updates statuses."""
"""When called, updates runner statuses and performs any housekeeping."""
pass
def start(self):
"""Do any setup and then be ready to operate"""
pass
def stop(self):
"""Do any cleanup needed."""
pass

View File

@@ -1,13 +1,14 @@
"""Uses podman to run jobs in containers."""
from . import Runner, Orchestrator
from . import Runner, Orchestrator, RunnerStatus
from collections import namedtuple
from typing import Optional
from typing import Optional, Iterable
import subprocess
import podman
import os
import os.path
import logging
import ZODB
import datetime
PodmanOptions = namedtuple("PodmanOptions", ["default_timeout", "max_timeout", "image_name",
"max_active_jobs", "container_keepalive", "name_prefix"])
@@ -16,9 +17,17 @@ class PodmanRunner(Runner):
def __init__(self, username):
pass
def start(self):
super().start()
def heartbeat(self):
pass
class PodmanOrchestrator(Orchestrator):
def __init__(self, uri: Optional[str] = None, options: Optional[PodmanOptions] = None):
super().__init__()
self.started = False
self.user_containers = {}
if uri:
self.uri = uri
else:
@@ -41,3 +50,72 @@ class PodmanOrchestrator(Orchestrator):
return podman.PodmanClient(base_url=self.uri)
def refresh_user_db(self, username: str, db: ZODB.DB):
pass
def podman_start_user_container(self, username: str):
pass
def podman_stop_user_container
def podman_container_exists(self, container_name: str) -> bool:
return False
def clean_orphaned_containers(self):
pass
def get_container_name(self, username: str) -> str:
return self.opts.name_prefix + username.lower().strip()
def touch_user_container(self, username: str):
self.user_containers[self.get_container_name(username)] = datetime.datetime.now()
def start_user_container(self, username: str):
if not self.podman_container_exists(self.get_container_name(username)):
self.podman_start_user_container(username)
self.touch_user_container(username)
def clean_containers(self):
"""Checks running containers and stops them if they have been running too long."""
for c in self.user_containers:
if (datetime.datetime.now() - self.user_containers[c]) > self.opts.container_keepalive:
# stop the container TODO
del self.user_containers[c]
def runners_in_process(self) -> int:
count = 0
for r in self.runners:
if not r.is_finished():
count = count + 1
return count
def runners_available(self) -> bool:
if not self.started:
return False
if self.runners_in_process() < self.opts.max_active_jobs:
return True
return False
def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Optional[Runner]:
with self.runner_lock:
if not self.runners_available():
return None
pass
def manage_lifecycle(self):
with self.runner_lock:
for r in self.runners:
if r.status is RunnerStatus.RUNNING:
r.heartbeat()
if not r.is_finished():
self.touch_user_container(r.username)
self.clean_containers()
self.clean_orphaned_containers()
def start(self):
self.started = True
def stop(self):
self.started = False

View File

@@ -18,6 +18,7 @@ from msgpack.exceptions import OutOfData
from typing import Callable, Self, Union
from traceback import format_exc
from os import linesep
from threading import Thread
def init_bulletins(root: PersistentMapping):
if 'bulletins' not in root:
@@ -25,6 +26,7 @@ def init_bulletins(root: PersistentMapping):
if 'bulletin_counter' not in root:
root['bulletin_counter'] = 0
class Server:
def __init__(self, pe_server: str, port: int, server_callsign: str, data_dir: str = None, zeo: bool = True):
if not ax25.Address.valid_call(server_callsign):
@@ -37,6 +39,8 @@ class Server:
self.zeo_stop = None
self.zeo = zeo
self.started = False
self.orchestrator = None
self.worker_thread = None
if data_dir:
data_path = Path(data_dir)
else:
@@ -84,7 +88,6 @@ class Server:
self.db.close()
self.storage.close()
@property
def data_file(self) -> str:
return str(Path(self.home_dir).joinpath('data.zopedb'))
@@ -176,7 +179,18 @@ class Server:
if not self.started:
return
# Add things to do here:
pass
if self.orchestrator is not None:
self.orchestrator.manage_lifecycle()
def run_worker(self):
"""Intended to be running as a thread."""
logging.info("Starting worker thread.")
while self.started:
self.server_worker()
time.sleep(1)
def __del__(self):
self.stop()
def start_db(self):
if not self.zeo:
@@ -201,10 +215,7 @@ class Server:
self.app.start(self.pe_server, self.pe_port)
self.app.register_callsigns(self.callsign)
self.started = True
while self.started:
self.server_worker()
time.sleep(5)
self.worker_thread = Thread(target=self.run_worker)
def exit_gracefully(self, signum, frame):
self.stop()
@@ -217,6 +228,7 @@ class Server:
self.zeo_stop()
def stop(self):
self.started = False
cm = self.app._engine._active_handler._handlers[1]._connection_map
for key in cm._connections.keys():
cm._connections[key].close()