Source code for compass.io

import os
import tempfile
import requests
import progressbar
from urllib.parse import urlparse
import importlib.resources


def download(url, dest_path, config, exceptions=True):
    """
    Download a file from a URL to the given path or path name

    Parameters
    ----------
    url : str
        The URL (including file name) to download

    dest_path : str
        The path (including file name) where the downloaded file should be
        saved

    config : compass.config.CompassConfigParser
        Configuration options; the boolean options ``download``,
        ``check_size`` and ``verify`` in the ``download`` section control
        whether downloading is allowed, whether an existing file is compared
        against the remote size, and whether TLS certificates are verified

    exceptions : bool, optional
        Whether to raise exceptions when the download fails

    Returns
    -------
    dest_path : str
        The resulting (absolute) file name if the file is available locally,
        or None if the download failed and ``exceptions`` is False

    Raises
    ------
    OSError
        If downloading is disabled and ``dest_path`` does not exist

    requests.exceptions.RequestException
        If ``exceptions`` is True and the request or the transfer fails
    """
    in_file_name = os.path.basename(urlparse(url).path)
    dest_path = os.path.abspath(dest_path)
    out_file_name = os.path.basename(dest_path)

    do_download = config.getboolean('download', 'download')
    check_size = config.getboolean('download', 'check_size')
    verify = config.getboolean('download', 'verify')

    if not do_download:
        if not os.path.exists(dest_path):
            raise OSError(f'File not found and downloading is disabled: '
                          f'{dest_path}')
        return dest_path

    if not check_size and os.path.exists(dest_path):
        return dest_path

    # dest_path contains full path, so we need to make the relevant
    # subdirectories if they do not exist already.  exist_ok=True tolerates
    # an existing directory without hiding other errors (e.g. permissions).
    dest_dir = os.path.dirname(dest_path)
    os.makedirs(dest_dir, exist_ok=True)

    # the context managers make sure the connection is released on every
    # exit path, including the early returns below
    with requests.Session() as session:
        if not verify:
            session.verify = False

        try:
            response = session.get(url, stream=True)
        except requests.exceptions.RequestException:
            if exceptions:
                raise
            print(f' {url} could not be reached!')
            return None

        with response:
            try:
                response.raise_for_status()
            except requests.exceptions.HTTPError as e:
                if exceptions:
                    raise
                print(f'ERROR while downloading {in_file_name}:')
                print(e)
                return None

            total_size = response.headers.get('content-length')
            bar = None

            if total_size is None:
                # no content length header, so the size of an existing file
                # cannot be checked and it is assumed to be complete
                if os.path.exists(dest_path):
                    return dest_path
                print(f'Downloading {in_file_name}\n'
                      f' to {dest_dir}...')
            else:
                # we can do the download in chunks and use a progress bar
                total_size = int(total_size)
                if os.path.exists(dest_path) and \
                        total_size == os.path.getsize(dest_path):
                    # we already have the file, so just return
                    return dest_path

                if out_file_name == in_file_name:
                    file_names = in_file_name
                else:
                    file_names = f'{in_file_name} as {out_file_name}'
                print(f'Downloading {file_names} '
                      f'({_sizeof_fmt(total_size)})\n'
                      f' to {dest_dir}')
                widgets = [progressbar.Percentage(), ' ',
                           progressbar.Bar(), ' ', progressbar.ETA()]
                bar = progressbar.ProgressBar(widgets=widgets,
                                              max_value=total_size).start()

            try:
                _stream_to_file(response, dest_path, bar=bar,
                                total_size=total_size)
            except requests.exceptions.RequestException:
                if exceptions:
                    raise
                print(f' {in_file_name} failed!')
                return None

    print(f' {in_file_name} done.')
    return dest_path


def _stream_to_file(response, dest_path, bar=None, total_size=None):
    """
    Write the body of ``response`` to ``dest_path`` in chunks

    The data is first written to ``<dest_path>.part`` and only moved into
    place once the transfer has completed, so a failed or interrupted
    download never leaves a truncated file at ``dest_path``.

    Parameters
    ----------
    response : requests.Response
        A streamed response whose body should be saved

    dest_path : str
        The absolute path of the final file

    bar : progressbar.ProgressBar, optional
        A started progress bar to update as data arrives

    total_size : int, optional
        The maximum value of ``bar``; required if ``bar`` is given
    """
    part_path = f'{dest_path}.part'
    try:
        with open(part_path, 'wb') as f:
            size = 0
            for data in response.iter_content(chunk_size=4096):
                f.write(data)
                size += len(data)
                if bar is not None:
                    # the decoded payload can be larger than the advertised
                    # content length (e.g. compressed transfers), so clamp
                    # to stay within the range of the progress bar
                    bar.update(min(size, total_size))
        os.replace(part_path, dest_path)
    except BaseException:
        # also covers KeyboardInterrupt: never leave a partial file behind
        try:
            os.remove(part_path)
        except FileNotFoundError:
            pass
        raise

    if bar is not None:
        bar.finish()
def package_path(package, resource):
    """
    Get a context manager that provides a file-system path to a resource
    within a package.  This is a stand-in for the deprecated
    ``importlib.resources.path()``, following:
    https://github.com/python/importlib_resources/blob/7e9020a1b84726fdc6ba71ee2893119d1ee61e02/importlib_resources/_legacy.py

    Parameters
    ----------
    package : Package
        The python package for the resource

    resource : Resource
        The file within the package; it must be a bare file name without
        any directory component

    Returns
    -------
    path : context manager
        The result of ``importlib.resources.as_file()`` for the resource

    Raises
    ------
    ValueError
        If ``resource`` contains a directory component
    """
    directory, base_name = os.path.split(str(resource))
    if directory:
        raise ValueError(f'{resource!r} must be only a file name')

    package_root = importlib.resources.files(package)
    target = package_root / base_name
    return importlib.resources.as_file(target)
# From https://stackoverflow.com/a/1094933/7728169 def _sizeof_fmt(num, suffix='B'): """ Covert a number of bytes to a human-readable file size """ for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']: if abs(num) < 1024.0: return "%3.1f%s%s" % (num, unit, suffix) num /= 1024.0 return "%.1f%s%s" % (num, 'Yi', suffix)