# Source code for ocrd.processor.helpers

"""
Helper methods for running and documenting processors
"""
from time import perf_counter, process_time
from os import times
from functools import lru_cache
import json
import inspect
from subprocess import run
from typing import List, Optional

from ..workspace import Workspace
from ocrd_utils import freeze_args, getLogger, config, setOverrideLogLevel, getLevelName, sparkline


__all__ = [
    'run_cli',
    'run_processor'
]


def _get_workspace(workspace=None, resolver=None, mets_url=None, working_dir=None, mets_server_url=None):
    if workspace is None:
        if resolver is None:
            raise Exception("Need to pass a resolver to create a workspace")
        if mets_url is None:
            raise Exception("Need to pass mets_url to create a workspace")
        workspace = resolver.workspace_from_url(mets_url, dst_dir=working_dir, mets_server_url=mets_server_url)
    return workspace

def run_processor(
        processorClass,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        log_level=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
        instance_caching=False
):  # pylint: disable=too-many-locals
    """
    Instantiate a Pythonic processor, open a workspace, run the processor and save the workspace.

    If :py:attr:`workspace` is not none, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Instantiate a Python object for :py:attr:`processorClass`, passing:
    - the workspace,
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter` (after applying any :py:attr:`parameter_override` settings)

    Warning: Avoid setting the `instance_caching` flag to True. It may have unexpected side effects.
    This flag is used for an experimental feature we would like to adopt in future.

    Run the processor on the workspace (creating output files in the filesystem).

    Finally, write back the workspace (updating the METS in the filesystem).

    Args:
        processorClass (object): Python class of the module processor.
    """
    if log_level:
        setOverrideLogLevel(log_level)
    workspace = _get_workspace(
        workspace,
        resolver,
        mets_url,
        working_dir,
        mets_server_url
    )
    log = getLogger('ocrd.processor.helpers.run_processor')
    log.debug("Running processor %s", processorClass)

    # note: the workspace is passed to process_workspace below, not to the
    # constructor (deprecated chdir behaviour) — hence workspace=None here
    processor = get_processor(
        processorClass,
        parameter=parameter,
        workspace=None,
        page_id=page_id,
        input_file_grp=input_file_grp,
        output_file_grp=output_file_grp,
        instance_caching=instance_caching
    )

    ocrd_tool = processor.ocrd_tool
    name = '%s v%s' % (ocrd_tool['executable'], processor.version)
    otherrole = ocrd_tool.get('steps', [''])[0]
    logProfile = getLogger('ocrd.process.profile')
    log.debug("Processor instance %s (%s doing %s)", processor, name, otherrole)
    t0_wall = perf_counter()
    t0_cpu = process_time()
    t0_os = times()
    if any(x in config.OCRD_PROFILE for x in ['RSS', 'PSS']):
        backend = 'psutil_pss' if 'PSS' in config.OCRD_PROFILE else 'psutil'
        from memory_profiler import memory_usage  # pylint: disable=import-outside-toplevel
        try:
            mem_usage = memory_usage(proc=(processor.process_workspace, [workspace], {}),
                                     # only run process once
                                     max_iterations=1,
                                     interval=.1, timeout=None, timestamps=True,
                                     # include sub-processes
                                     multiprocess=True, include_children=True,
                                     # get proportional set size instead of RSS
                                     backend=backend)
        except Exception:
            # lazy %-style args instead of eager string interpolation
            log.exception("Failure in processor '%s'", ocrd_tool['executable'])
            raise
        mem_usage_values = [mem for mem, _ in mem_usage]
        mem_output = 'memory consumption: '
        mem_output += sparkline(mem_usage_values)
        mem_output += ' max: %.2f MiB min: %.2f MiB' % (max(mem_usage_values), min(mem_usage_values))
        logProfile.info(mem_output)
    else:
        try:
            processor.process_workspace(workspace)
        except Exception:
            log.exception("Failure in processor '%s'", ocrd_tool['executable'])
            raise

    t1_wall = perf_counter() - t0_wall
    t1_cpu = process_time() - t0_cpu
    t1_os = times()
    # add CPU time from child processes (page worker etc)
    t1_cpu += t1_os.children_user - t0_os.children_user
    t1_cpu += t1_os.children_system - t0_os.children_system
    # (fixed stray '(' in the original format string after "(CPU)")
    logProfile.info(
        "Executing processor '%s' took %fs (wall) %fs (CPU) "
        "[--input-file-grp='%s' --output-file-grp='%s' --parameter='%s' --page-id='%s']",
        ocrd_tool['executable'],
        t1_wall,
        t1_cpu,
        processor.input_file_grp or '',
        processor.output_file_grp or '',
        json.dumps(processor.parameter) or '',
        processor.page_id or ''
    )
    # record provenance of this processing step in the METS
    workspace.mets.add_agent(
        name=name,
        _type='OTHER',
        othertype='SOFTWARE',
        role='OTHER',
        otherrole=otherrole,
        notes=[({'option': 'input-file-grp'}, processor.input_file_grp or ''),
               ({'option': 'output-file-grp'}, processor.output_file_grp or ''),
               # consistent with the profile log line above:
               # serialize the parameter dict itself, fall back to '' if falsy
               ({'option': 'parameter'}, json.dumps(processor.parameter) or ''),
               ({'option': 'page-id'}, processor.page_id or '')]
    )
    workspace.save_mets()
    return processor
