diff --git a/iamscope/reasoner/admin_reachability.py b/iamscope/reasoner/admin_reachability.py index e39836a..3634fb6 100644 --- a/iamscope/reasoner/admin_reachability.py +++ b/iamscope/reasoner/admin_reachability.py @@ -43,12 +43,18 @@ import logging from collections import deque +from typing import Any from iamscope.constants import ( CONSTRAINT_TYPE_PERMISSION_BOUNDARY, CONSTRAINT_TYPE_SCP, + CONSTRAINT_TYPE_TRUST_CONDITION, + NODE_TYPE_ACCOUNT_ROOT, NODE_TYPE_IAM_ROLE, NODE_TYPE_IAM_USER, + NODE_TYPE_WILDCARD_PRINCIPAL, + TRUST_SCOPE_ACCOUNT_ROOT, + TRUST_SCOPE_ANY_AWS_PRINCIPAL, ) from iamscope.models import Edge, Node from iamscope.reasoner.evidence import EvidenceBundle, TraceEntry @@ -192,13 +198,21 @@ def _compute_reachability( if trust_edge is None: continue - # Track ambiguity: if either edge is a hyperedge witness, - # the walk has touched ambiguous ground. + # Track ambiguity: permission edges use generic wildcard/ + # hyperedge checks. Trust edges use admin_reachability's + # source-aware clean-witness rule so conditioned account-root + # trust can be clean only when aws:PrincipalArn precisely + # narrows the root principal to the current source role. hop_is_clean = True if self._is_ambiguous_edge(perm_edge): any_hyperedge_traversed = True hop_is_clean = False - if self._is_ambiguous_edge(trust_edge): + current_node = self._find_node(facts, current_arn) + if not self._is_clean_assumerole_trust_witness( + facts, + trust_edge, + current_node, + ): any_hyperedge_traversed = True hop_is_clean = False @@ -281,6 +295,85 @@ def _is_ambiguous_edge(self, edge: Edge) -> bool: return _is_unknown_witness(edge) + def _is_clean_assumerole_trust_witness( + self, + facts: FactGraph, + edge: Edge, + source: Node | None, + ) -> bool: + """Return True when a trust edge is clean for this source principal. + + Generic condition handling remains conservative. The only + conditioned account-root calibration supported here is the narrow + real-pilot shape where aws:PrincipalArn explicitly lists the source + role ARN or its exact assumed-role form. + """ + if edge.edge_type != "sts:AssumeRole_trust": + return False + if source is None: + return False + + features = edge.features or {} + if features.get("effect") not in (None, "", "Allow"): + return False + if features.get("parse_status") not in (None, "", "complete"): + return False + if _is_wildcard_trust_edge(edge): + return False + + has_conditions = bool(features.get("has_conditions") or features.get("raw_conditions")) + if not has_conditions: + if _is_account_root_trust_edge(edge): + return False + return edge.src.provider_id == source.provider_id + + if not _is_account_root_trust_edge(edge): + return False + if source.node_type != NODE_TYPE_IAM_ROLE: + return False + if not _is_iam_role_arn(source.provider_id): + return False + if _is_cross_account_unknown_org_membership(edge, source): + return False + if not _account_root_matches_source(edge.src.provider_id, source.provider_id): + return False + if not _principal_arn_conditions_match_source_role( + features.get("raw_conditions") or {}, + source.provider_id, + ): + return False + return self._trust_condition_bindings_are_resolved_for_clean_rule(facts, edge, source.provider_id) + + def _trust_condition_bindings_are_resolved_for_clean_rule( + self, + facts: FactGraph, + edge: Edge, + source_role_arn: str, + ) -> bool: + """Reject blocking or unsupported TRUST_CONDITION bindings. + + The generic trust-condition binder marks conditioned trust as + needs-review because most conditions require runtime context. For + this reasoner-local calibration, a non-blocking TRUST_CONDITION + binding is considered resolved only when its raw conditions match + the same narrow safe rule already checked on the edge. + """ + for binding in facts.bindings_for_edge(edge.edge_id): + constraint = facts.constraint_by_id(binding.constraint_id) + if constraint is None: + continue + if constraint.constraint_type != CONSTRAINT_TYPE_TRUST_CONDITION: + continue + if binding.likely_blocking: + return False + constraint_conditions = constraint.properties.get("raw_conditions") or {} + if not _principal_arn_conditions_match_source_role( + constraint_conditions, + source_role_arn, + ): + return False + return True + def _is_clean_admin_witness(self, edge: Edge, target_role: Node) -> bool: """Return True when an admin witness is clean enough for VALIDATED. @@ -834,3 +927,139 @@ def _policy_arn_from_edge(edge: Edge) -> str: if isinstance(control_policy_arn, str) and control_policy_arn: return control_policy_arn return "" + + +def _is_wildcard_trust_edge(edge: Edge) -> bool: + features = edge.features or {} + return ( + edge.src.provider_id == "*" + or edge.src.node_type == NODE_TYPE_WILDCARD_PRINCIPAL + or features.get("is_wildcard_principal") is True + or features.get("trust_scope") == TRUST_SCOPE_ANY_AWS_PRINCIPAL + ) + + +def _is_account_root_trust_edge(edge: Edge) -> bool: + features = edge.features or {} + return ( + edge.src.node_type == NODE_TYPE_ACCOUNT_ROOT + or edge.src.provider_id.endswith(":root") + or features.get("trust_scope") == TRUST_SCOPE_ACCOUNT_ROOT + ) + + +def _is_iam_role_arn(provider_id: str) -> bool: + return provider_id.startswith("arn:aws:iam::") and ":role/" in provider_id + + +def _account_root_matches_source(root_provider_id: str, source_provider_id: str) -> bool: + root_account = _account_id_from_arn(root_provider_id) + source_account = _account_id_from_arn(source_provider_id) + return bool(root_account and source_account and root_account == source_account) + + +def _account_id_from_arn(provider_id: str) -> str: + parts = provider_id.split(":") + if len(parts) >= 5: + return parts[4] + return "" + + +def _role_name_from_iam_role_arn(provider_id: str) -> str: + marker = ":role/" + if marker not in provider_id: + return "" + role_path_and_name = provider_id.split(marker, 1)[1] + return role_path_and_name.rsplit("/", 1)[-1] + + +def _is_cross_account_unknown_org_membership(edge: Edge, source: Node) -> bool: + if not bool((edge.features or {}).get("cross_account")): + return False + return source.properties.get("org_membership_status") in (None, "unknown") + + +def _principal_arn_conditions_match_source_role( + raw_conditions: dict[str, Any], + source_role_arn: str, +) -> bool: + if not raw_conditions: + return False + + principal_patterns: list[tuple[str, str]] = [] + for operator, body in raw_conditions.items(): + if operator not in {"ArnLike", "ArnEquals", "StringEquals"}: + return False + if not isinstance(body, dict): + return False + for key, value in body.items(): + if key == "aws:PrincipalArn" and operator in {"ArnLike", "ArnEquals"}: + principal_patterns.extend((operator, pattern) for pattern in _condition_values(value)) + continue + if key == "sts:ExternalId" and operator == "StringEquals": + continue + return False + + if not principal_patterns: + return False + return any( + _principal_pattern_matches_exact_source_role(operator, pattern, source_role_arn) + for operator, pattern in principal_patterns + ) + + +def _condition_values(value: Any) -> list[str]: + if isinstance(value, str): + return [value] + if isinstance(value, list): + return [item for item in value if isinstance(item, str)] + return [] + + +def _principal_pattern_matches_exact_source_role( + operator: str, + pattern: str, + source_role_arn: str, +) -> bool: + if not _is_supported_principal_arn_pattern(operator, pattern): + return False + if pattern == source_role_arn: + return True + if operator != "ArnLike": + return False + return pattern == _exact_assumed_role_pattern_for_source(source_role_arn) + + +def _is_supported_principal_arn_pattern(operator: str, pattern: str) -> bool: + if _is_broad_principal_arn_pattern(pattern): + return False + if operator == "ArnEquals": + return _is_iam_role_arn(pattern) + if operator == "ArnLike": + return _is_iam_role_arn(pattern) or _is_exact_assumed_role_pattern(pattern) + return False + + +def _exact_assumed_role_pattern_for_source(source_role_arn: str) -> str: + account_id = _account_id_from_arn(source_role_arn) + role_name = _role_name_from_iam_role_arn(source_role_arn) + if not account_id or not role_name: + return "" + return f"arn:aws:sts::{account_id}:assumed-role/{role_name}/*" + + +def _is_exact_assumed_role_pattern(pattern: str) -> bool: + if ":assumed-role/" not in pattern: + return False + suffix = pattern.split(":assumed-role/", 1)[1] + parts = suffix.split("/") + return len(parts) == 2 and bool(parts[0]) and parts[1] == "*" + + +def _is_broad_principal_arn_pattern(pattern: str) -> bool: + return ( + pattern.endswith(":role/*") + or ":role/*" in pattern + or pattern.endswith(":assumed-role/*") + or ":assumed-role/*" in pattern + ) diff --git a/tests/test_admin_reachability_reasoner.py b/tests/test_admin_reachability_reasoner.py index 842e0d0..d7fb5f2 100644 --- a/tests/test_admin_reachability_reasoner.py +++ b/tests/test_admin_reachability_reasoner.py @@ -21,9 +21,13 @@ from iamscope.constants import ( CONSTRAINT_TYPE_PERMISSION_BOUNDARY, CONSTRAINT_TYPE_SCP, + NODE_TYPE_ACCOUNT_ROOT, NODE_TYPE_IAM_ROLE, + NODE_TYPE_WILDCARD_PRINCIPAL, PROVIDER_AWS, REGION_GLOBAL, + TRUST_SCOPE_ACCOUNT_ROOT, + TRUST_SCOPE_ANY_AWS_PRINCIPAL, ) from iamscope.controls.expansion import ExpansionController from iamscope.models import Constraint, Edge, EdgeConstraint, Node @@ -270,7 +274,7 @@ def test_wildcard_trust_path_to_admin_is_inconclusive(self) -> None: assert check.state.value == "unknown" assert trust.edge_id in alice_f.evidence.edge_refs - def test_account_root_trust_path_remains_validated(self) -> None: + def test_account_root_trust_without_principalarn_is_inconclusive(self) -> None: alice = _user(_ALICE_ARN) admin = _role(_ADMIN_ARN) root_arn = "arn:aws:iam::111111\u003111111:root" @@ -285,11 +289,11 @@ def test_account_root_trust_path_remains_validated(self) -> None: findings = AdminReachabilityReasoner().run(facts) alice_f = next(f for f in findings if f.source.provider_id == _ALICE_ARN) - assert alice_f.verdict.value == "validated" + assert alice_f.verdict.value == "inconclusive" check = next( c for c in alice_f.required_checks if c.name == "at_least_one_reachable_chain_uses_clean_witnesses" ) - assert check.state.value == "pass" + assert check.state.value == "unknown" def test_unrelated_trust_still_produces_no_reachability(self) -> None: alice = _user(_ALICE_ARN) @@ -380,6 +384,76 @@ def _clean_witness_check_state(self, finding: object) -> str: ) return check.state.value + def _conditioned_root_trust_facts( + self, + *, + source_arn: str, + target_arn: str, + principal_arn_value: str | list[str] | None, + extra_conditions: dict | None = None, + wildcard_principal: bool = False, + include_external_id: bool = True, + ) -> FactGraph: + source = _role(source_arn) + target = _role(target_arn) + account_id = source_arn.split(":")[4] + root_arn = f"arn:aws:iam::{account_id}:root" + raw_conditions: dict = {} + if principal_arn_value is not None: + raw_conditions["ArnLike"] = {"aws:PrincipalArn": principal_arn_value} + if include_external_id: + raw_conditions.setdefault("StringEquals", {})["sts:ExternalId"] = "reviewed-external-id" + if extra_conditions: + for operator, body in extra_conditions.items(): + raw_conditions.setdefault(operator, {}).update(body) + assume_perm = _assume_perm_edge(src_arn=source_arn, dst_arn=target_arn) + trust_src = Node( + provider=PROVIDER_AWS, + node_type=NODE_TYPE_WILDCARD_PRINCIPAL if wildcard_principal else NODE_TYPE_ACCOUNT_ROOT, + provider_id="*" if wildcard_principal else root_arn, + properties={"account_id": account_id, "is_synthetic": True}, + ) + trust = Edge( + edge_type="sts:AssumeRole_trust", + src=trust_src.to_ref(), + dst=target.to_ref(), + region=REGION_GLOBAL, + features={ + "allow_controls": [ + { + "control_type": "TRUST", + "policy_arn": target_arn, + "statement_index": 0, + "digest": "2" * 64, + "summary": f"trust {trust_src.provider_id}", + } + ], + "cross_account": False, + "effect": "Allow", + "has_conditions": bool(raw_conditions), + "has_external_id": include_external_id, + "is_wildcard_principal": wildcard_principal, + "layer": "trust", + "principal_type": "AWS", + "raw_conditions": raw_conditions, + "statement_index": 0, + "trust_scope": TRUST_SCOPE_ANY_AWS_PRINCIPAL if wildcard_principal else TRUST_SCOPE_ACCOUNT_ROOT, + }, + ) + admin_edges, hyperedge_nodes = self._admin_edges_from_policy( + role_arn=target_arn, + policy_arn=_AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN, + ) + return _make_facts( + nodes=(source, target, trust_src, *hyperedge_nodes), + edges=(assume_perm, trust, *admin_edges), + ) + + def _exact_assumed_role_pattern(self, role_arn: str) -> str: + account_id = role_arn.split(":")[4] + role_name = role_arn.rsplit("/", 1)[-1] + return f"arn:aws:sts::{account_id}:assumed-role/{role_name}/*" + def test_aws_managed_administratoraccess_is_clean_admin_witness(self) -> None: facts = self._single_hop_to_admin_with_policy(policy_arn=_AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN) @@ -445,6 +519,187 @@ def test_real_pilot_shape_prodapprole_to_proddbadminrole_validates(self) -> None assert finding.verdict.value == "validated" assert self._clean_witness_check_state(finding) == "pass" + def test_conditioned_account_root_exact_source_role_principalarn_validates(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value=source_arn, + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.verdict.value == "validated" + assert self._clean_witness_check_state(finding) == "pass" + + def test_conditioned_account_root_exact_assumed_role_pattern_validates(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdDeployRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value=self._exact_assumed_role_pattern(source_arn), + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.verdict.value == "validated" + assert self._clean_witness_check_state(finding) == "pass" + + def test_conditioned_account_root_externalid_only_stays_inconclusive(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value=None, + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.verdict.value == "inconclusive" + assert self._clean_witness_check_state(finding) == "unknown" + + def test_conditioned_account_root_broad_role_wildcard_stays_inconclusive(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value="arn:aws:iam::111111\u003111111:role/*", + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.verdict.value == "inconclusive" + assert self._clean_witness_check_state(finding) == "unknown" + + def test_conditioned_account_root_broad_assumed_role_wildcard_stays_inconclusive(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value="arn:aws:sts::111111\u003111111:assumed-role/*", + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.verdict.value == "inconclusive" + assert self._clean_witness_check_state(finding) == "unknown" + + def test_conditioned_account_root_different_role_stays_inconclusive(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value="arn:aws:iam::111111\u003111111:role/OtherRole", + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.verdict.value == "inconclusive" + assert self._clean_witness_check_state(finding) == "unknown" + + def test_conditioned_account_root_unsupported_condition_stays_inconclusive(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value=source_arn, + extra_conditions={"Bool": {"aws:MultiFactorAuthPresent": "true"}}, + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.verdict.value == "inconclusive" + assert self._clean_witness_check_state(finding) == "unknown" + + def test_conditioned_wildcard_principal_stays_inconclusive(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value=source_arn, + wildcard_principal=True, + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.verdict.value == "inconclusive" + assert self._clean_witness_check_state(finding) == "unknown" + + def test_conditioned_account_root_scp_blocker_still_wins(self) -> None: + source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole" + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value=source_arn, + ) + trust_edge = next(edge for edge in facts.edges if edge.edge_type == "sts:AssumeRole_trust") + scp = Constraint( + provider=PROVIDER_AWS, + constraint_type=CONSTRAINT_TYPE_SCP, + scope_type="Account", + scope_id="111111\u003111111", + policy_id="p-deny-assumerole", + statement_id="DenyAssumeRole", + region=REGION_GLOBAL, + properties={"deny_actions": ["sts:AssumeRole"], "resource_patterns": ["*"], "parse_status": "complete"}, + ) + blocked = FactGraph( + nodes=facts.nodes, + edges=facts.edges, + constraints=(scp,), + edge_constraints=( + EdgeConstraint( + edge_id=trust_edge.edge_id, + constraint_id=scp.constraint_id, + governance_confidence="complete", + likely_blocking=True, + binding_reason="SCP denies sts:AssumeRole", + ), + ), + scenario_hash=facts.scenario_hash, + edge_budget_exhausted=False, + ) + + finding = self._finding_for_source(blocked, source_arn) + + assert finding.verdict.value == "blocked" + assert finding.severity == "info" + assert finding.blockers_observed[0].kind == "scp" + + def test_real_pilot_conditioned_account_root_sources_validate_under_safe_rule(self) -> None: + target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole" + source_arns = ( + "arn:aws:iam::111111\u003111111:role/ProdAppRole", + "arn:aws:iam::111111\u003111111:role/ProdDeployRole", + "arn:aws:iam::111111\u003111111:role/ProdReadOnlyRole", + ) + principal_patterns = [ + *source_arns, + *(self._exact_assumed_role_pattern(source_arn) for source_arn in source_arns), + ] + + for source_arn in source_arns: + facts = self._conditioned_root_trust_facts( + source_arn=source_arn, + target_arn=target_arn, + principal_arn_value=principal_patterns, + ) + + finding = self._finding_for_source(facts, source_arn) + + assert finding.target.provider_id == target_arn + assert finding.verdict.value == "validated" + assert self._clean_witness_check_state(finding) == "pass" + class TestPermissionBoundaryReachability: def _boundary(self) -> Constraint: