Reindex improvement (#12036)

* Reindex added for TestCase

* Add Mapping Api endpoint
This commit is contained in:
Mohit Yadav 2023-06-19 23:21:29 +05:30 committed by GitHub
parent 73808bf29b
commit 7adf1ec2b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 71 additions and 20 deletions

View File

@ -56,6 +56,7 @@ public class ElasticSearchIndexDefinition {
private final CollectionDAO dao;
final EnumMap<ElasticSearchIndexType, ElasticSearchIndexStatus> elasticSearchIndexes =
new EnumMap<>(ElasticSearchIndexType.class);
private static final Map<String, Object> ENTITY_TO_MAPPING_SCHEMA_MAP = new HashMap<>();
protected static final Map<String, String> ENTITY_TYPE_TO_INDEX_MAP;
private static final Map<ElasticSearchIndexType, Set<String>> INDEX_TO_MAPPING_FIELDS_MAP =
@ -145,8 +146,10 @@ public class ElasticSearchIndexDefinition {
GetIndexRequest gRequest = new GetIndexRequest(elasticSearchIndexType.indexName);
gRequest.local(false);
boolean exists = client.indices().exists(gRequest, RequestOptions.DEFAULT);
String elasticSearchIndexMapping = getIndexMapping(elasticSearchIndexType, lang);
ENTITY_TO_MAPPING_SCHEMA_MAP.put(
elasticSearchIndexType.entityType, JsonUtils.getMap(JsonUtils.readJson(elasticSearchIndexMapping)));
if (!exists) {
String elasticSearchIndexMapping = getIndexMapping(elasticSearchIndexType, lang);
CreateIndexRequest request = new CreateIndexRequest(elasticSearchIndexType.indexName);
request.source(elasticSearchIndexMapping, XContentType.JSON);
CreateIndexResponse createIndexResponse = client.indices().create(request, RequestOptions.DEFAULT);
@ -174,6 +177,8 @@ public class ElasticSearchIndexDefinition {
gRequest.local(false);
boolean exists = client.indices().exists(gRequest, RequestOptions.DEFAULT);
String elasticSearchIndexMapping = getIndexMapping(elasticSearchIndexType, lang);
ENTITY_TO_MAPPING_SCHEMA_MAP.put(
elasticSearchIndexType.entityType, JsonUtils.getMap(JsonUtils.readJson(elasticSearchIndexMapping)));
if (exists) {
PutMappingRequest request = new PutMappingRequest(elasticSearchIndexType.indexName);
request.source(elasticSearchIndexMapping, XContentType.JSON);
@ -318,6 +323,15 @@ public class ElasticSearchIndexDefinition {
LOG.error("Failed to Update Elastic Search Job Info");
}
}
public static Map<String, Object> getIndexMappingSchema(Set<String> entities) {
if (entities.contains("*")) {
return ENTITY_TO_MAPPING_SCHEMA_MAP;
}
Map<String, Object> result = new HashMap<>();
entities.forEach((entityType) -> result.put(entityType, ENTITY_TO_MAPPING_SCHEMA_MAP.get(entityType)));
return result;
}
}
@JsonInclude(JsonInclude.Include.NON_NULL)

View File

@ -12,6 +12,7 @@ import org.openmetadata.schema.entity.data.Table;
import org.openmetadata.schema.entity.data.Topic;
import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.tests.TestCase;
import org.openmetadata.service.Entity;
@Slf4j
@ -42,9 +43,12 @@ public class ElasticSearchIndexFactory {
return new QueryIndex((Query) entity);
case Entity.CONTAINER:
return new ContainerIndex((Container) entity);
case Entity.TEST_CASE:
case Entity.TEST_SUITE:
return new TestCaseIndex((TestCase) entity);
default:
LOG.warn("Ignoring Entity Type {}", entityType);
}
return null;
throw new IllegalArgumentException(String.format("Entity Type [%s] is not valid for Index Factory", entityType));
}
}

View File

@ -260,13 +260,16 @@ public class FormatterUtil {
if (responseCode == Response.Status.CREATED.getStatusCode()
&& !RestUtil.ENTITY_FIELDS_CHANGED.equals(changeType)
&& !responseContext.getEntity().getClass().equals(Thread.class)) {
EntityInterface entityInterface = (EntityInterface) responseContext.getEntity();
EntityReference entityReference = entityInterface.getEntityReference();
String entityType = entityReference.getType();
String entityFQN = entityReference.getFullyQualifiedName();
return getChangeEvent(updateBy, EventType.ENTITY_CREATED, entityType, entityInterface)
.withEntity(entityInterface)
.withEntityFullyQualifiedName(entityFQN);
if (responseContext.getEntity() instanceof EntityInterface) {
EntityInterface entityInterface = (EntityInterface) responseContext.getEntity();
EntityReference entityReference = entityInterface.getEntityReference();
String entityType = entityReference.getType();
String entityFQN = entityReference.getFullyQualifiedName();
return getChangeEvent(updateBy, EventType.ENTITY_CREATED, entityType, entityInterface)
.withEntity(entityInterface)
.withEntityFullyQualifiedName(entityFQN);
}
return null;
}
// PUT or PATCH operation didn't result in any change

View File

@ -2832,21 +2832,21 @@ public interface CollectionDAO {
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series set json = :json where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp and json -> '$.operation' = :operation",
"UPDATE entity_extension_time_series set json = :json where entityFQNHash=:entityFQNHash and extension=:extension and timestamp=:timestamp and json -> '$.operation' = :operation",
connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
value =
"UPDATE entity_extension_time_series set json = (:json :: jsonb) where entityFQN=:entityFQN and extension=:extension and timestamp=:timestamp and json #>>'{operation}' = :operation",
"UPDATE entity_extension_time_series set json = (:json :: jsonb) where entityFQNHash=:entityFQNHash and extension=:extension and timestamp=:timestamp and json #>>'{operation}' = :operation",
connectionType = POSTGRES)
void updateExtensionByOperation(
@Bind("entityFQN") String entityFQN,
@Bind("entityFQNHash") String entityFQNHash,
@Bind("extension") String extension,
@Bind("json") String json,
@Bind("timestamp") Long timestamp,
@Bind("operation") String operation);
@SqlQuery(
"SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQN AND extension = :extension")
"SELECT json FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash AND extension = :extension")
String getExtension(@Bind("entityFQNHash") String entityId, @Bind("extension") String extension);
@SqlQuery("SELECT count(*) FROM entity_extension_time_series WHERE EntityFQNHash = :entityFQNHash")
@ -2855,13 +2855,13 @@ public interface CollectionDAO {
@ConnectionAwareSqlQuery(
value =
"WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json "
+ "FROM entity_extension_time_series WHERE EntityFQNHash = :entityFQNHash) "
+ "FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash) "
+ "SELECT row_num, json FROM data WHERE row_num < :before LIMIT :limit",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json "
+ "FROM entity_extension_time_series WHERE EntityFQNHash = :entityFQNHash) "
+ "FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash) "
+ "SELECT row_num, json FROM data WHERE row_num < (:before :: integer) LIMIT :limit",
connectionType = POSTGRES)
@RegisterRowMapper(ReportDataMapper.class)
@ -2871,13 +2871,13 @@ public interface CollectionDAO {
@ConnectionAwareSqlQuery(
value =
"WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json "
+ "FROM entity_extension_time_series WHERE EntityFQNHash = :entityFQNHash) "
+ "FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash) "
+ "SELECT row_num, json FROM data WHERE row_num > :after LIMIT :limit",
connectionType = MYSQL)
@ConnectionAwareSqlQuery(
value =
"WITH data AS (SELECT ROW_NUMBER() OVER(ORDER BY timestamp ASC) AS row_num, json "
+ "FROM entity_extension_time_series WHERE EntityFQNHash = :entityFQNHash) "
+ "FROM entity_extension_time_series WHERE entityFQNHash = :entityFQNHash) "
+ "SELECT row_num, json FROM data WHERE row_num > (:after :: integer) LIMIT :limit",
connectionType = POSTGRES)
@RegisterRowMapper(ReportDataMapper.class)
@ -2912,9 +2912,9 @@ public interface CollectionDAO {
@SqlQuery(
"SELECT ranked.json FROM (SELECT json, ROW_NUMBER() OVER(PARTITION BY entityFQNHash ORDER BY timestamp DESC) AS row_num "
+ "FROM entity_extension_time_series WHERE entityFQNHash IN (<entityFQNs>)) ranked WHERE ranked.row_num = 1")
+ "FROM entity_extension_time_series WHERE entityFQNHash IN (<entityFQNHashes>)) ranked WHERE ranked.row_num = 1")
List<String> getLatestExtensionByFQNs(
@BindList("entityFQNs") List<String> entityFQNs, @Bind("extension") String extension);
@BindList("entityFQNHashes") List<String> entityFQNHashes, @Bind("extension") String extension);
@SqlQuery(
"SELECT json FROM entity_extension_time_series WHERE extension = :extension "
@ -2940,7 +2940,7 @@ public interface CollectionDAO {
// This just saves the limit number of records, and remove all other with given extension
@SqlUpdate(
"DELETE FROM entity_extension_time_series WHERE extension = :extension AND entityFQNHash NOT IN(SELECT entityFQN FROM (select * from entity_extension_time_series WHERE extension = :extension ORDER BY timestamp DESC LIMIT :records) AS subquery)")
"DELETE FROM entity_extension_time_series WHERE extension = :extension AND entityFQNHash NOT IN(SELECT entityFQNHash FROM (select * from entity_extension_time_series WHERE extension = :extension ORDER BY timestamp DESC LIMIT :records) AS subquery)")
void deleteLastRecords(@Bind("extension") String extension, @Bind("records") int noOfRecord);
@SqlUpdate(

View File

@ -20,6 +20,7 @@ import static org.openmetadata.service.Entity.FIELD_DISPLAY_NAME;
import static org.openmetadata.service.Entity.FIELD_NAME;
import static org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition.ELASTIC_SEARCH_ENTITY_FQN_STREAM;
import static org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition.ELASTIC_SEARCH_EXTENSION;
import static org.openmetadata.service.elasticsearch.ElasticSearchIndexDefinition.getIndexMappingSchema;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
@ -28,8 +29,11 @@ import io.swagger.v3.oas.annotations.media.Schema;
import io.swagger.v3.oas.annotations.responses.ApiResponse;
import io.swagger.v3.oas.annotations.tags.Tag;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.validation.Valid;
@ -494,6 +498,32 @@ public class SearchResource {
return Response.status(Response.Status.OK).entity(ReIndexingHandler.getInstance().getJob(id)).build();
}
@GET
@Path("/mappings")
@Operation(
operationId = "getSearchMappingSchema",
summary = "Get Search Mapping Schema",
description = "Get Search Mapping Schema",
responses = {
@ApiResponse(responseCode = "200", description = "Success"),
@ApiResponse(responseCode = "404", description = "Not found")
})
public Response getElasticSearchMappingSchema(
@Context UriInfo uriInfo,
@Context SecurityContext securityContext,
@Parameter(description = "List of Entities to get schema for") @QueryParam("entityType") String entityType) {
// Only admins or bot can issue a reindex request
authorizer.authorizeAdminOrBot(securityContext);
Set<String> entities;
if (entityType == null) {
entities = new HashSet<>();
entities.add("*");
} else {
entities = new HashSet<>(Arrays.asList(entityType.replace(" ", "").split(",")));
}
return Response.status(Response.Status.OK).entity(getIndexMappingSchema(entities)).build();
}
@GET
@Path("/reindex")
@Operation(