diff --git a/ingestion/setup.py b/ingestion/setup.py
index 22ae132f4c0..d3721f29d84 100644
--- a/ingestion/setup.py
+++ b/ingestion/setup.py
@@ -281,6 +281,7 @@ dev = {
 test = {
     # Install Airflow as it's not part of `all` plugin
     VERSIONS["airflow"],
+    "boto3-stubs[boto3]",
     "coverage",
     # Install GE because it's not in the `all` plugin
     VERSIONS["great-expectations"],
diff --git a/ingestion/src/metadata/__init__.py b/ingestion/src/metadata/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/ingestion/src/metadata/profiler/adaptors/dynamodb.py b/ingestion/src/metadata/profiler/adaptors/dynamodb.py
index 93d731d4ec6..e6ee297aff2 100644
--- a/ingestion/src/metadata/profiler/adaptors/dynamodb.py
+++ b/ingestion/src/metadata/profiler/adaptors/dynamodb.py
@@ -35,4 +35,6 @@ class DynamoDB(NoSQLAdaptor):
     def scan(
         self, table: Table, columns: List[Column], limit: int
     ) -> List[Dict[str, any]]:
-        return []
+        table = self.client.Table(table.name.__root__)
+        response = table.scan(Limit=limit)
+        return response["Items"]
diff --git a/ingestion/src/metadata/profiler/adaptors/factory.py b/ingestion/src/metadata/profiler/adaptors/factory.py
new file mode 100644
index 00000000000..50005158b58
--- /dev/null
+++ b/ingestion/src/metadata/profiler/adaptors/factory.py
@@ -0,0 +1,76 @@
+# Copyright 2024 Collate
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+factory for NoSQL adaptors that are used in the NoSQLProfiler.
+"""
+from typing import Callable
+
+from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
+    DynamoDBConnection,
+)
+from metadata.generated.schema.entity.services.connections.database.mongoDBConnection import (
+    MongoDBConnection,
+)
+from metadata.profiler.adaptors.dynamodb import DynamoDB
+from metadata.profiler.adaptors.mongodb import MongoDB
+from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
+from metadata.profiler.factory import Factory
+
+NoSQLAdaptorConstructor = Callable[[any], NoSQLAdaptor]
+
+
+class NoSQLAdaptorFactory(Factory):
+    """
+    A factory class for creating NoSQL client instances.
+
+    This class maintains a registry of callable constructors for different NoSQL client types.
+    The client types are registered with their corresponding constructors,
+    and can be created using the `construct` method.
+    """
+
+    def register(self, interface_type: str, interface_class: NoSQLAdaptorConstructor):
+        """
+        Register a client type with its constructor.
+
+        Args:
+            source_class_name (str): The class of the source client.
+            target_class (NoSQLClientConstructor): The constructor for the target client.
+
+        Returns:
+            None
+        """
+        self._interface_type[interface_type] = interface_class
+
+    def create(self, interface_type: any, *args, **kwargs) -> NoSQLAdaptor:
+        """
+        Create a client instance of the type of the given source client.
+
+        Args:
+            interface_type (str): The type of the source connection.
+
+        Returns:
+            NoSQLAdaptor: The created client instance.
+
+        Raises:
+            ValueError: If the type of the source client is not registered.
+        """
+        client_class = self._interface_type.get(interface_type)
+        if not client_class:
+            raise ValueError(f"Unknown NoSQL source: {interface_type}")
+        return client_class(*args, **kwargs)
+
+
+factory = NoSQLAdaptorFactory()
+adaptors = {
+    MongoDBConnection.__name__: MongoDB,
+    DynamoDBConnection.__name__: DynamoDB,
+}
+factory.register_many(adaptors)
diff --git a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py
index ce3f6a8857e..b74d1683cd8 100644
--- a/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py
+++ b/ingestion/src/metadata/profiler/interface/nosql/profiler_interface.py
@@ -23,7 +23,7 @@ from sqlalchemy import Column
 
 from metadata.generated.schema.entity.data.table import TableData
 from metadata.generated.schema.tests.customMetric import CustomMetric
-from metadata.profiler.adaptors.adaptor_factory import factory
+from metadata.profiler.adaptors.factory import factory
 from metadata.profiler.adaptors.nosql_adaptor import NoSQLAdaptor
 from metadata.profiler.api.models import ThreadPoolMetrics
 from metadata.profiler.interface.profiler_interface import ProfilerInterface
@@ -166,7 +166,8 @@ class NoSQLProfilerInterface(ProfilerInterface):
             self.service_connection_config.__class__.__name__,
             table=self.table,
             client=factory.create(
-                self.service_connection_config.__class__.__name__, self.connection
+                self.service_connection_config.__class__.__name__,
+                client=self.connection,
             ),
             profile_sample_config=self.profile_sample_config,
             partition_details=self.partition_details,
@@ -190,7 +191,8 @@ class NoSQLProfilerInterface(ProfilerInterface):
         """get all profiler metrics"""
         profile_results = {"table": {}, "columns": defaultdict(dict)}
         runner = factory.create(
-            self.service_connection_config.__class__.__name__, self.connection
+            self.service_connection_config.__class__.__name__,
+            client=self.connection,
         )
         metric_list = [
             self.compute_metrics(runner, 
metric_func) for metric_func in metric_funcs
diff --git a/ingestion/src/metadata/workflow/ingestion.py b/ingestion/src/metadata/workflow/ingestion.py
index a89c58c1d5c..ae736f15b39 100644
--- a/ingestion/src/metadata/workflow/ingestion.py
+++ b/ingestion/src/metadata/workflow/ingestion.py
@@ -41,6 +41,7 @@ from metadata.ingestion.api.parser import parse_workflow_config_gracefully
 from metadata.ingestion.api.step import Step, Summary
 from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
 from metadata.ingestion.models.custom_types import ServiceWithConnectionType
+from metadata.profiler.api.models import ProfilerProcessorConfig
 from metadata.utils.class_helper import (
     get_service_class_from_service_type,
     get_service_type_from_source_type,
@@ -201,3 +202,21 @@ class IngestionWorkflow(BaseWorkflow, ABC):
                 f"Unknown error getting service connection for service name [{service_name}]"
                 f" using the secrets manager provider [{self.metadata.config.secretsManagerProvider}]: {exc}"
             )
+
+    def validate(self):
+        try:
+            if (
+                not self.config.source.serviceConnection.__root__.config.supportsProfiler
+            ):
+                raise AttributeError()
+        except AttributeError:
+            if ProfilerProcessorConfig.parse_obj(
+                self.config.processor.dict().get("config")
+            ).ignoreValidation:
+                logger.debug(
+                    f"Profiler is not supported for the service connection: {self.config.source.serviceConnection}"
+                )
+                return
+            raise WorkflowExecutionError(
+                f"Profiler is not supported for the service connection: {self.config.source.serviceConnection}"
+            )
diff --git a/ingestion/tests/integration/profiler/conftest.py b/ingestion/tests/integration/profiler/conftest.py
new file mode 100644
index 00000000000..f0607f74f96
--- /dev/null
+++ b/ingestion/tests/integration/profiler/conftest.py
@@ -0,0 +1,92 @@
+from typing import TYPE_CHECKING
+
+import boto3
+import botocore
+import pytest
+from testcontainers.localstack import LocalStackContainer
+
+from 
metadata.generated.schema.api.services.createDatabaseService import (
+    CreateDatabaseServiceRequest,
+)
+from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
+    DynamoDBConnection,
+)
+from metadata.generated.schema.entity.services.databaseService import (
+    DatabaseConnection,
+    DatabaseService,
+    DatabaseServiceType,
+)
+from metadata.generated.schema.security.credentials.awsCredentials import AWSCredentials
+
+from ...integration.integration_base import int_admin_ometa
+
+if TYPE_CHECKING:
+    from mypy_boto3_dynamodb.client import DynamoDBClient
+else:
+    DynamoDBClient = None
+
+
+@pytest.fixture(scope="session")
+def metadata():
+    return int_admin_ometa()
+
+
+@pytest.fixture(scope="session")
+def localstack_container():
+    with LocalStackContainer("localstack/localstack:3.3") as container:
+        yield container
+
+
+@pytest.fixture(scope="session")
+def ingest_sample_data(localstack_container):
+    client: DynamoDBClient = boto3.client(
+        "dynamodb",
+        region_name="us-east-1",
+        endpoint_url=localstack_container.get_url(),
+        aws_access_key_id="does-not-matter",
+        aws_secret_access_key="does-not-matter",
+    )
+    client.create_table(
+        TableName="test_table",
+        KeySchema=[
+            {"AttributeName": "id", "KeyType": "HASH"},
+        ],
+        AttributeDefinitions=[
+            {"AttributeName": "id", "AttributeType": "S"},
+        ],
+        ProvisionedThroughput={
+            "ReadCapacityUnits": 5,
+            "WriteCapacityUnits": 5,
+        },
+    )
+    rows = [
+        {"id": "1", "name": "Alice"},
+        {"id": "2", "name": "Bob"},
+    ]
+    for row in rows:
+        client.put_item(
+            TableName="test_table", Item={k: {"S": v} for k, v in row.items()}
+        )
+
+
+@pytest.fixture(scope="module")
+def db_service(metadata, localstack_container):
+    service = CreateDatabaseServiceRequest(
+        name="docker_dynamo_db",
+        serviceType=DatabaseServiceType.DynamoDB,
+        connection=DatabaseConnection(
+            config=DynamoDBConnection(
+                awsConfig=AWSCredentials(
+                    awsRegion="us-east-1",
+                    endPointURL=localstack_container.get_url(),
awsAccessKeyId="does-not-matter",
+                    awsSecretAccessKey="does-not-matter",
+                ),
+            ),
+        ),
+    )
+    service_entity = metadata.create_or_update(data=service)
+    yield service_entity
+    metadata.delete(
+        DatabaseService, service_entity.id, recursive=True, hard_delete=True
+    )
diff --git a/ingestion/tests/integration/profiler/test_dynamodb.py b/ingestion/tests/integration/profiler/test_dynamodb.py
new file mode 100644
index 00000000000..58ea56d1581
--- /dev/null
+++ b/ingestion/tests/integration/profiler/test_dynamodb.py
@@ -0,0 +1,93 @@
+import pytest
+
+from metadata.generated.schema.entity.data.table import Table
+from metadata.generated.schema.entity.services.databaseService import DatabaseService
+from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
+    ProfilerConfigType,
+)
+from metadata.generated.schema.metadataIngestion.workflow import (
+    LogLevels,
+    OpenMetadataWorkflowConfig,
+    Sink,
+    Source,
+    SourceConfig,
+    WorkflowConfig,
+)
+from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.workflow.metadata import MetadataWorkflow
+from metadata.workflow.profiler import ProfilerWorkflow
+
+
+@pytest.fixture(autouse=True, scope="module")
+def ingest_metadata(
+    db_service: DatabaseService, metadata: OpenMetadata, ingest_sample_data
+):
+    workflow_config = OpenMetadataWorkflowConfig(
+        source=Source(
+            type=db_service.serviceType.name.lower(),
+            serviceName=db_service.fullyQualifiedName.__root__,
+            sourceConfig=SourceConfig(config={}),
+            serviceConnection=db_service.connection,
+        ),
+        sink=Sink(
+            type="metadata-rest",
+            config={},
+        ),
+        workflowConfig=WorkflowConfig(openMetadataServerConfig=metadata.config),
+    )
+    metadata_ingestion = MetadataWorkflow.create(workflow_config)
+    metadata_ingestion.execute()
+    metadata_ingestion.raise_from_status()
+    return
+
+
+@pytest.fixture(scope="module")
+def db_fqn(db_service: DatabaseService):
+    return ".".join(
+        [
+            db_service.fullyQualifiedName.__root__,
+            "default",
"default", + ] + ) + + +def test_sample_data(db_service, db_fqn, metadata): + workflow_config = { + "source": { + "type": db_service.serviceType.name.lower(), + "serviceName": db_service.fullyQualifiedName.__root__, + "sourceConfig": { + "config": { + "type": ProfilerConfigType.Profiler.value, + }, + }, + }, + "processor": { + "type": "orm-profiler", + "config": {}, + }, + "sink": { + "type": "metadata-rest", + "config": {}, + }, + "workflowConfig": { + "loggerLevel": LogLevels.DEBUG, + "openMetadataServerConfig": metadata.config.dict(), + }, + } + profiler_workflow = ProfilerWorkflow.create(workflow_config) + profiler_workflow.execute() + profiler_workflow.raise_from_status() + table = metadata.list_entities( + Table, + fields=["fullyQualifiedName"], + params={ + "databaseSchema": db_fqn, + }, + ).entities[0] + sample_data = metadata.get_sample_data(table).sampleData + assert sample_data is not None + assert len(sample_data.columns) == 2 + assert len(sample_data.rows) == 2 + assert sample_data.rows[0][0] == "Alice"