2021-08-03 11:17:16 -07:00
|
|
|
import unittest
|
|
|
|
|
2022-09-22 16:32:07 -07:00
|
|
|
import pydantic
|
2021-08-03 11:17:16 -07:00
|
|
|
import pytest
|
|
|
|
|
|
|
|
from datahub.emitter.kafka_emitter import (
|
|
|
|
DEFAULT_MCE_KAFKA_TOPIC,
|
|
|
|
DEFAULT_MCP_KAFKA_TOPIC,
|
|
|
|
MCE_KEY,
|
|
|
|
MCP_KEY,
|
|
|
|
KafkaEmitterConfig,
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
class KafkaEmitterTest(unittest.TestCase):
|
|
|
|
def test_kafka_emitter_config(self):
|
|
|
|
emitter_config = KafkaEmitterConfig.parse_obj(
|
|
|
|
{"connection": {"bootstrap": "foobar:9092"}}
|
|
|
|
)
|
|
|
|
assert emitter_config.topic_routes[MCE_KEY] == DEFAULT_MCE_KAFKA_TOPIC
|
|
|
|
assert emitter_config.topic_routes[MCP_KEY] == DEFAULT_MCP_KAFKA_TOPIC
|
|
|
|
|
|
|
|
"""
|
|
|
|
Respecifying old and new topic config should barf
|
|
|
|
"""
|
|
|
|
|
|
|
|
def test_kafka_emitter_config_old_and_new(self):
|
2022-09-22 16:32:07 -07:00
|
|
|
with pytest.raises(pydantic.ValidationError):
|
|
|
|
KafkaEmitterConfig.parse_obj(
|
2021-08-03 11:17:16 -07:00
|
|
|
{
|
|
|
|
"connection": {"bootstrap": "foobar:9092"},
|
|
|
|
"topic": "NewTopic",
|
|
|
|
"topic_routes": {MCE_KEY: "NewTopic"},
|
|
|
|
}
|
|
|
|
)
|
|
|
|
|
|
|
|
"""
|
|
|
|
Old topic config provided should get auto-upgraded to new topic_routes
|
|
|
|
"""
|
|
|
|
|
|
|
|
def test_kafka_emitter_config_topic_upgrade(self):
|
|
|
|
emitter_config = KafkaEmitterConfig.parse_obj(
|
|
|
|
{"connection": {"bootstrap": "foobar:9092"}, "topic": "NewTopic"}
|
|
|
|
)
|
|
|
|
assert emitter_config.topic_routes[MCE_KEY] == "NewTopic" # MCE topic upgraded
|
|
|
|
assert (
|
|
|
|
emitter_config.topic_routes[MCP_KEY] == DEFAULT_MCP_KAFKA_TOPIC
|
|
|
|
) # No change to MCP
|