fix(spark-lineage): default timeout for future responses (#10947)

This commit is contained in:
deepgarg-visa 2024-07-22 13:37:37 +05:30 committed by GitHub
parent 20574cf1c6
commit aa97cba3e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 10 additions and 3 deletions

View File

@ -69,7 +69,7 @@ public class MetadataResponseFuture implements Future<MetadataWriteResponse> {
return mapper.map(response);
} else {
// We wait for the callback to fill this out
responseLatch.await();
responseLatch.await(timeout, unit);
return responseReference.get();
}
}

View File

@ -250,6 +250,7 @@ public class RestEmitter implements Emitter {
@Override
public void failed(Exception ex) {
responseLatch.countDown();
if (callback != null) {
try {
callback.onFailure(ex);
@ -261,6 +262,7 @@ public class RestEmitter implements Emitter {
@Override
public void cancelled() {
responseLatch.countDown();
if (callback != null) {
try {
callback.onFailure(new RuntimeException("Cancelled"));
@ -344,6 +346,7 @@ public class RestEmitter implements Emitter {
@Override
public void failed(Exception ex) {
responseLatch.countDown();
if (callback != null) {
try {
callback.onFailure(ex);
@ -355,6 +358,7 @@ public class RestEmitter implements Emitter {
@Override
public void cancelled() {
responseLatch.countDown();
if (callback != null) {
try {
callback.onFailure(new RuntimeException("Cancelled"));

View File

@ -39,6 +39,8 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@ -53,6 +55,7 @@ public class DatahubEventEmitter extends EventEmitter {
private final List<DatahubJob> _datahubJobs = new LinkedList<>();
private final Map<String, MetadataChangeProposalWrapper> schemaMap = new HashMap<>();
private SparkLineageConf datahubConf;
private static final int DEFAULT_TIMEOUT_SEC = 10;
private final EventFormatter eventFormatter = new EventFormatter();
@ -386,8 +389,8 @@ public class DatahubEventEmitter extends EventEmitter {
.forEach(
future -> {
try {
log.info(future.get().toString());
} catch (InterruptedException | ExecutionException e) {
log.info(future.get(DEFAULT_TIMEOUT_SEC, TimeUnit.SECONDS).toString());
} catch (InterruptedException | ExecutionException | TimeoutException e) {
// log error, but don't impact thread
log.error("Failed to emit metadata to DataHub", e);
}