2023-03-24 14:09:24 +01:00

404 lines
16 KiB
Python

import fnmatch
import io
import json
from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from typing import Any, Dict, List, Optional, Set, Tuple, cast
import requests
from github.GithubException import GithubException
from .config import (REQUESTS_TIMEOUT, ArchType, BuildType, Config,
build_type_is_src, get_all_build_types)
from .gh import (CachedAssets, download_text_asset, get_asset_filename,
get_current_repo, get_release, make_writable)
from .utils import get_requests_session, queue_website_update
class PackageStatus(Enum):
    """Build state of a package for a single build type.

    Every member's value is a short hyphenated string; converting a member
    with ``str()`` yields exactly that string.
    """

    # Terminal "built" states.
    FINISHED = 'finished'
    FINISHED_BUT_BLOCKED = 'finished-but-blocked'
    FINISHED_BUT_INCOMPLETE = 'finished-but-incomplete'

    # States in which no usable build result exists (yet).
    FAILED_TO_BUILD = 'failed-to-build'
    WAITING_FOR_BUILD = 'waiting-for-build'
    WAITING_FOR_DEPENDENCIES = 'waiting-for-dependencies'
    MANUAL_BUILD_REQUIRED = 'manual-build-required'

    # Fallback used when no status has been recorded.
    UNKNOWN = 'unknown'

    def __str__(self) -> str:
        # Render as the raw value instead of "PackageStatus.NAME".
        text: str = self.value
        return text
