From 01b246f586aad82b8b0e1e415b3ead985a3d8806 Mon Sep 17 00:00:00 2001 From: Onkar Ravgan Date: Thu, 31 Mar 2022 20:38:17 +0530 Subject: [PATCH] ISSUE-3485: Added DynamoDB Unit Test (#3488) * ISSUE-3485: Added DynamoDB Unit Test * Added more tests and optimized code * Re-added list check condition * Added file sink test * Fixed file path * added assert for table and column names * Modified exception block and optimized test cases Co-authored-by: Onkar Ravgan --- .../src/metadata/ingestion/source/dynamodb.py | 16 +-- ingestion/tests/unit/source/test_dynamodb.py | 129 ++++++++++++++++++ 2 files changed, 137 insertions(+), 8 deletions(-) create mode 100644 ingestion/tests/unit/source/test_dynamodb.py diff --git a/ingestion/src/metadata/ingestion/source/dynamodb.py b/ingestion/src/metadata/ingestion/source/dynamodb.py index 3a4efe675ca..7e8dbc60b6f 100644 --- a/ingestion/src/metadata/ingestion/source/dynamodb.py +++ b/ingestion/src/metadata/ingestion/source/dynamodb.py @@ -73,9 +73,9 @@ class DynamodbSource(Source[Entity]): logger.error(err) def ingest_tables(self, next_tables_token=None) -> Iterable[OMetaDatabaseAndTable]: - try: - tables = list(self.dynamodb.tables.all()) - for table in tables: + tables = list(self.dynamodb.tables.all()) + for table in tables: + try: if not self.config.table_filter_pattern.included(table.name): self.status.filter( "{}".format(table.name), @@ -102,15 +102,15 @@ class DynamodbSource(Source[Entity]): database=database_entity, ) yield table_and_db - except Exception as err: - logger.debug(traceback.format_exc()) - logger.debug(traceback.print_exc()) - logger.error(err) + except Exception as err: + logger.debug(traceback.format_exc()) + logger.debug(traceback.print_exc()) + logger.error(err) def get_columns(self, column_data): for column in column_data: try: - if "S" in column["AttributeType"].lower(): + if "S" in column["AttributeType"].upper(): column["AttributeType"] = column["AttributeType"].replace(" ", "") parsed_string = ColumnTypeParser._parse_datatype_string( column["AttributeType"].lower() diff --git a/ingestion/tests/unit/source/test_dynamodb.py b/ingestion/tests/unit/source/test_dynamodb.py new file mode 100644 index 00000000000..dd731a887e1 --- /dev/null +++ b/ingestion/tests/unit/source/test_dynamodb.py @@ -0,0 +1,129 @@ +# Copyright 2021 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Query parser utils tests +""" +import json +from unittest import TestCase +from unittest.mock import patch + +from metadata.generated.schema.entity.data.database import Database +from metadata.generated.schema.entity.data.table import Column +from metadata.generated.schema.entity.data.table import Table as GTable +from metadata.ingestion.api.workflow import Workflow +from metadata.ingestion.models.ometa_table_db import OMetaDatabaseAndTable + +CONFIG = """ +{ + "source": { + "type": "dynamodb", + "config": { + "aws_access_key_id": "aws_access_key_id", + "aws_secret_access_key": "aws_secret_access_key", + "service_name": "DynamoDBTest", + "region_name": "us-east-2", + "endpoint_url": "https://dynamodb.us-east-2.amazonaws.com", + "db_name":"custom_database_name", + "table_filter_pattern":{ + "excludes": ["Music.*"] + } + } + }, + "sink": { + "type": "file", + "config": { + "filename": "./datasets.json" + } + }, + "metadata_server": { + "type": "metadata-server", + "config": { + "api_endpoint": "http://localhost:8585/api", + "auth_provider_type": "no-auth" + } + } + } + +""" + + +def execute_workflow(): + workflow = Workflow.create(json.loads(CONFIG)) + workflow.execute() + workflow.print_status() + workflow.stop() + + +class Table: + def __init__(self, name, attribute_definitions): + self.name = name + self.attribute_definitions = attribute_definitions + + +def get_file_path(): + return json.loads(CONFIG)["sink"]["config"]["filename"] + + +MOCK_GET_COLUMNS = [ + {"AttributeName": "Name", "AttributeType": "S"}, + {"AttributeName": "Location", "AttributeType": "L"}, +] + +MOCK_GET_COLUMNS_TWO = [{"AttributeName": 1234, "AttributeType": "S"}] + +MOCK_GET_TABLE_NAMES = [ + Table(name="Forum", attribute_definitions=MOCK_GET_COLUMNS), + Table(name="Music", attribute_definitions=MOCK_GET_COLUMNS), + Table(name="ProductCatalog", attribute_definitions=MOCK_GET_COLUMNS), + Table(name="Reply", attribute_definitions=MOCK_GET_COLUMNS), + Table(name="Thread", attribute_definitions=MOCK_GET_COLUMNS), + Table(name=1234, attribute_definitions=MOCK_GET_COLUMNS), + Table(name="Cars", attribute_definitions=MOCK_GET_COLUMNS_TWO), +] + +MOCK_GET_TABLE_NAMES_EMPTY = [] + +MOCK_GET_TABLE_NAMES_OTHER_DATATYPE = None + + +@patch("boto3.resources.collection.CollectionManager.all") +@patch("sqlalchemy.engine.base.Engine.connect") +class DynamoDbIngestionTest(TestCase): + def test_dynamodb_empty_table(self, mock_connect, all): + all.return_value = MOCK_GET_TABLE_NAMES_EMPTY + execute_workflow() + file_path = get_file_path() + with open(file_path, "r") as file: + assert len(json.loads(file.read())) == 0 + + def test_dynamodb_other_datatype_table(self, mock_connect, all): + all.return_value = MOCK_GET_TABLE_NAMES_OTHER_DATATYPE + execute_workflow() + file_path = get_file_path() + with open(file_path, "r") as file: + assert len(json.loads(file.read())) == 0 + + def test_dynamodb_file_sink(self, mock_connect, all): + all.return_value = MOCK_GET_TABLE_NAMES + execute_workflow() + table_names_list = [i.name for i in MOCK_GET_TABLE_NAMES] + column_names_list = [i["AttributeName"] for i in MOCK_GET_COLUMNS] + file_path = get_file_path() + with open(file_path, "r") as file: + for item in json.loads(file.read()): + OMetaDatabaseAndTable.parse_obj(item) + Database.parse_obj(item.get("database")) + table_obj = GTable.parse_obj(item.get("table")) + assert table_obj.name.__root__ in table_names_list + for column in table_obj.columns: + column_obj = Column.parse_obj(column) + assert column_obj.name.__root__ in column_names_list