Fix Stream logger url (#23491)

This commit is contained in:
Suman Maharana 2025-09-23 14:35:14 +05:30 committed by GitHub
parent 3771149fb5
commit 1c710ef5e3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 67 additions and 13 deletions

View File

@@ -359,6 +359,18 @@ pipelineServiceClientConfiguration:
verifySSL: ${PIPELINE_SERVICE_CLIENT_VERIFY_SSL:-"no-ssl"} # Possible values are "no-ssl", "ignore", "validate"
sslConfig:
certificatePath: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Local path for the Pipeline Service Client
logStorageConfiguration:
type: ${PIPELINE_SERVICE_CLIENT_LOG_TYPE:-"default"} # Possible values are "default", "s3"
# if type is s3, provide the following configuration
bucketName: ${PIPELINE_SERVICE_CLIENT_LOG_BUCKET_NAME:-""}
# optional path within the bucket to store the logs
prefix: ${PIPELINE_SERVICE_CLIENT_LOG_PREFIX:-""}
region: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-""}
awsConfig:
awsAccessKeyId: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_ACCESS_KEY_ID:-""}
awsSecretAccessKey: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SECRET_ACCESS_KEY:-""}
awsRegion: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-""}
awsSessionToken: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SESSION_TOKEN:-""}
# Secrets Manager Loader: specify to the Ingestion Framework how to load the SM credentials from its env
# Supported: noop, airflow, env

View File

