Added basic server job orchestration with Podman. Jobs can currently only be looked up by ID. Client implementation in progress.

This commit is contained in:
Michael Woods
2025-02-15 14:18:24 -05:00
parent 3c9b7d0c55
commit 9bd1c66963
6 changed files with 288 additions and 10 deletions

View File

@@ -1,2 +1,87 @@
import datetime
import pe.app
from ZEO.asyncio.server import new_connection
from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response, \
DummyPacketServerConnection
import ax25
import logging
import signal
import time
from msgpack.exceptions import OutOfData
from typing import Callable, Self, Union, Optional
from traceback import format_exc
from os import linesep
from shutil import rmtree
from threading import Thread
class Client:
    """Packet-radio client that talks to a packetserver instance through a PE (AGWPE) server.

    Wraps a pe.app.Application, registers the client callsign, and provides
    helpers for opening AX.25 connections and exchanging Request/Response
    messages with a server.
    """

    def __init__(self, pe_server: str, port: int, client_callsign: str):
        """Validate the callsign and prepare (but do not start) the PE application.

        Args:
            pe_server: hostname/IP of the PE (AGWPE) server.
            port: TCP port of the PE server.
            client_callsign: AX.25 callsign this client transmits as.

        Raises:
            ValueError: if client_callsign is not a valid AX.25 callsign.
        """
        if not ax25.Address.valid_call(client_callsign):
            raise ValueError(f"Provided callsign '{client_callsign}' is invalid.")
        self.pe_server = pe_server
        self.pe_port = port
        self.callsign = client_callsign
        self.app = pe.app.Application()
        self.started = False
        # Shut down cleanly on Ctrl-C / termination. Signal handlers are
        # invoked with (signum, frame), so stop() must accept extra args.
        signal.signal(signal.SIGINT, self.stop)
        signal.signal(signal.SIGTERM, self.stop)

    def stop(self, *args):
        """Stop the client: close all connections and stop the PE application.

        Accepts and ignores positional arguments so it can double as a
        signal handler (which passes signum and frame).
        """
        self.started = False
        self.clear_connections()
        self.app.stop()

    def start(self):
        """Connect to the PE server and register this client's callsign."""
        self.app.start(self.pe_server, self.pe_port)
        self.app.register_callsigns(self.callsign)
        self.started = True

    def clear_connections(self):
        """Close every tracked PacketServerConnection."""
        # NOTE(review): the original referenced an undefined name `cm`; the
        # connection registry appears to live on PacketServerConnection —
        # confirm against packetserver.common. Iterate over a snapshot so
        # close() may remove entries from the registry while we loop.
        for key in list(PacketServerConnection._connections.keys()):
            PacketServerConnection._connections[key].close()

    def new_connection(self, dest: str):
        """Open a new AX.25 connection from this client's callsign to dest.

        Raises:
            RuntimeError: if the client has not been started.
            ValueError: if dest is not a valid AX.25 callsign.
        """
        if not self.started:
            raise RuntimeError("Must start client before creating connections.")
        if not ax25.Address.valid_call(dest):
            raise ValueError(f"Provided destination callsign '{dest}' is invalid.")
        return self.app.open_connection(0, self.callsign, dest)

    def send_and_receive(self, req: Request, conn: PacketServerConnection, timeout: int = 300) -> Optional[Response]:
        """Send req over an established connection and wait for one Response.

        Polls the connection's msgpack buffer until a full message arrives,
        the connection drops, or the timeout elapses.

        Returns:
            The decoded Response, or None on timeout/disconnect.

        Raises:
            RuntimeError: if conn is not in the CONNECTED state.
        """
        if conn.state.name != "CONNECTED":
            raise RuntimeError("Connection is not connected.")
        logging.debug(f"Sending request {req}")
        conn.send_data(req.pack())
        cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
        while datetime.datetime.now() < cutoff_date:
            if conn.state.name != "CONNECTED":
                logging.error(f"Connection {conn} disconnected.")
                return None
            try:
                unpacked = conn.data.unpack()
            except OutOfData:
                # No complete message buffered yet; poll again shortly.
                time.sleep(.1)
                continue
            msg = Message.partial_unpack(unpacked)
            return Response(msg)
        return None

    def single_connect_send_receive(self, dest: str, req: Request, timeout: int = 300) -> Optional[Response]:
        """Open a connection to dest, send req, and return the Response.

        The timeout budget covers both connection establishment and the
        request/response exchange.

        Returns:
            The Response, or None if connecting or the exchange fails or
            times out.
        """
        conn = self.new_connection(dest)
        logging.debug("Waiting for connection to be ready.")
        cutoff_date = datetime.datetime.now() + datetime.timedelta(seconds=timeout)
        while (datetime.datetime.now() < cutoff_date) and (conn.state.name != "CONNECTED"):
            if conn.state.name in ["DISCONNECTED", "DISCONNECTING"]:
                logging.error(f"Connection {conn} disconnected.")
                return None
            # Avoid a hot busy-wait while the link comes up.
            time.sleep(.1)
        logging.debug("Allowing connection to stabilize for 3 seconds")
        time.sleep(3)
        remaining_time = int((cutoff_date - datetime.datetime.now()).total_seconds()) + 1
        if remaining_time <= 0:
            logging.debug("Connection attempt timed out.")
            conn.close()
            return None
        return self.send_and_receive(req, conn, timeout=int(remaining_time))

