diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowModels.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowModels.java index 1050c15cd5c..dc4b2b15add 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowModels.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowModels.java @@ -26,90 +26,105 @@ import java.util.Map; @Getter @Builder class AirflowAuthRequest { - String username; - String password; - @Builder.Default - String provider = "db"; - @Builder.Default - Boolean refresh = true; + String username; + String password; + @Builder.Default + String provider = "db"; + @Builder.Default + Boolean refresh = true; } @Getter class AirflowAuthResponse { - @JsonProperty("access_token") - String accessToken; - @JsonProperty("refresh_token") - String refreshToken; + @JsonProperty("access_token") + String accessToken; + @JsonProperty("refresh_token") + String refreshToken; +} + +@Getter +class AirflowDagRun { + String state; + String startDate; + String endDate; +} + +@Getter +class AirflowListResponse { + @JsonProperty("status") + String status; + @JsonProperty("next_run") + String nextRun; + @JsonProperty("dag_runs") + List dagRuns; } @Builder @Getter class OpenMetadataIngestionComponent { - String type; - Map config; + String type; + Map config; } @Builder @Getter class OpenMetadataIngestionConfig { - OpenMetadataIngestionComponent source; - OpenMetadataIngestionComponent sink; - @JsonProperty("metadata_server") - OpenMetadataIngestionComponent metadataServer; + OpenMetadataIngestionComponent source; + OpenMetadataIngestionComponent sink; + @JsonProperty("metadata_server") + OpenMetadataIngestionComponent metadataServer; } @Builder @Getter class IngestionTaskConfig { - @Builder.Default - @JsonProperty("python_callable_name") - String pythonCallableName = "metadata_ingestion_workflow"; + @Builder.Default + 
@JsonProperty("python_callable_name") + String pythonCallableName = "metadata_ingestion_workflow"; - @Builder.Default - @JsonProperty("python_callable_file") - String pythonCallableFile = "metadata_ingestion.py"; + @Builder.Default + @JsonProperty("python_callable_file") + String pythonCallableFile = "metadata_ingestion.py"; - @JsonProperty("op_kwargs") - Map opKwargs; + @JsonProperty("op_kwargs") + Map opKwargs; } @Builder @Getter class OpenMetadataIngestionTask { - String name; - @Builder.Default - String operator = "airflow.operators.python_operator.PythonOperator"; - IngestionTaskConfig config; - + String name; + @Builder.Default + String operator = "airflow.operators.python_operator.PythonOperator"; + IngestionTaskConfig config; } @Builder @Getter class IngestionPipeline { - String name; - @Builder.Default - Boolean forceDeploy = true; - @Builder.Default - Boolean pauseWorkflow = false; - String description; - @Builder.Default - Integer concurrency = 1; - @Builder.Default - Integer maxActiveRuns = 1; - @Builder.Default - Integer workflowTimeout = 60; - @Builder.Default - String workflowDefaultView = "tree"; - @Builder.Default - String orientation = "LR"; - String owner; - String startDate; - @Builder.Default - Integer retries = 3; - @Builder.Default - Integer retryDelay = 300; - @JsonProperty("schedule_interval") - String schedulerInterval; - List tasks; + String name; + @Builder.Default + Boolean forceDeploy = true; + @Builder.Default + Boolean pauseWorkflow = false; + String description; + @Builder.Default + Integer concurrency = 1; + @Builder.Default + Integer maxActiveRuns = 1; + @Builder.Default + Integer workflowTimeout = 60; + @Builder.Default + String workflowDefaultView = "tree"; + @Builder.Default + String orientation = "LR"; + String owner; + String startDate; + @Builder.Default + Integer retries = 3; + @Builder.Default + Integer retryDelay = 300; + String scheduleInterval; + List tasks; } \ No newline at end of file diff --git 
a/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowRESTClient.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowRESTClient.java index 52eee7d8063..fba8cb9c4f6 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowRESTClient.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowRESTClient.java @@ -20,6 +20,7 @@ import org.json.JSONObject; import org.openmetadata.catalog.CatalogApplicationConfig; import org.openmetadata.catalog.exception.IngestionPipelineDeploymentException; import org.openmetadata.catalog.operations.workflows.Ingestion; +import org.openmetadata.catalog.operations.workflows.IngestionStatus; import org.openmetadata.catalog.util.JsonUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,6 +34,8 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; public class AirflowRESTClient { private static final Logger LOG = LoggerFactory.getLogger(AirflowRESTClient.class); @@ -43,6 +46,7 @@ public class AirflowRESTClient { private final String authEndpoint = "%s/api/v1/security/login"; private final String deployEndPoint = "%s/rest_api/api?api=deploy_dag"; private final String triggerEndPoint = "%s/rest_api/api?api=trigger_dag"; + private final String statusEndPoint = "%s/rest_api/api?api=list_run&dag_id=%s"; private final String authHeader = "Bearer %s"; @@ -127,4 +131,38 @@ public class AirflowRESTClient { throw IngestionPipelineDeploymentException.byMessage(pipelineName, e.getMessage()); } } + + public Ingestion getStatus(Ingestion ingestion) { + try { + String token = authenticate(); + String authToken = String.format(this.authHeader, token); + String url = String.format(this.statusEndPoint, this.url, ingestion.getName()); + JSONObject requestPayload = new JSONObject(); + HttpRequest 
request = HttpRequest.newBuilder(URI.create(url)) + .header("Content-Type", "application/json") + .header("Authorization", authToken) + .POST(HttpRequest.BodyPublishers.ofString(requestPayload.toString())) + .build(); + HttpResponse response = client.send(request, HttpResponse.BodyHandlers.ofString()); + if (response.statusCode() == 200) { + AirflowListResponse airflowListResponse = JsonUtils.readValue(response.body(), + AirflowListResponse.class); + ingestion.setNextExecutionDate(airflowListResponse.getNextRun()); + List statuses = new ArrayList<>(); + for (AirflowDagRun dagRun: airflowListResponse.dagRuns) { + IngestionStatus ingestionStatus = new IngestionStatus().withState(dagRun.getState()) + .withStartDate(dagRun.getStartDate()).withEndDate(dagRun.getEndDate()); + statuses.add(ingestionStatus); + } + ingestion.setIngestionStatuses(statuses); + return ingestion; + } + + throw IngestionPipelineDeploymentException.byMessage(ingestion.getName(), + "Failed to fetch ingestion pipeline runs", + Response.Status.fromStatusCode(response.statusCode())); + } catch (Exception e) { + throw IngestionPipelineDeploymentException.byMessage(ingestion.getName(), e.getMessage()); + } + } } diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowUtils.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowUtils.java index 66a819c8204..3e7c6985582 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowUtils.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/ingestion/AirflowUtils.java @@ -100,7 +100,7 @@ public final class AirflowUtils { .forceDeploy(ingestion.getForceDeploy()) .pauseWorkflow(ingestion.getPauseWorkflow()) .owner(ingestion.getOwner().getName()) - .schedulerInterval(ingestion.getScheduleInterval()) + .scheduleInterval(ingestion.getScheduleInterval()) .concurrency(ingestion.getConcurrency()) .startDate(ingestion.getStartDate()) 
.tasks(taskList).build(); diff --git a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/IngestionResource.java b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/IngestionResource.java index f7dfad4649d..367ef73bfab 100644 --- a/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/IngestionResource.java +++ b/catalog-rest-service/src/main/java/org/openmetadata/catalog/resources/operations/IngestionResource.java @@ -42,6 +42,8 @@ import org.openmetadata.catalog.util.EntityUtil.Fields; import org.openmetadata.catalog.util.RestUtil; import org.openmetadata.catalog.util.RestUtil.PutResponse; import org.openmetadata.catalog.util.ResultList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.json.JsonPatch; import javax.validation.Valid; @@ -81,6 +83,8 @@ import java.util.UUID; @Consumes(MediaType.APPLICATION_JSON) @Collection(name = "ingestion") public class IngestionResource { + private static final Logger LOG = LoggerFactory.getLogger(IngestionResource.class); + public static final String COLLECTION_PATH = "operations/v1/ingestion/"; private final IngestionRepository dao; private final CatalogAuthorizer authorizer; @@ -126,7 +130,7 @@ public class IngestionResource { } } - static final String FIELDS = "owner,service,tags"; + static final String FIELDS = "owner,service,tags,status"; public static final List FIELD_LIST = Arrays.asList(FIELDS.replaceAll(" ", "") .split(",")); @@ -168,7 +172,11 @@ public class IngestionResource { } else { // Forward paging or first page ingestions = dao.listAfter(uriInfo, fields, null, limitParam, after); } - addHref(uriInfo, ingestions.getData()); + List ingestionList = ingestions.getData(); + if (fieldsParam != null && fieldsParam.contains("status")) { + ingestionList = addStatus(ingestions.getData()); + } + addHref(uriInfo, ingestionList); return ingestions; } @@ -205,7 +213,11 @@ public class IngestionResource { schema = 
@Schema(type = "string", example = FIELDS)) @QueryParam("fields") String fieldsParam) throws IOException, ParseException { Fields fields = new Fields(FIELD_LIST, fieldsParam); - return addHref(uriInfo, dao.get(uriInfo, id, fields)); + Ingestion ingestion = dao.get(uriInfo, id, fields); + if (fieldsParam != null && fieldsParam.contains("status")) { + ingestion = addStatus(ingestion); + } + return addHref(uriInfo, ingestion); } @GET @@ -245,7 +257,11 @@ public class IngestionResource { schema = @Schema(type = "string", example = FIELDS)) @QueryParam("fields") String fieldsParam) throws IOException, ParseException { Fields fields = new Fields(FIELD_LIST, fieldsParam); - return addHref(uriInfo, dao.getByName(uriInfo, fqn, fields)); + Ingestion ingestion = dao.getByName(uriInfo, fqn, fields); + if (fieldsParam != null && fieldsParam.contains("status")) { + ingestion = addStatus(ingestion); + } + return addHref(uriInfo, ingestion); } @@ -347,22 +363,23 @@ public class IngestionResource { private Ingestion getIngestion(SecurityContext securityContext, CreateIngestion create) { return new Ingestion().withId(UUID.randomUUID()).withName(create.getName()) - .withDisplayName(create.getDisplayName()) - .withDescription(create.getDescription()) - .withForceDeploy(create.getForceDeploy()) - .withConcurrency(create.getConcurrency()) - .withPauseWorkflow(create.getPauseWorkflow()) - .withStartDate(create.getStartDate()) - .withEndDate(create.getEndDate()) - .withRetries(create.getRetries()) - .withRetryDelay(create.getRetryDelay()) - .withConnectorConfig(create.getConnectorConfig()) - .withWorkflowCatchup(create.getWorkflowCatchup()) - .withTags(create.getTags()) - .withOwner(create.getOwner()) - .withService(create.getService()) - .withUpdatedBy(securityContext.getUserPrincipal().getName()) - .withUpdatedAt(new Date()); + .withDisplayName(create.getDisplayName()) + .withDescription(create.getDescription()) + .withForceDeploy(create.getForceDeploy()) + 
.withConcurrency(create.getConcurrency()) + .withPauseWorkflow(create.getPauseWorkflow()) + .withStartDate(create.getStartDate()) + .withEndDate(create.getEndDate()) + .withRetries(create.getRetries()) + .withRetryDelay(create.getRetryDelay()) + .withConnectorConfig(create.getConnectorConfig()) + .withWorkflowCatchup(create.getWorkflowCatchup()) + .withScheduleInterval(create.getScheduleInterval()) + .withTags(create.getTags()) + .withOwner(create.getOwner()) + .withService(create.getService()) + .withUpdatedBy(securityContext.getUserPrincipal().getName()) + .withUpdatedAt(new Date()); } private void deploy(Ingestion ingestion) { @@ -370,4 +387,18 @@ public class IngestionResource { airflowRESTClient.deploy(ingestion, config); } } + + public List addStatus(List ingestions) { + Optional.ofNullable(ingestions).orElse(Collections.emptyList()).forEach(this::addStatus); + return ingestions; + } + + private Ingestion addStatus(Ingestion ingestion) { + try { + ingestion = airflowRESTClient.getStatus(ingestion); + } catch (Exception e) { + LOG.error("Failed to fetch status for {}", ingestion.getName()); + } + return ingestion; + } } diff --git a/catalog-rest-service/src/main/resources/json/schema/operations/workflows/ingestion.json b/catalog-rest-service/src/main/resources/json/schema/operations/workflows/ingestion.json index 7d23d658c49..eed1d934432 100644 --- a/catalog-rest-service/src/main/resources/json/schema/operations/workflows/ingestion.json +++ b/catalog-rest-service/src/main/resources/json/schema/operations/workflows/ingestion.json @@ -120,6 +120,25 @@ "default": null } } + }, + "ingestionStatus": { + "type": "object", + "javaType": "org.openmetadata.catalog.operations.workflows.IngestionStatus", + "description": "This defines the runtime status of Ingestion.", + "properties": { + "state": { + "description": "Workflow state denoting whether the run failed or succeeded.", + "type": "string" + }, + "startDate": { + "description": "startDate of the Ingestion Pipeline run for 
this particular execution", + "type": "string" + }, + "endDate": { + "description": "endDate of the Ingestion pipeline run for this particular execution.", + "type": "string" + } + } } }, "properties" : { @@ -183,6 +202,10 @@ "description": "End Date of the workflow.", "$ref": "../../type/basic.json#/definitions/date" }, + "nextExecutionDate": { + "description": "Next execution date from the underlying workflow platform once the ingestion is scheduled.", + "$ref": "../../type/basic.json#/definitions/date" + }, "workflowTimezone": { "description": "Timezone in which workflow going to be scheduled.", "type": "string", @@ -215,6 +238,14 @@ "connectorConfig": { "$ref": "#/definitions/connectorConfig" }, + "ingestionStatuses": { + "description": "List of executions and status for the Ingestion Pipeline.", + "type": "array", + "items": { + "$ref": "#/definitions/ingestionStatus" + }, + "default": null + }, "service" : { "description": "Link to the database service where this database is hosted in.", "$ref" : "../../type/entityReference.json" diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index f4b25ccd47e..d8e65111180 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -199,7 +199,7 @@ - +