# 2025-04-02 16:11:27 +05:30  (stray revision timestamp from extraction)
import uuid
# 2025-03-12 12:59:29 +05:30  (stray revision timestamp from extraction)
from unittest import TestCase
from unittest . mock import patch
import pytest
# 2025-05-19 19:21:02 +05:30  (stray revision timestamp from extraction)
from metadata . generated . schema . entity . data . dashboardDataModel import (
DashboardDataModel ,
DataModelType ,
)
# 2025-03-12 12:59:29 +05:30  (stray revision timestamp from extraction)
from metadata . generated . schema . metadataIngestion . workflow import (
OpenMetadataWorkflowConfig ,
)
# 2025-04-02 16:11:27 +05:30  (stray revision timestamp from extraction)
from metadata . generated . schema . type . entityReference import EntityReference
from metadata . generated . schema . type . entityReferenceList import EntityReferenceList
# 2025-03-12 12:59:29 +05:30  (stray revision timestamp from extraction)
from metadata . ingestion . ometa . ometa_api import OpenMetadata
from metadata . ingestion . source . dashboard . powerbi . metadata import PowerbiSource
# 2025-05-27 16:50:55 +05:30  (stray revision timestamp from extraction)
from metadata . ingestion . source . dashboard . powerbi . models import (
    # 2025-06-18 20:46:17 +05:30  (stray revision timestamp from extraction)
Dataflow ,
    # 2025-05-27 16:50:55 +05:30  (stray revision timestamp from extraction)
Dataset ,
PowerBIDashboard ,
PowerBiTable ,
PowerBITableSource ,
    # 2025-06-18 20:46:17 +05:30  (stray revision timestamp from extraction)
UpstreaDataflow ,
    # 2025-05-27 16:50:55 +05:30  (stray revision timestamp from extraction)
)
# 2025-06-18 20:46:17 +05:30  (stray revision timestamp from extraction)
from metadata . utils import fqn
# Sample PowerBI M-query expression with a valid AmazonRedshift connector
# call; the parser should extract database "dev", schema "demo_dbt_jaffle"
# and table "customers_clean" from it.
MOCK_REDSHIFT_EXP = """let
    Source = AmazonRedshift.Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
    demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
    customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
    customers_clean1
"""

# Same expression but without the AmazonRedshift connector prefix; the
# parser is expected to return None for this input.
MOCK_REDSHIFT_EXP_INVALID = """let
    Source = Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
    demo_dbt_jaffle = Source{[Name="demo_dbt_jaffle"]}[Data],
    customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
    customers_clean1
"""

# Valid connector call but the schema-lookup step is missing; the parser is
# expected to return None for this input as well.
MOCK_REDSHIFT_EXP_INVALID_V2 = """let
    Source = AmazonRedshift.Database("redsshift-cluster.redshift.amazonaws.com:5439","dev"),
    customers_clean1 = demo_dbt_jaffle{[Name="customers_clean"]}[Data]
in
    customers_clean1
"""
# Table info the redshift parser is expected to extract from MOCK_REDSHIFT_EXP.
EXPECTED_REDSHIFT_RESULT = [
    {
        "database": "dev",
        "schema": "demo_dbt_jaffle",
        "table": "customers_clean",
    }
]
# M-query expression with a valid Snowflake.Databases connector call; the
# parser should extract DEMO_STAGE / PUBLIC / STG_CUSTOMERS from it.
MOCK_SNOWFLAKE_EXP = """let
    Source = Snowflake.Databases("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
    DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
    PUBLIC_Schema = DEMO_STAGE_Database{[Name="PUBLIC",Kind="Schema"]}[Data],
    STG_CUSTOMERS_View = PUBLIC_Schema{[Name="STG_CUSTOMERS",Kind="View"]}[Data]
in
    STG_CUSTOMERS_View"""

# Malformed variant (no Snowflake.Databases call); the parser is expected to
# return None for this input.
MOCK_SNOWFLAKE_EXP_INVALID = """let
    Source = Snowflake("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
    DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
in
    STG_CUSTOMERS_View"""
