Skip to content

Commit 4b632cb

Browse files
committed
Fix-20713: Add support for metadata ingestion using local file in REST connector (#26036)
1 parent 185bab1 commit 4b632cb

45 files changed

Lines changed: 558 additions & 94 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
-- Migrate REST API service connections: move flat openAPISchemaURL into nested openAPISchemaConnection object
2+
UPDATE api_service_entity
3+
SET json = JSON_SET(
4+
JSON_REMOVE(json, '$.connection.config.openAPISchemaURL'),
5+
'$.connection.config.openAPISchemaConnection',
6+
JSON_OBJECT(
7+
'openAPISchemaURL',
8+
JSON_UNQUOTE(JSON_EXTRACT(json, '$.connection.config.openAPISchemaURL'))
9+
)
10+
)
11+
WHERE serviceType = 'Rest'
12+
AND JSON_CONTAINS_PATH(json, 'one', '$.connection.config.openAPISchemaURL')
13+
AND NOT JSON_CONTAINS_PATH(json, 'one', '$.connection.config.openAPISchemaConnection');

bootstrap/sql/migrations/native/1.11.11/mysql/schemaChanges.sql

Whitespace-only changes.
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
-- Migrate REST API service connections: move flat openAPISchemaURL into nested openAPISchemaConnection object
2+
UPDATE api_service_entity
3+
SET json = jsonb_set(
4+
json #- '{connection,config,openAPISchemaURL}',
5+
'{connection,config,openAPISchemaConnection}',
6+
jsonb_build_object('openAPISchemaURL', json #> '{connection,config,openAPISchemaURL}'),
7+
true
8+
)
9+
WHERE serviceType = 'Rest'
10+
AND jsonb_exists(json -> 'connection' -> 'config', 'openAPISchemaURL')
11+
AND NOT jsonb_exists(json -> 'connection' -> 'config', 'openAPISchemaConnection');

bootstrap/sql/migrations/native/1.11.11/postgres/schemaChanges.sql

Whitespace-only changes.

ingestion/examples/sample_data/api_service/service.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
"serviceConnection": {
55
"config": {
66
"type": "Rest",
7-
"openAPISchemaURL": "https://petstore3.swagger.io/",
7+
"openAPISchemaConnection": {
8+
"openAPISchemaURL": "https://petstore3.swagger.io/"
9+
},
810
"token":"mock_token"
911
}
1012
},

ingestion/examples/sample_data/ometa_api_service/service.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
"serviceConnection": {
55
"config": {
66
"type": "Rest",
7-
"openAPISchemaURL": "https://docs.open-metadata.org/swagger.html",
7+
"openAPISchemaConnection": {
8+
"openAPISchemaURL": "https://docs.open-metadata.org/swagger.html"
9+
},
810
"token":"token"
911
}
1012
},

ingestion/src/metadata/examples/workflows/rest.yaml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ source:
44
serviceConnection:
55
config:
66
type: Rest
7-
openAPISchemaURL: https://petstore3.swagger.io/api/v3/openapi.json
7+
openAPISchemaConnection:
8+
openAPISchemaURL: https://petstore3.swagger.io/api/v3/openapi.json
9+
# Or use a local file path instead:
10+
# openAPISchemaFilePath: /path/to/openapi.json
811
docURL: https://petstore3.swagger.io/
912
# token: <jwt_token>
1013
sourceConfig:

ingestion/src/metadata/ingestion/source/api/rest/connection.py

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,20 @@
1212
"""
1313
Source connection handler
1414
"""
15-
from typing import Optional
15+
from typing import Dict, Optional, Union
1616

1717
import requests
1818
from requests.models import Response
1919

2020
from metadata.generated.schema.entity.automations.workflow import (
2121
Workflow as AutomationWorkflow,
2222
)
23+
from metadata.generated.schema.entity.services.connections.api.openAPISchemaFilePath import (
24+
OpenAPISchemaFilePath,
25+
)
26+
from metadata.generated.schema.entity.services.connections.api.openAPISchemaURL import (
27+
OpenAPISchemaURL,
28+
)
2329
from metadata.generated.schema.entity.services.connections.api.restConnection import (
2430
RestConnection,
2531
)
@@ -31,6 +37,7 @@
3137
from metadata.ingestion.source.api.rest.parser import (
3238
OpenAPIParseError,
3339
parse_openapi_schema,
40+
parse_openapi_schema_from_file,
3441
validate_openapi_schema,
3542
)
3643
from metadata.utils.constants import THREE_MIN
@@ -48,19 +55,28 @@ class InvalidOpenAPISchemaError(Exception):
4855
"""
4956

5057

51-
def get_connection(connection: RestConnection) -> Response:
58+
def get_connection(connection: RestConnection) -> Union[Response, Dict]:
5259
"""
53-
Create connection
60+
Create connection.
61+
If openAPISchemaURL is provided, fetches the schema via HTTP.
62+
Otherwise, reads from the local openAPISchemaFilePath.
5463
"""
55-
if connection.token:
56-
headers = {"Authorization": f"Bearer {connection.token.get_secret_value()}"}
57-
return requests.get(connection.openAPISchemaURL, headers=headers)
58-
return requests.get(connection.openAPISchemaURL)
64+
schema_conn = connection.openAPISchemaConnection
65+
if isinstance(schema_conn, OpenAPISchemaURL):
66+
if connection.token:
67+
headers = {"Authorization": f"Bearer {connection.token.get_secret_value()}"}
68+
return requests.get(schema_conn.openAPISchemaURL, headers=headers)
69+
return requests.get(schema_conn.openAPISchemaURL)
70+
71+
if isinstance(schema_conn, OpenAPISchemaFilePath):
72+
return parse_openapi_schema_from_file(schema_conn.openAPISchemaFilePath)
73+
74+
raise ValueError(f"Unsupported openAPISchemaConnection type: {type(schema_conn)}")
5975

6076

6177
def test_connection(
6278
metadata: OpenMetadata,
63-
client: Response,
79+
client: Union[Response, Dict],
6480
service_connection: RestConnection,
6581
automation_workflow: Optional[AutomationWorkflow] = None,
6682
timeout_seconds: Optional[int] = THREE_MIN,
@@ -69,18 +85,22 @@ def test_connection(
6985
Test connection. This can be executed either as part
7086
of a metadata workflow or during an Automation Workflow
7187
"""
88+
is_local_file = isinstance(client, dict)
7289

7390
def custom_url_exec():
91+
if is_local_file:
92+
return []
7493
if client.status_code == 200:
7594
return []
7695
raise SchemaURLError(
77-
"Failed to connect to the JSON schema url. Please check the url and credentials. Status Code was: "
78-
+ str(client.status_code)
96+
"Failed to connect to the JSON schema url. "
97+
"Please check the url and credentials. "
98+
f"Status Code was: {client.status_code}"
7999
)
80100

81101
def custom_schema_exec():
82102
try:
83-
schema = parse_openapi_schema(client)
103+
schema = client if is_local_file else parse_openapi_schema(client)
84104
if validate_openapi_schema(schema):
85105
return []
86106

@@ -89,6 +109,8 @@ def custom_schema_exec():
89109
)
90110
except OpenAPIParseError as e:
91111
raise InvalidOpenAPISchemaError(f"Failed to parse OpenAPI schema: {e}")
112+
except InvalidOpenAPISchemaError:
113+
raise
92114
except Exception as e:
93115
raise InvalidOpenAPISchemaError(f"Error validating OpenAPI schema: {e}")
94116

ingestion/src/metadata/ingestion/source/api/rest/metadata.py

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
)
2424
from metadata.generated.schema.entity.data.apiCollection import APICollection
2525
from metadata.generated.schema.entity.data.apiEndpoint import ApiRequestMethod
26+
from metadata.generated.schema.entity.services.connections.api.openAPISchemaURL import (
27+
OpenAPISchemaURL,
28+
)
2629
from metadata.generated.schema.entity.services.connections.api.restConnection import (
2730
RestConnection,
2831
)
@@ -80,7 +83,10 @@ def get_api_collections(self, *args, **kwargs) -> Iterable[RESTCollection]:
8083
Here is where filtering happens
8184
"""
8285
try:
83-
self.json_response = parse_openapi_schema(self.connection)
86+
if isinstance(self.connection, dict):
87+
self.json_response = self.connection
88+
else:
89+
self.json_response = parse_openapi_schema(self.connection)
8490
collections_list = []
8591
tags_collection_set = set()
8692
if self.json_response.get("tags", []):
@@ -222,6 +228,13 @@ def _prepare_endpoint_data(
222228
logger.warning(f"Error while parsing endpoint data: {err}")
223229
return None
224230

231+
def _get_fallback_url(self) -> Optional[AnyUrl]:
232+
"""Return openAPISchemaURL if available, otherwise None."""
233+
schema_conn = self.config.serviceConnection.root.config.openAPISchemaConnection
234+
if isinstance(schema_conn, OpenAPISchemaURL):
235+
return schema_conn.openAPISchemaURL
236+
return None
237+
225238
def _generate_collection_url(self, collection_name: str) -> Optional[AnyUrl]:
226239
"""generate collection url"""
227240
try:
@@ -231,7 +244,7 @@ def _generate_collection_url(self, collection_name: str) -> Optional[AnyUrl]:
231244
f"Could not generate collection url for {collection_name}"
232245
" because docURL is not present"
233246
)
234-
return self.config.serviceConnection.root.config.openAPISchemaURL
247+
return self._get_fallback_url()
235248
base_url = str(base_url)
236249
if base_url.endswith("#/") or base_url.endswith("#"):
237250
base_url = base_url.split("#")[0]
@@ -240,11 +253,11 @@ def _generate_collection_url(self, collection_name: str) -> Optional[AnyUrl]:
240253
logger.warning(
241254
f"Error while generating collection url for {collection_name}: {err}"
242255
)
243-
return self.config.serviceConnection.root.config.openAPISchemaURL
256+
return self._get_fallback_url()
244257

245258
def _generate_endpoint_url(
246259
self, collection: RESTCollection, endpoint: RESTEndpoint
247-
) -> AnyUrl:
260+
) -> Optional[AnyUrl]:
248261
"""generate endpoint url"""
249262
try:
250263
if not collection.url or not endpoint.operationId:
@@ -253,11 +266,11 @@ def _generate_endpoint_url(
253266
f" collection url: {str(collection.url)},"
254267
f" endpoint operation id: {str(endpoint.operationId)}"
255268
)
256-
return self.config.serviceConnection.root.config.openAPISchemaURL
269+
return self._get_fallback_url()
257270
return AnyUrl(f"{str(collection.url)}/{endpoint.operationId}")
258271
except Exception as err:
259272
logger.warning(f"Error while generating collection url: {err}")
260-
return self.config.serviceConnection.root.config.openAPISchemaURL
273+
return self._get_fallback_url()
261274

262275
def _get_api_request_method(self, method_type: str) -> Optional[str]:
263276
"""fetch endpoint request method"""

ingestion/src/metadata/ingestion/source/api/rest/parser.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
OpenAPI schema parser for both JSON and YAML formats
1414
"""
1515
import json
16-
from typing import Any, Dict
16+
from pathlib import Path
17+
from typing import Any, Dict, Union
1718

