[WIP] - Fixes #10725 - Fix extension in ingestion pipeline and delete statuses api (#10866)

* Fix extension in ingestion pipeline and delete statuses api

* Add tests for ingestion pipeline status

* Format
This commit is contained in:
Pere Miquel Brull 2023-04-06 21:12:18 +02:00 committed by GitHub
parent 0206af0303
commit ac9979070a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 239 additions and 12 deletions

View File

@ -141,4 +141,10 @@ SET json = JSON_INSERT(
'$.connection.config.databaseName', JSON_EXTRACT(json, '$.connection.config.database')
)
where serviceType = 'Druid'
and JSON_EXTRACT(json, '$.connection.config.database') is not null;
and JSON_EXTRACT(json, '$.connection.config.database') is not null;
-- We were using the same jsonSchema for Pipeline Services and Ingestion Pipeline status
-- Also, we relied on the extension to store the run id
UPDATE entity_extension_time_series
SET jsonSchema = 'ingestionPipelineStatus', extension = 'ingestionPipeline.pipelineStatus'
WHERE jsonSchema = 'pipelineStatus' AND extension <> 'pipeline.PipelineStatus';

View File

@ -141,4 +141,10 @@ CREATE TABLE IF NOT EXISTS dashboard_data_model_entity (
UPDATE dbservice_entity
SET json = jsonb_set(json::jsonb #- '{connection,config,database}', '{connection,config,databaseName}', json#> '{connection,config,database}', true)
WHERE servicetype = 'Druid' and json #>'{connection,config,database}' is not null;
WHERE servicetype = 'Druid' and json #>'{connection,config,database}' is not null;
-- We were using the same jsonSchema for Pipeline Services and Ingestion Pipeline status
-- Also, we relied on the extension to store the run id
UPDATE entity_extension_time_series
SET jsonSchema = 'ingestionPipelineStatus', extension = 'ingestionPipeline.pipelineStatus'
WHERE jsonSchema = 'pipelineStatus' AND extension <> 'pipeline.PipelineStatus';

View File

@ -3320,6 +3320,102 @@ public interface CollectionDAO {
@Bind("endTs") long endTs,
@Define("orderBy") OrderBy orderBy);
default void updateExtensionByKey(String key, String value, String entityFQN, String extension, String json) {
String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key);
String psqlCond = String.format("AND json->>'%s' = :value", key);
updateExtensionByKeyInternal(value, entityFQN, extension, json, mysqlCond, psqlCond);
}
default String getExtensionByKey(String key, String value, String entityFQN, String extension) {
String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key);
String psqlCond = String.format("AND json->>'%s' = :value", key);
return getExtensionByKeyInternal(value, entityFQN, extension, mysqlCond, psqlCond);
}
default String getLatestExtensionByKey(String key, String value, String entityFQN, String extension) {
String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key);
String psqlCond = String.format("AND json->>'%s' = :value", key);
return getLatestExtensionByKeyInternal(value, entityFQN, extension, mysqlCond, psqlCond);
}
/*
* Support updating data filtering by top-level keys in the JSON
*/
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series SET json = :json "
+ "WHERE entityFQN = :entityFQN "
+ "AND extension = :extension "
+ "<mysqlCond>",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series SET json = (:json :: jsonb) "
+ "WHERE entityFQN = :entityFQN "
+ "AND extension = :extension "
+ "<psqlCond>",
connectionType = POSTGRES)
void updateExtensionByKeyInternal(
@Bind("value") String value,
@Bind("entityFQN") String entityFQN,
@Bind("extension") String extension,
@Bind("json") String json,
@Define("mysqlCond") String mysqlCond,
@Define("psqlCond") String psqlCond);
/*
* Support selecting data filtering by top-level keys in the JSON
*/
@ConnectionAwareSqlQuery(
value =
"SELECT json from entity_extension_time_series "
+ "WHERE entityFQN = :entityFQN "
+ "AND extension = :extension "
+ "<mysqlCond>",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT json from entity_extension_time_series "
+ "WHERE entityFQN = :entityFQN "
+ "AND extension = :extension "
+ "<psqlCond>",
connectionType = POSTGRES)
String getExtensionByKeyInternal(
@Bind("value") String value,
@Bind("entityFQN") String entityFQN,
@Bind("extension") String extension,
@Define("mysqlCond") String mysqlCond,
@Define("psqlCond") String psqlCond);
@ConnectionAwareSqlQuery(
value =
"SELECT json from entity_extension_time_series "
+ "WHERE entityFQN = :entityFQN "
+ "AND extension = :extension "
+ "<mysqlCond> "
+ "ORDER BY timestamp DESC LIMIT 1",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"SELECT json from entity_extension_time_series "
+ "WHERE entityFQN = :entityFQN "
+ "AND extension = :extension "
+ "<psqlCond> "
+ "ORDER BY timestamp DESC LIMIT 1",
connectionType = POSTGRES)
String getLatestExtensionByKeyInternal(
@Bind("value") String value,
@Bind("entityFQN") String entityFQN,
@Bind("extension") String extension,
@Define("mysqlCond") String mysqlCond,
@Define("psqlCond") String psqlCond);
class ReportDataMapper implements RowMapper<ReportDataRow> {
@Override
public ReportDataRow map(ResultSet rs, StatementContext ctx) throws SQLException {

View File

@ -49,7 +49,9 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
private static final String UPDATE_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled,deployed";
private static final String PATCH_FIELDS = "owner,sourceConfig,airflowConfig,loggerLevel,enabled,deployed";
private static final String PIPELINE_STATUS_JSON_SCHEMA = "pipelineStatus";
private static final String PIPELINE_STATUS_JSON_SCHEMA = "ingestionPipelineStatus";
private static final String PIPELINE_STATUS_EXTENSION = "ingestionPipeline.pipelineStatus";
private static final String RUN_ID_EXTENSION_KEY = "runId";
private static PipelineServiceClient pipelineServiceClient;
public IngestionPipelineRepository(CollectionDAO dao) {
@ -80,6 +82,18 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
ingestionPipeline.setService(entityReference);
}
@Transaction
public IngestionPipeline deletePipelineStatus(UUID ingestionPipelineId) throws IOException {
// Validate the request content
IngestionPipeline ingestionPipeline = dao.findEntityById(ingestionPipelineId);
daoCollection
.entityExtensionTimeSeriesDao()
.delete(ingestionPipeline.getFullyQualifiedName(), PIPELINE_STATUS_EXTENSION);
setFieldsInternal(ingestionPipeline, Fields.EMPTY_FIELDS);
return ingestionPipeline;
}
@Override
public void storeEntity(IngestionPipeline ingestionPipeline, boolean update) throws IOException {
// Relationships and fields such as service are derived and not stored as part of json
@ -160,23 +174,28 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getLatestExtension(ingestionPipeline.getFullyQualifiedName(), pipelineStatus.getRunId()),
.getLatestExtensionByKey(
RUN_ID_EXTENSION_KEY,
pipelineStatus.getRunId(),
ingestionPipeline.getFullyQualifiedName(),
PIPELINE_STATUS_EXTENSION),
PipelineStatus.class);
if (storedPipelineStatus != null) {
daoCollection
.entityExtensionTimeSeriesDao()
.update(
ingestionPipeline.getFullyQualifiedName(),
.updateExtensionByKey(
RUN_ID_EXTENSION_KEY,
pipelineStatus.getRunId(),
JsonUtils.pojoToJson(pipelineStatus),
pipelineStatus.getTimestamp());
ingestionPipeline.getFullyQualifiedName(),
PIPELINE_STATUS_EXTENSION,
JsonUtils.pojoToJson(pipelineStatus));
} else {
daoCollection
.entityExtensionTimeSeriesDao()
.insert(
ingestionPipeline.getFullyQualifiedName(),
pipelineStatus.getRunId(),
"pipelineStatus",
PIPELINE_STATUS_EXTENSION,
PIPELINE_STATUS_JSON_SCHEMA,
JsonUtils.pojoToJson(pipelineStatus));
}
ChangeDescription change =
@ -216,7 +235,11 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
return JsonUtils.readValue(
daoCollection
.entityExtensionTimeSeriesDao()
.getExtension(ingestionPipeline.getFullyQualifiedName(), pipelineStatusRunId.toString()),
.getExtensionByKey(
RUN_ID_EXTENSION_KEY,
pipelineStatusRunId.toString(),
ingestionPipeline.getFullyQualifiedName(),
PIPELINE_STATUS_EXTENSION),
PipelineStatus.class);
}

View File

@ -727,6 +727,32 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
return dao.getPipelineStatus(fqn, runId);
}
@DELETE
@Path("/{id}/pipelineStatus")
@Operation(
operationId = "deletePipelineStatus",
summary = "Delete Pipeline Status",
tags = "ingestionPipelines",
description = "Delete the Pipeline Status for this Ingestion Pipeline.",
responses = {
@ApiResponse(
responseCode = "200",
description = "Successfully deleted the Statuses",
content =
@Content(mediaType = "application/json", schema = @Schema(implementation = IngestionPipeline.class)))
})
public IngestionPipeline deletePipelineStatus(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "Id of the Ingestion Pipeline", schema = @Schema(type = "UUID")) @PathParam("id")
UUID id)
throws IOException {
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.DELETE);
authorizer.authorize(securityContext, operationContext, getResourceContextById(id));
IngestionPipeline ingestionPipeline = dao.deletePipelineStatus(id);
return addHref(uriInfo, ingestionPipeline);
}
private IngestionPipeline getIngestionPipeline(CreateIngestionPipeline create, String user) throws IOException {
OpenMetadataConnection openMetadataServerConnection =
new OpenMetadataConnectionBuilder(openMetadataApplicationConfig).build();

View File

@ -33,7 +33,10 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.function.Predicate;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.HttpResponseException;
@ -49,6 +52,8 @@ import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPi
import org.openmetadata.schema.entity.services.DatabaseService;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatus;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineStatusType;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.metadataIngestion.DashboardServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline;
@ -68,6 +73,7 @@ import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.service.Entity;
import org.openmetadata.service.resources.EntityResourceTest;
import org.openmetadata.service.resources.services.DatabaseServiceResourceTest;
import org.openmetadata.service.security.SecurityUtil;
import org.openmetadata.service.util.FullyQualifiedName;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.TestUtils;
@ -81,12 +87,14 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
public static DatabaseServiceResourceTest DATABASE_SERVICE_RESOURCE_TEST;
public static Date START_DATE;
private static final String COLLECTION = "services/ingestionPipelines";
public IngestionPipelineResourceTest() {
super(
Entity.INGESTION_PIPELINE,
IngestionPipeline.class,
IngestionPipelineResource.IngestionPipelineList.class,
"services/ingestionPipelines",
COLLECTION,
IngestionPipelineResource.FIELDS);
}
@ -606,11 +614,73 @@ public class IngestionPipelineResourceTest extends EntityResourceTest<IngestionP
actualDbtS3Config.getDbtSecurityConfig().getAwsSecretAccessKey());
}
@Test
void put_pipelineStatus(TestInfo test) throws IOException {
CreateIngestionPipeline requestPipeline =
createRequest(test)
.withName("ingestion_testStatus")
.withPipelineType(PipelineType.METADATA)
.withService(BIGQUERY_REFERENCE)
.withAirflowConfig(new AirflowConfig().withScheduleInterval("5 * * * *").withStartDate(START_DATE));
IngestionPipeline ingestionPipeline = createAndCheckEntity(requestPipeline, ADMIN_AUTH_HEADERS);
String runId = UUID.randomUUID().toString();
// Create the first status
TestUtils.put(
getPipelineStatusTarget(ingestionPipeline.getFullyQualifiedName()),
new PipelineStatus().withPipelineState(PipelineStatusType.RUNNING).withRunId(runId).withTimestamp(3L),
Response.Status.CREATED,
ADMIN_AUTH_HEADERS);
PipelineStatus pipelineStatus =
TestUtils.get(
getPipelineStatusByRunId(ingestionPipeline.getFullyQualifiedName(), runId),
PipelineStatus.class,
ADMIN_AUTH_HEADERS);
assertEquals(pipelineStatus.getPipelineState(), PipelineStatusType.RUNNING);
// Update it
TestUtils.put(
getPipelineStatusTarget(ingestionPipeline.getFullyQualifiedName()),
new PipelineStatus().withPipelineState(PipelineStatusType.SUCCESS).withRunId(runId).withTimestamp(3L),
Response.Status.CREATED,
ADMIN_AUTH_HEADERS);
pipelineStatus =
TestUtils.get(
getPipelineStatusByRunId(ingestionPipeline.getFullyQualifiedName(), runId),
PipelineStatus.class,
ADMIN_AUTH_HEADERS);
assertEquals(pipelineStatus.getPipelineState(), PipelineStatusType.SUCCESS);
// DELETE all status from the pipeline
TestUtils.delete(getDeletePipelineStatus(ingestionPipeline.getId().toString()), ADMIN_AUTH_HEADERS);
// We get no content back
Response response =
SecurityUtil.addHeaders(
getPipelineStatusByRunId(ingestionPipeline.getFullyQualifiedName(), runId), ADMIN_AUTH_HEADERS)
.get();
TestUtils.readResponse(response, PipelineStatus.class, Status.NO_CONTENT.getStatusCode());
}
private IngestionPipeline updateIngestionPipeline(CreateIngestionPipeline create, Map<String, String> authHeaders)
throws HttpResponseException {
return TestUtils.put(getCollection(), create, IngestionPipeline.class, Status.OK, authHeaders);
}
protected final WebTarget getPipelineStatusTarget(String fqn) {
return getCollection().path("/" + fqn + "/pipelineStatus");
}
protected final WebTarget getPipelineStatusByRunId(String fqn, String runId) {
return getCollection().path("/" + fqn + "/pipelineStatus/" + runId);
}
protected final WebTarget getDeletePipelineStatus(String id) {
return getCollection().path("/" + id + "/pipelineStatus");
}
@Override
public IngestionPipeline validateGetWithDifferentFields(IngestionPipeline ingestion, boolean byName)
throws HttpResponseException {