# Mirror of https://github.com/open-metadata/OpenMetadata.git
# Synced 2025-07-09 01:58:53 +00:00
# 421 lines, 15 KiB, Python
import uuid
|
|
from unittest import TestCase
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
|
|
from metadata.generated.schema.entity.data.dashboardDataModel import (
|
|
DashboardDataModel,
|
|
DataModelType,
|
|
)
|
|
from metadata.generated.schema.metadataIngestion.workflow import (
|
|
OpenMetadataWorkflowConfig,
|
|
)
|
|
from metadata.generated.schema.type.entityReference import EntityReference
|
|
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
|
|
from metadata.ingestion.ometa.ometa_api import OpenMetadata
|
|
from metadata.ingestion.source.dashboard.powerbi.metadata import PowerbiSource
|
|
from metadata.ingestion.source.dashboard.powerbi.models import (
|
|
Dataflow,
|
|
Dataset,
|
|
PowerBIDashboard,
|
|
PowerBiTable,
|
|
PowerBITableSource,
|
|
UpstreaDataflow,
|
|
)
|
|
from metadata.utils import fqn
|
|
|
|
# Power Query (M) expressions fed to the Redshift source parser in the tests.
# NOTE(review): leading whitespace inside these literals was lost when the
# file was extracted; restore it from upstream if it turns out to matter.

# Well-formed expression: an ``AmazonRedshift.Database`` call followed by two
# ``[Name="..."]`` selectors (schema, then table).
MOCK_REDSHIFT_EXP = """
let
Source = AmazonRedshift.Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
customers_clean1
"""

# Invalid variant: the source call is a bare ``Database(...)`` without the
# ``AmazonRedshift.`` prefix.
MOCK_REDSHIFT_EXP_INVALID = """
let
Source = Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
customers_clean1
"""

# Invalid variant: the schema selection step is missing, so the expression
# contains only a single ``[Name="..."]`` selector.
MOCK_REDSHIFT_EXP_INVALID_V2 = """
let
Source = AmazonRedshift.Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
customers_clean1
"""

# Parse result the tests expect for MOCK_REDSHIFT_EXP.
EXPECTED_REDSHIFT_RESULT = {
    "database": "dev",
    "schema": "demo_dbt_jaffle",
    "table": "customers_clean",
}
# Power Query (M) expressions fed to the Snowflake source parser in the tests.
# NOTE(review): leading whitespace inside these literals was lost when the
# file was extracted; restore it from upstream if it turns out to matter.

# Well-formed expression: ``Snowflake.Databases`` followed by Database,
# Schema and View selectors with literal names.
MOCK_SNOWFLAKE_EXP = """let
Source = Snowflake.Databases("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
PUBLIC_Schema = DEMO_STAGE_Database{[Name="PUBLIC",Kind="Schema"]}[Data],
STG_CUSTOMERS_View = PUBLIC_Schema{[Name="STG_CUSTOMERS",Kind="View"]}[Data]
in
STG_CUSTOMERS_View"""

# Invalid variant: the source call is ``Snowflake(...)`` rather than
# ``Snowflake.Databases(...)`` and the schema/view steps are missing.
MOCK_SNOWFLAKE_EXP_INVALID = """let
Source = Snowflake("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
in
STG_CUSTOMERS_View"""

