2025-01-22 19:43:14 +08:00
|
|
|
#
|
|
|
|
# Copyright 2024 The InfiniFlow Authors. All Rights Reserved.
|
|
|
|
#
|
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
# you may not use this file except in compliance with the License.
|
|
|
|
# You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
#
|
|
|
|
import logging
|
2025-02-17 14:06:06 +08:00
|
|
|
import re
|
2025-01-22 19:43:14 +08:00
|
|
|
from collections import defaultdict, Counter
|
|
|
|
from copy import deepcopy
|
|
|
|
from typing import Callable
|
2025-03-03 18:59:49 +08:00
|
|
|
import trio
|
2025-03-26 15:34:42 +08:00
|
|
|
import networkx as nx
|
2025-01-22 19:43:14 +08:00
|
|
|
|
|
|
|
from graphrag.general.graph_prompt import SUMMARIZE_DESCRIPTIONS_PROMPT
|
|
|
|
from graphrag.utils import get_llm_cache, set_llm_cache, handle_single_entity_extraction, \
|
2025-03-26 15:34:42 +08:00
|
|
|
handle_single_relationship_extraction, split_string_by_multi_markers, flat_uniq_list, chat_limiter, get_from_to, GraphChange
|
2025-01-22 19:43:14 +08:00
|
|
|
from rag.llm.chat_model import Base as CompletionLLM
|
2025-03-06 19:21:07 +08:00
|
|
|
from rag.prompts import message_fit_in
|
2025-01-22 19:43:14 +08:00
|
|
|
from rag.utils import truncate
|
|
|
|
|
|
|
|
# Delimiter placed between individual descriptions when several are merged into one string.
GRAPH_FIELD_SEP = "<SEP>"
|
|
|
|
# Entity types accepted by Extractor when the caller does not supply its own list.
DEFAULT_ENTITY_TYPES = ["organization", "person", "geo", "event", "category"]
|
|
|
|
# NOTE(review): not referenced in the visible part of this file; presumably caps the number of
# follow-up "gleaning" extraction passes -- confirm at the use site.
ENTITY_EXTRACTION_MAX_GLEANINGS = 2
|
|
|
|
|
|
|
|
|
|
|
|
class Extractor:
    """Shared machinery for LLM-based entity/relationship extraction.

    Provides a cached chat helper, parsing of extraction records into candidate
    nodes and edges, and merging of duplicate nodes/edges (with optional LLM
    summarisation of long description lists).

    NOTE(review): ``__call__`` schedules ``self._process_single_content``, which
    is not defined in the visible part of this class -- presumably supplied by
    subclasses; confirm.
    """
|
|
|
|
# Chat model used for every LLM call made by this extractor (assigned in __init__).
_llm: CompletionLLM
|
|
|
|
|
|
|
|
def __init__(
    self,
    llm_invoker: CompletionLLM,
    language: str | None = "English",
    entity_types: list[str] | None = None,
):
    """Store the chat model and extraction settings.

    Args:
        llm_invoker: Chat model used for all LLM calls.
        language: Language name forwarded to the summary prompt.
        entity_types: Entity types to keep; a falsy value (``None`` or an
            empty list) selects ``DEFAULT_ENTITY_TYPES``.
    """
    self._llm = llm_invoker
    self._language = language
    if entity_types:
        self._entity_types = entity_types
    else:
        self._entity_types = DEFAULT_ENTITY_TYPES
|
|
|
|
|
|
|
|
def _chat(self, system, history, gen_conf):
    """Run one chat completion, going through the LLM cache.

    Args:
        system: System prompt text.
        history: Chat history passed to the model.
        gen_conf: Generation settings passed to the model.

    Returns:
        The cached or freshly generated answer with everything up to the last
        ``</think>`` tag removed, or ``""`` when the answer contains the
        ``**ERROR**`` marker (such answers are not cached).
    """
    # The model receives copies; the untouched originals are used as the cache key on write.
    hist_copy = deepcopy(history)
    conf_copy = deepcopy(gen_conf)

    cached = get_llm_cache(self._llm.llm_name, system, hist_copy, conf_copy)
    if cached:
        return cached

    # Fit the system prompt into 92% of the model's context length.
    token_budget = int(self._llm.max_length * 0.92)
    _, fitted = message_fit_in([{"role": "system", "content": system}], token_budget)

    answer = self._llm.chat(fitted[0]["content"], hist_copy, conf_copy)
    # Drop everything up to and including the last closing think tag.
    answer = re.sub(r"^.*</think>", "", answer, flags=re.DOTALL)

    if "**ERROR**" in answer:
        logging.warning(f"Extractor._chat got error. response: {answer}")
        return ""

    set_llm_cache(self._llm.llm_name, system, answer, history, gen_conf)
    return answer
