import asyncio
import datetime
import functools
import unittest
from unittest.mock import Mock, patch

from botocore.exceptions import ClientError

# Import the classes we're testing
from olmocr.work_queue import S3Backend, WorkItem, WorkQueue
										 |  |  | class TestS3WorkQueue(unittest.TestCase): | 
					
						
							|  |  |  |     def setUp(self): | 
					
						
							|  |  |  |         """Set up test fixtures before each test method.""" | 
					
						
							|  |  |  |         self.s3_client = Mock() | 
					
						
							|  |  |  |         self.s3_client.exceptions.ClientError = ClientError | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         self.backend = S3Backend(self.s3_client, "s3://test-bucket/workspace") | 
					
						
							|  |  |  |         self.work_queue = WorkQueue(self.backend) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         self.sample_paths = [ | 
					
						
							|  |  |  |             "s3://test-bucket/data/file1.pdf", | 
					
						
							|  |  |  |             "s3://test-bucket/data/file2.pdf", | 
					
						
							|  |  |  |             "s3://test-bucket/data/file3.pdf", | 
					
						
							|  |  |  |         ] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def tearDown(self): | 
					
						
							|  |  |  |         """Clean up after each test method.""" | 
					
						
							|  |  |  |         pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def test_compute_workgroup_hash(self): | 
					
						
							|  |  |  |         """Test hash computation is deterministic and correct""" | 
					
						
							|  |  |  |         paths = [ | 
					
						
							|  |  |  |             "s3://test-bucket/data/file2.pdf", | 
					
						
							|  |  |  |             "s3://test-bucket/data/file1.pdf", | 
					
						
							|  |  |  |         ] | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Hash should be the same regardless of order | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         hash1 = WorkQueue._compute_workgroup_hash(paths) | 
					
						
							|  |  |  |         hash2 = WorkQueue._compute_workgroup_hash(reversed(paths)) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         self.assertEqual(hash1, hash2) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |     def test_init(self): | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         """Test initialization of S3Backend""" | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         client = Mock() | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         backend = S3Backend(client, "s3://test-bucket/workspace/") | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         self.assertEqual(backend.workspace_path, "s3://test-bucket/workspace") | 
					
						
							|  |  |  |         self.assertEqual(backend._index_path, "s3://test-bucket/workspace/work_index_list.csv.zstd") | 
					
						
							|  |  |  |         self.assertEqual(backend._output_glob, "s3://test-bucket/workspace/done_flags/*.flag") | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def asyncSetUp(self): | 
					
						
							|  |  |  |         """Set up async test fixtures""" | 
					
						
							|  |  |  |         self.loop = asyncio.new_event_loop() | 
					
						
							|  |  |  |         asyncio.set_event_loop(self.loop) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def asyncTearDown(self): | 
					
						
							|  |  |  |         """Clean up async test fixtures""" | 
					
						
							|  |  |  |         self.loop.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def async_test(f): | 
					
						
							|  |  |  |         """Decorator for async test methods""" | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         def wrapper(*args, **kwargs): | 
					
						
							|  |  |  |             loop = asyncio.new_event_loop() | 
					
						
							|  |  |  |             asyncio.set_event_loop(loop) | 
					
						
							|  |  |  |             try: | 
					
						
							|  |  |  |                 return loop.run_until_complete(f(*args, **kwargs)) | 
					
						
							|  |  |  |             finally: | 
					
						
							|  |  |  |                 loop.close() | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         return wrapper | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_populate_queue_new_items(self): | 
					
						
							|  |  |  |         """Test populating queue with new items""" | 
					
						
							|  |  |  |         # Mock empty existing index | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  |         with patch("olmocr.work_queue.download_zstd_csv", return_value=[]): | 
					
						
							|  |  |  |             with patch("olmocr.work_queue.upload_zstd_csv") as mock_upload: | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |                 await self.work_queue.populate_queue(self.sample_paths, items_per_group=2) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |                 # Verify upload was called with correct data | 
					
						
							|  |  |  |                 self.assertEqual(mock_upload.call_count, 1) | 
					
						
							|  |  |  |                 _, _, lines = mock_upload.call_args[0] | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |                 # Should create 2 work groups (2 files + 1 file) | 
					
						
							|  |  |  |                 self.assertEqual(len(lines), 2) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |                 # Verify format of uploaded lines | 
					
						
							|  |  |  |                 for line in lines: | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |                     parts = WorkQueue._decode_csv_row(line) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |                     self.assertGreaterEqual(len(parts), 2)  # Hash + at least one path | 
					
						
							|  |  |  |                     self.assertEqual(len(parts[0]), 40)  # SHA1 hash length | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_populate_queue_existing_items(self): | 
					
						
							|  |  |  |         """Test populating queue with mix of new and existing items""" | 
					
						
							|  |  |  |         existing_paths = ["s3://test-bucket/data/existing1.pdf"] | 
					
						
							|  |  |  |         new_paths = ["s3://test-bucket/data/new1.pdf"] | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Create existing index content | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         existing_hash = WorkQueue._compute_workgroup_hash(existing_paths) | 
					
						
							|  |  |  |         existing_line = WorkQueue._encode_csv_row([existing_hash] + existing_paths) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |         with patch("olmocr.work_queue.download_zstd_csv", return_value=[existing_line]): | 
					
						
							|  |  |  |             with patch("olmocr.work_queue.upload_zstd_csv") as mock_upload: | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |                 await self.work_queue.populate_queue(existing_paths + new_paths, items_per_group=1) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |                 # Verify upload called with both existing and new items | 
					
						
							|  |  |  |                 _, _, lines = mock_upload.call_args[0] | 
					
						
							|  |  |  |                 self.assertEqual(len(lines), 2) | 
					
						
							|  |  |  |                 self.assertIn(existing_line, lines) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_initialize_queue(self): | 
					
						
							|  |  |  |         """Test queue initialization""" | 
					
						
							|  |  |  |         # Mock work items and completed items | 
					
						
							|  |  |  |         work_paths = ["s3://test/file1.pdf", "s3://test/file2.pdf"] | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         work_hash = WorkQueue._compute_workgroup_hash(work_paths) | 
					
						
							|  |  |  |         work_line = WorkQueue._encode_csv_row([work_hash] + work_paths) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         completed_items = [f"s3://test-bucket/workspace/done_flags/done_{work_hash}.flag"] | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |         with patch("olmocr.work_queue.download_zstd_csv", return_value=[work_line]): | 
					
						
							|  |  |  |             with patch("olmocr.work_queue.expand_s3_glob", return_value=completed_items): | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |                 count = await self.work_queue.initialize_queue() | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |                 # Queue should be empty since all work is completed | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |                 self.assertEqual(count, 0) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_is_completed(self): | 
					
						
							|  |  |  |         """Test completed work check""" | 
					
						
							|  |  |  |         work_hash = "testhash123" | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Test completed work | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  |         self.s3_client.head_object.return_value = {"LastModified": datetime.datetime.now(datetime.timezone.utc)} | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         self.assertTrue(await self.backend.is_completed(work_hash)) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Test incomplete work | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  |         self.s3_client.head_object.side_effect = ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "HeadObject") | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         self.assertFalse(await self.backend.is_completed(work_hash)) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_get_work(self): | 
					
						
							|  |  |  |         """Test getting work items""" | 
					
						
							|  |  |  |         # Setup test data | 
					
						
							| 
									
										
										
										
											2025-01-27 20:45:28 +00:00
										 |  |  |         work_item = WorkItem(hash="testhash123", work_paths=["s3://test/file1.pdf"]) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         await self.work_queue._queue.put(work_item) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Test getting available work | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  |         self.s3_client.head_object.side_effect = ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "HeadObject") | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         result = await self.work_queue.get_work() | 
					
						
							|  |  |  |         self.assertEqual(result, work_item) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Verify lock file was created | 
					
						
							|  |  |  |         self.s3_client.put_object.assert_called_once() | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         key = self.s3_client.put_object.call_args[1]["Key"] | 
					
						
							|  |  |  |         self.assertTrue(key.endswith(f"worker_{work_item.hash}.lock")) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_get_work_completed(self): | 
					
						
							|  |  |  |         """Test getting work that's already completed""" | 
					
						
							| 
									
										
										
										
											2025-01-27 20:45:28 +00:00
										 |  |  |         work_item = WorkItem(hash="testhash123", work_paths=["s3://test/file1.pdf"]) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         await self.work_queue._queue.put(work_item) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Simulate completed work | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  |         self.s3_client.head_object.return_value = {"LastModified": datetime.datetime.now(datetime.timezone.utc)} | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         result = await self.work_queue.get_work() | 
					
						
							|  |  |  |         self.assertIsNone(result)  # Should skip completed work | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_get_work_locked(self): | 
					
						
							|  |  |  |         """Test getting work that's locked by another worker""" | 
					
						
							| 
									
										
										
										
											2025-01-27 20:45:28 +00:00
										 |  |  |         work_item = WorkItem(hash="testhash123", work_paths=["s3://test/file1.pdf"]) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         await self.work_queue._queue.put(work_item) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Simulate active lock | 
					
						
							|  |  |  |         recent_time = datetime.datetime.now(datetime.timezone.utc) | 
					
						
							|  |  |  |         self.s3_client.head_object.side_effect = [ | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  |             ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "HeadObject"),  # Not completed | 
					
						
							|  |  |  |             {"LastModified": recent_time},  # Active lock | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         ] | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         result = await self.work_queue.get_work() | 
					
						
							|  |  |  |         self.assertIsNone(result)  # Should skip locked work | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_get_work_stale_lock(self): | 
					
						
							|  |  |  |         """Test getting work with a stale lock""" | 
					
						
							| 
									
										
										
										
											2025-01-27 20:45:28 +00:00
										 |  |  |         work_item = WorkItem(hash="testhash123", work_paths=["s3://test/file1.pdf"]) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         await self.work_queue._queue.put(work_item) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Simulate stale lock | 
					
						
							|  |  |  |         stale_time = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1) | 
					
						
							|  |  |  |         self.s3_client.head_object.side_effect = [ | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  |             ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "HeadObject"),  # Not completed | 
					
						
							|  |  |  |             {"LastModified": stale_time},  # Stale lock | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         ] | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         result = await self.work_queue.get_work() | 
					
						
							|  |  |  |         self.assertEqual(result, work_item)  # Should take work with stale lock | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_mark_done(self): | 
					
						
							|  |  |  |         """Test marking work as done""" | 
					
						
							| 
									
										
										
										
											2025-01-27 20:45:28 +00:00
										 |  |  |         work_item = WorkItem(hash="testhash123", work_paths=["s3://test/file1.pdf"]) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         await self.work_queue._queue.put(work_item) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         await self.work_queue.mark_done(work_item) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         # Verify done flag was created and lock file was deleted | 
					
						
							|  |  |  |         # Check put_object was called for done flag | 
					
						
							|  |  |  |         put_calls = self.s3_client.put_object.call_args_list | 
					
						
							|  |  |  |         self.assertEqual(len(put_calls), 1) | 
					
						
							|  |  |  |         done_flag_key = put_calls[0][1]["Key"] | 
					
						
							|  |  |  |         self.assertTrue(done_flag_key.endswith(f"done_{work_item.hash}.flag")) | 
					
						
							| 
									
										
										
										
											2025-08-13 20:21:04 +00:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         # Verify lock file was deleted | 
					
						
							|  |  |  |         self.s3_client.delete_object.assert_called_once() | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |         key = self.s3_client.delete_object.call_args[1]["Key"] | 
					
						
							|  |  |  |         self.assertTrue(key.endswith(f"worker_{work_item.hash}.lock")) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-04-15 18:50:13 +00:00
										 |  |  |     @async_test | 
					
						
							|  |  |  |     async def test_paths_with_commas(self): | 
					
						
							|  |  |  |         """Test handling of paths that contain commas""" | 
					
						
							|  |  |  |         # Create paths with commas in them | 
					
						
							|  |  |  |         paths_with_commas = ["s3://test-bucket/data/file1,with,commas.pdf", "s3://test-bucket/data/file2,comma.pdf", "s3://test-bucket/data/file3.pdf"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         # Mock empty existing index for initial population | 
					
						
							|  |  |  |         with patch("olmocr.work_queue.download_zstd_csv", return_value=[]): | 
					
						
							|  |  |  |             with patch("olmocr.work_queue.upload_zstd_csv") as mock_upload: | 
					
						
							|  |  |  |                 # Populate the queue with these paths | 
					
						
							|  |  |  |                 await self.work_queue.populate_queue(paths_with_commas, items_per_group=3) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Capture what would be written to the index | 
					
						
							|  |  |  |                 _, _, lines = mock_upload.call_args[0] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Now simulate reading back these lines (which have commas in the paths) | 
					
						
							|  |  |  |                 with patch("olmocr.work_queue.download_zstd_csv", return_value=lines): | 
					
						
							|  |  |  |                     with patch("olmocr.work_queue.expand_s3_glob", return_value=[]): | 
					
						
							|  |  |  |                         # Initialize a fresh queue from these lines | 
					
						
							|  |  |  |                         await self.work_queue.initialize_queue() | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-08-13 20:20:27 +00:00
										 |  |  |                         # Mock ClientError for head_object (file doesn't exist) - need to handle multiple calls | 
					
						
							|  |  |  |                         self.s3_client.head_object.side_effect = [ | 
					
						
							|  |  |  |                             ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "HeadObject"),  # done flag check | 
					
						
							|  |  |  |                             ClientError({"Error": {"Code": "404", "Message": "Not Found"}}, "HeadObject"),  # worker lock check | 
					
						
							|  |  |  |                         ] | 
					
						
							| 
									
										
										
										
											2025-04-15 18:50:13 +00:00
										 |  |  | 
 | 
					
						
							|  |  |  |                         # Get a work item | 
					
						
							|  |  |  |                         work_item = await self.work_queue.get_work() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                         # Now verify we get a work item | 
					
						
							|  |  |  |                         self.assertIsNotNone(work_item, "Should get a work item") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                         # Verify the work item has the correct number of paths | 
					
						
							|  |  |  |                         self.assertEqual(len(work_item.work_paths), len(paths_with_commas), "Work item should have the correct number of paths") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                         # Check that all original paths with commas are preserved | 
					
						
							|  |  |  |                         for path in paths_with_commas: | 
					
						
							|  |  |  |                             print(path) | 
					
						
							|  |  |  |                             self.assertIn(path, work_item.work_paths, f"Path with commas should be preserved: {path}") | 
					
						
							|  |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |     def test_queue_size(self): | 
					
						
							|  |  |  |         """Test queue size property""" | 
					
						
							|  |  |  |         self.assertEqual(self.work_queue.size, 0) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         self.loop = asyncio.new_event_loop() | 
					
						
							|  |  |  |         asyncio.set_event_loop(self.loop) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-27 20:45:28 +00:00
										 |  |  |         self.loop.run_until_complete(self.work_queue._queue.put(WorkItem(hash="test1", work_paths=["path1"]))) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         self.assertEqual(self.work_queue.size, 1) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2025-01-27 20:45:28 +00:00
										 |  |  |         self.loop.run_until_complete(self.work_queue._queue.put(WorkItem(hash="test2", work_paths=["path2"]))) | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         self.assertEqual(self.work_queue.size, 2) | 
					
						
							| 
									
										
										
										
											2025-01-29 15:30:39 -08:00
										 |  |  | 
 | 
					
						
							| 
									
										
										
										
											2024-11-18 10:07:03 -08:00
										 |  |  |         self.loop.close() | 
					
						
							|  |  |  | 
# Allow running this test module directly (python <file>.py) in addition to
# discovery via a test runner such as pytest or `python -m unittest`.
if __name__ == "__main__":
    unittest.main()