# Source code for ocrd_network.cli.client

import sys
import click
from json import dumps
from typing import List, Optional, Tuple
from urllib.parse import urlparse
from tempfile import NamedTemporaryFile

from ocrd.decorators.parameter_option import parameter_option, parameter_override_option
from ocrd_network.constants import JobState
from ocrd_utils import DEFAULT_METS_BASENAME
from ocrd_utils.introspect import set_json_key_value_overrides
from ocrd_utils.str import parse_json_string_or_file
from ..client import Client
from requests import RequestException


# Help text used by the ``--address`` option of the client commands.
ADDRESS_HELP = (
    'The URL of the Processing Server. If not provided, '
    'the "OCRD_NETWORK_SERVER_ADDR_PROCESSING" environment variable is used by default'
)


class URLType(click.types.StringParamType):
    """Click parameter type for URLs whose scheme is ``http`` or ``https``."""

    name = "url"

    def convert(self, value, param, ctx):
        """
        Validate that `value` parses as a URL with an accepted scheme.

        Args:
            value: the raw option value handed over by click.
            param: the click parameter being converted (forwarded to ``fail``).
            ctx: the current click context (forwarded to ``fail``).

        Returns:
            `value` unchanged when its scheme is ``http`` or ``https``.

        Raises:
            Whatever ``self.fail`` raises (click reports it as a usage error)
            when the value cannot be parsed or uses another scheme.
        """
        # Only the parsing itself can raise ValueError, so keep the guarded
        # region down to that single call.
        try:
            parsed = urlparse(value)
        except ValueError as err:
            # ``fail`` expects a message string, not the exception object.
            self.fail(str(err), param, ctx)
        if parsed.scheme not in ("http", "https"):
            self.fail(f"invalid URL scheme ({parsed.scheme}): only HTTP allowed", param, ctx)
        return value
# Shared instance used as ``type=`` for every ``--address`` option.
URL = URLType()


def _print_request_error(error: RequestException) -> None:
    """
    Print a description of a failed server request.

    Prints the ``detail_message`` attribute of `error` when it has one
    (falling back to ``str(error)``), followed by the URL of the attached
    response if there is one (empty otherwise).

    Args:
        error: the exception raised by the client call.
    """
    print(
        getattr(error, 'detail_message', str(error)),
        f"Requested URL: {getattr(getattr(error, 'response', ''), 'url', '')}"
    )


# NOTE: the docstrings of the click groups/commands below double as the
# ``--help`` text shown to the user, so they are kept short on purpose.

@click.group('client')
def client_cli():
    """
    A client for interacting with the network modules.
    The client CLI mimics the WebAPI endpoints
    """


@client_cli.group('discovery')
def discovery_cli():
    """
    The discovery endpoint of the WebAPI
    """


@discovery_cli.command('processors')
@click.option('--address', type=URL, help=ADDRESS_HELP)
def check_deployed_processors(address: Optional[str]):
    """
    Get a list of deployed processing workers.
    Each processor is shown only once regardless of the amount of deployed instances.
    """
    client = Client(server_addr_processing=address)
    try:
        processors_list = client.check_deployed_processors()
    except RequestException as e:
        _print_request_error(e)
        sys.exit(1)
    print(dumps(processors_list, indent=4))


@discovery_cli.command('processor')
@click.option('--address', type=URL, help=ADDRESS_HELP)
@click.argument('processor_name', required=True, type=click.STRING)
def check_processor_ocrd_tool(address: Optional[str], processor_name: str):
    """
    Get the json tool of a deployed processor specified with `processor_name`
    """
    client = Client(server_addr_processing=address)
    try:
        ocrd_tool = client.check_deployed_processor_ocrd_tool(processor_name=processor_name)
    except RequestException as e:
        _print_request_error(e)
        sys.exit(1)
    print(dumps(ocrd_tool, indent=4))


@client_cli.group('processing')
def processing_cli():
    """
    The processing endpoint of the WebAPI
    """


@processing_cli.command('check-log')
@click.option('--address', type=URL, help=ADDRESS_HELP)
@click.option('-j', '--processing-job-id', required=True)
def check_processing_job_log(address: Optional[str], processing_job_id: str):
    """
    Check the log of a previously submitted processing job.
    """
    client = Client(server_addr_processing=address)
    try:
        response = client.check_job_log(job_id=processing_job_id)
    except RequestException as e:
        _print_request_error(e)
        sys.exit(1)
    # NOTE(review): this reads the private ``_content`` attribute; if the
    # returned object is a ``requests.Response``, the public ``content``
    # property would be the supported accessor -- confirm and switch.
    print(response._content.decode(encoding='utf-8'))


@processing_cli.command('check-status')
@click.option('--address', type=URL, help=ADDRESS_HELP)
@click.option('-j', '--processing-job-id', required=True)
def check_processing_job_status(address: Optional[str], processing_job_id: str):
    """
    Check the status of a previously submitted processing job.
    """
    client = Client(server_addr_processing=address)
    try:
        job_status = client.check_job_status(processing_job_id)
    except RequestException as e:
        _print_request_error(e)
        sys.exit(1)
    print(f"Processing job status: {job_status}")


@processing_cli.command('run')
@click.argument('processor_name', required=True, type=click.STRING)
@click.option('--address', type=URL, help=ADDRESS_HELP)
@click.option('-m', '--mets', required=True, default=DEFAULT_METS_BASENAME)
@click.option('-I', '--input-file-grp', default='OCR-D-INPUT')
@click.option('-O', '--output-file-grp', default='OCR-D-OUTPUT')
@click.option('-g', '--page-id')
@parameter_option
@parameter_override_option
@click.option('--result-queue-name')
@click.option('--callback-url')
@click.option('-b', '--block', default=False, is_flag=True,
              help='If set, the client will block till job timeout, fail or success.')
