fix(ingestion) Inject pipeline_name into recipes at runtime (#6833)

This commit is contained in:
Chris Collins 2022-12-22 16:16:51 -05:00 committed by GitHub
parent 4cba09e97d
commit 2ef2ad05d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 80 additions and 29 deletions

View File

@ -114,6 +114,7 @@ project.ext.externalDependency = [
'jsonSchemaAvro': 'com.github.fge:json-schema-avro:0.1.4', 'jsonSchemaAvro': 'com.github.fge:json-schema-avro:0.1.4',
'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1', 'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1',
'jsonSmart': 'net.minidev:json-smart:2.4.6', 'jsonSmart': 'net.minidev:json-smart:2.4.6',
'json': 'org.json:json:20090211',
'junitJupiterApi': "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion", 'junitJupiterApi': "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion",
'junitJupiterParams': "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion", 'junitJupiterParams': "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion",
'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion", 'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion",

View File

@ -22,6 +22,7 @@ import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.key.ExecutionRequestKey; import com.linkedin.metadata.key.ExecutionRequestKey;
import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.IngestionUtils;
import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher; import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment; import graphql.schema.DataFetchingEnvironment;
@ -105,7 +106,10 @@ public class CreateIngestionExecutionRequestResolver implements DataFetcher<Comp
execInput.setRequestedAt(System.currentTimeMillis()); execInput.setRequestedAt(System.currentTimeMillis());
Map<String, String> arguments = new HashMap<>(); Map<String, String> arguments = new HashMap<>();
arguments.put(RECIPE_ARG_NAME, injectRunId(ingestionSourceInfo.getConfig().getRecipe(), executionRequestUrn.toString())); String recipe = ingestionSourceInfo.getConfig().getRecipe();
recipe = injectRunId(recipe, executionRequestUrn.toString());
recipe = IngestionUtils.injectPipelineName(recipe, executionRequestUrn.toString());
arguments.put(RECIPE_ARG_NAME, recipe);
arguments.put(VERSION_ARG_NAME, ingestionSourceInfo.getConfig().hasVersion() arguments.put(VERSION_ARG_NAME, ingestionSourceInfo.getConfig().hasVersion()
? ingestionSourceInfo.getConfig().getVersion() ? ingestionSourceInfo.getConfig().getVersion()
: _ingestionConfiguration.getDefaultCliVersion() : _ingestionConfiguration.getDefaultCliVersion()

View File

@ -21,8 +21,6 @@ import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher; import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment; import graphql.schema.DataFetchingEnvironment;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.json.JSONException;
import org.json.JSONObject;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.Optional; import java.util.Optional;
@ -56,7 +54,6 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
final UpdateIngestionSourceInput input = bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class); final UpdateIngestionSourceInput input = bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);
final MetadataChangeProposal proposal = new MetadataChangeProposal(); final MetadataChangeProposal proposal = new MetadataChangeProposal();
String ingestionSourceUrnString;
if (ingestionSourceUrn.isPresent()) { if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source // Update existing ingestion source
@ -67,7 +64,6 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()), String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST); DataHubGraphQLErrorCode.BAD_REQUEST);
} }
ingestionSourceUrnString = ingestionSourceUrn.get();
} else { } else {
// Create new ingestion source // Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID. // Since we are creating a new Ingestion Source, we need to generate a unique UUID.
@ -78,11 +74,10 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey(); final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr); key.setId(uuidStr);
proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key)); proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key));
ingestionSourceUrnString = String.format("urn:li:dataHubIngestionSource:%s", uuidStr);
} }
// Create the policy info. // Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input, ingestionSourceUrnString); final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
proposal.setEntityType(Constants.INGESTION_SOURCE_ENTITY_NAME); proposal.setEntityType(Constants.INGESTION_SOURCE_ENTITY_NAME);
proposal.setAspectName(Constants.INGESTION_INFO_ASPECT_NAME); proposal.setAspectName(Constants.INGESTION_INFO_ASPECT_NAME);
proposal.setAspect(GenericRecordUtils.serializeAspect(info)); proposal.setAspect(GenericRecordUtils.serializeAspect(info));
@ -98,23 +93,20 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
}); });
} }
private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input, final String ingestionSourceUrn) { private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input) {
final DataHubIngestionSourceInfo result = new DataHubIngestionSourceInfo(); final DataHubIngestionSourceInfo result = new DataHubIngestionSourceInfo();
result.setType(input.getType()); result.setType(input.getType());
result.setName(input.getName()); result.setName(input.getName());
result.setConfig(mapConfig(input.getConfig(), ingestionSourceUrn)); result.setConfig(mapConfig(input.getConfig()));
if (input.getSchedule() != null) { if (input.getSchedule() != null) {
result.setSchedule(mapSchedule(input.getSchedule())); result.setSchedule(mapSchedule(input.getSchedule()));
} }
return result; return result;
} }
private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input, final String ingestionSourceUrn) { private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input) {
final DataHubIngestionSourceConfig result = new DataHubIngestionSourceConfig(); final DataHubIngestionSourceConfig result = new DataHubIngestionSourceConfig();
String recipe = input.getRecipe(); String recipe = input.getRecipe();
if (recipe != null) {
recipe = optionallySetPipelineName(recipe, ingestionSourceUrn);
}
result.setRecipe(recipe); result.setRecipe(recipe);
if (input.getVersion() != null) { if (input.getVersion() != null) {
result.setVersion(input.getVersion()); result.setVersion(input.getVersion());
@ -134,19 +126,4 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
result.setTimezone(input.getTimezone()); result.setTimezone(input.getTimezone());
return result; return result;
} }
private String optionallySetPipelineName(String recipe, String ingestionSourceUrn) {
try {
JSONObject jsonRecipe = new JSONObject(recipe);
boolean hasPipelineName = jsonRecipe.has("pipeline_name") && jsonRecipe.get("pipeline_name") != null && !jsonRecipe.get("pipeline_name").equals("");
if (!hasPipelineName) {
jsonRecipe.put("pipeline_name", ingestionSourceUrn);
recipe = jsonRecipe.toString();
}
} catch (JSONException e) {
log.warn("Error parsing ingestion recipe in JSON form", e);
}
return recipe;
}
} }

