Use type + config everywhere

This commit is contained in:
Harshal Sheth 2021-02-12 12:05:41 -08:00 committed by Shirshanka Das
parent 3678368ed3
commit f807bb44b8
21 changed files with 43 additions and 50 deletions

View File

@ -1,10 +1,9 @@
---
source:
type: "file"
file:
config:
filename: "./examples/mce_files/bootstrap_mce.json"
sink:
type: "datahub-rest"
datahub-rest:
config:
server: 'http://localhost:8080'

View File

@ -1,7 +1,7 @@
---
source:
type: "file"
file:
config:
filename: "./examples/mce_files/single_mce.json"
sink:

View File

@ -1,10 +1,10 @@
---
source:
type: "file"
file:
config:
filename: "./examples/mce_files/single_mce.json"
sink:
type: "datahub-kafka"
datahub-kafka:
config:
connection.bootstrap: "localhost:9092"

View File

@ -1,10 +1,10 @@
---
source:
type: "file"
file:
config:
filename: "./examples/mce_files/single_mce.json"
sink:
type: "datahub-rest"
datahub-rest:
config:
server: 'http://localhost:8080'

View File

@ -1,10 +1,10 @@
---
source:
type: "file"
file:
config:
filename: "./examples/mce_files/bootstrap_mce.json"
sink:
type: "file"
file:
config:
filename: './output/bootstrap_mce.json'

View File

@ -1,6 +1,6 @@
[source]
type="kafka"
[source.kafka.connection]
[source.config.connection]
bootstrap="localhost:9092"
[sink]

View File

@ -1,7 +1,7 @@
---
source:
type: "kafka"
kafka:
config:
connection.bootstrap: "broker:9092"
sink:

View File

@ -1,15 +1,15 @@
---
source:
type: "kafka"
kafka:
config:
connection.bootstrap: "localhost:9092"
topic_patterns:
allow:
- ".*"
deny:
- "^_.+" # deny all tables that start with an underscore
topic_patterns:
allow:
- ".*"
deny:
- "^_.+" # deny all tables that start with an underscore
sink:
type: "datahub-kafka"
datahub-kafka:
config:
connection.bootstrap: "localhost:9092"

View File

@ -1,10 +1,10 @@
---
source:
type: "kafka"
kafka:
config:
connection.bootstrap: "broker:9092"
sink:
type: "datahub-rest"
datahub-rest:
config:
server: 'http://localhost:8080'

View File

@ -1,7 +1,7 @@
---
source:
type: mssql
mssql:
config:
username: sa
password: test!Password
database: DemoData

View File

@ -1,12 +1,12 @@
---
source:
type: mssql
mssql:
config:
username: sa
password: test!Password
database: DemoData
sink:
type: "datahub-rest"
datahub-rest:
config:
server: 'http://localhost:8080'

View File

@ -1,11 +1,11 @@
---
source:
type: "mysql"
mysql:
config:
username: datahub
password: datahub
sink:
type: "datahub-rest"
datahub-rest:
config:
server: 'http://localhost:8080'

View File

@ -1,6 +1,6 @@
source:
type: "kafka"
kafka:
config:
connection.bootstrap: "broker:9092"
producer_config:
security.protocol: ssl

View File

@ -1,18 +1,21 @@
import re
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import IO, List
from typing import IO, Any, List, Optional
from pydantic import BaseModel, ValidationError
class ConfigModel(BaseModel):
class Config:
extra = "allow"
# This class is here for future compatibility reasons.
pass
class DynamicTypedConfig(ConfigModel):
type: str
# This config type is declared Optional[Any] here. The eventual parser for the
# specified type is responsible for further validation.
config: Optional[Any]
class MetaError(Exception):

View File

@ -2,7 +2,7 @@ from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar
T = TypeVar('T')
T = TypeVar("T")
@dataclass
@ -27,4 +27,5 @@ class WorkUnit(_WorkUnitId, metaclass=ABCMeta):
@dataclass
class PipelineContext:
# TODO: autogenerate run_ids if not specified.
run_id: str

View File

@ -57,7 +57,7 @@ class Pipeline:
f"Did not find a registered source class for {source_type}"
) from e
self.source: Source = source_class.create(
self.config.source.dict().get(source_type, {}), self.ctx
self.config.source.dict().get("config", {}), self.ctx
)
logger.debug(f"Source type:{source_type},{source_class} configured")
@ -68,7 +68,7 @@ class Pipeline:
raise ValueError(
f"Did not find a registered sink class for {sink_type}"
) from e
sink_config = self.config.sink.dict().get(sink_type, {})
sink_config = self.config.sink.dict().get("config", {})
self.sink: Sink = sink_class.create(sink_config, self.ctx)
logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured")

View File

@ -8,13 +8,3 @@ pytest-cov>=2.8.1
pytest-docker
sqlalchemy-stubs
deepdiff
# importlib-metadata==1.6.0
# packaging==20.3
# pathspec==0.8.0
# pluggy==0.13.1
# regex==2020.5.7
# six==1.14.0
# typed-ast==1.4.1
# wcwidth==0.1.9
# zipp==3.1.0

View File

@ -2,7 +2,7 @@ run_id: mysql-test
source:
type: mysql
mysql:
config:
username: root
password: example
database: metagalaxy
@ -10,5 +10,5 @@ source:
sink:
type: file
file:
config:
filename: './mysql_mces.json'

View File

@ -2,7 +2,7 @@ run_id: mssql-test
source:
type: mssql
mssql:
config:
username: sa
password: test!Password
database: DemoData
@ -10,5 +10,5 @@ source:
sink:
type: file
file:
config:
filename: './mssql_mces.json'

View File

@ -14,8 +14,8 @@ def test_serde_large(pytestconfig, tmp_path):
pipeline = Pipeline.create(
{
'source': {'type': 'file', 'file': {'filename': str(golden_file)}},
'sink': {'type': 'file', 'file': {'filename': str(output_file)}},
"source": {"type": "file", "config": {"filename": str(golden_file)}},
"sink": {"type": "file", "config": {"filename": str(output_file)}},
}
)
pipeline.run()

View File

@ -10,7 +10,7 @@ class PipelineTest(unittest.TestCase):
def test_configure(self, mock_sink, mock_source):
pipeline = Pipeline.create(
{
"source": {"type": "kafka", "kafka": {"bootstrap": "localhost:9092"}},
"source": {"type": "kafka", "config": {"bootstrap": "localhost:9092"}},
"sink": {"type": "console"},
}
)