fix(ingest/kafka): Remove topic from kafka browse path (#7398)

This commit is contained in:
Andrew Sikowitz 2023-02-22 18:38:08 -05:00 committed by GitHub
parent 98f3f93442
commit e82e284982
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 15 deletions

View File

@@ -243,14 +243,10 @@ class KafkaSource(StatefulIngestionSourceBase):
dataset_snapshot.aspects.append(schema_metadata)
# 3. Attach browsePaths aspect
browse_path_suffix = (
f"{self.source_config.platform_instance}/{topic}"
if self.source_config.platform_instance
else topic
)
browse_path = BrowsePathsClass(
[f"/{self.source_config.env.lower()}/{self.platform}/{browse_path_suffix}"]
)
browse_path_str = f"/{self.source_config.env.lower()}/{self.platform}"
if self.source_config.platform_instance:
browse_path_str += f"/{self.source_config.platform_instance}"
browse_path = BrowsePathsClass([browse_path_str])
dataset_snapshot.aspects.append(browse_path)
custom_props = self.build_custom_properties(

View File

@@ -84,7 +84,7 @@
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/kafka/value_topic"
"/prod/kafka"
]
}
},
@@ -187,7 +187,7 @@
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/kafka/key_topic"
"/prod/kafka"
]
}
},
@@ -326,7 +326,7 @@
{
"com.linkedin.pegasus2avro.common.BrowsePaths": {
"paths": [
"/prod/kafka/key_value_topic"
"/prod/kafka"
]
}
},
@@ -382,4 +382,4 @@
"runId": "kafka-test"
}
}
]
]

View File

@@ -10,6 +10,7 @@ from confluent_kafka.schema_registry.schema_registry_client import (
from datahub.emitter.mce_builder import (
make_dataplatform_instance_urn,
make_dataset_urn,
make_dataset_urn_with_platform_instance,
)
from datahub.ingestion.api.common import PipelineContext
@@ -143,10 +144,49 @@ def test_kafka_source_workunits_with_platform_instance(mock_kafka, mock_admin_client):
asp for asp in proposed_snap.aspects if type(asp) == BrowsePathsClass
]
assert len(browse_path_aspects) == 1
assert (
f"/prod/{PLATFORM}/{PLATFORM_INSTANCE}/{TOPIC_NAME}"
in browse_path_aspects[0].paths
assert f"/prod/{PLATFORM}/{PLATFORM_INSTANCE}" in browse_path_aspects[0].paths
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)
def test_kafka_source_workunits_no_platform_instance(mock_kafka, mock_admin_client):
PLATFORM = "kafka"
TOPIC_NAME = "test"
mock_kafka_instance = mock_kafka.return_value
mock_cluster_metadata = MagicMock()
mock_cluster_metadata.topics = {TOPIC_NAME: None}
mock_kafka_instance.list_topics.return_value = mock_cluster_metadata
ctx = PipelineContext(run_id="test1")
kafka_source = KafkaSource.create(
{"connection": {"bootstrap": "localhost:9092"}},
ctx,
)
workunits = [w for w in kafka_source.get_workunits()]
# We should only have 1 topic + sub-type wu.
assert len(workunits) == 2
assert isinstance(workunits[0], MetadataWorkUnit)
assert isinstance(workunits[0].metadata, MetadataChangeEvent)
proposed_snap = workunits[0].metadata.proposedSnapshot
assert proposed_snap.urn == make_dataset_urn(
platform=PLATFORM,
name=TOPIC_NAME,
env="PROD",
)
# DataPlatform aspect should be present when platform_instance is configured
data_platform_aspects = [
asp for asp in proposed_snap.aspects if type(asp) == DataPlatformInstanceClass
]
assert len(data_platform_aspects) == 0
# The default browse path should include the platform_instance value
browse_path_aspects = [
asp for asp in proposed_snap.aspects if type(asp) == BrowsePathsClass
]
assert len(browse_path_aspects) == 1
assert f"/prod/{PLATFORM}" in browse_path_aspects[0].paths
@patch("datahub.ingestion.source.kafka.confluent_kafka.Consumer", autospec=True)