From 07aed10a7ed607d83016e4881a5dc0ad94c63d56 Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Tue, 20 Dec 2022 18:00:26 +0530 Subject: [PATCH] Snowflake E2E Tests (#8781) --- .github/workflows/py-cli-e2e-tests.yml | 7 +- .../cli_e2e/database/snowflake/queries.sql | 11 ++ .../cli_e2e/database/snowflake/snowflake.yaml | 34 +++++ ingestion/tests/cli_e2e/test_cli_db_base.py | 38 ++++-- ingestion/tests/cli_e2e/test_cli_snowflake.py | 124 ++++++++++++++++++ 5 files changed, 205 insertions(+), 9 deletions(-) create mode 100644 ingestion/tests/cli_e2e/database/snowflake/queries.sql create mode 100644 ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml create mode 100644 ingestion/tests/cli_e2e/test_cli_snowflake.py diff --git a/.github/workflows/py-cli-e2e-tests.yml b/.github/workflows/py-cli-e2e-tests.yml index 69bf0fa0675..81b3a5ccbea 100644 --- a/.github/workflows/py-cli-e2e-tests.yml +++ b/.github/workflows/py-cli-e2e-tests.yml @@ -21,7 +21,7 @@ jobs: strategy: matrix: py-version: ['3.9'] - e2e-test: ['mysql', 'bigquery'] + e2e-test: ['mysql', 'bigquery', 'snowflake'] environment: e2e steps: @@ -73,6 +73,11 @@ jobs: E2E_BQ_PRIVATE_KEY_ID: ${{ secrets.E2E_BQ_PRIVATE_KEY_ID }} E2E_BQ_CLIENT_EMAIL: ${{ secrets.E2E_BQ_CLIENT_EMAIL }} E2E_BQ_CLIENT_ID: ${{ secrets.E2E_BQ_CLIENT_ID }} + E2E_SNOWFLAKE_PASSWORD: ${{ secrets.E2E_SNOWFLAKE_PASSWORD }} + E2E_SNOWFLAKE_USERNAME: ${{ secrets.E2E_SNOWFLAKE_USERNAME }} + E2E_SNOWFLAKE_ACCOUNT: ${{ secrets.E2E_SNOWFLAKE_ACCOUNT }} + E2E_SNOWFLAKE_DATABASE: ${{ secrets.E2E_SNOWFLAKE_DATABASE }} + E2E_SNOWFLAKE_WAREHOUSE: ${{ secrets.E2E_SNOWFLAKE_WAREHOUSE }} run: | source env/bin/activate python -m pytest -c ingestion/setup.cfg ingestion/tests/cli_e2e/test_cli_$E2E_TEST.py diff --git a/ingestion/tests/cli_e2e/database/snowflake/queries.sql b/ingestion/tests/cli_e2e/database/snowflake/queries.sql new file mode 100644 index 00000000000..0a9b4520614 --- /dev/null +++ 
b/ingestion/tests/cli_e2e/database/snowflake/queries.sql @@ -0,0 +1,11 @@ +DROP DATABASE IF EXISTS E2E_DB; +CREATE OR REPLACE DATABASE E2E_DB; +USE E2E_DB; +CREATE OR REPLACE SCHEMA e2e_test; +CREATE OR REPLACE TABLE e2e_test.regions(region_id INT PRIMARY KEY,region_name VARCHAR(25)); +CREATE OR REPLACE TABLE e2e_test.countries(country_id CHAR(2) PRIMARY KEY,country_name VARCHAR (40),region_id INT NOT NULL); +CREATE OR REPLACE TABLE e2e_test.locations(e2e_testlocation_id INT PRIMARY KEY,e2e_teststreet_address VARCHAR (40),e2e_testpostal_code VARCHAR (12),e2e_testcity VARCHAR (30) NOT NULL,e2e_teststate_province VARCHAR (25),e2e_testcountry_id CHAR (2) NOT NULL); +CREATE OR REPLACE TABLE e2e_test.jobs(e2e_testjob_id INT PRIMARY KEY,e2e_testjob_title VARCHAR (35) NOT NULL,e2e_testmin_salary DECIMAL (8, 2),e2e_testmax_salary DECIMAL (8, 2)); +CREATE OR REPLACE TABLE e2e_test.test_departments(e2e_testdepartment_id INT PRIMARY KEY,e2e_testdepartment_name VARCHAR (30) NOT NULL,e2e_testlocation_id INT); +CREATE OR REPLACE TABLE e2e_test.test_employees(e2e_testemployee_id INT PRIMARY KEY,e2e_testfirst_name VARCHAR (20),e2e_testlast_name VARCHAR (25) NOT NULL,e2e_testemail VARCHAR (100) NOT NULL,e2e_testphone_number VARCHAR (20),e2e_testhire_date DATE NOT NULL,e2e_testjob_id INT NOT NULL,e2e_testsalary DECIMAL (8, 2) NOT NULL,e2e_testmanager_id INT,e2e_testdepartment_id INT); +CREATE OR REPLACE TABLE e2e_test.test_dependents(e2e_testdependent_id INT PRIMARY KEY,e2e_testfirst_name VARCHAR (50) NOT NULL,e2e_testlast_name VARCHAR (50) NOT NULL,e2e_testrelationship VARCHAR (25) NOT NULL,e2e_testemployee_id INT NOT NULL); diff --git a/ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml b/ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml new file mode 100644 index 00000000000..32e96f8cf3d --- /dev/null +++ b/ingestion/tests/cli_e2e/database/snowflake/snowflake.yaml @@ -0,0 +1,34 @@ +source: + type: snowflake + serviceName: local_snowflake + serviceConnection: + 
config: + username: $E2E_SNOWFLAKE_USERNAME + password: $E2E_SNOWFLAKE_PASSWORD + account: $E2E_SNOWFLAKE_ACCOUNT + warehouse: $E2E_SNOWFLAKE_WAREHOUSE + database: $E2E_SNOWFLAKE_DATABASE + type: Snowflake + connectionOptions: {} + connectionArguments: {} + sourceConfig: + config: + markDeletedTables: true + includeTables: true + includeViews: true + type: DatabaseMetadata + schemaFilterPattern: + excludes: + - information_schema.* + +sink: + type: metadata-rest + config: {} +workflowConfig: + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + jwtToken: "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" + + diff --git a/ingestion/tests/cli_e2e/test_cli_db_base.py b/ingestion/tests/cli_e2e/test_cli_db_base.py index 0e5cd7f347b..da66308ce9b 100644 --- a/ingestion/tests/cli_e2e/test_cli_db_base.py +++ b/ingestion/tests/cli_e2e/test_cli_db_base.py @@ -32,11 +32,16 @@ from metadata.ingestion.api.sink import SinkStatus from metadata.ingestion.api.source import SourceStatus from metadata.ingestion.api.workflow import Workflow from metadata.ingestion.ometa.ometa_api import OpenMetadata +from metadata.utils.constants import UTF_8 PATH_TO_RESOURCES = os.path.dirname(os.path.realpath(__file__)) class E2EType(Enum): + """ + E2E Type Enum Class + """ + INGEST = "ingest" PROFILER = "profiler" INGEST_FILTER_SCHEMA = "ingest-filter-schema" @@ -45,7 +50,15 @@ class E2EType(Enum): class 
CliDBBase(TestCase): - class TestSuite(TestCase): + """ + CLI DB Base class + """ + + class TestSuite(TestCase): # pylint: disable=too-many-public-methods + """ + TestSuite class to define test structure + """ + catcher = StringIO() openmetadata: OpenMetadata test_file_path: str @@ -206,11 +219,11 @@ class CliDBBase(TestCase): def build_config_file( self, test_type: E2EType = E2EType.INGEST, extra_args: dict = None ) -> None: - with open(self.config_file_path) as f: - config_yaml = yaml.safe_load(f) + with open(self.config_file_path, encoding=UTF_8) as config_file: + config_yaml = yaml.safe_load(config_file) config_yaml = self.build_yaml(config_yaml, test_type, extra_args) - with open(self.test_file_path, "w") as w: - yaml.dump(config_yaml, w) + with open(self.test_file_path, "w", encoding=UTF_8) as test_file: + yaml.dump(config_yaml, test_file) def retrieve_statuses(self, result): source_status: SourceStatus = self.extract_source_status(result) @@ -253,7 +266,9 @@ class CliDBBase(TestCase): output_clean = re.findall( "Source Status: (.*?) Sink Status: .*", output_clean.strip() ) - return SourceStatus.parse_obj(eval(output_clean[0].strip())) + return SourceStatus.parse_obj( + eval(output_clean[0].strip()) # pylint: disable=eval-used + ) @staticmethod def extract_sink_status(output) -> SinkStatus: @@ -264,7 +279,7 @@ class CliDBBase(TestCase): output_clean = re.findall( ".* Sink Status: (.*?) 
Workflow finished.*", output_clean.strip() )[0].strip() - return SinkStatus.parse_obj(eval(output_clean)) + return SinkStatus.parse_obj(eval(output_clean)) # pylint: disable=eval-used @staticmethod @abstractmethod @@ -344,10 +359,17 @@ class CliDBBase(TestCase): @staticmethod def build_yaml(config_yaml: dict, test_type: E2EType, extra_args: dict): + """ + Build yaml as per E2EType + """ if test_type == E2EType.PROFILER: del config_yaml["source"]["sourceConfig"]["config"] config_yaml["source"]["sourceConfig"] = { - "config": {"type": "Profiler", "generateSampleData": True} + "config": { + "type": "Profiler", + "generateSampleData": True, + "profileSample": 1, + } } config_yaml["processor"] = {"type": "orm-profiler", "config": {}} if test_type == E2EType.INGEST_FILTER_SCHEMA: diff --git a/ingestion/tests/cli_e2e/test_cli_snowflake.py b/ingestion/tests/cli_e2e/test_cli_snowflake.py new file mode 100644 index 00000000000..c0de922687c --- /dev/null +++ b/ingestion/tests/cli_e2e/test_cli_snowflake.py @@ -0,0 +1,124 @@ +# Copyright 2022 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""
Test Snowflake connector with CLI
"""
from typing import List

from metadata.ingestion.api.sink import SinkStatus
from metadata.ingestion.api.source import SourceStatus

from .test_cli_db_base_common import CliCommonDB


class SnowflakeCliTest(CliCommonDB.TestSuite):
    """
    Snowflake CLI Tests

    Supplies the Snowflake-specific SQL statements, filter patterns and
    expected counts consumed by the shared ``CliCommonDB.TestSuite``.
    NOTE(review): the base suite is not visible from this file; how each
    hook is invoked should be confirmed against ``test_cli_db_base_common``.
    """

    # DDL for the scratch table created on top of the pre-provisioned schema.
    create_table_query: str = """
        CREATE TABLE E2E_DB.e2e_test.persons (
            person_id int,
            full_name varchar(255)
        )
    """

    # View over the scratch table; selects both of its columns.
    create_view_query: str = """
        CREATE VIEW E2E_DB.e2e_test.view_persons AS
            SELECT person_id, full_name
            FROM e2e_test.persons;
    """

    # Rows inserted into the scratch table; ``inserted_rows_count`` is
    # derived from the length of this list.
    insert_data_queries: List[str] = [
        "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1,'Peter Parker');",
        "INSERT INTO E2E_DB.e2e_test.persons (person_id, full_name) VALUES (1, 'Clark Kent');",
    ]

    drop_table_query: str = """
        DROP TABLE IF EXISTS E2E_DB.e2e_test.persons;
    """

    drop_view_query: str = """
        DROP VIEW IF EXISTS E2E_DB.e2e_test.view_persons;
    """

    @staticmethod
    def get_connector_name() -> str:
        """Return the connector identifier for this suite."""
        return "snowflake"

    def assert_for_vanilla_ingestion(
        self, source_status: SourceStatus, sink_status: SinkStatus
    ) -> None:
        """
        Validate the statuses reported by an unfiltered ingestion run.

        Dedicated assert methods are used instead of ``assertTrue(a == b)``
        so that a failing run reports the actual and expected values.
        """
        self.assertEqual(len(source_status.failures), 0)
        self.assertEqual(len(source_status.warnings), 0)
        # Exactly one entity is expected to be filtered out.
        # NOTE(review): presumably the excluded information_schema; confirm.
        self.assertEqual(len(source_status.filtered), 1)
        self.assertGreaterEqual(
            len(source_status.success), self.expected_tables()
        )
        self.assertEqual(len(sink_status.failures), 0)
        self.assertEqual(len(sink_status.warnings), 0)
        # Strictly greater: the sink must record more entries than tables.
        self.assertGreater(len(sink_status.records), self.expected_tables())

    def create_table_and_view(self) -> None:
        """Create the scratch table, insert its rows, then create the view."""
        # ``self.engine`` is provided outside this class.
        # The ``with`` block releases the connection on exit, so no explicit
        # ``close()`` is needed (assumes a SQLAlchemy-style connection).
        with self.engine.connect() as connection:
            connection.execute(self.create_table_query)
            for insert_query in self.insert_data_queries:
                connection.execute(insert_query)
            connection.execute(self.create_view_query)

    def delete_table_and_view(self) -> None:
        """Drop the view first, then the table it selects from."""
        with self.engine.connect() as connection:
            connection.execute(self.drop_view_query)
            connection.execute(self.drop_table_query)

    @staticmethod
    def expected_tables() -> int:
        """Return the table count used as a lower bound in the assertions."""
        return 7

    def inserted_rows_count(self) -> int:
        """Return the number of insert statements in ``insert_data_queries``."""
        return len(self.insert_data_queries)

    @staticmethod
    def fqn_created_table() -> str:
        """Return the fully qualified name expected for the scratch table."""
        return "local_snowflake.E2E_DB.E2E_TEST.PERSONS"

    @staticmethod
    def get_includes_schemas() -> List[str]:
        """Return the schema include patterns for the schema-filter run."""
        return ["e2e_test.*"]

    @staticmethod
    def get_includes_tables() -> List[str]:
        """Return the table include patterns for the table-filter run."""
        return ["^test.*"]

    @staticmethod
    def get_excludes_tables() -> List[str]:
        """Return the table exclude patterns for the table-filter run."""
        return [".*ons"]

    @staticmethod
    def expected_filtered_schema_includes() -> int:
        """Return the filtered count expected with the schema includes."""
        return 2

    @staticmethod
    def expected_filtered_schema_excludes() -> int:
        """Return the filtered count expected with the schema excludes."""
        return 1

    @staticmethod
    def expected_filtered_table_includes() -> int:
        """Return the filtered count expected with the table includes."""
        return 5

    @staticmethod
    def expected_filtered_table_excludes() -> int:
        """Return the filtered count expected with the table excludes."""
        return 4

    @staticmethod
    def expected_filtered_mix() -> int:
        """Return the filtered count expected with mixed filters applied."""
        return 6