View File

@@ -86,6 +86,7 @@ class Runner:
self.started_at = datetime.datetime.now()
self.finished_at = None
self._result = (0,(b'', b''))
self._artifact_archive = b''
if environment:
for key in environment:
self.env[key] = environment[key]

View File

@@ -1,3 +1,6 @@
import datetime
import tempfile
import pe.app
from packetserver.common import Response, Message, Request, PacketServerConnection, send_response, send_blank_response, \
DummyPacketServerConnection
@@ -18,7 +21,10 @@ from msgpack.exceptions import OutOfData
from typing import Callable, Self, Union
from traceback import format_exc
from os import linesep
from shutil import rmtree
from threading import Thread
from packetserver.server.jobs import get_orchestrator_from_config, Job, JobStatus
from packetserver.runner import RunnerStatus, RunnerFile, Orchestrator, Runner
def init_bulletins(root: PersistentMapping):
if 'bulletins' not in root:
@@ -86,6 +92,13 @@ class Server:
if 'user_jobs' not in conn.root():
conn.root.user_jobs = PersistentMapping()
init_bulletins(conn.root())
if 'jobs_enabled' in conn.root.config:
if 'runner' in conn.root.config['jobs_config']:
val = str(conn.root.config['jobs_config']['runner']).lower().strip()
if val in ['podman']:
self.orchestrator = get_orchestrator_from_config(conn.root.config['jobs_config'])
self.app = pe.app.Application()
PacketServerConnection.receive_subscribers.append(lambda x: self.server_receiver(x))
PacketServerConnection.connection_subscribers.append(lambda x: self.server_connection_bouncer(x))
@@ -185,14 +198,54 @@ class Server:
if not self.started:
return
# Add things to do here:
# TODO Queue jobs if applicable.
if self.orchestrator.started:
with self.db.transaction() as storage:
# queue as many jobs as possible
while self.orchestrator.runners_available():
if len(storage.root.job_queue) > 0:
jid = storage.root.job_queue[0]
try:
logging.info(f"Starting job {jid}")
job = Job.get_job_by_id(jid, storage.root())
except:
logging.error(f"Error retrieving job {jid}")
break
runner = self.orchestrator.new_runner(job.owner, job.cmd, jid)
if runner is not None:
storage.root.job_queue.remove(jid)
job.status = JobStatus.RUNNING
job.started_at = datetime.datetime.now()
logging.info(f"Started job {job}")
else:
break
if self.orchestrator.started:
finished_runners = []
for runner in self.orchestrator.runners:
if runner.is_finished():
logging.debug(f"Finishing runner {runner}")
with self.db.transaction() as storage:
try:
if Job.update_job_from_runner(runner, storage.root()):
finished_runners.append(runner)
logging.info(f"Runner {runner} successfully synced with jobs.")
else:
logging.error(f"update_job_from_runner returned False.")
logging.error(f"Error while finishing runner and updating job status {runner}")
except:
logging.error(f"Error while finishing runner and updating job status {runner}\n:{format_exc()}")
for runner in finished_runners:
logging.info(f"Removing completed runner {runner}")
with self.orchestrator.runner_lock:
self.orchestrator.runners.remove(runner)
def run_worker(self):
    """Intended to be running as a thread.

    Repeatedly runs one server_worker() pass while the server is started.
    """
    logging.info("Starting worker thread.")
    while self.started:
        self.server_worker()
        # Brief pause between polling passes so the loop doesn't spin.
        # (Diff residue left both sleep(1) and sleep(.5); the .5s value
        # is the newer one and is kept.)
        time.sleep(.5)
