Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ dependencies = [
"nemollm",
"openinference-instrumentation-langchain~=0.1.31",
"ordered_set",
"packageurl-python",
"pydpkg==1.9.4",
"rank_bm25==0.2.2",
"tantivy==0.22.2",
Expand Down
9 changes: 6 additions & 3 deletions src/vuln_analysis/functions/cve_generate_vex.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import uuid

from aiq.builder.builder import Builder
from aiq.builder.function_info import FunctionInfo
Expand Down Expand Up @@ -43,12 +43,15 @@ async def _arun(state: AgentMorpheusEngineState) -> AgentMorpheusEngineState:
return state

if not any(justification.get("justification_label") == "vulnerable" for justification in state.justifications.values()):
logger.info("No vulnerable CVE(s) found. Skipping VEX generation.")
return state
logger.info("No vulnerable CVE(s) found. Generating VEX with known_not_affected status.")

try:
generator = load_vex_generator(config.vex_format)
vex_doc = generator.generate(state)
if vex_doc:
tracking = vex_doc.get("document", {}).get("tracking")
if tracking and tracking.get("id"):
tracking["id"] = f"{tracking['id']}-{uuid.uuid4()}"
state.vex = vex_doc
except ValueError as e:
logger.error("VEX generator initialization failed: %s", e)
Expand Down
62 changes: 55 additions & 7 deletions src/vuln_analysis/utils/vex/implementations/csaf_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from vuln_analysis.data_models.state import AgentMorpheusEngineState

from ..vex_generator_base import VexGenerator
from ..vex_utils import get_vex_validator, build_patch_recommendation
from ..vex_utils import build_oci_image_purl, get_vex_validator, build_patch_recommendation
from csaf.generator import CSAFGenerator

from exploit_iq_commons.logging.loggers_factory import LoggingFactory
Expand Down Expand Up @@ -71,6 +71,22 @@
# Justification labels
JUSTIFICATION_LABEL_VULNERABLE = "vulnerable"

# ExploitIQ justification labels mapped to CSAF 2.0 VEX flag labels
EXPLOITIQ_TO_CSAF_JUSTIFICATION_MAP: dict[str, str] = {
"false_positive": "component_not_present",
"code_not_present": "vulnerable_code_not_present",
"code_not_reachable": "vulnerable_code_not_in_execute_path",
"requires_configuration": "vulnerable_code_cannot_be_controlled_by_adversary",
"requires_dependency": "component_not_present",
"requires_environment": "vulnerable_code_cannot_be_controlled_by_adversary",
"protected_by_compiler": "inline_mitigations_already_exist",
"protected_at_runtime": "inline_mitigations_already_exist",
"protected_at_perimeter": "vulnerable_code_cannot_be_controlled_by_adversary",
"protected_by_mitigating_control": "inline_mitigations_already_exist",
"uncertain": "component_not_present",
}
DEFAULT_CSAF_JUSTIFICATION = "component_not_present"

# Vulnerability statuses
STATUS_KNOWN_AFFECTED = "known_affected"
STATUS_KNOWN_NOT_AFFECTED = "known_not_affected"
Expand All @@ -90,6 +106,12 @@
CSAF_SCHEMA_PATH = Path(__file__).resolve().parents[3] / "configs" / "vex" / "csaf" / "v2.0" / "csaf_json_schema.json"


def _map_justification_to_csaf_label(exploitiq_label: str | None) -> str:
if not exploitiq_label:
return DEFAULT_CSAF_JUSTIFICATION
return EXPLOITIQ_TO_CSAF_JUSTIFICATION_MAP.get(exploitiq_label, DEFAULT_CSAF_JUSTIFICATION)


def _enrich_vulnerabilities_with_notes(
csaf_json: Dict[str, Any],
intel_map: Dict[str, CveIntel],
Expand Down Expand Up @@ -162,6 +184,25 @@ def _enrich_vulnerabilities_with_notes(
v["notes"] = notes


def _enrich_product_tree_with_purl(csaf_json: Dict[str, Any], purl: str | None) -> None:
"""Add product_identification_helper.purl to each product in the product tree."""
if not purl:
return

def visit(obj: Any) -> None:
if isinstance(obj, dict):
if "product_id" in obj and "name" in obj:
helper = obj.setdefault("product_identification_helper", {})
helper["purl"] = purl
for value in obj.values():
visit(value)
elif isinstance(obj, list):
for item in obj:
visit(item)

visit(csaf_json.get("product_tree", {}))


class CsafVexGenerator(VexGenerator):
"""
CSAF VEX generator. Builds a CSAF JSON document and validates it with the csaf-tool.
Expand Down Expand Up @@ -202,8 +243,10 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]:
ci = intel_map.get(vuln_id)
impact = ci.rhsa.threat_severity if ci and ci.rhsa and ci.rhsa.threat_severity else DEFAULT_IMPACT

is_vulnerable = justification.get("justification_label") == JUSTIFICATION_LABEL_VULNERABLE

justification_label = justification.get("justification_label")
is_vulnerable = justification_label == JUSTIFICATION_LABEL_VULNERABLE
csaf_justification = _map_justification_to_csaf_label(justification_label)

if is_vulnerable:
patch_recommendation = build_patch_recommendation(ci, sbom_names)
comment = (
Expand All @@ -222,14 +265,15 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]:
action=comment
)

else:
else:
csaf_gen.add_vulnerability(
product_name=product_name,
release=product_tag,
id=vuln_id,
status=STATUS_KNOWN_NOT_AFFECTED,
description="",
comment=impact,
justification=csaf_justification,
)

csaf_gen.generate_csaf()
Expand All @@ -243,9 +287,13 @@ def generate(self, state: AgentMorpheusEngineState) -> Dict[str, Any]:
csaf_json = json.load(f)

# Enrich the CSAF in memory
_enrich_vulnerabilities_with_notes(
csaf_json, intel_map, state.final_summaries, state.justifications
)
image = message.input.image
if image.analysis_type == "image":
product_purl = build_oci_image_purl(image.name, image.tag, image.digest)
_enrich_product_tree_with_purl(csaf_json, product_purl)
_enrich_vulnerabilities_with_notes(
csaf_json, intel_map, state.final_summaries, state.justifications
)

# Validate the CSAF document against the JSON schema
errors = list(get_vex_validator(CSAF_SCHEMA_PATH).iter_errors(csaf_json))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from vuln_analysis.data_models.state import AgentMorpheusEngineState
from vuln_analysis.utils.vex.implementations.csaf_generator import CsafVexGenerator
from vuln_analysis.utils.vex.vex_generator_loader import load_vex_generator
from vuln_analysis.utils.vex.vex_utils import build_oci_image_purl


_DEFAULT_SOURCE_INFO = [
Expand Down Expand Up @@ -161,6 +162,42 @@ def test_product_tree_contains_product(self, mock_state):
product_tree = result["product_tree"]
assert _DEFAULT_PRODUCT_NAME in product_tree.get("branches")[0].get("branches")[0].get("name")
assert _DEFAULT_PRODUCT_TAG in product_tree.get("branches")[0].get("branches")[0].get("branches")[0].get("name")

def test_product_tree_includes_oci_purl(self, mock_state):
"""Test that product tree includes product_identification_helper with OCI purl."""
generator = CsafVexGenerator()
result = generator.generate(mock_state)

product = (
result["product_tree"]
.get("branches")[0]
.get("branches")[0]
.get("branches")[0]
.get("product")
)
helper = product.get("product_identification_helper", {})
assert helper.get("purl") == build_oci_image_purl(_DEFAULT_PRODUCT_NAME, _DEFAULT_PRODUCT_TAG)

def test_product_tree_purl_prefers_digest_over_tag(self):
"""Test that explicit digest is used in purl instead of tag."""
oci_digest = "sha256:abcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"
state = create_mock_state(product_tag="v1.0.0")
state.original_input.input.image.digest = oci_digest

generator = CsafVexGenerator()
result = generator.generate(state)

product = (
result["product_tree"]
.get("branches")[0]
.get("branches")[0]
.get("branches")[0]
.get("product")
)
helper = product.get("product_identification_helper", {})
assert helper.get("purl") == build_oci_image_purl(
_DEFAULT_PRODUCT_NAME, "v1.0.0", oci_digest
)

def test_vulnerable_cve_has_known_affected_status(self, mock_state):
"""Test that vulnerable CVEs get 'known_affected' status."""
Expand All @@ -184,6 +221,26 @@ def test_not_vulnerable_cve_has_known_not_affected_status(self):
product_status = vuln.get("product_status", {})
assert "known_not_affected" in product_status

def test_code_not_reachable_maps_to_csaf_execute_path_flag(self):
"""Test that code_not_reachable maps to the CSAF execute-path flag."""
state = create_mock_state(
justification={
"justification": "Vulnerable function exists but is not called.",
"justification_label": "code_not_reachable",
},
)

generator = CsafVexGenerator()
result = generator.generate(state)

vuln = result["vulnerabilities"][0]
assert vuln["flags"][0]["label"] == "vulnerable_code_not_in_execute_path"
label_notes = [
n for n in vuln.get("notes", [])
if n.get("title") == "ExploitIQ Analysis Justification Label"
]
assert label_notes[0]["text"] == "code_not_reachable"

def test_vulnerable_cve_includes_remediation(self):
"""Test that vulnerable CVEs include remediation information when patch is available."""
ghsa = CveIntelGhsa(
Expand Down
53 changes: 52 additions & 1 deletion src/vuln_analysis/utils/vex/vex_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
from pathlib import Path

from jsonschema import Draft202012Validator
from packageurl import PackageURL

from exploit_iq_commons.data_models.cve_intel import CveIntel
from exploit_iq_commons.logging.loggers_factory import LoggingFactory
from urllib.parse import urlparse

logger = LoggingFactory.get_agent_logger(__name__)

Expand Down Expand Up @@ -58,6 +60,56 @@ def get_patched_package(vuln: dict) -> tuple[str | None, str | None]:
return pkg.get("name"), vuln.get("first_patched_version")


def build_oci_image_purl(
image_name: str,
tag: str | None = None,
digest: str | None = None,
) -> str | None:
"""
Build an OCI package URL (purl) for a container image.

Prefers an explicit digest, then falls back to the image tag.
"""
image_path = image_name
parsed = urlparse(f"//{image_path}")
registry = parsed.netloc

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gnetanel As discussed, please add the full repository url ( registry/namespace/reponame )

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zvigrinberg , PR has been updated with the required changed.

# qualifiers include registry and full name which all already exist in image_path
qualifiers = {"repository_url": image_path} if image_path else {}
path_parts = [part for part in parsed.path.strip("/").split("/") if part]
if path_parts:
if len(path_parts) > 1:
name = path_parts[-1]
namespace = "/".join(path_parts[:-1])
else:
name = path_parts[0]
namespace = None
elif parsed.netloc:
name = parsed.netloc
namespace = None
else:
name = image_path
namespace = None

version = digest or tag
# oci purl specification required to emit namespace, therefor it is set to None
purl = PackageURL(
type="oci",
namespace=None,
name=name,
version=version,
qualifiers=qualifiers if qualifiers else None,
)
logger.debug(
"Building OCI image purl components: registry=%s, qualifiers=%s, name=%s, version=%s",
registry,
qualifiers,
name,
version,
)
logger.debug("Resulting OCI image purl: %s", purl.to_string())
return purl.to_string()


def build_patch_recommendation(ci: CveIntel, sbom_package_names: set[str] | None) -> str:
"""
Build a patch recommendation string from GHSA data.
Expand Down Expand Up @@ -93,4 +145,3 @@ def build_patch_recommendation(ci: CveIntel, sbom_package_names: set[str] | None
if not name_to_version:
return ""
return ", ".join(f"{name}:{patch}" for name, patch in name_to_version.items())

Loading