from pathlib import Path
from os.path import join
from os import environ, listdir, getcwd, path, unlink
from shutil import copytree, rmtree, copy
from fnmatch import filter as apply_glob
from datetime import datetime
from tarfile import open as open_tarfile
from urllib.parse import urlparse, unquote
from zipfile import ZipFile
import requests
from gdown.parse_url import parse_url as gparse_url
from gdown.download import get_url_from_gdrive_confirmation
from yaml import safe_load, safe_dump
# https://github.com/OCR-D/core/issues/867
# https://stackoverflow.com/questions/50900727/skip-converting-entities-while-loading-a-yaml-string-using-pyyaml
# Point the constructor registered for the YAML timestamp tag at the one
# registered for the plain string tag, so timestamp-tagged scalars are built
# by the same function as strings. This changes the class-level
# ``yaml_constructors`` table of ``SafeConstructor`` at import time.
# NOTE(review): presumably this is what keeps ``safe_load`` from turning
# date-like values into ``datetime`` objects -- see the issue linked above.
import yaml.constructor
yaml.constructor.SafeConstructor.yaml_constructors[u'tag:yaml.org,2002:timestamp'] = \
yaml.constructor.SafeConstructor.yaml_constructors[u'tag:yaml.org,2002:str']
from ocrd_validators import OcrdResourceListValidator
from ocrd_utils import getLogger, directory_size, get_moduledir, EXT_TO_MIME, nth_url_segment, guess_media_type, config
from ocrd_utils.os import get_processor_resource_types, list_all_resources, pushd_popd, get_ocrd_tool_json
from .constants import RESOURCE_LIST_FILENAME, RESOURCE_USER_LIST_COMMENT
[docs]class OcrdResourceManager():
"""
Managing processor resources
"""
def __init__(self, userdir=None, xdg_config_home=None, xdg_data_home=None, skip_init=False):
    """
    Set up the logger, an empty resource database and the location overrides.

    Unless ``skip_init`` is true, the resource list at ``RESOURCE_LIST_FILENAME``
    is loaded first; the user resource list is then created (including its
    parent directory) if it does not exist yet, and finally loaded as well.

    Args:
        userdir: override returned by the ``userdir`` property
        xdg_config_home: override returned by the ``xdg_config_home`` property;
            the user list is ``<xdg_config_home>/ocrd/resources.yml``
        xdg_data_home: override returned by the ``xdg_data_home`` property
        skip_init (bool): if true, do not read or create any resource list
    """
    self.log = getLogger('ocrd.resource_manager')
    self.database = {}
    self._userdir = userdir
    self._xdg_config_home = xdg_config_home
    self._xdg_data_home = xdg_data_home
    # The property falls back to the configured default when no override was given
    self.user_list = Path(self.xdg_config_home) / 'ocrd' / 'resources.yml'
    if skip_init:
        return
    self.load_resource_list(Path(RESOURCE_LIST_FILENAME))
    if not self.user_list.exists():
        user_list_dir = self.user_list.parent
        if not user_list_dir.exists():
            user_list_dir.mkdir(parents=True)
        self.save_user_list()
    self.load_resource_list(self.user_list)
@property
def userdir(self):
    """
    User directory: the value given to the constructor, or ``config.HOME``
    if that value is unset/falsy (the fallback is stored on first access).
    """
    current = self._userdir
    if not current:
        current = self._userdir = config.HOME
    return current
@property
def xdg_data_home(self):
    """
    XDG data home: the value given to the constructor, or
    ``config.XDG_DATA_HOME`` if that value is unset/falsy (the fallback is
    stored on first access).
    """
    current = self._xdg_data_home
    if not current:
        current = self._xdg_data_home = config.XDG_DATA_HOME
    return current
@property
def xdg_config_home(self):
    """
    XDG config home: the value given to the constructor, or
    ``config.XDG_CONFIG_HOME`` if that value is unset/falsy (the fallback is
    stored on first access).
    """
    current = self._xdg_config_home
    if not current:
        current = self._xdg_config_home = config.XDG_CONFIG_HOME
    return current
def save_user_list(self, database=None):
    """
    Write a resource database to the user resource list file.

    The file at ``self.user_list`` is overwritten with
    ``RESOURCE_USER_LIST_COMMENT``, a newline and the YAML dump of the
    database.

    Args:
        database (dict, optional): mapping to serialize. Only ``None``
            selects ``self.database``; an explicitly passed empty mapping is
            written as is instead of being replaced by ``self.database``.
    """
    # Identity check rather than truthiness: an empty dict is a valid database
    if database is None:
        database = self.database
    with open(self.user_list, 'w', encoding='utf-8') as f:
        f.write(RESOURCE_USER_LIST_COMMENT)
        f.write('\n')
        f.write(safe_dump(database))
