Fixes #17888 - Implemented Cross Database Lineage (#18831)

This commit is contained in:
Keshav Mohta 2024-12-12 15:13:36 +05:30 committed by GitHub
parent cde3a7dd1e
commit f4ff43c24c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 193 additions and 8 deletions

View File

@ -17,15 +17,27 @@ import traceback
from abc import ABC
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from typing import Callable, Iterable, Iterator, Union
from typing import Callable, Iterable, Iterator, List, Optional, Union
from metadata.generated.schema.api.data.createQuery import CreateQueryRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.type.basic import FullyQualifiedEntityName, SqlQuery
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.type.basic import (
FullyQualifiedEntityName,
SqlQuery,
Uuid,
)
from metadata.generated.schema.type.entityLineage import (
ColumnLineage,
EntitiesEdge,
LineageDetails,
Source,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tableQuery import TableQuery
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.sql_lineage import get_lineage_by_query
from metadata.ingestion.lineage.sql_lineage import get_column_fqn, get_lineage_by_query
from metadata.ingestion.models.ometa_lineage import OMetaLineageRequest
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
from metadata.ingestion.source.models import TableView
@ -239,6 +251,62 @@ class LineageSource(QueryParserSource, ABC):
f"Processing Procedure Lineage not supported for {str(self.service_connection.type.value)}"
)
def get_column_lineage(
self, from_table: Table, to_table: Table
) -> List[ColumnLineage]:
"""
Get the column lineage from the fields
"""
try:
column_lineage = []
for column in from_table.columns:
field = column.name.root
from_column = get_column_fqn(table_entity=from_table, column=field)
to_column = get_column_fqn(table_entity=to_table, column=field)
if from_column and to_column:
column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
return column_lineage
except Exception as exc:
logger.debug(f"Error to get column lineage: {exc}")
logger.debug(traceback.format_exc())
def get_add_cross_database_lineage_request(
self,
from_entity: Table,
to_entity: Table,
column_lineage: List[ColumnLineage] = None,
) -> Optional[Either[AddLineageRequest]]:
if from_entity and to_entity:
return Either(
right=AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(
id=Uuid(from_entity.id.root), type="table"
),
toEntity=EntityReference(
id=Uuid(to_entity.id.root), type="table"
),
lineageDetails=LineageDetails(
source=Source.CrossDatabaseLineage,
columnsLineage=column_lineage,
),
)
)
)
return None
def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]:
"""
By default cross database lineage is not supported.
"""
logger.info(
f"Processing Cross Database Lineage not supported for {str(self.service_connection.type.value)}"
)
def _iter(
self, *_, **__
) -> Iterable[Either[Union[AddLineageRequest, CreateQueryRequest]]]:
@ -257,3 +325,8 @@ class LineageSource(QueryParserSource, ABC):
logger.warning(
f"Lineage extraction is not supported for {str(self.service_connection.type.value)} connection"
)
if (
self.source_config.processCrossDatabaseLineage
and self.source_config.crossDatabaseServiceNames
):
yield from self.yield_cross_database_lineage() or []

View File

