From 21f3c4be3cee25348ad1f688cc7a7f17137022cf Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Thu, 29 May 2025 17:26:55 +0530 Subject: [PATCH] Add: Looker column level lineage (#21458) * Add: Looker column level lineage * Fix broken lineage * add exception handling --------- Co-authored-by: ulixius9 --- .../source/dashboard/dashboard_service.py | 5 +- .../source/dashboard/looker/metadata.py | 97 ++++++++++++++++++- .../source/dashboard/looker/models.py | 1 + 3 files changed, 98 insertions(+), 5 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py index d4962efc22d..b1d72a15a3d 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/dashboard_service.py @@ -522,7 +522,10 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC): if not data_model_entity: return None for tbl_column in data_model_entity.columns: - if tbl_column.displayName.lower() == column.lower(): + if ( + tbl_column.displayName + and tbl_column.displayName.lower() == column.lower() + ) or (tbl_column.name.root.lower() == column.lower()): return tbl_column.fullyQualifiedName.root return None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index f5be94e0ecb..657acee7b41 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -32,6 +32,7 @@ from typing import ( Optional, Sequence, Set, + Tuple, Type, Union, cast, @@ -66,7 +67,7 @@ from metadata.generated.schema.entity.data.dashboardDataModel import ( DashboardDataModel, DataModelType, ) -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import Column, Table from 
def _extract_column_lineage(self, view: "LookMlView") -> List[Tuple[str, str]]:
    """
    Extract column level lineage from a LookML view.

    Every dimension, measure and dimension group that carries a ``sql``
    expression is resolved to the physical columns it reads: the direct
    ``${TABLE}.col`` references, plus the columns reached transitively
    through ``${other_field}`` references to fields of the same view.

    Fields without a ``sql`` expression are ignored instead of aborting the
    extraction for the whole view.

    Args:
        view: parsed LookML view. Its ``dimensions``, ``measures`` and
            ``dimension_groups`` attributes are read when present; a missing
            or ``None`` attribute is treated as empty.

    Returns:
        A list of ``(source_column_name, target_field_name)`` string pairs,
        ordered by field declaration and then by source column name. An
        empty list is returned when nothing can be resolved or when an
        unexpected error occurs (the error is logged, never raised).
    """
    try:
        # Build a map: field_name → sql block. Only non-empty SQL strings
        # are kept: a `None` here would make the regex calls below raise
        # and the lineage of every other field would be thrown away.
        field_sql_map = {}
        for field_type in ("dimensions", "measures", "dimension_groups"):
            for field in getattr(view, field_type, None) or []:
                sql = getattr(field, "sql", None)
                if isinstance(sql, str) and sql:
                    field_sql_map[field.name] = sql

        # Regex to extract ${TABLE}.col and ${field}
        table_col_pattern = re.compile(r"\$\{TABLE\}\.([a-zA-Z_][a-zA-Z0-9_]*)")
        dimension_ref_pattern = re.compile(
            r"\$\{(?!TABLE\})([a-zA-Z_][a-zA-Z0-9_]*)\}"
        )

        def resolve(field_name):
            """Collect every ${TABLE} column reachable from `field_name`."""
            source_cols = set()
            visited = set()
            pending = [field_name]
            # Iterative walk; `visited` protects against reference cycles
            # such as a → b → a.
            while pending:
                current = pending.pop()
                if current in visited:
                    continue
                visited.add(current)
                sql = field_sql_map.get(current, "")
                source_cols.update(table_col_pattern.findall(sql))
                pending.extend(dimension_ref_pattern.findall(sql))
            return source_cols

        # Build lineage for each field; sorting keeps the output stable
        # across runs (set iteration order is not).
        return [
            (source_col, field_name)
            for field_name in field_sql_map
            for source_col in sorted(resolve(field_name))
        ]
    except Exception as exc:
        # Best effort: column lineage must never break the table lineage.
        logger.warning(f"Error extracting column lineage: {exc}")
        logger.debug(traceback.format_exc())
        return []
+773,28 @@ class LookerSource(DashboardServiceSource): """ for db_service_name in self.get_db_service_names() or []: lineage_parser = LineageParser( - sql_query, + f"create view {view_name} as {sql_query}", self._get_db_dialect(db_service_name), timeout_seconds=30, ) if lineage_parser.source_tables: self._parsed_views[view_name] = sql_query for from_table_name in lineage_parser.source_tables: + # Process column level lineage + column_lineage = [ + ( + source_col.raw_name, + target_col.raw_name, + ) + for source_col, target_col in lineage_parser.column_lineage + or [] + if source_col.parent == from_table_name + ] yield self.build_lineage_request( source=str(from_table_name), db_service_name=db_service_name, to_entity=self._view_data_model, + column_lineage=column_lineage if column_lineage else None, ) def _get_db_dialect(self, db_service_name) -> Dialect: @@ -983,6 +1053,7 @@ class LookerSource(DashboardServiceSource): source: str, db_service_name: str, to_entity: Union[Dashboard, DashboardDataModel], + column_lineage: Optional[List[Tuple[Column, Column]]] = None, ) -> Optional[Either[AddLineageRequest]]: """ Once we have a list of origin data sources, check their components @@ -1013,13 +1084,31 @@ class LookerSource(DashboardServiceSource): fqn=from_fqn, ) + if column_lineage: + processed_column_lineage = [] + for source_col, target_col in column_lineage: + from_column = get_column_fqn( + table_entity=from_entity, column=str(target_col) + ) + to_column = self._get_data_model_column_fqn( + data_model_entity=to_entity, + column=str(source_col), + ) + if from_column and to_column: + processed_column_lineage.append( + ColumnLineage(fromColumns=[from_column], toColumn=to_column) + ) + column_lineage = processed_column_lineage + if from_entity: if from_entity.id.root not in self._added_lineage: self._added_lineage[from_entity.id.root] = [] if to_entity.id.root not in self._added_lineage[from_entity.id.root]: 
self._added_lineage[from_entity.id.root].append(to_entity.id.root) return self._get_add_lineage_request( - to_entity=to_entity, from_entity=from_entity + to_entity=to_entity, + from_entity=from_entity, + column_lineage=column_lineage, ) return None diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/models.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/models.py index 897e126f35d..291df7a18d3 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/models.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/models.py @@ -25,6 +25,7 @@ class LookMlField(BaseModel): label: Optional[str] = Field(None, description="Field display name") type: Optional[str] = Field(None, description="Field type to be mapped to OM") name: str = Field(..., description="Field name") + sql: Optional[str] = Field(None, description="Field SQL") class LookMlDerivedTableField(BaseModel):