mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-03 04:10:43 +00:00
fix(api-tracing): handle corner case for historic (#12870)
This commit is contained in:
parent
eb17f80e8a
commit
453d82a0cc
@ -331,24 +331,33 @@ public class TraceServiceImpl implements TraceService {
|
||||
primaryStatuses.getOrDefault(urn, new LinkedHashMap<>());
|
||||
|
||||
for (String aspectName : entry.getValue()) {
|
||||
TraceWriteStatus status = primaryStatus.get(aspectName).getWriteStatus();
|
||||
if (status == TraceWriteStatus.PENDING) {
|
||||
TraceWriteStatus primaryStorageStatus = primaryStatus.get(aspectName).getWriteStatus();
|
||||
if (primaryStorageStatus == TraceWriteStatus.PENDING) {
|
||||
// If the primary storage write hasn't happened, then we don't expect the search write
|
||||
finalResponse.put(
|
||||
aspectName,
|
||||
TraceStorageStatus.ok(TraceWriteStatus.PENDING, "Pending primary storage write."));
|
||||
} else if (status == TraceWriteStatus.NO_OP) {
|
||||
} else if (primaryStorageStatus == TraceWriteStatus.NO_OP) {
|
||||
if (entitySpec.getAspectSpec(aspectName).isTimeseries()) {
|
||||
finalResponse.put(
|
||||
aspectName, TraceStorageStatus.ok(TraceWriteStatus.TRACE_NOT_IMPLEMENTED));
|
||||
} else {
|
||||
// If the primary write is a no-op, then the search write is as well
|
||||
finalResponse.put(aspectName, TraceStorageStatus.NO_OP);
|
||||
}
|
||||
} else if (status == TraceWriteStatus.ERROR) {
|
||||
} else if (primaryStorageStatus == TraceWriteStatus.HISTORIC_STATE) {
|
||||
// If primary storage is historic then we assume the search write should also be historic
|
||||
// If the search state hasn't been overwritten, then this "write" didn't fail
|
||||
finalResponse.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE));
|
||||
} else if (primaryStorageStatus == TraceWriteStatus.ERROR) {
|
||||
// If primary write fails, then search write never happened
|
||||
finalResponse.put(
|
||||
aspectName,
|
||||
TraceStorageStatus.fail(TraceWriteStatus.ERROR, "Primary storage write failed."));
|
||||
} else if (status == TraceWriteStatus.TRACE_NOT_IMPLEMENTED
|
||||
|| status == TraceWriteStatus.UNKNOWN) {
|
||||
} else if (primaryStorageStatus == TraceWriteStatus.TRACE_NOT_IMPLEMENTED
|
||||
|| primaryStorageStatus == TraceWriteStatus.UNKNOWN) {
|
||||
// If we don't know what happened with primary storage, then we can't know what should
|
||||
// have happend to search
|
||||
finalResponse.put(
|
||||
aspectName,
|
||||
TraceStorageStatus.ok(
|
||||
|
||||
@ -392,4 +392,51 @@ public class TraceServiceImplTest {
|
||||
assertEquals(status.getSearchStorage().getWriteStatus(), TraceWriteStatus.NO_OP);
|
||||
assertTrue(status.isSuccess());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTraceHistoricStateSearchPropagation() throws Exception {
|
||||
// This test verifies that when primary storage has HISTORIC_STATE,
|
||||
// the search storage is also set to HISTORIC_STATE based on this line:
|
||||
// finalResponse.put(aspectName, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE));
|
||||
|
||||
// Arrange - create request with one URN and one aspect
|
||||
Map<Urn, List<String>> aspectNames =
|
||||
Collections.singletonMap(TEST_URN, Collections.singletonList(ASPECT_NAME));
|
||||
|
||||
// Create a primary storage response with HISTORIC_STATE
|
||||
Map<String, TraceStorageStatus> primaryStatus = new LinkedHashMap<>();
|
||||
primaryStatus.put(ASPECT_NAME, TraceStorageStatus.ok(TraceWriteStatus.HISTORIC_STATE));
|
||||
|
||||
// Mock entityService to return empty to skip that branch
|
||||
when(entityService.getEntitiesV2(any(), anyString(), anySet(), anySet(), anyBoolean()))
|
||||
.thenReturn(Collections.emptyMap());
|
||||
|
||||
// Mock mcpTraceReader to return our primary status with HISTORIC_STATE
|
||||
when(mcpTraceReader.tracePendingStatuses(any(), eq(TEST_TRACE_ID), any(), anyBoolean()))
|
||||
.thenReturn(Collections.singletonMap(TEST_URN, primaryStatus));
|
||||
|
||||
// Mock systemMetadataService to return empty list to ensure
|
||||
// we don't get a pre-existing search status
|
||||
when(systemMetadataService.findAspectsByUrn(eq(TEST_URN), anyList(), eq(true)))
|
||||
.thenReturn(Collections.emptyList());
|
||||
|
||||
// Act
|
||||
Map<Urn, Map<String, TraceStatus>> result =
|
||||
traceService.trace(operationContext, TEST_TRACE_ID, aspectNames, false, false);
|
||||
|
||||
// Assert
|
||||
assertNotNull(result);
|
||||
assertTrue(result.containsKey(TEST_URN));
|
||||
Map<String, TraceStatus> urnStatus = result.get(TEST_URN);
|
||||
assertTrue(urnStatus.containsKey(ASPECT_NAME));
|
||||
|
||||
// The key assertion - if primary is HISTORIC_STATE, search should also be HISTORIC_STATE
|
||||
TraceStatus status = urnStatus.get(ASPECT_NAME);
|
||||
assertEquals(status.getPrimaryStorage().getWriteStatus(), TraceWriteStatus.HISTORIC_STATE);
|
||||
assertEquals(
|
||||
status.getSearchStorage().getWriteStatus(),
|
||||
TraceWriteStatus.HISTORIC_STATE,
|
||||
"When primary storage is HISTORIC_STATE, search storage should also be HISTORIC_STATE");
|
||||
assertTrue(status.isSuccess());
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user