mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-07-24 09:50:01 +00:00
Created ActivityFeed Alert (#9339)
This commit is contained in:
parent
fc60098e84
commit
b762593652
@ -1,7 +1,5 @@
|
||||
package org.openmetadata.service.alerts;
|
||||
|
||||
import static org.openmetadata.service.security.policyevaluator.CompiledRule.parseExpression;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
@ -10,7 +8,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.entity.alerts.Alert;
|
||||
import org.openmetadata.schema.entity.alerts.AlertAction;
|
||||
import org.openmetadata.schema.entity.alerts.AlertFilterRule;
|
||||
import org.openmetadata.schema.entity.alerts.TriggerConfig;
|
||||
import org.openmetadata.schema.filter.EventFilter;
|
||||
import org.openmetadata.schema.filter.Filters;
|
||||
@ -21,8 +18,6 @@ import org.openmetadata.service.events.EventPublisher;
|
||||
import org.openmetadata.service.events.errors.RetriableException;
|
||||
import org.openmetadata.service.resources.events.EventResource.ChangeEventList;
|
||||
import org.openmetadata.service.util.FilterUtil;
|
||||
import org.springframework.expression.Expression;
|
||||
import org.springframework.expression.spel.support.StandardEvaluationContext;
|
||||
|
||||
@Slf4j
|
||||
public abstract class AbstractAlertPublisher implements EventPublisher {
|
||||
@ -74,7 +69,7 @@ public abstract class AbstractAlertPublisher implements EventPublisher {
|
||||
}
|
||||
|
||||
// Evaluate ChangeEvent Alert Filtering
|
||||
if (!evaluateAlertConditions(changeEvent)) {
|
||||
if (!AlertUtil.evaluateAlertConditions(changeEvent, alert.getFilteringRules())) {
|
||||
return;
|
||||
}
|
||||
|
||||
@ -120,20 +115,4 @@ public abstract class AbstractAlertPublisher implements EventPublisher {
|
||||
return filter.isEmpty() || FilterUtil.shouldProcessRequest(changeEvent, filter);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean evaluateAlertConditions(ChangeEvent changeEvent) {
|
||||
boolean result = false;
|
||||
for (AlertFilterRule rule : alert.getFilteringRules()) {
|
||||
AlertsRuleEvaluator ruleEvaluator = new AlertsRuleEvaluator(changeEvent);
|
||||
StandardEvaluationContext evaluationContext = new StandardEvaluationContext(ruleEvaluator);
|
||||
Expression expression = parseExpression(rule.getCondition());
|
||||
if (rule.getEffect() == AlertFilterRule.Effect.ALLOW) {
|
||||
result = Boolean.TRUE.equals(expression.getValue(evaluationContext, Boolean.class));
|
||||
} else if (rule.getEffect() == AlertFilterRule.Effect.DENY) {
|
||||
result = Boolean.FALSE.equals(expression.getValue(evaluationContext, Boolean.class));
|
||||
}
|
||||
LOG.debug("Alert evaluated as Result : {}", result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,58 @@
|
||||
package org.openmetadata.service.alerts;
|
||||
|
||||
import com.google.common.cache.CacheBuilder;
|
||||
import com.google.common.cache.CacheLoader;
|
||||
import com.google.common.cache.LoadingCache;
|
||||
import com.google.common.util.concurrent.UncheckedExecutionException;
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.annotation.CheckForNull;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.entity.alerts.Alert;
|
||||
import org.openmetadata.service.exception.EntityNotFoundException;
|
||||
import org.openmetadata.service.jdbi3.AlertRepository;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
|
||||
@Slf4j
|
||||
public class ActivityFeedAlertCache {
|
||||
private static final ActivityFeedAlertCache INSTANCE = new ActivityFeedAlertCache();
|
||||
private static volatile boolean INITIALIZED = false;
|
||||
protected static LoadingCache<String, Alert> ALERTS_CACHE;
|
||||
protected static AlertRepository ALERT_REPOSITORY;
|
||||
private static String activityFeedAlertName;
|
||||
|
||||
public static void initialize(String alertName, CollectionDAO dao) {
|
||||
if (!INITIALIZED) {
|
||||
ALERTS_CACHE =
|
||||
CacheBuilder.newBuilder()
|
||||
.maximumSize(1000)
|
||||
.expireAfterAccess(1, TimeUnit.MINUTES)
|
||||
.build(new ActivityFeedAlertCache.ActivityFeedAlertLoader());
|
||||
ALERT_REPOSITORY = new AlertRepository(dao);
|
||||
INITIALIZED = true;
|
||||
activityFeedAlertName = alertName;
|
||||
}
|
||||
}
|
||||
|
||||
public static ActivityFeedAlertCache getInstance() {
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
public Alert getActivityFeedAlert() throws EntityNotFoundException {
|
||||
try {
|
||||
return ALERTS_CACHE.get(activityFeedAlertName);
|
||||
} catch (ExecutionException | UncheckedExecutionException ex) {
|
||||
throw new EntityNotFoundException(ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
static class ActivityFeedAlertLoader extends CacheLoader<String, Alert> {
|
||||
@Override
|
||||
public Alert load(@CheckForNull String alertName) throws IOException {
|
||||
Alert alert = ALERT_REPOSITORY.getByName(null, alertName, ALERT_REPOSITORY.getFields("*"));
|
||||
LOG.debug("Loaded Alert {}", alert);
|
||||
return alert;
|
||||
}
|
||||
}
|
||||
}
|
@ -14,7 +14,9 @@ import java.util.stream.Stream;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.entity.alerts.Alert;
|
||||
import org.openmetadata.schema.entity.alerts.AlertAction;
|
||||
import org.openmetadata.schema.entity.alerts.AlertFilterRule;
|
||||
import org.openmetadata.schema.tests.type.TestCaseStatus;
|
||||
import org.openmetadata.schema.type.ChangeEvent;
|
||||
import org.openmetadata.schema.type.EventType;
|
||||
import org.openmetadata.schema.type.Function;
|
||||
import org.openmetadata.schema.type.ParamAdditionalContext;
|
||||
@ -28,6 +30,7 @@ import org.openmetadata.service.exception.CatalogExceptionMessage;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.resources.CollectionRegistry;
|
||||
import org.springframework.expression.Expression;
|
||||
import org.springframework.expression.spel.support.StandardEvaluationContext;
|
||||
|
||||
@Slf4j
|
||||
public class AlertUtil {
|
||||
@ -48,6 +51,8 @@ public class AlertUtil {
|
||||
case EMAIL:
|
||||
publisher = new EmailAlertPublisher(alert, alertAction, daoCollection);
|
||||
break;
|
||||
case ACTIVITY_FEED:
|
||||
throw new IllegalArgumentException("Cannot create Activity Feed as Publisher.");
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid Alert Action Specified.");
|
||||
}
|
||||
@ -112,4 +117,20 @@ public class AlertUtil {
|
||||
}
|
||||
return indexesToSearch;
|
||||
}
|
||||
|
||||
public static boolean evaluateAlertConditions(ChangeEvent changeEvent, List<AlertFilterRule> alertFilterRules) {
|
||||
boolean result = false;
|
||||
for (AlertFilterRule rule : alertFilterRules) {
|
||||
AlertsRuleEvaluator ruleEvaluator = new AlertsRuleEvaluator(changeEvent);
|
||||
StandardEvaluationContext evaluationContext = new StandardEvaluationContext(ruleEvaluator);
|
||||
Expression expression = parseExpression(rule.getCondition());
|
||||
if (rule.getEffect() == AlertFilterRule.Effect.ALLOW) {
|
||||
result = Boolean.TRUE.equals(expression.getValue(evaluationContext, Boolean.class));
|
||||
} else if (rule.getEffect() == AlertFilterRule.Effect.DENY) {
|
||||
result = Boolean.FALSE.equals(expression.getValue(evaluationContext, Boolean.class));
|
||||
}
|
||||
LOG.debug("Alert evaluated as Result : {}", result);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
@ -38,7 +38,6 @@ import org.openmetadata.schema.type.EntityReference;
|
||||
import org.openmetadata.schema.type.EventType;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.filter.FilterRegistry;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.FeedRepository;
|
||||
import org.openmetadata.service.resources.feeds.MessageParser.EntityLink;
|
||||
@ -69,7 +68,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
String loggedInUserName = securityContext.getUserPrincipal().getName();
|
||||
try {
|
||||
notificationHandler.processNotifications(responseContext);
|
||||
ChangeEvent changeEvent = getChangeEvent(method, responseContext);
|
||||
ChangeEvent changeEvent = getChangeEvent(loggedInUserName, method, responseContext);
|
||||
if (changeEvent == null) {
|
||||
return null;
|
||||
}
|
||||
@ -92,7 +91,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
if (Entity.shouldDisplayEntityChangeOnFeed(changeEvent.getEntityType())) {
|
||||
// ignore usageSummary updates in the feed
|
||||
boolean filterEnabled;
|
||||
filterEnabled = FilterUtil.shouldProcessRequest(changeEvent, FilterRegistry.getAllFilters());
|
||||
filterEnabled = FilterUtil.shouldProcessRequest(changeEvent);
|
||||
if (filterEnabled) {
|
||||
for (Thread thread : listOrEmpty(getThreads(responseContext, loggedInUserName))) {
|
||||
// Don't create a thread if there is no message
|
||||
@ -126,7 +125,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
return null;
|
||||
}
|
||||
|
||||
public ChangeEvent getChangeEvent(String method, ContainerResponseContext responseContext) {
|
||||
public ChangeEvent getChangeEvent(String updateBy, String method, ContainerResponseContext responseContext) {
|
||||
// GET operations don't produce change events
|
||||
if (method.equals("GET")) {
|
||||
return null;
|
||||
@ -147,7 +146,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
EntityReference entityReference = entityInterface.getEntityReference();
|
||||
String entityType = entityReference.getType();
|
||||
String entityFQN = entityReference.getFullyQualifiedName();
|
||||
return getChangeEvent(EventType.ENTITY_CREATED, entityType, entityInterface)
|
||||
return getChangeEvent(updateBy, EventType.ENTITY_CREATED, entityType, entityInterface)
|
||||
.withEntity(entityInterface)
|
||||
.withEntityFullyQualifiedName(entityFQN);
|
||||
}
|
||||
@ -171,7 +170,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
eventType = ENTITY_SOFT_DELETED;
|
||||
}
|
||||
|
||||
return getChangeEvent(eventType, entityType, entityInterface)
|
||||
return getChangeEvent(updateBy, eventType, entityType, entityInterface)
|
||||
.withPreviousVersion(entityInterface.getChangeDescription().getPreviousVersion())
|
||||
.withEntity(entityInterface)
|
||||
.withEntityFullyQualifiedName(entityFQN);
|
||||
@ -188,7 +187,7 @@ public class ChangeEventHandler implements EventHandler {
|
||||
EntityReference entityReference = entityInterface.getEntityReference();
|
||||
String entityType = entityReference.getType();
|
||||
String entityFQN = entityReference.getFullyQualifiedName();
|
||||
return getChangeEvent(ENTITY_DELETED, entityType, entityInterface)
|
||||
return getChangeEvent(updateBy, ENTITY_DELETED, entityType, entityInterface)
|
||||
.withPreviousVersion(entityInterface.getVersion())
|
||||
.withEntity(entityInterface)
|
||||
.withEntityFullyQualifiedName(entityFQN);
|
||||
@ -196,12 +195,13 @@ public class ChangeEventHandler implements EventHandler {
|
||||
return null;
|
||||
}
|
||||
|
||||
private static ChangeEvent getChangeEvent(EventType eventType, String entityType, EntityInterface entityInterface) {
|
||||
private static ChangeEvent getChangeEvent(
|
||||
String updateBy, EventType eventType, String entityType, EntityInterface entityInterface) {
|
||||
return new ChangeEvent()
|
||||
.withEventType(eventType)
|
||||
.withEntityId(entityInterface.getId())
|
||||
.withEntityType(entityType)
|
||||
.withUserName(entityInterface.getUpdatedBy())
|
||||
.withUserName(updateBy)
|
||||
.withTimestamp(entityInterface.getUpdatedAt())
|
||||
.withChangeDescription(entityInterface.getChangeDescription())
|
||||
.withCurrentVersion(entityInterface.getVersion());
|
||||
|
@ -18,16 +18,19 @@ import java.util.concurrent.ForkJoinPool;
|
||||
import javax.ws.rs.container.ContainerRequestContext;
|
||||
import javax.ws.rs.container.ContainerResponseContext;
|
||||
import javax.ws.rs.container.ContainerResponseFilter;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import javax.ws.rs.ext.Provider;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jdbi.v3.core.Jdbi;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.security.JwtFilter;
|
||||
import org.openmetadata.service.util.ParallelStreamUtil;
|
||||
|
||||
@Slf4j
|
||||
@Provider
|
||||
public class EventFilter implements ContainerResponseFilter {
|
||||
private static final List<String> AUDITABLE_METHODS = Arrays.asList("POST", "PUT", "PATCH", "DELETE");
|
||||
|
||||
private static final int FORK_JOIN_POOL_PARALLELISM = 20;
|
||||
private final ForkJoinPool forkJoinPool;
|
||||
private final List<EventHandler> eventHandlers;
|
||||
@ -65,7 +68,11 @@ public class EventFilter implements ContainerResponseFilter {
|
||||
eventHandlers
|
||||
.parallelStream()
|
||||
.forEach(
|
||||
eventHandler ->
|
||||
ParallelStreamUtil.runAsync(() -> eventHandler.process(requestContext, responseContext), forkJoinPool));
|
||||
(eventHandler) -> {
|
||||
UriInfo uriInfo = requestContext.getUriInfo();
|
||||
if (JwtFilter.EXCLUDED_ENDPOINTS.stream().noneMatch(endpoint -> uriInfo.getPath().contains(endpoint))) {
|
||||
ParallelStreamUtil.runAsync(() -> eventHandler.process(requestContext, responseContext), forkJoinPool);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -13,18 +13,15 @@
|
||||
|
||||
package org.openmetadata.service.filter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.openmetadata.schema.entity.alerts.Alert;
|
||||
import org.openmetadata.schema.filter.EventFilter;
|
||||
import org.openmetadata.schema.filter.Filters;
|
||||
import org.openmetadata.schema.settings.Settings;
|
||||
import org.openmetadata.schema.type.EventType;
|
||||
import org.openmetadata.service.resources.settings.SettingsCache;
|
||||
import org.openmetadata.service.util.FilterUtil;
|
||||
import org.openmetadata.service.alerts.ActivityFeedAlertCache;
|
||||
|
||||
public class FilterRegistry {
|
||||
private static final ConcurrentHashMap<String, Map<EventType, Filters>> FILTERS_MAP = new ConcurrentHashMap<>();
|
||||
@ -47,23 +44,9 @@ public class FilterRegistry {
|
||||
}
|
||||
}
|
||||
|
||||
public static List<Map<EventType, Filters>> listAllFilters() {
|
||||
List<Map<EventType, Filters>> filterList = new ArrayList<>();
|
||||
FILTERS_MAP.forEach((key, value) -> filterList.add(value));
|
||||
return filterList;
|
||||
}
|
||||
|
||||
public static List<String> listAllEntitiesHavingFilter() {
|
||||
return List.copyOf(FILTERS_MAP.keySet());
|
||||
}
|
||||
|
||||
public static Map<EventType, Filters> getFilterForEntity(String key) {
|
||||
return FILTERS_MAP.get(key);
|
||||
}
|
||||
|
||||
public static Map<String, Map<EventType, Filters>> getAllFilters() throws IOException {
|
||||
Settings settings = SettingsCache.getInstance().getEventFilters();
|
||||
add(FilterUtil.getEventFilterFromSettings(settings));
|
||||
public static Map<String, Map<EventType, Filters>> getAllFilters() {
|
||||
Alert alert = ActivityFeedAlertCache.getInstance().getActivityFeedAlert();
|
||||
add(alert.getTriggerConfig().getEventFilters());
|
||||
return FILTERS_MAP;
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,7 @@ import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import io.swagger.v3.oas.annotations.parameters.RequestBody;
|
||||
import io.swagger.v3.oas.annotations.responses.ApiResponse;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import javax.json.JsonPatch;
|
||||
import javax.validation.Valid;
|
||||
@ -45,16 +46,21 @@ import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.SecurityContext;
|
||||
import javax.ws.rs.core.UriInfo;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.schema.api.events.CreateAlertAction;
|
||||
import org.openmetadata.schema.entity.alerts.AlertAction;
|
||||
import org.openmetadata.schema.type.EntityHistory;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.jdbi3.AlertActionRepository;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.ListFilter;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.resources.EntityResource;
|
||||
import org.openmetadata.service.resources.policies.PolicyResource;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
import org.openmetadata.service.util.JsonUtils;
|
||||
import org.openmetadata.service.util.ResultList;
|
||||
|
||||
@Slf4j
|
||||
@ -62,7 +68,7 @@ import org.openmetadata.service.util.ResultList;
|
||||
@Api(value = "Alerts collection", tags = "Alerts collection")
|
||||
@Produces(MediaType.APPLICATION_JSON)
|
||||
@Consumes(MediaType.APPLICATION_JSON)
|
||||
@Collection(name = "alertAction", order = 7) // init befoe Alert Resource Init
|
||||
@Collection(name = "alertAction", order = 7) // init before Alert Resource Init
|
||||
public class AlertActionResource extends EntityResource<AlertAction, AlertActionRepository> {
|
||||
public static final String COLLECTION_PATH = "v1/alertAction/";
|
||||
|
||||
@ -81,6 +87,28 @@ public class AlertActionResource extends EntityResource<AlertAction, AlertAction
|
||||
public AlertActionList() {}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize(OpenMetadataApplicationConfig config) throws IOException {
|
||||
initDefaultAlertActions();
|
||||
}
|
||||
|
||||
private void initDefaultAlertActions() throws IOException {
|
||||
List<String> jsonDataFiles = EntityUtil.getJsonDataResources(".*json/data/alerts/alertsActionData.json$");
|
||||
if (jsonDataFiles.size() != 1) {
|
||||
LOG.warn("Invalid number of jsonDataFiles {}. Only one expected.", jsonDataFiles.size());
|
||||
return;
|
||||
}
|
||||
String jsonDataFile = jsonDataFiles.get(0);
|
||||
try {
|
||||
String json = CommonUtil.getResourceAsStream(PolicyResource.class.getClassLoader(), jsonDataFile);
|
||||
// Assumes to have 1 entry currently
|
||||
AlertAction alertActions = JsonUtils.readObjects(json, AlertAction.class).get(0);
|
||||
dao.initializeEntity(alertActions);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to initialize the resource descriptors from file {}", jsonDataFile, e);
|
||||
}
|
||||
}
|
||||
|
||||
@GET
|
||||
@Operation(
|
||||
operationId = "listAlertActions",
|
||||
|
@ -52,19 +52,24 @@ import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.schema.api.events.CreateAlert;
|
||||
import org.openmetadata.schema.entity.alerts.Alert;
|
||||
import org.openmetadata.schema.entity.alerts.AlertAction;
|
||||
import org.openmetadata.schema.entity.alerts.AlertActionStatus;
|
||||
import org.openmetadata.schema.entity.alerts.EntitySpelFilters;
|
||||
import org.openmetadata.schema.entity.alerts.TriggerConfig;
|
||||
import org.openmetadata.schema.type.EntityHistory;
|
||||
import org.openmetadata.schema.type.Function;
|
||||
import org.openmetadata.schema.type.Include;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.OpenMetadataApplicationConfig;
|
||||
import org.openmetadata.service.alerts.ActivityFeedAlertCache;
|
||||
import org.openmetadata.service.alerts.AlertUtil;
|
||||
import org.openmetadata.service.jdbi3.AlertRepository;
|
||||
import org.openmetadata.service.jdbi3.CollectionDAO;
|
||||
import org.openmetadata.service.jdbi3.EntityRepository;
|
||||
import org.openmetadata.service.jdbi3.ListFilter;
|
||||
import org.openmetadata.service.resources.Collection;
|
||||
import org.openmetadata.service.resources.EntityResource;
|
||||
import org.openmetadata.service.resources.policies.PolicyResource;
|
||||
import org.openmetadata.service.resources.settings.SettingsResource;
|
||||
import org.openmetadata.service.security.Authorizer;
|
||||
import org.openmetadata.service.util.EntityUtil;
|
||||
@ -79,12 +84,12 @@ import org.openmetadata.service.util.ResultList;
|
||||
@Collection(name = "alerts", order = 8) // init after alertAction Resource
|
||||
public class AlertResource extends EntityResource<Alert, AlertRepository> {
|
||||
public static final String COLLECTION_PATH = "v1/alerts/";
|
||||
private final CollectionDAO.AlertDAO alertDAO;
|
||||
private final CollectionDAO daoCollection;
|
||||
private List<TriggerConfig> bootStrappedFilters = new ArrayList<>();
|
||||
private Map<String, EntitySpelFilters> entitySpelFiltersList = new HashMap<>();
|
||||
private final Map<String, EntitySpelFilters> entitySpelFiltersList = new HashMap<>();
|
||||
static final String FIELDS = "triggerConfig,filteringRules,alertActions";
|
||||
|
||||
private void initDefaultTriggersSettings() throws IOException {
|
||||
private void initAlerts() throws IOException {
|
||||
// Load Trigger File
|
||||
List<String> triggerDataFiles = EntityUtil.getJsonDataResources(".*json/data/alerts/triggerData.json$");
|
||||
if (triggerDataFiles.size() != 1) {
|
||||
@ -109,13 +114,44 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
|
||||
try {
|
||||
String json = CommonUtil.getResourceAsStream(getClass().getClassLoader(), filterDataFile);
|
||||
List<EntitySpelFilters> filters = JsonUtils.readObjects(json, EntitySpelFilters.class);
|
||||
filters.forEach(
|
||||
(spelFilter) -> {
|
||||
entitySpelFiltersList.put(spelFilter.getEntityType(), spelFilter);
|
||||
});
|
||||
filters.forEach((spelFilter) -> entitySpelFiltersList.put(spelFilter.getEntityType(), spelFilter));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to initialize the {} from file {}", "filters", filterDataFile, e);
|
||||
}
|
||||
|
||||
// Initialize Alert For ActivityFeed, this does not have any publisher since it is for internal system filtering
|
||||
List<String> alertFile = EntityUtil.getJsonDataResources(".*json/data/alerts/alertsData.json$");
|
||||
List<String> alertActionFile = EntityUtil.getJsonDataResources(".*json/data/alerts/alertsActionData.json$");
|
||||
String alertDataFile = alertFile.get(0);
|
||||
String alertActionDataFile = alertActionFile.get(0);
|
||||
Alert activityFeedAlert = null;
|
||||
try {
|
||||
String actionJson = CommonUtil.getResourceAsStream(PolicyResource.class.getClassLoader(), alertActionDataFile);
|
||||
// Assumes to have 1 entry currently
|
||||
AlertAction alertActions = JsonUtils.readObjects(actionJson, AlertAction.class).get(0);
|
||||
|
||||
String alertJson = CommonUtil.getResourceAsStream(getClass().getClassLoader(), alertDataFile);
|
||||
activityFeedAlert = JsonUtils.readObjects(alertJson, Alert.class).get(0);
|
||||
// populate alert actions
|
||||
EntityRepository<AlertAction> actionEntityRepository = Entity.getEntityRepository(Entity.ALERT_ACTION);
|
||||
AlertAction action =
|
||||
actionEntityRepository.getByName(null, alertActions.getName(), actionEntityRepository.getFields("id"));
|
||||
activityFeedAlert.setAlertActions(List.of(action.getEntityReference()));
|
||||
dao.initializeEntity(activityFeedAlert);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to initialize the {} from file {}", "filters", alertDataFile, e);
|
||||
}
|
||||
|
||||
// Init Publishers
|
||||
ActivityFeedAlertCache.initialize(activityFeedAlert.getName(), daoCollection);
|
||||
// Create Publishers
|
||||
List<String> listAllAlerts = daoCollection.alertDAO().listAllAlerts(daoCollection.alertDAO().getTableName());
|
||||
List<Alert> alertList = JsonUtils.readObjects(listAllAlerts, Alert.class);
|
||||
for (Alert alert : alertList) {
|
||||
if (alert.getName().equals(activityFeedAlert.getName())) {
|
||||
dao.addAlertActionPublishers(alert);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -125,7 +161,7 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
|
||||
|
||||
public AlertResource(CollectionDAO dao, Authorizer authorizer) {
|
||||
super(Alert.class, new AlertRepository(dao), authorizer);
|
||||
alertDAO = dao.alertDAO();
|
||||
daoCollection = dao;
|
||||
}
|
||||
|
||||
public static class AlertList extends ResultList<Alert> {
|
||||
@ -137,12 +173,7 @@ public class AlertResource extends EntityResource<Alert, AlertRepository> {
|
||||
@Override
|
||||
public void initialize(OpenMetadataApplicationConfig config) {
|
||||
try {
|
||||
List<String> listAllAlerts = alertDAO.listAllAlerts(alertDAO.getTableName());
|
||||
List<Alert> alertList = JsonUtils.readObjects(listAllAlerts, Alert.class);
|
||||
for (Alert alert : alertList) {
|
||||
dao.addAlertActionPublishers(alert);
|
||||
}
|
||||
initDefaultTriggersSettings();
|
||||
initAlerts();
|
||||
} catch (Exception ex) {
|
||||
// Starting application should not fail
|
||||
LOG.warn("Exception during initialization", ex);
|
||||
|
@ -32,6 +32,9 @@ import org.openmetadata.schema.type.ChangeEvent;
|
||||
import org.openmetadata.schema.type.EventType;
|
||||
import org.openmetadata.schema.type.FieldChange;
|
||||
import org.openmetadata.service.Entity;
|
||||
import org.openmetadata.service.alerts.ActivityFeedAlertCache;
|
||||
import org.openmetadata.service.alerts.AlertUtil;
|
||||
import org.openmetadata.service.filter.FilterRegistry;
|
||||
|
||||
@Slf4j
|
||||
public class FilterUtil {
|
||||
@ -65,6 +68,19 @@ public class FilterUtil {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static boolean shouldProcessRequest(ChangeEvent event) {
|
||||
// Check Trigger Conditions
|
||||
if (!shouldProcessRequest(event, FilterRegistry.getAllFilters())) {
|
||||
return false;
|
||||
}
|
||||
// Check Spel Conditions
|
||||
if (!AlertUtil.evaluateAlertConditions(
|
||||
event, ActivityFeedAlertCache.getInstance().getActivityFeedAlert().getFilteringRules())) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private static boolean handleTestCaseFilter(ChangeEvent changeEvent, Filters sf) {
|
||||
List<FieldChange> fieldChanges = getAllFieldChange(changeEvent);
|
||||
for (FieldChange fieldChange : fieldChanges) {
|
||||
|
@ -0,0 +1,12 @@
|
||||
[
|
||||
{
|
||||
"name": "ActivityFeedAlertAction",
|
||||
"enabled": "true",
|
||||
"alertActionType": "ActivityFeed",
|
||||
"alertActionConfig": {
|
||||
"name": "ActivityFeedAlertAction",
|
||||
"description": "Controls the Activity Displayed on the Feeds Page"
|
||||
},
|
||||
"provider" : "system"
|
||||
}
|
||||
]
|
@ -0,0 +1,232 @@
|
||||
[
|
||||
{
|
||||
"name": "ActivityFeedAlert",
|
||||
"triggerConfig": {
|
||||
"type": "EntitySpecific",
|
||||
"eventFilters": [
|
||||
{
|
||||
"entityType": "all",
|
||||
"filters": [
|
||||
{
|
||||
"eventType": "entityCreated",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityUpdated",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": ["usageSummary"]
|
||||
},
|
||||
{
|
||||
"eventType": "entityDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entitySoftDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"entityType": "table",
|
||||
"filters": [
|
||||
{
|
||||
"eventType": "entityCreated",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityUpdated",
|
||||
"include": [
|
||||
"description",
|
||||
"owner",
|
||||
"tags",
|
||||
"followers",
|
||||
"tableConstraints",
|
||||
"constraint"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entitySoftDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"entityType": "dashboard",
|
||||
"filters": [
|
||||
{
|
||||
"eventType": "entityCreated",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityUpdated",
|
||||
"include": [
|
||||
"description",
|
||||
"owner",
|
||||
"tags",
|
||||
"followers"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entitySoftDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"entityType": "pipeline",
|
||||
"filters": [
|
||||
{
|
||||
"eventType": "entityCreated",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityUpdated",
|
||||
"include": [
|
||||
"description",
|
||||
"owner",
|
||||
"tags",
|
||||
"followers"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entitySoftDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"entityType": "mlmodel",
|
||||
"filters": [
|
||||
{
|
||||
"eventType": "entityCreated",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityUpdated",
|
||||
"include": [
|
||||
"description",
|
||||
"owner",
|
||||
"tags",
|
||||
"followers"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entitySoftDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"entityType": "testCase",
|
||||
"filters": [
|
||||
{
|
||||
"eventType": "entityCreated",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityUpdated",
|
||||
"include": [
|
||||
"testCaseResultSuccess",
|
||||
"testCaseResultFailed",
|
||||
"testCaseResultAborted"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entityDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
},
|
||||
{
|
||||
"eventType": "entitySoftDeleted",
|
||||
"include": [
|
||||
"all"
|
||||
],
|
||||
"exclude": []
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
},
|
||||
"filteringRules": [
|
||||
{
|
||||
"name": "DisableIngestionActivityFeedData",
|
||||
"effect": "allow",
|
||||
"condition": "!matchUpdatedBy('ingestion-bot')"
|
||||
}
|
||||
],
|
||||
"provider" : "system"
|
||||
}
|
||||
]
|
@ -14,7 +14,8 @@
|
||||
"GenericWebhook",
|
||||
"SlackWebhook",
|
||||
"MsTeamsWebhook",
|
||||
"Email"
|
||||
"Email",
|
||||
"ActivityFeed"
|
||||
]
|
||||
},
|
||||
"status": {
|
||||
@ -56,6 +57,21 @@
|
||||
},
|
||||
"additionalProperties": false
|
||||
},
|
||||
"activityFeedAlertActionConfig": {
|
||||
"description": "Activity Feed Alert Action Config",
|
||||
"type": "object",
|
||||
"javaType": "org.openmetadata.schema.entity.alerts.ActivityFeedAlertActionConfig",
|
||||
"properties": {
|
||||
"name": {
|
||||
"description": "Name",
|
||||
"type": "string"
|
||||
},
|
||||
"description": {
|
||||
"description": "Description",
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"alertActionStatus": {
|
||||
"description": "Alert Action Current Status",
|
||||
"type": "object",
|
||||
@ -152,6 +168,9 @@
|
||||
},
|
||||
{
|
||||
"$ref": "./emailAlertConfig.json"
|
||||
},
|
||||
{
|
||||
"$ref": "#/definitions/activityFeedAlertActionConfig"
|
||||
}
|
||||
]
|
||||
},
|
||||
@ -163,5 +182,7 @@
|
||||
"provider" : {
|
||||
"$ref": "../type/basic.json#/definitions/providerType"
|
||||
}
|
||||
}
|
||||
},
|
||||
"required": ["name", "alertActionType", "alertActionConfig"],
|
||||
"additionalProperties": false
|
||||
}
|
||||
|
@ -51,6 +51,12 @@
|
||||
"oneOf": [
|
||||
{
|
||||
"$ref": "../../entity/events/webhook.json"
|
||||
},
|
||||
{
|
||||
"$ref": "../emailAlertConfig.json"
|
||||
},
|
||||
{
|
||||
"$ref": "../alertAction.json#/definitions/activityFeedAlertActionConfig"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user