sagar-salvi-apptware 563656c4d5
feat(ingestion/lookml): resolve access notation for LookML Constant (#12277)
Co-authored-by: Siddique Bagwan <siddique.bagwan@acryl.io>
Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
2025-01-28 12:25:45 +05:30

1235 lines
42 KiB
Python

import logging
import pathlib
from typing import Any, List, Optional, Tuple
from unittest import mock
from unittest.mock import MagicMock, patch
import pydantic
import pytest
from deepdiff import DeepDiff
from freezegun import freeze_time
from looker_sdk.sdk.api40.models import DBConnection
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.file import read_metadata_file
from datahub.ingestion.source.looker.looker_dataclasses import (
LookerConstant,
LookerModel,
)
from datahub.ingestion.source.looker.looker_template_language import (
LookmlConstantTransformer,
SpecialVariable,
load_and_preprocess_file,
resolve_liquid_variable,
)
from datahub.ingestion.source.looker.lookml_config import (
LookMLSourceConfig,
LookMLSourceReport,
)
from datahub.ingestion.source.looker.lookml_refinement import LookerRefinementResolver
from datahub.ingestion.source.looker.lookml_source import LookMLSource
from datahub.metadata.schema_classes import (
DatasetSnapshotClass,
MetadataChangeEventClass,
UpstreamLineageClass,
)
from datahub.sql_parsing.schema_resolver import SchemaInfo, SchemaResolver
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import get_current_checkpoint_from_pipeline
logging.getLogger("lkml").setLevel(logging.INFO)
FROZEN_TIME = "2020-04-14 07:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
def get_default_recipe(output_file_path, base_folder_path):
return {
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": base_folder_path,
"connection_to_platform_map": {"my_connection": "postgres"},
"parse_table_names_from_sql": True,
"tag_measures_and_dimensions": False,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"emit_reachable_views_only": False,
"liquid_variable": {"order_region": "ap-south-1"},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{output_file_path}",
},
},
}
@freeze_time(FROZEN_TIME)
def test_lookml_ingest(pytestconfig, tmp_path, mock_time):
"""Test backwards compatibility with a previous form of config with new flags turned off"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "expected_output.json"
# Note this config below is known to create "bad" lineage since the config author has not provided enough
# information to resolve relative table names (which are not fully qualified) We keep this check just to validate
# that ingestion doesn't croak on this config
pipeline = Pipeline.create(
get_default_recipe(
f"{tmp_path}/{mce_out_file}", f"{test_resources_dir}/lkml_samples"
)
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=test_resources_dir / mce_out_file,
)
@freeze_time(FROZEN_TIME)
def test_lookml_refinement_ingest(pytestconfig, tmp_path, mock_time):
"""Test backwards compatibility with previous form of config with new flags turned off"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "refinement_mces_output.json"
# Note this config below is known to create "bad" lineage since the config author has not provided enough information
# to resolve relative table names (which are not fully qualified)
# We keep this check just to validate that ingestion doesn't croak on this config
new_recipe = get_default_recipe(
f"{tmp_path}/{mce_out_file}", f"{test_resources_dir}/lkml_samples"
)
new_recipe["source"]["config"]["process_refinements"] = True
new_recipe["source"]["config"]["view_naming_pattern"] = (
"{project}.{file_path}.view.{name}"
)
new_recipe["source"]["config"]["view_browse_pattern"] = (
"/{env}/{platform}/{project}/{file_path}/views"
)
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
golden_path = test_resources_dir / "refinements_ingestion_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_lookml_refinement_include_order(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "refinement_include_order_mces_output.json"
new_recipe = get_default_recipe(
f"{tmp_path}/{mce_out_file}",
f"{test_resources_dir}/lkml_refinement_samples/sample1",
)
new_recipe["source"]["config"]["process_refinements"] = True
new_recipe["source"]["config"]["project_name"] = "lkml_refinement_sample1"
new_recipe["source"]["config"]["view_naming_pattern"] = {
"pattern": "{project}.{model}.view.{name}"
}
new_recipe["source"]["config"]["connection_to_platform_map"] = {
"db-connection": "conn"
}
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
golden_path = test_resources_dir / "refinement_include_order_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_lookml_explore_refinement(pytestconfig, tmp_path, mock_time):
looker_model = LookerModel(
explores=[
{
"name": "book",
},
{"name": "+book", "extends__all": [["order"]]},
{"name": "+book", "extends__all": [["transaction"]]},
],
connection="",
resolved_includes=[],
includes=[],
)
refinement_resolver = LookerRefinementResolver(
looker_model=looker_model,
looker_viewfile_loader=None, # type: ignore
reporter=None, # type: ignore
source_config=LookMLSourceConfig.parse_obj(
{
"process_refinements": "True",
"base_folder": ".",
"api": {
"base_url": "fake",
"client_id": "fake_client_id",
"client_secret": "fake_client_secret",
},
}
),
connection_definition=None, # type: ignore
)
new_explore: dict = refinement_resolver.apply_explore_refinement(
looker_model.explores[0]
)
assert new_explore.get("extends") is not None
assert new_explore["extends"].sort() == ["order", "transaction"].sort()
@freeze_time(FROZEN_TIME)
def test_lookml_view_merge(pytestconfig, tmp_path, mock_time):
raw_view: dict = {
"sql_table_name": "flightstats.accidents",
"dimensions": [
{
"type": "number",
"primary_key": "yes",
"sql": '${TABLE}."id"',
"name": "id",
}
],
"name": "flights",
}
refinement_views: List[dict] = [
{
"dimensions": [
{
"type": "string",
"sql": '${TABLE}."air_carrier"',
"name": "air_carrier",
}
],
"name": "+flights",
},
{
"measures": [
{"type": "average", "sql": "${distance}", "name": "distance_avg"},
{
"type": "number",
"sql": "STDDEV(${distance})",
"name": "distance_stddev",
},
],
"dimensions": [
{
"type": "tier",
"sql": "${distance}",
"tiers": [500, 1300],
"name": "distance_tiered2",
},
],
"name": "+flights",
},
{
"dimension_groups": [
{
"type": "duration",
"intervals": ["week", "year"],
"sql_start": '${TABLE}."enrollment_date"',
"sql_end": '${TABLE}."graduation_date"',
"name": "enrolled",
},
],
"name": "+flights",
},
{
"dimensions": [{"type": "string", "sql": '${TABLE}."id"', "name": "id"}],
"name": "+flights",
},
]
merged_view: dict = LookerRefinementResolver.merge_refinements(
raw_view=raw_view, refinement_views=refinement_views
)
expected_view: dict = {
"sql_table_name": "flightstats.accidents",
"dimensions": [
{
"type": "string",
"primary_key": "yes",
"sql": '${TABLE}."id"',
"name": "id",
},
{"type": "string", "sql": '${TABLE}."air_carrier"', "name": "air_carrier"},
{
"type": "tier",
"sql": "${distance}",
"tiers": [500, 1300],
"name": "distance_tiered2",
},
],
"name": "flights",
"measures": [
{"type": "average", "sql": "${distance}", "name": "distance_avg"},
{"type": "number", "sql": "STDDEV(${distance})", "name": "distance_stddev"},
],
"dimension_groups": [
{
"type": "duration",
"intervals": ["week", "year"],
"sql_start": '${TABLE}."enrollment_date"',
"sql_end": '${TABLE}."graduation_date"',
"name": "enrolled",
}
],
}
assert DeepDiff(expected_view, merged_view) == {}
@freeze_time(FROZEN_TIME)
def test_lookml_ingest_offline(pytestconfig, tmp_path, mock_time):
"""New form of config with offline specification of connection defaults"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out = "lookml_mces_offline.json"
pipeline = Pipeline.create(
{
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": str(test_resources_dir / "lkml_samples"),
"connection_to_platform_map": {
"my_connection": {
"platform": "snowflake",
"default_db": "default_db",
"default_schema": "default_schema",
}
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"emit_reachable_views_only": False,
"process_refinements": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{mce_out}",
},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out,
golden_path=test_resources_dir / mce_out,
)
@freeze_time(FROZEN_TIME)
def test_lookml_ingest_offline_with_model_deny(pytestconfig, tmp_path, mock_time):
"""New form of config with offline specification of connection defaults"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out = "lookml_mces_offline_deny_pattern.json"
pipeline = Pipeline.create(
{
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": str(test_resources_dir / "lkml_samples"),
"connection_to_platform_map": {
"my_connection": {
"platform": "snowflake",
"default_db": "default_db",
"default_schema": "default_schema",
}
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data"]},
"emit_reachable_views_only": False,
"process_refinements": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{mce_out}",
},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out,
golden_path=test_resources_dir / mce_out,
)
@freeze_time(FROZEN_TIME)
def test_lookml_ingest_offline_platform_instance(pytestconfig, tmp_path, mock_time):
"""New form of config with offline specification of connection defaults"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out = "lookml_mces_offline_platform_instance.json"
pipeline = Pipeline.create(
{
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": str(test_resources_dir / "lkml_samples"),
"connection_to_platform_map": {
"my_connection": {
"platform": "snowflake",
"platform_instance": "warehouse",
"platform_env": "dev",
"default_db": "default_db",
"default_schema": "default_schema",
}
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"emit_reachable_views_only": False,
"process_refinements": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{mce_out}",
},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out,
golden_path=test_resources_dir / mce_out,
)
@freeze_time(FROZEN_TIME)
def test_lookml_ingest_api_bigquery(pytestconfig, tmp_path, mock_time):
# test with BigQuery connection
ingestion_test(
pytestconfig,
tmp_path,
mock_time,
DBConnection(
dialect_name="bigquery", host="project-foo", database="default-db"
),
)
@freeze_time(FROZEN_TIME)
def test_lookml_ingest_api_hive(pytestconfig, tmp_path, mock_time):
# test with Hive connection
ingestion_test(
pytestconfig,
tmp_path,
mock_time,
DBConnection(
dialect_name="hive2",
database="default-hive-db",
),
)
def ingestion_test(
pytestconfig: Any,
tmp_path: pathlib.Path,
mock_time: int,
mock_connection: DBConnection,
) -> None:
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = f"lookml_mces_api_{mock_connection.dialect_name}.json"
mocked_client = mock.MagicMock()
mock_model = mock.MagicMock(project_name="lkml_samples")
with mock.patch("looker_sdk.init40") as mock_sdk:
mock_sdk.return_value = mocked_client
# mock_connection = mock.MagicMock()
mocked_client.connection.return_value = mock_connection
mocked_client.lookml_model.return_value = mock_model
pipeline = Pipeline.create(
{
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": str(test_resources_dir / "lkml_samples"),
"api": {
"client_id": "fake_client_id",
"client_secret": "fake_secret",
"base_url": "fake_account.looker.com",
},
"parse_table_names_from_sql": True,
"model_pattern": {"deny": ["data2"]},
"emit_reachable_views_only": False,
"process_refinements": False,
"liquid_variable": {
"order_region": "ap-south-1",
},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{mce_out_file}",
},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=test_resources_dir / mce_out_file,
)
@freeze_time(FROZEN_TIME)
def test_lookml_git_info(pytestconfig, tmp_path, mock_time):
"""Add github info to config"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out = "lookml_mces_with_external_urls.json"
pipeline = Pipeline.create(
{
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": str(test_resources_dir / "lkml_samples"),
"connection_to_platform_map": {
"my_connection": {
"platform": "snowflake",
"default_db": "default_db",
"default_schema": "default_schema",
}
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"git_info": {"repo": "datahub/looker-demo", "branch": "master"},
"emit_reachable_views_only": False,
"process_refinements": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{mce_out}",
},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out,
golden_path=test_resources_dir / mce_out,
)
@freeze_time(FROZEN_TIME)
def test_reachable_views(pytestconfig, tmp_path, mock_time):
"""Test for reachable views"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out = "lookml_reachable_views.json"
pipeline = Pipeline.create(
{
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": str(test_resources_dir / "lkml_samples"),
"connection_to_platform_map": {
"my_connection": {
"platform": "snowflake",
"platform_instance": "warehouse",
"platform_env": "dev",
"default_db": "default_db",
"default_schema": "default_schema",
},
"my_other_connection": {
"platform": "redshift",
"platform_instance": "rs_warehouse",
"platform_env": "dev",
"default_db": "default_db",
"default_schema": "default_schema",
},
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"emit_reachable_views_only": True,
"process_refinements": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{mce_out}",
},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out,
golden_path=test_resources_dir / mce_out,
)
entity_urns = mce_helpers.get_entity_urns(tmp_path / mce_out)
# we should only have three views discoverable
assert len(entity_urns) == 3
assert (
"urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)"
in entity_urns
)
assert (
"urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)"
in entity_urns
)
assert (
"urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.owners,PROD)"
in entity_urns
)
@freeze_time(FROZEN_TIME)
def test_hive_platform_drops_ids(pytestconfig, tmp_path, mock_time):
"""Test omit db name from hive ids"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out = "lookml_mces_with_db_name_omitted.json"
pipeline = Pipeline.create(
{
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": str(test_resources_dir / "lkml_samples_hive"),
"connection_to_platform_map": {
"my_connection": {
"platform": "hive",
"default_db": "default_database",
"default_schema": "default_schema",
}
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"git_info": {"repo": "datahub/looker-demo", "branch": "master"},
"emit_reachable_views_only": False,
"process_refinements": False,
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{mce_out}",
},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=False)
assert pipeline.source.get_report().warnings.total_elements == 1
events = read_metadata_file(tmp_path / mce_out)
for mce in events:
if isinstance(mce, MetadataChangeEventClass):
if isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
lineage_aspects = [
a
for a in mce.proposedSnapshot.aspects
if isinstance(a, UpstreamLineageClass)
]
for a in lineage_aspects:
for upstream in a.upstreams:
assert "hive." not in upstream.dataset
@freeze_time(FROZEN_TIME)
def test_lookml_stateful_ingestion(pytestconfig, tmp_path, mock_time):
output_file_name: str = "lookml_mces.json"
state_file_name: str = "lookml_state_mces.json"
golden_file_name: str = "golden_test_state.json"
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
base_pipeline_config = {
"run_id": "lookml-test",
"pipeline_name": "lookml_stateful",
"source": {
"type": "lookml",
"config": {
"base_folder": str(test_resources_dir / "lkml_samples"),
"connection_to_platform_map": {"my_connection": "conn"},
"parse_table_names_from_sql": True,
"tag_measures_and_dimensions": False,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"emit_reachable_views_only": False,
"stateful_ingestion": {
"enabled": True,
"remove_stale_metadata": True,
"fail_safe_threshold": 100.0,
"state_provider": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{state_file_name}",
},
},
},
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{output_file_name}",
},
},
}
pipeline_run1 = Pipeline.create(base_pipeline_config)
pipeline_run1.run()
pipeline_run1.raise_from_status()
pipeline_run1.pretty_print_summary()
mce_helpers.check_golden_file(
pytestconfig,
output_path=f"{tmp_path}/{state_file_name}",
golden_path=f"{test_resources_dir}/{golden_file_name}",
)
checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
assert checkpoint1
assert checkpoint1.state
def test_lookml_base_folder():
fake_api = {
"base_url": "https://filler.cloud.looker.com",
"client_id": "this-is-fake",
"client_secret": "this-is-also-fake",
}
LookMLSourceConfig.parse_obj(
{
"git_info": {
"repo": "acryldata/long-tail-companions-looker",
"deploy_key": "this-is-fake",
},
"api": fake_api,
}
)
with pytest.raises(
pydantic.ValidationError, match=r"base_folder.+nor.+git_info.+provided"
):
LookMLSourceConfig.parse_obj({"api": fake_api})
@freeze_time(FROZEN_TIME)
def test_same_name_views_different_file_path(pytestconfig, tmp_path, mock_time):
"""Test for reachable views"""
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out = "lookml_same_name_views_different_file_path.json"
pipeline = Pipeline.create(
{
"run_id": "lookml-test",
"source": {
"type": "lookml",
"config": {
"base_folder": str(
test_resources_dir
/ "lkml_same_name_views_different_file_path_samples"
),
"connection_to_platform_map": {
"my_connection": {
"platform": "snowflake",
"platform_instance": "warehouse",
"platform_env": "dev",
"default_db": "default_db",
"default_schema": "default_schema",
},
},
"parse_table_names_from_sql": True,
"project_name": "lkml_samples",
"process_refinements": False,
"view_naming_pattern": "{project}.{file_path}.view.{name}",
"view_browse_pattern": "/{env}/{platform}/{project}/{file_path}/views",
},
},
"sink": {
"type": "file",
"config": {
"filename": f"{tmp_path}/{mce_out}",
},
},
}
)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out,
golden_path=test_resources_dir / mce_out,
)
def test_manifest_parser(pytestconfig: pytest.Config) -> None:
# This mainly tests that we're permissive enough that we don't crash when parsing the manifest file.
# We need the test because we monkeypatch the lkml library.
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
manifest_file = test_resources_dir / "lkml_manifest_samples/complex-manifest.lkml"
manifest = load_and_preprocess_file(
path=manifest_file, source_config=MagicMock(), reporter=LookMLSourceReport()
)
assert manifest
@freeze_time(FROZEN_TIME)
def test_duplicate_field_ingest(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "duplicate_ingest_mces_output.json"
new_recipe = get_default_recipe(
f"{tmp_path}/{mce_out_file}",
f"{test_resources_dir}/lkml_samples_duplicate_field",
)
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
golden_path = test_resources_dir / "duplicate_field_ingestion_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_view_to_view_lineage_and_liquid_template(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "vv_lineage_liquid_template_golden.json"
new_recipe = get_default_recipe(
f"{tmp_path}/{mce_out_file}",
f"{test_resources_dir}/vv-lineage-and-liquid-templates",
)
new_recipe["source"]["config"]["liquid_variable"] = {
"_user_attributes": {
"looker_env": "dev",
"dev_database_prefix": "employee",
"dev_schema_prefix": "public",
},
"dw_eff_dt_date": {
"_is_selected": True,
},
"source_region": "ap-south-1",
}
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
golden_path = test_resources_dir / "vv_lineage_liquid_template_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_view_to_view_lineage_and_lookml_constant(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "vv_lineage_lookml_constant_golden.json"
new_recipe = get_default_recipe(
f"{tmp_path}/{mce_out_file}",
f"{test_resources_dir}/vv-lineage-and-lookml-constant",
)
new_recipe["source"]["config"]["lookml_constants"] = {"winner_table": "dev"}
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
assert pipeline.source.get_report().warnings.total_elements == 1
golden_path = test_resources_dir / "vv_lineage_lookml_constant_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_special_liquid_variables():
text: str = """{% assign source_table_variable = "source_table" | sql_quote | non_existing_filter_where_it_should_not_fail %}
SELECT
employee_id,
employee_name,
{% if dw_eff_dt_date._is_selected or finance_dw_eff_dt_date._is_selected %}
prod_core.data.r_metric_summary_v2
{% elsif dw_eff_dt_week._is_selected or finance_dw_eff_dt_week._in_query %}
prod_core.data.r_metric_summary_v3
{% elsif dw_eff_dt_week._is_selected or finance_dw_eff_dt_week._is_filtered %}
prod_core.data.r_metric_summary_v4
{% else %}
'default_table' as source
{% endif %},
employee_income
FROM {{ source_table_variable }}
"""
input_liquid_variable: dict = {}
expected_liquid_variable: dict = {
**input_liquid_variable,
"dw_eff_dt_date": {"_is_selected": True},
"finance_dw_eff_dt_date": {"_is_selected": True},
"dw_eff_dt_week": {"_is_selected": True},
"finance_dw_eff_dt_week": {
"_in_query": True,
"_is_filtered": True,
},
}
actual_liquid_variable = SpecialVariable(
input_liquid_variable
).liquid_variable_with_default(text)
assert (
expected_liquid_variable == actual_liquid_variable
) # Here new keys with default value should get added
# change input
input_liquid_variable = {
"finance_dw_eff_dt_week": {"_is_filtered": False},
}
expected_liquid_variable = {
**input_liquid_variable,
"dw_eff_dt_date": {"_is_selected": True},
"finance_dw_eff_dt_date": {"_is_selected": True},
"dw_eff_dt_week": {"_is_selected": True},
"finance_dw_eff_dt_week": {
"_in_query": True,
"_is_filtered": False,
},
}
actual_liquid_variable = SpecialVariable(
input_liquid_variable
).liquid_variable_with_default(text)
assert (
expected_liquid_variable == actual_liquid_variable
) # should not overwrite the actual value present in
# input_liquid_variable
# Match template after resolution of liquid variables
actual_text = resolve_liquid_variable(
text=text,
liquid_variable=input_liquid_variable,
report=LookMLSourceReport(),
view_name="test",
)
expected_text: str = (
"\n SELECT\n employee_id,\n employee_name,\n \n "
"prod_core.data.r_metric_summary_v2\n ,\n employee_income\n FROM "
"'source_table'\n "
)
assert actual_text == expected_text
@pytest.mark.parametrize(
"view, expected_result, warning_expected",
[
# Case 1: Single constant replacement in sql_table_name
(
{"sql_table_name": "@{constant1}.kafka_streaming.events"},
{"datahub_transformed_sql_table_name": "value1.kafka_streaming.events"},
False,
),
# Case 2: Single constant replacement with config-defined constant
(
{"sql_table_name": "SELECT * FROM @{constant2}"},
{"datahub_transformed_sql_table_name": "SELECT * FROM value2"},
False,
),
# Case 3: Multiple constants in a derived_table SQL query
(
{"derived_table": {"sql": "SELECT @{constant1}, @{constant3}"}},
{
"derived_table": {
"datahub_transformed_sql": "SELECT value1, manifest_value3"
}
},
False,
),
# Case 4: Non-existent constant in sql_table_name
(
{"sql_table_name": "SELECT * FROM @{nonexistent}"},
{"datahub_transformed_sql_table_name": "SELECT * FROM @{nonexistent}"},
False,
),
# Case 5: View with unsupported attribute
({"unsupported_attribute": "SELECT * FROM @{constant1}"}, {}, False),
# Case 6: View with no transformable attributes
(
{"sql_table_name": "SELECT * FROM table_name"},
{"datahub_transformed_sql_table_name": "SELECT * FROM table_name"},
False,
),
# Case 7: Constants only in manifest_constants
(
{"sql_table_name": "SELECT @{constant3}"},
{"datahub_transformed_sql_table_name": "SELECT manifest_value3"},
False,
),
# Case 8: Constants only in lookml_constants
(
{"sql_table_name": "SELECT @{constant2}"},
{"datahub_transformed_sql_table_name": "SELECT value2"},
False,
),
# Case 9: Multiple unsupported attributes
(
{
"unsupported_attribute": "SELECT @{constant1}",
"another_unsupported_attribute": "SELECT @{constant2}",
},
{},
False,
),
# Case 10: Misplaced lookml constant
(
{"sql_table_name": "@{constant1}.@{constant2}.@{constant4}"},
{"datahub_transformed_sql_table_name": "value1.value2.@{constant4}"},
True,
),
],
)
@freeze_time(FROZEN_TIME)
def test_lookml_constant_transformer(view, expected_result, warning_expected):
"""
Test LookmlConstantTransformer with various view structures.
"""
config = MagicMock()
report = MagicMock()
config.lookml_constants = {
"constant1": "value1",
"constant2": "value2",
}
config.liquid_variables = {
"constant4": "liquid_value1",
}
transformer = LookmlConstantTransformer(
source_config=config,
reporter=report,
manifest_constants={
"constant1": LookerConstant(name="constant1", value="manifest_value1"),
"constant3": LookerConstant(name="constant3", value="manifest_value3"),
},
)
result = transformer.transform(view)
assert result == expected_result
if warning_expected:
report.warning.assert_called_once_with(
title="Misplaced lookml constant",
message="Use 'lookml_constants' instead of 'liquid_variables'.",
context="Key constant4",
)
@freeze_time(FROZEN_TIME)
def test_field_tag_ingest(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "field_tag_mces_output.json"
new_recipe = get_default_recipe(
f"{tmp_path}/{mce_out_file}",
f"{test_resources_dir}/lkml_samples_duplicate_field",
)
new_recipe["source"]["config"]["tag_measures_and_dimensions"] = True
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
golden_path = test_resources_dir / "field_tag_ingestion_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_drop_hive(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "drop_hive_dot.json"
new_recipe = get_default_recipe(
f"{tmp_path}/{mce_out_file}",
f"{test_resources_dir}/drop_hive_dot",
)
new_recipe["source"]["config"]["connection_to_platform_map"] = {
"my_connection": "hive"
}
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
golden_path = test_resources_dir / "drop_hive_dot_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_gms_schema_resolution(pytestconfig, tmp_path, mock_time):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
mce_out_file = "drop_hive_dot.json"
new_recipe = get_default_recipe(
f"{tmp_path}/{mce_out_file}",
f"{test_resources_dir}/gms_schema_resolution",
)
new_recipe["source"]["config"]["connection_to_platform_map"] = {
"my_connection": "hive"
}
return_value: Tuple[str, Optional[SchemaInfo]] = (
"fake_dataset_urn",
{
"Id": "String",
"Name": "String",
"source": "String",
},
)
with patch.object(SchemaResolver, "resolve_urn", return_value=return_value):
pipeline = Pipeline.create(new_recipe)
pipeline.run()
pipeline.pretty_print_summary()
pipeline.raise_from_status(raise_warnings=True)
golden_path = test_resources_dir / "gms_schema_resolution_golden.json"
mce_helpers.check_golden_file(
pytestconfig,
output_path=tmp_path / mce_out_file,
golden_path=golden_path,
)
@freeze_time(FROZEN_TIME)
def test_unreachable_views(pytestconfig):
test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml"
config = {
"base_folder": f"{test_resources_dir}/lkml_unreachable_views",
"connection_to_platform_map": {"my_connection": "postgres"},
"parse_table_names_from_sql": True,
"tag_measures_and_dimensions": False,
"project_name": "lkml_samples",
"model_pattern": {"deny": ["data2"]},
"emit_reachable_views_only": False,
"liquid_variable": {
"order_region": "ap-south-1",
"source_region": "ap-south-1",
"dw_eff_dt_date": {
"_is_selected": True,
},
},
}
source = LookMLSource(
LookMLSourceConfig.parse_obj(config),
ctx=PipelineContext(run_id="lookml-source-test"),
)
wu: List[MetadataWorkUnit] = [*source.get_workunits_internal()]
assert len(wu) == 15
assert source.reporter.warnings.total_elements == 1
assert (
"The Looker view file was skipped because it may not be referenced by any models."
in [failure.message for failure in source.get_report().warnings]
)