ISSUE #21101 - Implement BQ Partitioned Tests (#21348)

* feat: add query logger as an event listener in debug mode

* fix: added ingestion.src plugin to pylint

* minor: add partition sampled table

* test: added test for partitioned BQ table

* Remove log_query function from logger.py

* style: ran python linting
Teddy 2025-05-22 17:22:05 +02:00 committed by GitHub
parent 8138a47b20
commit 7ab6755beb
10 changed files with 387 additions and 9 deletions

View File

@@ -21,7 +21,7 @@ module-rgx=(([a-z_][a-z0-9_]*)|([a-zA-Z0-9]+))$
fail-under=6.0
init-hook='from pylint.config import find_default_config_files; import os, sys; sys.path.append(os.path.dirname(next(find_default_config_files())))'
extension-pkg-allow-list=pydantic
-load-plugins=ingestion.plugins.print_checker
+load-plugins=ingestion.plugins.print_checker,ingestion.plugins.import_checker
max-public-methods=25
[MESSAGES CONTROL]

View File

@@ -15039,7 +15039,16 @@
"updatedAt": 1638354087862,
"updatedBy": "anonymous",
"href": "http://localhost:8585/api/v1/tables/96b4af15-3bdf-4483-a602-3014bbd8ebc6",
"tableType": "Regular",
"tableType": "Partitioned",
"tablePartition": {
"columns": [
{
"columnName": "_PARTITIONTIME",
"intervalType": "INGESTION-TIME",
"interval": "DAY"
}
]
},
"columns": [
{
"name": "comments",
@@ -15694,7 +15703,16 @@
"updatedAt": 1638354087891,
"updatedBy": "anonymous",
"href": "http://localhost:8585/api/v1/tables/afce49e7-8ae1-41e3-971e-8f79c254b24a",
"tableType": "Regular",
"tableType": "Partitioned",
"tablePartition": {
"columns": [
{
"columnName": "_PARTITIONDATE",
"intervalType": "INGESTION-TIME",
"interval": "DAY"
}
]
},
"columns": [
{
"name": "comments",

View File

@@ -0,0 +1,65 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Custom pylint plugin to catch `ingest.src` imports
"""
from typing import TYPE_CHECKING
from astroid import nodes
from pylint.checkers import BaseChecker
from pylint.checkers.utils import only_required_for_messages
if TYPE_CHECKING:
from pylint.lint import PyLinter
class ImportChecker(BaseChecker):
"""
Check for any `ingestion.src.metadata` imports
"""
name = "no_ingestion_src_imports"
_symbol = "ingestion-src-import"
msgs = {
"W5002": (
"Found ingestion.src.metadata import",
_symbol,
"`ingestion.src.metadata` imports are not allowed, use `metadata.` instead",
)
}
@only_required_for_messages("ingestion-src-import")
def visit_import(self, node: nodes.Import) -> None:
"""Check for direct imports of ingestion.src.metadata"""
for name_tuple in node.names:
if isinstance(name_tuple, tuple) and name_tuple[0].startswith(
"ingestion.src.metadata"
):
self.add_message(self._symbol, node=node)
@only_required_for_messages("ingestion-src-import")
def visit_importfrom(self, node: nodes.ImportFrom) -> None:
"""Check for from ingestion.src.metadata imports"""
if (
node.modname
and isinstance(node.modname, str)
and node.modname.startswith("ingestion.src.metadata")
):
self.add_message(self._symbol, node=node)
def register(linter: "PyLinter") -> None:
"""
This required method auto registers the checker during initialization.
:param linter: The linter to register the checker to.
"""
linter.register_checker(ImportChecker(linter))

View File

@@ -100,7 +100,7 @@ module-rgx = "(([a-z_][a-z0-9_]*)|([a-zA-Z0-9]+))$"
fail-under = 6.0
init-hook = "from pylint.config import find_default_config_files; import os, sys; sys.path.append(os.path.dirname(next(find_default_config_files())))"
extension-pkg-allow-list = "pydantic"
-load-plugins = "ingestion.plugins.print_checker"
+load-plugins = "ingestion.plugins.print_checker,ingestion.plugins.import_checker"
max-public-methods = 25
ignore-paths = [

View File

@@ -440,7 +440,7 @@ test = {
"python-liquid",
VERSIONS["google-cloud-bigtable"],
*plugins["bigquery"],
"faker==37.1.0", # Fixed the version to prevent flaky tests!
"Faker==37.1.0", # Fixed the version to prevent flaky tests!
}
if sys.version_info >= (3, 9):

View File

@@ -31,6 +31,7 @@ from metadata.generated.schema.entity.services.connections.database.common.iamAu
    IamAuthConfigurationSource,
)
from metadata.ingestion.connections.headers import inject_query_header_by_conn
+from metadata.ingestion.connections.query_logger import attach_query_tracker
from metadata.ingestion.connections.secrets import connection_with_options_secrets
from metadata.utils.constants import BUILDER_PASSWORD_ATTR
from metadata.utils.logger import cli_logger
@@ -76,6 +77,8 @@ def create_generic_db_connection(
        max_overflow=-1,
    )
+    attach_query_tracker(engine)
    if hasattr(connection, "supportsQueryComment"):
        listen(
            engine,

View File

@@ -0,0 +1,115 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=W0613
"""
Query tracking implementation using SQLAlchemy event listeners
"""
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Tuple, Union
from pydantic import BaseModel, ConfigDict
from sqlalchemy.event import listen
from sqlalchemy.sql.elements import TextClause
from metadata.utils.logger import ingestion_logger
logger = ingestion_logger()
class QueryInfo(BaseModel):
"""Class to store information about a query execution"""
model_config = ConfigDict(arbitrary_types_allowed=True)
statement: Union[str, TextClause]
parameters: Optional[Union[Dict[str, Any], Tuple[Any, ...]]]
start_time: datetime
end_time: Optional[datetime] = None
duration_ms: Optional[float] = None
error: Optional[Exception] = None
class QueryLogger:
"""Class to track SQL query execution using SQLAlchemy event listeners"""
def __init__(self):
self._current_query: Optional[QueryInfo] = None
def before_cursor_execute(
self,
conn: Any,
cursor: Any,
statement: Union[str, TextClause],
parameters: Optional[Dict[str, Any]],
context: Any,
executemany: bool,
) -> Tuple[Union[str, TextClause], Optional[Dict[str, Any]]]:
"""Event listener for before cursor execute"""
self._current_query = QueryInfo(
statement=statement,
parameters=parameters,
start_time=datetime.now(timezone.utc),
)
return statement, parameters
def after_cursor_execute(
self,
conn: Any,
cursor: Any,
statement: Union[str, TextClause],
parameters: Optional[Dict[str, Any]],
context: Any,
executemany: bool,
) -> None:
"""Event listener for after cursor execute"""
if self._current_query:
query = self._current_query
query.end_time = datetime.now(timezone.utc)
query.duration_ms = (
query.end_time - query.start_time
).total_seconds() * 1000
logger.debug(
"Query execution details:\n"
f" Start Time: {query.start_time}\n"
f" End Time: {query.end_time}\n"
f" Duration: {query.duration_ms:.2f} ms\n"
f" Query: {query.statement}\n"
f" Parameters: {query.parameters}"
)
self._current_query = None
def attach_query_tracker(engine: Any):
"""
Attach query tracking event listeners to a SQLAlchemy engine
Args:
engine: SQLAlchemy engine to attach listeners to
Returns:
QueryLogger instance that can be used to access query execution data
"""
tracker = QueryLogger()
listen(
engine,
"before_cursor_execute",
tracker.before_cursor_execute,
retval=True,
)
listen(
engine,
"after_cursor_execute",
tracker.after_cursor_execute,
)

View File

@@ -890,6 +890,7 @@ class SampleDataSource(
            tags=table["tags"],
            schemaDefinition=table.get("schemaDefinition"),
            sourceUrl=table.get("sourceUrl"),
+           tablePartition=table.get("tablePartition"),
        )
        yield Either(right=table_and_db)

View File

@@ -15,16 +15,20 @@ Test Bigquery connector with CLI
import random
from typing import List, Tuple
import pytest
from ingestion.tests.cli_e2e.base.e2e_types import E2EType
from metadata.data_quality.api.models import TestCaseDefinition
from metadata.generated.schema.entity.data.table import (
    DmlOperationType,
    ProfileSampleType,
    SystemProfile,
    Table,
    TableProfilerConfig,
)
from metadata.generated.schema.tests.basic import TestCaseResult, TestCaseStatus
from metadata.generated.schema.tests.testCase import TestCaseParameterValue
-from metadata.generated.schema.type.basic import Timestamp
+from metadata.generated.schema.type.basic import FullyQualifiedEntityName, Timestamp
from .common.test_cli_db import CliCommonDB
from .common_e2e_sqa_mixins import SQACommonMethods
@@ -123,7 +127,7 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
    @staticmethod
    def expected_filtered_schema_includes() -> int:
-        return 1
+        return 2

    @staticmethod
    def expected_filtered_schema_excludes() -> int:
@@ -131,7 +135,7 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
    @staticmethod
    def expected_filtered_table_includes() -> int:
-        return 2
+        return 3

    @staticmethod
    def expected_filtered_table_excludes() -> int:
@@ -139,7 +143,7 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
    @staticmethod
    def expected_filtered_mix() -> int:
-        return 1
+        return 2

    @staticmethod
    def delete_queries() -> List[str]:
@@ -209,3 +213,21 @@ class BigqueryCliTest(CliCommonDB.TestSuite, SQACommonMethods):
    def get_expected_test_case_results(self):
        return [TestCaseResult(testCaseStatus=TestCaseStatus.Success, timestamp=0)]

+    @pytest.mark.order(9999)
+    def test_profiler_w_partition_table(self):
+        """Test profiler sample for partitioned table"""
+        self.build_config_file(
+            E2EType.INGEST_DB_FILTER_SCHEMA, {"includes": ["w_partition"]}
+        )
+        self.run_command()
+
+        self.build_config_file(E2EType.PROFILER, {"includes": ["w_partition"]})
+        self.run_command("profile")
+
+        table: Table = self.openmetadata.get_latest_table_profile(
+            FullyQualifiedEntityName(
+                "local_bigquery.open-metadata-beta.w_partition.w_time_partition"
+            )
+        )
+        # We ingest 1 row for each day and the profiler should default to the latest partition
+        assert table.profile.rowCount == 1

View File

@@ -0,0 +1,154 @@
# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test suite for the custom import checker pylint plugin
"""
import tempfile
import textwrap
from astroid import nodes, parse
from pylint import lint
from pylint.reporters import BaseReporter
from pylint.testutils import UnittestLinter
from ingestion.plugins.import_checker import ImportChecker
class TestReporter(BaseReporter):
"""Custom reporter for testing that collects messages."""
def __init__(self):
super().__init__()
self.messages = []
def handle_message(self, msg):
self.messages.append(msg)
def _display(self, layout):
pass
class TestImportChecker:
"""Test cases for the ImportChecker"""
def setup_method(self):
"""Set up test cases."""
self.linter = UnittestLinter()
self.checker = ImportChecker(self.linter)
def _find_import_nodes(self, ast_node):
"""Find all import and importfrom nodes in the AST."""
import_nodes = []
importfrom_nodes = []
for node in ast_node.nodes_of_class((nodes.Import, nodes.ImportFrom)):
if isinstance(node, nodes.Import):
import_nodes.append(node)
else:
importfrom_nodes.append(node)
return import_nodes, importfrom_nodes
def test_valid_imports(self):
"""Test that valid imports don't trigger warnings."""
test_code = """
import metadata.something
from metadata import something
from metadata.something import other
"""
ast_node = parse(test_code)
import_nodes, importfrom_nodes = self._find_import_nodes(ast_node)
for node in import_nodes:
self.checker.visit_import(node)
for node in importfrom_nodes:
self.checker.visit_importfrom(node)
assert not self.linter.release_messages()
def test_invalid_direct_import(self):
"""Test that direct ingestion.src.metadata imports trigger warnings."""
test_code = """
import ingestion.src.metadata.something
"""
ast_node = parse(test_code)
import_nodes, _ = self._find_import_nodes(ast_node)
for node in import_nodes:
self.checker.visit_import(node)
messages = self.linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "ingestion-src-import"
def test_invalid_from_import(self):
"""Test that from ingestion.src.metadata imports trigger warnings."""
test_code = """
from ingestion.src.metadata import something
"""
ast_node = parse(test_code)
_, importfrom_nodes = self._find_import_nodes(ast_node)
for node in importfrom_nodes:
self.checker.visit_importfrom(node)
messages = self.linter.release_messages()
assert len(messages) == 1
assert messages[0].msg_id == "ingestion-src-import"
def test_multiple_invalid_imports(self):
"""Test that multiple invalid imports trigger multiple warnings."""
test_code = """
import ingestion.src.metadata.something
from ingestion.src.metadata import other
import ingestion.src.metadata.another
"""
ast_node = parse(test_code)
import_nodes, importfrom_nodes = self._find_import_nodes(ast_node)
for node in import_nodes:
self.checker.visit_import(node)
for node in importfrom_nodes:
self.checker.visit_importfrom(node)
messages = self.linter.release_messages()
assert len(messages) == 3
assert all(msg.msg_id == "ingestion-src-import" for msg in messages)
def test_real_file_check(self):
"""Test the checker on actual files."""
with tempfile.NamedTemporaryFile(
mode="w", suffix=".py", delete=False
) as temp_file:
temp_file.write(
textwrap.dedent(
"""
from metadata import valid_import
import ingestion.src.metadata.something
from ingestion.src.metadata import another_thing
"""
)
)
temp_file.flush()
reporter = TestReporter()
lint.Run(
["--load-plugins=ingestion.plugins.import_checker", temp_file.name],
reporter=reporter,
exit=False,
)
messages = reporter.messages
import_err_msg = [
msg for msg in messages if msg.symbol == "ingestion-src-import"
]
assert len(import_err_msg) == 2
assert all(msg.symbol == "ingestion-src-import" for msg in import_err_msg)