diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/databricks_parser.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/databricks_parser.py index 73e912537a0..ac1e74b6d14 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/databricks_parser.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/databricks_parser.py @@ -27,11 +27,16 @@ def parse_databricks_native_query_source( catalog_info = details.get("catalog_info", "") catalog_parameters = details.get("catalog_parameters", "") catalog_info_match = re.match( - r".*Catalog\s*=\s*(?P.*?)\s*,", catalog_info - ).groupdict() - catalog = catalog_info_match.get("catalog", None) + r".*Catalog\s*=\s*(?P[^,]+?)\s*,", catalog_info + ) + if not catalog_info_match: + logger.error(f"Could not find catalog in info: {catalog_info}") + catalog = None + else: + catalog_groups = catalog_info_match.groupdict() + catalog = catalog_groups.get("catalog", None) database_match = re.search( - r'Name\s*=\s*(?P.*?)\s*,\s*Kind\s*=\s*"Database"', + r'Name\s*=\s*(?P[^,]+?)\s*,\s*Kind\s*=\s*"Database"', catalog_parameters, ) database = None @@ -44,7 +49,7 @@ def parse_databricks_native_query_source( logger.error(f"Could not find database in {source_expression}") return None - database = database.strip('"') + database = database.strip('"').strip() parser_query = details.get("query") # Clean the query for parser @@ -78,7 +83,8 @@ def parse_databricks_native_query_source( parser = LineageParser( parser_query, dialect=Dialect.DATABRICKS, timeout_seconds=30 ) - + if parser.query_parsing_success is False: + raise Exception(parser.query_parsing_failure_reason) except Exception as parser_exc: logger.error( f"LineageParser failed parsing query with error {parser_query[:200]} ", diff --git a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py index 45c46317c01..3242c5745e7 100644 --- a/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py +++ b/ingestion/src/metadata/ingestion/source/dashboard/powerbi/metadata.py @@ -1056,7 +1056,9 @@ class PowerbiSource(DashboardServiceSource): return table_info_list # parse databricks source - table_info_list = self._parse_databricks_source(source_expression) + table_info_list = self._parse_databricks_source( + source_expression, datamodel_entity + ) if isinstance(table_info_list, List): return table_info_list diff --git a/ingestion/tests/unit/topology/dashboard/test_powerbi.py b/ingestion/tests/unit/topology/dashboard/test_powerbi.py index b66a0c0fb1f..a29c98451b3 100644 --- a/ingestion/tests/unit/topology/dashboard/test_powerbi.py +++ b/ingestion/tests/unit/topology/dashboard/test_powerbi.py @@ -104,6 +104,16 @@ EXPECTED_DATABRICKS_RESULT = [ {"database": "DEMO_STAGE", "schema": "PUBLIC", "table": "STG_CUSTOMERS"} ] +MOCK_DATABRICKS_NATIVE_INVALID_QUERY_EXP = """let + Source = Value.NativeQuery(Databricks.Catalogs(Databricks_Server, Databricks_HTTP_Path, [Catalog="DEMO_CATALOG", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="DEMO_STAGE",Kind="Database"]}[Data], "WITH test as (select) Select test", null, [EnableFolding=true]) +in + Source""" + +MOCK_DATABRICKS_NATIVE_INVALID_EXP = """let + Source = Value.NativeQuery(Databricks.Catalogs(Databricks_Server, Databricks_HTTP_Path, [Catalog="DEMO_CATALOG", Database=null, EnableAutomaticProxyDiscovery=null]){[Name="DEMO_STAGE",Kind="Database"]}[Data], null, [EnableFolding=true]) +in + Source""" + mock_config = { "source": { "type": "powerbi", @@ -337,6 +347,16 @@ class PowerBIUnitTest(TestCase): ) self.assertEqual(result, EXPECTED_DATABRICKS_RESULT) + result = self.powerbi._parse_databricks_source( + MOCK_DATABRICKS_NATIVE_INVALID_QUERY_EXP, MOCK_DASHBOARD_DATA_MODEL + ) + self.assertIsNone(result) + + result = self.powerbi._parse_databricks_source( + MOCK_DATABRICKS_NATIVE_INVALID_EXP, MOCK_DASHBOARD_DATA_MODEL + ) + self.assertIsNone(result) + @pytest.mark.order(2) @patch("metadata.ingestion.ometa.ometa_api.OpenMetadata.get_reference_by_email") def test_owner_ingestion(self, get_reference_by_email):