fix(): handle null systemmetadata corner cases (#13086)

This commit is contained in:
david-leifker 2025-04-10 17:44:58 -05:00 committed by GitHub
parent b82ec1c65e
commit 2add837621
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 380 additions and 7 deletions

View File

@ -16,21 +16,23 @@ create table if not exists datahub.metadata_aspect_v2 (
primary key ((urn), aspect, version))
with clustering order by (aspect asc, version asc);
insert into datahub.metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, entity) values(
insert into datahub.metadata_aspect_v2 (urn, aspect, version, metadata, systemmetadata, createdon, createdby, entity) values(
'urn:li:corpuser:datahub',
'corpUserInfo',
0,
'{"displayName":"Data Hub","active":true,"fullName":"Data Hub","email":"datahub@linkedin.com"}',
'{}',
toTimestamp(now()),
'urn:li:corpuser:__datahub_system',
'corpuser'
) if not exists;
insert into datahub.metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby, entity) values(
insert into datahub.metadata_aspect_v2 (urn, aspect, version, metadata, systemmetadata, createdon, createdby, entity) values(
'urn:li:corpuser:datahub',
'corpUserEditableInfo',
0,
'{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/linkedin/datahub/master/datahub-web-react/src/images/default_avatar.png"}',
'{}',
toTimestamp(now()),
'urn:li:corpuser:__datahub_system',
'corpuser'

View File

@ -19,11 +19,12 @@ create table if not exists metadata_aspect_v2 (
-- create default records for datahub user if not exists
DROP TABLE if exists temp_metadata_aspect_v2;
CREATE TABLE temp_metadata_aspect_v2 LIKE metadata_aspect_v2;
INSERT INTO temp_metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) VALUES(
INSERT INTO temp_metadata_aspect_v2 (urn, aspect, version, metadata, systemmetadata, createdon, createdby) VALUES(
'urn:li:corpuser:datahub',
'corpUserInfo',
0,
'{"displayName":"Data Hub","active":true,"fullName":"Data Hub","email":"datahub@linkedin.com"}',
'{}',
now(),
'urn:li:corpuser:__datahub_system'
), (
@ -31,6 +32,7 @@ INSERT INTO temp_metadata_aspect_v2 (urn, aspect, version, metadata, createdon,
'corpUserEditableInfo',
0,
'{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/datahub-project/datahub/master/datahub-web-react/src/images/default_avatar.png"}',
'{}',
now(),
'urn:li:corpuser:__datahub_system'
);

View File

@ -15,11 +15,12 @@ create index timeIndex ON metadata_aspect_v2 (createdon);
-- create default records for datahub user if not exists
CREATE TEMP TABLE temp_metadata_aspect_v2 AS TABLE metadata_aspect_v2 WITH NO DATA;
INSERT INTO temp_metadata_aspect_v2 (urn, aspect, version, metadata, createdon, createdby) VALUES(
INSERT INTO temp_metadata_aspect_v2 (urn, aspect, version, metadata, systemmetadata, createdon, createdby) VALUES(
'urn:li:corpuser:datahub',
'corpUserInfo',
0,
'{"displayName":"Data Hub","active":true,"fullName":"Data Hub","email":"datahub@linkedin.com"}',
'{}',
now(),
'urn:li:corpuser:__datahub_system'
), (
@ -27,6 +28,7 @@ INSERT INTO temp_metadata_aspect_v2 (urn, aspect, version, metadata, createdon,
'corpUserEditableInfo',
0,
'{"skills":[],"teams":[],"pictureLink":"https://raw.githubusercontent.com/datahub-project/datahub/master/datahub-web-react/src/images/default_avatar.png"}',
'{}',
now(),
'urn:li:corpuser:__datahub_system'
);

View File

@ -161,6 +161,19 @@ public class EntityAspect {
return envelopedAspect;
}
@Nonnull
public SystemMetadata getSystemMetadata() {
if (systemMetadata == null) {
if (entityAspect != null && entityAspect.getSystemMetadata() != null) {
systemMetadata =
RecordUtils.toRecordTemplate(SystemMetadata.class, entityAspect.getSystemMetadata());
} else {
systemMetadata = SystemMetadataUtils.createDefaultSystemMetadata();
}
}
return systemMetadata;
}
public static class EntitySystemAspectBuilder {
private EntityAspect.EntitySystemAspect build() {

View File

@ -1,17 +1,21 @@
package com.linkedin.metadata.aspect;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
import com.datahub.test.TestEntityProfile;
import com.linkedin.common.Status;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.schema.annotation.PathSpecBasedSchemaAnnotationVisitor;
import com.linkedin.entity.AspectType;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.SystemMetadata;
import java.sql.Timestamp;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.BeforeTest;
@ -23,7 +27,7 @@ public class EntityAspectTest {
"urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)";
private static final String TEST_ASPECT = "status";
private static final String TEST_METADATA = "{\"removes\":false}";
private static final String TEST_SYSTEM_METADATA = "{\"lastModified\":1234567890}";
private static final String TEST_SYSTEM_METADATA = "{\"lastObserved\":1234567890}";
private static final String TEST_CREATED_BY = "urn:li:corpuser:testUser";
private static final String TEST_CREATED_FOR = "urn:li:corpuser:testImpersonator";
@ -239,4 +243,107 @@ public class EntityAspectTest {
assertEquals(aspect.getCreatedBy(), "");
assertEquals(aspect.getCreatedFor(), "");
}
@Test
public void testGetSystemMetadata() {
// Case 1: When systemMetadata is null and entityAspect's systemMetadata is not null
EntityAspect.EntitySystemAspect systemAspect =
EntityAspect.EntitySystemAspect.builder().forInsert(testEntityAspect, entityRegistry);
// First call to getSystemMetadata should parse from TEST_SYSTEM_METADATA string
assertNotNull(systemAspect.getSystemMetadata());
assertEquals(systemAspect.getSystemMetadata().getLastObserved(), Long.valueOf(1234567890));
// Second call should return the cached value
SystemMetadata cachedMetadata = systemAspect.getSystemMetadata();
assertNotNull(cachedMetadata);
assertEquals(cachedMetadata.getLastObserved(), Long.valueOf(1234567890));
// Case 2: When systemMetadata is already set (not null)
SystemMetadata presetSystemMetadata = new SystemMetadata().setLastObserved(123L);
EntityAspect.EntitySystemAspect systemAspectWithPresetMetadata =
EntityAspect.EntitySystemAspect.builder()
.forInsert(testEntityAspect, entityRegistry)
.setSystemMetadata(presetSystemMetadata);
// Should return the preset metadata without using the one from entityAspect
assertEquals(systemAspectWithPresetMetadata.getSystemMetadata(), presetSystemMetadata);
assertEquals(
systemAspectWithPresetMetadata.getSystemMetadata().getLastObserved(), Long.valueOf(123L));
// Case 3: When both systemMetadata and entityAspect's systemMetadata are null
EntityAspect aspectWithoutSystemMetadata =
EntityAspect.builder()
.urn(TEST_URN)
.aspect(TEST_ASPECT)
.version(1L)
.metadata(TEST_METADATA)
.systemMetadata(null)
.build();
EntityAspect.EntitySystemAspect systemAspectWithoutMetadata =
EntityAspect.EntitySystemAspect.builder()
.forInsert(aspectWithoutSystemMetadata, entityRegistry);
// Should create default system metadata
SystemMetadata defaultMetadata = systemAspectWithoutMetadata.getSystemMetadata();
assertNotNull(defaultMetadata);
// Default system metadata should have the default run ID
assertEquals("no-run-id-provided", defaultMetadata.getRunId());
// Case 4: entityAspect is null
EntityAspect.EntitySystemAspect nullEntityAspect =
new EntityAspect.EntitySystemAspect(
null,
UrnUtils.getUrn(TEST_URN),
null,
null,
null,
entityRegistry.getEntitySpec(DATASET_ENTITY_NAME),
null);
// Should create default system metadata
SystemMetadata nullEntityDefaultMetadata = nullEntityAspect.getSystemMetadata();
assertNotNull(nullEntityDefaultMetadata);
}
@Test
public void testGetSystemMetadataFromEntityAspect() {
// Create an EntityAspect with specific systemMetadata
String systemMetadataJson = "{\"lastObserved\":9876543210,\"runId\":\"test-run-id\"}";
EntityAspect aspectWithCustomSystemMetadata =
EntityAspect.builder()
.urn(TEST_URN)
.aspect(TEST_ASPECT)
.version(1L)
.metadata(TEST_METADATA)
.systemMetadata(systemMetadataJson)
.build();
// Create EntitySystemAspect that should parse the systemMetadata from entityAspect
EntityAspect.EntitySystemAspect systemAspect =
new EntityAspect.EntitySystemAspect(
aspectWithCustomSystemMetadata, // Explicitly set entityAspect
UrnUtils.getUrn(TEST_URN),
null, // No recordTemplate
null, // No systemMetadata initially - should be populated from entityAspect
null, // No auditStamp
entityRegistry.getEntitySpec(DATASET_ENTITY_NAME),
entityRegistry.getEntitySpec(DATASET_ENTITY_NAME).getAspectSpec(TEST_ASPECT));
// This should trigger the condition we're testing:
// "if (entityAspect != null && entityAspect.getSystemMetadata() != null)"
SystemMetadata result = systemAspect.getSystemMetadata();
// Verify the result was properly parsed from the JSON in entityAspect
assertNotNull(result);
assertEquals(result.getLastObserved(), Long.valueOf(9876543210L));
assertEquals(result.getRunId(), "test-run-id");
// Make a second call to verify we get the same object back (cached)
SystemMetadata cachedResult = systemAspect.getSystemMetadata();
assertSame(result, cachedResult, "Second call should return the same cached object");
}
}

View File

@ -65,9 +65,12 @@ public class ProposedItem implements MCPItem {
return null;
}
@Nullable
@Nonnull
@Override
public SystemMetadata getSystemMetadata() {
if (metadataChangeProposal.getSystemMetadata() == null) {
metadataChangeProposal.setSystemMetadata(SystemMetadataUtils.createDefaultSystemMetadata());
}
return metadataChangeProposal.getSystemMetadata();
}

View File

@ -52,10 +52,24 @@ public class EbeanSystemAspect implements SystemAspect {
@Getter @Setter @Nullable private RecordTemplate recordTemplate;
@Getter @Setter @Nullable private SystemMetadata systemMetadata;
@Setter @Nullable private SystemMetadata systemMetadata;
@Setter @Nullable private AuditStamp auditStamp;
@Nonnull
@Override
public SystemMetadata getSystemMetadata() {
if (systemMetadata == null) {
if (ebeanAspectV2 != null && ebeanAspectV2.getSystemMetadata() != null) {
systemMetadata =
RecordUtils.toRecordTemplate(SystemMetadata.class, ebeanAspectV2.getSystemMetadata());
} else {
systemMetadata = SystemMetadataUtils.createDefaultSystemMetadata();
}
}
return systemMetadata;
}
/**
* Return the ebean model for the given version. If version 0, use latest, otherwise copy and
* update system metadata version.

View File

@ -974,4 +974,66 @@ public class EntityServiceImplTest {
verify(mockCounter).inc();
}
}
@Test
public void testRestoreIndicesWithNullSystemMetadata() {
// Setup mock AspectDao
AspectDao mockAspectDao = mock(AspectDao.class);
PartitionedStream<EbeanAspectV2> mockStream = mock(PartitionedStream.class);
// Create test aspects
List<EbeanAspectV2> batch = new ArrayList<>();
// Create an aspect with null system metadata
Urn testUrn =
UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:test,nullMetadataTest,PROD)");
Status testStatus = new Status().setRemoved(false);
EbeanAspectV2 aspectWithNullMetadata =
new EbeanAspectV2(
testUrn.toString(),
STATUS_ASPECT_NAME,
0L,
RecordUtils.toJsonString(testStatus),
new Timestamp(System.currentTimeMillis()),
TEST_AUDIT_STAMP.getActor().toString(),
null,
null // Explicitly set systemMetadata to null
);
batch.add(aspectWithNullMetadata);
// Setup mock stream
when(mockStream.partition(anyInt())).thenReturn(Stream.of(batch.stream()));
when(mockAspectDao.streamAspectBatches(any())).thenReturn(mockStream);
// Setup mock EventProducer
EventProducer mockEventProducer = mock(EventProducer.class);
when(mockEventProducer.produceMetadataChangeLog(
any(OperationContext.class), any(), any(), any()))
.thenReturn(CompletableFuture.completedFuture(null));
// Create EntityServiceImpl with mocks
EntityServiceImpl entityServiceSpy =
spy(
new EntityServiceImpl(
mockAspectDao, mockEventProducer, false, mock(PreProcessHooks.class), 0, true));
// Create RestoreIndicesArgs
RestoreIndicesArgs args =
new RestoreIndicesArgs()
.start(0)
.limit(100)
.batchSize(50)
.batchDelayMs(0L)
.createDefaultAspects(false);
// Execute the method under test
List<RestoreIndicesResult> results =
entityServiceSpy.restoreIndices(opContext, args, message -> {});
// Verify results
assertNotNull(results);
assertEquals(1, results.size());
assertEquals(1, results.get(0).rowsMigrated);
}
}

View File

@ -9,6 +9,7 @@ import com.datahub.util.RecordUtils;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.aspect.EntityAspect;
import com.linkedin.metadata.aspect.SystemAspect;
@ -233,4 +234,69 @@ public class EbeanSystemAspectTest {
assertThrows(NullPointerException.class, () -> aspect.withVersion(0));
}
@Test
public void testGetSystemMetadata() {
// Case 1: When systemMetadata is null and ebeanAspectV2 has system metadata
when(ebeanAspectV2.getSystemMetadata()).thenReturn(RecordUtils.toJsonString(systemMetadata));
EbeanSystemAspect aspect =
new EbeanSystemAspect(
ebeanAspectV2,
UrnUtils.getUrn(ebeanAspectV2.getUrn()),
ebeanAspectV2.getAspect(),
entitySpec,
aspectSpec,
recordTemplate,
null, // null so that we get it from ebeanAspectV2
auditStamp);
// First call should parse from ebeanAspectV2's system metadata
SystemMetadata metadata = aspect.getSystemMetadata();
assertNotNull(metadata);
assertEquals(metadata.getRunId(), systemMetadata.getRunId());
// Case 2: When systemMetadata is already set (cached)
// Call getSystemMetadata again to test caching behavior
SystemMetadata cachedMetadata = aspect.getSystemMetadata();
assertSame(
cachedMetadata, metadata, "Should return the same cached object on subsequent calls");
// Case 3: When systemMetadata is explicitly set
SystemMetadata customMetadata = new SystemMetadata();
customMetadata.setLastObserved(9876543210L);
customMetadata.setRunId("custom-run-id");
EbeanSystemAspect aspectWithCustomMetadata =
EbeanSystemAspect.builder()
.forUpdate(ebeanAspectV2, entityRegistry)
.setSystemMetadata(customMetadata);
// Should return the custom metadata
assertEquals(aspectWithCustomMetadata.getSystemMetadata(), customMetadata);
assertEquals(aspectWithCustomMetadata.getSystemMetadata().getRunId(), "custom-run-id");
assertEquals(
aspectWithCustomMetadata.getSystemMetadata().getLastObserved(), Long.valueOf(9876543210L));
// Case 4: When ebeanAspectV2's systemMetadata is null
when(ebeanAspectV2.getSystemMetadata()).thenReturn(null);
EbeanSystemAspect aspectWithNullMetadata =
EbeanSystemAspect.builder().forUpdate(ebeanAspectV2, entityRegistry);
// Should create default system metadata
SystemMetadata defaultMetadata = aspectWithNullMetadata.getSystemMetadata();
assertNotNull(defaultMetadata);
assertEquals(defaultMetadata.getRunId(), "no-run-id-provided");
// Case 5: When ebeanAspectV2 is null
EbeanSystemAspect aspectWithNullEbeanAspect =
EbeanSystemAspect.builder()
.forInsert(
urn, STATUS_ASPECT_NAME, entitySpec, aspectSpec, recordTemplate, null, auditStamp);
// Should create default system metadata
SystemMetadata defaultMetadataWithNullEbeanAspect =
aspectWithNullEbeanAspect.getSystemMetadata();
assertNotNull(defaultMetadataWithNullEbeanAspect);
assertEquals(defaultMetadataWithNullEbeanAspect.getRunId(), "no-run-id-provided");
}
}

View File

@ -0,0 +1,102 @@
package com.linkedin.metadata.entity.ebean.batch;
import static com.linkedin.metadata.Constants.DATASET_ENTITY_NAME;
import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME;
import static org.testng.Assert.*;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.Status;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.mxe.SystemMetadata;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class ProposedItemTest {
private static final String URN_STRING =
"urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)";
private static final String ASPECT_NAME = "status";
private static final String ACTOR_URN = "urn:li:corpuser:testUser";
private final OperationContext opContext = TestOperationContexts.systemContextNoValidate();
private final EntityRegistry entityRegistry = opContext.getEntityRegistry();
private final EntitySpec entitySpec = entityRegistry.getEntitySpec(DATASET_ENTITY_NAME);
private final AspectSpec aspectSpec = entitySpec.getAspectSpec(STATUS_ASPECT_NAME);
private Urn urn;
private MetadataChangeProposal mcp;
private AuditStamp auditStamp;
@BeforeMethod
public void setup() throws Exception {
// Set up URN and audit stamp
urn = Urn.createFromString(URN_STRING);
auditStamp =
new AuditStamp()
.setActor(Urn.createFromString(ACTOR_URN))
.setTime(System.currentTimeMillis());
// Create MCP
mcp = new MetadataChangeProposal();
mcp.setEntityUrn(urn);
mcp.setEntityType(DATASET_ENTITY_NAME);
mcp.setAspectName(ASPECT_NAME);
mcp.setChangeType(ChangeType.UPSERT);
mcp.setAspect(GenericRecordUtils.serializeAspect(new Status().setRemoved(false)));
}
@Test
public void testBuild() {
ProposedItem item = ProposedItem.builder().build(mcp, auditStamp, entityRegistry);
assertNotNull(item);
assertEquals(item.getUrn(), urn);
assertEquals(item.getMetadataChangeProposal(), mcp);
assertEquals(item.getAuditStamp(), auditStamp);
assertEquals(item.getEntitySpec(), entitySpec);
assertEquals(item.getAspectSpec(), aspectSpec);
}
@Test
public void testGetSystemMetadata() {
// Case 1: When systemMetadata is null in MCP
ProposedItem itemNullMetadata = ProposedItem.builder().build(mcp, auditStamp, entityRegistry);
SystemMetadata metadata = itemNullMetadata.getSystemMetadata();
assertNotNull(metadata);
assertEquals(metadata.getRunId(), "no-run-id-provided");
// Verify it was set on the MCP
assertNotNull(mcp.getSystemMetadata());
assertEquals(mcp.getSystemMetadata(), metadata);
// Case 2: When systemMetadata is already set in MCP
SystemMetadata customMetadata = new SystemMetadata();
customMetadata.setRunId("custom-run-id");
customMetadata.setLastObserved(1234567890L);
MetadataChangeProposal mcpWithMetadata = new MetadataChangeProposal();
mcpWithMetadata.setEntityUrn(urn);
mcpWithMetadata.setEntityType(DATASET_ENTITY_NAME);
mcpWithMetadata.setAspectName(ASPECT_NAME);
mcpWithMetadata.setChangeType(ChangeType.UPSERT);
mcpWithMetadata.setSystemMetadata(customMetadata);
ProposedItem itemWithMetadata =
ProposedItem.builder().build(mcpWithMetadata, auditStamp, entityRegistry);
SystemMetadata retrievedMetadata = itemWithMetadata.getSystemMetadata();
assertNotNull(retrievedMetadata);
assertEquals(retrievedMetadata.getRunId(), "custom-run-id");
assertEquals(retrievedMetadata.getLastObserved().longValue(), 1234567890L);
assertEquals(retrievedMetadata, customMetadata);
}
}