Source code for ocrd_network.rabbitmq_utils.ocrd_messages

from __future__ import annotations
from typing import Any, Dict, List, Optional
import yaml

from ocrd_validators import OcrdNetworkMessageValidator


class OcrdProcessingMessage:
    """A processing job request that is serialized to / parsed from YAML bytes.

    Optional fields are stored as instance attributes only when a truthy value
    is passed. Otherwise they are absent from ``__dict__``, and therefore from
    the YAML produced by :meth:`encode_yml`. Reading an omitted optional
    attribute raises ``AttributeError``. ``parameters`` is always set and
    defaults to an empty dict.
    """

    def __init__(
        self,
        job_id: str,
        processor_name: str,
        created_time: int,
        input_file_grps: List[str],
        output_file_grps: Optional[List[str]],
        path_to_mets: Optional[str],
        workspace_id: Optional[str],
        page_id: Optional[str],
        result_queue_name: Optional[str],
        callback_url: Optional[str],
        internal_callback_url: Optional[str],
        parameters: Optional[Dict[str, Any]] = None
    ) -> None:
        """Validate the mandatory fields and store the provided ones.

        :param job_id: identifier of the job (required, non-empty).
        :param processor_name: name of the processor to run (required, non-empty).
        :param created_time: creation timestamp; any falsy value (including 0) is rejected.
        :param input_file_grps: input file groups; must contain at least one element.
        :param output_file_grps: output file groups (stored only if truthy).
        :param path_to_mets: path to the METS file (stored only if truthy).
        :param workspace_id: workspace identifier (stored only if truthy).
        :param page_id: page selection (stored only if truthy).
        :param result_queue_name: queue name for results (stored only if truthy).
        :param callback_url: callback URL (stored only if truthy).
        :param internal_callback_url: internal callback URL (stored only if truthy).
        :param parameters: processor parameters; ``None``/empty becomes ``{}``.
        :raises ValueError: if a mandatory field is missing, ``input_file_grps``
            is empty, or neither ``workspace_id`` nor ``path_to_mets`` is given.
        """
        if not job_id:
            raise ValueError('job_id must be provided')
        if not processor_name:
            raise ValueError('processor_name must be provided')
        if not created_time:
            raise ValueError('created time must be provided')
        # An empty list is falsy, so this single check also covers len() == 0.
        if not input_file_grps:
            raise ValueError('input_file_grps must be provided and contain at least 1 element')
        # The job needs some way to locate its workspace.
        if not (workspace_id or path_to_mets):
            raise ValueError('Either "workspace_id" or "path_to_mets" must be provided')
        self.job_id = job_id
        self.processor_name = processor_name
        self.created_time = created_time
        self.input_file_grps = input_file_grps
        # Optional fields are only attached when set, so that encode_yml()
        # omits them entirely instead of emitting YAML nulls.
        if output_file_grps:
            self.output_file_grps = output_file_grps
        if path_to_mets:
            self.path_to_mets = path_to_mets
        if workspace_id:
            self.workspace_id = workspace_id
        if page_id:
            self.page_id = page_id
        if result_queue_name:
            self.result_queue_name = result_queue_name
        if callback_url:
            self.callback_url = callback_url
        if internal_callback_url:
            self.internal_callback_url = internal_callback_url
        self.parameters = parameters if parameters else {}

    @staticmethod
    def encode_yml(ocrd_processing_message: OcrdProcessingMessage) -> bytes:
        """Serialize the message's attributes to UTF-8 encoded YAML bytes.

        ``yaml.safe_dump`` is used, not ``yaml.dump``, so the output only
        contains standard YAML tags. That output can always be parsed by the
        ``yaml.safe_load`` call in :meth:`decode_yml`. With ``yaml.dump``,
        values such as tuples were emitted with Python-specific tags that
        ``safe_load`` rejects. Values that cannot be represented safely now
        raise a ``yaml.YAMLError`` here instead of producing an unreadable message.
        """
        return yaml.safe_dump(ocrd_processing_message.__dict__, indent=2).encode('utf-8')

    @staticmethod
    def decode_yml(ocrd_processing_message: bytes) -> OcrdProcessingMessage:
        """Parse UTF-8 YAML bytes, validate them, and build a message.

        :raises UnicodeDecodeError: if the bytes are not valid UTF-8.
        :raises yaml.YAMLError: if the text is not well-formed YAML.
        :raises ValueError: if the data fails
            ``OcrdNetworkMessageValidator.validate_message_processing``
            (the report's errors are included in the message), or if the
            constructor rejects the values.
        """
        msg = ocrd_processing_message.decode('utf-8')
        # safe_load: the payload comes from a message broker, never construct
        # arbitrary Python objects from it.
        data = yaml.safe_load(msg)
        report = OcrdNetworkMessageValidator.validate_message_processing(data)
        if not report.is_valid:
            raise ValueError(f'Validating the processing message has failed:\n{report.errors}')
        return OcrdProcessingMessage(
            job_id=data.get('job_id'),
            processor_name=data.get('processor_name'),
            created_time=data.get('created_time'),
            path_to_mets=data.get('path_to_mets'),
            workspace_id=data.get('workspace_id'),
            input_file_grps=data.get('input_file_grps'),
            output_file_grps=data.get('output_file_grps'),
            page_id=data.get('page_id'),
            parameters=data.get('parameters'),
            result_queue_name=data.get('result_queue_name'),
            callback_url=data.get('callback_url'),
            internal_callback_url=data.get('internal_callback_url')
        )
