feat(kafka): add interface design for listeners (#13637)

RyanHolstien 2025-06-02 13:39:32 -05:00 committed by GitHub
parent 7d5519f2e1
commit 5311434c70
23 changed files with 817 additions and 315 deletions

View File

@@ -10,6 +10,8 @@ dependencies {
}
implementation externalDependency.springActuator
implementation externalDependency.springKafka
implementation externalDependency.opentelemetrySdkTrace
implementation externalDependency.opentelemetrySdkMetrics
implementation externalDependency.slf4jApi
implementation(externalDependency.springBootStarterJetty) {

View File

@@ -0,0 +1,209 @@
package com.linkedin.metadata.kafka.listener;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.SystemMetadata;
import io.datahubproject.metadata.context.OperationContext;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;
@Slf4j
public abstract class AbstractKafkaListener<E, H extends EventHook<E>, R>
implements GenericKafkaListener<E, H, R> {
protected OperationContext systemOperationContext;
@Getter protected String consumerGroupId;
@Getter protected List<H> hooks;
protected boolean fineGrainedLoggingEnabled;
protected Map<String, Set<String>> aspectsToDrop;
private final Histogram kafkaLagStats =
MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));
@Override
public GenericKafkaListener<E, H, R> init(
@Nonnull OperationContext systemOperationContext,
@Nonnull String consumerGroup,
@Nonnull List<H> hooks,
boolean fineGrainedLoggingEnabled,
@Nonnull Map<String, Set<String>> aspectsToDrop) {
this.systemOperationContext = systemOperationContext;
this.consumerGroupId = consumerGroup;
this.hooks = hooks;
this.hooks.forEach(hook -> hook.init(systemOperationContext));
this.fineGrainedLoggingEnabled = fineGrainedLoggingEnabled;
this.aspectsToDrop = aspectsToDrop;
log.info(
"Enabled Hooks - Group: {} Hooks: {}",
consumerGroup,
hooks.stream().map(hook -> hook.getClass().getSimpleName()).collect(Collectors.toList()));
return this;
}
@Override
public void consume(@Nonnull final ConsumerRecord<String, R> consumerRecord) {
try {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final R record = consumerRecord.value();
log.debug(
"Got event consumer: {} key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
consumerGroupId,
consumerRecord.key(),
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.serializedValueSize(),
consumerRecord.timestamp());
MetricUtils.counter(this.getClass(), consumerGroupId + "_received_event_count").inc();
E event;
try {
event = convertRecord(record);
} catch (Exception e) {
MetricUtils.counter(this.getClass(), consumerGroupId + "_conversion_failure").inc();
log.error("Error deserializing message due to: ", e);
log.error("Message: {}", record.toString());
return;
}
// Initialize MDC context with event metadata
setMDCContext(event);
// Check if should skip processing
if (shouldSkipProcessing(event)) {
log.info("Skipping event: {}", event);
return;
}
List<String> loggingAttributes = getFineGrainedLoggingAttributes(event);
processWithHooks(event, loggingAttributes, consumerRecord.topic());
} finally {
MDC.clear();
}
}
/**
* Process the event with all registered hooks.
*
* @param event The event to process
* @param loggingAttributes Attributes for logging
* @param topic The Kafka topic the record was consumed from
*/
protected void processWithHooks(E event, List<String> loggingAttributes, String topic) {
systemOperationContext.withQueueSpan(
"consume",
getSystemMetadata(event),
topic,
() -> {
log.info(
"Invoking hooks for consumer: {} event: {}",
consumerGroupId,
getEventDisplayString(event));
// Process with each hook
for (H hook : this.hooks) {
systemOperationContext.withSpan(
hook.getClass().getSimpleName(),
() -> {
log.debug(
"Invoking hook {} for event: {}",
hook.getClass().getSimpleName(),
getEventDisplayString(event));
try {
hook.invoke(event);
} catch (Exception e) {
// Just skip this hook and continue - "at most once" processing
MetricUtils.counter(
this.getClass(), hook.getClass().getSimpleName() + "_failure")
.inc();
log.error(
"Failed to execute hook with name {}",
hook.getClass().getCanonicalName(),
e);
Span currentSpan = Span.current();
currentSpan.recordException(e);
currentSpan.setStatus(StatusCode.ERROR, e.getMessage());
currentSpan.setAttribute(MetricUtils.ERROR_TYPE, e.getClass().getName());
}
},
Stream.concat(
Stream.of(
MetricUtils.DROPWIZARD_NAME,
MetricUtils.name(
this.getClass(), hook.getClass().getSimpleName() + "_latency")),
loggingAttributes.stream())
.toArray(String[]::new));
}
MetricUtils.counter(this.getClass(), consumerGroupId + "_consumed_event_count").inc();
log.info(
"Successfully completed hooks for consumer: {} event: {}",
consumerGroupId,
getEventDisplayString(event));
},
Stream.concat(
Stream.of(
MetricUtils.DROPWIZARD_NAME, MetricUtils.name(this.getClass(), "consume")),
loggingAttributes.stream())
.toArray(String[]::new));
}
/**
* Sets MDC context based on event metadata.
*
* @param event The event to extract metadata from
*/
protected abstract void setMDCContext(E event);
/**
* Determines if this event should be skipped based on filtering rules.
*
* @param event The event to check
* @return true if event should be skipped, false otherwise
*/
protected abstract boolean shouldSkipProcessing(E event);
/**
* Gets attributes for fine-grained logging.
*
* @param event The event to extract attributes from
* @return List of attribute name-value pairs
*/
protected abstract List<String> getFineGrainedLoggingAttributes(E event);
/**
* Gets system metadata from the event for tracing.
*
* @param event The event
* @return System metadata object
*/
protected abstract SystemMetadata getSystemMetadata(E event);
/**
* Gets a display string for the event for logging.
*
* @param event The event
* @return Display string
*/
protected abstract String getEventDisplayString(E event);
}

View File

@@ -0,0 +1,167 @@
package com.linkedin.metadata.kafka.listener;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
@Slf4j
public abstract class AbstractKafkaListenerRegistrar<E, H extends EventHook<E>, R>
implements GenericKafkaListenerRegistrar<E, H, R>, InitializingBean {
protected final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
protected final KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
protected final String consumerGroupBase;
protected final List<H> hooks;
protected final ObjectMapper objectMapper;
protected AbstractKafkaListenerRegistrar(
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory,
String consumerGroupBase,
List<H> hooks,
ObjectMapper objectMapper) {
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
this.consumerGroupBase = consumerGroupBase;
this.hooks = hooks;
this.objectMapper = objectMapper;
}
@Override
public void afterPropertiesSet() {
Map<String, List<H>> hookGroups =
getEnabledHooks().stream().collect(Collectors.groupingBy(H::getConsumerGroupSuffix));
log.info(
"{} Consumer Groups: {}",
getProcessorType(),
hookGroups.keySet().stream().map(this::buildConsumerGroupName).collect(Collectors.toSet()));
hookGroups.forEach(
(key, groupHooks) -> {
KafkaListenerEndpoint kafkaListenerEndpoint =
createListenerEndpoint(buildConsumerGroupName(key), getTopicNames(), groupHooks);
registerKafkaListener(kafkaListenerEndpoint, false);
});
}
@Override
@Nonnull
public List<H> getEnabledHooks() {
return hooks.stream()
.filter(EventHook::isEnabled)
.sorted(Comparator.comparing(EventHook::executionOrder))
.toList();
}
@Override
public void registerKafkaListener(
@Nonnull KafkaListenerEndpoint kafkaListenerEndpoint, boolean startImmediately) {
kafkaListenerEndpointRegistry.registerListenerContainer(
kafkaListenerEndpoint, kafkaListenerContainerFactory, startImmediately);
}
@Override
@Nonnull
public KafkaListenerEndpoint createListenerEndpoint(
@Nonnull String consumerGroupId, @Nonnull List<String> topics, @Nonnull List<H> groupHooks) {
MethodKafkaListenerEndpoint<String, R> kafkaListenerEndpoint =
new MethodKafkaListenerEndpoint<>();
kafkaListenerEndpoint.setId(consumerGroupId);
kafkaListenerEndpoint.setGroupId(consumerGroupId);
kafkaListenerEndpoint.setAutoStartup(false);
kafkaListenerEndpoint.setTopics(topics.toArray(new String[0]));
kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
Map<String, Set<String>> aspectsToDrop = parseAspectsToDrop();
GenericKafkaListener<E, H, R> listener =
createListener(consumerGroupId, groupHooks, isFineGrainedLoggingEnabled(), aspectsToDrop);
kafkaListenerEndpoint.setBean(listener);
try {
Method consumeMethod = GenericKafkaListener.class.getMethod("consume", ConsumerRecord.class);
kafkaListenerEndpoint.setMethod(consumeMethod);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
return kafkaListenerEndpoint;
}
@Override
@Nonnull
public String buildConsumerGroupName(@Nonnull String suffix) {
if (suffix.isEmpty()) {
return consumerGroupBase;
} else {
return String.join("-", consumerGroupBase, suffix);
}
}
/**
* Gets the processor type name for logging.
*
* @return The processor type name
*/
protected abstract String getProcessorType();
/**
* Gets the list of topic names to listen to.
*
* @return List of topic names
*/
protected abstract List<String> getTopicNames();
/**
* Checks if fine-grained logging is enabled.
*
* @return true if fine-grained logging is enabled, false otherwise
*/
protected abstract boolean isFineGrainedLoggingEnabled();
/**
* Parses the aspects to drop from configuration.
*
* @return Map of entity types to sets of aspect names to drop
*/
protected Map<String, Set<String>> parseAspectsToDrop() {
String aspectsToDropConfig = getAspectsToDropConfig();
if (StringUtils.isBlank(aspectsToDropConfig)) {
return Collections.emptyMap();
} else {
JavaType type =
objectMapper.getTypeFactory().constructMapType(Map.class, String.class, Set.class);
try {
return objectMapper.readValue(aspectsToDropConfig, type);
} catch (Exception e) {
log.error("Unable to parse aspects to drop configuration: {}", aspectsToDropConfig, e);
return Collections.emptyMap();
}
}
}
/**
* Gets the configuration string for aspects to drop.
*
* @return Configuration string
*/
protected abstract String getAspectsToDropConfig();
}
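Since parseAspectsToDrop above deserializes the configured string into a Map&lt;String, Set&lt;String&gt;&gt;, here is a minimal standalone sketch (not part of this commit) of the JSON shape it expects; the entity-type and aspect names are assumptions for illustration, and "*" matches the wildcard handling in MCLKafkaListener.shouldSkipProcessing later in this commit:

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Map;
import java.util.Set;

public class AspectsToDropExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical config value: drop datasetProfile for datasets and
    // browsePaths for every entity type ("*" is the wildcard key).
    String config = "{\"dataset\": [\"datasetProfile\"], \"*\": [\"browsePaths\"]}";
    ObjectMapper objectMapper = new ObjectMapper();
    JavaType type =
        objectMapper.getTypeFactory().constructMapType(Map.class, String.class, Set.class);
    Map<String, Set<String>> aspectsToDrop = objectMapper.readValue(config, type);
    System.out.println(aspectsToDrop); // {dataset=[datasetProfile], *=[browsePaths]}
  }
}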

View File

@@ -0,0 +1,36 @@
package com.linkedin.metadata.kafka.listener;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
public interface EventHook<E> {
default EventHook<E> init(@Nonnull OperationContext systemOperationContext) {
return this;
}
/**
* Suffix for the consumer group
*
* @return suffix
*/
@Nonnull
String getConsumerGroupSuffix();
/**
* Return whether the hook is enabled or not. If not enabled, the below invoke method is not
* triggered
*/
boolean isEnabled();
/** Invoke the hook when an event is received */
void invoke(@Nonnull E event) throws Exception;
/**
* Controls hook execution ordering
*
* @return order to execute
*/
default int executionOrder() {
return 100;
}
}
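A minimal sketch of a hook built on this new interface (illustrative only; the class name and log-only behavior are assumptions, not part of this commit):

import javax.annotation.Nonnull;

import com.linkedin.metadata.kafka.listener.EventHook;
import com.linkedin.mxe.MetadataChangeLog;

public class LoggingMclHook implements EventHook<MetadataChangeLog> {
  @Nonnull
  @Override
  public String getConsumerGroupSuffix() {
    // Empty suffix keeps the hook in the base consumer group.
    return "";
  }

  @Override
  public boolean isEnabled() {
    return true;
  }

  @Override
  public void invoke(@Nonnull MetadataChangeLog event) {
    System.out.println("Observed change for urn: " + event.getEntityUrn());
  }

  // init() and executionOrder() fall back to the interface defaults (this, 100).
}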

View File

@@ -0,0 +1,67 @@
package com.linkedin.metadata.kafka.listener;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Generic interface for Kafka listeners that process events with hooks.
*
* @param <E> The event type the hook processes
* @param <H> The hook type this listener processes
* @param <R> The record type
*/
public interface GenericKafkaListener<E, H extends EventHook<E>, R> {
/**
* Initializes the listener with system context and hooks.
*
* @param systemOperationContext The operation context for the system
* @param consumerGroup The consumer group ID
* @param hooks The list of hooks to apply
* @param fineGrainedLoggingEnabled Whether to enable fine-grained logging
* @param aspectsToDrop Map of aspects to drop during processing
* @return this listener instance for chaining
*/
GenericKafkaListener<E, H, R> init(
@Nonnull OperationContext systemOperationContext,
@Nonnull String consumerGroup,
@Nonnull List<H> hooks,
boolean fineGrainedLoggingEnabled,
@Nonnull Map<String, Set<String>> aspectsToDrop);
/**
* Process a Kafka consumer record.
*
* @param consumerRecord The Kafka consumer record to process
*/
void consume(@Nonnull ConsumerRecord<String, R> consumerRecord);
/**
* Converts a generic record to the specific event type.
*
* @param record The generic record to convert
* @return The converted event object
*/
E convertRecord(@Nonnull R record) throws IOException;
/**
* Gets the consumer group ID for this listener.
*
* @return The consumer group ID
*/
@Nonnull
String getConsumerGroupId();
/**
* Gets the list of hooks used by this listener.
*
* @return The list of hooks
*/
@Nonnull
List<H> getHooks();
}
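To orient the generics, this is how the type parameters bind for the MCL case added later in this commit: E = MetadataChangeLog (the converted event), H = MetadataChangeLogHook, R = GenericRecord (the Avro record). A sketch, with the surrounding context assumed to be in scope:

// Mirrors MCLKafkaListenerRegistrar.createListener later in this commit;
// systemOperationContext, consumerGroupId, hooks and consumerRecord are
// assumed to be provided by the caller.
GenericKafkaListener<MetadataChangeLog, MetadataChangeLogHook, GenericRecord> listener =
    new MCLKafkaListener()
        .init(systemOperationContext, consumerGroupId, hooks, false, Map.of());
listener.consume(consumerRecord); // ConsumerRecord<String, GenericRecord>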

View File

@@ -0,0 +1,71 @@
package com.linkedin.metadata.kafka.listener;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.springframework.kafka.config.KafkaListenerEndpoint;
/**
* Generic interface for registering Kafka listeners for different event types.
*
* @param <E> The event type
* @param <H> The hook type
* @param <R> The record type
*/
public interface GenericKafkaListenerRegistrar<E, H extends EventHook<E>, R> {
/**
* Gets the enabled hooks for this registrar.
*
* @return List of enabled hooks sorted by execution order
*/
@Nonnull
List<H> getEnabledHooks();
/**
* Registers a Kafka listener endpoint.
*
* @param kafkaListenerEndpoint The endpoint to register
* @param startImmediately Whether to start the listener immediately
*/
void registerKafkaListener(
@Nonnull KafkaListenerEndpoint kafkaListenerEndpoint, boolean startImmediately);
/**
* Creates a listener endpoint for the given consumer group and topics.
*
* @param consumerGroupId The consumer group ID
* @param topics List of topics to listen to
* @param hooks List of hooks to apply to events
* @return The created Kafka listener endpoint
*/
@Nonnull
KafkaListenerEndpoint createListenerEndpoint(
@Nonnull String consumerGroupId, @Nonnull List<String> topics, @Nonnull List<H> hooks);
/**
* Builds a consumer group name from a suffix.
*
* @param suffix The suffix to append to the base consumer group name
* @return The complete consumer group name
*/
@Nonnull
String buildConsumerGroupName(@Nonnull String suffix);
/**
* Creates a Kafka listener instance.
*
* @param consumerGroupId The consumer group ID
* @param hooks The hooks to apply
* @param fineGrainedLoggingEnabled Whether to enable fine-grained logging
* @param aspectsToDrop Map of aspects to drop during processing
* @return The created Kafka listener
*/
@Nonnull
GenericKafkaListener<E, H, R> createListener(
@Nonnull String consumerGroupId,
@Nonnull List<H> hooks,
boolean fineGrainedLoggingEnabled,
@Nonnull Map<String, Set<String>> aspectsToDrop);
}

View File

@@ -16,7 +16,6 @@ dependencies {
implementation project(':metadata-jobs:mae-consumer')
// TODO: Extract PE consumer into separate pod.
implementation project(':metadata-jobs:pe-consumer')
implementation project(':metadata-jobs:common')
implementation(externalDependency.springBootStarterWeb) {
exclude module: "spring-boot-starter-tomcat"

View File

@@ -25,6 +25,7 @@ dependencies {
implementation project(':metadata-events:mxe-registration')
implementation project(':metadata-events:mxe-utils-avro')
implementation project(':datahub-graphql-core')
api project(':metadata-jobs:common')
implementation externalDependency.elasticSearchRest
implementation externalDependency.kafkaAvroSerde

View File

@@ -1,5 +1,7 @@
package com.linkedin.metadata.kafka;
import static com.linkedin.metadata.config.kafka.KafkaConfiguration.SIMPLE_EVENT_CONSUMER_NAME;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.linkedin.events.metadata.ChangeType;
@@ -53,7 +55,7 @@ public class DataHubUsageEventsProcessor {
@KafkaListener(
id = "${DATAHUB_USAGE_EVENT_KAFKA_CONSUMER_GROUP_ID:datahub-usage-event-consumer-job-client}",
topics = "${DATAHUB_USAGE_EVENT_NAME:" + Topics.DATAHUB_USAGE_EVENT + "}",
containerFactory = "simpleKafkaConsumer",
containerFactory = SIMPLE_EVENT_CONSUMER_NAME,
autoStartup = "false")
public void consume(final ConsumerRecord<String, String> consumerRecord) {
systemOperationContext.withSpan(

View File

@@ -1,150 +0,0 @@
package com.linkedin.metadata.kafka;
import static com.linkedin.metadata.Constants.MDC_ASPECT_NAME;
import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE;
import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE;
import static com.linkedin.metadata.Constants.MDC_ENTITY_URN;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.MetricRegistry;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.MDC;
@Slf4j
public class MCLKafkaListener {
private static final Histogram kafkaLagStats =
MetricUtils.get()
.histogram(
MetricRegistry.name(
"com.linkedin.metadata.kafka.MetadataChangeLogProcessor", "kafkaLag"));
private final String consumerGroupId;
private final List<MetadataChangeLogHook> hooks;
private final OperationContext systemOperationContext;
public MCLKafkaListener(
OperationContext systemOperationContext,
String consumerGroup,
List<MetadataChangeLogHook> hooks) {
this.systemOperationContext = systemOperationContext;
this.consumerGroupId = consumerGroup;
this.hooks = hooks;
this.hooks.forEach(hook -> hook.init(systemOperationContext));
log.info(
"Enabled MCL Hooks - Group: {} Hooks: {}",
consumerGroup,
hooks.stream().map(hook -> hook.getClass().getSimpleName()).collect(Collectors.toList()));
}
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
try {
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
final GenericRecord record = consumerRecord.value();
log.debug(
"Got MCL event consumer: {} key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
consumerGroupId,
consumerRecord.key(),
consumerRecord.topic(),
consumerRecord.partition(),
consumerRecord.offset(),
consumerRecord.serializedValueSize(),
consumerRecord.timestamp());
MetricUtils.counter(this.getClass(), consumerGroupId + "_received_mcl_count").inc();
MetadataChangeLog event;
try {
event = EventUtils.avroToPegasusMCL(record);
} catch (Exception e) {
MetricUtils.counter(
this.getClass(), consumerGroupId + "_avro_to_pegasus_conversion_failure")
.inc();
log.error("Error deserializing message due to: ", e);
log.error("Message: {}", record.toString());
return;
}
Urn entityUrn = event.getEntityUrn();
String aspectName = event.hasAspectName() ? event.getAspectName() : null;
String entityType = event.hasEntityType() ? event.getEntityType() : null;
ChangeType changeType = event.hasChangeType() ? event.getChangeType() : null;
MDC.put(MDC_ENTITY_URN, Optional.ofNullable(entityUrn).map(Urn::toString).orElse(""));
MDC.put(MDC_ASPECT_NAME, aspectName);
MDC.put(MDC_ENTITY_TYPE, entityType);
MDC.put(
MDC_CHANGE_TYPE, Optional.ofNullable(changeType).map(ChangeType::toString).orElse(""));
systemOperationContext.withQueueSpan(
"consume",
event.getSystemMetadata(),
consumerRecord.topic(),
() -> {
log.info(
"Invoking MCL hooks for consumer: {} urn: {}, aspect name: {}, entity type: {}, change type: {}",
consumerGroupId,
entityUrn,
aspectName,
entityType,
changeType);
// Here - plug in additional "custom processor hooks"
for (MetadataChangeLogHook hook : this.hooks) {
systemOperationContext.withSpan(
hook.getClass().getSimpleName(),
() -> {
log.debug(
"Invoking MCL hook {} for urn: {}",
hook.getClass().getSimpleName(),
event.getEntityUrn());
try {
hook.invoke(event);
} catch (Exception e) {
// Just skip this hook and continue. - Note that this represents "at most
// once"//
// processing.
MetricUtils.counter(
this.getClass(), hook.getClass().getSimpleName() + "_failure")
.inc();
log.error(
"Failed to execute MCL hook with name {}",
hook.getClass().getCanonicalName(),
e);
Span currentSpan = Span.current();
currentSpan.recordException(e);
currentSpan.setStatus(StatusCode.ERROR, e.getMessage());
currentSpan.setAttribute(MetricUtils.ERROR_TYPE, e.getClass().getName());
}
},
MetricUtils.DROPWIZARD_NAME,
MetricUtils.name(this.getClass(), hook.getClass().getSimpleName() + "_latency"));
}
// TODO: Manually commit kafka offsets after full processing.
MetricUtils.counter(this.getClass(), consumerGroupId + "_consumed_mcl_count").inc();
log.info(
"Successfully completed MCL hooks for consumer: {} urn: {}",
consumerGroupId,
event.getEntityUrn());
},
MetricUtils.DROPWIZARD_NAME,
MetricUtils.name(this.getClass(), "consume"));
} finally {
MDC.clear();
}
}
}

View File

@@ -1,122 +0,0 @@
package com.linkedin.metadata.kafka;
import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME;
import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.mxe.Topics;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Component;
@Slf4j
@EnableKafka
@Component
@Conditional(MetadataChangeLogProcessorCondition.class)
public class MCLKafkaListenerRegistrar implements InitializingBean {
@Autowired
@Qualifier("systemOperationContext")
private OperationContext systemOperationContext;
@Autowired private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Autowired
@Qualifier(MCL_EVENT_CONSUMER_NAME)
private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
@Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")
private String consumerGroupBase;
@Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}")
private String mclVersionedTopicName;
@Value(
"${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}")
private String mclTimeseriesTopicName;
@Autowired private List<MetadataChangeLogHook> metadataChangeLogHooks;
@Override
public void afterPropertiesSet() {
Map<String, List<MetadataChangeLogHook>> hookGroups =
getMetadataChangeLogHooks().stream()
.collect(Collectors.groupingBy(MetadataChangeLogHook::getConsumerGroupSuffix));
log.info(
"MetadataChangeLogProcessor Consumer Groups: {}",
hookGroups.keySet().stream().map(this::buildConsumerGroupName).collect(Collectors.toSet()));
hookGroups.forEach(
(key, hooks) -> {
KafkaListenerEndpoint kafkaListenerEndpoint =
createListenerEndpoint(
buildConsumerGroupName(key),
List.of(mclVersionedTopicName, mclTimeseriesTopicName),
hooks);
registerMCLKafkaListener(kafkaListenerEndpoint, false);
});
}
public List<MetadataChangeLogHook> getMetadataChangeLogHooks() {
return metadataChangeLogHooks.stream()
.filter(MetadataChangeLogHook::isEnabled)
.sorted(Comparator.comparing(MetadataChangeLogHook::executionOrder))
.toList();
}
@SneakyThrows
public void registerMCLKafkaListener(
KafkaListenerEndpoint kafkaListenerEndpoint, boolean startImmediately) {
kafkaListenerEndpointRegistry.registerListenerContainer(
kafkaListenerEndpoint, kafkaListenerContainerFactory, startImmediately);
}
private KafkaListenerEndpoint createListenerEndpoint(
String consumerGroupId, List<String> topics, List<MetadataChangeLogHook> hooks) {
MethodKafkaListenerEndpoint<String, GenericRecord> kafkaListenerEndpoint =
new MethodKafkaListenerEndpoint<>();
kafkaListenerEndpoint.setId(consumerGroupId);
kafkaListenerEndpoint.setGroupId(consumerGroupId);
kafkaListenerEndpoint.setAutoStartup(false);
kafkaListenerEndpoint.setTopics(topics.toArray(new String[topics.size()]));
kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
kafkaListenerEndpoint.setBean(
new MCLKafkaListener(systemOperationContext, consumerGroupId, hooks));
try {
kafkaListenerEndpoint.setMethod(
MCLKafkaListener.class.getMethod("consume", ConsumerRecord.class));
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
return kafkaListenerEndpoint;
}
private String buildConsumerGroupName(@Nonnull String suffix) {
if (suffix.isEmpty()) {
return consumerGroupBase;
} else {
return String.join("-", consumerGroupBase, suffix);
}
}
}

View File

@@ -9,7 +9,8 @@ public class DataHubUsageEventsProcessorCondition implements Condition {
@Override
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
Environment env = context.getEnvironment();
return "true".equals(env.getProperty("MAE_CONSUMER_ENABLED"))
return ("true".equals(env.getProperty("MAE_CONSUMER_ENABLED"))
|| "true".equals(env.getProperty("MCL_CONSUMER_ENABLED")))
&& (env.getProperty("DATAHUB_ANALYTICS_ENABLED") == null
|| "true".equals(env.getProperty("DATAHUB_ANALYTICS_ENABLED")));
}

View File

@@ -1,5 +1,6 @@
package com.linkedin.metadata.kafka.hook;
import com.linkedin.metadata.kafka.listener.EventHook;
import com.linkedin.mxe.MetadataChangeLog;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
@@ -11,36 +12,13 @@ import javax.annotation.Nonnull;
* with the same message. In the future, we intend to migrate to "at least once" semantics, meaning
* that the hook will be responsible for implementing idempotency.
*/
public interface MetadataChangeLogHook {
public interface MetadataChangeLogHook extends EventHook<MetadataChangeLog> {
/** Initialize the hook */
default MetadataChangeLogHook init(@Nonnull OperationContext systemOperationContext) {
return this;
}
/**
* Suffix for the consumer group
*
* @return suffix
*/
@Nonnull
String getConsumerGroupSuffix();
/**
* Return whether the hook is enabled or not. If not enabled, the below invoke method is not
* triggered
*/
boolean isEnabled();
/** Invoke the hook when a MetadataChangeLog is received */
void invoke(@Nonnull MetadataChangeLog log) throws Exception;
/**
* Controls hook execution ordering
*
* @return order to execute
*/
default int executionOrder() {
return 100;
}
void invoke(@Nonnull MetadataChangeLog event) throws Exception;
}

View File

@@ -0,0 +1,26 @@
package com.linkedin.metadata.kafka.hook.usage;
import com.fasterxml.jackson.databind.JsonNode;
import com.linkedin.metadata.kafka.listener.EventHook;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public interface DataHubUsageEventHook extends EventHook<JsonNode> {
default DataHubUsageEventHook init(@Nonnull OperationContext systemOperationContext) {
return this;
}
/** Invoke the hook when a DataHub usage event record is received */
void invoke(@Nonnull ConsumerRecord<String, String> event) throws Exception;
/**
* Controls hook execution ordering
*
* @return order to execute
*/
default int executionOrder() {
return 100;
}
}

View File

@@ -0,0 +1,104 @@
package com.linkedin.metadata.kafka.listener.mcl;
import static com.linkedin.metadata.Constants.MDC_ASPECT_NAME;
import static com.linkedin.metadata.Constants.MDC_CHANGE_TYPE;
import static com.linkedin.metadata.Constants.MDC_ENTITY_TYPE;
import static com.linkedin.metadata.Constants.MDC_ENTITY_URN;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.metadata.kafka.listener.AbstractKafkaListener;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericRecord;
import org.slf4j.MDC;
@Slf4j
public class MCLKafkaListener
extends AbstractKafkaListener<MetadataChangeLog, MetadataChangeLogHook, GenericRecord> {
private static final String WILDCARD = "*";
@Override
@Nonnull
public MetadataChangeLog convertRecord(@Nonnull GenericRecord record) throws IOException {
return EventUtils.avroToPegasusMCL(record);
}
@Override
protected void setMDCContext(MetadataChangeLog event) {
Urn entityUrn = event.getEntityUrn();
String aspectName = event.hasAspectName() ? event.getAspectName() : null;
String entityType = event.hasEntityType() ? event.getEntityType() : null;
ChangeType changeType = event.hasChangeType() ? event.getChangeType() : null;
MDC.put(MDC_ENTITY_URN, Optional.ofNullable(entityUrn).map(Urn::toString).orElse(""));
MDC.put(MDC_ASPECT_NAME, aspectName);
MDC.put(MDC_ENTITY_TYPE, entityType);
MDC.put(MDC_CHANGE_TYPE, Optional.ofNullable(changeType).map(ChangeType::toString).orElse(""));
}
@Override
protected boolean shouldSkipProcessing(MetadataChangeLog event) {
String entityType = event.hasEntityType() ? event.getEntityType() : null;
String aspectName = event.hasAspectName() ? event.getAspectName() : null;
return aspectsToDrop.getOrDefault(entityType, Collections.emptySet()).contains(aspectName)
|| aspectsToDrop.getOrDefault(WILDCARD, Collections.emptySet()).contains(aspectName);
}
@Override
protected List<String> getFineGrainedLoggingAttributes(MetadataChangeLog event) {
List<String> attributes = new ArrayList<>();
if (!fineGrainedLoggingEnabled) {
return attributes;
}
String aspectName = event.hasAspectName() ? event.getAspectName() : null;
String entityType = event.hasEntityType() ? event.getEntityType() : null;
ChangeType changeType = event.hasChangeType() ? event.getChangeType() : null;
if (aspectName != null) {
attributes.add(MetricUtils.ASPECT_NAME);
attributes.add(aspectName);
}
if (entityType != null) {
attributes.add(MetricUtils.ENTITY_TYPE);
attributes.add(entityType);
}
if (changeType != null) {
attributes.add(MetricUtils.CHANGE_TYPE);
attributes.add(changeType.name());
}
return attributes;
}
@Override
protected SystemMetadata getSystemMetadata(MetadataChangeLog event) {
return event.getSystemMetadata();
}
@Override
protected String getEventDisplayString(MetadataChangeLog event) {
return String.format(
"urn: %s, aspect name: %s, entity type: %s, change type: %s",
event.getEntityUrn(),
event.hasAspectName() ? event.getAspectName() : null,
event.hasEntityType() ? event.getEntityType() : null,
event.hasChangeType() ? event.getChangeType() : null);
}
}

View File

@@ -0,0 +1,101 @@
package com.linkedin.metadata.kafka.listener.mcl;
import static com.linkedin.metadata.config.kafka.KafkaConfiguration.MCL_EVENT_CONSUMER_NAME;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
import com.linkedin.metadata.kafka.listener.AbstractKafkaListenerRegistrar;
import com.linkedin.metadata.kafka.listener.GenericKafkaListener;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.Topics;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nonnull;
import org.apache.avro.generic.GenericRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Conditional;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Component;
@Component
@Conditional(MetadataChangeLogProcessorCondition.class)
public class MCLKafkaListenerRegistrar
extends AbstractKafkaListenerRegistrar<
MetadataChangeLog, MetadataChangeLogHook, GenericRecord> {
private final OperationContext systemOperationContext;
private final ConfigurationProvider configurationProvider;
@Autowired
@Qualifier(MCL_EVENT_CONSUMER_NAME)
private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
@Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")
private String consumerGroupBase;
@Value("${METADATA_CHANGE_LOG_VERSIONED_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_VERSIONED + "}")
private String mclVersionedTopicName;
@Value(
"${METADATA_CHANGE_LOG_TIMESERIES_TOPIC_NAME:" + Topics.METADATA_CHANGE_LOG_TIMESERIES + "}")
private String mclTimeseriesTopicName;
public MCLKafkaListenerRegistrar(
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
@Qualifier(MCL_EVENT_CONSUMER_NAME)
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory,
@Value("${METADATA_CHANGE_LOG_KAFKA_CONSUMER_GROUP_ID:generic-mae-consumer-job-client}")
String consumerGroupBase,
List<MetadataChangeLogHook> hooks,
ObjectMapper objectMapper,
@Qualifier("systemOperationContext") OperationContext systemOperationContext,
ConfigurationProvider configurationProvider) {
super(
kafkaListenerEndpointRegistry,
kafkaListenerContainerFactory,
consumerGroupBase,
hooks,
objectMapper);
this.systemOperationContext = systemOperationContext;
this.configurationProvider = configurationProvider;
}
@Override
protected String getProcessorType() {
return "MetadataChangeLogProcessor";
}
@Override
protected List<String> getTopicNames() {
return List.of(mclVersionedTopicName, mclTimeseriesTopicName);
}
@Override
protected boolean isFineGrainedLoggingEnabled() {
return configurationProvider.getKafka().getConsumer().getMcl().isFineGrainedLoggingEnabled();
}
@Override
protected String getAspectsToDropConfig() {
return configurationProvider.getKafka().getConsumer().getMcl().getAspectsToDrop();
}
@Override
@Nonnull
public GenericKafkaListener<MetadataChangeLog, MetadataChangeLogHook, GenericRecord>
createListener(
@Nonnull String consumerGroupId,
@Nonnull List<MetadataChangeLogHook> hooks,
boolean fineGrainedLoggingEnabled,
@Nonnull Map<String, Set<String>> aspectsToDrop) {
MCLKafkaListener listener = new MCLKafkaListener();
return listener.init(
systemOperationContext, consumerGroupId, hooks, fineGrainedLoggingEnabled, aspectsToDrop);
}
}

View File

@@ -3,12 +3,12 @@ package com.linkedin.metadata.kafka.hook.spring;
import static org.testng.AssertJUnit.*;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar;
import com.linkedin.metadata.kafka.hook.UpdateIndicesHook;
import com.linkedin.metadata.kafka.hook.event.EntityChangeEventGeneratorHook;
import com.linkedin.metadata.kafka.hook.incident.IncidentsSummaryHook;
import com.linkedin.metadata.kafka.hook.ingestion.IngestionSchedulerHook;
import com.linkedin.metadata.kafka.hook.siblings.SiblingAssociationHook;
import com.linkedin.metadata.kafka.listener.mcl.MCLKafkaListenerRegistrar;
import com.linkedin.metadata.service.UpdateIndicesService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -42,20 +42,19 @@ public class MCLGMSSpringTest extends AbstractTestNGSpringContextTests {
MCLKafkaListenerRegistrar registrar =
applicationContext.getBean(MCLKafkaListenerRegistrar.class);
assertTrue(
registrar.getMetadataChangeLogHooks().stream()
registrar.getEnabledHooks().stream()
.noneMatch(hook -> hook instanceof IngestionSchedulerHook));
assertTrue(
registrar.getMetadataChangeLogHooks().stream()
.anyMatch(hook -> hook instanceof UpdateIndicesHook));
registrar.getEnabledHooks().stream().anyMatch(hook -> hook instanceof UpdateIndicesHook));
assertTrue(
registrar.getMetadataChangeLogHooks().stream()
registrar.getEnabledHooks().stream()
.anyMatch(hook -> hook instanceof SiblingAssociationHook));
assertTrue(
registrar.getMetadataChangeLogHooks().stream()
registrar.getEnabledHooks().stream()
.anyMatch(hook -> hook instanceof EntityChangeEventGeneratorHook));
assertEquals(
1,
registrar.getMetadataChangeLogHooks().stream()
registrar.getEnabledHooks().stream()
.filter(hook -> hook instanceof IncidentsSummaryHook)
.count());
}

View File

@@ -7,12 +7,12 @@ import static org.testng.AssertJUnit.assertTrue;
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.metadata.kafka.MCLKafkaListenerRegistrar;
import com.linkedin.metadata.kafka.hook.UpdateIndicesHook;
import com.linkedin.metadata.kafka.hook.event.EntityChangeEventGeneratorHook;
import com.linkedin.metadata.kafka.hook.incident.IncidentsSummaryHook;
import com.linkedin.metadata.kafka.hook.ingestion.IngestionSchedulerHook;
import com.linkedin.metadata.kafka.hook.siblings.SiblingAssociationHook;
import com.linkedin.metadata.kafka.listener.mcl.MCLKafkaListenerRegistrar;
import com.linkedin.metadata.service.UpdateIndicesService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
@@ -49,20 +49,19 @@ public class MCLMAESpringTest extends AbstractTestNGSpringContextTests {
MCLKafkaListenerRegistrar registrar =
applicationContext.getBean(MCLKafkaListenerRegistrar.class);
assertTrue(
registrar.getMetadataChangeLogHooks().stream()
registrar.getEnabledHooks().stream()
.noneMatch(hook -> hook instanceof IngestionSchedulerHook));
assertTrue(
registrar.getMetadataChangeLogHooks().stream()
.anyMatch(hook -> hook instanceof UpdateIndicesHook));
registrar.getEnabledHooks().stream().anyMatch(hook -> hook instanceof UpdateIndicesHook));
assertTrue(
registrar.getMetadataChangeLogHooks().stream()
registrar.getEnabledHooks().stream()
.anyMatch(hook -> hook instanceof SiblingAssociationHook));
assertTrue(
registrar.getMetadataChangeLogHooks().stream()
registrar.getEnabledHooks().stream()
.anyMatch(hook -> hook instanceof EntityChangeEventGeneratorHook));
assertEquals(
1,
registrar.getMetadataChangeLogHooks().stream()
registrar.getEnabledHooks().stream()
.filter(hook -> hook instanceof IncidentsSummaryHook)
.count());
}

View File

@@ -16,5 +16,7 @@ public class ConsumerConfiguration {
@Data
public static class ConsumerOptions {
private String autoOffsetReset;
private boolean fineGrainedLoggingEnabled;
private String aspectsToDrop;
}
}

View File

@@ -23,6 +23,7 @@ public class KafkaConfiguration {
public static final String MCP_EVENT_CONSUMER_NAME = "mcpEventConsumer";
public static final String MCL_EVENT_CONSUMER_NAME = "mclEventConsumer";
public static final String PE_EVENT_CONSUMER_NAME = "platformEventConsumer";
public static final String SIMPLE_EVENT_CONSUMER_NAME = "simpleKafkaConsumer";
public static final String DEFAULT_EVENT_CONSUMER_NAME = "kafkaEventConsumer";
private String bootstrapServers;

View File

@@ -321,6 +321,8 @@ kafka:
autoOffsetReset: ${KAFKA_CONSUMER_MCP_AUTO_OFFSET_RESET:earliest}
mcl:
autoOffsetReset: ${KAFKA_CONSUMER_MCL_AUTO_OFFSET_RESET:earliest}
fineGrainedLoggingEnabled: ${KAFKA_CONSUMER_MCL_FINE_GRAINED_LOGGING_ENABLED:false}
aspectsToDrop: ${KAFKA_CONSUMER_MCL_ASPECTS_TO_DROP:}
pe:
autoOffsetReset: ${KAFKA_CONSUMER_PE_AUTO_OFFSET_RESET:latest}
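For illustration only (values assumed, not part of this commit), the new MCL consumer options can be driven through the referenced environment variables; when KAFKA_CONSUMER_MCL_ASPECTS_TO_DROP is left blank, parseAspectsToDrop falls back to an empty map:

KAFKA_CONSUMER_MCL_FINE_GRAINED_LOGGING_ENABLED=true
KAFKA_CONSUMER_MCL_ASPECTS_TO_DROP={"*": ["browsePaths"]}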

View File

@@ -2,11 +2,14 @@ package com.linkedin.gms;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.cassandra.CassandraAutoConfiguration;
import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
import org.springframework.context.annotation.Import;
@SpringBootApplication
@SpringBootApplication(
exclude = {ElasticsearchRestClientAutoConfiguration.class, CassandraAutoConfiguration.class})
@Import({CommonApplicationConfig.class, ServletConfig.class})
public class GMSApplication extends SpringBootServletInitializer {

View File

@@ -19,6 +19,10 @@ public class MetricUtils {
public static final String MESSAGING_OPERATION = "messaging.operation";
public static final String ERROR_TYPE = "error.type";
public static final String CHANGE_TYPE = "aspect.change_type";
public static final String ENTITY_TYPE = "aspect.entity_type";
public static final String ASPECT_NAME = "aspect.name";
private MetricUtils() {}
public static final String DELIMITER = "_";