# Parse result the tests expect for MOCK_SNOWFLAKE_EXP.
EXPECTED_SNOWFLAKE_RESULT = {
    "database": "DEMO_STAGE",
    "schema": "PUBLIC",
    "table": "STG_CUSTOMERS",
}
# Workflow configuration used to build the PowerBI source under test.
# NOTE(review): the service is named "mock_metabase" although the source type
# is "powerbi" -- looks like a copy-paste leftover; kept as-is.
mock_config = {
    "source": {
        "type": "powerbi",
        "serviceName": "mock_metabase",
        "serviceConnection": {
            "config": {
                "type": "PowerBI",
                "clientId": "client_id",
                "clientSecret": "secret",
                "tenantId": "tenant_id",
            },
        },
        "sourceConfig": {"config": {"type": "DashboardMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "loggerLevel": "DEBUG",
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {
                # One token, split across adjacent literals (implicit
                # string concatenation) to keep the lines short.
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
                "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
                "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
                "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
                "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
                "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            },
        },
    },
}
# Raw dashboard payload with two users, both holding the "Owner" access right.
MOCK_DASHBOARD_WITH_OWNERS = {
    "id": "dashboard1",
    "displayName": "Test Dashboard",
    "webUrl": "https://test.com",
    "embedUrl": "https://test.com/embed",
    "tiles": [],
    "users": [
        {
            "displayName": "John Doe",
            "emailAddress": "john.doe@example.com",
            "dashboardUserAccessRight": "Owner",
            "userType": "Member",
        },
        {
            "displayName": "Jane Smith",
            "emailAddress": "jane.smith@example.com",
            "dashboardUserAccessRight": "Owner",
            "userType": "Member",
        },
    ],
}

# Raw dataset payload with a single user holding the "Owner" access right.
# Note the access-right key differs from the dashboard payload:
# "datasetUserAccessRight" instead of "dashboardUserAccessRight".
MOCK_DATASET_WITH_OWNERS = {
    "id": "dataset1",
    "name": "Test Dataset",
    "tables": [],
    "description": "Test dataset description",
    "users": [
        {
            "displayName": "John Doe",
            "emailAddress": "john.doe@example.com",
            "datasetUserAccessRight": "Owner",
            "userType": "Member",
        }
    ],
}
# Single-user reference lists handed back by the mocked email lookup in the
# owner-ingestion test.
# NOTE: "ENITYTY" is a misspelling of "ENTITY"; the names are kept unchanged
# because the test class refers to them.
MOCK_USER_1_ENITYTY_REF_LIST = EntityReferenceList(
    root=[EntityReference(id=uuid.uuid4(), name="John Doe", type="user")]
)
MOCK_USER_2_ENITYTY_REF_LIST = EntityReferenceList(
    root=[EntityReference(id=uuid.uuid4(), name="Jane Smith", type="user")]
)
# Snowflake expression whose database and schema are referenced through the
# parameters ``DB`` and ``Schema`` instead of quoted literal names; only the
# table name ("CUSTOMER_TABLE") is a literal.
MOCK_SNOWFLAKE_EXP_V2 = 'let\n Source = Snowflake.Databases(Snowflake_URL,Warehouse,[Role=Role]),\n Database = Source{[Name=DB,Kind="Database"]}[Data],\n DB_Schema = Database{[Name=Schema,Kind="Schema"]}[Data],\n Table = DB_Schema{[Name="CUSTOMER_TABLE",Kind="Table"]}[Data],\n #"Andere entfernte Spalten" = Table.SelectColumns(Table,{"ID_BERICHTSMONAT", "ID_AKQUISE_VERMITTLER", "ID_AKQUISE_OE", "ID_SPARTE", "ID_RISIKOTRAEGER", "ID_KUNDE", "STUECK", "BBE"})\nin\n #"Andere entfernte Spalten"'

# Parse result the tests expect for MOCK_SNOWFLAKE_EXP_V2 once the ``DB`` and
# ``Schema`` parameters have been resolved to concrete values.
EXPECTED_SNOWFLAKE_RESULT_V2 = {
    "database": "MY_DB",
    "schema": "MY_SCHEMA",
    "table": "CUSTOMER_TABLE",
}
# Dataset whose ``DB`` and ``Schema`` expressions carry parameter definitions
# with the values "MY_DB" and "MY_SCHEMA".
MOCK_DATASET_FROM_WORKSPACE = Dataset(
    id="testdataset",
    name="Test Dataset",
    tables=[],
    expressions=[
        {
            "name": "DB",
            "expression": '"MY_DB" meta [IsParameterQuery=true, List={"MY_DB_DEV", "MY_DB", "MY_DB_PROD"}, DefaultValue="MY_DB", Type="Text", IsParameterQueryRequired=true]',
        },
        {
            "name": "Schema",
            "expression": '"MY_SCHEMA" meta [IsParameterQuery=true, List={"MY_SCHEMA", "MY_SCHEMA_PROD"}, DefaultValue="MY_SCHEMA", Type="Text", IsParameterQueryRequired=true]',
        },
    ],
)

# Same dataset, but the ``DB`` and ``Schema`` entries have a name only and no
# "expression" value.
MOCK_DATASET_FROM_WORKSPACE_V2 = Dataset(
    id="testdataset",
    name="Test Dataset",
    tables=[],
    expressions=[
        {
            "name": "DB",
        },
        {
            "name": "Schema",
        },
    ],
)
# Column-less data model of type PowerBIDataModel, passed to the parser
# methods under test.
MOCK_DASHBOARD_DATA_MODEL = DashboardDataModel(
    name="dummy_datamodel",
    id=uuid.uuid4(),
    columns=[],
    dataModelType=DataModelType.PowerBIDataModel.value,
)

# Column-less data model of type PowerBIDataFlow, used as the return value of
# the patched ``OpenMetadata.get_by_name`` in the dataflow lineage test.
MOCK_DATAMODEL_ENTITY = DashboardDataModel(
    name="dummy_dataflow_id_a",
    id=uuid.uuid4(),
    dataModelType=DataModelType.PowerBIDataFlow.value,
    columns=[],
)
class PowerBIUnitTest(TestCase):
    """
    Unit tests for the PowerBI dashboard source.

    Covers source-expression parsing (Redshift and Snowflake), owner
    resolution for dashboards and datasets, and upstream dataflow lineage.
    """

    @patch(
        "metadata.ingestion.source.dashboard.dashboard_service.DashboardServiceSource.test_connection"
    )
    @patch("metadata.ingestion.source.dashboard.powerbi.connection.get_connection")
    def __init__(self, methodName, get_connection, test_connection) -> None:
        """
        Build the ``PowerbiSource`` under test from ``mock_config``.

        ``get_connection`` and ``test_connection`` are patched while the
        source is created and both mocks are set to return ``False``.
        """
        super().__init__(methodName)
        get_connection.return_value = False
        test_connection.return_value = False
        self.config = OpenMetadataWorkflowConfig.model_validate(mock_config)
        self.powerbi: PowerbiSource = PowerbiSource.create(
            mock_config["source"],
            OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
        )

    @pytest.mark.order(1)
    @patch.object(
        PowerbiSource,
        "_fetch_dataset_from_workspace",
        return_value=MOCK_DATASET_FROM_WORKSPACE,
    )
    def test_parse_database_source(self, *_):
        """Check the Redshift and Snowflake expression parsers."""
        # Valid redshift source
        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP)
        self.assertEqual(result, EXPECTED_REDSHIFT_RESULT)

        # Invalid redshift source: no `AmazonRedshift.` prefix on the call
        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID)
        self.assertIsNone(result)

        # Invalid redshift source: schema selection step is missing
        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID_V2)
        self.assertIsNone(result)

        # Valid snowflake source
        result = self.powerbi._parse_snowflake_source(
            MOCK_SNOWFLAKE_EXP, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT)

        # Invalid snowflake source
        result = self.powerbi._parse_snowflake_source(
            MOCK_SNOWFLAKE_EXP_INVALID, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertIsNone(result)

        # Snowflake source that names database/schema via the DB and Schema
        # parameters; presumably these are resolved from the expressions of
        # the patched dataset -- confirm against the parser.
        result = self.powerbi._parse_snowflake_source(
            MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT_V2)

    @pytest.mark.order(2)
    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email")
    def test_owner_ingestion(self, get_reference_by_email):
        """Check owner resolution for dashboards and datasets."""
        # Same attribute the original test drives; bound once for brevity.
        email_lookup = self.powerbi.metadata.get_reference_by_email

        # Mock responses for dashboard owners
        email_lookup.side_effect = [
            MOCK_USER_1_ENITYTY_REF_LIST,
            MOCK_USER_2_ENITYTY_REF_LIST,
        ]
        # Test dashboard owner ingestion
        dashboard = PowerBIDashboard.model_validate(MOCK_DASHBOARD_WITH_OWNERS)
        owner_ref = self.powerbi.get_owner_ref(dashboard)
        self.assertIsNotNone(owner_ref)
        self.assertEqual(len(owner_ref.root), 2)
        self.assertEqual(owner_ref.root[0].name, "John Doe")
        self.assertEqual(owner_ref.root[1].name, "Jane Smith")

        # Verify get_reference_by_email was called with correct emails
        email_lookup.assert_any_call("john.doe@example.com")
        email_lookup.assert_any_call("jane.smith@example.com")

        # Reset mock for dataset test
        email_lookup.reset_mock()
        email_lookup.side_effect = [MOCK_USER_1_ENITYTY_REF_LIST]

        # Test dataset owner ingestion
        dataset = Dataset.model_validate(MOCK_DATASET_WITH_OWNERS)
        owner_ref = self.powerbi.get_owner_ref(dataset)
        # Assert on the reference itself first so a missing owner shows up
        # as an assertion failure rather than an AttributeError on `.root`.
        self.assertIsNotNone(owner_ref)
        self.assertEqual(len(owner_ref.root), 1)
        self.assertEqual(owner_ref.root[0].name, "John Doe")

        # Verify get_reference_by_email was called with correct email
        email_lookup.assert_called_once_with("john.doe@example.com")

        # Reset mock for no owners test
        email_lookup.reset_mock()

        # Test with no owners
        dashboard_no_owners = PowerBIDashboard.model_validate(
            {
                "id": "dashboard2",
                "displayName": "Test Dashboard 2",
                "webUrl": "https://test.com",
                "embedUrl": "https://test.com/embed",
                "tiles": [],
                "users": [],
            }
        )
        owner_ref = self.powerbi.get_owner_ref(dashboard_no_owners)
        self.assertIsNone(owner_ref)

        # Verify get_reference_by_email was not called when there are no owners
        email_lookup.assert_not_called()

        # Reset mock for invalid owners test
        email_lookup.reset_mock()

        # Test with a user whose access right is "Read", not "Owner"
        dashboard_invalid_owners = PowerBIDashboard.model_validate(
            {
                "id": "dashboard3",
                "displayName": "Test Dashboard 3",
                "webUrl": "https://test.com",
                "embedUrl": "https://test.com/embed",
                "tiles": [],
                "users": [
                    {
                        "displayName": "Kane Williams",
                        "emailAddress": "kane.williams@example.com",
                        "dashboardUserAccessRight": "Read",
                        "userType": "Member",
                    },
                ],
            }
        )
        owner_ref = self.powerbi.get_owner_ref(dashboard_invalid_owners)
        self.assertIsNone(owner_ref)

        # Verify get_reference_by_email was not called for a non-owner user
        email_lookup.assert_not_called()

    @pytest.mark.order(3)
    def test_parse_table_info_from_source_exp(self):
        """Check table-info parsing with, without and with empty sources."""
        # source carrying a valid redshift expression
        table = PowerBiTable(
            name="test_table",
            source=[PowerBITableSource(expression=MOCK_REDSHIFT_EXP)],
        )
        result = self.powerbi._parse_table_info_from_source_exp(
            table, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_REDSHIFT_RESULT)

        # no source expression
        table = PowerBiTable(
            name="test_table",
            source=[PowerBITableSource(expression=None)],
        )
        result = self.powerbi._parse_table_info_from_source_exp(
            table, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, {})

        # no source
        table = PowerBiTable(
            name="test_table",
            source=[],
        )
        result = self.powerbi._parse_table_info_from_source_exp(
            table, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, {})

    @pytest.mark.order(4)
    @patch.object(
        PowerbiSource,
        "_fetch_dataset_from_workspace",
        return_value=MOCK_DATASET_FROM_WORKSPACE_V2,
    )
    def test_parse_dataset_expressions(self, *_):
        """Check parsing when the dataset expressions have no value."""
        # test with valid snowflake source but no
        # dataset expression value
        result = self.powerbi._parse_snowflake_source(
            MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertIsNone(result["database"])
        self.assertIsNone(result["schema"])
        self.assertEqual(result["table"], "CUSTOMER_TABLE")

    @pytest.mark.order(5)
    @patch.object(OpenMetadata, "get_by_name", return_value=MOCK_DATAMODEL_ENTITY)
    @patch.object(fqn, "build", return_value=None)
    def test_upstream_dataflow_lineage(self, *_):
        """Check that upstream dataflow lineage yields a populated result."""
        downstream_entity = DashboardDataModel(
            name="dummy_dataflow_id_b",
            id=uuid.uuid4(),
            dataModelType=DataModelType.PowerBIDataFlow.value,
            columns=[],
        )
        downstream_dataflow = Dataflow(
            name="dataflow_b",
            objectId="dummy_dataflow_id_b",
            upstreamDataflows=[
                UpstreaDataflow(
                    targetDataflowId="dataflow_a",
                )
            ],
        )
        lineage_request = list(
            self.powerbi.create_dataflow_upstream_dataflow_lineage(
                downstream_dataflow, downstream_entity
            )
        )
        # Fail with a clear message instead of an IndexError when nothing
        # was yielded.
        self.assertTrue(lineage_request, "no lineage request was yielded")
        self.assertIsNotNone(lineage_request[0].right)