feat(openapi): Adding subtype for openapi source (#12873)

This commit is contained in:
Gabe Lyons 2025-03-18 12:35:46 -07:00 committed by GitHub
parent 78f4852d55
commit ecd9ffd137
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 301 additions and 169 deletions

View File

@ -25,6 +25,7 @@ class DatasetSubTypes(StrEnum):
NEO4J_NODE = "Neo4j Node"
NEO4J_RELATIONSHIP = "Neo4j Relationship"
SNOWFLAKE_STREAM = "Snowflake Stream"
API_ENDPOINT = "API Endpoint"
# TODO: Create separate entity...
NOTEBOOK = "Notebook"

View File

@ -2,13 +2,14 @@ import logging
import time
import warnings
from abc import ABC
from typing import Dict, Iterable, Optional, Tuple
from typing import Dict, Iterable, List, Optional, Tuple
from pydantic import validator
from pydantic.fields import Field
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import make_tag_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SourceCapability,
@ -20,6 +21,7 @@ from datahub.ingestion.api.decorators import (
)
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetSubTypes
from datahub.ingestion.source.openapi_parser import (
clean_url,
compose_url_attr,
@ -32,14 +34,13 @@ from datahub.ingestion.source.openapi_parser import (
set_metadata,
try_guessing,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import DatasetSnapshot
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
AuditStampClass,
DatasetPropertiesClass,
GlobalTagsClass,
InstitutionalMemoryClass,
InstitutionalMemoryMetadataClass,
SubTypesClass,
TagAssociationClass,
)
@ -222,8 +223,9 @@ class APISource(Source, ABC):
def init_dataset(
self, endpoint_k: str, endpoint_dets: dict
) -> Tuple[DatasetSnapshot, str]:
) -> Tuple[str, str, List[MetadataWorkUnit]]:
config = self.config
workunits = []
dataset_name = endpoint_k[1:].replace("/", ".")
@ -233,22 +235,27 @@ class APISource(Source, ABC):
else:
dataset_name = "root"
dataset_snapshot = DatasetSnapshot(
urn=f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{config.name}.{dataset_name},PROD)",
aspects=[],
)
dataset_urn = f"urn:li:dataset:(urn:li:dataPlatform:{self.platform},{config.name}.{dataset_name},PROD)"
# adding description
dataset_properties = DatasetPropertiesClass(
# Create dataset properties aspect
properties = DatasetPropertiesClass(
description=endpoint_dets["description"], customProperties={}
)
dataset_snapshot.aspects.append(dataset_properties)
wu = MetadataWorkUnit(
id=dataset_name,
mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties),
)
workunits.append(wu)
# adding tags
# Create tags aspect
tags_str = [make_tag_urn(t) for t in endpoint_dets["tags"]]
tags_tac = [TagAssociationClass(t) for t in tags_str]
gtc = GlobalTagsClass(tags_tac)
dataset_snapshot.aspects.append(gtc)
wu = MetadataWorkUnit(
id=f"{dataset_name}-tags",
mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=gtc),
)
workunits.append(wu)
# the link will appear in the "documentation"
link_url = clean_url(config.url + self.url_basepath + endpoint_k)
@ -260,17 +267,25 @@ class APISource(Source, ABC):
url=link_url, description=link_description, createStamp=creation
)
inst_memory = InstitutionalMemoryClass([link_metadata])
dataset_snapshot.aspects.append(inst_memory)
wu = MetadataWorkUnit(
id=f"{dataset_name}-docs",
mcp=MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=inst_memory
),
)
workunits.append(wu)
return dataset_snapshot, dataset_name
# Create subtype aspect
sub_types = SubTypesClass(typeNames=[DatasetSubTypes.API_ENDPOINT])
wu = MetadataWorkUnit(
id=f"{dataset_name}-subtype",
mcp=MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=sub_types),
)
workunits.append(wu)
def build_wu(
self, dataset_snapshot: DatasetSnapshot, dataset_name: str
) -> ApiWorkUnit:
mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
return ApiWorkUnit(id=dataset_name, mce=mce)
return dataset_name, dataset_urn, workunits
def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:
def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
config = self.config
sw_dict = self.config.get_swagger()
@ -294,16 +309,24 @@ class APISource(Source, ABC):
if endpoint_k in config.ignore_endpoints:
continue
dataset_snapshot, dataset_name = self.init_dataset(
# Initialize dataset and get common aspects
dataset_name, dataset_urn, workunits = self.init_dataset(
endpoint_k, endpoint_dets
)
for wu in workunits:
yield wu
# adding dataset fields
# Handle schema metadata if available
if "data" in endpoint_dets.keys():
# we are lucky! data is defined in the swagger for this endpoint
schema_metadata = set_metadata(dataset_name, endpoint_dets["data"])
dataset_snapshot.aspects.append(schema_metadata)
yield self.build_wu(dataset_snapshot, dataset_name)
wu = MetadataWorkUnit(
id=f"{dataset_name}-schema",
mcp=MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=schema_metadata
),
)
yield wu
elif endpoint_dets["method"] != "get":
self.report.report_warning(
title="Failed to Extract Endpoint Metadata",
@ -338,9 +361,13 @@ class APISource(Source, ABC):
context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}",
)
schema_metadata = set_metadata(dataset_name, fields2add)
dataset_snapshot.aspects.append(schema_metadata)
yield self.build_wu(dataset_snapshot, dataset_name)
wu = MetadataWorkUnit(
id=f"{dataset_name}-schema",
mcp=MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=schema_metadata
),
)
yield wu
else:
self.report_bad_responses(response.status_code, type=endpoint_k)
else:
@ -369,9 +396,13 @@ class APISource(Source, ABC):
context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}",
)
schema_metadata = set_metadata(dataset_name, fields2add)
dataset_snapshot.aspects.append(schema_metadata)
yield self.build_wu(dataset_snapshot, dataset_name)
wu = MetadataWorkUnit(
id=f"{dataset_name}-schema",
mcp=MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=schema_metadata
),
)
yield wu
else:
self.report_bad_responses(response.status_code, type=endpoint_k)
else:
@ -400,9 +431,13 @@ class APISource(Source, ABC):
context=f"Endpoint Type: {endpoint_k}, Name: {dataset_name}",
)
schema_metadata = set_metadata(dataset_name, fields2add)
dataset_snapshot.aspects.append(schema_metadata)
yield self.build_wu(dataset_snapshot, dataset_name)
wu = MetadataWorkUnit(
id=f"{dataset_name}-schema",
mcp=MetadataChangeProposalWrapper(
entityUrn=dataset_urn, aspect=schema_metadata
),
)
yield wu
else:
self.report_bad_responses(response.status_code, type=endpoint_k)

