Fix corner cases and error handling with filters conversion (#6376)

This commit is contained in:
Silvano Cerza 2023-11-21 18:22:48 +01:00 committed by GitHub
parent 456902235a
commit 76165d024f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 13 deletions

View File

@ -330,6 +330,10 @@ def convert(filters: Dict[str, Any]) -> Dict[str, Any]:
} }
``` ```
""" """
if not isinstance(filters, dict):
msg = f"Can't convert filters from type '{type(filters)}'"
raise ValueError(msg)
converted = _internal_convert(filters) converted = _internal_convert(filters)
if "conditions" not in converted: if "conditions" not in converted:
# This is done to handle a corner case when filter is really simple like so: # This is done to handle a corner case when filter is really simple like so:
@ -354,23 +358,35 @@ def _internal_convert(filters: Union[List[Any], Dict[str, Any]], previous_key=No
return _handle_non_dict(filters, previous_key) return _handle_non_dict(filters, previous_key)
for key, value in filters.items(): for key, value in filters.items():
if key not in ALL_OPERATORS: if (
previous_key is not None
and previous_key not in ALL_LEGACY_OPERATORS_MAPPING
and key not in ALL_LEGACY_OPERATORS_MAPPING
):
msg = f"This filter ({filters}) seems to be malformed."
raise FilterError(msg)
if key not in ALL_LEGACY_OPERATORS_MAPPING:
converted = _internal_convert(value, previous_key=key) converted = _internal_convert(value, previous_key=key)
if isinstance(converted, list): if isinstance(converted, list):
conditions.extend(converted) conditions.extend(converted)
else: else:
conditions.append(converted) conditions.append(converted)
elif key in LOGIC_OPERATORS: elif key in LEGACY_LOGICAL_OPERATORS_MAPPING:
if previous_key not in ALL_OPERATORS and isinstance(value, list): if previous_key not in ALL_LEGACY_OPERATORS_MAPPING and isinstance(value, list):
converted = [_internal_convert({previous_key: v}) for v in value] converted = [_internal_convert({previous_key: v}) for v in value]
conditions.append({"operator": ALL_OPERATORS[key], "conditions": converted}) conditions.append({"operator": ALL_LEGACY_OPERATORS_MAPPING[key], "conditions": converted})
else: else:
converted = _internal_convert(value, previous_key=key) converted = _internal_convert(value, previous_key=key)
if key == "$not" and type(converted) not in [dict, list]:
# This handles a corner when '$not' is used like this:
# '{"page": {"$not": 102}}'
# Without this check we would miss the implicit '$eq'
converted = {"field": previous_key, "operator": "==", "value": value}
if not isinstance(converted, list): if not isinstance(converted, list):
converted = [converted] converted = [converted]
conditions.append({"operator": ALL_OPERATORS[key], "conditions": converted}) conditions.append({"operator": ALL_LEGACY_OPERATORS_MAPPING[key], "conditions": converted})
elif key in COMPARISON_OPERATORS: elif key in LEGACY_COMPARISON_OPERATORS_MAPPING:
conditions.append({"field": previous_key, "operator": ALL_OPERATORS[key], "value": value}) conditions.append({"field": previous_key, "operator": ALL_LEGACY_OPERATORS_MAPPING[key], "value": value})
if len(conditions) == 1: if len(conditions) == 1:
return conditions[0] return conditions[0]
@ -382,23 +398,23 @@ def _internal_convert(filters: Union[List[Any], Dict[str, Any]], previous_key=No
def _handle_list(filters, previous_key): def _handle_list(filters, previous_key):
if previous_key in LOGIC_OPERATORS: if previous_key in LEGACY_LOGICAL_OPERATORS_MAPPING:
return [_internal_convert(f) for f in filters] return [_internal_convert(f) for f in filters]
elif previous_key not in COMPARISON_OPERATORS: elif previous_key not in LEGACY_COMPARISON_OPERATORS_MAPPING:
return {"field": previous_key, "operator": "in", "value": filters} return {"field": previous_key, "operator": "in", "value": filters}
return None return None
def _handle_non_dict(filters, previous_key): def _handle_non_dict(filters, previous_key):
if previous_key not in ALL_OPERATORS: if previous_key not in ALL_LEGACY_OPERATORS_MAPPING:
return {"field": previous_key, "operator": "==", "value": filters} return {"field": previous_key, "operator": "==", "value": filters}
return filters return filters
# Operator mappings from legacy style to new one # Operator mappings from legacy style to new one
LOGIC_OPERATORS = {"$and": "AND", "$or": "OR", "$not": "NOT"} LEGACY_LOGICAL_OPERATORS_MAPPING = {"$and": "AND", "$or": "OR", "$not": "NOT"}
COMPARISON_OPERATORS = { LEGACY_COMPARISON_OPERATORS_MAPPING = {
"$eq": "==", "$eq": "==",
"$ne": "!=", "$ne": "!=",
"$gt": ">", "$gt": ">",
@ -409,4 +425,4 @@ COMPARISON_OPERATORS = {
"$nin": "not in", "$nin": "not in",
} }
ALL_OPERATORS = {**LOGIC_OPERATORS, **COMPARISON_OPERATORS} ALL_LEGACY_OPERATORS_MAPPING = {**LEGACY_LOGICAL_OPERATORS_MAPPING, **LEGACY_COMPARISON_OPERATORS_MAPPING}

View File

@ -653,9 +653,27 @@ filters_data = [
}, },
id="Root explicit $not", id="Root explicit $not",
), ),
pytest.param(
{"page": {"$not": 102}},
{"operator": "NOT", "conditions": [{"field": "page", "operator": "==", "value": 102}]},
id="Explicit $not with implicit $eq",
),
] ]
@pytest.mark.parametrize("old_style, new_style", filters_data) @pytest.mark.parametrize("old_style, new_style", filters_data)
def test_convert(old_style, new_style): def test_convert(old_style, new_style):
assert convert(old_style) == new_style assert convert(old_style) == new_style
def test_convert_with_incorrect_input_type():
with pytest.raises(ValueError):
convert("some string")
def test_convert_with_incorrect_filter_nesting():
with pytest.raises(FilterError):
convert({"number": {"page": "100"}})
with pytest.raises(FilterError):
convert({"number": {"page": {"chapter": "intro"}}})