import json
from itertools import chain
from typing import Dict, Optional, Tuple
from unittest.mock import MagicMock, patch

import pytest
from confluent_kafka.schema_registry.schema_registry_client import (
    RegisteredSchema,
    Schema,
)
from freezegun import freeze_time

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mce_builder import (
    OwnerType,
    make_dataplatform_instance_urn,
    make_dataset_urn,
    make_dataset_urn_with_platform_instance,
    make_global_tag_aspect_with_tag_list,
    make_glossary_terms_aspect_from_urn_list,
    make_owner_urn,
    make_ownership_aspect_from_urn_list,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.kafka.kafka import KafkaSource, KafkaSourceConfig
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.metadata.schema_classes import (
    BrowsePathsClass,
    DataPlatformInstanceClass,
    GlobalTagsClass,
    GlossaryTermsClass,
    KafkaSchemaClass,
    OwnershipClass,
    SchemaMetadataClass,
)
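

# All tests below exercise KafkaSource against mocked confluent_kafka clients
# (Consumer, AdminClient, and the schema registry client), so no real broker or
# schema registry is contacted.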


@pytest.fixture
def mock_admin_client():
    with patch(
        "datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True
    ) as mock:
        yield mock


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_configuration(mock_kafka):
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource(
        KafkaSourceConfig.parse_obj({"connection": {"bootstrap": "foobar:9092"}}),
        ctx,
    )
    kafka_source.close()
    assert mock_kafka.call_count == 1


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_wildcard_topic(mock_kafka, mock_admin_client):
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource(
        KafkaSourceConfig.parse_obj({"connection": {"bootstrap": "localhost:9092"}}),
        ctx,
    )
    workunits = list(kafka_source.get_workunits())

    first_mce = workunits[0].metadata
    assert isinstance(first_mce, MetadataChangeEvent)
    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
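    # Each ingested topic yields 3 workunits: a dataset MCE, a subTypes MCP, and a
    # browsePathsV2 MCP, so the 2 topics above produce 6 workunits.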
    assert len(workunits) == 6


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_topic_pattern(mock_kafka, mock_admin_client):
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
    # Only the "test" topic is allowed, so we get 3 workunits.
    assert len(workunits) == 3

    mock_cluster_metadata.topics = {"test": None, "test2": None, "bazbaz": None}
    ctx = PipelineContext(run_id="test2")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test.*"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]
    # Both "test" and "test2" match the pattern, so we get 6 workunits.
    assert len(workunits) == 6


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_with_platform_instance(mock_kafka, mock_admin_client):
    PLATFORM_INSTANCE = "kafka_cluster"
    PLATFORM = "kafka"
    TOPIC_NAME = "test"

    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {TOPIC_NAME: None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "connection": {"bootstrap": "localhost:9092"},
            "platform_instance": PLATFORM_INSTANCE,
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    # The single topic should yield exactly 3 workunits: dataset MCE + sub-type + browse paths.
    assert len(workunits) == 3
    assert isinstance(workunits[0], MetadataWorkUnit)
    assert isinstance(workunits[0].metadata, MetadataChangeEvent)
    proposed_snap = workunits[0].metadata.proposedSnapshot
    assert proposed_snap.urn == make_dataset_urn_with_platform_instance(
        platform=PLATFORM,
        name=TOPIC_NAME,
        platform_instance=PLATFORM_INSTANCE,
        env="PROD",
    )

    # The DataPlatformInstance aspect should be present when platform_instance is configured.
    data_platform_aspects = [
        asp
        for asp in proposed_snap.aspects
        if isinstance(asp, DataPlatformInstanceClass)
    ]
    assert len(data_platform_aspects) == 1
    assert data_platform_aspects[0].instance == make_dataplatform_instance_urn(
        PLATFORM, PLATFORM_INSTANCE
    )

    # The default browse path should include the platform_instance value.
    browse_path_aspects = [
        asp for asp in proposed_snap.aspects if isinstance(asp, BrowsePathsClass)
    ]
    assert len(browse_path_aspects) == 1
    assert f"/prod/{PLATFORM}/{PLATFORM_INSTANCE}" in browse_path_aspects[0].paths


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_no_platform_instance(mock_kafka, mock_admin_client):
    PLATFORM = "kafka"
    TOPIC_NAME = "test"

    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {TOPIC_NAME: None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {"connection": {"bootstrap": "localhost:9092"}},
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    # The single topic should yield exactly 3 workunits: dataset MCE + sub-type + browse paths.
    assert len(workunits) == 3
    assert isinstance(workunits[0], MetadataWorkUnit)
    assert isinstance(workunits[0].metadata, MetadataChangeEvent)
    proposed_snap = workunits[0].metadata.proposedSnapshot
    assert proposed_snap.urn == make_dataset_urn(
        platform=PLATFORM,
        name=TOPIC_NAME,
        env="PROD",
    )

    # The DataPlatformInstance aspect should not be present when platform_instance
    # is not configured.
    data_platform_aspects = [
        asp
        for asp in proposed_snap.aspects
        if isinstance(asp, DataPlatformInstanceClass)
    ]
    assert len(data_platform_aspects) == 0

    # Without a platform instance, the default browse path contains only the platform.
    browse_path_aspects = [
        asp for asp in proposed_snap.aspects if isinstance(asp, BrowsePathsClass)
    ]
    assert len(browse_path_aspects) == 1
    assert f"/prod/{PLATFORM}" in browse_path_aspects[0].paths


@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_close(mock_kafka, mock_admin_client):
    mock_kafka_instance = mock_kafka.return_value
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test.*"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    kafka_source.close()
    assert mock_kafka_instance.close.call_count == 1


@patch(
    "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
    autospec=True,
)
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_schema_registry_subject_name_strategies(
    mock_kafka_consumer, mock_schema_registry_client, mock_admin_client
):
    # Set up the topic -> (key_schema, value_schema) mappings, covering all three
    # schema registry subject name strategies.
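    # For reference, the subject names each strategy produces for these schemas:
    #   TopicNameStrategy:        "<topic>-key" / "<topic>-value"             (topic1)
    #   RecordNameStrategy:       "<record fullname>"                         (topic2)
    #   TopicRecordNameStrategy:  "<topic>-<record fullname>-key" / "-value"  (topic3)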
    topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = {
        # TopicNameStrategy is used for subject
        "topic1": (
            RegisteredSchema(
                schema_id="schema_id_2",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic1-key",
                version=1,
            ),
            RegisteredSchema(
                schema_id="schema_id_1",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic1Value", "namespace": "test.acryl", "fields": [{"name":"t1value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic1-value",
                version=1,
            ),
        ),
        # RecordNameStrategy is used for subject
        "topic2": (
            RegisteredSchema(
                schema_id="schema_id_3",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic2Key", "namespace": "test.acryl", "fields": [{"name":"t2key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="test.acryl.Topic2Key",
                version=1,
            ),
            RegisteredSchema(
                schema_id="schema_id_4",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic2Value", "namespace": "test.acryl", "fields": [{"name":"t2value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="test.acryl.Topic2Value",
                version=1,
            ),
        ),
        # TopicRecordNameStrategy is used for subject
        "topic3": (
            RegisteredSchema(
                schema_id="schema_id_4",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic3Key", "namespace": "test.acryl", "fields": [{"name":"t3key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic3-test.acryl.Topic3Key-key",
                version=1,
            ),
            RegisteredSchema(
                schema_id="schema_id_5",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic3Value", "namespace": "test.acryl", "fields": [{"name":"t3value", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic3-test.acryl.Topic3Value-value",
                version=1,
            ),
        ),
    }

    # Mock the kafka consumer
    mock_kafka_instance = mock_kafka_consumer.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map}
    mock_cluster_metadata.topics["schema_less_topic"] = None
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    # Mock the schema registry client
    # - mock get_subjects: all subjects in topic_subject_schema_map
    mock_schema_registry_client.return_value.get_subjects.return_value = [
        v.subject for v in chain(*topic_subject_schema_map.values())
    ]

    # - mock get_latest_version
    def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
        for registered_schema in chain(*topic_subject_schema_map.values()):
            if registered_schema.subject == subject_name:
                return registered_schema
        return None

    mock_schema_registry_client.return_value.get_latest_version = (
        mock_get_latest_version
    )

    # Test the kafka source
    source_config = {
        "connection": {"bootstrap": "localhost:9092"},
        # Set up the topic_subject_map for topic2, which uses RecordNameStrategy.
        "topic_subject_map": {
            "topic2-key": "test.acryl.Topic2Key",
            "topic2-value": "test.acryl.Topic2Value",
        },
        "ingest_schemas_as_entities": True,
    }
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(source_config, ctx)
    workunits = list(kafka_source.get_workunits())

    mock_kafka_consumer.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()
    # 10 dataset entities are emitted: 4 topics (3 with schemas and 1 schemaless)
    # plus 6 schema entities (a key and a value schema for each of the 3 topics).
    # Each entity produces an MCE, a subTypes MCP, and a browsePathsV2 MCP,
    # giving 10 * 3 = 30 workunits.
    assert len(workunits) == 30
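    # Iterate over the MCE workunits only; the MCP workunits (subTypes,
    # browsePathsV2) are skipped, and i counts the MCEs seen so far.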
    i: int = -1
    for wu in workunits:
        assert isinstance(wu, MetadataWorkUnit)
        if not isinstance(wu.metadata, MetadataChangeEvent):
            continue
        mce: MetadataChangeEvent = wu.metadata
        i += 1

        # Only the topic named "schema_less_topic" has no schema metadata; the MCEs
        # for the other topics and for the schema entities all carry a
        # SchemaMetadata aspect.
        if i < len(topic_subject_schema_map.keys()):
            # The first 3 MCEs (topics with schemas) must have a schemaMetadata aspect.
            assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass)
            schemaMetadataAspect: SchemaMetadataClass = mce.proposedSnapshot.aspects[1]
            assert isinstance(schemaMetadataAspect.platformSchema, KafkaSchemaClass)
            # Make sure the schema name is present in topic_subject_schema_map.
            assert schemaMetadataAspect.schemaName in topic_subject_schema_map
            # Make sure the schema_str matches for the key schema.
            assert (
                schemaMetadataAspect.platformSchema.keySchema
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    0
                ].schema.schema_str
            )
            # Make sure the schema_type matches for the key schema.
            assert (
                schemaMetadataAspect.platformSchema.keySchemaType
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    0
                ].schema.schema_type
            )
            # Make sure the schema_str matches for the value schema.
            assert (
                schemaMetadataAspect.platformSchema.documentSchema
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    1
                ].schema.schema_str
            )
            # Make sure the schema_type matches for the value schema.
            assert (
                schemaMetadataAspect.platformSchema.documentSchemaType
                == topic_subject_schema_map[schemaMetadataAspect.schemaName][
                    1
                ].schema.schema_type
            )
            # Make sure we have 2 fields, one from the key schema & one from the value schema.
            assert len(schemaMetadataAspect.fields) == 2
        elif i == len(topic_subject_schema_map.keys()):
            # The last topic ("schema_less_topic") has no schema defined in the
            # registry, so the schemaMetadata aspect should not be present.
            for aspect in mce.proposedSnapshot.aspects:
                assert not isinstance(aspect, SchemaMetadataClass)
        else:
            # The remaining MCEs (the 6 schema entities) must have a schemaMetadata aspect.
            assert isinstance(mce.proposedSnapshot.aspects[1], SchemaMetadataClass)
            schemaMetadataAspectObj: SchemaMetadataClass = mce.proposedSnapshot.aspects[
                1
            ]
            assert isinstance(schemaMetadataAspectObj.platformSchema, KafkaSchemaClass)


@pytest.mark.parametrize(
    "ignore_warnings_on_schema_type",
    [
        pytest.param(
            False,
            id="ignore_warnings_on_schema_type-FALSE",
        ),
        pytest.param(
            True,
            id="ignore_warnings_on_schema_type-TRUE",
        ),
    ],
)
@patch(
    "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
    autospec=True,
)
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_ignore_warnings_on_schema_type(
    mock_kafka_consumer,
    mock_schema_registry_client,
    mock_admin_client,
    ignore_warnings_on_schema_type,
):
    # define the key and value schemas for topic1
    topic1_key_schema = RegisteredSchema(
        schema_id="schema_id_2",
        schema=Schema(
            schema_str="{}",
            schema_type="UNKNOWN_TYPE",
        ),
        subject="topic1-key",
        version=1,
    )
    topic1_value_schema = RegisteredSchema(
        schema_id="schema_id_1",
        schema=Schema(
            schema_str="{}",
            schema_type="UNKNOWN_TYPE",
        ),
        subject="topic1-value",
        version=1,
    )

    # Mock the kafka consumer
    mock_kafka_instance = mock_kafka_consumer.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"topic1": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    # Mock the schema registry client
    mock_schema_registry_client.return_value.get_subjects.return_value = [
        "topic1-key",
        "topic1-value",
    ]

    # - mock get_latest_version
    def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
        if subject_name == "topic1-key":
            return topic1_key_schema
        elif subject_name == "topic1-value":
            return topic1_value_schema
        return None

    mock_schema_registry_client.return_value.get_latest_version = (
        mock_get_latest_version
    )
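
    # An unrecognized schema_type ("UNKNOWN_TYPE") normally adds a warning to the
    # source report; ignore_warnings_on_schema_type suppresses it.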
    # Test the kafka source
    source_config = {
        "connection": {"bootstrap": "localhost:9092"},
        "ignore_warnings_on_schema_type": f"{ignore_warnings_on_schema_type}",
    }
    ctx = PipelineContext(run_id="test")
    kafka_source = KafkaSource.create(source_config, ctx)

    workunits = list(kafka_source.get_workunits())
    assert len(workunits) == 3
    if ignore_warnings_on_schema_type:
        assert not kafka_source.report.warnings
    else:
        assert kafka_source.report.warnings


@patch("datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True)
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_succeeds_with_admin_client_init_error(
    mock_kafka, mock_kafka_admin_client
):
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    mock_kafka_admin_client.side_effect = Exception()
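
    # A failure while constructing the AdminClient should be non-fatal: the source
    # is still expected to emit workunits for the allowed topics.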
    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()

    mock_kafka_admin_client.assert_called_once()

    assert len(workunits) == 3


@patch("datahub.ingestion.source.kafka.kafka.AdminClient", autospec=True)
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_succeeds_with_describe_configs_error(
    mock_kafka, mock_kafka_admin_client
):
    mock_kafka_instance = mock_kafka.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {"test": None, "foobar": None, "bazbaz": None}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    mock_admin_client_instance = mock_kafka_admin_client.return_value
    mock_admin_client_instance.describe_configs.side_effect = Exception()
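
    # Likewise, a describe_configs() failure should be non-fatal: workunits are
    # still emitted for the allowed topics.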
    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "topic_patterns": {"allow": ["test"]},
            "connection": {"bootstrap": "localhost:9092"},
        },
        ctx,
    )
    workunits = [w for w in kafka_source.get_workunits()]

    mock_kafka.assert_called_once()
    mock_kafka_instance.list_topics.assert_called_once()

    mock_kafka_admin_client.assert_called_once()
    mock_admin_client_instance.describe_configs.assert_called_once()

    assert len(workunits) == 3


@freeze_time("2023-09-20 10:00:00")
@patch(
    "datahub.ingestion.source.confluent_schema_registry.SchemaRegistryClient",
    autospec=True,
)
@patch("datahub.ingestion.source.kafka.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_topic_meta_mappings(
    mock_kafka_consumer, mock_schema_registry_client, mock_admin_client
):
    # Set up the topic -> (key_schema, value_schema) mapping for topic1. The value
    # schema carries the meta properties that the meta_mapping config below acts on.
    topic_subject_schema_map: Dict[str, Tuple[RegisteredSchema, RegisteredSchema]] = {
        "topic1": (
            RegisteredSchema(
                schema_id="schema_id_2",
                schema=Schema(
                    schema_str='{"type":"record", "name":"Topic1Key", "namespace": "test.acryl", "fields": [{"name":"t1key", "type": "string"}]}',
                    schema_type="AVRO",
                ),
                subject="topic1-key",
                version=1,
            ),
            RegisteredSchema(
                schema_id="schema_id_1",
                schema=Schema(
                    schema_str=json.dumps(
                        {
                            "type": "record",
                            "name": "Topic1Value",
                            "namespace": "test.acryl",
                            "fields": [{"name": "t1value", "type": "string"}],
                            "owner": "@charles",
                            "business_owner": "jdoe.last@gmail.com",
                            "data_governance.team_owner": "Finance",
                            "has_pii": True,
                            "int_property": 1,
                            "double_property": 2.5,
                        }
                    ),
                    schema_type="AVRO",
                ),
                subject="topic1-value",
                version=1,
            ),
        )
    }

    # Mock the kafka consumer
    mock_kafka_instance = mock_kafka_consumer.return_value
    mock_cluster_metadata = MagicMock()
    mock_cluster_metadata.topics = {k: None for k in topic_subject_schema_map}
    mock_kafka_instance.list_topics.return_value = mock_cluster_metadata

    # Mock the schema registry client
    # - mock get_subjects: all subjects in topic_subject_schema_map
    mock_schema_registry_client.return_value.get_subjects.return_value = [
        v.subject for v in chain(*topic_subject_schema_map.values())
    ]

    # - mock get_latest_version
    def mock_get_latest_version(subject_name: str) -> Optional[RegisteredSchema]:
        for registered_schema in chain(*topic_subject_schema_map.values()):
            if registered_schema.subject == subject_name:
                return registered_schema
        return None

    mock_schema_registry_client.return_value.get_latest_version = (
        mock_get_latest_version
    )
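
    # Each meta_mapping entry below matches a property of the value schema above
    # and maps it onto an aspect: add_owner -> Ownership, add_tag -> GlobalTags,
    # add_term -> GlossaryTerms.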
    ctx = PipelineContext(run_id="test1")
    kafka_source = KafkaSource.create(
        {
            "connection": {"bootstrap": "localhost:9092"},
            "ingest_schemas_as_entities": True,
            "meta_mapping": {
                "owner": {
                    "match": "^@(.*)",
                    "operation": "add_owner",
                    "config": {"owner_type": "user"},
                },
                "business_owner": {
                    "match": ".*",
                    "operation": "add_owner",
                    "config": {"owner_type": "user"},
                },
                "has_pii": {
                    "match": True,
                    "operation": "add_tag",
                    "config": {"tag": "has_pii_test"},
                },
                "int_property": {
                    "match": 1,
                    "operation": "add_tag",
                    "config": {"tag": "int_meta_property"},
                },
                "double_property": {
                    "match": 2.5,
                    "operation": "add_term",
                    "config": {"term": "double_meta_property"},
                },
                "data_governance.team_owner": {
                    "match": "Finance",
                    "operation": "add_term",
                    "config": {"term": "Finance_test"},
                },
            },
        },
        ctx,
    )
    # 3 dataset entities (the topic plus its key and value schema entities, since
    # ingest_schemas_as_entities is enabled) each emit an MCE, a subTypes MCP, and
    # a browsePathsV2 MCP (9 workunits); the meta mapping adds 2 glossaryTermKey
    # and 2 tagKey MCPs, for 13 workunits in total.
    workunits = [w for w in kafka_source.get_workunits()]
    assert len(workunits) == 13
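
    # Expected ordering: workunits 0-2 cover the topic, 3-5 the key schema entity,
    # 6-8 the value schema entity, and 9-12 the glossary term and tag key MCPs.
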
    # workunit[0] - DatasetSnapshot
    mce = workunits[0].metadata
    assert isinstance(mce, MetadataChangeEvent)

    ownership_aspect = [
        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass)
    ][0]
    assert ownership_aspect == make_ownership_aspect_from_urn_list(
        [
            make_owner_urn("charles", OwnerType.USER),
            make_owner_urn("jdoe.last@gmail.com", OwnerType.USER),
        ],
        "SERVICE",
    )

    tags_aspect = [
        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass)
    ][0]
    assert tags_aspect == make_global_tag_aspect_with_tag_list(
        ["has_pii_test", "int_meta_property"]
    )

    terms_aspect = [
        asp
        for asp in mce.proposedSnapshot.aspects
        if isinstance(asp, GlossaryTermsClass)
    ][0]
    assert terms_aspect == make_glossary_terms_aspect_from_urn_list(
        [
            "urn:li:glossaryTerm:Finance_test",
            "urn:li:glossaryTerm:double_meta_property",
        ]
    )

    # workunit[1] - subTypes
    assert isinstance(workunits[1].metadata, MetadataChangeProposalWrapper)
    assert workunits[1].metadata.aspectName == "subTypes"

    # workunit[2] - browse paths
    assert isinstance(workunits[2].metadata, MetadataChangeProposalWrapper)
    assert workunits[2].metadata.aspectName == "browsePathsV2"

    # workunit[3] - DatasetSnapshot (key schema entity)
    mce = workunits[3].metadata
    assert isinstance(mce, MetadataChangeEvent)

    # workunit[4] - subTypes
    assert isinstance(workunits[4].metadata, MetadataChangeProposalWrapper)
    assert workunits[4].metadata.aspectName == "subTypes"

    # workunit[5] - browse paths
    assert isinstance(workunits[5].metadata, MetadataChangeProposalWrapper)
    assert workunits[5].metadata.aspectName == "browsePathsV2"

    # workunit[6] - DatasetSnapshot (value schema entity, which carries the
    # meta-mapped aspects as well)
    mce = workunits[6].metadata
    assert isinstance(mce, MetadataChangeEvent)
    ownership_aspect = [
        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, OwnershipClass)
    ][0]
    assert ownership_aspect == make_ownership_aspect_from_urn_list(
        [
            make_owner_urn("charles", OwnerType.USER),
            make_owner_urn("jdoe.last@gmail.com", OwnerType.USER),
        ],
        "SERVICE",
    )

    tags_aspect = [
        asp for asp in mce.proposedSnapshot.aspects if isinstance(asp, GlobalTagsClass)
    ][0]
    assert tags_aspect == make_global_tag_aspect_with_tag_list(
        ["has_pii_test", "int_meta_property"]
    )

    terms_aspect = [
        asp
        for asp in mce.proposedSnapshot.aspects
        if isinstance(asp, GlossaryTermsClass)
    ][0]
    assert terms_aspect == make_glossary_terms_aspect_from_urn_list(
        [
            "urn:li:glossaryTerm:Finance_test",
            "urn:li:glossaryTerm:double_meta_property",
        ]
    )

    # workunit[7] - subTypes
    assert isinstance(workunits[7].metadata, MetadataChangeProposalWrapper)
    assert workunits[7].metadata.aspectName == "subTypes"

    # workunit[8] - browse paths
    assert isinstance(workunits[8].metadata, MetadataChangeProposalWrapper)
    assert workunits[8].metadata.aspectName == "browsePathsV2"

    # workunit[9] - glossary term key
    assert isinstance(workunits[9].metadata, MetadataChangeProposalWrapper)
    assert workunits[9].metadata.aspectName == "glossaryTermKey"

    # workunit[10] - glossary term key
    assert isinstance(workunits[10].metadata, MetadataChangeProposalWrapper)
    assert workunits[10].metadata.aspectName == "glossaryTermKey"

    # workunit[11] - tag key
    assert isinstance(workunits[11].metadata, MetadataChangeProposalWrapper)
    assert workunits[11].metadata.aspectName == "tagKey"

    # workunit[12] - tag key
    assert isinstance(workunits[12].metadata, MetadataChangeProposalWrapper)
    assert workunits[12].metadata.aspectName == "tagKey"
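

# oauth_cb must be supplied as a "<python-module>:<function-name>" string
# reference; passing a python callable directly is rejected at config parse time.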
def test_kafka_source_oauth_cb_configuration():
    with pytest.raises(
        ConfigurationError,
        match=(
            "oauth_cb must be a string representing python function reference "
            "in the format <python-module>:<function-name>."
        ),
    ):
        KafkaSourceConfig.parse_obj(
            {
                "connection": {
                    "bootstrap": "foobar:9092",
                    "consumer_config": {
                        "oauth_cb": test_kafka_ignore_warnings_on_schema_type
                    },
                }
            }
        )