fix(tracing): handle noop mcl (#12713)

This commit is contained in:
david-leifker 2025-02-24 12:13:20 -06:00 committed by GitHub
parent 16ef1ac174
commit 005a9b0f0d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 673 additions and 11 deletions

View File

@ -4,12 +4,14 @@ import static com.linkedin.metadata.Constants.DEFAULT_RUN_ID;
import com.datahub.util.RecordUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringMap;
import com.linkedin.mxe.SystemMetadata;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class SystemMetadataUtils {
private static final String NO_OP_KEY = "isNoOp";
private SystemMetadataUtils() {}
@ -42,4 +44,23 @@ public class SystemMetadataUtils {
}
return RecordUtils.toRecordTemplate(SystemMetadata.class, jsonSystemMetadata);
}
public static boolean isNoOp(@Nullable SystemMetadata systemMetadata) {
if (systemMetadata != null && systemMetadata.hasProperties()) {
return Boolean.parseBoolean(systemMetadata.getProperties().getOrDefault(NO_OP_KEY, "false"));
}
return false;
}
@Nullable
public static SystemMetadata setNoOp(@Nullable SystemMetadata systemMetadata, boolean isNoOp) {
if (systemMetadata != null) {
if (!systemMetadata.hasProperties()) {
systemMetadata.setProperties(new StringMap());
}
systemMetadata.getProperties().put(NO_OP_KEY, String.valueOf(isNoOp));
}
return systemMetadata;
}
}

View File

