diff --git a/docker/validate_compose.py b/docker/validate_compose.py index c90b46b311e..812d5e49596 100644 --- a/docker/validate_compose.py +++ b/docker/validate_compose.py @@ -23,9 +23,11 @@ def get_last_run_info() -> Tuple[str, str]: while retries < max_retries: log_ansi_encoded_string(message="Waiting for DAG Run data...") time.sleep(5) - runs = requests.get( + res = requests.get( "http://localhost:8080/api/v1/dags/sample_data/dagRuns", auth=BASIC_AUTH, timeout=REQUESTS_TIMEOUT - ).json() + ) + res.raise_for_status() + runs = res.json() dag_runs = runs.get("dag_runs") if dag_runs[0].get("dag_run_id"): return dag_runs[0].get("dag_run_id"), "success" diff --git a/ingestion/setup.py b/ingestion/setup.py index fc47b5330e6..13e9498b90a 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -313,6 +313,7 @@ test = { VERSIONS["snowflake"], VERSIONS["elasticsearch8"], VERSIONS["giturlparse"], + "testcontainers==3.7.1", } e2e_test = { diff --git a/docker/__init__.py b/ingestion/src/metadata/__init__.py similarity index 100% rename from docker/__init__.py rename to ingestion/src/metadata/__init__.py diff --git a/ingestion/src/metadata/great_expectations/action.py b/ingestion/src/metadata/great_expectations/action.py index 1ab7feb0131..7a475cc6a10 100644 --- a/ingestion/src/metadata/great_expectations/action.py +++ b/ingestion/src/metadata/great_expectations/action.py @@ -107,7 +107,7 @@ class OpenMetadataValidationAction(ValidationAction): self.config_file_path = config_file_path self.ometa_conn = self._create_ometa_connection() - def _run( # pylint: disable=unused-argument + def _run( # pylint: disable=unused-argument,arguments-renamed self, validation_result_suite: ExpectationSuiteValidationResult, validation_result_suite_identifier: Union[ @@ -124,7 +124,6 @@ class OpenMetadataValidationAction(ValidationAction): validation_result_suite: result suite returned when checkpoint is ran validation_result_suite_identifier: type of result suite data_asset: - payload: 
class NoSQLAdaptorFactory:
    """
    A factory class for creating NoSQL adaptor instances.

    The factory keeps a registry that maps the class name of a source client
    to a callable that wraps an instance of that client in a NoSQL adaptor.
    Constructors are added with `register` and adaptors are built with
    `construct`.

    Attributes:
        _clients (Dict[str, NoSQLAdaptorConstructor]): A dictionary mapping
            source client class names to their adaptor constructors.

    Methods:
        register(source_class: Type, target_class: NoSQLAdaptorConstructor):
            Register an adaptor constructor for a source client class.
        construct(source_client: object) -> NoSQLAdaptor:
            Build the adaptor registered for the type of the given client.
    """

    def __init__(self):
        """
        Initialize a new instance of NoSQLAdaptorFactory with an empty registry.
        """
        self._clients: Dict[str, "NoSQLAdaptorConstructor"] = {}

    def register(self, source_class: Type, target_class: "NoSQLAdaptorConstructor"):
        """
        Register an adaptor constructor for a source client class.

        Registering the same source class twice replaces the previous entry.

        Args:
            source_class (Type): The class of the source client.
            target_class (NoSQLAdaptorConstructor): Callable that receives a
                source client instance and returns the adaptor wrapping it.

        Returns:
            None
        """
        # The registry is keyed by class name, matching the lookup in `construct`.
        self._clients[source_class.__name__] = target_class

    def construct(self, source_client: object) -> "NoSQLAdaptor":
        """
        Build the adaptor registered for the type of the given source client.

        Args:
            source_client (object): The source client instance.

        Returns:
            NoSQLAdaptor: The adaptor wrapping the source client.

        Raises:
            ValueError: If the type of the source client is not registered.
        """
        # `source_client` is an instance, so the name has to be read from its
        # type; instances do not expose `__name__` themselves.
        source_name = type(source_client).__name__
        client_class = self._clients.get(source_name)
        if client_class is None:
            raise ValueError(f"Unknown NoSQL source: {source_name}")
        return client_class(source_client)
class MongoDB(NoSQLAdaptor):
    """NoSQL adaptor that answers profiler queries through a MongoDB client."""

    def __init__(self, client: MongoClient):
        # The client is stored as-is; this adaptor never opens or closes it.
        self.client = client

    def get_row_count(self, table: Table) -> int:
        """
        Count the documents of the collection backing the given table.

        The database is looked up by the name of the table's database schema
        and the collection by the table name.
        """
        database = self.client[table.databaseSchema.name]
        # An empty filter matches every document in the collection.
        return database[table.name.__root__].count_documents({})
class NoSQLAdaptor(ABC):
    """Abstract base for the adaptors used by the NoSQL profiler."""

    @abstractmethod
    def get_row_count(self, table: Table) -> int:
        """Return the row count of the given table; concrete adaptors must override this."""
        raise NotImplementedError()
class NoSQLProfilerInterface(ProfilerInterface):
    """
    Profiler interface for NoSQL sources.

    Metrics are computed through a NoSQLAdaptor built from the service
    connection. Only table level metrics are computed; the remaining
    metric hooks are stubs that return None.
    """

    # pylint: disable=too-many-arguments

    def _get_sampler(self):
        """No sampler is used by this interface."""
        return None

    def _compute_table_metrics(
        self,
        metrics: List[Type[Metric]],
        runner: NoSQLAdaptor,
        *args,
        **kwargs,
    ):
        """
        Compute every table metric through the given adaptor.

        Args:
            metrics: metric classes to compute
            runner: adaptor used to query the source
        Returns:
            dict mapping each metric name to its computed value
        Raises:
            RuntimeError: if any metric fails; the original error is chained
        """
        result = {}
        for metric in metrics:
            try:
                fn = metric().nosql_fn(runner)
                result[metric.name()] = fn(self.table)
            except Exception as exc:
                logger.debug(
                    f"{traceback.format_exc()}\n"
                    f"Error trying to compute metric {metric} for {self.table.fullyQualifiedName}: {exc}"
                )
                # Chain the cause so the original traceback is not lost.
                raise RuntimeError(
                    f"Error trying to compute metric {metric.name()} for {self.table.fullyQualifiedName}: {exc}"
                ) from exc
        return result

    def _compute_static_metrics(
        self,
        metrics: List[Metrics],
        runner: List,
        column,
        *args,
        **kwargs,
    ):
        """Stub: always returns None."""
        return None

    def _compute_query_metrics(
        self,
        metric: Metrics,
        runner,
        *args,
        **kwargs,
    ):
        """Stub: always returns None."""
        return None

    def _compute_window_metrics(
        self,
        metrics: List[Metrics],
        runner,
        *args,
        **kwargs,
    ):
        """Stub: always returns None."""
        return None

    def _compute_system_metrics(
        self,
        metrics: Metrics,
        runner: List,
        *args,
        **kwargs,
    ):
        """Stub: always returns None."""
        return None

    def _compute_custom_metrics(
        self, metrics: List[CustomMetric], runner, *args, **kwargs
    ):
        """Stub: always returns None."""
        return None

    def compute_metrics(
        self,
        client: NoSQLAdaptor,
        metric_func: ThreadPoolMetrics,
    ):
        """
        Run the metrics of a single ThreadPoolMetrics entry.

        Failures are logged and reported to the status instead of being
        raised, and the entry is marked as scanned either way.

        Args:
            client: adaptor used to query the source
            metric_func: metrics to compute together with their type
        Returns:
            tuple of (computed metrics or None on failure, column name or
            None for table level metrics, metric type value)
        """
        logger.debug(f"Running profiler for {metric_func.table}")
        metric_type = metric_func.metric_type.value
        try:
            row = self._get_metric_fn[metric_type](
                metric_func.metrics,
                client,
            )
        except Exception as exc:
            name = f"{metric_func.column if metric_func.column is not None else metric_func.table}"
            # Report the actual metric type rather than a literal placeholder.
            error = f"{name} {metric_type}: {exc}"
            logger.error(error)
            self.status.failed_profiler(error, traceback.format_exc())
            row = None
        if metric_func.column is not None:
            column = metric_func.column.name
            self.status.scanned(f"{metric_func.table.name.__root__}.{column}")
        else:
            self.status.scanned(metric_func.table.name.__root__)
            column = None
        return row, column, metric_type

    def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData:
        """Stub: always returns None."""
        return None

    def get_composed_metrics(
        self, column: Column, metric: Metrics, column_results: Dict
    ):
        """Stub: always returns None."""
        return None

    def get_hybrid_metrics(
        self, column: Column, metric: Metrics, column_results: Dict, **kwargs
    ):
        """Stub: always returns None."""
        return None

    def get_all_metrics(
        self,
        metric_funcs: List[ThreadPoolMetrics],
    ):
        """
        Compute all profiler metrics and group them by scope.

        Args:
            metric_funcs: metric groups to compute
        Returns:
            dict with the "table" and "columns" results, plus a "system"
            entry when system metrics produced a result
        """
        profile_results = {"table": {}, "columns": {}}
        runner = factory.construct(self.connection)
        metric_list = [
            self.compute_metrics(runner, metric_func) for metric_func in metric_funcs
        ]
        for profile, column, metric_type in metric_list:
            if not profile:
                # Failed or empty computations contribute nothing.
                continue
            if metric_type == MetricTypes.Table.value:
                profile_results["table"].update(profile)
            elif metric_type == MetricTypes.System.value:
                profile_results["system"] = profile
            elif metric_type == MetricTypes.Custom.value and column is None:
                # Custom metrics without a column belong to the table.
                profile_results["table"].update(profile)
        return profile_results

    @property
    def table(self):
        """OM Table entity"""
        return self.table_entity

    def get_columns(self) -> List[Optional[SQALikeColumn]]:
        """Return no columns: column level profiling is not performed here."""
        return []

    def close(self):
        """Close the underlying connection."""
        self.connection.close()
metadata.generated.schema.entity.services.connections.database.singleStoreConnection import ( SingleStoreConnection, ) @@ -43,6 +46,7 @@ from metadata.generated.schema.entity.services.connections.database.unityCatalog UnityCatalogConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseConnection +from metadata.profiler.interface.nosql.profiler_interface import NoSQLProfilerInterface from metadata.profiler.interface.pandas.profiler_interface import ( PandasProfilerInterface, ) @@ -118,6 +122,7 @@ profilers = { UnityCatalogConnection.__name__: UnityCatalogProfilerInterface, DatabricksConnection.__name__: DatabricksProfilerInterface, Db2Connection.__name__: DB2ProfilerInterface, + MongoDBConnection.__name__: NoSQLProfilerInterface, } profiler_interface_factory.register_many(profilers) diff --git a/ingestion/src/metadata/profiler/metrics/core.py b/ingestion/src/metadata/profiler/metrics/core.py index 9cc219777a5..70e387a7dae 100644 --- a/ingestion/src/metadata/profiler/metrics/core.py +++ b/ingestion/src/metadata/profiler/metrics/core.py @@ -18,11 +18,14 @@ Metric Core definitions from abc import ABC, abstractmethod from enum import Enum from functools import wraps -from typing import Any, Dict, Optional, Tuple, TypeVar +from typing import Any, Callable, Dict, Optional, Tuple, TypeVar from sqlalchemy import Column from sqlalchemy.orm import DeclarativeMeta, Session +from metadata.generated.schema.entity.data.table import Table +from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor + # When creating complex metrics, use inherit_cache = CACHE CACHE = True @@ -87,6 +90,9 @@ def add_props(**kwargs): return inner +T = TypeVar("T") + + class Metric(ABC): """ Parent class metric @@ -153,6 +159,13 @@ class Metric(ABC): """ return self.col.type.python_type if self.col else None + def nosql_fn(self, client: NoSQLAdaptor) -> Callable[[Table], Optional[T]]: + """ + Return the function to be used for NoSQL clients to calculate the 
class MultipleException(Exception):
    """Raised when more than one error was collected; they are kept in `exceptions`."""

    def __init__(self, exceptions):
        self.exceptions = exceptions
        super().__init__(f"Multiple exceptions occurred: {exceptions}")


class ErrorHandler:
    """
    Collects the errors raised by the callables it runs so they can be raised later.
    Meant to be obtained through `accumulate_errors`, which raises whatever was
    collected when its block ends. Useful for clean up code where every step
    must be attempted even if a previous one failed.
    Example:
    ```
    from metadata.utils.test_utils import accumulate_errors
    with accumulate_errors() as error_handler:
        error_handler.try_execute(lambda : 1 / 0)
        error_handler.try_execute(print, "Hello, World!")
    ```

    ```
    > Hello, World!
    > Traceback (most recent call last):
    >  ...
    > ZeroDivisionError: division by zero
    ```
    """

    def __init__(self):
        self.errors = []

    def try_execute(self, func, *args, **kwargs):
        """Call `func` with the given arguments, recording any Exception it raises."""
        try:
            func(*args, **kwargs)
        except Exception as error:
            self.errors.append(error)

    def raise_if_errors(self):
        """
        Raise what has been collected so far: nothing when there are no errors,
        the error itself when there is exactly one, MultipleException otherwise.
        """
        collected = len(self.errors)
        if collected == 0:
            return
        if collected == 1:
            raise self.errors[0]
        raise MultipleException(self.errors)


@contextmanager
def accumulate_errors():
    """Yield a fresh ErrorHandler and raise its collected errors when the block ends."""
    handler = ErrorHandler()
    try:
        yield handler
    finally:
        handler.raise_if_errors()
SERVICE_NAME = Path(__file__).stem


def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str):
    """
    Build the metadata ingestion workflow configuration for a local MongoDB.

    Args:
        mongo_port: port on localhost where MongoDB is reachable
        mongo_user: MongoDB user name
        mongo_pass: MongoDB password
    Returns:
        workflow configuration as a dict
    """
    return {
        "source": {
            "type": "mongodb",
            "serviceName": SERVICE_NAME,
            "serviceConnection": {
                "config": {
                    "type": "MongoDB",
                    "hostPort": f"localhost:{mongo_port}",
                    "username": mongo_user,
                    "password": mongo_pass,
                }
            },
            "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
        },
        "sink": {"type": "metadata-rest", "config": {}},
        "workflowConfig": {
            "openMetadataServerConfig": {
                "hostPort": "http://localhost:8585/api",
                "authProvider": "openmetadata",
                "securityConfig": {
                    "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
                },
            }
        },
    }


TEST_DATABASE = "test-database"
EMPTY_COLLECTION = "empty-collection"
TEST_COLLECTION = "test-collection"
TEST_DATA = [
    {
        "first_name": "John",
        "last_name": "Doe",
        "age": 30,
    },
    {
        "first_name": "Jane",
        "last_name": "Doe",
        "age": 25,
    },
    {
        "first_name": "John",
        "last_name": "Smith",
        "age": 35,
    },
]


class NoSQLProfiler(TestCase):
    """NoSQL profiler E2E test running against a MongoDB container"""

    @classmethod
    def setUpClass(cls) -> None:
        cls.metadata = int_admin_ometa()

    def setUp(self) -> None:
        """Start MongoDB, load the test data and ingest its metadata."""
        self.mongo_container = MongoDbContainer("mongo:7.0.5-jammy")
        self.mongo_container.start()
        try:
            self.client = MongoClient(self.mongo_container.get_connection_url())
            self.db = self.client[TEST_DATABASE]
            self.collection = self.db[TEST_COLLECTION]
            self.collection.insert_many(TEST_DATA)
            self.db.create_collection(EMPTY_COLLECTION)
            self.ingestion_config = get_ingestion_config(
                self.mongo_container.get_exposed_port("27017"), "test", "test"
            )
            self._ingest_metadata()
        except Exception:
            # unittest does not call tearDown when setUp raises, so the
            # container has to be released here to avoid leaking it.
            self.mongo_container.stop(force=True)
            raise

    def _ingest_metadata(self) -> None:
        """Run the metadata ingestion workflow, always stopping it afterwards."""
        ingestion_workflow = MetadataWorkflow.create(
            self.ingestion_config,
        )
        try:
            ingestion_workflow.execute()
            ingestion_workflow.raise_from_status()
            print_status(ingestion_workflow)
        finally:
            ingestion_workflow.stop()

    def tearDown(self):
        """Release every resource, attempting all steps even if one fails."""
        with accumulate_errors() as error_handler:
            error_handler.try_execute(self.client.close)
            error_handler.try_execute(partial(self.mongo_container.stop, force=True))
            error_handler.try_execute(self.delete_service)

    def delete_service(self):
        """Hard delete the ingested database service and everything under it."""
        service_id = str(
            self.metadata.get_by_name(
                entity=DatabaseService, fqn=SERVICE_NAME
            ).id.__root__
        )
        self.metadata.delete(
            entity=DatabaseService,
            entity_id=service_id,
            recursive=True,
            hard_delete=True,
        )

    def test_setup_teardown(self):
        """Passes when setUp and tearDown complete without errors."""

    def test_row_count(self):
        """Row counts are profiled per collection and no column profile is produced."""
        workflow_config = deepcopy(self.ingestion_config)
        workflow_config["source"]["sourceConfig"]["config"].update(
            {
                "type": "Profiler",
            }
        )
        workflow_config["processor"] = {
            "type": "orm-profiler",
            "config": {},
        }
        profiler_workflow = ProfilerWorkflow.create(workflow_config)
        profiler_workflow.execute()
        status = profiler_workflow.result_status()
        profiler_workflow.stop()

        assert status == 0

        expectations = {TEST_COLLECTION: 3, EMPTY_COLLECTION: 0}

        for collection, expected_row_count in expectations.items():
            collection_profile = self.metadata.get_profile_data(
                f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}",
                datetime_to_ts(datetime.now() - timedelta(seconds=10)),
                get_end_of_day_timestamp_mill(),
            )
            assert collection_profile.entities
            assert collection_profile.entities[-1].rowCount == expected_row_count

        # This check does not depend on the collection being iterated, so it
        # runs once instead of once per expectation.
        column_profile = self.metadata.get_profile_data(
            f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}.age",
            datetime_to_ts(datetime.now() - timedelta(seconds=10)),
            get_end_of_day_timestamp_mill(),
            profile_type=ColumnProfile,
        )
        assert len(column_profile.entities) == 0
"supportsMetadataExtraction": { "title": "Supports Metadata Extraction", "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction" + }, + "supportsProfiler": { + "title": "Supports Profiler", + "$ref": "../connectionBasicType.json#/definitions/supportsProfiler" } }, "required": ["hostPort"], diff --git a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/MongoDB.md b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/MongoDB.md index ba93db08c17..8b49227fc8d 100644 --- a/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/MongoDB.md +++ b/openmetadata-ui/src/main/resources/ui/public/locales/en-US/Database/MongoDB.md @@ -1,4 +1,4 @@ -# Hive +# MongoDB In this section, we provide guides and references to use the MongoDB connector. You can view the full documentation for MongoDB [here](https://docs.open-metadata.org/connectors/database/mongo). ## Requirements