issue-20546: REST connector enhancements (#20634)

This commit is contained in:
harshsoni2024 2025-04-07 10:22:45 +05:30 committed by GitHub
parent b1dc6e0b0b
commit 7953f98097
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 218 additions and 35 deletions

View File

@ -1,14 +1,18 @@
source:
type: rest
serviceName: openapi_rest
serviceName: petstore_data
serviceConnection:
config:
type: Rest
openAPISchemaURL: https://docs.open-metadata.org/swagger.json
openAPISchemaURL: https://petstore3.swagger.io/api/v3/openapi.json
docURL: https://petstore3.swagger.io/
# token: <jwt_token>
sourceConfig:
config:
type: ApiMetadata
# apiCollectionFilterPattern:
# includes:
# - ^pet
sink:
type: metadata-rest
config: {}

View File

@ -30,6 +30,8 @@ from metadata.ingestion.connections.test_connections import test_connection_step
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.constants import THREE_MIN
# Response content types accepted by the schema URL check in test_connection:
# plain JSON and the OpenAPI-specific JSON media type.
ACCEPTED_CONTENT_TYPES = ["application/json", "application/vnd.oai.openapi+json"]
class SchemaURLError(Exception):
"""
@ -66,9 +68,9 @@ def test_connection(
"""
def custom_url_exec():
if (
"application/json" in client.headers.get("content-type")
and client.status_code == 200
if client.status_code == 200 and any(
content_type in client.headers.get("content-type")
for content_type in ACCEPTED_CONTENT_TYPES
):
return []
raise SchemaURLError(

View File

@ -41,6 +41,8 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.api.api_service import ApiServiceSource
from metadata.ingestion.source.api.rest.models import RESTCollection, RESTEndpoint
from metadata.utils import fqn
from metadata.utils.filters import filter_by_collection
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -76,21 +78,33 @@ class RestSource(ApiServiceSource):
"""
try:
self.json_response = self.connection.json()
collections_list = []
tags_collection_set = set()
if self.json_response.get("tags", []):
# Works only if list of tags are present in schema so we can fetch collection names
for collection in self.json_response.get("tags", []):
if not collection.get("name"):
continue
yield RESTCollection(**collection)
else:
# in other case collect tags from paths because we have to yield collection/tags first
collections_set = set()
for path, methods in self.json_response.get("paths", {}).items():
for method_type, info in methods.items():
collections_set.update({tag for tag in info.get("tags", [])})
for collection_name in collections_set:
data = {"name": collection_name}
yield RESTCollection(**data)
collections_list.append(collection)
tags_collection_set.update({collection.get("name")})
# iterate through paths if there's any missing collection not present in tags
collections_set = set()
for path, methods in self.json_response.get("paths", {}).items():
for method_type, info in methods.items():
collections_set.update({tag for tag in info.get("tags", [])})
for collection_name in collections_set:
if collection_name not in tags_collection_set:
collections_list.append({"name": collection_name})
for collection in collections_list:
if filter_by_collection(
self.source_config.apiCollectionFilterPattern,
collection.get("name"),
):
self.status.filter(
collection.get("name"), "Collection filtered out"
)
continue
yield RESTCollection(**collection)
except Exception as err:
logger.error(f"Error while fetching collections from schema URL :{err}")
@ -126,7 +140,9 @@ class RestSource(ApiServiceSource):
for path, methods in filtered_endpoints.items():
for method_type, info in methods.items():
try:
endpoint = self._prepare_endpoint_data(path, method_type, info)
endpoint = self._prepare_endpoint_data(
path, method_type, info, collection
)
if not endpoint:
continue
yield Either(
@ -176,12 +192,14 @@ class RestSource(ApiServiceSource):
)
return None
def _prepare_endpoint_data(self, path, method_type, info) -> Optional[RESTEndpoint]:
def _prepare_endpoint_data(
self, path, method_type, info, collection
) -> Optional[RESTEndpoint]:
try:
endpoint = RESTEndpoint(**info)
endpoint.url = self._generate_endpoint_url(endpoint.name)
if not endpoint.name:
endpoint.name = f"{path} - {method_type}"
endpoint.name = f"{path}/{method_type}"
endpoint.display_name = f"{path}"
endpoint.url = self._generate_endpoint_url(collection, endpoint)
return endpoint
except Exception as err:
logger.warning(f"Error while parsing endpoint data: {err}")
@ -190,20 +208,39 @@ class RestSource(ApiServiceSource):
def _generate_collection_url(self, collection_name: str) -> Optional[AnyUrl]:
"""generate collection url"""
try:
if collection_name:
return AnyUrl(
f"{self.config.serviceConnection.root.config.openAPISchemaURL}#tag/{collection_name}"
base_url = self.config.serviceConnection.root.config.docURL
if not base_url:
logger.debug(
f"Could not generate collection url for {collection_name}"
" because docURL is not present"
)
return self.config.serviceConnection.root.config.openAPISchemaURL
base_url = str(base_url)
if base_url.endswith("#/") or base_url.endswith("#"):
base_url = base_url.split("#")[0]
return AnyUrl(f"{clean_uri(base_url)}/#/{collection_name}")
except Exception as err:
logger.warning(
f"Error while generating collection url for {collection_name}: {err}"
)
return self.config.serviceConnection.root.config.openAPISchemaURL
def _generate_endpoint_url(
self, collection: RESTCollection, endpoint: RESTEndpoint
) -> AnyUrl:
"""generate endpoint url"""
try:
if not collection.url or not endpoint.operationId:
logger.debug(
f"Could not generate endpoint url for {str(endpoint.name)},"
f" collection url: {str(collection.url)},"
f" endpoint operation id: {str(endpoint.operationId)}"
)
return self.config.serviceConnection.root.config.openAPISchemaURL
return AnyUrl(f"{str(collection.url)}/{endpoint.operationId}")
except Exception as err:
logger.warning(f"Error while generating collection url: {err}")
return None
def _generate_endpoint_url(self, endpoint_name: str) -> AnyUrl:
"""generate endpoint url"""
base_url = self.config.serviceConnection.root.config.openAPISchemaURL
if endpoint_name:
return AnyUrl(f"{base_url}#operation/{endpoint_name}")
return AnyUrl(base_url)
return self.config.serviceConnection.root.config.openAPISchemaURL
def _get_api_request_method(self, method_type: str) -> Optional[str]:
"""fetch endpoint request method"""

View File

@ -13,7 +13,7 @@ OpenAPI REST API Models
"""
from typing import Optional
from pydantic import AnyUrl, BaseModel, Field
from pydantic import AnyUrl, BaseModel
from metadata.generated.schema.entity.data.apiEndpoint import ApiRequestMethod
from metadata.generated.schema.type import basic
@ -32,10 +32,11 @@ class RESTCollection(BaseModel):
class RESTEndpoint(BaseModel):
"""REST endpoint model"""
name: Optional[str] = Field(None, alias="operationId")
name: Optional[str] = None
display_name: Optional[str] = None
description: Optional[basic.Markdown] = None
url: Optional[AnyUrl] = None
operationId: Optional[str] = None
request_method: Optional[ApiRequestMethod] = None
request_schema: Optional[APISchema] = None
response_schema: Optional[APISchema] = None

View File

@ -295,3 +295,18 @@ def filter_by_classification(
:return: True for filtering, False otherwise
"""
return _filter(classification_pattern, classification_name)
def filter_by_collection(
    collection_pattern: Optional[FilterPattern], collection_name: str
) -> bool:
    """
    Decide whether an API collection has to be filtered out.

    The decision is delegated to the shared `_filter` helper, which receives
    the pattern and the collection name unchanged.

    :param collection_pattern: include/exclude filter pattern for collections, may be None
    :param collection_name: name of the collection being checked
    :return: True if the collection must be filtered out, False otherwise
    """
    should_filter = _filter(collection_pattern, collection_name)
    return should_filter

View File

@ -12,9 +12,11 @@
Test REST/OpenAPI.
"""
from copy import deepcopy
from unittest import TestCase
from unittest.mock import patch
from pydantic import AnyUrl
from pydantic_core import Url
from metadata.generated.schema.api.data.createAPICollection import (
@ -35,7 +37,7 @@ from metadata.generated.schema.type.basic import (
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.source.api.rest.metadata import RestSource
from metadata.ingestion.source.api.rest.models import RESTCollection
from metadata.ingestion.source.api.rest.models import RESTCollection, RESTEndpoint
mock_rest_config = {
"source": {
@ -45,6 +47,7 @@ mock_rest_config = {
"config": {
"type": "Rest",
"openAPISchemaURL": "https://petstore3.swagger.io/api/v3/openapi.json",
"docURL": "https://petstore3.swagger.io/",
}
},
"sourceConfig": {
@ -87,6 +90,20 @@ MOCK_COLLECTIONS = [
url=None,
),
]
# Fixture: the "store" collection with its documentation URL already set;
# passed as the collection argument in the endpoint URL test.
MOCK_SINGLE_COLLECTION = RESTCollection(
name=EntityName(root="store"),
display_name=None,
description=Markdown(root="Access to Petstore orders"),
url=Url("https://petstore3.swagger.io/#/store"),
)
# Fixture: an endpoint with no url yet and operationId "placeOrder";
# passed as the endpoint argument in the endpoint URL test.
MOCK_SINGLE_ENDPOINT = RESTEndpoint(
name="/store/order/post",
display_name="/store/order",
description=Markdown(root="Place a new order in the store."),
url=None,
operationId="placeOrder",
)
MOCK_API_SERVICE = ApiService(
id="c3eb265f-5445-4ad3-ba5e-797d3a3071bb",
name="openapi_rest",
@ -99,11 +116,31 @@ EXPECTED_COLLECTION_REQUEST = [
right=CreateAPICollectionRequest(
name=EntityName(root="pet"),
description=Markdown(root="Everything about your Pets"),
endpointURL=Url("https://petstore3.swagger.io/api/v3/openapi.json#tag/pet"),
endpointURL=Url("https://petstore3.swagger.io/#/pet"),
service=FullyQualifiedEntityName(root="openapi_rest"),
)
)
]
# Expected URLs for the "store" collection and for its "placeOrder" endpoint.
MOCK_STORE_URL = AnyUrl("https://petstore3.swagger.io/#/store")
MOCK_STORE_ORDER_URL = AnyUrl("https://petstore3.swagger.io/#/store/placeOrder")
# Minimal schema payload used to patch the connection's json() call:
# "pet" and "store" are declared under "tags", while "user" only appears
# as a tag on a path operation.
MOCK_JSON_RESPONSE = {
"paths": {
"/user/login": {
"get": {
"tags": ["user"],
"summary": "Logs user into the system",
"operationId": "loginUser",
}
}
},
"tags": [
{
"name": "pet",
"description": "Everything about your Pets",
},
{"name": "store", "description": "Access to Petstore orders"},
],
}
class RESTTest(TestCase):
@ -136,3 +173,79 @@ class RESTTest(TestCase):
"""test json schema"""
schema_content_type = self.rest_source.connection.headers.get("content-type")
assert "application/json" in schema_content_type
def test_all_collections(self):
    """test collections gathered from the mocked schema response"""
    # Expected result: the shared mock collections, with the description
    # of the third entry cleared.
    expected = deepcopy(MOCK_COLLECTIONS)
    expected[2].description = None

    connection = self.rest_source.connection
    with patch.object(connection, "json", return_value=MOCK_JSON_RESPONSE):
        fetched = list(self.rest_source.get_api_collections())

    assert fetched == expected
def test_generate_collection_url(self):
    """test the url generated for the "store" collection"""
    assert self.rest_source._generate_collection_url("store") == MOCK_STORE_URL
def test_generate_endpoint_url(self):
    """test the url generated for an endpoint of the "store" collection"""
    build_url = self.rest_source._generate_endpoint_url
    generated = build_url(MOCK_SINGLE_COLLECTION, MOCK_SINGLE_ENDPOINT)
    assert generated == MOCK_STORE_ORDER_URL
@patch("metadata.ingestion.source.api.api_service.ApiServiceSource.test_connection")
def test_collection_filter_pattern(self, test_connection):
    """test which collections are yielded for different filter patterns"""
    test_connection.return_value = False

    def collections_for(filter_pattern):
        """Create a source from a fresh config copy carrying the given
        apiCollectionFilterPattern and list the collections it yields."""
        workflow_config = deepcopy(mock_rest_config)
        source_settings = workflow_config["source"]["sourceConfig"]["config"]
        source_settings["apiCollectionFilterPattern"] = filter_pattern
        source = RestSource.create(
            workflow_config["source"],
            self.config.workflowConfig.openMetadataServerConfig,
        )
        return list(source.get_api_collections())

    # Include pattern only
    included = collections_for({"includes": ["pet.*"]})
    assert len(included) == 1
    assert included[0].name.root == "pet"

    # Exclude pattern only
    without_store = collections_for({"excludes": ["store.*"]})
    assert len(without_store) == 2
    assert all(col.name.root != "store" for col in without_store)

    # Include and exclude patterns together
    combined = collections_for(
        {"includes": ["pet.*", "user.*"], "excludes": ["user.*"]}
    )
    assert len(combined) == 1
    assert combined[0].name.root == "pet"

    # Include pattern that is expected to match no collection
    unmatched = collections_for({"includes": ["invalid.*"]})
    assert len(unmatched) == 0

View File

@ -33,6 +33,13 @@
"type": "string",
"format": "password"
},
"docURL": {
"expose": true,
"title": "docURL",
"description": "Documentation URL for the schema.",
"type": "string",
"format": "uri"
},
"apiCollectionFilterPattern": {
"description": "Regex to only fetch api collections with names matching the pattern.",
"$ref": "../../../../type/filterPattern.json#/definitions/filterPattern",

View File

@ -18,6 +18,10 @@ export interface RESTConnection {
* Regex to only fetch api collections with names matching the pattern.
*/
apiCollectionFilterPattern?: FilterPattern;
/**
* Documentation URL for the schema.
*/
docURL?: string;
/**
* Open API Schema URL.
*/