mirror of
				https://github.com/open-metadata/OpenMetadata.git
				synced 2025-11-04 04:29:13 +00:00 
			
		
		
		
	[Search] Indexing Fixes (#18048)
* Fix OpenSearch Content Size Issue entity content is too long [204857600] for the configured buffer limit [104857600] * Change Type to Long * Add Payload Size to take dynamic entry * Migrations for 1.5.6 * Mark Stale Entries Stopped * Format checkstyle * Fix failure --------- Co-authored-by: Sriharsha Chintalapani <harshach@users.noreply.github.com> (cherry picked from commit f1ae49e1a0fc560b3a611acb855ec5d465deff5c)
This commit is contained in:
		
							parent
							
								
									b2a2d8573f
								
							
						
					
					
						commit
						384cba9bab
					
				@ -0,0 +1,5 @@
 | 
			
		||||
-- Delete Search Indexing Application 
 | 
			
		||||
DELETE er FROM  entity_relationship er JOIN installed_apps ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
 | 
			
		||||
DELETE er FROM  entity_relationship er JOIN apps_marketplace ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
 | 
			
		||||
DELETE FROM  installed_apps where name = 'SearchIndexingApplication';
 | 
			
		||||
DELETE FROM  apps_marketplace where name = 'SearchIndexingApplication';
 | 
			
		||||
@ -0,0 +1,5 @@
 | 
			
		||||
-- Delete Search Indexing Application 
 | 
			
		||||
DELETE FROM  entity_relationship er  USING installed_apps ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
 | 
			
		||||
DELETE FROM  entity_relationship er  USING apps_marketplace ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
 | 
			
		||||
DELETE FROM  installed_apps where name = 'SearchIndexingApplication';
 | 
			
		||||
DELETE FROM  apps_marketplace where name = 'SearchIndexingApplication';
 | 
			
		||||
@ -162,7 +162,21 @@ public class SearchIndexApp extends AbstractNativeApplication {
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private void cleanUpStaleJobsFromRuns() {
 | 
			
		||||
    try {
 | 
			
		||||
      collectionDAO
 | 
			
		||||
          .appExtensionTimeSeriesDao()
 | 
			
		||||
          .markStaleEntriesStopped(getApp().getId().toString());
 | 
			
		||||
    } catch (Exception ex) {
 | 
			
		||||
      LOG.error("Failed in Marking Stale Entries Stopped.");
 | 
			
		||||
    }
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private void initializeJob() {
 | 
			
		||||
    // Remove any Stale Jobs
 | 
			
		||||
    cleanUpStaleJobsFromRuns();
 | 
			
		||||
 | 
			
		||||
    // Initialize New Job
 | 
			
		||||
    int totalRecords = getTotalRequestToProcess(jobData.getEntities(), collectionDAO);
 | 
			
		||||
    this.jobData.setStats(
 | 
			
		||||
        new Stats()
 | 
			
		||||
 | 
			
		||||
@ -4249,6 +4249,16 @@ public interface CollectionDAO {
 | 
			
		||||
        connectionType = POSTGRES)
 | 
			
		||||
    void insert(@Bind("json") String json);
 | 
			
		||||
 | 
			
		||||
    @ConnectionAwareSqlUpdate(
 | 
			
		||||
        value =
 | 
			
		||||
            "UPDATE apps_extension_time_series SET json = JSON_SET(json, '$.status', 'stopped') where appId=:appId AND JSON_UNQUOTE(JSON_EXTRACT(json_column_name, '$.status')) = 'running'",
 | 
			
		||||
        connectionType = MYSQL)
 | 
			
		||||
    @ConnectionAwareSqlUpdate(
 | 
			
		||||
        value =
 | 
			
		||||
            "UPDATE apps_extension_time_series SET json = jsonb_set(json, '{status}', '\"stopped\"') WHERE appId = :appId AND json->>'status' = 'running'",
 | 
			
		||||
        connectionType = POSTGRES)
 | 
			
		||||
    void markStaleEntriesStopped(@Bind("appId") String appId);
 | 
			
		||||
 | 
			
		||||
    @ConnectionAwareSqlUpdate(
 | 
			
		||||
        value =
 | 
			
		||||
            "UPDATE apps_extension_time_series set json = :json where appId=:appId and timestamp=:timestamp",
 | 
			
		||||
 | 
			
		||||
@ -2231,6 +2231,7 @@ public class ElasticSearchClient implements SearchClient {
 | 
			
		||||
                requestConfigBuilder
 | 
			
		||||
                    .setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
 | 
			
		||||
                    .setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
 | 
			
		||||
        restClientBuilder.setCompressionEnabled(true);
 | 
			
		||||
        return new RestHighLevelClientBuilder(restClientBuilder.build())
 | 
			
		||||
            .setApiCompatibilityMode(true)
 | 
			
		||||
            .build();
 | 
			
		||||
 | 
			
		||||
@ -28,10 +28,10 @@ import org.openmetadata.service.workflows.interfaces.Sink;
 | 
			
		||||
public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
 | 
			
		||||
  private final StepStats stats = new StepStats();
 | 
			
		||||
  private final SearchRepository searchRepository;
 | 
			
		||||
  private final int maxPayLoadSizeInBytes;
 | 
			
		||||
  private final long maxPayLoadSizeInBytes;
 | 
			
		||||
 | 
			
		||||
  public ElasticSearchIndexSink(
 | 
			
		||||
      SearchRepository searchRepository, int total, int maxPayLoadSizeInBytes) {
 | 
			
		||||
      SearchRepository searchRepository, int total, long maxPayLoadSizeInBytes) {
 | 
			
		||||
    this.searchRepository = searchRepository;
 | 
			
		||||
    this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
 | 
			
		||||
    this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
 | 
			
		||||
 | 
			
		||||
@ -2209,6 +2209,8 @@ public class OpenSearchClient implements SearchClient {
 | 
			
		||||
                requestConfigBuilder
 | 
			
		||||
                    .setConnectTimeout(esConfig.getConnectionTimeoutSecs() * 1000)
 | 
			
		||||
                    .setSocketTimeout(esConfig.getSocketTimeoutSecs() * 1000));
 | 
			
		||||
        restClientBuilder.setCompressionEnabled(true);
 | 
			
		||||
        restClientBuilder.setChunkedEnabled(true);
 | 
			
		||||
        return new RestHighLevelClient(restClientBuilder);
 | 
			
		||||
      } catch (Exception e) {
 | 
			
		||||
        LOG.error("Failed to create open search client ", e);
 | 
			
		||||
 | 
			
		||||
@ -29,9 +29,9 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
 | 
			
		||||
  private final StepStats stats = new StepStats();
 | 
			
		||||
  private final SearchRepository searchRepository;
 | 
			
		||||
 | 
			
		||||
  private final int maxPayLoadSizeInBytes;
 | 
			
		||||
  private final long maxPayLoadSizeInBytes;
 | 
			
		||||
 | 
			
		||||
  public OpenSearchIndexSink(SearchRepository repository, int total, int maxPayLoadSizeInBytes) {
 | 
			
		||||
  public OpenSearchIndexSink(SearchRepository repository, int total, long maxPayLoadSizeInBytes) {
 | 
			
		||||
    this.searchRepository = repository;
 | 
			
		||||
    this.maxPayLoadSizeInBytes = maxPayLoadSizeInBytes;
 | 
			
		||||
    this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
 | 
			
		||||
 | 
			
		||||
@ -270,8 +270,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
 | 
			
		||||
          int batchSize,
 | 
			
		||||
      @Option(
 | 
			
		||||
              names = {"-p", "--payload-size"},
 | 
			
		||||
              defaultValue = "10485760")
 | 
			
		||||
          int payloadSize,
 | 
			
		||||
              defaultValue = "104857600l")
 | 
			
		||||
          long payloadSize,
 | 
			
		||||
      @Option(
 | 
			
		||||
              names = {"--recreate-indexes"},
 | 
			
		||||
              defaultValue = "true")
 | 
			
		||||
@ -295,7 +295,7 @@ public class OpenMetadataOperations implements Callable<Integer> {
 | 
			
		||||
  }
 | 
			
		||||
 | 
			
		||||
  private int executeSearchReindexApp(
 | 
			
		||||
      String appName, int batchSize, int payloadSize, boolean recreateIndexes) {
 | 
			
		||||
      String appName, int batchSize, long payloadSize, boolean recreateIndexes) {
 | 
			
		||||
    AppRepository appRepository = (AppRepository) Entity.getEntityRepository(Entity.APPLICATION);
 | 
			
		||||
    App originalSearchIndexApp =
 | 
			
		||||
        appRepository.getByName(null, appName, appRepository.getFields("id"));
 | 
			
		||||
 | 
			
		||||
@ -45,6 +45,7 @@
 | 
			
		||||
    ],
 | 
			
		||||
    "recreateIndex": false,
 | 
			
		||||
    "batchSize": "100",
 | 
			
		||||
    "payLoadSize": 104857600,
 | 
			
		||||
    "searchIndexMappingLanguage": "EN"
 | 
			
		||||
  },
 | 
			
		||||
  "appSchedule": {
 | 
			
		||||
 | 
			
		||||
@ -59,6 +59,7 @@
 | 
			
		||||
    ],
 | 
			
		||||
    "recreateIndex": false,
 | 
			
		||||
    "batchSize": "100",
 | 
			
		||||
    "payLoadSize": 104857600,
 | 
			
		||||
    "searchIndexMappingLanguage": "EN"
 | 
			
		||||
  }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@ -38,6 +38,12 @@
 | 
			
		||||
      "type": "integer",
 | 
			
		||||
      "default": 100
 | 
			
		||||
    },
 | 
			
		||||
    "payLoadSize": {
 | 
			
		||||
      "description": "Maximum number of events sent in a batch (Default 100).",
 | 
			
		||||
      "type": "integer",
 | 
			
		||||
      "existingJavaType": "java.lang.Long",
 | 
			
		||||
      "default": 104857600
 | 
			
		||||
    },
 | 
			
		||||
    "searchIndexMappingLanguage": {
 | 
			
		||||
      "description": "Recreate Indexes with updated Language",
 | 
			
		||||
      "$ref": "../../../../configuration/elasticSearchConfiguration.json#/definitions/searchIndexMappingLanguage"
 | 
			
		||||
 | 
			
		||||
@ -102,9 +102,10 @@
 | 
			
		||||
      "type": "integer"
 | 
			
		||||
    },
 | 
			
		||||
    "payLoadSize": {
 | 
			
		||||
      "description": "Payload size in bytes depending on config",
 | 
			
		||||
      "description": "Payload size in bytes depending on config.",
 | 
			
		||||
      "type": "integer",
 | 
			
		||||
      "default": 10485760
 | 
			
		||||
      "existingJavaType": "java.lang.Long",
 | 
			
		||||
      "default": 104857600
 | 
			
		||||
    },
 | 
			
		||||
    "searchIndexMappingLanguage": {
 | 
			
		||||
      "description": "Recreate Indexes with updated Language",
 | 
			
		||||
 | 
			
		||||
@ -10,6 +10,12 @@
 | 
			
		||||
      "type": "integer",
 | 
			
		||||
      "default": 100
 | 
			
		||||
    },
 | 
			
		||||
    "payLoadSize": {
 | 
			
		||||
      "title": "Payload Size",
 | 
			
		||||
      "description": "Maximum number of events entities in a batch (Default 100).",
 | 
			
		||||
      "type": "integer",
 | 
			
		||||
      "default": 104857600
 | 
			
		||||
    },
 | 
			
		||||
    "entities": {
 | 
			
		||||
      "title": "Entities",
 | 
			
		||||
      "description": "List of entities that you need to reindex",
 | 
			
		||||
 | 
			
		||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user