Fixes #10013: Implement first stage of NoSQL profiler (#15189)

* feat(nosql-profiler): row count

1. Implemented the NoSQLProfilerInterface as an entrypoint for the nosql profiler.
2. Added the NoSQLMetric as an abstract class.
3. Implemented the interface for the MongoDB database source.
4. Implemented an e2e test using testcontainers.

* added profiler support for mongodb connection

* doc

* use int_admin_ometa in test setup

* - fixed linting issue in gx
- removed unused inheritance

* moved the nosql function into the metric class

* formatting

* validate_compose: raise exception for bad status code.

* fixed import

* format
This commit is contained in:
Imri Paran 2024-02-22 11:46:19 +01:00 committed by GitHub
parent a46d7a96ba
commit 18c22c4178
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 598 additions and 10 deletions

View File

@ -23,9 +23,11 @@ def get_last_run_info() -> Tuple[str, str]:
while retries < max_retries:
log_ansi_encoded_string(message="Waiting for DAG Run data...")
time.sleep(5)
runs = requests.get(
res = requests.get(
"http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH, timeout=REQUESTS_TIMEOUT
).json()
)
res.raise_for_status()
runs = res.json()
dag_runs = runs.get("dag_runs")
if dag_runs[0].get("dag_run_id"):
return dag_runs[0].get("dag_run_id"), "success"

View File

