mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-28 10:28:22 +00:00
fix(ingest): include urn as key for kafka emitter (#2634)
This commit is contained in:
parent
9bff4e1cc4
commit
6b9d0d0129
@ -61,6 +61,7 @@ class DatahubKafkaEmitter:
|
||||
self.producer.poll(0)
|
||||
self.producer.produce(
|
||||
topic=self.config.topic,
|
||||
key=mce.proposedSnapshot.urn,
|
||||
value=mce,
|
||||
on_delivery=callback,
|
||||
)
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
import unittest
|
||||
from unittest.mock import MagicMock, patch, sentinel
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import datahub.emitter.mce_builder as builder
|
||||
from datahub.ingestion.api.common import RecordEnvelope
|
||||
from datahub.ingestion.api.sink import SinkReport, WriteCallback
|
||||
from datahub.ingestion.sink.datahub_kafka import DatahubKafkaSink, _KafkaCallback
|
||||
@ -32,15 +33,27 @@ class KafkaSinkTest(unittest.TestCase):
|
||||
kafka_sink = DatahubKafkaSink.create(
|
||||
{"connection": {"bootstrap": "foobar:9092"}}, mock_context
|
||||
)
|
||||
re = RecordEnvelope(record=sentinel, metadata={})
|
||||
mce = builder.make_lineage_mce(
|
||||
[
|
||||
builder.make_dataset_urn("bigquery", "upstream1"),
|
||||
builder.make_dataset_urn("bigquery", "upstream2"),
|
||||
],
|
||||
builder.make_dataset_urn("bigquery", "downstream1"),
|
||||
)
|
||||
|
||||
re = RecordEnvelope(record=mce, metadata={})
|
||||
kafka_sink.write_record_async(re, callback)
|
||||
assert mock_producer_instance.poll.call_count == 1 # poll() called once
|
||||
|
||||
mock_producer_instance.poll.assert_called_once() # producer should call poll() first
|
||||
self.validate_kafka_callback(
|
||||
mock_k_callback, re, callback
|
||||
) # validate kafka callback was constructed appropriately
|
||||
|
||||
# validate that confluent_kafka.Producer.produce was called with the right arguments
|
||||
mock_producer_instance.produce.assert_called_once()
|
||||
args, kwargs = mock_producer_instance.produce.call_args
|
||||
assert kwargs["value"] == mce
|
||||
assert kwargs["key"] # produce call should include a Kafka key
|
||||
created_callback = kwargs["on_delivery"]
|
||||
assert created_callback == mock_k_callback_instance.kafka_callback
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user