MSYS2-packages/.ci/ci-generate-srcinfo.py
2021-09-06 13:02:14 -07:00

237 lines
7.9 KiB
Python

#!/usr/bin/env python3
# Copyright 2017 Christoph Reiter
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE
import sys
import argparse
import os
import json
import shutil
from collections import OrderedDict
import hashlib
import time
import shlex
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import List, Iterator, Tuple, Dict, Optional, Union, Collection
# Metadata stored for one PKGBUILD: string fields such as "repo", "path" and
# "date", plus a "srcinfo" collection holding the makepkg --printsrcinfo output.
CacheEntry = Dict[str, Union[str, Collection[str]]]
# A (cache key, metadata) pair as produced for a single PKGBUILD.
CacheTuple = Tuple[str, CacheEntry]
# The whole JSON cache: cache key -> metadata.
Cache = Dict[str, CacheEntry]
def normalize_repo(repo: str) -> str:
    """Return the repository URL without a trailing ``.git`` suffix.

    URLs that do not end in ``.git`` are returned unchanged.
    """
    suffix = ".git"
    return repo[:-len(suffix)] if repo.endswith(suffix) else repo
def normalize_path(path: str) -> str:
    """Return *path* with every backslash replaced by a forward slash."""
    return "/".join(path.split("\\"))
def check_output_retry(*args, **kwargs):
    """Call ``subprocess.check_output`` and retry when it exits with code 127.

    All arguments are forwarded unchanged to ``subprocess.check_output``.
    A ``CalledProcessError`` with any other return code, or one raised after
    the retry counter has passed its limit, is re-raised to the caller.
    """
    # XXX: git sometimes crashes when called concurrently,
    # so we retry a few times..
    attempt = 0
    retry_limit = 5
    while True:
        try:
            return subprocess.check_output(*args, **kwargs)
        except subprocess.CalledProcessError as exc:
            if exc.returncode != 127 or attempt > retry_limit:
                raise
            # Wait a little longer before each successive retry
            # (the very first retry happens immediately).
            time.sleep(0.1 * attempt)
            attempt += 1
def get_cache_key(pkgbuild_path: str) -> str:
    """Return the cache key for a PKGBUILD as a SHA-1 hex digest.

    The digest covers, in this order: the raw file content, the output of
    ``git ls-files -s --full-name`` for the file (with backslashes
    normalized), and the normalized URL of the ``origin`` remote.
    """
    absolute = os.path.abspath(pkgbuild_path)
    workdir = os.path.dirname(absolute)
    tracked_name = os.path.relpath(absolute, workdir)

    def git_output(*git_args: str) -> str:
        # Run git inside the PKGBUILD's directory and return stripped text.
        raw = check_output_retry(["git", *git_args], cwd=workdir)
        return raw.decode("utf-8").strip()

    digest = hashlib.sha1()
    with open(absolute, "rb") as handle:
        digest.update(handle.read())

    index_info = git_output("ls-files", "-s", "--full-name", tracked_name)
    digest.update(normalize_path(index_info).encode("utf-8"))

    origin = normalize_repo(git_output("ls-remote", "--get-url", "origin"))
    digest.update(origin.encode("utf-8"))

    return digest.hexdigest()
def get_mingw_arch_list(pkgbuild_path: str) -> List[str]:
    """Return the ``mingw_arch`` values declared by a PKGBUILD.

    The PKGBUILD is sourced in bash and the ``mingw_arch`` array is echoed
    back. When the array is empty or unset, a built-in default list of
    architectures is returned instead.
    """
    bash = shutil.which("bash")
    assert bash is not None

    script = ";".join((
        shlex.join(["source", pkgbuild_path]),
        'echo -n "${mingw_arch[@]}"',
    ))
    output = subprocess.check_output(
        [bash, "-c", script], universal_newlines=True)

    # Fall back to the default set when the PKGBUILD declares nothing.
    arch_list = output.strip().split() or [
        "mingw32", "mingw64", "ucrt64", "clang64"]
    assert arch_list
    return arch_list
def get_srcinfo_for_pkgbuild(args: Tuple[str, str]) -> Optional[CacheTuple]:
    """Generate the SRCINFO metadata for one PKGBUILD.

    *args* is a ``(pkgbuild_path, mode)`` pair. In ``"mingw"`` mode
    ``makepkg-mingw --printsrcinfo`` is run once per architecture reported
    by ``get_mingw_arch_list`` (with ``MINGW_ARCH`` set accordingly); in any
    other mode ``makepkg --printsrcinfo`` is run once and stored under
    ``"msys"``.

    Returns ``(cache_key, metadata)``, or ``None`` when bash cannot be found
    or one of the subprocesses fails with ``CalledProcessError``.
    """
    pkgbuild_path, mode = args
    pkgbuild_path = os.path.abspath(pkgbuild_path)
    git_cwd = os.path.dirname(pkgbuild_path)
    git_path = os.path.relpath(pkgbuild_path, git_cwd)
    key = get_cache_key(pkgbuild_path)

    bash = shutil.which("bash")
    if bash is None:
        print("ERROR: bash not found")
        return None

    def print_srcinfo(tool: str, env=None) -> str:
        # Run the given makepkg flavour from the PKGBUILD's directory.
        raw = subprocess.check_output(
            [bash, tool, "--printsrcinfo", "-p", git_path],
            cwd=git_cwd,
            env=env)
        return raw.decode("utf-8")

    def git(*git_args: str) -> str:
        # Run git (with retries) from the PKGBUILD's directory.
        raw = check_output_retry(["git", *git_args], cwd=git_cwd)
        return raw.decode("utf-8").strip()

    print("Parsing %r" % pkgbuild_path)
    try:
        srcinfos = {}
        if mode == "mingw":
            for name in get_mingw_arch_list(pkgbuild_path):
                arch_env = dict(os.environ, MINGW_ARCH=name)
                srcinfos[name] = print_srcinfo(
                    "/usr/bin/makepkg-mingw", env=arch_env)
        else:
            srcinfos["msys"] = print_srcinfo("/usr/bin/makepkg")

        repo = normalize_repo(git("ls-remote", "--get-url", "origin"))
        tracked = git("ls-files", "--full-name", git_path)
        relpath = normalize_path(os.path.dirname(tracked))
        # Author date of the latest commit that touched the PKGBUILD.
        date = git("log", "-1", "--format=%aI", git_path)

        meta = {
            "repo": repo,
            "path": relpath,
            "date": date,
            "srcinfo": srcinfos,
        }
    except subprocess.CalledProcessError as e:
        print("ERROR: %s %s" % (pkgbuild_path, e.output.splitlines()))
        return None

    return (key, meta)
