186 lines
6.2 KiB
Python

import logging
import subprocess
import pytest
import yaml
from freezegun import freeze_time
from datahub.configuration.common import ConfigurationError
from datahub.ingestion.api.source import SourceCapability
from datahub.ingestion.run.pipeline import Pipeline
from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig
from datahub.testing import mce_helpers
from tests.integration.kafka import oauth # type: ignore
from tests.test_helpers import test_connection_helpers
from tests.test_helpers.click_helpers import run_datahub_cmd
from tests.test_helpers.docker_helpers import wait_for_port
# Fixed wall-clock instant passed to @freeze_time on the tests below, so any
# time-dependent pipeline output is reproducible (test_kafka_ingest compares
# its output against checked-in golden files).
FROZEN_TIME: str = "2020-04-14 07:00:00"
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    """Return the directory holding the Kafka integration-test resources.

    The path is built from pytest's root directory rather than the current
    working directory.
    """
    resources_subdir = "tests/integration/kafka"
    return pytestconfig.rootpath / resources_subdir
@pytest.fixture(scope="module")
def mock_kafka_service(docker_compose_runner, test_resources_dir):
    """Start the Kafka docker-compose stack and seed it with test data.

    Brings up ``docker-compose.yml`` from the resources directory under the
    name ``"kafka"``. It waits (up to 180 s each) for the ZooKeeper, broker and
    schema-registry ports, then runs ``send_records.sh`` to create topics and
    produce records.

    Yields the ``docker_compose_runner`` fixture. The compose context manager
    is exited once the module's tests finish (presumably tearing the stack
    down; that behavior lives in ``docker_compose_runner``).
    """
    import shlex

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "kafka"
    ) as docker_services:
        wait_for_port(docker_services, "test_zookeeper", 52181, timeout=180)
        wait_for_port(docker_services, "test_broker", 29092, timeout=180)
        wait_for_port(docker_services, "test_schema_registry", 8081, timeout=180)

        # Set up topics and produce some data. Both paths are shell-quoted so a
        # checkout directory containing spaces or shell metacharacters neither
        # breaks the command line nor gets interpreted by the shell. shell=True
        # is kept so the script is launched exactly as before.
        script = shlex.quote(str(test_resources_dir / "send_records.sh"))
        resources = shlex.quote(str(test_resources_dir))
        subprocess.run(f"{script} {resources}", shell=True, check=True)

        yield docker_compose_runner
@pytest.mark.parametrize("approach", ["kafka_without_schemas", "kafka"])
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_ingest(
    mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time, approach
):
    """Run the ``<approach>_to_file.yml`` recipe and diff its output against golden.

    ``approach`` selects three paths:

    * the recipe: ``<approach>_to_file.yml``
    * the golden file: ``<approach>_mces_golden.json``
    * the produced file, read from ``tmp_path``: ``<approach>_mces.json``
    """
    recipe_path = (test_resources_dir / f"{approach}_to_file.yml").resolve()
    produced_path = tmp_path / f"{approach}_mces.json"
    golden_path = test_resources_dir / f"{approach}_mces_golden.json"

    # Run the metadata ingestion pipeline through the CLI entry point.
    run_datahub_cmd(["ingest", "-c", str(recipe_path)], tmp_path=tmp_path)

    # Compare the emitted output against the checked-in golden file.
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=produced_path,
        golden_path=golden_path,
        ignore_paths=[],
    )