# Table info the snowflake parser is expected to extract from MOCK_SNOWFLAKE_EXP.
EXPECTED_SNOWFLAKE_RESULT = [
    {
        "database": "DEMO_STAGE",
        "schema": "PUBLIC",
        "table": "STG_CUSTOMERS",
    }
]
# Databricks.Catalogs connector expression (step-by-step navigation form).
MOCK_DATABRICKS_EXP = """let
    Source = Databricks.Catalogs(Databricks_Server, Databricks_HTTP_Path, [Catalog="", Database=""]),
    test_database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
    test_schema = test_database{[Name="PUBLIC",Kind="Schema"]}[Data],
    test_table = test_schema{[Name="STG_CUSTOMERS",Kind="Table"]}[Data]
in
    Source"""

# Value.NativeQuery form where the second argument is a bare "schema.table"
# reference rather than a SQL statement.
MOCK_DATABRICKS_NATIVE_EXP = """let
    Source = Value.NativeQuery(Databricks.Catalogs(Databricks_Server, Databricks_HTTP_Path, [Catalog="DEMO_CATALOG", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="DEMO_STAGE",Kind="Database"]}[Data], "PUBLIC.STG_CUSTOMERS", null, [EnableFolding=true])
in
    Source"""

# Value.NativeQuery form where the second argument is a full SQL query.
MOCK_DATABRICKS_NATIVE_QUERY_EXP = """let
    Source = Value.NativeQuery(Databricks.Catalogs(Databricks_Server, Databricks_HTTP_Path, [Catalog="DEMO_CATALOG", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="DEMO_STAGE",Kind="Database"]}[Data], "SELECT * FROM PUBLIC.STG_CUSTOMERS", null, [EnableFolding=true])
in
    Source"""

# Table info the databricks parser is expected to extract from all three
# databricks expressions above.
EXPECTED_DATABRICKS_RESULT = [
    {"database": "DEMO_STAGE", "schema": "PUBLIC", "table": "STG_CUSTOMERS"}
]
# Minimal PowerBI ingestion workflow configuration used to build the
# PowerbiSource under test. The jwtToken below is a dummy token for tests.
mock_config = {
    "source": {
        "type": "powerbi",
        "serviceName": "mock_metabase",
        "serviceConnection": {
            "config": {
                "type": "PowerBI",
                "clientId": "client_id",
                "clientSecret": "secret",
                "tenantId": "tenant_id",
            },
        },
        "sourceConfig": {
            "config": {
                "type": "DashboardMetadata",
                # Owner ingestion is exercised by the includeOwners tests below.
                "includeOwners": True,
            }
        },
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "loggerLevel": "DEBUG",
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "enableVersionValidation": "false",
            "securityConfig": {
                # Adjacent literals are concatenated into one dummy JWT.
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
                "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
                "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
                "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
                "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
                "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            },
        },
    },
}
# Raw PowerBI dashboard payload with two users holding the "Owner" access
# right; both should resolve to owner references in test_owner_ingestion.
MOCK_DASHBOARD_WITH_OWNERS = {
    "id": "dashboard1",
    "displayName": "Test Dashboard",
    "webUrl": "https://test.com",
    "embedUrl": "https://test.com/embed",
    "tiles": [],
    "users": [
        {
            "displayName": "John Doe",
            "emailAddress": "john.doe@example.com",
            "dashboardUserAccessRight": "Owner",
            "userType": "Member",
        },
        {
            "displayName": "Jane Smith",
            "emailAddress": "jane.smith@example.com",
            "dashboardUserAccessRight": "Owner",
            "userType": "Member",
        },
    ],
}

