mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-04 07:34:44 +00:00
22 lines
767 B
Python
22 lines
767 B
Python
from typing import List
|
|
|
|
from datahub.configuration.common import ConfigModel
|
|
from datahub.ingestion.api.common import RecordEnvelope
|
|
from datahub.ingestion.api.sink import Sink, SinkReport, WriteCallback
|
|
|
|
|
|
class RecordingSinkReport(SinkReport):
|
|
received_records: List[RecordEnvelope] = []
|
|
|
|
def report_record_written(self, record_envelope: RecordEnvelope) -> None:
|
|
super().report_record_written(record_envelope)
|
|
self.received_records.append(record_envelope)
|
|
|
|
|
|
class RecordingSink(Sink[ConfigModel, RecordingSinkReport]):
|
|
def write_record_async(
|
|
self, record_envelope: RecordEnvelope, callback: WriteCallback
|
|
) -> None:
|
|
self.report.report_record_written(record_envelope)
|
|
callback.on_success(record_envelope, {})
|