feat(ingest): dynamically register plugins (#2316)

Co-authored-by: Joe Mirizio <mirizioj@email.chop.edu>
This commit is contained in:
Joe Mirizio 2021-03-31 23:59:45 -04:00 committed by GitHub
parent 7085051a73
commit f3304bec7c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 109 deletions

View File

@ -152,6 +152,27 @@ setuptools.setup(
},
entry_points={
"console_scripts": ["datahub = datahub.entrypoints:datahub"],
"datahub.ingestion.source.plugins": [
"file = datahub.ingestion.source.mce_file:MetadataFileSource",
"athena = datahub.ingestion.source.athena:AthenaSource",
"bigquery = datahub.ingestion.source.bigquery:BigQuerySource",
"dbt = datahub.ingestion.source.dbt:DBTSource",
"druid = datahub.ingestion.source.druid:DruidSource",
"hive = datahub.ingestion.source.hive:HiveSource",
"kafka = datahub.ingestion.source.kafka:KafkaSource",
"ldap = datahub.ingestion.source.ldap:LDAPSource",
"mongodb = datahub.ingestion.source.mongodb:MongoDBSource",
"mssql = datahub.ingestion.source.mssql:SQLServerSource",
"mysql = datahub.ingestion.source.mysql:MySQLSource",
"postgres = datahub.ingestion.source.postgres:PostgresSource",
"snowflake = datahub.ingestion.source.snowflake:SnowflakeSource",
],
"datahub.ingestion.sink.plugins": [
"file = datahub.ingestion.sink.file:FileSink",
"console = datahub.ingestion.sink.console:ConsoleSink",
"datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink",
"datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink",
],
},
install_requires=list(base_requirements | framework_common),
extras_require={

View File

@ -2,6 +2,8 @@ import importlib
import inspect
from typing import Dict, Generic, Type, TypeVar, Union
import pkg_resources
from datahub.configuration.common import ConfigurationError
T = TypeVar("T")
@ -30,6 +32,19 @@ class Registry(Generic[T]):
tp = self._mapping[key]
return not isinstance(tp, Exception)
def load(self, entry_point_key: str) -> None:
    """Register every plugin advertised under an entry-point group.

    Iterates ``pkg_resources.iter_entry_points(entry_point_key)`` and loads
    each entry point in turn. When loading raises ``ImportError``, the entry
    point's name and that error are handed to ``register_disabled`` and the
    loop moves on to the next entry point. Otherwise the loaded object is
    handed to ``register`` under the entry point's name.

    Args:
        entry_point_key: Name of the entry-point group to scan.
    """
    for ep in pkg_resources.iter_entry_points(entry_point_key):
        try:
            loaded = ep.load()
        except ImportError as import_error:
            # One plugin failing to import must not stop the remaining
            # entry points from being processed.
            self.register_disabled(ep.name, import_error)
        else:
            # Kept outside the ``try`` so that errors raised by ``register``
            # itself are never mistaken for a failed plugin import.
            self.register(ep.name, loaded)
@property
def mapping(self):
    """Expose the registry's underlying ``_mapping`` object.

    The very same object is returned (no copy is made), so changes made
    through the returned value are changes to the registry itself.
    """
    return self._mapping

View File

@ -1,25 +1,9 @@
from datahub.ingestion.api.registry import Registry
from datahub.ingestion.api.sink import Sink
from .console import ConsoleSink
from .file import FileSink
sink_registry = Registry[Sink]()
sink_registry.load("datahub.ingestion.sink.plugins")
# These sinks are always enabled.
sink_registry.register("console", ConsoleSink)
sink_registry.register("file", FileSink)
try:
from .datahub_kafka import DatahubKafkaSink
sink_registry.register("datahub-kafka", DatahubKafkaSink)
except ImportError as e:
sink_registry.register_disabled("datahub-kafka", e)
try:
from .datahub_rest import DatahubRestSink
sink_registry.register("datahub-rest", DatahubRestSink)
except ImportError as e:
sink_registry.register_disabled("datahub-rest", e)
# These sinks are always enabled.
# The check is written out explicitly instead of using a bare ``assert``:
# ``assert`` statements are removed when Python runs with ``-O``, which would
# silently skip this sanity check. ``AssertionError`` is still the exception
# raised, so the failure type is the same as before.
for _required_sink in ("console", "file"):
    if not sink_registry.get(_required_sink):
        raise AssertionError(
            f"Required sink plugin {_required_sink!r} is not registered; "
            "check that the package's entry points are installed."
        )

View File

@ -1,94 +1,8 @@
from datahub.ingestion.api.registry import Registry
from datahub.ingestion.api.source import Source
from .mce_file import MetadataFileSource
source_registry = Registry[Source]()
source_registry.load("datahub.ingestion.source.plugins")
# This source is always enabled.
source_registry.register("file", MetadataFileSource)
try:
from .athena import AthenaSource
source_registry.register("athena", AthenaSource)
except ImportError as e:
source_registry.register_disabled("athena", e)
try:
from .bigquery import BigQuerySource
source_registry.register("bigquery", BigQuerySource)
except ImportError as e:
source_registry.register_disabled("bigquery", e)
try:
from .hive import HiveSource
source_registry.register("hive", HiveSource)
except ImportError as e:
source_registry.register_disabled("hive", e)
try:
from .mssql import SQLServerSource
source_registry.register("mssql", SQLServerSource)
except ImportError as e:
source_registry.register_disabled("mssql", e)
try:
from .mysql import MySQLSource
source_registry.register("mysql", MySQLSource)
except ImportError as e:
source_registry.register_disabled("mysql", e)
try:
from .postgres import PostgresSource
source_registry.register("postgres", PostgresSource)
except ImportError as e:
source_registry.register_disabled("postgres", e)
try:
from .snowflake import SnowflakeSource
source_registry.register("snowflake", SnowflakeSource)
except ImportError as e:
source_registry.register_disabled("snowflake", e)
try:
from .druid import DruidSource
source_registry.register("druid", DruidSource)
except ImportError as e:
source_registry.register_disabled("druid", e)
try:
from .kafka import KafkaSource
source_registry.register("kafka", KafkaSource)
except ImportError as e:
source_registry.register_disabled("kafka", e)
try:
from .dbt import DBTSource
source_registry.register("dbt", DBTSource)
except ImportError as e:
source_registry.register_disabled("dbt", e)
try:
from .ldap import LDAPSource
source_registry.register("ldap", LDAPSource)
except ImportError as e:
source_registry.register_disabled("ldap", e)
try:
from .mongodb import MongoDBSource
source_registry.register("mongodb", MongoDBSource)
except ImportError as e:
source_registry.register_disabled("mongodb", e)
# This source is always enabled.
# The check is written out explicitly instead of using a bare ``assert``:
# ``assert`` statements are removed when Python runs with ``-O``, which would
# silently skip this sanity check. ``AssertionError`` is still the exception
# raised, so the failure type is the same as before.
for _required_source in ("file",):
    if not source_registry.get(_required_source):
        raise AssertionError(
            f"Required source plugin {_required_source!r} is not registered; "
            "check that the package's entry points are installed."
        )