|
|
|
|
|
|
|
|
def _entities_and_relations(self, chunk_key: str, records: list, tuple_delimiter: str):
|
|
|
|
maybe_nodes = defaultdict(list)
|
|
|
|
maybe_edges = defaultdict(list)
|
|
|
|
ent_types = [t.lower() for t in self._entity_types]
|
|
|
|
for record in records:
|
|
|
|
record_attributes = split_string_by_multi_markers(
|
|
|
|
record, [tuple_delimiter]
|
|
|
|
)
|
|
|
|
|
|
|
|
if_entities = handle_single_entity_extraction(
|
|
|
|
record_attributes, chunk_key
|
|
|
|
)
|
|
|
|
if if_entities is not None and if_entities.get("entity_type", "unknown").lower() in ent_types:
|
|
|
|
maybe_nodes[if_entities["entity_name"]].append(if_entities)
|
|
|
|
continue
|
|
|
|
|
|
|
|
if_relation = handle_single_relationship_extraction(
|
|
|
|
record_attributes, chunk_key
|
|
|
|
)
|
|
|
|
if if_relation is not None:
|
|
|
|
maybe_edges[(if_relation["src_id"], if_relation["tgt_id"])].append(
|
|
|
|
if_relation
|
|
|
|
)
|
|
|
|
return dict(maybe_nodes), dict(maybe_edges)
|
|
|
|
|
2025-03-03 18:59:49 +08:00
|
|
|
async def __call__(
    self, doc_id: str, chunks: list[str],
    callback: Callable | None = None
):
    """Extract entities and relationships from ``chunks`` and merge duplicates.

    Runs three concurrent stages: per-chunk extraction via
    ``self._process_single_content``, entity merging via ``_merge_nodes``, and
    relationship merging via ``_merge_edges``. After each stage ``callback``
    (if given) is called with a ``msg`` keyword describing progress.

    Args:
        doc_id: Document identifier, paired with each chunk text.
        chunks: Chunk texts; each is truncated to 80% of the model's max length.
        callback: Optional progress reporter accepting ``msg=...``.

    Returns:
        ``(all_entities_data, all_relationships_data)``: the merged records.
    """
    self.callback = callback
    stage_start = trio.current_time()

    # Each extraction task appends a (nodes, edges, token_count) tuple here.
    per_chunk = []
    total = len(chunks)
    async with trio.open_nursery() as nursery:
        for idx, chunk in enumerate(chunks):
            clipped = truncate(chunk, int(self._llm.max_length*0.8))
            # Arguments are handed over positionally so every task keeps its own chunk.
            nursery.start_soon(self._process_single_content, (doc_id, clipped), idx, total, per_chunk)

    nodes_by_name = defaultdict(list)
    edges_by_pair = defaultdict(list)
    tokens_used = 0
    for chunk_nodes, chunk_edges, chunk_tokens in per_chunk:
        for name, found in chunk_nodes.items():
            nodes_by_name[name].extend(found)
        for pair, found in chunk_edges.items():
            # Sorting the endpoints makes (a, b) and (b, a) share one key.
            edges_by_pair[tuple(sorted(pair))].extend(found)
        tokens_used += chunk_tokens

    stage_end = trio.current_time()
    if callback:
        callback(msg = f"Entities and relationships extraction done, {len(nodes_by_name)} nodes, {len(edges_by_pair)} edges, {tokens_used} tokens, {stage_end-stage_start:.2f}s.")
    stage_start = stage_end

    logging.info("Entities merging...")
    all_entities_data = []
    async with trio.open_nursery() as nursery:
        for name, found in nodes_by_name.items():
            nursery.start_soon(self._merge_nodes, name, found, all_entities_data)

    stage_end = trio.current_time()
    if callback:
        callback(msg = f"Entities merging done, {stage_end-stage_start:.2f}s.")
    stage_start = stage_end

    logging.info("Relationships merging...")
    all_relationships_data = []
    async with trio.open_nursery() as nursery:
        for (src, tgt), found in edges_by_pair.items():
            nursery.start_soon(self._merge_edges, src, tgt, found, all_relationships_data)

    stage_end = trio.current_time()
    if callback:
        callback(msg = f"Relationships merging done, {stage_end-stage_start:.2f}s.")

    if not (all_entities_data or all_relationships_data):
        logging.warning(
            "Didn't extract any entities and relationships, maybe your LLM is not working"
        )
    if not all_entities_data:
        logging.warning("Didn't extract any entities")
    if not all_relationships_data:
        logging.warning("Didn't extract any relationships")

    return all_entities_data, all_relationships_data
