From 1c710ef5e35ca82ca1d8447acd776aff17d17c23 Mon Sep 17 00:00:00 2001 From: Suman Maharana Date: Tue, 23 Sep 2025 14:35:14 +0530 Subject: [PATCH] Fix Stream logger url (#23491) --- conf/openmetadata.yaml | 12 +++++++++ .../ingestion/ometa/mixins/logs_mixin.py | 27 ++++++++++++++----- .../service/logstorage/S3LogStorage.java | 21 ++++++++++----- .../IngestionPipelineLogStorageTest.java | 12 ++++++++- .../IngestionPipelineLogStreamingTest.java | 8 ++++++ 5 files changed, 67 insertions(+), 13 deletions(-) diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index 901116b0040..d416512847b 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -359,6 +359,18 @@ pipelineServiceClientConfiguration: verifySSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} # Possible values are "no-ssl", "ignore", "validate" sslConfig: certificatePath: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Local path for the Pipeline Service Client + logStorageConfiguration: + type: ${PIPELINE_SERVICE_CLIENT_LOG_TYPE:-"default"} # Possible values are "default", "s3" + # if type is s3, provide the following configuration + bucketName: ${PIPELINE_SERVICE_CLIENT_LOG_BUCKET_NAME:-""} + # optional path within the bucket to store the logs + prefix: ${PIPELINE_SERVICE_CLIENT_LOG_PREFIX:-""} + region: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-""} + awsConfig: + awsAccessKeyId: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_ACCESS_KEY_ID:-""} + awsSecretAccessKey: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SECRET_ACCESS_KEY:-""} + awsRegion: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-""} + awsSessionToken: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SESSION_TOKEN:-""} # Secrets Manager Loader: specify to the Ingestion Framework how to load the SM credentials from its env # Supported: noop, airflow, env diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py index fa0e6c2d030..46c81f13aaf 100644 --- a/ingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py +++ b/ingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py @@ -26,6 +26,7 @@ from typing import Optional from uuid import UUID from metadata.ingestion.ometa.client import REST +from metadata.ingestion.ometa.utils import model_str from metadata.utils.constants import UTF_8 from metadata.utils.logger import ometa_logger @@ -63,8 +64,11 @@ class OMetaLogsMixin: bool: True if logs were sent successfully, False otherwise """ try: + # Extract the UUID string value from the object if it has a .root attribute # Build the API endpoint - url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}" + url = ( + f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}" + ) # Prepare log batch data matching Java LogBatch structure log_batch = { @@ -86,11 +90,17 @@ class OMetaLogsMixin: url, data=json.dumps(log_batch), ) - if response.status_code == 200: + + # The REST client returns None for successful requests with empty response body (HTTP 200 with no content) + # It also returns None on errors, but those are caught by the exception handler + # Since we're not in the exception handler, None here means successful upload with empty response + if response is None or response: logger.debug( f"Successfully sent {log_batch['lineCount']} log lines for pipeline {pipeline_fqn}" ) return True + + logger.warning(f"Unexpected response from log upload: {response}") return False except Exception as e: @@ -160,7 +170,7 @@ class OMetaLogsMixin: # Use the metadata client's REST interface directly self.client.post( - f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}", + f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}", data=json.dumps(log_batch), ) @@ -196,8 +206,9 @@ class OMetaLogsMixin: Optional[str]: Stream session ID if applicable, None otherwise """ try: + # Initialize log stream with the server - url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}/init" + url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}/init" init_data = { "connectorId": f"{socket.gethostname()}-{os.getpid()}", @@ -242,7 +253,8 @@ class OMetaLogsMixin: bool: True if stream was closed successfully, False otherwise """ try: - url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}/close" + + url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}/close" close_data = { "connectorId": f"{socket.gethostname()}-{os.getpid()}", @@ -287,7 +299,10 @@ class OMetaLogsMixin: Optional[str]: Log content if available, None otherwise """ try: - url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}" + + url = ( + f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}" + ) params = { "offset": offset, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java b/openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java index 280c910f908..7529f8a8cff 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; @@ -50,6 +51,7 @@ import org.openmetadata.schema.security.credentials.AWSCredentials; import org.openmetadata.service.monitoring.StreamableLogsMetrics; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsSessionCredentials; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.core.sync.RequestBody; @@ -156,18 +158,25 @@ public class S3LogStorage implements LogStorageInterface { S3ClientBuilder s3Builder = S3Client.builder().region(Region.of(s3Config.getRegion())); - String customEndpoint = (String) config.get("endpoint"); + URI customEndpoint = s3Config.getAwsConfig().getEndPointURL(); if (customEndpoint != null) { - s3Builder.endpointOverride(java.net.URI.create(customEndpoint)); + s3Builder.endpointOverride(java.net.URI.create(customEndpoint.toString())); s3Builder.forcePathStyle(true); // Required for MinIO this.isCustomEndpoint = true; } - String accessKey = (String) config.get("accessKey"); - String secretKey = (String) config.get("secretKey"); + String accessKey = s3Config.getAwsConfig().getAwsAccessKeyId(); + String secretKey = s3Config.getAwsConfig().getAwsSecretAccessKey(); + String sessionToken = s3Config.getAwsConfig().getAwsSessionToken(); if (accessKey != null && secretKey != null) { - s3Builder.credentialsProvider( - StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))); + if (sessionToken != null) { + s3Builder.credentialsProvider( + StaticCredentialsProvider.create( + AwsSessionCredentials.create(accessKey, secretKey, sessionToken))); + } else { + s3Builder.credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey))); + } } else { AwsCredentialsProvider credentialsProvider = getCredentialsProvider(s3Config.getAwsConfig()); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStorageTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStorageTest.java index 122e2b39252..ed02ba48806 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStorageTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStorageTest.java @@ -25,6 +25,7 @@ import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import java.io.IOException; import java.io.OutputStream; +import java.net.URI; import java.util.*; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; @@ -75,6 +76,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest { private static IngestionPipeline testPipeline; private static S3LogStorage s3LogStorage; private static StreamableLogsMetrics metrics; + private static String minioEndpoint; private static boolean initialized = false; @BeforeEach @@ -109,7 +111,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest { return; } TestUtils.simulateWork(2000); - String minioEndpoint = + minioEndpoint = String.format("http://%s:%d", minioContainer.getHost(), minioContainer.getMappedPort(9000)); LOG.info("Connecting to MinIO at: {}", minioEndpoint); @@ -247,6 +249,13 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest { @Test void testMaxConcurrentStreamsLimit() throws Exception { setupTestData(); + // Initialize S3LogStorage with MinIO configuration + org.openmetadata.schema.security.credentials.AWSCredentials awsCreds = + new org.openmetadata.schema.security.credentials.AWSCredentials() + .withAwsAccessKeyId(MINIO_ACCESS_KEY) + .withAwsSecretAccessKey(MINIO_SECRET_KEY) + .withAwsRegion("us-east-1") + .withEndPointURL(new URI(minioEndpoint)); // Create a storage with low max concurrent streams for testing LogStorageConfiguration limitedConfig = @@ -254,6 +263,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest { .withType(LogStorageConfiguration.Type.S_3) .withBucketName(TEST_BUCKET) .withRegion("us-east-1") + .withAwsConfig(awsCreds) .withPrefix("limited-test") .withMaxConcurrentStreams(2); // Very low limit diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStreamingTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStreamingTest.java index 7a821be901e..4a83b45d4f7 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStreamingTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStreamingTest.java @@ -65,6 +65,13 @@ public class IngestionPipelineLogStreamingTest extends OpenMetadataApplicationTe if (!bucketExists) { minioClient.makeBucket(MakeBucketArgs.builder().bucket(TEST_BUCKET).build()); } + // Initialize S3LogStorage with MinIO configuration + org.openmetadata.schema.security.credentials.AWSCredentials awsCreds = + new org.openmetadata.schema.security.credentials.AWSCredentials() + .withAwsAccessKeyId("minioadmin") + .withAwsSecretAccessKey("minioadmin") + .withAwsRegion("us-east-1") + .withEndPointURL(java.net.URI.create(endpoint)); LogStorageConfiguration config = new LogStorageConfiguration(); config.setType(LogStorageConfiguration.Type.S_3); @@ -74,6 +81,7 @@ public class IngestionPipelineLogStreamingTest extends OpenMetadataApplicationTe config.setMaxConcurrentStreams(10); config.setStreamTimeoutMinutes(5); config.setAsyncBufferSizeMB(5); + config.setAwsConfig(awsCreds); // Create config map as expected by S3LogStorage Map configMap = new HashMap<>();