From 0a8ea56c9932abc01c3453a7835c661a16c17eae Mon Sep 17 00:00:00 2001 From: IceS2 Date: Tue, 6 Aug 2024 17:02:03 +0200 Subject: [PATCH] Provide better errors on DataInsights Pipeline (#17315) --- .../bundles/insights/DataInsightsApp.java | 126 ++++++++---------- .../insights/workflows/WorkflowStats.java | 5 + .../costAnalysis/CostAnalysisWorkflow.java | 14 +- ...gatedCostAnalysisReportDataAggregator.java | 9 +- ...egatedCostAnalysisReportDataProcessor.java | 4 +- .../DatabaseServiceTablesProcessor.java | 4 +- .../RawCostAnalysisReportDataProcessor.java | 4 +- .../dataAssets/DataAssetsWorkflow.java | 4 +- .../DataInsightsEntityEnricherProcessor.java | 3 +- .../webAnalytics/WebAnalyticsWorkflow.java | 16 ++- .../WebAnalyticsEntityViewProcessor.java | 4 +- .../WebAnalyticsUserActivityAggregator.java | 7 +- .../WebAnalyticsUserActivityProcessor.java | 4 +- .../PaginatedWebAnalyticEventDataSource.java | 6 +- 14 files changed, 114 insertions(+), 96 deletions(-) diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java index 0fec327ca9b..45edcb70c8f 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/DataInsightsApp.java @@ -6,6 +6,7 @@ import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getT import es.org.elasticsearch.client.RestClient; import java.io.IOException; +import java.util.List; import java.util.Map; import java.util.Optional; import lombok.Getter; @@ -119,9 +120,37 @@ public class DataInsightsApp extends AbstractNativeApplication { backfill = Optional.empty(); } - processWebAnalytics(jobExecutionContext); - processCostAnalysis(jobExecutionContext); - processDataAssets(jobExecutionContext); + WorkflowStats webAnalyticsStats = processWebAnalytics(); + updateJobStatsWithWorkflowStats(webAnalyticsStats); + + WorkflowStats costAnalysisStats = processCostAnalysis(); + updateJobStatsWithWorkflowStats(costAnalysisStats); + + WorkflowStats dataAssetsStats = processDataAssets(); + updateJobStatsWithWorkflowStats(dataAssetsStats); + + if (webAnalyticsStats.hasFailed() + || costAnalysisStats.hasFailed() + || dataAssetsStats.hasFailed()) { + String errorMessage = "Errors Found:\n"; + + for (WorkflowStats stats : List.of(webAnalyticsStats, costAnalysisStats, dataAssetsStats)) { + if (stats.hasFailed()) { + errorMessage = String.format("%s\n %s\n", errorMessage, stats.getName()); + for (String failure : stats.getFailures()) { + errorMessage = String.format("%s - %s\n", errorMessage, failure); + } + } + } + + IndexingError indexingError = + new IndexingError() + .withErrorSource(IndexingError.ErrorSource.JOB) + .withMessage(errorMessage); + LOG.error(indexingError.getMessage()); + jobData.setStatus(EventPublisherJob.Status.FAILED); + jobData.setFailure(indexingError); + } updateJobStatus(); } catch (Exception ex) { @@ -130,7 +159,7 @@ public class DataInsightsApp extends AbstractNativeApplication { .withErrorSource(IndexingError.ErrorSource.JOB) .withMessage( String.format( - "Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ", + "Data Insights Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ", jobData.toString(), ExceptionUtils.getStackTrace(ex))); LOG.error(indexingError.getMessage()); jobData.setStatus(EventPublisherJob.Status.FAILED); @@ -144,101 +173,54 @@ public class DataInsightsApp extends AbstractNativeApplication { timestamp = TimestampUtils.getStartOfDayTimestamp(System.currentTimeMillis()); } - private void processWebAnalytics(JobExecutionContext jobExecutionContext) { + private WorkflowStats processWebAnalytics() { WebAnalyticsWorkflow workflow = new WebAnalyticsWorkflow(timestamp, batchSize, backfill); + WorkflowStats workflowStats = workflow.getWorkflowStats(); + try { workflow.process(); } catch (SearchIndexException ex) { jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setFailure(ex.getIndexingError()); - } finally { - WorkflowStats workflowStats = workflow.getWorkflowStats(); - - for (Map.Entry entry : workflowStats.getWorkflowStepStats().entrySet()) { - String stepName = entry.getKey(); - StepStats stats = entry.getValue(); - updateStats(stepName, stats); - } - - if (workflowStats.getFailures().isEmpty()) { - IndexingError indexingError = - new IndexingError() - .withErrorSource(IndexingError.ErrorSource.JOB) - .withMessage( - String.format( - "WebAnalytics Workflow Has encounter issues: %s", - workflowStats.getFailures())); - jobData.setStatus(EventPublisherJob.Status.FAILED); - jobData.setFailure(indexingError); - } - - sendUpdates(jobExecutionContext); } + + return workflowStats; } - private void processCostAnalysis(JobExecutionContext jobExecutionContext) { - // TODO: Actually implement Backfill + private WorkflowStats processCostAnalysis() { CostAnalysisWorkflow workflow = new CostAnalysisWorkflow(timestamp, batchSize, backfill); + WorkflowStats workflowStats = workflow.getWorkflowStats(); + try { workflow.process(); } catch (SearchIndexException ex) { jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setFailure(ex.getIndexingError()); - } finally { - WorkflowStats workflowStats = workflow.getWorkflowStats(); - - for (Map.Entry entry : workflowStats.getWorkflowStepStats().entrySet()) { - String stepName = entry.getKey(); - StepStats stats = entry.getValue(); - updateStats(stepName, stats); - } - - if (workflowStats.getFailures().isEmpty()) { - IndexingError indexingError = - new IndexingError() - .withErrorSource(IndexingError.ErrorSource.JOB) - .withMessage( - String.format( - "CostAnalysis Workflow Has encounter issues: %s", - workflowStats.getFailures())); - jobData.setStatus(EventPublisherJob.Status.FAILED); - jobData.setFailure(indexingError); - } - - sendUpdates(jobExecutionContext); } + + return workflowStats; } - private void processDataAssets(JobExecutionContext jobExecutionContext) { + private WorkflowStats processDataAssets() { DataAssetsWorkflow workflow = new DataAssetsWorkflow(timestamp, batchSize, backfill, collectionDAO, searchRepository); + WorkflowStats workflowStats = workflow.getWorkflowStats(); + try { workflow.process(); } catch (SearchIndexException ex) { jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setFailure(ex.getIndexingError()); - } finally { - WorkflowStats workflowStats = workflow.getWorkflowStats(); + } - for (Map.Entry entry : workflowStats.getWorkflowStepStats().entrySet()) { - String stepName = entry.getKey(); - StepStats stats = entry.getValue(); - updateStats(stepName, stats); - } + return workflowStats; + } - if (workflowStats.getFailures().isEmpty()) { - IndexingError indexingError = - new IndexingError() - .withErrorSource(IndexingError.ErrorSource.JOB) - .withMessage( - String.format( - "DataAssets Workflow Has encounter issues: %s", - workflowStats.getFailures())); - jobData.setStatus(EventPublisherJob.Status.FAILED); - jobData.setFailure(indexingError); - } - - sendUpdates(jobExecutionContext); + private void updateJobStatsWithWorkflowStats(WorkflowStats workflowStats) { + for (Map.Entry entry : workflowStats.getWorkflowStepStats().entrySet()) { + String stepName = entry.getKey(); + StepStats stats = entry.getValue(); + updateStats(stepName, stats); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/WorkflowStats.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/WorkflowStats.java index 68dbe0ce983..a75b23d8a27 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/WorkflowStats.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/WorkflowStats.java @@ -10,10 +10,15 @@ import lombok.Getter; import org.openmetadata.schema.system.StepStats; public class WorkflowStats { + @Getter private final String name; @Getter private List failures = new ArrayList<>(); @Getter private StepStats workflowStats = new StepStats(); @Getter private final Map workflowStepStats = new HashMap<>(); + public WorkflowStats(String name) { + this.name = name; + } + public void setWorkflowStatsTotalRecords(int totalRecords) { workflowStats.setTotalRecords(totalRecords); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/CostAnalysisWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/CostAnalysisWorkflow.java index b06b26467df..37ddf138168 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/CostAnalysisWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/CostAnalysisWorkflow.java @@ -64,7 +64,7 @@ public class CostAnalysisWorkflow { @Getter private AggregatedCostAnalysisReportDataProcessor aggregatedCostAnalysisReportDataProcessor; - @Getter private final WorkflowStats workflowStats = new WorkflowStats(); + @Getter private final WorkflowStats workflowStats = new WorkflowStats("CostAnalysisWorkflow"); public CostAnalysisWorkflow( Long timestamp, int batchSize, Optional backfill) { @@ -114,7 +114,7 @@ public class CostAnalysisWorkflow { new PaginatedEntitiesSource(Entity.TABLE, batchSize, List.of("*"), filter) .withName( String.format( - "PaginatedEntitiesSource-%s", databaseService.getFullyQualifiedName()))); + "[CostAnalysisWorkflow] %s", databaseService.getFullyQualifiedName()))); total += ((TableRepository) Entity.getEntityRepository(Entity.TABLE)) .getDao() @@ -204,7 +204,8 @@ public class CostAnalysisWorkflow { contextData.put(REPORT_DATA_TYPE_KEY, ReportData.ReportDataType.RAW_COST_ANALYSIS_REPORT_DATA); CreateReportDataProcessor createReportdataProcessor = new CreateReportDataProcessor( - rawCostAnalysisReportDataList.size(), "RawCostAnalysisReportDataProcessor"); + rawCostAnalysisReportDataList.size(), + "[CostAnalysisWorkflow] Raw Cost Analysis Report Data Processor"); Optional> rawCostAnalysisReportData = Optional.empty(); @@ -226,7 +227,8 @@ public class CostAnalysisWorkflow { if (rawCostAnalysisReportData.isPresent()) { ReportDataSink reportDataSink = new ReportDataSink( - rawCostAnalysisReportData.get().size(), "RawCostAnalysisReportDataSink"); + rawCostAnalysisReportData.get().size(), + "[CostAnalysisWorkflow] Raw Cost Analysis Report Data " + "Sink"); try { reportDataSink.write(rawCostAnalysisReportData.get(), contextData); } catch (SearchIndexException ex) { @@ -277,7 +279,7 @@ public class CostAnalysisWorkflow { CreateReportDataProcessor createReportdataProcessor = new CreateReportDataProcessor( aggregatedCostAnalysisReportDataList.get().size(), - "AggregatedCostAnalysisReportDataProcessor"); + "[CostAnalysisWorkflow] Aggregated Cost Analysis Report Data Processor"); Optional> aggregatedCostAnalysisReportData = Optional.empty(); try { @@ -300,7 +302,7 @@ public class CostAnalysisWorkflow { ReportDataSink reportDataSink = new ReportDataSink( aggregatedCostAnalysisReportData.get().size(), - "AggregatedCostAnalysisReportDataSink"); + "[CostAnalysisWorkflow] Aggregated Cost Analysis Report Data Sink"); try { reportDataSink.write(aggregatedCostAnalysisReportData.get(), contextData); } catch (SearchIndexException ex) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/AggregatedCostAnalysisReportDataAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/AggregatedCostAnalysisReportDataAggregator.java index 3d02011b8ab..ba657e014a6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/AggregatedCostAnalysisReportDataAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/AggregatedCostAnalysisReportDataAggregator.java @@ -22,7 +22,10 @@ public class AggregatedCostAnalysisReportDataAggregator implements Processor< List, Map>>> { - @Getter private final String name = "AggregatedCostAnlysisReportDataAggregator"; + @Getter + private final String name = + "[CostAnalysisWorkflow] Aggregated Cost Anlysis Report Data Aggregator"; + private final StepStats stats = new StepStats(); public AggregatedCostAnalysisReportDataAggregator(int total) { @@ -93,7 +96,9 @@ public class AggregatedCostAnalysisReportDataAggregator .withSubmittedCount(input.size()) .withFailedCount(input.size()) .withSuccessCount(0) - .withMessage("Aggregated Cost Analysis Aggregator Encounter Failure.") + .withMessage( + String.format( + "Aggregated Cost Analysis Aggregator Encounter Failure: %s", e.getMessage())) .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); LOG.debug( "[AggregatedCostAnalysisAggregator] Failed. Details: {}", JsonUtils.pojoToJson(error)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/AggregatedCostAnalysisReportDataProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/AggregatedCostAnalysisReportDataProcessor.java index 0ed886076ef..e8485c1d3b1 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/AggregatedCostAnalysisReportDataProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/AggregatedCostAnalysisReportDataProcessor.java @@ -271,7 +271,9 @@ public class AggregatedCostAnalysisReportDataProcessor .withSubmittedCount(input.size()) .withFailedCount(input.size()) .withSuccessCount(0) - .withMessage("Aggregated Cost Analysis Processor Encounter Failure.") + .withMessage( + String.format( + "Aggregated Cost Analysis Processor Encounter Failure: %s", e.getMessage())) .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); LOG.debug( "[AggregatedCostAnalysisProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/DatabaseServiceTablesProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/DatabaseServiceTablesProcessor.java index f1e1b928a67..c58195938cd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/DatabaseServiceTablesProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/DatabaseServiceTablesProcessor.java @@ -80,7 +80,9 @@ public class DatabaseServiceTablesProcessor .withSubmittedCount(input.getData().size()) .withFailedCount(input.getData().size()) .withSuccessCount(0) - .withMessage("Database Service Tables Processor Encounter Failure.") + .withMessage( + String.format( + "Database Service Tables Processor Encounter Failure: %s", e.getMessage())) .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); LOG.debug( "[DatabaseServiceTAblesProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/RawCostAnalysisReportDataProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/RawCostAnalysisReportDataProcessor.java index cebc83e7376..9161faa6948 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/RawCostAnalysisReportDataProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/costAnalysis/processors/RawCostAnalysisReportDataProcessor.java @@ -48,7 +48,9 @@ public class RawCostAnalysisReportDataProcessor .withSubmittedCount(input.size()) .withFailedCount(input.size()) .withSuccessCount(0) - .withMessage("Raw Cost Analysis Processor Encounter Failure.") + .withMessage( + String.format( + "Raw Cost Analysis Processor Encounter Failure: %s", e.getMessage())) .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); LOG.debug("[RawCostAnalysisProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error)); updateStats(0, input.size()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java index d6e25b0bee2..328ceba39a6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/DataAssetsWorkflow.java @@ -67,7 +67,7 @@ public class DataAssetsWorkflow { private DataInsightsEntityEnricherProcessor entityEnricher; private Processor entityProcessor; private Sink searchIndexSink; - @Getter private final WorkflowStats workflowStats = new WorkflowStats(); + @Getter private final WorkflowStats workflowStats = new WorkflowStats("DataAssetsWorkflow"); public DataAssetsWorkflow( Long timestamp, @@ -115,7 +115,7 @@ public class DataAssetsWorkflow { List fields = List.of("*"); PaginatedEntitiesSource source = new PaginatedEntitiesSource(entityType, batchSize, fields) - .withName(String.format("PaginatedEntitiesSource-%s", entityType)); + .withName(String.format("[DataAssetsWorkflow] %s", entityType)); sources.add(source); }); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java index 90697ddfac1..741070345b2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/dataAssets/processors/DataInsightsEntityEnricherProcessor.java @@ -64,7 +64,8 @@ public class DataInsightsEntityEnricherProcessor .withSubmittedCount(input.getData().size()) .withFailedCount(input.getData().size()) .withSuccessCount(0) - .withMessage("Entities Enricher Encountered Failure.") + .withMessage( + String.format("Entities Enricher Encountered Failure: %s", e.getMessage())) .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); LOG.debug( "[DataInsightsEntityEnricherProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/WebAnalyticsWorkflow.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/WebAnalyticsWorkflow.java index 929476fc847..12e6f91c1dd 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/WebAnalyticsWorkflow.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/WebAnalyticsWorkflow.java @@ -53,7 +53,7 @@ public class WebAnalyticsWorkflow { Long lastSession) {} ; - @Getter private final WorkflowStats workflowStats = new WorkflowStats(); + @Getter private final WorkflowStats workflowStats = new WorkflowStats("WebAnalyticsWorkflow"); public static final String USER_ACTIVITY_DATA_KEY = "userActivityData"; public static final String USER_ACTIVITY_REPORT_DATA_KEY = "userActivityReportData"; public static final String ENTITY_VIEW_REPORT_DATA_KEY = "entityViewReportData"; @@ -199,7 +199,8 @@ public class WebAnalyticsWorkflow { REPORT_DATA_TYPE_KEY, ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA); CreateReportDataProcessor createReportDataProcessor = new CreateReportDataProcessor( - entityViewReportData.values().size(), "EntityViewReportDataProcessor"); + entityViewReportData.values().size(), + "[WebAnalyticsWorkflow] Entity View Report Data Processor"); Optional> entityViewReportDataList = Optional.empty(); @@ -220,7 +221,9 @@ public class WebAnalyticsWorkflow { // Sink EntityView ReportData if (entityViewReportDataList.isPresent()) { ReportDataSink reportDataSink = - new ReportDataSink(entityViewReportDataList.get().size(), "EntityViewReportDataSink"); + new ReportDataSink( + entityViewReportDataList.get().size(), + "[WebAnalyticsWorkflow] Entity View Report Data Sink"); try { reportDataSink.write(entityViewReportDataList.get(), contextData); @@ -262,7 +265,8 @@ public class WebAnalyticsWorkflow { CreateReportDataProcessor createReportdataProcessor = new CreateReportDataProcessor( - userActivityReportData.values().size(), "UserActivityReportDataProcessor"); + userActivityReportData.values().size(), + "[WebAnalyticsWorkflow] User Activity Report Data Processor"); Optional> userActivityReportDataList = Optional.empty(); // Process UserActivity ReportData @@ -284,7 +288,9 @@ public class WebAnalyticsWorkflow { if (userActivityReportDataList.isPresent()) { ReportDataSink reportDataSink = - new ReportDataSink(userActivityReportDataList.get().size(), "UserActivityReportDataSink"); + new ReportDataSink( + userActivityReportDataList.get().size(), + "[WebAnalyticsWorkflow] User Activity Report Data Sink"); try { reportDataSink.write(userActivityReportDataList.get(), contextData); } catch (SearchIndexException ex) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsEntityViewProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsEntityViewProcessor.java index 46397b33259..69c65535e97 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsEntityViewProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsEntityViewProcessor.java @@ -66,7 +66,9 @@ public class WebAnalyticsEntityViewProcessor .withSubmittedCount(input.getData().size()) .withFailedCount(input.getData().size()) .withSuccessCount(0) - .withMessage("WebAnalytics Entity View Processor Encounter Failure.") + .withMessage( + String.format( + "WebAnalytics Entity View Processor Encounter Failure: %s", e.getMessage())) .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); LOG.debug( "[WebAnalyticsEntityViewProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsUserActivityAggregator.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsUserActivityAggregator.java index 2ab346ccb3f..b1ea0dfb8f3 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsUserActivityAggregator.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsUserActivityAggregator.java @@ -24,7 +24,7 @@ public class WebAnalyticsUserActivityAggregator implements Processor< Map, Map> { - @Getter private final String name = "WebAnalyticsUserActivityAggregator"; + @Getter private final String name = "[WebAnalyticsWorkflow] User Activity Aggregator"; private final StepStats stats = new StepStats(); public WebAnalyticsUserActivityAggregator(int total) { @@ -48,7 +48,10 @@ public class WebAnalyticsUserActivityAggregator .withSubmittedCount(input.size()) .withFailedCount(input.size()) .withSuccessCount(0) - .withMessage("Web Analytics User Activity Aggregator Encounter Failure.") + .withMessage( + String.format( + "Web Analytics User Activity Aggregator Encounter Failure: %s", + e.getMessage())) .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); LOG.debug( "[WebAnalyticsUserActivityAggregator] Failed. Details: {}", JsonUtils.pojoToJson(error)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsUserActivityProcessor.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsUserActivityProcessor.java index 627d9d44a91..cde1423ef43 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsUserActivityProcessor.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/processors/WebAnalyticsUserActivityProcessor.java @@ -49,7 +49,9 @@ public class WebAnalyticsUserActivityProcessor .withSubmittedCount(input.getData().size()) .withFailedCount(input.getData().size()) .withSuccessCount(0) - .withMessage("WebAnalytics User Activity Processor Encounter Failure.") + .withMessage( + String.format( + "WebAnalytics User Activity Processor Encounter Failure: %s", e.getMessage())) .withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e)); LOG.debug( "[WebAnalyticsUserActivityProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error)); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/sources/PaginatedWebAnalyticEventDataSource.java b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/sources/PaginatedWebAnalyticEventDataSource.java index e5bdf5c6464..049a73468cb 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/sources/PaginatedWebAnalyticEventDataSource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/insights/workflows/webAnalytics/sources/PaginatedWebAnalyticEventDataSource.java @@ -15,6 +15,7 @@ import org.openmetadata.schema.analytics.type.WebAnalyticEventType; import org.openmetadata.schema.system.IndexingError; import org.openmetadata.schema.system.StepStats; import org.openmetadata.service.Entity; +import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils; import org.openmetadata.service.exception.SearchIndexException; import org.openmetadata.service.jdbi3.WebAnalyticEventRepository; import org.openmetadata.service.util.RestUtil; @@ -46,7 +47,10 @@ public class PaginatedWebAnalyticEventDataSource this.batchSize = batchSize; this.startTs = startTs; this.endTs = endTs; - this.name = String.format("PaginatedWebAnalyticEventDataSource-%s-%s", startTs, endTs); + this.name = + String.format( + "[WebAnalyticsWorkflow] Event Data Source %s", + TimestampUtils.timestampToString(startTs, "YYYY-MM-dd")); this.totalRecords = repository.listWebAnalyticEventDataCount(eventType, startTs, endTs, false); this.stats.withTotalRecords(totalRecords).withSuccessRecords(0).withFailedRecords(0); }