Source code for ocrd_network.runtime_data.hosts

from logging import Logger
from time import sleep
from typing import Dict, List, Union

from .connection_clients import create_docker_client, create_ssh_client
from .network_agents import AgentType, DataNetworkAgent, DataProcessingWorker, DataProcessorServer, DeployType


[docs] class DataHost: def __init__( self, host: str, username: str, password: str, keypath: str, workers: List[Dict], servers: List[Dict] ) -> None: self.host = host self.username = username self.password = password self.keypath = keypath # These flags are used to track whether a connection of the specified # type should be created based on the received config file self.needs_ssh_connector: bool = False self.needs_docker_connector: bool = False # Connection clients, ssh for native deployment, docker for docker deployment self.ssh_client = None self.docker_client = None # Time to wait between deploying agents self.wait_between_agent_deploys: float = 0.3 # Lists of network agents based on their agent and deployment type self.network_agents_worker_native = [] self.network_agents_worker_docker = [] self.network_agents_server_native = [] self.network_agents_server_docker = [] if not workers: workers = [] if not servers: servers = [] self.__parse_network_agents_workers(processing_workers=workers) self.__parse_network_agents_servers(processor_servers=servers) # Used for caching deployed Processor Servers' ports on the current host # Key: processor_name, Value: list of ports self.processor_servers_ports: dict = {} def __add_deployed_agent_server_port_to_cache(self, processor_name: str, port: int) -> None: if processor_name not in self.processor_servers_ports: self.processor_servers_ports[processor_name] = [port] return self.processor_servers_ports[processor_name] = self.processor_servers_ports[processor_name].append(port) def __append_network_agent_to_lists(self, agent_data: DataNetworkAgent) -> None: if agent_data.deploy_type != DeployType.DOCKER and agent_data.deploy_type != DeployType.NATIVE: raise ValueError(f"Network agent deploy type is unknown: {agent_data.deploy_type}") if agent_data.agent_type != AgentType.PROCESSING_WORKER and agent_data.agent_type != AgentType.PROCESSOR_SERVER: raise ValueError(f"Network agent type is unknown: {agent_data.agent_type}") if agent_data.deploy_type == DeployType.NATIVE: self.needs_ssh_connector = True if agent_data.agent_type == AgentType.PROCESSING_WORKER: self.network_agents_worker_native.append(agent_data) if agent_data.agent_type == AgentType.PROCESSOR_SERVER: self.network_agents_server_native.append(agent_data) if agent_data.deploy_type == DeployType.DOCKER: self.needs_docker_connector = True if agent_data.agent_type == AgentType.PROCESSING_WORKER: self.network_agents_worker_docker.append(agent_data) if agent_data.agent_type == AgentType.PROCESSOR_SERVER: self.network_agents_server_docker.append(agent_data) def __parse_network_agents_servers(self, processor_servers: List[Dict]): for server in processor_servers: server_data = DataProcessorServer( processor_name=server["name"], deploy_type=server["deploy_type"], host=self.host, port=int(server["port"]), init_by_config=True, pid=None ) self.__append_network_agent_to_lists(agent_data=server_data) def __parse_network_agents_workers(self, processing_workers: List[Dict]): for worker in processing_workers: worker_data = DataProcessingWorker( processor_name=worker["name"], deploy_type=worker["deploy_type"], host=self.host, init_by_config=True, pid=None ) for _ in range(int(worker["number_of_instance"])): self.__append_network_agent_to_lists(agent_data=worker_data)
[docs] def create_connection_client(self, client_type: str): if client_type not in ["docker", "ssh"]: raise ValueError(f"Host client type cannot be of type: {client_type}") if client_type == "ssh": self.ssh_client = create_ssh_client(self.host, self.username, self.password, self.keypath) return self.ssh_client if client_type == "docker": self.docker_client = create_docker_client(self.host, self.username, self.password, self.keypath) return self.docker_client
def __deploy_network_agent( self, logger: Logger, agent_data: Union[DataProcessorServer, DataProcessingWorker], mongodb_url: str, rabbitmq_url: str ) -> None: deploy_type = agent_data.deploy_type agent_type = agent_data.agent_type name = agent_data.processor_name agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: {name}, host: {self.host}" logger.info(f"Deploying {agent_info}") connection_client = None if deploy_type == DeployType.NATIVE: assert self.ssh_client, f"SSH client connection missing." connection_client = self.ssh_client if deploy_type == DeployType.DOCKER: assert self.docker_client, f"Docker client connection missing." connection_client = self.docker_client if agent_type == AgentType.PROCESSING_WORKER: agent_data.deploy_network_agent(logger, connection_client, mongodb_url, rabbitmq_url) if agent_type == AgentType.PROCESSOR_SERVER: agent_data.deploy_network_agent(logger, connection_client, mongodb_url) sleep(self.wait_between_agent_deploys) def __deploy_network_agents_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str): logger.info(f"Deploying processing workers on host: {self.host}") amount_workers = len(self.network_agents_worker_native) + len(self.network_agents_worker_docker) if not amount_workers: logger.info(f"No processing workers found to be deployed") for data_worker in self.network_agents_worker_native: self.__deploy_network_agent(logger, data_worker, mongodb_url, rabbitmq_url) for data_worker in self.network_agents_worker_docker: self.__deploy_network_agent(logger, data_worker, mongodb_url, rabbitmq_url) def __deploy_network_agents_servers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str): logger.info(f"Deploying processor servers on host: {self.host}") amount_servers = len(self.network_agents_server_native) + len(self.network_agents_server_docker) if not amount_servers: logger.info(f"No processor servers found to be deployed") for data_server in self.network_agents_server_native: self.__deploy_network_agent(logger, data_server, mongodb_url, rabbitmq_url) self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port) for data_server in self.network_agents_server_docker: self.__deploy_network_agent(logger, data_server, mongodb_url, rabbitmq_url) self.__add_deployed_agent_server_port_to_cache(data_server.processor_name, data_server.port)
[docs] def deploy_network_agents(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None: if self.needs_ssh_connector and not self.ssh_client: logger.debug("Creating missing ssh connector before deploying") self.ssh_client = self.create_connection_client(client_type="ssh") if self.needs_docker_connector: logger.debug("Creating missing docker connector before deploying") self.docker_client = self.create_connection_client(client_type="docker") self.__deploy_network_agents_workers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url) self.__deploy_network_agents_servers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url) if self.ssh_client: self.ssh_client.close() self.ssh_client = None if self.docker_client: self.docker_client.close() self.docker_client = None
def __stop_network_agent(self, logger: Logger, name: str, deploy_type: DeployType, agent_type: AgentType, pid: str): agent_info = f"network agent: {agent_type}, deploy: {deploy_type}, name: {name}" if not pid: logger.warning(f"No pid was passed for {agent_info}") return agent_info += f", pid: {pid}" logger.info(f"Stopping {agent_info}") if deploy_type == DeployType.NATIVE: assert self.ssh_client, f"SSH client connection missing" self.ssh_client.exec_command(f"kill {pid}") if deploy_type == DeployType.DOCKER: assert self.docker_client, f"Docker client connection missing" self.docker_client.containers.get(pid).stop() def __stop_network_agents_workers(self, logger: Logger): logger.info(f"Stopping processing workers on host: {self.host}") amount_workers = len(self.network_agents_worker_native) + len(self.network_agents_worker_docker) if not amount_workers: logger.warning(f"No active processing workers to be stopped.") for worker in self.network_agents_worker_native: self.__stop_network_agent(logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid) self.network_agents_worker_native = [] for worker in self.network_agents_worker_docker: self.__stop_network_agent(logger, worker.processor_name, worker.deploy_type, worker.agent_type, worker.pid) self.network_agents_worker_docker = [] def __stop_network_agents_servers(self, logger: Logger): logger.info(f"Stopping processor servers on host: {self.host}") amount_servers = len(self.network_agents_server_native) + len(self.network_agents_server_docker) if not amount_servers: logger.warning(f"No active processor servers to be stopped.") for server in self.network_agents_server_native: self.__stop_network_agent(logger, server.processor_name, server.deploy_type, server.agent_type, server.pid) self.network_agents_server_native = [] for server in self.network_agents_server_docker: self.__stop_network_agent(logger, server.processor_name, server.deploy_type, server.agent_type, server.pid) self.network_agents_server_docker = []
[docs] def stop_network_agents(self, logger: Logger): if self.needs_ssh_connector and not self.ssh_client: logger.debug("Creating missing ssh connector before stopping") self.ssh_client = self.create_connection_client(client_type="ssh") if self.needs_docker_connector and not self.docker_client: logger.debug("Creating missing docker connector before stopping") self.docker_client = self.create_connection_client(client_type="docker") self.__stop_network_agents_workers(logger=logger) self.__stop_network_agents_servers(logger=logger) if self.ssh_client: self.ssh_client.close() self.ssh_client = None if self.docker_client: self.docker_client.close() self.docker_client = None
[docs] def resolve_processor_server_url(self, processor_name: str) -> str: processor_server_url = '' for data_server in self.network_agents_server_docker: if data_server.processor_name == processor_name: processor_server_url = f"http://{self.host}:{data_server.port}/" for data_server in self.network_agents_server_native: if data_server.processor_name == processor_name: processor_server_url = f"http://{self.host}:{data_server.port}/" return processor_server_url