1819
import yaml
1920
from requests.models import Response
@@ -101,3 +102,49 @@ def validate_openapi_schema(schema: Dict[str, Any]) -> bool:
101102

102103
# OpenAPI 3.x uses "openapi" field, OpenAPI 2.x uses "swagger" field
103104
return schema.get("openapi") is not None or schema.get("swagger") is not None
105+
106+
107+
def parse_openapi_schema_from_file(file_path: Union[str, Path]) -> Dict[str, Any]:
108+
"""
109+
Parse OpenAPI schema from a local file.
110+
Supports both JSON and YAML formats.
111+
"""
112+
path = Path(file_path)
113+
if not path.exists():
114+
raise OpenAPIParseError(f"File not found: {file_path}")
115+
if not path.is_file():
116+
raise OpenAPIParseError(f"Path is not a file: {file_path}")
117+
118+
content = path.read_text(encoding="utf-8")
119+
suffix = path.suffix.lower()
120+
121+
if suffix in (".json",):
122+
try:
123+
return json.loads(content)
124+
except json.JSONDecodeError as e:
125+
raise OpenAPIParseError(f"Failed to parse JSON file: {e}") from e
126+
127+
if suffix in (".yaml", ".yml"):
128+
try:
129+
parsed = yaml.safe_load(content)
130+
if parsed is None:
131+
raise OpenAPIParseError("YAML parsing returned None")
132+
return parsed
133+
except yaml.YAMLError as e:
134+
raise OpenAPIParseError(f"Failed to parse YAML file: {e}") from e
135+
136+
# Unknown extension — try JSON first, then YAML
137+
try:
138+
return json.loads(content)
139+
except json.JSONDecodeError:
140+
pass
141+
142+
try:
143+
parsed = yaml.safe_load(content)
144+
if parsed is None:
145+
raise OpenAPIParseError("YAML parsing returned None")
146+
return parsed
147+
except yaml.YAMLError:
148+
pass
149+
150+
raise OpenAPIParseError(f"Failed to parse '{file_path}' as either JSON or YAML.")

0 commit comments

Comments
 (0)