diff --git a/CHANGELOG.md b/CHANGELOG.md index 5b0105f44..0250e6fe0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ All notable changes to this project will be documented in this file. - vector: Look for SBOM in correct location ([#1471]). - vector: Use correct license ([#1476]). - trino: Build a patched Airlift from source and depend on it to backport [airlift/airlift#1943](https://github.com/airlift/airlift/pull/1943), applying the configured max response header size to Jetty's `maxResponseHeaderSize` ([#1510]). +- airflow: Route DAG listings and menu items through OPA in the Airflow 3 OPA auth manager, and wire the OPA cache on the FastAPI api-server init path ([#1512]). ### Removed @@ -42,6 +43,7 @@ All notable changes to this project will be documented in this file. [#1493]: https://github.com/stackabletech/docker-images/pull/1493 [#1509]: https://github.com/stackabletech/docker-images/pull/1509 [#1510]: https://github.com/stackabletech/docker-images/pull/1510 +[#1512]: https://github.com/stackabletech/docker-images/pull/1512 ## [26.3.0] - 2026-03-16 diff --git a/airflow/opa-auth-manager/airflow-3/opa_auth_manager/opa_fab_auth_manager.py b/airflow/opa-auth-manager/airflow-3/opa_auth_manager/opa_fab_auth_manager.py index 07953e060..78e18595e 100644 --- a/airflow/opa-auth-manager/airflow-3/opa_auth_manager/opa_fab_auth_manager.py +++ b/airflow/opa-auth-manager/airflow-3/opa_auth_manager/opa_fab_auth_manager.py @@ -19,13 +19,18 @@ PoolDetails, VariableDetails, ) +from airflow.api_fastapi.common.types import MenuItem from airflow.configuration import conf +from airflow.models.dag import DagModel from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager from airflow.providers.fab.auth_manager.models import User from airflow.stats import Stats from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.session import NEW_SESSION, provide_session from cachetools import TTLCache, cachedmethod from overrides import override +from sqlalchemy import select +from sqlalchemy.orm import Session METRIC_NAME_OPA_CACHE_LIMIT_REACHED = "opa_cache_limit_reached" @@ -75,12 +80,38 @@ class OpaFabAuthManager(FabAuthManager, LoggingMixin): AUTH_OPA_REQUEST_TIMEOUT_DEFAULT = 10 @override - def init_flask_resources(self) -> None: + def init(self) -> None: """ Run operations when Airflow is initializing. + + Called by the FastAPI api-server during startup. + """ + + super().init() + self._init_opa_resources() + + @override + def init_flask_resources(self) -> None: + """ + Run operations when the Flask app (FAB UI) is initializing. """ super().init_flask_resources() + self._init_opa_resources() + + def _init_opa_resources(self) -> None: + """ + Set up the OPA cache and HTTP session. + + Called from both ``init`` (FastAPI api-server) and + ``init_flask_resources`` (Flask AppBuilder). In Airflow 3 both run + during a single api-server startup but on *different* + OpaFabAuthManager instances — ``init_appbuilder`` calls + ``create_auth_manager()`` again, which constructs a fresh instance. + The api-server instance is reachable via + ``request.app.state.auth_manager``; the FAB instance is returned by + ``get_auth_manager()``. Both need their own cache and session. + """ Stats.incr(METRIC_NAME_OPA_CACHE_LIMIT_REACHED, count=0) @@ -121,9 +152,6 @@ def _is_authorized_in_opa(self, endpoint: str, input: OpaInput) -> bool: self.log.debug("Forward authorization request to OPA") - opa_url = conf.get( - "core", "AUTH_OPA_REQUEST_URL", fallback=self.AUTH_OPA_REQUEST_URL_DEFAULT - ) opa_url = conf.get( "core", "AUTH_OPA_REQUEST_URL", fallback=self.AUTH_OPA_REQUEST_URL_DEFAULT ) @@ -551,3 +579,70 @@ def is_authorized_custom_view( } ), ) + + @provide_session + @override + def get_authorized_dag_ids( + self, + *, + user: User, + method: ResourceMethod = "GET", + session: Session = NEW_SESSION, + ) -> set[str]: + # FabAuthManager's implementation consults the user's FAB DB role + # permissions and bypasses is_authorized_dag entirely, which makes any + # user without a FAB role (e.g. the default Public role) see an empty + # DAG list even when OPA would allow them. List all DAG ids and filter + # them via is_authorized_dag → OPA directly, without going through + # filter_authorized_dag_ids — the Fab base class may add a DB-backed + # override of that method in the future. + dag_ids = {dag.dag_id for dag in session.execute(select(DagModel.dag_id))} + return { + dag_id + for dag_id in dag_ids + if self.is_authorized_dag( + method=method, details=DagDetails(id=dag_id), user=user + ) + } + + @override + def filter_authorized_menu_items( + self, menu_items: list[MenuItem], user: User + ) -> list[MenuItem]: + # FabAuthManager filters menu items via the FAB role permissions in the + # DB, which yields an empty menu for users without FAB perms even when + # OPA grants access. Route each menu item through the matching + # OPA-backed is_authorized_* call instead. + return [ + item for item in menu_items if self._is_menu_item_authorized(item, user) + ] + + def _is_menu_item_authorized(self, menu_item: MenuItem, user: User) -> bool: + if menu_item == MenuItem.ASSETS: + return self.is_authorized_asset(method="GET", user=user) + if menu_item == MenuItem.AUDIT_LOG: + return self.is_authorized_dag( + method="GET", access_entity=DagAccessEntity.AUDIT_LOG, user=user + ) + if menu_item == MenuItem.CONFIG: + return self.is_authorized_configuration(method="GET", user=user) + if menu_item == MenuItem.CONNECTIONS: + return self.is_authorized_connection(method="GET", user=user) + if menu_item == MenuItem.DAGS: + return self.is_authorized_dag(method="GET", user=user) + if menu_item == MenuItem.DOCS: + return self.is_authorized_view(access_view=AccessView.DOCS, user=user) + if menu_item == MenuItem.PLUGINS: + return self.is_authorized_view(access_view=AccessView.PLUGINS, user=user) + if menu_item == MenuItem.POOLS: + return self.is_authorized_pool(method="GET", user=user) + if menu_item == MenuItem.PROVIDERS: + return self.is_authorized_view(access_view=AccessView.PROVIDERS, user=user) + if menu_item == MenuItem.VARIABLES: + return self.is_authorized_variable(method="GET", user=user) + if menu_item == MenuItem.XCOMS: + return self.is_authorized_dag( + method="GET", access_entity=DagAccessEntity.XCOM, user=user + ) + self.log.warning("Unknown menu item %s — denying", menu_item) + return False diff --git a/airflow/opa-auth-manager/airflow-3/tests/test_opa_fab_auth_manager.py b/airflow/opa-auth-manager/airflow-3/tests/test_opa_fab_auth_manager.py index 762e2b1be..eae9799f7 100644 --- a/airflow/opa-auth-manager/airflow-3/tests/test_opa_fab_auth_manager.py +++ b/airflow/opa-auth-manager/airflow-3/tests/test_opa_fab_auth_manager.py @@ -6,6 +6,7 @@ # Then we could run these tests against the Airflow instance and use the Airflow API to # actually test the effect of Rego policies on user authorization. # +from types import SimpleNamespace from unittest import mock from unittest.mock import Mock @@ -14,6 +15,7 @@ DagAccessEntity, DagDetails, ) +from airflow.api_fastapi.common.types import MenuItem from airflow.providers.fab.www.extensions.init_appbuilder import init_appbuilder from airflow.providers.fab.www.security.permissions import ( ACTION_CAN_CREATE, @@ -55,7 +57,52 @@ def auth_manager_with_appbuilder(flask_app): return auth_manager +@pytest.fixture +def mock_opa(monkeypatch): + """ + Replace the OPA HTTP boundary (``call_opa``) so tests exercise the real + ``is_authorized_*`` → ``_is_authorized_in_opa`` → ``OpaInput`` chain + without making network calls. + + Set ``mock_opa.decide = lambda endpoint, body: bool`` to drive per-request + decisions. Default returns ``False`` (deny). Recorded ``(endpoint, body)`` + pairs are available as ``mock_opa.calls``. + """ + state = SimpleNamespace( + decide=lambda endpoint, body: False, + calls=[], + ) + + def fake_call_opa(self, url, json, timeout): + endpoint = url.rsplit("/", 1)[-1] + state.calls.append((endpoint, json)) + response = Mock() + response.json.return_value = {"result": state.decide(endpoint, json)} + return response + + monkeypatch.setattr(OpaFabAuthManager, "call_opa", fake_call_opa) + return state + + +def _make_user(name="jane.doe", user_id="1"): + user = Mock() + user.get_id.return_value = user_id + user.get_name.return_value = name + return user + + class TestOpaFabAuthManager: + def test_init_wires_opa_cache_for_fastapi_apiserver(self): + # The FastAPI api-server calls auth_manager.init() instead of + # init_flask_resources(). Without wiring the cache from init() too, + # any is_authorized_* call from a REST handler crashes with + # AttributeError: 'OpaFabAuthManager' object has no attribute 'opa_cache'. + auth_manager = OpaFabAuthManager() + auth_manager.init() + + assert auth_manager.opa_cache is not None + assert auth_manager.opa_session is not None + @pytest.mark.parametrize( "method, dag_access_entity, dag_details, user_permissions, expected_opa_result, expected_result", [ @@ -228,3 +275,155 @@ def test_is_authorized_dag( user=user, ) assert result == expected_result + + def test_get_authorized_dag_ids_uses_opa_not_fab_db( + self, auth_manager_with_appbuilder, mock_opa + ): + # Repro for the OPA listing bug: a user with no FAB permissions + # (e.g. the default Public role) must still see the DAGs that OPA + # allows. The FabAuthManager base override would return set() here + # because it reads roles from the metadata DB. + user = _make_user() + + session = Mock() + session.execute.return_value = [ + Mock(dag_id="allowed_dag"), + Mock(dag_id="denied_dag"), + ] + + mock_opa.decide = lambda endpoint, body: ( + endpoint == "is_authorized_dag" + and body["input"]["details"]["id"] == "allowed_dag" + ) + + result = auth_manager_with_appbuilder.get_authorized_dag_ids( + user=user, method="GET", session=session + ) + + assert result == {"allowed_dag"} + # Every DAG id was offered to OPA — confirms per-item delegation + # rather than a global FAB role lookup. + asked = {body["input"]["details"]["id"] for _, body in mock_opa.calls} + assert asked == {"allowed_dag", "denied_dag"} + + def test_get_authorized_dag_ids_provides_session_when_caller_omits_it( + self, auth_manager_with_appbuilder, mock_opa + ): + # Real callers (api_fastapi/core_api/security.py) don't pass `session`. + # Our override must rely on @provide_session to inject one; previously + # it forwarded the default NEW_SESSION (None) and crashed with + # 'NoneType' has no attribute 'execute'. + user = _make_user() + + session = Mock() + session.execute.return_value = [Mock(dag_id="allowed_dag")] + mock_opa.decide = lambda endpoint, body: True + + with mock.patch("airflow.utils.session.create_session") as mock_create_session: + mock_create_session.return_value.__enter__.return_value = session + result = auth_manager_with_appbuilder.get_authorized_dag_ids( + user=user, method="GET" + ) + + assert result == {"allowed_dag"} + mock_create_session.assert_called_once() + + @pytest.mark.parametrize( + "menu_item, expected_endpoint, expected_input_subset", + [ + (MenuItem.ASSETS, "is_authorized_asset", {"method": "GET"}), + ( + MenuItem.AUDIT_LOG, + "is_authorized_dag", + {"method": "GET", "access_entity": "AUDIT_LOG"}, + ), + (MenuItem.CONFIG, "is_authorized_configuration", {"method": "GET"}), + (MenuItem.CONNECTIONS, "is_authorized_connection", {"method": "GET"}), + ( + MenuItem.DAGS, + "is_authorized_dag", + {"method": "GET", "access_entity": None}, + ), + (MenuItem.DOCS, "is_authorized_view", {"access_view": "DOCS"}), + (MenuItem.PLUGINS, "is_authorized_view", {"access_view": "PLUGINS"}), + (MenuItem.POOLS, "is_authorized_pool", {"method": "GET"}), + (MenuItem.PROVIDERS, "is_authorized_view", {"access_view": "PROVIDERS"}), + (MenuItem.VARIABLES, "is_authorized_variable", {"method": "GET"}), + ( + MenuItem.XCOMS, + "is_authorized_dag", + {"method": "GET", "access_entity": "XCOM"}, + ), + ], + ) + def test_filter_authorized_menu_items_routes_through_opa( + self, + menu_item, + expected_endpoint, + expected_input_subset, + auth_manager_with_appbuilder, + mock_opa, + ): + # Each MenuItem must trigger a request to the matching OPA endpoint + # with the expected input, so menu visibility is OPA-driven rather + # than FAB-DB-driven, and the Rego wire contract is documented. + user = _make_user() + + mock_opa.decide = lambda endpoint, body: True + allowed = auth_manager_with_appbuilder.filter_authorized_menu_items( + [menu_item], user=user + ) + assert allowed == [menu_item] + + assert len(mock_opa.calls) == 1 + endpoint, body = mock_opa.calls[0] + assert endpoint == expected_endpoint + assert expected_input_subset.items() <= body["input"].items() + + # Deny path: a fresh OPA decision actually filters the item out, + # proving the dispatch consults OPA rather than always allowing. + auth_manager_with_appbuilder.opa_cache.clear() + mock_opa.decide = lambda endpoint, body: False + denied = auth_manager_with_appbuilder.filter_authorized_menu_items( + [menu_item], user=user + ) + assert denied == [] + + def test_filter_authorized_menu_items_denies_unknown( + self, auth_manager_with_appbuilder, mock_opa + ): + # A MenuItem value not handled by _is_menu_item_authorized (e.g. one + # introduced in a future Airflow version) must fail closed: denied + # without consulting OPA, so a new UI surface isn't silently exposed + # before the dispatch table is updated. + unknown = Mock(spec=MenuItem) + unknown.name = "FUTURE_THING" + + result = auth_manager_with_appbuilder.filter_authorized_menu_items( + [unknown], user=_make_user() + ) + + assert result == [] + assert mock_opa.calls == [] + + def test_filter_authorized_menu_items_preserves_order_and_filters( + self, auth_manager_with_appbuilder, mock_opa + ): + user = _make_user() + + def decide(endpoint, body): + if endpoint == "is_authorized_dag": + # Allow DAGs root menu, deny the XCOM access entity. + return body["input"].get("access_entity") is None + if endpoint == "is_authorized_connection": + return True + return False + + mock_opa.decide = decide + + result = auth_manager_with_appbuilder.filter_authorized_menu_items( + [MenuItem.DAGS, MenuItem.DOCS, MenuItem.CONNECTIONS, MenuItem.XCOMS], + user=user, + ) + + assert result == [MenuItem.DAGS, MenuItem.CONNECTIONS]