Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ All notable changes to this project will be documented in this file.
- vector: Look for SBOM in correct location ([#1471]).
- vector: Use correct license ([#1476]).
- trino: Build a patched Airlift from source and depend on it to backport [airlift/airlift#1943](https://github.com/airlift/airlift/pull/1943), applying the configured max response header size to Jetty's `maxResponseHeaderSize` ([#1510]).
- airflow: Route DAG listings and menu items through OPA in the Airflow 3 OPA auth manager, and wire the OPA cache on the FastAPI api-server init path ([#1512]).

### Removed

Expand All @@ -42,6 +43,7 @@ All notable changes to this project will be documented in this file.
[#1493]: https://github.com/stackabletech/docker-images/pull/1493
[#1509]: https://github.com/stackabletech/docker-images/pull/1509
[#1510]: https://github.com/stackabletech/docker-images/pull/1510
[#1512]: https://github.com/stackabletech/docker-images/pull/1512

## [26.3.0] - 2026-03-16

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,18 @@
PoolDetails,
VariableDetails,
)
from airflow.api_fastapi.common.types import MenuItem
from airflow.configuration import conf
from airflow.models.dag import DagModel
from airflow.providers.fab.auth_manager.fab_auth_manager import FabAuthManager
from airflow.providers.fab.auth_manager.models import User
from airflow.stats import Stats
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.session import NEW_SESSION, provide_session
from cachetools import TTLCache, cachedmethod
from overrides import override
from sqlalchemy import select
from sqlalchemy.orm import Session

METRIC_NAME_OPA_CACHE_LIMIT_REACHED = "opa_cache_limit_reached"

Expand Down Expand Up @@ -75,12 +80,38 @@ class OpaFabAuthManager(FabAuthManager, LoggingMixin):
AUTH_OPA_REQUEST_TIMEOUT_DEFAULT = 10

@override
def init_flask_resources(self) -> None:
def init(self) -> None:
    """
    Initialize the auth manager for the FastAPI api-server.

    The api-server calls this hook during startup. The parent
    initialization runs first; afterwards the OPA resources are set up
    through ``_init_opa_resources``.
    """
    super().init()
    self._init_opa_resources()

@override
def init_flask_resources(self) -> None:
    """
    Initialize the auth manager for the Flask app (FAB UI).

    The parent initialization runs first; afterwards the OPA resources are
    set up through ``_init_opa_resources``.
    """
    super().init_flask_resources()
    self._init_opa_resources()

def _init_opa_resources(self) -> None:
"""
Set up the OPA cache and HTTP session.

Called from both ``init`` (FastAPI api-server) and
``init_flask_resources`` (Flask AppBuilder). In Airflow 3 both run
during a single api-server startup but on *different*
OpaFabAuthManager instances — ``init_appbuilder`` calls
``create_auth_manager()`` again, which constructs a fresh instance.
The api-server instance is reachable via
``request.app.state.auth_manager``; the FAB instance is returned by
``get_auth_manager()``. Both need their own cache and session.
"""

Stats.incr(METRIC_NAME_OPA_CACHE_LIMIT_REACHED, count=0)

Expand Down Expand Up @@ -121,9 +152,6 @@ def _is_authorized_in_opa(self, endpoint: str, input: OpaInput) -> bool:

self.log.debug("Forward authorization request to OPA")

opa_url = conf.get(
"core", "AUTH_OPA_REQUEST_URL", fallback=self.AUTH_OPA_REQUEST_URL_DEFAULT
)
opa_url = conf.get(
"core", "AUTH_OPA_REQUEST_URL", fallback=self.AUTH_OPA_REQUEST_URL_DEFAULT
)
Expand Down Expand Up @@ -551,3 +579,70 @@ def is_authorized_custom_view(
}
),
)

@provide_session
@override
def get_authorized_dag_ids(
    self,
    *,
    user: User,
    method: ResourceMethod = "GET",
    session: Session = NEW_SESSION,
) -> set[str]:
    """
    Return the ids of the DAGs that ``is_authorized_dag`` allows for the user.

    All DAG ids are read via ``DagModel`` and each one is checked
    individually with ``is_authorized_dag``.

    :param user: the user whose access is evaluated
    :param method: the method to authorize, ``"GET"`` by default
    :param session: database session used to list the DAG ids
    :return: the DAG ids for which ``is_authorized_dag`` returned a truthy value
    """
    # Why this override exists: the FabAuthManager implementation looks at
    # the user's FAB DB role permissions and never calls is_authorized_dag,
    # so a user without a FAB role (e.g. the default Public role) gets an
    # empty DAG list even when OPA would allow access.
    #
    # filter_authorized_dag_ids is deliberately not used here: the Fab base
    # class may add a DB-backed override of that method in the future.
    rows = session.execute(select(DagModel.dag_id))
    candidate_ids = {row.dag_id for row in rows}

    authorized_ids: set[str] = set()
    for candidate_id in candidate_ids:
        details = DagDetails(id=candidate_id)
        if self.is_authorized_dag(method=method, details=details, user=user):
            authorized_ids.add(candidate_id)
    return authorized_ids

@override
def filter_authorized_menu_items(
    self, menu_items: list[MenuItem], user: User
) -> list[MenuItem]:
    """
    Return the menu items the user may see, in their original order.

    Each item is checked with ``_is_menu_item_authorized``.

    :param menu_items: the menu items to filter
    :param user: the user whose access is evaluated
    :return: the items for which ``_is_menu_item_authorized`` returned a truthy value
    """
    # The FabAuthManager filters menu items by the FAB role permissions in
    # the DB. That produces an empty menu for users without FAB permissions
    # even when OPA grants access, hence the per-item OPA-backed check.
    visible_items: list[MenuItem] = []
    for entry in menu_items:
        if self._is_menu_item_authorized(entry, user):
            visible_items.append(entry)
    return visible_items

def _is_menu_item_authorized(self, menu_item: MenuItem, user: User) -> bool:
    """
    Decide whether a single menu item is visible to the user.

    The menu item is mapped to the matching ``is_authorized_*`` call. A menu
    item without a mapping is logged with a warning and denied.

    :param menu_item: the menu item to check
    :param user: the user whose access is evaluated
    :return: the result of the matching ``is_authorized_*`` call, or
        ``False`` for an unknown menu item
    """
    # Ordered (menu item, check) pairs. The checks are wrapped in lambdas so
    # that only the check of the matching menu item is executed.
    checks = (
        (
            MenuItem.ASSETS,
            lambda: self.is_authorized_asset(method="GET", user=user),
        ),
        (
            MenuItem.AUDIT_LOG,
            lambda: self.is_authorized_dag(
                method="GET", access_entity=DagAccessEntity.AUDIT_LOG, user=user
            ),
        ),
        (
            MenuItem.CONFIG,
            lambda: self.is_authorized_configuration(method="GET", user=user),
        ),
        (
            MenuItem.CONNECTIONS,
            lambda: self.is_authorized_connection(method="GET", user=user),
        ),
        (
            MenuItem.DAGS,
            lambda: self.is_authorized_dag(method="GET", user=user),
        ),
        (
            MenuItem.DOCS,
            lambda: self.is_authorized_view(access_view=AccessView.DOCS, user=user),
        ),
        (
            MenuItem.PLUGINS,
            lambda: self.is_authorized_view(
                access_view=AccessView.PLUGINS, user=user
            ),
        ),
        (
            MenuItem.POOLS,
            lambda: self.is_authorized_pool(method="GET", user=user),
        ),
        (
            MenuItem.PROVIDERS,
            lambda: self.is_authorized_view(
                access_view=AccessView.PROVIDERS, user=user
            ),
        ),
        (
            MenuItem.VARIABLES,
            lambda: self.is_authorized_variable(method="GET", user=user),
        ),
        (
            MenuItem.XCOMS,
            lambda: self.is_authorized_dag(
                method="GET", access_entity=DagAccessEntity.XCOM, user=user
            ),
        ),
    )

    for known_item, check in checks:
        if menu_item == known_item:
            return check()

    # Fail closed: nothing in the table matched.
    self.log.warning("Unknown menu item %s — denying", menu_item)
    return False
199 changes: 199 additions & 0 deletions airflow/opa-auth-manager/airflow-3/tests/test_opa_fab_auth_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# Then we could run these tests against the Airflow instance and use the Airflow API to
# actually test the effect of Rego policies on user authorization.
#
from types import SimpleNamespace
from unittest import mock
from unittest.mock import Mock

Expand All @@ -14,6 +15,7 @@
DagAccessEntity,
DagDetails,
)
from airflow.api_fastapi.common.types import MenuItem
from airflow.providers.fab.www.extensions.init_appbuilder import init_appbuilder
from airflow.providers.fab.www.security.permissions import (
ACTION_CAN_CREATE,
Expand Down Expand Up @@ -55,7 +57,52 @@ def auth_manager_with_appbuilder(flask_app):
return auth_manager


@pytest.fixture
def mock_opa(monkeypatch):
    """
    Stub out ``OpaFabAuthManager.call_opa``, the HTTP boundary to OPA.

    With the stub in place the tests run the real ``is_authorized_*`` →
    ``_is_authorized_in_opa`` → ``OpaInput`` chain but never touch the
    network.

    The returned object has two attributes:

    * ``decide``: callable ``(endpoint, body) -> bool`` producing the OPA
      decision for a request. Assign e.g.
      ``mock_opa.decide = lambda endpoint, body: True`` to change it. The
      default denies everything.
    * ``calls``: list of the recorded ``(endpoint, body)`` pairs.
    """

    def deny_all(endpoint, body):
        return False

    recorder = SimpleNamespace(decide=deny_all, calls=[])

    def fake_call_opa(self, url, json, timeout):
        # The endpoint name is the last path segment of the request URL.
        endpoint = url.rpartition("/")[2]
        recorder.calls.append((endpoint, json))
        fake_response = Mock()
        # ``decide`` is looked up on every call so that tests can swap it
        # after the fixture was set up.
        fake_response.json.return_value = {"result": recorder.decide(endpoint, json)}
        return fake_response

    monkeypatch.setattr(OpaFabAuthManager, "call_opa", fake_call_opa)
    return recorder


def _make_user(name="jane.doe", user_id="1"):
user = Mock()
user.get_id.return_value = user_id
user.get_name.return_value = name
return user


class TestOpaFabAuthManager:
def test_init_wires_opa_cache_for_fastapi_apiserver(self):
    """``init()`` must set up the OPA cache and the OPA session."""
    # The FastAPI api-server calls auth_manager.init() and not
    # init_flask_resources(). If init() did not wire the cache as well,
    # every is_authorized_* call from a REST handler would crash with
    # AttributeError: 'OpaFabAuthManager' object has no attribute 'opa_cache'.
    manager = OpaFabAuthManager()
    manager.init()

    for attribute in ("opa_cache", "opa_session"):
        assert getattr(manager, attribute) is not None

@pytest.mark.parametrize(
"method, dag_access_entity, dag_details, user_permissions, expected_opa_result, expected_result",
[
Expand Down Expand Up @@ -228,3 +275,155 @@ def test_is_authorized_dag(
user=user,
)
assert result == expected_result

def test_get_authorized_dag_ids_uses_opa_not_fab_db(
    self, auth_manager_with_appbuilder, mock_opa
):
    """The DAG listing must follow the OPA decisions, not the FAB roles."""
    # Repro for the OPA listing bug: a user without any FAB permissions
    # (e.g. the default Public role) must still see the DAGs allowed by
    # OPA. The FabAuthManager base override would return set() here
    # because it reads the roles from the metadata DB.
    known_dag_ids = ("allowed_dag", "denied_dag")

    fake_session = Mock()
    fake_session.execute.return_value = [
        Mock(dag_id=dag_id) for dag_id in known_dag_ids
    ]

    def allow_only_allowed_dag(endpoint, body):
        if endpoint != "is_authorized_dag":
            return False
        return body["input"]["details"]["id"] == "allowed_dag"

    mock_opa.decide = allow_only_allowed_dag

    authorized = auth_manager_with_appbuilder.get_authorized_dag_ids(
        user=_make_user(), method="GET", session=fake_session
    )

    assert authorized == {"allowed_dag"}
    # OPA was asked about every DAG id. This confirms the per-item
    # delegation as opposed to a global FAB role lookup.
    requested_ids = set()
    for _, body in mock_opa.calls:
        requested_ids.add(body["input"]["details"]["id"])
    assert requested_ids == set(known_dag_ids)

def test_get_authorized_dag_ids_provides_session_when_caller_omits_it(
    self, auth_manager_with_appbuilder, mock_opa
):
    """A session must be injected when the caller does not pass one."""
    # Real callers (api_fastapi/core_api/security.py) do not pass `session`.
    # The override has to rely on @provide_session to inject one. An
    # earlier version forwarded the default NEW_SESSION (None) and crashed
    # with 'NoneType' has no attribute 'execute'.
    def allow_all(endpoint, body):
        return True

    mock_opa.decide = allow_all

    injected_session = Mock()
    injected_session.execute.return_value = [Mock(dag_id="allowed_dag")]

    with mock.patch("airflow.utils.session.create_session") as create_session_mock:
        context_manager = create_session_mock.return_value
        context_manager.__enter__.return_value = injected_session
        authorized = auth_manager_with_appbuilder.get_authorized_dag_ids(
            user=_make_user(), method="GET"
        )

    assert authorized == {"allowed_dag"}
    create_session_mock.assert_called_once()

@pytest.mark.parametrize(
    "menu_item, expected_endpoint, expected_input_subset",
    [
        (MenuItem.ASSETS, "is_authorized_asset", {"method": "GET"}),
        (
            MenuItem.AUDIT_LOG,
            "is_authorized_dag",
            {"method": "GET", "access_entity": "AUDIT_LOG"},
        ),
        (MenuItem.CONFIG, "is_authorized_configuration", {"method": "GET"}),
        (MenuItem.CONNECTIONS, "is_authorized_connection", {"method": "GET"}),
        (
            MenuItem.DAGS,
            "is_authorized_dag",
            {"method": "GET", "access_entity": None},
        ),
        (MenuItem.DOCS, "is_authorized_view", {"access_view": "DOCS"}),
        (MenuItem.PLUGINS, "is_authorized_view", {"access_view": "PLUGINS"}),
        (MenuItem.POOLS, "is_authorized_pool", {"method": "GET"}),
        (MenuItem.PROVIDERS, "is_authorized_view", {"access_view": "PROVIDERS"}),
        (MenuItem.VARIABLES, "is_authorized_variable", {"method": "GET"}),
        (
            MenuItem.XCOMS,
            "is_authorized_dag",
            {"method": "GET", "access_entity": "XCOM"},
        ),
    ],
)
def test_filter_authorized_menu_items_routes_through_opa(
    self,
    menu_item,
    expected_endpoint,
    expected_input_subset,
    auth_manager_with_appbuilder,
    mock_opa,
):
    """Every menu item must be decided by the matching OPA endpoint."""
    # Menu visibility has to be OPA-driven instead of FAB-DB-driven. The
    # expected endpoint and input per MenuItem also document the wire
    # contract towards the Rego rules.
    manager = auth_manager_with_appbuilder
    user = _make_user()

    def allow_all(endpoint, body):
        return True

    def deny_all(endpoint, body):
        return False

    # Allow path: exactly one request, sent to the expected endpoint with
    # the expected input.
    mock_opa.decide = allow_all
    assert manager.filter_authorized_menu_items([menu_item], user=user) == [
        menu_item
    ]

    assert len(mock_opa.calls) == 1
    endpoint, body = mock_opa.calls[0]
    assert endpoint == expected_endpoint
    opa_input = body["input"]
    for key, expected_value in expected_input_subset.items():
        assert key in opa_input
        assert opa_input[key] == expected_value

    # Deny path: the cache is cleared to force a fresh OPA decision. The
    # item must be filtered out now, which proves that the dispatch
    # consults OPA and does not simply allow everything.
    manager.opa_cache.clear()
    mock_opa.decide = deny_all
    assert manager.filter_authorized_menu_items([menu_item], user=user) == []

def test_filter_authorized_menu_items_denies_unknown(
    self, auth_manager_with_appbuilder, mock_opa
):
    """A menu item unknown to the dispatch must be denied without asking OPA."""
    # A MenuItem value that _is_menu_item_authorized does not handle (e.g.
    # one added by a future Airflow version) has to fail closed. Otherwise
    # a new UI surface would be exposed silently until the dispatch table
    # is updated.
    future_item = Mock(spec=MenuItem)
    future_item.name = "FUTURE_THING"
    user = _make_user()

    visible = auth_manager_with_appbuilder.filter_authorized_menu_items(
        [future_item], user=user
    )

    assert visible == []
    assert mock_opa.calls == []

def test_filter_authorized_menu_items_preserves_order_and_filters(
    self, auth_manager_with_appbuilder, mock_opa
):
    """Allowed menu items keep their relative order, denied ones are dropped."""

    def allow_dag_root_and_connections(endpoint, body):
        if endpoint == "is_authorized_connection":
            return True
        # Allow the DAGs root menu, deny the XCOM access entity.
        return (
            endpoint == "is_authorized_dag"
            and body["input"].get("access_entity") is None
        )

    mock_opa.decide = allow_dag_root_and_connections

    requested_items = [
        MenuItem.DAGS,
        MenuItem.DOCS,
        MenuItem.CONNECTIONS,
        MenuItem.XCOMS,
    ]
    visible = auth_manager_with_appbuilder.filter_authorized_menu_items(
        requested_items,
        user=_make_user(),
    )

    assert visible == [MenuItem.DAGS, MenuItem.CONNECTIONS]
Loading