Pipeline stability fixes (hopefully) and logging

This commit is contained in:
Jake Poznanski 2024-10-29 20:15:34 +00:00
parent ce2e4baa87
commit 232c445a23
2 changed files with 45 additions and 1 deletion

.gitignore

@@ -8,6 +8,8 @@ gnarly_previews/*
 s2orc_previews/*
 s2orc_previews_3200/*
 /*.html
+debug.log
+birrpipeline-debug.log
 # build artifacts

pdelfin/birrpipeline.py

@@ -10,6 +10,7 @@ import datetime
 import posixpath
 import threading
 import logging
+import psutil
 import boto3.session
 import urllib3.exceptions
@@ -32,7 +33,26 @@ from pdelfin.s3_utils import parse_custom_id, expand_s3_glob, get_s3_bytes, pars
 # Initialize logger
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
+logger.setLevel(logging.DEBUG)  # Set to DEBUG for the file handler to capture everything
+
+# Console handler for INFO level and above
+console_handler = logging.StreamHandler()
+console_handler.setLevel(logging.INFO)
+
+# File handler for DEBUG level and above with line-by-line flushing
+class FlushFileHandler(logging.FileHandler):
+    def emit(self, record):
+        super().emit(record)
+        self.flush()  # Explicitly flush after every log entry
+
+file_handler = FlushFileHandler('birrpipeline-debug.log', mode='a')
+file_handler.setLevel(logging.DEBUG)
+file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+
+# Add handlers to the logger
+logger.handlers.clear()
+logger.addHandler(console_handler)
+logger.addHandler(file_handler)
 
 # Global s3 client for the whole script, feel free to adjust params if you need it
 workspace_s3 = boto3.client('s3')
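
The handler split above routes INFO and above to the console while the FlushFileHandler captures the full DEBUG stream to birrpipeline-debug.log, flushing after every record so a crash loses no log lines. A minimal standalone sketch of the same pattern, using only the standard library (the "demo" names are placeholders, not from this commit):

import logging

class FlushFileHandler(logging.FileHandler):
    """FileHandler that flushes after every record, so a crash loses no lines."""
    def emit(self, record):
        super().emit(record)
        self.flush()

logger = logging.getLogger("demo")
logger.setLevel(logging.DEBUG)          # the logger passes everything through...

console = logging.StreamHandler()
console.setLevel(logging.INFO)          # ...but the console only shows INFO+

file_handler = FlushFileHandler("demo-debug.log", mode="a")
file_handler.setLevel(logging.DEBUG)    # the file gets the full DEBUG stream
file_handler.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))

logger.handlers.clear()                 # avoid duplicate handlers on re-run
logger.addHandler(console)
logger.addHandler(file_handler)

logger.debug("only in the file")
logger.info("in the file and on the console")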
@@ -525,6 +545,7 @@ def build_pdf_queries(s3_workspace: str, pdf: DatabaseManager.PDFRecord, cur_rou
     # Shortcut out of downloading the actual PDF
     if set(page.page_num for page in existing_pages if page.is_usable()) == set(range(1, pdf.num_pages + 1)):
+        db.close()
         return []
 
     try:
@@ -532,6 +553,8 @@ def build_pdf_queries(s3_workspace: str, pdf: DatabaseManager.PDFRecord, cur_rou
             tf.write(get_s3_bytes(pdf_s3, pdf.s3_path))
             tf.flush()
 
+            logger.debug(f"build_pdf_queries for {pdf.s3_path} with {pdf.num_pages} pages")
+
             for target_page_num in range(1, pdf.num_pages + 1):
                 # Is there an existing page that has no error
                 if any(page.is_usable() and page.page_num == target_page_num for page in existing_pages):
@@ -547,6 +570,7 @@ def build_pdf_queries(s3_workspace: str, pdf: DatabaseManager.PDFRecord, cur_rou
                     rotated_page_data = _get_page_data([page for page in existing_pages if page.page_num == target_page_num and page.error == "rotation_invalid"])
                     rotation_corrections = set(page_data.rotation_correction for page_data in rotated_page_data)
                     for correction in rotation_corrections:
+                        logger.debug(f"Adding {correction}-degree rotation query for {pdf.s3_path}-{target_page_num}")
                         new_queries.append({**build_page_query(tf.name, pdf.s3_path, target_page_num, target_longest_image_dim, target_anchor_text_len, image_rotation=correction), "round": cur_round})
 
                 # TODO: Try to provide a smaller prompt hint if that was the error
@@ -555,6 +579,7 @@ def build_pdf_queries(s3_workspace: str, pdf: DatabaseManager.PDFRecord, cur_rou
     except Exception as ex:
         logger.warning(f"Warning, could not get batch inferences lines for {pdf.s3_path} due to {ex}")
 
+    db.close()
     return new_queries
 
 def build_dolma_doc(s3_workspace: str, pdf: DatabaseManager.PDFRecord) -> Optional[dict]:
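
The recurring stability fix in these hunks is an explicit db.close() on every exit path, so database connections are not leaked when a function returns early. An alternative that guarantees the close even on unhandled exceptions is a context-manager wrapper; a hypothetical sketch, assuming the DatabaseManager class defined in this file (the open_db helper itself is not part of the commit):

from contextlib import contextmanager

@contextmanager
def open_db(s3_workspace: str, skip_init: bool = True):
    # Hypothetical helper: yields a DatabaseManager and always closes it,
    # even when the body raises or returns early.
    db = DatabaseManager(s3_workspace, skip_init=skip_init)
    try:
        yield db
    finally:
        db.close()

With this, each early return inside a `with open_db(s3_workspace) as db:` block would release the connection without a per-branch db.close() call.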
@@ -569,6 +594,7 @@ def build_dolma_doc(s3_workspace: str, pdf: DatabaseManager.PDFRecord) -> Option
         usable_pages = [page for page in existing_pages if page.is_usable() and page.page_num == target_page_num]
 
         if len(usable_pages) == 0:
+            db.close()
             return None
 
     for target_page_num in range(1, pdf.num_pages + 1):
@@ -608,11 +634,13 @@ def build_dolma_doc(s3_workspace: str, pdf: DatabaseManager.PDFRecord) -> Option
         }
     }
 
+    db.close()
     return dolma_doc
 
 def mark_pdfs_done(s3_workspace: str, dolma_docs: list[dict]):
     db = DatabaseManager(s3_workspace, skip_init=True)
     db.update_pdf_statuses({doc["metadata"]["Source-File"]: "completed" for doc in dolma_docs})
+    db.close()
 
 def get_current_round(s3_workspace: str) -> int:
     path = s3_workspace[5:]
@@ -761,6 +789,20 @@ if __name__ == '__main__':
                 return_when=concurrent.futures.FIRST_COMPLETED,
             )
 
+            # Get current Python process memory usage
+            # process = psutil.Process()
+            # memory_usage_python = process.memory_info().rss  # Resident Set Size (RSS) in bytes
+            # logger.debug(f"Current Python memory usage: {memory_usage_python / (1024 ** 2):.2f} MB")
+
+            # # Get total memory usage on the host
+            # total_memory = psutil.virtual_memory().total
+            # used_memory = psutil.virtual_memory().used
+
+            # logger.debug(f"Total memory on host: {total_memory / (1024 ** 3):.2f} GB")
+            # logger.debug(f"Used memory on host: {used_memory / (1024 ** 3):.2f} GB")
+
             for future in done:
                 pdf = pending_futures.pop(future)
                 inference_lines = future.result()
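
The memory probe in the hunk above lands commented out (only the `import psutil` is active). Enabled, it would read roughly as follows; a sketch assuming psutil is installed and the module-level logger configured earlier in this commit (log_memory_usage is a hypothetical name, not from the diff):

import logging
import psutil

logger = logging.getLogger(__name__)

def log_memory_usage() -> None:
    # Resident Set Size (RSS) of this Python process, in bytes
    rss = psutil.Process().memory_info().rss
    logger.debug(f"Current Python memory usage: {rss / (1024 ** 2):.2f} MB")

    # Host-wide totals, to spot pressure from sibling worker processes
    vm = psutil.virtual_memory()
    logger.debug(f"Total memory on host: {vm.total / (1024 ** 3):.2f} GB")
    logger.debug(f"Used memory on host: {vm.used / (1024 ** 3):.2f} GB")

Called once per loop iteration (after concurrent.futures.wait returns), this would write a memory sample to birrpipeline-debug.log each time a batch of futures completes.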