Fix #9281: Column level lineage for other entities (#15827)

This commit is contained in:
harshsoni2024 2024-04-18 12:20:33 +05:30 committed by GitHub
parent c591db4eba
commit 39fc846fdb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 72 additions and 5 deletions

View File

@ -52,6 +52,7 @@ from metadata.ingestion.api.delete import delete_entity_from_source
from metadata.ingestion.api.models import Either, Entity from metadata.ingestion.api.models import Either, Entity
from metadata.ingestion.api.steps import Source from metadata.ingestion.api.steps import Source
from metadata.ingestion.api.topology_runner import C, TopologyRunnerMixin from metadata.ingestion.api.topology_runner import C, TopologyRunnerMixin
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.delete_entity import DeleteEntity from metadata.ingestion.models.delete_entity import DeleteEntity
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.models.patch_request import PatchRequest from metadata.ingestion.models.patch_request import PatchRequest
@ -480,6 +481,20 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
return None return None
@staticmethod
def _get_data_model_column_fqn(
    data_model_entity: DashboardDataModel, column: str
) -> Optional[str]:
    """
    Get the fully qualified name of the data model column whose display
    name matches ``column`` (case-insensitive).

    Args:
        data_model_entity: data model whose ``columns`` are searched
        column: column name to look for

    Returns:
        The FQN of the first matching column, or None when the entity or
        the column name is missing, the entity has no columns, or no
        column matches.
    """
    if not data_model_entity or column is None:
        return None
    wanted = column.lower()
    for tbl_column in data_model_entity.columns or []:
        display_name = tbl_column.displayName
        # A column without a display name can never match; skipping it avoids
        # an AttributeError on `.lower()` that would abort the whole lookup.
        if display_name is not None and display_name.lower() == wanted:
            return tbl_column.fullyQualifiedName.__root__
    return None
def get_dashboard(self) -> Any: def get_dashboard(self) -> Any:
""" """
Method to iterate through dashboard lists filter dashboards & yield dashboard details Method to iterate through dashboard lists filter dashboards & yield dashboard details
@ -615,3 +630,29 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
) )
patch_request.new_entity.dataModels = datamodel_entity_ref_list patch_request.new_entity.dataModels = datamodel_entity_ref_list
return patch_request return patch_request
def _get_column_lineage(
    self,
    om_table: Table,
    data_model_entity: DashboardDataModel,
    columns_list: List[str],
) -> List[ColumnLineage]:
    """
    Build the column level lineage between a table and a data model.

    For every name in ``columns_list`` the matching column FQN is looked up
    on both ``om_table`` and ``data_model_entity``; a ColumnLineage entry is
    added only when both sides resolve.

    Args:
        om_table: source table entity
        data_model_entity: target data model entity
        columns_list: column names to map; None is treated as empty

    Returns:
        The list of resolved column lineage entries. Always a list: a field
        that fails to resolve is logged at debug level and skipped, so one
        bad column does not discard the entries already collected.
    """
    column_lineage = []
    for field in columns_list or []:
        try:
            from_column = get_column_fqn(table_entity=om_table, column=field)
            to_column = self._get_data_model_column_fqn(
                data_model_entity=data_model_entity,
                column=field,
            )
            if from_column and to_column:
                column_lineage.append(
                    ColumnLineage(fromColumns=[from_column], toColumn=to_column)
                )
        except Exception as exc:
            # Best effort: column lineage is optional detail on top of the
            # entity level lineage, so log and move on to the next field.
            logger.debug(f"Error to get column lineage: {exc}")
            logger.debug(traceback.format_exc())
    return column_lineage

View File

@ -35,6 +35,7 @@ from metadata.generated.schema.metadataIngestion.workflow import (
from metadata.ingestion.api.models import Either from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.qlikcloud.client import QlikCloudClient
from metadata.ingestion.source.dashboard.qlikcloud.models import QlikApp, QlikAppList from metadata.ingestion.source.dashboard.qlikcloud.models import QlikApp, QlikAppList
from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSource from metadata.ingestion.source.dashboard.qliksense.metadata import QliksenseSource
from metadata.ingestion.source.dashboard.qliksense.models import QlikTable from metadata.ingestion.source.dashboard.qliksense.models import QlikTable
@ -52,6 +53,7 @@ class QlikcloudSource(QliksenseSource):
""" """
config: WorkflowSource config: WorkflowSource
client: QlikCloudClient
metadata_config: OpenMetadataConnection metadata_config: OpenMetadataConnection
@classmethod @classmethod
@ -182,16 +184,22 @@ class QlikcloudSource(QliksenseSource):
db_service_entity = self.metadata.get_by_name( db_service_entity = self.metadata.get_by_name(
entity=DatabaseService, fqn=db_service_name entity=DatabaseService, fqn=db_service_name
) )
for datamodel_id in self.context.get().dataModels or []: for datamodel in self.data_models or []:
try: try:
data_model_entity = self._get_datamodel(datamodel_id=datamodel_id) data_model_entity = self._get_datamodel(datamodel_id=datamodel.id)
if data_model_entity: if data_model_entity:
om_table = self._get_database_table( om_table = self._get_database_table(
db_service_entity, data_model_entity db_service_entity, data_model_entity
) )
if om_table: if om_table:
columns_list = [col.name for col in datamodel.fields]
column_lineage = self._get_column_lineage(
om_table, data_model_entity, columns_list
)
yield self._get_add_lineage_request( yield self._get_add_lineage_request(
to_entity=data_model_entity, from_entity=om_table to_entity=data_model_entity,
from_entity=om_table,
column_lineage=column_lineage,
) )
except Exception as err: except Exception as err:
yield Either( yield Either(

View File

@ -314,8 +314,14 @@ class QliksenseSource(DashboardServiceSource):
db_service_entity, datamodel=datamodel db_service_entity, datamodel=datamodel
) )
if om_table: if om_table:
columns_list = [col.name for col in datamodel.fields]
column_lineage = self._get_column_lineage(
om_table, data_model_entity, columns_list
)
yield self._get_add_lineage_request( yield self._get_add_lineage_request(
to_entity=data_model_entity, from_entity=om_table to_entity=data_model_entity,
from_entity=om_table,
column_lineage=column_lineage,
) )
except Exception as err: except Exception as err:
yield Either( yield Either(

View File

@ -182,9 +182,21 @@ class SupersetSourceMixin(DashboardServiceSource):
fqn=datamodel_fqn, fqn=datamodel_fqn,
) )
datasource_json = self.client.fetch_datasource(
chart_json.datasource_id
)
datasource_columns = self.get_column_info(
datasource_json.result.columns
)
columns_list = [col.displayName for col in datasource_columns]
column_lineage = self._get_column_lineage(
from_entity, to_entity, columns_list
)
if from_entity and to_entity: if from_entity and to_entity:
yield self._get_add_lineage_request( yield self._get_add_lineage_request(
to_entity=to_entity, from_entity=from_entity to_entity=to_entity,
from_entity=from_entity,
column_lineage=column_lineage,
) )
except Exception as exc: except Exception as exc:
yield Either( yield Either(