fix(ingestion) Set pipeline_name on UI recipes with forms (#5535)

This commit is contained in:
Chris Collins 2022-08-01 19:26:46 -04:00 committed by GitHub
parent d0f0fae431
commit 3b85c79ada
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 47 additions and 45 deletions

View File

@ -20,6 +20,10 @@ import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import lombok.extern.slf4j.Slf4j;
import org.json.JSONException;
import org.json.JSONObject;
import java.net.URISyntaxException;
import java.util.Optional;
import java.util.UUID;
@ -31,6 +35,7 @@ import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
/**
* Creates or updates an ingestion source. Requires the MANAGE_INGESTION privilege.
*/
@Slf4j
public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFuture<String>> {
private final EntityClient _entityClient;
@ -51,6 +56,7 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
final UpdateIngestionSourceInput input = bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);
final MetadataChangeProposal proposal = new MetadataChangeProposal();
String ingestionSourceUrnString;
if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
@ -61,6 +67,7 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
ingestionSourceUrnString = ingestionSourceUrn.get();
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
@ -71,10 +78,11 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal.setEntityKeyAspect(GenericRecordUtils.serializeAspect(key));
ingestionSourceUrnString = String.format("urn:li:dataHubIngestionSource:%s", uuidStr);
}
// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input, ingestionSourceUrnString);
proposal.setEntityType(Constants.INGESTION_SOURCE_ENTITY_NAME);
proposal.setAspectName(Constants.INGESTION_INFO_ASPECT_NAME);
proposal.setAspect(GenericRecordUtils.serializeAspect(info));
@ -90,20 +98,24 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
});
}
private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input) {
private DataHubIngestionSourceInfo mapIngestionSourceInfo(final UpdateIngestionSourceInput input, final String ingestionSourceUrn) {
final DataHubIngestionSourceInfo result = new DataHubIngestionSourceInfo();
result.setType(input.getType());
result.setName(input.getName());
result.setConfig(mapConfig(input.getConfig()));
result.setConfig(mapConfig(input.getConfig(), ingestionSourceUrn));
if (input.getSchedule() != null) {
result.setSchedule(mapSchedule(input.getSchedule()));
}
return result;
}
private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input) {
private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfigInput input, final String ingestionSourceUrn) {
final DataHubIngestionSourceConfig result = new DataHubIngestionSourceConfig();
result.setRecipe(input.getRecipe());
String recipe = input.getRecipe();
if (recipe != null) {
recipe = optionallySetPipelineName(recipe, ingestionSourceUrn);
}
result.setRecipe(recipe);
if (input.getVersion() != null) {
result.setVersion(input.getVersion());
}
@ -119,4 +131,19 @@ public class UpsertIngestionSourceResolver implements DataFetcher<CompletableFut
result.setTimezone(input.getTimezone());
return result;
}
private String optionallySetPipelineName(String recipe, String ingestionSourceUrn) {
try {
JSONObject jsonRecipe = new JSONObject(recipe);
boolean hasPipelineName = jsonRecipe.has("pipeline_name") && jsonRecipe.get("pipeline_name") != null && !jsonRecipe.get("pipeline_name").equals("");
if (!hasPipelineName) {
jsonRecipe.put("pipeline_name", ingestionSourceUrn);
recipe = jsonRecipe.toString();
}
} catch (JSONException e) {
log.warn("Error parsing ingestion recipe in JSON form", e);
}
return recipe;
}
}

View File

@ -301,25 +301,6 @@ export const INCLUDE_LINEAGE: RecipeField = {
},
};
export const IGNORE_START_TIME_LINEAGE: RecipeField = {
name: 'ignore_start_time_lineage',
label: 'Ignore Start Time Lineage',
tooltip: 'Get all lineage by ignoring the start_time field. It is suggested to set to true initially.',
type: FieldType.BOOLEAN,
fieldPath: 'source.config.ignore_start_time_lineage',
rules: null,
};
export const CHECK_ROLE_GRANTS: RecipeField = {
name: 'check_role_grants',
label: 'Check Role Grants',
tooltip:
'If set to True then checks role grants at the beginning of the ingestion run. To be used for debugging purposes. If you think everything is working fine then set it to False. In some cases this can take long depending on how many roles you might have.',
type: FieldType.BOOLEAN,
fieldPath: 'source.config.check_role_grants',
rules: null,
};
export const PROFILING_ENABLED: RecipeField = {
name: 'profiling.enabled',
label: 'Enable Profiling',
@ -341,7 +322,7 @@ export const STATEFUL_INGESTION_ENABLED: RecipeField = {
export const UPSTREAM_LINEAGE_IN_REPORT: RecipeField = {
name: 'upstream_lineage_in_report',
label: 'Include Upstream Lineage In Report.',
tooltip: 'Useful for debugging lineage information. Set to True to see the raw lineage created internally.',
tooltip: 'Remove stale datasets from datahub once they have been deleted in the source.',
type: FieldType.BOOLEAN,
fieldPath: 'source.config.upstream_lineage_in_report',
rules: null,
@ -613,13 +594,7 @@ export const DASHBOARD_DENY: RecipeField = {
export const RECIPE_FIELDS = {
[SNOWFLAKE]: {
fields: [SNOWFLAKE_ACCOUNT_ID, SNOWFLAKE_WAREHOUSE, SNOWFLAKE_USERNAME, SNOWFLAKE_PASSWORD, SNOWFLAKE_ROLE],
advancedFields: [
INCLUDE_LINEAGE,
IGNORE_START_TIME_LINEAGE,
CHECK_ROLE_GRANTS,
PROFILING_ENABLED,
STATEFUL_INGESTION_ENABLED,
],
advancedFields: [INCLUDE_LINEAGE, PROFILING_ENABLED, STATEFUL_INGESTION_ENABLED],
filterFields: [
TABLE_ALLOW,
TABLE_DENY,

View File

@ -10,15 +10,17 @@ source:
# Credentials
credential:
project_id: # Your BQ project id, e.g. sample_project_id
# Add secret in Secrets Tab with the relevant names for each variable below
# Your BQ private key id, e.g. "d0121d0000882411234e11166c6aaa23ed5d74e0"
private_key_id: "\${BQ_PRIVATE_KEY_ID}"
# Your BQ private key, e.g. "-----BEGIN PRIVATE KEY-----\\nMIIyourkey\\n-----END PRIVATE KEY-----\\n"
private_key: "\${BQ_PRIVATE_KEY}"
client_email: # Your BQ client email, e.g. "test@suppproject-id-1234567.iam.gserviceaccount.com"
client_id: # Your BQ client id, e.g. "123456678890"
include_table_lineage: true
include_view_lineage: true
profiling:
enabled: true
stateful_ingestion:
enabled: true
`;
export const BIGQUERY = 'bigquery';

View File

@ -14,13 +14,13 @@ source:
username: "\${REDSHIFT_USERNAME}" # Your Redshift username, e.g. admin
password: "\${REDSHIFT_PASSWORD}" # Your Redshift password, e.g. password_01
# Options
include_tables: True
include_views: True
# Profiling
table_lineage_mode: stl_scan_based
include_table_lineage: true
include_view_lineage: true
profiling:
enabled: false
enabled: true
stateful_ingestion:
enabled: true
`;
export const REDSHIFT = 'redshift';

View File

@ -8,10 +8,8 @@ source:
account_id: "example_id"
warehouse: "example_warehouse"
role: "datahub_role"
ignore_start_time_lineage: true
include_table_lineage: true
include_view_lineage: true
check_role_grants: true
profiling:
enabled: true
stateful_ingestion: