mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2026-01-08 05:26:19 +00:00
Add: Looker column level lineage (#21458)
* Add: Looker column level lineage * Fix broken lineage * add exception handling --------- Co-authored-by: ulixius9 <mayursingal9@gmail.com>
This commit is contained in:
parent
7d0020aa08
commit
21f3c4be3c
@ -522,7 +522,10 @@ class DashboardServiceSource(TopologyRunnerMixin, Source, ABC):
|
||||
if not data_model_entity:
|
||||
return None
|
||||
for tbl_column in data_model_entity.columns:
|
||||
if tbl_column.displayName.lower() == column.lower():
|
||||
if (
|
||||
tbl_column.displayName
|
||||
and tbl_column.displayName.lower() == column.lower()
|
||||
) or (tbl_column.name.root.lower() == column.lower()):
|
||||
return tbl_column.fullyQualifiedName.root
|
||||
return None
|
||||
|
||||
|
||||
@ -32,6 +32,7 @@ from typing import (
|
||||
Optional,
|
||||
Sequence,
|
||||
Set,
|
||||
Tuple,
|
||||
Type,
|
||||
Union,
|
||||
cast,
|
||||
@ -66,7 +67,7 @@ from metadata.generated.schema.entity.data.dashboardDataModel import (
|
||||
DashboardDataModel,
|
||||
DataModelType,
|
||||
)
|
||||
from metadata.generated.schema.entity.data.table import Table
|
||||
from metadata.generated.schema.entity.data.table import Column, Table
|
||||
from metadata.generated.schema.entity.services.connections.dashboard.lookerConnection import (
|
||||
LookerConnection,
|
||||
NoGitCredentials,
|
||||
@ -97,7 +98,11 @@ from metadata.generated.schema.type.basic import (
|
||||
SourceUrl,
|
||||
Uuid,
|
||||
)
|
||||
from metadata.generated.schema.type.entityLineage import EntitiesEdge, LineageDetails
|
||||
from metadata.generated.schema.type.entityLineage import (
|
||||
ColumnLineage,
|
||||
EntitiesEdge,
|
||||
LineageDetails,
|
||||
)
|
||||
from metadata.generated.schema.type.entityLineage import Source as LineageSource
|
||||
from metadata.generated.schema.type.entityReference import EntityReference
|
||||
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
|
||||
@ -106,6 +111,7 @@ from metadata.ingestion.api.models import Either
|
||||
from metadata.ingestion.api.steps import InvalidSourceException
|
||||
from metadata.ingestion.lineage.models import ConnectionTypeDialectMapper, Dialect
|
||||
from metadata.ingestion.lineage.parser import LineageParser
|
||||
from metadata.ingestion.lineage.sql_lineage import get_column_fqn
|
||||
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
||||
from metadata.ingestion.source.dashboard.dashboard_service import (
|
||||
DashboardServiceSource,
|
||||
@ -638,6 +644,55 @@ class LookerSource(DashboardServiceSource):
|
||||
for dependent_view_name in view_references:
|
||||
self._derived_dependencies.add_edge(view_name, dependent_view_name)
|
||||
|
||||
def _extract_column_lineage(self, view: LookMlView) -> List[Tuple[Column, Column]]:
|
||||
"""
|
||||
Extract column level lineage from a LookML view.
|
||||
Returns a list of tuples containing (source_column, target_column)
|
||||
"""
|
||||
column_lineage = []
|
||||
try:
|
||||
# Build a map: field_name → sql block
|
||||
field_sql_map = {}
|
||||
for field_type in ["dimensions", "measures", "dimension_groups"]:
|
||||
for field in getattr(view, field_type, []):
|
||||
if hasattr(field, "sql"):
|
||||
field_sql_map[field.name] = field.sql
|
||||
|
||||
# Regex to extract ${TABLE}.col and ${field}
|
||||
table_col_pattern = re.compile(r"\$\{TABLE\}\.([a-zA-Z_][a-zA-Z0-9_]*)")
|
||||
dimension_ref_pattern = re.compile(
|
||||
r"\$\{(?!TABLE\})([a-zA-Z_][a-zA-Z0-9_]*)\}"
|
||||
)
|
||||
|
||||
# Recursive resolver
|
||||
def resolve(field_name, visited=None):
|
||||
if visited is None:
|
||||
visited = set()
|
||||
if field_name in visited:
|
||||
return set()
|
||||
visited.add(field_name)
|
||||
|
||||
sql = field_sql_map.get(field_name, "")
|
||||
source_cols = set(table_col_pattern.findall(sql))
|
||||
dimension_refs = dimension_ref_pattern.findall(sql)
|
||||
|
||||
for ref in dimension_refs:
|
||||
source_cols.update(resolve(ref, visited))
|
||||
|
||||
return source_cols
|
||||
|
||||
# Build lineage for each field
|
||||
for field_name, _ in field_sql_map.items():
|
||||
source_cols = resolve(field_name)
|
||||
for source_col in source_cols:
|
||||
column_lineage.append((source_col, field_name))
|
||||
|
||||
return column_lineage
|
||||
except Exception as e:
|
||||
logger.warning(f"Error extracting column lineage: {e}")
|
||||
logger.debug(traceback.format_exc())
|
||||
return []
|
||||
|
||||
def add_view_lineage(
|
||||
self, view: LookMlView, explore: LookmlModelExplore
|
||||
) -> Iterable[Either[AddLineageRequest]]:
|
||||
@ -672,11 +727,15 @@ class LookerSource(DashboardServiceSource):
|
||||
source_table_name = self._clean_table_name(sql_table_name, dialect)
|
||||
self._parsed_views[view.name] = source_table_name
|
||||
|
||||
# Extract column lineage
|
||||
column_lineage = self._extract_column_lineage(view)
|
||||
|
||||
# View to the source is only there if we are informing the dbServiceNames
|
||||
yield self.build_lineage_request(
|
||||
source=source_table_name,
|
||||
db_service_name=db_service_name,
|
||||
to_entity=self._view_data_model,
|
||||
column_lineage=column_lineage,
|
||||
)
|
||||
|
||||
elif view.derived_table:
|
||||
@ -714,17 +773,28 @@ class LookerSource(DashboardServiceSource):
|
||||
"""
|
||||
for db_service_name in self.get_db_service_names() or []:
|
||||
lineage_parser = LineageParser(
|
||||
sql_query,
|
||||
f"create view {view_name} as {sql_query}",
|
||||
self._get_db_dialect(db_service_name),
|
||||
timeout_seconds=30,
|
||||
)
|
||||
if lineage_parser.source_tables:
|
||||
self._parsed_views[view_name] = sql_query
|
||||
for from_table_name in lineage_parser.source_tables:
|
||||
# Process column level lineage
|
||||
column_lineage = [
|
||||
(
|
||||
source_col.raw_name,
|
||||
target_col.raw_name,
|
||||
)
|
||||
for source_col, target_col in lineage_parser.column_lineage
|
||||
or []
|
||||
if source_col.parent == from_table_name
|
||||
]
|
||||
yield self.build_lineage_request(
|
||||
source=str(from_table_name),
|
||||
db_service_name=db_service_name,
|
||||
to_entity=self._view_data_model,
|
||||
column_lineage=column_lineage if column_lineage else None,
|
||||
)
|
||||
|
||||
def _get_db_dialect(self, db_service_name) -> Dialect:
|
||||
@ -983,6 +1053,7 @@ class LookerSource(DashboardServiceSource):
|
||||
source: str,
|
||||
db_service_name: str,
|
||||
to_entity: Union[Dashboard, DashboardDataModel],
|
||||
column_lineage: Optional[List[Tuple[Column, Column]]] = None,
|
||||
) -> Optional[Either[AddLineageRequest]]:
|
||||
"""
|
||||
Once we have a list of origin data sources, check their components
|
||||
@ -1013,13 +1084,31 @@ class LookerSource(DashboardServiceSource):
|
||||
fqn=from_fqn,
|
||||
)
|
||||
|
||||
if column_lineage:
|
||||
processed_column_lineage = []
|
||||
for source_col, target_col in column_lineage:
|
||||
from_column = get_column_fqn(
|
||||
table_entity=from_entity, column=str(target_col)
|
||||
)
|
||||
to_column = self._get_data_model_column_fqn(
|
||||
data_model_entity=to_entity,
|
||||
column=str(source_col),
|
||||
)
|
||||
if from_column and to_column:
|
||||
processed_column_lineage.append(
|
||||
ColumnLineage(fromColumns=[from_column], toColumn=to_column)
|
||||
)
|
||||
column_lineage = processed_column_lineage
|
||||
|
||||
if from_entity:
|
||||
if from_entity.id.root not in self._added_lineage:
|
||||
self._added_lineage[from_entity.id.root] = []
|
||||
if to_entity.id.root not in self._added_lineage[from_entity.id.root]:
|
||||
self._added_lineage[from_entity.id.root].append(to_entity.id.root)
|
||||
return self._get_add_lineage_request(
|
||||
to_entity=to_entity, from_entity=from_entity
|
||||
to_entity=to_entity,
|
||||
from_entity=from_entity,
|
||||
column_lineage=column_lineage,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
@ -25,6 +25,7 @@ class LookMlField(BaseModel):
|
||||
label: Optional[str] = Field(None, description="Field display name")
|
||||
type: Optional[str] = Field(None, description="Field type to be mapped to OM")
|
||||
name: str = Field(..., description="Field name")
|
||||
sql: Optional[str] = Field(None, description="Field SQL")
|
||||
|
||||
|
||||
class LookMlDerivedTableField(BaseModel):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user