diff --git a/metadata-ingestion/examples/recipes/example_to_datahub_rest.yml b/metadata-ingestion/examples/recipes/example_to_datahub_rest.yml index 70e15a6bec..43baeedd99 100644 --- a/metadata-ingestion/examples/recipes/example_to_datahub_rest.yml +++ b/metadata-ingestion/examples/recipes/example_to_datahub_rest.yml @@ -1,10 +1,9 @@ ---- source: type: "file" - file: + config: filename: "./examples/mce_files/bootstrap_mce.json" sink: type: "datahub-rest" - datahub-rest: + config: server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/recipes/file_to_console.yml b/metadata-ingestion/examples/recipes/file_to_console.yml index 8061e99d52..e9ee832d8b 100644 --- a/metadata-ingestion/examples/recipes/file_to_console.yml +++ b/metadata-ingestion/examples/recipes/file_to_console.yml @@ -1,7 +1,7 @@ --- source: type: "file" - file: + config: filename: "./examples/mce_files/single_mce.json" sink: diff --git a/metadata-ingestion/examples/recipes/file_to_datahub.yml b/metadata-ingestion/examples/recipes/file_to_datahub.yml index 87a8f8d3f5..3c89d5c3a0 100644 --- a/metadata-ingestion/examples/recipes/file_to_datahub.yml +++ b/metadata-ingestion/examples/recipes/file_to_datahub.yml @@ -1,10 +1,10 @@ --- source: type: "file" - file: + config: filename: "./examples/mce_files/single_mce.json" sink: type: "datahub-kafka" - datahub-kafka: + config: connection.bootstrap: "localhost:9092" diff --git a/metadata-ingestion/examples/recipes/file_to_datahub_rest.yml b/metadata-ingestion/examples/recipes/file_to_datahub_rest.yml index c2f1b3a15a..75c14800ce 100644 --- a/metadata-ingestion/examples/recipes/file_to_datahub_rest.yml +++ b/metadata-ingestion/examples/recipes/file_to_datahub_rest.yml @@ -1,10 +1,10 @@ --- source: type: "file" - file: + config: filename: "./examples/mce_files/single_mce.json" sink: type: "datahub-rest" - datahub-rest: + config: server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/recipes/file_to_file.yml b/metadata-ingestion/examples/recipes/file_to_file.yml index 8d5bd4b4fa..82fdc0f8c9 100644 --- a/metadata-ingestion/examples/recipes/file_to_file.yml +++ b/metadata-ingestion/examples/recipes/file_to_file.yml @@ -1,10 +1,10 @@ --- source: type: "file" - file: + config: filename: "./examples/mce_files/bootstrap_mce.json" sink: type: "file" - file: + config: filename: './output/bootstrap_mce.json' diff --git a/metadata-ingestion/examples/recipes/kafka_to_console.toml b/metadata-ingestion/examples/recipes/kafka_to_console.toml index 05499c9ce9..7535032cf9 100644 --- a/metadata-ingestion/examples/recipes/kafka_to_console.toml +++ b/metadata-ingestion/examples/recipes/kafka_to_console.toml @@ -1,6 +1,6 @@ [source] type="kafka" -[source.kafka.connection] +[source.config.connection] bootstrap="localhost:9092" [sink] diff --git a/metadata-ingestion/examples/recipes/kafka_to_console.yml b/metadata-ingestion/examples/recipes/kafka_to_console.yml index 4ce4fec0c0..cbcec5a773 100644 --- a/metadata-ingestion/examples/recipes/kafka_to_console.yml +++ b/metadata-ingestion/examples/recipes/kafka_to_console.yml @@ -1,7 +1,7 @@ --- source: type: "kafka" - kafka: + config: connection.bootstrap: "broker:9092" sink: diff --git a/metadata-ingestion/examples/recipes/kafka_to_datahub.yml b/metadata-ingestion/examples/recipes/kafka_to_datahub.yml index 7ed60b4291..188624fe60 100644 --- a/metadata-ingestion/examples/recipes/kafka_to_datahub.yml +++ b/metadata-ingestion/examples/recipes/kafka_to_datahub.yml @@ -1,15 +1,15 @@ --- source: type: "kafka" - kafka: + config: connection.bootstrap: "localhost:9092" - topic_patterns: - allow: - - ".*" - deny: - - "^_.+" # deny all tables that start with an underscore + topic_patterns: + allow: + - ".*" + deny: + - "^_.+" # deny all tables that start with an underscore sink: type: "datahub-kafka" - datahub-kafka: + config: connection.bootstrap: "localhost:9092" diff --git a/metadata-ingestion/examples/recipes/kafka_to_datahub_rest.yml b/metadata-ingestion/examples/recipes/kafka_to_datahub_rest.yml index bf9a4dc528..1dcf57b528 100644 --- a/metadata-ingestion/examples/recipes/kafka_to_datahub_rest.yml +++ b/metadata-ingestion/examples/recipes/kafka_to_datahub_rest.yml @@ -1,10 +1,10 @@ --- source: type: "kafka" - kafka: + config: connection.bootstrap: "broker:9092" sink: type: "datahub-rest" - datahub-rest: + config: server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/recipes/mssql_to_console.yml b/metadata-ingestion/examples/recipes/mssql_to_console.yml index fa2320b541..cb42bc6605 100644 --- a/metadata-ingestion/examples/recipes/mssql_to_console.yml +++ b/metadata-ingestion/examples/recipes/mssql_to_console.yml @@ -1,7 +1,7 @@ --- source: type: mssql - mssql: + config: username: sa password: test!Password database: DemoData diff --git a/metadata-ingestion/examples/recipes/mssql_to_datahub.yml b/metadata-ingestion/examples/recipes/mssql_to_datahub.yml index 7c6e1883fc..7d1b2a13c9 100644 --- a/metadata-ingestion/examples/recipes/mssql_to_datahub.yml +++ b/metadata-ingestion/examples/recipes/mssql_to_datahub.yml @@ -1,12 +1,12 @@ --- source: type: mssql - mssql: + config: username: sa password: test!Password database: DemoData sink: type: "datahub-rest" - datahub-rest: + config: server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/recipes/mysql_to_datahub.yml b/metadata-ingestion/examples/recipes/mysql_to_datahub.yml index 09141a0053..75ae0086ae 100644 --- a/metadata-ingestion/examples/recipes/mysql_to_datahub.yml +++ b/metadata-ingestion/examples/recipes/mysql_to_datahub.yml @@ -1,11 +1,11 @@ --- source: type: "mysql" - mysql: + config: username: datahub password: datahub sink: type: "datahub-rest" - datahub-rest: + config: server: 'http://localhost:8080' diff --git a/metadata-ingestion/examples/recipes/secured_kafka_to_console.yml b/metadata-ingestion/examples/recipes/secured_kafka_to_console.yml index 724c0eec3a..f41f033837 100644 --- a/metadata-ingestion/examples/recipes/secured_kafka_to_console.yml +++ b/metadata-ingestion/examples/recipes/secured_kafka_to_console.yml @@ -1,6 +1,6 @@ source: type: "kafka" - kafka: + config: connection.bootstrap: "broker:9092" producer_config: security.protocol: ssl diff --git a/metadata-ingestion/src/gometa/configuration/common.py b/metadata-ingestion/src/gometa/configuration/common.py index 67c2b27b7f..4d655aae4e 100644 --- a/metadata-ingestion/src/gometa/configuration/common.py +++ b/metadata-ingestion/src/gometa/configuration/common.py @@ -1,18 +1,21 @@ import re from abc import ABC, abstractmethod from contextlib import contextmanager -from typing import IO, List +from typing import IO, Any, List, Optional from pydantic import BaseModel, ValidationError class ConfigModel(BaseModel): - class Config: - extra = "allow" + # This class is here for future compatibility reasons. + pass class DynamicTypedConfig(ConfigModel): type: str + # This config type is declared Optional[Any] here. The eventual parser for the + # specified type is responsible for further validation. + config: Optional[Any] class MetaError(Exception): diff --git a/metadata-ingestion/src/gometa/ingestion/api/common.py b/metadata-ingestion/src/gometa/ingestion/api/common.py index 3b583b2248..a4c4b6121b 100644 --- a/metadata-ingestion/src/gometa/ingestion/api/common.py +++ b/metadata-ingestion/src/gometa/ingestion/api/common.py @@ -2,7 +2,7 @@ from abc import ABCMeta, abstractmethod from dataclasses import dataclass from typing import Generic, TypeVar -T = TypeVar('T') +T = TypeVar("T") @dataclass @@ -27,4 +27,5 @@ class WorkUnit(_WorkUnitId, metaclass=ABCMeta): @dataclass class PipelineContext: + # TODO: autogenerate run_ids if not specified. run_id: str diff --git a/metadata-ingestion/src/gometa/ingestion/run/pipeline.py b/metadata-ingestion/src/gometa/ingestion/run/pipeline.py index 92cbbfc592..3870a1b649 100644 --- a/metadata-ingestion/src/gometa/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/gometa/ingestion/run/pipeline.py @@ -57,7 +57,7 @@ class Pipeline: f"Did not find a registered source class for {source_type}" ) from e self.source: Source = source_class.create( - self.config.source.dict().get(source_type, {}), self.ctx + self.config.source.dict().get("config", {}), self.ctx ) logger.debug(f"Source type:{source_type},{source_class} configured") @@ -68,7 +68,7 @@ class Pipeline: raise ValueError( f"Did not find a registered sink class for {sink_type}" ) from e - sink_config = self.config.sink.dict().get(sink_type, {}) + sink_config = self.config.sink.dict().get("config", {}) self.sink: Sink = sink_class.create(sink_config, self.ctx) logger.debug(f"Sink type:{self.config.sink.type},{sink_class} configured") diff --git a/metadata-ingestion/test_requirements.txt b/metadata-ingestion/test_requirements.txt index 3b07bbe9ec..219713ca9a 100644 --- a/metadata-ingestion/test_requirements.txt +++ b/metadata-ingestion/test_requirements.txt @@ -8,13 +8,3 @@ pytest-cov>=2.8.1 pytest-docker sqlalchemy-stubs deepdiff - -# importlib-metadata==1.6.0 -# packaging==20.3 -# pathspec==0.8.0 -# pluggy==0.13.1 -# regex==2020.5.7 -# six==1.14.0 -# typed-ast==1.4.1 -# wcwidth==0.1.9 -# zipp==3.1.0 diff --git a/metadata-ingestion/tests/integration/mysql/mysql_to_file.yml b/metadata-ingestion/tests/integration/mysql/mysql_to_file.yml index 810f94d389..867385093b 100644 --- a/metadata-ingestion/tests/integration/mysql/mysql_to_file.yml +++ b/metadata-ingestion/tests/integration/mysql/mysql_to_file.yml @@ -2,7 +2,7 @@ run_id: mysql-test source: type: mysql - mysql: + config: username: root password: example database: metagalaxy @@ -10,5 +10,5 @@ source: sink: type: file - file: + config: filename: './mysql_mces.json' diff --git a/metadata-ingestion/tests/integration/sql_server/mssql_to_file.yml b/metadata-ingestion/tests/integration/sql_server/mssql_to_file.yml index 93b56ee1ba..78a3f8f782 100644 --- a/metadata-ingestion/tests/integration/sql_server/mssql_to_file.yml +++ b/metadata-ingestion/tests/integration/sql_server/mssql_to_file.yml @@ -2,7 +2,7 @@ run_id: mssql-test source: type: mssql - mssql: + config: username: sa password: test!Password database: DemoData @@ -10,5 +10,5 @@ source: sink: type: file - file: + config: filename: './mssql_mces.json' diff --git a/metadata-ingestion/tests/unit/serde/test_serde.py b/metadata-ingestion/tests/unit/serde/test_serde.py index 2991421479..6d0b7fd232 100644 --- a/metadata-ingestion/tests/unit/serde/test_serde.py +++ b/metadata-ingestion/tests/unit/serde/test_serde.py @@ -14,8 +14,8 @@ def test_serde_large(pytestconfig, tmp_path): pipeline = Pipeline.create( { - 'source': {'type': 'file', 'file': {'filename': str(golden_file)}}, - 'sink': {'type': 'file', 'file': {'filename': str(output_file)}}, + "source": {"type": "file", "config": {"filename": str(golden_file)}}, + "sink": {"type": "file", "config": {"filename": str(output_file)}}, } ) pipeline.run() diff --git a/metadata-ingestion/tests/unit/test_pipeline.py b/metadata-ingestion/tests/unit/test_pipeline.py index 389f963581..07f91dec00 100644 --- a/metadata-ingestion/tests/unit/test_pipeline.py +++ b/metadata-ingestion/tests/unit/test_pipeline.py @@ -10,7 +10,7 @@ class PipelineTest(unittest.TestCase): def test_configure(self, mock_sink, mock_source): pipeline = Pipeline.create( { - "source": {"type": "kafka", "kafka": {"bootstrap": "localhost:9092"}}, + "source": {"type": "kafka", "config": {"bootstrap": "localhost:9092"}}, "sink": {"type": "console"}, } )