# Source code for ocrd_network.rabbitmq_utils.connector

"""
The source code in this file is adapted by reusing
some part of the source code from the official
RabbitMQ documentation.
"""
from typing import Any, Optional, Union
from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials
from pika.adapters.blocking_connection import BlockingChannel
from .constants import (
    DEFAULT_EXCHANGER_NAME,
    DEFAULT_EXCHANGER_TYPE,
    DEFAULT_QUEUE,
    DEFAULT_ROUTER,
    RABBIT_MQ_HOST as HOST,
    RABBIT_MQ_PORT as PORT,
    RABBIT_MQ_VHOST as VHOST,
    PREFETCH_COUNT
)


class RMQConnector:
    """Base helper around Pika's blocking connection and channel API.

    Apart from the constructor, every method is a static helper that operates
    on a connection or channel supplied by the caller. The channel helpers are
    no-ops (or return ``None``) when the given channel is falsy or not open.
    """

    def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST) -> None:
        """Store the connection settings and initialise the connector state.

        Args:
            host: Host name of the RabbitMQ server.
            port: Port of the RabbitMQ server.
            vhost: Virtual host to use on the RabbitMQ server.
        """
        self._host = host
        self._port = port
        self._vhost = vhost

        # According to the documentation, Pika blocking
        # connections are not thread-safe!
        self._connection = None
        self._channel = None

        # Should try reconnecting again
        self._try_reconnecting = False
        # If the module has been stopped with a
        # keyboard interruption, i.e., CTRL + C
        self._gracefully_stopped = False

    @staticmethod
    def declare_and_bind_defaults(connection: BlockingConnection, channel: BlockingChannel) -> None:
        """Declare the default exchange and queue and bind them together.

        Nothing happens unless both ``connection`` and ``channel`` are open.

        Args:
            connection: Connection the channel belongs to.
            channel: Channel used to issue the declare/bind calls.
        """
        if not (connection and connection.is_open):
            return
        if not (channel and channel.is_open):
            return

        # Declare the default exchange agent
        RMQConnector.exchange_declare(
            channel=channel,
            exchange_name=DEFAULT_EXCHANGER_NAME,
            exchange_type=DEFAULT_EXCHANGER_TYPE,
        )
        # Declare the default queue
        RMQConnector.queue_declare(channel, queue_name=DEFAULT_QUEUE)
        # Bind the default queue to the default exchange
        RMQConnector.queue_bind(
            channel,
            queue_name=DEFAULT_QUEUE,
            exchange_name=DEFAULT_EXCHANGER_NAME,
            routing_key=DEFAULT_ROUTER
        )

    # Connection related methods
    @staticmethod
    def open_blocking_connection(
        credentials: PlainCredentials, host: str = HOST, port: int = PORT, vhost: str = VHOST
    ) -> BlockingConnection:
        """Open and return a new blocking connection to the RabbitMQ server.

        Args:
            credentials: Credentials used to authenticate against the server.
            host: Host name of the RabbitMQ server.
            port: Port of the RabbitMQ server.
            vhost: Virtual host to connect to.

        Returns:
            The newly created ``BlockingConnection``.
        """
        parameters = ConnectionParameters(
            host=host,
            port=port,
            virtual_host=vhost,
            credentials=credentials,
            # TODO: The heartbeat should not be disabled (0)!
            heartbeat=0
        )
        return BlockingConnection(parameters=parameters)

    @staticmethod
    def open_blocking_channel(connection: BlockingConnection) -> Optional[BlockingChannel]:
        """Open a channel on ``connection``.

        Args:
            connection: Connection on which the channel is created.

        Returns:
            The new channel, or ``None`` when ``connection`` is falsy or not open.
        """
        if connection and connection.is_open:
            return connection.channel()
        return None

    @staticmethod
    def exchange_bind(
        channel: BlockingChannel,
        destination_exchange: str,
        source_exchange: str,
        routing_key: str,
        arguments: Optional[Any] = None
    ) -> None:
        """Bind ``destination_exchange`` to ``source_exchange``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            destination_exchange: Name of the destination exchange.
            source_exchange: Name of the source exchange.
            routing_key: Routing key of the binding.
            arguments: Custom key/value arguments; ``None`` is sent as ``{}``.
        """
        if arguments is None:
            arguments = {}
        if channel and channel.is_open:
            channel.exchange_bind(
                destination=destination_exchange,
                source=source_exchange,
                routing_key=routing_key,
                arguments=arguments
            )

    @staticmethod
    def exchange_declare(
        channel: BlockingChannel,
        exchange_name: str,
        exchange_type: str,
        passive: bool = False,
        durable: bool = False,
        auto_delete: bool = False,
        internal: bool = False,
        arguments: Optional[Any] = None
    ) -> Optional[Any]:
        """Declare an exchange on ``channel``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            exchange_name: Name of the exchange.
            exchange_type: Type of the exchange.
            passive: Only check to see if the exchange exists.
            durable: Survive a reboot of RabbitMQ.
            auto_delete: Remove when no more queues are bound to it.
            internal: Can only be published to by other exchanges.
            arguments: Custom key/value arguments; ``None`` is sent as ``{}``.

        Returns:
            Whatever ``channel.exchange_declare`` returns, or ``None`` when
            the channel is falsy or not open.
        """
        if arguments is None:
            arguments = {}
        if channel and channel.is_open:
            return channel.exchange_declare(
                exchange=exchange_name,
                exchange_type=exchange_type,
                # Only check to see if the exchange exists
                passive=passive,
                # Survive a reboot of RabbitMQ
                durable=durable,
                # Remove when no more queues are bound to it
                auto_delete=auto_delete,
                # Can only be published to by other exchanges
                internal=internal,
                # Custom key/value pair arguments for the exchange
                arguments=arguments
            )
        return None

    @staticmethod
    def exchange_delete(channel: BlockingChannel, exchange_name: str, if_unused: bool = False) -> None:
        """Delete the exchange ``exchange_name``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            exchange_name: Name of the exchange to delete.
            if_unused: Only delete the exchange if it is unused.
        """
        if channel and channel.is_open:
            # With if_unused set, the exchange is deleted only if unused
            channel.exchange_delete(exchange=exchange_name, if_unused=if_unused)

    @staticmethod
    def exchange_unbind(
        channel: BlockingChannel,
        destination_exchange: str,
        source_exchange: str,
        routing_key: str,
        arguments: Optional[Any] = None
    ) -> None:
        """Remove the binding between ``destination_exchange`` and ``source_exchange``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            destination_exchange: Name of the destination exchange.
            source_exchange: Name of the source exchange.
            routing_key: Routing key of the binding to remove.
            arguments: Custom key/value arguments; ``None`` is sent as ``{}``.
        """
        if arguments is None:
            arguments = {}
        if channel and channel.is_open:
            channel.exchange_unbind(
                destination=destination_exchange,
                source=source_exchange,
                routing_key=routing_key,
                arguments=arguments
            )

    @staticmethod
    def queue_bind(
        channel: BlockingChannel,
        queue_name: str,
        exchange_name: str,
        routing_key: str,
        arguments: Optional[Any] = None
    ) -> None:
        """Bind the queue ``queue_name`` to the exchange ``exchange_name``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            queue_name: Name of the queue to bind.
            exchange_name: Name of the exchange to bind to.
            routing_key: Routing key of the binding.
            arguments: Custom key/value arguments; ``None`` is sent as ``{}``.
        """
        if arguments is None:
            arguments = {}
        if channel and channel.is_open:
            channel.queue_bind(
                queue=queue_name,
                exchange=exchange_name,
                routing_key=routing_key,
                arguments=arguments
            )

    @staticmethod
    def queue_declare(
        channel: BlockingChannel,
        queue_name: str,
        passive: bool = False,
        durable: bool = False,
        exclusive: bool = False,
        auto_delete: bool = False,
        arguments: Optional[Any] = None
    ) -> Optional[Any]:
        """Declare a queue on ``channel``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            queue_name: Name of the queue.
            passive: Only check to see if the queue exists.
            durable: Survive reboots of the server.
            exclusive: Only allow access by the current connection.
            auto_delete: Delete after consumer cancels or disconnects.
            arguments: Custom key/value arguments; ``None`` is sent as ``{}``.

        Returns:
            Whatever ``channel.queue_declare`` returns, or ``None`` when the
            channel is falsy or not open.
        """
        if arguments is None:
            arguments = {}
        if channel and channel.is_open:
            return channel.queue_declare(
                queue=queue_name,
                # Only check to see if the queue exists and
                # raise ChannelClosed exception if it does not
                passive=passive,
                # Survive reboots of the server
                durable=durable,
                # Only allow access by the current connection
                exclusive=exclusive,
                # Delete after consumer cancels or disconnects
                auto_delete=auto_delete,
                # Custom key/value pair arguments for the queue
                arguments=arguments
            )
        return None

    @staticmethod
    def queue_delete(
        channel: BlockingChannel, queue_name: str, if_unused: bool = False, if_empty: bool = False
    ) -> None:
        """Delete the queue ``queue_name``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            queue_name: Name of the queue to delete.
            if_unused: Only delete if the queue is unused.
            if_empty: Only delete if the queue is empty.
        """
        if channel and channel.is_open:
            channel.queue_delete(
                queue=queue_name,
                # Only delete if the queue is unused
                if_unused=if_unused,
                # Only delete if the queue is empty
                if_empty=if_empty
            )

    @staticmethod
    def queue_purge(channel: BlockingChannel, queue_name: str) -> None:
        """Purge the queue ``queue_name``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            queue_name: Name of the queue to purge.
        """
        if channel and channel.is_open:
            channel.queue_purge(queue=queue_name)

    @staticmethod
    def queue_unbind(
        channel: BlockingChannel,
        queue_name: str,
        exchange_name: str,
        routing_key: str,
        arguments: Optional[Any] = None
    ) -> None:
        """Remove the binding between ``queue_name`` and ``exchange_name``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            queue_name: Name of the bound queue.
            exchange_name: Name of the exchange the queue is bound to.
            routing_key: Routing key of the binding to remove.
            arguments: Custom key/value arguments; ``None`` is sent as ``{}``.
        """
        if arguments is None:
            arguments = {}
        if channel and channel.is_open:
            channel.queue_unbind(
                queue=queue_name,
                exchange=exchange_name,
                routing_key=routing_key,
                arguments=arguments
            )

    @staticmethod
    def set_qos(
        channel: BlockingChannel,
        prefetch_size: int = 0,
        prefetch_count: int = PREFETCH_COUNT,
        global_qos: bool = False
    ) -> None:
        """Apply quality-of-service settings to ``channel``.

        Args:
            channel: Channel used to issue the call; ignored if not open.
            prefetch_size: Prefetch size; no specific limit if set to 0.
            prefetch_count: Prefetch count passed to ``basic_qos``.
            global_qos: Should the qos apply to all channels of the connection.
        """
        if channel and channel.is_open:
            channel.basic_qos(
                # No specific limit if set to 0
                prefetch_size=prefetch_size,
                prefetch_count=prefetch_count,
                # Should the qos apply to all channels of the connection
                global_qos=global_qos
            )

    @staticmethod
    def confirm_delivery(channel: BlockingChannel) -> None:
        """Call ``confirm_delivery`` on ``channel`` if it is open.

        Args:
            channel: Channel used to issue the call; ignored if not open.
        """
        if channel and channel.is_open:
            channel.confirm_delivery()

    @staticmethod
    def basic_publish(
        channel: BlockingChannel,
        exchange_name: str,
        routing_key: str,
        message_body: bytes,
        properties: BasicProperties
    ) -> None:
        """Publish ``message_body`` to ``exchange_name`` with ``routing_key``.

        Args:
            channel: Channel used to publish; ignored if not open.
            exchange_name: Name of the exchange to publish to.
            routing_key: Routing key of the message.
            message_body: Raw message payload.
            properties: Message properties passed through to Pika.
        """
        if channel and channel.is_open:
            channel.basic_publish(
                exchange=exchange_name,
                routing_key=routing_key,
                body=message_body,
                properties=properties
            )