Source code for ocrd_network.runtime_data.hosts

from logging import Logger
from typing import Dict, List, Optional

from docker import APIClient
from paramiko import SSHClient

from ..constants import RESOURCE_MANAGER_SERVER_PORT
from .connection_clients import CustomDockerClient, create_docker_client, create_ssh_client
from .network_agents import (
    DataProcessingWorker, DeployType, deploy_agent_native_get_pid_hack)


class DataHost:
    """Runtime data of a single host and the logic to deploy/stop its agents.

    An instance keeps the host credentials, the processing workers parsed
    from the configuration (grouped by deploy type), and the SSH/Docker
    connection clients used to deploy and stop those workers together with
    a resource manager server.
    """

    def __init__(
        self, host: str, username: str, password: str, keypath: str, workers: List[Dict], servers: List[Dict]
    ) -> None:
        """Store the host credentials and parse the worker configuration.

        Args:
            host: Address of the host.
            username: User name passed to the connection client factories.
            password: Password passed to the connection client factories.
            keypath: Key path passed to the connection client factories.
            workers: Worker configuration entries. Each entry must provide
                ``"name"`` and ``"number_of_instance"`` and may provide
                ``"deploy_type"`` (defaults to ``"native"``). A falsy value
                is treated as "no workers".
            servers: Accepted for interface compatibility; this class does
                not read it.

        Raises:
            ValueError: If a worker has an unknown deploy type.
        """
        self.host = host
        self.resource_manager_port = RESOURCE_MANAGER_SERVER_PORT
        self.resource_manager_pid = None
        self.username = username
        self.password = password
        self.keypath = keypath

        # These flags are used to track whether a connection of the specified
        # type should be created based on the received config file
        self.needs_ssh_connector: bool = False
        self.needs_docker_connector: bool = False

        # Connection clients, ssh for native deployment, docker for docker deployment
        self.ssh_client = None
        self.docker_client: Optional[CustomDockerClient] = None

        # Lists of network agents based on their agent and deployment type
        self.workers_native: List[DataProcessingWorker] = []
        self.workers_docker: List[DataProcessingWorker] = []

        self.__parse_workers(processing_workers=workers or [])

    def __append_workers_to_lists(self, worker_data: DataProcessingWorker) -> None:
        """Register a worker in the list matching its deploy type.

        Also flags the connector type that will be required to deploy it.

        Raises:
            ValueError: If the deploy type is neither native nor docker.
        """
        if worker_data.deploy_type == DeployType.NATIVE:
            self.needs_ssh_connector = True
            self.workers_native.append(worker_data)
        elif worker_data.deploy_type == DeployType.DOCKER:
            self.needs_docker_connector = True
            self.workers_docker.append(worker_data)
        else:
            raise ValueError(f"Processing Worker deploy type is unknown: {worker_data.deploy_type}")

    def __parse_workers(self, processing_workers: List[Dict]) -> None:
        """Create one ``DataProcessingWorker`` per configured worker instance."""
        for worker in processing_workers:
            for _ in range(int(worker["number_of_instance"])):
                # A separate object per instance: `stop_workers` reads `pid`
                # from every list entry, so sharing one object between all
                # instances would leave them with a single shared pid slot.
                # NOTE(review): assumes `deploy_network_agent` stores the pid
                # on the worker object - confirm in network_agents.
                worker_data = DataProcessingWorker(
                    processor_name=worker["name"],
                    deploy_type=worker.get("deploy_type", "native"),
                    host=self.host,
                    init_by_config=True,
                    pid=None,
                )
                self.__append_workers_to_lists(worker_data=worker_data)

    def create_connection_client(self, client_type: str):
        """Create, store and return a connection client for this host.

        Args:
            client_type: Either ``"ssh"`` or ``"docker"``.

        Returns:
            The newly created client; it is also stored in
            ``self.ssh_client`` or ``self.docker_client`` respectively.

        Raises:
            ValueError: If ``client_type`` is not one of the supported values.
        """
        if client_type not in ["docker", "ssh"]:
            raise ValueError(f"Host client type cannot be of type: {client_type}")
        if client_type == "ssh":
            self.ssh_client = create_ssh_client(self.host, self.username, self.password, self.keypath)
            return self.ssh_client
        self.docker_client = create_docker_client(self.host, self.username, self.password, self.keypath)
        return self.docker_client

    def __ensure_connection_clients(self, logger: Logger, action: str) -> None:
        """Create the connection clients that are required but not yet open.

        Args:
            logger: Logger used for debug messages.
            action: Verb used in the debug messages (e.g. ``"deploying"``).
        """
        # The resource manager server is started and stopped through the SSH
        # client regardless of the worker deploy types, so the SSH connection
        # is needed even when no native worker is configured.
        if not self.ssh_client:
            logger.debug(f"Creating missing ssh connector before {action}")
            client = self.create_connection_client(client_type="ssh")
            assert isinstance(client, SSHClient)
            self.ssh_client = client
        if self.needs_docker_connector and not self.docker_client:
            logger.debug(f"Creating missing docker connector before {action}")
            client = self.create_connection_client(client_type="docker")
            assert isinstance(client, CustomDockerClient)
            self.docker_client = client

    def __close_connection_clients(self) -> None:
        """Close and forget whichever connection clients are currently open."""
        if self.ssh_client:
            self.ssh_client.close()
            self.ssh_client = None
        if self.docker_client:
            self.docker_client.close()
            self.docker_client = None

    def __deploy_network_agent_resource_manager_server(self, logger: Logger) -> None:
        """Start the resource manager server on this host and remember its pid."""
        logger.info(f"Deploying resource manager server on host: {self.host}:{self.resource_manager_port}")
        start_cmd = f"ocrd network resmgr-server --address {self.host}:{self.resource_manager_port} &"
        pid = deploy_agent_native_get_pid_hack(logger, self.ssh_client, start_cmd)
        logger.info(f"Deployed: OCR-D Resource Manager Server [{pid}]: {self.host}:{self.resource_manager_port}")
        self.resource_manager_pid = pid

    def __deploy_single_worker(
        self, logger: Logger, worker_data: DataProcessingWorker, mongodb_url: str, rabbitmq_url: str
    ) -> None:
        """Deploy one worker through the client matching its deploy type."""
        deploy_type = worker_data.deploy_type
        name = worker_data.processor_name
        worker_info = f"Processing Worker, deploy: {deploy_type}, name: {name}, host: {self.host}"
        logger.info(f"Deploying {worker_info}")

        connection_client = None
        if deploy_type == DeployType.NATIVE:
            assert self.ssh_client, "SSH client connection missing."
            connection_client = self.ssh_client
        if deploy_type == DeployType.DOCKER:
            assert self.docker_client, "Docker client connection missing."
            connection_client = self.docker_client
        worker_data.deploy_network_agent(logger, connection_client, mongodb_url, rabbitmq_url)

    def __deploy_all_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None:
        """Deploy every registered worker, native ones first, then docker ones."""
        logger.info(f"Deploying processing workers on host: {self.host}")
        if not self.workers_native and not self.workers_docker:
            logger.info("No processing workers found to be deployed")
        for data_worker in self.workers_native:
            self.__deploy_single_worker(logger, data_worker, mongodb_url, rabbitmq_url)
            logger.info(f"Deployed: {data_worker}")
        for data_worker in self.workers_docker:
            self.__deploy_single_worker(logger, data_worker, mongodb_url, rabbitmq_url)
            logger.info(f"Deployed: {data_worker}")

    def deploy_workers(self, logger: Logger, mongodb_url: str, rabbitmq_url: str) -> None:
        """Deploy the resource manager server and all processing workers.

        Missing connection clients are created first. All clients are closed
        again when the method returns, including when deployment fails.

        Args:
            logger: Logger used for progress messages.
            mongodb_url: Forwarded to every worker's ``deploy_network_agent``.
            rabbitmq_url: Forwarded to every worker's ``deploy_network_agent``.
        """
        try:
            self.__ensure_connection_clients(logger, action="deploying")
            self.__deploy_network_agent_resource_manager_server(logger)
            self.__deploy_all_workers(logger=logger, mongodb_url=mongodb_url, rabbitmq_url=rabbitmq_url)
        finally:
            # Do not leak open connections when a deployment step raises
            self.__close_connection_clients()

    def __stop_network_agent_resource_manager_server(self, logger: Logger) -> None:
        """Kill the resource manager server process, if a pid was recorded."""
        if not self.resource_manager_pid:
            # Nothing was deployed (or the pid is unknown): avoid running `kill None`
            logger.warning(
                f"No pid was recorded for OCR-D Resource Manager Server: "
                f"{self.host}:{self.resource_manager_port}")
            return
        logger.info(f"Stopping OCR-D Resource Manager Server [{self.resource_manager_pid}]: "
                    f"{self.host}:{self.resource_manager_port}")
        assert self.ssh_client, "SSH client connection missing"
        self.ssh_client.exec_command(f"kill {self.resource_manager_pid}")

    def __stop_worker(self, logger: Logger, name: str, deploy_type: DeployType, pid: Optional[str]) -> None:
        """Stop one worker: `kill` for native ones, container stop for docker ones.

        A missing pid is logged as a warning and otherwise ignored.
        """
        worker_info = f"Processing Worker: deploy: {deploy_type}, name: {name}"
        if not pid:
            logger.warning(f"No pid was passed for {worker_info}")
            return
        worker_info += f", pid: {pid}"
        logger.info(f"Stopping {worker_info}")
        if deploy_type == DeployType.NATIVE:
            assert self.ssh_client, "SSH client connection missing"
            self.ssh_client.exec_command(f"kill {pid}")
        if deploy_type == DeployType.DOCKER:
            assert self.docker_client, "Docker client connection missing"
            # For docker workers the stored pid is used as the container identifier
            self.docker_client.containers.get(pid).stop()

    def stop_workers(self, logger: Logger) -> None:
        """Stop the resource manager server and all processing workers.

        Missing connection clients are created first. The worker lists are
        emptied once their workers were stopped. All clients are closed again
        when the method returns, including when stopping fails.

        Args:
            logger: Logger used for progress messages.
        """
        try:
            self.__ensure_connection_clients(logger, action="stopping")
            self.__stop_network_agent_resource_manager_server(logger=logger)

            logger.info(f"Stopping processing workers on host: {self.host}")
            if not self.workers_native and not self.workers_docker:
                logger.warning("No active processing workers to be stopped.")
            for worker in self.workers_native:
                self.__stop_worker(logger, worker.processor_name, worker.deploy_type, worker.pid)
            self.workers_native = []
            for worker in self.workers_docker:
                self.__stop_worker(logger, worker.processor_name, worker.deploy_type, worker.pid)
            self.workers_docker = []
        finally:
            # Do not leak open connections when a stop step raises
            self.__close_connection_clients()