from __future__ import annotations

from typing import Any, Dict, List, Optional

from yaml import dump, safe_dump, safe_load

from ocrd_validators import OcrdNetworkMessageValidator
class OcrdProcessingMessage:
    """A processing-job message, serialized to / parsed from YAML bytes.

    Only *truthy* optional fields are stored as instance attributes. Absent
    fields are therefore also absent from ``__dict__`` and from the YAML that
    :meth:`encode_yml` produces. Callers must not assume that attributes such
    as ``page_id`` or ``output_file_grps`` exist on every instance.
    """

    def __init__(
        self,
        job_id: str,
        processor_name: str,
        created_time: int,
        input_file_grps: List[str],
        output_file_grps: Optional[List[str]],
        path_to_mets: Optional[str],
        workspace_id: Optional[str],
        page_id: Optional[str],
        result_queue_name: Optional[str],
        callback_url: Optional[str],
        internal_callback_url: Optional[str],
        parameters: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Validate the mandatory fields and store the message contents.

        Args:
            job_id: Non-empty job identifier.
            processor_name: Non-empty name of the processor to run.
            created_time: Creation timestamp; must be truthy (0 is rejected).
            input_file_grps: Non-empty list of input file groups.
            output_file_grps: Optional output file groups; stored only if truthy.
            path_to_mets: Optional METS path; stored only if truthy.
            workspace_id: Optional workspace id; stored only if truthy.
                At least one of ``workspace_id`` / ``path_to_mets`` is required.
            page_id: Optional page id; stored only if truthy.
            result_queue_name: Optional result queue name; stored only if truthy.
            callback_url: Optional callback URL; stored only if truthy.
            internal_callback_url: Optional internal callback URL; stored only
                if truthy.
            parameters: Optional processor parameters; ``{}`` when falsy.

        Raises:
            ValueError: If a mandatory field is missing or empty, or if neither
                ``workspace_id`` nor ``path_to_mets`` is given.
        """
        if not job_id:
            raise ValueError("job_id must be provided")
        if not processor_name:
            raise ValueError("processor_name must be provided")
        if not created_time:
            raise ValueError("created time must be provided")
        # `not` already covers both None and an empty list.
        if not input_file_grps:
            raise ValueError("input_file_grps must be provided and contain at least 1 element")
        if not (workspace_id or path_to_mets):
            raise ValueError("Either 'workspace_id' or 'path_to_mets' must be provided")
        self.job_id = job_id
        self.processor_name = processor_name
        self.created_time = created_time
        self.input_file_grps = input_file_grps
        # Optional fields become attributes only when truthy. This keeps them
        # (and any None values) out of __dict__ and thus out of the encoded YAML.
        if output_file_grps:
            self.output_file_grps = output_file_grps
        if path_to_mets:
            self.path_to_mets = path_to_mets
        if workspace_id:
            self.workspace_id = workspace_id
        if page_id:
            self.page_id = page_id
        if result_queue_name:
            self.result_queue_name = result_queue_name
        if callback_url:
            self.callback_url = callback_url
        if internal_callback_url:
            self.internal_callback_url = internal_callback_url
        self.parameters = parameters if parameters else {}

    @staticmethod
    def encode_yml(ocrd_processing_message: OcrdProcessingMessage, encode_type: str = "utf-8") -> bytes:
        """Serialize the message's attributes to YAML and encode them as bytes.

        Uses ``safe_dump`` so that the output is always readable by
        :meth:`decode_yml`, which parses with ``safe_load``. The full ``dump``
        emits Python-specific tags (e.g. ``!!python/tuple`` for a tuple of file
        groups) that ``safe_load`` rejects. ``safe_dump`` instead writes plain
        YAML, or raises here for values it cannot represent.
        """
        return safe_dump(ocrd_processing_message.__dict__, indent=2).encode(encode_type)

    @staticmethod
    def decode_yml(ocrd_processing_message: bytes, decode_type: str = "utf-8") -> OcrdProcessingMessage:
        """Decode YAML bytes, validate them, and build an ``OcrdProcessingMessage``.

        Raises:
            ValueError: If ``OcrdNetworkMessageValidator`` reports the data as
                invalid, or if the constructor's own checks fail.
        """
        msg = ocrd_processing_message.decode(decode_type)
        data = safe_load(msg)
        report = OcrdNetworkMessageValidator.validate_message_processing(data)
        if not report.is_valid:
            raise ValueError(f"Validating the processing message has failed:\n{report.errors}")
        return OcrdProcessingMessage(
            job_id=data.get("job_id", None),
            processor_name=data.get("processor_name", None),
            created_time=data.get("created_time", None),
            path_to_mets=data.get("path_to_mets", None),
            workspace_id=data.get("workspace_id", None),
            input_file_grps=data.get("input_file_grps", None),
            output_file_grps=data.get("output_file_grps", None),
            page_id=data.get("page_id", None),
            parameters=data.get("parameters", None),
            result_queue_name=data.get("result_queue_name", None),
            callback_url=data.get("callback_url", None),
            internal_callback_url=data.get("internal_callback_url", None),
        )
class OcrdResultMessage:
    """A job-result message (job id, state, workspace / METS), serialized as YAML bytes."""

    def __init__(self, job_id: str, state: str, path_to_mets: Optional[str], workspace_id: Optional[str] = '') -> None:
        """Store the result fields as given; no validation happens here.

        Args:
            job_id: Identifier of the job this result belongs to.
            state: Job state string.
            path_to_mets: METS path, may be None.
            workspace_id: Workspace id; defaults to the empty string.
        """
        self.job_id = job_id
        self.state = state
        self.workspace_id = workspace_id
        self.path_to_mets = path_to_mets

    @staticmethod
    def encode_yml(ocrd_result_message: OcrdResultMessage, encode_type: str = "utf-8") -> bytes:
        """Serialize the message's attributes to YAML and encode them as bytes.

        Uses ``safe_dump`` to stay symmetric with :meth:`decode_yml`, which
        parses with ``safe_load``. The full ``dump`` would emit Python-specific
        tags for non-plain values (e.g. a str subclass passed as ``state``),
        producing a message that cannot be decoded. ``safe_dump`` raises here
        instead.
        """
        return safe_dump(ocrd_result_message.__dict__, indent=2).encode(encode_type)

    @staticmethod
    def decode_yml(ocrd_result_message: bytes, decode_type: str = "utf-8") -> OcrdResultMessage:
        """Decode YAML bytes, validate them, and build an ``OcrdResultMessage``.

        A missing ``workspace_id`` key becomes the empty string.

        Raises:
            ValueError: If ``OcrdNetworkMessageValidator`` reports the data as invalid.
        """
        msg = ocrd_result_message.decode(decode_type)
        data = safe_load(msg)
        report = OcrdNetworkMessageValidator.validate_message_result(data)
        if not report.is_valid:
            raise ValueError(f"Validating the result message has failed:\n{report.errors}")
        return OcrdResultMessage(
            job_id=data.get("job_id", None),
            state=data.get("state", None),
            path_to_mets=data.get("path_to_mets", None),
            workspace_id=data.get("workspace_id", ''),
        )