@@ -26,6 +26,7 @@ from typing import Optional
from uuid import UUID
from metadata.ingestion.ometa.client import REST
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.constants import UTF_8
from metadata.utils.logger import ometa_logger
@@ -63,8 +64,11 @@ class OMetaLogsMixin:
bool: True if logs were sent successfully, False otherwise
"""
try:
# Extract the UUID string value from the object if it has a .root attribute
# Build the API endpoint
url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}"
url = (
f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}"
)
# Prepare log batch data matching Java LogBatch structure
log_batch = {
@@ -86,11 +90,17 @@ class OMetaLogsMixin:
url,
data=json.dumps(log_batch),
)
if response.status_code == 200:
# The REST client returns None for successful requests with empty response body (HTTP 200 with no content)
# It also returns None on errors, but those are caught by the exception handler
# Since we're not in the exception handler, None here means successful upload with empty response
if response is None or response:
logger.debug(
f"Successfully sent {log_batch['lineCount']} log lines for pipeline {pipeline_fqn}"
)
return True
logger.warning(f"Unexpected response from log upload: {response}")
return False
except Exception as e:
@@ -160,7 +170,7 @@ class OMetaLogsMixin:
# Use the metadata client's REST interface directly
self.client.post(
f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}",
f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}",
data=json.dumps(log_batch),
)
@@ -196,8 +206,9 @@ class OMetaLogsMixin:
Optional[str]: Stream session ID if applicable, None otherwise
"""
try:
# Initialize log stream with the server
url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}/init"
url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}/init"
init_data = {
"connectorId": f"{socket.gethostname()}-{os.getpid()}",
@@ -242,7 +253,8 @@ class OMetaLogsMixin:
bool: True if stream was closed successfully, False otherwise
"""
try:
url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}/close"
url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}/close"
close_data = {
"connectorId": f"{socket.gethostname()}-{os.getpid()}",
@@ -287,7 +299,10 @@ class OMetaLogsMixin:
Optional[str]: Log content if available, None otherwise
"""
try:
url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{run_id}"
url = (
f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}"
)
params = {
"offset": offset,

View File

@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
@@ -50,6 +51,7 @@ import org.openmetadata.schema.security.credentials.AWSCredentials;
import org.openmetadata.service.monitoring.StreamableLogsMetrics;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
@@ -156,18 +158,25 @@ public class S3LogStorage implements LogStorageInterface {
S3ClientBuilder s3Builder = S3Client.builder().region(Region.of(s3Config.getRegion()));
String customEndpoint = (String) config.get("endpoint");
URI customEndpoint = s3Config.getAwsConfig().getEndPointURL();
if (customEndpoint != null) {
s3Builder.endpointOverride(java.net.URI.create(customEndpoint));
s3Builder.endpointOverride(java.net.URI.create(customEndpoint.toString()));
s3Builder.forcePathStyle(true); // Required for MinIO
this.isCustomEndpoint = true;
}
String accessKey = (String) config.get("accessKey");
String secretKey = (String) config.get("secretKey");
String accessKey = s3Config.getAwsConfig().getAwsAccessKeyId();
String secretKey = s3Config.getAwsConfig().getAwsSecretAccessKey();
String sessionToken = s3Config.getAwsConfig().getAwsSessionToken();
if (accessKey != null && secretKey != null) {
s3Builder.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
if (sessionToken != null) {
s3Builder.credentialsProvider(
StaticCredentialsProvider.create(
AwsSessionCredentials.create(accessKey, secretKey, sessionToken)));
} else {
s3Builder.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
}
} else {
AwsCredentialsProvider credentialsProvider =
getCredentialsProvider(s3Config.getAwsConfig());

View File

@@ -25,6 +25,7 @@ import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterAll;
@@ -75,6 +76,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
private static IngestionPipeline testPipeline;
private static S3LogStorage s3LogStorage;
private static StreamableLogsMetrics metrics;
private static String minioEndpoint;
private static boolean initialized = false;
@BeforeEach
@@ -109,7 +111,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
return;
}
TestUtils.simulateWork(2000);
String minioEndpoint =
minioEndpoint =
String.format("http://%s:%d", minioContainer.getHost(), minioContainer.getMappedPort(9000));
LOG.info("Connecting to MinIO at: {}", minioEndpoint);
@@ -247,6 +249,13 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
@Test
void testMaxConcurrentStreamsLimit() throws Exception {
setupTestData();
// Initialize S3LogStorage with MinIO configuration
org.openmetadata.schema.security.credentials.AWSCredentials awsCreds =
new org.openmetadata.schema.security.credentials.AWSCredentials()
.withAwsAccessKeyId(MINIO_ACCESS_KEY)
.withAwsSecretAccessKey(MINIO_SECRET_KEY)
.withAwsRegion("us-east-1")
.withEndPointURL(new URI(minioEndpoint));
// Create a storage with low max concurrent streams for testing
LogStorageConfiguration limitedConfig =
@@ -254,6 +263,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
.withType(LogStorageConfiguration.Type.S_3)
.withBucketName(TEST_BUCKET)
.withRegion("us-east-1")
.withAwsConfig(awsCreds)
.withPrefix("limited-test")
.withMaxConcurrentStreams(2); // Very low limit

View File

@@ -65,6 +65,13 @@ public class IngestionPipelineLogStreamingTest extends OpenMetadataApplicationTe
if (!bucketExists) {
minioClient.makeBucket(MakeBucketArgs.builder().bucket(TEST_BUCKET).build());
}
// Initialize S3LogStorage with MinIO configuration
org.openmetadata.schema.security.credentials.AWSCredentials awsCreds =
new org.openmetadata.schema.security.credentials.AWSCredentials()
.withAwsAccessKeyId("minioadmin")
.withAwsSecretAccessKey("minioadmin")
.withAwsRegion("us-east-1")
.withEndPointURL(java.net.URI.create(endpoint));
LogStorageConfiguration config = new LogStorageConfiguration();
config.setType(LogStorageConfiguration.Type.S_3);
@@ -74,6 +81,7 @@ public class IngestionPipelineLogStreamingTest extends OpenMetadataApplicationTe
config.setMaxConcurrentStreams(10);
config.setStreamTimeoutMinutes(5);
config.setAsyncBufferSizeMB(5);
config.setAwsConfig(awsCreds);
// Create config map as expected by S3LogStorage
Map<String, Object> configMap = new HashMap<>();