fix(lookml/ingestion): Skip unreferenced or improperly loaded Lookml view files (#12351)

This commit is contained in:
sagar-salvi-apptware 2025-01-27 15:37:31 +05:30 committed by GitHub
parent a4f8d170f9
commit 3e9e6e4fe0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 201 additions and 10 deletions

View File

@ -139,7 +139,10 @@ class LookMLSourceConfig(
)
emit_reachable_views_only: bool = Field(
True,
description="When enabled, only views that are reachable from explores defined in the model files are emitted",
description=(
"When enabled, only views that are reachable from explores defined in the model files are emitted. "
"If set to False, all views imported in model files are emitted. Views that are unreachable i.e. not explicitly defined in the model files are currently not emitted however reported as warning for debugging purposes."
),
)
populate_sql_logic_for_missing_descriptions: bool = Field(
False,

View File

@ -59,6 +59,7 @@ from datahub.ingestion.source.looker.lookml_concept_context import (
from datahub.ingestion.source.looker.lookml_config import (
BASE_PROJECT_NAME,
MODEL_FILE_EXTENSION,
VIEW_FILE_EXTENSION,
LookerConnectionDefinition,
LookMLSourceConfig,
LookMLSourceReport,
@ -884,6 +885,7 @@ class LookMLSource(StatefulIngestionSourceBase):
view_urn = maybe_looker_view.id.get_urn(
self.source_config
)
view_connection_mapping = view_connection_map.get(
view_urn
)
@ -939,6 +941,9 @@ class LookMLSource(StatefulIngestionSourceBase):
str(maybe_looker_view.id)
)
if not self.source_config.emit_reachable_views_only:
self.report_skipped_unreachable_views(viewfile_loader, processed_view_map)
if (
self.source_config.tag_measures_and_dimensions
and self.reporter.events_produced != 0
@ -966,5 +971,56 @@ class LookMLSource(StatefulIngestionSourceBase):
),
).as_workunit()
def report_skipped_unreachable_views(
    self,
    viewfile_loader: LookerViewFileLoader,
    processed_view_map: Optional[Dict[str, Set[str]]] = None,
) -> None:
    """Report a warning for every view file not referenced by any model.

    Scans every project folder for ``*.view.lkml`` files; each file whose
    path does not appear in ``processed_view_map`` is loaded, and a warning
    is emitted for files containing at least one view name allowed by the
    configured ``view_pattern``.

    Args:
        viewfile_loader: Loader used to parse candidate view files.
        processed_view_map: Mapping of project name to the set of view-file
            paths already processed through model includes. ``None`` is
            treated as empty.
    """
    # BUG FIX: the previous signature used a mutable default argument
    # (`= {}`), which is shared across all calls; default to None instead.
    if processed_view_map is None:
        processed_view_map = {}

    # Collect every candidate view file, per project.
    view_files: Dict[str, List[pathlib.Path]] = {
        project: list(
            pathlib.Path(folder_path).glob(f"**/*{VIEW_FILE_EXTENSION}")
        )
        for project, folder_path in self.base_projects_folder.items()
    }

    # Flatten the processed paths once, instead of rescanning every value
    # set for each candidate file (was O(files * projects) membership tests).
    processed_paths: Set[str] = set()
    for view_set in processed_view_map.values():
        processed_paths.update(view_set)

    skipped_view_paths: Dict[str, List[str]] = {}
    for project, views in view_files.items():
        skipped_paths: Set[str] = set()
        for view_path in views:
            if str(view_path) in processed_paths:
                # Already reached through a model include — not skipped.
                continue
            looker_viewfile = viewfile_loader.load_viewfile(
                path=str(view_path),
                project_name=project,
                connection=None,
                reporter=self.reporter,
            )
            if looker_viewfile is None:
                # Unparseable file; the loader reports that case itself.
                continue
            for raw_view in looker_viewfile.views:
                raw_view_name = raw_view.get("name", "")
                # Only warn about views the user's pattern actually selects.
                if raw_view_name and self.source_config.view_pattern.allowed(
                    raw_view_name
                ):
                    skipped_paths.add(str(view_path))
        skipped_view_paths[project] = list(skipped_paths)

    for project, view_paths in skipped_view_paths.items():
        for path in view_paths:
            self.reporter.report_warning(
                title="Skipped View File",
                message=(
                    "The Looker view file was skipped because it may not be referenced by any models."
                ),
                context=(f"Project: {project}, View File Path: {path}"),
            )
def get_report(self):
    """Return the source's ingestion report object."""
    report = self.reporter
    return report

View File

@ -0,0 +1,10 @@
# Test model: includes and explores exactly two view files, so any other
# view file in the folder (e.g. employee_unreachable.view.lkml) is
# unreferenced and should be reported as skipped by the ingestion source.
connection: "my_connection"
include: "employee_income_source.view.lkml"
include: "employee_total_income.view.lkml"
explore: employee_income_source {
}
explore: employee_total_income {
}

View File

@ -0,0 +1,40 @@
# Reachable test view (included and explored by the model file).
# Uses a Liquid-templated derived table to exercise liquid_variable
# substitution during ingestion.
view: employee_income_source {
  derived_table: {
    sql: SELECT
        employee_id,
        employee_name,
        # Liquid branches select the source table based on which date/week
        # fields are selected; the test recipe pins dw_eff_dt_date._is_selected.
        {% if dw_eff_dt_date._is_selected or finance_dw_eff_dt_date._is_selected %}
        prod_core.data.r_metric_summary_v2
        {% elsif dw_eff_dt_week._is_selected or finance_dw_eff_dt_week._is_selected %}
        prod_core.data.r_metric_summary_v3
        {% else %}
        'default_table' as source
        {% endif %},
        employee_income
      FROM source_table
      WHERE
        {% condition source_region %} source_table.region {% endcondition %}
        ;;
  }
  dimension: id {
    type: number
    sql: ${TABLE}.employee_id;;
  }
  dimension: name {
    type: string
    sql: ${TABLE}.employee_name;;
  }
  dimension: source {
    type: string
    sql: ${TABLE}.source ;;
  }
  dimension: income {
    type: number
    sql: ${TABLE}.employee_income ;;
  }
}

View File

@ -0,0 +1,18 @@
# Reachable test view (included and explored by the model file).
# Derives its table from employee_income_source via SQL_TABLE_NAME,
# exercising view-to-view lineage.
view: employee_total_income {
  sql_table_name: ${employee_income_source.SQL_TABLE_NAME} ;;
  dimension: id {
    type: number
    sql: ${TABLE}.id;;
  }
  dimension: name {
    type: string
    sql: ${TABLE}.name;;
  }
  measure: total_income {
    type: sum
    sql: ${TABLE}.income;;
  }
}

View File

@ -0,0 +1,18 @@
# Intentionally UNREACHABLE test view: its file is not included by any
# model, so ingestion should skip it and emit exactly one
# "Skipped View File" warning (asserted in test_unreachable_views).
view: employee_unreachable {
  sql_table_name: ${employee_income_source.SQL_TABLE_NAME} ;;
  dimension: id {
    type: number
    sql: ${TABLE}.id;;
  }
  dimension: name {
    type: string
    sql: ${TABLE}.name;;
  }
  measure: total_income {
    type: sum
    sql: ${TABLE}.income;;
  }
}

View File

@ -10,6 +10,8 @@ from deepdiff import DeepDiff
from freezegun import freeze_time
from looker_sdk.sdk.api40.models import DBConnection
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import read_metadata_file
from datahub.ingestion.source.looker.looker_dataclasses import LookerModel
@ -20,6 +22,7 @@ from datahub.ingestion.source.looker.looker_template_language import (
)
from datahub.ingestion.source.looker.lookml_config import LookMLSourceConfig
from datahub.ingestion.source.looker.lookml_refinement import LookerRefinementResolver
from datahub.ingestion.source.looker.lookml_source import LookMLSource
from datahub.metadata.schema_classes import (
DatasetSnapshotClass,
MetadataChangeEventClass,
@ -78,7 +81,8 @@ def test_lookml_ingest(pytestconfig, tmp_path, mock_time):
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
@ -112,7 +116,8 @@ def test_lookml_refinement_ingest(pytestconfig, tmp_path, mock_time):
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
golden_path = test_resources_dir / "refinements_ingestion_golden.json"
mce_helpers.check_golden_file(
@ -142,7 +147,8 @@ def test_lookml_refinement_include_order(pytestconfig, tmp_path, mock_time):
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
golden_path = test_resources_dir / "refinement_include_order_golden.json"
mce_helpers.check_golden_file(
@ -332,7 +338,8 @@ def test_lookml_ingest_offline(pytestconfig, tmp_path, mock_time):
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
@ -377,7 +384,8 @@ def test_lookml_ingest_offline_with_model_deny(pytestconfig, tmp_path, mock_time
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
@ -424,7 +432,8 @@ def test_lookml_ingest_offline_platform_instance(pytestconfig, tmp_path, mock_ti
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
@ -507,7 +516,8 @@ def ingestion_test(
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
@ -553,7 +563,8 @@ def test_lookml_git_info(pytestconfig, tmp_path, mock_time):
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
@ -668,7 +679,8 @@ def test_hive_platform_drops_ids(pytestconfig, tmp_path, mock_time):
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
events = read_metadata_file(tmp_path / mce_out)
for mce in events:
@ -1051,3 +1063,37 @@ def test_gms_schema_resolution(pytestconfig, tmp_path, mock_time):
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_unreachable_views(pytestconfig):
    """Unreachable view files are reported as warnings when
    emit_reachable_views_only is disabled."""
    test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
    recipe = {
        "base_folder": f"{test_resources_dir}/lkml_unreachable_views",
        "connection_to_platform_map": {"my_connection": "postgres"},
        "parse_table_names_from_sql": True,
        "tag_measures_and_dimensions": False,
        "project_name": "lkml_samples",
        "model_pattern": {"deny": ["data2"]},
        "emit_reachable_views_only": False,
        "liquid_variable": {
            "order_region": "ap-south-1",
            "source_region": "ap-south-1",
            "dw_eff_dt_date": {
                "_is_selected": True,
            },
        },
    }
    lookml_source = LookMLSource(
        LookMLSourceConfig.parse_obj(recipe),
        ctx=PipelineContext(run_id="lookml-source-test"),
    )

    workunits: List[MetadataWorkUnit] = list(
        lookml_source.get_workunits_internal()
    )

    # One unreachable view file -> exactly one warning, normal emission count.
    assert len(workunits) == 15
    assert lookml_source.reporter.warnings.total_elements == 1
    warning_messages = [
        warning.message for warning in lookml_source.get_report().warnings
    ]
    assert (
        "The Looker view file was skipped because it may not be referenced by any models."
        in warning_messages
    )