import time

from datahub.utilities.backpressure_aware_executor import BackpressureAwareExecutor
from datahub.utilities.perf_timer import PerfTimer

def test_backpressure_aware_executor_simple():
    def task(i):
        return i

    # Each submitted task's result should come back exactly once.
    assert {
        res.result()
        for res in BackpressureAwareExecutor.map(
            task, ((i,) for i in range(10)), max_workers=2
        )
    } == set(range(10))
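
# Note (illustrative, not part of the original test file): as the test above
# shows, BackpressureAwareExecutor.map() yields concurrent.futures.Future
# objects rather than raw values, so each task's exception (if any) surfaces
# only when .result() is called. A hypothetical consumption pattern, where
# handle() and logger are placeholders:
#
#     for fut in BackpressureAwareExecutor.map(task, args, max_workers=2):
#         try:
#             handle(fut.result())
#         except Exception:
#             logger.exception("task failed")
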
def test_backpressure_aware_executor_advanced():
    task_duration = 0.5
    started = set()
    executed = set()

    def task(x, y):
        assert x + 1 == y
        started.add(x)
        time.sleep(task_duration)
        executed.add(x)
        return x

    args_list = [(i, i + 1) for i in range(10)]

    with PerfTimer() as timer:
        results = BackpressureAwareExecutor.map(
            task, args_list, max_workers=2, max_pending=4
        )
        # map() should return immediately, without waiting on any task.
        assert timer.elapsed_seconds() < task_duration

        # No tasks should have completed yet.
        assert len(executed) == 0

        # Consume the first result. It must come from one of the first
        # max_pending (4) tasks submitted.
        first_result = next(results)
        assert 0 <= first_result.result() < 4
        assert timer.elapsed_seconds() > task_duration

        # By now, the first four tasks should have started.
        time.sleep(task_duration)
        assert {0, 1, 2, 3}.issubset(started)
        assert 2 <= len(executed) <= 4

        # Finally, consume the rest of the results.
        assert {r.result() for r in results} == {
            i for i in range(10) if i != first_result.result()
        }

        # Validate that the entire process took about 5-10x the task duration:
        # with 2 workers and 10 tasks, the ideal runtime is 10 / 2 = 5 batches,
        # i.e. 5 * task_duration; the 10x bound leaves headroom for overhead.
        assert 5 * task_duration < timer.elapsed_seconds() < 10 * task_duration
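

# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original test file): a minimal
# re-implementation of the behavior these tests exercise, assuming that
# BackpressureAwareExecutor.map() submits work to a thread pool, caps the
# number of in-flight futures at max_pending, and lazily yields futures as
# they complete. The real implementation lives in
# datahub.utilities.backpressure_aware_executor and may differ in detail;
# _sketch_map and its max_pending default are hypothetical.
# ---------------------------------------------------------------------------

import concurrent.futures
from typing import Any, Callable, Iterable, Iterator, Sequence, TypeVar

_T = TypeVar("_T")


def _sketch_map(
    func: Callable[..., _T],
    args_list: Iterable[Sequence[Any]],
    max_workers: int,
    max_pending: int = 100,  # default chosen arbitrarily for this sketch
) -> Iterator["concurrent.futures.Future[_T]"]:
    args_iter = iter(args_list)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Prime the pipeline: submit tasks until we hit the backpressure limit.
        pending = set()
        for args in args_iter:
            pending.add(pool.submit(func, *args))
            if len(pending) >= max_pending:
                break

        while pending:
            # Block until at least one in-flight task finishes...
            done, pending = concurrent.futures.wait(
                pending, return_when=concurrent.futures.FIRST_COMPLETED
            )
            # ...yield the completed futures to the consumer...
            for future in done:
                yield future
            # ...and top the pending set back up to max_pending. Because this
            # is a generator, no new work is submitted until the consumer asks
            # for more results -- that is the backpressure.
            for args in args_iter:
                pending.add(pool.submit(func, *args))
                if len(pending) >= max_pending:
                    break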