|
|
|
|
|
2025-03-03 18:59:49 +08:00
|
|
|
async def _merge_nodes(self, entity_name: str, entities: list[dict], all_relationships_data):
|
2025-01-22 19:43:14 +08:00
|
|
|
if not entities:
|
|
|
|
return
|
|
|
|
entity_type = sorted(
|
|
|
|
Counter(
|
2025-03-26 15:34:42 +08:00
|
|
|
[dp["entity_type"] for dp in entities]
|
2025-01-22 19:43:14 +08:00
|
|
|
).items(),
|
|
|
|
key=lambda x: x[1],
|
|
|
|
reverse=True,
|
|
|
|
)[0][0]
|
|
|
|
description = GRAPH_FIELD_SEP.join(
|
2025-03-26 15:34:42 +08:00
|
|
|
sorted(set([dp["description"] for dp in entities]))
|
2025-01-22 19:43:14 +08:00
|
|
|
)
|
|
|
|
already_source_ids = flat_uniq_list(entities, "source_id")
|
2025-03-03 18:59:49 +08:00
|
|
|
description = await self._handle_entity_relation_summary(entity_name, description)
|
|
|
|
node_data = dict(
|
|
|
|
entity_type=entity_type,
|
|
|
|
description=description,
|
|
|
|
source_id=already_source_ids,
|
|
|
|
)
|
|
|
|
node_data["entity_name"] = entity_name
|
|
|
|
all_relationships_data.append(node_data)
|
2025-01-22 19:43:14 +08:00
|
|
|
|
2025-03-03 18:59:49 +08:00
|
|
|
async def _merge_edges(
|
2025-01-22 19:43:14 +08:00
|
|
|
self,
|
|
|
|
src_id: str,
|
|
|
|
tgt_id: str,
|
2025-03-03 18:59:49 +08:00
|
|
|
edges_data: list[dict],
|
2025-03-05 14:37:51 +08:00
|
|
|
all_relationships_data=None
|
2025-01-22 19:43:14 +08:00
|
|
|
):
|
|
|
|
if not edges_data:
|
|
|
|
return
|
2025-03-26 15:34:42 +08:00
|
|
|
weight = sum([edge["weight"] for edge in edges_data])
|
|
|
|
description = GRAPH_FIELD_SEP.join(sorted(set([edge["description"] for edge in edges_data])))
|
|
|
|
description = await self._handle_entity_relation_summary(f"{src_id} -> {tgt_id}", description)
|
|
|
|
keywords = flat_uniq_list(edges_data, "keywords")
|
|
|
|
source_id = flat_uniq_list(edges_data, "source_id")
|
2025-01-22 19:43:14 +08:00
|
|
|
edge_data = dict(
|
|
|
|
src_id=src_id,
|
|
|
|
tgt_id=tgt_id,
|
|
|
|
description=description,
|
|
|
|
keywords=keywords,
|
|
|
|
weight=weight,
|
|
|
|
source_id=source_id
|
|
|
|
)
|
2025-03-26 15:34:42 +08:00
|
|
|
all_relationships_data.append(edge_data)
|
|
|
|
|
|
|
|
async def _merge_graph_nodes(self, graph: nx.Graph, nodes: list[str], change: GraphChange):
    """Collapse ``nodes[1:]` into ``nodes[0]`` inside ``graph``.

    For every absorbed node its description is appended to the survivor's
    (joined with ``GRAPH_FIELD_SEP``), the ``source_id`` lists are unioned and
    sorted, and each edge to a neighbour outside ``nodes`` is re-attached to
    the survivor, merging with an existing survivor edge when there is one.
    Edges between members of ``nodes`` disappear with the removed nodes.
    Affected nodes and edges are recorded in ``change``.

    Args:
        graph: Graph modified in place.
        nodes: Names of nodes to merge; the first one survives. Nothing
            happens when fewer than two names are given.
        change: Accumulator for added/updated and removed nodes and edges.
    """
    if len(nodes) <= 1:
        return
    change.added_updated_nodes.add(nodes[0])
    change.removed_nodes.update(nodes[1:])
    nodes_set = set(nodes)
    node0_attrs = graph.nodes[nodes[0]]
    node0_neighbors = set(graph.neighbors(nodes[0]))
    for node1 in nodes[1:]:
        # Merge two nodes, keep "entity_name", "entity_type", "page_rank" unchanged.
        node1_attrs = graph.nodes[node1]
        node0_attrs["description"] += f"{GRAPH_FIELD_SEP}{node1_attrs['description']}"
        node0_attrs["source_id"] = sorted(set(node0_attrs["source_id"] + node1_attrs["source_id"]))
        for neighbor in graph.neighbors(node1):
            change.removed_edges.add(get_from_to(node1, neighbor))
            if neighbor not in nodes_set:
                edge1_attrs = graph.get_edge_data(node1, neighbor)
                if neighbor in node0_neighbors:
                    # Merge two edges
                    change.added_updated_edges.add(get_from_to(nodes[0], neighbor))
                    edge0_attrs = graph.get_edge_data(nodes[0], neighbor)
                    edge0_attrs["weight"] += edge1_attrs["weight"]
                    edge0_attrs["description"] += f"{GRAPH_FIELD_SEP}{edge1_attrs['description']}"
                    for attr in ["keywords", "source_id"]:
                        edge0_attrs[attr] = sorted(set(edge0_attrs[attr] + edge1_attrs[attr]))
                    edge0_attrs["description"] = await self._handle_entity_relation_summary(f"({nodes[0]}, {neighbor})", edge0_attrs["description"])
                    graph.add_edge(nodes[0], neighbor, **edge0_attrs)
                else:
                    # NOTE(review): this new survivor edge is not recorded in
                    # change.added_updated_edges -- confirm whether consumers of
                    # `change` need it.
                    graph.add_edge(nodes[0], neighbor, **edge1_attrs)
                    # Keep the neighbour set current: without this, a later
                    # absorbed node sharing this neighbour would overwrite the
                    # edge just created instead of merging into it.
                    node0_neighbors.add(neighbor)
        graph.remove_node(node1)
    node0_attrs["description"] = await self._handle_entity_relation_summary(nodes[0], node0_attrs["description"])
    graph.nodes[nodes[0]].update(node0_attrs)