@ -0,0 +1,177 @@
package com.linkedin.metadata.utils;
import static com.linkedin.metadata.Constants.DEFAULT_RUN_ID;
import static org.testng.Assert.*;
import com.linkedin.data.template.StringMap;
import com.linkedin.mxe.SystemMetadata;
import org.testng.annotations.Test;
public class SystemMetadataUtilsTest {
@Test
public void testCreateDefaultSystemMetadata() {
SystemMetadata metadata = SystemMetadataUtils.createDefaultSystemMetadata();
assertNotNull(metadata);
assertEquals(metadata.getRunId(), DEFAULT_RUN_ID);
assertTrue(metadata.hasLastObserved());
assertTrue(metadata.getLastObserved() > 0);
}
@Test
public void testCreateDefaultSystemMetadataWithRunId() {
String customRunId = "custom-run-id";
SystemMetadata metadata = SystemMetadataUtils.createDefaultSystemMetadata(customRunId);
assertNotNull(metadata);
assertEquals(metadata.getRunId(), customRunId);
assertTrue(metadata.hasLastObserved());
assertTrue(metadata.getLastObserved() > 0);
}
@Test
public void testGenerateSystemMetadataIfEmpty() {
// Test with null input
SystemMetadata nullMetadata = SystemMetadataUtils.generateSystemMetadataIfEmpty(null);
assertNotNull(nullMetadata);
assertEquals(nullMetadata.getRunId(), DEFAULT_RUN_ID);
assertTrue(nullMetadata.hasLastObserved());
// Test with existing metadata
SystemMetadata existingMetadata =
new SystemMetadata().setRunId("existing-run").setLastObserved(1234567890L);
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(existingMetadata);
assertEquals(result.getRunId(), "existing-run");
assertEquals(result.getLastObserved(), 1234567890L);
}
@Test
public void testParseSystemMetadata() {
// Test null input
SystemMetadata nullResult = SystemMetadataUtils.parseSystemMetadata(null);
assertNotNull(nullResult);
assertEquals(nullResult.getRunId(), DEFAULT_RUN_ID);
// Test empty string input
SystemMetadata emptyResult = SystemMetadataUtils.parseSystemMetadata("");
assertNotNull(emptyResult);
assertEquals(emptyResult.getRunId(), DEFAULT_RUN_ID);
// Test valid JSON input
String validJson = "{\"runId\":\"test-run\",\"lastObserved\":1234567890}";
SystemMetadata jsonResult = SystemMetadataUtils.parseSystemMetadata(validJson);
assertNotNull(jsonResult);
assertEquals(jsonResult.getRunId(), "test-run");
assertEquals(jsonResult.getLastObserved(), 1234567890L);
}
@Test
public void testIsNoOp() {
// Test null metadata
assertFalse(SystemMetadataUtils.isNoOp(null));
// Test metadata without properties
SystemMetadata emptyMetadata = new SystemMetadata();
assertFalse(SystemMetadataUtils.isNoOp(emptyMetadata));
// Test metadata with isNoOp=true
SystemMetadata noOpMetadata = new SystemMetadata();
StringMap properties = new StringMap();
properties.put("isNoOp", "true");
noOpMetadata.setProperties(properties);
assertTrue(SystemMetadataUtils.isNoOp(noOpMetadata));
// Test metadata with isNoOp=false
properties.put("isNoOp", "false");
assertFalse(SystemMetadataUtils.isNoOp(noOpMetadata));
}
@Test
public void testSetNoOp() {
// Test with null metadata
assertNull(SystemMetadataUtils.setNoOp(null, true));
// Test setting noOp to true
SystemMetadata metadata = new SystemMetadata();
SystemMetadata result = SystemMetadataUtils.setNoOp(metadata, true);
assertNotNull(result);
assertTrue(result.hasProperties());
assertNotNull(result.getProperties());
assertEquals(result.getProperties().get("isNoOp"), "true");
// Test setting noOp to false
result = SystemMetadataUtils.setNoOp(metadata, false);
assertNotNull(result);
assertTrue(result.hasProperties());
assertNotNull(result.getProperties());
assertEquals(result.getProperties().get("isNoOp"), "false");
// Test with existing properties
StringMap existingProps = new StringMap();
existingProps.put("otherKey", "value");
metadata.setProperties(existingProps);
result = SystemMetadataUtils.setNoOp(metadata, true);
assertNotNull(result);
assertEquals(result.getProperties().get("otherKey"), "value");
assertEquals(result.getProperties().get("isNoOp"), "true");
}
@Test
public void testGenerateSystemMetadataIfEmpty_NullInput() {
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(null);
assertNotNull(result);
assertEquals(DEFAULT_RUN_ID, result.getRunId());
assertNotNull(result.getLastObserved());
assertTrue(result.getLastObserved() > 0);
}
@Test
public void testGenerateSystemMetadataIfEmpty_NoRunId() {
SystemMetadata input = new SystemMetadata().setLastObserved(1234567890L);
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(input);
assertNotNull(result);
assertEquals(DEFAULT_RUN_ID, result.getRunId());
assertEquals(1234567890L, result.getLastObserved().longValue());
}
@Test
public void testGenerateSystemMetadataIfEmpty_NoLastObserved() {
SystemMetadata input = new SystemMetadata().setRunId("custom-run-id");
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(input);
assertNotNull(result);
assertEquals("custom-run-id", result.getRunId());
assertNotNull(result.getLastObserved());
assertTrue(result.getLastObserved() > 0);
}
@Test
public void testGenerateSystemMetadataIfEmpty_ZeroLastObserved() {
SystemMetadata input = new SystemMetadata().setRunId("custom-run-id").setLastObserved(0L);
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(input);
assertNotNull(result);
assertEquals("custom-run-id", result.getRunId());
assertNotNull(result.getLastObserved());
assertTrue(result.getLastObserved() > 0);
}
@Test
public void testGenerateSystemMetadataIfEmpty_AllFieldsPopulated() {
SystemMetadata input =
new SystemMetadata().setRunId("custom-run-id").setLastObserved(1234567890L);
SystemMetadata result = SystemMetadataUtils.generateSystemMetadataIfEmpty(input);
assertNotNull(result);
assertEquals("custom-run-id", result.getRunId());
assertEquals(1234567890L, result.getLastObserved().longValue());
}
}

View File

