diff --git a/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py b/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py index d3ff5998d3..cac6bb4996 100644 --- a/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py +++ b/metadata-ingestion/src/datahub/configuration/kafka_consumer_config.py @@ -30,6 +30,9 @@ class CallableConsumerConfig: call_back = self.get_call_back_attribute() - assert call_back # to silent lint + assert isinstance(call_back, str), ( + "oauth_cb must be a string representing python function reference " + "in the format :." + ) # Set the callback self._config[CallableConsumerConfig.CALLBACK_ATTRIBUTE] = import_path(call_back) diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index dfd32085b7..cab0a2bce7 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -10,6 +10,7 @@ from confluent_kafka.schema_registry.schema_registry_client import ( ) from freezegun import freeze_time +from datahub.configuration.common import ConfigurationError from datahub.emitter.mce_builder import ( OwnerType, make_dataplatform_instance_urn, @@ -738,3 +739,23 @@ def test_kafka_source_topic_meta_mappings( assert workunits[7].metadata.aspectName == "glossaryTermKey" assert workunits[8].metadata.aspectName == "tagKey" assert workunits[9].metadata.aspectName == "tagKey" + + +def test_kafka_source_oauth_cb_configuration(): + with pytest.raises( + ConfigurationError, + match=( + "oauth_cb must be a string representing python function reference " + "in the format :." + ), + ): + KafkaSourceConfig.parse_obj( + { + "connection": { + "bootstrap": "foobar:9092", + "consumer_config": { + "oauth_cb": test_kafka_ignore_warnings_on_schema_type + }, + } + } + )