def load_resource_list(self, list_filename, database=None):
    """
    Merge the resource list stored in a YAML file into a database.

    If ``list_filename`` is not an existing file, the database is returned
    unchanged. Otherwise the file is parsed, validated with
    ``OcrdResourceListValidator`` and, per executable, its resources are
    placed *before* the resources already in the database.

    Args:
        list_filename (pathlib.Path): resource list to read
        database (dict, optional): mapping to merge into. Only ``None``
            selects ``self.database``; an explicitly passed empty mapping is
            filled and returned itself.

    Returns:
        dict: the database that was merged into (same object, updated in place)

    Raises:
        ValueError: if the validator reports the loaded list as invalid
    """
    # Identity check rather than truthiness: callers may pass a fresh, empty dict
    if database is None:
        database = self.database
    if not list_filename.is_file():
        return database
    with open(list_filename, 'r', encoding='utf-8') as f:
        list_loaded = safe_load(f) or {}
    report = OcrdResourceListValidator.validate(list_loaded)
    if not report.is_valid:
        self.log.error('\n'.join(report.errors))
        raise ValueError("Resource list %s is invalid!" % (list_filename))
    for executable, resource_list in list_loaded.items():
        # Prepend, so user provided is sorted before builtin
        database[executable] = resource_list + database.get(executable, [])
    return database
def list_available(self, executable=None, dynamic=True, name=None, database=None, url=None):
    """
    List models available for download by processor

    Args:
        executable (str, optional): processor name or glob pattern. If unset,
            ``database.items()`` is returned without any filtering.
        dynamic (bool): if true, every directory in ``$PATH`` is globbed for
            ``executable``; each match is inspected with
            ``get_ocrd_tool_json`` and its ``resources`` entries are inserted
            at the front of that executable's list in ``database`` (in
            place), after which the database is deduplicated.
        name (str, optional): only keep resources with this ``name``
        database (dict, optional): mapping to search. Only ``None`` selects
            ``self.database``; an explicitly passed empty mapping is used as is.
        url (str, optional): only keep resources with this ``url``

    Returns:
        A list of ``(executable, [resdict, ...])`` tuples, one per database
        key matching ``executable``; ``[(executable, [])]`` if no key matches.
    """
    # Identity check rather than truthiness: an empty dict is a valid database
    if database is None:
        database = self.database
    if not executable:
        return database.items()
    if dynamic:
        # Without a (non-empty) PATH there is nothing to inspect
        search_path = environ.get('PATH', '')
        for exec_dir in (search_path.split(':') if search_path else ()):
            for exec_path in Path(exec_dir).glob(f'{executable}'):
                self.log.debug(f"Inspecting '{exec_path} --dump-json' for resources")
                ocrd_tool = get_ocrd_tool_json(exec_path)
                for resdict in ocrd_tool.get('resources', ()):
                    database.setdefault(exec_path.name, []).insert(0, resdict)
        # An empty mapping has nothing to deduplicate, so skip the call and
        # keep working on the caller's object.
        if database:
            database = self._dedup_database(database)
    matches = []
    for known_executable, resources in database.items():
        if not apply_glob([known_executable], executable):
            continue
        selected = [
            resdict for resdict in resources
            if (not name or resdict['name'] == name)
            and (not url or resdict['url'] == url)
        ]
        matches.append((known_executable, selected))
    return matches or [(executable, [])]
