Adding quick job to client and server.

This commit is contained in:
Michael Woods
2025-02-16 11:31:54 -05:00
parent 071cd14477
commit 99bb3ac7ce
2 changed files with 73 additions and 9 deletions

View File

@@ -93,6 +93,29 @@ def send_job(client: Client, bbs_callsign: str, cmd: Union[str, list], db: bool
raise RuntimeError(f"Sending job failed: {response.status_code}: {response.payload}") raise RuntimeError(f"Sending job failed: {response.status_code}: {response.payload}")
return response.payload['job_id'] return response.payload['job_id']
def send_job_quick(client: Client, bbs_callsign: str, cmd: Union[str, list], db: bool = False, env: dict = None,
files: dict = None) -> JobWrapper:
"""Send a job using client to bbs_callsign with args cmd. Wait for quick job to return job results."""
req = Request.blank()
req.path = "job"
req.payload = {'cmd': cmd}
req.set_var('quick', True)
if db:
req.payload['db'] = ''
if env is not None:
req.payload['env']= env
if files is not None:
req.payload['files'] = files
req.method = Request.Method.POST
response = client.send_receive_callsign(req, bbs_callsign)
if response.status_code == 200:
return JobWrapper(response.payload)
elif response.status_code == 202:
raise RuntimeError(f"Quick Job timed out. Job ID: {response.payload}")
else:
raise RuntimeError(f"Waiting for quick job failed: {response.status_code}: {response.payload}")
def get_job_id(client: Client, bbs_callsign: str, job_id: int, get_data=True) -> JobWrapper: def get_job_id(client: Client, bbs_callsign: str, job_id: int, get_data=True) -> JobWrapper:
req = Request.blank() req = Request.blank()
req.path = f"job/{job_id}" req.path = f"job/{job_id}"
@@ -116,18 +139,27 @@ class JobSession:
def send(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None) -> int: def send(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None) -> int:
return send_job(self.client, self.bbs, cmd, db=db, env=env, files=files) return send_job(self.client, self.bbs, cmd, db=db, env=env, files=files)
def send_quick(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None) -> JobWrapper:
return send_job_quick(self.client, self.bbs, cmd, db=db, env=env, files=files)
def get_id(self, jid: int) -> JobWrapper: def get_id(self, jid: int) -> JobWrapper:
return get_job_id(self.client, self.bbs, jid) return get_job_id(self.client, self.bbs, jid)
def run_job(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None) -> JobWrapper: def run_job(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None,
jid = self.send(cmd, db=db, env=env, files=files) quick: bool = False) -> JobWrapper:
time.sleep(self.stutter) if quick:
j = self.get_id(jid) j = send_job_quick(cmd, db=db, env=env, files=files)
while not j.is_finished: self.job_log.append(j)
return j
else:
jid = self.send(cmd, db=db, env=env, files=files)
time.sleep(self.stutter) time.sleep(self.stutter)
j = self.get_id(jid) j = self.get_id(jid)
self.job_log.append(j) while not j.is_finished:
return j time.sleep(self.stutter)
j = self.get_id(jid)
self.job_log.append(j)
return j

View File

@@ -8,7 +8,7 @@ import datetime
from typing import Self,Union,Optional,Tuple from typing import Self,Union,Optional,Tuple
from traceback import format_exc from traceback import format_exc
from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response
from packetserver.common.constants import no_values from packetserver.common.constants import no_values, yes_values
from packetserver.server.db import get_user_db_json from packetserver.server.db import get_user_db_json
import ZODB import ZODB
from persistent.list import PersistentList from persistent.list import PersistentList
@@ -16,6 +16,7 @@ import logging
from packetserver.server.users import user_authorized from packetserver.server.users import user_authorized
import gzip import gzip
import tarfile import tarfile
import time
import json import json
from packetserver.runner.podman import TarFileExtractor, PodmanOrchestrator, PodmanRunner, PodmanOptions from packetserver.runner.podman import TarFileExtractor, PodmanOrchestrator, PodmanRunner, PodmanOptions
from packetserver.runner import Orchestrator, Runner, RunnerStatus, RunnerFile from packetserver.runner import Orchestrator, Runner, RunnerStatus, RunnerFile
@@ -261,6 +262,13 @@ def handle_job_get(req: Request, conn: PacketServerConnection, db: ZODB.DB):
def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB): def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB):
username = ax25.Address(conn.remote_callsign).call.upper().strip() username = ax25.Address(conn.remote_callsign).call.upper().strip()
quick = False
if 'quick' in req.vars:
quick_val = req.vars['quick']
if type(quick_val) is str:
quick_val = quick_val.lower()
if quick_val in yes_values:
quick = True
if 'cmd' not in req.payload: if 'cmd' not in req.payload:
logging.info(f"request {req} did not contain job command (cmd) key") logging.info(f"request {req} did not contain job command (cmd) key")
send_blank_response(conn, req, 401, "job post must contain cmd key containing str or list[str]") send_blank_response(conn, req, 401, "job post must contain cmd key containing str or list[str]")
@@ -293,7 +301,31 @@ def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB)
logging.error(f"Failed to queue new job {job}:\n{format_exc()}") logging.error(f"Failed to queue new job {job}:\n{format_exc()}")
send_blank_response(conn, req, 500, "unknown server error while queuing job") send_blank_response(conn, req, 500, "unknown server error while queuing job")
return return
send_blank_response(conn, req, 201, {'job_id': new_jid}) if quick:
start_time = datetime.datetime.now()
now = datetime.datetime.now()
job_done = False
quick_job = None
logging.debug(f"{start_time}: Waiting for a quick job for 30 seconds")
while (now - start_time).total_seconds() < 30:
with db.transaction() as storage:
try:
j = Job.get_job_by_id(new_jid, storage.root())
if j.is_finished:
job_done = True
quick_job = j
break
except:
pass
time.sleep(1)
now = datetime.datetime.now()
if job_done and (type(quick_job) is Job):
send_blank_response(conn, req, 200, job.to_dict(include_data=True))
else:
logging.warning(f"Quick job {new_jid} timed out.")
send_blank_response(conn, req, status_code=202, payload={'job_id': new_jid, 'msg': 'queued'})
else:
send_blank_response(conn, req, 201, {'job_id': new_jid})
def handle_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB): def handle_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB):
spl = [x for x in req.path.split("/") if x.strip() != ""] spl = [x for x in req.path.split("/") if x.strip() != ""]