mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-05 16:22:17 +00:00
49 lines
1.5 KiB
Python
49 lines
1.5 KiB
Python
import unittest
|
|
|
|
import pydantic
|
|
import pytest
|
|
|
|
from datahub.emitter.kafka_emitter import (
|
|
DEFAULT_MCE_KAFKA_TOPIC,
|
|
DEFAULT_MCP_KAFKA_TOPIC,
|
|
MCE_KEY,
|
|
MCP_KEY,
|
|
KafkaEmitterConfig,
|
|
)
|
|
|
|
|
|
class KafkaEmitterTest(unittest.TestCase):
|
|
def test_kafka_emitter_config(self):
|
|
emitter_config = KafkaEmitterConfig.parse_obj(
|
|
{"connection": {"bootstrap": "foobar:9092"}}
|
|
)
|
|
assert emitter_config.topic_routes[MCE_KEY] == DEFAULT_MCE_KAFKA_TOPIC
|
|
assert emitter_config.topic_routes[MCP_KEY] == DEFAULT_MCP_KAFKA_TOPIC
|
|
|
|
"""
|
|
Respecifying old and new topic config should barf
|
|
"""
|
|
|
|
def test_kafka_emitter_config_old_and_new(self):
|
|
with pytest.raises(pydantic.ValidationError):
|
|
KafkaEmitterConfig.parse_obj(
|
|
{
|
|
"connection": {"bootstrap": "foobar:9092"},
|
|
"topic": "NewTopic",
|
|
"topic_routes": {MCE_KEY: "NewTopic"},
|
|
}
|
|
)
|
|
|
|
"""
|
|
Old topic config provided should get auto-upgraded to new topic_routes
|
|
"""
|
|
|
|
def test_kafka_emitter_config_topic_upgrade(self):
|
|
emitter_config = KafkaEmitterConfig.parse_obj(
|
|
{"connection": {"bootstrap": "foobar:9092"}, "topic": "NewTopic"}
|
|
)
|
|
assert emitter_config.topic_routes[MCE_KEY] == "NewTopic" # MCE topic upgraded
|
|
assert (
|
|
emitter_config.topic_routes[MCP_KEY] == DEFAULT_MCP_KAFKA_TOPIC
|
|
) # No change to MCP
|