[Search] Indexing Fixes (#18048)

* Fix OpenSearch Content Size Issue
entity content is too long [204857600] for the configured buffer limit [104857600]

* Change Type to Long

* Add Payload Size to take dynamic entry

* Migrations for 1.5.6

* Mark Stale Entries Stopped

* Format checkstyle

* Fix failure

---------

Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com>
This commit is contained in:
Mohit Yadav 2024-09-30 23:39:27 +05:30 committed by GitHub
parent 9552886f29
commit f1ae49e1a0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 61 additions and 9 deletions

View File

@ -0,0 +1,5 @@
-- Delete Search Indexing Application
-- Removes every trace of the app named 'SearchIndexingApplication': first the
-- entity_relationship rows that point at it, then the app rows themselves.
-- The relationship deletes run first because they look the app id up by name
-- in installed_apps / apps_marketplace, which the last two statements empty.
-- NOTE(review): presumably the app is re-created afterwards with the updated
-- configuration — confirm against the application bootstrap code.
-- Relationships where the installed app is on either side (fromId or toId).
DELETE er FROM entity_relationship er JOIN installed_apps ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
-- Relationships where the marketplace definition is on either side.
DELETE er FROM entity_relationship er JOIN apps_marketplace ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
-- Finally remove the installed app and its marketplace definition.
DELETE FROM installed_apps where name = 'SearchIndexingApplication';
DELETE FROM apps_marketplace where name = 'SearchIndexingApplication';

View File

@ -0,0 +1,5 @@
-- Delete Search Indexing Application
-- Removes every trace of the app named 'SearchIndexingApplication': first the
-- entity_relationship rows that point at it, then the app rows themselves.
-- The relationship deletes run first because they look the app id up by name
-- in installed_apps / apps_marketplace, which the last two statements empty.
-- NOTE(review): presumably the app is re-created afterwards with the updated
-- configuration — confirm against the application bootstrap code.
-- Relationships where the installed app is on either side (fromId or toId).
DELETE FROM entity_relationship er USING installed_apps ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
-- Relationships where the marketplace definition is on either side.
DELETE FROM entity_relationship er USING apps_marketplace ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
-- Finally remove the installed app and its marketplace definition.
DELETE FROM installed_apps where name = 'SearchIndexingApplication';
DELETE FROM apps_marketplace where name = 'SearchIndexingApplication';

View File

@ -163,7 +163,21 @@ public class SearchIndexApp extends AbstractNativeApplication {
}
}
/**
 * Asks the app-extension time-series DAO to mark the stale run entries of this application as
 * stopped, keyed by this application's id.
 *
 * <p>Best-effort: any failure is logged and not propagated, so the caller can continue.
 */
