mirror of
https://github.com/open-metadata/OpenMetadata.git
synced 2025-12-26 23:18:25 +00:00
Fix bug in secrets manager with SSO configuration (#8932)
* Fix bug in secrets manager with SSO configuration * Remove comment from imports * Sort imports * Add missing decrypt when updating * Add minor change
This commit is contained in:
parent
33e85456fc
commit
5151db4ae0
@ -75,9 +75,10 @@ class CustomSecretStr(SecretStr):
|
||||
)
|
||||
return str(self)
|
||||
|
||||
def get_secret_value(self) -> str:
|
||||
def get_secret_value(self, skip_secret_manager: bool = False) -> str:
|
||||
if (
|
||||
self._secret_value.startswith("secret:")
|
||||
not skip_secret_manager
|
||||
and self._secret_value.startswith("secret:")
|
||||
and SecretsManagerFactory().get_secrets_manager()
|
||||
):
|
||||
secret_id = self._secret_value.replace("secret:", "")
|
||||
|
||||
@ -11,10 +11,11 @@
|
||||
"""
|
||||
Custom pydantic encoders
|
||||
"""
|
||||
|
||||
from pydantic import SecretStr
|
||||
from pydantic.json import pydantic_encoder
|
||||
|
||||
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
|
||||
|
||||
|
||||
def show_secrets_encoder(obj):
|
||||
"""
|
||||
@ -22,6 +23,9 @@ def show_secrets_encoder(obj):
|
||||
:param obj: Pydantic Model
|
||||
:return: JSON repr
|
||||
"""
|
||||
if isinstance(obj, CustomSecretStr):
|
||||
return obj.get_secret_value(skip_secret_manager=True) if obj else None
|
||||
|
||||
if isinstance(obj, SecretStr):
|
||||
return obj.get_secret_value() if obj else None
|
||||
|
||||
|
||||
@ -66,7 +66,7 @@ class SecretsManagerFactory(metaclass=Singleton):
|
||||
return NoopSecretsManager()
|
||||
if secrets_manager_provider in (
|
||||
SecretsManagerProvider.aws,
|
||||
SecretsManagerProvider.managed_aws_ssm,
|
||||
SecretsManagerProvider.managed_aws,
|
||||
):
|
||||
return AWSSecretsManager(credentials)
|
||||
if secrets_manager_provider in (
|
||||
|
||||
@ -30,11 +30,15 @@ from openmetadata_managed_apis.api.utils import (
|
||||
scan_dags_job_background,
|
||||
)
|
||||
from openmetadata_managed_apis.utils.logger import operations_logger
|
||||
from openmetadata_managed_apis.workflows.ingestion.credentials_builder import (
|
||||
build_secrets_manager_credentials,
|
||||
)
|
||||
|
||||
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
||||
IngestionPipeline,
|
||||
)
|
||||
from metadata.ingestion.models.encoders import show_secrets_encoder
|
||||
from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory
|
||||
|
||||
logger = operations_logger()
|
||||
|
||||
@ -56,7 +60,13 @@ class DagDeployer:
|
||||
logger.info(
|
||||
f"Received the following Airflow Configuration: {ingestion_pipeline.airflowConfig}"
|
||||
)
|
||||
|
||||
# we need to instantiate the secret manager in case secrets are passed
|
||||
SecretsManagerFactory(
|
||||
ingestion_pipeline.openMetadataServerConnection.secretsManagerProvider,
|
||||
build_secrets_manager_credentials(
|
||||
ingestion_pipeline.openMetadataServerConnection.secretsManagerProvider
|
||||
),
|
||||
)
|
||||
self.ingestion_pipeline = ingestion_pipeline
|
||||
self.dag_id = clean_dag_id(self.ingestion_pipeline.name.__root__)
|
||||
|
||||
|
||||
@ -7,6 +7,7 @@ from metadata.generated.schema.security.credentials.awsCredentials import AWSCre
|
||||
from metadata.generated.schema.security.secrets.secretsManagerProvider import (
|
||||
SecretsManagerProvider,
|
||||
)
|
||||
from metadata.ingestion.models.custom_pydantic import CustomSecretStr
|
||||
from metadata.utils.secrets.secrets_manager import SECRET_MANAGER_AIRFLOW_CONF
|
||||
|
||||
|
||||
@ -15,10 +16,10 @@ def build_aws_credentials() -> Optional[AWSCredentials]:
|
||||
credentials = AWSCredentials(
|
||||
awsRegion=conf.get(SECRET_MANAGER_AIRFLOW_CONF, "aws_region", fallback="")
|
||||
)
|
||||
credentials.awsAccessKeyId = SecretStr(
|
||||
conf.get(SECRET_MANAGER_AIRFLOW_CONF, "aws_access_key_id", fallback="")
|
||||
credentials.awsAccessKeyId = conf.get(
|
||||
SECRET_MANAGER_AIRFLOW_CONF, "aws_access_key_id", fallback=""
|
||||
)
|
||||
credentials.awsSecretAccessKey = SecretStr(
|
||||
credentials.awsSecretAccessKey = CustomSecretStr(
|
||||
conf.get(SECRET_MANAGER_AIRFLOW_CONF, "aws_secret_access_key", fallback="")
|
||||
)
|
||||
return credentials
|
||||
|
||||
@ -23,11 +23,15 @@ from airflow.models import DAG
|
||||
# these are params that cannot be a dag name
|
||||
from openmetadata_managed_apis.utils.logger import workflow_logger
|
||||
from openmetadata_managed_apis.workflows.config import load_config_file
|
||||
from openmetadata_managed_apis.workflows.ingestion.credentials_builder import (
|
||||
build_secrets_manager_credentials,
|
||||
)
|
||||
from openmetadata_managed_apis.workflows.workflow_builder import WorkflowBuilder
|
||||
|
||||
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
|
||||
IngestionPipeline,
|
||||
)
|
||||
from metadata.utils.secrets.secrets_manager_factory import SecretsManagerFactory
|
||||
|
||||
logger = workflow_logger()
|
||||
|
||||
@ -47,6 +51,13 @@ class WorkflowFactory:
|
||||
def __init__(self, airflow_pipeline: IngestionPipeline) -> None:
|
||||
self.dag = None
|
||||
self.airflow_pipeline = airflow_pipeline
|
||||
# we need to instantiate the secret manager in case secrets are passed
|
||||
SecretsManagerFactory(
|
||||
airflow_pipeline.openMetadataServerConnection.secretsManagerProvider,
|
||||
build_secrets_manager_credentials(
|
||||
airflow_pipeline.openMetadataServerConnection.secretsManagerProvider
|
||||
),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def create(cls, config: str):
|
||||
|
||||
@ -226,7 +226,7 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
||||
/** Handles entity updated from PUT and POST operation. */
|
||||
public class IngestionPipelineUpdater extends EntityUpdater {
|
||||
public IngestionPipelineUpdater(IngestionPipeline original, IngestionPipeline updated, Operation operation) {
|
||||
super(original, updated, operation);
|
||||
super(buildIngestionPipelineDecrypted(original), updated, operation);
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -286,4 +286,10 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static IngestionPipeline buildIngestionPipelineDecrypted(IngestionPipeline original) {
|
||||
IngestionPipeline decrypted = JsonUtils.convertValue(JsonUtils.getMap(original), IngestionPipeline.class);
|
||||
decrypted = SecretsManagerFactory.getSecretsManager().encryptOrDecryptIngestionPipeline(decrypted, false);
|
||||
return decrypted;
|
||||
}
|
||||
}
|
||||
|
||||
@ -324,6 +324,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
|
||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateIngestionPipeline create)
|
||||
throws IOException {
|
||||
IngestionPipeline ingestionPipeline = getIngestionPipeline(create, securityContext.getUserPrincipal().getName());
|
||||
decryptOrNullify(securityContext, ingestionPipeline);
|
||||
Response response = create(uriInfo, securityContext, ingestionPipeline);
|
||||
decryptOrNullify(securityContext, (IngestionPipeline) response.getEntity());
|
||||
return response;
|
||||
@ -375,6 +376,7 @@ public class IngestionPipelineResource extends EntityResource<IngestionPipeline,
|
||||
@Context UriInfo uriInfo, @Context SecurityContext securityContext, @Valid CreateIngestionPipeline update)
|
||||
throws IOException {
|
||||
IngestionPipeline ingestionPipeline = getIngestionPipeline(update, securityContext.getUserPrincipal().getName());
|
||||
decryptOrNullify(securityContext, ingestionPipeline);
|
||||
Response response = createOrUpdate(uriInfo, securityContext, ingestionPipeline);
|
||||
decryptOrNullify(securityContext, (IngestionPipeline) response.getEntity());
|
||||
return response;
|
||||
|
||||
@ -16,6 +16,7 @@ package org.openmetadata.service.util;
|
||||
import static org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType.METADATA;
|
||||
|
||||
import java.util.List;
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
|
||||
import org.openmetadata.schema.metadataIngestion.DatabaseServiceMetadataPipeline;
|
||||
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtCloudConfig;
|
||||
@ -23,6 +24,12 @@ import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtGCSConfig;
|
||||
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtHttpConfig;
|
||||
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtLocalConfig;
|
||||
import org.openmetadata.schema.metadataIngestion.dbtconfig.DbtS3Config;
|
||||
import org.openmetadata.schema.security.client.Auth0SSOClientConfig;
|
||||
import org.openmetadata.schema.security.client.AzureSSOClientConfig;
|
||||
import org.openmetadata.schema.security.client.CustomOIDCSSOClientConfig;
|
||||
import org.openmetadata.schema.security.client.GoogleSSOClientConfig;
|
||||
import org.openmetadata.schema.security.client.OktaSSOClientConfig;
|
||||
import org.openmetadata.schema.security.client.OpenMetadataJWTClientConfig;
|
||||
import org.openmetadata.service.Entity;
|
||||
|
||||
public class IngestionPipelineBuilder {
|
||||
@ -30,6 +37,15 @@ public class IngestionPipelineBuilder {
|
||||
private static final List<Class<?>> DBT_CONFIG_CLASSES =
|
||||
List.of(DbtCloudConfig.class, DbtGCSConfig.class, DbtHttpConfig.class, DbtLocalConfig.class, DbtS3Config.class);
|
||||
|
||||
private static final List<Class<?>> SECURITY_CONFIG_CLASSES =
|
||||
List.of(
|
||||
OpenMetadataJWTClientConfig.class,
|
||||
GoogleSSOClientConfig.class,
|
||||
OktaSSOClientConfig.class,
|
||||
Auth0SSOClientConfig.class,
|
||||
AzureSSOClientConfig.class,
|
||||
CustomOIDCSSOClientConfig.class);
|
||||
|
||||
/**
|
||||
* Build `IngestionPipeline` object with concrete class for the config which by definition it is a `Object`.
|
||||
*
|
||||
@ -49,18 +65,33 @@ public class IngestionPipelineBuilder {
|
||||
databaseServiceMetadataPipeline.withDbtConfigSource(
|
||||
buildDbtConfigSource(databaseServiceMetadataPipeline.getDbtConfigSource())));
|
||||
}
|
||||
if (ingestionPipeline.getOpenMetadataServerConnection() != null) {
|
||||
ingestionPipeline
|
||||
.getOpenMetadataServerConnection()
|
||||
.setSecurityConfig(
|
||||
buildSecurityConfig(ingestionPipeline.getOpenMetadataServerConnection().getSecurityConfig()));
|
||||
}
|
||||
return ingestionPipeline;
|
||||
}
|
||||
|
||||
private static Object buildDbtConfigSource(Object config) {
|
||||
return buildBasedOnClassList(config, DBT_CONFIG_CLASSES);
|
||||
}
|
||||
|
||||
private static Object buildSecurityConfig(Object config) {
|
||||
return buildBasedOnClassList(config, SECURITY_CONFIG_CLASSES);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private static Object buildBasedOnClassList(Object config, List<Class<?>> listOfClasses) {
|
||||
if (config != null) {
|
||||
for (Class<?> clazz : DBT_CONFIG_CLASSES) {
|
||||
for (Class<?> clazz : listOfClasses) {
|
||||
try {
|
||||
return JsonUtils.convertValue(config, clazz);
|
||||
} catch (Exception ignored) {
|
||||
}
|
||||
}
|
||||
throw new IllegalArgumentException("Impossible to parse the config of the source config.");
|
||||
throw new IllegalArgumentException("Impossible to parse the object of the Ingestion Pipeline.");
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user