Fixes 19755: Publish app config with status (#19754)

* feat(app): add config to status

add config to the reported status of the ingestion pipeline

* added separate pipeline service client call for external apps

* fix masking of pydantic model

* - overload model_dump to mask secrets instead of using a separate method
- moved tests to test_custom_pydantic.py

* fix: execution time

* fix: mask secrets in dump json

* fix: for python3.8

* fix: for python3.8

* fix: use mask_secrets=False when dumping a model for create

* format

* fix: update mask_secrets=False for workflow configurations

* fix: use context directly when using model_dump_json

* fix: default behavior when dumping json

* format

* fixed tests
Imri Paran 2025-02-25 17:51:49 +01:00 committed by GitHub
parent df00d2f392
commit 97fad806a2
13 changed files with 160 additions and 16 deletions

View File

@@ -20,9 +20,10 @@ import logging
from typing import Any, Dict, Literal, Optional, Union
from pydantic import BaseModel as PydanticBaseModel
from pydantic import PlainSerializer, model_validator
from pydantic import WrapSerializer, model_validator
from pydantic.main import IncEx
from pydantic.types import SecretStr
from pydantic_core.core_schema import SerializationInfo
from typing_extensions import Annotated
from metadata.ingestion.models.custom_basemodel_validation import (
@@ -72,6 +73,7 @@ class BaseModel(PydanticBaseModel):
def model_dump_json( # pylint: disable=too-many-arguments
self,
*,
mask_secrets: Optional[bool] = None,
indent: Optional[int] = None,
include: IncEx = None,
exclude: IncEx = None,
@@ -92,10 +94,20 @@ class BaseModel(PydanticBaseModel):
This solution is covered in the `test_pydantic_v2` test comparing the
dump results from V1 vs. V2.
mask_secrets: bool - can be overridden either by passing it as an argument or by setting it in the context,
with the following rules:
- if mask_secrets is not None, it is used as-is
- if mask_secrets is None and context is not None, it is set to context.get("mask_secrets", True)
- if mask_secrets is None and context is None, it is set to True
"""
if mask_secrets is None:
mask_secrets = context.get("mask_secrets", True) if context else True
return json.dumps(
self.model_dump(
mode="json",
mask_secrets=mask_secrets,
include=include,
exclude=exclude,
context=context,
@@ -110,6 +122,18 @@ class BaseModel(PydanticBaseModel):
ensure_ascii=True,
)
def model_dump(
self,
*,
mask_secrets: bool = False,
**kwargs,
) -> Dict[str, Any]:
if mask_secrets:
context = kwargs.pop("context", None) or {}
context["mask_secrets"] = True
kwargs["context"] = context
return super().model_dump(**kwargs)
class _CustomSecretStr(SecretStr):
"""
@@ -117,6 +141,9 @@ class _CustomSecretStr(SecretStr):
If the secret string value starts with `config:`, the rest of the string is used as the secret id to search for it
in the secrets store.
By default, secrets are unmasked when dumping to Python objects and masked when dumping to JSON, unless
explicitly set otherwise via the `mask_secrets` or `context` arguments.
"""
def __repr__(self) -> str:
@@ -154,9 +181,19 @@ class _CustomSecretStr(SecretStr):
return self._secret_value
CustomSecretStr = Annotated[
_CustomSecretStr, PlainSerializer(lambda secret: secret.get_secret_value())
]
def handle_secret(value: Any, handler, info: SerializationInfo) -> str:
"""
Serialize a secret value, masking it only when the serialization context requests masking.
"""
if not (info.context is not None and info.context.get("mask_secrets", False)):
if info.mode == "json":
# short circuit the json serialization and return the actual value
return value.get_secret_value()
return handler(value.get_secret_value())
return str(value) # use pydantic's logic to mask the secret
CustomSecretStr = Annotated[_CustomSecretStr, WrapSerializer(handle_secret)]
def ignore_type_decoder(type_: Any) -> None:

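Taken together, the `model_dump`/`model_dump_json` overloads and the `WrapSerializer` give per-call control over masking. A minimal sketch of the resulting behavior, assuming a model with a single `CustomSecretStr` field (the `Connection` model is illustrative, not part of the diff):

from metadata.ingestion.models.custom_pydantic import BaseModel, CustomSecretStr

class Connection(BaseModel):  # illustrative model
    password: CustomSecretStr

conn = Connection(password="hunter2")

conn.model_dump()                         # {'password': 'hunter2'} - python dumps stay unmasked
conn.model_dump(mask_secrets=True)        # {'password': '**********'}
conn.model_dump_json()                    # '{"password":"**********"}' - JSON masks by default
conn.model_dump_json(mask_secrets=False)  # '{"password":"hunter2"}'
conn.model_dump_json(context={"mask_secrets": False})  # same opt-out, driven by the context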
View File

@@ -17,8 +17,6 @@ working with OpenMetadata entities.
import traceback
from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union
from pydantic import BaseModel
from metadata.generated.schema.api.createBot import CreateBot
from metadata.generated.schema.api.services.ingestionPipelines.createIngestionPipeline import (
CreateIngestionPipelineRequest,
@@ -30,6 +28,7 @@ from metadata.generated.schema.type import basic
from metadata.generated.schema.type.basic import FullyQualifiedEntityName
from metadata.generated.schema.type.entityHistory import EntityVersionHistory
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.custom_pydantic import BaseModel
from metadata.ingestion.ometa.auth_provider import OpenMetadataAuthenticationProvider
from metadata.ingestion.ometa.client import REST, APIError, ClientConfig
from metadata.ingestion.ometa.mixins.custom_property_mixin import (
@@ -268,7 +267,11 @@ class OpenMetadata(
)
fn = getattr(self.client, method)
resp = fn(self.get_suffix(entity), data=data.model_dump_json())
resp = fn(
# this might be a regular pydantic model, so we build the context manually
self.get_suffix(entity),
data=data.model_dump_json(context={"mask_secrets": False}),
)
if not resp:
raise EmptyPayloadException(
f"Got an empty response when trying to PUT to {self.get_suffix(entity)}, {data.model_dump_json()}"

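Because `data` here can be a plain `pydantic.BaseModel` (the generated create requests don't subclass the custom `BaseModel`), `mask_secrets` can't be passed as a keyword; the flag travels through pydantic's serialization `context` instead, which pydantic hands to `handle_secret` for every nested `CustomSecretStr`. A sketch of the idea, with an illustrative stand-in for a generated create request:

from pydantic import BaseModel as PydanticBaseModel
from metadata.ingestion.models.custom_pydantic import CustomSecretStr

class CreateThing(PydanticBaseModel):  # illustrative stand-in for a generated create request
    password: CustomSecretStr

req = CreateThing(password="hunter2")
# a plain model has no mask_secrets kwarg, but the context still reaches the serializer
req.model_dump_json(context={"mask_secrets": False})  # '{"password":"hunter2"}'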
View File

@@ -111,6 +111,12 @@ class WorkflowStatusMixin:
pipeline_status.status = (
ingestion_status if ingestion_status else pipeline_status.status
)
# committing configurations can be a burden on resources,
# so we dump only a subset to keep the payload size small
pipeline_status.config = self.config.model_dump(
include={"appConfig"},
mask_secrets=True,
)
self.metadata.create_or_update_pipeline_status(
self.ingestion_pipeline.fullyQualifiedName.root, pipeline_status
)

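Only the `appConfig` subtree is published, and with secrets masked, so the status payload stays small and nothing sensitive is persisted with it. A self-contained sketch of what ends up in `pipeline_status.config` (the model shapes here are illustrative):

from typing import Optional
from metadata.ingestion.models.custom_pydantic import BaseModel, CustomSecretStr

class AppConfig(BaseModel):  # illustrative stand-in
    token: CustomSecretStr
    batchSize: int

class WorkflowConfig(BaseModel):  # illustrative stand-in
    appConfig: AppConfig
    jwtToken: Optional[CustomSecretStr] = None

cfg = WorkflowConfig(appConfig=AppConfig(token="abc", batchSize=100), jwtToken="jwt")
cfg.model_dump(include={"appConfig"}, mask_secrets=True)
# -> {'appConfig': {'token': '**********', 'batchSize': 100}}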
View File

@@ -104,7 +104,7 @@ def workflow(metadata, service, mysql_container):
password=mysql_container.password,
),
hostPort=f"localhost:{mysql_container.get_exposed_port(3306)}",
).model_dump_json(),
).model_dump_json(mask_secrets=False),
source_config={},
)
)

View File

@@ -1,4 +1,5 @@
import uuid
from typing import List
from unittest import TestCase
from metadata.generated.schema.api.data.createTable import CreateTableRequest
@@ -16,6 +17,7 @@ from metadata.generated.schema.type.basic import (
Markdown,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.models.custom_pydantic import BaseModel, CustomSecretStr
class CustomPydanticValidationTest(TestCase):
@@ -141,3 +143,70 @@ class CustomPydanticValidationTest(TestCase):
)
assert fetch_response_revert_separator.name.root == "test::table"
assert fetch_response_revert_separator_2.name.root == "test::table>"
class NestedModel(BaseModel):
secret: CustomSecretStr
value: int
class RootModel(BaseModel):
root_secret: CustomSecretStr
nested: NestedModel
items: List[NestedModel]
data = {
"root_secret": "root_password",
"nested": {"secret": "nested_password", "value": 42},
"items": [
{"secret": "item1_password", "value": 1},
{"secret": "item2_password", "value": 2},
],
}
model = RootModel(**data)
masked_data = model.model_dump(mask_secrets=True)
def test_model_dump_secrets():
"""Test model_dump_masked with root, nested, and list structures."""
assert masked_data["root_secret"] == "**********"
assert masked_data["nested"]["secret"] == "**********"
assert masked_data["nested"]["value"] == 42
assert masked_data["items"][0]["secret"] == "**********"
assert masked_data["items"][0]["value"] == 1
assert masked_data["items"][1]["secret"] == "**********"
assert masked_data["items"][1]["value"] == 2
plain_data = model.model_dump(mask_secrets=False)
assert plain_data["root_secret"] == "root_password"
assert plain_data["nested"]["secret"] == "nested_password"
assert plain_data["items"][0]["secret"] == "item1_password"
default_dump = model.model_dump()
assert default_dump["root_secret"] == "root_password"
assert default_dump["nested"]["secret"] == "nested_password"
assert default_dump["items"][0]["secret"] == "item1_password"
def test_model_dump_json_secrets():
assert (
model.model_validate_json(
model.model_dump_json()
).root_secret.get_secret_value()
== "**********"
)
assert (
model.model_validate_json(
model.model_dump_json(mask_secrets=True)
).root_secret.get_secret_value()
== "**********"
)
assert (
model.model_validate_json(
model.model_dump_json(mask_secrets=False)
).root_secret.get_secret_value()
== "root_password"
)

View File

@@ -48,7 +48,9 @@ def application_workflow(workflow_config: OpenMetadataApplicationConfig):
set_operator_logger(workflow_config)
config = json.loads(workflow_config.model_dump_json(exclude_defaults=False))
config = json.loads(
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = ApplicationWorkflow.create(config)
workflow.execute()

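Since `model_dump_json` now masks by default, every Airflow operator below has to opt out explicitly; otherwise the recreated workflow would receive `**********` where its credentials should be. A minimal illustration of the failure mode the flag avoids (the `Config` model is illustrative):

import json
from metadata.ingestion.models.custom_pydantic import BaseModel, CustomSecretStr

class Config(BaseModel):  # illustrative stand-in for the workflow config
    password: CustomSecretStr

cfg = Config(password="hunter2")
json.loads(cfg.model_dump_json())                    # {'password': '**********'} - unusable downstream
json.loads(cfg.model_dump_json(mask_secrets=False))  # {'password': 'hunter2'} - what the operators need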
View File

@@ -42,7 +42,9 @@ def auto_classification_workflow(workflow_config: OpenMetadataWorkflowConfig):
set_operator_logger(workflow_config)
config = json.loads(workflow_config.model_dump_json(exclude_defaults=False))
config = json.loads(
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = AutoClassificationWorkflow.create(config)
workflow.execute()

View File

@@ -204,7 +204,9 @@ def metadata_ingestion_workflow(workflow_config: OpenMetadataWorkflowConfig):
set_operator_logger(workflow_config)
config = json.loads(workflow_config.model_dump_json(exclude_defaults=False))
config = json.loads(
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = MetadataWorkflow.create(config)
workflow.execute()

View File

@@ -42,7 +42,9 @@ def profiler_workflow(workflow_config: OpenMetadataWorkflowConfig):
set_operator_logger(workflow_config)
config = json.loads(workflow_config.model_dump_json(exclude_defaults=False))
config = json.loads(
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = ProfilerWorkflow.create(config)
workflow.execute()

View File

@@ -42,7 +42,9 @@ def test_suite_workflow(workflow_config: OpenMetadataWorkflowConfig):
set_operator_logger(workflow_config)
config = json.loads(workflow_config.model_dump_json(exclude_defaults=False))
config = json.loads(
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = TestSuiteWorkflow.create(config)
workflow.execute()

View File

@@ -46,7 +46,9 @@ def usage_workflow(workflow_config: OpenMetadataWorkflowConfig):
set_operator_logger(workflow_config)
config = json.loads(workflow_config.model_dump_json(exclude_defaults=False))
config = json.loads(
workflow_config.model_dump_json(exclude_defaults=False, mask_secrets=False)
)
workflow = UsageWorkflow.create(config)
workflow.execute()

View File

@@ -281,6 +281,19 @@ public class IngestionPipelineRepository extends EntityRepository<IngestionPipel
allPipelineStatusList.size());
}
/* Get the status of the external application by converting the configuration so that it can be
* served like an App configuration */
public ResultList<PipelineStatus> listExternalAppStatus(
String ingestionPipelineFQN, Long startTs, Long endTs) {
return listPipelineStatus(ingestionPipelineFQN, startTs, endTs)
.map(
pipelineStatus ->
pipelineStatus.withConfig(
Optional.ofNullable(pipelineStatus.getConfig().getOrDefault("appConfig", null))
.map(JsonUtils::getMap)
.orElse(null)));
}
public PipelineStatus getLatestPipelineStatus(IngestionPipeline ingestionPipeline) {
return JsonUtils.readValue(
getLatestExtensionFromTimeSeries(

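`listExternalAppStatus` reshapes each stored status so callers see only the `appConfig` subtree that the ingestion side published above. The same transformation, sketched in Python (the dict shapes are illustrative):

def to_external_app_status(pipeline_status: dict) -> dict:
    # replace the stored config with its appConfig subtree, or None when absent
    config = pipeline_status.get("config") or {}
    pipeline_status["config"] = config.get("appConfig")
    return pipeline_status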
View File

@@ -291,7 +291,7 @@ public class AppResource extends EntityResource<App, AppRepository> {
ingestionPipelineRepository.get(
uriInfo, pipelineRef.getId(), ingestionPipelineRepository.getFields(FIELD_OWNERS));
return ingestionPipelineRepository
.listPipelineStatus(ingestionPipeline.getFullyQualifiedName(), startTs, endTs)
.listExternalAppStatus(ingestionPipeline.getFullyQualifiedName(), startTs, endTs)
.map(pipelineStatus -> convertPipelineStatus(installation, pipelineStatus));
}
throw new IllegalArgumentException("App does not have a scheduled deployment");
@@ -301,7 +301,11 @@ public class AppResource extends EntityResource<App, AppRepository> {
return new AppRunRecord()
.withAppId(app.getId())
.withAppName(app.getName())
.withExecutionTime(pipelineStatus.getStartDate())
.withStartTime(pipelineStatus.getStartDate())
.withExecutionTime(
pipelineStatus.getEndDate() == null
? System.currentTimeMillis() - pipelineStatus.getStartDate()
: pipelineStatus.getEndDate() - pipelineStatus.getStartDate())
.withEndTime(pipelineStatus.getEndDate())
.withStatus(
switch (pipelineStatus.getPipelineState()) {
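The run record's execution time is now derived from the status timestamps: end minus start when the run has finished, otherwise elapsed time so far. The same logic as the Java above, sketched in Python:

import time
from typing import Optional

def execution_time_ms(start_ms: int, end_ms: Optional[int]) -> int:
    # end - start when finished, otherwise time elapsed since the run started
    now_ms = int(time.time() * 1000)
    return (now_ms - start_ms) if end_ms is None else (end_ms - start_ms)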