Source code for ocrd.task_sequence

import json
from shlex import split as shlex_split
from shutil import which

from ocrd_utils import getLogger, parse_json_string_or_file, set_json_key_value_overrides, get_ocrd_tool_json
# from collections import Counter
from ocrd.processor.base import run_cli
from ocrd.resolver import Resolver
from ocrd_validators import ParameterValidator, WorkspaceValidator
from ocrd_models import ValidationReport

[docs] class ProcessorTask():
[docs] @classmethod def parse(cls, argstr): tokens = shlex_split(argstr) executable = 'ocrd-%s' % tokens.pop(0) input_file_grps = [] output_file_grps = [] parameters = {} while tokens: if tokens[0] == '-I': for grp in tokens[1].split(','): input_file_grps.append(grp) tokens = tokens[2:] elif tokens[0] == '-O': for grp in tokens[1].split(','): output_file_grps.append(grp) tokens = tokens[2:] elif tokens[0] == '-p': parameters = {**parameters, **parse_json_string_or_file(tokens[1])} tokens = tokens[2:] elif tokens[0] == '-P': set_json_key_value_overrides(parameters, tokens[1:3]) tokens = tokens[3:] else: raise ValueError("Failed parsing task description '%s' with tokens remaining: '%s'" % (argstr, tokens)) return cls(executable, input_file_grps, output_file_grps, parameters)
def __init__(self, executable, input_file_grps, output_file_grps, parameters): self.executable = executable self.input_file_grps = input_file_grps self.output_file_grps = output_file_grps self.parameters = parameters self._ocrd_tool_json = None @property def ocrd_tool_json(self): if self._ocrd_tool_json: return self._ocrd_tool_json self._ocrd_tool_json = get_ocrd_tool_json(self.executable) return self._ocrd_tool_json
[docs] def validate(self): if not which(self.executable): raise Exception("Executable not found in PATH: %s" % self.executable) if not self.input_file_grps: raise Exception("Task must have input file group") # TODO uncomment and adapt once OCR-D/spec#121 lands # # make implicit input/output groups explicit by defaulting to what is # # provided in ocrd-tool.json # actual_output_grps = [*self.ocrd_tool_json['output_file_grp']] # for i, grp in enumerate(self.output_file_grps): # actual_output_grps[i] = grp # self.output_file_grps = actual_output_grps # actual_input_grps = [*self.ocrd_tool_json['input_file_grp']] # for i, grp in enumerate(self.input_file_grps): # actual_input_grps[i] = grp # self.input_file_grps = actual_input_grps param_validator = ParameterValidator(self.ocrd_tool_json) report = param_validator.validate(self.parameters) if not report.is_valid: raise Exception(report.errors) # TODO remove once OCR-D/spec#121 lands if 'output_file_grp' in self.ocrd_tool_json and not self.output_file_grps: raise Exception("Processor requires output_file_grp but none was provided.") return report
def __str__(self): ret = '%s -I %s -O %s' % ( self.executable.replace('ocrd-', '', 1), ','.join(self.input_file_grps), ','.join(self.output_file_grps)) if self.parameters: ret += " -p '%s'" % json.dumps(self.parameters) return ret
[docs] def validate_tasks(tasks, workspace, page_id=None, overwrite=False): report = ValidationReport() prev_output_file_grps = workspace.mets.file_groups first_task = tasks[0] first_task.validate() # first task: check input/output file groups from METS WorkspaceValidator.check_file_grp(workspace, first_task.input_file_grps, '' if overwrite else first_task.output_file_grps, page_id, report) prev_output_file_grps += first_task.output_file_grps for task in tasks[1:]: task.validate() # check either existing fileGrp or output-file group of previous task matches current input_file_group for input_file_grp in task.input_file_grps: if not input_file_grp in prev_output_file_grps: report.add_error("Input file group not contained in METS or produced by previous steps: %s" % input_file_grp) if not overwrite: WorkspaceValidator.check_file_grp(workspace, [], task.output_file_grps, page_id, report) # TODO disable output_file_grps checks once CLI parameter 'overwrite' is implemented # XXX Thu Jan 16 20:14:17 CET 2020 still not sufficiently clever. # if len(prev_output_file_grps) != len(set(prev_output_file_grps)): # report.add_error("Output file group specified multiple times: %s" % # [grp for grp, count in Counter(prev_output_file_grps).items() if count >= 2]) prev_output_file_grps += task.output_file_grps if not report.is_valid: raise Exception("Invalid task sequence input/output file groups: %s" % report.errors) return report
[docs] def run_tasks(mets, log_level, page_id, task_strs, overwrite=False, mets_server_url=None): resolver = Resolver() workdir, mets, basename, _ = resolver.resolve_mets_arguments(None, mets, None) workspace = resolver.workspace_from_url(mets, workdir, mets_basename=basename, mets_server_url=mets_server_url) log = getLogger('ocrd.task_sequence.run_tasks') tasks = [ProcessorTask.parse(task_str) for task_str in task_strs] validate_tasks(tasks, workspace, page_id, overwrite) # Run the tasks for task in tasks: log.info("Start processing task '%s'", task) # execute cli returncode = run_cli( task.executable, mets, resolver, workspace, log_level=log_level, page_id=page_id, overwrite=overwrite, input_file_grp=','.join(task.input_file_grps), output_file_grp=','.join(task.output_file_grps), parameter=json.dumps(task.parameters), mets_server_url=mets_server_url ) # check return code if returncode != 0: raise Exception("%s exited with non-zero return value %s." % (task.executable, returncode)) log.info("Finished processing task '%s'", task) # reload mets if mets_server_url is None: workspace.reload_mets() # check output file groups are in mets for output_file_grp in task.output_file_grps: if not output_file_grp in workspace.mets.file_groups: raise Exception("Invalid state: expected output file group '%s' not in METS (despite processor success)" % output_file_grp)