import json from itertools import chain from typing import Dict, Optional, Tuple from unittest.mock import MagicMock, patch import pytest from confluent_kafka.schema_registry.schema_registry_client import ( RegisteredSchema, Schema, ) from freezegun import freeze_time from datahub.configuration.common import ConfigurationError from datahub.emitter.mce_builder import ( OwnerType, make_dataplatform_instance_urn, make_dataset_urn, make_dataset_urn_with_platform_instance, make_global_tag_aspect_with_tag_list, make_glossary_terms_aspect_from_urn_list, make_owner_urn, make_ownership_aspect_from_urn_list, ) from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.metadata.schema_classes import ( BrowsePathsClass, DataPlatformInstanceClass, GlobalTagsClass, GlossaryTermsClass, KafkaSchemaClass, OwnershipClass, SchemaMetadataClass, ) @pytest.fixture def mock_admin_client(): with patch( "datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True ) as mock: yield mock @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_configuration(mock_kafka): ctx = PipelineContext(run_id="test") kafka_source = KafkaSource( KafkaSourceConfig.parse_obj({"connection": {"bootstrap": "foobar:9092"}}), ctx, ) kafka_source.close() assert mock_kafka.call_count == 1 @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_workunits_wildcard_topic(mock_kafka, mock_admin_client): mock_kafka_instance = mock_kafka.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {"foobar": None, "bazbaz": None} mock_kafka_instance.list_topics.return_value = mock_cluster_metadata ctx = PipelineContext(run_id="test") kafka_source = KafkaSource( KafkaSourceConfig.parse_obj({"connection": {"bootstrap": "localhost:9092"}}), ctx, ) workunits = list(kafka_source.get_workunits()) first_mce = workunits[0].metadata assert isinstance(first_mce, MetadataChangeEvent) mock_kafka.assert_called_once() mock_kafka_instance.list_topics.assert_called_once() assert len(workunits) == 6 @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_workunits_topic_pattern(mock_kafka, mock_admin_client): mock_kafka_instance = mock_kafka.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None} mock_kafka_instance.list_topics.return_value = mock_cluster_metadata ctx = PipelineContext(run_id="test1") kafka_source = KafkaSource.create( { "topic_patterns": {"allow": ["test"]}, "connection": {"bootstrap": "localhost:9092"}, }, ctx, ) workunits = [w for w in kafka_source.get_workunits()] mock_kafka.assert_called_once() mock_kafka_instance.list_topics.assert_called_once() assert len(workunits) == 3 mock_cluster_metadata.topics = {"test": None, "test2": None, "bazbaz": None} ctx = PipelineContext(run_id="test2") kafka_source = KafkaSource.create( { "topic_patterns": {"allow": ["test.*"]}, "connection": {"bootstrap": "localhost:9092"}, }, ctx, ) workunits = [w for w in kafka_source.get_workunits()] assert len(workunits) == 6 @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_workunits_with_platform_instance(mock_kafka, mock_admin_client): PLATFORM_INSTANCE = "kafka_cluster" PLATFORM = "kafka" TOPIC_NAME = "test" mock_kafka_instance = mock_kafka.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {TOPIC_NAME: None} mock_kafka_instance.list_topics.return_value = mock_cluster_metadata ctx = PipelineContext(run_id="test1") kafka_source = KafkaSource.create( { "connection": {"bootstrap": "localhost:9092"}, "platform_instance": PLATFORM_INSTANCE, }, ctx, ) workunits = [w for w in kafka_source.get_workunits()] # We should only have 1 topic + sub-type wu + browse paths. assert len(workunits) == 3 assert isinstance(workunits[0], MetadataWorkUnit) assert isinstance(workunits[0].metadata, MetadataChangeEvent) proposed_snap = workunits[0].metadata.proposedSnapshot assert proposed_snap.urn == make_dataset_urn_with_platform_instance( platform=PLATFORM, name=TOPIC_NAME, platform_instance=PLATFORM_INSTANCE, env="PROD", ) # DataPlatform aspect should be present when platform_instance is configured data_platform_aspects = [ asp for asp in proposed_snap.aspects if isinstance(asp, DataPlatformInstanceClass) ] assert len(data_platform_aspects) == 1 assert data_platform_aspects[0].instance == make_dataplatform_instance_urn( PLATFORM, PLATFORM_INSTANCE ) # The default browse path should include the platform_instance value browse_path_aspects = [ asp for asp in proposed_snap.aspects if isinstance(asp, BrowsePathsClass) ] assert len(browse_path_aspects) == 1 assert f"/prod/{PLATFORM}/{PLATFORM_INSTANCE}" in browse_path_aspects[0].paths @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_workunits_no_platform_instance(mock_kafka, mock_admin_client): PLATFORM = "kafka" TOPIC_NAME = "test" mock_kafka_instance = mock_kafka.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {TOPIC_NAME: None} mock_kafka_instance.list_topics.return_value = mock_cluster_metadata ctx = PipelineContext(run_id="test1") kafka_source = KafkaSource.create( {"connection": {"bootstrap": "localhost:9092"}}, ctx, ) workunits = [w for w in kafka_source.get_workunits()] # We should only have 1 topic + sub-type wu + browse paths. assert len(workunits) == 3 assert isinstance(workunits[0], MetadataWorkUnit) assert isinstance(workunits[0].metadata, MetadataChangeEvent) proposed_snap = workunits[0].metadata.proposedSnapshot assert proposed_snap.urn == make_dataset_urn( platform=PLATFORM, name=TOPIC_NAME, env="PROD", ) # DataPlatform aspect should not be present when platform_instance is not configured data_platform_aspects = [ asp for asp in proposed_snap.aspects if isinstance(asp, DataPlatformInstanceClass) ] assert len(data_platform_aspects) == 0 # The default browse path should include the platform_instance value browse_path_aspects = [ asp for asp in proposed_snap.aspects if isinstance(asp, BrowsePathsClass) ] assert len(browse_path_aspects) == 1 assert f"/prod/{PLATFORM}" in browse_path_aspects[0].paths @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_close(mock_kafka, mock_admin_client): mock_kafka_instance = mock_kafka.return_value ctx = PipelineContext(run_id="test") kafka_source = KafkaSource.create( { "topic_patterns": {"allow": ["test.*"]}, "connection": {"bootstrap": "localhost:9092"}, }, ctx, ) kafka_source.close() assert mock_kafka_instance.close.call_count == 1 @patch( "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient", autospec=True, ) @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_workunits_schema_registry_subject_name_strategies( mock_kafka_consumer, mock_schema_registry_client, mock_admin_client ): # Setup the topic to key/value schema mappings for all types of schema registry subject name strategies. # ,) topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = { # TopicNameStrategy is used for subject "topic1": ( RegisteredSchema( schema_id="schema_id_2", schema=Schema( schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}', schema_type="AVRO", ), subject="topic1-key", version=1, ), RegisteredSchema( schema_id="schema_id_1", schema=Schema( schema_str='{"type":"record", "name":"Topic1Value", "namespace": "test.acryl", "fields": [{"name":"t1value", "type": "string"}]}', schema_type="AVRO", ), subject="topic1-value", version=1, ), ), # RecordNameStrategy is used for subject "topic2": ( RegisteredSchema( schema_id="schema_id_3", schema=Schema( schema_str='{"type":"record", "name":"Topic2Key", "namespace": "test.acryl", "fields": [{"name":"t2key", "type": "string"}]}', schema_type="AVRO", ), subject="test.acryl.Topic2Key", version=1, ), RegisteredSchema( schema_id="schema_id_4", schema=Schema( schema_str='{"type":"record", "name":"Topic2Value", "namespace": "test.acryl", "fields": [{"name":"t2value", "type": "string"}]}', schema_type="AVRO", ), subject="test.acryl.Topic2Value", version=1, ), ), # TopicRecordNameStrategy is used for subject "topic3": ( RegisteredSchema( schema_id="schema_id_4", schema=Schema( schema_str='{"type":"record", "name":"Topic3Key", "namespace": "test.acryl", "fields": [{"name":"t3key", "type": "string"}]}', schema_type="AVRO", ), subject="topic3-test.acryl.Topic3Key-key", version=1, ), RegisteredSchema( schema_id="schema_id_5", schema=Schema( schema_str='{"type":"record", "name":"Topic3Value", "namespace": "test.acryl", "fields": [{"name":"t3value", "type": "string"}]}', schema_type="AVRO", ), subject="topic3-test.acryl.Topic3Value-value", version=1, ), ), } # Mock the kafka consumer mock_kafka_instance = mock_kafka_consumer.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map} mock_cluster_metadata.topics["schema_less_topic"] = None mock_kafka_instance.list_topics.return_value = mock_cluster_metadata # Mock the schema registry client # - mock get_subjects: all subjects in topic_subject_schema_map mock_schema_registry_client.return_value.get_subjects.return_value = [ v.subject for v in chain(*topic_subject_schema_map.values()) ] # - mock get_latest_version def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: for registered_schema in chain(*topic_subject_schema_map.values()): if registered_schema.subject == subject_name: return registered_schema return None mock_schema_registry_client.return_value.get_latest_version = ( mock_get_latest_version ) # Test the kafka source source_config = { "connection": {"bootstrap": "localhost:9092"}, # Setup the topic_subject_map for topic2 which uses RecordNameStrategy "topic_subject_map": { "topic2-key": "test.acryl.Topic2Key", "topic2-value": "test.acryl.Topic2Value", }, "ingest_schemas_as_entities": True, } ctx = PipelineContext(run_id="test") kafka_source = KafkaSource.create(source_config, ctx) workunits = list(kafka_source.get_workunits()) mock_kafka_consumer.assert_called_once() mock_kafka_instance.list_topics.assert_called_once() # Along with with 4 topics (3 with schema and 1 schemaless) which constitutes to 8 workunits, # there will be 6 schemas (1 key and 1 value schema for 3 topics) which constitutes to 12 workunits # and there will be 10 browse paths workunits assert len(workunits) == 30 i: int = -1 for wu in workunits: assert isinstance(wu, MetadataWorkUnit) if not isinstance(wu.metadata, MetadataChangeEvent): continue mce: MetadataChangeEvent = wu.metadata i += 1 # Only topic (named schema_less_topic) does not have schema metadata but other workunits (that are created # for schema) will have corresponding SchemaMetadata aspect if i < len(topic_subject_schema_map.keys()): # First 3 workunits (topics) must have schemaMetadata aspect assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass) schemaMetadataAspect: SchemaMetadataClass = mce.proposedSnapshot.aspects[1] assert isinstance(schemaMetadataAspect.platformSchema, KafkaSchemaClass) # Make sure the schema name is present in topic_subject_schema_map. assert schemaMetadataAspect.schemaName in topic_subject_schema_map # Make sure the schema_str matches for the key schema. assert ( schemaMetadataAspect.platformSchema.keySchema == topic_subject_schema_map[schemaMetadataAspect.schemaName][ 0 ].schema.schema_str ) # Make sure the schema_type matches for the key schema. assert ( schemaMetadataAspect.platformSchema.keySchemaType == topic_subject_schema_map[schemaMetadataAspect.schemaName][ 0 ].schema.schema_type ) # Make sure the schema_str matches for the value schema. assert ( schemaMetadataAspect.platformSchema.documentSchema == topic_subject_schema_map[schemaMetadataAspect.schemaName][ 1 ].schema.schema_str ) # Make sure the schema_type matches for the value schema. assert ( schemaMetadataAspect.platformSchema.documentSchemaType == topic_subject_schema_map[schemaMetadataAspect.schemaName][ 1 ].schema.schema_type ) # Make sure we have 2 fields, one from the key schema & one from the value schema. assert len(schemaMetadataAspect.fields) == 2 elif i == len(topic_subject_schema_map.keys()): # Last topic('schema_less_topic') has no schema defined in the registry. # The schemaMetadata aspect should not be present for this. for aspect in mce.proposedSnapshot.aspects: assert not isinstance(aspect, SchemaMetadataClass) else: # Last 2 workunits (schemas) must have schemaMetadata aspect assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass) schemaMetadataAspectObj: SchemaMetadataClass = mce.proposedSnapshot.aspects[ 1 ] assert isinstance(schemaMetadataAspectObj.platformSchema, KafkaSchemaClass) @pytest.mark.parametrize( "ignore_warnings_on_schema_type", [ pytest.param( False, id="ignore_warnings_on_schema_type-FALSE", ), pytest.param( True, id="ignore_warnings_on_schema_type-TRUE", ), ], ) @patch( "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient", autospec=True, ) @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_ignore_warnings_on_schema_type( mock_kafka_consumer, mock_schema_registry_client, mock_admin_client, ignore_warnings_on_schema_type, ): # define the key and value schemas for topic1 topic1_key_schema = RegisteredSchema( schema_id="schema_id_2", schema=Schema( schema_str="{}", schema_type="UNKNOWN_TYPE", ), subject="topic1-key", version=1, ) topic1_value_schema = RegisteredSchema( schema_id="schema_id_1", schema=Schema( schema_str="{}", schema_type="UNKNOWN_TYPE", ), subject="topic1-value", version=1, ) # Mock the kafka consumer mock_kafka_instance = mock_kafka_consumer.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {"topic1": None} mock_kafka_instance.list_topics.return_value = mock_cluster_metadata # Mock the schema registry client mock_schema_registry_client.return_value.get_subjects.return_value = [ "topic1-key", "topic1-value", ] # - mock get_latest_version def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: if subject_name == "topic1-key": return topic1_key_schema elif subject_name == "topic1-value": return topic1_value_schema return None mock_schema_registry_client.return_value.get_latest_version = ( mock_get_latest_version ) # Test the kafka source source_config = { "connection": {"bootstrap": "localhost:9092"}, "ignore_warnings_on_schema_type": f"{ignore_warnings_on_schema_type}", } ctx = PipelineContext(run_id="test") kafka_source = KafkaSource.create(source_config, ctx) workunits = list(kafka_source.get_workunits()) assert len(workunits) == 3 if ignore_warnings_on_schema_type: assert not kafka_source.report.warnings else: assert kafka_source.report.warnings @patch("datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True) @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_succeeds_with_admin_client_init_error( mock_kafka, mock_kafka_admin_client ): mock_kafka_instance = mock_kafka.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None} mock_kafka_instance.list_topics.return_value = mock_cluster_metadata mock_kafka_admin_client.side_effect = Exception() ctx = PipelineContext(run_id="test1") kafka_source = KafkaSource.create( { "topic_patterns": {"allow": ["test"]}, "connection": {"bootstrap": "localhost:9092"}, }, ctx, ) workunits = [w for w in kafka_source.get_workunits()] mock_kafka.assert_called_once() mock_kafka_instance.list_topics.assert_called_once() mock_kafka_admin_client.assert_called_once() assert len(workunits) == 3 @patch("datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True) @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_succeeds_with_describe_configs_error( mock_kafka, mock_kafka_admin_client ): mock_kafka_instance = mock_kafka.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None} mock_kafka_instance.list_topics.return_value = mock_cluster_metadata mock_admin_client_instance = mock_kafka_admin_client.return_value mock_admin_client_instance.describe_configs.side_effect = Exception() ctx = PipelineContext(run_id="test1") kafka_source = KafkaSource.create( { "topic_patterns": {"allow": ["test"]}, "connection": {"bootstrap": "localhost:9092"}, }, ctx, ) workunits = [w for w in kafka_source.get_workunits()] mock_kafka.assert_called_once() mock_kafka_instance.list_topics.assert_called_once() mock_kafka_admin_client.assert_called_once() mock_admin_client_instance.describe_configs.assert_called_once() assert len(workunits) == 3 @freeze_time("2023-09-20 10:00:00") @patch( "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient", autospec=True, ) @patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True) def test_kafka_source_topic_meta_mappings( mock_kafka_consumer, mock_schema_registry_client, mock_admin_client ): # Setup the topic to key/value schema mappings for all types of schema registry subject name strategies. # ,) topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = { "topic1": ( RegisteredSchema( schema_id="schema_id_2", schema=Schema( schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}', schema_type="AVRO", ), subject="topic1-key", version=1, ), RegisteredSchema( schema_id="schema_id_1", schema=Schema( schema_str=json.dumps( { "type": "record", "name": "Topic1Value", "namespace": "test.acryl", "fields": [{"name": "t1value", "type": "string"}], "owner": "@charles", "business_owner": "jdoe.last@gmail.com", "data_governance.team_owner": "Finance", "has_pii": True, "int_property": 1, "double_property": 2.5, } ), schema_type="AVRO", ), subject="topic1-value", version=1, ), ) } # Mock the kafka consumer mock_kafka_instance = mock_kafka_consumer.return_value mock_cluster_metadata = MagicMock() mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map} mock_kafka_instance.list_topics.return_value = mock_cluster_metadata # Mock the schema registry client # - mock get_subjects: all subjects in topic_subject_schema_map mock_schema_registry_client.return_value.get_subjects.return_value = [ v.subject for v in chain(*topic_subject_schema_map.values()) ] # - mock get_latest_version def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]: for registered_schema in chain(*topic_subject_schema_map.values()): if registered_schema.subject == subject_name: return registered_schema return None mock_schema_registry_client.return_value.get_latest_version = ( mock_get_latest_version ) ctx = PipelineContext(run_id="test1") kafka_source = KafkaSource.create( { "connection": {"bootstrap": "localhost:9092"}, "ingest_schemas_as_entities": True, "meta_mapping": { "owner": { "match": "^@(.*)", "operation": "add_owner", "config": {"owner_type": "user"}, }, "business_owner": { "match": ".*", "operation": "add_owner", "config": {"owner_type": "user"}, }, "has_pii": { "match": True, "operation": "add_tag", "config": {"tag": "has_pii_test"}, }, "int_property": { "match": 1, "operation": "add_tag", "config": {"tag": "int_meta_property"}, }, "double_property": { "match": 2.5, "operation": "add_term", "config": {"term": "double_meta_property"}, }, "data_governance.team_owner": { "match": "Finance", "operation": "add_term", "config": {"term": "Finance_test"}, }, }, }, ctx, ) # Along with with 1 topics (and 5 meta mapping) it constitutes to 6 workunits, # there will be 2 schemas which constitutes to 4 workunits (1 mce and 1 mcp each) workunits = [w for w in kafka_source.get_workunits()] assert len(workunits) == 13 # workunit[0] - DatasetSnapshot mce = workunits[0].metadata assert isinstance(mce, MetadataChangeEvent) ownership_aspect = [ asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass) ][0] assert ownership_aspect == make_ownership_aspect_from_urn_list( [ make_owner_urn("charles", OwnerType.USER), make_owner_urn("jdoe.last@gmail.com", OwnerType.USER), ], "SERVICE", ) tags_aspect = [ asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass) ][0] assert tags_aspect == make_global_tag_aspect_with_tag_list( ["has_pii_test", "int_meta_property"] ) terms_aspect = [ asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlossaryTermsClass) ][0] assert terms_aspect == make_glossary_terms_aspect_from_urn_list( [ "urn:li:glossaryTerm:Finance_test", "urn:li:glossaryTerm:double_meta_property", ] ) # workunit[1] - subtypes assert isinstance(workunits[1].metadata, MetadataChangeProposalWrapper) assert workunits[1].metadata.aspectName == "subTypes" # workunit[2] - browse paths assert isinstance(workunits[2].metadata, MetadataChangeProposalWrapper) assert workunits[2].metadata.aspectName == "browsePathsV2" # workunit[3] - DatasetSnapshot mce = workunits[3].metadata assert isinstance(mce, MetadataChangeEvent) # workunit[4] - subtypes assert isinstance(workunits[4].metadata, MetadataChangeProposalWrapper) assert workunits[4].metadata.aspectName == "subTypes" # workunit[5] - browse paths assert isinstance(workunits[5].metadata, MetadataChangeProposalWrapper) assert workunits[5].metadata.aspectName == "browsePathsV2" # workunit[6] - DatasetSnapshot mce = workunits[6].metadata assert isinstance(mce, MetadataChangeEvent) ownership_aspect = [ asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass) ][0] assert ownership_aspect == make_ownership_aspect_from_urn_list( [ make_owner_urn("charles", OwnerType.USER), make_owner_urn("jdoe.last@gmail.com", OwnerType.USER), ], "SERVICE", ) tags_aspect = [ asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass) ][0] assert tags_aspect == make_global_tag_aspect_with_tag_list( ["has_pii_test", "int_meta_property"] ) terms_aspect = [ asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlossaryTermsClass) ][0] assert terms_aspect == make_glossary_terms_aspect_from_urn_list( [ "urn:li:glossaryTerm:Finance_test", "urn:li:glossaryTerm:double_meta_property", ] ) # workunit[7] - subtypes assert isinstance(workunits[7].metadata, MetadataChangeProposalWrapper) assert workunits[7].metadata.aspectName == "subTypes" # workunit[8] - browse paths assert isinstance(workunits[8].metadata, MetadataChangeProposalWrapper) assert workunits[8].metadata.aspectName == "browsePathsV2" # workunit[9] - glossary terms assert isinstance(workunits[9].metadata, MetadataChangeProposalWrapper) assert workunits[9].metadata.aspectName == "glossaryTermKey" # workunit[10] - glossary terms assert isinstance(workunits[10].metadata, MetadataChangeProposalWrapper) assert workunits[10].metadata.aspectName == "glossaryTermKey" # workunit[11] - tags assert isinstance(workunits[11].metadata, MetadataChangeProposalWrapper) assert workunits[11].metadata.aspectName == "tagKey" # workunit[12] - tags assert isinstance(workunits[12].metadata, MetadataChangeProposalWrapper) assert workunits[12].metadata.aspectName == "tagKey" def test_kafka_source_oauth_cb_configuration(): with pytest.raises( ConfigurationError, match=( "oauth_cb must be a string representing python function reference " "in the format :." ), ): KafkaSourceConfig.parse_obj( { "connection": { "bootstrap": "foobar:9092", "consumer_config": { "oauth_cb": test_kafka_ignore_warnings_on_schema_type }, } } )