Adding job type to server db.

This commit is contained in:
Michael Woods
2025-02-14 22:53:43 -05:00
parent f7d9f9a2e6
commit 3c9b7d0c55
8 changed files with 219 additions and 9 deletions

View File

@@ -1 +1 @@
VERSION="0.1-alpha"
VERSION="0.2-alpha"

View File

@@ -83,7 +83,8 @@ class Runner:
self.args = args
self.job_id = int(job_id)
self.env = {}
self.started = datetime.datetime.now()
self.started_at = datetime.datetime.now()
self.finished_at = None
self._result = (0,(b'', b''))
if environment:
for key in environment:
@@ -119,8 +120,15 @@ class Runner:
def output(self) -> bytes:
raise RuntimeError("Attempting to interact with an abstract class.")
def output_str(self) -> str:
raise RuntimeError("Attempting to interact with an abstract class.")
@property
def errors(self) -> str:
def errors(self) -> bytes:
raise RuntimeError("Attempting to interact with an abstract class.")
@property
def errors_str(self) -> str:
raise RuntimeError("Attempting to interact with an abstract class.")
@property
@@ -131,6 +139,10 @@ class Runner:
def artifacts(self) -> TarFileExtractor:
raise RuntimeError("Attempting to interact with an abstract class.")
@property
def has_artifacts(self) -> bool:
raise RuntimeError("Abstract method called.")
class Orchestrator:
"""Abstract class holds configuration and also tracks runners through their lifecycle. Prepares environments to
run jobs in runners."""

View File