def list_installed(self, executable=None):
    """
    List installed resources, matching with registry by ``name``

    Args:
        executable (str, optional): restrict the listing to this executable.
            If unset, all executables in ``self.database`` are considered,
            plus every ``ocrd-*`` entry found in the ``ocrd-resources``
            directory under ``self.xdg_data_home`` and ``/usr/local/share``.

    Returns:
        A list of ``(executable, [resdict, ...])`` tuples. Every ``resdict``
        gets a ``path`` key pointing at the resource found on disk. Resources
        that are neither in the database nor in the module directory are
        registered through ``add_to_user_database``.
    """
    if executable:
        candidates = [executable]
    else:
        # resources we know about
        candidates = list(self.database.keys())
        # resources in the file system
        for base_dir in [self.xdg_data_home, '/usr/local/share']:
            parent_dir = join(base_dir, 'ocrd-resources')
            if Path(parent_dir).exists():
                candidates += [entry for entry in listdir(parent_dir) if entry.startswith('ocrd-')]
    installed = []
    for this_executable in set(candidates):
        mimetypes = get_processor_resource_types(this_executable)
        moduledir = get_moduledir(this_executable)
        found = []
        for res_filename in list_all_resources(this_executable, moduled=moduledir, xdg_data_home=self.xdg_data_home):
            res_path = Path(res_filename)
            if '*/*' not in mimetypes:
                # Skip directories unless the processor accepts them ...
                if res_path.is_dir() and 'text/directory' not in mimetypes:
                    continue
                # ... and skip files if it accepts nothing but directories
                if res_path.is_file() and mimetypes == ['text/directory']:
                    continue
            res_name = res_path.name
            if res_path.is_file():
                res_type = 'file'
                res_size = res_path.stat().st_size
            else:
                res_type = 'directory'
                res_size = directory_size(res_path)
            registered = [x for x in self.database.get(this_executable, []) if x['name'] == res_name]
            if registered:
                resdict = registered[0]
            elif str(res_path.parent) == moduledir:
                resdict = {
                    'name': res_name,
                    'url': str(res_path),
                    'description': 'Found at module',
                    'type': res_type,
                    'size': res_size
                }
            else:
                resdict = self.add_to_user_database(this_executable, res_path, resource_type=res_type)
            resdict['path'] = str(res_path)
            found.append(resdict)
        installed.append((this_executable, found))
    return installed
def add_to_user_database(self, executable, res_filename, url=None, resource_type='file'):
    """
    Add a stub entry to the user resource.yml

    The user list is re-read from disk; if it has no entry named like the
    file for ``executable`` (as reported by ``list_available``), a stub is
    appended. The list is then written back and reloaded into the database.

    Args:
        executable (str): executable the resource belongs to
        res_filename: path of the resource on disk
        url (str, optional): URL to record; ``'???'`` if unset
        resource_type (str): value stored as the stub's ``type``

    Returns:
        dict: the already existing entry, or the newly created stub
    """
    res_path = Path(res_filename)
    res_name = res_path.name
    self.log.info("%s resource '%s' (%s) not a known resource, creating stub in %s'", executable, res_name, str(res_filename), self.user_list)
    res_size = directory_size(res_filename) if res_path.is_dir() else res_path.stat().st_size
    with open(self.user_list, 'r', encoding='utf-8') as f:
        user_database = safe_load(f) or {}
    user_database.setdefault(executable, [])
    known = self.list_available(executable=executable, name=res_name, database=user_database)[0][1]
    if known:
        resdict = known[0]
    else:
        resdict = {
            'name': res_name,
            'url': url or '???',
            'description': 'Found at %s on %s' % (self.resource_dir_to_location(res_filename), datetime.now()),
            'version_range': '???',
            'type': resource_type,
            'size': res_size
        }
        user_database[executable].append(resdict)
    self.save_user_list(user_database)
    self.load_resource_list(self.user_list)
    return resdict
@property
def default_resource_dir(self):
    """
    Resource directory of the ``'data'`` location, as resolved by
    ``location_to_resource_dir``.
    """
    data_dir = self.location_to_resource_dir('data')
    return data_dir
def location_to_resource_dir(self, location):
    """
    Translate a location keyword into a directory.

    Args:
        location (str): ``'system'`` or ``'data'``; any other value selects
            the current working directory

    Returns:
        str: ``/usr/local/share/ocrd-resources`` for ``'system'``,
        ``<xdg_data_home>/ocrd-resources`` for ``'data'``, else ``getcwd()``
    """
    if location == 'system':
        return '/usr/local/share/ocrd-resources'
    if location == 'data':
        return join(self.xdg_data_home, 'ocrd-resources')
    return getcwd()
def resource_dir_to_location(self, resource_path):
    """
    Translate a resource path into a location keyword.

    The checks are string-prefix tests on ``str(resource_path)``, tried in
    this order: system directory, data directory, current working directory.

    Args:
        resource_path: path of a resource (anything ``str()`` accepts)

    Returns:
        str: ``'system'``, ``'data'`` or ``'cwd'``; the path itself (as a
        string) if none of the prefixes match
    """
    resource_path = str(resource_path)
    if resource_path.startswith('/usr/local/share/ocrd-resources'):
        return 'system'
    if resource_path.startswith(join(self.xdg_data_home, 'ocrd-resources')):
        return 'data'
    if resource_path.startswith(getcwd()):
        return 'cwd'
    return resource_path
