It'a a list of CVE IDs or GHSA IDs which whould be ignored. In lists we still show them, but at the end and with strike through. For picking the worst for the tooltip button color we ignore them. On the security page, of all are ignored, the package is skipped.
746 lines
25 KiB
Python
746 lines
25 KiB
Python
# Copyright 2016-2020 Christoph Reiter
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import datetime
|
|
from enum import Enum
|
|
import urllib.parse
|
|
from typing import Any, Optional, NamedTuple
|
|
from collections.abc import Callable
|
|
|
|
import jinja2
|
|
import markupsafe
|
|
|
|
from fastapi import APIRouter, Request, Depends, Response, FastAPI
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import RedirectResponse, HTMLResponse
|
|
from fastapi_etag import Etag
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi_etag import add_exception_handler as add_etag_exception_handler
|
|
|
|
from .appstate import state, get_repositories, Package, Source, DepType, SrcInfoPackage, get_base_group_name, Vulnerability, Severity
|
|
from .appconfig import DEFAULT_REPO
|
|
from .utils import extract_upstream_version, version_is_newer_than
|
|
|
|
router = APIRouter(default_response_class=HTMLResponse)
|
|
DIR = os.path.dirname(os.path.realpath(__file__))
|
|
templates = Jinja2Templates(directory=os.path.join(DIR, "templates"))
|
|
templates.env.undefined = jinja2.StrictUndefined
|
|
|
|
|
|
class PackageStatus(Enum):
|
|
FINISHED = 'finished'
|
|
FINISHED_BUT_BLOCKED = 'finished-but-blocked'
|
|
FINISHED_BUT_INCOMPLETE = 'finished-but-incomplete'
|
|
FAILED_TO_BUILD = 'failed-to-build'
|
|
WAITING_FOR_BUILD = 'waiting-for-build'
|
|
WAITING_FOR_DEPENDENCIES = 'waiting-for-dependencies'
|
|
MANUAL_BUILD_REQUIRED = 'manual-build-required'
|
|
UNKNOWN = 'unknown'
|
|
|
|
|
|
class PackageBuildStatus(NamedTuple):
|
|
type: str
|
|
status: str
|
|
details: str
|
|
urls: dict[str, str]
|
|
category: str
|
|
|
|
|
|
async def get_etag(request: Request) -> str:
|
|
return state.etag
|
|
|
|
|
|
def template_filter(name: str) -> Callable:
|
|
def wrap(f: Callable) -> Callable:
|
|
templates.env.filters[name] = f
|
|
return f
|
|
return wrap
|
|
|
|
|
|
def context_function(name: str) -> Callable:
|
|
def wrap(f: Callable) -> Callable:
|
|
@jinja2.pass_context
|
|
def ctxfunc(context: dict, *args: Any, **kwargs: Any) -> Any:
|
|
return f(context["request"], *args, **kwargs)
|
|
templates.env.globals[name] = ctxfunc
|
|
return f
|
|
return wrap
|
|
|
|
|
|
@context_function("is_endpoint")
|
|
def is_endpoint(request: Request, endpoint: str) -> bool:
|
|
path: str = request.scope["path"]
|
|
return path == "/" + endpoint or path.startswith("/" + endpoint + "/")
|
|
|
|
|
|
@context_function("update_timestamp")
|
|
def update_timestamp(request: Request) -> float:
|
|
return state.last_update
|
|
|
|
|
|
@context_function("vulnerability_color")
|
|
def vulnerability_color(request: Request, vuln: Vulnerability) -> str:
|
|
if vuln.severity == Severity.CRITICAL:
|
|
return "danger"
|
|
elif vuln.severity == Severity.HIGH:
|
|
return "warning"
|
|
else:
|
|
return "secondary"
|
|
|
|
|
|
@context_function("package_url")
|
|
def package_url(request: Request, package: Package, name: str | None = None) -> str:
|
|
res: str = ""
|
|
if name is None:
|
|
res = str(request.url_for("package", package_name=name or package.name))
|
|
res += "?repo=" + package.repo
|
|
if package.repo_variant:
|
|
res += "&variant=" + package.repo_variant
|
|
else:
|
|
res = str(request.url_for("package", package_name=re.split("[<>=]+", name)[0]))
|
|
if package.repo_variant:
|
|
res += "?repo=" + package.repo
|
|
res += "&variant=" + package.repo_variant
|
|
return res
|
|
|
|
|
|
def _license_to_html(license: str) -> str:
|
|
|
|
def create_url(license: str) -> str:
|
|
fn = urllib.parse.quote(license)
|
|
return f"https://spdx.org/licenses/{fn}.html"
|
|
|
|
def spdx_to_html(s: str) -> str:
|
|
scanner = re.Scanner([ # type: ignore
|
|
(r"[A-Za-z0-9.+-]+", lambda scanner, token: ("LICENSE", token)),
|
|
(r"[^A-Za-z0-9.+-]+", lambda scanner, token: ("TEXT", token)),
|
|
])
|
|
|
|
done = []
|
|
for t, token in scanner.scan(s)[0]:
|
|
if t == "LICENSE":
|
|
if token.upper() in ["AND", "OR", "WITH"]:
|
|
done.append(str(markupsafe.escape(token)))
|
|
elif token.upper().startswith("LICENSEREF-"):
|
|
done.append(str(markupsafe.escape(token.split("-", 1)[-1])))
|
|
else:
|
|
url = create_url(token)
|
|
done.append(f"<a href=\"{url}\">{markupsafe.escape(token)}</a>")
|
|
else:
|
|
done.append(str(markupsafe.escape(token)))
|
|
|
|
return "".join(done)
|
|
|
|
def needs_quote(s: str) -> bool:
|
|
return " " in s
|
|
|
|
if license.lower().startswith("spdx:"):
|
|
return spdx_to_html(license.split(":", 1)[-1])
|
|
return str(markupsafe.escape(license))
|
|
|
|
|
|
@context_function("licenses_to_html")
|
|
def licenses_to_html(request: Request, licenses: list[str]) -> str:
|
|
done = []
|
|
for license in licenses:
|
|
needs_quote = (" " in license.strip()) and len(licenses) > 1
|
|
html = _license_to_html(license)
|
|
if needs_quote:
|
|
done.append(f"({html})")
|
|
else:
|
|
done.append(html)
|
|
|
|
return " OR ".join(done)
|
|
|
|
|
|
@template_filter("rdepends_type")
|
|
def rdepends_type(types: set[DepType]) -> list[str]:
|
|
if list(types) == [DepType.NORMAL]:
|
|
return []
|
|
names = []
|
|
for t in types:
|
|
if t == DepType.NORMAL:
|
|
names.append("normal")
|
|
elif t == DepType.CHECK:
|
|
names.append("check")
|
|
elif t == DepType.OPTIONAL:
|
|
names.append("optional")
|
|
elif t == DepType.MAKE:
|
|
names.append("make")
|
|
return names
|
|
|
|
|
|
@template_filter("rdepends_sort")
|
|
def rdepends_sort(rdepends: dict[Package, set[str]]) -> list[tuple[Package, set[str]]]:
|
|
return sorted(rdepends.items(), key=lambda x: (x[0].name.lower(), x[0].key))
|
|
|
|
|
|
@template_filter('timestamp')
|
|
def filter_timestamp(d: int) -> str:
|
|
try:
|
|
return datetime.datetime.fromtimestamp(
|
|
int(d)).strftime('%Y-%m-%d %H:%M:%S')
|
|
except OSError:
|
|
return "-"
|
|
|
|
|
|
@template_filter('filesize')
|
|
def filter_filesize(d: int) -> str:
|
|
d = int(d)
|
|
if d > 1024 ** 3:
|
|
return "%.2f GB" % (d / (1024 ** 3))
|
|
else:
|
|
return "%.2f MB" % (d / (1024 ** 2))
|
|
|
|
|
|
@router.get('/robots.txt')
|
|
async def robots() -> Response:
|
|
data = """\
|
|
User-agent: *
|
|
Disallow: /search?*
|
|
"""
|
|
return Response(content=data, media_type='text/plain')
|
|
|
|
|
|
@router.get('/repos', dependencies=[Depends(Etag(get_etag))])
|
|
async def repos(request: Request, response: Response) -> Response:
|
|
return templates.TemplateResponse("repos.html", {"request": request, "repos": get_repositories()}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/stats', dependencies=[Depends(Etag(get_etag))])
|
|
async def stats(request: Request, response: Response) -> Response:
|
|
return templates.TemplateResponse("stats.html", {"request": request}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/mirrors', dependencies=[Depends(Etag(get_etag))])
|
|
async def mirrors(request: Request, response: Response) -> Response:
|
|
return templates.TemplateResponse("mirrors.html", {"request": request}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/', dependencies=[Depends(Etag(get_etag))])
|
|
async def index(request: Request, response: Response) -> Response:
|
|
return RedirectResponse(request.url_for('queue'), headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/base', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/base/{base_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def base(request: Request, response: Response, base_name: str | None = None) -> Response:
|
|
global state
|
|
|
|
if base_name is not None:
|
|
if base_name in state.sources:
|
|
res = [state.sources[base_name]]
|
|
else:
|
|
res = []
|
|
return templates.TemplateResponse("base.html", {
|
|
"request": request,
|
|
"sources": res,
|
|
}, headers=dict(response.headers))
|
|
else:
|
|
return templates.TemplateResponse("baseindex.html", {
|
|
"request": request,
|
|
"sources": state.sources.values(),
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/security', dependencies=[Depends(Etag(get_etag))])
|
|
async def security(request: Request, response: Response) -> Response:
|
|
global state
|
|
|
|
return templates.TemplateResponse("security.html", {
|
|
"request": request,
|
|
"vulnerable": [s for s in state.sources.values() if s.worst_vulnerability is not None],
|
|
"sources": state.sources.values(),
|
|
"known": [s for s in state.sources.values() if s.can_have_vulnerabilities],
|
|
"unknown": [s for s in state.sources.values() if not s.can_have_vulnerabilities],
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/group/', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/group/{group_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def group(request: Request, response: Response, group_name: str | None = None) -> Response:
|
|
params = {}
|
|
if group_name is not None:
|
|
params['group_name'] = group_name
|
|
return RedirectResponse(request.url_for('groups', **params), headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/groups/', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/groups/{group_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def groups(request: Request, response: Response, group_name: str | None = None) -> Response:
|
|
global state
|
|
|
|
if group_name is not None:
|
|
res = []
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
if group_name in p.groups:
|
|
res.append(p)
|
|
|
|
return templates.TemplateResponse("group.html", {
|
|
"request": request,
|
|
"name": group_name,
|
|
"packages": res,
|
|
}, headers=dict(response.headers))
|
|
else:
|
|
groups: dict[str, int] = {}
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
for name in p.groups:
|
|
groups[name] = groups.get(name, 0) + 1
|
|
return templates.TemplateResponse('groups.html', {
|
|
"request": request,
|
|
"groups": groups,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/basegroups/', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/basegroups/{group_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def basegroups(request: Request, response: Response, group_name: str | None = None) -> Response:
|
|
global state
|
|
|
|
if group_name is not None:
|
|
groups: dict[str, int] = {}
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
for name in p.groups:
|
|
base_name = get_base_group_name(p, name)
|
|
if base_name == group_name:
|
|
groups[name] = groups.get(name, 0) + 1
|
|
|
|
return templates.TemplateResponse("basegroup.html", {
|
|
"request": request,
|
|
"name": group_name,
|
|
"groups": groups,
|
|
}, headers=dict(response.headers))
|
|
else:
|
|
base_groups: dict[str, set[str]] = {}
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
for name in p.groups:
|
|
base_name = get_base_group_name(p, name)
|
|
base_groups.setdefault(base_name, set()).add(name)
|
|
|
|
return templates.TemplateResponse('basegroups.html', {
|
|
"request": request,
|
|
"base_groups": base_groups,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/package/', dependencies=[Depends(Etag(get_etag))])
|
|
async def packages(request: Request, response: Response, repo: str | None = None, variant: str | None = None) -> Response:
|
|
global state
|
|
|
|
repo = repo or DEFAULT_REPO
|
|
|
|
packages = []
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
if p.repo == repo:
|
|
if not variant or p.repo_variant == variant:
|
|
packages.append((s, p))
|
|
|
|
repos = get_repositories()
|
|
return templates.TemplateResponse("packages.html", {
|
|
"request": request,
|
|
"packages": packages,
|
|
"repos": repos,
|
|
"repo_filter": repo,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/package/{package_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def package(request: Request, response: Response, package_name: str, repo: str | None = None, variant: str | None = None) -> Response:
|
|
global state
|
|
|
|
packages = []
|
|
provides = []
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
is_package_exact = (package_name is None or p.name == package_name)
|
|
if is_package_exact or package_name in p.provides:
|
|
if not repo or p.repo == repo:
|
|
if not variant or p.repo_variant == variant:
|
|
if is_package_exact:
|
|
packages.append((s, p))
|
|
else:
|
|
provides.append((s, p))
|
|
|
|
if packages:
|
|
return templates.TemplateResponse("package.html", {
|
|
"request": request,
|
|
"packages": packages,
|
|
}, headers=dict(response.headers))
|
|
else:
|
|
return templates.TemplateResponse("packagevirtual.html", {
|
|
"request": request,
|
|
"name": package_name,
|
|
"packages": provides,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/updates', dependencies=[Depends(Etag(get_etag))])
|
|
async def updates(request: Request, response: Response, repo: str = "") -> Response:
|
|
|
|
repo_filter = repo or None
|
|
repos = get_repositories()
|
|
|
|
packages: list[Package] = []
|
|
for s in state.sources.values():
|
|
for p in s.packages.values():
|
|
if repo_filter is not None and p.repo != repo_filter:
|
|
continue
|
|
packages.append(p)
|
|
packages.sort(key=lambda p: p.builddate, reverse=True)
|
|
|
|
return templates.TemplateResponse("updates.html", {
|
|
"request": request,
|
|
"packages": packages[:250],
|
|
"repos": repos,
|
|
"repo_filter": repo_filter,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
def get_transitive_depends(related: list[str]) -> set[str]:
|
|
if not related:
|
|
return set()
|
|
|
|
db_depends: dict[str, set[str]] = {}
|
|
related_pkgs = set()
|
|
for s in state.sources.values():
|
|
for p in s.packages.values():
|
|
if s.name in related:
|
|
related_pkgs.add(p.name)
|
|
db_depends.setdefault(p.name, set()).update(p.depends.keys())
|
|
|
|
todo = set(related_pkgs)
|
|
done = set()
|
|
while todo:
|
|
name = todo.pop()
|
|
if name in done:
|
|
continue
|
|
done.add(name)
|
|
if name in db_depends:
|
|
todo.update(db_depends[name])
|
|
|
|
return done
|
|
|
|
|
|
@router.get('/outofdate', dependencies=[Depends(Etag(get_etag))])
|
|
async def outofdate(request: Request, response: Response, related: str | None = None, repo: str = "") -> Response:
|
|
|
|
repo_filter = repo or None
|
|
repos = get_repositories()
|
|
|
|
missing = []
|
|
to_update = []
|
|
all_sources = []
|
|
|
|
if related is not None:
|
|
related_list = list(filter(None, [s.strip() for s in related.split(",")]))
|
|
else:
|
|
related_list = []
|
|
|
|
related_depends = get_transitive_depends(related_list)
|
|
|
|
for s in state.sources.values():
|
|
all_sources.append(s)
|
|
|
|
if "internal" in s.pkgextra.references:
|
|
continue
|
|
|
|
if related_depends:
|
|
for p in s.packages.values():
|
|
if p.name in related_depends:
|
|
break
|
|
else:
|
|
continue
|
|
|
|
if repo_filter is not None and repo_filter not in s.repos:
|
|
continue
|
|
|
|
msys_version = extract_upstream_version(s.version)
|
|
git_version = extract_upstream_version(s.git_version)
|
|
if not version_is_newer_than(git_version, msys_version):
|
|
git_version = ""
|
|
|
|
info = s.upstream_info
|
|
if info is not None and info.version is not None:
|
|
if version_is_newer_than(info.version, msys_version):
|
|
to_update.append((s, msys_version, git_version, info.version, info.url, info.date))
|
|
else:
|
|
missing.append(s)
|
|
|
|
# show packages which have recently been build first.
|
|
# assumes high frequency update packages are more important
|
|
to_update.sort(key=lambda i: (i[-1], i[0].name), reverse=True)
|
|
|
|
missing.sort(key=lambda i: i.date, reverse=True)
|
|
|
|
return templates.TemplateResponse("outofdate.html", {
|
|
"request": request,
|
|
"all_sources": all_sources,
|
|
"to_update": to_update,
|
|
"missing": missing,
|
|
"related": related or "",
|
|
"repos": repos,
|
|
"repo_filter": repo_filter,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
def get_status_text(key: str) -> str:
|
|
try:
|
|
status = PackageStatus(key)
|
|
except ValueError:
|
|
return key
|
|
if status == PackageStatus.UNKNOWN:
|
|
return "Waiting to be processed"
|
|
elif status == PackageStatus.FAILED_TO_BUILD:
|
|
return "Failed to build"
|
|
elif status == PackageStatus.FINISHED:
|
|
return "Ready for upload"
|
|
elif status == PackageStatus.FINISHED_BUT_BLOCKED:
|
|
return "Ready for upload but waiting for dependencies"
|
|
elif status == PackageStatus.FINISHED_BUT_INCOMPLETE:
|
|
return "Ready for upload but related builds are missing"
|
|
elif status == PackageStatus.MANUAL_BUILD_REQUIRED:
|
|
return "Manual build required"
|
|
elif status == PackageStatus.WAITING_FOR_BUILD:
|
|
return "Waiting to be built"
|
|
elif status == PackageStatus.WAITING_FOR_DEPENDENCIES:
|
|
return "Waiting for dependencies"
|
|
else:
|
|
return key
|
|
|
|
|
|
def get_status_category(key: str) -> str:
|
|
SUCCESS = "success"
|
|
DANGER = "danger"
|
|
|
|
try:
|
|
status = PackageStatus(key)
|
|
except ValueError:
|
|
return DANGER
|
|
|
|
if status in (PackageStatus.FINISHED, PackageStatus.FINISHED_BUT_BLOCKED,
|
|
PackageStatus.FINISHED_BUT_INCOMPLETE):
|
|
return SUCCESS
|
|
elif status in (PackageStatus.WAITING_FOR_BUILD, PackageStatus.WAITING_FOR_DEPENDENCIES,
|
|
PackageStatus.UNKNOWN):
|
|
return ""
|
|
else:
|
|
return DANGER
|
|
|
|
|
|
def get_status_priority(key: str) -> tuple[int, str]:
|
|
"""We want to show the most important status as the primary one"""
|
|
|
|
try:
|
|
status = PackageStatus(key)
|
|
except ValueError:
|
|
return (-1, key)
|
|
|
|
order = [
|
|
PackageStatus.UNKNOWN,
|
|
PackageStatus.FINISHED,
|
|
PackageStatus.MANUAL_BUILD_REQUIRED,
|
|
PackageStatus.FINISHED_BUT_INCOMPLETE,
|
|
PackageStatus.FINISHED_BUT_BLOCKED,
|
|
PackageStatus.WAITING_FOR_BUILD,
|
|
PackageStatus.WAITING_FOR_DEPENDENCIES,
|
|
PackageStatus.FAILED_TO_BUILD,
|
|
]
|
|
|
|
try:
|
|
return (order.index(status), key)
|
|
except ValueError:
|
|
return (-1, key)
|
|
|
|
|
|
def repo_to_builds(repo: str) -> list[str]:
|
|
if repo == "msys":
|
|
return [repo, "msys-src"]
|
|
else:
|
|
return [repo, "mingw-src"]
|
|
|
|
|
|
def get_build_status(srcinfo: SrcInfoPackage, build_types: set[str] = set()) -> list[PackageBuildStatus]:
|
|
build_status = state.build_status
|
|
|
|
entry = None
|
|
for package in build_status.packages:
|
|
if package.name == srcinfo.pkgbase and package.version == srcinfo.build_version:
|
|
entry = package
|
|
break
|
|
|
|
results = []
|
|
if entry is not None:
|
|
for build_type, status in sorted(entry.builds.items(), key=lambda i: get_status_priority(i[1].status), reverse=True):
|
|
status_key = status.status
|
|
if build_types and build_type not in build_types:
|
|
continue
|
|
results.append(
|
|
PackageBuildStatus(
|
|
build_type, get_status_text(status_key),
|
|
status.desc or "", status.urls,
|
|
get_status_category(status_key))
|
|
)
|
|
|
|
if not results:
|
|
for build in sorted(build_types):
|
|
key = "unknown"
|
|
results.append(
|
|
PackageBuildStatus(build, get_status_text(key), "", {}, get_status_category(key)))
|
|
|
|
return results
|
|
|
|
|
|
@router.get('/queue', dependencies=[Depends(Etag(get_etag))])
|
|
async def queue(request: Request, response: Response, build_type: str = "") -> Response:
|
|
# Create entries for all packages where the version doesn't match
|
|
|
|
UpdateEntry = tuple[SrcInfoPackage, Optional[Source], Optional[Package], list[PackageBuildStatus]]
|
|
|
|
build_filter = build_type or None
|
|
srcinfo_repos: dict[str, set[str]] = {}
|
|
|
|
grouped: dict[str, UpdateEntry] = {}
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
if p.name in state.sourceinfos:
|
|
srcinfo = state.sourceinfos[p.name]
|
|
if build_filter is not None and build_filter not in repo_to_builds(srcinfo.repo):
|
|
continue
|
|
if version_is_newer_than(srcinfo.build_version, p.version):
|
|
srcinfo_repos.setdefault(srcinfo.pkgbase, set()).update(repo_to_builds(srcinfo.repo))
|
|
repo_list = srcinfo_repos[srcinfo.pkgbase] if not build_filter else {build_filter}
|
|
new_src = state.sources.get(srcinfo.pkgbase)
|
|
grouped[srcinfo.pkgbase] = (srcinfo, new_src, p, get_build_status(srcinfo, repo_list))
|
|
|
|
# new packages
|
|
available: dict[str, list[SrcInfoPackage]] = {}
|
|
for srcinfo in state.sourceinfos.values():
|
|
if build_filter is not None and build_filter not in repo_to_builds(srcinfo.repo):
|
|
continue
|
|
available.setdefault(srcinfo.pkgname, []).append(srcinfo)
|
|
for s in state.sources.values():
|
|
for p in s.packages.values():
|
|
available.pop(p.name, None)
|
|
|
|
# only one per pkgbase
|
|
for srcinfos in available.values():
|
|
for srcinfo in srcinfos:
|
|
srcinfo_repos.setdefault(srcinfo.pkgbase, set()).update(repo_to_builds(srcinfo.repo))
|
|
repo_list = srcinfo_repos[srcinfo.pkgbase] if not build_filter else {build_filter}
|
|
src, pkg = None, None
|
|
if srcinfo.pkgbase in grouped:
|
|
src, pkg = grouped[srcinfo.pkgbase][1:3]
|
|
elif srcinfo.pkgbase in state.sources:
|
|
src = state.sources[srcinfo.pkgbase]
|
|
grouped[srcinfo.pkgbase] = (srcinfo, src, pkg, get_build_status(srcinfo, repo_list))
|
|
|
|
updates: list[UpdateEntry] = []
|
|
updates = list(grouped.values())
|
|
updates.sort(
|
|
key=lambda i: (i[0].date, i[0].pkgbase, i[0].pkgname),
|
|
reverse=True)
|
|
|
|
# get all packages in the pacman repo which are no in GIT
|
|
removals = []
|
|
for s in state.sources.values():
|
|
for k, p in s.packages.items():
|
|
if build_filter is not None and build_filter not in repo_to_builds(p.repo):
|
|
continue
|
|
if p.name not in state.sourceinfos:
|
|
# FIXME: can also break things if it's the only provides and removed,
|
|
# and also is ok to remove if there is a replacement
|
|
removals.append((p, ", ".join([d.name for d in p.rdepends])))
|
|
|
|
build_types = set()
|
|
for r in get_repositories():
|
|
build_types.update(repo_to_builds(r.name))
|
|
|
|
return templates.TemplateResponse("queue.html", {
|
|
"request": request,
|
|
"updates": updates,
|
|
"removals": removals,
|
|
"build_types": build_types,
|
|
"build_filter": build_filter,
|
|
"cycles": state.build_status.cycles,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/new', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/removals', dependencies=[Depends(Etag(get_etag))])
|
|
async def new(request: Request, response: Response) -> Response:
|
|
return RedirectResponse(request.url_for('queue'), headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/search', dependencies=[Depends(Etag(get_etag))])
|
|
async def search(request: Request, response: Response, q: str = "", t: str = "") -> Response:
|
|
query = q
|
|
qtype = t
|
|
|
|
if qtype not in ["pkg", "binpkg"]:
|
|
qtype = "pkg"
|
|
|
|
parts = query.split()
|
|
parts_lower = [p.lower() for p in parts]
|
|
res_pkg: list[tuple[float, Package | Source]] = []
|
|
|
|
def get_score(name: str, parts: list[str]) -> float:
|
|
score = 0.0
|
|
for part in parts:
|
|
if part not in name:
|
|
return -1
|
|
score += name.count(part) * len(part) / len(name)
|
|
return score
|
|
|
|
if not query:
|
|
pass
|
|
elif qtype == "pkg":
|
|
for s in state.sources.values():
|
|
score = get_score(s.realname.lower(), parts_lower)
|
|
if score >= 0:
|
|
res_pkg.append((score, s))
|
|
continue
|
|
score = get_score(s.name.lower(), parts_lower)
|
|
if score >= 0:
|
|
res_pkg.append((score, s))
|
|
res_pkg.sort(key=lambda e: (-e[0], e[1].name.lower()))
|
|
elif qtype == "binpkg":
|
|
for s in state.sources.values():
|
|
for sub in s.packages.values():
|
|
score = get_score(sub.realname.lower(), parts_lower)
|
|
if score >= 0:
|
|
res_pkg.append((score, sub))
|
|
continue
|
|
score = get_score(sub.name.lower(), parts_lower)
|
|
if score >= 0:
|
|
res_pkg.append((score, sub))
|
|
res_pkg.sort(key=lambda e: (-e[0], e[1].name.lower()))
|
|
|
|
return templates.TemplateResponse("search.html", {
|
|
"request": request,
|
|
"results": res_pkg,
|
|
"query": query,
|
|
"qtype": qtype,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
async def check_is_ready(request: Request, call_next: Callable) -> Response:
|
|
if not state.ready:
|
|
return Response(content="starting up...", status_code=503)
|
|
response: Response = await call_next(request)
|
|
return response
|
|
|
|
|
|
webapp = FastAPI(openapi_url=None)
|
|
webapp.mount("/static", StaticFiles(directory=os.path.join(DIR, "static")), name="static")
|
|
webapp.include_router(router)
|
|
add_etag_exception_handler(webapp)
|