From b859bdef2fdc375ef691488355f73e1a5976a25b Mon Sep 17 00:00:00 2001 From: Gal Netanel Date: Sun, 14 Jun 2026 11:01:52 +0300 Subject: [PATCH 1/7] APPENG-5327: generate VEX report even when no vulnerabilities are found --- src/vuln_analysis/functions/cve_generate_vex.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/vuln_analysis/functions/cve_generate_vex.py b/src/vuln_analysis/functions/cve_generate_vex.py index 62a9d0598..1cd2bc789 100644 --- a/src/vuln_analysis/functions/cve_generate_vex.py +++ b/src/vuln_analysis/functions/cve_generate_vex.py @@ -43,8 +43,7 @@ async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState: return state if not any(justification.get("justification_label") == "vulnerable" for justification in state.justifications.values()): - logger.info("No vulnerable CVE(s) found. Skipping VEX generation.") - return state + logger.info("No vulnerable CVE(s) found. Generating VEX with known_not_affected status.") try: generator = load_vex_generator(config.vex_format) From 12c2058b9ee4ecc2e9eb3dc311852d1814172739 Mon Sep 17 00:00:00 2001 From: Gal Netanel Date: Tue, 16 Jun 2026 10:28:20 +0300 Subject: [PATCH 2/7] Update flags labels with thh correct value --- .../vex/implementations/csaf_generator.py | 31 +++++++++++++++++-- .../tests/test_csaf_generator_integration.py | 20 ++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py index 605c37192..1d2a8bf47 100644 --- a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py +++ b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py @@ -71,6 +71,22 @@ # Justification labels JUSTIFICATION_LABEL_VULNERABLE = "vulnerable" +# ExploitIQ justification labels mapped to CSAF 2.0 VEX flag labels +EXPLOITIQ_TO_CSAF_JUSTIFICATION_MAP: dict[str, str] = { + "false_positive": "component_not_present", + "code_not_present": "vulnerable_code_not_present", + "code_not_reachable": "vulnerable_code_not_in_execute_path", + "requires_configuration": "vulnerable_code_cannot_be_controlled_by_adversary", + "requires_dependency": "component_not_present", + "requires_environment": "vulnerable_code_cannot_be_controlled_by_adversary", + "protected_by_compiler": "inline_mitigations_already_exist", + "protected_at_runtime": "inline_mitigations_already_exist", + "protected_at_perimeter": "vulnerable_code_cannot_be_controlled_by_adversary", + "protected_by_mitigating_control": "inline_mitigations_already_exist", + "uncertain": "component_not_present", +} +DEFAULT_CSAF_JUSTIFICATION = "component_not_present" + # Vulnerability statuses STATUS_KNOWN_AFFECTED = "known_affected" STATUS_KNOWN_NOT_AFFECTED = "known_not_affected" @@ -90,6 +106,12 @@ CSAF_SCHEMA_PATH = Path(__file__).resolve().parents[3] / "configs" / "vex" / "csaf" / "v2.0" / "csaf_json_schema.json" +def _map_justification_to_csaf_label(exploitiq_label: str | None) -> str: + if not exploitiq_label: + return DEFAULT_CSAF_JUSTIFICATION + return EXPLOITIQ_TO_CSAF_JUSTIFICATION_MAP.get(exploitiq_label, DEFAULT_CSAF_JUSTIFICATION) + + def _enrich_vulnerabilities_with_notes( csaf_json: Dict[str, Any], intel_map: Dict[str, CveIntel], @@ -202,8 +224,10 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: ci = intel_map.get(vuln_id) impact = ci.rhsa.threat_severity if ci and ci.rhsa and ci.rhsa.threat_severity else DEFAULT_IMPACT - is_vulnerable = justification.get("justification_label") == JUSTIFICATION_LABEL_VULNERABLE - + justification_label = justification.get("justification_label") + is_vulnerable = justification_label == JUSTIFICATION_LABEL_VULNERABLE + csaf_justification = _map_justification_to_csaf_label(justification_label) + if is_vulnerable: patch_recommendation = build_patch_recommendation(ci, sbom_names) comment = ( @@ -222,7 +246,7 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: action=comment ) - else: + else: csaf_gen.add_vulnerability( product_name=product_name, release=product_tag, @@ -230,6 +254,7 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: status=STATUS_KNOWN_NOT_AFFECTED, description="", comment=impact, + justification=csaf_justification, ) csaf_gen.generate_csaf() diff --git a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py index 5bca102d3..331856203 100644 --- a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py +++ b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py @@ -184,6 +184,26 @@ def test_not_vulnerable_cve_has_known_not_affected_status(self): product_status = vuln.get("product_status", {}) assert "known_not_affected" in product_status + def test_code_not_reachable_maps_to_csaf_execute_path_flag(self): + """Test that code_not_reachable maps to the CSAF execute-path flag.""" + state = create_mock_state( + justification={ + "justification": "Vulnerable function exists but is not called.", + "justification_label": "code_not_reachable", + }, + ) + + generator = CsafVexGenerator() + result = generator.generate(state) + + vuln = result["vulnerabilities"][0] + assert vuln["flags"][0]["label"] == "vulnerable_code_not_in_execute_path" + label_notes = [ + n for n in vuln.get("notes", []) + if n.get("title") == "ExploitIQ Analysis Justification Label" + ] + assert label_notes[0]["text"] == "code_not_reachable" + def test_vulnerable_cve_includes_remediation(self): """Test that vulnerable CVEs include remediation information when patch is available.""" ghsa = CveIntelGhsa( From 71f6302dd0dc50854bc4a7f5a59c523bbb8ee78d Mon Sep 17 00:00:00 2001 From: Gal Netanel Date: Tue, 16 Jun 2026 12:14:03 +0300 Subject: [PATCH 3/7] Add uuid to tracking-id to add uniquness to the id --- src/vuln_analysis/functions/cve_generate_vex.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/vuln_analysis/functions/cve_generate_vex.py b/src/vuln_analysis/functions/cve_generate_vex.py index 1cd2bc789..a7d2ee4b9 100644 --- a/src/vuln_analysis/functions/cve_generate_vex.py +++ b/src/vuln_analysis/functions/cve_generate_vex.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json +import uuid from aiq.builder.builder import Builder from aiq.builder.function_info import FunctionInfo @@ -48,6 +48,10 @@ async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState: try: generator = load_vex_generator(config.vex_format) vex_doc = generator.generate(state) + if vex_doc: + tracking = vex_doc.get("document", {}).get("tracking") + if tracking and tracking.get("id"): + tracking["id"] = f"{tracking['id']}-{uuid.uuid4()}" state.vex = vex_doc except ValueError as e: logger.error("VEX generator initialization failed: %s", e) From 8254d7da0366a9660373b615cf92163d940fdf3c Mon Sep 17 00:00:00 2001 From: Gal Netanel Date: Tue, 16 Jun 2026 15:27:24 +0300 Subject: [PATCH 4/7] Add purl to vex report --- pyproject.toml | 1 + .../vex/implementations/csaf_generator.py | 24 ++++++++++++- .../tests/test_csaf_generator_integration.py | 34 +++++++++++++++++++ src/vuln_analysis/utils/vex/vex_utils.py | 20 +++++++++++ 4 files changed, 78 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index c3041e8be..2f5a8019e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,6 +26,7 @@ dependencies = [ "nemollm", "openinference-instrumentation-langchain~=0.1.31", "ordered_set", + "packageurl-python", "pydpkg==1.9.4", "rank_bm25==0.2.2", "tantivy==0.22.2", diff --git a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py index 1d2a8bf47..93c069795 100644 --- a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py +++ b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py @@ -26,7 +26,7 @@ from vuln_analysis.data_models.state import AgentMorpheusEngineState from ..vex_generator_base import VexGenerator -from ..vex_utils import get_vex_validator, build_patch_recommendation +from ..vex_utils import build_oci_image_purl, get_vex_validator, build_patch_recommendation from csaf.generator import CSAFGenerator from exploit_iq_commons.logging.loggers_factory import LoggingFactory @@ -184,6 +184,25 @@ def _enrich_vulnerabilities_with_notes( v["notes"] = notes +def _enrich_product_tree_with_purl(csaf_json: Dict[str, Any], purl: str | None) -> None: + """Add product_identification_helper.purl to each product in the product tree.""" + if not purl: + return + + def visit(obj: Any) -> None: + if isinstance(obj, dict): + if "product_id" in obj and "name" in obj: + helper = obj.setdefault("product_identification_helper", {}) + helper["purl"] = purl + for value in obj.values(): + visit(value) + elif isinstance(obj, list): + for item in obj: + visit(item) + + visit(csaf_json.get("product_tree", {})) + + class CsafVexGenerator(VexGenerator): """ CSAF VEX generator. Builds a CSAF JSON document and validates it with the csaf-tool. @@ -268,6 +287,9 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: csaf_json = json.load(f) # Enrich the CSAF in memory + image = message.input.image + product_purl = build_oci_image_purl(image.name, image.tag, image.digest) + _enrich_product_tree_with_purl(csaf_json, product_purl) _enrich_vulnerabilities_with_notes( csaf_json, intel_map, state.final_summaries, state.justifications ) diff --git a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py index 331856203..b16dba2e7 100644 --- a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py +++ b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py @@ -161,6 +161,40 @@ def test_product_tree_contains_product(self, mock_state): product_tree = result["product_tree"] assert _DEFAULT_PRODUCT_NAME in product_tree.get("branches")[0].get("branches")[0].get("name") assert _DEFAULT_PRODUCT_TAG in product_tree.get("branches")[0].get("branches")[0].get("branches")[0].get("name") + + def test_product_tree_includes_oci_purl(self, mock_state): + """Test that product tree includes product_identification_helper with OCI purl.""" + generator = CsafVexGenerator() + result = generator.generate(mock_state) + + product = ( + result["product_tree"] + .get("branches")[0] + .get("branches")[0] + .get("branches")[0] + .get("product") + ) + helper = product.get("product_identification_helper", {}) + assert helper.get("purl") == f"pkg:oci/{_DEFAULT_PRODUCT_NAME}@{_DEFAULT_PRODUCT_TAG}" + + def test_product_tree_purl_prefers_digest_over_tag(self): + """Test that explicit digest is used in purl instead of tag.""" + oci_digest = "sha256:abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890" + state = create_mock_state(product_tag="v1.0.0") + state.original_input.input.image.digest = oci_digest + + generator = CsafVexGenerator() + result = generator.generate(state) + + product = ( + result["product_tree"] + .get("branches")[0] + .get("branches")[0] + .get("branches")[0] + .get("product") + ) + helper = product.get("product_identification_helper", {}) + assert helper.get("purl") == f"pkg:oci/{_DEFAULT_PRODUCT_NAME}@{oci_digest}" def test_vulnerable_cve_has_known_affected_status(self, mock_state): """Test that vulnerable CVEs get 'known_affected' status.""" diff --git a/src/vuln_analysis/utils/vex/vex_utils.py b/src/vuln_analysis/utils/vex/vex_utils.py index e27bf227d..627de6473 100644 --- a/src/vuln_analysis/utils/vex/vex_utils.py +++ b/src/vuln_analysis/utils/vex/vex_utils.py @@ -20,6 +20,7 @@ from pathlib import Path from jsonschema import Draft202012Validator +from packageurl import PackageURL from exploit_iq_commons.data_models.cve_intel import CveIntel from exploit_iq_commons.logging.loggers_factory import LoggingFactory @@ -58,6 +59,25 @@ def get_patched_package(vuln: dict) -> tuple[str | None, str | None]: return pkg.get("name"), vuln.get("first_patched_version") +def build_oci_image_purl( + image_name: str | None, + tag: str | None = None, + digest: str | None = None, +) -> str | None: + """ + Build an OCI package URL (purl) for a container image. + + Prefers an explicit digest, then falls back to the image tag. + """ + if not image_name: + return None + + version_ref = digest or tag + if version_ref: + return PackageURL(type="oci", name=image_name, version=version_ref).to_string() + return PackageURL(type="oci", name=image_name).to_string() + + def build_patch_recommendation(ci: CveIntel, sbom_package_names: set[str] | None) -> str: """ Build a patch recommendation string from GHSA data. From 32cca992de880d5d8799f889874a18ce86a64bb7 Mon Sep 17 00:00:00 2001 From: Gal Netanel Date: Wed, 17 Jun 2026 14:12:09 +0300 Subject: [PATCH 5/7] Bugfix in purl for oci-image --- .../vex/implementations/csaf_generator.py | 11 ++--- .../tests/test_csaf_generator_integration.py | 7 +++- src/vuln_analysis/utils/vex/vex_utils.py | 41 +++++++++++++++---- 3 files changed, 44 insertions(+), 15 deletions(-) diff --git a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py index 93c069795..c5fe5f300 100644 --- a/src/vuln_analysis/utils/vex/implementations/csaf_generator.py +++ b/src/vuln_analysis/utils/vex/implementations/csaf_generator.py @@ -288,11 +288,12 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]: # Enrich the CSAF in memory image = message.input.image - product_purl = build_oci_image_purl(image.name, image.tag, image.digest) - _enrich_product_tree_with_purl(csaf_json, product_purl) - _enrich_vulnerabilities_with_notes( - csaf_json, intel_map, state.final_summaries, state.justifications - ) + if image.analysis_type == "image": + product_purl = build_oci_image_purl(image.name, image.tag, image.digest) + _enrich_product_tree_with_purl(csaf_json, product_purl) + _enrich_vulnerabilities_with_notes( + csaf_json, intel_map, state.final_summaries, state.justifications + ) # Validate the CSAF document against the JSON schema errors = list(get_vex_validator(CSAF_SCHEMA_PATH).iter_errors(csaf_json)) diff --git a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py index b16dba2e7..4418a7663 100644 --- a/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py +++ b/src/vuln_analysis/utils/vex/tests/test_csaf_generator_integration.py @@ -36,6 +36,7 @@ from vuln_analysis.data_models.state import AgentMorpheusEngineState from vuln_analysis.utils.vex.implementations.csaf_generator import CsafVexGenerator from vuln_analysis.utils.vex.vex_generator_loader import load_vex_generator +from vuln_analysis.utils.vex.vex_utils import build_oci_image_purl _DEFAULT_SOURCE_INFO = [ @@ -175,7 +176,7 @@ def test_product_tree_includes_oci_purl(self, mock_state): .get("product") ) helper = product.get("product_identification_helper", {}) - assert helper.get("purl") == f"pkg:oci/{_DEFAULT_PRODUCT_NAME}@{_DEFAULT_PRODUCT_TAG}" + assert helper.get("purl") == build_oci_image_purl(_DEFAULT_PRODUCT_NAME, _DEFAULT_PRODUCT_TAG) def test_product_tree_purl_prefers_digest_over_tag(self): """Test that explicit digest is used in purl instead of tag.""" @@ -194,7 +195,9 @@ def test_product_tree_purl_prefers_digest_over_tag(self): .get("product") ) helper = product.get("product_identification_helper", {}) - assert helper.get("purl") == f"pkg:oci/{_DEFAULT_PRODUCT_NAME}@{oci_digest}" + assert helper.get("purl") == build_oci_image_purl( + _DEFAULT_PRODUCT_NAME, "v1.0.0", oci_digest + ) def test_vulnerable_cve_has_known_affected_status(self, mock_state): """Test that vulnerable CVEs get 'known_affected' status.""" diff --git a/src/vuln_analysis/utils/vex/vex_utils.py b/src/vuln_analysis/utils/vex/vex_utils.py index 627de6473..511fed6f6 100644 --- a/src/vuln_analysis/utils/vex/vex_utils.py +++ b/src/vuln_analysis/utils/vex/vex_utils.py @@ -24,6 +24,7 @@ from exploit_iq_commons.data_models.cve_intel import CveIntel from exploit_iq_commons.logging.loggers_factory import LoggingFactory +from urllib.parse import urlparse logger = LoggingFactory.get_agent_logger(__name__) @@ -60,7 +61,7 @@ def get_patched_package(vuln: dict) -> tuple[str | None, str | None]: def build_oci_image_purl( - image_name: str | None, + image_name: str, tag: str | None = None, digest: str | None = None, ) -> str | None: @@ -69,13 +70,37 @@ def build_oci_image_purl( Prefers an explicit digest, then falls back to the image tag. """ - if not image_name: - return None - - version_ref = digest or tag - if version_ref: - return PackageURL(type="oci", name=image_name, version=version_ref).to_string() - return PackageURL(type="oci", name=image_name).to_string() + image_path = image_name + + parsed = urlparse(f"//{image_path}") + registry = parsed.netloc + qualifiers = {"repository_url": registry} if registry else {} + + path_parts = [part for part in parsed.path.strip("/").split("/") if part] + if path_parts: + if len(path_parts) > 1: + name = path_parts[-1] + namespace = "/".join(path_parts[:-1]) + else: + name = path_parts[0] + namespace = None + elif parsed.netloc: + name = parsed.netloc + namespace = None + else: + name = image_path + namespace = None + + version = digest or tag + + purl = PackageURL( + type="oci", + namespace=namespace, + name=name, + version=version, + qualifiers=qualifiers if qualifiers else None, + ) + return purl.to_string() def build_patch_recommendation(ci: CveIntel, sbom_package_names: set[str] | None) -> str: From f0b5e636390f1bfba79ceeb52e8c9bf9bbae12fa Mon Sep 17 00:00:00 2001 From: Gal Netanel Date: Sun, 21 Jun 2026 11:24:55 +0300 Subject: [PATCH 6/7] Remove namespace from oci purl --- src/vuln_analysis/utils/vex/vex_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/vuln_analysis/utils/vex/vex_utils.py b/src/vuln_analysis/utils/vex/vex_utils.py index 511fed6f6..05914f62d 100644 --- a/src/vuln_analysis/utils/vex/vex_utils.py +++ b/src/vuln_analysis/utils/vex/vex_utils.py @@ -92,10 +92,10 @@ def build_oci_image_purl( namespace = None version = digest or tag - + # oci purl specification required to emit namespace, therefor it is set to None purl = PackageURL( type="oci", - namespace=namespace, + namespace=None, name=name, version=version, qualifiers=qualifiers if qualifiers else None, From 7d3abb977d23fb67301572969521ebedff29af52 Mon Sep 17 00:00:00 2001 From: Gal Netanel Date: Tue, 23 Jun 2026 14:26:47 +0300 Subject: [PATCH 7/7] Update full image name in repository_url --- src/vuln_analysis/utils/vex/vex_utils.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/vuln_analysis/utils/vex/vex_utils.py b/src/vuln_analysis/utils/vex/vex_utils.py index 05914f62d..f41c92048 100644 --- a/src/vuln_analysis/utils/vex/vex_utils.py +++ b/src/vuln_analysis/utils/vex/vex_utils.py @@ -71,11 +71,10 @@ def build_oci_image_purl( Prefers an explicit digest, then falls back to the image tag. """ image_path = image_name - parsed = urlparse(f"//{image_path}") registry = parsed.netloc - qualifiers = {"repository_url": registry} if registry else {} - + # qualifiers include registry and full name which all already exist in image_path + qualifiers = {"repository_url": image_path} if image_path else {} path_parts = [part for part in parsed.path.strip("/").split("/") if part] if path_parts: if len(path_parts) > 1: @@ -100,6 +99,14 @@ def build_oci_image_purl( version=version, qualifiers=qualifiers if qualifiers else None, ) + logger.debug( + "Building OCI image purl components: registry=%s, qualifiers=%s, name=%s, version=%s", + registry, + qualifiers, + name, + version, + ) + logger.debug("Resulting OCI image purl: %s", purl.to_string()) return purl.to_string() @@ -138,4 +145,3 @@ def build_patch_recommendation(ci: CveIntel, sbom_package_names: set[str] | None if not name_to_version: return "" return ", ".join(f"{name}:{patch}" for name, patch in name_to_version.items()) -