class Package(dict):
    """A build queue entry, stored as a plain ``dict``.

    Instances hash and compare by identity, which makes them usable as set
    members and dictionary keys even though ``dict`` is normally unhashable.
    Per-build-type state is kept under ``self["builds"][build_type]``.
    """

    def __repr__(self) -> str:
        return f"Package({self['name']!r})"

    def __hash__(self) -> int:  # type: ignore
        return id(self)

    def __eq__(self, other: object) -> bool:
        return self is other

    def __ne__(self, other: object) -> bool:
        # dict defines its own __ne__ (content comparison); without this
        # override ``a != b`` would disagree with the identity-based __eq__.
        return self is not other

    def _get_build(self, build_type: BuildType) -> Dict:
        """Return the build entry for *build_type*, or a fresh empty dict."""
        return self["builds"].get(build_type, {})

    def get_status(self, build_type: BuildType) -> PackageStatus:
        """Return the recorded status, ``PackageStatus.UNKNOWN`` if none."""
        build = self._get_build(build_type)
        return build.get("status", PackageStatus.UNKNOWN)

    def get_status_details(self, build_type: BuildType) -> Dict[str, Any]:
        """Return a shallow copy of the status details for *build_type*."""
        build = self._get_build(build_type)
        return dict(build.get("status_details", {}))

    def set_status(self, build_type: BuildType, status: PackageStatus,
                   description: Optional[str] = None,
                   urls: Optional[Dict[str, str]] = None) -> None:
        """Record *status* and replace the status details for *build_type*.

        The details are rebuilt from scratch with the keys ``"desc"`` and
        ``"urls"``; any previously stored ``"blocked"`` entry is dropped.
        """
        build = self["builds"].setdefault(build_type, {})
        build["status"] = status
        meta: Dict[str, Any] = {}
        meta["desc"] = description
        if urls is None:
            urls = {}
        meta["urls"] = urls
        build["status_details"] = meta

    def set_blocked(
            self, build_type: BuildType, status: PackageStatus,
            dep: "Package", dep_type: BuildType) -> None:
        """Set *status* and record which package builds block this build.

        If *dep* is itself blocked for *dep_type*, a copy of its blocker
        mapping replaces this build's mapping; otherwise ``(dep, dep_type)``
        is added to the existing mapping. The mapping is stored under the
        ``"blocked"`` key of the status details and summarized in ``"desc"``.
        """
        dep_details = dep.get_status_details(dep_type)
        dep_blocked = dep_details.get("blocked", {})
        details = self.get_status_details(build_type)
        blocked = details.get("blocked", {})
        if dep_blocked:
            blocked = dict(dep_blocked)
        else:
            blocked.setdefault(dep, set()).add(dep_type)
        # The build types are kept in a set; sort them so that the
        # description text does not change from run to run.
        descs = [
            "%s (%s)" % (pkg["name"], "/".join(sorted(types)))
            for pkg, types in blocked.items()
        ]
        self.set_status(build_type, status, "Blocked by: " + ", ".join(descs))
        # set_status() resets the details, so re-attach the mapping afterwards.
        build = self._get_build(build_type)
        build.setdefault("status_details", {})["blocked"] = blocked

    def is_new(self, build_type: BuildType) -> bool:
        """Return the build's ``"new"`` flag (``False`` when absent)."""
        build = self._get_build(build_type)
        return build.get("new", False)

    def get_build_patterns(self, build_type: BuildType) -> List[str]:
        """Return the fnmatch patterns of the files expected for *build_type*.

        Raises:
            AssertionError: if *build_type* is neither a source build type
                nor one of the configured architectures.
        """
        patterns = []
        if build_type_is_src(build_type):
            # "[!s]*" rejects extensions starting with "s"
            # (presumably to skip signature files - confirm).
            patterns.append(f"{self['name']}-{self['version']}.src.tar.[!s]*")
        elif build_type in (Config.MINGW_ARCH_LIST + Config.MSYS_ARCH_LIST):
            for item in self._get_build(build_type).get('packages', []):
                patterns.append(f"{item}-{self['version']}-*.pkg.tar.zst")
        else:
            # Raised explicitly so the check survives "python -O".
            raise AssertionError(f"unexpected build type: {build_type!r}")
        return patterns

    def get_failed_name(self, build_type: BuildType) -> str:
        """Return the name of the marker file for a failed build."""
        return f"{build_type}-{self['name']}-{self['version']}.failed"

    def get_build_types(self) -> List[BuildType]:
        """Return the build types that apply to this package.

        These are the configured architectures present in ``self["builds"]``
        plus, when ``self["source"]`` is truthy, the matching source build
        type for each architecture family in use.
        """
        all_archs = Config.MINGW_ARCH_LIST + Config.MSYS_ARCH_LIST
        build_types = [t for t in self["builds"] if t in all_archs]
        if self["source"]:
            if any((k in Config.MINGW_ARCH_LIST) for k in build_types):
                build_types.append(Config.MINGW_SRC_BUILD_TYPE)
            if any((k in Config.MSYS_ARCH_LIST) for k in build_types):
                build_types.append(Config.MSYS_SRC_BUILD_TYPE)
        return build_types

    def _get_dep_build(self, build_type: BuildType) -> Dict:
        """Return the build entry that holds dependency data for *build_type*.

        Source build types are redirected to their configured source arch.
        """
        if build_type == Config.MINGW_SRC_BUILD_TYPE:
            build_type = Config.MINGW_SRC_ARCH
        elif build_type == Config.MSYS_SRC_BUILD_TYPE:
            build_type = Config.MSYS_SRC_ARCH
        return self._get_build(build_type)

    def is_optional_dep(self, dep: "Package", dep_type: BuildType) -> bool:
        # Some deps are manually marked as optional to break cycles.
        # This requires them to be in the main repo though, otherwise the cycle has to
        # be fixed manually.
        return dep["name"] in Config.OPTIONAL_DEPS.get(self["name"], []) and not dep.is_new(dep_type)

    def get_depends(self, build_type: BuildType) -> "Dict[ArchType, Set[Package]]":
        """Return the linked dependencies (``"ext-depends"``), keyed by arch."""
        build = self._get_dep_build(build_type)
        return build.get('ext-depends', {})

    def get_rdepends(self, build_type: BuildType) -> "Dict[ArchType, Set[Package]]":
        """Return the linked reverse dependencies (``"ext-rdepends"``)."""
        build = self._get_dep_build(build_type)
        return build.get('ext-rdepends', {})
