Source code for ocrd_network.rabbitmq_utils.consumer

The source code in this file is adapted by reusing
some part of the source code from the official
RabbitMQ documentation.
from typing import Any, Union
from pika import PlainCredentials
from ocrd_utils import getLogger
from .constants import (
from .connector import RMQConnector

[docs]class RMQConsumer(RMQConnector): def __init__(self, host: str = HOST, port: int = PORT, vhost: str = VHOST) -> None: self.log = getLogger('ocrd_network.rabbitmq_utils.consumer') super().__init__(host=host, port=port, vhost=vhost) self.consumer_tag = None self.consuming = False self.was_consuming = False self.closing = False self.reconnect_delay = 0
[docs] def authenticate_and_connect(self, username: str, password: str) -> None: credentials = PlainCredentials( username=username, password=password, erase_on_connect=False # Delete credentials once connected ) self._connection = RMQConnector.open_blocking_connection( host=self._host, port=self._port, vhost=self._vhost, credentials=credentials, ) self._channel = RMQConnector.open_blocking_channel(self._connection) RMQConnector.set_qos(self._channel)"Set QoS for the consumer")
[docs] def setup_defaults(self) -> None: RMQConnector.declare_and_bind_defaults(self._connection, self._channel)
[docs] def get_one_message( self, queue_name: str, auto_ack: bool = False ) -> Union[Any, None]: message = None if self._channel and self._channel.is_open: message = self._channel.basic_get( queue=queue_name, auto_ack=auto_ack ) return message
[docs] def configure_consuming( self, queue_name: str, callback_method: Any ) -> None: self.log.debug(f'Configuring consuming from queue: {queue_name}') self._channel.add_on_cancel_callback(self.__on_consumer_cancelled) self.consumer_tag = self._channel.basic_consume( queue_name, callback_method ) self.was_consuming = True self.consuming = True
[docs] def start_consuming(self) -> None: if self._channel and self._channel.is_open: self._channel.start_consuming()
[docs] def get_waiting_message_count(self) -> Union[int, None]: if self._channel and self._channel.is_open: return self._channel.get_waiting_message_count() return None
def __on_consumer_cancelled(self, frame: Any) -> None: self.log.warning(f'The consumer was cancelled remotely in frame: {frame}') if self._channel: self._channel.close()
[docs] def ack_message(self, delivery_tag: int) -> None: self.log.debug(f'Acknowledging message with delivery tag: {delivery_tag}') self._channel.basic_ack(delivery_tag)