#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Utilities to needed to release packages."""
import os
import re
import shutil
import time
from distutils.version import StrictVersion
import gitlab
from .log import get_logger
logger = get_logger(__name__)
def download_path(package, path, output=None, ref="master"):
"""Downloads paths from gitlab, with an optional recurse.
This method will download an archive of the repository from chosen
reference, and then it will search inside the zip blob for the path to be
copied into output. It uses :py:class:`zipfile.ZipFile` to do this search.
This method will not be very efficient for larger repository references,
but works recursively by default.
Args:
package: the gitlab package object to use (should be pre-fetched)
path: the path on the project to download
output: where to place the path to be downloaded - if not provided, use
the basename of ``path`` as storage point with respect to the current
directory
ref: the name of the git reference (branch, tag or commit hash) to use
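
    Example:

        A minimal usage sketch; the project path ``bob/bob.devtools`` and the
        repository sub-path ``doc`` are illustrative assumptions:

        .. code-block:: python

            pkg = get_gitlab_instance().projects.get("bob/bob.devtools")
            download_path(pkg, "doc", output="local-doc", ref="master")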
"""
import tarfile
import tempfile
from io import BytesIO
output = output or os.path.realpath(os.curdir)
logger.debug(
'Downloading archive of "%s" from "%s"...',
ref,
package.attributes["path_with_namespace"],
)
archive = package.repository_archive(ref=ref)
logger.debug("Archive has %d bytes", len(archive))
logger.debug('Searching for "%s" within archive...', path)
with tempfile.TemporaryDirectory() as d:
with tarfile.open(fileobj=BytesIO(archive), mode="r:gz") as f:
f.extractall(path=d)
# move stuff to "output"
basedir = os.listdir(d)[0]
shutil.move(os.path.join(d, basedir, path), output)
def get_gitlab_instance():
"""Returns an instance of the gitlab object for remote operations."""
    # tries to figure out if we can authenticate using a global configuration
cfgs = ["~/.python-gitlab.cfg", "/etc/python-gitlab.cfg"]
cfgs = [os.path.expanduser(k) for k in cfgs]
if any([os.path.exists(k) for k in cfgs]):
gl = gitlab.Gitlab.from_config(
"idiap", [k for k in cfgs if os.path.exists(k)]
)
else: # ask the user for a token or use one from the current runner
server = "https://gitlab.idiap.ch"
token = os.environ.get("CI_JOB_TOKEN")
if token is None:
logger.debug(
"Did not find any of %s nor CI_JOB_TOKEN is defined. "
"Asking for user token on the command line...",
"|".join(cfgs),
)
token = input("Your %s (private) token: " % server)
gl = gitlab.Gitlab(server, private_token=token, api_version=4)
return gl
def _update_readme(readme, version):
"""Inside text of the readme, replaces parts of the links to the provided
version. If version is not provided, replace to `stable` or `master`.
Args:
readme: Text of the README.rst file from a bob package
        version: Version string in the format '#.#.#', or None

    Returns: New text of the readme with all replacements done
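
    Example:

        A small sketch of the substitution on a hypothetical gitlab badge
        line (the URL is illustrative only):

        .. code-block:: python

            line = ".. image:: https://gitlab.idiap.ch/bob/bob.devtools/badges/master/pipeline.svg"
            _update_readme(line, "2.0.0")  # "/master" becomes "/v2.0.0"
            _update_readme(line, None)     # "/master" is kept as-is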
"""
# replace the badge in the readme's text with the given version
DOC_IMAGE = re.compile(
r"\-(available|master|latest|(v\d+\.\d+\.\d+([abc]\d+)?))\-"
)
BRANCH_RE = re.compile(r"/(stable|master|(v\d+\.\d+\.\d+([abc]\d+)?))")
new_readme = []
for line in readme.splitlines():
if BRANCH_RE.search(line) is not None:
if "gitlab" in line: # gitlab links
replacement = (
"/v%s" % version if version is not None else "/master"
)
line = BRANCH_RE.sub(replacement, line)
if ("software/bob" in line) or (
"software/beat" in line
): # our doc server
replacement = (
"/v%s" % version if version is not None else "/master"
)
line = BRANCH_RE.sub(replacement, line)
if DOC_IMAGE.search(line) is not None:
replacement = (
"-v%s-" % version if version is not None else "-latest-"
)
line = DOC_IMAGE.sub(replacement, line)
new_readme.append(line)
return "\n".join(new_readme) + "\n"
def get_latest_tag_name(gitpkg):
"""Find the name of the latest tag for a given package in the format
'#.#.#'.
Args:
gitpkg: gitlab package object
Returns: The name of the latest tag in format '#.#.#'. None if no tags for
the package were found.
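
    Example:

        A minimal usage sketch (the project path is an illustrative
        assumption):

        .. code-block:: python

            pkg = get_gitlab_instance().projects.get("bob/bob.devtools")
            latest = get_latest_tag_name(pkg)  # e.g. "2.0.0", or None if untagged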
"""
    # get all releases of the package as a list
latest_tags = gitpkg.releases.list(all=True)
if not latest_tags:
return None
# create list of tags' names but ignore the first 'v' character in each name
# also filter out non version tags
tag_names = [
tag.name[1:]
for tag in latest_tags
if StrictVersion.version_re.match(tag.name[1:])
]
    if not tag_names:  # no tags were found
return None
# sort them correctly according to each subversion number
tag_names.sort(key=StrictVersion)
# take the last one, as it is the latest tag in the sorted tags
latest_tag_name = tag_names[-1]
return latest_tag_name
def get_parsed_tag(gitpkg, tag):
"""An older tag is formatted as 'v2.1.3 (Sep 22, 2017 10:37)', from which
we need only v2.1.3.
The latest tag is either patch, minor, major, or none
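
    Example:

        A minimal sketch of the accepted inputs (the project path and the
        resulting versions are illustrative assumptions):

        .. code-block:: python

            pkg = get_gitlab_instance().projects.get("bob/bob.devtools")
            get_parsed_tag(pkg, "v2.1.3 (Sep 22, 2017 10:37)")  # -> "v2.1.3"
            get_parsed_tag(pkg, "patch")  # bumps the latest tag, e.g. -> "v2.1.4"
            get_parsed_tag(pkg, "none")   # -> "none" (nothing to release)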
"""
m = re.search(r"(v\d+\.\d+\.\d+)", tag)
if m:
return m.group(0)
# if we bump the version, we need to find the latest released version for
# this package
if tag in ("major", "minor", "patch"):
# find the correct latest tag of this package (without 'v' in front),
# None if there are no tags yet
latest_tag_name = get_latest_tag_name(gitpkg)
# if there were no tags yet, assume the very first version
if latest_tag_name is None:
if tag == "major":
assume_version = "v1.0.0"
elif tag == "minor":
assume_version = "v0.1.0"
elif tag == "patch":
assume_version = "v0.0.1"
            logger.warning(
"Package %s does not have any tags. I'm assuming "
'version will be %s since you proposed a "%s" bump',
gitpkg.attributes["path_with_namespace"],
assume_version,
tag,
)
return assume_version
# check that it has expected format #.#.#
m = re.match(r"(\d+\.\d+\.\d+)", latest_tag_name)
if not m:
raise ValueError(
"The latest tag name {0} in package {1} has "
"unknown format".format(
"v" + latest_tag_name,
gitpkg.attributes["path_with_namespace"],
)
)
# increase the version accordingly
major, minor, patch = latest_tag_name.split(".")
# handle alpha and beta version
if "a" in patch or "b" in patch:
patch = patch.split("a")[0].split("b")[0]
patch = int(patch) - 1
if "major" == tag:
# increment the first number in 'v#.#.#' but make minor and patch
# to be 0
return "v" + str(int(major) + 1) + ".0.0"
if "minor" == tag:
# increment the second number in 'v#.#.#' but make patch to be 0
return "v" + major + "." + str(int(minor) + 1) + ".0"
if "patch" == tag:
# increment the last number in 'v#.#.#'
return "v" + major + "." + minor + "." + str(int(patch) + 1)
if "none" == tag:
# we do nothing in this case
return tag
raise ValueError(
"Cannot parse changelog tag {0} of the "
"package {1}".format(tag, gitpkg.attributes["path_with_namespace"])
)
def update_files_with_mr(
gitpkg, files_dict, message, branch, automerge, dry_run, user_id
):
"""Update (via a commit) files of a given gitlab package, through an MR.
    This function can update files in a gitlab package, but will do so through
    a formal merge request.  You can optionally auto-merge this request.

Args:
gitpkg: gitlab package object
files_dict: Dictionary of file names and their contents (as text)
message: Commit message
branch: The branch name to use for the merge request
automerge: If we should set the "merge if build succeeds" flag on the
created MR
dry_run: If True, nothing will be pushed to gitlab
        user_id: The integer id of the user to whom this MR should be assigned
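
    Example:

        A minimal sketch of a dry-run invocation (the project path, branch
        name and file contents are illustrative assumptions):

        .. code-block:: python

            gl = get_gitlab_instance()
            gl.auth()  # populates gl.user so we can assign the MR
            pkg = gl.projects.get("bob/bob.devtools")
            update_files_with_mr(
                pkg,
                {"version.txt": "2.0.1b0"},
                "Update version.txt [ci skip]",
                "update-version",  # branch that will carry the commit
                automerge=False,
                dry_run=True,  # flip to False to actually push
                user_id=gl.user.id,
            )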
"""
data = {
"branch": branch,
"start_branch": "master",
"commit_message": message,
"actions": [],
}
# add files to update
for filename in files_dict.keys():
update_action = dict(action="update", file_path=filename)
update_action["content"] = files_dict[filename]
data["actions"].append(update_action)
logger.debug(
"Committing changes in files (%s) to branch '%s'",
", ".join(files_dict.keys()),
branch,
)
if not dry_run:
commit = gitpkg.commits.create(data)
logger.info(
"Created commit %s at %s (branch=%s)",
commit.short_id,
gitpkg.attributes["path_with_namespace"],
branch,
)
logger.debug("Creating merge request %s -> master", branch)
if not dry_run:
mr = gitpkg.mergerequests.create(
{
"source_branch": branch,
"target_branch": "master",
"title": message,
"remove_source_branch": True,
"assignee_id": user_id,
}
)
logger.info(
"Created merge-request !%d (%s -> %s) at %s",
mr.iid,
branch,
"master",
gitpkg.attributes["path_with_namespace"],
)
if automerge:
if "[ci skip]" in message.lower() or "[skip ci]" in message.lower():
logger.info("Merging !%d immediately - CI was skipped", mr.iid)
mr.merge()
else:
logger.info(
"Auto-merging !%d only if pipeline succeeds", mr.iid
)
time.sleep(
0.5
                )  # to avoid the MR being merged automatically - bug?
mr.merge(merge_when_pipeline_succeeds=True)
def update_files_at_master(gitpkg, files_dict, message, dry_run):
"""Update (via a commit) files of a given gitlab package, directly on the
master branch.
Args:
gitpkg: gitlab package object
files_dict: Dictionary of file names and their contents (as text)
message: Commit message
dry_run: If True, nothing will be committed or pushed to GitLab
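
    Example:

        A minimal sketch of a dry-run invocation (the project path and file
        contents are illustrative assumptions):

        .. code-block:: python

            pkg = get_gitlab_instance().projects.get("bob/bob.devtools")
            update_files_at_master(
                pkg,
                {"version.txt": "2.0.1b0"},
                "Increased latest version to 2.0.1b0 [skip ci]",
                dry_run=True,  # flip to False to actually commit to master
            )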
"""
data = {"branch": "master", "commit_message": message, "actions": []} # v4
# add files to update
for filename in files_dict.keys():
update_action = dict(action="update", file_path=filename)
update_action["content"] = files_dict[filename]
data["actions"].append(update_action)
logger.debug(
"Committing changes in files (%s) to branch 'master'",
", ".join(files_dict.keys()),
)
if not dry_run:
commit = gitpkg.commits.create(data)
logger.info(
"Created commit %s at %s (branch=%s)",
commit.short_id,
gitpkg.attributes["path_with_namespace"],
"master",
)
def get_last_pipeline(gitpkg):
"""Returns the last pipeline of the project.
Args:
gitpkg: gitlab package object
Returns: The gitlab object of the pipeline
"""
# wait for 10 seconds to ensure that if a pipeline was just submitted,
# we can retrieve it
time.sleep(10)
# get the last pipeline
return gitpkg.pipelines.list(per_page=1, page=1)[0]
def just_build_package(gitpkg, dry_run=False):
"""Creates the pipeline with the latest tag and starts it.
Args:
gitpkg: gitlab package object
dry_run: If True, the pipeline will not be created on GitLab
    Returns: The id of the newly created pipeline, or ``None`` if ``dry_run``
        is True
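
    Example:

        A minimal sketch (the project path is an illustrative assumption;
        the package is assumed to already have at least one tag):

        .. code-block:: python

            pkg = get_gitlab_instance().projects.get("bob/bob.devtools")
            pipeline_id = just_build_package(pkg, dry_run=True)  # None when dry-running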
"""
# get the latest tag
latest_tag_name = "v" + get_latest_tag_name(gitpkg)
# create the pipeline with this tag and start it
logger.info("Creating and starting pipeline for tag %s", latest_tag_name)
if not dry_run:
new_pipeline = gitpkg.pipelines.create({"ref": latest_tag_name})
return new_pipeline.id
return None
def wait_for_pipeline_to_finish(gitpkg, pipeline_id, dry_run=False):
"""Using sleep function, wait for the latest pipeline to finish building.
This function pauses the script until pipeline completes either
successfully or with error.
Args:
gitpkg: gitlab package object
pipeline_id: id of the pipeline for which we are waiting to finish
        dry_run: If True, only a log message is output and there will be no
            waiting.
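
    Example:

        A minimal sketch chaining this with :py:func:`just_build_package`
        (the project path is an illustrative assumption; note that this
        really starts a pipeline):

        .. code-block:: python

            pkg = get_gitlab_instance().projects.get("bob/bob.devtools")
            pipeline_id = just_build_package(pkg)
            wait_for_pipeline_to_finish(pkg, pipeline_id)  # may block up to 2 hours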
"""
sleep_step = 30
max_sleep = 120 * 60 # two hours
    logger.warning(
'Waiting for the pipeline %s of "%s" to finish',
pipeline_id,
gitpkg.attributes["path_with_namespace"],
)
logger.warn("Do **NOT** interrupt!")
if dry_run:
return
# retrieve the pipeline we are waiting for
pipeline = gitpkg.pipelines.get(pipeline_id)
# probe and wait for the pipeline to finish
slept_so_far = 0
while pipeline.status == "running" or pipeline.status == "pending":
time.sleep(sleep_step)
slept_so_far += sleep_step
if slept_so_far > max_sleep:
raise ValueError(
"I cannot wait longer than {0} seconds for "
"pipeline {1} to finish running!".format(max_sleep, pipeline_id)
)
# probe gitlab to update the status of the pipeline
pipeline = gitpkg.pipelines.get(pipeline_id)
# finished running, now check if it succeeded
if pipeline.status != "success":
raise ValueError(
"Pipeline {0} of project {1} exited with "
'undesired status "{2}". Release is not possible.'.format(
pipeline_id,
gitpkg.attributes["path_with_namespace"],
pipeline.status,
)
)
logger.info(
"Pipeline %s of package %s SUCCEEDED. Continue processing.",
pipeline_id,
gitpkg.attributes["path_with_namespace"],
)
def cancel_last_pipeline(gitpkg):
"""Cancel the last started pipeline of a package.
Args:
gitpkg: gitlab package object
"""
pipeline = get_last_pipeline(gitpkg)
logger.info(
"Cancelling the last pipeline %s of project %s",
pipeline.id,
gitpkg.attributes["path_with_namespace"],
)
pipeline.cancel()
def release_package(gitpkg, tag_name, tag_comments_list, dry_run=False):
"""Release package.
The provided tag will be annotated with a given list of comments.
README.rst and version.txt files will also be updated according to the
release procedures.
Args:
gitpkg: gitlab package object
tag_name: The name of the release tag
        tag_comments_list: New annotations for this tag, in the form of a list
dry_run: If True, nothing will be committed or pushed to GitLab
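
    Example:

        A minimal sketch of a dry-run release (the project path, tag name and
        comments are illustrative assumptions):

        .. code-block:: python

            pkg = get_gitlab_instance().projects.get("bob/bob.devtools")
            release_package(
                pkg,
                "v2.0.0",
                ["* Fixed the CI configuration", "* Updated documentation"],
                dry_run=True,
            )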
"""
# if there is nothing to release, just rebuild the package
latest_tag = get_latest_tag_name(gitpkg)
if tag_name == "none" or (latest_tag and ("v" + latest_tag) == tag_name):
        logger.warning(
"Since the tag is 'none' or already exists, we just "
"re-build the last pipeline"
)
return just_build_package(gitpkg, dry_run)
# 1. Replace branch tag in Readme to new tag, change version file to new
# version tag. Add and commit to gitlab
version_number = tag_name[1:] # remove 'v' in front
readme_file = gitpkg.files.get(file_path="README.rst", ref="master")
readme_content = readme_file.decode().decode()
readme_content = _update_readme(readme_content, version_number)
# commit and push changes
update_files_at_master(
gitpkg,
{"README.rst": readme_content, "version.txt": version_number + "\n"},
"Increased stable version to %s" % version_number,
dry_run,
)
if not dry_run:
# cancel running the pipeline triggered by the last commit
cancel_last_pipeline(gitpkg)
# 2. Tag package with new tag and push
logger.info('Tagging "%s"', tag_name)
tag_comments = "\n".join(tag_comments_list)
logger.debug("Updating tag comments with:\n%s", tag_comments)
if not dry_run:
params = {
"name": tag_name,
"tag_name": tag_name,
"ref": "master",
}
if tag_comments:
params["description"] = tag_comments
gitpkg.releases.create(params)
# get the pipeline that is actually running with no skips
running_pipeline = get_last_pipeline(gitpkg)
# 3. Replace branch tag in Readme to master, change version file to beta
# version tag. Git add, commit, and push.
readme_content = _update_readme(readme_content, None)
major, minor, patch = version_number.split(".")
version_number = "{}.{}.{}b0".format(major, minor, int(patch) + 1)
# commit and push changes
update_files_at_master(
gitpkg,
{"README.rst": readme_content, "version.txt": version_number + "\n"},
"Increased latest version to %s [skip ci]" % version_number,
dry_run,
)
return running_pipeline.id
def parse_and_process_package_changelog(gl, gitpkg, package_changelog, dry_run):
"""Process the changelog of a single package.
    Parses the changelog, which follows a specific format.  Annotations of the
    previously released tags are updated, and the name and comments of the
    last tag are returned so that the package can be released from them.

Args:
gl: Gitlab API object
gitpkg: gitlab package object
package_changelog: the changelog corresponding to the provided package
dry_run: If True, nothing will be committed or pushed to GitLab
    Returns: The name of the latest tag, and the tag's comments (as a list)
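
    Example:

        A minimal sketch of the expected line format (the changelog content
        and project path are illustrative assumptions):

        .. code-block:: python

            gl = get_gitlab_instance()
            pkg = gl.projects.get("bob/bob.devtools")
            changelog = [
                "  * patch",  # a tag level: bump the patch number
                "    - Fixed the CI configuration",
                "    - Updated documentation links",
            ]
            tag, comments = parse_and_process_package_changelog(
                gl, pkg, changelog, dry_run=True
            )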
"""
cur_tag = None
cur_tag_comments = []
# we assume that changelog is formatted as structured text
# first line is the name of the package
for line in package_changelog:
if " *" == line[:3]: # a tag level
# write the comments collected for the previous tag
if cur_tag:
update_tag_comments(gitpkg, cur_tag, cur_tag_comments, dry_run)
cur_tag_comments = [] # reset comments
# parse the current tag name
cur_tag = get_parsed_tag(gitpkg, line[3:].strip())
else: # all other lines are assumed to be comments
cur_tag_comments.append(line.strip())
# return the last tag and comments for release
return cur_tag, cur_tag_comments