feat(ingest/unity): Get table lineage from system table (#14282)

Tamas Nemeth 2025-08-06 20:11:43 +01:00 committed by GitHub
parent dd89fc0ad1
commit ea091bb9cc
6 changed files with 1052 additions and 86 deletions

View File

@ -45,9 +45,9 @@ class RestrictedText(ConfigModel):
custom_field="hello-world.test"
)
# model.name returns truncated and sanitized version
# model.name.raw_text returns original value
# model.custom_field returns "hello_worl..."
print(model.name) # Truncated and sanitized version
print(model.name.text) # Original value
print(model.custom_field) # "hello_worl..."
```
"""

View File

@ -8,7 +8,7 @@ import pydantic
from pydantic import Field
from typing_extensions import Literal
from datahub.configuration.common import AllowDenyPattern, ConfigModel
from datahub.configuration.common import AllowDenyPattern, ConfigEnum, ConfigModel
from datahub.configuration.source_common import (
DatasetSourceConfigMixin,
LowerCaseDatasetUrnConfigMixin,
@ -36,6 +36,12 @@ from datahub.utilities.global_warning_util import add_global_warning
logger = logging.getLogger(__name__)
class LineageDataSource(ConfigEnum):
AUTO = "AUTO"
SYSTEM_TABLES = "SYSTEM_TABLES"
API = "API"
class UnityCatalogProfilerConfig(ConfigModel):
method: str = Field(
description=(
@ -243,6 +249,21 @@ class UnityCatalogSourceConfig(
description="Option to enable/disable lineage generation. Currently we have to call a rest call per column to get column level lineage due to the Databrick api which can slow down ingestion. ",
)
lineage_data_source: LineageDataSource = pydantic.Field(
default=LineageDataSource.AUTO,
description=(
"Source for lineage data extraction. Options: "
f"'{LineageDataSource.AUTO.value}' - Use system tables when SQL warehouse is available, fallback to API; "
f"'{LineageDataSource.SYSTEM_TABLES.value}' - Force use of system.access.table_lineage and system.access.column_lineage tables (requires SQL warehouse); "
f"'{LineageDataSource.API.value}' - Force use of REST API endpoints for lineage data"
),
)
ignore_start_time_lineage: bool = pydantic.Field(
default=False,
description="Option to ignore the start_time and retrieve all available lineage. When enabled, the start_time filter will be set to zero to extract all lineage events regardless of the configured time window.",
)
column_lineage_column_limit: int = pydantic.Field(
default=300,
description="Limit the number of columns to get column level lineage. ",
@ -362,6 +383,20 @@ class UnityCatalogSourceConfig(
return values
@pydantic.root_validator(skip_on_failure=True)
def validate_lineage_data_source_with_warehouse(
cls, values: Dict[str, Any]
) -> Dict[str, Any]:
lineage_data_source = values.get("lineage_data_source", LineageDataSource.AUTO)
warehouse_id = values.get("warehouse_id")
if lineage_data_source == LineageDataSource.SYSTEM_TABLES and not warehouse_id:
raise ValueError(
f"lineage_data_source='{LineageDataSource.SYSTEM_TABLES.value}' requires warehouse_id to be set"
)
return values
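
The root validator above fails fast when `SYSTEM_TABLES` is requested without a SQL warehouse. A minimal sketch of that check, reusing the `LineageDataSource` enum added in this file (`check_lineage_data_source` is an illustrative stand-in for the pydantic root validator, not a real DataHub helper):

```python
# Minimal sketch of the warehouse check above; check_lineage_data_source is an
# illustrative stand-in for the pydantic root validator.
from typing import Any, Dict

from datahub.ingestion.source.unity.config import LineageDataSource


def check_lineage_data_source(values: Dict[str, Any]) -> Dict[str, Any]:
    source = values.get("lineage_data_source", LineageDataSource.AUTO)
    if source == LineageDataSource.SYSTEM_TABLES and not values.get("warehouse_id"):
        raise ValueError(
            "lineage_data_source='SYSTEM_TABLES' requires warehouse_id to be set"
        )
    return values


check_lineage_data_source({"lineage_data_source": LineageDataSource.AUTO})  # passes
try:
    check_lineage_data_source({"lineage_data_source": LineageDataSource.SYSTEM_TABLES})
except ValueError as e:
    print(e)  # requires warehouse_id to be set
```
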
@pydantic.validator("schema_pattern", always=True)
def schema_pattern_should__always_deny_information_schema(
cls, v: AllowDenyPattern

View File

@ -30,10 +30,14 @@ from databricks.sdk.service.sql import (
from databricks.sdk.service.workspace import ObjectType
from databricks.sql import connect
from databricks.sql.types import Row
from typing_extensions import assert_never
from datahub._version import nice_version_name
from datahub.api.entities.external.unity_catalog_external_entites import UnityCatalogTag
from datahub.emitter.mce_builder import parse_ts_millis
from datahub.ingestion.source.unity.config import (
LineageDataSource,
)
from datahub.ingestion.source.unity.hive_metastore_proxy import HiveMetastoreProxy
from datahub.ingestion.source.unity.proxy_profiling import (
UnityCatalogProxyProfilingMixin,
@ -46,6 +50,7 @@ from datahub.ingestion.source.unity.proxy_types import (
ExternalTableReference,
Metastore,
Notebook,
NotebookReference,
Query,
Schema,
ServicePrincipal,
@ -53,9 +58,14 @@ from datahub.ingestion.source.unity.proxy_types import (
TableReference,
)
from datahub.ingestion.source.unity.report import UnityCatalogReport
from datahub.utilities.file_backed_collections import FileBackedDict
logger: logging.Logger = logging.getLogger(__name__)
# It is enough to keep the cache size at 1, since we only process one catalog at a time.
# We need to change this if we want to support parallel processing of multiple catalogs.
_MAX_CONCURRENT_CATALOGS = 1
@dataclasses.dataclass
class TableInfoWithGeneration(TableInfo):
@ -91,6 +101,32 @@ class QueryFilterWithStatementTypes(QueryFilter):
return v
@dataclasses.dataclass
class TableUpstream:
table_name: str
source_type: str
last_updated: Optional[datetime] = None
@dataclasses.dataclass
class ExternalUpstream:
path: str
source_type: str
last_updated: Optional[datetime] = None
@dataclasses.dataclass
class TableLineageInfo:
upstreams: List[TableUpstream] = dataclasses.field(default_factory=list)
external_upstreams: List[ExternalUpstream] = dataclasses.field(default_factory=list)
upstream_notebooks: List[NotebookReference] = dataclasses.field(
default_factory=list
)
downstream_notebooks: List[NotebookReference] = dataclasses.field(
default_factory=list
)
class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
_workspace_client: WorkspaceClient
_workspace_url: str
@ -104,6 +140,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
warehouse_id: Optional[str],
report: UnityCatalogReport,
hive_metastore_proxy: Optional[HiveMetastoreProxy] = None,
lineage_data_source: LineageDataSource = LineageDataSource.AUTO,
):
self._workspace_client = WorkspaceClient(
host=workspace_url,
@ -114,6 +151,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
self.warehouse_id = warehouse_id or ""
self.report = report
self.hive_metastore_proxy = hive_metastore_proxy
self.lineage_data_source = lineage_data_source
self._sql_connection_params = {
"server_hostname": self._workspace_client.config.host.replace(
"https://", ""
@ -293,16 +331,142 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
method, path, body={**body, "page_token": response["next_page_token"]}
)
@cached(cachetools.FIFOCache(maxsize=100))
def get_catalog_column_lineage(self, catalog: str) -> Dict[str, Dict[str, dict]]:
"""Get column lineage for all tables in a catalog."""
def _build_datetime_where_conditions(
self, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None
) -> str:
"""Build datetime filtering conditions for lineage queries."""
conditions = []
if start_time:
conditions.append(f"event_time >= '{start_time.isoformat()}'")
if end_time:
conditions.append(f"event_time <= '{end_time.isoformat()}'")
return " AND " + " AND ".join(conditions) if conditions else ""
@cached(cachetools.FIFOCache(maxsize=_MAX_CONCURRENT_CATALOGS))
def get_catalog_table_lineage_via_system_tables(
self,
catalog: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> FileBackedDict[TableLineageInfo]:
"""Get table lineage for all tables in a catalog using system tables."""
logger.info(f"Fetching table lineage for catalog: {catalog}")
try:
additional_where = self._build_datetime_where_conditions(
start_time, end_time
)
query = f"""
SELECT
entity_type, entity_id,
source_table_full_name, source_type,
target_table_full_name, target_type,
max(event_time) as last_updated
FROM system.access.table_lineage
WHERE
(target_table_catalog = %s or source_table_catalog = %s)
{additional_where}
GROUP BY
entity_type, entity_id,
source_table_full_name, source_type,
target_table_full_name, target_type
"""
rows = self._execute_sql_query(query, [catalog, catalog])
result_dict: FileBackedDict[TableLineageInfo] = FileBackedDict()
for row in rows:
entity_type = row["entity_type"]
entity_id = row["entity_id"]
source_full_name = row["source_table_full_name"]
target_full_name = row["target_table_full_name"]
source_type = row["source_type"]
last_updated = row["last_updated"]
# Initialize TableLineageInfo for both source and target tables if they're in our catalog
for table_name in [source_full_name, target_full_name]:
if (
table_name
and table_name.startswith(f"{catalog}.")
and table_name not in result_dict
):
result_dict[table_name] = TableLineageInfo()
# Process upstream relationships (target table gets upstreams)
if target_full_name and target_full_name.startswith(f"{catalog}."):
# Handle table upstreams
if (
source_type in ["TABLE", "VIEW"]
and source_full_name != target_full_name
):
upstream = TableUpstream(
table_name=source_full_name,
source_type=source_type,
last_updated=last_updated,
)
result_dict[target_full_name].upstreams.append(upstream)
# Handle external upstreams (PATH type)
elif source_type == "PATH":
external_upstream = ExternalUpstream(
path=source_full_name,
source_type=source_type,
last_updated=last_updated,
)
result_dict[target_full_name].external_upstreams.append(
external_upstream
)
# Handle upstream notebooks (notebook -> table)
elif entity_type == "NOTEBOOK":
notebook_ref = NotebookReference(
id=entity_id,
last_updated=last_updated,
)
result_dict[target_full_name].upstream_notebooks.append(
notebook_ref
)
# Process downstream relationships (source table gets downstream notebooks)
if (
entity_type == "NOTEBOOK"
and source_full_name
and source_full_name.startswith(f"{catalog}.")
):
notebook_ref = NotebookReference(
id=entity_id,
last_updated=last_updated,
)
result_dict[source_full_name].downstream_notebooks.append(
notebook_ref
)
return result_dict
except Exception as e:
logger.warning(
f"Error getting table lineage for catalog {catalog}: {e}",
exc_info=True,
)
return FileBackedDict()
@cached(cachetools.FIFOCache(maxsize=_MAX_CONCURRENT_CATALOGS))
def get_catalog_column_lineage_via_system_tables(
self,
catalog: str,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> FileBackedDict[Dict[str, dict]]:
"""Get column lineage for all tables in a catalog using system tables."""
logger.info(f"Fetching column lineage for catalog: {catalog}")
try:
query = """
additional_where = self._build_datetime_where_conditions(
start_time, end_time
)
query = f"""
SELECT
source_table_catalog, source_table_schema, source_table_name, source_column_name, source_type,
target_table_schema, target_table_name, target_column_name,
max(event_time)
max(event_time) as last_updated
FROM system.access.column_lineage
WHERE
target_table_catalog = %s
@ -313,13 +477,14 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
AND source_table_schema IS NOT NULL
AND source_table_name IS NOT NULL
AND source_column_name IS NOT NULL
{additional_where}
GROUP BY
source_table_catalog, source_table_schema, source_table_name, source_column_name, source_type,
source_table_catalog, source_table_schema, source_table_name, source_column_name, source_type,
target_table_schema, target_table_name, target_column_name
"""
rows = self._execute_sql_query(query, (catalog,))
rows = self._execute_sql_query(query, [catalog])
result_dict: Dict[str, Dict[str, dict]] = {}
result_dict: FileBackedDict[Dict[str, dict]] = FileBackedDict()
for row in rows:
result_dict.setdefault(row["target_table_schema"], {}).setdefault(
row["target_table_name"], {}
@ -330,6 +495,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
"schema_name": row["source_table_schema"],
"table_name": row["source_table_name"],
"name": row["source_column_name"],
"last_updated": row["last_updated"],
}
)
@ -339,9 +505,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
f"Error getting column lineage for catalog {catalog}: {e}",
exc_info=True,
)
return {}
return FileBackedDict()
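
The setdefault chain above builds a schema → table → column mapping of source columns. A hedged sketch of the shape one `system.access.column_lineage` row produces; the final `target_column_name` level is assumed from the tests below, since the hunk is truncated here:

```python
# Sketch of the nested mapping built from one system.access.column_lineage row;
# the column-level setdefault is assumed (the hunk above is truncated).
from datetime import datetime
from typing import Any, Dict, List

row = {
    "source_table_catalog": "src_catalog",
    "source_table_schema": "src_schema",
    "source_table_name": "src_table",
    "source_column_name": "src_col",
    "target_table_schema": "tgt_schema",
    "target_table_name": "tgt_table",
    "target_column_name": "tgt_col",
    "last_updated": datetime(2023, 1, 1),
}

result: Dict[str, Dict[str, Dict[str, List[Dict[str, Any]]]]] = {}
result.setdefault(row["target_table_schema"], {}).setdefault(
    row["target_table_name"], {}
).setdefault(row["target_column_name"], []).append(
    {
        "catalog_name": row["source_table_catalog"],
        "schema_name": row["source_table_schema"],
        "table_name": row["source_table_name"],
        "name": row["source_column_name"],
        "last_updated": row["last_updated"],
    }
)

print(result["tgt_schema"]["tgt_table"]["tgt_col"][0]["name"])  # src_col
```
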
def list_lineages_by_table(
def list_lineages_by_table_via_http_api(
self, table_name: str, include_entity_lineage: bool
) -> dict:
"""List table lineage by table name."""
@ -355,7 +521,9 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
},
)
def list_lineages_by_column(self, table_name: str, column_name: str) -> list:
def list_lineages_by_column_via_http_api(
self, table_name: str, column_name: str
) -> list:
"""List column lineage by table name and column name."""
logger.debug(f"Getting column lineage for {table_name}.{column_name}")
try:
@ -374,55 +542,173 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
)
return []
def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
def table_lineage(
self,
table: Table,
include_entity_lineage: bool,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> None:
if table.schema.catalog.type == CustomCatalogType.HIVE_METASTORE_CATALOG:
# Lineage is not available for Hive Metastore Tables.
return None
# Lineage endpoint doesn't exist on API version 2.1
try:
response: dict = self.list_lineages_by_table(
table_name=table.ref.qualified_table_name,
include_entity_lineage=include_entity_lineage,
)
# Determine lineage data source based on config
use_system_tables = False
if self.lineage_data_source == LineageDataSource.SYSTEM_TABLES:
use_system_tables = True
elif self.lineage_data_source == LineageDataSource.API:
use_system_tables = False
elif self.lineage_data_source == LineageDataSource.AUTO:
# Use the newer system tables if we have a SQL warehouse, otherwise fall back
# to the older (and slower) HTTP API.
use_system_tables = bool(self.warehouse_id)
else:
assert_never(self.lineage_data_source)
for item in response.get("upstreams") or []:
if "tableInfo" in item:
table_ref = TableReference.create_from_lineage(
item["tableInfo"], table.schema.catalog.metastore
)
if table_ref:
table.upstreams[table_ref] = {}
elif "fileInfo" in item:
external_ref = ExternalTableReference.create_from_lineage(
item["fileInfo"]
)
if external_ref:
table.external_upstreams.add(external_ref)
for notebook in item.get("notebookInfos") or []:
table.upstream_notebooks.add(notebook["notebook_id"])
for item in response.get("downstreams") or []:
for notebook in item.get("notebookInfos") or []:
table.downstream_notebooks.add(notebook["notebook_id"])
if use_system_tables:
self._process_system_table_lineage(table, start_time, end_time)
else:
self._process_table_lineage_via_http_api(table, include_entity_lineage)
except Exception as e:
logger.warning(
f"Error getting lineage on table {table.ref}: {e}", exc_info=True
)
def _process_system_table_lineage(
self,
table: Table,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> None:
"""Process table lineage using system.access.table_lineage table."""
catalog_lineage = self.get_catalog_table_lineage_via_system_tables(
table.ref.catalog, start_time, end_time
)
table_full_name = table.ref.qualified_table_name
lineage_info = catalog_lineage.get(table_full_name, TableLineageInfo())
# Process table upstreams
for upstream in lineage_info.upstreams:
upstream_table_name = upstream.table_name
# Parse catalog.schema.table format
parts = upstream_table_name.split(".")
if len(parts) == 3:
catalog_name, schema_name, table_name = parts[0], parts[1], parts[2]
table_ref = TableReference(
metastore=table.schema.catalog.metastore.id
if table.schema.catalog.metastore
else None,
catalog=catalog_name,
schema=schema_name,
table=table_name,
last_updated=upstream.last_updated,
)
table.upstreams[table_ref] = {}
else:
logger.warning(
f"Unexpected upstream table format: {upstream_table_name} for table {table_full_name}"
)
continue
# Process external upstreams
for external_upstream in lineage_info.external_upstreams:
external_ref = ExternalTableReference(
path=external_upstream.path,
has_permission=True,
name=None,
type=None,
storage_location=external_upstream.path,
last_updated=external_upstream.last_updated,
)
table.external_upstreams.add(external_ref)
# Process upstream notebook lineage
for notebook_ref in lineage_info.upstream_notebooks:
existing_ref = table.upstream_notebooks.get(notebook_ref.id)
if existing_ref is None or (
notebook_ref.last_updated
and existing_ref.last_updated
and notebook_ref.last_updated > existing_ref.last_updated
):
table.upstream_notebooks[notebook_ref.id] = notebook_ref
# Process downstream notebook lineage
for notebook_ref in lineage_info.downstream_notebooks:
existing_ref = table.downstream_notebooks.get(notebook_ref.id)
if existing_ref is None or (
notebook_ref.last_updated
and existing_ref.last_updated
and notebook_ref.last_updated > existing_ref.last_updated
):
table.downstream_notebooks[notebook_ref.id] = notebook_ref
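
Both notebook loops above keep at most one reference per notebook id, preferring the entry with the newest `last_updated`. A small standalone sketch of that merge rule (`Ref` is an illustrative stand-in for `NotebookReference`):

```python
# Sketch of the keep-latest-by-id merge used for upstream/downstream notebooks;
# Ref is an illustrative stand-in for NotebookReference.
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional


@dataclass(frozen=True)
class Ref:
    id: int
    last_updated: Optional[datetime] = None


def merge(existing: Dict[int, Ref], incoming: Ref) -> None:
    current = existing.get(incoming.id)
    if current is None or (
        incoming.last_updated
        and current.last_updated
        and incoming.last_updated > current.last_updated
    ):
        existing[incoming.id] = incoming


notebooks: Dict[int, Ref] = {}
merge(notebooks, Ref(id=123, last_updated=datetime(2023, 1, 1)))
merge(notebooks, Ref(id=123, last_updated=datetime(2023, 1, 4)))  # newer wins
merge(notebooks, Ref(id=123, last_updated=datetime(2023, 1, 2)))  # older is ignored
print(notebooks[123].last_updated)  # 2023-01-04 00:00:00
```
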
def _process_table_lineage_via_http_api(
self, table: Table, include_entity_lineage: bool
) -> None:
"""Process table lineage using the HTTP API (legacy fallback)."""
response: dict = self.list_lineages_by_table_via_http_api(
table_name=table.ref.qualified_table_name,
include_entity_lineage=include_entity_lineage,
)
for item in response.get("upstreams") or []:
if "tableInfo" in item:
table_ref = TableReference.create_from_lineage(
item["tableInfo"], table.schema.catalog.metastore
)
if table_ref:
table.upstreams[table_ref] = {}
elif "fileInfo" in item:
external_ref = ExternalTableReference.create_from_lineage(
item["fileInfo"]
)
if external_ref:
table.external_upstreams.add(external_ref)
for notebook in item.get("notebookInfos") or []:
notebook_ref = NotebookReference(
id=notebook["notebook_id"],
)
table.upstream_notebooks[notebook_ref.id] = notebook_ref
for item in response.get("downstreams") or []:
for notebook in item.get("notebookInfos") or []:
notebook_ref = NotebookReference(
id=notebook["notebook_id"],
)
table.downstream_notebooks[notebook_ref.id] = notebook_ref
def get_column_lineage(
self,
table: Table,
column_names: List[str],
*,
max_workers: Optional[int] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
) -> None:
try:
# use the newer system tables if we have a SQL warehouse, otherwise fall back
# and use the older (and much slower) HTTP API.
if self.warehouse_id:
# Determine lineage data source based on config
use_system_tables = False
if self.lineage_data_source == LineageDataSource.SYSTEM_TABLES:
use_system_tables = True
elif self.lineage_data_source == LineageDataSource.API:
use_system_tables = False
elif self.lineage_data_source == LineageDataSource.AUTO:
# Use the newer system tables if we have a SQL warehouse, otherwise fall back
# to the older (and slower) HTTP API.
use_system_tables = bool(self.warehouse_id)
else:
assert_never(self.lineage_data_source)
if use_system_tables:
lineage = (
self.get_catalog_column_lineage(table.ref.catalog)
self.get_catalog_column_lineage_via_system_tables(
table.ref.catalog, start_time, end_time
)
.get(table.ref.schema, {})
.get(table.ref.table, {})
)
@ -430,7 +716,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(
self.list_lineages_by_column,
self.list_lineages_by_column_via_http_api,
table.ref.qualified_table_name,
column_name,
)
@ -608,7 +894,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
logger.warning(f"Failed to execute SQL query: {e}")
return []
@cached(cachetools.FIFOCache(maxsize=100))
@cached(cachetools.FIFOCache(maxsize=_MAX_CONCURRENT_CATALOGS))
def get_schema_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
"""Optimized version using databricks-sql"""
logger.info(f"Fetching schema tags for catalog: `{catalog}`")
@ -631,7 +917,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
return result_dict
@cached(cachetools.FIFOCache(maxsize=100))
@cached(cachetools.FIFOCache(maxsize=_MAX_CONCURRENT_CATALOGS))
def get_catalog_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
"""Optimized version using databricks-sql"""
logger.info(f"Fetching table tags for catalog: `{catalog}`")
@ -653,7 +939,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
return result_dict
@cached(cachetools.FIFOCache(maxsize=100))
@cached(cachetools.FIFOCache(maxsize=_MAX_CONCURRENT_CATALOGS))
def get_table_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
"""Optimized version using databricks-sql"""
logger.info(f"Fetching table tags for catalog: `{catalog}`")
@ -676,7 +962,7 @@ class UnityCatalogApiProxy(UnityCatalogProxyProfilingMixin):
return result_dict
@cached(cachetools.FIFOCache(maxsize=100))
@cached(cachetools.FIFOCache(maxsize=_MAX_CONCURRENT_CATALOGS))
def get_column_tags(self, catalog: str) -> Dict[str, List[UnityCatalogTag]]:
"""Optimized version using databricks-sql"""
logger.info(f"Fetching column tags for catalog: `{catalog}`")

View File

@ -148,6 +148,7 @@ class TableReference:
catalog: str
schema: str
table: str
last_updated: Optional[datetime] = None
@classmethod
def create(cls, table: "Table") -> "TableReference":
@ -172,6 +173,7 @@ class TableReference:
d["catalog_name"],
d["schema_name"],
d.get("table_name", d["name"]), # column vs table query output
last_updated=d.get("last_updated"),
)
except Exception as e:
logger.warning(f"Failed to create TableReference from {d}: {e}")
@ -199,6 +201,7 @@ class ExternalTableReference:
name: Optional[str]
type: Optional[SecurableType]
storage_location: Optional[str]
last_updated: Optional[datetime] = None
@classmethod
def create_from_lineage(cls, d: dict) -> Optional["ExternalTableReference"]:
@ -215,12 +218,19 @@ class ExternalTableReference:
name=d.get("securable_name"),
type=securable_type,
storage_location=d.get("storage_location"),
last_updated=d.get("last_updated"),
)
except Exception as e:
logger.warning(f"Failed to create ExternalTableReference from {d}: {e}")
return None
@dataclass(frozen=True, order=True)
class NotebookReference:
id: int
last_updated: Optional[datetime] = None
@dataclass
class Table(CommonProperty):
schema: Schema
@ -239,8 +249,8 @@ class Table(CommonProperty):
properties: Dict[str, str]
upstreams: Dict[TableReference, Dict[str, List[str]]] = field(default_factory=dict)
external_upstreams: Set[ExternalTableReference] = field(default_factory=set)
upstream_notebooks: Set[NotebookId] = field(default_factory=set)
downstream_notebooks: Set[NotebookId] = field(default_factory=set)
upstream_notebooks: Dict[int, NotebookReference] = field(default_factory=dict)
downstream_notebooks: Dict[int, NotebookReference] = field(default_factory=dict)
ref: TableReference = field(init=False)

View File

@ -7,12 +7,14 @@ from urllib.parse import urljoin
from datahub.api.entities.external.external_entities import PlatformResourceRepository
from datahub.api.entities.external.unity_catalog_external_entites import UnityCatalogTag
from datahub.emitter.mce_builder import (
UNKNOWN_USER,
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_group_urn,
make_schema_field_urn,
make_ts_millis,
make_user_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
@ -111,6 +113,7 @@ from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
ViewProperties,
)
from datahub.metadata.schema_classes import (
AuditStampClass,
BrowsePathsClass,
DataPlatformInstanceClass,
DatasetLineageTypeClass,
@ -203,6 +206,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
config.warehouse_id,
report=self.report,
hive_metastore_proxy=self.hive_metastore_proxy,
lineage_data_source=config.lineage_data_source,
)
self.external_url_base = urljoin(self.config.workspace_url, "/explore/data")
@ -410,12 +414,12 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self.config.workspace_url, f"#notebook/{notebook.id}"
),
created=(
TimeStampClass(int(notebook.created_at.timestamp() * 1000))
TimeStampClass(make_ts_millis(notebook.created_at))
if notebook.created_at
else None
),
lastModified=(
TimeStampClass(int(notebook.modified_at.timestamp() * 1000))
TimeStampClass(make_ts_millis(notebook.modified_at))
if notebook.modified_at
else None
),
@ -434,17 +438,20 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
if not notebook.upstreams:
return None
upstreams = []
for upstream_ref in notebook.upstreams:
timestamp = make_ts_millis(upstream_ref.last_updated)
upstreams.append(
self._create_upstream_class(
self.gen_dataset_urn(upstream_ref),
DatasetLineageTypeClass.COPY,
timestamp,
)
)
return MetadataChangeProposalWrapper(
entityUrn=self.gen_notebook_urn(notebook),
aspect=UpstreamLineageClass(
upstreams=[
UpstreamClass(
dataset=self.gen_dataset_urn(upstream_ref),
type=DatasetLineageTypeClass.COPY,
)
for upstream_ref in notebook.upstreams
]
),
aspect=UpstreamLineageClass(upstreams=upstreams),
).as_workunit()
def process_metastores(self) -> Iterable[MetadataWorkUnit]:
@ -463,14 +470,15 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
self, metastore: Optional[Metastore]
) -> Iterable[MetadataWorkUnit]:
for catalog in self._get_catalogs(metastore):
if not self.config.catalog_pattern.allowed(catalog.id):
self.report.catalogs.dropped(catalog.id)
continue
with self.report.new_stage(f"Ingest catalog {catalog.id}"):
if not self.config.catalog_pattern.allowed(catalog.id):
self.report.catalogs.dropped(catalog.id)
continue
yield from self.gen_catalog_containers(catalog)
yield from self.process_schemas(catalog)
yield from self.gen_catalog_containers(catalog)
yield from self.process_schemas(catalog)
self.report.catalogs.processed(catalog.id)
self.report.catalogs.processed(catalog.id)
def _get_catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
if self.config.catalogs:
@ -647,9 +655,21 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
]
def ingest_lineage(self, table: Table) -> Optional[UpstreamLineageClass]:
# Calculate datetime filters for lineage
lineage_start_time = None
lineage_end_time = self.config.end_time
if self.config.ignore_start_time_lineage:
lineage_start_time = None # Ignore start time to get all lineage
else:
lineage_start_time = self.config.start_time
if self.config.include_table_lineage:
self.unity_catalog_api_proxy.table_lineage(
table, include_entity_lineage=self.config.include_notebooks
table,
include_entity_lineage=self.config.include_notebooks,
start_time=lineage_start_time,
end_time=lineage_end_time,
)
if self.config.include_column_lineage and table.upstreams:
@ -661,7 +681,11 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
for column in table.columns[: self.config.column_lineage_column_limit]
]
self.unity_catalog_api_proxy.get_column_lineage(
table, column_names, max_workers=self.config.lineage_max_workers
table,
column_names,
max_workers=self.config.lineage_max_workers,
start_time=lineage_start_time,
end_time=lineage_end_time,
)
return self._generate_lineage_aspect(self.gen_dataset_urn(table.ref), table)
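
The start/end window computed at the top of `ingest_lineage` reduces to a simple selection. A minimal sketch of that behavior; `lineage_window` is an illustrative helper, not part of the source:

```python
# Sketch of the lineage time-window selection performed in ingest_lineage above;
# lineage_window is an illustrative helper.
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def lineage_window(
    ignore_start_time_lineage: bool,
    start_time: datetime,
    end_time: datetime,
) -> Tuple[Optional[datetime], Optional[datetime]]:
    # Dropping the lower bound makes the system-table queries return all
    # recorded lineage events, regardless of the configured window.
    return (None if ignore_start_time_lineage else start_time, end_time)


now = datetime.now(timezone.utc)
print(lineage_window(False, now - timedelta(days=7), now))  # (start_time, end_time)
print(lineage_window(True, now - timedelta(days=7), now))  # (None, end_time)
```
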
@ -690,18 +714,22 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
for d_col, u_cols in sorted(downstream_to_upstream_cols.items())
)
timestamp = make_ts_millis(upstream_ref.last_updated)
upstreams.append(
UpstreamClass(
dataset=upstream_urn,
type=DatasetLineageTypeClass.TRANSFORMED,
self._create_upstream_class(
upstream_urn,
DatasetLineageTypeClass.TRANSFORMED,
timestamp,
)
)
for notebook in table.upstream_notebooks:
for notebook in table.upstream_notebooks.values():
timestamp = make_ts_millis(notebook.last_updated)
upstreams.append(
UpstreamClass(
dataset=self.gen_notebook_urn(notebook),
type=DatasetLineageTypeClass.TRANSFORMED,
self._create_upstream_class(
self.gen_notebook_urn(notebook.id),
DatasetLineageTypeClass.TRANSFORMED,
timestamp,
)
)
@ -771,6 +799,31 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
instance=self.config.platform_instance,
).as_urn()
def _create_upstream_class(
self,
dataset_urn: str,
lineage_type: Union[str, DatasetLineageTypeClass],
timestamp: Optional[int],
) -> UpstreamClass:
"""
Helper method to create UpstreamClass with optional audit stamp.
If timestamp is None, audit stamp is omitted.
"""
if timestamp is not None:
return UpstreamClass(
dataset=dataset_urn,
type=lineage_type,
auditStamp=AuditStampClass(
time=timestamp,
actor=UNKNOWN_USER,
),
)
else:
return UpstreamClass(
dataset=dataset_urn,
type=lineage_type,
)
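
For reference, a hedged sketch of what the two branches of `_create_upstream_class` produce; the dataset urn below is illustrative:

```python
# Sketch of what the two branches of _create_upstream_class return;
# the dataset urn is illustrative.
from datetime import datetime, timezone

from datahub.emitter.mce_builder import UNKNOWN_USER, make_ts_millis
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DatasetLineageTypeClass,
    UpstreamClass,
)

urn = "urn:li:dataset:(urn:li:dataPlatform:databricks,catalog.schema.upstream,PROD)"
ts = make_ts_millis(datetime(2023, 1, 1, tzinfo=timezone.utc))

# timestamp present -> upstream edge carries an audit stamp with the unknown actor
with_stamp = UpstreamClass(
    dataset=urn,
    type=DatasetLineageTypeClass.TRANSFORMED,
    auditStamp=AuditStampClass(time=ts, actor=UNKNOWN_USER),
)
# timestamp missing -> plain upstream edge, audit stamp omitted
without_stamp = UpstreamClass(dataset=urn, type=DatasetLineageTypeClass.TRANSFORMED)
```
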
def gen_schema_containers(self, schema: Schema) -> Iterable[MetadataWorkUnit]:
domain_urn = self._gen_domain_urn(f"{schema.catalog.name}.{schema.name}")
schema_tags = []
@ -961,16 +1014,20 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource):
created: Optional[TimeStampClass] = None
if table.created_at:
custom_properties["created_at"] = str(table.created_at)
created = TimeStampClass(
int(table.created_at.timestamp() * 1000),
make_user_urn(table.created_by) if table.created_by else None,
)
created_ts = make_ts_millis(table.created_at)
if created_ts is not None:
created = TimeStampClass(
created_ts,
make_user_urn(table.created_by) if table.created_by else None,
)
last_modified = created
if table.updated_at:
last_modified = TimeStampClass(
int(table.updated_at.timestamp() * 1000),
table.updated_by and make_user_urn(table.updated_by),
)
updated_ts = make_ts_millis(table.updated_at)
if updated_ts is not None:
last_modified = TimeStampClass(
updated_ts,
table.updated_by and make_user_urn(table.updated_by),
)
return DatasetPropertiesClass(
name=table.name,

View File

@ -0,0 +1,578 @@
from datetime import datetime
from unittest.mock import patch
import pytest
from datahub.ingestion.source.unity.proxy import (
ExternalUpstream,
TableLineageInfo,
TableUpstream,
UnityCatalogApiProxy,
)
from datahub.ingestion.source.unity.report import UnityCatalogReport
class TestUnityCatalogProxy:
@pytest.fixture
def mock_proxy(self):
"""Create a mock UnityCatalogApiProxy for testing."""
with patch("datahub.ingestion.source.unity.proxy.WorkspaceClient"):
proxy = UnityCatalogApiProxy(
workspace_url="https://test.databricks.com",
personal_access_token="test_token",
warehouse_id="test_warehouse",
report=UnityCatalogReport(),
)
return proxy
def test_build_datetime_where_conditions_empty(self, mock_proxy):
"""Test datetime conditions with no start/end time."""
result = mock_proxy._build_datetime_where_conditions()
assert result == ""
def test_build_datetime_where_conditions_start_only(self, mock_proxy):
"""Test datetime conditions with only start time."""
start_time = datetime(2023, 1, 1, 12, 0, 0)
result = mock_proxy._build_datetime_where_conditions(start_time=start_time)
expected = " AND event_time >= '2023-01-01T12:00:00'"
assert result == expected
def test_build_datetime_where_conditions_end_only(self, mock_proxy):
"""Test datetime conditions with only end time."""
end_time = datetime(2023, 12, 31, 23, 59, 59)
result = mock_proxy._build_datetime_where_conditions(end_time=end_time)
expected = " AND event_time <= '2023-12-31T23:59:59'"
assert result == expected
def test_build_datetime_where_conditions_both(self, mock_proxy):
"""Test datetime conditions with both start and end time."""
start_time = datetime(2023, 1, 1, 12, 0, 0)
end_time = datetime(2023, 12, 31, 23, 59, 59)
result = mock_proxy._build_datetime_where_conditions(
start_time=start_time, end_time=end_time
)
expected = " AND event_time >= '2023-01-01T12:00:00' AND event_time <= '2023-12-31T23:59:59'"
assert result == expected
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy._execute_sql_query"
)
def test_get_catalog_table_lineage_empty(self, mock_execute, mock_proxy):
"""Test get_catalog_table_lineage with no results."""
mock_execute.return_value = []
result = mock_proxy.get_catalog_table_lineage_via_system_tables("test_catalog")
assert len(result) == 0
mock_execute.assert_called_once()
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy._execute_sql_query"
)
def test_get_catalog_table_lineage_with_datetime_filter(
self, mock_execute, mock_proxy
):
"""Test get_catalog_table_lineage with datetime filtering."""
mock_execute.return_value = []
start_time = datetime(2023, 1, 1)
end_time = datetime(2023, 12, 31)
mock_proxy.get_catalog_table_lineage_via_system_tables(
"test_catalog", start_time=start_time, end_time=end_time
)
# Verify the query contains datetime conditions
call_args = mock_execute.call_args
query = call_args[0][0]
assert "event_time >= '2023-01-01T00:00:00'" in query
assert "event_time <= '2023-12-31T00:00:00'" in query
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy._execute_sql_query"
)
def test_get_catalog_table_lineage_data_processing(self, mock_execute, mock_proxy):
"""Test get_catalog_table_lineage with sample data."""
mock_data = [
# Regular table upstream
{
"entity_type": "TABLE",
"entity_id": "entity_1",
"source_table_full_name": "other_catalog.schema.source_table",
"source_type": "TABLE",
"target_table_full_name": "test_catalog.schema.target_table",
"target_type": "TABLE",
"last_updated": datetime(2023, 1, 1),
},
# External PATH upstream
{
"entity_type": "TABLE",
"entity_id": "path_1",
"source_table_full_name": "s3://bucket/path/to/file",
"source_type": "PATH",
"target_table_full_name": "test_catalog.schema.external_target",
"target_type": "TABLE",
"last_updated": datetime(2023, 1, 2),
},
# Notebook upstream (notebook writes to table) - source_table_full_name is None
{
"entity_type": "NOTEBOOK",
"entity_id": "notebook_123",
"source_table_full_name": None,
"source_type": None,
"target_table_full_name": "test_catalog.schema.downstream_table",
"target_type": "TABLE",
"last_updated": datetime(2023, 1, 3),
},
# Notebook downstream (table read by notebook) - target_table_full_name is None
{
"entity_type": "NOTEBOOK",
"entity_id": "notebook_456",
"source_table_full_name": "test_catalog.schema.upstream_table",
"source_type": "TABLE",
"target_table_full_name": None,
"target_type": None,
"last_updated": datetime(2023, 1, 4),
},
]
mock_execute.return_value = mock_data
result = mock_proxy.get_catalog_table_lineage_via_system_tables("test_catalog")
# Verify tables are initialized
assert "test_catalog.schema.target_table" in result
assert "test_catalog.schema.external_target" in result
assert "test_catalog.schema.downstream_table" in result
assert "test_catalog.schema.upstream_table" in result
# Check table upstream
target_lineage = result["test_catalog.schema.target_table"]
assert len(target_lineage.upstreams) == 1
assert (
target_lineage.upstreams[0].table_name
== "other_catalog.schema.source_table"
)
assert target_lineage.upstreams[0].source_type == "TABLE"
# Check external upstream
external_lineage = result["test_catalog.schema.external_target"]
assert len(external_lineage.external_upstreams) == 1
assert external_lineage.external_upstreams[0].path == "s3://bucket/path/to/file"
assert external_lineage.external_upstreams[0].source_type == "PATH"
# Check notebook upstream (notebook writes to table)
downstream_lineage = result["test_catalog.schema.downstream_table"]
assert len(downstream_lineage.upstream_notebooks) == 1
notebook_ref = downstream_lineage.upstream_notebooks[0]
assert notebook_ref.id == "notebook_123"
# Check notebook downstream (table read by notebook)
upstream_lineage = result["test_catalog.schema.upstream_table"]
assert len(upstream_lineage.downstream_notebooks) == 1
notebook_ref = upstream_lineage.downstream_notebooks[0]
assert notebook_ref.id == "notebook_456"
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy._execute_sql_query"
)
def test_get_catalog_column_lineage_empty(self, mock_execute, mock_proxy):
"""Test get_catalog_column_lineage with no results."""
mock_execute.return_value = []
result = mock_proxy.get_catalog_column_lineage_via_system_tables("test_catalog")
assert len(result) == 0
mock_execute.assert_called_once()
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy._execute_sql_query"
)
def test_get_catalog_column_lineage_with_datetime_filter(
self, mock_execute, mock_proxy
):
"""Test get_catalog_column_lineage with datetime filtering."""
mock_execute.return_value = []
start_time = datetime(2023, 1, 1)
end_time = datetime(2023, 12, 31)
mock_proxy.get_catalog_column_lineage_via_system_tables(
"test_catalog", start_time=start_time, end_time=end_time
)
# Verify the query contains datetime conditions
call_args = mock_execute.call_args
query = call_args[0][0]
assert "event_time >= '2023-01-01T00:00:00'" in query
assert "event_time <= '2023-12-31T00:00:00'" in query
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy._execute_sql_query"
)
def test_get_catalog_column_lineage_data_processing(self, mock_execute, mock_proxy):
"""Test get_catalog_column_lineage with sample data."""
mock_data = [
{
"source_table_catalog": "source_catalog",
"source_table_schema": "source_schema",
"source_table_name": "source_table",
"source_column_name": "source_col",
"source_type": "TABLE",
"target_table_schema": "target_schema",
"target_table_name": "target_table",
"target_column_name": "target_col",
"last_updated": datetime(2023, 1, 1),
}
]
mock_execute.return_value = mock_data
result = mock_proxy.get_catalog_column_lineage_via_system_tables("test_catalog")
# Verify nested dictionary structure
assert "target_schema" in result
assert "target_table" in result["target_schema"]
assert "target_col" in result["target_schema"]["target_table"]
column_lineage = result["target_schema"]["target_table"]["target_col"]
assert len(column_lineage) == 1
assert column_lineage[0]["catalog_name"] == "source_catalog"
assert column_lineage[0]["schema_name"] == "source_schema"
assert column_lineage[0]["table_name"] == "source_table"
assert column_lineage[0]["name"] == "source_col"
def test_dataclass_creation(self):
"""Test creation of lineage dataclasses."""
# Test TableUpstream
table_upstream = TableUpstream(
table_name="catalog.schema.table",
source_type="TABLE",
last_updated=datetime(2023, 1, 1),
)
assert table_upstream.table_name == "catalog.schema.table"
assert table_upstream.source_type == "TABLE"
assert table_upstream.last_updated == datetime(2023, 1, 1)
# Test ExternalUpstream
external_upstream = ExternalUpstream(
path="s3://bucket/path",
source_type="PATH",
last_updated=datetime(2023, 1, 2),
)
assert external_upstream.path == "s3://bucket/path"
assert external_upstream.source_type == "PATH"
assert external_upstream.last_updated == datetime(2023, 1, 2)
# Test TableLineageInfo with defaults
lineage_info = TableLineageInfo()
assert lineage_info.upstreams == []
assert lineage_info.external_upstreams == []
assert lineage_info.upstream_notebooks == []
assert lineage_info.downstream_notebooks == []
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy._execute_sql_query"
)
def test_sql_execution_error_handling(self, mock_execute, mock_proxy):
"""Test error handling in lineage methods."""
mock_execute.side_effect = Exception("SQL execution failed")
# Test table lineage error handling
result = mock_proxy.get_catalog_table_lineage_via_system_tables("test_catalog")
assert len(result) == 0
# Test column lineage error handling
result = mock_proxy.get_catalog_column_lineage_via_system_tables("test_catalog")
assert len(result) == 0
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy.get_catalog_table_lineage_via_system_tables"
)
def test_process_system_table_lineage(self, mock_get_lineage, mock_proxy):
"""Test _process_system_table_lineage method."""
from datetime import datetime
from datahub.ingestion.source.unity.proxy_types import (
Catalog,
Metastore,
NotebookReference,
Schema,
Table,
)
# Create mock table object
metastore = Metastore(
id="test_metastore",
name="test_metastore",
comment=None,
global_metastore_id="global_123",
metastore_id="meta_123",
owner="owner",
region="us-west-2",
cloud="aws",
)
catalog = Catalog(
id="test_catalog",
name="test_catalog",
metastore=metastore,
comment=None,
owner="owner",
type=None,
)
schema = Schema(
id="test_catalog.test_schema",
name="test_schema",
catalog=catalog,
comment=None,
owner="owner",
)
table = Table(
id="test_catalog.test_schema.test_table",
name="test_table",
schema=schema,
columns=[],
storage_location="/path/to/table",
data_source_format=None,
table_type=None,
owner="owner",
generation=None,
created_at=None,
created_by=None,
updated_at=None,
updated_by=None,
table_id="table_123",
view_definition=None,
properties={},
comment=None,
)
# Mock lineage data
mock_lineage_info = TableLineageInfo(
upstreams=[
TableUpstream(
table_name="source_catalog.source_schema.source_table",
source_type="TABLE",
last_updated=datetime(2023, 1, 1),
),
TableUpstream(
table_name="invalid_table_name", # Should be skipped due to invalid format
source_type="TABLE",
last_updated=datetime(2023, 1, 2),
),
],
external_upstreams=[
ExternalUpstream(
path="s3://bucket/path/to/file",
source_type="PATH",
last_updated=datetime(2023, 1, 3),
)
],
upstream_notebooks=[
NotebookReference(id=123, last_updated=datetime(2023, 1, 4))
],
downstream_notebooks=[
NotebookReference(id=456, last_updated=datetime(2023, 1, 5))
],
)
mock_get_lineage.return_value = {
"test_catalog.test_schema.test_table": mock_lineage_info
}
# Test the method
start_time = datetime(2023, 1, 1)
end_time = datetime(2023, 12, 31)
mock_proxy._process_system_table_lineage(table, start_time, end_time)
# Verify get_catalog_table_lineage was called with correct parameters
mock_get_lineage.assert_called_once_with("test_catalog", start_time, end_time)
# Verify table upstreams were processed correctly
assert len(table.upstreams) == 1
table_ref = list(table.upstreams.keys())[0]
assert table_ref.catalog == "source_catalog"
assert table_ref.schema == "source_schema"
assert table_ref.table == "source_table"
assert table_ref.metastore == "test_metastore"
assert table_ref.last_updated == datetime(2023, 1, 1)
# Verify external upstreams were processed
assert len(table.external_upstreams) == 1
external_ref = list(table.external_upstreams)[0]
assert external_ref.path == "s3://bucket/path/to/file"
assert external_ref.storage_location == "s3://bucket/path/to/file"
assert external_ref.has_permission is True
assert external_ref.last_updated == datetime(2023, 1, 3)
# Verify notebook lineage was processed
assert len(table.upstream_notebooks) == 1
assert 123 in table.upstream_notebooks
upstream_notebook = table.upstream_notebooks[123]
assert upstream_notebook.id == 123
assert upstream_notebook.last_updated == datetime(2023, 1, 4)
assert len(table.downstream_notebooks) == 1
assert 456 in table.downstream_notebooks
downstream_notebook = table.downstream_notebooks[456]
assert downstream_notebook.id == 456
assert downstream_notebook.last_updated == datetime(2023, 1, 5)
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy.get_catalog_table_lineage_via_system_tables"
)
@patch("datahub.ingestion.source.unity.proxy.logger")
def test_process_system_table_lineage_invalid_table_name(
self, mock_logger, mock_get_lineage, mock_proxy
):
"""Test _process_system_table_lineage with invalid table names."""
from datahub.ingestion.source.unity.proxy_types import (
Catalog,
Metastore,
Schema,
Table,
)
# Create minimal table object
metastore = Metastore(
id="test_metastore",
name="test_metastore",
comment=None,
global_metastore_id=None,
metastore_id=None,
owner=None,
region=None,
cloud=None,
)
catalog = Catalog(
id="test_catalog",
name="test_catalog",
metastore=metastore,
comment=None,
owner=None,
type=None,
)
schema = Schema(
id="test_catalog.test_schema",
name="test_schema",
catalog=catalog,
comment=None,
owner=None,
)
table = Table(
id="test_table",
name="test_table",
schema=schema,
columns=[],
storage_location=None,
data_source_format=None,
table_type=None,
owner=None,
generation=None,
created_at=None,
created_by=None,
updated_at=None,
updated_by=None,
table_id=None,
view_definition=None,
properties={},
comment=None,
)
# Mock lineage with invalid table name format
mock_lineage_info = TableLineageInfo(
upstreams=[
TableUpstream(
table_name="invalid.table", # Only 2 parts, should be skipped
source_type="TABLE",
last_updated=None,
)
]
)
mock_get_lineage.return_value = {
"test_catalog.test_schema.test_table": mock_lineage_info
}
# Test the method
mock_proxy._process_system_table_lineage(table)
# Verify warning was logged for invalid table name
mock_logger.warning.assert_called_once()
warning_call = mock_logger.warning.call_args[0][0]
assert "Unexpected upstream table format" in warning_call
assert "invalid.table" in warning_call
# Verify no upstreams were added
assert len(table.upstreams) == 0
@patch(
"datahub.ingestion.source.unity.proxy.UnityCatalogApiProxy.get_catalog_table_lineage_via_system_tables"
)
def test_process_system_table_lineage_no_lineage_data(
self, mock_get_lineage, mock_proxy
):
"""Test _process_system_table_lineage when no lineage data exists."""
from datahub.ingestion.source.unity.proxy_types import (
Catalog,
Metastore,
Schema,
Table,
)
# Create minimal table object
metastore = Metastore(
id="test_metastore",
name="test_metastore",
comment=None,
global_metastore_id=None,
metastore_id=None,
owner=None,
region=None,
cloud=None,
)
catalog = Catalog(
id="test_catalog",
name="test_catalog",
metastore=metastore,
comment=None,
owner=None,
type=None,
)
schema = Schema(
id="test_catalog.test_schema",
name="test_schema",
catalog=catalog,
comment=None,
owner=None,
)
table = Table(
id="test_table",
name="test_table",
schema=schema,
columns=[],
storage_location=None,
data_source_format=None,
table_type=None,
owner=None,
generation=None,
created_at=None,
created_by=None,
updated_at=None,
updated_by=None,
table_id=None,
view_definition=None,
properties={},
comment=None,
)
# Mock empty lineage data
mock_get_lineage.return_value = {}
# Test the method
mock_proxy._process_system_table_lineage(table)
# Verify no lineage was added (empty TableLineageInfo should be used)
assert len(table.upstreams) == 0
assert len(table.external_upstreams) == 0
assert len(table.upstream_notebooks) == 0
assert len(table.downstream_notebooks) == 0