mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-02 11:49:23 +00:00
fix(platform): add metrics for mcl hooks (#4008)
Co-authored-by: Shirshanka Das <shirshanka@apache.org>
This commit is contained in:
parent
c1bcc9924e
commit
e8cc991378
@ -2,12 +2,13 @@ package com.linkedin.metadata.kafka;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import com.codahale.metrics.MetricRegistry;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.linkedin.metadata.EventUtils;
|
||||
import com.linkedin.metadata.kafka.config.MetadataChangeLogProcessorCondition;
|
||||
import com.linkedin.metadata.kafka.hook.ingestion.IngestionSchedulerHook;
|
||||
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
|
||||
import com.linkedin.metadata.kafka.hook.UpdateIndicesHook;
|
||||
import com.linkedin.metadata.kafka.hook.ingestion.IngestionSchedulerHook;
|
||||
import com.linkedin.metadata.utils.metrics.MetricUtils;
|
||||
import com.linkedin.mxe.MetadataChangeLog;
|
||||
import com.linkedin.mxe.Topics;
|
||||
@ -27,7 +28,7 @@ import org.springframework.stereotype.Component;
|
||||
@Slf4j
|
||||
@Component
|
||||
@Conditional(MetadataChangeLogProcessorCondition.class)
|
||||
@Import({ UpdateIndicesHook.class, IngestionSchedulerHook.class })
|
||||
@Import({UpdateIndicesHook.class, IngestionSchedulerHook.class})
|
||||
@EnableKafka
|
||||
public class MetadataChangeLogProcessor {
|
||||
|
||||
@ -35,8 +36,7 @@ public class MetadataChangeLogProcessor {
|
||||
private final Histogram kafkaLagStats = MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));
|
||||
|
||||
@Autowired
|
||||
public MetadataChangeLogProcessor(
|
||||
@Nonnull final UpdateIndicesHook updateIndicesHook,
|
||||
public MetadataChangeLogProcessor(@Nonnull final UpdateIndicesHook updateIndicesHook,
|
||||
@Nonnull final IngestionSchedulerHook ingestionSchedulerHook) {
|
||||
this.hooks = ImmutableList.of(updateIndicesHook, ingestionSchedulerHook);
|
||||
}
|
||||
@ -48,38 +48,36 @@ public class MetadataChangeLogProcessor {
|
||||
public void consume(final ConsumerRecord<String, GenericRecord> consumerRecord) {
|
||||
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
|
||||
final GenericRecord record = consumerRecord.value();
|
||||
log.debug("Got Generic MCL");
|
||||
log.debug("Got Generic MCL on topic: {}, partition: {}, offset: {}", consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
|
||||
MetricUtils.counter(this.getClass(), "received_mcl_count").inc();
|
||||
|
||||
MetadataChangeLog event;
|
||||
try {
|
||||
event = EventUtils.avroToPegasusMCL(record);
|
||||
log.debug(String.format("Successfully converted Avro MCL to Pegasus MCL. urn: %s, key: %s",
|
||||
event.getEntityUrn(),
|
||||
event.getEntityKeyAspect()));
|
||||
|
||||
log.debug("Successfully converted Avro MCL to Pegasus MCL. urn: {}, key: {}", event.getEntityUrn(),
|
||||
event.getEntityKeyAspect());
|
||||
} catch (Exception e) {
|
||||
log.error("Error deserializing message: {}", e.toString());
|
||||
MetricUtils.counter(this.getClass(), "avro_to_pegasus_conversion_failure").inc();
|
||||
log.error("Error deserializing message due to: ", e);
|
||||
log.error("Message: {}", record.toString());
|
||||
return;
|
||||
}
|
||||
|
||||
// TODO: debug
|
||||
log.info(String.format("Invoking MCL hooks for urn: %s, key: %s",
|
||||
event.getEntityUrn(),
|
||||
event.getEntityKeyAspect()));
|
||||
log.debug("Invoking MCL hooks for urn: {}, key: {}", event.getEntityUrn(), event.getEntityKeyAspect());
|
||||
|
||||
// Here - plug in additional "custom processor hooks"
|
||||
for (MetadataChangeLogHook hook : this.hooks) {
|
||||
try {
|
||||
try (Timer.Context ignored = MetricUtils.timer(this.getClass(), hook.getClass().getSimpleName() + "_latency")
|
||||
.time()) {
|
||||
hook.invoke(event);
|
||||
} catch (Exception e) {
|
||||
// Just skip this hook and continue.
|
||||
log.error(String.format("Failed to execute MCL hook with name %s", hook.getClass().getCanonicalName()), e);
|
||||
MetricUtils.counter(this.getClass(), hook.getClass().getSimpleName() + "_failure").inc();
|
||||
log.error("Failed to execute MCL hook with name {}", hook.getClass().getCanonicalName(), e);
|
||||
}
|
||||
}
|
||||
// TODO: debug
|
||||
log.info(String.format("Successfully completed MCL hooks for urn: %s, key: %s",
|
||||
event.getEntityUrn(),
|
||||
event.getEntityKeyAspect()));
|
||||
MetricUtils.counter(this.getClass(), "consumed_mcl_count").inc();
|
||||
log.debug("Successfully completed MCL hooks for urn: {}, key: {}", event.getEntityUrn(),
|
||||
event.getEntityKeyAspect());
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user