feat(ingest/unity): support extracting ownership (#7801)

This commit is contained in:
Harshal Sheth 2023-04-13 08:15:41 +05:30 committed by GitHub
parent 6f5f8de99a
commit 204727a6ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 2190 additions and 1202 deletions

View File

@ -50,6 +50,11 @@ class UnityCatalogSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigM
description="Option to enable/disable lineage generation.",
)
include_table_ownership: bool = pydantic.Field(
default=False,
description="Option to enable/disable table ownership generation.",
)
include_column_lineage: Optional[bool] = pydantic.Field(
default=True,
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",

View File

@ -7,6 +7,7 @@ from datahub.emitter.mce_builder import (
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_schema_field_urn,
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
@ -57,6 +58,9 @@ from datahub.metadata.schema_classes import (
DatasetPropertiesClass,
DomainsClass,
MySqlDDLClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
SchemaFieldClass,
SchemaMetadataClass,
SubTypesClass,
@ -83,6 +87,9 @@ logger: logging.Logger = logging.getLogger(__name__)
@capability(SourceCapability.PLATFORM_INSTANCE, "Enabled by default")
@capability(SourceCapability.DOMAINS, "Supported via the `domain` config field")
@capability(SourceCapability.CONTAINERS, "Enabled by default")
@capability(
SourceCapability.OWNERSHIP, "Supported via the `include_table_ownership` config"
)
@capability(
SourceCapability.DELETION_DETECTION,
"Optionally enabled via `stateful_ingestion.remove_stale_metadata`",
@ -254,6 +261,8 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
)
)
ownership = self._create_table_ownership_aspect(table)
if self.config.include_column_lineage:
self.unity_catalog_api_proxy.get_column_lineage(table)
lineage = self._generate_column_lineage_aspect(dataset_urn, table)
@ -271,6 +280,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
sub_type,
schema_metadata,
domain,
ownership,
lineage,
],
)
@ -456,6 +466,20 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
customProperties=custom_properties,
)
def _create_table_ownership_aspect(
self, table: proxy.Table
) -> Optional[OwnershipClass]:
if self.config.include_table_ownership and table.owner:
return OwnershipClass(
owners=[
OwnerClass(
owner=make_user_urn(table.owner),
type=OwnershipTypeClass.DATAOWNER,
)
]
)
return None
def _create_table_sub_type_aspect(self, table: proxy.Table) -> SubTypesClass:
return SubTypesClass(
typeNames=[

View File

@ -223,6 +223,7 @@ def test_ingestion(pytestconfig, tmp_path, requests_mock):
"config": {
"workspace_url": "https://dummy.cloud.databricks.com",
"token": "fake",
"include_table_ownership": True,
},
},
"sink": {