private void cleanUpStaleJobsFromRuns() {
  try {
    collectionDAO
        .appExtensionTimeSeriesDao()
        .markStaleEntriesStopped(getApp().getId().toString());
  } catch (Exception ex) {
    // Pass the exception to the logger so the root cause and stack trace are not lost.
    LOG.error("Failed in Marking Stale Entries Stopped.", ex);
  }
}
private void initializeJob() {
// Remove any Stale Jobs
cleanUpStaleJobsFromRuns();
// Initialize New Job
int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
this.jobData.setStats(
new Stats()

View File

@ -4252,6 +4252,16 @@ public interface CollectionDAO {
connectionType = POSTGRES)
void insert(@Bind("json") String json);
/**
 * Sets {@code status} to {@code stopped} inside the {@code json} column of every {@code
 * apps_extension_time_series} row that belongs to the given app and whose current {@code status}
 * is {@code running}.
 *
 * @param appId id of the application whose running entries should be marked as stopped
 */
@ConnectionAwareSqlUpdate(
    value =
        // The filter must read the same `json` column that is being updated.
        "UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json, '$.status')) = 'running'",
    connectionType = MYSQL)
@ConnectionAwareSqlUpdate(
    value =
        "UPDATE apps_extension_time_series SET json = jsonb_set(json, '{status}', '\"stopped\"') WHERE appId = :appId AND json->>'status' = 'running'",
    connectionType = POSTGRES)
void markStaleEntriesStopped(@Bind("appId") String appId);
@ConnectionAwareSqlUpdate(
value =
"UPDATE apps_extension_time_series set json = :json where appId=:appId and timestamp=:timestamp",

View File

@ -2228,6 +2228,7 @@ public class ElasticSearchClient implements SearchClient {
requestConfigBuilder
.setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
.setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
restClientBuilder.setCompressionEnabled(true);
return new RestHighLevelClientBuilder(restClientBuilder.build())
.setApiCompatibilityMode(true)
.build();

View File

@ -28,10 +28,10 @@ import org.openmetadata.service.workflows.interfaces.Sink;
public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats();
private final SearchRepository searchRepository;
private final int maxPayLoadSizeInBytes;
private final long maxPayLoadSizeInBytes;
public ElasticSearchIndexSink(
SearchRepository searchRepository, int total, int maxPayLoadSizeInBytes) {
SearchRepository searchRepository, int total, long maxPayLoadSizeInBytes) {
this.searchRepository = searchRepository;
this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);

View File

@ -2193,6 +2193,8 @@ public class OpenSearchClient implements SearchClient {
requestConfigBuilder
.setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
.setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
restClientBuilder.setCompressionEnabled(true);
restClientBuilder.setChunkedEnabled(true);
return new RestHighLevelClient(restClientBuilder);
} catch (Exception e) {
LOG.error("Failed to create open search client ", e);

View File

@ -29,9 +29,9 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats();
private final SearchRepository searchRepository;
private final int maxPayLoadSizeInBytes;
private final long maxPayLoadSizeInBytes;
public OpenSearchIndexSink(SearchRepository repository, int total, int maxPayLoadSizeInBytes) {
public OpenSearchIndexSink(SearchRepository repository, int total, long maxPayLoadSizeInBytes) {
this.searchRepository = repository;
this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);

View File

@ -270,8 +270,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
int batchSize,
@Option(
names = {"-p", "--payload-size"},
defaultValue = "10485760")
int payloadSize,
defaultValue = "104857600l")
long payloadSize,
@Option(
names = {"--recreate-indexes"},
defaultValue = "true")
@ -295,7 +295,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
}
private int executeSearchReindexApp(
String appName, int batchSize, int payloadSize, boolean recreateIndexes) {
String appName, int batchSize, long payloadSize, boolean recreateIndexes) {
AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
App originalSearchIndexApp =
appRepository.getByName(null, appName, appRepository.getFields("id"));

View File

@ -45,6 +45,7 @@
],
"recreateIndex": false,
"batchSize": "100",
"payLoadSize": 104857600,
"searchIndexMappingLanguage": "EN"
},
"appSchedule": {

View File

@ -60,6 +60,7 @@
],
"recreateIndex": false,
"batchSize": "100",
"payLoadSize": 104857600,
"searchIndexMappingLanguage": "EN"
}
}

View File

@ -38,6 +38,12 @@
"type": "integer",
"default": 100
},
"payLoadSize": {
"description": "Maximum payload size in bytes for a batch (Default 104857600).",
"type": "integer",
"existingJavaType": "java.lang.Long",
"default": 104857600
},
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",
"$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"

View File

@ -102,9 +102,10 @@
"type": "integer"
},
"payLoadSize": {
"description": "Payload size in bytes depending on config",
"description": "Payload size in bytes depending on config.",
"type": "integer",
"default": 10485760
"existingJavaType": "java.lang.Long",
"default": 104857600
},
"searchIndexMappingLanguage": {
"description": "Recreate Indexes with updated Language",

View File

@ -10,6 +10,12 @@
"type": "integer",
"default": 100
},
"payLoadSize": {
"title": "Payload Size",
"description": "Maximum payload size in bytes for a batch (Default 104857600).",
"type": "integer",
"default": 104857600
},
"entities": {
"title": "Entities",
"description": "List of entities that you need to reindex",