Source code for ocrd_network.client
from typing import Optional
from ocrd_utils import config, getLogger, LOG_FORMAT
from .client_utils import (
get_ps_deployed_processors,
get_ps_deployed_processor_ocrd_tool,
get_ps_processing_job_log,
get_ps_processing_job_status,
get_ps_workflow_job_status,
poll_job_status_till_timeout_fail_or_success,
poll_wf_status_till_timeout_fail_or_success,
post_ps_processing_request,
post_ps_workflow_request,
verify_server_protocol
)
[docs]
class Client:
def __init__(
self,
server_addr_processing: Optional[str],
timeout: int = config.OCRD_NETWORK_CLIENT_POLLING_TIMEOUT,
wait: int = config.OCRD_NETWORK_CLIENT_POLLING_SLEEP
):
self.log = getLogger(f"ocrd_network.client")
if not server_addr_processing:
server_addr_processing = config.OCRD_NETWORK_SERVER_ADDR_PROCESSING
self.server_addr_processing = server_addr_processing
verify_server_protocol(self.server_addr_processing)
self.polling_timeout = timeout
self.polling_wait = wait
self.polling_tries = int(timeout / wait)
[docs]
def check_deployed_processors(self):
return get_ps_deployed_processors(ps_server_host=self.server_addr_processing)
[docs]
def check_job_log(self, job_id: str):
return get_ps_processing_job_log(self.server_addr_processing, processing_job_id=job_id)
[docs]
def check_job_status(self, job_id: str):
return get_ps_processing_job_status(self.server_addr_processing, processing_job_id=job_id)
[docs]
def check_workflow_status(self, workflow_job_id: str):
return get_ps_workflow_job_status(self.server_addr_processing, workflow_job_id=workflow_job_id)
[docs]
def poll_job_status(self, job_id: str) -> str:
return poll_job_status_till_timeout_fail_or_success(
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
[docs]
def poll_workflow_status(self, job_id: str) -> str:
return poll_wf_status_till_timeout_fail_or_success(
ps_server_host=self.server_addr_processing, job_id=job_id, tries=self.polling_tries, wait=self.polling_wait)
[docs]
def send_processing_job_request(self, processor_name: str, req_params: dict) -> str:
return post_ps_processing_request(
ps_server_host=self.server_addr_processing, processor=processor_name, job_input=req_params)
[docs]
def send_workflow_job_request(self, path_to_wf: str, path_to_mets: str):
return post_ps_workflow_request(
ps_server_host=self.server_addr_processing, path_to_wf=path_to_wf, path_to_mets=path_to_mets)