Source code for ocrd_network.runtime_data.network_agents

from logging import Logger
from time import sleep
from typing import Any

from re import search as re_search
from ..constants import DeployType


# TODO: Find appropriate replacement for the hack
def deploy_agent_native_get_pid_hack(logger: Logger, ssh_client, start_cmd: str):
    """Run ``start_cmd`` in an interactive shell and return the PID of the started process.

    The command is written to the shell obtained from ``ssh_client.invoke_shell()``,
    followed by ``echo xyz$!xyz`` and ``exit``. The PID is then extracted from the
    complete shell output by searching for the ``xyz<digits>xyz`` marker.

    Args:
        logger: Logger used to report the command that is executed.
        ssh_client: Object providing ``invoke_shell()``; the returned channel must
            provide ``makefile()`` (presumably a paramiko ``SSHClient`` - confirm
            against the caller).
        start_cmd: Shell command to execute. For ``$!`` to refer to it, the command
            has to start a background process (e.g. end with ``&``).

    Returns:
        The PID as a string of digits, exactly as captured from the shell output.

    Raises:
        RuntimeError: If the shell output contains no ``xyz<digits>xyz`` marker,
            e.g. because no background process was started.
    """
    channel = ssh_client.invoke_shell()
    stdin, stdout = channel.makefile("wb"), channel.makefile("rb")
    try:
        # TODO: set back to debug
        logger.info(f"Executing command: {start_cmd}")

        # TODO: This hack should still be fixed
        # Note left from @joschrew
        # the only way (I could find) to make it work to start a process in the background and
        # return early is this construction. The pid of the last started background process is
        # printed with `echo $!` but it is printed between other output. Because of that I added
        # `xyz` before and after the code to easily be able to filter out the pid via regex when
        # returning from the function
        stdin.write(f"{start_cmd}\n")
        stdin.write("echo xyz$!xyz \n exit \n")
        output = stdout.read().decode("utf-8")
    finally:
        # Release both stream wrappers even when writing or reading fails.
        stdout.close()
        stdin.close()

    pid_match = re_search(r"xyz([0-9]+)xyz", output)
    if pid_match is None:
        # Without this check a missing marker surfaces as an AttributeError on None.
        raise RuntimeError(f"Failed to extract the pid of the process started with command: {start_cmd}")
    return pid_match.group(1)
# TODO: Implement the actual method that is missing
def deploy_agent_docker_template(logger: Logger, docker_client, start_cmd: str):
    """Placeholder for deploying a network agent as a docker container.

    Deploying docker agents is not implemented yet, so this function always raises.
    The sketch below was left by the original author as a template for the
    eventual implementation::

        logger.debug(f"Executing command: {start_cmd}")
        res = docker_client.containers.run("debian", "sleep 500s", detach=True, remove=True)
        assert res and res.id, f"Starting docker network agent has failed with command: {start_cmd}"
        return res.id

    Args:
        logger: Logger intended for reporting the executed command (currently unused).
        docker_client: Docker client intended to run the container (currently unused).
        start_cmd: Command intended to start the agent (currently unused).

    Raises:
        NotImplementedError: Always. It is a subclass of ``Exception``, so handlers
            written for the previously raised generic ``Exception`` keep working.
    """
    raise NotImplementedError("Deploying docker type agents is not supported yet!")
class DataNetworkAgent:
    """Runtime record of one network agent together with helpers to start it.

    Stores the processor name, the deploy type, the host and the id of the
    started agent. The ``_start_*_instance`` helpers delegate to the
    module-level deploy functions after verifying the agent's deploy type.
    """

    def __init__(
        self, processor_name: str, deploy_type: DeployType, host: str, init_by_config: bool, pid: Any = None
    ) -> None:
        self.processor_name = processor_name
        self.host = host
        self.deploy_type = deploy_type
        self.deployed_by_config = init_by_config
        # Pause between two consecutive agent deployments
        self.wait_between_agent_deploys: float = 0.3
        # Stays at the passed value (None by default) until the agent gets deployed
        self.pid = pid

    def __str__(self):
        return f"{self.pid} {self.deploy_type} {self.processor_name} on host: {self.host}"

    def _check_deploy_type(self, expected) -> None:
        """Raise a RuntimeError unless this agent's deploy type equals ``expected``."""
        if self.deploy_type != expected:
            raise RuntimeError(f"Mismatch of deploy type when starting network agent: {self.processor_name}")

    def _start_native_instance(self, logger: Logger, ssh_client, start_cmd: str):
        """Start the agent natively through ``ssh_client`` and return the resulting pid.

        Raises:
            RuntimeError: If the agent's deploy type is not ``DeployType.NATIVE``.
        """
        self._check_deploy_type(DeployType.NATIVE)
        return deploy_agent_native_get_pid_hack(logger=logger, ssh_client=ssh_client, start_cmd=start_cmd)

    def _start_docker_instance(self, logger: Logger, docker_client, start_cmd: str):
        """Start the agent through ``docker_client`` and return the resulting id.

        Raises:
            RuntimeError: If the agent's deploy type is not ``DeployType.DOCKER``.
        """
        self._check_deploy_type(DeployType.DOCKER)
        return deploy_agent_docker_template(logger=logger, docker_client=docker_client, start_cmd=start_cmd)
class DataProcessingWorker(DataNetworkAgent):
    """Network agent record for a processing worker, including its deployment."""

    def __init__(
        self, processor_name: str, deploy_type: DeployType, host: str, init_by_config: bool, pid: Any = None
    ) -> None:
        super().__init__(
            processor_name=processor_name, host=host, deploy_type=deploy_type, init_by_config=init_by_config, pid=pid
        )

    def deploy_network_agent(self, logger: Logger, connector_client, database_url: str, queue_url: str):
        """Start the processing worker according to its deploy type and store its id.

        After a successful start the method sleeps for
        ``self.wait_between_agent_deploys`` before returning.

        Args:
            logger: Logger handed to the start helper.
            connector_client: Client used to reach the host; it is passed to the
                native start helper as the SSH client or to the docker start helper
                as the docker client, depending on the deploy type.
            database_url: Value passed to the worker via ``--database``.
            queue_url: Value passed to the worker via ``--queue``.

        Returns:
            The id of the started worker, which is also stored in ``self.pid``.

        Raises:
            RuntimeError: If ``connector_client`` is missing, if no start command is
                available (currently always the case in docker mode), or if the
                deploy type is neither native nor docker.
        """
        if self.deploy_type == DeployType.NATIVE:
            # Explicit check instead of `assert`: asserts are removed under `python -O`.
            if not connector_client:
                raise RuntimeError("SSH client connection missing.")
            # The trailing `&` starts the worker in the background.
            # NOTE(review): the URLs are interpolated into a shell command unquoted -
            # confirm they always come from trusted configuration.
            start_cmd = f"{self.processor_name} --database {database_url} --queue {queue_url} &"
            self.pid = self._start_native_instance(logger, connector_client, start_cmd)
            sleep(self.wait_between_agent_deploys)
            return self.pid
        if self.deploy_type == DeployType.DOCKER:
            if not connector_client:
                raise RuntimeError("Docker client connection missing.")
            # TODO: add real command to start processing worker in docker here
            start_cmd = ""
            if not start_cmd:
                raise RuntimeError("Missing start command for the Processing Worker in docker mode")
            self.pid = self._start_docker_instance(logger, connector_client, start_cmd)
            sleep(self.wait_between_agent_deploys)
            return self.pid
        raise RuntimeError(f"Unknown deploy type of {self.__dict__}")