# datahub/metadata-ingestion/tests/unit/test_kafka_sink.py

import unittest
from typing import Union
from unittest.mock import MagicMock, call, patch
import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.kafka_emitter import MCE_KEY
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import SinkReport, WriteCallback
from datahub.ingestion.sink.datahub_kafka import DatahubKafkaSink, _KafkaCallback
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
)
class KafkaSinkTest(unittest.TestCase):
    """Unit tests for DatahubKafkaSink and its _KafkaCallback helper.

    SerializingProducer is patched in every test, so no real Kafka
    connection is ever made.
    """

    @patch("datahub.ingestion.api.sink.PipelineContext", autospec=True)
    @patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
    def test_kafka_sink_config(self, mock_producer, mock_context):
        """Creating a sink constructs one producer each for MCE and MCP."""
        kafka_sink = DatahubKafkaSink.create(
            {"connection": {"bootstrap": "foobar:9092"}}, mock_context
        )
        kafka_sink.close()
        assert (
            mock_producer.call_count == 2
        )  # constructor should be called twice (once each for mce,mcp)

    def validate_kafka_callback(self, mock_k_callback, record_envelope, write_callback):
        """Assert _KafkaCallback was constructed exactly once with the
        expected record envelope and write callback.

        Positional constructor args are (report, record_envelope,
        write_callback), hence the [1]/[2] indexing.
        """
        assert mock_k_callback.call_count == 1  # KafkaCallback constructed
        constructor_args, constructor_kwargs = mock_k_callback.call_args
        assert constructor_args[1] == record_envelope
        assert constructor_args[2] == write_callback

    @patch("datahub.ingestion.sink.datahub_kafka._KafkaCallback", autospec=True)
    @patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
    def test_kafka_sink_mcp(self, mock_producer, mock_callback):
        """An MCP wrapper record can be written through the sink without error."""
        # Fix: removed a redundant function-local import of
        # MetadataChangeProposalWrapper; it is already imported at module level.
        mcp = MetadataChangeProposalWrapper(
            entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
            aspect=models.DatasetProfileClass(
                rowCount=2000,
                columnCount=15,
                timestampMillis=1626995099686,
            ),
        )
        kafka_sink = DatahubKafkaSink.create(
            {"connection": {"bootstrap": "localhost:9092"}},
            PipelineContext(run_id="test"),
        )
        kafka_sink.write_record_async(
            RecordEnvelope(record=mcp, metadata={}), mock_callback
        )
        kafka_sink.close()
        assert mock_producer.call_count == 2  # constructor should be called

    @patch("datahub.ingestion.api.sink.PipelineContext", autospec=True)
    @patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
    @patch("datahub.ingestion.sink.datahub_kafka._KafkaCallback", autospec=True)
    def test_kafka_sink_write(self, mock_k_callback, mock_producer, mock_context):
        """write_record_async polls the producer and calls produce() with
        the serialized key, value, and the kafka delivery callback."""
        mock_k_callback_instance = mock_k_callback.return_value
        callback = MagicMock(spec=WriteCallback)
        kafka_sink = DatahubKafkaSink.create(
            {"connection": {"bootstrap": "foobar:9092"}}, mock_context
        )
        mock_producer_instance = kafka_sink.emitter.producers[MCE_KEY]
        mce = builder.make_lineage_mce(
            [
                builder.make_dataset_urn("bigquery", "upstream1"),
                builder.make_dataset_urn("bigquery", "upstream2"),
            ],
            builder.make_dataset_urn("bigquery", "downstream1"),
        )
        re: RecordEnvelope[
            Union[
                MetadataChangeEvent,
                MetadataChangeProposal,
                MetadataChangeProposalWrapper,
            ]
        ] = RecordEnvelope(record=mce, metadata={})
        kafka_sink.write_record_async(re, callback)
        mock_producer_instance.poll.assert_called_once()  # producer should call poll() first
        self.validate_kafka_callback(
            mock_k_callback, re, callback
        )  # validate kafka callback was constructed appropriately
        # validate that confluent_kafka.Producer.produce was called with the right arguments
        mock_producer_instance.produce.assert_called_once()
        args, kwargs = mock_producer_instance.produce.call_args
        assert kwargs["value"] == mce
        assert kwargs["key"]  # produce call should include a Kafka key
        created_callback = kwargs["on_delivery"]
        assert created_callback == mock_k_callback_instance.kafka_callback
        # TODO: Test that kafka producer is configured correctly

    @patch("datahub.ingestion.api.sink.PipelineContext", autospec=True)
    @patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
    def test_kafka_sink_close(self, mock_producer, mock_context):
        """close() flushes the underlying producers (once each for MCE/MCP)."""
        mock_producer_instance = mock_producer.return_value
        kafka_sink = DatahubKafkaSink.create({}, mock_context)
        kafka_sink.close()
        mock_producer_instance.flush.assert_has_calls([call(), call()])

    @patch("datahub.ingestion.sink.datahub_kafka.RecordEnvelope", autospec=True)
    @patch("datahub.ingestion.sink.datahub_kafka.WriteCallback", autospec=True)
    def test_kafka_callback_class(self, mock_w_callback, mock_re):
        """kafka_callback routes a non-None delivery error to on_failure
        and a None error to on_success, forwarding the record envelope."""
        callback = _KafkaCallback(
            SinkReport(), record_envelope=mock_re, write_callback=mock_w_callback
        )
        mock_error = MagicMock()
        mock_message = MagicMock()
        # Non-None error -> failure path.
        callback.kafka_callback(mock_error, mock_message)
        mock_w_callback.on_failure.assert_called_once()
        assert mock_w_callback.on_failure.call_args[0][0] == mock_re
        assert mock_w_callback.on_failure.call_args[0][1] == mock_error
        # None error -> success path.
        callback.kafka_callback(None, mock_message)
        mock_w_callback.on_success.assert_called_once()
        assert mock_w_callback.on_success.call_args[0][0] == mock_re