Dynamodb source fix (#3882)

Dynamodb source fix (#3882)
This commit is contained in:
Onkar Ravgan 2022-04-07 13:45:47 +05:30 committed by GitHub
parent eceff1354b
commit 4dce0a061a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 72 deletions

View File

@ -1,28 +1,35 @@
{
"source": {
"type": "dynamodb",
"source": {
"type": "dynamodb",
"serviceName": "local_dynamodb",
"serviceConnection": {
"config": {
"aws_access_key_id": "aws_access_key_id",
"aws_secret_access_key": "aws_secret_access_key",
"service_name": "DynamoDB",
"region_name": "us-east-2",
"endpoint_url": "https://dynamodb.us-east-2.amazonaws.com",
"db_name":"custom_database_name",
"table_filter_pattern":{
"excludes": [""]
"awsAccessKeyId": "aws_access_key_id",
"awsSecretAccessKey": "aws_secret_access_key",
"awsRegion": "us-east-2",
"endPointURL": "https://dynamodb.us-east-2.amazonaws.com",
"database": "custom_database_name"
}
},
"sourceConfig": {
"config": {
"enableDataProfiler": false,
"tableFilterPattern": {
"includes": [
""
]
}
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "no-auth"
}
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"workflowConfig": {
"openMetadataServerConfig": {
"hostPort": "http://localhost:8585/api",
"authProvider": "no-auth"
}
}
}

View File

@ -3,44 +3,35 @@ import traceback
import uuid
from typing import Iterable
from metadata.config.common import FQDN_SEPARATOR
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.table import Column, Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
from metadata.generated.schema.entity.services.connections.database.dynamoDBConnection import (
DynamoDBConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataServerConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
Source as WorkflowSource,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.common import Entity, IncludeFilterPattern
from metadata.ingestion.api.source import Source, SourceStatus
from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.source import InvalidSourceException, Source, SourceStatus
from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.sql_source_common import SQLSourceStatus
from metadata.utils.aws_client import AWSClient, AWSClientConfigModel
from metadata.utils.aws_client import AWSClient
from metadata.utils.column_type_parser import ColumnTypeParser
from metadata.utils.filters import filter_by_table
from metadata.utils.helpers import get_database_service_or_create
logger: logging.Logger = logging.getLogger(__name__)
class DynamoDBSourceConfig(AWSClientConfigModel):
service_type = DatabaseServiceType.DynamoDB.value
service_name: str
endpoint_url: str
host_port: str = ""
db_name = "DynamoDB"
table_filter_pattern: IncludeFilterPattern = IncludeFilterPattern.allow_all()
def get_service_type(self) -> DatabaseServiceType:
return DatabaseServiceType[self.service_type]
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
class DynamodbSource(Source[Entity]):
def __init__(
self, config: DynamoDBSourceConfig, metadata_config: OpenMetadataServerConfig
):
def __init__(self, config, metadata_config: OpenMetadataServerConfig):
super().__init__()
self.status = SQLSourceStatus()
@ -50,13 +41,20 @@ class DynamodbSource(Source[Entity]):
self.service = get_database_service_or_create(
config=config,
metadata_config=metadata_config,
service_name=self.config.service_name,
service_name=self.config.serviceName,
)
self.dynamodb = AWSClient(self.config).get_resource("dynamodb")
self.dynamodb = AWSClient(
self.config.serviceConnection.__root__.config
).get_resource("dynamodb")
@classmethod
def create(cls, config_dict, metadata_config: OpenMetadataServerConfig):
config = DynamoDBSourceConfig.parse_obj(config_dict)
config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
connection: DynamoDBConnection = config.serviceConnection.__root__.config
if not isinstance(connection, DynamoDBConnection):
raise InvalidSourceException(
f"Expected DynamoDBConnection, but got {connection}"
)
return cls(config, metadata_config)
def prepare(self):
@ -77,30 +75,37 @@ class DynamodbSource(Source[Entity]):
tables = list(self.dynamodb.tables.all())
for table in tables:
try:
if not self.config.table_filter_pattern.included(table.name):
if filter_by_table(
self.config.sourceConfig.config.tableFilterPattern, table.name
):
self.status.filter(
"{}".format(table.name),
"Table pattern not allowed",
)
continue
database_entity = Database(
name=self.config.db_name,
id=uuid.uuid4(),
name="default",
service=EntityReference(id=self.service.id, type="databaseService"),
)
fqn = f"{self.config.service_name}{FQDN_SEPARATOR}{database_entity.name}{FQDN_SEPARATOR}{table}"
self.dataset_name = fqn
table_columns = self.get_columns(table.attribute_definitions)
table_entity = Table(
id=uuid.uuid4(),
name=table.name,
description="",
fullyQualifiedName=fqn,
columns=table_columns,
)
schema_entity = DatabaseSchema(
id=uuid.uuid4(),
name=self.config.serviceConnection.__root__.config.database,
database=EntityReference(id=database_entity.id, type="database"),
service=EntityReference(id=self.service.id, type="databaseService"),
)
table_and_db = OMetaDatabaseAndTable(
table=table_entity,
database=database_entity,
database_schema=schema_entity,
)
yield table_and_db
except Exception as err:

View File

@ -12,6 +12,7 @@
from typing import Any, Optional
from boto3 import Session
from pydantic import SecretStr
from metadata.config.common import ConfigModel
@ -21,11 +22,11 @@ class AWSClientConfigModel(ConfigModel):
AWSClientConfigModel holds all config parameters required to instantiate an AWSClient.
"""
aws_access_key_id: Optional[str]
aws_secret_access_key: Optional[str]
aws_session_token: Optional[str]
endpoint_url: Optional[str]
region_name: Optional[str]
awsAccessKeyId: Optional[str]
awsSecretAccessKey: Optional[SecretStr]
awsSessionToken: Optional[str]
endPointURL: Optional[str]
awsRegion: Optional[str]
class AWSClient:
@ -36,42 +37,43 @@ class AWSClient:
config: AWSClientConfigModel
def __init__(self, config: AWSClientConfigModel):
self.config = config
def _get_session(self) -> Session:
if (
self.config.aws_access_key_id
and self.config.aws_secret_access_key
and self.config.aws_session_token
self.config.awsAccessKeyId
and self.config.awsSecretAccessKey
and self.config.awsSessionToken
):
return Session(
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
aws_session_token=self.config.aws_session_token,
region_name=self.config.region_name,
aws_access_key_id=self.config.awsAccessKeyId,
aws_secret_access_key=self.config.awsSecretAccessKey.get_secret_value(),
aws_session_token=self.config.awsSessionToken,
region_name=self.config.awsRegion,
)
if self.config.aws_access_key_id and self.config.aws_secret_access_key:
if self.config.awsAccessKeyId and self.config.awsSecretAccessKey:
return Session(
aws_access_key_id=self.config.aws_access_key_id,
aws_secret_access_key=self.config.aws_secret_access_key,
region_name=self.config.region_name,
aws_access_key_id=self.config.awsAccessKeyId,
aws_secret_access_key=self.config.awsSecretAccessKey.get_secret_value(),
region_name=self.config.awsRegion,
)
if self.config.region_name:
return Session(region_name=self.config.region_name)
if self.config.awsRegion:
return Session(region_name=self.config.awsRegion)
return Session()
def get_client(self, service_name: str) -> Any:
session = self._get_session()
if self.config.endpoint_url is not None:
if self.config.endPointURL is not None:
return session.client(
service_name=service_name, endpoint_url=self.config.endpoint_url
service_name=service_name, endpoint_url=self.config.endPointURL
)
return session.client(service_name=service_name)
def get_resource(self, service_name: str) -> Any:
session = self._get_session()
if self.config.endpoint_url is not None:
if self.config.endPointURL is not None:
return session.resource(
service_name=service_name, endpoint_url=self.config.endpoint_url
service_name=service_name, endpoint_url=self.config.endPointURL
)
return session.resource(service_name=service_name)