@pytest.mark.parametrize(
    "config_dict, is_success",
    [
        (
            {
                "connection": {
                    "bootstrap": "localhost:29092",
                    "schema_registry_url": "http://localhost:28081",
                },
            },
            True,
        ),
        (
            {
                "connection": {
                    "bootstrap": "localhost:2909",
                    "schema_registry_url": "http://localhost:2808",
                },
            },
            False,
        ),
    ],
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_kafka_test_connection(mock_kafka_service, config_dict, is_success):
    """Check KafkaSource's test-connection report for a good and a bad config.

    The first parametrized config is expected to connect and report the
    SCHEMA_METADATA capability as working. The second is expected to fail
    basic connectivity ("Failed to get metadata") and report SCHEMA_METADATA
    as failing with a connection-refused error.
    """
    report = test_connection_helpers.run_test_connection(KafkaSource, config_dict)

    if not is_success:
        test_connection_helpers.assert_basic_connectivity_failure(
            report, "Failed to get metadata"
        )
        test_connection_helpers.assert_capability_report(
            capability_report=report.capability_report,
            failure_capabilities={
                SourceCapability.SCHEMA_METADATA: "[Errno 111] Connection refused"
            },
        )
        return

    test_connection_helpers.assert_basic_connectivity_success(report)
    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[SourceCapability.SCHEMA_METADATA],
    )
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_oauth_callback(
    mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time
):
    """Verify the configured OAuth callback fires for the consumer and the admin client.

    The ``kafka_to_file_oauth.yml`` recipe runs while a temporary file handler
    on the root logger captures log output. The captured log is then scanned:

    * the OAuth callback message (``oauth.MESSAGE``) must appear after consumer
      polling starts but before admin polling, which is attributed to the
      consumer;
    * it must appear again after both have started, which is attributed to the
      admin client.
    """
    config_file = (test_resources_dir / "kafka_to_file_oauth.yml").resolve()
    log_file = tmp_path / "kafka_oauth_message.log"

    # Capture root-logger output in a file so the OAuth events can be checked
    # afterwards. The handler is always detached and closed. Otherwise it would
    # keep the file open and keep receiving every later test's log records.
    root_logger = logging.getLogger()
    file_handler = logging.FileHandler(str(log_file))
    root_logger.addHandler(file_handler)
    try:
        with open(config_file) as fp:
            recipe: dict = yaml.safe_load(fp)

        # Run the metadata ingestion pipeline.
        pipeline = Pipeline.create(recipe)
        pipeline.run()
    finally:
        root_logger.removeHandler(file_handler)
        file_handler.close()  # also flushes everything written so far

    # Flags tracking which OAuth-related events were observed in the log.
    checks = {
        "consumer_polling": False,
        "consumer_oauth_callback": False,
        "admin_polling": False,
        "admin_oauth_callback": False,
    }

    with open(log_file, "r") as log_fp:
        for line in log_fp:
            # Track which client has started polling so far.
            if "Initiating polling for kafka admin client" in line:
                checks["admin_polling"] = True
            elif "Initiating polling for kafka consumer" in line:
                checks["consumer_polling"] = True

            # Attribute an OAuth callback to the client that most recently
            # started polling. Consumer polling is expected before admin polling.
            # A callback seen before any consumer polling is not attributed.
            if oauth.MESSAGE in line:
                if checks["consumer_polling"] and not checks["admin_polling"]:
                    checks["consumer_oauth_callback"] = True
                elif checks["consumer_polling"] and checks["admin_polling"]:
                    checks["admin_oauth_callback"] = True

    # Verify all OAuth events occurred.
    assert checks["consumer_polling"], "Consumer polling was not initiated"
    assert checks["consumer_oauth_callback"], "Consumer oauth callback not found"
    assert checks["admin_polling"], "Admin polling was not initiated"
    assert checks["admin_oauth_callback"], "Admin oauth callback not found"
def test_kafka_source_oauth_cb_signature():
    """Reject ``oauth_cb`` callables that cannot take a single positional argument.

    Each callback reference below must make ``KafkaSourceConfig`` parsing raise
    ``ConfigurationError``. Judging by their names, one takes no arguments and
    the other only keyword arguments.
    """
    invalid_callback_refs = (
        "oauth:create_token_no_args",
        "oauth:create_token_only_kwargs",
    )
    for callback_ref in invalid_callback_refs:
        with pytest.raises(
            ConfigurationError,
            match="oauth_cb function must accept single positional argument.",
        ):
            KafkaSourceConfig.parse_obj(
                {
                    "connection": {
                        "bootstrap": "foobar:9092",
                        "consumer_config": {"oauth_cb": callback_ref},
                    }
                }
            )