From ff2ecc56f24669c9643a9a27ff815e644c71c05f Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Thu, 22 Feb 2024 16:31:58 +0100 Subject: [PATCH] MINOR: add MongoDB sample data (#15237) * feat(nosql-profiler): row count 1. Implemented the NoSQLProfilerInterface as an entrypoint for the nosql profiler. 2. Added the NoSQLMetric as an abstract class. 3. Implemented the interface for the MongoDB database source. 4. Implemented an e2e test using testcontainers. * added profiler support for mongodb connection * doc * use int_admin_ometa in test setup * - fixed linting issue in gx - removed unused inheritance * moved the nosql function into the metric class * formatting * validate_compose: raise exception for bad status code. * fixed import * format * feat(nosql-profiler): added sample data 1. Implemented the NoSQL sampler. 2. Some naming changes to the NoSQL adaptor to avoid fixing names with the profiler interface. 3. Tests. * added default sample limit --- .../src/metadata/profiler/adaptors/mongodb.py | 65 ++++++- .../profiler/adaptors/nosql_adaptor.py | 16 +- .../interface/nosql/profiler_interface.py | 31 +++- .../profiler/metrics/static/row_count.py | 2 +- .../processor/sampler/nosql/sampler.py | 72 ++++++++ .../processor/sampler/sampler_factory.py | 5 + .../processor/sampler/sampler_interface.py | 4 +- .../profiler/test_nosql_profiler.py | 166 ++++++++++++++---- 8 files changed, 310 insertions(+), 51 deletions(-) create mode 100644 ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py diff --git a/ingestion/src/metadata/profiler/adaptors/mongodb.py b/ingestion/src/metadata/profiler/adaptors/mongodb.py index bf28402d998..edbb637b263 100644 --- a/ingestion/src/metadata/profiler/adaptors/mongodb.py +++ b/ingestion/src/metadata/profiler/adaptors/mongodb.py @@ -11,19 +11,80 @@ """ MongoDB adaptor for the NoSQL profiler. """ +import json +from dataclasses import dataclass, field +from typing import Dict, List, Optional + from pymongo import MongoClient -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import Column, Table from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor +@dataclass +class Query: + database: str + collection: str + filter: dict = field(default_factory=dict) + limit: Optional[int] = None + + def to_executable(self, client: MongoClient): + db = client[self.database] + collection = db[self.collection] + query = collection.find(self.filter) + if self.limit: + query = query.limit(self.limit) + return query + + class MongoDB(NoSQLAdaptor): """A MongoDB client that serves as an adaptor for profiling data assets on MongoDB""" def __init__(self, client: MongoClient): self.client = client - def get_row_count(self, table: Table) -> int: + def item_count(self, table: Table) -> int: db = self.client[table.databaseSchema.name] collection = db[table.name.__root__] return collection.count_documents({}) + + def scan( + self, table: Table, columns: List[Column], limit: int + ) -> List[Dict[str, any]]: + return self.execute( + Query( + database=table.databaseSchema.name, + collection=table.name.__root__, + limit=limit, + ) + ) + + def query( + self, table: Table, columns: List[Column], query: any, limit: int + ) -> List[Dict[str, any]]: + try: + json_query = json.loads(query) + except json.JSONDecodeError: + raise ValueError("Invalid JSON query") + return self.execute( + Query( + database=table.databaseSchema.name, + collection=table.name.__root__, + filter=json_query, + ) + ) + + def execute(self, query: Query) -> List[Dict[str, any]]: + records = list(query.to_executable(self.client)) + result = [] + for r in records: + result.append({c: self._json_safe(r.get(c)) for c in r}) + return result + + @staticmethod + def _json_safe(data: any): + try: + json.dumps(data) + return data + except Exception: + return str(data) diff --git a/ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py b/ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py index 40eedb5d5d9..e80564ddf7a 100644 --- a/ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py +++ b/ingestion/src/metadata/profiler/adaptors/nosql_adaptor.py @@ -12,11 +12,23 @@ NoSQL adaptor for the NoSQL profiler. """ from abc import ABC, abstractmethod +from typing import Dict, List -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import Column, Table class NoSQLAdaptor(ABC): @abstractmethod - def get_row_count(self, table: Table) -> int: + def item_count(self, table: Table) -> int: + raise NotImplementedError + + @abstractmethod + def scan( + self, table: Table, columns: List[Column], limit: int + ) -> List[Dict[str, any]]: + pass + + def query( + self, table: Table, columns: List[Column], query: any, limit: int + ) -> List[Dict[str, any]]: raise NotImplementedError diff --git a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py index f1a16961a38..0ec581d2266 100644 --- a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py @@ -27,6 +27,7 @@ from metadata.profiler.api.models import ThreadPoolMetrics from metadata.profiler.interface.profiler_interface import ProfilerInterface from metadata.profiler.metrics.core import Metric, MetricTypes from metadata.profiler.metrics.registry import Metrics +from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler from metadata.utils.logger import profiler_interface_registry_logger from metadata.utils.sqa_like_column import SQALikeColumn @@ -41,8 +42,9 @@ class NoSQLProfilerInterface(ProfilerInterface): # pylint: disable=too-many-arguments - def _get_sampler(self): - return None + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sampler = self._get_sampler() def _compute_table_metrics( self, @@ -119,6 +121,7 @@ class NoSQLProfilerInterface(ProfilerInterface): row = self._get_metric_fn[metric_func.metric_type.value]( metric_func.metrics, client, + column=metric_func.column, ) except Exception as exc: name = f"{metric_func.column if metric_func.column is not None else metric_func.table}" @@ -134,8 +137,23 @@ class NoSQLProfilerInterface(ProfilerInterface): column = None return row, column, metric_func.metric_type.value - def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData: - return None + def fetch_sample_data(self, table, columns: List[SQALikeColumn]) -> TableData: + return self.sampler.fetch_sample_data(columns) + + def _get_sampler(self) -> NoSQLSampler: + """Get NoSQL sampler from config""" + from metadata.profiler.processor.sampler.sampler_factory import ( # pylint: disable=import-outside-toplevel + sampler_factory_, + ) + + return sampler_factory_.create( + self.service_connection_config.__class__.__name__, + table=self.table, + client=factory.construct(self.connection), + profile_sample_config=self.profile_sample_config, + partition_details=self.partition_details, + profile_sample_query=self.profile_query, + ) def get_composed_metrics( self, column: Column, metric: Metrics, column_results: Dict @@ -176,7 +194,10 @@ class NoSQLProfilerInterface(ProfilerInterface): return self.table_entity def get_columns(self) -> List[Optional[SQALikeColumn]]: - return [] + return [ + SQALikeColumn(name=c.name.__root__, type=c.dataType) + for c in self.table.columns + ] def close(self): self.connection.close() diff --git a/ingestion/src/metadata/profiler/metrics/static/row_count.py b/ingestion/src/metadata/profiler/metrics/static/row_count.py index dd2a221de68..c3f70f9d152 100644 --- a/ingestion/src/metadata/profiler/metrics/static/row_count.py +++ b/ingestion/src/metadata/profiler/metrics/static/row_count.py @@ -54,4 +54,4 @@ class RowCount(StaticMetric): @classmethod def nosql_fn(cls, client: NoSQLAdaptor) -> Callable[[Table], int]: - return client.get_row_count + return client.item_count diff --git a/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py b/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py new file mode 100644 index 00000000000..333d5ae712a --- /dev/null +++ b/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py @@ -0,0 +1,72 @@ +from typing import Dict, List, Optional, Tuple + +from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData +from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor +from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface +from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT +from metadata.utils.sqa_like_column import SQALikeColumn + + +class NoSQLSampler(SamplerInterface): + client: NoSQLAdaptor + + def _rdn_sample_from_user_query(self) -> List[Dict[str, any]]: + """ + Get random sample from user query + """ + limit = self._get_limit() + return self.client.query( + self.table, self.table.columns, self._profile_sample_query, limit + ) + + def _fetch_sample_data_from_user_query(self) -> TableData: + """ + Fetch sample data based on a user query. Assuming the enging has one (example: MongoDB) + If the engine does not support a custom query, an error will be raised. + """ + records = self._rdn_sample_from_user_query() + columns = [ + SQALikeColumn(name=column.name.__root__, type=column.dataType) + for column in self.table.columns + ] + rows, cols = self.transpose_records(records, columns) + return TableData(rows=rows, columns=[c.name for c in cols]) + + def random_sample(self): + pass + + def fetch_sample_data(self, columns: List[SQALikeColumn]) -> TableData: + if self._profile_sample_query: + return self._fetch_sample_data_from_user_query() + return self._fetch_sample_data(columns) + + def _fetch_sample_data(self, columns: List[SQALikeColumn]): + """ + returns sampled ometa dataframes + """ + limit = self._get_limit() + records = self.client.scan(self.table, self.table.columns, limit) + rows, cols = self.transpose_records(records, columns) + return TableData(rows=rows, columns=[col.name for col in cols]) + + def _get_limit(self) -> Optional[int]: + num_rows = self.client.item_count(self.table) + if self.profile_sample_type == ProfileSampleType.PERCENTAGE: + limit = num_rows * (self.profile_sample / 100) + elif self.profile_sample_type == ProfileSampleType.ROWS: + limit = self.profile_sample + else: + limit = SAMPLE_DATA_DEFAULT_COUNT + return limit + + @staticmethod + def transpose_records( + records: List[Dict[str, any]], columns: List[SQALikeColumn] + ) -> Tuple[List[List[any]], List[SQALikeColumn]]: + rows = [] + for record in records: + row = [] + for column in columns: + row.append(record.get(column.name)) + rows.append(row) + return rows, columns diff --git a/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py b/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py index e7c0f25e7e5..88584f4eb24 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py +++ b/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py @@ -21,10 +21,14 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn from metadata.generated.schema.entity.services.connections.database.datalakeConnection import ( DatalakeConnection, ) +from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import ( + MongoDBConnection, +) from metadata.generated.schema.entity.services.connections.database.trinoConnection import ( TrinoConnection, ) from metadata.generated.schema.entity.services.databaseService import DatabaseConnection +from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler from metadata.profiler.processor.sampler.pandas.sampler import DatalakeSampler from metadata.profiler.processor.sampler.sqlalchemy.bigquery.sampler import ( BigQuerySampler, @@ -59,3 +63,4 @@ sampler_factory_.register(DatabaseConnection.__name__, SQASampler) sampler_factory_.register(BigQueryConnection.__name__, BigQuerySampler) sampler_factory_.register(DatalakeConnection.__name__, DatalakeSampler) sampler_factory_.register(TrinoConnection.__name__, TrinoSampler) +sampler_factory_.register(MongoDBConnection.__name__, NoSQLSampler) diff --git a/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py b/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py index 8711affa2c4..daba85fcebc 100644 --- a/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py +++ b/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py @@ -17,7 +17,7 @@ from typing import Dict, List, Optional, Union from sqlalchemy import Column -from metadata.generated.schema.entity.data.table import TableData +from metadata.generated.schema.entity.data.table import Table, TableData from metadata.profiler.api.models import ProfileSampleConfig from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT from metadata.utils.sqa_like_column import SQALikeColumn @@ -29,7 +29,7 @@ class SamplerInterface(ABC): def __init__( self, client, - table, + table: Table, profile_sample_config: Optional[ProfileSampleConfig] = None, partition_details: Optional[Dict] = None, profile_sample_query: Optional[str] = None, diff --git a/ingestion/tests/integration/profiler/test_nosql_profiler.py b/ingestion/tests/integration/profiler/test_nosql_profiler.py index ad0249b81fa..775d225cffe 100644 --- a/ingestion/tests/integration/profiler/test_nosql_profiler.py +++ b/ingestion/tests/integration/profiler/test_nosql_profiler.py @@ -27,14 +27,18 @@ from copy import deepcopy from datetime import datetime, timedelta from functools import partial from pathlib import Path +from random import choice, randint from unittest import TestCase -from pymongo import MongoClient +from pymongo import MongoClient, database from testcontainers.mongodb import MongoDbContainer from ingestion.tests.integration.integration_base import int_admin_ometa -from metadata.generated.schema.entity.data.table import ColumnProfile +from metadata.generated.schema.entity.data.table import ColumnProfile, Table from metadata.generated.schema.entity.services.databaseService import DatabaseService +from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.profiler.api.models import TableConfig +from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT from metadata.utils.helpers import datetime_to_ts from metadata.utils.test_utils import accumulate_errors from metadata.utils.time_utils import get_end_of_day_timestamp_mill @@ -45,6 +49,13 @@ from metadata.workflow.workflow_output_handler import print_status SERVICE_NAME = Path(__file__).stem +def add_query_config(config, table_config: TableConfig) -> dict: + config_copy = deepcopy(config) + config_copy["processor"]["config"].setdefault("tableConfig", []) + config_copy["processor"]["config"]["tableConfig"].append(table_config) + return config_copy + + def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str): return { "source": { @@ -76,63 +87,66 @@ def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str): TEST_DATABASE = "test-database" EMPTY_COLLECTION = "empty-collection" TEST_COLLECTION = "test-collection" -TEST_DATA = [ - { - "first_name": "John", - "last_name": "Doe", - "age": 30, - }, - { - "first_name": "Jane", - "last_name": "Doe", - "age": 25, - }, - { - "first_name": "John", - "last_name": "Smith", - "age": 35, - }, -] +NUM_ROWS = 200 + + +def random_row(): + return { + "name": choice(["John", "Jane", "Alice", "Bob"]), + "age": randint(20, 60), + "city": choice(["New York", "Chicago", "San Francisco"]), + "nested": {"key": "value" + str(randint(1, 10))}, + } + + +TEST_DATA = [random_row() for _ in range(NUM_ROWS)] class NoSQLProfiler(TestCase): """datalake profiler E2E test""" + mongo_container: MongoDbContainer + client: MongoClient + db: database.Database + collection: database.Collection + ingestion_config: dict + metadata: OpenMetadata + @classmethod def setUpClass(cls) -> None: cls.metadata = int_admin_ometa() - - def setUp(self) -> None: - self.mongo_container = MongoDbContainer("mongo:7.0.5-jammy") - self.mongo_container.start() - self.client = MongoClient(self.mongo_container.get_connection_url()) - self.db = self.client[TEST_DATABASE] - self.collection = self.db[TEST_COLLECTION] - self.collection.insert_many(TEST_DATA) - self.db.create_collection(EMPTY_COLLECTION) - self.ingestion_config = get_ingestion_config( - self.mongo_container.get_exposed_port("27017"), "test", "test" + cls.mongo_container = MongoDbContainer("mongo:7.0.5-jammy") + cls.mongo_container.start() + cls.client = MongoClient(cls.mongo_container.get_connection_url()) + cls.db = cls.client[TEST_DATABASE] + cls.collection = cls.db[TEST_COLLECTION] + cls.collection.insert_many(TEST_DATA) + cls.db.create_collection(EMPTY_COLLECTION) + cls.ingestion_config = get_ingestion_config( + cls.mongo_container.get_exposed_port("27017"), "test", "test" ) ingestion_workflow = MetadataWorkflow.create( - self.ingestion_config, + cls.ingestion_config, ) ingestion_workflow.execute() ingestion_workflow.raise_from_status() print_status(ingestion_workflow) ingestion_workflow.stop() - def tearDown(self): + @classmethod + def tearDownClass(cls): with accumulate_errors() as error_handler: - error_handler.try_execute(partial(self.mongo_container.stop, force=True)) - error_handler.try_execute(self.delete_service) + error_handler.try_execute(partial(cls.mongo_container.stop, force=True)) + error_handler.try_execute(cls.delete_service) - def delete_service(self): + @classmethod + def delete_service(cls): service_id = str( - self.metadata.get_by_name( + cls.metadata.get_by_name( entity=DatabaseService, fqn=SERVICE_NAME ).id.__root__ ) - self.metadata.delete( + cls.metadata.delete( entity=DatabaseService, entity_id=service_id, recursive=True, @@ -140,9 +154,12 @@ class NoSQLProfiler(TestCase): ) def test_setup_teardown(self): + """ + does nothing. useful to check if the setup and teardown methods are working + """ pass - def test_row_count(self): + def test_simple(self): workflow_config = deepcopy(self.ingestion_config) workflow_config["source"]["sourceConfig"]["config"].update( { @@ -160,7 +177,7 @@ class NoSQLProfiler(TestCase): assert status == 0 - expectations = {TEST_COLLECTION: 3, EMPTY_COLLECTION: 0} + expectations = {TEST_COLLECTION: len(TEST_DATA), EMPTY_COLLECTION: 0} for collection, expected_row_count in expectations.items(): collection_profile = self.metadata.get_profile_data( @@ -177,3 +194,74 @@ class NoSQLProfiler(TestCase): profile_type=ColumnProfile, ) assert len(column_profile.entities) == 0 + + table = self.metadata.get_by_name( + Table, f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}" + ) + sample_data = self.metadata.get_sample_data(table) + assert [c.__root__ for c in sample_data.sampleData.columns] == [ + "_id", + "name", + "age", + "city", + "nested", + ] + assert len(sample_data.sampleData.rows) == SAMPLE_DATA_DEFAULT_COUNT + + def test_custom_query(self): + workflow_config = deepcopy(self.ingestion_config) + workflow_config["source"]["sourceConfig"]["config"].update( + { + "type": "Profiler", + } + ) + query_age = TEST_DATA[0]["age"] + workflow_config["processor"] = { + "type": "orm-profiler", + "config": { + "tableConfig": [ + { + "fullyQualifiedName": f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}", + "profileQuery": '{"age": %s}' % query_age, + } + ], + }, + } + profiler_workflow = ProfilerWorkflow.create(workflow_config) + profiler_workflow.execute() + status = profiler_workflow.result_status() + profiler_workflow.stop() + + assert status == 0 + + # query profiler in MongoDB does not change the item count + expectations = {TEST_COLLECTION: len(TEST_DATA), EMPTY_COLLECTION: 0} + + for collection, expected_row_count in expectations.items(): + collection_profile = self.metadata.get_profile_data( + f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}", + datetime_to_ts(datetime.now() - timedelta(seconds=10)), + get_end_of_day_timestamp_mill(), + ) + assert collection_profile.entities, collection + assert ( + collection_profile.entities[-1].rowCount == expected_row_count + ), collection + column_profile = self.metadata.get_profile_data( + f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}.age", + datetime_to_ts(datetime.now() - timedelta(seconds=10)), + get_end_of_day_timestamp_mill(), + profile_type=ColumnProfile, + ) + assert len(column_profile.entities) == 0, collection + + table = self.metadata.get_by_name( + Table, f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}" + ) + sample_data = self.metadata.get_sample_data(table) + age_column_index = [ + col.__root__ for col in sample_data.sampleData.columns + ].index("age") + assert all( + [r[age_column_index] == query_age for r in sample_data.sampleData.rows] + )