From 43b1aa2f2463a2ba0398ee8a00c863a5a906010d Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Tue, 20 May 2025 11:10:06 +0530 Subject: [PATCH] MINOR: Use slow query log for mysql lineage (#21291) (cherry picked from commit 509bc0d82613b54ea240e8ae27385c5c6251743b) --- .../source/database/mysql/connection.py | 14 +++++++-- .../source/database/mysql/lineage.py | 8 ++--- .../source/database/mysql/queries.py | 29 +++++++++++++++++ .../source/database/mysql/query_parser.py | 31 +++++++++++++++++++ .../connections/database/mysqlConnection.json | 6 ++++ .../connections/database/mysqlConnection.ts | 4 +++ 6 files changed, 85 insertions(+), 7 deletions(-) diff --git a/ingestion/src/metadata/ingestion/source/database/mysql/connection.py b/ingestion/src/metadata/ingestion/source/database/mysql/connection.py index 5675c41f96c..42bb4688079 100644 --- a/ingestion/src/metadata/ingestion/source/database/mysql/connection.py +++ b/ingestion/src/metadata/ingestion/source/database/mysql/connection.py @@ -38,7 +38,10 @@ from metadata.ingestion.connections.test_connections import ( test_connection_db_schema_sources, ) from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.database.mysql.queries import MYSQL_TEST_GET_QUERIES +from metadata.ingestion.source.database.mysql.queries import ( + MYSQL_TEST_GET_QUERIES, + MYSQL_TEST_GET_QUERIES_SLOW_LOGS, +) from metadata.utils.constants import THREE_MIN @@ -50,7 +53,10 @@ def get_connection(connection: MysqlConnection) -> Engine: azure_client = AzureClient(connection.authType.azureConfig).create_client() if not connection.authType.azureConfig.scopes: raise ValueError( - "Azure Scopes are missing, please refer https://learn.microsoft.com/en-gb/azure/mysql/flexible-server/how-to-azure-ad#2---retrieve-microsoft-entra-access-token and fetch the resource associated with it, for e.g. https://ossrdbms-aad.database.windows.net/.default" + "Azure Scopes are missing, please refer https://learn.microsoft.com/" + "en-gb/azure/mysql/flexible-server/how-to-azure-ad#2---retrieve-micr" + "osoft-entra-access-token and fetch the resource associated with it," + " for e.g. https://ossrdbms-aad.database.windows.net/.default" ) access_token_obj = azure_client.get_token( *connection.authType.azureConfig.scopes.split(",") @@ -75,7 +81,9 @@ def test_connection( of a metadata workflow or during an Automation Workflow """ queries = { - "GetQueries": MYSQL_TEST_GET_QUERIES, + "GetQueries": MYSQL_TEST_GET_QUERIES + if not service_connection.useSlowLogs + else MYSQL_TEST_GET_QUERIES_SLOW_LOGS, } return test_connection_db_schema_sources( metadata=metadata, diff --git a/ingestion/src/metadata/ingestion/source/database/mysql/lineage.py b/ingestion/src/metadata/ingestion/source/database/mysql/lineage.py index d9b089b88ea..06eb6ba18df 100644 --- a/ingestion/src/metadata/ingestion/source/database/mysql/lineage.py +++ b/ingestion/src/metadata/ingestion/source/database/mysql/lineage.py @@ -20,9 +20,9 @@ class MysqlLineageSource(MysqlQueryParserSource, LineageSource): sql_stmt = MYSQL_SQL_STATEMENT filters = """ AND ( - lower(argument) LIKE '%%create%%table%%select%%' - OR lower(argument) LIKE '%%insert%%into%%select%%' - OR lower(argument) LIKE '%%update%%' - OR lower(argument) LIKE '%%merge%%' + lower({sql_column}) LIKE '%%create%%table%%select%%' + OR lower({sql_column}) LIKE '%%insert%%into%%select%%' + OR lower({sql_column}) LIKE '%%update%%' + OR lower({sql_column}) LIKE '%%merge%%' ) """ diff --git a/ingestion/src/metadata/ingestion/source/database/mysql/queries.py b/ingestion/src/metadata/ingestion/source/database/mysql/queries.py index 10b4024be70..703fabc5ff9 100644 --- a/ingestion/src/metadata/ingestion/source/database/mysql/queries.py +++ b/ingestion/src/metadata/ingestion/source/database/mysql/queries.py @@ -36,8 +36,37 @@ LIMIT {result_limit}; """ ) + +MYSQL_SQL_STATEMENT_SLOW_LOGS = textwrap.dedent( + """ +SELECT + NULL `database_name`, + sql_text `query_text`, + start_time `start_time`, + NULL `end_time`, + NULL `duration`, + NULL `schema_name`, + NULL `query_type`, + NULL `user_name`, + NULL `aborted` +FROM mysql.slow_log +WHERE start_time between '{start_time}' and '{end_time}' + AND sql_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%' + AND sql_text NOT LIKE '/* {{"app": "dbt", %%}} */%%' + {filters} +ORDER BY start_time desc +LIMIT {result_limit}; +""" +) + MYSQL_TEST_GET_QUERIES = textwrap.dedent( """ SELECT `argument` from mysql.general_log limit 1; """ ) + +MYSQL_TEST_GET_QUERIES_SLOW_LOGS = textwrap.dedent( + """ +SELECT `sql_text` from mysql.slow_log limit 1; +""" +) diff --git a/ingestion/src/metadata/ingestion/source/database/mysql/query_parser.py b/ingestion/src/metadata/ingestion/source/database/mysql/query_parser.py index c8cd77b6720..6cb2ff6ffe0 100644 --- a/ingestion/src/metadata/ingestion/source/database/mysql/query_parser.py +++ b/ingestion/src/metadata/ingestion/source/database/mysql/query_parser.py @@ -12,6 +12,7 @@ Mysql query parser module """ from abc import ABC +from datetime import datetime from typing import Optional from metadata.generated.schema.entity.services.connections.database.mysqlConnection import ( @@ -22,6 +23,10 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.steps import InvalidSourceException from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.ingestion.source.database.mysql.queries import ( + MYSQL_SQL_STATEMENT, + MYSQL_SQL_STATEMENT_SLOW_LOGS, +) from metadata.ingestion.source.database.query_parser_source import QueryParserSource @@ -44,3 +49,29 @@ class MysqlQueryParserSource(QueryParserSource, ABC): f"Expected MysqlConnection, but got {connection}" ) return cls(config, metadata) + + def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str: + """ + returns sql statement to fetch query logs. + + Override if we have specific parameters + """ + if self.service_connection.useSlowLogs: + self.sql_stmt = MYSQL_SQL_STATEMENT_SLOW_LOGS + else: + self.sql_stmt = MYSQL_SQL_STATEMENT + return self.sql_stmt.format( + start_time=start_time, + end_time=end_time, + filters=self.get_filters(), + result_limit=self.source_config.resultLimit, + ) + + def get_filters(self) -> str: + if self.service_connection.useSlowLogs: + sql_column = "sql_text" + else: + sql_column = "argument" + if self.source_config.filterCondition: + return f"{self.filters.format(sql_column=sql_column)} AND {self.source_config.filterCondition}" + return self.filters.format(sql_column=sql_column) diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json index 29fe2d89d82..52095141a5e 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mysqlConnection.json @@ -132,6 +132,12 @@ }, "supportsLineageExtraction": { "$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction" + }, + "useSlowLogs": { + "title": "Use Slow Logs for Lineage", + "description": "Use slow logs to extract lineage.", + "type": "boolean", + "default": false } }, "additionalProperties": false, diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/mysqlConnection.ts b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/mysqlConnection.ts index 3e8f223d8ed..bdd884db2f5 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/mysqlConnection.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/entity/services/connections/database/mysqlConnection.ts @@ -72,6 +72,10 @@ export interface MysqlConnection { * in Mysql. */ username: string; + /** + * Use slow logs to extract lineage. + */ + useSlowLogs?: boolean; } /**