Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions iamscope/reasoner/admin_reachability.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@

_ASSUMEROLE_ACTION: str = "sts:AssumeRole"
_MAX_DEPTH: int = 4
_AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN: str = "arn:aws:iam::aws:policy/AdministratorAccess"


class AdminReachabilityReasoner:
Expand Down Expand Up @@ -159,13 +160,16 @@ def _compute_reachability(
current_node,
)
if admin_witness is not None:
admin_witness_is_clean = self._is_clean_admin_witness(admin_witness, current_node)
reachable_admins.add(current_arn)
self._append_unique_edge(admin_witness_edges, admin_witness)
if path_is_clean:
if path_is_clean and admin_witness_is_clean:
clean_reachable_admins.add(current_arn)
self._append_unique_edge(clean_admin_witness_edges, admin_witness)
self._append_unique_edges(clean_proof_walk_edges, path_edges)
else:
if not admin_witness_is_clean:
any_hyperedge_traversed = True
ambiguous_reachable_admins.add(current_arn)

# Stop walking deeper if depth limit hit.
Expand Down Expand Up @@ -277,6 +281,33 @@ def _is_ambiguous_edge(self, edge: Edge) -> bool:

return _is_unknown_witness(edge)

def _is_clean_admin_witness(self, edge: Edge, target_role: Node) -> bool:
"""Return True when an admin witness is clean enough for VALIDATED.

Arbitrary wildcard admin-like policies remain conservative.
The one supported clean wildcard admin witness is the exact AWS
managed AdministratorAccess policy, represented as an Allow over
resource "*" with no conditions, attached to the role being
classified as admin-equivalent.
"""
if not edge.edge_type.endswith("_permission"):
return False
if edge.src.provider_id != target_role.provider_id:
return False
if not _is_admin_equivalent_wildcard_permission_edge(edge):
return False
if edge.features.get("effect") != "Allow":
return False
if edge.features.get("resource_pattern") != "*":
return False
if edge.features.get("has_conditions") is True:
return False
if edge.features.get("raw_conditions") not in ({}, None):
return False
if edge.features.get("parse_status") not in (None, "", "complete"):
return False
return _policy_arn_from_edge(edge) == _AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN

def _append_unique_edge(self, edges: list[Edge], edge: Edge) -> None:
"""Append edge once, preserving first-seen deterministic order."""
if edge.edge_id not in {existing.edge_id for existing in edges}:
Expand Down Expand Up @@ -524,13 +555,13 @@ def _build_finding(
elif has_clean_admin_path:
check_3_reason = "all BFS paths use clean witness edges"
else:
check_3_reason = "BFS walk traversed at least one wildcard/hyperedge edge"
check_3_reason = "reachable admin path or admin witness uses wildcard/hyperedge evidence"
check_results.append(
Check(
name="at_least_one_reachable_chain_uses_clean_witnesses",
description=(
"At least one BFS path from source to a reachable admin "
"traverses only non-wildcard, non-hyperedge edges"
"uses only clean AssumeRole and admin-equivalence witnesses"
),
state=check_3_state,
evidence_refs=tuple(edge_refs),
Expand All @@ -554,7 +585,7 @@ def _build_finding(
reason=(
"clean path with ambiguous alternate walk"
if has_clean_admin_path and any_hyperedge_traversed
else ("clean walk" if has_clean_admin_path else "ambiguity in walk")
else ("clean walk" if has_clean_admin_path else "ambiguity in walk or admin witness")
),
)
)
Expand Down Expand Up @@ -780,3 +811,26 @@ def _absorb_digests(
int(ref.get("statement_index", 0)),
ref.get("summary", ""),
)


def _is_admin_equivalent_wildcard_permission_edge(edge: Edge) -> bool:
action = edge.edge_type[: -len("_permission")]
if action in ("*", "iam:*"):
return True
action_matched_via = edge.features.get("action_matched_via")
return action_matched_via in ("wildcard_star", "wildcard_iam")


def _policy_arn_from_edge(edge: Edge) -> str:
policy_arn = edge.features.get("policy_arn")
if isinstance(policy_arn, str) and policy_arn:
return policy_arn
allow_controls = edge.features.get("allow_controls")
if isinstance(allow_controls, list):
for control in allow_controls:
if not isinstance(control, dict):
continue
control_policy_arn = control.get("policy_arn")
if isinstance(control_policy_arn, str) and control_policy_arn:
return control_policy_arn
return ""

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion tests/integration/test_identity_deny_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,18 @@ def _permission_results(
principal_type: str,
policy_doc: dict[str, Any],
policy_name: str,
*,
policy_source: str = "inline",
policy_arn: str = "",
) -> list[PermissionParseResult]:
return parse_permission_policy(
policy_doc,
source_arn=principal_arn,
source_node_type=principal_type,
source_account_id=_ACCOUNT,
policy_source="inline",
policy_source=policy_source,
policy_name=policy_name,
policy_arn=policy_arn,
)


Expand Down Expand Up @@ -158,6 +162,8 @@ def _build_case(
NODE_TYPE_IAM_ROLE,
_allow_doc("*", "*"),
"AdminAccess",
policy_source="managed",
policy_arn="arn:aws:iam::aws:policy/AdministratorAccess",
)
)

Expand Down
146 changes: 145 additions & 1 deletion tests/test_admin_reachability_reasoner.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@

from __future__ import annotations

from iamscope.collector.passrole import build_permission_edges
from iamscope.constants import (
CONSTRAINT_TYPE_PERMISSION_BOUNDARY,
CONSTRAINT_TYPE_SCP,
NODE_TYPE_IAM_ROLE,
PROVIDER_AWS,
REGION_GLOBAL,
)
from iamscope.models import Constraint, EdgeConstraint
from iamscope.controls.expansion import ExpansionController
from iamscope.models import Constraint, Edge, EdgeConstraint, Node
from iamscope.parser.permission_policy import parse_permission_policy
from iamscope.reasoner import AdminReachabilityReasoner, FactGraph
from tests.test_assume_role_chain_reasoner import ( # noqa: I001
_ADMIN_ARN,
Expand All @@ -43,6 +47,8 @@
_wildcard_trust_edge,
)

_AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN = "arn:aws:iam::aws:policy/AdministratorAccess"

# ---------------------------------------------------------------------------
# Preconditions
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -302,6 +308,144 @@ def test_unrelated_trust_still_produces_no_reachability(self) -> None:
assert not any(f.source.provider_id == _ALICE_ARN for f in findings)


class TestAdministratorAccessCalibration:
def _admin_edges_from_policy(
self,
*,
role_arn: str,
policy_arn: str,
conditions: dict | None = None,
) -> tuple[tuple[Edge, ...], tuple[Node, ...]]:
policy: dict = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "*",
"Resource": "*",
}
],
}
if conditions is not None:
policy["Statement"][0]["Condition"] = conditions
parse_results = parse_permission_policy(
policy,
source_arn=role_arn,
source_node_type=NODE_TYPE_IAM_ROLE,
source_account_id="111111\u003111111",
policy_source="managed",
policy_name="AdministratorAccess",
policy_arn=policy_arn,
)
edges, hyperedge_nodes = build_permission_edges(
parse_results,
ExpansionController(global_mode="warn"),
known_role_arns=[role_arn],
)
return (tuple(edges), tuple(hyperedge_nodes))

