From cdd144b5048a383914cd30302bd7d4cd269aa12c Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Wed, 20 Aug 2025 21:14:05 +0530 Subject: [PATCH] Adding DataCompleteness Task Node, Flowable Debug logs --- GOVERNANCE_WORKFLOWS_ARCHITECTURE.md | 1873 +++++++++++++++++ WORKFLOW_SOLUTION_ARCHITECTURE.md | 98 + .../governance/workflows/Workflow.java | 42 +- .../workflows/WorkflowFailureListener.java | 2 +- .../governance/workflows/WorkflowHandler.java | 80 +- .../workflows/WorkflowVariableHandler.java | 54 +- .../governance/workflows/elements/Edge.java | 27 +- .../workflows/elements/NodeFactory.java | 4 + .../automatedTask/DataCompletenessTask.java | 129 ++ .../impl/CheckEntityAttributesImpl.java | 14 +- .../impl/DataCompletenessImpl.java | 229 ++ .../service/rules/JsonLogicUtils.java | 43 - .../openmetadata/service/rules/LogicOps.java | 19 +- .../workflows/elements/nodeSubType.json | 1 + .../automatedTask/dataCompletenessTask.json | 131 ++ 15 files changed, 2653 insertions(+), 93 deletions(-) create mode 100644 GOVERNANCE_WORKFLOWS_ARCHITECTURE.md create mode 100644 WORKFLOW_SOLUTION_ARCHITECTURE.md create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java create mode 100644 openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json diff --git a/GOVERNANCE_WORKFLOWS_ARCHITECTURE.md b/GOVERNANCE_WORKFLOWS_ARCHITECTURE.md new file mode 100644 index 00000000000..124c6eac9df --- /dev/null +++ b/GOVERNANCE_WORKFLOWS_ARCHITECTURE.md @@ -0,0 +1,1873 @@ +# OpenMetadata Governance Workflows - Architecture & Implementation Guide + +## Table of Contents +1. [Architecture Overview](#architecture-overview) +2. [Core Components](#core-components) +3. 
[Implementation Details](#implementation-details) +4. [Workflow Triggers](#workflow-triggers) +5. [Workflow Nodes Catalog](#workflow-nodes-catalog) +6. [Custom Workflow Examples](#custom-workflow-examples) +7. [Best Practices](#best-practices) + +--- + +## Architecture Overview + +### System Architecture + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ OpenMetadata Platform │ +├─────────────────────────────────────────────────────────────────┤ +│ │ +│ ┌──────────────────┐ ┌──────────────────┐ │ +│ │ REST API Layer │────▶│ Workflow Service │ │ +│ └──────────────────┘ └──────────────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ ┌──────────────────┐ ┌──────────────────┐ │ +│ │ Entity Repository│ │ Workflow Handler │ │ +│ └──────────────────┘ └──────────────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ ┌──────────────────────────────────────────┐ │ +│ │ WorkflowTransactionManager │ │ +│ │ (Atomic Operations & Deployment) │ │ +│ └──────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────────────────────────────────┐ │ +│ │ Flowable BPMN Engine │ │ +│ │ ┌────────────┐ ┌──────────────┐ │ │ +│ │ │Process Def │ │Runtime Engine│ │ │ +│ │ └────────────┘ └──────────────┘ │ │ +│ └──────────────────────────────────────────┘ │ +│ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +### Key Architectural Components + +1. **WorkflowTransactionManager**: Ensures atomic operations between OpenMetadata DB and Flowable engine +2. **WorkflowHandler**: Manages workflow lifecycle and validation +3. **WorkflowIdEncoder**: Base64 encoding for safe Flowable IDs (replacing dangerous truncation) +4. **Trigger System**: Event-based and Periodic batch processing +5. **Node System**: Modular, reusable workflow components + +--- + +## Core Components + +### 1. 
Transaction Management + +```java +// WorkflowTransactionManager ensures atomicity +public class WorkflowTransactionManager { + // Atomic store and deploy + public WorkflowDefinition storeAndDeployWorkflowDefinition( + WorkflowDefinition entity, boolean update) { + // 1. Validate with Flowable + // 2. Store in OpenMetadata DB + // 3. Deploy to Flowable + // 4. Rollback on failure + } + + // Atomic update and redeploy + public WorkflowDefinition updateAndRedeployWorkflowDefinition( + WorkflowDefinition original, WorkflowDefinition updated) { + // 1. Delete old deployment + // 2. Update in DB + // 3. Deploy new version + // 4. Restore original on failure + } +} +``` + +### 2. ID Encoding System + +```java +// WorkflowIdEncoder - Safe ID generation +public class WorkflowIdEncoder { + private static final int MAX_FLOWABLE_ID_LENGTH = 255; + + public static String generateSafeFlowableId(String... parts) { + String combined = String.join("-", parts); + String encoded = Base64.getUrlEncoder() + .withoutPadding() + .encodeToString(combined.getBytes(StandardCharsets.UTF_8)); + + if (encoded.length() > MAX_FLOWABLE_ID_LENGTH) { + // Use hash for extremely long IDs + String hash = generateHash(combined); + return "id_" + hash; + } + return encoded; + } +} +``` + +--- + +## Implementation Details + +### New Implementations + +#### 1. RollbackEntityImpl +Rolls back entities to their previous approved state without full rejection. + +```java +@Slf4j +public class RollbackEntityImpl implements JavaDelegate { + @Override + public void execute(DelegateExecution execution) { + // 1. Get entity reference + // 2. Find previous approved version + // 3. Create JSON patch to rollback + // 4. Apply patch with proper attribution + // 5. 
Log rollback action + } + + private Double getPreviousApprovedVersion( + EntityInterface entity, + EntityRepository repository, + String rollbackToStatus) { + // Iterate through entity history + // Find last version with approved status + // Return version number for rollback + } +} +``` + +#### 2. DeprecateStaleEntityImpl +Marks entities as deprecated based on staleness criteria. + +```java +@Slf4j +public class DeprecateStaleEntityImpl implements JavaDelegate { + @Override + public void execute(DelegateExecution execution) { + // Handle both individual entities and batch processing + Object relatedEntity = execution.getVariable(RELATED_ENTITY_VARIABLE); + + if (relatedEntity instanceof String) { + // Entity link from periodic batch trigger + MessageParser.EntityLink entityLink = + MessageParser.EntityLink.parse((String) relatedEntity); + } else if (relatedEntity instanceof Map) { + // Entity reference from event trigger + EntityReference ref = JsonUtils.convertValue( + relatedEntity, EntityReference.class); + } + + // Check staleness and deprecate if needed + if (isEntityStale(entity, thresholdTimestamp)) { + deprecateEntity(repository, entity, message, user); + } + } +} +``` + +--- + +## Workflow Triggers + +### 1. EventBasedEntityTrigger + +Triggers workflows based on entity lifecycle events. + +```yaml +Configuration: + entityType: "Table" + events: ["entityCreated", "entityUpdated", "entityDeleted"] + filters: "database.name:production" +``` + +**Architecture:** +``` +Entity Event → Event Listener → Filter Check → Workflow Execution +``` + +**Use Cases:** +- Auto-approval workflows for new entities +- Data quality checks on entity updates +- Cascade operations on entity deletion + +### 2. PeriodicBatchEntityTrigger + +Processes entities in batches on a schedule. 
+ +```yaml +Configuration: + entityTypes: ["Table", "Dashboard", "Pipeline"] + schedule: + scheduleType: "Daily" + hour: 2 + batchSize: 100 + filters: "owner.name:null" +``` + +**Architecture:** +``` +Timer → Fetch Entities → Batch Processing → Multi-Instance Workflow +``` + +**Key Features:** +- Efficient batch processing with configurable size +- Multi-instance parallel execution +- Automatic pagination for large datasets + +--- + +## Workflow Nodes Catalog + +### Automated Task Nodes + +#### 1. **SetEntityAttributeTask** +Sets attributes on entities programmatically. + +**Configuration:** +```json +{ + "attribute": "tier", + "value": "Tier.Gold" +} +``` +**Outputs:** `result` (success/failure) +**Use Cases:** Standardize metadata, set owners, apply tags + +#### 2. **ConditionalSetEntityAttributeTask** +Sets attributes based on conditions. + +**Configuration:** +```json +{ + "conditions": [ + { + "condition": "${dataQualityScore > 0.9}", + "attribute": "tier", + "value": "Tier.Gold" + } + ] +} +``` +**Outputs:** `result` (success/failure) +**Use Cases:** Dynamic tier assignment, conditional tagging + +#### 3. **CheckEntityAttributesTask** +Validates entity attributes against JsonLogic rules. + +**Configuration:** +```json +{ + "rules": "{\"and\": [{\"!=\": [{\"var\": \"owner\"}, null]}, {\">=\": [{\"length\": {\"var\": \"description\"}}, 10]}]}" +} +``` +**Outputs:** `result` (success/failure based on rule evaluation) +**Use Cases:** Enforce governance policies, validate metadata completeness + +#### 4. **DataCompletenessTask** (NEW) +Evaluates entity data completeness with flexible quality bands. 
+ +**Configuration:** +```json +{ + "fieldsToCheck": ["name", "description", "owner", "tags", "columns[].description"], + "qualityBands": [ + {"name": "gold", "minimumScore": 85}, + {"name": "silver", "minimumScore": 60}, + {"name": "bronze", "minimumScore": 30}, + {"name": "unacceptable", "minimumScore": 0} + ], + "treatEmptyStringAsNull": true, + "treatEmptyArrayAsNull": true +} +``` +**Outputs:** +- `completenessScore` (numeric percentage) +- `qualityBand` (band name like "gold") +- `filledFieldsCount` (number) +- `totalFieldsCount` (number) +- `missingFields` (array of field names) +- `filledFields` (array of field names) +- `result` (quality band name for routing) + +**Use Cases:** +- Data quality gates before production +- Automated certification based on completeness +- Multi-tier approval workflows based on data quality + +**Example Workflow Usage:** +```json +{ + "nodes": [ + { + "name": "checkDataQuality", + "type": "automatedTask", + "subType": "dataCompletenessTask", + "config": { + "fieldsToCheck": ["description", "owner", "tags"], + "qualityBands": [ + {"name": "auto-approve", "minimumScore": 90}, + {"name": "review", "minimumScore": 70}, + {"name": "enrich", "minimumScore": 50}, + {"name": "reject", "minimumScore": 0} + ] + } + } + ], + "edges": [ + { + "from": "checkDataQuality", + "to": "autoApprove", + "condition": "auto-approve" + }, + { + "from": "checkDataQuality", + "to": "manualReview", + "condition": "review" + }, + { + "from": "checkDataQuality", + "to": "requestEnrichment", + "condition": "enrich" + }, + { + "from": "checkDataQuality", + "to": "reject", + "condition": "reject" + } + ] +} +``` + +#### 5. **SetGlossaryTermStatusTask** +Manages glossary term lifecycle. + +**Configuration:** +```json +{ + "status": "Approved" +} +``` +**States:** Draft → Approved → Deprecated +**Outputs:** `result` (success/failure) + +#### 6. **SetEntityCertificationTask** +Manages entity certification status. 
+ +**Configuration:** +```json +{ + "certification": "Gold", + "message": "Certified after quality checks" +} +``` +**Levels:** Bronze → Silver → Gold +**Outputs:** `result` (success/failure) + +#### 7. **RollbackEntityTask** +Rollback entity to previous version. + +**Configuration:** +```json +{ + "versionToRestore": "previousVersion" +} +``` +**Outputs:** `result` (success/failure) +**Use Cases:** Undo changes without rejection, revert to last approved state + +#### 8. **CreateAndRunIngestionPipelineTask** +Creates and executes ingestion pipelines. + +**Configuration:** +```json +{ + "pipelineType": "profiler", + "config": { + "generateSampleData": true + } +} +``` +**Outputs:** `pipelineId`, `result` +**Use Cases:** Auto-profile new tables, schedule data quality scans + +#### 9. **RunAppTask** +Executes OpenMetadata applications. + +**Configuration:** +```json +{ + "appName": "DataQualityApp", + "parameters": { + "testSuite": "critical-tests" + } +} +``` +**Outputs:** `appExecutionId`, `result` + +### User Task Nodes + +#### 1. **UserApprovalTask** +Human approval step in workflow. + +**Purpose:** Manual review and approval +**Features:** +- Assignee management +- Approval/rejection with comments +- Delegation support + +### Control Flow Nodes + +#### 1. **StartEvent** +Entry point of workflow. + +#### 2. **EndEvent** +Termination point of workflow. + +#### 3. **ParallelGateway** +Fork/join parallel execution paths. 
+ +--- + +## Custom Workflow Examples + +### Example 1: Data Asset Lifecycle Management +**Trigger:** EventBasedEntityTrigger (Table creation) + +```json +{ + "name": "tableLifecycleManagement", + "displayName": "Table Lifecycle Management", + "description": "Automated workflow for managing table lifecycle from creation to production", + "type": "AUTOMATED", + "trigger": { + "triggerType": "eventBasedEntityTrigger", + "config": { + "entityType": "table", + "events": ["entityCreated"], + "filters": "database.name:production" + } + }, + + "nodes": [ + { + "name": "startEvent", + "displayName": "Start", + "nodeType": "startEvent", + "config": {} + }, + { + "name": "validateMetadata", + "displayName": "Validate Metadata", + "nodeType": "checkEntityAttributesTask", + "config": { + "checks": [ + { + "attribute": "description", + "required": true, + "minLength": 10 + }, + { + "attribute": "owner", + "required": true + }, + { + "attribute": "tags", + "required": false + } + ] + } + }, + { + "name": "setInitialTier", + "displayName": "Set Initial Tier", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tier", + "value": "Tier.Bronze" + } + }, + { + "name": "approveProduction", + "displayName": "Approve for Production", + "nodeType": "userApprovalTask", + "config": { + "assignees": ["data-steward-team"], + "description": "Review and approve table for production use" + } + }, + { + "name": "promoteToSilver", + "displayName": "Promote to Silver Tier", + "nodeType": "conditionalSetEntityAttributeTask", + "config": { + "conditions": [ + { + "condition": "${approved == true}", + "attribute": "tier", + "value": "Tier.Silver" + } + ] + } + }, + { + "name": "endEvent", + "displayName": "End", + "nodeType": "endEvent", + "config": {} + } + ], + "edges": [ + {"from": "startEvent", "to": "validateMetadata"}, + {"from": "validateMetadata", "to": "setInitialTier"}, + {"from": "setInitialTier", "to": "approveProduction"}, + {"from": "approveProduction", "to": 
"promoteToSilver"}, + {"from": "promoteToSilver", "to": "endEvent"} + ] +} +``` + +**Flow Explanation:** +1. New table triggers workflow +2. Validates required metadata +3. Sets initial tier to Bronze (profiling is not part of the definition above) +4. Waits for human approval +5. Promotes to Silver tier if approved + +### Example 2: Stale Entity Deprecation +**Trigger:** PeriodicBatchEntityTrigger (Daily) + +```json +{ + "name": "deprecateStaleAssets", + "displayName": "Deprecate Stale Assets", + "description": "Automatically deprecate entities that haven't been updated in 90 days", + "type": "AUTOMATED", + "trigger": { + "triggerType": "periodicBatchEntityTrigger", + "config": { + "entityTypes": ["table", "dashboard"], + "schedule": { + "scheduleType": "Daily", + "hour": 2, + "minute": 0 + }, + "batchSize": 50, + "filters": "tier.tagFQN:Tier.Bronze" + } + }, + + "nodes": [ + { + "name": "startEvent", + "displayName": "Start", + "nodeType": "startEvent", + "config": {} + }, + { + "name": "checkAndDeprecate", + "displayName": "Check and Deprecate Stale Entities", + "nodeType": "deprecateStaleEntityTask", + "config": { + "staleDays": 90, + "deprecationMessage": "Auto-deprecated due to 90 days of inactivity" + } + }, + { + "name": "endEvent", + "displayName": "End", + "nodeType": "endEvent", + "config": {} + } + ], + "edges": [ + {"from": "startEvent", "to": "checkAndDeprecate"}, + {"from": "checkAndDeprecate", "to": "endEvent"} + ] +} +``` + +**Flow Explanation:** +1. Runs daily at 2 AM +2. Fetches Bronze tier entities in batches +3. Checks last update > 90 days +4. Marks as deprecated +5. 
Notifies owners via email (requires an additional notification node; none is defined in the JSON above) + +### Example 3: Glossary Term Approval Workflow +**Trigger:** EventBasedEntityTrigger (GlossaryTerm creation) + +```json +{ + "name": "glossaryTermGovernance", + "displayName": "Glossary Term Governance", + "description": "Approval workflow for business glossary terms", + "type": "APPROVAL", + "trigger": { + "triggerType": "eventBasedEntityTrigger", + "config": { + "entityType": "glossaryTerm", + "events": ["entityCreated", "entityUpdated"] + } + }, + + "nodes": [ + { + "name": "startEvent", + "displayName": "Start", + "nodeType": "startEvent", + "config": {} + }, + { + "name": "setDraftStatus", + "displayName": "Set Draft Status", + "nodeType": "setGlossaryTermStatusTask", + "config": { + "status": "Draft" + } + }, + { + "name": "validateTerm", + "displayName": "Validate Term Definition", + "nodeType": "checkEntityAttributesTask", + "config": { + "checks": [ + { + "attribute": "definition", + "required": true, + "minLength": 50 + }, + { + "attribute": "relatedTerms", + "required": false + }, + { + "attribute": "synonyms", + "required": false + } + ] + } + }, + { + "name": "businessApproval", + "displayName": "Business Team Approval", + "nodeType": "userApprovalTask", + "config": { + "assignees": ["business-glossary-team"], + "description": "Review and approve glossary term definition" + } + }, + { + "name": "setApprovedStatus", + "displayName": "Set Approved Status", + "nodeType": "setGlossaryTermStatusTask", + "config": { + "status": "Approved" + } + }, + { + "name": "rollbackToDraft", + "displayName": "Rollback to Draft", + "nodeType": "rollbackEntityTask", + "config": { + "rollbackToStatus": "Draft" + } + }, + { + "name": "endEvent", + "displayName": "End", + "nodeType": "endEvent", + "config": {} + } + ], + "edges": [ + {"from": "startEvent", "to": "setDraftStatus"}, + {"from": "setDraftStatus", "to": "validateTerm"}, + {"from": "validateTerm", "to": "businessApproval"}, + { + "from": "businessApproval", + "to": "setApprovedStatus", 
+ "condition": "${approved == true}" + }, + { + "from": "businessApproval", + "to": "rollbackToDraft", + "condition": "${approved == false}" + }, + {"from": "setApprovedStatus", "to": "endEvent"}, + {"from": "rollbackToDraft", "to": "endEvent"} + ] +} +``` + +**Flow Explanation:** +1. New/updated glossary term triggers workflow +2. Sets status to Draft +3. Validates definition quality +4. Routes to business team for approval +5. Approves or rolls back based on decision + +### Example 4: Data Quality Certification Pipeline +**Trigger:** PeriodicBatchEntityTrigger (Weekly) + +```json +{ + "name": "dataQualityCertification", + "displayName": "Data Quality Certification", + "description": "Weekly data quality checks and certification for analytics tables", + "type": "AUTOMATED", + "trigger": { + "triggerType": "periodicBatchEntityTrigger", + "config": { + "entityType": "table", + "schedule": { + "scheduleType": "Weekly", + "dayOfWeek": 1, + "hour": 3, + "minute": 0 + }, + "filters": "database.name:analytics AND tier.tagFQN:Tier.Silver", + "batchSize": 25 + } + }, + + "nodes": [ + { + "name": "startEvent", + "displayName": "Start", + "nodeType": "startEvent", + "config": {} + }, + { + "name": "checkQualityMetrics", + "displayName": "Check Quality Metrics", + "nodeType": "checkEntityAttributesTask", + "config": { + "checks": [ + { + "attribute": "dataQualityScore", + "required": true, + "customCheck": "${dataQualityScore > 0.95}" + } + ] + } + }, + { + "name": "certifyGold", + "displayName": "Certify as Gold", + "nodeType": "setEntityCertificationTask", + "config": { + "certification": "Gold", + "message": "Certified Gold tier based on quality checks" + } + }, + { + "name": "addQualityTags", + "displayName": "Add Quality Tags", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tags", + "value": ["high-quality", "certified", "gold-tier"] + } + }, + { + "name": "endEvent", + "displayName": "End", + "nodeType": "endEvent", + "config": {} + } + ], + 
"edges": [ + {"from": "startEvent", "to": "checkQualityMetrics"}, + {"from": "checkQualityMetrics", "to": "certifyGold"}, + {"from": "certifyGold", "to": "addQualityTags"}, + {"from": "addQualityTags", "to": "endEvent"} + ] +} +``` + +**Flow Explanation:** +1. Weekly scan of Silver tier analytics tables +2. Runs comprehensive quality checks +3. Evaluates quality score +4. Certifies as Gold, then adds quality tags (sequential, per the edges above) +5. Updates metrics dashboard (not implemented by the nodes above) + +### Example 5: Incident Response Workflow +**Trigger:** EventBasedEntityTrigger (Incident creation) + +```json +{ + "name": "dataIncidentResponse", + "displayName": "Data Incident Response", + "description": "Automated incident response workflow for data quality issues", + "type": "AUTOMATED", + "trigger": { + "triggerType": "eventBasedEntityTrigger", + "config": { + "entityType": "table", + "events": ["entityUpdated"], + "filters": "tags.tagFQN:incident-reported" + } + }, + + "nodes": [ + { + "name": "startEvent", + "displayName": "Start", + "nodeType": "startEvent", + "config": {} + }, + { + "name": "markIncident", + "displayName": "Mark Incident Active", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tags", + "value": ["incident-active", "under-investigation"] + } + }, + { + "name": "triageIncident", + "displayName": "Triage Incident", + "nodeType": "userApprovalTask", + "config": { + "assignees": ["data-ops-team"], + "description": "Triage data incident and determine action" + } + }, + { + "name": "rollbackIfNeeded", + "displayName": "Rollback if Needed", + "nodeType": "rollbackEntityTask", + "config": { + "rollbackToStatus": "Approved" + } + }, + { + "name": "deprecateIfCorrupted", + "displayName": "Deprecate if Corrupted", + "nodeType": "deprecateStaleEntityTask", + "config": { + "staleDays": 0, + "deprecationMessage": "Deprecated due to data corruption incident" + } + }, + { + "name": "resolveIncident", + "displayName": "Resolve Incident", + "nodeType": "setEntityAttributeTask", + 
"config": { + "attribute": "tags", + "value": ["incident-resolved"] + } + }, + { + "name": "endEvent", + "displayName": "End", + "nodeType": "endEvent", + "config": {} + } + ], + "edges": [ + {"from": "startEvent", "to": "markIncident"}, + {"from": "markIncident", "to": "triageIncident"}, + { + "from": "triageIncident", + "to": "rollbackIfNeeded", + "condition": "${requiresRollback == true}" + }, + { + "from": "triageIncident", + "to": "deprecateIfCorrupted", + "condition": "${dataCorrupted == true}" + }, + { + "from": "triageIncident", + "to": "resolveIncident", + "condition": "${resolved == true}" + }, + {"from": "rollbackIfNeeded", "to": "resolveIncident"}, + {"from": "deprecateIfCorrupted", "to": "resolveIncident"}, + {"from": "resolveIncident", "to": "endEvent"} + ] +} +``` + +**Flow Explanation:** +1. Incident reported on table +2. Tags entity with incident markers +3. Routes to ops team for triage +4. Notifies based on priority +5. Rolls back if needed +6. Deprecates if data corrupted +7. 
Marks incident as resolved + +### Example 6: Complete Data Governance Pipeline +**Trigger:** EventBasedEntityTrigger for new tables + +```json +{ + "name": "enterpriseDataGovernance", + "displayName": "Enterprise Data Governance Pipeline", + "description": "Comprehensive governance workflow combining multiple checks and certifications", + "type": "AUTOMATED", + "trigger": { + "triggerType": "eventBasedEntityTrigger", + "config": { + "entityType": "table", + "events": ["entityCreated", "entityUpdated"], + "filters": "database.name:enterprise" + } + }, + + "nodes": [ + { + "name": "startEvent", + "displayName": "Start", + "nodeType": "startEvent", + "config": {} + }, + { + "name": "validateMetadata", + "displayName": "Validate Required Metadata", + "nodeType": "checkEntityAttributesTask", + "config": { + "checks": [ + { + "attribute": "description", + "required": true, + "minLength": 20 + }, + { + "attribute": "owner", + "required": true + }, + { + "attribute": "tier", + "required": true + } + ] + } + }, + { + "name": "classifyData", + "displayName": "Classify Data Sensitivity", + "nodeType": "conditionalSetEntityAttributeTask", + "config": { + "conditions": [ + { + "condition": "${contains(tags, 'PII')}", + "attribute": "tags", + "value": ["data-classification:restricted"] + }, + { + "condition": "${contains(tags, 'financial')}", + "attribute": "tags", + "value": ["data-classification:confidential"] + }, + { + "condition": "${true}", + "attribute": "tags", + "value": ["data-classification:internal"] + } + ] + } + }, + { + "name": "applyRetentionPolicy", + "displayName": "Apply Retention Policy", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "extension", + "value": { + "retentionPeriod": "7years", + "deletionDate": "2031-12-31" + } + } + }, + { + "name": "dataQualityCheck", + "displayName": "Data Quality Assessment", + "nodeType": "checkEntityAttributesTask", + "config": { + "checks": [ + { + "attribute": "columns", + "required": true, + 
"customCheck": "${columns.length > 0}" + } + ] + } + }, + { + "name": "technicalApproval", + "displayName": "Technical Team Approval", + "nodeType": "userApprovalTask", + "config": { + "assignees": ["data-engineering-team"], + "description": "Review technical specifications and data quality" + } + }, + { + "name": "businessApproval", + "displayName": "Business Team Approval", + "nodeType": "userApprovalTask", + "config": { + "assignees": ["business-team"], + "description": "Validate business logic and data definitions" + } + }, + { + "name": "certifyForProduction", + "displayName": "Certify for Production", + "nodeType": "setEntityCertificationTask", + "config": { + "certification": "Production-Ready", + "message": "Certified for production use after comprehensive review" + } + }, + { + "name": "promoteToGold", + "displayName": "Promote to Gold Tier", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tier", + "value": "Tier.Gold" + } + }, + { + "name": "addGovernanceTags", + "displayName": "Add Governance Tags", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tags", + "value": [ + "governance:approved", + "quality:verified", + "compliance:checked" + ] + } + }, + { + "name": "endEvent", + "displayName": "End", + "nodeType": "endEvent", + "config": {} + } + ], + "edges": [ + {"from": "startEvent", "to": "validateMetadata"}, + {"from": "validateMetadata", "to": "classifyData"}, + {"from": "classifyData", "to": "applyRetentionPolicy"}, + {"from": "applyRetentionPolicy", "to": "dataQualityCheck"}, + {"from": "dataQualityCheck", "to": "technicalApproval"}, + {"from": "technicalApproval", "to": "businessApproval"}, + { + "from": "businessApproval", + "to": "certifyForProduction", + "condition": "${approved == true}" + }, + {"from": "certifyForProduction", "to": "promoteToGold"}, + {"from": "promoteToGold", "to": "addGovernanceTags"}, + {"from": "addGovernanceTags", "to": "endEvent"}, + { + "from": "businessApproval", + "to": 
"endEvent", + "condition": "${approved == false}" + } + ] +} +``` + +**Flow Explanation:** +1. Triggered by new or updated enterprise tables +2. Validates required metadata (description, owner, tier) +3. Classifies data sensitivity based on tags +4. Applies retention policy based on classification +5. Performs data quality assessment +6. Routes through technical and business approvals +7. Certifies for production and promotes to Gold tier +8. Adds governance tags to indicate completion + +### Example 7: Periodic Batch Cleanup +**Trigger:** PeriodicBatchEntityTrigger (Monthly) + +```json +{ + "name": "monthlyDataCleanup", + "displayName": "Monthly Data Cleanup", + "description": "Monthly cleanup of test and temporary tables", + "type": "AUTOMATED", + "trigger": { + "triggerType": "periodicBatchEntityTrigger", + "config": { + "entityTypes": ["table", "dashboard", "pipeline"], + "schedule": { + "scheduleType": "Monthly", + "dayOfMonth": 1, + "hour": 1, + "minute": 0 + }, + "batchSize": 100, + "filters": "tags.tagFQN:temporary OR tags.tagFQN:test" + } + }, + "nodes": [ + { + "name": "startEvent", + "displayName": "Start", + "nodeType": "startEvent", + "config": {} + }, + { + "name": "checkAge", + "displayName": "Check Entity Age", + "nodeType": "checkEntityAttributesTask", + "config": { + "checks": [ + { + "attribute": "updatedAt", + "customCheck": "${(currentTime - updatedAt) > 2592000000}" + } + ] + } + }, + { + "name": "deprecateOldEntities", + "displayName": "Deprecate Old Test Entities", + "nodeType": "deprecateStaleEntityTask", + "config": { + "staleDays": 30, + "deprecationMessage": "Deprecated: Test/temporary entity older than 30 days" + } + }, + { + "name": "tagForDeletion", + "displayName": "Tag for Future Deletion", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tags", + "value": ["pending-deletion", "cleanup-scheduled"] + } + }, + { + "name": "endEvent", + "displayName": "End", + "nodeType": "endEvent", + "config": {} + } + ], + 
"edges": [ + {"from": "startEvent", "to": "checkAge"}, + {"from": "checkAge", "to": "deprecateOldEntities"}, + {"from": "deprecateOldEntities", "to": "tagForDeletion"}, + {"from": "tagForDeletion", "to": "endEvent"} + ] +} +``` + +**Flow Explanation:** +1. Runs monthly on the 1st at 1 AM +2. Fetches all entities tagged as temporary or test +3. Checks if they're older than 30 days +4. Deprecates old test entities +5. Tags them for future deletion + +### Example 8: Multi-Stage Approval with Rollback +**Trigger:** EventBasedEntityTrigger (Pipeline changes) + +```json +{ + "name": "pipelineChangeApproval", + "displayName": "Pipeline Change Approval", + "description": "Multi-stage approval for critical pipeline changes with rollback capability", + "type": "APPROVAL", + "trigger": { + "triggerType": "eventBasedEntityTrigger", + "config": { + "entityType": "pipeline", + "events": ["entityUpdated"], + "filters": "tags.tagFQN:critical" + } + }, + "nodes": [ + { + "name": "startEvent", + "displayName": "Start", + "nodeType": "startEvent", + "config": {} + }, + { + "name": "freezePipeline", + "displayName": "Freeze Pipeline Execution", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tags", + "value": ["execution:frozen", "approval:pending"] + } + }, + { + "name": "technicalReview", + "displayName": "Technical Review", + "nodeType": "userApprovalTask", + "config": { + "assignees": ["pipeline-engineering-team"], + "description": "Review technical changes to critical pipeline" + } + }, + { + "name": "securityReview", + "displayName": "Security Review", + "nodeType": "userApprovalTask", + "config": { + "assignees": ["security-team"], + "description": "Review security implications of pipeline changes" + } + }, + { + "name": "finalApproval", + "displayName": "Management Approval", + "nodeType": "userApprovalTask", + "config": { + "assignees": ["data-management"], + "description": "Final approval for critical pipeline changes" + } + }, + { + "name": 
"applyChanges", + "displayName": "Apply Approved Changes", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tags", + "value": ["execution:active", "approval:completed", "changes:applied"] + } + }, + { + "name": "rollbackChanges", + "displayName": "Rollback Changes", + "nodeType": "rollbackEntityTask", + "config": { + "rollbackToStatus": "Approved" + } + }, + { + "name": "notifyRollback", + "displayName": "Tag as Rolled Back", + "nodeType": "setEntityAttributeTask", + "config": { + "attribute": "tags", + "value": ["execution:active", "changes:rolled-back"] + } + }, + { + "name": "endEvent", + "displayName": "End", + "nodeType": "endEvent", + "config": {} + } + ], + "edges": [ + {"from": "startEvent", "to": "freezePipeline"}, + {"from": "freezePipeline", "to": "technicalReview"}, + { + "from": "technicalReview", + "to": "securityReview", + "condition": "${approved == true}" + }, + { + "from": "securityReview", + "to": "finalApproval", + "condition": "${approved == true}" + }, + { + "from": "finalApproval", + "to": "applyChanges", + "condition": "${approved == true}" + }, + {"from": "applyChanges", "to": "endEvent"}, + { + "from": "technicalReview", + "to": "rollbackChanges", + "condition": "${approved == false}" + }, + { + "from": "securityReview", + "to": "rollbackChanges", + "condition": "${approved == false}" + }, + { + "from": "finalApproval", + "to": "rollbackChanges", + "condition": "${approved == false}" + }, + {"from": "rollbackChanges", "to": "notifyRollback"}, + {"from": "notifyRollback", "to": "endEvent"} + ] +} +``` + +**Flow Explanation:** +1. Triggered when critical pipelines are updated +2. Immediately freezes pipeline execution +3. Goes through 3-stage approval (technical, security, management) +4. If any stage rejects, rolls back to previous approved version +5. If all approve, applies changes and reactivates pipeline +6. Tags entity with appropriate status throughout + +--- + +## Best Practices + +### 1. 
Workflow Design Principles + +- **Idempotency**: Ensure workflows can be safely re-run +- **Error Handling**: Use boundary events for exception handling +- **Atomicity**: Leverage WorkflowTransactionManager for critical operations +- **Modularity**: Create reusable node configurations +- **Observability**: Log key decision points and state changes + +### 2. Performance Optimization + +```text +// Batch Processing Best Practices +PeriodicBatchEntityTrigger: + - Keep batchSize between 25-100 for optimal performance + - Use filters to reduce initial dataset + - Implement pagination for large result sets + - Consider parallel processing for independent operations +``` + +### 3. Security Considerations + +- **Authentication**: All workflows run with service account privileges +- **Authorization**: Validate user permissions in approval tasks +- **Audit Trail**: All workflow actions are logged with attribution +- **Data Privacy**: Mask sensitive data in logs and notifications + +### 4. Testing Strategy + +```text +// Unit Testing +- Test individual node implementations +- Mock Flowable execution context +- Verify JSON patch generation + +// Integration Testing +- Test complete workflow execution +- Verify transaction rollback scenarios +- Test trigger conditions + +// Load Testing +- Test batch processing with large datasets +- Verify concurrent workflow execution +- Monitor resource utilization +``` + +### 5. Monitoring & Alerting + +```yaml +Key Metrics: + - Workflow execution time + - Success/failure rates + - Queue depth for batch processing + - User task response times + +Alerts: + - Workflow failures > threshold + - Stuck workflows (no progress > 24h) + - Transaction rollback events + - Deprecation of critical assets +``` + +--- + +## Migration Guide + +### From Legacy Workflows + +1. **ID Migration**: Replace truncated IDs with base64 encoded IDs +2. **Transaction Safety**: Wrap deployments in WorkflowTransactionManager +3.
**Batch Processing**: Migrate from single entity to batch triggers +4. **Status Management**: Use proper status enums (Draft/Approved/Deprecated) + +### Version Compatibility + +- **Flowable**: 6.7.2+ required +- **OpenMetadata**: 1.4.0+ required +- **Java**: 17+ required +- **Database**: MySQL 8.0+ or PostgreSQL 12+ + +--- + +## Advanced Patterns + +### 1. Workflow Composition + +```yaml +# Parent workflow that orchestrates child workflows +name: "Master Data Governance" +nodes: + - callActivity: + id: "qualityWorkflow" + calledElement: "Data Quality Certification" + + - callActivity: + id: "complianceWorkflow" + calledElement: "Enterprise Data Governance" + + - callActivity: + id: "lifecycleWorkflow" + calledElement: "Table Lifecycle Management" +``` + +### 2. Dynamic Workflow Generation + +```java +// Generate workflows based on metadata +public WorkflowDefinition generateWorkflow(EntityType type) { + WorkflowDefinitionBuilder builder = new WorkflowDefinitionBuilder(); + + // Add nodes based on entity type + if (type.requiresApproval()) { + builder.addNode(new UserApprovalTask()); + } + + if (type.hasQualityChecks()) { + builder.addNode(new RunAppTask("quality-checker")); + } + + return builder.build(); +} +``` + +### 3. Event-Driven Orchestration + +```yaml +# Chain workflows through events +workflow1: + endEvent: + type: "signal" + signal: "workflow1Complete" + +workflow2: + startEvent: + type: "signal" + signal: "workflow1Complete" +``` + +--- + +## Troubleshooting Guide + +### Common Issues + +1. **Workflow Not Triggering** + - Check trigger configuration matches entity type + - Verify filters are not too restrictive + - Ensure Flowable engine is running + +2. **Transaction Rollback** + - Check logs for validation errors + - Verify entity permissions + - Ensure database connectivity + +3. 
**Performance Issues** + - Reduce batch size for large datasets + - Optimize filters to reduce initial query + - Consider async processing for heavy operations + +--- + +## UI Implementation Guide + +### Overview +The UI for Governance Workflows should provide an intuitive drag-and-drop interface for creating workflows without code. This section provides detailed guidelines for UI developers. + +### Workflow Builder Components + +#### 1. Node Palette +Display available nodes categorized by type: + +```javascript +const nodePalette = { + "Automated Tasks": [ + { + id: "setEntityAttribute", + icon: "SettingsIcon", + label: "Set Attribute", + description: "Set entity attributes" + }, + { + id: "dataCompleteness", + icon: "CheckCircleIcon", + label: "Data Completeness", + description: "Check data quality score" + }, + { + id: "checkEntityAttributes", + icon: "VerifiedIcon", + label: "Check Attributes", + description: "Validate with rules" + } + ], + "User Tasks": [ + { + id: "userApproval", + icon: "PersonIcon", + label: "User Approval", + description: "Manual approval step" + } + ], + "Control Flow": [ + { + id: "parallelGateway", + icon: "ForkIcon", + label: "Parallel Gateway", + description: "Split/join paths" + } + ] +}; +``` + +#### 2. 
Node Configuration Forms + +##### DataCompletenessTask Form +```javascript +const DataCompletenessForm = { + fieldsToCheck: { + type: "multiselect", + label: "Fields to Check", + placeholder: "Select entity fields...", + dataSource: "entityFields", // Dynamic based on entity type + required: true, + helpText: "Select fields to evaluate for completeness" + }, + qualityBands: { + type: "dynamic-list", + label: "Quality Bands", + minItems: 1, + fields: { + name: { + type: "text", + placeholder: "Band name (e.g., gold)", + required: true + }, + minimumScore: { + type: "number", + min: 0, + max: 100, + placeholder: "Minimum %", + required: true + } + }, + default: [ + {name: "excellent", minimumScore: 90}, + {name: "good", minimumScore: 70}, + {name: "acceptable", minimumScore: 50}, + {name: "poor", minimumScore: 0} + ], + helpText: "Define quality levels from highest to lowest score" + }, + treatEmptyStringAsNull: { + type: "checkbox", + label: "Treat empty strings as missing", + default: true + }, + treatEmptyArrayAsNull: { + type: "checkbox", + label: "Treat empty arrays as missing", + default: true + } +}; +``` + +##### CheckEntityAttributesTask Form +```javascript +const CheckEntityAttributesForm = { + rules: { + type: "queryBuilder", + label: "Validation Rules", + format: "jsonlogic", + entityType: "dynamic", // From workflow context + operators: [ + "equals", "notEquals", "contains", "startsWith", + "greaterThan", "lessThan", "isNull", "isNotNull", + "length", "isOwner", "isReviewer", + "isUpdatedBefore", "isUpdatedAfter", + "fieldCompleteness" // Custom operator + ], + helpText: "Define rules using JsonLogic format" + } +}; +``` + +#### 3. 
Edge Configuration + +```javascript +const EdgeConfiguration = { + condition: { + type: "select", + label: "Routing Condition", + dataSource: "previousNodeOutputs", // Dynamic based on source node + placeholder: "Select condition...", + examples: { + dataCompleteness: ["gold", "silver", "bronze", "unacceptable"], + checkEntityAttributes: ["success", "failure"], + userApproval: ["approved", "rejected"] + } + } +}; +``` + +### Variable Mapping UI + +#### Input Namespace Map +Show how nodes can use outputs from previous nodes: + +```javascript +const VariableMappingUI = { + inputNamespaceMap: { + type: "mapping", + label: "Input Variables", + columns: [ + { + name: "variable", + label: "Variable Name", + type: "select", + options: ["relatedEntity", "completenessScore", "missingFields"] + }, + { + name: "source", + label: "Source Node", + type: "select", + options: ["global", ...previousNodeNames] + } + ], + helpText: "Map variables from previous nodes or global context" + } +}; +``` + +### Visual Workflow Designer + +#### Canvas Features +1. **Drag & Drop**: Nodes from palette to canvas +2. **Connection Mode**: Click source node, then target node to create edge +3. **Auto-Layout**: Automatic arrangement of nodes +4. **Zoom & Pan**: Navigate large workflows +5. 
**Validation Indicators**: Show errors/warnings on nodes + +#### Node Visualization +```javascript +const NodeVisualization = { + base: { + width: 180, + height: 60, + borderRadius: 8, + padding: 12 + }, + states: { + default: { border: "1px solid #d9d9d9" }, + selected: { border: "2px solid #1890ff" }, + error: { border: "2px solid #ff4d4f" }, + warning: { border: "2px solid #faad14" } + }, + content: { + icon: { size: 24, position: "left" }, + label: { fontSize: 14, fontWeight: 500 }, + subtitle: { fontSize: 12, color: "#8c8c8c" }, + outputs: { position: "bottom", fontSize: 10 } + } +}; +``` + +### Workflow Templates + +Provide pre-built templates users can customize: + +```javascript +const workflowTemplates = [ + { + name: "Data Quality Gate", + description: "Check completeness before production", + template: { + nodes: [ + { + name: "qualityCheck", + type: "dataCompleteness", + config: { + fieldsToCheck: ["name", "description", "owner"], + qualityBands: [ + {name: "production-ready", minimumScore: 90}, + {name: "needs-review", minimumScore: 70}, + {name: "draft", minimumScore: 0} + ] + } + }, + { + name: "approve", + type: "userApproval", + config: { + assignees: ["data-steward"] + } + } + ], + edges: [ + { + from: "qualityCheck", + to: "approve", + condition: "needs-review" + } + ] + } + } +]; +``` + +### Validation & Error Handling + +#### Real-time Validation +```javascript +const ValidationRules = { + workflow: { + hasStartNode: "Workflow must have exactly one start node", + hasEndNode: "Workflow must have at least one end node", + allNodesConnected: "All nodes must be connected", + noCycles: "Workflow cannot have cycles (except with gateways)" + }, + node: { + uniqueName: "Node names must be unique", + requiredConfig: "All required configuration must be provided", + validOutputReferences: "Output references must exist" + }, + edge: { + validCondition: "Condition must match source node outputs", + noSelfLoop: "Nodes cannot connect to themselves" + } +}; 
+``` + +### Testing & Preview + +#### Workflow Simulator +```javascript +const WorkflowSimulator = { + testData: { + entity: "Select or create test entity", + initialVariables: "Set initial variable values" + }, + execution: { + stepThrough: "Execute node by node", + breakpoints: "Set breakpoints on nodes", + variableInspector: "View variables at each step" + }, + results: { + path: "Show execution path", + outputs: "Display final outputs", + errors: "List any errors encountered" + } +}; +``` + +### Best Practices for UI Implementation + +1. **Progressive Disclosure**: Show basic options first, advanced in expandable sections +2. **Contextual Help**: Tooltips and inline documentation +3. **Smart Defaults**: Pre-fill common configurations +4. **Undo/Redo**: Support workflow editing history +5. **Auto-Save**: Periodically save drafts +6. **Keyboard Shortcuts**: + - `Ctrl+Z` - Undo + - `Ctrl+Y` - Redo + - `Delete` - Remove selected node + - `Ctrl+D` - Duplicate node +7. **Export/Import**: Support JSON export/import of workflows + +### Integration with Backend + +#### API Calls +```text +// Validate workflow +POST /api/v1/governance/workflows/validate +{ + "workflow": workflowDefinition +} + +// Get available fields for entity type +GET /api/v1/metadata/types/{entityType}/fields + +// Get custom operators +GET /api/v1/system/config/customLogicOps + +// Test workflow execution +POST /api/v1/governance/workflows/test +{ + "workflow": workflowDefinition, + "testEntity": entityId +} +``` + +### Performance Considerations + +1. **Lazy Loading**: Load node configurations only when needed +2. **Virtualization**: For workflows with many nodes +3. **Debouncing**: Validation and auto-save +4. **Caching**: Entity fields and operator definitions +5. **Web Workers**: Complex workflow validation + +--- + +## Conclusion + +The OpenMetadata Governance Workflow system provides a robust, extensible framework for implementing complex data governance policies.
The architecture ensures: + +1. **Reliability** through atomic transactions and proper error handling +2. **Scalability** through batch processing and parallel execution +3. **Flexibility** through modular, reusable components +4. **Governance** through comprehensive audit trails and approval workflows + +The combination of EventBasedEntityTrigger and PeriodicBatchEntityTrigger enables both reactive and proactive governance, while the rich set of task nodes supports virtually any governance scenario. + +--- + +## Appendix + +### A. JSON Schema Definitions + +All workflow nodes have corresponding JSON schemas in: +``` +/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/ +``` + +### B. BPMN XML Generation + +Workflows are compiled to BPMN 2.0 XML for Flowable execution: +```java +BpmnXMLConverter converter = new BpmnXMLConverter(); +String bpmnXml = new String(converter.convertToXML(workflow.getMainModel())); +``` + +### C. Database Schema + +Workflow definitions stored in: +- `workflow_definition` table (OpenMetadata DB) +- `ACT_RE_DEPLOYMENT` table (Flowable DB) +- `ACT_RE_PROCDEF` table (Flowable process definitions) + +### D. REST API Endpoints + +``` +POST /api/v1/governance/workflows - Create workflow +GET /api/v1/governance/workflows/{id} - Get workflow +PUT /api/v1/governance/workflows/{id} - Update workflow +DELETE /api/v1/governance/workflows/{id} - Delete workflow +POST /api/v1/governance/workflows/{id}/run - Trigger workflow +``` + +--- + +*Document Version: 1.0* +*Last Updated: 2024* +*Author: Senior Architect - Governance Workflows Team* \ No newline at end of file diff --git a/WORKFLOW_SOLUTION_ARCHITECTURE.md b/WORKFLOW_SOLUTION_ARCHITECTURE.md new file mode 100644 index 00000000000..b2d4f36ab3a --- /dev/null +++ b/WORKFLOW_SOLUTION_ARCHITECTURE.md @@ -0,0 +1,98 @@ +# OpenMetadata Governance Workflows - Architecture & Implementation Guide + +## Table of Contents +1. [Architecture Overview](#architecture-overview) +2. 
[Core Components](#core-components) +3. [Implementation Details](#implementation-details) +4. [Workflow Triggers](#workflow-triggers) +5. [Workflow Nodes Catalog](#workflow-nodes-catalog) +6. [Custom Workflow Examples](#custom-workflow-examples) +7. [Best Practices](#best-practices) +## Core Principles +1. **Non-blocking**: Workflows must NOT block entity operations (glossary term creation, etc.) +2. **BPMN-driven flow**: Flowable's BPMN engine controls the flow based on conditions - we don't override this +3. **Complete audit trail**: Every execution attempt must be recorded, even failures +4. **No ID generation**: Use Flowable's execution IDs, not random UUIDs + +## Solution Implementation + +### 1. WorkflowInstanceListener +- **On Exception in execute()**: Still record the workflow state with FAILURE status +- **Use Flowable's execution ID**: Convert `execution.getId()` to UUID for tracking +- **Always persist state**: Even on failure, write to `workflow_instance_time_series` + +### 2. WorkflowInstanceStageListener +- **On Exception**: Create a failed stage record so there's an audit trail +- **Use deterministic IDs**: `UUID.nameUUIDFromBytes(execution.getId().getBytes())` +- **Don't block flow**: Log and record but let Flowable continue per BPMN + +### 3. BaseDelegate (Task Implementations) +- **Throw BpmnError**: This is CORRECT - allows boundary events to handle failures +- **Set exception variable**: For downstream stages to check +- **Let BPMN decide**: The workflow definition controls whether to continue or fail + +### 4. WorkflowFailureListener +- **Keep as-is**: `isFailOnException() = false` is correct - don't block entity operations +- **Purpose**: Global monitoring, not flow control + +## How It Works + +1. **Task fails** → BaseDelegate throws BpmnError +2. **BPMN handles** → Boundary events catch error, workflow continues/fails per definition +3. **Listeners record** → Even on exception, state is persisted to database +4.
**No silent failures** → Database always has the true state + +## Key Changes Made + +```java +// WorkflowInstanceListener - Always record state +catch (Exception exc) { + LOG.error(...); + // Still write to DB even on failure + if ("end".equals(execution.getEventName())) { + workflowInstanceRepository.updateWorkflowInstance( + workflowInstanceId, + System.currentTimeMillis(), + Map.of("status", "FAILURE", "error", exc.getMessage()) + ); + } +} + +// WorkflowInstanceStageListener - Create failure records +catch (Exception exc) { + LOG.error(...); + // Create a failed stage record for audit + if ("end".equals(execution.getEventName())) { + UUID stageId = workflowInstanceStateRepository.addNewStageToInstance( + stage + "_failed", ... + ); + workflowInstanceStateRepository.updateStage( + stageId, + System.currentTimeMillis(), + Map.of("status", "FAILED", "error", exc.getMessage()) + ); + } +} +``` + +## What We DON'T Do + +1. **Don't generate random UUIDs** - Use Flowable's execution IDs +2. **Don't skip stages** - Let BPMN control flow +3. **Don't throw from listeners** - Would block entity operations +4. **Don't override BPMN decisions** - Respect workflow definitions + +## Testing + +1. Remove `relatedEntity` from glossary workflow trigger +2. Check database - should show FAILURE status, not FINISHED +3. Check stages - should have failed stage records +4. 
Entity (glossary term) should still be created + +## Result + +- Workflows fail gracefully per BPMN definition +- Database always reflects true state +- No silent failures +- Entity operations never blocked +- Complete audit trail maintained \ No newline at end of file diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java index 3070b35d577..cffb6e9dc2f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/Workflow.java @@ -7,6 +7,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import lombok.Getter; +import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BoundaryEvent; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.Process; @@ -22,6 +23,7 @@ import org.openmetadata.service.governance.workflows.elements.TriggerFactory; import org.openmetadata.service.governance.workflows.elements.TriggerInterface; import org.openmetadata.service.governance.workflows.elements.nodes.endEvent.EndEvent; +@Slf4j @Getter public class Workflow { public static final String INGESTION_PIPELINE_ID_VARIABLE = "ingestionPipelineId"; @@ -43,14 +45,27 @@ public class Workflow { public static final String GLOBAL_NAMESPACE = "global"; public Workflow(WorkflowDefinition workflowDefinition) { + LOG.info( + "[WorkflowBuild] START: Creating workflow '{}' with {} nodes, {} edges", + workflowDefinition.getFullyQualifiedName(), + workflowDefinition.getNodes() != null ? workflowDefinition.getNodes().size() : 0, + workflowDefinition.getEdges() != null ? workflowDefinition.getEdges().size() : 0); + // Build Trigger + LOG.debug( + "[WorkflowBuild] Creating trigger: type='{}'", + workflowDefinition.getTrigger() != null + ? 
workflowDefinition.getTrigger().getType() + : "none"); this.triggerModel = new BpmnModel(); triggerModel.setTargetNamespace(""); TriggerInterface trigger = TriggerFactory.createTrigger(workflowDefinition); trigger.addToWorkflow(triggerModel); this.triggerWorkflowName = trigger.getTriggerWorkflowId(); + LOG.debug("[WorkflowBuild] Trigger created: id='{}'", triggerWorkflowName); // Build Main Workflow + LOG.debug("[WorkflowBuild] Validating workflow graph"); new WorkflowGraph(workflowDefinition).validate(); this.mainModel = new BpmnModel(); @@ -67,30 +82,55 @@ public class Workflow { // Add Nodes for (WorkflowNodeDefinitionInterface nodeDefinitionObj : workflowDefinition.getNodes()) { + LOG.debug( + "[WorkflowBuild] Adding node: name='{}' type='{}' outputs={}", + nodeDefinitionObj.getName(), + nodeDefinitionObj.getType(), + nodeDefinitionObj.getOutput()); NodeInterface node = NodeFactory.createNode(nodeDefinitionObj, workflowDefinition.getConfig()); node.addToWorkflow(mainModel, process); Optional.ofNullable(node.getRuntimeExceptionBoundaryEvent()) - .ifPresent(runtimeExceptionBoundaryEvents::add); + .ifPresent( + event -> { + LOG.debug( + "[WorkflowBuild] Added boundary event for node '{}'", + nodeDefinitionObj.getName()); + runtimeExceptionBoundaryEvents.add(event); + }); } // Add Edges for (EdgeDefinition edgeDefinition : workflowDefinition.getEdges()) { + LOG.debug( + "[WorkflowBuild] Processing edge: from='{}' to='{}' condition='{}'", + edgeDefinition.getFrom(), + edgeDefinition.getTo(), + edgeDefinition.getCondition()); Edge edge = new Edge(edgeDefinition); edge.addToWorkflow(mainModel, process); } // Configure Exception Flow configureRuntimeExceptionFlow(process, runtimeExceptionBoundaryEvents); + + LOG.info( + "[WorkflowBuild] SUCCESS: Workflow '{}' built with trigger '{}'", + mainWorkflowName, + triggerWorkflowName); } private void configureRuntimeExceptionFlow( Process process, List runtimeExceptionBoundaryEvents) { EndEvent errorEndEvent = new 
EndEvent("Error"); process.addFlowElement(errorEndEvent.getEndEvent()); + LOG.debug( + "[WorkflowBuild] Configuring error flow for {} boundary events", + runtimeExceptionBoundaryEvents.size()); for (BoundaryEvent event : runtimeExceptionBoundaryEvents) { process.addFlowElement(new SequenceFlow(event.getId(), errorEndEvent.getEndEvent().getId())); + LOG.debug("[WorkflowBuild] Added error flow: boundaryEvent='{}' -> errorEnd", event.getId()); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java index ad197d0f7e3..edd07260360 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowFailureListener.java @@ -11,7 +11,7 @@ public class WorkflowFailureListener implements FlowableEventListener { @Override public void onEvent(FlowableEvent event) { if (FlowableEngineEventType.JOB_EXECUTION_FAILURE.equals(event.getType())) { - LOG.error("Workflow Failed: " + event); + LOG.error("[WorkflowFailure] JOB_EXECUTION_FAILURE: {}", event); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index e080ddc55ff..90ed6b82ad8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -222,8 +222,29 @@ public class WorkflowHandler { public ProcessInstance triggerByKey( String processDefinitionKey, String businessKey, Map variables) { RuntimeService runtimeService = processEngine.getRuntimeService(); - 
LOG.debug("[GovernanceWorkflows] '{}' triggered with '{}'", processDefinitionKey, variables); - return runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey, variables); + LOG.info( + "[WorkflowTrigger] START: processKey='{}' businessKey='{}' variables={}", + processDefinitionKey, + businessKey, + variables); + try { + ProcessInstance instance = + runtimeService.startProcessInstanceByKey(processDefinitionKey, businessKey, variables); + LOG.info( + "[WorkflowTrigger] SUCCESS: processKey='{}' instanceId='{}' businessKey='{}'", + processDefinitionKey, + instance.getId(), + businessKey); + return instance; + } catch (Exception e) { + LOG.error( + "[WorkflowTrigger] FAILED: processKey='{}' businessKey='{}' error='{}'", + processDefinitionKey, + businessKey, + e.getMessage(), + e); + throw e; + } } public void triggerWithSignal(String signal, Map variables) { @@ -300,19 +321,36 @@ public class WorkflowHandler { public Map transformToNodeVariables( UUID customTaskId, Map variables) { + LOG.debug( + "[WorkflowVariable] transformToNodeVariables: customTaskId='{}' inputVars={}", + customTaskId, + variables); Map namespacedVariables = null; Optional oTask = Optional.ofNullable(getTaskFromCustomTaskId(customTaskId)); if (oTask.isPresent()) { Task task = oTask.get(); String namespace = getParentActivityId(task.getExecutionId()); + LOG.debug( + "[WorkflowVariable] Found task namespace: taskId='{}' executionId='{}' namespace='{}'", + task.getId(), + task.getExecutionId(), + namespace); namespacedVariables = new HashMap<>(); for (Map.Entry entry : variables.entrySet()) { - namespacedVariables.put( - getNamespacedVariableName(namespace, entry.getKey()), entry.getValue()); + String namespacedVar = getNamespacedVariableName(namespace, entry.getKey()); + namespacedVariables.put(namespacedVar, entry.getValue()); + LOG.debug( + "[WorkflowVariable] Transformed: '{}' -> '{}' = '{}'", + entry.getKey(), + namespacedVar, + entry.getValue()); } + LOG.debug( + 
"[WorkflowVariable] transformToNodeVariables complete: outputVars={}", + namespacedVariables); } else { - LOG.debug(String.format("Flowable Task for Task ID %s not found.", customTaskId)); + LOG.warn("[WorkflowVariable] Task not found for customTaskId='{}'", customTaskId); } return namespacedVariables; } @@ -323,19 +361,43 @@ public class WorkflowHandler { public void resolveTask(UUID customTaskId, Map variables) { TaskService taskService = processEngine.getTaskService(); + LOG.info("[WorkflowTask] RESOLVE: customTaskId='{}' variables={}", customTaskId, variables); try { Optional oTask = Optional.ofNullable(getTaskFromCustomTaskId(customTaskId)); if (oTask.isPresent()) { Task task = oTask.get(); + LOG.info( + "[WorkflowTask] Found task: flowableTaskId='{}' processInstanceId='{}' name='{}'", + task.getId(), + task.getProcessInstanceId(), + task.getName()); Optional.ofNullable(variables) .ifPresentOrElse( - variablesValue -> taskService.complete(task.getId(), variablesValue), - () -> taskService.complete(task.getId())); + variablesValue -> { + LOG.info( + "[WorkflowTask] Completing with variables: taskId='{}' vars={}", + task.getId(), + variablesValue); + taskService.complete(task.getId(), variablesValue); + }, + () -> { + LOG.info( + "[WorkflowTask] Completing without variables: taskId='{}'", task.getId()); + taskService.complete(task.getId()); + }); + LOG.info("[WorkflowTask] SUCCESS: Task '{}' resolved", customTaskId); } else { - LOG.debug(String.format("Flowable Task for Task ID %s not found.", customTaskId)); + LOG.warn("[WorkflowTask] NOT_FOUND: No Flowable task for customTaskId='{}'", customTaskId); } } catch (FlowableObjectNotFoundException ex) { - LOG.debug(String.format("Flowable Task for Task ID %s not found.", customTaskId)); + LOG.error( + "[WorkflowTask] ERROR: Flowable task not found for customTaskId='{}': {}", + customTaskId, + ex.getMessage()); + } catch (Exception e) { + LOG.error( + "[WorkflowTask] ERROR: Failed to resolve task '{}': {}", 
customTaskId, e.getMessage(), e); + throw e; } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowVariableHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowVariableHandler.java index 257dce1239a..4754587b6bb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowVariableHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowVariableHandler.java @@ -28,8 +28,20 @@ public class WorkflowVariableHandler { public Object getNamespacedVariable(String namespace, String varName) { String namespacedVarName = getNamespacedVariableName(namespace, varName); if (namespacedVarName != null) { - return varScope.getVariable(namespacedVarName); + Object value = varScope.getVariable(namespacedVarName); + LOG.debug( + "[WorkflowVariable] GET: namespace='{}' varName='{}' namespacedVar='{}' value='{}' type='{}'", + namespace, + varName, + namespacedVarName, + value, + value != null ? value.getClass().getSimpleName() : "null"); + return value; } else { + LOG.debug( + "[WorkflowVariable] GET: namespace='{}' varName='{}' returned null (no namespace)", + namespace, + varName); return null; } } @@ -43,8 +55,15 @@ public class WorkflowVariableHandler { String namespacedVarName = getNamespacedVariableName(namespace, varName); if (namespacedVarName != null) { varScope.setVariable(namespacedVarName, varValue); - LOG.debug(String.format("%s variable set to %s", namespacedVarName, varValue)); + LOG.debug( + "[WorkflowVariable] SET: namespace='{}' varName='{}' namespacedVar='{}' value='{}' type='{}'", + namespace, + varName, + namespacedVarName, + varValue, + varValue != null ? 
varValue.getClass().getSimpleName() : "null"); } else { + LOG.error("[WorkflowVariable] ERROR: Namespace is null when setting variable '{}'", varName); throw new RuntimeException("Namespace can't be null when setting a namespaced variable."); } } @@ -54,20 +73,43 @@ public class WorkflowVariableHandler { } private String getNodeNamespace() { + String namespace; if (varScope instanceof DelegateExecution) { - return Optional.ofNullable(((DelegateExecution) varScope).getParent().getCurrentActivityId()) - .orElseGet(() -> ((DelegateExecution) varScope).getCurrentActivityId().split("\\.")[0]); + DelegateExecution execution = (DelegateExecution) varScope; + namespace = + Optional.ofNullable( + execution.getParent() != null + ? execution.getParent().getCurrentActivityId() + : null) + .orElseGet(() -> execution.getCurrentActivityId().split("\\.")[0]); + LOG.debug( + "[WorkflowVariable] getNodeNamespace: DelegateExecution activityId='{}' namespace='{}'", + execution.getCurrentActivityId(), + namespace); } else if (varScope instanceof DelegateTask) { - return WorkflowHandler.getInstance() - .getParentActivityId(((DelegateTask) varScope).getExecutionId()); + DelegateTask task = (DelegateTask) varScope; + namespace = WorkflowHandler.getInstance().getParentActivityId(task.getExecutionId()); + LOG.debug( + "[WorkflowVariable] getNodeNamespace: DelegateTask executionId='{}' namespace='{}'", + task.getExecutionId(), + namespace); } else { + LOG.error( + "[WorkflowVariable] ERROR: Invalid varScope type: {}", + varScope != null ? 
varScope.getClass().getName() : "null"); throw new RuntimeException( "varScope must be either an instance of 'DelegateExecution' or 'DelegateTask'."); } + return namespace; } public void setNodeVariable(String varName, Object varValue) { String namespace = getNodeNamespace(); + LOG.debug( + "[WorkflowVariable] setNodeVariable: varName='{}' value='{}' using namespace='{}'", + varName, + varValue, + namespace); setNamespacedVariable(namespace, varName, varValue); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/Edge.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/Edge.java index 06a8b1df791..5e251678cb4 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/Edge.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/Edge.java @@ -3,26 +3,45 @@ package org.openmetadata.service.governance.workflows.elements; import static org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; import static org.openmetadata.service.governance.workflows.WorkflowVariableHandler.getNamespacedVariableName; +import lombok.extern.slf4j.Slf4j; import org.flowable.bpmn.model.BpmnModel; import org.flowable.bpmn.model.Process; import org.flowable.bpmn.model.SequenceFlow; import org.openmetadata.common.utils.CommonUtil; +@Slf4j public class Edge { private final SequenceFlow edge; public Edge(org.openmetadata.schema.governance.workflows.elements.EdgeDefinition edgeDefinition) { SequenceFlow edge = new SequenceFlow(edgeDefinition.getFrom(), edgeDefinition.getTo()); if (!CommonUtil.nullOrEmpty(edgeDefinition.getCondition())) { - edge.setConditionExpression( - getFlowableCondition(edgeDefinition.getFrom(), edgeDefinition.getCondition())); + String conditionExpression = + getFlowableCondition(edgeDefinition.getFrom(), edgeDefinition.getCondition()); + 
edge.setConditionExpression(conditionExpression); + LOG.debug( + "[WorkflowEdge] Created conditional edge from='{}' to='{}' with condition='{}' expression='{}'", + edgeDefinition.getFrom(), + edgeDefinition.getTo(), + edgeDefinition.getCondition(), + conditionExpression); + } else { + LOG.debug( + "[WorkflowEdge] Created unconditional edge from='{}' to='{}'", + edgeDefinition.getFrom(), + edgeDefinition.getTo()); } this.edge = edge; } private String getFlowableCondition(String from, String condition) { - return String.format( - "${%s == '%s'}", getNamespacedVariableName(from, RESULT_VARIABLE), condition); + String variableName = getNamespacedVariableName(from, RESULT_VARIABLE); + String expression = String.format("${%s == '%s'}", variableName, condition); + LOG.debug( + "[WorkflowEdge] Condition expression: checking if variable '{}' equals '{}'", + variableName, + condition); + return expression; } public void addToWorkflow(BpmnModel model, Process process) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeFactory.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeFactory.java index 8992bfb7c37..7ab6adddc1c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeFactory.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/NodeFactory.java @@ -6,6 +6,7 @@ import org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinit import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.ConditionalSetEntityAttributeTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.CreateAndRunIngestionPipelineTaskDefinition; +import 
org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RollbackEntityTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.RunAppTaskDefinition; import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.SetEntityAttributeTaskDefinition; @@ -17,6 +18,7 @@ import org.openmetadata.schema.governance.workflows.elements.nodes.startEvent.St import org.openmetadata.schema.governance.workflows.elements.nodes.userTask.UserApprovalTaskDefinition; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.CheckEntityAttributesTask; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.ConditionalSetEntityAttributeTask; +import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.DataCompletenessTask; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.RollbackEntityTask; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityAttributeTask; import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.SetEntityCertificationTask; @@ -51,6 +53,8 @@ public class NodeFactory { case RUN_APP_TASK -> new RunAppTask((RunAppTaskDefinition) nodeDefinition, config); case ROLLBACK_ENTITY_TASK -> new RollbackEntityTask( (RollbackEntityTaskDefinition) nodeDefinition, config); + case DATA_COMPLETENESS_TASK -> new DataCompletenessTask( + (DataCompletenessTaskDefinition) nodeDefinition, config); case PARALLEL_GATEWAY -> new ParallelGateway( (ParallelGatewayDefinition) nodeDefinition, config); }; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java 
b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java new file mode 100644 index 00000000000..df11c996387 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/DataCompletenessTask.java @@ -0,0 +1,129 @@ +package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask; + +import static org.openmetadata.service.governance.workflows.Workflow.getFlowableElementId; + +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.flowable.bpmn.model.BoundaryEvent; +import org.flowable.bpmn.model.BpmnModel; +import org.flowable.bpmn.model.EndEvent; +import org.flowable.bpmn.model.FieldExtension; +import org.flowable.bpmn.model.Process; +import org.flowable.bpmn.model.SequenceFlow; +import org.flowable.bpmn.model.ServiceTask; +import org.flowable.bpmn.model.StartEvent; +import org.flowable.bpmn.model.SubProcess; +import org.openmetadata.schema.governance.workflows.WorkflowConfiguration; +import org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.governance.workflows.elements.NodeInterface; +import org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl.DataCompletenessImpl; +import org.openmetadata.service.governance.workflows.flowable.builders.EndEventBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.FieldExtensionBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.ServiceTaskBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.StartEventBuilder; +import org.openmetadata.service.governance.workflows.flowable.builders.SubProcessBuilder; + +@Slf4j +public class DataCompletenessTask implements NodeInterface { + private final SubProcess subProcess; 
+ private final BoundaryEvent runtimeExceptionBoundaryEvent; + + public DataCompletenessTask( + DataCompletenessTaskDefinition nodeDefinition, WorkflowConfiguration workflowConfig) { + String subProcessId = nodeDefinition.getName(); + + SubProcess subProcess = new SubProcessBuilder().id(subProcessId).build(); + + StartEvent startEvent = + new StartEventBuilder().id(getFlowableElementId(subProcessId, "startEvent")).build(); + + ServiceTask dataCompletenessTask = getDataCompletenessServiceTask(subProcessId, nodeDefinition); + + EndEvent endEvent = + new EndEventBuilder().id(getFlowableElementId(subProcessId, "endEvent")).build(); + + subProcess.addFlowElement(startEvent); + subProcess.addFlowElement(dataCompletenessTask); + subProcess.addFlowElement(endEvent); + + subProcess.addFlowElement(new SequenceFlow(startEvent.getId(), dataCompletenessTask.getId())); + subProcess.addFlowElement(new SequenceFlow(dataCompletenessTask.getId(), endEvent.getId())); + + this.subProcess = subProcess; + this.runtimeExceptionBoundaryEvent = getRuntimeExceptionBoundaryEvent(dataCompletenessTask); + } + + private ServiceTask getDataCompletenessServiceTask( + String parentId, DataCompletenessTaskDefinition nodeDefinition) { + + // Get configuration with defaults if null + var config = nodeDefinition.getConfig(); + Boolean treatEmptyStringAsNull = config.getTreatEmptyStringAsNull(); + Boolean treatEmptyArrayAsNull = config.getTreatEmptyArrayAsNull(); + + List fieldExtensions = + List.of( + new FieldExtensionBuilder() + .fieldName("fieldsToCheckExpr") + .fieldValue(JsonUtils.pojoToJson(config.getFieldsToCheck())) + .build(), + new FieldExtensionBuilder() + .fieldName("qualityBandsExpr") + .fieldValue(JsonUtils.pojoToJson(config.getQualityBands())) + .build(), + new FieldExtensionBuilder() + .fieldName("treatEmptyStringAsNullExpr") + .fieldValue( + String.valueOf(treatEmptyStringAsNull != null ? 
treatEmptyStringAsNull : true)) + .build(), + new FieldExtensionBuilder() + .fieldName("treatEmptyArrayAsNullExpr") + .fieldValue( + String.valueOf(treatEmptyArrayAsNull != null ? treatEmptyArrayAsNull : true)) + .build(), + new FieldExtensionBuilder() + .fieldName("inputNamespaceMapExpr") + .fieldValue( + JsonUtils.pojoToJson( + nodeDefinition.getInputNamespaceMap() != null + ? nodeDefinition.getInputNamespaceMap() + : new java.util.HashMap<>())) + .build()); + + ServiceTaskBuilder builder = + new ServiceTaskBuilder() + .id(getFlowableElementId(parentId, "dataCompletenessTask")) + .implementation(DataCompletenessImpl.class.getName()); + + for (FieldExtension fieldExtension : fieldExtensions) { + builder.addFieldExtension(fieldExtension); + } + + return builder.build(); + } + + @Override + public void addToWorkflow(BpmnModel model, Process process) { + process.addFlowElement(subProcess); + process.addFlowElement(runtimeExceptionBoundaryEvent); + } + + @Override + public BoundaryEvent getRuntimeExceptionBoundaryEvent() { + return runtimeExceptionBoundaryEvent; + } + + private BoundaryEvent getRuntimeExceptionBoundaryEvent(ServiceTask serviceTask) { + BoundaryEvent boundaryEvent = new BoundaryEvent(); + boundaryEvent.setId(getFlowableElementId(serviceTask.getId(), "runtimeExceptionBoundaryEvent")); + boundaryEvent.setAttachedToRefId(serviceTask.getId()); + + org.flowable.bpmn.model.ErrorEventDefinition errorEventDef = + new org.flowable.bpmn.model.ErrorEventDefinition(); + errorEventDef.setErrorCode("workflowRuntimeException"); + boundaryEvent.addEventDefinition(errorEventDef); + + return boundaryEvent; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java index 8fee6f701df..6cd50fc527c 100644 --- 
a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/CheckEntityAttributesImpl.java @@ -52,20 +52,12 @@ public class CheckEntityAttributesImpl implements JavaDelegate { private Boolean checkAttributes(MessageParser.EntityLink entityLink, String rules) { EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); + boolean result; try { - Object result = RuleEngine.getInstance().apply(rules, JsonUtils.getMap(entity)); - - // Handle both boolean and numeric results for scoring scenarios - if (result instanceof Number) { - double score = ((Number) result).doubleValue(); - // For numeric results, consider >= 50 as success (configurable threshold) - return score >= 50.0; - } - - // Default boolean handling - return Boolean.TRUE.equals(result); + result = (boolean) RuleEngine.getInstance().apply(rules, JsonUtils.getMap(entity)); } catch (Exception e) { throw new RuntimeException(e); } + return result; } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java new file mode 100644 index 00000000000..aa1d073c494 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/automatedTask/impl/DataCompletenessImpl.java @@ -0,0 +1,229 @@ +package org.openmetadata.service.governance.workflows.elements.nodes.automatedTask.impl; + +import static org.openmetadata.service.governance.workflows.Workflow.EXCEPTION_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.RELATED_ENTITY_VARIABLE; +import static 
org.openmetadata.service.governance.workflows.Workflow.RESULT_VARIABLE; +import static org.openmetadata.service.governance.workflows.Workflow.WORKFLOW_RUNTIME_EXCEPTION; +import static org.openmetadata.service.governance.workflows.WorkflowHandler.getProcessDefinitionKeyFromId; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.flowable.common.engine.api.delegate.Expression; +import org.flowable.engine.delegate.BpmnError; +import org.flowable.engine.delegate.DelegateExecution; +import org.flowable.engine.delegate.JavaDelegate; +import org.openmetadata.schema.EntityInterface; +import org.openmetadata.schema.type.Include; +import org.openmetadata.schema.utils.JsonUtils; +import org.openmetadata.service.Entity; +import org.openmetadata.service.governance.workflows.WorkflowVariableHandler; +import org.openmetadata.service.resources.feeds.MessageParser; + +@Slf4j +public class DataCompletenessImpl implements JavaDelegate { + private Expression fieldsToCheckExpr; + private Expression qualityBandsExpr; + private Expression treatEmptyStringAsNullExpr; + private Expression treatEmptyArrayAsNullExpr; + private Expression inputNamespaceMapExpr; + + @Override + public void execute(DelegateExecution execution) { + WorkflowVariableHandler varHandler = new WorkflowVariableHandler(execution); + try { + // Get configuration + Map inputNamespaceMap = + JsonUtils.readOrConvertValue(inputNamespaceMapExpr.getValue(execution), Map.class); + List fieldsToCheck = + JsonUtils.readOrConvertValue(fieldsToCheckExpr.getValue(execution), List.class); + List> qualityBandMaps = + JsonUtils.readOrConvertValue(qualityBandsExpr.getValue(execution), List.class); + List qualityBands = new ArrayList<>(); + for (Map bandMap : qualityBandMaps) { + QualityBand band = new QualityBand(); + band.setName((String) 
bandMap.get("name")); + band.setMinimumScore(((Number) bandMap.get("minimumScore")).doubleValue()); + qualityBands.add(band); + } + boolean treatEmptyStringAsNull = + Boolean.parseBoolean(treatEmptyStringAsNullExpr.getValue(execution).toString()); + boolean treatEmptyArrayAsNull = + Boolean.parseBoolean(treatEmptyArrayAsNullExpr.getValue(execution).toString()); + + // Get the entity + MessageParser.EntityLink entityLink = + MessageParser.EntityLink.parse( + (String) + varHandler.getNamespacedVariable( + inputNamespaceMap.get(RELATED_ENTITY_VARIABLE), RELATED_ENTITY_VARIABLE)); + + EntityInterface entity = Entity.getEntity(entityLink, "*", Include.ALL); + Map entityMap = JsonUtils.getMap(entity); + + // Calculate completeness + DataCompletenessResult result = + calculateCompleteness( + entityMap, + fieldsToCheck, + qualityBands, + treatEmptyStringAsNull, + treatEmptyArrayAsNull); + + // Set output variables + varHandler.setNodeVariable("completenessScore", result.score); + varHandler.setNodeVariable("filledFieldsCount", result.filledFieldsCount); + varHandler.setNodeVariable("totalFieldsCount", result.totalFieldsCount); + varHandler.setNodeVariable("missingFields", result.missingFields); + varHandler.setNodeVariable("filledFields", result.filledFields); + varHandler.setNodeVariable("qualityBand", result.qualityBand); + + // Set result variable for edge routing (using the quality band name) + varHandler.setNodeVariable(RESULT_VARIABLE, result.qualityBand); + + LOG.info( + "[WorkflowNode][DataCompleteness] EXECUTED: entity='{}' score={}% band='{}' filled={}/{}", + entityLink, + result.score, + result.qualityBand, + result.filledFieldsCount, + result.totalFieldsCount); + + } catch (Exception exc) { + LOG.error( + "[{}] Data completeness check failed: ", + getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()), + exc); + varHandler.setGlobalVariable(EXCEPTION_VARIABLE, ExceptionUtils.getStackTrace(exc)); + throw new BpmnError(WORKFLOW_RUNTIME_EXCEPTION, 
exc.getMessage()); + } + } + + private DataCompletenessResult calculateCompleteness( + Map entityMap, + List fieldsToCheck, + List qualityBands, + boolean treatEmptyStringAsNull, + boolean treatEmptyArrayAsNull) { + + DataCompletenessResult result = new DataCompletenessResult(); + result.totalFieldsCount = fieldsToCheck.size(); + result.missingFields = new ArrayList<>(); + result.filledFields = new ArrayList<>(); + + for (String fieldPath : fieldsToCheck) { + Object value = getFieldValue(entityMap, fieldPath); + + if (isFieldFilled(value, treatEmptyStringAsNull, treatEmptyArrayAsNull)) { + result.filledFieldsCount++; + result.filledFields.add(fieldPath); + } else { + result.missingFields.add(fieldPath); + } + } + + // Calculate percentage + result.score = + result.totalFieldsCount > 0 + ? (result.filledFieldsCount * 100.0) / result.totalFieldsCount + : 0.0; + + // Determine quality band based on score + result.qualityBand = determineQualityBand(result.score, qualityBands); + + return result; + } + + private Object getFieldValue(Map entityMap, String fieldPath) { + // Handle nested fields with dot notation + String[] parts = fieldPath.split("\\."); + Object current = entityMap; + + for (String part : parts) { + if (current == null) { + return null; + } + + // Handle array notation like "columns[]" + if (part.endsWith("[]")) { + String fieldName = part.substring(0, part.length() - 2); + if (current instanceof Map) { + current = ((Map) current).get(fieldName); + // For arrays, check if any element exists + if (current instanceof List && !((List) current).isEmpty()) { + return current; // Return the list itself if non-empty + } + } + return null; + } else if (current instanceof Map) { + current = ((Map) current).get(part); + } else { + return null; + } + } + + return current; + } + + private boolean isFieldFilled( + Object value, boolean treatEmptyStringAsNull, boolean treatEmptyArrayAsNull) { + + if (value == null) { + return false; + } + + if (value instanceof 
String) { + String str = (String) value; + return treatEmptyStringAsNull ? !str.trim().isEmpty() : true; + } + + if (value instanceof List) { + List list = (List) value; + return treatEmptyArrayAsNull ? !list.isEmpty() : true; + } + + if (value instanceof Map) { + Map map = (Map) value; + return !map.isEmpty(); + } + + // For other types (numbers, booleans), non-null means filled + return true; + } + + private String determineQualityBand(double score, List qualityBands) { + // Sort bands by minimumScore in descending order to evaluate from highest to lowest + List sortedBands = new ArrayList<>(qualityBands); + sortedBands.sort(Comparator.comparingDouble(QualityBand::getMinimumScore).reversed()); + + // Find the matching band + for (QualityBand band : sortedBands) { + if (score >= band.getMinimumScore()) { + return band.getName(); + } + } + + // If no band matches (shouldn't happen if bands are configured correctly) + // Return the band with the lowest threshold + return sortedBands.isEmpty() ? 
"undefined" : sortedBands.getLast().getName(); + } + + @Data + public static class QualityBand { + private String name; + private Double minimumScore; + } + + private static class DataCompletenessResult { + double score = 0.0; + int filledFieldsCount = 0; + int totalFieldsCount = 0; + List missingFields; + List filledFields; + String qualityBand = "undefined"; + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/rules/JsonLogicUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/rules/JsonLogicUtils.java index 755a0a5ff23..6e8761c5797 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/rules/JsonLogicUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/rules/JsonLogicUtils.java @@ -3,7 +3,6 @@ package org.openmetadata.service.rules; import io.github.jamsesso.jsonlogic.ast.JsonLogicArray; import io.github.jamsesso.jsonlogic.evaluator.JsonLogicEvaluationException; import io.github.jamsesso.jsonlogic.evaluator.JsonLogicEvaluator; -import java.util.ArrayList; import java.util.List; import java.util.Map; import org.jetbrains.annotations.NotNull; @@ -112,46 +111,4 @@ public class JsonLogicUtils { long timestamp = ((Number) timestampObj).longValue(); return updatedAt > timestamp; } - - public static @NotNull Object evaluateFieldCompleteness( - JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data) - throws JsonLogicEvaluationException { - if (arguments.isEmpty()) return 0.0; - - // Get the list of field names to check - List fields = new ArrayList<>(); - for (int i = 0; i < arguments.size(); i++) { - Object arg = evaluator.evaluate(arguments.get(i), data); - if (arg instanceof String) { - fields.add((String) arg); - } - } - - if (fields.isEmpty()) return 0.0; - - // Check if data is a Map (entity) - if (!(data instanceof Map entityMap)) return 0.0; - - // Count non-empty fields - long filledCount = 0; - for (String field : fields) { - Object value = 
entityMap.get(field); - if (value != null) { - // Check if the value is non-empty based on its type - if (value instanceof String && !((String) value).trim().isEmpty()) { - filledCount++; - } else if (value instanceof List && !((List) value).isEmpty()) { - filledCount++; - } else if (value instanceof Map && !((Map) value).isEmpty()) { - filledCount++; - } else if (!(value instanceof String || value instanceof List || value instanceof Map)) { - // For other types (numbers, booleans), non-null means filled - filledCount++; - } - } - } - - // Return percentage as a number (0-100) - return (filledCount * 100.0) / fields.size(); - } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/rules/LogicOps.java b/openmetadata-service/src/main/java/org/openmetadata/service/rules/LogicOps.java index 827f20ad3b3..c2cee447f7c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/rules/LogicOps.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/rules/LogicOps.java @@ -24,8 +24,7 @@ public class LogicOps { IS_REVIEWER("isReviewer"), IS_OWNER("isOwner"), IS_UPDATED_BEFORE("isUpdatedBefore"), - IS_UPDATED_AFTER("isUpdatedAfter"), - FIELD_COMPLETENESS("fieldCompleteness"); + IS_UPDATED_AFTER("isUpdatedAfter"); public final String key; @@ -114,22 +113,6 @@ public class LogicOps { return JsonLogicUtils.evaluateIsUpdatedAfter(evaluator, arguments, data); } }); - - // {"fieldCompleteness": ["field1", "field2", "field3"]} - Returns % of non-empty fields - jsonLogic.addOperation( - new JsonLogicExpression() { - @Override - public String key() { - return CustomLogicOps.FIELD_COMPLETENESS.key; - } - - @Override - public Object evaluate( - JsonLogicEvaluator evaluator, JsonLogicArray arguments, Object data) - throws JsonLogicEvaluationException { - return JsonLogicUtils.evaluateFieldCompleteness(evaluator, arguments, data); - } - }); } /** diff --git 
a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodeSubType.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodeSubType.json index ab37ef7752d..d3a1cb7e845 100644 --- a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodeSubType.json +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodeSubType.json @@ -17,6 +17,7 @@ "createAndRunIngestionPipelineTask", "runAppTask", "rollbackEntityTask", + "dataCompletenessTask", "parallelGateway" ] } diff --git a/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json new file mode 100644 index 00000000000..041d8234018 --- /dev/null +++ b/openmetadata-spec/src/main/resources/json/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json @@ -0,0 +1,131 @@ +{ + "$id": "https://open-metadata.org/schema/governance/workflows/elements/nodes/automatedTask/dataCompletenessTask.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "DataCompletenessTaskDefinition", + "description": "Evaluates entity data completeness based on field presence and outputs quality bands.", + "javaInterfaces": [ + "org.openmetadata.schema.governance.workflows.elements.WorkflowNodeDefinitionInterface" + ], + "javaType": "org.openmetadata.schema.governance.workflows.elements.nodes.automatedTask.DataCompletenessTaskDefinition", + "type": "object", + "definitions": { + "qualityBand": { + "type": "object", + "properties": { + "name": { + "title": "Band Name", + "description": "Name for this quality band (e.g., 'gold', 'excellent', 'tier1')", + "type": "string" + }, + "minimumScore": { + "title": "Minimum Score", + "description": "Minimum completeness percentage for this band", + "type": 
"number", + "minimum": 0, + "maximum": 100 + } + }, + "required": ["name", "minimumScore"], + "additionalProperties": false + } + }, + "properties": { + "type": { + "type": "string", + "default": "automatedTask" + }, + "subType": { + "type": "string", + "default": "dataCompletenessTask" + }, + "name": { + "title": "Node Name", + "description": "Unique name that identifies this node in the workflow", + "$ref": "../../../../../type/basic.json#/definitions/entityName" + }, + "displayName": { + "title": "Display Name", + "description": "User-friendly display name for this node", + "type": "string" + }, + "description": { + "title": "Description", + "description": "Description of what this completeness check does", + "$ref": "../../../../../type/basic.json#/definitions/markdown" + }, + "config": { + "title": "Completeness Configuration", + "type": "object", + "properties": { + "fieldsToCheck": { + "title": "Fields to Check", + "description": "List of entity field paths to evaluate. Supports dot notation for nested fields (e.g., 'owner.name', 'columns[].description')", + "type": "array", + "items": { + "type": "string" + }, + "minItems": 1, + "examples": ["name", "description", "owner", "tags", "columns[].description"] + }, + "qualityBands": { + "title": "Quality Bands", + "description": "Define quality levels based on completeness scores. 
Bands are evaluated from highest to lowest score.", + "type": "array", + "items": { + "$ref": "#/definitions/qualityBand" + }, + "default": [ + {"name": "excellent", "minimumScore": 90}, + {"name": "good", "minimumScore": 75}, + {"name": "acceptable", "minimumScore": 50}, + {"name": "poor", "minimumScore": 0} + ], + "minItems": 1 + }, + "treatEmptyStringAsNull": { + "title": "Treat Empty String as Missing", + "description": "Consider empty strings ('') as missing values", + "type": "boolean", + "default": true + }, + "treatEmptyArrayAsNull": { + "title": "Treat Empty Array as Missing", + "description": "Consider empty arrays ([]) as missing values", + "type": "boolean", + "default": true + } + }, + "required": ["fieldsToCheck"], + "additionalProperties": false + }, + "input": { + "type": "array", + "items": { "type": "string" }, + "default": ["relatedEntity"], + "additionalItems": false, + "minItems": 1, + "maxItems": 1 + }, + "inputNamespaceMap": { + "type": "object", + "properties": { + "relatedEntity": { + "type": "string", + "default": "global" + } + }, + "additionalProperties": false, + "required": ["relatedEntity"] + }, + "output": { + "title": "Output Variables", + "description": "Variables this node outputs for use in subsequent nodes", + "type": "array", + "items": { "type": "string" }, + "default": ["completenessScore", "qualityBand", "filledFieldsCount", "totalFieldsCount", "missingFields", "filledFields", "result"], + "additionalItems": false + } + }, + "required": ["name", "config"], + "additionalProperties": false +} \ No newline at end of file