feat(bootstrap): bootstrap template mcps (#11518)

This commit is contained in:
david-leifker 2024-10-04 11:57:42 -05:00 committed by GitHub
parent 73d8a4649d
commit 04349cb9cd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
58 changed files with 1969 additions and 1585 deletions

View File

@ -276,6 +276,7 @@ project.ext.externalDependency = [
'annotationApi': 'javax.annotation:javax.annotation-api:1.3.2',
'jakartaAnnotationApi': 'jakarta.annotation:jakarta.annotation-api:3.0.0',
'classGraph': 'io.github.classgraph:classgraph:4.8.172',
'mustache': 'com.github.spullara.mustache.java:compiler:0.9.14'
]
allprojects {

View File

@ -19,6 +19,7 @@ dependencies {
implementation project(':metadata-dao-impl:kafka-producer')
implementation externalDependency.charle
implementation externalDependency.mustache
implementation externalDependency.javaxInject
implementation(externalDependency.hadoopClient) {
exclude group: 'net.minidev', module: 'json-smart'
@ -83,6 +84,7 @@ dependencies {
testImplementation externalDependency.springBootTest
testImplementation externalDependency.mockito
testImplementation externalDependency.testng
testImplementation 'uk.org.webcompere:system-stubs-testng:2.1.7'
testRuntimeOnly externalDependency.logbackClassic
constraints {

View File

@ -8,7 +8,7 @@ import javax.annotation.Nonnull;
public interface UpgradeManager {
/** Register an {@link Upgrade} with the manager. */
void register(Upgrade upgrade);
UpgradeManager register(Upgrade upgrade);
/** Kick off an {@link Upgrade} by identifier. */
UpgradeResult execute(

View File

@ -0,0 +1,30 @@
package com.linkedin.datahub.upgrade.config;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class BootstrapMCPConfig {

  // Classpath or filesystem location of the bootstrap MCP template configuration yaml.
  @Nonnull
  @Value("${systemUpdate.bootstrap.mcpConfig}")
  private String bootstrapMCPConfig;

  /** Upgrade bean containing only the blocking bootstrap MCP templates. */
  @Bean(name = "bootstrapMCPBlocking")
  public BootstrapMCP bootstrapMCPBlocking(
      final OperationContext opContext, EntityService<?> entityService) throws IOException {
    return newBootstrapMCP(opContext, entityService, true);
  }

  /** Upgrade bean containing only the non-blocking bootstrap MCP templates. */
  @Bean(name = "bootstrapMCPNonBlocking")
  public BootstrapMCP bootstrapMCPNonBlocking(
      final OperationContext opContext, EntityService<?> entityService) throws IOException {
    return newBootstrapMCP(opContext, entityService, false);
  }

  // Shared factory for both bean variants; isBlocking selects the template subset.
  private BootstrapMCP newBootstrapMCP(
      final OperationContext opContext, EntityService<?> entityService, boolean isBlocking)
      throws IOException {
    return new BootstrapMCP(opContext, bootstrapMCPConfig, entityService, isBlocking);
  }
}

View File

@ -6,6 +6,7 @@ import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.SystemUpdate;
import com.linkedin.datahub.upgrade.system.SystemUpdateBlocking;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
@ -31,6 +32,7 @@ import io.datahubproject.metadata.context.ServicesRegistryContext;
import io.datahubproject.metadata.services.RestrictedService;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
@ -54,21 +56,31 @@ public class SystemUpdateConfig {
public SystemUpdate systemUpdate(
final List<BlockingSystemUpgrade> blockingSystemUpgrades,
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
final DataHubStartupStep dataHubStartupStep) {
return new SystemUpdate(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
final DataHubStartupStep dataHubStartupStep,
@Qualifier("bootstrapMCPBlocking") @NonNull final BootstrapMCP bootstrapMCPBlocking,
@Qualifier("bootstrapMCPNonBlocking") @NonNull final BootstrapMCP bootstrapMCPNonBlocking) {
return new SystemUpdate(
blockingSystemUpgrades,
nonBlockingSystemUpgrades,
dataHubStartupStep,
bootstrapMCPBlocking,
bootstrapMCPNonBlocking);
}
@Bean(name = "systemUpdateBlocking")
public SystemUpdateBlocking systemUpdateBlocking(
final List<BlockingSystemUpgrade> blockingSystemUpgrades,
final DataHubStartupStep dataHubStartupStep) {
return new SystemUpdateBlocking(blockingSystemUpgrades, List.of(), dataHubStartupStep);
final DataHubStartupStep dataHubStartupStep,
@Qualifier("bootstrapMCPBlocking") @NonNull final BootstrapMCP bootstrapMCPBlocking) {
return new SystemUpdateBlocking(
blockingSystemUpgrades, dataHubStartupStep, bootstrapMCPBlocking);
}
@Bean(name = "systemUpdateNonBlocking")
public SystemUpdateNonBlocking systemUpdateNonBlocking(
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades) {
return new SystemUpdateNonBlocking(List.of(), nonBlockingSystemUpgrades, null);
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Qualifier("bootstrapMCPNonBlocking") @NonNull final BootstrapMCP bootstrapMCPNonBlocking) {
return new SystemUpdateNonBlocking(nonBlockingSystemUpgrades, bootstrapMCPNonBlocking);
}
@Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")

View File

@ -24,7 +24,7 @@ public class DefaultUpgradeContext implements UpgradeContext {
private final List<String> args;
private final Map<String, Optional<String>> parsedArgs;
DefaultUpgradeContext(
public DefaultUpgradeContext(
@Nonnull OperationContext opContext,
Upgrade upgrade,
UpgradeReport report,

View File

@ -26,8 +26,9 @@ public class DefaultUpgradeManager implements UpgradeManager {
private final Map<String, Upgrade> _upgrades = new HashMap<>();
@Override
public void register(@Nonnull Upgrade upgrade) {
public UpgradeManager register(@Nonnull Upgrade upgrade) {
_upgrades.put(upgrade.id(), upgrade);
return this;
}
@Override

View File

@ -3,6 +3,7 @@ package com.linkedin.datahub.upgrade.system;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeCleanupStep;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import java.util.LinkedList;
import java.util.List;
@ -22,7 +23,9 @@ public class SystemUpdate implements Upgrade {
public SystemUpdate(
@NonNull final List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable final DataHubStartupStep dataHubStartupStep) {
@Nullable final DataHubStartupStep dataHubStartupStep,
@Nullable final BootstrapMCP bootstrapMCPBlocking,
@Nullable final BootstrapMCP bootstrapMCPNonBlocking) {
steps = new LinkedList<>();
cleanupSteps = new LinkedList<>();
@ -32,11 +35,23 @@ public class SystemUpdate implements Upgrade {
cleanupSteps.addAll(
blockingSystemUpgrades.stream().flatMap(up -> up.cleanupSteps().stream()).toList());
// bootstrap blocking only
if (bootstrapMCPBlocking != null) {
steps.addAll(bootstrapMCPBlocking.steps());
cleanupSteps.addAll(bootstrapMCPBlocking.cleanupSteps());
}
// emit system update message if blocking upgrade(s) present
if (dataHubStartupStep != null && !blockingSystemUpgrades.isEmpty()) {
steps.add(dataHubStartupStep);
}
// bootstrap non-blocking only
if (bootstrapMCPNonBlocking != null) {
steps.addAll(bootstrapMCPNonBlocking.steps());
cleanupSteps.addAll(bootstrapMCPNonBlocking.cleanupSteps());
}
// add non-blocking upgrades last
steps.addAll(nonBlockingSystemUpgrades.stream().flatMap(up -> up.steps().stream()).toList());
cleanupSteps.addAll(

View File

@ -1,16 +1,16 @@
package com.linkedin.datahub.upgrade.system;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import java.util.List;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;
public class SystemUpdateBlocking extends SystemUpdate {
public SystemUpdateBlocking(
@NonNull List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable DataHubStartupStep dataHubStartupStep) {
super(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
@NonNull DataHubStartupStep dataHubStartupStep,
@NonNull final BootstrapMCP bootstrapMCPBlocking) {
super(blockingSystemUpgrades, List.of(), dataHubStartupStep, bootstrapMCPBlocking, null);
}
}

View File

@ -1,16 +1,14 @@
package com.linkedin.datahub.upgrade.system;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import java.util.List;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;
public class SystemUpdateNonBlocking extends SystemUpdate {
public SystemUpdateNonBlocking(
@NonNull List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable DataHubStartupStep dataHubStartupStep) {
super(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
final BootstrapMCP bootstrapMCPNonBlocking) {
super(List.of(), nonBlockingSystemUpgrades, null, null, bootstrapMCPNonBlocking);
}
}

View File

@ -0,0 +1,38 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;
import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;
/**
 * Upgrade whose steps are generated from a bootstrap MCP template configuration file. When no
 * configuration location is supplied, the upgrade contains no steps.
 */
public class BootstrapMCP implements Upgrade {
  // Resolved once at construction time; empty when no config location was provided.
  private final List<UpgradeStep> _steps;

  public BootstrapMCP(
      OperationContext opContext,
      @Nullable String bootstrapMCPConfig,
      EntityService<?> entityService,
      boolean isBlocking)
      throws IOException {
    final boolean hasConfig = bootstrapMCPConfig != null && !bootstrapMCPConfig.isEmpty();
    _steps =
        hasConfig
            ? BootstrapMCPUtil.generateSteps(
                opContext, isBlocking, bootstrapMCPConfig, entityService)
            : ImmutableList.of();
  }

  @Override
  public String id() {
    return getClass().getSimpleName();
  }

  @Override
  public List<UpgradeStep> steps() {
    return _steps;
  }
}

View File

@ -0,0 +1,95 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;
import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.model.BootstrapMCPConfigFile;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
 * Executes a single bootstrap MCP template: renders the template into an aspect batch, ingests it
 * via the entity service, and records an upgrade-result aspect so that non-forced templates are
 * applied at most once per name/version.
 */
@Slf4j
public class BootstrapMCPStep implements UpgradeStep {
  private final String upgradeId;
  private final Urn upgradeIdUrn;
  private final OperationContext opContext;
  private final EntityService<?> entityService;
  @Getter private final BootstrapMCPConfigFile.MCPTemplate mcpTemplate;

  public BootstrapMCPStep(
      OperationContext opContext,
      EntityService<?> entityService,
      BootstrapMCPConfigFile.MCPTemplate mcpTemplate) {
    this.opContext = opContext;
    this.entityService = entityService;
    this.mcpTemplate = mcpTemplate;
    // Upgrade id includes the template version, so bumping the version re-runs the template.
    this.upgradeId =
        String.join("-", List.of("bootstrap", mcpTemplate.getName(), mcpTemplate.getVersion()));
    this.upgradeIdUrn = BootstrapStep.getUpgradeUrn(this.upgradeId);
  }

  @Override
  public String id() {
    return upgradeId;
  }

  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return (context) -> {
      try {
        AspectsBatch batch = BootstrapMCPUtil.generateAspectBatch(opContext, mcpTemplate);
        log.info("Ingesting {} MCPs", batch.getItems().size());
        entityService.ingestProposal(opContext, batch, mcpTemplate.isAsync());
      } catch (IOException e) {
        log.error("Error bootstrapping MCPs", e);
        return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
      }

      // Record success so subsequent runs of a non-forced template are skipped (see skip()).
      BootstrapStep.setUpgradeResult(context.opContext(), upgradeIdUrn, entityService);
      return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED);
    };
  }

  /**
   * Returns whether the upgrade should proceed if the step fails after exceeding the maximum
   * retries.
   */
  @Override
  public boolean isOptional() {
    return mcpTemplate.isOptional();
  }

  /** Returns whether the upgrade should be skipped: already run and the template is not forced. */
  @Override
  public boolean skip(UpgradeContext context) {
    if (!mcpTemplate.isForce()) {
      boolean previouslyRun =
          entityService.exists(
              context.opContext(), upgradeIdUrn, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
      if (previouslyRun) {
        log.info("{} was already run. Skipping.", id());
      }
      return previouslyRun;
    } else {
      log.info("{} forced run.", id());
      return false;
    }
  }
}

View File

@ -0,0 +1,183 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;
import static com.linkedin.metadata.Constants.INGESTION_INFO_ASPECT_NAME;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.github.mustachejava.DefaultMustacheFactory;
import com.github.mustachejava.Mustache;
import com.github.mustachejava.MustacheFactory;
import com.linkedin.common.AuditStamp;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.model.BootstrapMCPConfigFile;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.MetadataChangeProposal;
import io.datahubproject.metadata.context.OperationContext;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
/**
 * Helpers for loading bootstrap MCP configuration, rendering mustache MCP templates, and turning
 * the rendered MCPs into ingestible aspect batches. Resources are resolved from the classpath
 * first, then the filesystem.
 */
@Slf4j
public class BootstrapMCPUtil {
  // Mustache factory is reusable; templates are compiled per invocation below.
  static final MustacheFactory MUSTACHE_FACTORY = new DefaultMustacheFactory();

  private BootstrapMCPUtil() {}

  /**
   * Loads the bootstrap MCP configuration and builds one {@link BootstrapMCPStep} per template
   * whose blocking flag matches {@code isBlocking}.
   *
   * @param opContext operation context supplying the yaml/object mappers
   * @param isBlocking select blocking (true) or non-blocking (false) templates
   * @param bootstrapMCPConfig classpath or filesystem location of the configuration yaml
   * @param entityService entity service used by generated steps for ingestion
   * @return steps for the selected templates, in configuration order
   * @throws IOException if the configuration cannot be located or parsed
   */
  static List<UpgradeStep> generateSteps(
      @Nonnull OperationContext opContext,
      boolean isBlocking,
      @Nonnull String bootstrapMCPConfig,
      @Nonnull EntityService<?> entityService)
      throws IOException {
    List<UpgradeStep> steps =
        resolveYamlConf(opContext, bootstrapMCPConfig, BootstrapMCPConfigFile.class)
            .getBootstrap()
            .getTemplates()
            .stream()
            .filter(cfg -> cfg.isBlocking() == isBlocking)
            .map(cfg -> new BootstrapMCPStep(opContext, entityService, cfg))
            .collect(Collectors.toList());

    log.info(
        "Generated {} {} BootstrapMCP steps",
        steps.size(),
        isBlocking ? "blocking" : "non-blocking");
    return steps;
  }

  /**
   * Renders a template into a batch of MCPs ready for ingestion. Each rendered MCP's "aspect"
   * node is serialized separately into a {@link GenericAspect} payload.
   *
   * @throws IOException if the template cannot be loaded or rendered
   */
  static AspectsBatch generateAspectBatch(
      OperationContext opContext, BootstrapMCPConfigFile.MCPTemplate mcpTemplate)
      throws IOException {
    final AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp();

    List<MetadataChangeProposal> mcps =
        resolveMCPTemplate(opContext, mcpTemplate, auditStamp).stream()
            .map(
                mcpObjectNode -> {
                  // Detach the aspect payload; the remainder maps directly onto an MCP.
                  ObjectNode aspect = (ObjectNode) mcpObjectNode.remove("aspect");

                  MetadataChangeProposal mcp =
                      opContext
                          .getObjectMapper()
                          .convertValue(mcpObjectNode, MetadataChangeProposal.class);

                  try {
                    String jsonAspect =
                        opContext
                            .getObjectMapper()
                            .writeValueAsString(
                                convenienceConversions(opContext, mcp.getAspectName(), aspect));
                    GenericAspect genericAspect = GenericRecordUtils.serializeAspect(jsonAspect);
                    mcp.setAspect(genericAspect);
                  } catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                  }

                  return mcp;
                })
            .collect(Collectors.toList());

    return AspectsBatchImpl.builder()
        .mcps(mcps, auditStamp, opContext.getRetrieverContext().get())
        .retrieverContext(opContext.getRetrieverContext().get())
        .build();
  }

  /**
   * Renders the mustache template referenced by {@code mcpTemplate} with values from {@link
   * #resolveValues(OperationContext, BootstrapMCPConfigFile.MCPTemplate, AuditStamp)} and parses
   * the result as a yaml list of MCP-shaped object nodes.
   */
  static List<ObjectNode> resolveMCPTemplate(
      OperationContext opContext,
      BootstrapMCPConfigFile.MCPTemplate mcpTemplate,
      AuditStamp auditStamp)
      throws IOException {
    String template = loadTemplate(mcpTemplate.getMcps_location());
    Mustache mustache = MUSTACHE_FACTORY.compile(new StringReader(template), mcpTemplate.getName());
    Map<String, Object> scopeValues = resolveValues(opContext, mcpTemplate, auditStamp);

    StringWriter writer = new StringWriter();
    mustache.execute(writer, scopeValues);

    return opContext.getYamlMapper().readValue(writer.toString(), new TypeReference<>() {});
  }

  /**
   * Builds the mustache scope: a built-in {@code auditStamp} value, plus JSON values read from the
   * environment variable named by the template's {@code values_env} (when set and present).
   */
  static Map<String, Object> resolveValues(
      OperationContext opContext,
      BootstrapMCPConfigFile.MCPTemplate mcpTemplate,
      AuditStamp auditStamp)
      throws IOException {
    final Map<String, Object> scopeValues = new HashMap<>();

    // built-in
    scopeValues.put("auditStamp", RecordUtils.toJsonString(auditStamp));

    if (mcpTemplate.getValues_env() != null
        && !mcpTemplate.getValues_env().isEmpty()
        && System.getenv().containsKey(mcpTemplate.getValues_env())) {
      String envValue = System.getenv(mcpTemplate.getValues_env());
      scopeValues.putAll(opContext.getObjectMapper().readValue(envValue, new TypeReference<>() {}));
    }
    return scopeValues;
  }

  /** Loads a mustache template as a string: classpath first, then filesystem fallback. */
  private static String loadTemplate(String source) throws IOException {
    log.info("Loading MCP template {}", source);
    try (InputStream stream = new ClassPathResource(source).getInputStream()) {
      log.info("Found in classpath: {}", source);
      return IOUtils.toString(stream, StandardCharsets.UTF_8);
    } catch (FileNotFoundException e) {
      log.info("{} was NOT found in the classpath.", source);
      try (InputStream stream = new FileSystemResource(source).getInputStream()) {
        log.info("Found in filesystem: {}", source);
        return IOUtils.toString(stream, StandardCharsets.UTF_8);
      } catch (Exception e2) {
        // Chain the underlying failure so filesystem errors remain diagnosable.
        throw new IllegalArgumentException(String.format("Could not resolve %s", source), e2);
      }
    }
  }

  /** Parses a yaml resource into {@code clazz}: classpath first, then filesystem fallback. */
  static <T> T resolveYamlConf(OperationContext opContext, String source, Class<T> clazz)
      throws IOException {
    log.info("Resolving {} to {}", source, clazz.getSimpleName());
    try (InputStream stream = new ClassPathResource(source).getInputStream()) {
      log.info("Found in classpath: {}", source);
      return opContext.getYamlMapper().readValue(stream, clazz);
    } catch (FileNotFoundException e) {
      log.info("{} was NOT found in the classpath.", source);
      try (InputStream stream = new FileSystemResource(source).getInputStream()) {
        log.info("Found in filesystem: {}", source);
        return opContext.getYamlMapper().readValue(stream, clazz);
      } catch (Exception e2) {
        // Chain the underlying failure so filesystem errors remain diagnosable.
        throw new IllegalArgumentException(String.format("Could not resolve %s", source), e2);
      }
    }
  }

  /**
   * Applies convenience transformations to an aspect node before serialization. Currently, for
   * ingestion-info aspects, the nested "recipe" object is re-serialized to the JSON string form
   * the aspect model expects.
   */
  private static ObjectNode convenienceConversions(
      OperationContext opContext, String aspectName, ObjectNode aspectObjectNode)
      throws JsonProcessingException {
    if (INGESTION_INFO_ASPECT_NAME.equals(aspectName)) {
      ObjectNode config = (ObjectNode) aspectObjectNode.get("config");
      ObjectNode recipe = (ObjectNode) config.remove("recipe");
      config.put("recipe", opContext.getObjectMapper().writeValueAsString(recipe));
    }
    return aspectObjectNode;
  }
}

View File

@ -0,0 +1,40 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps.model;
import java.util.List;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@NoArgsConstructor
@Data
@Builder
public class BootstrapMCPConfigFile {
  // Root element of the bootstrap MCP yaml configuration file.
  private Bootstrap bootstrap;

  @AllArgsConstructor
  @NoArgsConstructor
  @Data
  @Builder
  public static class Bootstrap {
    // Ordered list of MCP templates to apply at bootstrap.
    private List<MCPTemplate> templates;
  }

  @AllArgsConstructor
  @NoArgsConstructor
  @Data
  @Builder
  public static class MCPTemplate {
    // Template name; combined with version to form the upgrade id ("bootstrap-<name>-<version>").
    @Nonnull private String name;

    // Template version; bumping it re-runs a previously applied template.
    @Nonnull private String version;

    // When true, run the template even if a prior successful run was recorded.
    @Builder.Default private boolean force = false;

    // When true, the template runs with the blocking system upgrades; otherwise non-blocking.
    @Builder.Default private boolean blocking = false;

    // When true, the generated MCPs are ingested asynchronously.
    @Builder.Default private boolean async = true;

    // When true, a failed step does not abort the overall upgrade.
    @Builder.Default private boolean optional = false;

    // Classpath or filesystem location of the mustache MCP template.
    // NOTE: snake_case field names mirror the yaml keys — do not rename.
    @Nonnull private String mcps_location;

    // Optional environment variable whose JSON value supplies template substitution values.
    @Nullable private String values_env;
  }
}

View File

@ -0,0 +1,48 @@
package com.linkedin.datahub.upgrade;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;
import com.linkedin.datahub.upgrade.system.SystemUpdateBlocking;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep;
import java.util.List;
import java.util.stream.Collectors;
import javax.inject.Named;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;
@ActiveProfiles("test")
@SpringBootTest(
    classes = {UpgradeCliApplication.class, UpgradeCliApplicationTestConfiguration.class},
    args = {"-u", "SystemUpdateBlocking"})
public class DatahubUpgradeBlockingTest extends AbstractTestNGSpringContextTests {

  @Autowired
  @Named("systemUpdateBlocking")
  private SystemUpdateBlocking systemUpdateBlocking;

  /** Verifies the blocking system update wires in only blocking bootstrap MCP steps. */
  @Test
  public void testNBlockingBootstrapMCP() {
    assertNotNull(systemUpdateBlocking);

    final List<BootstrapMCPStep> bootstrapSteps =
        systemUpdateBlocking.steps().stream()
            .filter(BootstrapMCPStep.class::isInstance)
            .map(BootstrapMCPStep.class::cast)
            .toList();
    assertFalse(bootstrapSteps.isEmpty());

    assertTrue(
        bootstrapSteps.stream().allMatch(step -> step.getMcpTemplate().isBlocking()),
        String.format(
            "Found non-blocking step (expected blocking only): %s",
            bootstrapSteps.stream()
                .filter(step -> !step.getMcpTemplate().isBlocking())
                .map(step -> step.getMcpTemplate().getName())
                .collect(Collectors.toSet())));
  }
}

View File

@ -5,10 +5,13 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.assertNotNull;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPStep;
import com.linkedin.datahub.upgrade.system.graph.vianodes.ReindexDataJobViaNodesCLL;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateDeserializer;
import com.linkedin.metadata.boot.kafka.MockSystemUpdateSerializer;
@ -22,6 +25,7 @@ import com.linkedin.mxe.Topics;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.util.List;
import java.util.stream.Collectors;
import javax.inject.Named;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@ -41,7 +45,7 @@ import org.testng.annotations.Test;
args = {"-u", "SystemUpdateNonBlocking"})
public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTests {
@Autowired(required = false)
@Autowired
@Named("systemUpdateNonBlocking")
private SystemUpdateNonBlocking systemUpdateNonBlocking;
@ -84,8 +88,7 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTe
ReindexDataJobViaNodesCLL cllUpgrade =
new ReindexDataJobViaNodesCLL(opContext, mockService, mockAspectDao, true, 10, 0, 0);
SystemUpdateNonBlocking upgrade =
new SystemUpdateNonBlocking(List.of(), List.of(cllUpgrade), null);
SystemUpdateNonBlocking upgrade = new SystemUpdateNonBlocking(List.of(cllUpgrade), null);
DefaultUpgradeManager manager = new DefaultUpgradeManager();
manager.register(upgrade);
manager.execute(
@ -101,4 +104,23 @@ public class DatahubUpgradeNonBlockingTest extends AbstractTestNGSpringContextTe
.aspectName("dataJobInputOutput")
.urnLike("urn:li:dataJob:%")));
}
@Test
public void testNonBlockingBootstrapMCP() {
List<BootstrapMCPStep> mcpTemplate =
systemUpdateNonBlocking.steps().stream()
.filter(update -> update instanceof BootstrapMCPStep)
.map(update -> (BootstrapMCPStep) update)
.toList();
assertFalse(mcpTemplate.isEmpty());
assertTrue(
mcpTemplate.stream().noneMatch(update -> update.getMcpTemplate().isBlocking()),
String.format(
"Found blocking step: %s (expected non-blocking only)",
mcpTemplate.stream()
.filter(update -> update.getMcpTemplate().isBlocking())
.map(update -> update.getMcpTemplate().getName())
.collect(Collectors.toSet())));
}
}

View File

@ -0,0 +1,224 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.model.BootstrapMCPConfigFile;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.aspect.batch.MCPItem;
import com.linkedin.metadata.utils.AuditStampUtils;
import io.datahubproject.metadata.context.OperationContext;
import io.datahubproject.test.metadata.context.TestOperationContexts;
import java.io.IOException;
import java.util.List;
import org.testng.annotations.Listeners;
import org.testng.annotations.Test;
import uk.org.webcompere.systemstubs.environment.EnvironmentVariables;
import uk.org.webcompere.systemstubs.testng.SystemStub;
import uk.org.webcompere.systemstubs.testng.SystemStubsListener;
@Listeners(SystemStubsListener.class)
public class BootstrapMCPUtilTest {
static final OperationContext OP_CONTEXT =
TestOperationContexts.systemContextNoSearchAuthorization();
private static final String DATAHUB_TEST_VALUES_ENV = "DATAHUB_TEST_VALUES_ENV";
private static final AuditStamp TEST_AUDIT_STAMP = AuditStampUtils.createDefaultAuditStamp();
@SystemStub private EnvironmentVariables environmentVariables;
@Test
public void testResolveYamlConf() throws IOException {
BootstrapMCPConfigFile initConfig =
BootstrapMCPUtil.resolveYamlConf(
OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class);
assertEquals(initConfig.getBootstrap().getTemplates().size(), 1);
BootstrapMCPConfigFile.MCPTemplate template = initConfig.getBootstrap().getTemplates().get(0);
assertEquals(template.getName(), "datahub-test");
assertEquals(template.getVersion(), "v10");
assertFalse(template.isForce());
assertFalse(template.isBlocking());
assertTrue(template.isAsync());
assertFalse(template.isOptional());
assertEquals(template.getMcps_location(), "bootstrapmcp/datahub-test-mcp.yaml");
assertEquals(template.getValues_env(), "DATAHUB_TEST_VALUES_ENV");
}
@Test
public void testResolveMCPTemplateDefaults() throws IOException {
environmentVariables.remove(DATAHUB_TEST_VALUES_ENV);
BootstrapMCPConfigFile.MCPTemplate template =
BootstrapMCPUtil.resolveYamlConf(
OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class)
.getBootstrap()
.getTemplates()
.get(0);
List<ObjectNode> mcpObjectNodes =
BootstrapMCPUtil.resolveMCPTemplate(OP_CONTEXT, template, TEST_AUDIT_STAMP);
assertEquals(mcpObjectNodes.size(), 1);
ObjectNode mcp = mcpObjectNodes.get(0);
assertEquals(mcp.get("entityType").asText(), "dataHubIngestionSource");
assertEquals(mcp.get("entityUrn").asText(), "urn:li:dataHubIngestionSource:datahub-test");
assertEquals(mcp.get("aspectName").asText(), "dataHubIngestionSourceInfo");
assertEquals(mcp.get("changeType").asText(), "UPSERT");
ObjectNode aspect = (ObjectNode) mcp.get("aspect");
assertEquals(aspect.get("type").asText(), "datahub-gc");
assertEquals(aspect.get("name").asText(), "datahub-test");
ObjectNode schedule = (ObjectNode) aspect.get("schedule");
assertEquals(schedule.get("timezone").asText(), "UTC");
assertEquals(schedule.get("interval").asText(), "0 0 * * *");
ObjectNode config = (ObjectNode) aspect.get("config");
assertTrue(config.get("extraArgs").isObject());
assertTrue(config.get("debugMode").isBoolean());
assertEquals(config.get("executorId").asText(), "default");
ObjectNode recipe = (ObjectNode) config.get("recipe");
ObjectNode source = (ObjectNode) recipe.get("source");
assertEquals(source.get("type").asText(), "datahub-gc");
ObjectNode sourceConfig = (ObjectNode) source.get("config");
assertFalse(sourceConfig.get("cleanup_expired_tokens").asBoolean());
assertTrue(sourceConfig.get("truncate_indices").asBoolean());
ObjectNode dataprocessCleanup = (ObjectNode) sourceConfig.get("dataprocess_cleanup");
assertEquals(dataprocessCleanup.get("retention_days").asInt(), 10);
assertTrue(dataprocessCleanup.get("delete_empty_data_jobs").asBoolean());
assertTrue(dataprocessCleanup.get("delete_empty_data_flows").asBoolean());
assertFalse(dataprocessCleanup.get("hard_delete_entities").asBoolean());
assertEquals(dataprocessCleanup.get("keep_last_n").asInt(), 5);
ObjectNode softDeletedEntitiesCleanup =
(ObjectNode) sourceConfig.get("soft_deleted_entities_cleanup");
assertEquals(softDeletedEntitiesCleanup.get("retention_days").asInt(), 10);
assertTrue(mcp.get("headers").isObject());
}
@Test
public void testResolveMCPTemplateOverride() throws IOException {
environmentVariables.set(
"DATAHUB_TEST_VALUES_ENV",
"{\n"
+ " \"ingestion\": {\n"
+ " \"name\": \"name-override\"\n"
+ " },\n"
+ " \"schedule\": {\n"
+ " \"timezone\": \"America/Chicago\",\n"
+ " \"interval\": \"9 9 * * *\"\n"
+ " },\n"
+ " \"cleanup_expired_tokens\": true,\n"
+ " \"truncate_indices\": false,\n"
+ " \"dataprocess_cleanup\": {\n"
+ " \"retention_days\": 99,\n"
+ " \"delete_empty_data_jobs\": false,\n"
+ " \"delete_empty_data_flows\": false,\n"
+ " \"hard_delete_entities\": true,\n"
+ " \"keep_last_n\": 50\n"
+ " },\n"
+ " \"soft_deleted_entities_cleanup\": {\n"
+ " \"retention_days\": 100\n"
+ " }\n"
+ "}");
BootstrapMCPConfigFile.MCPTemplate template =
BootstrapMCPUtil.resolveYamlConf(
OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class)
.getBootstrap()
.getTemplates()
.get(0);
List<ObjectNode> mcpObjectNodes =
BootstrapMCPUtil.resolveMCPTemplate(OP_CONTEXT, template, TEST_AUDIT_STAMP);
assertEquals(mcpObjectNodes.size(), 1);
ObjectNode mcp = mcpObjectNodes.get(0);
assertEquals(mcp.get("entityType").asText(), "dataHubIngestionSource");
assertEquals(mcp.get("entityUrn").asText(), "urn:li:dataHubIngestionSource:datahub-test");
assertEquals(mcp.get("aspectName").asText(), "dataHubIngestionSourceInfo");
assertEquals(mcp.get("changeType").asText(), "UPSERT");
ObjectNode aspect = (ObjectNode) mcp.get("aspect");
assertEquals(aspect.get("type").asText(), "datahub-gc");
assertEquals(aspect.get("name").asText(), "name-override");
ObjectNode schedule = (ObjectNode) aspect.get("schedule");
assertEquals(schedule.get("timezone").asText(), "America/Chicago");
assertEquals(schedule.get("interval").asText(), "9 9 * * *");
ObjectNode config = (ObjectNode) aspect.get("config");
assertTrue(config.get("extraArgs").isObject());
assertTrue(config.get("debugMode").isBoolean());
assertEquals(config.get("executorId").asText(), "default");
ObjectNode recipe = (ObjectNode) config.get("recipe");
ObjectNode source = (ObjectNode) recipe.get("source");
assertEquals(source.get("type").asText(), "datahub-gc");
ObjectNode sourceConfig = (ObjectNode) source.get("config");
assertTrue(sourceConfig.get("cleanup_expired_tokens").asBoolean());
assertFalse(sourceConfig.get("truncate_indices").asBoolean());
ObjectNode dataprocessCleanup = (ObjectNode) sourceConfig.get("dataprocess_cleanup");
assertEquals(dataprocessCleanup.get("retention_days").asInt(), 99);
assertFalse(dataprocessCleanup.get("delete_empty_data_jobs").asBoolean());
assertFalse(dataprocessCleanup.get("delete_empty_data_flows").asBoolean());
assertTrue(dataprocessCleanup.get("hard_delete_entities").asBoolean());
assertEquals(dataprocessCleanup.get("keep_last_n").asInt(), 50);
ObjectNode softDeletedEntitiesCleanup =
(ObjectNode) sourceConfig.get("soft_deleted_entities_cleanup");
assertEquals(softDeletedEntitiesCleanup.get("retention_days").asInt(), 100);
assertTrue(mcp.get("headers").isObject());
}
/**
 * Verifies that a bootstrap MCP template with no override environment variable resolves into
 * an aspect batch containing the template's default values.
 */
@Test
public void testMCPBatch() throws IOException {
  // No override values present: the template's inline mustache defaults must apply.
  environmentVariables.remove(DATAHUB_TEST_VALUES_ENV);

  BootstrapMCPConfigFile configFile =
      BootstrapMCPUtil.resolveYamlConf(
          OP_CONTEXT, "bootstrapmcp/test.yaml", BootstrapMCPConfigFile.class);
  BootstrapMCPConfigFile.MCPTemplate mcpTemplate =
      configFile.getBootstrap().getTemplates().get(0);

  AspectsBatch aspectsBatch = BootstrapMCPUtil.generateAspectBatch(OP_CONTEXT, mcpTemplate);
  assertEquals(aspectsBatch.getMCPItems().size(), 1);
  MCPItem mcpItem = aspectsBatch.getMCPItems().get(0);

  // Entity-level expectations.
  assertEquals(mcpItem.getUrn(), UrnUtils.getUrn("urn:li:dataHubIngestionSource:datahub-test"));
  assertEquals(mcpItem.getAspectName(), "dataHubIngestionSourceInfo");
  assertEquals(mcpItem.getChangeType(), ChangeType.UPSERT);

  // Aspect-level defaults taken straight from the template.
  DataHubIngestionSourceInfo sourceInfo = mcpItem.getAspect(DataHubIngestionSourceInfo.class);
  assertEquals(sourceInfo.getName(), "datahub-test");
  assertEquals(sourceInfo.getType(), "datahub-gc");
  assertFalse(sourceInfo.getConfig().isDebugMode());
  assertEquals(sourceInfo.getConfig().getExecutorId(), "default");
  assertEquals(sourceInfo.getSchedule().getTimezone(), "UTC");
  assertEquals(sourceInfo.getSchedule().getInterval(), "0 0 * * *");

  // The recipe is stored as a json string inside the aspect; compare as parsed trees so
  // key ordering and whitespace do not matter.
  String expectedRecipe =
      "{\"source\":{\"type\":\"datahub-gc\",\"config\":{\"cleanup_expired_tokens\":false,\"truncate_indices\":true,\"dataprocess_cleanup\":{\"retention_days\":10,\"delete_empty_data_jobs\":true,\"delete_empty_data_flows\":true,\"hard_delete_entities\":false,\"keep_last_n\":5},\"soft_deleted_entities_cleanup\":{\"retention_days\":10}}}}";
  assertEquals(
      OP_CONTEXT.getObjectMapper().readTree(sourceInfo.getConfig().getRecipe()),
      OP_CONTEXT.getObjectMapper().readTree(expectedRecipe));
}
}

View File

@ -0,0 +1,79 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;
import static com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCPUtilTest.OP_CONTEXT;
import static com.linkedin.metadata.Constants.*;
import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeManager;
import com.linkedin.datahub.upgrade.UpgradeResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeManager;
import com.linkedin.datatype.DataTypeInfo;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import org.testng.annotations.Test;
/**
 * Tests for the {@link BootstrapMCP} upgrade path using data-type bootstrap templates.
 *
 * <p>Uses a mocked {@link EntityService} so the upgrade's ingestion calls can be verified
 * without a live backend.
 */
public class DataTypesTest {

  /** A valid template should result in exactly one synchronous aspect-batch ingestion. */
  @Test
  public void testExecuteValidDataTypesNoExistingDataTypes() throws Exception {
    final EntityService<?> entityService = mock(EntityService.class);
    final UpgradeManager upgradeManager =
        loadContext("bootstrapmcp_datatypes/test_valid.yaml", entityService);

    // run the upgrade
    upgradeManager.execute(OP_CONTEXT, "BootstrapMCP", List.of());

    // NOTE(review): only the call count is verified; the batch contents (e.g. the expected
    // DataTypeInfo aspect fields) are not inspected — consider an ArgumentCaptor if the
    // payload should be pinned down.
    verify(entityService, times(1))
        .ingestProposal(any(OperationContext.class), any(AspectsBatchImpl.class), eq(true));
  }

  /**
   * A template whose aspect cannot be deserialized should fail the upgrade and must not
   * ingest anything beyond the initial "already run?" existence check.
   */
  @Test
  public void testExecuteInvalidJson() throws Exception {
    final EntityService<?> entityService = mock(EntityService.class);
    final UpgradeManager upgradeManager =
        loadContext("bootstrapmcp_datatypes/test_invalid.yaml", entityService);

    UpgradeResult upgradeResult = upgradeManager.execute(OP_CONTEXT, "BootstrapMCP", List.of());
    assertEquals(upgradeResult.result(), DataHubUpgradeState.FAILED);

    // verify expected existence check
    verify(entityService)
        .exists(
            any(OperationContext.class),
            eq(UrnUtils.getUrn("urn:li:dataHubUpgrade:bootstrap-data-types-v1")),
            eq("dataHubUpgradeResult"),
            anyBoolean());

    // Verify no additional interactions
    verifyNoMoreInteractions(entityService);
  }

  /**
   * Builds an {@link UpgradeManager} with a {@link BootstrapMCP} upgrade loaded from the given
   * config file, stubbing the "has this upgrade already run" check to return false so the
   * upgrade steps execute.
   */
  private static UpgradeManager loadContext(String configFile, EntityService<?> entityService)
      throws IOException {
    // hasn't run
    when(entityService.exists(
            any(OperationContext.class), any(Urn.class), eq("dataHubUpgradeResult"), anyBoolean()))
        .thenReturn(false);

    Upgrade bootstrapUpgrade = new BootstrapMCP(OP_CONTEXT, configFile, entityService, false);
    assertFalse(bootstrapUpgrade.steps().isEmpty());
    return new DefaultUpgradeManager().register(bootstrapUpgrade);
  }
}

View File

@ -0,0 +1,29 @@
- entityType: dataHubIngestionSource
entityUrn: urn:li:dataHubIngestionSource:datahub-test
aspectName: dataHubIngestionSourceInfo
changeType: UPSERT
aspect:
type: 'datahub-gc'
name: '{{ingestion.name}}{{^ingestion.name}}datahub-test{{/ingestion.name}}'
schedule:
timezone: '{{schedule.timezone}}{{^schedule.timezone}}UTC{{/schedule.timezone}}'
interval: '{{schedule.interval}}{{^schedule.interval}}0 0 * * *{{/schedule.interval}}'
config:
recipe:
source:
type: 'datahub-gc'
config:
cleanup_expired_tokens: {{cleanup_expired_tokens}}{{^cleanup_expired_tokens}}false{{/cleanup_expired_tokens}}
truncate_indices: {{truncate_indices}}{{^truncate_indices}}true{{/truncate_indices}}
dataprocess_cleanup:
retention_days: {{dataprocess_cleanup.retention_days}}{{^dataprocess_cleanup.retention_days}}10{{/dataprocess_cleanup.retention_days}}
delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}true{{/dataprocess_cleanup.delete_empty_data_jobs}}
delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}true{{/dataprocess_cleanup.delete_empty_data_flows}}
hard_delete_entities: {{dataprocess_cleanup.hard_delete_entities}}{{^dataprocess_cleanup.hard_delete_entities}}false{{/dataprocess_cleanup.hard_delete_entities}}
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
extraArgs: {}
debugMode: false
executorId: default
headers: {}

View File

@ -0,0 +1,9 @@
bootstrap:
templates:
- name: datahub-test
version: v10
# force: false
# blocking: false
# async: true
mcps_location: "bootstrapmcp/datahub-test-mcp.yaml"
values_env: "DATAHUB_TEST_VALUES_ENV"

View File

@ -0,0 +1,8 @@
- entityUrn: urn:li:dataType:datahub.test
entityType: dataType
aspectName: dataTypeInfo
changeType: UPSERT
aspect:
badField:
qualifiedName: datahub.test
description: Test Description

View File

@ -0,0 +1,8 @@
- entityUrn: urn:li:dataType:datahub.test
entityType: dataType
aspectName: dataTypeInfo
changeType: UPSERT
aspect:
qualifiedName: datahub.test
displayName: Test Name
description: Test Description

View File

@ -0,0 +1,5 @@
bootstrap:
templates:
- name: data-types
version: v1
mcps_location: "bootstrapmcp_datatypes/test_data_types_invalid.yaml"

View File

@ -0,0 +1,5 @@
bootstrap:
templates:
- name: data-types
version: v1
mcps_location: "bootstrapmcp_datatypes/test_data_types_valid.yaml"

View File

@ -635,6 +635,7 @@ module.exports = {
"docs/advanced/browse-paths-upgrade",
"docs/browseV2/browse-paths-v2",
"docs/plugins",
"docs/advanced/bootstrap-mcps",
],
},
{

View File

@ -0,0 +1,157 @@
# Bootstrap MetadataChangeProposals (MCPs)
Bootstrap MCPs are templated MCPs which are loaded when the `system-update` job runs. This allows adding
entities and aspects to DataHub at install time with the ability to customize them via environment variable
overrides.
The built-in bootstrap MCP process can also be extended with custom MCPs. This can streamline deployment
scenarios where a set of standard ingestion recipes, data platforms, user groups, or other configuration
can be applied without the need for developing custom scripts.
## Process Overview
When DataHub is installed or upgraded, a job called `system-update` runs; this job is responsible for data
migration (particularly Elasticsearch indices) and for ensuring the data is prepared for the next version of
DataHub. This is also the job which applies the bootstrap MCPs.
The `system-update` job, depending on configuration, can be split into two sequences of steps. If they are
not split, then all steps are blocking.
1. An initial blocking sequence which is run prior to the new version of GMS and other components
2. Second sequence of steps where GMS and other components are allowed to run while additional data migration steps are
continued in the background
When applying bootstrap MCPs `system-update` will perform the following steps:
1. The `bootstrap_mcps.yaml` file is read, either from a default classpath location, `bootstrap_mcps.yaml`, or a filesystem location
provided by an environment variable, `SYSTEM_UPDATE_BOOTSTRAP_MCP_CONFIG`.
2. Depending on the mode of blocking or non-blocking each entry in the configuration file will be executed in sequence.
3. The template MCP file is loaded either from the classpath, or a filesystem location, and the template values are applied.
4. The rendered template MCPs are executed with the options specified in the `bootstrap_mcps.yaml`.
## `bootstrap_mcps.yaml` Configuration
The `bootstrap_mcps.yaml` file has the following format.
```yaml
bootstrap:
templates:
- name: <name>
version: <version>
force: false
blocking: false
async: true
optional: false
mcps_location: <classpath or file location>
values_env: <environment variable>
```
Each entry in the list of templates points to a single yaml file which can contain one or more MCP objects. The
execution of the template MCPs is tracked by name and version to prevent re-execution. The MCP objects are executed once
unless `force=true` for each `name`/`version` combination.
See the following table of options for descriptions of each field in the template configuration.
| Field | Default | Required | Description |
|---------------|----------|-----------|------------------------------------------------------------------------------------------------------------|
| name | | `true` | The name for the collection of template MCPs. |
| version | | `true` | A string version for the collection of template MCPs. |
| force | `false` | `false` | Ignores the previous run history, will not skip execution if run previously. |
| blocking | `false` | `false` | Run before GMS and other components during upgrade/install if running in split blocking/non-blocking mode. |
| async | `true` | `false` | Controls whether the MCPs are executed for sync or async ingestion. |
| optional | `false` | `false` | Whether to ignore a failure or fail the entire `system-update` job. |
| mcps_location | | `true` | The location of the file which contains the template MCPs |
| values_env | | `false` | The environment variable which contains override template values. |
## Template MCPs
Template MCPs are stored in a yaml file which uses the mustache templating library to populate values from an optional environment
variable. Defaults can be provided inline making override only necessary when providing install/upgrade time configuration.
In general the file contains a list of MCPs which follow the schema definition for MCPs exactly. Any valid field for an MCP
is accepted, including optional fields such as `headers`.
### Example: Native Group
An example template MCP collection, configuration, and values environment variable is shown below which would create a native group.
```yaml
- entityUrn: urn:li:corpGroup:{{group.id}}
entityType: corpGroup
aspectName: corpGroupInfo
changeType: UPSERT
aspect:
description: {{group.description}}{{^group.description}}Default description{{/group.description}}
displayName: {{group.displayName}}
created: {{&auditStamp}}
members: [] # required as part of the aspect's schema definition
groups: [] # required as part of the aspect's schema definition
admins: [] # required as part of the aspect's schema definition
- entityUrn: urn:li:corpGroup:{{group.id}}
entityType: corpGroup
aspectName: origin
changeType: UPSERT
aspect:
type: NATIVE
```
Creating an entry in the `bootstrap_mcps.yaml` to populate the values from the environment variable `DATAHUB_TEST_GROUP_VALUES`
```yaml
- name: test-group
version: v1
mcps_location: "bootstrap_mcps/test-group.yaml"
values_env: "DATAHUB_TEST_GROUP_VALUES"
```
Example json values loaded from the environment variable `DATAHUB_TEST_GROUP_VALUES` might look like the following.
```json
{"group":{"id":"mygroup", "displayName":"My Group", "description":"Description of the group"}}
```
Using standard mustache template semantics the values in the environment would be inserted into the yaml structure
and ingested when the `system-update` runs.
#### Default values
In the example above, the group's `description` defaults to `Default description` when it is not specified
in the values contained in the environment variable override, following standard mustache template semantics.
#### AuditStamp
A special template reference, `{{&auditStamp}}` can be used to inject an `auditStamp` into the aspect. This can be used to
populate required fields of type `auditStamp` calculated from when the MCP is applied. This will insert an inline json representation
of the `auditStamp` into the location and avoid escaping html characters per standard mustache template indicated by the `&` character.
### Ingestion Template MCPs
Ingestion template MCPs are slightly more complicated since the ingestion `recipe` is stored as a json string within the aspect.
For ingestion recipes, special handling was added so that they can be described naturally in yaml instead of the normally encoded json string.
This means that in the example below, the structure beneath the `aspect.config.recipe` path will be automatically converted
to the required json structure and stored as a string.
```yaml
- entityType: dataHubIngestionSource
entityUrn: urn:li:dataHubIngestionSource:demo-data
aspectName: dataHubIngestionSourceInfo
changeType: UPSERT
aspect:
type: 'demo-data'
name: 'demo-data'
config:
recipe:
source:
type: 'datahub-gc'
config: {}
executorId: default
```
## Known Limitations
* Supported change types:
* UPSERT
* CREATE
* CREATE_ENTITY

View File

@ -12,7 +12,7 @@ your custom Data Platform will persist even between full cleans (nukes) of DataH
## Changing Default Data Platforms
Simply make a change to the [data_platforms.json](https://github.com/datahub-project/datahub/blob/master/metadata-service/war/src/main/resources/boot/data_platforms.json)
Simply make a change to the [data_platforms.yaml](https://github.com/datahub-project/datahub/blob/master/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml)
file to add a custom Data Platform:
```

View File

@ -99,7 +99,7 @@ List of Data Platforms
- Tableau
- Vertica
Reference : [data_platforms.json](https://github.com/datahub-project/datahub/blob/master/metadata-service/war/src/main/resources/boot/data_platforms.json)
Reference : [data_platforms.yaml](https://github.com/datahub-project/datahub/blob/master/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml)
</details>

View File

@ -240,7 +240,7 @@ in [sql_common.py](./src/datahub/ingestion/source/sql/sql_common.py) if the sour
### 9. Add logo for the platform
Add the logo image in [images folder](../datahub-web-react/src/images) and add it to be ingested at [startup](../metadata-service/war/src/main/resources/boot/data_platforms.json)
Add the logo image in [images folder](../datahub-web-react/src/images) and add it to be ingested at [startup](../metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml)
### 10. Update Frontend for UI-based ingestion

View File

@ -725,7 +725,7 @@ class MetabaseSource(StatefulIngestionSourceBase):
return "", None, None, None
# Map engine names to what datahub expects in
# https://github.com/datahub-project/datahub/blob/master/metadata-service/war/src/main/resources/boot/data_platforms.json
# https://github.com/datahub-project/datahub/blob/master/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml
engine = dataset_json.get("engine", "")
engine_mapping = {

View File

@ -686,7 +686,7 @@ class ModeSource(StatefulIngestionSourceBase):
def _get_datahub_friendly_platform(self, adapter, platform):
# Map adaptor names to what datahub expects in
# https://github.com/datahub-project/datahub/blob/master/metadata-service/war/src/main/resources/boot/data_platforms.json
# https://github.com/datahub-project/datahub/blob/master/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml
platform_mapping = {
"jdbc:athena": "athena",

View File

@ -558,7 +558,7 @@ def get_platform(connection_type: str) -> str:
# connection_type taken from
# https://help.tableau.com/current/api/rest_api/en-us/REST/rest_api_concepts_connectiontype.htm
# datahub platform mapping is found here
# https://github.com/datahub-project/datahub/blob/master/metadata-service/war/src/main/resources/boot/data_platforms.json
# https://github.com/datahub-project/datahub/blob/master/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml
if connection_type in ("textscan", "textclean", "excel-direct", "excel", "csv"):
platform = "external"

View File

@ -6,4 +6,4 @@ Examples of data platforms are `redshift`, `hive`, `bigquery`, `looker`, `tablea
## Identity
Data Platforms are identified by the name of the technology. A complete list of currently supported data platforms is available [here](https://raw.githubusercontent.com/datahub-project/datahub/master/metadata-service/war/src/main/resources/boot/data_platforms.json).
Data Platforms are identified by the name of the technology. A complete list of currently supported data platforms is available [here](https://github.com/datahub-project/datahub/blob/master/metadata-service/configuration/src/main/resources/bootstrap_mcps/data-platforms.yaml).

View File

@ -3,7 +3,9 @@ package io.datahubproject.metadata.context;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.linkedin.metadata.Constants;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nonnull;
import lombok.Builder;
@ -14,23 +16,29 @@ import lombok.Getter;
public class ObjectMapperContext implements ContextInterface {
public static ObjectMapper defaultMapper = new ObjectMapper();
public static ObjectMapper defaultYamlMapper = new ObjectMapper(new YAMLFactory());
static {
defaultMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
int maxSize =
Integer.parseInt(
System.getenv()
.getOrDefault(
Constants.INGESTION_MAX_SERIALIZED_STRING_LENGTH,
Constants.MAX_JACKSON_STRING_SIZE));
defaultMapper
.getFactory()
.setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build());
for (ObjectMapper mapper : List.of(defaultMapper, defaultYamlMapper)) {
int maxSize =
Integer.parseInt(
System.getenv()
.getOrDefault(
Constants.INGESTION_MAX_SERIALIZED_STRING_LENGTH,
Constants.MAX_JACKSON_STRING_SIZE));
mapper
.getFactory()
.setStreamReadConstraints(
StreamReadConstraints.builder().maxStringLength(maxSize).build());
}
}
public static ObjectMapperContext DEFAULT = ObjectMapperContext.builder().build();
@Nonnull private final ObjectMapper objectMapper;
@Nonnull private final ObjectMapper yamlMapper;
@Override
public Optional<Integer> getCacheKeyComponent() {
@ -42,7 +50,10 @@ public class ObjectMapperContext implements ContextInterface {
if (this.objectMapper == null) {
objectMapper(defaultMapper);
}
return new ObjectMapperContext(this.objectMapper);
if (this.yamlMapper == null) {
yamlMapper(defaultYamlMapper);
}
return new ObjectMapperContext(this.objectMapper, this.yamlMapper);
}
}
}

View File

@ -402,6 +402,11 @@ public class OperationContext implements AuthorizationSession {
return objectMapperContext.getObjectMapper();
}
/**
 * Returns the yaml-capable {@link ObjectMapper} held by the operation's
 * {@code objectMapperContext}; used for reading yaml bootstrap MCP templates.
 */
@Nonnull
public ObjectMapper getYamlMapper() {
return objectMapperContext.getYamlMapper();
}
@Override
public boolean equals(Object o) {
if (this == o) return true;

View File

@ -347,8 +347,6 @@ bootstrap:
file: ${BOOTSTRAP_POLICIES_FILE:classpath:boot/policies.json}
# eg for local file
# file: "file:///datahub/datahub-gms/resources/custom-policies.json"
ownershipTypes:
file: ${BOOTSTRAP_OWNERSHIP_TYPES_FILE:classpath:boot/ownership_types.json}
servlets:
waitTimeout: ${BOOTSTRAP_SERVLETS_WAITTIMEOUT:60} # Total waiting time in seconds for servlets to initialize
@ -357,6 +355,8 @@ systemUpdate:
maxBackOffs: ${BOOTSTRAP_SYSTEM_UPDATE_MAX_BACK_OFFS:50}
backOffFactor: ${BOOTSTRAP_SYSTEM_UPDATE_BACK_OFF_FACTOR:2} # Multiplicative factor for back off, default values will result in waiting 5min 15s
waitForSystemUpdate: ${BOOTSTRAP_SYSTEM_UPDATE_WAIT_FOR_SYSTEM_UPDATE:true}
bootstrap:
mcpConfig: ${SYSTEM_UPDATE_BOOTSTRAP_MCP_CONFIG:bootstrap_mcps.yaml}
dataJobNodeCLL:
enabled: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_ENABLED:false}
batchSize: ${BOOTSTRAP_SYSTEM_UPDATE_DATA_JOB_NODE_CLL_BATCH_SIZE:1000}

View File

@ -0,0 +1,36 @@
bootstrap:
# Defaults
# force: false
# blocking: false
# async: true
# optional: false
templates:
# Bootstrap
- name: root-user
version: v1
blocking: true
async: false
mcps_location: "bootstrap_mcps/root-user.yaml"
- name: data-platforms
version: v1
mcps_location: "bootstrap_mcps/data-platforms.yaml"
- name: data-types
version: v1
mcps_location: "bootstrap_mcps/data-types.yaml"
- name: ownership-types
version: v1
mcps_location: "bootstrap_mcps/ownership-types.yaml"
- name: roles
version: v1
mcps_location: "bootstrap_mcps/roles.yaml"
# Ingestion Recipes
- name: ingestion-datahub-gc
version: v1
optional: true
mcps_location: "bootstrap_mcps/ingestion-datahub-gc.yaml"
values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"

View File

@ -0,0 +1,709 @@
# Instructions to add additional entry
# 1. Add new entry to this list
# 2. Increment version in bootstrap_mcps.yaml for the entry referring to this file
- entityUrn: urn:li:dataPlatform:adlsGen1
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "/"
name: adlsGen1
displayName: Azure Data Lake (Gen 1)
type: FILE_SYSTEM
logoUrl: "/assets/platforms/adlslogo.png"
- entityUrn: urn:li:dataPlatform:adlsGen2
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "/"
name: adlsGen2
displayName: Azure Data Lake (Gen 2)
type: FILE_SYSTEM
logoUrl: "/assets/platforms/adlslogo.png"
- entityUrn: urn:li:dataPlatform:airflow
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: airflow
displayName: Airflow
type: OTHERS
logoUrl: "/assets/platforms/airflowlogo.png"
- entityUrn: urn:li:dataPlatform:ambry
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: ambry
displayName: Ambry
type: OBJECT_STORE
- entityUrn: urn:li:dataPlatform:clickhouse
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: clickhouse
displayName: ClickHouse
type: RELATIONAL_DB
logoUrl: "/assets/platforms/clickhouselogo.png"
- entityUrn: urn:li:dataPlatform:cockroachdb
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: cockroachdb
displayName: CockroachDb
type: RELATIONAL_DB
logoUrl: "/assets/platforms/cockroachdblogo.png"
- entityUrn: urn:li:dataPlatform:couchbase
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: couchbase
displayName: Couchbase
type: KEY_VALUE_STORE
logoUrl: "/assets/platforms/couchbaselogo.png"
- entityUrn: urn:li:dataPlatform:dagster
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "/"
name: dagster
displayName: Dagster
type: OTHERS
logoUrl: "/assets/platforms/dagsterlogo.svg"
- entityUrn: urn:li:dataPlatform:external
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: external
displayName: External Source
type: OTHERS
- entityUrn: urn:li:dataPlatform:hdfs
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "/"
name: hdfs
displayName: HDFS
type: FILE_SYSTEM
logoUrl: "/assets/platforms/hadooplogo.png"
- entityUrn: urn:li:dataPlatform:hana
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: hana
displayName: SAP HANA
type: RELATIONAL_DB
logoUrl: "/assets/platforms/hanalogo.png"
- entityUrn: urn:li:dataPlatform:hive
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: hive
displayName: Hive
type: FILE_SYSTEM
logoUrl: "/assets/platforms/hivelogo.png"
- entityUrn: urn:li:dataPlatform:iceberg
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: iceberg
displayName: Iceberg
type: FILE_SYSTEM
logoUrl: "/assets/platforms/iceberglogo.png"
- entityUrn: urn:li:dataPlatform:s3
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "/"
name: s3
displayName: AWS S3
type: FILE_SYSTEM
logoUrl: "/assets/platforms/s3.png"
- entityUrn: urn:li:dataPlatform:kafka
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: kafka
displayName: Kafka
type: MESSAGE_BROKER
logoUrl: "/assets/platforms/kafkalogo.png"
- entityUrn: urn:li:dataPlatform:kafka-connect
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: kafka-connect
displayName: Kafka Connect
type: OTHERS
logoUrl: "/assets/platforms/kafkalogo.png"
- entityUrn: urn:li:dataPlatform:kusto
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: kusto
displayName: Kusto
type: OLAP_DATASTORE
logoUrl: "/assets/platforms/kustologo.png"
- entityUrn: urn:li:dataPlatform:mode
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: mode
displayName: Mode
type: KEY_VALUE_STORE
logoUrl: "/assets/platforms/modelogo.png"
- entityUrn: urn:li:dataPlatform:mongodb
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: mongodb
displayName: MongoDB
type: KEY_VALUE_STORE
logoUrl: "/assets/platforms/mongodblogo.png"
- entityUrn: urn:li:dataPlatform:mysql
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: mysql
displayName: MySQL
type: RELATIONAL_DB
logoUrl: "/assets/platforms/mysqllogo.png"
- entityUrn: urn:li:dataPlatform:db2
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: db2
displayName: DB2
type: RELATIONAL_DB
logoUrl: "/assets/platforms/db2logo.png"
- entityUrn: urn:li:dataPlatform:mariadb
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: mariadb
displayName: MariaDB
type: RELATIONAL_DB
logoUrl: "/assets/platforms/mariadblogo.png"
- entityUrn: urn:li:dataPlatform:OpenApi
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: openapi
displayName: OpenAPI
type: OTHERS
logoUrl: "/assets/platforms/openapilogo.png"
- entityUrn: urn:li:dataPlatform:oracle
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: oracle
displayName: Oracle
type: RELATIONAL_DB
logoUrl: "/assets/platforms/oraclelogo.png"
- entityUrn: urn:li:dataPlatform:pinot
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: pinot
displayName: Pinot
type: OLAP_DATASTORE
logoUrl: "/assets/platforms/pinotlogo.png"
- entityUrn: urn:li:dataPlatform:postgres
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: postgres
displayName: PostgreSQL
type: RELATIONAL_DB
logoUrl: "/assets/platforms/postgreslogo.png"
- entityUrn: urn:li:dataPlatform:prefect
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: prefect
displayName: Prefect
type: OTHERS
logoUrl: "/assets/platforms/prefectlogo.png"
# NOTE: a duplicate presto entry carrying the prefect aspect payload (a copy-paste of the
# preceding prefect entry with the wrong urn) was removed here; under UPSERT it was
# immediately overwritten by this entry, but it was still incorrect data.
- entityUrn: urn:li:dataPlatform:presto
  entityType: dataPlatform
  aspectName: dataPlatformInfo
  changeType: UPSERT
  aspect:
    datasetNameDelimiter: "."
    name: presto
    displayName: Presto
    type: QUERY_ENGINE
    logoUrl: "/assets/platforms/prestologo.png"
- entityUrn: urn:li:dataPlatform:tableau
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: tableau
displayName: Tableau
type: OTHERS
logoUrl: "/assets/platforms/tableaulogo.svg"
- entityUrn: urn:li:dataPlatform:teradata
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: teradata
displayName: Teradata
type: RELATIONAL_DB
logoUrl: "/assets/platforms/teradatalogo.png"
- entityUrn: urn:li:dataPlatform:voldemort
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: voldemort
displayName: Voldemort
type: KEY_VALUE_STORE
- entityUrn: urn:li:dataPlatform:snowflake
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: snowflake
displayName: Snowflake
type: RELATIONAL_DB
logoUrl: "/assets/platforms/snowflakelogo.png"
- entityUrn: urn:li:dataPlatform:redshift
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: redshift
displayName: Redshift
type: RELATIONAL_DB
logoUrl: "/assets/platforms/redshiftlogo.png"
- entityUrn: urn:li:dataPlatform:mssql
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: mssql
displayName: SQL Server
type: RELATIONAL_DB
logoUrl: "/assets/platforms/mssqllogo.png"
- entityUrn: urn:li:dataPlatform:bigquery
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: bigquery
displayName: BigQuery
type: RELATIONAL_DB
logoUrl: "/assets/platforms/bigquerylogo.png"
- entityUrn: urn:li:dataPlatform:druid
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: druid
displayName: Druid
type: OLAP_DATASTORE
logoUrl: "/assets/platforms/druidlogo.png"
- entityUrn: urn:li:dataPlatform:looker
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: looker
displayName: Looker
type: OTHERS
logoUrl: "/assets/platforms/lookerlogo.svg"
- entityUrn: urn:li:dataPlatform:feast
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: feast
displayName: Feast
type: OTHERS
logoUrl: "/assets/platforms/feastlogo.png"
- entityUrn: urn:li:dataPlatform:sagemaker
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: sagemaker
displayName: SageMaker
type: OTHERS
logoUrl: "/assets/platforms/sagemakerlogo.png"
- entityUrn: urn:li:dataPlatform:mlflow
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: mlflow
displayName: MLflow
type: OTHERS
logoUrl: "/assets/platforms/mlflowlogo.png"
- entityUrn: urn:li:dataPlatform:glue
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: glue
displayName: Glue
type: OTHERS
logoUrl: "/assets/platforms/gluelogo.png"
- entityUrn: urn:li:dataPlatform:redash
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: redash
displayName: Redash
type: OTHERS
logoUrl: "/assets/platforms/redashlogo.png"
- entityUrn: urn:li:dataPlatform:athena
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: athena
displayName: AWS Athena
type: RELATIONAL_DB
logoUrl: "/assets/platforms/awsathenalogo.png"
- entityUrn: urn:li:dataPlatform:spark
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: spark
displayName: Spark
type: OTHERS
logoUrl: "/assets/platforms/sparklogo.png"
- entityUrn: urn:li:dataPlatform:dbt
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: dbt
displayName: dbt
type: OTHERS
logoUrl: "/assets/platforms/dbtlogo.png"
- entityUrn: urn:li:dataPlatform:elasticsearch
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: elasticsearch
displayName: Elasticsearch
type: OTHERS
logoUrl: "/assets/platforms/elasticsearchlogo.png"
- entityUrn: urn:li:dataPlatform:great-expectations
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
name: Great Expectations
displayName: Great Expectations
type: OTHERS
logoUrl: "/assets/platforms/greatexpectationslogo.png"
datasetNameDelimiter: "."
- entityUrn: urn:li:dataPlatform:powerbi
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: powerbi
displayName: Power BI
type: OTHERS
logoUrl: "/assets/platforms/powerbilogo.png"
- entityUrn: urn:li:dataPlatform:presto-on-hive
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: presto-on-hive
displayName: Presto on Hive
type: FILE_SYSTEM
logoUrl: "/assets/platforms/prestoonhivelogo.png"
- entityUrn: urn:li:dataPlatform:metabase
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: metabase
displayName: Metabase
type: OTHERS
logoUrl: "/assets/platforms/metabaselogo.svg"
- entityUrn: urn:li:dataPlatform:nifi
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: nifi
displayName: NiFi
type: OTHERS
logoUrl: "/assets/platforms/nifilogo.svg"
- entityUrn: urn:li:dataPlatform:superset
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: superset
displayName: Superset
type: OTHERS
logoUrl: "/assets/platforms/supersetlogo.png"
- entityUrn: urn:li:dataPlatform:trino
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: trino
displayName: Trino
type: QUERY_ENGINE
logoUrl: "/assets/platforms/trinologo.png"
- entityUrn: urn:li:dataPlatform:pulsar
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: pulsar
displayName: Pulsar
type: MESSAGE_BROKER
logoUrl: "/assets/platforms/pulsarlogo.png"
- entityUrn: urn:li:dataPlatform:salesforce
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: salesforce
displayName: Salesforce
type: OTHERS
logoUrl: "/assets/platforms/logo-salesforce.svg"
- entityUrn: urn:li:dataPlatform:unknown
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: Unknown Platform
displayName: N/A
type: OTHERS
- entityUrn: urn:li:dataPlatform:delta-lake
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: delta-lake
displayName: Delta Lake
type: OTHERS
logoUrl: "/assets/platforms/deltalakelogo.png"
- entityUrn: urn:li:dataPlatform:databricks
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: databricks
displayName: Databricks
type: OTHERS
logoUrl: "/assets/platforms/databrickslogo.png"
- entityUrn: urn:li:dataPlatform:vertica
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: vertica
displayName: Vertica
type: OLAP_DATASTORE
logoUrl: "/assets/platforms/verticalogo.png"
- entityUrn: urn:li:dataPlatform:gcs
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "/"
name: gcs
displayName: Google Cloud Storage
type: FILE_SYSTEM
logoUrl: "/assets/platforms/gcslogo.svg"
- entityUrn: urn:li:dataPlatform:slack
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: Slack
displayName: Slack
type: OTHERS
logoUrl: "/assets/platforms/slacklogo.png"
- entityUrn: urn:li:dataPlatform:microsoft-teams
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: Microsoft Teams
displayName: Microsoft Teams
type: OTHERS
logoUrl: "/assets/platforms/teamslogo.png"
- entityUrn: urn:li:dataPlatform:dynamodb
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: dynamodb
displayName: DynamoDB
type: KEY_VALUE_STORE
logoUrl: "/assets/platforms/dynamodblogo.png"
- entityUrn: urn:li:dataPlatform:fivetran
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: fivetran
displayName: Fivetran
type: OTHERS
logoUrl: "/assets/platforms/fivetranlogo.png"
- entityUrn: urn:li:dataPlatform:csv
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: csv
displayName: CSV
type: OTHERS
logoUrl: "/assets/platforms/csv-logo.png"
- entityUrn: urn:li:dataPlatform:qlik-sense
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: qlik-sense
displayName: Qlik Sense
type: OTHERS
logoUrl: "/assets/platforms/qliklogo.png"
- entityUrn: urn:li:dataPlatform:file
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: file
displayName: File
type: OTHERS
logoUrl: "/assets/platforms/file-logo.svg"
- entityUrn: urn:li:dataPlatform:excel
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
name: excel
displayName: Excel
type: OTHERS
datasetNameDelimiter: "/"
logoUrl: "/assets/platforms/excel-logo.svg"
- entityUrn: urn:li:dataPlatform:sigma
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: sigma
displayName: Sigma
type: OTHERS
logoUrl: "/assets/platforms/sigmalogo.png"
- entityUrn: urn:li:dataPlatform:sac
entityType: dataPlatform
aspectName: dataPlatformInfo
changeType: UPSERT
aspect:
datasetNameDelimiter: "."
name: sac
displayName: SAP Analytics Cloud
type: OTHERS
logoUrl: "/assets/platforms/saclogo.svg"

View File

@ -0,0 +1,43 @@
# Instructions to add additional entry
# 1. Add new entry to this list
# 2. Increment version in bootstrap_mcps.yaml for the entry referring to this file
- entityUrn: urn:li:dataType:datahub.string
entityType: dataType
aspectName: dataTypeInfo
changeType: UPSERT
aspect:
qualifiedName: datahub.string
displayName: String
description: A string of characters.
- entityUrn: urn:li:dataType:datahub.number
entityType: dataType
aspectName: dataTypeInfo
changeType: UPSERT
aspect:
qualifiedName: datahub.number
displayName: Number
description: An integer or decimal number.
- entityUrn: urn:li:dataType:datahub.urn
entityType: dataType
aspectName: dataTypeInfo
changeType: UPSERT
aspect:
qualifiedName: datahub.urn
displayName: Urn
    description: A unique identifier for a DataHub entity.
- entityUrn: urn:li:dataType:datahub.rich_text
entityType: dataType
aspectName: dataTypeInfo
changeType: UPSERT
aspect:
qualifiedName: datahub.rich_text
displayName: Rich Text
description: An attributed string of characters.
- entityUrn: urn:li:dataType:datahub.date
entityType: dataType
aspectName: dataTypeInfo
changeType: UPSERT
aspect:
qualifiedName: datahub.date
displayName: Date
description: A specific day, without time.

View File

@ -0,0 +1,33 @@
# Instructions to add additional entry or update on the target system
# 1. Edit this file
# 2. Increment version in bootstrap_mcps.yaml for the entry referring to this file
- entityType: dataHubIngestionSource
entityUrn: urn:li:dataHubIngestionSource:datahub-gc
aspectName: dataHubIngestionSourceInfo
changeType: UPSERT
aspect:
type: 'datahub-gc'
name: '{{ingestion.name}}{{^ingestion.name}}datahub-gc{{/ingestion.name}}'
schedule:
timezone: '{{schedule.timezone}}{{^schedule.timezone}}UTC{{/schedule.timezone}}'
interval: '{{schedule.interval}}{{^schedule.interval}}0 1 * * *{{/schedule.interval}}'
config:
version: 0.14.1.1rc5
recipe:
source:
type: 'datahub-gc'
config:
cleanup_expired_tokens: {{cleanup_expired_tokens}}{{^cleanup_expired_tokens}}false{{/cleanup_expired_tokens}}
truncate_indices: {{truncate_indices}}{{^truncate_indices}}true{{/truncate_indices}}
dataprocess_cleanup:
retention_days: {{dataprocess_cleanup.retention_days}}{{^dataprocess_cleanup.retention_days}}10{{/dataprocess_cleanup.retention_days}}
delete_empty_data_jobs: {{dataprocess_cleanup.delete_empty_data_jobs}}{{^dataprocess_cleanup.delete_empty_data_jobs}}true{{/dataprocess_cleanup.delete_empty_data_jobs}}
delete_empty_data_flows: {{dataprocess_cleanup.delete_empty_data_flows}}{{^dataprocess_cleanup.delete_empty_data_flows}}true{{/dataprocess_cleanup.delete_empty_data_flows}}
hard_delete_entities: {{dataprocess_cleanup.hard_delete_entities}}{{^dataprocess_cleanup.hard_delete_entities}}false{{/dataprocess_cleanup.hard_delete_entities}}
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
extraArgs: {}
debugMode: false
executorId: default
headers: {}

View File

@ -0,0 +1,39 @@
# Instructions to add additional entry
# 1. Add new entry to this list
# 2. Increment version in bootstrap_mcps.yaml for the entry referring to this file
- entityUrn: urn:li:ownershipType:__system__technical_owner
entityType: ownershipType
aspectName: ownershipTypeInfo
changeType: UPSERT
aspect:
name: Technical Owner
description: Involved in the production, maintenance, or distribution of the asset(s).
created: {{&auditStamp}}
lastModified: {{&auditStamp}}
- entityUrn: urn:li:ownershipType:__system__business_owner
entityType: ownershipType
aspectName: ownershipTypeInfo
changeType: UPSERT
aspect:
name: Business Owner
    description: Principal stakeholders or domain experts associated with the asset(s).
created: {{&auditStamp}}
lastModified: {{&auditStamp}}
- entityUrn: urn:li:ownershipType:__system__data_steward
entityType: ownershipType
aspectName: ownershipTypeInfo
changeType: UPSERT
aspect:
name: Data Steward
description: Involved in governance of the asset(s).
created: {{&auditStamp}}
lastModified: {{&auditStamp}}
- entityUrn: urn:li:ownershipType:__system__none
entityType: ownershipType
aspectName: ownershipTypeInfo
changeType: UPSERT
aspect:
name: None
description: No ownership type specified.
created: {{&auditStamp}}
lastModified: {{&auditStamp}}

View File

@ -0,0 +1,28 @@
# Instructions to add additional entry
# 1. Add new entry to this list
# 2. Increment version in bootstrap_mcps.yaml for the entry referring to this file
- entityUrn: urn:li:dataHubRole:Admin
entityType: dataHubRole
aspectName: dataHubRoleInfo
changeType: UPSERT
aspect:
name: Admin
description: Can do everything on the platform.
editable: false
- entityUrn: urn:li:dataHubRole:Editor
entityType: dataHubRole
aspectName: dataHubRoleInfo
changeType: UPSERT
aspect:
name: Editor
description: Can read and edit all metadata. Cannot take administrative actions.
editable: false
- entityUrn: urn:li:dataHubRole:Reader
entityType: dataHubRole
aspectName: dataHubRoleInfo
changeType: UPSERT
aspect:
name: Reader
description: Can read all metadata. Cannot edit anything by default, or take administrative
actions.
editable: false

View File

@ -0,0 +1,8 @@
- entityUrn: urn:li:corpuser:datahub
entityType: corpuser
aspectName: corpUserInfo
changeType: UPSERT
aspect:
active: true
displayName: DataHub
title: DataHub Root User

View File

@ -11,15 +11,10 @@ import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.boot.dependencies.BootstrapDependency;
import com.linkedin.metadata.boot.steps.IndexDataPlatformsStep;
import com.linkedin.metadata.boot.steps.IngestDataPlatformInstancesStep;
import com.linkedin.metadata.boot.steps.IngestDataPlatformsStep;
import com.linkedin.metadata.boot.steps.IngestDataTypesStep;
import com.linkedin.metadata.boot.steps.IngestDefaultGlobalSettingsStep;
import com.linkedin.metadata.boot.steps.IngestEntityTypesStep;
import com.linkedin.metadata.boot.steps.IngestOwnershipTypesStep;
import com.linkedin.metadata.boot.steps.IngestPoliciesStep;
import com.linkedin.metadata.boot.steps.IngestRetentionPoliciesStep;
import com.linkedin.metadata.boot.steps.IngestRolesStep;
import com.linkedin.metadata.boot.steps.IngestRootUserStep;
import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep;
import com.linkedin.metadata.boot.steps.RestoreColumnLineageIndices;
import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices;
@ -90,21 +85,14 @@ public class BootstrapManagerFactory {
@Value("${bootstrap.policies.file}")
private Resource _policiesResource;
@Value("${bootstrap.ownershipTypes.file}")
private Resource _ownershipTypesResource;
@Bean(name = "bootstrapManager")
@Scope("singleton")
@Nonnull
protected BootstrapManager createInstance(
@Qualifier("systemOperationContext") final OperationContext systemOpContext) {
final IngestRootUserStep ingestRootUserStep = new IngestRootUserStep(_entityService);
final IngestPoliciesStep ingestPoliciesStep =
new IngestPoliciesStep(
_entityService, _entitySearchService, _searchDocumentTransformer, _policiesResource);
final IngestRolesStep ingestRolesStep = new IngestRolesStep(_entityService, _entityRegistry);
final IngestDataPlatformsStep ingestDataPlatformsStep =
new IngestDataPlatformsStep(_entityService);
final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep =
new IngestDataPlatformInstancesStep(_entityService, _migrationsDao);
final RestoreGlossaryIndices restoreGlossaryIndicesStep =
@ -121,29 +109,21 @@ public class BootstrapManagerFactory {
new IngestDefaultGlobalSettingsStep(_entityService);
final WaitForSystemUpdateStep waitForSystemUpdateStep =
new WaitForSystemUpdateStep(_dataHubUpgradeKafkaListener, _configurationProvider);
final IngestOwnershipTypesStep ingestOwnershipTypesStep =
new IngestOwnershipTypesStep(_entityService, _ownershipTypesResource);
final IngestDataTypesStep ingestDataTypesStep = new IngestDataTypesStep(_entityService);
final IngestEntityTypesStep ingestEntityTypesStep = new IngestEntityTypesStep(_entityService);
final List<BootstrapStep> finalSteps =
new ArrayList<>(
ImmutableList.of(
waitForSystemUpdateStep,
ingestRootUserStep,
ingestPoliciesStep,
ingestRolesStep,
ingestDataPlatformsStep,
ingestDataPlatformInstancesStep,
_ingestRetentionPoliciesStep,
ingestOwnershipTypesStep,
ingestSettingsStep,
restoreGlossaryIndicesStep,
removeClientIdAspectStep,
restoreDbtSiblingsIndices,
indexDataPlatformsStep,
restoreColumnLineageIndices,
ingestDataTypesStep,
ingestEntityTypesStep));
return new BootstrapManager(finalSteps);

View File

@ -1,117 +0,0 @@
package com.linkedin.metadata.boot.steps;
import static com.linkedin.metadata.Constants.*;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.dataplatform.DataPlatformInfo;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.entity.ebean.batch.ChangeItemImpl;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.ClassPathResource;
/**
 * Bootstrap step that ingests the default data platforms from the classpath resource
 * {@code ./boot/data_platforms.json} (a JSON array of {urn, aspect} entries), writing each
 * entry's dataPlatformInfo aspect in a single batch authored by the system actor.
 */
@Slf4j
@RequiredArgsConstructor
public class IngestDataPlatformsStep implements BootstrapStep {

  // Aspect written for every platform entry in the file.
  private static final String PLATFORM_ASPECT_NAME = "dataPlatformInfo";

  private final EntityService<?> _entityService;

  @Override
  public String name() {
    return "IngestDataPlatformsStep";
  }

  /**
   * Reads the platform file and ingests all entries as one aspect batch.
   *
   * @param systemOperationContext system-level operation context supplying the aspect retriever
   * @throws IOException if the classpath resource cannot be read
   * @throws URISyntaxException declared for urn parsing (malformed urns are actually rethrown as
   *     RuntimeException inside the stream)
   */
  @Override
  public void execute(@Nonnull OperationContext systemOperationContext)
      throws IOException, URISyntaxException {
    final ObjectMapper mapper = new ObjectMapper();
    // Raise Jackson's max string length (overridable via env var) so large bootstrap files parse.
    int maxSize =
        Integer.parseInt(
            System.getenv()
                .getOrDefault(INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE));
    mapper
        .getFactory()
        .setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build());

    // 1. Read from the file into JSON.
    final JsonNode dataPlatforms =
        mapper.readTree(new ClassPathResource("./boot/data_platforms.json").getFile());

    if (!dataPlatforms.isArray()) {
      throw new RuntimeException(
          String.format(
              "Found malformed data platforms file, expected an Array but found %s",
              dataPlatforms.getNodeType()));
    }

    // 2. For each JSON object, cast into a DataPlatformSnapshot object.
    // Each entry becomes one ChangeItemImpl carrying its dataPlatformInfo aspect.
    List<ChangeItemImpl> dataPlatformAspects =
        StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(dataPlatforms.iterator(), Spliterator.ORDERED),
                false)
            .map(
                dataPlatform -> {
                  final String urnString;
                  final Urn urn;
                  try {
                    urnString = dataPlatform.get("urn").asText();
                    urn = Urn.createFromString(urnString);
                  } catch (URISyntaxException e) {
                    // A malformed urn aborts the whole step; bootstrap data is expected valid.
                    log.error("Malformed urn: {}", dataPlatform.get("urn").asText());
                    throw new RuntimeException("Malformed urn", e);
                  }

                  final DataPlatformInfo info =
                      RecordUtils.toRecordTemplate(
                          DataPlatformInfo.class, dataPlatform.get("aspect").toString());

                  try {
                    return ChangeItemImpl.builder()
                        .urn(urn)
                        .aspectName(PLATFORM_ASPECT_NAME)
                        .recordTemplate(info)
                        .auditStamp(
                            new AuditStamp()
                                .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
                                .setTime(System.currentTimeMillis()))
                        .build(
                            systemOperationContext
                                .getRetrieverContext()
                                .get()
                                .getAspectRetriever());
                  } catch (URISyntaxException e) {
                    throw new RuntimeException(e);
                  }
                })
            .collect(Collectors.toList());

    // NOTE(review): trailing flags presumably (emitMCL=true, overwrite/async=false) — confirm
    // against EntityService#ingestAspects before relying on this.
    _entityService.ingestAspects(
        systemOperationContext,
        AspectsBatchImpl.builder()
            .retrieverContext(systemOperationContext.getRetrieverContext().get())
            .items(dataPlatformAspects)
            .build(),
        true,
        false);
  }
}

View File

@ -1,108 +0,0 @@
package com.linkedin.metadata.boot.steps;
import static com.linkedin.metadata.Constants.*;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.datatype.DataTypeInfo;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import io.datahubproject.metadata.context.OperationContext;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.ClassPathResource;
/**
 * Bootstrap step responsible for ingesting the default data types from a JSON resource on the
 * classpath.
 *
 * <p>Only urns that do not already exist are ingested; existing data types are left untouched, so
 * locally modified definitions are not overwritten on restart.
 */
@Slf4j
public class IngestDataTypesStep implements BootstrapStep {

  private static final String DEFAULT_FILE_PATH = "./boot/data_types.json";
  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

  private final EntityService<?> _entityService;
  // Classpath location of the JSON array of data type definitions.
  private final String _resourcePath;

  public IngestDataTypesStep(@Nonnull final EntityService<?> entityService) {
    this(entityService, DEFAULT_FILE_PATH);
  }

  /**
   * @param entityService service used to persist the data type aspects
   * @param filePath classpath path to the JSON definition file (primarily for tests)
   */
  public IngestDataTypesStep(
      @Nonnull final EntityService<?> entityService, @Nonnull final String filePath) {
    _entityService = Objects.requireNonNull(entityService, "entityService must not be null");
    _resourcePath = filePath;
  }

  @Override
  public String name() {
    return "IngestDataTypesStep";
  }

  @Override
  public void execute(@Nonnull OperationContext systemOperationContext) throws Exception {
    log.info("Ingesting default data types...");

    // 1. Read the resource; it must be a top-level JSON array of data type entries.
    final JsonNode dataTypesObj =
        JSON_MAPPER.readTree(new ClassPathResource(_resourcePath).getFile());

    if (!dataTypesObj.isArray()) {
      throw new RuntimeException(
          String.format(
              "Found malformed data types file, expected an Array but found %s",
              dataTypesObj.getNodeType()));
    }

    log.info("Ingesting {} data types", dataTypesObj.size());

    // 2. Index the entries by urn so existence can be checked with a single batch call.
    int numIngested = 0;
    Map<Urn, JsonNode> urnDataTypesMap = new HashMap<>();
    for (final JsonNode dataTypeObj : dataTypesObj) {
      final Urn urn = Urn.createFromString(dataTypeObj.get("urn").asText());
      urnDataTypesMap.put(urn, dataTypeObj);
    }

    Set<Urn> existingUrns = _entityService.exists(systemOperationContext, urnDataTypesMap.keySet());

    // 3. Ingest only the data types that are not already present.
    for (final Map.Entry<Urn, JsonNode> entry : urnDataTypesMap.entrySet()) {
      if (!existingUrns.contains(entry.getKey())) {
        final DataTypeInfo info =
            RecordUtils.toRecordTemplate(
                DataTypeInfo.class, entry.getValue().get("info").toString());
        log.info("Ingesting default data type with urn {}", entry.getKey());
        ingestDataType(systemOperationContext, entry.getKey(), info);
        numIngested++;
      }
    }
    log.info("Ingested {} new data types", numIngested);
  }

  /** Writes a single dataTypeInfo aspect as an UPSERT proposal authored by the system actor. */
  private void ingestDataType(
      @Nonnull OperationContext systemOperationContext,
      final Urn dataTypeUrn,
      final DataTypeInfo info)
      throws Exception {
    final MetadataChangeProposal proposal = new MetadataChangeProposal();
    proposal.setEntityUrn(dataTypeUrn);
    proposal.setEntityType(DATA_TYPE_ENTITY_NAME);
    proposal.setAspectName(DATA_TYPE_INFO_ASPECT_NAME);
    proposal.setAspect(GenericRecordUtils.serializeAspect(info));
    proposal.setChangeType(ChangeType.UPSERT);
    _entityService.ingestProposal(
        systemOperationContext,
        proposal,
        new AuditStamp()
            .setActor(Urn.createFromString(SYSTEM_ACTOR))
            .setTime(System.currentTimeMillis()),
        false);
  }
}

View File

@ -1,117 +0,0 @@
package com.linkedin.metadata.boot.steps;
import static com.linkedin.metadata.Constants.*;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.ownership.OwnershipTypeInfo;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.Resource;
/**
 * Bootstrap step responsible for ingesting the default ownership types.
 *
 * <p>Every ownership type defined in the configured resource is unconditionally UPSERTed into
 * DataHub with a fresh system-actor audit stamp; no existence check is performed, so the defaults
 * are re-applied on each bootstrap run.
 */
@Slf4j
@RequiredArgsConstructor
public class IngestOwnershipTypesStep implements BootstrapStep {

  private static final ObjectMapper JSON_MAPPER = new ObjectMapper();

  private final EntityService<?> _entityService;
  // Resource containing a JSON array of {urn, info} ownership type definitions.
  private final Resource _ownershipTypesResource;

  @Override
  public String name() {
    return "IngestOwnershipTypesStep";
  }

  @Override
  public void execute(@Nonnull OperationContext systemOperationContext) throws Exception {
    log.info("Ingesting default ownership types from {}...", _ownershipTypesResource);

    // 1. Read the resource; it must be a top-level JSON array of ownership type entries.
    final JsonNode ownershipTypesObj = JSON_MAPPER.readTree(_ownershipTypesResource.getFile());

    if (!ownershipTypesObj.isArray()) {
      throw new RuntimeException(
          String.format(
              "Found malformed ownership file, expected an Array but found %s",
              ownershipTypesObj.getNodeType()));
    }

    // One audit stamp shared by every entry so the whole bootstrap run is attributed uniformly.
    final AuditStamp auditStamp =
        new AuditStamp()
            .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
            .setTime(System.currentTimeMillis());

    log.info("Ingesting {} ownership types", ownershipTypesObj.size());

    int numIngested = 0;
    for (final JsonNode ownershipTypeObj : ownershipTypesObj) {
      final Urn urn = Urn.createFromString(ownershipTypeObj.get("urn").asText());
      final OwnershipTypeInfo info =
          RecordUtils.toRecordTemplate(
              OwnershipTypeInfo.class, ownershipTypeObj.get("info").toString());
      log.info("Ingesting default ownership type with urn {}", urn);
      ingestOwnershipType(systemOperationContext, urn, info, auditStamp);
      numIngested++;
    }
    log.info("Ingested {} new ownership types", numIngested);
  }

  /**
   * Writes the key aspect and the ownershipTypeInfo aspect for one ownership type as a single
   * UPSERT batch. The supplied audit stamp is also stamped onto the info record's created and
   * lastModified fields.
   */
  private void ingestOwnershipType(
      @Nonnull OperationContext systemOperationContext,
      final Urn ownershipTypeUrn,
      final OwnershipTypeInfo info,
      final AuditStamp auditStamp) {
    // Key aspect proposal, derived from the urn via the entity registry.
    final MetadataChangeProposal keyAspectProposal = new MetadataChangeProposal();
    final AspectSpec keyAspectSpec =
        systemOperationContext.getEntityRegistryContext().getKeyAspectSpec(ownershipTypeUrn);
    GenericAspect aspect =
        GenericRecordUtils.serializeAspect(
            EntityKeyUtils.convertUrnToEntityKey(ownershipTypeUrn, keyAspectSpec));
    keyAspectProposal.setAspect(aspect);
    keyAspectProposal.setAspectName(keyAspectSpec.getName());
    keyAspectProposal.setEntityType(OWNERSHIP_TYPE_ENTITY_NAME);
    keyAspectProposal.setChangeType(ChangeType.UPSERT);
    keyAspectProposal.setEntityUrn(ownershipTypeUrn);

    // Info aspect proposal; the audit stamp is applied to the record itself as well.
    final MetadataChangeProposal proposal = new MetadataChangeProposal();
    proposal.setEntityUrn(ownershipTypeUrn);
    proposal.setEntityType(OWNERSHIP_TYPE_ENTITY_NAME);
    proposal.setAspectName(OWNERSHIP_TYPE_INFO_ASPECT_NAME);
    info.setCreated(auditStamp);
    info.setLastModified(auditStamp);
    proposal.setAspect(GenericRecordUtils.serializeAspect(info));
    proposal.setChangeType(ChangeType.UPSERT);

    _entityService.ingestProposal(
        systemOperationContext,
        AspectsBatchImpl.builder()
            .mcps(
                List.of(keyAspectProposal, proposal),
                auditStamp,
                systemOperationContext.getRetrieverContext().get())
            .build(),
        false);
  }
}

View File

@ -1,154 +0,0 @@
package com.linkedin.metadata.boot.steps;
import static com.linkedin.metadata.Constants.*;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.ebean.batch.AspectsBatchImpl;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.policy.DataHubRoleInfo;
import io.datahubproject.metadata.context.OperationContext;
import jakarta.annotation.Nonnull;
import java.net.URISyntaxException;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.ClassPathResource;
/**
 * Bootstrap step that ingests the default DataHub roles from the classpath resource
 * {@code ./boot/roles.json}.
 *
 * <p>Entries without an {@code info} field are treated as removals: their urns are deleted.
 */
@Slf4j
@RequiredArgsConstructor
public class IngestRolesStep implements BootstrapStep {

  // Delay before ingesting; see the sleep in execute().
  private static final int SLEEP_SECONDS = 60;

  private final EntityService<?> _entityService;
  private final EntityRegistry _entityRegistry;

  @Override
  public String name() {
    return this.getClass().getSimpleName();
  }

  // Runs asynchronously, so the startup sleep below does not block other bootstrap steps.
  @Nonnull
  @Override
  public ExecutionMode getExecutionMode() {
    return ExecutionMode.ASYNC;
  }

  @Override
  public void execute(@Nonnull OperationContext systemOperationContext) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // Raise Jackson's max string length (overridable via env var) so large bootstrap files parse.
    int maxSize =
        Integer.parseInt(
            System.getenv()
                .getOrDefault(INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE));
    mapper
        .getFactory()
        .setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build());

    // Sleep to ensure deployment process finishes.
    Thread.sleep(SLEEP_SECONDS * 1000);

    // 0. Execute preflight check to see whether we need to ingest Roles
    log.info("Ingesting default Roles...");

    // 1. Read from the file into JSON.
    final JsonNode rolesObj = mapper.readTree(new ClassPathResource("./boot/roles.json").getFile());

    if (!rolesObj.isArray()) {
      throw new RuntimeException(
          String.format(
              "Found malformed roles file, expected an Array but found %s",
              rolesObj.getNodeType()));
    }

    final AspectSpec roleInfoAspectSpec =
        _entityRegistry
            .getEntitySpec(DATAHUB_ROLE_ENTITY_NAME)
            .getAspectSpec(DATAHUB_ROLE_INFO_ASPECT_NAME);

    // One audit stamp shared by every role ingested in this run.
    final AuditStamp auditStamp =
        new AuditStamp()
            .setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
            .setTime(System.currentTimeMillis());

    for (final JsonNode roleObj : rolesObj) {
      final Urn urn = Urn.createFromString(roleObj.get("urn").asText());

      // If the info is not there, it means that the role was there before, but must now be removed
      if (!roleObj.has("info")) {
        _entityService.deleteUrn(systemOperationContext, urn);
        continue;
      }

      final DataHubRoleInfo info =
          RecordUtils.toRecordTemplate(DataHubRoleInfo.class, roleObj.get("info").toString());
      ingestRole(systemOperationContext, urn, info, auditStamp, roleInfoAspectSpec);
    }
    log.info("Successfully ingested default Roles.");
  }

  /**
   * Writes the key aspect and dataHubRoleInfo aspect for a single role as one UPSERT batch, then
   * emits a RESTATE change-log event for the role info aspect.
   *
   * @throws URISyntaxException if the system actor urn cannot be parsed
   */
  private void ingestRole(
      @Nonnull OperationContext systemOperationContext,
      final Urn roleUrn,
      final DataHubRoleInfo dataHubRoleInfo,
      final AuditStamp auditStamp,
      final AspectSpec roleInfoAspectSpec)
      throws URISyntaxException {
    // 3. Write key & aspect
    final MetadataChangeProposal keyAspectProposal = new MetadataChangeProposal();
    final AspectSpec keyAspectSpec =
        systemOperationContext.getEntityRegistryContext().getKeyAspectSpec(roleUrn);
    GenericAspect aspect =
        GenericRecordUtils.serializeAspect(
            EntityKeyUtils.convertUrnToEntityKey(roleUrn, keyAspectSpec));
    keyAspectProposal.setAspect(aspect);
    keyAspectProposal.setAspectName(keyAspectSpec.getName());
    keyAspectProposal.setEntityType(DATAHUB_ROLE_ENTITY_NAME);
    keyAspectProposal.setChangeType(ChangeType.UPSERT);
    keyAspectProposal.setEntityUrn(roleUrn);

    final MetadataChangeProposal proposal = new MetadataChangeProposal();
    proposal.setEntityUrn(roleUrn);
    proposal.setEntityType(DATAHUB_ROLE_ENTITY_NAME);
    proposal.setAspectName(DATAHUB_ROLE_INFO_ASPECT_NAME);
    proposal.setAspect(GenericRecordUtils.serializeAspect(dataHubRoleInfo));
    proposal.setChangeType(ChangeType.UPSERT);

    _entityService.ingestProposal(
        systemOperationContext,
        AspectsBatchImpl.builder()
            .mcps(
                List.of(keyAspectProposal, proposal),
                new AuditStamp()
                    .setActor(Urn.createFromString(SYSTEM_ACTOR))
                    .setTime(System.currentTimeMillis()),
                systemOperationContext.getRetrieverContext().get())
            .build(),
        false);

    // NOTE(review): RESTATE MCL presumably forces downstream consumers/indices to reprocess the
    // aspect even when the value is unchanged — confirm against alwaysProduceMCLAsync semantics.
    _entityService.alwaysProduceMCLAsync(
        systemOperationContext,
        roleUrn,
        DATAHUB_ROLE_ENTITY_NAME,
        DATAHUB_ROLE_INFO_ASPECT_NAME,
        roleInfoAspectSpec,
        null,
        dataHubRoleInfo,
        null,
        null,
        auditStamp,
        ChangeType.RESTATE);
  }
}

View File

@ -1,96 +0,0 @@
package com.linkedin.metadata.boot.steps;
import static com.linkedin.metadata.Constants.*;
import com.datahub.util.RecordUtils;
import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.EntitySpec;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.util.Pair;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.ClassPathResource;
/**
 * Bootstrap step that ingests the default DataHub root user from the bundled
 * {@code boot/root_user.json} classpath resource: it writes the corp-user key aspect and the
 * {@code corpUserInfo} aspect for the configured urn.
 */
@Slf4j
@RequiredArgsConstructor
public class IngestRootUserStep implements BootstrapStep {

  private static final String USER_INFO_ASPECT_NAME = "corpUserInfo";

  private final EntityService<?> _entityService;

  @Override
  public String name() {
    return getClass().getSimpleName();
  }

  /**
   * Reads {@code boot/root_user.json} and ingests the root user's key and info aspects.
   *
   * @throws IOException if the resource cannot be read or parsed
   * @throws URISyntaxException if the SYSTEM_ACTOR constant fails to parse as a urn
   * @throws RuntimeException if the file is malformed or its urn is invalid
   */
  @Override
  public void execute(@Nonnull OperationContext systemOperationContext)
      throws IOException, URISyntaxException {
    final ObjectMapper mapper = new ObjectMapper();
    // Raise Jackson's maximum string length; overridable via the environment.
    int maxSize =
        Integer.parseInt(
            System.getenv()
                .getOrDefault(INGESTION_MAX_SERIALIZED_STRING_LENGTH, MAX_JACKSON_STRING_SIZE));
    mapper
        .getFactory()
        .setStreamReadConstraints(StreamReadConstraints.builder().maxStringLength(maxSize).build());

    // 1. Read the resource into JSON. getInputStream() is used instead of getFile(): a
    // classpath resource packaged inside a jar has no filesystem path, so getFile() would
    // throw there. Jackson closes the stream after readTree (AUTO_CLOSE_SOURCE default).
    final JsonNode userObj =
        mapper.readTree(new ClassPathResource("./boot/root_user.json").getInputStream());
    if (!userObj.isObject()) {
      throw new RuntimeException(
          String.format(
              "Found malformed root user file, expected an Object but found %s",
              userObj.getNodeType()));
    }

    // 2. Ingest the user info.
    final Urn urn;
    try {
      urn = Urn.createFromString(userObj.get("urn").asText());
    } catch (URISyntaxException e) {
      log.error("Malformed urn: {}", userObj.get("urn").asText());
      throw new RuntimeException("Malformed urn", e);
    }
    final CorpUserInfo info =
        RecordUtils.toRecordTemplate(CorpUserInfo.class, userObj.get("info").toString());
    final CorpUserKey key =
        (CorpUserKey)
            EntityKeyUtils.convertUrnToEntityKey(urn, getUserKeyAspectSpec(systemOperationContext));
    final AuditStamp aspectAuditStamp =
        new AuditStamp()
            .setActor(Urn.createFromString(SYSTEM_ACTOR))
            .setTime(System.currentTimeMillis());
    // Write the key aspect and the info aspect together in a single ingest call.
    _entityService.ingestAspects(
        systemOperationContext,
        urn,
        List.of(Pair.of(CORP_USER_KEY_ASPECT_NAME, key), Pair.of(USER_INFO_ASPECT_NAME, info)),
        aspectAuditStamp,
        null);
  }

  /** Resolves the key aspect spec for corp-user entities from the entity registry. */
  private AspectSpec getUserKeyAspectSpec(@Nonnull OperationContext opContext) {
    final EntitySpec spec = opContext.getEntityRegistry().getEntitySpec(CORP_USER_ENTITY_NAME);
    return spec.getKeyAspectSpec();
  }
}

View File

@ -1,97 +0,0 @@
package com.linkedin.metadata.boot.steps;
import static com.linkedin.metadata.Constants.*;
import static org.mockito.Mockito.*;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datatype.DataTypeInfo;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import io.datahubproject.metadata.context.EntityRegistryContext;
import io.datahubproject.metadata.context.OperationContext;
import java.util.Collection;
import java.util.Set;
import org.jetbrains.annotations.NotNull;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
/** Unit tests for the data-type bootstrap step (valid and malformed bootstrap files). */
public class IngestDataTypesStepTest {

  private static final Urn TEST_DATA_TYPE_URN = UrnUtils.getUrn("urn:li:dataType:datahub.test");

  /** A valid bootstrap file must yield exactly one UPSERT proposal for the data type. */
  @Test
  public void testExecuteValidDataTypesNoExistingDataTypes() throws Exception {
    final EntityRegistry registry = getTestEntityRegistry();
    final EntityService<?> entityServiceMock = mock(EntityService.class);
    final OperationContext opContext = mock(OperationContext.class);
    final EntityRegistryContext registryContext = mock(EntityRegistryContext.class);

    when(opContext.getEntityRegistryContext()).thenReturn(registryContext);
    when(opContext.getEntityRegistry()).thenReturn(registry);
    // Delegate key-aspect lookups to the real test registry.
    when(registryContext.getKeyAspectSpec(anyString()))
        .thenAnswer(
            invocation ->
                registry.getEntitySpec(invocation.getArgument(0)).getKeyAspectSpec());

    new IngestDataTypesStep(entityServiceMock, "./boot/test_data_types_valid.json")
        .execute(opContext);

    final DataTypeInfo expectedInfo = new DataTypeInfo();
    expectedInfo.setQualifiedName("datahub.test");
    expectedInfo.setDisplayName("Test Name");
    expectedInfo.setDescription("Test Description");

    verify(entityServiceMock, times(1))
        .ingestProposal(
            any(OperationContext.class),
            eq(buildUpdateDataTypeProposal(expectedInfo)),
            any(AuditStamp.class),
            eq(false));
  }

  /** A malformed bootstrap file must abort with a RuntimeException before any ingestion. */
  @Test
  public void testExecuteInvalidJson() throws Exception {
    final EntityService<?> entityServiceMock = mock(EntityService.class);
    final OperationContext opContext = mock(OperationContext.class);
    when(opContext.getEntityRegistry()).thenReturn(mock(EntityRegistry.class));
    when(entityServiceMock.exists(any(OperationContext.class), any(Collection.class)))
        .thenAnswer(invocation -> Set.of());

    final IngestDataTypesStep step =
        new IngestDataTypesStep(entityServiceMock, "./boot/test_data_types_invalid.json");

    Assert.assertThrows(RuntimeException.class, () -> step.execute(opContext));

    // Only the existence check may have run; nothing was ingested.
    verify(entityServiceMock, times(1)).exists(any(OperationContext.class), any(Collection.class));
    verifyNoMoreInteractions(entityServiceMock);
  }

  /** Builds the proposal the step is expected to emit for the given info aspect. */
  private static MetadataChangeProposal buildUpdateDataTypeProposal(final DataTypeInfo info) {
    final MetadataChangeProposal proposal = new MetadataChangeProposal();
    proposal.setEntityUrn(TEST_DATA_TYPE_URN);
    proposal.setEntityType(DATA_TYPE_ENTITY_NAME);
    proposal.setAspectName(DATA_TYPE_INFO_ASPECT_NAME);
    proposal.setChangeType(ChangeType.UPSERT);
    proposal.setAspect(GenericRecordUtils.serializeAspect(info));
    return proposal;
  }

  @NotNull
  private ConfigEntityRegistry getTestEntityRegistry() {
    return new ConfigEntityRegistry(
        IngestDataPlatformInstancesStepTest.class
            .getClassLoader()
            .getResourceAsStream("test-entity-registry.yaml"));
  }
}

View File

@ -1,9 +0,0 @@
[
{
"urn": "urn:li:dataType:datahub.test",
"badField": {
"qualifiedName":"datahub.test",
"description": "Test Description"
}
}
]

View File

@ -1,10 +0,0 @@
[
{
"urn": "urn:li:dataType:datahub.test",
"info": {
"qualifiedName":"datahub.test",
"displayName": "Test Name",
"description": "Test Description"
}
}
]

View File

@ -1,708 +0,0 @@
[
{
"urn": "urn:li:dataPlatform:adlsGen1",
"aspect": {
"datasetNameDelimiter": "/",
"name": "adlsGen1",
"displayName": "Azure Data Lake (Gen 1)",
"type": "FILE_SYSTEM",
"logoUrl": "/assets/platforms/adlslogo.png"
}
},
{
"urn": "urn:li:dataPlatform:adlsGen2",
"aspect": {
"datasetNameDelimiter": "/",
"name": "adlsGen2",
"displayName": "Azure Data Lake (Gen 2)",
"type": "FILE_SYSTEM",
"logoUrl": "/assets/platforms/adlslogo.png"
}
},
{
"urn": "urn:li:dataPlatform:airflow",
"aspect": {
"datasetNameDelimiter": ".",
"name": "airflow",
"displayName": "Airflow",
"type": "OTHERS",
"logoUrl": "/assets/platforms/airflowlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:ambry",
"aspect": {
"datasetNameDelimiter": ".",
"name": "ambry",
"displayName": "Ambry",
"type": "OBJECT_STORE"
}
},
{
"urn": "urn:li:dataPlatform:clickhouse",
"aspect": {
"datasetNameDelimiter": ".",
"name": "clickhouse",
"displayName": "ClickHouse",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/clickhouselogo.png"
}
},
{
"urn": "urn:li:dataPlatform:cockroachdb",
"aspect": {
"datasetNameDelimiter": ".",
"name": "cockroachdb",
"displayName": "CockroachDb",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/cockroachdblogo.png"
}
},
{
"urn": "urn:li:dataPlatform:couchbase",
"aspect": {
"datasetNameDelimiter": ".",
"name": "couchbase",
"displayName": "Couchbase",
"type": "KEY_VALUE_STORE",
"logoUrl": "/assets/platforms/couchbaselogo.png"
}
},
{
"urn": "urn:li:dataPlatform:dagster",
"aspect": {
"datasetNameDelimiter": "/",
"name": "dagster",
"displayName": "Dagster",
"type": "OTHERS",
"logoUrl": "/assets/platforms/dagsterlogo.svg"
}
},
{
"urn": "urn:li:dataPlatform:external",
"aspect": {
"datasetNameDelimiter": ".",
"name": "external",
"displayName": "External Source",
"type": "OTHERS"
}
},
{
"urn": "urn:li:dataPlatform:hdfs",
"aspect": {
"datasetNameDelimiter": "/",
"name": "hdfs",
"displayName": "HDFS",
"type": "FILE_SYSTEM",
"logoUrl": "/assets/platforms/hadooplogo.png"
}
},
{
"urn": "urn:li:dataPlatform:hana",
"aspect": {
"datasetNameDelimiter": ".",
"name": "hana",
"displayName": "SAP HANA",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/hanalogo.png"
}
},
{
"urn": "urn:li:dataPlatform:hive",
"aspect": {
"datasetNameDelimiter": ".",
"name": "hive",
"displayName": "Hive",
"type": "FILE_SYSTEM",
"logoUrl": "/assets/platforms/hivelogo.png"
}
},
{
"urn": "urn:li:dataPlatform:iceberg",
"aspect": {
"datasetNameDelimiter": ".",
"name": "iceberg",
"displayName": "Iceberg",
"type": "FILE_SYSTEM",
"logoUrl": "/assets/platforms/iceberglogo.png"
}
},
{
"urn": "urn:li:dataPlatform:s3",
"aspect": {
"datasetNameDelimiter": "/",
"name": "s3",
"displayName": "AWS S3",
"type": "FILE_SYSTEM",
"logoUrl": "/assets/platforms/s3.png"
}
},
{
"urn": "urn:li:dataPlatform:kafka",
"aspect": {
"datasetNameDelimiter": ".",
"name": "kafka",
"displayName": "Kafka",
"type": "MESSAGE_BROKER",
"logoUrl": "/assets/platforms/kafkalogo.png"
}
},
{
"urn": "urn:li:dataPlatform:kafka-connect",
"aspect": {
"datasetNameDelimiter": ".",
"name": "kafka-connect",
"displayName": "Kafka Connect",
"type": "OTHERS",
"logoUrl": "/assets/platforms/kafkalogo.png"
}
},
{
"urn": "urn:li:dataPlatform:kusto",
"aspect": {
"datasetNameDelimiter": ".",
"name": "kusto",
"displayName": "Kusto",
"type": "OLAP_DATASTORE",
"logoUrl": "/assets/platforms/kustologo.png"
}
},
{
"urn": "urn:li:dataPlatform:mode",
"aspect": {
"datasetNameDelimiter": ".",
"name": "mode",
"displayName": "Mode",
"type": "KEY_VALUE_STORE",
"logoUrl": "/assets/platforms/modelogo.png"
}
},
{
"urn": "urn:li:dataPlatform:mongodb",
"aspect": {
"datasetNameDelimiter": ".",
"name": "mongodb",
"displayName": "MongoDB",
"type": "KEY_VALUE_STORE",
"logoUrl": "/assets/platforms/mongodblogo.png"
}
},
{
"urn": "urn:li:dataPlatform:mysql",
"aspect": {
"datasetNameDelimiter": ".",
"name": "mysql",
"displayName": "MySQL",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/mysqllogo.png"
}
},
{
"urn": "urn:li:dataPlatform:db2",
"aspect": {
"datasetNameDelimiter": ".",
"name": "db2",
"displayName": "DB2",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/db2logo.png"
}
},
{
"urn": "urn:li:dataPlatform:mariadb",
"aspect": {
"datasetNameDelimiter": ".",
"name": "mariadb",
"displayName": "MariaDB",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/mariadblogo.png"
}
},
{
"urn": "urn:li:dataPlatform:OpenApi",
"aspect": {
"datasetNameDelimiter": ".",
"name": "openapi",
"displayName": "OpenAPI",
"type": "OTHERS",
"logoUrl": "/assets/platforms/openapilogo.png"
}
},
{
"urn": "urn:li:dataPlatform:oracle",
"aspect": {
"datasetNameDelimiter": ".",
"name": "oracle",
"displayName": "Oracle",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/oraclelogo.png"
}
},
{
"urn": "urn:li:dataPlatform:pinot",
"aspect": {
"datasetNameDelimiter": ".",
"name": "pinot",
"displayName": "Pinot",
"type": "OLAP_DATASTORE",
"logoUrl": "/assets/platforms/pinotlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:postgres",
"aspect": {
"datasetNameDelimiter": ".",
"name": "postgres",
"displayName": "PostgreSQL",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/postgreslogo.png"
}
},
{
"urn": "urn:li:dataPlatform:prefect",
"aspect": {
"datasetNameDelimiter": ".",
"name": "prefect",
"displayName": "Prefect",
"type": "OTHERS",
"logoUrl": "/assets/platforms/prefectlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:presto",
"aspect": {
"datasetNameDelimiter": ".",
"name": "prefect",
"displayName": "Prefect",
"type": "OTHERS",
"logoUrl": "/assets/platforms/prefectlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:presto",
"aspect": {
"datasetNameDelimiter": ".",
"name": "presto",
"displayName": "Presto",
"type": "QUERY_ENGINE",
"logoUrl": "/assets/platforms/prestologo.png"
}
},
{
"urn": "urn:li:dataPlatform:tableau",
"aspect": {
"datasetNameDelimiter": ".",
"name": "tableau",
"displayName": "Tableau",
"type": "OTHERS",
"logoUrl": "/assets/platforms/tableaulogo.svg"
}
},
{
"urn": "urn:li:dataPlatform:teradata",
"aspect": {
"datasetNameDelimiter": ".",
"name": "teradata",
"displayName": "Teradata",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/teradatalogo.png"
}
},
{
"urn": "urn:li:dataPlatform:voldemort",
"aspect": {
"datasetNameDelimiter": ".",
"name": "voldemort",
"displayName": "Voldemort",
"type": "KEY_VALUE_STORE"
}
},
{
"urn": "urn:li:dataPlatform:snowflake",
"aspect": {
"datasetNameDelimiter": ".",
"name": "snowflake",
"displayName": "Snowflake",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/snowflakelogo.png"
}
},
{
"urn": "urn:li:dataPlatform:redshift",
"aspect": {
"datasetNameDelimiter": ".",
"name": "redshift",
"displayName": "Redshift",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/redshiftlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:mssql",
"aspect": {
"datasetNameDelimiter": ".",
"name": "mssql",
"displayName": "SQL Server",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/mssqllogo.png"
}
},
{
"urn": "urn:li:dataPlatform:bigquery",
"aspect": {
"datasetNameDelimiter": ".",
"name": "bigquery",
"displayName": "BigQuery",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/bigquerylogo.png"
}
},
{
"urn": "urn:li:dataPlatform:druid",
"aspect": {
"datasetNameDelimiter": ".",
"name": "druid",
"displayName": "Druid",
"type": "OLAP_DATASTORE",
"logoUrl": "/assets/platforms/druidlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:looker",
"aspect": {
"datasetNameDelimiter": ".",
"name": "looker",
"displayName": "Looker",
"type": "OTHERS",
"logoUrl": "/assets/platforms/lookerlogo.svg"
}
},
{
"urn": "urn:li:dataPlatform:feast",
"aspect": {
"datasetNameDelimiter": ".",
"name": "feast",
"displayName": "Feast",
"type": "OTHERS",
"logoUrl": "/assets/platforms/feastlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:sagemaker",
"aspect": {
"datasetNameDelimiter": ".",
"name": "sagemaker",
"displayName": "SageMaker",
"type": "OTHERS",
"logoUrl": "/assets/platforms/sagemakerlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:mlflow",
"aspect": {
"datasetNameDelimiter": ".",
"name": "mlflow",
"displayName": "MLflow",
"type": "OTHERS",
"logoUrl": "/assets/platforms/mlflowlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:glue",
"aspect": {
"datasetNameDelimiter": ".",
"name": "glue",
"displayName": "Glue",
"type": "OTHERS",
"logoUrl": "/assets/platforms/gluelogo.png"
}
},
{
"urn": "urn:li:dataPlatform:redash",
"aspect": {
"datasetNameDelimiter": ".",
"name": "redash",
"displayName": "Redash",
"type": "OTHERS",
"logoUrl": "/assets/platforms/redashlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:athena",
"aspect": {
"datasetNameDelimiter": ".",
"name": "athena",
"displayName": "AWS Athena",
"type": "RELATIONAL_DB",
"logoUrl": "/assets/platforms/awsathenalogo.png"
}
},
{
"urn": "urn:li:dataPlatform:spark",
"aspect": {
"datasetNameDelimiter": ".",
"name": "spark",
"displayName": "Spark",
"type": "OTHERS",
"logoUrl": "/assets/platforms/sparklogo.png"
}
},
{
"urn": "urn:li:dataPlatform:dbt",
"aspect": {
"datasetNameDelimiter": ".",
"name": "dbt",
"displayName": "dbt",
"type": "OTHERS",
"logoUrl": "/assets/platforms/dbtlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:elasticsearch",
"aspect": {
"datasetNameDelimiter": ".",
"name": "elasticsearch",
"displayName": "Elasticsearch",
"type": "OTHERS",
"logoUrl": "/assets/platforms/elasticsearchlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:great-expectations",
"aspect": {
"name": "Great Expectations",
"displayName": "Great Expectations",
"type": "OTHERS",
"logoUrl": "/assets/platforms/greatexpectationslogo.png",
"datasetNameDelimiter": "."
}
},
{
"urn": "urn:li:dataPlatform:powerbi",
"aspect": {
"datasetNameDelimiter": ".",
"name": "powerbi",
"displayName": "Power BI",
"type": "OTHERS",
"logoUrl": "/assets/platforms/powerbilogo.png"
}
},
{
"urn": "urn:li:dataPlatform:presto-on-hive",
"aspect": {
"datasetNameDelimiter": ".",
"name": "presto-on-hive",
"displayName": "Presto on Hive",
"type": "FILE_SYSTEM",
"logoUrl": "/assets/platforms/prestoonhivelogo.png"
}
},
{
"urn": "urn:li:dataPlatform:metabase",
"aspect": {
"datasetNameDelimiter": ".",
"name": "metabase",
"displayName": "Metabase",
"type": "OTHERS",
"logoUrl": "/assets/platforms/metabaselogo.svg"
}
},
{
"urn": "urn:li:dataPlatform:nifi",
"aspect": {
"datasetNameDelimiter": ".",
"name": "nifi",
"displayName": "NiFi",
"type": "OTHERS",
"logoUrl": "/assets/platforms/nifilogo.svg"
}
},
{
"urn": "urn:li:dataPlatform:superset",
"aspect": {
"datasetNameDelimiter": ".",
"name": "superset",
"displayName": "Superset",
"type": "OTHERS",
"logoUrl": "/assets/platforms/supersetlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:trino",
"aspect": {
"datasetNameDelimiter": ".",
"name": "trino",
"displayName": "Trino",
"type": "QUERY_ENGINE",
"logoUrl": "/assets/platforms/trinologo.png"
}
},
{
"urn": "urn:li:dataPlatform:pulsar",
"aspect": {
"datasetNameDelimiter": ".",
"name": "pulsar",
"displayName": "Pulsar",
"type": "MESSAGE_BROKER",
"logoUrl": "/assets/platforms/pulsarlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:salesforce",
"aspect": {
"datasetNameDelimiter": ".",
"name": "salesforce",
"displayName": "Salesforce",
"type": "OTHERS",
"logoUrl": "/assets/platforms/logo-salesforce.svg"
}
},
{
"urn": "urn:li:dataPlatform:unknown",
"aspect": {
"datasetNameDelimiter": ".",
"name": "Unknown Platform",
"displayName": "N/A",
"type": "OTHERS"
}
},
{
"urn": "urn:li:dataPlatform:delta-lake",
"aspect": {
"datasetNameDelimiter": ".",
"name": "delta-lake",
"displayName": "Delta Lake",
"type": "OTHERS",
"logoUrl": "/assets/platforms/deltalakelogo.png"
}
},
{
"urn": "urn:li:dataPlatform:databricks",
"aspect": {
"datasetNameDelimiter": ".",
"name": "databricks",
"displayName": "Databricks",
"type": "OTHERS",
"logoUrl": "/assets/platforms/databrickslogo.png"
}
},
{
"urn": "urn:li:dataPlatform:vertica",
"aspect": {
"datasetNameDelimiter": ".",
"name": "vertica",
"displayName": "Vertica",
"type": "OLAP_DATASTORE",
"logoUrl": "/assets/platforms/verticalogo.png"
}
},
{
"urn": "urn:li:dataPlatform:gcs",
"aspect": {
"datasetNameDelimiter": "/",
"name": "gcs",
"displayName": "Google Cloud Storage",
"type": "FILE_SYSTEM",
"logoUrl": "/assets/platforms/gcslogo.svg"
}
},
{
"urn": "urn:li:dataPlatform:slack",
"aspect": {
"datasetNameDelimiter": ".",
"name": "Slack",
"displayName": "Slack",
"type": "OTHERS",
"logoUrl": "/assets/platforms/slacklogo.png"
}
},
{
"urn": "urn:li:dataPlatform:microsoft-teams",
"aspect": {
"datasetNameDelimiter": ".",
"name": "Microsoft Teams",
"displayName": "Microsoft Teams",
"type": "OTHERS",
"logoUrl": "/assets/platforms/teamslogo.png"
}
},
{
"urn": "urn:li:dataPlatform:dynamodb",
"aspect": {
"datasetNameDelimiter": ".",
"name": "dynamodb",
"displayName": "DynamoDB",
"type": "KEY_VALUE_STORE",
"logoUrl": "/assets/platforms/dynamodblogo.png"
}
},
{
"urn": "urn:li:dataPlatform:fivetran",
"aspect": {
"datasetNameDelimiter": ".",
"name": "fivetran",
"displayName": "Fivetran",
"type": "OTHERS",
"logoUrl": "/assets/platforms/fivetranlogo.png"
}
},
{
"urn": "urn:li:dataPlatform:csv",
"aspect": {
"datasetNameDelimiter": ".",
"name": "csv",
"displayName": "CSV",
"type": "OTHERS",
"logoUrl": "/assets/platforms/csv-logo.png"
}
},
{
"urn": "urn:li:dataPlatform:qlik-sense",
"aspect": {
"datasetNameDelimiter": ".",
"name": "qlik-sense",
"displayName": "Qlik Sense",
"type": "OTHERS",
"logoUrl": "/assets/platforms/qliklogo.png"
}
},
{
"urn": "urn:li:dataPlatform:file",
"aspect": {
"datasetNameDelimiter": ".",
"name": "file",
"displayName": "File",
"type": "OTHERS",
"logoUrl": "/assets/platforms/file-logo.svg"
}
},
{
"urn": "urn:li:dataPlatform:excel",
"aspect": {
"name": "excel",
"displayName": "Excel",
"type": "OTHERS",
"datasetNameDelimiter": "/",
"logoUrl": "/assets/platforms/excel-logo.svg"
}
},
{
"urn": "urn:li:dataPlatform:sigma",
"aspect": {
"datasetNameDelimiter": ".",
"name": "sigma",
"displayName": "Sigma",
"type": "OTHERS",
"logoUrl": "/assets/platforms/sigmalogo.png"
}
},
{
"urn": "urn:li:dataPlatform:sac",
"aspect": {
"datasetNameDelimiter": ".",
"name": "sac",
"displayName": "SAP Analytics Cloud",
"type": "OTHERS",
"logoUrl": "/assets/platforms/saclogo.svg"
}
}
]

View File

@ -1,42 +0,0 @@
[
{
"urn": "urn:li:dataType:datahub.string",
"info": {
"qualifiedName":"datahub.string",
"displayName": "String",
"description": "A string of characters."
}
},
{
"urn": "urn:li:dataType:datahub.number",
"info": {
"qualifiedName":"datahub.number",
"displayName": "Number",
"description": "An integer or decimal number."
}
},
{
"urn": "urn:li:dataType:datahub.urn",
"info": {
"qualifiedName":"datahub.urn",
"displayName": "Urn",
"description": "A unique identifier for a DataHub entity."
}
},
{
"urn": "urn:li:dataType:datahub.rich_text",
"info": {
"qualifiedName":"datahub.rich_text",
"displayName": "Rich Text",
"description": "An attributed string of characters."
}
},
{
"urn": "urn:li:dataType:datahub.date",
"info": {
"qualifiedName":"datahub.date",
"displayName": "Date",
"description": "A specific day, without time."
}
}
]

View File

@ -1,30 +0,0 @@
[
{
"urn": "urn:li:ownershipType:__system__technical_owner",
"info": {
"name":"Technical Owner",
"description":"Involved in the production, maintenance, or distribution of the asset(s)."
}
},
{
"urn": "urn:li:ownershipType:__system__business_owner",
"info": {
"name":"Business Owner",
"description":"Principal stakeholders or domain experts associated with the asset(s)."
}
},
{
"urn": "urn:li:ownershipType:__system__data_steward",
"info": {
"name":"Data Steward",
"description":"Involved in governance of the asset(s)."
}
},
{
"urn": "urn:li:ownershipType:__system__none",
"info": {
"name":"None",
"description":"No ownership type specified."
}
}
]

View File

@ -1,26 +0,0 @@
[
{
"urn": "urn:li:dataHubRole:Admin",
"info": {
"name":"Admin",
"description":"Can do everything on the platform.",
"editable":false
}
},
{
"urn": "urn:li:dataHubRole:Editor",
"info": {
"name":"Editor",
"description":"Can read and edit all metadata. Cannot take administrative actions.",
"editable":false
}
},
{
"urn": "urn:li:dataHubRole:Reader",
"info": {
"name":"Reader",
"description":"Can read all metadata. Cannot edit anything by default, or take administrative actions.",
"editable":false
}
}
]

View File

@ -1,8 +0,0 @@
{
"urn": "urn:li:corpuser:datahub",
"info": {
"active": true,
"displayName": "DataHub",
"title": "DataHub Root User"
}
}

View File

@ -59,9 +59,13 @@ public class GenericRecordUtils {
/**
 * Serializes a {@link RecordTemplate} aspect into a {@code GenericAspect} by first rendering
 * it to its JSON string form and delegating to the String overload.
 */
@Nonnull
public static GenericAspect serializeAspect(@Nonnull RecordTemplate aspect) {
return serializeAspect(RecordUtils.toJsonString(aspect));
}
/**
 * Wraps a pre-serialized JSON aspect payload in a {@code GenericAspect}.
 *
 * <p>Note: the rendered diff left the stale pre-change {@code setValue} line (which referenced
 * an undefined {@code aspect} variable) next to the new one; only the new line is kept here.
 *
 * @param str the aspect value already serialized as a JSON string
 * @return a GenericAspect carrying the UTF-8 bytes of {@code str} with JSON content type
 */
@Nonnull
public static GenericAspect serializeAspect(@Nonnull String str) {
  GenericAspect genericAspect = new GenericAspect();
  genericAspect.setValue(ByteString.unsafeWrap(str.getBytes(StandardCharsets.UTF_8)));
  genericAspect.setContentType(GenericRecordUtils.JSON);
  return genericAspect;
}