MINOR: Dynamodb sample data (#15264)

* feat(nosql-profiler): row count

1. Implemented the NoSQLProfilerInterface as an entrypoint for the nosql profiler.
2. Added the NoSQLMetric as an abstract class.
3. Implemented the interface for the MongoDB database source.
4. Implemented an e2e test using testcontainers.

* added profiler support for mongodb connection

* doc

* use int_admin_ometa in test setup

* - fixed linting issue in gx
- removed unused inheritance

* moved the nosql function into the metric class

* feat(profiler): add dynamodb row count

* feat(profiler): add dynamodb row count

* formatting

* validate_compose: raise exception for bad status code.

* fixed import

* format

* feat(nosql-profiler): added sample data

1. Implemented the NoSQL sampler.
2. Some naming changes to the NoSQL adaptor to avoid name clashes with the profiler interface.
3. Tests.

* added default sample limit

* formatting

* fixed import

* feat(profiler): dynamodb sample data

* tests for dynamo db sample data

* format

* format

* use service connection for nosql adaptor factory

* fixed tests

* format

* fixed after merge
Imri Paran authored on 2024-04-22 17:46:40 +02:00; committed by GitHub
parent cb801dedb4
commit 93ec391f5c
8 changed files with 289 additions and 4 deletions

View File

@@ -281,6 +281,7 @@ dev = {
 test = {
     # Install Airflow as it's not part of `all` plugin
     VERSIONS["airflow"],
+    "boto3-stubs[boto3]",
     "coverage",
     # Install GE because it's not in the `all` plugin
     VERSIONS["great-expectations"],

View File

View File

@@ -35,4 +35,6 @@ class DynamoDB(NoSQLAdaptor):
     def scan(
         self, table: Table, columns: List[Column], limit: int
     ) -> List[Dict[str, any]]:
-        return []
+        table = self.client.Table(table.name.__root__)
+        response = table.scan(Limit=limit)
+        return response["Items"]
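For context, the implementation above goes through the boto3 DynamoDB resource API. A minimal sketch of the same call pattern outside the profiler — the table name, region, and credentials here are placeholders, not part of this change:

import boto3

# Table.scan returns at most `Limit` items in one page; a full scan would have
# to follow response["LastEvaluatedKey"], which a bounded sampler does not need.
dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
table = dynamodb.Table("test_table")
response = table.scan(Limit=100)
items = response["Items"]  # list of dicts, e.g. [{"id": "1", "name": "Alice"}]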

View File

@@ -0,0 +1,76 @@
+#  Copyright 2024 Collate
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#  http://www.apache.org/licenses/LICENSE-2.0
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+"""
+Factory for NoSQL adaptors that are used in the NoSQLProfiler.
+"""
+from typing import Any, Callable
+
+from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
+    DynamoDBConnection,
+)
+from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import (
+    MongoDBConnection,
+)
+from metadata.profiler.adaptors.dynamodb import DynamoDB
+from metadata.profiler.adaptors.mongodb import MongoDB
+from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
+from metadata.profiler.factory import Factory
+
+NoSQLAdaptorConstructor = Callable[[Any], NoSQLAdaptor]
+
+
+class NoSQLAdaptorFactory(Factory):
+    """
+    A factory class for creating NoSQL client instances.
+
+    This class maintains a registry of callable constructors for different NoSQL client types.
+    The client types are registered with their corresponding constructors,
+    and can be created using the `create` method.
+    """
+
+    def register(self, interface_type: str, interface_class: NoSQLAdaptorConstructor):
+        """
+        Register a client type with its constructor.
+
+        Args:
+            interface_type (str): The class name of the source connection.
+            interface_class (NoSQLAdaptorConstructor): The constructor for the adaptor.
+
+        Returns:
+            None
+        """
+        self._interface_type[interface_type] = interface_class
+
+    def create(self, interface_type: str, *args, **kwargs) -> NoSQLAdaptor:
+        """
+        Create a client instance of the type of the given source client.
+
+        Args:
+            interface_type (str): The type of the source connection.
+
+        Returns:
+            NoSQLAdaptor: The created client instance.
+
+        Raises:
+            ValueError: If the type of the source client is not registered.
+        """
+        client_class = self._interface_type.get(interface_type)
+        if not client_class:
+            raise ValueError(f"Unknown NoSQL source: {interface_type}")
+        return client_class(*args, **kwargs)
+
+
+factory = NoSQLAdaptorFactory()
+adaptors = {
+    MongoDBConnection.__name__: MongoDB,
+    DynamoDBConnection.__name__: DynamoDB,
+}
+factory.register_many(adaptors)
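As a usage sketch (not part of the diff): call sites resolve an adaptor by the class name of the service connection and pass the driver handle as the `client` keyword argument, which the factory forwards to the adaptor constructor. The helper below is hypothetical; `service_connection` and `client` stand in for the resolved connection config and database client:

from metadata.profiler.adaptors.factory import factory
from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor


def adaptor_for(service_connection, client) -> NoSQLAdaptor:
    # Registry keys are connection class names, e.g. "MongoDBConnection" or
    # "DynamoDBConnection"; an unregistered name raises ValueError.
    return factory.create(service_connection.__class__.__name__, client=client)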

View File

@@ -23,7 +23,7 @@ from sqlalchemy import Column
 from metadata.generated.schema.entity.data.table import TableData
 from metadata.generated.schema.tests.customMetric import CustomMetric
-from metadata.profiler.adaptors.adaptor_factory import factory
+from metadata.profiler.adaptors.factory import factory
 from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
 from metadata.profiler.api.models import ThreadPoolMetrics
 from metadata.profiler.interface.profiler_interface import ProfilerInterface
@@ -166,7 +166,8 @@ class NoSQLProfilerInterface(ProfilerInterface):
             self.service_connection_config.__class__.__name__,
             table=self.table,
             client=factory.create(
-                self.service_connection_config.__class__.__name__, self.connection
+                self.service_connection_config.__class__.__name__,
+                client=self.connection,
             ),
             profile_sample_config=self.profile_sample_config,
             partition_details=self.partition_details,
@@ -190,7 +191,8 @@ class NoSQLProfilerInterface(ProfilerInterface):
         """get all profiler metrics"""
         profile_results = {"table": {}, "columns": defaultdict(dict)}
         runner = factory.create(
-            self.service_connection_config.__class__.__name__, self.connection
+            self.service_connection_config.__class__.__name__,
+            client=self.connection,
         )
         metric_list = [
             self.compute_metrics(runner, metric_func) for metric_func in metric_funcs

View File

@@ -41,6 +41,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully
 from metadata.ingestion.api.step import Step, Summary
 from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
 from metadata.ingestion.models.custom_types import ServiceWithConnectionType
+from metadata.profiler.api.models import ProfilerProcessorConfig
 from metadata.utils.class_helper import (
     get_service_class_from_service_type,
     get_service_type_from_source_type,
@@ -201,3 +202,21 @@ class IngestionWorkflow(BaseWorkflow, ABC):
                 f"Unknown error getting service connection for service name [{service_name}]"
                 f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
             )
+
+    def validate(self):
+        try:
+            if (
+                not self.config.source.serviceConnection.__root__.config.supportsProfiler
+            ):
+                raise AttributeError()
+        except AttributeError:
+            if ProfilerProcessorConfig.parse_obj(
+                self.config.processor.dict().get("config")
+            ).ignoreValidation:
+                logger.debug(
+                    f"Profiler is not supported for the service connection: {self.config.source.serviceConnection}"
+                )
+                return
+            raise WorkflowExecutionError(
+                f"Profiler is not supported for the service connection: {self.config.source.serviceConnection}"
+            )
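The new validate() hook fails fast when a connector does not advertise supportsProfiler, unless the processor config opts out. A minimal sketch of the opt-out path, using only names that appear in this diff:

from metadata.profiler.api.models import ProfilerProcessorConfig

# With ignoreValidation set, validate() logs a debug message and returns
# instead of raising WorkflowExecutionError.
processor_config = ProfilerProcessorConfig.parse_obj({"ignoreValidation": True})
assert processor_config.ignoreValidation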

View File

@@ -0,0 +1,92 @@
+from typing import TYPE_CHECKING
+
+import boto3
+import botocore
+import pytest
+from testcontainers.localstack import LocalStackContainer
+
+from metadata.generated.schema.api.services.createDatabaseService import (
+    CreateDatabaseServiceRequest,
+)
+from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
+    DynamoDBConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import (
+    DatabaseConnection,
+    DatabaseService,
+    DatabaseServiceType,
+)
+from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
+
+from ...integration.integration_base import int_admin_ometa
+
+if TYPE_CHECKING:
+    from mypy_boto3_dynamodb.client import DynamoDBClient
+else:
+    DynamoDBClient = None
+
+
+@pytest.fixture(scope="session")
+def metadata():
+    return int_admin_ometa()
+
+
+@pytest.fixture(scope="session")
+def localstack_container():
+    with LocalStackContainer("localstack/localstack:3.3") as container:
+        yield container
+
+
+@pytest.fixture(scope="session")
+def ingest_sample_data(localstack_container):
+    client: DynamoDBClient = boto3.client(
+        "dynamodb",
+        region_name="us-east-1",
+        endpoint_url=localstack_container.get_url(),
+        aws_access_key_id="does-not-matter",
+        aws_secret_access_key="does-not-matter",
+    )
+    client.create_table(
+        TableName="test_table",
+        KeySchema=[
+            {"AttributeName": "id", "KeyType": "HASH"},
+        ],
+        AttributeDefinitions=[
+            {"AttributeName": "id", "AttributeType": "S"},
+        ],
+        ProvisionedThroughput={
+            "ReadCapacityUnits": 5,
+            "WriteCapacityUnits": 5,
+        },
+    )
+    rows = [
+        {"id": "1", "name": "Alice"},
+        {"id": "2", "name": "Bob"},
+    ]
+    for row in rows:
+        client.put_item(
+            TableName="test_table", Item={k: {"S": v} for k, v in row.items()}
+        )
+
+
+@pytest.fixture(scope="module")
+def db_service(metadata, localstack_container):
+    service = CreateDatabaseServiceRequest(
+        name="docker_dynamo_db",
+        serviceType=DatabaseServiceType.DynamoDB,
+        connection=DatabaseConnection(
+            config=DynamoDBConnection(
+                awsConfig=AWSCredentials(
+                    awsRegion="us-east-1",
+                    endPointURL=localstack_container.get_url(),
+                    awsAccessKeyId="does-not-matter",
+                    awsSecretAccessKey="does-not-matter",
+                ),
+            ),
+        ),
+    )
+    service_entity = metadata.create_or_update(data=service)
+    yield service_entity
+    metadata.delete(
+        DatabaseService, service_entity.id, recursive=True, hard_delete=True
+    )
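One detail worth noting in the fixture above: the low-level boto3 client speaks DynamoDB's wire format, where every attribute value carries a type tag, which is what the {k: {"S": v}} comprehension supplies. A small sketch:

# "S" tags a string attribute; other tags include "N" (number) and "BOOL".
row = {"id": "1", "name": "Alice"}
item = {k: {"S": v} for k, v in row.items()}
# -> {"id": {"S": "1"}, "name": {"S": "Alice"}}

The higher-level Table resource used by the DynamoDB adaptor accepts plain Python values instead and handles this serialization itself.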

View File

@@ -0,0 +1,93 @@
+import pytest
+
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
+    ProfilerConfigType,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    LogLevels,
+    OpenMetadataWorkflowConfig,
+    Sink,
+    Source,
+    SourceConfig,
+    WorkflowConfig,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.workflow.metadata import MetadataWorkflow
+from metadata.workflow.profiler import ProfilerWorkflow
+
+
+@pytest.fixture(autouse=True, scope="module")
+def ingest_metadata(
+    db_service: DatabaseService, metadata: OpenMetadata, ingest_sample_data
+):
+    workflow_config = OpenMetadataWorkflowConfig(
+        source=Source(
+            type=db_service.serviceType.name.lower(),
+            serviceName=db_service.fullyQualifiedName.__root__,
+            sourceConfig=SourceConfig(config={}),
+            serviceConnection=db_service.connection,
+        ),
+        sink=Sink(
+            type="metadata-rest",
+            config={},
+        ),
+        workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
+    )
+    metadata_ingestion = MetadataWorkflow.create(workflow_config)
+    metadata_ingestion.execute()
+    metadata_ingestion.raise_from_status()
+    return
+
+
+@pytest.fixture(scope="module")
+def db_fqn(db_service: DatabaseService):
+    return ".".join(
+        [
+            db_service.fullyQualifiedName.__root__,
+            "default",
+            "default",
+        ]
+    )
+
+
+def test_sample_data(db_service, db_fqn, metadata):
+    workflow_config = {
+        "source": {
+            "type": db_service.serviceType.name.lower(),
+            "serviceName": db_service.fullyQualifiedName.__root__,
+            "sourceConfig": {
+                "config": {
+                    "type": ProfilerConfigType.Profiler.value,
+                },
+            },
+        },
+        "processor": {
+            "type": "orm-profiler",
+            "config": {},
+        },
+        "sink": {
+            "type": "metadata-rest",
+            "config": {},
+        },
+        "workflowConfig": {
+            "loggerLevel": LogLevels.DEBUG,
+            "openMetadataServerConfig": metadata.config.dict(),
+        },
+    }
+    profiler_workflow = ProfilerWorkflow.create(workflow_config)
+    profiler_workflow.execute()
+    profiler_workflow.raise_from_status()
+    table = metadata.list_entities(
+        Table,
+        fields=["fullyQualifiedName"],
+        params={
+            "databaseSchema": db_fqn,
+        },
+    ).entities[0]
+    sample_data = metadata.get_sample_data(table).sampleData
+    assert sample_data is not None
+    assert len(sample_data.columns) == 2
+    assert len(sample_data.rows) == 2
+    assert sample_data.rows[0][0] == "Alice"