mirror of
https://github.com/datahub-project/datahub.git
synced 2025-07-06 16:49:03 +00:00
32 lines
749 B
Python
32 lines
749 B
Python
![]() |
import time
|
||
|
|
||
|
import pytest
|
||
|
|
||
|
from datahub.utilities.threading_timeout import TimeoutException, threading_timeout
|
||
|
|
||
|
|
||
|
def test_timeout_no_timeout():
|
||
|
# Should complete without raising an exception
|
||
|
with threading_timeout(1.0):
|
||
|
time.sleep(0.1)
|
||
|
|
||
|
|
||
|
def test_timeout_raises():
|
||
|
# Should raise TimeoutException
|
||
|
with pytest.raises(TimeoutException):
|
||
|
with threading_timeout(0.1):
|
||
|
time.sleep(0.5)
|
||
|
|
||
|
|
||
|
def test_timeout_early_exit():
|
||
|
# Test that context manager handles other exceptions properly
|
||
|
with pytest.raises(ValueError):
|
||
|
with threading_timeout(1.0):
|
||
|
raise ValueError("Early exit")
|
||
|
|
||
|
|
||
|
def test_timeout_zero():
|
||
|
# Should not raise an exception
|
||
|
with threading_timeout(0.0):
|
||
|
pass
|