feat(ingestion): Enhanced column lineage extraction for Looker/LookML (#14826)

This commit is contained in:
Anush Kumar 2025-09-26 09:27:18 -07:00 committed by GitHub
parent 50c5841b50
commit c18b125a05
18 changed files with 3277 additions and 39 deletions

View File

@ -307,6 +307,12 @@ class ViewFieldType(Enum):
UNKNOWN = "Unknown"
class ViewFieldDimensionGroupType(Enum):
# Ref: https://cloud.google.com/looker/docs/reference/param-field-dimension-group
TIME = "time"
DURATION = "duration"
class ViewFieldValue(Enum):
NOT_AVAILABLE = "NotAvailable"

View File

@ -11,3 +11,7 @@ prod = "prod"
dev = "dev"
NAME = "name"
DERIVED_DOT_SQL = "derived.sql"
VIEW_FIELD_TYPE_ATTRIBUTE = "type"
VIEW_FIELD_INTERVALS_ATTRIBUTE = "intervals"
VIEW_FIELD_TIMEFRAMES_ATTRIBUTE = "timeframes"

View File

@ -2,6 +2,7 @@
import json
import logging
import os
from enum import Enum
from functools import lru_cache
from typing import Dict, List, MutableMapping, Optional, Sequence, Set, Union, cast
@ -31,6 +32,14 @@ from datahub.configuration.common import ConfigurationError
logger = logging.getLogger(__name__)
class LookerQueryResponseFormat(Enum):
# result_format - Ref: https://cloud.google.com/looker/docs/reference/looker-api/latest/methods/Query/run_inline_query
JSON = "json"
SQL = (
"sql" # Note: This does not execute the query, it only generates the SQL query.
)
class TransportOptionsConfig(ConfigModel):
timeout: int
headers: MutableMapping[str, str]
@ -69,6 +78,7 @@ class LookerAPIStats(BaseModel):
search_looks_calls: int = 0
search_dashboards_calls: int = 0
all_user_calls: int = 0
generate_sql_query_calls: int = 0
class LookerAPI:
@ -170,17 +180,40 @@ class LookerAPI:
logger.debug(f"Executing query {write_query}")
self.client_stats.query_calls += 1
response_json = self.client.run_inline_query(
result_format="json",
response = self.client.run_inline_query(
result_format=LookerQueryResponseFormat.JSON.value,
body=write_query,
transport_options=self.transport_options,
)
data = json.loads(response)
logger.debug("=================Response=================")
data = json.loads(response_json)
logger.debug("Length of response: %d", len(data))
return data
def generate_sql_query(
self, write_query: WriteQuery, use_cache: bool = False
) -> str:
"""
Generates a SQL query string for a given WriteQuery.
Note: This does not execute the query; it only generates the SQL.
"""
logger.debug(f"Generating SQL query for {write_query}")
self.client_stats.generate_sql_query_calls += 1
response = self.client.run_inline_query(
result_format=LookerQueryResponseFormat.SQL.value,
body=write_query,
transport_options=self.transport_options,
cache=use_cache,
)
logger.debug("=================Response=================")
logger.debug("Length of SQL response: %d", len(response))
return str(response)
def dashboard(self, dashboard_id: str, fields: Union[str, List[str]]) -> Dashboard:
self.client_stats.dashboard_calls += 1
return self.client.dashboard(

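For illustration, the new method could be exercised roughly as follows. This is a minimal sketch, assuming LookerAPI is constructed from a LookerAPIConfig as elsewhere in this module; the credentials, model, explore, and field names are placeholders, not part of this change.

from looker_sdk.sdk.api40.models import WriteQuery

from datahub.ingestion.source.looker.looker_lib_wrapper import (
    LookerAPI,
    LookerAPIConfig,
)

# Placeholder credentials for illustration only.
looker_api = LookerAPI(
    LookerAPIConfig(
        client_id="fake_client_id",
        client_secret="fake_secret",
        base_url="https://company.looker.com",
    )
)

# Ask Looker to render (not execute) the SQL for two fields of a "users" view,
# addressed through an explore that exposes that view.
sql = looker_api.generate_sql_query(
    write_query=WriteQuery(
        model="test_model",
        view="users_explore",  # explore name, not the view name
        fields=["users.email", "users.lifetime_purchase_count"],
        limit="1",
    ),
    use_cache=False,
)
print(sql)  # fully resolved SELECT statement returned by the Looker API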
View File

@ -3,11 +3,11 @@ from typing import Dict, List, Optional
from datahub.ingestion.source.looker.looker_common import LookerViewId, ViewFieldValue
from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_constant import NAME
from datahub.ingestion.source.looker.looker_dataclasses import LookerModel
from datahub.ingestion.source.looker.looker_file_loader import LookerViewFileLoader
from datahub.ingestion.source.looker.lookml_config import (
BASE_PROJECT_NAME,
NAME,
LookMLSourceReport,
)

View File

@ -12,12 +12,12 @@ from datahub.ingestion.source.looker.looker_constant import (
DIMENSION_GROUPS,
DIMENSIONS,
MEASURES,
NAME,
)
from datahub.ingestion.source.looker.looker_dataclasses import LookerViewFile
from datahub.ingestion.source.looker.looker_file_loader import LookerViewFileLoader
from datahub.ingestion.source.looker.lookml_config import (
DERIVED_VIEW_SUFFIX,
NAME,
LookMLSourceReport,
)
from datahub.ingestion.source.looker.lookml_refinement import LookerRefinementResolver

View File

@ -28,11 +28,10 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.stats_collections import TopKDict, float_top_k_dict
logger = logging.getLogger(__name__)
NAME: str = "name"
BASE_PROJECT_NAME = "__BASE"
EXPLORE_FILE_EXTENSION = ".explore.lkml"
@ -47,6 +46,9 @@ DERIVED_VIEW_PATTERN: str = r"\$\{([^}]*)\}"
@dataclass
class LookMLSourceReport(StaleEntityRemovalSourceReport):
git_clone_latency: Optional[timedelta] = None
looker_query_api_latency_seconds: TopKDict[str, float] = dataclass_field(
default_factory=float_top_k_dict
)
models_discovered: int = 0
models_dropped: LossyList[str] = dataclass_field(default_factory=LossyList)
views_discovered: int = 0
@ -81,6 +83,11 @@ class LookMLSourceReport(StaleEntityRemovalSourceReport):
self.api_stats = self._looker_api.compute_stats()
return super().compute_stats()
def report_looker_query_api_latency(
self, view_urn: str, latency: timedelta
) -> None:
self.looker_query_api_latency_seconds[view_urn] = latency.total_seconds()
class LookMLSourceConfig(
LookerCommonConfig, StatefulIngestionConfigBase, EnvConfigMixin
@ -122,6 +129,16 @@ class LookMLSourceConfig(
description="List of regex patterns for LookML views to include in the extraction.",
)
parse_table_names_from_sql: bool = Field(True, description="See note below.")
use_api_for_view_lineage: bool = Field(
False,
description="When enabled, uses Looker API to get SQL representation of views for lineage parsing instead of parsing LookML files directly. Requires 'api' configuration to be provided."
"Coverage of regex based lineage extraction has limitations, it only supportes ${TABLE}.column_name syntax, See (https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions) to"
"understand the other substitutions and cross-references allowed in LookML.",
)
use_api_cache_for_view_lineage: bool = Field(
False,
description="When enabled, uses Looker API server-side caching for query execution. Requires 'api' configuration to be provided.",
)
api: Optional[LookerAPIConfig] = None
project_name: Optional[str] = Field(
None,
@ -239,6 +256,17 @@ class LookMLSourceConfig(
)
return values
@root_validator(skip_on_failure=True)
def check_api_provided_for_view_lineage(cls, values):
"""Validate that we must have an api credential to use Looker API for view's column lineage"""
if not values.get("api") and values.get("use_api_for_view_lineage"):
raise ValueError(
"API credential was not found. LookML source requires api credentials "
"for Looker to use Looker APIs for view's column lineage extraction."
"Set `use_api_for_view_lineage` to False to skip using Looker APIs."
)
return values
@validator("base_folder", always=True)
def check_base_folder_if_not_provided(
cls, v: Optional[pydantic.DirectoryPath], values: Dict[str, Any]

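For context, a minimal LookML recipe enabling the new API-based lineage path might look like the sketch below. The folder, connection, project, and credential values are placeholders mirroring the integration test added later in this PR.

from datahub.ingestion.run.pipeline import Pipeline

recipe = {
    "source": {
        "type": "lookml",
        "config": {
            "base_folder": "./lookml_project",  # placeholder path
            "connection_to_platform_map": {"my_connection": "postgres"},
            "project_name": "my_project",  # placeholder
            # New flags added by this change:
            "use_api_for_view_lineage": True,
            "use_api_cache_for_view_lineage": False,
            # Required when use_api_for_view_lineage is enabled
            # (enforced by check_api_provided_for_view_lineage):
            "api": {
                "client_id": "fake_client_id",
                "client_secret": "fake_secret",
                "base_url": "company.looker.com",
            },
        },
    },
    "sink": {"type": "console"},
}

Pipeline.create(recipe).run()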
View File

@ -4,10 +4,10 @@ import logging
from typing import ClassVar, Dict, List, Set
from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_constant import NAME
from datahub.ingestion.source.looker.looker_dataclasses import LookerModel
from datahub.ingestion.source.looker.looker_file_loader import LookerViewFileLoader
from datahub.ingestion.source.looker.lookml_config import (
NAME,
LookMLSourceConfig,
LookMLSourceReport,
)

View File

@ -142,6 +142,8 @@ class LookerView:
ctx: PipelineContext,
extract_col_level_lineage: bool = False,
populate_sql_logic_in_descriptions: bool = False,
looker_client: Optional[LookerAPI] = None,
view_to_explore_map: Optional[Dict[str, str]] = None,
) -> Optional["LookerView"]:
view_name = view_context.name()
@ -160,6 +162,8 @@ class LookerView:
config=config,
ctx=ctx,
reporter=reporter,
looker_client=looker_client,
view_to_explore_map=view_to_explore_map,
)
field_type_vs_raw_fields = OrderedDict(
@ -705,6 +709,11 @@ class LookMLSource(StatefulIngestionSourceBase):
# Value: Tuple(model file name, connection name)
view_connection_map: Dict[str, Tuple[str, str]] = {}
# Map of view name to explore name for API-based view lineage
# A view can be referenced by multiple explores; we only need one of them to use the Looker Query API
# Key: view_name, Value: explore_name
view_to_explore_map: Dict[str, str] = {}
# The ** means "this directory and all subdirectories", and hence should
# include all the files we want.
model_files = sorted(
@ -759,37 +768,37 @@ class LookMLSource(StatefulIngestionSourceBase):
)
)
if self.source_config.emit_reachable_views_only:
model_explores_map = {d["name"]: d for d in model.explores}
for explore_dict in model.explores:
try:
if LookerRefinementResolver.is_refinement(explore_dict["name"]):
continue
model_explores_map = {d["name"]: d for d in model.explores}
for explore_dict in model.explores:
try:
if LookerRefinementResolver.is_refinement(explore_dict["name"]):
continue
explore_dict = (
looker_refinement_resolver.apply_explore_refinement(
explore_dict
)
)
explore: LookerExplore = LookerExplore.from_dict(
model_name,
explore_dict,
model.resolved_includes,
viewfile_loader,
self.reporter,
model_explores_map,
)
if explore.upstream_views:
for view_name in explore.upstream_views:
explore_dict = looker_refinement_resolver.apply_explore_refinement(
explore_dict
)
explore: LookerExplore = LookerExplore.from_dict(
model_name,
explore_dict,
model.resolved_includes,
viewfile_loader,
self.reporter,
model_explores_map,
)
if explore.upstream_views:
for view_name in explore.upstream_views:
if self.source_config.emit_reachable_views_only:
explore_reachable_views.add(view_name.include)
except Exception as e:
self.reporter.report_warning(
title="Failed to process explores",
message="Failed to process explore dictionary.",
context=f"Explore Details: {explore_dict}",
exc=e,
)
logger.debug("Failed to process explore", exc_info=e)
# Build view to explore mapping for API-based view lineage
view_to_explore_map[view_name.include] = explore.name
except Exception as e:
self.reporter.report_warning(
title="Failed to process explores",
message="Failed to process explore dictionary.",
context=f"Explore Details: {explore_dict}",
exc=e,
)
logger.debug("Failed to process explore", exc_info=e)
processed_view_files = processed_view_map.setdefault(
model.connection, set()
@ -878,6 +887,10 @@ class LookMLSource(StatefulIngestionSourceBase):
populate_sql_logic_in_descriptions=self.source_config.populate_sql_logic_for_missing_descriptions,
config=self.source_config,
ctx=self.ctx,
looker_client=self.looker_client,
view_to_explore_map=view_to_explore_map
if view_to_explore_map
else None,
)
except Exception as e:
self.reporter.report_warning(

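To make the new map concrete: for the sample model added later in this PR (where the purchases, user_metrics, and customer_analysis explores all reach the three views), each reachable view ends up mapped to one of the explores that references it. The sketch below is illustrative only; which explore is recorded per view depends on processing order, and any explore that reaches the view works.

view_to_explore_map = {
    "users": "customer_analysis",
    "purchases": "customer_analysis",
    "user_metrics": "customer_analysis",
}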
View File

@ -1,18 +1,33 @@
import logging
import re
from abc import ABC, abstractmethod
from datetime import datetime
from functools import lru_cache
from typing import Dict, List, Optional
from looker_sdk.sdk.api40.models import (
WriteQuery,
)
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.looker.looker_common import (
LookerExplore,
LookerViewId,
ViewField,
ViewFieldDimensionGroupType,
ViewFieldType,
)
from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition
from datahub.ingestion.source.looker.looker_constant import (
NAME,
VIEW_FIELD_INTERVALS_ATTRIBUTE,
VIEW_FIELD_TIMEFRAMES_ATTRIBUTE,
VIEW_FIELD_TYPE_ATTRIBUTE,
)
from datahub.ingestion.source.looker.looker_lib_wrapper import (
LookerAPI,
)
from datahub.ingestion.source.looker.looker_view_id_cache import LookerViewIdCache
from datahub.ingestion.source.looker.lookml_concept_context import (
LookerFieldContext,
@ -20,7 +35,6 @@ from datahub.ingestion.source.looker.lookml_concept_context import (
)
from datahub.ingestion.source.looker.lookml_config import (
DERIVED_VIEW_SUFFIX,
NAME,
LookMLSourceConfig,
LookMLSourceReport,
)
@ -280,6 +294,447 @@ class AbstractViewUpstream(ABC):
return upstream_column_refs
class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream):
"""
Implements Looker view upstream lineage extraction using the Looker Query API.
This class leverages the Looker API to generate the fully resolved SQL for a Looker view by constructing a WriteQuery
that includes all dimensions, dimension groups and measures. The SQL is then parsed to extract column-level lineage.
The Looker client is required for this class, as it is used to execute the WriteQuery and retrieve the SQL.
Other view upstream implementations use string parsing to extract lineage information from the SQL, which does not cover all the edge cases.
For the limitations of string-based lineage extraction, see: https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions
Key Features:
- Requires a Looker client (`looker_client`) to execute queries and retrieve SQL for the view.
- Requires a `view_to_explore_map` to map view names to their corresponding explore names.
- Field name translation is handled: Looker API field names are constructed as `<view_name>.<field_name>`, and helper
methods are provided to convert between Looker API field names and raw field names.
- SQL parsing is cached for efficiency, and the class is designed to gracefully fall back if the Looker Query API fails.
- All lineage extraction is based on the SQL returned by the Looker API, ensuring accurate and up-to-date lineage.
Why view_to_explore_map is required:
The Looker Query API expects the explore name (not the view name) as the "view" parameter in the WriteQuery.
In Looker, a view can be referenced by multiple explores; the API only needs one of those explores to access the view's fields.
Example WriteQuery request (see `_execute_query` for details):
{
"model": "test_model",
"view": "users_explore", # This is the explore name, not the view name
"fields": [
"users.email", "users.lifetime_purchase_count"
],
"limit": "1",
"cache": true
}
The SQL response is then parsed to extract upstream tables and column-level lineage.
For further details, see the method-level docstrings, especially:
- `__get_spr`: SQL parsing and lineage extraction workflow
- `_get_sql_write_query`: WriteQuery construction and field enumeration
- `_execute_query`: Looker API invocation and SQL retrieval (this only generates the SQL; it does not execute the query)
- Field name translation: `_get_looker_api_field_name` and `_get_field_name_from_looker_api_field_name`
Note: This class raises exceptions if SQL parsing or API calls fail; the factory (`create_view_upstream`) then falls back to
other implementations, such as custom regex-based parsing.
"""
def __init__(
self,
view_context: LookerViewContext,
looker_view_id_cache: LookerViewIdCache,
config: LookMLSourceConfig,
reporter: LookMLSourceReport,
ctx: PipelineContext,
looker_client: LookerAPI,
view_to_explore_map: Dict[str, str],
):
super().__init__(view_context, looker_view_id_cache, config, reporter, ctx)
self.looker_client = looker_client
self.view_to_explore_map = view_to_explore_map
# Cache the SQL parsing results
# We use maxsize=1 because a new class instance is created for each view; see view_upstream.create_view_upstream
self._get_spr = lru_cache(maxsize=1)(self.__get_spr)
self._get_upstream_dataset_urn = lru_cache(maxsize=1)(
self.__get_upstream_dataset_urn
)
# Initialize the cache eagerly so that, if the Looker Query API fails,
# we can fall back to other implementations right away
self._get_spr()
def __get_spr(self) -> SqlParsingResult:
"""
Retrieves the SQL parsing result for the current Looker view by:
1. Building a WriteQuery for the view.
2. Executing the query via the Looker API to get the SQL.
3. Parsing the SQL to extract lineage information.
Returns:
SqlParsingResult: The parsed lineage result for this view (exceptions are raised on failure; see below).
Raises:
ValueError: If no SQL is found in the response.
ValueError: If no fields are found for the view.
ValueError: If explore name is not found for the view.
ValueError: If error in parsing SQL for upstream tables.
ValueError: If error in parsing SQL for column lineage.
"""
try:
# Build the WriteQuery for the current view.
sql_query: WriteQuery = self._get_sql_write_query()
# Execute the query to get the SQL representation from Looker.
sql_response = self._execute_query(sql_query)
# Parse the SQL to extract lineage information.
spr = create_lineage_sql_parsed_result(
query=sql_response,
default_schema=self.view_context.view_connection.default_schema,
default_db=self.view_context.view_connection.default_db,
platform=self.view_context.view_connection.platform,
platform_instance=self.view_context.view_connection.platform_instance,
env=self.view_context.view_connection.platform_env or self.config.env,
graph=self.ctx.graph,
)
# Check for errors encountered during table extraction.
table_error = spr.debug_info.table_error
if table_error is not None:
self.reporter.report_warning(
title="Table Level Lineage Extraction Failed",
message="Error in parsing derived sql",
context=f"View-name: {self.view_context.name()}",
exc=table_error,
)
raise ValueError(
f"Error in parsing SQL for upstream tables: {table_error}"
)
column_error = spr.debug_info.column_error
if column_error is not None:
self.reporter.report_warning(
title="Column Level Lineage Extraction Failed",
message="Error in parsing derived sql",
context=f"View-name: {self.view_context.name()}",
exc=column_error,
)
raise ValueError(
f"Error in parsing SQL for column lineage: {column_error}"
)
return spr
except Exception:
# Reraise the exception to allow higher-level handling.
raise
def _get_time_dim_group_field_name(self, dim_group: dict) -> str:
"""
Time dimension groups must be referenced by one of their individual timeframes, appended as a suffix.
Example:
dimension_group: created {
type: time
timeframes: [date, week, month]
sql: ${TABLE}.created_at ;;
}
Used as: ${view_name.created_date}
created -> created_date, created_week, created_month
# Ref: https://cloud.google.com/looker/docs/reference/param-field-dimension-group#dimension_groups_must_be_referenced_by_their_individual_dimensions
"""
dim_group_name = dim_group.get(NAME)
timeframes = dim_group.get(VIEW_FIELD_TIMEFRAMES_ATTRIBUTE)
# If timeframes is not specified (rare case), the dimension group includes all possible timeframes;
# we default to "raw".
suffix = timeframes[0] if timeframes else "raw"
return f"{dim_group_name}_{suffix}"
def _get_duration_dim_group_field_name(self, dim_group: dict) -> str:
"""
Duration dimension groups must be referenced with the pluralized interval value as a prefix.
Example:
dimension_group: since_event {
type: duration
intervals: [hour, day, week, month, quarter, year]
sql_start: ${faa_event_date_raw} ;;
sql_end: CURRENT_TIMESTAMP();;
}
Used as: ${view_name.hours_since_event}
since_event -> hours_since_event, days_since_event, weeks_since_event, months_since_event, quarters_since_event, years_since_event
# Ref: https://cloud.google.com/looker/docs/reference/param-field-dimension-group#referencing_intervals_from_another_lookml_field
"""
dim_group_name = dim_group.get(NAME)
intervals = dim_group.get(VIEW_FIELD_INTERVALS_ATTRIBUTE)
# If intervals is not specified (rare case), the dimension group includes all possible intervals;
# we default to "day" -> "days".
prefix = f"{intervals[0]}s" if intervals else "days"
return f"{prefix}_{dim_group_name}"
def _get_sql_write_query(self) -> WriteQuery:
"""
Constructs a WriteQuery object to obtain the SQL representation of the current Looker view.
All of the view's fields are listed in the query so that the generated SQL fully resolves every dimension, dimension group, and measure.
The method uses the view_to_explore_map to determine the correct explore name to use in the WriteQuery.
This is crucial because the Looker Query API expects the explore name (not the view name) as the "view" parameter.
Ref: https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions
Returns:
WriteQuery: The constructed WriteQuery for the view.
Raises:
ValueError: If the explore name is not found in the view_to_explore_map for the current view.
ValueError: If no fields are found for the view.
"""
# Collect all dimension and measure fields for the view.
view_fields: List[str] = []
# Add dimension fields in the format: <view_name>.<dimension_name> or <view_name>.<measure_name>
for field in self.view_context.dimensions() + self.view_context.measures():
field_name = field.get(NAME)
assert field_name # Happy linter
view_fields.append(self._get_looker_api_field_name(field_name))
for dim_group in self.view_context.dimension_groups():
dim_group_type: ViewFieldDimensionGroupType = ViewFieldDimensionGroupType(
dim_group.get(VIEW_FIELD_TYPE_ATTRIBUTE)
)
if dim_group_type == ViewFieldDimensionGroupType.TIME:
view_fields.append(
self._get_looker_api_field_name(
self._get_time_dim_group_field_name(dim_group)
)
)
elif dim_group_type == ViewFieldDimensionGroupType.DURATION:
view_fields.append(
self._get_looker_api_field_name(
self._get_duration_dim_group_field_name(dim_group)
)
)
# Look up the explore name for this view.
# explore_name is always present in view_to_explore_map because of the check in view_upstream.create_view_upstream
explore_name = self.view_to_explore_map.get(self.view_context.name())
assert explore_name # Happy linter
if not view_fields:
raise ValueError(
f"No fields found for view '{self.view_context.name()}'. Cannot proceed with Looker API for view lineage."
)
# Construct and return the WriteQuery object.
# The 'limit' is set to "1" as the query is only used to obtain SQL, not to fetch data.
return WriteQuery(
model=self.looker_view_id_cache.model_name,
view=explore_name,
fields=view_fields,
filters={},
limit="1",
)
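For example, for the sample "users" view added later in this PR (reachable through the "purchases" explore, among others), the constructed WriteQuery would look roughly like the sketch below. The model name is a placeholder, since it comes from the model file name, and the explore chosen depends on the view_to_explore_map contents.

from looker_sdk.sdk.api40.models import WriteQuery

query = WriteQuery(
    model="lkml_col_lineage_sample",  # placeholder model name
    view="purchases",                 # explore name, not the view name
    fields=[
        "users.email",
        "users.pk",
        "users.user_purchase_status",
        "users.lifetime_purchase_count",
        "users.lifetime_total_purchase_amount",
        "users.count",
        "users.created_raw",    # time dimension group -> first timeframe ("raw")
        "users.updated_raw",
        "users.days_user_age",  # duration dimension group -> pluralized first interval
    ],
    filters={},
    limit="1",
)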
def _execute_query(self, query: WriteQuery) -> str:
"""
Generates the SQL for a Looker query via the Looker API and returns the SQL string (the query itself is not executed).
Ref: https://cloud.google.com/looker/docs/reference/looker-api/latest/methods/Query/run_inline_query
Example Request:
WriteQuery:
{
"model": "test_model",
"view": "users",
"fields": [
"users.email", "users.lifetime_purchase_count"
],
"limit": "1",
"cache": true
}
Response:
"
SELECT
users."EMAIL" AS "users.email",
COUNT(DISTINCT ( purchases."PK" ) ) AS "users.lifetime_purchase_count"
FROM "ECOMMERCE"."USERS" AS users
LEFT JOIN "ECOMMERCE"."PURCHASES" AS purchases ON (users."PK") = (purchases."USER_FK")
GROUP BY
1
ORDER BY
2 DESC
FETCH NEXT 1 ROWS ONLY
"
Args:
query (WriteQuery): The Looker WriteQuery object to execute.
Returns:
str: The SQL string returned by the Looker API.
Raises:
ValueError: If the Looker API returns an empty response.
"""
# Record the start time for latency measurement.
start_time = datetime.now()
# Execute the query using the Looker client.
sql_response = self.looker_client.generate_sql_query(
write_query=query, use_cache=self.config.use_api_cache_for_view_lineage
)
# Record the end time after query execution.
end_time = datetime.now()
# Attempt to get the LookerViewId for reporting.
looker_view_id: Optional[LookerViewId] = (
self.looker_view_id_cache.get_looker_view_id(
view_name=self.view_context.name(),
base_folder_path=self.view_context.base_folder_path,
)
)
# Report the query API latency if the view ID is available.
if looker_view_id is not None:
self.reporter.report_looker_query_api_latency(
looker_view_id.get_urn(self.config),
end_time - start_time,
)
# Validate the response structure.
if not sql_response:
raise ValueError(
f"No SQL found in response for view '{self.view_context.name()}'. Response: {sql_response}"
)
# Extract the SQL string from the response.
return sql_response
def __get_upstream_dataset_urn(self) -> List[Urn]:
"""
Extract upstream dataset URNs by parsing the SQL for the current view.
Returns:
List[Urn]: List of upstream dataset URNs, or an empty list if parsing fails.
"""
# Attempt to get the SQL parsing result for the current view.
spr: SqlParsingResult = self._get_spr()
# Remove any 'hive.' prefix from upstream table URNs.
upstream_dataset_urns: List[str] = [
_drop_hive_dot(urn) for urn in spr.in_tables
]
# Fix any derived view references present in the URNs.
upstream_dataset_urns = fix_derived_view_urn(
urns=upstream_dataset_urns,
looker_view_id_cache=self.looker_view_id_cache,
base_folder_path=self.view_context.base_folder_path,
config=self.config,
)
return upstream_dataset_urns
def _get_looker_api_field_name(self, field_name: str) -> str:
"""
Translate a raw field name to the Looker API field name
Example:
pk -> purchases.pk
"""
return f"{self.view_context.name()}.{field_name}"
def _get_field_name_from_looker_api_field_name(
self, looker_api_field_name: str
) -> str:
"""
Translate a Looker API field name back to the raw field name
Example:
purchases.pk -> pk
"""
# Remove the view name at the start and the dot from the looker_api_field_name, but only if it matches the current view name
prefix = f"{self.view_context.name()}."
if looker_api_field_name.startswith(prefix):
return looker_api_field_name[len(prefix) :]
else:
# Don't throw an error, just return the original field name
return looker_api_field_name
def get_upstream_dataset_urn(self) -> List[Urn]:
"""Get upstream dataset URNs"""
return self._get_upstream_dataset_urn()
def get_upstream_column_ref(
self, field_context: LookerFieldContext
) -> List[ColumnRef]:
"""Return upstream column references for a given field."""
spr: SqlParsingResult = self._get_spr()
if not spr.column_lineage:
return []
field_type: Optional[ViewFieldDimensionGroupType] = None
field_name = field_context.name()
try:
# Try if field is a dimension group
field_type = ViewFieldDimensionGroupType(
field_context.raw_field.get(VIEW_FIELD_TYPE_ATTRIBUTE)
)
if field_type == ViewFieldDimensionGroupType.TIME:
field_name = self._get_time_dim_group_field_name(
field_context.raw_field
)
elif field_type == ViewFieldDimensionGroupType.DURATION:
field_name = self._get_duration_dim_group_field_name(
field_context.raw_field
)
except Exception:
# Not a dimension group, no modification needed
logger.debug(
f"view-name={self.view_context.name()}, field-name={field_name}, field-type={field_context.raw_field.get(VIEW_FIELD_TYPE_ATTRIBUTE)}"
)
field_api_name = self._get_looker_api_field_name(field_name).lower()
upstream_refs: List[ColumnRef] = []
for lineage in spr.column_lineage:
if lineage.downstream.column.lower() == field_api_name:
for upstream in lineage.upstreams:
upstream_refs.append(
ColumnRef(table=upstream.table, column=upstream.column)
)
return _drop_hive_dot_from_upstream(upstream_refs)
def create_fields(self) -> List[ViewField]:
"""Create ViewField objects from SQL parsing result."""
spr: SqlParsingResult = self._get_spr()
if not spr.column_lineage:
return []
fields: List[ViewField] = []
for lineage in spr.column_lineage:
fields.append(
ViewField(
name=self._get_field_name_from_looker_api_field_name(
lineage.downstream.column
),
label="",
type=lineage.downstream.native_column_type or "unknown",
description="",
field_type=ViewFieldType.UNKNOWN,
upstream_fields=_drop_hive_dot_from_upstream(lineage.upstreams),
)
)
return fields
class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC):
"""
Handle the case where upstream dataset is defined in derived_table.sql
@ -674,7 +1129,45 @@ def create_view_upstream(
config: LookMLSourceConfig,
ctx: PipelineContext,
reporter: LookMLSourceReport,
looker_client: Optional["LookerAPI"] = None,
view_to_explore_map: Optional[Dict[str, str]] = None,
) -> AbstractViewUpstream:
# A Looker client is required for LookerQueryAPIBasedViewUpstream; this is also enforced by config.use_api_for_view_lineage
# view_to_explore_map is required to build the Looker Query API arguments
# Only use the API path if the view exists in view_to_explore_map, because views that are not reachable from an explore cannot be queried
if (
config.use_api_for_view_lineage
and looker_client
and view_to_explore_map
and view_context.name() in view_to_explore_map
):
try:
return LookerQueryAPIBasedViewUpstream(
view_context=view_context,
config=config,
reporter=reporter,
ctx=ctx,
looker_view_id_cache=looker_view_id_cache,
looker_client=looker_client,
view_to_explore_map=view_to_explore_map,
)
except Exception as e:
# Fall back to custom regex-based parsing (best-effort approach)
reporter.report_warning(
title="Looker Query API based View Upstream Failed",
message="Error in getting upstream lineage for view using Looker Query API",
context=f"View-name: {view_context.name()}",
exc=e,
)
else:
logger.debug(
f"Skipping Looker Query API for view: {view_context.name()} because one or more conditions are not met: "
f"use_api_for_view_lineage={config.use_api_for_view_lineage}, "
f"looker_client={'set' if looker_client else 'not set'}, "
f"view_to_explore_map={'set' if view_to_explore_map else 'not set'}, "
f"view_in_view_to_explore_map={view_context.name() in view_to_explore_map if view_to_explore_map else False}"
)
if view_context.is_regular_case():
return RegularViewUpstream(
view_context=view_context,

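From the caller's side, the factory is now invoked with the optional client and map; if either is missing, or the view is not reachable from any explore, the pre-existing implementations are used. A rough sketch follows (construction of the surrounding objects is elided and the variable names are illustrative):

upstream = create_view_upstream(
    view_context=view_context,                 # LookerViewContext for the view
    looker_view_id_cache=looker_view_id_cache,
    config=config,
    ctx=ctx,
    reporter=reporter,
    looker_client=looker_client,               # None when no API credentials are configured
    view_to_explore_map=view_to_explore_map,   # None when not using the API path
)
upstream_urns = upstream.get_upstream_dataset_urn()
column_refs = upstream.get_upstream_column_ref(field_context)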
View File

@ -590,7 +590,10 @@ def setup_mock_all_user(mocked_client):
def side_effect_query_inline(
result_format: str, body: WriteQuery, transport_options: Optional[TransportOptions]
result_format: str,
body: WriteQuery,
transport_options: Optional[TransportOptions],
cache: Optional[bool] = None,
) -> str:
query_type: looker_usage.QueryId
if result_format == "sql":

View File

@ -0,0 +1,83 @@
# Define the database connection to be used for this model.
connection: "long-tail-companions-snowflake"
# include all the views
include: "/views/**/*.view.lkml"
# Datagroups define a caching policy for an Explore. To learn more,
# use the Quick Help panel on the right to see documentation.
datagroup: dev_project_default_datagroup {
# sql_trigger: SELECT MAX(id) FROM etl_log;;
max_cache_age: "1 hour"
}
persist_with: dev_project_default_datagroup
explore: purchases {
join: users {
type: left_outer
sql_on: ${purchases.user_fk} = ${users.pk} ;;
relationship: many_to_one
}
join: user_metrics{
type: left_outer
sql_on:${user_metrics.user_id} = ${users.pk} ;;
relationship: many_to_one
}
}
# explore: users{
# join: purchases {
# type: left_outer
# sql_on: ${users.pk} = ${purchases.user_fk} ;;
# relationship: one_to_many
# }
# join: user_metrics{
# type: left_outer
# sql_on:${user_metrics.user_id} = ${users.pk} ;;
# relationship: many_to_one
# }
# }
explore: user_metrics {
description: "Analyze customer segments, lifetime value, and purchasing patterns"
join: users {
type: inner
sql_on: ${user_metrics.user_id} = ${users.pk} ;;
relationship: many_to_one
}
join: purchases{
type: left_outer
sql_on: ${user_metrics.user_id} = ${purchases.user_fk} ;;
relationship: one_to_many
}
}
explore: customer_analysis{
from: users
description: "Customer analysis and demographics"
join: purchases {
type: left_outer
sql_on: ${customer_analysis.pk} = ${purchases.user_fk} ;;
relationship: one_to_many
}
join: user_metrics{
type: left_outer
sql_on:${user_metrics.user_id} = ${customer_analysis.pk} ;;
relationship: one_to_one
}
join: users {
type: inner
sql_on: ${customer_analysis.pk} = ${users.pk} ;;
relationship: one_to_one
}
}

View File

@ -0,0 +1,93 @@
# The name of this view in Looker is "Purchases"
view: purchases {
# The sql_table_name parameter indicates the underlying database table
# to be used for all fields in this view.
sql_table_name: "ECOMMERCE"."PURCHASES" ;;
# No primary key is defined for this view. In order to join this view in an Explore,
# define primary_key: yes on a dimension that has no repeated values.
# Dates and timestamps can be represented in Looker using a dimension group of type: time.
# Looker converts dates and timestamps to the specified timeframes within the dimension group.
dimension_group: created {
type: time
timeframes: [raw, time, date, week, month, quarter, year]
sql: ${TABLE}."CREATED_AT" ;;
}
# Here's what a typical dimension looks like in LookML.
# A dimension is a groupable field that can be used to filter query results.
# This dimension will be called "Pk" in Explore.
dimension: pk {
primary_key: yes
type: number
sql: ${TABLE}."PK" ;;
}
dimension: purchase_amount {
type: number
sql: ${TABLE}."PURCHASE_AMOUNT" ;;
}
dimension: status {
type: string
sql: ${TABLE}."STATUS" ;;
}
dimension: tax_amount {
type: number
sql: ${TABLE}."TAX_AMOUNT" ;;
}
dimension: total_amount {
type: number
sql: ${TABLE}."TOTAL_AMOUNT" ;;
}
dimension_group: updated {
type: time
timeframes: [raw, time, date, week, month, quarter, year]
sql: ${TABLE}."UPDATED_AT" ;;
}
dimension: user_fk {
type: number
sql: ${TABLE}."USER_FK" ;;
}
# Inter View Dimension References
dimension: is_expensive_purchase {
type: yesno
sql: ${total_amount} > 100 ;;
}
# Inter View Nested Dimension References
measure: num_of_expensive_purchases {
type: count
drill_fields: [is_expensive_purchase]
}
# Intra View Dimension Reference
dimension: user_email{
type: string
sql:${users.email} ;;
}
measure: average_purchase_value{
type: average
sql: ${total_amount} ;;
value_format_name: usd
}
measure: count {
type: count
}
dimension_group: purchase_age {
type: duration
sql_start: ${TABLE}."CREATED_AT" ;;
sql_end: CURRENT_TIMESTAMP ;;
intervals: [day, week, month, quarter, year]
}
}

View File

@ -0,0 +1,43 @@
view: user_metrics {
derived_table: {
sql: SELECT
user_fk as user_id,
COUNT(DISTINCT pk) as purchase_count,
SUM(total_amount) as total_spent
FROM ${purchases.SQL_TABLE_NAME}
GROUP BY user_id ;;
}
dimension: user_id {
type: number
sql: ${TABLE}.user_id ;;
primary_key: yes
}
dimension: purchase_count {
type: number
sql: ${TABLE}.purchase_count ;;
}
dimension: total_spent {
type: number
sql: ${TABLE}.total_spent ;;
}
# Cross-view dimension with conditional logic
dimension: customer_segment {
type: string
sql: CASE
WHEN ${total_spent} > 1000 THEN 'High Value'
WHEN ${total_spent} > 500 THEN 'Medium Value'
ELSE 'Low Value'
END ;;
}
# Cross-view measure with filtering
measure: high_value_customer_count {
type: count_distinct
sql: CASE WHEN ${total_spent} > 1000 THEN ${users.pk} END ;;
description: "Count of customers who spent over $1000"
}
}

View File

@ -0,0 +1,73 @@
# The name of this view in Looker is "Users"
view: users {
# The sql_table_name parameter indicates the underlying database table
# to be used for all fields in this view.
sql_table_name: "ECOMMERCE"."USERS" ;;
# No primary key is defined for this view. In order to join this view in an Explore,
# define primary_key: yes on a dimension that has no repeated values.
# Dates and timestamps can be represented in Looker using a dimension group of type: time.
# Looker converts dates and timestamps to the specified timeframes within the dimension group.
dimension_group: created {
type: time
timeframes: [raw, time, date, week, month, quarter, year]
sql: ${TABLE}."CREATED_AT" ;;
}
# Here's what a typical dimension looks like in LookML.
# A dimension is a groupable field that can be used to filter query results.
# This dimension will be called "Email" in Explore.
dimension: email {
type: string
sql: ${TABLE}."EMAIL" ;;
}
dimension: pk {
primary_key: yes
type: number
sql: ${TABLE}."PK" ;;
}
dimension_group: updated {
type: time
timeframes: [raw, time, date, week, month, quarter, year]
sql: ${TABLE}."UPDATED_AT" ;;
}
measure: lifetime_purchase_count{
type: count_distinct
sql: ${purchases.pk} ;;
description: "Total lifetime purchases count by user"
}
measure: lifetime_total_purchase_amount{
type: sum
sql: ${purchases.total_amount};;
value_format_name: usd
description: "Total lifetime revenue from purchases by user"
}
dimension: user_purchase_status{
type: string
sql:
CASE
WHEN ${user_metrics.purchase_count} <= 1 THEN 'First Purchase'
WHEN ${user_metrics.purchase_count} <= 3 THEN 'Early Customer'
WHEN ${user_metrics.purchase_count} <= 10 THEN 'Regular Customer'
ELSE 'Loyal Customer'
END ;;
}
dimension_group: user_age {
type: duration
sql_start: ${TABLE}."CREATED_AT" ;;
sql_end: CURRENT_TIMESTAMP ;;
intervals: [day, week, month, quarter, year]
}
measure: count {
type: count
}
}

View File

@ -1285,3 +1285,181 @@ def test_unreachable_views(pytestconfig):
"The Looker view file was skipped because it may not be referenced by any models."
in [failure.message for failure in source.get_report().warnings]
)
@freeze_time(FROZEN_TIME)
def test_col_lineage_looker_api_based(pytestconfig, tmp_path):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
golden_path = test_resources_dir / "lkml_col_lineage_looker_api_based_golden.json"
mce_out_file = "lkml_col_lineage_looker_api_based.json"
recipe = {
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": f"{test_resources_dir}/lkml_col_lineage_sample",
"connection_to_platform_map": {"my_connection": "postgres"},
"parse_table_names_from_sql": True,
"tag_measures_and_dimensions": False,
"project_name": "lkml_col_lineage_sample",
"use_api_for_view_lineage": True,
"api": {
"client_id": "fake_client_id",
"client_secret": "fake_secret",
"base_url": "fake_account.looker.com",
},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path / mce_out_file}",
},
},
}
# Mock SQL responses based on the dump file
mock_sql_responses = {
# For user_metrics view (fields starting with user_metrics.)
"user_metrics": """WITH user_metrics AS (SELECT
user_fk as user_id,
COUNT(DISTINCT pk) as purchase_count,
SUM(total_amount) as total_spent
FROM "ECOMMERCE"."PURCHASES"
GROUP BY user_id )
SELECT
user_metrics.user_id AS "user_metrics.user_id",
user_metrics.purchase_count AS "user_metrics.purchase_count",
user_metrics.total_spent AS "user_metrics.total_spent",
CASE
WHEN user_metrics.total_spent > 1000 THEN 'High Value'
WHEN user_metrics.total_spent > 500 THEN 'Medium Value'
ELSE 'Low Value'
END AS "user_metrics.customer_segment",
COUNT(DISTINCT CASE WHEN user_metrics.total_spent > 1000 THEN ( users."PK" ) END ) AS "user_metrics.high_value_customer_count"
FROM "ECOMMERCE"."USERS" AS customer_analysis
LEFT JOIN user_metrics ON user_metrics.user_id = (customer_analysis."PK")
INNER JOIN "ECOMMERCE"."USERS" AS users ON (customer_analysis."PK") = (users."PK")
GROUP BY
1,
2,
3,
4
ORDER BY
5 DESC
FETCH NEXT 1 ROWS ONLY""",
# For users view (fields starting with users.)
"users": """WITH user_metrics AS (SELECT
user_fk as user_id,
COUNT(DISTINCT pk) as purchase_count,
SUM(total_amount) as total_spent
FROM "ECOMMERCE"."PURCHASES"
GROUP BY user_id )
SELECT
users."EMAIL" AS "users.email",
users."PK" AS "users.pk",
CASE
WHEN user_metrics.purchase_count <= 1 THEN 'First Purchase'
WHEN user_metrics.purchase_count <= 3 THEN 'Early Customer'
WHEN user_metrics.purchase_count <= 10 THEN 'Regular Customer'
ELSE 'Loyal Customer'
END AS "users.user_purchase_status",
users."CREATED_AT" AS "users.created_raw",
users."UPDATED_AT" AS "users.updated_raw",
(TIMESTAMPDIFF(DAY, CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ)), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) + CASE WHEN TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) = TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ))) THEN 0 WHEN TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) < TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ))) THEN CASE WHEN CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ)) < CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ)) THEN -1 ELSE 0 END ELSE CASE WHEN CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ)) > CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ)) THEN 1 ELSE 0 END END) AS "users.days_user_age",
COUNT(DISTINCT ( purchases."PK" ) ) AS "users.lifetime_purchase_count",
COALESCE(CAST( ( SUM(DISTINCT (CAST(FLOOR(COALESCE( ( purchases."TOTAL_AMOUNT" ) ,0)*(1000000*1.0)) AS DECIMAL(38,0))) + (TO_NUMBER(MD5( users."PK" ), 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') % 1.0e27)::NUMERIC(38, 0) ) - SUM(DISTINCT (TO_NUMBER(MD5( users."PK" ), 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') % 1.0e27)::NUMERIC(38, 0)) ) AS DOUBLE PRECISION) / CAST((1000000*1.0) AS DOUBLE PRECISION), 0) AS "users.lifetime_total_purchase_amount",
COUNT(DISTINCT users."PK" ) AS "users.count"
FROM "ECOMMERCE"."USERS" AS customer_analysis
LEFT JOIN "ECOMMERCE"."PURCHASES" AS purchases ON (customer_analysis."PK") = (purchases."USER_FK")
LEFT JOIN user_metrics ON user_metrics.user_id = (customer_analysis."PK")
INNER JOIN "ECOMMERCE"."USERS" AS users ON (customer_analysis."PK") = (users."PK")
GROUP BY
1,
2,
3,
4,
5,
6
ORDER BY
7 DESC
FETCH NEXT 1 ROWS ONLY""",
# For purchases view (fields starting with purchases.)
"purchases": """SELECT
purchases."PK" AS "purchases.pk",
purchases."PURCHASE_AMOUNT" AS "purchases.purchase_amount",
purchases."STATUS" AS "purchases.status",
purchases."TAX_AMOUNT" AS "purchases.tax_amount",
purchases."TOTAL_AMOUNT" AS "purchases.total_amount",
purchases."USER_FK" AS "purchases.user_fk",
(CASE WHEN (purchases."TOTAL_AMOUNT") > 100 THEN 'Yes' ELSE 'No' END) AS "purchases.is_expensive_purchase",
(users."EMAIL") AS "purchases.user_email",
purchases."CREATED_AT" AS "purchases.created_raw",
purchases."UPDATED_AT" AS "purchases.updated_raw",
(TIMESTAMPDIFF(DAY, CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ)), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) + CASE WHEN TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) = TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ))) THEN 0 WHEN TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) < TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ))) THEN CASE WHEN CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ)) < CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ)) THEN -1 ELSE 0 END ELSE CASE WHEN CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ)) > CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ)) THEN 1 ELSE 0 END END) AS "purchases.days_purchase_age",
COUNT(purchases."PK" ) AS "purchases.num_of_expensive_purchases",
AVG(( purchases."TOTAL_AMOUNT" ) ) AS "purchases.average_purchase_value",
COUNT(purchases."PK" ) AS "purchases.count"
FROM "ECOMMERCE"."USERS" AS customer_analysis
LEFT JOIN "ECOMMERCE"."PURCHASES" AS purchases ON (customer_analysis."PK") = (purchases."USER_FK")
INNER JOIN "ECOMMERCE"."USERS" AS users ON (customer_analysis."PK") = (users."PK")
GROUP BY
1,
2,
3,
4,
5,
6,
7,
8,
9,
10,
11
ORDER BY
12 DESC
FETCH NEXT 1 ROWS ONLY""",
}
def mock_run_inline_query(
body, result_format=None, transport_options=None, cache=None
):
# Determine which view is being queried based on the fields
write_query = body
if write_query.fields and any(
field.startswith("user_metrics.") for field in write_query.fields
):
return mock_sql_responses["user_metrics"]
elif write_query.fields and any(
field.startswith("users.") for field in write_query.fields
):
return mock_sql_responses["users"]
elif write_query.fields and any(
field.startswith("purchases.") for field in write_query.fields
):
return mock_sql_responses["purchases"]
else:
# Default fallback
return mock_sql_responses["user_metrics"]
mock_connection = DBConnection(
dialect_name="postgres",
database="my_database",
)
mock_model = mock.MagicMock(project_name="lkml_col_lineage_sample")
mocked_client = mock.MagicMock()
mocked_client.run_inline_query.side_effect = mock_run_inline_query
mocked_client.connection.return_value = mock_connection
mocked_client.lookml_model.return_value = mock_model
with mock.patch("looker_sdk.init40", return_value=mocked_client):
pipeline = Pipeline.create(recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)

View File

@ -0,0 +1,518 @@
from unittest.mock import MagicMock, patch
import pytest
from looker_sdk.sdk.api40.models import WriteQuery
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.looker.looker_common import (
LookerViewId,
ViewField,
ViewFieldType,
)
from datahub.ingestion.source.looker.looker_constant import (
NAME,
VIEW_FIELD_INTERVALS_ATTRIBUTE,
VIEW_FIELD_TIMEFRAMES_ATTRIBUTE,
VIEW_FIELD_TYPE_ATTRIBUTE,
)
from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI
from datahub.ingestion.source.looker.looker_view_id_cache import LookerViewIdCache
from datahub.ingestion.source.looker.lookml_concept_context import (
LookerFieldContext,
LookerViewContext,
)
from datahub.ingestion.source.looker.lookml_config import (
LookMLSourceConfig,
LookMLSourceReport,
)
from datahub.ingestion.source.looker.view_upstream import (
LookerQueryAPIBasedViewUpstream,
)
from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult
def create_mock_sql_parsing_result(
table_error=None, column_error=None, in_tables=None, column_lineage=None
):
"""Helper function to create a properly mocked SqlParsingResult."""
mock_spr = MagicMock(spec=SqlParsingResult)
mock_debug_info = MagicMock()
mock_debug_info.table_error = table_error
mock_debug_info.column_error = column_error
mock_spr.debug_info = mock_debug_info
mock_spr.in_tables = in_tables or []
mock_spr.column_lineage = column_lineage or []
return mock_spr
class TestLookMLAPIBasedViewUpstream:
"""Test suite for LookerQueryAPIBasedViewUpstream functionality."""
@pytest.fixture
def mock_view_context(self):
"""Create a mock LookerViewContext for testing."""
view_context = MagicMock(spec=LookerViewContext)
view_context.name.return_value = "test_view"
view_context.base_folder_path = "/test/path"
view_context.dimensions.return_value = [
{NAME: "user_id", "type": "string"},
{NAME: "email", "type": "string"},
]
view_context.measures.return_value = [
{NAME: "total_users", "type": "number"},
]
view_context.dimension_groups.return_value = []
# Mock view_connection
mock_connection = MagicMock()
mock_connection.default_schema = "public"
mock_connection.default_db = "test_db"
mock_connection.platform = "postgres"
mock_connection.platform_instance = None
mock_connection.platform_env = None
view_context.view_connection = mock_connection
return view_context
@pytest.fixture
def mock_looker_view_id_cache(self):
"""Create a mock LookerViewIdCache for testing."""
cache = MagicMock(spec=LookerViewIdCache)
cache.model_name = "test_model"
return cache
@pytest.fixture
def mock_config(self):
"""Create a mock LookMLSourceConfig for testing."""
config = MagicMock(spec=LookMLSourceConfig)
config.use_api_for_view_lineage = True
config.use_api_cache_for_view_lineage = False
config.env = "PROD"
return config
@pytest.fixture
def mock_reporter(self):
"""Create a mock LookMLSourceReport for testing."""
return MagicMock(spec=LookMLSourceReport)
@pytest.fixture
def mock_ctx(self):
"""Create a mock PipelineContext for testing."""
ctx = MagicMock(spec=PipelineContext)
ctx.graph = MagicMock()
return ctx
@pytest.fixture
def mock_looker_client(self):
"""Create a mock LookerAPI client for testing."""
client = MagicMock(spec=LookerAPI)
return client
@pytest.fixture
def view_to_explore_map(self):
"""Create a view to explore mapping for testing."""
return {"test_view": "test_explore"}
@pytest.fixture
def upstream_instance(
self,
mock_view_context,
mock_looker_view_id_cache,
mock_config,
mock_reporter,
mock_ctx,
mock_looker_client,
view_to_explore_map,
):
"""Create a LookerQueryAPIBasedViewUpstream instance for testing."""
# Mock the API response to prevent initialization errors
mock_looker_client.generate_sql_query.return_value = [
{"sql": "SELECT test_view.user_id FROM test_table"}
]
# Mock the view ID cache
mock_view_id = MagicMock(spec=LookerViewId)
mock_view_id.get_urn.return_value = "urn:li:dataset:test"
mock_looker_view_id_cache.get_looker_view_id.return_value = mock_view_id
with patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
) as mock_create_lineage:
# Mock successful SQL parsing
mock_spr = create_mock_sql_parsing_result()
mock_create_lineage.return_value = mock_spr
return LookerQueryAPIBasedViewUpstream(
view_context=mock_view_context,
looker_view_id_cache=mock_looker_view_id_cache,
config=mock_config,
reporter=mock_reporter,
ctx=mock_ctx,
looker_client=mock_looker_client,
view_to_explore_map=view_to_explore_map,
)
def test_time_dimension_group_handling(self, upstream_instance):
"""Test that time dimension groups are handled correctly."""
dim_group = {
NAME: "created",
VIEW_FIELD_TYPE_ATTRIBUTE: "time",
VIEW_FIELD_TIMEFRAMES_ATTRIBUTE: ["date", "week", "month"],
}
result = upstream_instance._get_time_dim_group_field_name(dim_group)
assert result == "created_date"
def test_time_dimension_group_without_timeframes(self, upstream_instance):
"""Test time dimension group handling when timeframes are not specified."""
dim_group = {
NAME: "created",
VIEW_FIELD_TYPE_ATTRIBUTE: "time",
}
result = upstream_instance._get_time_dim_group_field_name(dim_group)
assert result == "created_raw"
def test_duration_dimension_group_handling(self, upstream_instance):
"""Test that duration dimension groups are handled correctly."""
dim_group = {
NAME: "since_event",
VIEW_FIELD_TYPE_ATTRIBUTE: "duration",
VIEW_FIELD_INTERVALS_ATTRIBUTE: ["hour", "day", "week"],
}
result = upstream_instance._get_duration_dim_group_field_name(dim_group)
assert result == "hours_since_event"
def test_duration_dimension_group_without_intervals(self, upstream_instance):
"""Test duration dimension group handling when intervals are not specified."""
dim_group = {
NAME: "since_event",
VIEW_FIELD_TYPE_ATTRIBUTE: "duration",
}
result = upstream_instance._get_duration_dim_group_field_name(dim_group)
assert result == "days_since_event"
def test_get_looker_api_field_name(self, upstream_instance):
"""Test field name translation to Looker API format."""
result = upstream_instance._get_looker_api_field_name("user_id")
assert result == "test_view.user_id"
def test_get_field_name_from_looker_api_field_name(self, upstream_instance):
"""Test field name translation from Looker API format."""
result = upstream_instance._get_field_name_from_looker_api_field_name(
"test_view.user_id"
)
assert result == "user_id"
def test_get_field_name_from_looker_api_field_name_mismatch(
self, upstream_instance
):
"""Test field name translation when view name doesn't match."""
result = upstream_instance._get_field_name_from_looker_api_field_name(
"other_view.user_id"
)
assert result == "other_view.user_id"
def test_get_sql_write_query_success(self, upstream_instance):
"""Test successful WriteQuery construction."""
query = upstream_instance._get_sql_write_query()
assert isinstance(query, WriteQuery)
assert query.model == "test_model"
assert query.view == "test_explore"
assert query.limit == "1"
assert query.fields is not None
assert "test_view.user_id" in query.fields
assert "test_view.email" in query.fields
assert "test_view.total_users" in query.fields
def test_get_sql_write_query_no_fields(self, upstream_instance, mock_view_context):
"""Test WriteQuery construction when no fields are found."""
mock_view_context.dimensions.return_value = []
mock_view_context.measures.return_value = []
mock_view_context.dimension_groups.return_value = []
with pytest.raises(ValueError, match="No fields found for view"):
upstream_instance._get_sql_write_query()
@patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
)
def test_execute_query_success(
self, mock_create_lineage, upstream_instance, mock_looker_client
):
"""Test successful query execution."""
# Mock the SQL response
mock_sql_response = "SELECT test_view.user_id FROM test_table"
mock_looker_client.generate_sql_query.return_value = mock_sql_response
# Mock the SQL parsing result
mock_spr = create_mock_sql_parsing_result(
in_tables=["urn:li:dataset:(urn:li:dataPlatform:postgres,test_table,PROD)"]
)
mock_create_lineage.return_value = mock_spr
result = upstream_instance._execute_query(MagicMock(spec=WriteQuery))
assert result == "SELECT test_view.user_id FROM test_table"
def test_execute_query_no_sql_response(self, upstream_instance, mock_looker_client):
"""Test query execution when no SQL is returned."""
mock_looker_client.generate_sql_query.return_value = []
with pytest.raises(ValueError, match="No SQL found in response"):
upstream_instance._execute_query(MagicMock(spec=WriteQuery))
def test_execute_query_invalid_response_format(
self, upstream_instance, mock_looker_client
):
"""Test query execution with invalid response format."""
mock_looker_client.generate_sql_query.return_value = None
with pytest.raises(ValueError, match="No SQL found in response"):
upstream_instance._execute_query(MagicMock(spec=WriteQuery))
@patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
)
def test_get_spr_table_error(
self, mock_create_lineage, upstream_instance, mock_looker_client
):
"""Test SQL parsing result when table extraction fails."""
# Clear the cache to force re-execution
upstream_instance._get_spr.cache_clear()
# Mock the SQL response
mock_sql_response = [{"sql": "SELECT * FROM test_table"}]
mock_looker_client.generate_sql_query.return_value = mock_sql_response
# Mock the SQL parsing result with table error
mock_spr = create_mock_sql_parsing_result(
table_error=Exception("Table parsing failed")
)
mock_create_lineage.return_value = mock_spr
with pytest.raises(
ValueError, match="Error in parsing SQL for upstream tables"
):
upstream_instance._get_spr()
@patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
)
def test_get_spr_column_error(
self, mock_create_lineage, upstream_instance, mock_looker_client
):
"""Test SQL parsing result when column extraction fails."""
# Clear the cache to force re-execution
upstream_instance._get_spr.cache_clear()
# Mock the SQL response
mock_sql_response = [{"sql": "SELECT * FROM test_table"}]
mock_looker_client.generate_sql_query.return_value = mock_sql_response
# Mock the SQL parsing result with column error
mock_spr = create_mock_sql_parsing_result(
column_error=Exception("Column parsing failed")
)
mock_create_lineage.return_value = mock_spr
with pytest.raises(ValueError, match="Error in parsing SQL for column lineage"):
upstream_instance._get_spr()
@patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
)
def test_get_upstream_dataset_urn(
self, mock_create_lineage, upstream_instance, mock_looker_client
):
"""Test upstream dataset URN extraction."""
# Clear all caches to force re-execution
upstream_instance._get_spr.cache_clear()
upstream_instance._get_upstream_dataset_urn.cache_clear()
# Mock the SQL response
mock_sql_response = [{"sql": "SELECT * FROM test_table"}]
mock_looker_client.generate_sql_query.return_value = mock_sql_response
# Mock the SQL parsing result
mock_spr = create_mock_sql_parsing_result(
in_tables=["urn:li:dataset:(urn:li:dataPlatform:postgres,test_table,PROD)"]
)
mock_create_lineage.return_value = mock_spr
result = upstream_instance.get_upstream_dataset_urn()
assert len(result) == 1
assert "test_table" in result[0]
@patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
)
def test_get_upstream_column_ref(
self, mock_create_lineage, upstream_instance, mock_looker_client
):
"""Test upstream column reference extraction."""
# Clear the cache to force re-execution
upstream_instance._get_spr.cache_clear()
# Mock the SQL response
mock_sql_response = [{"sql": "SELECT user_id FROM test_table"}]
mock_looker_client.generate_sql_query.return_value = mock_sql_response
# Mock the SQL parsing result with column lineage
mock_column_lineage = [
MagicMock(
downstream=MagicMock(column="test_view.user_id"),
upstreams=[MagicMock(table="test_table", column="user_id")],
)
]
mock_spr = create_mock_sql_parsing_result(column_lineage=mock_column_lineage)
mock_create_lineage.return_value = mock_spr
# Mock field context
field_context = MagicMock(spec=LookerFieldContext)
field_context.name.return_value = "user_id"
field_context.raw_field = {NAME: "user_id"}
result = upstream_instance.get_upstream_column_ref(field_context)
assert len(result) == 1
assert result[0].table == "test_table"
assert result[0].column == "user_id"
@patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
)
def test_get_upstream_column_ref_dimension_group(
self, mock_create_lineage, upstream_instance, mock_looker_client
):
"""Test upstream column reference extraction for dimension groups."""
# Clear the cache to force re-execution
upstream_instance._get_spr.cache_clear()
# Mock the SQL response
mock_sql_response = [{"sql": "SELECT created_date FROM test_table"}]
mock_looker_client.generate_sql_query.return_value = mock_sql_response
# Mock the SQL parsing result with column lineage
mock_column_lineage = [
MagicMock(
downstream=MagicMock(column="test_view.created_date"),
upstreams=[MagicMock(table="test_table", column="created_at")],
)
]
mock_spr = create_mock_sql_parsing_result(column_lineage=mock_column_lineage)
mock_create_lineage.return_value = mock_spr
# Mock field context for time dimension group
field_context = MagicMock(spec=LookerFieldContext)
field_context.name.return_value = "created"
field_context.raw_field = {
NAME: "created",
VIEW_FIELD_TYPE_ATTRIBUTE: "time",
VIEW_FIELD_TIMEFRAMES_ATTRIBUTE: ["date"],
}
result = upstream_instance.get_upstream_column_ref(field_context)
assert len(result) == 1
assert result[0].table == "test_table"
assert result[0].column == "created_at"
@patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
)
def test_create_fields(
self, mock_create_lineage, upstream_instance, mock_looker_client
):
"""Test ViewField creation from SQL parsing result."""
# Clear the cache to force re-execution
upstream_instance._get_spr.cache_clear()
# Mock the SQL response
mock_sql_response = [{"sql": "SELECT user_id FROM test_table"}]
mock_looker_client.generate_sql_query.return_value = mock_sql_response
# Mock the SQL parsing result with column lineage
mock_column_lineage = [
MagicMock(
downstream=MagicMock(
column="test_view.user_id", native_column_type="string"
),
upstreams=[MagicMock(table="test_table", column="user_id")],
)
]
mock_spr = create_mock_sql_parsing_result(column_lineage=mock_column_lineage)
mock_create_lineage.return_value = mock_spr
result = upstream_instance.create_fields()
assert len(result) == 1
assert isinstance(result[0], ViewField)
assert result[0].name == "user_id"
assert result[0].type == "string"
assert result[0].field_type == ViewFieldType.UNKNOWN
def test_create_fields_no_column_lineage(self, upstream_instance):
"""Test ViewField creation when no column lineage is available."""
# Mock the SQL parsing result without column lineage
mock_spr = MagicMock(spec=SqlParsingResult)
mock_spr.column_lineage = None
with patch.object(upstream_instance, "_get_spr", return_value=mock_spr):
result = upstream_instance.create_fields()
assert result == []
def test_api_failure_fallback(
self,
mock_view_context,
mock_looker_view_id_cache,
mock_config,
mock_reporter,
mock_ctx,
mock_looker_client,
view_to_explore_map,
):
"""Test that API failures are handled gracefully."""
# Mock the Looker client to raise an exception
mock_looker_client.generate_sql_query.side_effect = Exception("API call failed")
# The constructor itself is expected to raise here; the fallback to other
# implementations is handled by the factory function (create_view_upstream)
with pytest.raises(Exception, match="API call failed"):
LookerQueryAPIBasedViewUpstream(
view_context=mock_view_context,
looker_view_id_cache=mock_looker_view_id_cache,
config=mock_config,
reporter=mock_reporter,
ctx=mock_ctx,
looker_client=mock_looker_client,
view_to_explore_map=view_to_explore_map,
)
def test_latency_tracking(
self, upstream_instance, mock_looker_client, mock_reporter
):
"""Test that API latency is tracked and reported."""
# Clear the cache to force re-execution
upstream_instance._get_spr.cache_clear()
# Mock the SQL response
mock_sql_response = [{"sql": "SELECT * FROM test_table"}]
mock_looker_client.generate_sql_query.return_value = mock_sql_response
# Mock the view ID cache to return a valid view ID
mock_view_id = MagicMock(spec=LookerViewId)
mock_view_id.get_urn.return_value = "urn:li:dataset:test"
upstream_instance.looker_view_id_cache.get_looker_view_id.return_value = (
mock_view_id
)
with patch(
"datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
) as mock_create_lineage:
mock_spr = create_mock_sql_parsing_result()
mock_create_lineage.return_value = mock_spr
upstream_instance._execute_query(MagicMock(spec=WriteQuery))
# Verify that latency was reported (may be called multiple times due to caching)
assert mock_reporter.report_looker_query_api_latency.call_count >= 1