def _single_hop_to_admin_with_policy(
self,
*,
source_arn: str = _ALICE_ARN,
target_arn: str = _ADMIN_ARN,
policy_arn: str = _AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN,
conditions: dict | None = None,
wildcard_assume_role_permission: bool = False,
) -> FactGraph:
source = _user(source_arn) if ":user/" in source_arn else _role(source_arn)
target = _role(target_arn)
assume_perm = _assume_perm_edge(
src_arn=source_arn,
dst_arn=target_arn,
is_wildcard_resource=wildcard_assume_role_permission,
)
trust = _trust_edge(principal_arn=source_arn, target_arn=target_arn)
admin_edges, hyperedge_nodes = self._admin_edges_from_policy(
role_arn=target_arn,
policy_arn=policy_arn,
conditions=conditions,
)
return _make_facts(
nodes=(source, target, *hyperedge_nodes),
edges=(assume_perm, trust, *admin_edges),
)

def _finding_for_source(self, facts: FactGraph, source_arn: str) -> object:
return next(f for f in AdminReachabilityReasoner().run(facts) if f.source.provider_id == source_arn)

def _clean_witness_check_state(self, finding: object) -> str:
check = next(
c for c in finding.required_checks if c.name == "at_least_one_reachable_chain_uses_clean_witnesses"
)
return check.state.value

def test_aws_managed_administratoraccess_is_clean_admin_witness(self) -> None:
facts = self._single_hop_to_admin_with_policy(policy_arn=_AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN)

finding = self._finding_for_source(facts, _ALICE_ARN)

assert finding.verdict.value == "validated"
assert self._clean_witness_check_state(finding) == "pass"

def test_custom_wildcard_admin_policy_remains_conservative(self) -> None:
facts = self._single_hop_to_admin_with_policy(
policy_arn="arn:aws:iam::111111\u003111111:policy/CustomAdminPolicy"
)

finding = self._finding_for_source(facts, _ALICE_ARN)

assert finding.verdict.value == "inconclusive"
assert self._clean_witness_check_state(finding) == "unknown"

def test_spoofed_administratoraccess_policy_arn_remains_conservative(self) -> None:
facts = self._single_hop_to_admin_with_policy(
policy_arn="arn:aws:iam::111111\u003111111:policy/MyAdministratorAccess"
)

finding = self._finding_for_source(facts, _ALICE_ARN)

assert finding.verdict.value == "inconclusive"
assert self._clean_witness_check_state(finding) == "unknown"

def test_conditioned_administratoraccess_witness_remains_conservative(self) -> None:
facts = self._single_hop_to_admin_with_policy(
policy_arn=_AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN,
conditions={"StringEquals": {"aws:PrincipalTag/admin-review": "approved"}},
)

finding = self._finding_for_source(facts, _ALICE_ARN)

assert finding.verdict.value == "inconclusive"
assert self._clean_witness_check_state(finding) == "unknown"

def test_ambiguous_assumerole_path_to_aws_managed_administratoraccess_stays_inconclusive(self) -> None:
facts = self._single_hop_to_admin_with_policy(
policy_arn=_AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN,
wildcard_assume_role_permission=True,
)

finding = self._finding_for_source(facts, _ALICE_ARN)

assert finding.verdict.value == "inconclusive"
assert self._clean_witness_check_state(finding) == "unknown"

def test_real_pilot_shape_prodapprole_to_proddbadminrole_validates(self) -> None:
source_arn = "arn:aws:iam::111111\u003111111:role/ProdAppRole"
target_arn = "arn:aws:iam::111111\u003111111:role/ProdDBAdminRole"
facts = self._single_hop_to_admin_with_policy(
source_arn=source_arn,
target_arn=target_arn,
policy_arn=_AWS_MANAGED_ADMINISTRATOR_ACCESS_ARN,
)

finding = self._finding_for_source(facts, source_arn)

assert finding.target.provider_id == target_arn
assert finding.verdict.value == "validated"
assert self._clean_witness_check_state(finding) == "pass"


class TestPermissionBoundaryReachability:
def _boundary(self) -> Constraint:
return Constraint(
Expand Down