mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-05 08:07:04 +00:00
182 lines
6.5 KiB
Python
182 lines
6.5 KiB
Python
import time
|
|
from typing import Any, Dict, List
|
|
from unittest.mock import patch
|
|
|
|
import pytest
|
|
from confluent_kafka.admin import AdminClient, NewTopic
|
|
from freezegun import freeze_time
|
|
|
|
from tests.test_helpers.docker_helpers import wait_for_port
|
|
from tests.test_helpers.state_helpers import (
|
|
get_current_checkpoint_from_pipeline,
|
|
run_and_get_pipeline,
|
|
validate_all_providers_have_committed_successfully,
|
|
)
|
|
|
|
# Timestamp handed to freezegun's ``freeze_time`` decorator on the test below.
FROZEN_TIME = "2020-04-14 07:00:00"

# Kafka broker port the test waits on, and the bootstrap address derived from it.
KAFKA_PORT = 29092
KAFKA_BOOTSTRAP_SERVER = f"localhost:{KAFKA_PORT}"

# DataHub GMS endpoint configured as the stateful-ingestion state provider's server.
GMS_PORT = 8080
GMS_SERVER = f"http://localhost:{GMS_PORT}"
|
|
|
|
|
|
class KafkaTopicsCxtManager:
    """Context manager that creates Kafka topics on entry and deletes them on exit.

    On ``__enter__`` every name in ``topics`` is created with one partition and
    a replication factor of 1; on ``__exit__`` the same names are deleted
    again. Deletion failures are only printed, so a topic that the test body
    already removed does not break teardown.
    """

    def __init__(self, topics: List[str], bootstrap_servers: str) -> None:
        self.topics = topics
        self.bootstrap_servers = bootstrap_servers

    def _admin_client(self) -> AdminClient:
        """Build an AdminClient configured with this manager's bootstrap servers."""
        admin_config: Dict[str, Any] = {
            "bootstrap.servers": self.bootstrap_servers,
        }
        return AdminClient(admin_config)

    def create_kafka_topics(self, topics: List[NewTopic]) -> None:
        """
        creates new kafka topics

        Waits on the result of every creation request. The first failure is
        printed and re-raised to the caller.
        """
        futures = self._admin_client().create_topics(topics, operation_timeout=3)

        # Wait for operation to finish.
        for topic, future in futures.items():
            try:
                future.result()  # The result itself is None
            except Exception as e:
                print(f"Failed to create topic {topic}: {e}")
                # Bare raise keeps the original exception and traceback intact.
                raise
            else:
                print(f"Topic {topic} created")

    def delete_kafka_topics(self, topics: List[str]) -> None:
        """
        delete a list of existing Kafka topics

        Waits on the result of every deletion request. Failures are printed
        but deliberately not raised.
        """
        futures = self._admin_client().delete_topics(topics, operation_timeout=3)

        # Wait for operation to finish.
        for topic, future in futures.items():
            try:
                future.result()  # The result itself is None
            except Exception as e:
                # this error should be ignored when we already deleted
                # the topic within the test code
                print(f"Failed to delete topic {topic}: {e}")
            else:
                print(f"Topic {topic} deleted")

    def __enter__(self):
        """Create all configured topics and return this manager."""
        new_topics = [
            NewTopic(topic=topic_name, num_partitions=1, replication_factor=1)
            for topic_name in self.topics
        ]
        self.create_kafka_topics(new_topics)
        return self

    def __exit__(self, exc_type, exc, traceback):
        """Delete all configured topics; exceptions from the body still propagate."""
        self.delete_kafka_topics(self.topics)
