From c18b125a05f15c69dc4715607d2eb5ab100a5c82 Mon Sep 17 00:00:00 2001 From: Anush Kumar Date: Fri, 26 Sep 2025 09:27:18 -0700 Subject: [PATCH] feat(ingestion): Enhanced column lineage extraction for Looker/LookML (#14826) --- .../ingestion/source/looker/looker_common.py | 6 + .../source/looker/looker_constant.py | 4 + .../source/looker/looker_lib_wrapper.py | 39 +- .../source/looker/looker_view_id_cache.py | 2 +- .../source/looker/lookml_concept_context.py | 2 +- .../ingestion/source/looker/lookml_config.py | 32 +- .../source/looker/lookml_refinement.py | 2 +- .../ingestion/source/looker/lookml_source.py | 71 +- .../ingestion/source/looker/view_upstream.py | 495 ++++- .../tests/integration/looker/test_looker.py | 5 +- ...l_col_lineage_looker_api_based_golden.json | 1670 +++++++++++++++++ .../models/dev_project.model.lkml | 83 + .../views/purchases.view.lkml | 93 + .../views/user_metrics.view.lkml | 43 + .../views/users.view.lkml | 73 + .../tests/integration/lookml/test_lookml.py | 178 ++ .../tests/unit/lookml/__init__.py | 0 .../test_lookml_api_based_view_upstream.py | 518 +++++ 18 files changed, 3277 insertions(+), 39 deletions(-) create mode 100644 metadata-ingestion/tests/integration/lookml/lkml_col_lineage_looker_api_based_golden.json create mode 100644 metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/models/dev_project.model.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/purchases.view.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/user_metrics.view.lkml create mode 100644 metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/users.view.lkml create mode 100644 metadata-ingestion/tests/unit/lookml/__init__.py create mode 100644 metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py index 08f3682696..b30097adb7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_common.py @@ -307,6 +307,12 @@ class ViewFieldType(Enum): UNKNOWN = "Unknown" +class ViewFieldDimensionGroupType(Enum): + # Ref: https://cloud.google.com/looker/docs/reference/param-field-dimension-group + TIME = "time" + DURATION = "duration" + + class ViewFieldValue(Enum): NOT_AVAILABLE = "NotAvailable" diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_constant.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_constant.py index 920efeaa70..206b8ec9a3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_constant.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_constant.py @@ -11,3 +11,7 @@ prod = "prod" dev = "dev" NAME = "name" DERIVED_DOT_SQL = "derived.sql" + +VIEW_FIELD_TYPE_ATTRIBUTE = "type" +VIEW_FIELD_INTERVALS_ATTRIBUTE = "intervals" +VIEW_FIELD_TIMEFRAMES_ATTRIBUTE = "timeframes" diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py index a62589fb4c..ef43510611 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_lib_wrapper.py @@ -2,6 +2,7 @@ import json import logging import os +from enum import Enum 
from functools import lru_cache from typing import Dict, List, MutableMapping, Optional, Sequence, Set, Union, cast @@ -31,6 +32,14 @@ from datahub.configuration.common import ConfigurationError logger = logging.getLogger(__name__) +class LookerQueryResponseFormat(Enum): + # result_format - Ref: https://cloud.google.com/looker/docs/reference/looker-api/latest/methods/Query/run_inline_query + JSON = "json" + SQL = ( + "sql" # Note: This does not execute the query, it only generates the SQL query. + ) + + class TransportOptionsConfig(ConfigModel): timeout: int headers: MutableMapping[str, str] @@ -69,6 +78,7 @@ class LookerAPIStats(BaseModel): search_looks_calls: int = 0 search_dashboards_calls: int = 0 all_user_calls: int = 0 + generate_sql_query_calls: int = 0 class LookerAPI: @@ -170,17 +180,40 @@ class LookerAPI: logger.debug(f"Executing query {write_query}") self.client_stats.query_calls += 1 - response_json = self.client.run_inline_query( - result_format="json", + response = self.client.run_inline_query( + result_format=LookerQueryResponseFormat.JSON.value, body=write_query, transport_options=self.transport_options, ) + data = json.loads(response) + logger.debug("=================Response=================") - data = json.loads(response_json) logger.debug("Length of response: %d", len(data)) return data + def generate_sql_query( + self, write_query: WriteQuery, use_cache: bool = False + ) -> str: + """ + Generates a SQL query string for a given WriteQuery. + + Note: This does not execute the query, it only generates the SQL query. + """ + logger.debug(f"Generating SQL query for {write_query}") + self.client_stats.generate_sql_query_calls += 1 + + response = self.client.run_inline_query( + result_format=LookerQueryResponseFormat.SQL.value, + body=write_query, + transport_options=self.transport_options, + cache=use_cache, + ) + + logger.debug("=================Response=================") + logger.debug("Length of SQL response: %d", len(response)) + return str(response) + def dashboard(self, dashboard_id: str, fields: Union[str, List[str]]) -> Dashboard: self.client_stats.dashboard_calls += 1 return self.client.dashboard( diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_view_id_cache.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_view_id_cache.py index 562c7863b3..096481c0c7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_view_id_cache.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_view_id_cache.py @@ -3,11 +3,11 @@ from typing import Dict, List, Optional from datahub.ingestion.source.looker.looker_common import LookerViewId, ViewFieldValue from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition +from datahub.ingestion.source.looker.looker_constant import NAME from datahub.ingestion.source.looker.looker_dataclasses import LookerModel from datahub.ingestion.source.looker.looker_file_loader import LookerViewFileLoader from datahub.ingestion.source.looker.lookml_config import ( BASE_PROJECT_NAME, - NAME, LookMLSourceReport, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py index 4e38165bb5..7174ee8cc3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_concept_context.py @@ -12,12 +12,12 @@ from 
datahub.ingestion.source.looker.looker_constant import (
     DIMENSION_GROUPS,
     DIMENSIONS,
     MEASURES,
+    NAME,
 )
 from datahub.ingestion.source.looker.looker_dataclasses import LookerViewFile
 from datahub.ingestion.source.looker.looker_file_loader import LookerViewFileLoader
 from datahub.ingestion.source.looker.lookml_config import (
     DERIVED_VIEW_SUFFIX,
-    NAME,
     LookMLSourceReport,
 )
 from datahub.ingestion.source.looker.lookml_refinement import LookerRefinementResolver
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py
index 957c2aaad8..c22aa9c011 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_config.py
@@ -28,11 +28,10 @@ from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
 )
 from datahub.utilities.lossy_collections import LossyList
+from datahub.utilities.stats_collections import TopKDict, float_top_k_dict
 
 logger = logging.getLogger(__name__)
 
-NAME: str = "name"
-
 BASE_PROJECT_NAME = "__BASE"
 
 EXPLORE_FILE_EXTENSION = ".explore.lkml"
@@ -47,6 +46,9 @@ DERIVED_VIEW_PATTERN: str = r"\$\{([^}]*)\}"
 @dataclass
 class LookMLSourceReport(StaleEntityRemovalSourceReport):
     git_clone_latency: Optional[timedelta] = None
+    looker_query_api_latency_seconds: TopKDict[str, float] = dataclass_field(
+        default_factory=float_top_k_dict
+    )
     models_discovered: int = 0
     models_dropped: LossyList[str] = dataclass_field(default_factory=LossyList)
     views_discovered: int = 0
@@ -81,6 +83,11 @@ class LookMLSourceReport(StaleEntityRemovalSourceReport):
             self.api_stats = self._looker_api.compute_stats()
         return super().compute_stats()
 
+    def report_looker_query_api_latency(
+        self, view_urn: str, latency: timedelta
+    ) -> None:
+        self.looker_query_api_latency_seconds[view_urn] = latency.total_seconds()
+
 
 class LookMLSourceConfig(
     LookerCommonConfig, StatefulIngestionConfigBase, EnvConfigMixin
@@ -122,6 +129,16 @@ class LookMLSourceConfig(
         description="List of regex patterns for LookML views to include in the extraction.",
     )
     parse_table_names_from_sql: bool = Field(True, description="See note below.")
+    use_api_for_view_lineage: bool = Field(
+        False,
+        description="When enabled, uses the Looker API to get the SQL representation of views for lineage parsing instead of parsing LookML files directly. Requires 'api' configuration to be provided. "
+        "Coverage of regex-based lineage extraction is limited: it only supports the ${TABLE}.column_name syntax. See https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions to "
+        "understand the other substitutions and cross-references allowed in LookML.",
+    )
+    use_api_cache_for_view_lineage: bool = Field(
+        False,
+        description="When enabled, uses Looker API server-side caching for query execution. Requires 'api' configuration to be provided.",
+    )
     api: Optional[LookerAPIConfig] = None
     project_name: Optional[str] = Field(
         None,
@@ -239,6 +256,17 @@ class LookMLSourceConfig(
             )
         return values
 
+    @root_validator(skip_on_failure=True)
+    def check_api_provided_for_view_lineage(cls, values):
+        """Validate that API credentials are provided when the Looker API is used for view column lineage extraction."""
+        if not values.get("api") and values.get("use_api_for_view_lineage"):
+            raise ValueError(
+                "API credentials were not found. The LookML source requires API credentials "
+                "to use Looker APIs for view column lineage extraction. "
+ "Set `use_api_for_view_lineage` to False to skip using Looker APIs." + ) + return values + @validator("base_folder", always=True) def check_base_folder_if_not_provided( cls, v: Optional[pydantic.DirectoryPath], values: Dict[str, Any] diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_refinement.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_refinement.py index 6933d9d693..c5a40a5d57 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_refinement.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_refinement.py @@ -4,10 +4,10 @@ import logging from typing import ClassVar, Dict, List, Set from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition +from datahub.ingestion.source.looker.looker_constant import NAME from datahub.ingestion.source.looker.looker_dataclasses import LookerModel from datahub.ingestion.source.looker.looker_file_loader import LookerViewFileLoader from datahub.ingestion.source.looker.lookml_config import ( - NAME, LookMLSourceConfig, LookMLSourceReport, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index c052eb3b87..eca713efc2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -142,6 +142,8 @@ class LookerView: ctx: PipelineContext, extract_col_level_lineage: bool = False, populate_sql_logic_in_descriptions: bool = False, + looker_client: Optional[LookerAPI] = None, + view_to_explore_map: Optional[Dict[str, str]] = None, ) -> Optional["LookerView"]: view_name = view_context.name() @@ -160,6 +162,8 @@ class LookerView: config=config, ctx=ctx, reporter=reporter, + looker_client=looker_client, + view_to_explore_map=view_to_explore_map, ) field_type_vs_raw_fields = OrderedDict( @@ -705,6 +709,11 @@ class LookMLSource(StatefulIngestionSourceBase): # Value: Tuple(model file name, connection name) view_connection_map: Dict[str, Tuple[str, str]] = {} + # Map of view name to explore name for API-based view lineage + # A view can be referenced by multiple explores, we only need one of the explores to use Looker Query API + # Key: view_name, Value: explore_name + view_to_explore_map: Dict[str, str] = {} + # The ** means "this directory and all subdirectories", and hence should # include all the files we want. 
model_files = sorted( @@ -759,37 +768,37 @@ class LookMLSource(StatefulIngestionSourceBase): ) ) - if self.source_config.emit_reachable_views_only: - model_explores_map = {d["name"]: d for d in model.explores} - for explore_dict in model.explores: - try: - if LookerRefinementResolver.is_refinement(explore_dict["name"]): - continue + model_explores_map = {d["name"]: d for d in model.explores} + for explore_dict in model.explores: + try: + if LookerRefinementResolver.is_refinement(explore_dict["name"]): + continue - explore_dict = ( - looker_refinement_resolver.apply_explore_refinement( - explore_dict - ) - ) - explore: LookerExplore = LookerExplore.from_dict( - model_name, - explore_dict, - model.resolved_includes, - viewfile_loader, - self.reporter, - model_explores_map, - ) - if explore.upstream_views: - for view_name in explore.upstream_views: + explore_dict = looker_refinement_resolver.apply_explore_refinement( + explore_dict + ) + explore: LookerExplore = LookerExplore.from_dict( + model_name, + explore_dict, + model.resolved_includes, + viewfile_loader, + self.reporter, + model_explores_map, + ) + if explore.upstream_views: + for view_name in explore.upstream_views: + if self.source_config.emit_reachable_views_only: explore_reachable_views.add(view_name.include) - except Exception as e: - self.reporter.report_warning( - title="Failed to process explores", - message="Failed to process explore dictionary.", - context=f"Explore Details: {explore_dict}", - exc=e, - ) - logger.debug("Failed to process explore", exc_info=e) + # Build view to explore mapping for API-based view lineage + view_to_explore_map[view_name.include] = explore.name + except Exception as e: + self.reporter.report_warning( + title="Failed to process explores", + message="Failed to process explore dictionary.", + context=f"Explore Details: {explore_dict}", + exc=e, + ) + logger.debug("Failed to process explore", exc_info=e) processed_view_files = processed_view_map.setdefault( model.connection, set() @@ -878,6 +887,10 @@ class LookMLSource(StatefulIngestionSourceBase): populate_sql_logic_in_descriptions=self.source_config.populate_sql_logic_for_missing_descriptions, config=self.source_config, ctx=self.ctx, + looker_client=self.looker_client, + view_to_explore_map=view_to_explore_map + if view_to_explore_map + else None, ) except Exception as e: self.reporter.report_warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py index f77eebb3cd..07393ab4cc 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/view_upstream.py @@ -1,18 +1,33 @@ import logging import re from abc import ABC, abstractmethod +from datetime import datetime from functools import lru_cache from typing import Dict, List, Optional +from looker_sdk.sdk.api40.models import ( + WriteQuery, +) + from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.source.looker.looker_common import ( LookerExplore, LookerViewId, ViewField, + ViewFieldDimensionGroupType, ViewFieldType, ) from datahub.ingestion.source.looker.looker_config import LookerConnectionDefinition +from datahub.ingestion.source.looker.looker_constant import ( + NAME, + VIEW_FIELD_INTERVALS_ATTRIBUTE, + VIEW_FIELD_TIMEFRAMES_ATTRIBUTE, + VIEW_FIELD_TYPE_ATTRIBUTE, +) +from 
datahub.ingestion.source.looker.looker_lib_wrapper import (
+    LookerAPI,
+)
 from datahub.ingestion.source.looker.looker_view_id_cache import LookerViewIdCache
 from datahub.ingestion.source.looker.lookml_concept_context import (
     LookerFieldContext,
@@ -20,7 +35,6 @@ from datahub.ingestion.source.looker.lookml_concept_context import (
 )
 from datahub.ingestion.source.looker.lookml_config import (
     DERIVED_VIEW_SUFFIX,
-    NAME,
     LookMLSourceConfig,
     LookMLSourceReport,
 )
@@ -280,6 +294,447 @@ class AbstractViewUpstream(ABC):
         return upstream_column_refs
 
 
+class LookerQueryAPIBasedViewUpstream(AbstractViewUpstream):
+    """
+    Implements Looker view upstream lineage extraction using the Looker Query API.
+
+    This class leverages the Looker API to generate the fully resolved SQL for a Looker view by constructing a WriteQuery
+    that includes all dimensions, dimension groups, and measures. The SQL is then parsed to extract column-level lineage.
+    The Looker client is required for this class, as it is used to execute the WriteQuery and retrieve the SQL.
+
+    Other view upstream implementations use string parsing to extract lineage information from the SQL, which does not cover all edge cases.
+    For the limitations of string-based lineage extraction, see: https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions
+
+    Key Features:
+    - Requires a Looker client (`looker_client`) to execute queries and retrieve SQL for the view.
+    - Requires a `view_to_explore_map` to map view names to their corresponding explore names.
+    - Field name translation is handled: Looker API field names are constructed as `<view_name>.<field_name>`, and helper
+      methods are provided to convert between Looker API field names and raw field names.
+    - SQL parsing is cached for efficiency, and the class is designed to gracefully fall back if the Looker Query API fails.
+    - All lineage extraction is based on the SQL returned by the Looker API, ensuring accurate and up-to-date lineage.
+
+    Why view_to_explore_map is required:
+    The Looker Query API expects the explore name (not the view name) as the "view" parameter in the WriteQuery.
+    In Looker, a view can be referenced by multiple explores, but the API needs only one of those
+    explores to access the view's fields.
+
+    Example WriteQuery request (see `_execute_query` for details):
+        {
+            "model": "test_model",
+            "view": "users_explore",  # This is the explore name, not the view name
+            "fields": [
+                "users.email", "users.lifetime_purchase_count"
+            ],
+            "limit": "1",
+            "cache": true
+        }
+    The SQL response is then parsed to extract upstream tables and column-level lineage.
+
+    For further details, see the method-level docstrings, especially:
+    - `__get_spr`: SQL parsing and lineage extraction workflow
+    - `_get_sql_write_query`: WriteQuery construction and field enumeration
+    - `_execute_query`: Looker API invocation and SQL retrieval; this only generates the SQL query, it does not execute it
+    - Field name translation: `_get_looker_api_field_name` and `_get_field_name_from_looker_api_field_name`
+
+    Note: This class is intended to raise exceptions if SQL parsing or API calls fail, so that the caller can fall back to
+    other implementations (custom regex-based parsing) if necessary.
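+
+    Illustrative field name translation (the view and field names below are examples,
+    not fixed values; see `_get_looker_api_field_name` and the dimension group helpers):
+        pk -> purchases.pk
+        created (type: time, timeframes: [date, week]) -> purchases.created_date
+        since_event (type: duration, intervals: [hour, day]) -> purchases.hours_since_event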
+ """ + + def __init__( + self, + view_context: LookerViewContext, + looker_view_id_cache: LookerViewIdCache, + config: LookMLSourceConfig, + reporter: LookMLSourceReport, + ctx: PipelineContext, + looker_client: LookerAPI, + view_to_explore_map: Dict[str, str], + ): + super().__init__(view_context, looker_view_id_cache, config, reporter, ctx) + self.looker_client = looker_client + self.view_to_explore_map = view_to_explore_map + # Cache the SQL parsing results + # We use maxsize=1 because a new class instance is created for each view, Ref: view_upstream.create_view_upstream + self._get_spr = lru_cache(maxsize=1)(self.__get_spr) + self._get_upstream_dataset_urn = lru_cache(maxsize=1)( + self.__get_upstream_dataset_urn + ) + + # Initialize the cache + # Done to fallback to other implementations if the Looker Query API fails + self._get_spr() + + def __get_spr(self) -> SqlParsingResult: + """ + Retrieves the SQL parsing result for the current Looker view by: + 1. Building a WriteQuery for the view. + 2. Executing the query via the Looker API to get the SQL. + 3. Parsing the SQL to extract lineage information. + + Returns: + SqlParsingResult if successful, otherwise None. + Raises: + ValueError: If no SQL is found in the response. + ValueError: If no fields are found for the view. + ValueError: If explore name is not found for the view. + ValueError: If error in parsing SQL for upstream tables. + ValueError: If error in parsing SQL for column lineage. + """ + try: + # Build the WriteQuery for the current view. + sql_query: WriteQuery = self._get_sql_write_query() + + # Execute the query to get the SQL representation from Looker. + sql_response = self._execute_query(sql_query) + + # Parse the SQL to extract lineage information. + spr = create_lineage_sql_parsed_result( + query=sql_response, + default_schema=self.view_context.view_connection.default_schema, + default_db=self.view_context.view_connection.default_db, + platform=self.view_context.view_connection.platform, + platform_instance=self.view_context.view_connection.platform_instance, + env=self.view_context.view_connection.platform_env or self.config.env, + graph=self.ctx.graph, + ) + + # Check for errors encountered during table extraction. + table_error = spr.debug_info.table_error + if table_error is not None: + self.reporter.report_warning( + title="Table Level Lineage Extraction Failed", + message="Error in parsing derived sql", + context=f"View-name: {self.view_context.name()}", + exc=table_error, + ) + raise ValueError( + f"Error in parsing SQL for upstream tables: {table_error}" + ) + + column_error = spr.debug_info.column_error + if column_error is not None: + self.reporter.report_warning( + title="Column Level Lineage Extraction Failed", + message="Error in parsing derived sql", + context=f"View-name: {self.view_context.name()}", + exc=column_error, + ) + raise ValueError( + f"Error in parsing SQL for column lineage: {column_error}" + ) + + return spr + except Exception: + # Reraise the exception to allow higher-level handling. + raise + + def _get_time_dim_group_field_name(self, dim_group: dict) -> str: + """ + Time dimension groups must be referenced by their individual timeframes suffix. 
+        Example:
+            dimension_group: created {
+                type: time
+                timeframes: [date, week, month]
+                sql: ${TABLE}.created_at ;;
+            }
+            Used as: {view_name.created_date}
+
+            created -> created_date, created_week, created_month
+        # Ref: https://cloud.google.com/looker/docs/reference/param-field-dimension-group#dimension_groups_must_be_referenced_by_their_individual_dimensions
+        """
+        dim_group_name = dim_group.get(NAME)
+        timeframes = dim_group.get(VIEW_FIELD_TIMEFRAMES_ATTRIBUTE)
+
+        # If timeframes is not included (rare case), the dimension group will include all possible timeframes.
+        # We default to "raw".
+        suffix = timeframes[0] if timeframes else "raw"
+        return f"{dim_group_name}_{suffix}"
+
+    def _get_duration_dim_group_field_name(self, dim_group: dict) -> str:
+        """
+        Duration dimension groups must be referenced with the plural form of the interval value as a prefix.
+        Example:
+            dimension_group: since_event {
+                type: duration
+                intervals: [hour, day, week, month, quarter, year]
+                sql_start: ${faa_event_date_raw} ;;
+                sql_end: CURRENT_TIMESTAMP();;
+            }
+            Used as: {view_name.hours_since_event}
+
+            since_event -> hours_since_event, days_since_event, weeks_since_event, months_since_event, quarters_since_event, years_since_event
+        # Ref: https://cloud.google.com/looker/docs/reference/param-field-dimension-group#referencing_intervals_from_another_lookml_field
+        """
+        dim_group_name = dim_group.get(NAME)
+        intervals = dim_group.get(VIEW_FIELD_INTERVALS_ATTRIBUTE)
+
+        # If intervals is not included (rare case), the dimension group will include all possible intervals.
+        # We default to "day" -> "days".
+        prefix = f"{intervals[0]}s" if intervals else "days"
+        return f"{prefix}_{dim_group_name}"
+
+    def _get_sql_write_query(self) -> WriteQuery:
+        """
+        Constructs a WriteQuery object to obtain the SQL representation of the current Looker view.
+
+        We list all the fields of the view so that the generated SQL fully resolves every dimension, dimension group, and measure.
+
+        The method uses the view_to_explore_map to determine the correct explore name to use in the WriteQuery.
+        This is crucial because the Looker Query API expects the explore name (not the view name) as the "view" parameter.
+
+        Ref: https://cloud.google.com/looker/docs/reference/param-field-sql#sql_for_dimensions
+
+        Returns:
+            WriteQuery: The WriteQuery object for the view.
+
+        Raises:
+            ValueError: If the explore name is not found in the view_to_explore_map for the current view.
+            ValueError: If no fields are found for the view.
+        """
+
+        # Collect all dimension and measure fields for the view.
+        view_fields: List[str] = []
+        # Add dimension and measure fields in the format: <view_name>.<field_name>
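+        # Illustrative example (hypothetical view): a view "purchases" with dimensions
+        # [pk, status] and a measure [count] would yield
+        # view_fields = ["purchases.pk", "purchases.status", "purchases.count"].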
+        for field in self.view_context.dimensions() + self.view_context.measures():
+            field_name = field.get(NAME)
+            assert field_name  # Happy linter
+            view_fields.append(self._get_looker_api_field_name(field_name))
+
+        for dim_group in self.view_context.dimension_groups():
+            dim_group_type: ViewFieldDimensionGroupType = ViewFieldDimensionGroupType(
+                dim_group.get(VIEW_FIELD_TYPE_ATTRIBUTE)
+            )
+
+            if dim_group_type == ViewFieldDimensionGroupType.TIME:
+                view_fields.append(
+                    self._get_looker_api_field_name(
+                        self._get_time_dim_group_field_name(dim_group)
+                    )
+                )
+            elif dim_group_type == ViewFieldDimensionGroupType.DURATION:
+                view_fields.append(
+                    self._get_looker_api_field_name(
+                        self._get_duration_dim_group_field_name(dim_group)
+                    )
+                )
+
+        # Use the explore name from view_to_explore_map.
+        # explore_name is always present in the view_to_explore_map because of the check in view_upstream.create_view_upstream.
+        explore_name = self.view_to_explore_map.get(self.view_context.name())
+        assert explore_name  # Happy linter
+
+        if not view_fields:
+            raise ValueError(
+                f"No fields found for view '{self.view_context.name()}'. Cannot proceed with Looker API for view lineage."
+            )
+
+        # Construct and return the WriteQuery object.
+        # The 'limit' is set to "1" as the query is only used to obtain SQL, not to fetch data.
+        return WriteQuery(
+            model=self.looker_view_id_cache.model_name,
+            view=explore_name,
+            fields=view_fields,
+            filters={},
+            limit="1",
+        )
+
+    def _execute_query(self, query: WriteQuery) -> str:
+        """
+        Generates the SQL for a Looker query via the Looker API and returns the SQL string.
+        Note: This does not execute the query; it only generates the SQL.
+
+        Ref: https://cloud.google.com/looker/docs/reference/looker-api/latest/methods/Query/run_inline_query
+
+        Example Request:
+            WriteQuery:
+                {
+                    "model": "test_model",
+                    "view": "users",
+                    "fields": [
+                        "users.email", "users.lifetime_purchase_count"
+                    ],
+                    "limit": "1",
+                    "cache": true
+                }
+
+            Response:
+                "
+                SELECT
+                    users."EMAIL" AS "users.email",
+                    COUNT(DISTINCT ( purchases."PK" ) ) AS "users.lifetime_purchase_count"
+                FROM "ECOMMERCE"."USERS" AS users
+                LEFT JOIN "ECOMMERCE"."PURCHASES" AS purchases ON (users."PK") = (purchases."USER_FK")
+                GROUP BY
+                    1
+                ORDER BY
+                    2 DESC
+                FETCH NEXT 1 ROWS ONLY
+                "
+        Args:
+            query (WriteQuery): The Looker WriteQuery object to use for SQL generation.
+
+        Returns:
+            str: The SQL string returned by the Looker API.
+
+        Raises:
+            ValueError: If the response contains no SQL.
+        """
+
+        # Record the start time for latency measurement.
+        start_time = datetime.now()
+
+        # Generate the SQL using the Looker client.
+        sql_response = self.looker_client.generate_sql_query(
+            write_query=query, use_cache=self.config.use_api_cache_for_view_lineage
+        )
+
+        # Record the end time after query execution.
+        end_time = datetime.now()
+
+        # Attempt to get the LookerViewId for reporting.
+        looker_view_id: Optional[LookerViewId] = (
+            self.looker_view_id_cache.get_looker_view_id(
+                view_name=self.view_context.name(),
+                base_folder_path=self.view_context.base_folder_path,
+            )
+        )
+
+        # Report the query API latency if the view ID is available.
+        if looker_view_id is not None:
+            self.reporter.report_looker_query_api_latency(
+                looker_view_id.get_urn(self.config),
+                end_time - start_time,
+            )
+
+        # Validate the response structure.
+        if not sql_response:
+            raise ValueError(
+                f"No SQL found in response for view '{self.view_context.name()}'. Response: {sql_response}"
+            )
+
+        # Extract the SQL string from the response.
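+        # Note: with result_format="sql" the Looker API returns the generated SQL text
+        # directly (no rows are fetched), so the response is returned as-is.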
+ return sql_response + + def __get_upstream_dataset_urn(self) -> List[Urn]: + """ + Extract upstream dataset URNs by parsing the SQL for the current view. + + Returns: + List[Urn]: List of upstream dataset URNs, or an empty list if parsing fails. + """ + # Attempt to get the SQL parsing result for the current view. + spr: SqlParsingResult = self._get_spr() + + # Remove any 'hive.' prefix from upstream table URNs. + upstream_dataset_urns: List[str] = [ + _drop_hive_dot(urn) for urn in spr.in_tables + ] + + # Fix any derived view references present in the URNs. + upstream_dataset_urns = fix_derived_view_urn( + urns=upstream_dataset_urns, + looker_view_id_cache=self.looker_view_id_cache, + base_folder_path=self.view_context.base_folder_path, + config=self.config, + ) + + return upstream_dataset_urns + + def _get_looker_api_field_name(self, field_name: str) -> str: + """ + Translate the field name to the looker api field name + + Example: + pk -> purchases.pk + """ + return f"{self.view_context.name()}.{field_name}" + + def _get_field_name_from_looker_api_field_name( + self, looker_api_field_name: str + ) -> str: + """ + Translate the looker api field name to the field name + + Example: + purchases.pk -> pk + """ + # Remove the view name at the start and the dot from the looker_api_field_name, but only if it matches the current view name + prefix = f"{self.view_context.name()}." + if looker_api_field_name.startswith(prefix): + return looker_api_field_name[len(prefix) :] + else: + # Don't throw an error, just return the original field name + return looker_api_field_name + + def get_upstream_dataset_urn(self) -> List[Urn]: + """Get upstream dataset URNs""" + return self._get_upstream_dataset_urn() + + def get_upstream_column_ref( + self, field_context: LookerFieldContext + ) -> List[ColumnRef]: + """Return upstream column references for a given field.""" + spr: SqlParsingResult = self._get_spr() + if not spr.column_lineage: + return [] + + field_type: Optional[ViewFieldDimensionGroupType] = None + field_name = field_context.name() + try: + # Try if field is a dimension group + field_type = ViewFieldDimensionGroupType( + field_context.raw_field.get(VIEW_FIELD_TYPE_ATTRIBUTE) + ) + + if field_type == ViewFieldDimensionGroupType.TIME: + field_name = self._get_time_dim_group_field_name( + field_context.raw_field + ) + elif field_type == ViewFieldDimensionGroupType.DURATION: + field_name = self._get_duration_dim_group_field_name( + field_context.raw_field + ) + + except Exception: + # Not a dimension group, no modification needed + logger.debug( + f"view-name={self.view_context.name()}, field-name={field_name}, field-type={field_context.raw_field.get(VIEW_FIELD_TYPE_ATTRIBUTE)}" + ) + + field_api_name = self._get_looker_api_field_name(field_name).lower() + + upstream_refs: List[ColumnRef] = [] + + for lineage in spr.column_lineage: + if lineage.downstream.column.lower() == field_api_name: + for upstream in lineage.upstreams: + upstream_refs.append( + ColumnRef(table=upstream.table, column=upstream.column) + ) + + return _drop_hive_dot_from_upstream(upstream_refs) + + def create_fields(self) -> List[ViewField]: + """Create ViewField objects from SQL parsing result.""" + spr: SqlParsingResult = self._get_spr() + + if not spr.column_lineage: + return [] + + fields: List[ViewField] = [] + + for lineage in spr.column_lineage: + fields.append( + ViewField( + name=self._get_field_name_from_looker_api_field_name( + lineage.downstream.column + ), + label="", + type=lineage.downstream.native_column_type or 
"unknown", + description="", + field_type=ViewFieldType.UNKNOWN, + upstream_fields=_drop_hive_dot_from_upstream(lineage.upstreams), + ) + ) + return fields + + class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC): """ Handle the case where upstream dataset is defined in derived_table.sql @@ -674,7 +1129,45 @@ def create_view_upstream( config: LookMLSourceConfig, ctx: PipelineContext, reporter: LookMLSourceReport, + looker_client: Optional["LookerAPI"] = None, + view_to_explore_map: Optional[Dict[str, str]] = None, ) -> AbstractViewUpstream: + # Looker client is required for LookerQueryAPIBasedViewUpstream also enforced by config.use_api_for_view_lineage + # view_to_explore_map is required for Looker query API args + # Only process if view exists in view_to_explore_map, because we cannot query views which are not reachable from an explore + if ( + config.use_api_for_view_lineage + and looker_client + and view_to_explore_map + and view_context.name() in view_to_explore_map + ): + try: + return LookerQueryAPIBasedViewUpstream( + view_context=view_context, + config=config, + reporter=reporter, + ctx=ctx, + looker_view_id_cache=looker_view_id_cache, + looker_client=looker_client, + view_to_explore_map=view_to_explore_map, + ) + except Exception as e: + # Falling back to custom regex-based parsing - best effort approach + reporter.report_warning( + title="Looker Query API based View Upstream Failed", + message="Error in getting upstream lineage for view using Looker Query API", + context=f"View-name: {view_context.name()}", + exc=e, + ) + else: + logger.debug( + f"Skipping Looker Query API for view: {view_context.name()} because one or more conditions are not met: " + f"use_api_for_view_lineage={config.use_api_for_view_lineage}, " + f"looker_client={'set' if looker_client else 'not set'}, " + f"view_to_explore_map={'set' if view_to_explore_map else 'not set'}, " + f"view_in_view_to_explore_map={view_context.name() in view_to_explore_map if view_to_explore_map else False}" + ) + if view_context.is_regular_case(): return RegularViewUpstream( view_context=view_context, diff --git a/metadata-ingestion/tests/integration/looker/test_looker.py b/metadata-ingestion/tests/integration/looker/test_looker.py index 2d842e9588..d128da4174 100644 --- a/metadata-ingestion/tests/integration/looker/test_looker.py +++ b/metadata-ingestion/tests/integration/looker/test_looker.py @@ -590,7 +590,10 @@ def setup_mock_all_user(mocked_client): def side_effect_query_inline( - result_format: str, body: WriteQuery, transport_options: Optional[TransportOptions] + result_format: str, + body: WriteQuery, + transport_options: Optional[TransportOptions], + cache: Optional[bool] = None, ) -> str: query_type: looker_usage.QueryId if result_format == "sql": diff --git a/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_looker_api_based_golden.json b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_looker_api_based_golden.json new file mode 100644 index 0000000000..cf82e255a7 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_looker_api_based_golden.json @@ -0,0 +1,1670 @@ +[ +{ + "entityType": "container", + "entityUrn": "urn:li:container:ddf73728714092b1445bb6e8d6062491", + "changeType": "UPSERT", + "aspectName": "containerProperties", + "aspect": { + "json": { + "customProperties": { + "platform": "looker", + "env": "PROD", + "project_name": "lkml_col_lineage_sample" + }, + "name": "lkml_col_lineage_sample", + "env": "PROD" + } + }, + "systemMetadata": { + 
"lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ddf73728714092b1445bb6e8d6062491", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ddf73728714092b1445bb6e8d6062491", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "LookML Project" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ddf73728714092b1445bb6e8d6062491", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Folders" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "# The name of this view in Looker is \"Purchases\"\nview: purchases {\n # The sql_table_name parameter indicates the underlying database table\n # to be used for all fields in this view.\n sql_table_name: \"ECOMMERCE\".\"PURCHASES\" ;;\n\n # No primary key is defined for this view. 
In order to join this view in an Explore,\n # define primary_key: yes on a dimension that has no repeated values.\n\n # Dates and timestamps can be represented in Looker using a dimension group of type: time.\n # Looker converts dates and timestamps to the specified timeframes within the dimension group.\n\n dimension_group: created {\n type: time\n timeframes: [raw, time, date, week, month, quarter, year]\n sql: ${TABLE}.\"CREATED_AT\" ;;\n }\n # Here's what a typical dimension looks like in LookML.\n # A dimension is a groupable field that can be used to filter query results.\n # This dimension will be called \"Pk\" in Explore.\n\n dimension: pk {\n primary_key: yes\n type: number\n sql: ${TABLE}.\"PK\" ;;\n }\n\n dimension: purchase_amount {\n type: number\n sql: ${TABLE}.\"PURCHASE_AMOUNT\" ;;\n }\n\n dimension: status {\n type: string\n sql: ${TABLE}.\"STATUS\" ;;\n }\n\n dimension: tax_amount {\n type: number\n sql: ${TABLE}.\"TAX_AMOUNT\" ;;\n }\n\n dimension: total_amount {\n type: number\n sql: ${TABLE}.\"TOTAL_AMOUNT\" ;;\n }\n\n dimension_group: updated {\n type: time\n timeframes: [raw, time, date, week, month, quarter, year]\n sql: ${TABLE}.\"UPDATED_AT\" ;;\n }\n\n dimension: user_fk {\n type: number\n sql: ${TABLE}.\"USER_FK\" ;;\n }\n\n # Inter View Dimension References\n dimension: is_expensive_purchase {\n type: yesno\n sql: ${total_amount} > 100 ;;\n }\n\n # Inter View Nested Dimension References\n measure: num_of_expensive_purchases {\n type: count\n drill_fields: [is_expensive_purchase]\n }\n\n # Intra View Dimension Reference\n dimension: user_email{\n type: string\n sql:${users.email} ;;\n }\n\n measure: average_purchase_value{\n type: average\n sql: ${total_amount} ;;\n value_format_name: usd\n }\n\n measure: count {\n type: count\n }\n\n dimension_group: purchase_age {\n type: duration\n sql_start: ${TABLE}.\"CREATED_AT\" ;;\n sql_end: CURRENT_TIMESTAMP ;;\n intervals: [day, week, month, quarter, year]\n }\n}\n", + "viewLanguage": "lookml" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "purchases", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "pk", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": true + }, + { + "fieldPath": "purchase_amount", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + 
"com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "status", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "tax_amount", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "total_amount", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "user_fk", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "is_expensive_purchase", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.BooleanType": {} + } + }, + "nativeDataType": "yesno", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "user_email", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "created", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + }, + { + "tag": "urn:li:tag:Temporal" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "updated", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + }, + { + "tag": "urn:li:tag:Temporal" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "purchase_age", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "duration", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + }, + { + "tag": "urn:li:tag:Temporal" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "num_of_expensive_purchases", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "count", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + }, + { + 
"fieldPath": "average_purchase_value", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "average", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "count", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "count", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + } + ], + "primaryKeys": [ + "pk" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD)", + "type": "VIEW" + }, + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),PK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),pk)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),PURCHASE_AMOUNT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),purchase_amount)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),STATUS)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),status)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),TAX_AMOUNT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),tax_amount)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),TOTAL_AMOUNT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),total_amount)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + 
"urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),USER_FK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),user_fk)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),TOTAL_AMOUNT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),is_expensive_purchase)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),EMAIL)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),user_email)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),CREATED_AT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),created)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),UPDATED_AT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),updated)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),CREATED_AT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),purchase_age)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),PK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),num_of_expensive_purchases)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),TOTAL_AMOUNT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),average_purchase_value)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),PK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD),count)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": 
"dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "looker.file.path": "views/purchases.view.lkml", + "looker.model": "dev_project" + }, + "name": "purchases", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:ddf73728714092b1445bb6e8d6062491" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.purchases,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Develop" + }, + { + "id": "urn:li:container:ddf73728714092b1445bb6e8d6062491", + "urn": "urn:li:container:ddf73728714092b1445bb6e8d6062491" + }, + { + "id": "views" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "SELECT\n user_fk as user_id,\n COUNT(DISTINCT pk) as purchase_count,\n SUM(total_amount) as total_spent\n FROM ${purchases.SQL_TABLE_NAME}\n GROUP BY user_id", + "viewLanguage": "sql" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "user_metrics", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + 
"actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + "fieldPath": "user_id", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": true + }, + { + "fieldPath": "purchase_count", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "total_spent", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "customer_segment", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "high_value_customer_count", + "nullable": false, + "description": "Count of customers who spent over $1000", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "count_distinct", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + } + ], + "primaryKeys": [ + "user_id" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD)", + "type": "VIEW" + }, + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),user_fk)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD),user_id)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),pk)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD),purchase_count)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": 
"FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),total_amount)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD),total_spent)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),total_amount)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD),customer_segment)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),total_amount)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),PK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD),high_value_customer_count)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "looker.file.path": "views/user_metrics.view.lkml", + "looker.model": "dev_project" + }, + "name": "user_metrics", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:ddf73728714092b1445bb6e8d6062491" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.user_metrics,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Develop" + }, + { + "id": "urn:li:container:ddf73728714092b1445bb6e8d6062491", + "urn": "urn:li:container:ddf73728714092b1445bb6e8d6062491" + }, + { + "id": "views" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + 
"lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "viewProperties", + "aspect": { + "json": { + "materialized": false, + "viewLogic": "# The name of this view in Looker is \"Users\"\nview: users {\n # The sql_table_name parameter indicates the underlying database table\n # to be used for all fields in this view.\n sql_table_name: \"ECOMMERCE\".\"USERS\" ;;\n\n # No primary key is defined for this view. In order to join this view in an Explore,\n # define primary_key: yes on a dimension that has no repeated values.\n\n # Dates and timestamps can be represented in Looker using a dimension group of type: time.\n # Looker converts dates and timestamps to the specified timeframes within the dimension group.\n\n dimension_group: created {\n type: time\n timeframes: [raw, time, date, week, month, quarter, year]\n sql: ${TABLE}.\"CREATED_AT\" ;;\n }\n # Here's what a typical dimension looks like in LookML.\n # A dimension is a groupable field that can be used to filter query results.\n # This dimension will be called \"Email\" in Explore.\n\n dimension: email {\n type: string\n sql: ${TABLE}.\"EMAIL\" ;;\n }\n\n dimension: pk {\n primary_key: yes\n type: number\n sql: ${TABLE}.\"PK\" ;;\n }\n\n dimension_group: updated {\n type: time\n timeframes: [raw, time, date, week, month, quarter, year]\n sql: ${TABLE}.\"UPDATED_AT\" ;;\n }\n\n measure: lifetime_purchase_count{\n type: count_distinct\n sql: ${purchases.pk} ;;\n description: \"Total lifetime purchases count by user\"\n }\n\n measure: lifetime_total_purchase_amount{\n type: sum\n sql: ${purchases.total_amount};;\n value_format_name: usd\n description: \"Total lifetime revenue from purchases by user\"\n }\n\n dimension: user_purchase_status{\n type: string\n sql:\n CASE\n WHEN ${user_metrics.purchase_count} <= 1 THEN 'First Purchase'\n WHEN ${user_metrics.purchase_count} <= 3 THEN 'Early Customer'\n WHEN ${user_metrics.purchase_count} <= 10 THEN 'Regular Customer'\n ELSE 'Loyal Customer'\n END ;;\n }\n\n dimension_group: user_age {\n type: duration\n sql_start: ${TABLE}.\"CREATED_AT\" ;;\n sql_end: CURRENT_TIMESTAMP ;;\n intervals: [day, week, month, quarter, year]\n }\n\n measure: count {\n type: count\n }\n}\n", + "viewLanguage": "lookml" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "dataPlatformInstance", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:looker" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "schemaMetadata", + "aspect": { + "json": { + "schemaName": "users", + "platform": "urn:li:dataPlatform:looker", + "version": 0, + "created": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:unknown" + }, + "hash": "", + "platformSchema": { + "com.linkedin.schema.OtherSchema": { + "rawSchema": "" + } + }, + "fields": [ + { + 
"fieldPath": "email", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "pk", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "number", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": true + }, + { + "fieldPath": "user_purchase_status", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.StringType": {} + } + }, + "nativeDataType": "string", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "created", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + }, + { + "tag": "urn:li:tag:Temporal" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "updated", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.TimeType": {} + } + }, + "nativeDataType": "time", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + }, + { + "tag": "urn:li:tag:Temporal" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "user_age", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "duration", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Dimension" + }, + { + "tag": "urn:li:tag:Temporal" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "lifetime_purchase_count", + "nullable": false, + "description": "Total lifetime purchases count by user", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "count_distinct", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "lifetime_total_purchase_amount", + "nullable": false, + "description": "Total lifetime revenue from purchases by user", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "sum", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + }, + { + "fieldPath": "count", + "nullable": false, + "description": "", + "label": "", + "type": { + "type": { + "com.linkedin.schema.NumberType": {} + } + }, + "nativeDataType": "count", + "recursive": false, + "globalTags": { + "tags": [ + { + "tag": "urn:li:tag:Measure" + } + ] + }, + "isPartOfKey": false + } + ], + "primaryKeys": [ + "pk" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "upstreamLineage", + "aspect": { + "json": { + "upstreams": [ + { + "auditStamp": { + "time": 1586847600000, + 
"actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD)", + "type": "VIEW" + }, + { + "auditStamp": { + "time": 1586847600000, + "actor": "urn:li:corpuser:datahub" + }, + "dataset": "urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD)", + "type": "VIEW" + } + ], + "fineGrainedLineages": [ + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),EMAIL)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),email)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),PK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),pk)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),pk)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),user_purchase_status)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),CREATED_AT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),created)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),UPDATED_AT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),updated)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),CREATED_AT)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),user_age)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),PK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),lifetime_purchase_count)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + "upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.purchases,PROD),TOTAL_AMOUNT)", + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),PK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),lifetime_total_purchase_amount)" + ], + "confidenceScore": 1.0 + }, + { + "upstreamType": "FIELD_SET", + 
"upstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:postgres,my_database.ecommerce.users,PROD),PK)" + ], + "downstreamType": "FIELD", + "downstreams": [ + "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD),count)" + ], + "confidenceScore": 1.0 + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetProperties", + "aspect": { + "json": { + "customProperties": { + "looker.file.path": "views/users.view.lkml", + "looker.model": "dev_project" + }, + "name": "users", + "tags": [] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "container", + "aspect": { + "json": { + "container": "urn:li:container:ddf73728714092b1445bb6e8d6062491" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "subTypes", + "aspect": { + "json": { + "typeNames": [ + "View" + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_col_lineage_sample.view.users,PROD)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "Develop" + }, + { + "id": "urn:li:container:ddf73728714092b1445bb6e8d6062491", + "urn": "urn:li:container:ddf73728714092b1445bb6e8d6062491" + }, + { + "id": "views" + } + ] + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "container", + "entityUrn": "urn:li:container:ddf73728714092b1445bb6e8d6062491", + "changeType": "UPSERT", + "aspectName": "status", + "aspect": { + "json": { + "removed": false + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Dimension", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Dimension" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Measure", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Measure" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +}, +{ + "entityType": "tag", + "entityUrn": "urn:li:tag:Temporal", + "changeType": "UPSERT", + "aspectName": "tagKey", + "aspect": { + "json": { + "name": "Temporal" + } + }, + "systemMetadata": { + "lastObserved": 1586847600000, + "runId": "lookml-test", + "lastRunId": "no-run-id-provided" + } +} +] \ No newline at end of file diff 
--git a/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/models/dev_project.model.lkml b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/models/dev_project.model.lkml new file mode 100644 index 0000000000..097dc52836 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/models/dev_project.model.lkml @@ -0,0 +1,83 @@ +# Define the database connection to be used for this model. +connection: "long-tail-companions-snowflake" + +# include all the views +include: "/views/**/*.view.lkml" + +# Datagroups define a caching policy for an Explore. To learn more, +# use the Quick Help panel on the right to see documentation. + +datagroup: dev_project_default_datagroup { + # sql_trigger: SELECT MAX(id) FROM etl_log;; + max_cache_age: "1 hour" +} + +persist_with: dev_project_default_datagroup + +explore: purchases { + join: users { + type: left_outer + sql_on: ${purchases.user_fk} = ${users.pk} ;; + relationship: many_to_one + } + + join: user_metrics{ + type: left_outer + sql_on:${user_metrics.user_id} = ${users.pk} ;; + relationship: many_to_one + } +} + +# explore: users{ +# join: purchases { +# type: left_outer +# sql_on: ${users.pk} = ${purchases.user_fk} ;; +# relationship: one_to_many +# } + +# join: user_metrics{ +# type: left_outer +# sql_on:${user_metrics.user_id} = ${users.pk} ;; +# relationship: many_to_one +# } +# } + +explore: user_metrics { + description: "Analyze customer segments, lifetime value, and purchasing patterns" + + join: users { + type: inner + sql_on: ${user_metrics.user_id} = ${users.pk} ;; + relationship: many_to_one + } + + join: purchases{ + type: left_outer + sql_on: ${user_metrics.user_id} = ${purchases.user_fk} ;; + relationship: one_to_many + } +} + +explore: customer_analysis{ + from: users + description: "Customer analysis and demographics" + + join: purchases { + type: left_outer + sql_on: ${customer_analysis.pk} = ${purchases.user_fk} ;; + relationship: one_to_many + } + + join: user_metrics{ + type: left_outer + sql_on:${user_metrics.user_id} = ${customer_analysis.pk} ;; + relationship: one_to_one + + } + + join: users { + type: inner + sql_on: ${customer_analysis.pk} = ${users.pk} ;; + relationship: one_to_one + } +} diff --git a/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/purchases.view.lkml b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/purchases.view.lkml new file mode 100644 index 0000000000..3c4693e319 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/purchases.view.lkml @@ -0,0 +1,93 @@ +# The name of this view in Looker is "Purchases" +view: purchases { + # The sql_table_name parameter indicates the underlying database table + # to be used for all fields in this view. + sql_table_name: "ECOMMERCE"."PURCHASES" ;; + + # No primary key is defined for this view. In order to join this view in an Explore, + # define primary_key: yes on a dimension that has no repeated values. + + # Dates and timestamps can be represented in Looker using a dimension group of type: time. + # Looker converts dates and timestamps to the specified timeframes within the dimension group. + + dimension_group: created { + type: time + timeframes: [raw, time, date, week, month, quarter, year] + sql: ${TABLE}."CREATED_AT" ;; + } + # Here's what a typical dimension looks like in LookML. + # A dimension is a groupable field that can be used to filter query results. + # This dimension will be called "Pk" in Explore. 
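+  # Besides plain column mappings, the dimensions and measures below exercise
+  # both same-view references (${total_amount}) and cross-view references
+  # (${users.email}), which is what the column-lineage tests rely on.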
+ + dimension: pk { + primary_key: yes + type: number + sql: ${TABLE}."PK" ;; + } + + dimension: purchase_amount { + type: number + sql: ${TABLE}."PURCHASE_AMOUNT" ;; + } + + dimension: status { + type: string + sql: ${TABLE}."STATUS" ;; + } + + dimension: tax_amount { + type: number + sql: ${TABLE}."TAX_AMOUNT" ;; + } + + dimension: total_amount { + type: number + sql: ${TABLE}."TOTAL_AMOUNT" ;; + } + + dimension_group: updated { + type: time + timeframes: [raw, time, date, week, month, quarter, year] + sql: ${TABLE}."UPDATED_AT" ;; + } + + dimension: user_fk { + type: number + sql: ${TABLE}."USER_FK" ;; + } + + # Inter View Dimension References + dimension: is_expensive_purchase { + type: yesno + sql: ${total_amount} > 100 ;; + } + + # Inter View Nested Dimension References + measure: num_of_expensive_purchases { + type: count + drill_fields: [is_expensive_purchase] + } + + # Intra View Dimension Reference + dimension: user_email{ + type: string + sql:${users.email} ;; + } + + measure: average_purchase_value{ + type: average + sql: ${total_amount} ;; + value_format_name: usd + } + + measure: count { + type: count + } + + dimension_group: purchase_age { + type: duration + sql_start: ${TABLE}."CREATED_AT" ;; + sql_end: CURRENT_TIMESTAMP ;; + intervals: [day, week, month, quarter, year] + } +} diff --git a/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/user_metrics.view.lkml b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/user_metrics.view.lkml new file mode 100644 index 0000000000..a1ee56c9a4 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/user_metrics.view.lkml @@ -0,0 +1,43 @@ +view: user_metrics { + derived_table: { + sql: SELECT + user_fk as user_id, + COUNT(DISTINCT pk) as purchase_count, + SUM(total_amount) as total_spent + FROM ${purchases.SQL_TABLE_NAME} + GROUP BY user_id ;; + } + + dimension: user_id { + type: number + sql: ${TABLE}.user_id ;; + primary_key: yes + } + + dimension: purchase_count { + type: number + sql: ${TABLE}.purchase_count ;; + } + + dimension: total_spent { + type: number + sql: ${TABLE}.total_spent ;; + } + +# Cross-view dimension with conditional logic + dimension: customer_segment { + type: string + sql: CASE + WHEN ${total_spent} > 1000 THEN 'High Value' + WHEN ${total_spent} > 500 THEN 'Medium Value' + ELSE 'Low Value' + END ;; + } + +# Cross-view measure with filtering + measure: high_value_customer_count { + type: count_distinct + sql: CASE WHEN ${total_spent} > 1000 THEN ${users.pk} END ;; + description: "Count of customers who spent over $1000" + } +} diff --git a/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/users.view.lkml b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/users.view.lkml new file mode 100644 index 0000000000..69a616f319 --- /dev/null +++ b/metadata-ingestion/tests/integration/lookml/lkml_col_lineage_sample/views/users.view.lkml @@ -0,0 +1,73 @@ +# The name of this view in Looker is "Users" +view: users { + # The sql_table_name parameter indicates the underlying database table + # to be used for all fields in this view. + sql_table_name: "ECOMMERCE"."USERS" ;; + + # No primary key is defined for this view. In order to join this view in an Explore, + # define primary_key: yes on a dimension that has no repeated values. + + # Dates and timestamps can be represented in Looker using a dimension group of type: time. 
+ # Looker converts dates and timestamps to the specified timeframes within the dimension group. + + dimension_group: created { + type: time + timeframes: [raw, time, date, week, month, quarter, year] + sql: ${TABLE}."CREATED_AT" ;; + } + # Here's what a typical dimension looks like in LookML. + # A dimension is a groupable field that can be used to filter query results. + # This dimension will be called "Email" in Explore. + + dimension: email { + type: string + sql: ${TABLE}."EMAIL" ;; + } + + dimension: pk { + primary_key: yes + type: number + sql: ${TABLE}."PK" ;; + } + + dimension_group: updated { + type: time + timeframes: [raw, time, date, week, month, quarter, year] + sql: ${TABLE}."UPDATED_AT" ;; + } + + measure: lifetime_purchase_count{ + type: count_distinct + sql: ${purchases.pk} ;; + description: "Total lifetime purchases count by user" + } + + measure: lifetime_total_purchase_amount{ + type: sum + sql: ${purchases.total_amount};; + value_format_name: usd + description: "Total lifetime revenue from purchases by user" + } + + dimension: user_purchase_status{ + type: string + sql: + CASE + WHEN ${user_metrics.purchase_count} <= 1 THEN 'First Purchase' + WHEN ${user_metrics.purchase_count} <= 3 THEN 'Early Customer' + WHEN ${user_metrics.purchase_count} <= 10 THEN 'Regular Customer' + ELSE 'Loyal Customer' + END ;; + } + + dimension_group: user_age { + type: duration + sql_start: ${TABLE}."CREATED_AT" ;; + sql_end: CURRENT_TIMESTAMP ;; + intervals: [day, week, month, quarter, year] + } + + measure: count { + type: count + } +} diff --git a/metadata-ingestion/tests/integration/lookml/test_lookml.py b/metadata-ingestion/tests/integration/lookml/test_lookml.py index 2e11f3669a..02d0b86ac1 100644 --- a/metadata-ingestion/tests/integration/lookml/test_lookml.py +++ b/metadata-ingestion/tests/integration/lookml/test_lookml.py @@ -1285,3 +1285,181 @@ def test_unreachable_views(pytestconfig): "The Looker view file was skipped because it may not be referenced by any models." in [failure.message for failure in source.get_report().warnings] ) + + +@freeze_time(FROZEN_TIME) +def test_col_lineage_looker_api_based(pytestconfig, tmp_path): + test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" + golden_path = test_resources_dir / "lkml_col_lineage_looker_api_based_golden.json" + mce_out_file = "lkml_col_lineage_looker_api_based.json" + recipe = { + "run_id": "lookml-test", + "source": { + "type": "lookml", + "config": { + "base_folder": f"{test_resources_dir}/lkml_col_lineage_sample", + "connection_to_platform_map": {"my_connection": "postgres"}, + "parse_table_names_from_sql": True, + "tag_measures_and_dimensions": False, + "project_name": "lkml_col_lineage_sample", + "use_api_for_view_lineage": True, + "api": { + "client_id": "fake_client_id", + "client_secret": "fake_secret", + "base_url": "fake_account.looker.com", + }, + }, + }, + "sink": { + "type": "file", + "config": { + "filename": f"{tmp_path / mce_out_file}", + }, + }, + } + + # Mock SQL responses based on the dump file + mock_sql_responses = { + # For user_metrics view (fields starting with user_metrics.) 
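+        # The compiled SQL below inlines the user_metrics derived table as a CTE,
+        # which is what lets the SQL parser trace user_metrics columns back to
+        # "ECOMMERCE"."PURCHASES" in the golden lineage.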
+ "user_metrics": """WITH user_metrics AS (SELECT + user_fk as user_id, + COUNT(DISTINCT pk) as purchase_count, + SUM(total_amount) as total_spent + FROM "ECOMMERCE"."PURCHASES" + GROUP BY user_id ) +SELECT + user_metrics.user_id AS "user_metrics.user_id", + user_metrics.purchase_count AS "user_metrics.purchase_count", + user_metrics.total_spent AS "user_metrics.total_spent", + CASE + WHEN user_metrics.total_spent > 1000 THEN 'High Value' + WHEN user_metrics.total_spent > 500 THEN 'Medium Value' + ELSE 'Low Value' + END AS "user_metrics.customer_segment", + COUNT(DISTINCT CASE WHEN user_metrics.total_spent > 1000 THEN ( users."PK" ) END ) AS "user_metrics.high_value_customer_count" +FROM "ECOMMERCE"."USERS" AS customer_analysis +LEFT JOIN user_metrics ON user_metrics.user_id = (customer_analysis."PK") +INNER JOIN "ECOMMERCE"."USERS" AS users ON (customer_analysis."PK") = (users."PK") +GROUP BY + 1, + 2, + 3, + 4 +ORDER BY + 5 DESC +FETCH NEXT 1 ROWS ONLY""", + # For users view (fields starting with users.) + "users": """WITH user_metrics AS (SELECT + user_fk as user_id, + COUNT(DISTINCT pk) as purchase_count, + SUM(total_amount) as total_spent + FROM "ECOMMERCE"."PURCHASES" + GROUP BY user_id ) +SELECT + users."EMAIL" AS "users.email", + users."PK" AS "users.pk", + CASE + WHEN user_metrics.purchase_count <= 1 THEN 'First Purchase' + WHEN user_metrics.purchase_count <= 3 THEN 'Early Customer' + WHEN user_metrics.purchase_count <= 10 THEN 'Regular Customer' + ELSE 'Loyal Customer' + END AS "users.user_purchase_status", + users."CREATED_AT" AS "users.created_raw", + users."UPDATED_AT" AS "users.updated_raw", + (TIMESTAMPDIFF(DAY, CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ)), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) + CASE WHEN TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) = TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ))) THEN 0 WHEN TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) < TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ))) THEN CASE WHEN CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ)) < CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ)) THEN -1 ELSE 0 END ELSE CASE WHEN CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(users."CREATED_AT" AS TIMESTAMP_NTZ)) > CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ)) THEN 1 ELSE 0 END END) AS "users.days_user_age", + COUNT(DISTINCT ( purchases."PK" ) ) AS "users.lifetime_purchase_count", + COALESCE(CAST( ( SUM(DISTINCT (CAST(FLOOR(COALESCE( ( purchases."TOTAL_AMOUNT" ) ,0)*(1000000*1.0)) AS DECIMAL(38,0))) + (TO_NUMBER(MD5( users."PK" ), 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') % 1.0e27)::NUMERIC(38, 0) ) - SUM(DISTINCT (TO_NUMBER(MD5( users."PK" ), 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX') % 
1.0e27)::NUMERIC(38, 0)) ) AS DOUBLE PRECISION) / CAST((1000000*1.0) AS DOUBLE PRECISION), 0) AS "users.lifetime_total_purchase_amount", + COUNT(DISTINCT users."PK" ) AS "users.count" +FROM "ECOMMERCE"."USERS" AS customer_analysis +LEFT JOIN "ECOMMERCE"."PURCHASES" AS purchases ON (customer_analysis."PK") = (purchases."USER_FK") +LEFT JOIN user_metrics ON user_metrics.user_id = (customer_analysis."PK") +INNER JOIN "ECOMMERCE"."USERS" AS users ON (customer_analysis."PK") = (users."PK") +GROUP BY + 1, + 2, + 3, + 4, + 5, + 6 +ORDER BY + 7 DESC +FETCH NEXT 1 ROWS ONLY""", + # For purchases view (fields starting with purchases.) + "purchases": """SELECT + purchases."PK" AS "purchases.pk", + purchases."PURCHASE_AMOUNT" AS "purchases.purchase_amount", + purchases."STATUS" AS "purchases.status", + purchases."TAX_AMOUNT" AS "purchases.tax_amount", + purchases."TOTAL_AMOUNT" AS "purchases.total_amount", + purchases."USER_FK" AS "purchases.user_fk", + (CASE WHEN (purchases."TOTAL_AMOUNT") > 100 THEN 'Yes' ELSE 'No' END) AS "purchases.is_expensive_purchase", + (users."EMAIL") AS "purchases.user_email", + purchases."CREATED_AT" AS "purchases.created_raw", + purchases."UPDATED_AT" AS "purchases.updated_raw", + (TIMESTAMPDIFF(DAY, CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ)), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) + CASE WHEN TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) = TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ))) THEN 0 WHEN TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ))) < TIMESTAMPDIFF(SECOND, TO_DATE(CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ))), CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ))) THEN CASE WHEN CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ)) < CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ)) THEN -1 ELSE 0 END ELSE CASE WHEN CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(purchases."CREATED_AT" AS TIMESTAMP_NTZ)) > CONVERT_TIMEZONE('UTC', 'America/Los_Angeles', CAST(CURRENT_TIMESTAMP AS TIMESTAMP_NTZ)) THEN 1 ELSE 0 END END) AS "purchases.days_purchase_age", + COUNT(purchases."PK" ) AS "purchases.num_of_expensive_purchases", + AVG(( purchases."TOTAL_AMOUNT" ) ) AS "purchases.average_purchase_value", + COUNT(purchases."PK" ) AS "purchases.count" +FROM "ECOMMERCE"."USERS" AS customer_analysis +LEFT JOIN "ECOMMERCE"."PURCHASES" AS purchases ON (customer_analysis."PK") = (purchases."USER_FK") +INNER JOIN "ECOMMERCE"."USERS" AS users ON (customer_analysis."PK") = (users."PK") +GROUP BY + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + 10, + 11 +ORDER BY + 12 DESC +FETCH NEXT 1 ROWS ONLY""", + } + + def mock_run_inline_query( + body, result_format=None, transport_options=None, cache=None + ): + # Determine which view is being queried based on the fields + write_query = body + if write_query.fields and any( + 
field.startswith("user_metrics.") for field in write_query.fields + ): + return mock_sql_responses["user_metrics"] + elif write_query.fields and any( + field.startswith("users.") for field in write_query.fields + ): + return mock_sql_responses["users"] + elif write_query.fields and any( + field.startswith("purchases.") for field in write_query.fields + ): + return mock_sql_responses["purchases"] + else: + # Default fallback + return mock_sql_responses["user_metrics"] + + mock_connection = DBConnection( + dialect_name="postgres", + database="my_database", + ) + mock_model = mock.MagicMock(project_name="lkml_col_lineage_sample") + + mocked_client = mock.MagicMock() + mocked_client.run_inline_query.side_effect = mock_run_inline_query + mocked_client.connection.return_value = mock_connection + mocked_client.lookml_model.return_value = mock_model + + with mock.patch("looker_sdk.init40", return_value=mocked_client): + pipeline = Pipeline.create(recipe) + pipeline.run() + pipeline.pretty_print_summary() + pipeline.raise_from_status(raise_warnings=True) + + mce_helpers.check_golden_file( + pytestconfig, + output_path=tmp_path / mce_out_file, + golden_path=golden_path, + ) diff --git a/metadata-ingestion/tests/unit/lookml/__init__.py b/metadata-ingestion/tests/unit/lookml/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py b/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py new file mode 100644 index 0000000000..a00aea29ab --- /dev/null +++ b/metadata-ingestion/tests/unit/lookml/test_lookml_api_based_view_upstream.py @@ -0,0 +1,518 @@ +from unittest.mock import MagicMock, patch + +import pytest +from looker_sdk.sdk.api40.models import WriteQuery + +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.source.looker.looker_common import ( + LookerViewId, + ViewField, + ViewFieldType, +) +from datahub.ingestion.source.looker.looker_constant import ( + NAME, + VIEW_FIELD_INTERVALS_ATTRIBUTE, + VIEW_FIELD_TIMEFRAMES_ATTRIBUTE, + VIEW_FIELD_TYPE_ATTRIBUTE, +) +from datahub.ingestion.source.looker.looker_lib_wrapper import LookerAPI +from datahub.ingestion.source.looker.looker_view_id_cache import LookerViewIdCache +from datahub.ingestion.source.looker.lookml_concept_context import ( + LookerFieldContext, + LookerViewContext, +) +from datahub.ingestion.source.looker.lookml_config import ( + LookMLSourceConfig, + LookMLSourceReport, +) +from datahub.ingestion.source.looker.view_upstream import ( + LookerQueryAPIBasedViewUpstream, +) +from datahub.sql_parsing.sqlglot_lineage import SqlParsingResult + + +def create_mock_sql_parsing_result( + table_error=None, column_error=None, in_tables=None, column_lineage=None +): + """Helper function to create a properly mocked SqlParsingResult.""" + mock_spr = MagicMock(spec=SqlParsingResult) + mock_debug_info = MagicMock() + mock_debug_info.table_error = table_error + mock_debug_info.column_error = column_error + mock_spr.debug_info = mock_debug_info + mock_spr.in_tables = in_tables or [] + mock_spr.column_lineage = column_lineage or [] + return mock_spr + + +class TestLookMLAPIBasedViewUpstream: + """Test suite for LookerQueryAPIBasedViewUpstream functionality.""" + + @pytest.fixture + def mock_view_context(self): + """Create a mock LookerViewContext for testing.""" + view_context = MagicMock(spec=LookerViewContext) + view_context.name.return_value = "test_view" + view_context.base_folder_path = "/test/path" + 
view_context.dimensions.return_value = [ + {NAME: "user_id", "type": "string"}, + {NAME: "email", "type": "string"}, + ] + view_context.measures.return_value = [ + {NAME: "total_users", "type": "number"}, + ] + view_context.dimension_groups.return_value = [] + + # Mock view_connection + mock_connection = MagicMock() + mock_connection.default_schema = "public" + mock_connection.default_db = "test_db" + mock_connection.platform = "postgres" + mock_connection.platform_instance = None + mock_connection.platform_env = None + view_context.view_connection = mock_connection + + return view_context + + @pytest.fixture + def mock_looker_view_id_cache(self): + """Create a mock LookerViewIdCache for testing.""" + cache = MagicMock(spec=LookerViewIdCache) + cache.model_name = "test_model" + return cache + + @pytest.fixture + def mock_config(self): + """Create a mock LookMLSourceConfig for testing.""" + config = MagicMock(spec=LookMLSourceConfig) + config.use_api_for_view_lineage = True + config.use_api_cache_for_view_lineage = False + config.env = "PROD" + return config + + @pytest.fixture + def mock_reporter(self): + """Create a mock LookMLSourceReport for testing.""" + return MagicMock(spec=LookMLSourceReport) + + @pytest.fixture + def mock_ctx(self): + """Create a mock PipelineContext for testing.""" + ctx = MagicMock(spec=PipelineContext) + ctx.graph = MagicMock() + return ctx + + @pytest.fixture + def mock_looker_client(self): + """Create a mock LookerAPI client for testing.""" + client = MagicMock(spec=LookerAPI) + return client + + @pytest.fixture + def view_to_explore_map(self): + """Create a view to explore mapping for testing.""" + return {"test_view": "test_explore"} + + @pytest.fixture + def upstream_instance( + self, + mock_view_context, + mock_looker_view_id_cache, + mock_config, + mock_reporter, + mock_ctx, + mock_looker_client, + view_to_explore_map, + ): + """Create a LookerQueryAPIBasedViewUpstream instance for testing.""" + # Mock the API response to prevent initialization errors + mock_looker_client.generate_sql_query.return_value = [ + {"sql": "SELECT test_view.user_id FROM test_table"} + ] + + # Mock the view ID cache + mock_view_id = MagicMock(spec=LookerViewId) + mock_view_id.get_urn.return_value = "urn:li:dataset:test" + mock_looker_view_id_cache.get_looker_view_id.return_value = mock_view_id + + with patch( + "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result" + ) as mock_create_lineage: + # Mock successful SQL parsing + mock_spr = create_mock_sql_parsing_result() + mock_create_lineage.return_value = mock_spr + + return LookerQueryAPIBasedViewUpstream( + view_context=mock_view_context, + looker_view_id_cache=mock_looker_view_id_cache, + config=mock_config, + reporter=mock_reporter, + ctx=mock_ctx, + looker_client=mock_looker_client, + view_to_explore_map=view_to_explore_map, + ) + + def test_time_dimension_group_handling(self, upstream_instance): + """Test that time dimension groups are handled correctly.""" + dim_group = { + NAME: "created", + VIEW_FIELD_TYPE_ATTRIBUTE: "time", + VIEW_FIELD_TIMEFRAMES_ATTRIBUTE: ["date", "week", "month"], + } + + result = upstream_instance._get_time_dim_group_field_name(dim_group) + assert result == "created_date" + + def test_time_dimension_group_without_timeframes(self, upstream_instance): + """Test time dimension group handling when timeframes are not specified.""" + dim_group = { + NAME: "created", + VIEW_FIELD_TYPE_ATTRIBUTE: "time", + } + + result = upstream_instance._get_time_dim_group_field_name(dim_group) 
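+        # With no timeframes declared, the field name falls back to the implicit
+        # "raw" timeframe, i.e. "<group_name>_raw".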
+ assert result == "created_raw" + + def test_duration_dimension_group_handling(self, upstream_instance): + """Test that duration dimension groups are handled correctly.""" + dim_group = { + NAME: "since_event", + VIEW_FIELD_TYPE_ATTRIBUTE: "duration", + VIEW_FIELD_INTERVALS_ATTRIBUTE: ["hour", "day", "week"], + } + + result = upstream_instance._get_duration_dim_group_field_name(dim_group) + assert result == "hours_since_event" + + def test_duration_dimension_group_without_intervals(self, upstream_instance): + """Test duration dimension group handling when intervals are not specified.""" + dim_group = { + NAME: "since_event", + VIEW_FIELD_TYPE_ATTRIBUTE: "duration", + } + + result = upstream_instance._get_duration_dim_group_field_name(dim_group) + assert result == "days_since_event" + + def test_get_looker_api_field_name(self, upstream_instance): + """Test field name translation to Looker API format.""" + result = upstream_instance._get_looker_api_field_name("user_id") + assert result == "test_view.user_id" + + def test_get_field_name_from_looker_api_field_name(self, upstream_instance): + """Test field name translation from Looker API format.""" + result = upstream_instance._get_field_name_from_looker_api_field_name( + "test_view.user_id" + ) + assert result == "user_id" + + def test_get_field_name_from_looker_api_field_name_mismatch( + self, upstream_instance + ): + """Test field name translation when view name doesn't match.""" + result = upstream_instance._get_field_name_from_looker_api_field_name( + "other_view.user_id" + ) + assert result == "other_view.user_id" + + def test_get_sql_write_query_success(self, upstream_instance): + """Test successful WriteQuery construction.""" + query = upstream_instance._get_sql_write_query() + + assert isinstance(query, WriteQuery) + assert query.model == "test_model" + assert query.view == "test_explore" + assert query.limit == "1" + assert query.fields is not None + assert "test_view.user_id" in query.fields + assert "test_view.email" in query.fields + assert "test_view.total_users" in query.fields + + def test_get_sql_write_query_no_fields(self, upstream_instance, mock_view_context): + """Test WriteQuery construction when no fields are found.""" + mock_view_context.dimensions.return_value = [] + mock_view_context.measures.return_value = [] + mock_view_context.dimension_groups.return_value = [] + + with pytest.raises(ValueError, match="No fields found for view"): + upstream_instance._get_sql_write_query() + + @patch( + "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result" + ) + def test_execute_query_success( + self, mock_create_lineage, upstream_instance, mock_looker_client + ): + """Test successful query execution.""" + # Mock the SQL response + mock_sql_response = "SELECT test_view.user_id FROM test_table" + mock_looker_client.generate_sql_query.return_value = mock_sql_response + + # Mock the SQL parsing result + mock_spr = create_mock_sql_parsing_result( + in_tables=["urn:li:dataset:(urn:li:dataPlatform:postgres,test_table,PROD)"] + ) + mock_create_lineage.return_value = mock_spr + + result = upstream_instance._execute_query(MagicMock(spec=WriteQuery)) + assert result == "SELECT test_view.user_id FROM test_table" + + def test_execute_query_no_sql_response(self, upstream_instance, mock_looker_client): + """Test query execution when no SQL is returned.""" + mock_looker_client.generate_sql_query.return_value = [] + + with pytest.raises(ValueError, match="No SQL found in response"): + 
upstream_instance._execute_query(MagicMock(spec=WriteQuery)) + + def test_execute_query_invalid_response_format( + self, upstream_instance, mock_looker_client + ): + """Test query execution with invalid response format.""" + mock_looker_client.generate_sql_query.return_value = None + + with pytest.raises(ValueError, match="No SQL found in response"): + upstream_instance._execute_query(MagicMock(spec=WriteQuery)) + + @patch( + "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result" + ) + def test_get_spr_table_error( + self, mock_create_lineage, upstream_instance, mock_looker_client + ): + """Test SQL parsing result when table extraction fails.""" + # Clear the cache to force re-execution + upstream_instance._get_spr.cache_clear() + + # Mock the SQL response + mock_sql_response = [{"sql": "SELECT * FROM test_table"}] + mock_looker_client.generate_sql_query.return_value = mock_sql_response + + # Mock the SQL parsing result with table error + mock_spr = create_mock_sql_parsing_result( + table_error=Exception("Table parsing failed") + ) + mock_create_lineage.return_value = mock_spr + + with pytest.raises( + ValueError, match="Error in parsing SQL for upstream tables" + ): + upstream_instance._get_spr() + + @patch( + "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result" + ) + def test_get_spr_column_error( + self, mock_create_lineage, upstream_instance, mock_looker_client + ): + """Test SQL parsing result when column extraction fails.""" + # Clear the cache to force re-execution + upstream_instance._get_spr.cache_clear() + + # Mock the SQL response + mock_sql_response = [{"sql": "SELECT * FROM test_table"}] + mock_looker_client.generate_sql_query.return_value = mock_sql_response + + # Mock the SQL parsing result with column error + mock_spr = create_mock_sql_parsing_result( + column_error=Exception("Column parsing failed") + ) + mock_create_lineage.return_value = mock_spr + + with pytest.raises(ValueError, match="Error in parsing SQL for column lineage"): + upstream_instance._get_spr() + + @patch( + "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result" + ) + def test_get_upstream_dataset_urn( + self, mock_create_lineage, upstream_instance, mock_looker_client + ): + """Test upstream dataset URN extraction.""" + # Clear all caches to force re-execution + upstream_instance._get_spr.cache_clear() + upstream_instance._get_upstream_dataset_urn.cache_clear() + + # Mock the SQL response + mock_sql_response = [{"sql": "SELECT * FROM test_table"}] + mock_looker_client.generate_sql_query.return_value = mock_sql_response + + # Mock the SQL parsing result + mock_spr = create_mock_sql_parsing_result( + in_tables=["urn:li:dataset:(urn:li:dataPlatform:postgres,test_table,PROD)"] + ) + mock_create_lineage.return_value = mock_spr + + result = upstream_instance.get_upstream_dataset_urn() + assert len(result) == 1 + assert "test_table" in result[0] + + @patch( + "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result" + ) + def test_get_upstream_column_ref( + self, mock_create_lineage, upstream_instance, mock_looker_client + ): + """Test upstream column reference extraction.""" + # Clear the cache to force re-execution + upstream_instance._get_spr.cache_clear() + + # Mock the SQL response + mock_sql_response = [{"sql": "SELECT user_id FROM test_table"}] + mock_looker_client.generate_sql_query.return_value = mock_sql_response + + # Mock the SQL parsing result with column lineage + mock_column_lineage = [ + 
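+            # Shaped like ColumnLineageInfo from datahub.sql_parsing.sqlglot_lineage:
+            # the downstream column carries the Looker API "view.field" name, while
+            # upstreams point at warehouse table/column pairs.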
MagicMock( + downstream=MagicMock(column="test_view.user_id"), + upstreams=[MagicMock(table="test_table", column="user_id")], + ) + ] + mock_spr = create_mock_sql_parsing_result(column_lineage=mock_column_lineage) + mock_create_lineage.return_value = mock_spr + + # Mock field context + field_context = MagicMock(spec=LookerFieldContext) + field_context.name.return_value = "user_id" + field_context.raw_field = {NAME: "user_id"} + + result = upstream_instance.get_upstream_column_ref(field_context) + assert len(result) == 1 + assert result[0].table == "test_table" + assert result[0].column == "user_id" + + @patch( + "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result" + ) + def test_get_upstream_column_ref_dimension_group( + self, mock_create_lineage, upstream_instance, mock_looker_client + ): + """Test upstream column reference extraction for dimension groups.""" + # Clear the cache to force re-execution + upstream_instance._get_spr.cache_clear() + + # Mock the SQL response + mock_sql_response = [{"sql": "SELECT created_date FROM test_table"}] + mock_looker_client.generate_sql_query.return_value = mock_sql_response + + # Mock the SQL parsing result with column lineage + mock_column_lineage = [ + MagicMock( + downstream=MagicMock(column="test_view.created_date"), + upstreams=[MagicMock(table="test_table", column="created_at")], + ) + ] + mock_spr = create_mock_sql_parsing_result(column_lineage=mock_column_lineage) + mock_create_lineage.return_value = mock_spr + + # Mock field context for time dimension group + field_context = MagicMock(spec=LookerFieldContext) + field_context.name.return_value = "created" + field_context.raw_field = { + NAME: "created", + VIEW_FIELD_TYPE_ATTRIBUTE: "time", + VIEW_FIELD_TIMEFRAMES_ATTRIBUTE: ["date"], + } + + result = upstream_instance.get_upstream_column_ref(field_context) + assert len(result) == 1 + assert result[0].table == "test_table" + assert result[0].column == "created_at" + + @patch( + "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result" + ) + def test_create_fields( + self, mock_create_lineage, upstream_instance, mock_looker_client + ): + """Test ViewField creation from SQL parsing result.""" + # Clear the cache to force re-execution + upstream_instance._get_spr.cache_clear() + + # Mock the SQL response + mock_sql_response = [{"sql": "SELECT user_id FROM test_table"}] + mock_looker_client.generate_sql_query.return_value = mock_sql_response + + # Mock the SQL parsing result with column lineage + mock_column_lineage = [ + MagicMock( + downstream=MagicMock( + column="test_view.user_id", native_column_type="string" + ), + upstreams=[MagicMock(table="test_table", column="user_id")], + ) + ] + mock_spr = create_mock_sql_parsing_result(column_lineage=mock_column_lineage) + mock_create_lineage.return_value = mock_spr + + result = upstream_instance.create_fields() + assert len(result) == 1 + assert isinstance(result[0], ViewField) + assert result[0].name == "user_id" + assert result[0].type == "string" + assert result[0].field_type == ViewFieldType.UNKNOWN + + def test_create_fields_no_column_lineage(self, upstream_instance): + """Test ViewField creation when no column lineage is available.""" + # Mock the SQL parsing result without column lineage + mock_spr = MagicMock(spec=SqlParsingResult) + mock_spr.column_lineage = None + + with patch.object(upstream_instance, "_get_spr", return_value=mock_spr): + result = upstream_instance.create_fields() + assert result == [] + + def test_api_failure_fallback( + 
+        self,
+        mock_view_context,
+        mock_looker_view_id_cache,
+        mock_config,
+        mock_reporter,
+        mock_ctx,
+        mock_looker_client,
+        view_to_explore_map,
+    ):
+        """Test that API failures are handled gracefully."""
+        # Mock the Looker client to raise an exception
+        mock_looker_client.generate_sql_query.side_effect = Exception("API call failed")
+
+        # The constructor surfaces the API failure directly; graceful fallback to
+        # file-based lineage is the responsibility of the factory function.
+        with pytest.raises(Exception, match="API call failed"):
+            LookerQueryAPIBasedViewUpstream(
+                view_context=mock_view_context,
+                looker_view_id_cache=mock_looker_view_id_cache,
+                config=mock_config,
+                reporter=mock_reporter,
+                ctx=mock_ctx,
+                looker_client=mock_looker_client,
+                view_to_explore_map=view_to_explore_map,
+            )
+
+    def test_latency_tracking(
+        self, upstream_instance, mock_looker_client, mock_reporter
+    ):
+        """Test that API latency is tracked and reported."""
+        # Clear the cache to force re-execution
+        upstream_instance._get_spr.cache_clear()
+
+        # Mock the SQL response
+        mock_sql_response = [{"sql": "SELECT * FROM test_table"}]
+        mock_looker_client.generate_sql_query.return_value = mock_sql_response
+
+        # Mock the view ID cache to return a valid view ID
+        mock_view_id = MagicMock(spec=LookerViewId)
+        mock_view_id.get_urn.return_value = "urn:li:dataset:test"
+        upstream_instance.looker_view_id_cache.get_looker_view_id.return_value = (
+            mock_view_id
+        )
+
+        with patch(
+            "datahub.ingestion.source.looker.view_upstream.create_lineage_sql_parsed_result"
+        ) as mock_create_lineage:
+            mock_spr = create_mock_sql_parsing_result()
+            mock_create_lineage.return_value = mock_spr
+
+            upstream_instance._execute_query(MagicMock(spec=WriteQuery))
+
+        # Verify that latency was reported (may be called multiple times due to caching)
+        assert mock_reporter.report_looker_query_api_latency.call_count >= 1
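For orientation, a minimal sketch of the query the new API-based upstream issues against the Looker SDK, using model/view/field names taken from the fixtures above; result_format="sql" is an assumption about how the compiled SQL is requested, and the snippet is illustrative rather than the source's exact code path:

import looker_sdk
from looker_sdk.sdk.api40.models import WriteQuery

sdk = looker_sdk.init40()  # credentials from looker.ini or environment variables

# Compile (not run) a one-row query over a view's fields; the returned SQL is
# then handed to DataHub's SQL parser for table- and column-level lineage.
query = WriteQuery(
    model="dev_project",
    view="purchases",  # the explore through which the view is reachable
    fields=["purchases.pk", "purchases.total_amount", "purchases.user_email"],
    limit="1",  # only the compiled SQL is needed
)
compiled_sql = sdk.run_inline_query(result_format="sql", body=query)

And the shape of one fine-grained lineage edge from the golden file, reconstructed with DataHub's URN helpers as an illustrative pairing only:

from datahub.emitter.mce_builder import make_dataset_urn, make_schema_field_urn

upstream_field = make_schema_field_urn(
    make_dataset_urn("postgres", "my_database.ecommerce.purchases", "PROD"),
    "total_amount",
)
downstream_field = make_schema_field_urn(
    make_dataset_urn("looker", "lkml_col_lineage_sample.view.user_metrics", "PROD"),
    "total_spent",
)
# upstream_field -> downstream_field is one FIELD_SET -> FIELD entry in the
# emitted fineGrainedLineages, with confidenceScore 1.0.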