From 7134e8a27855238c7fb0e86fb8de764d15e8766a Mon Sep 17 00:00:00 2001 From: Piotr Skrydalewicz Date: Tue, 29 Apr 2025 15:21:56 +0200 Subject: [PATCH] PoC for gzip-base64 encoding of aspect values when in transit --- .../src/datahub/emitter/aspect.py | 1 + metadata-ingestion/src/datahub/emitter/mcp.py | 128 ++++++++++++++---- .../aspect/hooks/IgnoreUnknownMutator.java | 2 +- .../metadata/utils/GenericRecordUtils.java | 107 ++++++++++++++- 4 files changed, 204 insertions(+), 34 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/aspect.py b/metadata-ingestion/src/datahub/emitter/aspect.py index 0be2b33369..094006b4f7 100644 --- a/metadata-ingestion/src/datahub/emitter/aspect.py +++ b/metadata-ingestion/src/datahub/emitter/aspect.py @@ -14,3 +14,4 @@ TIMESERIES_ASPECT_MAP: Dict[str, Type[_Aspect]] = { JSON_CONTENT_TYPE = "application/json" JSON_PATCH_CONTENT_TYPE = "application/json-patch+json" +GZIP_JSON_CONTENT_TYPE = "application/json+gzip" diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index 12ca39d136..d3e802da72 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -1,9 +1,12 @@ +import base64 import dataclasses +import gzip import json +import os import warnings from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple, Union -from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE +from datahub.emitter.aspect import ASPECT_MAP, GZIP_JSON_CONTENT_TYPE, JSON_CONTENT_TYPE from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.errors import DataHubDeprecationWarning from datahub.metadata.schema_classes import ( @@ -23,13 +26,26 @@ if TYPE_CHECKING: _ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET" +DEFAULT_USE_GZIP = os.environ.get("DATAHUB_USE_GZIP_ENCODING", "true").lower() == "true" -def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass: 
+GZIP_COMPRESSED_ASPECTS = {"schemaMetadata"} + + +def _make_generic_aspect(codegen_obj: DictWrapper, use_gzip: bool = DEFAULT_USE_GZIP) -> GenericAspectClass: serialized = json.dumps(pre_json_transform(codegen_obj.to_obj())) - return GenericAspectClass( - value=serialized.encode(), - contentType=JSON_CONTENT_TYPE, - ) + if use_gzip: + # Compress the data and then base64 encode it for safe JSON serialization + compressed = gzip.compress(serialized.encode()) + base64_encoded = base64.b64encode(compressed) + return GenericAspectClass( + value=base64_encoded, + contentType=GZIP_JSON_CONTENT_TYPE, + ) + else: + return GenericAspectClass( + value=serialized.encode(), + contentType=JSON_CONTENT_TYPE, + ) def _try_from_generic_aspect( @@ -43,7 +59,7 @@ def _try_from_generic_aspect( return True, None assert aspectName is not None, "aspectName must be set if aspect is set" - if aspect.contentType != JSON_CONTENT_TYPE: + if aspect.contentType not in [JSON_CONTENT_TYPE, GZIP_JSON_CONTENT_TYPE]: return False, None if aspectName not in ASPECT_MAP: @@ -51,7 +67,22 @@ def _try_from_generic_aspect( aspect_cls = ASPECT_MAP[aspectName] - serialized = aspect.value.decode() + if aspect.contentType == GZIP_JSON_CONTENT_TYPE: + # TODO: can we avoid repeating check for aspect.value? According to the schema it should be always bytes... 
+ if isinstance(aspect.value, str): + binary_data = base64.b64decode(aspect.value) + decompressed = gzip.decompress(binary_data) + serialized = decompressed.decode() + else: + decompressed = gzip.decompress(base64.b64decode(aspect.value)) + serialized = decompressed.decode() + else: + # Handle standard JSON content + if isinstance(aspect.value, bytes): + serialized = aspect.value.decode() + else: + serialized = aspect.value + obj = post_json_transform(json.loads(serialized)) return True, aspect_cls.from_obj(obj) @@ -72,6 +103,7 @@ class MetadataChangeProposalWrapper: aspect: Union[None, _Aspect] = None systemMetadata: Union[None, SystemMetadataClass] = None headers: Union[None, Dict[str, str]] = None + use_gzip: bool = DEFAULT_USE_GZIP def __post_init__(self) -> None: if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET: @@ -112,9 +144,9 @@ class MetadataChangeProposalWrapper: @classmethod def construct_many( - cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]] + cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]], use_gzip: bool = DEFAULT_USE_GZIP ) -> List["MetadataChangeProposalWrapper"]: - return [cls(entityUrn=entityUrn, aspect=aspect) for aspect in aspects if aspect] + return [cls(entityUrn=entityUrn, aspect=aspect, use_gzip=use_gzip) for aspect in aspects if aspect] def _make_mcp_without_aspects(self) -> MetadataChangeProposalClass: return MetadataChangeProposalClass( @@ -127,14 +159,20 @@ class MetadataChangeProposalWrapper: headers=self.headers, ) - def make_mcp(self) -> MetadataChangeProposalClass: + def make_mcp(self, use_gzip: Optional[bool] = None) -> MetadataChangeProposalClass: + if use_gzip is None: + use_gzip = self.use_gzip + serializedEntityKeyAspect: Union[None, GenericAspectClass] = None if isinstance(self.entityKeyAspect, DictWrapper): - serializedEntityKeyAspect = _make_generic_aspect(self.entityKeyAspect) + serializedEntityKeyAspect = _make_generic_aspect(self.entityKeyAspect, False) serializedAspect = None if self.aspect is not None: -
serializedAspect = _make_generic_aspect(self.aspect) + # Only compress specific aspects (for testing) + aspect_name = self.aspect.get_aspect_name() + should_compress = use_gzip and aspect_name in GZIP_COMPRESSED_ASPECTS + serializedAspect = _make_generic_aspect(self.aspect, should_compress) mcp = self._make_mcp_without_aspects() mcp.entityKeyAspect = serializedEntityKeyAspect @@ -157,19 +195,29 @@ class MetadataChangeProposalWrapper: return False return True - def to_obj(self, tuples: bool = False, simplified_structure: bool = False) -> dict: + def to_obj(self, tuples: bool = False, simplified_structure: bool = False, use_gzip: Optional[bool] = None) -> dict: # The simplified_structure parameter is used to make the output # not contain nested JSON strings. Instead, it unpacks the JSON # string into an object. - obj = self.make_mcp().to_obj(tuples=tuples) + obj = self.make_mcp(use_gzip=use_gzip).to_obj(tuples=tuples) if simplified_structure: # Undo the double JSON serialization that happens in the MCP aspect.
- if ( - obj.get("aspect") - and obj["aspect"].get("contentType") == JSON_CONTENT_TYPE - ): - obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])} + if obj.get("aspect"): + content_type = obj["aspect"].get("contentType") + if content_type == JSON_CONTENT_TYPE: + obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])} + elif content_type == GZIP_JSON_CONTENT_TYPE: + # For gzipped content, we need to base64 decode, decompress, then load + # the value is base64 text whether it arrives as str or bytes + value = obj["aspect"]["value"] + if isinstance(value, str): + binary_data = base64.b64decode(value) + decompressed = gzip.decompress(binary_data) + obj["aspect"] = {"json": json.loads(decompressed.decode('utf-8'))} + else: + decompressed = gzip.decompress(base64.b64decode(value)) + obj["aspect"] = {"json": json.loads(decompressed.decode('utf-8'))} return obj @classmethod @@ -185,10 +233,21 @@ class MetadataChangeProposalWrapper: # Redo the double JSON serialization so that the rest of deserialization # routine works. if obj.get("aspect") and obj["aspect"].get("json"): - obj["aspect"] = { - "contentType": JSON_CONTENT_TYPE, - "value": json.dumps(obj["aspect"]["json"]), - } + content_type = obj["aspect"].get("contentType", JSON_CONTENT_TYPE) + + if content_type == GZIP_JSON_CONTENT_TYPE: + json_str = json.dumps(obj["aspect"]["json"]) + compressed = gzip.compress(json_str.encode('utf-8')) + base64_encoded = base64.b64encode(compressed).decode('ascii') + obj["aspect"] = { + "contentType": GZIP_JSON_CONTENT_TYPE, + "value": base64_encoded, + } + else: + obj["aspect"] = { + "contentType": JSON_CONTENT_TYPE, + "value": json.dumps(obj["aspect"]["json"]), + } mcpc = MetadataChangeProposalClass.from_obj(obj, tuples=tuples) @@ -196,12 +255,16 @@ class MetadataChangeProposalWrapper: if mcpc.entityKeyAspect is not None: return mcpc - # Try to deserialize the aspect.
- return cls.try_from_mcpc(mcpc) or mcpc + if obj.get("aspect") and obj["aspect"].get("contentType") == GZIP_JSON_CONTENT_TYPE: + use_gzip = True + else: + use_gzip = DEFAULT_USE_GZIP + + return cls.try_from_mcpc(mcpc, use_gzip=use_gzip) or mcpc @classmethod def try_from_mcpc( - cls, mcpc: MetadataChangeProposalClass + cls, mcpc: MetadataChangeProposalClass, use_gzip: bool = DEFAULT_USE_GZIP ) -> Optional["MetadataChangeProposalWrapper"]: """Attempts to create a MetadataChangeProposalWrapper from a MetadataChangeProposalClass. Neatly handles unsupported, expected cases, such as unknown aspect types or non-json content type. @@ -216,6 +279,12 @@ class MetadataChangeProposalWrapper: converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect) if converted: + # Determine if the source was using gzip based on content type + if mcpc.aspect and mcpc.aspect.contentType == GZIP_JSON_CONTENT_TYPE: + inferred_use_gzip = True + else: + inferred_use_gzip = use_gzip + return cls( entityType=mcpc.entityType, entityUrn=mcpc.entityUrn, @@ -225,13 +294,14 @@ class MetadataChangeProposalWrapper: aspect=aspect, systemMetadata=mcpc.systemMetadata, headers=mcpc.headers, + use_gzip=inferred_use_gzip, ) else: return None @classmethod def try_from_mcl( - cls, mcl: MetadataChangeLogClass + cls, mcl: MetadataChangeLogClass, use_gzip: bool = DEFAULT_USE_GZIP ) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]: mcpc = MetadataChangeProposalClass( entityUrn=mcl.entityUrn, @@ -244,7 +314,7 @@ class MetadataChangeProposalWrapper: systemMetadata=mcl.systemMetadata, headers=mcl.headers, ) - return cls.try_from_mcpc(mcpc) or mcpc + return cls.try_from_mcpc(mcpc, use_gzip=use_gzip) or mcpc @classmethod def from_obj_require_wrapper( diff --git a/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java b/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java index f5cc421042..f89b821f09 100644 --- 
a/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/aspect/hooks/IgnoreUnknownMutator.java @@ -35,7 +35,7 @@ import lombok.extern.slf4j.Slf4j; @Accessors(chain = true) public class IgnoreUnknownMutator extends MutationHook { private static final Set SUPPORTED_MIME_TYPES = - Set.of("application/json", "application/json-patch+json"); + Set.of("application/json", "application/json-patch+json", "application/json+gzip"); private static final Set MUTATION_TYPES = Set.of(CREATE, CREATE_ENTITY, UPSERT, UPDATE); diff --git a/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java b/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java index 724a66f7fe..f05b6853b0 100644 --- a/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java +++ b/metadata-utils/src/main/java/com/linkedin/metadata/utils/GenericRecordUtils.java @@ -28,19 +28,26 @@ import com.linkedin.metadata.models.registry.EntityRegistry; import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.GenericPayload; import jakarta.json.JsonPatch; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Base64; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; +import java.util.zip.GZIPOutputStream; import javax.annotation.Nonnull; public class GenericRecordUtils { public static final String JSON = "application/json"; public static final String JSON_PATCH = "application/json-patch+json"; + public static final String JSON_GZIP = "application/json+gzip"; private GenericRecordUtils() {} @@ -69,24 +76,73 @@ public class GenericRecordUtils 
{ return deserializeAspect(aspectValue, contentType, aspectSpec.getDataTemplateClass()); } + /** + * Decompresses and decodes base64 gzipped data. + * + * @param base64GzippedData The base64 encoded, gzipped data + * @return The decompressed string + * @throws IOException if decompression fails + */ + private static String decompressGzippedBase64String(String base64GzippedData) throws IOException { + // Decode from Base64 + byte[] decodedBytes = Base64.getDecoder().decode(base64GzippedData); + + // Set up the GZIP streams + ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(decodedBytes); + GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + // Decompress the data + byte[] buffer = new byte[1024]; + int len; + while ((len = gzipInputStream.read(buffer)) > 0) { + byteArrayOutputStream.write(buffer, 0, len); + } + + // Clean up + gzipInputStream.close(); + byteArrayOutputStream.close(); + + // Return the decompressed string + return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name()); + } + @Nonnull public static T deserializeAspect( @Nonnull ByteString aspectValue, @Nonnull String contentType, @Nonnull Class clazz) { - if (!contentType.equals(JSON)) { + if (contentType.equals(JSON)) { + return RecordUtils.toRecordTemplate(clazz, aspectValue.asString(StandardCharsets.UTF_8)); + } else if (contentType.equals(JSON_GZIP)) { + try { + // For gzipped content, we first need to base64 decode and then decompress + String decompressedJson = decompressGzippedBase64String(aspectValue.asString(StandardCharsets.UTF_8)); + return RecordUtils.toRecordTemplate(clazz, decompressedJson); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to decompress gzipped aspect value", e); + } + } else { throw new IllegalArgumentException( String.format("%s content type is not supported", contentType)); } - return 
RecordUtils.toRecordTemplate(clazz, aspectValue.asString(StandardCharsets.UTF_8)); } @Nonnull public static T deserializePayload( @Nonnull ByteString payloadValue, @Nonnull String contentType, @Nonnull Class clazz) { - if (!contentType.equals(JSON)) { + if (contentType.equals(JSON)) { + return RecordUtils.toRecordTemplate(clazz, payloadValue.asString(StandardCharsets.UTF_8)); + } else if (contentType.equals(JSON_GZIP)) { + try { + // For gzipped content, we first need to base64 decode and then decompress + String decompressedJson = decompressGzippedBase64String(payloadValue.asString(StandardCharsets.UTF_8)); + return RecordUtils.toRecordTemplate(clazz, decompressedJson); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to decompress gzipped payload value", e); + } + } else { throw new IllegalArgumentException( String.format("%s content type is not supported", contentType)); } - return RecordUtils.toRecordTemplate(clazz, payloadValue.asString(StandardCharsets.UTF_8)); } @Nonnull @@ -95,6 +151,24 @@ public class GenericRecordUtils { return deserializePayload(payloadValue, JSON, clazz); } + /** + * Compresses and base64-encodes a string. 
+ * + * @param uncompressedData The string to compress + * @return Base64-encoded compressed data + * @throws IOException if compression fails + */ + private static String compressAndBase64EncodeString(String uncompressedData) throws IOException { + // Compress the data + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream); + gzipOutputStream.write(uncompressedData.getBytes(StandardCharsets.UTF_8)); + gzipOutputStream.close(); + + // Base64 encode the compressed data + return Base64.getEncoder().encodeToString(byteArrayOutputStream.toByteArray()); + } + @Nonnull public static GenericAspect serializeAspect(@Nonnull RecordTemplate aspect) { return serializeAspect(RecordUtils.toJsonString(aspect)); @@ -107,6 +181,26 @@ public class GenericRecordUtils { genericAspect.setContentType(GenericRecordUtils.JSON); return genericAspect; } + + @Nonnull + public static GenericAspect serializeGzippedAspect(@Nonnull RecordTemplate aspect) { + return serializeGzippedAspect(RecordUtils.toJsonString(aspect)); + } + + @Nonnull + public static GenericAspect serializeGzippedAspect(@Nonnull String str) { + GenericAspect genericAspect = new GenericAspect(); + try { + String compressedBase64 = compressAndBase64EncodeString(str); + genericAspect.setValue(ByteString.unsafeWrap(compressedBase64.getBytes(StandardCharsets.UTF_8))); + genericAspect.setContentType(GenericRecordUtils.JSON_GZIP); + } catch (IOException e) { + // Fallback to standard JSON if compression fails + genericAspect.setValue(ByteString.unsafeWrap(str.getBytes(StandardCharsets.UTF_8))); + genericAspect.setContentType(GenericRecordUtils.JSON); + } + return genericAspect; + } @Nonnull public static GenericAspect serializeAspect(@Nonnull JsonNode json) { @@ -115,6 +209,11 @@ public class GenericRecordUtils { genericAspect.setContentType(GenericRecordUtils.JSON); return genericAspect; } + + @Nonnull + public static 
GenericAspect serializeGzippedAspect(@Nonnull JsonNode json) { + return serializeGzippedAspect(json.toString()); + } @Nonnull public static GenericAspect serializePatch(@Nonnull JsonPatch jsonPatch) {