Source code for ocrd_network.models.job

from beanie import Document
from datetime import datetime
from pydantic import BaseModel
from typing import Dict, List, Optional
from ..constants import AgentType, JobState


class PYJobInput(BaseModel):
    """
    Request model carrying the parameters of a run-processor request.

    Only ``input_file_grps`` is declared without ``Optional`` or a default.
    """
    processor_name: Optional[str] = None
    path_to_mets: Optional[str] = None
    workspace_id: Optional[str] = None
    description: Optional[str] = None
    input_file_grps: List[str]
    output_file_grps: Optional[List[str]]
    page_id: Optional[str] = None
    # Kept as an empty dict instead of None, because None fails ocr-d-validation
    parameters: dict = {}
    result_queue_name: Optional[str] = None
    callback_url: Optional[str] = None
    # Switches between the different network agents the request can be sent to
    agent_type: AgentType = AgentType.PROCESSING_WORKER
    # Generated automatically by the Processing Server when it forwards
    # the request to the Processor Server
    job_id: Optional[str] = None
    # When set, lists the ids of the jobs this job depends on
    depends_on: Optional[List[str]] = None

    class Config:
        # Example payload attached to the generated schema
        schema_extra = {
            'example': {
                'path_to_mets': '/path/to/mets.xml',
                'description': 'The description of this execution',
                'input_file_grps': ['DEFAULT'],
                'output_file_grps': ['OCR-D-BIN'],
                'agent_type': AgentType.PROCESSING_WORKER,
                'page_id': 'PHYS_0001..PHYS_0003',
                'parameters': {}
            }
        }
class PYJobOutput(BaseModel):
    """
    Response model carrying the output information of a processing job.

    ``state`` defaults to ``JobState.unset`` and ``page_id`` to ``None``.
    """
    job_id: str
    processor_name: str
    state: JobState = JobState.unset
    path_to_mets: Optional[str]
    workspace_id: Optional[str]
    input_file_grps: List[str]
    output_file_grps: Optional[List[str]]
    page_id: Optional[str] = None
    log_file_path: Optional[str]
class DBProcessorJob(Document):
    """
    Database document representing a single processing job.
    """
    job_id: str
    processor_name: str
    path_to_mets: Optional[str]
    workspace_id: Optional[str]
    description: Optional[str]
    state: JobState = JobState.unset
    input_file_grps: List[str]
    output_file_grps: Optional[List[str]]
    page_id: Optional[str]
    parameters: Optional[dict]
    depends_on: Optional[List[str]]
    result_queue_name: Optional[str]
    callback_url: Optional[str]
    internal_callback_url: Optional[str]
    start_time: Optional[datetime]
    end_time: Optional[datetime]
    exec_time: Optional[str]
    log_file_path: Optional[str]

    class Settings:
        # NOTE(review): presumably makes enum fields such as `state` be handled
        # by their value - confirm against the beanie documentation
        use_enum_values = True

    def to_job_output(self) -> PYJobOutput:
        """
        Build the ``PYJobOutput`` response model for this job.

        Only the attributes listed below are copied; the remaining
        attributes of the document are not passed on.
        """
        copied_fields = (
            'job_id',
            'processor_name',
            'state',
            'path_to_mets',
            'workspace_id',
            'input_file_grps',
            'output_file_grps',
            'page_id',
            'log_file_path',
        )
        output_kwargs = {name: getattr(self, name) for name in copied_fields}
        return PYJobOutput(**output_kwargs)
class PYWorkflowJobOutput(BaseModel):
    """
    Response model carrying the output information of a workflow job.
    """
    # Dictionary with one entry per page:
    #   key:   page_id
    #   value: list of processing job ids sorted in dependency order
    processing_job_ids: Dict[str, List[str]]
    page_id: str
    page_wise: bool = False
    job_id: str
    path_to_mets: Optional[str]
    workspace_id: Optional[str]
    description: Optional[str]
class DBWorkflowJob(Document):
    """
    Workflow job representation in the database
    """
    job_id: str
    page_id: str
    page_wise: bool = False
    # A dictionary where each entry has:
    #   key:   page_id
    #   value: list of processing job ids sorted in dependency order
    processing_job_ids: Dict
    path_to_mets: Optional[str]
    workspace_id: Optional[str]
    description: Optional[str]
    workflow_callback_url: Optional[str]

    class Settings:
        use_enum_values = True

    def to_job_output(self) -> PYWorkflowJobOutput:
        """
        Build the ``PYWorkflowJobOutput`` response model for this workflow job.

        Every field declared by ``PYWorkflowJobOutput`` is filled from the
        attribute of the same name, including ``description``.
        """
        # `workflow_callback_url` is deliberately not passed on:
        # PYWorkflowJobOutput declares no such field, so the value could
        # never appear in the output model.
        return PYWorkflowJobOutput(
            job_id=self.job_id,
            page_id=self.page_id,
            page_wise=self.page_wise,
            processing_job_ids=self.processing_job_ids,
            path_to_mets=self.path_to_mets,
            workspace_id=self.workspace_id,
            description=self.description
        )