Merge branch 'main' into feature/custom-workflows

Ram Narayan Balaji authored 2025-09-25 21:17:07 +05:30 · committed by GitHub
commit 016ed1a39b
29 changed files with 1285 additions and 65 deletions

View File

@@ -0,0 +1,2 @@
-- Add migrations to fetch updated searchSettings
-- Java migration handles adding entityType.keyword aggregation

View File

@@ -0,0 +1,3 @@
-- Add migrations to fetch updated searchSettings
-- Java migration handles adding entityType.keyword aggregation

View File

@@ -1,6 +1,7 @@
package org.openmetadata.service.apps.bundles.searchIndex;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY;
import es.org.elasticsearch.action.bulk.BackoffPolicy;
import es.org.elasticsearch.action.bulk.BulkProcessor;
@@ -199,7 +200,10 @@ public class ElasticSearchBulkSink implements BulkSink {
LOG.debug("No index mapping found for entityType '{}'. Skipping indexing.", entityType);
return;
}
-    String indexName = indexMapping.getIndexName(searchRepository.getClusterAlias());
+    String indexName =
+        (String)
+            contextData.getOrDefault(
+                TARGET_INDEX_KEY, indexMapping.getIndexName(searchRepository.getClusterAlias()));
try {
// Check if these are time series entities

View File

@@ -1,6 +1,7 @@
package org.openmetadata.service.apps.bundles.searchIndex;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY;
import java.util.List;
import java.util.Map;
@@ -199,7 +200,10 @@ public class OpenSearchBulkSink implements BulkSink {
LOG.debug("No index mapping found for entityType '{}'. Skipping indexing.", entityType);
return;
}
-    String indexName = indexMapping.getIndexName(searchRepository.getClusterAlias());
+    String indexName =
+        (String)
+            contextData.getOrDefault(
+                TARGET_INDEX_KEY, indexMapping.getIndexName(searchRepository.getClusterAlias()));
try {
// Check if these are time series entities

View File

@@ -10,6 +10,7 @@ import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_CONFI
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.APP_RUN_STATS;
import static org.openmetadata.service.apps.scheduler.OmAppJobListener.WEBSOCKET_STATUS_CHANNEL;
import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_BROADCAST_CHANNEL;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -19,6 +20,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
@@ -61,6 +63,7 @@ import org.openmetadata.service.jdbi3.EntityTimeSeriesRepository;
import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.SystemRepository;
import org.openmetadata.service.search.RecreateIndexHandler;
import org.openmetadata.service.search.RecreateIndexHandler.ReindexContext;
import org.openmetadata.service.search.SearchClusterMetrics;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.socket.WebSocketManager;
@@ -130,6 +133,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
private BulkSink searchIndexSink;
private RecreateIndexHandler recreateIndexHandler;
private ReindexContext recreateContext;
@Getter private EventPublisherJob jobData;
private ExecutorService producerExecutor;
@@ -201,6 +205,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
consecutiveSuccesses.set(0);
lastBackpressureTime = 0;
originalBatchSize.set(0);
recreateContext = null;
}
private void initializeJobData(JobExecutionContext jobExecutionContext) {
@@ -232,23 +237,29 @@
}
  private void runReindexing(JobExecutionContext jobExecutionContext) throws Exception {
-    setupEntities();
-    LOG.info(
-        "Search Index Job Started for Entities: {}, RecreateIndex: {}",
-        jobData.getEntities(),
-        jobData.getRecreateIndex());
-    SearchClusterMetrics clusterMetrics = initializeJob(jobExecutionContext);
-    if (Boolean.TRUE.equals(jobData.getRecreateIndex())) {
-      recreateIndicesIfNeeded();
-    }
-    updateJobStatus(EventPublisherJob.Status.RUNNING);
-    reIndexFromStartToEnd(clusterMetrics);
-    closeSinkIfNeeded();
-    updateFinalJobStatus();
-    handleJobCompletion();
+    boolean success = false;
+    try {
+      setupEntities();
+      LOG.info(
+          "Search Index Job Started for Entities: {}, RecreateIndex: {}",
+          jobData.getEntities(),
+          jobData.getRecreateIndex());
+      SearchClusterMetrics clusterMetrics = initializeJob(jobExecutionContext);
+      if (Boolean.TRUE.equals(jobData.getRecreateIndex())) {
+        recreateIndicesIfNeeded();
+      }
+      updateJobStatus(EventPublisherJob.Status.RUNNING);
+      reIndexFromStartToEnd(clusterMetrics);
+      closeSinkIfNeeded();
+      updateFinalJobStatus();
+      success = jobData != null && jobData.getStatus() == EventPublisherJob.Status.COMPLETED;
+      handleJobCompletion();
+    } finally {
+      finalizeRecreateIndexes(success);
+    }
  }
private void setupEntities() {
@@ -265,7 +276,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
if (jobLogger != null) {
jobLogger.addInitDetail(RECREATING_INDICES, "Yes");
}
-    reCreateIndexes(jobData.getEntities());
+    recreateContext = reCreateIndexes(jobData.getEntities());
}
private void closeSinkIfNeeded() throws IOException {
@@ -342,6 +353,37 @@
}
}
private Optional<String> getTargetIndexForEntity(String entityType) {
if (recreateContext == null) {
return Optional.empty();
}
Optional<String> stagedIndex = recreateContext.getStagedIndex(entityType);
if (stagedIndex.isPresent()) {
return stagedIndex;
}
    if (QUERY_COST_RESULT.equals(entityType)) {
return recreateContext.getStagedIndex(QUERY_COST_RECORD);
}
return Optional.empty();
}
private void finalizeRecreateIndexes(boolean success) {
if (recreateIndexHandler == null || recreateContext == null) {
return;
}
try {
recreateIndexHandler.finalizeReindex(recreateContext, success);
} catch (Exception ex) {
LOG.error("Failed to finalize index recreation flow", ex);
} finally {
recreateContext = null;
}
}
private void updateStoppedStatusInJobDataMap(JobExecutionContext jobExecutionContext) {
LOG.info("Ensuring final STOPPED status in JobDataMap");
AppRunRecord appRecord = getJobRecord(jobExecutionContext);
@@ -1307,8 +1349,11 @@
}
}
-  private void reCreateIndexes(Set<String> entities) {
-    recreateIndexHandler.reCreateIndexes(entities);
+  private ReindexContext reCreateIndexes(Set<String> entities) {
+    if (recreateIndexHandler == null) {
+      return null;
+    }
+    return recreateIndexHandler.reCreateIndexes(entities);
}
private Source<?> createSource(String entityType) {
@@ -1509,6 +1554,8 @@
Map<String, Object> contextData = new HashMap<>();
contextData.put(ENTITY_TYPE_KEY, entityType);
contextData.put(RECREATE_INDEX, jobData.getRecreateIndex());
getTargetIndexForEntity(entityType)
.ifPresent(index -> contextData.put(TARGET_INDEX_KEY, index));
return contextData;
}
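Taken together, the changes in this class gate index promotion on a clean run: `reCreateIndexes` stages new indexes up front, each batch writes to the staged target injected through `createContextData`, and the `finally` block either promotes or rolls back. A condensed sketch of that control flow; `Handler` is a stand-in for the handler interface and carries only the two calls the diff shows, everything else is hypothetical scaffolding:

```java
// Success-gated finalize flow mirroring runReindexing above.
class ReindexLifecycleSketch {
  interface Handler {
    Object reCreateIndexes();                     // stage new indexes, return a context
    void finalizeReindex(Object ctx, boolean ok); // promote on success, clean up on failure
  }

  static void run(Handler handler, Runnable reindex) {
    Object ctx = handler.reCreateIndexes();
    boolean success = false;
    try {
      reindex.run();   // may throw at any point
      success = true;  // reached only on a clean pass
    } finally {
      // On success the aliases swing to the staged indexes and the old ones
      // are deleted; on failure the staged indexes are dropped while the
      // live indexes keep serving traffic.
      handler.finalizeReindex(ctx, success);
    }
  }

  public static void main(String[] args) {
    Handler h = new Handler() {
      public Object reCreateIndexes() { return "ctx"; }
      public void finalizeReindex(Object ctx, boolean ok) {
        System.out.println(ok ? "promote" : "roll back");
      }
    };
    run(h, () -> {});                                    // prints "promote"
    try {
      run(h, () -> { throw new RuntimeException(); });   // prints "roll back"
    } catch (RuntimeException ignored) {
    }
  }
}
```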

View File

@@ -0,0 +1,19 @@
package org.openmetadata.service.migration.mysql.v1911;
import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
import org.openmetadata.service.migration.utils.v1911.MigrationUtil;
public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
@SneakyThrows
public void runDataMigration() {
MigrationUtil.updateSearchSettingsEntityTypeKeyword();
}
}

View File

@@ -0,0 +1,19 @@
package org.openmetadata.service.migration.postgres.v1911;
import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;
import org.openmetadata.service.migration.utils.v1911.MigrationUtil;
public class Migration extends MigrationProcessImpl {
public Migration(MigrationFile migrationFile) {
super(migrationFile);
}
@Override
@SneakyThrows
public void runDataMigration() {
MigrationUtil.updateSearchSettingsEntityTypeKeyword();
}
}

View File

@@ -0,0 +1,87 @@
package org.openmetadata.service.migration.utils.v1911;
import com.fasterxml.jackson.databind.JsonNode;
import jakarta.json.Json;
import jakarta.json.JsonPatch;
import jakarta.json.JsonPatchBuilder;
import jakarta.json.JsonValue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.StreamSupport;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.api.search.SearchSettings;
import org.openmetadata.schema.settings.Settings;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.jdbi3.SystemRepository;
@Slf4j
public class MigrationUtil {
private static final SystemRepository systemRepository = Entity.getSystemRepository();
private static final String SEARCH_SETTINGS_KEY = "searchSettings";
private static final String AGGREGATIONS = "aggregations";
private static final String GLOBAL_SETTINGS = "globalSettings";
private static final String NAME = "name";
private static final String TYPE = "terms";
private static final String FIELD = "field";
private static final String ENTITY_TYPE_KEYWORD = "entityType.keyword";
public static void updateSearchSettingsEntityTypeKeyword() {
try {
LOG.info("Updating search settings to ensure entityType.keyword aggregation exists");
Settings searchSettings = systemRepository.getConfigWithKey(SEARCH_SETTINGS_KEY);
if (searchSettings == null) {
LOG.warn("Search settings not found, skipping migration");
return;
}
String rawJson = JsonUtils.pojoToJson(searchSettings.getConfigValue());
LOG.debug("Current search settings JSON: {}", rawJson);
JsonNode settingsNode = JsonUtils.readTree(rawJson);
JsonPatchBuilder patchBuilder = Json.createPatchBuilder();
AtomicBoolean needsUpdate = new AtomicBoolean(false);
JsonNode globalSettings = settingsNode.get(GLOBAL_SETTINGS);
if (globalSettings != null) {
JsonNode aggregations = globalSettings.get(AGGREGATIONS);
if (aggregations != null && aggregations.isArray()) {
boolean entityTypeKeywordExists =
StreamSupport.stream(aggregations.spliterator(), false)
.anyMatch(
aggregation -> ENTITY_TYPE_KEYWORD.equals(aggregation.get(NAME).asText()));
if (!entityTypeKeywordExists) {
LOG.info("Adding missing entityType.keyword aggregation");
patchBuilder.add(
"/" + GLOBAL_SETTINGS + "/" + AGGREGATIONS + "/-",
Json.createObjectBuilder()
.add(NAME, ENTITY_TYPE_KEYWORD)
.add("type", TYPE)
.add(FIELD, ENTITY_TYPE_KEYWORD)
.build());
needsUpdate.set(true);
} else {
LOG.info("entityType.keyword aggregation already exists, no update needed");
}
}
}
if (needsUpdate.get()) {
JsonPatch patch = patchBuilder.build();
LOG.debug("Applying patch: {}", patch.toString());
JsonValue updated = JsonUtils.applyPatch(searchSettings.getConfigValue(), patch);
SearchSettings updatedSettings =
JsonUtils.readValue(updated.toString(), SearchSettings.class);
searchSettings.withConfigValue(updatedSettings);
systemRepository.updateSetting(searchSettings);
LOG.info("Search settings updated successfully with entityType.keyword aggregation");
} else {
LOG.info("No updates needed for search settings");
}
} catch (Exception e) {
LOG.error("Error updating search settings for entityType.keyword aggregation", e);
}
}
}
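For reference, the patch this migration emits is a single JSON Patch `add` operation targeting the end of the aggregations array. A standalone jakarta.json sketch of the same construction (it needs a JSON-P implementation such as Eclipse Parsson on the classpath; the trimmed-down settings document is illustrative):

```java
import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonPatch;

public class EntityTypeKeywordPatchDemo {
  public static void main(String[] args) {
    // A minimal searchSettings document with one existing aggregation.
    JsonObject settings =
        Json.createObjectBuilder()
            .add(
                "globalSettings",
                Json.createObjectBuilder()
                    .add(
                        "aggregations",
                        Json.createArrayBuilder()
                            .add(
                                Json.createObjectBuilder()
                                    .add("name", "entityType")
                                    .add("type", "terms")
                                    .add("field", "entityType"))))
            .build();

    // "-" appends to the end of the aggregations array, exactly as the
    // migration's patchBuilder.add(...) call does.
    JsonPatch patch =
        Json.createPatchBuilder()
            .add(
                "/globalSettings/aggregations/-",
                Json.createObjectBuilder()
                    .add("name", "entityType.keyword")
                    .add("type", "terms")
                    .add("field", "entityType.keyword")
                    .build())
            .build();

    System.out.println(patch.apply(settings));
  }
}
```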

View File

@@ -1181,6 +1181,9 @@ public class IngestionPipelineResource
decryptOrNullify(securityContext, ingestionPipeline, true);
ServiceEntityInterface service =
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
if (repository.isS3LogStorageEnabled()) {
ingestionPipeline.setEnableStreamableLogs(true);
}
PipelineServiceClientResponse status =
pipelineServiceClient.deployPipeline(ingestionPipeline, service);
if (status.getCode() == 200) {

View File

@@ -1,29 +1,190 @@
package org.openmetadata.service.search;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.search.IndexMapping;
import org.openmetadata.service.Entity;
import org.openmetadata.service.search.RecreateIndexHandler.ReindexContext;
/**
- * Default implementation of RecreateHandler that provides basic logging.
- * This can be overridden to provide more sophisticated cleanup operations.
+ * Default implementation of RecreateHandler that provides zero-downtime index recreation.
*/
@Slf4j
public class DefaultRecreateHandler implements RecreateIndexHandler {
@Override
-  public void reCreateIndexes(Set<String> entities) {
+  public ReindexContext reCreateIndexes(Set<String> entities) {
ReindexContext context = new ReindexContext();
SearchRepository searchRepository = Entity.getSearchRepository();
if (CommonUtil.nullOrEmpty(entities)) {
return context;
}
String clusterAlias = searchRepository.getClusterAlias();
SearchClient<?> searchClient = searchRepository.getSearchClient();
for (String entityType : entities) {
-      IndexMapping indexType = searchRepository.getIndexMapping(entityType);
-      if (indexType == null) {
+      IndexMapping indexMapping = searchRepository.getIndexMapping(entityType);
+      if (indexMapping == null) {
        LOG.warn(
            "No index mapping found for entityType '{}'. Skipping index recreation.", entityType);
        continue;
      }
-      searchRepository.deleteIndex(indexType);
-      searchRepository.createIndex(indexType);
-      LOG.debug("Recreated index for entityType '{}'.", entityType);
String canonicalIndexName = indexMapping.getIndexName(clusterAlias);
String activeIndexName = canonicalIndexName;
if (!searchClient.indexExists(canonicalIndexName)) {
Set<String> aliasTargets =
searchClient.getIndicesByAlias(indexMapping.getAlias(clusterAlias));
if (!aliasTargets.isEmpty()) {
activeIndexName = aliasTargets.iterator().next();
LOG.debug(
"Resolved active index '{}' for entity '{}' via alias '{}'.",
activeIndexName,
entityType,
indexMapping.getAlias(clusterAlias));
} else {
LOG.debug(
"No existing index or alias found for entity '{}'. Rebuilding from scratch.",
entityType);
activeIndexName = null;
}
}
String mappingContent = searchRepository.readIndexMapping(indexMapping);
if (mappingContent == null) {
LOG.warn(
"Unable to read index mapping content for '{}'. Skipping staged recreation.",
canonicalIndexName);
continue;
}
String stagedIndexName = buildStagedIndexName(canonicalIndexName);
searchClient.createIndex(stagedIndexName, mappingContent);
Set<String> existingAliases =
activeIndexName != null ? searchClient.getAliases(activeIndexName) : Set.of();
context.add(
entityType,
canonicalIndexName,
activeIndexName,
stagedIndexName,
existingAliases,
indexMapping.getAlias(clusterAlias),
indexMapping.getParentAliases(clusterAlias));
LOG.info(
"Created staged index '{}' for entity '{}' to support zero-downtime recreation.",
stagedIndexName,
entityType);
}
return context;
}
@Override
public void finalizeReindex(ReindexContext context, boolean success) {
if (context == null || context.isEmpty()) {
return;
}
SearchRepository searchRepository = Entity.getSearchRepository();
SearchClient<?> searchClient = searchRepository.getSearchClient();
for (String entityType : context.getEntities()) {
String canonicalIndex = context.getCanonicalIndex(entityType).orElse(null);
String activeIndex = context.getOriginalIndex(entityType).orElse(null);
String stagedIndex = context.getStagedIndex(entityType).orElse(null);
if (canonicalIndex == null || stagedIndex == null) {
continue;
}
if (success) {
try {
Set<String> aliasesToAttach = new HashSet<>();
aliasesToAttach.addAll(context.getExistingAliases(entityType));
context.getCanonicalAlias(entityType).ifPresent(aliasesToAttach::add);
aliasesToAttach.add(canonicalIndex);
List<String> parentAliases = context.getParentAliases(entityType);
if (parentAliases != null) {
parentAliases.stream()
.filter(alias -> alias != null && !alias.isBlank())
.forEach(aliasesToAttach::add);
}
aliasesToAttach.removeIf(alias -> alias == null || alias.isBlank());
for (String alias : aliasesToAttach) {
Set<String> targets = searchClient.getIndicesByAlias(alias);
for (String target : targets) {
if (target.equals(stagedIndex)) {
continue;
}
boolean belongsToEntity =
target.equals(canonicalIndex) || target.startsWith(canonicalIndex + "_rebuild_");
if (!belongsToEntity) {
LOG.debug(
"Skipping alias '{}' removal from index '{}' as it does not belong to entity '{}'.",
alias,
target,
entityType);
continue;
}
searchClient.removeAliases(target, Set.of(alias));
LOG.info(
"Removed alias '{}' from index '{}' during promotion for entity '{}'.",
alias,
target,
entityType);
}
}
if (activeIndex != null && searchClient.indexExists(activeIndex)) {
searchClient.deleteIndex(activeIndex);
LOG.debug("Replaced old index '{}' for entity '{}'.", activeIndex, entityType);
}
if (!aliasesToAttach.isEmpty()) {
searchClient.addAliases(stagedIndex, aliasesToAttach);
}
LOG.info(
"Promoted staged index '{}' to serve entity '{}' (aliases: {}).",
stagedIndex,
entityType,
aliasesToAttach);
} catch (Exception ex) {
LOG.error(
"Failed to promote staged index '{}' for entity '{}'.", stagedIndex, entityType, ex);
}
} else {
try {
if (searchClient.indexExists(stagedIndex)) {
searchClient.deleteIndex(stagedIndex);
LOG.info(
"Deleted staged index '{}' after unsuccessful reindex for entity '{}'.",
stagedIndex,
entityType);
}
} catch (Exception ex) {
LOG.warn(
"Failed to delete staged index '{}' for entity '{}' after failure.",
stagedIndex,
entityType,
ex);
}
}
}
}
private String buildStagedIndexName(String originalIndexName) {
return String.format("%s_rebuild_%d", originalIndexName, System.currentTimeMillis());
}
}
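The promotion path above boils down to a strict ordering: detach the entity-owned aliases from any previous index, drop the superseded physical index, then attach the full alias set to the staged index. A compressed sketch of that ordering against a minimal in-memory client; `Client` is a stand-in for the relevant slice of SearchClient, not its full signature:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class PromotionOrderSketch {
  interface Client {
    Set<String> getIndicesByAlias(String alias);
    void removeAliases(String index, Set<String> aliases);
    void deleteIndex(String index);
    void addAliases(String index, Set<String> aliases);
  }

  static void promote(Client client, String stagedIndex, String oldIndex, Set<String> aliases) {
    // 1. Detach each alias from every index still carrying it, leaving the
    //    staged index untouched.
    for (String alias : aliases) {
      for (String target : client.getIndicesByAlias(alias)) {
        if (!target.equals(stagedIndex)) {
          client.removeAliases(target, Set.of(alias));
        }
      }
    }
    // 2. Drop the superseded physical index.
    if (oldIndex != null) {
      client.deleteIndex(oldIndex);
    }
    // 3. Attach the full alias set; the staged index now serves reads.
    client.addAliases(stagedIndex, aliases);
  }

  public static void main(String[] args) {
    Map<String, Set<String>> state = new HashMap<>();
    state.put("table_search_index_rebuild_old", new HashSet<>(Set.of("table", "dataAsset")));
    state.put("table_search_index_rebuild_new", new HashSet<>());
    Client client = new Client() {
      public Set<String> getIndicesByAlias(String alias) {
        Set<String> out = new HashSet<>();
        state.forEach((i, a) -> { if (a.contains(alias)) out.add(i); });
        return out;
      }
      public void removeAliases(String index, Set<String> aliases) {
        state.getOrDefault(index, new HashSet<>()).removeAll(aliases);
      }
      public void deleteIndex(String index) { state.remove(index); }
      public void addAliases(String index, Set<String> aliases) {
        state.computeIfAbsent(index, k -> new HashSet<>()).addAll(aliases);
      }
    };
    promote(client, "table_search_index_rebuild_new", "table_search_index_rebuild_old",
        Set.of("table", "dataAsset"));
    System.out.println(state); // {table_search_index_rebuild_new=[table, dataAsset]}
  }
}
```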

View File

@@ -1,5 +1,11 @@
package org.openmetadata.service.search;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
@@ -7,5 +13,65 @@ import java.util.Set;
* This allows for different implementations to be provided for different deployment tiers.
*/
public interface RecreateIndexHandler {
-  void reCreateIndexes(Set<String> entities);
+  ReindexContext reCreateIndexes(Set<String> entities);
default void finalizeReindex(ReindexContext context, boolean success) {}
class ReindexContext {
private final Map<String, String> canonicalIndexByEntity = new HashMap<>();
private final Map<String, String> originalIndexByEntity = new HashMap<>();
private final Map<String, String> stagedIndexByEntity = new HashMap<>();
private final Map<String, Set<String>> existingAliasesByEntity = new HashMap<>();
private final Map<String, String> canonicalAliasByEntity = new HashMap<>();
private final Map<String, List<String>> parentAliasesByEntity = new HashMap<>();
public void add(
String entity,
String canonicalIndex,
String originalIndex,
String stagedIndex,
Set<String> existingAliases,
String canonicalAlias,
List<String> parentAliases) {
canonicalIndexByEntity.put(entity, canonicalIndex);
originalIndexByEntity.put(entity, originalIndex);
stagedIndexByEntity.put(entity, stagedIndex);
existingAliasesByEntity.put(
entity, new HashSet<>(Optional.ofNullable(existingAliases).orElseGet(HashSet::new)));
canonicalAliasByEntity.put(entity, canonicalAlias);
parentAliasesByEntity.put(entity, parentAliases != null ? parentAliases : List.of());
}
public Optional<String> getCanonicalIndex(String entity) {
return Optional.ofNullable(canonicalIndexByEntity.get(entity));
}
public Set<String> getEntities() {
return Collections.unmodifiableSet(stagedIndexByEntity.keySet());
}
public Optional<String> getStagedIndex(String entity) {
return Optional.ofNullable(stagedIndexByEntity.get(entity));
}
public Optional<String> getOriginalIndex(String entity) {
return Optional.ofNullable(originalIndexByEntity.get(entity));
}
public Set<String> getExistingAliases(String entity) {
return existingAliasesByEntity.getOrDefault(entity, Collections.emptySet());
}
public Optional<String> getCanonicalAlias(String entity) {
return Optional.ofNullable(canonicalAliasByEntity.get(entity));
}
public List<String> getParentAliases(String entity) {
return parentAliasesByEntity.getOrDefault(entity, List.of());
}
public boolean isEmpty() {
return stagedIndexByEntity.isEmpty();
}
}
}
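A quick usage sketch of the context as a value object, recording one staged entity and reading it back; it assumes the class above is on the classpath, and the index and alias names are illustrative:

```java
import java.util.List;
import java.util.Set;
import org.openmetadata.service.search.RecreateIndexHandler.ReindexContext;

class ReindexContextUsage {
  public static void main(String[] args) {
    ReindexContext ctx = new ReindexContext();
    ctx.add(
        "table",                                 // entity type
        "table_search_index",                    // canonical index name
        "table_search_index",                    // currently active index
        "table_search_index_rebuild_1695650000", // staged index
        Set.of("table", "dataAsset"),            // aliases on the active index
        "table",                                 // canonical alias
        List.of("all", "dataAsset"));            // parent aliases
    System.out.println(ctx.getStagedIndex("table"));   // Optional[..._rebuild_...]
    System.out.println(ctx.getEntities());             // [table]
    System.out.println(ctx.getParentAliases("table")); // [all, dataAsset]
  }
}
```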

View File

@@ -395,6 +395,18 @@ public interface SearchClient<T> {
void createAliases(IndexMapping indexMapping);
void createIndex(String indexName, String indexMappingContent);
void deleteIndex(String indexName);
Set<String> getAliases(String indexName);
void addAliases(String indexName, Set<String> aliases);
void removeAliases(String indexName, Set<String> aliases);
Set<String> getIndicesByAlias(String aliasName);
void addIndexAlias(IndexMapping indexMapping, String... aliasName);
Response previewSearch(

View File

@@ -236,7 +236,11 @@ public class SearchRepository {
public void createIndexes() {
RecreateIndexHandler recreateIndexHandler = this.createReindexHandler();
-    recreateIndexHandler.reCreateIndexes(entityIndexMap.keySet());
+    RecreateIndexHandler.ReindexContext context =
+        recreateIndexHandler.reCreateIndexes(entityIndexMap.keySet());
+    if (context != null) {
+      recreateIndexHandler.finalizeReindex(context, true);
+    }
}
public void updateIndexes() {
@@ -268,12 +272,24 @@
}
public boolean indexExists(IndexMapping indexMapping) {
-    return searchClient.indexExists(indexMapping.getIndexName(clusterAlias));
+    String indexName = indexMapping.getIndexName(clusterAlias);
+    if (searchClient.indexExists(indexName)) {
+      return true;
+    }
+    return !searchClient.getIndicesByAlias(indexName).isEmpty();
}
public void createIndex(IndexMapping indexMapping) {
try {
String indexName = indexMapping.getIndexName(clusterAlias);
if (!indexExists(indexMapping)) {
// Clean up any lingering alias with the same name
Set<String> aliasTargets = searchClient.getIndicesByAlias(indexName);
for (String target : aliasTargets) {
searchClient.removeAliases(target, Set.of(indexName));
searchClient.deleteIndex(target);
}
String indexMappingContent = getIndexMapping(indexMapping);
searchClient.createIndex(indexMapping, indexMappingContent);
searchClient.createAliases(indexMapping);
@@ -302,8 +318,15 @@
public void deleteIndex(IndexMapping indexMapping) {
try {
-      if (indexExists(indexMapping)) {
-        searchClient.deleteIndex(indexMapping);
-      }
+      String indexName = indexMapping.getIndexName(clusterAlias);
+      if (searchClient.indexExists(indexName)) {
+        searchClient.deleteIndex(indexMapping);
+      } else {
+        Set<String> aliasTargets = searchClient.getIndicesByAlias(indexName);
+        for (String target : aliasTargets) {
+          searchClient.removeAliases(target, Set.of(indexName));
+          searchClient.deleteIndex(target);
+        }
+      }
} catch (Exception e) {
LOG.error(
@@ -327,6 +350,10 @@
return null;
}
public String readIndexMapping(IndexMapping indexMapping) {
return getIndexMapping(indexMapping);
}
/**
* Create search index for an entity only (no lifecycle events).
* This method is used by SearchIndexHandler.

View File

@@ -329,6 +329,129 @@ public class ElasticSearchClient implements SearchClient<RestHighLevelClient> {
}
}
@Override
public void createIndex(String indexName, String indexMappingContent) {
if (!isClientAvailable) {
LOG.error(
"Failed to create Elastic Search index as client is not property configured, Please check your OpenMetadata configuration");
return;
}
try {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.source(indexMappingContent, XContentType.JSON);
client.indices().create(request, RequestOptions.DEFAULT);
LOG.debug("Created staged index {}", indexName);
} catch (Exception e) {
LOG.error(String.format("Failed to create staged index %s due to", indexName), e);
}
}
@Override
public void deleteIndex(String indexName) {
if (!isClientAvailable) {
return;
}
try {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
client.indices().delete(request, RequestOptions.DEFAULT);
LOG.debug("Deleted index {}", indexName);
} catch (Exception e) {
LOG.error(String.format("Failed to delete index %s due to", indexName), e);
}
}
@Override
public Set<String> getAliases(String indexName) {
Set<String> aliases = new HashSet<>();
if (!isClientAvailable) {
return aliases;
}
try {
Request request = new Request("GET", String.format("/%s/_alias", indexName));
es.org.elasticsearch.client.Response response =
client.getLowLevelClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
JsonNode root = JsonUtils.readTree(responseBody);
JsonNode indexNode = root.get(indexName);
if (indexNode != null && indexNode.has("aliases")) {
JsonNode aliasesNode = indexNode.get("aliases");
aliasesNode.fieldNames().forEachRemaining(aliases::add);
}
} catch (Exception e) {
LOG.warn(String.format("Failed to retrieve aliases for index %s", indexName), e);
}
return aliases;
}
@Override
public void addAliases(String indexName, Set<String> aliases) {
if (!isClientAvailable || nullOrEmpty(aliases)) {
return;
}
try {
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
for (String alias : aliases) {
aliasesRequest.addAliasAction(
IndicesAliasesRequest.AliasActions.add().index(indexName).alias(alias));
}
client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
LOG.debug("Added aliases {} to index {}", aliases, indexName);
} catch (Exception e) {
LOG.error(String.format("Failed to add aliases %s to index %s", aliases, indexName), e);
}
}
@Override
public void removeAliases(String indexName, Set<String> aliases) {
if (!isClientAvailable || nullOrEmpty(aliases)) {
return;
}
try {
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
for (String alias : aliases) {
aliasesRequest.addAliasAction(
IndicesAliasesRequest.AliasActions.remove().index(indexName).alias(alias));
}
client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
LOG.debug("Removed aliases {} from index {}", aliases, indexName);
} catch (Exception e) {
if (e instanceof ResponseException responseException
&& responseException.getResponse().getStatusLine().getStatusCode() == 404) {
LOG.debug(
"Aliases {} not present on index {} while attempting removal (ignored).",
aliases,
indexName);
return;
}
LOG.error(String.format("Failed to remove aliases %s from index %s", aliases, indexName), e);
}
}
@Override
public Set<String> getIndicesByAlias(String aliasName) {
Set<String> indices = new HashSet<>();
if (!isClientAvailable || aliasName == null || aliasName.isBlank()) {
return indices;
}
try {
Request request = new Request("GET", String.format("/_alias/%s", aliasName));
es.org.elasticsearch.client.Response response =
client.getLowLevelClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
JsonNode root = JsonUtils.readTree(responseBody);
root.fieldNames().forEachRemaining(indices::add);
} catch (ResponseException ex) {
if (ex.getResponse() != null && ex.getResponse().getStatusLine().getStatusCode() == 404) {
LOG.debug("Alias '{}' not found while resolving indices.", aliasName);
} else {
LOG.warn(String.format("Failed to resolve indices for alias %s", aliasName), ex);
}
} catch (Exception e) {
LOG.warn(String.format("Failed to resolve indices for alias %s", aliasName), e);
}
return indices;
}
@Override
public void updateIndex(IndexMapping indexMapping, String indexMappingContent) {
try {
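Both alias helpers above parse low-level REST responses rather than typed client objects. For reference, `GET /{index}/_alias` keys the aliases under the index entry, while `GET /_alias/{name}` returns one top-level key per index carrying the alias. A small Jackson-only demo of exactly that parsing; the sample payloads are illustrative:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashSet;
import java.util.Set;

class AliasResponseShapes {
  public static void main(String[] args) throws Exception {
    ObjectMapper mapper = new ObjectMapper();

    // Shape returned by GET /table_search_index/_alias
    String perIndex =
        """
        {"table_search_index":{"aliases":{"table":{},"dataAsset":{}}}}""";
    JsonNode root = mapper.readTree(perIndex);
    Set<String> aliases = new HashSet<>();
    root.get("table_search_index").get("aliases").fieldNames().forEachRemaining(aliases::add);
    System.out.println(aliases); // [table, dataAsset]

    // Shape returned by GET /_alias/dataAsset: one top-level key per index
    String perAlias =
        """
        {"table_search_index":{"aliases":{"dataAsset":{}}},
         "dashboard_search_index":{"aliases":{"dataAsset":{}}}}""";
    Set<String> indices = new HashSet<>();
    mapper.readTree(perAlias).fieldNames().forEachRemaining(indices::add);
    System.out.println(indices); // [table_search_index, dashboard_search_index]
  }
}
```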

View File

@@ -62,6 +62,7 @@ import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.util.EntityUtils;
import org.jetbrains.annotations.NotNull;
import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.api.entityRelationship.SearchEntityRelationshipRequest;
@@ -345,6 +346,129 @@ public class OpenSearchClient implements SearchClient<RestHighLevelClient> {
}
}
@Override
public void createIndex(String indexName, String indexMappingContent) {
if (!Boolean.TRUE.equals(isClientAvailable)) {
LOG.error(
"Failed to create Open Search index as client is not property configured, Please check your OpenMetadata configuration");
return;
}
try {
CreateIndexRequest request = new CreateIndexRequest(indexName);
request.source(indexMappingContent, XContentType.JSON);
client.indices().create(request, RequestOptions.DEFAULT);
LOG.debug("Created staged index {}", indexName);
} catch (Exception e) {
LOG.error(String.format("Failed to create staged index %s due to", indexName), e);
}
}
@Override
public void deleteIndex(String indexName) {
if (!Boolean.TRUE.equals(isClientAvailable)) {
return;
}
try {
DeleteIndexRequest request = new DeleteIndexRequest(indexName);
client.indices().delete(request, RequestOptions.DEFAULT);
LOG.debug("Deleted index {}", indexName);
} catch (Exception e) {
LOG.error(String.format("Failed to delete index %s due to", indexName), e);
}
}
@Override
public Set<String> getAliases(String indexName) {
Set<String> aliases = new HashSet<>();
if (!Boolean.TRUE.equals(isClientAvailable)) {
return aliases;
}
try {
Request request = new Request("GET", String.format("/%s/_alias", indexName));
os.org.opensearch.client.Response response =
client.getLowLevelClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
JsonNode root = JsonUtils.readTree(responseBody);
JsonNode indexNode = root.get(indexName);
if (indexNode != null && indexNode.has("aliases")) {
JsonNode aliasesNode = indexNode.get("aliases");
aliasesNode.fieldNames().forEachRemaining(aliases::add);
}
} catch (Exception e) {
LOG.warn(String.format("Failed to retrieve aliases for index %s", indexName), e);
}
return aliases;
}
@Override
public void addAliases(String indexName, Set<String> aliases) {
if (!Boolean.TRUE.equals(isClientAvailable) || nullOrEmpty(aliases)) {
return;
}
try {
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
for (String alias : aliases) {
aliasesRequest.addAliasAction(
IndicesAliasesRequest.AliasActions.add().index(indexName).alias(alias));
}
client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
LOG.debug("Added aliases {} to index {}", aliases, indexName);
} catch (Exception e) {
LOG.error(String.format("Failed to add aliases %s to index %s", aliases, indexName), e);
}
}
@Override
public void removeAliases(String indexName, Set<String> aliases) {
if (!Boolean.TRUE.equals(isClientAvailable) || nullOrEmpty(aliases)) {
return;
}
try {
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest();
for (String alias : aliases) {
aliasesRequest.addAliasAction(
IndicesAliasesRequest.AliasActions.remove().index(indexName).alias(alias));
}
client.indices().updateAliases(aliasesRequest, RequestOptions.DEFAULT);
LOG.debug("Removed aliases {} from index {}", aliases, indexName);
} catch (Exception e) {
if (e instanceof ResponseException responseException
&& responseException.getResponse().getStatusLine().getStatusCode() == 404) {
LOG.debug(
"Aliases {} not present on index {} while attempting removal (ignored).",
aliases,
indexName);
return;
}
LOG.error(String.format("Failed to remove aliases %s from index %s", aliases, indexName), e);
}
}
@Override
public Set<String> getIndicesByAlias(String aliasName) {
Set<String> indices = new HashSet<>();
if (!Boolean.TRUE.equals(isClientAvailable) || aliasName == null || aliasName.isBlank()) {
return indices;
}
try {
Request request = new Request("GET", String.format("/_alias/%s", aliasName));
os.org.opensearch.client.Response response =
client.getLowLevelClient().performRequest(request);
String responseBody = EntityUtils.toString(response.getEntity());
JsonNode root = JsonUtils.readTree(responseBody);
root.fieldNames().forEachRemaining(indices::add);
} catch (ResponseException ex) {
if (ex.getResponse() != null && ex.getResponse().getStatusLine().getStatusCode() == 404) {
LOG.debug("Alias '{}' not found while resolving indices.", aliasName);
} else {
LOG.warn(String.format("Failed to resolve indices for alias %s", aliasName), ex);
}
} catch (Exception e) {
LOG.warn(String.format("Failed to resolve indices for alias %s", aliasName), e);
}
return indices;
}
@Override
public void updateIndex(IndexMapping indexMapping, String indexMappingContent) {
try {

View File

@@ -50,6 +50,7 @@ public class ReindexingUtil {
public static final String ENTITY_TYPE_KEY = "entityType";
public static final String ENTITY_NAME_LIST_KEY = "entityNameList";
public static final String TIMESTAMP_KEY = "@timestamp";
public static final String TARGET_INDEX_KEY = "targetIndex";
public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) {
stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess);

View File

@@ -20,6 +20,11 @@
"type": "terms",
"field": "entityType"
},
{
"name": "entityType.keyword",
"type": "terms",
"field": "entityType.keyword"
},
{
"name": "tier.tagFQN",
"type": "terms",

View File

@@ -6,14 +6,20 @@ import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.TARGET_INDEX_KEY;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -26,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -48,9 +55,13 @@ import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.schema.utils.ResultList;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.search.DefaultRecreateHandler;
import org.openmetadata.service.search.RecreateIndexHandler;
import org.openmetadata.service.search.SearchClient;
import org.openmetadata.service.search.SearchRepository;
import org.openmetadata.service.socket.WebSocketManager;
import org.openmetadata.service.workflows.interfaces.Source;
@@ -178,6 +189,132 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
});
}
@Test
void testCreateContextDataIncludesTargetIndexWhenStaged() throws Exception {
EventPublisherJob jobDataWithRecreate =
JsonUtils.convertValue(testJobData, EventPublisherJob.class).withRecreateIndex(true);
App testApp =
new App()
.withName("SearchIndexingApplication")
.withAppConfiguration(JsonUtils.convertValue(jobDataWithRecreate, Object.class));
searchIndexApp.init(testApp);
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
context.add(
"table",
"cluster_table",
"cluster_table",
"cluster_table_rebuild_123",
Set.of("cluster_table_alias"),
"table",
List.of("dataAsset"));
Field contextField = SearchIndexApp.class.getDeclaredField("recreateContext");
contextField.setAccessible(true);
contextField.set(searchIndexApp, context);
Method createContextData =
SearchIndexApp.class.getDeclaredMethod("createContextData", String.class);
createContextData.setAccessible(true);
@SuppressWarnings("unchecked")
Map<String, Object> contextData =
(Map<String, Object>) createContextData.invoke(searchIndexApp, "table");
assertEquals("cluster_table_rebuild_123", contextData.get(TARGET_INDEX_KEY));
assertTrue((Boolean) contextData.get("recreateIndex"));
}
@Test
void testFinalizeReindexMovesAliasesForTargetEntityOnly() {
AliasState aliasState = new AliasState();
aliasState.put(
"table_search_index_rebuild_old",
Set.of("table", "table_search_index", "all", "dataAsset"));
aliasState.put("table_search_index_rebuild_new", new HashSet<>());
aliasState.put(
"dashboard_search_index_rebuild_old",
Set.of("dashboard", "dashboard_search_index", "all", "dataAsset"));
SearchClient<Void> client = aliasState.toMock();
SearchRepository repo = mock(SearchRepository.class);
when(repo.getSearchClient()).thenReturn(client);
try (MockedStatic<Entity> entityMock = mockStatic(Entity.class)) {
entityMock.when(Entity::getSearchRepository).thenReturn(repo);
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
context.add(
"table",
"table_search_index",
"table_search_index_rebuild_old",
"table_search_index_rebuild_new",
Set.of("table", "table_search_index", "all", "dataAsset"),
"table",
List.of("all", "dataAsset", "database", "databaseSchema", "databaseService"));
new DefaultRecreateHandler().finalizeReindex(context, true);
}
assertTrue(aliasState.deletedIndices.contains("table_search_index_rebuild_old"));
assertEquals(
Set.of(
"table",
"table_search_index",
"all",
"dataAsset",
"database",
"databaseSchema",
"databaseService"),
aliasState.indexAliases.get("table_search_index_rebuild_new"));
assertEquals(
Set.of("dashboard", "dashboard_search_index", "all", "dataAsset"),
aliasState.indexAliases.get("dashboard_search_index_rebuild_old"));
}
@Test
void testFinalizeReindexRemovesPreviousEntityRebuildIndexes() {
AliasState aliasState = new AliasState();
aliasState.put(
"table_search_index_rebuild_old1",
Set.of("table", "table_search_index", "all", "dataAsset"));
aliasState.put(
"table_search_index_rebuild_old2",
Set.of("table", "table_search_index", "all", "dataAsset"));
aliasState.put("table_search_index_rebuild_new", new HashSet<>());
SearchClient<Void> client = aliasState.toMock();
SearchRepository repo = mock(SearchRepository.class);
when(repo.getSearchClient()).thenReturn(client);
try (MockedStatic<Entity> entityMock = mockStatic(Entity.class)) {
entityMock.when(Entity::getSearchRepository).thenReturn(repo);
RecreateIndexHandler.ReindexContext context = new RecreateIndexHandler.ReindexContext();
context.add(
"table",
"table_search_index",
"table_search_index_rebuild_old1",
"table_search_index_rebuild_new",
Set.of("table", "table_search_index", "all", "dataAsset"),
"table",
List.of("all", "dataAsset"));
new DefaultRecreateHandler().finalizeReindex(context, true);
}
assertTrue(aliasState.deletedIndices.contains("table_search_index_rebuild_old1"));
assertEquals(
Set.of("table", "table_search_index", "all", "dataAsset"),
aliasState.indexAliases.get("table_search_index_rebuild_new"));
assertTrue(
aliasState.indexAliases.containsKey("table_search_index_rebuild_old2")
? aliasState.indexAliases.get("table_search_index_rebuild_old2").isEmpty()
: true);
}
@Test
void testErrorHandlingWithSearchIndexException() throws Exception {
App testApp =
@@ -838,6 +975,8 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
SearchIndexApp.IndexingTask<EntityInterface> task =
new SearchIndexApp.IndexingTask<>("table", resultList, 0);
searchIndexApp.getJobData().setStatus(EventPublisherJob.Status.RUNNING);
// Process the task
var processTaskMethod =
SearchIndexApp.class.getDeclaredMethod(
@@ -904,4 +1043,77 @@ class SearchIndexAppTest extends OpenMetadataApplicationTest {
assertEquals(10, jobData.getMaxConcurrentRequests());
assertEquals(1000000L, jobData.getPayLoadSize());
}
private static class AliasState {
final Map<String, Set<String>> indexAliases = new HashMap<>();
final Set<String> deletedIndices = new HashSet<>();
void put(String indexName, Set<String> aliases) {
indexAliases.put(indexName, new HashSet<>(aliases));
}
SearchClient<Void> toMock() {
@SuppressWarnings("unchecked")
SearchClient<Void> client = mock(SearchClient.class);
lenient().when(client.isClientAvailable()).thenReturn(true);
lenient()
.when(client.getSearchType())
.thenReturn(ElasticSearchConfiguration.SearchType.ELASTICSEARCH);
when(client.indexExists(anyString()))
.thenAnswer(invocation -> indexAliases.containsKey(invocation.getArgument(0)));
lenient()
.when(client.getAliases(anyString()))
.thenAnswer(
invocation ->
new HashSet<>(indexAliases.getOrDefault(invocation.getArgument(0), Set.of())));
lenient()
.when(client.getIndicesByAlias(anyString()))
.thenAnswer(
invocation ->
indexAliases.entrySet().stream()
.filter(e -> e.getValue().contains(invocation.getArgument(0)))
.map(Map.Entry::getKey)
.collect(Collectors.toSet()));
doAnswer(
invocation -> {
String index = invocation.getArgument(0);
@SuppressWarnings("unchecked")
Set<String> aliases = new HashSet<>((Set<String>) invocation.getArgument(1));
indexAliases.computeIfPresent(
index,
(k, v) -> {
v.removeAll(aliases);
return v;
});
return null;
})
.when(client)
.removeAliases(anyString(), anySet());
doAnswer(
invocation -> {
String index = invocation.getArgument(0);
@SuppressWarnings("unchecked")
Set<String> aliases = new HashSet<>((Set<String>) invocation.getArgument(1));
indexAliases.computeIfAbsent(index, k -> new HashSet<>()).addAll(aliases);
return null;
})
.when(client)
.addAliases(anyString(), anySet());
doAnswer(
invocation -> {
String index = invocation.getArgument(0);
indexAliases.remove(index);
deletedIndices.add(index);
return null;
})
.when(client)
.deleteIndex(anyString());
return client;
}
}
}

View File

@@ -78,16 +78,6 @@ test.describe('Ingestion Bot ', () => {
await afterAction();
});
-  test.afterAll('Cleanup pre-requests', async ({ browser }) => {
-    const { apiContext, afterAction } = await performAdminLogin(browser);
-    await Promise.all([
-      domain1.delete(apiContext),
-      domain2.delete(apiContext),
-      domain3.delete(apiContext),
-    ]);
-    await afterAction();
-  });
test.beforeEach('Visit entity details page', async ({ page }) => {
await redirectToHomePage(page);
});
@@ -105,7 +95,6 @@
await test.step('Assign assets to domains', async () => {
// Add assets to domain 1
await redirectToHomePage(page);
-      await sidebarClick(page, SidebarItem.DOMAIN);
await page.waitForLoadState('networkidle');
await page.waitForSelector('[data-testid="loader"]', {
@@ -115,7 +104,6 @@
await addAssetsToDomain(page, domain1, domainAsset1);
// Add assets to domain 2
await redirectToHomePage(page);
-      await sidebarClick(page, SidebarItem.DOMAIN);
await page.waitForLoadState('networkidle');
await page.waitForSelector('[data-testid="loader"]', {
@@ -128,8 +116,6 @@
await test.step(
'Ingestion bot should access domain assigned assets',
async () => {
-        await redirectToHomePage(ingestionBotPage);
// Check if entity page is accessible & it has domain
for (const asset of domainAsset1) {
await redirectToHomePage(ingestionBotPage);
@@ -173,7 +159,6 @@
]);
// Add assets to domain 2
await redirectToHomePage(page);
-      await sidebarClick(page, SidebarItem.DOMAIN);
await page.waitForLoadState('networkidle');
await page.waitForSelector('[data-testid="loader"]', {

View File

@@ -565,7 +565,7 @@ test('Verify cycle lineage should be handled properly', async ({ browser }) => {
await page.reload();
await page.waitForLoadState('networkidle');
-  await page.getByTestId('fit-screen').click();
+  await performZoomOut(page);
await expect(page.getByTestId(`lineage-node-${tableFqn}`)).toBeVisible();
await expect(page.getByTestId(`lineage-node-${topicFqn}`)).toBeVisible();

View File

@@ -189,11 +189,39 @@ test('Search Index Application', async ({ page }) => {
// Verify response status code
const getMarketPlaceResponse = await page.waitForResponse(
-    '/api/v1/apps/marketplace?limit=*'
+    '/api/v1/apps/marketplace?limit=15'
);
expect(getMarketPlaceResponse.status()).toBe(200);
  // Check if search-indexing-application-card is visible; if not, paginate through pages
let cardFound = await page
.locator('[data-testid="search-indexing-application-card"]')
.isVisible();
while (!cardFound) {
const nextButton = page.locator('[data-testid="next"]');
const isNextButtonDisabled = await nextButton.isDisabled();
if (isNextButtonDisabled) {
throw new Error(
'search-indexing-application-card not found in marketplace and next button is disabled'
);
}
// Click next page and wait for response
const nextPageResponse = page.waitForResponse(
'/api/v1/apps/marketplace*'
);
await nextButton.click();
await nextPageResponse;
// Check if card is now visible
cardFound = await page
.locator('[data-testid="search-indexing-application-card"]')
.isVisible();
}
await page.click(
'[data-testid="search-indexing-application-card"] [data-testid="config-btn"]'
);

View File

@@ -168,17 +168,24 @@ test.describe('Search Preview test', () => {
state: 'detached',
});
-    const searchInput = page.getByTestId('searchbar');
-    const previewRes = page.waitForResponse('/api/v1/search/preview');
-    await searchInput.fill(table1.entity.name);
-    await previewRes;
const descriptionField = page.getByTestId(
`field-configuration-panel-description`
);
await descriptionField.click();
await setSliderValue(page, 'field-weight-slider', 68);
await descriptionField.click();
const previewResponse = page.waitForResponse('/api/v1/search/preview');
await page.getByTestId('highlight-field-switch').click();
await previewResponse;
await expect(page.getByTestId('highlight-field-switch')).toHaveAttribute(
'aria-checked',
'false'
);
const searchInput = page.getByTestId('searchbar');
await searchInput.fill(table1.entity.name);
await previewResponse;
await page.waitForLoadState('networkidle');
await page.waitForSelector('[data-testid="loader"]', {

View File

@@ -384,6 +384,12 @@ test('Classification Page', async ({ page }) => {
await page.reload();
await databaseSchemasPage;
await page.waitForLoadState('networkidle');
await page.waitForSelector('[data-testid="loader"]', {
state: 'detached',
});
await expect(
page.locator('[data-testid="tags-container"]')
).toContainText(tag);
@@ -441,6 +447,11 @@
// Verify term count is now 0 after deleting the tag
await page.reload();
await page.waitForLoadState('networkidle');
await page.waitForSelector('[data-testid="loader"]', {
state: 'detached',
});
await page.waitForSelector('[data-testid="side-panel-classification"]', {
state: 'visible',
});

View File

@@ -43,11 +43,13 @@ export class ClassificationClass {
}
async visitPage(page: Page) {
const getClassification = page.waitForResponse('/api/v1/classifications**');
-    const getTags = page.waitForResponse('/api/v1/tags**');
await sidebarClick(page, SidebarItem.TAGS);
const getTags = page.waitForResponse('/api/v1/tags*');
await page.waitForSelector('[data-testid="side-panel-classification"]');
await getClassification;
await getTags;
-    await page.waitForSelector('[data-testid="side-panel-classification"]');
await page
.locator(`[data-testid="side-panel-classification"]`)
.filter({ hasText: this.data.displayName })

View File

@@ -718,7 +718,9 @@ export const validateGlossaryTerm = async (
if (isGlossaryTermPage) {
await expect(page.getByTestId(term.name)).toBeVisible();
} else {
-    await expect(page.locator(termSelector)).toBeVisible();
+    await expect(page.locator(termSelector)).toContainText(term.name);
+    await expect(page.locator(statusSelector)).toBeVisible();
+    await expect(page.locator(statusSelector)).toContainText(status);
}
};

View File

@@ -56,28 +56,28 @@ export const verifyDataFilters = async (page: Page, widgetKey: string) => {
page.getByTestId(widgetKey).getByTestId('widget-sort-by-dropdown')
).toBeVisible();
-  const aToZFilter = page.waitForResponse(
-    '/api/v1/search/query?q=*&index=all*&sort_field=name.keyword&sort_order=asc'
-  );
  await page
    .getByTestId(widgetKey)
    .getByTestId('widget-sort-by-dropdown')
    .click();
+  const aToZFilter = page.waitForResponse(
+    '/api/v1/search/query?q=*&index=all*&sort_field=name.keyword*&sort_order=asc*'
+  );
await page.getByRole('menuitem', { name: 'A to Z' }).click();
await aToZFilter;
-  const zToAFilter = page.waitForResponse(
-    '/api/v1/search/query?q=*&index=all*&sort_field=name.keyword&sort_order=desc'
-  );
  await page
    .getByTestId(widgetKey)
    .getByTestId('widget-sort-by-dropdown')
    .click();
+  const zToAFilter = page.waitForResponse(
+    '/api/v1/search/query?q=*&index=all*&sort_field=name.keyword*&sort_order=desc*'
+  );
await page.getByRole('menuitem', { name: 'Z to A' }).click();
await zToAFilter;
const latestFilter = page.waitForResponse(
'/api/v1/search/query?q=*&index=all*&sort_field=updatedAt&sort_order=desc'
'/api/v1/search/query?q=*&index=all*&sort_field=updatedAt*&sort_order=desc*'
);
await page
.getByTestId(widgetKey)

View File

@@ -0,0 +1,40 @@
# Custom Drive
In this section, we provide guides and references to use the Custom Drive connector.
Note that this connector is a wrapper for any Python class you create and add to the OpenMetadata ingestion image. The idea is to give you the tools to bring into OpenMetadata any source that is only available within your business/engineering context.
You can learn more about Custom Connectors and see them in action in the following [Webinar](https://www.youtube.com/watch?v=fDUj30Ub9VE&ab_channel=OpenMetadata). Also, you can directly jump to the demo code [here](https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector).
## Connection Details
$$section
### Connection Arguments $(id="connectionArguments")
Advanced arguments specific to your custom implementation. These can be any key-value pairs that your custom connector requires.
Possible uses:
- Custom authentication parameters
- Service-specific API options
- Data transformation settings
- Debugging and logging configuration
$$
$$section
### Connection Options $(id="connectionOptions")
This property becomes useful when we need to send input parameters to our Source Class.
If, for example, we want to run a piece of logic based on the value of a parameter named `business_unit`, we can pass the key `business_unit` with any value, and read it in the Source via:
```python
business_unit = self.service_connection.connectionOptions.__root__.get("business_unit")
```
You can find a full example of this implementation [here](https://github.com/open-metadata/openmetadata-demo/blob/main/custom-connector/connector/my_csv_connector.py#L91).
$$
## Test Connection
The test connection is disabled here as this is a custom implementation. The recommended approach would be to validate the connection to your source as a first step in the ingestion process.

View File

@@ -0,0 +1,226 @@
# Metadata
Drive Service Metadata Pipeline Configuration.
## Configuration
$$section
### Directory Filter Pattern $(id="directoryFilterPattern")
Directory filter patterns are used to control whether to include specific directories as part of metadata ingestion.
**Include**: Explicitly include directories by adding a list of regular expressions to the `Include` field. OpenMetadata will include all directories with names matching one or more of the supplied regular expressions. All other directories will be excluded.
For example, to include only those directories whose name starts with the word `project`, add the regex pattern in the include field as `^project.*`.
**Exclude**: Explicitly exclude directories by adding a list of regular expressions to the `Exclude` field. OpenMetadata will exclude all directories with names matching one or more of the supplied regular expressions. All other directories will be included.
For example, to exclude all directories whose name contains the word `archive`, add the regex pattern `.*archive.*` in the exclude field.
Check out [this](https://docs.open-metadata.org/connectors/ingestion/workflows/metadata/filter-patterns) document for further examples of filter patterns.
$$
$$section
### File Filter Pattern $(id="fileFilterPattern")
File filter patterns allow you to control which files are included in the metadata extraction based on their names or extensions.
**Include**: Add regular expressions to include specific file types. For example:
- `.*\.xlsx?$` - Include Excel files
- `.*\.csv$` - Include CSV files
- `.*\.pdf$` - Include PDF documents
- `.*\.docx?$` - Include Word documents
**Exclude**: Add regular expressions to exclude specific file types. For example:
- `~\$.*` - Exclude temporary files
- `\.tmp$` - Exclude files with .tmp extension
- `^\..*` - Exclude hidden files
$$
$$section
### Spreadsheet Filter Pattern $(id="spreadsheetFilterPattern")
Spreadsheet filter patterns control which spreadsheet files (like Google Sheets or Excel files) are included in metadata extraction.
**Include**: Add regular expressions to include specific spreadsheets based on their names.
**Exclude**: Add regular expressions to exclude specific spreadsheets.
This is particularly useful when you want to process only certain spreadsheets as data sources while ignoring others.
$$
$$section
### Worksheet Filter Pattern $(id="worksheetFilterPattern")
Worksheet filter patterns allow you to control which worksheets within spreadsheets are included in the metadata extraction.
**Include**: Add regular expressions to include specific worksheets based on their names. For example:
- `^data_.*` - Include worksheets starting with "data_"
- `.*_final$` - Include worksheets ending with "_final"
**Exclude**: Add regular expressions to exclude specific worksheets. For example:
- `^temp_.*` - Exclude temporary worksheets
- `.*_draft$` - Exclude draft worksheets
$$
$$section
### Include Directories $(id="includeDirectories")
Optional configuration to turn on/off fetching metadata for directories.
When enabled (default: true), the connector will extract metadata for directories including:
- Directory structure and hierarchy
- Directory permissions and ownership
- Directory metadata like creation date, modification date
Disable this if you only want to extract file-level metadata without directory information.
$$
$$section
### Include Files $(id="includeFiles")
Optional configuration to turn on/off fetching metadata for files.
When enabled (default: true), the connector will extract metadata for all types of files including:
- Documents (PDF, Word, etc.)
- Images
- Videos
- Other file types
Disable this if you only want to extract directory or spreadsheet metadata.
$$
$$section
### Include Spreadsheets $(id="includeSpreadsheets")
Optional configuration to turn on/off fetching metadata for spreadsheets.
When enabled (default: true), the connector will process spreadsheet files (Google Sheets, Excel) as structured data sources, extracting:
- Spreadsheet structure
- Sheet names and relationships
- Basic schema information
Disable this if you don't want to process spreadsheets as data sources.
$$
$$section
### Include Worksheets $(id="includeWorksheets")
Optional configuration to turn on/off fetching metadata for individual worksheets within spreadsheets.
When enabled (default: true), the connector will extract detailed metadata for each worksheet including:
- Column headers and data types
- Row counts
- Worksheet-specific metadata
Disable this if you only want spreadsheet-level metadata without worksheet details.
$$
$$section
### Include Tags $(id="includeTags")
Optional configuration to toggle the tags ingestion.
When enabled (default: true), the connector will extract and apply tags from the source drive system to the ingested entities in OpenMetadata.
$$
$$section
### Include Owners $(id="includeOwners")
Set the 'Include Owners' toggle to control whether to add owners to the ingested entity when the owner email matches a user stored in the OpenMetadata server as part of metadata ingestion.
If the ingested entity already exists and has an owner, the owner will not be overwritten.
Default: false
$$
$$section
### Mark Deleted Directories $(id="markDeletedDirectories")
Optional configuration to soft delete directories in OpenMetadata if the source directories are deleted.
When enabled (default: true), if a directory is deleted in the source:
- The directory will be marked as deleted in OpenMetadata
- All associated entities (files, spreadsheets, worksheets, lineage) will also be deleted
Disable this to preserve directory metadata in OpenMetadata even when deleted from the source.
$$
$$section
### Mark Deleted Files $(id="markDeletedFiles")
Optional configuration to soft delete files in OpenMetadata if the source files are deleted.
When enabled (default: true), if a file is deleted in the source:
- The file will be marked as deleted in OpenMetadata
- All associated entities (lineage, relationships) will also be deleted
Disable this to preserve file metadata in OpenMetadata even when deleted from the source.
$$
$$section
### Mark Deleted Spreadsheets $(id="markDeletedSpreadsheets")
Optional configuration to soft delete spreadsheets in OpenMetadata if the source spreadsheets are deleted.
When enabled (default: true), if a spreadsheet is deleted in the source:
- The spreadsheet will be marked as deleted in OpenMetadata
- All associated worksheets and lineage will also be deleted
Disable this to preserve spreadsheet metadata in OpenMetadata even when deleted from the source.
$$
$$section
### Mark Deleted Worksheets $(id="markDeletedWorksheets")
Optional configuration to soft delete worksheets in OpenMetadata if the source worksheets are deleted.
When enabled (default: true), if a worksheet is deleted in the source:
- The worksheet will be marked as deleted in OpenMetadata
- All associated lineage will also be deleted
Disable this to preserve worksheet metadata in OpenMetadata even when deleted from the source.
$$
$$section
### Use FQN For Filtering $(id="useFqnForFiltering")
When enabled, regex patterns will be applied on the fully qualified name (FQN) instead of just the raw name.
For example:
- FQN: `service_name.directory_name.file_name`
- Raw name: `file_name`
This is useful when you want to filter based on the complete path or hierarchy rather than just the item name.
Default: false
$$
$$section
### Override Metadata $(id="overrideMetadata")
Set the 'Override Metadata' toggle to control whether to override the existing metadata in the OpenMetadata server with the metadata fetched from the source.
If the toggle is `enabled`, the metadata fetched from the source will override and replace the existing metadata in OpenMetadata.
If the toggle is `disabled`, the metadata fetched from the source will not override the existing metadata in the OpenMetadata server. In this case, the metadata will only get updated for fields that have no value added in OpenMetadata.
This is applicable for fields like description, tags, owner, and displayName.
Default: false
$$
$$section
### Number of Threads $(id="threads")
Number of threads to use in order to parallelize Drive ingestion.
Increasing the number of threads can improve ingestion performance for large drive structures, but may also increase resource consumption and API rate limit pressure.
Recommended values:
- 1 thread: Safe default for small to medium drives
- 2-4 threads: Medium to large drives with good network bandwidth
- 5+ threads: Very large drives with enterprise API limits
Default: 1
$$

View File

@@ -108,7 +108,7 @@ const TagPage = () => {
tab?: string;
}>();
const { permissions, getEntityPermission } = usePermissionProvider();
-  const [isLoading, setIsLoading] = useState(false);
+  const [isLoading, setIsLoading] = useState(true);
const [tagItem, setTagItem] = useState<Tag>();
const [assetModalVisible, setAssetModalVisible] = useState(false);