Enable S3AsyncClient with multi-part upload (#24198)

* Enable S3AsyncClient with multi-part upload

* Enable S3AsyncClient with multi-part upload

* fix

* Update source for debian 11 to 12 (#24219) (#24220)

* Update source for debian 11 to 12

* tests

* fix

---------

Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
Co-authored-by: miriann-uu <mirian.ugokwe@getcollate.io>
Co-authored-by: Ashish Gupta <ashish@getcollate.io>
This commit is contained in:
Sriharsha Chintalapani 2025-11-07 01:11:09 -08:00 committed by GitHub
parent 2bbeace734
commit cf1a742f24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 779 additions and 369 deletions

View File

@ -49,6 +49,7 @@ public class StreamableLogsMetrics {
private final Timer s3OperationLatency;
private final AtomicInteger activeMultipartUploads;
private final AtomicLong s3WriteThroughputBytes;
private final AtomicInteger pendingPartUploads;
private final Counter sessionCreated;
private final Counter sessionReused;
@ -162,6 +163,12 @@ public class StreamableLogsMetrics {
.description("Current S3 write throughput in bytes/sec")
.register(meterRegistry);
// Add gauge for pending part uploads
this.pendingPartUploads = new AtomicInteger(0);
Gauge.builder("om_s3_pending_part_uploads", pendingPartUploads, AtomicInteger::get)
.description("Number of pending S3 multipart part uploads")
.register(meterRegistry);
this.sessionCreated =
Counter.builder("om_streamable_logs_sessions_created_total")
.description("Total number of new sessions created")
@ -298,6 +305,18 @@ public class StreamableLogsMetrics {
s3WriteThroughputBytes.set(bytesPerSecond);
}
/**
 * Sets the pending multipart part-upload gauge to an absolute value.
 *
 * @param count current number of in-flight S3 multipart part uploads
 */
public void updatePendingPartUploads(int count) {
pendingPartUploads.set(count);
}
/** Increments the pending multipart part-upload gauge (call when a part upload starts). */
public void incrementPendingPartUploads() {
pendingPartUploads.incrementAndGet();
}
/** Decrements the pending multipart part-upload gauge (call when a part upload finishes or fails). */
public void decrementPendingPartUploads() {
pendingPartUploads.decrementAndGet();
}
public void recordSessionCreated() {
sessionCreated.increment();
activeSessions.incrementAndGet();

View File

@ -1024,20 +1024,61 @@ public class IngestionPipelineResource
return Response.status(200).entity("Pipeline Client Disabled").build();
}
IngestionPipeline ingestionPipeline =
getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
getInternal(
uriInfo,
securityContext,
id,
"pipelineStatuses,ingestionRunner",
Include.NON_DELETED);
String filename =
String.format(
"ingestion_logs_%s_%d.txt", ingestionPipeline.getName(), System.currentTimeMillis());
boolean useStreamableLogs =
ingestionPipeline.getEnableStreamableLogs()
|| (ingestionPipeline.getIngestionRunner() != null
&& repository.isIngestionRunnerStreamableLogsEnabled(
ingestionPipeline.getIngestionRunner()));
StreamingOutput streamingOutput =
output -> {
String cursor = null;
boolean hasMoreData = true;
while (hasMoreData) {
Map<String, String> logChunk =
pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, cursor);
Map<String, String> logChunk;
if (useStreamableLogs) {
// Get logs using the repository's log storage picking up the last runId
String runId = ingestionPipeline.getPipelineStatuses().getRunId();
if (CommonUtil.nullOrEmpty(runId)) {
throw new PipelineServiceClientException(
"No runId found for the last ingestion pipeline run");
}
Map<String, Object> lastIngestionLogsMap =
repository.getLogs(
ingestionPipeline.getFullyQualifiedName(),
UUID.fromString(runId),
cursor,
1000);
logChunk =
lastIngestionLogsMap.entrySet().stream()
.filter(entry -> entry.getValue() != null)
.collect(
Collectors.toMap(
Map.Entry::getKey, entry -> entry.getValue().toString()));
Object logs = logChunk.remove("logs");
if (logs != null) {
logChunk.put(
TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString()),
logs.toString());
}
} else {
// Get the logs from the service client
logChunk = pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, cursor);
}
if (logChunk == null || logChunk.isEmpty()) {
break;

View File

@ -20,6 +20,7 @@ import static org.mockito.Mockito.*;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@ -28,9 +29,11 @@ import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
import org.openmetadata.schema.security.credentials.AWSCredentials;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.*;
@ -39,6 +42,7 @@ import software.amazon.awssdk.services.s3.model.*;
public class S3LogStorageTest {
@Mock private S3Client mockS3Client;
@Mock private S3AsyncClient mockS3AsyncClient;
private S3LogStorage s3LogStorage;
private LogStorageConfiguration testConfig;
@ -60,14 +64,22 @@ public class S3LogStorageTest {
.withStorageClass(LogStorageConfiguration.StorageClass.STANDARD_IA)
.withExpirationDays(30);
// Mock S3Client builder
try (MockedStatic<S3Client> s3ClientMock = mockStatic(S3Client.class)) {
// Mock S3Client and S3AsyncClient builders
try (MockedStatic<S3Client> s3ClientMock = mockStatic(S3Client.class);
MockedStatic<S3AsyncClient> s3AsyncClientMock = mockStatic(S3AsyncClient.class)) {
S3ClientBuilder mockBuilder = mock(S3ClientBuilder.class);
when(S3Client.builder()).thenReturn(mockBuilder);
when(mockBuilder.region(any())).thenReturn(mockBuilder);
when(mockBuilder.credentialsProvider(any())).thenReturn(mockBuilder);
when(mockBuilder.build()).thenReturn(mockS3Client);
S3AsyncClientBuilder mockAsyncBuilder = mock(S3AsyncClientBuilder.class);
when(S3AsyncClient.builder()).thenReturn(mockAsyncBuilder);
when(mockAsyncBuilder.region(any())).thenReturn(mockAsyncBuilder);
when(mockAsyncBuilder.credentialsProvider(any())).thenReturn(mockAsyncBuilder);
when(mockAsyncBuilder.build()).thenReturn(mockS3AsyncClient);
// Initialize S3LogStorage
s3LogStorage = new S3LogStorage();
Map<String, Object> config = new HashMap<>();
@ -81,6 +93,13 @@ public class S3LogStorageTest {
}
}
// Helper method to create ResponseInputStream for mocking
/**
 * Builds a {@code ResponseInputStream} over the given text, matching what the sync
 * {@code S3Client.getObject(GetObjectRequest)} overload returns, for use in stubs.
 */
private ResponseInputStream<GetObjectResponse> createResponseInputStream(String content) {
  byte[] payload = content.getBytes(StandardCharsets.UTF_8);
  AbortableInputStream body = AbortableInputStream.create(new ByteArrayInputStream(payload));
  return new ResponseInputStream<>(GetObjectResponse.builder().build(), body);
}
@Test
void testS3LogStorageInitialization() {
assertNotNull(s3LogStorage);
@ -89,31 +108,43 @@ public class S3LogStorageTest {
@Test
void testAppendLogs() throws IOException {
String existingContent = "Existing log content\n";
String newContent = "New log content\n";
String expectedKey = String.format("%s/%s/%s/logs.txt", testPrefix, testPipelineFQN, testRunId);
// Mock getting existing content
when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
// Mock async multipart upload initialization
when(mockS3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
.thenReturn(
ResponseBytes.fromByteArray(
GetObjectResponse.builder().build(),
existingContent.getBytes(StandardCharsets.UTF_8)));
CompletableFuture.completedFuture(
CreateMultipartUploadResponse.builder().uploadId("test-upload-id").build()));
// Mock putting updated content
when(mockS3Client.putObject(any(PutObjectRequest.class), any(RequestBody.class)))
.thenReturn(PutObjectResponse.builder().build());
// Mock async upload part
when(mockS3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)))
.thenReturn(
CompletableFuture.completedFuture(
UploadPartResponse.builder().eTag("test-etag").build()));
// Mock async complete multipart upload
when(mockS3AsyncClient.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
.thenReturn(
CompletableFuture.completedFuture(CompleteMultipartUploadResponse.builder().build()));
// Mock async putObject for marking run as active
when(mockS3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)))
.thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build()));
// Test appending logs
assertDoesNotThrow(() -> s3LogStorage.appendLogs(testPipelineFQN, testRunId, newContent));
// Verify S3 operations
verify(mockS3Client)
.getObjectAsBytes(
argThat(
(GetObjectRequest request) ->
request.bucket().equals(testBucket) && request.key().equals(expectedKey)));
verify(mockS3Client).putObject(any(PutObjectRequest.class), any(RequestBody.class));
// Verify multipart upload was initiated
verify(mockS3AsyncClient, times(1))
.createMultipartUpload(any(CreateMultipartUploadRequest.class));
// Flush to complete multipart upload
s3LogStorage.flush();
// Verify multipart upload was completed
verify(mockS3AsyncClient, times(1))
.completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
}
@Test
@ -121,19 +152,25 @@ public class S3LogStorageTest {
String newContent = "First log content\n";
String expectedKey = String.format("%s/%s/%s/logs.txt", testPrefix, testPipelineFQN, testRunId);
// Mock getting non-existent content
when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
.thenThrow(NoSuchKeyException.builder().build());
// Mock async multipart upload for new file
when(mockS3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
.thenReturn(
CompletableFuture.completedFuture(
CreateMultipartUploadResponse.builder().uploadId("test-upload-id").build()));
// Mock putting new content
when(mockS3Client.putObject(any(PutObjectRequest.class), any(RequestBody.class)))
.thenReturn(PutObjectResponse.builder().build());
// Mock async putObject for marking run as active
when(mockS3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)))
.thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build()));
// Note: uploadPart and completeMultipartUpload won't be called until flush/close
// since the content is too small (< 5MB)
// Test appending logs to new file
assertDoesNotThrow(() -> s3LogStorage.appendLogs(testPipelineFQN, testRunId, newContent));
// Verify S3 operations
verify(mockS3Client).putObject(any(PutObjectRequest.class), any(RequestBody.class));
// Verify multipart upload was initiated
verify(mockS3AsyncClient, times(1))
.createMultipartUpload(any(CreateMultipartUploadRequest.class));
}
@Test
@ -145,11 +182,9 @@ public class S3LogStorageTest {
when(mockS3Client.headObject(any(HeadObjectRequest.class)))
.thenReturn(HeadObjectResponse.builder().contentLength((long) logContent.length()).build());
// Mock get object
InputStream contentStream =
new ByteArrayInputStream(logContent.getBytes(StandardCharsets.UTF_8));
when(mockS3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
.thenReturn(contentStream);
// Mock get object - use ResponseInputStream
when(mockS3Client.getObject(any(GetObjectRequest.class)))
.thenReturn(createResponseInputStream(logContent));
// Test getting logs
Map<String, Object> result = s3LogStorage.getLogs(testPipelineFQN, testRunId, null, 2);
@ -168,11 +203,9 @@ public class S3LogStorageTest {
when(mockS3Client.headObject(any(HeadObjectRequest.class)))
.thenReturn(HeadObjectResponse.builder().contentLength((long) logContent.length()).build());
// Mock get object
InputStream contentStream =
new ByteArrayInputStream(logContent.getBytes(StandardCharsets.UTF_8));
when(mockS3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
.thenReturn(contentStream);
// Mock get object - use ResponseInputStream
when(mockS3Client.getObject(any(GetObjectRequest.class)))
.thenReturn(createResponseInputStream(logContent));
// Test getting logs with cursor
Map<String, Object> result = s3LogStorage.getLogs(testPipelineFQN, testRunId, "2", 2);
@ -235,11 +268,7 @@ public class S3LogStorageTest {
assertDoesNotThrow(() -> s3LogStorage.deleteLogs(testPipelineFQN, testRunId));
// Verify delete was called
verify(mockS3Client)
.deleteObject(
argThat(
(DeleteObjectRequest request) ->
request.bucket().equals(testBucket) && request.key().equals(expectedKey)));
verify(mockS3Client, times(1)).deleteObject(any(DeleteObjectRequest.class));
}
@Test
@ -264,12 +293,10 @@ public class S3LogStorageTest {
@Test
void testGetLogInputStream() throws IOException {
String logContent = "Stream content";
InputStream expectedStream =
new ByteArrayInputStream(logContent.getBytes(StandardCharsets.UTF_8));
// Mock get object
when(mockS3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
.thenReturn(expectedStream);
// Mock get object - use ResponseInputStream
when(mockS3Client.getObject(any(GetObjectRequest.class)))
.thenReturn(createResponseInputStream(logContent));
// Test getting input stream
InputStream stream = s3LogStorage.getLogInputStream(testPipelineFQN, testRunId);
@ -280,6 +307,21 @@ public class S3LogStorageTest {
@Test
void testGetLogOutputStream() throws IOException {
// Mock async multipart upload operations
when(mockS3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
.thenReturn(
CompletableFuture.completedFuture(
CreateMultipartUploadResponse.builder().uploadId("test-upload-id").build()));
when(mockS3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)))
.thenReturn(
CompletableFuture.completedFuture(
UploadPartResponse.builder().eTag("test-etag").build()));
when(mockS3AsyncClient.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
.thenReturn(
CompletableFuture.completedFuture(CompleteMultipartUploadResponse.builder().build()));
// Test getting output stream
OutputStream stream = s3LogStorage.getLogOutputStream(testPipelineFQN, testRunId);
@ -289,17 +331,32 @@ public class S3LogStorageTest {
// Write some data
stream.write("Test output".getBytes(StandardCharsets.UTF_8));
stream.close();
// Verify multipart upload was initiated and completed
verify(mockS3AsyncClient).createMultipartUpload(any(CreateMultipartUploadRequest.class));
verify(mockS3AsyncClient).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
}
@Test
void testClose() throws IOException {
  // Stub multipart creation so an output stream (a pending upload) can be opened.
  CompletableFuture<CreateMultipartUploadResponse> createFuture =
      CompletableFuture.completedFuture(
          CreateMultipartUploadResponse.builder().uploadId("test-upload-id").build());
  when(mockS3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
      .thenReturn(createFuture);

  // Stub abort so close() can cancel the in-flight multipart upload.
  CompletableFuture<AbortMultipartUploadResponse> abortFuture =
      CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build());
  when(mockS3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class)))
      .thenReturn(abortFuture);

  // Open a stream so close() has an active upload to clean up.
  OutputStream stream = s3LogStorage.getLogOutputStream(testPipelineFQN, testRunId);

  assertDoesNotThrow(() -> s3LogStorage.close());

  // Both the sync and async S3 clients must be released on close.
  verify(mockS3Client).close();
  verify(mockS3AsyncClient).close();
}
}