2024-01-31 14:42:40 +05:30
|
|
|
import time
from typing import Optional

from datahub.ingestion.api.common import PipelineContext, RecordEnvelope
from datahub.ingestion.api.sink import NoopWriteCallback
from datahub.ingestion.sink.file import FileSink, FileSinkConfig
|
2024-01-22 11:46:04 -06:00
|
|
|
|
|
|
|
|
|
|
|
class FileEmitter:
    """Small convenience wrapper that pushes events into a ``FileSink``.

    The emitter owns a single ``FileSink`` (stored on ``self.sink``) that is
    configured with the target ``filename``. Call :meth:`emit` for each event
    and :meth:`close` once finished so the sink is closed.
    """

    def __init__(self, filename: str, run_id: Optional[str] = None) -> None:
        """Create the underlying ``FileSink``.

        Args:
            filename: Passed to ``FileSinkConfig(filename=...)``.
            run_id: Run identifier handed to ``PipelineContext``. When omitted
                (``None``), an id of the form ``test_<epoch-milliseconds>`` is
                generated at construction time.
        """
        if run_id is None:
            # Build the id per call. A default expression written in the
            # signature is evaluated only once, when the ``def`` statement
            # runs, which would give every emitter in the process the same
            # import-time run id.
            run_id = f"test_{int(time.time() * 1000.0)}"

        self.sink: FileSink = FileSink(
            ctx=PipelineContext(run_id=run_id),
            config=FileSinkConfig(filename=filename),
        )

    def emit(self, event) -> None:
        """Hand ``event`` to the sink.

        The event is wrapped in a ``RecordEnvelope`` with empty metadata and
        submitted through ``write_record_async`` with a ``NoopWriteCallback``,
        so no success/failure result is reported back to the caller here.
        """
        self.sink.write_record_async(
            record_envelope=RecordEnvelope(record=event, metadata={}),
            write_callback=NoopWriteCallback(),
        )

    def close(self) -> None:
        """Close the underlying sink."""
        self.sink.close()
|