def get_buildqueue() -> List[Package]:
    """Fetch the build queue and link every build to its (reverse) dependencies.

    Each received entry becomes a ``Package`` with an added ``"repo"`` key
    (the last path component of ``"repo_url"``). Every build of every package
    then gets two extra keys:

    * ``"ext-depends"``: repo -> set of queue packages it depends on
    * ``"ext-rdepends"``: build type of the dependent -> set of queue
      packages that depend on this package

    Raises:
        requests.HTTPError: via ``raise_for_status()`` on a non-success reply.
        KeyError: if a build names a dependency that no queue package provides.
    """
    session = get_requests_session()
    r = session.get("https://packages.msys2.org/api/buildqueue2", timeout=REQUESTS_TIMEOUT)
    r.raise_for_status()

    pkgs: List[Package] = []
    for received in r.json():
        pkg = Package(received)
        pkg['repo'] = pkg['repo_url'].split('/')[-1]
        pkgs.append(pkg)

    # extract the package mapping
    dep_mapping: Dict[str, Package] = {}
    for pkg in pkgs:
        for build in pkg['builds'].values():
            for name in build['packages']:
                dep_mapping[name] = pkg

    # link up dependencies with the real package in the queue
    for pkg in pkgs:
        for build in pkg['builds'].values():
            ver_depends: Dict[str, Set[Package]] = {}
            for repo, deps in build['depends'].items():
                for dep in deps:
                    ver_depends.setdefault(repo, set()).add(dep_mapping[dep])
            build['ext-depends'] = ver_depends

    # reverse dependencies: invert the links in a single pass instead of
    # rescanning the whole queue for every build of every package
    rdepends_index: Dict[Package, Dict[str, Set[Package]]] = {}
    for pkg2 in pkgs:
        for r_repo, build2 in pkg2['builds'].items():
            for deps in build2['ext-depends'].values():
                for dep in deps:
                    rdepends_index.setdefault(dep, {}).setdefault(r_repo, set()).add(pkg2)

    for pkg in pkgs:
        dependents = rdepends_index.get(pkg, {})
        for build in pkg['builds'].values():
            # every build gets its own dict and sets, nothing is shared
            build['ext-rdepends'] = {
                r_repo: set(users) for r_repo, users in dependents.items()}

    return pkgs
def get_cycles(pkgs: List[Package]) -> Set[Tuple[Package, Package]]:
    """Return the pairs of unfinished packages that depend on each other.

    Only direct two-package cycles are detected. Each pair is ordered by
    package name so that a cycle is reported once regardless of which side
    it was discovered from. Source build types are not inspected.
    """

    def pkg_is_finished(pkg: Package, build_type: BuildType) -> bool:
        # In case the package is already built it doesn't matter if it is part of a cycle
        built_states = (
            PackageStatus.FINISHED,
            PackageStatus.FINISHED_BUT_BLOCKED,
            PackageStatus.FINISHED_BUT_INCOMPLETE,
        )
        return pkg.get_status(build_type) in built_states

    found: Set[Tuple[Package, Package]] = set()

    for pkg in pkgs:
        for build_type in pkg.get_build_types():
            if build_type_is_src(build_type):
                continue
            if pkg_is_finished(pkg, build_type):
                continue

            arch = cast(ArchType, build_type)
            for dep_type, deps in pkg.get_depends(arch).items():
                for dep in deps:
                    if pkg_is_finished(dep, dep_type):
                        continue

                    # manually broken cycle
                    if pkg.is_optional_dep(dep, dep_type) or dep.is_optional_dep(pkg, arch):
                        continue

                    back_links = dep.get_depends(dep_type).get(arch, set())
                    if pkg not in back_links:
                        continue

                    first, second = sorted((pkg, dep), key=lambda p: p["name"])
                    found.add((first, second))

    return found
