From 18c22c4178d195aa9ff94e853a42c1d63b2722a3 Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Thu, 22 Feb 2024 11:46:19 +0100 Subject: [PATCH] Fixes #10013: Implement first stage of NoSQL profiler (#15189) * feat(nosql-profiler): row count 1. Implemented the NoSQLProfilerInterface as an entrypoint for the nosql profiler. 2. Added the NoSQLMetric as an abstract class. 3. Implemented the interface for the MongoDB database source. 4. Implemented an e2e test using testcontainers. * added profiler support for mongodb connection * doc * use int_admin_ometa in test setup * - fixed linting issue in gx - removed unused inheritance * moved the nosql function into the metric class * formatting * validate_compose: raise exception for bad status code. * fixed import * format --- docker/validate_compose.py | 6 +- ingestion/setup.py | 1 + .../src/metadata}/__init__.py | 0 .../src/metadata/great_expectations/action.py | 3 +- ingestion/src/metadata/profiler/__init__.py | 0 .../metadata/profiler/adaptors/__init__.py | 0 .../src/metadata/profiler/adaptors/factory.py | 79 ++++++++ .../src/metadata/profiler/adaptors/mongodb.py | 29 +++ .../profiler/adaptors/nosql_adaptor.py | 22 +++ ingestion/src/metadata/profiler/api/models.py | 2 +- .../interface/nosql/profiler_interface.py | 182 ++++++++++++++++++ .../interface/profiler_interface_factory.py | 5 + .../src/metadata/profiler/metrics/core.py | 15 +- .../profiler/metrics/static/row_count.py | 9 +- .../src/metadata/profiler/processor/core.py | 5 +- ingestion/src/metadata/utils/test_utils.py | 65 +++++++ .../tests/integration/profiler/__init__.py | 0 .../profiler/test_nosql_profiler.py | 179 +++++++++++++++++ .../database/mongoDBConnection.json | 4 + .../public/locales/en-US/Database/MongoDB.md | 2 +- 20 files changed, 598 insertions(+), 10 deletions(-) rename {docker => ingestion/src/metadata}/__init__.py (100%) create mode 100644 ingestion/src/metadata/profiler/__init__.py create mode 100644 ingestion/src/metadata/profiler/adaptors/__init__.py create mode 100644 ingestion/src/metadata/profiler/adaptors/factory.py create mode 100644 ingestion/src/metadata/profiler/adaptors/mongodb.py create mode 100644 ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py create mode 100644 ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py create mode 100644 ingestion/src/metadata/utils/test_utils.py create mode 100644 ingestion/tests/integration/profiler/__init__.py create mode 100644 ingestion/tests/integration/profiler/test_nosql_profiler.py diff --git a/docker/validate_compose.py b/docker/validate_compose.py index c90b46b311e..812d5e49596 100644 --- a/docker/validate_compose.py +++ b/docker/validate_compose.py @@ -23,9 +23,11 @@ def get_last_run_info() -> Tuple[str, str]: while retries < max_retries: log_ansi_encoded_string(message="Waiting for DAG Run data...") time.sleep(5) - runs = requests.get( + res = requests.get( "http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH, timeout=REQUESTS_TIMEOUT - ).json() + ) + res.raise_for_status() + runs = res.json() dag_runs = runs.get("dag_runs") if dag_runs[0].get("dag_run_id"): return dag_runs[0].get("dag_run_id"), "success" diff --git a/ingestion/setup.py b/ingestion/setup.py index fc47b5330e6..13e9498b90a 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -313,6 +313,7 @@ test = { VERSIONS["snowflake"], VERSIONS["elasticsearch8"], VERSIONS["giturlparse"], + "testcontainers==3.7.1", } e2e_test = { diff --git a/docker/__init__.py b/ingestion/src/metadata/__init__.py similarity index 100% rename from docker/__init__.py rename to ingestion/src/metadata/__init__.py diff --git a/ingestion/src/metadata/great_expectations/action.py b/ingestion/src/metadata/great_expectations/action.py index 1ab7feb0131..7a475cc6a10 100644 --- a/ingestion/src/metadata/great_expectations/action.py +++ b/ingestion/src/metadata/great_expectations/action.py @@ -107,7 +107,7 @@ class OpenMetadataValidationAction(ValidationAction): self.config_file_path = config_file_path self.ometa_conn = self._create_ometa_connection() - def _run( # pylint: disable=unused-argument + def _run( # pylint: disable=unused-argument,arguments-renamed self, validation_result_suite: ExpectationSuiteValidationResult, validation_result_suite_identifier: Union[ @@ -124,7 +124,6 @@ class OpenMetadataValidationAction(ValidationAction): validation_result_suite: result suite returned when checkpoint is ran validation_result_suite_identifier: type of result suite data_asset: - payload: expectation_suite_identifier: type of expectation suite checkpoint_identifier: identifier for the checkpoint """ diff --git a/ingestion/src/metadata/profiler/__init__.py b/ingestion/src/metadata/profiler/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/adaptors/__init__.py b/ingestion/src/metadata/profiler/adaptors/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/src/metadata/profiler/adaptors/factory.py b/ingestion/src/metadata/profiler/adaptors/factory.py new file mode 100644 index 00000000000..116eab77dd7 --- /dev/null +++ b/ingestion/src/metadata/profiler/adaptors/factory.py @@ -0,0 +1,79 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +factory for NoSQL adaptors that are used in the NoSQLProfiler. +""" +from typing import Callable, Dict, Type + +from pymongo import MongoClient + +from metadata.profiler.adaptors.mongodb import MongoDB +from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor + +NoSQLAdaptorConstructor = Callable[[any], NoSQLAdaptor] + + +class NoSQLAdaptorFactory: + """ + A factory class for creating NoSQL client instances. + + This class maintains a registry of callable constructors for different NoSQL client types. + The client types are registered with their corresponding constructors, + and can be created using the `construct` method. + + Attributes: + _clients (Dict[str, NoSQLClientConstructor]): A dictionary mapping client type names to their constructors. + + Methods: + register(source_class: Type, target_class: NoSQLClientConstructor): Register a client type with its constructor. + construct(source_client: any) -> NoSQLClient: Create a client instance of the type of the given source client. + """ + + def __init__(self): + """ + Initialize a new instance of NoSQLClientFactory. + """ + self._clients: Dict[str, NoSQLAdaptorConstructor] = {} + + def register(self, source_class: Type, target_class: NoSQLAdaptorConstructor): + """ + Register a client type with its constructor. + + Args: + source_class (Type): The class of the source client. + target_class (NoSQLClientConstructor): The constructor for the target client. + + Returns: + None + """ + self._clients[source_class.__name__] = target_class + + def construct(self, source_client: any) -> NoSQLAdaptor: + """ + Create a client instance of the type of the given source client. + + Args: + source_client (any): The source client instance. + + Returns: + NoSQLAdaptor: The created client instance. + + Raises: + ValueError: If the type of the source client is not registered. + """ + client_class = self._clients.get(type(source_client).__name__) + if not client_class: + raise ValueError(f"Unknown NoSQL source: {source_client.__name__}") + return client_class(source_client) + + +factory = NoSQLAdaptorFactory() +factory.register(MongoClient, MongoDB) diff --git a/ingestion/src/metadata/profiler/adaptors/mongodb.py b/ingestion/src/metadata/profiler/adaptors/mongodb.py new file mode 100644 index 00000000000..bf28402d998 --- /dev/null +++ b/ingestion/src/metadata/profiler/adaptors/mongodb.py @@ -0,0 +1,29 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +MongoDB adaptor for the NoSQL profiler. +""" +from pymongo import MongoClient + +from metadata.generated.schema.entity.data.table import Table +from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor + + +class MongoDB(NoSQLAdaptor): + """A MongoDB client that serves as an adaptor for profiling data assets on MongoDB""" + + def __init__(self, client: MongoClient): + self.client = client + + def get_row_count(self, table: Table) -> int: + db = self.client[table.databaseSchema.name] + collection = db[table.name.__root__] + return collection.count_documents({}) diff --git a/ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py b/ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py new file mode 100644 index 00000000000..40eedb5d5d9 --- /dev/null +++ b/ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py @@ -0,0 +1,22 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +NoSQL adaptor for the NoSQL profiler. +""" +from abc import ABC, abstractmethod + +from metadata.generated.schema.entity.data.table import Table + + +class NoSQLAdaptor(ABC): + @abstractmethod + def get_row_count(self, table: Table) -> int: + raise NotImplementedError diff --git a/ingestion/src/metadata/profiler/api/models.py b/ingestion/src/metadata/profiler/api/models.py index 961d6fcd128..499f9b4f149 100644 --- a/ingestion/src/metadata/profiler/api/models.py +++ b/ingestion/src/metadata/profiler/api/models.py @@ -122,7 +122,7 @@ class ProfilerResponse(ConfigModel): class ThreadPoolMetrics(ConfigModel): - """thread pool metric""" + """A container for all metrics to be computed on the same thread.""" metrics: Union[List[Union[Type[Metric], CustomMetric]], Type[Metric]] metric_type: MetricTypes diff --git a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py new file mode 100644 index 00000000000..f1a16961a38 --- /dev/null +++ b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py @@ -0,0 +1,182 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# pylint: disable=arguments-differ + +""" +Interfaces with database for all database engine +supporting sqlalchemy abstraction layer +""" +import traceback +from typing import Dict, List, Optional, Type + +from sqlalchemy import Column + +from metadata.generated.schema.entity.data.table import TableData +from metadata.generated.schema.tests.customMetric import CustomMetric +from metadata.profiler.adaptors.factory import factory +from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor +from metadata.profiler.api.models import ThreadPoolMetrics +from metadata.profiler.interface.profiler_interface import ProfilerInterface +from metadata.profiler.metrics.core import Metric, MetricTypes +from metadata.profiler.metrics.registry import Metrics +from metadata.utils.logger import profiler_interface_registry_logger +from metadata.utils.sqa_like_column import SQALikeColumn + +logger = profiler_interface_registry_logger() + + +class NoSQLProfilerInterface(ProfilerInterface): + """ + Interface to interact with registry supporting + sqlalchemy. + """ + + # pylint: disable=too-many-arguments + + def _get_sampler(self): + return None + + def _compute_table_metrics( + self, + metrics: List[Type[Metric]], + runner: NoSQLAdaptor, + *args, + **kwargs, + ): + result = {} + for metric in metrics: + try: + fn = metric().nosql_fn(runner) + result[metric.name()] = fn(self.table) + except Exception as exc: + logger.debug( + f"{traceback.format_exc()}\n" + f"Error trying to compute metric {metric} for {self.table.fullyQualifiedName}: {exc}" + ) + raise RuntimeError( + f"Error trying to compute metric {metric.name()} for {self.table.fullyQualifiedName}: {exc}" + ) + return result + + def _compute_static_metrics( + self, + metrics: List[Metrics], + runner: List, + column, + *args, + **kwargs, + ): + return None + + def _compute_query_metrics( + self, + metric: Metrics, + runner, + *args, + **kwargs, + ): + return None + + def _compute_window_metrics( + self, + metrics: List[Metrics], + runner, + *args, + **kwargs, + ): + return None + + def _compute_system_metrics( + self, + metrics: Metrics, + runner: List, + *args, + **kwargs, + ): + return None + + def _compute_custom_metrics( + self, metrics: List[CustomMetric], runner, *args, **kwargs + ): + return None + + def compute_metrics( + self, + client: NoSQLAdaptor, + metric_func: ThreadPoolMetrics, + ): + """Run metrics in processor worker""" + logger.debug(f"Running profiler for {metric_func.table}") + try: + row = self._get_metric_fn[metric_func.metric_type.value]( + metric_func.metrics, + client, + ) + except Exception as exc: + name = f"{metric_func.column if metric_func.column is not None else metric_func.table}" + error = f"{name} metric_type.value: {exc}" + logger.error(error) + self.status.failed_profiler(error, traceback.format_exc()) + row = None + if metric_func.column is not None: + column = metric_func.column.name + self.status.scanned(f"{metric_func.table.name.__root__}.{column}") + else: + self.status.scanned(metric_func.table.name.__root__) + column = None + return row, column, metric_func.metric_type.value + + def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData: + return None + + def get_composed_metrics( + self, column: Column, metric: Metrics, column_results: Dict + ): + return None + + def get_hybrid_metrics( + self, column: Column, metric: Metrics, column_results: Dict, **kwargs + ): + return None + + def get_all_metrics( + self, + metric_funcs: List[ThreadPoolMetrics], + ): + """get all profiler metrics""" + profile_results = {"table": {}, "columns": {}} + runner = factory.construct(self.connection) + metric_list = [ + self.compute_metrics(runner, metric_func) for metric_func in metric_funcs + ] + for metric_result in metric_list: + profile, column, metric_type = metric_result + if profile: + if metric_type == MetricTypes.Table.value: + profile_results["table"].update(profile) + if metric_type == MetricTypes.System.value: + profile_results["system"] = profile + elif metric_type == MetricTypes.Custom.value and column is None: + profile_results["table"].update(profile) + else: + pass + return profile_results + + @property + def table(self): + """OM Table entity""" + return self.table_entity + + def get_columns(self) -> List[Optional[SQALikeColumn]]: + return [] + + def close(self): + self.connection.close() diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py index 2733d0a8c4c..89c4c5a5ae0 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py +++ b/ingestion/src/metadata/profiler/interface/profiler_interface_factory.py @@ -30,6 +30,9 @@ from metadata.generated.schema.entity.services.connections.database.db2Connectio from metadata.generated.schema.entity.services.connections.database.mariaDBConnection import ( MariaDBConnection, ) +from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import ( + MongoDBConnection, +) from metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( SingleStoreConnection, ) @@ -43,6 +46,7 @@ from metadata.generated.schema.entity.services.connections.database.unityCatalog UnityCatalogConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseConnection +from metadata.profiler.interface.nosql.profiler_interface import NoSQLProfilerInterface from metadata.profiler.interface.pandas.profiler_interface import ( PandasProfilerInterface, ) @@ -118,6 +122,7 @@ profilers = { UnityCatalogConnection.__name__: UnityCatalogProfilerInterface, DatabricksConnection.__name__: DatabricksProfilerInterface, Db2Connection.__name__: DB2ProfilerInterface, + MongoDBConnection.__name__: NoSQLProfilerInterface, } profiler_interface_factory.register_many(profilers) diff --git a/ingestion/src/metadata/profiler/metrics/core.py b/ingestion/src/metadata/profiler/metrics/core.py index 9cc219777a5..70e387a7dae 100644 --- a/ingestion/src/metadata/profiler/metrics/core.py +++ b/ingestion/src/metadata/profiler/metrics/core.py @@ -18,11 +18,14 @@ Metric Core definitions from abc import ABC, abstractmethod from enum import Enum from functools import wraps -from typing import Any, Dict, Optional, Tuple, TypeVar +from typing import Any, Callable, Dict, Optional, Tuple, TypeVar from sqlalchemy import Column from sqlalchemy.orm import DeclarativeMeta, Session +from metadata.generated.schema.entity.data.table import Table +from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor + # When creating complex metrics, use inherit_cache = CACHE CACHE = True @@ -87,6 +90,9 @@ def add_props(**kwargs): return inner +T = TypeVar("T") + + class Metric(ABC): """ Parent class metric @@ -153,6 +159,13 @@ class Metric(ABC): """ return self.col.type.python_type if self.col else None + def nosql_fn(self, client: NoSQLAdaptor) -> Callable[[Table], Optional[T]]: + """ + Return the function to be used for NoSQL clients to calculate the metric. + By default, returns a "do nothing" function that returns None. + """ + return lambda table: None + TMetric = TypeVar("TMetric", bound=Metric) diff --git a/ingestion/src/metadata/profiler/metrics/static/row_count.py b/ingestion/src/metadata/profiler/metrics/static/row_count.py index 6891ab43b02..dd2a221de68 100644 --- a/ingestion/src/metadata/profiler/metrics/static/row_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/row_count.py @@ -12,11 +12,12 @@ """ Table Count Metric definition """ -# pylint: disable=duplicate-code - +from typing import Callable from sqlalchemy import func +from metadata.generated.schema.entity.data.table import Table +from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor from metadata.profiler.metrics.core import StaticMetric, _label @@ -50,3 +51,7 @@ class RowCount(StaticMetric): def df_fn(self, dfs=None): """pandas function""" return sum(len(df.index) for df in dfs) + + @classmethod + def nosql_fn(cls, client: NoSQLAdaptor) -> Callable[[Table], int]: + return client.get_row_count diff --git a/ingestion/src/metadata/profiler/processor/core.py b/ingestion/src/metadata/profiler/processor/core.py index ff0fd7cc055..5081b43304d 100644 --- a/ingestion/src/metadata/profiler/processor/core.py +++ b/ingestion/src/metadata/profiler/processor/core.py @@ -196,7 +196,10 @@ class Profiler(Generic[TMetric]): CreateTableProfileRequest: """ for attrs, val in profile.tableProfile: - if attrs not in {"timestamp", "profileSample", "profileSampleType"} and val: + if ( + attrs not in {"timestamp", "profileSample", "profileSampleType"} + and val is not None + ): return for col_element in profile.columnProfile: diff --git a/ingestion/src/metadata/utils/test_utils.py b/ingestion/src/metadata/utils/test_utils.py new file mode 100644 index 00000000000..999f0f30e48 --- /dev/null +++ b/ingestion/src/metadata/utils/test_utils.py @@ -0,0 +1,65 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Utility functions for testing +""" +from contextlib import contextmanager + + +class MultipleException(Exception): + def __init__(self, exceptions): + self.exceptions = exceptions + super().__init__(f"Multiple exceptions occurred: {exceptions}") + + +class ErrorHandler: + """ + A context manager that accumulates errors and raises them at the end of the block. + Useful for cleaning up resources and ensuring that all errors are raised at the end of a test. + Example: + ``` + from metadata.utils.test_utils import accumulate_errors + with accumulate_errors() as error_handler: + error_handler.try_execute(lambda : 1 / 0) + error_handler.try_execute(print, "Hello, World!") + ``` + + ``` + > Hello, World! + > Traceback (most recent call last): + > ... + > ZeroDivisionError: division by zero + ``` + """ + + def __init__(self): + self.errors = [] + + def try_execute(self, func, *args, **kwargs): + try: + func(*args, **kwargs) + except Exception as e: + self.errors.append(e) + + def raise_if_errors(self): + if len(self.errors) == 1: + raise self.errors[0] + if len(self.errors) > 1: + raise MultipleException(self.errors) + + +@contextmanager +def accumulate_errors(): + error_handler = ErrorHandler() + try: + yield error_handler + finally: + error_handler.raise_if_errors() diff --git a/ingestion/tests/integration/profiler/__init__.py b/ingestion/tests/integration/profiler/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/ingestion/tests/integration/profiler/test_nosql_profiler.py b/ingestion/tests/integration/profiler/test_nosql_profiler.py new file mode 100644 index 00000000000..ad0249b81fa --- /dev/null +++ b/ingestion/tests/integration/profiler/test_nosql_profiler.py @@ -0,0 +1,179 @@ +# Copyright 2024 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test the NoSQL profiler using a MongoDB container +To run this we need OpenMetadata server up and running. +No sample data is required beforehand + +Test Steps: + +1. Start a MongoDB container +2. Ingest data into OpenMetadata +3. Run the profiler workflow +4. Verify the profiler output +5. Tear down the MongoDB container and delete the service from OpenMetadata +""" + +from copy import deepcopy +from datetime import datetime, timedelta +from functools import partial +from pathlib import Path +from unittest import TestCase + +from pymongo import MongoClient +from testcontainers.mongodb import MongoDbContainer + +from ingestion.tests.integration.integration_base import int_admin_ometa +from metadata.generated.schema.entity.data.table import ColumnProfile +from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.utils.helpers import datetime_to_ts +from metadata.utils.test_utils import accumulate_errors +from metadata.utils.time_utils import get_end_of_day_timestamp_mill +from metadata.workflow.metadata import MetadataWorkflow +from metadata.workflow.profiler import ProfilerWorkflow +from metadata.workflow.workflow_output_handler import print_status + +SERVICE_NAME = Path(__file__).stem + + +def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str): + return { + "source": { + "type": "mongodb", + "serviceName": SERVICE_NAME, + "serviceConnection": { + "config": { + "type": "MongoDB", + "hostPort": f"localhost:{mongo_port}", + "username": mongo_user, + "password": mongo_pass, + } + }, + "sourceConfig": {"config": {"type": "DatabaseMetadata"}}, + }, + "sink": {"type": "metadata-rest", "config": {}}, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "openmetadata", + "securityConfig": { + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + }, + } + }, + } + + +TEST_DATABASE = "test-database" +EMPTY_COLLECTION = "empty-collection" +TEST_COLLECTION = "test-collection" +TEST_DATA = [ + { + "first_name": "John", + "last_name": "Doe", + "age": 30, + }, + { + "first_name": "Jane", + "last_name": "Doe", + "age": 25, + }, + { + "first_name": "John", + "last_name": "Smith", + "age": 35, + }, +] + + +class NoSQLProfiler(TestCase): + """datalake profiler E2E test""" + + @classmethod + def setUpClass(cls) -> None: + cls.metadata = int_admin_ometa() + + def setUp(self) -> None: + self.mongo_container = MongoDbContainer("mongo:7.0.5-jammy") + self.mongo_container.start() + self.client = MongoClient(self.mongo_container.get_connection_url()) + self.db = self.client[TEST_DATABASE] + self.collection = self.db[TEST_COLLECTION] + self.collection.insert_many(TEST_DATA) + self.db.create_collection(EMPTY_COLLECTION) + self.ingestion_config = get_ingestion_config( + self.mongo_container.get_exposed_port("27017"), "test", "test" + ) + ingestion_workflow = MetadataWorkflow.create( + self.ingestion_config, + ) + ingestion_workflow.execute() + ingestion_workflow.raise_from_status() + print_status(ingestion_workflow) + ingestion_workflow.stop() + + def tearDown(self): + with accumulate_errors() as error_handler: + error_handler.try_execute(partial(self.mongo_container.stop, force=True)) + error_handler.try_execute(self.delete_service) + + def delete_service(self): + service_id = str( + self.metadata.get_by_name( + entity=DatabaseService, fqn=SERVICE_NAME + ).id.__root__ + ) + self.metadata.delete( + entity=DatabaseService, + entity_id=service_id, + recursive=True, + hard_delete=True, + ) + + def test_setup_teardown(self): + pass + + def test_row_count(self): + workflow_config = deepcopy(self.ingestion_config) + workflow_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + workflow_config["processor"] = { + "type": "orm-profiler", + "config": {}, + } + profiler_workflow = ProfilerWorkflow.create(workflow_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == 0 + + expectations = {TEST_COLLECTION: 3, EMPTY_COLLECTION: 0} + + for collection, expected_row_count in expectations.items(): + collection_profile = self.metadata.get_profile_data( + f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}", + datetime_to_ts(datetime.now() - timedelta(seconds=10)), + get_end_of_day_timestamp_mill(), + ) + assert collection_profile.entities + assert collection_profile.entities[-1].rowCount == expected_row_count + column_profile = self.metadata.get_profile_data( + f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}.age", + datetime_to_ts(datetime.now() - timedelta(seconds=10)), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ) + assert len(column_profile.entities) == 0 diff --git a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mongoDBConnection.json b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mongoDBConnection.json index 8fc6a817e2e..7b453a16653 100644 --- a/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mongoDBConnection.json +++ b/openmetadata-spec/src/main/resources/json/schema/entity/services/connections/database/mongoDBConnection.json @@ -66,6 +66,10 @@ "supportsMetadataExtraction": { "title": "Supports Metadata Extraction", "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + }, + "supportsProfiler": { + "title": "Supports Profiler", + "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" } }, "required": ["hostPort"], diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/MongoDB.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/MongoDB.md index ba93db08c17..8b49227fc8d 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/MongoDB.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/MongoDB.md @@ -1,4 +1,4 @@ -# Hive +# MongoDB In this section, we provide guides and references to use the MongoDB connector. You can view the full documentation for MongoDB [here](https://docs.open-metadata.org/connectors/database/mongo). ## Requirements