feat(ingest/databricks): view upstream lineage for hive metastore (#9657)

This commit is contained in:
Mayuri Nehate 2024-01-22 18:07:43 +05:30 committed by GitHub
parent ad65c36ddc
commit 77df9ec926
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 159 additions and 3 deletions

View File

@ -55,7 +55,6 @@ TABLE_STAT_LIST = {ROWS, BYTES}
class HiveMetastoreProxy(Closeable):
# TODO: Support for view lineage using SQL parsing
# Why not use hive ingestion source directly here?
# 1. hive ingestion source assumes 2-level namespace hierarchy and currently
# there is no other intermediate interface except sqlalchemy inspector

View File

@ -1,7 +1,7 @@
import logging
import re
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterable, List, Optional, Set, Union
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
from urllib.parse import urljoin
from datahub.emitter.mce_builder import (
@ -24,6 +24,7 @@ from datahub.emitter.mcp_builder import (
add_dataset_to_container,
gen_containers,
)
from datahub.emitter.sql_parsing_builder import SqlParsingBuilder
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
SupportStatus,
@ -67,6 +68,7 @@ from datahub.ingestion.source.unity.proxy_types import (
DATA_TYPE_REGISTRY,
Catalog,
Column,
CustomCatalogType,
Metastore,
Notebook,
NotebookId,
@ -104,6 +106,12 @@ from datahub.metadata.schema_classes import (
from datahub.utilities.file_backed_collections import FileBackedDict
from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column
from datahub.utilities.registries.domain_registry import DomainRegistry
from datahub.utilities.sqlglot_lineage import (
SchemaResolver,
SqlParsingResult,
sqlglot_lineage,
view_definition_lineage_helper,
)
logger: logging.Logger = logging.getLogger(__name__)
@ -137,6 +145,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
unity_catalog_api_proxy: UnityCatalogApiProxy
platform: str = "databricks"
platform_instance_name: Optional[str]
sql_parser_schema_resolver: Optional[SchemaResolver] = None
def get_report(self) -> UnityCatalogReport:
    """Return the report object accumulated by this ingestion source."""
    return self.report
@ -179,6 +188,9 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.table_refs: Set[TableReference] = set()
self.view_refs: Set[TableReference] = set()
self.notebooks: FileBackedDict[Notebook] = FileBackedDict()
self.view_definitions: FileBackedDict[
Tuple[TableReference, str]
] = FileBackedDict()
# Global map of tables, for profiling
self.tables: FileBackedDict[Table] = FileBackedDict()
@ -191,6 +203,13 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.config.get_sql_alchemy_url(HIVE_METASTORE), self.config.options
)
self.report.hive_metastore_catalog_found = True
if self.config.include_table_lineage:
self.sql_parser_schema_resolver = SchemaResolver(
platform=self.platform,
platform_instance=self.config.platform_instance,
env=self.config.env,
)
except Exception as e:
logger.debug("Exception", exc_info=True)
self.warn(
@ -243,6 +262,8 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
yield from self.process_metastores()
yield from self.get_view_lineage()
if self.config.include_notebooks:
self.report.report_ingestion_stage_start("Notebook lineage")
for notebook in self.notebooks.values():
@ -304,7 +325,6 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
yield from self._gen_notebook_workunits(notebook)
def _gen_notebook_workunits(self, notebook: Notebook) -> Iterable[MetadataWorkUnit]:
properties = {"path": notebook.path}
if notebook.language:
properties["language"] = notebook.language.value
@ -449,6 +469,17 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
table.ref, self.notebooks[str(notebook_id)]
)
# Sql parsing is required only for hive metastore view lineage
if (
self.sql_parser_schema_resolver
and table.schema.catalog.type == CustomCatalogType.HIVE_METASTORE_CATALOG
):
self.sql_parser_schema_resolver.add_schema_metadata(
dataset_urn, schema_metadata
)
if table.view_definition:
self.view_definitions[dataset_urn] = (table.ref, table.view_definition)
yield from [
mcp.as_workunit()
for mcp in MetadataChangeProposalWrapper.construct_many(
@ -828,8 +859,74 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
)
]
def _run_sql_parser(
    self, view_ref: TableReference, query: str, schema_resolver: SchemaResolver
) -> Optional[SqlParsingResult]:
    """Parse a view definition with sqlglot and attach lineage to the view.

    Returns the parsing result rewritten (via view_definition_lineage_helper)
    so that the view itself is the downstream, or None when not even
    table-level lineage could be extracted.
    """
    parsed = sqlglot_lineage(
        query,
        schema_resolver=schema_resolver,
        default_db=view_ref.catalog,
        default_schema=view_ref.schema,
    )
    urn = self.gen_dataset_urn(view_ref)
    debug = parsed.debug_info

    # Guard clause: a table-level error means nothing usable was parsed.
    if debug.table_error:
        logger.debug(
            f"Failed to parse lineage for view {view_ref}: "
            f"{debug.table_error}"
        )
        self.report.num_view_definitions_failed_parsing += 1
        self.report.view_definitions_parsing_failures.append(
            f"Table-level sql parsing error for view {view_ref}: {debug.table_error}"
        )
        return None

    if debug.column_error:
        # Column lineage failed, but table-level lineage is still usable.
        self.report.num_view_definitions_failed_column_parsing += 1
        self.report.view_definitions_parsing_failures.append(
            f"Column-level sql parsing error for view {view_ref}: {debug.column_error}"
        )
    else:
        self.report.num_view_definitions_parsed += 1

    return view_definition_lineage_helper(parsed, urn)
def get_view_lineage(self) -> Iterable[MetadataWorkUnit]:
    """Yield upstream-lineage workunits for hive metastore views.

    Only runs when hive metastore ingestion, table lineage, and the SQL
    parser schema resolver are all enabled/available.
    """
    lineage_enabled = (
        self.config.include_hive_metastore
        and self.config.include_table_lineage
        and self.sql_parser_schema_resolver
    )
    if not lineage_enabled:
        return

    # The builder is used purely for view lineage here; usage statistics
    # and operations are emitted elsewhere.
    builder = SqlParsingBuilder(
        generate_lineage=True,
        generate_usage_statistics=False,
        generate_operations=False,
    )
    for view_ref, view_definition in self.view_definitions.values():
        parsed = self._run_sql_parser(
            view_ref,
            view_definition,
            self.sql_parser_schema_resolver,
        )
        if parsed and parsed.out_tables:
            # process_sql_parsing_result yields no workunits in this
            # configuration, but "yield from" is what drives its execution.
            yield from builder.process_sql_parsing_result(
                result=parsed,
                query=view_definition,
                is_view_ddl=True,
                include_column_lineage=self.config.include_view_column_lineage,
            )
    yield from builder.gen_workunits()
def close(self):
    """Release all resources held by the source, then close the base class.

    Closes the hive metastore proxy connection and SQL parser schema
    resolver when they were created (both are Optional), and the
    file-backed view-definition store.
    """
    if self.hive_metastore_proxy:
        self.hive_metastore_proxy.close()
    # Close unconditionally: view_definitions is always created in
    # __init__, and the previous truthiness check skipped close() when the
    # dict was empty, leaking its file-backed storage.
    self.view_definitions.close()
    if self.sql_parser_schema_resolver:
        self.sql_parser_schema_resolver.close()
    super().close()

View File

@ -3463,6 +3463,66 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD)",
"changeType": "UPSERT",
"aspectName": "upstreamLineage",
"aspect": {
"json": {
"upstreams": [
{
"auditStamp": {
"time": 0,
"actor": "urn:li:corpuser:unknown"
},
"dataset": "urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD)",
"type": "VIEW"
}
],
"fineGrainedLineages": [
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD),betStatusId)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD),betStatusId)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD),channelId)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD),channelId)"
],
"confidenceScore": 1.0
},
{
"upstreamType": "FIELD_SET",
"upstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.bet,PROD),combination)"
],
"downstreamType": "FIELD",
"downstreams": [
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:databricks,hive_metastore.bronze_kambi.view1,PROD),combination)"
],
"confidenceScore": 1.0
}
]
}
},
"systemMetadata": {
"lastObserved": 1638860400000,
"runId": "unity-catalog-test",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:databricks,system.quickstart_schema.quickstart_table,PROD)",