733 lines
25 KiB
Python
733 lines
25 KiB
Python
# Copyright 2016-2020 Christoph Reiter
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import re
|
|
import datetime
|
|
from enum import Enum
|
|
import urllib.parse
|
|
from typing import Any, Optional, NamedTuple
|
|
from collections.abc import Callable
|
|
|
|
import jinja2
|
|
import markupsafe
|
|
|
|
from fastapi import APIRouter, Request, Depends, Response, FastAPI
|
|
from fastapi.templating import Jinja2Templates
|
|
from fastapi.responses import RedirectResponse, HTMLResponse
|
|
from fastapi_etag import Etag
|
|
from fastapi.staticfiles import StaticFiles
|
|
from fastapi_etag import add_exception_handler as add_etag_exception_handler
|
|
|
|
from .appstate import state, get_repositories, Package, Source, DepType, SrcInfoPackage, get_base_group_name, Vulnerability, Severity
|
|
from .appconfig import DEFAULT_REPO
|
|
from .utils import extract_upstream_version, version_is_newer_than
|
|
|
|
router = APIRouter(default_response_class=HTMLResponse)
|
|
DIR = os.path.dirname(os.path.realpath(__file__))
|
|
templates = Jinja2Templates(directory=os.path.join(DIR, "templates"))
|
|
templates.env.undefined = jinja2.StrictUndefined
|
|
|
|
|
|
class PackageStatus(Enum):
|
|
FINISHED = 'finished'
|
|
FINISHED_BUT_BLOCKED = 'finished-but-blocked'
|
|
FINISHED_BUT_INCOMPLETE = 'finished-but-incomplete'
|
|
FAILED_TO_BUILD = 'failed-to-build'
|
|
WAITING_FOR_BUILD = 'waiting-for-build'
|
|
WAITING_FOR_DEPENDENCIES = 'waiting-for-dependencies'
|
|
MANUAL_BUILD_REQUIRED = 'manual-build-required'
|
|
UNKNOWN = 'unknown'
|
|
|
|
|
|
class PackageBuildStatus(NamedTuple):
|
|
type: str
|
|
status: str
|
|
details: str
|
|
urls: dict[str, str]
|
|
category: str
|
|
|
|
|
|
async def get_etag(request: Request) -> str:
|
|
return state.etag
|
|
|
|
|
|
def template_filter(name: str) -> Callable:
|
|
def wrap(f: Callable) -> Callable:
|
|
templates.env.filters[name] = f
|
|
return f
|
|
return wrap
|
|
|
|
|
|
def context_function(name: str) -> Callable:
|
|
def wrap(f: Callable) -> Callable:
|
|
@jinja2.pass_context
|
|
def ctxfunc(context: dict, *args: Any, **kwargs: Any) -> Any:
|
|
return f(context["request"], *args, **kwargs)
|
|
templates.env.globals[name] = ctxfunc
|
|
return f
|
|
return wrap
|
|
|
|
|
|
@context_function("is_endpoint")
|
|
def is_endpoint(request: Request, endpoint: str) -> bool:
|
|
path: str = request.scope["path"]
|
|
return path == "/" + endpoint or path.startswith("/" + endpoint + "/")
|
|
|
|
|
|
@context_function("update_timestamp")
|
|
def update_timestamp(request: Request) -> float:
|
|
return state.last_update
|
|
|
|
|
|
@context_function("vulnerability_color")
|
|
def vulnerability_color(request: Request, vuln: Vulnerability) -> str:
|
|
if vuln.severity == Severity.CRITICAL:
|
|
return "danger"
|
|
elif vuln.severity == Severity.HIGH:
|
|
return "warning"
|
|
else:
|
|
return "secondary"
|
|
|
|
|
|
@context_function("package_url")
|
|
def package_url(request: Request, package: Package, name: str | None = None) -> str:
|
|
res: str = ""
|
|
if name is None:
|
|
res = str(request.url_for("package", package_name=name or package.name))
|
|
res += "?repo=" + package.repo
|
|
if package.repo_variant:
|
|
res += "&variant=" + package.repo_variant
|
|
else:
|
|
res = str(request.url_for("package", package_name=re.split("[<>=]+", name)[0]))
|
|
if package.repo_variant:
|
|
res += "?repo=" + package.repo
|
|
res += "&variant=" + package.repo_variant
|
|
return res
|
|
|
|
|
|
def _license_to_html(license: str) -> str:
|
|
|
|
def create_url(license: str) -> str:
|
|
fn = urllib.parse.quote(license)
|
|
return f"https://spdx.org/licenses/{fn}.html"
|
|
|
|
def spdx_to_html(s: str) -> str:
|
|
scanner = re.Scanner([ # type: ignore
|
|
(r"[A-Za-z0-9.+-]+", lambda scanner, token: ("LICENSE", token)),
|
|
(r"[^A-Za-z0-9.+-]+", lambda scanner, token: ("TEXT", token)),
|
|
])
|
|
|
|
done = []
|
|
for t, token in scanner.scan(s)[0]:
|
|
if t == "LICENSE":
|
|
if token.upper() in ["AND", "OR", "WITH"]:
|
|
done.append(str(markupsafe.escape(token)))
|
|
elif token.upper().startswith("LICENSEREF-"):
|
|
done.append(str(markupsafe.escape(token.split("-", 1)[-1])))
|
|
else:
|
|
url = create_url(token)
|
|
done.append(f"<a href=\"{url}\">{markupsafe.escape(token)}</a>")
|
|
else:
|
|
done.append(str(markupsafe.escape(token)))
|
|
|
|
return "".join(done)
|
|
|
|
def needs_quote(s: str) -> bool:
|
|
return " " in s
|
|
|
|
if license.lower().startswith("spdx:"):
|
|
return spdx_to_html(license.split(":", 1)[-1])
|
|
return str(markupsafe.escape(license))
|
|
|
|
|
|
@context_function("licenses_to_html")
|
|
def licenses_to_html(request: Request, licenses: list[str]) -> str:
|
|
done = []
|
|
for license in licenses:
|
|
needs_quote = (" " in license.strip()) and len(licenses) > 1
|
|
html = _license_to_html(license)
|
|
if needs_quote:
|
|
done.append(f"({html})")
|
|
else:
|
|
done.append(html)
|
|
|
|
return " OR ".join(done)
|
|
|
|
|
|
@template_filter("rdepends_type")
|
|
def rdepends_type(types: set[DepType]) -> list[str]:
|
|
if list(types) == [DepType.NORMAL]:
|
|
return []
|
|
names = []
|
|
for t in types:
|
|
if t == DepType.NORMAL:
|
|
names.append("normal")
|
|
elif t == DepType.CHECK:
|
|
names.append("check")
|
|
elif t == DepType.OPTIONAL:
|
|
names.append("optional")
|
|
elif t == DepType.MAKE:
|
|
names.append("make")
|
|
return names
|
|
|
|
|
|
@template_filter("rdepends_sort")
|
|
def rdepends_sort(rdepends: dict[Package, set[str]]) -> list[tuple[Package, set[str]]]:
|
|
return sorted(rdepends.items(), key=lambda x: (x[0].name.lower(), x[0].key))
|
|
|
|
|
|
@template_filter('timestamp')
|
|
def filter_timestamp(d: int) -> str:
|
|
try:
|
|
return datetime.datetime.fromtimestamp(
|
|
int(d)).strftime('%Y-%m-%d %H:%M:%S')
|
|
except OSError:
|
|
return "-"
|
|
|
|
|
|
@template_filter('filesize')
|
|
def filter_filesize(d: int) -> str:
|
|
d = int(d)
|
|
if d > 1024 ** 3:
|
|
return "%.2f GB" % (d / (1024 ** 3))
|
|
else:
|
|
return "%.2f MB" % (d / (1024 ** 2))
|
|
|
|
|
|
@router.get('/robots.txt')
|
|
async def robots() -> Response:
|
|
data = """\
|
|
User-agent: *
|
|
Disallow: /search?*
|
|
"""
|
|
return Response(content=data, media_type='text/plain')
|
|
|
|
|
|
@router.get('/repos', dependencies=[Depends(Etag(get_etag))])
|
|
async def repos(request: Request, response: Response) -> Response:
|
|
return templates.TemplateResponse("repos.html", {"request": request, "repos": get_repositories()}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/stats', dependencies=[Depends(Etag(get_etag))])
|
|
async def stats(request: Request, response: Response) -> Response:
|
|
return templates.TemplateResponse("stats.html", {"request": request}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/mirrors', dependencies=[Depends(Etag(get_etag))])
|
|
async def mirrors(request: Request, response: Response) -> Response:
|
|
return templates.TemplateResponse("mirrors.html", {"request": request}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/', dependencies=[Depends(Etag(get_etag))])
|
|
async def index(request: Request, response: Response) -> Response:
|
|
return RedirectResponse(request.url_for('queue'), headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/base', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/base/{base_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def base(request: Request, response: Response, base_name: str | None = None) -> Response:
|
|
global state
|
|
|
|
if base_name is not None:
|
|
if base_name in state.sources:
|
|
res = [state.sources[base_name]]
|
|
else:
|
|
res = []
|
|
return templates.TemplateResponse("base.html", {
|
|
"request": request,
|
|
"sources": res,
|
|
}, headers=dict(response.headers))
|
|
else:
|
|
return templates.TemplateResponse("baseindex.html", {
|
|
"request": request,
|
|
"sources": state.sources.values(),
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/group/', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/group/{group_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def group(request: Request, response: Response, group_name: str | None = None) -> Response:
|
|
params = {}
|
|
if group_name is not None:
|
|
params['group_name'] = group_name
|
|
return RedirectResponse(request.url_for('groups', **params), headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/groups/', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/groups/{group_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def groups(request: Request, response: Response, group_name: str | None = None) -> Response:
|
|
global state
|
|
|
|
if group_name is not None:
|
|
res = []
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
if group_name in p.groups:
|
|
res.append(p)
|
|
|
|
return templates.TemplateResponse("group.html", {
|
|
"request": request,
|
|
"name": group_name,
|
|
"packages": res,
|
|
}, headers=dict(response.headers))
|
|
else:
|
|
groups: dict[str, int] = {}
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
for name in p.groups:
|
|
groups[name] = groups.get(name, 0) + 1
|
|
return templates.TemplateResponse('groups.html', {
|
|
"request": request,
|
|
"groups": groups,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/basegroups/', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/basegroups/{group_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def basegroups(request: Request, response: Response, group_name: str | None = None) -> Response:
|
|
global state
|
|
|
|
if group_name is not None:
|
|
groups: dict[str, int] = {}
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
for name in p.groups:
|
|
base_name = get_base_group_name(p, name)
|
|
if base_name == group_name:
|
|
groups[name] = groups.get(name, 0) + 1
|
|
|
|
return templates.TemplateResponse("basegroup.html", {
|
|
"request": request,
|
|
"name": group_name,
|
|
"groups": groups,
|
|
}, headers=dict(response.headers))
|
|
else:
|
|
base_groups: dict[str, set[str]] = {}
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
for name in p.groups:
|
|
base_name = get_base_group_name(p, name)
|
|
base_groups.setdefault(base_name, set()).add(name)
|
|
|
|
return templates.TemplateResponse('basegroups.html', {
|
|
"request": request,
|
|
"base_groups": base_groups,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/package/', dependencies=[Depends(Etag(get_etag))])
|
|
async def packages(request: Request, response: Response, repo: str | None = None, variant: str | None = None) -> Response:
|
|
global state
|
|
|
|
repo = repo or DEFAULT_REPO
|
|
|
|
packages = []
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
if p.repo == repo:
|
|
if not variant or p.repo_variant == variant:
|
|
packages.append((s, p))
|
|
|
|
repos = get_repositories()
|
|
return templates.TemplateResponse("packages.html", {
|
|
"request": request,
|
|
"packages": packages,
|
|
"repos": repos,
|
|
"repo_filter": repo,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/package/{package_name}', dependencies=[Depends(Etag(get_etag))])
|
|
async def package(request: Request, response: Response, package_name: str, repo: str | None = None, variant: str | None = None) -> Response:
|
|
global state
|
|
|
|
packages = []
|
|
provides = []
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
is_package_exact = (package_name is None or p.name == package_name)
|
|
if is_package_exact or package_name in p.provides:
|
|
if not repo or p.repo == repo:
|
|
if not variant or p.repo_variant == variant:
|
|
if is_package_exact:
|
|
packages.append((s, p))
|
|
else:
|
|
provides.append((s, p))
|
|
|
|
if packages:
|
|
return templates.TemplateResponse("package.html", {
|
|
"request": request,
|
|
"packages": packages,
|
|
}, headers=dict(response.headers))
|
|
else:
|
|
return templates.TemplateResponse("packagevirtual.html", {
|
|
"request": request,
|
|
"name": package_name,
|
|
"packages": provides,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/updates', dependencies=[Depends(Etag(get_etag))])
|
|
async def updates(request: Request, response: Response, repo: str = "") -> Response:
|
|
|
|
repo_filter = repo or None
|
|
repos = get_repositories()
|
|
|
|
packages: list[Package] = []
|
|
for s in state.sources.values():
|
|
for p in s.packages.values():
|
|
if repo_filter is not None and p.repo != repo_filter:
|
|
continue
|
|
packages.append(p)
|
|
packages.sort(key=lambda p: p.builddate, reverse=True)
|
|
|
|
return templates.TemplateResponse("updates.html", {
|
|
"request": request,
|
|
"packages": packages[:250],
|
|
"repos": repos,
|
|
"repo_filter": repo_filter,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
def get_transitive_depends(related: list[str]) -> set[str]:
|
|
if not related:
|
|
return set()
|
|
|
|
db_depends: dict[str, set[str]] = {}
|
|
related_pkgs = set()
|
|
for s in state.sources.values():
|
|
for p in s.packages.values():
|
|
if s.name in related:
|
|
related_pkgs.add(p.name)
|
|
db_depends.setdefault(p.name, set()).update(p.depends.keys())
|
|
|
|
todo = set(related_pkgs)
|
|
done = set()
|
|
while todo:
|
|
name = todo.pop()
|
|
if name in done:
|
|
continue
|
|
done.add(name)
|
|
if name in db_depends:
|
|
todo.update(db_depends[name])
|
|
|
|
return done
|
|
|
|
|
|
@router.get('/outofdate', dependencies=[Depends(Etag(get_etag))])
|
|
async def outofdate(request: Request, response: Response, related: str | None = None, repo: str = "") -> Response:
|
|
|
|
repo_filter = repo or None
|
|
repos = get_repositories()
|
|
|
|
missing = []
|
|
to_update = []
|
|
all_sources = []
|
|
|
|
if related is not None:
|
|
related_list = list(filter(None, [s.strip() for s in related.split(",")]))
|
|
else:
|
|
related_list = []
|
|
|
|
related_depends = get_transitive_depends(related_list)
|
|
|
|
for s in state.sources.values():
|
|
all_sources.append(s)
|
|
|
|
if "internal" in s.pkgextra.references:
|
|
continue
|
|
|
|
if related_depends:
|
|
for p in s.packages.values():
|
|
if p.name in related_depends:
|
|
break
|
|
else:
|
|
continue
|
|
|
|
if repo_filter is not None and repo_filter not in s.repos:
|
|
continue
|
|
|
|
msys_version = extract_upstream_version(s.version)
|
|
git_version = extract_upstream_version(s.git_version)
|
|
if not version_is_newer_than(git_version, msys_version):
|
|
git_version = ""
|
|
|
|
info = s.upstream_info
|
|
if info is not None and info.version != "":
|
|
if version_is_newer_than(info.version, msys_version):
|
|
to_update.append((s, msys_version, git_version, info.version, info.url, info.date))
|
|
else:
|
|
missing.append(s)
|
|
|
|
# show packages which have recently been build first.
|
|
# assumes high frequency update packages are more important
|
|
to_update.sort(key=lambda i: (i[-1], i[0].name), reverse=True)
|
|
|
|
missing.sort(key=lambda i: i.date, reverse=True)
|
|
|
|
return templates.TemplateResponse("outofdate.html", {
|
|
"request": request,
|
|
"all_sources": all_sources,
|
|
"to_update": to_update,
|
|
"missing": missing,
|
|
"related": related or "",
|
|
"repos": repos,
|
|
"repo_filter": repo_filter,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
def get_status_text(key: str) -> str:
|
|
try:
|
|
status = PackageStatus(key)
|
|
except ValueError:
|
|
return key
|
|
if status == PackageStatus.UNKNOWN:
|
|
return "Waiting to be processed"
|
|
elif status == PackageStatus.FAILED_TO_BUILD:
|
|
return "Failed to build"
|
|
elif status == PackageStatus.FINISHED:
|
|
return "Ready for upload"
|
|
elif status == PackageStatus.FINISHED_BUT_BLOCKED:
|
|
return "Ready for upload but waiting for dependencies"
|
|
elif status == PackageStatus.FINISHED_BUT_INCOMPLETE:
|
|
return "Ready for upload but related builds are missing"
|
|
elif status == PackageStatus.MANUAL_BUILD_REQUIRED:
|
|
return "Manual build required"
|
|
elif status == PackageStatus.WAITING_FOR_BUILD:
|
|
return "Waiting to be built"
|
|
elif status == PackageStatus.WAITING_FOR_DEPENDENCIES:
|
|
return "Waiting for dependencies"
|
|
else:
|
|
return key
|
|
|
|
|
|
def get_status_category(key: str) -> str:
|
|
SUCCESS = "success"
|
|
DANGER = "danger"
|
|
|
|
try:
|
|
status = PackageStatus(key)
|
|
except ValueError:
|
|
return DANGER
|
|
|
|
if status in (PackageStatus.FINISHED, PackageStatus.FINISHED_BUT_BLOCKED,
|
|
PackageStatus.FINISHED_BUT_INCOMPLETE):
|
|
return SUCCESS
|
|
elif status in (PackageStatus.WAITING_FOR_BUILD, PackageStatus.WAITING_FOR_DEPENDENCIES,
|
|
PackageStatus.UNKNOWN):
|
|
return ""
|
|
else:
|
|
return DANGER
|
|
|
|
|
|
def get_status_priority(key: str) -> tuple[int, str]:
|
|
"""We want to show the most important status as the primary one"""
|
|
|
|
try:
|
|
status = PackageStatus(key)
|
|
except ValueError:
|
|
return (-1, key)
|
|
|
|
order = [
|
|
PackageStatus.UNKNOWN,
|
|
PackageStatus.FINISHED,
|
|
PackageStatus.MANUAL_BUILD_REQUIRED,
|
|
PackageStatus.FINISHED_BUT_INCOMPLETE,
|
|
PackageStatus.FINISHED_BUT_BLOCKED,
|
|
PackageStatus.WAITING_FOR_BUILD,
|
|
PackageStatus.WAITING_FOR_DEPENDENCIES,
|
|
PackageStatus.FAILED_TO_BUILD,
|
|
]
|
|
|
|
try:
|
|
return (order.index(status), key)
|
|
except ValueError:
|
|
return (-1, key)
|
|
|
|
|
|
def repo_to_builds(repo: str) -> list[str]:
|
|
if repo == "msys":
|
|
return [repo, "msys-src"]
|
|
else:
|
|
return [repo, "mingw-src"]
|
|
|
|
|
|
def get_build_status(srcinfo: SrcInfoPackage, build_types: set[str] = set()) -> list[PackageBuildStatus]:
|
|
build_status = state.build_status
|
|
|
|
entry = None
|
|
for package in build_status.packages:
|
|
if package.name == srcinfo.pkgbase and package.version == srcinfo.build_version:
|
|
entry = package
|
|
break
|
|
|
|
results = []
|
|
if entry is not None:
|
|
for build_type, status in sorted(entry.builds.items(), key=lambda i: get_status_priority(i[1].status), reverse=True):
|
|
status_key = status.status
|
|
if build_types and build_type not in build_types:
|
|
continue
|
|
results.append(
|
|
PackageBuildStatus(
|
|
build_type, get_status_text(status_key),
|
|
status.desc or "", status.urls,
|
|
get_status_category(status_key))
|
|
)
|
|
|
|
if not results:
|
|
for build in sorted(build_types):
|
|
key = "unknown"
|
|
results.append(
|
|
PackageBuildStatus(build, get_status_text(key), "", {}, get_status_category(key)))
|
|
|
|
return results
|
|
|
|
|
|
@router.get('/queue', dependencies=[Depends(Etag(get_etag))])
|
|
async def queue(request: Request, response: Response, build_type: str = "") -> Response:
|
|
# Create entries for all packages where the version doesn't match
|
|
|
|
UpdateEntry = tuple[SrcInfoPackage, Optional[Source], Optional[Package], list[PackageBuildStatus]]
|
|
|
|
build_filter = build_type or None
|
|
srcinfo_repos: dict[str, set[str]] = {}
|
|
|
|
grouped: dict[str, UpdateEntry] = {}
|
|
for s in state.sources.values():
|
|
for k, p in sorted(s.packages.items()):
|
|
if p.name in state.sourceinfos:
|
|
srcinfo = state.sourceinfos[p.name]
|
|
if build_filter is not None and build_filter not in repo_to_builds(srcinfo.repo):
|
|
continue
|
|
if version_is_newer_than(srcinfo.build_version, p.version):
|
|
srcinfo_repos.setdefault(srcinfo.pkgbase, set()).update(repo_to_builds(srcinfo.repo))
|
|
repo_list = srcinfo_repos[srcinfo.pkgbase] if not build_filter else {build_filter}
|
|
new_src = state.sources.get(srcinfo.pkgbase)
|
|
grouped[srcinfo.pkgbase] = (srcinfo, new_src, p, get_build_status(srcinfo, repo_list))
|
|
|
|
# new packages
|
|
available: dict[str, list[SrcInfoPackage]] = {}
|
|
for srcinfo in state.sourceinfos.values():
|
|
if build_filter is not None and build_filter not in repo_to_builds(srcinfo.repo):
|
|
continue
|
|
available.setdefault(srcinfo.pkgname, []).append(srcinfo)
|
|
for s in state.sources.values():
|
|
for p in s.packages.values():
|
|
available.pop(p.name, None)
|
|
|
|
# only one per pkgbase
|
|
for srcinfos in available.values():
|
|
for srcinfo in srcinfos:
|
|
srcinfo_repos.setdefault(srcinfo.pkgbase, set()).update(repo_to_builds(srcinfo.repo))
|
|
repo_list = srcinfo_repos[srcinfo.pkgbase] if not build_filter else {build_filter}
|
|
src, pkg = None, None
|
|
if srcinfo.pkgbase in grouped:
|
|
src, pkg = grouped[srcinfo.pkgbase][1:3]
|
|
elif srcinfo.pkgbase in state.sources:
|
|
src = state.sources[srcinfo.pkgbase]
|
|
grouped[srcinfo.pkgbase] = (srcinfo, src, pkg, get_build_status(srcinfo, repo_list))
|
|
|
|
updates: list[UpdateEntry] = []
|
|
updates = list(grouped.values())
|
|
updates.sort(
|
|
key=lambda i: (i[0].date, i[0].pkgbase, i[0].pkgname),
|
|
reverse=True)
|
|
|
|
# get all packages in the pacman repo which are no in GIT
|
|
removals = []
|
|
for s in state.sources.values():
|
|
for k, p in s.packages.items():
|
|
if build_filter is not None and build_filter not in repo_to_builds(p.repo):
|
|
continue
|
|
if p.name not in state.sourceinfos:
|
|
# FIXME: can also break things if it's the only provides and removed,
|
|
# and also is ok to remove if there is a replacement
|
|
removals.append((p, ", ".join([d.name for d in p.rdepends])))
|
|
|
|
build_types = set()
|
|
for r in get_repositories():
|
|
build_types.update(repo_to_builds(r.name))
|
|
|
|
return templates.TemplateResponse("queue.html", {
|
|
"request": request,
|
|
"updates": updates,
|
|
"removals": removals,
|
|
"build_types": build_types,
|
|
"build_filter": build_filter,
|
|
"cycles": state.build_status.cycles,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/new', dependencies=[Depends(Etag(get_etag))])
|
|
@router.get('/removals', dependencies=[Depends(Etag(get_etag))])
|
|
async def new(request: Request, response: Response) -> Response:
|
|
return RedirectResponse(request.url_for('queue'), headers=dict(response.headers))
|
|
|
|
|
|
@router.get('/search', dependencies=[Depends(Etag(get_etag))])
|
|
async def search(request: Request, response: Response, q: str = "", t: str = "") -> Response:
|
|
query = q
|
|
qtype = t
|
|
|
|
if qtype not in ["pkg", "binpkg"]:
|
|
qtype = "pkg"
|
|
|
|
parts = query.split()
|
|
parts_lower = [p.lower() for p in parts]
|
|
res_pkg: list[tuple[float, Package | Source]] = []
|
|
|
|
def get_score(name: str, parts: list[str]) -> float:
|
|
score = 0.0
|
|
for part in parts:
|
|
if part not in name:
|
|
return -1
|
|
score += name.count(part) * len(part) / len(name)
|
|
return score
|
|
|
|
if not query:
|
|
pass
|
|
elif qtype == "pkg":
|
|
for s in state.sources.values():
|
|
score = get_score(s.realname.lower(), parts_lower)
|
|
if score >= 0:
|
|
res_pkg.append((score, s))
|
|
continue
|
|
score = get_score(s.name.lower(), parts_lower)
|
|
if score >= 0:
|
|
res_pkg.append((score, s))
|
|
res_pkg.sort(key=lambda e: (-e[0], e[1].name.lower()))
|
|
elif qtype == "binpkg":
|
|
for s in state.sources.values():
|
|
for sub in s.packages.values():
|
|
score = get_score(sub.realname.lower(), parts_lower)
|
|
if score >= 0:
|
|
res_pkg.append((score, sub))
|
|
continue
|
|
score = get_score(sub.name.lower(), parts_lower)
|
|
if score >= 0:
|
|
res_pkg.append((score, sub))
|
|
res_pkg.sort(key=lambda e: (-e[0], e[1].name.lower()))
|
|
|
|
return templates.TemplateResponse("search.html", {
|
|
"request": request,
|
|
"results": res_pkg,
|
|
"query": query,
|
|
"qtype": qtype,
|
|
}, headers=dict(response.headers))
|
|
|
|
|
|
async def check_is_ready(request: Request, call_next: Callable) -> Response:
|
|
if not state.ready:
|
|
return Response(content="starting up...", status_code=503)
|
|
response: Response = await call_next(request)
|
|
return response
|
|
|
|
|
|
webapp = FastAPI(openapi_url=None)
|
|
webapp.mount("/static", StaticFiles(directory=os.path.join(DIR, "static")), name="static")
|
|
webapp.include_router(router)
|
|
add_etag_exception_handler(webapp)
|