Source code for ocrd_network.runtime_data.network_agents
from logging import Logger
from re import search as re_search
from shlex import quote as shlex_quote
from time import sleep
from typing import Any

from ..constants import DeployType
# TODO: Find appropriate replacement for the hack
[docs]
def deploy_agent_native_get_pid_hack(logger: Logger, ssh_client, start_cmd: str):
    """Run ``start_cmd`` in an interactive remote shell and return the PID it reports.

    The command is typed into the shell obtained from ``ssh_client.invoke_shell()``,
    followed by ``echo xyz$!xyz`` and ``exit``. The PID is then extracted from the
    complete shell output with a regular expression.

    Args:
        logger: Logger used to report the command being executed.
        ssh_client: Object providing ``invoke_shell()``; presumably a connected
            paramiko ``SSHClient`` -- TODO confirm against the caller.
        start_cmd: Shell command line to execute. For ``$!`` to refer to it, the
            command is expected to start a background job (e.g. end with ``&``).

    Returns:
        The PID as the string of digits captured from the shell output.

    Raises:
        RuntimeError: If the shell output contains no ``xyz<digits>xyz`` marker,
            i.e. the PID of the started process could not be determined.
    """
    channel = ssh_client.invoke_shell()
    stdin, stdout = channel.makefile("wb"), channel.makefile("rb")
    try:
        # TODO: set back to debug
        logger.info(f"Executing command: {start_cmd}")

        # TODO: This hack should still be fixed
        # Note left from @joschrew
        # the only way (I could find) to make it work to start a process in the background and
        # return early is this construction. The pid of the last started background process is
        # printed with `echo $!` but it is printed between other output. Because of that I added
        # `xyz` before and after the code to easily be able to filter out the pid via regex when
        # returning from the function
        stdin.write(f"{start_cmd}\n")
        stdin.write("echo xyz$!xyz \n exit \n")
        output = stdout.read().decode("utf-8")
    finally:
        # Release both file objects even when writing or reading failed.
        stdout.close()
        stdin.close()

    pid_match = re_search(r"xyz([0-9]+)xyz", output)
    if pid_match is None:
        # Without this check the failure would surface as an opaque
        # "'NoneType' object has no attribute 'group'".
        raise RuntimeError(f"Failed to determine the pid of the process started with command: {start_cmd}")
    return pid_match.group(1)
# TODO: Implement the actual method that is missing
[docs]
def deploy_agent_docker_template(logger: Logger, docker_client, start_cmd: str):
    """Placeholder for deploying a network agent as a docker container.

    Docker deployment is not implemented; calling this function always fails.

    Args:
        logger: Currently unused.
        docker_client: Currently unused.
        start_cmd: Currently unused.

    Raises:
        Exception: Always, to signal that docker type agents are unsupported.
    """
    # Sketch of the intended implementation, kept for reference:
    #   logger.debug(f"Executing command: {start_cmd}")
    #   res = docker_client.containers.run("debian", "sleep 500s", detach=True, remove=True)
    #   assert res and res.id, f"Starting docker network agent has failed with command: {start_cmd}"
    #   return res.id
    unsupported_message = "Deploying docker type agents is not supported yet!"
    raise Exception(unsupported_message)
