From 97fad806a22a9a029606bf7cd2d66b47d9bdd901 Mon Sep 17 00:00:00 2001 From: Imri Paran Date: Tue, 25 Feb 2025 17:51:49 +0100 Subject: [PATCH] Fixes 19755: Publish app config with status (#19754) * feat(app): add config to status add config to the reported status of the ingestion pipeline * added separate pipeline service client call for external apps * fix masking of pydantic model * - overload model_dump to mask secrets instead of a separate method - moved tests to test_custom_pydantic.py * fix: execution time * fix: mask secrets in dump json * fix: for python3.8 * fix: for python3.8 * fix: use mask_secrets=False when dumping a model for create * format * fix: update mask_secrets=False for workflow configurations * fix: use context directly when using model_dump_json * fix: default behavior when dumping json * format * fixed tests --- .../ingestion/models/custom_pydantic.py | 45 ++++++++++-- .../src/metadata/ingestion/ometa/ometa_api.py | 9 ++- .../workflow/workflow_status_mixin.py | 6 ++ ingestion/tests/integration/ometa/conftest.py | 2 +- .../tests/unit/models/test_custom_pydantic.py | 69 +++++++++++++++++++ .../workflows/ingestion/application.py | 4 +- .../ingestion/auto_classification.py | 4 +- .../workflows/ingestion/common.py | 4 +- .../workflows/ingestion/profiler.py | 4 +- .../workflows/ingestion/test_suite.py | 4 +- .../workflows/ingestion/usage.py | 4 +- .../jdbi3/IngestionPipelineRepository.java | 13 ++++ .../service/resources/apps/AppResource.java | 8 ++- 13 files changed, 160 insertions(+), 16 deletions(-) diff --git a/ingestion/src/metadata/ingestion/models/custom_pydantic.py b/ingestion/src/metadata/ingestion/models/custom_pydantic.py index e981f6add80..c2ac5a5b810 100644 --- a/ingestion/src/metadata/ingestion/models/custom_pydantic.py +++ b/ingestion/src/metadata/ingestion/models/custom_pydantic.py @@ -20,9 +20,10 @@ import logging from typing import Any, Dict, Literal, Optional, Union from pydantic import BaseModel as PydanticBaseModel -from pydantic import PlainSerializer, model_validator +from pydantic import WrapSerializer, model_validator from pydantic.main import IncEx from pydantic.types import SecretStr +from pydantic_core.core_schema import SerializationInfo from typing_extensions import Annotated from metadata.ingestion.models.custom_basemodel_validation import ( @@ -72,6 +73,7 @@ class BaseModel(PydanticBaseModel): def model_dump_json( # pylint: disable=too-many-arguments self, *, + mask_secrets: bool = None, indent: Optional[int] = None, include: IncEx = None, exclude: IncEx = None, @@ -92,10 +94,20 @@ class BaseModel(PydanticBaseModel): This solution is covered in the `test_pydantic_v2` test comparing the dump results from V1 vs. V2. + + mask_secrets: bool - Can be overridedn by either passing it as an argument or setting it in the context. + With the following rules: + - if mask_secrets is not None, it will be used as is + - if mask_secrets is None and context is not None, it will be set to context.get("mask_secrets", True) + - if mask_secrets is None and context is None, it will be set to True + """ + if mask_secrets is None: + mask_secrets = context.get("mask_secrets", True) if context else True return json.dumps( self.model_dump( mode="json", + mask_secrets=mask_secrets, include=include, exclude=exclude, context=context, @@ -110,6 +122,18 @@ class BaseModel(PydanticBaseModel): ensure_ascii=True, ) + def model_dump( + self, + *, + mask_secrets: bool = False, + **kwargs, + ) -> Dict[str, Any]: + if mask_secrets: + context = kwargs.pop("context", None) or {} + context["mask_secrets"] = True + kwargs["context"] = context + return super().model_dump(**kwargs) + class _CustomSecretStr(SecretStr): """ @@ -117,6 +141,9 @@ class _CustomSecretStr(SecretStr): If the secret string value starts with `config:` it will use the rest of the string as secret id to search for it in the secrets store. + + By default the secrets will be unmasked when dumping ot python objects and masked when dumping to json unless + explicitly set otherwise using the `mask_secrets` or `context` arguments. """ def __repr__(self) -> str: @@ -154,9 +181,19 @@ class _CustomSecretStr(SecretStr): return self._secret_value -CustomSecretStr = Annotated[ - _CustomSecretStr, PlainSerializer(lambda secret: secret.get_secret_value()) -] +def handle_secret(value: Any, handler, info: SerializationInfo) -> str: + """ + Handle the secret value in the model. + """ + if not (info.context is not None and info.context.get("mask_secrets", False)): + if info.mode == "json": + # short circuit the json serialization and return the actual value + return value.get_secret_value() + return handler(value.get_secret_value()) + return str(value) # use pydantic's logic to mask the secret + + +CustomSecretStr = Annotated[_CustomSecretStr, WrapSerializer(handle_secret)] def ignore_type_decoder(type_: Any) -> None: diff --git a/ingestion/src/metadata/ingestion/ometa/ometa_api.py b/ingestion/src/metadata/ingestion/ometa/ometa_api.py index 1217d8950af..6a63c85afb6 100644 --- a/ingestion/src/metadata/ingestion/ometa/ometa_api.py +++ b/ingestion/src/metadata/ingestion/ometa/ometa_api.py @@ -17,8 +17,6 @@ working with OpenMetadata entities. import traceback from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union -from pydantic import BaseModel - from metadata.generated.schema.api.createBot import CreateBot from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import ( CreateIngestionPipelineRequest, @@ -30,6 +28,7 @@ from metadata.generated.schema.type import basic from metadata.generated.schema.type.basic import FullyQualifiedEntityName from metadata.generated.schema.type.entityHistory import EntityVersionHistory from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.models.custom_pydantic import BaseModel from metadata.ingestion.ometa.auth_provider import OpenMetadataAuthenticationProvider from metadata.ingestion.ometa.client import REST, APIError, ClientConfig from metadata.ingestion.ometa.mixins.custom_property_mixin import ( @@ -268,7 +267,11 @@ class OpenMetadata( ) fn = getattr(self.client, method) - resp = fn(self.get_suffix(entity), data=data.model_dump_json()) + resp = fn( + # this might be a regular pydantic model so we build the context manually + self.get_suffix(entity), + data=data.model_dump_json(context={"mask_secrets": False}), + ) if not resp: raise EmptyPayloadException( f"Got an empty response when trying to PUT to {self.get_suffix(entity)}, {data.model_dump_json()}" diff --git a/ingestion/src/metadata/workflow/workflow_status_mixin.py b/ingestion/src/metadata/workflow/workflow_status_mixin.py index fe8d99715c2..1a3b93b186d 100644 --- a/ingestion/src/metadata/workflow/workflow_status_mixin.py +++ b/ingestion/src/metadata/workflow/workflow_status_mixin.py @@ -111,6 +111,12 @@ class WorkflowStatusMixin: pipeline_status.status = ( ingestion_status if ingestion_status else pipeline_status.status ) + # committing configurations can be a burden on resources, + # we dump a subset to be mindful of the payload size + pipeline_status.config = self.config.model_dump( + include={"appConfig"}, + mask_secrets=True, + ) self.metadata.create_or_update_pipeline_status( self.ingestion_pipeline.fullyQualifiedName.root, pipeline_status ) diff --git a/ingestion/tests/integration/ometa/conftest.py b/ingestion/tests/integration/ometa/conftest.py index c1516f65694..ca9bacb3ec5 100644 --- a/ingestion/tests/integration/ometa/conftest.py +++ b/ingestion/tests/integration/ometa/conftest.py @@ -104,7 +104,7 @@ def workflow(metadata, service, mysql_container): password=mysql_container.password, ), hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}", - ).model_dump_json(), + ).model_dump_json(mask_secrets=False), source_config={}, ) ) diff --git a/ingestion/tests/unit/models/test_custom_pydantic.py b/ingestion/tests/unit/models/test_custom_pydantic.py index b4a1d92c350..b3169c33a3f 100644 --- a/ingestion/tests/unit/models/test_custom_pydantic.py +++ b/ingestion/tests/unit/models/test_custom_pydantic.py @@ -1,4 +1,5 @@ import uuid +from typing import List from unittest import TestCase from metadata.generated.schema.api.data.createTable import CreateTableRequest @@ -16,6 +17,7 @@ from metadata.generated.schema.type.basic import ( Markdown, ) from metadata.generated.schema.type.entityReference import EntityReference +from metadata.ingestion.models.custom_pydantic import BaseModel, CustomSecretStr class CustomPydanticValidationTest(TestCase): @@ -141,3 +143,70 @@ class CustomPydanticValidationTest(TestCase): ) assert fetch_response_revert_separator.name.root == "test::table" assert fetch_response_revert_separator_2.name.root == "test::table>" + + +class NestedModel(BaseModel): + secret: CustomSecretStr + value: int + + +class RootModel(BaseModel): + root_secret: CustomSecretStr + nested: NestedModel + items: List[NestedModel] + + +data = { + "root_secret": "root_password", + "nested": {"secret": "nested_password", "value": 42}, + "items": [ + {"secret": "item1_password", "value": 1}, + {"secret": "item2_password", "value": 2}, + ], +} + +model = RootModel(**data) +masked_data = model.model_dump(mask_secrets=True) + + +def test_model_dump_secrets(): + """Test model_dump_masked with root, nested, and list structures.""" + + assert masked_data["root_secret"] == "**********" + assert masked_data["nested"]["secret"] == "**********" + assert masked_data["nested"]["value"] == 42 + assert masked_data["items"][0]["secret"] == "**********" + assert masked_data["items"][0]["value"] == 1 + assert masked_data["items"][1]["secret"] == "**********" + assert masked_data["items"][1]["value"] == 2 + + plain_data = model.model_dump(mask_secrets=False) + assert plain_data["root_secret"] == "root_password" + assert plain_data["nested"]["secret"] == "nested_password" + assert plain_data["items"][0]["secret"] == "item1_password" + + default_dump = model.model_dump() + assert default_dump["root_secret"] == "root_password" + assert default_dump["nested"]["secret"] == "nested_password" + assert default_dump["items"][0]["secret"] == "item1_password" + + +def test_model_dump_json_secrets(): + assert ( + model.model_validate_json( + model.model_dump_json() + ).root_secret.get_secret_value() + == "**********" + ) + assert ( + model.model_validate_json( + model.model_dump_json(mask_secrets=True) + ).root_secret.get_secret_value() + == "**********" + ) + assert ( + model.model_validate_json( + model.model_dump_json(mask_secrets=False) + ).root_secret.get_secret_value() + == "root_password" + ) diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py index 17a6c9e1375..eff390d5540 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/application.py @@ -48,7 +48,9 @@ def application_workflow(workflow_config: OpenMetadataApplicationConfig): set_operator_logger(workflow_config) - config = json.loads(workflow_config.model_dump_json(exclude_defaults=False)) + config = json.loads( + workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False) + ) workflow = ApplicationWorkflow.create(config) workflow.execute() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/auto_classification.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/auto_classification.py index 4d1f81d223b..3bec08271aa 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/auto_classification.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/auto_classification.py @@ -42,7 +42,9 @@ def auto_classification_workflow(workflow_config: OpenMetadataWorkflowConfig): set_operator_logger(workflow_config) - config = json.loads(workflow_config.model_dump_json(exclude_defaults=False)) + config = json.loads( + workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False) + ) workflow = AutoClassificationWorkflow.create(config) workflow.execute() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py index 5077964f2b7..edcca48b20a 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/common.py @@ -204,7 +204,9 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig): set_operator_logger(workflow_config) - config = json.loads(workflow_config.model_dump_json(exclude_defaults=False)) + config = json.loads( + workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False) + ) workflow = MetadataWorkflow.create(config) workflow.execute() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py index 1053912be15..d92dc6dcd46 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/profiler.py @@ -42,7 +42,9 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig): set_operator_logger(workflow_config) - config = json.loads(workflow_config.model_dump_json(exclude_defaults=False)) + config = json.loads( + workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False) + ) workflow = ProfilerWorkflow.create(config) workflow.execute() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py index 8ed7d76f0ef..97817c2e382 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/test_suite.py @@ -42,7 +42,9 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig): set_operator_logger(workflow_config) - config = json.loads(workflow_config.model_dump_json(exclude_defaults=False)) + config = json.loads( + workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False) + ) workflow = TestSuiteWorkflow.create(config) workflow.execute() diff --git a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py index 27d0292a26f..2450f24f97a 100644 --- a/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py +++ b/openmetadata-airflow-apis/openmetadata_managed_apis/workflows/ingestion/usage.py @@ -46,7 +46,9 @@ def usage_workflow(workflow_config: OpenMetadataWorkflowConfig): set_operator_logger(workflow_config) - config = json.loads(workflow_config.model_dump_json(exclude_defaults=False)) + config = json.loads( + workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False) + ) workflow = UsageWorkflow.create(config) workflow.execute() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java index 993456bc5f7..f606ad8e13e 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/IngestionPipelineRepository.java @@ -281,6 +281,19 @@ public class IngestionPipelineRepository extends EntityRepository listExternalAppStatus( + String ingestionPipelineFQN, Long startTs, Long endTs) { + return listPipelineStatus(ingestionPipelineFQN, startTs, endTs) + .map( + pipelineStatus -> + pipelineStatus.withConfig( + Optional.ofNullable(pipelineStatus.getConfig().getOrDefault("appConfig", null)) + .map(JsonUtils::getMap) + .orElse(null))); + } + public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipeline) { return JsonUtils.readValue( getLatestExtensionFromTimeSeries( diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java index adeefbe7d4e..29e57ebaef6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/apps/AppResource.java @@ -291,7 +291,7 @@ public class AppResource extends EntityResource { ingestionPipelineRepository.get( uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS)); return ingestionPipelineRepository - .listPipelineStatus(ingestionPipeline.getFullyQualifiedName(), startTs, endTs) + .listExternalAppStatus(ingestionPipeline.getFullyQualifiedName(), startTs, endTs) .map(pipelineStatus -> convertPipelineStatus(installation, pipelineStatus)); } throw new IllegalArgumentException("App does not have a scheduled deployment"); @@ -301,7 +301,11 @@ public class AppResource extends EntityResource { return new AppRunRecord() .withAppId(app.getId()) .withAppName(app.getName()) - .withExecutionTime(pipelineStatus.getStartDate()) + .withStartTime(pipelineStatus.getStartDate()) + .withExecutionTime( + pipelineStatus.getEndDate() == null + ? System.currentTimeMillis() - pipelineStatus.getStartDate() + : pipelineStatus.getEndDate() - pipelineStatus.getStartDate()) .withEndTime(pipelineStatus.getEndDate()) .withStatus( switch (pipelineStatus.getPipelineState()) {