diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml
index d416512847b..68f0a966073 100644
--- a/conf/openmetadata.yaml
+++ b/conf/openmetadata.yaml
@@ -361,16 +361,20 @@ pipelineServiceClientConfiguration:
   certificatePath: ${PIPELINE_SERVICE_CLIENT_SSL_CERT_PATH:-""} # Local path for the Pipeline Service Client
   logStorageConfiguration:
     type: ${PIPELINE_SERVICE_CLIENT_LOG_TYPE:-"default"} # Possible values are "default", "s3"
+    enabled: ${PIPELINE_SERVICE_CLIENT_LOG_ENABLED:-false} # Enable it for pipelines deployed in the server
     # if type is s3, provide the following configuration
     bucketName: ${PIPELINE_SERVICE_CLIENT_LOG_BUCKET_NAME:-""}
     # optional path within the bucket to store the logs
     prefix: ${PIPELINE_SERVICE_CLIENT_LOG_PREFIX:-""}
-    region: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-""}
+    enableServerSideEncryption: ${PIPELINE_SERVICE_CLIENT_LOG_SSE_ENABLED:-false}
+    sseAlgorithm: ${PIPELINE_SERVICE_CLIENT_LOG_SSE_ALGORITHM:-"AES256"} # Allowed values: "AES256" or "aws:kms"
+    kmsKeyId: ${PIPELINE_SERVICE_CLIENT_LOG_KMS_KEY_ID:-""} # Required only if sseAlgorithm is "aws:kms"
     awsConfig:
       awsAccessKeyId: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_ACCESS_KEY_ID:-""}
       awsSecretAccessKey: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SECRET_ACCESS_KEY:-""}
       awsRegion: ${PIPELINE_SERVICE_CLIENT_LOG_REGION:-""}
       awsSessionToken: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_SESSION_TOKEN:-""}
+      endPointURL: ${PIPELINE_SERVICE_CLIENT_LOG_AWS_ENDPOINT_URL:-""} # port forward localhost:9000 for minio

   # Secrets Manager Loader: specify to the Ingestion Framework how to load the SM credentials from its env
   # Supported: noop, airflow, env
diff --git a/ingestion/src/metadata/cli/common.py b/ingestion/src/metadata/cli/common.py
index 1e07ebde2ba..7135a88ba64 100644
--- a/ingestion/src/metadata/cli/common.py
+++ b/ingestion/src/metadata/cli/common.py
@@ -20,7 +20,6 @@ from metadata.workflow.base import BaseWorkflow
 def execute_workflow(workflow: BaseWorkflow, config_dict: Dict[str, Any]) -> None:
     """Execute the workflow and raise if needed"""
     workflow.execute()
-    workflow.print_status()
     workflow.stop()
     if config_dict.get("workflowConfig", {}).get("raiseOnError", True):
         workflow.raise_from_status()
diff --git a/ingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py b/ingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py
index 46c81f13aaf..27202292e79 100644
--- a/ingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py
+++ b/ingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py
@@ -208,7 +208,9 @@ class OMetaLogsMixin:

         try:
             # Initialize log stream with the server
-            url = f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}/init"
+            url = (
+                f"/services/ingestionPipelines/logs/{pipeline_fqn}/{model_str(run_id)}"
+            )

             init_data = {
                 "connectorId": f"{socket.gethostname()}-{os.getpid()}",
diff --git a/ingestion/src/metadata/utils/streamable_logger.py b/ingestion/src/metadata/utils/streamable_logger.py
index c249d5d6a2c..297ef1cbd3a 100644
--- a/ingestion/src/metadata/utils/streamable_logger.py
+++ b/ingestion/src/metadata/utils/streamable_logger.py
@@ -63,6 +63,7 @@ from typing import Any, Dict, Optional
 from uuid import UUID

 from metadata.ingestion.ometa.ometa_api import OpenMetadata
+from metadata.ingestion.ometa.utils import model_str
 from metadata.utils.logger import BASE_LOGGING_FORMAT, METADATA_LOGGER, ingestion_logger

 logger = ingestion_logger()
@@ -506,7 +507,7 @@ def setup_streamable_logging_for_workflow(
         StreamableLogHandlerManager.set_handler(handler)

     logger.info(
-        f"Streamable logging configured for pipeline: {pipeline_fqn}, run_id: {run_id}"
+        f"Streamable logging configured for pipeline: {pipeline_fqn}, run_id: {model_str(run_id)}"
     )

     return handler
diff --git a/ingestion/src/metadata/workflow/base.py b/ingestion/src/metadata/workflow/base.py
index ea305602d9f..0afe8cf541e 100644
--- a/ingestion/src/metadata/workflow/base.py
+++ b/ingestion/src/metadata/workflow/base.py
@@ -118,8 +118,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
         if (
             self.config.ingestionPipelineFQN
             and self.config.pipelineRunId
-            and self.ingestion_pipeline
-            and self.ingestion_pipeline.enableStreamableLogs
+            and self.config.enableStreamableLogs
         ):
             setup_streamable_logging_for_workflow(
                 metadata=self.metadata,
@@ -259,6 +258,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
         finally:
             ingestion_status = self.build_ingestion_status()
             self.set_ingestion_pipeline_status(pipeline_state, ingestion_status)
+            self.print_status()
             self.stop()

     @property
@@ -317,6 +317,7 @@ class BaseWorkflow(ABC, WorkflowStatusMixin):
                 ),
                 sourceConfig=self.config.source.sourceConfig,
                 airflowConfig=AirflowConfig(),
+                enableStreamableLogs=self.config.enableStreamableLogs,
             )
         )

diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py
index b8620fa556b..5b2d18447f7 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py
@@ -89,6 +89,7 @@ def build_application_workflow_config(
         else None,
         workflowConfig=build_workflow_config_property(ingestion_pipeline),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

     return application_workflow_config
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/auto_classification.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/auto_classification.py
index b14b3e84720..1c191b33b58 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/auto_classification.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/auto_classification.py
@@ -76,6 +76,7 @@ def build_auto_classification_workflow_config(
             openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
         ),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

     return workflow_config
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
index 6896c500419..cc6039ca608 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py
@@ -200,7 +200,6 @@ def execute_workflow(
     Execute the workflow and handle the status
     """
     workflow.execute()
-    workflow.print_status()
     workflow.stop()
     if workflow_config.workflowConfig.raiseOnError:
         workflow.raise_from_status()
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/dbt.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/dbt.py
index 6a71c1ccff3..d559e3a7048 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/dbt.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/dbt.py
@@ -52,6 +52,7 @@ def build_dbt_workflow_config(
         ),
         workflowConfig=build_workflow_config_property(ingestion_pipeline),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

     return workflow_config
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py
index 663aa13b40e..a198db4e140 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/es_reindex.py
@@ -74,6 +74,7 @@ def build_es_reindex_workflow_config(
             openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
         ),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

     return workflow_config
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/lineage.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/lineage.py
index 0a2ed0b5abb..acb6c0481e3 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/lineage.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/lineage.py
@@ -47,6 +47,7 @@ def build_lineage_workflow_config(
         ),
         workflowConfig=build_workflow_config_property(ingestion_pipeline),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

     return workflow_config
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/metadata.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/metadata.py
index 09e04429eef..64eaf586ad6 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/metadata.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/metadata.py
@@ -44,6 +44,7 @@ def build_metadata_workflow_config(
         ),
         workflowConfig=build_workflow_config_property(ingestion_pipeline),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

     return workflow_config
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
index 0141b3bad20..21c451b428f 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py
@@ -76,6 +76,7 @@ def build_profiler_workflow_config(
             openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
         ),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

     return workflow_config
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
index 0b9ea399c64..c0f713c1fb4 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py
@@ -77,6 +77,7 @@ def build_test_suite_workflow_config(
             openMetadataServerConfig=ingestion_pipeline.openMetadataServerConnection,
         ),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

     return workflow_config
diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py
index b42cd3caad9..658bbe21a38 100644
--- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py
+++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py
@@ -83,6 +83,7 @@ def build_usage_config_from_file(
         ),
         workflowConfig=build_workflow_config_property(ingestion_pipeline),
         ingestionPipelineFQN=ingestion_pipeline.fullyQualifiedName.root,
+        enableStreamableLogs=ingestion_pipeline.enableStreamableLogs,
     )

diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java
index 77bfff628f8..bb01bac0812 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java
@@ -499,6 +499,8 @@ public class IngestionPipelineRepository extends EntityRepository
 activeStreams = new ConcurrentHashMap<>();
 private ExecutorService asyncExecutor;
@@ -138,6 +142,15 @@ public class S3LogStorage implements LogStorageInterface {
             s3Config.getEnableServerSideEncryption() != null
                 ? s3Config.getEnableServerSideEncryption()
                 : true;
+    if (enableSSE) {
+      if (LogStorageConfiguration.SseAlgorithm.AES_256.equals(s3Config.getSseAlgorithm())) {
+        this.sseAlgorithm = ServerSideEncryption.AES256;
+      } else if (LogStorageConfiguration.SseAlgorithm.AWS_KMS.equals(
+          s3Config.getSseAlgorithm())) {
+        this.sseAlgorithm = ServerSideEncryption.AWS_KMS;
+        this.kmsKeyId = !nullOrEmpty(s3Config.getKmsKeyId()) ? s3Config.getKmsKeyId() : null;
+      }
+    }
     this.storageClass =
         s3Config.getStorageClass() != null
             ? StorageClass.fromValue(s3Config.getStorageClass().value())
            :
@@ -156,32 +169,18 @@ public class S3LogStorage implements LogStorageInterface {
             ? s3Config.getAsyncBufferSizeMB() * 1024 * 1024
             : 5 * 1024 * 1024;

-    S3ClientBuilder s3Builder = S3Client.builder().region(Region.of(s3Config.getRegion()));
+    S3ClientBuilder s3Builder =
+        S3Client.builder().region(Region.of(s3Config.getAwsConfig().getAwsRegion()));

     URI customEndpoint = s3Config.getAwsConfig().getEndPointURL();
-    if (customEndpoint != null) {
+    if (!nullOrEmpty(customEndpoint)) {
       s3Builder.endpointOverride(java.net.URI.create(customEndpoint.toString()));
       s3Builder.forcePathStyle(true); // Required for MinIO
       this.isCustomEndpoint = true;
     }

-    String accessKey = s3Config.getAwsConfig().getAwsAccessKeyId();
-    String secretKey = s3Config.getAwsConfig().getAwsSecretAccessKey();
-    String sessionToken = s3Config.getAwsConfig().getAwsSessionToken();
-    if (accessKey != null && secretKey != null) {
-      if (sessionToken != null) {
-        s3Builder.credentialsProvider(
-            StaticCredentialsProvider.create(
-                AwsSessionCredentials.create(accessKey, secretKey, sessionToken)));
-      } else {
-        s3Builder.credentialsProvider(
-            StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey)));
-      }
-    } else {
-      AwsCredentialsProvider credentialsProvider =
-          getCredentialsProvider(s3Config.getAwsConfig());
-      s3Builder.credentialsProvider(credentialsProvider);
-    }
+    AwsCredentialsProvider credentialsProvider = resolveCredentials(s3Config.getAwsConfig());
+    s3Builder.credentialsProvider(credentialsProvider);

     this.metrics = (StreamableLogsMetrics) config.get("metrics");
     if (this.metrics == null) {
@@ -194,6 +193,9 @@
       s3Client.headBucket(HeadBucketRequest.builder().bucket(bucketName).build());
     } catch (NoSuchBucketException e) {
       throw new IOException("S3 bucket does not exist: " + bucketName);
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Error accessing S3 bucket: " + bucketName + ". Validate AWS configuration.", e);
     }

     this.asyncExecutor =
@@ -250,6 +252,33 @@
     }
   }

+  private AwsCredentialsProvider resolveCredentials(AWSCredentials config) {
+    String accessKey = config.getAwsAccessKeyId();
+    String secretKey = config.getAwsSecretAccessKey();
+    String sessionToken = config.getAwsSessionToken();
+    if ((!nullOrEmpty(accessKey) && !nullOrEmpty(secretKey))
+        || !nullOrEmpty(config.getEndPointURL())) {
+      if (!nullOrEmpty(sessionToken)) {
+        return StaticCredentialsProvider.create(
+            AwsSessionCredentials.create(accessKey, secretKey, sessionToken));
+      } else {
+        return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
+      }
+    } else {
+      try {
+        AwsCredentialsProvider defaultProvider = DefaultCredentialsProvider.create();
+        defaultProvider.resolveCredentials(); // Triggers validation
+        LOG.info("Using AWS DefaultCredentialsProvider");
+        return defaultProvider;
+      } catch (Exception e) {
+        LOG.warn(
+            "Default credentials not found. Falling back to static credentials. Reason: {}",
+            e.getMessage());
+        return StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKey, secretKey));
+      }
+    }
+  }
+
   @Override
   public OutputStream getLogOutputStream(String pipelineFQN, UUID runId) throws IOException {
     String streamKey = pipelineFQN + "/" + runId;
@@ -397,7 +426,8 @@
       List lines = new ArrayList<>();
       String line;
       int lineNumber = 0;
-      int startLine = afterCursor != null ? Integer.parseInt(afterCursor) : 0;
+      int startLine =
+          afterCursor != null && !afterCursor.isEmpty() ? Integer.parseInt(afterCursor) : 0;

       while (lineNumber < startLine && (line = reader.readLine()) != null) {
         lineNumber++;
@@ -652,17 +682,6 @@
     return String.format("%s/%s/", cleanPrefix, sanitizedFQN);
   }

-  private AwsCredentialsProvider getCredentialsProvider(AWSCredentials awsCredentials) {
-    if (awsCredentials != null
-        && awsCredentials.getAwsAccessKeyId() != null
-        && awsCredentials.getAwsSecretAccessKey() != null) {
-      return StaticCredentialsProvider.create(
-          AwsBasicCredentials.create(
-              awsCredentials.getAwsAccessKeyId(), awsCredentials.getAwsSecretAccessKey()));
-    }
-    return DefaultCredentialsProvider.create();
-  }
-
   private void configureLifecyclePolicy() {
     try {
       LifecycleRule rule =
@@ -755,7 +774,10 @@
       }

       if (enableSSE && !isCustomEndpoint) {
-        putRequestBuilder.serverSideEncryption(ServerSideEncryption.AES256);
+        putRequestBuilder.serverSideEncryption(this.sseAlgorithm);
+        if (this.sseAlgorithm == ServerSideEncryption.AWS_KMS && this.kmsKeyId != null) {
+          putRequestBuilder.ssekmsKeyId(this.kmsKeyId);
+        }
       }

       s3Client.putObject(
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineMapper.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineMapper.java
index 11218c5970f..abfe1748fef 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineMapper.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineMapper.java
@@ -29,6 +29,7 @@ public class IngestionPipelineMapper
         .withRaiseOnError(create.getRaiseOnError())
         .withProvider(create.getProvider())
         .withService(create.getService())
+        .withEnableStreamableLogs(create.getEnableStreamableLogs())
         .withProcessingEngine(create.getProcessingEngine());
   }
 }
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
index cc375d713d4..43dbcf5c8c9 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.java
@@ -16,6 +16,7 @@ package org.openmetadata.service.resources.services.ingestionpipelines;
 import static org.openmetadata.common.utils.CommonUtil.listOf;
 import static org.openmetadata.common.utils.CommonUtil.listOrEmpty;
 import static org.openmetadata.schema.type.MetadataOperation.CREATE;
+import static org.openmetadata.sdk.PipelineServiceClientInterface.TYPE_TO_TASK;
 import static org.openmetadata.service.Entity.FIELD_OWNERS;
 import static org.openmetadata.service.Entity.FIELD_PIPELINE_STATUS;
 import static org.openmetadata.service.jdbi3.IngestionPipelineRepository.validateProfileSample;
@@ -76,6 +77,7 @@ import org.openmetadata.schema.type.ProviderType;
 import org.openmetadata.schema.utils.JsonUtils;
 import org.openmetadata.schema.utils.ResultList;
 import org.openmetadata.sdk.PipelineServiceClientInterface;
+import org.openmetadata.sdk.exception.PipelineServiceClientException;
 import org.openmetadata.service.Entity;
 import org.openmetadata.service.OpenMetadataApplicationConfig;
 import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
@@ -950,14 +952,50 @@
               description = "Returns log chunk after this cursor",
               schema = @Schema(type = "string"))
           @QueryParam("after")
-          String after) {
+          String after,
+      @Parameter(
+              description = "Maximum number of lines to return",
+              schema = @Schema(type = "integer"))
+          @QueryParam("limit")
+          @DefaultValue("1000")
+          int limit) {
     if (pipelineServiceClient == null) {
       return Response.status(200).entity("Pipeline Client Disabled").build();
     }
     IngestionPipeline ingestionPipeline =
-        getInternal(uriInfo, securityContext, id, FIELDS, Include.NON_DELETED);
-    Map lastIngestionLogs =
-        pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after);
+        getInternal(
+            uriInfo, securityContext, id, "pipelineStatuses,ingestionRunner", Include.NON_DELETED);
+    Map lastIngestionLogs;
+    boolean useStreamableLogs =
+        ingestionPipeline.getEnableStreamableLogs()
+            || (ingestionPipeline.getIngestionRunner() != null
+                && repository.isIngestionRunnerStreamableLogsEnabled(
+                    ingestionPipeline.getIngestionRunner()));
+    if (useStreamableLogs) {
+      // Get logs using the repository's log storage picking up the last runId
+      String runId = ingestionPipeline.getPipelineStatuses().getRunId();
+      if (!CommonUtil.nullOrEmpty(runId)) {
+        Map lastIngestionLogsMap =
+            repository.getLogs(
+                ingestionPipeline.getFullyQualifiedName(), UUID.fromString(runId), after, limit);
+        lastIngestionLogs =
+            lastIngestionLogsMap.entrySet().stream()
+                .filter(entry -> entry.getValue() != null)
+                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().toString()));
+        Object logs = lastIngestionLogs.remove("logs");
+        if (logs != null) {
+          lastIngestionLogs.put(
+              TYPE_TO_TASK.get(ingestionPipeline.getPipelineType().toString()), logs.toString());
+        }
+      } else {
+        throw new PipelineServiceClientException(
+            "No runId found for the last ingestion pipeline run");
+      }
+    } else {
+      // Get the logs from the service client
+      lastIngestionLogs = pipelineServiceClient.getLastIngestionLogs(ingestionPipeline, after);
+    }
+
     return Response.ok(lastIngestionLogs, MediaType.APPLICATION_JSON_TYPE).build();
   }
@@ -1181,7 +1219,9 @@
     decryptOrNullify(securityContext, ingestionPipeline, true);
     ServiceEntityInterface service =
         Entity.getEntity(ingestionPipeline.getService(), "", Include.NON_DELETED);
-    if (repository.isS3LogStorageEnabled()) {
+    // Flag the ingestion pipeline with streamable logs only if configured and enabled for use
+    if (repository.isS3LogStorageEnabled()
+        && repository.getLogStorageConfiguration().getEnabled()) {
       ingestionPipeline.setEnableStreamableLogs(true);
     }
     PipelineServiceClientResponse status =
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/LogStorageFactoryTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/LogStorageFactoryTest.java
index d761b0dafd1..93db7bbedba 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/LogStorageFactoryTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/LogStorageFactoryTest.java
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
+import org.openmetadata.schema.security.credentials.AWSCredentials;
 import org.openmetadata.sdk.PipelineServiceClientInterface;
 import org.openmetadata.service.monitoring.StreamableLogsMetrics;

@@ -46,7 +47,7 @@ public class LogStorageFactoryTest {
         new LogStorageConfiguration()
             .withType(LogStorageConfiguration.Type.S_3)
             .withBucketName("test-bucket")
-            .withRegion("us-east-1")
+            .withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
             .withPrefix("test-prefix");

     defaultConfig = null; // null config should create default storage
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/MinioLogStorageIntegrationTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/MinioLogStorageIntegrationTest.java
index 16fb3386339..d202fec480d 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/MinioLogStorageIntegrationTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/MinioLogStorageIntegrationTest.java
@@ -25,6 +25,7 @@ import java.util.*;
 import lombok.extern.slf4j.Slf4j;
 import org.junit.jupiter.api.*;
 import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
+import org.openmetadata.schema.security.credentials.AWSCredentials;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.junit.jupiter.Container;
@@ -82,7 +83,7 @@ public class MinioLogStorageIntegrationTest {
         new LogStorageConfiguration()
             .withType(LogStorageConfiguration.Type.S_3)
             .withBucketName(TEST_BUCKET)
-            .withRegion("us-east-1")
+            .withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
             .withPrefix("logs")
             .withEnableServerSideEncryption(false) // MinIO doesn't need SSE
             .withStorageClass(
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java
index 88c7f3e3089..455476b09f3 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/logstorage/S3LogStorageTest.java
@@ -27,6 +27,7 @@ import org.mockito.Mock;
 import org.mockito.MockedStatic;
 import org.mockito.junit.jupiter.MockitoExtension;
 import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
+import org.openmetadata.schema.security.credentials.AWSCredentials;
 import software.amazon.awssdk.core.ResponseBytes;
 import software.amazon.awssdk.core.sync.RequestBody;
 import software.amazon.awssdk.core.sync.ResponseTransformer;
@@ -53,7 +54,7 @@ public class S3LogStorageTest {
         new LogStorageConfiguration()
             .withType(LogStorageConfiguration.Type.S_3)
             .withBucketName(testBucket)
-            .withRegion("us-east-1")
+            .withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
             .withPrefix(testPrefix)
             .withEnableServerSideEncryption(true)
             .withStorageClass(LogStorageConfiguration.StorageClass.STANDARD_IA)
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStorageTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStorageTest.java
index ed02ba48806..dc831c9cc53 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStorageTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStorageTest.java
@@ -41,6 +41,7 @@ import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipel
 import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
 import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline;
 import org.openmetadata.schema.metadataIngestion.SourceConfig;
+import org.openmetadata.schema.security.credentials.AWSCredentials;
 import org.openmetadata.schema.services.connections.database.BigQueryConnection;
 import org.openmetadata.schema.type.EntityReference;
 import org.openmetadata.schema.utils.JsonUtils;
@@ -143,7 +144,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
         new LogStorageConfiguration()
             .withType(LogStorageConfiguration.Type.S_3)
             .withBucketName(TEST_BUCKET)
-            .withRegion("us-east-1")
+            .withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
             .withPrefix("test-logs")
             .withAwsConfig(awsCreds);

@@ -262,7 +263,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
         new LogStorageConfiguration()
             .withType(LogStorageConfiguration.Type.S_3)
             .withBucketName(TEST_BUCKET)
-            .withRegion("us-east-1")
+            .withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
             .withAwsConfig(awsCreds)
             .withPrefix("limited-test")
             .withMaxConcurrentStreams(2); // Very low limit
@@ -499,7 +500,7 @@ class IngestionPipelineLogStorageTest extends OpenMetadataApplicationTest {
         new LogStorageConfiguration()
             .withType(LogStorageConfiguration.Type.S_3)
            .withBucketName("non-existent-bucket")
-            .withRegion("us-east-1")
+            .withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"))
             .withPrefix("test-logs");

     S3LogStorage badStorage = new S3LogStorage();
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStreamingTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStreamingTest.java
index 4a83b45d4f7..e6a9239ba8a 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStreamingTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineLogStreamingTest.java
@@ -23,6 +23,7 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.commons.io.IOUtils;
 import org.junit.jupiter.api.*;
 import org.openmetadata.schema.api.configuration.LogStorageConfiguration;
+import org.openmetadata.schema.security.credentials.AWSCredentials;
 import org.openmetadata.service.OpenMetadataApplicationTest;
 import org.openmetadata.service.logstorage.S3LogStorage;
 import org.testcontainers.containers.GenericContainer;
@@ -77,7 +78,7 @@ public class IngestionPipelineLogStreamingTest extends OpenMetadataApplicationTe
     config.setType(LogStorageConfiguration.Type.S_3);
     config.setBucketName(TEST_BUCKET);
     config.setPrefix("pipeline-logs/");
-    config.setRegion("us-east-1");
+    config.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"));
     config.setMaxConcurrentStreams(10);
     config.setStreamTimeoutMinutes(5);
     config.setAsyncBufferSizeMB(5);
@@ -338,7 +339,7 @@ public class IngestionPipelineLogStreamingTest extends OpenMetadataApplicationTe
     failConfig.setType(LogStorageConfiguration.Type.S_3);
     failConfig.setBucketName(TEST_BUCKET);
     failConfig.setPrefix("pipeline-logs/");
-    failConfig.setRegion("us-east-1");
+    failConfig.withAwsConfig(new AWSCredentials().withAwsRegion("us-east-1"));

     Map failConfigMap = new HashMap<>();
     failConfigMap.put("config", failConfig);
diff --git a/openmetadata-spec/src/main/resources/json/schema/api/services/ingestionPipelines/createIngestionPipeline.json b/openmetadata-spec/src/main/resources/json/schema/api/services/ingestionPipelines/createIngestionPipeline.json
index fc8a03521df..86617664d9f 100644
--- a/openmetadata-spec/src/main/resources/json/schema/api/services/ingestionPipelines/createIngestionPipeline.json
+++ b/openmetadata-spec/src/main/resources/json/schema/api/services/ingestionPipelines/createIngestionPipeline.json
@@ -59,6 +59,11 @@
     "processingEngine": {
       "description": "The processing engine responsible for executing the ingestion pipeline logic.",
       "$ref": "../../../type/entityReference.json"
+    },
+    "enableStreamableLogs": {
+      "description": "Enable real-time log streaming to the OpenMetadata server. When enabled, ingestion logs will be automatically shipped to the server's configured log storage backend (S3 or compatible).",
+      "type": "boolean",
+      "default": false
     }
   },
   "required": [
diff --git a/openmetadata-spec/src/main/resources/json/schema/configuration/logStorageConfiguration.json b/openmetadata-spec/src/main/resources/json/schema/configuration/logStorageConfiguration.json
index 0b7e3cefcf0..5e937312778 100644
--- a/openmetadata-spec/src/main/resources/json/schema/configuration/logStorageConfiguration.json
+++ b/openmetadata-spec/src/main/resources/json/schema/configuration/logStorageConfiguration.json
@@ -12,6 +12,11 @@
       "enum": ["default", "s3"],
       "default": "default"
     },
+    "enabled": {
+      "description": "Enable it for pipelines deployed in the server",
+      "type": "boolean",
+      "default": false
+    },
     "bucketName": {
       "description": "S3 bucket name for storing logs (required for S3 type)",
       "type": "string"
@@ -21,10 +26,6 @@
       "type": "string",
       "default": "pipeline-logs"
     },
-    "region": {
-      "description": "AWS region for the S3 bucket (required for S3 type)",
-      "type": "string"
-    },
     "awsConfig": {
       "description": "AWS credentials configuration",
       "$ref": "../security/credentials/awsCredentials.json"
@@ -34,6 +35,15 @@
       "type": "boolean",
       "default": true
     },
+    "sseAlgorithm": {
+      "description": "Server-side encryption algorithm (if applicable)",
+      "type": "string",
+      "enum": ["AES256", "aws:kms"]
+    },
+    "kmsKeyId": {
+      "description": "KMS Key ID for server-side encryption (if applicable)",
+      "type": "string"
+    },
     "storageClass": {
       "description": "S3 storage class for log objects",
       "type": "string",
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/application.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/application.json
index bd37ce5d74e..882f33ae186 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/application.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/application.json
@@ -29,6 +29,11 @@
     "pipelineRunId": {
       "description": "Unique identifier of pipeline run, used to identify the current pipeline run",
       "$ref": "../type/basic.json#/definitions/uuid"
+    },
+    "enableStreamableLogs": {
+      "description": "Enable streaming logs to a remote log storage via the OpenMetadata Server",
+      "type": "boolean",
+      "default": false
     }
   },
   "required": ["workflowConfig"],
diff --git a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
index f411b6d0d80..cb5c9f2636c 100644
--- a/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
+++ b/openmetadata-spec/src/main/resources/json/schema/metadataIngestion/workflow.json
@@ -234,6 +234,11 @@
     "pipelineRunId": {
      "description": "Unique identifier of pipeline run, used to identify the current pipeline run",
       "$ref": "../type/basic.json#/definitions/uuid"
+    },
+    "enableStreamableLogs": {
+      "description": "Enable streaming logs to a remote log storage via the OpenMetadata Server",
+      "type": "boolean",
+      "default": false
     }
   },
   "required": ["source", "workflowConfig"],
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts
index 82210351449..bf3cb22fb61 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/api/services/ingestionPipelines/createIngestionPipeline.ts
@@ -27,6 +27,12 @@ export interface CreateIngestionPipeline {
      * Fully qualified names of the domains the Ingestion Pipeline belongs to.
      */
    domains?: string[];
+    /**
+     * Enable real-time log streaming to the OpenMetadata server. When enabled, ingestion logs
+     * will be automatically shipped to the server's configured log storage backend (S3 or
+     * compatible).
+     */
+    enableStreamableLogs?: boolean;
     /**
      * Set the logging level for the workflow.
      */
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/configuration/logStorageConfiguration.ts b/openmetadata-ui/src/main/resources/ui/src/generated/configuration/logStorageConfiguration.ts
index 74e6a0e957f..c9a8c22db29 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/configuration/logStorageConfiguration.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/configuration/logStorageConfiguration.ts
@@ -26,6 +26,10 @@ export interface LogStorageConfiguration {
      * S3 bucket name for storing logs (required for S3 type)
      */
     bucketName?: string;
+    /**
+     * Enable it for pipelines deployed in the server
+     */
+    enabled?: boolean;
     /**
      * Enable server-side encryption for S3 objects
      */
@@ -34,6 +38,10 @@ export interface LogStorageConfiguration {
      * Number of days after which logs are automatically deleted (0 means no expiration)
      */
     expirationDays?: number;
+    /**
+     * KMS Key ID for server-side encryption (if applicable)
+     */
+    kmsKeyId?: string;
     /**
      * Maximum number of concurrent log streams allowed
      */
@@ -43,9 +51,9 @@ export interface LogStorageConfiguration {
      */
     prefix?: string;
     /**
-     * AWS region for the S3 bucket (required for S3 type)
+     * Server-side encryption algorithm (if applicable)
      */
-    region?: string;
+    sseAlgorithm?: SSEAlgorithm;
     /**
      * S3 storage class for log objects
      */
@@ -108,6 +116,14 @@ export interface AWSCredentials {
     profileName?: string;
 }

+/**
+ * Server-side encryption algorithm (if applicable)
+ */
+export enum SSEAlgorithm {
+    Aes256 = "AES256",
+    AwsKms = "aws:kms",
+}
+
 /**
  * S3 storage class for log objects
  */
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/configuration/pipelineServiceClientConfiguration.ts b/openmetadata-ui/src/main/resources/ui/src/generated/configuration/pipelineServiceClientConfiguration.ts
index 05ae63fc891..2b542f46b99 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/configuration/pipelineServiceClientConfiguration.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/configuration/pipelineServiceClientConfiguration.ts
@@ -266,6 +266,10 @@ export interface LogStorageConfiguration {
      * S3 bucket name for storing logs (required for S3 type)
      */
     bucketName?: string;
+    /**
+     * Enable it for pipelines deployed in the server
+     */
+    enabled?: boolean;
     /**
      * Enable server-side encryption for S3 objects
      */
@@ -274,6 +278,10 @@ export interface LogStorageConfiguration {
      * Number of days after which logs are automatically deleted (0 means no expiration)
      */
     expirationDays?: number;
+    /**
+     * KMS Key ID for server-side encryption (if applicable)
+     */
+    kmsKeyId?: string;
     /**
      * Maximum number of concurrent log streams allowed
      */
@@ -283,9 +291,9 @@ export interface LogStorageConfiguration {
      */
     prefix?: string;
     /**
-     * AWS region for the S3 bucket (required for S3 type)
+     * Server-side encryption algorithm (if applicable)
      */
-    region?: string;
+    sseAlgorithm?: SSEAlgorithm;
     /**
      * S3 storage class for log objects
      */
@@ -348,6 +356,14 @@ export interface AWSCredentials {
     profileName?: string;
 }

+/**
+ * Server-side encryption algorithm (if applicable)
+ */
+export enum SSEAlgorithm {
+    Aes256 = "AES256",
+    AwsKms = "aws:kms",
+}
+
 /**
  * S3 storage class for log objects
  */
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/application.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/application.ts
index 0b50dfd035b..f063949a72b 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/application.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/application.ts
@@ -23,6 +23,10 @@ export interface Application {
      * External Application Private configuration
      */
     appPrivateConfig?: PrivateConfig;
+    /**
+     * Enable streaming logs to a remote log storage via the OpenMetadata Server
+     */
+    enableStreamableLogs?: boolean;
     /**
      * Fully qualified name of ingestion pipeline, used to identify the current ingestion
      * pipeline
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts
index 81668e55aa1..a7cfc648230 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/metadataIngestion/workflow.ts
@@ -33,6 +33,10 @@ export interface Workflow {
  */
 export interface OpenMetadataWorkflowConfig {
     bulkSink?: BulkSink;
+    /**
+     * Enable streaming logs to a remote log storage via the OpenMetadata Server
+     */
+    enableStreamableLogs?: boolean;
     /**
      * Fully qualified name of ingestion pipeline, used to identify the current ingestion
      * pipeline
diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/settings/settings.ts b/openmetadata-ui/src/main/resources/ui/src/generated/settings/settings.ts
index 7f5d4734076..09030f8f7ce 100644
--- a/openmetadata-ui/src/main/resources/ui/src/generated/settings/settings.ts
+++ b/openmetadata-ui/src/main/resources/ui/src/generated/settings/settings.ts
@@ -1577,6 +1577,10 @@ export interface LogStorageConfiguration {
      * S3 bucket name for storing logs (required for S3 type)
      */
     bucketName?: string;
+    /**
+     * Enable it for pipelines deployed in the server
+     */
+    enabled?: boolean;
     /**
      * Enable server-side encryption for S3 objects
      */
@@ -1585,6 +1589,10 @@ export interface LogStorageConfiguration {
      * Number of days after which logs are automatically deleted (0 means no expiration)
      */
     expirationDays?: number;
+    /**
+     * KMS Key ID for server-side encryption (if applicable)
+     */
+    kmsKeyId?: string;
     /**
      * Maximum number of concurrent log streams allowed
      */
@@ -1594,9 +1602,9 @@ export interface LogStorageConfiguration {
      */
     prefix?: string;
     /**
-     * AWS region for the S3 bucket (required for S3 type)
+     * Server-side encryption algorithm (if applicable)
      */
-    region?: string;
+    sseAlgorithm?: SSEAlgorithm;
     /**
      * S3 storage class for log objects
      */
@@ -1659,6 +1667,14 @@ export interface AWSCredentials {
     profileName?: string;
 }

+/**
+ * Server-side encryption algorithm (if applicable)
+ */
+export enum SSEAlgorithm {
+    Aes256 = "AES256",
+    AwsKms = "aws:kms",
+}
+
 /**
  * S3 storage class for log objects
  */
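
For reviewers who want to exercise the ingestion-side flag outside Airflow, the sketch below shows how a hand-rolled run could opt in. It is illustrative only and not part of this patch: the enableStreamableLogs, ingestionPipelineFQN, and pipelineRunId keys map to the workflow.json additions above, while the service name, pipeline FQN, host, and token are placeholders, and it assumes the referenced database service is already registered in OpenMetadata so serviceConnection can be omitted.

# Illustrative only -- not part of this change set. Assumes a "demo_mysql" database
# service already registered in OpenMetadata, so serviceConnection is omitted here.
from uuid import uuid4

from metadata.workflow.metadata import MetadataWorkflow

config = {
    "source": {
        "type": "mysql",
        "serviceName": "demo_mysql",  # placeholder service name
        "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
    },
    "sink": {"type": "metadata-rest", "config": {}},
    "workflowConfig": {
        "loggerLevel": "INFO",
        "openMetadataServerConfig": {
            "hostPort": "http://localhost:8585/api",
            "authProvider": "openmetadata",
            "securityConfig": {"jwtToken": "<jwt-token>"},  # placeholder token
        },
    },
    # New keys wired up by this change set: the opt-in flag plus the identifiers
    # the streamable log handler needs to route logs to the server's log storage.
    "ingestionPipelineFQN": "demo_mysql.metadata_pipeline",  # placeholder FQN
    "pipelineRunId": str(uuid4()),
    "enableStreamableLogs": True,
}

workflow = MetadataWorkflow.create(config)
workflow.execute()
workflow.stop()
workflow.raise_from_status()

The execute/stop/raise order mirrors the updated execute_workflow helpers above; print_status() no longer needs to be called explicitly because BaseWorkflow now prints the status from its finally block.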
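
The credential handling added to S3LogStorage is easy to lose inside the hunk, so here is a small Python paraphrase of the selection order (illustration only; the authoritative code is the new Java resolveCredentials helper above): explicit access keys, or any custom endPointURL such as a MinIO port-forward, select static credentials, and the default AWS provider chain is tried only when neither is present, with static credentials as the last-resort fallback.

# Illustration of the resolution order implemented by the Java resolveCredentials
# helper; this is not the server code path, just a readable paraphrase.
from dataclasses import dataclass
from typing import Optional


@dataclass
class AwsConfig:
    access_key_id: Optional[str] = None
    secret_access_key: Optional[str] = None
    session_token: Optional[str] = None
    endpoint_url: Optional[str] = None


def resolve_credentials_mode(cfg: AwsConfig) -> str:
    """Return which credentials provider the S3 log storage would pick."""
    has_static = bool(cfg.access_key_id) and bool(cfg.secret_access_key)
    if has_static or cfg.endpoint_url:
        # Static keys win; a session token upgrades them to session credentials.
        return "static-session" if cfg.session_token else "static-basic"
    # No static keys and no custom endpoint: try the default provider chain
    # (env vars, instance profile, etc.); the Java code falls back to static
    # credentials only if that chain cannot resolve anything.
    return "default-chain"


if __name__ == "__main__":
    print(resolve_credentials_mode(AwsConfig(endpoint_url="http://localhost:9000")))  # static-basic
    print(resolve_credentials_mode(AwsConfig()))  # default-chain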