diff --git a/.github/workflows/py-cli-e2e-tests.yml b/.github/workflows/py-cli-e2e-tests.yml index 664f6549707..f8dd27872ef 100644 --- a/.github/workflows/py-cli-e2e-tests.yml +++ b/.github/workflows/py-cli-e2e-tests.yml @@ -21,7 +21,7 @@ jobs: strategy: fail-fast: false matrix: - e2e-test: ['bigquery', 'dbt_redshift', 'metabase', 'mssql', 'mysql', 'redash', 'snowflake', 'tableau', 'powerbi', 'vertica', 'python'] + e2e-test: ['bigquery', 'dbt_redshift', 'metabase', 'mssql', 'mysql', 'redash', 'snowflake', 'tableau', 'powerbi', 'vertica', 'python', 'redshift'] environment: test steps: @@ -97,6 +97,7 @@ jobs: E2E_REDSHIFT_USERNAME: ${{ secrets.E2E_REDSHIFT_USERNAME }} E2E_REDSHIFT_PASSWORD: ${{ secrets.E2E_REDSHIFT_PASSWORD }} E2E_REDSHIFT_DATABASE: ${{ secrets.E2E_REDSHIFT_DATABASE }} + E2E_REDSHIFT_DB: ${{ secrets.E2E_REDSHIFT_DB }} E2E_MSSQL_USERNAME: ${{ secrets.E2E_MSSQL_USERNAME }} E2E_MSSQL_PASSWORD: ${{ secrets.E2E_MSSQL_PASSWORD }} E2E_MSSQL_HOST: ${{ secrets.E2E_MSSQL_HOST }} diff --git a/ingestion/tests/cli_e2e/base/config_builders/builders.py b/ingestion/tests/cli_e2e/base/config_builders/builders.py index cb03ec93c76..7993e133214 100644 --- a/ingestion/tests/cli_e2e/base/config_builders/builders.py +++ b/ingestion/tests/cli_e2e/base/config_builders/builders.py @@ -60,6 +60,12 @@ class ProfilerConfigBuilder(BaseBuilder): "profileSample": self.profilerSample, } } + + if self.config_args.get("includes"): + self.config["source"]["sourceConfig"]["config"][ + "schemaFilterPattern" + ] = self.config_args + self.config["processor"] = {"type": "orm-profiler", "config": {}} return self.config diff --git a/ingestion/tests/cli_e2e/base/test_cli_db.py b/ingestion/tests/cli_e2e/base/test_cli_db.py index c394fed4ae8..6c57d211b1d 100644 --- a/ingestion/tests/cli_e2e/base/test_cli_db.py +++ b/ingestion/tests/cli_e2e/base/test_cli_db.py @@ -62,7 +62,9 @@ class CliDBBase(TestCase): self.create_table_and_view() self.build_config_file() self.run_command() - self.build_config_file(E2EType.PROFILER) + self.build_config_file( + E2EType.PROFILER, {"includes": self.get_includes_schemas()} + ) result = self.run_command("profile") sink_status, source_status = self.retrieve_statuses(result) self.assert_for_table_with_profiler(source_status, sink_status) @@ -189,10 +191,10 @@ class CliDBBase(TestCase): def test_profiler_with_time_partition(self) -> None: """11. Test time partitioning for the profiler""" time_partition = self.get_profiler_time_partition() + if not time_partition: + pytest.skip("Profiler time partition not configured. Skipping test.") if time_partition: - processor_config = self.get_profiler_processor_config( - self.get_profiler_time_partition() - ) + processor_config = self.get_profiler_processor_config(time_partition) self.build_config_file( E2EType.PROFILER_PROCESSOR, {"processor": processor_config}, diff --git a/ingestion/tests/cli_e2e/common/test_cli_db.py b/ingestion/tests/cli_e2e/common/test_cli_db.py index 5c6dff0e23a..e50c03694bc 100644 --- a/ingestion/tests/cli_e2e/common/test_cli_db.py +++ b/ingestion/tests/cli_e2e/common/test_cli_db.py @@ -14,6 +14,7 @@ Test database connectors which extend from `CommonDbSourceService` with CLI """ from abc import ABC, abstractmethod from pathlib import Path +from typing import Optional from sqlalchemy.engine import Engine @@ -60,9 +61,9 @@ class CliCommonDB: self, source_status: SourceStatus, sink_status: SinkStatus ): self.assertTrue(len(source_status.failures) == 0) - self.assertTrue(len(source_status.records) > self.expected_tables()) + self.assertTrue(len(source_status.records) >= self.expected_tables()) self.assertTrue(len(sink_status.failures) == 0) - self.assertTrue(len(sink_status.records) > self.expected_tables()) + self.assertTrue(len(sink_status.records) >= self.expected_tables()) sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData lineage = self.retrieve_lineage(self.fqn_created_table()) self.assertTrue(len(sample_data.rows) == self.inserted_rows_count()) @@ -77,6 +78,7 @@ class CliCommonDB: self.assertTrue(len(source_status.failures) == 0) self.assertTrue(len(sink_status.failures) == 0) sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData + self.assertTrue(len(sample_data.rows) < self.inserted_rows_count()) profile = self.retrieve_profile(self.fqn_created_table()) expected_profiler_time_partition_results = ( self.get_profiler_time_partition_results() @@ -112,7 +114,7 @@ class CliCommonDB: def assert_for_delete_table_is_marked_as_deleted( self, source_status: SourceStatus, sink_status: SinkStatus ): - self.assertEqual(self.retrieve_table(self.fqn_created_table()), None) + self.assertEqual(self.retrieve_table(self.fqn_deleted_table()), None) def assert_filtered_schemas_includes( self, source_status: SourceStatus, sink_status: SinkStatus @@ -178,6 +180,15 @@ class CliCommonDB: def fqn_created_table() -> str: raise NotImplementedError() + @staticmethod + def _fqn_deleted_table() -> Optional[str]: + return None + + def fqn_deleted_table(self) -> str: + if self._fqn_deleted_table() is None: + return self.fqn_created_table() + return self._fqn_deleted_table() # type: ignore + @staticmethod @abstractmethod def expected_filtered_schema_includes() -> int: diff --git a/ingestion/tests/cli_e2e/database/redshift/redshift.yaml b/ingestion/tests/cli_e2e/database/redshift/redshift.yaml new file mode 100644 index 00000000000..cd1f7977169 --- /dev/null +++ b/ingestion/tests/cli_e2e/database/redshift/redshift.yaml @@ -0,0 +1,26 @@ +source: + type: redshift + serviceName: e2e_redshift + serviceConnection: + config: + hostPort: $E2E_REDSHIFT_HOST_PORT + username: $E2E_REDSHIFT_USERNAME + password: $E2E_REDSHIFT_PASSWORD + database: $E2E_REDSHIFT_DB + type: Redshift + sourceConfig: + config: + markDeletedTables: true + includeTables: true + includeViews: true + type: DatabaseMetadata +sink: + type: metadata-rest + config: {} +workflowConfig: + loggerLevel: DEBUG + openMetadataServerConfig: + hostPort: http://localhost:8585/api + authProvider: openmetadata + securityConfig: + "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg" diff --git a/ingestion/tests/cli_e2e/test_cli_mssql.py b/ingestion/tests/cli_e2e/test_cli_mssql.py index 62588c84f85..f3efaa47857 100644 --- a/ingestion/tests/cli_e2e/test_cli_mssql.py +++ b/ingestion/tests/cli_e2e/test_cli_mssql.py @@ -15,9 +15,6 @@ MSSQL E2E tests from typing import List -import pytest -import yaml - from metadata.generated.schema.entity.data.table import Histogram from .common.test_cli_db import CliCommonDB diff --git a/ingestion/tests/cli_e2e/test_cli_redshift.py b/ingestion/tests/cli_e2e/test_cli_redshift.py new file mode 100644 index 00000000000..d4cdadc308f --- /dev/null +++ b/ingestion/tests/cli_e2e/test_cli_redshift.py @@ -0,0 +1,187 @@ +# Copyright 2022 Collate +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Redshift E2E tests +""" + +from typing import List + +from metadata.generated.schema.entity.data.table import Histogram + +from .common.test_cli_db import CliCommonDB +from .common_e2e_sqa_mixins import SQACommonMethods + + +class RedshiftCliTest(CliCommonDB.TestSuite, SQACommonMethods): + create_table_query: str = """ + CREATE TABLE e2e_cli_tests.dbt_jaffle.persons ( + person_id int, + full_name varchar(255), + birthdate date + ) + """ + + create_view_query: str = """ + CREATE VIEW e2e_cli_tests.dbt_jaffle.view_persons AS + SELECT * + FROM e2e_cli_tests.dbt_jaffle.persons; + """ + + insert_data_queries: List[str] = [ + """ + INSERT INTO e2e_cli_tests.dbt_jaffle.persons (person_id, full_name, birthdate) VALUES + (1,'Peter Parker', '2004-08-10'), + (2,'Bruce Banner', '1988-12-18'), + (3,'Steve Rogers', '1988-07-04'), + (4,'Natasha Romanoff', '1997-12-03'), + (5,'Wanda Maximoff', '1998-02-10'), + (6,'Diana Prince', '1976-03-17'); + """ + ] + + drop_table_query: str = """ + DROP TABLE IF EXISTS e2e_cli_tests.dbt_jaffle.persons; + """ + + drop_view_query: str = """ + DROP VIEW IF EXISTS e2e_cli_tests.dbt_jaffle.view_persons; + """ + + def setUp(self) -> None: + self.create_table_and_view() + + def tearDown(self) -> None: + self.delete_table_and_view() + + def create_table_and_view(self) -> None: + SQACommonMethods.create_table_and_view(self) + + def delete_table_and_view(self) -> None: + SQACommonMethods.delete_table_and_view(self) + + @staticmethod + def get_connector_name() -> str: + return "redshift" + + @staticmethod + def expected_tables() -> int: + return 5 + + def inserted_rows_count(self) -> int: + return 100 + + def view_column_lineage_count(self) -> int: + """view was created from `CREATE VIEW xyz AS (SELECT * FROM abc)` + which does not propagate column lineage + """ + return 0 + + @staticmethod + def fqn_created_table() -> str: + return "e2e_redshift.e2e_cli_tests.dbt_jaffle.listing" + + @staticmethod + def _fqn_deleted_table() -> str: + return "e2e_redshift.e2e_cli_tests.dbt_jaffle.persons" + + @staticmethod + def get_profiler_time_partition() -> dict: + return { + "fullyQualifiedName": "e2e_redshift.e2e_cli_tests.dbt_jaffle.listing", + "partitionConfig": { + "enablePartitioning": True, + "partitionColumnName": "date", + "partitionIntervalType": "TIME-UNIT", + "partitionInterval": 5, + "partitionIntervalUnit": "YEAR", + }, + } + + @staticmethod + def get_includes_schemas() -> List[str]: + return ["dbt_jaffle"] + + @staticmethod + def get_includes_tables() -> List[str]: + return ["customer", "listing"] + + @staticmethod + def get_excludes_tables() -> List[str]: + return ["foo"] + + @staticmethod + def expected_filtered_schema_includes() -> int: + return 3 + + @staticmethod + def expected_filtered_schema_excludes() -> int: + return 1 + + @staticmethod + def expected_filtered_table_includes() -> int: + return 45 + + @staticmethod + def expected_filtered_table_excludes() -> int: + return 2 + + @staticmethod + def expected_filtered_mix() -> int: + return 8 + + @staticmethod + def get_profiler_time_partition_results() -> dict: + return { + "table_profile": { + "columnCount": 9.0, + "rowCount": 22.0, + }, + "column_profile": [ + { + "totalprice": { + "distinctCount": 22.0, + "distinctProportion": 1.0, + "duplicateCount": None, + "firstQuartile": -493.42, + "histogram": Histogram( + boundaries=[ + "-999.63 to -369.21", + "-369.21 to 261.22", + "261.22 and up", + ], + frequencies=[9, 8, 5], + ), + "interQuartileRange": 883.236, + "max": 856.41, + "maxLength": None, + "mean": -160.16, + "median": -288.81, + "min": -999.63, + "minLength": None, + "missingCount": None, + "missingPercentage": None, + "nonParametricSkew": 0.24351799263849705, + "nullCount": 0.0, + "nullProportion": 0.0, + "stddev": 528.297718809555, + "sum": -3518.0, + "thirdQuartile": 389.816, + "uniqueCount": 22.0, + "uniqueProportion": 1.0, + "validCount": None, + "valuesCount": 22.0, + "valuesPercentage": None, + "variance": None, + } + } + ], + }