Added validation in profiler workflow to ensure service name exists and raise more explicit error (#6036)

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
Teddy 2022-07-13 14:43:48 +02:00 committed by GitHub
parent dc4579c564
commit d097199d2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 65 additions and 7 deletions

View File

@ -29,6 +29,7 @@ from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
DatabaseServiceProfilerPipeline,
)
@ -90,6 +91,13 @@ class ProfilerWorkflow:
# OpenMetadata client to fetch tables
self.metadata = OpenMetadata(self.metadata_config)
if not self._validate_service_name():
raise ValueError(
f"Service name `{self.config.source.serviceName}` does not exist. "
"Make sure you have run the ingestion for the service specified in the profiler workflow. "
"If so, make sure the profiler service name matches the service name specified during ingestion."
)
@classmethod
def create(cls, config_dict: dict) -> "ProfilerWorkflow":
"""
@ -282,6 +290,12 @@ class ProfilerWorkflow:
"Sink reported warnings", self.sink.get_status()
)
def _validate_service_name(self):
"""Validate service name exists in OpenMetadata"""
return self.metadata.get_by_name(
entity=DatabaseService, fqn=self.config.source.serviceName
)
def stop(self):
"""
Close all connections

View File

@ -14,8 +14,10 @@ Validate workflow configs and filters
"""
import uuid
from copy import deepcopy
from unittest.mock import patch
import sqlalchemy as sqa
from pytest import raises
from sqlalchemy.orm import declarative_base
from metadata.generated.schema.api.tests.createColumnTest import CreateColumnTestRequest
@ -59,13 +61,19 @@ config = {
},
}
workflow = ProfilerWorkflow.create(config)
def test_init_workflow():
@patch.object(
ProfilerWorkflow,
"_validate_service_name",
return_value=True,
)
def test_init_workflow(mocked_method):
"""
We can initialise the workflow from a config
"""
workflow = ProfilerWorkflow.create(config)
mocked_method.assert_called()
assert isinstance(workflow.source_config, DatabaseServiceProfilerPipeline)
assert isinstance(workflow.metadata_config, OpenMetadataConnection)
@ -76,11 +84,18 @@ def test_init_workflow():
assert workflow.processor.config.test_suite is None
def test_filter_entities():
@patch.object(
ProfilerWorkflow,
"_validate_service_name",
return_value=True,
)
def test_filter_entities(mocked_method):
"""
We can properly filter entities depending on the
workflow configuration
"""
workflow = ProfilerWorkflow.create(config)
mocked_method.assert_called()
service_name = "service"
schema_reference1 = EntityReference(
@ -124,6 +139,7 @@ def test_filter_entities():
}
exclude_workflow = ProfilerWorkflow.create(exclude_config)
mocked_method.assert_called()
assert len(list(exclude_workflow.filter_entities(all_tables))) == 0
exclude_config = deepcopy(config)
@ -132,6 +148,7 @@ def test_filter_entities():
}
exclude_workflow = ProfilerWorkflow.create(exclude_config)
mocked_method.assert_called()
assert len(list(exclude_workflow.filter_entities(all_tables))) == 2
include_config = deepcopy(config)
@ -140,10 +157,16 @@ def test_filter_entities():
}
include_workflow = ProfilerWorkflow.create(include_config)
mocked_method.assert_called()
assert len(list(include_workflow.filter_entities(all_tables))) == 3
def test_profile_def():
@patch.object(
ProfilerWorkflow,
"_validate_service_name",
return_value=True,
)
def test_profile_def(mocked_method):
"""
Validate the definitions of the profile in the JSON
"""
@ -154,6 +177,7 @@ def test_profile_def():
}
profile_workflow = ProfilerWorkflow.create(profile_config)
mocked_method.assert_called()
profile_workflow.create_processor(
profile_workflow.config.source.serviceConnection.__root__.config
)
@ -169,13 +193,19 @@ def test_profile_def():
assert profile_workflow.processor.config.profiler == profile_definition
def test_default_profile_def():
@patch.object(
ProfilerWorkflow,
"_validate_service_name",
return_value=True,
)
def test_default_profile_def(mocked_method):
"""
If no information is specified for the profiler, let's
use the SimpleTableProfiler and SimpleProfiler
"""
profile_workflow = ProfilerWorkflow.create(config)
mocked_method.assert_called()
profile_workflow.create_processor(
profile_workflow.config.source.serviceConnection.__root__.config
)
@ -214,7 +244,12 @@ def test_default_profile_def():
)
def test_tests_def():
@patch.object(
ProfilerWorkflow,
"_validate_service_name",
return_value=True,
)
def test_tests_def(mocked_method):
"""
Validate the test case definition
"""
@ -251,6 +286,7 @@ def test_tests_def():
}
test_workflow = ProfilerWorkflow.create(test_config)
mocked_method.assert_called()
test_workflow.create_processor(
test_workflow.config.source.serviceConnection.__root__.config
)
@ -285,3 +321,11 @@ def test_tests_def():
)
assert suite == expected
def test_service_name_validation_raised():
"""Test the service name validation for the profiler
workflow is raised correctly
"""
with raises(ValueError, match="Service name `.*` does not exist"):
ProfilerWorkflow.create(config)