#!/usr/bin/env python
# vim: set fileencoding=utf-8 :
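'''Mirroring functionality for conda channels

Some constructs are bluntly copied from another project (the reference that
originally followed this sentence is missing from this copy of the file).
'''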
import os
import bz2
import json
import time
import random
import hashlib
import fnmatch
import tempfile
import requests
from .log import get_logger
logger = get_logger(__name__)
def _download(url, target_directory):
    """Download `url` to `target_directory`

    The file is saved under the last path component of `url`.

    Parameters
    ----------
    url : str
        The url to download
    target_directory : str
        The path to a directory where `url` should be downloaded

    Returns
    -------
    file_size: int
        The size in bytes of the file that was downloaded
    """
    chunk_size = 1024  # 1KB chunks
    logger.info("Download %s -> %s", url, target_directory)

    # the local file is named after the last component of the URL
    target_filename = url.split('/')[-1]
    download_filename = os.path.join(target_directory, target_filename)

    with open(download_filename, 'w+b') as tf:
        ret = requests.get(url, stream=True)
        size = ret.headers.get('Content-length', '??')
        logger.debug('Saving to %s (%s bytes)', download_filename, size)
        for data in ret.iter_content(chunk_size):
            tf.write(data)

    # measured only after the file is closed, so that buffered data has been
    # flushed to disk and is accounted for
    return os.path.getsize(download_filename)
def _list_conda_packages(local_dir):
    """List the conda packages (*.tar.bz2 or *.conda files) in `local_dir`

    Parameters
    ----------
    local_dir : str
        Some local directory with (hopefully) some conda packages in it

    Returns
    -------
    list
        List of conda packages in `local_dir`: first the ``*.conda`` file
        names, then the ``*.tar.bz2`` ones
    """
    contents = os.listdir(local_dir)
    return (fnmatch.filter(contents, "*.conda")
            + fnmatch.filter(contents, "*.tar.bz2"))
def get_json(channel, platform, name):
    """Get a JSON file for a channel/platform combo on conda channel

    Parameters
    ----------
    channel : str
        Complete channel URL
    platform : {'linux-64', 'osx-64', 'noarch'}
        The platform of interest
    name : str
        The name of the file to retrieve.  If the name ends in '.bz2', then it
        is auto-decompressed

    Returns
    -------
    repodata : dict
        contents of repodata.json
    """
    url = channel + '/' + platform + '/' + name
    logger.debug('[checking] %s...', url)
    r = requests.get(url, allow_redirects=True, stream=True)
    size = r.headers.get('Content-length', '??')
    logger.info('[download] %s (%s bytes)...', url, size)

    if name.endswith('.bz2'):
        # just in case transport encoding was applied
        r.raw.decode_content = True
        data = bz2.decompress(r.raw.read())
        return json.loads(data)

    # else, just decodes the response
    return r.json()
def get_local_contents(path, arch):
    """Returns the local package contents as a set

    Parameters
    ----------
    path : str
        Base directory of the local mirror
    arch : str
        Name of the sub-directory of `path` to inspect

    Returns
    -------
    set
        File names matching ``*.tar.bz2`` or ``*.conda`` found directly inside
        ``path/arch``.  Empty if that directory does not exist.
    """
    path_arch = os.path.join(path, arch)

    # a missing directory (or a stray file with that name) means there is
    # nothing mirrored yet for this architecture
    if not os.path.isdir(path_arch):
        return set()

    # path exists, lists currently available packages
    logger.info('Listing package contents of %s...', path_arch)
    contents = os.listdir(path_arch)
    return set(fnmatch.filter(contents, '*.tar.bz2')
               + fnmatch.filter(contents, '*.conda'))
def load_glob_list(path):
    """Loads a list of globs from a configuration file

    Excludes comments and empty lines: every line is stripped of surrounding
    whitespace, then dropped if it is empty or starts with ``#`` or ``-``.

    Parameters
    ----------
    path : str
        Path to the text file with one glob per line

    Returns
    -------
    list of str
        The globs, in file order
    """
    # the context manager guarantees the file handle is closed
    with open(path, "rt") as f:
        stripped = [str(line.strip()) for line in f]
    return [k for k in stripped if k and k[0] not in ("#", "-")]
def blacklist_filter(packages, globs):
    """Filters **out** the input package set with the glob list

    Parameters
    ----------
    packages : set of str
        Package file names to be filtered (any iterable of names is accepted)
    globs : list of str
        Glob patterns; a package matching any of them is removed

    Returns
    -------
    set
        A new set with the packages that match none of the globs.  The input
        is not modified.
    """
    to_remove = set()
    for pattern in globs:
        to_remove.update(fnmatch.filter(packages, pattern))
    # set() makes this work for lists/tuples too, like whitelist_filter()
    return set(packages) - to_remove
def whitelist_filter(packages, globs):
    """Filters **in** the input package set with the glob list

    Parameters
    ----------
    packages : set of str
        Package file names to be filtered
    globs : list of str
        Glob patterns; only packages matching at least one of them are kept

    Returns
    -------
    set
        A new set with the packages that match any of the globs
    """
    to_keep = set()
    for pattern in globs:
        to_keep.update(fnmatch.filter(packages, pattern))
    return to_keep
def _sha256sum(filename):
    """Calculates and returns the sha-256 sum given a file name

    The file is read in blocks of 128 KiB into a reusable buffer, so that it
    never needs to be entirely loaded in memory.

    Parameters
    ----------
    filename : str
        Path to the file to be hashed

    Returns
    -------
    str
        The hexadecimal digest of the file contents
    """
    h = hashlib.sha256()
    b = bytearray(128*1024)
    mv = memoryview(b)
    with open(filename, 'rb', buffering=0) as f:
        # readinto() returns the number of bytes read, 0 at end-of-file
        for n in iter(lambda: f.readinto(mv), 0):
            # only the first n bytes of the buffer are valid on this round
            h.update(mv[:n])
    return h.hexdigest()
def _md5sum(filename):
    """Calculates and returns the md5 sum given a file name

    The file is read in blocks of 128 KiB into a reusable buffer, so that it
    never needs to be entirely loaded in memory.

    Parameters
    ----------
    filename : str
        Path to the file to be hashed

    Returns
    -------
    str
        The hexadecimal digest of the file contents
    """
    h = hashlib.md5()
    b = bytearray(128*1024)
    mv = memoryview(b)
    with open(filename, 'rb', buffering=0) as f:
        # readinto() returns the number of bytes read, 0 at end-of-file
        for n in iter(lambda: f.readinto(mv), 0):
            # only the first n bytes of the buffer are valid on this round
            h.update(mv[:n])
    return h.hexdigest()
def download_packages(packages, repodata, channel_url, dest_dir, arch, dry_run):
    """Downloads remote packages to a download directory

    Packages are downloaded first to a temporary directory, then validated
    according to the expected sha256/md5 sum and then moved, one by one, to the
    destination directory.  An error is raised if the package cannot be
    correctly downloaded.

    Parameters
    ----------
    packages : list of str
        List of packages to download from the remote channel
    repodata: dict
        A dictionary containing the remote repodata.json contents
    channel_url: str
        The complete channel URL
    dest_dir: str
        The local directory where the channel is being mirrored
    arch: str
        The current architecture which we are mirroring
    dry_run: bool
        A boolean flag indicating if this is just a dry-run (simulation),
        flagging so we don't really do anything (set to ``True``).

    Raises
    ------
    AssertionError
        If, after all attempts, the checksum of a downloaded package still
        does not match the one announced in `repodata`
    """
    # local import: moving with shutil works even if the temporary directory
    # and ``dest_dir`` live on different filesystems (os.rename does not)
    import shutil

    max_attempts = 10

    # download files into temporary directory, that is removed by the end of
    # the procedure, or if something bad occurs
    with tempfile.TemporaryDirectory() as download_dir:

        total = len(packages)
        # counting starts at 1 to produce correct order on printouts
        for k, p in enumerate(packages, start=1):

            # checksum to verify: sha256 is preferred, md5 is the fallback
            if p.endswith('.tar.bz2'):
                info = repodata['packages'][p]
            else:
                info = repodata['packages.conda'][p]
            expected_hash = info['sha256'] if 'sha256' in info else info['md5']
            # an hexadecimal md5 digest is 32 characters long
            use_md5 = len(expected_hash) == 32

            # download package to file in our temporary directory
            url = channel_url + '/' + arch + '/' + p
            temp_dest = os.path.join(download_dir, p)
            logger.info('[download: %d/%d] %s -> %s', k, total, url, temp_dest)

            actual_hash = None
            for attempt in range(1, max_attempts + 1):

                if not dry_run:
                    logger.debug('[checking: %d/%d] %s', k, total, url)
                    r = requests.get(url, stream=True, allow_redirects=True)
                    size = r.headers.get('Content-length', '??')
                    logger.info('[download: %d/%d] %s -> %s (%s bytes)', k,
                                total, url, temp_dest, size)
                    with open(temp_dest, 'wb') as f:
                        f.write(r.raw.read())

                # verify that checksum matches
                if use_md5:
                    logger.info('[verify: %d/%d] md5(%s) == %s?', k, total,
                                temp_dest, expected_hash)
                else:
                    logger.info('[verify: %d/%d] sha256(%s) == %s?', k, total,
                                temp_dest, expected_hash)

                if dry_run:
                    # nothing was downloaded, so there is nothing to verify
                    # nor to retry
                    break

                if use_md5:
                    actual_hash = _md5sum(temp_dest)
                else:
                    actual_hash = _sha256sum(temp_dest)

                if actual_hash == expected_hash or attempt == max_attempts:
                    break

                wait_time = random.randint(10, 61)
                logger.warning('Checksum of locally downloaded '
                               'version of %s does not match '
                               '(actual:%r != %r:expected) - retrying '
                               'after %d seconds', url, actual_hash,
                               expected_hash, wait_time)
                time.sleep(wait_time)

            # final check, before we continue.  Raised explicitly (instead of
            # using ``assert``) so it is not stripped when running with -O
            if not dry_run and actual_hash != expected_hash:
                raise AssertionError('Checksum of locally '
                                     'downloaded version of %s does not match '
                                     '(actual:%r != %r:expected)'
                                     % (url, actual_hash, expected_hash))

            # move
            local_dest = os.path.join(dest_dir, arch, p)
            logger.info('[move: %d/%d] %s -> %s', k, total, temp_dest,
                        local_dest)

            # check local directory is available before moving
            dirname = os.path.dirname(local_dest)
            if not os.path.exists(dirname):
                logger.info('[mkdir] %s', dirname)
                if not dry_run:
                    os.makedirs(dirname, exist_ok=True)

            if not dry_run:
                shutil.move(temp_dest, local_dest)
def remove_packages(packages, dest_dir, arch, dry_run):
    """Removes local packages that no longer matter

    Parameters
    ----------
    packages : list of str
        File names of the packages to remove
    dest_dir : str
        The local directory where the channel is being mirrored
    arch : str
        The architecture sub-directory of `dest_dir` holding the packages
    dry_run : bool
        If ``True``, only logs what would be removed, without touching any
        file
    """
    total = len(packages)
    # counting starts at 1 to produce correct order on printouts
    for k, p in enumerate(packages, start=1):
        path = os.path.join(dest_dir, arch, p)
        logger.info('[remove: %d/%d] %s', k, total, path)
        if not dry_run:
            os.unlink(path)
def _cleanup_json(data, packages):
    """Cleans-up the contents of conda JSON looking at existing packages

    Parameters
    ----------
    data : dict
        Contents of the conda JSON file.  It is modified in place.
    packages : set of str
        File names of the packages to keep

    Returns
    -------
    dict
        The same `data` object, in which the ``packages`` and
        ``packages.conda`` entries (when present) only list file names that
        are in `packages`
    """
    # only keys to clean-up here, other keys remain unchanged
    for key in ('packages', 'packages.conda'):
        if key not in data:
            continue
        data[key] = {k: v for k, v in data[key].items() if k in packages}
    return data
def _save_json(data, dest_dir, arch, name, dry_run):
    """Saves contents of conda JSON

    Parameters
    ----------
    data : dict
        The contents to be serialized
    dest_dir : str
        The local directory where the channel is being mirrored
    arch : str
        The architecture sub-directory of `dest_dir` to write into
    name : str
        Name of the JSON file to write
    dry_run : bool
        If ``True``, nothing is written to disk

    Returns
    -------
    str
        The path ``dest_dir/arch/name``, also on dry-runs
    """
    destfile = os.path.join(dest_dir, arch, name)
    if not dry_run:
        # the architecture directory may not exist yet (e.g. no package was
        # ever mirrored for it)
        os.makedirs(os.path.dirname(destfile), exist_ok=True)
        with open(destfile, 'w') as outfile:
            json.dump(data, outfile, ensure_ascii=True, indent=2)
    return destfile
def copy_and_clean_json(url, dest_dir, arch, name, dry_run):
    """Copies and cleans conda JSON file

    Retrieves the JSON file `name` for `arch` from the channel at `url`,
    keeps only the package entries that exist in the local mirror and saves
    the result on the local mirror.

    Parameters
    ----------
    url : str
        Complete channel URL
    dest_dir : str
        The local directory where the channel is being mirrored
    arch : str
        The architecture being considered
    name : str
        Name of the JSON file to copy
    dry_run : bool
        If ``True``, the resulting file is not written to disk

    Returns
    -------
    str
        Path to the (would-be) local copy of the file
    """
    data = get_json(url, arch, name)
    packages = get_local_contents(dest_dir, arch)
    data = _cleanup_json(data, packages)
    return _save_json(data, dest_dir, arch, name, dry_run)
def copy_and_clean_patch(url, dest_dir, arch, name, dry_run):
    """Copies and cleans conda patch_instructions JSON file

    Works like the plain JSON copy, but also filters the ``remove`` and
    ``revoke`` lists of the patch instructions, so that they only mention
    packages that exist in the local mirror.

    Parameters
    ----------
    url : str
        Complete channel URL
    dest_dir : str
        The local directory where the channel is being mirrored
    arch : str
        The architecture being considered
    name : str
        Name of the JSON file to copy
    dry_run : bool
        If ``True``, the resulting file is not written to disk

    Returns
    -------
    str
        Path to the (would-be) local copy of the file
    """
    data = get_json(url, arch, name)
    packages = get_local_contents(dest_dir, arch)
    data = _cleanup_json(data, packages)

    # cleanup specific patch_instructions.json fields; a field that is absent
    # from the remote file is simply left absent
    for key in ("remove", "revoke"):
        if key in data:
            data[key] = [k for k in data[key] if k in packages]

    return _save_json(data, dest_dir, arch, name, dry_run)
def checksum_packages(repodata, dest_dir, arch, packages):
    """Checksums packages on the local mirror and compare to remote repository

    Parameters
    ----------
    repodata : dict
        Data loaded from `repodata.json` on the remote repository
    dest_dir : str
        Path leading to local mirror
    arch : str
        Current architecture being considered (e.g. noarch, linux-64 or osx-64)
    packages : list
        List of packages that are available locally, by name

    Returns
    -------
    issues : set
        Names of the packages whose local checksum does not match the one
        announced by the remote repository
    """
    issues = set()
    total = len(packages)
    # counting starts at 1 to produce correct order on printouts
    for k, p in enumerate(packages, start=1):

        path_to_package = os.path.join(dest_dir, arch, p)

        # checksum to verify: sha256 is preferred, md5 is the fallback
        if p.endswith('.tar.bz2'):
            info = repodata['packages'][p]
        else:
            info = repodata['packages.conda'][p]
        expected_hash = info['sha256'] if 'sha256' in info else info['md5']

        # verify that checksum matches; an hexadecimal md5 digest is 32
        # characters long
        if len(expected_hash) == 32:  # md5
            logger.debug('[verify: %d/%d] md5(%s) == %s?', k, total,
                         path_to_package, expected_hash)
            actual_hash = _md5sum(path_to_package)
        else:  # sha256
            logger.debug('[verify: %d/%d] sha256(%s) == %s?', k, total,
                         path_to_package, expected_hash)
            actual_hash = _sha256sum(path_to_package)

        if actual_hash != expected_hash:
            logger.warning('Checksum of %s does not match remote '
                           'repository description (actual:%r != %r:expected)',
                           path_to_package, actual_hash, expected_hash)
            issues.add(p)

    return issues