import io
from os import makedirs, unlink, listdir, path
from pathlib import Path
from shutil import move, copyfileobj
from re import sub
from tempfile import NamedTemporaryFile
from contextlib import contextmanager
from cv2 import COLOR_GRAY2BGR, COLOR_RGB2BGR, cvtColor
from PIL import Image
import numpy as np
from deprecated.sphinx import deprecated
import requests
from ocrd_models import OcrdMets, OcrdFile
from ocrd_models.ocrd_page import parse, BorderType, to_xml
from ocrd_modelfactory import exif_from_filename, page_from_file
from ocrd_utils import (
atomic_write,
getLogger,
image_from_polygon,
coordinates_of_segment,
adjust_canvas_to_rotation,
adjust_canvas_to_transposition,
shift_coordinates,
rotate_coordinates,
transform_coordinates,
transpose_coordinates,
crop_image,
rotate_image,
transpose_image,
bbox_from_polygon,
polygon_from_points,
xywh_from_bbox,
pushd_popd,
is_local_filename,
deprecated_alias,
MIME_TO_EXT,
MIME_TO_PIL,
MIMETYPE_PAGE,
REGEX_PREFIX
)
from .workspace_backup import WorkspaceBackupManager
from .mets_server import ClientSideOcrdMets
__all__ = ['Workspace']
@contextmanager
def download_temporary_file(url):
    """
    Download ``url`` into a named temporary file and yield the open file object.

    The file is created with the prefix ``ocrd-download-`` and is removed
    again when the context exits, so consumers must finish reading it
    (typically by reopening ``f.name``) inside the ``with`` body.

    Args:
        url (string): URL to fetch with ``requests.get``
    Yields:
        the temporary file object, after the response body has been
        written and flushed
    """
    with NamedTemporaryFile(prefix='ocrd-download-') as f:
        with requests.get(url) as r:
            f.write(r.content)
        # consumers reopen the file by name, so make sure buffered bytes
        # have actually been written out before handing it over
        f.flush()
        yield f
[docs]class Workspace():
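"""
A workspace is a temporary directory set up for a processor. It's the
interface to the METS/PAGE XML and delegates download and upload to the
:py:class:`ocrd.resolver.Resolver`.
Args:
resolver (:py:class:`ocrd.resolver.Resolver`) : Resolver used to download files into the workspace
directory (string) : Filesystem folder to work in
mets (:py:class:`ocrd_models.ocrd_mets.OcrdMets`) : `OcrdMets` representing this workspace.
If `None`, it is loaded from ``mets_basename`` inside ``directory``, or obtained
from the METS server if ``mets_server_url`` is given.
mets_basename (string) : Basename of the METS XML file. Default: `'mets.xml'`.
automatic_backup (boolean) : Whether to add a backup via `WorkspaceBackupManager`
on construction and whenever the METS is saved locally.
baseurl (string) : Base URL to prefix to relative URL.
mets_server_url (string) : URL of a METS server; if set, the workspace is treated as remote.
Attributes:
overwrite_mode (boolean) : Whether to force add operations on this workspace globally.
Initialized to `False`.
"""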
def __init__(self, resolver, directory, mets=None, mets_basename='mets.xml', automatic_backup=False, baseurl=None, mets_server_url=None):
    """
    Set up a workspace on ``directory``.

    Args:
        resolver: resolver used for downloading files into the workspace
        directory (string): filesystem folder to work in
        mets: METS object to use. If `None`, a client-side METS is created
            from ``mets_server_url`` (when given), otherwise the METS is
            loaded from ``mets_basename`` inside ``directory``.
        mets_basename (string): basename of the METS file within ``directory``
        automatic_backup (boolean): if true, create a
            `WorkspaceBackupManager` and add a backup right away
        baseurl (string): base URL to prefix to relative URLs
        mets_server_url (string): URL of a METS server; marks the workspace as remote
    Raises:
        ValueError: if the METS server reports a workspace path different
            from ``directory``
    """
    self.resolver = resolver
    self.directory = directory
    self.mets_target = str(Path(directory, mets_basename))
    self.overwrite_mode = False
    self.is_remote = bool(mets_server_url)
    if mets is None:
        if self.is_remote:
            mets = ClientSideOcrdMets(mets_server_url)
            # refuse to mix up a server bound to another workspace with this directory
            if mets.workspace_path != self.directory:
                raise ValueError(f"METS server {mets_server_url} workspace directory {mets.workspace_path} differs "
                                 f"from local workspace directory {self.directory}. These are not the same workspaces.")
        else:
            mets = OcrdMets(filename=self.mets_target)
    self.mets = mets
    if automatic_backup:
        # the manager needs self.mets/self.directory, so create it only now
        self.automatic_backup = WorkspaceBackupManager(self)
        self.automatic_backup.add()
    else:
        self.automatic_backup = None
    self.baseurl = baseurl
def __str__(self):
return 'Workspace[remote=%s, directory=%s, baseurl=%s, file_groups=%s, files=%s]' % (
not not self.is_remote,
self.directory,
self.baseurl,
self.mets.file_groups,
[str(f) for f in self.mets.find_all_files()],
)
def reload_mets(self):
    """
    Discard the METS held in memory and parse it again from the file at
    ``self.mets_target``.
    """
    mets_path = self.mets_target
    self.mets = OcrdMets(filename=mets_path)
@deprecated_alias(pageId="page_id")
@deprecated_alias(ID="file_id")
@deprecated_alias(fileGrp="file_grp")
@deprecated_alias(fileGrp_mapping="filegrp_mapping")
def merge(self, other_workspace, copy_files=True, overwrite=False, **kwargs):
    """
    Merge the METS of ``other_workspace`` into the METS of this workspace.

    See :py:meth:`ocrd_models.ocrd_mets.OcrdMets.merge` for the `kwargs`

    Keyword Args:
        copy_files (boolean): Whether to copy files from `other_workspace` to this one.
            If false, the merged entries' local filenames are rewritten to point
            into `other_workspace`, which must then be located below this workspace.
        overwrite (boolean): Whether copying may replace files already present here
    Raises:
        FileExistsError: if a copy target exists and ``overwrite`` is false
        ValueError: if ``copy_files`` is false and `other_workspace` is not
            inside this workspace's directory
    """
    def after_add_cb(f):
        """Bring the local file behind a freshly merged OcrdFile in line with this workspace."""
        relpath = f.local_filename
        if not relpath:
            # no local file behind this entry, hence nothing to copy or re-path
            return
        if copy_files:
            source = Path(other_workspace.directory, relpath)
            target = Path(self.directory, relpath)
            if not source.exists():
                return
            if target.exists() and not overwrite:
                raise FileExistsError("Copying %s to %s would overwrite the latter" % (source, target))
            if not target.parent.is_dir():
                makedirs(str(target.parent))
            with open(str(source), 'rb') as reader, open(str(target), 'wb') as writer:
                copyfileobj(reader, writer)
            return
        # not copying: reference the file in place, relative to our own directory
        other_root = Path(other_workspace.directory).resolve()
        own_root = Path(self.directory).resolve()
        # relative_to raises ValueError if the other workspace is not below ours
        f.local_filename = other_root.relative_to(own_root) / relpath

    # OcrdMets.merge expects the METS-style keyword names
    renames = (
        ('page_id', 'pageId'),
        ('file_id', 'ID'),
        ('file_grp', 'fileGrp'),
        ('filegrp_mapping', 'fileGrp_mapping'),
    )
    for snake_name, mets_name in renames:
        if snake_name in kwargs:
            kwargs[mets_name] = kwargs.pop(snake_name)
    self.mets.merge(other_workspace.mets, after_add_cb=after_add_cb, **kwargs)
@deprecated(version='1.0.0', reason="Use workspace.download_file")
def download_url(self, url, **kwargs):
    """
    Download a URL to the workspace.

    The URL is wrapped in a file entry of a throw-away empty METS (file
    group ``DEPRECATED``, ID taken from the last path segment of ``url``)
    and handed to :py:meth:`download_file`.

    Args:
        url (string): URL to download to directory
        **kwargs : See :py:class:`ocrd_models.ocrd_file.OcrdFile`
            (accepted, but not used by this implementation)
    Returns:
        The local filename of the downloaded file
    """
    scratch_mets = OcrdMets.empty_mets()
    ocrd_file = scratch_mets.add_file('DEPRECATED', ID=Path(url).name, url=url)
    return self.download_file(ocrd_file).local_filename
def download_file(self, f, _recursion_count=0):
    """
    Download a :py:class:`ocrd_models.ocrd_file.OcrdFile` to the workspace.

    Resolution order:

    1. If ``f.local_filename`` is set and exists, keep it if it lies within
       the workspace directory, otherwise copy it into the workspace.
    2. If ``f.local_filename`` is set but missing, fall back to ``f.url``,
       or build a URL from the workspace ``baseurl`` and the local filename.
    3. If a URL is available, download it into the file's ``fileGrp``
       subdirectory.

    Args:
        f (:py:class:`ocrd_models.ocrd_file.OcrdFile`): the file to make locally available
        _recursion_count (int): not used by this implementation
    Returns:
        ``f``, with ``local_filename`` (and possibly ``url``) updated
    Raises:
        FileNotFoundError: if ``local_filename`` does not exist and neither
            ``url`` nor the workspace ``baseurl`` is set
        ValueError: if ``f`` has neither ``url`` nor ``local_filename``
    """
    log = getLogger('ocrd.workspace.download_file')
    with pushd_popd(self.directory):
        if f.local_filename:
            file_path = Path(f.local_filename).absolute()
            if file_path.exists():
                try:
                    file_path.relative_to(Path(self.directory).resolve()) # raises ValueError if not relative
                    # If the f.local_filename exists and is within self.directory, nothing to do
                    log.info(f"'local_filename' {f.local_filename} already within {self.directory}, nothing to do")
                except ValueError:
                    # f.local_filename exists, but not within self.directory, copy it
                    log.info("Copying 'local_filename' %s to workspace directory %s" % (f.local_filename, self.directory))
                    f.local_filename = self.resolver.download_to_directory(self.directory, f.local_filename, subdir=f.fileGrp)
                return f
            if f.url:
                log.info("OcrdFile has 'local_filename' but it doesn't resolve, try to download from set 'url' %s", f.url)
            elif self.baseurl:
                log.info("OcrdFile has 'local_filename' but it doesn't resolve and no 'url', concatenate 'baseurl' %s and 'local_filename' %s",
                         self.baseurl, f.local_filename)
                f.url = '%s/%s' % (self.baseurl, f.local_filename)
            else:
                raise FileNotFoundError(f"'local_filename' {f.local_filename} points to non-existing file, "
                                        "and no 'url' to download and no 'baseurl' set on workspace, nothing we can do.")
        if f.url:
            # If f.url is set, download the file to the workspace
            basename = '%s%s' % (f.ID, MIME_TO_EXT.get(f.mimetype, '')) if f.ID else f.basename
            f.local_filename = self.resolver.download_to_directory(self.directory, f.url, subdir=f.fileGrp, basename=basename)
        else:
            # If neither f.local_filename nor f.url is set, fail
            raise ValueError(f"OcrdFile {f} has neither 'url' nor 'local_filename', so cannot be downloaded")
        return f
def remove_file(self, file_id, force=False, keep_file=False, page_recursive=False, page_same_group=False):
    """
    Remove a METS `file` from the workspace.

    Arguments:
        file_id (string|:py:class:`ocrd_models.ocrd_file.OcrdFile`): `@ID` of the METS `file`
            to delete or the file itself
    Keyword Args:
        force (boolean): Continue removing even if file not found in METS
            (always on while the workspace is in ``overwrite_mode``)
        keep_file (boolean): Whether to keep files on disk
        page_recursive (boolean): Whether to remove all images referenced in the file
            if the file is a PAGE-XML document.
        page_same_group (boolean): Remove only images in the same file group as the PAGE-XML.
            Has no effect unless ``page_recursive`` is `True`.
    Returns:
        the removed :py:class:`ocrd_models.ocrd_file.OcrdFile`, or `None` if
        nothing matched a regex ``file_id`` or a `FileNotFoundError` was
        suppressed because of ``force``
    Raises:
        FileNotFoundError: if the file is not in the METS (or missing on disk) and ``force`` is unset
        Exception: if the file has no local filename, is to be deleted from disk, and ``force`` is unset
    """
    log = getLogger('ocrd.workspace.remove_file')
    log.debug('Deleting mets:file %s', file_id)
    force = True if self.overwrite_mode else force
    if isinstance(file_id, OcrdFile):
        file_id = file_id.ID
    try:
        ocrd_file = next(self.mets.find_files(ID=file_id), None)
        if ocrd_file is None:
            if file_id.startswith(REGEX_PREFIX):
                # a regex that matches nothing is not an error
                return None
            raise FileNotFoundError("File %s not found in METS" % file_id)

        if page_recursive and ocrd_file.mimetype == MIMETYPE_PAGE:
            with pushd_popd(self.directory):
                page_path = self.download_file(ocrd_file).local_filename
                ocrd_page = parse(page_path, silence=True)
                for image_path in ocrd_page.get_AllAlternativeImagePaths():
                    criteria = {'local_filename': image_path}
                    if page_same_group:
                        criteria['fileGrp'] = ocrd_file.fileGrp
                    for image_file in self.mets.find_files(**criteria):
                        self.remove_file(image_file, keep_file=keep_file, force=force)

        if not keep_file:
            with pushd_popd(self.directory):
                if ocrd_file.local_filename:
                    log.info("rm %s [cwd=%s]", ocrd_file.local_filename, self.directory)
                    unlink(ocrd_file.local_filename)
                elif force:
                    log.debug("File not locally available but --force is set: %s", ocrd_file)
                else:
                    raise Exception("File not locally available %s" % ocrd_file)

        # the METS entry goes last, after any referenced AlternativeImages were handled
        self.mets.remove_file(file_id)
        return ocrd_file
    except FileNotFoundError:
        if not force:
            raise
def remove_file_group(self, USE, recursive=False, force=False, keep_files=False, page_recursive=False, page_same_group=False):
    """
    Remove a METS `fileGrp`.

    Arguments:
        USE (string): `@USE` of the METS `fileGrp` to delete
    Keyword Args:
        recursive (boolean): Whether to recursively delete all files in the group
        force (boolean): Continue removing even if group or containing files not found in METS
            (always on while the workspace is in ``overwrite_mode``)
        keep_files (boolean): When deleting recursively whether to keep files on disk
        page_recursive (boolean): Whether to remove all images referenced in the file
            if the file is a PAGE-XML document.
        page_same_group (boolean): Remove only images in the same file group as the PAGE-XML.
            Has no effect unless ``page_recursive`` is `True`.
    Raises:
        Exception: if ``USE`` is not a regex, names no existing fileGrp, and ``force`` is unset
    """
    if self.overwrite_mode and not force:
        force = True

    is_regex = USE.startswith(REGEX_PREFIX)
    if not is_regex and USE not in self.mets.file_groups and not force:
        raise Exception("No such fileGrp: %s" % USE)

    # directories that held removed files; candidates for cleanup below
    emptied_dirs = set()
    if recursive:
        for ocrd_file in self.mets.find_files(fileGrp=USE):
            self.remove_file(ocrd_file, force=force, keep_file=keep_files,
                             page_recursive=page_recursive, page_same_group=page_same_group)
            if not ocrd_file.local_filename:
                continue
            parent_dir = path.dirname(ocrd_file.local_filename)
            if parent_dir:
                emptied_dirs.add(parent_dir)

    self.mets.remove_file_group(USE, force=force, recursive=recursive)

    # PLEASE NOTE: directories are only removed from the workspace if they are
    # empty; the one named after the fileGrp follows the OCR-D naming convention.
    with pushd_popd(self.directory):
        group_dir = Path(USE)
        if group_dir.is_dir() and not listdir(USE):
            group_dir.rmdir()
        for dir_name in emptied_dirs:
            candidate = Path(dir_name)
            if candidate.is_dir() and not listdir(dir_name):
                candidate.rmdir()
def rename_file_group(self, old, new):
    """
    Rename a METS `fileGrp`.

    Moves the local files of the group to paths in which the leading
    ``old/`` directory and any ``/old`` path component prefix are replaced
    by ``new``, renames file IDs that start with ``old`` (unless the new ID
    is already taken), rewrites references to the moved files in all local
    PAGE-XML files, renames the fileGrp in the METS, and finally removes
    the ``old`` directory if it is empty.

    Arguments:
        old (string): `@USE` of the METS `fileGrp` to rename
        new (string): `@USE` of the METS `fileGrp` to rename as
    Raises:
        ValueError: if ``old`` does not exist or ``new`` already exists
    """
    # local import: only ``sub`` is imported from ``re`` at file level
    from re import escape
    log = getLogger('ocrd.workspace.rename_file_group')

    if old not in self.mets.file_groups:
        raise ValueError(f"No such fileGrp: {old}")
    if new in self.mets.file_groups:
        raise ValueError(f"fileGrp already exists {new}")

    # fileGrp names are literal text, not regular expressions
    old_literal = escape(old)

    with pushd_popd(self.directory):
        # create workspace dir ``new``
        log.info("mkdir %s", new)
        if not Path(new).is_dir():
            Path(new).mkdir()
        local_filename_replacements = {}
        log.info("Moving files")
        for mets_file in self.mets.find_files(fileGrp=old, local_only=True):
            old_local_filename = str(mets_file.local_filename)
            # Directory part (callable replacement: ``new`` must not be read as a template)
            new_local_filename = sub(r'^%s/' % old_literal, lambda _: '%s/' % new, old_local_filename)
            # File part
            new_local_filename = sub(r'/%s' % old_literal, lambda _: '/%s' % new, new_local_filename)
            local_filename_replacements[old_local_filename] = new_local_filename
            # move file from ``old`` to ``new`` (creating nested target folders if needed)
            Path(new_local_filename).parent.mkdir(parents=True, exist_ok=True)
            Path(old_local_filename).rename(new_local_filename)
            # change the local filename of ``mets:file``
            mets_file.local_filename = new_local_filename
            # change the file ID and update structMap
            new_id = sub(r'^%s' % old_literal, lambda _: new, mets_file.ID)
            if next(self.mets.find_files(ID=new_id), None) is not None:
                log.warning("ID %s already exists, not changing ID while renaming %s -> %s" % (new_id, old_local_filename, new_local_filename))
            else:
                mets_file.ID = new_id
        # change file paths in PAGE-XML imageFilename and filename attributes
        for page_file in self.mets.find_files(mimetype=MIMETYPE_PAGE, local_only=True):
            log.info("Renaming file references in PAGE-XML %s" % page_file)
            pcgts = page_from_file(page_file)
            page = pcgts.get_Page()
            changed = False
            # exact lookup, so a renamed path is never renamed a second time
            new_image_filename = local_filename_replacements.get(page.imageFilename)
            if new_image_filename is not None:
                changed = True
                log.info("Rename pc:Page/@imageFilename: %s -> %s" % (page.imageFilename, new_image_filename))
                page.imageFilename = new_image_filename
            for ai in page.get_AllAlternativeImages():
                new_ai_filename = local_filename_replacements.get(ai.filename)
                if new_ai_filename is not None:
                    changed = True
                    log.info("Rename pc:Page/../AlternativeImage: %s -> %s" % (ai.filename, new_ai_filename))
                    ai.filename = new_ai_filename
            if changed:
                log.info("PAGE-XML changed, writing %s" % (page_file.local_filename))
                with open(page_file.local_filename, 'w', encoding='utf-8') as f:
                    f.write(to_xml(pcgts))
        # change the ``USE`` attribute of the fileGrp
        self.mets.rename_file_group(old, new)
        # Remove the old dir
        log.info("rmdir %s", old)
        if Path(old).is_dir() and not listdir(old):
            Path(old).rmdir()
@deprecated_alias(pageId="page_id")
@deprecated_alias(ID="file_id")
def add_file(self, file_grp, content=None, **kwargs):
    """
    Add a file to the :py:class:`ocrd_models.ocrd_mets.OcrdMets` of the workspace.

    Arguments:
        file_grp (string): `@USE` of the METS `fileGrp` to add to
    Keyword Args:
        content (string|bytes): optional content to write to the file
            in the filesystem (strings are encoded as UTF-8)
        **kwargs: See :py:func:`ocrd_models.ocrd_mets.OcrdMets.add_file`.
            ``page_id`` is mandatory (but may be `None`); ``local_filename``
            is mandatory whenever ``content`` is given.
    Returns:
        a new :py:class:`ocrd_models.ocrd_file.OcrdFile`
    Raises:
        ValueError: if ``page_id`` was not passed
        Exception: if ``content`` was passed without ``local_filename``
    """
    log = getLogger('ocrd.workspace.add_file')
    has_content = content is not None
    log.debug(
        'outputfile file_grp=%s local_filename=%s content=%s',
        file_grp,
        kwargs.get('local_filename'),
        has_content)
    if 'page_id' not in kwargs:
        raise ValueError("workspace.add_file must be passed a 'page_id' kwarg, even if it is None.")
    if has_content and not kwargs.get('local_filename'):
        raise Exception("'content' was set but no 'local_filename'")
    if self.overwrite_mode:
        kwargs['force'] = True

    with pushd_popd(self.directory):
        if kwargs.get('local_filename'):
            # create the folder part of the local filename, if it has one
            folder, slash, _ = str(kwargs['local_filename']).rpartition('/')
            if slash and not Path(folder).is_dir():
                makedirs(folder)

        # OcrdMets.add_file expects the METS-style keyword names
        kwargs["pageId"] = kwargs.pop("page_id")
        if "file_id" in kwargs:
            kwargs["ID"] = kwargs.pop("file_id")

        ocrd_file = self.mets.add_file(file_grp, **kwargs)

        # content being set implies is_remote==False because METS server
        # does not pass file contents
        if has_content:
            with open(kwargs['local_filename'], 'wb') as out:
                payload = bytes(content, 'utf-8') if isinstance(content, str) else content
                out.write(payload)

    return ocrd_file
def save_mets(self):
    """
    Persist the current state of the METS.

    A remote workspace asks its METS object to save itself. A local
    workspace adds a backup first (if automatic backups are enabled) and
    then atomically writes the serialized METS to ``self.mets_target``.
    """
    log = getLogger('ocrd.workspace.save_mets')
    if self.is_remote:
        self.mets.save()
        return
    log.debug("Saving mets '%s'", self.mets_target)
    if self.automatic_backup:
        WorkspaceBackupManager(self).add()
    with atomic_write(self.mets_target) as mets_out:
        xml_text = self.mets.to_xml(xmllint=True).decode('utf-8')
        mets_out.write(xml_text)
def resolve_image_exif(self, image_url):
    """
    Get the EXIF metadata about an image URL as :py:class:`ocrd_models.ocrd_exif.OcrdExif`

    The image is looked up in the METS by local filename first, then by
    URL (downloading it into the workspace), and is otherwise fetched into
    a temporary file. Local filenames are resolved relative to the
    workspace directory.

    Args:
        image_url (string) : `@href` (path or URL) of the METS `file` to inspect
    Returns:
        :py:class:`ocrd_models.ocrd_exif.OcrdExif`
    Raises:
        ValueError: if ``image_url`` is empty
    """
    if not image_url:
        # avoid "finding" just any file
        raise ValueError(f"'image_url' must be a non-empty string, not '{image_url}' ({type(image_url)})")
    # local filenames in the METS are relative to the workspace directory,
    # so they must be opened from there (as _resolve_image_as_pil does)
    with pushd_popd(self.directory):
        ocrd_file = next(self.mets.find_files(local_filename=str(image_url)), None)
        if ocrd_file is not None:
            return exif_from_filename(ocrd_file.local_filename)
        ocrd_file = next(self.mets.find_files(url=str(image_url)), None)
        if ocrd_file is not None:
            return exif_from_filename(self.download_file(ocrd_file).local_filename)
        # unknown to the METS: inspect a temporary download instead
        with download_temporary_file(image_url) as f:
            return exif_from_filename(f.name)
@deprecated(version='1.0.0', reason="Use workspace.image_from_page and workspace.image_from_segment")
def resolve_image_as_pil(self, image_url, coords=None):
    """
    Resolve an image URL to a `PIL.Image`.

    Deprecated public entry point; the work is done by
    ``_resolve_image_as_pil``.

    Arguments:
        image_url (string): `@href` (path or URL) of the METS `file` to retrieve
    Keyword Args:
        coords (list) : Coordinates of the bounding box to cut from the image
    Returns:
        Full or cropped `PIL.Image`
    """
    resolved = self._resolve_image_as_pil(image_url, coords)
    return resolved
def _resolve_image_as_pil(self, image_url, coords=None):
    """
    Resolve an image path or URL to a fully loaded `PIL.Image`.

    The image is looked up in the METS by local filename first, then by
    URL (downloading it into the workspace), and is otherwise fetched into
    a temporary file. Images in an ``I*`` or ``F`` mode are reduced to
    8 bit.

    Arguments:
        image_url (string): `@href` (path or URL) of the METS `file` to retrieve
    Keyword Args:
        coords (list): points of a polygon; if given, the image is converted
            via OpenCV and cut to the polygon's bounding box
    Returns:
        Full or cropped `PIL.Image`
    Raises:
        Exception: if ``image_url`` is empty
    """
    if not image_url:
        # avoid "finding" just any file
        raise Exception("Cannot resolve empty image path")
    log = getLogger('ocrd.workspace._resolve_image_as_pil')
    with pushd_popd(self.directory):
        image_file = next(self.mets.find_files(local_filename=str(image_url)), None)
        if image_file is not None:
            pil_image = Image.open(image_file.local_filename)
        else:
            image_file = next(self.mets.find_files(url=str(image_url)), None)
            if image_file is not None:
                pil_image = Image.open(self.download_file(image_file).local_filename)
            else:
                with download_temporary_file(image_url) as f:
                    pil_image = Image.open(f.name)
                    # read the pixel data while the temporary file still exists
                    pil_image.load()
        pil_image.load() # alloc and give up the FD

    # Pillow does not properly support higher color depths
    # (e.g. 16-bit or 32-bit or floating point grayscale),
    # clipping its dynamic range to the lower 8-bit in
    # many operations (including paste, putalpha, ImageStat...),
    # even including conversion.
    # Cf. Pillow#3011 Pillow#3159 Pillow#3838 (still open in 8.0)
    # So to be on the safe side, we must re-quantize these
    # to 8-bit via numpy (conversion to/from which fortunately
    # seems to work reliably):
    if pil_image.mode.startswith(('I', 'F')):
        arr_image = np.array(pil_image)
        if arr_image.dtype.kind == 'i':
            # signed integer is *not* trustworthy in this context
            # (usually a mistake in the array interface)
            log.debug('Casting image "%s" from signed to unsigned', image_url)
            # reinterpret the same bytes as unsigned (a view, not a value conversion)
            arr_image = arr_image.view(np.dtype('u' + arr_image.dtype.name))
        if arr_image.dtype.kind == 'u':
            # integer needs to be scaled linearly to 8 bit
            # of course, an image might actually have some lower range
            # (e.g. 10-bit in I;16 or 20-bit in I or 4-bit in L),
            # but that would be guessing anyway, so here don't
            # make assumptions on _scale_, just reduce _precision_
            log.debug('Reducing image "%s" from depth %d bit to 8 bit',
                      image_url, arr_image.dtype.itemsize * 8)
            arr_image = arr_image >> 8 * (arr_image.dtype.itemsize-1)
            arr_image = arr_image.astype(np.uint8)
        elif arr_image.dtype.kind == 'f':
            # float needs to be scaled from [0,1.0] to [0,255]
            log.debug('Reducing image "%s" from floating point to 8 bit',
                      image_url)
            arr_image *= 255
            arr_image = arr_image.astype(np.uint8)
        pil_image = Image.fromarray(arr_image)

    if coords is None:
        return pil_image

    # FIXME: remove or replace this by (image_from_polygon+) crop_image ...
    log.debug("Converting PIL to OpenCV: %s", image_url)
    color_conversion = COLOR_GRAY2BGR if pil_image.mode in ('1', 'L') else COLOR_RGB2BGR
    pil_as_np_array = np.array(pil_image).astype('uint8') if pil_image.mode == '1' else np.array(pil_image)
    cv2_image = cvtColor(pil_as_np_array, color_conversion)

    poly = np.array(coords, np.int32)
    log.debug("Cutting region %s from %s", coords, image_url)
    # rows are indexed by y (column 1 of the points), columns by x (column 0)
    region_cut = cv2_image[
        np.min(poly[:, 1]):np.max(poly[:, 1]),
        np.min(poly[:, 0]):np.max(poly[:, 0])
    ]
    return Image.fromarray(region_cut)
def image_from_page(self, page, page_id,
                    fill='background', transparency=False,
                    feature_selector='', feature_filter='', filename=''):
    """Extract an image for a PAGE-XML page from the workspace.

    Args:
        page (:py:class:`ocrd_models.ocrd_page.PageType`): a PAGE `PageType` object
        page_id (string): its `@ID` in the METS physical `structMap`
    Keyword Args:
        fill (string): a `PIL` color specifier, or `background` or `none`
        transparency (boolean): whether to add an alpha channel for masking
        feature_selector (string): a comma-separated list of `@comments` classes
        feature_filter (string): a comma-separated list of `@comments` classes
        filename (string): which file path to use

    Extract a `PIL.Image` from ``page``, either from its `AlternativeImage`
    (if it exists), or from its `@imageFilename` (otherwise). Also crop it,
    if a `Border` exists, and rotate it, if any `@orientation` angle is
    annotated.

    If ``filename`` is given, then among `@imageFilename` and the available
    `AlternativeImage/@filename` images, pick that one, or raise an error.

    If ``feature_selector`` and/or ``feature_filter`` is given, then
    among the `@imageFilename` image and the available AlternativeImages,
    select/filter the richest one which contains all of the selected,
    but none of the filtered features (i.e. `@comments` classes), or
    raise an error. Features are compared by their complete name.

    (Required and produced features need not be in the same order, so
    ``feature_selector`` is merely a mask specifying Boolean AND, and
    ``feature_filter`` is merely a mask specifying Boolean OR.)

    If the chosen image does not have the feature `"cropped"` yet, but
    a `Border` exists, and unless `"cropped"` is being filtered, then crop it.
    Likewise, if the chosen image does not have the feature `"deskewed"` yet,
    but an `@orientation` angle is annotated, and unless `"deskewed"` is being
    filtered, then rotate it. (However, if `@orientation` is above the
    [-45°,45°] interval, then apply as much transposition as possible first,
    unless `"rotated-90"` / `"rotated-180"` / `"rotated-270"` is being filtered.)

    Cropping uses a polygon mask (not just the bounding box rectangle).
    Areas outside the polygon will be filled according to ``fill``:

    \b
    - if `"background"` (the default),
      then fill with the median color of the image;
    - else if `"none"`, then avoid masking polygons where possible
      (i.e. when cropping) or revert to the default (i.e. when rotating)
    - otherwise, use the given color, e.g. `"white"` or `(255,255,255)`.

    Moreover, if ``transparency`` is true, and unless the image already
    has an alpha channel, then add an alpha channel which is fully opaque
    before cropping and rotating. (Thus, unexposed/masked areas will be
    transparent afterwards for consumers that can interpret alpha channels).

    Returns:
        a tuple of
         * the extracted `PIL.Image`,
         * a `dict` with information about the extracted image:

           - `"transform"`: a `Numpy` array with an affine transform which
             converts from absolute coordinates to those relative to the image,
             i.e. after cropping to the page's border / bounding box (if any)
             and deskewing with the page's orientation angle (if any)
           - `"angle"`: the rotation/reflection angle applied to the image so far,
           - `"features"`: the `AlternativeImage` `@comments` for the image, i.e.
             names of all applied operations that lead up to this result,
         * an :py:class:`ocrd_models.ocrd_exif.OcrdExif` instance associated with
           the original image.

    (The first two can be used to annotate a new `AlternativeImage`,
    or be passed down with :py:meth:`image_from_segment`.)

    Raises:
        Exception: if no image satisfies ``filename``, ``feature_selector``
            and ``feature_filter``

    Examples:

     * get a raw (colored) but already deskewed and cropped image::

         page_image, page_coords, page_image_info = workspace.image_from_page(
             page, page_id,
             feature_selector='deskewed,cropped',
             feature_filter='binarized,grayscale_normalized')
    """
    log = getLogger('ocrd.workspace.image_from_page')
    # requested feature names (empty entries are ignored)
    selected_features = [feature for feature in feature_selector.split(',') if feature]
    filtered_features = [feature for feature in feature_filter.split(',') if feature]
    # features that can be reproduced automatically below,
    # and which are relevant in reconstructing coordinates:
    auto_features = {'cropped', 'deskewed', 'rotated-90', 'rotated-180', 'rotated-270'}

    page_image_info = self.resolve_image_exif(page.imageFilename)
    page_image = self._resolve_image_as_pil(page.imageFilename)
    page_coords = dict()
    # use identity as initial affine coordinate transform:
    page_coords['transform'] = np.eye(3)
    # interim geometry (updated with each change to the transform):
    page_xywh = {'x': 0, 'y': 0,
                 'w': page_image.width, 'h': page_image.height}

    border = page.get_Border()
    # page angle: PAGE @orientation is defined clockwise,
    # whereas PIL/ndimage rotation is in mathematical direction:
    page_coords['angle'] = -(page.get_orientation() or 0)
    # map angle from (-180,180] to [0,360], and partition into multiples of 90;
    # but avoid unnecessary large remainders, i.e. split symmetrically:
    orientation = (page_coords['angle'] + 45) % 360
    orientation = orientation - (orientation % 90)
    skew = (page_coords['angle'] % 360) - orientation
    skew = 180 - (180 - skew) % 360 # map to [-45,45]
    page_coords['angle'] = 0 # nothing applied yet (depends on filters)
    log.debug("page '%s' has %s orientation=%d skew=%.2f",
              page_id, "border," if border else "", orientation, skew)
    rotated_feature = 'rotated-%d' % orientation

    # initialize AlternativeImage@comments classes as empty:
    page_coords['features'] = ''
    best_image = None
    alternative_images = page.get_AlternativeImage()
    if alternative_images:
        # (e.g. from page-level cropping, binarization, deskewing or despeckling)
        best_features = set()
        best_index = 0
        # search to the end, because by convention we always append,
        # and among multiple satisfactory images we want the most recent,
        # but also ensure that we get the richest feature set, i.e. most
        # of those features that we cannot reproduce automatically below
        for index, alternative_image in enumerate(alternative_images, start=1):
            if filename and filename != alternative_image.filename:
                continue
            features = alternative_image.get_comments()
            if not features:
                log.warning("AlternativeImage %d for page '%s' does not have any feature attributes",
                            index, page_id)
                features = ''
            featureset = set(features.split(','))
            if (all(feature in featureset for feature in selected_features) and
                not any(feature in featureset for feature in filtered_features) and
                len(featureset.difference(auto_features)) >= \
                len(best_features.difference(auto_features))):
                best_features = featureset
                best_image = alternative_image
                best_index = index
        if best_image:
            log.debug("Using AlternativeImage %d %s for page '%s'",
                      best_index, best_features, page_id)
            page_image = self._resolve_image_as_pil(best_image.get_filename())
            page_coords['features'] = best_image.get_comments() # including duplicates

    # adjust the coord transformation to the steps applied on the image,
    # and apply steps on the existing image in case it is missing there,
    # but traverse all steps (crop/reflect/rotate) in a particular order:
    # - existing image features take priority (in the order annotated),
    # - next is cropping (if necessary but not already applied),
    # - next is reflection (if necessary but not already applied),
    # - next is rotation (if necessary but not already applied).
    # This helps deal with arbitrary workflows (e.g. crop then deskew,
    # or deskew then crop), regardless of where images are generated.
    alternative_image_features = page_coords['features'].split(',')
    for duplicate_feature in {feature for feature in alternative_image_features
                              if (feature in auto_features and
                                  alternative_image_features.count(feature) > 1)}:
        log.error("Duplicate feature %s in AlternativeImage for page '%s'",
                  duplicate_feature, page_id)

    # steps still to be applied here, in the order described above
    pending_features = []
    if (border and
        'cropped' not in alternative_image_features and
        'cropped' not in filtered_features):
        pending_features.append('cropped')
    if (orientation and
        rotated_feature not in alternative_image_features and
        rotated_feature not in filtered_features):
        pending_features.append(rotated_feature)
    if (skew and
        'deskewed' not in alternative_image_features and
        'deskewed' not in filtered_features):
        pending_features.append('deskewed')

    name = "%s for page '%s'" % ("AlternativeImage" if best_image
                                 else "original image", page_id)
    # '_check' is not a feature to be added, but merely a fallback position
    # to always enter the loop at i == len(alternative_image_features)
    for i, feature in enumerate(alternative_image_features + pending_features + ['_check']):
        # image geometry vs feature consistency can only be checked
        # after all features on the existing AlternativeImage have
        # been adjusted for in the transform, and when there is a mismatch,
        # additional steps applied here would only repeat the respective
        # error message; so we only check once at the boundary between
        # existing and new features
        # FIXME we should check/enforce consistency when _adding_ AlternativeImage
        if (i == len(alternative_image_features) and
            not (page_xywh['w'] - 2 < page_image.width < page_xywh['w'] + 2 and
                 page_xywh['h'] - 2 < page_image.height < page_xywh['h'] + 2)):
            log.error('page "%s" image (%s; %dx%d) has not been cropped properly (%dx%d)',
                      page_id, page_coords['features'],
                      page_image.width, page_image.height,
                      page_xywh['w'], page_xywh['h'])
        # adjust transform to feature, and ensure feature is applied to image
        if feature == 'cropped':
            page_image, page_coords, page_xywh = _crop(
                log, name, border, page_image, page_coords,
                fill=fill, transparency=transparency)
        elif feature == rotated_feature:
            page_image, page_coords, page_xywh = _reflect(
                log, name, orientation, page_image, page_coords, page_xywh)
        elif feature == 'deskewed':
            page_image, page_coords, page_xywh = _rotate(
                log, name, skew, border, page_image, page_coords, page_xywh,
                fill=fill, transparency=transparency)

    # verify constraints again:
    if filename and not getattr(page_image, 'filename', '').endswith(filename):
        raise Exception('Found no AlternativeImage that satisfies all requirements ' +
                        'filename="%s" in page "%s"' % (
                            filename, page_id))
    # compare complete feature names, consistent with the selection above
    # (a substring test would confuse e.g. "normalized" with "grayscale_normalized")
    final_features = page_coords['features'].split(',')
    if not all(feature in final_features for feature in selected_features):
        raise Exception('Found no AlternativeImage that satisfies all requirements ' +
                        'selector="%s" in page "%s"' % (
                            feature_selector, page_id))
    if any(feature in final_features for feature in filtered_features):
        raise Exception('Found no AlternativeImage that satisfies all requirements ' +
                        'filter="%s" in page "%s"' % (
                            feature_filter, page_id))
    page_image.format = 'PNG' # workaround for tesserocr#194
    return page_image, page_coords, page_image_info
def image_from_segment(self, segment, parent_image, parent_coords,
                       fill='background', transparency=False,
                       feature_selector='', feature_filter='', filename=''):
    """Extract an image for a PAGE-XML hierarchy segment from its parent's image.

    Args:
        segment (object): a PAGE segment object \
            (i.e. :py:class:`~ocrd_models.ocrd_page.TextRegionType` \
            or :py:class:`~ocrd_models.ocrd_page.TextLineType` \
            or :py:class:`~ocrd_models.ocrd_page.WordType` \
            or :py:class:`~ocrd_models.ocrd_page.GlyphType`)
        parent_image (`PIL.Image`): image of the `segment`'s parent
        parent_coords (dict): a `dict` with information about `parent_image`:
            - `"transform"`: a `Numpy` array with an affine transform which
              converts from absolute coordinates to those relative to the image,
              i.e. after applying all operations (starting with the original image)
            - `"angle"`: the rotation/reflection angle applied to the image so far,
            - `"features"`: the ``AlternativeImage/@comments`` for the image, i.e.
              names of all operations that lead up to this result.
    Keyword Args:
        fill (string): a `PIL` color specifier, or `background` or `none`
        transparency (boolean): whether to add an alpha channel for masking
        feature_selector (string): a comma-separated list of ``@comments`` classes
        feature_filter (string): a comma-separated list of ``@comments`` classes
        filename (string): if non-empty, only consider the ``AlternativeImage``
            with exactly that ``@filename``

    Extract a `PIL.Image` from `segment`, either from ``AlternativeImage``
    (if it exists), or producing a new image via cropping from `parent_image`
    (otherwise). Pass in `parent_image` and `parent_coords` from the result
    of the next higher-level of this function or from :py:meth:`image_from_page`.

    If ``filename`` is given, then among the available `AlternativeImage/@filename`
    images, pick that one, or raise an error.

    If ``feature_selector`` and/or ``feature_filter`` is given, then
    among the cropped `parent_image` and the available AlternativeImages,
    select/filter the richest one which contains all of the selected,
    but none of the filtered features (i.e. ``@comments`` classes), or
    raise an error.

    (Required and produced features need not be in the same order, so
    `feature_selector` is merely a mask specifying Boolean AND, and
    `feature_filter` is merely a mask specifying Boolean OR.)

    Cropping uses a polygon mask (not just the bounding box rectangle).
    Areas outside the polygon will be filled according to `fill`:

    \b
    - if `"background"` (the default),
      then fill with the median color of the image;
    - else if `"none"`, then avoid masking polygons where possible
      (i.e. when cropping) or revert to the default (i.e. when rotating)
    - otherwise, use the given color, e.g. `"white"` or `(255,255,255)`.

    Moreover, if `transparency` is true, and unless the image already
    has an alpha channel, then add an alpha channel which is fully opaque
    before cropping and rotating. (Thus, unexposed/masked areas will be
    transparent afterwards for consumers that can interpret alpha channels).

    When cropping, compensate any ``@orientation`` angle annotated for the
    parent (from parent-level deskewing) by rotating the segment coordinates
    in an inverse transformation (i.e. translation to center, then passive
    rotation, and translation back).

    Regardless, if any ``@orientation`` angle is annotated for the segment
    (from segment-level deskewing), and the chosen image does not have
    the feature `"deskewed"` yet, and unless `"deskewed"` is being filtered,
    then rotate it - compensating for any previous `"angle"`. (However,
    if ``@orientation`` is above the [-45°,45°] interval, then apply as much
    transposition as possible first, unless `"rotated-90"` / `"rotated-180"` /
    `"rotated-270"` is being filtered.)

    Returns:
        a tuple of

         * the extracted `PIL.Image`,
         * a `dict` with information about the extracted image:

           - `"transform"`: a `Numpy` array with an affine transform which
             converts from absolute coordinates to those relative to the image,
             i.e. after applying all parent operations, and then cropping to
             the segment's bounding box, and deskewing with the segment's
             orientation angle (if any)
           - `"angle"`: the rotation/reflection angle applied to the image so far,
           - `"features"`: the ``AlternativeImage/@comments`` for the image, i.e.
             names of all applied operations that lead up to this result.

        (These can be used to create a new ``AlternativeImage``, or passed down
        for :py:meth:`image_from_segment` calls on lower hierarchy levels.)

    Raises:
        Exception: if the resulting image does not satisfy ``filename``,
            ``feature_selector`` or ``feature_filter``.

    Examples:

     * get a raw (colored) but already deskewed and cropped image::

            region_image, region_coords = workspace.image_from_segment(
                region, page_image, page_coords,
                feature_selector='deskewed,cropped',
                feature_filter='binarized,grayscale_normalized')
    """
    log = getLogger('ocrd.workspace.image_from_segment')
    # note: We should mask overlapping neighbouring segments here,
    # but finding the right clipping rules can be difficult if operating
    # on the raw (non-binary) image data alone: for each intersection, it
    # must be decided which one of either segment or neighbour to assign,
    # e.g. an ImageRegion which properly contains our TextRegion should be
    # completely ignored, but an ImageRegion which is properly contained
    # in our TextRegion should be completely masked, while partial overlap
    # may be more difficult to decide. On the other hand, on the binary image,
    # we can use connected component analysis to mask foreground areas which
    # originate in the neighbouring regions. But that would introduce either
    # the assumption that the input has already been binarized, or a dependency
    # on some ad-hoc binarization method. Thus, it is preferable to use
    # a dedicated processor for this (which produces clipped AlternativeImage
    # or reduced polygon coordinates).
    segment_image, segment_coords, segment_xywh = _crop(
        log, "parent image for segment '%s'" % segment.id,
        segment, parent_image, parent_coords,
        fill=fill, transparency=transparency)

    # Semantics of missing @orientation at region level could be either
    # - inherited from page level: same as line or word level (no @orientation),
    # - zero (unrotate page angle): different from line or word level (because
    #   otherwise deskewing would never have an effect on lines and words)
    # The PAGE specification is silent here (but does generally not concern itself
    # much with AlternativeImage coordinate consistency).
    # Since our (generateDS-backed) ocrd_page supports the zero/none distinction,
    # we choose the former (i.e. None is inheritance).
    if 'orientation' in segment.__dict__ and segment.get_orientation() is not None:
        # region angle: PAGE @orientation is defined clockwise,
        # whereas PIL/ndimage rotation is in mathematical direction:
        angle = -segment.get_orientation()
        # @orientation is always absolute; if higher levels
        # have already rotated, then we must compensate:
        angle -= parent_coords['angle']
        # map angle from (-180,180] to [0,360], and partition into multiples of 90;
        # but avoid unnecessary large remainders, i.e. split symmetrically:
        orientation = (angle + 45) % 360
        orientation = orientation - (orientation % 90)
        skew = (angle % 360) - orientation
        skew = 180 - (180 - skew) % 360 # map to [-45,45]
        log.debug("segment '%s' has orientation=%d skew=%.2f",
                  segment.id, orientation, skew)
    else:
        orientation = 0
        skew = 0
    segment_coords['angle'] = parent_coords['angle'] # nothing applied yet (depends on filters)
    # initialize AlternativeImage@comments classes from parent, except
    # for those operations that can apply on multiple hierarchy levels:
    segment_coords['features'] = ','.join(
        [feature for feature in parent_coords['features'].split(',')
         if feature in ['binarized', 'grayscale_normalized',
                        'despeckled', 'dewarped']])

    # non-empty entries of the caller's constraints (hoisted out of the loops)
    selected_features = [feature for feature in feature_selector.split(',') if feature]
    filtered_features = [feature for feature in feature_filter.split(',') if feature]

    best_image = None
    best_index = None
    alternative_images = segment.get_AlternativeImage()
    if alternative_images:
        # (e.g. from segment-level cropping, binarization, deskewing or despeckling)
        best_features = set()
        auto_features = {'cropped', 'deskewed', 'rotated-90', 'rotated-180', 'rotated-270'}
        # search to the end, because by convention we always append,
        # and among multiple satisfactory images we want the most recent,
        # but also ensure that we get the richest feature set, i.e. most
        # of those features that we cannot reproduce automatically below
        for index, alternative_image in enumerate(alternative_images, start=1):
            if filename and filename != alternative_image.filename:
                continue
            features = alternative_image.get_comments()
            if not features:
                log.warning("AlternativeImage %d for segment '%s' does not have any feature attributes",
                            index, segment.id)
                features = ''
            featureset = set(features.split(','))
            if (all(feature in featureset for feature in selected_features) and
                not any(feature in featureset for feature in filtered_features) and
                len(featureset.difference(auto_features)) >= \
                len(best_features.difference(auto_features))):
                best_features = featureset
                best_image = alternative_image
                best_index = index
        if best_image:
            log.debug("Using AlternativeImage %d %s for segment '%s'",
                      best_index, best_features, segment.id)
            # load the image that was actually selected (not merely the
            # last one iterated), so pixels and features/coordinates agree
            segment_image = self._resolve_image_as_pil(best_image.get_filename())
            # including duplicates; tolerate a missing @comments (warned about above)
            segment_coords['features'] = best_image.get_comments() or ''

    alternative_image_features = segment_coords['features'].split(',')
    for duplicate_feature in {feature for feature in alternative_image_features
                              # features relevant in reconstructing coordinates:
                              if (feature in ['deskewed', 'rotated-90',
                                              'rotated-180', 'rotated-270'] and
                                  alternative_image_features.count(feature) > 1)}:
        log.error("Duplicate feature %s in AlternativeImage for segment '%s'",
                  duplicate_feature, segment.id)

    # features already on the image, followed by those we still need to apply
    transposition_feature = 'rotated-%d' % orientation
    pending_features = list(alternative_image_features)
    if (orientation and
        transposition_feature not in alternative_image_features and
        transposition_feature not in filtered_features):
        pending_features.append(transposition_feature)
    if (skew and
        'deskewed' not in alternative_image_features and
        'deskewed' not in filtered_features):
        pending_features.append('deskewed')
    # not a feature to be added, but merely as a fallback position
    # to always enter loop at i == len(alternative_image_features)
    pending_features.append('_check')

    name = "%s for segment '%s'" % ("AlternativeImage" if best_image
                                    else "parent image", segment.id)
    for i, feature in enumerate(pending_features):
        # image geometry vs feature consistency can only be checked
        # after all features on the existing AlternativeImage have
        # been adjusted for in the transform, and when there is a mismatch,
        # additional steps applied here would only repeat the respective
        # error message; so we only check once at the boundary between
        # existing and new features
        # FIXME we should enforce consistency here (i.e. split into transposition
        #       and minimal rotation, rotation always reshapes, rescaling never happens)
        # FIXME: inconsistency currently unavoidable with line-level dewarping (which increases height)
        if (i == len(alternative_image_features) and
            not (segment_xywh['w'] - 2 < segment_image.width < segment_xywh['w'] + 2 and
                 segment_xywh['h'] - 2 < segment_image.height < segment_xywh['h'] + 2)):
            log.error('segment "%s" image (%s; %dx%d) has not been cropped properly (%dx%d)',
                      segment.id, segment_coords['features'],
                      segment_image.width, segment_image.height,
                      segment_xywh['w'], segment_xywh['h'])
        # adjust transform to feature, and ensure feature is applied to image
        if feature == transposition_feature:
            segment_image, segment_coords, segment_xywh = _reflect(
                log, name, orientation, segment_image, segment_coords, segment_xywh)
        elif feature == 'deskewed':
            segment_image, segment_coords, segment_xywh = _rotate(
                log, name, skew, segment, segment_image, segment_coords, segment_xywh,
                fill=fill, transparency=transparency)

    # verify constraints again:
    if filename and not getattr(segment_image, 'filename', '').endswith(filename):
        raise Exception('Found no AlternativeImage that satisfies all requirements ' +
                        'filename="%s" in segment "%s"' % (
                            filename, segment.id))
    if not all(feature in segment_coords['features']
               for feature in selected_features):
        raise Exception('Found no AlternativeImage that satisfies all requirements ' +
                        'selector="%s" in segment "%s"' % (
                            feature_selector, segment.id))
    if any(feature in segment_coords['features']
           for feature in filtered_features):
        raise Exception('Found no AlternativeImage that satisfies all requirements ' +
                        'filter="%s" in segment "%s"' % (
                            feature_filter, segment.id))
    segment_image.format = 'PNG' # workaround for tesserocr#194
    return segment_image, segment_coords
# pylint: disable=redefined-builtin
def save_image_file(self, image,
                    file_id,
                    file_grp,
                    page_id=None,
                    mimetype='image/png',
                    force=False):
    """Serialize an image and register it as a new file in the METS.

    Args:
        image (PIL.Image): derived image to save
        file_id (string): `@ID` of the METS `file` to use
        file_grp (string): `@USE` of the METS `fileGrp` to use
    Keyword Args:
        page_id (string): `@ID` in the METS physical `structMap` to use
        mimetype (string): MIME type of the image format to serialize as
        force (boolean): whether to replace any existing `file` with that `@ID`
            (always treated as true when the workspace's ``overwrite_mode`` is set)

    The image is encoded in memory in the format belonging to ``mimetype``
    and handed to :py:meth:`add_file` as content, under a filename made of
    ``file_grp`` / ``file_id`` plus the extension belonging to ``mimetype``.

    Returns:
        The path of the created file as passed to :py:meth:`add_file`,
        i.e. ``file_grp`` joined with ``file_id`` plus extension.
    """
    log = getLogger('ocrd.workspace.save_image_file')
    # a workspace-wide overwrite mode trumps the per-call setting
    force = True if self.overwrite_mode else force
    # encode into memory first; writing to disk is delegated to add_file
    buffer = io.BytesIO()
    pil_format = MIME_TO_PIL[mimetype]
    image.save(buffer, format=pil_format)
    basename = '%s%s' % (file_id, MIME_TO_EXT[mimetype])
    file_path = str(Path(file_grp, basename))
    added = self.add_file(file_grp,
                          file_id=file_id,
                          page_id=page_id,
                          local_filename=file_path,
                          mimetype=mimetype,
                          content=buffer.getvalue(),
                          force=force)
    log.info('created file ID: %s, file_grp: %s, path: %s',
             file_id, file_grp, added.local_filename)
    return file_path
def find_files(self, *args, **kwargs):
    """
    Look up ``mets:file`` entries in the wrapped METS document.

    Delegator to :py:func:`ocrd_models.ocrd_mets.OcrdMets.find_files`

    The snake_case keywords ``page_id``, ``file_id`` and ``file_grp`` are
    accepted as aliases and renamed to ``pageId``, ``ID`` and ``fileGrp``
    before delegating. The call is made with the workspace directory
    as current working directory.

    Keyword Args:
        **kwargs: See :py:func:`ocrd_models.ocrd_mets.OcrdMets.find_files`
    Returns:
        Whatever the delegate returns (documented there as a generator
        yielding :py:class:`ocrd_models:ocrd_file:OcrdFile` instantiations)
    """
    log = getLogger('ocrd.workspace.find_files')
    log.debug('find files in mets. kwargs=%s' % kwargs)
    # translate our keyword aliases into the names the METS API expects
    aliases = (
        ("page_id", "pageId"),
        ("file_id", "ID"),
        ("file_grp", "fileGrp"),
    )
    for alias, mets_keyword in aliases:
        if alias in kwargs:
            kwargs[mets_keyword] = kwargs.pop(alias)
    with pushd_popd(self.directory):
        found = self.mets.find_files(*args, **kwargs)
        return found
def _crop(log, name, segment, parent_image, parent_coords, op='cropped', **kwargs):
    """Crop ``parent_image`` to ``segment`` and derive the matching coordinates.

    Args:
        log: logger used for progress messages
        name (string): description of the image for log messages
        segment (object): PAGE segment whose outline determines the crop
        parent_image (`PIL.Image`): image to crop from
        parent_coords (dict): coordinate information for ``parent_image``
            (reads ``"features"`` and ``"transform"``); not modified
    Keyword Args:
        op (string): feature name to append when cropping is applied
        **kwargs: passed through to ``image_from_polygon``

    Returns:
        a tuple of the (possibly cropped) image, a shallow copy of
        ``parent_coords`` with updated ``"transform"`` (and ``"features"``
        if cropping was applied), and the bounding box of the segment
        in the parent image as produced by ``xywh_from_bbox``.
    """
    coords = parent_coords.copy()
    # outline of the segment relative to the parent image, and its bounding box:
    polygon = coordinates_of_segment(segment, parent_image, parent_coords)
    bbox = bbox_from_polygon(polygon)
    # size of the segment in the parent image after cropping
    # (i.e. possibly different from size before rotation at the parent, but
    #  also possibly different from size after rotation below/AlternativeImage):
    xywh = xywh_from_bbox(*bbox)
    is_border = isinstance(segment, BorderType)
    if is_border and op in parent_coords['features']:
        # page level, and this operation is already recorded: keep image as is
        image = parent_image
    else:
        # always crop below page level
        if op == 'recropped':
            log.info("Recropping %s", name)
        elif is_border:
            log.info("Cropping %s", name)
        coords['features'] += ',' + op
        # mask everything outside the segment polygon, then cut to the bbox:
        masked = image_from_polygon(parent_image, polygon, **kwargs)
        image = crop_image(masked, box=bbox)
    # subtract offset from parent in affine coordinate transform:
    # (consistent with image cropping)
    offset = np.array([-bbox[0], -bbox[1]])
    coords['transform'] = shift_coordinates(parent_coords['transform'], offset)
    return image, coords, xywh
def _reflect(log, name, orientation, segment_image, segment_coords, segment_xywh):
    """Account for (and if still missing, apply) a transposition by ``orientation``.

    ``segment_coords`` and ``segment_xywh`` are updated in place: the
    transform is transposed around the canvas center, the canvas size is
    adjusted, and ``orientation`` is added to ``"angle"``. The image itself
    is only transposed when ``"features"`` does not yet name the respective
    ``rotated-N`` operation, in which case that feature is appended.

    Returns:
        a tuple of the image, ``segment_coords`` and ``segment_xywh``.
    """
    # Transpose in affine coordinate transform:
    # (consistent with image transposition or AlternativeImage below)
    methods = {
        90: Image.ROTATE_90,
        180: Image.ROTATE_180,
        270: Image.ROTATE_270
    }
    transposition = methods.get(orientation) # no default
    width, height = segment_xywh['w'], segment_xywh['h']
    center = np.array([0.5 * width, 0.5 * height])
    segment_coords['transform'] = transpose_coordinates(
        segment_coords['transform'], transposition, center)
    segment_xywh['w'], segment_xywh['h'] = adjust_canvas_to_transposition(
        [width, height], transposition)
    segment_coords['angle'] += orientation
    if 'rotated-%d' % orientation in segment_coords['features']:
        # image already carries this transposition
        return segment_image, segment_coords, segment_xywh
    # transpose, since (still) necessary:
    log.info("Transposing %s by %d°", name, orientation)
    segment_image = transpose_image(segment_image, transposition)
    segment_coords['features'] += ',rotated-%d' % orientation
    return segment_image, segment_coords, segment_xywh
def _rotate(log, name, skew, segment, segment_image, segment_coords, segment_xywh, **kwargs):
    """Account for (and if still missing, apply) a rotation by ``skew``.

    The transform in ``segment_coords`` is rotated around the canvas center,
    the canvas size in ``segment_xywh`` is adjusted and ``skew`` is added to
    ``"angle"`` (all in place). If ``"features"`` does not contain
    ``deskewed`` yet, the image is rotated, the feature appended, and the
    result re-cropped to the segment. Otherwise only the coordinates are
    shifted as if re-cropping. Both the re-crop and the shift are skipped
    when ``segment`` is falsy, or when it is a ``BorderType`` whose features
    do not contain ``cropped``.

    Keyword Args:
        **kwargs: passed through to ``rotate_image`` and ``_crop``

    Returns:
        a tuple of the image, its coordinate information and its bounding box.
    """
    # Rotate around center in affine coordinate transform:
    # (consistent with image rotation or AlternativeImage below)
    center = np.array([0.5 * segment_xywh['w'],
                       0.5 * segment_xywh['h']])
    segment_coords['transform'] = rotate_coordinates(
        segment_coords['transform'], skew, center)
    canvas = adjust_canvas_to_rotation(
        [segment_xywh['w'], segment_xywh['h']], skew)
    segment_xywh['w'], segment_xywh['h'] = canvas
    segment_coords['angle'] += skew

    if 'deskewed' in segment_coords['features']:
        # image is already rotated
        if (segment and
            (not isinstance(segment, BorderType) or # always crop below page level
             'cropped' in segment_coords['features'])):
            # only shift coordinates as if re-cropping
            polygon = coordinates_of_segment(segment, segment_image, segment_coords)
            bbox = bbox_from_polygon(polygon)
            segment_xywh = xywh_from_bbox(*bbox)
            offset = np.array([-bbox[0], -bbox[1]])
            segment_coords['transform'] = shift_coordinates(
                segment_coords['transform'], offset)
        return segment_image, segment_coords, segment_xywh

    # deskew, since (still) necessary:
    log.info("Rotating %s by %.2f°", name, skew)
    segment_image = rotate_image(segment_image, skew, **kwargs)
    segment_coords['features'] += ',deskewed'
    if (segment and
        (not isinstance(segment, BorderType) or # always crop below page level
         'cropped' in segment_coords['features'])):
        # re-crop to new bbox (which may deviate
        # if segment polygon was not a rectangle)
        segment_image, segment_coords, segment_xywh = _crop(
            log, name, segment, segment_image, segment_coords,
            op='recropped', **kwargs)
    return segment_image, segment_coords, segment_xywh
def _scale(log, name, factor, segment_image, segment_coords, segment_xywh, **kwargs):
    """Account for (and if still missing, apply) a linear resize by ``factor``.

    ``segment_coords`` and ``segment_xywh`` are updated in place: the
    transform is scaled by ``factor`` in both dimensions, ``"scale"``
    (defaulting to 1.0) is multiplied by ``factor``, and width and height
    of the bounding box are multiplied by ``factor``. The image itself is
    only resized when ``"features"`` does not contain ``scaled`` yet, in
    which case that feature is appended.

    Args:
        log: logger used for progress messages
        name (string): description of the image for log messages
        factor (number): scaling factor applied to both width and height
        segment_image (`PIL.Image`): image to resize
        segment_coords (dict): coordinate information for ``segment_image``
        segment_xywh (dict): bounding box with keys ``"w"`` and ``"h"``
    Keyword Args:
        **kwargs: accepted for signature parity with the sibling helpers; unused

    Returns:
        a tuple of the image, ``segment_coords`` and ``segment_xywh``.
    """
    # scale_coordinates is not among the names this module imports from
    # ocrd_utils at the top, so bring it into scope here (otherwise any
    # call of this function would fail with a NameError)
    from ocrd_utils import scale_coordinates
    # Resize linearly
    segment_coords['transform'] = scale_coordinates(
        segment_coords['transform'], [factor, factor])
    segment_coords['scale'] = segment_coords.setdefault('scale', 1.0) * factor
    segment_xywh['w'] *= factor
    segment_xywh['h'] *= factor
    # resize, if (still) necessary
    if 'scaled' not in segment_coords['features']:
        log.info("Scaling %s by %.2f", name, factor)
        segment_coords['features'] += ',scaled'
        # FIXME: validate factor against PAGE-XML attributes
        # FIXME: factor should become less precise due to rounding
        new_size = (int(segment_image.width * factor),
                    int(segment_image.height * factor))
        segment_image = segment_image.resize(new_size, Image.BICUBIC)
    return segment_image, segment_coords, segment_xywh