diff --git a/ingestion/src/metadata/profiler/adaptors/mongodb.py b/ingestion/src/metadata/profiler/adaptors/mongodb.py
index bf28402d998..edbb637b263 100644
--- a/ingestion/src/metadata/profiler/adaptors/mongodb.py
+++ b/ingestion/src/metadata/profiler/adaptors/mongodb.py
@@ -11,19 +11,80 @@
 """
 MongoDB adaptor for the NoSQL profiler.
 """
+import json
+from dataclasses import dataclass, field
+from typing import Any, Dict, List, Optional
+
 from pymongo import MongoClient
 
-from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.table import Column, Table
 from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
 
+
+@dataclass
+class Query:
+    database: str
+    collection: str
+    filter: dict = field(default_factory=dict)
+    limit: Optional[int] = None
+
+    def to_executable(self, client: MongoClient):
+        db = client[self.database]
+        collection = db[self.collection]
+        query = collection.find(self.filter)
+        if self.limit:
+            query = query.limit(self.limit)
+        return query
+
+
 class MongoDB(NoSQLAdaptor):
     """A MongoDB client that serves as an adaptor for profiling data assets on MongoDB"""
 
     def __init__(self, client: MongoClient):
         self.client = client
 
-    def get_row_count(self, table: Table) -> int:
+    def item_count(self, table: Table) -> int:
         db = self.client[table.databaseSchema.name]
         collection = db[table.name.__root__]
         return collection.count_documents({})
+
+    def scan(
+        self, table: Table, columns: List[Column], limit: int
+    ) -> List[Dict[str, Any]]:
+        return self.execute(
+            Query(
+                database=table.databaseSchema.name,
+                collection=table.name.__root__,
+                limit=limit,
+            )
+        )
+
+    def query(
+        self, table: Table, columns: List[Column], query: Any, limit: int
+    ) -> List[Dict[str, Any]]:
+        try:
+            json_query = json.loads(query)
+        except json.JSONDecodeError as exc:
+            raise ValueError("Invalid JSON query") from exc
+        return self.execute(
+            Query(
+                database=table.databaseSchema.name,
+                collection=table.name.__root__,
+                filter=json_query,
+            )
+        )
+
+    def execute(self, query: Query) -> List[Dict[str, Any]]:
+        records = list(query.to_executable(self.client))
+        result = []
+        for r in records:
+            result.append({c: self._json_safe(r.get(c)) for c in r})
+        return result
+
+    @staticmethod
+    def _json_safe(data: Any):
+        try:
+            json.dumps(data)
+            return data
+        except Exception:
+            return str(data)
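A minimal sketch of how the new `Query` dataclass maps onto a pymongo `find()` call. The connection URI, database, and collection names below are hypothetical, for illustration only:

```python
from pymongo import MongoClient

from metadata.profiler.adaptors.mongodb import MongoDB, Query

# Hypothetical local instance; any reachable MongoDB URI works here.
client = MongoClient("mongodb://localhost:27017")
adaptor = MongoDB(client)

# Equivalent to client["test-database"]["users"].find({"age": {"$gt": 30}}).limit(10);
# execute() then passes every value through _json_safe, stringifying anything
# that is not JSON-serializable (e.g. ObjectId, datetime).
records = adaptor.execute(
    Query(
        database="test-database",
        collection="users",
        filter={"age": {"$gt": 30}},
        limit=10,
    )
)
```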
""" from abc import ABC, abstractmethod +from typing import Dict, List -from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.data.table import Column, Table class NoSQLAdaptor(ABC): @abstractmethod - def get_row_count(self, table: Table) -> int: + def item_count(self, table: Table) -> int: + raise NotImplementedError + + @abstractmethod + def scan( + self, table: Table, columns: List[Column], limit: int + ) -> List[Dict[str, any]]: + pass + + def query( + self, table: Table, columns: List[Column], query: any, limit: int + ) -> List[Dict[str, any]]: raise NotImplementedError diff --git a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py index f1a16961a38..0ec581d2266 100644 --- a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py @@ -27,6 +27,7 @@ from metadata.profiler.api.models import ThreadPoolMetrics from metadata.profiler.interface.profiler_interface import ProfilerInterface from metadata.profiler.metrics.core import Metric, MetricTypes from metadata.profiler.metrics.registry import Metrics +from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler from metadata.utils.logger import profiler_interface_registry_logger from metadata.utils.sqa_like_column import SQALikeColumn @@ -41,8 +42,9 @@ class NoSQLProfilerInterface(ProfilerInterface): # pylint: disable=too-many-arguments - def _get_sampler(self): - return None + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.sampler = self._get_sampler() def _compute_table_metrics( self, @@ -119,6 +121,7 @@ class NoSQLProfilerInterface(ProfilerInterface): row = self._get_metric_fn[metric_func.metric_type.value]( metric_func.metrics, client, + column=metric_func.column, ) except Exception as exc: name = f"{metric_func.column if metric_func.column is not None else metric_func.table}" @@ -134,8 +137,23 @@ class NoSQLProfilerInterface(ProfilerInterface): column = None return row, column, metric_func.metric_type.value - def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData: - return None + def fetch_sample_data(self, table, columns: List[SQALikeColumn]) -> TableData: + return self.sampler.fetch_sample_data(columns) + + def _get_sampler(self) -> NoSQLSampler: + """Get NoSQL sampler from config""" + from metadata.profiler.processor.sampler.sampler_factory import ( # pylint: disable=import-outside-toplevel + sampler_factory_, + ) + + return sampler_factory_.create( + self.service_connection_config.__class__.__name__, + table=self.table, + client=factory.construct(self.connection), + profile_sample_config=self.profile_sample_config, + partition_details=self.partition_details, + profile_sample_query=self.profile_query, + ) def get_composed_metrics( self, column: Column, metric: Metrics, column_results: Dict @@ -176,7 +194,10 @@ class NoSQLProfilerInterface(ProfilerInterface): return self.table_entity def get_columns(self) -> List[Optional[SQALikeColumn]]: - return [] + return [ + SQALikeColumn(name=c.name.__root__, type=c.dataType) + for c in self.table.columns + ] def close(self): self.connection.close() diff --git a/ingestion/src/metadata/profiler/metrics/static/row_count.py b/ingestion/src/metadata/profiler/metrics/static/row_count.py index dd2a221de68..c3f70f9d152 100644 --- a/ingestion/src/metadata/profiler/metrics/static/row_count.py +++ 
diff --git a/ingestion/src/metadata/profiler/metrics/static/row_count.py b/ingestion/src/metadata/profiler/metrics/static/row_count.py
index dd2a221de68..c3f70f9d152 100644
--- a/ingestion/src/metadata/profiler/metrics/static/row_count.py
+++ b/ingestion/src/metadata/profiler/metrics/static/row_count.py
@@ -54,4 +54,4 @@ class RowCount(StaticMetric):
 
     @classmethod
     def nosql_fn(cls, client: NoSQLAdaptor) -> Callable[[Table], int]:
-        return client.get_row_count
+        return client.item_count
diff --git a/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py b/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py
new file mode 100644
index 00000000000..333d5ae712a
--- /dev/null
+++ b/ingestion/src/metadata/profiler/processor/sampler/nosql/sampler.py
@@ -0,0 +1,72 @@
+from typing import Any, Dict, List, Optional, Tuple
+
+from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData
+from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
+from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface
+from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
+from metadata.utils.sqa_like_column import SQALikeColumn
+
+
+class NoSQLSampler(SamplerInterface):
+    client: NoSQLAdaptor
+
+    def _rdn_sample_from_user_query(self) -> List[Dict[str, Any]]:
+        """
+        Get random sample from user query
+        """
+        limit = self._get_limit()
+        return self.client.query(
+            self.table, self.table.columns, self._profile_sample_query, limit
+        )
+
+    def _fetch_sample_data_from_user_query(self) -> TableData:
+        """
+        Fetch sample data based on a user query, assuming the engine supports one (example: MongoDB).
+        If the engine does not support a custom query, an error will be raised.
+        """
+        records = self._rdn_sample_from_user_query()
+        columns = [
+            SQALikeColumn(name=column.name.__root__, type=column.dataType)
+            for column in self.table.columns
+        ]
+        rows, cols = self.transpose_records(records, columns)
+        return TableData(rows=rows, columns=[c.name for c in cols])
+
+    def random_sample(self):
+        pass
+
+    def fetch_sample_data(self, columns: List[SQALikeColumn]) -> TableData:
+        if self._profile_sample_query:
+            return self._fetch_sample_data_from_user_query()
+        return self._fetch_sample_data(columns)
+
+    def _fetch_sample_data(self, columns: List[SQALikeColumn]):
+        """
+        returns sampled ometa dataframes
+        """
+        limit = self._get_limit()
+        records = self.client.scan(self.table, self.table.columns, limit)
+        rows, cols = self.transpose_records(records, columns)
+        return TableData(rows=rows, columns=[col.name for col in cols])
+
+    def _get_limit(self) -> Optional[int]:
+        num_rows = self.client.item_count(self.table)
+        if self.profile_sample_type == ProfileSampleType.PERCENTAGE:
+            limit = int(num_rows * (self.profile_sample / 100))
+        elif self.profile_sample_type == ProfileSampleType.ROWS:
+            limit = self.profile_sample
+        else:
+            limit = SAMPLE_DATA_DEFAULT_COUNT
+        return limit
+
+    @staticmethod
+    def transpose_records(
+        records: List[Dict[str, Any]], columns: List[SQALikeColumn]
+    ) -> Tuple[List[List[Any]], List[SQALikeColumn]]:
+        rows = []
+        for record in records:
+            row = []
+            for column in columns:
+                row.append(record.get(column.name))
+            rows.append(row)
+        return rows, columns
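To make the sampling behavior concrete: with `item_count` returning 1,000 documents, `_get_limit` yields `int(1000 * 10 / 100) == 100` for a 10% `PERCENTAGE` config, the configured number for `ROWS`, and `SAMPLE_DATA_DEFAULT_COUNT` when no sample config is set. A small worked example of `transpose_records` (values made up; the `type` strings are placeholders):

```python
from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
from metadata.utils.sqa_like_column import SQALikeColumn

records = [
    {"name": "John", "age": 30},
    {"name": "Jane"},  # keys missing from a document come back as None
]
columns = [
    SQALikeColumn(name="name", type="STRING"),
    SQALikeColumn(name="age", type="INT"),
]

rows, cols = NoSQLSampler.transpose_records(records, columns)
assert rows == [["John", 30], ["Jane", None]]
assert [c.name for c in cols] == ["name", "age"]
```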
diff --git a/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py b/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py
index e7c0f25e7e5..88584f4eb24 100644
--- a/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py
+++ b/ingestion/src/metadata/profiler/processor/sampler/sampler_factory.py
@@ -21,10 +21,14 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn
 from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
     DatalakeConnection,
 )
+from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import (
+    MongoDBConnection,
+)
 from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
     TrinoConnection,
 )
 from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
+from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
 from metadata.profiler.processor.sampler.pandas.sampler import DatalakeSampler
 from metadata.profiler.processor.sampler.sqlalchemy.bigquery.sampler import (
     BigQuerySampler,
@@ -59,3 +63,4 @@ sampler_factory_.register(DatabaseConnection.__name__, SQASampler)
 sampler_factory_.register(BigQueryConnection.__name__, BigQuerySampler)
 sampler_factory_.register(DatalakeConnection.__name__, DatalakeSampler)
 sampler_factory_.register(TrinoConnection.__name__, TrinoSampler)
+sampler_factory_.register(MongoDBConnection.__name__, NoSQLSampler)
diff --git a/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py b/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py
index 8711affa2c4..daba85fcebc 100644
--- a/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py
+++ b/ingestion/src/metadata/profiler/processor/sampler/sampler_interface.py
@@ -17,7 +17,7 @@ from typing import Dict, List, Optional, Union
 
 from sqlalchemy import Column
 
-from metadata.generated.schema.entity.data.table import TableData
+from metadata.generated.schema.entity.data.table import Table, TableData
 from metadata.profiler.api.models import ProfileSampleConfig
 from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
 from metadata.utils.sqa_like_column import SQALikeColumn
@@ -29,7 +29,7 @@ class SamplerInterface(ABC):
     def __init__(
         self,
         client,
-        table,
+        table: Table,
         profile_sample_config: Optional[ProfileSampleConfig] = None,
         partition_details: Optional[Dict] = None,
         profile_sample_query: Optional[str] = None,
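The registry keys samplers by connection class name, so a `MongoDBConnection` config now resolves to `NoSQLSampler`. A hedged sketch of the lookup, assuming the factory forwards keyword arguments to the sampler constructor the way `_get_sampler` above does (`table` and `adaptor` are placeholders assumed to be in scope):

```python
from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import (
    MongoDBConnection,
)
from metadata.profiler.processor.sampler.sampler_factory import sampler_factory_

# The profiler interface uses the connection class name as the lookup key.
sampler = sampler_factory_.create(
    MongoDBConnection.__name__,  # "MongoDBConnection"
    table=table,  # an OpenMetadata Table entity
    client=adaptor,  # a NoSQLAdaptor, e.g. MongoDB(MongoClient(...))
)
```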
config_copy["processor"]["config"].setdefault("tableConfig", []) + config_copy["processor"]["config"]["tableConfig"].append(table_config) + return config_copy + + def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str): return { "source": { @@ -76,63 +87,66 @@ def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str): TEST_DATABASE = "test-database" EMPTY_COLLECTION = "empty-collection" TEST_COLLECTION = "test-collection" -TEST_DATA = [ - { - "first_name": "John", - "last_name": "Doe", - "age": 30, - }, - { - "first_name": "Jane", - "last_name": "Doe", - "age": 25, - }, - { - "first_name": "John", - "last_name": "Smith", - "age": 35, - }, -] +NUM_ROWS = 200 + + +def random_row(): + return { + "name": choice(["John", "Jane", "Alice", "Bob"]), + "age": randint(20, 60), + "city": choice(["New York", "Chicago", "San Francisco"]), + "nested": {"key": "value" + str(randint(1, 10))}, + } + + +TEST_DATA = [random_row() for _ in range(NUM_ROWS)] class NoSQLProfiler(TestCase): """datalake profiler E2E test""" + mongo_container: MongoDbContainer + client: MongoClient + db: database.Database + collection: database.Collection + ingestion_config: dict + metadata: OpenMetadata + @classmethod def setUpClass(cls) -> None: cls.metadata = int_admin_ometa() - - def setUp(self) -> None: - self.mongo_container = MongoDbContainer("mongo:7.0.5-jammy") - self.mongo_container.start() - self.client = MongoClient(self.mongo_container.get_connection_url()) - self.db = self.client[TEST_DATABASE] - self.collection = self.db[TEST_COLLECTION] - self.collection.insert_many(TEST_DATA) - self.db.create_collection(EMPTY_COLLECTION) - self.ingestion_config = get_ingestion_config( - self.mongo_container.get_exposed_port("27017"), "test", "test" + cls.mongo_container = MongoDbContainer("mongo:7.0.5-jammy") + cls.mongo_container.start() + cls.client = MongoClient(cls.mongo_container.get_connection_url()) + cls.db = cls.client[TEST_DATABASE] + cls.collection = cls.db[TEST_COLLECTION] + cls.collection.insert_many(TEST_DATA) + cls.db.create_collection(EMPTY_COLLECTION) + cls.ingestion_config = get_ingestion_config( + cls.mongo_container.get_exposed_port("27017"), "test", "test" ) ingestion_workflow = MetadataWorkflow.create( - self.ingestion_config, + cls.ingestion_config, ) ingestion_workflow.execute() ingestion_workflow.raise_from_status() print_status(ingestion_workflow) ingestion_workflow.stop() - def tearDown(self): + @classmethod + def tearDownClass(cls): with accumulate_errors() as error_handler: - error_handler.try_execute(partial(self.mongo_container.stop, force=True)) - error_handler.try_execute(self.delete_service) + error_handler.try_execute(partial(cls.mongo_container.stop, force=True)) + error_handler.try_execute(cls.delete_service) - def delete_service(self): + @classmethod + def delete_service(cls): service_id = str( - self.metadata.get_by_name( + cls.metadata.get_by_name( entity=DatabaseService, fqn=SERVICE_NAME ).id.__root__ ) - self.metadata.delete( + cls.metadata.delete( entity=DatabaseService, entity_id=service_id, recursive=True, @@ -140,9 +154,12 @@ class NoSQLProfiler(TestCase): ) def test_setup_teardown(self): + """ + does nothing. 
         pass
 
-    def test_row_count(self):
+    def test_simple(self):
         workflow_config = deepcopy(self.ingestion_config)
         workflow_config["source"]["sourceConfig"]["config"].update(
             {
                 "type": "Profiler",
             }
         )
@@ -160,7 +177,7 @@
 
         assert status == 0
 
-        expectations = {TEST_COLLECTION: 3, EMPTY_COLLECTION: 0}
+        expectations = {TEST_COLLECTION: len(TEST_DATA), EMPTY_COLLECTION: 0}
 
         for collection, expected_row_count in expectations.items():
             collection_profile = self.metadata.get_profile_data(
@@ -177,3 +194,74 @@
                 profile_type=ColumnProfile,
             )
             assert len(column_profile.entities) == 0
+
+        table = self.metadata.get_by_name(
+            Table, f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}"
+        )
+        sample_data = self.metadata.get_sample_data(table)
+        assert [c.__root__ for c in sample_data.sampleData.columns] == [
+            "_id",
+            "name",
+            "age",
+            "city",
+            "nested",
+        ]
+        assert len(sample_data.sampleData.rows) == SAMPLE_DATA_DEFAULT_COUNT
+
+    def test_custom_query(self):
+        workflow_config = deepcopy(self.ingestion_config)
+        workflow_config["source"]["sourceConfig"]["config"].update(
+            {
+                "type": "Profiler",
+            }
+        )
+        query_age = TEST_DATA[0]["age"]
+        workflow_config["processor"] = {
+            "type": "orm-profiler",
+            "config": {
+                "tableConfig": [
+                    {
+                        "fullyQualifiedName": f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}",
+                        "profileQuery": '{"age": %s}' % query_age,
+                    }
+                ],
+            },
+        }
+        profiler_workflow = ProfilerWorkflow.create(workflow_config)
+        profiler_workflow.execute()
+        status = profiler_workflow.result_status()
+        profiler_workflow.stop()
+
+        assert status == 0
+
+        # a custom profile query in MongoDB does not change the item count
+        expectations = {TEST_COLLECTION: len(TEST_DATA), EMPTY_COLLECTION: 0}
+
+        for collection, expected_row_count in expectations.items():
+            collection_profile = self.metadata.get_profile_data(
+                f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}",
+                datetime_to_ts(datetime.now() - timedelta(seconds=10)),
+                get_end_of_day_timestamp_mill(),
+            )
+            assert collection_profile.entities, collection
+            assert (
+                collection_profile.entities[-1].rowCount == expected_row_count
+            ), collection
+        column_profile = self.metadata.get_profile_data(
+            f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}.age",
+            datetime_to_ts(datetime.now() - timedelta(seconds=10)),
+            get_end_of_day_timestamp_mill(),
+            profile_type=ColumnProfile,
+        )
+        assert len(column_profile.entities) == 0, collection
+
+        table = self.metadata.get_by_name(
+            Table, f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}"
+        )
+        sample_data = self.metadata.get_sample_data(table)
+        age_column_index = [
+            col.__root__ for col in sample_data.sampleData.columns
+        ].index("age")
+        assert all(
+            r[age_column_index] == query_age for r in sample_data.sampleData.rows
+        )
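For anyone wiring this up outside the tests, a sketch of the processor config that exercises the new custom-query path (the fully qualified name is hypothetical). `profileQuery` must be valid JSON, since `MongoDB.query()` parses it with `json.loads` and uses the result as the `find()` filter:

```python
processor_config = {
    "type": "orm-profiler",
    "config": {
        "tableConfig": [
            {
                # hypothetical FQN: service.database.schema.collection
                "fullyQualifiedName": "mongo_service.default.test-database.test-collection",
                # parsed into the filter {"age": 30}; invalid JSON raises ValueError
                "profileQuery": '{"age": 30}',
            }
        ],
    },
}
```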