MINOR - Fix S3 logging from ingestion pipelines (#23590)

* MINOR - Fix S3 logging from ingestion pipelines

* Update generated TypeScript types

* config

* update s3 configurations for streamable logs

* Update generated TypeScript types

* update s3 configurations for streamable logs

* update s3 configurations for streamable logs

* update s3 configurations for streamable logs

* SSE off by default

* Update log retrieval to use s3 if ingestion runner has streamable logs enabled

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Pablo Takara <pjt1991@gmail.com>
Author: Pere Miquel Brull, 2025-10-01 09:44:17 +02:00 (committed by GitHub)
commit 375e001dd9 (parent 347c26b10c)
35 changed files with 248 additions and 63 deletions


@@ -361,16 +361,20 @@ pipelineServiceClientConfiguration:
certificatePath: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Local path for the Pipeline Service Client
logStorageConfiguration:
type: ${PIPELINE_SERVICE_CLIENT_LOG_TYPE:-"default"} # Possible values are "default", "s3"
enabled: ${PIPELINE_SERVICE_CLIENT_LOG_ENABLED:-false} # Enable it for pipelines deployed in the server
# if type is s3, provide the following configuration
bucketName: ${PIPELINE_SERVICE_CLIENT_LOG_BUCKET_NAME:-""}
# optional path within the bucket to store the logs
prefix: ${PIPELINE_SERVICE_CLIENT_LOG_PREFIX:-""}
region: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-""}
enableServerSideEncryption: ${PIPELINE_SERVICE_CLIENT_LOG_SSE_ENABLED:-false}
sseAlgorithm: ${PIPELINE_SERVICE_CLIENT_LOG_SSE_ALGORITHM:-"AES256"} # Allowed values: "AES256" or "aws:kms"
kmsKeyId: ${PIPELINE_SERVICE_CLIENT_LOG_KMS_KEY_ID:-""} # Required only if sseAlgorithm is "aws:kms"
awsConfig:
awsAccessKeyId: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_ACCESS_KEY_ID:-""}
awsSecretAccessKey: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SECRET_ACCESS_KEY:-""}
awsRegion: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-""}
awsSessionToken: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SESSION_TOKEN:-""}
endPointURL: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_ENDPOINT_URL:-""} # e.g. http://localhost:9000 for a port-forwarded MinIO
# Secrets Manager Loader: specify to the Ingestion Framework how to load the SM credentials from its env
# Supported: noop, airflow, env
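For reference, the integration tests in this commit build the same configuration programmatically. A minimal Java sketch, assuming the generated builders expose with* setters for the new enabled, sseAlgorithm, and kmsKeyId fields; the bucket name and KMS alias are placeholders:

import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
import org.openmetadata.schema.security.credentials.AWSCredentials;

class LogStorageConfigSketch {
  static LogStorageConfiguration s3LogStorage() {
    return new LogStorageConfiguration()
        .withType(LogStorageConfiguration.Type.S_3)
        .withEnabled(true)
        .withBucketName("om-pipeline-logs") // placeholder bucket
        .withPrefix("pipeline-logs")
        .withEnableServerSideEncryption(true)
        .withSseAlgorithm(LogStorageConfiguration.SseAlgorithm.AWS_KMS)
        .withKmsKeyId("alias/om-logs") // required only for aws:kms
        .withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"));
  }
}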


@@ -20,7 +20,6 @@ from metadata.workflow.base import BaseWorkflow
def execute_workflow(workflow: BaseWorkflow, config_dict: Dict[str, Any]) -> None:
"""Execute the workflow and raise if needed"""
workflow.execute()
workflow.print_status()
workflow.stop()
if config_dict.get("workflowConfig", {}).get("raiseOnError", True):
workflow.raise_from_status()


@@ -208,7 +208,9 @@ class OMetaLogsMixin:
try:
# Initialize log stream with the server
url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}/init"
url = (
f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}"
)
init_data = {
"connectorId": f"{socket.gethostname()}-{os.getpid()}",


@@ -63,6 +63,7 @@ from typing import Any, Dict, Optional
from uuid import UUID
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.ometa.utils import model_str
from metadata.utils.logger import BASE_LOGGING_FORMAT, METADATA_LOGGER, ingestion_logger
logger = ingestion_logger()
@@ -506,7 +507,7 @@ def setup_streamable_logging_for_workflow(
StreamableLogHandlerManager.set_handler(handler)
logger.info(
f"Streamable logging configured for pipeline: {pipeline_fqn}, run_id: {run_id}"
f"Streamable logging configured for pipeline: {pipeline_fqn}, run_id: {model_str(run_id)}"
)
return handler


@@ -118,8 +118,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
if (
self.config.ingestionPipelineFQN
and self.config.pipelineRunId
and self.ingestion_pipeline
and self.ingestion_pipeline.enableStreamableLogs
and self.config.enableStreamableLogs
):
setup_streamable_logging_for_workflow(
metadata=self.metadata,
@@ -259,6 +258,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
finally:
ingestion_status = self.build_ingestion_status()
self.set_ingestion_pipeline_status(pipeline_state, ingestion_status)
self.print_status()
self.stop()
@property
@@ -317,6 +317,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
),
sourceConfig=self.config.source.sourceConfig,
airflowConfig=AirflowConfig(),
enableStreamableLogs=self.config.enableStreamableLogs,
)
)


@@ -89,6 +89,7 @@ def build_application_workflow_config(
else None,
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)
return application_workflow_config


@@ -76,6 +76,7 @@ def build_auto_classification_workflow_config(
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)
return workflow_config


@@ -200,7 +200,6 @@ def execute_workflow(
Execute the workflow and handle the status
"""
workflow.execute()
workflow.print_status()
workflow.stop()
if workflow_config.workflowConfig.raiseOnError:
workflow.raise_from_status()


@@ -52,6 +52,7 @@ def build_dbt_workflow_config(
),
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)
return workflow_config


@@ -74,6 +74,7 @@ def build_es_reindex_workflow_config(
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)
return workflow_config


@@ -47,6 +47,7 @@ def build_lineage_workflow_config(
),
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)
return workflow_config


@@ -44,6 +44,7 @@ def build_metadata_workflow_config(
),
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)
return workflow_config


@@ -76,6 +76,7 @@ def build_profiler_workflow_config(
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)
return workflow_config


@@ -77,6 +77,7 @@ def build_test_suite_workflow_config(
openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)
return workflow_config


@@ -83,6 +83,7 @@ def build_usage_config_from_file(
),
workflowConfig=build_workflow_config_property(ingestion_pipeline),
ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
)


@@ -499,6 +499,8 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
updateEnabled(original.getEnabled(), updated.getEnabled());
updateDeployed(original.getDeployed(), updated.getDeployed());
updateRaiseOnError(original.getRaiseOnError(), updated.getRaiseOnError());
updateEnableStreamableLogs(
original.getEnableStreamableLogs(), updated.getEnableStreamableLogs());
}
protected void updateProcessingEngine(IngestionPipeline original, IngestionPipeline updated) {
@@ -546,6 +548,14 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
}
}
private void updateEnableStreamableLogs(
Boolean origEnableStreamableLogs, Boolean updatedEnableStreamableLogs) {
if (updatedEnableStreamableLogs != null
&& !origEnableStreamableLogs.equals(updatedEnableStreamableLogs)) {
recordChange("enableStreamableLogs", origEnableStreamableLogs, updatedEnableStreamableLogs);
}
}
private void updateDeployed(Boolean origDeployed, Boolean updatedDeployed) {
if (updatedDeployed != null && !origDeployed.equals(updatedDeployed)) {
recordChange("deployed", origDeployed, updatedDeployed);
@@ -840,4 +850,8 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
IngestionPipeline ingestionPipeline, ServiceEntityInterface service) {
return pipelineServiceClient.deployPipeline(ingestionPipeline, service);
}
public boolean isIngestionRunnerStreamableLogsEnabled(EntityReference ingestionRunner) {
return false; // Default implementation
}
}


@@ -1421,7 +1421,8 @@ public class SystemRepository {
.withMessage(
String.format(
"Connected to S3 bucket: %s in region %s",
logStorageConfig.getBucketName(), logStorageConfig.getRegion()));
logStorageConfig.getBucketName(),
logStorageConfig.getAwsConfig().getAwsRegion()));
} catch (Exception e) {
return new StepValidation()
.withDescription("S3 Log Storage")


@@ -13,6 +13,8 @@
package org.openmetadata.service.logstorage;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import io.micrometer.core.instrument.Timer;
@@ -110,6 +112,8 @@ public class S3LogStorage implements LogStorageInterface {
private long streamTimeoutMs;
private int asyncBufferSize;
private boolean isCustomEndpoint = false;
private ServerSideEncryption sseAlgorithm = null;
private String kmsKeyId = null;
private final Map<String, StreamContext> activeStreams = new ConcurrentHashMap<>();
private ExecutorService asyncExecutor;
@@ -138,6 +142,15 @@ public class S3LogStorage implements LogStorageInterface {
s3Config.getEnableServerSideEncryption() != null
? s3Config.getEnableServerSideEncryption()
: true;
if (enableSSE) {
if (LogStorageConfiguration.SseAlgorithm.AES_256.equals(s3Config.getSseAlgorithm())) {
this.sseAlgorithm = ServerSideEncryption.AES256;
} else if (LogStorageConfiguration.SseAlgorithm.AWS_KMS.equals(
s3Config.getSseAlgorithm())) {
this.sseAlgorithm = ServerSideEncryption.AWS_KMS;
this.kmsKeyId = !nullOrEmpty(s3Config.getKmsKeyId()) ? s3Config.getKmsKeyId() : null;
}
}
this.storageClass =
s3Config.getStorageClass() != null
? StorageClass.fromValue(s3Config.getStorageClass().value())
@@ -156,32 +169,18 @@ public class S3LogStorage implements LogStorageInterface {
? s3Config.getAsyncBufferSizeMB() * 1024 * 1024
: 5 * 1024 * 1024;
S3ClientBuilder s3Builder = S3Client.builder().region(Region.of(s3Config.getRegion()));
S3ClientBuilder s3Builder =
S3Client.builder().region(Region.of(s3Config.getAwsConfig().getAwsRegion()));
URI customEndpoint = s3Config.getAwsConfig().getEndPointURL();
if (customEndpoint != null) {
if (!nullOrEmpty(customEndpoint)) {
s3Builder.endpointOverride(java.net.URI.create(customEndpoint.toString()));
s3Builder.forcePathStyle(true); // Required for MinIO
this.isCustomEndpoint = true;
}
String accessKey = s3Config.getAwsConfig().getAwsAccessKeyId();
String secretKey = s3Config.getAwsConfig().getAwsSecretAccessKey();
String sessionToken = s3Config.getAwsConfig().getAwsSessionToken();
if (accessKey != null && secretKey != null) {
if (sessionToken != null) {
s3Builder.credentialsProvider(
StaticCredentialsProvider.create(
AwsSessionCredentials.create(accessKey, secretKey, sessionToken)));
} else {
s3Builder.credentialsProvider(
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
}
} else {
AwsCredentialsProvider credentialsProvider =
getCredentialsProvider(s3Config.getAwsConfig());
s3Builder.credentialsProvider(credentialsProvider);
}
AwsCredentialsProvider credentialsProvider = resolveCredentials(s3Config.getAwsConfig());
s3Builder.credentialsProvider(credentialsProvider);
this.metrics = (StreamableLogsMetrics) config.get("metrics");
if (this.metrics == null) {
@@ -194,6 +193,9 @@ public class S3LogStorage implements LogStorageInterface {
s3Client.headBucket(HeadBucketRequest.builder().bucket(bucketName).build());
} catch (NoSuchBucketException e) {
throw new IOException("S3 bucket does not exist: " + bucketName);
} catch (Exception e) {
throw new RuntimeException(
"Error accessing S3 bucket: " + bucketName + ". Validate AWS configuration.", e);
}
this.asyncExecutor =
@@ -250,6 +252,33 @@
}
}
private AwsCredentialsProvider resolveCredentials(AWSCredentials config) {
String accessKey = config.getAwsAccessKeyId();
String secretKey = config.getAwsSecretAccessKey();
String sessionToken = config.getAwsSessionToken();
if ((!nullOrEmpty(accessKey) && !nullOrEmpty(secretKey))
|| !nullOrEmpty(config.getEndPointURL())) {
if (!nullOrEmpty(sessionToken)) {
return StaticCredentialsProvider.create(
AwsSessionCredentials.create(accessKey, secretKey, sessionToken));
} else {
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
}
} else {
try {
AwsCredentialsProvider defaultProvider = DefaultCredentialsProvider.create();
defaultProvider.resolveCredentials(); // Triggers validation
LOG.info("Using AWS DefaultCredentialsProvider");
return defaultProvider;
} catch (Exception e) {
LOG.warn(
"Default credentials not found. Falling back to static credentials. Reason: {}",
e.getMessage());
return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
}
}
}
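// Hedged usage sketch (assumes the generated AWSCredentials with* setters):
// an explicit key pair or a custom endpoint (e.g. a port-forwarded MinIO)
// selects static credentials; otherwise the AWS default chain (env vars,
// profile, instance role) is tried before falling back.
//
//   AWSCredentials minio = new AWSCredentials()
//       .withAwsAccessKeyId("minioadmin")     // placeholder
//       .withAwsSecretAccessKey("minioadmin") // placeholder
//       .withAwsRegion("us-east-1")
//       .withEndPointURL(java.net.URI.create("http://localhost:9000"));
//   resolveCredentials(minio);  // -> StaticCredentialsProvider
//
//   AWSCredentials onAws = new AWSCredentials().withAwsRegion("us-east-1");
//   resolveCredentials(onAws);  // -> DefaultCredentialsProvider when resolvable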
@Override
public OutputStream getLogOutputStream(String pipelineFQN, UUID runId) throws IOException {
String streamKey = pipelineFQN + "/" + runId;
@@ -397,7 +426,8 @@
List<String> lines = new ArrayList<>();
String line;
int lineNumber = 0;
int startLine = afterCursor != null ? Integer.parseInt(afterCursor) : 0;
int startLine =
afterCursor != null && !afterCursor.isEmpty() ? Integer.parseInt(afterCursor) : 0;
while (lineNumber < startLine && (line = reader.readLine()) != null) {
lineNumber++;
@@ -652,17 +682,6 @@
return String.format("%s/%s/", cleanPrefix, sanitizedFQN);
}
private AwsCredentialsProvider getCredentialsProvider(AWSCredentials awsCredentials) {
if (awsCredentials != null
&& awsCredentials.getAwsAccessKeyId() != null
&& awsCredentials.getAwsSecretAccessKey() != null) {
return StaticCredentialsProvider.create(
AwsBasicCredentials.create(
awsCredentials.getAwsAccessKeyId(), awsCredentials.getAwsSecretAccessKey()));
}
return DefaultCredentialsProvider.create();
}
private void configureLifecyclePolicy() {
try {
LifecycleRule rule =
@@ -755,7 +774,10 @@
}
if (enableSSE && !isCustomEndpoint) {
putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
putRequestBuilder.serverSideEncryption(this.sseAlgorithm);
if (this.sseAlgorithm == ServerSideEncryption.AWS_KMS && this.kmsKeyId != null) {
putRequestBuilder.ssekmsKeyId(this.kmsKeyId);
}
}
s3Client.putObject(
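// For reference, a sketch of the two SSE request shapes the branch above can
// emit through the AWS SDK v2 builders (bucket, key, and KMS alias are
// placeholders):
//
//   PutObjectRequest.builder()
//       .bucket("om-pipeline-logs").key("pipeline-logs/demo/0.log")
//       .serverSideEncryption(ServerSideEncryption.AES256)
//       .build();
//
//   PutObjectRequest.builder()
//       .bucket("om-pipeline-logs").key("pipeline-logs/demo/0.log")
//       .serverSideEncryption(ServerSideEncryption.AWS_KMS)
//       .ssekmsKeyId("alias/om-logs")
//       .build();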


@@ -29,6 +29,7 @@ public class IngestionPipelineMapper
.withRaiseOnError(create.getRaiseOnError())
.withProvider(create.getProvider())
.withService(create.getService())
.withEnableStreamableLogs(create.getEnableStreamableLogs())
.withProcessingEngine(create.getProcessingEngine());
}
}


@@ -16,6 +16,7 @@ package org.openmetadata.service.resources.services.ingestionpipelines;
import static org.openmetadata.common.utils.CommonUtil.listOf;
import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
import static org.openmetadata.schema.type.MetadataOperation.CREATE;
import static org.openmetadata.sdk.PipelineServiceClientInterface.TYPE_TO_TASK;
import static org.openmetadata.service.Entity.FIELD_OWNERS;
import static org.openmetadata.service.Entity.FIELD_PIPELINE_STATUS;
import static org.openmetadata.service.jdbi3.IngestionPipelineRepository.validateProfileSample;
@@ -76,6 +77,7 @@ import org.openmetadata.schema.type.ProviderType;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.schema.utils.ResultList;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.sdk.exception.PipelineServiceClientException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
@@ -950,14 +952,50 @@ public class IngestionPipelineResource
description = "Returns log chunk after this cursor",
schema = @Schema(type = "string"))
@QueryParam("after")
String after) {
String after,
@Parameter(
description = "Maximum number of lines to return",
schema = @Schema(type = "integer"))
@QueryParam("limit")
@DefaultValue("1000")
int limit) {
if (pipelineServiceClient == null) {
return Response.status(200).entity("Pipeline Client Disabled").build();
}
IngestionPipeline ingestionPipeline =
getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
Map<String, String> lastIngestionLogs =
pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after);
getInternal(
uriInfo, securityContext, id, "pipelineStatuses,ingestionRunner", Include.NON_DELETED);
Map<String, String> lastIngestionLogs;
boolean useStreamableLogs =
ingestionPipeline.getEnableStreamableLogs()
|| (ingestionPipeline.getIngestionRunner() != null
&& repository.isIngestionRunnerStreamableLogsEnabled(
ingestionPipeline.getIngestionRunner()));
if (useStreamableLogs) {
// Get logs from the repository's log storage, using the last run's runId
String runId = ingestionPipeline.getPipelineStatuses().getRunId();
if (!CommonUtil.nullOrEmpty(runId)) {
Map<String, Object> lastIngestionLogsMap =
repository.getLogs(
ingestionPipeline.getFullyQualifiedName(), UUID.fromString(runId), after, limit);
lastIngestionLogs =
lastIngestionLogsMap.entrySet().stream()
.filter(entry -> entry.getValue() != null)
.collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString()));
Object logs = lastIngestionLogs.remove("logs");
if (logs != null) {
lastIngestionLogs.put(
TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString()), logs.toString());
}
} else {
throw new PipelineServiceClientException(
"No runId found for the last ingestion pipeline run");
}
} else {
// Get the logs from the service client
lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after);
}
return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build();
}
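// Client-side sketch: reading the last run's logs with the new paging
// parameters. The route suffix ("/logs/{id}/last") and bearer auth are
// assumptions; uses java.net.http from the JDK.
//
//   HttpRequest request = HttpRequest.newBuilder()
//       .uri(URI.create(serverUrl + "/api/v1/services/ingestionPipelines/logs/"
//           + pipelineId + "/last?after=0&limit=1000"))
//       .header("Authorization", "Bearer " + token) // placeholder token
//       .GET()
//       .build();
//   HttpResponse<String> logs =
//       HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());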
@@ -1181,7 +1219,9 @@ public class IngestionPipelineResource
decryptOrNullify(securityContext, ingestionPipeline, true);
ServiceEntityInterface service =
Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
if (repository.isS3LogStorageEnabled()) {
// Flag the ingestion pipeline with streamable logs only if configured and enabled for use
if (repository.isS3LogStorageEnabled()
&& repository.getLogStorageConfiguration().getEnabled()) {
ingestionPipeline.setEnableStreamableLogs(true);
}
PipelineServiceClientResponse status =


@@ -24,6 +24,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
import org.openmetadata.schema.security.credentials.AWSCredentials;
import org.openmetadata.sdk.PipelineServiceClientInterface;
import org.openmetadata.service.monitoring.StreamableLogsMetrics;
@@ -46,7 +47,7 @@ public class LogStorageFactoryTest {
new LogStorageConfiguration()
.withType(LogStorageConfiguration.Type.S_3)
.withBucketName("test-bucket")
.withRegion("us-east-1")
.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
.withPrefix("test-prefix");
defaultConfig = null; // null config should create default storage


@@ -25,6 +25,7 @@ import java.util.*;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.*;
import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
import org.openmetadata.schema.security.credentials.AWSCredentials;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.junit.jupiter.Container;
@@ -82,7 +83,7 @@ public class MinioLogStorageIntegrationTest {
new LogStorageConfiguration()
.withType(LogStorageConfiguration.Type.S_3)
.withBucketName(TEST_BUCKET)
.withRegion("us-east-1")
.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
.withPrefix("logs")
.withEnableServerSideEncryption(false) // MinIO doesn't need SSE
.withStorageClass(


@@ -27,6 +27,7 @@ import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
import org.openmetadata.schema.security.credentials.AWSCredentials;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -53,7 +54,7 @@ public class S3LogStorageTest {
new LogStorageConfiguration()
.withType(LogStorageConfiguration.Type.S_3)
.withBucketName(testBucket)
.withRegion("us-east-1")
.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
.withPrefix(testPrefix)
.withEnableServerSideEncryption(true)
.withStorageClass(LogStorageConfiguration.StorageClass.STANDARD_IA)


@@ -41,6 +41,7 @@ import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipel
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.security.credentials.AWSCredentials;
import org.openmetadata.schema.services.connections.database.BigQueryConnection;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.utils.JsonUtils;
@@ -143,7 +144,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
new LogStorageConfiguration()
.withType(LogStorageConfiguration.Type.S_3)
.withBucketName(TEST_BUCKET)
.withRegion("us-east-1")
.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
.withPrefix("test-logs")
.withAwsConfig(awsCreds);
@@ -262,7 +263,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
new LogStorageConfiguration()
.withType(LogStorageConfiguration.Type.S_3)
.withBucketName(TEST_BUCKET)
.withRegion("us-east-1")
.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
.withAwsConfig(awsCreds)
.withPrefix("limited-test")
.withMaxConcurrentStreams(2); // Very low limit
@@ -499,7 +500,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
new LogStorageConfiguration()
.withType(LogStorageConfiguration.Type.S_3)
.withBucketName("non-existent-bucket")
.withRegion("us-east-1")
.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
.withPrefix("test-logs");
S3LogStorage badStorage = new S3LogStorage();


@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
import org.apache.commons.io.IOUtils;
import org.junit.jupiter.api.*;
import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
import org.openmetadata.schema.security.credentials.AWSCredentials;
import org.openmetadata.service.OpenMetadataApplicationTest;
import org.openmetadata.service.logstorage.S3LogStorage;
import org.testcontainers.containers.GenericContainer;
@@ -77,7 +78,7 @@ public class IngestionPipelineLogStreamingTest extends OpenMetadataApplicationTe
config.setType(LogStorageConfiguration.Type.S_3);
config.setBucketName(TEST_BUCKET);
config.setPrefix("pipeline-logs/");
config.setRegion("us-east-1");
config.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"));
config.setMaxConcurrentStreams(10);
config.setStreamTimeoutMinutes(5);
config.setAsyncBufferSizeMB(5);
@@ -338,7 +339,7 @@ public class IngestionPipelineLogStreamingTest extends OpenMetadataApplicationTe
failConfig.setType(LogStorageConfiguration.Type.S_3);
failConfig.setBucketName(TEST_BUCKET);
failConfig.setPrefix("pipeline-logs/");
failConfig.setRegion("us-east-1");
failConfig.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"));
Map<String, Object> failConfigMap = new HashMap<>();
failConfigMap.put("config", failConfig);


@@ -59,6 +59,11 @@
"processingEngine": {
"description": "The processing engine responsible for executing the ingestion pipeline logic.",
"$ref": "../../../type/entityReference.json"
},
"enableStreamableLogs": {
"description": "Enable real-time log streaming to the OpenMetadata server. When enabled, ingestion logs will be automatically shipped to the server's configured log storage backend (S3 or compatible).",
"type": "boolean",
"default": false
}
},
"required": [


@@ -12,6 +12,11 @@
"enum": ["default", "s3"],
"default": "default"
},
"enabled": {
"description": "Enable it for pipelines deployed in the server",
"type": "boolean",
"default": false
},
"bucketName": {
"description": "S3 bucket name for storing logs (required for S3 type)",
"type": "string"
@@ -21,10 +26,6 @@
"type": "string",
"default": "pipeline-logs"
},
"region": {
"description": "AWS region for the S3 bucket (required for S3 type)",
"type": "string"
},
"awsConfig": {
"description": "AWS credentials configuration",
"$ref": "../security/credentials/awsCredentials.json"
@@ -34,6 +35,15 @@
"type": "boolean",
"default": true
},
"sseAlgorithm": {
"description": "Server-side encryption algorithm (if applicable)",
"type": "string",
"enum": ["AES256", "aws:kms"]
},
"kmsKeyId": {
"description": "KMS Key ID for server-side encryption (if applicable)",
"type": "string"
},
"storageClass": {
"description": "S3 storage class for log objects",
"type": "string",


@@ -29,6 +29,11 @@
"pipelineRunId": {
"description": "Unique identifier of pipeline run, used to identify the current pipeline run",
"$ref": "../type/basic.json#/definitions/uuid"
},
"enableStreamableLogs": {
"description": "Enable streaming logs to a remote log storage via the OpenMetadata Server",
"type": "boolean",
"default": false
}
},
"required": ["workflowConfig"],


@@ -234,6 +234,11 @@
"pipelineRunId": {
"description": "Unique identifier of pipeline run, used to identify the current pipeline run",
"$ref": "../type/basic.json#/definitions/uuid"
},
"enableStreamableLogs": {
"description": "Enable streaming logs to a remote log storage via the OpenMetadata Server",
"type": "boolean",
"default": false
}
},
"required": ["source", "workflowConfig"],


@@ -27,6 +27,12 @@ export interface CreateIngestionPipeline {
* Fully qualified names of the domains the Ingestion Pipeline belongs to.
*/
domains?: string[];
/**
* Enable real-time log streaming to the OpenMetadata server. When enabled, ingestion logs
* will be automatically shipped to the server's configured log storage backend (S3 or
* compatible).
*/
enableStreamableLogs?: boolean;
/**
* Set the logging level for the workflow.
*/


@@ -26,6 +26,10 @@ export interface LogStorageConfiguration {
* S3 bucket name for storing logs (required for S3 type)
*/
bucketName?: string;
/**
* Enable it for pipelines deployed in the server
*/
enabled?: boolean;
/**
* Enable server-side encryption for S3 objects
*/
@@ -34,6 +38,10 @@
* Number of days after which logs are automatically deleted (0 means no expiration)
*/
expirationDays?: number;
/**
* KMS Key ID for server-side encryption (if applicable)
*/
kmsKeyId?: string;
/**
* Maximum number of concurrent log streams allowed
*/
@@ -43,9 +51,9 @@
*/
prefix?: string;
/**
* AWS region for the S3 bucket (required for S3 type)
* Server-side encryption algorithm (if applicable)
*/
region?: string;
sseAlgorithm?: SSEAlgorithm;
/**
* S3 storage class for log objects
*/
@@ -108,6 +116,14 @@ export interface AWSCredentials {
profileName?: string;
}
/**
* Server-side encryption algorithm (if applicable)
*/
export enum SSEAlgorithm {
Aes256 = "AES256",
AwsKms = "aws:kms",
}
/**
* S3 storage class for log objects
*/


@@ -266,6 +266,10 @@ export interface LogStorageConfiguration {
* S3 bucket name for storing logs (required for S3 type)
*/
bucketName?: string;
/**
* Enable it for pipelines deployed in the server
*/
enabled?: boolean;
/**
* Enable server-side encryption for S3 objects
*/
@@ -274,6 +278,10 @@
* Number of days after which logs are automatically deleted (0 means no expiration)
*/
expirationDays?: number;
/**
* KMS Key ID for server-side encryption (if applicable)
*/
kmsKeyId?: string;
/**
* Maximum number of concurrent log streams allowed
*/
@@ -283,9 +291,9 @@
*/
prefix?: string;
/**
* AWS region for the S3 bucket (required for S3 type)
* Server-side encryption algorithm (if applicable)
*/
region?: string;
sseAlgorithm?: SSEAlgorithm;
/**
* S3 storage class for log objects
*/
@@ -348,6 +356,14 @@ export interface AWSCredentials {
profileName?: string;
}
/**
* Server-side encryption algorithm (if applicable)
*/
export enum SSEAlgorithm {
Aes256 = "AES256",
AwsKms = "aws:kms",
}
/**
* S3 storage class for log objects
*/


@@ -23,6 +23,10 @@ export interface Application {
* External Application Private configuration
*/
appPrivateConfig?: PrivateConfig;
/**
* Enable streaming logs to a remote log storage via the OpenMetadata Server
*/
enableStreamableLogs?: boolean;
/**
* Fully qualified name of ingestion pipeline, used to identify the current ingestion
* pipeline


@@ -33,6 +33,10 @@ export interface Workflow {
*/
export interface OpenMetadataWorkflowConfig {
bulkSink?: BulkSink;
/**
* Enable streaming logs to a remote log storage via the OpenMetadata Server
*/
enableStreamableLogs?: boolean;
/**
* Fully qualified name of ingestion pipeline, used to identify the current ingestion
* pipeline


@@ -1577,6 +1577,10 @@ export interface LogStorageConfiguration {
* S3 bucket name for storing logs (required for S3 type)
*/
bucketName?: string;
/**
* Enable it for pipelines deployed in the server
*/
enabled?: boolean;
/**
* Enable server-side encryption for S3 objects
*/
@@ -1585,6 +1589,10 @@
* Number of days after which logs are automatically deleted (0 means no expiration)
*/
expirationDays?: number;
/**
* KMS Key ID for server-side encryption (if applicable)
*/
kmsKeyId?: string;
/**
* Maximum number of concurrent log streams allowed
*/
@@ -1594,9 +1602,9 @@
*/
prefix?: string;
/**
* AWS region for the S3 bucket (required for S3 type)
* Server-side encryption algorithm (if applicable)
*/
region?: string;
sseAlgorithm?: SSEAlgorithm;
/**
* S3 storage class for log objects
*/
@@ -1659,6 +1667,14 @@ export interface AWSCredentials {
profileName?: string;
}
/**
* Server-side encryption algorithm (if applicable)
*/
export enum SSEAlgorithm {
Aes256 = "AES256",
AwsKms = "aws:kms",
}
/**
* S3 storage class for log objects
*/