View File

@ -21,6 +21,7 @@ import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.key.ExecutionRequestKey; import com.linkedin.metadata.key.ExecutionRequestKey;
import com.linkedin.metadata.query.ListResult; import com.linkedin.metadata.query.ListResult;
import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.metadata.utils.IngestionUtils;
import com.linkedin.mxe.MetadataChangeProposal; import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.r2.RemoteInvocationException; import com.linkedin.r2.RemoteInvocationException;
import java.util.ArrayList; import java.util.ArrayList;
@ -347,7 +348,8 @@ public class IngestionScheduler {
input.setRequestedAt(System.currentTimeMillis()); input.setRequestedAt(System.currentTimeMillis());
Map<String, String> arguments = new HashMap<>(); Map<String, String> arguments = new HashMap<>();
arguments.put(RECIPE_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().getRecipe()); String recipe = IngestionUtils.injectPipelineName(_ingestionSourceInfo.getConfig().getRecipe(), _ingestionSourceUrn.toString());
arguments.put(RECIPE_ARGUMENT_NAME, recipe);
arguments.put(VERSION_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().hasVersion() arguments.put(VERSION_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().hasVersion()
? _ingestionSourceInfo.getConfig().getVersion() ? _ingestionSourceInfo.getConfig().getVersion()
: _ingestionConfiguration.getDefaultCliVersion()); : _ingestionConfiguration.getDefaultCliVersion());

View File

@ -8,6 +8,7 @@ dependencies {
compile externalDependency.elasticSearchRest compile externalDependency.elasticSearchRest
compile externalDependency.httpClient compile externalDependency.httpClient
compile externalDependency.neo4jJavaDriver compile externalDependency.neo4jJavaDriver
compile externalDependency.json
compile spec.product.pegasus.restliClient compile spec.product.pegasus.restliClient
compile spec.product.pegasus.restliCommon compile spec.product.pegasus.restliCommon

View File

@ -0,0 +1,37 @@
package com.linkedin.metadata.utils;
import org.json.JSONException;
import org.json.JSONObject;
import javax.annotation.Nonnull;
public class IngestionUtils {
private static final String PIPELINE_NAME = "pipeline_name";
private IngestionUtils() {
}
/**
* Injects a pipeline_name into a recipe if there isn't a pipeline_name already there.
* The pipeline_name will be the urn of the ingestion source.
*
* @param pipelineName the new pipeline name in the recipe.
* @return a modified recipe JSON string
*/
public static String injectPipelineName(@Nonnull String originalJson, @Nonnull final String pipelineName) {
try {
final JSONObject jsonRecipe = new JSONObject(originalJson);
boolean hasPipelineName = jsonRecipe.has(PIPELINE_NAME) && jsonRecipe.get(PIPELINE_NAME) != null && !jsonRecipe.get(PIPELINE_NAME).equals("");
if (!hasPipelineName) {
jsonRecipe.put(PIPELINE_NAME, pipelineName);
return jsonRecipe.toString();
}
} catch (JSONException e) {
throw new IllegalArgumentException("Failed to create execution request: Invalid recipe json provided.", e);
}
return originalJson;
}
}

View File

@ -0,0 +1,29 @@
package com.linkedin.metadata.utils;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
public class IngestionUtilsTest {
private final String ingestionSourceUrn = "urn:li:ingestionSource:12345";
@Test
public void injectPipelineNameWhenThere() {
String recipe = "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}},\"pipeline_name\":\"test\"}";
assertEquals(recipe, IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn));
}
@Test
public void injectPipelineNameWhenNotThere() {
String recipe = "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}}}";
recipe = IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn);
assertEquals(
recipe,
"{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}},\"pipeline_name\":\"urn:li:ingestionSource:12345\"}"
);
}
}