mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-30 20:59:57 +00:00
Pagination query task (#9377)
* added new api for fetching table queries * fix a small error * corrected some line of code Co-authored-by: Himank Mehta <himankmehta@Himanks-MacBook-Air.local>
This commit is contained in:
parent
92bad23fcf
commit
f68d77cf51
@ -17,9 +17,11 @@ import static org.openmetadata.service.Entity.ORGANIZATION_NAME;
|
||||
import static org.openmetadata.service.jdbi3.locator.ConnectionType.MYSQL;
|
||||
import static org.openmetadata.service.jdbi3.locator.ConnectionType.POSTGRES;
|
||||
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import java.io.IOException;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
@ -86,7 +88,9 @@ import org.openmetadata.schema.settings.SettingsType;
|
||||
import org.openmetadata.schema.tests.TestCase;
|
||||
import org.openmetadata.schema.tests.TestDefinition;
|
||||
import org.openmetadata.schema.tests.TestSuite;
|
||||
import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.Relationship;
|
||||
import org.openmetadata.schema.type.SQLQuery;
|
||||
import org.openmetadata.schema.type.TagCategory;
|
||||
import org.openmetadata.schema.type.TagLabel;
|
||||
import org.openmetadata.schema.type.TaskStatus;
|
||||
@ -394,6 +398,71 @@ public interface CollectionDAO {
|
||||
@SqlQuery("SELECT json FROM entity_extension WHERE id = :id AND extension = :extension")
|
||||
String getExtension(@Bind("id") String id, @Bind("extension") String extension);
|
||||
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"select count(*) from entity_extension d, json_table(d.json, '$[*]' columns ("
|
||||
+ " vote double path '$.vote', "
|
||||
+ " query varchar(200) path '$.query',"
|
||||
+ " users json path '$.users',"
|
||||
+ " checksum varchar(200) path '$.checksum',"
|
||||
+ " duration double path '$.duration',"
|
||||
+ " queryDate varchar(200) path '$.queryDate'"
|
||||
+ " )"
|
||||
+ ""
|
||||
+ ") as j where d.id = :id",
|
||||
connectionType = MYSQL)
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"select count(*) from entity_extension as ee , jsonb_to_recordset(ee.json) as x (vote decimal,query varchar,users json,checksum varchar,duration decimal,queryDate varchar) where ee.id = :id ",
|
||||
connectionType = POSTGRES)
|
||||
int getTotalQueriesCount(@Bind("id") String id);
|
||||
|
||||
@RegisterRowMapper(SqlQueryMapper.class)
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"select j.* from entity_extension d, json_table(d.json, '$[*]' columns ("
|
||||
+ " vote Double path '$.vote', "
|
||||
+ " query varchar(200) path '$.query',"
|
||||
+ " users json path '$.users',"
|
||||
+ " checksum varchar(200) path '$.checksum',"
|
||||
+ " duration Double path '$.duration',"
|
||||
+ " queryDate varchar(200) path '$.queryDate'"
|
||||
+ " )"
|
||||
+ ") as j where d.id = :id AND d.extension = :extension And query > :after order by query LIMIT :limit",
|
||||
connectionType = MYSQL)
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"select x.* from entity_extension as ee , jsonb_to_recordset(ee.json) as x (vote decimal,query varchar,users json,checksum varchar,duration decimal,queryDate varchar) where ee.id = :id and ee.extension = :extension and query > :after order by query LIMIT :limit ",
|
||||
connectionType = POSTGRES)
|
||||
List<SQLQuery> getExtensionPagination(
|
||||
@Bind("id") String id,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("limit") int limit,
|
||||
@Bind("after") String after);
|
||||
|
||||
@RegisterRowMapper(SqlQueryMapper.class)
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"select j.* from entity_extension d, json_table(d.json, '$[*]' columns ("
|
||||
+ " vote Double path '$.vote',"
|
||||
+ " query varchar(200) path '$.query',"
|
||||
+ " users json path '$.users',"
|
||||
+ " checksum varchar(200) path '$.checksum',"
|
||||
+ " duration Double path '$.duration',"
|
||||
+ " queryDate varchar(200) path '$.queryDate'"
|
||||
+ " )"
|
||||
+ ") as j where d.id = :id AND d.extension = :extension And query < :before order by query LIMIT :limit",
|
||||
connectionType = MYSQL)
|
||||
@ConnectionAwareSqlQuery(
|
||||
value =
|
||||
"select x.* from entity_extension as ee , jsonb_to_recordset(ee.json) as x (vote decimal,query varchar,users json,checksum varchar,duration decimal,queryDate varchar) where ee.id = :id and ee.extension = :extension and query < : before order by query LIMIT :limit ",
|
||||
connectionType = POSTGRES)
|
||||
List<SQLQuery> listBeforeTableQueries(
|
||||
@Bind("id") String id,
|
||||
@Bind("extension") String extension,
|
||||
@Bind("limit") int limit,
|
||||
@Bind("before") String before);
|
||||
|
||||
@RegisterRowMapper(ExtensionMapper.class)
|
||||
@SqlQuery(
|
||||
"SELECT extension, json FROM entity_extension WHERE id = :id AND extension "
|
||||
@ -438,6 +507,26 @@ public interface CollectionDAO {
|
||||
}
|
||||
}
|
||||
|
||||
class SqlQueryMapper implements RowMapper<SQLQuery> {
|
||||
@Override
|
||||
public SQLQuery map(ResultSet rs, StatementContext ctx) throws SQLException {
|
||||
List<EntityReference> users = new ArrayList<>();
|
||||
String json = rs.getString("users");
|
||||
try {
|
||||
users = JsonUtils.readValue(json, new TypeReference<ArrayList<EntityReference>>() {});
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return new SQLQuery()
|
||||
.withVote(rs.getDouble("vote"))
|
||||
.withQuery(rs.getString("query"))
|
||||
.withUsers(users)
|
||||
.withChecksum(rs.getString("checksum"))
|
||||
.withDuration(rs.getDouble("duration"))
|
||||
.withQueryDate(rs.getString("queryDate"));
|
||||
}
|
||||
}
|
||||
|
||||
@Getter
|
||||
@Builder
|
||||
class EntityRelationshipRecord {
|
||||
|
@ -519,6 +519,58 @@ public class TableRepository extends EntityRepository<Table> {
|
||||
return table.withTableQueries(getQueries(table));
|
||||
}
|
||||
|
||||
public ResultList<SQLQuery> getQueriesForPagination(UUID id, int limit, String before, String after) {
|
||||
RestUtil.validateCursors(before, after);
|
||||
int total = daoCollection.entityExtensionDAO().getTotalQueriesCount(id.toString());
|
||||
List<SQLQuery> tableQueries = null;
|
||||
if (before != null) {
|
||||
tableQueries =
|
||||
daoCollection
|
||||
.entityExtensionDAO()
|
||||
.listBeforeTableQueries(id.toString(), "table.tableQueries", limit + 1, RestUtil.decodeCursor(before));
|
||||
} else {
|
||||
tableQueries =
|
||||
daoCollection
|
||||
.entityExtensionDAO()
|
||||
.getExtensionPagination(
|
||||
id.toString(), "table.tableQueries", limit + 1, after == null ? "" : RestUtil.decodeCursor(after));
|
||||
}
|
||||
ResultList<SQLQuery> tablQueriesList;
|
||||
if (before != null) {
|
||||
tablQueriesList = getBeforeListForTableQueries(tableQueries, limit, total);
|
||||
} else {
|
||||
tablQueriesList = getAfterListForTableQueries(after, tableQueries, limit, total);
|
||||
}
|
||||
return tablQueriesList;
|
||||
}
|
||||
|
||||
private ResultList<SQLQuery> getBeforeListForTableQueries(List<SQLQuery> tableQueries, int limit, int total) {
|
||||
String beforeCursor = null;
|
||||
String afterCursor;
|
||||
if (tableQueries.size() > limit) { // If extra result exists, then previous page exists - return before cursor
|
||||
tableQueries.remove(0);
|
||||
beforeCursor = tableQueries.get(0).getQuery();
|
||||
}
|
||||
afterCursor = tableQueries.get(tableQueries.size() - 1).getQuery();
|
||||
return getQueryResultList(tableQueries, beforeCursor, afterCursor, total);
|
||||
}
|
||||
|
||||
private ResultList<SQLQuery> getAfterListForTableQueries(
|
||||
String after, List<SQLQuery> tableQueries, int limit, int total) {
|
||||
String beforeCursor;
|
||||
String afterCursor = null;
|
||||
beforeCursor = after == null ? null : tableQueries.get(0).getQuery();
|
||||
if (tableQueries.size() > limit) { // If extra result exists, then next page exists - return after cursor
|
||||
tableQueries.remove(limit);
|
||||
afterCursor = tableQueries.get(limit - 1).getQuery();
|
||||
}
|
||||
return getQueryResultList(tableQueries, beforeCursor, afterCursor, total);
|
||||
}
|
||||
|
||||
private ResultList<SQLQuery> getQueryResultList(List<SQLQuery> queries, String before, String after, int total) {
|
||||
return new ResultList<>(queries, before, after, total);
|
||||
}
|
||||
|
||||
private List<SQLQuery> getQueries(Table table) throws IOException {
|
||||
List<SQLQuery> tableQueries =
|
||||
JsonUtils.readObjects(
|
||||
|
@ -895,6 +895,38 @@ public class TableResource extends EntityResource<Table, TableRepository> {
|
||||
return addHref(uriInfo, table);
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path("/{id}/getTableQueries")
|
||||
@Operation(
|
||||
operationId = "getTableQueryList",
|
||||
summary = "get table query data",
|
||||
tags = "tables",
|
||||
description = "get table query data from the table.",
|
||||
responses = {
|
||||
@ApiResponse(
|
||||
responseCode = "200",
|
||||
description = "OK",
|
||||
content = @Content(mediaType = "application/json", schema = @Schema(implementation = Table.class)))
|
||||
})
|
||||
public ResultList<SQLQuery> getTableQueryList(
|
||||
@Context UriInfo uriInfo,
|
||||
@Context SecurityContext securityContext,
|
||||
@Parameter(description = "Id of the table", schema = @Schema(type = "UUID")) @PathParam("id") UUID id,
|
||||
@Valid SQLQuery sqlQuery,
|
||||
@DefaultValue("10") @Min(0) @Max(1000000) @QueryParam("limit") int limitParam,
|
||||
@Parameter(description = "Returns list of users before this cursor", schema = @Schema(type = "string"))
|
||||
@QueryParam("before")
|
||||
String before,
|
||||
@Parameter(description = "Returns list of users after this cursor", schema = @Schema(type = "string"))
|
||||
@QueryParam("after")
|
||||
String after)
|
||||
throws IOException {
|
||||
OperationContext operationContext = new OperationContext(entityType, MetadataOperation.VIEW_QUERIES);
|
||||
authorizer.authorize(securityContext, operationContext, getResourceContextById(id));
|
||||
ResultList<SQLQuery> getTableQueryList = dao.getQueriesForPagination(id, limitParam, before, after);
|
||||
return getTableQueryList;
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path("/{id}/dataModel")
|
||||
@Operation(
|
||||
|
Loading…
x
Reference in New Issue
Block a user