Harshal Sheth c7892ada4c Codegen avro + datahub kafka sink (#3)
* Add codegen

* New architecture + setup file -> console pipeline

* Cleanup source loader

* Basic Kafka metadata source

* Kafka source and extractor

* Add kwargs construct interface

* Fix kafka source unit test

* start working on pipeline test

* kafka datahub sink

* Make myself a profile

* Ingest to datahub from kafka

* Update codegen

* Add restli transport

* Fix bug in restli conversion
2021-02-15 18:29:27 -08:00

26 lines
878 B
Python

import unittest
from unittest.mock import patch, MagicMock
from gometa.ingestion.run.pipeline import Pipeline
class PipelineTest(unittest.TestCase):
@patch("gometa.ingestion.extractor.kafka.KafkaMetadataExtractor")
@patch("gometa.ingestion.source.kafka.KafkaSource.get_workunits")
@patch("gometa.ingestion.sink.console.ConsoleSink.close")
def test_configure(self, mock_sink, mock_source, mock_extractor):
pipeline = Pipeline(
{
"source": {
"type": "kafka",
"extractor": "gometa.ingestion.extractor.kafka.KafkaMetadataExtractor",
"kafka": {"bootstrap": "localhost:9092"},
},
"sink": {"type": "console"},
}
)
pipeline.run()
mock_source.assert_called_once()
mock_sink.assert_called_once()