feat(ingest/kafka): improve error handling of oauth_cb config (#11929)

This commit is contained in:
Mayuri Nehate 2024-11-25 10:31:35 +05:30 committed by GitHub
parent dd892dfbb1
commit b5fb691f0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 25 additions and 1 deletions

View File

@ -30,6 +30,9 @@ class CallableConsumerConfig:
call_back = self.get_call_back_attribute()
assert call_back # to silent lint
assert isinstance(call_back, str), (
"oauth_cb must be a string representing python function reference "
"in the format <python-module>:<function-name>."
)
# Set the callback
self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = import_path(call_back)

View File

@ -10,6 +10,7 @@ from confluent_kafka.schema_registry.schema_registry_client import (
)
from freezegun import freeze_time
from datahub.configuration.common import ConfigurationError
from datahub.emitter.mce_builder import (
OwnerType,
make_dataplatform_instance_urn,
@ -738,3 +739,23 @@ def test_kafka_source_topic_meta_mappings(
assert workunits[7].metadata.aspectName == "glossaryTermKey"
assert workunits[8].metadata.aspectName == "tagKey"
assert workunits[9].metadata.aspectName == "tagKey"
def test_kafka_source_oauth_cb_configuration():
with pytest.raises(
ConfigurationError,
match=(
"oauth_cb must be a string representing python function reference "
"in the format <python-module>:<function-name>."
),
):
KafkaSourceConfig.parse_obj(
{
"connection": {
"bootstrap": "foobar:9092",
"consumer_config": {
"oauth_cb": test_kafka_ignore_warnings_on_schema_type
},
}
}
)