class OcrdResultMessage:
    """Result of a processing job, serialized to / parsed from YAML bytes.

    Unlike ``OcrdProcessingMessage``, the optional fields are always stored.
    They are ``None`` when not given, so they are encoded as YAML ``null``.
    NOTE(review): confirm the result-message schema accepts ``null`` for
    ``workspace_id``/``path_to_mets``, otherwise a round trip would fail validation.
    """

    def __init__(
        self,
        job_id: str,
        state: str,
        path_to_mets: Optional[str] = None,
        workspace_id: Optional[str] = None
    ) -> None:
        """Store the result fields as given (no validation is done here).

        :param job_id: identifier of the job this result belongs to.
        :param state: job state reported by the worker.
        :param path_to_mets: path to the METS file, if any.
        :param workspace_id: workspace identifier, if any.
        """
        self.job_id = job_id
        self.state = state
        self.workspace_id = workspace_id
        self.path_to_mets = path_to_mets

    @staticmethod
    def encode_yml(ocrd_result_message: OcrdResultMessage) -> bytes:
        """Serialize the message's attributes to UTF-8 encoded YAML bytes.

        ``yaml.safe_dump`` keeps the output restricted to standard YAML tags,
        so it can always be read back by the ``yaml.safe_load`` call in
        :meth:`decode_yml`. Values that cannot be represented safely raise a
        ``yaml.YAMLError`` here instead of producing an unreadable message.
        """
        return yaml.safe_dump(ocrd_result_message.__dict__, indent=2).encode('utf-8')

    @staticmethod
    def decode_yml(ocrd_result_message: bytes) -> OcrdResultMessage:
        """Parse UTF-8 YAML bytes, validate them, and build a result message.

        :raises UnicodeDecodeError: if the bytes are not valid UTF-8.
        :raises yaml.YAMLError: if the text is not well-formed YAML.
        :raises ValueError: if the data fails
            ``OcrdNetworkMessageValidator.validate_message_result``
            (the report's errors are included in the message).
        """
        msg = ocrd_result_message.decode('utf-8')
        # safe_load: never construct arbitrary Python objects from message payloads.
        data = yaml.safe_load(msg)
        report = OcrdNetworkMessageValidator.validate_message_result(data)
        if not report.is_valid:
            raise ValueError(f'Validating the result message has failed:\n{report.errors}')
        return OcrdResultMessage(
            job_id=data.get('job_id'),
            state=data.get('state'),
            path_to_mets=data.get('path_to_mets'),
            workspace_id=data.get('workspace_id'),
        )