diff --git a/msal/application.py b/msal/application.py index 084f9bf3..f98d77c1 100644 --- a/msal/application.py +++ b/msal/application.py @@ -242,6 +242,7 @@ class ClientApplication(object): ACQUIRE_TOKEN_FOR_CLIENT_ID = "730" ACQUIRE_TOKEN_BY_AUTHORIZATION_CODE_ID = "832" ACQUIRE_TOKEN_INTERACTIVE = "169" + ACQUIRE_TOKEN_BY_USER_FIC_ID = "950" GET_ACCOUNTS_ID = "902" REMOVE_ACCOUNT_ID = "903" @@ -2572,3 +2573,68 @@ def acquire_token_on_behalf_of(self, user_assertion, scopes, claims_challenge=No response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP telemetry_context.update_telemetry(response) return response + + def acquire_token_by_user_federated_identity_credential( + self, scopes, assertion, username=None, user_object_id=None, + claims_challenge=None, **kwargs): + """Acquires a user-scoped token using the ``user_fic`` grant type. + + This method exchanges a federated identity credential (typically an + agent instance token from Leg 2 of the agent identity protocol) for + a user-scoped access token, enabling an agent to act on behalf of + a specific user. + + :param list[str] scopes: Scopes required by downstream API (a resource). + :param str assertion: + The federated identity credential token (e.g. the instance token + obtained from Leg 2 of the agent identity flow). + :param str username: + The target user's UPN (User Principal Name). + Mutually exclusive with ``user_object_id``. + :param str user_object_id: + The target user's Object ID. + Mutually exclusive with ``username``. + :param claims_challenge: + The claims_challenge parameter requests specific claims requested by the resource provider + in the form of a claims_challenge directive in the www-authenticate header to be + returned from the UserInfo Endpoint and/or in the ID Token and/or Access Token. + It is a string of a JSON object which contains lists of claims being requested from these locations. + + :return: A dict representing the json response from Microsoft Entra: + + - A successful response would contain "access_token" key, + - an error response would contain "error" and usually "error_description". + """ + # Input validation + if not assertion: + raise ValueError("assertion is required and must be non-empty") + if not username and not user_object_id: + raise ValueError( + "Either username or user_object_id must be provided") + if username and user_object_id: + raise ValueError( + "username and user_object_id are mutually exclusive") + + telemetry_context = self._build_telemetry_context( + self.ACQUIRE_TOKEN_BY_USER_FIC_ID) + headers = telemetry_context.generate_headers() + if username: + headers["X-AnchorMailbox"] = "upn:{}".format(username) + elif user_object_id: + headers["X-AnchorMailbox"] = "Oid:{}@{}".format( + user_object_id, self.authority.tenant) + response = _clean_up(self.client.obtain_token_by_user_fic( + scope=self._decorate_scope(scopes), + assertion=assertion, + username=username, + user_object_id=user_object_id, + headers=headers, + data=dict( + kwargs.pop("data", {}), + claims=_merge_claims_challenge_and_capabilities( + self._client_capabilities, claims_challenge)), + **kwargs)) + if "access_token" in response: + response[self._TOKEN_SOURCE] = self._TOKEN_SOURCE_IDP + telemetry_context.update_telemetry(response) + return response diff --git a/msal/oauth2cli/oauth2.py b/msal/oauth2cli/oauth2.py index 68b0e84e..4590d52d 100644 --- a/msal/oauth2cli/oauth2.py +++ b/msal/oauth2cli/oauth2.py @@ -7,6 +7,7 @@ except ImportError: from urlparse import parse_qs, urlparse, urlunparse from urllib import urlencode, quote_plus +import inspect import logging import warnings import time @@ -104,6 +105,15 @@ def __init__( or a raw JWT assertion in bytes (which we will relay to http layer). It can also be a callable (recommended), so that we will do lazy creation of an assertion. + + The callable may accept zero arguments (legacy) or one + required positional argument. Callables whose positional + parameters all have default values (e.g. + ``lambda token=token: token``) are treated as zero-arg. + When the callable declares a required positional parameter, + it will receive a dict containing ``"client_id"``, + ``"token_endpoint"``, and optionally ``"fmi_path"`` + (when an FMI path is set on the current request). client_assertion_type (str): The type of your :attr:`client_assertion` parameter. It is typically the value of :attr:`CLIENT_ASSERTION_TYPE_SAML2` or @@ -168,6 +178,41 @@ def __init__( # A workaround for requests not supporting session-wide timeout self._http_client.request, timeout=timeout) + @staticmethod + def _accepts_context(func): + """Check if a callable requires at least one positional argument. + + Returns True only when the callable has a positional parameter + **without** a default value. This ensures that legacy zero-arg + callables — including ``lambda token=token: token`` patterns + where every positional param has a default — are still invoked + with no arguments. + """ + try: + sig = inspect.signature(func) + for p in sig.parameters.values(): + if p.kind in ( + inspect.Parameter.POSITIONAL_ONLY, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + ) and p.default is inspect.Parameter.empty: + return True + return False + except (ValueError, TypeError): + return False # Signature not inspectable; treat as zero-arg + + def _invoke_assertion_callable(self, assertion_callable, data=None): + """Invoke an assertion callable, passing context if it accepts one.""" + if self._accepts_context(assertion_callable): + context = { + "client_id": self.client_id, + "token_endpoint": self.configuration.get( + "token_endpoint", ""), + } + if data and data.get("fmi_path"): + context["fmi_path"] = data["fmi_path"] + return assertion_callable(context) + return assertion_callable() + def _build_auth_request_params(self, response_type, **kwargs): # response_type is a string defined in # https://tools.ietf.org/html/rfc6749#section-3.1.1 @@ -198,11 +243,11 @@ def _obtain_token( # The verb "obtain" is influenced by OAUTH2 RFC 6749 # See https://tools.ietf.org/html/rfc7521#section-4.2 encoder = self.client_assertion_encoders.get( self.default_body["client_assertion_type"], lambda a: a) - _data["client_assertion"] = encoder( - self.client_assertion() # Do lazy on-the-fly computation - if callable(self.client_assertion) else self.client_assertion - ) # The type is bytes, which is preferable. See also: - # https://github.com/psf/requests/issues/4503#issuecomment-455001070 + if callable(self.client_assertion): + raw = self._invoke_assertion_callable(self.client_assertion, data) + else: + raw = self.client_assertion + _data["client_assertion"] = encoder(raw) _data.update(self.default_body) # It may contain authen parameters _data.update(data or {}) # So the content in data param prevails @@ -770,6 +815,34 @@ class initialization. data.update(scope=scope) return self._obtain_token("client_credentials", data=data, **kwargs) + def obtain_token_by_user_fic( + self, scope, assertion, username=None, user_object_id=None, + **kwargs): + """Obtain token using the ``user_fic`` grant type. + + This exchanges a federated identity credential (e.g. an agent + instance token) for a user-scoped access token. + + :param scope: Scopes for the target resource (already decorated + with OIDC scopes by the caller). + :param str assertion: The federated identity credential token. + :param str username: The target user's UPN (mutually exclusive + with *user_object_id*). + :param str user_object_id: The target user's Object ID (mutually + exclusive with *username*). + """ + data = kwargs.pop("data", {}) + data.update( + scope=scope, + user_federated_identity_credential=assertion, + client_info="1", + ) + if user_object_id: + data["user_id"] = str(user_object_id) + elif username: + data["username"] = username + return self._obtain_token("user_fic", data=data, **kwargs) + def __init__(self, server_configuration, client_id, on_obtaining_tokens=lambda event: None, # event is defined in _obtain_token(...) diff --git a/msal/throttled_http_client.py b/msal/throttled_http_client.py index 7c64fbf6..8cabe261 100644 --- a/msal/throttled_http_client.py +++ b/msal/throttled_http_client.py @@ -126,7 +126,8 @@ def __init__(self, *args, default_throttle_time=None, **kwargs): # TODO: We may want to disable it for confidential client, though _extract_data(kwargs, "refresh_token", # "account" during refresh _extract_data(kwargs, "code", # "account" of auth code grant - _extract_data(kwargs, "username")))), # "account" of ROPC + _extract_data(kwargs, "username", # "account" of ROPC + _extract_data(kwargs, "user_id"))))), # "account" of user_fic (OID path) ), expires_in=RetryAfterParser(default_throttle_time or 5).parse, )(self.post) diff --git a/msal/token_cache.py b/msal/token_cache.py index d6e2a2b1..0ca250df 100644 --- a/msal/token_cache.py +++ b/msal/token_cache.py @@ -65,6 +65,11 @@ "token_type", "req_cnf", "key_id", + # user_fic grant parameters — these are standard body params for the + # user_fic flow; FIC tokens use normal user cache keys (not extended). + "user_federated_identity_credential", + "user_id", + "client_info", }) @@ -301,6 +306,7 @@ def make_clean_copy(dictionary, sensitive_fields): # Masks sensitive info event, data=make_clean_copy(event.get("data", {}), ( "password", "client_secret", "refresh_token", "assertion", + "user_federated_identity_credential", )), response=make_clean_copy(event.get("response", {}), ( "id_token_claims", # Provided by broker @@ -410,7 +416,7 @@ def __add(self, event, now=None): } grant_types_that_establish_an_account = ( _GRANT_TYPE_BROKER, "authorization_code", "password", - Client.DEVICE_FLOW["GRANT_TYPE"]) + Client.DEVICE_FLOW["GRANT_TYPE"], "user_fic") if event.get("grant_type") in grant_types_that_establish_an_account: account["account_source"] = event["grant_type"] self.modify(self.CredentialType.ACCOUNT, account, account) diff --git a/tests/test_agentic_e2e.py b/tests/test_agentic_e2e.py new file mode 100644 index 00000000..153d7fb7 --- /dev/null +++ b/tests/test_agentic_e2e.py @@ -0,0 +1,291 @@ +"""End-to-end tests for agentic (agent identity) scenarios. + +These tests verify the full agent identity flow using MSAL Python APIs: +1. Assertion callback context propagation (fmi_path flows to callback) +2. Agent app-only token acquisition using FMI-sourced client assertion (Leg 2) +3. Full 3-leg flow: FMI → assertion → user_fic → user-scoped token +4. Cache isolation between app-only and user-scoped tokens + +Test configuration uses the same lab infrastructure as test_fmi_e2e.py. +Requires LAB_APP_CLIENT_CERT_PFX_PATH environment variable. +""" + +import logging +import os +import sys +import unittest + +import msal +from tests.http_client import MinimalHttpClient +from tests.lab_config import get_client_certificate +from tests.test_e2e import LabBasedTestCase + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.DEBUG if "-v" in sys.argv else logging.INFO) + +# ============================================================================= +# Test configuration — shared lab app registrations for agentic scenarios +# ============================================================================= +_TENANT_ID = "10c419d4-4a50-45b2-aa4e-919fb84df24f" +_BLUEPRINT_CLIENT_ID = "aab5089d-e764-47e3-9f28-cc11c2513821" +_AGENT_APP_ID = "ab18ca07-d139-4840-8b3b-4be9610c6ed5" +_RMA_CLIENT_ID = "3bf56293-fbb5-42bd-a407-248ba7431a8c" +_USER_UPN = "agentuser1@id4slab1.onmicrosoft.com" +_TOKEN_EXCHANGE_SCOPE = "api://AzureADTokenExchange/.default" +_FMI_EXCHANGE_SCOPE = "api://AzureFMITokenExchange/.default" +_GRAPH_SCOPE = "https://graph.microsoft.com/.default" +_FMI_PATH = "SomeFmiPath/FmiCredentialPath" +_AUTHORITY = "https://login.microsoftonline.com/" + _TENANT_ID + + +# ============================================================================= +# Helpers +# ============================================================================= + +def _acquire_fmi_credential_for_agent(agent_app_id): + """Leg 1: Blueprint app acquires FMI credential (T1) for the given agent. + + Uses certificate authentication with SNI (sendX5C) and fmi_path set to + the agent app ID. + """ + blueprint_app = msal.ConfidentialClientApplication( + _BLUEPRINT_CLIENT_ID, + client_credential=get_client_certificate(), + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + result = blueprint_app.acquire_token_for_client( + [_TOKEN_EXCHANGE_SCOPE], fmi_path=agent_app_id) + if "access_token" not in result: + raise RuntimeError( + "Leg 1 failed — could not acquire FMI credential: {}: {}".format( + result.get("error"), result.get("error_description"))) + return result["access_token"] + + +def _acquire_fmi_credential_from_rma(): + """Acquire an FMI credential from the RMA app using certificate credentials. + + Uses the same RMA pattern as test_fmi_e2e._get_fmi_credential_from_rma(). + Used for assertion callback context tests where the callback just needs to + return a valid FMI token (not specifically for an agent app). + """ + rma_app = msal.ConfidentialClientApplication( + _RMA_CLIENT_ID, + client_credential=get_client_certificate(), + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + result = rma_app.acquire_token_for_client( + [_FMI_EXCHANGE_SCOPE], fmi_path=_FMI_PATH) + if "access_token" not in result: + raise RuntimeError( + "RMA FMI credential acquisition failed: {}: {}".format( + result.get("error"), result.get("error_description"))) + return result["access_token"] + + +def _acquire_instance_token_for_agent(): + """Leg 1 + Leg 2: Acquire an instance token (T2) for the agent app. + + 1. Blueprint → T1 (FMI credential via fmi_path) + 2. Agent uses T1 as client_assertion → T2 (instance token) + + T2 is used as user_federated_identity_credential in Leg 3 (user_fic). + """ + t1 = _acquire_fmi_credential_for_agent(_AGENT_APP_ID) + + agent_app = msal.ConfidentialClientApplication( + _AGENT_APP_ID, + client_credential={"client_assertion": t1}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + result = agent_app.acquire_token_for_client([_TOKEN_EXCHANGE_SCOPE]) + if "access_token" not in result: + raise RuntimeError( + "Leg 2 failed — could not acquire instance token: {}: {}".format( + result.get("error"), result.get("error_description"))) + return result["access_token"] + + +# ============================================================================= +# Tests +# ============================================================================= + +class TestAssertionCallbackContext(LabBasedTestCase): + """Verify assertion callback receives correct context when fmi_path is set.""" + + def test_assertion_callback_receives_fmi_path(self): + captured_context = {} + + def assertion_callback(context): + captured_context.update(context) + return _acquire_fmi_credential_from_rma() + + app = msal.ConfidentialClientApplication( + "urn:microsoft:identity:fmi", + client_credential={"client_assertion": assertion_callback}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + + result = app.acquire_token_for_client( + [_FMI_EXCHANGE_SCOPE], fmi_path=_AGENT_APP_ID) + self.assertIn("access_token", result, + "acquire_token_for_client failed: {}: {}".format( + result.get("error"), result.get("error_description"))) + + # Verify context was passed to callback + self.assertEqual(_AGENT_APP_ID, captured_context.get("fmi_path"), + "fmi_path should flow to assertion callback context") + self.assertEqual("urn:microsoft:identity:fmi", captured_context.get("client_id"), + "client_id should be in assertion callback context") + self.assertTrue(captured_context.get("token_endpoint"), + "token_endpoint should be in assertion callback context") + + +class TestAgentAppToken(LabBasedTestCase): + """Agent acquires app-only token for Graph using FMI-sourced assertion. + + Flow: Blueprint → T1 (assertion callback) → Agent CCA → app token + + Disabled in CI: The blueprint app (aab5089d) requires SNI authentication, + but the CI pipeline's PFX-based cert loading does not include intermediate + certs in the x5c chain, causing AADSTS700027. These tests pass locally + where the OS cert store can resolve the chain. + """ + + @unittest.skipUnless( + os.environ.get("MSAL_RUN_LOCAL_ONLY_TESTS"), + "Requires local cert store for SNI — set MSAL_RUN_LOCAL_ONLY_TESTS=1") + def test_agent_gets_app_token_for_graph(self): + def assertion_provider(context): + return _acquire_fmi_credential_for_agent(_AGENT_APP_ID) + + agent_app = msal.ConfidentialClientApplication( + _AGENT_APP_ID, + client_credential={"client_assertion": assertion_provider}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + + result = agent_app.acquire_token_for_client([_GRAPH_SCOPE]) + self.assertIn("access_token", result, + "Agent app token acquisition failed: {}: {}".format( + result.get("error"), result.get("error_description"))) + self.assertTrue(result["access_token"], + "Access token should not be empty") + + +class TestAgentUserIdentity(LabBasedTestCase): + """Full 3-leg agent identity flow: FMI → assertion → user_fic → user token. + + Flow: + 1. Blueprint → T1 (FMI credential) + 2. Agent uses T1 → T2 (instance token) + 3. Agent exchanges T2 via user_fic → user-scoped Graph token + 4. Verify token is cached and retrievable via acquire_token_silent + + Disabled in CI: see TestAgentAppToken docstring. + """ + + @unittest.skipUnless( + os.environ.get("MSAL_RUN_LOCAL_ONLY_TESTS"), + "Requires local cert store for SNI — set MSAL_RUN_LOCAL_ONLY_TESTS=1") + def test_agent_user_identity_gets_token_for_graph(self): + # Get instance token (T2) for user_fic exchange + t2 = _acquire_instance_token_for_agent() + + # Build agent CCA with assertion callback + def assertion_provider(context): + return _acquire_fmi_credential_for_agent(_AGENT_APP_ID) + + agent_app = msal.ConfidentialClientApplication( + _AGENT_APP_ID, + client_credential={"client_assertion": assertion_provider}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + + # Exchange T2 for user-scoped token via user_fic grant + result = agent_app.acquire_token_by_user_federated_identity_credential( + [_GRAPH_SCOPE], assertion=t2, username=_USER_UPN) + self.assertIn("access_token", result, + "user_fic token acquisition failed: {}: {}".format( + result.get("error"), result.get("error_description"))) + self.assertTrue(result["access_token"], + "Access token should not be empty") + + # Verify account was created + accounts = agent_app.get_accounts() + self.assertTrue(len(accounts) > 0, + "Account should be created from user_fic response") + + # Verify silent retrieval works (token should be cached) + account = accounts[0] + silent_result = agent_app.acquire_token_silent( + [_GRAPH_SCOPE], account=account) + self.assertIsNotNone(silent_result, + "acquire_token_silent should return cached token") + self.assertIn("access_token", silent_result) + self.assertEqual(result["access_token"], silent_result["access_token"], + "Silent call should return the same cached token") + + +class TestAgentCacheIsolation(LabBasedTestCase): + """App-only and user-scoped tokens are isolated in cache on the same CCA. + + Disabled in CI: see TestAgentAppToken docstring. + """ + + @unittest.skipUnless( + os.environ.get("MSAL_RUN_LOCAL_ONLY_TESTS"), + "Requires local cert store for SNI — set MSAL_RUN_LOCAL_ONLY_TESTS=1") + def test_app_and_user_tokens_are_isolated(self): + def assertion_provider(context): + return _acquire_fmi_credential_for_agent(_AGENT_APP_ID) + + agent_app = msal.ConfidentialClientApplication( + _AGENT_APP_ID, + client_credential={"client_assertion": assertion_provider}, + authority=_AUTHORITY, + http_client=MinimalHttpClient(), + ) + + # Acquire app-only token + app_result = agent_app.acquire_token_for_client([_GRAPH_SCOPE]) + self.assertIn("access_token", app_result, + "App token acquisition failed: {}: {}".format( + app_result.get("error"), app_result.get("error_description"))) + + # Acquire user token via user_fic + t2 = _acquire_instance_token_for_agent() + user_result = agent_app.acquire_token_by_user_federated_identity_credential( + [_GRAPH_SCOPE], assertion=t2, username=_USER_UPN) + self.assertIn("access_token", user_result, + "User token acquisition failed: {}: {}".format( + user_result.get("error"), user_result.get("error_description"))) + + # Tokens should be different (app-scoped vs user-scoped) + self.assertNotEqual(app_result["access_token"], user_result["access_token"], + "App token and user token should be different") + + # Verify both are independently retrievable + # App token: second call should return from cache + app_cached = agent_app.acquire_token_for_client([_GRAPH_SCOPE]) + self.assertEqual("cache", app_cached.get("token_source"), + "App token should be returned from cache on second call") + self.assertEqual(app_result["access_token"], app_cached["access_token"]) + + # User token: silent call should return from cache + accounts = agent_app.get_accounts() + self.assertTrue(len(accounts) > 0) + user_cached = agent_app.acquire_token_silent( + [_GRAPH_SCOPE], account=accounts[0]) + self.assertIsNotNone(user_cached) + self.assertEqual(user_result["access_token"], user_cached["access_token"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_application.py b/tests/test_application.py index 54da96c0..8d278b45 100644 --- a/tests/test_application.py +++ b/tests/test_application.py @@ -1,5 +1,6 @@ # Note: Since Aug 2019 we move all e2e tests into test_e2e.py, # so this test_application file contains only unit tests without dependency. +import base64 import json import logging import sys @@ -1090,4 +1091,405 @@ def mock_post(url, headers=None, *args, **kwargs): result = app.acquire_token_for_client([scope]) self.assertEqual(result[app._TOKEN_SOURCE], app._TOKEN_SOURCE_CACHE) self.assertEqual("AT_with_valid_scope1_valid_scope2_scopes", result.get("access_token")) - self.assertIsNone(result.get("scope"), "scope field is not returned when token comes from cache") \ No newline at end of file + self.assertIsNone(result.get("scope"), "scope field is not returned when token comes from cache") + + +def _build_user_fic_response(uid="user_oid", utid="tenant_id", access_token="user_at"): + """Build a mock user_fic response with client_info and id_token.""" + client_info = base64.b64encode(json.dumps({ + "uid": uid, "utid": utid, + }).encode()).decode("utf-8") + id_token_claims = { + "iss": "https://login.microsoftonline.com/tenant_id/v2.0", + "sub": "subject", + "aud": "agent_app_id", + "exp": time.time() + 3600, + "iat": time.time(), + "oid": uid, + "preferred_username": "user@contoso.com", + "tid": utid, + } + id_token = "header.%s.signature" % base64.b64encode( + json.dumps(id_token_claims).encode()).decode("utf-8") + return json.dumps({ + "access_token": access_token, + "expires_in": 3600, + "token_type": "Bearer", + "client_info": client_info, + "id_token": id_token, + "refresh_token": "a_refresh_token", + }) + + +@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) +class TestUserFicProtocol(unittest.TestCase): + """Tests that acquire_token_by_user_federated_identity_credential sends correct POST body.""" + + def _make_app(self): + return ConfidentialClientApplication( + "agent_app_id", client_credential="secret", + authority="https://login.microsoftonline.com/my_tenant") + + def test_sends_correct_grant_type_and_params(self): + app = self._make_app() + captured_data = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + result = app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="instance_token_t2", + username="user@contoso.com", + post=mock_post) + self.assertIn("access_token", result) + self.assertEqual("user_fic", captured_data.get("grant_type")) + self.assertEqual("instance_token_t2", + captured_data.get("user_federated_identity_credential")) + self.assertEqual("1", captured_data.get("client_info")) + self.assertEqual("agent_app_id", captured_data.get("client_id")) + + def test_scope_includes_oidc_scopes(self): + app = self._make_app() + captured_data = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + scope_str = captured_data.get("scope", "") + for oidc_scope in ("openid", "offline_access", "profile"): + self.assertIn(oidc_scope, scope_str, + "OIDC scope '{}' should be present".format(oidc_scope)) + + def test_with_username_sends_username_not_user_id(self): + app = self._make_app() + captured_data = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", username="user@contoso.com", post=mock_post) + self.assertEqual("user@contoso.com", captured_data.get("username")) + self.assertNotIn("user_id", captured_data, + "user_id should NOT be in body when username is provided") + + def test_with_oid_sends_user_id_not_username(self): + app = self._make_app() + captured_data = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", + user_object_id="00000000-0000-0000-0000-000000000001", + post=mock_post) + self.assertEqual("00000000-0000-0000-0000-000000000001", + captured_data.get("user_id")) + self.assertNotIn("username", captured_data, + "username should NOT be in body when user_object_id is provided") + + def test_ccs_routing_header_with_username(self): + app = self._make_app() + captured_headers = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_headers.update(headers or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", username="user@contoso.com", post=mock_post) + self.assertEqual("upn:user@contoso.com", + captured_headers.get("X-AnchorMailbox"), + "CCS routing header should use UPN format for username path") + + def test_ccs_routing_header_with_oid(self): + app = self._make_app() + captured_headers = {} + + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_headers.update(headers or {}) + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", + user_object_id="user_oid_123", post=mock_post) + self.assertIn("X-AnchorMailbox", captured_headers, + "CCS routing header should be present for OID path") + self.assertTrue( + captured_headers["X-AnchorMailbox"].startswith("Oid:"), + "CCS routing header should use Oid format for user_object_id path") + + +@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) +class TestUserFicCacheBehavior(unittest.TestCase): + """Tests that user_fic tokens are stored in user cache with account info.""" + + def _make_app(self): + return ConfidentialClientApplication( + "agent_app_id", client_credential="secret", + authority="https://login.microsoftonline.com/my_tenant") + + def test_token_stored_in_user_cache_with_account(self): + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response( + uid="user_oid", utid="tenant_id", access_token="fic_at")) + + result = app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + self.assertIn("access_token", result) + + # Verify the account was created + accounts = app.get_accounts() + self.assertTrue(len(accounts) > 0, "Account should be created from user_fic response") + account = accounts[0] + self.assertEqual("user_oid.tenant_id", account["home_account_id"]) + + def test_token_not_stored_as_atext(self): + """user_fic tokens should use standard AccessToken type, not atext.""" + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response()) + + app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + + # Check the raw cache for credential type + at_entries = list(app.token_cache.search( + msal.TokenCache.CredentialType.ACCESS_TOKEN, query={})) + self.assertTrue(len(at_entries) > 0, "AT should be cached") + self.assertNotIn("ext_cache_key", at_entries[0], + "user_fic tokens should NOT have ext_cache_key") + + def test_acquire_token_silent_returns_cached_fic_token(self): + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response( + uid="user_oid", utid="tenant_id", access_token="cached_fic_at")) + + app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + + accounts = app.get_accounts() + self.assertTrue(len(accounts) > 0) + + # Silent call should return cached token without hitting network + silent_result = app.acquire_token_silent( + ["https://graph.microsoft.com/.default"], account=accounts[0]) + self.assertIn("access_token", silent_result) + self.assertEqual("cached_fic_at", silent_result["access_token"]) + + def test_oid_path_token_stored_and_retrievable_via_silent(self): + """user_fic with user_object_id should cache and retrieve like username.""" + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response( + uid="user_oid", utid="tenant_id", access_token="oid_fic_at")) + + result = app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", user_object_id="user_oid", post=mock_post) + self.assertIn("access_token", result) + + # Verify no ext_cache_key on cached token + at_entries = list(app.token_cache.search( + msal.TokenCache.CredentialType.ACCESS_TOKEN, query={})) + self.assertTrue(len(at_entries) > 0, "AT should be cached") + self.assertNotIn("ext_cache_key", at_entries[0], + "OID-path user_fic tokens should NOT have ext_cache_key") + + # Verify account and silent retrieval + accounts = app.get_accounts() + self.assertTrue(len(accounts) > 0) + silent_result = app.acquire_token_silent( + ["https://graph.microsoft.com/.default"], account=accounts[0]) + self.assertIn("access_token", silent_result) + self.assertEqual("oid_fic_at", silent_result["access_token"]) + + def test_account_source_is_set_to_user_fic(self): + """Accounts created by user_fic should have account_source set.""" + app = self._make_app() + + def mock_post(url, headers=None, data=None, *args, **kwargs): + return MinimalResponse(status_code=200, text=_build_user_fic_response( + uid="user_oid", utid="tenant_id")) + + app.acquire_token_by_user_federated_identity_credential( + ["https://graph.microsoft.com/.default"], + assertion="t2", username="user@contoso.com", post=mock_post) + + accounts = app.get_accounts() + self.assertTrue(len(accounts) > 0) + self.assertEqual("user_fic", accounts[0].get("account_source"), + "FIC accounts should have account_source='user_fic' to avoid " + "broker path misrouting") + + +@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) +class TestUserFicInputValidation(unittest.TestCase): + """Tests that input validation rejects invalid parameters.""" + + def _make_app(self): + return ConfidentialClientApplication( + "agent_app_id", client_credential="secret", + authority="https://login.microsoftonline.com/my_tenant") + + def test_empty_assertion_raises(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="", username="user@contoso.com") + + def test_none_assertion_raises(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion=None, username="user@contoso.com") + + def test_no_user_identifier_raises(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2") + + def test_both_user_identifiers_raises(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["scope"], assertion="t2", + username="user@contoso.com", + user_object_id="oid-123") + + def test_reserved_scopes_rejected(self): + app = self._make_app() + with self.assertRaises(ValueError): + app.acquire_token_by_user_federated_identity_credential( + ["openid"], assertion="t2", username="user@contoso.com") + + +@patch(_OIDC_DISCOVERY, new=_OIDC_DISCOVERY_MOCK) +class TestAssertionCallbackContext(unittest.TestCase): + """Tests that assertion callbacks receive context when they accept arguments.""" + + def test_context_aware_callback_receives_fmi_path(self): + received_context = {} + + def assertion_with_context(context): + received_context.update(context) + return "assertion_value" + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": assertion_with_context}, + authority="https://login.microsoftonline.com/my_tenant") + + app.acquire_token_for_client( + ["scope"], fmi_path="agent_app_123", + post=lambda url, **kwargs: MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600}))) + + self.assertEqual("client_id", received_context.get("client_id")) + self.assertIn("token_endpoint", received_context) + self.assertEqual("agent_app_123", received_context.get("fmi_path")) + + def test_context_aware_callback_omits_fmi_path_when_not_set(self): + received_context = {} + + def assertion_with_context(context): + received_context.update(context) + return "assertion_value" + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": assertion_with_context}, + authority="https://login.microsoftonline.com/my_tenant") + + app.acquire_token_for_client( + ["scope"], + post=lambda url, **kwargs: MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600}))) + + self.assertEqual("client_id", received_context.get("client_id")) + self.assertNotIn("fmi_path", received_context) + + def test_legacy_zero_arg_callback_still_works(self): + call_count = [0] + + def legacy_callback(): + call_count[0] += 1 + return "legacy_assertion" + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": legacy_callback}, + authority="https://login.microsoftonline.com/my_tenant") + + result = app.acquire_token_for_client( + ["scope"], + post=lambda url, **kwargs: MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600}))) + + self.assertIn("access_token", result) + self.assertEqual(1, call_count[0], "Legacy callback should be invoked once") + + def test_context_callback_type_error_not_swallowed(self): + """If a one-arg callback raises TypeError internally, it should propagate.""" + def buggy_callback(context): + raise TypeError("Bug inside callback") + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": buggy_callback}, + authority="https://login.microsoftonline.com/my_tenant") + + with self.assertRaises(TypeError, msg="Internal TypeError should propagate"): + app.acquire_token_for_client( + ["scope"], + post=lambda url, **kwargs: MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600}))) + + def test_lambda_with_defaulted_param_treated_as_zero_arg(self): + """A lambda like ``lambda token=token: token`` should be treated as + zero-arg because all its positional params have defaults.""" + captured_value = "my_assertion_value" + assertion_callable = lambda token=captured_value: token # noqa: E731 + + app = ConfidentialClientApplication( + "client_id", + client_credential={"client_assertion": assertion_callable}, + authority="https://login.microsoftonline.com/my_tenant") + + captured_data = {} + def mock_post(url, headers=None, data=None, *args, **kwargs): + captured_data.update(data or {}) + return MinimalResponse( + status_code=200, text=json.dumps({ + "access_token": "an_at", "expires_in": 3600})) + + result = app.acquire_token_for_client(["scope"], post=mock_post) + self.assertIn("access_token", result) + # The assertion should be the string value, not a dict context object + self.assertEqual( + captured_value, captured_data.get("client_assertion"), + "Lambda with defaulted params should return its default value, " + "not receive a context dict") \ No newline at end of file