Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 232 additions & 3 deletions iamscope/reasoner/admin_reachability.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@

import logging
from collections import deque
from typing import Any

from iamscope.constants import (
CONSTRAINT_TYPE_PERMISSION_BOUNDARY,
CONSTRAINT_TYPE_SCP,
CONSTRAINT_TYPE_TRUST_CONDITION,
NODE_TYPE_ACCOUNT_ROOT,
NODE_TYPE_IAM_ROLE,
NODE_TYPE_IAM_USER,
NODE_TYPE_WILDCARD_PRINCIPAL,
TRUST_SCOPE_ACCOUNT_ROOT,
TRUST_SCOPE_ANY_AWS_PRINCIPAL,
)
from iamscope.models import Edge, Node
from iamscope.reasoner.evidence import EvidenceBundle, TraceEntry
Expand Down Expand Up @@ -192,13 +198,21 @@ def _compute_reachability(
if trust_edge is None:
continue

# Track ambiguity: if either edge is a hyperedge witness,
# the walk has touched ambiguous ground.
# Track ambiguity: permission edges use generic wildcard/
# hyperedge checks. Trust edges use admin_reachability's
# source-aware clean-witness rule so conditioned account-root
# trust can be clean only when aws:PrincipalArn precisely
# narrows the root principal to the current source role.
hop_is_clean = True
if self._is_ambiguous_edge(perm_edge):
any_hyperedge_traversed = True
hop_is_clean = False
if self._is_ambiguous_edge(trust_edge):
current_node = self._find_node(facts, current_arn)
if not self._is_clean_assumerole_trust_witness(
facts,
trust_edge,
current_node,
):
any_hyperedge_traversed = True
hop_is_clean = False

Expand Down Expand Up @@ -281,6 +295,85 @@ def _is_ambiguous_edge(self, edge: Edge) -> bool:

return _is_unknown_witness(edge)

def _is_clean_assumerole_trust_witness(
self,
facts: FactGraph,
edge: Edge,
source: Node | None,
) -> bool:
"""Return True when a trust edge is clean for this source principal.

Generic condition handling remains conservative. The only
conditioned account-root calibration supported here is the narrow
real-pilot shape where aws:PrincipalArn explicitly lists the source
role ARN or its exact assumed-role form.
"""
if edge.edge_type != "sts:AssumeRole_trust":
return False
if source is None:
return False

features = edge.features or {}
if features.get("effect") not in (None, "", "Allow"):
return False
if features.get("parse_status") not in (None, "", "complete"):
return False
if _is_wildcard_trust_edge(edge):
return False

has_conditions = bool(features.get("has_conditions") or features.get("raw_conditions"))
if not has_conditions:
if _is_account_root_trust_edge(edge):
return False
return edge.src.provider_id == source.provider_id

if not _is_account_root_trust_edge(edge):
return False
if source.node_type != NODE_TYPE_IAM_ROLE:
return False
if not _is_iam_role_arn(source.provider_id):
return False
if _is_cross_account_unknown_org_membership(edge, source):
return False
if not _account_root_matches_source(edge.src.provider_id, source.provider_id):
return False
if not _principal_arn_conditions_match_source_role(
features.get("raw_conditions") or {},
source.provider_id,
):
return False
return self._trust_condition_bindings_are_resolved_for_clean_rule(facts, edge, source.provider_id)

def _trust_condition_bindings_are_resolved_for_clean_rule(
self,
facts: FactGraph,
edge: Edge,
source_role_arn: str,
) -> bool:
"""Reject blocking or unsupported TRUST_CONDITION bindings.

The generic trust-condition binder marks conditioned trust as
needs-review because most conditions require runtime context. For
this reasoner-local calibration, a non-blocking TRUST_CONDITION
binding is considered resolved only when its raw conditions match
the same narrow safe rule already checked on the edge.
"""
for binding in facts.bindings_for_edge(edge.edge_id):
constraint = facts.constraint_by_id(binding.constraint_id)
if constraint is None:
continue
if constraint.constraint_type != CONSTRAINT_TYPE_TRUST_CONDITION:
continue
if binding.likely_blocking:
return False
constraint_conditions = constraint.properties.get("raw_conditions") or {}
if not _principal_arn_conditions_match_source_role(
constraint_conditions,
source_role_arn,
):
return False
return True

def _is_clean_admin_witness(self, edge: Edge, target_role: Node) -> bool:
"""Return True when an admin witness is clean enough for VALIDATED.

Expand Down Expand Up @@ -834,3 +927,139 @@ def _policy_arn_from_edge(edge: Edge) -> str:
if isinstance(control_policy_arn, str) and control_policy_arn:
return control_policy_arn
return ""


def _is_wildcard_trust_edge(edge: Edge) -> bool:
features = edge.features or {}
return (
edge.src.provider_id == "*"
or edge.src.node_type == NODE_TYPE_WILDCARD_PRINCIPAL
or features.get("is_wildcard_principal") is True
or features.get("trust_scope") == TRUST_SCOPE_ANY_AWS_PRINCIPAL
)


def _is_account_root_trust_edge(edge: Edge) -> bool:
features = edge.features or {}
return (
edge.src.node_type == NODE_TYPE_ACCOUNT_ROOT
or edge.src.provider_id.endswith(":root")
or features.get("trust_scope") == TRUST_SCOPE_ACCOUNT_ROOT
)


def _is_iam_role_arn(provider_id: str) -> bool:
return provider_id.startswith("arn:aws:iam::") and ":role/" in provider_id


def _account_root_matches_source(root_provider_id: str, source_provider_id: str) -> bool:
root_account = _account_id_from_arn(root_provider_id)
source_account = _account_id_from_arn(source_provider_id)
return bool(root_account and source_account and root_account == source_account)


def _account_id_from_arn(provider_id: str) -> str:
parts = provider_id.split(":")
if len(parts) >= 5:
return parts[4]
return ""


def _role_name_from_iam_role_arn(provider_id: str) -> str:
marker = ":role/"
if marker not in provider_id:
return ""
role_path_and_name = provider_id.split(marker, 1)[1]
return role_path_and_name.rsplit("/", 1)[-1]


def _is_cross_account_unknown_org_membership(edge: Edge, source: Node) -> bool:
if not bool((edge.features or {}).get("cross_account")):
return False
return source.properties.get("org_membership_status") in (None, "unknown")


def _principal_arn_conditions_match_source_role(
raw_conditions: dict[str, Any],
source_role_arn: str,
) -> bool:
if not raw_conditions:
return False

principal_patterns: list[tuple[str, str]] = []
for operator, body in raw_conditions.items():
if operator not in {"ArnLike", "ArnEquals", "StringEquals"}:
return False
if not isinstance(body, dict):
return False
for key, value in body.items():
if key == "aws:PrincipalArn" and operator in {"ArnLike", "ArnEquals"}:
principal_patterns.extend((operator, pattern) for pattern in _condition_values(value))
continue
if key == "sts:ExternalId" and operator == "StringEquals":
continue
return False

if not principal_patterns:
return False
return any(
_principal_pattern_matches_exact_source_role(operator, pattern, source_role_arn)
for operator, pattern in principal_patterns
)


def _condition_values(value: Any) -> list[str]:
if isinstance(value, str):
return [value]
if isinstance(value, list):
return [item for item in value if isinstance(item, str)]
return []


def _principal_pattern_matches_exact_source_role(
operator: str,
pattern: str,
source_role_arn: str,
) -> bool:
if not _is_supported_principal_arn_pattern(operator, pattern):
return False
if pattern == source_role_arn:
return True
if operator != "ArnLike":
return False
return pattern == _exact_assumed_role_pattern_for_source(source_role_arn)


def _is_supported_principal_arn_pattern(operator: str, pattern: str) -> bool:
if _is_broad_principal_arn_pattern(pattern):
return False
if operator == "ArnEquals":
return _is_iam_role_arn(pattern)
if operator == "ArnLike":
return _is_iam_role_arn(pattern) or _is_exact_assumed_role_pattern(pattern)
return False


def _exact_assumed_role_pattern_for_source(source_role_arn: str) -> str:
account_id = _account_id_from_arn(source_role_arn)
role_name = _role_name_from_iam_role_arn(source_role_arn)
if not account_id or not role_name:
return ""
return f"arn:aws:sts::{account_id}:assumed-role/{role_name}/*"


def _is_exact_assumed_role_pattern(pattern: str) -> bool:
if ":assumed-role/" not in pattern:
return False
suffix = pattern.split(":assumed-role/", 1)[1]
parts = suffix.split("/")
return len(parts) == 2 and bool(parts[0]) and parts[1] == "*"


def _is_broad_principal_arn_pattern(pattern: str) -> bool:
return (
pattern.endswith(":role/*")
or ":role/*" in pattern
or pattern.endswith(":assumed-role/*")
or ":assumed-role/*" in pattern
)
Loading