def run_cli(
        executable,
        mets_url=None,
        resolver=None,
        workspace=None,
        page_id=None,
        overwrite=None,
        debug=None,
        log_level=None,
        log_filename=None,
        input_file_grp=None,
        output_file_grp=None,
        parameter=None,
        working_dir=None,
        mets_server_url=None,
):
    """
    Open a workspace and run a processor on the command line.

    If :py:attr:`workspace` is not none, reuse that. Otherwise, instantiate an
    :py:class:`~ocrd.Workspace` for :py:attr:`mets_url` (and :py:attr:`working_dir`)
    by using :py:meth:`ocrd.Resolver.workspace_from_url` (i.e. open or clone local workspace).

    Run the processor CLI :py:attr:`executable` on the workspace, passing:
    - the workspace,
    - :py:attr:`page_id`
    - :py:attr:`input_file_grp`
    - :py:attr:`output_file_grp`
    - :py:attr:`parameter` (after applying any :py:attr:`parameter_override` settings)

    (Will create output files and update the METS in the filesystem.)

    Args:
        executable (string): Executable name of the module processor.

    Returns:
        int: the exit code of the CLI subprocess.
    """
    workspace = _get_workspace(workspace, resolver, mets_url, working_dir)
    args = [executable, '--working-dir', workspace.directory]
    # only pass --mets when a URL was given; unconditionally appending a None
    # mets_url would crash the ' '.join(args) below
    if mets_url:
        args += ['--mets', mets_url]
    if log_level:
        args += ['--log-level', log_level if isinstance(log_level, str) else getLevelName(log_level)]
    if page_id:
        args += ['--page-id', page_id]
    if input_file_grp:
        args += ['--input-file-grp', input_file_grp]
    if output_file_grp:
        args += ['--output-file-grp', output_file_grp]
    if parameter:
        args += ['--parameter', parameter]
    if overwrite:
        args += ['--overwrite']
    if debug:
        args += ['--debug']
    if mets_server_url:
        args += ['--mets-server-url', mets_server_url]
    log = getLogger('ocrd.processor.helpers.run_cli')
    log.debug("Running subprocess '%s'", ' '.join(args))
    if not log_filename:
        result = run(args, check=False)
    else:
        # append both stdout and stderr of the CLI to the log file
        with open(log_filename, 'a', encoding='utf-8') as file_desc:
            result = run(args, check=False, stdout=file_desc, stderr=file_desc)
    return result.returncode
# not decorated here but at runtime (on first use)
#@freeze_args
#@lru_cache(maxsize=config.OCRD_MAX_PROCESSOR_CACHE)
def get_cached_processor(parameter: dict, processor_class):
    """
    Call this function to get back an instance of a processor.
    The results are cached based on the parameters.

    Args:
        parameter (dict): a dictionary of parameters.
        processor_class: the concrete `:py:class:~ocrd.Processor` class.

    Returns:
        When the concrete class of the processor is unknown, `None` is returned.
        Otherwise, an instance of the `:py:class:~ocrd.Processor` is returned.
    """
    if not processor_class:
        return None
    # copy the (possibly frozen) parameter mapping into a plain dict
    return processor_class(None, parameter=dict(parameter))


def get_processor(
        processor_class,
        parameter: Optional[dict] = None,
        workspace: Optional[Workspace] = None,
        page_id: Optional[str] = None,
        input_file_grp: Optional[List[str]] = None,
        output_file_grp: Optional[List[str]] = None,
        instance_caching: bool = False,
):
    """
    Instantiate (or fetch a cached instance of) ``processor_class`` and
    set its current processing parameters.

    Raises:
        ValueError: if ``processor_class`` is falsy.
    """
    if not processor_class:
        raise ValueError("Processor class is not known")
    parameter = parameter if parameter is not None else {}
    if instance_caching:
        global get_cached_processor
        if not hasattr(get_cached_processor, '__wrapped__'):
            # first call: wrap
            max_instances = processor_class.max_instances
            if max_instances < 0:
                maxsize = config.OCRD_MAX_PROCESSOR_CACHE
            else:
                maxsize = min(config.OCRD_MAX_PROCESSOR_CACHE, max_instances)
            # wrapping in call cache
            # wrapping dict into frozendict (from https://github.com/OCR-D/core/pull/884)
            get_cached_processor = freeze_args(lru_cache(maxsize=maxsize)(get_cached_processor))
        instance = get_cached_processor(parameter, processor_class)
    else:
        # avoid passing workspace already (deprecated chdir behaviour)
        instance = processor_class(None, parameter=parameter)
    assert instance
    # set current processing parameters
    instance.workspace = workspace
    instance.page_id = page_id
    instance.input_file_grp = input_file_grp
    instance.output_file_grp = output_file_grp
    return instance