diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index 98ee638101f..61a001a5a1f 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -10,6 +10,7 @@ # limitations under the License. """PowerBI source module""" +import re import traceback from typing import Any, Iterable, List, Optional, Union @@ -616,6 +617,87 @@ class PowerbiSource(DashboardServiceSource): logger.debug(f"Error to get data_model_column_fqn {exc}") logger.debug(traceback.format_exc()) + def _parse_redshift_source(self, source_expression: str) -> Optional[dict]: + try: + db_match = re.search( + r'AmazonRedshift\.Database\("[^"]+","([^"]+)"\)', source_expression + ) + if not db_match: + # not valid redshift source + return None + schema_table_match = re.findall(r'\[Name="([^"]+)"\]', source_expression) + + database = db_match.group(1) if db_match else None + schema = table = None + if isinstance(schema_table_match, list): + schema = schema_table_match[0] if len(schema_table_match) > 0 else None + table = schema_table_match[1] if len(schema_table_match) > 1 else None + + if ( + schema and table + ): # atlease 2 entities schema and table should be fetched + return {"database": database, "schema": schema, "table": table} + return None + except Exception as exc: + logger.debug(f"Error to parse redshift table source: {exc}") + logger.debug(traceback.format_exc()) + return None + + def _parse_snowflake_source(self, source_expression: str) -> Optional[dict]: + try: + if "Snowflake.Databases" not in source_expression: + # Not a snowflake valid expression + return None + db_match = re.search( + r'\[Name="([^"]+)",Kind="Database"\]', source_expression + ) + schema_match = re.search( + r'\[Name="([^"]+)",Kind="Schema"\]', source_expression + ) + view_match = re.search(r'\[Name="([^"]+)",Kind="View"\]', 
source_expression) + table_match = re.search( + r'\[Name="([^"]+)",Kind="Table"\]', source_expression + ) + + database = db_match.group(1) if db_match else None + schema = schema_match.group(1) if schema_match else None + view = view_match.group(1) if view_match else None + table = table_match.group(1) if table_match else None + + if schema and ( + table or view + ): # atlease 2 entities schema and table should be fetched + return { + "database": database, + "schema": schema, + "table": table if table else view, + } + return None + except Exception as exc: + logger.debug(f"Error to parse snowflake table source: {exc}") + logger.debug(traceback.format_exc()) + return None + + def _parse_table_info_from_source_exp(self, table: PowerBiTable) -> dict: + try: + if not isinstance(table.source, list): + return {} + source_expression = table.source[0].expression + + # parse snowflake source + table_info = self._parse_snowflake_source(source_expression) + if isinstance(table_info, dict): + return table_info + # parse redshift source + table_info = self._parse_redshift_source(source_expression) + if isinstance(table_info, dict): + return table_info + return {} + except Exception as exc: + logger.debug(f"Error to parse table source: {exc}") + logger.debug(traceback.format_exc()) + return {} + def _get_table_and_datamodel_lineage( self, db_service_name: Optional[str], @@ -626,11 +708,12 @@ class PowerbiSource(DashboardServiceSource): Method to create lineage between table and datamodels """ try: + table_info = self._parse_table_info_from_source_exp(table) fqn_search_string = build_es_fqn_search_string( - database_name=None, - schema_name=None, + database_name=table_info.get("database"), + schema_name=table_info.get("schema"), service_name=db_service_name or "*", - table_name=table.name, + table_name=table_info.get("table") or table.name, ) table_entity = self.metadata.search_in_any_service( entity_type=Table, diff --git 
class PowerBITableSource(BaseModel):
    """
    PowerBI Table Source

    Wraps the raw M (Power Query) expression that defines where a
    PowerBI table's data comes from.
    """

    expression: str


class PowerBiTable(BaseModel):
    """
    PowerBI Table Model
    """

    name: str
    columns: Optional[List[PowerBiColumns]] = None
    description: Optional[str] = None
    # Power Query source expressions attached to this table, when exposed
    # by the PowerBI scan API; used to derive upstream lineage.
    source: Optional[List[PowerBITableSource]] = None
EXPECTED_REDSHIFT_RESULT = {
    "database": "dev",
    "schema": "demo_dbt_jaffle",
    "table": "customers_clean",
}


MOCK_SNOWFLAKE_EXP = """let
    Source = Snowflake.Databases("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
    DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
    PUBLIC_Schema = DEMO_STAGE_Database{[Name="PUBLIC",Kind="Schema"]}[Data],
    STG_CUSTOMERS_View = PUBLIC_Schema{[Name="STG_CUSTOMERS",Kind="View"]}[Data]
in
    STG_CUSTOMERS_View"""

MOCK_SNOWFLAKE_EXP_INVALID = """let
    Source = Snowflake("abcd-123.snowflakecomputing.com","COMPUTE_WH"),
    DEMO_STAGE_Database = Source{[Name="DEMO_STAGE",Kind="Database"]}[Data],
in
    STG_CUSTOMERS_View"""

EXPECTED_SNOWFLAKE_RESULT = {
    "database": "DEMO_STAGE",
    "schema": "PUBLIC",
    "table": "STG_CUSTOMERS",
}

mock_config = {
    "source": {
        "type": "powerbi",
        "serviceName": "mock_metabase",
        "serviceConnection": {
            "config": {
                "type": "PowerBI",
                "clientId": "client_id",
                "clientSecret": "secret",
                "tenantId": "tenant_id",
            },
        },
        "sourceConfig": {"config": {"type": "DashboardMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "loggerLevel": "DEBUG",
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {
                "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGc"
                "iOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE"
                "2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXB"
                "iEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fN"
                "r3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3u"
                "d-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
            },
        },
    },
}


class PowerBIUnitTest(TestCase):
    """
    Unit tests for the PowerBI dashboard source.

    Exercises the warehouse source-expression parsers without talking to
    a real PowerBI tenant: connection creation and the connection test
    are both patched out.
    """

    @patch(
        "metadata.ingestion.source.dashboard.dashboard_service.DashboardServiceSource.test_connection"
    )
    @patch("metadata.ingestion.source.dashboard.powerbi.connection.get_connection")
    def __init__(self, methodName, get_connection, test_connection) -> None:
        super().__init__(methodName)
        # Stub out connectivity so PowerbiSource.create never reaches the API
        get_connection.return_value = False
        test_connection.return_value = False
        self.config = OpenMetadataWorkflowConfig.model_validate(mock_config)
        self.powerbi: PowerbiSource = PowerbiSource.create(
            mock_config["source"],
            OpenMetadata(self.config.workflowConfig.openMetadataServerConfig),
        )

    @pytest.mark.order(1)
    def test_parse_database_source(self):
        # A well-formed redshift expression yields database/schema/table
        parsed = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP)
        self.assertEqual(parsed, EXPECTED_REDSHIFT_RESULT)

        # No AmazonRedshift.Database(...) call -> not a redshift source
        parsed = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID)
        self.assertIsNone(parsed)

        # Only one navigation step -> schema/table pair is incomplete
        parsed = self.powerbi._parse_redshift_source(MOCK_REDSHIFT_EXP_INVALID_V2)
        self.assertIsNone(parsed)

        # A well-formed snowflake expression yields database/schema/view
        parsed = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP)
        self.assertEqual(parsed, EXPECTED_SNOWFLAKE_RESULT)

        # No Snowflake.Databases(...) call -> not a snowflake source
        parsed = self.powerbi._parse_snowflake_source(MOCK_SNOWFLAKE_EXP_INVALID)
        self.assertIsNone(parsed)