diff --git a/ingestion/src/metadata/clients/databricks_client.py b/ingestion/src/metadata/clients/databricks_client.py
index 9db7ad78b1a..11116a31fee 100644
--- a/ingestion/src/metadata/clients/databricks_client.py
+++ b/ingestion/src/metadata/clients/databricks_client.py
@@ -25,6 +25,8 @@ from metadata.utils.helpers import datetime_to_ts
 from metadata.utils.logger import ingestion_logger
 
 logger = ingestion_logger()
+QUERY_WITH_OM_VERSION = '/* {"app": "OpenMetadata"'
+QUERY_WITH_DBT = '/* {"app": "dbt"'
 
 
 class DatabricksClient:
@@ -113,3 +115,22 @@ class DatabricksClient:
             logger.error(exc)
 
         return query_details
+
+    def is_query_valid(self, row) -> bool:
+        """
+        Decide whether a query history row should be processed.
+
+        Queries tagged with the OpenMetadata or dbt app comment prefix are
+        filtered out, as are rows that carry no `query_text` at all.
+
+        Args:
+            row: mapping for one query history entry; only `query_text` is read
+        Returns:
+            True when the query text is present and does not start with one
+            of the excluded prefixes, False otherwise
+        """
+        query_text = row.get("query_text")
+        if query_text is None:
+            # Without this guard `.startswith` raises AttributeError on None
+            return False
+        return not query_text.startswith((QUERY_WITH_DBT, QUERY_WITH_OM_VERSION))
diff --git a/ingestion/src/metadata/ingestion/source/database/databricks_lineage.py b/ingestion/src/metadata/ingestion/source/database/databricks_lineage.py
index 47fd346fb99..78413aa24ed 100644
--- a/ingestion/src/metadata/ingestion/source/database/databricks_lineage.py
+++ b/ingestion/src/metadata/ingestion/source/database/databricks_lineage.py
@@ -63,15 +63,16 @@ class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource):
         )
         for row in data:
             try:
-                yield TableQuery(
-                    query=row.get("query_text"),
-                    userName=row.get("user_name"),
-                    startTime=row.get("query_start_time_ms"),
-                    endTime=row.get("execution_end_time_ms"),
-                    analysisDate=datetime.now(),
-                    databaseName="default",  # In databricks databaseName is always default
-                    serviceName=self.config.serviceName,
-                )
+                if self.client.is_query_valid(row):
+                    yield TableQuery(
+                        query=row.get("query_text"),
+                        userName=row.get("user_name"),
+                        startTime=row.get("query_start_time_ms"),
+                        endTime=row.get("execution_end_time_ms"),
+                        analysisDate=datetime.now(),
+                        databaseName="default",  # In databricks databaseName is always default
+                        serviceName=self.config.serviceName,
+                    )
             except Exception as exc:
                 logger.debug(traceback.format_exc())
                 logger.warning(f"Error processing query_dict {row}: {exc}")
diff --git a/ingestion/src/metadata/ingestion/source/database/databricks_usage.py b/ingestion/src/metadata/ingestion/source/database/databricks_usage.py
index caf7fedd871..14b219ec478 100644
--- a/ingestion/src/metadata/ingestion/source/database/databricks_usage.py
+++ b/ingestion/src/metadata/ingestion/source/database/databricks_usage.py
@@ -93,17 +93,18 @@ class DatabricksUsageSource(DatabricksQueryParserSource, UsageSource):
         )
         for row in data:
             try:
-                queries.append(
-                    TableQuery(
-                        query=row.get("query_text"),
-                        userName=row.get("user_name"),
-                        startTime=row.get("query_start_time_ms"),
-                        endTime=row.get("execution_end_time_ms"),
-                        analysisDate=datetime.now(),
-                        serviceName=self.config.serviceName,
-                        databaseName="default",  # In databricks databaseName is always default
+                if self.client.is_query_valid(row):
+                    queries.append(
+                        TableQuery(
+                            query=row.get("query_text"),
+                            userName=row.get("user_name"),
+                            startTime=row.get("query_start_time_ms"),
+                            endTime=row.get("execution_end_time_ms"),
+                            analysisDate=datetime.now(),
+                            serviceName=self.config.serviceName,
+                            databaseName="default",  # In databricks databaseName is always default
+                        )
                     )
-                )
             except Exception as err:
                 logger.debug(traceback.format_exc())
                 logger.error(str(err))