# Raw PowerBI dataset payload with a single "Owner" user.
MOCK_DATASET_WITH_OWNERS = {
    "id": "dataset1",
    "name": "Test Dataset",
    "tables": [],
    "description": "Test dataset description",
    "users": [
        {
            "displayName": "John Doe",
            "emailAddress": "john.doe@example.com",
            "datasetUserAccessRight": "Owner",
            "userType": "Member",
        }
    ],
}
# Entity-reference lists returned by the mocked get_reference_by_email calls;
# the names must match the assertions in the owner-ingestion tests exactly.
MOCK_USER_1_ENITYTY_REF_LIST = EntityReferenceList(
    root=[EntityReference(id=uuid.uuid4(), name="John Doe", type="user")]
)
MOCK_USER_2_ENITYTY_REF_LIST = EntityReferenceList(
    root=[EntityReference(id=uuid.uuid4(), name="Jane Smith", type="user")]
)
# Snowflake expression that references the dataset parameters DB and Schema
# instead of literal names; values are resolved via the dataset expressions
# on MOCK_DATASET_FROM_WORKSPACE.
MOCK_SNOWFLAKE_EXP_V2 = 'let\n    Source = Snowflake.Databases(Snowflake_URL,Warehouse,[Role=Role]),\n    Database = Source{[Name=DB,Kind="Database"]}[Data],\n    DB_Schema = Database{[Name=Schema,Kind="Schema"]}[Data],\n    Table = DB_Schema{[Name="CUSTOMER_TABLE",Kind="Table"]}[Data],\n    #"Andere entfernte Spalten" = Table.SelectColumns(Table,{"ID_BERICHTSMONAT", "ID_AKQUISE_VERMITTLER", "ID_AKQUISE_OE", "ID_SPARTE", "ID_RISIKOTRAEGER", "ID_KUNDE", "STUECK", "BBE"})\nin\n    #"Andere entfernte Spalten"'

# Expected parse of MOCK_SNOWFLAKE_EXP_V2 once the DB / Schema parameters
# are substituted from the dataset expressions (MY_DB / MY_SCHEMA defaults).
EXPECTED_SNOWFLAKE_RESULT_V2 = [
    {
        "database": "MY_DB",
        "schema": "MY_SCHEMA",
        "table": "CUSTOMER_TABLE",
    }
]
# Dataset whose expressions define the DB / Schema parameters (with default
# values MY_DB / MY_SCHEMA) used when parsing MOCK_SNOWFLAKE_EXP_V2.
MOCK_DATASET_FROM_WORKSPACE = Dataset(
    id="testdataset",
    name="Test Dataset",
    tables=[],
    expressions=[
        {
            "name": "DB",
            "expression": '"MY_DB" meta [IsParameterQuery=true, List={"MY_DB_DEV", "MY_DB", "MY_DB_PROD"}, DefaultValue="MY_DB", Type="Text", IsParameterQueryRequired=true]',
        },
        {
            "name": "Schema",
            "expression": '"MY_SCHEMA" meta [IsParameterQuery=true, List={"MY_SCHEMA", "MY_SCHEMA_PROD"}, DefaultValue="MY_SCHEMA", Type="Text", IsParameterQueryRequired=true]',
        },
    ],
)

# Variant with the parameter names present but no "expression" values, so
# parameter substitution cannot resolve database/schema.
MOCK_DATASET_FROM_WORKSPACE_V2 = Dataset(
    id="testdataset",
    name="Test Dataset",
    tables=[],
    expressions=[
        {
            "name": "DB",
        },
        {
            "name": "Schema",
        },
    ],
)
# Minimal data-model entities passed to the parser / lineage helpers.
MOCK_DASHBOARD_DATA_MODEL = DashboardDataModel(
    name="dummy_datamodel",
    id=uuid.uuid4(),
    columns=[],
    dataModelType=DataModelType.PowerBIDataModel.value,
)

