fix(bootstrap): do not re-run retention policy ingestion (#7295)

This commit is contained in:
Aseem Bansal 2023-02-10 20:56:19 +05:30 committed by GitHub
parent 97355bc805
commit a60d27b40f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 74 additions and 47 deletions

View File

@ -263,6 +263,12 @@ public class Constants {
public static final String CHANGE_EVENT_PLATFORM_EVENT_NAME = "entityChangeEvent";
/**
* Retention
*/
public static final String DATAHUB_RETENTION_ENTITY = "dataHubRetention";
public static final String DATAHUB_RETENTION_ASPECT = "dataHubRetentionConfig";
public static final String DATAHUB_RETENTION_KEY_ASPECT = "dataHubRetentionKey";
/**
* User Status
*/

View File

@ -38,9 +38,6 @@ import lombok.Value;
*/
public abstract class RetentionService {
protected static final String ALL = "*";
protected static final String DATAHUB_RETENTION_ENTITY = "dataHubRetention";
protected static final String DATAHUB_RETENTION_ASPECT = "dataHubRetentionConfig";
protected static final String DATAHUB_RETENTION_KEY_ASPECT = "dataHubRetentionKey";
protected abstract EntityService getEntityService();
@ -56,7 +53,7 @@ public abstract class RetentionService {
// Prioritized list of retention keys to fetch
List<Urn> retentionUrns = getRetentionKeys(entityName, aspectName);
Map<Urn, List<RecordTemplate>> fetchedAspects =
getEntityService().getLatestAspects(new HashSet<>(retentionUrns), ImmutableSet.of(DATAHUB_RETENTION_ASPECT));
getEntityService().getLatestAspects(new HashSet<>(retentionUrns), ImmutableSet.of(Constants.DATAHUB_RETENTION_ASPECT));
// Find the first retention info that is set among the prioritized list of retention keys above
Optional<DataHubRetentionConfig> retentionInfo = retentionUrns.stream()
.flatMap(urn -> fetchedAspects.getOrDefault(urn, Collections.emptyList())
@ -75,7 +72,7 @@ public abstract class RetentionService {
new DataHubRetentionKey().setEntityName(ALL).setAspectName(aspectName),
new DataHubRetentionKey().setEntityName(ALL).setAspectName(ALL))
.stream()
.map(key -> EntityKeyUtils.convertEntityKeyToUrn(key, DATAHUB_RETENTION_ENTITY))
.map(key -> EntityKeyUtils.convertEntityKeyToUrn(key, Constants.DATAHUB_RETENTION_ENTITY))
.collect(Collectors.toList());
}
@ -95,12 +92,12 @@ public abstract class RetentionService {
DataHubRetentionKey retentionKey = new DataHubRetentionKey();
retentionKey.setEntityName(entityName != null ? entityName : ALL);
retentionKey.setAspectName(aspectName != null ? aspectName : ALL);
Urn retentionUrn = EntityKeyUtils.convertEntityKeyToUrn(retentionKey, DATAHUB_RETENTION_ENTITY);
Urn retentionUrn = EntityKeyUtils.convertEntityKeyToUrn(retentionKey, Constants.DATAHUB_RETENTION_ENTITY);
MetadataChangeProposal keyProposal = new MetadataChangeProposal();
GenericAspect keyAspect = GenericRecordUtils.serializeAspect(retentionKey);
keyProposal.setAspect(keyAspect);
keyProposal.setAspectName(DATAHUB_RETENTION_KEY_ASPECT);
keyProposal.setEntityType(DATAHUB_RETENTION_ENTITY);
keyProposal.setAspectName(Constants.DATAHUB_RETENTION_KEY_ASPECT);
keyProposal.setEntityType(Constants.DATAHUB_RETENTION_ENTITY);
keyProposal.setChangeType(ChangeType.UPSERT);
keyProposal.setEntityUrn(retentionUrn);
AuditStamp auditStamp =
@ -109,7 +106,7 @@ public abstract class RetentionService {
MetadataChangeProposal aspectProposal = keyProposal.clone();
GenericAspect retentionAspect = GenericRecordUtils.serializeAspect(retentionConfig);
aspectProposal.setAspect(retentionAspect);
aspectProposal.setAspectName(DATAHUB_RETENTION_ASPECT);
aspectProposal.setAspectName(Constants.DATAHUB_RETENTION_ASPECT);
return getEntityService().ingestProposal(aspectProposal, auditStamp, false).isDidUpdate();
}
@ -125,7 +122,7 @@ public abstract class RetentionService {
DataHubRetentionKey retentionKey = new DataHubRetentionKey();
retentionKey.setEntityName(entityName != null ? entityName : ALL);
retentionKey.setAspectName(aspectName != null ? aspectName : ALL);
Urn retentionUrn = EntityKeyUtils.convertEntityKeyToUrn(retentionKey, DATAHUB_RETENTION_ENTITY);
Urn retentionUrn = EntityKeyUtils.convertEntityKeyToUrn(retentionKey, Constants.DATAHUB_RETENTION_ENTITY);
getEntityService().deleteUrn(retentionUrn);
}

View File

@ -20,6 +20,7 @@ import com.linkedin.retention.DataHubRetentionConfig;
import com.linkedin.retention.Retention;
import com.linkedin.retention.TimeBasedRetention;
import com.linkedin.retention.VersionBasedRetention;
import com.linkedin.metadata.Constants;
import io.opentelemetry.extension.annotations.WithSpan;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -37,7 +38,6 @@ import java.util.stream.Collectors;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
@Slf4j
@RequiredArgsConstructor
@ -131,7 +131,7 @@ public class CassandraRetentionService extends RetentionService {
SimpleStatement ss = deleteFrom(CassandraAspect.TABLE_NAME)
.whereColumn(CassandraAspect.URN_COLUMN).isEqualTo(literal(urn.toString()))
.whereColumn(CassandraAspect.ASPECT_COLUMN).isEqualTo(literal(aspectName))
.whereColumn(CassandraAspect.VERSION_COLUMN).isGreaterThan(literal(ASPECT_LATEST_VERSION))
.whereColumn(CassandraAspect.VERSION_COLUMN).isGreaterThan(literal(Constants.ASPECT_LATEST_VERSION))
.whereColumn(CassandraAspect.VERSION_COLUMN).isLessThanOrEqualTo(literal(largestVersion - retention.getMaxVersions() + 1L))
.build();
@ -174,7 +174,7 @@ public class CassandraRetentionService extends RetentionService {
if (aspectName != null) {
select = select.whereColumn(CassandraAspect.ASPECT_COLUMN).isEqualTo(literal(aspectName));
}
select = select.whereColumn(CassandraAspect.VERSION_COLUMN).isGreaterThan(literal(ASPECT_LATEST_VERSION));
select = select.whereColumn(CassandraAspect.VERSION_COLUMN).isGreaterThan(literal(Constants.ASPECT_LATEST_VERSION));
if (entityName != null) {
select = select.whereColumn(CassandraAspect.ENTITY_COLUMN).isEqualTo(literal(entityName));
}
@ -187,8 +187,8 @@ public class CassandraRetentionService extends RetentionService {
private Map<String, DataHubRetentionConfig> getAllRetentionPolicies() {
SimpleStatement ss = selectFrom(CassandraAspect.TABLE_NAME)
.all()
.whereColumn(CassandraAspect.ASPECT_COLUMN).isEqualTo(literal(DATAHUB_RETENTION_ASPECT))
.whereColumn(CassandraAspect.VERSION_COLUMN).isEqualTo(literal(ASPECT_LATEST_VERSION))
.whereColumn(CassandraAspect.ASPECT_COLUMN).isEqualTo(literal(Constants.DATAHUB_RETENTION_ASPECT))
.whereColumn(CassandraAspect.VERSION_COLUMN).isEqualTo(literal(Constants.ASPECT_LATEST_VERSION))
.allowFiltering()
.build();
ResultSet rs = _cqlSession.execute(ss);

View File

@ -10,6 +10,7 @@ import com.linkedin.retention.DataHubRetentionConfig;
import com.linkedin.retention.Retention;
import com.linkedin.retention.TimeBasedRetention;
import com.linkedin.retention.VersionBasedRetention;
import com.linkedin.metadata.Constants;
import io.ebean.EbeanServer;
import io.ebean.Expression;
import io.ebean.ExpressionList;
@ -32,8 +33,6 @@ import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
@Slf4j
@RequiredArgsConstructor
@ -62,7 +61,7 @@ public class EbeanRetentionService extends RetentionService {
.where()
.eq(EbeanAspectV2.URN_COLUMN, urn.toString())
.eq(EbeanAspectV2.ASPECT_COLUMN, aspectName)
.ne(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION)
.ne(EbeanAspectV2.VERSION_COLUMN, Constants.ASPECT_LATEST_VERSION)
.or();
List<Expression> filterList = new ArrayList<>();
@ -243,8 +242,8 @@ public class EbeanRetentionService extends RetentionService {
.select(String.format("%s, %s, %s", EbeanAspectV2.URN_COLUMN, EbeanAspectV2.ASPECT_COLUMN,
EbeanAspectV2.METADATA_COLUMN))
.where()
.eq(EbeanAspectV2.ASPECT_COLUMN, DATAHUB_RETENTION_ASPECT)
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION)
.eq(EbeanAspectV2.ASPECT_COLUMN, Constants.DATAHUB_RETENTION_ASPECT)
.eq(EbeanAspectV2.VERSION_COLUMN, Constants.ASPECT_LATEST_VERSION)
.findList()
.stream()
.collect(Collectors.toMap(EbeanAspectV2::getUrn,

View File

@ -1,6 +1,18 @@
package com.linkedin.metadata.boot;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeResult;
import javax.annotation.Nonnull;
import java.net.URISyntaxException;
/**
@ -32,4 +44,25 @@ public interface BootstrapStep {
// Start the step asynchronously without waiting for it to end
ASYNC,
}
/**
 * Builds the urn of the upgrade entity identified by {@code upgradeId}.
 *
 * <p>The id is wrapped in a {@link DataHubUpgradeKey} and converted to a urn for the
 * {@link Constants#DATA_HUB_UPGRADE_ENTITY_NAME} entity type.
 *
 * @param upgradeId identifier stored in the upgrade key
 * @return the urn produced by {@link EntityKeyUtils#convertEntityKeyToUrn} for that key
 */
static Urn getUpgradeUrn(String upgradeId) {
  final DataHubUpgradeKey upgradeKey = new DataHubUpgradeKey();
  upgradeKey.setId(upgradeId);
  return EntityKeyUtils.convertEntityKeyToUrn(upgradeKey, Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
}
/**
 * Upserts a {@link DataHubUpgradeResult} aspect on the given upgrade urn.
 *
 * <p>The result carries the current wall-clock time, and the write is attributed to
 * {@link Constants#SYSTEM_ACTOR}.
 *
 * @param urn urn of the upgrade entity to write the result aspect to
 * @param entityService service used to ingest the proposal
 * @throws URISyntaxException if {@link Constants#SYSTEM_ACTOR} cannot be parsed as a urn
 */
static void setUpgradeResult(Urn urn, EntityService entityService) throws URISyntaxException {
  // Audit stamp: system actor, stamped with the current time.
  final AuditStamp auditStamp = new AuditStamp();
  auditStamp.setActor(Urn.createFromString(Constants.SYSTEM_ACTOR));
  auditStamp.setTime(System.currentTimeMillis());

  // Aspect payload: only the timestamp is populated.
  final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult();
  upgradeResult.setTimestampMs(System.currentTimeMillis());

  // Proposal: what to change ...
  final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
  upgradeProposal.setChangeType(ChangeType.UPSERT);
  upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
  upgradeProposal.setEntityUrn(urn);
  // ... and which aspect to write.
  upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME);
  upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult));

  entityService.ingestProposal(upgradeProposal, auditStamp, false);
}
}

View File

@ -3,6 +3,7 @@ package com.linkedin.metadata.boot.factories;
import com.linkedin.gms.factory.entity.RetentionServiceFactory;
import com.linkedin.gms.factory.spring.YamlPropertySourceFactory;
import com.linkedin.metadata.boot.steps.IngestRetentionPoliciesStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.RetentionService;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Autowired;
@ -24,6 +25,10 @@ public class IngestRetentionPoliciesStepFactory {
@Qualifier("retentionService")
private RetentionService _retentionService;
@Autowired
@Qualifier("entityService")
private EntityService _entityService;
@Value("${entityService.retention.enabled}")
private Boolean _enableRetention;
@ -37,6 +42,6 @@ public class IngestRetentionPoliciesStepFactory {
@Scope("singleton")
@Nonnull
protected IngestRetentionPoliciesStep createInstance() {
return new IngestRetentionPoliciesStep(_retentionService, _enableRetention, _applyOnBootstrap, _pluginRegistryPath);
return new IngestRetentionPoliciesStep(_retentionService, _entityService, _enableRetention, _applyOnBootstrap, _pluginRegistryPath);
}
}

View File

@ -3,8 +3,10 @@ package com.linkedin.metadata.boot.steps;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.boot.BootstrapStep;
import com.datahub.util.RecordUtils;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.RetentionService;
import com.linkedin.metadata.key.DataHubRetentionKey;
import com.linkedin.retention.DataHubRetentionConfig;
@ -25,11 +27,14 @@ import org.springframework.core.io.ClassPathResource;
public class IngestRetentionPoliciesStep implements BootstrapStep {
private final RetentionService _retentionService;
private final EntityService _entityService;
private final boolean _enableRetention;
private final boolean _applyOnBootstrap;
private final String pluginPath;
private static final ObjectMapper YAML_MAPPER = new ObjectMapper(new YAMLFactory());
private static final String UPGRADE_ID = "ingest-retention-policies";
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
@Nonnull
@Override
@ -45,6 +50,10 @@ public class IngestRetentionPoliciesStep implements BootstrapStep {
@Override
public void execute() throws IOException, URISyntaxException {
// 0. Execute preflight check to see whether we need to ingest policies
if (_entityService.exists(UPGRADE_ID_URN)) {
log.info("Retention was applied. Skipping.");
return;
}
log.info("Ingesting default retention...");
// If retention is disabled, skip step
@ -74,6 +83,8 @@ public class IngestRetentionPoliciesStep implements BootstrapStep {
log.info("Applying policies to all records");
_retentionService.batchApplyRetention(null, null);
}
BootstrapStep.setUpgradeResult(UPGRADE_ID_URN, _entityService);
}
// Parse input yaml file or yaml files in the input directory to generate a retention policy map

View File

@ -1,18 +1,9 @@
package com.linkedin.metadata.boot.steps;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.gms.factory.telemetry.TelemetryUtils;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeResult;
import java.util.HashMap;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
@ -25,11 +16,9 @@ public class RemoveClientIdAspectStep implements BootstrapStep {
private final EntityService _entityService;
private static final String VERSION = "0";
private static final String UPGRADE_ID = "remove-unknown-aspects";
private static final String INVALID_TELEMETRY_ASPECT_NAME = "clientId";
private static final Urn REMOVE_UNKNOWN_ASPECTS_URN =
EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(UPGRADE_ID), Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
private static final Urn REMOVE_UNKNOWN_ASPECTS_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);
@Override
public String name() {
@ -46,9 +35,7 @@ public class RemoveClientIdAspectStep implements BootstrapStep {
// Remove invalid telemetry aspect
_entityService.deleteAspect(TelemetryUtils.CLIENT_ID_URN, INVALID_TELEMETRY_ASPECT_NAME, new HashMap<>(), true);
final AuditStamp auditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis());
ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, upgradeResult, auditStamp);
BootstrapStep.setUpgradeResult(REMOVE_UNKNOWN_ASPECTS_URN, _entityService);
} catch (Exception e) {
log.error("Error when running the RemoveUnknownAspects Bootstrap Step", e);
_entityService.deleteUrn(REMOVE_UNKNOWN_ASPECTS_URN);
@ -62,15 +49,4 @@ public class RemoveClientIdAspectStep implements BootstrapStep {
return ExecutionMode.ASYNC;
}
/**
 * Upserts {@code aspect} under {@code aspectName} on the {@code REMOVE_UNKNOWN_ASPECTS_URN}
 * upgrade entity.
 *
 * @param aspectName name of the aspect to write
 * @param aspect record to serialize as the aspect value
 * @param auditStamp audit stamp passed through to the entity service
 */
private void ingestUpgradeAspect(String aspectName, RecordTemplate aspect, AuditStamp auditStamp) {
  final MetadataChangeProposal proposal = new MetadataChangeProposal();

  // Target entity and change type.
  proposal.setChangeType(ChangeType.UPSERT);
  proposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
  proposal.setEntityUrn(REMOVE_UNKNOWN_ASPECTS_URN);

  // Aspect name and serialized payload.
  proposal.setAspectName(aspectName);
  proposal.setAspect(GenericRecordUtils.serializeAspect(aspect));

  _entityService.ingestProposal(proposal, auditStamp, false);
}
}