#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
'''Mirroring functionality for conda channels

Some constructs are copied, mostly verbatim, from
https://github.com/valassis-digital-media/conda-mirror
'''
import os
import bz2
import json
import time
import random
import hashlib
import fnmatch
import tempfile
import requests
from .log import get_logger
logger = get_logger(__name__)
def _download(url, target_directory):
"""Download `url` to `target_directory`
Parameters
----------
url : str
The url to download
target_directory : str
The path to a directory where `url` should be downloaded
Returns
-------
file_size: int
The size in bytes of the file that was downloaded
"""
    chunk_size = 1024  # download in 1KB chunks
    logger.info("Download %s -> %s", url, target_directory)
    # derive the local file name from the last component of the URL
    target_filename = url.split('/')[-1]
    download_filename = os.path.join(target_directory, target_filename)
    with open(download_filename, 'wb') as f:
        ret = requests.get(url, stream=True)
        size = ret.headers.get('Content-length', '??')
        logger.debug('Saving to %s (%s bytes)', download_filename, size)
        for data in ret.iter_content(chunk_size):
            f.write(data)
file_size = os.path.getsize(download_filename)
return file_size
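
# A usage sketch for ``_download`` (the URL and directory below are made-up
# values for illustration, not part of this module):
#
#   size = _download(
#       'https://conda.anaconda.org/conda-forge/noarch/some-package-1.0-0.tar.bz2',
#       '/tmp/mirror/noarch')
#   logger.info('fetched %d bytes', size)
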
def _list_conda_packages(local_dir):
"""List the conda packages (*.tar.bz2 or *.conda files) in `local_dir`
Parameters
----------
local_dir : str
Some local directory with (hopefully) some conda packages in it
Returns
-------
list
List of conda packages in `local_dir`
"""
contents = os.listdir(local_dir)
return fnmatch.filter(contents, "*.conda") + \
fnmatch.filter(contents, "*.tar.bz2")
def get_json(channel, platform, name):
    """Gets a JSON file for a channel/platform combination from a conda channel
Parameters
----------
channel : str
Complete channel URL
platform : {'linux-64', 'osx-64', 'noarch'}
The platform of interest
name : str
The name of the file to retrieve. If the name ends in '.bz2', then it
is auto-decompressed
Returns
-------
repodata : dict
contents of repodata.json
"""
url = channel + '/' + platform + '/' + name
logger.debug('[checking] %s...', url)
r = requests.get(url, allow_redirects=True, stream=True)
size = r.headers.get('Content-length', '??')
logger.info('[download] %s (%s bytes)...', url, size)
if name.endswith('.bz2'):
# just in case transport encoding was applied
r.raw.decode_content = True
data = bz2.decompress(r.raw.read())
return json.loads(data)
# else, just decodes the response
return r.json()
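
# A usage sketch for ``get_json``: fetching the compressed repodata of a
# hypothetical channel and collecting all advertised package filenames:
#
#   repodata = get_json('https://conda.anaconda.org/conda-forge', 'noarch',
#                       'repodata.json.bz2')
#   remote = set(repodata['packages']) | set(repodata.get('packages.conda', {}))
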
def get_local_contents(path, arch):
"""Returns the local package contents as a set"""
path_arch = os.path.join(path, arch)
if not os.path.exists(path_arch):
return set()
# path exists, lists currently available packages
logger.info('Listing package contents of %s...', path_arch)
contents = os.listdir(path_arch)
return set(fnmatch.filter(contents, '*.tar.bz2') +
fnmatch.filter(contents, '*.conda'))
def load_glob_list(path):
"""Loads a list of globs from a configuration file
Excludes comments and empty lines
"""
    with open(path, "rt") as f:
        retval = [k.strip() for k in f]
return [k for k in retval if k and k[0] not in ("#", "-")]
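
# The glob configuration file is plain text with one fnmatch-style glob per
# line; empty lines and lines starting with '#' or '-' are skipped.  A
# hypothetical blacklist file could look like this:
#
#   # never mirror development builds
#   *dev*.tar.bz2
#   somepackage-*
#
# and would be loaded with ``load_glob_list('/path/to/blacklist.txt')``.
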
def blacklist_filter(packages, globs):
"""Filters **out** the input package set with the glob list"""
to_remove = set()
for k in globs:
to_remove |= set(fnmatch.filter(packages, k))
return packages - to_remove
def whitelist_filter(packages, globs):
"""Filters **in** the input package set with the glob list"""
to_keep = set()
for k in globs:
to_keep |= set(fnmatch.filter(packages, k))
return to_keep
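
# Both filters take a set of package filenames and a list of globs.  A small
# sketch with made-up names (``blacklist_filter`` subtracts matches,
# ``whitelist_filter`` keeps only matches):
#
#   packages = {'numpy-1.24.0-py310_0.conda', 'scipy-1.10.0-py310_0.tar.bz2'}
#   blacklist_filter(packages, ['scipy-*'])  # -> {'numpy-1.24.0-py310_0.conda'}
#   whitelist_filter(packages, ['scipy-*'])  # -> {'scipy-1.10.0-py310_0.tar.bz2'}
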
def _hash_file(filename, h):
    """Feeds the file contents through the given hashlib object, returns the
    hexadecimal digest"""
    b = bytearray(128*1024)
    mv = memoryview(b)
    with open(filename, 'rb', buffering=0) as f:
        for n in iter(lambda: f.readinto(mv), 0):
            h.update(mv[:n])
    return h.hexdigest()


def _sha256sum(filename):
    """Calculates and returns the sha-256 sum given a file name"""
    return _hash_file(filename, hashlib.sha256())


def _md5sum(filename):
    """Calculates and returns the md5 sum given a file name"""
    return _hash_file(filename, hashlib.md5())
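
# Both digest helpers stream the file through the hash object in 128KB chunks
# (via ``readinto`` on a pre-allocated buffer), so even multi-gigabyte
# packages are never fully loaded into memory.  The digest length is what the
# code below uses to tell the two algorithms apart:
#
#   len(_md5sum(path)) == 32      # md5 hexdigests are 32 characters
#   len(_sha256sum(path)) == 64   # sha256 hexdigests are 64 characters
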
def download_packages(packages, repodata, channel_url, dest_dir, arch, dry_run):
"""Downloads remote packages to a download directory
Packages are downloaded first to a temporary directory, then validated
according to the expected sha256/md5 sum and then moved, one by one, to the
destination directory. An error is raised if the package cannot be
correctly downloaded.
Parameters
----------
packages : list of str
List of packages to download from the remote channel
repodata: dict
A dictionary containing the remote repodata.json contents
channel_url: str
The complete channel URL
dest_dir: str
The local directory where the channel is being mirrored
arch: str
The current architecture which we are mirroring
    dry_run: bool
        If set to ``True``, only simulates the procedure: nothing is
        downloaded, verified or moved
"""
    # download files into a temporary directory, which is removed at the end
    # of the procedure or if something goes wrong
with tempfile.TemporaryDirectory() as download_dir:
total = len(packages)
        for k, p in enumerate(packages, 1):  # 1-based counter for printouts
            # checksum to verify - prefer sha256, fall back to md5 if absent
if p.endswith('.tar.bz2'):
expected_hash = repodata['packages'][p].get('sha256',
repodata['packages'][p]['md5'])
else:
expected_hash = repodata['packages.conda'][p].get('sha256',
repodata['packages.conda'][p]['md5'])
# download package to file in our temporary directory
url = channel_url + '/' + arch + '/' + p
temp_dest = os.path.join(download_dir, p)
logger.info('[download: %d/%d] %s -> %s', k, total, url, temp_dest)
            package_retries = 10
            while package_retries:
                if not dry_run:
                    logger.debug('[checking: %d/%d] %s', k, total, url)
                    r = requests.get(url, stream=True, allow_redirects=True)
                    size = r.headers.get('Content-length', '??')
                    logger.info('[download: %d/%d] %s -> %s (%s bytes)', k,
                            total, url, temp_dest, size)
                    with open(temp_dest, 'wb') as f:
                        f.write(r.raw.read())
                # verify that the checksum matches - md5 hexdigests are 32
                # characters long, sha256 hexdigests are 64
                if len(expected_hash) == 32:  # md5
                    logger.info('[verify: %d/%d] md5(%s) == %s?', k, total,
                            temp_dest, expected_hash)
                else:  # sha256
                    logger.info('[verify: %d/%d] sha256(%s) == %s?', k, total,
                            temp_dest, expected_hash)
                if dry_run:
                    break  # nothing was downloaded, nothing to verify
                if len(expected_hash) == 32:  # md5
                    actual_hash = _md5sum(temp_dest)
                else:  # sha256
                    actual_hash = _sha256sum(temp_dest)
                if actual_hash != expected_hash:
                    wait_time = random.randint(10, 61)
                    logger.warning('Checksum of locally downloaded '
                            'version of %s does not match '
                            '(actual:%r != %r:expected) - retrying '
                            'after %d seconds', url, actual_hash,
                            expected_hash, wait_time)
                    os.unlink(temp_dest)
                    time.sleep(wait_time)
                    package_retries -= 1
                    continue
                else:
                    break
            # final check, before we continue (skipped on dry-runs, where
            # nothing was actually downloaded)
            if not dry_run:
                assert actual_hash == expected_hash, 'Checksum of locally ' \
                        'downloaded version of %s does not match ' \
                        '(actual:%r != %r:expected)' % (url, actual_hash,
                        expected_hash)
# move
local_dest = os.path.join(dest_dir, arch, p)
logger.info('[move: %d/%d] %s -> %s', k, total, temp_dest,
local_dest)
# check local directory is available before moving
dirname = os.path.dirname(local_dest)
if not os.path.exists(dirname):
logger.info('[mkdir] %s', dirname)
if not dry_run:
os.makedirs(dirname)
if not dry_run:
os.rename(temp_dest, local_dest)
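
# A condensed mirroring flow tying the helpers above together (the channel URL
# and destination path are assumptions for illustration):
#
#   channel = 'https://conda.anaconda.org/conda-forge'
#   arch = 'linux-64'
#   dest = '/data/mirror'
#   repodata = get_json(channel, arch, 'repodata.json.bz2')
#   remote = set(repodata['packages']) | set(repodata.get('packages.conda', {}))
#   local = get_local_contents(dest, arch)
#   download_packages(remote - local, repodata, channel, dest, arch,
#                     dry_run=False)
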
def remove_packages(packages, dest_dir, arch, dry_run):
    """Removes local packages that are no longer required"""
    total = len(packages)
    for k, p in enumerate(packages, 1):  # 1-based counter for printouts
path = os.path.join(dest_dir, arch, p)
logger.info('[remove: %d/%d] %s', k, total, path)
if not dry_run:
os.unlink(path)
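
# Continuing the sketch above, packages that are present locally but no longer
# advertised upstream can be pruned with:
#
#   remove_packages(local - remote, dest, arch, dry_run=False)
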
def _cleanup_json(data, packages):
"""Cleans-up the contents of conda JSON looking at existing packages"""
    # only these keys are cleaned-up here, other keys remain unchanged
    for key in ('packages', 'packages.conda'):
        if key not in data:
            continue
        data[key] = dict((k, v) for k, v in data[key].items() if k in packages)
return data
def _save_json(data, dest_dir, arch, name, dry_run):
"""Saves contents of conda JSON"""
destfile = os.path.join(dest_dir, arch, name)
if not dry_run:
with open(destfile, 'w') as outfile:
json.dump(data, outfile, ensure_ascii=True, indent=2)
return destfile
def copy_and_clean_json(url, dest_dir, arch, name, dry_run):
"""Copies and cleans conda JSON file"""
data = get_json(url, arch, name)
packages = get_local_contents(dest_dir, arch)
data = _cleanup_json(data, packages)
return _save_json(data, dest_dir, arch, name, dry_run)
def copy_and_clean_patch(url, dest_dir, arch, name, dry_run):
"""Copies and cleans conda patch_instructions JSON file"""
data = get_json(url, arch, name)
packages = get_local_contents(dest_dir, arch)
data = _cleanup_json(data, packages)
    # cleanup specific patch_instructions.json fields
    for key in ("remove", "revoke"):
        if key in data:
            data[key] = [k for k in data[key] if k in packages]
return _save_json(data, dest_dir, arch, name, dry_run)
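
# Once the packages themselves are in sync, the local index can be refreshed
# from the remote metadata (same hypothetical ``channel``/``dest`` as above):
#
#   copy_and_clean_json(channel, dest, arch, 'repodata.json', dry_run=False)
#   copy_and_clean_patch(channel, dest, arch, 'patch_instructions.json',
#                        dry_run=False)
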
def checksum_packages(repodata, dest_dir, arch, packages):
    """Checksums local mirror packages and compares them to the remote repository
Parameters
----------
repodata : dict
Data loaded from `repodata.json` on the remote repository
dest_dir : str
Path leading to local mirror
arch : str
Current architecture being considered (e.g. noarch, linux-64 or osx-64)
packages : list
List of packages that are available locally, by name
Returns
-------
    issues : set
        Names of packages whose local checksum does not match the remote
        repository description
"""
issues = set()
total = len(packages)
    for k, p in enumerate(packages, 1):  # 1-based counter for printouts
path_to_package = os.path.join(dest_dir, arch, p)
        # checksum to verify - prefer sha256, fall back to md5 if absent
        if p.endswith('.tar.bz2'):
            expected_hash = repodata['packages'][p].get('sha256',
                    repodata['packages'][p]['md5'])
        else:
            expected_hash = repodata['packages.conda'][p].get('sha256',
                    repodata['packages.conda'][p]['md5'])
        # verify that the checksum matches - md5 hexdigests are 32 characters
        # long, sha256 hexdigests are 64
        if len(expected_hash) == 32:  # md5
            logger.debug('[verify: %d/%d] md5(%s) == %s?', k, total,
                    path_to_package, expected_hash)
            actual_hash = _md5sum(path_to_package)
        else:  # sha256
            logger.debug('[verify: %d/%d] sha256(%s) == %s?', k, total,
                    path_to_package, expected_hash)
            actual_hash = _sha256sum(path_to_package)
if actual_hash != expected_hash:
logger.warning('Checksum of %s does not match remote ' \
'repository description (actual:%r != %r:expected)',
path_to_package, actual_hash, expected_hash)
issues.add(p)
return issues
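
# A sketch of a consistency check over an existing mirror: verify every local
# package and re-fetch the ones that fail (``channel``/``dest`` are the same
# hypothetical values used above):
#
#   issues = checksum_packages(repodata, dest, arch, local)
#   remove_packages(issues, dest, arch, dry_run=False)
#   download_packages(issues, repodata, channel, dest, arch, dry_run=False)
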