fix(ingest): json serializable fix (#12246)

This commit is contained in:
Aseem Bansal 2025-01-01 16:52:17 +05:30 committed by GitHub
parent 554a37a551
commit ea249caee1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 179 additions and 1 deletions

View File

@ -146,12 +146,55 @@ class DatahubIngestionRunSummaryProvider(PipelineRunListener):
aspect_value=source_info_aspect,
)
@staticmethod
def _convert_sets_to_lists(obj: Any) -> Any:
"""
Recursively converts all sets to lists in a Python object.
Works with nested dictionaries, lists, and sets.
Args:
obj: Any Python object that might contain sets
Returns:
The object with all sets converted to lists
"""
if isinstance(obj, dict):
return {
key: DatahubIngestionRunSummaryProvider._convert_sets_to_lists(value)
for key, value in obj.items()
}
elif isinstance(obj, list):
return [
DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
for element in obj
]
elif isinstance(obj, set):
return [
DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
for element in obj
]
elif isinstance(obj, tuple):
return tuple(
DatahubIngestionRunSummaryProvider._convert_sets_to_lists(element)
for element in obj
)
else:
return obj
def _get_recipe_to_report(self, ctx: PipelineContext) -> str:
    """
    Build the JSON string of the pipeline recipe to include in the report.

    Args:
        ctx: Pipeline context; its ``pipeline_config`` must be set.

    Returns:
        An empty string when ``self.report_recipe`` is falsy or the raw
        config dict is empty; otherwise the redacted recipe serialized
        as JSON.
    """
    assert ctx.pipeline_config
    if not self.report_recipe or not ctx.pipeline_config.get_raw_dict():
        return ""

    redacted_recipe = redact_raw_config(ctx.pipeline_config.get_raw_dict())
    # This is required, otherwise json.dumps will fail with
    # "TypeError: Object of type set is not JSON serializable".
    converted_recipe = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(
        redacted_recipe
    )
    return json.dumps(converted_recipe)
def _emit_aspect(self, entity_urn: Urn, aspect_value: _Aspect) -> None:
self.sink.write_record_async(

View File

@ -1,3 +1,5 @@
from typing import Any, Dict, List, Set, Tuple, Union
import pytest
from datahub.ingestion.reporting.datahub_ingestion_run_summary_provider import (
@ -50,3 +52,136 @@ def test_default_config():
typed_config = DatahubIngestionRunSummaryProviderConfig.parse_obj({})
assert typed_config.sink is None
assert typed_config.report_recipe is True
def test_simple_set() -> None:
    """A flat set is converted into a list holding the same elements."""
    source: Set[int] = {1, 2, 3}
    wanted: List[int] = [1, 2, 3]

    converted = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(source)

    # Set iteration order is not guaranteed, so compare order-insensitively.
    assert sorted(converted) == sorted(wanted)
    assert isinstance(converted, list)
def test_nested_dict_with_sets() -> None:
    """Sets stored as dictionary values, at any depth, become lists."""
    source: Dict[str, Union[Set[int], Dict[str, Set[str]]]] = {
        "set1": {1, 2, 3},
        "dict1": {"set2": {"a", "b"}},
    }
    wanted = {
        "set1": [1, 2, 3],
        "dict1": {"set2": ["a", "b"]},
    }

    converted = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(source)

    def normalize(mapping):
        # Sort each list value (recursing into dict values) so the
        # comparison does not depend on set iteration order.
        normalized = {}
        for name, item in mapping.items():
            if isinstance(item, list):
                normalized[name] = sorted(item)
            elif isinstance(item, dict):
                normalized[name] = normalize(item)
            else:
                normalized[name] = item
        return normalized

    assert normalize(converted) == normalize(wanted)
def test_nested_lists_with_sets() -> None:
    """Test conversion of nested lists containing sets"""
    input_data = [{1, 2}, [{3, 4}, {5, 6}]]
    expected = [[1, 2], [[3, 4], [5, 6]]]

    result = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(input_data)

    def sort_leaf_lists(value):
        # Lists that came from sets have no guaranteed order, so sort every
        # innermost list. Lists that contain other lists keep their order
        # (the conversion preserves list order) and are normalized
        # recursively, so sets at any nesting depth are compared
        # order-insensitively.
        if not isinstance(value, list):
            return value
        if any(isinstance(item, list) for item in value):
            return [sort_leaf_lists(item) for item in value]
        return sorted(value)

    assert sort_leaf_lists(result) == sort_leaf_lists(expected)
def test_tuple_with_sets() -> None:
    """A tuple stays a tuple while the sets inside it become lists."""
    source = (1, {2, 3}, 4)
    wanted = (1, [2, 3], 4)

    converted = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(source)

    # The middle element came from a set, so sort it before comparing.
    actual_normalized = (converted[0], sorted(converted[1]), converted[2])
    wanted_normalized = (wanted[0], sorted(wanted[1]), wanted[2])
    assert actual_normalized == wanted_normalized
    assert isinstance(converted, tuple)
def test_mixed_nested_structure() -> None:
    """Container types are correct after converting a deeply mixed structure."""
    source = {
        "simple_set": {1, 2, 3},
        "nested_dict": {
            "another_set": {"a", "b", "c"},
            "mixed_list": [1, {2, 3}, {"x", "y"}],
        },
        "tuple_with_set": (1, {4, 5}, 6),
        "list_of_sets": [{1, 2}, {3, 4}],
    }

    converted = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(source)

    # Only the resulting container types are checked here, not the contents.
    assert isinstance(converted["simple_set"], list)

    nested = converted["nested_dict"]
    assert isinstance(nested["another_set"], list)

    mixed = nested["mixed_list"]
    assert isinstance(mixed[1], list)
    assert isinstance(mixed[2], list)

    tuple_value = converted["tuple_with_set"]
    assert isinstance(tuple_value, tuple)
    assert isinstance(tuple_value[1], list)

    assert isinstance(converted["list_of_sets"][0], list)
def test_non_set_data() -> None:
    """Data that contains no sets compares equal to itself after conversion."""
    source = {
        "string": "hello",
        "int": 42,
        "float": 3.14,
        "bool": True,
        "none": None,
        "list": [1, 2, 3],
        "dict": {"a": 1, "b": 2},
    }

    converted = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(source)

    assert converted == source
def test_empty_structures() -> None:
    """Empty containers convert cleanly; an empty set becomes an empty list."""
    source: Dict[str, Union[Set[Any], Dict[Any, Any], List[Any], Tuple[Any, ...]]] = {
        "empty_set": set(),
        "empty_dict": {},
        "empty_list": [],
        "empty_tuple": (),
    }
    wanted: Dict[str, Union[List[Any], Dict[Any, Any], Tuple[Any, ...]]] = {
        "empty_set": [],
        "empty_dict": {},
        "empty_list": [],
        "empty_tuple": (),
    }

    converted = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(source)

    assert converted == wanted
def test_json_serializable() -> None:
    """The converted structure can be passed to json.dumps without a TypeError."""
    import json

    source = {
        "set": {1, 2, 3},
        "nested": {"set": {"a", "b"}},
        "mixed": [1, {2, 3}, {"x"}],
    }

    converted = DatahubIngestionRunSummaryProvider._convert_sets_to_lists(source)

    # json.dumps raises TypeError for any leftover set; record the outcome
    # and assert on it so a failure shows up as an assertion error.
    try:
        json.dumps(converted)
    except TypeError:
        is_serializable = False
    else:
        is_serializable = True

    assert is_serializable