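"""Unit tests for DatahubKafkaSink and its _KafkaCallback helper.

The Kafka SerializingProducer is mocked throughout, so no broker is required.
"""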
import unittest
from typing import Union
from unittest.mock import MagicMock, call, patch

import datahub.emitter.mce_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.kafka_emitter import MCE_KEY
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import SinkReport, WriteCallback
from datahub.ingestion.sink.datahub_kafka import DatahubKafkaSink, _KafkaCallback
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
    MetadataChangeEvent,
    MetadataChangeProposal,
)


class KafkaSinkTest(unittest.TestCase):
    @patch("datahub.ingestion.api.sink.PipelineContext", autospec=True)
    @patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
    def test_kafka_sink_config(self, mock_producer, mock_context):
        kafka_sink = DatahubKafkaSink.create(
            {"connection": {"bootstrap": "foobar:9092"}}, mock_context
        )
        kafka_sink.close()
        assert (
            mock_producer.call_count == 2
        )  # constructor should be called twice (once each for MCE and MCP)
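
    # Shared helper: asserts that _KafkaCallback was constructed exactly once,
    # with the record envelope and write callback as its second and third
    # positional arguments.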
    def validate_kafka_callback(self, mock_k_callback, record_envelope, write_callback):
        assert mock_k_callback.call_count == 1  # KafkaCallback constructed
        constructor_args, constructor_kwargs = mock_k_callback.call_args
        assert constructor_args[1] == record_envelope
        assert constructor_args[2] == write_callback
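
    # Writes a MetadataChangeProposalWrapper through the sink end to end; even
    # for an MCP-only write, both underlying producers (MCE and MCP) are
    # constructed.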
@patch("datahub.ingestion.sink.datahub_kafka._KafkaCallback", autospec=True)
|
|
@patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
|
|
def test_kafka_sink_mcp(self, mock_producer, mock_callback):
|
|
from datahub.emitter.mcp import MetadataChangeProposalWrapper
|
|
|
|
mcp = MetadataChangeProposalWrapper(
|
|
entityUrn="urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccount,PROD)",
|
|
aspect=models.DatasetProfileClass(
|
|
rowCount=2000,
|
|
columnCount=15,
|
|
timestampMillis=1626995099686,
|
|
),
|
|
)
|
|
kafka_sink = DatahubKafkaSink.create(
|
|
{"connection": {"bootstrap": "localhost:9092"}},
|
|
PipelineContext(run_id="test"),
|
|
)
|
|
kafka_sink.write_record_async(
|
|
RecordEnvelope(record=mcp, metadata={}), mock_callback
|
|
)
|
|
kafka_sink.close()
|
|
assert mock_producer.call_count == 2 # constructor should be called
|
|
|
|
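
    # Happy-path MCE write: checks that the producer polls before producing and
    # that produce() receives the record, a Kafka key, and a delivery callback
    # wired to _KafkaCallback.kafka_callback.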
@patch("datahub.ingestion.api.sink.PipelineContext", autospec=True)
|
|
@patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
|
|
@patch("datahub.ingestion.sink.datahub_kafka._KafkaCallback", autospec=True)
|
|
def test_kafka_sink_write(self, mock_k_callback, mock_producer, mock_context):
|
|
mock_k_callback_instance = mock_k_callback.return_value
|
|
callback = MagicMock(spec=WriteCallback)
|
|
kafka_sink = DatahubKafkaSink.create(
|
|
{"connection": {"bootstrap": "foobar:9092"}}, mock_context
|
|
)
|
|
mock_producer_instance = kafka_sink.emitter.producers[MCE_KEY]
|
|
|
|
mce = builder.make_lineage_mce(
|
|
[
|
|
builder.make_dataset_urn("bigquery", "upstream1"),
|
|
builder.make_dataset_urn("bigquery", "upstream2"),
|
|
],
|
|
builder.make_dataset_urn("bigquery", "downstream1"),
|
|
)
|
|
|
|
re: RecordEnvelope[
|
|
Union[
|
|
MetadataChangeEvent,
|
|
MetadataChangeProposal,
|
|
MetadataChangeProposalWrapper,
|
|
]
|
|
] = RecordEnvelope(record=mce, metadata={})
|
|
kafka_sink.write_record_async(re, callback)
|
|
|
|
mock_producer_instance.poll.assert_called_once() # producer should call poll() first
|
|
self.validate_kafka_callback(
|
|
mock_k_callback, re, callback
|
|
) # validate kafka callback was constructed appropriately
|
|
|
|
# validate that confluent_kafka.Producer.produce was called with the right arguments
|
|
mock_producer_instance.produce.assert_called_once()
|
|
args, kwargs = mock_producer_instance.produce.call_args
|
|
assert kwargs["value"] == mce
|
|
assert kwargs["key"] # produce call should include a Kafka key
|
|
created_callback = kwargs["on_delivery"]
|
|
assert created_callback == mock_k_callback_instance.kafka_callback
|
|
|
|
# TODO: Test that kafka producer is configured correctly
|
|
|
|
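
    # close() should flush each underlying producer, hence the two flush() calls.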
@patch("datahub.ingestion.api.sink.PipelineContext", autospec=True)
|
|
@patch("datahub.emitter.kafka_emitter.SerializingProducer", autospec=True)
|
|
def test_kafka_sink_close(self, mock_producer, mock_context):
|
|
mock_producer_instance = mock_producer.return_value
|
|
kafka_sink = DatahubKafkaSink.create({}, mock_context)
|
|
kafka_sink.close()
|
|
mock_producer_instance.flush.assert_has_calls([call(), call()])
|
|
|
|
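
    # Exercises _KafkaCallback directly: a non-None Kafka error routes to
    # WriteCallback.on_failure, while a None error routes to on_success.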
@patch("datahub.ingestion.sink.datahub_kafka.RecordEnvelope", autospec=True)
|
|
@patch("datahub.ingestion.sink.datahub_kafka.WriteCallback", autospec=True)
|
|
def test_kafka_callback_class(self, mock_w_callback, mock_re):
|
|
callback = _KafkaCallback(
|
|
SinkReport(), record_envelope=mock_re, write_callback=mock_w_callback
|
|
)
|
|
mock_error = MagicMock()
|
|
mock_message = MagicMock()
|
|
callback.kafka_callback(mock_error, mock_message)
|
|
mock_w_callback.on_failure.assert_called_once()
|
|
assert mock_w_callback.on_failure.call_args[0][0] == mock_re
|
|
assert mock_w_callback.on_failure.call_args[0][1] == mock_error
|
|
callback.kafka_callback(None, mock_message)
|
|
mock_w_callback.on_success.assert_called_once()
|
|
assert mock_w_callback.on_success.call_args[0][0] == mock_re
|