mirror of
https://github.com/datahub-project/datahub.git
synced 2025-11-09 07:53:33 +00:00
fix(Siblings): Have sibling hook use entity client (#5279)
* fixing dbt platform issues * have sibling hook use entity client over entity service * switching search service as well * lint * more lint * more specific exceptions
This commit is contained in:
parent
c1f8227693
commit
9e58cd6ff1
@ -1,7 +1,9 @@
|
|||||||
package com.linkedin.metadata.kafka.hook.siblings;
|
package com.linkedin.metadata.kafka.hook.siblings;
|
||||||
|
|
||||||
|
import com.datahub.authentication.Authentication;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.linkedin.common.AuditStamp;
|
import com.linkedin.common.AuditStamp;
|
||||||
import com.linkedin.common.Siblings;
|
import com.linkedin.common.Siblings;
|
||||||
import com.linkedin.common.SubTypes;
|
import com.linkedin.common.SubTypes;
|
||||||
@ -10,21 +12,25 @@ import com.linkedin.common.urn.DatasetUrn;
|
|||||||
import com.linkedin.common.urn.Urn;
|
import com.linkedin.common.urn.Urn;
|
||||||
import com.linkedin.dataset.UpstreamArray;
|
import com.linkedin.dataset.UpstreamArray;
|
||||||
import com.linkedin.dataset.UpstreamLineage;
|
import com.linkedin.dataset.UpstreamLineage;
|
||||||
|
import com.linkedin.entity.EntityResponse;
|
||||||
|
import com.linkedin.entity.client.RestliEntityClient;
|
||||||
import com.linkedin.events.metadata.ChangeType;
|
import com.linkedin.events.metadata.ChangeType;
|
||||||
import com.linkedin.gms.factory.entity.EntityServiceFactory;
|
import com.linkedin.gms.factory.auth.SystemAuthenticationFactory;
|
||||||
|
import com.linkedin.gms.factory.entity.RestliEntityClientFactory;
|
||||||
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
|
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
|
||||||
import com.linkedin.gms.factory.search.SearchServiceFactory;
|
import com.linkedin.gms.factory.search.EntitySearchServiceFactory;
|
||||||
import com.linkedin.metadata.entity.EntityService;
|
import com.linkedin.metadata.Constants;
|
||||||
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
|
import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook;
|
||||||
import com.linkedin.metadata.models.EntitySpec;
|
import com.linkedin.metadata.models.EntitySpec;
|
||||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||||
|
import com.linkedin.metadata.search.EntitySearchService;
|
||||||
import com.linkedin.metadata.search.SearchResult;
|
import com.linkedin.metadata.search.SearchResult;
|
||||||
import com.linkedin.metadata.search.SearchService;
|
|
||||||
import com.linkedin.metadata.utils.EntityKeyUtils;
|
import com.linkedin.metadata.utils.EntityKeyUtils;
|
||||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||||
import com.linkedin.mxe.GenericAspect;
|
import com.linkedin.mxe.GenericAspect;
|
||||||
import com.linkedin.mxe.MetadataChangeLog;
|
import com.linkedin.mxe.MetadataChangeLog;
|
||||||
import com.linkedin.mxe.MetadataChangeProposal;
|
import com.linkedin.mxe.MetadataChangeProposal;
|
||||||
|
import com.linkedin.r2.RemoteInvocationException;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
@ -53,7 +59,7 @@ import static com.linkedin.metadata.Constants.*;
|
|||||||
@Slf4j
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
@Singleton
|
@Singleton
|
||||||
@Import({EntityRegistryFactory.class, EntityServiceFactory.class, SearchServiceFactory.class})
|
@Import({EntityRegistryFactory.class, RestliEntityClientFactory.class, EntitySearchServiceFactory.class, SystemAuthenticationFactory.class})
|
||||||
public class SiblingAssociationHook implements MetadataChangeLogHook {
|
public class SiblingAssociationHook implements MetadataChangeLogHook {
|
||||||
|
|
||||||
public static final String SIBLING_ASSOCIATION_SYSTEM_ACTOR = "urn:li:corpuser:__datahub_system_sibling_hook";
|
public static final String SIBLING_ASSOCIATION_SYSTEM_ACTOR = "urn:li:corpuser:__datahub_system_sibling_hook";
|
||||||
@ -61,18 +67,21 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
|
|||||||
public static final String SOURCE_SUBTYPE = "source";
|
public static final String SOURCE_SUBTYPE = "source";
|
||||||
|
|
||||||
private final EntityRegistry _entityRegistry;
|
private final EntityRegistry _entityRegistry;
|
||||||
private final EntityService _entityService;
|
private final RestliEntityClient _entityClient;
|
||||||
private final SearchService _searchService;
|
private final EntitySearchService _searchService;
|
||||||
|
private final Authentication _systemAuthentication;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public SiblingAssociationHook(
|
public SiblingAssociationHook(
|
||||||
@Nonnull final EntityRegistry entityRegistry,
|
@Nonnull final EntityRegistry entityRegistry,
|
||||||
@Nonnull final EntityService entityService,
|
@Nonnull final RestliEntityClient entityClient,
|
||||||
@Nonnull final SearchService searchService
|
@Nonnull final EntitySearchService searchService,
|
||||||
|
@Nonnull final Authentication systemAuthentication
|
||||||
) {
|
) {
|
||||||
_entityRegistry = entityRegistry;
|
_entityRegistry = entityRegistry;
|
||||||
_entityService = entityService;
|
_entityClient = entityClient;
|
||||||
_searchService = searchService;
|
_searchService = searchService;
|
||||||
|
_systemAuthentication = systemAuthentication;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Value("${siblings.enabled:false}")
|
@Value("${siblings.enabled:false}")
|
||||||
@ -123,8 +132,7 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
|
|||||||
entitiesWithYouAsSiblingFilter,
|
entitiesWithYouAsSiblingFilter,
|
||||||
null,
|
null,
|
||||||
0,
|
0,
|
||||||
10,
|
10);
|
||||||
null);
|
|
||||||
|
|
||||||
// we have a match of an entity with you as a sibling, associate yourself back
|
// we have a match of an entity with you as a sibling, associate yourself back
|
||||||
searchResult.getEntities().forEach(entity -> {
|
searchResult.getEntities().forEach(entity -> {
|
||||||
@ -146,21 +154,12 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
|
|||||||
|
|
||||||
if (event.getAspectName().equals(UPSTREAM_LINEAGE_ASPECT_NAME)) {
|
if (event.getAspectName().equals(UPSTREAM_LINEAGE_ASPECT_NAME)) {
|
||||||
upstreamLineage = getUpstreamLineageFromEvent(event);
|
upstreamLineage = getUpstreamLineageFromEvent(event);
|
||||||
subTypesAspectOfEntity =
|
subTypesAspectOfEntity = getSubtypesFromEntityClient(datasetUrn);
|
||||||
(SubTypes) _entityService.getLatestAspect(
|
|
||||||
datasetUrn,
|
|
||||||
SUB_TYPES_ASPECT_NAME
|
|
||||||
);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (event.getAspectName().equals(SUB_TYPES_ASPECT_NAME)) {
|
if (event.getAspectName().equals(SUB_TYPES_ASPECT_NAME)) {
|
||||||
subTypesAspectOfEntity = getSubtypesFromEvent(event);
|
subTypesAspectOfEntity = getSubtypesFromEvent(event);
|
||||||
upstreamLineage =
|
upstreamLineage = getUpstreamLineageFromEntityClient(datasetUrn);
|
||||||
(UpstreamLineage) _entityService.getLatestAspect(
|
|
||||||
datasetUrn,
|
|
||||||
UPSTREAM_LINEAGE_ASPECT_NAME
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (
|
if (
|
||||||
@ -195,10 +194,8 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
|
private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) {
|
||||||
Siblings existingDbtSiblingAspect =
|
Siblings existingDbtSiblingAspect = getSiblingsFromEntityClient(dbtUrn);
|
||||||
(Siblings) _entityService.getLatestAspect(dbtUrn, SIBLINGS_ASPECT_NAME);
|
Siblings existingSourceSiblingAspect = getSiblingsFromEntityClient(sourceUrn);
|
||||||
Siblings existingSourceSiblingAspect =
|
|
||||||
(Siblings) _entityService.getLatestAspect(sourceUrn, SIBLINGS_ASPECT_NAME);
|
|
||||||
|
|
||||||
log.info("Associating {} and {} as siblings.", dbtUrn.toString(), sourceUrn.toString());
|
log.info("Associating {} and {} as siblings.", dbtUrn.toString(), sourceUrn.toString());
|
||||||
|
|
||||||
@ -228,7 +225,12 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
|
|||||||
dbtSiblingProposal.setChangeType(ChangeType.UPSERT);
|
dbtSiblingProposal.setChangeType(ChangeType.UPSERT);
|
||||||
dbtSiblingProposal.setEntityUrn(dbtUrn);
|
dbtSiblingProposal.setEntityUrn(dbtUrn);
|
||||||
|
|
||||||
_entityService.ingestProposal(dbtSiblingProposal, auditStamp);
|
try {
|
||||||
|
_entityClient.ingestProposal(dbtSiblingProposal, _systemAuthentication);
|
||||||
|
} catch (RemoteInvocationException e) {
|
||||||
|
log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString());
|
||||||
|
throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e);
|
||||||
|
}
|
||||||
|
|
||||||
// set dbt as a sibling of source
|
// set dbt as a sibling of source
|
||||||
|
|
||||||
@ -245,7 +247,14 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
|
|||||||
|
|
||||||
// clean up any references to stale siblings that have been deleted
|
// clean up any references to stale siblings that have been deleted
|
||||||
List<Urn> filteredNewSiblingsArray =
|
List<Urn> filteredNewSiblingsArray =
|
||||||
newSiblingsUrnArray.stream().filter(urn -> _entityService.exists(urn)).collect(Collectors.toList());
|
newSiblingsUrnArray.stream().filter(urn -> {
|
||||||
|
try {
|
||||||
|
return _entityClient.exists(urn, _systemAuthentication);
|
||||||
|
} catch (RemoteInvocationException e) {
|
||||||
|
log.error("Error while checking existence of {}: {}", urn.toString(), e.toString());
|
||||||
|
throw new RuntimeException("Error checking existence. Skipping processing.", e);
|
||||||
|
}
|
||||||
|
}).collect(Collectors.toList());
|
||||||
|
|
||||||
sourceSiblingAspect.setSiblings(new UrnArray(filteredNewSiblingsArray));
|
sourceSiblingAspect.setSiblings(new UrnArray(filteredNewSiblingsArray));
|
||||||
sourceSiblingAspect.setPrimary(false);
|
sourceSiblingAspect.setPrimary(false);
|
||||||
@ -259,7 +268,12 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
|
|||||||
sourceSiblingProposal.setChangeType(ChangeType.UPSERT);
|
sourceSiblingProposal.setChangeType(ChangeType.UPSERT);
|
||||||
sourceSiblingProposal.setEntityUrn(sourceUrn);
|
sourceSiblingProposal.setEntityUrn(sourceUrn);
|
||||||
|
|
||||||
_entityService.ingestProposal(sourceSiblingProposal, auditStamp);
|
try {
|
||||||
|
_entityClient.ingestProposal(sourceSiblingProposal, _systemAuthentication);
|
||||||
|
} catch (RemoteInvocationException e) {
|
||||||
|
log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString());
|
||||||
|
throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -362,4 +376,67 @@ public class SiblingAssociationHook implements MetadataChangeLogHook {
|
|||||||
return filter;
|
return filter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private SubTypes getSubtypesFromEntityClient(
|
||||||
|
final Urn urn
|
||||||
|
) {
|
||||||
|
try {
|
||||||
|
EntityResponse entityResponse = _entityClient.getV2(
|
||||||
|
DATASET_ENTITY_NAME,
|
||||||
|
urn,
|
||||||
|
ImmutableSet.of(SUB_TYPES_ASPECT_NAME),
|
||||||
|
_systemAuthentication
|
||||||
|
);
|
||||||
|
|
||||||
|
if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SUB_TYPES_ASPECT_NAME)) {
|
||||||
|
return new SubTypes(entityResponse.getAspects().get(Constants.SUB_TYPES_ASPECT_NAME).getValue().data());
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} catch (RemoteInvocationException | URISyntaxException e) {
|
||||||
|
throw new RuntimeException("Failed to retrieve Subtypes", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private UpstreamLineage getUpstreamLineageFromEntityClient(
|
||||||
|
final Urn urn
|
||||||
|
) {
|
||||||
|
try {
|
||||||
|
EntityResponse entityResponse = _entityClient.getV2(
|
||||||
|
DATASET_ENTITY_NAME,
|
||||||
|
urn,
|
||||||
|
ImmutableSet.of(UPSTREAM_LINEAGE_ASPECT_NAME),
|
||||||
|
_systemAuthentication
|
||||||
|
);
|
||||||
|
|
||||||
|
if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) {
|
||||||
|
return new UpstreamLineage(entityResponse.getAspects().get(Constants.UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data());
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} catch (RemoteInvocationException | URISyntaxException e) {
|
||||||
|
throw new RuntimeException("Failed to retrieve UpstreamLineage", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private Siblings getSiblingsFromEntityClient(
|
||||||
|
final Urn urn
|
||||||
|
) {
|
||||||
|
try {
|
||||||
|
EntityResponse entityResponse = _entityClient.getV2(
|
||||||
|
DATASET_ENTITY_NAME,
|
||||||
|
urn,
|
||||||
|
ImmutableSet.of(SIBLINGS_ASPECT_NAME),
|
||||||
|
_systemAuthentication
|
||||||
|
);
|
||||||
|
|
||||||
|
if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SIBLINGS_ASPECT_NAME)) {
|
||||||
|
return new Siblings(entityResponse.getAspects().get(Constants.SIBLINGS_ASPECT_NAME).getValue().data());
|
||||||
|
} else {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
} catch (RemoteInvocationException | URISyntaxException e) {
|
||||||
|
throw new RuntimeException("Failed to retrieve UpstreamLineage", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1,8 @@
|
|||||||
package com.linkedin.metadata.kafka.hook.siblings;
|
package com.linkedin.metadata.kafka.hook.siblings;
|
||||||
|
|
||||||
|
import com.datahub.authentication.Authentication;
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.linkedin.common.AuditStamp;
|
import com.google.common.collect.ImmutableSet;
|
||||||
import com.linkedin.common.FabricType;
|
import com.linkedin.common.FabricType;
|
||||||
import com.linkedin.common.Siblings;
|
import com.linkedin.common.Siblings;
|
||||||
import com.linkedin.common.SubTypes;
|
import com.linkedin.common.SubTypes;
|
||||||
@ -14,15 +15,19 @@ import com.linkedin.dataset.DatasetLineageType;
|
|||||||
import com.linkedin.dataset.Upstream;
|
import com.linkedin.dataset.Upstream;
|
||||||
import com.linkedin.dataset.UpstreamArray;
|
import com.linkedin.dataset.UpstreamArray;
|
||||||
import com.linkedin.dataset.UpstreamLineage;
|
import com.linkedin.dataset.UpstreamLineage;
|
||||||
|
import com.linkedin.entity.Aspect;
|
||||||
|
import com.linkedin.entity.EntityResponse;
|
||||||
|
import com.linkedin.entity.EnvelopedAspect;
|
||||||
|
import com.linkedin.entity.EnvelopedAspectMap;
|
||||||
|
import com.linkedin.entity.client.RestliEntityClient;
|
||||||
import com.linkedin.events.metadata.ChangeType;
|
import com.linkedin.events.metadata.ChangeType;
|
||||||
import com.linkedin.metadata.entity.EntityService;
|
|
||||||
import com.linkedin.metadata.key.DatasetKey;
|
import com.linkedin.metadata.key.DatasetKey;
|
||||||
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
|
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
|
||||||
import com.linkedin.metadata.models.registry.EntityRegistry;
|
import com.linkedin.metadata.models.registry.EntityRegistry;
|
||||||
|
import com.linkedin.metadata.search.EntitySearchService;
|
||||||
import com.linkedin.metadata.search.SearchEntity;
|
import com.linkedin.metadata.search.SearchEntity;
|
||||||
import com.linkedin.metadata.search.SearchEntityArray;
|
import com.linkedin.metadata.search.SearchEntityArray;
|
||||||
import com.linkedin.metadata.search.SearchResult;
|
import com.linkedin.metadata.search.SearchResult;
|
||||||
import com.linkedin.metadata.search.SearchService;
|
|
||||||
import com.linkedin.metadata.utils.GenericRecordUtils;
|
import com.linkedin.metadata.utils.GenericRecordUtils;
|
||||||
import com.linkedin.mxe.MetadataChangeLog;
|
import com.linkedin.mxe.MetadataChangeLog;
|
||||||
import com.linkedin.mxe.MetadataChangeProposal;
|
import com.linkedin.mxe.MetadataChangeProposal;
|
||||||
@ -36,16 +41,18 @@ import static org.mockito.ArgumentMatchers.*;
|
|||||||
|
|
||||||
public class SiblingAssociationHookTest {
|
public class SiblingAssociationHookTest {
|
||||||
private SiblingAssociationHook _siblingAssociationHook;
|
private SiblingAssociationHook _siblingAssociationHook;
|
||||||
EntityService _mockEntityService;
|
RestliEntityClient _mockEntityClient;
|
||||||
SearchService _mockSearchService;
|
EntitySearchService _mockSearchService;
|
||||||
|
Authentication _mockAuthentication;
|
||||||
|
|
||||||
@BeforeMethod
|
@BeforeMethod
|
||||||
public void setupTest() {
|
public void setupTest() {
|
||||||
EntityRegistry registry = new ConfigEntityRegistry(
|
EntityRegistry registry = new ConfigEntityRegistry(
|
||||||
SiblingAssociationHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry-siblings.yml"));
|
SiblingAssociationHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry-siblings.yml"));
|
||||||
_mockEntityService = Mockito.mock(EntityService.class);
|
_mockEntityClient = Mockito.mock(RestliEntityClient.class);
|
||||||
_mockSearchService = Mockito.mock(SearchService.class);
|
_mockSearchService = Mockito.mock(EntitySearchService.class);
|
||||||
_siblingAssociationHook = new SiblingAssociationHook(registry, _mockEntityService, _mockSearchService);
|
_mockAuthentication = Mockito.mock(Authentication.class);
|
||||||
|
_siblingAssociationHook = new SiblingAssociationHook(registry, _mockEntityClient, _mockSearchService, _mockAuthentication);
|
||||||
_siblingAssociationHook.setEnabled(true);
|
_siblingAssociationHook.setEnabled(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,14 +60,21 @@ public class SiblingAssociationHookTest {
|
|||||||
public void testInvokeWhenThereIsAPairWithDbtSourceNode() throws Exception {
|
public void testInvokeWhenThereIsAPairWithDbtSourceNode() throws Exception {
|
||||||
SubTypes mockSourceSubtypesAspect = new SubTypes();
|
SubTypes mockSourceSubtypesAspect = new SubTypes();
|
||||||
mockSourceSubtypesAspect.setTypeNames(new StringArray(ImmutableList.of("source")));
|
mockSourceSubtypesAspect.setTypeNames(new StringArray(ImmutableList.of("source")));
|
||||||
|
EnvelopedAspectMap mockResponseMap = new EnvelopedAspectMap();
|
||||||
|
mockResponseMap.put(SUB_TYPES_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(mockSourceSubtypesAspect.data())));
|
||||||
|
EntityResponse mockResponse = new EntityResponse();
|
||||||
|
mockResponse.setAspects(mockResponseMap);
|
||||||
|
|
||||||
|
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
|
||||||
|
|
||||||
Mockito.when(_mockEntityService.exists(Mockito.any())).thenReturn(true);
|
|
||||||
|
|
||||||
Mockito.when(
|
Mockito.when(
|
||||||
_mockEntityService.getLatestAspect(
|
_mockEntityClient.getV2(
|
||||||
|
DATASET_ENTITY_NAME,
|
||||||
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"),
|
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"),
|
||||||
SUB_TYPES_ASPECT_NAME
|
ImmutableSet.of(SUB_TYPES_ASPECT_NAME),
|
||||||
)).thenReturn(mockSourceSubtypesAspect);
|
_mockAuthentication
|
||||||
|
)).thenReturn(mockResponse);
|
||||||
|
|
||||||
MetadataChangeLog event = new MetadataChangeLog();
|
MetadataChangeLog event = new MetadataChangeLog();
|
||||||
event.setEntityType(DATASET_ENTITY_NAME);
|
event.setEntityType(DATASET_ENTITY_NAME);
|
||||||
@ -90,9 +104,9 @@ public class SiblingAssociationHookTest {
|
|||||||
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
|
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
|
||||||
proposal.setChangeType(ChangeType.UPSERT);
|
proposal.setChangeType(ChangeType.UPSERT);
|
||||||
|
|
||||||
Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal(
|
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
|
||||||
Mockito.eq(proposal),
|
Mockito.eq(proposal),
|
||||||
Mockito.any(AuditStamp.class)
|
Mockito.eq(_mockAuthentication)
|
||||||
);
|
);
|
||||||
|
|
||||||
final Siblings sourceSiblingsAspect = new Siblings()
|
final Siblings sourceSiblingsAspect = new Siblings()
|
||||||
@ -106,9 +120,9 @@ public class SiblingAssociationHookTest {
|
|||||||
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
|
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
|
||||||
proposal2.setChangeType(ChangeType.UPSERT);
|
proposal2.setChangeType(ChangeType.UPSERT);
|
||||||
|
|
||||||
Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal(
|
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
|
||||||
Mockito.eq(proposal2),
|
Mockito.eq(proposal2),
|
||||||
Mockito.any(AuditStamp.class)
|
Mockito.eq(_mockAuthentication)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,13 +131,23 @@ public class SiblingAssociationHookTest {
|
|||||||
SubTypes mockSourceSubtypesAspect = new SubTypes();
|
SubTypes mockSourceSubtypesAspect = new SubTypes();
|
||||||
mockSourceSubtypesAspect.setTypeNames(new StringArray(ImmutableList.of("model")));
|
mockSourceSubtypesAspect.setTypeNames(new StringArray(ImmutableList.of("model")));
|
||||||
|
|
||||||
Mockito.when(_mockEntityService.exists(Mockito.any())).thenReturn(true);
|
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
|
||||||
|
|
||||||
|
EnvelopedAspectMap mockResponseMap = new EnvelopedAspectMap();
|
||||||
|
mockResponseMap.put(SUB_TYPES_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(mockSourceSubtypesAspect.data())));
|
||||||
|
EntityResponse mockResponse = new EntityResponse();
|
||||||
|
mockResponse.setAspects(mockResponseMap);
|
||||||
|
|
||||||
|
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
|
||||||
|
|
||||||
|
|
||||||
Mockito.when(
|
Mockito.when(
|
||||||
_mockEntityService.getLatestAspect(
|
_mockEntityClient.getV2(
|
||||||
|
DATASET_ENTITY_NAME,
|
||||||
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"),
|
Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"),
|
||||||
SUB_TYPES_ASPECT_NAME
|
ImmutableSet.of(SUB_TYPES_ASPECT_NAME),
|
||||||
)).thenReturn(mockSourceSubtypesAspect);
|
_mockAuthentication
|
||||||
|
)).thenReturn(mockResponse);
|
||||||
|
|
||||||
MetadataChangeLog event = new MetadataChangeLog();
|
MetadataChangeLog event = new MetadataChangeLog();
|
||||||
event.setEntityType(DATASET_ENTITY_NAME);
|
event.setEntityType(DATASET_ENTITY_NAME);
|
||||||
@ -153,15 +177,15 @@ public class SiblingAssociationHookTest {
|
|||||||
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
|
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
|
||||||
proposal.setChangeType(ChangeType.UPSERT);
|
proposal.setChangeType(ChangeType.UPSERT);
|
||||||
|
|
||||||
Mockito.verify(_mockEntityService, Mockito.times(0)).ingestProposal(
|
Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal(
|
||||||
Mockito.eq(proposal),
|
Mockito.eq(proposal),
|
||||||
Mockito.any(AuditStamp.class)
|
Mockito.eq(_mockAuthentication)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Exception {
|
public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Exception {
|
||||||
Mockito.when(_mockEntityService.exists(Mockito.any())).thenReturn(true);
|
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
|
||||||
|
|
||||||
MetadataChangeLog event = new MetadataChangeLog();
|
MetadataChangeLog event = new MetadataChangeLog();
|
||||||
event.setEntityType(DATASET_ENTITY_NAME);
|
event.setEntityType(DATASET_ENTITY_NAME);
|
||||||
@ -191,9 +215,9 @@ public class SiblingAssociationHookTest {
|
|||||||
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
|
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
|
||||||
proposal.setChangeType(ChangeType.UPSERT);
|
proposal.setChangeType(ChangeType.UPSERT);
|
||||||
|
|
||||||
Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal(
|
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
|
||||||
Mockito.eq(proposal),
|
Mockito.eq(proposal),
|
||||||
Mockito.any(AuditStamp.class)
|
Mockito.eq(_mockAuthentication)
|
||||||
);
|
);
|
||||||
|
|
||||||
final Siblings sourceSiblingsAspect = new Siblings()
|
final Siblings sourceSiblingsAspect = new Siblings()
|
||||||
@ -207,15 +231,15 @@ public class SiblingAssociationHookTest {
|
|||||||
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
|
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
|
||||||
proposal2.setChangeType(ChangeType.UPSERT);
|
proposal2.setChangeType(ChangeType.UPSERT);
|
||||||
|
|
||||||
Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal(
|
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
|
||||||
Mockito.eq(proposal2),
|
Mockito.eq(proposal2),
|
||||||
Mockito.any(AuditStamp.class)
|
Mockito.eq(_mockAuthentication)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception {
|
public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception {
|
||||||
Mockito.when(_mockEntityService.exists(Mockito.any())).thenReturn(true);
|
Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true);
|
||||||
|
|
||||||
SearchResult returnSearchResult = new SearchResult();
|
SearchResult returnSearchResult = new SearchResult();
|
||||||
SearchEntityArray returnEntityArray = new SearchEntityArray();
|
SearchEntityArray returnEntityArray = new SearchEntityArray();
|
||||||
@ -229,7 +253,7 @@ public class SiblingAssociationHookTest {
|
|||||||
|
|
||||||
Mockito.when(
|
Mockito.when(
|
||||||
_mockSearchService.search(
|
_mockSearchService.search(
|
||||||
anyString(), anyString(), any(), any(), anyInt(), anyInt(), any()
|
anyString(), anyString(), any(), any(), anyInt(), anyInt()
|
||||||
)).thenReturn(returnSearchResult);
|
)).thenReturn(returnSearchResult);
|
||||||
|
|
||||||
MetadataChangeLog event = new MetadataChangeLog();
|
MetadataChangeLog event = new MetadataChangeLog();
|
||||||
@ -256,9 +280,9 @@ public class SiblingAssociationHookTest {
|
|||||||
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
|
proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect));
|
||||||
proposal.setChangeType(ChangeType.UPSERT);
|
proposal.setChangeType(ChangeType.UPSERT);
|
||||||
|
|
||||||
Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal(
|
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
|
||||||
Mockito.eq(proposal),
|
Mockito.eq(proposal),
|
||||||
Mockito.any(AuditStamp.class)
|
Mockito.eq(_mockAuthentication)
|
||||||
);
|
);
|
||||||
|
|
||||||
final Siblings sourceSiblingsAspect = new Siblings()
|
final Siblings sourceSiblingsAspect = new Siblings()
|
||||||
@ -272,9 +296,9 @@ public class SiblingAssociationHookTest {
|
|||||||
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
|
proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect));
|
||||||
proposal2.setChangeType(ChangeType.UPSERT);
|
proposal2.setChangeType(ChangeType.UPSERT);
|
||||||
|
|
||||||
Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal(
|
Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal(
|
||||||
Mockito.eq(proposal2),
|
Mockito.eq(proposal2),
|
||||||
Mockito.any(AuditStamp.class)
|
Mockito.eq(_mockAuthentication)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -114,6 +114,13 @@
|
|||||||
"optional" : true
|
"optional" : true
|
||||||
} ],
|
} ],
|
||||||
"returns" : "com.linkedin.metadata.run.DeleteReferencesResponse"
|
"returns" : "com.linkedin.metadata.run.DeleteReferencesResponse"
|
||||||
|
}, {
|
||||||
|
"name" : "exists",
|
||||||
|
"parameters" : [ {
|
||||||
|
"name" : "urn",
|
||||||
|
"type" : "string"
|
||||||
|
} ],
|
||||||
|
"returns" : "boolean"
|
||||||
}, {
|
}, {
|
||||||
"name" : "filter",
|
"name" : "filter",
|
||||||
"parameters" : [ {
|
"parameters" : [ {
|
||||||
|
|||||||
@ -5679,6 +5679,13 @@
|
|||||||
"optional" : true
|
"optional" : true
|
||||||
} ],
|
} ],
|
||||||
"returns" : "com.linkedin.metadata.run.DeleteReferencesResponse"
|
"returns" : "com.linkedin.metadata.run.DeleteReferencesResponse"
|
||||||
|
}, {
|
||||||
|
"name" : "exists",
|
||||||
|
"parameters" : [ {
|
||||||
|
"name" : "urn",
|
||||||
|
"type" : "string"
|
||||||
|
} ],
|
||||||
|
"returns" : "boolean"
|
||||||
}, {
|
}, {
|
||||||
"name" : "filter",
|
"name" : "filter",
|
||||||
"parameters" : [ {
|
"parameters" : [ {
|
||||||
|
|||||||
@ -296,4 +296,6 @@ public interface EntityClient {
|
|||||||
|
|
||||||
public void producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event,
|
public void producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event,
|
||||||
@Nonnull Authentication authentication) throws Exception;
|
@Nonnull Authentication authentication) throws Exception;
|
||||||
|
|
||||||
|
Boolean exists(Urn urn, @Nonnull Authentication authentication) throws RemoteInvocationException;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -463,4 +463,9 @@ public class JavaEntityClient implements EntityClient {
|
|||||||
@Nonnull Authentication authentication) throws Exception {
|
@Nonnull Authentication authentication) throws Exception {
|
||||||
_eventProducer.producePlatformEvent(name, key, event);
|
_eventProducer.producePlatformEvent(name, key, event);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean exists(Urn urn, @Nonnull Authentication authentication) throws RemoteInvocationException {
|
||||||
|
return _entityService.exists(urn);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -20,6 +20,7 @@ import com.linkedin.entity.EntitiesDoBatchIngestRequestBuilder;
|
|||||||
import com.linkedin.entity.EntitiesDoBrowseRequestBuilder;
|
import com.linkedin.entity.EntitiesDoBrowseRequestBuilder;
|
||||||
import com.linkedin.entity.EntitiesDoDeleteReferencesRequestBuilder;
|
import com.linkedin.entity.EntitiesDoDeleteReferencesRequestBuilder;
|
||||||
import com.linkedin.entity.EntitiesDoDeleteRequestBuilder;
|
import com.linkedin.entity.EntitiesDoDeleteRequestBuilder;
|
||||||
|
import com.linkedin.entity.EntitiesDoExistsRequestBuilder;
|
||||||
import com.linkedin.entity.EntitiesDoFilterRequestBuilder;
|
import com.linkedin.entity.EntitiesDoFilterRequestBuilder;
|
||||||
import com.linkedin.entity.EntitiesDoGetBrowsePathsRequestBuilder;
|
import com.linkedin.entity.EntitiesDoGetBrowsePathsRequestBuilder;
|
||||||
import com.linkedin.entity.EntitiesDoIngestRequestBuilder;
|
import com.linkedin.entity.EntitiesDoIngestRequestBuilder;
|
||||||
@ -662,4 +663,11 @@ public class RestliEntityClient extends BaseClient implements EntityClient {
|
|||||||
}
|
}
|
||||||
sendClientRequest(requestBuilder, authentication);
|
sendClientRequest(requestBuilder, authentication);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Boolean exists(Urn urn, @Nonnull Authentication authentication) throws RemoteInvocationException {
|
||||||
|
final EntitiesDoExistsRequestBuilder requestBuilder =
|
||||||
|
ENTITIES_REQUEST_BUILDERS.actionExists().urnParam(urn.toString());
|
||||||
|
return sendClientRequest(requestBuilder, authentication).getEntity();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -89,6 +89,7 @@ public class EntityResource extends CollectionResourceTaskTemplate<String, Entit
|
|||||||
private static final String PARAM_COUNT = "count";
|
private static final String PARAM_COUNT = "count";
|
||||||
private static final String PARAM_VALUE = "value";
|
private static final String PARAM_VALUE = "value";
|
||||||
private static final String SYSTEM_METADATA = "systemMetadata";
|
private static final String SYSTEM_METADATA = "systemMetadata";
|
||||||
|
private static final String ACTION_EXISTS = "exists";
|
||||||
|
|
||||||
private final Clock _clock = Clock.systemUTC();
|
private final Clock _clock = Clock.systemUTC();
|
||||||
|
|
||||||
@ -483,4 +484,14 @@ public class EntityResource extends CollectionResourceTaskTemplate<String, Entit
|
|||||||
return RestliUtil.toTask(() -> _entitySearchService.filter(entityName, filter, sortCriterion, start, count),
|
return RestliUtil.toTask(() -> _entitySearchService.filter(entityName, filter, sortCriterion, start, count),
|
||||||
MetricRegistry.name(this.getClass(), "search"));
|
MetricRegistry.name(this.getClass(), "search"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Action(name = ACTION_EXISTS)
|
||||||
|
@Nonnull
|
||||||
|
@WithSpan
|
||||||
|
public Task<Boolean> exists(@ActionParam(PARAM_URN) @Nonnull String urnStr) throws URISyntaxException {
|
||||||
|
log.info("EXISTS for {}", urnStr);
|
||||||
|
Urn urn = Urn.createFromString(urnStr);
|
||||||
|
return RestliUtil.toTask(() -> _entityService.exists(urn),
|
||||||
|
MetricRegistry.name(this.getClass(), "exists"));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -0,0 +1,11 @@
|
|||||||
|
# see https://datahubproject.io/docs/generated/ingestion/sources/file for complete documentation
|
||||||
|
source:
|
||||||
|
type: "file"
|
||||||
|
config:
|
||||||
|
filename: "./cypress_dbt_data.json"
|
||||||
|
|
||||||
|
# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation
|
||||||
|
sink:
|
||||||
|
type: "datahub-rest"
|
||||||
|
config:
|
||||||
|
server: "http://localhost:8080"
|
||||||
Loading…
x
Reference in New Issue
Block a user