From 7ab6755beb75b7cefd5e1600843ec0aee69571a4 Mon Sep 17 00:00:00 2001 From: Teddy Date: Thu, 22 May 2025 17:22:05 +0200 Subject: [PATCH] ISSUE #21101 - Implement BQ Partitioned Tests (#21348) * feat: add query logger as an event listener in debug mode * fix: added ingestion.src plugin to pylint * minor: add partition sampled table * test: added test for partitioned BQ table * Remove log_query function from logger.py * style: ran python linting --- .pylintrc | 2 +- .../examples/sample_data/datasets/tables.json | 22 ++- ingestion/plugins/import_checker.py | 65 ++++++++ ingestion/pyproject.toml | 2 +- ingestion/setup.py | 2 +- .../ingestion/connections/builders.py | 3 + .../ingestion/connections/query_logger.py | 115 +++++++++++++ .../ingestion/source/database/sample_data.py | 1 + ingestion/tests/cli_e2e/test_cli_bigquery.py | 30 +++- ingestion/tests/unit/test_import_checker.py | 154 ++++++++++++++++++ 10 files changed, 387 insertions(+), 9 deletions(-) create mode 100644 ingestion/plugins/import_checker.py create mode 100644 ingestion/src/metadata/ingestion/connections/query_logger.py create mode 100644 ingestion/tests/unit/test_import_checker.py diff --git a/.pylintrc b/.pylintrc index f4f93184514..316ae8b9391 100644 --- a/.pylintrc +++ b/.pylintrc @@ -21,7 +21,7 @@ module-rgx=(([a-z_][a-z0-9_]*)|([a-zA-Z0-9]+))$ fail-under=6.0 init-hook='from pylint.config import find_default_config_files; import os, sys; sys.path.append(os.path.dirname(next(find_default_config_files())))' extension-pkg-allow-list=pydantic -load-plugins=ingestion.plugins.print_checker +load-plugins=ingestion.plugins.print_checker,ingestion.plugins.import_checker max-public-methods=25 [MESSAGES CONTROL] diff --git a/ingestion/examples/sample_data/datasets/tables.json b/ingestion/examples/sample_data/datasets/tables.json index 9ce4c9c5275..ab444d563b7 100644 --- a/ingestion/examples/sample_data/datasets/tables.json +++ b/ingestion/examples/sample_data/datasets/tables.json @@ -15039,7 +15039,16 @@ 
"updatedAt": 1638354087862, "updatedBy": "anonymous", "href": "http://localhost:8585/api/v1/tables/96b4af15-3bdf-4483-a602-3014bbd8ebc6", - "tableType": "Regular", + "tableType": "Partitioned", + "tablePartition": { + "columns": [ + { + "columnName": "_PARTITIONTIME", + "intervalType": "INGESTION-TIME", + "interval": "DAY" + } + ] + }, "columns": [ { "name": "comments", @@ -15694,7 +15703,16 @@ "updatedAt": 1638354087891, "updatedBy": "anonymous", "href": "http://localhost:8585/api/v1/tables/afce49e7-8ae1-41e3-971e-8f79c254b24a", - "tableType": "Regular", + "tableType": "Partitioned", + "tablePartition": { + "columns": [ + { + "columnName": "_PARTITIONDATE", + "intervalType": "INGESTION-TIME", + "interval": "DAY" + } + ] + }, "columns": [ { "name": "comments", diff --git a/ingestion/plugins/import_checker.py b/ingestion/plugins/import_checker.py new file mode 100644 index 00000000000..ecbf6b877bc --- /dev/null +++ b/ingestion/plugins/import_checker.py @@ -0,0 +1,65 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Custom pylint plugin to catch `ingest.src` imports +""" +from typing import TYPE_CHECKING + +from astroid import nodes +from pylint.checkers import BaseChecker +from pylint.checkers.utils import only_required_for_messages + +if TYPE_CHECKING: + from pylint.lint import PyLinter + + +class ImportChecker(BaseChecker): + """ + Check for any `ingestion.src.metadata` imports + """ + + name = "no_ingestion_src_imports" + _symbol = "ingestion-src-import" + msgs = { + "W5002": ( + "Found ingestion.src.metadata import", + _symbol, + "`ingestion.src.metadata` imports are not allowed, use `metadata.` instead", + ) + } + + @only_required_for_messages("ingestion-src-import") + def visit_import(self, node: nodes.Import) -> None: + """Check for direct imports of ingestion.src.metadata""" + for name_tuple in node.names: + if isinstance(name_tuple, tuple) and name_tuple[0].startswith( + "ingestion.src.metadata" + ): + self.add_message(self._symbol, node=node) + + @only_required_for_messages("ingestion-src-import") + def visit_importfrom(self, node: nodes.ImportFrom) -> None: + """Check for from ingestion.src.metadata imports""" + if ( + node.modname + and isinstance(node.modname, str) + and node.modname.startswith("ingestion.src.metadata") + ): + self.add_message(self._symbol, node=node) + + +def register(linter: "PyLinter") -> None: + """ + This required method auto registers the checker during initialization. + :param linter: The linter to register the checker to. 
+ """ + linter.register_checker(ImportChecker(linter)) diff --git a/ingestion/pyproject.toml b/ingestion/pyproject.toml index a6424f1c53b..763c9f8ef18 100644 --- a/ingestion/pyproject.toml +++ b/ingestion/pyproject.toml @@ -100,7 +100,7 @@ module-rgx = "(([a-z_][a-z0-9_]*)|([a-zA-Z0-9]+))$" fail-under = 6.0 init-hook = "from pylint.config import find_default_config_files; import os, sys; sys.path.append(os.path.dirname(next(find_default_config_files())))" extension-pkg-allow-list = "pydantic" -load-plugins = "ingestion.plugins.print_checker" +load-plugins = "ingestion.plugins.print_checker,ingestion.plugins.import_checker" max-public-methods = 25 ignore-paths = [ diff --git a/ingestion/setup.py b/ingestion/setup.py index a3523163a2a..eadecd142aa 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -440,7 +440,7 @@ test = { "python-liquid", VERSIONS["google-cloud-bigtable"], *plugins["bigquery"], - "faker==37.1.0", # Fixed the version to prevent flaky tests! + "Faker==37.1.0", # Fixed the version to prevent flaky tests! 
} if sys.version_info >= (3, 9): diff --git a/ingestion/src/metadata/ingestion/connections/builders.py b/ingestion/src/metadata/ingestion/connections/builders.py index 61db6c7c54f..929392bd9e6 100644 --- a/ingestion/src/metadata/ingestion/connections/builders.py +++ b/ingestion/src/metadata/ingestion/connections/builders.py @@ -31,6 +31,7 @@ from metadata.generated.schema.entity.services.connections.database.common.iamAu IamAuthConfigurationSource, ) from metadata.ingestion.connections.headers import inject_query_header_by_conn +from metadata.ingestion.connections.query_logger import attach_query_tracker from metadata.ingestion.connections.secrets import connection_with_options_secrets from metadata.utils.constants import BUILDER_PASSWORD_ATTR from metadata.utils.logger import cli_logger @@ -76,6 +77,8 @@ def create_generic_db_connection( max_overflow=-1, ) + attach_query_tracker(engine) + if hasattr(connection, "supportsQueryComment"): listen( engine, diff --git a/ingestion/src/metadata/ingestion/connections/query_logger.py b/ingestion/src/metadata/ingestion/connections/query_logger.py new file mode 100644 index 00000000000..aa16e977224 --- /dev/null +++ b/ingestion/src/metadata/ingestion/connections/query_logger.py @@ -0,0 +1,115 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# pylint: disable=W0613 + +""" +Query tracking implementation using SQLAlchemy event listeners +""" +from datetime import datetime, timezone +from typing import Any, Dict, Optional, Tuple, Union + +from pydantic import BaseModel, ConfigDict +from sqlalchemy.event import listen +from sqlalchemy.sql.elements import TextClause + +from metadata.utils.logger import ingestion_logger + +logger = ingestion_logger() + + +class QueryInfo(BaseModel): + """Class to store information about a query execution""" + + model_config = ConfigDict(arbitrary_types_allowed=True) + + statement: Union[str, TextClause] + parameters: Optional[Union[Dict[str, Any], Tuple[Any, ...]]] + start_time: datetime + end_time: Optional[datetime] = None + duration_ms: Optional[float] = None + error: Optional[Exception] = None + + +class QueryLogger: + """Class to track SQL query execution using SQLAlchemy event listeners""" + + def __init__(self): + self._current_query: Optional[QueryInfo] = None + + def before_cursor_execute( + self, + conn: Any, + cursor: Any, + statement: Union[str, TextClause], + parameters: Optional[Dict[str, Any]], + context: Any, + executemany: bool, + ) -> Tuple[Union[str, TextClause], Optional[Dict[str, Any]]]: + """Event listener for before cursor execute""" + self._current_query = QueryInfo( + statement=statement, + parameters=parameters, + start_time=datetime.now(timezone.utc), + ) + return statement, parameters + + def after_cursor_execute( + self, + conn: Any, + cursor: Any, + statement: Union[str, TextClause], + parameters: Optional[Dict[str, Any]], + context: Any, + executemany: bool, + ) -> None: + """Event listener for after cursor execute""" + if self._current_query: + query = self._current_query + query.end_time = datetime.now(timezone.utc) + query.duration_ms = ( + query.end_time - query.start_time + ).total_seconds() * 1000 + + logger.debug( + "Query execution details:\n" + f" Start Time: {query.start_time}\n" + f" End Time: {query.end_time}\n" + f" Duration: 
{query.duration_ms:.2f} ms\n" + f" Query: {query.statement}\n" + f" Parameters: {query.parameters}" + ) + + self._current_query = None + + +def attach_query_tracker(engine: Any): + """ + Attach query tracking event listeners to a SQLAlchemy engine + + Args: + engine: SQLAlchemy engine to attach listeners to + + Returns: + QueryLogger instance that can be used to access query execution data + """ + tracker = QueryLogger() + + listen( + engine, + "before_cursor_execute", + tracker.before_cursor_execute, + retval=True, + ) + listen( + engine, + "after_cursor_execute", + tracker.after_cursor_execute, + ) diff --git a/ingestion/src/metadata/ingestion/source/database/sample_data.py b/ingestion/src/metadata/ingestion/source/database/sample_data.py index f2c03cebfca..2ac57bf8b2b 100644 --- a/ingestion/src/metadata/ingestion/source/database/sample_data.py +++ b/ingestion/src/metadata/ingestion/source/database/sample_data.py @@ -890,6 +890,7 @@ class SampleDataSource( tags=table["tags"], schemaDefinition=table.get("schemaDefinition"), sourceUrl=table.get("sourceUrl"), + tablePartition=table.get("tablePartition"), ) yield Either(right=table_and_db) diff --git a/ingestion/tests/cli_e2e/test_cli_bigquery.py b/ingestion/tests/cli_e2e/test_cli_bigquery.py index 9ad080c5cc7..8e1914534d2 100644 --- a/ingestion/tests/cli_e2e/test_cli_bigquery.py +++ b/ingestion/tests/cli_e2e/test_cli_bigquery.py @@ -15,16 +15,20 @@ Test Bigquery connector with CLI import random from typing import List, Tuple +import pytest + +from ingestion.tests.cli_e2e.base.e2e_types import E2EType from metadata.data_quality.api.models import TestCaseDefinition from metadata.generated.schema.entity.data.table import ( DmlOperationType, ProfileSampleType, SystemProfile, + Table, TableProfilerConfig, ) from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus from metadata.generated.schema.tests.testCase import TestCaseParameterValue -from metadata.generated.schema.type.basic import Timestamp 
+from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Timestamp from .common.test_cli_db import CliCommonDB from .common_e2e_sqa_mixins import SQACommonMethods @@ -123,7 +127,7 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): @staticmethod def expected_filtered_schema_includes() -> int: - return 1 + return 2 @staticmethod def expected_filtered_schema_excludes() -> int: @@ -131,7 +135,7 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): @staticmethod def expected_filtered_table_includes() -> int: - return 2 + return 3 @staticmethod def expected_filtered_table_excludes() -> int: @@ -139,7 +143,7 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): @staticmethod def expected_filtered_mix() -> int: - return 1 + return 2 @staticmethod def delete_queries() -> List[str]: @@ -209,3 +213,21 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods): def get_expected_test_case_results(self): return [TestCaseResult(testCaseStatus=TestCaseStatus.Success, timestamp=0)] + + @pytest.mark.order(9999) + def test_profiler_w_partition_table(self): + """Test profiler sample for partitioned table""" + self.build_config_file( + E2EType.INGEST_DB_FILTER_SCHEMA, {"includes": ["w_partition"]} + ) + self.run_command() + + self.build_config_file(E2EType.PROFILER, {"includes": ["w_partition"]}) + self.run_command("profile") + table: Table = self.openmetadata.get_latest_table_profile( + FullyQualifiedEntityName( + "local_bigquery.open-metadata-beta.w_partition.w_time_partition" + ) + ) + # We ingest 1 row for each day and the profiler should default to the latest partition + assert table.profile.rowCount == 1 diff --git a/ingestion/tests/unit/test_import_checker.py b/ingestion/tests/unit/test_import_checker.py new file mode 100644 index 00000000000..4d3e5bfafee --- /dev/null +++ b/ingestion/tests/unit/test_import_checker.py @@ -0,0 +1,154 @@ +# Copyright 2025 Collate +# Licensed under the Collate Community License, 
Version 1.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test suite for the custom import checker pylint plugin +""" +import tempfile +import textwrap + +from astroid import nodes, parse +from pylint import lint +from pylint.reporters import BaseReporter +from pylint.testutils import UnittestLinter + +from ingestion.plugins.import_checker import ImportChecker + + +class TestReporter(BaseReporter): + """Custom reporter for testing that collects messages.""" + + def __init__(self): + super().__init__() + self.messages = [] + + def handle_message(self, msg): + self.messages.append(msg) + + def _display(self, layout): + pass + + +class TestImportChecker: + """Test cases for the ImportChecker""" + + def setup_method(self): + """Set up test cases.""" + self.linter = UnittestLinter() + self.checker = ImportChecker(self.linter) + + def _find_import_nodes(self, ast_node): + """Find all import and importfrom nodes in the AST.""" + import_nodes = [] + importfrom_nodes = [] + + for node in ast_node.nodes_of_class((nodes.Import, nodes.ImportFrom)): + if isinstance(node, nodes.Import): + import_nodes.append(node) + else: + importfrom_nodes.append(node) + + return import_nodes, importfrom_nodes + + def test_valid_imports(self): + """Test that valid imports don't trigger warnings.""" + test_code = """ + import metadata.something + from metadata import something + from metadata.something import other + """ + ast_node = parse(test_code) + import_nodes, importfrom_nodes = 
self._find_import_nodes(ast_node) + + for node in import_nodes: + self.checker.visit_import(node) + for node in importfrom_nodes: + self.checker.visit_importfrom(node) + + assert not self.linter.release_messages() + + def test_invalid_direct_import(self): + """Test that direct ingestion.src.metadata imports trigger warnings.""" + test_code = """ + import ingestion.src.metadata.something + """ + ast_node = parse(test_code) + import_nodes, _ = self._find_import_nodes(ast_node) + + for node in import_nodes: + self.checker.visit_import(node) + + messages = self.linter.release_messages() + assert len(messages) == 1 + assert messages[0].msg_id == "ingestion-src-import" + + def test_invalid_from_import(self): + """Test that from ingestion.src.metadata imports trigger warnings.""" + test_code = """ + from ingestion.src.metadata import something + """ + ast_node = parse(test_code) + _, importfrom_nodes = self._find_import_nodes(ast_node) + + for node in importfrom_nodes: + self.checker.visit_importfrom(node) + + messages = self.linter.release_messages() + assert len(messages) == 1 + assert messages[0].msg_id == "ingestion-src-import" + + def test_multiple_invalid_imports(self): + """Test that multiple invalid imports trigger multiple warnings.""" + test_code = """ + import ingestion.src.metadata.something + from ingestion.src.metadata import other + import ingestion.src.metadata.another + """ + ast_node = parse(test_code) + import_nodes, importfrom_nodes = self._find_import_nodes(ast_node) + + for node in import_nodes: + self.checker.visit_import(node) + for node in importfrom_nodes: + self.checker.visit_importfrom(node) + + messages = self.linter.release_messages() + assert len(messages) == 3 + assert all(msg.msg_id == "ingestion-src-import" for msg in messages) + + def test_real_file_check(self): + """Test the checker on actual files.""" + with tempfile.NamedTemporaryFile( + mode="w", suffix=".py", delete=False + ) as temp_file: + temp_file.write( + textwrap.dedent( + """ 
+ from metadata import valid_import + import ingestion.src.metadata.something + from ingestion.src.metadata import another_thing + """ + ) + ) + temp_file.flush() + + reporter = TestReporter() + lint.Run( + ["--load-plugins=ingestion.plugins.import_checker", temp_file.name], + reporter=reporter, + exit=False, + ) + messages = reporter.messages + import_err_msg = [ + msg for msg in messages if msg.symbol == "ingestion-src-import" + ] + assert len(import_err_msg) == 2 + assert all(msg.symbol == "ingestion-src-import" for msg in import_err_msg)