from __future__ import annotations
from logging import Logger
from typing import Any, Dict, List, Optional, Union
from ..constants import DOCKER_IMAGE_MONGO_DB, DOCKER_IMAGE_RABBIT_MQ, DOCKER_RABBIT_MQ_FEATURES
from ..database import verify_mongodb_available
from ..rabbitmq_utils import verify_rabbitmq_available
from .connection_clients import create_docker_client
[docs]
class DataNetworkService:
def __init__(
self, host: str, port: int, ssh_username: str, ssh_keypath: str, ssh_password: str,
cred_username: str, cred_password: str, service_url: str, skip_deployment: bool, pid: Optional[Any]
) -> None:
self.host = host
self.port = port
self.ssh_username = ssh_username
self.ssh_keypath = ssh_keypath
self.ssh_password = ssh_password
self.cred_username = cred_username
self.cred_password = cred_password
self.service_url = service_url
self.skip_deployment = skip_deployment
self.pid = pid
[docs]
@staticmethod
def deploy_docker_service(
logger: Logger, service_data: Union[DataMongoDB, DataRabbitMQ], image: str, env: Optional[List[str]],
ports_mapping: Optional[Dict], detach: bool = True, remove: bool = True
) -> None:
if not service_data or not service_data.host:
message = f"Deploying '{image}' has failed - missing service configurations."
logger.exception(message)
raise RuntimeError(message)
logger.info(f"Deploying '{image}' service on '{service_data.host}', detach={detach}, remove={remove}")
logger.info(f"Ports mapping: {ports_mapping}")
logger.info(f"Environment: {env}")
client = create_docker_client(
service_data.host, service_data.ssh_username, service_data.ssh_password, service_data.ssh_keypath
)
result = client.containers.run(image=image, detach=detach, remove=remove, ports=ports_mapping, environment=env)
if not result or not result.id:
message = f"Failed to deploy '{image}' service on host: {service_data.host}"
logger.exception(message)
raise RuntimeError(message)
service_data.pid = result.id
client.close()
[docs]
@staticmethod
def stop_docker_service(logger: Logger, service_data: Union[DataMongoDB, DataRabbitMQ]) -> None:
if not service_data.pid:
logger.warning("No running service found")
return
client = create_docker_client(
service_data.host, service_data.ssh_username, service_data.ssh_password, service_data.ssh_keypath
)
client.containers.get(service_data.pid).stop()
client.close()
[docs]
class DataMongoDB(DataNetworkService):
def __init__(
self, host: str, port: int, ssh_username: Optional[str], ssh_keypath: Optional[str],
ssh_password: Optional[str], cred_username: Optional[str], cred_password: Optional[str],
skip_deployment: bool, protocol: str = "mongodb"
) -> None:
service_url = f"{protocol}://{host}:{port}"
if cred_username and cred_password:
service_url = f"{protocol}://{cred_username}:{cred_password}@{host}:{port}"
super().__init__(
host=host, port=port, ssh_username=ssh_username, ssh_keypath=ssh_keypath, ssh_password=ssh_password,
cred_username=cred_username, cred_password=cred_password, service_url=service_url,
skip_deployment=skip_deployment, pid=None
)
[docs]
def deploy_mongodb(
self, logger: Logger, image: str = DOCKER_IMAGE_MONGO_DB, detach: bool = True, remove: bool = True,
env: Optional[List[str]] = None, ports_mapping: Optional[Dict] = None
) -> str:
if self.skip_deployment:
logger.debug("MongoDB is managed externally. Skipping deployment.")
verify_mongodb_available(self.service_url)
return self.service_url
if not env:
env = []
if self.cred_username:
env = [
f"MONGO_INITDB_ROOT_USERNAME={self.cred_username}",
f"MONGO_INITDB_ROOT_PASSWORD={self.cred_password}"
]
if not ports_mapping:
ports_mapping = {27017: self.port}
self.deploy_docker_service(logger, self, image, env, ports_mapping, detach, remove)
verify_mongodb_available(self.service_url)
mongodb_host_info = f"{self.host}:{self.port}"
logger.info(f"The MongoDB was deployed on host: {mongodb_host_info}")
return self.service_url
[docs]
def stop_service_mongodb(self, logger: Logger) -> None:
if self.skip_deployment:
return
logger.info("Stopping the MongoDB service...")
self.stop_docker_service(logger, service_data=self)
self.pid = None
logger.info("The MongoDB service is stopped")
[docs]
class DataRabbitMQ(DataNetworkService):
def __init__(
self, host: str, port: int, ssh_username: Optional[str], ssh_keypath: Optional[str],
ssh_password: Optional[str], cred_username: Optional[str], cred_password: Optional[str],
skip_deployment: bool, protocol: str = "amqp", vhost: str = "/"
) -> None:
self.vhost = f"/{vhost}" if vhost != "/" else vhost
service_url = f"{protocol}://{host}:{port}{self.vhost}"
if cred_username and cred_password:
service_url = f"{protocol}://{cred_username}:{cred_password}@{host}:{port}{self.vhost}"
super().__init__(
host=host, port=port, ssh_username=ssh_username, ssh_keypath=ssh_keypath, ssh_password=ssh_password,
cred_username=cred_username, cred_password=cred_password, service_url=service_url,
skip_deployment=skip_deployment, pid=None
)
[docs]
def deploy_rabbitmq(
self, logger: Logger, image: str = DOCKER_IMAGE_RABBIT_MQ, detach: bool = True, remove: bool = True,
env: Optional[List[str]] = None, ports_mapping: Optional[Dict] = None
) -> str:
rmq_host, rmq_port, rmq_vhost = self.host, int(self.port), self.vhost
rmq_user, rmq_password = self.cred_username, self.cred_password
if self.skip_deployment:
logger.debug(f"RabbitMQ is managed externally. Skipping deployment.")
verify_rabbitmq_available(logger=logger, rabbitmq_address=self.service_url)
return self.service_url
if not env:
env = [
# The default credentials to be used by the processing workers
f"RABBITMQ_DEFAULT_USER={rmq_user}",
f"RABBITMQ_DEFAULT_PASS={rmq_password}",
# These feature flags are required by default to use the newer version
f"RABBITMQ_FEATURE_FLAGS={DOCKER_RABBIT_MQ_FEATURES}"
]
if not ports_mapping:
# 5672, 5671 - used by AMQP 0-9-1 and AMQP 1.0 clients without and with TLS
# 15672, 15671: HTTP API clients, management UI and rabbitmq admin, without and with TLS
# 25672: used for internode and CLI tools communication and is allocated from
# a dynamic range (limited to a single port by default, computed as AMQP port + 20000)
ports_mapping = {5672: self.port, 15672: 15672, 25672: 25672}
self.deploy_docker_service(logger, self, image, env, ports_mapping, detach, remove)
verify_rabbitmq_available(logger=logger, rabbitmq_address=self.service_url)
logger.info(f"The RabbitMQ server was deployed on host: {rmq_host}:{rmq_port}{rmq_vhost}")
return self.service_url
[docs]
def stop_service_rabbitmq(self, logger: Logger) -> None:
if self.skip_deployment:
return
logger.info("Stopping the RabbitMQ service...")
self.stop_docker_service(logger, service_data=self)
self.pid = None
logger.info("The RabbitMQ service is stopped")