From 56f83f0c4da96f2746c8a105e6f48e93f2f5e4a6 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Fri, 13 Jun 2025 22:14:56 +0530 Subject: [PATCH] MINOR: Implement Lineage Filter for UC (#21761) Co-authored-by: Sriharsha Chintalapani --- .../source/database/unitycatalog/lineage.py | 49 ++++++++++++++++--- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py index fd5347c9d9d..c4dc7daa012 100644 --- a/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/unitycatalog/lineage.py @@ -16,6 +16,7 @@ from typing import Iterable, Optional from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import ( UnityCatalogConnection, @@ -42,6 +43,7 @@ from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogC from metadata.ingestion.source.database.unitycatalog.connection import get_connection from metadata.ingestion.source.database.unitycatalog.models import LineageTableStreams from metadata.utils import fqn +from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.helpers import retry_with_docker_host from metadata.utils.logger import ingestion_logger @@ -175,16 +177,47 @@ class UnitycatalogLineageSource(Source): for database in self.metadata.list_all_entities( entity=Database, params={"service": self.config.serviceName} ): - for table in self.metadata.list_all_entities( - entity=Table, params={"database": database.fullyQualifiedName.root} + if filter_by_database( + self.source_config.databaseFilterPattern, database.name.root ): - databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.root}" - table_streams: LineageTableStreams = self.client.get_table_lineage( - databricks_table_fqn - ) - yield from self._handle_upstream_table( - table_streams, table, databricks_table_fqn + self.status.filter( + database.fullyQualifiedName.root, + "Catalog Filtered Out", ) + continue + for schema in self.metadata.list_all_entities( + entity=DatabaseSchema, + params={"database": database.fullyQualifiedName.root}, + ): + if filter_by_schema( + self.source_config.schemaFilterPattern, schema.name.root + ): + self.status.filter( + schema.fullyQualifiedName.root, + "Schema Filtered Out", + ) + continue + for table in self.metadata.list_all_entities( + entity=Table, + params={"databaseSchema": schema.fullyQualifiedName.root}, + ): + if filter_by_table( + self.source_config.tableFilterPattern, table.name.root + ): + self.status.filter( + table.fullyQualifiedName.root, + "Table Filtered Out", + ) + continue + + databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.root}" + logger.debug(f"Processing table: {databricks_table_fqn}") + table_streams: LineageTableStreams = self.client.get_table_lineage( + databricks_table_fqn + ) + yield from self._handle_upstream_table( + table_streams, table, databricks_table_fqn + ) def test_connection(self) -> None: test_connection_fn = get_test_connection_fn(self.service_connection)