import logging import pathlib from typing import Any, Dict, List, Optional, cast from unittest import mock import pydantic import pytest from deepdiff import DeepDiff from freezegun import freeze_time from looker_sdk.sdk.api40.models import DBConnection from datahub.configuration.common import PipelineExecutionError from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.source.file import read_metadata_file from datahub.ingestion.source.looker.lookml_source import ( LookerModel, LookerRefinementResolver, LookMLSource, LookMLSourceConfig, ) from datahub.ingestion.source.state.checkpoint import Checkpoint from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState from datahub.metadata.schema_classes import ( DatasetSnapshotClass, MetadataChangeEventClass, UpstreamLineageClass, ) from tests.test_helpers import mce_helpers from tests.test_helpers.state_helpers import ( validate_all_providers_have_committed_successfully, ) logging.getLogger("lkml").setLevel(logging.INFO) FROZEN_TIME = "2020-04-14 07:00:00" GMS_PORT = 8080 GMS_SERVER = f"http://localhost:{GMS_PORT}" def get_default_recipe(output_file_path, base_folder_path): return { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": base_folder_path, "connection_to_platform_map": {"my_connection": "conn"}, "parse_table_names_from_sql": True, "tag_measures_and_dimensions": False, "project_name": "lkml_samples", "model_pattern": {"deny": ["data2"]}, "emit_reachable_views_only": False, }, }, "sink": { "type": "file", "config": { "filename": f"{output_file_path}", }, }, } @freeze_time(FROZEN_TIME) def test_lookml_ingest(pytestconfig, tmp_path, mock_time): """Test backwards compatibility with previous form of config with new flags turned off""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out_file = "expected_output.json" # Note this config below is known to create "bad" lineage since the config author has not provided enough information # to resolve relative table names (which are not fully qualified) # We keep this check just to validate that ingestion doesn't croak on this config pipeline = Pipeline.create( get_default_recipe( f"{tmp_path}/{mce_out_file}", f"{test_resources_dir}/lkml_samples" ) ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out_file, golden_path=test_resources_dir / mce_out_file, ) @freeze_time(FROZEN_TIME) def test_lookml_refinement_ingest(pytestconfig, tmp_path, mock_time): """Test backwards compatibility with previous form of config with new flags turned off""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out_file = "refinement_mces_output.json" # Note this config below is known to create "bad" lineage since the config author has not provided enough information # to resolve relative table names (which are not fully qualified) # We keep this check just to validate that ingestion doesn't croak on this config new_recipe = get_default_recipe( f"{tmp_path}/{mce_out_file}", f"{test_resources_dir}/lkml_samples" ) new_recipe["source"]["config"]["process_refinements"] = True pipeline = Pipeline.create(new_recipe) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) golden_path = test_resources_dir / "refinements_ingestion_golden.json" mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out_file, golden_path=golden_path, ) @freeze_time(FROZEN_TIME) def test_lookml_refinement_include_order(pytestconfig, tmp_path, mock_time): test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out_file = "refinement_include_order_mces_output.json" new_recipe = get_default_recipe( f"{tmp_path}/{mce_out_file}", f"{test_resources_dir}/lkml_refinement_samples/sample1", ) new_recipe["source"]["config"]["process_refinements"] = True new_recipe["source"]["config"]["project_name"] = "lkml_refinement_sample1" new_recipe["source"]["config"]["view_naming_pattern"] = { "pattern": "{project}.{model}.view.{name}" } new_recipe["source"]["config"]["connection_to_platform_map"] = { "db-connection": "conn" } pipeline = Pipeline.create(new_recipe) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) golden_path = test_resources_dir / "refinement_include_order_golden.json" mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out_file, golden_path=golden_path, ) @freeze_time(FROZEN_TIME) def test_lookml_explore_refinement(pytestconfig, tmp_path, mock_time): looker_model = LookerModel( explores=[ { "name": "book", }, {"name": "+book", "extends__all": [["order"]]}, {"name": "+book", "extends__all": [["transaction"]]}, ], connection=str(), resolved_includes=[], includes=[], ) refinement_resolver = LookerRefinementResolver( looker_model=looker_model, looker_viewfile_loader=None, # type: ignore reporter=None, # type: ignore source_config=LookMLSourceConfig.parse_obj( { "process_refinements": "True", "base_folder": ".", "api": { "base_url": "fake", "client_id": "fake_client_id", "client_secret": "fake_client_secret", }, } ), connection_definition=None, # type: ignore ) new_explore: dict = refinement_resolver.apply_explore_refinement( looker_model.explores[0] ) assert new_explore.get("extends") is not None assert new_explore["extends"].sort() == ["order", "transaction"].sort() @freeze_time(FROZEN_TIME) def test_lookml_view_merge(pytestconfig, tmp_path, mock_time): raw_view: dict = { "sql_table_name": "flightstats.accidents", "dimensions": [ { "type": "number", "primary_key": "yes", "sql": '${TABLE}."id"', "name": "id", } ], "name": "flights", } refinement_views: List[dict] = [ { "dimensions": [ { "type": "string", "sql": '${TABLE}."air_carrier"', "name": "air_carrier", } ], "name": "+flights", }, { "measures": [ {"type": "average", "sql": "${distance}", "name": "distance_avg"}, { "type": "number", "sql": "STDDEV(${distance})", "name": "distance_stddev", }, ], "dimensions": [ { "type": "tier", "sql": "${distance}", "tiers": [500, 1300], "name": "distance_tiered2", }, ], "name": "+flights", }, { "dimension_groups": [ { "type": "duration", "intervals": ["week", "year"], "sql_start": '${TABLE}."enrollment_date"', "sql_end": '${TABLE}."graduation_date"', "name": "enrolled", }, ], "name": "+flights", }, { "dimensions": [{"type": "string", "sql": '${TABLE}."id"', "name": "id"}], "name": "+flights", }, ] merged_view: dict = LookerRefinementResolver.merge_refinements( raw_view=raw_view, refinement_views=refinement_views ) expected_view: dict = { "sql_table_name": "flightstats.accidents", "dimensions": [ { "type": "string", "primary_key": "yes", "sql": '${TABLE}."id"', "name": "id", }, {"type": "string", "sql": '${TABLE}."air_carrier"', "name": "air_carrier"}, { "type": "tier", "sql": "${distance}", "tiers": [500, 1300], "name": "distance_tiered2", }, ], "name": "flights", "measures": [ {"type": "average", "sql": "${distance}", "name": "distance_avg"}, {"type": "number", "sql": "STDDEV(${distance})", "name": "distance_stddev"}, ], "dimension_groups": [ { "type": "duration", "intervals": ["week", "year"], "sql_start": '${TABLE}."enrollment_date"', "sql_end": '${TABLE}."graduation_date"', "name": "enrolled", } ], } assert DeepDiff(expected_view, merged_view) == {} @freeze_time(FROZEN_TIME) def test_lookml_ingest_offline(pytestconfig, tmp_path, mock_time): """New form of config with offline specification of connection defaults""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out = "lookml_mces_offline.json" pipeline = Pipeline.create( { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples"), "connection_to_platform_map": { "my_connection": { "platform": "snowflake", "default_db": "default_db", "default_schema": "default_schema", } }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", "model_pattern": {"deny": ["data2"]}, "emit_reachable_views_only": False, "process_refinements": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/{mce_out}", }, }, } ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out, golden_path=test_resources_dir / mce_out, ) @freeze_time(FROZEN_TIME) def test_lookml_ingest_offline_with_model_deny(pytestconfig, tmp_path, mock_time): """New form of config with offline specification of connection defaults""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out = "lookml_mces_offline_deny_pattern.json" pipeline = Pipeline.create( { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples"), "connection_to_platform_map": { "my_connection": { "platform": "snowflake", "default_db": "default_db", "default_schema": "default_schema", } }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", "model_pattern": {"deny": ["data"]}, "emit_reachable_views_only": False, "process_refinements": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/{mce_out}", }, }, } ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out, golden_path=test_resources_dir / mce_out, ) @freeze_time(FROZEN_TIME) def test_lookml_ingest_offline_platform_instance(pytestconfig, tmp_path, mock_time): """New form of config with offline specification of connection defaults""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out = "lookml_mces_offline_platform_instance.json" pipeline = Pipeline.create( { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples"), "connection_to_platform_map": { "my_connection": { "platform": "snowflake", "platform_instance": "warehouse", "platform_env": "dev", "default_db": "default_db", "default_schema": "default_schema", } }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", "model_pattern": {"deny": ["data2"]}, "emit_reachable_views_only": False, "process_refinements": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/{mce_out}", }, }, } ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out, golden_path=test_resources_dir / mce_out, ) @freeze_time(FROZEN_TIME) def test_lookml_ingest_api_bigquery(pytestconfig, tmp_path, mock_time): # test with BigQuery connection ingestion_test( pytestconfig, tmp_path, mock_time, DBConnection( dialect_name="bigquery", host="project-foo", database="default-db" ), ) @freeze_time(FROZEN_TIME) def test_lookml_ingest_api_hive(pytestconfig, tmp_path, mock_time): # test with Hive connection ingestion_test( pytestconfig, tmp_path, mock_time, DBConnection( dialect_name="hive2", database="default-hive-db", ), ) def ingestion_test( pytestconfig: Any, tmp_path: pathlib.Path, mock_time: int, mock_connection: DBConnection, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out_file = f"lookml_mces_api_{mock_connection.dialect_name}.json" mocked_client = mock.MagicMock() mock_model = mock.MagicMock(project_name="lkml_samples") with mock.patch("looker_sdk.init40") as mock_sdk: mock_sdk.return_value = mocked_client # mock_connection = mock.MagicMock() mocked_client.connection.return_value = mock_connection mocked_client.lookml_model.return_value = mock_model pipeline = Pipeline.create( { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples"), "api": { "client_id": "fake_client_id", "client_secret": "fake_secret", "base_url": "fake_account.looker.com", }, "parse_table_names_from_sql": True, "model_pattern": {"deny": ["data2"]}, "emit_reachable_views_only": False, "process_refinements": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/{mce_out_file}", }, }, } ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out_file, golden_path=test_resources_dir / mce_out_file, ) @freeze_time(FROZEN_TIME) def test_lookml_bad_sql_parser(pytestconfig, tmp_path, mock_time): """Incorrect specification of sql parser should not fail ingestion""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out = "lookml_mces_badsql_parser.json" pipeline = Pipeline.create( { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples"), "connection_to_platform_map": { "my_connection": { "platform": "snowflake", "default_db": "default_db", "default_schema": "default_schema", } }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", "sql_parser": "bad.sql.Parser", "emit_reachable_views_only": False, "process_refinements": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/{mce_out}", }, }, } ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=False) with pytest.raises(PipelineExecutionError): # we expect the source to have warnings pipeline.raise_from_status(raise_warnings=True) mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out, golden_path=test_resources_dir / mce_out, ) @freeze_time(FROZEN_TIME) def test_lookml_git_info(pytestconfig, tmp_path, mock_time): """Add github info to config""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out = "lookml_mces_with_external_urls.json" pipeline = Pipeline.create( { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples"), "connection_to_platform_map": { "my_connection": { "platform": "snowflake", "default_db": "default_db", "default_schema": "default_schema", } }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", "model_pattern": {"deny": ["data2"]}, "github_info": {"repo": "datahub/looker-demo", "branch": "master"}, "emit_reachable_views_only": False, "process_refinements": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/{mce_out}", }, }, } ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out, golden_path=test_resources_dir / mce_out, ) @freeze_time(FROZEN_TIME) def test_reachable_views(pytestconfig, tmp_path, mock_time): """Test for reachable views""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out = "lookml_reachable_views.json" pipeline = Pipeline.create( { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples"), "connection_to_platform_map": { "my_connection": { "platform": "snowflake", "platform_instance": "warehouse", "platform_env": "dev", "default_db": "default_db", "default_schema": "default_schema", }, "my_other_connection": { "platform": "redshift", "platform_instance": "rs_warehouse", "platform_env": "dev", "default_db": "default_db", "default_schema": "default_schema", }, }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", "emit_reachable_views_only": True, "process_refinements": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/{mce_out}", }, }, } ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / mce_out, golden_path=test_resources_dir / mce_out, ) entity_urns = mce_helpers.get_entity_urns(tmp_path / mce_out) # we should only have three views discoverable assert len(entity_urns) == 3 assert ( "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view,PROD)" in entity_urns ) assert ( "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_view2,PROD)" in entity_urns ) assert ( "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.owners,PROD)" in entity_urns ) @freeze_time(FROZEN_TIME) def test_hive_platform_drops_ids(pytestconfig, tmp_path, mock_time): """Test omit db name from hive ids""" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" mce_out = "lookml_mces_with_db_name_omitted.json" pipeline = Pipeline.create( { "run_id": "lookml-test", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples_hive"), "connection_to_platform_map": { "my_connection": { "platform": "hive", "default_db": "default_database", "default_schema": "default_schema", } }, "parse_table_names_from_sql": True, "project_name": "lkml_samples", "model_pattern": {"deny": ["data2"]}, "github_info": {"repo": "datahub/looker-demo", "branch": "master"}, "emit_reachable_views_only": False, "process_refinements": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/{mce_out}", }, }, } ) pipeline.run() pipeline.pretty_print_summary() pipeline.raise_from_status(raise_warnings=True) events = read_metadata_file(tmp_path / mce_out) for mce in events: if isinstance(mce, MetadataChangeEventClass): if isinstance(mce.proposedSnapshot, DatasetSnapshotClass): lineage_aspects = [ a for a in mce.proposedSnapshot.aspects if isinstance(a, UpstreamLineageClass) ] for a in lineage_aspects: for upstream in a.upstreams: assert "hive." not in upstream.dataset @freeze_time(FROZEN_TIME) def test_lookml_ingest_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph): output_file_name: str = "lookml_mces.json" golden_file_name: str = "expected_output.json" output_file_deleted_name: str = "lookml_mces_deleted_stateful.json" golden_file_deleted_name: str = "lookml_mces_golden_deleted_stateful.json" test_resources_dir = pytestconfig.rootpath / "tests/integration/lookml" base_pipeline_config = { "run_id": "lookml-test", "pipeline_name": "lookml_stateful", "source": { "type": "lookml", "config": { "base_folder": str(test_resources_dir / "lkml_samples"), "connection_to_platform_map": {"my_connection": "conn"}, "parse_table_names_from_sql": True, "tag_measures_and_dimensions": False, "project_name": "lkml_samples", "model_pattern": {"deny": ["data2"]}, "emit_reachable_views_only": False, "stateful_ingestion": { "enabled": True, "remove_stale_metadata": True, "fail_safe_threshold": 100.0, "state_provider": { "type": "datahub", "config": {"datahub_api": {"server": GMS_SERVER}}, }, }, }, }, "sink": { "type": "file", "config": {}, }, } pipeline_run1 = None with mock.patch( "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", mock_datahub_graph, ) as mock_checkpoint: mock_checkpoint.return_value = mock_datahub_graph pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore base_pipeline_config # type: ignore ) # Set the special properties for this run pipeline_run1_config["source"]["config"]["emit_reachable_views_only"] = False pipeline_run1_config["sink"]["config"][ "filename" ] = f"{tmp_path}/{output_file_name}" pipeline_run1 = Pipeline.create(pipeline_run1_config) pipeline_run1.run() pipeline_run1.raise_from_status() pipeline_run1.pretty_print_summary() mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / output_file_name, golden_path=f"{test_resources_dir}/{golden_file_name}", ) checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1) assert checkpoint1 assert checkpoint1.state pipeline_run2 = None with mock.patch( "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph", mock_datahub_graph, ) as mock_checkpoint: mock_checkpoint.return_value = mock_datahub_graph pipeline_run2_config: Dict[str, Dict[str, Dict[str, Any]]] = dict(base_pipeline_config) # type: ignore # Set the special properties for this run pipeline_run2_config["source"]["config"]["emit_reachable_views_only"] = True pipeline_run2_config["sink"]["config"][ "filename" ] = f"{tmp_path}/{output_file_deleted_name}" pipeline_run2 = Pipeline.create(pipeline_run2_config) pipeline_run2.run() pipeline_run2.raise_from_status() pipeline_run2.pretty_print_summary() mce_helpers.check_golden_file( pytestconfig, output_path=tmp_path / output_file_deleted_name, golden_path=f"{test_resources_dir}/{golden_file_deleted_name}", ) checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2) assert checkpoint2 assert checkpoint2.state # Validate that all providers have committed successfully. validate_all_providers_have_committed_successfully( pipeline=pipeline_run1, expected_providers=1 ) validate_all_providers_have_committed_successfully( pipeline=pipeline_run2, expected_providers=1 ) # Perform all assertions on the states. The deleted table should not be # part of the second state state1 = cast(GenericCheckpointState, checkpoint1.state) state2 = cast(GenericCheckpointState, checkpoint2.state) difference_dataset_urns = list( state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2) ) # the difference in dataset urns are all the views that are not reachable from the model file assert len(difference_dataset_urns) == 11 deleted_dataset_urns: List[str] = [ "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.fragment_derived_view,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.my_derived_view,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.test_include_external_view,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.extending_looker_events,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.customer_facts,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.include_able_view,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.autodetect_sql_name_based_on_view_name,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.ability,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.looker_events,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.view_derived_explore,PROD)", "urn:li:dataset:(urn:li:dataPlatform:looker,lkml_samples.view.flights,PROD)", ] assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns) def get_current_checkpoint_from_pipeline( pipeline: Pipeline, ) -> Optional[Checkpoint]: dbt_source = cast(LookMLSource, pipeline.source) return dbt_source.get_current_checkpoint( dbt_source.stale_entity_removal_handler.job_id ) def test_lookml_base_folder(): fake_api = { "base_url": "https://filler.cloud.looker.com", "client_id": "this-is-fake", "client_secret": "this-is-also-fake", } LookMLSourceConfig.parse_obj( { "github_info": { "repo": "acryldata/long-tail-companions-looker", "deploy_key": "this-is-fake", }, "api": fake_api, } ) with pytest.raises( pydantic.ValidationError, match=r"base_folder.+not provided.+deploy_key" ): LookMLSourceConfig.parse_obj({"api": fake_api})