MINOR: Update Default Configurations (#20398)

* Update Default Configurations

* Update Airflow Default Scheduling

* Update Airflow Default Scheduling
This commit is contained in:
IceS2 2025-03-25 05:53:01 -03:00 committed by GitHub
parent 662741e496
commit 254420bbb1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 61 additions and 17 deletions

View File

@ -46,6 +46,7 @@ public class CreateAndRunIngestionPipelineTask implements NodeInterface {
ServiceTask runIngestionPipeline =
getRunIngestionPipelineServiceTask(
subProcessId,
nodeDefinition.getConfig().getShouldRun(),
nodeDefinition.getConfig().getWaitForCompletion(),
nodeDefinition.getConfig().getTimeoutSeconds(),
JsonUtils.pojoToJson(nodeDefinition.getInputNamespaceMap()));
@ -79,9 +80,15 @@ public class CreateAndRunIngestionPipelineTask implements NodeInterface {
private ServiceTask getRunIngestionPipelineServiceTask(
String subProcessId,
boolean shouldRun,
boolean waitForCompletion,
long timeoutSeconds,
String inputNamespaceMap) {
FieldExtension shouldRunExpr =
new FieldExtensionBuilder()
.fieldName("shouldRunExpr")
.fieldValue(String.valueOf(shouldRun))
.build();
FieldExtension waitExpr =
new FieldExtensionBuilder()
.fieldName("waitForCompletionExpr")
@ -108,6 +115,7 @@ public class CreateAndRunIngestionPipelineTask implements NodeInterface {
return new ServiceTaskBuilder()
.id(getFlowableElementId(subProcessId, "triggerIngestionWorkflow"))
.implementation(RunIngestionPipelineDelegate.class.getName())
.addFieldExtension(shouldRunExpr)
.addFieldExtension(waitExpr)
.addFieldExtension(timeoutSecondsExpr)
.addFieldExtension(inputNamespaceMapExpr)

View File

@ -50,6 +50,7 @@ import org.openmetadata.service.util.JsonUtils;
@Slf4j
public class CreateIngestionPipelineImpl {
private static final List<String> DEFAULT_TIERS_TO_PROCESS = List.of("Tier1", "Tier2");
private static final Map<PipelineType, String> SUPPORT_FEATURE_MAP = new HashMap<>();
static {
@ -209,9 +210,7 @@ public class CreateIngestionPipelineImpl {
org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline create =
new org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline()
.withAirflowConfig(
new AirflowConfig()
.withStartDate(getYesterdayDate())
.withScheduleInterval("0 0 * * 0")) // Run every Sunday at midnight by default
getAirflowConfig(pipelineType)) // Run every Sunday at midnight by default
.withLoggerLevel(LogLevels.INFO)
.withName(UUID.randomUUID().toString())
.withDisplayName(displayName)
@ -225,6 +224,21 @@ public class CreateIngestionPipelineImpl {
return repository.create(null, ingestionPipeline);
}
private AirflowConfig getAirflowConfig(PipelineType pipelineType) {
String scheduleInterval = "0 0 * * 0";
if (List.of(PipelineType.LINEAGE, PipelineType.USAGE).contains(pipelineType)) {
scheduleInterval = "0 2 * * 0";
} else if (List.of(PipelineType.PROFILER, PipelineType.AUTO_CLASSIFICATION)
.contains(pipelineType)) {
scheduleInterval = "0 4 * * 0";
}
return new AirflowConfig()
.withStartDate(getYesterdayDate())
.withScheduleInterval(scheduleInterval);
}
private IngestionPipeline getIngestionPipeline(
IngestionPipelineRepository repository,
PipelineType pipelineType,
@ -303,7 +317,9 @@ public class CreateIngestionPipelineImpl {
return new DatabaseServiceProfilerPipeline()
.withDatabaseFilterPattern(defaultFilters.get(DATABASE_FILTER_PATTERN))
.withSchemaFilterPattern(defaultFilters.get(SCHEMA_FILTER_PATTERN))
.withTableFilterPattern(defaultFilters.get(TABLE_FILTER_PATTERN));
.withTableFilterPattern(defaultFilters.get(TABLE_FILTER_PATTERN))
.withClassificationFilterPattern(
new FilterPattern().withIncludes(DEFAULT_TIERS_TO_PROCESS));
}
private static DatabaseServiceAutoClassificationPipeline
@ -311,7 +327,9 @@ public class CreateIngestionPipelineImpl {
return new DatabaseServiceAutoClassificationPipeline()
.withDatabaseFilterPattern(defaultFilters.get(DATABASE_FILTER_PATTERN))
.withSchemaFilterPattern(defaultFilters.get(SCHEMA_FILTER_PATTERN))
.withTableFilterPattern(defaultFilters.get(TABLE_FILTER_PATTERN));
.withTableFilterPattern(defaultFilters.get(TABLE_FILTER_PATTERN))
.withClassificationFilterPattern(new FilterPattern().withIncludes(DEFAULT_TIERS_TO_PROCESS))
.withEnableAutoClassification(true);
}
// Other Services Metadata Pipelines

View File

@ -23,6 +23,7 @@ import org.openmetadata.service.util.JsonUtils;
public class RunIngestionPipelineDelegate implements JavaDelegate {
private Expression inputNamespaceMapExpr;
private Expression pipelineServiceClientExpr;
private Expression shouldRunExpr;
private Expression waitForCompletionExpr;
private Expression timeoutSecondsExpr;
@ -33,18 +34,25 @@ public class RunIngestionPipelineDelegate implements JavaDelegate {
Map<String, String> inputNamespaceMap =
JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class);
boolean shouldRun = Boolean.parseBoolean((String) shouldRunExpr.getValue(execution));
boolean waitForCompletion =
Boolean.parseBoolean((String) waitForCompletionExpr.getValue(execution));
long timeoutSeconds = Long.parseLong((String) timeoutSecondsExpr.getValue(execution));
PipelineServiceClientInterface pipelineServiceClient =
(PipelineServiceClientInterface) pipelineServiceClientExpr.getValue(execution);
boolean result = true;
UUID ingestionPipelineId = (UUID) varHandler.getNodeVariable(INGESTION_PIPELINE_ID_VARIABLE);
if (shouldRun) {
PipelineServiceClientInterface pipelineServiceClient =
(PipelineServiceClientInterface) pipelineServiceClientExpr.getValue(execution);
boolean result =
new RunIngestionPipelineImpl(pipelineServiceClient)
.execute(ingestionPipelineId, waitForCompletion, timeoutSeconds);
UUID ingestionPipelineId =
(UUID) varHandler.getNodeVariable(INGESTION_PIPELINE_ID_VARIABLE);
result =
new RunIngestionPipelineImpl(pipelineServiceClient)
.execute(ingestionPipelineId, waitForCompletion, timeoutSeconds);
}
varHandler.setNodeVariable(RESULT_VARIABLE, getResultFromBoolean(result));
if (!result) {

View File

@ -100,7 +100,8 @@ public class RunAppImpl {
.withFilter(
String.format(
"{\"query\":{\"bool\":{\"must\":[{\"bool\":{\"must\":[{\"term\":{\"Tier.TagFQN\":\"Tier.Tier1\"}},{\"term\":{\"entityType\":\"table\"}},{\"term\":{\"service.displayName.keyword\":\"%s\"}}]}}]}}}",
service.getName()));
service.getName()))
.withPatchIfEmpty(true);
case "CollateAIQualityAgentApplication" -> config =
(JsonUtils.convertValue(config, CollateAIQualityAgentAppConfig.class))
.withFilter(
@ -112,7 +113,8 @@ public class RunAppImpl {
.withFilter(
String.format(
"{\"query\":{\"bool\":{\"must\":[{\"bool\":{\"must\":[{\"term\":{\"entityType\":\"table\"}},{\"term\":{\"service.displayName.keyword\":\"%s\"}}]}}]}}}",
service.getName()));
service.getName()))
.withPatchIfEmpty(true);
case "DataInsightsApplication" -> {
DataInsightsAppConfig updatedAppConfig =
(JsonUtils.convertValue(config, DataInsightsAppConfig.class));

View File

@ -95,8 +95,7 @@
"displayName": "Run Profiler Ingestion Pipeline",
"config": {
"pipelineType": "profiler",
"waitForCompletion": true,
"timeoutSeconds": 3600
"shouldRun": false
},
"inputNamespaceMap": {
"relatedEntity": "global"
@ -109,8 +108,7 @@
"displayName": "Run AutoClassification Ingestion Pipeline",
"config": {
"pipelineType": "autoClassification",
"waitForCompletion": true,
"timeoutSeconds": 3600
"shouldRun": false
},
"inputNamespaceMap": {
"relatedEntity": "global"

View File

@ -17,6 +17,12 @@
"description": "Define which ingestion pipeline type should be created",
"$ref": "../../../../../entity/services/ingestionPipelines/ingestionPipeline.json#/definitions/pipelineType"
},
"shouldRun": {
"title": "Run the Ingestion Pipeline",
"description": "If True, it will be created and run. Otherwise it will just be created.",
"type": "boolean",
"default": true
},
"waitForCompletion": {
"title": "Wait for Completion",
"description": "Set if this step should wait until the Ingestion Pipeline finishes running",

View File

@ -40,6 +40,10 @@ export interface Config {
* Define which ingestion pipeline type should be created
*/
pipelineType: PipelineType;
/**
* If True, it will be created and run. Otherwise it will just be created.
*/
shouldRun?: boolean;
/**
* Set the amount of seconds to wait before defining the Ingestion Pipeline has timed out.
*/