Refactor publishers to use Notification Templates engine (#24145)

This commit is contained in:
Adrià Manero 2025-11-05 15:57:55 +01:00 committed by GitHub
parent b3238fd654
commit 650197d9c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 110 additions and 46 deletions

View File

@ -30,16 +30,16 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.Destination; import org.openmetadata.service.apps.bundles.changeEvent.Destination;
import org.openmetadata.service.events.errors.EventPublisherException; import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.formatter.decorators.EmailMessageDecorator; import org.openmetadata.service.jdbi3.NotificationTemplateRepository;
import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.notifications.HandlebarsNotificationMessageEngine;
import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.notifications.channels.NotificationMessage;
import org.openmetadata.service.notifications.channels.email.EmailMessage;
import org.openmetadata.service.util.email.EmailUtil; import org.openmetadata.service.util.email.EmailUtil;
@Slf4j @Slf4j
public class EmailPublisher implements Destination<ChangeEvent> { public class EmailPublisher implements Destination<ChangeEvent> {
private final MessageDecorator<EmailMessage> emailDecorator = new EmailMessageDecorator(); private final HandlebarsNotificationMessageEngine messageEngine;
private final EmailAlertConfig emailAlertConfig; private final EmailAlertConfig emailAlertConfig;
private final CollectionDAO daoCollection;
@Getter private final SubscriptionDestination subscriptionDestination; @Getter private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription; private final EventSubscription eventSubscription;
@ -51,7 +51,10 @@ public class EmailPublisher implements Destination<ChangeEvent> {
this.subscriptionDestination = subscriptionDestination; this.subscriptionDestination = subscriptionDestination;
this.emailAlertConfig = this.emailAlertConfig =
JsonUtils.convertValue(subscriptionDestination.getConfig(), EmailAlertConfig.class); JsonUtils.convertValue(subscriptionDestination.getConfig(), EmailAlertConfig.class);
this.daoCollection = Entity.getCollectionDAO(); this.messageEngine =
new HandlebarsNotificationMessageEngine(
(NotificationTemplateRepository)
Entity.getEntityRepository(Entity.NOTIFICATION_TEMPLATE));
} else { } else {
throw new IllegalArgumentException("Email Alert Invoked with Illegal Type and Settings."); throw new IllegalArgumentException("Email Alert Invoked with Illegal Type and Settings.");
} }
@ -60,12 +63,20 @@ public class EmailPublisher implements Destination<ChangeEvent> {
@Override @Override
public void sendMessage(ChangeEvent event) throws EventPublisherException { public void sendMessage(ChangeEvent event) throws EventPublisherException {
try { try {
// Generate message using new Handlebars pipeline
NotificationMessage message =
messageEngine.generateMessage(event, eventSubscription, subscriptionDestination);
EmailMessage emailMessage = (EmailMessage) message;
// Get receivers
Set<String> receivers = getTargetsForAlert(emailAlertConfig, subscriptionDestination, event); Set<String> receivers = getTargetsForAlert(emailAlertConfig, subscriptionDestination, event);
EmailMessage emailMessage =
emailDecorator.buildOutgoingMessage(getDisplayNameOrFqn(eventSubscription), event); // Send using new helper method
for (String email : receivers) { for (String receiver : receivers) {
EmailUtil.sendChangeEventMail(getDisplayNameOrFqn(eventSubscription), email, emailMessage); EmailUtil.sendNotificationEmail(
receiver, emailMessage.getSubject(), emailMessage.getHtmlContent());
} }
setSuccessStatus(System.currentTimeMillis()); setSuccessStatus(System.currentTimeMillis());
} catch (Exception e) { } catch (Exception e) {
setErrorStatus(System.currentTimeMillis(), 500, e.getMessage()); setErrorStatus(System.currentTimeMillis(), 500, e.getMessage());

View File

@ -31,21 +31,23 @@ import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Webhook; import org.openmetadata.schema.type.Webhook;
import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.Destination; import org.openmetadata.service.apps.bundles.changeEvent.Destination;
import org.openmetadata.service.events.errors.EventPublisherException; import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.formatter.decorators.GChatMessageDecorator; import org.openmetadata.service.formatter.decorators.GChatMessageDecorator;
import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.jdbi3.NotificationTemplateRepository;
import org.openmetadata.service.notifications.HandlebarsNotificationMessageEngine;
import org.openmetadata.service.notifications.channels.NotificationMessage;
import org.openmetadata.service.notifications.channels.gchat.GChatMessageV2;
@Slf4j @Slf4j
public class GChatPublisher implements Destination<ChangeEvent> { public class GChatPublisher implements Destination<ChangeEvent> {
private final MessageDecorator<GChatMessage> gChatMessageMessageDecorator = private final HandlebarsNotificationMessageEngine messageEngine;
new GChatMessageDecorator();
private final Webhook webhook; private final Webhook webhook;
private final Client client; private final Client client;
@Getter private final SubscriptionDestination subscriptionDestination; @Getter private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription; private final EventSubscription eventSubscription;
public GChatPublisher( public GChatPublisher(
@ -54,10 +56,12 @@ public class GChatPublisher implements Destination<ChangeEvent> {
this.eventSubscription = eventSubscription; this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDestination; this.subscriptionDestination = subscriptionDestination;
this.webhook = JsonUtils.convertValue(subscriptionDestination.getConfig(), Webhook.class); this.webhook = JsonUtils.convertValue(subscriptionDestination.getConfig(), Webhook.class);
this.client =
// Build Client
client =
getClient(subscriptionDestination.getTimeout(), subscriptionDestination.getReadTimeout()); getClient(subscriptionDestination.getTimeout(), subscriptionDestination.getReadTimeout());
this.messageEngine =
new HandlebarsNotificationMessageEngine(
(NotificationTemplateRepository)
Entity.getEntityRepository(Entity.NOTIFICATION_TEMPLATE));
} else { } else {
throw new IllegalArgumentException("GChat Alert Invoked with Illegal Type and Settings."); throw new IllegalArgumentException("GChat Alert Invoked with Illegal Type and Settings.");
} }
@ -65,15 +69,18 @@ public class GChatPublisher implements Destination<ChangeEvent> {
@Override @Override
public void sendMessage(ChangeEvent event) throws EventPublisherException { public void sendMessage(ChangeEvent event) throws EventPublisherException {
try { try {
GChatMessage gchatMessage = // Generate message using new Handlebars pipeline
gChatMessageMessageDecorator.buildOutgoingMessage( NotificationMessage message =
getDisplayNameOrFqn(eventSubscription), event); messageEngine.generateMessage(event, eventSubscription, subscriptionDestination);
GChatMessageV2 gchatMessage = (GChatMessageV2) message;
// Send using existing webhook utilities
String json = JsonUtils.pojoToJsonIgnoreNull(gchatMessage); String json = JsonUtils.pojoToJsonIgnoreNull(gchatMessage);
List<Invocation.Builder> targets = List<Invocation.Builder> targets =
getTargetsForWebhookAlert(webhook, subscriptionDestination, client, event, json); getTargetsForWebhookAlert(webhook, subscriptionDestination, client, event, json);
targets.add(getTarget(client, webhook, json)); targets.add(getTarget(client, webhook, json));
for (Invocation.Builder actionTarget : targets) { for (Invocation.Builder actionTarget : targets) {
postWebhookMessage(this, actionTarget, gchatMessage); postWebhookMessage(this, actionTarget, gchatMessage);
} }
@ -90,7 +97,9 @@ public class GChatPublisher implements Destination<ChangeEvent> {
@Override @Override
public void sendTestMessage() throws EventPublisherException { public void sendTestMessage() throws EventPublisherException {
try { try {
GChatMessage gchatMessage = gChatMessageMessageDecorator.buildOutgoingTestMessage(); // Use legacy test message (unchanged)
GChatMessage gchatMessage = new GChatMessageDecorator().buildOutgoingTestMessage();
deliverTestWebhookMessage( deliverTestWebhookMessage(
this, getTarget(client, webhook, JsonUtils.pojoToJson(gchatMessage)), gchatMessage); this, getTarget(client, webhook, JsonUtils.pojoToJson(gchatMessage)), gchatMessage);
} catch (Exception e) { } catch (Exception e) {

View File

@ -31,16 +31,18 @@ import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Webhook; import org.openmetadata.schema.type.Webhook;
import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.Destination; import org.openmetadata.service.apps.bundles.changeEvent.Destination;
import org.openmetadata.service.events.errors.EventPublisherException; import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.formatter.decorators.MSTeamsMessageDecorator; import org.openmetadata.service.formatter.decorators.MSTeamsMessageDecorator;
import org.openmetadata.service.formatter.decorators.MessageDecorator; import org.openmetadata.service.jdbi3.NotificationTemplateRepository;
import org.openmetadata.service.notifications.HandlebarsNotificationMessageEngine;
import org.openmetadata.service.notifications.channels.NotificationMessage;
@Slf4j @Slf4j
public class MSTeamsPublisher implements Destination<ChangeEvent> { public class MSTeamsPublisher implements Destination<ChangeEvent> {
private final MessageDecorator<TeamsMessage> teamsMessageFormatter = private final HandlebarsNotificationMessageEngine messageEngine;
new MSTeamsMessageDecorator();
private final Webhook webhook; private final Webhook webhook;
private final Client client; private final Client client;
@ -53,10 +55,12 @@ public class MSTeamsPublisher implements Destination<ChangeEvent> {
this.eventSubscription = eventSubscription; this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDestination; this.subscriptionDestination = subscriptionDestination;
this.webhook = JsonUtils.convertValue(subscriptionDestination.getConfig(), Webhook.class); this.webhook = JsonUtils.convertValue(subscriptionDestination.getConfig(), Webhook.class);
this.client =
// Build Client
client =
getClient(subscriptionDestination.getTimeout(), subscriptionDestination.getReadTimeout()); getClient(subscriptionDestination.getTimeout(), subscriptionDestination.getReadTimeout());
this.messageEngine =
new HandlebarsNotificationMessageEngine(
(NotificationTemplateRepository)
Entity.getEntityRepository(Entity.NOTIFICATION_TEMPLATE));
} else { } else {
throw new IllegalArgumentException("MsTeams Alert Invoked with Illegal Type and Settings."); throw new IllegalArgumentException("MsTeams Alert Invoked with Illegal Type and Settings.");
} }
@ -65,12 +69,17 @@ public class MSTeamsPublisher implements Destination<ChangeEvent> {
@Override @Override
public void sendMessage(ChangeEvent event) throws EventPublisherException { public void sendMessage(ChangeEvent event) throws EventPublisherException {
try { try {
TeamsMessage teamsMessage = // Generate message using new Handlebars pipeline
teamsMessageFormatter.buildOutgoingMessage(getDisplayNameOrFqn(eventSubscription), event); NotificationMessage message =
messageEngine.generateMessage(event, eventSubscription, subscriptionDestination);
TeamsMessage teamsMessage = (TeamsMessage) message;
// Send using existing webhook utilities
String eventJson = JsonUtils.pojoToJson(teamsMessage); String eventJson = JsonUtils.pojoToJson(teamsMessage);
List<Invocation.Builder> targets = List<Invocation.Builder> targets =
getTargetsForWebhookAlert(webhook, subscriptionDestination, client, event, eventJson); getTargetsForWebhookAlert(webhook, subscriptionDestination, client, event, eventJson);
targets.add(getTarget(client, webhook, eventJson)); targets.add(getTarget(client, webhook, eventJson));
for (Invocation.Builder actionTarget : targets) { for (Invocation.Builder actionTarget : targets) {
postWebhookMessage(this, actionTarget, teamsMessage); postWebhookMessage(this, actionTarget, teamsMessage);
} }
@ -87,7 +96,9 @@ public class MSTeamsPublisher implements Destination<ChangeEvent> {
@Override @Override
public void sendTestMessage() throws EventPublisherException { public void sendTestMessage() throws EventPublisherException {
try { try {
TeamsMessage teamsMessage = teamsMessageFormatter.buildOutgoingTestMessage(); // Use legacy test message (unchanged)
TeamsMessage teamsMessage = new MSTeamsMessageDecorator().buildOutgoingTestMessage();
deliverTestWebhookMessage( deliverTestWebhookMessage(
this, getTarget(client, webhook, JsonUtils.pojoToJson(teamsMessage)), teamsMessage); this, getTarget(client, webhook, JsonUtils.pojoToJson(teamsMessage)), teamsMessage);
} catch (Exception e) { } catch (Exception e) {

View File

@ -34,17 +34,21 @@ import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.type.ChangeEvent; import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.Webhook; import org.openmetadata.schema.type.Webhook;
import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.Destination; import org.openmetadata.service.apps.bundles.changeEvent.Destination;
import org.openmetadata.service.events.errors.EventPublisherException; import org.openmetadata.service.events.errors.EventPublisherException;
import org.openmetadata.service.exception.CatalogExceptionMessage; import org.openmetadata.service.exception.CatalogExceptionMessage;
import org.openmetadata.service.formatter.decorators.MessageDecorator;
import org.openmetadata.service.formatter.decorators.SlackMessageDecorator; import org.openmetadata.service.formatter.decorators.SlackMessageDecorator;
import org.openmetadata.service.jdbi3.NotificationTemplateRepository;
import org.openmetadata.service.notifications.HandlebarsNotificationMessageEngine;
import org.openmetadata.service.notifications.channels.NotificationMessage;
@Slf4j @Slf4j
public class SlackEventPublisher implements Destination<ChangeEvent> { public class SlackEventPublisher implements Destination<ChangeEvent> {
private final MessageDecorator<SlackMessage> slackMessageFormatter = new SlackMessageDecorator(); private final HandlebarsNotificationMessageEngine messageEngine;
private final Webhook webhook; private final Webhook webhook;
private final Client client; private final Client client;
@Getter private final SubscriptionDestination subscriptionDestination; @Getter private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription; private final EventSubscription eventSubscription;
@ -54,9 +58,11 @@ public class SlackEventPublisher implements Destination<ChangeEvent> {
this.eventSubscription = eventSubscription; this.eventSubscription = eventSubscription;
this.subscriptionDestination = subscriptionDest; this.subscriptionDestination = subscriptionDest;
this.webhook = JsonUtils.convertValue(subscriptionDest.getConfig(), Webhook.class); this.webhook = JsonUtils.convertValue(subscriptionDest.getConfig(), Webhook.class);
this.client = getClient(subscriptionDest.getTimeout(), subscriptionDest.getReadTimeout());
// Build Client this.messageEngine =
client = getClient(subscriptionDest.getTimeout(), subscriptionDest.getReadTimeout()); new HandlebarsNotificationMessageEngine(
(NotificationTemplateRepository)
Entity.getEntityRepository(Entity.NOTIFICATION_TEMPLATE));
} else { } else {
throw new IllegalArgumentException("Slack Alert Invoked with Illegal Type and Settings."); throw new IllegalArgumentException("Slack Alert Invoked with Illegal Type and Settings.");
} }
@ -65,14 +71,20 @@ public class SlackEventPublisher implements Destination<ChangeEvent> {
@Override @Override
public void sendMessage(ChangeEvent event) throws EventPublisherException { public void sendMessage(ChangeEvent event) throws EventPublisherException {
try { try {
SlackMessage slackMessage = // Generate message using new Handlebars pipeline
slackMessageFormatter.buildOutgoingMessage(getDisplayNameOrFqn(eventSubscription), event); NotificationMessage message =
messageEngine.generateMessage(event, eventSubscription, subscriptionDestination);
SlackMessage slackMessage = (SlackMessage) message;
// Convert to JSON and apply snake_case transformation
String json = JsonUtils.pojoToJsonIgnoreNull(slackMessage); String json = JsonUtils.pojoToJsonIgnoreNull(slackMessage);
json = convertCamelCaseToSnakeCase(json); json = convertCamelCaseToSnakeCase(json);
// Send using existing webhook utilities
List<Invocation.Builder> targets = List<Invocation.Builder> targets =
getTargetsForWebhookAlert(webhook, subscriptionDestination, client, event, json); getTargetsForWebhookAlert(webhook, subscriptionDestination, client, event, json);
targets.add(getTarget(client, webhook, json)); targets.add(getTarget(client, webhook, json));
for (Invocation.Builder actionTarget : targets) { for (Invocation.Builder actionTarget : targets) {
postWebhookMessage(this, actionTarget, json); postWebhookMessage(this, actionTarget, json);
} }
@ -89,7 +101,8 @@ public class SlackEventPublisher implements Destination<ChangeEvent> {
@Override @Override
public void sendTestMessage() throws EventPublisherException { public void sendTestMessage() throws EventPublisherException {
try { try {
SlackMessage slackMessage = slackMessageFormatter.buildOutgoingTestMessage(); // Use legacy test message (unchanged)
SlackMessage slackMessage = new SlackMessageDecorator().buildOutgoingTestMessage();
String json = JsonUtils.pojoToJsonIgnoreNull(slackMessage); String json = JsonUtils.pojoToJsonIgnoreNull(slackMessage);
json = convertCamelCaseToSnakeCase(json); json = convertCamelCaseToSnakeCase(json);
@ -102,10 +115,10 @@ public class SlackEventPublisher implements Destination<ChangeEvent> {
} }
/** /**
* Slack messages sent via webhook require some keys in snake_case, while the Slack * Slack messages sent via webhook require some keys in snake_case.
* app accepts them as they are (camelCase). Using Layout blocks (from com.slack.api.model.block) restricts control over key * Using Layout blocks (from com.slack.api.model.block) restricts control over key
* aliases within the class. * aliases within the class.
**/ */
public String convertCamelCaseToSnakeCase(String jsonString) { public String convertCamelCaseToSnakeCase(String jsonString) {
JsonNode rootNode = JsonUtils.readTree(jsonString); JsonNode rootNode = JsonUtils.readTree(jsonString);
JsonNode modifiedNode = convertKeys(rootNode); JsonNode modifiedNode = convertKeys(rootNode);
@ -127,16 +140,12 @@ public class SlackEventPublisher implements Destination<ChangeEvent> {
} else if (fieldName.equals("altText")) { } else if (fieldName.equals("altText")) {
newFieldName = "alt_text"; newFieldName = "alt_text";
} }
// Recursively convert the keys
newNode.set(newFieldName, convertKeys(objectNode.get(fieldName))); newNode.set(newFieldName, convertKeys(objectNode.get(fieldName)));
}); });
return newNode; return newNode;
} else if (node.isArray()) { } else if (node.isArray()) {
ArrayNode arrayNode = (ArrayNode) node; ArrayNode arrayNode = (ArrayNode) node;
ArrayNode newArrayNode = JsonUtils.getObjectNode().arrayNode(); ArrayNode newArrayNode = JsonUtils.getObjectNode().arrayNode();
// recursively convert elements
for (int i = 0; i < arrayNode.size(); i++) { for (int i = 0; i < arrayNode.size(); i++) {
newArrayNode.add(convertKeys(arrayNode.get(i))); newArrayNode.add(convertKeys(arrayNode.get(i)));
} }

View File

@ -271,6 +271,30 @@ public class EmailUtil {
} }
} }
/**
* Send notification email with pre-rendered HTML content.
* Used by the new Handlebars notification system.
*
* @param to Recipient email address
* @param subject Email subject
* @param htmlContent Pre-rendered HTML content (already processed by HandlebarsNotificationMessageEngine)
*/
public static void sendNotificationEmail(String to, String subject, String htmlContent) {
if (Boolean.TRUE.equals(getSmtpSettings().getEnableSmtpServer())) {
Email email =
EmailBuilder.startingBlank()
.withSubject(subject)
.to(to)
.from(getSmtpSettings().getSenderMail())
.withHTMLText(htmlContent)
.buildEmail();
sendMail(email, true);
} else {
LOG.warn(EMAIL_IGNORE_MSG, to);
}
}
public static void sendInviteMailToAdmin(User user, String password) { public static void sendInviteMailToAdmin(User user, String password) {
if (Boolean.TRUE.equals(getSmtpSettings().getEnableSmtpServer())) { if (Boolean.TRUE.equals(getSmtpSettings().getEnableSmtpServer())) {