mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-09-26 17:34:41 +00:00
MINOR: Implement Lineage Filter for UC (#21761)
Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
parent
c3a5e8609f
commit
56f83f0c4d
@ -16,6 +16,7 @@ from typing import Iterable, Optional
|
|||||||
|
|
||||||
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
|
||||||
from metadata.generated.schema.entity.data.database import Database
|
from metadata.generated.schema.entity.data.database import Database
|
||||||
|
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
|
||||||
from metadata.generated.schema.entity.data.table import Table
|
from metadata.generated.schema.entity.data.table import Table
|
||||||
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
|
from metadata.generated.schema.entity.services.connections.database.unityCatalogConnection import (
|
||||||
UnityCatalogConnection,
|
UnityCatalogConnection,
|
||||||
@ -42,6 +43,7 @@ from metadata.ingestion.source.database.unitycatalog.client import UnityCatalogC
|
|||||||
from metadata.ingestion.source.database.unitycatalog.connection import get_connection
|
from metadata.ingestion.source.database.unitycatalog.connection import get_connection
|
||||||
from metadata.ingestion.source.database.unitycatalog.models import LineageTableStreams
|
from metadata.ingestion.source.database.unitycatalog.models import LineageTableStreams
|
||||||
from metadata.utils import fqn
|
from metadata.utils import fqn
|
||||||
|
from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table
|
||||||
from metadata.utils.helpers import retry_with_docker_host
|
from metadata.utils.helpers import retry_with_docker_host
|
||||||
from metadata.utils.logger import ingestion_logger
|
from metadata.utils.logger import ingestion_logger
|
||||||
|
|
||||||
@ -175,16 +177,47 @@ class UnitycatalogLineageSource(Source):
|
|||||||
for database in self.metadata.list_all_entities(
|
for database in self.metadata.list_all_entities(
|
||||||
entity=Database, params={"service": self.config.serviceName}
|
entity=Database, params={"service": self.config.serviceName}
|
||||||
):
|
):
|
||||||
for table in self.metadata.list_all_entities(
|
if filter_by_database(
|
||||||
entity=Table, params={"database": database.fullyQualifiedName.root}
|
self.source_config.databaseFilterPattern, database.name.root
|
||||||
):
|
):
|
||||||
databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.root}"
|
self.status.filter(
|
||||||
table_streams: LineageTableStreams = self.client.get_table_lineage(
|
database.fullyQualifiedName.root,
|
||||||
databricks_table_fqn
|
"Catalog Filtered Out",
|
||||||
)
|
|
||||||
yield from self._handle_upstream_table(
|
|
||||||
table_streams, table, databricks_table_fqn
|
|
||||||
)
|
)
|
||||||
|
continue
|
||||||
|
for schema in self.metadata.list_all_entities(
|
||||||
|
entity=DatabaseSchema,
|
||||||
|
params={"database": database.fullyQualifiedName.root},
|
||||||
|
):
|
||||||
|
if filter_by_schema(
|
||||||
|
self.source_config.schemaFilterPattern, schema.name.root
|
||||||
|
):
|
||||||
|
self.status.filter(
|
||||||
|
schema.fullyQualifiedName.root,
|
||||||
|
"Schema Filtered Out",
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
for table in self.metadata.list_all_entities(
|
||||||
|
entity=Table,
|
||||||
|
params={"databaseSchema": schema.fullyQualifiedName.root},
|
||||||
|
):
|
||||||
|
if filter_by_table(
|
||||||
|
self.source_config.tableFilterPattern, table.name.root
|
||||||
|
):
|
||||||
|
self.status.filter(
|
||||||
|
table.fullyQualifiedName.root,
|
||||||
|
"Table Filtered Out",
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
databricks_table_fqn = f"{table.database.name}.{table.databaseSchema.name}.{table.name.root}"
|
||||||
|
logger.debug(f"Processing table: {databricks_table_fqn}")
|
||||||
|
table_streams: LineageTableStreams = self.client.get_table_lineage(
|
||||||
|
databricks_table_fqn
|
||||||
|
)
|
||||||
|
yield from self._handle_upstream_table(
|
||||||
|
table_streams, table, databricks_table_fqn
|
||||||
|
)
|
||||||
|
|
||||||
def test_connection(self) -> None:
|
def test_connection(self) -> None:
|
||||||
test_connection_fn = get_test_connection_fn(self.service_connection)
|
test_connection_fn = get_test_connection_fn(self.service_connection)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user