Provide better errors on DataInsights Pipeline (#17315)

This commit is contained in:
IceS2 2024-08-06 17:02:03 +02:00 committed by GitHub
parent f0fd643bd8
commit 0a8ea56c99
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 114 additions and 96 deletions

View File

@@ -6,6 +6,7 @@ import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getT
import es.org.elasticsearch.client.RestClient;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.Getter;
@@ -119,9 +120,37 @@ public class DataInsightsApp extends AbstractNativeApplication {
backfill = Optional.empty();
}
processWebAnalytics(jobExecutionContext);
processCostAnalysis(jobExecutionContext);
processDataAssets(jobExecutionContext);
WorkflowStats webAnalyticsStats = processWebAnalytics();
updateJobStatsWithWorkflowStats(webAnalyticsStats);
WorkflowStats costAnalysisStats = processCostAnalysis();
updateJobStatsWithWorkflowStats(costAnalysisStats);
WorkflowStats dataAssetsStats = processDataAssets();
updateJobStatsWithWorkflowStats(dataAssetsStats);
if (webAnalyticsStats.hasFailed()
|| costAnalysisStats.hasFailed()
|| dataAssetsStats.hasFailed()) {
String errorMessage = "Errors Found:\n";
for (WorkflowStats stats : List.of(webAnalyticsStats, costAnalysisStats, dataAssetsStats)) {
if (stats.hasFailed()) {
errorMessage = String.format("%s\n %s\n", errorMessage, stats.getName());
for (String failure : stats.getFailures()) {
errorMessage = String.format("%s - %s\n", errorMessage, failure);
}
}
}
IndexingError indexingError =
new IndexingError()
.withErrorSource(IndexingError.ErrorSource.JOB)
.withMessage(errorMessage);
LOG.error(indexingError.getMessage());
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(indexingError);
}
updateJobStatus();
} catch (Exception ex) {
@@ -130,7 +159,7 @@ public class DataInsightsApp extends AbstractNativeApplication {
.withErrorSource(IndexingError.ErrorSource.JOB)
.withMessage(
String.format(
"Reindexing Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ",
"Data Insights Job Has Encountered an Exception. %n Job Data: %s, %n Stack : %s ",
jobData.toString(), ExceptionUtils.getStackTrace(ex)));
LOG.error(indexingError.getMessage());
jobData.setStatus(EventPublisherJob.Status.FAILED);
@@ -144,101 +173,54 @@ public class DataInsightsApp extends AbstractNativeApplication {
timestamp = TimestampUtils.getStartOfDayTimestamp(System.currentTimeMillis());
}
private void processWebAnalytics(JobExecutionContext jobExecutionContext) {
private WorkflowStats processWebAnalytics() {
WebAnalyticsWorkflow workflow = new WebAnalyticsWorkflow(timestamp, batchSize, backfill);
WorkflowStats workflowStats = workflow.getWorkflowStats();
try {
workflow.process();
} catch (SearchIndexException ex) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(ex.getIndexingError());
} finally {
WorkflowStats workflowStats = workflow.getWorkflowStats();
for (Map.Entry<String, StepStats> entry : workflowStats.getWorkflowStepStats().entrySet()) {
String stepName = entry.getKey();
StepStats stats = entry.getValue();
updateStats(stepName, stats);
}
if (workflowStats.getFailures().isEmpty()) {
IndexingError indexingError =
new IndexingError()
.withErrorSource(IndexingError.ErrorSource.JOB)
.withMessage(
String.format(
"WebAnalytics Workflow Has encounter issues: %s",
workflowStats.getFailures()));
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(indexingError);
}
sendUpdates(jobExecutionContext);
}
return workflowStats;
}
private void processCostAnalysis(JobExecutionContext jobExecutionContext) {
// TODO: Actually implement Backfill
private WorkflowStats processCostAnalysis() {
CostAnalysisWorkflow workflow = new CostAnalysisWorkflow(timestamp, batchSize, backfill);
WorkflowStats workflowStats = workflow.getWorkflowStats();
try {
workflow.process();
} catch (SearchIndexException ex) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(ex.getIndexingError());
} finally {
WorkflowStats workflowStats = workflow.getWorkflowStats();
for (Map.Entry<String, StepStats> entry : workflowStats.getWorkflowStepStats().entrySet()) {
String stepName = entry.getKey();
StepStats stats = entry.getValue();
updateStats(stepName, stats);
}
if (workflowStats.getFailures().isEmpty()) {
IndexingError indexingError =
new IndexingError()
.withErrorSource(IndexingError.ErrorSource.JOB)
.withMessage(
String.format(
"CostAnalysis Workflow Has encounter issues: %s",
workflowStats.getFailures()));
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(indexingError);
}
sendUpdates(jobExecutionContext);
}
return workflowStats;
}
private void processDataAssets(JobExecutionContext jobExecutionContext) {
private WorkflowStats processDataAssets() {
DataAssetsWorkflow workflow =
new DataAssetsWorkflow(timestamp, batchSize, backfill, collectionDAO, searchRepository);
WorkflowStats workflowStats = workflow.getWorkflowStats();
try {
workflow.process();
} catch (SearchIndexException ex) {
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(ex.getIndexingError());
} finally {
WorkflowStats workflowStats = workflow.getWorkflowStats();
}
for (Map.Entry<String, StepStats> entry : workflowStats.getWorkflowStepStats().entrySet()) {
String stepName = entry.getKey();
StepStats stats = entry.getValue();
updateStats(stepName, stats);
}
return workflowStats;
}
if (workflowStats.getFailures().isEmpty()) {
IndexingError indexingError =
new IndexingError()
.withErrorSource(IndexingError.ErrorSource.JOB)
.withMessage(
String.format(
"DataAssets Workflow Has encounter issues: %s",
workflowStats.getFailures()));
jobData.setStatus(EventPublisherJob.Status.FAILED);
jobData.setFailure(indexingError);
}
sendUpdates(jobExecutionContext);
/**
 * Folds every step's stats from the given workflow into the job-level stats
 * tracked by this application (one {@code updateStats} call per workflow step).
 */
private void updateJobStatsWithWorkflowStats(WorkflowStats workflowStats) {
  workflowStats.getWorkflowStepStats().forEach(this::updateStats);
}

View File

@@ -10,10 +10,15 @@ import lombok.Getter;
import org.openmetadata.schema.system.StepStats;
public class WorkflowStats {
@Getter private final String name;
@Getter private List<String> failures = new ArrayList<>();
@Getter private StepStats workflowStats = new StepStats();
@Getter private final Map<String, StepStats> workflowStepStats = new HashMap<>();
/**
 * Creates an empty stats holder for the workflow identified by {@code name};
 * the name is later used to attribute failures in aggregated error reports.
 */
public WorkflowStats(String name) {
  this.name = name;
}
/** Sets the expected total record count on the workflow-level {@link StepStats}. */
public void setWorkflowStatsTotalRecords(int totalRecords) {
  workflowStats.setTotalRecords(totalRecords);
}

View File

@@ -64,7 +64,7 @@ public class CostAnalysisWorkflow {
@Getter
private AggregatedCostAnalysisReportDataProcessor aggregatedCostAnalysisReportDataProcessor;
@Getter private final WorkflowStats workflowStats = new WorkflowStats();
@Getter private final WorkflowStats workflowStats = new WorkflowStats("CostAnalysisWorkflow");
public CostAnalysisWorkflow(
Long timestamp, int batchSize, Optional<DataInsightsApp.Backfill> backfill) {
@ -114,7 +114,7 @@ public class CostAnalysisWorkflow {
new PaginatedEntitiesSource(Entity.TABLE, batchSize, List.of("*"), filter)
.withName(
String.format(
"PaginatedEntitiesSource-%s", databaseService.getFullyQualifiedName())));
"[CostAnalysisWorkflow] %s", databaseService.getFullyQualifiedName())));
total +=
((TableRepository) Entity.getEntityRepository(Entity.TABLE))
.getDao()
@ -204,7 +204,8 @@ public class CostAnalysisWorkflow {
contextData.put(REPORT_DATA_TYPE_KEY, ReportData.ReportDataType.RAW_COST_ANALYSIS_REPORT_DATA);
CreateReportDataProcessor createReportdataProcessor =
new CreateReportDataProcessor(
rawCostAnalysisReportDataList.size(), "RawCostAnalysisReportDataProcessor");
rawCostAnalysisReportDataList.size(),
"[CostAnalysisWorkflow] Raw Cost Analysis Report Data Processor");
Optional<List<ReportData>> rawCostAnalysisReportData = Optional.empty();
@ -226,7 +227,8 @@ public class CostAnalysisWorkflow {
if (rawCostAnalysisReportData.isPresent()) {
ReportDataSink reportDataSink =
new ReportDataSink(
rawCostAnalysisReportData.get().size(), "RawCostAnalysisReportDataSink");
rawCostAnalysisReportData.get().size(),
"[CostAnalysisWorkflow] Raw Cost Analysis Report Data " + "Sink");
try {
reportDataSink.write(rawCostAnalysisReportData.get(), contextData);
} catch (SearchIndexException ex) {
@ -277,7 +279,7 @@ public class CostAnalysisWorkflow {
CreateReportDataProcessor createReportdataProcessor =
new CreateReportDataProcessor(
aggregatedCostAnalysisReportDataList.get().size(),
"AggregatedCostAnalysisReportDataProcessor");
"[CostAnalysisWorkflow] Aggregated Cost Analysis Report Data Processor");
Optional<List<ReportData>> aggregatedCostAnalysisReportData = Optional.empty();
try {
@ -300,7 +302,7 @@ public class CostAnalysisWorkflow {
ReportDataSink reportDataSink =
new ReportDataSink(
aggregatedCostAnalysisReportData.get().size(),
"AggregatedCostAnalysisReportDataSink");
"[CostAnalysisWorkflow] Aggregated Cost Analysis Report Data Sink");
try {
reportDataSink.write(aggregatedCostAnalysisReportData.get(), contextData);
} catch (SearchIndexException ex) {

View File

@ -22,7 +22,10 @@ public class AggregatedCostAnalysisReportDataAggregator
implements Processor<
List<AggregatedCostAnalysisReportData>,
Map<String, Map<String, Map<String, CostAnalysisWorkflow.AggregatedCostAnalysisData>>>> {
@Getter private final String name = "AggregatedCostAnlysisReportDataAggregator";
@Getter
private final String name =
"[CostAnalysisWorkflow] Aggregated Cost Anlysis Report Data Aggregator";
private final StepStats stats = new StepStats();
public AggregatedCostAnalysisReportDataAggregator(int total) {
@ -93,7 +96,9 @@ public class AggregatedCostAnalysisReportDataAggregator
.withSubmittedCount(input.size())
.withFailedCount(input.size())
.withSuccessCount(0)
.withMessage("Aggregated Cost Analysis Aggregator Encounter Failure.")
.withMessage(
String.format(
"Aggregated Cost Analysis Aggregator Encounter Failure: %s", e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug(
"[AggregatedCostAnalysisAggregator] Failed. Details: {}", JsonUtils.pojoToJson(error));

View File

@ -271,7 +271,9 @@ public class AggregatedCostAnalysisReportDataProcessor
.withSubmittedCount(input.size())
.withFailedCount(input.size())
.withSuccessCount(0)
.withMessage("Aggregated Cost Analysis Processor Encounter Failure.")
.withMessage(
String.format(
"Aggregated Cost Analysis Processor Encounter Failure: %s", e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug(
"[AggregatedCostAnalysisProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error));

View File

@ -80,7 +80,9 @@ public class DatabaseServiceTablesProcessor
.withSubmittedCount(input.getData().size())
.withFailedCount(input.getData().size())
.withSuccessCount(0)
.withMessage("Database Service Tables Processor Encounter Failure.")
.withMessage(
String.format(
"Database Service Tables Processor Encounter Failure: %s", e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug(
"[DatabaseServiceTAblesProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error));

View File

@ -48,7 +48,9 @@ public class RawCostAnalysisReportDataProcessor
.withSubmittedCount(input.size())
.withFailedCount(input.size())
.withSuccessCount(0)
.withMessage("Raw Cost Analysis Processor Encounter Failure.")
.withMessage(
String.format(
"Raw Cost Analysis Processor Encounter Failure: %s", e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug("[RawCostAnalysisProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error));
updateStats(0, input.size());

View File

@ -67,7 +67,7 @@ public class DataAssetsWorkflow {
private DataInsightsEntityEnricherProcessor entityEnricher;
private Processor entityProcessor;
private Sink searchIndexSink;
@Getter private final WorkflowStats workflowStats = new WorkflowStats();
@Getter private final WorkflowStats workflowStats = new WorkflowStats("DataAssetsWorkflow");
public DataAssetsWorkflow(
Long timestamp,
@ -115,7 +115,7 @@ public class DataAssetsWorkflow {
List<String> fields = List.of("*");
PaginatedEntitiesSource source =
new PaginatedEntitiesSource(entityType, batchSize, fields)
.withName(String.format("PaginatedEntitiesSource-%s", entityType));
.withName(String.format("[DataAssetsWorkflow] %s", entityType));
sources.add(source);
});

View File

@ -64,7 +64,8 @@ public class DataInsightsEntityEnricherProcessor
.withSubmittedCount(input.getData().size())
.withFailedCount(input.getData().size())
.withSuccessCount(0)
.withMessage("Entities Enricher Encountered Failure.")
.withMessage(
String.format("Entities Enricher Encountered Failure: %s", e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug(
"[DataInsightsEntityEnricherProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error));

View File

@ -53,7 +53,7 @@ public class WebAnalyticsWorkflow {
Long lastSession) {}
;
@Getter private final WorkflowStats workflowStats = new WorkflowStats();
@Getter private final WorkflowStats workflowStats = new WorkflowStats("WebAnalyticsWorkflow");
public static final String USER_ACTIVITY_DATA_KEY = "userActivityData";
public static final String USER_ACTIVITY_REPORT_DATA_KEY = "userActivityReportData";
public static final String ENTITY_VIEW_REPORT_DATA_KEY = "entityViewReportData";
@ -199,7 +199,8 @@ public class WebAnalyticsWorkflow {
REPORT_DATA_TYPE_KEY, ReportData.ReportDataType.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA);
CreateReportDataProcessor createReportDataProcessor =
new CreateReportDataProcessor(
entityViewReportData.values().size(), "EntityViewReportDataProcessor");
entityViewReportData.values().size(),
"[WebAnalyticsWorkflow] Entity View Report Data Processor");
Optional<List<ReportData>> entityViewReportDataList = Optional.empty();
@ -220,7 +221,9 @@ public class WebAnalyticsWorkflow {
// Sink EntityView ReportData
if (entityViewReportDataList.isPresent()) {
ReportDataSink reportDataSink =
new ReportDataSink(entityViewReportDataList.get().size(), "EntityViewReportDataSink");
new ReportDataSink(
entityViewReportDataList.get().size(),
"[WebAnalyticsWorkflow] Entity View Report Data Sink");
try {
reportDataSink.write(entityViewReportDataList.get(), contextData);
@ -262,7 +265,8 @@ public class WebAnalyticsWorkflow {
CreateReportDataProcessor createReportdataProcessor =
new CreateReportDataProcessor(
userActivityReportData.values().size(), "UserActivityReportDataProcessor");
userActivityReportData.values().size(),
"[WebAnalyticsWorkflow] User Activity Report Data Processor");
Optional<List<ReportData>> userActivityReportDataList = Optional.empty();
// Process UserActivity ReportData
@ -284,7 +288,9 @@ public class WebAnalyticsWorkflow {
if (userActivityReportDataList.isPresent()) {
ReportDataSink reportDataSink =
new ReportDataSink(userActivityReportDataList.get().size(), "UserActivityReportDataSink");
new ReportDataSink(
userActivityReportDataList.get().size(),
"[WebAnalyticsWorkflow] User Activity Report Data Sink");
try {
reportDataSink.write(userActivityReportDataList.get(), contextData);
} catch (SearchIndexException ex) {

View File

@ -66,7 +66,9 @@ public class WebAnalyticsEntityViewProcessor
.withSubmittedCount(input.getData().size())
.withFailedCount(input.getData().size())
.withSuccessCount(0)
.withMessage("WebAnalytics Entity View Processor Encounter Failure.")
.withMessage(
String.format(
"WebAnalytics Entity View Processor Encounter Failure: %s", e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug(
"[WebAnalyticsEntityViewProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error));

View File

@ -24,7 +24,7 @@ public class WebAnalyticsUserActivityAggregator
implements Processor<
Map<UUID, WebAnalyticUserActivityReportData>,
Map<UUID, WebAnalyticsWorkflow.UserActivityData>> {
@Getter private final String name = "WebAnalyticsUserActivityAggregator";
@Getter private final String name = "[WebAnalyticsWorkflow] User Activity Aggregator";
private final StepStats stats = new StepStats();
public WebAnalyticsUserActivityAggregator(int total) {
@ -48,7 +48,10 @@ public class WebAnalyticsUserActivityAggregator
.withSubmittedCount(input.size())
.withFailedCount(input.size())
.withSuccessCount(0)
.withMessage("Web Analytics User Activity Aggregator Encounter Failure.")
.withMessage(
String.format(
"Web Analytics User Activity Aggregator Encounter Failure: %s",
e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug(
"[WebAnalyticsUserActivityAggregator] Failed. Details: {}", JsonUtils.pojoToJson(error));

View File

@ -49,7 +49,9 @@ public class WebAnalyticsUserActivityProcessor
.withSubmittedCount(input.getData().size())
.withFailedCount(input.getData().size())
.withSuccessCount(0)
.withMessage("WebAnalytics User Activity Processor Encounter Failure.")
.withMessage(
String.format(
"WebAnalytics User Activity Processor Encounter Failure: %s", e.getMessage()))
.withStackTrace(ExceptionUtils.exceptionStackTraceAsString(e));
LOG.debug(
"[WebAnalyticsUserActivityProcessor] Failed. Details: {}", JsonUtils.pojoToJson(error));

View File

@ -15,6 +15,7 @@ import org.openmetadata.schema.analytics.type.WebAnalyticEventType;
import org.openmetadata.schema.system.IndexingError;
import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.insights.utils.TimestampUtils;
import org.openmetadata.service.exception.SearchIndexException;
import org.openmetadata.service.jdbi3.WebAnalyticEventRepository;
import org.openmetadata.service.util.RestUtil;
@ -46,7 +47,10 @@ public class PaginatedWebAnalyticEventDataSource
this.batchSize = batchSize;
this.startTs = startTs;
this.endTs = endTs;
this.name = String.format("PaginatedWebAnalyticEventDataSource-%s-%s", startTs, endTs);
this.name =
String.format(
"[WebAnalyticsWorkflow] Event Data Source %s",
TimestampUtils.timestampToString(startTs, "YYYY-MM-dd"));
this.totalRecords = repository.listWebAnalyticEventDataCount(eventType, startTs, endTs, false);
this.stats.withTotalRecords(totalRecords).withSuccessRecords(0).withFailedRecords(0);
}