diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java b/openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java
index 8d3b12f3116..bbede1373ae 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java
@@ -36,17 +36,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import lombok.extern.slf4j.Slf4j;
 import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
 import org.openmetadata.schema.security.credentials.AWSCredentials;
@@ -56,10 +52,13 @@ import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
-import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
 import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
+import software.amazon.awssdk.services.s3.model.AbortIncompleteMultipartUpload;
 import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.BucketLifecycleConfiguration;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
@@ -85,12 +84,10 @@ import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
 import software.amazon.awssdk.services.s3.model.ObjectIdentifier;
 import software.amazon.awssdk.services.s3.model.PutBucketLifecycleConfigurationRequest;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
-import software.amazon.awssdk.services.s3.model.S3Exception;
 import software.amazon.awssdk.services.s3.model.S3Object;
 import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 import software.amazon.awssdk.services.s3.model.StorageClass;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
-import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 
 /**
  * S3-based implementation of LogStorageInterface for storing pipeline logs.
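The central change in this patch is swapping the blocking S3Client for S3AsyncClient on the write path: every upload returns a CompletableFuture instead of tying up a pooled thread, which is why the hand-rolled ThreadPoolExecutor is deleted below. A minimal, standalone sketch of the pattern (bucket, key, and region are placeholders):

```java
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

public class AsyncPutSketch {
  public static void main(String[] args) {
    S3AsyncClient s3 = S3AsyncClient.builder().region(Region.US_EAST_1).build();
    PutObjectRequest request =
        PutObjectRequest.builder().bucket("example-bucket").key("logs/demo.txt").build();

    // Fire the write without blocking the calling thread; completion (or failure)
    // is observed on the returned future.
    CompletableFuture<PutObjectResponse> future =
        s3.putObject(request, AsyncRequestBody.fromString("hello"));
    future.whenComplete(
        (response, throwable) -> {
          if (throwable != null) {
            System.err.println("put failed: " + throwable.getMessage());
          }
        });

    future.join(); // block only because this demo exits immediately
    s3.close();
  }
}
```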
@@ -103,6 +100,7 @@ import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 public class S3LogStorage implements LogStorageInterface {
 
   private S3Client s3Client;
+  private S3AsyncClient s3AsyncClient;
   private String bucketName;
   private String prefix;
   private boolean enableSSE;
@@ -116,11 +114,10 @@ public class S3LogStorage implements LogStorageInterface {
   private String kmsKeyId = null;
 
   private final Map<String, StreamContext> activeStreams = new ConcurrentHashMap<>();
-  private ExecutorService asyncExecutor;
+  private final Map<String, Long> partialLogOffsets = new ConcurrentHashMap<>();
   private ScheduledExecutorService cleanupExecutor;
-  private final Map<String, LogBuffer> logBuffers = new ConcurrentHashMap<>();
-  private final Cache<String, CircularBuffer> recentLogsCache =
+  private final Cache<String, SimpleLogBuffer> recentLogsCache =
       Caffeine.newBuilder().maximumSize(200).expireAfterAccess(30, TimeUnit.MINUTES).build();
   private final Map<String, List<Consumer<String>>> activeListeners = new ConcurrentHashMap<>();
@@ -131,7 +128,6 @@ public class S3LogStorage implements LogStorageInterface {
     try {
       LogStorageConfiguration s3Config = (LogStorageConfiguration) config.get("config");
 
-      // Extract metrics if provided
       if (config.get("metrics") != null) {
         this.metrics = (StreamableLogsMetrics) config.get("metrics");
       }
@@ -189,6 +185,18 @@
       this.s3Client = s3Builder.build();
 
+      S3AsyncClientBuilder asyncBuilder =
+          S3AsyncClient.builder()
+              .region(Region.of(s3Config.getAwsConfig().getAwsRegion()))
+              .credentialsProvider(credentialsProvider);
+
+      if (!nullOrEmpty(customEndpoint)) {
+        asyncBuilder.endpointOverride(java.net.URI.create(customEndpoint.toString()));
+        asyncBuilder.forcePathStyle(true);
+      }
+
+      this.s3AsyncClient = asyncBuilder.build();
+
       try {
         s3Client.headBucket(HeadBucketRequest.builder().bucket(bucketName).build());
       } catch (NoSuchBucketException e) {
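For reference, recentLogsCache above is a Caffeine cache keyed by "pipelineFQN/runId": maximumSize(200) bounds it to 200 run buffers, and expireAfterAccess drops a buffer 30 minutes after its last read or write. A small sketch of the three operations this class relies on (StringBuilder stands in for SimpleLogBuffer):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;

public class RecentLogsCacheSketch {
  public static void main(String[] args) {
    // Mirrors the configuration above: at most 200 entries, 30-minute access TTL.
    Cache<String, StringBuilder> cache =
        Caffeine.newBuilder().maximumSize(200).expireAfterAccess(30, TimeUnit.MINUTES).build();

    // get(key, mappingFunction) materializes the value atomically on first access...
    StringBuilder buffer = cache.get("pipeline/run-1", k -> new StringBuilder());
    buffer.append("line 1\n");

    // ...while getIfPresent returns null instead of creating an entry.
    System.out.println(cache.getIfPresent("pipeline/run-2")); // null
    cache.invalidate("pipeline/run-1"); // explicit removal, as deleteLogs does
  }
}
```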
@@ -198,27 +206,6 @@
             "Error accessing S3 bucket: " + bucketName + ". Validate AWS configuration.", e);
       }
 
-      this.asyncExecutor =
-          new ThreadPoolExecutor(
-              2, // Core pool size
-              10, // Maximum pool size
-              60L,
-              TimeUnit.SECONDS, // Keep alive time
-              new LinkedBlockingQueue<>(1000), // Bounded queue for backpressure
-              new ThreadFactory() {
-                private int counter = 0;
-
-                @Override
-                public Thread newThread(Runnable r) {
-                  Thread thread = new Thread(r);
-                  thread.setName("s3-log-writer-" + counter++);
-                  thread.setDaemon(true);
-                  return thread;
-                }
-              },
-              new ThreadPoolExecutor.CallerRunsPolicy() // Fallback to caller thread if queue full
-              );
-
       this.cleanupExecutor =
           Executors.newSingleThreadScheduledExecutor(
               r -> {
@@ -229,7 +216,12 @@
               });
 
       cleanupExecutor.scheduleWithFixedDelay(this::cleanupExpiredStreams, 1, 1, TimeUnit.MINUTES);
-      cleanupExecutor.scheduleWithFixedDelay(this::flushAllBuffers, 5, 5, TimeUnit.SECONDS);
+
+      // Update metrics every 30 seconds
+      cleanupExecutor.scheduleWithFixedDelay(this::updateStreamMetrics, 30, 30, TimeUnit.SECONDS);
+
+      // Write partial logs every 2 minutes to make them available for reading
+      cleanupExecutor.scheduleWithFixedDelay(this::writePartialLogs, 2, 2, TimeUnit.MINUTES);
 
       if (expirationDays > 0) {
         try {
@@ -299,9 +291,17 @@
     String key = buildS3Key(pipelineFQN, runId);
     MultipartS3OutputStream stream =
         new MultipartS3OutputStream(
-            s3Client, bucketName, key, enableSSE, storageClass, isCustomEndpoint, asyncExecutor);
+            s3AsyncClient,
+            bucketName,
+            key,
+            enableSSE,
+            storageClass,
+            isCustomEndpoint,
+            sseAlgorithm,
+            kmsKeyId,
+            metrics);
 
-    StreamContext context = new StreamContext(stream, System.currentTimeMillis());
+    StreamContext context = new StreamContext(stream, System.currentTimeMillis(), metrics);
     activeStreams.put(streamKey, context);
 
     return stream;
@@ -315,17 +315,50 @@
       metrics.recordBatchSize(logContent.split("\n").length);
     }
 
-    String bufferKey = pipelineFQN + "/" + runId;
-    LogBuffer buffer = null;
+    String streamKey = pipelineFQN + "/" + runId;
 
     try {
-      CircularBuffer recentLogs = recentLogsCache.get(bufferKey, k -> new CircularBuffer(1000));
+      // Update memory cache for real-time log viewing
+      SimpleLogBuffer recentLogs = recentLogsCache.get(streamKey, k -> new SimpleLogBuffer(1000));
       recentLogs.append(logContent);
 
-      notifyListeners(bufferKey, logContent);
+      // Notify listeners for SSE/WebSocket streaming
+      notifyListeners(streamKey, logContent);
 
-      buffer = logBuffers.computeIfAbsent(bufferKey, k -> new LogBuffer(pipelineFQN, runId));
-      buffer.append(logContent);
+      // Enforce the stream cap before computeIfAbsent: cleanupExpiredStreams mutates
+      // activeStreams, and ConcurrentHashMap forbids modifying the map from inside a
+      // computeIfAbsent mapping function.
+      if (!activeStreams.containsKey(streamKey)
+          && activeStreams.size() >= maxConcurrentStreams) {
+        cleanupExpiredStreams();
+        if (activeStreams.size() >= maxConcurrentStreams) {
+          throw new IOException("Maximum concurrent log streams reached: " + maxConcurrentStreams);
+        }
+      }
+
+      StreamContext context =
+          activeStreams.computeIfAbsent(
+              streamKey,
+              k -> {
+                try {
+                  String key = buildS3Key(pipelineFQN, runId);
+                  MultipartS3OutputStream stream =
+                      new MultipartS3OutputStream(
+                          s3AsyncClient,
+                          bucketName,
+                          key,
+                          enableSSE,
+                          storageClass,
+                          isCustomEndpoint,
+                          sseAlgorithm,
+                          kmsKeyId,
+                          metrics);
+                  LOG.info("Created multipart upload stream for {}/{}", pipelineFQN, runId);
+                  return new StreamContext(stream, System.currentTimeMillis(), metrics);
+                } catch (IOException e) {
+                  throw new RuntimeException("Failed to create multipart upload stream", e);
+                }
+              });
+
+      byte[] logBytes = logContent.getBytes(StandardCharsets.UTF_8);
+      context.stream.write(logBytes);
+      context.updateAccessTime();
 
       if (metrics != null) {
         metrics.recordLogsSent(1);
@@ -337,22 +370,23 @@
       if (metrics != null) {
         metrics.recordLogsFailed();
       }
-      throw e;
+      throw new IOException("Failed to append logs for " + pipelineFQN + "/" + runId, e);
     }
 
     markRunAsActive(pipelineFQN, runId);
-
-    if (buffer != null && (buffer.shouldFlush() || buffer.isExpired())) {
-      flushBuffer(bufferKey, buffer);
-    }
   }
 
   @Override
   public InputStream getLogInputStream(String pipelineFQN, UUID runId) throws IOException {
-    String bufferKey = pipelineFQN + "/" + runId;
-    LogBuffer buffer = logBuffers.get(bufferKey);
-    if (buffer != null && buffer.hasContent()) {
-      flushBufferAndWait(bufferKey, buffer);
+    String streamKey = pipelineFQN + "/" + runId;
+
+    // Check if the pipeline is still running (active multipart upload in progress)
+    StreamContext activeStream = activeStreams.get(streamKey);
+    if (activeStream != null) {
+      // Pipeline is still running - read from the memory cache
+      List<String> recentLines = getRecentLogs(pipelineFQN, runId, 1000);
+      String content = String.join("\n", recentLines);
+      return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
     }
 
     String key = buildS3Key(pipelineFQN, runId);
@@ -375,6 +409,11 @@
       return result;
 
     } catch (NoSuchKeyException e) {
+      List<String> recentLines = getRecentLogs(pipelineFQN, runId, 1000);
+      if (!recentLines.isEmpty()) {
+        String content = String.join("\n", recentLines);
+        return new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+      }
       return new ByteArrayInputStream(new byte[0]);
     } catch (Exception e) {
       if (metrics != null) {
@@ -387,14 +426,22 @@
   @Override
   public Map<String, Object> getLogs(String pipelineFQN, UUID runId, String afterCursor, int limit)
       throws IOException {
-    String bufferKey = pipelineFQN + "/" + runId;
-    LogBuffer buffer = logBuffers.get(bufferKey);
-    if (buffer != null && buffer.hasContent()) {
-      flushBufferAndWait(bufferKey, buffer);
+    String streamKey = pipelineFQN + "/" + runId;
+    Map<String, Object> result = new HashMap<>();
+
+    // Check if the pipeline is still running (active multipart upload in progress)
+    StreamContext activeStream = activeStreams.get(streamKey);
+    if (activeStream != null) {
+      // Pipeline is still running - combine completed logs from S3 with recent logs from memory
+      result = getCombinedLogsForActiveStream(pipelineFQN, runId, afterCursor, limit);
+      result.put("streaming", true); // Indicate logs are still being written
+      LOG.debug(
+          "Reading combined logs (S3 + memory) for active pipeline {}/{}", pipelineFQN, runId);
+      return result;
     }
 
+    // Pipeline completed - read from S3
     String key = buildS3Key(pipelineFQN, runId);
-    Map<String, Object> result = new HashMap<>();
 
     Timer.Sample s3Sample = null;
     if (metrics != null) {
@@ -409,6 +456,59 @@
     try {
       headResponse = s3Client.headObject(headRequest);
     } catch (NoSuchKeyException e) {
+      // Main file doesn't exist - check for a partial file first
+      String partialKey = buildPartialS3Key(pipelineFQN, runId);
+      try {
+        GetObjectRequest getRequest =
+            GetObjectRequest.builder().bucket(bucketName).key(partialKey).build();
+        try (InputStream objectContent = s3Client.getObject(getRequest);
+            BufferedReader reader =
+                new BufferedReader(
+                    new InputStreamReader(objectContent, StandardCharsets.UTF_8))) {
+
+          List<String> allLines = new ArrayList<>();
+          String line;
+          while ((line = reader.readLine()) != null) {
+            allLines.add(line);
+          }
+
+          // Apply pagination
+          int startIndex = 0;
+          if (afterCursor != null && !afterCursor.isEmpty()) {
+            try {
+              startIndex = Integer.parseInt(afterCursor);
+            } catch (NumberFormatException ex) {
+              LOG.warn("Invalid cursor format: {}", afterCursor);
+            }
+          }
+
+          int endIndex = Math.min(startIndex + limit, allLines.size());
+          List<String> resultLines =
+              startIndex < allLines.size()
+                  ? allLines.subList(startIndex, endIndex)
+                  : Collections.emptyList();
+
+          result.put("logs", String.join("\n", resultLines));
+          result.put("after", endIndex < allLines.size() ? String.valueOf(endIndex) : null);
+          result.put("total", (long) allLines.size());
+
+          if (metrics != null && s3Sample != null) {
+            metrics.recordS3Read();
+            metrics.recordS3Operation(s3Sample);
+          }
+          return result;
+        }
+      } catch (NoSuchKeyException ex) {
+        // Neither the main nor the partial file exists - there are truly no logs
+        LOG.debug("No logs found (neither main nor partial) for {}/{}", pipelineFQN, runId);
+      } catch (Exception ex) {
+        LOG.warn(
+            "Failed to read partial logs from S3 for {}/{}: {}",
+            pipelineFQN,
+            runId,
+            ex.getMessage());
+      }
+
       result.put("logs", "");
       result.put("after", null);
       result.put("total", 0L);
@@ -534,15 +634,40 @@
   @Override
   public void deleteLogs(String pipelineFQN, UUID runId) throws IOException {
     String key = buildS3Key(pipelineFQN, runId);
+    String partialKey = buildPartialS3Key(pipelineFQN, runId);
 
-    String bufferKey = pipelineFQN + "/" + runId;
-    logBuffers.remove(bufferKey);
+    // Clean up the active stream if one exists
+    String streamKey = pipelineFQN + "/" + runId;
+    StreamContext context = activeStreams.remove(streamKey);
+    if (context != null) {
+      try {
+        context.close();
+      } catch (Exception e) {
+        LOG.warn("Error closing stream during delete: {}", e.getMessage());
+      }
+    }
+
+    // Clean up partial log offset tracking
+    partialLogOffsets.remove(streamKey);
+
+    // Clear the memory cache for this stream
+    recentLogsCache.invalidate(streamKey);
 
     try {
+      // Delete the main logs file
       DeleteObjectRequest request =
           DeleteObjectRequest.builder().bucket(bucketName).key(key).build();
       s3Client.deleteObject(request);
+
+      // Delete the partial logs file if it exists
+      try {
+        DeleteObjectRequest partialRequest =
+            DeleteObjectRequest.builder().bucket(bucketName).key(partialKey).build();
+        s3Client.deleteObject(partialRequest);
+      } catch (Exception e) {
+        // The partial file may not exist, which is fine
+        LOG.debug("Could not delete partial logs file (may not exist): {}", e.getMessage());
+      }
     } catch (Exception e) {
       throw new IOException("Failed to delete logs from S3", e);
     }
@@ -552,7 +677,23 @@
   public void deleteAllLogs(String pipelineFQN) throws IOException {
     String keyPrefix = buildKeyPrefix(pipelineFQN);
 
-    logBuffers.entrySet().removeIf(entry -> entry.getKey().startsWith(pipelineFQN + "/"));
+    // Clean up active streams for this pipeline
+    activeStreams
+        .entrySet()
+        .removeIf(
+            entry -> {
+              if (entry.getKey().startsWith(pipelineFQN + "/")) {
+                try {
+                  entry.getValue().close();
+                  // Clean up partial log offset tracking
+                  partialLogOffsets.remove(entry.getKey());
+                } catch (Exception e) {
+                  LOG.warn("Error closing stream during deleteAll: {}", e.getMessage());
+                }
+                return true;
+              }
+              return false;
+            });
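Both the partial-file branch above and getCombinedLogsForActiveStream later in this file share one cursor convention: "after" is simply the zero-based index of the first line not yet returned, echoed back by the client on the next call. A standalone sketch of that contract:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class LineCursorSketch {
  /** Returns up to {@code limit} lines starting at {@code afterCursor}, plus the next cursor. */
  static String[] page(List<String> allLines, String afterCursor, int limit) {
    int start = 0;
    if (afterCursor != null && !afterCursor.isEmpty()) {
      try {
        start = Integer.parseInt(afterCursor);
      } catch (NumberFormatException e) {
        // Malformed cursors fall back to the beginning, matching the code above.
      }
    }
    int end = Math.min(start + limit, allLines.size());
    List<String> slice =
        start < allLines.size() ? allLines.subList(start, end) : Collections.emptyList();
    String next = end < allLines.size() ? String.valueOf(end) : null;
    return new String[] {String.join("\n", slice), next};
  }

  public static void main(String[] args) {
    List<String> lines = new ArrayList<>(List.of("a", "b", "c", "d", "e"));
    String[] first = page(lines, null, 2); // "a\nb", next cursor "2"
    String[] second = page(lines, first[1], 2); // "c\nd", next cursor "4"
    System.out.println(first[0] + " | " + second[0]);
  }
}
```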
 
     try {
       ListObjectsV2Request request =
@@ -613,8 +754,7 @@ public class S3LogStorage implements LogStorageInterface {
 
   @Override
   public void close() throws IOException {
-    flushAllBuffers();
-
+    // Close all active multipart upload streams
     for (StreamContext context : activeStreams.values()) {
       try {
         context.close();
@@ -623,7 +763,6 @@
       }
     }
     activeStreams.clear();
-    logBuffers.clear();
 
     if (cleanupExecutor != null) {
       cleanupExecutor.shutdown();
@@ -637,21 +776,13 @@
       }
     }
 
-    if (asyncExecutor != null) {
-      asyncExecutor.shutdown();
-      try {
-        if (!asyncExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
-          asyncExecutor.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        asyncExecutor.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
-    }
-
     if (s3Client != null) {
       s3Client.close();
     }
+
+    if (s3AsyncClient != null) {
+      s3AsyncClient.close();
+    }
   }
 
   private String buildS3Key(String pipelineFQN, UUID runId) {
@@ -689,6 +820,10 @@
             .id("pipeline-logs-expiration")
             .status(ExpirationStatus.ENABLED)
             .expiration(LifecycleExpiration.builder().days(expirationDays).build())
+            .abortIncompleteMultipartUpload(
+                AbortIncompleteMultipartUpload.builder()
+                    .daysAfterInitiation(7) // Clean up orphaned multipart uploads after 7 days
+                    .build())
             .filter(LifecycleRuleFilter.builder().prefix(prefix).build())
             .build();
 
@@ -699,7 +834,9 @@
             .build();
 
     s3Client.putBucketLifecycleConfiguration(request);
-    LOG.info("S3 lifecycle policy configured with {} days expiration", expirationDays);
+    LOG.info(
+        "S3 lifecycle policy configured: {} days expiration, 7 days multipart cleanup",
+        expirationDays);
   } catch (Exception e) {
     LOG.warn("Failed to configure S3 lifecycle policy", e);
   }
@@ -717,6 +854,9 @@
       try {
         LOG.debug("Closing expired stream: {}", entry.getKey());
         context.close();
+
+        // Clean up partial log offset tracking
+        partialLogOffsets.remove(entry.getKey());
       } catch (Exception e) {
         LOG.error("Error closing expired stream: {}", entry.getKey(), e);
       }
@@ -725,79 +865,172 @@
     }
   }
 
-  private void flushAllBuffers() {
-    for (Map.Entry<String, LogBuffer> entry : logBuffers.entrySet()) {
-      LogBuffer buffer = entry.getValue();
-      if (buffer.hasContent()) {
-        flushBufferAndWait(entry.getKey(), buffer);
-      }
-    }
-  }
-
-  public void flush() {
-    flushAllBuffers();
-  }
-
-  private void flushBufferAndWait(String bufferKey, LogBuffer buffer) {
-    if (!buffer.hasContent()) {
-      return;
-    }
-
-    String content = buffer.getAndReset();
-    if (content == null || content.isEmpty()) {
-      return;
-    }
-
-    Timer.Sample s3Sample = null;
-    if (metrics != null) {
-      s3Sample = metrics.startS3Operation();
-    }
-
-    try {
-      String key = buildS3Key(buffer.pipelineFQN, buffer.runId);
-
-      String existingContent = "";
-      try {
-        GetObjectRequest getRequest =
-            GetObjectRequest.builder().bucket(bucketName).key(key).build();
-        existingContent = s3Client.getObjectAsBytes(getRequest).asUtf8String();
-      } catch (NoSuchKeyException e) {
-      }
-
-      String updatedContent = existingContent + content;
-
-      PutObjectRequest.Builder putRequestBuilder =
-          PutObjectRequest.builder().bucket(bucketName).key(key).contentType("text/plain");
-
-      if (!isCustomEndpoint && storageClass != null) {
-        putRequestBuilder.storageClass(storageClass);
-      }
-
-      if (enableSSE && !isCustomEndpoint) {
-        putRequestBuilder.serverSideEncryption(this.sseAlgorithm);
-        if (this.sseAlgorithm == ServerSideEncryption.AWS_KMS && this.kmsKeyId != null) {
-          putRequestBuilder.ssekmsKeyId(this.kmsKeyId);
-        }
-      }
-
-      s3Client.putObject(
-          putRequestBuilder.build(),
-          RequestBody.fromString(updatedContent, StandardCharsets.UTF_8));
-
-      if (metrics != null) {
-        metrics.recordS3Write();
-        if (s3Sample != null) {
-          metrics.recordS3Operation(s3Sample);
-        }
-        metrics.updateWriteThroughput(updatedContent.length());
-      }
-
-      String listenerKey = buffer.pipelineFQN + "_" + buffer.runId;
-      notifyListeners(listenerKey, content);
-    } catch (Exception e) {
-      LOG.error("Failed to flush log buffer synchronously", e);
-      if (metrics != null) {
-        metrics.recordS3Error();
-      }
-    }
-  }
-
+  /**
+   * Periodically write accumulated logs to partial files for active streams.
+   * This allows reading complete logs even while ingestion is still running.
+   */
+  private void writePartialLogs() {
+    for (String streamKey : activeStreams.keySet()) {
+      try {
+        writePartialLogsForStream(streamKey);
+      } catch (Exception e) {
+        LOG.warn("Failed to write partial logs for stream: {}", streamKey, e);
+      }
+    }
+  }
 
+  private void writePartialLogsForStream(String streamKey) {
+    try {
+      // streamKey format is "pipelineFQN/runId" where runId is the last part after "/"
+      int lastSlashIndex = streamKey.lastIndexOf('/');
+      if (lastSlashIndex == -1) {
+        LOG.warn("Invalid stream key format: {}", streamKey);
+        return;
+      }
+
+      String pipelineFQN = streamKey.substring(0, lastSlashIndex);
+      UUID runId = UUID.fromString(streamKey.substring(lastSlashIndex + 1));
+
+      SimpleLogBuffer buffer = recentLogsCache.getIfPresent(streamKey);
+      if (buffer == null) {
+        return; // No logs to write
+      }
+
+      List<String> allLines = buffer.getAllLines();
+      if (allLines.isEmpty()) {
+        return;
+      }
+
+      // Note: offsets are indices into the in-memory buffer; if the buffer overflows
+      // its capacity and drops old lines, those lines cannot be re-written here.
+      Long currentOffset = partialLogOffsets.getOrDefault(streamKey, 0L);
+      if (currentOffset >= allLines.size()) {
+        return; // No new logs since the last write
+      }
+
+      // Get the new lines since the last partial write
+      List<String> newLines = allLines.subList(currentOffset.intValue(), allLines.size());
+      if (newLines.isEmpty()) {
+        return;
+      }
+
+      String partialKey = buildPartialS3Key(pipelineFQN, runId);
+      String newContent = String.join("\n", newLines) + "\n";
+
+      // Append to the existing partial file or create a new one
+      if (currentOffset > 0) {
+        // Append mode: fetch the existing content and concatenate the new content
        try {
+          GetObjectRequest getRequest =
+              GetObjectRequest.builder().bucket(bucketName).key(partialKey).build();
+          String existingContent;
+          try (InputStream objectContent = s3Client.getObject(getRequest)) {
+            existingContent = new String(objectContent.readAllBytes(), StandardCharsets.UTF_8);
+          }
+          newContent = existingContent + newContent;
+        } catch (NoSuchKeyException e) {
+          // File doesn't exist yet; create a new one
+        }
+      }
+
+      // Write to S3
+      PutObjectRequest putRequest =
+          PutObjectRequest.builder()
+              .bucket(bucketName)
+              .key(partialKey)
+              .contentType("text/plain")
+              .build();
+
+      s3Client.putObject(
+          putRequest, software.amazon.awssdk.core.sync.RequestBody.fromString(newContent));
+
+      // Record S3 write metrics
+      if (metrics != null) {
+        metrics.recordS3Write();
+      }
+
+      // Update the offset
+      partialLogOffsets.put(streamKey, (long) allLines.size());
+
+      LOG.debug(
+          "Wrote {} new log lines to partial file for stream: {}", newLines.size(), streamKey);
+
+    } catch (Exception e) {
+      LOG.warn("Failed to write partial logs for stream: {}", streamKey, e);
+    }
+  }
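S3 has no append operation, so writePartialLogsForStream above "appends" by fetching the current partial object, concatenating the new lines, and rewriting the whole object; the offset map only records which in-memory lines were already persisted. A hedged sketch of the read-concat-put pattern (a single writer per key is assumed; concurrent writers would be last-writer-wins):

```java
import java.nio.charset.StandardCharsets;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class S3AppendSketch {
  /**
   * "Appends" by rewriting the whole object: read the current content (if any),
   * concatenate, and put the result back. This is not atomic, which is acceptable
   * when each key has exactly one writer, as with the per-run partial files above.
   */
  static void append(S3Client s3, String bucket, String key, String newContent) {
    String existing = "";
    try {
      existing =
          s3.getObjectAsBytes(GetObjectRequest.builder().bucket(bucket).key(key).build())
              .asUtf8String();
    } catch (NoSuchKeyException e) {
      // First write: nothing to prepend.
    }
    s3.putObject(
        PutObjectRequest.builder().bucket(bucket).key(key).contentType("text/plain").build(),
        RequestBody.fromString(existing + newContent, StandardCharsets.UTF_8));
  }
}
```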
 
+  /**
+   * Flush all active streams by closing them to finalize their multipart uploads.
+   * This is called by tests to ensure logs are written to S3.
+   */
+  public void flush() {
+    // Write the final partial logs before closing
+    writePartialLogs();
+
+    // Close all active streams to finalize multipart uploads
+    for (Map.Entry<String, StreamContext> entry : activeStreams.entrySet()) {
+      try {
+        LOG.debug("Flushing stream: {}", entry.getKey());
+        entry.getValue().close();
+      } catch (Exception e) {
+        LOG.error("Error flushing stream: {}", entry.getKey(), e);
+      }
+    }
+    activeStreams.clear();
+    partialLogOffsets.clear();
+  }
+
+  /**
+   * Flush a specific pipeline run's stream to finalize its multipart upload.
+   * This allows scoped flushing without affecting other active streams.
+   *
+   * @param pipelineFQN Fully qualified pipeline name
+   * @param runId Run identifier
+   */
+  public void flush(String pipelineFQN, UUID runId) throws IOException {
+    String streamKey = pipelineFQN + "/" + runId;
+
+    // Write the final partial logs for this specific stream
+    try {
+      writePartialLogsForStream(streamKey);
+    } catch (Exception e) {
+      LOG.warn("Failed to write final partial logs for {}: {}", streamKey, e.getMessage());
+    }
+
+    StreamContext context = activeStreams.remove(streamKey);
+    partialLogOffsets.remove(streamKey);
+
+    if (context != null) {
+      try {
+        LOG.debug("Flushing stream for pipeline: {}, runId: {}", pipelineFQN, runId);
+        context.close();
+      } catch (Exception e) {
+        throw new IOException("Failed to flush stream for " + streamKey, e);
+      }
+    }
+  }
+
+  /**
+   * Update metrics for all active streams. This provides visibility into the number of
+   * active multipart uploads and the total pending part uploads across all streams.
+   */
+  public void updateStreamMetrics() {
+    if (metrics != null) {
+      int totalPendingParts = 0;
+      for (StreamContext context : activeStreams.values()) {
+        totalPendingParts += context.stream.getPendingUploadsCount();
+      }
+      metrics.updatePendingPartUploads(totalPendingParts);
+
+      // Report the active-upload gauge as an absolute value; incrementing it on every
+      // scheduled run would inflate the count indefinitely.
+      metrics.setActiveMultipartUploads(activeStreams.size());
+    }
+  }
@@ -811,27 +1044,27 @@
             runId,
             getServerId());
 
-    asyncExecutor.submit(
-        () -> {
-          try {
-            PutObjectRequest request =
-                PutObjectRequest.builder()
-                    .bucket(bucketName)
-                    .key(markerKey)
-                    .contentType("text/plain")
-                    .metadata(
-                        Map.of(
-                            "server-id", getServerId(),
-                            "timestamp", String.valueOf(System.currentTimeMillis()),
-                            "pipeline", pipelineFQN))
-                    .build();
+    // Mark the run as active asynchronously using the S3AsyncClient
+    PutObjectRequest request =
+        PutObjectRequest.builder()
+            .bucket(bucketName)
+            .key(markerKey)
+            .contentType("text/plain")
+            .metadata(
+                Map.of(
+                    "server-id", getServerId(),
+                    "timestamp", String.valueOf(System.currentTimeMillis()),
+                    "pipeline", pipelineFQN))
+            .build();
 
-            s3Client.putObject(
-                request, RequestBody.fromString(String.valueOf(System.currentTimeMillis())));
-          } catch (Exception e) {
-            LOG.debug("Failed to mark run as active: {}/{}", pipelineFQN, runId, e);
-          }
-        });
+    s3AsyncClient
+        .putObject(
+            request, AsyncRequestBody.fromString(String.valueOf(System.currentTimeMillis())))
+        .whenComplete(
+            (response, throwable) -> {
+              if (throwable != null) {
+                LOG.debug("Failed to mark run as active: {}/{}", pipelineFQN, runId, throwable);
+              }
+            });
   }
 
   private String getServerId() {
@@ -846,83 +1079,19 @@
     return serverId;
   }
 
-  private void flushBuffer(String bufferKey, LogBuffer buffer) {
-    if (!buffer.hasContent()) {
-      return;
-    }
-
-    String content = buffer.getAndReset();
-    if (content == null || content.isEmpty()) {
-      return;
-    }
-
-    asyncExecutor.submit(
-        () -> {
-          Timer.Sample s3Sample = null;
-          if (metrics != null) {
-            s3Sample = metrics.startS3Operation();
-          }
-
-          try {
-            String key = buildS3Key(buffer.pipelineFQN, buffer.runId);
-
-            String existingContent = "";
-            try {
-              GetObjectRequest getRequest =
-                  GetObjectRequest.builder().bucket(bucketName).key(key).build();
-              LOG.debug("Attempting to read from S3: bucket='{}', key='{}'", bucketName, key);
-              existingContent = s3Client.getObjectAsBytes(getRequest).asUtf8String();
-            } catch (NoSuchKeyException e) {
-              LOG.debug("Object doesn't exist yet for key: {}", key);
-            } catch (S3Exception e) {
-              LOG.error("S3 error reading key '{}': {}", key, e.getMessage());
-              throw e;
-            }
-
-            String updatedContent = existingContent + content;
-
-            PutObjectRequest.Builder putRequestBuilder =
-                PutObjectRequest.builder().bucket(bucketName).key(key).contentType("text/plain");
-
-            if (!isCustomEndpoint && storageClass != null) {
-              putRequestBuilder.storageClass(storageClass);
-            }
-
-            if (enableSSE && !isCustomEndpoint) {
-              putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
-            }
-
-            s3Client.putObject(
-                putRequestBuilder.build(),
-                RequestBody.fromString(updatedContent, StandardCharsets.UTF_8));
-
-            if (metrics != null) {
-              metrics.recordS3Write();
-              if (s3Sample != null) {
-                metrics.recordS3Operation(s3Sample);
-              }
-              metrics.updateWriteThroughput(updatedContent.length());
-            }
-
-          } catch (Exception e) {
-            LOG.error("Failed to flush log buffer for {}/{}", buffer.pipelineFQN, buffer.runId, e);
-            if (metrics != null) {
-              metrics.recordS3Error();
-            }
-          }
-        });
-  }
-
   /**
    * Context for tracking active streams with TTL
    */
   private static class StreamContext {
     final MultipartS3OutputStream stream;
     volatile long lastAccessTime;
+    private final StreamableLogsMetrics metrics;
 
-    StreamContext(MultipartS3OutputStream stream, long creationTime) {
+    StreamContext(
+        MultipartS3OutputStream stream, long creationTime, StreamableLogsMetrics metrics) {
       this.stream = stream;
       this.lastAccessTime = creationTime;
+      this.metrics = metrics;
     }
 
     void updateAccessTime() {
@@ -934,86 +1103,50 @@
     }
   }
 
-  /**
-   * Buffer for batching log writes
-   */
-  private static class LogBuffer {
-    private final String pipelineFQN;
-    private final UUID runId;
-    private final StringBuilder buffer;
-    private final long creationTime;
-    private static final int MAX_BUFFER_SIZE = 64 * 1024; // 64KB
-    private static final long MAX_AGE_MS = 5000; // 5 seconds
-
-    LogBuffer(String pipelineFQN, UUID runId) {
-      this.pipelineFQN = pipelineFQN;
-      this.runId = runId;
-      this.buffer = new StringBuilder();
-      this.creationTime = System.currentTimeMillis();
-    }
-
-    synchronized void append(String content) {
-      buffer.append(content);
-    }
-
-    synchronized boolean hasContent() {
-      return buffer.length() > 0;
-    }
-
-    synchronized boolean shouldFlush() {
-      return buffer.length() >= MAX_BUFFER_SIZE;
-    }
-
-    boolean isExpired() {
-      return System.currentTimeMillis() - creationTime > MAX_AGE_MS;
-    }
-
-    synchronized String getAndReset() {
-      if (buffer.length() == 0) {
-        return null;
-      }
-      String content = buffer.toString();
-      buffer.setLength(0);
-      return content;
-    }
-  }
-
   /**
    * Custom OutputStream for streaming data to S3 using multipart uploads.
    * This properly handles append operations without data loss.
    */
   private static class MultipartS3OutputStream extends OutputStream {
-    private final S3Client s3Client;
+    private final S3AsyncClient s3AsyncClient;
     private final String bucketName;
     private final String key;
     private final boolean enableSSE;
     private final StorageClass storageClass;
     private final boolean isCustomEndpoint;
-    private final ExecutorService executor;
+    private final ServerSideEncryption sseAlgorithm;
+    private final String kmsKeyId;
     private final List<CompletedPart> completedParts;
+    private final List<CompletableFuture<CompletedPart>> pendingUploads;
     private final ByteArrayOutputStream buffer;
+    private final StreamableLogsMetrics metrics;
     private String uploadId;
     private int partNumber = 1;
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private static final int PART_SIZE = 5 * 1024 * 1024; // 5MB minimum for multipart
 
     public MultipartS3OutputStream(
-        S3Client s3Client,
+        S3AsyncClient s3AsyncClient,
         String bucketName,
         String key,
         boolean enableSSE,
         StorageClass storageClass,
         boolean isCustomEndpoint,
-        ExecutorService executor)
+        ServerSideEncryption sseAlgorithm,
+        String kmsKeyId,
+        StreamableLogsMetrics metrics)
         throws IOException {
-      this.s3Client = s3Client;
+      this.s3AsyncClient = s3AsyncClient;
       this.bucketName = bucketName;
       this.key = key;
       this.enableSSE = enableSSE;
       this.storageClass = storageClass;
       this.isCustomEndpoint = isCustomEndpoint;
-      this.executor = executor;
+      this.sseAlgorithm = sseAlgorithm;
+      this.kmsKeyId = kmsKeyId;
+      this.metrics = metrics;
       this.completedParts = new ArrayList<>();
+      this.pendingUploads = new ArrayList<>();
       this.buffer = new ByteArrayOutputStream(PART_SIZE);
 
       initializeMultipartUpload();
@@ -1032,11 +1165,18 @@
       }
 
       if (enableSSE && !isCustomEndpoint) {
-        requestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
+        if (sseAlgorithm != null) {
+          requestBuilder.serverSideEncryption(sseAlgorithm);
+          if (sseAlgorithm == ServerSideEncryption.AWS_KMS && kmsKeyId != null) {
+            requestBuilder.ssekmsKeyId(kmsKeyId);
+          }
+        } else {
+          requestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
+        }
       }
 
       CreateMultipartUploadResponse response =
-          s3Client.createMultipartUpload(requestBuilder.build());
+          s3AsyncClient.createMultipartUpload(requestBuilder.build()).join();
       this.uploadId = response.uploadId();
     } catch (Exception e) {
       throw new IOException("Failed to initialize multipart upload", e);
@@ -1072,21 +1212,35 @@
     public void close() throws IOException {
       if (closed.compareAndSet(false, true)) {
         try {
+          // Upload any remaining data
           if (buffer.size() > 0) {
            uploadPart();
          }
 
+          // Wait for all pending uploads and collect the completed parts here,
+          // rather than in a whenComplete callback: joining each future guarantees
+          // every part is visible before the upload is completed.
+          if (!pendingUploads.isEmpty()) {
+            CompletableFuture.allOf(pendingUploads.toArray(new CompletableFuture[0])).join();
+            for (CompletableFuture<CompletedPart> upload : pendingUploads) {
+              completedParts.add(upload.join());
+            }
+          }
+
           if (uploadId != null && !completedParts.isEmpty()) {
+            // Sort parts by part number before completing
+            List<CompletedPart> sortedParts = new ArrayList<>(completedParts);
+            sortedParts.sort((p1, p2) -> Integer.compare(p1.partNumber(), p2.partNumber()));
+
             CompleteMultipartUploadRequest completeRequest =
                 CompleteMultipartUploadRequest.builder()
                     .bucket(bucketName)
                     .key(key)
                     .uploadId(uploadId)
-                    .multipartUpload(
-                        CompletedMultipartUpload.builder().parts(completedParts).build())
+                    .multipartUpload(CompletedMultipartUpload.builder().parts(sortedParts).build())
                     .build();
 
-            s3Client.completeMultipartUpload(completeRequest);
+            s3AsyncClient.completeMultipartUpload(completeRequest).join();
+
+            // Record S3 write metrics for multipart upload completion
+            if (metrics != null) {
+              metrics.recordS3Write();
+            }
           } else if (uploadId != null) {
             AbortMultipartUploadRequest abortRequest =
                 AbortMultipartUploadRequest.builder()
@@ -1095,17 +1249,19 @@
                     .uploadId(uploadId)
                     .build();
 
-            s3Client.abortMultipartUpload(abortRequest);
+            s3AsyncClient.abortMultipartUpload(abortRequest).join();
           }
         } catch (Exception e) {
           if (uploadId != null) {
             try {
-              s3Client.abortMultipartUpload(
-                  AbortMultipartUploadRequest.builder()
-                      .bucket(bucketName)
-                      .key(key)
-                      .uploadId(uploadId)
-                      .build());
+              s3AsyncClient
+                  .abortMultipartUpload(
+                      AbortMultipartUploadRequest.builder()
+                          .bucket(bucketName)
+                          .key(key)
+                          .uploadId(uploadId)
+                          .build())
+                  .join();
             } catch (Exception abortEx) {
              LOG.error("Failed to abort multipart upload", abortEx);
            }
@@ -1124,6 +1280,7 @@
 
       byte[] data = buffer.toByteArray();
       buffer.reset();
+      final int currentPartNumber = partNumber++;
 
       try {
         UploadPartRequest uploadRequest =
@@ -1131,20 +1288,46 @@
                 .bucket(bucketName)
                 .key(key)
                 .uploadId(uploadId)
-                .partNumber(partNumber)
+                .partNumber(currentPartNumber)
                 .build();
 
-        UploadPartResponse response =
-            s3Client.uploadPart(uploadRequest, RequestBody.fromBytes(data));
+        // Upload asynchronously without blocking
+        CompletableFuture<CompletedPart> uploadFuture =
+            s3AsyncClient
+                .uploadPart(uploadRequest, AsyncRequestBody.fromBytes(data))
+                .thenApply(
+                    response ->
+                        CompletedPart.builder()
+                            .partNumber(currentPartNumber)
+                            .eTag(response.eTag())
+                            .build());
 
-        completedParts.add(
-            CompletedPart.builder().partNumber(partNumber).eTag(response.eTag()).build());
+        // Track the pending upload; completed parts are collected in close()
+        pendingUploads.add(uploadFuture);
 
-        partNumber++;
+        // Log failures as soon as they happen; close() surfaces them when joining
+        uploadFuture.whenComplete(
+            (part, throwable) -> {
+              if (throwable != null) {
+                LOG.error("Failed to upload part {}", currentPartNumber, throwable);
+              }
+            });
       } catch (Exception e) {
         throw new IOException("Failed to upload part " + currentPartNumber, e);
       }
     }
+
+    /**
+     * Get the number of still-pending part uploads for monitoring
+     */
+    public int getPendingUploadsCount() {
+      return (int) pendingUploads.stream().filter(f -> !f.isDone()).count();
+    }
   }
 
   /**
@@ -1154,7 +1337,7 @@
     String key = pipelineFQN + "/" + runId;
     activeListeners.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(listener);
 
-    CircularBuffer recentLogs = recentLogsCache.getIfPresent(key);
+    SimpleLogBuffer recentLogs = recentLogsCache.getIfPresent(key);
     if (recentLogs != null) {
       List<String> recent = recentLogs.getRecentLines(100);
       for (String line : recent) {
@@ -1198,7 +1381,7 @@
    */
   public List<String> getRecentLogs(String pipelineFQN, UUID runId, int lines) {
     String key = pipelineFQN + "/" + runId;
-    CircularBuffer buffer = recentLogsCache.getIfPresent(key);
+    SimpleLogBuffer buffer = recentLogsCache.getIfPresent(key);
     if (buffer != null) {
       return buffer.getRecentLines(lines);
     }
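CompleteMultipartUpload requires every part's number and ETag in ascending order, and S3 rejects parts smaller than 5 MiB except the last one, which is why the stream buffers to PART_SIZE before uploading. A minimal sketch of the collect-join-sort-complete sequence used above (bucket, key, and part data are placeholders):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.*;

public class MultipartSketch {
  static void upload(S3AsyncClient s3, String bucket, String key, List<byte[]> parts) {
    String uploadId =
        s3.createMultipartUpload(
                CreateMultipartUploadRequest.builder().bucket(bucket).key(key).build())
            .join()
            .uploadId();

    // Kick off every part without blocking; remember which future is which part.
    List<CompletableFuture<CompletedPart>> uploads = new ArrayList<>();
    for (int i = 0; i < parts.size(); i++) {
      int partNumber = i + 1; // part numbers are 1-based
      uploads.add(
          s3.uploadPart(
                  UploadPartRequest.builder()
                      .bucket(bucket)
                      .key(key)
                      .uploadId(uploadId)
                      .partNumber(partNumber)
                      .build(),
                  AsyncRequestBody.fromBytes(parts.get(i)))
              .thenApply(
                  r -> CompletedPart.builder().partNumber(partNumber).eTag(r.eTag()).build()));
    }

    // Wait for all parts, then complete with the parts sorted by part number --
    // out-of-order part lists are rejected.
    CompletableFuture.allOf(uploads.toArray(new CompletableFuture[0])).join();
    List<CompletedPart> completed =
        uploads.stream()
            .map(CompletableFuture::join)
            .sorted(Comparator.comparingInt(CompletedPart::partNumber))
            .collect(Collectors.toList());
    s3.completeMultipartUpload(
            CompleteMultipartUploadRequest.builder()
                .bucket(bucket)
                .key(key)
                .uploadId(uploadId)
                .multipartUpload(CompletedMultipartUpload.builder().parts(completed).build())
                .build())
        .join();
  }
}
```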
@@ -1206,41 +1389,151 @@
   }
 
   /**
-   * Simple circular buffer for keeping recent logs in memory
+   * Get logs for active streams: try the S3 partial file first, then fall back to the memory
+   * cache. This provides the best experience: processed logs when available, recent logs when not.
    */
-  private static class CircularBuffer {
-    private final String[] buffer;
-    private final AtomicInteger writeIndex = new AtomicInteger(0);
-    private final AtomicInteger size = new AtomicInteger(0);
-
-    CircularBuffer(int capacity) {
-      this.buffer = new String[capacity];
-    }
-
-    synchronized void append(String logLine) {
-      String[] lines = logLine.split("\n");
-      for (String line : lines) {
-        int index = writeIndex.getAndIncrement() % buffer.length;
-        buffer[index] = line;
-        if (size.get() < buffer.length) {
-          size.incrementAndGet();
-        }
-      }
-    }
-
-    synchronized List<String> getRecentLines(int count) {
-      List<String> result = new ArrayList<>();
-      int currentSize = size.get();
-      int start = Math.max(0, currentSize - count);
-
-      for (int i = start; i < currentSize && result.size() < count; i++) {
-        int index = i % buffer.length;
-        if (buffer[index] != null) {
-          result.add(buffer[index]);
-        }
-      }
-      return result;
-    }
-  }
+  private Map<String, Object> getCombinedLogsForActiveStream(
+      String pipelineFQN, UUID runId, String afterCursor, int limit) throws IOException {
+    Map<String, Object> result = new HashMap<>();
+    List<String> allLines = new ArrayList<>();
+    boolean foundPartialFile = false;
+
+    // First, try to read from the processed/partial logs file in S3
+    String partialKey = buildPartialS3Key(pipelineFQN, runId);
+    try {
+      GetObjectRequest getRequest =
+          GetObjectRequest.builder().bucket(bucketName).key(partialKey).build();
+      try (InputStream objectContent = s3Client.getObject(getRequest);
+          BufferedReader reader =
+              new BufferedReader(new InputStreamReader(objectContent, StandardCharsets.UTF_8))) {
+
+        String line;
+        while ((line = reader.readLine()) != null) {
+          allLines.add(line);
+        }
+        foundPartialFile = true;
+        LOG.debug(
+            "Read {} processed lines from partial S3 file for {}/{}",
+            allLines.size(),
+            pipelineFQN,
+            runId);
+      }
+    } catch (NoSuchKeyException e) {
+      // No partial file exists yet, which is normal for new streams
+      LOG.debug(
+          "No processed logs file found for {}/{}, will use memory cache", pipelineFQN, runId);
+    } catch (Exception e) {
+      LOG.warn(
+          "Failed to read processed logs from S3 for {}/{}: {}, will use memory cache",
+          pipelineFQN,
+          runId,
+          e.getMessage());
+    }
+
+    // If there is no S3 partial file, fall back to the memory cache
+    if (!foundPartialFile) {
+      String streamKey = pipelineFQN + "/" + runId;
+      SimpleLogBuffer buffer = recentLogsCache.getIfPresent(streamKey);
+      if (buffer != null) {
+        if (afterCursor != null && !afterCursor.isEmpty()) {
+          // For pagination, use all lines to support proper cursor-based navigation
+          allLines.addAll(buffer.getAllLines());
+        } else {
+          // For live logs (no cursor), show recent lines for better performance
+          allLines.addAll(buffer.getRecentLines(limit));
+        }
+        LOG.debug(
+            "Using {} lines from memory cache for active pipeline {}/{}",
+            allLines.size(),
+            pipelineFQN,
+            runId);
+      }
+    }
+
+    // Apply pagination if needed
+    int startIndex = 0;
+    if (afterCursor != null && !afterCursor.isEmpty()) {
+      try {
+        startIndex = Integer.parseInt(afterCursor);
+      } catch (NumberFormatException e) {
+        LOG.warn("Invalid cursor format: {}", afterCursor);
+      }
+    }
+
+    int endIndex = Math.min(startIndex + limit, allLines.size());
+    List<String> resultLines =
+        startIndex < allLines.size()
+            ? allLines.subList(startIndex, endIndex)
+            : Collections.emptyList();
+
+    result.put("logs", String.join("\n", resultLines));
+    result.put("after", endIndex < allLines.size() ? String.valueOf(endIndex) : null);
+    result.put("total", (long) allLines.size());
+
+    return result;
+  }
+
+  /**
+   * Build the S3 key for partial logs (completed parts while the stream is still active)
+   */
+  private String buildPartialS3Key(String pipelineFQN, UUID runId) {
+    String sanitizedFQN = pipelineFQN.replaceAll("[^a-zA-Z0-9_-]", "_");
+    String cleanPrefix =
+        prefix != null && prefix.endsWith("/")
+            ? prefix.substring(0, prefix.length() - 1)
+            : (prefix != null ? prefix : "");
+    return String.format("%s/%s/%s/partial.txt", cleanPrefix, sanitizedFQN, runId);
+  }
prefix : ""); + return String.format("%s/%s/%s/partial.txt", cleanPrefix, sanitizedFQN, runId); + } + + /** + * Simple append-only log buffer that maintains chronological order + */ + private static class SimpleLogBuffer { + private static final int MAX_LINE_LENGTH = 10 * 1024; // 10KB per line max + private final int maxCapacity; + private final List lines = Collections.synchronizedList(new ArrayList<>()); + + SimpleLogBuffer(int maxCapacity) { + this.maxCapacity = maxCapacity; + } + + synchronized void append(String logContent) { + String[] newLines = logContent.split("\n"); + for (String line : newLines) { + // Truncate individual lines to prevent memory exhaustion + String truncatedLine = line; + if (line.length() > MAX_LINE_LENGTH) { + truncatedLine = + line.substring(0, MAX_LINE_LENGTH) + + "... [truncated " + + (line.length() - MAX_LINE_LENGTH) + + " chars]"; + } + + lines.add(truncatedLine); + + // Keep only the most recent lines to prevent unlimited memory growth + if (lines.size() > maxCapacity) { + // Remove oldest lines to stay within capacity + int excess = lines.size() - maxCapacity; + for (int i = 0; i < excess; i++) { + lines.remove(0); + } } } } synchronized List getRecentLines(int count) { - List result = new ArrayList<>(); - int currentSize = size.get(); - int start = Math.max(0, currentSize - count); - - for (int i = start; i < currentSize && result.size() < count; i++) { - int index = i % buffer.length; - if (buffer[index] != null) { - result.add(buffer[index]); - } + if (lines.isEmpty()) { + return Collections.emptyList(); } - return result; + int start = Math.max(0, lines.size() - count); + return new ArrayList<>(lines.subList(start, lines.size())); + } + + synchronized List getAllLines() { + return new ArrayList<>(lines); } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/StreamableLogsMetrics.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/StreamableLogsMetrics.java index c62e37f5e4e..f6e48fffffa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/StreamableLogsMetrics.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/StreamableLogsMetrics.java @@ -49,6 +49,7 @@ public class StreamableLogsMetrics { private final Timer s3OperationLatency; private final AtomicInteger activeMultipartUploads; private final AtomicLong s3WriteThroughputBytes; + private final AtomicInteger pendingPartUploads; private final Counter sessionCreated; private final Counter sessionReused; @@ -162,6 +163,12 @@ public class StreamableLogsMetrics { .description("Current S3 write throughput in bytes/sec") .register(meterRegistry); + // Add gauge for pending part uploads + this.pendingPartUploads = new AtomicInteger(0); + Gauge.builder("om_s3_pending_part_uploads", pendingPartUploads, AtomicInteger::get) + .description("Number of pending S3 multipart part uploads") + .register(meterRegistry); + this.sessionCreated = Counter.builder("om_streamable_logs_sessions_created_total") .description("Total number of new sessions created") @@ -298,6 +305,18 @@ public class StreamableLogsMetrics { s3WriteThroughputBytes.set(bytesPerSecond); } + public void updatePendingPartUploads(int count) { + pendingPartUploads.set(count); + } + + public void incrementPendingPartUploads() { + pendingPartUploads.incrementAndGet(); + } + + public void decrementPendingPartUploads() { + pendingPartUploads.decrementAndGet(); + } + public void recordSessionCreated() { sessionCreated.increment(); 
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
index 43dbcf5c8c9..809d5157a99 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
@@ -1024,20 +1024,61 @@ public class IngestionPipelineResource
       return Response.status(200).entity("Pipeline Client Disabled").build();
     }
     IngestionPipeline ingestionPipeline =
-        getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
+        getInternal(
+            uriInfo,
+            securityContext,
+            id,
+            "pipelineStatuses,ingestionRunner",
+            Include.NON_DELETED);
     String filename =
         String.format(
             "ingestion_logs_%s_%d.txt", ingestionPipeline.getName(), System.currentTimeMillis());
 
+    boolean useStreamableLogs =
+        Boolean.TRUE.equals(ingestionPipeline.getEnableStreamableLogs())
+            || (ingestionPipeline.getIngestionRunner() != null
+                && repository.isIngestionRunnerStreamableLogsEnabled(
+                    ingestionPipeline.getIngestionRunner()));
+
     StreamingOutput streamingOutput =
         output -> {
           String cursor = null;
           boolean hasMoreData = true;
 
           while (hasMoreData) {
-            Map<String, String> logChunk =
-                pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, cursor);
+            Map<String, String> logChunk;
+
+            if (useStreamableLogs) {
+              // Get logs from the repository's log storage, picking up the last runId;
+              // guard against a pipeline that has never run (no statuses yet)
+              String runId =
+                  ingestionPipeline.getPipelineStatuses() != null
+                      ? ingestionPipeline.getPipelineStatuses().getRunId()
+                      : null;
+              if (CommonUtil.nullOrEmpty(runId)) {
+                throw new PipelineServiceClientException(
+                    "No runId found for the last ingestion pipeline run");
+              }
+
+              Map<String, Object> lastIngestionLogsMap =
+                  repository.getLogs(
+                      ingestionPipeline.getFullyQualifiedName(),
+                      UUID.fromString(runId),
+                      cursor,
+                      1000);
+              logChunk =
+                  lastIngestionLogsMap.entrySet().stream()
+                      .filter(entry -> entry.getValue() != null)
+                      .collect(
+                          Collectors.toMap(
+                              Map.Entry::getKey, entry -> entry.getValue().toString()));
+              Object logs = logChunk.remove("logs");
+              if (logs != null) {
+                logChunk.put(
+                    TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString()),
+                    logs.toString());
+              }
+            } else {
+              // Get the logs from the pipeline service client
+              logChunk = pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, cursor);
+            }
 
             if (logChunk == null || logChunk.isEmpty()) {
               break;
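The streaming download above pages through storage with an integer cursor; the tail of the loop (cursor advance and termination) falls outside this hunk, so here is a hedged, self-contained sketch of the whole consumption pattern. LogStore and its method shape are stand-ins, the "logs"/"after" key names mirror the repository API used above, and the import may be javax.ws.rs on older Dropwizard lines:

```java
import jakarta.ws.rs.core.StreamingOutput;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;

public class LogDownloadSketch {
  interface LogStore { // stand-in for the repository's log storage
    Map<String, Object> getLogs(String fqn, UUID runId, String after, int limit);
  }

  static StreamingOutput stream(LogStore store, String fqn, UUID runId) {
    return output -> {
      String cursor = null;
      while (true) {
        Map<String, Object> chunk = store.getLogs(fqn, runId, cursor, 1000);
        Object logs = chunk.get("logs");
        if (logs == null || logs.toString().isEmpty()) {
          break; // nothing more to write
        }
        output.write(logs.toString().getBytes(StandardCharsets.UTF_8));
        Object after = chunk.get("after");
        if (after == null) {
          break; // storage reports no further pages
        }
        cursor = after.toString();
      }
    };
  }
}
```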
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java
index 455476b09f3..662b662bb3a 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java
@@ -20,6 +20,7 @@ import static org.mockito.Mockito.*;
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -28,9 +29,11 @@ import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
 import org.openmetadata.schema.security.credentials.AWSCredentials;
-import software.amazon.awssdk.core.ResponseBytes;
-import software.amazon.awssdk.core.sync.RequestBody;
-import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.core.ResponseInputStream;
+import software.amazon.awssdk.core.async.AsyncRequestBody;
+import software.amazon.awssdk.http.AbortableInputStream;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3AsyncClientBuilder;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.S3ClientBuilder;
 import software.amazon.awssdk.services.s3.model.*;
@@ -39,6 +42,7 @@ import software.amazon.awssdk.services.s3.model.*;
 public class S3LogStorageTest {
 
   @Mock private S3Client mockS3Client;
+  @Mock private S3AsyncClient mockS3AsyncClient;
 
   private S3LogStorage s3LogStorage;
   private LogStorageConfiguration testConfig;
@@ -60,14 +64,22 @@ public class S3LogStorageTest {
             .withStorageClass(LogStorageConfiguration.StorageClass.STANDARD_IA)
             .withExpirationDays(30);
 
-    // Mock S3Client builder
-    try (MockedStatic<S3Client> s3ClientMock = mockStatic(S3Client.class)) {
+    // Mock the S3Client and S3AsyncClient builders
+    try (MockedStatic<S3Client> s3ClientMock = mockStatic(S3Client.class);
+        MockedStatic<S3AsyncClient> s3AsyncClientMock = mockStatic(S3AsyncClient.class)) {
+
       S3ClientBuilder mockBuilder = mock(S3ClientBuilder.class);
       when(S3Client.builder()).thenReturn(mockBuilder);
       when(mockBuilder.region(any())).thenReturn(mockBuilder);
       when(mockBuilder.credentialsProvider(any())).thenReturn(mockBuilder);
       when(mockBuilder.build()).thenReturn(mockS3Client);
 
+      S3AsyncClientBuilder mockAsyncBuilder = mock(S3AsyncClientBuilder.class);
+      when(S3AsyncClient.builder()).thenReturn(mockAsyncBuilder);
+      when(mockAsyncBuilder.region(any())).thenReturn(mockAsyncBuilder);
+      when(mockAsyncBuilder.credentialsProvider(any())).thenReturn(mockAsyncBuilder);
+      when(mockAsyncBuilder.build()).thenReturn(mockS3AsyncClient);
+
       // Initialize S3LogStorage
       s3LogStorage = new S3LogStorage();
       Map<String, Object> config = new HashMap<>();
@@ -81,6 +93,13 @@
     }
   }
 
+  // Helper method to create a ResponseInputStream for mocking
+  private ResponseInputStream<GetObjectResponse> createResponseInputStream(String content) {
+    InputStream inputStream = new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8));
+    GetObjectResponse response = GetObjectResponse.builder().build();
+    return new ResponseInputStream<>(response, AbortableInputStream.create(inputStream));
+  }
+
   @Test
   void testS3LogStorageInitialization() {
     assertNotNull(s3LogStorage);
@@ -89,31 +108,43 @@
 
   @Test
   void testAppendLogs() throws IOException {
-    String existingContent = "Existing log content\n";
     String newContent = "New log content\n";
     String expectedKey =
         String.format("%s/%s/%s/logs.txt", testPrefix, testPipelineFQN, testRunId);
 
-    // Mock getting existing content
-    when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
-        .thenReturn(
-            ResponseBytes.fromByteArray(
-                GetObjectResponse.builder().build(),
-                existingContent.getBytes(StandardCharsets.UTF_8)));
+    // Mock async multipart upload initialization
+    when(mockS3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                CreateMultipartUploadResponse.builder().uploadId("test-upload-id").build()));
 
-    // Mock putting updated content
-    when(mockS3Client.putObject(any(PutObjectRequest.class), any(RequestBody.class)))
-        .thenReturn(PutObjectResponse.builder().build());
+    // Mock async upload part
+    when(mockS3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                UploadPartResponse.builder().eTag("test-etag").build()));
+
+    // Mock async complete multipart upload
+    when(mockS3AsyncClient.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(CompleteMultipartUploadResponse.builder().build()));
+
+    // Mock async putObject for marking the run as active
+    when(mockS3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)))
+        .thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build()));
 
     // Test appending logs
     assertDoesNotThrow(() -> s3LogStorage.appendLogs(testPipelineFQN, testRunId, newContent));
 
-    // Verify S3 operations
-    verify(mockS3Client)
-        .getObjectAsBytes(
-            argThat(
-                (GetObjectRequest request) ->
-                    request.bucket().equals(testBucket) && request.key().equals(expectedKey)));
-    verify(mockS3Client).putObject(any(PutObjectRequest.class), any(RequestBody.class));
+    // Verify the multipart upload was initiated
+    verify(mockS3AsyncClient, times(1))
+        .createMultipartUpload(any(CreateMultipartUploadRequest.class));
+
+    // Flush to complete the multipart upload
+    s3LogStorage.flush();
+
+    // Verify the multipart upload was completed
+    verify(mockS3AsyncClient, times(1))
+        .completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
   }
 
   @Test
@@ -121,19 +152,25 @@
     String newContent = "First log content\n";
     String expectedKey =
         String.format("%s/%s/%s/logs.txt", testPrefix, testPipelineFQN, testRunId);
 
-    // Mock getting non-existent content
-    when(mockS3Client.getObjectAsBytes(any(GetObjectRequest.class)))
-        .thenThrow(NoSuchKeyException.builder().build());
+    // Mock async multipart upload for a new file
+    when(mockS3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                CreateMultipartUploadResponse.builder().uploadId("test-upload-id").build()));
 
-    // Mock putting new content
-    when(mockS3Client.putObject(any(PutObjectRequest.class), any(RequestBody.class)))
-        .thenReturn(PutObjectResponse.builder().build());
+    // Mock async putObject for marking the run as active
+    when(mockS3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class)))
+        .thenReturn(CompletableFuture.completedFuture(PutObjectResponse.builder().build()));
+
+    // Note: uploadPart and completeMultipartUpload won't be called until flush/close
+    // since the content is too small (< 5MB)
 
     // Test appending logs to a new file
     assertDoesNotThrow(() -> s3LogStorage.appendLogs(testPipelineFQN, testRunId, newContent));
 
-    // Verify S3 operations
-    verify(mockS3Client).putObject(any(PutObjectRequest.class), any(RequestBody.class));
+    // Verify the multipart upload was initiated
+    verify(mockS3AsyncClient, times(1))
+        .createMultipartUpload(any(CreateMultipartUploadRequest.class));
   }
 
   @Test
@@ -145,11 +182,9 @@
     when(mockS3Client.headObject(any(HeadObjectRequest.class)))
        .thenReturn(HeadObjectResponse.builder().contentLength((long) logContent.length()).build());
 
-    // Mock get object
-    InputStream contentStream =
-        new ByteArrayInputStream(logContent.getBytes(StandardCharsets.UTF_8));
-    when(mockS3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
-        .thenReturn(contentStream);
+    // Mock get object - use ResponseInputStream
+    when(mockS3Client.getObject(any(GetObjectRequest.class)))
+        .thenReturn(createResponseInputStream(logContent));
 
     // Test getting logs
     Map<String, Object> result = s3LogStorage.getLogs(testPipelineFQN, testRunId, null, 2);
@@ -168,11 +203,9 @@
     when(mockS3Client.headObject(any(HeadObjectRequest.class)))
        .thenReturn(HeadObjectResponse.builder().contentLength((long) logContent.length()).build());
 
-    // Mock get object
-    InputStream contentStream =
-        new ByteArrayInputStream(logContent.getBytes(StandardCharsets.UTF_8));
-    when(mockS3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
-        .thenReturn(contentStream);
+    // Mock get object - use ResponseInputStream
+    when(mockS3Client.getObject(any(GetObjectRequest.class)))
+        .thenReturn(createResponseInputStream(logContent));
 
     // Test getting logs with a cursor
     Map<String, Object> result = s3LogStorage.getLogs(testPipelineFQN, testRunId, "2", 2);
@@ -235,11 +268,7 @@
     assertDoesNotThrow(() -> s3LogStorage.deleteLogs(testPipelineFQN, testRunId));
 
     // Verify delete was called
-    verify(mockS3Client)
-        .deleteObject(
-            argThat(
-                (DeleteObjectRequest request) ->
-                    request.bucket().equals(testBucket) && request.key().equals(expectedKey)));
+    // Both the main logs object and the partial logs object are deleted
+    verify(mockS3Client, times(2)).deleteObject(any(DeleteObjectRequest.class));
   }
 
   @Test
@@ -264,12 +293,10 @@
   @Test
   void testGetLogInputStream() throws IOException {
     String logContent = "Stream content";
-    InputStream expectedStream =
-        new ByteArrayInputStream(logContent.getBytes(StandardCharsets.UTF_8));
 
-    // Mock get object
-    when(mockS3Client.getObject(any(GetObjectRequest.class), any(ResponseTransformer.class)))
-        .thenReturn(expectedStream);
+    // Mock get object - use ResponseInputStream
+    when(mockS3Client.getObject(any(GetObjectRequest.class)))
+        .thenReturn(createResponseInputStream(logContent));
 
     // Test getting the input stream
     InputStream stream = s3LogStorage.getLogInputStream(testPipelineFQN, testRunId);
@@ -280,6 +307,21 @@
 
   @Test
   void testGetLogOutputStream() throws IOException {
+    // Mock async multipart upload operations
+    when(mockS3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                CreateMultipartUploadResponse.builder().uploadId("test-upload-id").build()));
+
+    when(mockS3AsyncClient.uploadPart(any(UploadPartRequest.class), any(AsyncRequestBody.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                UploadPartResponse.builder().eTag("test-etag").build()));
+
+    when(mockS3AsyncClient.completeMultipartUpload(any(CompleteMultipartUploadRequest.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(CompleteMultipartUploadResponse.builder().build()));
+
     // Test getting the output stream
     OutputStream stream = s3LogStorage.getLogOutputStream(testPipelineFQN, testRunId);
 
@@ -289,17 +331,32 @@
     // Write some data
     stream.write("Test output".getBytes(StandardCharsets.UTF_8));
     stream.close();
+
+    // Verify the multipart upload was initiated and completed
+    verify(mockS3AsyncClient).createMultipartUpload(any(CreateMultipartUploadRequest.class));
+    verify(mockS3AsyncClient).completeMultipartUpload(any(CompleteMultipartUploadRequest.class));
   }
 
   @Test
   void testClose() throws IOException {
+    // Mock async multipart upload operations
+    when(mockS3AsyncClient.createMultipartUpload(any(CreateMultipartUploadRequest.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(
+                CreateMultipartUploadResponse.builder().uploadId("test-upload-id").build()));
+
+    when(mockS3AsyncClient.abortMultipartUpload(any(AbortMultipartUploadRequest.class)))
+        .thenReturn(
+            CompletableFuture.completedFuture(AbortMultipartUploadResponse.builder().build()));
+
     // Create and add a mock stream
     OutputStream stream = s3LogStorage.getLogOutputStream(testPipelineFQN, testRunId);
 
     // Test closing
     assertDoesNotThrow(() -> s3LogStorage.close());
 
-    // Verify S3 client was closed
+    // Verify both S3 clients were closed
     verify(mockS3Client).close();
+    verify(mockS3AsyncClient).close();
   }
 }
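Callers finishing a single run can use the new scoped overload rather than the global flush(). A hypothetical caller-side sketch (the storage instance and method names mirror the API added in this PR; the helper itself is illustrative):

```java
import java.io.IOException;
import java.util.UUID;
import org.openmetadata.service.logstorage.S3LogStorage;

public class ScopedFlushSketch {
  // Assumes an already-initialized S3LogStorage instance.
  static void finishRun(S3LogStorage storage, String pipelineFQN, UUID runId) throws IOException {
    storage.appendLogs(pipelineFQN, runId, "Ingestion finished\n");
    // Finalizes this run's multipart upload (writing a last partial snapshot first)
    // without touching streams that belong to other concurrently running pipelines.
    storage.flush(pipelineFQN, runId);
  }
}
```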