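"""Unit tests for the DataHub Kafka ingestion source.

All Kafka consumer, admin client, and Confluent Schema Registry interactions
are mocked below, so these tests run without any live services.
"""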
import json
from itertools import chain
from typing import Dict, Optional, Tuple
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka.schema_registry.schema_registry_client import (
    RegisteredSchema,
    Schema,
)
from freezegun import freeze_time

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mce_builder import (
    OwnerType,
    make_dataplatform_instance_urn,
    make_dataset_urn,
    make_dataset_urn_with_platform_instance,
    make_global_tag_aspect_with_tag_list,
    make_glossary_terms_aspect_from_urn_list,
    make_owner_urn,
    make_ownership_aspect_from_urn_list,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
    BrowsePathsClass,
    DataPlatformInstanceClass,
    GlobalTagsClass,
    GlossaryTermsClass,
    KafkaSchemaClass,
    OwnershipClass,
    SchemaMetadataClass,
)


@pytest.fixture
def mock_admin_client():
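    """Patch the AdminClient used by the Kafka source so no real broker is contacted."""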
    with patch(
        "datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True
    ) as mock:
        yield mock


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
 | 
						|
def test_kafka_source_configuration(mock_kafka):
 | 
						|
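    """A source built from a minimal connection config should create exactly one consumer."""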
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource(
        KafkaSourceConfig.parse_obj({"connection": {"bootstrap": "foobar:9092"}}),
        ctx,
    )
    kafka_source.close()
    assert mock_kafka.call_count == 1


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
 | 
						|
def test_kafka_source_workunits_wildcard_topic(mock_kafka, mock_admin_client):
 | 
						|
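    """Without a topic filter, every topic from list_topics() is emitted (3 workunits per topic)."""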
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource(
        KafkaSourceConfig.parse_obj({"connection": {"bootstrap": "localhost:9092"}}),
        ctx,
    )
    workunits = list(kafka_source.get_workunits())

    first_mce = workunits[0].metadata
    assert isinstance(first_mce, MetadataChangeEvent)
    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
    assert len(workunits) == 6


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
 | 
						|
def test_kafka_source_workunits_topic_pattern(mock_kafka, mock_admin_client):
 | 
						|
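    """topic_patterns.allow should filter topics: an exact name keeps a single
    topic, while a regex such as 'test.*' keeps every matching topic."""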
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
    assert len(workunits) == 3

    mock_cluster_metadata.topics = {"test": None, "test2": None, "bazbaz": None}
    ctx = PipelineContext(run_id="test2")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test.*"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]
    assert len(workunits) == 6


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
 | 
						|
def test_kafka_source_workunits_with_platform_instance(mock_kafka, mock_admin_client):
 | 
						|
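    """When platform_instance is set, it should appear in the dataset URN, the
    DataPlatformInstance aspect, and the default browse path."""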
    PLATFORM_INSTANCE = "kafka_cluster"
    PLATFORM = "kafka"
    TOPIC_NAME = "test"

    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {TOPIC_NAME: None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "connection": {"bootstrap": "localhost:9092"},
            "platform_instance": PLATFORM_INSTANCE,
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    # One topic yields 3 workunits: the dataset snapshot, the sub-type, and the browse path.
    assert len(workunits) == 3
    assert isinstance(workunits[0], MetadataWorkUnit)
    assert isinstance(workunits[0].metadata, MetadataChangeEvent)
    proposed_snap = workunits[0].metadata.proposedSnapshot
    assert proposed_snap.urn == make_dataset_urn_with_platform_instance(
        platform=PLATFORM,
        name=TOPIC_NAME,
        platform_instance=PLATFORM_INSTANCE,
        env="PROD",
    )

    # The DataPlatformInstance aspect should be present when platform_instance is configured.
    data_platform_aspects = [
        asp
        for asp in proposed_snap.aspects
        if isinstance(asp, DataPlatformInstanceClass)
    ]
    assert len(data_platform_aspects) == 1
    assert data_platform_aspects[0].instance == make_dataplatform_instance_urn(
        PLATFORM, PLATFORM_INSTANCE
    )

    # The default browse path should include the platform_instance value.
    browse_path_aspects = [
        asp for asp in proposed_snap.aspects if isinstance(asp, BrowsePathsClass)
    ]
    assert len(browse_path_aspects) == 1
    assert f"/prod/{PLATFORM}/{PLATFORM_INSTANCE}" in browse_path_aspects[0].paths


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
 | 
						|
def test_kafka_source_workunits_no_platform_instance(mock_kafka, mock_admin_client):
 | 
						|
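    """Without platform_instance, the URN and browse path use only the platform
    name, and no DataPlatformInstance aspect is emitted."""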
    PLATFORM = "kafka"
    TOPIC_NAME = "test"

    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {TOPIC_NAME: None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {"connection": {"bootstrap": "localhost:9092"}},
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    # One topic yields 3 workunits: the dataset snapshot, the sub-type, and the browse path.
    assert len(workunits) == 3
    assert isinstance(workunits[0], MetadataWorkUnit)
    assert isinstance(workunits[0].metadata, MetadataChangeEvent)
    proposed_snap = workunits[0].metadata.proposedSnapshot
    assert proposed_snap.urn == make_dataset_urn(
        platform=PLATFORM,
        name=TOPIC_NAME,
        env="PROD",
    )

    # The DataPlatformInstance aspect should not be present when platform_instance is not configured.
    data_platform_aspects = [
        asp
        for asp in proposed_snap.aspects
        if isinstance(asp, DataPlatformInstanceClass)
    ]
    assert len(data_platform_aspects) == 0

    # Without a platform_instance, the default browse path contains only the platform name.
    browse_path_aspects = [
        asp for asp in proposed_snap.aspects if isinstance(asp, BrowsePathsClass)
    ]
    assert len(browse_path_aspects) == 1
    assert f"/prod/{PLATFORM}" in browse_path_aspects[0].paths


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
 | 
						|
def test_close(mock_kafka, mock_admin_client):
 | 
						|
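    """Closing the source should close the underlying Kafka consumer exactly once."""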
    mock_kafka_instance = mock_kafka.return_value
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test.*"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    kafka_source.close()
    assert mock_kafka_instance.close.call_count == 1


@patch(
    "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
    autospec=True,
)
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_schema_registry_subject_name_strategies(
    mock_kafka_consumer, mock_schema_registry_client, mock_admin_client
):
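    """Schemas should be resolved for subjects named under any of the three
    subject name strategies (TopicNameStrategy, RecordNameStrategy via an
    explicit topic_subject_map, and TopicRecordNameStrategy), while a topic
    with no registered schema yields no SchemaMetadata aspect."""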
    # Set up the topic to key/value schema mappings for all types of schema registry subject name strategies:
    # <key=topic_name, value=(<key_schema>, <value_schema>)>
    topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = {
        # TopicNameStrategy is used for subject
        "topic1": (
            RegisteredSchema(
                guid=None,
                schema_id="schema_id_2",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic1-key",
                version=1,
            ),
            RegisteredSchema(
                guid=None,
                schema_id="schema_id_1",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic1Value", "namespace": "test.acryl", "fields": [{"name":"t1value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic1-value",
                version=1,
            ),
        ),
        # RecordNameStrategy is used for subject
        "topic2": (
            RegisteredSchema(
                guid=None,
                schema_id="schema_id_3",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic2Key", "namespace": "test.acryl", "fields": [{"name":"t2key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="test.acryl.Topic2Key",
                version=1,
            ),
            RegisteredSchema(
                guid=None,
                schema_id="schema_id_4",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic2Value", "namespace": "test.acryl", "fields": [{"name":"t2value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="test.acryl.Topic2Value",
                version=1,
            ),
        ),
        # TopicRecordNameStrategy is used for subject
        "topic3": (
            RegisteredSchema(
                guid=None,
                schema_id="schema_id_4",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic3Key", "namespace": "test.acryl", "fields": [{"name":"t3key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic3-test.acryl.Topic3Key-key",
                version=1,
            ),
            RegisteredSchema(
                guid=None,
                schema_id="schema_id_5",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic3Value", "namespace": "test.acryl", "fields": [{"name":"t3value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic3-test.acryl.Topic3Value-value",
                version=1,
            ),
        ),
    }

    # Mock the kafka consumer
    mock_kafka_instance = mock_kafka_consumer.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map}
    mock_cluster_metadata.topics["schema_less_topic"] = None
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    # Mock the schema registry client
    # - mock get_subjects: all subjects in topic_subject_schema_map
    mock_schema_registry_client.return_value.get_subjects.return_value = [
        v.subject for v in chain(*topic_subject_schema_map.values())
    ]

    # - mock get_latest_version
    def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
        for registered_schema in chain(*topic_subject_schema_map.values()):
            if registered_schema.subject == subject_name:
                return registered_schema
        return None

    mock_schema_registry_client.return_value.get_latest_version = (
        mock_get_latest_version
    )

    # Test the kafka source
    source_config = {
        "connection": {"bootstrap": "localhost:9092"},
        # Set up the topic_subject_map for topic2, which uses RecordNameStrategy
        "topic_subject_map": {
            "topic2-key": "test.acryl.Topic2Key",
            "topic2-value": "test.acryl.Topic2Value",
        },
        "ingest_schemas_as_entities": True,
    }
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(source_config, ctx)
    workunits = list(kafka_source.get_workunits())

    mock_kafka_consumer.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
    # The 4 topics (3 with schemas, 1 schemaless) yield 8 workunits, the 6 schemas
    #   (1 key and 1 value schema for each of the 3 topics) yield 12 workunits,
    #   and there are 10 browse path workunits.
    assert len(workunits) == 30
    i: int = -1
    for wu in workunits:
        assert isinstance(wu, MetadataWorkUnit)
        if not isinstance(wu.metadata, MetadataChangeEvent):
            continue
        mce: MetadataChangeEvent = wu.metadata
        i += 1

        # Only the topic named 'schema_less_topic' has no schema metadata; the other
        #   MCE workunits (topics with schemas, and the schema entities themselves)
        #   carry a SchemaMetadata aspect.
        if i < len(topic_subject_schema_map.keys()):
            # First 3 workunits (topics) must have schemaMetadata aspect
            assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass)
            schemaMetadataAspect: SchemaMetadataClass = mce.proposedSnapshot.aspects[1]
            assert isinstance(schemaMetadataAspect.platformSchema, KafkaSchemaClass)
            # Make sure the schema name is present in topic_subject_schema_map.
            assert schemaMetadataAspect.schemaName in topic_subject_schema_map
            # Make sure the schema_str matches for the key schema.
            assert (
                schemaMetadataAspect.platformSchema.keySchema
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    0
                ].schema.schema_str
            )
            # Make sure the schema_type matches for the key schema.
            assert (
                schemaMetadataAspect.platformSchema.keySchemaType
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    0
                ].schema.schema_type
            )
            # Make sure the schema_str matches for the value schema.
            assert (
                schemaMetadataAspect.platformSchema.documentSchema
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    1
                ].schema.schema_str
            )
            # Make sure the schema_type matches for the value schema.
            assert (
                schemaMetadataAspect.platformSchema.documentSchemaType
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    1
                ].schema.schema_type
            )
            # Make sure we have 2 fields, one from the key schema & one from the value schema.
            assert len(schemaMetadataAspect.fields) == 2
        elif i == len(topic_subject_schema_map.keys()):
            # The topic 'schema_less_topic' has no schema defined in the registry,
            # so no schemaMetadata aspect should be present for it.
            for aspect in mce.proposedSnapshot.aspects:
                assert not isinstance(aspect, SchemaMetadataClass)
        else:
            # The remaining workunits (schema entities) must have a schemaMetadata aspect.
            assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass)
            schemaMetadataAspectObj: SchemaMetadataClass = mce.proposedSnapshot.aspects[
                1
            ]
            assert isinstance(schemaMetadataAspectObj.platformSchema, KafkaSchemaClass)


@pytest.mark.parametrize(
    "ignore_warnings_on_schema_type",
    [
        pytest.param(
            False,
            id="ignore_warnings_on_schema_type-FALSE",
        ),
        pytest.param(
            True,
            id="ignore_warnings_on_schema_type-TRUE",
        ),
    ],
)
@patch(
    "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
    autospec=True,
)
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_ignore_warnings_on_schema_type(
    mock_kafka_consumer,
    mock_schema_registry_client,
    mock_admin_client,
    ignore_warnings_on_schema_type,
):
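    """With ignore_warnings_on_schema_type enabled, an unrecognized schema type
    should not be reported as a warning; otherwise a warning is expected."""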
    # define the key and value schemas for topic1
    topic1_key_schema = RegisteredSchema(
        guid=None,
        schema_id="schema_id_2",
        schema=Schema(
            schema_str="{}",
            schema_type="UNKNOWN_TYPE",
        ),
        subject="topic1-key",
        version=1,
    )
    topic1_value_schema = RegisteredSchema(
        guid=None,
        schema_id="schema_id_1",
        schema=Schema(
            schema_str="{}",
            schema_type="UNKNOWN_TYPE",
        ),
        subject="topic1-value",
        version=1,
    )

    # Mock the kafka consumer
    mock_kafka_instance = mock_kafka_consumer.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"topic1": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    # Mock the schema registry client
    mock_schema_registry_client.return_value.get_subjects.return_value = [
        "topic1-key",
        "topic1-value",
    ]

    # - mock get_latest_version
    def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
        if subject_name == "topic1-key":
            return topic1_key_schema
        elif subject_name == "topic1-value":
            return topic1_value_schema
        return None

    mock_schema_registry_client.return_value.get_latest_version = (
        mock_get_latest_version
    )

    # Test the kafka source
    source_config = {
        "connection": {"bootstrap": "localhost:9092"},
        "ignore_warnings_on_schema_type": f"{ignore_warnings_on_schema_type}",
    }
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(source_config, ctx)

    workunits = list(kafka_source.get_workunits())
    assert len(workunits) == 3
    if ignore_warnings_on_schema_type:
        assert not kafka_source.report.warnings
    else:
        assert kafka_source.report.warnings


@patch("datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True)
 | 
						|
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
 | 
						|
def test_kafka_source_succeeds_with_admin_client_init_error(
 | 
						|
    mock_kafka, mock_kafka_admin_client
 | 
						|
):
 | 
						|
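    """Topic extraction should still succeed if AdminClient construction fails."""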
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    mock_kafka_admin_client.side_effect = Exception()

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()

    mock_kafka_admin_client.assert_called_once()

    assert len(workunits) == 3


@patch("datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True)
 | 
						|
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
 | 
						|
def test_kafka_source_succeeds_with_describe_configs_error(
 | 
						|
    mock_kafka, mock_kafka_admin_client
 | 
						|
):
 | 
						|
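    """Topic extraction should still succeed if AdminClient.describe_configs fails."""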
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    mock_admin_client_instance = mock_kafka_admin_client.return_value
    mock_admin_client_instance.describe_configs.side_effect = Exception()

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()

    mock_kafka_admin_client.assert_called_once()
    mock_admin_client_instance.describe_configs.assert_called_once()

    assert len(workunits) == 3


@freeze_time("2023-09-20 10:00:00")
@patch(
    "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
    autospec=True,
)
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_topic_meta_mappings(
    mock_kafka_consumer, mock_schema_registry_client, mock_admin_client
):
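    """meta_mapping should translate matching properties embedded in the value
    schema into ownership, tag, and glossary term aspects on both the topic
    and the schema entities."""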
    # Set up the topic to key/value schema mapping used by this test:
    # <key=topic_name, value=(<key_schema>, <value_schema>)>
    topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = {
        "topic1": (
            RegisteredSchema(
                guid=None,
                schema_id="schema_id_2",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic1-key",
                version=1,
            ),
            RegisteredSchema(
                guid=None,
                schema_id="schema_id_1",
                schema=Schema(
                    schema_str=json.dumps(
                        {
                            "type": "record",
                            "name": "Topic1Value",
                            "namespace": "test.acryl",
                            "fields": [{"name": "t1value", "type": "string"}],
                            "owner": "@charles",
                            "business_owner": "jdoe.last@gmail.com",
                            "data_governance.team_owner": "Finance",
                            "has_pii": True,
                            "int_property": 1,
                            "double_property": 2.5,
                        }
                    ),
                    schema_type="AVRO",
                ),
                subject="topic1-value",
                version=1,
            ),
        )
    }

    # Mock the kafka consumer
    mock_kafka_instance = mock_kafka_consumer.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    # Mock the schema registry client
    # - mock get_subjects: all subjects in topic_subject_schema_map
    mock_schema_registry_client.return_value.get_subjects.return_value = [
        v.subject for v in chain(*topic_subject_schema_map.values())
    ]

    # - mock get_latest_version
    def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
        for registered_schema in chain(*topic_subject_schema_map.values()):
            if registered_schema.subject == subject_name:
                return registered_schema
        return None

    mock_schema_registry_client.return_value.get_latest_version = (
        mock_get_latest_version
    )

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "connection": {"bootstrap": "localhost:9092"},
            "ingest_schemas_as_entities": True,
            "meta_mapping": {
                "owner": {
                    "match": "^@(.*)",
                    "operation": "add_owner",
                    "config": {"owner_type": "user"},
                },
                "business_owner": {
                    "match": ".*",
                    "operation": "add_owner",
                    "config": {"owner_type": "user"},
                },
                "has_pii": {
                    "match": True,
                    "operation": "add_tag",
                    "config": {"tag": "has_pii_test"},
                },
                "int_property": {
                    "match": 1,
                    "operation": "add_tag",
                    "config": {"tag": "int_meta_property"},
                },
                "double_property": {
                    "match": 2.5,
                    "operation": "add_term",
                    "config": {"term": "double_meta_property"},
                },
                "data_governance.team_owner": {
                    "match": "Finance",
                    "operation": "add_term",
                    "config": {"term": "Finance_test"},
                },
            },
        },
        ctx,
    )
    # The 1 topic yields 3 workunits (snapshot, subTypes, browsePathsV2), the 2 schema
    #   entities yield 6 more, and the meta mappings yield 4 (2 glossary terms + 2 tags),
    #   for 13 workunits in total.
    workunits = [w for w in kafka_source.get_workunits()]
    assert len(workunits) == 13

    # workunit[0] - DatasetSnapshot

    mce = workunits[0].metadata
    assert isinstance(mce, MetadataChangeEvent)

    ownership_aspect = [
        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass)
    ][0]
    assert ownership_aspect == make_ownership_aspect_from_urn_list(
        [
            make_owner_urn("charles", OwnerType.USER),
            make_owner_urn("jdoe.last@gmail.com", OwnerType.USER),
        ],
        "SERVICE",
    )

    tags_aspect = [
        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass)
    ][0]
    assert tags_aspect == make_global_tag_aspect_with_tag_list(
        ["has_pii_test", "int_meta_property"]
    )

    terms_aspect = [
        asp
        for asp in mce.proposedSnapshot.aspects
        if isinstance(asp, GlossaryTermsClass)
    ][0]
    assert terms_aspect == make_glossary_terms_aspect_from_urn_list(
        [
            "urn:li:glossaryTerm:Finance_test",
            "urn:li:glossaryTerm:double_meta_property",
        ]
    )

    # workunit[1] - subtypes

    assert isinstance(workunits[1].metadata, MetadataChangeProposalWrapper)
    assert workunits[1].metadata.aspectName == "subTypes"

    # workunit[2] - browse paths

    assert isinstance(workunits[2].metadata, MetadataChangeProposalWrapper)
    assert workunits[2].metadata.aspectName == "browsePathsV2"

    # workunit[3] - DatasetSnapshot

    mce = workunits[3].metadata
    assert isinstance(mce, MetadataChangeEvent)

    # workunit[4] - subtypes

    assert isinstance(workunits[4].metadata, MetadataChangeProposalWrapper)
    assert workunits[4].metadata.aspectName == "subTypes"

    # workunit[5] - browse paths

    assert isinstance(workunits[5].metadata, MetadataChangeProposalWrapper)
    assert workunits[5].metadata.aspectName == "browsePathsV2"

    # workunit[6] - DatasetSnapshot

    mce = workunits[6].metadata
    assert isinstance(mce, MetadataChangeEvent)
    ownership_aspect = [
        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass)
    ][0]
    assert ownership_aspect == make_ownership_aspect_from_urn_list(
        [
            make_owner_urn("charles", OwnerType.USER),
            make_owner_urn("jdoe.last@gmail.com", OwnerType.USER),
        ],
        "SERVICE",
    )

    tags_aspect = [
        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass)
    ][0]
    assert tags_aspect == make_global_tag_aspect_with_tag_list(
        ["has_pii_test", "int_meta_property"]
    )

    terms_aspect = [
        asp
        for asp in mce.proposedSnapshot.aspects
        if isinstance(asp, GlossaryTermsClass)
    ][0]
    assert terms_aspect == make_glossary_terms_aspect_from_urn_list(
        [
            "urn:li:glossaryTerm:Finance_test",
            "urn:li:glossaryTerm:double_meta_property",
        ]
    )

    # workunit[7] - subtypes

    assert isinstance(workunits[7].metadata, MetadataChangeProposalWrapper)
    assert workunits[7].metadata.aspectName == "subTypes"

    # workunit[8] - browse paths

    assert isinstance(workunits[8].metadata, MetadataChangeProposalWrapper)
    assert workunits[8].metadata.aspectName == "browsePathsV2"

    # workunit[9] - glossary terms

    assert isinstance(workunits[9].metadata, MetadataChangeProposalWrapper)
    assert workunits[9].metadata.aspectName == "glossaryTermKey"

    # workunit[10] - glossary terms

    assert isinstance(workunits[10].metadata, MetadataChangeProposalWrapper)
    assert workunits[10].metadata.aspectName == "glossaryTermKey"

    # workunit[11] - tags

    assert isinstance(workunits[11].metadata, MetadataChangeProposalWrapper)
    assert workunits[11].metadata.aspectName == "tagKey"

    # workunit[12] - tags

    assert isinstance(workunits[12].metadata, MetadataChangeProposalWrapper)
    assert workunits[12].metadata.aspectName == "tagKey"


def test_kafka_source_oauth_cb_configuration():
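    """oauth_cb must be configured as a '<python-module>:<function-name>' string;
    passing a callable should raise a ConfigurationError."""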
    with pytest.raises(
        ConfigurationError,
        match=(
            "oauth_cb must be a string representing python function reference "
            "in the format <python-module>:<function-name>."
        ),
    ):
        KafkaSourceConfig.parse_obj(
            {
                "connection": {
                    "bootstrap": "foobar:9092",
                    "consumer_config": {
                        "oauth_cb": test_kafka_ignore_warnings_on_schema_type
                    },
                }
            }
        )