From 8cbba6e9dbf2ae28760ae6986a81fa6d2bca371f Mon Sep 17 00:00:00 2001
From: Molion Surya
Date: Tue, 8 Jul 2025 13:25:52 +0800
Subject: [PATCH 1/2] Fix #1746: [openai.py logic for streaming complete]

---
 lightrag/llm/openai.py | 28 ++++++++++++++++++++++++----
 1 file changed, 24 insertions(+), 4 deletions(-)

diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index 57f016cf..30491476 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -210,9 +210,16 @@ async def openai_complete_if_cache(
         async def inner():
             # Track if we've started iterating
             iteration_started = False
+            final_chunk_usage = None
+
             try:
                 iteration_started = True
                 async for chunk in response:
+                    # Check if this chunk has usage information (final chunk)
+                    if hasattr(chunk, "usage") and chunk.usage:
+                        final_chunk_usage = chunk.usage
+                        logger.debug(f"Received usage info in streaming chunk: {chunk.usage}")
+
                     # Check if choices exists and is not empty
                     if not hasattr(chunk, "choices") or not chunk.choices:
                         logger.warning(f"Received chunk without choices: {chunk}")
@@ -222,16 +229,29 @@ async def openai_complete_if_cache(
                     if not hasattr(chunk.choices[0], "delta") or not hasattr(
                         chunk.choices[0].delta, "content"
                     ):
-                        logger.warning(
-                            f"Received chunk without delta content: {chunk.choices[0]}"
-                        )
+                        # This might be the final chunk, continue to check for usage
                         continue
+
                     content = chunk.choices[0].delta.content
                     if content is None:
                         continue
                     if r"\u" in content:
                         content = safe_unicode_decode(content.encode("utf-8"))
+
                     yield content
+
+                # After streaming is complete, track token usage
+                if token_tracker and final_chunk_usage:
+                    # Use actual usage from the API
+                    token_counts = {
+                        "prompt_tokens": getattr(final_chunk_usage, "prompt_tokens", 0),
+                        "completion_tokens": getattr(final_chunk_usage, "completion_tokens", 0),
+                        "total_tokens": getattr(final_chunk_usage, "total_tokens", 0),
+                    }
+                    token_tracker.add_usage(token_counts)
+                    logger.debug(f"Streaming token usage (from API): {token_counts}")
+                elif token_tracker:
+                    logger.debug("No usage information available in streaming response")
             except Exception as e:
                 logger.error(f"Error in stream response: {str(e)}")
                 # Try to clean up resources if possible
@@ -451,4 +471,4 @@ async def openai_embed(
     response = await openai_async_client.embeddings.create(
         model=model, input=texts, encoding_format="float"
     )
-    return np.array([dp.embedding for dp in response.data])
+    return np.array([dp.embedding for dp in response.data])
\ No newline at end of file

From 2a0cff3ed6ec69e0b5786bbcea7402b25b5c2dc0 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Tue, 8 Jul 2025 18:17:21 +0800
Subject: [PATCH 2/2] Fix linting

---
 lightrag/llm/openai.py | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/lightrag/llm/openai.py b/lightrag/llm/openai.py
index 30491476..eb74c2f1 100644
--- a/lightrag/llm/openai.py
+++ b/lightrag/llm/openai.py
@@ -211,15 +211,17 @@ async def openai_complete_if_cache(
             # Track if we've started iterating
             iteration_started = False
             final_chunk_usage = None
-
+
             try:
                 iteration_started = True
                 async for chunk in response:
                     # Check if this chunk has usage information (final chunk)
                     if hasattr(chunk, "usage") and chunk.usage:
                         final_chunk_usage = chunk.usage
-                        logger.debug(f"Received usage info in streaming chunk: {chunk.usage}")
-
+                        logger.debug(
+                            f"Received usage info in streaming chunk: {chunk.usage}"
+                        )
+
                     # Check if choices exists and is not empty
                     if not hasattr(chunk, "choices") or not chunk.choices:
                         logger.warning(f"Received chunk without choices: {chunk}")
@@ -231,21 +233,23 @@ async def openai_complete_if_cache(
                     ):
                         # This might be the final chunk, continue to check for usage
                         continue
-
+
                     content = chunk.choices[0].delta.content
                     if content is None:
                         continue
                     if r"\u" in content:
                         content = safe_unicode_decode(content.encode("utf-8"))
-
+
                     yield content
-
+
                 # After streaming is complete, track token usage
                 if token_tracker and final_chunk_usage:
                     # Use actual usage from the API
                     token_counts = {
                         "prompt_tokens": getattr(final_chunk_usage, "prompt_tokens", 0),
-                        "completion_tokens": getattr(final_chunk_usage, "completion_tokens", 0),
+                        "completion_tokens": getattr(
+                            final_chunk_usage, "completion_tokens", 0
+                        ),
                         "total_tokens": getattr(final_chunk_usage, "total_tokens", 0),
                     }
                     token_tracker.add_usage(token_counts)
@@ -471,4 +475,4 @@ async def openai_embed(
     response = await openai_async_client.embeddings.create(
         model=model, input=texts, encoding_format="float"
    )
-    return np.array([dp.embedding for dp in response.data])
\ No newline at end of file
+    return np.array([dp.embedding for dp in response.data])