diff --git a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py index bb6674fdc8..c9d2aa595c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py +++ b/metadata-ingestion/src/datahub/ingestion/source/common/subtypes.py @@ -25,6 +25,7 @@ class DatasetSubTypes(StrEnum): NEO4J_NODE = "Neo4j Node" NEO4J_RELATIONSHIP = "Neo4j Relationship" SNOWFLAKE_STREAM = "Snowflake Stream" + API_ENDPOINT = "API Endpoint" # TODO: Create separate entity... NOTEBOOK = "Notebook" diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index b770113af5..3378211f38 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -2,13 +2,14 @@ import logging import time import warnings from abc import ABC -from typing import Dict, Iterable, Optional, Tuple +from typing import Dict, Iterable, List, Optional, Tuple from pydantic import validator from pydantic.fields import Field from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import make_tag_urn +from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( SourceCapability, @@ -20,6 +21,7 @@ from datahub.ingestion.api.decorators import ( ) from datahub.ingestion.api.source import Source, SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.ingestion.source.common.subtypes import DatasetSubTypes from datahub.ingestion.source.openapi_parser import ( clean_url, compose_url_attr, @@ -32,14 +34,13 @@ from datahub.ingestion.source.openapi_parser import ( set_metadata, try_guessing, ) -from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot -from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( AuditStampClass, DatasetPropertiesClass, GlobalTagsClass, InstitutionalMemoryClass, InstitutionalMemoryMetadataClass, + SubTypesClass, TagAssociationClass, ) @@ -222,8 +223,9 @@ class APISource(Source, ABC): def init_dataset( self, endpoint_k: str, endpoint_dets: dict - ) -> Tuple[DatasetSnapshot, str]: + ) -> Tuple[str, str, List[MetadataWorkUnit]]: config = self.config + workunits = [] dataset_name = endpoint_k[1:].replace("/", ".") @@ -233,22 +235,27 @@ class APISource(Source, ABC): else: dataset_name = "root" - dataset_snapshot = DatasetSnapshot( - urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{config.name}.{dataset_name},PROD)", - aspects=[], - ) + dataset_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{config.name}.{dataset_name},PROD)" - # adding description - dataset_properties = DatasetPropertiesClass( + # Create dataset properties aspect + properties = DatasetPropertiesClass( description=endpoint_dets["description"], customProperties={} ) - dataset_snapshot.aspects.append(dataset_properties) + wu = MetadataWorkUnit( + id=dataset_name, + mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties), + ) + workunits.append(wu) - # adding tags + # Create tags aspect tags_str = [make_tag_urn(t) for t in endpoint_dets["tags"]] tags_tac = [TagAssociationClass(t) for t in tags_str] gtc = GlobalTagsClass(tags_tac) - dataset_snapshot.aspects.append(gtc) + wu = MetadataWorkUnit( + id=f"{dataset_name}-tags", + mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=gtc), + ) + workunits.append(wu) # the link will appear in the "documentation" link_url = clean_url(config.url + self.url_basepath + endpoint_k) @@ -260,17 +267,25 @@ class APISource(Source, ABC): url=link_url, description=link_description, createStamp=creation ) inst_memory = InstitutionalMemoryClass([link_metadata]) - dataset_snapshot.aspects.append(inst_memory) + wu = MetadataWorkUnit( + id=f"{dataset_name}-docs", + mcp=MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=inst_memory + ), + ) + workunits.append(wu) - return dataset_snapshot, dataset_name + # Create subtype aspect + sub_types = SubTypesClass(typeNames=[DatasetSubTypes.API_ENDPOINT]) + wu = MetadataWorkUnit( + id=f"{dataset_name}-subtype", + mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=sub_types), + ) + workunits.append(wu) - def build_wu( - self, dataset_snapshot: DatasetSnapshot, dataset_name: str - ) -> ApiWorkUnit: - mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) - return ApiWorkUnit(id=dataset_name, mce=mce) + return dataset_name, dataset_urn, workunits - def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: config = self.config sw_dict = self.config.get_swagger() @@ -294,16 +309,24 @@ class APISource(Source, ABC): if endpoint_k in config.ignore_endpoints: continue - dataset_snapshot, dataset_name = self.init_dataset( + # Initialize dataset and get common aspects + dataset_name, dataset_urn, workunits = self.init_dataset( endpoint_k, endpoint_dets ) + for wu in workunits: + yield wu - # adding dataset fields + # Handle schema metadata if available if "data" in endpoint_dets.keys(): # we are lucky! data is defined in the swagger for this endpoint schema_metadata = set_metadata(dataset_name, endpoint_dets["data"]) - dataset_snapshot.aspects.append(schema_metadata) - yield self.build_wu(dataset_snapshot, dataset_name) + wu = MetadataWorkUnit( + id=f"{dataset_name}-schema", + mcp=MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=schema_metadata + ), + ) + yield wu elif endpoint_dets["method"] != "get": self.report.report_warning( title="Failed to Extract Endpoint Metadata", @@ -338,9 +361,13 @@ class APISource(Source, ABC): context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}", ) schema_metadata = set_metadata(dataset_name, fields2add) - dataset_snapshot.aspects.append(schema_metadata) - - yield self.build_wu(dataset_snapshot, dataset_name) + wu = MetadataWorkUnit( + id=f"{dataset_name}-schema", + mcp=MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=schema_metadata + ), + ) + yield wu else: self.report_bad_responses(response.status_code, type=endpoint_k) else: @@ -369,9 +396,13 @@ class APISource(Source, ABC): context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}", ) schema_metadata = set_metadata(dataset_name, fields2add) - dataset_snapshot.aspects.append(schema_metadata) - - yield self.build_wu(dataset_snapshot, dataset_name) + wu = MetadataWorkUnit( + id=f"{dataset_name}-schema", + mcp=MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=schema_metadata + ), + ) + yield wu else: self.report_bad_responses(response.status_code, type=endpoint_k) else: @@ -400,9 +431,13 @@ class APISource(Source, ABC): context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}", ) schema_metadata = set_metadata(dataset_name, fields2add) - dataset_snapshot.aspects.append(schema_metadata) - - yield self.build_wu(dataset_snapshot, dataset_name) + wu = MetadataWorkUnit( + id=f"{dataset_name}-schema", + mcp=MetadataChangeProposalWrapper( + entityUrn=dataset_urn, aspect=schema_metadata + ), + ) + yield wu else: self.report_bad_responses(response.status_code, type=endpoint_k) diff --git a/metadata-ingestion/tests/integration/openapi/openapi_mces_golden.json b/metadata-ingestion/tests/integration/openapi/openapi_mces_golden.json index f74a0bb9e0..a13f26aacc 100755 --- a/metadata-ingestion/tests/integration/openapi/openapi_mces_golden.json +++ b/metadata-ingestion/tests/integration/openapi/openapi_mces_golden.json @@ -1,145 +1,52 @@ [ { - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)", - "aspects": [ - { - "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "customProperties": {}, - "description": "List API versions", - "tags": [] - } - }, - { - "com.linkedin.pegasus2avro.common.GlobalTags": { - "tags": [] - } - }, - { - "com.linkedin.pegasus2avro.common.InstitutionalMemory": { - "elements": [ - { - "url": "https://raw.githubusercontent.com/OAI/learn.openapis.org/refs/heads/main/examples/", - "description": "Link to call for the dataset.", - "createStamp": { - "time": 1586847600, - "actor": "urn:li:corpuser:etl" - } - } - ] - } - }, - { - "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "root", - "platform": "urn:li:dataPlatform:api", - "version": 0, - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "hash": "", - "platformSchema": { - "com.linkedin.pegasus2avro.schema.OtherSchema": { - "rawSchema": "" - } - }, - "fields": [ - { - "fieldPath": "foo", - "nullable": false, - "description": "", - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } - }, - "nativeDataType": "str", - "recursive": false, - "isPartOfKey": false - } - ] - } - } - ] + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "description": "List API versions", + "tags": [] } }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "openapi-2020_04_14-07_00_00-m1k7d5", + "runId": "openapi-2020_04_14-07_00_00-iqxees", "lastRunId": "no-run-id-provided" } }, { - "proposedSnapshot": { - "com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": { - "urn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)", - "aspects": [ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "openapi-2020_04_14-07_00_00-iqxees", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)", + "changeType": "UPSERT", + "aspectName": "institutionalMemory", + "aspect": { + "json": { + "elements": [ { - "com.linkedin.pegasus2avro.dataset.DatasetProperties": { - "customProperties": {}, - "description": "Show API version details", - "tags": [] - } - }, - { - "com.linkedin.pegasus2avro.common.GlobalTags": { - "tags": [] - } - }, - { - "com.linkedin.pegasus2avro.common.InstitutionalMemory": { - "elements": [ - { - "url": "https://raw.githubusercontent.com/OAI/learn.openapis.org/refs/heads/main/examples/v2", - "description": "Link to call for the dataset.", - "createStamp": { - "time": 1586847600, - "actor": "urn:li:corpuser:etl" - } - } - ] - } - }, - { - "com.linkedin.pegasus2avro.schema.SchemaMetadata": { - "schemaName": "v2", - "platform": "urn:li:dataPlatform:api", - "version": 0, - "created": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:unknown" - }, - "hash": "", - "platformSchema": { - "com.linkedin.pegasus2avro.schema.OtherSchema": { - "rawSchema": "" - } - }, - "fields": [ - { - "fieldPath": "foo", - "nullable": false, - "description": "", - "type": { - "type": { - "com.linkedin.pegasus2avro.schema.StringType": {} - } - }, - "nativeDataType": "str", - "recursive": false, - "isPartOfKey": false - } - ] + "url": "https://raw.githubusercontent.com/OAI/learn.openapis.org/refs/heads/main/examples/", + "description": "Link to call for the dataset.", + "createStamp": { + "time": 1586847600, + "actor": "urn:li:corpuser:etl" } } ] @@ -147,7 +54,196 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "openapi-2020_04_14-07_00_00-m1k7d5", + "runId": "openapi-2020_04_14-07_00_00-iqxees", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "API Endpoint" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "openapi-2020_04_14-07_00_00-iqxees", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "root", + "platform": "urn:li:dataPlatform:api", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "foo", + "nullable": false, + "description": "", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "str", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "openapi-2020_04_14-07_00_00-iqxees", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": {}, + "description": "Show API version details", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "openapi-2020_04_14-07_00_00-iqxees", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "openapi-2020_04_14-07_00_00-iqxees", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)", + "changeType": "UPSERT", + "aspectName": "institutionalMemory", + "aspect": { + "json": { + "elements": [ + { + "url": "https://raw.githubusercontent.com/OAI/learn.openapis.org/refs/heads/main/examples/v2", + "description": "Link to call for the dataset.", + "createStamp": { + "time": 1586847600, + "actor": "urn:li:corpuser:etl" + } + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "openapi-2020_04_14-07_00_00-iqxees", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "API Endpoint" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "openapi-2020_04_14-07_00_00-iqxees", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "v2", + "platform": "urn:li:dataPlatform:api", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "foo", + "nullable": false, + "description": "", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "str", + "recursive": false, + "isPartOfKey": false + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "openapi-2020_04_14-07_00_00-iqxees", "lastRunId": "no-run-id-provided" } }, @@ -163,7 +259,7 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "openapi-2020_04_14-07_00_00-m1k7d5", + "runId": "openapi-2020_04_14-07_00_00-iqxees", "lastRunId": "no-run-id-provided" } }, @@ -179,8 +275,8 @@ }, "systemMetadata": { "lastObserved": 1586847600000, - "runId": "openapi-2020_04_14-07_00_00-m1k7d5", + "runId": "openapi-2020_04_14-07_00_00-iqxees", "lastRunId": "no-run-id-provided" } } -] +] \ No newline at end of file