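"""Integration tests for the DataHub Looker dashboard source.

These tests mock the Looker SDK client (``looker_sdk.init40``) and run the
ingestion pipeline against canned dashboards, looks, explores, and usage
queries, comparing the emitted metadata against golden JSON files.
"""
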
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, cast
from unittest import mock

import pytest
from _pytest.config import Config
from freezegun import freeze_time
from looker_sdk.rtl import transport
from looker_sdk.rtl.transport import TransportOptions
from looker_sdk.sdk.api40.models import (
    Category,
    Dashboard,
    DashboardElement,
    Folder,
    FolderBase,
    Look,
    LookmlModelExplore,
    LookmlModelExploreField,
    LookmlModelExploreFieldset,
    LookmlModelExploreJoins,
    LookWithQuery,
    Query,
    User,
    WriteQuery,
)

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.run.pipeline import Pipeline, PipelineInitError
from datahub.ingestion.source.looker import looker_common, looker_usage
from datahub.ingestion.source.looker.looker_common import (
    LookerDashboardSourceReport,
    LookerExplore,
)
from datahub.ingestion.source.looker.looker_config import LookerCommonConfig
from datahub.ingestion.source.looker.looker_lib_wrapper import (
    LookerAPI,
    LookerAPIConfig,
)
from datahub.ingestion.source.looker.looker_query_model import (
    HistoryViewField,
    LookViewField,
    UserViewField,
)
from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import GlobalTagsClass, MetadataChangeEventClass
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
    get_current_checkpoint_from_pipeline,
    validate_all_providers_have_committed_successfully,
)

FROZEN_TIME = "2020-04-14 07:00:00"
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"


def get_default_recipe(output_file_path: str) -> Dict[Any, Any]:
    return {
        "run_id": "looker-test",
        "source": {
            "type": "looker",
            "config": {
                "base_url": "https://looker.company.com",
                "client_id": "foo",
                "client_secret": "bar",
                "extract_usage_history": False,
            },
        },
        "sink": {
            "type": "file",
            "config": {
                "filename": output_file_path,
            },
        },
    }


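# Tests that need a variation on the default recipe mutate the returned dict
# before building a pipeline. A minimal sketch of that pattern (the
# "extract_independent_looks" override is one the tests below actually use):
#
#     new_recipe = get_default_recipe(output_file_path=f"{tmp_path}/looker_mces.json")
#     new_recipe["source"]["config"]["extract_independent_looks"] = True
#     Pipeline.create(new_recipe).run()
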
@freeze_time(FROZEN_TIME)
def test_looker_ingest(pytestconfig, tmp_path, mock_time):
    mocked_client = mock.MagicMock()
    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard(mocked_client)
        mocked_client.run_inline_query.side_effect = side_effect_query_inline
        setup_mock_explore(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(
            {
                "run_id": "looker-test",
                "source": {
                    "type": "looker",
                    "config": {
                        "base_url": "https://looker.company.com",
                        "client_id": "foo",
                        "client_secret": "bar",
                        "extract_usage_history": False,
                        "platform_instance": "ap-south-1",
                        "include_platform_instance_in_urns": True,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": f"{tmp_path}/looker_mces.json",
                    },
                },
            }
        )
        pipeline.run()
        pipeline.raise_from_status()
        mce_out_file = "golden_test_ingest.json"

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "looker_mces.json",
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )


def setup_mock_external_project_view_explore(mocked_client):
    mock_model = mock.MagicMock(project_name="lkml_samples")
    mocked_client.lookml_model.return_value = mock_model
    mocked_client.lookml_model_explore.return_value = LookmlModelExplore(
        id="1",
        name="my_explore_name",
        label="My Explore View",
        description="lorem ipsum",
        view_name="faa_flights",
        project_name="looker_hub",
        fields=LookmlModelExploreFieldset(
            dimensions=[
                LookmlModelExploreField(
                    name="dim1",
                    type="string",
                    dimension_group=None,
                    description="dimension one description",
                    label_short="Dimensions One Label",
                    view="faa_flights",
                    source_file="imported_projects/datahub-demo/views/datahub-demo/datasets/faa_flights.view.lkml",
                )
            ]
        ),
        source_file="test_source_file.lkml",
    )


@freeze_time(FROZEN_TIME)
def test_looker_ingest_external_project_view(pytestconfig, tmp_path, mock_time):
    mocked_client = mock.MagicMock()
    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard(mocked_client)
        setup_mock_external_project_view_explore(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(
            {
                "run_id": "looker-test",
                "source": {
                    "type": "looker",
                    "config": {
                        "base_url": "https://looker.company.com",
                        "client_id": "foo",
                        "client_secret": "bar",
                        "extract_usage_history": False,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": f"{tmp_path}/looker_mces.json",
                    },
                },
            }
        )
        pipeline.run()
        pipeline.raise_from_status()
        mce_out_file = "golden_test_external_project_view_mces.json"

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "looker_mces.json",
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )


@freeze_time(FROZEN_TIME)
def test_looker_ingest_joins(pytestconfig, tmp_path, mock_time):
    mocked_client = mock.MagicMock()
    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard(mocked_client)
        setup_mock_explore_with_joins(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(
            {
                "run_id": "looker-test",
                "source": {
                    "type": "looker",
                    "config": {
                        "base_url": "https://looker.company.com",
                        "client_id": "foo",
                        "client_secret": "bar",
                        "extract_usage_history": False,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": f"{tmp_path}/looker_mces_joins.json",
                    },
                },
            }
        )
        pipeline.run()
        pipeline.raise_from_status()
        mce_out_file = "golden_test_ingest_joins.json"

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "looker_mces_joins.json",
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )


@freeze_time(FROZEN_TIME)
def test_looker_ingest_unaliased_joins(pytestconfig, tmp_path, mock_time):
    mocked_client = mock.MagicMock()
    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        mocked_client.all_dashboards.return_value = [Dashboard(id="1")]
        mocked_client.dashboard.return_value = Dashboard(
            id="1",
            title="foo",
            created_at=datetime.utcfromtimestamp(time.time()),
            updated_at=datetime.utcfromtimestamp(time.time()),
            description="lorem ipsum",
            dashboard_elements=[
                DashboardElement(
                    id="2",
                    type="",
                    subtitle_text="Some text",
                    query=Query(
                        model="data",
                        view="my_view",
                        fields=["dim1"],
                        dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
                    ),
                )
            ],
        )
        setup_mock_explore_unaliased_with_joins(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(
            {
                "run_id": "looker-test",
                "source": {
                    "type": "looker",
                    "config": {
                        "base_url": "https://looker.company.com",
                        "client_id": "foo",
                        "client_secret": "bar",
                        "extract_usage_history": False,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": f"{tmp_path}/looker_mces_unaliased_joins.json",
                    },
                },
            }
        )
        pipeline.run()
        pipeline.raise_from_status()
        mce_out_file = "golden_test_ingest_unaliased_joins.json"

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "looker_mces_unaliased_joins.json",
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )


def setup_mock_dashboard(mocked_client):
    mocked_client.all_dashboards.return_value = [Dashboard(id="1")]
    mocked_client.dashboard.return_value = Dashboard(
        id="1",
        title="foo",
        created_at=datetime.utcfromtimestamp(time.time()),
        updated_at=datetime.utcfromtimestamp(time.time()),
        description="lorem ipsum",
        folder=FolderBase(name="Shared", id="shared-folder-id"),
        dashboard_elements=[
            DashboardElement(
                id="2",
                type="vis",
                subtitle_text="Some text",
                query=Query(
                    model="data",
                    fields=["dim1"],
                    view="my_view",
                    dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
                ),
            )
        ],
    )


def setup_mock_look(mocked_client):
    mocked_client.all_looks.return_value = [
        Look(
            id="1",
            user_id="1",
            title="Outer Look",
            description="I am not part of any Dashboard",
            query_id="1",
            folder=FolderBase(name="Shared", id="shared-folder-id"),
        ),
        Look(
            id="2",
            title="Personal Look",
            user_id="2",
            description="I am not part of any Dashboard and in personal folder",
            query_id="2",
            folder=FolderBase(
                name="Personal",
                id="personal-folder-id",
                is_personal=True,
                is_personal_descendant=True,
            ),
        ),
    ]

    mocked_client.look.side_effect = [
        LookWithQuery(
            query=Query(
                id="1",
                view="sales_explore",
                model="sales_model",
                fields=[
                    "sales.profit",
                ],
                dynamic_fields=None,
                filters=None,
            )
        ),
        LookWithQuery(
            query=Query(
                id="2",
                view="order_explore",
                model="order_model",
                fields=[
                    "order.placed_date",
                ],
                dynamic_fields=None,
                filters=None,
            )
        ),
    ]


def setup_mock_soft_deleted_look(mocked_client):
    mocked_client.search_looks.return_value = [
        Look(
            id="2",
            title="Soft Deleted",
            description="I am not part of any Dashboard",
            query_id="1",
        )
    ]


def setup_mock_dashboard_multiple_charts(mocked_client):
    mocked_client.all_dashboards.return_value = [Dashboard(id="1")]
    mocked_client.dashboard.return_value = Dashboard(
        id="11",
        title="foo",
        created_at=datetime.utcfromtimestamp(time.time()),
        updated_at=datetime.utcfromtimestamp(time.time()),
        description="lorem ipsum",
        dashboard_elements=[
            DashboardElement(
                id="2",
                type="",
                subtitle_text="Some text",
                query=Query(
                    model="data",
                    fields=["dim1"],
                    view="my_view",
                    dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
                ),
            ),
            DashboardElement(
                id="10",
                type="",
                subtitle_text="Some other text",
                query=Query(
                    model="bogus data",
                    view="my_view",
                    dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
                ),
            ),
        ],
    )


def setup_mock_dashboard_with_usage(
    mocked_client: mock.MagicMock, skip_look: bool = False
) -> None:
    mocked_client.all_dashboards.return_value = [Dashboard(id="1")]
    mocked_client.dashboard.return_value = Dashboard(
        id="1",
        title="foo",
        created_at=datetime.utcfromtimestamp(time.time()),
        updated_at=datetime.utcfromtimestamp(time.time()),
        description="lorem ipsum",
        favorite_count=5,
        view_count=25,
        last_viewed_at=datetime.utcfromtimestamp(time.time()),
        dashboard_elements=[
            DashboardElement(
                id="2",
                type="",
                subtitle_text="Some text",
                query=Query(
                    model="data",
                    view="my_view",
                    dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
                ),
            ),
            DashboardElement(
                id="3",
                type="" if skip_look else "vis",  # Looks are only ingested if type == "vis"
                look=LookWithQuery(
                    id="3",
                    view_count=30,
                    query=Query(model="look_data", view="look_view"),
                ),
            ),
        ],
    )


def setup_mock_explore_with_joins(mocked_client):
    mock_model = mock.MagicMock(project_name="lkml_samples")
    mocked_client.lookml_model.return_value = mock_model
    mocked_client.lookml_model_explore.return_value = LookmlModelExplore(
        id="1",
        name="my_explore_name",
        label="My Explore View",
        description="lorem ipsum",
        view_name="underlying_view",
        project_name="lkml_samples",
        fields=LookmlModelExploreFieldset(
            dimensions=[
                LookmlModelExploreField(
                    name="dim1",
                    type="string",
                    description="dimension one description",
                    label_short="Dimensions One Label",
                )
            ]
        ),
        source_file="test_source_file.lkml",
        joins=[
            LookmlModelExploreJoins(
                name="my_joined_view",
                dependent_fields=["my_joined_view.field", "bare_field"],
            ),
            LookmlModelExploreJoins(
                name="my_view_has_no_fields",
                view_label="My Labeled View",
                relationship="one_to_one",
                sql_on="1=1",
            ),
            LookmlModelExploreJoins(
                name="my_joined_view_join_name",
                from_="my_joined_view_original_name",
            ),
        ],
    )


def setup_mock_explore_unaliased_with_joins(mocked_client):
    mock_model = mock.MagicMock(project_name="lkml_samples")
    mocked_client.lookml_model.return_value = mock_model
    mocked_client.lookml_model_explore.return_value = LookmlModelExplore(
        id="1",
        name="my_view",
        label="My Explore View",
        description="lorem ipsum",
        project_name="lkml_samples",
        fields=LookmlModelExploreFieldset(
            dimensions=[
                LookmlModelExploreField(
                    name="dim1",
                    type="string",
                    dimension_group=None,
                    description="dimension one description",
                    label_short="Dimensions One Label",
                )
            ]
        ),
        source_file="test_source_file.lkml",
        joins=[
            LookmlModelExploreJoins(
                name="my_view_has_no_fields",
                view_label="My Labeled View",
                relationship="one_to_one",
                sql_on="1=1",
            ),
            LookmlModelExploreJoins(
                name="my_joined_view_join_name",
                from_="my_joined_view_original_name",
            ),
        ],
    )


def setup_mock_explore(
    mocked_client: Any,
    additional_lkml_fields: Optional[List[LookmlModelExploreField]] = None,
    **additional_explore_fields: Any,
) -> None:
    additional_lkml_fields = additional_lkml_fields or []
    mock_model = mock.MagicMock(project_name="lkml_samples")
    mocked_client.lookml_model.return_value = mock_model

    lkml_fields: List[LookmlModelExploreField] = [
        LookmlModelExploreField(
            name="dim1",
            type="string",
            dimension_group=None,
            description="dimension one description",
            label_short="Dimensions One Label",
        )
    ]
    lkml_fields.extend(additional_lkml_fields)

    mocked_client.lookml_model_explore.return_value = LookmlModelExplore(
        id="1",
        name="my_explore_name",
        label="My Explore View",
        description="lorem ipsum",
        view_name="underlying_view",
        project_name="lkml_samples",
        fields=LookmlModelExploreFieldset(
            dimensions=lkml_fields,
        ),
        source_file="test_source_file.lkml",
        **additional_explore_fields,
    )


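# setup_mock_explore is the generic explore fixture: tests extend it via
# additional_lkml_fields (extra dimensions, as in test_upstream_cll and
# test_file_path_in_view_naming_pattern) and **additional_explore_fields,
# which passes arbitrary LookmlModelExplore attributes through (e.g. the
# tags=... used by test_explore_tags).
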
def setup_mock_user(mocked_client):
    def get_user(
        id_: str,
        fields: Optional[str] = None,
        transport_options: Optional[transport.TransportOptions] = None,
    ) -> User:
        return User(id=id_, email=f"test-{id_}@looker.com")

    mocked_client.user.side_effect = get_user


def setup_mock_all_user(mocked_client):
    def all_users(
        fields: Optional[str] = None,
        transport_options: Optional[transport.TransportOptions] = None,
    ) -> List[User]:
        return [
            User(id="1", email="test-1@looker.com"),
            User(id="2", email="test-2@looker.com"),
            User(id="3", email="test-3@looker.com"),
        ]

    mocked_client.all_users.side_effect = all_users


def side_effect_query_inline(
    result_format: str, body: WriteQuery, transport_options: Optional[TransportOptions]
) -> str:
    query_type: Optional[looker_usage.QueryId] = None
    if result_format == "sql":
        return ""  # Placeholder for sql text

    # Identify which usage query is being run by matching the requested field
    # list against the known query templates.
    for query_id, query_template in looker_usage.query_collection.items():
        if body.fields == query_template.to_write_query().fields:
            query_type = query_id
            break

    query_id_vs_response = {
        looker_usage.QueryId.DASHBOARD_PER_DAY_USAGE_STAT: json.dumps(
            [
                {
                    HistoryViewField.HISTORY_DASHBOARD_ID: "1",
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-05",
                    HistoryViewField.HISTORY_DASHBOARD_USER: 1,
                    HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT: 14,
                },
                {
                    HistoryViewField.HISTORY_DASHBOARD_ID: "1",
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-06",
                    HistoryViewField.HISTORY_DASHBOARD_USER: 1,
                    HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT: 14,
                },
                {
                    HistoryViewField.HISTORY_DASHBOARD_ID: "1",
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-07",
                    HistoryViewField.HISTORY_DASHBOARD_USER: 1,
                    HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT: 5,
                },
                {
                    HistoryViewField.HISTORY_DASHBOARD_ID: "5",
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-07",
                    HistoryViewField.HISTORY_DASHBOARD_USER: 1,
                    HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT: 5,
                },
            ]
        ),
        looker_usage.QueryId.DASHBOARD_PER_USER_PER_DAY_USAGE_STAT: json.dumps(
            [
                {
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-05",
                    HistoryViewField.HISTORY_DASHBOARD_ID: "1",
                    UserViewField.USER_ID: 1,
                    HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT: 16,
                },
                {
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-05",
                    HistoryViewField.HISTORY_DASHBOARD_ID: "1",
                    UserViewField.USER_ID: 2,
                    HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT: 14,
                },
                {
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-07",
                    HistoryViewField.HISTORY_DASHBOARD_ID: "1",
                    UserViewField.USER_ID: 1,
                    HistoryViewField.HISTORY_DASHBOARD_RUN_COUNT: 5,
                },
            ]
        ),
        looker_usage.QueryId.LOOK_PER_DAY_USAGE_STAT: json.dumps(
            [
                {
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-05",
                    HistoryViewField.HISTORY_COUNT: 10,
                    LookViewField.LOOK_ID: 3,
                },
                {
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-06",
                    HistoryViewField.HISTORY_COUNT: 20,
                    LookViewField.LOOK_ID: 3,
                },
                {
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-07",
                    HistoryViewField.HISTORY_COUNT: 35,
                    LookViewField.LOOK_ID: 3,
                },
            ]
        ),
        looker_usage.QueryId.LOOK_PER_USER_PER_DAY_USAGE_STAT: json.dumps(
            [
                {
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-05",
                    HistoryViewField.HISTORY_COUNT: 10,
                    LookViewField.LOOK_ID: 3,
                    UserViewField.USER_ID: 1,
                },
                {
                    HistoryViewField.HISTORY_CREATED_DATE: "2022-07-05",
                    HistoryViewField.HISTORY_COUNT: 20,
                    LookViewField.LOOK_ID: 3,
                    UserViewField.USER_ID: 2,
                },
            ]
        ),
    }

    # Guard explicitly against an unmatched query; the previous dict lookup
    # would have raised an UnboundLocalError before reaching this exception.
    if query_type is None or query_type not in query_id_vs_response:
        raise Exception("Unknown Query")

    return query_id_vs_response[query_type]


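# Note on the dispatch above: looker_usage issues its system-activity queries
# through run_inline_query, so the mock identifies each query purely by its
# requested field list and serves canned JSON rows. Dashboard "5" and look id 3
# appear only in the usage data, not in the mocked dashboards, which is what
# lets the tests below exercise the "skipped for usage" reporting paths.
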
@freeze_time(FROZEN_TIME)
def test_looker_ingest_allow_pattern(pytestconfig, tmp_path, mock_time):
    mocked_client = mock.MagicMock()

    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard_multiple_charts(mocked_client)
        setup_mock_explore(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(
            {
                "run_id": "looker-test",
                "source": {
                    "type": "looker",
                    "config": {
                        "base_url": "https://looker.company.com",
                        "client_id": "foo",
                        "client_secret": "bar",
                        "chart_pattern": {"allow": ["2"]},
                        "extract_usage_history": False,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": f"{tmp_path}/looker_mces.json",
                    },
                },
            }
        )
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status()
        mce_out_file = "golden_test_allow_ingest.json"

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "looker_mces.json",
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )


@freeze_time(FROZEN_TIME)
def test_looker_ingest_usage_history(pytestconfig, tmp_path, mock_time):
    mocked_client = mock.MagicMock()
    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard_with_usage(mocked_client)
        mocked_client.run_inline_query.side_effect = side_effect_query_inline
        setup_mock_explore(mocked_client)
        setup_mock_user(mocked_client)
        setup_mock_all_user(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        temp_output_file = f"{tmp_path}/looker_mces.json"
        pipeline = Pipeline.create(
            {
                "run_id": "looker-test",
                "source": {
                    "type": "looker",
                    "config": {
                        "base_url": "https://looker.company.com",
                        "client_id": "foo",
                        "client_secret": "bar",
                        "extract_usage_history": True,
                        "max_threads": 1,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": temp_output_file,
                    },
                },
            }
        )
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status()
        mce_out_file = "looker_mces_usage_history.json"

        # There should be 4 dashboardUsageStatistics aspects (one absolute and 3 timeseries)
        dashboard_usage_aspect_count = 0
        # There should be 4 chartUsageStatistics (one absolute and 3 timeseries)
        chart_usage_aspect_count = 0
        with open(temp_output_file) as f:
            temp_output_dict = json.load(f)
            for element in temp_output_dict:
                if (
                    element.get("entityType") == "dashboard"
                    and element.get("aspectName") == "dashboardUsageStatistics"
                ):
                    dashboard_usage_aspect_count = dashboard_usage_aspect_count + 1
                if (
                    element.get("entityType") == "chart"
                    and element.get("aspectName") == "chartUsageStatistics"
                ):
                    chart_usage_aspect_count = chart_usage_aspect_count + 1

            assert dashboard_usage_aspect_count == 4
            assert chart_usage_aspect_count == 4

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=temp_output_file,
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )


@freeze_time(FROZEN_TIME)
def test_looker_filter_usage_history(pytestconfig, tmp_path, mock_time):
    mocked_client = mock.MagicMock()
    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard_with_usage(mocked_client, skip_look=True)
        mocked_client.run_inline_query.side_effect = side_effect_query_inline
        setup_mock_explore(mocked_client)
        setup_mock_user(mocked_client)

        temp_output_file = f"{tmp_path}/looker_mces.json"
        pipeline = Pipeline.create(
            {
                "run_id": "looker-test",
                "source": {
                    "type": "looker",
                    "config": {
                        "base_url": "https://looker.company.com",
                        "client_id": "foo",
                        "client_secret": "bar",
                        "extract_usage_history": True,
                        "max_threads": 1,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": temp_output_file,
                    },
                },
            }
        )
        pipeline.run()
        pipeline.pretty_print_summary()
        pipeline.raise_from_status()

        # There should be 4 dashboardUsageStatistics aspects (one absolute and 3 timeseries)
        dashboard_usage_aspect_count = 0
        # There should be 0 chartUsageStatistics -- filtered by set of ingested charts
        chart_usage_aspect_count = 0
        with open(temp_output_file) as f:
            temp_output_dict = json.load(f)
            for element in temp_output_dict:
                if (
                    element.get("entityType") == "dashboard"
                    and element.get("aspectName") == "dashboardUsageStatistics"
                ):
                    dashboard_usage_aspect_count = dashboard_usage_aspect_count + 1
                if (
                    element.get("entityType") == "chart"
                    and element.get("aspectName") == "chartUsageStatistics"
                ):
                    chart_usage_aspect_count = chart_usage_aspect_count + 1

            assert dashboard_usage_aspect_count == 4
            assert chart_usage_aspect_count == 0

            source_report = cast(LookerDashboardSourceReport, pipeline.source.get_report())
            # From timeseries query
            assert str(source_report.dashboards_skipped_for_usage) == str(["5"])
            # From dashboard element
            assert str(source_report.charts_skipped_for_usage) == str(["3"])


@freeze_time(FROZEN_TIME)
def test_looker_ingest_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    output_file_name: str = "looker_mces.json"
    golden_file_name: str = "golden_looker_mces.json"
    output_file_deleted_name: str = "looker_mces_deleted_stateful.json"
    golden_file_deleted_name: str = "looker_mces_golden_deleted_stateful.json"

    test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

    def looker_source_config(sink_file_name):
        return {
            "run_id": "looker-test",
            "pipeline_name": "stateful-looker-pipeline",
            "source": {
                "type": "looker",
                "config": {
                    "base_url": "https://looker.company.com",
                    "client_id": "foo",
                    "client_secret": "bar",
                    "extract_usage_history": False,
                    "stateful_ingestion": {
                        "enabled": True,
                        "remove_stale_metadata": True,
                        "fail_safe_threshold": 100.0,
                        "state_provider": {
                            "type": "datahub",
                            "config": {"datahub_api": {"server": GMS_SERVER}},
                        },
                    },
                },
            },
            "sink": {
                "type": "file",
                "config": {
                    "filename": f"{tmp_path}/{sink_file_name}",
                },
            },
        }

    mocked_client = mock.MagicMock()
    pipeline_run1 = None
    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk:
        mock_checkpoint.return_value = mock_datahub_graph
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard_multiple_charts(mocked_client)
        setup_mock_explore(mocked_client)

        pipeline_run1 = Pipeline.create(looker_source_config(output_file_name))
        pipeline_run1.run()
        pipeline_run1.raise_from_status()
        pipeline_run1.pretty_print_summary()

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / output_file_name,
            golden_path=f"{test_resources_dir}/{golden_file_name}",
        )

    checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)
    assert checkpoint1
    assert checkpoint1.state

    pipeline_run2 = None
    mocked_client = mock.MagicMock()
    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk:
        mock_checkpoint.return_value = mock_datahub_graph
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard(mocked_client)
        setup_mock_explore(mocked_client)

        pipeline_run2 = Pipeline.create(looker_source_config(output_file_deleted_name))
        pipeline_run2.run()
        pipeline_run2.raise_from_status()
        pipeline_run2.pretty_print_summary()

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / output_file_deleted_name,
            golden_path=f"{test_resources_dir}/{golden_file_deleted_name}",
        )

    checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)
    assert checkpoint2
    assert checkpoint2.state

    # Validate that all providers have committed successfully.
    validate_all_providers_have_committed_successfully(
        pipeline=pipeline_run1, expected_providers=1
    )
    validate_all_providers_have_committed_successfully(
        pipeline=pipeline_run2, expected_providers=1
    )

    # Perform all assertions on the states. The deleted entities should not be
    # part of the second state.
    state1 = cast(GenericCheckpointState, checkpoint1.state)
    state2 = cast(GenericCheckpointState, checkpoint2.state)

    difference_dataset_urns = list(
        state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
    )
    assert len(difference_dataset_urns) == 1
    deleted_dataset_urns: List[str] = [
        "urn:li:dataset:(urn:li:dataPlatform:looker,bogus data.explore.my_view,PROD)"
    ]
    assert sorted(deleted_dataset_urns) == sorted(difference_dataset_urns)

    difference_chart_urns = list(
        state1.get_urns_not_in(type="chart", other_checkpoint_state=state2)
    )
    assert len(difference_chart_urns) == 1
    deleted_chart_urns = ["urn:li:chart:(looker,dashboard_elements.10)"]
    assert sorted(deleted_chart_urns) == sorted(difference_chart_urns)

    difference_dashboard_urns = list(
        state1.get_urns_not_in(type="dashboard", other_checkpoint_state=state2)
    )
    assert len(difference_dashboard_urns) == 1
    deleted_dashboard_urns = ["urn:li:dashboard:(looker,dashboards.11)"]
    assert sorted(deleted_dashboard_urns) == sorted(difference_dashboard_urns)


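# The stateful test above relies on the difference between the two mocked runs:
# run 1 (setup_mock_dashboard_multiple_charts) emits dashboard "11" and chart
# element "10", which run 2 (setup_mock_dashboard) no longer produces, so the
# checkpoint diff flags exactly those urns as removed.
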
@freeze_time(FROZEN_TIME)
def test_independent_look_ingestion_config(pytestconfig, tmp_path, mock_time):
    """
    If extract_independent_looks is enabled, stateful_ingestion.enabled must
    also be enabled; otherwise pipeline creation fails with a config error.
    """
    new_recipe = get_default_recipe(output_file_path=f"{tmp_path}/output")
    new_recipe["source"]["config"]["extract_independent_looks"] = True

    with pytest.raises(
        expected_exception=PipelineInitError,
        match="stateful_ingestion.enabled should be set to true",
    ):
        # The config error should be raised at pipeline creation time
        Pipeline.create(new_recipe)


def ingest_independent_looks(
    pytestconfig: Config,
    tmp_path: Path,
    mock_time: float,
    mock_datahub_graph: mock.MagicMock,
    skip_personal_folders: bool,
    golden_file_name: str,
) -> None:
    mocked_client = mock.MagicMock()
    new_recipe = get_default_recipe(output_file_path=f"{tmp_path}/looker_mces.json")
    new_recipe["source"]["config"]["extract_independent_looks"] = True
    new_recipe["source"]["config"]["skip_personal_folders"] = skip_personal_folders
    new_recipe["source"]["config"]["stateful_ingestion"] = {
        "enabled": True,
        "state_provider": {
            "type": "datahub",
            "config": {"datahub_api": {"server": GMS_SERVER}},
        },
    }
    new_recipe["pipeline_name"] = "execution-1"

    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk:
        mock_checkpoint.return_value = mock_datahub_graph

        mock_sdk.return_value = mocked_client
        setup_mock_dashboard(mocked_client)
        setup_mock_explore(mocked_client)
        setup_mock_user(mocked_client)
        setup_mock_all_user(mocked_client)
        setup_mock_look(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(new_recipe)
        pipeline.run()
        pipeline.raise_from_status()

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "looker_mces.json",
            golden_path=f"{test_resources_dir}/{golden_file_name}",
        )


@freeze_time(FROZEN_TIME)
def test_independent_looks_ingest_with_personal_folder(
    pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    ingest_independent_looks(
        pytestconfig=pytestconfig,
        tmp_path=tmp_path,
        mock_time=mock_time,
        mock_datahub_graph=mock_datahub_graph,
        skip_personal_folders=False,
        golden_file_name="golden_test_independent_look_ingest.json",
    )


@freeze_time(FROZEN_TIME)
def test_independent_looks_ingest_without_personal_folder(
    pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    ingest_independent_looks(
        pytestconfig=pytestconfig,
        tmp_path=tmp_path,
        mock_time=mock_time,
        mock_datahub_graph=mock_datahub_graph,
        skip_personal_folders=True,
        golden_file_name="golden_test_non_personal_independent_look.json",
    )


@freeze_time(FROZEN_TIME)
def test_file_path_in_view_naming_pattern(
    pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    mocked_client = mock.MagicMock()
    new_recipe = get_default_recipe(output_file_path=f"{tmp_path}/looker_mces.json")
    new_recipe["source"]["config"]["view_naming_pattern"] = (
        "{project}.{file_path}.view.{name}"
    )

    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk:
        mock_checkpoint.return_value = mock_datahub_graph

        mock_sdk.return_value = mocked_client
        setup_mock_dashboard(mocked_client)
        setup_mock_explore(
            mocked_client,
            additional_lkml_fields=[
                LookmlModelExploreField(
                    name="dim2",
                    type="string",
                    dimension_group=None,
                    description="dimension one description",
                    label_short="Dimensions One Label",
                    view="underlying_view",
                    source_file="views/underlying_view.view.lkml",
                )
            ],
        )
        setup_mock_look(mocked_client)
        setup_mock_external_project_view_explore(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(new_recipe)
        pipeline.run()
        pipeline.raise_from_status()
        mce_out_file = "golden_test_file_path_ingest.json"

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "looker_mces.json",
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )


@freeze_time(FROZEN_TIME)
def test_independent_soft_deleted_looks(
    pytestconfig,
    tmp_path,
    mock_time,
):
    mocked_client = mock.MagicMock()

    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        setup_mock_look(mocked_client)
        setup_mock_soft_deleted_look(mocked_client)
        looker_api = LookerAPI(
            config=LookerAPIConfig(
                base_url="https://fake.com",
                client_id="foo",
                client_secret="bar",
            )
        )
        looks: List[Look] = looker_api.all_looks(
            fields=["id"],
            soft_deleted=True,
        )

        assert len(looks) == 3
        assert looks[0].title == "Outer Look"
        assert looks[1].title == "Personal Look"
        assert looks[2].title == "Soft Deleted"


def setup_mock_dashboard_multi_model_explores(mocked_client):
    """Set up a dashboard element that references explores from different models."""
    mocked_client.all_dashboards.return_value = [Dashboard(id="1")]
    mocked_client.dashboard.return_value = Dashboard(
        id="1",
        title="Dashboard with Multi-Model Explores",
        created_at=datetime.utcfromtimestamp(time.time()),
        updated_at=datetime.utcfromtimestamp(time.time()),
        description="A dashboard with elements that reference explores from different models",
        dashboard_elements=[
            DashboardElement(
                id="2",
                type="vis",
                title="Multi-Model Element",
                subtitle_text="Element referencing two explores from different models",
                result_maker=mock.MagicMock(
                    query=Query(
                        model="model_1",
                        view="explore_1",
                        fields=["explore_1.field1", "explore_1.field2"],
                    ),
                    filterables=[
                        mock.MagicMock(
                            model="model_1",
                            view="explore_1",
                        ),
                        mock.MagicMock(
                            model="model_2",
                            view="explore_2",
                        ),
                    ],
                ),
            )
        ],
        folder=FolderBase(name="Shared", id="shared-folder-id"),
    )


def setup_mock_multi_model_explores(mocked_client):
    """Set up mocks for explores from different models."""

    def lookml_model_explore_side_effect(model, name, *args, **kwargs):
        if model == "model_1" and name == "explore_1":
            return LookmlModelExplore(
                id="1",
                name="explore_1",
                label="Explore 1",
                description="First explore from model 1",
                view_name="underlying_view_1",
                project_name="project_1",
                fields=LookmlModelExploreFieldset(
                    dimensions=[
                        LookmlModelExploreField(
                            name="field1",
                            type="string",
                            description="field 1 description",
                            label_short="Field 1",
                        )
                    ]
                ),
                source_file="model_1/explore_1.lkml",
            )
        elif model == "model_2" and name == "explore_2":
            return LookmlModelExplore(
                id="2",
                name="explore_2",
                label="Explore 2",
                description="Second explore from model 2",
                view_name="underlying_view_2",
                project_name="project_2",
                fields=LookmlModelExploreFieldset(
                    dimensions=[
                        LookmlModelExploreField(
                            name="field1",
                            type="string",
                            description="field 1 description",
                            label_short="Field 1",
                        )
                    ]
                ),
                source_file="model_2/explore_2.lkml",
            )
        return None

    def lookml_model_side_effect(model, *args, **kwargs):
        if model == "model_1":
            return mock.MagicMock(project_name="project_1")
        elif model == "model_2":
            return mock.MagicMock(project_name="project_2")
        return None

    mocked_client.lookml_model.side_effect = lookml_model_side_effect
    mocked_client.lookml_model_explore.side_effect = lookml_model_explore_side_effect


@freeze_time(FROZEN_TIME)
def test_looker_ingest_multi_model_explores(pytestconfig, tmp_path, mock_time):
    """Test ingestion of dashboard elements with explores from different models."""
    mocked_client = mock.MagicMock()
    output_file = f"{tmp_path}/looker_multi_model_mces.json"

    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client
        setup_mock_dashboard_multi_model_explores(mocked_client)
        setup_mock_multi_model_explores(mocked_client)
        mocked_client.run_inline_query.side_effect = side_effect_query_inline

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(
            {
                "run_id": "looker-test",
                "source": {
                    "type": "looker",
                    "config": {
                        "base_url": "https://looker.company.com",
                        "client_id": "foo",
                        "client_secret": "bar",
                        "extract_usage_history": False,
                    },
                },
                "sink": {
                    "type": "file",
                    "config": {
                        "filename": output_file,
                    },
                },
            }
        )
        pipeline.run()
        pipeline.raise_from_status()

        # Validate against a golden file
        mce_out_file = "golden_test_multi_model_explores.json"
        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=output_file,
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )

        # Simply check that both model_1 and model_2 explores appear in the output file
        with open(output_file, "r") as f:
            output = f.read()
            assert "model_1.explore.explore_1" in output, (
                "Missing model_1.explore.explore_1 in output"
            )
            assert "model_2.explore.explore_2" in output, (
                "Missing model_2.explore.explore_2 in output"
            )


@freeze_time(FROZEN_TIME)
def test_upstream_cll(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    mocked_client = mock.MagicMock()

    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk:
        mock_checkpoint.return_value = mock_datahub_graph

        mock_sdk.return_value = mocked_client
        setup_mock_explore(
            mocked_client,
            additional_lkml_fields=[
                LookmlModelExploreField(
                    name="dim2",
                    type="string",
                    dimension_group=None,
                    description="dimension one description",
                    label_short="Dimensions One Label",
                    view="underlying_view",
                    source_file="views/underlying_view.view.lkml",
                ),
                LookmlModelExploreField(
                    category=Category.dimension,
                    dimension_group="my_explore_name.createdon",
                    field_group_label="Createdon Date",
                    field_group_variant="Date",
                    label="Dataset Lineages Explore Createdon Date",
                    label_short="Createdon Date",
                    lookml_link="/projects/datahub-demo/files/views%2Fdatahub-demo%2Fdatasets%2Fdataset_lineages.view.lkml?line=5",
                    name="my_explore_name.createdon_date",
                    project_name="datahub-demo",
                    source_file="views/datahub-demo/datasets/dataset_lineages.view.lkml",
                    source_file_path="datahub-demo/views/datahub-demo/datasets/dataset_lineages.view.lkml",
                    sql='${TABLE}."CREATEDON" ',
                    suggest_dimension="my_explore_name.createdon_date",
                    suggest_explore="my_explore_name",
                    type="date_date",
                    view="my_explore_name",
                    view_label="Dataset Lineages Explore",
                    original_view="dataset_lineages",
                ),
            ],
        )
        config = mock.MagicMock()

        config.view_naming_pattern.replace_variables.return_value = "dataset_lineages"
        config.platform_name = "snowflake"
        config.platform_instance = "sales"
        config.env = "DEV"

        looker_explore: Optional[LookerExplore] = looker_common.LookerExplore.from_api(
            model="fake",
            explore_name="my_explore_name",
            client=mocked_client,
            reporter=mock.MagicMock(),
            source_config=config,
        )

        assert looker_explore is not None
        assert looker_explore.name == "my_explore_name"
        assert looker_explore.fields is not None
        assert len(looker_explore.fields) == 3

        assert (
            looker_explore.fields[2].upstream_fields[0].table
            == "urn:li:dataset:(urn:li:dataPlatform:snowflake,"
            "sales.dataset_lineages,DEV)"
        )

        assert looker_explore.fields[2].upstream_fields[0].column == "createdon"


@freeze_time(FROZEN_TIME)
def test_explore_tags(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    mocked_client = mock.MagicMock()

    with mock.patch(
        "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
        mock_datahub_graph,
    ) as mock_checkpoint, mock.patch("looker_sdk.init40") as mock_sdk:
        mock_checkpoint.return_value = mock_datahub_graph

        tags: List[str] = ["metrics", "all"]

        mock_sdk.return_value = mocked_client
        setup_mock_explore(
            mocked_client,
            tags=tags,
        )

        looker_explore: Optional[LookerExplore] = looker_common.LookerExplore.from_api(
            model="fake",
            explore_name="my_explore_name",
            client=mocked_client,
            reporter=mock.MagicMock(),
            source_config=mock.MagicMock(),
        )

        assert looker_explore is not None
        assert looker_explore.name == "my_explore_name"
        assert looker_explore.tags == tags

        mcps: Optional[
            List[Union[MetadataChangeEvent, MetadataChangeProposalWrapper]]
        ] = looker_explore._to_metadata_events(
            config=LookerCommonConfig(),
            reporter=SourceReport(),
            base_url="fake",
            extract_embed_urls=False,
        )

        expected_tag_urns: List[str] = ["urn:li:tag:metrics", "urn:li:tag:all"]

        actual_tag_urns: List[str] = []
        if mcps:
            for mcp in mcps:
                if isinstance(mcp, MetadataChangeEventClass):
                    for aspect in mcp.proposedSnapshot.aspects:
                        if isinstance(aspect, GlobalTagsClass):
                            actual_tag_urns = [
                                tag_association.tag for tag_association in aspect.tags
                            ]

        assert expected_tag_urns == actual_tag_urns


def side_effect_function_for_dashboards(*args: Tuple[str], **kwargs: Any) -> Dashboard:
    assert kwargs["dashboard_id"] in ["1", "2", "3"], "Invalid dashboard id"

    if kwargs["dashboard_id"] == "1":
        return Dashboard(
            id=kwargs["dashboard_id"],
            title="first dashboard",
            created_at=datetime.utcfromtimestamp(time.time()),
            updated_at=datetime.utcfromtimestamp(time.time()),
            description="first",
            folder=FolderBase(name="A", id="a"),
            dashboard_elements=[
                DashboardElement(
                    id="2",
                    type="",
                    subtitle_text="Some text",
                    query=Query(
                        model="data",
                        view="my_view",
                        fields=["dim1"],
                        dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
                    ),
                )
            ],
        )

    if kwargs["dashboard_id"] == "2":
        return Dashboard(
            id=kwargs["dashboard_id"],
            title="second dashboard",
            created_at=datetime.utcfromtimestamp(time.time()),
            updated_at=datetime.utcfromtimestamp(time.time()),
            description="second",
            folder=FolderBase(name="B", id="b"),
            dashboard_elements=[
                DashboardElement(
                    id="2",
                    type="",
                    subtitle_text="Some text",
                    query=Query(
                        model="data",
                        view="my_view",
                        fields=["dim1"],
                        dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
                    ),
                )
            ],
        )

    if kwargs["dashboard_id"] == "3":
        return Dashboard(
            id=kwargs["dashboard_id"],
            title="third dashboard",
            created_at=datetime.utcfromtimestamp(time.time()),
            updated_at=datetime.utcfromtimestamp(time.time()),
            description="third",
            folder=FolderBase(name="C", id="c"),
            dashboard_elements=[
                DashboardElement(
                    id="2",
                    type="",
                    subtitle_text="Some text",
                    query=Query(
                        model="data",
                        view="my_view",
                        fields=["dim1"],
                        dynamic_fields='[{"table_calculation":"calc","label":"foobar","expression":"offset(${my_table.value},1)","value_format":null,"value_format_name":"eur","_kind_hint":"measure","_type_hint":"number"}]',
                    ),
                )
            ],
        )

    # Default return to satisfy the linter
    return Dashboard(
        id="unknown",
        title="unknown",
        created_at=datetime.utcfromtimestamp(time.time()),
        updated_at=datetime.utcfromtimestamp(time.time()),
        description="unknown",
        folder=FolderBase(name="Unknown", id="unknown"),
        dashboard_elements=[],
    )


def side_effect_function_folder_ancestors(
    *args: Tuple[Any], **kwargs: Any
) -> Sequence[Folder]:
    assert args[0] in ["a", "b", "c"], "Invalid folder id"

    if args[0] == "a":
        # No parent
        return ()

    if args[0] == "b":
        return (Folder(id="a", name="A"),)

    if args[0] == "c":
        return Folder(id="a", name="A"), Folder(id="b", name="B")

    # Default return to satisfy the linter
    return (Folder(id="unknown", name="Unknown"),)


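# Folder layout implied by the ancestor mocks above: "A" is a root folder, "B"
# sits under "A", and "C" sits under "B", so only the third dashboard matches
# the "A/B/C" folder_path_pattern allow rule in test_folder_path_pattern below.
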
def setup_mock_dashboard_with_folder(mocked_client):
    mocked_client.all_dashboards.return_value = [
        Dashboard(id="1"),
        Dashboard(id="2"),
        Dashboard(id="3"),
    ]
    mocked_client.dashboard.side_effect = side_effect_function_for_dashboards
    mocked_client.folder_ancestors.side_effect = side_effect_function_folder_ancestors


@freeze_time(FROZEN_TIME)
def test_folder_path_pattern(pytestconfig, tmp_path, mock_time, mock_datahub_graph):
    mocked_client = mock.MagicMock()
    new_recipe = get_default_recipe(output_file_path=f"{tmp_path}/looker_mces.json")
    new_recipe["source"]["config"]["folder_path_pattern"] = {
        "allow": ["A/B/C"],
    }

    with mock.patch("looker_sdk.init40") as mock_sdk:
        mock_sdk.return_value = mocked_client

        setup_mock_dashboard_with_folder(mocked_client)

        setup_mock_explore(mocked_client)

        setup_mock_look(mocked_client)

        test_resources_dir = pytestconfig.rootpath / "tests/integration/looker"

        pipeline = Pipeline.create(new_recipe)
        pipeline.run()
        pipeline.raise_from_status()
        mce_out_file = "golden_test_folder_path_pattern_ingest.json"

        mce_helpers.check_golden_file(
            pytestconfig,
            output_path=tmp_path / "looker_mces.json",
            golden_path=f"{test_resources_dir}/{mce_out_file}",
        )