From efa7e4e77f971bfaefc8776e7458b655706bf368 Mon Sep 17 00:00:00 2001 From: Michael Woods Date: Wed, 12 Feb 2025 21:13:14 -0500 Subject: [PATCH] more podman stuff. not done yet. --- src/packetserver/__init__.py | 1 + src/packetserver/common/util.py | 14 +++++ src/packetserver/runner/__init__.py | 62 +++++++++++++++++- src/packetserver/runner/constants.py | 25 ++++++++ src/packetserver/runner/podman.py | 94 ++++++++++++++++++++++++++-- src/packetserver/server/__init__.py | 7 ++- 6 files changed, 193 insertions(+), 10 deletions(-) create mode 100644 src/packetserver/runner/constants.py diff --git a/src/packetserver/__init__.py b/src/packetserver/__init__.py index e69de29..d5c8727 100644 --- a/src/packetserver/__init__.py +++ b/src/packetserver/__init__.py @@ -0,0 +1 @@ +VERSION="0.1-alpha" \ No newline at end of file diff --git a/src/packetserver/common/util.py b/src/packetserver/common/util.py index 14009c3..0f7cd6e 100644 --- a/src/packetserver/common/util.py +++ b/src/packetserver/common/util.py @@ -78,6 +78,20 @@ def bytes_to_tar_bytes(name: str, data: bytes) -> bytes: temp.seek(0) return temp.read() +def multi_bytes_to_tar_bytes(objects: dict) -> bytes: + """Creates a tar archive with a single file of name with bytes as the contents""" + with tempfile.TemporaryFile() as temp: + tar_obj = tarfile.TarFile(fileobj=temp, mode="w") + for name in objects: + data = bytes(objects[name]) + bio = BytesIO(data) + tar_info = tarfile.TarInfo(name=name) + tar_info.size = len(data) + tar_obj.addfile(tar_info, bio) + tar_obj.close() + temp.seek(0) + return temp.read() + def extract_tar_bytes(tarfile_bytes: bytes) -> Tuple[str, bytes]: """Takes the bytes of a tarfile, and returns the name and bytes of the first file in the archive.""" out_bytes = b'' diff --git a/src/packetserver/runner/__init__.py b/src/packetserver/runner/__init__.py index f825f1e..761c90b 100644 --- a/src/packetserver/runner/__init__.py +++ b/src/packetserver/runner/__init__.py @@ -4,6 +4,51 @@ from enum import Enum import datetime from uuid import UUID, uuid4 from threading import Lock +import os.path +from packetserver.runner.constants import job_setup_script, job_end_script, container_setup_script +from packetserver.common.util import multi_bytes_to_tar_bytes + + +def scripts_tar() -> bytes: + return multi_bytes_to_tar_bytes({ + 'job_setup_script.sh': job_setup_script, + 'job_end_script.sh': job_end_script, + 'container_setup_script.sh': container_setup_script + }) + +class RunnerFile: + def __init__(self, destination_path: str, source_path: str = None, data: bytes = b'', read_only: bool = False): + self._data = data + self._source_path = "" + + if source_path is not None: + if source_path.strip() != "": + if not os.path.isfile(source_path.strip()): + raise ValueError("Source Path must point to a file.") + self._source_path = source_path.strip() + + self.destination_path = destination_path.strip() + if self.destination_path == "": + raise ValueError("Destination path cannot be empty.") + if not os.path.isabs(self.destination_path): + raise ValueError("Destination path must be an absolute path.") + + self.is_read_only = read_only + + @property + def basename(self) -> str: + return os.path.basename(self.destination_path) + + @property + def dirname(self) -> str: + return os.path.dirname(self.destination_path) + + @property + def data(self) -> bytes: + if self._source_path == "": + return self._data + else: + return open(self._source_path, "rb").read() class RunnerStatus(Enum): CREATED = 1 @@ -18,7 +63,12 @@ class RunnerStatus(Enum): class Runner: """Abstract class to take arguments and run a job and track the status and results.""" def __init__(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None, - timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None): + timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None, + files: list[RunnerFile] = None): + self.files = [] + if files is not None: + for f in files: + self.files.append(f) self.status = RunnerStatus.CREATED self.username = username.strip().lower() self.args = args @@ -35,11 +85,16 @@ class Runner: self.refresh_db = refresh_db self.created_at = datetime.datetime.now(datetime.UTC) - def is_finished(self): + def is_finished(self) -> bool: if self.status in [RunnerStatus.TIMED_OUT, RunnerStatus.SUCCESSFUL, RunnerStatus.FAILED]: return True return False + def is_in_process(self) -> bool: + if self.status in [RunnerStatus.QUEUED, RunnerStatus.RUNNING, RunnerStatus.STARTING, RunnerStatus.STOPPING]: + return True + return False + def start(self): self.started = datetime.datetime.now() @@ -97,7 +152,8 @@ class Orchestrator: pass def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None, - timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Runner: + timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None, + files: list[RunnerFile] = None) -> Runner: pass def manage_lifecycle(self): diff --git a/src/packetserver/runner/constants.py b/src/packetserver/runner/constants.py new file mode 100644 index 0000000..6d87601 --- /dev/null +++ b/src/packetserver/runner/constants.py @@ -0,0 +1,25 @@ +from packetserver.common.util import multi_bytes_to_tar_bytes + +container_setup_script = """#!/bin/bash +useradd -m -s /bin/bash "$PACKETSERVER_USER" -u 1000 +mkdir -p "/home/${PACKETSERVER_USER}/.packetserver/artifacts" +mkdir -p /artifact_output +chown -R $PACKETSERVER_USER "/home/$PACKETSERVER_USER" +""" + +job_setup_script = """#!/bin/bash +chmod 444 /user-db.json.gz +chown -R $PACKETSERVER_USER "/home/$PACKETSERVER_USER" +mkdir -p "/home/$PACKETSERVER_USER/.packetserver/artifacts/$PACKETSERVER_JOBID" +""" + +job_end_script = """#!/bin/bash +PACKETSERVER_ARTIFACT_DIR="/home/$PACKETSERVER_USER/.packetserver/artifacts/$PACKETSERVER_JOBID" +PACKETSERVER_ARTIFACT_TAR="/artifact_output/${PACKETSERVER_JOBID}.tar" +cwd=$(pwd) +if [ $(find "$PACKETSERVER_ARTIFACT_DIR" | wc -l) -gt "1" ]; then + cd $PACKETSERVER_ARTIFACT_DIR + tar -cf ${PACKETSERVER_ARTIFACT_TAR} . +fi +rm -rf ${PACKETSERVER_ARTIFACT_DIR} +""" diff --git a/src/packetserver/runner/podman.py b/src/packetserver/runner/podman.py index 6a269d0..f6e80f5 100644 --- a/src/packetserver/runner/podman.py +++ b/src/packetserver/runner/podman.py @@ -1,7 +1,7 @@ """Uses podman to run jobs in containers.""" import time -from . import Runner, Orchestrator, RunnerStatus +from . import Runner, Orchestrator, RunnerStatus, RunnerFile, scripts_tar from collections import namedtuple from typing import Optional, Iterable import subprocess @@ -14,6 +14,11 @@ import ZODB import datetime from os.path import basename, dirname from packetserver.common.util import bytes_to_tar_bytes, random_string +from packetserver import VERSION as packetserver_version +import re +from threading import Thread + +env_splitter_rex = '''([a-zA-Z0-9]+)=([a-zA-Z0-9]*)''' PodmanOptions = namedtuple("PodmanOptions", ["default_timeout", "max_timeout", "image_name", "max_active_jobs", "container_keepalive", "name_prefix"]) @@ -33,6 +38,7 @@ class PodmanOrchestrator(Orchestrator): super().__init__() self.started = False self.user_containers = {} + self.manager_thread = None if uri: self.uri = uri else: @@ -60,7 +66,41 @@ class PodmanOrchestrator(Orchestrator): def get_file_from_user_container(self, username: str, path: str) -> bytes : pass + def podman_container_env(self, container_name: str) -> dict: + cli = self.client + logging.debug(f"Attempting to remove container named {container_name}") + try: + con = cli.containers.get(container_name) + splitter = re.compile(env_splitter_rex) + env = {} + for i in con.inspect()['Config']['Env']: + m = splitter.match(i) + if m: + env[m.groups()[0]] = m.groups()[1] + return env + except podman.errors.exceptions.NotFound as e: + return + + def podman_container_version(self, container_name: str) -> str: + try: + env = self.podman_container_env(container_name) + except: + env = {} + return env.get("PACKETSERVER_VERSION", "0.0.0") + + def podman_user_container_env(self, username: str) -> dict: + container_name = self.get_container_name(username) + return self.podman_container_env(container_name) + + def podman_user_container_version(self, username: str) -> str: + container_name = self.get_container_name(username) + return self.podman_container_version(container_name) + def podman_start_user_container(self, username: str): + container_env = { + "PACKETSERVER_VERSION": packetserver_version, + "PACKETSERVER_USER": username.strip().lower() + } con = self.client.containers.create(self.opts.image_name, name=self.get_container_name(username), command=["tail", "-f", "/dev/null"]) con.start() @@ -77,6 +117,7 @@ class PodmanOrchestrator(Orchestrator): con.stop() con.remove() raise RuntimeError(f"Couldn't start container for user {username}") + self.touch_user_container(username) def podman_remove_container_name(self, container_name: str): cli = self.client @@ -110,6 +151,15 @@ class PodmanOrchestrator(Orchestrator): except podman.errors.exceptions.NotFound: return False + def podman_run_command_simple(self, username: str, command: Iterable[str], as_root: bool = True) -> int: + """Runs command defined by arguments iterable in container. As root by default. Returns exit code.""" + container_name = self.get_container_name(username) + un = username.lower().strip() + con = self.client.containers.get(container_name) + if as_root: + un = 'root' + return con.exec_run(list(command), user=un)[0] + def clean_orphaned_containers(self): cli = self.client for i in cli.containers.list(all=True): @@ -123,7 +173,7 @@ class PodmanOrchestrator(Orchestrator): self.user_containers[self.get_container_name(username)] = datetime.datetime.now() def start_user_container(self, username: str): - if not self.podman_container_exists(self.get_container_name(username)): + if not self.podman_user_container_exists(username): self.podman_start_user_container(username) self.touch_user_container(username) @@ -133,6 +183,23 @@ class PodmanOrchestrator(Orchestrator): if (datetime.datetime.now() - self.user_containers[c]) > self.opts.container_keepalive: self.podman_remove_container_name(c) del self.user_containers[c] + else: + if packetserver_version < self.podman_user_container_version(c): + + def user_runners_in_process(self, username: str) -> int: + un = username.strip().lower() + count = 0 + for r in self.runners: + if r.is_in_process: + if r.username == un: + count = count + 1 + return count + + def user_running(self, username: str) -> bool: + if self.user_runners_in_process(username) > 0: + return True + else: + return False def runners_in_process(self) -> int: count = 0 @@ -151,7 +218,8 @@ class PodmanOrchestrator(Orchestrator): return False def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None, - timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Optional[Runner]: + timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None, + files: list[RunnerFile] = None) -> Optional[Runner]: if not self.started: return None with self.runner_lock: @@ -160,6 +228,8 @@ class PodmanOrchestrator(Orchestrator): pass def manage_lifecycle(self): + if not self.started: + return with self.runner_lock: for r in self.runners: if r.status is RunnerStatus.RUNNING: @@ -169,8 +239,22 @@ class PodmanOrchestrator(Orchestrator): self.clean_containers() self.clean_orphaned_containers() + def manager(self): + logging.debug("Starting podman orchestrator thread.") + while self.started: + self.manage_lifecycle() + time.sleep(.5) + logging.debug("Stopping podman orchestrator thread.") + def start(self): - self.started = True + if not self.started: + self.clean_orphaned_containers() + self.started = True + self.manager_thread = Thread(target=self.manager) + self.manager_thread.start() def stop(self): - self.started = False \ No newline at end of file + self.started = False + if self.manager_thread is not None: + self.manager_thread.join(timeout=15) + self.manager_thread = None \ No newline at end of file diff --git a/src/packetserver/server/__init__.py b/src/packetserver/server/__init__.py index b38d86d..f277da3 100644 --- a/src/packetserver/server/__init__.py +++ b/src/packetserver/server/__init__.py @@ -179,8 +179,6 @@ class Server: if not self.started: return # Add things to do here: - if self.orchestrator is not None: - self.orchestrator.manage_lifecycle() def run_worker(self): """Intended to be running as a thread.""" @@ -215,7 +213,10 @@ class Server: self.app.start(self.pe_server, self.pe_port) self.app.register_callsigns(self.callsign) self.started = True + if self.orchestrator is not None: + self.orchestrator.start() self.worker_thread = Thread(target=self.run_worker) + self.worker_thread.start() def exit_gracefully(self, signum, frame): self.stop() @@ -232,6 +233,8 @@ class Server: cm = self.app._engine._active_handler._handlers[1]._connection_map for key in cm._connections.keys(): cm._connections[key].close() + if self.orchestrator is not None: + self.orchestrator.stop() self.app.stop() self.stop_db()