diff --git a/pyproject.toml b/pyproject.toml index c3041e8be..2f5a8019e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "nemollm", "openinference-instrumentation-langchain~=0.1.31", "ordered_set", + "packageurl-python", "pydpkg==1.9.4", "rank_bm25==0.2.2", "tantivy==0.22.2", diff --git a/src/vuln_analysis/functions/cve_generate_vex.py b/src/vuln_analysis/functions/cve_generate_vex.py index 62a9d0598..a7d2ee4b9 100644 --- a/src/vuln_analysis/functions/cve_generate_vex.py +++ b/src/vuln_analysis/functions/cve_generate_vex.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json +import uuid from aiq.builder.builder import Builder from aiq.builder.function_info import FunctionInfo @@ -43,12 +43,15 @@ async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState: return state if not any(justification.get("justification_label") == "vulnerable" for justification in state.justifications.values()): - logger.info("No vulnerable CVE(s) found. Skipping VEX generation.") - return state + logger.info("No vulnerable CVE(s) found. Generating VEX with known_not_affected status.") try: generator = load_vex_generator(config.vex_format) vex_doc = generator.generate(state) + if vex_doc: + tracking = vex_doc.get("document", {}).get("tracking") + if tracking and tracking.get("id"): + tracking["id"] = f"{tracking['id']}-{uuid.uuid4()}" state.vex = vex_doc except ValueError as e: logger.error("VEX generator initialization failed: %s", e) diff --git a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py index 605c37192..c5fe5f300 100644 --- a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py +++ b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py @@ -26,7 +26,7 @@ from vuln_analysis.data_models.state import AgentMorpheusEngineState from ..vex_generator_base import VexGenerator -from ..vex_utils import get_vex_validator, build_patch_recommendation +from ..vex_utils import build_oci_image_purl, get_vex_validator, build_patch_recommendation from csaf.generator import CSAFGenerator from exploit_iq_commons.logging.loggers_factory import LoggingFactory @@ -71,6 +71,22 @@ # Justification labels JUSTIFICATION_LABEL_VULNERABLE = "vulnerable" +# ExploitIQ justification labels mapped to CSAF 2.0 VEX flag labels +EXPLOITIQ_TO_CSAF_JUSTIFICATION_MAP: dict[str, str] = { + "false_positive": "component_not_present", + "code_not_present": "vulnerable_code_not_present", + "code_not_reachable": "vulnerable_code_not_in_execute_path", + "requires_configuration": "vulnerable_code_cannot_be_controlled_by_adversary", + "requires_dependency": "component_not_present", + "requires_environment": "vulnerable_code_cannot_be_controlled_by_adversary", + "protected_by_compiler": "inline_mitigations_already_exist", + "protected_at_runtime": "inline_mitigations_already_exist", + "protected_at_perimeter": "vulnerable_code_cannot_be_controlled_by_adversary", + "protected_by_mitigating_control": "inline_mitigations_already_exist", + "uncertain": "component_not_present", +} +DEFAULT_CSAF_JUSTIFICATION = "component_not_present" + # Vulnerability statuses STATUS_KNOWN_AFFECTED = "known_affected" STATUS_KNOWN_NOT_AFFECTED = "known_not_affected" @@ -90,6 +106,12 @@ CSAF_SCHEMA_PATH = Path(__file__).resolve().parents[3] / "configs" / "vex" / "csaf" / "v2.0" / "csaf_json_schema.json" +def _map_justification_to_csaf_label(exploitiq_label: str | None) -> str: + if not exploitiq_label: + return DEFAULT_CSAF_JUSTIFICATION + return EXPLOITIQ_TO_CSAF_JUSTIFICATION_MAP.get(exploitiq_label, DEFAULT_CSAF_JUSTIFICATION) + + def _enrich_vulnerabilities_with_notes( csaf_json: Dict[str, Any], intel_map: Dict[str, CveIntel], @@ -162,6 +184,25 @@ def _enrich_vulnerabilities_with_notes( v["notes"] = notes +def _enrich_product_tree_with_purl(csaf_json: Dict[str, Any], purl: str | None) -> None: + """Add product_identification_helper.purl to each product in the product tree.""" + if not purl: + return + + def visit(obj: Any) -> None: + if isinstance(obj, dict): + if "product_id" in obj and "name" in obj: + helper = obj.setdefault("product_identification_helper", {}) + helper["purl"] = purl + for value in obj.values(): + visit(value) + elif isinstance(obj, list): + for item in obj: + visit(item) + + visit(csaf_json.get("product_tree", {})) + + class CsafVexGenerator(VexGenerator): """ CSAF VEX generator. Builds a CSAF JSON document and validates it with the csaf-tool. @@ -202,8 +243,10 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: ci = intel_map.get(vuln_id) impact = ci.rhsa.threat_severity if ci and ci.rhsa and ci.rhsa.threat_severity else DEFAULT_IMPACT - is_vulnerable = justification.get("justification_label") == JUSTIFICATION_LABEL_VULNERABLE - + justification_label = justification.get("justification_label") + is_vulnerable = justification_label == JUSTIFICATION_LABEL_VULNERABLE + csaf_justification = _map_justification_to_csaf_label(justification_label) + if is_vulnerable: patch_recommendation = build_patch_recommendation(ci, sbom_names) comment = ( @@ -222,7 +265,7 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: action=comment ) - else: + else: csaf_gen.add_vulnerability( product_name=product_name, release=product_tag, @@ -230,6 +273,7 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: status=STATUS_KNOWN_NOT_AFFECTED, description="", comment=impact, + justification=csaf_justification, ) csaf_gen.generate_csaf() @@ -243,9 +287,13 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: csaf_json = json.load(f) # Enrich the CSAF in memory - _enrich_vulnerabilities_with_notes( - csaf_json, intel_map, state.final_summaries, state.justifications - ) + image = message.input.image + if image.analysis_type == "image": + product_purl = build_oci_image_purl(image.name, image.tag, image.digest) + _enrich_product_tree_with_purl(csaf_json, product_purl) + _enrich_vulnerabilities_with_notes( + csaf_json, intel_map, state.final_summaries, state.justifications + ) # Validate the CSAF document against the JSON schema errors = list(get_vex_validator(CSAF_SCHEMA_PATH).iter_errors(csaf_json)) diff --git a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py index 5bca102d3..4418a7663 100644 --- a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py +++ b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py @@ -36,6 +36,7 @@ from vuln_analysis.data_models.state import AgentMorpheusEngineState from vuln_analysis.utils.vex.implementations.csaf_generator import CsafVexGenerator from vuln_analysis.utils.vex.vex_generator_loader import load_vex_generator +from vuln_analysis.utils.vex.vex_utils import build_oci_image_purl _DEFAULT_SOURCE_INFO = [ @@ -161,6 +162,42 @@ def test_product_tree_contains_product(self, mock_state): product_tree = result["product_tree"] assert _DEFAULT_PRODUCT_NAME in product_tree.get("branches")[0].get("branches")[0].get("name") assert _DEFAULT_PRODUCT_TAG in product_tree.get("branches")[0].get("branches")[0].get("branches")[0].get("name") + + def test_product_tree_includes_oci_purl(self, mock_state): + """Test that product tree includes product_identification_helper with OCI purl.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + product = ( + result["product_tree"] + .get("branches")[0] + .get("branches")[0] + .get("branches")[0] + .get("product") + ) + helper = product.get("product_identification_helper", {}) + assert helper.get("purl") == build_oci_image_purl(_DEFAULT_PRODUCT_NAME, _DEFAULT_PRODUCT_TAG) + + def test_product_tree_purl_prefers_digest_over_tag(self): + """Test that explicit digest is used in purl instead of tag.""" + oci_digest = "sha256:abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890" + state = create_mock_state(product_tag="v1.0.0") + state.original_input.input.image.digest = oci_digest + + generator = CsafVexGenerator() + result = generator.generate(state) + + product = ( + result["product_tree"] + .get("branches")[0] + .get("branches")[0] + .get("branches")[0] + .get("product") + ) + helper = product.get("product_identification_helper", {}) + assert helper.get("purl") == build_oci_image_purl( + _DEFAULT_PRODUCT_NAME, "v1.0.0", oci_digest + ) def test_vulnerable_cve_has_known_affected_status(self, mock_state): """Test that vulnerable CVEs get 'known_affected' status.""" @@ -184,6 +221,26 @@ def test_not_vulnerable_cve_has_known_not_affected_status(self): product_status = vuln.get("product_status", {}) assert "known_not_affected" in product_status + def test_code_not_reachable_maps_to_csaf_execute_path_flag(self): + """Test that code_not_reachable maps to the CSAF execute-path flag.""" + state = create_mock_state( + justification={ + "justification": "Vulnerable function exists but is not called.", + "justification_label": "code_not_reachable", + }, + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + vuln = result["vulnerabilities"][0] + assert vuln["flags"][0]["label"] == "vulnerable_code_not_in_execute_path" + label_notes = [ + n for n in vuln.get("notes", []) + if n.get("title") == "ExploitIQ Analysis Justification Label" + ] + assert label_notes[0]["text"] == "code_not_reachable" + def test_vulnerable_cve_includes_remediation(self): """Test that vulnerable CVEs include remediation information when patch is available.""" ghsa = CveIntelGhsa( diff --git a/src/vuln_analysis/utils/vex/vex_utils.py b/src/vuln_analysis/utils/vex/vex_utils.py index e27bf227d..f41c92048 100644 --- a/src/vuln_analysis/utils/vex/vex_utils.py +++ b/src/vuln_analysis/utils/vex/vex_utils.py @@ -20,9 +20,11 @@ from pathlib import Path from jsonschema import Draft202012Validator +from packageurl import PackageURL from exploit_iq_commons.data_models.cve_intel import CveIntel from exploit_iq_commons.logging.loggers_factory import LoggingFactory +from urllib.parse import urlparse logger = LoggingFactory.get_agent_logger(__name__) @@ -58,6 +60,56 @@ def get_patched_package(vuln: dict) -> tuple[str | None, str | None]: return pkg.get("name"), vuln.get("first_patched_version") +def build_oci_image_purl( + image_name: str, + tag: str | None = None, + digest: str | None = None, +) -> str | None: + """ + Build an OCI package URL (purl) for a container image. + + Prefers an explicit digest, then falls back to the image tag. + """ + image_path = image_name + parsed = urlparse(f"//{image_path}") + registry = parsed.netloc + # qualifiers include registry and full name which all already exist in image_path + qualifiers = {"repository_url": image_path} if image_path else {} + path_parts = [part for part in parsed.path.strip("/").split("/") if part] + if path_parts: + if len(path_parts) > 1: + name = path_parts[-1] + namespace = "/".join(path_parts[:-1]) + else: + name = path_parts[0] + namespace = None + elif parsed.netloc: + name = parsed.netloc + namespace = None + else: + name = image_path + namespace = None + + version = digest or tag + # oci purl specification required to emit namespace, therefor it is set to None + purl = PackageURL( + type="oci", + namespace=None, + name=name, + version=version, + qualifiers=qualifiers if qualifiers else None, + ) + logger.debug( + "Building OCI image purl components: registry=%s, qualifiers=%s, name=%s, version=%s", + registry, + qualifiers, + name, + version, + ) + logger.debug("Resulting OCI image purl: %s", purl.to_string()) + return purl.to_string() + + def build_patch_recommendation(ci: CveIntel, sbom_package_names: set[str] | None) -> str: """ Build a patch recommendation string from GHSA data. @@ -93,4 +145,3 @@ def build_patch_recommendation(ci: CveIntel, sbom_package_names: set[str] | None if not name_to_version: return "" return ", ".join(f"{name}:{patch}" for name, patch in name_to_version.items()) -