MINOR: add MongoDB sample data (#15237)

* feat(nosql-profiler): row count

1. Implemented the NoSQLProfilerInterface as an entrypoint for the nosql profiler.
2. Added the NoSQLMetric as an abstract class.
3. Implemented the interface for the MongoDB database source (see the sketch after this list).
4. Implemented an e2e test using testcontainers.
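The row-count path is thin by design: the metric delegates to the adaptor, and the MongoDB adaptor delegates to pymongo. A minimal sketch of the underlying call, assuming a locally reachable MongoDB; the connection string and database/collection names are illustrative:

from pymongo import MongoClient

# The adaptor's item_count(table) reduces to a count_documents call
# on the collection named by the table entity:
client = MongoClient("mongodb://localhost:27017")
collection = client["test-database"]["test-collection"]
print(collection.count_documents({}))  # the row count the profiler reports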

* added profiler support for mongodb connection

* doc

* use int_admin_ometa in test setup

* - fixed linting issue in gx
- removed unused inheritance

* moved the nosql function into the metric class

* formatting

* validate_compose: raise exception for bad status code.

* fixed import

* format

* feat(nosql-profiler): added sample data

1. Implemented the NoSQL sampler.
2. Some naming changes in the NoSQL adaptor to avoid name clashes with the profiler interface.
3. Tests.

* added default sample limit
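A rough sketch of how the sampler derives its limit, including the default added here; this mirrors NoSQLSampler._get_limit in the diff below, but the standalone names and the constant's value are assumptions:

# Assumed value of metadata.utils.constants.SAMPLE_DATA_DEFAULT_COUNT
SAMPLE_DATA_DEFAULT_COUNT = 50

def get_limit(num_rows: int, sample_type: str, profile_sample: float) -> float:
    # PERCENTAGE: sample a fraction of the collection's documents
    if sample_type == "PERCENTAGE":
        return num_rows * (profile_sample / 100)
    # ROWS: sample a fixed number of documents
    if sample_type == "ROWS":
        return profile_sample
    # No sampling config: fall back to the default limit
    return SAMPLE_DATA_DEFAULT_COUNT

print(get_limit(200, "PERCENTAGE", 10))  # 20.0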
Imri Paran 2024-02-22 16:31:58 +01:00 committed by GitHub
parent 4967e091e6
commit ff2ecc56f2
8 changed files with 310 additions and 51 deletions

View File

@@ -11,19 +11,80 @@
 """
 MongoDB adaptor for the NoSQL profiler.
 """
+import json
+from dataclasses import dataclass, field
+from typing import Dict, List, Optional
+
 from pymongo import MongoClient
 
-from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.table import Column, Table
 from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
 
 
+@dataclass
+class Query:
+    database: str
+    collection: str
+    filter: dict = field(default_factory=dict)
+    limit: Optional[int] = None
+
+    def to_executable(self, client: MongoClient):
+        db = client[self.database]
+        collection = db[self.collection]
+        query = collection.find(self.filter)
+        if self.limit:
+            query = query.limit(self.limit)
+        return query
+
+
 class MongoDB(NoSQLAdaptor):
     """A MongoDB client that serves as an adaptor for profiling data assets on MongoDB"""
 
     def __init__(self, client: MongoClient):
         self.client = client
 
-    def get_row_count(self, table: Table) -> int:
+    def item_count(self, table: Table) -> int:
         db = self.client[table.databaseSchema.name]
         collection = db[table.name.__root__]
         return collection.count_documents({})
+
+    def scan(
+        self, table: Table, columns: List[Column], limit: int
+    ) -> List[Dict[str, any]]:
+        return self.execute(
+            Query(
+                database=table.databaseSchema.name,
+                collection=table.name.__root__,
+                limit=limit,
+            )
+        )
+
+    def query(
+        self, table: Table, columns: List[Column], query: any, limit: int
+    ) -> List[Dict[str, any]]:
+        try:
+            json_query = json.loads(query)
+        except json.JSONDecodeError:
+            raise ValueError("Invalid JSON query")
+        return self.execute(
+            Query(
+                database=table.databaseSchema.name,
+                collection=table.name.__root__,
+                filter=json_query,
+            )
+        )
+
+    def execute(self, query: Query) -> List[Dict[str, any]]:
+        records = list(query.to_executable(self.client))
+        result = []
+        for r in records:
+            result.append({c: self._json_safe(r.get(c)) for c in r})
+        return result
+
+    @staticmethod
+    def _json_safe(data: any):
+        try:
+            json.dumps(data)
+            return data
+        except Exception:
+            return str(data)
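Hypothetical usage of the Query helper introduced above, assuming a reachable MongoDB; the connection string, names, and filter are placeholders:

from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
q = Query(
    database="test-database",
    collection="test-collection",
    filter={"age": {"$gte": 30}},  # passed straight through to collection.find()
    limit=10,
)
for doc in q.to_executable(client):
    print(doc)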

View File

@@ -12,11 +12,23 @@
 NoSQL adaptor for the NoSQL profiler.
 """
 from abc import ABC, abstractmethod
+from typing import Dict, List
 
-from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.data.table import Column, Table
 
 
 class NoSQLAdaptor(ABC):
     @abstractmethod
-    def get_row_count(self, table: Table) -> int:
+    def item_count(self, table: Table) -> int:
+        raise NotImplementedError
+
+    @abstractmethod
+    def scan(
+        self, table: Table, columns: List[Column], limit: int
+    ) -> List[Dict[str, any]]:
+        pass
+
+    def query(
+        self, table: Table, columns: List[Column], query: any, limit: int
+    ) -> List[Dict[str, any]]:
         raise NotImplementedError
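Since query() keeps a default NotImplementedError body, a concrete adaptor only has to provide item_count and scan. A minimal in-memory subclass, purely illustrative and not part of this PR, e.g. for unit tests:

from typing import Dict, List

class InMemoryAdaptor(NoSQLAdaptor):
    # Fake adaptor backed by a list of dicts instead of a real NoSQL store
    def __init__(self, rows: List[Dict[str, any]]):
        self.rows = rows

    def item_count(self, table) -> int:
        return len(self.rows)

    def scan(self, table, columns, limit: int) -> List[Dict[str, any]]:
        return self.rows[:limit]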

View File

@@ -27,6 +27,7 @@ from metadata.profiler.api.models import ThreadPoolMetrics
 from metadata.profiler.interface.profiler_interface import ProfilerInterface
 from metadata.profiler.metrics.core import Metric, MetricTypes
 from metadata.profiler.metrics.registry import Metrics
+from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
 from metadata.utils.logger import profiler_interface_registry_logger
 from metadata.utils.sqa_like_column import SQALikeColumn
@@ -41,8 +42,9 @@ class NoSQLProfilerInterface(ProfilerInterface):
 
     # pylint: disable=too-many-arguments
 
-    def _get_sampler(self):
-        return None
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.sampler = self._get_sampler()
 
     def _compute_table_metrics(
         self,
@@ -119,6 +121,7 @@
             row = self._get_metric_fn[metric_func.metric_type.value](
                 metric_func.metrics,
                 client,
+                column=metric_func.column,
             )
         except Exception as exc:
             name = f"{metric_func.column if metric_func.column is not None else metric_func.table}"
@@ -134,8 +137,23 @@
         column = None
         return row, column, metric_func.metric_type.value
 
-    def fetch_sample_data(self, table, columns: SQALikeColumn) -> TableData:
-        return None
+    def fetch_sample_data(self, table, columns: List[SQALikeColumn]) -> TableData:
+        return self.sampler.fetch_sample_data(columns)
+
+    def _get_sampler(self) -> NoSQLSampler:
+        """Get NoSQL sampler from config"""
+        from metadata.profiler.processor.sampler.sampler_factory import (  # pylint: disable=import-outside-toplevel
+            sampler_factory_,
+        )
+
+        return sampler_factory_.create(
+            self.service_connection_config.__class__.__name__,
+            table=self.table,
+            client=factory.construct(self.connection),
+            profile_sample_config=self.profile_sample_config,
+            partition_details=self.partition_details,
+            profile_sample_query=self.profile_query,
+        )
 
     def get_composed_metrics(
         self, column: Column, metric: Metrics, column_results: Dict
@@ -176,7 +194,10 @@ class NoSQLProfilerInterface(ProfilerInterface):
         return self.table_entity
 
     def get_columns(self) -> List[Optional[SQALikeColumn]]:
-        return []
+        return [
+            SQALikeColumn(name=c.name.__root__, type=c.dataType)
+            for c in self.table.columns
+        ]
 
     def close(self):
         self.connection.close()

View File

@@ -54,4 +54,4 @@ class RowCount(StaticMetric):
 
     @classmethod
     def nosql_fn(cls, client: NoSQLAdaptor) -> Callable[[Table], int]:
-        return client.get_row_count
+        return client.item_count
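With the rename, the metric's NoSQL hook simply hands back the adaptor's bound method; a sketch of the dispatch, where adaptor and table stand in for a concrete NoSQLAdaptor and a Table entity:

count_fn = RowCount.nosql_fn(adaptor)  # returns the adaptor's item_count method
row_count = count_fn(table)            # e.g. collection.count_documents({}) on MongoDB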

View File

@@ -0,0 +1,72 @@
+from typing import Dict, List, Optional, Tuple
+
+from metadata.generated.schema.entity.data.table import ProfileSampleType, TableData
+from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
+from metadata.profiler.processor.sampler.sampler_interface import SamplerInterface
+from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
+from metadata.utils.sqa_like_column import SQALikeColumn
+
+
+class NoSQLSampler(SamplerInterface):
+    client: NoSQLAdaptor
+
+    def _rdn_sample_from_user_query(self) -> List[Dict[str, any]]:
+        """
+        Get random sample from user query
+        """
+        limit = self._get_limit()
+        return self.client.query(
+            self.table, self.table.columns, self._profile_sample_query, limit
+        )
+
+    def _fetch_sample_data_from_user_query(self) -> TableData:
+        """
+        Fetch sample data based on a user query, assuming the engine supports one (example: MongoDB).
+        If the engine does not support a custom query, an error will be raised.
+        """
+        records = self._rdn_sample_from_user_query()
+        columns = [
+            SQALikeColumn(name=column.name.__root__, type=column.dataType)
+            for column in self.table.columns
+        ]
+        rows, cols = self.transpose_records(records, columns)
+        return TableData(rows=rows, columns=[c.name for c in cols])
+
+    def random_sample(self):
+        pass
+
+    def fetch_sample_data(self, columns: List[SQALikeColumn]) -> TableData:
+        if self._profile_sample_query:
+            return self._fetch_sample_data_from_user_query()
+        return self._fetch_sample_data(columns)
+
+    def _fetch_sample_data(self, columns: List[SQALikeColumn]):
+        """
+        returns sampled ometa dataframes
+        """
+        limit = self._get_limit()
+        records = self.client.scan(self.table, self.table.columns, limit)
+        rows, cols = self.transpose_records(records, columns)
+        return TableData(rows=rows, columns=[col.name for col in cols])
+
+    def _get_limit(self) -> Optional[int]:
+        num_rows = self.client.item_count(self.table)
+        if self.profile_sample_type == ProfileSampleType.PERCENTAGE:
+            limit = num_rows * (self.profile_sample / 100)
+        elif self.profile_sample_type == ProfileSampleType.ROWS:
+            limit = self.profile_sample
+        else:
+            limit = SAMPLE_DATA_DEFAULT_COUNT
+        return limit
+
+    @staticmethod
+    def transpose_records(
+        records: List[Dict[str, any]], columns: List[SQALikeColumn]
+    ) -> Tuple[List[List[any]], List[SQALikeColumn]]:
+        rows = []
+        for record in records:
+            row = []
+            for column in columns:
+                row.append(record.get(column.name))
+            rows.append(row)
+        return rows, columns
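A worked example of transpose_records: records arrive as documents (dicts), and missing keys fall back to None via dict.get. Col is an illustrative stand-in for SQALikeColumn, and the example assumes the sampler module above is importable:

from collections import namedtuple

Col = namedtuple("Col", "name")
records = [{"name": "John", "age": 30}, {"name": "Jane"}]
columns = [Col("name"), Col("age")]
rows, cols = NoSQLSampler.transpose_records(records, columns)
print(rows)  # [['John', 30], ['Jane', None]]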

View File

@@ -21,10 +21,14 @@ from metadata.generated.schema.entity.services.connections.database.bigQueryConn
 from metadata.generated.schema.entity.services.connections.database.datalakeConnection import (
     DatalakeConnection,
 )
+from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import (
+    MongoDBConnection,
+)
 from metadata.generated.schema.entity.services.connections.database.trinoConnection import (
     TrinoConnection,
 )
 from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
+from metadata.profiler.processor.sampler.nosql.sampler import NoSQLSampler
 from metadata.profiler.processor.sampler.pandas.sampler import DatalakeSampler
 from metadata.profiler.processor.sampler.sqlalchemy.bigquery.sampler import (
     BigQuerySampler,
@@ -59,3 +63,4 @@ sampler_factory_.register(DatabaseConnection.__name__, SQASampler)
 sampler_factory_.register(BigQueryConnection.__name__, BigQuerySampler)
 sampler_factory_.register(DatalakeConnection.__name__, DatalakeSampler)
 sampler_factory_.register(TrinoConnection.__name__, TrinoSampler)
+sampler_factory_.register(MongoDBConnection.__name__, NoSQLSampler)
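The registration above keys the sampler on the connection class name, so any MongoDBConnection-backed profiler run picks up NoSQLSampler. A rough sketch of the pattern; the real sampler_factory_ lives in OpenMetadata, and this standalone version is an assumption:

class SamplerFactory:
    def __init__(self):
        self._samplers: dict = {}

    def register(self, key: str, sampler_cls) -> None:
        # e.g. register(MongoDBConnection.__name__, NoSQLSampler)
        self._samplers[key] = sampler_cls

    def create(self, key: str, **kwargs):
        # instantiate the sampler registered for this connection type
        return self._samplers[key](**kwargs)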

View File

@@ -17,7 +17,7 @@ from typing import Dict, List, Optional, Union
 
 from sqlalchemy import Column
 
-from metadata.generated.schema.entity.data.table import TableData
+from metadata.generated.schema.entity.data.table import Table, TableData
 from metadata.profiler.api.models import ProfileSampleConfig
 from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
 from metadata.utils.sqa_like_column import SQALikeColumn
@@ -29,7 +29,7 @@ class SamplerInterface(ABC):
     def __init__(
         self,
         client,
-        table,
+        table: Table,
         profile_sample_config: Optional[ProfileSampleConfig] = None,
         partition_details: Optional[Dict] = None,
         profile_sample_query: Optional[str] = None,

View File

@ -27,14 +27,18 @@ from copy import deepcopy
from datetime import datetime, timedelta from datetime import datetime, timedelta
from functools import partial from functools import partial
from pathlib import Path from pathlib import Path
from random import choice, randint
from unittest import TestCase from unittest import TestCase
from pymongo import MongoClient from pymongo import MongoClient, database
from testcontainers.mongodb import MongoDbContainer from testcontainers.mongodb import MongoDbContainer
from ingestion.tests.integration.integration_base import int_admin_ometa from ingestion.tests.integration.integration_base import int_admin_ometa
from metadata.generated.schema.entity.data.table import ColumnProfile from metadata.generated.schema.entity.data.table import ColumnProfile, Table
from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.profiler.api.models import TableConfig
from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT
from metadata.utils.helpers import datetime_to_ts from metadata.utils.helpers import datetime_to_ts
from metadata.utils.test_utils import accumulate_errors from metadata.utils.test_utils import accumulate_errors
from metadata.utils.time_utils import get_end_of_day_timestamp_mill from metadata.utils.time_utils import get_end_of_day_timestamp_mill
@ -45,6 +49,13 @@ from metadata.workflow.workflow_output_handler import print_status
SERVICE_NAME = Path(__file__).stem SERVICE_NAME = Path(__file__).stem
def add_query_config(config, table_config: TableConfig) -> dict:
config_copy = deepcopy(config)
config_copy["processor"]["config"].setdefault("tableConfig", [])
config_copy["processor"]["config"]["tableConfig"].append(table_config)
return config_copy
def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str): def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str):
return { return {
"source": { "source": {
@ -76,63 +87,66 @@ def get_ingestion_config(mongo_port: str, mongo_user: str, mongo_pass: str):
TEST_DATABASE = "test-database" TEST_DATABASE = "test-database"
EMPTY_COLLECTION = "empty-collection" EMPTY_COLLECTION = "empty-collection"
TEST_COLLECTION = "test-collection" TEST_COLLECTION = "test-collection"
TEST_DATA = [ NUM_ROWS = 200
{
"first_name": "John",
"last_name": "Doe", def random_row():
"age": 30, return {
}, "name": choice(["John", "Jane", "Alice", "Bob"]),
{ "age": randint(20, 60),
"first_name": "Jane", "city": choice(["New York", "Chicago", "San Francisco"]),
"last_name": "Doe", "nested": {"key": "value" + str(randint(1, 10))},
"age": 25, }
},
{
"first_name": "John", TEST_DATA = [random_row() for _ in range(NUM_ROWS)]
"last_name": "Smith",
"age": 35,
},
]
class NoSQLProfiler(TestCase): class NoSQLProfiler(TestCase):
"""datalake profiler E2E test""" """datalake profiler E2E test"""
mongo_container: MongoDbContainer
client: MongoClient
db: database.Database
collection: database.Collection
ingestion_config: dict
metadata: OpenMetadata
@classmethod @classmethod
def setUpClass(cls) -> None: def setUpClass(cls) -> None:
cls.metadata = int_admin_ometa() cls.metadata = int_admin_ometa()
cls.mongo_container = MongoDbContainer("mongo:7.0.5-jammy")
def setUp(self) -> None: cls.mongo_container.start()
self.mongo_container = MongoDbContainer("mongo:7.0.5-jammy") cls.client = MongoClient(cls.mongo_container.get_connection_url())
self.mongo_container.start() cls.db = cls.client[TEST_DATABASE]
self.client = MongoClient(self.mongo_container.get_connection_url()) cls.collection = cls.db[TEST_COLLECTION]
self.db = self.client[TEST_DATABASE] cls.collection.insert_many(TEST_DATA)
self.collection = self.db[TEST_COLLECTION] cls.db.create_collection(EMPTY_COLLECTION)
self.collection.insert_many(TEST_DATA) cls.ingestion_config = get_ingestion_config(
self.db.create_collection(EMPTY_COLLECTION) cls.mongo_container.get_exposed_port("27017"), "test", "test"
self.ingestion_config = get_ingestion_config(
self.mongo_container.get_exposed_port("27017"), "test", "test"
) )
ingestion_workflow = MetadataWorkflow.create( ingestion_workflow = MetadataWorkflow.create(
self.ingestion_config, cls.ingestion_config,
) )
ingestion_workflow.execute() ingestion_workflow.execute()
ingestion_workflow.raise_from_status() ingestion_workflow.raise_from_status()
print_status(ingestion_workflow) print_status(ingestion_workflow)
ingestion_workflow.stop() ingestion_workflow.stop()
def tearDown(self): @classmethod
def tearDownClass(cls):
with accumulate_errors() as error_handler: with accumulate_errors() as error_handler:
error_handler.try_execute(partial(self.mongo_container.stop, force=True)) error_handler.try_execute(partial(cls.mongo_container.stop, force=True))
error_handler.try_execute(self.delete_service) error_handler.try_execute(cls.delete_service)
def delete_service(self): @classmethod
def delete_service(cls):
service_id = str( service_id = str(
self.metadata.get_by_name( cls.metadata.get_by_name(
entity=DatabaseService, fqn=SERVICE_NAME entity=DatabaseService, fqn=SERVICE_NAME
).id.__root__ ).id.__root__
) )
self.metadata.delete( cls.metadata.delete(
entity=DatabaseService, entity=DatabaseService,
entity_id=service_id, entity_id=service_id,
recursive=True, recursive=True,
@ -140,9 +154,12 @@ class NoSQLProfiler(TestCase):
) )
def test_setup_teardown(self): def test_setup_teardown(self):
"""
does nothing. useful to check if the setup and teardown methods are working
"""
pass pass
def test_row_count(self): def test_simple(self):
workflow_config = deepcopy(self.ingestion_config) workflow_config = deepcopy(self.ingestion_config)
workflow_config["source"]["sourceConfig"]["config"].update( workflow_config["source"]["sourceConfig"]["config"].update(
{ {
@ -160,7 +177,7 @@ class NoSQLProfiler(TestCase):
assert status == 0 assert status == 0
expectations = {TEST_COLLECTION: 3, EMPTY_COLLECTION: 0} expectations = {TEST_COLLECTION: len(TEST_DATA), EMPTY_COLLECTION: 0}
for collection, expected_row_count in expectations.items(): for collection, expected_row_count in expectations.items():
collection_profile = self.metadata.get_profile_data( collection_profile = self.metadata.get_profile_data(
@ -177,3 +194,74 @@ class NoSQLProfiler(TestCase):
profile_type=ColumnProfile, profile_type=ColumnProfile,
) )
assert len(column_profile.entities) == 0 assert len(column_profile.entities) == 0
table = self.metadata.get_by_name(
Table, f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}"
)
sample_data = self.metadata.get_sample_data(table)
assert [c.__root__ for c in sample_data.sampleData.columns] == [
"_id",
"name",
"age",
"city",
"nested",
]
assert len(sample_data.sampleData.rows) == SAMPLE_DATA_DEFAULT_COUNT
def test_custom_query(self):
workflow_config = deepcopy(self.ingestion_config)
workflow_config["source"]["sourceConfig"]["config"].update(
{
"type": "Profiler",
}
)
query_age = TEST_DATA[0]["age"]
workflow_config["processor"] = {
"type": "orm-profiler",
"config": {
"tableConfig": [
{
"fullyQualifiedName": f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}",
"profileQuery": '{"age": %s}' % query_age,
}
],
},
}
profiler_workflow = ProfilerWorkflow.create(workflow_config)
profiler_workflow.execute()
status = profiler_workflow.result_status()
profiler_workflow.stop()
assert status == 0
# query profiler in MongoDB does not change the item count
expectations = {TEST_COLLECTION: len(TEST_DATA), EMPTY_COLLECTION: 0}
for collection, expected_row_count in expectations.items():
collection_profile = self.metadata.get_profile_data(
f"{SERVICE_NAME}.default.{TEST_DATABASE}.{collection}",
datetime_to_ts(datetime.now() - timedelta(seconds=10)),
get_end_of_day_timestamp_mill(),
)
assert collection_profile.entities, collection
assert (
collection_profile.entities[-1].rowCount == expected_row_count
), collection
column_profile = self.metadata.get_profile_data(
f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}.age",
datetime_to_ts(datetime.now() - timedelta(seconds=10)),
get_end_of_day_timestamp_mill(),
profile_type=ColumnProfile,
)
assert len(column_profile.entities) == 0, collection
table = self.metadata.get_by_name(
Table, f"{SERVICE_NAME}.default.{TEST_DATABASE}.{TEST_COLLECTION}"
)
sample_data = self.metadata.get_sample_data(table)
age_column_index = [
col.__root__ for col in sample_data.sampleData.columns
].index("age")
assert all(
[r[age_column_index] == query_age for r in sample_data.sampleData.rows]
)
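For reference, the profileQuery string exercised above is parsed with json.loads and handed to collection.find as a filter (see the MongoDB adaptor's query method earlier in this diff). A pymongo sketch of what the profiler ends up executing; the connection string and age value are placeholders:

import json
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")
query_filter = json.loads('{"age": 42}')  # the profileQuery string
for doc in client["test-database"]["test-collection"].find(query_filter):
    print(doc)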