PoC for gzip-base64 encoding of aspect values when in transit

This commit is contained in:
Piotr Skrydalewicz 2025-04-29 15:21:56 +02:00
parent 1c1734bf41
commit 7134e8a278
4 changed files with 204 additions and 34 deletions

View File

@ -14,3 +14,4 @@ TIMESERIES_ASPECT_MAP: Dict[str, Type[_Aspect]] = {
JSON_CONTENT_TYPE = "application/json" JSON_CONTENT_TYPE = "application/json"
JSON_PATCH_CONTENT_TYPE = "application/json-patch+json" JSON_PATCH_CONTENT_TYPE = "application/json-patch+json"
GZIP_JSON_CONTENT_TYPE = "application/json+gzip"

View File

@ -1,9 +1,12 @@
import base64
import dataclasses import dataclasses
import gzip
import json import json
import os
import warnings import warnings
from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple, Union from typing import TYPE_CHECKING, Dict, List, Optional, Sequence, Tuple, Union
from datahub.emitter.aspect import ASPECT_MAP, JSON_CONTENT_TYPE from datahub.emitter.aspect import ASPECT_MAP, GZIP_JSON_CONTENT_TYPE, JSON_CONTENT_TYPE
from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform
from datahub.errors import DataHubDeprecationWarning from datahub.errors import DataHubDeprecationWarning
from datahub.metadata.schema_classes import ( from datahub.metadata.schema_classes import (
@ -23,13 +26,26 @@ if TYPE_CHECKING:
_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET" _ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET"
DEFAULT_USE_GZIP = os.environ.get("DATAHUB_USE_GZIP_ENCODING", "true").lower() == "true"
def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass: GZIP_COMPRESSED_ASPECTS = {"schemaMetadata"}
def _make_generic_aspect(codegen_obj: DictWrapper, use_gzip: bool = DEFAULT_USE_GZIP) -> GenericAspectClass:
serialized = json.dumps(pre_json_transform(codegen_obj.to_obj())) serialized = json.dumps(pre_json_transform(codegen_obj.to_obj()))
return GenericAspectClass( if use_gzip:
value=serialized.encode(), # Compress the data and then base64 encode it for safe JSON serialization
contentType=JSON_CONTENT_TYPE, compressed = gzip.compress(serialized.encode())
) base64_encoded = base64.b64encode(compressed)
return GenericAspectClass(
value=base64_encoded,
contentType=GZIP_JSON_CONTENT_TYPE,
)
else:
return GenericAspectClass(
value=serialized.encode(),
contentType=JSON_CONTENT_TYPE,
)
def _try_from_generic_aspect( def _try_from_generic_aspect(
@ -43,7 +59,7 @@ def _try_from_generic_aspect(
return True, None return True, None
assert aspectName is not None, "aspectName must be set if aspect is set" assert aspectName is not None, "aspectName must be set if aspect is set"
if aspect.contentType != JSON_CONTENT_TYPE: if aspect.contentType not in [JSON_CONTENT_TYPE, GZIP_JSON_CONTENT_TYPE]:
return False, None return False, None
if aspectName not in ASPECT_MAP: if aspectName not in ASPECT_MAP:
@ -51,7 +67,22 @@ def _try_from_generic_aspect(
aspect_cls = ASPECT_MAP[aspectName] aspect_cls = ASPECT_MAP[aspectName]
serialized = aspect.value.decode() if aspect.contentType == GZIP_JSON_CONTENT_TYPE:
# TODO: can we avoid repeating check for aspect.value? According to the schema it should be always bytes...
if isinstance(aspect.value, str):
binary_data = base64.b64decode(aspect.value)
decompressed = gzip.decompress(binary_data)
serialized = decompressed.decode()
else:
decompressed = gzip.decompress(aspect.value)
serialized = decompressed.decode()
else:
# Handle standard JSON content
if isinstance(aspect.value, bytes):
serialized = aspect.value.decode()
else:
serialized = aspect.value
obj = post_json_transform(json.loads(serialized)) obj = post_json_transform(json.loads(serialized))
return True, aspect_cls.from_obj(obj) return True, aspect_cls.from_obj(obj)
@ -72,6 +103,7 @@ class MetadataChangeProposalWrapper:
aspect: Union[None, _Aspect] = None aspect: Union[None, _Aspect] = None
systemMetadata: Union[None, SystemMetadataClass] = None systemMetadata: Union[None, SystemMetadataClass] = None
headers: Union[None, Dict[str, str]] = None headers: Union[None, Dict[str, str]] = None
use_gzip: bool = DEFAULT_USE_GZIP
def __post_init__(self) -> None: def __post_init__(self) -> None:
if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET: if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET:
@ -112,9 +144,9 @@ class MetadataChangeProposalWrapper:
@classmethod @classmethod
def construct_many( def construct_many(
cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]] cls, entityUrn: str, aspects: Sequence[Optional[_Aspect]], use_gzip: bool = DEFAULT_USE_GZIP
) -> List["MetadataChangeProposalWrapper"]: ) -> List["MetadataChangeProposalWrapper"]:
return [cls(entityUrn=entityUrn, aspect=aspect) for aspect in aspects if aspect] return [cls(entityUrn=entityUrn, aspect=aspect, use_gzip=use_gzip) for aspect in aspects if aspect]
def _make_mcp_without_aspects(self) -> MetadataChangeProposalClass: def _make_mcp_without_aspects(self) -> MetadataChangeProposalClass:
return MetadataChangeProposalClass( return MetadataChangeProposalClass(
@ -127,14 +159,20 @@ class MetadataChangeProposalWrapper:
headers=self.headers, headers=self.headers,
) )
def make_mcp(self) -> MetadataChangeProposalClass: def make_mcp(self, use_gzip: bool = None) -> MetadataChangeProposalClass:
if use_gzip is None:
use_gzip = self.use_gzip
serializedEntityKeyAspect: Union[None, GenericAspectClass] = None serializedEntityKeyAspect: Union[None, GenericAspectClass] = None
if isinstance(self.entityKeyAspect, DictWrapper): if isinstance(self.entityKeyAspect, DictWrapper):
serializedEntityKeyAspect = _make_generic_aspect(self.entityKeyAspect) serializedEntityKeyAspect = _make_generic_aspect(self.entityKeyAspect, False)
serializedAspect = None serializedAspect = None
if self.aspect is not None: if self.aspect is not None:
serializedAspect = _make_generic_aspect(self.aspect) # Only compress specific aspects (for testing)
aspect_name = self.aspect.get_aspect_name()
should_compress = use_gzip and aspect_name in GZIP_COMPRESSED_ASPECTS
serializedAspect = _make_generic_aspect(self.aspect, should_compress)
mcp = self._make_mcp_without_aspects() mcp = self._make_mcp_without_aspects()
mcp.entityKeyAspect = serializedEntityKeyAspect mcp.entityKeyAspect = serializedEntityKeyAspect
@ -157,19 +195,29 @@ class MetadataChangeProposalWrapper:
return False return False
return True return True
def to_obj(self, tuples: bool = False, simplified_structure: bool = False) -> dict: def to_obj(self, tuples: bool = False, simplified_structure: bool = False, use_gzip: bool = None) -> dict:
# The simplified_structure parameter is used to make the output # The simplified_structure parameter is used to make the output
# not contain nested JSON strings. Instead, it unpacks the JSON # not contain nested JSON strings. Instead, it unpacks the JSON
# string into an object. # string into an object.
obj = self.make_mcp().to_obj(tuples=tuples) obj = self.make_mcp(use_gzip=use_gzip).to_obj(tuples=tuples)
if simplified_structure: if simplified_structure:
# Undo the double JSON serialization that happens in the MCP aspect. # Undo the double JSON serialization that happens in the MCP aspect.
if ( if obj.get("aspect"):
obj.get("aspect") content_type = obj["aspect"].get("contentType")
and obj["aspect"].get("contentType") == JSON_CONTENT_TYPE if content_type == JSON_CONTENT_TYPE:
): obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])}
obj["aspect"] = {"json": json.loads(obj["aspect"]["value"])} elif content_type == GZIP_JSON_CONTENT_TYPE:
# For gzipped content, we need to base64 decode, decompress, then load
# the value should be a base64 encoded string
value = obj["aspect"]["value"]
if isinstance(value, str):
binary_data = base64.b64decode(value)
decompressed = gzip.decompress(binary_data)
obj["aspect"] = {"json": json.loads(decompressed.decode('utf-8'))}
else:
decompressed = gzip.decompress(value)
obj["aspect"] = {"json": json.loads(decompressed.decode('utf-8'))}
return obj return obj
@classmethod @classmethod
@ -185,10 +233,21 @@ class MetadataChangeProposalWrapper:
# Redo the double JSON serialization so that the rest of deserialization # Redo the double JSON serialization so that the rest of deserialization
# routine works. # routine works.
if obj.get("aspect") and obj["aspect"].get("json"): if obj.get("aspect") and obj["aspect"].get("json"):
obj["aspect"] = { content_type = obj["aspect"].get("contentType", JSON_CONTENT_TYPE)
"contentType": JSON_CONTENT_TYPE,
"value": json.dumps(obj["aspect"]["json"]), if content_type == GZIP_JSON_CONTENT_TYPE:
} json_str = json.dumps(obj["aspect"]["json"])
compressed = gzip.compress(json_str.encode('utf-8'))
base64_encoded = base64.b64encode(compressed).decode('ascii')
obj["aspect"] = {
"contentType": GZIP_JSON_CONTENT_TYPE,
"value": base64_encoded,
}
else:
obj["aspect"] = {
"contentType": JSON_CONTENT_TYPE,
"value": json.dumps(obj["aspect"]["json"]),
}
mcpc = MetadataChangeProposalClass.from_obj(obj, tuples=tuples) mcpc = MetadataChangeProposalClass.from_obj(obj, tuples=tuples)
@ -196,12 +255,16 @@ class MetadataChangeProposalWrapper:
if mcpc.entityKeyAspect is not None: if mcpc.entityKeyAspect is not None:
return mcpc return mcpc
# Try to deserialize the aspect. if obj.get("aspect") and obj["aspect"].get("contentType") == GZIP_JSON_CONTENT_TYPE:
return cls.try_from_mcpc(mcpc) or mcpc use_gzip = True
else:
use_gzip = DEFAULT_USE_GZIP
return cls.try_from_mcpc(mcpc, use_gzip=use_gzip) or mcpc
@classmethod @classmethod
def try_from_mcpc( def try_from_mcpc(
cls, mcpc: MetadataChangeProposalClass cls, mcpc: MetadataChangeProposalClass, use_gzip: bool = DEFAULT_USE_GZIP
) -> Optional["MetadataChangeProposalWrapper"]: ) -> Optional["MetadataChangeProposalWrapper"]:
"""Attempts to create a MetadataChangeProposalWrapper from a MetadataChangeProposalClass. """Attempts to create a MetadataChangeProposalWrapper from a MetadataChangeProposalClass.
Neatly handles unsupported, expected cases, such as unknown aspect types or non-json content type. Neatly handles unsupported, expected cases, such as unknown aspect types or non-json content type.
@ -216,6 +279,12 @@ class MetadataChangeProposalWrapper:
converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect) converted, aspect = _try_from_generic_aspect(mcpc.aspectName, mcpc.aspect)
if converted: if converted:
# Determine if the source was using gzip based on content type
if mcpc.aspect and mcpc.aspect.contentType == GZIP_JSON_CONTENT_TYPE:
inferred_use_gzip = True
else:
inferred_use_gzip = use_gzip
return cls( return cls(
entityType=mcpc.entityType, entityType=mcpc.entityType,
entityUrn=mcpc.entityUrn, entityUrn=mcpc.entityUrn,
@ -225,13 +294,14 @@ class MetadataChangeProposalWrapper:
aspect=aspect, aspect=aspect,
systemMetadata=mcpc.systemMetadata, systemMetadata=mcpc.systemMetadata,
headers=mcpc.headers, headers=mcpc.headers,
use_gzip=inferred_use_gzip,
) )
else: else:
return None return None
@classmethod @classmethod
def try_from_mcl( def try_from_mcl(
cls, mcl: MetadataChangeLogClass cls, mcl: MetadataChangeLogClass, use_gzip: bool = DEFAULT_USE_GZIP
) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]: ) -> Union["MetadataChangeProposalWrapper", MetadataChangeProposalClass]:
mcpc = MetadataChangeProposalClass( mcpc = MetadataChangeProposalClass(
entityUrn=mcl.entityUrn, entityUrn=mcl.entityUrn,
@ -244,7 +314,7 @@ class MetadataChangeProposalWrapper:
systemMetadata=mcl.systemMetadata, systemMetadata=mcl.systemMetadata,
headers=mcl.headers, headers=mcl.headers,
) )
return cls.try_from_mcpc(mcpc) or mcpc return cls.try_from_mcpc(mcpc, use_gzip=use_gzip) or mcpc
@classmethod @classmethod
def from_obj_require_wrapper( def from_obj_require_wrapper(

View File

@ -35,7 +35,7 @@ import lombok.extern.slf4j.Slf4j;
@Accessors(chain = true) @Accessors(chain = true)
public class IgnoreUnknownMutator extends MutationHook { public class IgnoreUnknownMutator extends MutationHook {
private static final Set<String> SUPPORTED_MIME_TYPES = private static final Set<String> SUPPORTED_MIME_TYPES =
Set.of("application/json", "application/json-patch+json"); Set.of("application/json", "application/json-patch+json", "application/json+gzip");
private static final Set<ChangeType> MUTATION_TYPES = private static final Set<ChangeType> MUTATION_TYPES =
Set.of(CREATE, CREATE_ENTITY, UPSERT, UPDATE); Set.of(CREATE, CREATE_ENTITY, UPSERT, UPDATE);

View File

@ -28,19 +28,26 @@ import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.GenericAspect;
import com.linkedin.mxe.GenericPayload; import com.linkedin.mxe.GenericPayload;
import jakarta.json.JsonPatch; import jakarta.json.JsonPatch;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor; import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Base64;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
public class GenericRecordUtils { public class GenericRecordUtils {
public static final String JSON = "application/json"; public static final String JSON = "application/json";
public static final String JSON_PATCH = "application/json-patch+json"; public static final String JSON_PATCH = "application/json-patch+json";
public static final String JSON_GZIP = "application/json+gzip";
private GenericRecordUtils() {} private GenericRecordUtils() {}
@ -69,24 +76,73 @@ public class GenericRecordUtils {
return deserializeAspect(aspectValue, contentType, aspectSpec.getDataTemplateClass()); return deserializeAspect(aspectValue, contentType, aspectSpec.getDataTemplateClass());
} }
/**
* Decompresses and decodes base64 gzipped data.
*
* @param base64GzippedData The base64 encoded, gzipped data
* @return The decompressed string
* @throws IOException if decompression fails
*/
private static String decompressGzippedBase64String(String base64GzippedData) throws IOException {
// Decode from Base64
byte[] decodedBytes = Base64.getDecoder().decode(base64GzippedData);
// Set up the GZIP streams
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(decodedBytes);
GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream);
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
// Decompress the data
byte[] buffer = new byte[1024];
int len;
while ((len = gzipInputStream.read(buffer)) > 0) {
byteArrayOutputStream.write(buffer, 0, len);
}
// Clean up
gzipInputStream.close();
byteArrayOutputStream.close();
// Return the decompressed string
return byteArrayOutputStream.toString(StandardCharsets.UTF_8.name());
}
@Nonnull @Nonnull
public static <T extends RecordTemplate> T deserializeAspect( public static <T extends RecordTemplate> T deserializeAspect(
@Nonnull ByteString aspectValue, @Nonnull String contentType, @Nonnull Class<T> clazz) { @Nonnull ByteString aspectValue, @Nonnull String contentType, @Nonnull Class<T> clazz) {
if (!contentType.equals(JSON)) { if (contentType.equals(JSON)) {
return RecordUtils.toRecordTemplate(clazz, aspectValue.asString(StandardCharsets.UTF_8));
} else if (contentType.equals(JSON_GZIP)) {
try {
// For gzipped content, we first need to base64 decode and then decompress
String decompressedJson = decompressGzippedBase64String(aspectValue.asString(StandardCharsets.UTF_8));
return RecordUtils.toRecordTemplate(clazz, decompressedJson);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to decompress gzipped aspect value", e);
}
} else {
throw new IllegalArgumentException( throw new IllegalArgumentException(
String.format("%s content type is not supported", contentType)); String.format("%s content type is not supported", contentType));
} }
return RecordUtils.toRecordTemplate(clazz, aspectValue.asString(StandardCharsets.UTF_8));
} }
@Nonnull @Nonnull
public static <T extends RecordTemplate> T deserializePayload( public static <T extends RecordTemplate> T deserializePayload(
@Nonnull ByteString payloadValue, @Nonnull String contentType, @Nonnull Class<T> clazz) { @Nonnull ByteString payloadValue, @Nonnull String contentType, @Nonnull Class<T> clazz) {
if (!contentType.equals(JSON)) { if (contentType.equals(JSON)) {
return RecordUtils.toRecordTemplate(clazz, payloadValue.asString(StandardCharsets.UTF_8));
} else if (contentType.equals(JSON_GZIP)) {
try {
// For gzipped content, we first need to base64 decode and then decompress
String decompressedJson = decompressGzippedBase64String(payloadValue.asString(StandardCharsets.UTF_8));
return RecordUtils.toRecordTemplate(clazz, decompressedJson);
} catch (IOException e) {
throw new IllegalArgumentException("Failed to decompress gzipped payload value", e);
}
} else {
throw new IllegalArgumentException( throw new IllegalArgumentException(
String.format("%s content type is not supported", contentType)); String.format("%s content type is not supported", contentType));
} }
return RecordUtils.toRecordTemplate(clazz, payloadValue.asString(StandardCharsets.UTF_8));
} }
@Nonnull @Nonnull
@ -95,6 +151,24 @@ public class GenericRecordUtils {
return deserializePayload(payloadValue, JSON, clazz); return deserializePayload(payloadValue, JSON, clazz);
} }
/**
* Compresses and base64-encodes a string.
*
* @param uncompressedData The string to compress
* @return Base64-encoded compressed data
* @throws IOException if compression fails
*/
private static String compressAndBase64EncodeString(String uncompressedData) throws IOException {
// Compress the data
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(byteArrayOutputStream);
gzipOutputStream.write(uncompressedData.getBytes(StandardCharsets.UTF_8));
gzipOutputStream.close();
// Base64 encode the compressed data
return Base64.getEncoder().encodeToString(byteArrayOutputStream.toByteArray());
}
@Nonnull @Nonnull
public static GenericAspect serializeAspect(@Nonnull RecordTemplate aspect) { public static GenericAspect serializeAspect(@Nonnull RecordTemplate aspect) {
return serializeAspect(RecordUtils.toJsonString(aspect)); return serializeAspect(RecordUtils.toJsonString(aspect));
@ -107,6 +181,26 @@ public class GenericRecordUtils {
genericAspect.setContentType(GenericRecordUtils.JSON); genericAspect.setContentType(GenericRecordUtils.JSON);
return genericAspect; return genericAspect;
} }
@Nonnull
public static GenericAspect serializeGzippedAspect(@Nonnull RecordTemplate aspect) {
return serializeGzippedAspect(RecordUtils.toJsonString(aspect));
}
@Nonnull
public static GenericAspect serializeGzippedAspect(@Nonnull String str) {
GenericAspect genericAspect = new GenericAspect();
try {
String compressedBase64 = compressAndBase64EncodeString(str);
genericAspect.setValue(ByteString.unsafeWrap(compressedBase64.getBytes(StandardCharsets.UTF_8)));
genericAspect.setContentType(GenericRecordUtils.JSON_GZIP);
} catch (IOException e) {
// Fallback to standard JSON if compression fails
genericAspect.setValue(ByteString.unsafeWrap(str.getBytes(StandardCharsets.UTF_8)));
genericAspect.setContentType(GenericRecordUtils.JSON);
}
return genericAspect;
}
@Nonnull @Nonnull
public static GenericAspect serializeAspect(@Nonnull JsonNode json) { public static GenericAspect serializeAspect(@Nonnull JsonNode json) {
@ -115,6 +209,11 @@ public class GenericRecordUtils {
genericAspect.setContentType(GenericRecordUtils.JSON); genericAspect.setContentType(GenericRecordUtils.JSON);
return genericAspect; return genericAspect;
} }
@Nonnull
public static GenericAspect serializeGzippedAspect(@Nonnull JsonNode json) {
return serializeGzippedAspect(json.toString());
}
@Nonnull @Nonnull
public static GenericAspect serializePatch(@Nonnull JsonPatch jsonPatch) { public static GenericAspect serializePatch(@Nonnull JsonPatch jsonPatch) {