fix(gms): filter out runs of a dataJob without any run-events (#11223)

This commit is contained in:
ksrinath 2024-09-11 20:57:21 +05:30 committed by GitHub
parent 33e898af4a
commit fd6d4c88ec
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 333 additions and 5 deletions

View File

@ -4,9 +4,7 @@ import com.google.common.collect.ImmutableList;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.DataProcessInstanceResult;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.*;
import com.linkedin.datahub.graphql.types.dataprocessinst.mappers.DataProcessInstanceMapper;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
@ -33,6 +31,8 @@ import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** GraphQL Resolver used for fetching a list of Task Runs associated with a Data Job */
public class DataJobRunsResolver
@ -40,6 +40,8 @@ public class DataJobRunsResolver
private static final String PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME = "parentTemplate";
private static final String CREATED_TIME_SEARCH_INDEX_FIELD_NAME = "created";
private static final String HAS_RUN_EVENTS_FIELD_NAME = "hasRunEvents";
private static final Logger log = LoggerFactory.getLogger(DataJobRunsResolver.class);
private final EntityClient _entityClient;
@ -117,7 +119,12 @@ public class DataJobRunsResolver
new Criterion()
.setField(PARENT_TEMPLATE_URN_SEARCH_INDEX_FIELD_NAME)
.setCondition(Condition.EQUAL)
.setValue(entityUrn)));
.setValue(entityUrn),
new Criterion()
.setField(HAS_RUN_EVENTS_FIELD_NAME)
.setCondition(Condition.EQUAL)
.setValue(Boolean.TRUE.toString())));
final Filter filter = new Filter();
filter.setOr(
new ConjunctiveCriterionArray(ImmutableList.of(new ConjunctiveCriterion().setAnd(array))));

View File

@ -0,0 +1,43 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.dataprocessinstances.BackfillDataProcessInstances;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import io.datahubproject.metadata.context.OperationContext;
import org.opensearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration that registers the {@link BackfillDataProcessInstances} non-blocking
 * system upgrade, binding its feature toggle and tuning knobs from the
 * {@code systemUpdate.processInstanceHasRunEvents.*} properties.
 */
@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillDataProcessInstancesConfig {

  /**
   * Builds the backfill upgrade bean. When {@code enabled} is false the upgrade itself
   * contributes no steps, so registration is always safe.
   */
  @Bean
  public NonBlockingSystemUpgrade backfillProcessInstancesHasRunEvents(
      final OperationContext opContext,
      final EntityService<?> entityService,
      final ElasticSearchService elasticSearchService,
      final RestHighLevelClient restHighLevelClient,
      @Value("${systemUpdate.processInstanceHasRunEvents.enabled}") final boolean enabled,
      @Value("${systemUpdate.processInstanceHasRunEvents.reprocess.enabled}")
          final boolean reprocessEnabled,
      @Value("${systemUpdate.processInstanceHasRunEvents.batchSize}") final Integer batchSize,
      @Value("${systemUpdate.processInstanceHasRunEvents.delayMs}") final Integer delayMs,
      @Value("${systemUpdate.processInstanceHasRunEvents.totalDays}") final Integer totalDays,
      @Value("${systemUpdate.processInstanceHasRunEvents.windowDays}") final Integer windowDays) {
    final NonBlockingSystemUpgrade upgrade =
        new BackfillDataProcessInstances(
            opContext,
            entityService,
            elasticSearchService,
            restHighLevelClient,
            enabled,
            reprocessEnabled,
            batchSize,
            delayMs,
            totalDays,
            windowDays);
    return upgrade;
  }
}

View File

@ -0,0 +1,54 @@
package com.linkedin.datahub.upgrade.system.dataprocessinstances;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import org.opensearch.client.RestHighLevelClient;
/**
 * Non-blocking system upgrade that backfills the {@code hasRunEvents} flag on
 * dataProcessInstance search documents. When disabled via configuration it expands to an
 * empty step list, making the upgrade a no-op.
 */
public class BackfillDataProcessInstances implements NonBlockingSystemUpgrade {

