Podman runner "might" be working and feature complete.

This commit is contained in:
Michael Woods
2025-02-14 14:57:49 -05:00
parent efa7e4e77f
commit acbfe96fc8
4 changed files with 324 additions and 74 deletions

View File

@@ -2,9 +2,9 @@ import re
import datetime
import tempfile
import tarfile
from typing import Union, Iterable, Tuple, Optional
from typing import Union, Iterable, Tuple, Optional, IO
import os.path
from io import BytesIO
from io import BytesIO, BufferedReader
import random
import string
@@ -78,6 +78,18 @@ def bytes_to_tar_bytes(name: str, data: bytes) -> bytes:
temp.seek(0)
return temp.read()
def bytes_tar_has_files(data: Union[bytes, IO]):
if type(data) is bytes:
bio = BytesIO(data)
else:
bio = data
tar_obj = tarfile.TarFile(fileobj=bio, mode="r")
files = [m for m in tar_obj.getmembers() if m.isfile()]
if len(files) > 0:
return True
else:
return False
def multi_bytes_to_tar_bytes(objects: dict) -> bytes:
"""Creates a tar archive with a single file of name <name> with <data> bytes as the contents"""
with tempfile.TemporaryFile() as temp:
@@ -107,3 +119,33 @@ def random_string(length=8) -> str:
rand_str = ''.join(random.choices(string.ascii_letters + string.digits, k=length))
return rand_str
class TarFileExtractor(object):
"""Generator created from file like object pointing to tar data"""
def __init__(self, fileobj: IO):
self.fileobj = fileobj
try:
self.tar_file = tarfile.TarFile(fileobj=self.fileobj)
self._raw_members = [m for m in self.tar_file.getmembers() if m.isfile()]
except:
self._raw_members = []
self._count = 0
def __iter__(self):
return self
# Python 3 compatibility
def __next__(self):
return self.next()
def next(self) -> Tuple[str, IO]:
if (self._count + 1) > len(self._raw_members):
raise StopIteration()
else:
member = self._raw_members[self._count]
name = member.name
if type(name) is bytes:
name = name.decode()
name = str(name)
self._count = self._count + 1
return os.path.basename(name), self.tar_file.extractfile(member)

View File

