mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-10-31 02:29:03 +00:00 
			
		
		
		
	MINOR: Fix Profiler for SSL Enabled Source (#16613)
This commit is contained in:
		
							parent
							
								
									4f50e0b6a0
								
							
						
					
					
						commit
						7359d6210c
					
				| @ -51,7 +51,6 @@ from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline | |||||||
| from metadata.generated.schema.tests.customMetric import CustomMetric | from metadata.generated.schema.tests.customMetric import CustomMetric | ||||||
| from metadata.ingestion.api.status import Status | from metadata.ingestion.api.status import Status | ||||||
| from metadata.ingestion.ometa.ometa_api import OpenMetadata | from metadata.ingestion.ometa.ometa_api import OpenMetadata | ||||||
| from metadata.ingestion.source.connections import get_connection |  | ||||||
| from metadata.profiler.api.models import ( | from metadata.profiler.api.models import ( | ||||||
|     DatabaseAndSchemaConfig, |     DatabaseAndSchemaConfig, | ||||||
|     ProfilerProcessorConfig, |     ProfilerProcessorConfig, | ||||||
| @ -63,6 +62,7 @@ from metadata.profiler.metrics.registry import Metrics | |||||||
| from metadata.profiler.processor.runner import QueryRunner | from metadata.profiler.processor.runner import QueryRunner | ||||||
| from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT | from metadata.utils.constants import SAMPLE_DATA_DEFAULT_COUNT | ||||||
| from metadata.utils.partition import get_partition_details | from metadata.utils.partition import get_partition_details | ||||||
|  | from metadata.utils.ssl_manager import get_ssl_connection | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class ProfilerProcessorStatus(Status): | class ProfilerProcessorStatus(Status): | ||||||
| @ -110,7 +110,7 @@ class ProfilerInterface(ABC): | |||||||
|         self.ometa_client = ometa_client |         self.ometa_client = ometa_client | ||||||
|         self.source_config = source_config |         self.source_config = source_config | ||||||
|         self.service_connection_config = service_connection_config |         self.service_connection_config = service_connection_config | ||||||
|         self.connection = get_connection(self.service_connection_config) |         self.connection = get_ssl_connection(self.service_connection_config) | ||||||
|         self.status = ProfilerProcessorStatus() |         self.status = ProfilerProcessorStatus() | ||||||
|         try: |         try: | ||||||
|             fqn = self.table_entity.fullyQualifiedName |             fqn = self.table_entity.fullyQualifiedName | ||||||
|  | |||||||
| @ -41,7 +41,6 @@ from metadata.generated.schema.metadataIngestion.workflow import ( | |||||||
| ) | ) | ||||||
| from metadata.ingestion.api.models import Either | from metadata.ingestion.api.models import Either | ||||||
| from metadata.ingestion.ometa.ometa_api import OpenMetadata | from metadata.ingestion.ometa.ometa_api import OpenMetadata | ||||||
| from metadata.ingestion.source.connections import get_connection |  | ||||||
| from metadata.profiler.source.metadata import ( | from metadata.profiler.source.metadata import ( | ||||||
|     OpenMetadataSource, |     OpenMetadataSource, | ||||||
|     ProfilerSourceAndEntity, |     ProfilerSourceAndEntity, | ||||||
| @ -52,6 +51,7 @@ from metadata.utils.class_helper import get_service_type_from_source_type | |||||||
| from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table | from metadata.utils.filters import filter_by_database, filter_by_schema, filter_by_table | ||||||
| from metadata.utils.importer import import_source_class | from metadata.utils.importer import import_source_class | ||||||
| from metadata.utils.logger import profiler_logger | from metadata.utils.logger import profiler_logger | ||||||
|  | from metadata.utils.ssl_manager import get_ssl_connection | ||||||
| 
 | 
 | ||||||
| logger = profiler_logger() | logger = profiler_logger() | ||||||
| 
 | 
 | ||||||
| @ -112,7 +112,7 @@ class OpenMetadataSourceExt(OpenMetadataSource): | |||||||
|         if database_name: |         if database_name: | ||||||
|             logger.info(f"Ingesting from database: {database_name}") |             logger.info(f"Ingesting from database: {database_name}") | ||||||
|             new_service_connection.database = database_name |             new_service_connection.database = database_name | ||||||
|         self.engine = get_connection(new_service_connection) |         self.engine = get_ssl_connection(new_service_connection) | ||||||
|         self.inspector = inspect(self.engine) |         self.inspector = inspect(self.engine) | ||||||
|         self._connection = None  # Lazy init as well |         self._connection = None  # Lazy init as well | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -15,6 +15,7 @@ Module to manage SSL certificates | |||||||
| """ | """ | ||||||
| import os | import os | ||||||
| import tempfile | import tempfile | ||||||
|  | import traceback | ||||||
| from functools import singledispatch, singledispatchmethod | from functools import singledispatch, singledispatchmethod | ||||||
| from typing import Optional, Union, cast | from typing import Optional, Union, cast | ||||||
| 
 | 
 | ||||||
| @ -43,6 +44,10 @@ from metadata.generated.schema.entity.services.connections.messaging.kafkaConnec | |||||||
| ) | ) | ||||||
| from metadata.generated.schema.security.ssl import verifySSLConfig | from metadata.generated.schema.security.ssl import verifySSLConfig | ||||||
| from metadata.ingestion.connections.builders import init_empty_connection_arguments | from metadata.ingestion.connections.builders import init_empty_connection_arguments | ||||||
|  | from metadata.ingestion.source.connections import get_connection | ||||||
|  | from metadata.utils.logger import utils_logger | ||||||
|  | 
 | ||||||
|  | logger = utils_logger() | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| class SSLManager: | class SSLManager: | ||||||
| @ -173,3 +178,14 @@ def _(connection): | |||||||
|             ca=connection.sslConfig.root.caCertificate if connection.sslConfig else None |             ca=connection.sslConfig.root.caCertificate if connection.sslConfig else None | ||||||
|         ) |         ) | ||||||
|     return None |     return None | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | def get_ssl_connection(service_config): | ||||||
|  |     try: | ||||||
|  |         ssl_manager: SSLManager = check_ssl_and_init(service_config) | ||||||
|  |         if ssl_manager: | ||||||
|  |             service_config = ssl_manager.setup_ssl(service_config) | ||||||
|  |     except Exception: | ||||||
|  |         logger.debug("Failed to setup SSL for the connection") | ||||||
|  |         logger.debug(traceback.format_exc()) | ||||||
|  |     return get_connection(service_config) | ||||||
|  | |||||||
| @ -15,13 +15,14 @@ from metadata.generated.schema.metadataIngestion.workflow import ( | |||||||
|     OpenMetadataWorkflowConfig, |     OpenMetadataWorkflowConfig, | ||||||
| ) | ) | ||||||
| from metadata.ingestion.api.steps import Processor, Sink | from metadata.ingestion.api.steps import Processor, Sink | ||||||
| from metadata.ingestion.source.connections import get_connection, get_test_connection_fn | from metadata.ingestion.source.connections import get_test_connection_fn | ||||||
| from metadata.pii.processor import PIIProcessor | from metadata.pii.processor import PIIProcessor | ||||||
| from metadata.profiler.processor.processor import ProfilerProcessor | from metadata.profiler.processor.processor import ProfilerProcessor | ||||||
| from metadata.profiler.source.metadata import OpenMetadataSource | from metadata.profiler.source.metadata import OpenMetadataSource | ||||||
| from metadata.profiler.source.metadata_ext import OpenMetadataSourceExt | from metadata.profiler.source.metadata_ext import OpenMetadataSourceExt | ||||||
| from metadata.utils.importer import import_sink_class | from metadata.utils.importer import import_sink_class | ||||||
| from metadata.utils.logger import profiler_logger | from metadata.utils.logger import profiler_logger | ||||||
|  | from metadata.utils.ssl_manager import get_ssl_connection | ||||||
| from metadata.workflow.ingestion import IngestionWorkflow | from metadata.workflow.ingestion import IngestionWorkflow | ||||||
| 
 | 
 | ||||||
