import os
import tempfile
import requests
import progressbar
from urllib.parse import urlparse
import importlib.resources
[docs]
def download(url, dest_path, config, exceptions=True):
    """
    Download a file from a URL to the given path or path name

    Parameters
    ----------
    url : str
        The URL (including file name) to download

    dest_path : str
        The path (including file name) where the downloaded file should be
        saved

    config : compass.config.CompassConfigParser
        Configuration options.  The boolean options ``download``,
        ``check_size`` and ``verify`` are read from the ``download`` section

    exceptions : bool, optional
        Whether to raise exceptions when the download fails

    Returns
    -------
    dest_path : str
        The resulting (absolute) file name if the file is available locally,
        or ``None`` if the download failed and ``exceptions`` is ``False``

    Raises
    ------
    OSError
        If downloading is disabled in ``config`` and ``dest_path`` does not
        exist
    requests.exceptions.RequestException
        If the request or the transfer fails and ``exceptions`` is ``True``
    """
    in_file_name = os.path.basename(urlparse(url).path)
    dest_path = os.path.abspath(dest_path)
    out_file_name = os.path.basename(dest_path)

    do_download = config.getboolean('download', 'download')
    check_size = config.getboolean('download', 'check_size')
    verify = config.getboolean('download', 'verify')

    if not do_download:
        if not os.path.exists(dest_path):
            raise OSError(f'File not found and downloading is disabled: '
                          f'{dest_path}')
        return dest_path

    # without a size check, any existing file is taken as already downloaded
    if not check_size and os.path.exists(dest_path):
        return dest_path

    # dest_path contains full path, so we need to make the relevant
    # subdirectories if they do not exist already
    os.makedirs(os.path.dirname(dest_path), exist_ok=True)

    # the context manager closes the session (and its pooled connections)
    # on every exit path, including exceptions
    with requests.Session() as session:
        if not verify:
            session.verify = False
        return _download_with_session(session, url, dest_path, in_file_name,
                                      out_file_name, exceptions)


def _download_with_session(session, url, dest_path, in_file_name,
                           out_file_name, exceptions):
    """
    Fetch ``url`` with an open session and write the body to ``dest_path``

    Returns ``dest_path`` on success (or if the file is already present) and
    ``None`` if something failed and ``exceptions`` is ``False``.
    """
    try:
        response = session.get(url, stream=True)
    except requests.exceptions.RequestException:
        if exceptions:
            raise
        print(f' {url} could not be reached!')
        return None

    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        if exceptions:
            raise
        print(f'ERROR while downloading {in_file_name}:')
        print(e)
        return None

    total_size = response.headers.get('content-length')
    dest_dir = os.path.dirname(dest_path)

    if total_size is None:
        # no content length header, so the size cannot be compared: keep
        # whatever file is already there
        if os.path.exists(dest_path):
            return dest_path

        print(f'Downloading {in_file_name}\n'
              f' to {dest_dir}...')
        try:
            # read the whole body before opening the destination so that a
            # failed transfer does not leave an empty file behind
            content = response.content
        except requests.exceptions.RequestException:
            if exceptions:
                raise
            print(f' {in_file_name} failed!')
            return None
        with open(dest_path, 'wb') as f:
            f.write(content)
        print(f' {in_file_name} done.')
        return dest_path

    # we can do the download in chunks and use a progress bar, yay!
    total_size = int(total_size)
    if os.path.exists(dest_path) and \
            total_size == os.path.getsize(dest_path):
        # we already have the file, so just return
        return dest_path

    if out_file_name == in_file_name:
        file_names = in_file_name
    else:
        file_names = f'{in_file_name} as {out_file_name}'
    print(f'Downloading {file_names} ({_sizeof_fmt(total_size)})\n'
          f' to {dest_dir}')
    widgets = [progressbar.Percentage(), ' ', progressbar.Bar(),
               ' ', progressbar.ETA()]
    bar = progressbar.ProgressBar(widgets=widgets,
                                  max_value=total_size).start()
    size = 0
    try:
        with open(dest_path, 'wb') as f:
            for data in response.iter_content(chunk_size=4096):
                size += len(data)
                f.write(data)
                bar.update(size)
        bar.finish()
    except requests.exceptions.RequestException:
        # the file is closed by now; drop the truncated copy so a later
        # call does not mistake it for a finished download
        _remove_partial_file(dest_path)
        if exceptions:
            raise
        print(f' {in_file_name} failed!')
        return None

    print(f' {in_file_name} done.')
    return dest_path


