Source code for ocrd_network.rabbitmq_utils.connector

"""
The source code in this file is adapted by reusing
some part of the source code from the official
RabbitMQ documentation.
"""
from typing import Any, Optional, Union
from pika import BasicProperties, BlockingConnection, ConnectionParameters, PlainCredentials
from pika.adapters.blocking_connection import BlockingChannel
from .constants import (
    DEFAULT_EXCHANGER_NAME,
    DEFAULT_EXCHANGER_TYPE,
    DEFAULT_QUEUE,
    DEFAULT_ROUTER,
    RABBIT_MQ_HOST,
    RABBIT_MQ_PORT,
    RABBIT_MQ_VHOST,
    PREFETCH_COUNT
)


[docs] class RMQConnector: def __init__(self, host: str = RABBIT_MQ_HOST, port: int = RABBIT_MQ_PORT, vhost: str = RABBIT_MQ_VHOST) -> None: self._host = host self._port = port self._vhost = vhost # According to the documentation, Pika blocking # connections are not thread-safe! self._connection = None self._channel = None # Should try reconnecting again self._try_reconnecting = False # If the module has been stopped with a # keyboard interruption, i.e., CTRL + C self._gracefully_stopped = False
[docs] def close_connection(self, reply_code: int = 200, reply_text: str = "Normal shutdown"): self._connection.close(reply_code=reply_code, reply_text=reply_text)
[docs] @staticmethod def declare_and_bind_defaults(connection: BlockingConnection, channel: BlockingChannel) -> None: if connection and connection.is_open: if channel and channel.is_open: # Declare the default exchange agent RMQConnector.exchange_declare( channel=channel, exchange_name=DEFAULT_EXCHANGER_NAME, exchange_type=DEFAULT_EXCHANGER_TYPE, ) # Declare the default queue RMQConnector.queue_declare(channel, queue_name=DEFAULT_QUEUE) # Bind the default queue to the default exchange RMQConnector.queue_bind( channel=channel, queue_name=DEFAULT_QUEUE, exchange_name=DEFAULT_EXCHANGER_NAME, routing_key=DEFAULT_ROUTER ) return raise ConnectionError("The channel is missing or closed.") raise ConnectionError("The connection is missing or closed.")
# Connection related methods
[docs] @staticmethod def open_blocking_connection( credentials: PlainCredentials, host: str = RABBIT_MQ_HOST, port: int = RABBIT_MQ_PORT, vhost: str = RABBIT_MQ_VHOST ) -> BlockingConnection: blocking_connection = BlockingConnection( parameters=ConnectionParameters( host=host, port=port, virtual_host=vhost, credentials=credentials, # TODO: The heartbeat should not be disabled (0)! heartbeat=0 ), ) return blocking_connection
[docs] @staticmethod def open_blocking_channel(connection: BlockingConnection) -> Union[BlockingChannel, None]: if connection and connection.is_open: channel = connection.channel() return channel raise ConnectionError("The connection is missing or closed.")
def _authenticate_and_connect(self, username: str, password: str) -> None: # Delete credentials once connected credentials = PlainCredentials(username=username, password=password, erase_on_connect=False) self._connection = RMQConnector.open_blocking_connection( host=self._host, port=self._port, vhost=self._vhost, credentials=credentials, ) self._channel = RMQConnector.open_blocking_channel(self._connection) if not self._connection: raise ConnectionError("The connection is missing or closed.") if not self._channel: raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def exchange_bind( channel: BlockingChannel, destination_exchange: str, source_exchange: str, routing_key: str, arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} if channel and channel.is_open: channel.exchange_bind( destination=destination_exchange, source=source_exchange, routing_key=routing_key, arguments=arguments ) return raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def exchange_declare( channel: BlockingChannel, exchange_name: str, exchange_type: str, passive: bool = False, durable: bool = False, auto_delete: bool = False, internal: bool = False, arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} if channel and channel.is_open: channel.exchange_declare( exchange=exchange_name, exchange_type=exchange_type, # Only check to see if the exchange exists passive=passive, # Survive a reboot of RabbitMQ durable=durable, # Remove when no more queues are bound to it auto_delete=auto_delete, # Can only be published to by other exchanges internal=internal, # Custom key/value pair arguments for the exchange arguments=arguments ) return raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def exchange_delete(channel: BlockingChannel, exchange_name: str, if_unused: bool = False) -> None: # Deletes queue only if unused if channel and channel.is_open: channel.exchange_delete(exchange=exchange_name, if_unused=if_unused) return raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def exchange_unbind( channel: BlockingChannel, destination_exchange: str, source_exchange: str, routing_key: str, arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} if channel and channel.is_open: channel.exchange_unbind( destination=destination_exchange, source=source_exchange, routing_key=routing_key, arguments=arguments ) return raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def queue_bind( channel: BlockingChannel, queue_name: str, exchange_name: str, routing_key: str, arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} if channel and channel.is_open: channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key=routing_key, arguments=arguments) return raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def queue_declare( channel: BlockingChannel, queue_name: str, passive: bool = False, durable: bool = False, exclusive: bool = False, auto_delete: bool = False, arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} if channel and channel.is_open: queue = channel.queue_declare( queue=queue_name, # Only check to see if the queue exists and # raise ChannelClosed exception if it does not passive=passive, # Survive reboots of the server durable=durable, # Only allow access by the current connection exclusive=exclusive, # Delete after consumer cancels or disconnects auto_delete=auto_delete, # Custom key/value pair arguments for the queue arguments=arguments ) return queue raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def queue_delete( channel: BlockingChannel, queue_name: str, if_unused: bool = False, if_empty: bool = False ) -> None: if channel and channel.is_open: channel.queue_delete( queue=queue_name, # Only delete if the queue is unused if_unused=if_unused, # Only delete if the queue is empty if_empty=if_empty ) return raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def queue_purge(channel: BlockingChannel, queue_name: str) -> None: if channel and channel.is_open: channel.queue_purge(queue=queue_name)
[docs] @staticmethod def queue_unbind( channel: BlockingChannel, queue_name: str, exchange_name: str, routing_key: str, arguments: Optional[Any] = None ) -> None: if arguments is None: arguments = {} if channel and channel.is_open: channel.queue_unbind( queue=queue_name, exchange=exchange_name, routing_key=routing_key, arguments=arguments ) return raise ConnectionError("The channel is missing or closed.")
[docs] def create_queue( self, queue_name: str, exchange_name: Optional[str] = DEFAULT_EXCHANGER_NAME, exchange_type: Optional[str] = "direct", passive: bool = False ) -> None: RMQConnector.exchange_declare(channel=self._channel, exchange_name=exchange_name, exchange_type=exchange_type) RMQConnector.queue_declare(channel=self._channel, queue_name=queue_name, passive=passive) # The queue name is used as a routing key, to keep implementation simple RMQConnector.queue_bind( channel=self._channel, queue_name=queue_name, exchange_name=exchange_name, routing_key=queue_name )
[docs] @staticmethod def set_qos( channel: BlockingChannel, prefetch_size: int = 0, prefetch_count: int = PREFETCH_COUNT, global_qos: bool = False ) -> None: if channel and channel.is_open: channel.basic_qos( # No specific limit if set to 0 prefetch_size=prefetch_size, prefetch_count=prefetch_count, # Should the qos apply to all channels of the connection global_qos=global_qos ) return raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def confirm_delivery(channel: BlockingChannel) -> None: if channel and channel.is_open: channel.confirm_delivery() return raise ConnectionError("The channel is missing or closed.")
[docs] @staticmethod def basic_publish( channel: BlockingChannel, exchange_name: str, routing_key: str, message_body: bytes, properties: BasicProperties ) -> None: if channel and channel.is_open: channel.basic_publish( exchange=exchange_name, routing_key=routing_key, body=message_body, properties=properties ) return raise ConnectionError("The channel is missing or closed.")