From 7359d6210c58ded171b7cbd6d99f0a82c34b044f Mon Sep 17 00:00:00 2001 From: Mayur Singal <39544459+ulixius9@users.noreply.github.com> Date: Wed, 12 Jun 2024 11:40:30 +0530 Subject: [PATCH] MINOR: Fix Profiler for SSL Enabled Source (#16613) --- .../profiler/interface/profiler_interface.py | 4 ++-- .../src/metadata/profiler/source/metadata_ext.py | 4 ++-- ingestion/src/metadata/utils/ssl_manager.py | 16 ++++++++++++++++ ingestion/src/metadata/workflow/profiler.py | 5 +++-- .../profiler/pandas/test_datalake_metrics.py | 2 +- .../tests/unit/profiler/pandas/test_profiler.py | 2 +- .../profiler/pandas/test_profiler_interface.py | 2 +- .../tests/unit/profiler/pandas/test_sample.py | 4 ++-- 8 files changed, 28 insertions(+), 11 deletions(-) diff --git a/ingestion/src/metadata/profiler/interface/profiler_interface.py b/ingestion/src/metadata/profiler/interface/profiler_interface.py index 0d912887eee..26176480c92 100644 --- a/ingestion/src/metadata/profiler/interface/profiler_interface.py +++ b/ingestion/src/metadata/profiler/interface/profiler_interface.py @@ -51,7 +51,6 @@ from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline from metadata.generated.schema.tests.customMetric import CustomMetric from metadata.ingestion.api.status import Status from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.connections import get_connection from metadata.profiler.api.models import ( DatabaseAndSchemaConfig, ProfilerProcessorConfig, @@ -63,6 +62,7 @@ from metadata.profiler.metrics.registry import Metrics from metadata.profiler.processor.runner import QueryRunner from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT from metadata.utils.partition import get_partition_details +from metadata.utils.ssl_manager import get_ssl_connection class ProfilerProcessorStatus(Status): @@ -110,7 +110,7 @@ class ProfilerInterface(ABC): self.ometa_client = ometa_client self.source_config = source_config self.service_connection_config = service_connection_config - self.connection = get_connection(self.service_connection_config) + self.connection = get_ssl_connection(self.service_connection_config) self.status = ProfilerProcessorStatus() try: fqn = self.table_entity.fullyQualifiedName diff --git a/ingestion/src/metadata/profiler/source/metadata_ext.py b/ingestion/src/metadata/profiler/source/metadata_ext.py index 42e7d6ee9ac..71d858f5f91 100644 --- a/ingestion/src/metadata/profiler/source/metadata_ext.py +++ b/ingestion/src/metadata/profiler/source/metadata_ext.py @@ -41,7 +41,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( ) from metadata.ingestion.api.models import Either from metadata.ingestion.ometa.ometa_api import OpenMetadata -from metadata.ingestion.source.connections import get_connection from metadata.profiler.source.metadata import ( OpenMetadataSource, ProfilerSourceAndEntity, @@ -52,6 +51,7 @@ from metadata.utils.class_helper import get_service_type_from_source_type from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table from metadata.utils.importer import import_source_class from metadata.utils.logger import profiler_logger +from metadata.utils.ssl_manager import get_ssl_connection logger = profiler_logger() @@ -112,7 +112,7 @@ class OpenMetadataSourceExt(OpenMetadataSource): if database_name: logger.info(f"Ingesting from database: {database_name}") new_service_connection.database = database_name - self.engine = get_connection(new_service_connection) + self.engine = get_ssl_connection(new_service_connection) self.inspector = inspect(self.engine) self._connection = None # Lazy init as well diff --git a/ingestion/src/metadata/utils/ssl_manager.py b/ingestion/src/metadata/utils/ssl_manager.py index bc56618b27a..812d895cf4c 100644 --- a/ingestion/src/metadata/utils/ssl_manager.py +++ b/ingestion/src/metadata/utils/ssl_manager.py @@ -15,6 +15,7 @@ Module to manage SSL certificates """ import os import tempfile +import traceback from functools import singledispatch, singledispatchmethod from typing import Optional, Union, cast @@ -43,6 +44,10 @@ from metadata.generated.schema.entity.services.connections.messaging.kafkaConnec ) from metadata.generated.schema.security.ssl import verifySSLConfig from metadata.ingestion.connections.builders import init_empty_connection_arguments +from metadata.ingestion.source.connections import get_connection +from metadata.utils.logger import utils_logger + +logger = utils_logger() class SSLManager: @@ -173,3 +178,14 @@ def _(connection): ca=connection.sslConfig.root.caCertificate if connection.sslConfig else None ) return None + + +def get_ssl_connection(service_config): + try: + ssl_manager: SSLManager = check_ssl_and_init(service_config) + if ssl_manager: + service_config = ssl_manager.setup_ssl(service_config) + except Exception: + logger.debug("Failed to setup SSL for the connection") + logger.debug(traceback.format_exc()) + return get_connection(service_config) diff --git a/ingestion/src/metadata/workflow/profiler.py b/ingestion/src/metadata/workflow/profiler.py index 896464cf290..a80e80c9ddf 100644 --- a/ingestion/src/metadata/workflow/profiler.py +++ b/ingestion/src/metadata/workflow/profiler.py @@ -15,13 +15,14 @@ from metadata.generated.schema.metadataIngestion.workflow import ( OpenMetadataWorkflowConfig, ) from metadata.ingestion.api.steps import Processor, Sink -from metadata.ingestion.source.connections import get_connection, get_test_connection_fn +from metadata.ingestion.source.connections import get_test_connection_fn from metadata.pii.processor import PIIProcessor from metadata.profiler.processor.processor import ProfilerProcessor from metadata.profiler.source.metadata import OpenMetadataSource from metadata.profiler.source.metadata_ext import OpenMetadataSourceExt from metadata.utils.importer import import_sink_class from metadata.utils.logger import profiler_logger +from metadata.utils.ssl_manager import get_ssl_connection from metadata.workflow.ingestion import IngestionWorkflow logger = profiler_logger() @@ -62,7 +63,7 @@ class ProfilerWorkflow(IngestionWorkflow): def test_connection(self): service_config = self.config.source.serviceConnection.root.config - conn = get_connection(service_config) + conn = get_ssl_connection(service_config) test_connection_fn = get_test_connection_fn(service_config) test_connection_fn(self.metadata, conn, service_config) diff --git a/ingestion/tests/unit/profiler/pandas/test_datalake_metrics.py b/ingestion/tests/unit/profiler/pandas/test_datalake_metrics.py index 5600b86c172..f96926a4427 100644 --- a/ingestion/tests/unit/profiler/pandas/test_datalake_metrics.py +++ b/ingestion/tests/unit/profiler/pandas/test_datalake_metrics.py @@ -83,7 +83,7 @@ class DatalakeMetricsTest(TestCase): @classmethod @mock.patch( - "metadata.profiler.interface.profiler_interface.get_connection", + "metadata.profiler.interface.profiler_interface.get_ssl_connection", return_value=FakeConnection, ) @mock.patch( diff --git a/ingestion/tests/unit/profiler/pandas/test_profiler.py b/ingestion/tests/unit/profiler/pandas/test_profiler.py index 41c459ca601..1d30d2602b8 100644 --- a/ingestion/tests/unit/profiler/pandas/test_profiler.py +++ b/ingestion/tests/unit/profiler/pandas/test_profiler.py @@ -146,7 +146,7 @@ class ProfilerTest(TestCase): @classmethod @mock.patch( - "metadata.profiler.interface.profiler_interface.get_connection", + "metadata.profiler.interface.profiler_interface.get_ssl_connection", return_value=FakeConnection, ) @mock.patch( diff --git a/ingestion/tests/unit/profiler/pandas/test_profiler_interface.py b/ingestion/tests/unit/profiler/pandas/test_profiler_interface.py index e3ae5a9177d..a9d893cfed2 100644 --- a/ingestion/tests/unit/profiler/pandas/test_profiler_interface.py +++ b/ingestion/tests/unit/profiler/pandas/test_profiler_interface.py @@ -141,7 +141,7 @@ class PandasInterfaceTest(TestCase): @classmethod @mock.patch( - "metadata.profiler.interface.profiler_interface.get_connection", + "metadata.profiler.interface.profiler_interface.get_ssl_connection", return_value=FakeConnection, ) @mock.patch( diff --git a/ingestion/tests/unit/profiler/pandas/test_sample.py b/ingestion/tests/unit/profiler/pandas/test_sample.py index c54054fd76b..a4d9b9055d7 100644 --- a/ingestion/tests/unit/profiler/pandas/test_sample.py +++ b/ingestion/tests/unit/profiler/pandas/test_sample.py @@ -131,7 +131,7 @@ class DatalakeSampleTest(TestCase): @classmethod @mock.patch( - "metadata.profiler.interface.profiler_interface.get_connection", + "metadata.profiler.interface.profiler_interface.get_ssl_connection", return_value=FakeConnection, ) @mock.patch( @@ -169,7 +169,7 @@ class DatalakeSampleTest(TestCase): assert res < 5 @mock.patch( - "metadata.profiler.interface.profiler_interface.get_connection", + "metadata.profiler.interface.profiler_interface.get_ssl_connection", return_value=FakeConnection, ) @mock.patch(