diff --git a/ingestion/src/metadata/data_quality/source/test_suite.py b/ingestion/src/metadata/data_quality/source/test_suite.py index f04eed72e1f..08ebdec500d 100644 --- a/ingestion/src/metadata/data_quality/source/test_suite.py +++ b/ingestion/src/metadata/data_quality/source/test_suite.py @@ -22,6 +22,7 @@ from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.ingestionPipelines.status import ( StackTraceError, ) +from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.testSuitePipeline import ( TestSuitePipeline, ) @@ -36,6 +37,8 @@ from metadata.ingestion.api.step import Step from metadata.ingestion.api.steps import Source from metadata.ingestion.ometa.ometa_api import OpenMetadata from metadata.utils import fqn +from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX +from metadata.utils.importer import import_source_class from metadata.utils.logger import test_suite_logger logger = test_suite_logger() @@ -73,7 +76,7 @@ class TestSuiteSource(Source): table: Table = self.metadata.get_by_name( entity=Table, fqn=self.source_config.entityFullyQualifiedName.root, - fields=["tableProfilerConfig", "testSuite"], + fields=["tableProfilerConfig", "testSuite", "serviceType"], ) return table @@ -104,8 +107,16 @@ class TestSuiteSource(Source): def _iter(self) -> Iterable[Either[TableAndTests]]: table: Table = self._get_table_entity() - if table: + source_type = table.serviceType.value.lower() + if source_type.startswith(CUSTOM_CONNECTOR_PREFIX): + logger.warning( + "Data quality tests might not work as expected with custom sources" + ) + else: + import_source_class( + service_type=ServiceType.Database, source_type=source_type + ) yield from self._process_table_suite(table) else: diff --git a/ingestion/src/metadata/utils/constants.py b/ingestion/src/metadata/utils/constants.py index fcf212d04f8..cbb1b7b5f3e 100644 --- a/ingestion/src/metadata/utils/constants.py +++ b/ingestion/src/metadata/utils/constants.py @@ -108,3 +108,5 @@ ENTITY_REFERENCE_CLASS_MAP = { ENTITY_REFERENCE_TYPE_MAP = { value.__name__: key for key, value in ENTITY_REFERENCE_CLASS_MAP.items() } + +CUSTOM_CONNECTOR_PREFIX = "custom" diff --git a/ingestion/src/metadata/utils/importer.py b/ingestion/src/metadata/utils/importer.py index 23b99ffc800..12c03eb2ece 100644 --- a/ingestion/src/metadata/utils/importer.py +++ b/ingestion/src/metadata/utils/importer.py @@ -27,6 +27,8 @@ from metadata.generated.schema.entity.services.serviceType import ServiceType from metadata.generated.schema.metadataIngestion.workflow import Sink as WorkflowSink from metadata.ingestion.api.steps import BulkSink, Processor, Sink, Source, Stage from metadata.utils.class_helper import get_service_type_from_source_type +from metadata.utils.client_version import get_client_version +from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX from metadata.utils.logger import utils_logger from metadata.utils.singleton import Singleton @@ -43,6 +45,38 @@ class DynamicImportException(Exception): Raise it when having issues dynamically importing objects """ + def __init__(self, module: str, key: str = None, cause: Exception = None): + self.module = module + self.key = key + self.cause = cause + + def __str__(self): + import_path = self.module + if self.key: + import_path += f".{self.key}" + return f"Cannot import {import_path} due to {self.cause}" + + +class MissingPluginException(Exception): + """ + An excpetion that captures a missing openmetadata-ingestion plugin for a specific connector. + """ + + def __init__(self, plugin: str): + self.plugin = plugin + + def __str__(self): + try: + version = "==" + get_client_version() + except Exception: + logger.warning("unable to get client version") + logger.debug(traceback.format_exc()) + version = "" + return ( + f"You might be missing the plugin [{self.plugin}]. Try:\n" + f'pip install "openmetadata-ingestion[{self.plugin}]{version}"' + ) + def get_module_dir(type_: str) -> str: """ @@ -93,13 +127,13 @@ def import_from_module(key: str) -> Type[Any]: Dynamically import an object from a module path """ + module_name, obj_name = key.rsplit(MODULE_SEPARATOR, 1) try: - module_name, obj_name = key.rsplit(MODULE_SEPARATOR, 1) obj = getattr(importlib.import_module(module_name), obj_name) return obj except Exception as err: logger.debug(traceback.format_exc()) - raise DynamicImportException(f"Cannot load object from {key} due to {err}") + raise DynamicImportException(module=module_name, key=obj_name, cause=err) # module building strings read better with .format instead of f-strings @@ -200,7 +234,7 @@ def import_connection_fn(connection: BaseModel, function_name: str) -> Callable: # module building strings read better with .format instead of f-strings # pylint: disable=consider-using-f-string - if connection.type.value.lower().startswith("custom"): + if connection.type.value.lower().startswith(CUSTOM_CONNECTOR_PREFIX): python_class_parts = connection.sourcePythonClass.rsplit(".", 1) python_module_path = ".".join(python_class_parts[:-1]) @@ -261,9 +295,7 @@ class SideEffectsLoader(metaclass=Singleton): SideEffectsLoader.modules.add(module.__name__) except Exception as err: logger.debug(traceback.format_exc()) - raise DynamicImportException( - f"Cannot load object from {module} due to {err}" - ) + raise DynamicImportException(module=module, cause=err) else: logger.debug(f"Module {module} already imported") diff --git a/ingestion/src/metadata/utils/logger.py b/ingestion/src/metadata/utils/logger.py index aadc0fafb2f..c33eb3b96d1 100644 --- a/ingestion/src/metadata/utils/logger.py +++ b/ingestion/src/metadata/utils/logger.py @@ -173,10 +173,14 @@ def set_loggers_level(level: Union[int, str] = logging.INFO): def log_ansi_encoded_string( - color: Optional[ANSI] = None, bold: bool = False, message: str = "" + color: Optional[ANSI] = None, + bold: bool = False, + message: str = "", + level=logging.INFO, ): - utils_logger().info( - f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}" + utils_logger().log( + level=level, + msg=f"{ANSI.BOLD.value if bold else ''}{color.value if color else ''}{message}{ANSI.ENDC.value}", ) diff --git a/ingestion/src/metadata/workflow/ingestion.py b/ingestion/src/metadata/workflow/ingestion.py index 19b1a5f739e..cfa78c1259e 100644 --- a/ingestion/src/metadata/workflow/ingestion.py +++ b/ingestion/src/metadata/workflow/ingestion.py @@ -21,7 +21,7 @@ To be extended by any other workflow: """ import traceback from abc import ABC, abstractmethod -from typing import List, Tuple, cast +from typing import List, Tuple, Type, cast from metadata.config.common import WorkflowExecutionError from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( @@ -46,6 +46,13 @@ from metadata.utils.class_helper import ( get_service_class_from_service_type, get_service_type_from_source_type, ) +from metadata.utils.constants import CUSTOM_CONNECTOR_PREFIX +from metadata.utils.importer import ( + DynamicImportException, + MissingPluginException, + import_from_module, + import_source_class, +) from metadata.utils.logger import ingestion_logger from metadata.workflow.base import BaseWorkflow, InvalidWorkflowJSONException from metadata.workflow.workflow_status_mixin import SUCCESS_THRESHOLD_VALUE @@ -218,3 +225,22 @@ class IngestionWorkflow(BaseWorkflow, ABC): raise WorkflowExecutionError( f"Profiler is not supported for the service connection: {self.config.source.serviceConnection}" ) + + def import_source_class(self) -> Type[Source]: + source_type = self.config.source.type.lower() + try: + return ( + import_from_module( + self.config.source.serviceConnection.root.config.sourcePythonClass + ) + if source_type.startswith(CUSTOM_CONNECTOR_PREFIX) + else import_source_class( + service_type=self.service_type, source_type=source_type + ) + ) + except DynamicImportException as e: + if source_type.startswith(CUSTOM_CONNECTOR_PREFIX): + raise e + logger.debug(traceback.format_exc()) + logger.error(f"Failed to import source of type '{source_type}'") + raise MissingPluginException(source_type) diff --git a/ingestion/src/metadata/workflow/metadata.py b/ingestion/src/metadata/workflow/metadata.py index b440db98028..204299bb5db 100644 --- a/ingestion/src/metadata/workflow/metadata.py +++ b/ingestion/src/metadata/workflow/metadata.py @@ -14,11 +14,7 @@ Workflow definition for metadata related ingestions: metadata and lineage. from metadata.config.common import WorkflowExecutionError from metadata.ingestion.api.steps import Sink, Source -from metadata.utils.importer import ( - import_from_module, - import_sink_class, - import_source_class, -) +from metadata.utils.importer import import_sink_class from metadata.utils.logger import ingestion_logger from metadata.workflow.ingestion import IngestionWorkflow @@ -47,15 +43,7 @@ class MetadataWorkflow(IngestionWorkflow): "configuration here: https://docs.open-metadata.org/connectors" ) - source_class = ( - import_from_module( - self.config.source.serviceConnection.root.config.sourcePythonClass - ) - if source_type.startswith("custom") - else import_source_class( - service_type=self.service_type, source_type=source_type - ) - ) + source_class = self.import_source_class() pipeline_name = ( self.ingestion_pipeline.fullyQualifiedName.root diff --git a/ingestion/src/metadata/workflow/profiler.py b/ingestion/src/metadata/workflow/profiler.py index b0363fbc987..c6987fb0ef1 100644 --- a/ingestion/src/metadata/workflow/profiler.py +++ b/ingestion/src/metadata/workflow/profiler.py @@ -44,6 +44,7 @@ class ProfilerWorkflow(IngestionWorkflow): def _get_source_class(self): if self.config.source.serviceName: + self.import_source_class() return OpenMetadataSource logger.info( "Database Service name not provided, we will scan all the tables " diff --git a/ingestion/src/metadata/workflow/usage.py b/ingestion/src/metadata/workflow/usage.py index aeab26a13ec..e1d9368f033 100644 --- a/ingestion/src/metadata/workflow/usage.py +++ b/ingestion/src/metadata/workflow/usage.py @@ -16,9 +16,7 @@ from metadata.config.common import WorkflowExecutionError from metadata.ingestion.api.steps import BulkSink, Processor, Source, Stage from metadata.utils.importer import ( import_bulk_sink_type, - import_from_module, import_processor_class, - import_source_class, import_stage_class, ) from metadata.utils.logger import ingestion_logger @@ -51,16 +49,7 @@ class UsageWorkflow(IngestionWorkflow): "configuration here: https://docs.open-metadata.org/connectors" ) - source_class = ( - import_from_module( - self.config.source.serviceConnection.root.config.sourcePythonClass - ) - if source_type.startswith("custom") - else import_source_class( - service_type=self.service_type, source_type=source_type - ) - ) - + source_class = self.import_source_class() source: Source = source_class.create( self.config.source.model_dump(), self.metadata ) diff --git a/ingestion/src/metadata/workflow/workflow_init_error_handler.py b/ingestion/src/metadata/workflow/workflow_init_error_handler.py index ff1cffa26e0..0e7c614762f 100644 --- a/ingestion/src/metadata/workflow/workflow_init_error_handler.py +++ b/ingestion/src/metadata/workflow/workflow_init_error_handler.py @@ -12,6 +12,7 @@ """ Module handles the init error messages from different workflows """ +import logging import traceback from pathlib import Path from typing import Any, Dict, Optional, Type, Union @@ -25,7 +26,7 @@ from metadata.ingestion.api.parser import ( ParsingConfigurationError, ) from metadata.utils.constants import UTF_8 -from metadata.utils.logger import ANSI, log_ansi_encoded_string +from metadata.utils.logger import ANSI, log_ansi_encoded_string, utils_logger EXAMPLES_WORKFLOW_PATH: Path = Path(__file__).parent / "../examples" / "workflows" @@ -74,10 +75,10 @@ class WorkflowInitErrorHandler: source_type_name, pipeline_type ) else: + utils_logger().debug(traceback.format_exc()) WorkflowInitErrorHandler._print_error_msg( f"\nError initializing {pipeline_type.name}: {exc}" ) - WorkflowInitErrorHandler._print_error_msg(traceback.format_exc()) WorkflowInitErrorHandler._print_more_info(pipeline_type) @@ -151,4 +152,15 @@ class WorkflowInitErrorHandler: """ Print message with error style """ - log_ansi_encoded_string(color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}") + log_ansi_encoded_string( + color=ANSI.BRIGHT_RED, bold=False, message=f"{msg}", level=logging.ERROR + ) + + @staticmethod + def _print_debug_msg(msg: str) -> None: + """ + Print message with error style + """ + log_ansi_encoded_string( + color=ANSI.YELLOW, bold=False, message=f"{msg}", level=logging.DEBUG + ) diff --git a/ingestion/tests/unit/data_quality/source/test_test_suite.py b/ingestion/tests/unit/data_quality/source/test_test_suite.py index d909026a4f3..822f125a846 100644 --- a/ingestion/tests/unit/data_quality/source/test_test_suite.py +++ b/ingestion/tests/unit/data_quality/source/test_test_suite.py @@ -5,6 +5,9 @@ import pytest from metadata.data_quality.source.test_suite import TestSuiteSource from metadata.generated.schema.entity.data.table import Table +from metadata.generated.schema.entity.services.databaseService import ( + DatabaseServiceType, +) from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) @@ -76,6 +79,7 @@ def test_source_config(parameters, expected, monkeypatch): name="test_table", columns=[], testSuite=MOCK_ENTITY_REFERENCE, + serviceType=DatabaseServiceType.Postgres, ) mock_metadata.list_all_entities.return_value = [ TestCase(