Updated server worker and podman orchestrator to make constant checking less necessary (to reduce disk and log usage).
This commit is contained in:
@@ -165,6 +165,8 @@ class PodmanOrchestrator(Orchestrator):
|
||||
self.started = False
|
||||
self.user_containers = {}
|
||||
self.manager_thread = None
|
||||
self._client = None
|
||||
self._five_min_ticker = 600
|
||||
|
||||
if uri:
|
||||
self.uri = uri
|
||||
@@ -174,8 +176,9 @@ class PodmanOrchestrator(Orchestrator):
|
||||
if uri_parsed.scheme == "unix":
|
||||
if not os.path.exists(uri_parsed.path):
|
||||
raise FileNotFoundError(f"Podman socket not found: {self.uri}")
|
||||
|
||||
logging.debug(f"Testing podman socket. Version: {self.client.version()}")
|
||||
test_client = self.new_client()
|
||||
logging.debug(f"Testing podman socket. Version: {test_client.info()}")
|
||||
self._client = None
|
||||
|
||||
self.username_containers = {}
|
||||
if options:
|
||||
@@ -185,8 +188,13 @@ class PodmanOrchestrator(Orchestrator):
|
||||
container_keepalive=300, name_prefix="packetserver_")
|
||||
|
||||
@property
|
||||
def client(self):
|
||||
return podman.PodmanClient(base_url=self.uri)
|
||||
def client(self) -> Optional[podman.PodmanClient]:
|
||||
return self._client
|
||||
|
||||
def new_client(self) -> podman.PodmanClient:
|
||||
cli = podman.PodmanClient(base_url=self.uri)
|
||||
self._client = cli
|
||||
return cli
|
||||
|
||||
def add_file_to_user_container(self, username: str, data: bytes, path: str, root_owned=False):
|
||||
cli = self.client
|
||||
@@ -416,7 +424,10 @@ class PodmanOrchestrator(Orchestrator):
|
||||
if not r.is_finished():
|
||||
self.touch_user_container(r.username)
|
||||
self.clean_containers()
|
||||
|
||||
if self._five_min_ticker >= 600:
|
||||
self.clean_orphaned_containers()
|
||||
self._five_min_ticker = 0
|
||||
|
||||
def manager(self):
|
||||
logging.debug("Starting podman orchestrator thread.")
|
||||
@@ -427,7 +438,9 @@ class PodmanOrchestrator(Orchestrator):
|
||||
|
||||
def start(self):
|
||||
if not self.started:
|
||||
self.new_client()
|
||||
self.clean_orphaned_containers()
|
||||
self._five_min_ticker = 0
|
||||
self.started = True
|
||||
self.manager_thread = Thread(target=self.manager)
|
||||
self.manager_thread.start()
|
||||
@@ -438,6 +451,7 @@ class PodmanOrchestrator(Orchestrator):
|
||||
|
||||
def stop(self):
|
||||
logging.debug("Stopping podman orchestrator.")
|
||||
self._client = None
|
||||
self.started = False
|
||||
cli = self.client
|
||||
self.user_containers = {}
|
||||
|
||||
@@ -47,6 +47,8 @@ class Server:
|
||||
self.started = False
|
||||
self.orchestrator = None
|
||||
self.worker_thread = None
|
||||
self.check_job_queue = True
|
||||
self.last_check_job_queue = datetime.datetime.now()
|
||||
if data_dir:
|
||||
data_path = Path(data_dir)
|
||||
else:
|
||||
@@ -114,6 +116,10 @@ class Server:
|
||||
def data_file(self) -> str:
|
||||
return str(Path(self.home_dir).joinpath('data.zopedb'))
|
||||
|
||||
def ping_job_queue(self):
|
||||
self.check_job_queue = True
|
||||
self.last_check_job_queue = datetime.datetime.now()
|
||||
|
||||
def server_connection_bouncer(self, conn: PacketServerConnection):
|
||||
logging.debug("new connection bouncer checking user status")
|
||||
# blacklist check
|
||||
@@ -184,6 +190,7 @@ class Server:
|
||||
connection.send_data(b"BAD REQUEST. DID NOT RECEIVE A REQUEST MESSAGE.")
|
||||
logging.debug(f"attempting to handle request {request}")
|
||||
self.handle_request(request, connection)
|
||||
self.ping_job_queue()
|
||||
logging.debug("request handled")
|
||||
|
||||
def server_receiver(self, conn: PacketServerConnection):
|
||||
@@ -201,8 +208,10 @@ class Server:
|
||||
if not self.started:
|
||||
return
|
||||
# Add things to do here:
|
||||
|
||||
if (self.orchestrator is not None) and self.orchestrator.started:
|
||||
now = datetime.datetime.now()
|
||||
if (now - self.last_check_job_queue).total_seconds() > 60:
|
||||
self.ping_job_queue()
|
||||
if (self.orchestrator is not None) and self.orchestrator.started and self.check_job_queue:
|
||||
with self.db.transaction() as storage:
|
||||
# queue as many jobs as possible
|
||||
while self.orchestrator.runners_available():
|
||||
@@ -222,6 +231,10 @@ class Server:
|
||||
logging.info(f"Started job {job}")
|
||||
else:
|
||||
break
|
||||
if len(storage.root.job_queue) == 0:
|
||||
self.check_job_queue = False
|
||||
else:
|
||||
self.ping_job_queue()
|
||||
|
||||
finished_runners = []
|
||||
for runner in self.orchestrator.runners:
|
||||
|
||||
Reference in New Issue
Block a user