fix(siblingsHook): check number of dbtUpstreams instead of all upStreams (#8817)

Co-authored-by: Ethan Cartwright <ethan.cartwright@acryl.io>
This commit is contained in:
ethan-cartwright 2023-09-13 03:45:58 -04:00 committed by GitHub
parent 3cc0f76d17
commit 785ab7718d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 100 additions and 31 deletions

View File

@ -200,10 +200,19 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
UpstreamLineage upstreamLineage = getUpstreamLineageFromEvent(event); UpstreamLineage upstreamLineage = getUpstreamLineageFromEvent(event);
if (upstreamLineage != null && upstreamLineage.hasUpstreams()) { if (upstreamLineage != null && upstreamLineage.hasUpstreams()) {
UpstreamArray upstreams = upstreamLineage.getUpstreams(); UpstreamArray upstreams = upstreamLineage.getUpstreams();
if (
upstreams.size() == 1 // an entity can have merged lineage (eg. dbt + snowflake), but by default siblings are only between dbt <> non-dbt
&& upstreams.get(0).getDataset().getPlatformEntity().getPlatformNameEntity().equals(DBT_PLATFORM_NAME)) { UpstreamArray dbtUpstreams = new UpstreamArray(
setSiblingsAndSoftDeleteSibling(upstreams.get(0).getDataset(), sourceUrn); upstreams.stream()
.filter(obj -> obj.getDataset().getPlatformEntity().getPlatformNameEntity().equals(DBT_PLATFORM_NAME))
.collect(Collectors.toList())
);
// We're assuming a data asset (eg. snowflake table) will only ever be downstream of 1 dbt model
if (dbtUpstreams.size() == 1) {
setSiblingsAndSoftDeleteSibling(dbtUpstreams.get(0).getDataset(), sourceUrn);
} else {
log.error("{} has an unexpected number of dbt upstreams: {}. Not adding any as siblings.", sourceUrn.toString(), dbtUpstreams.size());
} }
} }
} }
@ -219,7 +228,7 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
existingDbtSiblingAspect != null existingDbtSiblingAspect != null
&& existingSourceSiblingAspect != null && existingSourceSiblingAspect != null
&& existingDbtSiblingAspect.getSiblings().contains(sourceUrn.toString()) && existingDbtSiblingAspect.getSiblings().contains(sourceUrn.toString())
&& existingDbtSiblingAspect.getSiblings().contains(dbtUrn.toString()) && existingSourceSiblingAspect.getSiblings().contains(dbtUrn.toString())
) { ) {
// we have already connected them- we can abort here // we have already connected them- we can abort here
return; return;

View File

@ -36,6 +36,8 @@ import org.mockito.Mockito;
import org.testng.annotations.BeforeMethod; import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.net.URISyntaxException;
import static com.linkedin.metadata.Constants.*; import static com.linkedin.metadata.Constants.*;
import static org.mockito.ArgumentMatchers.*; import static org.mockito.ArgumentMatchers.*;
@ -78,15 +80,12 @@ public class SiblingAssociationHookTest {
_mockAuthentication _mockAuthentication
)).thenReturn(mockResponse); )).thenReturn(mockResponse);
MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(DATASET_ENTITY_NAME); MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT); Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);
final UpstreamLineage upstreamLineage = new UpstreamLineage(); final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray(); final UpstreamArray upstreamArray = new UpstreamArray();
final Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"));
upstreamArray.add(upstream); upstreamArray.add(upstream);
upstreamLineage.setUpstreams(upstreamArray); upstreamLineage.setUpstreams(upstreamArray);
@ -151,15 +150,11 @@ public class SiblingAssociationHookTest {
_mockAuthentication _mockAuthentication
)).thenReturn(mockResponse); )).thenReturn(mockResponse);
MetadataChangeLog event = new MetadataChangeLog(); MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
event.setEntityType(DATASET_ENTITY_NAME); Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);
event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);
final UpstreamLineage upstreamLineage = new UpstreamLineage(); final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray(); final UpstreamArray upstreamArray = new UpstreamArray();
final Upstream upstream = new Upstream();
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"));
upstreamArray.add(upstream); upstreamArray.add(upstream);
upstreamLineage.setUpstreams(upstreamArray); upstreamLineage.setUpstreams(upstreamArray);
@ -189,15 +184,11 @@ public class SiblingAssociationHookTest {
public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Exception { public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Exception {
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true); Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(DATASET_ENTITY_NAME); MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
event.setAspectName(UPSTREAM_LINEAGE_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);
final UpstreamLineage upstreamLineage = new UpstreamLineage(); final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray(); final UpstreamArray upstreamArray = new UpstreamArray();
final Upstream upstream = new Upstream(); Upstream upstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);
upstream.setType(DatasetLineageType.TRANSFORMED);
upstream.setDataset(DatasetUrn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"));
upstreamArray.add(upstream); upstreamArray.add(upstream);
upstreamLineage.setUpstreams(upstreamArray); upstreamLineage.setUpstreams(upstreamArray);
@ -259,10 +250,7 @@ public class SiblingAssociationHookTest {
.setSkipAggregates(true).setSkipHighlighting(true)) .setSkipAggregates(true).setSkipHighlighting(true))
)).thenReturn(returnSearchResult); )).thenReturn(returnSearchResult);
MetadataChangeLog event = new MetadataChangeLog(); MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, DATASET_KEY_ASPECT_NAME, ChangeType.UPSERT);
event.setEntityType(DATASET_ENTITY_NAME);
event.setAspectName(DATASET_KEY_ASPECT_NAME);
event.setChangeType(ChangeType.UPSERT);
final DatasetKey datasetKey = new DatasetKey(); final DatasetKey datasetKey = new DatasetKey();
datasetKey.setName("my-proj.jaffle_shop.customers"); datasetKey.setName("my-proj.jaffle_shop.customers");
datasetKey.setOrigin(FabricType.PROD); datasetKey.setOrigin(FabricType.PROD);
@ -304,4 +292,76 @@ public class SiblingAssociationHookTest {
Mockito.eq(_mockAuthentication) Mockito.eq(_mockAuthentication)
); );
} }
} @Test
public void testInvokeWhenSourceUrnHasTwoDbtUpstreams() throws Exception {
MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray();
Upstream dbtUpstream1 = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity1,PROD)", DatasetLineageType.TRANSFORMED);
Upstream dbtUpstream2 = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity2,PROD)", DatasetLineageType.TRANSFORMED);
upstreamArray.add(dbtUpstream1);
upstreamArray.add(dbtUpstream2);
upstreamLineage.setUpstreams(upstreamArray);
event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
event.setEntityUrn(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"));
_siblingAssociationHook.invoke(event);
Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal(
Mockito.any(),
Mockito.eq(_mockAuthentication)
);
}
@Test
public void testInvokeWhenSourceUrnHasTwoUpstreamsOneDbt() throws Exception {
MetadataChangeLog event = createEvent(DATASET_ENTITY_NAME, UPSTREAM_LINEAGE_ASPECT_NAME, ChangeType.UPSERT);
final UpstreamLineage upstreamLineage = new UpstreamLineage();
final UpstreamArray upstreamArray = new UpstreamArray();
Upstream dbtUpstream = createUpstream("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.source_entity1,PROD)", DatasetLineageType.TRANSFORMED);
Upstream snowflakeUpstream =
createUpstream("urn:li:dataset:(urn:li:dataPlatform:snowflake,my-proj.jaffle_shop.customers,PROD)", DatasetLineageType.TRANSFORMED);
upstreamArray.add(dbtUpstream);
upstreamArray.add(snowflakeUpstream);
upstreamLineage.setUpstreams(upstreamArray);
event.setAspect(GenericRecordUtils.serializeAspect(upstreamLineage));
event.setEntityUrn(Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:bigquery,my-proj.jaffle_shop.customers,PROD)"));
_siblingAssociationHook.invoke(event);
Mockito.verify(_mockEntityClient, Mockito.times(2)).ingestProposal(
Mockito.any(),
Mockito.eq(_mockAuthentication)
);
}
private MetadataChangeLog createEvent(String entityType, String aspectName, ChangeType changeType) {
MetadataChangeLog event = new MetadataChangeLog();
event.setEntityType(entityType);
event.setAspectName(aspectName);
event.setChangeType(changeType);
return event;
}
private Upstream createUpstream(String urn, DatasetLineageType upstreamType) {
final Upstream upstream = new Upstream();
upstream.setType(upstreamType);
try {
upstream.setDataset(DatasetUrn.createFromString(urn));
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
return upstream;
}
}