Add: Looker explore to view Column Lineage (#21504)

* Add: explore to view Column Lineage

* Add tags ingestion and fix cll warnings

* lint

* Addressed comments

* fixed tests
This commit is contained in:
Suman Maharana 2025-06-03 20:23:43 +05:30 committed by SumanMaharana
parent b6705299b0
commit c201d52dbc
3 changed files with 163 additions and 66 deletions

View File

@ -112,6 +112,7 @@ from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
from metadata.ingestion.models.ometa_classification import OMetaTagAndClassification
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.dashboard_service import (
DashboardServiceSource,
@ -136,6 +137,7 @@ from metadata.utils import fqn
from metadata.utils.filters import filter_by_chart, filter_by_datamodel
from metadata.utils.helpers import clean_uri, get_standard_chart_type
from metadata.utils.logger import ingestion_logger
from metadata.utils.tag_utils import get_ometa_tag_and_classification, get_tag_labels
logger = ingestion_logger()
@ -162,6 +164,8 @@ GET_DASHBOARD_FIELDS = [
TEMP_FOLDER_DIRECTORY = os.path.join(os.getcwd(), "tmp")
REPO_TMP_LOCAL_PATH = f"{TEMP_FOLDER_DIRECTORY}/lookml_repos"
LOOKER_TAG_CATEGORY = "LookerTags"
def clean_dashboard_name(name: str) -> str:
"""
@ -433,6 +437,21 @@ class LookerSource(DashboardServiceSource):
)
return _datamodel
def yield_data_model_tags(
self, tags: List[str]
) -> Iterable[Either[OMetaTagAndClassification]]:
"""
Method to yield tags related to specific dashboards
"""
if tags and self.source_config.includeTags:
yield from get_ometa_tag_and_classification(
tags=tags or [],
classification_name=LOOKER_TAG_CATEGORY,
tag_description="Looker Tag",
classification_description="Tags associated with looker entities",
include_tags=self.source_config.includeTags,
)
def yield_bulk_datamodel(
self, model: LookmlModelExplore
) -> Iterable[Either[CreateDashboardDataModelRequest]]:
@ -447,6 +466,8 @@ class LookerSource(DashboardServiceSource):
):
self.status.filter(datamodel_name, "Data model filtered out.")
else:
if model.tags and self.source_config.includeTags:
yield from self.yield_data_model_tags(model.tags or [])
explore_datamodel = CreateDashboardDataModelRequest(
name=EntityName(datamodel_name),
displayName=model.name,
@ -454,6 +475,12 @@ class LookerSource(DashboardServiceSource):
if model.description
else None,
service=self.context.get().dashboard_service,
tags=get_tag_labels(
metadata=self.metadata,
tags=model.tags or [],
classification_name=LOOKER_TAG_CATEGORY,
include_tags=self.source_config.includeTags,
),
dataModelType=DataModelType.LookMlExplore.value,
serviceType=DashboardServiceType.Looker.value,
columns=get_columns_from_model(model),
@ -551,15 +578,24 @@ class LookerSource(DashboardServiceSource):
view: Optional[LookMlView] = project_parser.find_view(view_name=view_name)
if view:
if view.tags and self.source_config.includeTags:
yield from self.yield_data_model_tags(view.tags or [])
datamodel_view_name = (
build_datamodel_name(explore.model_name, view.name) + "_view"
)
data_model_request = CreateDashboardDataModelRequest(
name=EntityName(
build_datamodel_name(explore.model_name, view.name)
),
name=EntityName(datamodel_view_name),
displayName=view.name,
description=Markdown(view.description)
if view.description
else None,
service=self.context.get().dashboard_service,
tags=get_tag_labels(
metadata=self.metadata,
tags=view.tags or [],
classification_name=LOOKER_TAG_CATEGORY,
include_tags=self.source_config.includeTags,
),
dataModelType=DataModelType.LookMlView.value,
serviceType=DashboardServiceType.Looker.value,
columns=get_columns_from_model(view),
@ -568,9 +604,7 @@ class LookerSource(DashboardServiceSource):
project=explore.project_name,
)
yield Either(right=data_model_request)
self._view_data_model = self._build_data_model(
build_datamodel_name(explore.model_name, view.name)
)
self._view_data_model = self._build_data_model(datamodel_view_name)
self.register_record_datamodel(datamodel_request=data_model_request)
yield from self.add_view_lineage(view, explore)
else:
@ -701,6 +735,57 @@ class LookerSource(DashboardServiceSource):
logger.debug(traceback.format_exc())
return []
def _get_explore_column_lineage(
self, explore_model: LookmlModelExplore
) -> Optional[List[ColumnLineage]]:
"""
Build the lineage between the view and the explore
"""
processed_column_lineage = []
for field in explore_model.columns or []:
try:
# Look for fields with format view_name.col
field_name = field.name.root
if "." not in field_name:
logger.debug(
f"Field [{field_name}] does not have a view name. Skipping."
)
continue
view_name, col_name = field_name.split(".")
if view_name != self._view_data_model.displayName:
logger.debug(
f"View name [{view_name}] do not match the view name"
f"[{self._view_data_model.displayName}] Skipping."
)
continue
# Add lineage from view column to explore column
view_col = None
for col in self._view_data_model.columns:
if (
col.displayName and col.displayName.lower() == col_name.lower()
) or (col.name.root.lower() == col_name.lower()):
view_col = col
break
from_column = view_col.fullyQualifiedName.root if view_col else None
to_column = self._get_data_model_column_fqn(
data_model_entity=explore_model, column=str(field.name.root)
)
if from_column and to_column:
processed_column_lineage.append(
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
)
except Exception as err:
logger.warning(
"Error processing column lineage for explore_model"
f"[{explore_model.name}] field [{field.name}]: {err}"
)
logger.debug(traceback.format_exc())
continue
return processed_column_lineage
def add_view_lineage(
self, view: LookMlView, explore: LookmlModelExplore
) -> Iterable[Either[AddLineageRequest]]:
@ -718,8 +803,11 @@ class LookerSource(DashboardServiceSource):
logger.debug(
f"Building lineage request for view {self._view_data_model.name} to explore {explore_model.name}"
)
column_lineage = self._get_explore_column_lineage(explore_model)
yield self._get_add_lineage_request(
from_entity=self._view_data_model, to_entity=explore_model
from_entity=self._view_data_model,
to_entity=explore_model,
column_lineage=column_lineage,
)
else:
@ -1059,6 +1147,55 @@ class LookerSource(DashboardServiceSource):
)
)
def _process_and_validate_column_lineage(
self,
column_lineage: List[Tuple[Column, Column]],
from_entity: Table,
to_entity: Union[Dashboard, DashboardDataModel],
) -> List[ColumnLineage]:
"""
Process and validate column lineage
"""
processed_column_lineage = []
if column_lineage:
for column_tuple in column_lineage or []:
try:
if len(column_tuple) < 2:
logger.debug(f"Skipping invalid column tuple: {column_tuple}")
continue
source_col = column_tuple[0]
target_col = column_tuple[-1]
if not source_col or not target_col:
logger.debug(
f"Skipping column tuple with empty values: source={source_col}, "
f"target={target_col}, to_entity={to_entity.name}"
)
continue
from_column = get_column_fqn(
table_entity=from_entity, column=str(target_col)
)
to_column = self._get_data_model_column_fqn(
data_model_entity=to_entity,
column=str(source_col),
)
if from_column and to_column:
processed_column_lineage.append(
ColumnLineage(
fromColumns=[from_column],
toColumn=to_column,
)
)
except Exception as err:
logger.warning(
f"Error processing column lineage {column_tuple}: {err}"
)
logger.debug(traceback.format_exc())
continue
return processed_column_lineage
def build_lineage_request(
self,
source: str,
@ -1077,7 +1214,6 @@ class LookerSource(DashboardServiceSource):
db_service_name: name of the service from the config
to_entity: Dashboard Entity being used
"""
# pylint: disable=too-many-locals
logger.debug(f"Building lineage request for {source} to {to_entity.name}")
source_elements = fqn.split_table_name(table_name=source)
@ -1097,57 +1233,20 @@ class LookerSource(DashboardServiceSource):
fqn=from_fqn,
)
if column_lineage:
processed_column_lineage = []
for column_tuple in column_lineage or []:
try:
if len(column_tuple) < 2:
logger.debug(
f"Skipping invalid column tuple: {column_tuple}"
)
continue
source_col = column_tuple[0]
target_col = column_tuple[-1]
if not source_col or not target_col:
logger.debug(
f"Skipping column tuple with empty values: source={source_col}, "
f"target={target_col}, to_entity={to_entity.name}"
)
continue
from_column = get_column_fqn(
table_entity=from_entity, column=str(target_col)
)
to_column = self._get_data_model_column_fqn(
data_model_entity=to_entity,
column=str(source_col),
)
if from_column and to_column:
processed_column_lineage.append(
ColumnLineage(
fromColumns=[from_column], toColumn=to_column
)
)
except Exception as err:
logger.warning(
f"Error processing column lineage {column_tuple}: {err}"
)
logger.debug(traceback.format_exc())
continue
column_lineage = processed_column_lineage
if from_entity:
if from_entity.id.root not in self._added_lineage:
self._added_lineage[from_entity.id.root] = []
if to_entity.id.root not in self._added_lineage[from_entity.id.root]:
self._added_lineage[from_entity.id.root].append(to_entity.id.root)
processed_column_lineage = (
self._process_and_validate_column_lineage(
column_lineage, from_entity, to_entity
)
)
return self._get_add_lineage_request(
to_entity=to_entity,
from_entity=from_entity,
column_lineage=column_lineage,
column_lineage=processed_column_lineage,
)
return None

View File

@ -52,6 +52,7 @@ class LookMlView(BaseModel):
derived_table: Optional[LookMlDerivedTableField] = Field(
None, description="To track lineage with the source"
)
tags: Optional[List[str]] = Field(None, description="Tags for the view")
class LkmlFile(BaseModel):

View File

@ -421,22 +421,19 @@ class LookerUnitTest(TestCase):
with patch.object(fqn, "build", return_value=None), patch.object(
OpenMetadata, "get_by_name", return_value=table
):
self.assertEqual(
self.looker.build_lineage_request(
source, db_service_name, to_entity
).right,
AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=table.id.root, type="table"),
toEntity=EntityReference(
id=to_entity.id.root, type="dashboard"
),
lineageDetails=LineageDetails(
source=LineageSource.DashboardLineage
),
)
),
original_lineage = self.looker.build_lineage_request(
source, db_service_name, to_entity
).right
expected_lineage = AddLineageRequest(
edge=EntitiesEdge(
fromEntity=EntityReference(id=table.id.root, type="table"),
toEntity=EntityReference(id=to_entity.id.root, type="dashboard"),
lineageDetails=LineageDetails(
source=LineageSource.DashboardLineage, columnsLineage=[]
),
)
)
self.assertEqual(original_lineage, expected_lineage)
def test_yield_dashboard_chart(self):
"""