mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-25 17:08:29 +00:00
test(ingest/athena): add connector integration tests (#12256)
This commit is contained in:
parent
a814cb389f
commit
7f64ffd2f7
1362
metadata-ingestion/tests/integration/athena/athena_mce_golden.json
Normal file
1362
metadata-ingestion/tests/integration/athena/athena_mce_golden.json
Normal file
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,163 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from freezegun import freeze_time
|
||||
from sqlalchemy import ARRAY, BIGINT, INTEGER, String
|
||||
from sqlalchemy_bigquery import STRUCT
|
||||
|
||||
from datahub.ingestion.run.pipeline import Pipeline
|
||||
from datahub.ingestion.source.aws.s3_util import make_s3_urn
|
||||
from datahub.ingestion.source.sql.athena import AthenaSource
|
||||
from datahub.utilities.sqlalchemy_type_converter import MapType
|
||||
from tests.test_helpers import ( # Ensure mce_helpers is available for validation.
|
||||
mce_helpers,
|
||||
)
|
||||
|
||||
FROZEN_TIME = "2022-12-15 10:00:00"
|
||||
|
||||
|
||||
@freeze_time(FROZEN_TIME)
|
||||
def test_athena_source_ingestion(pytestconfig, tmp_path):
|
||||
"""Test Athena source ingestion and generate MCP JSON file for validation."""
|
||||
output_file_name = "athena_mce_output.json"
|
||||
golden_file_name = "athena_mce_golden.json"
|
||||
test_resources_dir = pytestconfig.rootpath / "tests/integration/athena"
|
||||
|
||||
# Mock dependencies
|
||||
with patch.object(
|
||||
AthenaSource, "get_inspectors"
|
||||
) as mock_get_inspectors, patch.object(
|
||||
AthenaSource, "get_table_properties"
|
||||
) as mock_get_table_properties:
|
||||
# Mock engine and inspectors
|
||||
mock_inspector = MagicMock()
|
||||
mock_get_inspectors.return_value = [mock_inspector]
|
||||
mock_engine_instance = MagicMock()
|
||||
mock_engine_instance.url.database = ""
|
||||
mock_inspector.engine = mock_engine_instance
|
||||
|
||||
# Mock schema and table names
|
||||
mock_inspector.get_schema_names.return_value = ["test_schema"]
|
||||
mock_inspector.get_table_names.return_value = ["test_table"]
|
||||
mock_inspector.get_view_names.return_value = ["test_view_1", "test_view_2"]
|
||||
|
||||
# Mock view definitions
|
||||
def mock_get_view_definition(view_name, schema):
|
||||
if view_name == "test_view_1":
|
||||
return (
|
||||
'CREATE VIEW "test_schema".test_view_1 AS\n'
|
||||
"SELECT *\n"
|
||||
"FROM\n"
|
||||
' "test_schema"."test_table"'
|
||||
)
|
||||
elif view_name == "test_view_2":
|
||||
return (
|
||||
'CREATE VIEW "test_schema".test_view_2 AS\n'
|
||||
"SELECT employee_id, employee_name, skills\n"
|
||||
"FROM\n"
|
||||
' "test_schema"."test_view_1"'
|
||||
)
|
||||
return ""
|
||||
|
||||
mock_inspector.get_view_definition.side_effect = mock_get_view_definition
|
||||
|
||||
mock_inspector.get_columns.return_value = [
|
||||
{
|
||||
"name": "employee_id",
|
||||
"type": String(),
|
||||
"nullable": False,
|
||||
"default": None,
|
||||
"autoincrement": False,
|
||||
"comment": "Unique identifier for the employee",
|
||||
"dialect_options": {"awsathena_partition": None},
|
||||
},
|
||||
{
|
||||
"name": "annual_salary",
|
||||
"type": BIGINT(),
|
||||
"nullable": True,
|
||||
"default": None,
|
||||
"autoincrement": False,
|
||||
"comment": "Annual salary of the employee in USD",
|
||||
"dialect_options": {"awsathena_partition": None},
|
||||
},
|
||||
{
|
||||
"name": "employee_name",
|
||||
"type": String(),
|
||||
"nullable": False,
|
||||
"default": None,
|
||||
"autoincrement": False,
|
||||
"comment": "Full name of the employee",
|
||||
"dialect_options": {"awsathena_partition": None},
|
||||
},
|
||||
{
|
||||
"name": "job_history",
|
||||
"type": MapType(
|
||||
String(), STRUCT(year=INTEGER(), company=String(), role=String())
|
||||
),
|
||||
"nullable": True,
|
||||
"default": None,
|
||||
"autoincrement": False,
|
||||
"comment": "Job history map: year to details (company, role)",
|
||||
"dialect_options": {"awsathena_partition": None},
|
||||
},
|
||||
{
|
||||
"name": "department_budgets",
|
||||
"type": MapType(String(), BIGINT()),
|
||||
"nullable": True,
|
||||
"default": None,
|
||||
"autoincrement": False,
|
||||
"comment": "Map of department names to their respective budgets",
|
||||
"dialect_options": {"awsathena_partition": None},
|
||||
},
|
||||
{
|
||||
"name": "skills",
|
||||
"type": ARRAY(String()),
|
||||
"nullable": True,
|
||||
"default": None,
|
||||
"autoincrement": False,
|
||||
"comment": "List of skills possessed by the employee",
|
||||
"dialect_options": {"awsathena_partition": None},
|
||||
},
|
||||
]
|
||||
# Mock table properties
|
||||
mock_get_table_properties.return_value = (
|
||||
"Test table description",
|
||||
{"key": "value", "table_type": "EXTERNAL_TABLE"},
|
||||
make_s3_urn("s3://test-bucket/test_table", "PROD"),
|
||||
)
|
||||
|
||||
# Define the pipeline configuration
|
||||
config_dict = {
|
||||
"run_id": "athena-test",
|
||||
"source": {
|
||||
"type": "athena",
|
||||
"config": {
|
||||
"aws_region": "us-east-1",
|
||||
"work_group": "primary",
|
||||
"query_result_location": "s3://athena-query-results/",
|
||||
"catalog_name": "awsdatacatalog",
|
||||
"include_views": True,
|
||||
"include_tables": True,
|
||||
"profiling": {
|
||||
"enabled": False,
|
||||
},
|
||||
},
|
||||
},
|
||||
"sink": {
|
||||
"type": "file",
|
||||
"config": {
|
||||
"filename": f"{tmp_path}/{output_file_name}",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# Create and run the pipeline
|
||||
pipeline = Pipeline.create(config_dict)
|
||||
pipeline.run()
|
||||
pipeline.raise_from_status()
|
||||
|
||||
# Validate the output with the golden file
|
||||
mce_helpers.check_golden_file(
|
||||
pytestconfig=pytestconfig,
|
||||
output_path=f"{tmp_path}/{output_file_name}",
|
||||
golden_path=f"{test_resources_dir}/{golden_file_name}",
|
||||
)
|
||||
Loading…
x
Reference in New Issue
Block a user