MINOR: Fix athena e2e tests (#15486)

* Comment side effects

* Update assert to match clauses better

* Improve input

* Improve input

* Update assert to match clauses better

* Fix Athena E2E Values

* Uncomment needed steps

* Fix linters
This commit is contained in:
IceS2 2024-03-08 09:31:06 +01:00 committed by GitHub
parent c7afe29441
commit 7805a0b609
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 69 additions and 76 deletions

View File

@@ -14,10 +14,11 @@ on:
schedule:
- cron: '0 0 * * *'
workflow_dispatch:
input:
inputs:
e2e-tests:
description: "E2E Tests to run"
type: string
required: True
default: '["bigquery", "dbt_redshift", "metabase", "mssql", "mysql", "redash", "snowflake", "tableau", "powerbi", "vertica", "python", "redshift", "quicksight", "datalake_s3", "postgres", "oracle", "athena", "bigquery_multiple_project"]'
permissions:
id-token: write

View File

@@ -51,53 +51,53 @@ class CliCommonDB:
def assert_for_vanilla_ingestion(
self, source_status: Status, sink_status: Status
) -> None:
self.assertTrue(len(source_status.failures) == 0)
self.assertTrue(len(source_status.warnings) == 0)
self.assertTrue(len(source_status.filtered) == 0)
self.assertTrue(
(len(source_status.records) + len(source_status.updated_records))
>= self.expected_tables()
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(len(source_status.warnings), 0)
self.assertEqual(len(source_status.filtered), 0)
self.assertGreaterEqual(
(len(source_status.records) + len(source_status.updated_records)),
self.expected_tables(),
)
self.assertTrue(len(sink_status.failures) == 0)
self.assertTrue(len(sink_status.warnings) == 0)
self.assertTrue(
(len(sink_status.records) + len(sink_status.updated_records))
> self.expected_tables()
self.assertEqual(len(sink_status.failures), 0)
self.assertEqual(len(sink_status.warnings), 0)
self.assertGreater(
(len(sink_status.records) + len(sink_status.updated_records)),
self.expected_tables(),
)
def assert_for_table_with_profiler(
self, source_status: Status, sink_status: Status
):
self.assertTrue(len(source_status.failures) == 0)
self.assertTrue(
(len(source_status.records) + len(source_status.updated_records))
>= self.expected_profiled_tables()
self.assertEqual(len(source_status.failures), 0)
self.assertGreaterEqual(
(len(source_status.records) + len(source_status.updated_records)),
self.expected_profiled_tables(),
)
self.assertTrue(len(sink_status.failures) == 0)
self.assertTrue(
(len(sink_status.records) + len(sink_status.updated_records))
>= self.expected_profiled_tables()
self.assertEqual(len(sink_status.failures), 0)
self.assertGreaterEqual(
(len(sink_status.records) + len(sink_status.updated_records)),
self.expected_profiled_tables(),
)
sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
lineage = self.retrieve_lineage(self.fqn_created_table())
self.assertTrue(len(sample_data.rows) == self.inserted_rows_count())
self.assertEqual(len(sample_data.rows), self.inserted_rows_count())
if self.view_column_lineage_count() is not None:
self.assertTrue(
self.assertEqual(
len(
lineage["downstreamEdges"][0]["lineageDetails"][
"columnsLineage"
]
)
== self.view_column_lineage_count()
),
self.view_column_lineage_count(),
)
def assert_for_table_with_profiler_time_partition(
self, source_status: Status, sink_status: Status
):
self.assertTrue(len(source_status.failures) == 0)
self.assertTrue(len(sink_status.failures) == 0)
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(len(sink_status.failures), 0)
sample_data = self.retrieve_sample_data(self.fqn_created_table()).sampleData
self.assertTrue(len(sample_data.rows) <= self.inserted_rows_count())
self.assertLessEqual(len(sample_data.rows), self.inserted_rows_count())
profile = self.retrieve_profile(self.fqn_created_table())
expected_profiler_time_partition_results = (
self.get_profiler_time_partition_results()
@@ -105,11 +105,9 @@ class CliCommonDB:
if expected_profiler_time_partition_results:
table_profile = profile.profile.dict()
for key in expected_profiler_time_partition_results["table_profile"]:
self.assertTrue(
table_profile[key]
== expected_profiler_time_partition_results["table_profile"][
key
]
self.assertEqual(
table_profile[key],
expected_profiler_time_partition_results["table_profile"][key],
)
for column in profile.columns:
@@ -127,16 +125,18 @@ class CliCommonDB:
column_profile = column.profile.dict()
for key in expected_column_profile: # type: ignore
if key == "nonParametricSkew":
self.assertTrue(
column_profile[key].__round__(10)
== expected_column_profile[key].__round__(10)
self.assertEqual(
column_profile[key].__round__(10),
expected_column_profile[key].__round__(10),
)
continue
self.assertTrue(
column_profile[key] == expected_column_profile[key]
self.assertEqual(
column_profile[key], expected_column_profile[key]
)
if sample_data:
self.assertTrue(len(json.loads(sample_data.json()).get("rows")) > 0)
self.assertGreater(
len(json.loads(sample_data.json()).get("rows")), 0
)
def assert_for_delete_table_is_marked_as_deleted(
self, source_status: Status, sink_status: Status
@@ -146,46 +146,38 @@ class CliCommonDB:
def assert_filtered_schemas_includes(
self, source_status: Status, sink_status: Status
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(
len(source_status.filtered)
== self.expected_filtered_schema_includes()
)
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(
len(source_status.filtered), self.expected_filtered_schema_includes()
)
def assert_filtered_schemas_excludes(
self, source_status: Status, sink_status: Status
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(
len(source_status.filtered)
== self.expected_filtered_schema_excludes()
)
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(
len(source_status.filtered), self.expected_filtered_schema_excludes()
)
def assert_filtered_tables_includes(
self, source_status: Status, sink_status: Status
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(len(source_status.filtered) == self.expected_filtered_table_includes())
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(
len(source_status.filtered), self.expected_filtered_table_includes()
)
def assert_filtered_tables_excludes(
self, source_status: Status, sink_status: Status
):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(len(source_status.filtered) == self.expected_filtered_table_excludes())
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(
len(source_status.filtered), self.expected_filtered_table_excludes()
)
def assert_filtered_mix(self, source_status: Status, sink_status: Status):
self.assertTrue((len(source_status.failures) == 0))
self.assertTrue(
(len(source_status.filtered) == self.expected_filtered_mix())
)
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(len(source_status.filtered), self.expected_filtered_mix())
@staticmethod
@abstractmethod

View File

@@ -88,7 +88,7 @@ class AthenaCliTest(CliCommonDB.TestSuite):
@staticmethod
def expected_filtered_schema_includes() -> int:
return 5
return 6
@staticmethod
def expected_filtered_schema_excludes() -> int:
@@ -96,15 +96,15 @@ class AthenaCliTest(CliCommonDB.TestSuite):
@staticmethod
def expected_filtered_table_includes() -> int:
return 7
return 8
@staticmethod
def expected_filtered_table_excludes() -> int:
return 7
return 8
@staticmethod
def expected_filtered_mix() -> int:
return 7
return 8
def retrieve_lineage(self, entity_fqn: str) -> dict:
pass
@@ -133,16 +133,16 @@ class AthenaCliTest(CliCommonDB.TestSuite):
def assert_for_vanilla_ingestion(
self, source_status: Status, sink_status: Status
) -> None:
self.assertTrue(len(source_status.failures) == 0)
self.assertTrue(len(source_status.warnings) == 0)
self.assertTrue(len(source_status.filtered) == 5)
self.assertTrue(
(len(source_status.records) + len(source_status.updated_records))
>= self.expected_tables()
self.assertEqual(len(source_status.failures), 0)
self.assertEqual(len(source_status.warnings), 0)
self.assertEqual(len(source_status.filtered), 6)
self.assertGreaterEqual(
len(source_status.records) + len(source_status.updated_records),
self.expected_tables(),
)
self.assertTrue(len(sink_status.failures) == 0)
self.assertTrue(len(sink_status.warnings) == 0)
self.assertTrue(
(len(sink_status.records) + len(sink_status.updated_records))
>= self.expected_tables()
self.assertEqual(len(sink_status.failures), 0)
self.assertEqual(len(sink_status.warnings), 0)
self.assertGreaterEqual(
len(sink_status.records) + len(sink_status.updated_records),
self.expected_tables(),
)