Day 1 - Dashboard service lineage without db_service_name (#19911)

This commit is contained in:
harshsoni2024 2025-03-07 11:16:58 +05:30 committed by GitHub
parent 1b137b7f65
commit 40a9c67875
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 404 additions and 307 deletions

View File

@ -16,7 +16,17 @@ To be used by OpenMetadata class
import functools
import json
import traceback
from typing import Generic, Iterable, Iterator, List, Optional, Set, Type, TypeVar
from typing import (
Generic,
Iterable,
Iterator,
List,
Optional,
Set,
Type,
TypeVar,
Union,
)
from urllib.parse import quote_plus
from pydantic import Field
@ -29,7 +39,7 @@ from metadata.ingestion.models.custom_pydantic import BaseModel
from metadata.ingestion.ometa.client import REST, APIError
from metadata.ingestion.ometa.utils import quote
from metadata.ingestion.source.models import TableView
from metadata.utils.elasticsearch import ES_INDEX_MAP
from metadata.utils.elasticsearch import ES_INDEX_MAP, get_entity_from_es_result
from metadata.utils.logger import ometa_logger
logger = ometa_logger()
@ -469,3 +479,28 @@ class ESMixin(Generic[T]):
schema_name=schema_name,
table_name=table_name,
)
def search_in_any_service(
    self,
    entity_type: Type[T],
    fqn_search_string: str,
    fetch_multiple_entities: bool = False,
) -> Optional[Union[List[T], T]]:
    """
    Look up entities in Elasticsearch from an FQN search string.

    The search string is forwarded as-is to `es_search_from_fqn`, so it may
    contain wildcards; dashboard sources pass `"*"` as the service part when
    no `db_service_name` is configured, which is what allows a match in any
    database service.

    Args:
        entity_type: entity class to search for (e.g. `Table`)
        fqn_search_string: FQN pattern to search in ES
        fetch_multiple_entities: forwarded to `get_entity_from_es_result`;
            callers that pass `True` iterate over the result, callers that
            keep the default use it as a single entity
            (NOTE(review): exact selection logic lives in
            `get_entity_from_es_result` - confirm there)

    Returns:
        Whatever `get_entity_from_es_result` produces for the ES hits, or
        `None` if the search or the post-processing raised.
    """
    try:
        return get_entity_from_es_result(
            entity_list=self.es_search_from_fqn(
                entity_type=entity_type,
                fqn_search_string=fqn_search_string,
            ),
            fetch_multiple_entities=fetch_multiple_entities,
        )
    except Exception as exc:
        # Deliberate best-effort: a failed lookup must not break lineage
        # ingestion, so the error is only traced at debug level and the
        # caller gets `None` (callers guard with `or []` / truthiness checks).
        logger.debug(
            f"Error to fetch entity: fqn={fqn_search_string} from es: {exc}"
        )
        logger.debug(traceback.format_exc())
    return None

View File

