diff --git a/ingestion/src/metadata/examples/workflows/rest.yaml b/ingestion/src/metadata/examples/workflows/rest.yaml new file mode 100644 index 00000000000..d155eaea5db --- /dev/null +++ b/ingestion/src/metadata/examples/workflows/rest.yaml @@ -0,0 +1,20 @@ +source: + type: rest + serviceName: openapi_rest + serviceConnection: + config: + type: REST + openAPISchemaURL: https://docs.open-metadata.org/swagger.json + token: + sourceConfig: + config: + type: ApiMetadata +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" diff --git a/ingestion/src/metadata/ingestion/api/parser.py b/ingestion/src/metadata/ingestion/api/parser.py index 639c26492bf..84aa898844c 100644 --- a/ingestion/src/metadata/ingestion/api/parser.py +++ b/ingestion/src/metadata/ingestion/api/parser.py @@ -18,6 +18,10 @@ from pydantic import BaseModel, ValidationError from metadata.generated.schema.entity.automations.workflow import ( Workflow as AutomationWorkflow, ) +from metadata.generated.schema.entity.services.apiService import ( + ApiServiceConnection, + APIServiceType, +) from metadata.generated.schema.entity.services.dashboardService import ( DashboardConnection, DashboardServiceType, @@ -53,6 +57,10 @@ from metadata.generated.schema.entity.services.storageService import ( StorageConnection, 
StorageServiceType, ) +from metadata.generated.schema.metadataIngestion.apiServiceMetadataPipeline import ( + ApiMetadataConfigType, + ApiServiceMetadataPipeline, +) from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import ( DashboardMetadataConfigType, DashboardServiceMetadataPipeline, @@ -127,6 +135,7 @@ HAS_INNER_CONNECTION = {"Airflow"} # Build a service type map dynamically from JSON Schema covered types SERVICE_TYPE_MAP = { "Backend": PipelineConnection, # For Airflow backend + **{service: ApiServiceConnection for service in APIServiceType.__members__}, **{service: DatabaseConnection for service in DatabaseServiceType.__members__}, **{service: DashboardConnection for service in DashboardServiceType.__members__}, **{service: MessagingConnection for service in MessagingServiceType.__members__}, @@ -138,6 +147,7 @@ SERVICE_TYPE_MAP = { } SOURCE_CONFIG_CLASS_MAP = { + ApiMetadataConfigType.ApiMetadata.value: ApiServiceMetadataPipeline, DashboardMetadataConfigType.DashboardMetadata.value: DashboardServiceMetadataPipeline, ProfilerConfigType.Profiler.value: DatabaseServiceProfilerPipeline, DatabaseUsageConfigType.DatabaseUsage.value: DatabaseServiceQueryUsagePipeline, @@ -173,6 +183,7 @@ class InvalidWorkflowException(Exception): def get_service_type( source_type: str, ) -> Union[ + Type[ApiServiceConnection], Type[DashboardConnection], Type[DatabaseConnection], Type[MessagingConnection], @@ -196,6 +207,7 @@ def get_service_type( def get_source_config_class( source_config_type: str, ) -> Union[ + Type[ApiServiceMetadataPipeline], Type[DashboardServiceMetadataPipeline], Type[DatabaseServiceProfilerPipeline], Type[DatabaseServiceQueryUsagePipeline], @@ -221,6 +233,7 @@ def get_source_config_class( def get_connection_class( source_type: str, service_type: Union[ + Type[ApiServiceConnection], Type[DashboardConnection], Type[DatabaseConnection], Type[MessagingConnection], diff --git 
a/ingestion/src/metadata/ingestion/models/custom_types.py b/ingestion/src/metadata/ingestion/models/custom_types.py index 6de0ee04e15..11da3c1e123 100644 --- a/ingestion/src/metadata/ingestion/models/custom_types.py +++ b/ingestion/src/metadata/ingestion/models/custom_types.py @@ -14,6 +14,7 @@ Custom defined types from typing import NewType, Union +from metadata.generated.schema.entity.services.apiService import ApiService from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService @@ -25,6 +26,7 @@ from metadata.generated.schema.entity.services.pipelineService import PipelineSe ServiceWithConnectionType = NewType( "ServiceWithConnectionType", Union[ + ApiService, DashboardService, DatabaseService, MessagingService, diff --git a/ingestion/src/metadata/ingestion/source/api/api_service.py b/ingestion/src/metadata/ingestion/source/api/api_service.py new file mode 100644 index 00000000000..38bddd26082 --- /dev/null +++ b/ingestion/src/metadata/ingestion/source/api/api_service.py @@ -0,0 +1,206 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""
Base class for ingesting api services
"""
from abc import ABC, abstractmethod
from typing import Any, Iterable, Set

from pydantic import Field
from typing_extensions import Annotated

from metadata.generated.schema.api.data.createAPICollection import (
    CreateAPICollectionRequest,
)
from metadata.generated.schema.api.data.createAPIEndpoint import (
    CreateAPIEndpointRequest,
)
from metadata.generated.schema.entity.data.apiCollection import APICollection
from metadata.generated.schema.entity.data.apiEndpoint import APIEndpoint
from metadata.generated.schema.entity.services.apiService import (
    ApiService,
    ApiServiceConnection,
)
from metadata.generated.schema.metadataIngestion.apiServiceMetadataPipeline import (
    ApiServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.delete import delete_entity_from_source
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import Source
from metadata.ingestion.api.topology_runner import TopologyRunnerMixin
from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.topology import (
    NodeStage,
    ServiceTopology,
    TopologyContextManager,
    TopologyNode,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.connections import get_connection, get_test_connection_fn
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class ApiServiceTopology(ServiceTopology):
    """
    Defines the hierarchy in API Services.
    service -> ApiCollection -> ApiEndpoint

    We could have a topology validator. We can only consume
    data that has been produced by any parent node.
    """

    # Root node: produces the service itself and, once all children are
    # processed, triggers the soft-deletion of stale collections.
    root: Annotated[
        TopologyNode, Field(description="Root node for the topology")
    ] = TopologyNode(
        producer="get_services",
        stages=[
            NodeStage(
                type_=ApiService,
                context="api_service",
                processor="yield_create_request_api_service",
                overwrite=False,
                must_return=True,
                cache_entities=True,
            ),
        ],
        children=["api_collection"],
        post_process=["mark_api_collections_as_deleted"],
    )
    # Collection node: each produced collection is passed through both stages,
    # first to create the collection and then to create its endpoints.
    api_collection: Annotated[
        TopologyNode, Field(description="API Collection Processing Node")
    ] = TopologyNode(
        producer="get_api_collections",
        stages=[
            NodeStage(
                type_=APICollection,
                context="api_collections",
                processor="yield_api_collection",
                consumer=["api_service"],
                use_cache=True,
            ),
            NodeStage(
                type_=APIEndpoint,
                context="api_endpoints",
                processor="yield_api_endpoint",
                consumer=["api_service"],
                use_cache=True,
            ),
        ],
    )


class ApiServiceSource(TopologyRunnerMixin, Source, ABC):
    """
    Base class for API services.
    It implements the topology and context
    """

    source_config: ApiServiceMetadataPipeline
    config: WorkflowSource
    # Big union of types we want to fetch dynamically
    service_connection: ApiServiceConnection.model_fields["config"].annotation

    topology = ApiServiceTopology()
    context = TopologyContextManager(topology)
    # Class-level defaults are kept for backward compatibility; every instance
    # gets its own fresh sets in __init__ so state is not shared between sources.
    api_collection_source_state: Set = set()
    api_endpoint_source_state: Set = set()

    def __init__(
        self,
        config: WorkflowSource,
        metadata: OpenMetadata,
    ):
        """
        Store the workflow configuration, open the service connection and
        run the connection test.

        Args:
            config: workflow source configuration for this service
            metadata: OpenMetadata client used to talk to the server
        """
        super().__init__()
        # A mutable class attribute would be shared by all instances (and
        # subclasses), leaking scanned FQNs from one run into the next.
        self.api_collection_source_state = set()
        self.api_endpoint_source_state = set()

        self.config = config
        self.metadata = metadata
        self.service_connection = self.config.serviceConnection.root.config
        self.source_config: ApiServiceMetadataPipeline = self.config.sourceConfig.config
        self.connection = get_connection(self.service_connection)

        # Flag the connection for the test connection
        self.connection_obj = self.connection
        self.test_connection()

        self.client = self.connection

    @property
    def name(self) -> str:
        """Name of the configured service connection type"""
        return self.service_connection.type.name

    def get_services(self) -> Iterable[WorkflowSource]:
        """Producer of the root node: yields the workflow source config"""
        yield self.config

    def yield_create_request_api_service(self, config: WorkflowSource):
        """Yield the create request of the API service built from the config"""
        yield Either(
            right=self.metadata.get_create_service_from_source(
                entity=ApiService, config=config
            )
        )

    @abstractmethod
    def get_api_collections(self, *args, **kwargs) -> Iterable[Any]:
        """
        Method to list all collections to process.
        Here is where filtering happens
        """

    @abstractmethod
    def yield_api_collection(
        self, *args, **kwargs
    ) -> Iterable[Either[CreateAPICollectionRequest]]:
        """Method to return api collection Entities"""

    @abstractmethod
    def yield_api_endpoint(
        self, *args, **kwargs
    ) -> Iterable[Either[CreateAPIEndpointRequest]]:
        """Method to return api endpoint Entities"""

    def close(self):
        """By default, nothing to close"""

    def test_connection(self) -> None:
        """Resolve and run the test-connection function of the service"""
        test_connection_fn = get_test_connection_fn(self.service_connection)
        test_connection_fn(self.metadata, self.connection_obj, self.service_connection)

    def mark_api_collections_as_deleted(self) -> Iterable[Either[DeleteEntity]]:
        """Method to mark the api collection as deleted"""
        if self.source_config.markDeletedApiCollections:
            yield from delete_entity_from_source(
                metadata=self.metadata,
                entity_type=APICollection,
                entity_source_state=self.api_collection_source_state,
                mark_deleted_entity=self.source_config.markDeletedApiCollections,
                params={"service": self.context.get().api_service},
            )

    def register_record(self, collection_request: CreateAPICollectionRequest) -> None:
        """
        Mark the api collection record as scanned and update
        the api_collection_source_state
        """
        api_collection_fqn = fqn.build(
            self.metadata,
            entity_type=APICollection,
            service_name=collection_request.service.root,
            api_collection_name=collection_request.name.root,
        )

        self.api_collection_source_state.add(api_collection_fqn)

    def prepare(self):
        """By default, nothing to prepare"""
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Source connection handler
"""
from typing import Optional

import requests
from requests.models import Response

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.apiService.restConnection import (
    RESTConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata

# Seconds to wait for the schema server before giving up. Without a timeout
# `requests` may block forever on an unresponsive host.
SCHEMA_REQUEST_TIMEOUT_SECONDS = 60


class SchemaURLError(Exception):
    """
    Class to indicate schema url is invalid
    """


class InvalidOpenAPISchemaError(Exception):
    """
    Class to indicate openapi schema is invalid
    """


def get_connection(connection: RESTConnection) -> Response:
    """
    Create connection

    Fetches the OpenAPI schema document with a single GET request. When a
    token is configured it is sent as a Bearer token.

    Args:
        connection: REST service connection configuration

    Returns:
        The HTTP response of the schema URL. The status is not checked here;
        that is done by `test_connection`.
    """
    headers = None
    if connection.token:
        headers = {"Authorization": f"Bearer {connection.token.get_secret_value()}"}
    return requests.get(
        connection.openAPISchemaURL,
        headers=headers,
        timeout=SCHEMA_REQUEST_TIMEOUT_SECONDS,
    )


def test_connection(
    metadata: OpenMetadata,
    client: Response,
    service_connection: RESTConnection,
    automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    Test connection. This can be executed either as part
    of a metadata workflow or during an Automation Workflow

    Args:
        metadata: OpenMetadata client
        client: response returned by `get_connection`
        service_connection: REST service connection configuration
        automation_workflow: optional automation workflow being executed
    """

    def custom_url_exec():
        """Check that the URL answered 200 with a JSON content type"""
        # The header may be missing altogether: treat it as an empty string
        # so we raise SchemaURLError instead of a TypeError.
        content_type = client.headers.get("content-type") or ""
        if client.status_code == 200 and "application/json" in content_type:
            return []
        raise SchemaURLError(
            "Failed to parse JSON schema url. Please check if provided url is valid JSON schema."
        )

    def custom_schema_exec():
        """Check that the payload is a JSON object declaring `openapi`"""
        try:
            schema = client.json()
        except ValueError as err:
            # requests' JSONDecodeError is a ValueError subclass
            raise InvalidOpenAPISchemaError(
                "Provided schema is not valid OpenAPI JSON schema"
            ) from err
        if isinstance(schema, dict) and schema.get("openapi") is not None:
            return []
        raise InvalidOpenAPISchemaError(
            "Provided schema is not valid OpenAPI JSON schema"
        )

    test_fn = {"CheckURL": custom_url_exec, "CheckSchema": custom_schema_exec}

    test_connection_steps(
        metadata=metadata,
        test_fn=test_fn,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
    )
"""REST source module"""

import traceback
from typing import Iterable, List, Optional

from pydantic import AnyUrl

from metadata.generated.schema.api.data.createAPICollection import (
    CreateAPICollectionRequest,
)
from metadata.generated.schema.api.data.createAPIEndpoint import (
    CreateAPIEndpointRequest,
)
from metadata.generated.schema.entity.data.apiCollection import APICollection
from metadata.generated.schema.entity.data.apiEndpoint import ApiRequestMethod
from metadata.generated.schema.entity.services.connections.apiService.restConnection import (
    RESTConnection,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    StackTraceError,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.type.apiSchema import APISchema
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.schema import DataTypeTopic, FieldModel
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.api.api_service import ApiServiceSource
from metadata.ingestion.source.api.rest.models import RESTCollection, RESTEndpoint
from metadata.utils import fqn
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class RestSource(ApiServiceSource):
    """
    Source implementation to ingest REST data.

    We will iterate on the registered collections and endpoints
    and prepare an iterator of create requests for them.
    """

    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
        super().__init__(config, metadata)
        # Parsed OpenAPI document; filled in by `get_api_collections`.
        # Defined up-front so helpers never hit a missing attribute.
        self.json_response: dict = {}

    @classmethod
    def create(
        cls, config_dict, metadata: OpenMetadata, pipeline_name: Optional[str] = None
    ):
        """
        Validate the raw workflow config and build the source.

        Raises:
            InvalidSourceException: if the service connection is not a
                RESTConnection
        """
        config: WorkflowSource = WorkflowSource.model_validate(config_dict)
        connection: RESTConnection = config.serviceConnection.root.config
        if not isinstance(connection, RESTConnection):
            raise InvalidSourceException(
                f"Expected RESTConnection, but got {connection}"
            )
        return cls(config, metadata)

    @staticmethod
    def _iter_operations(methods) -> Iterable[tuple]:
        """
        Yield the (method, operation) pairs of a Path Item.

        A Path Item may also hold non-operation entries (e.g. `parameters`,
        `summary`, `servers`) whose values are lists or strings; those are
        skipped so callers can safely use `dict` methods on the operation.
        """
        if not isinstance(methods, dict):
            return
        for method_type, info in methods.items():
            if isinstance(info, dict):
                yield method_type, info

    def _collect_tag_names_from_paths(self) -> List[str]:
        """Unique tag names referenced by any operation, in document order"""
        tag_names = {}
        for methods in self.json_response.get("paths", {}).values():
            for _, info in self._iter_operations(methods):
                # dict keys de-duplicate while keeping a deterministic order
                tag_names.update(dict.fromkeys(info.get("tags") or []))
        return list(tag_names)

    def get_api_collections(self, *args, **kwargs) -> Iterable[RESTCollection]:
        """
        Method to list all collections to process.
        Here is where filtering happens
        """
        try:
            self.json_response = self.connection.json()
            declared_tags = self.json_response.get("tags", [])
            if declared_tags:
                # Works only if list of tags are present in schema so we can fetch collection names
                for collection in declared_tags:
                    if not collection.get("name"):
                        continue
                    yield RESTCollection(**collection)
            else:
                # in other case collect tags from paths because we have to yield collection/tags first
                for collection_name in self._collect_tag_names_from_paths():
                    yield RESTCollection(name=collection_name)
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.error(f"Error while fetching collections from schema URL :{err}")

    def yield_api_collection(
        self, collection: RESTCollection
    ) -> Iterable[Either[CreateAPICollectionRequest]]:
        """Method to return api collection Entities"""
        try:
            collection.url = self._generate_collection_url(collection.name.root)
            collection_request = CreateAPICollectionRequest(
                name=collection.name,
                displayName=collection.display_name,
                description=collection.description,
                service=FullyQualifiedEntityName(self.context.get().api_service),
                endpointURL=collection.url,
            )
            yield Either(right=collection_request)
            self.register_record(collection_request=collection_request)
        except Exception as exc:
            yield Either(
                left=StackTraceError(
                    name=collection.name.root,
                    error=f"Error creating api collection request: {exc}",
                    stackTrace=traceback.format_exc(),
                )
            )

    def yield_api_endpoint(
        self, collection: RESTCollection
    ) -> Iterable[Either[CreateAPIEndpointRequest]]:
        """Method to return api endpoint Entities"""
        filtered_endpoints = self._filter_collection_endpoints(collection) or {}
        for path, methods in filtered_endpoints.items():
            for method_type, info in self._iter_operations(methods):
                endpoint = None
                try:
                    endpoint = self._prepare_endpoint_data(path, method_type, info)
                    if not endpoint:
                        continue
                    yield Either(
                        right=CreateAPIEndpointRequest(
                            name=endpoint.name,
                            displayName=endpoint.display_name,
                            description=endpoint.description,
                            endpointURL=endpoint.url,
                            requestMethod=self._get_api_request_method(method_type),
                            requestSchema=self._get_request_schema(info),
                            responseSchema=self._get_response_schema(info),
                            apiCollection=FullyQualifiedEntityName(
                                fqn.build(
                                    self.metadata,
                                    entity_type=APICollection,
                                    service_name=self.context.get().api_service,
                                    api_collection_name=collection.name.root,
                                )
                            ),
                        )
                    )
                except Exception as exc:  # pylint: disable=broad-except
                    yield Either(
                        left=StackTraceError(
                            # fall back to the generated name if the endpoint
                            # could not be built at all
                            name=endpoint.name
                            if endpoint
                            else f"{path} - {method_type}",
                            error=f"Error creating API Endpoint request [{info.get('operationId')}]: {exc}",
                            stackTrace=traceback.format_exc(),
                        )
                    )

    def _filter_collection_endpoints(
        self, collection: RESTCollection
    ) -> Optional[dict]:
        """
        filter endpoints related to specific collection

        A path is kept, with all of its methods, as soon as one of its
        operations is tagged with the collection name.
        """
        try:
            collection_name = collection.name.root
            filtered_paths = {}
            for path, methods in self.json_response.get("paths", {}).items():
                if any(
                    collection_name in (info.get("tags") or [])
                    for _, info in self._iter_operations(methods)
                ):
                    # path & methods are part of collection
                    filtered_paths[path] = methods
            return filtered_paths
        except Exception:
            logger.debug(traceback.format_exc())
            logger.info(
                f"Error while filtering endpoints for collection {collection.name.root}"
            )
        return None

    def _prepare_endpoint_data(self, path, method_type, info) -> Optional[RESTEndpoint]:
        """
        Build the endpoint model out of an operation object.

        The URL is derived from the `operationId`; operations without one
        are named `"<path> - <method>"`. Returns None if parsing fails.
        """
        try:
            endpoint = RESTEndpoint(**info)
            endpoint.url = self._generate_endpoint_url(endpoint.name)
            if not endpoint.name:
                endpoint.name = f"{path} - {method_type}"
            return endpoint
        except Exception as err:
            logger.info(f"Error while parsing endpoint data: {err}")
        return None

    def _generate_collection_url(self, collection_name: str) -> Optional[AnyUrl]:
        """generate collection url"""
        try:
            if collection_name:
                return AnyUrl(
                    f"{self.config.serviceConnection.root.config.openAPISchemaURL}#tag/{collection_name}"
                )
        except Exception as err:
            logger.info(f"Error while generating collection url: {err}")
        return None

    def _generate_endpoint_url(self, endpoint_name: str) -> AnyUrl:
        """generate endpoint url"""
        base_url = self.config.serviceConnection.root.config.openAPISchemaURL
        if endpoint_name:
            return AnyUrl(f"{base_url}#operation/{endpoint_name}")
        return AnyUrl(base_url)

    def _get_api_request_method(self, method_type: str) -> Optional[ApiRequestMethod]:
        """fetch endpoint request method"""
        try:
            return ApiRequestMethod[method_type.upper()]
        except KeyError as err:
            logger.info(f"Keyerror while fetching request method: {err}")
        return None

    def _get_request_schema(self, info: dict) -> Optional[APISchema]:
        """fetch request schema"""
        try:
            schema_ref = (
                info.get("requestBody", {})
                .get("content", {})
                .get("application/json", {})
                .get("schema", {})
                .get("$ref")
            )
            if not schema_ref:
                logger.debug("No request schema found for the endpoint")
                return None
            return self._process_schema(schema_ref)
        except Exception as err:
            logger.info(f"Error while parsing request schema: {err}")
        return None

    def _get_response_schema(self, info: dict) -> Optional[APISchema]:
        """fetch response schema (only the `200` JSON response is considered)"""
        try:
            schema_ref = (
                info.get("responses", {})
                .get("200", {})
                .get("content", {})
                .get("application/json", {})
                .get("schema", {})
                .get("$ref")
            )
            if not schema_ref:
                logger.debug("No response schema found for the endpoint")
                return None
            return self._process_schema(schema_ref)
        except Exception as err:
            logger.info(f"Error while parsing response schema: {err}")
        return None

    def _process_schema(self, schema_ref: str) -> Optional[APISchema]:
        """
        process schema

        Resolves a `#/components/schemas/<name>` reference and maps its
        top-level typed properties to schema fields. Properties without a
        `type` are skipped; unknown types are mapped to UNKNOWN.
        """
        try:
            schema_name = schema_ref.split("/")[-1]
            schema_fields = (
                self.json_response.get("components", {})
                .get("schemas", {})
                .get(schema_name)
            )
            if schema_fields is None:
                logger.debug(f"Schema [{schema_name}] not found in components")
                return None

            fetched_fields: List[FieldModel] = []
            for key, val in schema_fields.get("properties", {}).items():
                dtype = val.get("type")
                if not dtype:
                    continue
                fetched_fields.append(
                    FieldModel(
                        name=key,
                        dataType=DataTypeTopic[dtype.upper()]
                        if dtype.upper() in DataTypeTopic.__members__
                        else DataTypeTopic.UNKNOWN,
                    )
                )
            return APISchema(schemaFields=fetched_fields)
        except Exception as err:
            logger.info(f"Error while processing request schema: {err}")
        return None
# See the License for the specific language governing permissions and
# limitations under the License.
"""
OpenAPI REST API Models
"""
from typing import Optional

from pydantic import AnyUrl, BaseModel, Field

from metadata.generated.schema.entity.data.apiEndpoint import ApiRequestMethod
from metadata.generated.schema.type import basic
from metadata.generated.schema.type.apiSchema import APISchema


class RESTCollection(BaseModel):
    """REST collection model, built from an OpenAPI tag"""

    # Only the name is mandatory; everything else defaults to None
    name: basic.EntityName
    display_name: Optional[str] = Field(default=None)
    description: Optional[basic.Markdown] = Field(default=None)
    url: Optional[AnyUrl] = Field(default=None)


class RESTEndpoint(BaseModel):
    """REST endpoint model, built from an OpenAPI operation"""

    # Populated from the `operationId` key of the operation object
    name: Optional[str] = Field(default=None, alias="operationId")
    display_name: Optional[str] = Field(default=None)
    description: Optional[basic.Markdown] = Field(default=None)
    url: Optional[AnyUrl] = Field(default=None)
    request_method: Optional[ApiRequestMethod] = Field(default=None)
    request_schema: Optional[APISchema] = Field(default=None)
    response_schema: Optional[APISchema] = Field(default=None)
"dashboardService", ServiceType.Pipeline.value: "pipelineService", @@ -76,6 +80,7 @@ SERVICE_TYPE_REF = { } SOURCE_CONFIG_TYPE_INGESTION = { + ApiServiceMetadataPipeline.__name__: PipelineType.metadata, DatabaseServiceMetadataPipeline.__name__: PipelineType.metadata, DatabaseServiceQueryUsagePipeline.__name__: PipelineType.usage, DatabaseServiceQueryLineagePipeline.__name__: PipelineType.lineage, diff --git a/ingestion/src/metadata/utils/constants.py b/ingestion/src/metadata/utils/constants.py index ebc15825276..fcf212d04f8 100644 --- a/ingestion/src/metadata/utils/constants.py +++ b/ingestion/src/metadata/utils/constants.py @@ -26,6 +26,7 @@ from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.data.topic import Topic from metadata.generated.schema.entity.domains.dataProduct import DataProduct from metadata.generated.schema.entity.domains.domain import Domain +from metadata.generated.schema.entity.services.apiService import ApiService from metadata.generated.schema.entity.services.dashboardService import DashboardService from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.messagingService import MessagingService @@ -76,6 +77,7 @@ ENTITY_REFERENCE_CLASS_MAP = { # Service Entities "databaseService": DatabaseService, "messagingService": MessagingService, + "apiService": ApiService, "dashboardService": DashboardService, "pipelineService": PipelineService, "storageService": StorageService, diff --git a/ingestion/src/metadata/utils/fqn.py b/ingestion/src/metadata/utils/fqn.py index 25ebf1e3302..05fa11e08bd 100644 --- a/ingestion/src/metadata/utils/fqn.py +++ b/ingestion/src/metadata/utils/fqn.py @@ -27,6 +27,7 @@ from metadata.antlr.split_listener import FqnSplitListener from metadata.generated.antlr.FqnLexer import FqnLexer from metadata.generated.antlr.FqnParser import FqnParser from metadata.generated.schema.entity.classification.tag import 
@fqn_build_registry.add(APICollection)
def _(
    _: Optional[OpenMetadata],  # ES Index not necessary for API collection FQN building
    *,
    service_name: str,
    api_collection_name: str,
) -> str:
    """
    Build the FQN of an API collection out of its service and its name.

    Raises:
        FQNBuildingException: if either the service or the collection
            name is missing or empty
    """
    if not service_name or not api_collection_name:
        raise FQNBuildingException(
            f"Args should be informed, but got service=`{service_name}`, collection=`{api_collection_name}`"
        )
    return _build(service_name, api_collection_name)
--git a/ingestion/tests/unit/topology/api/test_rest.py b/ingestion/tests/unit/topology/api/test_rest.py new file mode 100644 index 00000000000..8e6d8e69f17 --- /dev/null +++ b/ingestion/tests/unit/topology/api/test_rest.py @@ -0,0 +1,138 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test REST/OpenAPI. +""" + +from unittest import TestCase +from unittest.mock import patch + +from pydantic_core import Url + +from metadata.generated.schema.api.data.createAPICollection import ( + CreateAPICollectionRequest, +) +from metadata.generated.schema.entity.services.apiService import ( + ApiService, + ApiServiceConnection, + APIServiceType, +) +from metadata.generated.schema.metadataIngestion.workflow import ( + OpenMetadataWorkflowConfig, +) +from metadata.generated.schema.type.basic import ( + EntityName, + FullyQualifiedEntityName, + Markdown, +) +from metadata.ingestion.api.models import Either +from metadata.ingestion.source.api.rest.metadata import RestSource +from metadata.ingestion.source.api.rest.models import RESTCollection + +mock_rest_config = { + "source": { + "type": "rest", + "serviceName": "openapi_rest", + "serviceConnection": { + "config": { + "type": "REST", + "openAPISchemaURL": "https://petstore3.swagger.io/api/v3/openapi.json", + } + }, + "sourceConfig": { + "config": { + "type": "ApiMetadata", + } + }, + }, + "sink": { + "type": "metadata-rest", + "config": {}, + }, + "workflowConfig": { + "openMetadataServerConfig": { + 
class RESTTest(TestCase):
    """
    Unit tests for the REST/OpenAPI source, built from `mock_rest_config`
    (which points at the Petstore OpenAPI schema URL).
    """

    @patch("metadata.ingestion.source.api.api_service.ApiServiceSource.test_connection")
    def __init__(self, methodName, test_connection) -> None:
        """
        Create the `RestSource` under test with the connection check mocked.

        Args:
            methodName: name of the test method to run, forwarded to `TestCase`
            test_connection: mock injected by `@patch` in place of
                `ApiServiceSource.test_connection`
        """
        super().__init__(methodName)
        test_connection.return_value = False
        self.config = OpenMetadataWorkflowConfig.model_validate(mock_rest_config)
        self.rest_source = RestSource.create(
            mock_rest_config["source"],
            self.config.workflowConfig.openMetadataServerConfig,
        )
        # Seed the topology context with the service name.
        # NOTE(review): presumably read by `yield_api_collection` to fill the
        # `service` field of the create request - confirm against the source.
        self.rest_source.context.get().__dict__[
            "api_service"
        ] = MOCK_API_SERVICE.fullyQualifiedName.root

    def test_get_api_collections(self):
        """test get api collections"""
        collections = list(self.rest_source.get_api_collections())
        self.assertEqual(collections, MOCK_COLLECTIONS)

    def test_yield_api_collection(self):
        """test yield api collections"""
        collection_request = list(
            self.rest_source.yield_api_collection(MOCK_COLLECTIONS[0])
        )
        self.assertEqual(collection_request, EXPECTED_COLLECTION_REQUEST)

    def test_json_schema(self):
        """test json schema"""
        schema_content_type = self.rest_source.connection.headers.get("content-type")
        # Fail with a clear assertion (not a TypeError) when the header is absent
        self.assertIsNotNone(schema_content_type)
        self.assertIn("application/json", schema_content_type)
"dashboardService": DashboardService, diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/api/rest/index.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/api/rest/index.md new file mode 100644 index 00000000000..fa7db23a26b --- /dev/null +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/api/rest/index.md @@ -0,0 +1,62 @@ +--- +title: REST +slug: /connectors/api/rest +--- + +{% connectorDetailsHeader +name="REST" +stage="BETA" +platform="OpenMetadata" +availableFeatures=["API Endpoint", "Request Schema", "Response Schema"] +unavailableFeatures=[] +/ %} + +In this section, we provide guides and references to use the OpenAPI/REST connector. + +Configure and schedule REST metadata workflows from the OpenMetadata UI: + +- [Requirements](#requirements) +- [Metadata Ingestion](#metadata-ingestion) + +{% partial file="/v1.6/connectors/ingestion-modes-tiles.md" variables={yamlPath: "/connectors/api/rest/yaml"} /%} + +## Requirements + +### Generate OpenAPI Schema URL + +- Generate an OpenAPI schema URL for your service. See the [OpenAPI spec](https://swagger.io/specification/#openapi-document). + + +## Metadata Ingestion + +{% partial + file="/v1.6/connectors/metadata-ingestion-ui.md" + variables={ + connector: "REST", + selectServicePath: "/images/v1.6/connectors/rest/select-service.png", + addNewServicePath: "/images/v1.6/connectors/rest/add-new-service.png", + serviceConnectionPath: "/images/v1.6/connectors/rest/service-connection.png", +} +/%} + +{% stepsContainer %} +{% extraContent parentTagName="stepsContainer" %} + +#### Connection Options + +**OpenAPI Schema URL**: +An OpenAPI schema URL typically refers to the URL where the OpenAPI Specification (OAS) document of a web service is hosted. The document defines the service's API, including available endpoints, request/response formats, authentication methods, etc. It is usually in JSON format, e.g.
`https://petstore3.swagger.io/api/v3/openapi.json` + +**Token**: An authentication token to connect to an OpenAPI schema URL. It is only required if the API schema is protected or secured. + + +{% /extraContent %} + +{% partial file="/v1.6/connectors/test-connection.md" /%} + + +{% partial file="/v1.6/connectors/ingestion-schedule-and-deploy.md" /%} + +{% /stepsContainer %} + +{% partial file="/v1.6/connectors/troubleshooting.md" /%} \ No newline at end of file diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/api/rest/yaml.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/api/rest/yaml.md new file mode 100644 index 00000000000..325f3aa1067 --- /dev/null +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/connectors/api/rest/yaml.md @@ -0,0 +1,84 @@ +--- +title: Run the OpenAPI/REST Connector Externally +slug: /connectors/api/rest/yaml +--- + +{% connectorDetailsHeader +name="REST" +stage="BETA" +platform="OpenMetadata" +availableFeatures=["API Endpoint", "Request Schema", "Response Schema"] +unavailableFeatures=[] +/ %} + +In this section, we provide guides and references to use the OpenAPI/REST connector. + +Configure and schedule REST metadata workflows from the OpenMetadata UI: + +- [Requirements](#requirements) +- [Metadata Ingestion](#metadata-ingestion) + +{% partial file="/v1.6/connectors/external-ingestion-deployment.md" /%} + +## Requirements + +### Python Requirements + +{% partial file="/v1.6/connectors/python-requirements.md" /%} + + +### Generate OpenAPI Schema URL + +- Generate an OpenAPI schema URL for your service. See the [OpenAPI spec](https://swagger.io/specification/#openapi-document). + + +## Metadata Ingestion + +### 1.
Define the YAML Config + +This is a sample config for OpenAPI: + +{% codePreview %} + +{% codeInfoContainer %} + +#### Source Configuration - Service Connection + +{% codeInfo srNumber=1 %} + +**OpenAPI Schema URL**: +An OpenAPI schema URL typically refers to the URL where the OpenAPI Specification (OAS) document of a web service is hosted. The document defines the service's API, including available endpoints, request/response formats, authentication methods, etc. It is usually in JSON format, e.g. `https://petstore3.swagger.io/api/v3/openapi.json` + +**Token**: An authentication token to connect to an OpenAPI schema URL. It is only required if the API schema is protected or secured. + +{% /codeInfo %} + +{% /codeInfoContainer %} + +{% codeBlock fileName="filename.yaml" %} + +```yaml {% isCodeBlock=true %} +source: + type: rest + serviceName: openapi_rest + serviceConnection: + config: + type: REST +``` +```yaml {% srNumber=1 %} + openAPISchemaURL: https://docs.open-metadata.org/swagger.json + +``` + + +{% partial file="/v1.6/connectors/yaml/database/source-config.md" /%} + +{% partial file="/v1.6/connectors/yaml/ingestion-sink.md" /%} + +{% partial file="/v1.6/connectors/yaml/workflow-config.md" /%} + +{% /codeBlock %} + +{% /codePreview %} + +{% partial file="/v1.6/connectors/yaml/ingestion-cli.md" /%} diff --git a/openmetadata-docs/content/v1.6.x-SNAPSHOT/menu.md b/openmetadata-docs/content/v1.6.x-SNAPSHOT/menu.md index 0dee7b42eb9..0df67141eac 100644 --- a/openmetadata-docs/content/v1.6.x-SNAPSHOT/menu.md +++ b/openmetadata-docs/content/v1.6.x-SNAPSHOT/menu.md @@ -218,6 +218,12 @@ site_menu: color: violet-70 icon: openmetadata + - category: Connectors / API + url: /connectors/api + - category: Connectors / API / REST + url: /connectors/api/rest + - category: Connectors / API / REST / Run Externally + url: /connectors/api/rest/yaml - category: Connectors / Database url: /connectors/database - category: Connectors / Database / Athena diff --git
a/openmetadata-docs/images/v1.6/connectors/rest/add-new-service.png b/openmetadata-docs/images/v1.6/connectors/rest/add-new-service.png new file mode 100644 index 00000000000..3be50f090c6 Binary files /dev/null and b/openmetadata-docs/images/v1.6/connectors/rest/add-new-service.png differ diff --git a/openmetadata-docs/images/v1.6/connectors/rest/select-service.png b/openmetadata-docs/images/v1.6/connectors/rest/select-service.png new file mode 100644 index 00000000000..07613edaa66 Binary files /dev/null and b/openmetadata-docs/images/v1.6/connectors/rest/select-service.png differ diff --git a/openmetadata-docs/images/v1.6/connectors/rest/service-connection.png b/openmetadata-docs/images/v1.6/connectors/rest/service-connection.png new file mode 100644 index 00000000000..985c5cced4e Binary files /dev/null and b/openmetadata-docs/images/v1.6/connectors/rest/service-connection.png differ diff --git a/openmetadata-service/src/main/resources/json/data/testConnections/api/rest.json b/openmetadata-service/src/main/resources/json/data/testConnections/api/rest.json new file mode 100644 index 00000000000..a2ab1060f47 --- /dev/null +++ b/openmetadata-service/src/main/resources/json/data/testConnections/api/rest.json @@ -0,0 +1,21 @@ +{ + "name": "REST", + "displayName": "REST Test Connection", + "description": "This Test Connection validates the schema provided for openapi", + "steps": [ + { + "name": "CheckURL", + "description": "Checks if url is valid json/yaml schema url or not", + "errorMessage": "Failed to validate the url, please check the url", + "shortCircuit": true, + "mandatory": true + }, + { + "name": "CheckSchema", + "description": "Checks if schema is valid openapi schema or not", + "errorMessage": "Failed to validate the schema with OpenAPI. 
Please validate the config", + "shortCircuit": true, + "mandatory": true + } + ] +} \ No newline at end of file diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/APIServiceResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/APIServiceResourceTest.java index a021eb97cd8..a1efa41da3a 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/APIServiceResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/APIServiceResourceTest.java @@ -51,7 +51,7 @@ public class APIServiceResourceTest extends ServiceResourceTest