Fix: PBI databricks lineage parsing fix (#24085)

Authored by harshsoni2024 on 2025-11-04 15:54:58 +05:30, committed by GitHub
parent 886a5af844
commit 0719246558
3 changed files with 35 additions and 7 deletions

View File

@@ -27,11 +27,16 @@ def parse_databricks_native_query_source(
     catalog_info = details.get("catalog_info", "")
     catalog_parameters = details.get("catalog_parameters", "")
     catalog_info_match = re.match(
-        r".*Catalog\s*=\s*(?P<catalog>.*?)\s*,", catalog_info
-    ).groupdict()
-    catalog = catalog_info_match.get("catalog", None)
+        r".*Catalog\s*=\s*(?P<catalog>[^,]+?)\s*,", catalog_info
+    )
+    if not catalog_info_match:
+        logger.error(f"Could not find catalog in info: {catalog_info}")
+        catalog = None
+    else:
+        catalog_groups = catalog_info_match.groupdict()
+        catalog = catalog_groups.get("catalog", None)
     database_match = re.search(
-        r'Name\s*=\s*(?P<database>.*?)\s*,\s*Kind\s*=\s*"Database"',
+        r'Name\s*=\s*(?P<database>[^,]+?)\s*,\s*Kind\s*=\s*"Database"',
         catalog_parameters,
     )
     database = None
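For context on this hunk: the old code chained .groupdict() directly onto re.match(...), so an expression without a Catalog= pair raised AttributeError ('NoneType' object has no attribute 'groupdict') instead of being logged. A minimal sketch of the new behaviour, using an assumed sample string shaped like the test mocks further down (not code from the repository):

    import re

    catalog_info = '[Catalog="DEMO_CATALOG", Database=null, EnableAutomaticProxyDiscovery=null]'

    # New pattern: the capture stops at the first comma, and the match object is
    # checked before .groupdict() is called.
    catalog_info_match = re.match(r".*Catalog\s*=\s*(?P<catalog>[^,]+?)\s*,", catalog_info)
    catalog = catalog_info_match.groupdict().get("catalog") if catalog_info_match else None
    print(catalog)  # prints "DEMO_CATALOG" (the surrounding quotes are part of the capture)

    # An expression with no Catalog= pair now yields None instead of crashing.
    assert re.match(r".*Catalog\s*=\s*(?P<catalog>[^,]+?)\s*,", "[Database=null]") is None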
@@ -44,7 +49,7 @@ def parse_databricks_native_query_source(
         logger.error(f"Could not find database in {source_expression}")
         return None
-    database = database.strip('"')
+    database = database.strip('"').strip()
     parser_query = details.get("query")
     # Clean the query for parser
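The extra .strip() is defensive cleanup so stray whitespace around the captured name cannot leak into the lineage lookup. A small sketch of the capture and cleanup; the selector string below is an assumption modelled on the mock expressions in the unit tests:

    import re

    catalog_parameters = '{[Name="DEMO_STAGE",Kind="Database"]}'
    database_match = re.search(
        r'Name\s*=\s*(?P<database>[^,]+?)\s*,\s*Kind\s*=\s*"Database"',
        catalog_parameters,
    )
    database = database_match.group("database") if database_match else None
    print(repr(database))                     # '"DEMO_STAGE"' -- quotes still attached
    print(repr(database.strip('"').strip()))  # 'DEMO_STAGE'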
@@ -78,7 +83,8 @@ def parse_databricks_native_query_source(
         parser = LineageParser(
             parser_query, dialect=Dialect.DATABRICKS, timeout_seconds=30
         )
+        if parser.query_parsing_success is False:
+            raise Exception(parser.query_parsing_failure_reason)
     except Exception as parser_exc:
         logger.error(
             f"LineageParser failed parsing query with error {parser_query[:200]} ",
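The new check escalates a soft failure: when the parser reports query_parsing_success as False, raising inside the existing try sends control to the same except branch that already handles hard crashes, so the bad query is logged and no lineage is emitted. A rough sketch of that control flow with a stand-in parser class; only query_parsing_success and query_parsing_failure_reason are taken from the diff, everything else here is illustrative:

    import logging

    logger = logging.getLogger(__name__)

    class StubParser:  # hypothetical stand-in for LineageParser in this sketch
        query_parsing_success = False
        query_parsing_failure_reason = "syntax error in query"

    def parse_lineage(parser_query: str):
        try:
            parser = StubParser()
            if parser.query_parsing_success is False:
                raise Exception(parser.query_parsing_failure_reason)
        except Exception as parser_exc:
            logger.error(
                f"LineageParser failed parsing query {parser_query[:200]}: {parser_exc}"
            )
            return None
        return parser  # parsing succeeded; lineage extraction would continue here

    print(parse_lineage("WITH test as (select) Select test"))  # -> None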

View File

@@ -1056,7 +1056,9 @@ class PowerbiSource(DashboardServiceSource):
                 return table_info_list
             # parse databricks source
-            table_info_list = self._parse_databricks_source(source_expression)
+            table_info_list = self._parse_databricks_source(
+                source_expression, datamodel_entity
+            )
             if isinstance(table_info_list, List):
                 return table_info_list
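The helper now receives the data model entity alongside the source expression, matching the two-argument calls added to the unit tests below. The diff does not show the new method body, so the signature in this sketch is only inferred from the call sites:

    from typing import List, Optional

    class PowerbiSourceSketch:  # illustrative shell, not the real PowerbiSource
        def _parse_databricks_source(
            self, source_expression: str, datamodel_entity
        ) -> Optional[List[dict]]:
            # The real implementation parses the M expression; the caller only relies
            # on the contract: a list of table info on success, None on failure.
            return None

        def lookup(self, source_expression: str, datamodel_entity):
            table_info_list = self._parse_databricks_source(
                source_expression, datamodel_entity
            )
            if isinstance(table_info_list, List):
                return table_info_list
            return None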

View File

@@ -104,6 +104,16 @@ EXPECTED_DATABRICKS_RESULT = [
     {"database": "DEMO_STAGE", "schema": "PUBLIC", "table": "STG_CUSTOMERS"}
 ]
+MOCK_DATABRICKS_NATIVE_INVALID_QUERY_EXP = """let
+Source = Value.NativeQuery(Databricks.Catalogs(Databricks_Server, Databricks_HTTP_Path, [Catalog="DEMO_CATALOG", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="DEMO_STAGE",Kind="Database"]}[Data], "WITH test as (select) Select test", null, [EnableFolding=true])
+in
+Source"""
+MOCK_DATABRICKS_NATIVE_INVALID_EXP = """let
+Source = Value.NativeQuery(Databricks.Catalogs(Databricks_Server, Databricks_HTTP_Path, [Catalog="DEMO_CATALOG", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="DEMO_STAGE",Kind="Database"]}[Data], null, [EnableFolding=true])
+in
+Source"""
 mock_config = {
     "source": {
         "type": "powerbi",
@@ -337,6 +347,16 @@ class PowerBIUnitTest(TestCase):
         )
         self.assertEqual(result, EXPECTED_DATABRICKS_RESULT)
+        result = self.powerbi._parse_databricks_source(
+            MOCK_DATABRICKS_NATIVE_INVALID_QUERY_EXP, MOCK_DASHBOARD_DATA_MODEL
+        )
+        self.assertIsNone(result)
+        result = self.powerbi._parse_databricks_source(
+            MOCK_DATABRICKS_NATIVE_INVALID_EXP, MOCK_DASHBOARD_DATA_MODEL
+        )
+        self.assertIsNone(result)
     @pytest.mark.order(2)
     @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email")
     def test_owner_ingestion(self, get_reference_by_email):