diff --git a/ingestion/src/metadata/orm_profiler/api/workflow.py b/ingestion/src/metadata/orm_profiler/api/workflow.py index 56c697c5aa8..cce60288a7d 100644 --- a/ingestion/src/metadata/orm_profiler/api/workflow.py +++ b/ingestion/src/metadata/orm_profiler/api/workflow.py @@ -29,6 +29,7 @@ from metadata.generated.schema.entity.data.table import Table from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import ( OpenMetadataConnection, ) +from metadata.generated.schema.entity.services.databaseService import DatabaseService from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import ( DatabaseServiceProfilerPipeline, ) @@ -90,6 +91,13 @@ class ProfilerWorkflow: # OpenMetadata client to fetch tables self.metadata = OpenMetadata(self.metadata_config) + if not self._validate_service_name(): + raise ValueError( + f"Service name `{self.config.source.serviceName}` does not exist. " + "Make sure you have run the ingestion for the service specified in the profiler workflow. " + "If so, make sure the profiler service name matches the service name specified during ingestion." + ) + @classmethod def create(cls, config_dict: dict) -> "ProfilerWorkflow": """ @@ -282,6 +290,12 @@ class ProfilerWorkflow: "Sink reported warnings", self.sink.get_status() ) + def _validate_service_name(self): + """Validate service name exists in OpenMetadata""" + return self.metadata.get_by_name( + entity=DatabaseService, fqn=self.config.source.serviceName + ) + def stop(self): """ Close all connections diff --git a/ingestion/tests/unit/profiler/test_workflow.py b/ingestion/tests/unit/profiler/test_workflow.py index 8646acf8d4a..f0ab5dcc669 100644 --- a/ingestion/tests/unit/profiler/test_workflow.py +++ b/ingestion/tests/unit/profiler/test_workflow.py @@ -14,8 +14,10 @@ Validate workflow configs and filters """ import uuid from copy import deepcopy +from unittest.mock import patch import sqlalchemy as sqa +from pytest import raises from sqlalchemy.orm import declarative_base from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest @@ -59,13 +61,19 @@ config = { }, } -workflow = ProfilerWorkflow.create(config) - -def test_init_workflow(): +@patch.object( + ProfilerWorkflow, + "_validate_service_name", + return_value=True, +) +def test_init_workflow(mocked_method): """ We can initialise the workflow from a config """ + workflow = ProfilerWorkflow.create(config) + mocked_method.assert_called() + assert isinstance(workflow.source_config, DatabaseServiceProfilerPipeline) assert isinstance(workflow.metadata_config, OpenMetadataConnection) @@ -76,11 +84,18 @@ def test_init_workflow(): assert workflow.processor.config.test_suite is None -def test_filter_entities(): +@patch.object( + ProfilerWorkflow, + "_validate_service_name", + return_value=True, +) +def test_filter_entities(mocked_method): """ We can properly filter entities depending on the workflow configuration """ + workflow = ProfilerWorkflow.create(config) + mocked_method.assert_called() service_name = "service" schema_reference1 = EntityReference( @@ -124,6 +139,7 @@ def test_filter_entities(): } exclude_workflow = ProfilerWorkflow.create(exclude_config) + mocked_method.assert_called() assert len(list(exclude_workflow.filter_entities(all_tables))) == 0 exclude_config = deepcopy(config) @@ -132,6 +148,7 @@ def test_filter_entities(): } exclude_workflow = ProfilerWorkflow.create(exclude_config) + mocked_method.assert_called() assert len(list(exclude_workflow.filter_entities(all_tables))) == 2 include_config = deepcopy(config) @@ -140,10 +157,16 @@ def test_filter_entities(): } include_workflow = ProfilerWorkflow.create(include_config) + mocked_method.assert_called() assert len(list(include_workflow.filter_entities(all_tables))) == 3 -def test_profile_def(): +@patch.object( + ProfilerWorkflow, + "_validate_service_name", + return_value=True, +) +def test_profile_def(mocked_method): """ Validate the definitions of the profile in the JSON """ @@ -154,6 +177,7 @@ def test_profile_def(): } profile_workflow = ProfilerWorkflow.create(profile_config) + mocked_method.assert_called() profile_workflow.create_processor( profile_workflow.config.source.serviceConnection.__root__.config ) @@ -169,13 +193,19 @@ def test_profile_def(): assert profile_workflow.processor.config.profiler == profile_definition -def test_default_profile_def(): +@patch.object( + ProfilerWorkflow, + "_validate_service_name", + return_value=True, +) +def test_default_profile_def(mocked_method): """ If no information is specified for the profiler, let's use the SimpleTableProfiler and SimpleProfiler """ profile_workflow = ProfilerWorkflow.create(config) + mocked_method.assert_called() profile_workflow.create_processor( profile_workflow.config.source.serviceConnection.__root__.config ) @@ -214,7 +244,12 @@ def test_default_profile_def(): ) -def test_tests_def(): +@patch.object( + ProfilerWorkflow, + "_validate_service_name", + return_value=True, +) +def test_tests_def(mocked_method): """ Validate the test case definition """ @@ -251,6 +286,7 @@ def test_tests_def(): } test_workflow = ProfilerWorkflow.create(test_config) + mocked_method.assert_called() test_workflow.create_processor( test_workflow.config.source.serviceConnection.__root__.config ) @@ -285,3 +321,11 @@ def test_tests_def(): ) assert suite == expected + + +def test_service_name_validation_raised(): + """Test the service name validation for the profiler + workflow is raised correctly + """ + with raises(ValueError, match="Service name `.*` does not exist"): + ProfilerWorkflow.create(config)