diff --git a/ingestion/setup.py b/ingestion/setup.py index 44378e59d88..e1752f2f6fa 100644 --- a/ingestion/setup.py +++ b/ingestion/setup.py @@ -38,6 +38,7 @@ def get_long_description(): base_requirements = { "commonregex", + "idna<3,>=2.5", "click<7.2.0,>=7.1.1", "expandvars>=0.6.5" "dataclasses>=0.8" @@ -54,9 +55,9 @@ base_requirements = { "python-jose==3.3.0", "okta==1.7.0", "pandas~=1.3.1", - "sqlalchemy>=1.3.24" - "sql-metadata~=2.0.0" - "spacy==3.0.5" + "sqlalchemy>=1.3.24", + "sql-metadata~=2.0.0", + "spacy==3.0.5", "requests~=2.25.1", "en_core_web_sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0.tar.gz#egg=en_core_web" } @@ -89,7 +90,7 @@ plugins: Dict[str, Set[str]] = { build_options = {"includes": ["_cffi_backend"]} setup( name="openmetadata-ingestion", - version="0.2.1", + version="0.2.2", url="https://open-metadata.org/", author="OpenMetadata Committers", license="Apache License 2.0", diff --git a/ingestion/src/metadata/ingestion/api/registry.py b/ingestion/src/metadata/ingestion/api/registry.py deleted file mode 100644 index c4be23b92d9..00000000000 --- a/ingestion/src/metadata/ingestion/api/registry.py +++ /dev/null @@ -1,105 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import importlib -import inspect -from typing import Dict, Generic, Type, TypeVar, Union - -import pkg_resources -import typing_inspect - -from metadata.config.common import ConfigurationError - -T = TypeVar("T") - - -class Registry(Generic[T]): - def __init__(self): - self._mapping: Dict[str, Union[Type[T], Exception]] = {} - - def _get_registered_type(self) -> Type[T]: - cls = typing_inspect.get_generic_type(self) - tp = typing_inspect.get_args(cls)[0] - return tp - - def _check_cls(self, cls: Type[T]): - if inspect.isabstract(cls): - raise ValueError( - f"cannot register an abstract type in the registry; got {cls}" - ) - super_cls = self._get_registered_type() - if not issubclass(cls, super_cls): - raise ValueError(f"must be derived from {super_cls}; got {cls}") - - def _register(self, key: str, tp: Union[Type[T], Exception]) -> None: - if key in self._mapping: - raise KeyError(f"key already in use - {key}") - if key.find(".") >= 0: - raise KeyError(f"key cannot contain '.' - {key}") - self._mapping[key] = tp - - def register(self, key: str, cls: Type[T]) -> None: - self._check_cls(cls) - self._register(key, cls) - - def register_disabled(self, key: str, reason: Exception) -> None: - self._register(key, reason) - - def is_enabled(self, key: str) -> bool: - tp = self._mapping[key] - return not isinstance(tp, Exception) - - def load(self, entry_point_key: str) -> None: - for entry_point in pkg_resources.iter_entry_points(entry_point_key): - name = entry_point.name - - try: - plugin_class = entry_point.load() - except ImportError as e: - self.register_disabled(name, e) - continue - - self.register(name, plugin_class) - - @property - def mapping(self): - return self._mapping - - def get(self, key: str) -> Type[T]: - if key.find(".") >= 0: - # If the key contains a dot, we treat it as a import path and attempt - # to load it dynamically. - module_name, class_name = key.rsplit(".", 1) - MyClass = getattr(importlib.import_module(module_name), class_name) - self._check_cls(MyClass) - return MyClass - - if key not in self._mapping: - raise KeyError(f"Did not find a registered class for {key}") - tp = self._mapping[key] - if isinstance(tp, Exception): - raise ConfigurationError( - f'{key} is disabled; try running: pip install ".[{key}]"' - ) from tp - else: - # If it's not an exception, then it's a registered type. - return tp - - def __str__(self): - col_width = 15 - return "\n".join( - f"{key}{'' if self.is_enabled(key) else (' ' * (col_width - len(key))) + '(disabled)'}" - for key in sorted(self._mapping.keys()) - ) diff --git a/ingestion/src/metadata/ingestion/ometa/auth_provider_registry.py b/ingestion/src/metadata/ingestion/ometa/auth_provider_registry.py deleted file mode 100644 index 3ebbb8e8336..00000000000 --- a/ingestion/src/metadata/ingestion/ometa/auth_provider_registry.py +++ /dev/null @@ -1,22 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from metadata.ingestion.api.registry import Registry -from metadata.ingestion.ometa.auth_provider import AuthenticationProvider -auth_provider_registry = Registry[AuthenticationProvider]() -#auth_provider_registry.register("google", GoogleAuthenticationProvider.__class__) -#auth_provider_registry.register("no-auth", NoOpAuthenticationProvider.__class__) - -# This source is always enabled diff --git a/ingestion/src/metadata/ingestion/workflow/workflow.py b/ingestion/src/metadata/ingestion/workflow/workflow.py index fdb29d5dce3..d02e6c15e4b 100644 --- a/ingestion/src/metadata/ingestion/workflow/workflow.py +++ b/ingestion/src/metadata/ingestion/workflow/workflow.py @@ -13,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import importlib import logging import uuid -from typing import Optional +from typing import Optional, Type, TypeVar import click from pydantic import Field @@ -31,11 +32,12 @@ from metadata.ingestion.api.processor import Processor from metadata.ingestion.api.sink import Sink from metadata.ingestion.api.source import Source from metadata.ingestion.api.stage import Stage -from metadata.ingestion.api.registry import Registry from metadata.ingestion.api.source import Source logger = logging.getLogger(__name__) +T = TypeVar("T") + class WorkflowConfig(ConfigModel): run_id: str = Field(default_factory=lambda: str(uuid.uuid1())) @@ -59,8 +61,7 @@ class Workflow: self.config = config self.ctx = WorkflowContext(workflow_id=self.config.run_id) source_type = self.config.source.type - source_registry = Registry[Source]() - source_class = source_registry.get('metadata.ingestion.source.{}.{}Source'.format( + source_class = self.get('metadata.ingestion.source.{}.{}Source'.format( source_type.replace('-', '_'), ''.join([i.title() for i in source_type.replace('-', '_').split('_')]))) metadata_config = self.config.metadata_server.dict().get("config", {}) self.source: Source = source_class.create( @@ -72,8 +73,7 @@ class Workflow: if self.config.processor: processor_type = self.config.processor.type - processor_registry = Registry[Processor]() - processor_class = processor_registry.get('metadata.ingestion.processor.{}.{}Processor'.format( + processor_class = self.get('metadata.ingestion.processor.{}.{}Processor'.format( processor_type.replace('-', '_'), ''.join([i.title() for i in processor_type.replace('-', '_').split('_')]))) processor_config = self.config.processor.dict().get("config", {}) self.processor: Processor = processor_class.create(processor_config, metadata_config, self.ctx) @@ -81,8 +81,7 @@ class Workflow: if self.config.stage: stage_type = self.config.stage.type - stage_registry = Registry[Stage]() - stage_class = stage_registry.get('metadata.ingestion.stage.{}.{}Stage'.format( + stage_class = self.get('metadata.ingestion.stage.{}.{}Stage'.format( stage_type.replace('-', '_'), ''.join([i.title() for i in stage_type.replace('-', '_').split('_')]))) stage_config = self.config.stage.dict().get("config", {}) self.stage: Stage = stage_class.create(stage_config, metadata_config, self.ctx) @@ -90,8 +89,7 @@ class Workflow: if self.config.sink: sink_type = self.config.sink.type - sink_registry = Registry[Sink]() - sink_class = sink_registry.get('metadata.ingestion.sink.{}.{}Sink'.format( + sink_class = self.get('metadata.ingestion.sink.{}.{}Sink'.format( sink_type.replace('-', '_'), ''.join([i.title() for i in sink_type.replace('-', '_').split('_')]))) sink_config = self.config.sink.dict().get("config", {}) self.sink: Sink = sink_class.create(sink_config, metadata_config, self.ctx) @@ -99,13 +97,20 @@ class Workflow: if self.config.bulk_sink: bulk_sink_type = self.config.bulk_sink.type - bulk_sink_registry = Registry[BulkSink]() - bulk_sink_class = bulk_sink_registry.get('metadata.ingestion.bulksink.{}.{}BulkSink'.format( + bulk_sink_class = self.get('metadata.ingestion.bulksink.{}.{}BulkSink'.format( bulk_sink_type.replace('-', '_'), ''.join([i.title() for i in bulk_sink_type.replace('-', '_').split('_')]))) bulk_sink_config = self.config.bulk_sink.dict().get("config", {}) self.bulk_sink: BulkSink = bulk_sink_class.create(bulk_sink_config, metadata_config, self.ctx) logger.info(f"BulkSink type:{self.config.bulk_sink.type},{bulk_sink_class} configured") + def get(self, key: str) -> Type[T]: + if key.find(".") >= 0: + # If the key contains a dot, we treat it as a import path and attempt + # to load it dynamically. + module_name, class_name = key.rsplit(".", 1) + MyClass = getattr(importlib.import_module(module_name), class_name) + return MyClass + @classmethod def create(cls, config_dict: dict) -> "Pipeline": config = WorkflowConfig.parse_obj(config_dict)