fix(ingest/unity): generate sibling and lineage (#9894)

This commit is contained in:
dushayntAW 2024-03-18 20:22:17 +05:30 committed by GitHub
parent ad4da5747d
commit 3a4bdef44a
5 changed files with 388 additions and 2047 deletions

View File

@ -15,6 +15,11 @@ source:
deny:
- ".*\\.unwanted_schema"
# emit_siblings: true
# delta_lake_options:
# platform_instance_name: null
# env: 'PROD'
# profiling:
# method: "analyze"
# enabled: true
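
The commented-out options above document the new knobs in the sample recipe. For reference, a recipe fragment that actually turns them on might look like the following sketch (a hedged example, assuming the standard unity-catalog source type; the platform instance name is illustrative, and emit_siblings already defaults to true):

source:
  type: unity-catalog
  config:
    # ... connection settings elided ...
    emit_siblings: true
    delta_lake_options:
      platform_instance_name: my_delta_instance  # illustrative; defaults to null
      env: "PROD"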

View File

@ -60,6 +60,13 @@ class UnityCatalogProfilerConfig(ConfigModel):
)
class DeltaLakeDetails(ConfigModel):
platform_instance_name: Optional[str] = Field(
default=None, description="Delta-lake platform instance name"
)
env: str = Field(default="PROD", description="Delta-lake environment")
class UnityCatalogAnalyzeProfilerConfig(UnityCatalogProfilerConfig):
method: Literal["analyze"] = "analyze"
@ -253,6 +260,16 @@ class UnityCatalogSourceConfig(
discriminator="method",
)
emit_siblings: bool = pydantic.Field(
default=True,
description="Whether to emit siblings relation with corresponding delta-lake platform's table. If enabled, this will also ingest the corresponding delta-lake table.",
)
delta_lake_options: DeltaLakeDetails = Field(
default=DeltaLakeDetails(),
description="Details about the delta lake, incase to emit siblings",
)
scheme: str = DATABRICKS
def get_sql_alchemy_url(self, database: Optional[str] = None) -> str:
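
As a quick sanity check of the new fields, a minimal sketch of how they resolve, assuming DeltaLakeDetails is importable from datahub.ingestion.source.unity.config (the module this diff touches):

from datahub.ingestion.source.unity.config import DeltaLakeDetails

# Defaults: no platform instance, PROD environment.
details = DeltaLakeDetails()
assert details.platform_instance_name is None
assert details.env == "PROD"

# Overrides are plain pydantic fields.
details = DeltaLakeDetails.parse_obj(
    {"platform_instance_name": "lake-1", "env": "DEV"}  # illustrative values
)
assert details.env == "DEV"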

View File

@ -41,7 +41,10 @@ from datahub.ingestion.api.source import (
TestConnectionReport,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.aws.s3_util import make_s3_urn_for_lineage
from datahub.ingestion.source.aws.s3_util import (
make_s3_urn_for_lineage,
strip_s3_prefix,
)
from datahub.ingestion.source.common.subtypes import (
DatasetContainerSubTypes,
DatasetSubTypes,
@ -80,9 +83,13 @@ from datahub.ingestion.source.unity.proxy_types import (
)
from datahub.ingestion.source.unity.report import UnityCatalogReport
from datahub.ingestion.source.unity.usage import UnityCatalogUsageExtractor
from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
DatasetLineageType,
FineGrainedLineage,
FineGrainedLineageUpstreamType,
Upstream,
UpstreamLineage,
ViewProperties,
)
from datahub.metadata.schema_classes import (
@ -491,6 +498,25 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
if table.view_definition:
self.view_definitions[dataset_urn] = (table.ref, table.view_definition)
# generate sibling and lineage aspects for EXTERNAL DELTA tables
if (
table_props.customProperties.get("table_type") == "EXTERNAL"
and table_props.customProperties.get("data_source_format") == "DELTA"
and self.config.emit_siblings
):
storage_location = str(table_props.customProperties.get("storage_location"))
if storage_location.startswith("s3://"):
browse_path = strip_s3_prefix(storage_location)
source_dataset_urn = make_dataset_urn_with_platform_instance(
"delta-lake",
browse_path,
self.config.delta_lake_options.platform_instance_name,
self.config.delta_lake_options.env,
)
yield from self.gen_siblings_workunit(dataset_urn, source_dataset_urn)
yield from self.gen_lineage_workunit(dataset_urn, source_dataset_urn)
yield from [
mcp.as_workunit()
for mcp in MetadataChangeProposalWrapper.construct_many(
@ -947,3 +973,38 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.sql_parser_schema_resolver.close()
super().close()
def gen_siblings_workunit(
self,
dataset_urn: str,
source_dataset_urn: str,
) -> Iterable[MetadataWorkUnit]:
"""
Generate sibling workunits for both the unity-catalog dataset and its connector source dataset
"""
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=Siblings(primary=False, siblings=[source_dataset_urn]),
).as_workunit()
yield MetadataChangeProposalWrapper(
entityUrn=source_dataset_urn,
aspect=Siblings(primary=True, siblings=[dataset_urn]),
).as_workunit(is_primary_source=False)
def gen_lineage_workunit(
self,
dataset_urn: str,
source_dataset_urn: str,
) -> Iterable[MetadataWorkUnit]:
"""
Generate a lineage workunit linking the unity-catalog dataset to its connector source dataset
"""
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineage(
upstreams=[
Upstream(dataset=source_dataset_urn, type=DatasetLineageType.VIEW)
]
),
).as_workunit()
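
To make the emitted metadata concrete, here is a rough, self-contained sketch of the urn construction and aspects these helpers produce for an external Delta table. The imports mirror the ones added in this diff; the bucket, path, and databricks urn are illustrative values, not taken from this PR:

from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.source.aws.s3_util import strip_s3_prefix
from datahub.metadata.com.linkedin.pegasus2avro.common import Siblings
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    Upstream,
    UpstreamLineage,
)

# Illustrative storage location of an EXTERNAL DELTA table.
storage_location = "s3://my-bucket/metastore/tables/abc"
browse_path = strip_s3_prefix(storage_location)  # "my-bucket/metastore/tables/abc"

# delta_lake_options defaults: platform_instance_name=None, env="PROD".
source_dataset_urn = make_dataset_urn_with_platform_instance(
    "delta-lake", browse_path, None, "PROD"
)

# Illustrative unity-catalog urn for the same table.
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:databricks,quickstart_catalog.quickstart_schema.quickstart_table_external,PROD)"

# Siblings: the delta-lake side is primary, the unity-catalog side is not.
sibling_mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=Siblings(primary=False, siblings=[source_dataset_urn]),
)

# Lineage: the delta-lake table is upstream of the unity-catalog table.
lineage_mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=UpstreamLineage(
        upstreams=[Upstream(dataset=source_dataset_urn, type=DatasetLineageType.VIEW)]
    ),
)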

View File

@ -66,17 +66,6 @@ def register_mock_data(workspace_client):
workspace_client.catalogs.list.return_value = [
CatalogInfo.from_dict(d)
for d in [
{
"name": "main",
"owner": "account users",
"comment": "Main catalog (auto-created)",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"created_at": 1666185153376,
"created_by": "abc@acryl.io",
"updated_at": 1666186071115,
"updated_by": "abc@acryl.io",
"catalog_type": "MANAGED_CATALOG",
},
{
"name": "quickstart_catalog",
"owner": "account users",
@ -87,50 +76,13 @@ def register_mock_data(workspace_client):
"updated_at": 1666186064332,
"updated_by": "abc@acryl.io",
"catalog_type": "MANAGED_CATALOG",
},
{
"name": "system",
"owner": SERVICE_PRINCIPAL_ID_2,
"comment": "System catalog (auto-created)",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"created_at": 1666185153391,
"created_by": "System user",
"updated_at": 1666185153391,
"updated_by": "System user",
"catalog_type": "SYSTEM_CATALOG",
},
]
]
workspace_client.schemas.list.return_value = [
SchemaInfo.from_dict(d)
for d in [
{
"name": "default",
"catalog_name": "quickstart_catalog",
"owner": "abc@acryl.io",
"comment": "Default schema (auto-created)",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"full_name": "quickstart_catalog.default",
"created_at": 1666185610021,
"created_by": "abc@acryl.io",
"updated_at": 1666185610021,
"updated_by": "abc@acryl.io",
"catalog_type": "MANAGED_CATALOG",
},
{
"name": "information_schema",
"catalog_name": "quickstart_catalog",
"owner": SERVICE_PRINCIPAL_ID_1,
"comment": "Information schema (auto-created)",
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"full_name": "quickstart_catalog.information_schema",
"created_at": 1666185610024,
"created_by": "System user",
"updated_at": 1666185610024,
"updated_by": "System user",
"catalog_type": "MANAGED_CATALOG",
},
{
"name": "quickstart_schema",
"catalog_name": "quickstart_catalog",
@ -199,7 +151,57 @@ def register_mock_data(workspace_client):
"updated_by": "abc@acryl.io",
"table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896",
}
)
),
databricks.sdk.service.catalog.TableInfo.from_dict(
{
"name": "quickstart_table_external",
"catalog_name": "quickstart_catalog",
"schema_name": "quickstart_schema",
"table_type": "EXTERNAL",
"data_source_format": "DELTA",
"columns": [
{
"name": "columnA",
"type_text": "int",
"type_json": '{"name":"columnA","type":"integer","nullable":true,"metadata":{}}',
"type_name": "INT",
"type_precision": 0,
"type_scale": 0,
"position": 0,
"nullable": True,
},
{
"name": "columnB",
"type_text": "string",
"type_json": '{"name":"columnB","type":"string","nullable":true,"metadata":{}}',
"type_name": "STRING",
"type_precision": 0,
"type_scale": 0,
"position": 1,
"nullable": True,
},
],
"storage_location": "s3://db-02eec1f70bfe4115445be9fdb1aac6ac-s3-root-bucket/metastore/2c983545-d403-4f87-9063-5b7e3b6d3736/tables/cff27aa1-1c6a-4d78-b713-562c660c2896",
"owner": "account users",
"properties": {
"delta.lastCommitTimestamp": "1666185711000",
"delta.lastUpdateVersion": "1",
"delta.minReaderVersion": "1",
"delta.minWriterVersion": "2",
"spark.sql.statistics.numRows": "10",
"spark.sql.statistics.totalSize": "512",
},
"generation": 2,
"metastore_id": "2c983545-d403-4f87-9063-5b7e3b6d3736",
"full_name": "quickstart_catalog.quickstart_schema.quickstart_table_external",
"data_access_configuration_id": "00000000-0000-0000-0000-000000000000",
"created_at": 1666185698688,
"created_by": "abc@acryl.io",
"updated_at": 1666186049633,
"updated_by": "abc@acryl.io",
"table_id": "cff27aa1-1c6a-4d78-b713-562c660c2896",
}
),
]
workspace_client.tables.get = lambda *args, **kwargs: databricks.sdk.service.catalog.TableInfo.from_dict(
@ -409,6 +411,11 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock):
"include_ownership": True,
"include_hive_metastore": True,
"warehouse_id": "test",
"emit_siblings": True,
"delta_lake_options": {
"platform_instance_name": None,
"env": "PROD",
},
"profiling": {
"enabled": True,
"method": "analyze",