Fix #1171: return list of status for ingestion pipeline and next scheduled run (#1175)

* Fix #1172: return list of status for ingestion pipeline and next scheduled run
This commit is contained in:
Sriharsha Chintalapani 2021-11-13 17:43:17 -08:00 committed by GitHub
parent e2578d6be3
commit f81fb3d5e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 191 additions and 76 deletions

View File

@@ -26,90 +26,105 @@ import java.util.Map;
@Getter
@Builder
class AirflowAuthRequest {
String username;
String password;
@Builder.Default
String provider = "db";
@Builder.Default
Boolean refresh = true;
}
@Getter
class AirflowAuthResponse {
@JsonProperty("access_token")
String accessToken;
@JsonProperty("refresh_token")
String refreshToken;
@JsonProperty("access_token")
String accessToken;
@JsonProperty("refresh_token")
String refreshToken;
}
@Getter
class AirflowDagRun {
String state;
String startDate;
String endDate;
}
@Getter
class AirflowListResponse {
@JsonProperty("status")
String status;
@JsonProperty("next_run")
String nextRun;
@JsonProperty("dag_runs")
List<AirflowDagRun> dagRuns;
}
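
For reference, a list_run response from the Airflow REST plugin that these classes deserialize would look roughly like the following; the top-level keys follow the @JsonProperty annotations above, while the values and timestamp format are purely illustrative:

{
  "status": "success",
  "next_run": "2021-11-14 02:00:00+00:00",
  "dag_runs": [
    { "state": "success", "startDate": "2021-11-13 02:00:05+00:00", "endDate": "2021-11-13 02:02:41+00:00" },
    { "state": "failed", "startDate": "2021-11-12 02:00:03+00:00", "endDate": "2021-11-12 02:00:47+00:00" }
  ]
}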
@Builder
@Getter
class OpenMetadataIngestionComponent {
String type;
Map<String, Object> config;
}
@Builder
@Getter
class OpenMetadataIngestionConfig {
OpenMetadataIngestionComponent source;
OpenMetadataIngestionComponent sink;
@JsonProperty("metadata_server")
OpenMetadataIngestionComponent metadataServer;
}
@Builder
@Getter
class IngestionTaskConfig {
@Builder.Default
@JsonProperty("python_callable_name")
String pythonCallableName = "metadata_ingestion_workflow";
@Builder.Default
@JsonProperty("python_callable_file")
String pythonCallableFile = "metadata_ingestion.py";
@JsonProperty("op_kwargs")
Map<String, Object> opKwargs;
}
@Builder
@Getter
class OpenMetadataIngestionTask {
String name;
@Builder.Default
String operator = "airflow.operators.python_operator.PythonOperator";
IngestionTaskConfig config;
}
@Builder
@Getter
class IngestionPipeline {
String name;
@Builder.Default
Boolean forceDeploy = true;
@Builder.Default
Boolean pauseWorkflow = false;
String description;
@Builder.Default
Integer concurrency = 1;
@Builder.Default
Integer maxActiveRuns = 1;
@Builder.Default
Integer workflowTimeout = 60;
@Builder.Default
String workflowDefaultView = "tree";
@Builder.Default
String orientation = "LR";
String owner;
String startDate;
@Builder.Default
Integer retries = 3;
@Builder.Default
Integer retryDelay = 300;
@JsonProperty("schedule_interval")
String schedulerInterval;
String scheduleInterval;
List<OpenMetadataIngestionTask> tasks;
}
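
A minimal sketch of assembling a pipeline definition through the Lombok builder; every name and value below is illustrative, and the @Builder.Default values above fill in whatever is not set explicitly:

// task is an OpenMetadataIngestionTask assumed to be built the same way
IngestionPipeline pipeline = IngestionPipeline.builder()
    .name("sample_metadata_ingestion")
    .description("Hourly metadata ingestion for the sample database")
    .owner("admin")
    .startDate("2021-11-13")
    .scheduleInterval("0 * * * *")
    .tasks(List.of(task))
    .build();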

View File

@@ -20,6 +20,7 @@ import org.json.JSONObject;
import org.openmetadata.catalog.CatalogApplicationConfig;
import org.openmetadata.catalog.exception.IngestionPipelineDeploymentException;
import org.openmetadata.catalog.operations.workflows.Ingestion;
import org.openmetadata.catalog.operations.workflows.IngestionStatus;
import org.openmetadata.catalog.util.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,6 +34,8 @@ import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
public class AirflowRESTClient {
private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class);
@@ -43,6 +46,7 @@ public class AirflowRESTClient {
private final String authEndpoint = "%s/api/v1/security/login";
private final String deployEndPoint = "%s/rest_api/api?api=deploy_dag";
private final String triggerEndPoint = "%s/rest_api/api?api=trigger_dag";
private final String statusEndPoint = "%s/rest_api/api?api=list_run&dag_id=%s";
private final String authHeader = "Bearer %s";
@@ -127,4 +131,38 @@ public class AirflowRESTClient {
throw IngestionPipelineDeploymentException.byMessage(pipelineName, e.getMessage());
}
}
public Ingestion getStatus(Ingestion ingestion) {
try {
String token = authenticate();
String authToken = String.format(this.authHeader, token);
String url = String.format(this.statusEndPoint, this.url, ingestion.getName());
JSONObject requestPayload = new JSONObject();
HttpRequest request = HttpRequest.newBuilder(URI.create(url))
.header("Content-Type", "application/json")
.header("Authorization", authToken)
.POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString()))
.build();
HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
if (response.statusCode() == 200) {
AirflowListResponse airflowListResponse = JsonUtils.readValue(response.body(),
AirflowListResponse.class);
ingestion.setNextExecutionDate(airflowListResponse.getNextRun());
List<IngestionStatus> statuses = new ArrayList<>();
for (AirflowDagRun dagRun : airflowListResponse.getDagRuns()) {
IngestionStatus ingestionStatus = new IngestionStatus().withState(dagRun.getState())
.withStartDate(dagRun.getStartDate()).withEndDate(dagRun.getEndDate());
statuses.add(ingestionStatus);
}
ingestion.setIngestionStatuses(statuses);
return ingestion;
}
throw IngestionPipelineDeploymentException.byMessage(ingestion.getName(),
"Failed to fetch ingestion pipeline runs",
Response.Status.fromStatusCode(response.statusCode()));
} catch (Exception e) {
throw IngestionPipelineDeploymentException.byMessage(ingestion.getName(), e.getMessage());
}
}
}
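
A rough sketch of how a caller might use getStatus; only the method itself is confirmed by this diff, the surrounding setup is assumed:

Ingestion enriched = airflowRESTClient.getStatus(ingestion);
LOG.info("Next run: {}", enriched.getNextExecutionDate());
enriched.getIngestionStatuses().forEach(s ->
    LOG.info("Run {} -> {}", s.getStartDate(), s.getState()));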

View File

@@ -100,7 +100,7 @@ public final class AirflowUtils {
.forceDeploy(ingestion.getForceDeploy())
.pauseWorkflow(ingestion.getPauseWorkflow())
.owner(ingestion.getOwner().getName())
.schedulerInterval(ingestion.getScheduleInterval())
.scheduleInterval(ingestion.getScheduleInterval())
.concurrency(ingestion.getConcurrency())
.startDate(ingestion.getStartDate())
.tasks(taskList).build();

View File

@@ -42,6 +42,8 @@ import org.openmetadata.catalog.util.EntityUtil.Fields;
import org.openmetadata.catalog.util.RestUtil;
import org.openmetadata.catalog.util.RestUtil.PutResponse;
import org.openmetadata.catalog.util.ResultList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.json.JsonPatch;
import javax.validation.Valid;
@@ -81,6 +83,8 @@ import java.util.UUID;
@Consumes(MediaType.APPLICATION_JSON)
@Collection(name = "ingestion")
public class IngestionResource {
private static final Logger LOG = LoggerFactory.getLogger(IngestionResource.class);
public static final String COLLECTION_PATH = "operations/v1/ingestion/";
private final IngestionRepository dao;
private final CatalogAuthorizer authorizer;
@@ -126,7 +130,7 @@ public class IngestionResource {
}
}
static final String FIELDS = "owner,service,tags";
static final String FIELDS = "owner,service,tags,status";
public static final List<String> FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "")
.split(","));
@@ -168,7 +172,11 @@ public class IngestionResource {
} else { // Forward paging or first page
ingestions = dao.listAfter(uriInfo, fields, null, limitParam, after);
}
addHref(uriInfo, ingestions.getData());
List<Ingestion> ingestionList = ingestions.getData();
if (fieldsParam != null && fieldsParam.contains("status")) {
ingestionList = addStatus(ingestions.getData());
}
addHref(uriInfo, ingestionList);
return ingestions;
}
@@ -205,7 +213,11 @@ public class IngestionResource {
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields") String fieldsParam) throws IOException, ParseException {
Fields fields = new Fields(FIELD_LIST, fieldsParam);
return addHref(uriInfo, dao.get(uriInfo, id, fields));
Ingestion ingestion = dao.get(uriInfo, id, fields);
if (fieldsParam != null && fieldsParam.contains("status")) {
ingestion = addStatus(ingestion);
}
return addHref(uriInfo, ingestion);
}
@GET
@@ -245,7 +257,11 @@ public class IngestionResource {
schema = @Schema(type = "string", example = FIELDS))
@QueryParam("fields") String fieldsParam) throws IOException, ParseException {
Fields fields = new Fields(FIELD_LIST, fieldsParam);
return addHref(uriInfo, dao.getByName(uriInfo, fqn, fields));
Ingestion ingestion = dao.getByName(uriInfo, fqn, fields);
if (fieldsParam != null && fieldsParam.contains("status")) {
ingestion = addStatus(ingestion);
}
return addHref(uriInfo, ingestion);
}
@@ -347,22 +363,23 @@ public class IngestionResource {
private Ingestion getIngestion(SecurityContext securityContext, CreateIngestion create) {
return new Ingestion().withId(UUID.randomUUID()).withName(create.getName())
.withDisplayName(create.getDisplayName())
.withDescription(create.getDescription())
.withForceDeploy(create.getForceDeploy())
.withConcurrency(create.getConcurrency())
.withPauseWorkflow(create.getPauseWorkflow())
.withStartDate(create.getStartDate())
.withEndDate(create.getEndDate())
.withRetries(create.getRetries())
.withRetryDelay(create.getRetryDelay())
.withConnectorConfig(create.getConnectorConfig())
.withWorkflowCatchup(create.getWorkflowCatchup())
.withScheduleInterval(create.getScheduleInterval())
.withTags(create.getTags())
.withOwner(create.getOwner())
.withService(create.getService())
.withUpdatedBy(securityContext.getUserPrincipal().getName())
.withUpdatedAt(new Date());
}
private void deploy(Ingestion ingestion) {
@@ -370,4 +387,18 @@ public class IngestionResource {
airflowRESTClient.deploy(ingestion, config);
}
}
public List<Ingestion> addStatus(List<Ingestion> ingestions) {
Optional.ofNullable(ingestions).orElse(Collections.emptyList()).forEach(this::addStatus);
return ingestions;
}
private Ingestion addStatus(Ingestion ingestion) {
try {
ingestion = airflowRESTClient.getStatus(ingestion);
} catch (Exception e) {
LOG.error("Failed to fetch status for {}", ingestion.getName(), e);
}
return ingestion;
}
}
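
With these changes a client opts in to the extra Airflow lookups through the fields query parameter, along these lines (the path is assembled from COLLECTION_PATH, so the exact prefix may differ per deployment):

GET /api/operations/v1/ingestion/{id}?fields=owner,service,tags,status

When fields contains "status", addStatus calls Airflow once per returned pipeline; a failed lookup is logged and the entity is returned without statuses rather than failing the whole request.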

View File

@@ -120,6 +120,25 @@
"default": null
}
}
},
"ingestionStatus": {
"type": "object",
"javaType": "org.openmetadata.catalog.operations.workflows.IngestionStatus",
"description": "This defines the runtime status of Ingestion.",
"properties": {
"state": {
"description": "Workflow status denotes if its failed or succeeded",
"type": "string"
},
"startDate": {
"description": "startDate of the Ingestion Pipeline run for this particular execution",
"type": "string"
},
"endDate": {
"description": "endDate of the Ingestion pipeline run for this particular execution.",
"type": "string"
}
}
}
},
"properties" : {
@@ -183,6 +202,10 @@
"description": "End Date of the workflow.",
"$ref": "../../type/basic.json#/definitions/date"
},
"nextExecutionDate": {
"description": "Next execution date from the underlying workflow platform once the ingestion scheduled.",
"$ref": "../../type/basic.json#/definitions/date"
},
"workflowTimezone": {
"description": "Timezone in which workflow going to be scheduled.",
"type": "string",
@@ -215,6 +238,14 @@
"connectorConfig": {
"$ref": "#/definitions/connectorConfig"
},
"ingestionStatuses": {
"description": "List of executions and status for the Ingestion Pipeline.",
"type": "array",
"items": {
"$ref": "#/definitions/ingestionStatus"
},
"default": null
},
"service" : {
"description": "Link to the database service where this database is hosted in.",
"$ref" : "../../type/entityReference.json"

View File

@@ -199,7 +199,7 @@
<module name="ArrayTypeStyle"/>
<module name="Indentation">
<property name="basicOffset" value="2"/>
<property name="caseIndent" value="2"/>
<property name="caseIndent" value="4"/>
</module>
<!--<module name="TodoComment"/>-->
<module name="UpperEll"/>