feat(datahub-actions): Adding support for Metadata Change Log in DataHub Cloud Events Source (#14497)

Co-authored-by: John Joyce <john@Mac.lan>
Authored by John Joyce on 2025-09-03 08:46:22 -07:00; committed by GitHub
parent 9ec58e3876
commit 5971e79c76
10 changed files with 714 additions and 53 deletions

View File

@@ -1,2 +1,4 @@
PLATFORM_EVENT_TOPIC_NAME = "PlatformEvent_v1"
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME = "MetadataChangeLog_Versioned_v1"
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME = "MetadataChangeLog_Timeseries_v1"
ENTITY_CHANGE_EVENT_NAME = "entityChangeEvent"
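For orientation, a hedged sketch of how these topic constants feed the updated source config (the class and module paths come from the test imports later in this diff; `topics` accepts either a single topic string or a list):

from datahub_actions.plugin.source.acryl.constants import (
    METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
    PLATFORM_EVENT_TOPIC_NAME,
)
from datahub_actions.plugin.source.acryl.datahub_cloud_event_source import (
    DataHubEventsSourceConfig,
)

# Legacy single-topic form (string) -- the default:
legacy = DataHubEventsSourceConfig.model_validate({"topics": PLATFORM_EVENT_TOPIC_NAME})

# New multi-topic form (list):
multi = DataHubEventsSourceConfig.model_validate(
    {"topics": [PLATFORM_EVENT_TOPIC_NAME, METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME]}
)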

View File

@@ -2,23 +2,28 @@ import json
import logging
import time
from dataclasses import dataclass
from typing import Iterable, List, Optional
from typing import Dict, Iterable, List, Optional, Union, cast
from datahub.configuration import ConfigModel
from datahub.emitter.serialization_helper import post_json_transform
from datahub.ingestion.graph.client import DataHubGraph
# DataHub imports.
from datahub.metadata.schema_classes import GenericPayloadClass
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.event.event_registry import (
ENTITY_CHANGE_EVENT_V1_TYPE,
METADATA_CHANGE_LOG_EVENT_V1_TYPE,
EntityChangeEvent,
MetadataChangeLogEvent,
)
from datahub_actions.pipeline.pipeline_context import PipelineContext
from datahub_actions.plugin.source.acryl.constants import (
ENTITY_CHANGE_EVENT_NAME,
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
PLATFORM_EVENT_TOPIC_NAME,
)
from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import (
@@ -42,8 +47,15 @@ def build_entity_change_event(payload: GenericPayloadClass) -> EntityChangeEvent
raise ValueError("Failed to parse into EntityChangeEvent") from e
def build_metadata_change_log_event(msg: ExternalEvent) -> MetadataChangeLogEvent:
try:
return cast(MetadataChangeLogEvent, MetadataChangeLogEvent.from_json(msg.value))
except Exception as e:
raise ValueError("Failed to parse into MetadataChangeLogEvent") from e
class DataHubEventsSourceConfig(ConfigModel):
topic: str = PLATFORM_EVENT_TOPIC_NAME
topics: Union[str, List[str]] = PLATFORM_EVENT_TOPIC_NAME
consumer_id: Optional[str] = None # Used to store offset for the consumer.
lookback_days: Optional[int] = None
reset_offsets: Optional[bool] = False
@@ -71,20 +83,79 @@ class DataHubEventSource(EventSource):
def __init__(self, config: DataHubEventsSourceConfig, ctx: PipelineContext):
self.ctx = ctx
self.source_config = config
self.consumer_id = DataHubEventSource._get_pipeline_urn(self.ctx.pipeline_name)
self.base_consumer_id = DataHubEventSource._get_pipeline_urn(
self.ctx.pipeline_name
)
# Convert topics to a list for consistent handling
if isinstance(self.source_config.topics, str):
self.topics_list = [self.source_config.topics]
else:
self.topics_list = self.source_config.topics
# Ensure a Graph Instance was provided.
assert self.ctx.graph is not None
self.datahub_events_consumer: DataHubEventsConsumer = DataHubEventsConsumer(
# TODO: This PipelineContext provides an Acryl Graph Instance
# Initialize topic consumers
self.topic_consumers = self._initialize_topic_consumers(
topics_list=self.topics_list,
base_consumer_id=self.base_consumer_id,
graph=self.ctx.graph.graph,
consumer_id=self.consumer_id,
lookback_days=self.source_config.lookback_days,
reset_offsets=self.source_config.reset_offsets,
)
self.ack_manager = AckManager()
self.safe_to_ack_offset: Optional[str] = None
self.safe_to_ack_offsets: Dict[str, Optional[str]] = {
topic: None for topic in self.topics_list
}
def _initialize_topic_consumers(
self,
topics_list: List[str],
base_consumer_id: str,
graph: DataHubGraph,
lookback_days: Optional[int],
reset_offsets: Optional[bool],
) -> Dict[str, DataHubEventsConsumer]:
"""
Initialize DataHub consumers for each topic with appropriate consumer IDs.
Maintains backward compatibility by using the legacy consumer ID format
for single PlatformEvent_v1 deployments, and topic-suffixed IDs for
multi-topic or other single-topic deployments.
Args:
topics_list: List of topic names to create consumers for
base_consumer_id: Base consumer ID for the pipeline
graph: DataHub graph instance
lookback_days: Number of days to look back for events
reset_offsets: Whether to reset consumer offsets
Returns:
Dictionary mapping topic names to their corresponding consumers
"""
topic_consumers: Dict[str, DataHubEventsConsumer] = {}
for topic in topics_list:
# Backward compatibility: if only PlatformEvent_v1, use legacy consumer ID format
if len(topics_list) == 1 and topic == PLATFORM_EVENT_TOPIC_NAME:
topic_consumer_id = (
base_consumer_id # Legacy format for existing deployments
)
else:
topic_consumer_id = (
f"{base_consumer_id}-{topic}" # New format for multi-topic
)
topic_consumers[topic] = DataHubEventsConsumer(
graph=graph,
consumer_id=topic_consumer_id,
lookback_days=lookback_days,
reset_offsets=reset_offsets,
)
return topic_consumers
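# Illustration of the consumer ID scheme implemented above (the pipeline
# name is hypothetical; the derived IDs match the tests later in this diff):
#   base_consumer_id = "urn:li:dataHubAction:my-pipeline"
#   topics = ["PlatformEvent_v1"]  (legacy single-topic deployment)
#     -> PlatformEvent_v1 uses "urn:li:dataHubAction:my-pipeline"
#   topics = ["PlatformEvent_v1", "MetadataChangeLog_Versioned_v1"]
#     -> PlatformEvent_v1 uses "urn:li:dataHubAction:my-pipeline-PlatformEvent_v1"
#     -> MetadataChangeLog_Versioned_v1 uses "urn:li:dataHubAction:my-pipeline-MetadataChangeLog_Versioned_v1"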
@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "EventSource":
@@ -93,7 +164,7 @@ class DataHubEventSource(EventSource):
def events(self) -> Iterable[EventEnvelope]:
logger.info("Starting DataHub Cloud events source...")
logger.info(f"Subscribing to the following topic: {self.source_config.topic}")
logger.info(f"Subscribing to the following topics: {self.topics_list}")
self.running = True
yield from self._poll_and_process_events()
@@ -116,26 +187,37 @@ class DataHubEventSource(EventSource):
raise Exception(
f"Failed to process all events successfully after specified time {self.source_config.event_processing_time_max_duration_seconds}! If more time is required, please increase the timeout using this config. {self.ack_manager.acks.values()}",
)
logger.debug(
f"Successfully processed events up to offset id {self.safe_to_ack_offset}"
)
self.safe_to_ack_offset = self.datahub_events_consumer.offset_id
logger.debug(f"Safe to ack offset: {self.safe_to_ack_offset}")
# Update safe-to-ack offsets for each topic
for topic in self.topics_list:
consumer = self.topic_consumers[topic]
self.safe_to_ack_offsets[topic] = consumer.offset_id
logger.debug(
f"Safe to ack offset for {topic}: {self.safe_to_ack_offsets[topic]}"
)
events_response = self.datahub_events_consumer.poll_events(
topic=self.source_config.topic, poll_timeout_seconds=2
)
# Poll events from all topics using their respective consumers
all_events = []
total_events = 0
for topic in self.topics_list:
consumer = self.topic_consumers[topic]
events_response = consumer.poll_events(
topic=topic, poll_timeout_seconds=2
)
total_events += len(events_response.events)
# Process events from this topic
for msg in events_response.events:
all_events.append((topic, msg))
# Handle Idle Timeout
num_events = len(events_response.events)
if num_events == 0:
if total_events == 0:
if last_idle_response_timestamp == 0:
last_idle_response_timestamp = (
self._get_current_timestamp_seconds()
)
if self._should_idle_timeout(
num_events, last_idle_response_timestamp
total_events, last_idle_response_timestamp
):
logger.info("Exiting main loop due to idle timeout")
return
@@ -144,8 +226,9 @@ class DataHubEventSource(EventSource):
last_idle_response_timestamp = 0 # Reset the idle timeout
event_envelopes: List[EventEnvelope] = []
for msg in events_response.events:
for event_envelope in self.handle_pe(msg):
for topic, msg in all_events:
# Route events based on topic type
for event_envelope in self._route_event_by_topic(topic, msg):
event_envelope.meta = self.ack_manager.get_meta(event_envelope)
event_envelopes.append(event_envelope)
@@ -157,6 +240,20 @@ class DataHubEventSource(EventSource):
logger.info("DataHub Events consumer exiting main loop")
def _route_event_by_topic(
self, topic: str, msg: ExternalEvent
) -> Iterable[EventEnvelope]:
"""Route events to appropriate handlers based on topic type."""
if topic == PLATFORM_EVENT_TOPIC_NAME:
yield from self.handle_pe(msg)
elif topic in [
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
]:
yield from self.handle_mcl(msg)
else:
logger.warning(f"Unknown topic: {topic}, skipping event")
@staticmethod
def handle_pe(msg: ExternalEvent) -> Iterable[EventEnvelope]:
value: dict = json.loads(msg.value)
@@ -167,14 +264,19 @@ class DataHubEventSource(EventSource):
event = build_entity_change_event(payload)
yield EventEnvelope(ENTITY_CHANGE_EVENT_V1_TYPE, event, {})
@staticmethod
def handle_mcl(msg: ExternalEvent) -> Iterable[EventEnvelope]:
event = build_metadata_change_log_event(msg)
yield EventEnvelope(METADATA_CHANGE_LOG_EVENT_V1_TYPE, event, {})
def close(self) -> None:
if self.datahub_events_consumer:
self.running = False
if self.safe_to_ack_offset:
self.datahub_events_consumer.commit_offsets(
offset_id=self.safe_to_ack_offset
)
self.datahub_events_consumer.close()
self.running = False
# Close and commit offsets for each topic consumer
for topic, consumer in self.topic_consumers.items():
safe_offset = self.safe_to_ack_offsets.get(topic)
if safe_offset:
consumer.commit_offsets(offset_id=safe_offset)
consumer.close()
def ack(self, event: EventEnvelope, processed: bool = True) -> None:
self.ack_manager.ack(event.meta, processed=processed)
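Taken together, a minimal consumption sketch for the updated source. This is hedged: `ctx` is assumed to be a PipelineContext wired with a DataHub graph (the constructor asserts this), and real deployments would build the source from pipeline YAML via create():

from datahub_actions.event.event_registry import (
    ENTITY_CHANGE_EVENT_V1_TYPE,
    METADATA_CHANGE_LOG_EVENT_V1_TYPE,
)
from datahub_actions.plugin.source.acryl.datahub_cloud_event_source import (
    DataHubEventSource,
    DataHubEventsSourceConfig,
)

config = DataHubEventsSourceConfig.model_validate(
    {"topics": ["PlatformEvent_v1", "MetadataChangeLog_Versioned_v1"]}
)
source = DataHubEventSource(config, ctx)  # ctx: PipelineContext with a graph attached (assumed)
for envelope in source.events():
    if envelope.event_type == METADATA_CHANGE_LOG_EVENT_V1_TYPE:
        mcl = envelope.event  # MetadataChangeLogEvent (entityUrn, aspectName, changeType, ...)
    elif envelope.event_type == ENTITY_CHANGE_EVENT_V1_TYPE:
        ece = envelope.event  # EntityChangeEvent
    source.ack(envelope)  # acks via the AckManager using envelope.meta
source.close()  # commits each topic's safe offset, then closes its consumer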

View File

@@ -1,5 +1,6 @@
# test_datahub_event_source.py
import json
from typing import List, cast
from unittest.mock import MagicMock, patch
@@ -8,14 +9,22 @@ import pytest
from datahub_actions.event.event_envelope import EventEnvelope
from datahub_actions.event.event_registry import (
ENTITY_CHANGE_EVENT_V1_TYPE,
METADATA_CHANGE_LOG_EVENT_V1_TYPE,
EntityChangeEvent,
MetadataChangeLogEvent,
)
from datahub_actions.pipeline.pipeline_context import PipelineContext
from datahub_actions.plugin.source.acryl.constants import (
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
PLATFORM_EVENT_TOPIC_NAME,
)
# Import your source + config classes from the correct module path.
from datahub_actions.plugin.source.acryl.datahub_cloud_event_source import (
DataHubEventSource,
DataHubEventsSourceConfig,
build_metadata_change_log_event,
)
from datahub_actions.plugin.source.acryl.datahub_cloud_events_ack_manager import (
AckManager,
@@ -50,7 +59,7 @@ def base_config_dict() -> dict:
We will parse this into DataHubEventsSourceConfig in each test.
"""
return {
"topic": "PlatformEvent_v1",
"topics": "PlatformEvent_v1",
"lookback_days": None,
"reset_offsets": False,
"kill_after_idle_timeout": True,
@@ -67,8 +76,8 @@ def test_create_source(
"""
source = DataHubEventSource.create(base_config_dict, mock_pipeline_context)
assert isinstance(source, DataHubEventSource)
# The consumer_id on the instance includes the action prefix from pipeline_name
assert source.consumer_id == "urn:li:dataHubAction:test-pipeline"
# The base_consumer_id on the instance includes the action prefix from pipeline_name
assert source.base_consumer_id == "urn:li:dataHubAction:test-pipeline"
def test_get_pipeline_urn() -> None:
@@ -88,12 +97,15 @@ def test_source_initialization(
"""
Validate that DataHubEventSource constructor sets up DataHubEventsConsumer and AckManager.
"""
config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict)
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
assert source.consumer_id == "urn:li:dataHubAction:test-pipeline"
assert isinstance(source.datahub_events_consumer, DataHubEventsConsumer)
assert source.base_consumer_id == "urn:li:dataHubAction:test-pipeline"
assert isinstance(source.topic_consumers, dict)
assert len(source.topic_consumers) == 1 # Default single topic
assert "PlatformEvent_v1" in source.topic_consumers
assert isinstance(source.topic_consumers["PlatformEvent_v1"], DataHubEventsConsumer)
assert isinstance(source.ack_manager, AckManager)
assert source.safe_to_ack_offset is None
assert source.safe_to_ack_offsets == {"PlatformEvent_v1": None}
def test_events_with_no_events(
@@ -101,12 +113,13 @@ def test_events_with_no_events(
) -> None:
base_config_dict["idle_timeout_duration_seconds"] = 1
base_config_dict["kill_after_idle_timeout"] = True
config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict)
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
mock_consumer = MagicMock(spec=DataHubEventsConsumer)
mock_consumer.offset_id = "offset-100" # Set the mocked offset_id
source.datahub_events_consumer = mock_consumer
# Replace the consumer for the default topic
source.topic_consumers["PlatformEvent_v1"] = mock_consumer
# We'll simulate that poll_events returns a response with 0 events repeatedly.
empty_response = ExternalEventsResponse(offsetId="offset-100", count=0, events=[])
@@ -130,12 +143,13 @@ def test_events_with_some_events(
"""
If poll_events returns events, verify that the source yields them and resets idle timer.
"""
config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict)
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
mock_consumer = MagicMock(spec=DataHubEventsConsumer)
mock_consumer.offset_id = "offset-100"
source.datahub_events_consumer = mock_consumer
# Replace the consumer for the default topic
source.topic_consumers["PlatformEvent_v1"] = mock_consumer
mock_ack_manager = MagicMock(spec=AckManager)
mock_ack_manager.outstanding_acks.side_effect = [0]
@@ -165,7 +179,9 @@ def test_events_with_some_events(
assert emitted[0].event_type == ENTITY_CHANGE_EVENT_V1_TYPE
assert isinstance(emitted[0].event, EntityChangeEvent)
mock_ack_manager.get_meta.assert_called_once()
assert source.safe_to_ack_offset == "offset-100" # Previous offset.
assert (
source.safe_to_ack_offsets["PlatformEvent_v1"] == "offset-100"
) # Previous offset.
assert mock_consumer.poll_events.call_count == 1
@@ -177,7 +193,7 @@ def test_outstanding_acks_timeout(
due to event_processing_time_max_duration_seconds.
"""
base_config_dict["event_processing_time_max_duration_seconds"] = 2
config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict)
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
mock_ack_manager = MagicMock(spec=AckManager)
@@ -188,7 +204,8 @@ def test_outstanding_acks_timeout(
mock_consumer = MagicMock(spec=DataHubEventsConsumer)
mock_consumer.offset_id = "offset-100"
source.datahub_events_consumer = mock_consumer
# Replace the consumer for the default topic
source.topic_consumers["PlatformEvent_v1"] = mock_consumer
source.running = True
@@ -223,7 +240,7 @@ def test_ack(mock_pipeline_context: PipelineContext, base_config_dict: dict) ->
"""
Verify that ack() calls ack_manager.ack with the event's metadata.
"""
config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict)
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
mock_ack_manager = MagicMock(spec=AckManager)
@@ -242,13 +259,14 @@ def test_close(mock_pipeline_context: PipelineContext, base_config_dict: dict) -
"""
Verify that close() stops the source, commits offsets, and calls consumer.close().
"""
config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict)
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
mock_consumer = MagicMock(spec=DataHubEventsConsumer)
source.datahub_events_consumer = mock_consumer
# Replace the consumer for the default topic
source.topic_consumers["PlatformEvent_v1"] = mock_consumer
source.safe_to_ack_offset = "some-offset-id"
source.safe_to_ack_offsets["PlatformEvent_v1"] = "some-offset-id"
source.close()
assert source.running is False
@@ -263,7 +281,7 @@ def test_should_idle_timeout(
Verify the idle timeout logic in _should_idle_timeout().
"""
base_config_dict["idle_timeout_duration_seconds"] = 5
config_model = DataHubEventsSourceConfig.parse_obj(base_config_dict)
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
# If events > 0 => always False
@@ -288,3 +306,226 @@ def test_should_idle_timeout(
is True
)
assert source.running is False
def test_multiple_topics_config(
mock_pipeline_context: PipelineContext, base_config_dict: dict
) -> None:
"""
Test that the source properly handles multiple topics configuration.
"""
# Test with list of topics
base_config_dict["topics"] = [
PLATFORM_EVENT_TOPIC_NAME,
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
]
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
assert source.topics_list == [
PLATFORM_EVENT_TOPIC_NAME,
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
]
# Each topic should have its own consumer
assert len(source.topic_consumers) == 3
assert PLATFORM_EVENT_TOPIC_NAME in source.topic_consumers
assert METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME in source.topic_consumers
assert METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME in source.topic_consumers
# Each topic should have its own offset tracking
assert len(source.safe_to_ack_offsets) == 3
for topic in source.topics_list:
assert topic in source.safe_to_ack_offsets
assert source.safe_to_ack_offsets[topic] is None # Initially None
def test_single_topic_config_as_string(
mock_pipeline_context: PipelineContext, base_config_dict: dict
) -> None:
"""
Test that the source properly handles single topic configuration as string.
"""
# topics config as string should be converted to list
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
assert source.topics_list == [PLATFORM_EVENT_TOPIC_NAME]
def test_backward_compatibility_single_platform_event(
mock_pipeline_context: PipelineContext, base_config_dict: dict
) -> None:
"""
Test backward compatibility: single PlatformEvent_v1 topic uses legacy consumer ID format.
"""
# Default config has only PlatformEvent_v1
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
# Should use legacy consumer ID format (no topic suffix)
platform_consumer = source.topic_consumers[PLATFORM_EVENT_TOPIC_NAME]
assert platform_consumer.consumer_id == "urn:li:dataHubAction:test-pipeline"
def test_new_format_for_multiple_topics(
mock_pipeline_context: PipelineContext, base_config_dict: dict
) -> None:
"""
Test that multiple topics use new consumer ID format with topic suffixes.
"""
base_config_dict["topics"] = [
PLATFORM_EVENT_TOPIC_NAME,
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
]
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
# Should use new consumer ID format with topic suffixes
platform_consumer = source.topic_consumers[PLATFORM_EVENT_TOPIC_NAME]
mcl_consumer = source.topic_consumers[METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME]
assert (
platform_consumer.consumer_id
== "urn:li:dataHubAction:test-pipeline-PlatformEvent_v1"
)
assert (
mcl_consumer.consumer_id
== "urn:li:dataHubAction:test-pipeline-MetadataChangeLog_Versioned_v1"
)
def test_new_format_for_single_non_platform_topic(
mock_pipeline_context: PipelineContext, base_config_dict: dict
) -> None:
"""
Test that single non-PlatformEvent topic uses new consumer ID format.
"""
base_config_dict["topics"] = METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
# Should use new consumer ID format even for single topic (since it's not PlatformEvent_v1)
mcl_consumer = source.topic_consumers[METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME]
assert (
mcl_consumer.consumer_id
== "urn:li:dataHubAction:test-pipeline-MetadataChangeLog_Versioned_v1"
)
def test_handle_mcl() -> None:
"""
Test that handle_mcl properly processes MetadataChangeLogEvent with proper aspect encoding.
"""
# Create a realistic MCL event based on the documented format
mcl_value = {
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)",
"entityKeyAspect": None,
"changeType": "UPSERT",
"aspectName": "globalTags",
"aspect": {
"value": '{"tags":[{"tag":"urn:li:tag:pii"}]}', # JSON string as per API format
"contentType": "application/json",
},
"systemMetadata": {
"lastObserved": 1651516475595,
"runId": "test-run-id",
"registryName": "testRegistry",
"registryVersion": "1.0.0",
"properties": None,
},
"previousAspectValue": None,
"previousSystemMetadata": None,
"created": {
"time": 1651516475594,
"actor": "urn:li:corpuser:datahub",
"impersonator": None,
},
}
msg = ExternalEvent(contentType="application/json", value=json.dumps(mcl_value))
envelopes: List[EventEnvelope] = list(DataHubEventSource.handle_mcl(msg))
assert len(envelopes) == 1
assert envelopes[0].event_type == METADATA_CHANGE_LOG_EVENT_V1_TYPE
assert isinstance(envelopes[0].event, MetadataChangeLogEvent)
# Verify the event was parsed correctly
mcl_event = envelopes[0].event
assert mcl_event.entityUrn == "urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)"
assert mcl_event.entityType == "dataset"
assert mcl_event.aspectName == "globalTags"
assert mcl_event.changeType == "UPSERT"
def test_route_event_by_topic(
mock_pipeline_context: PipelineContext, base_config_dict: dict
) -> None:
"""
Test that _route_event_by_topic properly routes events based on topic.
"""
config_model = DataHubEventsSourceConfig.model_validate(base_config_dict)
source = DataHubEventSource(config_model, mock_pipeline_context)
# Test platform event routing
pe_value = '{"header":{"timestampMillis":1737170481713},"name":"entityChangeEvent","payload":{"value":"{\\"auditStamp\\":{\\"actor\\":\\"urn:li:corpuser:test\\",\\"time\\":1737170481713},\\"entityUrn\\":\\"urn:li:dataset:(urn:li:dataPlatform:hive,test,PROD)\\",\\"entityType\\":\\"dataset\\",\\"modifier\\":\\"urn:li:tag:test\\",\\"category\\":\\"TAG\\",\\"operation\\":\\"ADD\\",\\"version\\":0}","contentType":"application/json"}}'
pe_msg = ExternalEvent(contentType="application/json", value=pe_value)
pe_envelopes = list(source._route_event_by_topic(PLATFORM_EVENT_TOPIC_NAME, pe_msg))
assert len(pe_envelopes) == 1
assert pe_envelopes[0].event_type == ENTITY_CHANGE_EVENT_V1_TYPE
# Test MCL event routing with mocked handler
mcl_msg = ExternalEvent(contentType="application/json", value='{"test": "mcl"}')
with patch.object(source, "handle_mcl") as mock_handle_mcl:
mock_envelope = EventEnvelope(
METADATA_CHANGE_LOG_EVENT_V1_TYPE, MagicMock(), {}
)
mock_handle_mcl.return_value = [mock_envelope]
mcl_envelopes = list(
source._route_event_by_topic(
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, mcl_msg
)
)
assert len(mcl_envelopes) == 1
assert mcl_envelopes[0].event_type == METADATA_CHANGE_LOG_EVENT_V1_TYPE
mock_handle_mcl.assert_called_once_with(mcl_msg)
# Test unknown topic (should return no events)
unknown_envelopes = list(source._route_event_by_topic("unknown_topic", pe_msg))
assert len(unknown_envelopes) == 0
def test_build_metadata_change_log_event() -> None:
"""
Test that build_metadata_change_log_event properly creates MetadataChangeLogEvent.
"""
# Create a realistic MCL event based on documented format
mcl_value = {
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:hive,test_dataset,PROD)",
"changeType": "UPSERT",
"aspectName": "datasetProfile",
"aspect": {
"value": '{"rowCount": 1000, "columnCount": 5}', # JSON string
"contentType": "application/json",
},
"systemMetadata": {"lastObserved": 1651516475595, "runId": "test-run"},
"created": {"time": 1651516475594, "actor": "urn:li:corpuser:datahub"},
}
msg = ExternalEvent(contentType="application/json", value=json.dumps(mcl_value))
event = build_metadata_change_log_event(msg)
assert isinstance(event, MetadataChangeLogEvent)
assert (
event.entityUrn == "urn:li:dataset:(urn:li:dataPlatform:hive,test_dataset,PROD)"
)
assert event.entityType == "dataset"
assert event.aspectName == "datasetProfile"
assert event.changeType == "UPSERT"

View File

@@ -28,7 +28,16 @@ import org.apache.kafka.common.TopicPartition;
public class ExternalEventsService {
public static final String PLATFORM_EVENT_TOPIC_NAME = "PlatformEvent_v1";
private static final Set<String> ALLOWED_TOPICS = Set.of(PLATFORM_EVENT_TOPIC_NAME);
public static final String METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME =
"MetadataChangeLog_Versioned_v1";
public static final String METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME =
"MetadataChangeLog_Timeseries_v1";
private static final Set<String> ALLOWED_TOPICS =
Set.of(
PLATFORM_EVENT_TOPIC_NAME,
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME);
private final KafkaConsumerPool consumerPool;
private final ObjectMapper objectMapper;
private final Map<String, String>
@@ -88,7 +97,7 @@ public class ExternalEventsService {
consumer.assign(partitions);
Map<TopicPartition, Long> partitionOffsets =
getPartitionOffsets(topic, offsetId, consumer, partitions, lookbackWindowDays);
getPartitionOffsets(finalTopic, offsetId, consumer, partitions, lookbackWindowDays);
for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) {
consumer.seek(entry.getKey(), entry.getValue());

View File

@@ -37,6 +37,12 @@ public class ExternalEventsServiceTest {
MockitoAnnotations.initMocks(this);
when(consumerPool.borrowConsumer()).thenReturn(kafkaConsumer);
topicNames.put(ExternalEventsService.PLATFORM_EVENT_TOPIC_NAME, "CustomerSpecificTopicName");
topicNames.put(
ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
"CustomerSpecificVersionedTopicName");
topicNames.put(
ExternalEventsService.METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
"CustomerSpecificTimeseriesTopicName");
service = new ExternalEventsService(consumerPool, objectMapper, topicNames, 10, 100);
// Setup to simulate fetching records from Kafka
@@ -99,6 +105,38 @@ public class ExternalEventsServiceTest {
verify(kafkaConsumer, atLeastOnce()).poll(any());
}
@Test
public void testPollValidMetadataChangeLogVersionedTopic() throws Exception {
// Mocking Kafka and ObjectMapper behaviors
when(kafkaConsumer.partitionsFor(anyString())).thenReturn(Collections.emptyList());
when(objectMapper.writeValueAsString(any())).thenReturn("encodedString");
// Execute
ExternalEvents events =
service.poll(
ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME, null, 10, 5, null);
// Validate
assertNotNull(events);
verify(kafkaConsumer, atLeastOnce()).poll(any());
}
@Test
public void testPollValidMetadataChangeLogTimeseriesTopic() throws Exception {
// Mocking Kafka and ObjectMapper behaviors
when(kafkaConsumer.partitionsFor(anyString())).thenReturn(Collections.emptyList());
when(objectMapper.writeValueAsString(any())).thenReturn("encodedString");
// Execute
ExternalEvents events =
service.poll(
ExternalEventsService.METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME, null, 10, 5, null);
// Validate
assertNotNull(events);
verify(kafkaConsumer, atLeastOnce()).poll(any());
}
@Test(expectedExceptions = UnsupportedTopicException.class)
public void testPollInvalidTopic() throws Exception {
service.poll("InvalidTopic", null, 10, 5, null);

View File

@@ -1,5 +1,7 @@
package com.linkedin.gms.factory.event;
import static io.datahubproject.event.ExternalEventsService.METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME;
import static io.datahubproject.event.ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME;
import static io.datahubproject.event.ExternalEventsService.PLATFORM_EVENT_TOPIC_NAME;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -39,6 +41,12 @@ public class ExternalEventsServiceFactory {
private Map<String, String> buildTopicNameMappings() {
final Map<String, String> topicNames = new HashMap<>();
topicNames.put(PLATFORM_EVENT_TOPIC_NAME, topicConvention.getPlatformEventTopicName());
topicNames.put(
METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME,
topicConvention.getMetadataChangeLogVersionedTopicName());
topicNames.put(
METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME,
topicConvention.getMetadataChangeLogTimeseriesTopicName());
return topicNames;
}
}

View File

@@ -80,7 +80,7 @@ public class ExternalEventsController {
name = "topic",
required = true,
description =
"The topic to read events for. Currently only supports PlatformEvent_v1, which provides Platform Events such as EntityChangeEvent and NotificationRequestEvent.")
"The topic to read events for. Currently only supports PlatformEvent_v1, which provides Platform Events such as EntityChangeEvent and NotificationRequestEvents and MetadataChangeLog_v1, which provides all aspect updates.")
@RequestParam(name = "topic", required = true)
String topic,
@Parameter(name = "offsetId", description = "The offset to start reading the topic from")
@@ -143,6 +143,12 @@ public class ExternalEventsController {
if (Topics.PLATFORM_EVENT.equals(topic)) {
return AuthUtil.isAPIAuthorized(opContext, PoliciesConfig.GET_PLATFORM_EVENTS_PRIVILEGE);
}
if (Topics.METADATA_CHANGE_LOG_VERSIONED.equals(topic)) {
return AuthUtil.isAPIAuthorized(opContext, PoliciesConfig.GET_METADATA_CHANGE_LOG_EVENTS);
}
if (Topics.METADATA_CHANGE_LOG_TIMESERIES.equals(topic)) {
return AuthUtil.isAPIAuthorized(opContext, PoliciesConfig.GET_METADATA_CHANGE_LOG_EVENTS);
}
return false;
}
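For API callers, a hedged sketch of polling one of the new topics through this endpoint. The host and token below are placeholders; the path, query parameters, and response shape mirror the controller tests in the next file:

import requests

# Hypothetical DataHub Cloud host and personal access token (placeholders).
# Requires the GET_METADATA_CHANGE_LOG_EVENTS privilege per the authorization
# check added above.
resp = requests.get(
    "https://<your-account>.acryl.io/openapi/v1/events/poll",
    headers={"Authorization": "Bearer <token>"},
    params={
        "topic": "MetadataChangeLog_Versioned_v1",
        "limit": 100,
        "pollTimeoutSeconds": 10,
        # Optional: "offsetId": "<last-offset>", "lookbackWindowDays": 7
    },
)
resp.raise_for_status()
body = resp.json()
# body: {"count": <n>, "offsetId": "<next-offset>",
#        "events": [{"value": "...", "contentType": "application/json"}]}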

View File

@@ -1,5 +1,7 @@
package io.datahubproject.openapi.v1.event;
import static io.datahubproject.event.ExternalEventsService.METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME;
import static io.datahubproject.event.ExternalEventsService.METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME;
import static io.datahubproject.event.ExternalEventsService.PLATFORM_EVENT_TOPIC_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -299,6 +301,250 @@ public class ExternalEventsControllerTest extends AbstractTestNGSpringContextTes
.andExpect(status().isForbidden());
}
@Test
public void testPollMetadataChangeLogVersionedTopic() throws Exception {
// Setup mock authorization
when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class)))
.thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
// Setup mock response
List<ExternalEvent> events = new ArrayList<>();
ExternalEvent event = new ExternalEvent();
event.setValue(TEST_CONTENT);
event.setContentType(TEST_CONTENT_TYPE);
events.add(event);
ExternalEvents externalEvents = new ExternalEvents();
externalEvents.setEvents(events);
externalEvents.setOffsetId(TEST_OFFSET_ID);
externalEvents.setCount(1L);
when(mockEventsService.poll(
eq(METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME),
nullable(String.class),
anyInt(),
anyInt(),
nullable(Integer.class)))
.thenReturn(externalEvents);
// Execute test
mockMvc
.perform(
MockMvcRequestBuilders.get("/openapi/v1/events/poll")
.param("topic", METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME)
.param("limit", "100")
.param("pollTimeoutSeconds", "10")
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value(TEST_OFFSET_ID))
.andExpect(MockMvcResultMatchers.jsonPath("$.events[0].value").value(TEST_CONTENT))
.andExpect(
MockMvcResultMatchers.jsonPath("$.events[0].contentType").value(TEST_CONTENT_TYPE));
}
@Test
public void testPollMetadataChangeLogTimeseriesTopic() throws Exception {
// Setup mock authorization
when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class)))
.thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
// Setup mock response
List<ExternalEvent> events = new ArrayList<>();
ExternalEvent event = new ExternalEvent();
event.setValue(TEST_CONTENT);
event.setContentType(TEST_CONTENT_TYPE);
events.add(event);
ExternalEvents externalEvents = new ExternalEvents();
externalEvents.setEvents(events);
externalEvents.setOffsetId(TEST_OFFSET_ID);
externalEvents.setCount(1L);
when(mockEventsService.poll(
eq(METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME),
nullable(String.class),
anyInt(),
anyInt(),
nullable(Integer.class)))
.thenReturn(externalEvents);
// Execute test
mockMvc
.perform(
MockMvcRequestBuilders.get("/openapi/v1/events/poll")
.param("topic", METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME)
.param("limit", "100")
.param("pollTimeoutSeconds", "10")
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value(TEST_OFFSET_ID))
.andExpect(MockMvcResultMatchers.jsonPath("$.events[0].value").value(TEST_CONTENT))
.andExpect(
MockMvcResultMatchers.jsonPath("$.events[0].contentType").value(TEST_CONTENT_TYPE));
}
@Test
public void testPollMetadataChangeLogVersionedTopicWithOffset() throws Exception {
// Setup mock authorization
when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class)))
.thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
// Setup mock response
List<ExternalEvent> events = new ArrayList<>();
ExternalEvent event = new ExternalEvent();
event.setValue(TEST_CONTENT);
event.setContentType(TEST_CONTENT_TYPE);
events.add(event);
ExternalEvents externalEvents = new ExternalEvents();
externalEvents.setEvents(events);
externalEvents.setOffsetId("new-offset-id");
externalEvents.setCount(1L);
when(mockEventsService.poll(
eq(METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME),
eq(TEST_OFFSET_ID),
anyInt(),
anyInt(),
nullable(Integer.class)))
.thenReturn(externalEvents);
// Execute test
mockMvc
.perform(
MockMvcRequestBuilders.get("/openapi/v1/events/poll")
.param("topic", METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME)
.param("offsetId", TEST_OFFSET_ID)
.param("limit", "100")
.param("pollTimeoutSeconds", "10")
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value("new-offset-id"));
}
@Test
public void testPollMetadataChangeLogTimeseriesTopicWithOffset() throws Exception {
// Setup mock authorization
when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class)))
.thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
// Setup mock response
List<ExternalEvent> events = new ArrayList<>();
ExternalEvent event = new ExternalEvent();
event.setValue(TEST_CONTENT);
event.setContentType(TEST_CONTENT_TYPE);
events.add(event);
ExternalEvents externalEvents = new ExternalEvents();
externalEvents.setEvents(events);
externalEvents.setOffsetId("new-offset-id");
externalEvents.setCount(1L);
when(mockEventsService.poll(
eq(METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME),
eq(TEST_OFFSET_ID),
anyInt(),
anyInt(),
nullable(Integer.class)))
.thenReturn(externalEvents);
// Execute test
mockMvc
.perform(
MockMvcRequestBuilders.get("/openapi/v1/events/poll")
.param("topic", METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME)
.param("offsetId", TEST_OFFSET_ID)
.param("limit", "100")
.param("pollTimeoutSeconds", "10")
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value("new-offset-id"));
}
@Test
public void testPollMetadataChangeLogVersionedTopicWithLookbackWindow() throws Exception {
// Setup mock authorization
when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class)))
.thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
// Setup mock response
List<ExternalEvent> events = new ArrayList<>();
ExternalEvent event = new ExternalEvent();
event.setValue(TEST_CONTENT);
event.setContentType(TEST_CONTENT_TYPE);
events.add(event);
ExternalEvents externalEvents = new ExternalEvents();
externalEvents.setEvents(events);
externalEvents.setOffsetId(TEST_OFFSET_ID);
externalEvents.setCount(1L);
when(mockEventsService.poll(
eq(METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME),
nullable(String.class),
anyInt(),
anyInt(),
eq(7)))
.thenReturn(externalEvents);
// Execute test
mockMvc
.perform(
MockMvcRequestBuilders.get("/openapi/v1/events/poll")
.param("topic", METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME)
.param("lookbackWindowDays", "7")
.param("limit", "100")
.param("pollTimeoutSeconds", "10")
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value(TEST_OFFSET_ID));
}
@Test
public void testPollMetadataChangeLogTimeseriesTopicWithLookbackWindow() throws Exception {
// Setup mock authorization
when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class)))
.thenReturn(new AuthorizationResult(null, AuthorizationResult.Type.ALLOW, ""));
// Setup mock response
List<ExternalEvent> events = new ArrayList<>();
ExternalEvent event = new ExternalEvent();
event.setValue(TEST_CONTENT);
event.setContentType(TEST_CONTENT_TYPE);
events.add(event);
ExternalEvents externalEvents = new ExternalEvents();
externalEvents.setEvents(events);
externalEvents.setOffsetId(TEST_OFFSET_ID);
externalEvents.setCount(1L);
when(mockEventsService.poll(
eq(METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME),
nullable(String.class),
anyInt(),
anyInt(),
eq(7)))
.thenReturn(externalEvents);
// Execute test
mockMvc
.perform(
MockMvcRequestBuilders.get("/openapi/v1/events/poll")
.param("topic", METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME)
.param("lookbackWindowDays", "7")
.param("limit", "100")
.param("pollTimeoutSeconds", "10")
.accept(MediaType.APPLICATION_JSON))
.andExpect(status().isOk())
.andExpect(MockMvcResultMatchers.jsonPath("$.count").value(1))
.andExpect(MockMvcResultMatchers.jsonPath("$.offsetId").value(TEST_OFFSET_ID));
}
@Test
public void testAuditEventsSearch() throws Exception {
when(mockAuthorizerChain.authorize(any(AuthorizationRequest.class)))

View File

@@ -41,7 +41,8 @@
"MANAGE_FEATURES",
"MANAGE_SYSTEM_OPERATIONS",
"GET_PLATFORM_EVENTS",
"MANAGE_HOME_PAGE_TEMPLATES"
"MANAGE_HOME_PAGE_TEMPLATES",
"GET_METADATA_CHANGE_LOG_EVENTS"
],
"displayName": "Root User - All Platform Privileges",
"description": "Grants all platform privileges to root user.",
@@ -198,7 +199,8 @@
"MANAGE_FEATURES",
"MANAGE_SYSTEM_OPERATIONS",
"GET_PLATFORM_EVENTS",
"MANAGE_HOME_PAGE_TEMPLATES"
"MANAGE_HOME_PAGE_TEMPLATES",
"GET_METADATA_CHANGE_LOG_EVENTS"
],
"displayName": "Admins - Platform Policy",
"description": "Admins have all platform privileges.",

View File

@@ -212,6 +212,12 @@ public class PoliciesConfig {
"Get Platform Events",
"The ability to use the Events API to read Platform Events - Entity Change Events and Notification Request Events.");
public static final Privilege GET_METADATA_CHANGE_LOG_EVENTS =
Privilege.of(
"GET_METADATA_CHANGE_LOG_EVENTS",
"Get Metadata Change Log Events",
"The ability to use the Events API to read Metadata Change Log, or all low-level Metadata Change Events.");
public static final Privilege MANAGE_HOME_PAGE_TEMPLATES_PRIVILEGE =
Privilege.of(
"MANAGE_HOME_PAGE_TEMPLATES",
@@ -256,6 +262,7 @@ public class PoliciesConfig {
MANAGE_FEATURES_PRIVILEGE,
MANAGE_SYSTEM_OPERATIONS_PRIVILEGE,
GET_PLATFORM_EVENTS_PRIVILEGE,
GET_METADATA_CHANGE_LOG_EVENTS,
MANAGE_HOME_PAGE_TEMPLATES_PRIVILEGE);
// Resource Privileges //