|
|
|
|
|
|
@freeze_time(FROZEN_TIME)
@pytest.mark.integration
def test_kafka_ingest_with_stateful(
    docker_compose_runner, pytestconfig, tmp_path, mock_time, mock_datahub_graph
):
    """Check that stateful Kafka ingestion notices a topic deleted between runs.

    The pipeline is run twice against a dockerised Kafka: once with two
    topics present and once after the first topic has been deleted. The
    checkpoint state of the first run must then contain exactly one topic URN
    that is missing from the second run's state, and both runs must have
    committed their providers successfully.
    """
    test_resources_dir = pytestconfig.rootpath / "tests/integration/kafka"
    topic_prefix: str = "stateful_ingestion_test"
    topic_names: List[str] = [f"{topic_prefix}_t1", f"{topic_prefix}_t2"]
    platform_instance = "test_platform_instance_1"

    with docker_compose_runner(
        test_resources_dir / "docker-compose.yml", "kafka"
    ) as docker_services:
        wait_for_port(docker_services, "test_broker", KAFKA_PORT, timeout=120)
        wait_for_port(docker_services, "test_schema_registry", 8081, timeout=120)

        source_config_dict: Dict[str, Any] = {
            "connection": {
                "bootstrap": KAFKA_BOOTSTRAP_SERVER,
            },
            "platform_instance": platform_instance,
            # enable stateful ingestion
            "stateful_ingestion": {
                "enabled": True,
                "remove_stale_metadata": True,
                "fail_safe_threshold": 100.0,
                "state_provider": {
                    "type": "datahub",
                    "config": {"datahub_api": {"server": GMS_SERVER}},
                },
            },
        }

        pipeline_config_dict: Dict[str, Any] = {
            "source": {
                "type": "kafka",
                "config": source_config_dict,
            },
            "sink": {
                # we are not really interested in the resulting events for this test
                "type": "console"
            },
            "pipeline_name": "test_pipeline",
            # enable reporting
            "reporting": [
                {
                    "type": "datahub",
                }
            ],
        }

        # topics will be automatically created and deleted upon test completion
        with KafkaTopicsCxtManager(
            topic_names, KAFKA_BOOTSTRAP_SERVER
        ) as kafka_ctx, patch(
            "datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
            mock_datahub_graph,
        ) as mock_checkpoint:
            # both checkpoint and reporting will use the same mocked graph instance
            mock_checkpoint.return_value = mock_datahub_graph

            # 1. Do the first run of the pipeline and get the default job's checkpoint.
            pipeline_run1 = run_and_get_pipeline(pipeline_config_dict)
            checkpoint1 = get_current_checkpoint_from_pipeline(pipeline_run1)

            assert checkpoint1
            assert checkpoint1.state

            # 2. Drop the first topic created during step 1 + rerun the pipeline
            # and get the checkpoint state.
            deleted_topic = kafka_ctx.topics[0]
            kafka_ctx.delete_kafka_topics([deleted_topic])
            # sleep to guarantee eventual consistency for kafka topic deletion
            time.sleep(1)
            pipeline_run2 = run_and_get_pipeline(pipeline_config_dict)
            checkpoint2 = get_current_checkpoint_from_pipeline(pipeline_run2)

            assert checkpoint2
            assert checkpoint2.state

            # 3. Perform all assertions on the states. The deleted topic should not be
            # part of the second state
            state1 = checkpoint1.state
            state2 = checkpoint2.state
            difference_urns = list(
                state1.get_urns_not_in(type="topic", other_checkpoint_state=state2)
            )

            assert len(difference_urns) == 1
            assert (
                difference_urns[0]
                == f"urn:li:dataset:(urn:li:dataPlatform:kafka,{platform_instance}.{deleted_topic},PROD)"
            )

            # 4. Validate that all providers have committed successfully.
            # NOTE: The following validation asserts for presence of state as well
            # and validates reporting.
            validate_all_providers_have_committed_successfully(
                pipeline=pipeline_run1, expected_providers=1
            )
            # Validate the second run too; checking pipeline_run1 twice would
            # leave pipeline_run2's commit unverified.
            validate_all_providers_have_committed_successfully(
                pipeline=pipeline_run2, expected_providers=1
            )
|