From 191ecb733289be0bdf317f1fd2cc5b25be502def Mon Sep 17 00:00:00 2001 From: parthp2107 <83201188+parthp2107@users.noreply.github.com> Date: Sun, 12 Sep 2021 01:07:13 +0530 Subject: [PATCH] added unit tests to helper and workflow classes in ingestion (#391) * added unit tests to helper and workflow classes in ingestion * Addressing comments * Adressing comments * Adressing comments * Failing Unit tests fixed * Workflow test and config file modified Co-authored-by: parthp2107 Co-authored-by: Ayush Shah --- ingestion/tests/unit/client_test.py | 76 -------------- ingestion/tests/unit/helpers_test.py | 136 ++++++++++++++++++++++++++ ingestion/tests/unit/mysql_test.json | 34 +++++++ ingestion/tests/unit/workflow_test.py | 80 +++++++++++++++ 4 files changed, 250 insertions(+), 76 deletions(-) delete mode 100644 ingestion/tests/unit/client_test.py create mode 100644 ingestion/tests/unit/helpers_test.py create mode 100644 ingestion/tests/unit/mysql_test.json create mode 100644 ingestion/tests/unit/workflow_test.py diff --git a/ingestion/tests/unit/client_test.py b/ingestion/tests/unit/client_test.py deleted file mode 100644 index 21467bca4ab..00000000000 --- a/ingestion/tests/unit/client_test.py +++ /dev/null @@ -1,76 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from unittest import mock, TestCase - -from requests import HTTPError - -from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest -from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest -from metadata.generated.schema.api.services.createDatabaseService import CreateDatabaseServiceEntityRequest -from metadata.generated.schema.entity.data.table import Column -from metadata.generated.schema.type.entityReference import EntityReference -from metadata.ingestion.ometa.client import REST, APIError - - -class RestTest(TestCase): - - @classmethod - def setUpClass(cls) -> None: - cls._client = REST("http://localhost:8585/api", 'test', 'none') - - @classmethod - def tearDownClass(cls) -> None: - pass - - def test_get_and_delete_service(self): - mysql_service = self._client.get_database_service('local_mysql') - self.assertEqual(mysql_service.name, 'local_mysql') - self._client.delete_database_service(mysql_service.id.__root__) - self.assertRaises(APIError, self._client.get_database_service_by_id, mysql_service.id.__root__) - - def test_create_service(self): - data = {'jdbc': {'connectionUrl': 'mysql://localhost/catalog_db', 'driverClass': 'jdbc'}, - 'name': 'local_mysql', - 'description': 'local mysql env'} - create_mysql_service = CreateDatabaseServiceEntityRequest(**data) - mysql_service = self._client.create_database_service(create_mysql_service) - print(mysql_service) - self.assertEqual(mysql_service.name, create_mysql_service.name) - - def test_get_service_by_id(self): - mysql_service = self._client.get_database_service('local_mysql') - self.assertEqual(mysql_service.name, 'local_mysql') - mysql_service_get_id = self._client.get_database_service_by_id(mysql_service.id.__root__) - self.assertEqual(mysql_service.id, mysql_service_get_id.id) - - def test_create_get_list_databases(self): - mysql_service = self._client.get_database_service('local_mysql') - service_reference = EntityReference(id=mysql_service.id, entity='mysql') - create_database_request = CreateDatabaseEntityRequest(name="dwh", service=service_reference) - created_database = self._client.create_database(create_database_request) - self.assertEqual(create_database_request.name, created_database.name) - created_database.description = "hello world" - update_database_request = CreateDatabaseEntityRequest(name=created_database.name, description=created_database.description, - service=service_reference) - updated_database = self._client.create_database(update_database_request) - self.assertEqual(updated_database.description, created_database.description) - - def test_create_update_table(self): - databases = self._client.list_databases() - columns = [Column(name="id", columnDataType="INT"), Column(name="name", columnDataType="VARCHAR")] - table = CreateTableEntityRequest(name="test1", columns=columns, database=databases[0].id) - created_table = self._client.create_or_update_table(table) - self.assertEqual(table.name, created_table.name) diff --git a/ingestion/tests/unit/helpers_test.py b/ingestion/tests/unit/helpers_test.py new file mode 100644 index 00000000000..b8fbca1168d --- /dev/null +++ b/ingestion/tests/unit/helpers_test.py @@ -0,0 +1,136 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +from unittest import TestCase + +from metadata.generated.schema.api.data.createDatabase import CreateDatabaseEntityRequest +from metadata.generated.schema.api.data.createTable import CreateTableEntityRequest +from metadata.generated.schema.api.services.createDatabaseService import \ + CreateDatabaseServiceEntityRequest +from metadata.generated.schema.entity.data.table import Column +from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.ometa.client import APIError +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient +from metadata.ingestion.ometa.openmetadata_rest import MetadataServerConfig +from metadata.generated.schema.api.services.createDashboardService import \ + CreateDashboardServiceEntityRequest +from metadata.generated.schema.api.services.createMessagingService import \ + CreateMessagingServiceEntityRequest + + +class RestTest(TestCase): + file_path = 'tests/unit/mysql_test.json' + with open(file_path) as ingestionFile: + ingestionData = ingestionFile.read() + client_config = json.loads(ingestionData).get("metadata_server") + config = client_config.get("config", {}) + metadata_config = MetadataServerConfig.parse_obj(config) + openmetadata_client = OpenMetadataAPIClient(metadata_config) + client = OpenMetadataAPIClient(metadata_config).client + + def test_1_create_service(self): + data = { + 'jdbc': {'connectionUrl': 'mysql://localhost/openmetadata_db', 'driverClass': 'jdbc'}, + 'name': 'local_mysql_test', + 'serviceType': "MySQL", + 'description': 'local mysql env'} + create_mysql_service = CreateDatabaseServiceEntityRequest(**data) + mysql_service = self.openmetadata_client.create_database_service(create_mysql_service) + self.assertEqual(mysql_service.name, create_mysql_service.name) + + def test_2_get_service(self): + mysql_service = self.openmetadata_client.get_database_service('local_mysql_test') + self.assertEqual(mysql_service.name, 'local_mysql_test') + + def test_3_get_service_by_id(self): + mysql_service = self.openmetadata_client.get_database_service('local_mysql_test') + mysql_service_get_id = self.openmetadata_client.get_database_service_by_id( + mysql_service.id.__root__ + ) + self.assertEqual(mysql_service.id, mysql_service_get_id.id) + + def test_4_create_update_databases(self): + mysql_service = self.openmetadata_client.get_database_service('local_mysql_test') + service_reference = EntityReference(id=mysql_service.id.__root__, type="databaseService") + create_database_request = CreateDatabaseEntityRequest( + name="dwh", service=service_reference + ) + created_database = self.openmetadata_client.create_database(create_database_request) + created_database.description = "hello world" + update_database_request = CreateDatabaseEntityRequest( + name=created_database.name, description=created_database.description, + service=service_reference + ) + updated_database = self.openmetadata_client.create_database(update_database_request) + self.assertEqual(updated_database.description, created_database.description) + + def test_5_create_table(self): + databases = self.openmetadata_client.list_databases() + columns = [Column(name="id", columnDataType="INT"), + Column(name="name", columnDataType="VARCHAR")] + table = CreateTableEntityRequest( + name="test1", columns=columns, database=databases[0].id.__root__ + ) + created_table = self.openmetadata_client.create_or_update_table(table) + self.client.delete(f"/tables/{created_table.id.__root__}") + self.client.delete(f"/databases/{databases[0].id.__root__}") + self.assertEqual(table.name, created_table.name) + + def test_6_delete_service(self): + mysql_service = self.openmetadata_client.get_database_service('local_mysql_test') + self.openmetadata_client.delete_database_service(mysql_service.id.__root__) + self.assertRaises( + APIError, self.openmetadata_client.get_database_service_by_id, + mysql_service.id.__root__ + ) + + def test_7_create_messaging_service(self): + create_messaging_service = CreateMessagingServiceEntityRequest( + name='sample_kafka_test', + serviceType='Kafka', + brokers=['localhost:9092'], + schemaRegistry='http://localhost:8081' + ) + messaging_service = self.openmetadata_client.create_messaging_service( + create_messaging_service + ) + self.assertEqual(create_messaging_service.name, messaging_service.name) + + def test_8_get_messaging_service(self): + messaging_service = self.openmetadata_client.get_messaging_service('sample_kafka_test') + self.client.delete(f"/services/messagingServices/{messaging_service.id.__root__}") + self.assertEqual(messaging_service.name, 'sample_kafka_test') + + def test_9_create_dashboard_service(self): + create_dashboard_service = CreateDashboardServiceEntityRequest( + name='sample_superset_test', + serviceType='Superset', + username='admin', + password='admin', + dashboardUrl='http://localhost:8088' + ) + dashboard_service = None + try: + dashboard_service = self.openmetadata_client.create_dashboard_service( + create_dashboard_service + ) + except APIError: + print(APIError) + self.assertEqual(create_dashboard_service.name, dashboard_service.name) + + def test_10_get_dashboard_service(self): + dashboard_service = self.openmetadata_client.get_dashboard_service('sample_superset_test') + self.client.delete(f"/services/dashboardServices/{dashboard_service.id.__root__}") + self.assertEqual(dashboard_service.name, 'sample_superset_test') diff --git a/ingestion/tests/unit/mysql_test.json b/ingestion/tests/unit/mysql_test.json new file mode 100644 index 00000000000..e20fec8f9fc --- /dev/null +++ b/ingestion/tests/unit/mysql_test.json @@ -0,0 +1,34 @@ +{ + "source": { + "type": "mysql", + "config": { + "username": "openmetadata_user", + "password": "openmetadata_password", + "database": "openmetadata_db", + "service_name": "local_mysql_test", + "filter_pattern": { + "excludes": ["mysql.*", "information_schema.*", "performance_schema.*", "sys.*"] + } + } + }, + "stage": { + "type": "file", + "config": { + "filename": "/tmp/mysql_test" + } + }, + "metadata_server": { + "type": "metadata-server", + "config": { + "api_endpoint": "http://localhost:8585/api", + "auth_provider_type": "no-auth" + } + }, + "cron": { + "minute": "*/5", + "hour": null, + "day": null, + "month": null, + "day_of_week": null + } +} diff --git a/ingestion/tests/unit/workflow_test.py b/ingestion/tests/unit/workflow_test.py new file mode 100644 index 00000000000..ff14ba58312 --- /dev/null +++ b/ingestion/tests/unit/workflow_test.py @@ -0,0 +1,80 @@ +import importlib +import pathlib +from unittest import TestCase + +from metadata.config.common import load_config_file +from metadata.ingestion.api.workflow import Workflow +from metadata.ingestion.ometa.openmetadata_rest import OpenMetadataAPIClient, MetadataServerConfig + + +class WorkflowTest(TestCase): + + def test_get_200(self): + key = 'metadata.ingestion.sink.metadata_rest.MetadataRestSink' + if key.find(".") >= 0: + module_name, class_name = key.rsplit(".", 1) + my_class = getattr(importlib.import_module(module_name), class_name) + self.assertEqual((my_class is not None), True) + + def test_get_4xx(self): + my_class = None + key = 'metadata.ingestion.sink.MYSQL.mysqlSINK' + try: + if key.find(".") >= 0: + module_name, class_name = key.rsplit(".", 1) + my_class = getattr(importlib.import_module(module_name), class_name) + except ModuleNotFoundError: + self.assertRaises(ModuleNotFoundError) + + def test_title_typeClassFetch(self): + is_file = True + file_type = 'query-parser' + if is_file: + replace = file_type.replace('-', '_') + else: + replace = ''.join([i.title() for i in file_type.replace('-', '_').split('_')]) + self.assertEqual(replace, 'query_parser') + + def test_title_typeClassFetch_4xx(self): + is_file = False + file_type = 'query-parser' + if is_file: + replace = file_type.replace('-', '_') + else: + replace = ''.join([i.title() for i in file_type.replace('-', '_').split('_')]) + self.assertEqual(replace, 'QueryParser') + + def test_execute_200(self): + config_file = pathlib.Path('tests/unit/mysql_test.json') + workflow_config = load_config_file(config_file) + if workflow_config.get('cron'): + del workflow_config['cron'] + workflow = Workflow.create(workflow_config) + workflow.execute() + workflow.stop() + config = MetadataServerConfig.parse_obj( + workflow_config.get('metadata_server').get( + 'config' + ) + ) + client = OpenMetadataAPIClient(config).client + + client.delete( + f"/services/databaseServices/" + f"{client.get('/services/databaseServices/name/local_mysql_test')['id']}" + ) + file_path = '/tmp/mysql_test' + with open(file_path) as ingestionFile: + ingestionData = ingestionFile.read() + self.assertEqual(ingestionData is not None, True) + + def test_execute_4xx(self): + config_file = pathlib.Path('tests/unit/mysql_test.json') + workflow_config = load_config_file(config_file) + ingestionData = None + try: + file_path = '/tmp/mysql_test123' + with open(file_path) as ingestionFile: + ingestionData = ingestionFile.read() + except FileNotFoundError: + self.assertRaises(FileNotFoundError)