Skip to content

Commit 57280af

Browse files
harshachgithub-actions[bot]keshavmohta09ulixius9
committed
Airflow 3.x API based connector (#26624)
* Add Airflow Connector with API integration * Add Airflow Connector with API integration * Update generated TypeScript types * Add Airflow Connector with API integration improvements * fix: username password flow for airflow 3, example yaml file, & sidebar docs * fix type in UI * Fix integration tests, fixed UI rendering and docs, improved OpenLineageResolver * Fix pytests * move connector * Update generated TypeScript types * fix: response parsing for astronomer airflow * feat: added service account auth for airflow rest connection when composer managed airflow along with token * fix: airflow rest api connection class converter and airflow.md * feat: add mwaa config support for authentication * s3 & column lineage * Update generated TypeScript types * fix: test airflow mwaa client * fix: removed unused method, and extra code for parsing response * fix: git pr checks * fix: removed airflowapi integration tests that requires real host instance and added test with mocking * fix test * improve test coverage * push coverage * fix: gitar comments * fix: removed redundant files --------- Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com> Co-authored-by: Keshav Mohta <68001229+keshavmohta09@users.noreply.github.com> Co-authored-by: Keshav Mohta <keshavmohta09@gmail.com> Co-authored-by: ulixius9 <mayursingal9@gmail.com>
1 parent efd7824 commit 57280af

59 files changed

Lines changed: 9747 additions & 774 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docker/development/docker-compose.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,9 @@ services:
512512
AIRFLOW__CORE__EXECUTOR: LocalExecutor
513513
AIRFLOW__LOGGING__LOGGING_LEVEL: ${AIRFLOW_LOGGING_LEVEL:-DEBUG}
514514
AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_GENERATED_CONFIGS: "/opt/airflow/dag_generated_configs"
515+
# OpenLineage transport config (optional - enable for lineage via OL)
516+
# AIRFLOW__OPENLINEAGE__TRANSPORT: '{"type": "http", "url": "http://openmetadata-server:8585/api/v1/openlineage/", "endpoint": "lineage", "auth": {"type": "api_key", "api_key": "<OM_JWT_TOKEN>"}}'
517+
# AIRFLOW__OPENLINEAGE__NAMESPACE: local_airflow
515518
DB_HOST: ${AIRFLOW_DB_HOST:-mysql}
516519
DB_PORT: ${AIRFLOW_DB_PORT:-3306}
517520
AIRFLOW_DB: ${AIRFLOW_DB:-airflow_db}

ingestion/src/metadata/clients/aws_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ class AWSServices(Enum):
4343
REDSHIFT = "redshift"
4444
REDSHIFT_SERVERLESS = "redshift-serverless"
4545
LAKE_FORMATION = "lakeformation"
46+
MWAA = "mwaa"
4647

4748

4849
class AWSAssumeRoleException(Exception):
@@ -253,3 +254,6 @@ def get_redshift_client(self):
253254

254255
def get_redshift_serverless_client(self):
255256
return self.get_client(AWSServices.REDSHIFT_SERVERLESS.value)
257+
258+
def get_mwaa_client(self):
259+
return self.get_client(AWSServices.MWAA.value)

ingestion/src/metadata/ingestion/source/pipeline/airflow/api/__init__.py

Whitespace-only changes.
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# Copyright 2025 Collate
2+
# Licensed under the Collate Community License, Version 1.0 (the "License");
3+
# you may not use this file except in compliance with the License.
4+
# You may obtain a copy of the License at
5+
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
6+
# Unless required by applicable law or agreed to in writing, software
7+
# distributed under the License is distributed on an "AS IS" BASIS,
8+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
9+
# See the License for the specific language governing permissions and
10+
# limitations under the License.
11+
"""
12+
Auth helper functions for the Airflow REST API client.
13+
"""
14+
import base64
15+
import traceback
16+
from datetime import datetime, timedelta, timezone
17+
from typing import Callable, Optional, Tuple
18+
19+
import requests
20+
21+
from metadata.utils.credentials import (
22+
get_gcp_impersonate_credentials,
23+
set_google_credentials,
24+
)
25+
from metadata.utils.logger import ingestion_logger
26+
27+
logger = ingestion_logger()
28+
29+
TokenCallback = Callable[[], Tuple[str, object]]
30+
31+
_JWT_REFRESH_INTERVAL_SECONDS = (
32+
25 * 60
33+
) # re-fetch every 25 min, well within Airflow's ~30-60 min TTL
34+
_BASIC_AUTH_TTL_SECONDS = (
35+
7 * 24 * 3600
36+
) # basic auth doesn't expire; skip retry for 7 days
37+
38+
39+
def try_exchange_jwt(
40+
host: str, username: str, password: str, verify: bool
41+
) -> Optional[str]:
42+
"""POST {host}/auth/token to get a JWT Bearer token (Airflow 3.x). Returns None on failure."""
43+
try:
44+
resp = requests.post(
45+
f"{host}/auth/token",
46+
json={"username": username, "password": password},
47+
timeout=10,
48+
verify=verify,
49+
)
50+
resp.raise_for_status()
51+
return resp.json().get("access_token")
52+
except Exception:
53+
logger.debug(
54+
"JWT token exchange failed (likely Airflow 2.x): %s", traceback.format_exc()
55+
)
56+
return None
57+
58+
59+
def build_access_token_callback(token: str) -> TokenCallback:
60+
"""Returns a static token callback with no expiry."""
61+
return lambda: (token, 0)
62+
63+
64+
def build_basic_auth_callback(
65+
host: str, username: str, password: str, verify: bool
66+
) -> Tuple[TokenCallback, None]:
67+
"""
68+
Returns (callback, None). auth_token_mode=None means client.py uses the
69+
token value as-is; the callback embeds 'Bearer' or 'Basic' prefix itself.
70+
71+
On every refresh cycle the callback re-calls try_exchange_jwt so the JWT
72+
is always freshly issued — no stale-token 401s for long-running ingestions.
73+
Falls back to Basic auth for Airflow 2.x servers.
74+
"""
75+
76+
def _callback() -> Tuple[str, object]:
77+
jwt = try_exchange_jwt(host, username, password, verify)
78+
if jwt:
79+
return f"Bearer {jwt}", _JWT_REFRESH_INTERVAL_SECONDS
80+
b64 = base64.b64encode(f"{username}:{password}".encode()).decode()
81+
return f"Basic {b64}", _BASIC_AUTH_TTL_SECONDS
82+
83+
return _callback, None
84+
85+
86+
def build_gcp_token_callback(gcp_credentials) -> TokenCallback:
87+
"""
88+
Returns a token callback that fetches and auto-refreshes GCP OAuth2 tokens.
89+
90+
Supports all 4 GCP credential types via set_google_credentials():
91+
- GcpCredentialsValues: service account JSON values (clientEmail, privateKey, etc.)
92+
- GcpCredentialsPath: path to a credentials JSON file
93+
- GcpExternalAccount: workload identity federation
94+
- GcpADC: application default credentials
95+
96+
Also handles optional service account impersonation via gcpImpersonateServiceAccount.
97+
"""
98+
set_google_credentials(gcp_credentials)
99+
impersonate = gcp_credentials.gcpImpersonateServiceAccount
100+
101+
def _callback() -> Tuple[str, datetime]:
102+
import google.auth
103+
from google.auth.transport.requests import Request as AuthRequest
104+
105+
if impersonate and impersonate.impersonateServiceAccount:
106+
credentials = get_gcp_impersonate_credentials(
107+
impersonate_service_account=impersonate.impersonateServiceAccount,
108+
scopes=["https://www.googleapis.com/auth/cloud-platform"],
109+
lifetime=impersonate.lifetime,
110+
)
111+
else:
112+
credentials, _ = google.auth.default(
113+
scopes=["https://www.googleapis.com/auth/cloud-platform"]
114+
)
115+
116+
credentials.refresh(AuthRequest())
117+
expiry = getattr(credentials, "expiry", None) or (
118+
datetime.now(timezone.utc) + timedelta(minutes=55)
119+
)
120+
return (credentials.token, expiry)
121+
122+
return _callback

0 commit comments

Comments
 (0)