View File

@ -1,145 +1,52 @@
[
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)",
"aspects": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {},
"description": "List API versions",
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.InstitutionalMemory": {
"elements": [
{
"url": "https://raw.githubusercontent.com/OAI/learn.openapis.org/refs/heads/main/examples/",
"description": "Link to call for the dataset.",
"createStamp": {
"time": 1586847600,
"actor": "urn:li:corpuser:etl"
}
}
]
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "root",
"platform": "urn:li:dataPlatform:api",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "foo",
"nullable": false,
"description": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
}
]
}
}
]
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"description": "List API versions",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-m1k7d5",
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"proposedSnapshot": {
"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot": {
"urn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)",
"aspects": [
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)",
"changeType": "UPSERT",
"aspectName": "institutionalMemory",
"aspect": {
"json": {
"elements": [
{
"com.linkedin.pegasus2avro.dataset.DatasetProperties": {
"customProperties": {},
"description": "Show API version details",
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": []
}
},
{
"com.linkedin.pegasus2avro.common.InstitutionalMemory": {
"elements": [
{
"url": "https://raw.githubusercontent.com/OAI/learn.openapis.org/refs/heads/main/examples/v2",
"description": "Link to call for the dataset.",
"createStamp": {
"time": 1586847600,
"actor": "urn:li:corpuser:etl"
}
}
]
}
},
{
"com.linkedin.pegasus2avro.schema.SchemaMetadata": {
"schemaName": "v2",
"platform": "urn:li:dataPlatform:api",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.pegasus2avro.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "foo",
"nullable": false,
"description": "",
"type": {
"type": {
"com.linkedin.pegasus2avro.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
}
]
"url": "https://raw.githubusercontent.com/OAI/learn.openapis.org/refs/heads/main/examples/",
"description": "Link to call for the dataset.",
"createStamp": {
"time": 1586847600,
"actor": "urn:li:corpuser:etl"
}
}
]
@ -147,7 +54,196 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-m1k7d5",
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"API Endpoint"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.root,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "root",
"platform": "urn:li:dataPlatform:api",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "foo",
"nullable": false,
"description": "",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProperties",
"aspect": {
"json": {
"customProperties": {},
"description": "Show API version details",
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)",
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"json": {
"tags": []
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)",
"changeType": "UPSERT",
"aspectName": "institutionalMemory",
"aspect": {
"json": {
"elements": [
{
"url": "https://raw.githubusercontent.com/OAI/learn.openapis.org/refs/heads/main/examples/v2",
"description": "Link to call for the dataset.",
"createStamp": {
"time": 1586847600,
"actor": "urn:li:corpuser:etl"
}
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)",
"changeType": "UPSERT",
"aspectName": "subTypes",
"aspect": {
"json": {
"typeNames": [
"API Endpoint"
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:OpenApi,test_openapi.v2,PROD)",
"changeType": "UPSERT",
"aspectName": "schemaMetadata",
"aspect": {
"json": {
"schemaName": "v2",
"platform": "urn:li:dataPlatform:api",
"version": 0,
"created": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"lastModified": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"hash": "",
"platformSchema": {
"com.linkedin.schema.OtherSchema": {
"rawSchema": ""
}
},
"fields": [
{
"fieldPath": "foo",
"nullable": false,
"description": "",
"type": {
"type": {
"com.linkedin.schema.StringType": {}
}
},
"nativeDataType": "str",
"recursive": false,
"isPartOfKey": false
}
]
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
@ -163,7 +259,7 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-m1k7d5",
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
},
@ -179,8 +275,8 @@
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "openapi-2020_04_14-07_00_00-m1k7d5",
"runId": "openapi-2020_04_14-07_00_00-iqxees",
"lastRunId": "no-run-id-provided"
}
}
]
]