diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 7fb341c551..2461313d4f 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -152,6 +152,27 @@ setuptools.setup( }, entry_points={ "console_scripts": ["datahub = datahub.entrypoints:datahub"], + "datahub.ingestion.source.plugins": [ + "file = datahub.ingestion.source.mce_file:MetadataFileSource", + "athena = datahub.ingestion.source.athena:AthenaSource", + "bigquery = datahub.ingestion.source.bigquery:BigQuerySource", + "dbt = datahub.ingestion.source.dbt:DBTSource", + "druid = datahub.ingestion.source.druid:DruidSource", + "hive = datahub.ingestion.source.hive:HiveSource", + "kafka = datahub.ingestion.source.kafka:KafkaSource", + "ldap = datahub.ingestion.source.ldap:LDAPSource", + "mongodb = datahub.ingestion.source.mongodb:MongoDBSource", + "mssql = datahub.ingestion.source.mssql:SQLServerSource", + "mysql = datahub.ingestion.source.mysql:MySQLSource", + "postgres = datahub.ingestion.source.postgres:PostgresSource", + "snowflake = datahub.ingestion.source.snowflake:SnowflakeSource", + ], + "datahub.ingestion.sink.plugins": [ + "file = datahub.ingestion.sink.file:FileSink", + "console = datahub.ingestion.sink.console:ConsoleSink", + "datahub-kafka = datahub.ingestion.sink.datahub_kafka:DatahubKafkaSink", + "datahub-rest = datahub.ingestion.sink.datahub_rest:DatahubRestSink", + ], }, install_requires=list(base_requirements | framework_common), extras_require={ diff --git a/metadata-ingestion/src/datahub/ingestion/api/registry.py b/metadata-ingestion/src/datahub/ingestion/api/registry.py index d35492fb38..3a76cd2dfc 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/registry.py +++ b/metadata-ingestion/src/datahub/ingestion/api/registry.py @@ -2,6 +2,8 @@ import importlib import inspect from typing import Dict, Generic, Type, TypeVar, Union +import pkg_resources + from datahub.configuration.common import ConfigurationError T = TypeVar("T") @@ -30,6 +32,19 @@ class Registry(Generic[T]): tp = self._mapping[key] return not isinstance(tp, Exception) + def load(self, entry_point_key: str) -> None: + for entry_point in pkg_resources.iter_entry_points(entry_point_key): + name = entry_point.name + plugin_class = None + + try: + plugin_class = entry_point.load() + except ImportError as e: + self.register_disabled(name, e) + continue + + self.register(name, plugin_class) + @property def mapping(self): return self._mapping diff --git a/metadata-ingestion/src/datahub/ingestion/sink/sink_registry.py b/metadata-ingestion/src/datahub/ingestion/sink/sink_registry.py index e3e554c43c..b4662fabc5 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/sink_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/sink_registry.py @@ -1,25 +1,9 @@ from datahub.ingestion.api.registry import Registry from datahub.ingestion.api.sink import Sink -from .console import ConsoleSink -from .file import FileSink - sink_registry = Registry[Sink]() +sink_registry.load("datahub.ingestion.sink.plugins") -# These sinks are always enabled. -sink_registry.register("console", ConsoleSink) -sink_registry.register("file", FileSink) - -try: - from .datahub_kafka import DatahubKafkaSink - - sink_registry.register("datahub-kafka", DatahubKafkaSink) -except ImportError as e: - sink_registry.register_disabled("datahub-kafka", e) - -try: - from .datahub_rest import DatahubRestSink - - sink_registry.register("datahub-rest", DatahubRestSink) -except ImportError as e: - sink_registry.register_disabled("datahub-rest", e) +# These sinks are always enabled +assert sink_registry.get("console") +assert sink_registry.get("file") diff --git a/metadata-ingestion/src/datahub/ingestion/source/source_registry.py b/metadata-ingestion/src/datahub/ingestion/source/source_registry.py index d3919904c0..3ad5c2a3ec 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/source_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/source/source_registry.py @@ -1,94 +1,8 @@ from datahub.ingestion.api.registry import Registry from datahub.ingestion.api.source import Source -from .mce_file import MetadataFileSource - source_registry = Registry[Source]() +source_registry.load("datahub.ingestion.source.plugins") -# This source is always enabled. -source_registry.register("file", MetadataFileSource) - -try: - from .athena import AthenaSource - - source_registry.register("athena", AthenaSource) -except ImportError as e: - source_registry.register_disabled("athena", e) - -try: - from .bigquery import BigQuerySource - - source_registry.register("bigquery", BigQuerySource) -except ImportError as e: - source_registry.register_disabled("bigquery", e) - -try: - from .hive import HiveSource - - source_registry.register("hive", HiveSource) -except ImportError as e: - source_registry.register_disabled("hive", e) - -try: - from .mssql import SQLServerSource - - source_registry.register("mssql", SQLServerSource) -except ImportError as e: - source_registry.register_disabled("mssql", e) - -try: - from .mysql import MySQLSource - - source_registry.register("mysql", MySQLSource) -except ImportError as e: - source_registry.register_disabled("mysql", e) - -try: - from .postgres import PostgresSource - - source_registry.register("postgres", PostgresSource) -except ImportError as e: - source_registry.register_disabled("postgres", e) - -try: - from .snowflake import SnowflakeSource - - source_registry.register("snowflake", SnowflakeSource) -except ImportError as e: - source_registry.register_disabled("snowflake", e) - -try: - from .druid import DruidSource - - source_registry.register("druid", DruidSource) -except ImportError as e: - source_registry.register_disabled("druid", e) - -try: - from .kafka import KafkaSource - - source_registry.register("kafka", KafkaSource) -except ImportError as e: - source_registry.register_disabled("kafka", e) - -try: - from .dbt import DBTSource - - source_registry.register("dbt", DBTSource) -except ImportError as e: - source_registry.register_disabled("dbt", e) - -try: - from .ldap import LDAPSource - - source_registry.register("ldap", LDAPSource) -except ImportError as e: - source_registry.register_disabled("ldap", e) - - -try: - from .mongodb import MongoDBSource - - source_registry.register("mongodb", MongoDBSource) -except ImportError as e: - source_registry.register_disabled("mongodb", e) +# This source is always enabled +assert source_registry.get("file")