tests: Added E2E tests for redshift (#11169)

Teddy 2023-04-21 08:55:54 +02:00 committed by GitHub
parent 9db2a0198a
commit 6f5d88ff63
7 changed files with 241 additions and 11 deletions

View File

@@ -21,7 +21,7 @@ jobs:
    strategy:
      fail-fast: false
      matrix:
        e2e-test: ['bigquery', 'dbt_redshift', 'metabase', 'mssql', 'mysql', 'redash', 'snowflake', 'tableau', 'powerbi', 'vertica', 'python']
        e2e-test: ['bigquery', 'dbt_redshift', 'metabase', 'mssql', 'mysql', 'redash', 'snowflake', 'tableau', 'powerbi', 'vertica', 'python', 'redshift']
    environment: test
    steps:
@@ -97,6 +97,7 @@ jobs:
          E2E_REDSHIFT_USERNAME: ${{ secrets.E2E_REDSHIFT_USERNAME }}
          E2E_REDSHIFT_PASSWORD: ${{ secrets.E2E_REDSHIFT_PASSWORD }}
          E2E_REDSHIFT_DATABASE: ${{ secrets.E2E_REDSHIFT_DATABASE }}
          E2E_REDSHIFT_DB: ${{ secrets.E2E_REDSHIFT_DB }}
          E2E_MSSQL_USERNAME: ${{ secrets.E2E_MSSQL_USERNAME }}
          E2E_MSSQL_PASSWORD: ${{ secrets.E2E_MSSQL_PASSWORD }}
          E2E_MSSQL_HOST: ${{ secrets.E2E_MSSQL_HOST }}

View File

@@ -60,6 +60,12 @@ class ProfilerConfigBuilder(BaseBuilder):
                "profileSample": self.profilerSample,
            }
        }
        if self.config_args.get("includes"):
            self.config["source"]["sourceConfig"]["config"][
                "schemaFilterPattern"
            ] = self.config_args
        self.config["processor"] = {"type": "orm-profiler", "config": {}}
        return self.config
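
For orientation (not part of the diff): a rough sketch of the dict this builder now returns when a test passes a schema filter. Values not visible in the hunk above are placeholders.

```python
# Illustrative only - mirrors the builder logic shown above.
config_args = {"includes": ["dbt_jaffle"]}  # what the test passes via get_includes_schemas()

profiler_config = {
    "source": {
        "sourceConfig": {
            "config": {
                "profileSample": 100,  # placeholder for self.profilerSample
                # set only when config_args contains "includes"
                "schemaFilterPattern": config_args,
            }
        }
    },
    "processor": {"type": "orm-profiler", "config": {}},
}
```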

View File

@@ -62,7 +62,9 @@ class CliDBBase(TestCase):
        self.create_table_and_view()
        self.build_config_file()
        self.run_command()
        self.build_config_file(E2EType.PROFILER)
        self.build_config_file(
            E2EType.PROFILER, {"includes": self.get_includes_schemas()}
        )
        result = self.run_command("profile")
        sink_status, source_status = self.retrieve_statuses(result)
        self.assert_for_table_with_profiler(source_status, sink_status)
@@ -189,10 +191,10 @@ class CliDBBase(TestCase):
    def test_profiler_with_time_partition(self) -> None:
        """11. Test time partitioning for the profiler"""
        time_partition = self.get_profiler_time_partition()
        if not time_partition:
            pytest.skip("Profiler time partition not configured. Skipping test.")
        if time_partition:
            processor_config = self.get_profiler_processor_config(
                self.get_profiler_time_partition()
            )
        processor_config = self.get_profiler_processor_config(time_partition)
        self.build_config_file(
            E2EType.PROFILER_PROCESSOR,
            {"processor": processor_config},

View File

@@ -14,6 +14,7 @@ Test database connectors which extend from `CommonDbSourceService` with CLI
"""
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Optional

from sqlalchemy.engine import Engine
@@ -60,9 +61,9 @@ class CliCommonDB:
        self, source_status: SourceStatus, sink_status: SinkStatus
    ):
        self.assertTrue(len(source_status.failures) == 0)
        self.assertTrue(len(source_status.records) > self.expected_tables())
        self.assertTrue(len(source_status.records) >= self.expected_tables())
        self.assertTrue(len(sink_status.failures) == 0)
        self.assertTrue(len(sink_status.records) > self.expected_tables())
        self.assertTrue(len(sink_status.records) >= self.expected_tables())
        sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
        lineage = self.retrieve_lineage(self.fqn_created_table())
        self.assertTrue(len(sample_data.rows) == self.inserted_rows_count())
@@ -77,6 +78,7 @@ class CliCommonDB:
        self.assertTrue(len(source_status.failures) == 0)
        self.assertTrue(len(sink_status.failures) == 0)
        sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
        self.assertTrue(len(sample_data.rows) < self.inserted_rows_count())
        profile = self.retrieve_profile(self.fqn_created_table())
        expected_profiler_time_partition_results = (
            self.get_profiler_time_partition_results()
@@ -112,7 +114,7 @@ class CliCommonDB:
    def assert_for_delete_table_is_marked_as_deleted(
        self, source_status: SourceStatus, sink_status: SinkStatus
    ):
        self.assertEqual(self.retrieve_table(self.fqn_created_table()), None)
        self.assertEqual(self.retrieve_table(self.fqn_deleted_table()), None)

    def assert_filtered_schemas_includes(
        self, source_status: SourceStatus, sink_status: SinkStatus
@@ -178,6 +180,15 @@ class CliCommonDB:
    def fqn_created_table() -> str:
        raise NotImplementedError()

    @staticmethod
    def _fqn_deleted_table() -> Optional[str]:
        return None

    def fqn_deleted_table(self) -> str:
        if self._fqn_deleted_table() is None:
            return self.fqn_created_table()
        return self._fqn_deleted_table()  # type: ignore

    @staticmethod
    @abstractmethod
    def expected_filtered_schema_includes() -> int:
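
The new `_fqn_deleted_table()` hook lets a connector point the "marked as deleted" assertion at a different table than `fqn_created_table()`; connectors that do not override it keep the previous behaviour. A minimal sketch with a hypothetical connector class (the class name and FQNs below are illustrative):

```python
class HypotheticalCliTest(CliCommonDB.TestSuite):
    @staticmethod
    def fqn_created_table() -> str:
        # table used by the sample-data and lineage assertions
        return "e2e_service.e2e_db.my_schema.listing"

    @staticmethod
    def _fqn_deleted_table() -> str:
        # table the delete test drops and expects to be marked as deleted
        return "e2e_service.e2e_db.my_schema.persons"
```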

View File

@@ -0,0 +1,26 @@
source:
  type: redshift
  serviceName: e2e_redshift
  serviceConnection:
    config:
      hostPort: $E2E_REDSHIFT_HOST_PORT
      username: $E2E_REDSHIFT_USERNAME
      password: $E2E_REDSHIFT_PASSWORD
      database: $E2E_REDSHIFT_DB
      type: Redshift
  sourceConfig:
    config:
      markDeletedTables: true
      includeTables: true
      includeViews: true
      type: DatabaseMetadata
sink:
  type: metadata-rest
  config: {}
workflowConfig:
  loggerLevel: DEBUG
  openMetadataServerConfig:
    hostPort: http://localhost:8585/api
    authProvider: openmetadata
    securityConfig:
      "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"

View File

@@ -15,9 +15,6 @@ MSSQL E2E tests
from typing import List
import pytest
import yaml
from metadata.generated.schema.entity.data.table import Histogram
from .common.test_cli_db import CliCommonDB

View File

@@ -0,0 +1,187 @@
#  Copyright 2022 Collate
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#  http://www.apache.org/licenses/LICENSE-2.0
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
"""
Redshift E2E tests
"""
from typing import List

from metadata.generated.schema.entity.data.table import Histogram

from .common.test_cli_db import CliCommonDB
from .common_e2e_sqa_mixins import SQACommonMethods


class RedshiftCliTest(CliCommonDB.TestSuite, SQACommonMethods):
    create_table_query: str = """
        CREATE TABLE e2e_cli_tests.dbt_jaffle.persons (
            person_id int,
            full_name varchar(255),
            birthdate date
        )
    """

    create_view_query: str = """
        CREATE VIEW e2e_cli_tests.dbt_jaffle.view_persons AS
            SELECT *
            FROM e2e_cli_tests.dbt_jaffle.persons;
    """

    insert_data_queries: List[str] = [
        """
        INSERT INTO e2e_cli_tests.dbt_jaffle.persons (person_id, full_name, birthdate) VALUES
            (1,'Peter Parker', '2004-08-10'),
            (2,'Bruce Banner', '1988-12-18'),
            (3,'Steve Rogers', '1988-07-04'),
            (4,'Natasha Romanoff', '1997-12-03'),
            (5,'Wanda Maximoff', '1998-02-10'),
            (6,'Diana Prince', '1976-03-17');
        """
    ]

    drop_table_query: str = """
        DROP TABLE IF EXISTS e2e_cli_tests.dbt_jaffle.persons;
    """

    drop_view_query: str = """
        DROP VIEW IF EXISTS e2e_cli_tests.dbt_jaffle.view_persons;
    """

    def setUp(self) -> None:
        self.create_table_and_view()

    def tearDown(self) -> None:
        self.delete_table_and_view()

    def create_table_and_view(self) -> None:
        SQACommonMethods.create_table_and_view(self)

    def delete_table_and_view(self) -> None:
        SQACommonMethods.delete_table_and_view(self)

    @staticmethod
    def get_connector_name() -> str:
        return "redshift"

    @staticmethod
    def expected_tables() -> int:
        return 5

    def inserted_rows_count(self) -> int:
        return 100

    def view_column_lineage_count(self) -> int:
        """view was created from `CREATE VIEW xyz AS (SELECT * FROM abc)`
        which does not propagate column lineage
        """
        return 0

    @staticmethod
    def fqn_created_table() -> str:
        return "e2e_redshift.e2e_cli_tests.dbt_jaffle.listing"

    @staticmethod
    def _fqn_deleted_table() -> str:
        return "e2e_redshift.e2e_cli_tests.dbt_jaffle.persons"

    @staticmethod
    def get_profiler_time_partition() -> dict:
        return {
            "fullyQualifiedName": "e2e_redshift.e2e_cli_tests.dbt_jaffle.listing",
            "partitionConfig": {
                "enablePartitioning": True,
                "partitionColumnName": "date",
                "partitionIntervalType": "TIME-UNIT",
                "partitionInterval": 5,
                "partitionIntervalUnit": "YEAR",
            },
        }

    @staticmethod
    def get_includes_schemas() -> List[str]:
        return ["dbt_jaffle"]

    @staticmethod
    def get_includes_tables() -> List[str]:
        return ["customer", "listing"]

    @staticmethod
    def get_excludes_tables() -> List[str]:
        return ["foo"]

    @staticmethod
    def expected_filtered_schema_includes() -> int:
        return 3

    @staticmethod
    def expected_filtered_schema_excludes() -> int:
        return 1

    @staticmethod
    def expected_filtered_table_includes() -> int:
        return 45

    @staticmethod
    def expected_filtered_table_excludes() -> int:
        return 2

    @staticmethod
    def expected_filtered_mix() -> int:
        return 8

    @staticmethod
    def get_profiler_time_partition_results() -> dict:
        return {
            "table_profile": {
                "columnCount": 9.0,
                "rowCount": 22.0,
            },
            "column_profile": [
                {
                    "totalprice": {
                        "distinctCount": 22.0,
                        "distinctProportion": 1.0,
                        "duplicateCount": None,
                        "firstQuartile": -493.42,
                        "histogram": Histogram(
                            boundaries=[
                                "-999.63 to -369.21",
                                "-369.21 to 261.22",
                                "261.22 and up",
                            ],
                            frequencies=[9, 8, 5],
                        ),
                        "interQuartileRange": 883.236,
                        "max": 856.41,
                        "maxLength": None,
                        "mean": -160.16,
                        "median": -288.81,
                        "min": -999.63,
                        "minLength": None,
                        "missingCount": None,
                        "missingPercentage": None,
                        "nonParametricSkew": 0.24351799263849705,
                        "nullCount": 0.0,
                        "nullProportion": 0.0,
                        "stddev": 528.297718809555,
                        "sum": -3518.0,
                        "thirdQuartile": 389.816,
                        "uniqueCount": 22.0,
                        "uniqueProportion": 1.0,
                        "validCount": None,
                        "valuesCount": 22.0,
                        "valuesPercentage": None,
                        "variance": None,
                    }
                }
            ],
        }
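
As a quick sanity check on the expected column profile above, two of the derived fields are consistent with the listed statistics:

```python
# non-parametric skew = (mean - median) / stddev
mean, median, stddev = -160.16, -288.81, 528.297718809555
print((mean - median) / stddev)  # ~0.2435, matching the expected nonParametricSkew

# interquartile range = thirdQuartile - firstQuartile
print(389.816 - (-493.42))  # ~883.236, matching the expected interQuartileRange
```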