def __del__(self):
    """Best-effort cleanup: stop the server when the object is collected.

    NOTE(review): __del__ runs at unpredictable times (or not at all at
    interpreter shutdown); an explicit stop() remains the supported path.
    """
    self.stop()
@@ -221,6 +274,7 @@ class Server:
self.app.register_callsigns(self.callsign)
self.started = True
if self.orchestrator is not None:
logging.info(f"Starting orchestrator {self.orchestrator}")
self.orchestrator.start()
self.worker_thread = Thread(target=self.run_worker)
self.worker_thread.start()
@@ -250,12 +304,23 @@ class TestServer(Server):
def __init__(self, server_callsign: str, data_dir: str = None, zeo: bool = True):
    """Test server preconfigured for a local PE server at localhost:8000.

    Creates a temporary directory for simulated file traffic; stop()
    removes it.
    """
    super().__init__('localhost', 8000, server_callsign, data_dir=data_dir, zeo=zeo)
    self._data_pid = 1  # counter handed out by data_pid()
    self._file_traffic_dir = tempfile.mkdtemp()  # scratch dir, removed in stop()
    self._file_traffic_thread = None
def start(self):
    """Start the test server without connecting to a PE/AGWPE server.

    Starts the orchestrator (if configured), then the database, then the
    worker thread — the packet-engine app is skipped entirely.
    """
    if self.orchestrator is not None:
        self.orchestrator.start()
    self.start_db()
    self.started = True  # must be set before run_worker's loop starts
    self.worker_thread = Thread(target=self.run_worker)
    self.worker_thread.start()
def stop(self):
    """Stop the worker loop, orchestrator, and database; remove temp dir."""
    self.started = False  # signals the run_worker loop to exit
    if self.orchestrator is not None:
        self.orchestrator.stop()
    self.stop_db()
    # Clean up the file-traffic scratch directory created in __init__.
    rmtree(self._file_traffic_dir)
def data_pid(self) -> int:
old = self._data_pid

View File

@@ -1,10 +1,14 @@
import re
import ax25
import persistent
import persistent.list
from persistent.mapping import PersistentMapping
import datetime
from typing import Self,Union,Optional,Tuple
from traceback import format_exc
from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response
from packetserver.common.constants import no_values
import ZODB
from persistent.list import PersistentList
import logging
@@ -12,7 +16,8 @@ from packetserver.server.users import user_authorized
import gzip
import tarfile
import json
from packetserver.runner.podman import TarFileExtractor
from packetserver.runner.podman import TarFileExtractor, PodmanOrchestrator, PodmanRunner, PodmanOptions
from packetserver.runner import Orchestrator, Runner, RunnerStatus
from enum import Enum
from io import BytesIO
import base64
@@ -27,6 +32,16 @@ class JobStatus(Enum):
FAILED = 7
TIMED_OUT = 8
def get_orchestrator_from_config(cfg: dict) -> "Union[Orchestrator, PodmanOrchestrator]":
    """Build a job orchestrator from the server's jobs_config mapping.

    Args:
        cfg: jobs_config mapping; must contain a 'runner' key naming the
            runner backend (currently only 'podman' is supported).

    Returns:
        A new PodmanOrchestrator for runner == 'podman'.

    Raises:
        RuntimeError: if 'runner' is missing or names an unsupported backend.
    """
    if 'runner' not in cfg:
        raise RuntimeError("Runners not configured in root.config.jobs_config")
    # Normalize the same way Server does (str() guards non-string config values).
    val = str(cfg['runner']).lower().strip()
    if val == "podman":
        return PodmanOrchestrator()
    raise RuntimeError("Other orchestrators not implemented yet.")
def get_new_job_id(root: PersistentMapping) -> int:
if 'job_counter' not in root:
root['job_counter'] = 1
@@ -37,6 +52,25 @@ def get_new_job_id(root: PersistentMapping) -> int:
return current
class Job(persistent.Persistent):
@classmethod
def update_job_from_runner(cls, runner: Runner, db_root: PersistentMapping) -> bool:
    """Copy a finished runner's results onto its matching Job record.

    Args:
        runner: a Runner whose job_id identifies the Job to update.
        db_root: the ZODB root mapping holding the jobs table.

    Returns:
        True if the job was found and updated; False if no job matches
        runner.job_id or the runner has not finished yet.
    """
    job = cls.get_job_by_id(runner.job_id, db_root)
    if job is None:
        logging.warning(f"Couldn't match runner {runner} with a job by id.")
        return False
    if not runner.is_finished():
        return False
    # NOTE(review): finished_at is naive local time while created_at uses
    # datetime.UTC elsewhere in this class — confirm before comparing them.
    job.finished_at = datetime.datetime.now()
    job.output = runner.output
    job.errors = runner.errors
    job.return_code = runner.return_code
    job._artifact_archive = runner._artifact_archive
    if runner.status == RunnerStatus.SUCCESSFUL:
        job.status = JobStatus.SUCCESSFUL
    else:
        job.status = JobStatus.FAILED
    return True
