MINOR: Use slow query log for mysql lineage (#21291)

(cherry picked from commit 509bc0d82613b54ea240e8ae27385c5c6251743b)
This commit is contained in:
Mayur Singal 2025-05-20 11:10:06 +05:30 committed by OpenMetadata Release Bot
parent 57735390bd
commit 43b1aa2f24
6 changed files with 85 additions and 7 deletions

View File

@ -38,7 +38,10 @@ from metadata.ingestion.connections.test_connections import (
test_connection_db_schema_sources,
)
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.mysql.queries import MYSQL_TEST_GET_QUERIES
from metadata.ingestion.source.database.mysql.queries import (
MYSQL_TEST_GET_QUERIES,
MYSQL_TEST_GET_QUERIES_SLOW_LOGS,
)
from metadata.utils.constants import THREE_MIN
@ -50,7 +53,10 @@ def get_connection(connection: MysqlConnection) -> Engine:
azure_client = AzureClient(connection.authType.azureConfig).create_client()
if not connection.authType.azureConfig.scopes:
raise ValueError(
"Azure Scopes are missing, please refer https://learn.microsoft.com/en-gb/azure/mysql/flexible-server/how-to-azure-ad#2---retrieve-microsoft-entra-access-token and fetch the resource associated with it, for e.g. https://ossrdbms-aad.database.windows.net/.default"
"Azure Scopes are missing, please refer https://learn.microsoft.com/"
"en-gb/azure/mysql/flexible-server/how-to-azure-ad#2---retrieve-micr"
"osoft-entra-access-token and fetch the resource associated with it,"
" for e.g. https://ossrdbms-aad.database.windows.net/.default"
)
access_token_obj = azure_client.get_token(
*connection.authType.azureConfig.scopes.split(",")
@ -75,7 +81,9 @@ def test_connection(
of a metadata workflow or during an Automation Workflow
"""
queries = {
"GetQueries": MYSQL_TEST_GET_QUERIES,
"GetQueries": MYSQL_TEST_GET_QUERIES
if not service_connection.useSlowLogs
else MYSQL_TEST_GET_QUERIES_SLOW_LOGS,
}
return test_connection_db_schema_sources(
metadata=metadata,

View File

@ -20,9 +20,9 @@ class MysqlLineageSource(MysqlQueryParserSource, LineageSource):
sql_stmt = MYSQL_SQL_STATEMENT
filters = """
AND (
lower(argument) LIKE '%%create%%table%%select%%'
OR lower(argument) LIKE '%%insert%%into%%select%%'
OR lower(argument) LIKE '%%update%%'
OR lower(argument) LIKE '%%merge%%'
lower({sql_column}) LIKE '%%create%%table%%select%%'
OR lower({sql_column}) LIKE '%%insert%%into%%select%%'
OR lower({sql_column}) LIKE '%%update%%'
OR lower({sql_column}) LIKE '%%merge%%'
)
"""

View File

@ -36,8 +36,37 @@ LIMIT {result_limit};
"""
)
MYSQL_SQL_STATEMENT_SLOW_LOGS = textwrap.dedent(
"""
SELECT
NULL `database_name`,
sql_text `query_text`,
start_time `start_time`,
NULL `end_time`,
NULL `duration`,
NULL `schema_name`,
NULL `query_type`,
NULL `user_name`,
NULL `aborted`
FROM mysql.slow_log
WHERE start_time between '{start_time}' and '{end_time}'
AND sql_text NOT LIKE '/* {{"app": "OpenMetadata", %%}} */%%'
AND sql_text NOT LIKE '/* {{"app": "dbt", %%}} */%%'
{filters}
ORDER BY start_time desc
LIMIT {result_limit};
"""
)
MYSQL_TEST_GET_QUERIES = textwrap.dedent(
"""
SELECT `argument` from mysql.general_log limit 1;
"""
)
MYSQL_TEST_GET_QUERIES_SLOW_LOGS = textwrap.dedent(
"""
SELECT `sql_text` from mysql.slow_log limit 1;
"""
)

View File

@ -12,6 +12,7 @@
Mysql query parser module
"""
from abc import ABC
from datetime import datetime
from typing import Optional
from metadata.generated.schema.entity.services.connections.database.mysqlConnection import (
@ -22,6 +23,10 @@ from metadata.generated.schema.metadataIngestion.workflow import (
)
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.database.mysql.queries import (
MYSQL_SQL_STATEMENT,
MYSQL_SQL_STATEMENT_SLOW_LOGS,
)
from metadata.ingestion.source.database.query_parser_source import QueryParserSource
@ -44,3 +49,29 @@ class MysqlQueryParserSource(QueryParserSource, ABC):
f"Expected MysqlConnection, but got {connection}"
)
return cls(config, metadata)
def get_sql_statement(self, start_time: datetime, end_time: datetime) -> str:
"""
returns sql statement to fetch query logs.
Override if we have specific parameters
"""
if self.service_connection.useSlowLogs:
self.sql_stmt = MYSQL_SQL_STATEMENT_SLOW_LOGS
else:
self.sql_stmt = MYSQL_SQL_STATEMENT
return self.sql_stmt.format(
start_time=start_time,
end_time=end_time,
filters=self.get_filters(),
result_limit=self.source_config.resultLimit,
)
def get_filters(self) -> str:
if self.service_connection.useSlowLogs:
sql_column = "sql_text"
else:
sql_column = "argument"
if self.source_config.filterCondition:
return f"{self.filters.format(sql_column=sql_column)} AND {self.source_config.filterCondition}"
return self.filters.format(sql_column=sql_column)

View File

@ -132,6 +132,12 @@
},
"supportsLineageExtraction": {
"$ref": "../connectionBasicType.json#/definitions/supportsLineageExtraction"
},
"useSlowLogs": {
"title": "Use Slow Logs for Lineage",
"description": "Use slow logs to extract lineage.",
"type": "boolean",
"default": false
}
},
"additionalProperties": false,

View File

@ -72,6 +72,10 @@ export interface MysqlConnection {
* in Mysql.
*/
username: string;
/**
* Use slow logs to extract lineage.
*/
useSlowLogs?: boolean;
}
/**