Logs added for Search Indexing and Stats issue fixed (#13956)

* Logs added for Search Indexing and Stats issue fixed

* Fix uninstall error

* Add error handling

* fix lint

* Push Job Level Exception on top

* disable flaky tests

* Fix Logs not visible in Search

---------

Co-authored-by: ulixius9 <mayursingal9@gmail.com>
This commit is contained in:
Mohit Yadav 2023-11-13 23:39:56 +05:30 committed by GitHub
parent 7bc4c7cb57
commit 10d8ec84fe
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 357 additions and 296 deletions

View File

@ -112,7 +112,7 @@ class MetadataElasticsearchSource(Source):
total_retries_count = 3 total_retries_count = 3
current_try = 1 current_try = 1
while status not in {Status.COMPLETED, Status.FAILED, Status.STOPPED}: while status not in {Status.completed, Status.failed, Status.stopped}:
sleep(5) sleep(5)
job = self.metadata.get_reindex_job_status(model_str(self.reindex_job.id)) job = self.metadata.get_reindex_job_status(model_str(self.reindex_job.id))
if job and job.stats and job.stats.jobStats: if job and job.stats and job.stats.jobStats:

View File

@ -37,9 +37,11 @@ class TestGitHubReader(TestCase):
self.assertEqual(reader.auth_headers, {"Authorization": "Bearer token"}) self.assertEqual(reader.auth_headers, {"Authorization": "Bearer token"})
def test_read(self): def x_test_read(self):
""" """
We can read the OM README We can read the OM README
Disabled because this test is flaky: it fails with a "rate limit exceeded" error.
""" """
creds = GitHubCredentials( creds = GitHubCredentials(
repositoryName="OpenMetadata", repositoryName="OpenMetadata",

View File

@ -35,11 +35,13 @@ class TestLookMLGitHubReader(TestCase):
reader = GitHubReader(creds) reader = GitHubReader(creds)
parser = LkmlParser(reader) parser = LkmlParser(reader)
def test_lookml_read_and_parse(self): def x_test_lookml_read_and_parse(self):
""" """
We can parse the explore file. We can parse the explore file.
We'll expand and find views from https://github.com/open-metadata/lookml-sample/blob/main/cats.explore.lkml We'll expand and find views from https://github.com/open-metadata/lookml-sample/blob/main/cats.explore.lkml
Disabled because this test is flaky: it fails with a "rate limit exceeded" error.
""" """
explore_file = "cats.explore.lkml" explore_file = "cats.explore.lkml"

View File

@ -1,6 +1,7 @@
package org.openmetadata.service.apps; package org.openmetadata.service.apps;
import static com.cronutils.model.CronType.QUARTZ; import static com.cronutils.model.CronType.QUARTZ;
import static org.openmetadata.service.apps.scheduler.AbstractOmAppJobListener.JOB_LISTENER_NAME;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.COLLECTION_DAO_KEY; import static org.openmetadata.service.apps.scheduler.AppScheduler.COLLECTION_DAO_KEY;
import static org.openmetadata.service.apps.scheduler.AppScheduler.SEARCH_CLIENT_KEY; import static org.openmetadata.service.apps.scheduler.AppScheduler.SEARCH_CLIENT_KEY;
@ -13,10 +14,12 @@ import com.cronutils.model.definition.CronDefinitionBuilder;
import com.cronutils.parser.CronParser; import com.cronutils.parser.CronParser;
import java.util.List; import java.util.List;
import lombok.Getter; import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.AppRuntime; import org.openmetadata.schema.AppRuntime;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline; import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.AppType; import org.openmetadata.schema.entity.app.AppType;
import org.openmetadata.schema.entity.app.ExternalAppIngestionConfig; import org.openmetadata.schema.entity.app.ExternalAppIngestionConfig;
import org.openmetadata.schema.entity.app.ScheduleType; import org.openmetadata.schema.entity.app.ScheduleType;
@ -29,6 +32,7 @@ import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.type.Relationship; import org.openmetadata.schema.type.Relationship;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.scheduler.AppScheduler; import org.openmetadata.service.apps.scheduler.AppScheduler;
import org.openmetadata.service.apps.scheduler.OmAppJobListener;
import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityRepository;
@ -39,6 +43,7 @@ import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.OpenMetadataConnectionBuilder; import org.openmetadata.service.util.OpenMetadataConnectionBuilder;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
@Slf4j @Slf4j
public class AbstractNativeApplication implements NativeApplication { public class AbstractNativeApplication implements NativeApplication {
@ -203,4 +208,20 @@ public class AbstractNativeApplication implements NativeApplication {
.withLoggerLevel(create.getLoggerLevel()) .withLoggerLevel(create.getLoggerLevel())
.withService(create.getService()); .withService(create.getService());
} }
/**
 * Looks up the job listener registered under {@code JOB_LISTENER_NAME} on the scheduler that is
 * running the given job and returns it as an {@link OmAppJobListener}.
 *
 * @param jobExecutionContext context of the currently executing job
 * @return the listener found under {@code JOB_LISTENER_NAME}, cast to {@link OmAppJobListener}
 * @throws SchedulerException if the listener manager cannot be obtained from the scheduler
 */
private OmAppJobListener getJobListener(JobExecutionContext jobExecutionContext) throws SchedulerException {
  org.quartz.JobListener registered =
      jobExecutionContext.getScheduler().getListenerManager().getJobListener(JOB_LISTENER_NAME);
  return (OmAppJobListener) registered;
}
/**
 * Returns the run record that the job listener associates with the given job execution.
 *
 * <p>Any {@link SchedulerException} raised while resolving the listener is rethrown undeclared via
 * {@code @SneakyThrows}.
 *
 * @param jobExecutionContext context of the currently executing job
 * @return the record returned by the listener's {@code getAppRunRecordForJob}
 */
@SneakyThrows
protected AppRunRecord getJobRecord(JobExecutionContext jobExecutionContext) {
  return getJobListener(jobExecutionContext).getAppRunRecordForJob(jobExecutionContext);
}
/**
 * Hands the given run record to the job listener's {@code pushApplicationStatusUpdates}.
 *
 * <p>Any {@link SchedulerException} raised while resolving the listener is rethrown undeclared via
 * {@code @SneakyThrows}.
 *
 * @param jobExecutionContext context of the currently executing job
 * @param record run record to push
 * @param update flag forwarded unchanged to the listener
 */
@SneakyThrows
protected void pushAppStausUpdates(JobExecutionContext jobExecutionContext, AppRunRecord record, boolean update) {
  getJobListener(jobExecutionContext).pushApplicationStatusUpdates(jobExecutionContext, record, update);
}
} }

View File

@ -21,10 +21,12 @@ import org.openmetadata.common.utils.CommonUtil;
import org.openmetadata.schema.EntityInterface; import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.analytics.ReportData; import org.openmetadata.schema.analytics.ReportData;
import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord;
import org.openmetadata.schema.entity.app.FailureContext;
import org.openmetadata.schema.entity.app.SuccessContext;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration; import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.system.EventPublisherJob; import org.openmetadata.schema.system.EventPublisherJob;
import org.openmetadata.schema.system.Failure; import org.openmetadata.schema.system.Failure;
import org.openmetadata.schema.system.FailureDetails;
import org.openmetadata.schema.system.Stats; import org.openmetadata.schema.system.Stats;
import org.openmetadata.schema.system.StepStats; import org.openmetadata.schema.system.StepStats;
import org.openmetadata.service.apps.AbstractNativeApplication; import org.openmetadata.service.apps.AbstractNativeApplication;
@ -52,8 +54,8 @@ import org.quartz.JobExecutionContext;
@Slf4j @Slf4j
public class SearchIndexApp extends AbstractNativeApplication { public class SearchIndexApp extends AbstractNativeApplication {
private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s"; private static final String ENTITY_TYPE_ERROR_MSG = "EntityType: %s %n Cause: %s %n Stack: %s";
private List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>(); private final List<PaginatedEntitiesSource> paginatedEntitiesSources = new ArrayList<>();
private List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>(); private final List<PaginatedDataInsightSource> paginatedDataInsightSources = new ArrayList<>();
private Processor entityProcessor; private Processor entityProcessor;
private Processor dataInsightProcessor; private Processor dataInsightProcessor;
private Sink searchIndexSink; private Sink searchIndexSink;
@ -70,7 +72,11 @@ public class SearchIndexApp extends AbstractNativeApplication {
JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class) JsonUtils.convertValue(app.getAppConfiguration(), EventPublisherJob.class)
.withStats(new Stats()) .withStats(new Stats())
.withFailure(new Failure()); .withFailure(new Failure());
int totalRecords = getTotalRequestToProcess(request.getEntities(), collectionDAO);
this.jobData = request; this.jobData = request;
this.jobData.setStats(
new Stats()
.withJobStats(new StepStats().withTotalRecords(totalRecords).withFailedRecords(0).withSuccessRecords(0)));
request request
.getEntities() .getEntities()
.forEach( .forEach(
@ -89,13 +95,13 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
}); });
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
this.entityProcessor = new OpenSearchEntitiesProcessor(); this.entityProcessor = new OpenSearchEntitiesProcessor(totalRecords);
this.dataInsightProcessor = new OpenSearchDataInsightProcessor(); this.dataInsightProcessor = new OpenSearchDataInsightProcessor(totalRecords);
this.searchIndexSink = new OpenSearchIndexSink(searchRepository); this.searchIndexSink = new OpenSearchIndexSink(searchRepository, totalRecords);
} else { } else {
this.entityProcessor = new ElasticSearchEntitiesProcessor(); this.entityProcessor = new ElasticSearchEntitiesProcessor(totalRecords);
this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(); this.dataInsightProcessor = new ElasticSearchDataInsightProcessor(totalRecords);
this.searchIndexSink = new ElasticSearchIndexSink(searchRepository); this.searchIndexSink = new ElasticSearchIndexSink(searchRepository, totalRecords);
} }
} }
@ -121,11 +127,33 @@ public class SearchIndexApp extends AbstractNativeApplication {
} finally { } finally {
// store job details in Database // store job details in Database
jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats()); jobExecutionContext.getJobDetail().getJobDataMap().put(APP_RUN_STATS, jobData.getStats());
// Update Record to db
updateRecordToDb(jobExecutionContext);
// Send update // Send update
sendUpdates(); sendUpdates();
} }
} }
/**
 * Copies the current job state (status, failure, stats) from {@code jobData} onto the run record
 * of this execution and pushes the record as an update.
 *
 * @param jobExecutionContext context of the currently executing job
 */
