mirror of
https://github.com/datahub-project/datahub.git
synced 2025-12-27 18:07:57 +00:00
feat(businessAttribute): parallelize-business-attribute-propagation (#10638)
This commit is contained in:
parent
d569ca17f1
commit
4a4d41cca6
@ -942,29 +942,27 @@ public class Neo4jGraphService implements GraphService {
|
||||
final String edgeCriteria = relationshipFilterToCriteria(relationshipFilter);
|
||||
|
||||
final RelationshipDirection relationshipDirection = relationshipFilter.getDirection();
|
||||
String srcNodeLabel = "";
|
||||
|
||||
String matchTemplate = "MATCH (src %s)-[r%s %s]-(dest %s)%s";
|
||||
if (relationshipDirection == RelationshipDirection.INCOMING) {
|
||||
matchTemplate = "MATCH (src %s)<-[r%s %s]-(dest %s)%s";
|
||||
} else if (relationshipDirection == RelationshipDirection.OUTGOING) {
|
||||
matchTemplate = "MATCH (src %s)-[r%s %s]->(dest %s)%s";
|
||||
}
|
||||
|
||||
String srcNodeLabel = StringUtils.EMPTY;
|
||||
// Create a URN from the String. Only proceed if srcCriteria is not null or empty
|
||||
if (srcCriteria != null && !srcCriteria.isEmpty()) {
|
||||
if (StringUtils.isNotEmpty(srcCriteria)) {
|
||||
final String urnValue =
|
||||
sourceEntityFilter.getOr().get(0).getAnd().get(0).getValue().toString();
|
||||
try {
|
||||
final Urn urn = Urn.createFromString(urnValue);
|
||||
srcNodeLabel = urn.getEntityType();
|
||||
matchTemplate = matchTemplate.replace("(src ", "(src:%s ");
|
||||
} catch (URISyntaxException e) {
|
||||
log.error("Failed to parse URN: {} ", urnValue, e);
|
||||
}
|
||||
}
|
||||
String matchTemplate = "MATCH (src:%s %s)-[r%s %s]-(dest %s)%s";
|
||||
if (relationshipDirection == RelationshipDirection.INCOMING) {
|
||||
matchTemplate = "MATCH (src:%s %s)<-[r%s %s]-(dest %s)%s";
|
||||
} else if (relationshipDirection == RelationshipDirection.OUTGOING) {
|
||||
matchTemplate = "MATCH (src:%s %s)-[r%s %s]->(dest %s)%s";
|
||||
}
|
||||
|
||||
final String returnNodes =
|
||||
String.format(
|
||||
"RETURN dest, src, type(r)"); // Return both related entity and the relationship type.
|
||||
final String returnCount = "RETURN count(*)"; // For getting the total results.
|
||||
|
||||
String relationshipTypeFilter = "";
|
||||
if (!relationshipTypes.isEmpty()) {
|
||||
@ -974,18 +972,34 @@ public class Neo4jGraphService implements GraphService {
|
||||
String whereClause = computeEntityTypeWhereClause(sourceTypes, destinationTypes);
|
||||
|
||||
// Build Statement strings
|
||||
String baseStatementString =
|
||||
String.format(
|
||||
matchTemplate,
|
||||
srcNodeLabel,
|
||||
srcCriteria,
|
||||
relationshipTypeFilter,
|
||||
edgeCriteria,
|
||||
destCriteria,
|
||||
whereClause);
|
||||
String baseStatementString;
|
||||
|
||||
if (StringUtils.isNotEmpty(srcNodeLabel)) {
|
||||
baseStatementString =
|
||||
String.format(
|
||||
matchTemplate,
|
||||
srcNodeLabel,
|
||||
srcCriteria,
|
||||
relationshipTypeFilter,
|
||||
edgeCriteria,
|
||||
destCriteria,
|
||||
whereClause);
|
||||
} else {
|
||||
baseStatementString =
|
||||
String.format(
|
||||
matchTemplate,
|
||||
srcCriteria,
|
||||
relationshipTypeFilter,
|
||||
edgeCriteria,
|
||||
destCriteria,
|
||||
whereClause);
|
||||
}
|
||||
log.info(baseStatementString);
|
||||
|
||||
final String returnNodes =
|
||||
"RETURN dest, src, type(r)"; // Return both related entity and the relationship type.
|
||||
final String returnCount = "RETURN count(*)"; // For getting the total results.
|
||||
|
||||
final String resultStatementString =
|
||||
String.format("%s %s SKIP $offset LIMIT $count", baseStatementString, returnNodes);
|
||||
final String countStatementString = String.format("%s %s", baseStatementString, returnCount);
|
||||
|
||||
@ -22,12 +22,13 @@ import com.linkedin.metadata.utils.PegasusUtils;
|
||||
import com.linkedin.mxe.PlatformEvent;
|
||||
import com.linkedin.platform.event.v1.EntityChangeEvent;
|
||||
import io.datahubproject.metadata.context.OperationContext;
|
||||
import java.util.Arrays;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.stream.Collectors;
|
||||
import lombok.Data;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang.time.StopWatch;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.lang.NonNull;
|
||||
import org.springframework.lang.Nullable;
|
||||
@ -41,18 +42,25 @@ public class BusinessAttributeUpdateHookService {
|
||||
private final UpdateIndicesService updateIndicesService;
|
||||
private final int relatedEntitiesCount;
|
||||
private final int getRelatedEntitiesBatchSize;
|
||||
|
||||
private ExecutorService executor;
|
||||
public static final String TAG = "TAG";
|
||||
public static final String GLOSSARY_TERM = "GLOSSARY_TERM";
|
||||
public static final String DOCUMENTATION = "DOCUMENTATION";
|
||||
private final int threadCount;
|
||||
private final int AWAIT_TERMINATION_TIME = 10;
|
||||
private final int keepAlive;
|
||||
|
||||
public BusinessAttributeUpdateHookService(
|
||||
@NonNull UpdateIndicesService updateIndicesService,
|
||||
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesCount}") int relatedEntitiesCount,
|
||||
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesBatchSize}") int relatedBatchSize) {
|
||||
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesBatchSize}") int relatedBatchSize,
|
||||
@NonNull @Value("${businessAttribute.threadCount}") int threadCount,
|
||||
@NonNull @Value("${businessAttribute.keepAliveTime}") int keepAlive) {
|
||||
this.updateIndicesService = updateIndicesService;
|
||||
this.relatedEntitiesCount = relatedEntitiesCount;
|
||||
this.getRelatedEntitiesBatchSize = relatedBatchSize;
|
||||
this.threadCount = threadCount;
|
||||
this.keepAlive = keepAlive;
|
||||
}
|
||||
|
||||
public void handleChangeEvent(
|
||||
@ -61,38 +69,51 @@ public class BusinessAttributeUpdateHookService {
|
||||
GenericRecordUtils.deserializePayload(
|
||||
event.getPayload().getValue(), EntityChangeEvent.class);
|
||||
|
||||
executor = businessAttributePropagationWorkerPool(threadCount, keepAlive);
|
||||
|
||||
if (!entityChangeEvent.getEntityType().equals(Constants.BUSINESS_ATTRIBUTE_ENTITY_NAME)) {
|
||||
log.info("Skipping MCL event for entity:" + entityChangeEvent.getEntityType());
|
||||
return;
|
||||
}
|
||||
|
||||
final Set<String> businessAttributeCategories =
|
||||
ImmutableSet.of(TAG, GLOSSARY_TERM, DOCUMENTATION);
|
||||
if (!businessAttributeCategories.contains(entityChangeEvent.getCategory())) {
|
||||
log.info("Skipping MCL event for category: " + entityChangeEvent.getCategory());
|
||||
return;
|
||||
}
|
||||
|
||||
Urn urn = entityChangeEvent.getEntityUrn();
|
||||
log.info("Business Attribute update hook invoked for urn :" + urn);
|
||||
log.info("Business Attribute update hook invoked for urn : {}", urn);
|
||||
fetchRelatedEntities(
|
||||
opContext,
|
||||
urn,
|
||||
(batch, batchNumber) -> processBatch(opContext, batch, batchNumber),
|
||||
(batch, batchNumber, entityKey) -> processBatch(opContext, batch, batchNumber, entityKey),
|
||||
null,
|
||||
0,
|
||||
1);
|
||||
|
||||
executor.shutdown();
|
||||
try {
|
||||
if (!executor.awaitTermination(AWAIT_TERMINATION_TIME, TimeUnit.MINUTES)) {
|
||||
executor.shutdownNow(); // Cancel currently executing tasks
|
||||
if (!executor.awaitTermination(AWAIT_TERMINATION_TIME, TimeUnit.MINUTES))
|
||||
log.error("Business Attribute Propagation Executor is not terminating");
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
executor.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
private void fetchRelatedEntities(
|
||||
@NonNull final OperationContext opContext,
|
||||
@NonNull final Urn urn,
|
||||
@NonNull final BiConsumer<RelatedEntitiesScrollResult, Integer> resultConsumer,
|
||||
@NonNull
|
||||
final TriFunction<RelatedEntitiesScrollResult, Integer, String, Callable<ExecutionResult>>
|
||||
resultFunction,
|
||||
@Nullable String scrollId,
|
||||
int consumedEntityCount,
|
||||
int batchNumber) {
|
||||
GraphRetriever graph = opContext.getRetrieverContext().get().getGraphRetriever();
|
||||
|
||||
final ArrayList<Future<ExecutionResult>> futureList = new ArrayList<>();
|
||||
RelatedEntitiesScrollResult result =
|
||||
graph.scrollRelatedEntities(
|
||||
null,
|
||||
@ -106,52 +127,143 @@ public class BusinessAttributeUpdateHookService {
|
||||
getRelatedEntitiesBatchSize,
|
||||
null,
|
||||
null);
|
||||
resultConsumer.accept(result, batchNumber);
|
||||
|
||||
futureList.add(
|
||||
executor.submit(resultFunction.apply(result, batchNumber, urn.getEntityKey().toString())));
|
||||
|
||||
consumedEntityCount = consumedEntityCount + result.getEntities().size();
|
||||
if (result.getScrollId() != null && consumedEntityCount < relatedEntitiesCount) {
|
||||
batchNumber = batchNumber + 1;
|
||||
fetchRelatedEntities(
|
||||
opContext, urn, resultConsumer, result.getScrollId(), consumedEntityCount, batchNumber);
|
||||
opContext, urn, resultFunction, result.getScrollId(), consumedEntityCount, batchNumber);
|
||||
}
|
||||
|
||||
for (Future<ExecutionResult> future : futureList) {
|
||||
try {
|
||||
ExecutionResult futureResult = future.get();
|
||||
if (futureResult.getException() != null) {
|
||||
log.error(
|
||||
"Batch {} for BA:{} is failed with exception",
|
||||
futureResult.getBatchNumber(),
|
||||
futureResult.getEntityKey(),
|
||||
futureResult.getException());
|
||||
} else {
|
||||
log.info(futureResult.getResult());
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
log.error("Business Attribute Propagation Parallel Processing Exception", e);
|
||||
}
|
||||
}
|
||||
futureList.clear();
|
||||
}
|
||||
|
||||
private void processBatch(
|
||||
private Callable<ExecutionResult> processBatch(
|
||||
@NonNull OperationContext opContext,
|
||||
@NonNull RelatedEntitiesScrollResult batch,
|
||||
int batchNumber) {
|
||||
AspectRetriever aspectRetriever = opContext.getRetrieverContext().get().getAspectRetriever();
|
||||
log.info("BA Update Batch {} started", batchNumber);
|
||||
Set<Urn> entityUrns =
|
||||
batch.getEntities().stream()
|
||||
.map(RelatedEntity::getUrn)
|
||||
.map(UrnUtils::getUrn)
|
||||
.collect(Collectors.toSet());
|
||||
int batchNumber,
|
||||
String entityKey) {
|
||||
return () -> {
|
||||
StopWatch stopWatch = new StopWatch();
|
||||
stopWatch.start();
|
||||
AspectRetriever aspectRetriever = opContext.getRetrieverContext().get().getAspectRetriever();
|
||||
log.info("Batch {} for BA:{} started", batchNumber, entityKey);
|
||||
ExecutionResult executionResult = new ExecutionResult();
|
||||
executionResult.setBatchNumber(batchNumber);
|
||||
executionResult.setEntityKey(entityKey);
|
||||
try {
|
||||
Set<Urn> entityUrns =
|
||||
batch.getEntities().stream()
|
||||
.map(RelatedEntity::getUrn)
|
||||
.map(UrnUtils::getUrn)
|
||||
.collect(Collectors.toSet());
|
||||
|
||||
Map<Urn, Map<String, Aspect>> entityAspectMap =
|
||||
aspectRetriever.getLatestAspectObjects(
|
||||
entityUrns, Set.of(Constants.BUSINESS_ATTRIBUTE_ASPECT));
|
||||
Map<Urn, Map<String, Aspect>> entityAspectMap =
|
||||
aspectRetriever.getLatestAspectObjects(
|
||||
entityUrns, Set.of(Constants.BUSINESS_ATTRIBUTE_ASPECT));
|
||||
|
||||
entityAspectMap.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().containsKey(Constants.BUSINESS_ATTRIBUTE_ASPECT))
|
||||
.forEach(
|
||||
entry -> {
|
||||
final Urn entityUrn = entry.getKey();
|
||||
final Aspect aspect = entry.getValue().get(Constants.BUSINESS_ATTRIBUTE_ASPECT);
|
||||
entityAspectMap.entrySet().stream()
|
||||
.filter(entry -> entry.getValue().containsKey(Constants.BUSINESS_ATTRIBUTE_ASPECT))
|
||||
.forEach(
|
||||
entry -> {
|
||||
final Urn entityUrn = entry.getKey();
|
||||
final Aspect aspect = entry.getValue().get(Constants.BUSINESS_ATTRIBUTE_ASPECT);
|
||||
updateIndicesService.handleChangeEvent(
|
||||
opContext,
|
||||
PegasusUtils.constructMCL(
|
||||
null,
|
||||
Constants.SCHEMA_FIELD_ENTITY_NAME,
|
||||
entityUrn,
|
||||
ChangeType.UPSERT,
|
||||
Constants.BUSINESS_ATTRIBUTE_ASPECT,
|
||||
opContext.getAuditStamp(),
|
||||
new BusinessAttributes(aspect.data()),
|
||||
null,
|
||||
null,
|
||||
null));
|
||||
});
|
||||
stopWatch.stop();
|
||||
String result =
|
||||
String.format(
|
||||
"Batch %s for BA:%s is completed in %s",
|
||||
batchNumber, entityKey, TimeAgo.toDuration(stopWatch.getTime()))
|
||||
.toString();
|
||||
executionResult.setResult(result);
|
||||
} catch (Exception e) {
|
||||
executionResult.setException(e);
|
||||
}
|
||||
return executionResult;
|
||||
};
|
||||
}
|
||||
|
||||
updateIndicesService.handleChangeEvent(
|
||||
opContext,
|
||||
PegasusUtils.constructMCL(
|
||||
null,
|
||||
Constants.SCHEMA_FIELD_ENTITY_NAME,
|
||||
entityUrn,
|
||||
ChangeType.UPSERT,
|
||||
Constants.BUSINESS_ATTRIBUTE_ASPECT,
|
||||
opContext.getAuditStamp(),
|
||||
new BusinessAttributes(aspect.data()),
|
||||
null,
|
||||
null,
|
||||
null));
|
||||
});
|
||||
log.info("BA Update Batch {} completed", batchNumber);
|
||||
private ExecutorService businessAttributePropagationWorkerPool(int numThreads, int keepAlive) {
|
||||
numThreads = numThreads < 0 ? Runtime.getRuntime().availableProcessors() * 2 : numThreads;
|
||||
return new ThreadPoolExecutor(
|
||||
numThreads, numThreads, keepAlive, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface TriFunction<T, U, V, R> {
|
||||
R apply(T t, U u, V v);
|
||||
}
|
||||
|
||||
@Data
|
||||
private class ExecutionResult {
|
||||
String result;
|
||||
Throwable exception;
|
||||
int batchNumber;
|
||||
String entityKey;
|
||||
}
|
||||
|
||||
private static final class TimeAgo {
|
||||
private static final List<Long> times =
|
||||
Arrays.asList(
|
||||
TimeUnit.DAYS.toMillis(365),
|
||||
TimeUnit.DAYS.toMillis(30),
|
||||
TimeUnit.DAYS.toMillis(1),
|
||||
TimeUnit.HOURS.toMillis(1),
|
||||
TimeUnit.MINUTES.toMillis(1),
|
||||
TimeUnit.SECONDS.toMillis(1),
|
||||
TimeUnit.MILLISECONDS.toMillis(1));
|
||||
private static final List<String> timesString =
|
||||
Arrays.asList("year", "month", "day", "hour", "minute", "second", "milliseconds");
|
||||
|
||||
private static String toDuration(long duration) {
|
||||
|
||||
StringBuffer res = new StringBuffer();
|
||||
for (int i = 0; i < times.size(); i++) {
|
||||
Long current = times.get(i);
|
||||
long temp = duration / current;
|
||||
if (temp > 0) {
|
||||
res.append(temp)
|
||||
.append(" ")
|
||||
.append(timesString.get(i))
|
||||
.append(temp != 1 ? "s" : StringUtils.EMPTY)
|
||||
.append(" ");
|
||||
}
|
||||
duration = duration % current;
|
||||
}
|
||||
if (StringUtils.EMPTY.equals(res.toString())) return "0 seconds ago";
|
||||
else return res.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -76,7 +76,7 @@ public class BusinessAttributeUpdateHookTest {
|
||||
mockUpdateIndicesService = mock(UpdateIndicesService.class);
|
||||
actorUrn = Urn.createFromString(TEST_ACTOR_URN);
|
||||
businessAttributeServiceHook =
|
||||
new BusinessAttributeUpdateHookService(mockUpdateIndicesService, 100, 1);
|
||||
new BusinessAttributeUpdateHookService(mockUpdateIndicesService, 100, 1, 10, 60);
|
||||
businessAttributeUpdateHook =
|
||||
new BusinessAttributeUpdateHook(businessAttributeServiceHook, true);
|
||||
}
|
||||
|
||||
@ -460,6 +460,8 @@ forms:
|
||||
businessAttribute:
|
||||
fetchRelatedEntitiesCount: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_COUNT:20000}
|
||||
fetchRelatedEntitiesBatchSize: ${BUSINESS_ATTRIBUTE_RELATED_ENTITIES_BATCH_SIZE:1000}
|
||||
threadCount: ${BUSINESS_ATTRIBUTE_PROPAGATION_CONCURRENCY_THREAD_COUNT:-1} # Thread pool size; a negative value (the default, -1) means 2 x the number of available processors
|
||||
keepAliveTime: ${BUSINESS_ATTRIBUTE_PROPAGATION_CONCURRENCY_KEEP_ALIVE:60} # Number of seconds to keep inactive threads alive
|
||||
|
||||
metadataChangeProposal:
|
||||
throttle:
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user