more podman stuff. not done yet.
This commit is contained in:
@@ -0,0 +1 @@
|
||||
VERSION="0.1-alpha"
|
||||
@@ -78,6 +78,20 @@ def bytes_to_tar_bytes(name: str, data: bytes) -> bytes:
|
||||
temp.seek(0)
|
||||
return temp.read()
|
||||
|
||||
def multi_bytes_to_tar_bytes(objects: dict) -> bytes:
|
||||
"""Creates a tar archive with a single file of name <name> with <data> bytes as the contents"""
|
||||
with tempfile.TemporaryFile() as temp:
|
||||
tar_obj = tarfile.TarFile(fileobj=temp, mode="w")
|
||||
for name in objects:
|
||||
data = bytes(objects[name])
|
||||
bio = BytesIO(data)
|
||||
tar_info = tarfile.TarInfo(name=name)
|
||||
tar_info.size = len(data)
|
||||
tar_obj.addfile(tar_info, bio)
|
||||
tar_obj.close()
|
||||
temp.seek(0)
|
||||
return temp.read()
|
||||
|
||||
def extract_tar_bytes(tarfile_bytes: bytes) -> Tuple[str, bytes]:
|
||||
"""Takes the bytes of a tarfile, and returns the name and bytes of the first file in the archive."""
|
||||
out_bytes = b''
|
||||
|
||||
@@ -4,6 +4,51 @@ from enum import Enum
|
||||
import datetime
|
||||
from uuid import UUID, uuid4
|
||||
from threading import Lock
|
||||
import os.path
|
||||
from packetserver.runner.constants import job_setup_script, job_end_script, container_setup_script
|
||||
from packetserver.common.util import multi_bytes_to_tar_bytes
|
||||
|
||||
|
||||
def scripts_tar() -> bytes:
|
||||
return multi_bytes_to_tar_bytes({
|
||||
'job_setup_script.sh': job_setup_script,
|
||||
'job_end_script.sh': job_end_script,
|
||||
'container_setup_script.sh': container_setup_script
|
||||
})
|
||||
|
||||
class RunnerFile:
|
||||
def __init__(self, destination_path: str, source_path: str = None, data: bytes = b'', read_only: bool = False):
|
||||
self._data = data
|
||||
self._source_path = ""
|
||||
|
||||
if source_path is not None:
|
||||
if source_path.strip() != "":
|
||||
if not os.path.isfile(source_path.strip()):
|
||||
raise ValueError("Source Path must point to a file.")
|
||||
self._source_path = source_path.strip()
|
||||
|
||||
self.destination_path = destination_path.strip()
|
||||
if self.destination_path == "":
|
||||
raise ValueError("Destination path cannot be empty.")
|
||||
if not os.path.isabs(self.destination_path):
|
||||
raise ValueError("Destination path must be an absolute path.")
|
||||
|
||||
self.is_read_only = read_only
|
||||
|
||||
@property
|
||||
def basename(self) -> str:
|
||||
return os.path.basename(self.destination_path)
|
||||
|
||||
@property
|
||||
def dirname(self) -> str:
|
||||
return os.path.dirname(self.destination_path)
|
||||
|
||||
@property
|
||||
def data(self) -> bytes:
|
||||
if self._source_path == "":
|
||||
return self._data
|
||||
else:
|
||||
return open(self._source_path, "rb").read()
|
||||
|
||||
class RunnerStatus(Enum):
|
||||
CREATED = 1
|
||||
@@ -18,7 +63,12 @@ class RunnerStatus(Enum):
|
||||
class Runner:
|
||||
"""Abstract class to take arguments and run a job and track the status and results."""
|
||||
def __init__(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
|
||||
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None):
|
||||
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None,
|
||||
files: list[RunnerFile] = None):
|
||||
self.files = []
|
||||
if files is not None:
|
||||
for f in files:
|
||||
self.files.append(f)
|
||||
self.status = RunnerStatus.CREATED
|
||||
self.username = username.strip().lower()
|
||||
self.args = args
|
||||
@@ -35,11 +85,16 @@ class Runner:
|
||||
self.refresh_db = refresh_db
|
||||
self.created_at = datetime.datetime.now(datetime.UTC)
|
||||
|
||||
def is_finished(self):
|
||||
def is_finished(self) -> bool:
|
||||
if self.status in [RunnerStatus.TIMED_OUT, RunnerStatus.SUCCESSFUL, RunnerStatus.FAILED]:
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_in_process(self) -> bool:
|
||||
if self.status in [RunnerStatus.QUEUED, RunnerStatus.RUNNING, RunnerStatus.STARTING, RunnerStatus.STOPPING]:
|
||||
return True
|
||||
return False
|
||||
|
||||
def start(self):
|
||||
self.started = datetime.datetime.now()
|
||||
|
||||
@@ -97,7 +152,8 @@ class Orchestrator:
|
||||
pass
|
||||
|
||||
def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
|
||||
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Runner:
|
||||
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None,
|
||||
files: list[RunnerFile] = None) -> Runner:
|
||||
pass
|
||||
|
||||
def manage_lifecycle(self):
|
||||
|
||||
25
src/packetserver/runner/constants.py
Normal file
25
src/packetserver/runner/constants.py
Normal file
@@ -0,0 +1,25 @@
|
||||
from packetserver.common.util import multi_bytes_to_tar_bytes
|
||||
|
||||
container_setup_script = """#!/bin/bash
|
||||
useradd -m -s /bin/bash "$PACKETSERVER_USER" -u 1000
|
||||
mkdir -p "/home/${PACKETSERVER_USER}/.packetserver/artifacts"
|
||||
mkdir -p /artifact_output
|
||||
chown -R $PACKETSERVER_USER "/home/$PACKETSERVER_USER"
|
||||
"""
|
||||
|
||||
job_setup_script = """#!/bin/bash
|
||||
chmod 444 /user-db.json.gz
|
||||
chown -R $PACKETSERVER_USER "/home/$PACKETSERVER_USER"
|
||||
mkdir -p "/home/$PACKETSERVER_USER/.packetserver/artifacts/$PACKETSERVER_JOBID"
|
||||
"""
|
||||
|
||||
job_end_script = """#!/bin/bash
|
||||
PACKETSERVER_ARTIFACT_DIR="/home/$PACKETSERVER_USER/.packetserver/artifacts/$PACKETSERVER_JOBID"
|
||||
PACKETSERVER_ARTIFACT_TAR="/artifact_output/${PACKETSERVER_JOBID}.tar"
|
||||
cwd=$(pwd)
|
||||
if [ $(find "$PACKETSERVER_ARTIFACT_DIR" | wc -l) -gt "1" ]; then
|
||||
cd $PACKETSERVER_ARTIFACT_DIR
|
||||
tar -cf ${PACKETSERVER_ARTIFACT_TAR} .
|
||||
fi
|
||||
rm -rf ${PACKETSERVER_ARTIFACT_DIR}
|
||||
"""
|
||||
@@ -1,7 +1,7 @@
|
||||
"""Uses podman to run jobs in containers."""
|
||||
import time
|
||||
|
||||
from . import Runner, Orchestrator, RunnerStatus
|
||||
from . import Runner, Orchestrator, RunnerStatus, RunnerFile, scripts_tar
|
||||
from collections import namedtuple
|
||||
from typing import Optional, Iterable
|
||||
import subprocess
|
||||
@@ -14,6 +14,11 @@ import ZODB
|
||||
import datetime
|
||||
from os.path import basename, dirname
|
||||
from packetserver.common.util import bytes_to_tar_bytes, random_string
|
||||
from packetserver import VERSION as packetserver_version
|
||||
import re
|
||||
from threading import Thread
|
||||
|
||||
env_splitter_rex = '''([a-zA-Z0-9]+)=([a-zA-Z0-9]*)'''
|
||||
|
||||
PodmanOptions = namedtuple("PodmanOptions", ["default_timeout", "max_timeout", "image_name",
|
||||
"max_active_jobs", "container_keepalive", "name_prefix"])
|
||||
@@ -33,6 +38,7 @@ class PodmanOrchestrator(Orchestrator):
|
||||
super().__init__()
|
||||
self.started = False
|
||||
self.user_containers = {}
|
||||
self.manager_thread = None
|
||||
if uri:
|
||||
self.uri = uri
|
||||
else:
|
||||
@@ -60,7 +66,41 @@ class PodmanOrchestrator(Orchestrator):
|
||||
def get_file_from_user_container(self, username: str, path: str) -> bytes :
|
||||
pass
|
||||
|
||||
def podman_container_env(self, container_name: str) -> dict:
|
||||
cli = self.client
|
||||
logging.debug(f"Attempting to remove container named {container_name}")
|
||||
try:
|
||||
con = cli.containers.get(container_name)
|
||||
splitter = re.compile(env_splitter_rex)
|
||||
env = {}
|
||||
for i in con.inspect()['Config']['Env']:
|
||||
m = splitter.match(i)
|
||||
if m:
|
||||
env[m.groups()[0]] = m.groups()[1]
|
||||
return env
|
||||
except podman.errors.exceptions.NotFound as e:
|
||||
return
|
||||
|
||||
def podman_container_version(self, container_name: str) -> str:
|
||||
try:
|
||||
env = self.podman_container_env(container_name)
|
||||
except:
|
||||
env = {}
|
||||
return env.get("PACKETSERVER_VERSION", "0.0.0")
|
||||
|
||||
def podman_user_container_env(self, username: str) -> dict:
|
||||
container_name = self.get_container_name(username)
|
||||
return self.podman_container_env(container_name)
|
||||
|
||||
def podman_user_container_version(self, username: str) -> str:
|
||||
container_name = self.get_container_name(username)
|
||||
return self.podman_container_version(container_name)
|
||||
|
||||
def podman_start_user_container(self, username: str):
|
||||
container_env = {
|
||||
"PACKETSERVER_VERSION": packetserver_version,
|
||||
"PACKETSERVER_USER": username.strip().lower()
|
||||
}
|
||||
con = self.client.containers.create(self.opts.image_name, name=self.get_container_name(username),
|
||||
command=["tail", "-f", "/dev/null"])
|
||||
con.start()
|
||||
@@ -77,6 +117,7 @@ class PodmanOrchestrator(Orchestrator):
|
||||
con.stop()
|
||||
con.remove()
|
||||
raise RuntimeError(f"Couldn't start container for user {username}")
|
||||
self.touch_user_container(username)
|
||||
|
||||
def podman_remove_container_name(self, container_name: str):
|
||||
cli = self.client
|
||||
@@ -110,6 +151,15 @@ class PodmanOrchestrator(Orchestrator):
|
||||
except podman.errors.exceptions.NotFound:
|
||||
return False
|
||||
|
||||
def podman_run_command_simple(self, username: str, command: Iterable[str], as_root: bool = True) -> int:
|
||||
"""Runs command defined by arguments iterable in container. As root by default. Returns exit code."""
|
||||
container_name = self.get_container_name(username)
|
||||
un = username.lower().strip()
|
||||
con = self.client.containers.get(container_name)
|
||||
if as_root:
|
||||
un = 'root'
|
||||
return con.exec_run(list(command), user=un)[0]
|
||||
|
||||
def clean_orphaned_containers(self):
|
||||
cli = self.client
|
||||
for i in cli.containers.list(all=True):
|
||||
@@ -123,7 +173,7 @@ class PodmanOrchestrator(Orchestrator):
|
||||
self.user_containers[self.get_container_name(username)] = datetime.datetime.now()
|
||||
|
||||
def start_user_container(self, username: str):
|
||||
if not self.podman_container_exists(self.get_container_name(username)):
|
||||
if not self.podman_user_container_exists(username):
|
||||
self.podman_start_user_container(username)
|
||||
self.touch_user_container(username)
|
||||
|
||||
@@ -133,6 +183,23 @@ class PodmanOrchestrator(Orchestrator):
|
||||
if (datetime.datetime.now() - self.user_containers[c]) > self.opts.container_keepalive:
|
||||
self.podman_remove_container_name(c)
|
||||
del self.user_containers[c]
|
||||
else:
|
||||
if packetserver_version < self.podman_user_container_version(c):
|
||||
|
||||
def user_runners_in_process(self, username: str) -> int:
|
||||
un = username.strip().lower()
|
||||
count = 0
|
||||
for r in self.runners:
|
||||
if r.is_in_process:
|
||||
if r.username == un:
|
||||
count = count + 1
|
||||
return count
|
||||
|
||||
def user_running(self, username: str) -> bool:
|
||||
if self.user_runners_in_process(username) > 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def runners_in_process(self) -> int:
|
||||
count = 0
|
||||
@@ -151,7 +218,8 @@ class PodmanOrchestrator(Orchestrator):
|
||||
return False
|
||||
|
||||
def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
|
||||
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Optional[Runner]:
|
||||
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None,
|
||||
files: list[RunnerFile] = None) -> Optional[Runner]:
|
||||
if not self.started:
|
||||
return None
|
||||
with self.runner_lock:
|
||||
@@ -160,6 +228,8 @@ class PodmanOrchestrator(Orchestrator):
|
||||
pass
|
||||
|
||||
def manage_lifecycle(self):
|
||||
if not self.started:
|
||||
return
|
||||
with self.runner_lock:
|
||||
for r in self.runners:
|
||||
if r.status is RunnerStatus.RUNNING:
|
||||
@@ -169,8 +239,22 @@ class PodmanOrchestrator(Orchestrator):
|
||||
self.clean_containers()
|
||||
self.clean_orphaned_containers()
|
||||
|
||||
def manager(self):
|
||||
logging.debug("Starting podman orchestrator thread.")
|
||||
while self.started:
|
||||
self.manage_lifecycle()
|
||||
time.sleep(.5)
|
||||
logging.debug("Stopping podman orchestrator thread.")
|
||||
|
||||
def start(self):
|
||||
if not self.started:
|
||||
self.clean_orphaned_containers()
|
||||
self.started = True
|
||||
self.manager_thread = Thread(target=self.manager)
|
||||
self.manager_thread.start()
|
||||
|
||||
def stop(self):
|
||||
self.started = False
|
||||
if self.manager_thread is not None:
|
||||
self.manager_thread.join(timeout=15)
|
||||
self.manager_thread = None
|
||||
@@ -179,8 +179,6 @@ class Server:
|
||||
if not self.started:
|
||||
return
|
||||
# Add things to do here:
|
||||
if self.orchestrator is not None:
|
||||
self.orchestrator.manage_lifecycle()
|
||||
|
||||
def run_worker(self):
|
||||
"""Intended to be running as a thread."""
|
||||
@@ -215,7 +213,10 @@ class Server:
|
||||
self.app.start(self.pe_server, self.pe_port)
|
||||
self.app.register_callsigns(self.callsign)
|
||||
self.started = True
|
||||
if self.orchestrator is not None:
|
||||
self.orchestrator.start()
|
||||
self.worker_thread = Thread(target=self.run_worker)
|
||||
self.worker_thread.start()
|
||||
|
||||
def exit_gracefully(self, signum, frame):
|
||||
self.stop()
|
||||
@@ -232,6 +233,8 @@ class Server:
|
||||
cm = self.app._engine._active_handler._handlers[1]._connection_map
|
||||
for key in cm._connections.keys():
|
||||
cm._connections[key].close()
|
||||
if self.orchestrator is not None:
|
||||
self.orchestrator.stop()
|
||||
self.app.stop()
|
||||
self.stop_db()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user