@@ -5,19 +5,20 @@ import datetime
from uuid import UUID, uuid4
from threading import Lock
import os.path
from packetserver.runner.constants import job_setup_script, job_end_script, container_setup_script
from packetserver.common.util import multi_bytes_to_tar_bytes
from packetserver.runner.constants import job_setup_script, job_end_script, container_setup_script, container_run_script
from packetserver.common.util import multi_bytes_to_tar_bytes, bytes_to_tar_bytes, TarFileExtractor
def scripts_tar() -> bytes:
return multi_bytes_to_tar_bytes({
'job_setup_script.sh': job_setup_script,
'job_end_script.sh': job_end_script,
'container_setup_script.sh': container_setup_script
'job_setup_script.sh': job_setup_script.encode(),
'job_end_script.sh': job_end_script.encode(),
'container_run_script.sh': container_run_script.encode(),
'container_setup_script.sh': container_setup_script.encode()
})
class RunnerFile:
def __init__(self, destination_path: str, source_path: str = None, data: bytes = b'', read_only: bool = False):
def __init__(self, destination_path: str, source_path: str = None, data: bytes = b'', root_owned: bool = False):
self._data = data
self._source_path = ""
@@ -30,10 +31,8 @@ class RunnerFile:
self.destination_path = destination_path.strip()
if self.destination_path == "":
raise ValueError("Destination path cannot be empty.")
if not os.path.isabs(self.destination_path):
raise ValueError("Destination path must be an absolute path.")
self.is_read_only = read_only
self.root_owned = root_owned
@property
def basename(self) -> str:
@@ -43,6 +42,10 @@ class RunnerFile:
def dirname(self) -> str:
return os.path.dirname(self.destination_path)
@property
def isabs(self) -> bool:
return os.path.isabs(self.destination_path)
@property
def data(self) -> bytes:
if self._source_path == "":
@@ -50,6 +53,9 @@ class RunnerFile:
else:
return open(self._source_path, "rb").read()
def tar_data(self) -> bytes:
return bytes_to_tar_bytes(self.basename, self.data)
class RunnerStatus(Enum):
CREATED = 1
QUEUED = 2
@@ -62,8 +68,8 @@ class RunnerStatus(Enum):
class Runner:
"""Abstract class to take arguments and run a job and track the status and results."""
def __init__(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None,
def __init__(self, username: str, args: Union[str, list[str]], job_id: int, environment: Optional[dict] = None,
timeout_secs: str = 300, labels: Optional[list] = None,
files: list[RunnerFile] = None):
self.files = []
if files is not None:
@@ -72,19 +78,24 @@ class Runner:
self.status = RunnerStatus.CREATED
self.username = username.strip().lower()
self.args = args
self.job_id = int(job_id)
self.env = {}
self.started = datetime.datetime.now()
self._result = (0,(b'', b''))
if environment:
for key in environment:
self.env[key] = environment[key]
self.labels = []
if type(labels) is list:
for l in labels:
self.labels.append(l)
self.timeout_seconds = timeout_secs
self.refresh_db = refresh_db
self.created_at = datetime.datetime.now(datetime.UTC)
def __repr__(self):
return f"<{type(self).__name__}: {self.username}[{self.job_id}] - {self.status.name}>"
def is_finished(self) -> bool:
if self.status in [RunnerStatus.TIMED_OUT, RunnerStatus.SUCCESSFUL, RunnerStatus.FAILED]:
return True
@@ -101,13 +112,8 @@ class Runner:
def stop(self):
raise RuntimeError("Attempting to stop an abstract class.")
def heartbeat(self):
"""Does any housekeeping while the underlying task is running. When the task is finished,
update status and do any cleanup activities."""
pass
@property
def output(self) -> str:
def output(self) -> bytes:
raise RuntimeError("Attempting to interact with an abstract class.")
@property
@@ -119,7 +125,7 @@ class Runner:
raise RuntimeError("Attempting to interact with an abstract class.")
@property
def artifacts(self) -> list:
def artifacts(self) -> TarFileExtractor:
raise RuntimeError("Attempting to interact with an abstract class.")
class Orchestrator:

View File

@@ -1,25 +1,49 @@
from packetserver.common.util import multi_bytes_to_tar_bytes
container_setup_script = """#!/bin/bash
useradd -m -s /bin/bash "$PACKETSERVER_USER" -u 1000
mkdir -p "/home/${PACKETSERVER_USER}/.packetserver/artifacts"
mkdir -p /artifact_output
chown -R $PACKETSERVER_USER "/home/$PACKETSERVER_USER"
set -e
echo "Place holder for now."
"""
container_run_script = """#!/bin/bash
set -e
echo "Creating user ${PACKETSERVER_USER}"
useradd -m -s /bin/bash "${PACKETSERVER_USER}" -u 1000
echo "Creating directories."
mkdir -pv "/home/${PACKETSERVER_USER}/.packetserver"
mkdir -pv /artifact_output
chown -Rv ${PACKETSERVER_USER} "/home/${PACKETSERVER_USER}"
echo
echo "Looping. Waiting for /root/ENDNOW to exist before stopping."
while ! [ -f "/root/ENDNOW" ]; do
sleep 1
done
echo "Ending now.."
"""
job_setup_script = """#!/bin/bash
chmod 444 /user-db.json.gz
chown -R $PACKETSERVER_USER "/home/$PACKETSERVER_USER"
mkdir -p "/home/$PACKETSERVER_USER/.packetserver/artifacts/$PACKETSERVER_JOBID"
set -e
PACKETSERVER_JOB_DIR="/home/${PACKETSERVER_USER}/.packetserver/${PACKETSERVER_JOBID}"
mkdir -pv "${PACKETSERVER_JOB_DIR}/files"
mkdir -pv "${PACKETSERVER_JOB_DIR}/artifacts"
chown ${PACKETSERVER_USER} "/home/${PACKETSERVER_USER}"
chown -R ${PACKETSERVER_USER} "${PACKETSERVER_JOB_DIR}"
"""
job_end_script = """#!/bin/bash
PACKETSERVER_ARTIFACT_DIR="/home/$PACKETSERVER_USER/.packetserver/artifacts/$PACKETSERVER_JOBID"
PACKETSERVER_ARTIFACT_TAR="/artifact_output/${PACKETSERVER_JOBID}.tar"
cwd=$(pwd)
if [ $(find "$PACKETSERVER_ARTIFACT_DIR" | wc -l) -gt "1" ]; then
cd $PACKETSERVER_ARTIFACT_DIR
tar -cf ${PACKETSERVER_ARTIFACT_TAR} .
fi
rm -rf ${PACKETSERVER_ARTIFACT_DIR}
set -e
PACKETSERVER_JOB_DIR="/home/$PACKETSERVER_USER/.packetserver/${PACKETSERVER_JOBID}"
PACKETSERVER_ARTIFACT_DIR="${PACKETSERVER_JOB_DIR}/artifacts"
PACKETSERVER_ARTIFACT_TAR="/artifact_output/${PACKETSERVER_JOBID}.tar.gz"
tar -czvf "${PACKETSERVER_ARTIFACT_TAR}" -C "${PACKETSERVER_ARTIFACT_DIR}" .
rm -rfv "${PACKETSERVER_JOB_DIR}"
"""
podman_bash_start = """ echo 'waiting for /root/scripts/container_run_script.sh to exist'
while ! [ -f '/root/scripts/container_run_script.sh' ]; do
tail /dev/null
done
echo 'entering /root/scripts/container_run_script.sh ...'
bash /root/scripts/container_run_script.sh
"""
podman_run_command = ["bash", "-c", podman_bash_start]

View File

@@ -1,22 +1,29 @@
"""Uses podman to run jobs in containers."""
import time
from ZEO import client
from . import Runner, Orchestrator, RunnerStatus, RunnerFile, scripts_tar
from packetserver.runner.constants import podman_run_command
from urllib.parse import urlparse
from collections import namedtuple
from typing import Optional, Iterable
import subprocess
from typing import Optional, Iterable, Union
from traceback import format_exc
import podman
import gzip
from podman.domain.containers import Container
import podman.errors
import os
import os.path
import logging
import ZODB
import datetime
from os.path import basename, dirname
from packetserver.common.util import bytes_to_tar_bytes, random_string
from packetserver.common.util import bytes_to_tar_bytes, random_string, extract_tar_bytes, bytes_tar_has_files, \
TarFileExtractor
from packetserver import VERSION as packetserver_version
import re
from threading import Thread
from io import BytesIO
env_splitter_rex = '''([a-zA-Z0-9]+)=([a-zA-Z0-9]*)'''
@@ -24,14 +31,110 @@ PodmanOptions = namedtuple("PodmanOptions", ["default_timeout", "max_timeout", "
"max_active_jobs", "container_keepalive", "name_prefix"])
class PodmanRunner(Runner):
def __init__(self, username):
pass
def __init__(self, username: str, args: Union[str, list[str]], job_id: int, container: Container,
environment: Optional[dict] = None, timeout_secs: str = 300, labels: Optional[list] = None,
files: list[RunnerFile] = None):
super().__init__(username, args, job_id, environment=environment, timeout_secs=timeout_secs,
labels=labels, files=files)
self._artifact_archive = b''
if not container.inspect()['State']['Running']:
raise ValueError(f"Container {container} is not in state Running.")
self.container = container
self._thread = None
self.env['PACKETSERVER_JOBID'] = str(job_id)
self.job_path = os.path.join("/home", self.username, ".packetserver", str(job_id))
self.archive_path = os.path.join("/artifact_output", f"{str(job_id)}.tar.gz")
def thread_runner(self):
self.status = RunnerStatus.RUNNING
logging.debug(f"Thread for runner {self.job_id} started. Command for {(type(self.args))}:\n{self.args}")
# run the exec call
if type(self.args) is str:
logging.debug(f"Running string: {self.args}")
res = self.container.exec_run(cmd=self.args, environment=self.env, user=self.username, demux=True,
workdir=self.job_path)
else:
logging.debug(f"Running iterable: {list(self.args)}")
res = self.container.exec_run(cmd=list(self.args), environment=self.env, user=self.username, demux=True,
workdir=self.job_path)
logging.debug(str(res))
# cleanup housekeeping
self.status = RunnerStatus.STOPPING
self._result = res
# run cleanup script
logging.debug(f"Running cleanup script for {self.job_id}")
end_res = self.container.exec_run("bash /root/scripts/job_end_script.sh",
environment=self.env, user="root", tty=True)
logging.debug(f"End result: {end_res}")
if end_res[0] != 0:
logging.error(f"End Job script failed:\n{end_res[1].decode()}")
# collect any artifacts
try:
retrieved_tar_bytes = b''.join(self.container.get_archive(self.archive_path)[0])
art_tar_bytes = extract_tar_bytes(retrieved_tar_bytes)[1]
logging.debug(f"bytes retrieved: {retrieved_tar_bytes}")
if bytes_tar_has_files(gzip.GzipFile(fileobj=BytesIO(art_tar_bytes))):
logging.debug("found artifacts; attaching to runner object")
self._artifact_archive = art_tar_bytes
else:
logging.debug(f"no artifacts returned for job {self.job_id}")
except:
logging.warning(f"Error retrieving artifacts for {self.job_id}:\n{format_exc()}")
self._artifact_archive = b''
# set final status to FAILED or SUCCEEDED
if self.return_code == 0:
self.status = RunnerStatus.SUCCESSFUL
else:
self.status = RunnerStatus.FAILED
@property
def has_artifacts(self) -> bool:
if self._artifact_archive == b'':
return False
else:
return True
@property
def artifacts(self) -> TarFileExtractor:
return TarFileExtractor(gzip.GzipFile(fileobj=BytesIO(self._artifact_archive)))
@property
def output(self) -> bytes:
return self._result[1][0]
@property
def errors(self) -> str:
return self._result[1][1].decode()
@property
def return_code(self) -> int:
return self._result[0]
def start(self):
super().start()
logging.debug(f"Starting runner {self.job_id} for {self.username} with command:\n({type(self.args)}){self.args}")
self.status = RunnerStatus.STARTING
# Run job setup script
logging.debug(f"Running job setup script for {self.job_id} runner")
setup_res = self.container.exec_run("bash /root/scripts/job_setup_script.sh",
environment=self.env, user="root", tty=True)
logging.debug(f"{self.job_id} Setup script:\n{str(setup_res[1])}")
if setup_res[0] != 0:
self.status = RunnerStatus.FAILED
raise RuntimeError(f"Couldn't run setup scripts for {self.job}:\n{setup_res[1]}")
# put files where they need to be
for f in self.files:
logging.debug(f"Adding file {f}")
if not f.isabs:
f.destination_path = os.path.join(self.job_path, f.destination_path)
self.container.put_archive(f.dirname, f.tar_data())
if not f.root_owned:
self.container.exec_run(f"chown -R {self.username} {f.destination_path}")
def heartbeat(self):
pass
# start thread
logging.debug(f"Starting runner thread for {self.job_id}")
self._thread = Thread(target=self.thread_runner)
super().start()
self._thread.start()
class PodmanOrchestrator(Orchestrator):
def __init__(self, uri: Optional[str] = None, options: Optional[PodmanOptions] = None):
@@ -39,12 +142,14 @@ class PodmanOrchestrator(Orchestrator):
self.started = False
self.user_containers = {}
self.manager_thread = None
if uri:
self.uri = uri
else:
self.uri = f"unix:///run/user/{os.getuid()}/podman/podman.sock"
if not os.path.exists(self.uri):
uri_parsed = urlparse(self.uri)
if uri_parsed.scheme == "unix":
if not os.path.exists(uri_parsed.path):
raise FileNotFoundError(f"Podman socket not found: {self.uri}")
logging.debug(f"Testing podman socket. Version: {self.client.version()}")
@@ -60,11 +165,22 @@ class PodmanOrchestrator(Orchestrator):
def client(self):
return podman.PodmanClient(base_url=self.uri)
def add_file_to_user_container(self, username: str, data: bytes, path: str):
pass
def add_file_to_user_container(self, username: str, data: bytes, path: str, root_owned=False):
cli = self.client
file_dir = dirname(path)
tar_data_bytes = bytes_to_tar_bytes(basename(path), data)
con = cli.containers.get(self.get_container_name(username))
res = con.exec_run(cmd=["mkdir", "-p", file_dir], user="root")
if res[0] != 1:
raise RuntimeError("Couldn't create directory")
con.put_archive(file_dir, tar_data_bytes)
def get_file_from_user_container(self, username: str, path: str) -> bytes :
pass
cli = self.client
con = cli.containers.get(self.get_container_name(username))
tar_result = con.get_archive(path)
bytes_tar = b"".join(list(tar_result[0]))
return extract_tar_bytes(bytes_tar)[1]
def podman_container_env(self, container_name: str) -> dict:
cli = self.client
@@ -96,53 +212,83 @@ class PodmanOrchestrator(Orchestrator):
container_name = self.get_container_name(username)
return self.podman_container_version(container_name)
def podman_start_user_container(self, username: str):
def podman_start_user_container(self, username: str) -> Container:
container_env = {
"PACKETSERVER_VERSION": packetserver_version,
"PACKETSERVER_USER": username.strip().lower()
}
logging.debug(f"Starting user container for {username} with command {podman_run_command}")
con = self.client.containers.create(self.opts.image_name, name=self.get_container_name(username),
command=["tail", "-f", "/dev/null"])
command=podman_run_command,
environment=container_env, user="root")
con.start()
logging.debug(f"Container started for {username}")
started_at = datetime.datetime.now()
logging.debug(f"Container state: \n{con.inspect()['State']}")
while con.inspect()['State']['Status'] not in ['exited', 'running']:
logging.debug("Container state not in ['exited', 'running']")
now = datetime.datetime.now()
if (now - started_at).total_seconds() > 300:
con.stop()
con.remove()
raise RuntimeError(f"Couldn't start container for user {username}")
time.sleep(.1)
time.sleep(.5)
if con.inspect()['State']['Status'] != 'running':
logging.debug(f"Container for {username} isn't running. Cleaning it up.")
try:
con.stop()
except:
pass
try:
con.rename(f"{self.get_container_name(username)}_old")
con.remove()
except:
pass
raise RuntimeError(f"Couldn't start container for user {username}")
if not con.put_archive('/root/scripts', scripts_tar()):
con.stop()
con.remove()
raise RuntimeError(f"Couldn't start container for user {username}")
raise RuntimeError("Failed to upload job scripts to container.")
res = con.exec_run(cmd=["bash", "/root/scripts/container_setup_script.sh"], tty=True, user="root")
logging.debug(f"Container setup script run:\n{res[1].decode()}\nExit Code: {res[0]}")
if res[0] != 0:
logging.error(f"Container setup script failed:\n{res[1].decode()}\nExit Code: {res[0]}")
con.stop()
con.remove()
raise RuntimeError(f"Container setup script failed:\n{res[1].decode()}\nExit Code: {res[0]}")
self.touch_user_container(username)
return con
def podman_remove_container_name(self, container_name: str):
cli = self.client
logging.debug(f"Attempting to remove container named {container_name}")
try:
con = cli.containers.get(container_name)
if con.inspect()['State']['Status'] == 'running':
con.exec_run(cmd="touch /root/ENDNOW", user="root")
time.sleep(1)
except podman.errors.exceptions.NotFound as e:
logging.warning(f"Didn't find container named {container_name}")
return
try:
con.rename(f"{container_name}_{random_string()}")
except:
pass
logging.error(f"Couldn't rename container:\n{format_exc()}")
if con.inspect()['State']['Status'] != 'exited':
try:
con.stop()
con.stop(timeout=10)
except:
pass
logging.error(f"Couldn't stop container:\n{format_exc()}")
try:
con.remove()
except:
pass
logging.error(f"Couldn't remove container:\n{format_exc()}")
return
def podman_stop_user_container(self, username: str):
self.podman_remove_container_name(self.get_container_name(username))
if self.get_container_name(username) in self.user_containers:
del self.user_containers[self.get_container_name(username)]
def podman_user_container_exists(self, username: str) -> bool:
try:
@@ -164,27 +310,45 @@ class PodmanOrchestrator(Orchestrator):
cli = self.client
for i in cli.containers.list(all=True):
if self.opts.name_prefix in str(i.name):
if str(i.name) not in self.user_containers:
self.podman_remove_container_name(str(i.name))
def get_container_name(self, username: str) -> str:
return self.opts.name_prefix + username.lower().strip()
def get_username_from_container_name(self, container_name: str) -> str:
if not self.opts.name_prefix in container_name:
raise ValueError(f"{container_name} is not a user container")
return container_name.replace(self.opts.name_prefix, "")
def touch_user_container(self, username: str):
self.user_containers[self.get_container_name(username)] = datetime.datetime.now()
def start_user_container(self, username: str):
def start_user_container(self, username: str) -> Container:
if not self.podman_user_container_exists(username):
self.podman_start_user_container(username)
self.touch_user_container(username)
con = self.podman_start_user_container(username)
else:
con = self.client.containers.get(self.get_container_name(username))
return con
def clean_containers(self):
"""Checks running containers and stops them if they have been running too long."""
containers_to_clean = set()
for c in self.user_containers:
if (datetime.datetime.now() - self.user_containers[c]) > self.opts.container_keepalive:
if (datetime.datetime.now() - self.user_containers[c]).total_seconds() > self.opts.container_keepalive:
logging.debug(f"Container {c} no activity for {self.opts.container_keepalive} seconds. Clearing.")
containers_to_clean.add(c)
else:
if packetserver_version < self.podman_container_version(c):
logging.debug(f"Container {c} was created using older code version. Clearing.")
un = self.get_username_from_container_name(c)
if not self.user_running(un):
containers_to_clean.add(c)
for c in list(containers_to_clean):
self.podman_remove_container_name(c)
del self.user_containers[c]
else:
if packetserver_version < self.podman_user_container_version(c):
def user_runners_in_process(self, username: str) -> int:
un = username.strip().lower()
@@ -217,23 +381,31 @@ class PodmanOrchestrator(Orchestrator):
return False
def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
def new_runner(self, username: str, args: Union[str, list[str]], job_id: int, environment: Optional[dict] = None,
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None,
files: list[RunnerFile] = None) -> Optional[Runner]:
files: list[RunnerFile] = None) -> Optional[PodmanRunner]:
if not self.started:
logging.warning("Attempted to queue a runner when not started")
return None
with self.runner_lock:
if not self.runners_available():
logging.warning("Attempted to queue a runner when no runner slots available.")
return None
pass
con = self.start_user_container(username)
logging.debug(f"Started a container for {username} successfully.")
self.touch_user_container(username)
logging.debug(f"Queuing a runner on container {con}, with command '{args}' of type '{type(args)}'")
runner = PodmanRunner(username, args, job_id, con, environment=environment, timeout_secs=timeout_secs,
labels=labels, files=files)
self.runners.append(runner)
runner.start()
return runner
def manage_lifecycle(self):
if not self.started:
return
with self.runner_lock:
for r in self.runners:
if r.status is RunnerStatus.RUNNING:
r.heartbeat()
if not r.is_finished():
self.touch_user_container(r.username)
self.clean_containers()
@@ -254,7 +426,13 @@ class PodmanOrchestrator(Orchestrator):
self.manager_thread.start()
def stop(self):
logging.debug("Stopping podman orchestrator.")
self.started = False
cli = self.client
self.user_containers = {}
self.clean_orphaned_containers()
if self.manager_thread is not None:
logging.debug("Joining orchestrator manager thread.")
self.manager_thread.join(timeout=15)
logging.debug("Orchestrator manager thread stopped")
self.manager_thread = None