fix(platform): fix NPE in change event processing (#8038)

This commit is contained in:
david-leifker 2023-05-13 22:57:06 -05:00 committed by GitHub
parent 92af7152c0
commit ada78cb617
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 4 deletions

View File

@ -376,7 +376,7 @@ public class JavaEntityClient implements EntityClient {
@Nullable Long startTimeMillis, @Nullable Long endTimeMillis, @Nullable SearchFlags searchFlags,
@Nonnull final Authentication authentication)
throws RemoteInvocationException {
final SearchFlags finalFlags = searchFlags != null ? searchFlags : new SearchFlags().setSkipCache(true);
final SearchFlags finalFlags = searchFlags != null ? searchFlags : new SearchFlags().setFulltext(true).setSkipCache(true);
return ValidationUtils.validateLineageScrollResult(
_lineageSearchService.scrollAcrossLineage(sourceUrn, direction, entities, input, maxHops, filter,
sortCriterion, scrollId, keepAlive, count, startTimeMillis, endTimeMillis, finalFlags), _entityService);

View File

@ -183,11 +183,13 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
if (baseSchema != null) {
sortFieldsByPath(baseSchema);
}
sortFieldsByPath(targetSchema);
if (targetSchema != null) {
sortFieldsByPath(targetSchema);
}
// Performs ordinal based diff, primarily based on fixed field ordinals and their types.
SchemaFieldArray baseFields = (baseSchema != null ? baseSchema.getFields() : new SchemaFieldArray());
SchemaFieldArray targetFields = targetSchema.getFields();
SchemaFieldArray targetFields = targetSchema != null ? targetSchema.getFields() : new SchemaFieldArray();
int baseFieldIdx = 0;
int targetFieldIdx = 0;
List<ChangeEvent> changeEvents = new ArrayList<>();
@ -410,7 +412,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat
(baseSchema != null && baseSchema.getPrimaryKeys() != null) ? new HashSet<>(baseSchema.getPrimaryKeys())
: new HashSet<>();
Set<String> targetPrimaryKeys =
(targetSchema.getPrimaryKeys() != null) ? new HashSet<>(targetSchema.getPrimaryKeys()) : new HashSet<>();
(targetSchema != null && targetSchema.getPrimaryKeys() != null) ? new HashSet<>(targetSchema.getPrimaryKeys()) : new HashSet<>();
Set<String> removedBaseKeys =
basePrimaryKeys.stream().filter(key -> !targetPrimaryKeys.contains(key)).collect(Collectors.toSet());
for (String removedBaseKeyField : removedBaseKeys) {

View File

@ -0,0 +1,44 @@
package com.linkedin.metadata.timeline.eventgenerator;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.restli.internal.server.util.DataMapUtils;
import com.linkedin.schema.SchemaMetadata;
import org.apache.commons.io.IOUtils;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import static org.testng.AssertJUnit.assertEquals;
public class SchemaMetadataChangeEventGeneratorTest extends AbstractTestNGSpringContextTests {
@Test
public void testDelete() throws Exception {
SchemaMetadataChangeEventGenerator test = new SchemaMetadataChangeEventGenerator();
Urn urn = Urn.createFromString(
"urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)");
String entity = "dataset";
String aspect = "schemaMetadata";
AuditStamp auditStamp = new AuditStamp()
.setActor(Urn.createFromString("urn:li:corpuser:__datahub_system"))
.setTime(1683829509553L);
Aspect<SchemaMetadata> from = new Aspect<>(DataMapUtils.read(IOUtils.toInputStream(TEST_OBJECT, StandardCharsets.UTF_8),
SchemaMetadata.class, Map.of()), new SystemMetadata());
Aspect<SchemaMetadata> to = new Aspect<>(null, new SystemMetadata());
List<ChangeEvent> actual = test.getChangeEvents(urn, entity, aspect, from, to, auditStamp);
assertEquals(14, actual.size());
}
//CHECKSTYLE:OFF
private static final String TEST_OBJECT = "{\"platformSchema\":{\"com.linkedin.schema.KafkaSchema\":{\"documentSchema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"SampleHdfsSchema\\\",\\\"namespace\\\":\\\"com.linkedin.dataset\\\",\\\"doc\\\":\\\"Sample HDFS dataset\\\",\\\"fields\\\":[{\\\"name\\\":\\\"field_foo\\\",\\\"type\\\":[\\\"string\\\"]},{\\\"name\\\":\\\"field_bar\\\",\\\"type\\\":[\\\"boolean\\\"]}]}\"}},\"created\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"lastModified\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"fields\":[{\"nullable\":false,\"fieldPath\":\"shipment_info\",\"description\":\"Shipment info description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.RecordType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.date\",\"description\":\"Shipment info date description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.DateType\":{}}},\"recursive\":false,\"nativeDataType\":\"Date\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.target\",\"description\":\"Shipment info target description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.StringType\":{}}},\"recursive\":false,\"nativeDataType\":\"text\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.destination\",\"description\":\"Shipment info destination description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.StringType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info\",\"description\":\"Shipment info geo_info description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.RecordType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info.lat\",\"description\":\"Shipment info geo_info lat\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.NumberType\":{}}},\"recursive\":false,\"nativeDataType\":\"float\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info.lng\",\"description\":\"Shipment info geo_info lng\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.NumberType\":{}}},\"recursive\":false,\"nativeDataType\":\"float\"}],\"schemaName\":\"SampleHdfsSchema\",\"version\":0,\"hash\":\"\",\"platform\":\"urn:li:dataPlatform:hdfs\"}";
//CHECKSTYLE:ON
}