Source code for ocrd_network.models.job
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional
from beanie import Document
from pydantic import BaseModel
class StateEnum(str, Enum):
    """Lifecycle states of a processing job.

    Members are also ``str`` instances, so each one compares equal to its
    upper-case value.
    """

    # Held in the Processing Server's requests cache
    cached = 'CACHED'
    # Cancelled because one or more dependencies failed
    cancelled = 'CANCELLED'
    # Waiting in a RabbitMQ queue
    queued = 'QUEUED'
    # Currently executing in a Worker or Processor Server
    running = 'RUNNING'
    # Finished without errors
    success = 'SUCCESS'
    # Finished with a failure
    failed = 'FAILED'
class PYJobOutput(BaseModel):
    """Wraps output information for a job-response.

    All ``Optional`` fields carry an explicit ``None`` default, so whether a
    field may be omitted does not depend on how the installed pydantic
    version treats ``Optional`` annotations that have no default.
    """
    job_id: str
    processor_name: str
    state: StateEnum
    path_to_mets: Optional[str] = None
    workspace_id: Optional[str] = None
    input_file_grps: List[str]
    output_file_grps: Optional[List[str]] = None
    page_id: Optional[str] = None
    log_file_path: Optional[str] = None
class DBProcessorJob(Document):
    """Database document describing a single processing job.

    ``to_job_output`` exposes a subset of these fields as a ``PYJobOutput``.
    """
    job_id: str
    processor_name: str
    path_to_mets: Optional[str]
    workspace_id: Optional[str]
    description: Optional[str]
    state: StateEnum
    input_file_grps: List[str]
    output_file_grps: Optional[List[str]]
    page_id: Optional[str]
    parameters: Optional[dict]
    # presumably ids of the jobs this one has to wait for -- TODO confirm
    depends_on: Optional[List[str]]
    result_queue_name: Optional[str]
    callback_url: Optional[str]
    internal_callback_url: Optional[str]
    start_time: Optional[datetime]
    end_time: Optional[datetime]
    exec_time: Optional[str]
    log_file_path: Optional[str]

    class Settings:
        # NOTE(review): presumably makes the document keep enum values
        # rather than enum members -- confirm against the Beanie docs.
        use_enum_values = True

    def to_job_output(self) -> PYJobOutput:
        """Build the ``PYJobOutput`` response model for this job.

        Returns:
            A ``PYJobOutput`` populated from the identically named
            attributes of this document.
        """
        # Attribute names forwarded to the response model, in the order
        # they are passed as keyword arguments.
        forwarded = (
            'job_id',
            'processor_name',
            'state',
            'path_to_mets',
            'workspace_id',
            'input_file_grps',
            'output_file_grps',
            'page_id',
            'log_file_path',
        )
        kwargs = {name: getattr(self, name) for name in forwarded}
        return PYJobOutput(**kwargs)
class PYWorkflowJobOutput(BaseModel):
    """Wraps output information for a workflow job-response.

    The optional fields default to ``None`` explicitly, so an instance can
    be built without them regardless of how the installed pydantic version
    treats ``Optional`` annotations that have no default.
    """
    job_id: str
    page_id: str
    page_wise: bool = False
    # A dictionary where each entry has:
    # key: page_id
    # value: List of processing job ids sorted in dependency order
    processing_job_ids: Dict
    path_to_mets: Optional[str] = None
    workspace_id: Optional[str] = None
    description: Optional[str] = None
class DBWorkflowJob(Document):
    """Workflow job representation in the database.

    ``to_job_output`` converts the document into a ``PYWorkflowJobOutput``.
    """
    job_id: str
    page_id: str
    page_wise: bool = False
    # A dictionary where each entry has:
    # key: page_id
    # value: List of processing job ids sorted in dependency order
    processing_job_ids: Dict
    path_to_mets: Optional[str]
    workspace_id: Optional[str]
    description: Optional[str]
    workflow_callback_url: Optional[str]

    class Settings:
        # NOTE(review): presumably makes the document keep enum values
        # rather than enum members -- confirm against the Beanie docs.
        use_enum_values = True

    def to_job_output(self) -> PYWorkflowJobOutput:
        """Build the ``PYWorkflowJobOutput`` response model for this job.

        Returns:
            A ``PYWorkflowJobOutput`` populated from the identically named
            attributes of this document, including ``description``.
        """
        # `workflow_callback_url` is not forwarded: PYWorkflowJobOutput
        # declares no such field, and pydantic's default configuration
        # drops undeclared keyword arguments, so it never reached the
        # response. `description` is declared there and is now passed on.
        return PYWorkflowJobOutput(
            job_id=self.job_id,
            page_id=self.page_id,
            page_wise=self.page_wise,
            processing_job_ids=self.processing_job_ids,
            path_to_mets=self.path_to_mets,
            workspace_id=self.workspace_id,
            description=self.description,
        )