@ -252,7 +252,9 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
@abstractmethod
def yield_dashboard_lineage_details(
self, dashboard_details: Any, db_service_name: str
self,
dashboard_details: Any,
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
@ -372,6 +374,8 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
yield lineage
db_service_names = self.get_db_service_names()
if not db_service_names:
yield from self.yield_dashboard_lineage_details(dashboard_details) or []
for db_service_name in db_service_names or []:
yield from self.yield_dashboard_lineage_details(
dashboard_details, db_service_name

View File

@ -248,6 +248,8 @@ class DomodashboardSource(DashboardServiceSource):
)
def yield_dashboard_lineage_details(
self, dashboard_details: dict, db_service_name
self,
dashboard_details: dict,
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""No lineage implemented"""

View File

@ -176,7 +176,7 @@ class LightdashSource(DashboardServiceSource):
def yield_dashboard_lineage_details(
self,
dashboard_details: LightdashDashboard,
db_service_name: Optional[str],
db_service_name: Optional[str] = None,
) -> Optional[Iterable[AddLineageRequest]]:
"""Get lineage method

View File

@ -924,7 +924,9 @@ class LookerSource(DashboardServiceSource):
)
def yield_dashboard_lineage_details(
self, dashboard_details: LookerDashboard, _: str
self,
dashboard_details: LookerDashboard,
_: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
Get lineage between charts and data sources.

View File

@ -20,6 +20,7 @@ from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as LineageDashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.metabaseConnection import (
MetabaseConnection,
)
@ -42,9 +43,8 @@ from metadata.generated.schema.type.basic import (
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.metabase.models import (
@ -56,6 +56,7 @@ from metadata.ingestion.source.dashboard.metabase.models import (
from metadata.utils import fqn
from metadata.utils.constants import DEFAULT_DASHBAORD
from metadata.utils.filters import filter_by_chart
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import (
clean_uri,
get_standard_chart_type,
@ -285,15 +286,13 @@ class MetabaseSource(DashboardServiceSource):
def yield_dashboard_lineage_details(
self,
dashboard_details: MetabaseDashboardDetails,
db_service_name: Optional[str],
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage method
Args:
dashboard_details
"""
if not db_service_name:
return
chart_ids, dashboard_name = (
dashboard_details.card_ids,
str(dashboard_details.id),
@ -333,11 +332,17 @@ class MetabaseSource(DashboardServiceSource):
)
)
def _get_database_service(self, db_service_name: str):
def _get_database_service(self, db_service_name: Optional[str]):
if not db_service_name:
return None
return self.metadata.get_by_name(DatabaseService, db_service_name)
# pylint: disable=too-many-locals
def _yield_lineage_from_query(
self, chart_details: MetabaseChart, db_service_name: str, dashboard_name: str
self,
chart_details: MetabaseChart,
db_service_name: Optional[str],
dashboard_name: str,
) -> Iterable[Either[AddLineageRequest]]:
database = self.client.get_database(chart_details.database_id)
@ -360,20 +365,23 @@ class MetabaseSource(DashboardServiceSource):
query,
ConnectionTypeDialectMapper.dialect_of(db_service.serviceType.value)
if db_service
else None,
else Dialect.ANSI,
)
for table in lineage_parser.source_tables:
database_schema_name, table = fqn.split(str(table))[-2:]
database_schema_name = self.check_database_schema_name(database_schema_name)
from_entities = search_table_entities(
metadata=self.metadata,
database=database_name,
service_name=db_service_name,
database_schema=database_schema_name,
table=table,
fqn_search_string = build_es_fqn_search_string(
database_name=database_name,
schema_name=database_schema_name,
service_name=db_service_name or "*",
table_name=table,
)
from_entities = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
fetch_multiple_entities=True,
)
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
@ -385,13 +393,16 @@ class MetabaseSource(DashboardServiceSource):
fqn=to_fqn,
)
for from_entity in from_entities:
for from_entity in from_entities or []:
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)
def _yield_lineage_from_api(
self, chart_details: MetabaseChart, db_service_name: str, dashboard_name: str
self,
chart_details: MetabaseChart,
db_service_name: Optional[str],
dashboard_name: str,
) -> Iterable[Either[AddLineageRequest]]:
table = self.client.get_table(chart_details.table_id)
table_name = table.name or table.display_name
@ -400,14 +411,17 @@ class MetabaseSource(DashboardServiceSource):
return
database_name = table.db.details.db if table.db and table.db.details else None
from_entities = search_table_entities(
metadata=self.metadata,
database=database_name,
service_name=db_service_name,
database_schema=table.table_schema,
table=table_name,
fqn_search_string = build_es_fqn_search_string(
database_name=database_name,
schema_name=table.table_schema,
service_name=db_service_name or "*",
table_name=table_name,
)
from_entities = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
fetch_multiple_entities=True,
)
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
@ -420,7 +434,7 @@ class MetabaseSource(DashboardServiceSource):
fqn=to_fqn,
)
for from_entity in from_entities:
for from_entity in from_entities or []:
yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity
)

View File

@ -159,7 +159,9 @@ class MicrostrategySource(DashboardServiceSource):
)
def yield_dashboard_lineage_details(
self, dashboard_details: MstrDashboardDetails, db_service_name: str
self,
dashboard_details: MstrDashboardDetails,
db_service_name: Optional[str] = None,
) -> Optional[Iterable[AddLineageRequest]]:
"""Not Implemented"""

View File

@ -20,6 +20,7 @@ from metadata.generated.schema.entity.data.chart import Chart, ChartType
from metadata.generated.schema.entity.data.dashboard import (
Dashboard as Lineage_Dashboard,
)
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.dashboard.modeConnection import (
ModeConnection,
)
@ -38,12 +39,12 @@ from metadata.generated.schema.type.basic import (
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.ingestion.source.dashboard.mode import client
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
@ -130,7 +131,9 @@ class ModeSource(DashboardServiceSource):
self.register_record(dashboard_request=dashboard_request)
def yield_dashboard_lineage_details(
self, dashboard_details: dict, db_service_name: str
self,
dashboard_details: dict,
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage method"""
try:
@ -151,14 +154,18 @@ class ModeSource(DashboardServiceSource):
database_schema_name = self.check_database_schema_name(
database_schema_name
)
from_entities = search_table_entities(
metadata=self.metadata,
database=data_source.get(client.DATABASE),
service_name=db_service_name,
database_schema=database_schema_name,
table=table,
fqn_search_string = build_es_fqn_search_string(
database_name=data_source.get(client.DATABASE),
schema_name=database_schema_name,
service_name=db_service_name or "*",
table_name=table,
)
for from_entity in from_entities:
from_entities = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
fetch_multiple_entities=True,
)
for from_entity in from_entities or []:
to_entity = self.metadata.get_by_name(
entity=Lineage_Dashboard,
fqn=fqn.build(

View File

@ -67,6 +67,7 @@ from metadata.utils.filters import (
filter_by_datamodel,
filter_by_project,
)
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
@ -534,7 +535,9 @@ class PowerbiSource(DashboardServiceSource):
)
def create_datamodel_report_lineage(
self, db_service_name: str, dashboard_details: PowerBIReport
self,
db_service_name: Optional[str],
dashboard_details: PowerBIReport,
) -> Iterable[Either[CreateDashboardRequest]]:
"""
create the lineage between datamodel and report
@ -615,7 +618,7 @@ class PowerbiSource(DashboardServiceSource):
def _get_table_and_datamodel_lineage(
self,
db_service_name: str,
db_service_name: Optional[str],
table: PowerBiTable,
datamodel_entity: DashboardDataModel,
) -> Optional[Either[AddLineageRequest]]:
@ -623,19 +626,16 @@ class PowerbiSource(DashboardServiceSource):
Method to create lineage between table and datamodels
"""
try:
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
fqn_search_string = build_es_fqn_search_string(
database_name=None,
schema_name=None,
service_name=db_service_name or "*",
table_name=table.name,
)
table_entity = self.metadata.get_by_name(
entity=Table,
fqn=table_fqn,
table_entity = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if table_entity and datamodel_entity:
columns_list = [column.name for column in table.columns]
column_lineage = self._get_column_lineage(
@ -661,7 +661,7 @@ class PowerbiSource(DashboardServiceSource):
def create_table_datamodel_lineage_from_files(
self,
db_service_name: str,
db_service_name: Optional[str],
datamodel_entity: Optional[DashboardDataModel],
) -> Iterable[Either[AddLineageRequest]]:
"""
@ -706,7 +706,7 @@ class PowerbiSource(DashboardServiceSource):
def yield_dashboard_lineage_details(
self,
dashboard_details: Union[PowerBIDashboard, PowerBIReport],
db_service_name: str,
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
We will build the logic to build the logic as below
@ -715,7 +715,8 @@ class PowerbiSource(DashboardServiceSource):
try:
if isinstance(dashboard_details, PowerBIReport):
yield from self.create_datamodel_report_lineage(
db_service_name=db_service_name, dashboard_details=dashboard_details
db_service_name=db_service_name,
dashboard_details=dashboard_details,
)
if isinstance(dashboard_details, PowerBIDashboard):

View File

@ -47,6 +47,7 @@ from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSour
from metadata.ingestion.source.dashboard.qliksense.models import QlikTable
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
@ -188,18 +189,22 @@ class QlikcloudSource(QliksenseSource):
def yield_dashboard_lineage_details(
self,
dashboard_details: QlikApp,
db_service_name: Optional[str],
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage method"""
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
for datamodel in self.data_models or []:
try:
data_model_entity = self._get_datamodel(datamodel_id=datamodel.id)
if data_model_entity:
om_table = self._get_database_table(
db_service_entity, data_model_entity
fqn_search_string = build_es_fqn_search_string(
database_name=None,
schema_name=None,
service_name=db_service_name or "*",
table_name=data_model_entity.displayName,
)
om_table = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if om_table:
columns_list = [col.name for col in datamodel.fields]

View File

@ -55,6 +55,7 @@ from metadata.ingestion.source.dashboard.qliksense.models import (
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart, filter_by_datamodel
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger
@ -276,7 +277,11 @@ class QliksenseSource(DashboardServiceSource):
return None
def _get_database_table(
self, db_service_entity: DatabaseService, datamodel: QlikTable
self,
db_service_entity: DatabaseService,
datamodel: QlikTable,
schema_name: Optional[str],
database_name: Optional[str],
) -> Optional[Table]:
"""
Get the table entity for lineage
@ -284,17 +289,6 @@ class QliksenseSource(DashboardServiceSource):
# table.name in tableau can come as db.schema.table_name. Hence the logic to split it
if datamodel.tableName and db_service_entity:
try:
if len(datamodel.connectorProperties.tableQualifiers) > 1:
(
database_name,
schema_name,
) = datamodel.connectorProperties.tableQualifiers[-2:]
elif len(datamodel.connectorProperties.tableQualifiers) == 1:
schema_name = datamodel.connectorProperties.tableQualifiers[-1]
database_name = None
else:
schema_name, database_name = None, None
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
@ -316,18 +310,32 @@ class QliksenseSource(DashboardServiceSource):
def yield_dashboard_lineage_details(
self,
dashboard_details: QlikDashboard,
db_service_name: Optional[str],
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""Get lineage method"""
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
for datamodel in self.data_models or []:
try:
data_model_entity = self._get_datamodel(datamodel_id=datamodel.id)
if data_model_entity:
om_table = self._get_database_table(
db_service_entity, datamodel=datamodel
if len(datamodel.connectorProperties.tableQualifiers) > 1:
(
database_name,
schema_name,
) = datamodel.connectorProperties.tableQualifiers[-2:]
elif len(datamodel.connectorProperties.tableQualifiers) == 1:
schema_name = datamodel.connectorProperties.tableQualifiers[-1]
database_name = None
else:
schema_name, database_name = None, None
fqn_search_string = build_es_fqn_search_string(
database_name=database_name,
schema_name=schema_name,
service_name=db_service_name or "*",
table_name=datamodel.tableName,
)
om_table = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if om_table:
columns_list = [col.name for col in datamodel.fields]

View File

@ -49,9 +49,8 @@ from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import search_table_entities
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
LINEAGE_MAP,
@ -69,6 +68,7 @@ from metadata.ingestion.source.dashboard.quicksight.models import (
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
@ -254,12 +254,14 @@ class QuicksightSource(DashboardServiceSource):
data_model_entity,
data_source_resp: DataSourceModel,
dashboard_details: DashboardDetail,
db_service_entity,
db_service_name: Optional[str],
) -> Iterable[Either[AddLineageRequest]]:
"""yield lineage from table(parsed form query source) <-> dashboard"""
if not db_service_entity:
logger.debug(f"db service is not ingested")
return None
db_service_entity = None
if db_service_name:
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
sql_query = data_source_resp.data_source_resp.query
source_database_names = []
try:
@ -278,7 +280,7 @@ class QuicksightSource(DashboardServiceSource):
db_service_entity.serviceType.value
)
if db_service_entity
else None,
else Dialect.ANSI,
)
lineage_details = LineageDetails(
source=LineageSource.DashboardLineage, sqlQuery=sql_query
@ -289,14 +291,18 @@ class QuicksightSource(DashboardServiceSource):
database_schema_name = self.check_database_schema_name(
database_schema_name
)
from_entities = search_table_entities(
metadata=self.metadata,
database=db_name,
service_name=db_service_entity.name.root,
database_schema=database_schema_name,
table=table,
fqn_search_string = build_es_fqn_search_string(
database_name=db_name,
schema_name=database_schema_name,
service_name=db_service_name or "*",
table_name=table,
)
for from_entity in from_entities:
from_entities = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
fetch_multiple_entities=True,
)
for from_entity in from_entities or []:
if from_entity is not None and data_model_entity is not None:
columns = [
col.name.root for col in data_model_entity.columns
@ -385,7 +391,7 @@ class QuicksightSource(DashboardServiceSource):
data_model_entity,
data_source_resp: DataSourceModel,
dashboard_details: DashboardDetail,
db_service_entity,
db_service_name: Optional[str],
) -> Iterable[Either[AddLineageRequest]]:
"""yield lineage from table <-> dashboard"""
try:
@ -394,17 +400,15 @@ class QuicksightSource(DashboardServiceSource):
if data_source_resp and data_source_resp.DataSourceParameters:
data_source_dict = data_source_resp.DataSourceParameters
for db in data_source_dict.keys() or []:
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_entity.name.root,
fqn_search_string = build_es_fqn_search_string(
database_name=data_source_dict[db].get("Database"),
schema_name=schema_name,
service_name=db_service_name or "*",
table_name=table_name,
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=from_fqn,
from_entity = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if from_entity is not None and data_model_entity is not None:
columns = [col.name.root for col in data_model_entity.columns]
@ -440,14 +444,13 @@ class QuicksightSource(DashboardServiceSource):
return None
def yield_dashboard_lineage_details( # pylint: disable=too-many-locals
self, dashboard_details: DashboardDetail, db_service_name: str
self,
dashboard_details: DashboardDetail,
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
"""
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
for datamodel in self.data_models or []:
try:
data_model_entity = self._get_datamodel(
@ -460,7 +463,7 @@ class QuicksightSource(DashboardServiceSource):
data_model_entity,
datamodel.DataSource,
dashboard_details,
db_service_entity,
db_service_name,
)
elif isinstance(
datamodel.DataSource.data_source_resp, DataSourceRespS3
@ -473,7 +476,7 @@ class QuicksightSource(DashboardServiceSource):
data_model_entity,
datamodel.DataSource,
dashboard_details,
db_service_entity,
db_service_name,
)
except Exception as exc: # pylint: disable=broad-except
yield Either(

View File

@ -48,6 +48,7 @@ from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import clean_uri, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
@ -197,14 +198,15 @@ class RedashSource(DashboardServiceSource):
)
def yield_dashboard_lineage_details( # pylint: disable=too-many-locals
self, dashboard_details: dict, db_service_name: str
self,
dashboard_details: dict,
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
Get lineage between dashboard and data sources
In redash we do not get table, database_schema or database name but we do get query
the lineage is being generated based on the query
"""
to_fqn = fqn.build(
self.metadata,
entity_type=LineageDashboard,
@ -229,17 +231,17 @@ class RedashSource(DashboardServiceSource):
database_schema_name = self.check_database_schema_name(
database_schema
)
from_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
schema_name=database_schema_name,
table_name=database_schema_table.get("table"),
if not database_schema_table.get("table"):
continue
fqn_search_string = build_es_fqn_search_string(
database_name=database_schema_table.get("database"),
schema_name=database_schema_name,
service_name=db_service_name or "*",
table_name=database_schema_table.get("table"),
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=from_fqn,
from_entity = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if from_entity and to_entity:
yield self._get_add_lineage_request(

View File

@ -52,6 +52,7 @@ from metadata.ingestion.source.dashboard.sigma.models import (
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import get_standard_chart_type
from metadata.utils.logger import ingestion_logger
@ -196,39 +197,36 @@ class SigmaSource(DashboardServiceSource):
return None
def _get_table_entity_from_node(
self, node: NodeDetails, db_service_name: str
self, node: NodeDetails, db_service_name: Optional[str]
) -> Optional[Table]:
"""
Get the table entity for lineage
"""
if node.node_schema:
try:
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_name,
fqn_search_string = build_es_fqn_search_string(
database_name=None,
schema_name=node.node_schema,
service_name=db_service_name or "*",
table_name=node.name,
database_name="",
)
if table_fqn:
return self.metadata.get_by_name(
entity=Table,
fqn=table_fqn,
)
return self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error occured while finding table fqn: {exc}")
return None
def yield_dashboard_lineage_details(
self, dashboard_details: WorkbookDetails, db_service_name: Optional[str]
self,
dashboard_details: WorkbookDetails,
db_service_name: Optional[str] = None,
):
"""
yield dashboard lineage
"""
if not db_service_name:
return
# charts and datamodels are same here as we are using charts as metadata for datamodels
for data_model in self.data_models or []:
try:

View File

@ -22,7 +22,6 @@ from metadata.generated.schema.api.data.createDashboardDataModel import (
)
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboardDataModel import DataModelType
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
@ -42,6 +41,7 @@ from metadata.ingestion.source.dashboard.superset.models import (
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_datamodel
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import (
clean_uri,
get_database_name_for_lineage,
@ -130,10 +130,10 @@ class SupersetAPISource(SupersetSourceMixin):
)
def _get_datasource_fqn_for_lineage(
self, chart_json: ChartResult, db_service_entity: DatabaseService
self, chart_json: ChartResult, db_service_name: Optional[str]
):
return (
self._get_datasource_fqn(chart_json.datasource_id, db_service_entity)
self._get_datasource_fqn(chart_json.datasource_id, db_service_name)
if chart_json.datasource_id
else None
)
@ -173,34 +173,33 @@ class SupersetAPISource(SupersetSourceMixin):
)
def _get_datasource_fqn(
self, datasource_id: str, db_service_entity: DatabaseService
self, datasource_id: str, db_service_name: Optional[str]
) -> Optional[str]:
try:
datasource_json = self.client.fetch_datasource(datasource_id)
if datasource_json:
database_json = self.client.fetch_database(
datasource_json.result.database.id
)
default_database_name = (
database_json.result.parameters.database
if database_json.result.parameters
else None
)
database_name = get_database_name_for_lineage(
db_service_entity, default_database_name
)
if database_json:
dataset_fqn = fqn.build(
self.metadata,
entity_type=Table,
table_name=datasource_json.result.table_name,
schema_name=datasource_json.result.table_schema,
database_name=database_name,
service_name=db_service_entity.name.root,
database_name = None
if db_service_name:
database_json = self.client.fetch_database(
datasource_json.result.database.id
)
return dataset_fqn
default_database_name = (
database_json.result.parameters.database
if database_json.result.parameters
else None
)
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
database_name = get_database_name_for_lineage(
db_service_entity, default_database_name
)
return build_es_fqn_search_string(
database_name=database_name,
schema_name=datasource_json.result.table_schema,
service_name=db_service_name or "*",
table_name=datasource_json.result.table_name,
)
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(

View File

@ -25,7 +25,6 @@ from metadata.generated.schema.api.data.createDashboardDataModel import (
)
from metadata.generated.schema.entity.data.chart import Chart
from metadata.generated.schema.entity.data.dashboardDataModel import DataModelType
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
MysqlConnection,
)
@ -58,6 +57,7 @@ from metadata.ingestion.source.dashboard.superset.queries import (
)
from metadata.utils import fqn
from metadata.utils.filters import filter_by_datamodel
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import (
clean_uri,
get_database_name_for_lineage,
@ -160,10 +160,10 @@ class SupersetDBSource(SupersetSourceMixin):
)
def _get_datasource_fqn_for_lineage(
self, chart_json: FetchChart, db_service_entity: DatabaseService
self, chart_json: FetchChart, db_service_name: Optional[str]
):
return (
self._get_datasource_fqn(db_service_entity, chart_json)
self._get_datasource_fqn(db_service_name, chart_json)
if chart_json.table_name
else None
)
@ -216,20 +216,23 @@ class SupersetDBSource(SupersetSourceMixin):
return get_database_name_for_lineage(db_service_entity, default_db_name)
def _get_datasource_fqn(
self, db_service_entity: DatabaseService, chart_json: FetchChart
self, db_service_name: Optional[str], chart_json: FetchChart
) -> Optional[str]:
try:
dataset_fqn = fqn.build(
self.metadata,
entity_type=Table,
table_name=chart_json.table_name,
database_name=self._get_database_name(
database_name = None
if db_service_name:
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
database_name = self._get_database_name(
chart_json.sqlalchemy_uri, db_service_entity
),
)
return build_es_fqn_search_string(
database_name=database_name,
schema_name=chart_json.table_schema,
service_name=db_service_entity.name.root,
service_name=db_service_name or "*",
table_name=chart_json.table_name,
)
return dataset_fqn
except Exception as err:
logger.debug(traceback.format_exc())
logger.warning(

View File

@ -30,7 +30,6 @@ from metadata.generated.schema.entity.services.connections.metadata.openMetadata
from metadata.generated.schema.entity.services.dashboardService import (
DashboardServiceType,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
@ -245,20 +244,21 @@ class SupersetSourceMixin(DashboardServiceSource):
self,
from_entities: List[Tuple[FetchChart, Dict[str, List[str]]]],
to_entity: DashboardDataModel,
db_service_entity: DatabaseService,
db_service_name: Optional[str],
):
result = []
for from_entity in from_entities:
input_table, _column_lineage = from_entity
datasource_fqn = self._get_datasource_fqn_for_lineage(
input_table, db_service_entity
input_table, db_service_name
)
from_entity = self.metadata.get_by_name(
entity=Table,
fqn=datasource_fqn,
from_entity = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=datasource_fqn,
)
if not from_entity:
continue
column_lineage: List[ColumnLineage] = []
for to_column, from_columns in _column_lineage.items():
_from_columns = [
@ -308,49 +308,45 @@ class SupersetSourceMixin(DashboardServiceSource):
def yield_dashboard_lineage_details(
self,
dashboard_details: Union[FetchDashboard, DashboardResult],
db_service_name: DatabaseService,
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
Get lineage between datamodel and table
"""
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
if db_service_entity:
for chart_json in filter(
None,
[
self.all_charts.get(chart_id)
for chart_id in self._get_charts_of_dashboard(dashboard_details)
],
):
try:
to_entity = self._get_dashboard_data_model_entity(chart_json)
for chart_json in filter(
None,
[
self.all_charts.get(chart_id)
for chart_id in self._get_charts_of_dashboard(dashboard_details)
],
):
try:
to_entity = self._get_dashboard_data_model_entity(chart_json)
if to_entity:
_input_tables = self._get_input_tables(chart_json)
input_tables = self._enrich_raw_input_tables(
_input_tables, to_entity, db_service_entity
)
for input_table in input_tables:
from_entity_table, column_lineage = input_table
yield self._get_add_lineage_request(
to_entity=to_entity,
from_entity=from_entity_table,
column_lineage=column_lineage,
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=db_service_name,
error=(
"Error to yield dashboard lineage details for DB "
f"service name [{db_service_name}]: {exc}"
),
stackTrace=traceback.format_exc(),
)
if to_entity:
_input_tables = self._get_input_tables(chart_json)
input_tables = self._enrich_raw_input_tables(
_input_tables, to_entity, db_service_name
)
for input_table in input_tables:
from_entity_table, column_lineage = input_table
yield self._get_add_lineage_request(
to_entity=to_entity,
from_entity=from_entity_table,
column_lineage=column_lineage,
)
except Exception as exc:
yield Either(
left=StackTraceError(
name=db_service_name,
error=(
"Error to yield dashboard lineage details for DB "
f"service name [{db_service_name}]: {exc}"
),
stackTrace=traceback.format_exc(),
)
)
def _get_datamodel(
self, datamodel: Union[SupersetDatasource, FetchChart]

View File

@ -60,9 +60,9 @@ from metadata.generated.schema.type.entityReferenceList import EntityReferenceLi
from metadata.generated.schema.type.usageRequest import UsageRequest
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import get_column_fqn, search_table_entities
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
@ -82,6 +82,7 @@ from metadata.ingestion.source.dashboard.tableau.models import (
from metadata.ingestion.source.database.column_type_parser import ColumnTypeParser
from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart, filter_by_datamodel
from metadata.utils.fqn import build_es_fqn_search_string
from metadata.utils.helpers import (
clean_uri,
get_database_name_for_lineage,
@ -457,7 +458,7 @@ class TableauSource(DashboardServiceSource):
self,
upstream_data_model: DataSource,
datamodel: DataSource,
db_service_entity: DatabaseService,
db_service_name: Optional[str],
upstream_data_model_entity: DashboardDataModel,
) -> Iterable[Either[AddLineageRequest]]:
"""
@ -471,7 +472,7 @@ class TableauSource(DashboardServiceSource):
if column is not None
}
for table in datamodel.upstreamTables or []:
om_tables = self._get_database_tables(db_service_entity, table)
om_tables = self._get_database_tables(db_service_name, table)
for om_table_and_query in om_tables or []:
column_lineage = self._get_column_lineage(
table,
@ -570,7 +571,7 @@ class TableauSource(DashboardServiceSource):
self,
datamodel: DataSource,
data_model_entity: DashboardDataModel,
db_service_entity: DatabaseService,
db_service_name: Optional[str],
) -> Iterable[Either[AddLineageRequest]]:
""" "
Method to create lineage between tables<->published datasource<->embedded datasource
@ -594,7 +595,7 @@ class TableauSource(DashboardServiceSource):
yield from self._get_table_datamodel_lineage(
upstream_data_model=upstream_data_model,
datamodel=datamodel,
db_service_entity=db_service_entity,
db_service_name=db_service_name,
upstream_data_model_entity=upstream_data_model_entity,
)
except Exception as err:
@ -603,14 +604,16 @@ class TableauSource(DashboardServiceSource):
name="Lineage",
error=(
"Error to yield datamodel table lineage details for DB "
f"service name [{db_service_entity.name}]: {err}"
f"service name [{db_service_name}]: {err}"
),
stackTrace=traceback.format_exc(),
)
)
def yield_dashboard_lineage_details(
self, dashboard_details: TableauDashboard, db_service_name: str
self,
dashboard_details: TableauDashboard,
db_service_name: Optional[str] = None,
) -> Iterable[Either[AddLineageRequest]]:
"""
This method creates the lineage between tables and datamodels
@ -622,42 +625,38 @@ class TableauSource(DashboardServiceSource):
Returns:
Lineage request between Data Models and Database tables
"""
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
if db_service_entity:
for datamodel in dashboard_details.dataModels or []:
try:
data_model_entity = self._get_datamodel(datamodel=datamodel)
if data_model_entity:
if datamodel.upstreamDatasources:
# if we have upstreamDatasources(Published Datasources), create lineage in below format
# Table<->Published Datasource<->Embedded Datasource
yield from self._get_datamodel_table_lineage(
datamodel=datamodel,
data_model_entity=data_model_entity,
db_service_entity=db_service_entity,
)
else:
# else we'll create lineage only using Embedded Datasources in below format
# Table<->Embedded Datasource
yield from self._get_table_datamodel_lineage(
upstream_data_model=datamodel,
datamodel=datamodel,
db_service_entity=db_service_entity,
upstream_data_model_entity=data_model_entity,
)
except Exception as err:
yield Either(
left=StackTraceError(
name="Lineage",
error=(
"Error to yield dashboard lineage details for DB "
f"service name [{db_service_name}]: {err}"
),
stackTrace=traceback.format_exc(),
for datamodel in dashboard_details.dataModels or []:
try:
data_model_entity = self._get_datamodel(datamodel=datamodel)
if data_model_entity:
if datamodel.upstreamDatasources:
# if we have upstreamDatasources(Published Datasources), create lineage in below format
# Table<->Published Datasource<->Embedded Datasource
yield from self._get_datamodel_table_lineage(
datamodel=datamodel,
data_model_entity=data_model_entity,
db_service_name=db_service_name,
)
else:
# else we'll create lineage only using Embedded Datasources in below format
# Table<->Embedded Datasource
yield from self._get_table_datamodel_lineage(
upstream_data_model=datamodel,
datamodel=datamodel,
db_service_name=db_service_name,
upstream_data_model_entity=data_model_entity,
)
except Exception as err:
yield Either(
left=StackTraceError(
name="Lineage",
error=(
"Error to yield dashboard lineage details for DB "
f"service name [{db_service_name}]: {err}"
),
stackTrace=traceback.format_exc(),
)
)
def yield_dashboard_chart(
self, dashboard_details: TableauDashboard
@ -722,7 +721,7 @@ class TableauSource(DashboardServiceSource):
self.metadata.close()
def _get_table_entities_from_api(
self, db_service_entity: DatabaseService, table: UpstreamTable
self, db_service_name: Optional[str], table: UpstreamTable
) -> Optional[List[TableAndQuery]]:
"""
In case we get the table details from the Graphql APIs we process them
@ -734,39 +733,40 @@ class TableauSource(DashboardServiceSource):
if table.database and table.database.name
else database_schema_table.get("database")
)
if isinstance(db_service_entity.connection.config, BigQueryConnection):
database_name = None
database_name = get_database_name_for_lineage(
db_service_entity, database_name
)
if db_service_name:
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
if isinstance(db_service_entity.connection.config, BigQueryConnection):
database_name = None
database_name = get_database_name_for_lineage(
db_service_entity, database_name
)
schema_name = (
table.schema_
if table.schema_
else database_schema_table.get("database_schema")
)
table_name = database_schema_table.get("table")
table_fqn = fqn.build(
self.metadata,
entity_type=Table,
service_name=db_service_entity.name.root,
schema_name=schema_name,
table_name=table_name,
fqn_search_string = build_es_fqn_search_string(
database_name=database_name,
schema_name=schema_name,
service_name=db_service_name or "*",
table_name=table_name,
)
if table_fqn:
table_entity = self.metadata.get_by_name(
entity=Table,
fqn=table_fqn,
)
if table_entity:
return [TableAndQuery(table=table_entity)]
table_entity = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
)
if table_entity:
return [TableAndQuery(table=table_entity)]
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Error to get tables for lineage using GraphQL Apis: {exc}")
return None
def _get_table_entities_from_query(
self, db_service_entity: DatabaseService, table: UpstreamTable
self, db_service_name: Optional[str], table: UpstreamTable
) -> Optional[List[TableAndQuery]]:
"""
In case we get the table details from the Graphql APIs we process them
@ -774,34 +774,44 @@ class TableauSource(DashboardServiceSource):
tables_list = []
try:
for custom_sql_table in table.referencedByQueries or []:
db_service_entity = None
if db_service_name:
db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name
)
lineage_parser = LineageParser(
custom_sql_table.query,
ConnectionTypeDialectMapper.dialect_of(
db_service_entity.serviceType.value
)
if db_service_entity
else None,
else Dialect.ANSI,
)
for source_table in lineage_parser.source_tables or []:
database_schema_table = fqn.split_table_name(str(source_table))
database_name = database_schema_table.get("database")
if isinstance(
db_service_entity.connection.config, BigQueryConnection
):
database_name = None
database_name = get_database_name_for_lineage(
db_service_entity, database_name
)
if db_service_entity:
if isinstance(
db_service_entity.connection.config, BigQueryConnection
):
database_name = None
database_name = get_database_name_for_lineage(
db_service_entity, database_name
)
schema_name = self.check_database_schema_name(
database_schema_table.get("database_schema")
)
table_name = database_schema_table.get("table")
from_entities = search_table_entities(
metadata=self.metadata,
database=database_name,
service_name=db_service_entity.fullyQualifiedName.root,
database_schema=schema_name,
table=table_name,
fqn_search_string = build_es_fqn_search_string(
database_name=database_name,
schema_name=schema_name,
service_name=db_service_name or "*",
table_name=table_name,
)
from_entities = self.metadata.search_in_any_service(
entity_type=Table,
fqn_search_string=fqn_search_string,
fetch_multiple_entities=True,
)
tables_list.extend(
[
@ -816,7 +826,7 @@ class TableauSource(DashboardServiceSource):
return tables_list or []
def _get_database_tables(
self, db_service_entity: DatabaseService, table: UpstreamTable
self, db_service_name: Optional[str], table: UpstreamTable
) -> Optional[List[TableAndQuery]]:
"""
Get the table entities for lineage
@ -824,12 +834,12 @@ class TableauSource(DashboardServiceSource):
# If we get the table details from the Graphql APIs we process them directly
if table.name:
return self._get_table_entities_from_api(
db_service_entity=db_service_entity, table=table
db_service_name=db_service_name, table=table
)
# Else we get the table details from the SQL queries and process them using SQL lineage parser
if table.referencedByQueries:
return self._get_table_entities_from_query(
db_service_entity=db_service_entity, table=table
db_service_name=db_service_name, table=table
)
return None

View File

@ -44,7 +44,6 @@ from metadata.generated.schema.type.entityLineage import Source as LineageSource
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.metabase import metadata as MetabaseMetadata
from metadata.ingestion.source.dashboard.metabase.metadata import MetabaseSource
from metadata.ingestion.source.dashboard.metabase.models import (
DatasetQuery,
@ -277,7 +276,7 @@ class MetabaseUnitTest(TestCase):
@patch.object(fqn, "build", return_value=None)
@patch.object(OpenMetadata, "get_by_name", return_value=EXAMPLE_DASHBOARD)
@patch.object(MetabaseMetadata, "search_table_entities", return_value=EXAMPLE_TABLE)
@patch.object(OpenMetadata, "search_in_any_service", return_value=EXAMPLE_TABLE)
@patch.object(
MetabaseSource, "_get_database_service", return_value=MOCK_DATABASE_SERVICE
)
@ -294,7 +293,7 @@ class MetabaseUnitTest(TestCase):
result = self.metabase.yield_dashboard_lineage_details(
dashboard_details=MOCK_DASHBOARD_DETAILS, db_service_name=None
)
self.assertEqual(list(result), [])
self.assertEqual(next(result).right, EXPECTED_LINEAGE)
# test out _yield_lineage_from_api
mock_dashboard = deepcopy(MOCK_DASHBOARD_DETAILS)

View File

@ -16,6 +16,7 @@ import json
import uuid
from pathlib import Path
from unittest import TestCase
from unittest.mock import patch
import sqlalchemy
from collate_sqllineage.core.models import Column, Schema, SubQuery, Table
@ -224,7 +225,7 @@ MOCK_DATASOURCE = [
EXPECTED_ALL_CHARTS_DB = {1: MOCK_CHART_DB_2}
NOT_FOUND_RESP = {"message": "Not found"}
EXPECTED_API_DATASET_FQN = None
EXPECTED_API_DATASET_FQN = "test_postgres.*.main.wb_health_population"
EXPECTED_DATASET_FQN = "test_postgres.examples.main.wb_health_population"
@ -577,19 +578,25 @@ class SupersetUnitTest(TestCase):
self.assertEqual(dashboard_charts, EXPECTED_CHART)
def test_api_get_datasource_fqn(self):
"""
Test generated datasource fqn for api source
"""
fqn = self.superset_api._get_datasource_fqn( # pylint: disable=protected-access
1, MOCK_DB_POSTGRES_SERVICE
)
self.assertEqual(fqn, EXPECTED_API_DATASET_FQN)
with patch.object(
OpenMetadata, "get_by_name", return_value=MOCK_DB_POSTGRES_SERVICE
):
"""
Test generated datasource fqn for api source
"""
fqn = self.superset_api._get_datasource_fqn( # pylint: disable=protected-access
1, MOCK_DB_POSTGRES_SERVICE.name.root
)
self.assertEqual(fqn, EXPECTED_API_DATASET_FQN)
def test_db_get_datasource_fqn_for_lineage(self):
fqn = self.superset_db._get_datasource_fqn_for_lineage( # pylint: disable=protected-access
MOCK_CHART_DB, MOCK_DB_POSTGRES_SERVICE
)
self.assertEqual(fqn, EXPECTED_DATASET_FQN)
with patch.object(
OpenMetadata, "get_by_name", return_value=MOCK_DB_POSTGRES_SERVICE
):
fqn = self.superset_db._get_datasource_fqn_for_lineage( # pylint: disable=protected-access
MOCK_CHART_DB, MOCK_DB_POSTGRES_SERVICE.name.root
)
self.assertEqual(fqn, EXPECTED_DATASET_FQN)
def test_db_get_database_name(self):
sqa_str1 = "postgres://user:pass@localhost:8888/database"