Removed registry files and modified workflow and setup.py

This commit is contained in:
Ayush Shah 2021-08-14 00:54:16 +05:30
parent 82bec0e50f
commit 2b26274804
4 changed files with 21 additions and 143 deletions

View File

@ -54,9 +54,9 @@ base_requirements = {
"python-jose==3.3.0",
"okta==1.7.0",
"pandas~=1.3.1",
"sqlalchemy>=1.3.24"
"sql-metadata~=2.0.0"
"spacy==3.0.5"
"sqlalchemy>=1.3.24",
"sql-metadata~=2.0.0",
"spacy==3.0.5",
"requests~=2.25.1",
"en_core_web_sm@https://github.com/explosion/spacy-models/releases/download/en_core_web_sm-3.0.0/en_core_web_sm-3.0.0.tar.gz#egg=en_core_web"
}
@ -89,7 +89,7 @@ plugins: Dict[str, Set[str]] = {
build_options = {"includes": ["_cffi_backend"]}
setup(
name="openmetadata-ingestion",
version="0.2.1",
version="0.2.2",
url="https://open-metadata.org/",
author="OpenMetadata Committers",
license="Apache License 2.0",

View File

@ -1,105 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import inspect
from typing import Dict, Generic, Type, TypeVar, Union
import pkg_resources
import typing_inspect
from metadata.config.common import ConfigurationError
T = TypeVar("T")


class Registry(Generic[T]):
    """Map short string keys to plugin classes derived from ``T``.

    A key is bound either to a concrete class (an enabled plugin) or to the
    exception that prevented the plugin from being imported (a disabled
    plugin). Keys may not contain ``"."``: in ``get`` a dotted key is treated
    as an import path and resolved dynamically instead of being looked up.
    """

    def __init__(self):
        # key -> registered class, or the exception explaining why the
        # plugin behind that key is disabled.
        self._mapping: Dict[str, Union[Type[T], Exception]] = {}

    def _get_registered_type(self) -> Type[T]:
        """Return the first type argument typing_inspect reports for this instance."""
        generic_type = typing_inspect.get_generic_type(self)
        return typing_inspect.get_args(generic_type)[0]

    def _check_cls(self, cls: Type[T]):
        """Validate that ``cls`` may be stored in this registry.

        Raises:
            ValueError: if ``cls`` is abstract, or is not a subclass of the
                type returned by ``_get_registered_type``.
        """
        if inspect.isabstract(cls):
            raise ValueError(
                f"cannot register an abstract type in the registry; got {cls}"
            )
        super_cls = self._get_registered_type()
        if not issubclass(cls, super_cls):
            raise ValueError(f"must be derived from {super_cls}; got {cls}")

    def _register(self, key: str, tp: Union[Type[T], Exception]) -> None:
        """Bind ``key`` to ``tp`` without validating ``tp``.

        Raises:
            KeyError: if ``key`` is already bound or contains a ``"."``.
        """
        if key in self._mapping:
            raise KeyError(f"key already in use - {key}")
        # Dotted keys are reserved for dynamic import paths (see ``get``).
        if "." in key:
            raise KeyError(f"key cannot contain '.' - {key}")
        self._mapping[key] = tp

    def register(self, key: str, cls: Type[T]) -> None:
        """Validate ``cls`` and bind it to ``key``."""
        self._check_cls(cls)
        self._register(key, cls)

    def register_disabled(self, key: str, reason: Exception) -> None:
        """Bind ``key`` to ``reason``, marking the plugin as disabled."""
        self._register(key, reason)

    def is_enabled(self, key: str) -> bool:
        """Return True when ``key`` is bound to a class rather than an exception.

        Raises:
            KeyError: if ``key`` is not bound at all.
        """
        return not isinstance(self._mapping[key], Exception)

    def load(self, entry_point_key: str) -> None:
        """Register every entry point found in the ``entry_point_key`` group.

        An entry point whose ``load()`` raises ImportError is registered as
        disabled under its name; every other one is validated and registered.
        """
        for entry_point in pkg_resources.iter_entry_points(entry_point_key):
            name = entry_point.name
            try:
                plugin_class = entry_point.load()
            except ImportError as e:
                self.register_disabled(name, e)
                continue
            self.register(name, plugin_class)

    @property
    def mapping(self):
        """The underlying key -> class-or-exception dictionary (not a copy)."""
        return self._mapping

    def get(self, key: str) -> Type[T]:
        """Return the class for ``key``.

        A key containing ``"."`` is split at its last dot into a module path
        and an attribute name, imported, validated with ``_check_cls`` and
        returned; it is not stored in the registry.

        Raises:
            KeyError: if an undotted ``key`` is not bound.
            ConfigurationError: if ``key`` is bound to an exception; that
                exception is chained as the cause.
        """
        if "." in key:
            # If the key contains a dot, we treat it as a import path and
            # attempt to load it dynamically.
            module_name, class_name = key.rsplit(".", 1)
            loaded_cls = getattr(importlib.import_module(module_name), class_name)
            self._check_cls(loaded_cls)
            return loaded_cls

        if key not in self._mapping:
            raise KeyError(f"Did not find a registered class for {key}")

        tp = self._mapping[key]
        if isinstance(tp, Exception):
            raise ConfigurationError(
                f'{key} is disabled; try running: pip install ".[{key}]"'
            ) from tp
        # If it's not an exception, then it's a registered type.
        return tp

    def __str__(self):
        """List the keys in sorted order, one per line, tagging disabled ones."""
        col_width = 15
        lines = []
        for key in sorted(self._mapping):
            if self.is_enabled(key):
                lines.append(key)
                continue
            # Keep at least one space so that keys of col_width characters or
            # more do not run straight into the "(disabled)" marker.
            padding = " " * max(1, col_width - len(key))
            lines.append(f"{key}{padding}(disabled)")
        return "\n".join(lines)

View File

@ -1,22 +0,0 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from metadata.ingestion.api.registry import Registry
from metadata.ingestion.ometa.auth_provider import AuthenticationProvider
# Module-level registry intended to hold AuthenticationProvider implementations.
auth_provider_registry = Registry[AuthenticationProvider]()
# NOTE(review): the registrations below are commented out, so nothing is
# registered here. If they are ever restored, `SomeClass.__class__` evaluates
# to the class's metaclass rather than the provider class itself -- presumably
# the provider class was meant to be passed directly; confirm before enabling.
#auth_provider_registry.register("google", GoogleAuthenticationProvider.__class__)
#auth_provider_registry.register("no-auth", NoOpAuthenticationProvider.__class__)
# This source is always enabled

View File

@ -13,9 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import logging
import uuid
from typing import Optional
from typing import Optional, Type, TypeVar
import click
from pydantic import Field
@ -31,11 +32,12 @@ from metadata.ingestion.api.processor import Processor
from metadata.ingestion.api.sink import Sink
from metadata.ingestion.api.source import Source
from metadata.ingestion.api.stage import Stage
from metadata.ingestion.api.registry import Registry
from metadata.ingestion.api.source import Source
logger = logging.getLogger(__name__)
T = TypeVar("T")
class WorkflowConfig(ConfigModel):
run_id: str = Field(default_factory=lambda: str(uuid.uuid1()))
@ -59,8 +61,7 @@ class Workflow:
self.config = config
self.ctx = WorkflowContext(workflow_id=self.config.run_id)
source_type = self.config.source.type
source_registry = Registry[Source]()
source_class = source_registry.get('metadata.ingestion.source.{}.{}Source'.format(
source_class = self.get('metadata.ingestion.source.{}.{}Source'.format(
source_type.replace('-', '_'), ''.join([i.title() for i in source_type.replace('-', '_').split('_')])))
metadata_config = self.config.metadata_server.dict().get("config", {})
self.source: Source = source_class.create(
@ -72,8 +73,7 @@ class Workflow:
if self.config.processor:
processor_type = self.config.processor.type
processor_registry = Registry[Processor]()
processor_class = processor_registry.get('metadata.ingestion.processor.{}.{}Processor'.format(
processor_class = self.get('metadata.ingestion.processor.{}.{}Processor'.format(
processor_type.replace('-', '_'), ''.join([i.title() for i in processor_type.replace('-', '_').split('_')])))
processor_config = self.config.processor.dict().get("config", {})
self.processor: Processor = processor_class.create(processor_config, metadata_config, self.ctx)
@ -81,8 +81,7 @@ class Workflow:
if self.config.stage:
stage_type = self.config.stage.type
stage_registry = Registry[Stage]()
stage_class = stage_registry.get('metadata.ingestion.stage.{}.{}Stage'.format(
stage_class = self.get('metadata.ingestion.stage.{}.{}Stage'.format(
stage_type.replace('-', '_'), ''.join([i.title() for i in stage_type.replace('-', '_').split('_')])))
stage_config = self.config.stage.dict().get("config", {})
self.stage: Stage = stage_class.create(stage_config, metadata_config, self.ctx)
@ -90,8 +89,7 @@ class Workflow:
if self.config.sink:
sink_type = self.config.sink.type
sink_registry = Registry[Sink]()
sink_class = sink_registry.get('metadata.ingestion.sink.{}.{}Sink'.format(
sink_class = self.get('metadata.ingestion.sink.{}.{}Sink'.format(
sink_type.replace('-', '_'), ''.join([i.title() for i in sink_type.replace('-', '_').split('_')])))
sink_config = self.config.sink.dict().get("config", {})
self.sink: Sink = sink_class.create(sink_config, metadata_config, self.ctx)
@ -99,13 +97,20 @@ class Workflow:
if self.config.bulk_sink:
bulk_sink_type = self.config.bulk_sink.type
bulk_sink_registry = Registry[BulkSink]()
bulk_sink_class = bulk_sink_registry.get('metadata.ingestion.bulksink.{}.{}BulkSink'.format(
bulk_sink_class = self.get('metadata.ingestion.bulksink.{}.{}BulkSink'.format(
bulk_sink_type.replace('-', '_'), ''.join([i.title() for i in bulk_sink_type.replace('-', '_').split('_')])))
bulk_sink_config = self.config.bulk_sink.dict().get("config", {})
self.bulk_sink: BulkSink = bulk_sink_class.create(bulk_sink_config, metadata_config, self.ctx)
logger.info(f"BulkSink type:{self.config.bulk_sink.type},{bulk_sink_class} configured")
def get(self, key: str) -> Type[T]:
    """Resolve a dotted import path to the object it names.

    ``key`` is split at its last ``"."`` into a module path and an attribute
    name; the module is imported and the attribute is returned.

    Args:
        key: import path of the form ``package.module.ClassName``.

    Returns:
        The attribute named by the last component of ``key``.

    Raises:
        KeyError: if ``key`` contains no ``"."`` and so cannot be an import
            path. Previously this case silently returned ``None``.
        ImportError: if the module part cannot be imported.
        AttributeError: if the module has no attribute with that name.
    """
    # If the key contains a dot, we treat it as a import path and attempt
    # to load it dynamically.
    module_name, dot, class_name = key.rpartition(".")
    if not dot:
        raise KeyError(
            f"expected a dotted import path such as "
            f"'package.module.ClassName'; got {key!r}"
        )
    return getattr(importlib.import_module(module_name), class_name)
@classmethod
def create(cls, config_dict: dict) -> "Pipeline":
config = WorkflowConfig.parse_obj(config_dict)