ocrd_network.processing_server module

class ocrd_network.processing_server.ProcessingServer(config_path: str, host: str, port: int)[source]

Bases: FastAPI

FastAPI app to make ocr-d processor calls

The Processing-Server receives calls conforming to the ocr-d webapi regarding the processing part. It can run ocrd-processors and provides endpoints to discover processors and watch the job status. The Processing-Server does not execute the processors itself but starts up a queue and a database to delegate the calls to processing workers. They are started by the Processing-Server and the communication goes through the queue.

start() None[source]

deploy agents (db, queue, workers) and start the processing server with uvicorn

async on_startup()[source]
async on_shutdown() None[source]
  • hosts and pids should be stored somewhere

  • ensure queue is empty or processor is not currently running

  • connect to hosts and kill pids

add_api_routes_others()[source]
add_api_routes_processing()[source]
add_api_routes_workflow()[source]
async forward_tcp_request_to_uds_mets_server(request: Request) Dict[source]

Forward mets-server-request

A processor calls a mets related method like add_file with ClientSideOcrdMets. This sends a request to this endpoint. This request contains all infomation neccessary to make a call to the uds-mets-server. This information is used by MetsServerProxy to make a the call to the local (local for the processing-server) reachable the uds-mets-server.

async home_page()[source]
async stop_deployed_agents() None[source]
query_ocrd_tool_json_from_server(processor_name: str) Dict[source]
async get_network_agent_ocrd_tool(processor_name: str, agent_type: AgentType = AgentType.PROCESSING_WORKER) Dict[source]
network_agent_exists_server(processor_name: str) bool[source]
network_agent_exists_worker(processor_name: str) bool[source]
validate_agent_type_and_existence(processor_name: str, agent_type: AgentType) None[source]
async validate_and_forward_job_to_network_agent(processor_name: str, data: PYJobInput) PYJobOutput[source]
async push_job_to_network_agent(data: PYJobInput, db_job: DBProcessorJob) PYJobOutput[source]
async push_job_to_processing_queue(db_job: DBProcessorJob) PYJobOutput[source]
async push_job_to_processor_server(job_input: PYJobInput) PYJobOutput[source]
async get_processor_job(job_id: str) PYJobOutput[source]
async get_processor_job_log(job_id: str) FileResponse[source]
async push_cached_jobs_to_agents(processing_jobs: List[PYJobInput]) None[source]
async remove_job_from_request_cache(result_message: PYResultMessage)[source]
async list_processors() List[str][source]
async task_sequence_to_processing_jobs(tasks: List[ProcessorTask], mets_path: str, page_id: str, agent_type: AgentType = AgentType.PROCESSING_WORKER) List[PYJobOutput][source]
validate_tasks_agents_existence(tasks: List[ProcessorTask], agent_type: AgentType) None[source]
async run_workflow(mets_path: str, workflow: UploadFile | None = File(None), workflow_id: str | None = None, agent_type: AgentType = AgentType.PROCESSING_WORKER, page_id: str | None = None, page_wise: bool = False, workflow_callback_url: str | None = None) PYWorkflowJobOutput[source]
async get_workflow_info(workflow_job_id) Dict[source]

Return list of a workflow’s processor jobs

async get_workflow_info_simple(workflow_job_id) Dict[str, JobState][source]

Simplified version of the get_workflow_info that returns a single state for the entire workflow. - If a single processing job fails, the entire workflow job status is set to FAILED. - If there are any processing jobs running, regardless of other states, such as QUEUED and CACHED, the entire workflow job status is set to RUNNING. - If all processing jobs has finished successfully, only then the workflow job status is set to SUCCESS

async upload_workflow(workflow: UploadFile) Dict[str, str][source]

Store a script for a workflow in the database

async replace_workflow(workflow_id, workflow: UploadFile) Dict[str, str][source]

Update a workflow script file in the database

async download_workflow(workflow_id) PlainTextResponse[source]

Load workflow-script from the database