Fix: ignored query with om-version and dbt (#8851)

* Fix: ignored query with om-version and dbt

* Change based on comments
This commit is contained in:
Milan Bariya 2022-11-18 18:52:11 +05:30 committed by GitHub
parent 2c055641be
commit bc215515d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 19 deletions

View File

@ -25,6 +25,8 @@ from metadata.utils.helpers import datetime_to_ts
from metadata.utils.logger import ingestion_logger from metadata.utils.logger import ingestion_logger
logger = ingestion_logger() logger = ingestion_logger()
QUERY_WITH_OM_VERSION = '/* {"app": "OpenMetadata"'
QUERY_WITH_DBT = '/* {"app": "dbt"'
class DatabricksClient: class DatabricksClient:
@ -113,3 +115,10 @@ class DatabricksClient:
logger.error(exc) logger.error(exc)
return query_details return query_details
def is_query_valid(self, row) -> bool:
query_text = row.get("query_text")
return not (
query_text.startswith(QUERY_WITH_DBT)
or query_text.startswith(QUERY_WITH_OM_VERSION)
)

View File

@ -63,15 +63,16 @@ class DatabricksLineageSource(DatabricksQueryParserSource, LineageSource):
) )
for row in data: for row in data:
try: try:
yield TableQuery( if self.client.is_query_valid(row):
query=row.get("query_text"), yield TableQuery(
userName=row.get("user_name"), query=row.get("query_text"),
startTime=row.get("query_start_time_ms"), userName=row.get("user_name"),
endTime=row.get("execution_end_time_ms"), startTime=row.get("query_start_time_ms"),
analysisDate=datetime.now(), endTime=row.get("execution_end_time_ms"),
databaseName="default", # In databricks databaseName is always default analysisDate=datetime.now(),
serviceName=self.config.serviceName, databaseName="default", # In databricks databaseName is always default
) serviceName=self.config.serviceName,
)
except Exception as exc: except Exception as exc:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.warning(f"Error processing query_dict {row}: {exc}") logger.warning(f"Error processing query_dict {row}: {exc}")

View File

@ -93,17 +93,18 @@ class DatabricksUsageSource(DatabricksQueryParserSource, UsageSource):
) )
for row in data: for row in data:
try: try:
queries.append( if self.client.is_query_valid(row):
TableQuery( queries.append(
query=row.get("query_text"), TableQuery(
userName=row.get("user_name"), query=row.get("query_text"),
startTime=row.get("query_start_time_ms"), userName=row.get("user_name"),
endTime=row.get("execution_end_time_ms"), startTime=row.get("query_start_time_ms"),
analysisDate=datetime.now(), endTime=row.get("execution_end_time_ms"),
serviceName=self.config.serviceName, analysisDate=datetime.now(),
databaseName="default", # In databricks databaseName is always default serviceName=self.config.serviceName,
databaseName="default", # In databricks databaseName is always default
)
) )
)
except Exception as err: except Exception as err:
logger.debug(traceback.format_exc()) logger.debug(traceback.format_exc())
logger.error(str(err)) logger.error(str(err))