def get_buildqueue_with_status(full_details: bool = False) -> List[Package]:
    """Return the build queue with a status assigned to every build type.

    The status is derived in stages:

    1. A basic state from the uploaded assets: finished, failed, manual
       build required or waiting for build.
    2. Builds waiting for a build are marked as waiting for dependencies
       when a non-optional dependency is not finished.
    3. Until nothing changes any more, finished builds are blocked when a
       dependency or reverse dependency is not finished, or when not every
       build type of the same package is finished.

    Args:
        full_details: when true, the content of every failure marker asset
            is downloaded to collect the URLs it carries. This might take a
            while, which is why it is opt-in.
    """
    cached_assets = CachedAssets()

    assets_failed = []
    for build_type in get_all_build_types():
        assets_failed.extend(cached_assets.get_failed_assets(build_type))

    # assets_failed is not modified below, so the names are computed once
    failed_names = {get_asset_filename(a) for a in assets_failed}

    failed_urls: Dict[str, Dict[str, str]] = {}
    if full_details:
        # This might take a while, so only in full mode
        with ThreadPoolExecutor(8) as executor:
            contents = executor.map(download_text_asset, assets_failed)
            for asset, content in zip(assets_failed, contents):
                result = json.loads(content)
                if result["urls"]:
                    failed_urls[get_asset_filename(asset)] = result["urls"]

    def pkg_is_done(build_type: BuildType, pkg: Package) -> bool:
        # done means: every expected file pattern matches at least one asset
        done_names = [get_asset_filename(a) for a in cached_assets.get_assets(build_type)]
        for pattern in pkg.get_build_patterns(build_type):
            if not fnmatch.filter(done_names, pattern):
                return False
        return True

    def get_failed_urls(build_type: BuildType, pkg: Package) -> Optional[Dict[str, str]]:
        name = pkg.get_failed_name(build_type)
        if name in failed_names:
            return failed_urls.get(name)
        return None

    def pkg_has_failed(build_type: BuildType, pkg: Package) -> bool:
        return pkg.get_failed_name(build_type) in failed_names

    def pkg_is_manual(build_type: BuildType, pkg: Package) -> bool:
        if build_type_is_src(build_type):
            return False
        for pattern, types in Config.MANUAL_BUILD:
            # an empty type list means the pattern applies to all build types
            type_matches = not types or build_type in types
            if type_matches and fnmatch.fnmatchcase(pkg['name'], pattern):
                return True
        return False

    pkgs = get_buildqueue()

    # basic state
    for pkg in pkgs:
        for build_type in pkg.get_build_types():
            if pkg_is_done(build_type, pkg):
                pkg.set_status(build_type, PackageStatus.FINISHED)
            elif pkg_has_failed(build_type, pkg):
                urls = get_failed_urls(build_type, pkg)
                pkg.set_status(build_type, PackageStatus.FAILED_TO_BUILD, urls=urls)
            elif pkg_is_manual(build_type, pkg):
                pkg.set_status(build_type, PackageStatus.MANUAL_BUILD_REQUIRED)
            else:
                pkg.set_status(build_type, PackageStatus.WAITING_FOR_BUILD)

    # wait for dependencies to be finished before starting a build
    for pkg in pkgs:
        for build_type in pkg.get_build_types():
            status = pkg.get_status(build_type)
            if status == PackageStatus.WAITING_FOR_BUILD:
                for dep_type, deps in pkg.get_depends(build_type).items():
                    for dep in deps:
                        dep_status = dep.get_status(dep_type)
                        if dep_status != PackageStatus.FINISHED:
                            if pkg.is_optional_dep(dep, dep_type):
                                continue
                            pkg.set_blocked(
                                build_type, PackageStatus.WAITING_FOR_DEPENDENCIES, dep, dep_type)

    # Both blocking passes below run until a full round changes nothing:
    # blocking one build can make builds that depend on it blocked as well.
    changed = True
    while changed:
        changed = False

        # Block packages where not all deps/rdeps/related are finished
        for pkg in pkgs:
            for build_type in pkg.get_build_types():
                status = pkg.get_status(build_type)
                if status == PackageStatus.FINISHED:
                    # src builds are independent
                    if build_type_is_src(build_type):
                        continue

                    for dep_type, deps in pkg.get_depends(build_type).items():
                        for dep in deps:
                            dep_status = dep.get_status(dep_type)
                            if dep_status != PackageStatus.FINISHED:
                                pkg.set_blocked(
                                    build_type, PackageStatus.FINISHED_BUT_BLOCKED, dep, dep_type)
                                changed = True

                    for dep_type, deps in pkg.get_rdepends(build_type).items():
                        for dep in deps:
                            if dep["name"] in Config.IGNORE_RDEP_PACKAGES:
                                continue
                            dep_status = dep.get_status(dep_type)
                            dep_new = dep.is_new(dep_type)
                            # if the rdep isn't in the repo we can't break it by uploading
                            if dep_status != PackageStatus.FINISHED and not dep_new:
                                pkg.set_blocked(
                                    build_type, PackageStatus.FINISHED_BUT_BLOCKED, dep, dep_type)
                                changed = True

        # Block packages where not every build type is finished
        for pkg in pkgs:
            unfinished = []
            blocked = []
            finished = []
            for build_type in pkg.get_build_types():
                status = pkg.get_status(build_type)
                if status != PackageStatus.FINISHED:
                    if status == PackageStatus.FINISHED_BUT_BLOCKED:
                        blocked.append(build_type)
                    # if the package isn't in the repo better not block on it
                    elif not pkg.is_new(build_type):
                        unfinished.append(build_type)
                else:
                    finished.append(build_type)

            # We track source packages by assuming they are in the repo if there is
            # at least one binary package in the repo. Uploading lone source
            # packages will not change anything, so block them.
            if not blocked and not unfinished and finished and \
                    all(build_type_is_src(bt) for bt in finished):
                for build_type in pkg.get_build_types():
                    status = pkg.get_status(build_type)
                    if status in (PackageStatus.FINISHED, PackageStatus.FINISHED_BUT_BLOCKED):
                        changed = True
                        pkg.set_status(build_type, PackageStatus.FINISHED_BUT_INCOMPLETE)
            elif unfinished:
                for build_type in pkg.get_build_types():
                    status = pkg.get_status(build_type)
                    if status in (PackageStatus.FINISHED, PackageStatus.FINISHED_BUT_BLOCKED):
                        changed = True
                        for bt in unfinished:
                            pkg.set_blocked(build_type, PackageStatus.FINISHED_BUT_INCOMPLETE, pkg, bt)
            elif blocked:
                for build_type in pkg.get_build_types():
                    status = pkg.get_status(build_type)
                    if status == PackageStatus.FINISHED:
                        changed = True
                        for bt in blocked:
                            pkg.set_blocked(build_type, PackageStatus.FINISHED_BUT_BLOCKED, pkg, bt)

    return pkgs
