diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index fca9c29dd64..b9ce087910a 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -54,6 +54,8 @@ from metadata.generated.schema.type.basic import ( from metadata.generated.schema.type.entityReferenceList import EntityReferenceList from metadata.ingestion.api.models import Either from metadata.ingestion.api.steps import InvalidSourceException +from metadata.ingestion.lineage.models import Dialect +from metadata.ingestion.lineage.parser import LineageParser from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.ometa.utils import model_str from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource @@ -80,6 +82,7 @@ from metadata.utils.logger import ingestion_logger logger = ingestion_logger() OWNER_ACCESS_RIGHTS_KEYWORDS = ["owner", "write", "admin"] +SNOWFLAKE_QUERY_EXPRESSION_KW = "Value.NativeQuery(Snowflake.Databases(" class PowerbiSource(DashboardServiceSource): @@ -788,7 +791,7 @@ class PowerbiSource(DashboardServiceSource): logger.debug(traceback.format_exc()) return None - def _parse_redshift_source(self, source_expression: str) -> Optional[dict]: + def _parse_redshift_source(self, source_expression: str) -> Optional[List[dict]]: try: db_match = re.search( r'AmazonRedshift\.Database\("[^"]+","([^"]+)"\)', source_expression @@ -805,20 +808,167 @@ class PowerbiSource(DashboardServiceSource): table = schema_table_match[1] if len(schema_table_match) > 1 else None if table: # atlease table should be fetched - return {"database": database, "schema": schema, "table": table} + return [{"database": database, "schema": schema, "table": table}] return None except Exception as exc: logger.debug(f"Error to parse redshift table source: {exc}") logger.debug(traceback.format_exc()) return None + def _parse_snowflake_query_source( + self, source_expression: str + ) -> Optional[List[dict]]: + """ + Parse snowflake query source + source expressions like `Value.NativeQuery(Snowflake.Databases())` + """ + try: + logger.debug( + f"parsing source expression through query parser: {source_expression[:100]}" + ) + + # Look for SQL query after [Data], + # The pattern needs to handle the concatenated strings with & operators + m = re.search( + r'\[Data\],\s*"(.+?)"(?:,\s*null|\s*\))', + source_expression, + re.IGNORECASE | re.DOTALL, + ) + + if not m: + logger.debug("sql query not found in source expression") + return None + + # Extract and clean the SQL query + sql_query = m.group(1).replace('""', '"') + + # Handle PowerBI parameter concatenation (e.g., "& Database &") + # For now, replace parameter references with wildcards for parsing + sql_query_cleaned = re.sub(r'"?\s*&\s*\w+\s*&\s*"?\.?', "%.", sql_query) + + logger.debug(f"Extracted SQL query: {sql_query}") + logger.debug(f"Cleaned SQL query: {sql_query_cleaned}") + + if not sql_query_cleaned: + logger.debug("Empty SQL query after extraction") + return None + + # Clean the query for parser + # 1. Replace % with a placeholder database name + parser_query = sql_query_cleaned.replace("%", "PLACEHOLDER_DB") + + # 2. Remove PowerBI line feed markers #(lf) and clean up the query + parser_query = parser_query.replace("#(lf)", "\n") + + # 3. 
Remove SQL comments that might cause issues (// style comments) + parser_query = re.sub(r"//[^\n]*", "", parser_query) + + # 4. Clean up excessive whitespace + parser_query = re.sub(r"\s+", " ", parser_query).strip() + + logger.debug( + f"Attempting LineageParser with cleaned query: {parser_query[:200]}" + ) + + try: + parser = LineageParser( + parser_query, dialect=Dialect.SNOWFLAKE, timeout_seconds=30 + ) + except Exception as parser_exc: + logger.debug(f"LineageParser failed with error: {parser_exc}") + logger.debug(f"Failed query was: {parser_query[:200]}...") + return None + + if parser.source_tables: + logger.debug( + f"LineageParser found {len(parser.source_tables)} source table(s)" + ) + for table in parser.source_tables: + logger.debug( + f"source table: {table.raw_name}, schema: {table.schema if hasattr(table, 'schema') else 'N/A'}" + ) + lineage_tables_list = [] + for source_table in parser.source_tables: + # source_table = parser.source_tables[0] + + # Extract database from schema's parent if it exists + database = None + schema = None + + if hasattr(source_table, "schema") and source_table.schema: + # Log what we have in the schema object + logger.debug( + f"Schema object type: {type(source_table.schema)}, value: {source_table.schema}" + ) + + # Get schema as string first + schema_str = ( + source_table.schema.raw_name + if hasattr(source_table.schema, "raw_name") + else str(source_table.schema) + ) + + # If schema contains dots, it might be database.schema format + if "." in schema_str: + parts = schema_str.split(".") + if len(parts) == 2: + # Format: database.schema + # Check for placeholder (case insensitive) + database = ( + parts[0] + if parts[0].upper() != "PLACEHOLDER_DB" + else None + ) + schema = parts[1] + else: + # Just use as is + schema = schema_str + else: + schema = schema_str + # Check if schema has a parent (database) + if ( + hasattr(source_table.schema, "parent") + and source_table.schema.parent + ): + database = ( + source_table.schema.parent.raw_name + if hasattr(source_table.schema.parent, "raw_name") + else str(source_table.schema.parent) + ) + + # Filter out placeholder values (case insensitive) + if database and database.upper() == "PLACEHOLDER_DB": + database = None + + table = source_table.raw_name + + if table: + logger.debug(f"tables found = {database}.{schema}.{table}") + lineage_tables_list.append( + { + "database": database, + "schema": schema, + "table": table, + } + ) + return lineage_tables_list + logger.debug("tables in query not found through parser") + return None + except Exception as exc: + logger.debug(f"Error parsing snowflake query source: {exc}") + logger.debug(traceback.format_exc()) + return None + def _parse_snowflake_source( self, source_expression: str, datamodel_entity: DashboardDataModel - ) -> Optional[dict]: + ) -> Optional[List[dict]]: try: if "Snowflake.Databases" not in source_expression: # Not a snowflake valid expression return None + if SNOWFLAKE_QUERY_EXPRESSION_KW in source_expression: + # snowflake query source identified + return self._parse_snowflake_query_source(source_expression) db_match = re.search( r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="Database"\]', source_expression ) @@ -838,11 +988,13 @@ class PowerbiSource(DashboardServiceSource): view = self._parse_snowflake_regex_exp(view_match, datamodel_entity) if table or view: # atlease table or view should be fetched - return { - "database": database, - "schema": schema, - "table": table if table else view, - } + return [ + { + "database": database, + "schema": 
schema, + "table": table if table else view, + } + ] return None except Exception as exc: logger.debug(f"Error to parse snowflake table source: {exc}") @@ -851,29 +1003,29 @@ class PowerbiSource(DashboardServiceSource): def _parse_table_info_from_source_exp( self, table: PowerBiTable, datamodel_entity: DashboardDataModel - ) -> dict: + ) -> Optional[List[dict]]: try: if not isinstance(table.source, list): - return {} + return None source_expression = table.source[0].expression if not source_expression: logger.debug(f"No source expression found for table: {table.name}") - return {} + return None # parse snowflake source - table_info = self._parse_snowflake_source( + table_info_list = self._parse_snowflake_source( source_expression, datamodel_entity ) - if isinstance(table_info, dict): - return table_info + if isinstance(table_info_list, List): + return table_info_list # parse redshift source - table_info = self._parse_redshift_source(source_expression) - if isinstance(table_info, dict): - return table_info - return {} + table_info_list = self._parse_redshift_source(source_expression) + if isinstance(table_info_list, List): + return table_info_list + return None except Exception as exc: logger.debug(f"Error to parse table source: {exc}") logger.debug(traceback.format_exc()) - return {} + return None def _get_table_and_datamodel_lineage( self, @@ -892,60 +1044,64 @@ class PowerbiSource(DashboardServiceSource): ) = self.parse_db_service_prefix(db_service_prefix) try: - table_info = self._parse_table_info_from_source_exp(table, datamodel_entity) - table_name = table_info.get("table") or table.name - schema_name = table_info.get("schema") - database_name = table_info.get("database") - if ( - prefix_table_name - and table_name - and prefix_table_name.lower() != table_name.lower() - ): - logger.debug( - f"Table {table_name} does not match prefix {prefix_table_name}" - ) - return - - if ( - prefix_schema_name - and schema_name - and prefix_schema_name.lower() != schema_name.lower() - ): - logger.debug( - f"Schema {table_info.get('schema')} does not match prefix {prefix_schema_name}" - ) - return - - if ( - prefix_database_name - and database_name - and prefix_database_name.lower() != database_name.lower() - ): - logger.debug( - f"Database {table_info.get('database')} does not match prefix {prefix_database_name}" - ) - return - - fqn_search_string = build_es_fqn_search_string( - service_name=prefix_service_name or "*", - table_name=(prefix_table_name or table_name), - schema_name=(prefix_schema_name or schema_name), - database_name=(prefix_database_name or database_name), + table_info_list = self._parse_table_info_from_source_exp( + table, datamodel_entity ) - table_entity = self.metadata.search_in_any_service( - entity_type=Table, - fqn_search_string=fqn_search_string, - ) - if table_entity and datamodel_entity: - columns_list = [column.name for column in table.columns] - column_lineage = self._get_column_lineage( - table_entity, datamodel_entity, columns_list - ) - yield self._get_add_lineage_request( - to_entity=datamodel_entity, - from_entity=table_entity, - column_lineage=column_lineage, - ) + if isinstance(table_info_list, List): + for table_info in table_info_list: + table_name = table_info.get("table") or table.name + schema_name = table_info.get("schema") + database_name = table_info.get("database") + if ( + prefix_table_name + and table_name + and prefix_table_name.lower() != table_name.lower() + ): + logger.debug( + f"Table {table_name} does not match prefix {prefix_table_name}" + ) + 
return + + if ( + prefix_schema_name + and schema_name + and prefix_schema_name.lower() != schema_name.lower() + ): + logger.debug( + f"Schema {table_info.get('schema')} does not match prefix {prefix_schema_name}" + ) + return + + if ( + prefix_database_name + and database_name + and prefix_database_name.lower() != database_name.lower() + ): + logger.debug( + f"Database {table_info.get('database')} does not match prefix {prefix_database_name}" + ) + return + + fqn_search_string = build_es_fqn_search_string( + service_name=prefix_service_name or "*", + table_name=(prefix_table_name or table_name), + schema_name=(prefix_schema_name or schema_name), + database_name=(prefix_database_name or database_name), + ) + table_entity = self.metadata.search_in_any_service( + entity_type=Table, + fqn_search_string=fqn_search_string, + ) + if table_entity and datamodel_entity: + columns_list = [column.name for column in table.columns] + column_lineage = self._get_column_lineage( + table_entity, datamodel_entity, columns_list + ) + yield self._get_add_lineage_request( + to_entity=datamodel_entity, + from_entity=table_entity, + column_lineage=column_lineage, + ) except Exception as exc: # pylint: disable=broad-except yield Either( left=StackTraceError( diff --git a/ingestion/tests/unit/topology/dashboard/test_powerbi.py b/ingestion/tests/unit/topology/dashboard/test_powerbi.py index 1774358df82..cc4f9dfdef0 100644 --- a/ingestion/tests/unit/topology/dashboard/test_powerbi.py +++ b/ingestion/tests/unit/topology/dashboard/test_powerbi.py @@ -51,11 +51,13 @@ in customers_clean1 """ -EXPECTED_REDSHIFT_RESULT = { - "database": "dev", - "schema": "demo_dbt_jaffle", - "table": "customers_clean", -} +EXPECTED_REDSHIFT_RESULT = [ + { + "database": "dev", + "schema": "demo_dbt_jaffle", + "table": "customers_clean", + } +] MOCK_SNOWFLAKE_EXP = """let @@ -72,11 +74,13 @@ MOCK_SNOWFLAKE_EXP_INVALID = """let in STG_CUSTOMERS_View""" -EXPECTED_SNOWFLAKE_RESULT = { - "database": "DEMO_STAGE", - "schema": "PUBLIC", - "table": "STG_CUSTOMERS", -} +EXPECTED_SNOWFLAKE_RESULT = [ + { + "database": "DEMO_STAGE", + "schema": "PUBLIC", + "table": "STG_CUSTOMERS", + } +] mock_config = { "source": { @@ -160,11 +164,13 @@ MOCK_USER_2_ENITYTY_REF_LIST = EntityReferenceList( ) MOCK_SNOWFLAKE_EXP_V2 = 'let\n Source = Snowflake.Databases(Snowflake_URL,Warehouse,[Role=Role]),\n Database = Source{[Name=DB,Kind="Database"]}[Data],\n DB_Schema = Database{[Name=Schema,Kind="Schema"]}[Data],\n Table = DB_Schema{[Name="CUSTOMER_TABLE",Kind="Table"]}[Data],\n #"Andere entfernte Spalten" = Table.SelectColumns(Table,{"ID_BERICHTSMONAT", "ID_AKQUISE_VERMITTLER", "ID_AKQUISE_OE", "ID_SPARTE", "ID_RISIKOTRAEGER", "ID_KUNDE", "STUECK", "BBE"})\nin\n #"Andere entfernte Spalten"' -EXPECTED_SNOWFLAKE_RESULT_V2 = { - "database": "MY_DB", - "schema": "MY_SCHEMA", - "table": "CUSTOMER_TABLE", -} +EXPECTED_SNOWFLAKE_RESULT_V2 = [ + { + "database": "MY_DB", + "schema": "MY_SCHEMA", + "table": "CUSTOMER_TABLE", + } +] MOCK_DATASET_FROM_WORKSPACE = Dataset( id="testdataset", name="Test Dataset", @@ -263,6 +269,17 @@ class PowerBIUnitTest(TestCase): ) self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT_V2) + test_snowflaek_query_expression = 'let\n Source = Value.NativeQuery(Snowflake.Databases("dummy_host",(Warehouse)){[Name=(Database)]}[Data], "select * from "& Database &".""STG"".""STATIC_AOPANDLE""", null, [EnableFolding=true]),\n #"Renamed Columns" = Table.RenameColumns(Source,{{"AOP_IMPRESSIONS", "AOP Impressions"}, {"AOP_ORDERS", "AOP Orders"}, 
{"AOP_SPEND", "AOP Spend"}, {"AOP_TOTAL_REV", "AOP Total Revenue"}, {"AOP_UNITS", "AOP Units"}, {"AOP_VISITS", "AOP Visits"}, {"LE_IMPRESSIONS", "LE Impressions"}, {"LE_ORDERS", "LE Orders"}, {"LE_SPEND", "LE Spend"}, {"LE_TOTAL_REV", "LE Total Revenue"}, {"LE_UNITS", "LE Units"}, {"LE_VISITS", "LE Visits"}, {"SITEID", "SiteID"}, {"COUNTRY", "Country"}, {"REGION", "Region"}, {"CHANNEL", "Channel"}, {"DATE", "Date"}, {"AOP_CONV", "AOP_Conv"}, {"LE_CONV", "LE_Conv"}}),\n #"Changed Type" = Table.TransformColumnTypes(#"Renamed Columns",{{"SiteID", type text}, {"AOP Impressions", type number}, {"AOP Visits", type number}, {"AOP Orders", type number}, {"AOP Units", type number}, {"AOP Total Revenue", type number}, {"AOP Spend", type number}, {"AOP_Conv", type number}, {"AOP_UPT", type number}, {"AOP_ASP", type number}, {"AOP_AOV", type number}, {"AOP_CTR", type number}, {"LE Impressions", type number}, {"LE Visits", type number}, {"LE Orders", type number}, {"LE Units", type number}, {"LE Total Revenue", type number}, {"LE Spend", type number}, {"LE_Conv", type number}, {"LE_UPT", type number}, {"LE_ASP", type number}, {"LE_AOV", type number}, {"LE_CTR", type number}}),\n #"Duplicated Column" = Table.DuplicateColumn(#"Changed Type", "Date", "Date - Copy"),\n #"Split Column by Delimiter" = Table.SplitColumn(#"Duplicated Column", "Date - Copy", Splitter.SplitTextByDelimiter("-", QuoteStyle.None), {"Date - Copy.1", "Date - Copy.2", "Date - Copy.3"}),\n #"Changed Type1" = Table.TransformColumnTypes(#"Split Column by Delimiter",{{"Date - Copy.1", type text}, {"Date - Copy.2", type text}, {"Date - Copy.3", type text}}),\n #"Inserted Merged Column" = Table.AddColumn(#"Changed Type1", "Merged", each Text.Combine({[#"Date - Copy.1"], [#"Date - Copy.2"], [#"Date - Copy.3"]}, ""), type text),\n #"Renamed Columns1" = Table.RenameColumns(#"Inserted Merged Column",{{"Merged", "DateKey"}}),\n #"Removed Columns" = Table.RemoveColumns(#"Renamed Columns1",{"Date - Copy.1", "Date - Copy.2", "Date - Copy.3"}),\n #"Added Custom" = Table.AddColumn(#"Removed Columns", "Brand", each "CROCS"),\n #"Changed Type2" = Table.TransformColumnTypes(#"Added Custom",{{"Brand", type text}})\nin\n #"Changed Type2"' + result = self.powerbi._parse_snowflake_source( + test_snowflaek_query_expression, MOCK_DASHBOARD_DATA_MODEL + ) + # Test should parse the Snowflake query and extract table info + self.assertIsNotNone(result) + self.assertEqual(len(result), 1) + result_table = result[0] + self.assertEqual(result_table.get("schema"), "STG") + self.assertEqual(result_table.get("table"), "STATIC_AOPANDLE") + @pytest.mark.order(2) @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email") def test_owner_ingestion(self, get_reference_by_email): @@ -370,7 +387,7 @@ class PowerBIUnitTest(TestCase): result = self.powerbi._parse_table_info_from_source_exp( table, MOCK_DASHBOARD_DATA_MODEL ) - self.assertEqual(result, {}) + self.assertEqual(result, None) # no source table = PowerBiTable( @@ -380,7 +397,7 @@ class PowerBIUnitTest(TestCase): result = self.powerbi._parse_table_info_from_source_exp( table, MOCK_DASHBOARD_DATA_MODEL ) - self.assertEqual(result, {}) + self.assertEqual(result, None) @pytest.mark.order(4) @patch.object( @@ -394,6 +411,7 @@ class PowerBIUnitTest(TestCase): result = self.powerbi._parse_snowflake_source( MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL ) + result = result[0] self.assertIsNone(result["database"]) self.assertIsNone(result["schema"]) self.assertEqual(result["table"], "CUSTOMER_TABLE")