def parameter_usage(self, name, usage='as-is'):
    """
    Derive the parameter value to use for a resource name.

    Args:
        name (str): resource name
        usage (str): ``'as-is'`` returns ``name`` unchanged,
            ``'without-extension'`` returns ``Path(name).stem``

    Raises:
        ValueError: for any other ``usage``
    """
    if usage not in ('as-is', 'without-extension'):
        raise ValueError("No such usage '%s'" % usage)
    return name if usage == 'as-is' else Path(name).stem
def _download_impl(self, url, filename, progress_cb=None, size=None):
    """
    Stream the content at ``url`` into ``filename``.

    If ``gparse_url`` reports a Google Drive file id, a non-download link is
    rewritten to the ``uc?id=`` form and, when the first response carries no
    ``Content-Disposition`` header, the URL is replaced by the result of
    ``get_url_from_gdrive_confirmation``. A ``RuntimeError`` during that
    step is logged as a warning and the download proceeds with the URL as is.

    The target file is only opened once the response status has been
    checked, so a failed request does not leave an empty file behind.

    Args:
        url (str): HTTP(S) URL to fetch
        filename: path of the file to (over)write
        progress_cb (callable, optional): called with the length of every
            chunk before it is written
        size: accepted for interface compatibility; not used here

    Raises:
        requests.HTTPError: from ``raise_for_status()`` on an error response
    """
    log = getLogger('ocrd.resource_manager._download_impl')
    log.info("Downloading %s to %s" % (url, filename))
    gdrive_file_id, is_gdrive_download_link = gparse_url(url, warning=False)
    if gdrive_file_id:
        if not is_gdrive_download_link:
            url = "https://drive.google.com/uc?id={id}".format(id=gdrive_file_id)
        try:
            with requests.get(url, stream=True) as r:
                if "Content-Disposition" not in r.headers:
                    url = get_url_from_gdrive_confirmation(r.text)
        except RuntimeError as e:
            # The placeholder is required: an argument without one makes the
            # logging call fail to format and the warning is lost
            log.warning("Cannot unwrap Google Drive URL: %s", e)
    with requests.get(url, stream=True) as r:
        r.raise_for_status()
        with open(filename, 'wb') as f:
            for data in r.iter_content(chunk_size=4096):
                if progress_cb:
                    progress_cb(len(data))
                f.write(data)
def _copy_impl(self, src_filename, filename, progress_cb=None):
    """
    Copy a local file or directory tree to ``filename``.

    Files are copied in 4096-byte chunks. For a directory source, the tree
    is walked recursively: sub-directories (including empty ones) are
    re-created under ``filename`` and only non-directory entries are opened
    and copied.

    Args:
        src_filename: path of the file or directory to copy
        filename: destination path (a file, or the root of the copied tree)
        progress_cb (callable, optional): called with the length of every
            chunk after it has been written
    """
    log = getLogger('ocrd.resource_manager._copy_impl')
    log.info("Copying %s to %s", src_filename, filename)

    def copy_chunked(src, dst):
        # Copy a single file chunk by chunk, reporting progress per chunk
        with open(dst, 'wb') as f_out, open(src, 'rb') as f_in:
            while True:
                chunk = f_in.read(4096)
                if not chunk:
                    break
                f_out.write(chunk)
                if progress_cb:
                    progress_cb(len(chunk))

    if not Path(src_filename).is_dir():
        copy_chunked(src_filename, filename)
        return
    log.info(f"Copying recursively from {src_filename} to {filename}")
    dst_root = Path(filename)
    # Create the root up front so an empty source directory still yields a target
    dst_root.mkdir(parents=True, exist_ok=True)
    for child in Path(src_filename).rglob('*'):
        child_dst = dst_root / child.relative_to(src_filename)
        if child.is_dir():
            # rglob yields directories as well; they cannot be opened as files
            child_dst.mkdir(parents=True, exist_ok=True)
            continue
        child_dst.parent.mkdir(parents=True, exist_ok=True)
        copy_chunked(child, child_dst)
