mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-07 01:00:41 +00:00
87 lines
2.9 KiB
Python
87 lines
2.9 KiB
Python
import random
|
|
import re
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from datahub.utilities.lossy_collections import LossyDict, LossyList, LossySet
|
|
|
|
|
|
@pytest.mark.parametrize("length, sampling", [(10, False), (100, True)])
|
|
def test_lossylist_sampling(length, sampling):
|
|
l_dict: LossyList[str] = LossyList()
|
|
for i in range(0, length):
|
|
l_dict.append(f"{i} Hello World")
|
|
|
|
assert len(l_dict) == length
|
|
assert l_dict.sampled is sampling
|
|
if sampling:
|
|
assert f"... sampled of {length} total elements" in str(l_dict)
|
|
else:
|
|
assert "sampled" not in str(l_dict)
|
|
|
|
list_version = [int(i.split(" ")[0]) for i in l_dict]
|
|
print(list_version)
|
|
assert sorted(list_version) == list_version
|
|
|
|
|
|
@pytest.mark.parametrize("length, sampling", [(10, False), (100, True)])
|
|
def test_lossyset_sampling(length, sampling):
|
|
lossy_set: LossySet[str] = LossySet()
|
|
for i in range(0, length):
|
|
lossy_set.add(f"{i} Hello World")
|
|
|
|
assert len(lossy_set) == min(10, length)
|
|
assert lossy_set.sampled is sampling
|
|
if sampling:
|
|
assert f"... sampled with at most {length - 10} elements missing" in str(
|
|
lossy_set
|
|
)
|
|
else:
|
|
assert "sampled" not in str(lossy_set)
|
|
|
|
list_version = [int(i.split(" ")[0]) for i in lossy_set]
|
|
set_version = set(list_version)
|
|
|
|
assert len(list_version) == len(set_version)
|
|
assert len(list_version) == min(10, length)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"length, sampling, sub_length", [(4, False, 4), (10, False, 14), (100, True, 1000)]
|
|
)
|
|
def test_lossydict_sampling(length, sampling, sub_length):
|
|
lossy_dict: LossyDict[int, LossyList[str]] = LossyDict()
|
|
elements_added = 0
|
|
element_length_map = {}
|
|
for i in range(0, length):
|
|
list_length = random.choice(range(1, sub_length))
|
|
element_length_map[i] = 0
|
|
for _num_elements in range(0, list_length):
|
|
if not lossy_dict.get(i):
|
|
elements_added += 1
|
|
# reset to 0 until we get it back
|
|
element_length_map[i] = 0
|
|
else:
|
|
element_length_map[i] = len(lossy_dict[i])
|
|
|
|
current_list = lossy_dict.get(i, LossyList())
|
|
current_list.append(f"{i}:{round(time.time(), 2)} Hello World")
|
|
lossy_dict[i] = current_list
|
|
element_length_map[i] += 1
|
|
|
|
assert len(lossy_dict) == min(lossy_dict.max_elements, length)
|
|
assert lossy_dict.sampled is sampling
|
|
if sampling:
|
|
assert re.search("sampled of at most .* entries.", str(lossy_dict))
|
|
assert (
|
|
f"{lossy_dict.max_elements} sampled of at most {elements_added} entries."
|
|
in str(lossy_dict)
|
|
)
|
|
else:
|
|
# cheap way to determine that the dict isn't reporting sampled keys
|
|
assert not re.search("sampled of at most .* entries.", str(lossy_dict))
|
|
|
|
for k, v in lossy_dict.items():
|
|
assert len(v) == element_length_map[k]
|