def _remove_partial_file(path):
    """Delete ``path`` if possible, ignoring any error from the removal"""
    try:
        os.remove(path)
    except OSError:
        # best effort: the original download error is the one to report
        pass
[docs]
def symlink(target, link_name, overwrite=True):
    """
    Create a symbolic link named ``link_name`` pointing to ``target``

    Adapted from https://stackoverflow.com/a/55742015/7728169

    With ``overwrite=False`` this is a plain ``os.symlink()`` call, so an
    existing ``link_name`` results in ``FileExistsError``.  With
    ``overwrite=True`` the link is first created under a temporary name in
    the same directory and then moved over ``link_name`` with
    ``os.replace()``.  Attempting to overwrite a real directory raises
    ``IsADirectoryError``.

    Parameters
    ----------
    target : str
        The file path to link to

    link_name : str
        The name of the new link

    overwrite : bool, optional
        Whether to replace an existing link if one already exists
    """
    if not overwrite:
        os.symlink(target, link_name)
        return

    # the temporary link lives next to the final one because os.replace()
    # may fail if the two names are on different filesystems
    parent = os.path.dirname(link_name)

    # keep drawing candidate names until one can be created; symlink()
    # reports an already-existing name with FileExistsError (POSIX EEXIST)
    scratch_link = None
    while scratch_link is None:
        candidate = tempfile.mktemp(dir=parent)
        try:
            os.symlink(target, candidate)
        except FileExistsError:
            continue
        scratch_link = candidate

    # move the temporary link into place
    try:
        # check for a real directory ourselves so the error message is
        # nicer than the one os.replace() would give
        over_real_dir = \
            (not os.path.islink(link_name)) and os.path.isdir(link_name)
        if over_real_dir:
            raise IsADirectoryError(
                f"Cannot symlink over existing directory: '{link_name}'")
        os.replace(scratch_link, link_name)
    except BaseException:
        # whatever went wrong, do not leave the temporary link behind
        if os.path.islink(scratch_link):
            os.remove(scratch_link)
        raise
[docs]
def package_path(package, resource):
    """
    Get a context manager providing a file-system path to a package resource

    A replacement for deprecated ``importlib.resources.path()``:
    https://github.com/python/importlib_resources/blob/7e9020a1b84726fdc6ba71ee2893119d1ee61e02/importlib_resources/_legacy.py

    Parameters
    ----------
    package : Package
        The python package for the resource

    resource : Resource
        The file within the package

    Returns
    -------
    path : context manager
        The result of ``importlib.resources.as_file()`` for the resource

    Raises
    ------
    ValueError
        If ``resource`` contains a directory component
    """
    resource_str = str(resource)
    # only bare file names are accepted, not paths into subdirectories
    if os.path.dirname(resource_str):
        raise ValueError(f'{resource!r} must be only a file name')
    base_name = os.path.basename(resource_str)
    traversable = importlib.resources.files(package).joinpath(base_name)
    return importlib.resources.as_file(traversable)
# From https://stackoverflow.com/a/1094933/7728169
def _sizeof_fmt(num, suffix='B'):
    """
    Convert a number of bytes to a human-readable file size

    The value is divided by 1024 until its magnitude is below 1024 and is
    then printed with one decimal place, the matching binary prefix
    (``Ki``, ``Mi``, ...) and ``suffix``.  Values too large for ``Zi`` are
    reported in ``Yi``.
    """
    binary_prefixes = ('', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi')
    value = num
    for prefix in binary_prefixes:
        if not abs(value) < 1024.0:
            # still too big for this prefix: scale down and try the next
            value = value / 1024.0
            continue
        return "%3.1f%s%s" % (value, prefix, suffix)
    return "%.1f%s%s" % (value, 'Yi', suffix)