datahub/metadata-ingestion/tests/unit/test_kafka_sink.py

66 lines
3.4 KiB
Python
Raw Normal View History

2021-02-06 09:42:05 -08:00
from gometa.ingestion.sink.datahub_kafka import DatahubKafkaSink, KafkaCallback
2021-01-31 22:40:30 -08:00
from gometa.ingestion.api.sink import WriteCallback
import unittest
from unittest.mock import patch, MagicMock
2021-02-06 09:42:05 -08:00
from gometa.ingestion.api.common import RecordEnvelope, PipelineContext
2021-01-31 22:40:30 -08:00
class KafkaSinkTest(unittest.TestCase):
2021-02-06 09:42:05 -08:00
@patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
@patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
def test_kafka_sink_config(self, mock_producer, mock_context):
2021-01-31 22:40:30 -08:00
mock_producer_instance = mock_producer.return_value
2021-02-06 09:42:05 -08:00
kafka_sink = DatahubKafkaSink.create({'connection': {'bootstrap': 'foobar:9092'}}, mock_context)
2021-01-31 22:40:30 -08:00
assert mock_producer.call_count == 1 #constructor should be called
def validate_kafka_callback(self, mock_k_callback, record_envelope, write_callback):
assert mock_k_callback.call_count == 1 # KafkaCallback constructed
constructor_args, constructor_kwargs = mock_k_callback.call_args
assert constructor_args[0] == record_envelope
assert constructor_args[1] == write_callback
2021-02-06 09:42:05 -08:00
@patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
@patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
@patch("gometa.ingestion.sink.datahub_kafka.KafkaCallback")
def test_kafka_sink_write(self, mock_k_callback, mock_producer, mock_context):
2021-01-31 22:40:30 -08:00
mock_producer_instance = mock_producer.return_value
mock_k_callback_instance = mock_k_callback.return_value
callback = MagicMock(spec=WriteCallback)
2021-02-06 09:42:05 -08:00
kafka_sink = DatahubKafkaSink.create({'connection': {'bootstrap': 'foobar:9092'}}, mock_context)
2021-01-31 22:40:30 -08:00
re = RecordEnvelope(record="test", metadata={})
kafka_sink.write_record_async(re, callback)
assert mock_producer_instance.poll.call_count == 1 # poll() called once
self.validate_kafka_callback(mock_k_callback, re, callback) # validate kafka callback was constructed appropriately
# validate that confluent_kafka.Producer.produce was called with the right arguments
args, kwargs = mock_producer_instance.produce.call_args
2021-02-06 09:42:05 -08:00
created_callback = kwargs["on_delivery"]
2021-01-31 22:40:30 -08:00
assert created_callback == mock_k_callback_instance.kafka_callback
## Todo: Test that kafka producer is configured correctly
2021-02-06 09:42:05 -08:00
@patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
@patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
def test_kafka_sink_close(self, mock_producer, mock_context):
2021-01-31 22:40:30 -08:00
mock_producer_instance = mock_producer.return_value
2021-02-06 09:42:05 -08:00
kafka_sink = DatahubKafkaSink.create({}, mock_context)
2021-01-31 22:40:30 -08:00
kafka_sink.close()
mock_producer_instance.flush.assert_called_once()
2021-02-06 09:42:05 -08:00
@patch("gometa.ingestion.sink.datahub_kafka.RecordEnvelope")
@patch("gometa.ingestion.sink.datahub_kafka.WriteCallback")
2021-01-31 22:40:30 -08:00
def test_kafka_callback_class(self, mock_w_callback, mock_re):
callback = KafkaCallback(record_envelope = mock_re, write_callback = mock_w_callback)
mock_error = MagicMock()
mock_message = MagicMock()
callback.kafka_callback(mock_error, mock_message)
assert mock_w_callback.on_failure.call_count == 1
mock_w_callback.on_failure.called_with(mock_re, None, {"error", mock_error})
callback.kafka_callback(None, mock_message)
mock_w_callback.on_success.called_once_with(mock_re, {"msg", mock_message})