@ -97,6 +97,7 @@ dependencies {
testImplementation externalDependency.springBootTest
testImplementation spec.product.pegasus.restliServer
testImplementation externalDependency.ebeanTest
testImplementation externalDependency.opentelemetrySdk
// logback >=1.3 required due to `testcontainers` only
testImplementation 'ch.qos.logback:logback-classic:1.4.7'

View File

@ -10,6 +10,7 @@ import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
@ -142,15 +143,19 @@ public interface AspectDao {
.equals(currentVersion0.getSystemMetadataVersion())) {
inserted = insertAspect(txContext, latestAspect.getDatabaseAspect().get(), targetVersion);
// add trace - overwrite if version incremented
newAspect.setSystemMetadata(opContext.withTraceId(newAspect.getSystemMetadata(), true));
}
// update version 0
Optional<EntityAspect> updated = Optional.empty();
boolean isNoOp =
Objects.equals(currentVersion0.getRecordTemplate(), newAspect.getRecordTemplate());
if (!Objects.equals(currentVersion0.getSystemMetadata(), newAspect.getSystemMetadata())
|| !Objects.equals(currentVersion0.getRecordTemplate(), newAspect.getRecordTemplate())) {
|| !isNoOp) {
// update no-op used for tracing
SystemMetadataUtils.setNoOp(newAspect.getSystemMetadata(), isNoOp);
// add trace - overwrite if version incremented
newAspect.setSystemMetadata(opContext.withTraceId(newAspect.getSystemMetadata(), true));
updated = updateAspect(txContext, newAspect);
}

View File

@ -76,6 +76,7 @@ import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.EntityApiUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.metadata.utils.metrics.MetricUtils;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.MetadataChangeLog;
@ -2050,7 +2051,8 @@ public class EntityServiceImpl implements EntityService<ChangeItemImpl> {
Urn entityUrn,
AuditStamp auditStamp,
AspectSpec aspectSpec) {
boolean isNoOp = Objects.equals(oldAspect, newAspect);
boolean isNoOp =
SystemMetadataUtils.isNoOp(newSystemMetadata) || Objects.equals(oldAspect, newAspect);
if (!isNoOp || alwaysEmitChangeLog || shouldAspectEmitChangeLog(aspectSpec)) {
log.info("Producing MCL for ingested aspect {}, urn {}", aspectSpec.getName(), entityUrn);

View File

@ -11,6 +11,7 @@ import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.systemmetadata.TraceService;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.FailedMetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
@ -168,7 +169,12 @@ public class TraceServiceImpl implements TraceService {
String aspectName = aspectEntry.getKey();
if (traceId.equals(systemTraceId)) {
aspectStatuses.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.ACTIVE_STATE));
if (SystemMetadataUtils.isNoOp(systemMetadata)) {
aspectStatuses.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.NO_OP));
} else {
aspectStatuses.put(
aspectName, TraceStorageStatus.ok(TraceWriteStatus.ACTIVE_STATE));
}
} else if (traceTimestampMillis <= extractTimestamp(systemTraceId, createdOnMillis)) {
aspectStatuses.put(
aspectName, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE));
@ -421,7 +427,9 @@ public class TraceServiceImpl implements TraceService {
storageEntry -> {
String aspectName = storageEntry.getKey();
TraceStorageStatus primaryStatus = storageEntry.getValue();
TraceStorageStatus searchStatus = searchAspectStatus.get(aspectName);
TraceStorageStatus searchStatus =
searchAspectStatus.getOrDefault(
aspectName, TraceStorageStatus.ok(TraceWriteStatus.PENDING));
TraceStatus traceStatus =
TraceStatus.builder()
.primaryStorage(primaryStatus)
@ -448,7 +456,7 @@ public class TraceServiceImpl implements TraceService {
}
private static boolean isSuccess(
TraceStorageStatus primaryStatus, TraceStorageStatus searchStatus) {
@Nonnull TraceStorageStatus primaryStatus, @Nonnull TraceStorageStatus searchStatus) {
return !TraceWriteStatus.ERROR.equals(primaryStatus.getWriteStatus())
&& !TraceWriteStatus.ERROR.equals(searchStatus.getWriteStatus());
}

View File

@ -4,6 +4,8 @@ import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.datahub.util.RecordUtils;
@ -11,6 +13,7 @@ import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.SetMode;
import com.linkedin.data.template.StringMap;
import com.linkedin.metadata.aspect.EntityAspect;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
@ -19,10 +22,19 @@ import com.linkedin.metadata.entity.ebean.PartitionedStream;
import com.linkedin.metadata.entity.restoreindices.RestoreIndicesArgs;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.ObjectMapperContext;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.metadata.context.TraceContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -39,7 +51,18 @@ import org.testng.annotations.Test;
public class AspectDaoTest {
private final OperationContext opContext =
TestOperationContexts.systemContextNoSearchAuthorization();
TestOperationContexts.systemContextTraceNoSearchAuthorization(
() -> ObjectMapperContext.DEFAULT,
() -> {
// Set up OpenTelemetry SDK for testing
SdkTracerProvider tracerProvider = SdkTracerProvider.builder().build();
OpenTelemetry openTelemetry =
OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build();
// Create a tracer
Tracer tracer = openTelemetry.getTracer("test-tracer");
return TraceContext.builder().tracer(tracer).build();
});
private final EntitySpec corpUserEntitySpec =
opContext.getEntityRegistry().getEntitySpec(CORP_USER_ENTITY_NAME);
@ -208,6 +231,170 @@ public class AspectDaoTest {
aspectDao.saveLatestAspect(opContext, null, null, newAspect);
}
@Test
public void testSaveLatestAspect_NoOpTracingSet() {
// Setup
SystemAspect currentAspect = createSystemAspect("1");
SystemAspect newAspect = createSystemAspect("1");
SystemAspect dbAspect = createSystemAspect("1");
currentAspect.setDatabaseAspect(dbAspect);
// Execute
Pair<Optional<EntityAspect>, Optional<EntityAspect>> result =
aspectDao.saveLatestAspect(opContext, txContext, currentAspect, newAspect);
// Verify
// Should not have any changes since it's a true no-op (same version and content)
assertFalse(result.getFirst().isPresent(), "Should not have inserted previous version");
assertFalse(result.getSecond().isPresent(), "Should not have updated current version");
// The input aspect should not be modified since no update occurred
assertNull(
newAspect.getSystemMetadata().getProperties(),
"SystemMetadata should not be modified for no-op case with no update");
}
@Test
public void testSaveLatestAspect_NoOpWithMetadataChange() {
opContext.withSpan(
"testSaveLatestAspect_NoOpWithMetadataChange",
() -> {
// Verify span context is valid
SpanContext currentSpanContext = Span.current().getSpanContext();
assertTrue(currentSpanContext.isValid(), "Span context should be valid");
// Setup
SystemAspect currentAspect = createSystemAspect("1");
SystemAspect newAspect = createSystemAspect("1");
SystemAspect dbAspect = createSystemAspect("1");
currentAspect.setDatabaseAspect(dbAspect);
// Modify system metadata but keep same content
newAspect
.getSystemMetadata()
.setLastObserved(newAspect.getSystemMetadata().getLastObserved() + 1);
// Execute
Pair<Optional<EntityAspect>, Optional<EntityAspect>> result =
aspectDao.saveLatestAspect(opContext, txContext, currentAspect, newAspect);
// Verify
assertFalse(result.getFirst().isPresent(), "Should not have inserted previous version");
assertTrue(result.getSecond().isPresent(), "Should have updated current version");
SystemMetadata updatedMetadata =
RecordUtils.toRecordTemplate(
SystemMetadata.class, result.getSecond().get().getSystemMetadata());
assertTrue(
SystemMetadataUtils.isNoOp(updatedMetadata),
"NoOp should be true for metadata-only change");
assertTrue(
updatedMetadata.getProperties().containsKey("telemetryTraceId"),
"TraceId should be set");
});
}
@Test
public void testSaveLatestAspect_ContentChangeTracing() {
opContext.withSpan(
"testSaveLatestAspect_ContentChangeTracing",
() -> {
// Verify span context is valid
SpanContext currentSpanContext = Span.current().getSpanContext();
assertTrue(currentSpanContext.isValid(), "Span context should be valid");
// Setup
SystemAspect currentAspect = createSystemAspect("1");
SystemAspect newAspect = createSystemAspect("2");
// Modify the content to ensure it's not a no-op
Status newStatus = new Status().setRemoved(true);
newAspect.setRecordTemplate(newStatus);
SystemAspect dbAspect = createSystemAspect("1");
currentAspect.setDatabaseAspect(dbAspect);
// Execute
Pair<Optional<EntityAspect>, Optional<EntityAspect>> result =
aspectDao.saveLatestAspect(opContext, txContext, currentAspect, newAspect);
// Verify
assertTrue(result.getFirst().isPresent(), "Should have inserted previous version");
assertTrue(result.getSecond().isPresent(), "Should have updated current version");
SystemMetadata updatedMetadata =
RecordUtils.toRecordTemplate(
SystemMetadata.class, result.getSecond().get().getSystemMetadata());
assertFalse(
SystemMetadataUtils.isNoOp(updatedMetadata),
"NoOp should be false for content change");
assertTrue(
updatedMetadata.getProperties().containsKey("telemetryTraceId"),
"TraceId should be set");
// Verify previous version's metadata is unchanged
SystemMetadata previousMetadata =
RecordUtils.toRecordTemplate(
SystemMetadata.class, result.getFirst().get().getSystemMetadata());
assertFalse(
SystemMetadataUtils.isNoOp(previousMetadata),
"Previous version should not have NoOp flag");
});
}
@Test
public void testSaveLatestAspect_TraceIdPropagation() {
opContext.withSpan(
"testSaveLatestAspect_TraceIdPropagation",
() -> {
// Verify span context is valid
SpanContext currentSpanContext = Span.current().getSpanContext();
assertTrue(currentSpanContext.isValid(), "Span context should be valid");
// Setup
String existingTraceId = "existing-trace-123";
SystemAspect currentAspect = createSystemAspect("1");
currentAspect
.getSystemMetadata()
.setProperties(new StringMap(Map.of("telemetryTraceId", existingTraceId)));
SystemAspect newAspect = createSystemAspect("2");
// Set a different trace ID to verify overwrite behavior
newAspect
.getSystemMetadata()
.setProperties(new StringMap(Map.of("telemetryTraceId", "new-trace-456")));
SystemAspect dbAspect = createSystemAspect("1");
dbAspect
.getSystemMetadata()
.setProperties(new StringMap(Map.of("telemetryTraceId", existingTraceId)));
currentAspect.setDatabaseAspect(dbAspect);
// Execute
Pair<Optional<EntityAspect>, Optional<EntityAspect>> result =
aspectDao.saveLatestAspect(opContext, txContext, currentAspect, newAspect);
// Verify
assertTrue(result.getSecond().isPresent(), "Should have updated current version");
SystemMetadata updatedMetadata =
RecordUtils.toRecordTemplate(
SystemMetadata.class, result.getSecond().get().getSystemMetadata());
assertTrue(
updatedMetadata.getProperties().containsKey("telemetryTraceId"),
"TraceId should be set");
assertNotEquals(
updatedMetadata.getProperties().get("telemetryTraceId"),
existingTraceId,
"TraceId should be overwritten for version increment");
assertFalse(
updatedMetadata.getProperties().get("telemetryTraceId").contains("-trace-"),
"TraceId should match operation context and not the test trace ids");
});
}
// Concrete implementation for testing default methods
private class TestAspectDao implements AspectDao {

View File

@ -1,35 +1,90 @@
package com.linkedin.metadata.entity;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static com.linkedin.metadata.Constants.UPSTREAM_LINEAGE_ASPECT_NAME;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import com.datahub.util.RecordUtils;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.DataTemplateUtil;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.AspectGenerationUtils;
import com.linkedin.metadata.aspect.SystemAspect;
import com.linkedin.metadata.aspect.batch.ChangeMCP;
import com.linkedin.metadata.config.PreProcessHooks;
import com.linkedin.metadata.entity.ebean.EbeanAspectV2;
import com.linkedin.metadata.entity.ebean.EbeanSystemAspect;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.sql.Timestamp;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class EntityServiceImplApplyUpsertTest {
public class EntityServiceImplTest {
private final AuditStamp TEST_AUDIT_STAMP = AspectGenerationUtils.createAuditStamp();
private final OperationContext opContext =
TestOperationContexts.systemContextNoSearchAuthorization();
private final EntityRegistry testEntityRegistry = opContext.getEntityRegistry();
private static final Urn TEST_URN = UrnUtils.getUrn("urn:li:corpuser:EntityServiceImplTest");
private EventProducer mockEventProducer;
private Status oldAspect;
private Status newAspect;
private EntityServiceImpl entityService;
private MetadataChangeProposal testMCP;
@BeforeMethod
public void setup() throws Exception {
mockEventProducer = mock(EventProducer.class);
// Initialize common test objects
entityService =
new EntityServiceImpl(
mock(AspectDao.class), mockEventProducer, false, mock(PreProcessHooks.class), 0, true);
// Create test aspects
oldAspect = new Status().setRemoved(false);
newAspect = new Status().setRemoved(true);
testMCP =
new MetadataChangeProposal()
.setEntityUrn(TEST_URN)
.setEntityType(TEST_URN.getEntityType())
.setAspectName(STATUS_ASPECT_NAME)
.setAspect(GenericRecordUtils.serializeAspect(newAspect));
when(mockEventProducer.produceMetadataChangeLog(
any(OperationContext.class), any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
}
@Test
public void testApplyUpsertNoOp() throws Exception {
// Set up initial system metadata
@ -293,4 +348,161 @@ public class EntityServiceImplApplyUpsertTest {
assertEquals(changeMCP.getNextAspectVersion(), 1);
assertEquals(changeMCP.getSystemMetadata().getVersion(), "1");
}
@Test
public void testNoMCLWhenSystemMetadataIsNoOp() {
// Arrange
SystemMetadata systemMetadata = SystemMetadataUtils.createDefaultSystemMetadata();
SystemMetadataUtils.setNoOp(systemMetadata, true); // Makes it a no-op
// Act
Optional<Pair<Future<?>, Boolean>> result =
entityService.conditionallyProduceMCLAsync(
opContext,
oldAspect,
null, // oldSystemMetadata
newAspect,
systemMetadata,
testMCP,
TEST_URN,
TEST_AUDIT_STAMP,
opContext
.getEntityRegistry()
.getEntitySpec(TEST_URN.getEntityType())
.getAspectSpec(STATUS_ASPECT_NAME));
// Assert
assertFalse(result.isPresent(), "Should not produce MCL when system metadata is no-op");
verify(mockEventProducer, never()).produceMetadataChangeLog(any(), any(), any());
}
@Test
public void testNoMCLWhenAspectsAreEqual() {
// Arrange
RecordTemplate sameAspect = newAspect;
// Act
Optional<Pair<Future<?>, Boolean>> result =
entityService.conditionallyProduceMCLAsync(
opContext,
sameAspect,
null, // oldSystemMetadata
sameAspect,
SystemMetadataUtils.createDefaultSystemMetadata(),
testMCP,
TEST_URN,
TEST_AUDIT_STAMP,
opContext
.getEntityRegistry()
.getEntitySpec(TEST_URN.getEntityType())
.getAspectSpec(STATUS_ASPECT_NAME));
// Assert
assertFalse(result.isPresent(), "Should not produce MCL when aspects are equal");
verify(mockEventProducer, never()).produceMetadataChangeLog(any(), any(), any());
}
@Test
public void testProducesMCLWhenChangesExist() {
// Arrange
SystemMetadata systemMetadata = SystemMetadataUtils.createDefaultSystemMetadata();
SystemMetadataUtils.setNoOp(systemMetadata, false); // Makes it not a no-op
// Act
Optional<Pair<Future<?>, Boolean>> result =
entityService.conditionallyProduceMCLAsync(
opContext,
oldAspect,
null, // oldSystemMetadata
newAspect,
systemMetadata,
testMCP,
TEST_URN,
TEST_AUDIT_STAMP,
opContext
.getEntityRegistry()
.getEntitySpec(TEST_URN.getEntityType())
.getAspectSpec(STATUS_ASPECT_NAME));
// Assert
assertTrue(result.isPresent(), "Should produce MCL when changes exist");
verify(mockEventProducer, times(1))
.produceMetadataChangeLog(any(OperationContext.class), any(), any(), any());
}
@Test
public void testAlwaysEmitChangeLogFlag() {
// Arrange
entityService =
new EntityServiceImpl(
mock(AspectDao.class),
mockEventProducer,
true, // alwaysEmitChangeLog set to true
mock(PreProcessHooks.class),
0,
true);
RecordTemplate sameAspect = newAspect;
// Act
Optional<Pair<Future<?>, Boolean>> result =
entityService.conditionallyProduceMCLAsync(
opContext,
sameAspect,
null, // oldSystemMetadata
sameAspect, // Same aspect
SystemMetadataUtils.createDefaultSystemMetadata(),
testMCP,
TEST_URN,
TEST_AUDIT_STAMP,
opContext
.getEntityRegistry()
.getEntitySpec(TEST_URN.getEntityType())
.getAspectSpec(STATUS_ASPECT_NAME));
// Assert
assertTrue(
result.isPresent(),
"Should produce MCL when alwaysEmitChangeLog is true, regardless of no-op status");
verify(mockEventProducer, times(1))
.produceMetadataChangeLog(any(OperationContext.class), any(), any(), any());
}
@Test
public void testAspectWithLineageRelationship() {
// Arrange
Urn datasetUrn =
UrnUtils.getUrn(
"urn:li:dataset:(urn:li:dataPlatform:test,testAspectWithLineageRelationship,PROD)");
UpstreamLineage sameLineageAspect = new UpstreamLineage();
MetadataChangeProposal datasetMCP =
new MetadataChangeProposal()
.setEntityUrn(datasetUrn)
.setEntityType(datasetUrn.getEntityType())
.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME)
.setAspect(GenericRecordUtils.serializeAspect(sameLineageAspect));
// Act
Optional<Pair<Future<?>, Boolean>> result =
entityService.conditionallyProduceMCLAsync(
opContext,
sameLineageAspect,
null, // oldSystemMetadata
sameLineageAspect, // Same aspect
SystemMetadataUtils.createDefaultSystemMetadata(),
datasetMCP,
datasetUrn,
TEST_AUDIT_STAMP,
opContext
.getEntityRegistry()
.getEntitySpec(datasetUrn.getEntityType())
.getAspectSpec(UPSTREAM_LINEAGE_ASPECT_NAME));
// Assert
assertTrue(
result.isPresent(),
"Should produce MCL when aspect has lineage relationship, regardless of no-op status");
verify(mockEventProducer, times(1))
.produceMetadataChangeLog(any(OperationContext.class), any(), any(), any());
}
}

View File

@ -1352,6 +1352,7 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
EntityAspect readAspectDao2 = _aspectDao.getAspect(entityUrn.toString(), aspectName, 0);
assertTrue(DataTemplateUtil.areEqual(writeAspect2, readAspect2));
SystemMetadataUtils.setNoOp(expectedMetadata2, false);
assertTrue(
DataTemplateUtil.areEqual(
SystemMetadataUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()),
@ -1393,7 +1394,9 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
SystemMetadata metadata2 =
AspectGenerationUtils.createSystemMetadata(1635792689, "run-456", null, "2");
SystemMetadata expectedMetadata2 =
AspectGenerationUtils.createSystemMetadata(1635792689, "run-456", "run-123", "2");
SystemMetadataUtils.setNoOp(
AspectGenerationUtils.createSystemMetadata(1635792689, "run-456", "run-123", "2"),
false);
List<ChangeItemImpl> items =
List.of(
@ -1594,6 +1597,7 @@ public abstract class EntityServiceTest<T_AD extends AspectDao, T_RS extends Ret
SystemMetadataUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()),
metadata1));
SystemMetadataUtils.setNoOp(expectedMetadata2, true);
assertTrue(
DataTemplateUtil.areEqual(
SystemMetadataUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()),

View File

@ -25,6 +25,7 @@ import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.systemmetadata.SystemMetadataService;
import com.linkedin.metadata.utils.SystemMetadataUtils;
import com.linkedin.mxe.FailedMetadataChangeProposal;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
@ -347,4 +348,48 @@ public class TraceServiceImplTest {
"java.lang.IllegalArgumentException");
assertFalse(status.isSuccess());
}
@Test
public void testTraceWithNoOpState() throws Exception {
// Arrange
Map<Urn, List<String>> aspectNames =
Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME));
// Create system metadata with NO_OP state
SystemMetadata systemMetadata = new SystemMetadata();
systemMetadata.setProperties(
new StringMap(Map.of(TraceContext.TELEMETRY_TRACE_KEY, TEST_TRACE_ID)));
SystemMetadataUtils.setNoOp(systemMetadata, true); // Set NO_OP flag
// Create enveloped aspect with NO_OP system metadata
EnvelopedAspect envelopedAspect = new EnvelopedAspect();
envelopedAspect.setCreated(new AuditStamp().setTime(Instant.now().toEpochMilli()));
envelopedAspect.setSystemMetadata(systemMetadata);
// Set up entity response
EntityResponse entityResponse = new EntityResponse();
entityResponse.setAspects(
new EnvelopedAspectMap(Collections.singletonMap(ASPECT_NAME, envelopedAspect)));
entityResponse.setEntityName(TEST_URN.getEntityType());
entityResponse.setUrn(TEST_URN);
// Mock entity service response
when(entityService.getEntitiesV2(any(), anyString(), anySet(), anySet(), anyBoolean()))
.thenReturn(Collections.singletonMap(TEST_URN, entityResponse));
// Act
Map<Urn, Map<String, TraceStatus>> result =
traceService.trace(operationContext, TEST_TRACE_ID, aspectNames, false, false);
// Assert
assertNotNull(result);
assertTrue(result.containsKey(TEST_URN));
Map<String, TraceStatus> urnStatus = result.get(TEST_URN);
assertTrue(urnStatus.containsKey(ASPECT_NAME));
TraceStatus status = urnStatus.get(ASPECT_NAME);
assertEquals(status.getPrimaryStorage().getWriteStatus(), TraceWriteStatus.NO_OP);
assertEquals(status.getSearchStorage().getWriteStatus(), TraceWriteStatus.NO_OP);
assertTrue(status.isSuccess());
}
}