diff --git a/ingestion/examples/workflows/dynamodb.json b/ingestion/examples/workflows/dynamodb.json index 46cff4d6d04..b8ede535aca 100644 --- a/ingestion/examples/workflows/dynamodb.json +++ b/ingestion/examples/workflows/dynamodb.json @@ -1,28 +1,35 @@ { - "source": { - "type": "dynamodb", + "source": { + "type": "dynamodb", + "serviceName": "local_dynamodb", + "serviceConnection": { "config": { - "aws_access_key_id": "aws_access_key_id", - "aws_secret_access_key": "aws_secret_access_key", - "service_name": "DynamoDB", - "region_name": "us-east-2", - "endpoint_url": "https://dynamodb.us-east-2.amazonaws.com", - "db_name":"custom_database_name", - "table_filter_pattern":{ - "excludes": [""] + "awsAccessKeyId": "aws_access_key_id", + "awsSecretAccessKey": "aws_secret_access_key", + "awsRegion": "us-east-2", + "endPointURL": "https://dynamodb.us-east-2.amazonaws.com", + "database": "custom_database_name" + } + }, + "sourceConfig": { + "config": { + "enableDataProfiler": false, + "tableFilterPattern": { + "includes": [ + "" + ] } } - }, - "sink": { - "type": "metadata-rest", - "config": {} - }, - "metadata_server": { - "type": "metadata-server", - "config": { - "api_endpoint": "http://localhost:8585/api", - "auth_provider_type": "no-auth" - } + } + }, + "sink": { + "type": "metadata-rest", + "config": {} + }, + "workflowConfig": { + "openMetadataServerConfig": { + "hostPort": "http://localhost:8585/api", + "authProvider": "no-auth" } } - \ No newline at end of file +} \ No newline at end of file diff --git a/ingestion/src/metadata/ingestion/source/dynamodb.py b/ingestion/src/metadata/ingestion/source/dynamodb.py index cf2126348a8..0b87aa4378c 100644 --- a/ingestion/src/metadata/ingestion/source/dynamodb.py +++ b/ingestion/src/metadata/ingestion/source/dynamodb.py @@ -3,44 +3,35 @@ import traceback import uuid from typing import Iterable -from metadata.config.common import FQDN_SEPARATOR from metadata.generated.schema.entity.data.database import Database from metadata.generated.schema.entity.data.table import Column, Table -from metadata.generated.schema.entity.services.databaseService import ( - DatabaseServiceType, +from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import ( + DynamoDBConnection, ) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataServerConfig, ) +from metadata.generated.schema.metadataIngestion.workflow import ( + Source as WorkflowSource, +) from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.api.common import Entity, IncludeFilterPattern -from metadata.ingestion.api.source import Source, SourceStatus +from metadata.ingestion.api.common import Entity +from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.ingestion.source.sql_source_common import SQLSourceStatus -from metadata.utils.aws_client import AWSClient, AWSClientConfigModel +from metadata.utils.aws_client import AWSClient from metadata.utils.column_type_parser import ColumnTypeParser +from metadata.utils.filters import filter_by_table from metadata.utils.helpers import get_database_service_or_create logger: logging.Logger = logging.getLogger(__name__) - -class DynamoDBSourceConfig(AWSClientConfigModel): - service_type = DatabaseServiceType.DynamoDB.value - service_name: str - endpoint_url: str - host_port: str = "" - db_name = "DynamoDB" - table_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all() - - def get_service_type(self) -> DatabaseServiceType: - return DatabaseServiceType[self.service_type] +from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema class DynamodbSource(Source[Entity]): - def __init__( - self, config: DynamoDBSourceConfig, metadata_config: OpenMetadataServerConfig - ): + def __init__(self, config, metadata_config: OpenMetadataServerConfig): super().__init__() self.status = SQLSourceStatus() @@ -50,13 +41,20 @@ class DynamodbSource(Source[Entity]): self.service = get_database_service_or_create( config=config, metadata_config=metadata_config, - service_name=self.config.service_name, + service_name=self.config.serviceName, ) - self.dynamodb = AWSClient(self.config).get_resource("dynamodb") + self.dynamodb = AWSClient( + self.config.serviceConnection.__root__.config + ).get_resource("dynamodb") @classmethod def create(cls, config_dict, metadata_config: OpenMetadataServerConfig): - config = DynamoDBSourceConfig.parse_obj(config_dict) + config: WorkflowSource = WorkflowSource.parse_obj(config_dict) + connection: DynamoDBConnection = config.serviceConnection.__root__.config + if not isinstance(connection, DynamoDBConnection): + raise InvalidSourceException( + f"Expected DynamoDBConnection, but got {connection}" + ) return cls(config, metadata_config) def prepare(self): @@ -77,30 +75,37 @@ class DynamodbSource(Source[Entity]): tables = list(self.dynamodb.tables.all()) for table in tables: try: - if not self.config.table_filter_pattern.included(table.name): + if filter_by_table( + self.config.sourceConfig.config.tableFilterPattern, table.name + ): self.status.filter( "{}".format(table.name), "Table pattern not allowed", ) continue database_entity = Database( - name=self.config.db_name, + id=uuid.uuid4(), + name="default", service=EntityReference(id=self.service.id, type="databaseService"), ) - fqn = f"{self.config.service_name}{FQDN_SEPARATOR}{database_entity.name}{FQDN_SEPARATOR}{table}" - self.dataset_name = fqn table_columns = self.get_columns(table.attribute_definitions) table_entity = Table( id=uuid.uuid4(), name=table.name, description="", - fullyQualifiedName=fqn, columns=table_columns, ) + schema_entity = DatabaseSchema( + id=uuid.uuid4(), + name=self.config.serviceConnection.__root__.config.database, + database=EntityReference(id=database_entity.id, type="database"), + service=EntityReference(id=self.service.id, type="databaseService"), + ) table_and_db = OMetaDatabaseAndTable( table=table_entity, database=database_entity, + database_schema=schema_entity, ) yield table_and_db except Exception as err: diff --git a/ingestion/src/metadata/utils/aws_client.py b/ingestion/src/metadata/utils/aws_client.py index 6c2bf4bbc09..fefe4211cf9 100644 --- a/ingestion/src/metadata/utils/aws_client.py +++ b/ingestion/src/metadata/utils/aws_client.py @@ -12,6 +12,7 @@ from typing import Any, Optional from boto3 import Session +from pydantic import SecretStr from metadata.config.common import ConfigModel @@ -21,11 +22,11 @@ class AWSClientConfigModel(ConfigModel): AWSClientConfigModel holds all config parameters required to instantiate an AWSClient. """ - aws_access_key_id: Optional[str] - aws_secret_access_key: Optional[str] - aws_session_token: Optional[str] - endpoint_url: Optional[str] - region_name: Optional[str] + awsAccessKeyId: Optional[str] + awsSecretAccessKey: Optional[SecretStr] + awsSessionToken: Optional[str] + endPointURL: Optional[str] + awsRegion: Optional[str] class AWSClient: @@ -36,42 +37,43 @@ class AWSClient: config: AWSClientConfigModel def __init__(self, config: AWSClientConfigModel): + self.config = config def _get_session(self) -> Session: if ( - self.config.aws_access_key_id - and self.config.aws_secret_access_key - and self.config.aws_session_token + self.config.awsAccessKeyId + and self.config.awsSecretAccessKey + and self.config.awsSessionToken ): return Session( - aws_access_key_id=self.config.aws_access_key_id, - aws_secret_access_key=self.config.aws_secret_access_key, - aws_session_token=self.config.aws_session_token, - region_name=self.config.region_name, + aws_access_key_id=self.config.awsAccessKeyId, + aws_secret_access_key=self.config.awsSecretAccessKey.get_secret_value(), + aws_session_token=self.config.awsSessionToken, + region_name=self.config.awsRegion, ) - if self.config.aws_access_key_id and self.config.aws_secret_access_key: + if self.config.awsAccessKeyId and self.config.awsSecretAccessKey: return Session( - aws_access_key_id=self.config.aws_access_key_id, - aws_secret_access_key=self.config.aws_secret_access_key, - region_name=self.config.region_name, + aws_access_key_id=self.config.awsAccessKeyId, + aws_secret_access_key=self.config.awsSecretAccessKey.get_secret_value(), + region_name=self.config.awsRegion, ) - if self.config.region_name: - return Session(region_name=self.config.region_name) + if self.config.awsRegion: + return Session(region_name=self.config.awsRegion) return Session() def get_client(self, service_name: str) -> Any: session = self._get_session() - if self.config.endpoint_url is not None: + if self.config.endPointURL is not None: return session.client( - service_name=service_name, endpoint_url=self.config.endpoint_url + service_name=service_name, endpoint_url=self.config.endPointURL ) return session.client(service_name=service_name) def get_resource(self, service_name: str) -> Any: session = self._get_session() - if self.config.endpoint_url is not None: + if self.config.endPointURL is not None: return session.resource( - service_name=service_name, endpoint_url=self.config.endpoint_url + service_name=service_name, endpoint_url=self.config.endPointURL ) return session.resource(service_name=service_name)