# TODO Proper caching (make head request for size, If-Modified etc)
def download(
    self,
    executable,
    url,
    basedir,
    overwrite=False,
    no_subdir=False,
    name=None,
    resource_type='file',
    path_in_archive='.',
    progress_cb=None,
):
    """
    Download a resource by URL

    Args:
        executable (str): executable the resource belongs to; used as the
            sub-directory of ``basedir`` unless ``no_subdir`` is set
        url (str): ``http(s)://`` URL, or a local path to copy from
        basedir: directory to place the resource in
        overwrite (bool): remove an already existing target first
        no_subdir (bool): place the resource directly in ``basedir``
        name (str, optional): target name; defaults to the last segment of
            the (unquoted) URL path
        resource_type (str): ``'file'`` or ``'directory'`` fetch straight to
            the target; ``'archive'`` fetches to a temporary directory,
            extracts there and copies ``path_in_archive`` to the target. Any
            other value fetches nothing.
        path_in_archive (str): file or directory inside the extracted
            archive to copy to the target
        progress_cb (callable, optional): passed on to the fetch helper

    Returns:
        pathlib.Path: the target path (``basedir[/executable]/name``)

    Raises:
        FileExistsError: the target exists and ``overwrite`` is not set
        RuntimeError: the archive's media type is neither zip, gzip nor xz
    """
    log = getLogger('ocrd.resource_manager.download')
    destdir = Path(basedir) if no_subdir else Path(basedir, executable)
    if not name:
        url_parsed = urlparse(url)
        name = Path(unquote(url_parsed.path)).name
    fpath = Path(destdir, name)
    is_url = url.startswith(('https://', 'http://'))
    if fpath.exists():
        if not overwrite:
            raise FileExistsError("%s %s already exists but --overwrite is not set" % ('Directory' if fpath.is_dir() else 'File', fpath))
        if fpath.is_dir():
            log.info("Removing existing target directory %s", fpath)
            rmtree(str(fpath))
        else:
            log.info("Removing existing target file %s", fpath)
            unlink(str(fpath))
    destdir.mkdir(parents=True, exist_ok=True)
    fetch = self._download_impl if is_url else self._copy_impl
    if resource_type in ('file', 'directory'):
        fetch(url, fpath, progress_cb)
    elif resource_type == 'archive':
        archive_fname = 'download.tar.xx'
        # The relative names used below ('out', '../download.tar.xx') rely on
        # pushd_popd changing the working directory, so anchor a local source
        # and the target to the current directory before entering it.
        source = url if is_url else str(Path(url).absolute())
        target = fpath.absolute()
        with pushd_popd(tempdir=True) as tempdir:
            fetch(source, archive_fname, progress_cb)
            Path('out').mkdir()
            with pushd_popd('out'):
                mimetype = guess_media_type(f'../{archive_fname}', fallback='application/octet-stream')
                log.info("Extracting %s archive to %s/out" % (mimetype, tempdir))
                # NOTE(review): extractall() trusts the member paths of the
                # archive; consider validating them for untrusted sources.
                if mimetype == 'application/zip':
                    with ZipFile(f'../{archive_fname}', 'r') as zipf:
                        zipf.extractall()
                elif mimetype in ('application/gzip', 'application/x-xz'):
                    with open_tarfile(f'../{archive_fname}', 'r:*') as tar:
                        tar.extractall()
                else:
                    raise RuntimeError("Unable to handle extraction of %s archive %s" % (mimetype, url))
                log.info("Copying '%s' from archive to %s" % (path_in_archive, fpath))
                if Path(path_in_archive).is_dir():
                    copytree(path_in_archive, str(target))
                else:
                    copy(path_in_archive, str(target))
    return fpath
def _dedup_database(self, database=None, dedup_key='name'):
    """
    Deduplicate resources by name

    Within each executable's list, only the first resource for every value
    of ``dedup_key`` is kept; the order of the remaining resources is
    preserved. The lists are replaced in place in ``database``.

    Args:
        database (dict, optional): mapping of executable to resource list.
            Only ``None`` selects ``self.database``; an explicitly passed
            empty mapping is returned as is.
        dedup_key (str): resource key whose value identifies duplicates

    Returns:
        dict: the same ``database`` object, deduplicated
    """
    # Identity check rather than truthiness: an empty dict is a valid database
    if database is None:
        database = self.database
    for executable, reslist in database.items():
        reslist_dedup = []
        for resdict in reslist:
            already_kept = any(kept[dedup_key] == resdict[dedup_key] for kept in reslist_dedup)
            if not already_kept:
                reslist_dedup.append(resdict)
        # Rebinding an existing key does not change the dict's size, so this
        # is safe while iterating over items()
        database[executable] = reslist_dedup
    return database