# Dataflow-typed data model returned by the mocked get_by_name lookup in
# test_upstream_dataflow_lineage.
MOCK_DATAMODEL_ENTITY = DashboardDataModel(
    name="dummy_dataflow_id_a",
    id=uuid.uuid4(),
    dataModelType=DataModelType.PowerBIDataFlow.value,
    columns=[],
)
class PowerBIUnitTest(TestCase):
    """
    Implements the necessary methods to extract
    powerbi Dashboard Unit Test
    """

    @patch(
        "metadata.ingestion.source.dashboard.dashboard_service.DashboardServiceSource.test_connection"
    )
    @patch("metadata.ingestion.source.dashboard.powerbi.connection.get_connection")
    def __init__(self, methodName, get_connection, test_connection) -> None:
        super().__init__(methodName)
        # Stub out connection creation/testing so building the source makes
        # no real PowerBI or OpenMetadata server calls.
        get_connection.return_value = False
        test_connection.return_value = False
        self.config = OpenMetadataWorkflowConfig.model_validate(mock_config)
        self.powerbi: PowerbiSource = PowerbiSource.create(
            mock_config["source"],
            OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
        )

    @pytest.mark.order(1)
    @patch.object(
        PowerbiSource,
        "_fetch_dataset_from_workspace",
        return_value=MOCK_DATASET_FROM_WORKSPACE,
    )
    def test_parse_database_source(self, *_):
        """Exercise the redshift/snowflake/databricks M-query parsers."""
        # Test with valid redshift source
        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP)
        self.assertEqual(result, EXPECTED_REDSHIFT_RESULT)

        # Test with invalid redshift source (no AmazonRedshift connector)
        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID)
        self.assertEqual(result, None)

        # Test with invalid redshift source (missing schema step)
        result = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID_V2)
        self.assertEqual(result, None)

        # Test with valid snowflake source
        result = self.powerbi._parse_snowflake_source(
            MOCK_SNOWFLAKE_EXP, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT)

        # Test with invalid snowflake source
        result = self.powerbi._parse_snowflake_source(
            MOCK_SNOWFLAKE_EXP_INVALID, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, None)

        # Parameterized snowflake source; DB/Schema resolved from the
        # patched dataset expressions.
        result = self.powerbi._parse_snowflake_source(
            MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT_V2)

        # Value.NativeQuery snowflake expression with M-style doubled-quote
        # escaping around the schema/table identifiers.
        test_snowflake_query_expression = 'let\n    Source = Value.NativeQuery(Snowflake.Databases("dummy_host",(Warehouse)){[Name=(Database)]}[Data], "select * from " & Database & ".""STG"".""STATIC_AOPANDLE""", null, [EnableFolding=true]),\n    #"Renamed Columns" = Table.RenameColumns(Source,{{"AOP_IMPRESSIONS", "AOP Impressions"}, {"AOP_ORDERS", "AOP Orders"}, {"AOP_SPEND", "AOP Spend"}, {"AOP_TOTAL_REV", "AOP Total Revenue"}, {"AOP_UNITS", "AOP Units"}, {"AOP_VISITS", "AOP Visits"}, {"LE_IMPRESSIONS", "LE Impressions"}, {"LE_ORDERS", "LE Orders"}, {"LE_SPEND", "LE Spend"}, {"LE_TOTAL_REV", "LE Total Revenue"}, {"LE_UNITS", "LE Units"}, {"LE_VISITS", "LE Visits"}, {"SITEID", "SiteID"}, {"COUNTRY", "Country"}, {"REGION", "Region"}, {"CHANNEL", "Channel"}, {"DATE", "Date"}, {"AOP_CONV", "AOP_Conv"}, {"LE_CONV", "LE_Conv"}}),\n    #"Changed Type" = Table.TransformColumnTypes(#"Renamed Columns",{{"SiteID", type text}, {"AOP Impressions", type number}, {"AOP Visits", type number}, {"AOP Orders", type number}, {"AOP Units", type number}, {"AOP Total Revenue", type number}, {"AOP Spend", type number}, {"AOP_Conv", type number}, {"AOP_UPT", type number}, {"AOP_ASP", type number}, {"AOP_AOV", type number}, {"AOP_CTR", type number}, {"LE Impressions", type number}, {"LE Visits", type number}, {"LE Orders", type number}, {"LE Units", type number}, {"LE Total Revenue", type number}, {"LE Spend", type number}, {"LE_Conv", type number}, {"LE_UPT", type number}, {"LE_ASP", type number}, {"LE_AOV", type number}, {"LE_CTR", type number}}),\n    #"Duplicated Column" = Table.DuplicateColumn(#"Changed Type", "Date", "Date - Copy"),\n    #"Split Column by Delimiter" = Table.SplitColumn(#"Duplicated Column", "Date - Copy", Splitter.SplitTextByDelimiter("-", QuoteStyle.None), {"Date - Copy.1", "Date - Copy.2", "Date - Copy.3"}),\n    #"Changed Type1" = Table.TransformColumnTypes(#"Split Column by Delimiter",{{"Date - Copy.1", type text}, {"Date - Copy.2", type text}, {"Date - Copy.3", type text}}),\n    #"Inserted Merged Column" = Table.AddColumn(#"Changed Type1", "Merged", each Text.Combine({[#"Date - Copy.1"], [#"Date - Copy.2"], [#"Date - Copy.3"]}, ""), type text),\n    #"Renamed Columns1" = Table.RenameColumns(#"Inserted Merged Column",{{"Merged", "DateKey"}}),\n    #"Removed Columns" = Table.RemoveColumns(#"Renamed Columns1",{"Date - Copy.1", "Date - Copy.2", "Date - Copy.3"}),\n    #"Added Custom" = Table.AddColumn(#"Removed Columns", "Brand", each "CROCS"),\n    #"Changed Type2" = Table.TransformColumnTypes(#"Added Custom",{{"Brand", type text}})\nin\n    #"Changed Type2"'
        result = self.powerbi._parse_snowflake_source(
            test_snowflake_query_expression, MOCK_DASHBOARD_DATA_MODEL
        )
        # Test should parse the Snowflake query and extract table info
        self.assertIsNotNone(result)
        self.assertEqual(len(result), 1)
        result_table = result[0]
        self.assertEqual(result_table.get("schema"), "STG")
        self.assertEqual(result_table.get("table"), "STATIC_AOPANDLE")

        # Test with valid databricks native source
        result = self.powerbi._parse_databricks_source(
            MOCK_DATABRICKS_NATIVE_EXP, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_DATABRICKS_RESULT)
        result = self.powerbi._parse_databricks_source(
            MOCK_DATABRICKS_NATIVE_QUERY_EXP, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_DATABRICKS_RESULT)
        result = self.powerbi._parse_databricks_source(
            MOCK_DATABRICKS_EXP, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_DATABRICKS_RESULT)

    @pytest.mark.order(2)
    @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email")
    def test_owner_ingestion(self, get_reference_by_email):
        """Validate owner resolution for dashboards and datasets."""
        # Mock responses for dashboard owners
        self.powerbi.metadata.get_reference_by_email.side_effect = [
            MOCK_USER_1_ENITYTY_REF_LIST,
            MOCK_USER_2_ENITYTY_REF_LIST,
        ]
        # Test dashboard owner ingestion
        dashboard = PowerBIDashboard.model_validate(MOCK_DASHBOARD_WITH_OWNERS)
        owner_ref = self.powerbi.get_owner_ref(dashboard)
        self.assertIsNotNone(owner_ref)
        self.assertEqual(len(owner_ref.root), 2)
        self.assertEqual(owner_ref.root[0].name, "John Doe")
        self.assertEqual(owner_ref.root[1].name, "Jane Smith")
        # Verify get_reference_by_email was called with correct emails
        self.powerbi.metadata.get_reference_by_email.assert_any_call(
            "john.doe@example.com"
        )
        self.powerbi.metadata.get_reference_by_email.assert_any_call(
            "jane.smith@example.com"
        )

        # Reset mock for dataset test
        self.powerbi.metadata.get_reference_by_email.reset_mock()
        self.powerbi.metadata.get_reference_by_email.side_effect = [
            MOCK_USER_1_ENITYTY_REF_LIST
        ]
        # Test dataset owner ingestion
        dataset = Dataset.model_validate(MOCK_DATASET_WITH_OWNERS)
        owner_ref = self.powerbi.get_owner_ref(dataset)
        self.assertIsNotNone(owner_ref.root)
        self.assertEqual(len(owner_ref.root), 1)
        self.assertEqual(owner_ref.root[0].name, "John Doe")
        # Verify get_reference_by_email was called with correct email
        self.powerbi.metadata.get_reference_by_email.assert_called_once_with(
            "john.doe@example.com"
        )

        # Reset mock for no owners test
        self.powerbi.metadata.get_reference_by_email.reset_mock()
        # Test with no owners
        dashboard_no_owners = PowerBIDashboard.model_validate(
            {
                "id": "dashboard2",
                "displayName": "Test Dashboard 2",
                "webUrl": "https://test.com",
                "embedUrl": "https://test.com/embed",
                "tiles": [],
                "users": [],
            }
        )
        owner_ref = self.powerbi.get_owner_ref(dashboard_no_owners)
        self.assertIsNone(owner_ref)
        # Verify get_reference_by_email was not called when there are no owners
        self.powerbi.metadata.get_reference_by_email.assert_not_called()

        # Reset mock for invalid owners test
        self.powerbi.metadata.get_reference_by_email.reset_mock()
        # Test with invalid owners: a user with only "Read" access must not
        # be treated as an owner.
        dashboard_invalid_owners = PowerBIDashboard.model_validate(
            {
                "id": "dashboard3",
                "displayName": "Test Dashboard 3",
                "webUrl": "https://test.com",
                "embedUrl": "https://test.com/embed",
                "tiles": [],
                "users": [
                    {
                        "displayName": "Kane Williams",
                        "emailAddress": "kane.williams@example.com",
                        "dashboardUserAccessRight": "Read",
                        "userType": "Member",
                    },
                ],
            }
        )
        owner_ref = self.powerbi.get_owner_ref(dashboard_invalid_owners)
        self.assertIsNone(owner_ref)
        # Verify get_reference_by_email was not called when there are no owners
        self.powerbi.metadata.get_reference_by_email.assert_not_called()

    @pytest.mark.order(3)
    def test_parse_table_info_from_source_exp(self):
        """Table-level source-expression parsing, including empty cases."""
        table = PowerBiTable(
            name="test_table",
            source=[PowerBITableSource(expression=MOCK_REDSHIFT_EXP)],
        )
        result = self.powerbi._parse_table_info_from_source_exp(
            table, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, EXPECTED_REDSHIFT_RESULT)

        # no source expression
        table = PowerBiTable(
            name="test_table",
            source=[PowerBITableSource(expression=None)],
        )
        result = self.powerbi._parse_table_info_from_source_exp(
            table, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, None)

        # no source
        table = PowerBiTable(
            name="test_table",
            source=[],
        )
        result = self.powerbi._parse_table_info_from_source_exp(
            table, MOCK_DASHBOARD_DATA_MODEL
        )
        self.assertEqual(result, None)

    @pytest.mark.order(4)
    @patch.object(
        PowerbiSource,
        "_fetch_dataset_from_workspace",
        return_value=MOCK_DATASET_FROM_WORKSPACE_V2,
    )
    def test_parse_dataset_expressions(self, *_):
        # test with valid snowflake source but no
        # dataset expression value: database/schema stay unresolved (None)
        # while the literal table name is still extracted.
        result = self.powerbi._parse_snowflake_source(
            MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL
        )
        result = result[0]
        self.assertIsNone(result["database"])
        self.assertIsNone(result["schema"])
        self.assertEqual(result["table"], "CUSTOMER_TABLE")

    @pytest.mark.order(5)
    @patch.object(OpenMetadata, "get_by_name", return_value=MOCK_DATAMODEL_ENTITY)
    @patch.object(fqn, "build", return_value=None)
    def test_upstream_dataflow_lineage(self, *_):
        """Lineage from a dataflow to its upstream dataflow yields a request."""
        MOCK_DATAMODEL_ENTITY_2 = DashboardDataModel(
            name="dummy_dataflow_id_b",
            id=uuid.uuid4(),
            dataModelType=DataModelType.PowerBIDataFlow.value,
            columns=[],
        )
        MOCK_DATAMODEL_2 = Dataflow(
            name="dataflow_b",
            objectId="dummy_dataflow_id_b",
            upstreamDataflows=[
                UpstreaDataflow(
                    targetDataflowId="dataflow_a",
                )
            ],
        )
        lineage_request = list(
            self.powerbi.create_dataflow_upstream_dataflow_lineage(
                MOCK_DATAMODEL_2, MOCK_DATAMODEL_ENTITY_2
            )
        )
        assert lineage_request[0].right is not None

    @pytest.mark.order(6)
    def test_include_owners_flag_enabled(self):
        """
        Test that when includeOwners is True, owner information is processed
        """
        # Mock the source config to have includeOwners = True
        self.powerbi.source_config.includeOwners = True
        # Test that owner information is processed when includeOwners is True
        self.assertTrue(self.powerbi.source_config.includeOwners)
        # Test with a dashboard that has owners
        dashboard_with_owners = PowerBIDashboard.model_validate(
            MOCK_DASHBOARD_WITH_OWNERS
        )

        # Mock the metadata.get_reference_by_email method to return different
        # users for different emails
        with patch.object(
            self.powerbi.metadata, "get_reference_by_email"
        ) as mock_get_ref:

            def mock_get_ref_by_email(email):
                # Map each known email to its own entity reference.
                if email == "john.doe@example.com":
                    return EntityReferenceList(
                        root=[
                            EntityReference(
                                id=uuid.uuid4(), name="John Doe", type="user"
                            )
                        ]
                    )
                elif email == "jane.smith@example.com":
                    return EntityReferenceList(
                        root=[
                            EntityReference(
                                id=uuid.uuid4(), name="Jane Smith", type="user"
                            )
                        ]
                    )
                return EntityReferenceList(root=[])

            mock_get_ref.side_effect = mock_get_ref_by_email
            # Test get_owner_ref with includeOwners = True
            result = self.powerbi.get_owner_ref(dashboard_with_owners)
            # Should return owner reference when includeOwners is True
            self.assertIsNotNone(result)
            self.assertEqual(len(result.root), 2)
            # Check that both owners are present
            owner_names = [owner.name for owner in result.root]
            self.assertIn("John Doe", owner_names)
            self.assertIn("Jane Smith", owner_names)

    @pytest.mark.order(7)
    def test_include_owners_flag_disabled(self):
        """
        Test that when includeOwners is False, owner information is not processed
        """
        # Mock the source config to have includeOwners = False
        self.powerbi.source_config.includeOwners = False
        # Test that owner information is not processed when includeOwners is False
        self.assertFalse(self.powerbi.source_config.includeOwners)
        # Test with a dashboard that has owners
        dashboard_with_owners = PowerBIDashboard.model_validate(
            MOCK_DASHBOARD_WITH_OWNERS
        )
        # Test get_owner_ref with includeOwners = False
        result = self.powerbi.get_owner_ref(dashboard_with_owners)
        # Should return None when includeOwners is False
        self.assertIsNone(result)

    @pytest.mark.order(8)
    def test_include_owners_flag_in_config(self):
        """
        Test that the includeOwners flag is properly set in the configuration
        """
        # Check that the mock configuration includes the includeOwners flag
        config = mock_config["source"]["sourceConfig"]["config"]
        self.assertIn("includeOwners", config)
        self.assertTrue(config["includeOwners"])

    @pytest.mark.order(9)
    def test_include_owners_flag_with_no_owners(self):
        """
        Test that when includeOwners is True but dashboard has no owners, returns None
        """
        # Mock the source config to have includeOwners = True
        self.powerbi.source_config.includeOwners = True
        # Create a dashboard with no owners
        dashboard_no_owners = PowerBIDashboard.model_validate(
            {
                "id": "dashboard_no_owners",
                "displayName": "Test Dashboard No Owners",
                "webUrl": "https://test.com",
                "embedUrl": "https://test.com/embed",
                "tiles": [],
                "users": [],  # No users/owners
            }
        )
        # Test get_owner_ref with no owners
        result = self.powerbi.get_owner_ref(dashboard_no_owners)
        # Should return None when there are no owners
        self.assertIsNone(result)

    @pytest.mark.order(10)
    def test_include_owners_flag_with_exception(self):
        """
        Test that when includeOwners is True but an exception occurs, it's handled gracefully
        """
        # Mock the source config to have includeOwners = True
        self.powerbi.source_config.includeOwners = True
        # Test with a dashboard that has owners
        dashboard_with_owners = PowerBIDashboard.model_validate(
            MOCK_DASHBOARD_WITH_OWNERS
        )
        # Mock the metadata.get_reference_by_email method to raise an exception
        with patch.object(
            self.powerbi.metadata,
            "get_reference_by_email",
            side_effect=Exception("API Error"),
        ):
            # Test get_owner_ref with exception
            result = self.powerbi.get_owner_ref(dashboard_with_owners)
            # Should return None when exception occurs
            self.assertIsNone(result)