Skip to content
This repository has been archived by the owner on Sep 12, 2024. It is now read-only.

Commit

Permalink
add csaf parser
Browse files Browse the repository at this point in the history
  • Loading branch information
jasinner committed Aug 16, 2024
1 parent 568fbe6 commit 99b7bb3
Show file tree
Hide file tree
Showing 5 changed files with 141 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__/
26 changes: 26 additions & 0 deletions csaf_converter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/usr/bin/env python3

# Convert a CSAF document to OSV format
# i.e. https://access.redhat.com/security/data/csaf/v2/advisories/2024/rhsa-2024_4546.json

import argparse

from rhel_osv.csaf import CSAF


def main():
parser = argparse.ArgumentParser(description='CSAF to OSV Converter')
parser.add_argument("csaf", metavar="FILE", help='CSAF file to process')

args = parser.parse_args()

print(f"Parsing {args.csaf}")
csaf = CSAF(args.csaf)
print(f"Advisory {csaf.id} affects products: {set(csaf.cpes.values())}")
print(f"CVEs: {[v.cve_id for v in csaf.vulnerabilities]}")
print(f"References:")
for r in csaf.references:
print(f" {r.url}")

if __name__ == '__main__':
main()
Empty file added rhel_osv/__init__.py
Empty file.
Binary file added rhel_osv/__pycache__/__init__.cpython-312.pyc
Binary file not shown.
114 changes: 114 additions & 0 deletions rhel_osv/csaf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import json
from pathlib import Path
from typing import Any


class Reference(object):
def __init__(self, csaf_ref: dict[str, str]):
self.category = csaf_ref["category"]
self.summary = csaf_ref["summary"]
self.url = csaf_ref["url"]


class Remediation(object):
def __init__(self, csaf_product_id: str, cpes: dict[str, str], purls: dict[str, str]):
split_product = csaf_product_id.split(":", maxsplit=1)
split_product_len = len(split_product)
if split_product_len == 1:
raise ValueError(f"Did not find ':' in product_id: {csaf_product_id}")
self.product = split_product[0]
self.cpe = cpes.get(self.product)
component_version = split_product[1]
self.product_version = component_version
self.purl = purls.get(self.product_version)
split_component_version = component_version.rsplit("-", maxsplit=2)
if len(split_component_version) != 3:
raise ValueError(f"Could not convert component into NEVRA: {component_version}")
self.component = split_component_version[0]
self.fixed_version = "-".join((split_component_version[1], split_component_version[2]))


class Vulnerability(object):
def __init__(self, csaf_vuln: dict[str, Any], cpes: dict[str, str], purls: dict[str, str]):
self.cve_id = csaf_vuln["cve"]
for score in csaf_vuln["scores"]:
if "cvss_v3" in score:
self.cvss_v3_vector = score["cvss_v3"]["vectorString"]
self.cvss_v3_base_score = score["cvss_v3"]["baseScore"]
self.remediations = []
for product_id in csaf_vuln["product_status"]["fixed"]:
try:
self.remediations.append(Remediation(product_id, cpes, purls))
except ValueError as e:
print(f"Warning: Could not parse product_id: {product_id}: {e}")


# from https://stackoverflow.com/questions/9807634/find-all-occurrences-of-a-key-in-nested-dictionaries-and-lists
def gen_dict_extract(key, var):
if hasattr(var, "items"):
for k, v in var.items():
if k == key:
yield v
if isinstance(v, dict):
for result in gen_dict_extract(key, v):
yield result
elif isinstance(v, list):
for d in v:
for result in gen_dict_extract(key, d):
yield result


def build_product_maps(product_tree_branches: dict) -> tuple[dict[str, str], dict[str, str]]:
cpe_map = {}
purl_map = {}
products = gen_dict_extract("product", product_tree_branches)
for product in products:
product_id = product["product_id"]
if "product_identification_helper" in product.keys():
helper = product["product_identification_helper"]
if "cpe" in helper:
cpe_map[product_id] = helper["cpe"]
elif "purl" in helper:
purl_map[product_id] = helper["purl"]
return cpe_map, purl_map


class CSAF(object):

def __init__(self, csaffile: str):
file_path = Path(csaffile)
if not file_path.exists():
print(f"Missing CSAF file: {csaffile}.")
exit(1)

with open(csaffile) as fp:
csafdata = json.load(fp)

if not csafdata:
print(f"Unable to load CSAF data from {csaffile}.")
exit(1)

self.doc = csafdata["document"]

self.csaf = {"type": self.doc["category"], "csaf_version": self.doc["csaf_version"]}

# only support csaf_vex 2.0
if self.csaf != {"type": "csaf_vex", "csaf_version": "2.0"}:
print(f"Sorry, I can only handle csaf_vex 2.0 documents, this one is {self.csaf}")
exit(1)

self.title = self.doc["title"]

self.references = [Reference(r) for r in self.doc["references"]]

# TODO compare this against the reference 'self'
file_extension = file_path.suffix
self.id = file_path.name.removesuffix(file_extension)

self.cpes, self.purls = build_product_maps(csafdata['product_tree'])

self.vulnerabilities = [
Vulnerability(v, self.cpes, self.purls) for v in (csafdata["vulnerabilities"])
]

# TODO parse references

0 comments on commit 99b7bb3

Please sign in to comment.