Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

421 lines
15 KiB
Python
Raw Permalink Normal View History

import uuid
from unittest import TestCase
from unittest.mock import patch
import pytest
from metadata.generated.schema.entity.data.dashboardDataModel import (
DashboardDataModel,
DataModelType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource
from metadata.ingestion.source.dashboard.powerbi.models import (
Dataflow,
Dataset,
PowerBIDashboard,
PowerBiTable,
PowerBITableSource,
UpstreaDataflow,
)
from metadata.utils import fqn
MOCK_REDSHIFT_EXP = """
let
Source = AmazonRedshift.Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
customers_clean1
"""
MOCK_REDSHIFT_EXP_INVALID = """
let
Source = Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
customers_clean1
"""
MOCK_REDSHIFT_EXP_INVALID_V2 = """
let
Source = AmazonRedshift.Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
customers_clean1
"""
EXPECTED_REDSHIFT_RESULT = {
"database": "dev",
"schema": "demo_dbt_jaffle",
"table": "customers_clean",
}
MOCK_SNOWFLAKE_EXP = """let
Source = Snowflake.Databases("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
PUBLIC_Schema = DEMO_STAGE_Database{[Name="PUBLIC",Kind="Schema"]}[Data],
STG_CUSTOMERS_View = PUBLIC_Schema{[Name="STG_CUSTOMERS",Kind="View"]}[Data]
in
STG_CUSTOMERS_View"""
MOCK_SNOWFLAKE_EXP_INVALID = """let
Source = Snowflake("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
in
STG_CUSTOMERS_View"""
EXPECTED_SNOWFLAKE_RESULT = {
"database": "DEMO_STAGE",
"schema": "PUBLIC",
"table": "STG_CUSTOMERS",
}
mock_config = {
"source": {
"type": "powerbi",
"serviceName": "mock_metabase",
"serviceConnection": {
"config": {
"type": "PowerBI",
"clientId": "client_id",
"clientSecret": "secret",
"tenantId": "tenant_id",
},
},
"sourceConfig": {"config": {"type": "DashboardMetadata"}},
},
"sink": {"type": "metadata-rest", "config": {}},
"workflowConfig": {
"loggerLevel": "DEBUG",
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "openmetadata",
"securityConfig": {
"jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
"iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
"2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
"iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
"r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
"d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
},
},
},
}
MOCK_DASHBOARD_WITH_OWNERS = {
"id": "dashboard1",
"displayName": "Test Dashboard",
"webUrl": "https://test.com",
"embedUrl": "https://test.com/embed",
"tiles": [],
"users": [
{
"displayName": "John Doe",
"emailAddress": "john.doe@example.com",
"dashboardUserAccessRight": "Owner",
"userType": "Member",
},
{
"displayName": "Jane Smith",
"emailAddress": "jane.smith@example.com",
"dashboardUserAccessRight": "Owner",
"userType": "Member",
},
],
}
MOCK_DATASET_WITH_OWNERS = {
"id": "dataset1",
"name": "Test Dataset",
"tables": [],
"description": "Test dataset description",
"users": [
{
"displayName": "John Doe",
"emailAddress": "john.doe@example.com",
"datasetUserAccessRight": "Owner",
"userType": "Member",
}
],
}
MOCK_USER_1_ENITYTY_REF_LIST = EntityReferenceList(
root=[EntityReference(id=uuid.uuid4(), name="John Doe", type="user")]
)
MOCK_USER_2_ENITYTY_REF_LIST = EntityReferenceList(
root=[EntityReference(id=uuid.uuid4(), name="Jane Smith", type="user")]
)
MOCK_SNOWFLAKE_EXP_V2 = 'let\n Source = Snowflake.Databases(Snowflake_URL,Warehouse,[Role=Role]),\n Database = Source{[Name=DB,Kind="Database"]}[Data],\n DB_Schema = Database{[Name=Schema,Kind="Schema"]}[Data],\n Table = DB_Schema{[Name="CUSTOMER_TABLE",Kind="Table"]}[Data],\n #"Andere entfernte Spalten" = Table.SelectColumns(Table,{"ID_BERICHTSMONAT", "ID_AKQUISE_VERMITTLER", "ID_AKQUISE_OE", "ID_SPARTE", "ID_RISIKOTRAEGER", "ID_KUNDE", "STUECK", "BBE"})\nin\n #"Andere entfernte Spalten"'
EXPECTED_SNOWFLAKE_RESULT_V2 = {
"database": "MY_DB",
"schema": "MY_SCHEMA",
"table": "CUSTOMER_TABLE",
}
MOCK_DATASET_FROM_WORKSPACE = Dataset(
id="testdataset",
name="Test Dataset",
tables=[],
expressions=[
{
"name": "DB",
"expression": '"MY_DB" meta [IsParameterQuery=true, List={"MY_DB_DEV", "MY_DB", "MY_DB_PROD"}, DefaultValue="MY_DB", Type="Text", IsParameterQueryRequired=true]',
},
{
"name": "Schema",
"expression": '"MY_SCHEMA" meta [IsParameterQuery=true, List={"MY_SCHEMA", "MY_SCHEMA_PROD"}, DefaultValue="MY_SCHEMA", Type="Text", IsParameterQueryRequired=true]',
},
],
)
MOCK_DATASET_FROM_WORKSPACE_V2 = Dataset(
id="testdataset",
name="Test Dataset",
tables=[],
expressions=[
{
"name": "DB",
},
{
"name": "Schema",
},
],
)
MOCK_DASHBOARD_DATA_MODEL = DashboardDataModel(
name="dummy_datamodel",
id=uuid.uuid4(),
columns=[],
dataModelType=DataModelType.PowerBIDataModel.value,
)
MOCK_DATAMODEL_ENTITY = DashboardDataModel(
name="dummy_dataflow_id_a",
id=uuid.uuid4(),
dataModelType=DataModelType.PowerBIDataFlow.value,
columns=[],
)
class PowerBIUnitTest(TestCase):
"""
Implements the necessary methods to extract
powerbi Dashboard Unit Test
"""
@patch(
"metadata.ingestion.source.dashboard.dashboard_service.DashboardServiceSource.test_connection"
)
@patch("metadata.ingestion.source.dashboard.powerbi.connection.get_connection")
def __init__(self, methodName, get_connection, test_connection) -> None:
super().__init__(methodName)
get_connection.return_value = False
test_connection.return_value = False
self.config = OpenMetadataWorkflowConfig.model_validate(mock_config)
self.powerbi: PowerbiSource = PowerbiSource.create(
mock_config["source"],
OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
)
@pytest.mark.order(1)
@patch.object(
PowerbiSource,
"_fetch_dataset_from_workspace",
return_value=MOCK_DATASET_FROM_WORKSPACE,
)
def test_parse_database_source(self, *_):
# Test with valid redshift source
result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP)
self.assertEqual(result, EXPECTED_REDSHIFT_RESULT)
# Test with invalid redshift source
result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID)
self.assertEqual(result, None)
# Test with invalid redshift source
result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID_V2)
self.assertEqual(result, None)
# Test with valid snowflake source
result = self.powerbi._parse_snowflake_source(
MOCK_SNOWFLAKE_EXP, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT)
# Test with invalid snowflake source
result = self.powerbi._parse_snowflake_source(
MOCK_SNOWFLAKE_EXP_INVALID, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, None)
result = self.powerbi._parse_snowflake_source(
MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT_V2)
@pytest.mark.order(2)
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email")
def test_owner_ingestion(self, get_reference_by_email):
# Mock responses for dashboard owners
self.powerbi.metadata.get_reference_by_email.side_effect = [
MOCK_USER_1_ENITYTY_REF_LIST,
MOCK_USER_2_ENITYTY_REF_LIST,
]
# Test dashboard owner ingestion
dashboard = PowerBIDashboard.model_validate(MOCK_DASHBOARD_WITH_OWNERS)
owner_ref = self.powerbi.get_owner_ref(dashboard)
self.assertIsNotNone(owner_ref)
self.assertEqual(len(owner_ref.root), 2)
self.assertEqual(owner_ref.root[0].name, "John Doe")
self.assertEqual(owner_ref.root[1].name, "Jane Smith")
# Verify get_reference_by_email was called with correct emails
self.powerbi.metadata.get_reference_by_email.assert_any_call(
"john.doe@example.com"
)
self.powerbi.metadata.get_reference_by_email.assert_any_call(
"jane.smith@example.com"
)
# Reset mock for dataset test
self.powerbi.metadata.get_reference_by_email.reset_mock()
self.powerbi.metadata.get_reference_by_email.side_effect = [
MOCK_USER_1_ENITYTY_REF_LIST
]
# Test dataset owner ingestion
dataset = Dataset.model_validate(MOCK_DATASET_WITH_OWNERS)
owner_ref = self.powerbi.get_owner_ref(dataset)
self.assertIsNotNone(owner_ref.root)
self.assertEqual(len(owner_ref.root), 1)
self.assertEqual(owner_ref.root[0].name, "John Doe")
# Verify get_reference_by_email was called with correct email
self.powerbi.metadata.get_reference_by_email.assert_called_once_with(
"john.doe@example.com"
)
# Reset mock for no owners test
self.powerbi.metadata.get_reference_by_email.reset_mock()
# Test with no owners
dashboard_no_owners = PowerBIDashboard.model_validate(
{
"id": "dashboard2",
"displayName": "Test Dashboard 2",
"webUrl": "https://test.com",
"embedUrl": "https://test.com/embed",
"tiles": [],
"users": [],
}
)
owner_ref = self.powerbi.get_owner_ref(dashboard_no_owners)
self.assertIsNone(owner_ref)
# Verify get_reference_by_email was not called when there are no owners
self.powerbi.metadata.get_reference_by_email.assert_not_called()
# Reset mock for invalid owners test
self.powerbi.metadata.get_reference_by_email.reset_mock()
# Test with invalid owners
dashboard_invalid_owners = PowerBIDashboard.model_validate(
{
"id": "dashboard3",
"displayName": "Test Dashboard 3",
"webUrl": "https://test.com",
"embedUrl": "https://test.com/embed",
"tiles": [],
"users": [
{
"displayName": "Kane Williams",
"emailAddress": "kane.williams@example.com",
"dashboardUserAccessRight": "Read",
"userType": "Member",
},
],
}
)
owner_ref = self.powerbi.get_owner_ref(dashboard_invalid_owners)
self.assertIsNone(owner_ref)
# Verify get_reference_by_email was not called when there are no owners
self.powerbi.metadata.get_reference_by_email.assert_not_called()
@pytest.mark.order(3)
def test_parse_table_info_from_source_exp(self):
table = PowerBiTable(
name="test_table",
source=[PowerBITableSource(expression=MOCK_REDSHIFT_EXP)],
)
result = self.powerbi._parse_table_info_from_source_exp(
table, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, EXPECTED_REDSHIFT_RESULT)
# no source expression
table = PowerBiTable(
name="test_table",
source=[PowerBITableSource(expression=None)],
)
result = self.powerbi._parse_table_info_from_source_exp(
table, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, {})
# no source
table = PowerBiTable(
name="test_table",
source=[],
)
result = self.powerbi._parse_table_info_from_source_exp(
table, MOCK_DASHBOARD_DATA_MODEL
)
self.assertEqual(result, {})
@pytest.mark.order(4)
@patch.object(
PowerbiSource,
"_fetch_dataset_from_workspace",
return_value=MOCK_DATASET_FROM_WORKSPACE_V2,
)
def test_parse_dataset_expressions(self, *_):
# test with valid snowflake source but no
# dataset expression value
result = self.powerbi._parse_snowflake_source(
MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL
)
self.assertIsNone(result["database"])
self.assertIsNone(result["schema"])
self.assertEqual(result["table"], "CUSTOMER_TABLE")
@pytest.mark.order(5)
@patch.object(OpenMetadata, "get_by_name", return_value=MOCK_DATAMODEL_ENTITY)
@patch.object(fqn, "build", return_value=None)
def test_upstream_dataflow_lineage(self, *_):
MOCK_DATAMODEL_ENTITY_2 = DashboardDataModel(
name="dummy_dataflow_id_b",
id=uuid.uuid4(),
dataModelType=DataModelType.PowerBIDataFlow.value,
columns=[],
)
MOCK_DATAMODEL_2 = Dataflow(
name="dataflow_b",
objectId="dummy_dataflow_id_b",
upstreamDataflows=[
UpstreaDataflow(
targetDataflowId="dataflow_a",
)
],
)
lineage_request = list(
self.powerbi.create_dataflow_upstream_dataflow_lineage(
MOCK_DATAMODEL_2, MOCK_DATAMODEL_ENTITY_2
)
)
assert lineage_request[0].right is not None