72 lines
2.2 KiB
Python
72 lines
2.2 KiB
Python
# Copyright 2024 Christoph Reiter
|
|
# SPDX-License-Identifier: MIT
|
|
|
|
import json
|
|
|
|
from ..appconfig import CDX_URLS, REQUEST_TIMEOUT
|
|
from ..appstate import Severity, Vulnerability, state
|
|
from ..utils import logger
|
|
from .utils import check_needs_update, get_content_cached
|
|
|
|
|
|
def parse_cdx(data: bytes) -> dict[str, list[Vulnerability]]:
|
|
"""Parse the cdx data and returns a mapping of pkgbase names to a list of
|
|
vulnerabilities."""
|
|
|
|
cdx = json.loads(data)
|
|
|
|
mapping = {}
|
|
for component in cdx["components"]:
|
|
pkgbases = set()
|
|
for property in component.get("properties", []):
|
|
if property["name"] == "msys2:pkgbase":
|
|
pkgbases.add(property["value"])
|
|
bom_ref = component["bom-ref"]
|
|
mapping[bom_ref] = pkgbases
|
|
|
|
def parse_vuln(vuln: dict) -> Vulnerability:
|
|
severity = Severity.UNKNOWN
|
|
for ratings in vuln["ratings"]:
|
|
severity = Severity(ratings["severity"])
|
|
break
|
|
|
|
unaffected_versions = []
|
|
for affects in vuln["affects"]:
|
|
versions = affects.get("versions", [])
|
|
for version in versions:
|
|
if version.get("status") == "unaffected" and "version" in version:
|
|
unaffected_versions.append(version["version"])
|
|
|
|
return Vulnerability(
|
|
id=vuln["id"],
|
|
url=vuln["source"]["url"],
|
|
severity=severity,
|
|
unaffected_versions=unaffected_versions)
|
|
|
|
vuln_mapping: dict[str, list[Vulnerability]] = {}
|
|
for vuln in cdx["vulnerabilities"]:
|
|
for affected in vuln["affects"]:
|
|
bom_ref = affected["ref"]
|
|
pkgbases = mapping[bom_ref]
|
|
parsed_vuln = parse_vuln(vuln)
|
|
for pkgbase in pkgbases:
|
|
vuln_mapping.setdefault(pkgbase, []).append(parsed_vuln)
|
|
|
|
return vuln_mapping
|
|
|
|
|
|
async def update_cdx() -> None:
|
|
urls = CDX_URLS
|
|
if not await check_needs_update(urls):
|
|
return
|
|
|
|
logger.info("update cdx")
|
|
vuln_mapping = {}
|
|
for url in urls:
|
|
logger.info(f"Loading {url!r}")
|
|
data = await get_content_cached(url, timeout=REQUEST_TIMEOUT)
|
|
logger.info(f"Done: {url!r}")
|
|
vuln_mapping.update(parse_cdx(data))
|
|
|
|
state.vulnerabilities = vuln_mapping
|