def update_status(pkgs: List[Package]) -> None:
    """Upload a ``status.json`` summary of *pkgs* to the "status" release.

    The JSON document lists, per package, its name, version and the status
    details of every build type (without the internal "blocked" mapping),
    plus the sorted dependency cycles. An existing asset of the same name is
    deleted first. GitHub and request errors are printed and end the update
    without queueing a website update.
    """
    repo = get_current_repo()
    release = get_release(repo, "status")

    def describe_build(pkg: Package, build_type: BuildType) -> Dict[str, Any]:
        # Package objects used as keys in "blocked" are not JSON serializable
        summary = {
            key: value
            for key, value in pkg.get_status_details(build_type).items()
            if key != "blocked"
        }
        summary["status"] = pkg.get_status(build_type).value
        return summary

    packages = [
        {
            "name": pkg["name"],
            "version": pkg["version"],
            "builds": {bt: describe_build(pkg, bt) for bt in pkg.get_build_types()},
        }
        for pkg in pkgs
    ]

    status_object: Dict[str, Any] = {
        "packages": packages,
        "cycles": sorted([first["name"], second["name"]] for first, second in get_cycles(pkgs)),
    }

    content = json.dumps(status_object, indent=2).encode()

    # If multiple jobs update this at the same time things can fail
    try:
        asset_name = "status.json"
        stale = next((a for a in release.assets if a.name == asset_name), None)
        if stale is not None:
            with make_writable(stale):
                stale.delete_asset()

        with io.BytesIO(content) as fileobj, make_writable(release):
            new_asset = release.upload_asset_from_memory(  # type: ignore
                fileobj, len(content), asset_name)
    except (GithubException, requests.RequestException) as e:
        print(e)
    else:
        print(f"Uploaded status file for {len(packages)} packages: {new_asset.browser_download_url}")
        queue_website_update()