datahub/metadata-ingestion/tests/unit/test_kafka_sink.py

70 lines
3.4 KiB
Python
Raw Normal View History

2021-02-06 09:42:05 -08:00
from gometa.ingestion.sink.datahub_kafka import DatahubKafkaSink, KafkaCallback
2021-02-09 15:58:26 -08:00
from gometa.ingestion.api.sink import WriteCallback, SinkReport
2021-01-31 22:40:30 -08:00
import unittest
from unittest.mock import patch, MagicMock
2021-02-11 21:59:54 -08:00
from gometa.ingestion.api.common import RecordEnvelope
2021-01-31 22:40:30 -08:00
class KafkaSinkTest(unittest.TestCase):
    """Unit tests for ``DatahubKafkaSink`` and its ``KafkaCallback`` helper.

    ``SerializingProducer`` and ``PipelineContext`` are patched inside the
    ``gometa.ingestion.sink.datahub_kafka`` module for the sink tests, so no
    real Kafka client is constructed.
    """

    @patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
    @patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
    def test_kafka_sink_config(self, mock_producer, mock_context):
        """Creating and closing a sink constructs the producer exactly once."""
        kafka_sink = DatahubKafkaSink.create(
            {"connection": {"bootstrap": "foobar:9092"}}, mock_context
        )
        kafka_sink.close()
        # The producer constructor should be called exactly once.
        mock_producer.assert_called_once()

    def validate_kafka_callback(self, mock_k_callback, record_envelope, write_callback):
        """Assert the patched ``KafkaCallback`` was constructed once with the given arguments.

        Only positional arguments 1 and 2 (the record envelope and the write
        callback) are checked; positional argument 0 is not inspected.
        """
        assert mock_k_callback.call_count == 1  # KafkaCallback constructed
        constructor_args, _constructor_kwargs = mock_k_callback.call_args
        assert constructor_args[1] == record_envelope
        assert constructor_args[2] == write_callback

    @patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
    @patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
    @patch("gometa.ingestion.sink.datahub_kafka.KafkaCallback")
    def test_kafka_sink_write(self, mock_k_callback, mock_producer, mock_context):
        """``write_record_async`` polls the producer and produces with a KafkaCallback hook."""
        mock_producer_instance = mock_producer.return_value
        mock_k_callback_instance = mock_k_callback.return_value
        write_callback = MagicMock(spec=WriteCallback)
        kafka_sink = DatahubKafkaSink.create(
            {"connection": {"bootstrap": "foobar:9092"}}, mock_context
        )

        record_envelope = RecordEnvelope(record="test", metadata={})
        kafka_sink.write_record_async(record_envelope, write_callback)

        assert mock_producer_instance.poll.call_count == 1  # poll() called once
        # Validate the KafkaCallback was constructed appropriately.
        self.validate_kafka_callback(mock_k_callback, record_envelope, write_callback)

        # Validate that the producer's produce() was called with the delivery hook.
        # Assert the call happened first so a missing call fails clearly instead
        # of with a TypeError from unpacking ``call_args`` (None).
        mock_producer_instance.produce.assert_called_once()
        _, produce_kwargs = mock_producer_instance.produce.call_args
        created_callback = produce_kwargs["on_delivery"]
        assert created_callback == mock_k_callback_instance.kafka_callback

        # TODO: Test that kafka producer is configured correctly

    @patch("gometa.ingestion.sink.datahub_kafka.PipelineContext")
    @patch("gometa.ingestion.sink.datahub_kafka.SerializingProducer")
    def test_kafka_sink_close(self, mock_producer, mock_context):
        """Closing the sink flushes the producer exactly once."""
        mock_producer_instance = mock_producer.return_value
        kafka_sink = DatahubKafkaSink.create({}, mock_context)
        kafka_sink.close()
        mock_producer_instance.flush.assert_called_once()

    @patch("gometa.ingestion.sink.datahub_kafka.RecordEnvelope")
    @patch("gometa.ingestion.sink.datahub_kafka.WriteCallback")
    def test_kafka_callback_class(self, mock_w_callback, mock_re):
        """``kafka_callback`` routes errors to on_failure and successes to on_success."""
        callback = KafkaCallback(
            SinkReport(), record_envelope=mock_re, write_callback=mock_w_callback
        )
        mock_error = MagicMock()
        mock_message = MagicMock()

        # Note: ``called_with`` / ``called_once_with`` are NOT assertions on a
        # Mock - they are auto-created child mocks that silently accept any
        # call - so the real ``assert_*`` methods are used below.
        # NOTE(review): the expected arguments follow the intent of the
        # original (non-asserting) checks; confirm against KafkaCallback.

        # Delivery failed: the error is forwarded to on_failure only.
        callback.kafka_callback(mock_error, mock_message)
        mock_w_callback.on_failure.assert_called_once_with(
            mock_re, None, {"error": mock_error}
        )
        mock_w_callback.on_success.assert_not_called()

        # Delivery succeeded: the message is forwarded to on_success.
        callback.kafka_callback(None, mock_message)
        mock_w_callback.on_success.assert_called_once_with(
            mock_re, {"msg": mock_message}
        )