[GEN-1406]: import plugin and clear plugin message for failed source import (#17788)

* support side effects on source classes by always importing source class

* streamlined error message

* fixed service type extraction for test suite pipeline

* - replaced "custom" with constant
- added quotes for the plugin exception for copy/paste ergonomics
This commit is contained in:
Imri Paran 2024-09-12 16:13:03 +02:00 committed by GitHub
parent 5d972789ea
commit 59854de58d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 110 additions and 41 deletions

View File

@ -22,6 +22,7 @@ from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
StackTraceError,
)
from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
TestSuitePipeline,
)
@ -36,6 +37,8 @@ from metadata.ingestion.api.step import Step
from metadata.ingestion.api.steps import Source
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils import fqn
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.importer import import_source_class
from metadata.utils.logger import test_suite_logger
logger = test_suite_logger()
@ -73,7 +76,7 @@ class TestSuiteSource(Source):
table: Table = self.metadata.get_by_name(
entity=Table,
fqn=self.source_config.entityFullyQualifiedName.root,
fields=["tableProfilerConfig", "testSuite"],
fields=["tableProfilerConfig", "testSuite", "serviceType"],
)
return table
@ -104,8 +107,16 @@ class TestSuiteSource(Source):
def _iter(self) -> Iterable[Either[TableAndTests]]:
table: Table = self._get_table_entity()
if table:
source_type = table.serviceType.value.lower()
if source_type.startswith(CUSTOM_CONNECTOR_PREFIX):
logger.warning(
"Data quality tests might not work as expected with custom sources"
)
else:
import_source_class(
service_type=ServiceType.Database, source_type=source_type
)
yield from self._process_table_suite(table)
else:

View File

