import datetime import json import re from pathlib import Path from typing import Any, Dict, List, Optional, Union, cast from unittest import mock from unittest.mock import MagicMock import pytest from freezegun import freeze_time from datahub.ingestion.api.source import StructuredLogLevel from datahub.ingestion.run.pipeline import Pipeline from datahub.ingestion.source.powerbi.config import ( Constant, PowerBiDashboardSourceConfig, SupportedDataPlatform, ) from datahub.ingestion.source.powerbi.powerbi import PowerBiDashboardSource from datahub.ingestion.source.powerbi.rest_api_wrapper.data_classes import ( Page, Report, ReportType, Workspace, ) from datahub.testing import mce_helpers from tests.test_helpers import test_connection_helpers pytestmark = pytest.mark.integration_batch_2 FROZEN_TIME = "2022-02-03 07:00:00" def mock_msal_cca(*args, **kwargs): class MsalClient: def __init__(self): self.call_num = 0 self.token: Dict[str, Any] = { "access_token": "dummy", } def acquire_token_for_client(self, *args, **kwargs): self.call_num += 1 return self.token def reset(self): self.call_num = 0 return MsalClient() def scan_init_response(request, context): # Request mock is passing POST input in the form of workspaces= # If we scan 2 or more, it get messy like this. 'workspaces=64ED5CAD-7C10-4684-8180-826122881108&workspaces=64ED5CAD-7C22-4684-8180-826122881108' workspace_id_list = request.text.replace("&", "").split("workspaces=") workspace_id = "||".join(workspace_id_list[1:]) w_id_vs_response: Dict[str, Any] = { "64ED5CAD-7C10-4684-8180-826122881108": { "id": "4674efd1-603c-4129-8d82-03cf2be05aff" }, "64ED5CAD-7C22-4684-8180-826122881108": { "id": "a674efd1-603c-4129-8d82-03cf2be05aff" }, "64ED5CAD-7C10-4684-8180-826122881108||64ED5CAD-7C22-4684-8180-826122881108": { "id": "a674efd1-603c-4129-8d82-03cf2be05aff" }, "A8D655A6-F521-477E-8C22-255018583BF4": { "id": "62DAF926-0B18-4FF1-982C-2A3EB6B8F0E4" }, "C5DA6EA8-625E-4AB1-90B6-CAEA0BF9F492": { "id": "81B02907-E2A3-45C3-B505-3781839C8CAA", }, "8F756DE6-26AD-45FF-A201-44276FF1F561": { "id": "6147FCEB-7531-4449-8FB6-1F7A5431BF2D", }, } return w_id_vs_response[workspace_id] def read_mock_data(path: Union[Path, str]) -> dict: with open(path) as p: return json.load(p) def register_mock_api( pytestconfig: pytest.Config, request_mock: Any, override_data: Optional[dict] = None ) -> None: default_mock_data_path = ( pytestconfig.rootpath / "tests/integration/powerbi/mock_data/default_mock_response.json" ) api_vs_response = { "https://api.powerbi.com/v1.0/myorg/admin/workspaces/getInfo": { "method": "POST", "status_code": 200, "json": scan_init_response, }, } api_vs_response.update(read_mock_data(default_mock_data_path)) api_vs_response.update(override_data or {}) for url in api_vs_response: request_mock.register_uri( api_vs_response[url]["method"], url, json=api_vs_response[url].get("json"), text=api_vs_response[url].get("text"), status_code=api_vs_response[url]["status_code"], ) def default_source_config(): return { "client_id": "foo", "client_secret": "bar", "tenant_id": "0B0C960B-FCDF-4D0F-8C45-2E03BB59DDEB", "workspace_id": "64ED5CAD-7C10-4684-8180-826122881108", "extract_lineage": False, "extract_reports": False, "extract_ownership": True, "convert_lineage_urns_to_lowercase": False, "workspace_id_pattern": {"allow": ["64ED5CAD-7C10-4684-8180-826122881108"]}, "dataset_type_mapping": { "PostgreSql": "postgres", "Oracle": "oracle", }, "env": "DEV", "extract_workspaces_to_containers": False, "enable_advance_lineage_sql_construct": False, } @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_ingest( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_ingest.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_workspace_type_filter( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api( request_mock=requests_mock, pytestconfig=pytestconfig, override_data=read_mock_data( pytestconfig.rootpath / "tests/integration/powerbi/mock_data/workspace_type_filter.json" ), ) default_config: dict = default_source_config() del default_config["workspace_id"] del default_config["workspace_id_pattern"] pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_config, "extract_workspaces_to_containers": True, "workspace_type_filter": [ "PersonalGroup", ], }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_personal_ingest.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_ingest_patch_disabled( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "patch_metadata": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_ingest_patch_disabled.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_test_connection_success(mock_msal): report = test_connection_helpers.run_test_connection( PowerBiDashboardSource, default_source_config() ) test_connection_helpers.assert_basic_connectivity_success(report) @freeze_time(FROZEN_TIME) @pytest.mark.integration def test_powerbi_test_connection_failure(): report = test_connection_helpers.run_test_connection( PowerBiDashboardSource, default_source_config() ) test_connection_helpers.assert_basic_connectivity_failure( report, "Unable to get authority configuration" ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_platform_instance_ingest( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) output_path: str = f"{tmp_path}/powerbi_platform_instance_mces.json" pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "platform_instance": "aws-ap-south-1", }, }, "sink": { "type": "file", "config": { "filename": output_path, }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_platform_instance_ingest.json" mce_helpers.check_golden_file( pytestconfig, output_path=output_path, golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_ingest_urn_lower_case( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "env": "PROD", "platform_instance": "myPlatformInstance", "convert_urns_to_lowercase": True, "convert_lineage_urns_to_lowercase": True, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_lower_case_urn_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_lower_case_urn_ingest.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_lower_case_urn_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_override_ownership( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "extract_ownership": False, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_mces_disabled_ownership.json", }, }, } ) pipeline.run() pipeline.raise_from_status() mce_out_file = "golden_test_disabled_ownership.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_mces_disabled_ownership.json", golden_path=f"{test_resources_dir}/{mce_out_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_scan_all_workspaces( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "extract_reports": False, "extract_ownership": False, "workspace_id_pattern": { "deny": ["64ED5CAD-7322-4684-8180-826122881108"], }, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_mces_scan_all_workspaces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_scan_all_workspaces.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_mces_scan_all_workspaces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_extract_reports( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "extract_reports": True, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_report_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_report.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_report_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_extract_lineage( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-lineage-test", "source": { "type": "powerbi", "config": { **default_source_config(), "extract_lineage": True, "dataset_type_mapping": { "PostgreSql": {"platform_instance": "operational_instance"}, "Oracle": { "platform_instance": "high_performance_production_unit" }, "Sql": {"platform_instance": "reporting-db"}, "Snowflake": {"platform_instance": "sn-2"}, }, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_lineage_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_lineage.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_lineage_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_extract_endorsements( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "extract_reports": False, "extract_endorsements_to_tags": True, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_endorsement_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() mce_out_file = "golden_test_endorsement.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_endorsement_mces.json", golden_path=f"{test_resources_dir}/{mce_out_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_admin_access_is_not_allowed( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api( pytestconfig=pytestconfig, request_mock=requests_mock, override_data={ "https://api.powerbi.com/v1.0/myorg/admin/workspaces/getInfo": { "method": "POST", "status_code": 403, "json": {}, }, }, ) pipeline = Pipeline.create( { "run_id": "powerbi-admin-api-disabled-test", "source": { "type": "powerbi", "config": { **default_source_config(), "extract_lineage": True, "dataset_type_mapping": { "PostgreSql": {"platform_instance": "operational_instance"}, "Oracle": { "platform_instance": "high_performance_production_unit" }, "Sql": {"platform_instance": "reporting-db"}, "Snowflake": {"platform_instance": "sn-2"}, }, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/golden_test_admin_access_not_allowed_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_admin_access_not_allowed.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/golden_test_admin_access_not_allowed_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_workspace_container( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "workspace_id_pattern": { "deny": ["64ED5CAD-7322-4684-8180-826122881108"], }, "extract_workspaces_to_containers": True, "extract_datasets_to_containers": True, "extract_reports": True, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_container_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() mce_out_file = "golden_test_container.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_container_mces.json", golden_path=f"{test_resources_dir}/{mce_out_file}", ) def test_access_token_expiry_with_long_expiry( pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) mock_msal = mock_msal_cca() with mock.patch("msal.ConfidentialClientApplication", return_value=mock_msal): pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_access_token_mces.json", }, }, } ) # for long expiry, the token should only be requested once. mock_msal.token = { "access_token": "dummy2", "expires_in": 3600, } mock_msal.reset() pipeline.run() # We expect the token to be requested twice (once for AdminApiResolver and one for RegularApiResolver) assert mock_msal.call_num == 2 def test_access_token_expiry_with_short_expiry( pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) mock_msal = mock_msal_cca() with mock.patch("msal.ConfidentialClientApplication", return_value=mock_msal): pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_access_token_mces.json", }, }, } ) # for short expiry, the token should be requested when expires. mock_msal.token = { "access_token": "dummy", "expires_in": 0, } mock_msal.reset() pipeline.run() assert mock_msal.call_num > 2 def dataset_type_mapping_set_to_all_platform(pipeline: Pipeline) -> None: source_config: PowerBiDashboardSourceConfig = cast( PowerBiDashboardSource, pipeline.source ).source_config assert source_config.dataset_type_mapping is not None # Generate default dataset_type_mapping and compare it with source_config.dataset_type_mapping default_dataset_type_mapping: dict = {} for item in SupportedDataPlatform: default_dataset_type_mapping[item.value.powerbi_data_platform_name] = ( item.value.datahub_data_platform_name ) assert default_dataset_type_mapping == source_config.dataset_type_mapping @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_dataset_type_mapping_should_set_to_all( mock_msal, pytestconfig, tmp_path, mock_time, requests_mock ): """ Here we don't need to run the pipeline. We need to verify dataset_type_mapping is set to default dataplatform """ register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) new_config: dict = {**default_source_config()} del new_config["dataset_type_mapping"] pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **new_config, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_lower_case_urn_mces.json", }, }, } ) dataset_type_mapping_set_to_all_platform(pipeline) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_dataset_type_mapping_error( mock_msal, pytestconfig, tmp_path, mock_time, requests_mock ): """ Here we don't need to run the pipeline. We need to verify if both dataset_type_mapping and server_to_platform_instance are set then value error should get raised """ register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) with pytest.raises(Exception, match=r"dataset_type_mapping is deprecated"): Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "server_to_platform_instance": { "localhost": { "platform_instance": "test", } }, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_lower_case_urn_mces.json", }, }, } ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_server_to_platform_map( mock_msal, pytestconfig, tmp_path, mock_time, requests_mock ): test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" new_config: dict = { **default_source_config(), "extract_lineage": True, "convert_lineage_urns_to_lowercase": True, } del new_config["dataset_type_mapping"] new_config["server_to_platform_instance"] = { "hp123rt5.ap-southeast-2.fakecomputing.com": { "platform_instance": "snowflake_production_instance", "env": "PROD", }, "my-test-project": { "platform_instance": "bigquery-computing-dev-account", "env": "QA", }, "localhost:1521": {"platform_instance": "oracle-sales-instance", "env": "PROD"}, } register_mock_api(pytestconfig=pytestconfig, request_mock=requests_mock) output_path: str = f"{tmp_path}/powerbi_server_to_platform_instance_mces.json" pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": new_config, }, "sink": { "type": "file", "config": { "filename": output_path, }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file_path: str = ( f"{test_resources_dir}/golden_test_server_to_platform_instance.json" ) mce_helpers.check_golden_file( pytestconfig, output_path=output_path, golden_path=golden_file_path, ) # As server_to_platform_instance map is provided, the old dataset_type_mapping # should be set to all supported platform # to process all available upstream lineage even if mapping for platform instance is # not provided in server_to_platform_instance map dataset_type_mapping_set_to_all_platform(pipeline) def validate_pipeline(pipeline: Pipeline) -> None: mock_workspace: Workspace = Workspace( id="64ED5CAD-7C10-4684-8180-826122881108", name="demo-workspace", type="Workspace", datasets={}, dashboards={}, reports={}, report_endorsements={}, dashboard_endorsements={}, scan_result={}, independent_datasets={}, app=None, ) # Fetch actual reports reports: Dict[str, Report] = cast( PowerBiDashboardSource, pipeline.source ).powerbi_client.get_reports(workspace=mock_workspace) assert len(reports) == 2 # Generate expected reports using mock reports mock_reports: List[Dict] = [ { "datasetId": "05169CD2-E713-41E6-9600-1D8066D95445", "id": "5b218778-e7a5-4d73-8187-f10824047715", "name": "SalesMarketing", "description": "Acryl sales marketing report", "pages": [ { "name": "ReportSection", "displayName": "Regional Sales Analysis", "order": "0", }, { "name": "ReportSection1", "displayName": "Geographic Analysis", "order": "1", }, ], }, { "datasetId": "05169CD2-E713-41E6-9600-1D8066D95445", "id": "e9fd6b0b-d8c8-4265-8c44-67e183aebf97", "name": "Product", "description": "Acryl product report", "pages": [], }, ] expected_reports: Dict[str, Report] = { report[Constant.ID]: Report( id=report[Constant.ID], name=report[Constant.NAME], type=ReportType.PowerBIReport, webUrl="", embedUrl="", description=report[Constant.DESCRIPTION], pages=[ Page( id="{}.{}".format( report[Constant.ID], page[Constant.NAME].replace(" ", "_") ), name=page[Constant.NAME], displayName=page[Constant.DISPLAY_NAME], order=page[Constant.ORDER], ) for page in report["pages"] ], users=[], tags=[], dataset_id=report[Constant.DATASET_ID], dataset=mock_workspace.datasets.get(report[Constant.DATASET_ID]), ) for report in mock_reports } # Compare actual and expected reports for i in range(2): report_id = mock_reports[i][Constant.ID] assert reports[report_id].id == expected_reports[report_id].id assert reports[report_id].name == expected_reports[report_id].name assert reports[report_id].description == expected_reports[report_id].description assert reports[report_id].dataset == expected_reports[report_id].dataset assert reports[report_id].pages == expected_reports[report_id].pages @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_reports_with_failed_page_request( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: """ Test that all reports are fetched even if a single page request fails """ register_mock_api( pytestconfig=pytestconfig, request_mock=requests_mock, override_data={ "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/reports": { "method": "GET", "status_code": 200, "json": { "value": [ { "datasetId": "05169CD2-E713-41E6-9600-1D8066D95445", "id": "5b218778-e7a5-4d73-8187-f10824047715", "reportType": "PowerBIReport", "name": "SalesMarketing", "description": "Acryl sales marketing report", "webUrl": "https://app.powerbi.com/groups/64ED5CAD-7C10-4684-8180-826122881108/reports/5b218778-e7a5-4d73-8187-f10824047715", "embedUrl": "https://app.powerbi.com/reportEmbed?reportId=5b218778-e7a5-4d73-8187-f10824047715&groupId=64ED5CAD-7C10-4684-8180-826122881108", }, { "datasetId": "05169CD2-E713-41E6-9600-1D8066D95445", "id": "e9fd6b0b-d8c8-4265-8c44-67e183aebf97", "reportType": "PaginatedReport", "name": "Product", "description": "Acryl product report", "webUrl": "https://app.powerbi.com/groups/64ED5CAD-7C10-4684-8180-826122881108/reports/e9fd6b0b-d8c8-4265-8c44-67e183aebf97", "embedUrl": "https://app.powerbi.com/reportEmbed?reportId=e9fd6b0b-d8c8-4265-8c44-67e183aebf97&groupId=64ED5CAD-7C10-4684-8180-826122881108", }, ] }, }, "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/reports/5b218778-e7a5-4d73-8187-f10824047715": { "method": "GET", "status_code": 200, "json": { "datasetId": "05169CD2-E713-41E6-9600-1D8066D95445", "id": "5b218778-e7a5-4d73-8187-f10824047715", "name": "SalesMarketing", "reportType": "PowerBIReport", "description": "Acryl sales marketing report", "webUrl": "https://app.powerbi.com/groups/64ED5CAD-7C10-4684-8180-826122881108/reports/5b218778-e7a5-4d73-8187-f10824047715", "embedUrl": "https://app.powerbi.com/reportEmbed?reportId=5b218778-e7a5-4d73-8187-f10824047715&groupId=64ED5CAD-7C10-4684-8180-826122881108", }, }, "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/reports/e9fd6b0b-d8c8-4265-8c44-67e183aebf97": { "method": "GET", "status_code": 200, "json": { "datasetId": "05169CD2-E713-41E6-9600-1D8066D95445", "id": "e9fd6b0b-d8c8-4265-8c44-67e183aebf97", "reportType": "PowerBIReport", "name": "Product", "description": "Acryl product report", "webUrl": "https://app.powerbi.com/groups/64ED5CAD-7C10-4684-8180-826122881108/reports/e9fd6b0b-d8c8-4265-8c44-67e183aebf97", "embedUrl": "https://app.powerbi.com/reportEmbed?reportId=e9fd6b0b-d8c8-4265-8c44-67e183aebf97&groupId=64ED5CAD-7C10-4684-8180-826122881108", }, }, "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/reports/5b218778-e7a5-4d73-8187-f10824047715/pages": { "method": "GET", "status_code": 200, "json": { "value": [ { "displayName": "Regional Sales Analysis", "name": "ReportSection", "order": "0", }, { "displayName": "Geographic Analysis", "name": "ReportSection1", "order": "1", }, ] }, }, "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/reports/e9fd6b0b-d8c8-4265-8c44-67e183aebf97/pages": { "method": "GET", "status_code": 400, "json": { "error": { "code": "InvalidRequest", "message": "Request is currently not supported for RDL reports", } }, }, }, ) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "extract_reports": True, "platform_instance": "aws-ap-south-1", }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}powerbi_reports_with_failed_page_request_mces.json", }, }, } ) validate_pipeline(pipeline) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_independent_datasets_extraction( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api( pytestconfig=pytestconfig, request_mock=requests_mock, override_data={ "https://api.powerbi.com/v1.0/myorg/groups?%24skip=0&%24top=1000": { "method": "GET", "status_code": 200, "json": { "value": [ { "id": "64ED5CAD-7C10-4684-8180-826122881108", "isReadOnly": True, "name": "demo-workspace", "type": "Workspace", }, ], }, }, "https://api.powerbi.com/v1.0/myorg/groups?%24skip=1000&%24top=1000": { "method": "GET", "status_code": 200, "json": { "value": [], }, }, "https://api.powerbi.com/v1.0/myorg/admin/workspaces/scanResult/4674efd1-603c-4129-8d82-03cf2be05aff": { "method": "GET", "status_code": 200, "json": { "workspaces": [ { "id": "64ED5CAD-7C10-4684-8180-826122881108", "name": "demo-workspace", "type": "Workspace", "state": "Active", "datasets": [ { "id": "91580e0e-1680-4b1c-bbf9-4f6764d7a5ff", "tables": [ { "name": "employee_ctc", "source": [ { "expression": "dummy", } ], } ], }, ], }, ] }, }, "https://api.powerbi.com/v1.0/myorg/groups/64ED5CAD-7C10-4684-8180-826122881108/dashboards": { "method": "GET", "status_code": 200, "json": {"value": []}, }, }, ) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_source_config(), "extract_independent_datasets": True, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_independent_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_independent_datasets.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_independent_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_cll_extraction( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" register_mock_api( pytestconfig=pytestconfig, request_mock=requests_mock, ) default_conf: dict = default_source_config() del default_conf[ "dataset_type_mapping" ] # delete this key so that connector set it to default (all dataplatform) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_conf, "extract_lineage": True, "extract_column_level_lineage": True, "enable_advance_lineage_sql_construct": True, "native_query_parsing": True, "extract_independent_datasets": True, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_cll_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() golden_file = "golden_test_cll.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_cll_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) def test_cll_extraction_flags( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: register_mock_api( pytestconfig=pytestconfig, request_mock=requests_mock, ) default_conf: dict = default_source_config() pattern: str = re.escape( "Enable all these flags in recipe: ['native_query_parsing', 'enable_advance_lineage_sql_construct', 'extract_lineage', 'extract_dataset_schema']" ) with pytest.raises(Exception, match=pattern): Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **default_conf, "extract_column_level_lineage": True, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_cll_mces.json", }, }, } ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_cross_workspace_reference_info_message( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: register_mock_api( pytestconfig=pytestconfig, request_mock=requests_mock, override_data=read_mock_data( path=pytestconfig.rootpath / "tests/integration/powerbi/mock_data/cross_workspace_mock_response.json" ), ) config = default_source_config() del config["workspace_id"] config["workspace_id_pattern"] = { "allow": [ "A8D655A6-F521-477E-8C22-255018583BF4", "C5DA6EA8-625E-4AB1-90B6-CAEA0BF9F492", ] } config["include_workspace_name_in_dataset_urn"] = True pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **config, }, }, "sink": { "type": "file", "config": { "filename": f"{tmp_path}/powerbi_mces.json", }, }, } ) pipeline.run() pipeline.raise_from_status() assert isinstance(pipeline.source, PowerBiDashboardSource) # to silent the lint info_entries: dict = pipeline.source.reporter._structured_logs._entries.get( StructuredLogLevel.INFO, {} ) # type :ignore is_entry_present: bool = False # Printing INFO entries for entry in info_entries.values(): if entry.title == "Missing Dataset Lineage For Tile": is_entry_present = True break assert is_entry_present, ( 'Info message "Missing Dataset Lineage For Tile" should be present in reporter' ) test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" golden_file = "golden_test_cross_workspace_dataset.json" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) def common_app_ingest( pytestconfig: pytest.Config, requests_mock: Any, output_mcp_path: str, override_config: Optional[dict] = None, ) -> Pipeline: if override_config is None: override_config = {} register_mock_api( pytestconfig=pytestconfig, request_mock=requests_mock, override_data=read_mock_data( path=pytestconfig.rootpath / "tests/integration/powerbi/mock_data/workspace_with_app_mock_response.json" ), ) config = default_source_config() del config["workspace_id"] config["workspace_id_pattern"] = { "allow": [ "8F756DE6-26AD-45FF-A201-44276FF1F561", ] } config.update(override_config) pipeline = Pipeline.create( { "run_id": "powerbi-test", "source": { "type": "powerbi", "config": { **config, }, }, "sink": { "type": "file", "config": { "filename": output_mcp_path, }, }, } ) pipeline.run() pipeline.raise_from_status() return pipeline @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_app_ingest( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: common_app_ingest( pytestconfig=pytestconfig, requests_mock=requests_mock, output_mcp_path=f"{tmp_path}/powerbi_mces.json", override_config={ "extract_app": True, }, ) golden_file = "golden_test_app_ingest.json" test_resources_dir = pytestconfig.rootpath / "tests/integration/powerbi" mce_helpers.check_golden_file( pytestconfig, output_path=f"{tmp_path}/powerbi_mces.json", golden_path=f"{test_resources_dir}/{golden_file}", ) @freeze_time(FROZEN_TIME) @mock.patch("msal.ConfidentialClientApplication", side_effect=mock_msal_cca) @pytest.mark.integration def test_powerbi_app_ingest_info_message( mock_msal: MagicMock, pytestconfig: pytest.Config, tmp_path: str, mock_time: datetime.datetime, requests_mock: Any, ) -> None: pipeline = common_app_ingest( pytestconfig=pytestconfig, requests_mock=requests_mock, output_mcp_path=f"{tmp_path}/powerbi_mces.json", ) assert isinstance(pipeline.source, PowerBiDashboardSource) # to silent the lint info_entries: dict = pipeline.source.reporter._structured_logs._entries.get( StructuredLogLevel.INFO, {} ) # type :ignore is_entry_present: bool = False # Printing INFO entries for entry in info_entries.values(): if entry.title == "App Ingestion Is Disabled": is_entry_present = True break assert is_entry_present, ( "The extract_app flag should be set to false by default. We need to keep this flag as false until all GMS instances are updated to the latest release." )