| logger = profiler_logger() | logger = profiler_logger() | ||||||
| @ -62,7 +63,7 @@ class ProfilerWorkflow(IngestionWorkflow): | |||||||
| 
 | 
 | ||||||
|     def test_connection(self): |     def test_connection(self): | ||||||
|         service_config = self.config.source.serviceConnection.root.config |         service_config = self.config.source.serviceConnection.root.config | ||||||
|         conn = get_connection(service_config) |         conn = get_ssl_connection(service_config) | ||||||
| 
 | 
 | ||||||
|         test_connection_fn = get_test_connection_fn(service_config) |         test_connection_fn = get_test_connection_fn(service_config) | ||||||
|         test_connection_fn(self.metadata, conn, service_config) |         test_connection_fn(self.metadata, conn, service_config) | ||||||
|  | |||||||
| @ -83,7 +83,7 @@ class DatalakeMetricsTest(TestCase): | |||||||
| 
 | 
 | ||||||
|     @classmethod |     @classmethod | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|         "metadata.profiler.interface.profiler_interface.get_connection", |         "metadata.profiler.interface.profiler_interface.get_ssl_connection", | ||||||
|         return_value=FakeConnection, |         return_value=FakeConnection, | ||||||
|     ) |     ) | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|  | |||||||
| @ -146,7 +146,7 @@ class ProfilerTest(TestCase): | |||||||
| 
 | 
 | ||||||
|     @classmethod |     @classmethod | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|         "metadata.profiler.interface.profiler_interface.get_connection", |         "metadata.profiler.interface.profiler_interface.get_ssl_connection", | ||||||
|         return_value=FakeConnection, |         return_value=FakeConnection, | ||||||
|     ) |     ) | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|  | |||||||
| @ -141,7 +141,7 @@ class PandasInterfaceTest(TestCase): | |||||||
| 
 | 
 | ||||||
|     @classmethod |     @classmethod | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|         "metadata.profiler.interface.profiler_interface.get_connection", |         "metadata.profiler.interface.profiler_interface.get_ssl_connection", | ||||||
|         return_value=FakeConnection, |         return_value=FakeConnection, | ||||||
|     ) |     ) | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|  | |||||||
| @ -131,7 +131,7 @@ class DatalakeSampleTest(TestCase): | |||||||
| 
 | 
 | ||||||
|     @classmethod |     @classmethod | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|         "metadata.profiler.interface.profiler_interface.get_connection", |         "metadata.profiler.interface.profiler_interface.get_ssl_connection", | ||||||
|         return_value=FakeConnection, |         return_value=FakeConnection, | ||||||
|     ) |     ) | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
| @ -169,7 +169,7 @@ class DatalakeSampleTest(TestCase): | |||||||
|         assert res < 5 |         assert res < 5 | ||||||
| 
 | 
 | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|         "metadata.profiler.interface.profiler_interface.get_connection", |         "metadata.profiler.interface.profiler_interface.get_ssl_connection", | ||||||
|         return_value=FakeConnection, |         return_value=FakeConnection, | ||||||
|     ) |     ) | ||||||
|     @mock.patch( |     @mock.patch( | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user
	 Mayur Singal
						Mayur Singal