Fix: PowerBI snowflake query lineage parsing (#23746)

harshsoni2024 2025-10-08 18:32:25 +05:30 committed by GitHub
parent 69f64c3f0b
commit f2819ce4e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 263 additions and 89 deletions


@@ -54,6 +54,8 @@ from metadata.generated.schema.type.basic import (
from metadata.generated.schema.type.entityReferenceList import EntityReferenceList
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
+from metadata.ingestion.lineage.models import Dialect
+from metadata.ingestion.lineage.parser import LineageParser
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import model_str
from metadata.ingestion.source.dashboard.dashboard_service import DashboardServiceSource
@@ -80,6 +82,7 @@ from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
OWNER_ACCESS_RIGHTS_KEYWORDS = ["owner", "write", "admin"]
+SNOWFLAKE_QUERY_EXPRESSION_KW = "Value.NativeQuery(Snowflake.Databases("
class PowerbiSource(DashboardServiceSource):
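The new constant is matched as a plain substring, so any M expression that wraps a native query around Snowflake.Databases gets routed to the dedicated query parser. A minimal sketch of that routing check, run against a hypothetical M expression (the sample snippet and names below are invented for illustration, not taken from the commit):

# Illustrative only: the substring check that routes native-query sources.
SNOWFLAKE_QUERY_EXPRESSION_KW = "Value.NativeQuery(Snowflake.Databases("

sample_expression = (
    'let Source = Value.NativeQuery(Snowflake.Databases("acct","WH")'
    '{[Name="DEMO"]}[Data], "select * from PUBLIC.ORDERS", null, '
    "[EnableFolding=true]) in Source"
)

if SNOWFLAKE_QUERY_EXPRESSION_KW in sample_expression:
    print("snowflake query source identified")  # handled by _parse_snowflake_query_source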
@@ -788,7 +791,7 @@ class PowerbiSource(DashboardServiceSource):
logger.debug(traceback.format_exc())
return None
-def _parse_redshift_source(self, source_expression: str) -> Optional[dict]:
+def _parse_redshift_source(self, source_expression: str) -> Optional[List[dict]]:
try:
db_match = re.search(
r'AmazonRedshift\.Database\("[^"]+","([^"]+)"\)', source_expression
@@ -805,20 +808,167 @@ class PowerbiSource(DashboardServiceSource):
table = schema_table_match[1] if len(schema_table_match) > 1 else None
if table:  # at least the table should be fetched
-return {"database": database, "schema": schema, "table": table}
+return [{"database": database, "schema": schema, "table": table}]
return None
except Exception as exc:
logger.debug(f"Error to parse redshift table source: {exc}")
logger.debug(traceback.format_exc())
return None
+def _parse_snowflake_query_source(
+self, source_expression: str
+) -> Optional[List[dict]]:
+"""
+Parse snowflake query source
+source expressions like `Value.NativeQuery(Snowflake.Databases())`
+"""
+try:
+logger.debug(
+f"parsing source expression through query parser: {source_expression[:100]}"
+)
+# Look for SQL query after [Data],
+# The pattern needs to handle the concatenated strings with & operators
+m = re.search(
+r'\[Data\],\s*"(.+?)"(?:,\s*null|\s*\))',
+source_expression,
+re.IGNORECASE | re.DOTALL,
+)
+if not m:
+logger.debug("sql query not found in source expression")
+return None
+# Extract and clean the SQL query
+sql_query = m.group(1).replace('""', '"')
+# Handle PowerBI parameter concatenation (e.g., "& Database &")
+# For now, replace parameter references with wildcards for parsing
+sql_query_cleaned = re.sub(r'"?\s*&\s*\w+\s*&\s*"?\.?', "%.", sql_query)
+logger.debug(f"Extracted SQL query: {sql_query}")
+logger.debug(f"Cleaned SQL query: {sql_query_cleaned}")
+if not sql_query_cleaned:
+logger.debug("Empty SQL query after extraction")
+return None
+# Clean the query for parser
+# 1. Replace % with a placeholder database name
+parser_query = sql_query_cleaned.replace("%", "PLACEHOLDER_DB")
+# 2. Remove PowerBI line feed markers #(lf) and clean up the query
+parser_query = parser_query.replace("#(lf)", "\n")
+# 3. Remove SQL comments that might cause issues (// style comments)
+parser_query = re.sub(r"//[^\n]*", "", parser_query)
+# 4. Clean up excessive whitespace
+parser_query = re.sub(r"\s+", " ", parser_query).strip()
+logger.debug(
+f"Attempting LineageParser with cleaned query: {parser_query[:200]}"
+)
+try:
+parser = LineageParser(
+parser_query, dialect=Dialect.SNOWFLAKE, timeout_seconds=30
+)
+except Exception as parser_exc:
+logger.debug(f"LineageParser failed with error: {parser_exc}")
+logger.debug(f"Failed query was: {parser_query[:200]}...")
+return None
+if parser.source_tables:
+logger.debug(
+f"LineageParser found {len(parser.source_tables)} source table(s)"
+)
+for table in parser.source_tables:
+logger.debug(
+f"source table: {table.raw_name}, schema: {table.schema if hasattr(table, 'schema') else 'N/A'}"
+)
+lineage_tables_list = []
+for source_table in parser.source_tables:
+# source_table = parser.source_tables[0]
+# Extract database from schema's parent if it exists
+database = None
+schema = None
+if hasattr(source_table, "schema") and source_table.schema:
+# Log what we have in the schema object
+logger.debug(
+f"Schema object type: {type(source_table.schema)}, value: {source_table.schema}"
+)
+# Get schema as string first
+schema_str = (
+source_table.schema.raw_name
+if hasattr(source_table.schema, "raw_name")
+else str(source_table.schema)
+)
+# If schema contains dots, it might be database.schema format
+if "." in schema_str:
+parts = schema_str.split(".")
+if len(parts) == 2:
+# Format: database.schema
+# Check for placeholder (case insensitive)
+database = (
+parts[0]
+if parts[0].upper() != "PLACEHOLDER_DB"
+else None
+)
+schema = parts[1]
+else:
+# Just use as is
+schema = schema_str
+else:
+schema = schema_str
+# Check if schema has a parent (database)
+if (
+hasattr(source_table.schema, "parent")
+and source_table.schema.parent
+):
+database = (
+source_table.schema.parent.raw_name
+if hasattr(source_table.schema.parent, "raw_name")
+else str(source_table.schema.parent)
+)
+# Filter out placeholder values (case insensitive)
+if database and database.upper() == "PLACEHOLDER_DB":
+database = None
+table = source_table.raw_name
+if table:
+logger.debug(f"tables found = {database}.{schema}.{table}")
+lineage_tables_list.append(
+{
+"database": database,
+"schema": schema,
+"table": table,
+}
+)
+return lineage_tables_list
+logger.debug("tables in query not found through parser")
+return None
+except Exception as exc:
+logger.debug(f"Error parsing snowflake query source: {exc}")
+logger.debug(traceback.format_exc())
+return None
def _parse_snowflake_source(
self, source_expression: str, datamodel_entity: DashboardDataModel
-) -> Optional[dict]:
+) -> Optional[List[dict]]:
try:
if "Snowflake.Databases" not in source_expression:
# Not a valid Snowflake expression
return None
+if SNOWFLAKE_QUERY_EXPRESSION_KW in source_expression:
+# snowflake query source identified
+return self._parse_snowflake_query_source(source_expression)
db_match = re.search(
r'\[Name=(?:"([^"]+)"|([^,]+)),Kind="Database"\]', source_expression
)
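Before the diff moves on, a self-contained sketch of the extraction-and-cleanup pipeline the new method applies, run on a hypothetical expression; the M fragment and table name below are invented for illustration, and the real method then hands the cleaned string to LineageParser with Dialect.SNOWFLAKE:

import re

# Hypothetical M fragment: a parameterized native query with doubled quotes,
# mirroring the shapes handled above.
expr = (
    'Source{[Name=(Database)]}[Data], '
    '"select * from "& Database &".""STG"".""ORDERS""", null'
)

m = re.search(r'\[Data\],\s*"(.+?)"(?:,\s*null|\s*\))', expr, re.IGNORECASE | re.DOTALL)
sql = m.group(1).replace('""', '"')                   # un-escape doubled M quotes
sql = re.sub(r'"?\s*&\s*\w+\s*&\s*"?\.?', "%.", sql)  # parameter refs -> wildcard
sql = sql.replace("%", "PLACEHOLDER_DB")              # wildcard -> parseable db name
sql = re.sub(r"\s+", " ", sql.replace("#(lf)", "\n")).strip()
print(sql)  # select * from PLACEHOLDER_DB."STG"."ORDERS"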
@@ -838,11 +988,13 @@ class PowerbiSource(DashboardServiceSource):
view = self._parse_snowflake_regex_exp(view_match, datamodel_entity)
if table or view:  # at least a table or view should be fetched
-return {
-"database": database,
-"schema": schema,
-"table": table if table else view,
-}
+return [
+{
+"database": database,
+"schema": schema,
+"table": table if table else view,
+}
+]
return None
except Exception as exc:
logger.debug(f"Error to parse snowflake table source: {exc}")
@@ -851,29 +1003,29 @@
def _parse_table_info_from_source_exp(
self, table: PowerBiTable, datamodel_entity: DashboardDataModel
-) -> dict:
+) -> Optional[List[dict]]:
try:
if not isinstance(table.source, list):
-return {}
+return None
source_expression = table.source[0].expression
if not source_expression:
logger.debug(f"No source expression found for table: {table.name}")
-return {}
+return None
# parse snowflake source
-table_info = self._parse_snowflake_source(
+table_info_list = self._parse_snowflake_source(
source_expression, datamodel_entity
)
-if isinstance(table_info, dict):
-return table_info
+if isinstance(table_info_list, List):
+return table_info_list
# parse redshift source
-table_info = self._parse_redshift_source(source_expression)
-if isinstance(table_info, dict):
-return table_info
-return {}
+table_info_list = self._parse_redshift_source(source_expression)
+if isinstance(table_info_list, List):
+return table_info_list
+return None
except Exception as exc:
logger.debug(f"Error to parse table source: {exc}")
logger.debug(traceback.format_exc())
-return {}
+return None
def _get_table_and_datamodel_lineage(
self,
@@ -892,60 +1044,64 @@
) = self.parse_db_service_prefix(db_service_prefix)
try:
-table_info = self._parse_table_info_from_source_exp(table, datamodel_entity)
-table_name = table_info.get("table") or table.name
-schema_name = table_info.get("schema")
-database_name = table_info.get("database")
-if (
-prefix_table_name
-and table_name
-and prefix_table_name.lower() != table_name.lower()
-):
-logger.debug(
-f"Table {table_name} does not match prefix {prefix_table_name}"
-)
-return
-if (
-prefix_schema_name
-and schema_name
-and prefix_schema_name.lower() != schema_name.lower()
-):
-logger.debug(
-f"Schema {table_info.get('schema')} does not match prefix {prefix_schema_name}"
-)
-return
-if (
-prefix_database_name
-and database_name
-and prefix_database_name.lower() != database_name.lower()
-):
-logger.debug(
-f"Database {table_info.get('database')} does not match prefix {prefix_database_name}"
-)
-return
-fqn_search_string = build_es_fqn_search_string(
-service_name=prefix_service_name or "*",
-table_name=(prefix_table_name or table_name),
-schema_name=(prefix_schema_name or schema_name),
-database_name=(prefix_database_name or database_name),
+table_info_list = self._parse_table_info_from_source_exp(
+table, datamodel_entity
)
-table_entity = self.metadata.search_in_any_service(
-entity_type=Table,
-fqn_search_string=fqn_search_string,
-)
-if table_entity and datamodel_entity:
-columns_list = [column.name for column in table.columns]
-column_lineage = self._get_column_lineage(
-table_entity, datamodel_entity, columns_list
-)
-yield self._get_add_lineage_request(
-to_entity=datamodel_entity,
-from_entity=table_entity,
-column_lineage=column_lineage,
-)
+if isinstance(table_info_list, List):
+for table_info in table_info_list:
+table_name = table_info.get("table") or table.name
+schema_name = table_info.get("schema")
+database_name = table_info.get("database")
+if (
+prefix_table_name
+and table_name
+and prefix_table_name.lower() != table_name.lower()
+):
+logger.debug(
+f"Table {table_name} does not match prefix {prefix_table_name}"
+)
+return
+if (
+prefix_schema_name
+and schema_name
+and prefix_schema_name.lower() != schema_name.lower()
+):
+logger.debug(
+f"Schema {table_info.get('schema')} does not match prefix {prefix_schema_name}"
+)
+return
+if (
+prefix_database_name
+and database_name
+and prefix_database_name.lower() != database_name.lower()
+):
+logger.debug(
+f"Database {table_info.get('database')} does not match prefix {prefix_database_name}"
+)
+return
+fqn_search_string = build_es_fqn_search_string(
+service_name=prefix_service_name or "*",
+table_name=(prefix_table_name or table_name),
+schema_name=(prefix_schema_name or schema_name),
+database_name=(prefix_database_name or database_name),
+)
+table_entity = self.metadata.search_in_any_service(
+entity_type=Table,
+fqn_search_string=fqn_search_string,
+)
+if table_entity and datamodel_entity:
+columns_list = [column.name for column in table.columns]
+column_lineage = self._get_column_lineage(
+table_entity, datamodel_entity, columns_list
+)
+yield self._get_add_lineage_request(
+to_entity=datamodel_entity,
+from_entity=table_entity,
+column_lineage=column_lineage,
+)
except Exception as exc: # pylint: disable=broad-except
yield Either(
left=StackTraceError(
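One behavioral note on the loop above: a prefix mismatch triggers return, which ends lineage processing for the whole source expression rather than skipping just the offending table. A condensed, hypothetical restatement of the per-table filter it applies (function name and data are invented):

# Hypothetical condensation of the per-table prefix filter used in the loop.
def matches_prefix(info: dict, prefix_db: str, prefix_schema: str, prefix_table: str) -> bool:
    """Case-insensitive match; empty prefix parts act as wildcards."""
    for expected, actual in (
        (prefix_table, info.get("table")),
        (prefix_schema, info.get("schema")),
        (prefix_db, info.get("database")),
    ):
        if expected and actual and expected.lower() != actual.lower():
            return False
    return True

print(matches_prefix({"database": "DEV", "schema": "STG", "table": "ORDERS"}, "dev", "", "orders"))  # True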


@@ -51,11 +51,13 @@ in
customers_clean1
"""
-EXPECTED_REDSHIFT_RESULT = {
-"database": "dev",
-"schema": "demo_dbt_jaffle",
-"table": "customers_clean",
-}
+EXPECTED_REDSHIFT_RESULT = [
+{
+"database": "dev",
+"schema": "demo_dbt_jaffle",
+"table": "customers_clean",
+}
+]
MOCK_SNOWFLAKE_EXP = """let
@@ -72,11 +74,13 @@ MOCK_SNOWFLAKE_EXP_INVALID = """let
in
STG_CUSTOMERS_View"""
-EXPECTED_SNOWFLAKE_RESULT = {
-"database": "DEMO_STAGE",
-"schema": "PUBLIC",
-"table": "STG_CUSTOMERS",
-}
+EXPECTED_SNOWFLAKE_RESULT = [
+{
+"database": "DEMO_STAGE",
+"schema": "PUBLIC",
+"table": "STG_CUSTOMERS",
+}
+]
mock_config = {
"source": {
@@ -160,11 +164,13 @@ MOCK_USER_2_ENITYTY_REF_LIST = EntityReferenceList(
)
MOCK_SNOWFLAKE_EXP_V2 = 'let\n Source = Snowflake.Databases(Snowflake_URL,Warehouse,[Role=Role]),\n Database = Source{[Name=DB,Kind="Database"]}[Data],\n DB_Schema = Database{[Name=Schema,Kind="Schema"]}[Data],\n Table = DB_Schema{[Name="CUSTOMER_TABLE",Kind="Table"]}[Data],\n #"Andere entfernte Spalten" = Table.SelectColumns(Table,{"ID_BERICHTSMONAT", "ID_AKQUISE_VERMITTLER", "ID_AKQUISE_OE", "ID_SPARTE", "ID_RISIKOTRAEGER", "ID_KUNDE", "STUECK", "BBE"})\nin\n #"Andere entfernte Spalten"'
-EXPECTED_SNOWFLAKE_RESULT_V2 = {
-"database": "MY_DB",
-"schema": "MY_SCHEMA",
-"table": "CUSTOMER_TABLE",
-}
+EXPECTED_SNOWFLAKE_RESULT_V2 = [
+{
+"database": "MY_DB",
+"schema": "MY_SCHEMA",
+"table": "CUSTOMER_TABLE",
+}
+]
MOCK_DATASET_FROM_WORKSPACE = Dataset(
id="testdataset",
name="Test Dataset",
@@ -263,6 +269,17 @@ class PowerBIUnitTest(TestCase):
)
self.assertEqual(result, EXPECTED_SNOWFLAKE_RESULT_V2)
+test_snowflake_query_expression = 'let\n Source = Value.NativeQuery(Snowflake.Databases("dummy_host",(Warehouse)){[Name=(Database)]}[Data], "select * from "& Database &".""STG"".""STATIC_AOPANDLE""", null, [EnableFolding=true]),\n #"Renamed Columns" = Table.RenameColumns(Source,{{"AOP_IMPRESSIONS", "AOP Impressions"}, {"AOP_ORDERS", "AOP Orders"}, {"AOP_SPEND", "AOP Spend"}, {"AOP_TOTAL_REV", "AOP Total Revenue"}, {"AOP_UNITS", "AOP Units"}, {"AOP_VISITS", "AOP Visits"}, {"LE_IMPRESSIONS", "LE Impressions"}, {"LE_ORDERS", "LE Orders"}, {"LE_SPEND", "LE Spend"}, {"LE_TOTAL_REV", "LE Total Revenue"}, {"LE_UNITS", "LE Units"}, {"LE_VISITS", "LE Visits"}, {"SITEID", "SiteID"}, {"COUNTRY", "Country"}, {"REGION", "Region"}, {"CHANNEL", "Channel"}, {"DATE", "Date"}, {"AOP_CONV", "AOP_Conv"}, {"LE_CONV", "LE_Conv"}}),\n #"Changed Type" = Table.TransformColumnTypes(#"Renamed Columns",{{"SiteID", type text}, {"AOP Impressions", type number}, {"AOP Visits", type number}, {"AOP Orders", type number}, {"AOP Units", type number}, {"AOP Total Revenue", type number}, {"AOP Spend", type number}, {"AOP_Conv", type number}, {"AOP_UPT", type number}, {"AOP_ASP", type number}, {"AOP_AOV", type number}, {"AOP_CTR", type number}, {"LE Impressions", type number}, {"LE Visits", type number}, {"LE Orders", type number}, {"LE Units", type number}, {"LE Total Revenue", type number}, {"LE Spend", type number}, {"LE_Conv", type number}, {"LE_UPT", type number}, {"LE_ASP", type number}, {"LE_AOV", type number}, {"LE_CTR", type number}}),\n #"Duplicated Column" = Table.DuplicateColumn(#"Changed Type", "Date", "Date - Copy"),\n #"Split Column by Delimiter" = Table.SplitColumn(#"Duplicated Column", "Date - Copy", Splitter.SplitTextByDelimiter("-", QuoteStyle.None), {"Date - Copy.1", "Date - Copy.2", "Date - Copy.3"}),\n #"Changed Type1" = Table.TransformColumnTypes(#"Split Column by Delimiter",{{"Date - Copy.1", type text}, {"Date - Copy.2", type text}, {"Date - Copy.3", type text}}),\n #"Inserted Merged Column" = Table.AddColumn(#"Changed Type1", "Merged", each Text.Combine({[#"Date - Copy.1"], [#"Date - Copy.2"], [#"Date - Copy.3"]}, ""), type text),\n #"Renamed Columns1" = Table.RenameColumns(#"Inserted Merged Column",{{"Merged", "DateKey"}}),\n #"Removed Columns" = Table.RemoveColumns(#"Renamed Columns1",{"Date - Copy.1", "Date - Copy.2", "Date - Copy.3"}),\n #"Added Custom" = Table.AddColumn(#"Removed Columns", "Brand", each "CROCS"),\n #"Changed Type2" = Table.TransformColumnTypes(#"Added Custom",{{"Brand", type text}})\nin\n #"Changed Type2"'
+result = self.powerbi._parse_snowflake_source(
+test_snowflake_query_expression, MOCK_DASHBOARD_DATA_MODEL
+)
+# Test should parse the Snowflake query and extract table info
+self.assertIsNotNone(result)
+self.assertEqual(len(result), 1)
+result_table = result[0]
+self.assertEqual(result_table.get("schema"), "STG")
+self.assertEqual(result_table.get("table"), "STATIC_AOPANDLE")
@pytest.mark.order(2)
@patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email")
def test_owner_ingestion(self, get_reference_by_email):
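To see why the assertions above expect schema STG and table STATIC_AOPANDLE but no database, it helps to trace the embedded query through the cleaning steps from the first file; the intermediate strings below are reconstructed by hand, not emitted by the test:

# The raw query captured from the expression above (doubled quotes are M escapes):
raw = 'select * from "& Database &".""STG"".""STATIC_AOPANDLE""'
# After '""' -> '"':            select * from "& Database &"."STG"."STATIC_AOPANDLE"
# After parameter substitution: select * from %."STG"."STATIC_AOPANDLE"
# Fed to LineageParser:         select * from PLACEHOLDER_DB."STG"."STATIC_AOPANDLE"
# The placeholder database is filtered back to None, leaving schema "STG"
# and table "STATIC_AOPANDLE", which is exactly what the assertions check.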
@@ -370,7 +387,7 @@ class PowerBIUnitTest(TestCase):
result = self.powerbi._parse_table_info_from_source_exp(
table, MOCK_DASHBOARD_DATA_MODEL
)
-self.assertEqual(result, {})
+self.assertEqual(result, None)
# no source
table = PowerBiTable(
@@ -380,7 +397,7 @@ class PowerBIUnitTest(TestCase):
result = self.powerbi._parse_table_info_from_source_exp(
table, MOCK_DASHBOARD_DATA_MODEL
)
-self.assertEqual(result, {})
+self.assertEqual(result, None)
@pytest.mark.order(4)
@patch.object(
@@ -394,6 +411,7 @@ class PowerBIUnitTest(TestCase):
result = self.powerbi._parse_snowflake_source(
MOCK_SNOWFLAKE_EXP_V2, MOCK_DASHBOARD_DATA_MODEL
)
+result = result[0]
self.assertIsNone(result["database"])
self.assertIsNone(result["schema"])
self.assertEqual(result["table"], "CUSTOMER_TABLE")
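Taken together, the parse helpers now share one contract, Optional[List[dict]], that callers can consume uniformly. A minimal, hypothetical usage sketch of that contract, seeded with the unresolved-parameter result asserted in the last test:

# Hypothetical consumer of the new Optional[List[dict]] contract.
table_info_list = [{"database": None, "schema": None, "table": "CUSTOMER_TABLE"}]

for info in table_info_list or []:  # treats None (no parse) and [] alike
    database = info.get("database") or "*"  # unresolved parts widen the search
    schema = info.get("schema") or "*"
    print(f"searching lineage for {database}.{schema}.{info['table']}")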