@classmethod
def get_job_by_id(cls, jid: int, db_root: PersistentMapping) -> Optional[Self]:
if jid in db_root['jobs']:
@@ -71,7 +105,7 @@ class Job(persistent.Persistent):
def __init__(self, cmd: Union[list[str], str], owner: Optional[str] = None, timeout: int = 300):
self.owner = None
if self.owner is not None:
if owner is not None:
self.owner = str(owner).upper().strip()
self.cmd = cmd
self.created_at = datetime.datetime.now(datetime.UTC)
@@ -121,8 +155,10 @@ class Job(persistent.Persistent):
return artifacts[index][0], artifacts[index][1].read()
def queue(self, db_root: PersistentMapping) -> int:
logging.debug(f"Attempting to queue job {self}")
if self.owner is None or (str(self.owner).strip() == ""):
raise ValueError("Job must have an owner to be queued.")
if self.id is None:
self.id = get_new_job_id(db_root)
owner = self.owner.upper().strip()
@@ -149,7 +185,7 @@ class Job(persistent.Persistent):
"errors": b'',
"return_code": self.return_code,
"artifacts": [],
"status": self.status,
"status": self.status.name,
"id": self.id
}
if include_data:
@@ -169,3 +205,92 @@ class Job(persistent.Persistent):
def json(self, include_data: bool = True) -> str:
return json.dumps(self.to_dict(include_data=include_data, binary_safe=True))
def handle_job_get_id(req: Request, conn: PacketServerConnection, db: ZODB.DB, jid: int):
    """Respond to GET job/<id>: 200 with the job dict, or 404/401/500.

    Only the job's owner may fetch it. The request var 'data' (matched
    case-insensitively) can be set to a no-value to omit bulky output and
    artifact data from the response.
    """
    username = ax25.Address(conn.remote_callsign).call.upper().strip()
    include_data = True
    for key in req.vars:
        if key.lower().strip() == "data":
            if req.vars[key].lower().strip() in no_values:
                include_data = False
    with db.transaction() as storage:
        try:
            job = Job.get_job_by_id(jid, storage.root())
            if job is None:
                send_blank_response(conn, req, 404)
                return
            if job.owner != username:
                # Job exists but belongs to someone else.
                send_blank_response(conn, req, 401)
                return
            send_blank_response(conn, req, 200, job.to_dict(include_data=include_data))
        except Exception:
            # Narrowed from a bare except: keep serving on unexpected errors
            # without leaking a traceback to the client.
            logging.error(f"Error looking up job {jid}:\n{format_exc()}")
            send_blank_response(conn, req, 500, payload="unknown server error")
def handle_job_get_user(req: Request, conn: PacketServerConnection, db: ZODB.DB):
    """Placeholder for GET job/user (listing a user's jobs); always 404 for now."""
    username = ax25.Address(conn.remote_callsign).call.upper().strip()
    # TODO finish user job lookup
    send_blank_response(conn, req, 404)
def handle_job_get(req: Request, conn: PacketServerConnection, db: ZODB.DB):
    """Route GET requests under job/ to the id lookup or user listing handler."""
    segments = [part for part in req.path.split("/") if part.strip() != ""]
    if len(segments) != 2:
        send_blank_response(conn, req, status_code=404)
        return
    selector = segments[1]
    if selector.isdigit():
        handle_job_get_id(req, conn, db, int(selector))
    elif selector.lower() == "user":
        handle_job_get_user(req, conn, db)
    else:
        send_blank_response(conn, req, status_code=404)
