mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-03 23:28:11 +00:00
186 lines
6.2 KiB
Python
186 lines
6.2 KiB
Python
import logging
|
|
import subprocess
|
|
|
|
import pytest
|
|
import yaml
|
|
from freezegun import freeze_time
|
|
|
|
from datahub.configuration.common import ConfigurationError
|
|
from datahub.ingestion.api.source import SourceCapability
|
|
from datahub.ingestion.run.pipeline import Pipeline
|
|
from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig
|
|
from datahub.testing import mce_helpers
|
|
from tests.integration.kafka import oauth # type: ignore
|
|
from tests.test_helpers import test_connection_helpers
|
|
from tests.test_helpers.click_helpers import run_datahub_cmd
|
|
from tests.test_helpers.docker_helpers import wait_for_port
|
|
|
|
# Fixed wall-clock time applied with freeze_time to the ingestion, connection
# and oauth tests below; presumably so time-dependent fields in the produced
# output stay stable across runs when compared with the golden files.
FROZEN_TIME = "2020-04-14 07:00:00"
|
|
|
|
|
|
@pytest.fixture(scope="module")
def test_resources_dir(pytestconfig):
    """Return the directory holding this module's Kafka test assets.

    The recipes (``*_to_file.yml``), golden files, ``docker-compose.yml`` and
    ``send_records.sh`` used by the tests in this module are all resolved
    relative to this directory.
    """
    kafka_dir = pytestconfig.rootpath.joinpath("tests", "integration", "kafka")
    return kafka_dir
|
|
|
|
|
|
@pytest.fixture(scope="module")
def mock_kafka_service(docker_compose_runner, test_resources_dir):
    """Bring up the Kafka docker-compose stack and seed it with test data.

    Waits until Zookeeper (52181), the broker (29092) and the schema registry
    (8081) accept connections, then runs ``send_records.sh`` to create topics
    and produce records. Yields ``docker_compose_runner``; the compose context
    opened here stays open for the whole module and is exited at module
    teardown.

    Raises:
        subprocess.CalledProcessError: if the seeding script exits non-zero.
    """
    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "kafka"
    ) as docker_services:
        wait_for_port(docker_services, "test_zookeeper", 52181, timeout=180)
        wait_for_port(docker_services, "test_broker", 29092, timeout=180)
        wait_for_port(docker_services, "test_schema_registry", 8081, timeout=180)

        # Set up topics and produce some data. The arguments are passed as an
        # argv list with no shell, so a checkout path containing spaces or shell
        # metacharacters reaches the script verbatim instead of being split or
        # interpreted by /bin/sh. (The script must be executable, as it
        # already had to be when invoked through the shell.)
        subprocess.run(
            [str(test_resources_dir / "send_records.sh"), str(test_resources_dir)],
            check=True,
        )

        yield docker_compose_runner
|
|
|
|
|
|
@pytest.mark.parametrize("approach", ["kafka_without_schemas", "kafka"])
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_ingest(
    mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time, approach
):
    """Ingest from Kafka through the CLI and compare the output with a golden file.

    ``approach`` selects the recipe ``<approach>_to_file.yml`` and the golden
    file ``<approach>_mces_golden.json``. The produced MCEs are read from
    ``tmp_path / "<approach>_mces.json"``.
    """
    recipe_path = (test_resources_dir / f"{approach}_to_file.yml").resolve()
    run_datahub_cmd(["ingest", "-c", str(recipe_path)], tmp_path=tmp_path)

    produced_file = tmp_path / f"{approach}_mces.json"
    expected_file = test_resources_dir / f"{approach}_mces_golden.json"
    mce_helpers.check_golden_file(
        pytestconfig,
        output_path=produced_file,
        golden_path=expected_file,
        ignore_paths=[],
    )
