From 2ef2ad05d0cda459581a48802e0d478f6ffe3988 Mon Sep 17 00:00:00 2001 From: Chris Collins Date: Thu, 22 Dec 2022 16:16:51 -0500 Subject: [PATCH] fix(ingestion) Inject pipeline_name into recipes at runtime (#6833) --- build.gradle | 1 + ...eateIngestionExecutionRequestResolver.java | 6 ++- .../source/UpsertIngestionSourceResolver.java | 31 ++-------------- .../ingestion/IngestionScheduler.java | 4 +- metadata-utils/build.gradle | 1 + .../metadata/utils/IngestionUtils.java | 37 +++++++++++++++++++ .../metadata/utils/IngestionUtilsTest.java | 29 +++++++++++++++ 7 files changed, 80 insertions(+), 29 deletions(-) create mode 100644 metadata-utils/src/main/java/com/linkedin/metadata/utils/IngestionUtils.java create mode 100644 metadata-utils/src/test/java/com/linkedin/metadata/utils/IngestionUtilsTest.java diff --git a/build.gradle b/build.gradle index 22217f2149..f8a911b1e4 100644 --- a/build.gradle +++ b/build.gradle @@ -114,6 +114,7 @@ project.ext.externalDependency = [ 'jsonSchemaAvro': 'com.github.fge:json-schema-avro:0.1.4', 'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1', 'jsonSmart': 'net.minidev:json-smart:2.4.6', + 'json': 'org.json:json:20090211', 'junitJupiterApi': "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion", 'junitJupiterParams': "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion", 'junitJupiterEngine': "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion", diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java index ccec5f4b67..1f67856e69 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java +++ 
b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/execution/CreateIngestionExecutionRequestResolver.java @@ -22,6 +22,7 @@ import com.linkedin.metadata.config.IngestionConfiguration; import com.linkedin.metadata.key.ExecutionRequestKey; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.metadata.utils.IngestionUtils; import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; @@ -105,7 +106,10 @@ public class CreateIngestionExecutionRequestResolver implements DataFetcher arguments = new HashMap<>(); - arguments.put(RECIPE_ARG_NAME, injectRunId(ingestionSourceInfo.getConfig().getRecipe(), executionRequestUrn.toString())); + String recipe = ingestionSourceInfo.getConfig().getRecipe(); + recipe = injectRunId(recipe, executionRequestUrn.toString()); + recipe = IngestionUtils.injectPipelineName(recipe, executionRequestUrn.toString()); + arguments.put(RECIPE_ARG_NAME, recipe); arguments.put(VERSION_ARG_NAME, ingestionSourceInfo.getConfig().hasVersion() ? 
ingestionSourceInfo.getConfig().getVersion() : _ingestionConfiguration.getDefaultCliVersion() diff --git a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java index 8e101be7ac..41e786927c 100644 --- a/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java +++ b/datahub-graphql-core/src/main/java/com/linkedin/datahub/graphql/resolvers/ingest/source/UpsertIngestionSourceResolver.java @@ -21,8 +21,6 @@ import com.linkedin.mxe.MetadataChangeProposal; import graphql.schema.DataFetcher; import graphql.schema.DataFetchingEnvironment; import lombok.extern.slf4j.Slf4j; -import org.json.JSONException; -import org.json.JSONObject; import java.net.URISyntaxException; import java.util.Optional; @@ -56,7 +54,6 @@ public class UpsertIngestionSourceResolver implements DataFetcher arguments = new HashMap<>(); - arguments.put(RECIPE_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().getRecipe()); + String recipe = IngestionUtils.injectPipelineName(_ingestionSourceInfo.getConfig().getRecipe(), _ingestionSourceUrn.toString()); + arguments.put(RECIPE_ARGUMENT_NAME, recipe); arguments.put(VERSION_ARGUMENT_NAME, _ingestionSourceInfo.getConfig().hasVersion() ? 
_ingestionSourceInfo.getConfig().getVersion() : _ingestionConfiguration.getDefaultCliVersion());
diff --git a/metadata-utils/build.gradle b/metadata-utils/build.gradle
index 3ab83a97d2..9124480146 100644
--- a/metadata-utils/build.gradle
+++ b/metadata-utils/build.gradle
@@ -8,6 +8,7 @@ dependencies {
   compile externalDependency.elasticSearchRest
   compile externalDependency.httpClient
   compile externalDependency.neo4jJavaDriver
+  compile externalDependency.json
 
   compile spec.product.pegasus.restliClient
   compile spec.product.pegasus.restliCommon
diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/IngestionUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/IngestionUtils.java
new file mode 100644
index 0000000000..d923005c8c
--- /dev/null
+++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/IngestionUtils.java
@@ -0,0 +1,47 @@
+package com.linkedin.metadata.utils;
+
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import javax.annotation.Nonnull;
+
+
+/**
+ * Static helpers for working with ingestion recipe JSON.
+ */
+public class IngestionUtils {
+
+  private static final String PIPELINE_NAME = "pipeline_name";
+
+  private IngestionUtils() {
+  }
+
+  /**
+   * Injects a pipeline_name into a recipe if there isn't a usable pipeline_name already there.
+   * The pipeline_name is intended to be the urn of the ingestion source.
+   * A pipeline_name that is absent, JSON null, or an empty string is replaced; any other existing
+   * value is kept and the original JSON string is returned unchanged.
+   *
+   * @param originalJson the recipe, as a JSON object string.
+   * @param pipelineName the new pipeline name in the recipe.
+   * @return the recipe JSON string with pipeline_name injected, or originalJson itself if it already had one
+   * @throws IllegalArgumentException if originalJson cannot be parsed as a JSON object
+   */
+  public static String injectPipelineName(@Nonnull String originalJson, @Nonnull final String pipelineName) {
+    try {
+      final JSONObject jsonRecipe = new JSONObject(originalJson);
+      // isNull() is true for an absent key and for an explicit JSON null (JSONObject.NULL);
+      // a plain "!= null" check misses the latter because get() never returns a Java null.
+      final boolean hasPipelineName =
+          !jsonRecipe.isNull(PIPELINE_NAME) && !"".equals(jsonRecipe.get(PIPELINE_NAME));
+
+      if (!hasPipelineName) {
+        jsonRecipe.put(PIPELINE_NAME, pipelineName);
+        return jsonRecipe.toString();
+      }
+    } catch (JSONException e) {
+      throw new IllegalArgumentException("Failed to create execution request: Invalid recipe json provided.", e);
+    }
+    return originalJson;
+  }
+}
diff --git a/metadata-utils/src/test/java/com/linkedin/metadata/utils/IngestionUtilsTest.java b/metadata-utils/src/test/java/com/linkedin/metadata/utils/IngestionUtilsTest.java
new file mode 100644
index 0000000000..8b2078c7b9
--- /dev/null
+++ b/metadata-utils/src/test/java/com/linkedin/metadata/utils/IngestionUtilsTest.java
@@ -0,0 +1,47 @@
+package com.linkedin.metadata.utils;
+
+import org.json.JSONObject;
+import org.testng.annotations.Test;
+
+
+import static org.testng.Assert.assertEquals;
+
+public class IngestionUtilsTest {
+
+  private final String ingestionSourceUrn = "urn:li:ingestionSource:12345";
+
+  @Test
+  public void injectPipelineNameWhenThere() {
+    String recipe = "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}},\"pipeline_name\":\"test\"}";
+
+    assertEquals(IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn), recipe);
+  }
+
+  @Test
+  public void injectPipelineNameWhenNotThere() {
+    String recipe = "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}}}";
+    recipe = IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn);
+
+    assertEquals(
+        recipe,
+        "{\"source\":{\"type\":\"snowflake\",\"config\":{\"stateful_ingestion\":{\"enabled\":true}}},\"pipeline_name\":\"urn:li:ingestionSource:12345\"}"
+    );
+  }
+
+  @Test
+  public void injectPipelineNameWhenNull() throws Exception {
+    String recipe = "{\"source\":{\"type\":\"snowflake\"},\"pipeline_name\":null}";
+    String injected = IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn);
+
+    // Compare the parsed value rather than the raw string so the test does not depend on key order.
+    assertEquals(new JSONObject(injected).getString("pipeline_name"), ingestionSourceUrn);
+  }
+
+  @Test
+  public void injectPipelineNameWhenEmpty() throws Exception {
+    String recipe = "{\"source\":{\"type\":\"snowflake\"},\"pipeline_name\":\"\"}";
+    String injected = IngestionUtils.injectPipelineName(recipe, ingestionSourceUrn);
+
+    assertEquals(new JSONObject(injected).getString("pipeline_name"), ingestionSourceUrn);
+  }
+}