@ -108,3 +108,5 @@ ENTITY_REFERENCE_CLASS_MAP = {
ENTITY_REFERENCE_TYPE_MAP = {
value.__name__: key for key, value in ENTITY_REFERENCE_CLASS_MAP.items()
}
CUSTOM_CONNECTOR_PREFIX = "custom"

View File

@ -27,6 +27,8 @@ from metadata.generated.schema.entity.services.serviceType import ServiceType
from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink
from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage
from metadata.utils.class_helper import get_service_type_from_source_type
from metadata.utils.client_version import get_client_version
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.logger import utils_logger
from metadata.utils.singleton import Singleton
@ -43,6 +45,38 @@ class DynamicImportException(Exception):
Raise it when having issues dynamically importing objects
"""
def __init__(self, module: str, key: str = None, cause: Exception = None):
    # Record the module path that failed to import, the optional attribute
    # ("key") being loaded from it, and the underlying exception — all three
    # are only used by __str__ to build a readable error message.
    self.module = module
    self.key = key
    self.cause = cause
def __str__(self):
    # Build the dotted path that failed to import: "module" or "module.key".
    import_path = self.module
    if self.key:
        import_path += f".{self.key}"
    return f"Cannot import {import_path} due to {self.cause}"
class MissingPluginException(Exception):
    """
    An exception that captures a missing openmetadata-ingestion plugin for a specific connector.
    """
    def __init__(self, plugin: str):
        # Name of the pip "extra" (plugin) the user likely needs to install.
        self.plugin = plugin
    def __str__(self):
        # Pin the suggested install command to the running client version when
        # it can be resolved, so the plugin matches the installed package;
        # otherwise fall back to an unpinned install hint.
        try:
            version = "==" + get_client_version()
        except Exception:
            logger.warning("unable to get client version")
            logger.debug(traceback.format_exc())
            version = ""
        # The quoted pip target makes the message copy/paste-safe in shells
        # where square brackets would otherwise be glob-expanded.
        return (
            f"You might be missing the plugin [{self.plugin}]. Try:\n"
            f'pip install "openmetadata-ingestion[{self.plugin}]{version}"'
        )
def get_module_dir(type_: str) -> str:
"""
@ -93,13 +127,13 @@ def import_from_module(key: str) -> Type[Any]:
Dynamically import an object from a module path
"""
module_name, obj_name = key.rsplit(MODULE_SEPARATOR, 1)
try:
module_name, obj_name = key.rsplit(MODULE_SEPARATOR, 1)
obj = getattr(importlib.import_module(module_name), obj_name)
return obj
except Exception as err:
logger.debug(traceback.format_exc())
raise DynamicImportException(f"Cannot load object from {key} due to {err}")
raise DynamicImportException(module=module_name, key=obj_name, cause=err)
# module building strings read better with .format instead of f-strings
@ -200,7 +234,7 @@ def import_connection_fn(connection: BaseModel, function_name: str) -> Callable:
# module building strings read better with .format instead of f-strings
# pylint: disable=consider-using-f-string
if connection.type.value.lower().startswith("custom"):
if connection.type.value.lower().startswith(CUSTOM_CONNECTOR_PREFIX):
python_class_parts = connection.sourcePythonClass.rsplit(".", 1)
python_module_path = ".".join(python_class_parts[:-1])
@ -261,9 +295,7 @@ class SideEffectsLoader(metaclass=Singleton):
SideEffectsLoader.modules.add(module.__name__)
except Exception as err:
logger.debug(traceback.format_exc())
raise DynamicImportException(
f"Cannot load object from {module} due to {err}"
)
raise DynamicImportException(module=module, cause=err)
else:
logger.debug(f"Module {module} already imported")

View File

@ -173,10 +173,14 @@ def set_loggers_level(level: Union[int, str] = logging.INFO):
def log_ansi_encoded_string(
color: Optional[ANSI] = None, bold: bool = False, message: str = ""
color: Optional[ANSI] = None,
bold: bool = False,
message: str = "",
level=logging.INFO,
):
utils_logger().info(
f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}"
utils_logger().log(
level=level,
msg=f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}",
)

View File

@ -21,7 +21,7 @@ To be extended by any other workflow:
"""
import traceback
from abc import ABC, abstractmethod
from typing import List, Tuple, cast
from typing import List, Tuple, Type, cast
from metadata.config.common import WorkflowExecutionError
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
@ -46,6 +46,13 @@ from metadata.utils.class_helper import (
get_service_class_from_service_type,
get_service_type_from_source_type,
)
from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX
from metadata.utils.importer import (
DynamicImportException,
MissingPluginException,
import_from_module,
import_source_class,
)
from metadata.utils.logger import ingestion_logger
from metadata.workflow.base import BaseWorkflow, InvalidWorkflowJSONException
from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE
@ -218,3 +225,22 @@ class IngestionWorkflow(BaseWorkflow, ABC):
raise WorkflowExecutionError(
f"Profiler is not supported for the service connection: {self.config.source.serviceConnection}"
)
def import_source_class(self) -> Type[Source]:
    """
    Dynamically import the Source class for this workflow's configured source.

    Custom connectors are loaded from the user-supplied ``sourcePythonClass``
    module path; built-in connectors are resolved from the (service type,
    source type) pair via the module-level ``import_source_class`` helper.

    Raises:
        DynamicImportException: when a custom connector fails to import —
            re-raised as-is, since that is a user-code problem.
        MissingPluginException: when a built-in connector fails to import,
            hinting that the matching openmetadata-ingestion plugin (extra)
            is not installed.
    """
    source_type = self.config.source.type.lower()
    try:
        return (
            import_from_module(
                self.config.source.serviceConnection.root.config.sourcePythonClass
            )
            if source_type.startswith(CUSTOM_CONNECTOR_PREFIX)
            else import_source_class(
                service_type=self.service_type, source_type=source_type
            )
        )
    except DynamicImportException as e:
        # Custom-source failures are surfaced unchanged: suggesting a pip
        # plugin install would be misleading for user-provided classes.
        if source_type.startswith(CUSTOM_CONNECTOR_PREFIX):
            raise e
        logger.debug(traceback.format_exc())
        logger.error(f"Failed to import source of type '{source_type}'")
        raise MissingPluginException(source_type)

View File

@ -14,11 +14,7 @@ Workflow definition for metadata related ingestions: metadata and lineage.
from metadata.config.common import WorkflowExecutionError
from metadata.ingestion.api.steps import Sink, Source
from metadata.utils.importer import (
import_from_module,
import_sink_class,
import_source_class,
)
from metadata.utils.importer import import_sink_class
from metadata.utils.logger import ingestion_logger
from metadata.workflow.ingestion import IngestionWorkflow
@ -47,15 +43,7 @@ class MetadataWorkflow(IngestionWorkflow):
"configuration here: https://docs.open-metadata.org/connectors"
)
source_class = (
import_from_module(
self.config.source.serviceConnection.root.config.sourcePythonClass
)
if source_type.startswith("custom")
else import_source_class(
service_type=self.service_type, source_type=source_type
)
)
source_class = self.import_source_class()
pipeline_name = (
self.ingestion_pipeline.fullyQualifiedName.root

View File

@ -44,6 +44,7 @@ class ProfilerWorkflow(IngestionWorkflow):
def _get_source_class(self):
if self.config.source.serviceName:
self.import_source_class()
return OpenMetadataSource
logger.info(
"Database Service name not provided, we will scan all the tables "

View File

@ -16,9 +16,7 @@ from metadata.config.common import WorkflowExecutionError
from metadata.ingestion.api.steps import BulkSink, Processor, Source, Stage
from metadata.utils.importer import (
import_bulk_sink_type,
import_from_module,
import_processor_class,
import_source_class,
import_stage_class,
)
from metadata.utils.logger import ingestion_logger
@ -51,16 +49,7 @@ class UsageWorkflow(IngestionWorkflow):
"configuration here: https://docs.open-metadata.org/connectors"
)
source_class = (
import_from_module(
self.config.source.serviceConnection.root.config.sourcePythonClass
)
if source_type.startswith("custom")
else import_source_class(
service_type=self.service_type, source_type=source_type
)
)
source_class = self.import_source_class()
source: Source = source_class.create(
self.config.source.model_dump(), self.metadata
)

View File

@ -12,6 +12,7 @@
"""
Module handles the init error messages from different workflows
"""
import logging
import traceback
from pathlib import Path
from typing import Any, Dict, Optional, Type, Union
@ -25,7 +26,7 @@ from metadata.ingestion.api.parser import (
ParsingConfigurationError,
)
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ANSI, log_ansi_encoded_string
from metadata.utils.logger import ANSI, log_ansi_encoded_string, utils_logger
EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows"
@ -74,10 +75,10 @@ class WorkflowInitErrorHandler:
source_type_name, pipeline_type
)
else:
utils_logger().debug(traceback.format_exc())
WorkflowInitErrorHandler._print_error_msg(
f"\nError initializing {pipeline_type.name}: {exc}"
)
WorkflowInitErrorHandler._print_error_msg(traceback.format_exc())
WorkflowInitErrorHandler._print_more_info(pipeline_type)
@ -151,4 +152,15 @@ class WorkflowInitErrorHandler:
"""
Print message with error style
"""
log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}")
log_ansi_encoded_string(
color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}", level=logging.ERROR
)
@staticmethod
def _print_debug_msg(msg: str) -> None:
    """
    Print message with debug style (yellow, emitted at DEBUG level)
    """
    log_ansi_encoded_string(
        color=ANSI.YELLOW, bold=False, message=f"{msg}", level=logging.DEBUG
    )

View File

@ -5,6 +5,9 @@ import pytest
from metadata.data_quality.source.test_suite import TestSuiteSource
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import (
DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
)
@ -76,6 +79,7 @@ def test_source_config(parameters, expected, monkeypatch):
name="test_table",
columns=[],
testSuite=MOCK_ENTITY_REFERENCE,
serviceType=DatabaseServiceType.Postgres,
)
mock_metadata.list_all_entities.return_value = [
TestCase(