def iter_pkgbuild_paths(repo_path: str) -> Iterator[str]:
    """Yield the absolute path of every ``PKGBUILD`` file below *repo_path*.

    Directories underneath one that already contains a ``PKGBUILD`` are not
    searched.
    """
    root = os.path.abspath(repo_path)
    print("Searching for PKGBUILD files in %s" % root)
    for base, dirs, files in os.walk(root):
        if "PKGBUILD" not in files:
            continue
        # in case we find a PKGBUILD, don't go deeper
        dirs.clear()
        yield os.path.join(base, "PKGBUILD")
def get_srcinfo_from_cache(args: Tuple[str, Cache]) -> Tuple[str, Optional[CacheTuple]]:
    """Look up one PKGBUILD in the cache.

    *args* is a ``(pkgbuild_path, cache)`` pair. Returns the path together
    with ``(key, cached_entry)`` on a hit, or the path with ``None`` on a
    miss.
    """
    pkgbuild_path, cache = args
    key = get_cache_key(pkgbuild_path)
    hit = (key, cache[key]) if key in cache else None
    return (pkgbuild_path, hit)
def iter_srcinfo(repo_path: str, mode: str, cache: Cache) -> Iterator[Optional[CacheTuple]]:
    """Yield a ``(key, metadata)`` tuple for every PKGBUILD in *repo_path*.

    Entries found in *cache* are yielded first. The remaining PKGBUILDs are
    then parsed on the thread pool; a parse that fails yields ``None``.
    """
    with ThreadPoolExecutor() as executor:
        to_parse: List[Tuple[str, str]] = []
        lookups = executor.map(
            get_srcinfo_from_cache,
            ((path, cache) for path in iter_pkgbuild_paths(repo_path)))

        for pkgbuild_path, cached in lookups:
            if cached is None:
                # Cache miss: remember it for the parsing pass below.
                to_parse.append((pkgbuild_path, mode))
                continue
            yield cached

        print("Parsing PKGBUILD files...")
        yield from executor.map(get_srcinfo_for_pkgbuild, to_parse)
def main(argv: List[str]) -> Optional[Union[int, str]]:
    """Regenerate the SRCINFO JSON cache for a package repository.

    Parses the command line in *argv* (``argv[0]`` is skipped), loads the
    existing JSON cache if the file exists, collects the SRCINFO entries for
    every PKGBUILD in the repository and writes the result back to the same
    file, sorted by cache key.

    When ``--time-limit`` is non-zero, collection stops once that many
    seconds have elapsed and whatever was gathered so far is saved.

    Returns ``None`` (i.e. a zero exit status when passed to ``sys.exit``).
    """
    parser = argparse.ArgumentParser(description="Create SRCINFOs for all packages in a repo", allow_abbrev=False)
    parser.add_argument('mode', choices=['msys', 'mingw'], help="The type of the repo")
    parser.add_argument("repo_path", help="The path to GIT repo")
    parser.add_argument("json_cache", help="The path to the json file used to fetch/store the results")
    parser.add_argument("--time-limit", action="store",
                        type=int, dest="time_limit", default=0,
                        help='time after which it will stop and save, 0 means no limit')
    args = parser.parse_args(argv[1:])

    start = time.monotonic()

    srcinfo_path = os.path.abspath(args.json_cache)
    cache: Cache = {}
    try:
        with open(srcinfo_path, "rb") as h:
            cache = json.loads(h.read())
    except FileNotFoundError:
        # First run: start with an empty cache.
        pass

    srcinfos = []
    for entry in iter_srcinfo(args.repo_path, args.mode, cache):
        # Failed PKGBUILDs come back as None and are simply left out.
        if entry is not None:
            srcinfos.append(entry)
        # So we stop before CI times out. This is checked for every entry,
        # including failed ones, so a run of failures cannot overrun the limit.
        if args.time_limit and time.monotonic() - start > args.time_limit:
            print("time limit reached, stopping")
            break

    # Sort on the cache key only: comparing whole tuples would fall through
    # to comparing the metadata dicts (a TypeError) if two keys were equal.
    srcinfos_dict = OrderedDict(sorted(srcinfos, key=lambda item: item[0]))
    with open(srcinfo_path, "wb") as h:
        h.write(json.dumps(srcinfos_dict, indent=2).encode("utf-8"))

    return None
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv))