diff --git a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py index 657acee7b41..203b8c8bf93 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/looker/metadata.py @@ -649,13 +649,14 @@ class LookerSource(DashboardServiceSource): Extract column level lineage from a LookML view. Returns a list of tuples containing (source_column, target_column) """ + logger.debug(f"Extracting column lineage for view: {view.name}") column_lineage = [] try: # Build a map: field_name → sql block field_sql_map = {} for field_type in ["dimensions", "measures", "dimension_groups"]: for field in getattr(view, field_type, []): - if hasattr(field, "sql"): + if hasattr(field, "sql") and field.sql is not None: field_sql_map[field.name] = field.sql # Regex to extract ${TABLE}.col and ${field} @@ -682,10 +683,17 @@ class LookerSource(DashboardServiceSource): return source_cols # Build lineage for each field - for field_name, _ in field_sql_map.items(): - source_cols = resolve(field_name) - for source_col in source_cols: - column_lineage.append((source_col, field_name)) + for field_name, sql in field_sql_map.items(): + try: + if not sql: # Skip if sql is None or empty + continue + source_cols = resolve(field_name) + for source_col in source_cols: + column_lineage.append((source_col, field_name)) + except Exception as err: + logger.warning(f"Error processing field {field_name}: {err}") + logger.debug(traceback.format_exc()) + continue return column_lineage except Exception as e: @@ -707,6 +715,9 @@ class LookerSource(DashboardServiceSource): # TODO: column-level lineage parsing the explore columns with the format `view_name.col` # Now the context has the newly created view if explore_model: + logger.debug( + f"Building lineage request for view {self._view_data_model.name} to explore {explore_model.name}" + ) yield self._get_add_lineage_request( from_entity=self._view_data_model, to_entity=explore_model ) @@ -1066,6 +1077,8 @@ class LookerSource(DashboardServiceSource): db_service_name: name of the service from the config to_entity: Dashboard Entity being used """ + # pylint: disable=too-many-locals + logger.debug(f"Building lineage request for {source} to {to_entity.name}") source_elements = fqn.split_table_name(table_name=source) @@ -1086,18 +1099,44 @@ class LookerSource(DashboardServiceSource): if column_lineage: processed_column_lineage = [] - for source_col, target_col in column_lineage: - from_column = get_column_fqn( - table_entity=from_entity, column=str(target_col) - ) - to_column = self._get_data_model_column_fqn( - data_model_entity=to_entity, - column=str(source_col), - ) - if from_column and to_column: - processed_column_lineage.append( - ColumnLineage(fromColumns=[from_column], toColumn=to_column) + for column_tuple in column_lineage or []: + try: + if len(column_tuple) < 2: + logger.debug( + f"Skipping invalid column tuple: {column_tuple}" + ) + continue + + source_col = column_tuple[0] + target_col = column_tuple[-1] + + if not source_col or not target_col: + logger.debug( + f"Skipping column tuple with empty values: source={source_col}, " + f"target={target_col}, to_entity={to_entity.name}" + ) + continue + + from_column = get_column_fqn( + table_entity=from_entity, column=str(target_col) ) + to_column = self._get_data_model_column_fqn( + data_model_entity=to_entity, + column=str(source_col), + ) + if from_column and to_column: + processed_column_lineage.append( + ColumnLineage( + fromColumns=[from_column], toColumn=to_column + ) + ) + except Exception as err: + logger.warning( + f"Error processing column lineage {column_tuple}: {err}" + ) + logger.debug(traceback.format_exc()) + continue + column_lineage = processed_column_lineage if from_entity: