From ac9979070acbb0e6667b5dc0e7a7db46c6307b2d Mon Sep 17 00:00:00 2001 From: Pere Miquel Brull Date: Thu, 6 Apr 2023 21:12:18 +0200 Subject: [PATCH] [WIP] - Fixes #10725 - Fix extension in ingestion pipeline and delete statuses api (#10866) * Fix extension in ingestion pipeline and delete statuses api * Add tests for ingestion pipeline status * Format --- .../v009__create_db_connection_info.sql | 8 +- .../v009__create_db_connection_info.sql | 8 +- .../service/jdbi3/CollectionDAO.java | 96 +++++++++++++++++++ .../jdbi3/IngestionPipelineRepository.java | 41 ++++++-- .../IngestionPipelineResource.java | 26 +++++ .../IngestionPipelineResourceTest.java | 72 +++++++++++++- 6 files changed, 239 insertions(+), 12 deletions(-) diff --git a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v009__create_db_connection_info.sql b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v009__create_db_connection_info.sql index e99f3325229..51a08e908d7 100644 --- a/bootstrap/sql/com.mysql.cj.jdbc.Driver/v009__create_db_connection_info.sql +++ b/bootstrap/sql/com.mysql.cj.jdbc.Driver/v009__create_db_connection_info.sql @@ -141,4 +141,10 @@ SET json = JSON_INSERT( '$.connection.config.databaseName', JSON_EXTRACT(json, '$.connection.config.database') ) where serviceType = 'Druid' - and JSON_EXTRACT(json, '$.connection.config.database') is not null; \ No newline at end of file + and JSON_EXTRACT(json, '$.connection.config.database') is not null; + +-- We were using the same jsonSchema for Pipeline Services and Ingestion Pipeline status +-- Also, we relied on the extension to store the run id +UPDATE entity_extension_time_series +SET jsonSchema = 'ingestionPipelineStatus', extension = 'ingestionPipeline.pipelineStatus' +WHERE jsonSchema = 'pipelineStatus' AND extension <> 'pipeline.PipelineStatus'; diff --git a/bootstrap/sql/org.postgresql.Driver/v009__create_db_connection_info.sql b/bootstrap/sql/org.postgresql.Driver/v009__create_db_connection_info.sql index fd5dd7bfc29..5425f16522e 100644 --- a/bootstrap/sql/org.postgresql.Driver/v009__create_db_connection_info.sql +++ b/bootstrap/sql/org.postgresql.Driver/v009__create_db_connection_info.sql @@ -141,4 +141,10 @@ CREATE TABLE IF NOT EXISTS dashboard_data_model_entity ( UPDATE dbservice_entity SET json = jsonb_set(json::jsonb #- '{connection,config,database}', '{connection,config,databaseName}', json#> '{connection,config,database}', true) -WHERE servicetype = 'Druid' and json #>'{connection,config,database}' is not null; \ No newline at end of file +WHERE servicetype = 'Druid' and json #>'{connection,config,database}' is not null; + +-- We were using the same jsonSchema for Pipeline Services and Ingestion Pipeline status +-- Also, we relied on the extension to store the run id +UPDATE entity_extension_time_series +SET jsonSchema = 'ingestionPipelineStatus', extension = 'ingestionPipeline.pipelineStatus' +WHERE jsonSchema = 'pipelineStatus' AND extension <> 'pipeline.PipelineStatus'; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java index 6cb02034c4e..fe1f8cb8767 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/CollectionDAO.java @@ -3320,6 +3320,102 @@ public interface CollectionDAO { @Bind("endTs") long endTs, @Define("orderBy") OrderBy orderBy); + default void updateExtensionByKey(String key, String value, String entityFQN, String extension, String json) { + + String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key); + String psqlCond = String.format("AND json->>'%s' = :value", key); + + updateExtensionByKeyInternal(value, entityFQN, extension, json, mysqlCond, psqlCond); + } + + default String getExtensionByKey(String key, String value, String entityFQN, String extension) { + + String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key); + String psqlCond = String.format("AND json->>'%s' = :value", key); + + return getExtensionByKeyInternal(value, entityFQN, extension, mysqlCond, psqlCond); + } + + default String getLatestExtensionByKey(String key, String value, String entityFQN, String extension) { + + String mysqlCond = String.format("AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.%s')) = :value", key); + String psqlCond = String.format("AND json->>'%s' = :value", key); + + return getLatestExtensionByKeyInternal(value, entityFQN, extension, mysqlCond, psqlCond); + } + + /* + * Support updating data filtering by top-level keys in the JSON + */ + @ConnectionAwareSqlUpdate( + value = + "UPDATE entity_extension_time_series SET json = :json " + + "WHERE entityFQN = :entityFQN " + + "AND extension = :extension " + + "", + connectionType = MYSQL) + @ConnectionAwareSqlUpdate( + value = + "UPDATE entity_extension_time_series SET json = (:json :: jsonb) " + + "WHERE entityFQN = :entityFQN " + + "AND extension = :extension " + + "", + connectionType = POSTGRES) + void updateExtensionByKeyInternal( + @Bind("value") String value, + @Bind("entityFQN") String entityFQN, + @Bind("extension") String extension, + @Bind("json") String json, + @Define("mysqlCond") String mysqlCond, + @Define("psqlCond") String psqlCond); + + /* + * Support selecting data filtering by top-level keys in the JSON + */ + @ConnectionAwareSqlQuery( + value = + "SELECT json from entity_extension_time_series " + + "WHERE entityFQN = :entityFQN " + + "AND extension = :extension " + + "", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT json from entity_extension_time_series " + + "WHERE entityFQN = :entityFQN " + + "AND extension = :extension " + + "", + connectionType = POSTGRES) + String getExtensionByKeyInternal( + @Bind("value") String value, + @Bind("entityFQN") String entityFQN, + @Bind("extension") String extension, + @Define("mysqlCond") String mysqlCond, + @Define("psqlCond") String psqlCond); + + @ConnectionAwareSqlQuery( + value = + "SELECT json from entity_extension_time_series " + + "WHERE entityFQN = :entityFQN " + + "AND extension = :extension " + + " " + + "ORDER BY timestamp DESC LIMIT 1", + connectionType = MYSQL) + @ConnectionAwareSqlQuery( + value = + "SELECT json from entity_extension_time_series " + + "WHERE entityFQN = :entityFQN " + + "AND extension = :extension " + + " " + + "ORDER BY timestamp DESC LIMIT 1", + connectionType = POSTGRES) + String getLatestExtensionByKeyInternal( + @Bind("value") String value, + @Bind("entityFQN") String entityFQN, + @Bind("extension") String extension, + @Define("mysqlCond") String mysqlCond, + @Define("psqlCond") String psqlCond); + class ReportDataMapper implements RowMapper { @Override public ReportDataRow map(ResultSet rs, StatementContext ctx) throws SQLException { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index 91bd1f66e79..f4fd1e74ade 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -49,7 +49,9 @@ public class IngestionPipelineRepository extends EntityRepository authHeaders) throws HttpResponseException { return TestUtils.put(getCollection(), create, IngestionPipeline.class, Status.OK, authHeaders); } + protected final WebTarget getPipelineStatusTarget(String fqn) { + return getCollection().path("/" + fqn + "/pipelineStatus"); + } + + protected final WebTarget getPipelineStatusByRunId(String fqn, String runId) { + return getCollection().path("/" + fqn + "/pipelineStatus/" + runId); + } + + protected final WebTarget getDeletePipelineStatus(String id) { + return getCollection().path("/" + id + "/pipelineStatus"); + } + @Override public IngestionPipeline validateGetWithDifferentFields(IngestionPipeline ingestion, boolean byName) throws HttpResponseException {