Source code for ocrd_network.runtime_data.network_agents

from logging import Logger
from typing import Any

from re import search as re_search
from ..constants import AgentType, DeployType


# TODO: Find appropriate replacement for the hack
[docs] def deploy_agent_native_get_pid_hack(logger: Logger, ssh_client, start_cmd: str): channel = ssh_client.invoke_shell() stdin, stdout = channel.makefile("wb"), channel.makefile("rb") logger.debug(f"Executing command: {start_cmd}") # TODO: This hack should still be fixed # Note left from @joschrew # the only way (I could find) to make it work to start a process in the background and # return early is this construction. The pid of the last started background process is # printed with `echo $!` but it is printed between other output. Because of that I added # `xyz` before and after the code to easily be able to filter out the pid via regex when # returning from the function stdin.write(f"{start_cmd}\n") stdin.write("echo xyz$!xyz \n exit \n") output = stdout.read().decode("utf-8") stdout.close() stdin.close() return re_search(r"xyz([0-9]+)xyz", output).group(1) # type: ignore
# TODO: Implement the actual method that is missing
[docs] def deploy_agent_docker_template(logger: Logger, docker_client, start_cmd: str): """ logger.debug(f"Executing command: {start_cmd}") res = docker_client.containers.run("debian", "sleep 500s", detach=True, remove=True) assert res and res.id, f"Starting docker network agent has failed with command: {start_cmd}" return res.id """ raise Exception("Deploying docker type agents is not supported yet!")
[docs] class DataNetworkAgent: def __init__( self, processor_name: str, deploy_type: DeployType, agent_type: AgentType, host: str, init_by_config: bool, pid: Any = None ) -> None: self.processor_name = processor_name self.deploy_type = deploy_type self.host = host self.deployed_by_config = init_by_config self.agent_type = agent_type # The id is assigned when the agent is deployed self.pid = pid def _start_native_instance(self, logger: Logger, ssh_client, start_cmd: str): if self.deploy_type != DeployType.NATIVE: raise RuntimeError(f"Mismatch of deploy type when starting network agent: {self.processor_name}") agent_pid = deploy_agent_native_get_pid_hack(logger=logger, ssh_client=ssh_client, start_cmd=start_cmd) return agent_pid def _start_docker_instance(self, logger: Logger, docker_client, start_cmd: str): if self.deploy_type != DeployType.DOCKER: raise RuntimeError(f"Mismatch of deploy type when starting network agent: {self.processor_name}") agent_pid = deploy_agent_docker_template(logger=logger, docker_client=docker_client, start_cmd=start_cmd) return agent_pid
[docs] class DataProcessingWorker(DataNetworkAgent): def __init__( self, processor_name: str, deploy_type: DeployType, host: str, init_by_config: bool, pid: Any = None ) -> None: super().__init__( processor_name=processor_name, host=host, deploy_type=deploy_type, agent_type=AgentType.PROCESSING_WORKER, init_by_config=init_by_config, pid=pid )
[docs] def deploy_network_agent(self, logger: Logger, connector_client, database_url: str, queue_url: str): if self.deploy_type == DeployType.NATIVE: start_cmd = f"{self.processor_name} {self.agent_type} --database {database_url} --queue {queue_url} &" self.pid = self._start_native_instance(logger, connector_client, start_cmd) return self.pid if self.deploy_type == DeployType.DOCKER: # TODO: add real command to start processing worker in docker here start_cmd = f"" self.pid = self._start_docker_instance(logger, connector_client, start_cmd) return self.pid raise RuntimeError(f"Unknown deploy type of {self.__dict__}")
[docs] class DataProcessorServer(DataNetworkAgent): def __init__( self, processor_name: str, deploy_type: DeployType, host: str, port: int, init_by_config: bool, pid: Any = None ) -> None: super().__init__( processor_name=processor_name, host=host, deploy_type=deploy_type, agent_type=AgentType.PROCESSOR_SERVER, init_by_config=init_by_config, pid=pid ) self.port = port
[docs] def deploy_network_agent(self, logger: Logger, connector_client, database_url: str): agent_address = f"{self.host}:{self.port}" if self.deploy_type == DeployType.NATIVE: start_cmd = f"{self.processor_name} {self.agent_type} --address {agent_address} --database {database_url} &" self.pid = self._start_native_instance(logger, connector_client, start_cmd) return self.pid if self.deploy_type == DeployType.DOCKER: # TODO: add real command to start processor server in docker here start_cmd = f"" self.pid = self._start_docker_instance(logger, connector_client, start_cmd) return self.pid raise RuntimeError(f"Unknown deploy type of {self.__dict__}")