|
|
|
|
|
|
@pytest.mark.parametrize(
    "config_dict, is_success",
    [
        (
            {
                "connection": {
                    "bootstrap": "localhost:29092",
                    "schema_registry_url": "http://localhost:28081",
                },
            },
            True,
        ),
        (
            {
                "connection": {
                    "bootstrap": "localhost:2909",
                    "schema_registry_url": "http://localhost:2808",
                },
            },
            False,
        ),
    ],
)
@pytest.mark.integration
@freeze_time(FROZEN_TIME)
def test_kafka_test_connection(mock_kafka_service, config_dict, is_success):
    """Check KafkaSource's test-connection report for a reachable and an unreachable setup.

    The success case uses the broker port the fixture waits on (29092) and
    must report ``SCHEMA_METADATA`` as a working capability. The failure case
    uses ports expected to be closed, and must fail basic connectivity with
    "Failed to get metadata" and a connection-refused capability error.
    """
    report = test_connection_helpers.run_test_connection(KafkaSource, config_dict)

    if not is_success:
        # Unreachable endpoints: connectivity fails and the schema capability
        # carries the underlying socket error.
        test_connection_helpers.assert_basic_connectivity_failure(
            report, "Failed to get metadata"
        )
        test_connection_helpers.assert_capability_report(
            capability_report=report.capability_report,
            failure_capabilities={
                SourceCapability.SCHEMA_METADATA: "[Errno 111] Connection refused"
            },
        )
        return

    test_connection_helpers.assert_basic_connectivity_success(report)
    test_connection_helpers.assert_capability_report(
        capability_report=report.capability_report,
        success_capabilities=[SourceCapability.SCHEMA_METADATA],
    )
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_oauth_callback(
    mock_kafka_service, test_resources_dir, pytestconfig, tmp_path, mock_time
):
    """Run the oauth recipe and check that the oauth callback fires for both clients.

    Log records reaching the root logger during the pipeline run are captured
    in a temporary log file. The file is then scanned in order:

    - an ``oauth.MESSAGE`` line seen after the consumer started polling (but
      before the admin client did) counts as the consumer callback;
    - one seen after both started polling counts as the admin-client callback.
    """
    # Run the metadata ingestion pipeline.
    config_file = (test_resources_dir / "kafka_to_file_oauth.yml").resolve()

    log_file = tmp_path / "kafka_oauth_message.log"

    # Add a file handler to later validate a test-case.
    root_logger = logging.getLogger()
    file_handler = logging.FileHandler(str(log_file))
    root_logger.addHandler(file_handler)
    try:
        with open(config_file) as fp:
            recipe: dict = yaml.safe_load(fp)

        pipeline = Pipeline.create(recipe)
        pipeline.run()
    finally:
        # The handler hangs off the process-wide root logger. Detach and close
        # it so it neither leaks its file descriptor nor keeps capturing log
        # records from tests that run later in the session. Closing also
        # flushes it before the file is read below.
        root_logger.removeHandler(file_handler)
        file_handler.close()

    # Initialize flags to track oauth events
    checks = {
        "consumer_polling": False,
        "consumer_oauth_callback": False,
        "admin_polling": False,
        "admin_oauth_callback": False,
    }

    # Read log file and check for oauth events
    with open(log_file, "r") as file:
        for line in file:
            # Check for polling events
            if "Initiating polling for kafka admin client" in line:
                checks["admin_polling"] = True
            elif "Initiating polling for kafka consumer" in line:
                checks["consumer_polling"] = True

            # Check for oauth callbacks; attribute each one to whichever
            # client(s) have started polling by this point in the log.
            if oauth.MESSAGE in line:
                if checks["consumer_polling"] and not checks["admin_polling"]:
                    checks["consumer_oauth_callback"] = True
                elif checks["consumer_polling"] and checks["admin_polling"]:
                    checks["admin_oauth_callback"] = True

    # Verify all oauth events occurred
    assert checks["consumer_polling"], "Consumer polling was not initiated"
    assert checks["consumer_oauth_callback"], "Consumer oauth callback not found"
    assert checks["admin_polling"], "Admin polling was not initiated"
    assert checks["admin_oauth_callback"], "Admin oauth callback not found"
|
|
|
|
|
|
def test_kafka_source_oauth_cb_signature():
    """Reject ``oauth_cb`` callables with an unusable signature at config-parse time.

    Each callback reference below (judging by their names, a no-argument
    function and a keyword-only function in the ``oauth`` test module) must
    make ``KafkaSourceConfig.parse_obj`` raise ``ConfigurationError``.
    """
    rejected_callbacks = (
        "oauth:create_token_no_args",
        "oauth:create_token_only_kwargs",
    )
    for callback_ref in rejected_callbacks:
        with pytest.raises(
            ConfigurationError,
            match="oauth_cb function must accept single positional argument.",
        ):
            KafkaSourceConfig.parse_obj(
                {
                    "connection": {
                        "bootstrap": "foobar:9092",
                        "consumer_config": {"oauth_cb": callback_ref},
                    }
                }
            )
|