From ada78cb6176eace2b938a39026fc77569d5150eb Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Sat, 13 May 2023 22:57:06 -0500 Subject: [PATCH] fix(platform): fix NPE in change event processing (#8038) --- .../metadata/client/JavaEntityClient.java | 2 +- .../SchemaMetadataChangeEventGenerator.java | 8 ++-- ...chemaMetadataChangeEventGeneratorTest.java | 44 +++++++++++++++++++ 3 files changed, 50 insertions(+), 4 deletions(-) create mode 100644 metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java diff --git a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java index 683c9429d9..beb83fffb6 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/client/JavaEntityClient.java @@ -376,7 +376,7 @@ public class JavaEntityClient implements EntityClient { @Nullable Long startTimeMillis, @Nullable Long endTimeMillis, @Nullable SearchFlags searchFlags, @Nonnull final Authentication authentication) throws RemoteInvocationException { - final SearchFlags finalFlags = searchFlags != null ? searchFlags : new SearchFlags().setSkipCache(true); + final SearchFlags finalFlags = searchFlags != null ? searchFlags : new SearchFlags().setFulltext(true).setSkipCache(true); return ValidationUtils.validateLineageScrollResult( _lineageSearchService.scrollAcrossLineage(sourceUrn, direction, entities, input, maxHops, filter, sortCriterion, scrollId, keepAlive, count, startTimeMillis, endTimeMillis, finalFlags), _entityService); diff --git a/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java b/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java index 40206a29a2..3609d0e7bb 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGenerator.java @@ -183,11 +183,13 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat if (baseSchema != null) { sortFieldsByPath(baseSchema); } - sortFieldsByPath(targetSchema); + if (targetSchema != null) { + sortFieldsByPath(targetSchema); + } // Performs ordinal based diff, primarily based on fixed field ordinals and their types. SchemaFieldArray baseFields = (baseSchema != null ? baseSchema.getFields() : new SchemaFieldArray()); - SchemaFieldArray targetFields = targetSchema.getFields(); + SchemaFieldArray targetFields = targetSchema != null ? targetSchema.getFields() : new SchemaFieldArray(); int baseFieldIdx = 0; int targetFieldIdx = 0; List changeEvents = new ArrayList<>(); @@ -410,7 +412,7 @@ public class SchemaMetadataChangeEventGenerator extends EntityChangeEventGenerat (baseSchema != null && baseSchema.getPrimaryKeys() != null) ? new HashSet<>(baseSchema.getPrimaryKeys()) : new HashSet<>(); Set targetPrimaryKeys = - (targetSchema.getPrimaryKeys() != null) ? new HashSet<>(targetSchema.getPrimaryKeys()) : new HashSet<>(); + (targetSchema != null && targetSchema.getPrimaryKeys() != null) ? new HashSet<>(targetSchema.getPrimaryKeys()) : new HashSet<>(); Set removedBaseKeys = basePrimaryKeys.stream().filter(key -> !targetPrimaryKeys.contains(key)).collect(Collectors.toSet()); for (String removedBaseKeyField : removedBaseKeys) { diff --git a/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java b/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java new file mode 100644 index 0000000000..75508320ab --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/timeline/eventgenerator/SchemaMetadataChangeEventGeneratorTest.java @@ -0,0 +1,44 @@ +package com.linkedin.metadata.timeline.eventgenerator; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; +import com.linkedin.metadata.timeline.data.ChangeEvent; +import com.linkedin.mxe.SystemMetadata; +import com.linkedin.restli.internal.server.util.DataMapUtils; +import com.linkedin.schema.SchemaMetadata; +import org.apache.commons.io.IOUtils; +import org.springframework.test.context.testng.AbstractTestNGSpringContextTests; +import org.testng.annotations.Test; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; + +import static org.testng.AssertJUnit.assertEquals; + +public class SchemaMetadataChangeEventGeneratorTest extends AbstractTestNGSpringContextTests { + + @Test + public void testDelete() throws Exception { + SchemaMetadataChangeEventGenerator test = new SchemaMetadataChangeEventGenerator(); + + Urn urn = Urn.createFromString( + "urn:li:dataset:(urn:li:dataPlatform:hdfs,SampleHdfsDataset,PROD)"); + String entity = "dataset"; + String aspect = "schemaMetadata"; + AuditStamp auditStamp = new AuditStamp() + .setActor(Urn.createFromString("urn:li:corpuser:__datahub_system")) + .setTime(1683829509553L); + Aspect from = new Aspect<>(DataMapUtils.read(IOUtils.toInputStream(TEST_OBJECT, StandardCharsets.UTF_8), + SchemaMetadata.class, Map.of()), new SystemMetadata()); + Aspect to = new Aspect<>(null, new SystemMetadata()); + + List actual = test.getChangeEvents(urn, entity, aspect, from, to, auditStamp); + + assertEquals(14, actual.size()); + } + + //CHECKSTYLE:OFF + private static final String TEST_OBJECT = "{\"platformSchema\":{\"com.linkedin.schema.KafkaSchema\":{\"documentSchema\":\"{\\\"type\\\":\\\"record\\\",\\\"name\\\":\\\"SampleHdfsSchema\\\",\\\"namespace\\\":\\\"com.linkedin.dataset\\\",\\\"doc\\\":\\\"Sample HDFS dataset\\\",\\\"fields\\\":[{\\\"name\\\":\\\"field_foo\\\",\\\"type\\\":[\\\"string\\\"]},{\\\"name\\\":\\\"field_bar\\\",\\\"type\\\":[\\\"boolean\\\"]}]}\"}},\"created\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"lastModified\":{\"actor\":\"urn:li:corpuser:jdoe\",\"time\":1674291843000},\"fields\":[{\"nullable\":false,\"fieldPath\":\"shipment_info\",\"description\":\"Shipment info description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.RecordType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.date\",\"description\":\"Shipment info date description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.DateType\":{}}},\"recursive\":false,\"nativeDataType\":\"Date\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.target\",\"description\":\"Shipment info target description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.StringType\":{}}},\"recursive\":false,\"nativeDataType\":\"text\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.destination\",\"description\":\"Shipment info destination description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.StringType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info\",\"description\":\"Shipment info geo_info description\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.RecordType\":{}}},\"recursive\":false,\"nativeDataType\":\"varchar(100)\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info.lat\",\"description\":\"Shipment info geo_info lat\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.NumberType\":{}}},\"recursive\":false,\"nativeDataType\":\"float\"},{\"nullable\":false,\"fieldPath\":\"shipment_info.geo_info.lng\",\"description\":\"Shipment info geo_info lng\",\"isPartOfKey\":false,\"type\":{\"type\":{\"com.linkedin.schema.NumberType\":{}}},\"recursive\":false,\"nativeDataType\":\"float\"}],\"schemaName\":\"SampleHdfsSchema\",\"version\":0,\"hash\":\"\",\"platform\":\"urn:li:dataPlatform:hdfs\"}"; + //CHECKSTYLE:ON +}