Fix: #24100 Implementation for Change Events for Workflow Generated Changes and Manual Task Resolutions (#24108)

* Initial Implementation for Change Events for Workflow Generated Changes and Manual Task Resolutions

* Test case

* Add impersonatedBy to changeEvent schema, changeEvent generations, workflow consumer filtering, always override impersonatedBy even if not null

* Update generated TypeScript types

* Improved debug logging in WorkflowEventConsumer and tests

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
This commit is contained in:
Ram Narayan Balaji 2025-11-11 10:28:39 +05:30 committed by GitHub
parent 8514b967e4
commit 5bcf792aa4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 273 additions and 30 deletions

View File

@ -95,6 +95,7 @@ public class ChangeEventHandler implements EventHandler {
.withEntityId(changeEvent.getEntityId())
.withEntityType(changeEvent.getEntityType())
.withUserName(changeEvent.getUserName())
.withImpersonatedBy(changeEvent.getImpersonatedBy())
.withTimestamp(changeEvent.getTimestamp())
.withChangeDescription(changeEvent.getChangeDescription())
.withCurrentVersion(changeEvent.getCurrentVersion());

View File

@ -307,6 +307,7 @@ public class FormatterUtil {
? null
: entityInterface.getDomains().stream().map(EntityReference::getId).toList())
.withUserName(updateBy)
.withImpersonatedBy(entityInterface.getImpersonatedBy())
.withTimestamp(entityInterface.getUpdatedAt())
.withChangeDescription(entityInterface.getChangeDescription())
.withCurrentVersion(entityInterface.getVersion());
@ -396,6 +397,7 @@ public class FormatterUtil {
.withDomains(thread.getDomains())
.withEntityType(entityType)
.withUserName(updateBy)
.withImpersonatedBy(thread.getImpersonatedBy())
.withTimestamp(thread.getUpdatedAt());
// Include changeDescription if present

View File

@ -25,6 +25,7 @@ import org.openmetadata.service.resources.feeds.MessageParser;
@Slf4j
public class WorkflowEventConsumer implements Destination<ChangeEvent> {
public static final String GOVERNANCE_BOT = "governance-bot";
private final SubscriptionDestination subscriptionDestination;
private final EventSubscription eventSubscription;
@ -105,6 +106,18 @@ public class WorkflowEventConsumer implements Destination<ChangeEvent> {
EventType eventType = event.getEventType();
String entityType = event.getEntityType();
// Skip events from governance-bot to prevent infinite loops
// These are system-initiated workflow changes that shouldn't trigger new workflows
if (GOVERNANCE_BOT.equals(event.getUserName())
|| (event.getImpersonatedBy() != null
&& GOVERNANCE_BOT.equals(event.getImpersonatedBy()))) {
LOG.debug(
"Skipping workflow-initiated event from governance-bot for entity {} of type: {}",
event.getEntityFullyQualifiedName(),
event.getEntityType());
return;
}
if (validEventTypes.contains(eventType) && validEntityTypes.contains(entityType)) {
String signal = String.format("%s-%s", entityType, eventType.toString());

View File

@ -56,14 +56,24 @@ public class SetEntityAttributeImpl implements JavaDelegate {
}
String updatedByNamespace = (String) inputNamespaceMap.get(UPDATED_BY_VARIABLE);
String user =
String actualUser =
Optional.ofNullable(updatedByNamespace)
.map(ns -> (String) varHandler.getNamespacedVariable(ns, UPDATED_BY_VARIABLE))
.orElse("governance-bot");
.orElse(null);
// Apply the field change using shared utility
// Apply the field change using shared utility with bot impersonation
// Note: fieldValue can be null to clear/remove a field value
EntityFieldUtils.setEntityField(entity, entityType, user, fieldName, fieldValue, true);
// When actualUser is available, use it as the user and mark 'governance-bot' as impersonator
// Otherwise, use 'governance-bot' directly (for system-initiated workflows)
if (actualUser != null && !actualUser.isEmpty()) {
// User-initiated workflow: preserve actual user, mark bot as impersonator
EntityFieldUtils.setEntityField(
entity, entityType, actualUser, fieldName, fieldValue, true, "governance-bot");
} else {
// System-initiated workflow: use governance-bot directly
EntityFieldUtils.setEntityField(
entity, entityType, "governance-bot", fieldName, fieldValue, true, null);
}
} catch (Exception exc) {
LOG.error(

View File

@ -1171,9 +1171,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
if (updatedBy != null) {
entity.setUpdatedBy(updatedBy);
}
if (impersonatedBy != null) {
entity.setImpersonatedBy(impersonatedBy);
}
// Always set impersonatedBy to clear it when null (regular user operations)
entity.setImpersonatedBy(impersonatedBy);
return createNewEntity(entity);
}
@ -1440,9 +1439,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
T original = findByNameOrNull(updated.getFullyQualifiedName(), ALL);
if (original == null) { // If an original entity does not exist then create it, else update
if (impersonatedBy != null) {
updated.setImpersonatedBy(impersonatedBy);
}
// Always set impersonatedBy to clear it when null (regular user operations)
updated.setImpersonatedBy(impersonatedBy);
return new PutResponse<>(
Status.CREATED, withHref(uriInfo, createNewEntity(updated)), ENTITY_CREATED);
}
@ -1459,9 +1457,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
UriInfo uriInfo, T updated, String updatedBy, String impersonatedBy) {
T original = findByNameOrNull(updated.getFullyQualifiedName(), ALL);
if (original == null) { // If an original entity does not exist then create it, else update
if (impersonatedBy != null) {
updated.setImpersonatedBy(impersonatedBy);
}
// Always set impersonatedBy to clear it when null (regular user operations)
updated.setImpersonatedBy(impersonatedBy);
return new PutResponse<>(
Status.CREATED, withHref(uriInfo, createNewEntity(updated)), ENTITY_CREATED);
}
@ -1600,9 +1597,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
setFieldsInternal(original, putFields);
updated.setUpdatedBy(updatedBy);
updated.setUpdatedAt(System.currentTimeMillis());
if (impersonatedBy != null) {
updated.setImpersonatedBy(impersonatedBy);
}
// Always set impersonatedBy to clear it when null (regular user operations)
updated.setImpersonatedBy(impersonatedBy);
// If the entity state is soft-deleted, recursively undelete the entity and it's children
if (Boolean.TRUE.equals(original.getDeleted())) {
restoreEntity(updated.getUpdatedBy(), original.getId());
@ -1633,9 +1629,8 @@ public abstract class EntityRepository<T extends EntityInterface> {
setFieldsInternal(original, putFields);
updated.setUpdatedBy(updatedBy);
updated.setUpdatedAt(System.currentTimeMillis());
if (impersonatedBy != null) {
updated.setImpersonatedBy(impersonatedBy);
}
// Always set impersonatedBy to clear it when null (regular user operations)
updated.setImpersonatedBy(impersonatedBy);
// If the entity state is soft-deleted, recursively undelete the entity and it's children
if (Boolean.TRUE.equals(original.getDeleted())) {
restoreEntity(updated.getUpdatedBy(), original.getId());
@ -1805,10 +1800,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
updated.setDomains(validatedDomains);
restorePatchAttributes(original, updated);
// Set impersonatedBy after all attribute restoration to prevent it from being overwritten
if (impersonatedBy != null) {
updated.setImpersonatedBy(impersonatedBy);
}
// Always set impersonatedBy to the passed value (which can be null)
// This ensures that when regular users make changes (impersonatedBy=null),
// any existing impersonatedBy value is cleared, preventing it from persisting
updated.setImpersonatedBy(impersonatedBy);
// Update the attributes and relationships of an entity
EntityUpdater entityUpdater;

View File

@ -526,6 +526,20 @@ public class FeedRepository {
JsonPatch patch = JsonUtils.getJsonPatch(origJson, updatedEntityJson);
EntityRepository<?> repository = threadContext.getEntityRepository();
repository.patch(null, aboutEntity.getId(), user, patch);
if (!origJson.equals(updatedEntityJson)) {
ChangeEvent changeEvent =
new ChangeEvent()
.withId(UUID.randomUUID())
.withEventType(EventType.ENTITY_UPDATED)
.withEntityId(aboutEntity.getId())
.withEntityType(threadContext.getAbout().getEntityType())
.withEntityFullyQualifiedName(aboutEntity.getFullyQualifiedName())
.withUserName(user)
.withTimestamp(System.currentTimeMillis())
.withEntity(updatedEntity);
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToMaskedJson(changeEvent));
}
// Update the attributes
threadContext.getThread().getTask().withNewValue(resolveTask.getNewValue());

View File

@ -17,6 +17,7 @@ import jakarta.json.JsonPatch;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.entity.classification.Tag;
@ -24,8 +25,10 @@ import org.openmetadata.schema.entity.data.GlossaryTerm;
import org.openmetadata.schema.entity.teams.Team;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.type.AssetCertification;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.EntityStatus;
import org.openmetadata.schema.type.EventType;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.utils.JsonUtils;
@ -59,6 +62,17 @@ public class EntityFieldUtils {
String fieldName,
String fieldValue,
boolean applyPatch) {
setEntityField(entity, entityType, user, fieldName, fieldValue, applyPatch, null);
}
public static void setEntityField(
EntityInterface entity,
String entityType,
String user,
String fieldName,
String fieldValue,
boolean applyPatch,
String impersonatedBy) {
// Store original state for patch creation
String originalJson = applyPatch ? JsonUtils.pojoToJson(entity) : null;
@ -121,8 +135,23 @@ public class EntityFieldUtils {
if (applyPatch) {
String updatedJson = JsonUtils.pojoToJson(entity);
JsonPatch patch = JsonUtils.getJsonPatch(originalJson, updatedJson);
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
entityRepository.patch(null, entity.getId(), user, patch, null);
if (!originalJson.equals(updatedJson)) {
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
entityRepository.patch(null, entity.getId(), user, patch, null, impersonatedBy);
ChangeEvent changeEvent =
new ChangeEvent()
.withId(UUID.randomUUID())
.withEventType(EventType.ENTITY_UPDATED)
.withEntityId(entity.getId())
.withEntityType(entityType)
.withEntityFullyQualifiedName(entity.getFullyQualifiedName())
.withUserName(user)
.withImpersonatedBy(impersonatedBy)
.withTimestamp(System.currentTimeMillis())
.withEntity(entity);
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToMaskedJson(changeEvent));
}
}
}

View File

@ -1277,7 +1277,7 @@ public class GlossaryResourceTest extends EntityResourceTest<Glossary, CreateGlo
}
public static void waitForTaskToBeCreated(String fullyQualifiedName) {
waitForTaskToBeCreated(fullyQualifiedName, 60000L * 2);
waitForTaskToBeCreated(fullyQualifiedName, 90000L * 2);
}
public static void waitForTaskToBeCreated(String fullyQualifiedName, long timeout) {

View File

@ -82,6 +82,7 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.openmetadata.schema.api.CreateTaskDetails;
import org.openmetadata.schema.api.ValidateGlossaryTagsRequest;
import org.openmetadata.schema.api.classification.CreateClassification;
import org.openmetadata.schema.api.classification.CreateTag;
@ -89,6 +90,7 @@ import org.openmetadata.schema.api.data.CreateGlossary;
import org.openmetadata.schema.api.data.CreateGlossaryTerm;
import org.openmetadata.schema.api.data.CreateTable;
import org.openmetadata.schema.api.data.TermReference;
import org.openmetadata.schema.api.feed.CreateThread;
import org.openmetadata.schema.api.feed.ResolveTask;
import org.openmetadata.schema.entity.Type;
import org.openmetadata.schema.entity.classification.Classification;
@ -110,6 +112,8 @@ import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.type.TagLabel;
import org.openmetadata.schema.type.TaskDetails;
import org.openmetadata.schema.type.TaskStatus;
import org.openmetadata.schema.type.TaskType;
import org.openmetadata.schema.type.ThreadType;
import org.openmetadata.schema.type.api.BulkOperationResult;
import org.openmetadata.schema.type.api.BulkResponse;
import org.openmetadata.schema.utils.JsonUtils;
@ -382,6 +386,9 @@ public class GlossaryTermResourceTest extends EntityResourceTest<GlossaryTerm, C
@Test
void test_GlossaryTermApprovalWorkflow(TestInfo test) throws IOException {
// Ensure the workflow is active (it might have been suspended by another test)
WorkflowHandler.getInstance().resumeWorkflow("GlossaryTermApprovalWorkflow");
//
// glossary1 create without reviewers is created with Approved status
//
@ -2249,7 +2256,7 @@ public class GlossaryTermResourceTest extends EntityResourceTest<GlossaryTerm, C
patchEntity(term.getId(), json, term, authHeaders(DATA_CONSUMER.getName()));
// Verify workflow task was created
boolean taskCreated = wasDetailedWorkflowTaskCreated(term.getFullyQualifiedName(), 30000L);
boolean taskCreated = wasDetailedWorkflowTaskCreated(term.getFullyQualifiedName(), 90000L);
assertTrue(taskCreated, "Workflow should be triggered when non-reviewer updates the term");
// Verify term status moved to IN_REVIEW
@ -2499,7 +2506,7 @@ public class GlossaryTermResourceTest extends EntityResourceTest<GlossaryTerm, C
patchEntity(term.getId(), json, term, ADMIN_AUTH_HEADERS);
// Verify workflow task was created
taskCreated = wasDetailedWorkflowTaskCreated(term.getFullyQualifiedName(), 30000L);
taskCreated = wasDetailedWorkflowTaskCreated(term.getFullyQualifiedName(), 90000L);
assertTrue(taskCreated, "Workflow should be triggered when AND condition is false");
// Resolve the task to complete the workflow and prevent EntityNotFoundException
@ -2638,7 +2645,7 @@ public class GlossaryTermResourceTest extends EntityResourceTest<GlossaryTerm, C
authHeaders(USER2.getName()));
// Wait for new task to be created for the update
waitForDetailedTaskToBeCreated(term.getFullyQualifiedName(), 60000L);
waitForDetailedTaskToBeCreated(term.getFullyQualifiedName(), 90000L);
// Get the new task
threads =
@ -2713,7 +2720,7 @@ public class GlossaryTermResourceTest extends EntityResourceTest<GlossaryTerm, C
authHeaders(USER2.getName()));
// Wait for detailed task to be created
waitForDetailedTaskToBeCreated(term.getFullyQualifiedName(), 60000L);
waitForDetailedTaskToBeCreated(term.getFullyQualifiedName(), 90000L);
// Get the new task
threads =
@ -3307,4 +3314,148 @@ public class GlossaryTermResourceTest extends EntityResourceTest<GlossaryTerm, C
ResultList<EntityReference> page2 = getAssets(term.getId(), 2, 2, ADMIN_AUTH_HEADERS);
assertFalse(page2.getData().isEmpty());
}
@Test
void test_WorkflowTriggerOnDescriptionApprovalByNonReviewer(TestInfo test) throws Exception {
// Test scenario:
// 1. Create a glossary term with USER1 as reviewer (should be auto-approved)
// 2. USER1 requests a description update, asking USER2 (non-reviewer) for approval
// 3. When USER2 approves, verify the workflow IS triggered:
// - Description is updated
// - Term moves to IN_REVIEW status
// - Approval task is created for USER1
try {
// Step 1: Create glossary with no reviewers
// Use simple names without special characters to avoid SQL syntax issues
String simpleName = "glossary_workflow_test_" + System.currentTimeMillis();
Glossary glossary = createGlossary(simpleName, null, null);
// Create term with USER1 as reviewer
String termName = "term_workflow_test_" + System.currentTimeMillis();
CreateGlossaryTerm createRequest =
createRequest(termName)
.withDescription("Initial description")
.withGlossary(glossary.getFullyQualifiedName())
.withReviewers(listOf(USER1.getEntityReference()));
// Create as USER1 (who is the reviewer) - should be auto-approved
GlossaryTerm term = createEntity(createRequest, authHeaders(USER1.getName()));
// Wait a bit for any workflow to process
java.lang.Thread.sleep(2000);
// Verify term is approved since creator is the reviewer
GlossaryTerm autoApprovedTerm = getEntity(term.getId(), "", authHeaders(USER1.getName()));
assertEquals(
EntityStatus.APPROVED,
autoApprovedTerm.getEntityStatus(),
"Term should be auto-approved when creator is reviewer");
// Record initial version for later comparison
double initialVersion = autoApprovedTerm.getVersion();
// Step 2: USER1 (reviewer) requests a description update, asking USER2 for approval
// Create UpdateDescription task
String newDescription = "Updated description needing approval";
String entityLink =
new MessageParser.EntityLink(Entity.GLOSSARY_TERM, term.getFullyQualifiedName())
.getLinkString();
CreateTaskDetails taskDetails =
new CreateTaskDetails()
.withType(TaskType.UpdateDescription)
.withOldValue(term.getDescription())
.withSuggestion(newDescription)
.withAssignees(List.of(USER2.getEntityReference()));
CreateThread createThread =
new CreateThread()
.withMessage("Please approve this description update")
.withFrom(USER1.getName())
.withAbout(entityLink)
.withTaskDetails(taskDetails)
.withType(ThreadType.Task);
Thread descriptionTask = taskTest.createAndCheck(createThread, authHeaders(USER1.getName()));
assertNotNull(descriptionTask);
assertEquals(TaskStatus.Open, descriptionTask.getTask().getStatus());
// Verify that USER2 can see the task
ThreadList tasks =
taskTest.listTasks(entityLink, null, null, null, 100, authHeaders(USER2.getName()));
assertTrue(
tasks.getData().stream().anyMatch(t -> t.getId().equals(descriptionTask.getId())),
"USER2 should be able to see the task");
// Step 3: USER2 (non-reviewer) approves the description update
// This should trigger a workflow because USER2 is NOT a reviewer
// USER2 resolves the task (approves the description change)
ResolveTask resolveTask = new ResolveTask().withNewValue(newDescription);
taskTest.resolveTask(
descriptionTask.getTask().getId(), resolveTask, authHeaders(USER2.getName()));
// Task resolution should have closed the description task immediately
// Wait for the ChangeEvent to be processed and workflow to trigger
java.lang.Thread.sleep(15000); // Give enough time for workflow processing
// Step 4: Verify the workflow was triggered
// When a non-reviewer (USER2) approves a change, the workflow should:
// 1. Update the description (immediate effect)
// 2. Move the term to IN_REVIEW status
// 3. Create a new approval task for the actual reviewers (USER1)
GlossaryTerm updatedTerm = getEntity(term.getId(), "", ADMIN_AUTH_HEADERS);
// Verify description was updated immediately
assertEquals(
newDescription,
updatedTerm.getDescription(),
"Description should be updated after approval");
// CRITICAL: Verify term moved to IN_REVIEW status (workflow was triggered)
assertEquals(
EntityStatus.IN_REVIEW,
updatedTerm.getEntityStatus(),
"Term MUST move to IN_REVIEW when non-reviewer approves changes - this proves workflow triggered");
// Verify version was incremented (entity was modified)
assertTrue(
updatedTerm.getVersion() > initialVersion,
"Version should be incremented after task resolution and workflow processing");
// Step 5: Verify a new approval task was created for USER1 (the reviewer)
// Wait a bit more for task creation
java.lang.Thread.sleep(5000);
// The workflow MUST create an approval task
Thread approvalTask = assertApprovalTask(term, TaskStatus.Open);
assertNotNull(approvalTask, "Workflow MUST create an approval task for the reviewer");
// Verify the task is assigned to USER1 (the reviewer)
assertTrue(
approvalTask.getTask().getAssignees().stream()
.anyMatch(a -> a.getId().equals(USER1.getEntityReference().getId())),
"The approval task MUST be assigned to USER1 (the reviewer)");
LOG.info(
"Test completed: Workflow successfully triggered when non-reviewer approved description change");
// Clean up: Resolve the approval task
try {
taskTest.resolveTask(
approvalTask.getTask().getId(),
new ResolveTask().withNewValue("Approved"),
authHeaders(USER1.getName()));
java.lang.Thread.sleep(2000);
} catch (Exception e) {
// Ignore cleanup errors
}
} finally {
// Clean up: Re-suspend the workflow to not affect other tests
WorkflowHandler.getInstance().suspendWorkflow("GlossaryTermApprovalWorkflow");
}
}
}

View File

@ -45,6 +45,10 @@
"description": "Name of the user whose activity resulted in the change.",
"type": "string"
},
"impersonatedBy": {
"description": "Bot user that performed the action on behalf of the actual user.",
"$ref": "../type/basic.json#/definitions/impersonatedBy"
},
"timestamp": {
"description": "Timestamp when the change was made in Unix epoch time milliseconds.",
"$ref": "basic.json#/definitions/timestamp"

View File

@ -105,6 +105,10 @@ export interface ChangeEvent {
* Unique identifier for the event.
*/
id: string;
/**
* Bot user that performed the action on behalf of the actual user.
*/
impersonatedBy?: string;
/**
* Change that lead to this version of the entity.
*/

View File

@ -79,6 +79,10 @@ export interface ChangeEvent {
* Unique identifier for the event.
*/
id?: string;
/**
* Bot user that performed the action on behalf of the actual user.
*/
impersonatedBy?: string;
/**
* Change that lead to this version of the entity.
*/
@ -235,6 +239,10 @@ export interface ChangeEventClass {
* Unique identifier for the event.
*/
id: string;
/**
* Bot user that performed the action on behalf of the actual user.
*/
impersonatedBy?: string;
/**
* Change that lead to this version of the entity.
*/

View File

@ -85,6 +85,10 @@ export interface ChangeEvent {
* Unique identifier for the event.
*/
id: string;
/**
* Bot user that performed the action on behalf of the actual user.
*/
impersonatedBy?: string;
/**
* Change that lead to this version of the entity.
*/

View File

@ -85,6 +85,10 @@ export interface ChangeEvent {
* Unique identifier for the event.
*/
id: string;
/**
* Bot user that performed the action on behalf of the actual user.
*/
impersonatedBy?: string;
/**
* Change that lead to this version of the entity.
*/

View File

@ -57,6 +57,10 @@ export interface ChangeEvent {
* Unique identifier for the event.
*/
id: string;
/**
* Bot user that performed the action on behalf of the actual user.
*/
impersonatedBy?: string;
/**
* Change that lead to this version of the entity.
*/