@ -313,6 +313,7 @@ test = {
VERSIONS["snowflake"],
VERSIONS["elasticsearch8"],
VERSIONS["giturlparse"],
"testcontainers==3.7.1",
}
e2e_test = {

View File

@ -107,7 +107,7 @@ class OpenMetadataValidationAction(ValidationAction):
self.config_file_path = config_file_path
self.ometa_conn = self._create_ometa_connection()
def _run( # pylint: disable=unused-argument
def _run( # pylint: disable=unused-argument,arguments-renamed
self,
validation_result_suite: ExpectationSuiteValidationResult,
validation_result_suite_identifier: Union[
@ -124,7 +124,6 @@ class OpenMetadataValidationAction(ValidationAction):
validation_result_suite: result suite returned when checkpoint is ran
validation_result_suite_identifier: type of result suite
data_asset:
payload:
expectation_suite_identifier: type of expectation suite
checkpoint_identifier: identifier for the checkpoint
"""

View File

@ -0,0 +1,79 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
factory for NoSQL adaptors that are used in the NoSQLProfiler.
"""
from typing import Any, Callable, Dict, Type

from pymongo import MongoClient

from metadata.profiler.adaptors.mongodb import MongoDB
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
# Signature shared by the adaptor constructors: they receive the source client
# and return the adaptor wrapping it. `Any` (not the builtin function `any`)
# is the type to use for the unconstrained client argument.
NoSQLAdaptorConstructor = Callable[[Any], NoSQLAdaptor]
class NoSQLAdaptorFactory:
    """
    A factory class for creating NoSQL adaptor instances.

    This class maintains a registry of callable constructors for different NoSQL source clients.
    Source client classes are registered with the constructor of their adaptor,
    and adaptors are created using the `construct` method.

    Attributes:
        _clients (Dict[str, NoSQLAdaptorConstructor]): A dictionary mapping the class name of
            a source client to the constructor of its adaptor.

    Methods:
        register(source_class: Type, target_class: NoSQLAdaptorConstructor): Register the adaptor
            constructor to use for a source client class.
        construct(source_client: Any) -> NoSQLAdaptor: Create the adaptor registered for the type
            of the given source client.
    """

    def __init__(self):
        """
        Initialize a new instance of NoSQLAdaptorFactory with an empty registry.
        """
        self._clients: Dict[str, NoSQLAdaptorConstructor] = {}

    def register(self, source_class: Type, target_class: NoSQLAdaptorConstructor):
        """
        Register the adaptor constructor to use for a source client class.

        Registering the same class name again replaces the previous constructor.

        Args:
            source_class (Type): The class of the source client.
            target_class (NoSQLAdaptorConstructor): The constructor for the adaptor.

        Returns:
            None
        """
        self._clients[source_class.__name__] = target_class

    def construct(self, source_client: Any) -> NoSQLAdaptor:
        """
        Create the adaptor registered for the type of the given source client.

        Args:
            source_client (Any): The source client instance.

        Returns:
            NoSQLAdaptor: The created adaptor instance.

        Raises:
            ValueError: If the type of the source client is not registered.
        """
        # The registry is keyed by class name, so the lookup uses the name of the
        # instance's class. Instances have no `__name__` of their own, hence the
        # same value is used to build the error message.
        source_name = type(source_client).__name__
        client_class = self._clients.get(source_name)
        if client_class is None:
            raise ValueError(f"Unknown NoSQL source: {source_name}")
        return client_class(source_client)
# Module-level factory instance; created and populated when this module is imported.
factory = NoSQLAdaptorFactory()
# pymongo `MongoClient` sources are wrapped by the `MongoDB` adaptor.
factory.register(MongoClient, MongoDB)

View File

@ -0,0 +1,29 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
MongoDB adaptor for the NoSQL profiler.
"""
from pymongo import MongoClient
from metadata.generated.schema.entity.data.table import Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
class MongoDB(NoSQLAdaptor):
    """NoSQL adaptor that profiles data assets stored in MongoDB through a pymongo client"""

    def __init__(self, client: MongoClient):
        self.client = client

    def get_row_count(self, table: Table) -> int:
        """Count the documents of the collection backing the given table.

        The database is selected by the name of the table's database schema
        and the collection by the name of the table.
        """
        database = self.client[table.databaseSchema.name]
        # An empty filter matches every document of the collection
        return database[table.name.__root__].count_documents({})

View File

@ -0,0 +1,22 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
NoSQL adaptor for the NoSQL profiler.
"""
from abc import ABC, abstractmethod
from metadata.generated.schema.entity.data.table import Table
class NoSQLAdaptor(ABC):
    """Abstract base of the adaptors the NoSQL profiler uses to query a source"""

    @abstractmethod
    def get_row_count(self, table: Table) -> int:
        """Return the number of rows stored for the given table."""
        raise NotImplementedError

View File

@ -122,7 +122,7 @@ class ProfilerResponse(ConfigModel):
class ThreadPoolMetrics(ConfigModel):
"""thread pool metric"""
"""A container for all metrics to be computed on the same thread."""
metrics: Union[List[Union[Type[Metric], CustomMetric]], Type[Metric]]
metric_type: MetricTypes

View File

@ -0,0 +1,182 @@
# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=arguments-differ
"""
Interface to profile NoSQL sources through the
adaptors built by the NoSQL adaptor factory
"""
import traceback
from typing import Dict, List, Optional, Type
from sqlalchemy import Column
from metadata.generated.schema.entity.data.table import TableData
from metadata.generated.schema.tests.customMetric import CustomMetric
from metadata.profiler.adaptors.factory import factory
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.profiler.api.models import ThreadPoolMetrics
from metadata.profiler.interface.profiler_interface import ProfilerInterface
from metadata.profiler.metrics.core import Metric, MetricTypes
from metadata.profiler.metrics.registry import Metrics
from metadata.utils.logger import profiler_interface_registry_logger
from metadata.utils.sqa_like_column import SQALikeColumn
logger = profiler_interface_registry_logger()
class NoSQLProfilerInterface(ProfilerInterface):
    """
    Interface to profile NoSQL sources. Metrics are computed through the
    NoSQLAdaptor the adaptor factory builds for the source connection.
    Only table metrics are computed: the other metric types return None.
    """

    # pylint: disable=too-many-arguments

    def _get_sampler(self):
        """No sampler is used for NoSQL sources"""
        return None

    def _compute_table_metrics(
        self,
        metrics: List[Type[Metric]],
        runner: NoSQLAdaptor,
        *args,
        **kwargs,
    ):
        """Compute every table metric with the function the metric exposes for NoSQL adaptors

        Args:
            metrics: metric classes to compute
            runner: adaptor used to query the source
        Returns:
            dict mapping each metric name to its computed value
        Raises:
            RuntimeError: when a metric fails; the original error is kept as its cause
        """
        result = {}
        for metric in metrics:
            try:
                fn = metric().nosql_fn(runner)
                result[metric.name()] = fn(self.table)
            except Exception as exc:
                logger.debug(
                    f"{traceback.format_exc()}\n"
                    f"Error trying to compute metric {metric} for {self.table.fullyQualifiedName}: {exc}"
                )
                # Chain the original exception so its traceback is not lost
                raise RuntimeError(
                    f"Error trying to compute metric {metric.name()} for {self.table.fullyQualifiedName}: {exc}"
                ) from exc
        return result

    def _compute_static_metrics(
        self,
        metrics: List[Metrics],
        runner: List,
        column,
        *args,
        **kwargs,
    ):
        """Static metrics are not computed by this interface: always None"""
        return None

    def _compute_query_metrics(
        self,
        metric: Metrics,
        runner,
        *args,
        **kwargs,
    ):
        """Query metrics are not computed by this interface: always None"""
        return None

    def _compute_window_metrics(
        self,
        metrics: List[Metrics],
        runner,
        *args,
        **kwargs,
    ):
        """Window metrics are not computed by this interface: always None"""
        return None

    def _compute_system_metrics(
        self,
        metrics: Metrics,
        runner: List,
        *args,
        **kwargs,
    ):
        """System metrics are not computed by this interface: always None"""
        return None

    def _compute_custom_metrics(
        self, metrics: List[CustomMetric], runner, *args, **kwargs
    ):
        """Custom metrics are not computed by this interface: always None"""
        return None

    def compute_metrics(
        self,
        client: NoSQLAdaptor,
        metric_func: ThreadPoolMetrics,
    ):
        """Run metrics in processor worker

        Args:
            client: adaptor used to query the source
            metric_func: metrics to compute together with their type, table and column
        Returns:
            tuple of (computed metrics or None on failure, column name or None, metric type value)
        """
        logger.debug(f"Running profiler for {metric_func.table}")
        try:
            row = self._get_metric_fn[metric_func.metric_type.value](
                metric_func.metrics,
                client,
            )
        except Exception as exc:
            name = f"{metric_func.column if metric_func.column is not None else metric_func.table}"
            # Report the actual metric type next to the failing table or column
            error = f"{name} {metric_func.metric_type.value}: {exc}"
            logger.error(error)
            self.status.failed_profiler(error, traceback.format_exc())
            row = None
        if metric_func.column is not None:
            column = metric_func.column.name
            self.status.scanned(f"{metric_func.table.name.__root__}.{column}")
        else:
            self.status.scanned(metric_func.table.name.__root__)
            column = None
        return row, column, metric_func.metric_type.value

    def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData:
        """Sample data is not fetched by this interface: always None"""
        return None

    def get_composed_metrics(
        self, column: Column, metric: Metrics, column_results: Dict
    ):
        """Composed metrics are not computed by this interface: always None"""
        return None

    def get_hybrid_metrics(
        self, column: Column, metric: Metrics, column_results: Dict, **kwargs
    ):
        """Hybrid metrics are not computed by this interface: always None"""
        return None

    def get_all_metrics(
        self,
        metric_funcs: List[ThreadPoolMetrics],
    ):
        """get all profiler metrics

        Args:
            metric_funcs: metrics to compute
        Returns:
            dict with the "table" and "columns" results, plus "system" when
            system metrics returned a result
        """
        profile_results = {"table": {}, "columns": {}}
        runner = factory.construct(self.connection)
        metric_list = [
            self.compute_metrics(runner, metric_func) for metric_func in metric_funcs
        ]
        for profile, column, metric_type in metric_list:
            if not profile:
                continue
            # Results of any other metric type (e.g. column level) are not collected
            if metric_type == MetricTypes.Table.value:
                profile_results["table"].update(profile)
            elif metric_type == MetricTypes.System.value:
                profile_results["system"] = profile
            elif metric_type == MetricTypes.Custom.value and column is None:
                profile_results["table"].update(profile)
        return profile_results

    @property
    def table(self):
        """OM Table entity"""
        return self.table_entity

    def get_columns(self) -> List[Optional[SQALikeColumn]]:
        """No columns are profiled by this interface: always an empty list"""
        return []

    def close(self):
        """Close the connection to the source"""
        self.connection.close()

View File

@ -30,6 +30,9 @@ from metadata.generated.schema.entity.services.connections.database.db2Connectio
from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import (
MariaDBConnection,
)
from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import (
MongoDBConnection,
)
from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import (
SingleStoreConnection,
)
@ -43,6 +46,7 @@ from metadata.generated.schema.entity.services.connections.database.unityCatalog
UnityCatalogConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.profiler.interface.nosql.profiler_interface import NoSQLProfilerInterface
from metadata.profiler.interface.pandas.profiler_interface import (
PandasProfilerInterface,
)
@ -118,6 +122,7 @@ profilers = {
UnityCatalogConnection.__name__: UnityCatalogProfilerInterface,
DatabricksConnection.__name__: DatabricksProfilerInterface,
Db2Connection.__name__: DB2ProfilerInterface,
MongoDBConnection.__name__: NoSQLProfilerInterface,
}
profiler_interface_factory.register_many(profilers)

View File

@ -18,11 +18,14 @@ Metric Core definitions
from abc import ABC, abstractmethod
from enum import Enum
from functools import wraps
from typing import Any, Dict, Optional, Tuple, TypeVar
from typing import Any, Callable, Dict, Optional, Tuple, TypeVar
from sqlalchemy import Column
from sqlalchemy.orm import DeclarativeMeta, Session
from metadata.generated.schema.entity.data.table import Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
# When creating complex metrics, use inherit_cache = CACHE
CACHE = True
@ -87,6 +90,9 @@ def add_props(**kwargs):
return inner
T = TypeVar("T")
class Metric(ABC):
"""
Parent class metric
@ -153,6 +159,13 @@ class Metric(ABC):
"""
return self.col.type.python_type if self.col else None
def nosql_fn(self, client: NoSQLAdaptor) -> Callable[[Table], Optional[T]]:
    """
    Build the function NoSQL clients run against a table to calculate the metric.
    This default ignores the client and hands back a function that does nothing
    and returns None for any table.
    """

    def _do_nothing(table):  # pylint: disable=unused-argument
        return None

    return _do_nothing
TMetric = TypeVar("TMetric", bound=Metric)

View File

@ -12,11 +12,12 @@
"""
Table Count Metric definition
"""
# pylint: disable=duplicate-code
from typing import Callable
from sqlalchemy import func
from metadata.generated.schema.entity.data.table import Table
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
from metadata.profiler.metrics.core import StaticMetric, _label
@ -50,3 +51,7 @@ class RowCount(StaticMetric):
def df_fn(self, dfs=None):
    """pandas function

    Args:
        dfs: iterable of dataframes (objects exposing `.index`), or None
    Returns:
        total number of rows across the dataframes; 0 when `dfs` is None or empty
    """
    # `dfs` defaults to None, which cannot be iterated: count it as no dataframes
    if dfs is None:
        return 0
    return sum(len(df.index) for df in dfs)
@classmethod
def nosql_fn(cls, client: NoSQLAdaptor) -> Callable[[Table], int]:
    """Row count on NoSQL sources is delegated to the `get_row_count` of the adaptor"""
    row_counter = client.get_row_count
    return row_counter

View File

@ -196,7 +196,10 @@ class Profiler(Generic[TMetric]):
CreateTableProfileRequest:
"""
for attrs, val in profile.tableProfile:
if attrs not in {"timestamp", "profileSample", "profileSampleType"} and val:
if (
attrs not in {"timestamp", "profileSample", "profileSampleType"}
and val is not None
):
return
for col_element in profile.columnProfile:

View File

@ -0,0 +1,65 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Utility functions for testing
"""
from contextlib import contextmanager
class MultipleException(Exception):
    """Reports several exceptions at once; they are kept in the `exceptions` attribute"""

    def __init__(self, exceptions):
        message = f"Multiple exceptions occurred: {exceptions}"
        super().__init__(message)
        self.exceptions = exceptions
class ErrorHandler:
    """
    Collects the errors raised by the callables executed through `try_execute`
    so that they can be raised together once all of them have run.
    Useful for cleaning up resources and ensuring that all errors are raised at the end of a test.

    Example:
        ```
        from metadata.utils.test_utils import accumulate_errors
        with accumulate_errors() as error_handler:
            error_handler.try_execute(lambda : 1 / 0)
            error_handler.try_execute(print, "Hello, World!")
        ```
        ```
        > Hello, World!
        > Traceback (most recent call last):
        > ...
        > ZeroDivisionError: division by zero
        ```
    """

    def __init__(self):
        self.errors = []

    def try_execute(self, func, *args, **kwargs):
        """Call `func` with the given arguments, recording any exception instead of raising it"""
        try:
            func(*args, **kwargs)
        except Exception as error:
            self.errors.append(error)

    def raise_if_errors(self):
        """Raise the recorded error, or a MultipleException wrapping them when there are several"""
        if not self.errors:
            return
        if len(self.errors) > 1:
            raise MultipleException(self.errors)
        raise self.errors[0]
@contextmanager
def accumulate_errors():
    """
    Context manager yielding an ErrorHandler. The errors it recorded are raised
    when the block exits, whether or not the block itself raised.
    """
    handler = ErrorHandler()
    try:
        yield handler
    finally:
        handler.raise_if_errors()

View File

@ -0,0 +1,179 @@
# Copyright 2024 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test the NoSQL profiler using a MongoDB container
To run this we need OpenMetadata server up and running.
No sample data is required beforehand
Test Steps:
1. Start a MongoDB container
2. Ingest data into OpenMetadata
3. Run the profiler workflow
4. Verify the profiler output
5. Tear down the MongoDB container and delete the service from OpenMetadata
"""
from copy import deepcopy
from datetime import datetime, timedelta
from functools import partial
from pathlib import Path
from unittest import TestCase
from pymongo import MongoClient
from testcontainers.mongodb import MongoDbContainer
from ingestion.tests.integration.integration_base import int_admin_ometa
from metadata.generated.schema.entity.data.table import ColumnProfile
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.utils.helpers import datetime_to_ts
from metadata.utils.test_utils import accumulate_errors
from metadata.utils.time_utils import get_end_of_day_timestamp_mill
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow
from metadata.workflow.workflow_output_handler import print_status
SERVICE_NAME = Path(__file__).stem
def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str):
    """Build the metadata ingestion workflow config for a MongoDB listening on localhost"""
    connection_config = {
        "type": "MongoDB",
        "hostPort": f"localhost:{mongo_port}",
        "username": mongo_user,
        "password": mongo_pass,
    }
    source = {
        "type": "mongodb",
        "serviceName": SERVICE_NAME,
        "serviceConnection": {"config": connection_config},
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    }
    server_config = {
        "hostPort": "http://localhost:8585/api",
        "authProvider": "openmetadata",
        "securityConfig": {
            "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
        },
    }
    return {
        "source": source,
        "sink": {"type": "metadata-rest", "config": {}},
        "workflowConfig": {"openMetadataServerConfig": server_config},
    }
# Names of the MongoDB database and collections the test creates
TEST_DATABASE = "test-database"
EMPTY_COLLECTION = "empty-collection"  # created without inserting any document
TEST_COLLECTION = "test-collection"  # receives the TEST_DATA documents
# Documents inserted into TEST_COLLECTION
TEST_DATA = [
    {
        "first_name": "John",
        "last_name": "Doe",
        "age": 30,
    },
    {
        "first_name": "Jane",
        "last_name": "Doe",
        "age": 25,
    },
    {
        "first_name": "John",
        "last_name": "Smith",
        "age": 35,
    },
]
class NoSQLProfiler(TestCase):
    """NoSQL profiler E2E test running against a MongoDB container"""

    @classmethod
    def setUpClass(cls) -> None:
        cls.metadata = int_admin_ometa()

    def setUp(self) -> None:
        """Start MongoDB, load the test data and ingest its metadata into OpenMetadata"""
        self.mongo_container = MongoDbContainer("mongo:7.0.5-jammy")
        self.mongo_container.start()
        # tearDown is skipped when setUp fails while cleanups still run: register
        # the stop here so a failure below does not leave the container running
        self.addCleanup(partial(self.mongo_container.stop, force=True))
        self.client = MongoClient(self.mongo_container.get_connection_url())
        self.db = self.client[TEST_DATABASE]
        self.collection = self.db[TEST_COLLECTION]
        self.collection.insert_many(TEST_DATA)
        self.db.create_collection(EMPTY_COLLECTION)
        self.ingestion_config = get_ingestion_config(
            self.mongo_container.get_exposed_port("27017"), "test", "test"
        )
        ingestion_workflow = MetadataWorkflow.create(
            self.ingestion_config,
        )
        ingestion_workflow.execute()
        ingestion_workflow.raise_from_status()
        print_status(ingestion_workflow)
        ingestion_workflow.stop()

    def tearDown(self):
        """Close the client and delete the service; the container is stopped by the cleanup"""
        with accumulate_errors() as error_handler:
            error_handler.try_execute(self.client.close)
            error_handler.try_execute(self.delete_service)

    def delete_service(self):
        """Hard delete the ingested database service together with its children"""
        service_id = str(
            self.metadata.get_by_name(
                entity=DatabaseService, fqn=SERVICE_NAME
            ).id.__root__
        )
        self.metadata.delete(
            entity=DatabaseService,
            entity_id=service_id,
            recursive=True,
            hard_delete=True,
        )

    def test_setup_teardown(self):
        """Only exercises setUp and tearDown"""
        pass

    def test_row_count(self):
        """Run the profiler and check the row count of each collection"""
        workflow_config = deepcopy(self.ingestion_config)
        workflow_config["source"]["sourceConfig"]["config"].update(
            {
                "type": "Profiler",
            }
        )
        workflow_config["processor"] = {
            "type": "orm-profiler",
            "config": {},
        }
        profiler_workflow = ProfilerWorkflow.create(workflow_config)
        profiler_workflow.execute()
        status = profiler_workflow.result_status()
        profiler_workflow.stop()
        assert status == 0
        expectations = {TEST_COLLECTION: 3, EMPTY_COLLECTION: 0}
        for collection, expected_row_count in expectations.items():
            collection_profile = self.metadata.get_profile_data(
                f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}",
                datetime_to_ts(datetime.now() - timedelta(seconds=10)),
                get_end_of_day_timestamp_mill(),
            )
            assert collection_profile.entities
            assert collection_profile.entities[-1].rowCount == expected_row_count
        # No column profile is expected from the profiler run
        column_profile = self.metadata.get_profile_data(
            f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}.age",
            datetime_to_ts(datetime.now() - timedelta(seconds=10)),
            get_end_of_day_timestamp_mill(),
            profile_type=ColumnProfile,
        )
        assert len(column_profile.entities) == 0

View File

@ -66,6 +66,10 @@
"supportsMetadataExtraction": {
"title": "Supports Metadata Extraction",
"$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
},
"supportsProfiler": {
"title": "Supports Profiler",
"$ref": "../connectionBasicType.json#/definitions/supportsProfiler"
}
},
"required": ["hostPort"],

View File

@ -1,4 +1,4 @@
# Hive
# MongoDB
In this section, we provide guides and references to use the MongoDB connector. You can view the full documentation for MongoDB [here](https://docs.open-metadata.org/connectors/database/mongo).
## Requirements