public void updateRecordToDb(JobExecutionContext jobExecutionContext) {
  AppRunRecord runRecord = getJobRecord(jobExecutionContext);

  // The job status is translated to the run-record status through its string value.
  // NOTE(review): assumes every job status value has a run-record counterpart - confirm.
  String statusValue = jobData.getStatus().value();
  runRecord.setStatus(AppRunRecord.Status.fromValue(statusValue));

  // The failure is stored serialized as JSON under the "failure" key.
  Failure failure = jobData.getFailure();
  if (failure != null) {
    FailureContext failureContext =
        new FailureContext().withAdditionalProperty("failure", JsonUtils.pojoToJson(failure));
    runRecord.setFailureContext(failureContext);
  }

  // The stats object is stored as-is under the "stats" key.
  Stats stats = jobData.getStats();
  if (stats != null) {
    SuccessContext successContext = new SuccessContext().withAdditionalProperty("stats", stats);
    runRecord.setSuccessContext(successContext);
  }

  pushAppStausUpdates(jobExecutionContext, runRecord, true);
}
private void entitiesReIndex() { private void entitiesReIndex() {
Map<String, Object> contextData = new HashMap<>(); Map<String, Object> contextData = new HashMap<>();
for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) { for (PaginatedEntitiesSource paginatedEntitiesSource : paginatedEntitiesSources) {
@ -134,12 +162,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
ResultList<? extends EntityInterface> resultList; ResultList<? extends EntityInterface> resultList;
while (!stopped && !paginatedEntitiesSource.isDone()) { while (!stopped && !paginatedEntitiesSource.isDone()) {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
int requestToProcess = jobData.getBatchSize();
int failed = requestToProcess;
int success = 0;
try { try {
resultList = paginatedEntitiesSource.readNext(null); resultList = paginatedEntitiesSource.readNext(null);
requestToProcess = resultList.getData().size() + resultList.getErrors().size();
if (!resultList.getData().isEmpty()) { if (!resultList.getData().isEmpty()) {
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
// process data to build Reindex Request // process data to build Reindex Request
@ -150,8 +174,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
(os.org.opensearch.action.bulk.BulkResponse) searchIndexSink.write(requests, contextData); (os.org.opensearch.action.bulk.BulkResponse) searchIndexSink.write(requests, contextData);
// update Status // update Status
handleErrorsOs(resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime); handleErrorsOs(resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime);
// Update stats
success = searchRepository.getSearchClient().getSuccessFromBulkResponse(response);
} else { } else {
// process data to build Reindex Request // process data to build Reindex Request
BulkRequest requests = (BulkRequest) entityProcessor.process(resultList, contextData); BulkRequest requests = (BulkRequest) entityProcessor.process(resultList, contextData);
@ -159,13 +181,17 @@ public class SearchIndexApp extends AbstractNativeApplication {
BulkResponse response = (BulkResponse) searchIndexSink.write(requests, contextData); BulkResponse response = (BulkResponse) searchIndexSink.write(requests, contextData);
// update Status // update Status
handleErrorsEs(resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime); handleErrorsEs(resultList, paginatedEntitiesSource.getLastFailedCursor(), response, currentTime);
// Update stats
success = searchRepository.getSearchClient().getSuccessFromBulkResponse(response);
} }
failed = requestToProcess - success;
} else {
failed = 0;
} }
} catch (SourceException rx) {
handleSourceError(
rx.getMessage(),
String.format(
ENTITY_TYPE_ERROR_MSG,
paginatedEntitiesSource.getEntityType(),
rx.getCause(),
ExceptionUtils.getStackTrace(rx)),
currentTime);
} catch (ProcessorException px) { } catch (ProcessorException px) {
handleProcessorError( handleProcessorError(
px.getMessage(), px.getMessage(),
@ -184,16 +210,10 @@ public class SearchIndexApp extends AbstractNativeApplication {
wx.getCause(), wx.getCause(),
ExceptionUtils.getStackTrace(wx)), ExceptionUtils.getStackTrace(wx)),
currentTime); currentTime);
} finally {
updateStats(
success,
failed,
paginatedEntitiesSource.getStats(),
entityProcessor.getStats(),
searchIndexSink.getStats());
sendUpdates();
} }
} }
updateStats(paginatedEntitiesSource.getEntityType(), paginatedEntitiesSource.getStats());
sendUpdates();
} }
} }
@ -205,12 +225,8 @@ public class SearchIndexApp extends AbstractNativeApplication {
ResultList<ReportData> resultList; ResultList<ReportData> resultList;
while (!stopped && !paginatedDataInsightSource.isDone()) { while (!stopped && !paginatedDataInsightSource.isDone()) {
long currentTime = System.currentTimeMillis(); long currentTime = System.currentTimeMillis();
int requestToProcess = jobData.getBatchSize();
int failed = requestToProcess;
int success = 0;
try { try {
resultList = paginatedDataInsightSource.readNext(null); resultList = paginatedDataInsightSource.readNext(null);
requestToProcess = resultList.getData().size() + resultList.getErrors().size();
if (!resultList.getData().isEmpty()) { if (!resultList.getData().isEmpty()) {
if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) { if (searchRepository.getSearchType().equals(ElasticSearchConfiguration.SearchType.OPENSEARCH)) {
// process data to build Reindex Request // process data to build Reindex Request
@ -220,20 +236,13 @@ public class SearchIndexApp extends AbstractNativeApplication {
os.org.opensearch.action.bulk.BulkResponse response = os.org.opensearch.action.bulk.BulkResponse response =
(os.org.opensearch.action.bulk.BulkResponse) searchIndexSink.write(requests, contextData); (os.org.opensearch.action.bulk.BulkResponse) searchIndexSink.write(requests, contextData);
handleErrorsOs(resultList, "", response, currentTime); handleErrorsOs(resultList, "", response, currentTime);
// Update stats
success = searchRepository.getSearchClient().getSuccessFromBulkResponse(response);
} else { } else {
// process data to build Reindex Request // process data to build Reindex Request
BulkRequest requests = (BulkRequest) dataInsightProcessor.process(resultList, contextData); BulkRequest requests = (BulkRequest) dataInsightProcessor.process(resultList, contextData);
// process data to build Reindex Request // process data to build Reindex Request
BulkResponse response = (BulkResponse) searchIndexSink.write(requests, contextData); BulkResponse response = (BulkResponse) searchIndexSink.write(requests, contextData);
handleErrorsEs(resultList, "", response, currentTime); handleErrorsEs(resultList, "", response, currentTime);
// Update stats
success = searchRepository.getSearchClient().getSuccessFromBulkResponse(response);
} }
failed = requestToProcess - success;
} else {
failed = 0;
} }
} catch (SourceException rx) { } catch (SourceException rx) {
handleSourceError( handleSourceError(
@ -262,16 +271,10 @@ public class SearchIndexApp extends AbstractNativeApplication {
wx.getCause(), wx.getCause(),
ExceptionUtils.getStackTrace(wx)), ExceptionUtils.getStackTrace(wx)),
currentTime); currentTime);
} finally {
updateStats(
success,
failed,
paginatedDataInsightSource.getStats(),
dataInsightProcessor.getStats(),
searchIndexSink.getStats());
sendUpdates();
} }
} }
updateStats(paginatedDataInsightSource.getEntityType(), paginatedDataInsightSource.getStats());
sendUpdates();
} }
} }
@ -284,41 +287,31 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
} }
public void updateStats( public void updateStats(String entityType, StepStats currentEntityStats) {
int currentSuccess, int currentFailed, StepStats reader, StepStats processor, StepStats writer) {
// Job Level Stats // Job Level Stats
Stats jobDataStats = jobData.getStats(); Stats jobDataStats = jobData.getStats();
// Update Entity Level Stats
StepStats entityLevelStats = jobDataStats.getEntityStats();
if (entityLevelStats == null) {
entityLevelStats = new StepStats();
}
entityLevelStats.withAdditionalProperty(entityType, currentEntityStats);
// Total Stats // Total Stats
StepStats stats = jobData.getStats().getJobStats(); StepStats stats = jobData.getStats().getJobStats();
if (stats == null) { if (stats == null) {
stats = new StepStats().withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO)); stats = new StepStats().withTotalRecords(getTotalRequestToProcess(jobData.getEntities(), collectionDAO));
} }
getUpdatedStats(stats, currentSuccess, currentFailed); getUpdatedStats(stats, currentEntityStats.getSuccessRecords(), currentEntityStats.getFailedRecords());
// Update for the Job // Update for the Job
jobDataStats.setJobStats(stats); jobDataStats.setJobStats(stats);
// Source Stats jobDataStats.setEntityStats(entityLevelStats);
jobDataStats.setSourceStats(getTotalStatsTillCurrentRun(jobDataStats.getSourceStats(), reader));
// Processor
jobDataStats.setProcessorStats(processor);
// Writer
jobDataStats.setSinkStats(writer);
jobData.setStats(jobDataStats); jobData.setStats(jobDataStats);
} }
private StepStats getTotalStatsTillCurrentRun(StepStats sourceStat, StepStats newInputStat) {
if (sourceStat == null) {
sourceStat = new StepStats();
}
sourceStat.setTotalRecords(sourceStat.getTotalRecords() + newInputStat.getTotalRecords());
sourceStat.setProcessedRecords(sourceStat.getProcessedRecords() + newInputStat.getProcessedRecords());
sourceStat.setSuccessRecords(sourceStat.getSuccessRecords() + newInputStat.getSuccessRecords());
sourceStat.setFailedRecords(sourceStat.getFailedRecords() + newInputStat.getFailedRecords());
return sourceStat;
}
private void reCreateIndexes(String entityType) { private void reCreateIndexes(String entityType) {
if (Boolean.FALSE.equals(jobData.getRecreateIndex())) { if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
return; return;
@ -343,62 +336,63 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
private void handleSourceError(String context, String reason, long time) { private void handleSourceError(String context, String reason, long time) {
Failure failures = getFailure(); handleError("source", context, reason, time);
FailureDetails readerFailures = getFailureDetails(context, reason, time);
failures.setSourceError(readerFailures);
jobData.setFailure(failures);
} }
private void handleProcessorError(String context, String reason, long time) { private void handleProcessorError(String context, String reason, long time) {
Failure failures = getFailure(); handleError("processor", context, reason, time);
FailureDetails processorError = getFailureDetails(context, reason, time); }
failures.setProcessorError(processorError);
private void handleError(String errType, String context, String reason, long time) {
Failure failures = jobData.getFailure() != null ? jobData.getFailure() : new Failure();
failures.withAdditionalProperty("errorFrom", errType);
failures.withAdditionalProperty("context", context);
failures.withAdditionalProperty("lastFailedReason", reason);
failures.withAdditionalProperty("lastFailedAt", time);
jobData.setFailure(failures); jobData.setFailure(failures);
} }
private void handleEsSinkError(String context, String reason, long time) { private void handleEsSinkError(String context, String reason, long time) {
Failure failures = getFailure(); handleError("sink", context, reason, time);
FailureDetails writerFailure = getFailureDetails(context, reason, time);
failures.setSinkError(writerFailure);
jobData.setFailure(failures);
} }
private void handleJobError(String context, String reason, long time) { private void handleJobError(String context, String reason, long time) {
Failure failures = getFailure(); handleError("job", context, reason, time);
FailureDetails jobFailure = getFailureDetails(context, reason, time);
failures.setJobError(jobFailure);
jobData.setFailure(failures);
} }
@SneakyThrows @SneakyThrows
private void handleSourceError(ResultList<?> data, String lastCursor, long time) { private void handleSourceError(ResultList<?> data, String lastCursor, long time) {
if (!data.getErrors().isEmpty()) { if (!data.getErrors().isEmpty()) {
// Collect the read errors one per line. A real line separator is appended here because
// this text is later passed to String.format as a %s argument; format specifiers such as
// "%n" are only expanded in the format string itself, so appending the literal "%n" would
// leave those two characters in the stored failure reason.
StringBuilder builder = new StringBuilder();
for (String str : data.getErrors()) {
  builder.append(str);
  builder.append(System.lineSeparator());
}
handleSourceError( handleSourceError(
String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor), String.format("SourceContext: After Cursor : %s, Encountered Error While Reading Data.", lastCursor),
String.format( String.format("Following Entities were not fetched Successfully : %s", builder),
"Following Entities were not fetched Successfully : %s", JsonUtils.pojoToJson(data.getErrors())),
time); time);
} }
} }
@SneakyThrows @SneakyThrows
private void handleOsSinkErrors(os.org.opensearch.action.bulk.BulkResponse response, long time) { private void handleOsSinkErrors(os.org.opensearch.action.bulk.BulkResponse response, long time) {
List<FailureDetails> details = new ArrayList<>(); List<Map<String, Object>> details = new ArrayList<>();
for (os.org.opensearch.action.bulk.BulkItemResponse bulkItemResponse : response) { for (os.org.opensearch.action.bulk.BulkItemResponse bulkItemResponse : response) {
if (bulkItemResponse.isFailed()) { if (bulkItemResponse.isFailed()) {
Map<String, Object> detailsMap = new HashMap<>();
os.org.opensearch.action.bulk.BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); os.org.opensearch.action.bulk.BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
FailureDetails esFailure = detailsMap.put(
new FailureDetails() "context",
.withContext( String.format(
String.format( "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId()));
"EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", detailsMap.put(
failure.getId())) "lastFailedReason",
.withLastFailedReason( String.format(
String.format( "Index Type: [%s], Reason: [%s] %n Trace : [%s]",
"Index Type: [%s], Reason: [%s] %n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause())));
failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause()))) detailsMap.put("lastFailedAt", System.currentTimeMillis());
.withLastFailedAt(System.currentTimeMillis()); details.add(detailsMap);
details.add(esFailure);
} }
} }
if (!details.isEmpty()) { if (!details.isEmpty()) {
@ -411,22 +405,23 @@ public class SearchIndexApp extends AbstractNativeApplication {
@SneakyThrows @SneakyThrows
private void handleEsSinkErrors(BulkResponse response, long time) { private void handleEsSinkErrors(BulkResponse response, long time) {
List<FailureDetails> details = new ArrayList<>();
List<Map<String, Object>> details = new ArrayList<>();
for (BulkItemResponse bulkItemResponse : response) { for (BulkItemResponse bulkItemResponse : response) {
if (bulkItemResponse.isFailed()) { if (bulkItemResponse.isFailed()) {
Map<String, Object> detailsMap = new HashMap<>();
BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
FailureDetails esFailure = detailsMap.put(
new FailureDetails() "context",
.withContext( String.format(
String.format( "EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", failure.getId()));
"EsWriterContext: Encountered Error While Writing Data %n Entity %n ID : [%s] ", detailsMap.put(
failure.getId())) "lastFailedReason",
.withLastFailedReason( String.format(
String.format( "Index Type: [%s], Reason: [%s] %n Trace : [%s]",
"Index Type: [%s], Reason: [%s] %n Trace : [%s]", failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause())));
failure.getIndex(), failure.getMessage(), ExceptionUtils.getStackTrace(failure.getCause()))) detailsMap.put("lastFailedAt", System.currentTimeMillis());
.withLastFailedAt(System.currentTimeMillis()); details.add(detailsMap);
details.add(esFailure);
} }
} }
if (!details.isEmpty()) { if (!details.isEmpty()) {
@ -441,9 +436,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
if (stopped) { if (stopped) {
jobData.setStatus(EventPublisherJob.Status.STOPPED); jobData.setStatus(EventPublisherJob.Status.STOPPED);
} else { } else {
if (jobData.getFailure().getSinkError() != null if (jobData.getFailure() != null && !jobData.getFailure().getAdditionalProperties().isEmpty()) {
|| jobData.getFailure().getSourceError() != null
|| jobData.getFailure().getProcessorError() != null) {
jobData.setStatus(EventPublisherJob.Status.FAILED); jobData.setStatus(EventPublisherJob.Status.FAILED);
} else { } else {
jobData.setStatus(EventPublisherJob.Status.COMPLETED); jobData.setStatus(EventPublisherJob.Status.COMPLETED);
@ -451,14 +444,6 @@ public class SearchIndexApp extends AbstractNativeApplication {
} }
} }
private Failure getFailure() {
return jobData.getFailure() != null ? jobData.getFailure() : new Failure();
}
private FailureDetails getFailureDetails(String context, String reason, long time) {
return new FailureDetails().withContext(context).withLastFailedReason(reason).withLastFailedAt(time);
}
public void stopJob() { public void stopJob() {
stopped = true; stopped = true;
} }

View File

@ -2,6 +2,7 @@ package org.openmetadata.service.apps.scheduler;
import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY; import static org.openmetadata.service.apps.scheduler.AppScheduler.APP_INFO_KEY;
import java.util.UUID;
import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.lang.exception.ExceptionUtils;
import org.openmetadata.schema.entity.app.App; import org.openmetadata.schema.entity.app.App;
import org.openmetadata.schema.entity.app.AppRunRecord; import org.openmetadata.schema.entity.app.AppRunRecord;
@ -16,11 +17,10 @@ import org.quartz.JobExecutionException;
import org.quartz.JobListener; import org.quartz.JobListener;
public abstract class AbstractOmAppJobListener implements JobListener { public abstract class AbstractOmAppJobListener implements JobListener {
private CollectionDAO collectionDAO; private final CollectionDAO collectionDAO;
private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun"; private static final String SCHEDULED_APP_RUN_EXTENSION = "AppScheduleRun";
private static final String SCHEDULED_APP_RUN_RECORD_SCHEMA = "applicationRunRecord.json";
public static final String APP_RUN_STATS = "AppRunStats"; public static final String APP_RUN_STATS = "AppRunStats";
static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER"; public static final String JOB_LISTENER_NAME = "OM_JOB_LISTENER";
protected AbstractOmAppJobListener(CollectionDAO dao) { protected AbstractOmAppJobListener(CollectionDAO dao) {
this.collectionDAO = dao; this.collectionDAO = dao;
@ -50,8 +50,8 @@ public abstract class AbstractOmAppJobListener implements JobListener {
// Put the Context in the Job Data Map // Put the Context in the Job Data Map
dataMap.put(SCHEDULED_APP_RUN_EXTENSION, runRecord); dataMap.put(SCHEDULED_APP_RUN_EXTENSION, runRecord);
// Run the Scheduled Run Record on the time series // Insert new Record Run
collectionDAO.appExtensionTimeSeriesDao().insert(JsonUtils.pojoToJson(runRecord)); pushApplicationStatusUpdates(jobExecutionContext, runRecord, false);
this.doJobToBeExecuted(jobExecutionContext); this.doJobToBeExecuted(jobExecutionContext);
} }
@ -67,27 +67,58 @@ public abstract class AbstractOmAppJobListener implements JobListener {
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
runRecord.withEndTime(endTime); runRecord.withEndTime(endTime);
boolean success = jobException == null; if (jobException == null
if (success) { && !(runRecord.getStatus() == AppRunRecord.Status.FAILED
|| runRecord.getStatus() == AppRunRecord.Status.ACTIVE_ERROR)) {
runRecord.withStatus(AppRunRecord.Status.SUCCESS); runRecord.withStatus(AppRunRecord.Status.SUCCESS);
SuccessContext context = new SuccessContext(); SuccessContext context = new SuccessContext();
if (runRecord.getSuccessContext() != null) {
context = runRecord.getSuccessContext();
}
context.getAdditionalProperties().put("stats", JsonUtils.getMap(jobStats)); context.getAdditionalProperties().put("stats", JsonUtils.getMap(jobStats));
runRecord.setSuccessContext(context); runRecord.setSuccessContext(context);
} else { } else {
runRecord.withStatus(AppRunRecord.Status.FAILED); runRecord.withStatus(AppRunRecord.Status.FAILED);
FailureContext context = new FailureContext(); FailureContext context = new FailureContext();
context.withAdditionalProperty("message", jobException.getMessage()); if (runRecord.getFailureContext() != null) {
context.withAdditionalProperty("stackTrace", ExceptionUtils.getStackTrace(jobException)); context = runRecord.getFailureContext();
}
if (jobException != null) {
context.withAdditionalProperty("message", jobException.getMessage());
context.withAdditionalProperty("jobStackTrace", ExceptionUtils.getStackTrace(jobException));
}
runRecord.setFailureContext(context); runRecord.setFailureContext(context);
} }
collectionDAO // Update App Run Record
.appExtensionTimeSeriesDao() pushApplicationStatusUpdates(jobExecutionContext, runRecord, true);
.update(runRecord.getAppId().toString(), JsonUtils.pojoToJson(runRecord), runRecord.getTimestamp());
this.doJobWasExecuted(jobExecutionContext, jobException); this.doJobWasExecuted(jobExecutionContext, jobException);
} }
/**
 * Reads the run record kept in the job's data map under {@code SCHEDULED_APP_RUN_EXTENSION}.
 *
 * @param context execution context of the job whose data map is consulted
 * @return the stored entry cast to {@link AppRunRecord}
 */
public AppRunRecord getAppRunRecordForJob(JobExecutionContext context) {
  Object storedRecord = context.getJobDetail().getJobDataMap().get(SCHEDULED_APP_RUN_EXTENSION);
  return (AppRunRecord) storedRecord;
}
/**
 * Persists the given run record for the application attached to the job.
 *
 * <p>Nothing is written unless the job's data map contains {@code SCHEDULED_APP_RUN_EXTENSION}.
 *
 * @param context execution context of the job; its data map supplies the {@code App} under {@code APP_INFO_KEY}
 * @param runRecord the record to persist
 * @param update {@code true} to update an existing record, {@code false} to insert a new one
 */
public void pushApplicationStatusUpdates(JobExecutionContext context, AppRunRecord runRecord, boolean update) {
  JobDataMap jobData = context.getJobDetail().getJobDataMap();
  if (!jobData.containsKey(SCHEDULED_APP_RUN_EXTENSION)) {
    return;
  }
  App jobApp = (App) jobData.get(APP_INFO_KEY);
  updateStatus(jobApp.getId(), runRecord, update);
}
/**
 * Writes the run record through the app-extension time-series DAO.
 *
 * @param appId id of the application; used as the key when updating
 * @param record the run record, serialized to JSON before it is stored
 * @param update {@code true} to update the row matching {@code appId} and the record's timestamp,
 *     {@code false} to insert the record as a new row
 */
private void updateStatus(UUID appId, AppRunRecord record, boolean update) {
  if (!update) {
    collectionDAO.appExtensionTimeSeriesDao().insert(JsonUtils.pojoToJson(record));
    return;
  }
  collectionDAO
      .appExtensionTimeSeriesDao()
      .update(appId.toString(), JsonUtils.pojoToJson(record), record.getTimestamp());
}
protected void doJobWasExecuted(JobExecutionContext jobExecutionContext, JobExecutionException jobException) {} protected void doJobWasExecuted(JobExecutionContext jobExecutionContext, JobExecutionException jobException) {}
protected void doJobToBeExecuted(JobExecutionContext jobExecutionContext) {} protected void doJobToBeExecuted(JobExecutionContext jobExecutionContext) {}

View File

@ -19,4 +19,12 @@ public class SourceException extends IOException {
public SourceException(String msg, Throwable throwable) { public SourceException(String msg, Throwable throwable) {
super(msg, throwable); super(msg, throwable);
} }
/**
 * Creates a SourceException that wraps an underlying cause, with no message of its own.
 *
 * @param throwable the cause, passed straight to the superclass constructor
 */
public SourceException(Throwable throwable) {
super(throwable);
}
/**
 * Creates a SourceException that carries only a message and no cause.
 *
 * @param msg the detail message, passed straight to the superclass constructor
 */
public SourceException(String msg) {
super(msg);
}
} }

View File

@ -610,7 +610,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
entities.add(withHref(uriInfo, entity)); entities.add(withHref(uriInfo, entity));
} catch (Exception e) { } catch (Exception e) {
LOG.error("Failed in Set Fields for Entity with Json : {}", json); LOG.error("Failed in Set Fields for Entity with Json : {}", json);
errors.add(json); errors.add(String.format("Error Message : %s , %n Entity Json : %s", e.getMessage(), json));
} }
} }
currentOffset = currentOffset + limitParam; currentOffset = currentOffset + limitParam;

View File

@ -553,11 +553,10 @@ public class AppResource extends EntityResource<App, AppRepository> {
@DefaultValue("false") @DefaultValue("false")
boolean hardDelete, boolean hardDelete,
@Parameter(description = "Name of the App", schema = @Schema(type = "string")) @PathParam("name") String name) { @Parameter(description = "Name of the App", schema = @Schema(type = "string")) @PathParam("name") String name) {
Response response = deleteByName(uriInfo, securityContext, name, true, hardDelete); App app = repository.getByName(null, name, repository.getFields("bot,pipelines"));
if (response.getStatus() == Response.Status.OK.getStatusCode()) { // Remove from Pipeline Service
deleteApp(securityContext, (App) response.getEntity(), hardDelete); deleteApp(securityContext, app, hardDelete);
} return deleteByName(uriInfo, securityContext, name, true, hardDelete);
return response;
} }
@DELETE @DELETE
@ -578,11 +577,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
@DefaultValue("false") @DefaultValue("false")
boolean hardDelete, boolean hardDelete,
@Parameter(description = "Id of the App", schema = @Schema(type = "UUID")) @PathParam("id") UUID id) { @Parameter(description = "Id of the App", schema = @Schema(type = "UUID")) @PathParam("id") UUID id) {
Response response = delete(uriInfo, securityContext, id, true, hardDelete); App app = repository.get(null, id, repository.getFields("bot,pipelines"));
if (response.getStatus() == Response.Status.OK.getStatusCode()) { // Remove from Pipeline Service
deleteApp(securityContext, (App) response.getEntity(), hardDelete); deleteApp(securityContext, app, hardDelete);
} // Remove from repository
return response; return delete(uriInfo, securityContext, id, true, hardDelete);
} }
@PUT @PUT
@ -794,25 +793,29 @@ public class AppResource extends EntityResource<App, AppRepository> {
throw new InternalServerErrorException("Failed in Delete App from Scheduler."); throw new InternalServerErrorException("Failed in Delete App from Scheduler.");
} }
} else { } else {
App app = repository.getByName(null, installedApp.getName(), repository.getFields("bot,pipelines")); if (!nullOrEmpty(installedApp.getPipelines())) {
if (!nullOrEmpty(app.getPipelines())) { EntityReference pipelineRef = installedApp.getPipelines().get(0);
EntityReference pipelineRef = app.getPipelines().get(0);
IngestionPipelineRepository ingestionPipelineRepository = IngestionPipelineRepository ingestionPipelineRepository =
(IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE); (IngestionPipelineRepository) Entity.getEntityRepository(Entity.INGESTION_PIPELINE);
IngestionPipeline ingestionPipeline = IngestionPipeline ingestionPipeline =
ingestionPipelineRepository.get( ingestionPipelineRepository.get(
null, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNER)); null, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNER));
try {
if (hardDelete) { if (hardDelete) {
// Remove the Pipeline in case of Delete // Remove the Pipeline in case of Delete
if (!nullOrEmpty(app.getPipelines())) { if (!nullOrEmpty(installedApp.getPipelines())) {
pipelineServiceClient.deletePipeline(ingestionPipeline); pipelineServiceClient.deletePipeline(ingestionPipeline);
}
} else {
// Just Kill Running ingestion
if (Boolean.TRUE.equals(ingestionPipeline.getDeployed())) {
decryptOrNullify(securityContext, ingestionPipeline, installedApp.getBot().getName(), true);
pipelineServiceClient.killIngestion(ingestionPipeline);
}
} }
} else { } catch (Exception ex) {
// Just Kill Running ingestion LOG.error("Failed in Pipeline Service Client : ", ex);
decryptOrNullify(securityContext, ingestionPipeline, app.getBot().getName(), true);
pipelineServiceClient.killIngestion(ingestionPipeline);
} }
} }
} }

View File

@ -1,8 +1,13 @@
package org.openmetadata.service.search; package org.openmetadata.service.search;
import static org.openmetadata.service.Entity.AGGREGATED_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.ENTITY_REPORT_DATA;
import static org.openmetadata.service.Entity.FIELD_FOLLOWERS; import static org.openmetadata.service.Entity.FIELD_FOLLOWERS;
import static org.openmetadata.service.Entity.FIELD_USAGE_SUMMARY; import static org.openmetadata.service.Entity.FIELD_USAGE_SUMMARY;
import static org.openmetadata.service.Entity.QUERY; import static org.openmetadata.service.Entity.QUERY;
import static org.openmetadata.service.Entity.RAW_COST_ANALYSIS_REPORT_DATA;
import static org.openmetadata.service.Entity.WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA;
import static org.openmetadata.service.Entity.WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA;
import static org.openmetadata.service.search.SearchClient.DEFAULT_UPDATE_SCRIPT; import static org.openmetadata.service.search.SearchClient.DEFAULT_UPDATE_SCRIPT;
import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS; import static org.openmetadata.service.search.SearchClient.GLOBAL_SEARCH_ALIAS;
import static org.openmetadata.service.search.SearchClient.PROPAGATE_ENTITY_REFERENCE_FIELD_SCRIPT; import static org.openmetadata.service.search.SearchClient.PROPAGATE_ENTITY_REFERENCE_FIELD_SCRIPT;
@ -69,11 +74,11 @@ public class SearchRepository {
public final List<String> dataInsightReports = public final List<String> dataInsightReports =
List.of( List.of(
"entityReportData", ENTITY_REPORT_DATA,
"webAnalyticEntityViewReportData", WEB_ANALYTIC_ENTITY_VIEW_REPORT_DATA,
"webAnalyticUserActivityReportData", WEB_ANALYTIC_USER_ACTIVITY_REPORT_DATA,
"rawCostAnalysisReportData", RAW_COST_ANALYSIS_REPORT_DATA,
"aggregatedCostAnalysisReportData"); AGGREGATED_COST_ANALYSIS_REPORT_DATA);
public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher"; public static final String ELASTIC_SEARCH_EXTENSION = "service.eventPublisher";
public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM"; public static final String ELASTIC_SEARCH_ENTITY_FQN_STREAM = "eventPublisher:ElasticSearch:STREAM";

View File

@ -24,6 +24,10 @@ import org.openmetadata.service.workflows.interfaces.Processor;
public class ElasticSearchDataInsightProcessor implements Processor<BulkRequest, ResultList<ReportData>> { public class ElasticSearchDataInsightProcessor implements Processor<BulkRequest, ResultList<ReportData>> {
private final StepStats stats = new StepStats(); private final StepStats stats = new StepStats();
/**
 * Creates the processor and seeds its step statistics.
 *
 * @param total value recorded as the total record count; success and failed counts start at 0
 */
public ElasticSearchDataInsightProcessor(int total) {
  this.stats.withTotalRecords(total);
  this.stats.withSuccessRecords(0);
  this.stats.withFailedRecords(0);
}
@Override @Override
public BulkRequest process(ResultList<ReportData> input, Map<String, Object> contextData) throws ProcessorException { public BulkRequest process(ResultList<ReportData> input, Map<String, Object> contextData) throws ProcessorException {
String entityType = (String) contextData.get(ENTITY_TYPE_KEY); String entityType = (String) contextData.get(ENTITY_TYPE_KEY);

View File

@ -25,6 +25,10 @@ import org.openmetadata.service.workflows.interfaces.Processor;
public class ElasticSearchEntitiesProcessor implements Processor<BulkRequest, ResultList<? extends EntityInterface>> { public class ElasticSearchEntitiesProcessor implements Processor<BulkRequest, ResultList<? extends EntityInterface>> {
private final StepStats stats = new StepStats(); private final StepStats stats = new StepStats();
/**
 * Creates the processor and seeds its step statistics.
 *
 * @param total value recorded as the total record count; success and failed counts start at 0
 */
public ElasticSearchEntitiesProcessor(int total) {
  this.stats.withTotalRecords(total);
  this.stats.withSuccessRecords(0);
  this.stats.withFailedRecords(0);
}
@Override @Override
public BulkRequest process(ResultList<? extends EntityInterface> input, Map<String, Object> contextData) public BulkRequest process(ResultList<? extends EntityInterface> input, Map<String, Object> contextData)
throws ProcessorException { throws ProcessorException {

View File

@ -18,8 +18,9 @@ public class ElasticSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats(); private final StepStats stats = new StepStats();
private final SearchRepository searchRepository; private final SearchRepository searchRepository;
public ElasticSearchIndexSink(SearchRepository searchRepository) { public ElasticSearchIndexSink(SearchRepository searchRepository, int total) {
this.searchRepository = searchRepository; this.searchRepository = searchRepository;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
} }
@Override @Override

View File

@ -24,6 +24,10 @@ import os.org.opensearch.common.xcontent.XContentType;
public class OpenSearchDataInsightProcessor implements Processor<BulkRequest, ResultList<ReportData>> { public class OpenSearchDataInsightProcessor implements Processor<BulkRequest, ResultList<ReportData>> {
private final StepStats stats = new StepStats(); private final StepStats stats = new StepStats();
/**
 * Creates the processor and seeds its step statistics.
 *
 * @param total value recorded as the total record count; success and failed counts start at 0
 */
public OpenSearchDataInsightProcessor(int total) {
  this.stats.withTotalRecords(total);
  this.stats.withSuccessRecords(0);
  this.stats.withFailedRecords(0);
}
@Override @Override
public BulkRequest process(ResultList<ReportData> input, Map<String, Object> contextData) throws ProcessorException { public BulkRequest process(ResultList<ReportData> input, Map<String, Object> contextData) throws ProcessorException {
String entityType = (String) contextData.get(ENTITY_TYPE_KEY); String entityType = (String) contextData.get(ENTITY_TYPE_KEY);

View File

@ -25,6 +25,10 @@ import os.org.opensearch.common.xcontent.XContentType;
public class OpenSearchEntitiesProcessor implements Processor<BulkRequest, ResultList<? extends EntityInterface>> { public class OpenSearchEntitiesProcessor implements Processor<BulkRequest, ResultList<? extends EntityInterface>> {
private final StepStats stats = new StepStats(); private final StepStats stats = new StepStats();
/**
 * Creates the processor and seeds its step statistics.
 *
 * @param total value recorded as the total record count; success and failed counts start at 0
 */
public OpenSearchEntitiesProcessor(int total) {
  this.stats.withTotalRecords(total);
  this.stats.withSuccessRecords(0);
  this.stats.withFailedRecords(0);
}
@Override @Override
public BulkRequest process(ResultList<? extends EntityInterface> input, Map<String, Object> contextData) public BulkRequest process(ResultList<? extends EntityInterface> input, Map<String, Object> contextData)
throws ProcessorException { throws ProcessorException {

View File

@ -18,8 +18,9 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
private final StepStats stats = new StepStats(); private final StepStats stats = new StepStats();
private final SearchRepository searchRepository; private final SearchRepository searchRepository;
public OpenSearchIndexSink(SearchRepository repository) { public OpenSearchIndexSink(SearchRepository repository, int total) {
this.searchRepository = repository; this.searchRepository = repository;
this.stats.withTotalRecords(total).withSuccessRecords(0).withFailedRecords(0);
} }
@Override @Override
@ -27,7 +28,6 @@ public class OpenSearchIndexSink implements Sink<BulkRequest, BulkResponse> {
LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions()); LOG.debug("[EsSearchIndexSink] Processing a Batch of Size: {}", data.numberOfActions());
try { try {
BulkResponse response = searchRepository.getSearchClient().bulk(data, RequestOptions.DEFAULT); BulkResponse response = searchRepository.getSearchClient().bulk(data, RequestOptions.DEFAULT);
// BulkResponse response = null;
int currentSuccess = getSuccessFromBulkResponse(response); int currentSuccess = getSuccessFromBulkResponse(response);
int currentFailed = response.getItems().length - currentSuccess; int currentFailed = response.getItems().length - currentSuccess;

View File

@ -13,11 +13,14 @@
package org.openmetadata.service.workflows.interfaces; package org.openmetadata.service.workflows.interfaces;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.openmetadata.service.exception.SourceException; import org.openmetadata.service.exception.SourceException;
public interface Source<R> extends Stats { public interface Source<R> extends Stats {
R readNext(Map<String, Object> contextData) throws SourceException; R readNext(Map<String, Object> contextData) throws SourceException;
List<String> getReaderErrors();
void reset(); void reset();
} }

View File

@ -33,7 +33,8 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
private final CollectionDAO dao; private final CollectionDAO dao;
@Getter private final String entityType; @Getter private final String entityType;
@Getter private final int batchSize; @Getter private final int batchSize;
private final StepStats stats = new StepStats(); @Getter private final List<String> readerErrors = new ArrayList<>();
@Getter private final StepStats stats = new StepStats();
private String cursor = null; private String cursor = null;
@Getter private boolean isDone = false; @Getter private boolean isDone = false;
@ -41,7 +42,10 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
this.dao = dao; this.dao = dao;
this.entityType = entityType; this.entityType = entityType;
this.batchSize = batchSize; this.batchSize = batchSize;
stats.setTotalRecords(dao.reportDataTimeSeriesDao().listCount(entityType)); this.stats
.withTotalRecords(dao.reportDataTimeSeriesDao().listCount(entityType))
.withSuccessRecords(0)
.withFailedRecords(0);
} }
@Override @Override
@ -66,7 +70,7 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
private ResultList<ReportData> read(String afterCursor) throws SourceException { private ResultList<ReportData> read(String afterCursor) throws SourceException {
LOG.debug("[DataInsightReader] Fetching a Batch of Size: {} ", batchSize); LOG.debug("[DataInsightReader] Fetching a Batch of Size: {} ", batchSize);
ResultList<ReportData> result; ResultList<ReportData> result = null;
try { try {
result = getReportDataPagination(entityType, batchSize, afterCursor); result = getReportDataPagination(entityType, batchSize, afterCursor);
LOG.debug( LOG.debug(
@ -76,14 +80,27 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
0); 0);
updateStats(result.getData().size(), result.getErrors().size()); updateStats(result.getData().size(), result.getErrors().size());
} catch (Exception ex) { } catch (Exception ex) {
LOG.debug("[DataInsightReader] Batch Stats :- Submitted : {} Success: {} Failed: {}", batchSize, 0, batchSize); String errMsg =
if (stats.getTotalRecords() - stats.getProcessedRecords() <= batchSize) { String.format(
isDone = true; "[DataInsightReader] Failing Completely. Batch Stats :- Submitted : %s Success: %s Failed: %s",
updateStats(0, stats.getTotalRecords() - stats.getProcessedRecords()); batchSize, 0, batchSize);
LOG.debug(errMsg);
if (result != null) {
if (result.getPaging().getAfter() == null) {
isDone = true;
int recordToRead = stats.getTotalRecords() - (stats.getSuccessRecords() + stats.getFailedRecords());
updateStats(result.getData().size(), recordToRead - result.getData().size());
} else {
updateStats(result.getData().size(), batchSize - result.getData().size());
}
} else { } else {
updateStats(0, batchSize); updateStats(0, batchSize);
} }
throw new SourceException("[EntitiesReader] Batch encountered Exception. Failing Completely.", ex);
// Add the error to the list
readerErrors.add(errMsg);
throw new SourceException(errMsg, ex);
} }
return result; return result;
@ -117,9 +134,4 @@ public class PaginatedDataInsightSource implements Source<ResultList<ReportData>
public void updateStats(int currentSuccess, int currentFailed) { public void updateStats(int currentSuccess, int currentFailed) {
getUpdatedStats(stats, currentSuccess, currentFailed); getUpdatedStats(stats, currentSuccess, currentFailed);
} }
@Override
public StepStats getStats() {
return stats;
}
} }

View File

@ -15,6 +15,7 @@ package org.openmetadata.service.workflows.searchIndex;
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats; import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.getUpdatedStats;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import lombok.Getter; import lombok.Getter;
@ -23,8 +24,10 @@ import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.system.StepStats; import org.openmetadata.schema.system.StepStats;
import org.openmetadata.schema.type.Include; import org.openmetadata.schema.type.Include;
import org.openmetadata.service.Entity; import org.openmetadata.service.Entity;
import org.openmetadata.service.exception.SourceException;
import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.util.JsonUtils;
import org.openmetadata.service.util.RestUtil; import org.openmetadata.service.util.RestUtil;
import org.openmetadata.service.util.ResultList; import org.openmetadata.service.util.ResultList;
import org.openmetadata.service.workflows.interfaces.Source; import org.openmetadata.service.workflows.interfaces.Source;
@ -34,9 +37,9 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
@Getter private final int batchSize; @Getter private final int batchSize;
@Getter private final String entityType; @Getter private final String entityType;
@Getter private final List<String> fields; @Getter private final List<String> fields;
private final StepStats stats = new StepStats(); @Getter private final List<String> readerErrors = new ArrayList<>();
@Getter private final StepStats stats = new StepStats();
private String lastFailedCursor = null; private String lastFailedCursor = null;
private String cursor = RestUtil.encodeCursor("0"); private String cursor = RestUtil.encodeCursor("0");
@Getter private boolean isDone = false; @Getter private boolean isDone = false;
@ -44,48 +47,71 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
this.entityType = entityType; this.entityType = entityType;
this.batchSize = batchSize; this.batchSize = batchSize;
this.fields = fields; this.fields = fields;
this.stats.setTotalRecords(Entity.getEntityRepository(entityType).getDao().listTotalCount()); this.stats
.withTotalRecords(Entity.getEntityRepository(entityType).getDao().listTotalCount())
.withSuccessRecords(0)
.withFailedRecords(0);
} }
@Override @Override
public ResultList<? extends EntityInterface> readNext(Map<String, Object> contextData) { public ResultList<? extends EntityInterface> readNext(Map<String, Object> contextData) throws SourceException {
ResultList<? extends EntityInterface> data = null;
if (!isDone) { if (!isDone) {
ResultList<? extends EntityInterface> data = read(cursor); data = read(cursor);
cursor = data.getPaging().getAfter(); cursor = data.getPaging().getAfter();
if (cursor == null) { if (cursor == null) {
isDone = true; isDone = true;
} }
return data;
} else {
return null;
} }
return data;
} }
private ResultList<? extends EntityInterface> read(String cursor) { private ResultList<? extends EntityInterface> read(String cursor) throws SourceException {
LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize); LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize);
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType); EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
ResultList<? extends EntityInterface> result; ResultList<? extends EntityInterface> result = null;
result = try {
entityRepository.listAfterWithSkipFailure( result =
null, Entity.getFields(entityType, fields), new ListFilter(Include.ALL), batchSize, cursor); entityRepository.listAfterWithSkipFailure(
if (!result.getErrors().isEmpty()) { null, Entity.getFields(entityType, fields), new ListFilter(Include.ALL), batchSize, cursor);
lastFailedCursor = this.cursor; if (!result.getErrors().isEmpty()) {
result lastFailedCursor = this.cursor;
.getErrors() String errMsg =
.forEach( String.format(
error -> "[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- %n Submitted : %s Success: %s Failed: %s, %n Errors : %s",
LOG.error( this.lastFailedCursor,
"[PaginatedEntitiesSource] Failed in getting Record, After Cursor : {} , RECORD: {}", batchSize,
result.getPaging().getAfter(), result.getData().size(),
error)); result.getErrors().size(),
} JsonUtils.pojoToJson(result.getErrors()));
LOG.error(errMsg);
throw new SourceException(errMsg);
}
LOG.debug( LOG.debug(
"[PaginatedEntitiesSource] Batch Stats :- Submitted : {} Success: {} Failed: {}", "[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
batchSize, batchSize, result.getData().size(), result.getErrors().size());
result.getData().size(), updateStats(result.getData().size(), result.getErrors().size());
result.getErrors().size()); } catch (Exception e) {
updateStats(result.getData().size(), result.getErrors().size()); lastFailedCursor = this.cursor;
if (result != null) {
if (result.getPaging().getAfter() == null) {
isDone = true;
} else {
this.cursor = result.getPaging().getAfter();
}
updateStats(result.getData().size(), result.getErrors().size());
} else {
String errMsg =
String.format(
"[PaginatedEntitiesSource] Encountered Failures. %n Marked After Cursor : %s, %n Batch Stats :- Submitted : %s Success: %s Failed: %s, %n Errors : %s",
this.lastFailedCursor, batchSize, 0, batchSize, "No Relationship Issue , Json Processing or DB issue.");
LOG.debug(errMsg);
updateStats(0, batchSize);
}
throw new SourceException(e);
}
return result; return result;
} }

View File

@ -22,10 +22,13 @@ import os.org.opensearch.action.bulk.BulkItemResponse;
import os.org.opensearch.action.bulk.BulkResponse; import os.org.opensearch.action.bulk.BulkResponse;
public class ReindexingUtil { public class ReindexingUtil {
/** Private constructor: keeps code outside this class from instantiating it. */
private ReindexingUtil() {
/* intentionally empty; never called */
}
public static final String ENTITY_TYPE_KEY = "entityType"; public static final String ENTITY_TYPE_KEY = "entityType";
public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) { public static void getUpdatedStats(StepStats stats, int currentSuccess, int currentFailed) {
stats.setProcessedRecords(stats.getProcessedRecords() + currentSuccess + currentFailed);
stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess); stats.setSuccessRecords(stats.getSuccessRecords() + currentSuccess);
stats.setFailedRecords(stats.getFailedRecords() + currentFailed); stats.setFailedRecords(stats.getFailedRecords() + currentFailed);
} }
@ -41,7 +44,7 @@ public class ReindexingUtil {
EntityRepository<?> repository = Entity.getEntityRepository(entityType); EntityRepository<?> repository = Entity.getEntityRepository(entityType);
total += repository.getDao().listTotalCount(); total += repository.getDao().listTotalCount();
} else { } else {
total += dao.entityExtensionTimeSeriesDao().listCount(entityType); total += dao.reportDataTimeSeriesDao().listCount(entityType);
} }
} }
return total; return total;

View File

@ -14,8 +14,13 @@
"description": "Status for the Job.", "description": "Status for the Job.",
"type": "string", "type": "string",
"enum": [ "enum": [
"started",
"running", "running",
"completed",
"failed", "failed",
"active",
"activeError",
"stopped",
"success" "success"
] ]
}, },

View File

@ -34,11 +34,6 @@
"type": "integer", "type": "integer",
"default": 0 "default": 0
}, },
"processedRecords": {
"description": "Records that are processed in",
"type": "integer",
"default": 0
},
"successRecords": { "successRecords": {
"description": "Count of Total Successfully Records", "description": "Count of Total Successfully Records",
"type": "integer", "type": "integer",
@ -49,23 +44,16 @@
"type": "integer", "type": "integer",
"default": 0 "default": 0
} }
}, }
"additionalProperties": false
}, },
"stats": { "stats": {
"type": "object", "type": "object",
"properties": { "properties": {
"sourceStats": {
"$ref": "#/definitions/stepStats"
},
"processorStats": {
"$ref": "#/definitions/stepStats"
},
"sinkStats": {
"$ref": "#/definitions/stepStats"
},
"jobStats": { "jobStats": {
"$ref": "#/definitions/stepStats" "$ref": "#/definitions/stepStats"
},
"entityStats": {
"$ref": "#/definitions/stepStats"
} }
}, },
"additionalProperties": false "additionalProperties": false
@ -99,41 +87,19 @@
"description": "This schema publisher run job status.", "description": "This schema publisher run job status.",
"type": "string", "type": "string",
"enum": [ "enum": [
"STARTED", "started",
"RUNNING", "running",
"COMPLETED", "completed",
"FAILED", "failed",
"ACTIVE", "active",
"ACTIVE_WITH_ERROR", "activeError",
"STOPPED" "stopped",
"success"
] ]
}, },
"failure": { "failure": {
"description": "List of Failures in the Job", "description": "List of Failures in the Job",
"type": "object", "type": "object"
"properties": {
"sourceError": {
"type": "object",
"$ref": "#/definitions/failureDetails",
"default": null
},
"processorError": {
"type": "object",
"$ref": "#/definitions/failureDetails",
"default": null
},
"sinkError": {
"type": "object",
"$ref": "#/definitions/failureDetails",
"default": null
},
"jobError": {
"type": "object",
"$ref": "#/definitions/failureDetails",
"default": null
}
},
"additionalProperties": false
}, },
"stats": { "stats": {
"$ref": "#/definitions/stats" "$ref": "#/definitions/stats"

View File

@ -12,8 +12,8 @@
*/ */
import { Badge, Button, Card, Col, Divider, Row, Space } from 'antd'; import { Badge, Button, Card, Col, Divider, Row, Space } from 'antd';
import { isEmpty, isNil } from 'lodash'; import { isNil } from 'lodash';
import React, { useCallback, useMemo } from 'react'; import React, { useCallback } from 'react';
import { useTranslation } from 'react-i18next'; import { useTranslation } from 'react-i18next';
import { LazyLog } from 'react-lazylog'; import { LazyLog } from 'react-lazylog';
import { ReactComponent as IconSuccessBadge } from '../../../assets/svg/success-badge.svg'; import { ReactComponent as IconSuccessBadge } from '../../../assets/svg/success-badge.svg';
@ -24,21 +24,7 @@ import { AppLogsViewerProps, JobStats } from './AppLogsViewer.interface';
const AppLogsViewer = ({ data }: AppLogsViewerProps) => { const AppLogsViewer = ({ data }: AppLogsViewerProps) => {
const { t } = useTranslation(); const { t } = useTranslation();
const { failureContext, successContext, timestamp } = useMemo( const { successContext, failureContext, timestamp } = data;
() => ({
...data,
}),
[data]
);
const hasSuccessStats = useMemo(
() => !isEmpty(successContext?.stats),
[successContext]
);
const hasFailureStats = useMemo(
() => !isEmpty(failureContext?.stats),
[failureContext]
);
const handleJumpToEnd = () => { const handleJumpToEnd = () => {
const logsBody = document.getElementsByClassName( const logsBody = document.getElementsByClassName(
@ -155,27 +141,13 @@ const AppLogsViewer = ({ data }: AppLogsViewerProps) => {
[timestamp, formatDateTimeWithTimezone] [timestamp, formatDateTimeWithTimezone]
); );
const successContextRender = useMemo( return (
() => ( <>
<> {successContext?.stats && statsRender(successContext?.stats.jobStats)}
{hasSuccessStats && statsRender(successContext?.stats.jobStats)} {failureContext?.stats && statsRender(failureContext?.stats.jobStats)}
{logsRender(successContext?.stackTrace)} {logsRender(failureContext?.stackTrace ?? failureContext?.failure)}
</> </>
),
[successContext]
); );
const failureContextRender = useMemo(
() => (
<>
{hasFailureStats && statsRender(failureContext?.stats.jobStats)}
{logsRender(failureContext?.stackTrace)}
</>
),
[failureContext]
);
return successContext ? successContextRender : failureContextRender;
}; };
export default AppLogsViewer; export default AppLogsViewer;