from itertools import chain
from typing import Dict, Optional, Tuple
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka.schema_registry.schema_registry_client import (
    RegisteredSchema,
    Schema,
)

from datahub.emitter.mce_builder import (
    make_dataplatform_instance_urn,
    make_dataset_urn,
    make_dataset_urn_with_platform_instance,
)
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.kafka import KafkaSource, KafkaSourceConfig
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
    BrowsePathsClass,
    DataPlatformInstanceClass,
    KafkaSchemaClass,
    SchemaMetadataClass,
)


@pytest.fixture
def mock_admin_client():
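    """Patch the AdminClient used by the Kafka source so tests run without a real cluster."""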
    with patch("datahub.ingestion.source.kafka.AdminClient", autospec=True) as mock:
        yield mock


@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_configuration(mock_kafka):
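    """Creating a KafkaSource from a minimal connection config should instantiate exactly one Consumer."""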
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource(
        KafkaSourceConfig.parse_obj({"connection": {"bootstrap": "foobar:9092"}}),
        ctx,
    )
    kafka_source.close()
    assert mock_kafka.call_count == 1


@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_wildcard_topic(mock_kafka, mock_admin_client):
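    """Without a topic pattern, every topic returned by list_topics() is ingested (the mock exposes two topics, yielding four workunits)."""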
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource(
        KafkaSourceConfig.parse_obj({"connection": {"bootstrap": "localhost:9092"}}),
        ctx,
    )
    workunits = list(kafka_source.get_workunits())

    first_mce = workunits[0].metadata
    assert isinstance(first_mce, MetadataChangeEvent)
    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
    assert len(workunits) == 4


@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_topic_pattern(mock_kafka, mock_admin_client):
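    """Topic allow patterns are honored: an exact "test" pattern matches a single topic, while the regex "test.*" also picks up "test2"."""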
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
    assert len(workunits) == 2

    mock_cluster_metadata.topics = {"test": None, "test2": None, "bazbaz": None}
    ctx = PipelineContext(run_id="test2")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test.*"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]
    assert len(workunits) == 4


@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_with_platform_instance(mock_kafka, mock_admin_client):
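    """When platform_instance is configured, the dataset URN, the DataPlatformInstance aspect, and the browse path should all reflect it."""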
    PLATFORM_INSTANCE = "kafka_cluster"
    PLATFORM = "kafka"
    TOPIC_NAME = "test"

    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {TOPIC_NAME: None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "connection": {"bootstrap": "localhost:9092"},
            "platform_instance": PLATFORM_INSTANCE,
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    # We should only have 1 topic + sub-type wu.
    assert len(workunits) == 2
    assert isinstance(workunits[0], MetadataWorkUnit)
    assert isinstance(workunits[0].metadata, MetadataChangeEvent)
    proposed_snap = workunits[0].metadata.proposedSnapshot
    assert proposed_snap.urn == make_dataset_urn_with_platform_instance(
        platform=PLATFORM,
        name=TOPIC_NAME,
        platform_instance=PLATFORM_INSTANCE,
        env="PROD",
    )

    # DataPlatform aspect should be present when platform_instance is configured
    data_platform_aspects = [
        asp for asp in proposed_snap.aspects if type(asp) == DataPlatformInstanceClass
    ]
    assert len(data_platform_aspects) == 1
    assert data_platform_aspects[0].instance == make_dataplatform_instance_urn(
        PLATFORM, PLATFORM_INSTANCE
    )

    # The default browse path should include the platform_instance value
    browse_path_aspects = [
        asp for asp in proposed_snap.aspects if type(asp) == BrowsePathsClass
    ]
    assert len(browse_path_aspects) == 1
    assert f"/prod/{PLATFORM}/{PLATFORM_INSTANCE}" in browse_path_aspects[0].paths


@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_no_platform_instance(mock_kafka, mock_admin_client):
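    """Without platform_instance, a plain dataset URN is emitted, no DataPlatformInstance aspect is attached, and the browse path contains only the platform."""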
    PLATFORM = "kafka"
    TOPIC_NAME = "test"

    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {TOPIC_NAME: None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {"connection": {"bootstrap": "localhost:9092"}},
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    # We should only have 1 topic + sub-type wu.
    assert len(workunits) == 2
    assert isinstance(workunits[0], MetadataWorkUnit)
    assert isinstance(workunits[0].metadata, MetadataChangeEvent)
    proposed_snap = workunits[0].metadata.proposedSnapshot
    assert proposed_snap.urn == make_dataset_urn(
        platform=PLATFORM,
        name=TOPIC_NAME,
        env="PROD",
    )

    # DataPlatform aspect should not be present when platform_instance is not configured
    data_platform_aspects = [
        asp for asp in proposed_snap.aspects if type(asp) == DataPlatformInstanceClass
    ]
    assert len(data_platform_aspects) == 0

    # The default browse path should contain only the platform (no platform_instance segment)
    browse_path_aspects = [
        asp for asp in proposed_snap.aspects if type(asp) == BrowsePathsClass
    ]
    assert len(browse_path_aspects) == 1
    assert f"/prod/{PLATFORM}" in browse_path_aspects[0].paths


@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_close(mock_kafka, mock_admin_client):
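    """Closing the source should close the underlying Kafka consumer exactly once."""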
    mock_kafka_instance = mock_kafka.return_value
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test.*"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    kafka_source.close()
    assert mock_kafka_instance.close.call_count == 1


@patch(
    "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
    autospec=True,
)
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_schema_registry_subject_name_strategies(
    mock_kafka_consumer, mock_schema_registry_client, mock_admin_client
):
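    """Schemas registered under TopicNameStrategy, RecordNameStrategy (mapped via topic_subject_map),
    and TopicRecordNameStrategy should all resolve to their topics; a topic with no registered schema
    gets no SchemaMetadata aspect.
    """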
    # Setup the topic to key/value schema mappings for all types of schema registry subject name strategies.
    # <key=topic_name, value=(<key_schema>, <value_schema>)>
    topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = {
        # TopicNameStrategy is used for subject
        "topic1": (
            RegisteredSchema(
                schema_id="schema_id_2",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic1-key",
                version=1,
            ),
            RegisteredSchema(
                schema_id="schema_id_1",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic1Value", "namespace": "test.acryl", "fields": [{"name":"t1value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic1-value",
                version=1,
            ),
        ),
        # RecordNameStrategy is used for subject
        "topic2": (
            RegisteredSchema(
                schema_id="schema_id_3",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic2Key", "namespace": "test.acryl", "fields": [{"name":"t2key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="test.acryl.Topic2Key",
                version=1,
            ),
            RegisteredSchema(
                schema_id="schema_id_4",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic2Value", "namespace": "test.acryl", "fields": [{"name":"t2value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="test.acryl.Topic2Value",
                version=1,
            ),
        ),
        # TopicRecordNameStrategy is used for subject
        "topic3": (
            RegisteredSchema(
                schema_id="schema_id_4",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic3Key", "namespace": "test.acryl", "fields": [{"name":"t3key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic3-test.acryl.Topic3Key-key",
                version=1,
            ),
            RegisteredSchema(
                schema_id="schema_id_5",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic3Value", "namespace": "test.acryl", "fields": [{"name":"t3value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic3-test.acryl.Topic3Value-value",
                version=1,
            ),
        ),
    }

    # Mock the kafka consumer
    mock_kafka_instance = mock_kafka_consumer.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map.keys()}
    mock_cluster_metadata.topics["schema_less_topic"] = None
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    # Mock the schema registry client
    # - mock get_subjects: all subjects in topic_subject_schema_map
    mock_schema_registry_client.return_value.get_subjects.return_value = [
        v.subject for v in chain(*topic_subject_schema_map.values())
    ]

    # - mock get_latest_version
    def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
        for registered_schema in chain(*topic_subject_schema_map.values()):
            if registered_schema.subject == subject_name:
                return registered_schema
        return None

    mock_schema_registry_client.return_value.get_latest_version = (
        mock_get_latest_version
    )

    # Test the kafka source
    source_config = {
        "connection": {"bootstrap": "localhost:9092"},
        # Setup the topic_subject_map for topic2 which uses RecordNameStrategy
        "topic_subject_map": {
            "topic2-key": "test.acryl.Topic2Key",
            "topic2-value": "test.acryl.Topic2Value",
        },
    }
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(source_config, ctx)
    workunits = list(kafka_source.get_workunits())

    mock_kafka_consumer.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
    assert len(workunits) == 8
    i: int = -1
    for wu in workunits:
        assert isinstance(wu, MetadataWorkUnit)
        if not isinstance(wu.metadata, MetadataChangeEvent):
            continue
        mce: MetadataChangeEvent = wu.metadata
        i += 1

        if i < len(topic_subject_schema_map.keys()):
            # First 3 workunits (topics) must have schemaMetadata aspect
            assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass)
            schemaMetadataAspect: SchemaMetadataClass = mce.proposedSnapshot.aspects[1]
            assert isinstance(schemaMetadataAspect.platformSchema, KafkaSchemaClass)
            # Make sure the schema name is present in topic_subject_schema_map.
            assert schemaMetadataAspect.schemaName in topic_subject_schema_map
            # Make sure the schema_str matches for the key schema.
            assert (
                schemaMetadataAspect.platformSchema.keySchema
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    0
                ].schema.schema_str
            )
            # Make sure the schema_type matches for the key schema.
            assert (
                schemaMetadataAspect.platformSchema.keySchemaType
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    0
                ].schema.schema_type
            )
            # Make sure the schema_str matches for the value schema.
            assert (
                schemaMetadataAspect.platformSchema.documentSchema
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    1
                ].schema.schema_str
            )
            # Make sure the schema_type matches for the value schema.
            assert (
                schemaMetadataAspect.platformSchema.documentSchemaType
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    1
                ].schema.schema_type
            )
            # Make sure we have 2 fields, one from the key schema & one from the value schema.
            assert len(schemaMetadataAspect.fields) == 2
        else:
            # Last topic('schema_less_topic') has no schema defined in the registry.
            # The schemaMetadata aspect should not be present for this.
            for aspect in mce.proposedSnapshot.aspects:
                assert not isinstance(aspect, SchemaMetadataClass)


@pytest.mark.parametrize(
    "ignore_warnings_on_schema_type",
    [
        pytest.param(
            False,
            id="ignore_warnings_on_schema_type-FALSE",
        ),
        pytest.param(
            True,
            id="ignore_warnings_on_schema_type-TRUE",
        ),
    ],
)
@patch(
    "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
    autospec=True,
)
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_ignore_warnings_on_schema_type(
    mock_kafka_consumer,
    mock_schema_registry_client,
    mock_admin_client,
    ignore_warnings_on_schema_type,
):
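    """A registered schema with an unrecognized schema_type should raise a report warning unless ignore_warnings_on_schema_type is enabled."""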
    # define the key and value schemas for topic1
    topic1_key_schema = RegisteredSchema(
        schema_id="schema_id_2",
        schema=Schema(
            schema_str="{}",
            schema_type="UNKNOWN_TYPE",
        ),
        subject="topic1-key",
        version=1,
    )
    topic1_value_schema = RegisteredSchema(
        schema_id="schema_id_1",
        schema=Schema(
            schema_str="{}",
            schema_type="UNKNOWN_TYPE",
        ),
        subject="topic1-value",
        version=1,
    )

    # Mock the kafka consumer
    mock_kafka_instance = mock_kafka_consumer.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"topic1": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    # Mock the schema registry client
    mock_schema_registry_client.return_value.get_subjects.return_value = [
        "topic1-key",
        "topic1-value",
    ]

    # - mock get_latest_version
    def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
        if subject_name == "topic1-key":
            return topic1_key_schema
        elif subject_name == "topic1-value":
            return topic1_value_schema
        return None

    mock_schema_registry_client.return_value.get_latest_version = (
        mock_get_latest_version
    )

    # Test the kafka source
    source_config = {
        "connection": {"bootstrap": "localhost:9092"},
        "ignore_warnings_on_schema_type": f"{ignore_warnings_on_schema_type}",
    }
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(source_config, ctx)

    workunits = list(kafka_source.get_workunits())

    assert len(workunits) == 2
    if ignore_warnings_on_schema_type:
        assert not kafka_source.report.warnings
    else:
        assert kafka_source.report.warnings


@patch("datahub.ingestion.source.kafka.AdminClient", autospec=True)
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_succeeds_with_admin_client_init_error(
    mock_kafka, mock_kafka_admin_client
):
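    """Ingestion should still produce workunits even if constructing the AdminClient raises."""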
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    mock_kafka_admin_client.side_effect = Exception()

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()

    mock_kafka_admin_client.assert_called_once()

    assert len(workunits) == 2


@patch("datahub.ingestion.source.kafka.AdminClient", autospec=True)
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_succeeds_with_describe_configs_error(
    mock_kafka, mock_kafka_admin_client
):
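    """Ingestion should still produce workunits even if AdminClient.describe_configs raises."""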
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    mock_admin_client_instance = mock_kafka_admin_client.return_value
    mock_admin_client_instance.describe_configs.side_effect = Exception()

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()

    mock_kafka_admin_client.assert_called_once()
    mock_admin_client_instance.describe_configs.assert_called_once()

    assert len(workunits) == 2