From 453d82a0cc03f693e080056099e04a446316cff3 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Thu, 13 Mar 2025 15:24:21 -0500 Subject: [PATCH] fix(api-tracing): handle corner case for historic (#12870) --- .../metadata/trace/TraceServiceImpl.java | 21 ++++++--- .../metadata/trace/TraceServiceImplTest.java | 47 +++++++++++++++++++ 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java index f8352a68d9..b47a818ecf 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/trace/TraceServiceImpl.java @@ -331,24 +331,33 @@ public class TraceServiceImpl implements TraceService { primaryStatuses.getOrDefault(urn, new LinkedHashMap<>()); for (String aspectName : entry.getValue()) { - TraceWriteStatus status = primaryStatus.get(aspectName).getWriteStatus(); - if (status == TraceWriteStatus.PENDING) { + TraceWriteStatus primaryStorageStatus = primaryStatus.get(aspectName).getWriteStatus(); + if (primaryStorageStatus == TraceWriteStatus.PENDING) { + // If the primary storage write hasn't happened, then we don't expect the search write finalResponse.put( aspectName, TraceStorageStatus.ok(TraceWriteStatus.PENDING, "Pending primary storage write.")); - } else if (status == TraceWriteStatus.NO_OP) { + } else if (primaryStorageStatus == TraceWriteStatus.NO_OP) { if (entitySpec.getAspectSpec(aspectName).isTimeseries()) { finalResponse.put( aspectName, TraceStorageStatus.ok(TraceWriteStatus.TRACE_NOT_IMPLEMENTED)); } else { + // If the primary write is a no-op, then the search write is as well finalResponse.put(aspectName, TraceStorageStatus.NO_OP); } - } else if (status == TraceWriteStatus.ERROR) { + } else if (primaryStorageStatus == TraceWriteStatus.HISTORIC_STATE) { + // If primary storage is historic then we assume the search write should also be historic + // If the search state hasn't been overwritten, then this "write" didn't fail + finalResponse.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE)); + } else if (primaryStorageStatus == TraceWriteStatus.ERROR) { + // If primary write fails, then search write never happened finalResponse.put( aspectName, TraceStorageStatus.fail(TraceWriteStatus.ERROR, "Primary storage write failed.")); - } else if (status == TraceWriteStatus.TRACE_NOT_IMPLEMENTED - || status == TraceWriteStatus.UNKNOWN) { + } else if (primaryStorageStatus == TraceWriteStatus.TRACE_NOT_IMPLEMENTED + || primaryStorageStatus == TraceWriteStatus.UNKNOWN) { + // If we don't know what happened with primary storage, then we can't know what should + // have happend to search finalResponse.put( aspectName, TraceStorageStatus.ok( diff --git a/metadata-io/src/test/java/com/linkedin/metadata/trace/TraceServiceImplTest.java b/metadata-io/src/test/java/com/linkedin/metadata/trace/TraceServiceImplTest.java index 32631eb247..5292736f33 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/trace/TraceServiceImplTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/trace/TraceServiceImplTest.java @@ -392,4 +392,51 @@ public class TraceServiceImplTest { assertEquals(status.getSearchStorage().getWriteStatus(), TraceWriteStatus.NO_OP); assertTrue(status.isSuccess()); } + + @Test + public void testTraceHistoricStateSearchPropagation() throws Exception { + // This test verifies that when primary storage has HISTORIC_STATE, + // the search storage is also set to HISTORIC_STATE based on this line: + // finalResponse.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE)); + + // Arrange - create request with one URN and one aspect + Map> aspectNames = + Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME)); + + // Create a primary storage response with HISTORIC_STATE + Map primaryStatus = new LinkedHashMap<>(); + primaryStatus.put(ASPECT_NAME, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE)); + + // Mock entityService to return empty to skip that branch + when(entityService.getEntitiesV2(any(), anyString(), anySet(), anySet(), anyBoolean())) + .thenReturn(Collections.emptyMap()); + + // Mock mcpTraceReader to return our primary status with HISTORIC_STATE + when(mcpTraceReader.tracePendingStatuses(any(), eq(TEST_TRACE_ID), any(), anyBoolean())) + .thenReturn(Collections.singletonMap(TEST_URN, primaryStatus)); + + // Mock systemMetadataService to return empty list to ensure + // we don't get a pre-existing search status + when(systemMetadataService.findAspectsByUrn(eq(TEST_URN), anyList(), eq(true))) + .thenReturn(Collections.emptyList()); + + // Act + Map> result = + traceService.trace(operationContext, TEST_TRACE_ID, aspectNames, false, false); + + // Assert + assertNotNull(result); + assertTrue(result.containsKey(TEST_URN)); + Map urnStatus = result.get(TEST_URN); + assertTrue(urnStatus.containsKey(ASPECT_NAME)); + + // The key assertion - if primary is HISTORIC_STATE, search should also be HISTORIC_STATE + TraceStatus status = urnStatus.get(ASPECT_NAME); + assertEquals(status.getPrimaryStorage().getWriteStatus(), TraceWriteStatus.HISTORIC_STATE); + assertEquals( + status.getSearchStorage().getWriteStatus(), + TraceWriteStatus.HISTORIC_STATE, + "When primary storage is HISTORIC_STATE, search storage should also be HISTORIC_STATE"); + assertTrue(status.isSuccess()); + } }