def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB):
    """Create and queue a new job from a POST payload containing 'cmd'.

    Responds 201 with {'job_id': ...} on success, 401 on a malformed
    payload, or 500 if queuing fails.
    """
    username = ax25.Address(conn.remote_callsign).call.upper().strip()
    # NOTE(review): 401 is used for malformed payloads here; 400 would be
    # conventional, but existing clients may depend on the current code.
    if 'cmd' not in req.payload:
        logging.info(f"request {req} did not contain job command (cmd) key")
        send_blank_response(conn, req, 401, "job post must contain cmd key containing str or list[str]")
        return
    if not isinstance(req.payload['cmd'], (str, list)):
        send_blank_response(conn, req, 401, "job post must contain cmd key containing str or list[str]")
        return
    job = Job(req.payload['cmd'], owner=username)
    with db.transaction() as storage:
        try:
            new_jid = job.queue(storage.root())
            logging.info(f"New job created with id {new_jid}")
        except Exception:
            # Narrowed from a bare except; the failure is logged with a traceback.
            logging.error(f"Failed to queue new job {job}:\n{format_exc()}")
            send_blank_response(conn, req, 500, "unknown server error while queuing job")
            return
    send_blank_response(conn, req, 201, {'job_id': new_jid})
def handle_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB):
    """Route POST requests under job/: only the bare 'job' path creates a job."""
    segments = [piece for piece in req.path.split("/") if piece.strip() != ""]
    if len(segments) != 1:
        send_blank_response(conn, req, status_code=404)
        return
    handle_new_job_post(req, conn, db)
def job_root_handler(req: Request, conn: PacketServerConnection, db: ZODB.DB):
    """Entry point for all job/ requests: auth check, feature gate, dispatch.

    Responds 401 if the caller is not authorized, 400 if jobs are disabled
    on this server, and 404 for unsupported methods.
    """
    logging.debug(f"{req} being processed by job_root_handler")
    if not user_authorized(conn, db):
        logging.debug(f"user {conn.remote_callsign} not authorized")
        send_blank_response(conn, req, status_code=401)
        return
    logging.debug("user is authorized")
    with db.transaction() as storage:
        # Single .get() instead of membership test + lookup; PersistentMapping
        # is dict-like. Missing key means jobs are disabled.
        jobs_enabled = storage.root.config.get('jobs_enabled', False)
    if not jobs_enabled:
        send_blank_response(conn, req, 400, payload="jobs not enabled on this server")
        return
    if req.method is Request.Method.GET:
        handle_job_get(req, conn, db)
    elif req.method is Request.Method.POST:
        handle_job_post(req, conn, db)
    else:
        send_blank_response(conn, req, status_code=404)

View File

@@ -21,7 +21,7 @@ from traceback import format_exc
from collections import namedtuple
import re
# Matches paths like "message/since/<unix-timestamp>" and captures the digits.
# A raw string avoids the invalid-escape DeprecationWarning the original
# non-raw literal triggered; in a regex, '\/' and '/' match identically.
since_regex = r"^message/since/(\d+)$"
def mailbox_create(username: str, db_root: PersistentMapping):
un = username.upper().strip()
@@ -401,7 +401,7 @@ def handle_message_post(req: Request, conn: PacketServerConnection, db: ZODB.DB)
send_blank_response(conn, req, status_code=201, payload={"successes": send_counter, "failed": failed})
def message_root_handler(req: Request, conn: PacketServerConnection, db: ZODB.DB):
    """Entry point for all message/ requests: auth check, then method dispatch.

    Responds 401 for unauthorized callers and 404 for unsupported methods.
    (Diff residue duplicating the log line and the GET/POST branches has
    been collapsed to the newer version: correct handler name in the log,
    and elif so the 404 branch can't fire after a successful GET.)
    """
    logging.debug(f"{req} being processed by message_root_handler")
    if not user_authorized(conn, db):
        logging.debug(f"user {conn.remote_callsign} not authorized")
        send_blank_response(conn, req, status_code=401)
        return
    logging.debug("user is authorized")
    if req.method is Request.Method.GET:
        handle_message_get(req, conn, db)
    elif req.method is Request.Method.POST:
        handle_message_post(req, conn, db)
    else:
        send_blank_response(conn, req, status_code=404)

View File

@@ -6,6 +6,7 @@ from .bulletin import bulletin_root_handler
from .users import user_root_handler, user_authorized
from .objects import object_root_handler
from .messages import message_root_handler
from .jobs import job_root_handler
import logging
from typing import Union
import ZODB
@@ -57,7 +58,8 @@ standard_handlers = {
"bulletin": bulletin_root_handler,
"user": user_root_handler,
"object": object_root_handler,
"message": message_root_handler
"message": message_root_handler,
"job": job_root_handler
}