mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-01 01:56:04 +00:00
Add info logs for SearchIndexApp
This commit is contained in:
parent
df99e7263e
commit
a658e94b52
@ -108,6 +108,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
@Override
|
@Override
|
||||||
public void startApp(JobExecutionContext jobExecutionContext) {
|
public void startApp(JobExecutionContext jobExecutionContext) {
|
||||||
try {
|
try {
|
||||||
|
LOG.info("Starting Search Indexing App with JobData: {}", jobData);
|
||||||
this.jobExecutionContext = jobExecutionContext;
|
this.jobExecutionContext = jobExecutionContext;
|
||||||
initializeJob(jobExecutionContext);
|
initializeJob(jobExecutionContext);
|
||||||
String runType =
|
String runType =
|
||||||
@ -201,7 +202,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pushAppStatusUpdates(jobExecutionContext, appRecord, true);
|
pushAppStatusUpdates(jobExecutionContext, appRecord, true);
|
||||||
LOG.debug("Updated AppRunRecord in DB: {}", appRecord);
|
LOG.info("Updated AppRunRecord in DB: {}", appRecord);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void performReindex(JobExecutionContext jobExecutionContext) throws InterruptedException {
|
private void performReindex(JobExecutionContext jobExecutionContext) throws InterruptedException {
|
||||||
@ -247,17 +248,21 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
int totalEntityRecords = getTotalEntityRecords(entityType);
|
int totalEntityRecords = getTotalEntityRecords(entityType);
|
||||||
Source<?> source = createSource(entityType);
|
Source<?> source = createSource(entityType);
|
||||||
int loadPerThread = calculateNumberOfThreads(totalEntityRecords);
|
int loadPerThread = calculateNumberOfThreads(totalEntityRecords);
|
||||||
|
LOG.info(
|
||||||
|
"Processing entity type: {}, TotalRecords: {}, Load per Thread will be : {}",
|
||||||
|
entityType,
|
||||||
|
totalEntityRecords,
|
||||||
|
loadPerThread);
|
||||||
Semaphore semaphore = new Semaphore(jobData.getQueueSize());
|
Semaphore semaphore = new Semaphore(jobData.getQueueSize());
|
||||||
if (totalEntityRecords > 0) {
|
if (totalEntityRecords > 0) {
|
||||||
for (int i = 0; i < loadPerThread; i++) {
|
for (int i = 0; i < loadPerThread; i++) {
|
||||||
semaphore.acquire();
|
semaphore.acquire();
|
||||||
LOG.debug(
|
LOG.info("Submitting producer task current queue size: {}", producerQueue.size());
|
||||||
"Submitting producer task current queue size: {}", producerQueue.size());
|
|
||||||
int currentOffset = i * batchSize.get();
|
int currentOffset = i * batchSize.get();
|
||||||
producerExecutor.submit(
|
producerExecutor.submit(
|
||||||
() -> {
|
() -> {
|
||||||
try {
|
try {
|
||||||
LOG.debug(
|
LOG.info(
|
||||||
"Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}",
|
"Running Task for CurrentOffset: {}, Producer Latch Down, Current : {}",
|
||||||
currentOffset,
|
currentOffset,
|
||||||
producerLatch.getCount());
|
producerLatch.getCount());
|
||||||
@ -265,7 +270,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error processing entity type {}", entityType, e);
|
LOG.error("Error processing entity type {}", entityType, e);
|
||||||
} finally {
|
} finally {
|
||||||
LOG.debug(
|
LOG.info(
|
||||||
"Producer Latch Down and Semaphore Release, Current : {}",
|
"Producer Latch Down and Semaphore Release, Current : {}",
|
||||||
producerLatch.getCount());
|
producerLatch.getCount());
|
||||||
producerLatch.countDown();
|
producerLatch.countDown();
|
||||||
@ -437,7 +442,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
private void reCreateIndexes(Set<String> entities) throws SearchIndexException {
|
private void reCreateIndexes(Set<String> entities) throws SearchIndexException {
|
||||||
for (String entityType : entities) {
|
for (String entityType : entities) {
|
||||||
if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
|
if (Boolean.FALSE.equals(jobData.getRecreateIndex())) {
|
||||||
LOG.debug("RecreateIndex is false. Skipping index recreation for '{}'.", entityType);
|
LOG.info("RecreateIndex is false. Skipping index recreation for '{}'.", entityType);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -573,7 +578,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||||||
JobExecutionContext jobExecutionContext, String entityType, Source<?> source, int offset) {
|
JobExecutionContext jobExecutionContext, String entityType, Source<?> source, int offset) {
|
||||||
try {
|
try {
|
||||||
Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
|
Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
|
||||||
LOG.debug("Read Entities with entityType: {}, CurrentOffset: {}", entityType, offset);
|
LOG.info("Read Entities with entityType: {}, CurrentOffset: {}", entityType, offset);
|
||||||
if (resultList != null) {
|
if (resultList != null) {
|
||||||
ResultList<?> entities = extractEntities(entityType, resultList);
|
ResultList<?> entities = extractEntities(entityType, resultList);
|
||||||
if (!nullOrEmpty(entities.getData())) {
|
if (!nullOrEmpty(entities.getData())) {
|
||||||
|
|||||||
@ -150,14 +150,17 @@ public class PaginatedEntitiesSource implements Source<ResultList<? extends Enti
|
|||||||
|
|
||||||
public ResultList<? extends EntityInterface> readWithCursor(String currentCursor)
|
public ResultList<? extends EntityInterface> readWithCursor(String currentCursor)
|
||||||
throws SearchIndexException {
|
throws SearchIndexException {
|
||||||
LOG.debug("[PaginatedEntitiesSource] Fetching a Batch of Size: {} ", batchSize);
|
LOG.info(
|
||||||
|
"[PaginatedEntitiesSource] Fetching a Batch of Size: {} , Current Cursor: {}",
|
||||||
|
batchSize,
|
||||||
|
RestUtil.decodeCursor(currentCursor));
|
||||||
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
|
EntityRepository<?> entityRepository = Entity.getEntityRepository(entityType);
|
||||||
ResultList<? extends EntityInterface> result;
|
ResultList<? extends EntityInterface> result;
|
||||||
try {
|
try {
|
||||||
result =
|
result =
|
||||||
entityRepository.listAfterWithSkipFailure(
|
entityRepository.listAfterWithSkipFailure(
|
||||||
null, Entity.getFields(entityType, fields), filter, batchSize, currentCursor);
|
null, Entity.getFields(entityType, fields), filter, batchSize, currentCursor);
|
||||||
LOG.debug(
|
LOG.info(
|
||||||
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
|
"[PaginatedEntitiesSource] Batch Stats :- %n Submitted : {} Success: {} Failed: {}",
|
||||||
batchSize, result.getData().size(), result.getErrors().size());
|
batchSize, result.getData().size(), result.getErrors().size());
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user