diff --git a/src/packetserver/client/jobs.py b/src/packetserver/client/jobs.py index baba70f..42c8645 100644 --- a/src/packetserver/client/jobs.py +++ b/src/packetserver/client/jobs.py @@ -93,6 +93,29 @@ def send_job(client: Client, bbs_callsign: str, cmd: Union[str, list], db: bool raise RuntimeError(f"Sending job failed: {response.status_code}: {response.payload}") return response.payload['job_id'] +def send_job_quick(client: Client, bbs_callsign: str, cmd: Union[str, list], db: bool = False, env: dict = None, + files: dict = None) -> JobWrapper: + """Send a job using client to bbs_callsign with args cmd. Wait for quick job to return job results.""" + req = Request.blank() + req.path = "job" + req.payload = {'cmd': cmd} + req.set_var('quick', True) + if db: + req.payload['db'] = '' + if env is not None: + req.payload['env']= env + if files is not None: + req.payload['files'] = files + req.method = Request.Method.POST + response = client.send_receive_callsign(req, bbs_callsign) + if response.status_code == 200: + return JobWrapper(response.payload) + elif response.status_code == 202: + raise RuntimeError(f"Quick Job timed out. Job ID: {response.payload}") + else: + raise RuntimeError(f"Waiting for quick job failed: {response.status_code}: {response.payload}") + + def get_job_id(client: Client, bbs_callsign: str, job_id: int, get_data=True) -> JobWrapper: req = Request.blank() req.path = f"job/{job_id}" @@ -116,18 +139,27 @@ class JobSession: def send(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None) -> int: return send_job(self.client, self.bbs, cmd, db=db, env=env, files=files) + def send_quick(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None) -> JobWrapper: + return send_job_quick(self.client, self.bbs, cmd, db=db, env=env, files=files) + def get_id(self, jid: int) -> JobWrapper: return get_job_id(self.client, self.bbs, jid) - def run_job(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None) -> JobWrapper: - jid = self.send(cmd, db=db, env=env, files=files) - time.sleep(self.stutter) - j = self.get_id(jid) - while not j.is_finished: + def run_job(self, cmd: Union[str, list], db: bool = False, env: dict = None, files: dict = None, + quick: bool = False) -> JobWrapper: + if quick: + j = send_job_quick(cmd, db=db, env=env, files=files) + self.job_log.append(j) + return j + else: + jid = self.send(cmd, db=db, env=env, files=files) time.sleep(self.stutter) j = self.get_id(jid) - self.job_log.append(j) - return j + while not j.is_finished: + time.sleep(self.stutter) + j = self.get_id(jid) + self.job_log.append(j) + return j diff --git a/src/packetserver/server/jobs.py b/src/packetserver/server/jobs.py index 0e3f4fb..c40ed1a 100644 --- a/src/packetserver/server/jobs.py +++ b/src/packetserver/server/jobs.py @@ -8,7 +8,7 @@ import datetime from typing import Self,Union,Optional,Tuple from traceback import format_exc from packetserver.common import PacketServerConnection, Request, Response, Message, send_response, send_blank_response -from packetserver.common.constants import no_values +from packetserver.common.constants import no_values, yes_values from packetserver.server.db import get_user_db_json import ZODB from persistent.list import PersistentList @@ -16,6 +16,7 @@ import logging from packetserver.server.users import user_authorized import gzip import tarfile +import time import json from packetserver.runner.podman import TarFileExtractor, PodmanOrchestrator, PodmanRunner, PodmanOptions from packetserver.runner import Orchestrator, Runner, RunnerStatus, RunnerFile @@ -261,6 +262,13 @@ def handle_job_get(req: Request, conn: PacketServerConnection, db: ZODB.DB): def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB): username = ax25.Address(conn.remote_callsign).call.upper().strip() + quick = False + if 'quick' in req.vars: + quick_val = req.vars['quick'] + if type(quick_val) is str: + quick_val = quick_val.lower() + if quick_val in yes_values: + quick = True if 'cmd' not in req.payload: logging.info(f"request {req} did not contain job command (cmd) key") send_blank_response(conn, req, 401, "job post must contain cmd key containing str or list[str]") @@ -293,7 +301,31 @@ def handle_new_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB) logging.error(f"Failed to queue new job {job}:\n{format_exc()}") send_blank_response(conn, req, 500, "unknown server error while queuing job") return - send_blank_response(conn, req, 201, {'job_id': new_jid}) + if quick: + start_time = datetime.datetime.now() + now = datetime.datetime.now() + job_done = False + quick_job = None + logging.debug(f"{start_time}: Waiting for a quick job for 30 seconds") + while (now - start_time).total_seconds() < 30: + with db.transaction() as storage: + try: + j = Job.get_job_by_id(new_jid, storage.root()) + if j.is_finished: + job_done = True + quick_job = j + break + except: + pass + time.sleep(1) + now = datetime.datetime.now() + if job_done and (type(quick_job) is Job): + send_blank_response(conn, req, 200, job.to_dict(include_data=True)) + else: + logging.warning(f"Quick job {new_jid} timed out.") + send_blank_response(conn, req, status_code=202, payload={'job_id': new_jid, 'msg': 'queued'}) + else: + send_blank_response(conn, req, 201, {'job_id': new_jid}) def handle_job_post(req: Request, conn: PacketServerConnection, db: ZODB.DB): spl = [x for x in req.path.split("/") if x.strip() != ""]