  // Steps to run; empty when the upgrade is disabled by configuration.
  private final List<UpgradeStep> upgradeSteps;

  public BackfillDataProcessInstances(
      OperationContext opContext,
      EntityService<?> entityService,
      ElasticSearchService elasticSearchService,
      RestHighLevelClient restHighLevelClient,
      boolean enabled,
      boolean reprocessEnabled,
      Integer batchSize,
      Integer batchDelayMs,
      Integer totalDays,
      Integer windowDays) {
    this.upgradeSteps =
        enabled
            ? ImmutableList.of(
                new BackfillDataProcessInstancesHasRunEventsStep(
                    opContext,
                    entityService,
                    elasticSearchService,
                    restHighLevelClient,
                    reprocessEnabled,
                    batchSize,
                    batchDelayMs,
                    totalDays,
                    windowDays))
            : ImmutableList.of();
  }

  @Override
  public String id() {
    return "BackfillDataProcessInstances";
  }

  @Override
  public List<UpgradeStep> steps() {
    return upgradeSteps;
  }
}

View File

@ -0,0 +1,213 @@
package com.linkedin.datahub.upgrade.system.dataprocessinstances;
import static com.linkedin.metadata.Constants.*;
import com.google.common.base.Throwables;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchService;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.net.URISyntaxException;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Function;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jackson.node.JsonNodeFactory;
import org.codehaus.jackson.node.ObjectNode;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.aggregations.Aggregation;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
@Slf4j
public class BackfillDataProcessInstancesHasRunEventsStep implements UpgradeStep {
private static final String UPGRADE_ID = "BackfillDataProcessInstancesHasRunEvents";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
private final OperationContext opContext;
private final EntityService<?> entityService;
private final ElasticSearchService elasticSearchService;
private final RestHighLevelClient restHighLevelClient;
private final boolean reprocessEnabled;
private final Integer batchSize;
private final Integer batchDelayMs;
private final Integer totalDays;
private final Integer windowDays;
public BackfillDataProcessInstancesHasRunEventsStep(
OperationContext opContext,
EntityService<?> entityService,
ElasticSearchService elasticSearchService,
RestHighLevelClient restHighLevelClient,
boolean reprocessEnabled,
Integer batchSize,
Integer batchDelayMs,
Integer totalDays,
Integer windowDays) {
this.opContext = opContext;
this.entityService = entityService;
this.elasticSearchService = elasticSearchService;
this.restHighLevelClient = restHighLevelClient;
this.reprocessEnabled = reprocessEnabled;
this.batchSize = batchSize;
this.batchDelayMs = batchDelayMs;
this.totalDays = totalDays;
this.windowDays = windowDays;
}
@SuppressWarnings("BusyWait")
@Override
public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {
TermsValuesSourceBuilder termsValuesSourceBuilder =
new TermsValuesSourceBuilder("urn").field("urn");
ObjectNode json = JsonNodeFactory.instance.objectNode();
json.put("hasRunEvents", true);
IndexConvention indexConvention = opContext.getSearchContext().getIndexConvention();
String runEventsIndexName =
indexConvention.getTimeseriesAspectIndexName(
DATA_PROCESS_INSTANCE_ENTITY_NAME, DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME);
DataHubUpgradeState upgradeState = DataHubUpgradeState.SUCCEEDED;
Instant now = Instant.now();
Instant overallStart = now.minus(totalDays, ChronoUnit.DAYS);
for (int i = 0; ; i++) {
Instant windowEnd = now.minus(i * windowDays, ChronoUnit.DAYS);
if (!windowEnd.isAfter(overallStart)) {
break;
}
Instant windowStart = windowEnd.minus(windowDays, ChronoUnit.DAYS);
if (windowStart.isBefore(overallStart)) {
// last iteration, cap at overallStart
windowStart = overallStart;
}
QueryBuilder queryBuilder =
QueryBuilders.boolQuery()
.must(
QueryBuilders.rangeQuery("@timestamp")
.gte(windowStart.toString())
.lt(windowEnd.toString()));
CompositeAggregationBuilder aggregationBuilder =
AggregationBuilders.composite("aggs", List.of(termsValuesSourceBuilder))
.size(batchSize);
while (true) {
SearchRequest searchRequest = new SearchRequest(runEventsIndexName);
searchRequest.source(
new SearchSourceBuilder()
.size(0)
.aggregation(aggregationBuilder)
.query(queryBuilder));
SearchResponse response;
try {
response = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
} catch (IOException e) {
log.error(Throwables.getStackTraceAsString(e));
log.error("Error querying index {}", runEventsIndexName);
upgradeState = DataHubUpgradeState.FAILED;
break;
}
List<Aggregation> aggregations = response.getAggregations().asList();
if (aggregations.isEmpty()) {
break;
}
CompositeAggregation aggregation = (CompositeAggregation) aggregations.get(0);
Set<Urn> urns = new HashSet<>();
for (CompositeAggregation.Bucket bucket : aggregation.getBuckets()) {
for (Object value : bucket.getKey().values()) {
try {
urns.add(Urn.createFromString(String.valueOf(value)));
} catch (URISyntaxException e) {
log.warn("Ignoring invalid urn {}", value);
}
}
}
if (!urns.isEmpty()) {
urns = entityService.exists(opContext, urns);
urns.forEach(
urn ->
elasticSearchService.upsertDocument(
opContext,
DATA_PROCESS_INSTANCE_ENTITY_NAME,
json.toString(),
indexConvention.getEntityDocumentId(urn)));
}
if (aggregation.afterKey() == null) {
break;
}
aggregationBuilder.aggregateAfter(aggregation.afterKey());
if (batchDelayMs > 0) {
log.info("Sleeping for {} ms", batchDelayMs);
try {
Thread.sleep(batchDelayMs);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
BootstrapStep.setUpgradeResult(context.opContext(), UPGRADE_ID_URN, entityService);
return new DefaultUpgradeStepResult(id(), upgradeState);
};
}
@Override
public String id() {
return UPGRADE_ID;
}
/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return true;
}
/** Returns whether the upgrade should be skipped. */
@Override
public boolean skip(UpgradeContext context) {
if (reprocessEnabled) {
return false;
}
boolean previouslyRun =
entityService.exists(
context.opContext(), UPGRADE_ID_URN, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
if (previouslyRun) {
log.info("{} was already run. Skipping.", id());
}
return previouslyRun;
}
}

View File

@ -15,6 +15,9 @@ import com.linkedin.common.Urn
record DataProcessInstanceRunEvent includes TimeseriesAspectBase, ExternalReference {
@TimeseriesField = {}
@Searchable = {
"hasValuesFieldName": "hasRunEvents"
}
status: enum DataProcessRunStatus {
/**
* The status where the Data processing run is in.

View File

@ -387,7 +387,15 @@ systemUpdate:
batchSize: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_BATCH_SIZE:500}
delayMs: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_DELAY_MS:5000}
limit: ${SYSTEM_UPDATE_SCHEMA_FIELDS_DOC_IDS_LIMIT:0}
# Backfill of the hasRunEvents flag on dataProcessInstance search documents
# (see BackfillDataProcessInstancesHasRunEventsStep).
processInstanceHasRunEvents:
  enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_ENABLED:true}
  # urns fetched per composite-aggregation page
  batchSize: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_BATCH_SIZE:100}
  # sleep between pages, in milliseconds
  delayMs: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_DELAY_MS:1000}
  # how far back to scan, in days
  totalDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_TOTAL_DAYS:90}
  # size of each scan window, in days
  windowDays: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_WINDOW_DAYS:1}
  reprocess:
    # when true, the backfill runs again even if a previous run was recorded
    enabled: ${SYSTEM_UPDATE_PROCESS_INSTANCE_HAS_RUN_EVENTS_REPROCESS:false}
structuredProperties:
enabled: ${ENABLE_STRUCTURED_PROPERTIES_HOOK:true} # applies structured properties mappings
writeEnabled: ${ENABLE_STRUCTURED_PROPERTIES_WRITE:true} # write structured property values