diff --git a/src/packetserver/common/util.py b/src/packetserver/common/util.py index 0f7cd6e..e9e7243 100644 --- a/src/packetserver/common/util.py +++ b/src/packetserver/common/util.py @@ -2,9 +2,9 @@ import re import datetime import tempfile import tarfile -from typing import Union, Iterable, Tuple, Optional +from typing import Union, Iterable, Tuple, Optional, IO import os.path -from io import BytesIO +from io import BytesIO, BufferedReader import random import string @@ -78,6 +78,18 @@ def bytes_to_tar_bytes(name: str, data: bytes) -> bytes: temp.seek(0) return temp.read() +def bytes_tar_has_files(data: Union[bytes, IO]): + if type(data) is bytes: + bio = BytesIO(data) + else: + bio = data + tar_obj = tarfile.TarFile(fileobj=bio, mode="r") + files = [m for m in tar_obj.getmembers() if m.isfile()] + if len(files) > 0: + return True + else: + return False + def multi_bytes_to_tar_bytes(objects: dict) -> bytes: """Creates a tar archive with a single file of name with bytes as the contents""" with tempfile.TemporaryFile() as temp: @@ -107,3 +119,33 @@ def random_string(length=8) -> str: rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=length)) return rand_str + +class TarFileExtractor(object): + """Generator created from file like object pointing to tar data""" + def __init__(self, fileobj: IO): + self.fileobj = fileobj + try: + self.tar_file = tarfile.TarFile(fileobj=self.fileobj) + self._raw_members = [m for m in self.tar_file.getmembers() if m.isfile()] + except: + self._raw_members = [] + self._count = 0 + + def __iter__(self): + return self + + # Python 3 compatibility + def __next__(self): + return self.next() + + def next(self) -> Tuple[str, IO]: + if (self._count + 1) > len(self._raw_members): + raise StopIteration() + else: + member = self._raw_members[self._count] + name = member.name + if type(name) is bytes: + name = name.decode() + name = str(name) + self._count = self._count + 1 + return os.path.basename(name), self.tar_file.extractfile(member) diff --git a/src/packetserver/runner/__init__.py b/src/packetserver/runner/__init__.py index 761c90b..8843897 100644 --- a/src/packetserver/runner/__init__.py +++ b/src/packetserver/runner/__init__.py @@ -5,19 +5,20 @@ import datetime from uuid import UUID, uuid4 from threading import Lock import os.path -from packetserver.runner.constants import job_setup_script, job_end_script, container_setup_script -from packetserver.common.util import multi_bytes_to_tar_bytes +from packetserver.runner.constants import job_setup_script, job_end_script, container_setup_script, container_run_script +from packetserver.common.util import multi_bytes_to_tar_bytes, bytes_to_tar_bytes, TarFileExtractor def scripts_tar() -> bytes: return multi_bytes_to_tar_bytes({ - 'job_setup_script.sh': job_setup_script, - 'job_end_script.sh': job_end_script, - 'container_setup_script.sh': container_setup_script + 'job_setup_script.sh': job_setup_script.encode(), + 'job_end_script.sh': job_end_script.encode(), + 'container_run_script.sh': container_run_script.encode(), + 'container_setup_script.sh': container_setup_script.encode() }) class RunnerFile: - def __init__(self, destination_path: str, source_path: str = None, data: bytes = b'', read_only: bool = False): + def __init__(self, destination_path: str, source_path: str = None, data: bytes = b'', root_owned: bool = False): self._data = data self._source_path = "" @@ -30,10 +31,8 @@ class RunnerFile: self.destination_path = destination_path.strip() if self.destination_path == "": raise ValueError("Destination path cannot be empty.") - if not os.path.isabs(self.destination_path): - raise ValueError("Destination path must be an absolute path.") - self.is_read_only = read_only + self.root_owned = root_owned @property def basename(self) -> str: @@ -43,6 +42,10 @@ class RunnerFile: def dirname(self) -> str: return os.path.dirname(self.destination_path) + @property + def isabs(self) -> bool: + return os.path.isabs(self.destination_path) + @property def data(self) -> bytes: if self._source_path == "": @@ -50,6 +53,9 @@ class RunnerFile: else: return open(self._source_path, "rb").read() + def tar_data(self) -> bytes: + return bytes_to_tar_bytes(self.basename, self.data) + class RunnerStatus(Enum): CREATED = 1 QUEUED = 2 @@ -62,8 +68,8 @@ class RunnerStatus(Enum): class Runner: """Abstract class to take arguments and run a job and track the status and results.""" - def __init__(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None, - timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None, + def __init__(self, username: str, args: Union[str, list[str]], job_id: int, environment: Optional[dict] = None, + timeout_secs: str = 300, labels: Optional[list] = None, files: list[RunnerFile] = None): self.files = [] if files is not None: @@ -72,19 +78,24 @@ class Runner: self.status = RunnerStatus.CREATED self.username = username.strip().lower() self.args = args + self.job_id = int(job_id) self.env = {} self.started = datetime.datetime.now() + self._result = (0,(b'', b'')) if environment: for key in environment: self.env[key] = environment[key] self.labels = [] - for l in labels: - self.labels.append(l) + if type(labels) is list: + for l in labels: + self.labels.append(l) self.timeout_seconds = timeout_secs - self.refresh_db = refresh_db self.created_at = datetime.datetime.now(datetime.UTC) + def __repr__(self): + return f"<{type(self).__name__}: {self.username}[{self.job_id}] - {self.status.name}>" + def is_finished(self) -> bool: if self.status in [RunnerStatus.TIMED_OUT, RunnerStatus.SUCCESSFUL, RunnerStatus.FAILED]: return True @@ -101,13 +112,8 @@ class Runner: def stop(self): raise RuntimeError("Attempting to stop an abstract class.") - def heartbeat(self): - """Does any housekeeping while the underlying task is running. When the task is finished, - update status and do any cleanup activities.""" - pass - @property - def output(self) -> str: + def output(self) -> bytes: raise RuntimeError("Attempting to interact with an abstract class.") @property @@ -119,7 +125,7 @@ class Runner: raise RuntimeError("Attempting to interact with an abstract class.") @property - def artifacts(self) -> list: + def artifacts(self) -> TarFileExtractor: raise RuntimeError("Attempting to interact with an abstract class.") class Orchestrator: diff --git a/src/packetserver/runner/constants.py b/src/packetserver/runner/constants.py index 6d87601..f6ee89e 100644 --- a/src/packetserver/runner/constants.py +++ b/src/packetserver/runner/constants.py @@ -1,25 +1,49 @@ from packetserver.common.util import multi_bytes_to_tar_bytes container_setup_script = """#!/bin/bash -useradd -m -s /bin/bash "$PACKETSERVER_USER" -u 1000 -mkdir -p "/home/${PACKETSERVER_USER}/.packetserver/artifacts" -mkdir -p /artifact_output -chown -R $PACKETSERVER_USER "/home/$PACKETSERVER_USER" +set -e +echo "Place holder for now." +""" + +container_run_script = """#!/bin/bash +set -e +echo "Creating user ${PACKETSERVER_USER}" +useradd -m -s /bin/bash "${PACKETSERVER_USER}" -u 1000 +echo "Creating directories." +mkdir -pv "/home/${PACKETSERVER_USER}/.packetserver" +mkdir -pv /artifact_output +chown -Rv ${PACKETSERVER_USER} "/home/${PACKETSERVER_USER}" +echo +echo "Looping. Waiting for /root/ENDNOW to exist before stopping." +while ! [ -f "/root/ENDNOW" ]; do + sleep 1 +done +echo "Ending now.." """ job_setup_script = """#!/bin/bash -chmod 444 /user-db.json.gz -chown -R $PACKETSERVER_USER "/home/$PACKETSERVER_USER" -mkdir -p "/home/$PACKETSERVER_USER/.packetserver/artifacts/$PACKETSERVER_JOBID" +set -e +PACKETSERVER_JOB_DIR="/home/${PACKETSERVER_USER}/.packetserver/${PACKETSERVER_JOBID}" +mkdir -pv "${PACKETSERVER_JOB_DIR}/files" +mkdir -pv "${PACKETSERVER_JOB_DIR}/artifacts" +chown ${PACKETSERVER_USER} "/home/${PACKETSERVER_USER}" +chown -R ${PACKETSERVER_USER} "${PACKETSERVER_JOB_DIR}" """ job_end_script = """#!/bin/bash -PACKETSERVER_ARTIFACT_DIR="/home/$PACKETSERVER_USER/.packetserver/artifacts/$PACKETSERVER_JOBID" -PACKETSERVER_ARTIFACT_TAR="/artifact_output/${PACKETSERVER_JOBID}.tar" -cwd=$(pwd) -if [ $(find "$PACKETSERVER_ARTIFACT_DIR" | wc -l) -gt "1" ]; then - cd $PACKETSERVER_ARTIFACT_DIR - tar -cf ${PACKETSERVER_ARTIFACT_TAR} . -fi -rm -rf ${PACKETSERVER_ARTIFACT_DIR} +set -e +PACKETSERVER_JOB_DIR="/home/$PACKETSERVER_USER/.packetserver/${PACKETSERVER_JOBID}" +PACKETSERVER_ARTIFACT_DIR="${PACKETSERVER_JOB_DIR}/artifacts" +PACKETSERVER_ARTIFACT_TAR="/artifact_output/${PACKETSERVER_JOBID}.tar.gz" +tar -czvf "${PACKETSERVER_ARTIFACT_TAR}" -C "${PACKETSERVER_ARTIFACT_DIR}" . +rm -rfv "${PACKETSERVER_JOB_DIR}" """ + +podman_bash_start = """ echo 'waiting for /root/scripts/container_run_script.sh to exist' +while ! [ -f '/root/scripts/container_run_script.sh' ]; do + tail /dev/null +done +echo 'entering /root/scripts/container_run_script.sh ...' +bash /root/scripts/container_run_script.sh +""" +podman_run_command = ["bash", "-c", podman_bash_start] \ No newline at end of file diff --git a/src/packetserver/runner/podman.py b/src/packetserver/runner/podman.py index f6e80f5..3a6665d 100644 --- a/src/packetserver/runner/podman.py +++ b/src/packetserver/runner/podman.py @@ -1,22 +1,29 @@ """Uses podman to run jobs in containers.""" import time +from ZEO import client + from . import Runner, Orchestrator, RunnerStatus, RunnerFile, scripts_tar +from packetserver.runner.constants import podman_run_command +from urllib.parse import urlparse from collections import namedtuple -from typing import Optional, Iterable -import subprocess +from typing import Optional, Iterable, Union +from traceback import format_exc import podman +import gzip +from podman.domain.containers import Container import podman.errors import os import os.path import logging -import ZODB import datetime from os.path import basename, dirname -from packetserver.common.util import bytes_to_tar_bytes, random_string +from packetserver.common.util import bytes_to_tar_bytes, random_string, extract_tar_bytes, bytes_tar_has_files, \ + TarFileExtractor from packetserver import VERSION as packetserver_version import re from threading import Thread +from io import BytesIO env_splitter_rex = '''([a-zA-Z0-9]+)=([a-zA-Z0-9]*)''' @@ -24,14 +31,110 @@ PodmanOptions = namedtuple("PodmanOptions", ["default_timeout", "max_timeout", " "max_active_jobs", "container_keepalive", "name_prefix"]) class PodmanRunner(Runner): - def __init__(self, username): - pass + def __init__(self, username: str, args: Union[str, list[str]], job_id: int, container: Container, + environment: Optional[dict] = None, timeout_secs: str = 300, labels: Optional[list] = None, + files: list[RunnerFile] = None): + super().__init__(username, args, job_id, environment=environment, timeout_secs=timeout_secs, + labels=labels, files=files) + self._artifact_archive = b'' + if not container.inspect()['State']['Running']: + raise ValueError(f"Container {container} is not in state Running.") + self.container = container + self._thread = None + self.env['PACKETSERVER_JOBID'] = str(job_id) + self.job_path = os.path.join("/home", self.username, ".packetserver", str(job_id)) + self.archive_path = os.path.join("/artifact_output", f"{str(job_id)}.tar.gz") + + def thread_runner(self): + self.status = RunnerStatus.RUNNING + logging.debug(f"Thread for runner {self.job_id} started. Command for {(type(self.args))}:\n{self.args}") + # run the exec call + if type(self.args) is str: + logging.debug(f"Running string: {self.args}") + res = self.container.exec_run(cmd=self.args, environment=self.env, user=self.username, demux=True, + workdir=self.job_path) + else: + logging.debug(f"Running iterable: {list(self.args)}") + res = self.container.exec_run(cmd=list(self.args), environment=self.env, user=self.username, demux=True, + workdir=self.job_path) + logging.debug(str(res)) + # cleanup housekeeping + self.status = RunnerStatus.STOPPING + self._result = res + # run cleanup script + logging.debug(f"Running cleanup script for {self.job_id}") + end_res = self.container.exec_run("bash /root/scripts/job_end_script.sh", + environment=self.env, user="root", tty=True) + logging.debug(f"End result: {end_res}") + if end_res[0] != 0: + logging.error(f"End Job script failed:\n{end_res[1].decode()}") + # collect any artifacts + try: + retrieved_tar_bytes = b''.join(self.container.get_archive(self.archive_path)[0]) + art_tar_bytes = extract_tar_bytes(retrieved_tar_bytes)[1] + logging.debug(f"bytes retrieved: {retrieved_tar_bytes}") + if bytes_tar_has_files(gzip.GzipFile(fileobj=BytesIO(art_tar_bytes))): + logging.debug("found artifacts; attaching to runner object") + self._artifact_archive = art_tar_bytes + else: + logging.debug(f"no artifacts returned for job {self.job_id}") + except: + logging.warning(f"Error retrieving artifacts for {self.job_id}:\n{format_exc()}") + self._artifact_archive = b'' + # set final status to FAILED or SUCCEEDED + if self.return_code == 0: + self.status = RunnerStatus.SUCCESSFUL + else: + self.status = RunnerStatus.FAILED + + @property + def has_artifacts(self) -> bool: + if self._artifact_archive == b'': + return False + else: + return True + + @property + def artifacts(self) -> TarFileExtractor: + return TarFileExtractor(gzip.GzipFile(fileobj=BytesIO(self._artifact_archive))) + + @property + def output(self) -> bytes: + return self._result[1][0] + + @property + def errors(self) -> str: + return self._result[1][1].decode() + + @property + def return_code(self) -> int: + return self._result[0] def start(self): - super().start() + logging.debug(f"Starting runner {self.job_id} for {self.username} with command:\n({type(self.args)}){self.args}") + self.status = RunnerStatus.STARTING + # Run job setup script + logging.debug(f"Running job setup script for {self.job_id} runner") + setup_res = self.container.exec_run("bash /root/scripts/job_setup_script.sh", + environment=self.env, user="root", tty=True) + logging.debug(f"{self.job_id} Setup script:\n{str(setup_res[1])}") + if setup_res[0] != 0: + self.status = RunnerStatus.FAILED + raise RuntimeError(f"Couldn't run setup scripts for {self.job}:\n{setup_res[1]}") + # put files where they need to be + for f in self.files: + logging.debug(f"Adding file {f}") + if not f.isabs: + f.destination_path = os.path.join(self.job_path, f.destination_path) + self.container.put_archive(f.dirname, f.tar_data()) + if not f.root_owned: + self.container.exec_run(f"chown -R {self.username} {f.destination_path}") - def heartbeat(self): - pass + # start thread + logging.debug(f"Starting runner thread for {self.job_id}") + self._thread = Thread(target=self.thread_runner) + super().start() + self._thread.start() class PodmanOrchestrator(Orchestrator): def __init__(self, uri: Optional[str] = None, options: Optional[PodmanOptions] = None): @@ -39,13 +142,15 @@ class PodmanOrchestrator(Orchestrator): self.started = False self.user_containers = {} self.manager_thread = None + if uri: self.uri = uri else: self.uri = f"unix:///run/user/{os.getuid()}/podman/podman.sock" - - if not os.path.exists(self.uri): - raise FileNotFoundError(f"Podman socket not found: {self.uri}") + uri_parsed = urlparse(self.uri) + if uri_parsed.scheme == "unix": + if not os.path.exists(uri_parsed.path): + raise FileNotFoundError(f"Podman socket not found: {self.uri}") logging.debug(f"Testing podman socket. Version: {self.client.version()}") @@ -60,11 +165,22 @@ class PodmanOrchestrator(Orchestrator): def client(self): return podman.PodmanClient(base_url=self.uri) - def add_file_to_user_container(self, username: str, data: bytes, path: str): - pass + def add_file_to_user_container(self, username: str, data: bytes, path: str, root_owned=False): + cli = self.client + file_dir = dirname(path) + tar_data_bytes = bytes_to_tar_bytes(basename(path), data) + con = cli.containers.get(self.get_container_name(username)) + res = con.exec_run(cmd=["mkdir", "-p", file_dir], user="root") + if res[0] != 1: + raise RuntimeError("Couldn't create directory") + con.put_archive(file_dir, tar_data_bytes) def get_file_from_user_container(self, username: str, path: str) -> bytes : - pass + cli = self.client + con = cli.containers.get(self.get_container_name(username)) + tar_result = con.get_archive(path) + bytes_tar = b"".join(list(tar_result[0])) + return extract_tar_bytes(bytes_tar)[1] def podman_container_env(self, container_name: str) -> dict: cli = self.client @@ -96,53 +212,83 @@ class PodmanOrchestrator(Orchestrator): container_name = self.get_container_name(username) return self.podman_container_version(container_name) - def podman_start_user_container(self, username: str): + def podman_start_user_container(self, username: str) -> Container: container_env = { "PACKETSERVER_VERSION": packetserver_version, "PACKETSERVER_USER": username.strip().lower() } + logging.debug(f"Starting user container for {username} with command {podman_run_command}") con = self.client.containers.create(self.opts.image_name, name=self.get_container_name(username), - command=["tail", "-f", "/dev/null"]) + command=podman_run_command, + environment=container_env, user="root") con.start() + logging.debug(f"Container started for {username}") started_at = datetime.datetime.now() + logging.debug(f"Container state: \n{con.inspect()['State']}") while con.inspect()['State']['Status'] not in ['exited', 'running']: + logging.debug("Container state not in ['exited', 'running']") now = datetime.datetime.now() if (now - started_at).total_seconds() > 300: con.stop() con.remove() - raise RuntimeError(f"Couldn't start container for user {username}") time.sleep(.1) time.sleep(.5) if con.inspect()['State']['Status'] != 'running': + logging.debug(f"Container for {username} isn't running. Cleaning it up.") + try: + con.stop() + except: + pass + try: + con.rename(f"{self.get_container_name(username)}_old") + con.remove() + except: + pass + raise RuntimeError(f"Couldn't start container for user {username}") + if not con.put_archive('/root/scripts', scripts_tar()): con.stop() con.remove() - raise RuntimeError(f"Couldn't start container for user {username}") + raise RuntimeError("Failed to upload job scripts to container.") + res = con.exec_run(cmd=["bash", "/root/scripts/container_setup_script.sh"], tty=True, user="root") + logging.debug(f"Container setup script run:\n{res[1].decode()}\nExit Code: {res[0]}") + if res[0] != 0: + logging.error(f"Container setup script failed:\n{res[1].decode()}\nExit Code: {res[0]}") + con.stop() + con.remove() + raise RuntimeError(f"Container setup script failed:\n{res[1].decode()}\nExit Code: {res[0]}") self.touch_user_container(username) + return con def podman_remove_container_name(self, container_name: str): cli = self.client logging.debug(f"Attempting to remove container named {container_name}") try: con = cli.containers.get(container_name) + if con.inspect()['State']['Status'] == 'running': + con.exec_run(cmd="touch /root/ENDNOW", user="root") + time.sleep(1) except podman.errors.exceptions.NotFound as e: + logging.warning(f"Didn't find container named {container_name}") return try: con.rename(f"{container_name}_{random_string()}") except: - pass + logging.error(f"Couldn't rename container:\n{format_exc()}") if con.inspect()['State']['Status'] != 'exited': try: - con.stop() + con.stop(timeout=10) except: - pass + logging.error(f"Couldn't stop container:\n{format_exc()}") try: con.remove() except: - pass + logging.error(f"Couldn't remove container:\n{format_exc()}") return def podman_stop_user_container(self, username: str): self.podman_remove_container_name(self.get_container_name(username)) + if self.get_container_name(username) in self.user_containers: + del self.user_containers[self.get_container_name(username)] def podman_user_container_exists(self, username: str) -> bool: try: @@ -164,27 +310,45 @@ class PodmanOrchestrator(Orchestrator): cli = self.client for i in cli.containers.list(all=True): if self.opts.name_prefix in str(i.name): - self.podman_remove_container_name(str(i.name)) + if str(i.name) not in self.user_containers: + self.podman_remove_container_name(str(i.name)) def get_container_name(self, username: str) -> str: return self.opts.name_prefix + username.lower().strip() + def get_username_from_container_name(self, container_name: str) -> str: + if not self.opts.name_prefix in container_name: + raise ValueError(f"{container_name} is not a user container") + return container_name.replace(self.opts.name_prefix, "") + + def touch_user_container(self, username: str): self.user_containers[self.get_container_name(username)] = datetime.datetime.now() - def start_user_container(self, username: str): + def start_user_container(self, username: str) -> Container: if not self.podman_user_container_exists(username): - self.podman_start_user_container(username) - self.touch_user_container(username) + con = self.podman_start_user_container(username) + else: + con = self.client.containers.get(self.get_container_name(username)) + return con def clean_containers(self): """Checks running containers and stops them if they have been running too long.""" + containers_to_clean = set() for c in self.user_containers: - if (datetime.datetime.now() - self.user_containers[c]) > self.opts.container_keepalive: - self.podman_remove_container_name(c) - del self.user_containers[c] + if (datetime.datetime.now() - self.user_containers[c]).total_seconds() > self.opts.container_keepalive: + logging.debug(f"Container {c} no activity for {self.opts.container_keepalive} seconds. Clearing.") + containers_to_clean.add(c) else: - if packetserver_version < self.podman_user_container_version(c): + if packetserver_version < self.podman_container_version(c): + logging.debug(f"Container {c} was created using older code version. Clearing.") + un = self.get_username_from_container_name(c) + if not self.user_running(un): + containers_to_clean.add(c) + for c in list(containers_to_clean): + self.podman_remove_container_name(c) + del self.user_containers[c] + def user_runners_in_process(self, username: str) -> int: un = username.strip().lower() @@ -217,23 +381,31 @@ class PodmanOrchestrator(Orchestrator): return False - def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None, + def new_runner(self, username: str, args: Union[str, list[str]], job_id: int, environment: Optional[dict] = None, timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None, - files: list[RunnerFile] = None) -> Optional[Runner]: + files: list[RunnerFile] = None) -> Optional[PodmanRunner]: if not self.started: + logging.warning("Attempted to queue a runner when not started") return None with self.runner_lock: if not self.runners_available(): + logging.warning("Attempted to queue a runner when no runner slots available.") return None - pass + con = self.start_user_container(username) + logging.debug(f"Started a container for {username} successfully.") + self.touch_user_container(username) + logging.debug(f"Queuing a runner on container {con}, with command '{args}' of type '{type(args)}'") + runner = PodmanRunner(username, args, job_id, con, environment=environment, timeout_secs=timeout_secs, + labels=labels, files=files) + self.runners.append(runner) + runner.start() + return runner def manage_lifecycle(self): if not self.started: return with self.runner_lock: for r in self.runners: - if r.status is RunnerStatus.RUNNING: - r.heartbeat() if not r.is_finished(): self.touch_user_container(r.username) self.clean_containers() @@ -254,7 +426,13 @@ class PodmanOrchestrator(Orchestrator): self.manager_thread.start() def stop(self): + logging.debug("Stopping podman orchestrator.") self.started = False + cli = self.client + self.user_containers = {} + self.clean_orphaned_containers() if self.manager_thread is not None: + logging.debug("Joining orchestrator manager thread.") self.manager_thread.join(timeout=15) + logging.debug("Orchestrator manager thread stopped") self.manager_thread = None \ No newline at end of file