feat(ingest): allow max_workers=1 with ASYNC_BATCH rest sink (#12088)

Author: Harshal Sheth, 2024-12-10 18:32:52 -05:00 (committed by GitHub)
parent 00f0ee8689
commit d953718ab7
3 changed files with 7 additions and 6 deletions
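
For context, a minimal sketch of the configuration this change permits; the import path and enum member are assumptions based on the DataHub codebase and are not shown in this diff:

# Hypothetical usage sketch: an ASYNC_BATCH rest sink with a single worker,
# which this commit stops rejecting. Import path and RestSinkMode member are
# assumptions about the repo layout.
from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig, RestSinkMode

config = DatahubRestSinkConfig(
    server="http://localhost:8080",
    mode=RestSinkMode.ASYNC_BATCH,
    max_threads=1,  # previously tripped `assert self.max_workers > 1` downstream
)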


@@ -65,11 +65,11 @@ class DatahubRestSinkConfig(DatahubClientConfig):
     mode: RestSinkMode = _DEFAULT_REST_SINK_MODE
 
     # These only apply in async modes.
-    max_threads: int = _DEFAULT_REST_SINK_MAX_THREADS
-    max_pending_requests: int = 2000
+    max_threads: pydantic.PositiveInt = _DEFAULT_REST_SINK_MAX_THREADS
+    max_pending_requests: pydantic.PositiveInt = 2000
 
     # Only applies in async batch mode.
-    max_per_batch: int = 100
+    max_per_batch: pydantic.PositiveInt = 100
 
 
 @dataclasses.dataclass
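
A minimal, self-contained sketch of what switching to pydantic.PositiveInt buys: non-positive values now fail at config validation instead of reaching the executor. SinkConfig below is a simplified stand-in, not the real class.

import pydantic

class SinkConfig(pydantic.BaseModel):  # stand-in for DatahubRestSinkConfig
    max_threads: pydantic.PositiveInt = 4

SinkConfig(max_threads=1)  # accepted: 1 is positive
try:
    SinkConfig(max_threads=0)
except pydantic.ValidationError as err:
    print(err)  # 0 is rejected before any executor is created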


@@ -268,7 +268,7 @@ class BatchPartitionExecutor(Closeable):
         self.process_batch = process_batch
         self.min_process_interval = min_process_interval
         self.read_from_pending_interval = read_from_pending_interval
-        assert self.max_workers > 1
+        assert self.max_workers >= 1
 
         self._state_lock = threading.Lock()
         self._executor = ThreadPoolExecutor(
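
A hedged usage sketch of the relaxed bound, mirroring the parameters in the test below; the import path and the submit() call shape are assumptions, not confirmed by this diff:

from datahub.utilities.partition_executor import BatchPartitionExecutor

def process_batch(batch):
    print("processing", len(batch), "tasks")

with BatchPartitionExecutor(
    max_workers=1,  # accepted now that the assertion is `>= 1`
    max_pending=10,
    max_per_batch=2,
    process_batch=process_batch,
) as executor:
    executor.submit("key1", "task1")  # assumed signature: (partition_key, *args)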


@@ -80,7 +80,8 @@ def test_partitioned_executor_bounding():
     assert len(done_tasks) == 16
 
 
-def test_batch_partition_executor_sequential_key_execution():
+@pytest.mark.parametrize("max_workers", [1, 2, 10])
+def test_batch_partition_executor_sequential_key_execution(max_workers: int) -> None:
     executing_tasks = set()
     done_tasks = set()
     done_task_batches = set()
@@ -99,7 +100,7 @@ def test_batch_partition_executor_sequential_key_execution():
         done_task_batches.add(tuple(id for _, id in batch))
 
     with BatchPartitionExecutor(
-        max_workers=2,
+        max_workers=max_workers,
         max_pending=10,
         max_per_batch=2,
         process_batch=process_batch,
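
For reference, a small illustration of the parametrization above: pytest expands the decorated test into one item per max_workers value, so the sequential-key behavior is now exercised at 1, 2, and 10 workers. The test name below is illustrative, not from the repo.

import pytest

@pytest.mark.parametrize("max_workers", [1, 2, 10])
def test_accepts_all_worker_counts(max_workers: int) -> None:
    # runs three times: max_workers = 1, 2, and 10
    assert max_workers >= 1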