[docs]
class DataNetworkAgent:
    """Runtime record of one network agent: what it runs, how, where, and its id."""

    def __init__(
        self,
        processor_name: str,
        deploy_type: DeployType,
        host: str,
        init_by_config: bool,
        pid: Any = None,
    ) -> None:
        """Store the agent description.

        Args:
            processor_name: Name of the processor this agent runs.
            deploy_type: How the agent is deployed (compared against
                ``DeployType.NATIVE`` / ``DeployType.DOCKER`` when starting).
            host: Host the agent belongs to.
            init_by_config: Stored as ``deployed_by_config``.
            pid: Identifier of the running agent, if already known.
        """
        self.processor_name = processor_name
        self.deploy_type = deploy_type
        self.host = host
        self.deployed_by_config = init_by_config
        # The id is assigned when the agent is deployed
        self.pid = pid
        # Time to wait between deploying agents
        self.wait_between_agent_deploys: float = 0.3

    def __str__(self):
        """Return a one-line summary: pid, deploy type, processor name and host."""
        return f"{self.pid} {self.deploy_type} {self.processor_name} on host: {self.host}"

    def _deploy_type_mismatch(self) -> RuntimeError:
        """Build the error raised when a start method does not fit ``deploy_type``."""
        return RuntimeError(f"Mismatch of deploy type when starting network agent: {self.processor_name}")

    def _start_native_instance(self, logger: Logger, ssh_client, start_cmd: str):
        """Start the agent natively through ``ssh_client`` and return its pid.

        Raises:
            RuntimeError: If this agent's deploy type is not ``DeployType.NATIVE``.
        """
        if self.deploy_type != DeployType.NATIVE:
            raise self._deploy_type_mismatch()
        return deploy_agent_native_get_pid_hack(logger=logger, ssh_client=ssh_client, start_cmd=start_cmd)

    def _start_docker_instance(self, logger: Logger, docker_client, start_cmd: str):
        """Start the agent through ``docker_client`` and return its id.

        Raises:
            RuntimeError: If this agent's deploy type is not ``DeployType.DOCKER``.
        """
        if self.deploy_type != DeployType.DOCKER:
            raise self._deploy_type_mismatch()
        return deploy_agent_docker_template(logger=logger, docker_client=docker_client, start_cmd=start_cmd)
[docs]
class DataProcessingWorker(DataNetworkAgent):
    """Network agent started with a database URL and a queue URL."""

    def __init__(
        self, processor_name: str, deploy_type: DeployType, host: str, init_by_config: bool, pid: Any = None
    ) -> None:
        """Forward all arguments unchanged to ``DataNetworkAgent.__init__``."""
        super().__init__(
            processor_name=processor_name, host=host, deploy_type=deploy_type,
            init_by_config=init_by_config, pid=pid
        )

    def deploy_network_agent(self, logger: Logger, connector_client, database_url: str, queue_url: str):
        """Deploy this worker according to its deploy type and return its pid.

        After a successful start the method sleeps for
        ``self.wait_between_agent_deploys`` seconds before returning.

        Args:
            logger: Logger passed on to the start helper.
            connector_client: Client used to start the agent: an SSH client for
                native deployment, a docker client for docker deployment.
            database_url: Value passed to the worker's ``--database`` option.
            queue_url: Value passed to the worker's ``--queue`` option.

        Returns:
            The pid reported by the start helper; it is also stored in ``self.pid``.

        Raises:
            RuntimeError: If ``connector_client`` is missing, if no start command
                is available (currently always the case in docker mode), or if
                the deploy type is neither native nor docker.
        """
        if self.deploy_type == DeployType.NATIVE:
            # Explicit check instead of `assert`, which is stripped under `python -O`.
            if not connector_client:
                raise RuntimeError("SSH client connection missing.")
            # The command line is interpreted by a remote shell: quote the URLs so that
            # characters such as `&`, `?` or `;` are passed through literally.
            # Plain URLs are left unchanged by the quoting.
            start_cmd = (
                f"{self.processor_name} --database {shlex_quote(database_url)} "
                f"--queue {shlex_quote(queue_url)} &"
            )
            self.pid = self._start_native_instance(logger, connector_client, start_cmd)
            sleep(self.wait_between_agent_deploys)
            return self.pid
        if self.deploy_type == DeployType.DOCKER:
            if not connector_client:
                raise RuntimeError("Docker client connection missing.")
            # TODO: add real command to start processing worker in docker here
            start_cmd = ""
            if not start_cmd:
                raise RuntimeError("Missing start command for the Processing Worker in docker mode")
            self.pid = self._start_docker_instance(logger, connector_client, start_cmd)
            sleep(self.wait_between_agent_deploys)
            return self.pid
        raise RuntimeError(f"Unknown deploy type of {self.__dict__}")