@ -11,7 +11,16 @@
"""
Trino lineage module
"""
import traceback
from typing import Iterable, List
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.source.database.lineage_source import LineageSource
from metadata.ingestion.source.database.trino.queries import TRINO_SQL_STATEMENT
from metadata.ingestion.source.database.trino.query_parser import TrinoQueryParserSource
@ -32,3 +41,89 @@ class TrinoLineageSource(TrinoQueryParserSource, LineageSource):
OR lower("query") LIKE '%%merge%%'
)
"""
def get_cross_database_fqn_from_service_names(self) -> List[str]:
database_service_names = self.source_config.crossDatabaseServiceNames
return [
database.fullyQualifiedName.root
for service in database_service_names
for database in self.metadata.list_all_entities(
entity=Database, params={"service": service}
)
]
def check_same_table(self, table1: Table, table2: Table) -> bool:
"""
Method to check whether the table1 and table2 are same
"""
return table1.name.root == table2.name.root and {
column.name.root for column in table1.columns
} == {column.name.root for column in table2.columns}
def get_cross_database_lineage(
self, from_table: Table, to_table: Table
) -> Either[AddLineageRequest]:
"""
Method to return cross database lineage request object
"""
column_lineage = None
if from_table and from_table.columns and to_table and to_table.columns:
column_lineage = self.get_column_lineage(
from_table=from_table, to_table=to_table
)
return self.get_add_cross_database_lineage_request(
from_entity=from_table, to_entity=to_table, column_lineage=column_lineage
)
def yield_cross_database_lineage(self) -> Iterable[Either[AddLineageRequest]]:
try:
all_cross_database_fqns = self.get_cross_database_fqn_from_service_names()
cross_database_table_fqn_mapping = {}
# Get all databases for the specified Trino service
trino_databases = self.metadata.list_all_entities(
entity=Database, params={"service": self.config.serviceName}
)
for trino_database in trino_databases:
trino_database_fqn = trino_database.fullyQualifiedName.root
# Get all tables for the specified Trino database schema
trino_tables = self.metadata.list_all_entities(
entity=Table, params={"database": trino_database_fqn}
)
# NOTE: Currently, tables in system-defined schemas will also be checked for lineage.
for trino_table in trino_tables:
trino_table_fqn = trino_table.fullyQualifiedName.root
for cross_database_fqn in all_cross_database_fqns:
# Construct the FQN for cross-database tables
cross_database_table_fqn = trino_table_fqn.replace(
trino_database_fqn, cross_database_fqn
)
# Cache cross-database table against its FQN to avoid repeated API calls
cross_database_table = cross_database_table_fqn_mapping[
cross_database_table_fqn
] = cross_database_table_fqn_mapping.get(
cross_database_table_fqn,
self.metadata.get_by_name(
Table, fqn=cross_database_table_fqn
),
)
# Create cross database lineage request if both tables are same
if cross_database_table and self.check_same_table(
trino_table, cross_database_table
):
yield self.get_cross_database_lineage(
cross_database_table, trino_table
)
break
except Exception as exc:
yield Either(
left=StackTraceError(
name=f"{self.config.serviceName} Cross Database Lineage",
error=(
"Error to yield cross database lineage details "
f"service name [{self.config.serviceName}]: {exc}"
),
stackTrace=traceback.format_exc(),
)
)

View File

@ -29,7 +29,7 @@ slug: /main-concepts/metadata-standard/schemas/type/entitylineage
- **Items**: Refer to *#/definitions/columnLineage*.
- **`pipeline`**: Pipeline where the sqlQuery is periodically run. Refer to *../type/entityReference.json*.
- **`description`** *(string)*: description of lineage.
- **`source`** *(string)*: Lineage type describes how a lineage was created. Must be one of: `['Manual', 'ViewLineage', 'QueryLineage', 'PipelineLineage', 'DashboardLineage', 'DbtLineage']`. Default: `Manual`.
- **`source`** *(string)*: Lineage type describes how a lineage was created. Must be one of: `['Manual', 'ViewLineage', 'QueryLineage', 'PipelineLineage', 'DashboardLineage', 'DbtLineage', 'CrossDatabaseLineage']`. Default: `Manual`.
- **`edge`** *(object)*: Edge in the lineage graph from one entity to another by entity IDs. Cannot contain additional properties.
- **`fromEntity`**: From entity that is upstream of lineage edge. Refer to *basic.json#/definitions/uuid*.
- **`toEntity`**: To entity that is downstream of lineage edge. Refer to *basic.json#/definitions/uuid*.

View File

@ -91,6 +91,20 @@
"default": 1,
"title": "Number of Threads",
"minimum": 1
},
"processCrossDatabaseLineage": {
"title": "Process Cross Database Lineage",
"description": "Set the 'Process Cross Database Lineage' toggle to control whether to process table lineage across different databases.",
"type": "boolean",
"default": false
},
"crossDatabaseServiceNames": {
"title": "Cross Database Service Names",
"description": "Set 'Cross Database Service Names' to process lineage with the database.",
"type": "array",
"items": {
"type": "string"
}
}
},
"additionalProperties": false

View File

@ -52,7 +52,7 @@
"source": {
"description": "Lineage type describes how a lineage was created.",
"type": "string",
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage"],
"enum": ["Manual", "ViewLineage", "QueryLineage", "PipelineLineage", "DashboardLineage", "DbtLineage", "SparkLineage", "OpenLineage", "ExternalTableLineage", "CrossDatabaseLineage"],
"default": "Manual"
}
}

View File

@ -99,6 +99,7 @@ export const LINEAGE_SOURCE: { [key in Source]: string } = {
[Source.SparkLineage]: 'Spark Lineage',
[Source.ViewLineage]: 'View Lineage',
[Source.OpenLineage]: 'OpenLineage',
[Source.CrossDatabaseLineage]: 'Cross Database Lineage',
[Source.ExternalTableLineage]: 'External Table Lineage',
};

View File

@ -12,7 +12,7 @@
*/
/**
/**
* Add lineage details between two entities
*/
export interface AddLineage {
@ -150,6 +150,7 @@ export interface ColumnLineage {
* Lineage type describes how a lineage was created.
*/
export enum Source {
CrossDatabaseLineage = "CrossDatabaseLineage",
DashboardLineage = "DashboardLineage",
DbtLineage = "DbtLineage",
ExternalTableLineage = "ExternalTableLineage",

View File

@ -19,8 +19,8 @@ export interface EntityLineage {
/**
* Primary entity for which this lineage graph is created.
*/
entity: EntityReference;
nodes?: EntityReference[];
entity: EntityReference;
nodes?: EntityReference[];
upstreamEdges?: Edge[];
}
@ -148,6 +148,7 @@ export interface EntityReference {
* Lineage type describes how a lineage was created.
*/
export enum Source {
CrossDatabaseLineage = "CrossDatabaseLineage",
DashboardLineage = "DashboardLineage",
DbtLineage = "DbtLineage",
ExternalTableLineage = "ExternalTableLineage",