Starting to work on jobs and db exporting to json. There's a problem with message attachment packing happening..
This commit is contained in:
@@ -1,7 +1,71 @@
|
||||
"""Package runs arbitrary commands/jobs via different mechanisms."""
|
||||
from typing import Union,Optional,Iterable,Self
|
||||
from enum import Enum
|
||||
import datetime
|
||||
from uuid import UUID, uuid4
|
||||
from threading import Lock
|
||||
|
||||
class RunnerStatus(Enum):
|
||||
CREATED = 1
|
||||
QUEUED = 2
|
||||
STARTING = 3
|
||||
RUNNING = 4
|
||||
STOPPING = 5
|
||||
SUCCESSFUL = 6
|
||||
FAILED = 7
|
||||
|
||||
class Runner:
|
||||
"""Abstract class to take arguments and run a job and track the status and results."""
|
||||
def __init__(self, args: Iterable[str], ):
|
||||
def __init__(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
|
||||
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None):
|
||||
self.status = RunnerStatus.CREATED
|
||||
self.username = username.strip().lower()
|
||||
self.args = args
|
||||
self.env = {}
|
||||
if environment:
|
||||
for key in environment:
|
||||
self.env[key] = environment[key]
|
||||
self.labels = []
|
||||
for l in labels:
|
||||
self.labels.append(l)
|
||||
|
||||
self.timeout_seconds = timeout_secs
|
||||
self.refresh_db = refresh_db
|
||||
self.created_at = datetime.datetime.now(datetime.UTC)
|
||||
|
||||
def start(self):
|
||||
raise RuntimeError("Attempting to start an abstract class.")
|
||||
|
||||
def stop(self):
|
||||
raise RuntimeError("Attempting to stop an abstract class.")
|
||||
|
||||
@property
|
||||
def output(self) -> str:
|
||||
raise RuntimeError("Attempting to interact with an abstract class.")
|
||||
|
||||
@property
|
||||
def errors(self) -> str:
|
||||
raise RuntimeError("Attempting to interact with an abstract class.")
|
||||
|
||||
@property
|
||||
def return_code(self) -> Optional[int]:
|
||||
raise RuntimeError("Attempting to interact with an abstract class.")
|
||||
|
||||
@property
|
||||
def artifacts(self) -> list:
|
||||
raise RuntimeError("Attempting to interact with an abstract class.")
|
||||
|
||||
class Orchestrator:
|
||||
"""Abstract class holds configuration and also tracks runners through their lifecycle. Prepares environments to
|
||||
run jobs in runners."""
|
||||
def __init__(self):
|
||||
self.runners = []
|
||||
self.runner_lock = Lock()
|
||||
|
||||
def new_runner(self, username: str, args: Iterable[str], job_id: int, environment: Optional[dict] = None,
|
||||
timeout_secs: str = 300, refresh_db: bool = True, labels: Optional[list] = None) -> Runner:
|
||||
pass
|
||||
|
||||
def manage_lifecycle(self):
|
||||
"""When called, starts any pending runners if queue allows, looks for finished runners and updates statuses."""
|
||||
pass
|
||||
@@ -1 +1,43 @@
|
||||
"""Uses podman to run jobs in containers."
|
||||
"""Uses podman to run jobs in containers."""
|
||||
from . import Runner, Orchestrator
|
||||
from collections import namedtuple
|
||||
from typing import Optional
|
||||
import subprocess
|
||||
import podman
|
||||
import os
|
||||
import os.path
|
||||
import logging
|
||||
import ZODB
|
||||
|
||||
PodmanOptions = namedtuple("PodmanOptions", ["default_timeout", "max_timeout", "image_name",
|
||||
"max_active_jobs", "container_keepalive", "name_prefix"])
|
||||
|
||||
class PodmanRunner(Runner):
|
||||
def __init__(self, username):
|
||||
pass
|
||||
|
||||
class PodmanOrchestrator(Orchestrator):
|
||||
def __init__(self, uri: Optional[str] = None, options: Optional[PodmanOptions] = None):
|
||||
super().__init__()
|
||||
if uri:
|
||||
self.uri = uri
|
||||
else:
|
||||
self.uri = f"unix:///run/user/{os.getuid()}/podman/podman.sock"
|
||||
|
||||
if not os.path.exists(self.uri):
|
||||
raise FileNotFoundError(f"Podman socket not found: {self.uri}")
|
||||
|
||||
logging.debug(f"Testing podman socket. Version: {self.client.version()}")
|
||||
|
||||
self.username_containers = {}
|
||||
if options:
|
||||
self.opts = options
|
||||
else:
|
||||
self.opts = PodmanOptions(default_timeout=300, max_timeout=3600, image_name="debian", max_active_jobs=5,
|
||||
container_keepalive=300, name_prefix="packetserver_")
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
return podman.PodmanClient(base_url=self.uri)
|
||||
|
||||
def refresh_user_db(self, username: str, db: ZODB.DB):
|
||||
|
||||
41
src/packetserver/server/db.py
Normal file
41
src/packetserver/server/db.py
Normal file
@@ -0,0 +1,41 @@
|
||||
import ZODB
|
||||
import json
|
||||
import gzip
|
||||
import base64
|
||||
|
||||
def get_user_db(username: str, db: ZODB.DB) -> dict:
|
||||
udb = {
|
||||
"objects": {},
|
||||
"messages": [],
|
||||
"user": {},
|
||||
"bulletins": []
|
||||
}
|
||||
username = username.strip().upper()
|
||||
with db.transaction() as db_conn:
|
||||
user = db_conn.root.users[username]
|
||||
udb['user'] = user.to_safe_dict()
|
||||
for o in user.object_uuids:
|
||||
if type(o.data) is bytes:
|
||||
o.data = base64.b64encode(o.data).decode()
|
||||
else:
|
||||
o.data = base64.b64encode(o.data.encode()).decode()
|
||||
udb['objects'][o] = db_conn.root.objects[o].to_dict()
|
||||
for m in db_conn.root.messages[username]:
|
||||
for a in m.attachments:
|
||||
if type(a.data) is bytes:
|
||||
a.data = base64.b64encode(a.data).decode()
|
||||
else:
|
||||
a.data = base64.b64encode(a.data.encode()).decode()
|
||||
udb['messages'].append(m.to_dict())
|
||||
for b in db_conn.root.bulletins:
|
||||
udb['bulletins'].append(b.to_dict())
|
||||
|
||||
return udb
|
||||
|
||||
def get_user_db_json(username: str, db: ZODB.DB, gzip_output=True) -> bytes:
|
||||
udb = get_user_db(username, db)
|
||||
j = json.dumps(udb).encode()
|
||||
if gzip_output:
|
||||
return gzip.compress(j)
|
||||
else:
|
||||
return j
|
||||
0
src/packetserver/server/jobs.py
Normal file
0
src/packetserver/server/jobs.py
Normal file
Reference in New Issue
Block a user