@@ -40,7 +40,7 @@ rm -rfv "${PACKETSERVER_JOB_DIR}"
podman_bash_start = """ echo 'waiting for /root/scripts/container_run_script.sh to exist'
while ! [ -f '/root/scripts/container_run_script.sh' ]; do
tail /dev/null
sleep .1
done
echo 'entering /root/scripts/container_run_script.sh ...'
bash /root/scripts/container_run_script.sh

View File

@@ -81,6 +81,7 @@ class PodmanRunner(Runner):
except:
logging.warning(f"Error retrieving artifacts for {self.job_id}:\n{format_exc()}")
self._artifact_archive = b''
self.finished_at = datetime.datetime.now()
# set final status to FAILED or SUCCEEDED
if self.return_code == 0:
self.status = RunnerStatus.SUCCESSFUL
@@ -96,6 +97,9 @@ class PodmanRunner(Runner):
@property
def artifacts(self) -> TarFileExtractor:
if self._artifact_archive == b'':
return TarFileExtractor(BytesIO(b''))
else:
return TarFileExtractor(gzip.GzipFile(fileobj=BytesIO(self._artifact_archive)))
@property
@@ -103,7 +107,7 @@ class PodmanRunner(Runner):
return self._result[1][0]
@property
def str_output(self) -> str:
def output_str(self) -> str:
try:
output = self.output.decode()
except:
@@ -112,6 +116,10 @@ class PodmanRunner(Runner):
@property
def errors(self) -> str:
return self._result[1][1]
@property
def errors_str(self) -> str:
return self._result[1][1].decode()
@property
@@ -424,6 +432,10 @@ class PodmanOrchestrator(Orchestrator):
self.manager_thread = Thread(target=self.manager)
self.manager_thread.start()
def __del__(self):
if self.started:
self.stop()
def stop(self):
logging.debug("Stopping podman orchestrator.")
self.started = False

View File

@@ -26,7 +26,6 @@ def init_bulletins(root: PersistentMapping):
if 'bulletin_counter' not in root:
root['bulletin_counter'] = 0
class Server:
def __init__(self, pe_server: str, port: int, server_callsign: str, data_dir: str = None, zeo: bool = True):
if not ax25.Address.valid_call(server_callsign):
@@ -79,6 +78,13 @@ class Server:
if 'objects' not in conn.root():
logging.debug("objects bucket missing, creating")
conn.root.objects = OOBTree()
if 'jobs' not in conn.root():
logging.debug("jobss bucket missing, creating")
conn.root.jobs = OOBTree()
if 'job_queue' not in conn.root():
conn.root.job_queue = PersistentList()
if 'user_jobs' not in conn.root():
conn.root.user_jobs = PersistentMapping()
init_bulletins(conn.root())
self.app = pe.app.Application()
PacketServerConnection.receive_subscribers.append(lambda x: self.server_receiver(x))
@@ -179,6 +185,7 @@ class Server:
if not self.started:
return
# Add things to do here:
# TODO Queue jobs if applicable.
def run_worker(self):
"""Intended to be running as a thread."""

View File

@@ -9,7 +9,8 @@ def get_user_db(username: str, db: ZODB.DB) -> dict:
"objects": {},
"messages": [],
"user": {},
"bulletins": []
"bulletins": [],
"jobs": []
}
username = username.strip().upper()
with db.transaction() as db_conn:
@@ -31,6 +32,8 @@ def get_user_db(username: str, db: ZODB.DB) -> dict:
for b in db_conn.root.bulletins:
udb['bulletins'].append(b.to_dict())
# TODO pack jobs into output
return udb
def get_user_db_json(username: str, db: ZODB.DB, gzip_output=True) -> bytes:

View File

@@ -0,0 +1,171 @@
import ax25
import persistent
import persistent.list
from persistent.mapping import PersistentMapping
import datetime
from typing import Self,Union,Optional,Tuple
from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response
import ZODB
from persistent.list import PersistentList
import logging
from packetserver.server.users import user_authorized
import gzip
import tarfile
import json
from packetserver.runner.podman import TarFileExtractor
from enum import Enum
from io import BytesIO
import base64
class JobStatus(Enum):
CREATED = 1
QUEUED = 2
STARTING = 3
RUNNING = 4
STOPPING = 5
SUCCESSFUL = 6
FAILED = 7
TIMED_OUT = 8
def get_new_job_id(root: PersistentMapping) -> int:
if 'job_counter' not in root:
root['job_counter'] = 1
return 0
else:
current = root['job_counter']
root['job_counter'] = current + 1
return current
class Job(persistent.Persistent):
@classmethod
def get_job_by_id(cls, jid: int, db_root: PersistentMapping) -> Optional[Self]:
if jid in db_root['jobs']:
return db_root['jobs'][jid]
return None
@classmethod
def get_jobs_by_username(cls, username:str, db_root: PersistentMapping) -> list[Self]:
un = username.strip().upper()
if un in db_root['user_jobs']:
l = []
for j in db_root['user_jobs'][un]:
l.append(Job.get_job_by_id(j, db_root))
return l
else:
return []
@classmethod
def num_jobs_queued(cls, db_root: PersistentMapping) -> int:
return len(db_root['job_queue'])
@classmethod
def jobs_in_queue(cls, db_root: PersistentMapping) -> bool:
if Job.num_jobs_queued(db_root) > 0:
return True
else:
return False
@classmethod
def get_next_queued_job(cls, db_root: PersistentMapping) -> Self:
return db_root['job_queue'][0]
def __init__(self, cmd: Union[list[str], str], owner: Optional[str] = None, timeout: int = 300):
self.owner = None
if self.owner is not None:
self.owner = str(owner).upper().strip()
self.cmd = cmd
self.created_at = datetime.datetime.now(datetime.UTC)
self.started_at = None
self.finished_at = None
self._artifact_archive = b''
self.output = b''
self.errors = b''
self.return_code = 0
self.id = None
self.status = JobStatus.CREATED
@property
def is_finished(self) -> bool:
if self.finished_at is None:
return False
else:
return True
@property
def output_str(self) -> str:
return self.output.decode()
@property
def errors_str(self) -> str:
return self.errors.decode()
@property
def artifacts(self) -> TarFileExtractor:
if self._artifact_archive == b'':
return TarFileExtractor(BytesIO(b''))
else:
return TarFileExtractor(gzip.GzipFile(fileobj=BytesIO(self._artifact_archive)))
@property
def num_artifacts(self) -> int:
return len(list(self.artifacts))
def __repr__(self) -> str:
return f"<Job[{self.id}] - {self.owner} - {self.status.name}>"
def artifact(self, index: int) -> Tuple[str, bytes]:
artifacts = list(self.artifacts)
if (index + 1) > len(artifacts):
raise IndexError(f"Index {index} out of bounds.")
else:
return artifacts[index][0], artifacts[index][1].read()
def queue(self, db_root: PersistentMapping) -> int:
if self.owner is None or (str(self.owner).strip() == ""):
raise ValueError("Job must have an owner to be queued.")
if self.id is None:
self.id = get_new_job_id(db_root)
owner = self.owner.upper().strip()
if owner not in db_root['user_jobs']:
db_root['user_jobs'][owner] = PersistentList()
db_root['jobs'][self.id] = self
db_root['job_queue'].append(self.id)
return self.id
def to_dict(self, include_data: bool = True, binary_safe: bool = False):
started_at = None
finished_at = None
if self.started_at is not None:
started_at = self.started_at.isoformat()
if self.finished_at is not None:
finished_at = self.finished_at.isoformat()
output = {
"cmd": self.cmd,
"owner": self.owner,
"created_at": self.created_at.isoformat(),
"started_at": started_at,
"finished_at": finished_at,
"output": b'',
"errors": b'',
"return_code": self.return_code,
"artifacts": [],
"status": self.status,
"id": self.id
}
if include_data:
if binary_safe:
output['output'] = base64.b64encode(self.output).decode()
output['errors'] = base64.b64encode(self.errors).decode()
else:
output['output'] = self.output
output['errors'] = self.errors
for a in self.artifacts:
if binary_safe:
output['artifacts'].append((a[0], base64.b64encode(a[1].read()).decode()))
else:
output['artifacts'].append((a[0], a[1].read()))
return output
def json(self, include_data: bool = True) -> str:
return json.dumps(self.to_dict(include_data=include_data, binary_safe=True))

View File

@@ -17,11 +17,14 @@ def handle_root_get(req: Request, conn: PacketServerConnection,
response.compression = Message.CompressionType.BZIP2
operator = ""
motd = ""
jobs_enabled = False
with db.transaction() as storage:
if 'motd' in storage.root.config:
motd = storage.root.config['motd']
if 'operator' in storage.root.config:
operator = storage.root.config['operator']
if 'jobs_enabled' in storage.root.config:
jobs_enabled = storage.root.config['jobs_enabled']
logging.debug(f"Root handler retrieved config. {operator} - {motd}")
logging.debug("Running user_authorized")
if user_authorized(conn, db):
@@ -32,8 +35,10 @@ def handle_root_get(req: Request, conn: PacketServerConnection,
response.payload = {
'operator': operator,
'motd': motd,
'user': user_message
'user': user_message,
'accepts_jobs': jobs_enabled
}
logging.debug(f"Sending response {response}")
send_response(conn, response, req)
logging.debug("Sent reesponse.")