@click.option('-p', '--print-state', default=False, is_flag=True,
              help='If set, the client will print job states by each iteration.')
def send_processing_job_request(
    processor_name: str,
    address: Optional[str],
    mets: str,
    input_file_grp: str,
    output_file_grp: Optional[str],
    page_id: Optional[str],
    parameter: List[str],
    parameter_override: List[Tuple[str, str]],
    result_queue_name: Optional[str],
    callback_url: Optional[str],
    block: Optional[bool],
    print_state: Optional[bool]
):
    """
    Submit a processing job to the processing server.
    """
    # Mandatory part of the request; file groups are given comma-separated
    # on the command line and sent as lists.
    req_params = {
        "path_to_mets": mets,
        "description": "OCR-D Network client request",
        "input_file_grps": input_file_grp.split(','),
    }
    # Optional fields are only included when a non-empty value was given.
    if output_file_grp:
        req_params["output_file_grps"] = output_file_grp.split(',')
    if page_id:
        req_params["page_id"] = page_id
    req_params["parameters"] = set_json_key_value_overrides(
        parse_json_string_or_file(*parameter), *parameter_override)
    if result_queue_name:
        req_params["result_queue_name"] = result_queue_name
    if callback_url:
        req_params["callback_url"] = callback_url

    client = Client(server_addr_processing=address)
    try:
        processing_job_id = client.send_processing_job_request(
            processor_name=processor_name, req_params=req_params)
    except RequestException as e:
        _print_request_error(e)
        sys.exit(1)
    print(f"Processing job id: {processing_job_id}")
    if block:
        try:
            client.poll_job_status(job_id=processing_job_id, print_state=print_state)
        except RequestException as e:
            _print_request_error(e)
            sys.exit(1)


@client_cli.group('workflow')
def workflow_cli():
    """
    The workflow endpoint of the WebAPI
    """


@workflow_cli.command('check-status')
@click.option('--address', type=URL, help=ADDRESS_HELP)
@click.option('-j', '--workflow-job-id', required=True)
@click.option('-v', '--verbose', default=False, is_flag=True)
def check_workflow_job_status(address: Optional[str], workflow_job_id: str, verbose: bool = False):
    """
    Check the status of a previously submitted workflow job.
    """
    client = Client(server_addr_processing=address)
    try:
        if verbose:
            job_status = client.check_workflow_status(workflow_job_id)
        else:
            job_status = client.check_workflow_status_simple(workflow_job_id)
    except RequestException as e:
        _print_request_error(e)
        sys.exit(1)
    print(f"Workflow job status: {job_status}")


@workflow_cli.command('run')
@click.option('--address', type=URL, help=ADDRESS_HELP)
@click.option('-m', '--path-to-mets', required=True,
              help="path to METS file of workspace to be processed (server-side path)")
@click.option('-w', '--path-to-workflow', required=False,
              help="path to workflow file (server- or client-side path)")
@click.option('--page-wise', is_flag=True, default=False, help="Whether to generate per-page jobs")
@click.option('-b', '--block', default=False, is_flag=True,
              help='If set, the client will block till job timeout, fail or success.')
@click.option('-p', '--print-state', default=False, is_flag=True,
              help='If set, the client will print job states by each iteration.')
@click.argument('tasks', nargs=-1)
def send_workflow_job_request(
    address: Optional[str],
    path_to_mets: str,
    path_to_workflow: Optional[str],
    page_wise: bool,
    block: bool,
    print_state: bool,
    tasks: List[str]
):
    """
    Submit a workflow job to the processing server.

    Provide workflow either via `tasks` arguments (same syntax
    as in ``ocrd process`` tasks arguments), or via `-w` file path
    (same syntax, but newline separated).
    """
    # Exactly one workflow source must be given. Both sides are coerced to
    # bool: comparing the raw path (str or None) against a bool is never
    # equal, which made the previous check reject every invocation.
    if bool(path_to_workflow) == bool(tasks):
        raise ValueError("either -w/path-to-workflow or task argument(s) is required")

    client = Client(server_addr_processing=address)
    # Task arguments are written one per line into a temporary workflow
    # file, which must still exist while the request is being sent.
    with NamedTemporaryFile() as workflow_file:
        for task in tasks:
            workflow_file.write((task + '\n').encode('utf-8'))
        workflow_file.flush()
        try:
            workflow_job_id = client.send_workflow_job_request(
                path_to_wf=path_to_workflow or workflow_file.name,
                path_to_mets=path_to_mets,
                page_wise=page_wise,
            )
        except RequestException as e:
            # Same failure reporting as every other client command.
            _print_request_error(e)
            sys.exit(1)
    print(f"Workflow job id: {workflow_job_id}")
    if block:
        print(f"Polling state of workflow job {workflow_job_id}")
        try:
            state = client.poll_workflow_status(job_id=workflow_job_id, print_state=print_state)
        except RequestException as e:
            _print_request_error(e)
            sys.exit(1)
        # The exit status reflects the final workflow state. ``sys.exit`` is
        # used rather than the ``site`` builtin ``exit``, which is not
        # guaranteed to exist (e.g. under ``python -S``).
        if state != JobState.success:
            print(f"Workflow failed with {state}")
            sys.exit(1)
        else:
            print("Workflow succeeded")
            sys.exit(0)


@client_cli.group('workspace')
def workspace_cli():
    """
    The workspace endpoint of the WebAPI
    """