|
2025-01-22 19:43:14 +08:00
|
|
|
|
2025-03-03 18:59:49 +08:00
|
|
|
async def _handle_entity_relation_summary(
        self,
        entity_or_relation_name: str,
        description: str
) -> str:
    """Condense a merged description when it contains many parts.

    The description is truncated to 512 tokens and split on
    ``GRAPH_FIELD_SEP``. With 12 or fewer parts the truncated text is returned
    as is; otherwise the parts are sent to the LLM with
    ``SUMMARIZE_DESCRIPTIONS_PROMPT`` and the summary is returned.

    Args:
        entity_or_relation_name: Name inserted into the prompt and the log line.
        description: Descriptions joined with ``GRAPH_FIELD_SEP``.

    Returns:
        The truncated description or its LLM summary. If the LLM call yields
        an empty answer, the truncated description is returned instead.
    """
    summary_max_tokens = 512
    use_description = truncate(description, summary_max_tokens)
    # A trailing comma here used to wrap the list in a 1-tuple, so the length
    # check below always passed and summarisation never ran.
    description_list = use_description.split(GRAPH_FIELD_SEP)
    if len(description_list) <= 12:
        return use_description
    prompt_template = SUMMARIZE_DESCRIPTIONS_PROMPT
    context_base = dict(
        entity_name=entity_or_relation_name,
        description_list=description_list,
        language=self._language,
    )
    use_prompt = prompt_template.format(**context_base)
    logging.info(f"Trigger summary: {entity_or_relation_name}")
    async with chat_limiter:
        # _chat is blocking, so it runs in a worker thread.
        summary = await trio.to_thread.run_sync(lambda: self._chat(use_prompt, [{"role": "user", "content": "Output: "}], {"temperature": 0.8}))
    # _chat returns "" on an LLM error; keep the original text rather than erase it.
    return summary or use_description
|