mirror of
https://github.com/getzep/graphiti.git
synced 2025-12-28 07:33:30 +00:00
Add episode latency improvements (#214)
* reformat prompts * update prompts * update * update * update * update * update * mypy
This commit is contained in:
parent
eba9f40ca2
commit
a8a73ec38b
@ -23,8 +23,6 @@ from dotenv import load_dotenv
|
||||
from transcript_parser import parse_podcast_messages
|
||||
|
||||
from graphiti_core import Graphiti
|
||||
from graphiti_core.nodes import EpisodeType
|
||||
from graphiti_core.utils.bulk_utils import RawEpisode
|
||||
from graphiti_core.utils.maintenance.graph_data_operations import clear_data
|
||||
|
||||
load_dotenv()
|
||||
@ -55,52 +53,21 @@ def setup_logging():
|
||||
return logger
|
||||
|
||||
|
||||
async def main(use_bulk: bool = True):
|
||||
async def main():
|
||||
setup_logging()
|
||||
client = Graphiti(neo4j_uri, neo4j_user, neo4j_password)
|
||||
await clear_data(client.driver)
|
||||
await client.build_indices_and_constraints()
|
||||
messages = parse_podcast_messages()
|
||||
|
||||
if not use_bulk:
|
||||
for i, message in enumerate(messages[3:14]):
|
||||
await client.add_episode(
|
||||
name=f'Message {i}',
|
||||
episode_body=f'{message.speaker_name} ({message.role}): {message.content}',
|
||||
reference_time=message.actual_timestamp,
|
||||
source_description='Podcast Transcript',
|
||||
group_id='podcast',
|
||||
)
|
||||
|
||||
# build communities
|
||||
await client.build_communities()
|
||||
|
||||
# add additional messages to update communities
|
||||
for i, message in enumerate(messages[14:20]):
|
||||
await client.add_episode(
|
||||
name=f'Message {i}',
|
||||
episode_body=f'{message.speaker_name} ({message.role}): {message.content}',
|
||||
reference_time=message.actual_timestamp,
|
||||
source_description='Podcast Transcript',
|
||||
group_id='podcast',
|
||||
update_communities=True,
|
||||
)
|
||||
|
||||
return
|
||||
|
||||
episodes: list[RawEpisode] = [
|
||||
RawEpisode(
|
||||
for i, message in enumerate(messages[3:14]):
|
||||
await client.add_episode(
|
||||
name=f'Message {i}',
|
||||
content=f'{message.speaker_name} ({message.role}): {message.content}',
|
||||
source=EpisodeType.message,
|
||||
source_description='Podcast Transcript',
|
||||
episode_body=f'{message.speaker_name} ({message.role}): {message.content}',
|
||||
reference_time=message.actual_timestamp,
|
||||
source_description='Podcast Transcript',
|
||||
group_id='podcast',
|
||||
)
|
||||
for i, message in enumerate(messages[3:20])
|
||||
]
|
||||
|
||||
await client.add_episode_bulk(episodes, '')
|
||||
|
||||
|
||||
asyncio.run(main(False))
|
||||
asyncio.run(main())
|
||||
|
||||
@ -353,7 +353,13 @@ class Graphiti:
|
||||
logger.debug(f'Extracted nodes: {[(n.name, n.uuid) for n in extracted_nodes]}')
|
||||
|
||||
(mentioned_nodes, uuid_map), extracted_edges = await asyncio.gather(
|
||||
resolve_extracted_nodes(self.llm_client, extracted_nodes, existing_nodes_lists),
|
||||
resolve_extracted_nodes(
|
||||
self.llm_client,
|
||||
extracted_nodes,
|
||||
existing_nodes_lists,
|
||||
episode,
|
||||
previous_episodes,
|
||||
),
|
||||
extract_edges(
|
||||
self.llm_client, episode, extracted_nodes, previous_episodes, group_id
|
||||
),
|
||||
|
||||
@ -21,103 +21,16 @@ from .models import Message, PromptFunction, PromptVersion
|
||||
|
||||
|
||||
class Prompt(Protocol):
|
||||
v1: PromptVersion
|
||||
v2: PromptVersion
|
||||
v3: PromptVersion
|
||||
edge: PromptVersion
|
||||
edge_list: PromptVersion
|
||||
|
||||
|
||||
class Versions(TypedDict):
|
||||
v1: PromptFunction
|
||||
v2: PromptFunction
|
||||
v3: PromptFunction
|
||||
edge: PromptFunction
|
||||
edge_list: PromptFunction
|
||||
|
||||
|
||||
def v1(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
content='You are a helpful assistant that de-duplicates relationship from edge lists.',
|
||||
),
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Given the following context, deduplicate facts from a list of new facts given a list of existing edges:
|
||||
|
||||
Existing Edges:
|
||||
{json.dumps(context['existing_edges'], indent=2)}
|
||||
|
||||
New Edges:
|
||||
{json.dumps(context['extracted_edges'], indent=2)}
|
||||
|
||||
Task:
|
||||
If any edge in New Edges is a duplicate of an edge in Existing Edges, add their uuids to the output list.
|
||||
When finding duplicates edges, synthesize their facts into a short new fact.
|
||||
|
||||
Guidelines:
|
||||
1. identical or near identical facts are duplicates
|
||||
2. Facts are also duplicates if they are represented by similar sentences
|
||||
3. Facts will often discuss the same or similar relation between identical entities
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"duplicates": [
|
||||
{{
|
||||
"uuid": "uuid of the new node like 5d643020624c42fa9de13f97b1b3fa39",
|
||||
"duplicate_of": "uuid of the existing node",
|
||||
"fact": "one sentence description of the fact"
|
||||
}}
|
||||
]
|
||||
}}
|
||||
""",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def v2(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
content='You are a helpful assistant that de-duplicates relationship from edge lists.',
|
||||
),
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Given the following context, deduplicate edges from a list of new edges given a list of existing edges:
|
||||
|
||||
Existing Edges:
|
||||
{json.dumps(context['existing_edges'], indent=2)}
|
||||
|
||||
New Edges:
|
||||
{json.dumps(context['extracted_edges'], indent=2)}
|
||||
|
||||
Task:
|
||||
1. start with the list of edges from New Edges
|
||||
2. If any edge in New Edges is a duplicate of an edge in Existing Edges, replace the new edge with the existing
|
||||
edge in the list
|
||||
3. Respond with the resulting list of edges
|
||||
|
||||
Guidelines:
|
||||
1. Use both the triplet name and fact of edges to determine if they are duplicates,
|
||||
duplicate edges may have different names meaning the same thing and slight variations in the facts.
|
||||
2. If you encounter facts that are semantically equivalent or very similar, keep the original edge
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"new_edges": [
|
||||
{{
|
||||
"triplet": "source_node_name-edge_name-target_node_name",
|
||||
"fact": "one sentence description of the fact"
|
||||
}}
|
||||
]
|
||||
}}
|
||||
""",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def v3(context: dict[str, Any]) -> list[Message]:
|
||||
def edge(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
@ -128,11 +41,14 @@ def v3(context: dict[str, Any]) -> list[Message]:
|
||||
content=f"""
|
||||
Given the following context, determine whether the New Edge represents any of the edges in the list of Existing Edges.
|
||||
|
||||
Existing Edges:
|
||||
<EXISTING EDGES>
|
||||
{json.dumps(context['related_edges'], indent=2)}
|
||||
</EXISTING EDGES>
|
||||
|
||||
New Edge:
|
||||
<NEW EDGE>
|
||||
{json.dumps(context['extracted_edges'], indent=2)}
|
||||
</NEW EDGE>
|
||||
|
||||
Task:
|
||||
1. If the New Edges represents the same factual information as any edge in Existing Edges, return 'is_duplicate: true' in the
|
||||
response. Otherwise, return 'is_duplicate: false'
|
||||
@ -189,4 +105,4 @@ def edge_list(context: dict[str, Any]) -> list[Message]:
|
||||
]
|
||||
|
||||
|
||||
versions: Versions = {'v1': v1, 'v2': v2, 'v3': v3, 'edge_list': edge_list}
|
||||
versions: Versions = {'edge': edge, 'edge_list': edge_list}
|
||||
|
||||
@ -21,20 +21,16 @@ from .models import Message, PromptFunction, PromptVersion
|
||||
|
||||
|
||||
class Prompt(Protocol):
|
||||
v1: PromptVersion
|
||||
v2: PromptVersion
|
||||
v3: PromptVersion
|
||||
node: PromptVersion
|
||||
node_list: PromptVersion
|
||||
|
||||
|
||||
class Versions(TypedDict):
|
||||
v1: PromptFunction
|
||||
v2: PromptFunction
|
||||
v3: PromptFunction
|
||||
node: PromptFunction
|
||||
node_list: PromptFunction
|
||||
|
||||
|
||||
def v1(context: dict[str, Any]) -> list[Message]:
|
||||
def node(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
@ -43,106 +39,28 @@ def v1(context: dict[str, Any]) -> list[Message]:
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Given the following context, deduplicate nodes from a list of new nodes given a list of existing nodes:
|
||||
<PREVIOUS MESSAGES>
|
||||
{json.dumps([ep for ep in context['previous_episodes']], indent=2)}
|
||||
</PREVIOUS MESSAGES>
|
||||
<CURRENT MESSAGE>
|
||||
{context["episode_content"]}
|
||||
</CURRENT MESSAGE>
|
||||
|
||||
Existing Nodes:
|
||||
<EXISTING NODES>
|
||||
{json.dumps(context['existing_nodes'], indent=2)}
|
||||
|
||||
New Nodes:
|
||||
{json.dumps(context['extracted_nodes'], indent=2)}
|
||||
</EXISTING NODES>
|
||||
|
||||
Task:
|
||||
1. start with the list of nodes from New Nodes
|
||||
2. If any node in New Nodes is a duplicate of a node in Existing Nodes, replace the new node with the existing
|
||||
node in the list
|
||||
3. when deduplicating nodes, synthesize their summaries into a short new summary that contains the relevant information
|
||||
of the summaries of the new and existing nodes
|
||||
4. Respond with the resulting list of nodes
|
||||
Given the above EXISTING NODES, MESSAGE, and PREVIOUS MESSAGES. Determine if the NEW NODE extracted from the conversation
|
||||
is a duplicate entity of one of the EXISTING NODES.
|
||||
|
||||
Guidelines:
|
||||
1. Use both the name and summary of nodes to determine if they are duplicates,
|
||||
duplicate nodes may have different names
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"new_nodes": [
|
||||
{{
|
||||
"name": "Unique identifier for the node",
|
||||
"summary": "Brief summary of the node's role or significance"
|
||||
}}
|
||||
]
|
||||
}}
|
||||
""",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def v2(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
content='You are a helpful assistant that de-duplicates nodes from node lists.',
|
||||
),
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Given the following context, deduplicate nodes from a list of new nodes given a list of existing nodes:
|
||||
|
||||
Existing Nodes:
|
||||
{json.dumps(context['existing_nodes'], indent=2)}
|
||||
|
||||
New Nodes:
|
||||
{json.dumps(context['extracted_nodes'], indent=2)}
|
||||
Important:
|
||||
If a node in the new nodes is describing the same entity as a node in the existing nodes, mark it as a duplicate!!!
|
||||
Task:
|
||||
If any node in New Nodes is a duplicate of a node in Existing Nodes, add their uuids to the output list
|
||||
When finding duplicates nodes, synthesize their summaries into a short new summary that contains the
|
||||
relevant information of the summaries of the new and existing nodes.
|
||||
|
||||
Guidelines:
|
||||
1. Use both the name and summary of nodes to determine if they are duplicates,
|
||||
duplicate nodes may have different names
|
||||
2. In the output, uuid should always be the uuid of the New Node that is a duplicate. duplicate_of should be
|
||||
the uuid of the Existing Node.
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"duplicates": [
|
||||
{{
|
||||
"uuid": "uuid of the new node like 5d643020624c42fa9de13f97b1b3fa39",
|
||||
"duplicate_of": "uuid of the existing node",
|
||||
"summary": "Brief summary of the node's role or significance. Takes information from the new and existing nodes"
|
||||
}}
|
||||
]
|
||||
}}
|
||||
""",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def v3(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
content='You are a helpful assistant that de-duplicates nodes from node lists.',
|
||||
),
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Given the following context, determine whether the New Node represents any of the entities in the list of Existing Nodes.
|
||||
|
||||
Existing Nodes:
|
||||
{json.dumps(context['existing_nodes'], indent=2)}
|
||||
|
||||
New Node:
|
||||
<NEW NODE>
|
||||
{json.dumps(context['extracted_nodes'], indent=2)}
|
||||
</NEW NODE>
|
||||
Task:
|
||||
1. If the New Node represents the same entity as any node in Existing Nodes, return 'is_duplicate: true' in the
|
||||
response. Otherwise, return 'is_duplicate: false'
|
||||
2. If is_duplicate is true, also return the uuid of the existing node in the response
|
||||
3. If is_duplicate is true, return a summary that synthesizes the information in the New Node summary and the
|
||||
summary of the Existing Node it is a duplicate of.
|
||||
3. If is_duplicate is true, return a name for the node that is the most complete full name.
|
||||
|
||||
Guidelines:
|
||||
1. Use both the name and summary of nodes to determine if the entities are duplicates,
|
||||
@ -152,7 +70,7 @@ def v3(context: dict[str, Any]) -> list[Message]:
|
||||
{{
|
||||
"is_duplicate": true or false,
|
||||
"uuid": "uuid of the existing node like 5d643020624c42fa9de13f97b1b3fa39 or null",
|
||||
"summary": "Brief summary of the node's role or significance. Takes information from the new and existing node"
|
||||
"name": "Updated name of the new node (use the best name between the new node's name, an existing duplicate name, or a combination of both)"
|
||||
}}
|
||||
""",
|
||||
),
|
||||
@ -196,4 +114,4 @@ def node_list(context: dict[str, Any]) -> list[Message]:
|
||||
]
|
||||
|
||||
|
||||
versions: Versions = {'v1': v1, 'v2': v2, 'v3': v3, 'node_list': node_list}
|
||||
versions: Versions = {'node': node, 'node_list': node_list}
|
||||
|
||||
@ -36,12 +36,19 @@ def v1(context: dict[str, Any]) -> list[Message]:
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Edge:
|
||||
Fact: {context['edge_fact']}
|
||||
|
||||
Current Episode: {context['current_episode']}
|
||||
Previous Episodes: {context['previous_episodes']}
|
||||
Reference Timestamp: {context['reference_timestamp']}
|
||||
<PREVIOUS MESSAGES>
|
||||
{context['previous_episodes']}
|
||||
</PREVIOUS MESSAGES>
|
||||
<CURRENT MESSAGE>
|
||||
{context["current_episode"]}
|
||||
</CURRENT MESSAGE>
|
||||
<REFERENCE TIMESTAMP>
|
||||
{context['reference_timestamp']}
|
||||
</REFERENCE TIMESTAMP>
|
||||
|
||||
<FACT>
|
||||
{context['edge_fact']}
|
||||
</FACT>
|
||||
|
||||
IMPORTANT: Only extract time information if it is part of the provided fact. Otherwise ignore the time mentioned. Make sure to do your best to determine the dates if only the relative time is mentioned. (eg 10 years ago, 2 mins ago) based on the provided reference timestamp
|
||||
If the relationship is not of spanning nature, but you are still able to determine the dates, set the valid_at only.
|
||||
@ -60,7 +67,7 @@ def v1(context: dict[str, Any]) -> list[Message]:
|
||||
5. Do not infer dates from related events. Only use dates that are directly stated to establish or change the relationship.
|
||||
6. For relative time mentions directly related to the relationship, calculate the actual datetime based on the reference timestamp.
|
||||
7. If only a date is mentioned without a specific time, use 00:00:00 (midnight) for that date.
|
||||
8. If only a year is mentioned, use January 1st of that year at 00:00:00.
|
||||
8. If only year is mentioned, use January 1st of that year at 00:00:00.
|
||||
9. Always include the time zone offset (use Z for UTC if no specific time zone is mentioned).
|
||||
Respond with a JSON object:
|
||||
{{
|
||||
|
||||
@ -21,121 +21,58 @@ from .models import Message, PromptFunction, PromptVersion
|
||||
|
||||
|
||||
class Prompt(Protocol):
|
||||
v1: PromptVersion
|
||||
v2: PromptVersion
|
||||
edge: PromptVersion
|
||||
reflexion: PromptVersion
|
||||
|
||||
|
||||
class Versions(TypedDict):
|
||||
v1: PromptFunction
|
||||
v2: PromptFunction
|
||||
edge: PromptFunction
|
||||
reflexion: PromptFunction
|
||||
|
||||
|
||||
def v1(context: dict[str, Any]) -> list[Message]:
|
||||
def edge(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
content='You are a helpful assistant that extracts graph edges from provided context.',
|
||||
content='You are an expert fact extractor that extracts fact triples from text.',
|
||||
),
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Given the following context, extract new semantic edges (relationships) that need to be added to the knowledge graph:
|
||||
|
||||
Current Graph Structure:
|
||||
{context['relevant_schema']}
|
||||
|
||||
New Nodes:
|
||||
{json.dumps(context['new_nodes'], indent=2)}
|
||||
|
||||
New Episode:
|
||||
Content: {context['episode_content']}
|
||||
Timestamp: {context['episode_timestamp']}
|
||||
|
||||
Previous Episodes:
|
||||
{json.dumps([ep['content'] for ep in context['previous_episodes']], indent=2)}
|
||||
|
||||
Extract new semantic edges based on the content of the current episode, considering the existing graph structure, new nodes, and context from previous episodes.
|
||||
|
||||
Guidelines:
|
||||
1. Create edges only between semantic nodes (not episodic nodes like messages).
|
||||
2. Each edge should represent a clear relationship between two semantic nodes.
|
||||
3. The relation_type should be a concise, all-caps description of the relationship (e.g., LOVES, IS_FRIENDS_WITH, WORKS_FOR).
|
||||
4. Provide a more detailed fact describing the relationship.
|
||||
5. If a relationship seems to update an existing one, create a new edge with the updated information.
|
||||
6. Consider temporal aspects of relationships when relevant.
|
||||
7. Do not create edges involving episodic nodes (like Message 1 or Message 2).
|
||||
8. Use existing nodes from the current graph structure when appropriate.
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"new_edges": [
|
||||
{{
|
||||
"relation_type": "RELATION_TYPE_IN_CAPS",
|
||||
"source_node": "Name of the source semantic node",
|
||||
"target_node": "Name of the target semantic node",
|
||||
"fact": "Detailed description of the relationship",
|
||||
"valid_at": "YYYY-MM-DDTHH:MM:SSZ or null if not explicitly mentioned",
|
||||
"invalid_at": "YYYY-MM-DDTHH:MM:SSZ or null if ongoing or not explicitly mentioned"
|
||||
}}
|
||||
]
|
||||
}}
|
||||
|
||||
If no new edges need to be added, return an empty list for "new_edges".
|
||||
""",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def v2(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
content='You are a helpful assistant that extracts graph edges from provided context.',
|
||||
),
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Given the following context, extract edges (relationships) that need to be added to the knowledge graph:
|
||||
Nodes:
|
||||
{json.dumps(context['nodes'], indent=2)}
|
||||
|
||||
<PREVIOUS MESSAGES>
|
||||
{json.dumps([ep for ep in context['previous_episodes']], indent=2)}
|
||||
</PREVIOUS MESSAGES>
|
||||
<CURRENT MESSAGE>
|
||||
{context["episode_content"]}
|
||||
</CURRENT MESSAGE>
|
||||
|
||||
|
||||
Episodes:
|
||||
{json.dumps([ep['content'] for ep in context['previous_episodes']], indent=2)}
|
||||
{context['episode_content']} <-- New Episode
|
||||
|
||||
|
||||
Extract entity edges based on the content of the current episode, the given nodes, and context from previous episodes.
|
||||
<ENTITIES>
|
||||
{context["nodes"]}
|
||||
</ENTITIES>
|
||||
|
||||
{context['custom_prompt']}
|
||||
|
||||
Given the above MESSAGES and ENTITIES, extract all facts pertaining to the listed ENTITIES from the CURRENT MESSAGE.
|
||||
|
||||
|
||||
Guidelines:
|
||||
1. Create edges only between the provided nodes.
|
||||
2. Each edge should represent a clear relationship between two DISTINCT nodes.
|
||||
3. The relation_type should be a concise, all-caps description of the relationship (e.g., LOVES, IS_FRIENDS_WITH, WORKS_FOR).
|
||||
4. Provide a more detailed fact describing the relationship.
|
||||
5. The fact should include any specific relevant information, including numeric information
|
||||
6. Consider temporal aspects of relationships when relevant.
|
||||
7. Avoid using the same node as the source and target of a relationship
|
||||
1. Extract facts only between the provided entities.
|
||||
2. Each fact should represent a clear relationship between two DISTINCT nodes.
|
||||
3. The relation_type should be a concise, all-caps description of the fact (e.g., LOVES, IS_FRIENDS_WITH, WORKS_FOR).
|
||||
4. Provide a more detailed fact containing all relevant information.
|
||||
5. Consider temporal aspects of relationships when relevant.
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"edges": [
|
||||
{{
|
||||
"relation_type": "RELATION_TYPE_IN_CAPS",
|
||||
"source_node_uuid": "uuid of the source entity node",
|
||||
"target_node_uuid": "uuid of the target entity node",
|
||||
"fact": "brief description of the relationship",
|
||||
"valid_at": "YYYY-MM-DDTHH:MM:SSZ or null if not explicitly mentioned",
|
||||
"invalid_at": "YYYY-MM-DDTHH:MM:SSZ or null if ongoing or not explicitly mentioned"
|
||||
"source_entity_name": "name of the source entity",
|
||||
"target_entity_name": "name of the target entity",
|
||||
"fact": "extracted factual information",
|
||||
}}
|
||||
]
|
||||
}}
|
||||
|
||||
If no edges need to be added, return an empty list for "edges".
|
||||
""",
|
||||
),
|
||||
]
|
||||
@ -145,22 +82,24 @@ def reflexion(context: dict[str, Any]) -> list[Message]:
|
||||
sys_prompt = """You are an AI assistant that determines which facts have not been extracted from the given context"""
|
||||
|
||||
user_prompt = f"""
|
||||
Given the following conversation, current message, list of extracted entities, and list of extracted facts;
|
||||
determine if any facts haven't been extracted:
|
||||
|
||||
<CONVERSATION>:
|
||||
{json.dumps([ep['content'] for ep in context['previous_episodes']], indent=2)}
|
||||
</CONVERSATION>
|
||||
<PREVIOUS MESSAGES>
|
||||
{json.dumps([ep for ep in context['previous_episodes']], indent=2)}
|
||||
</PREVIOUS MESSAGES>
|
||||
<CURRENT MESSAGE>
|
||||
{context["episode_content"]}
|
||||
</CURRENT MESSAGE>
|
||||
|
||||
<EXTRACTED ENTITIES>
|
||||
{context["nodes"]}
|
||||
</EXTRACTED ENTITIES>
|
||||
|
||||
<EXTRACTED FACTS>
|
||||
{context["extracted_facts"]}
|
||||
</EXTRACTED FACTS>
|
||||
|
||||
Given the above MESSAGES, list of EXTRACTED ENTITIES entities, and list of EXTRACTED FACTS;
|
||||
determine if any facts haven't been extracted:
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"missing_facts": [ "facts that weren't extracted", ...]
|
||||
@ -172,4 +111,4 @@ Respond with a JSON object in the following format:
|
||||
]
|
||||
|
||||
|
||||
versions: Versions = {'v1': v1, 'v2': v2, 'reflexion': reflexion}
|
||||
versions: Versions = {'edge': edge, 'reflexion': reflexion}
|
||||
|
||||
@ -21,93 +21,45 @@ from .models import Message, PromptFunction, PromptVersion
|
||||
|
||||
|
||||
class Prompt(Protocol):
|
||||
v1: PromptVersion
|
||||
v2: PromptVersion
|
||||
extract_message: PromptVersion
|
||||
extract_json: PromptVersion
|
||||
extract_text: PromptVersion
|
||||
reflexion: PromptVersion
|
||||
|
||||
|
||||
class Versions(TypedDict):
|
||||
v1: PromptFunction
|
||||
v2: PromptFunction
|
||||
extract_message: PromptFunction
|
||||
extract_json: PromptFunction
|
||||
extract_text: PromptFunction
|
||||
reflexion: PromptFunction
|
||||
|
||||
|
||||
def v1(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
content='You are a helpful assistant that extracts graph nodes from provided context.',
|
||||
),
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
Given the following context, extract new entity nodes that need to be added to the knowledge graph:
|
||||
|
||||
Previous Episodes:
|
||||
{json.dumps([ep['content'] for ep in context['previous_episodes']], indent=2)}
|
||||
|
||||
New Episode:
|
||||
Content: {context["episode_content"]}
|
||||
|
||||
Extract new entity nodes based on the content of the current episode, while considering the context from previous episodes.
|
||||
|
||||
Guidelines:
|
||||
1. Focus on entities, concepts, or actors that are central to the current episode.
|
||||
2. Avoid creating nodes for relationships or actions (these will be handled as edges later).
|
||||
3. Provide a brief but informative summary for each node.
|
||||
4. Be as explicit as possible in your node names, using full names and avoiding abbreviations.
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"new_nodes": [
|
||||
{{
|
||||
"name": "Unique identifier for the node",
|
||||
"labels": ["Entity", "OptionalAdditionalLabel"],
|
||||
"summary": "Brief summary of the node's role or significance"
|
||||
}}
|
||||
]
|
||||
}}
|
||||
|
||||
If no new nodes need to be added, return an empty list for "new_nodes".
|
||||
""",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def v2(context: dict[str, Any]) -> list[Message]:
|
||||
sys_prompt = """You are an AI assistant that extracts entity nodes from conversational text. Your primary task is to identify and extract the speaker and other significant entities mentioned in the conversation."""
|
||||
def extract_message(context: dict[str, Any]) -> list[Message]:
|
||||
sys_prompt = """You are an AI assistant that extracts entity nodes from conversational messages. Your primary task is to identify and extract the speaker and other significant entities mentioned in the conversation."""
|
||||
|
||||
user_prompt = f"""
|
||||
Given the following conversation, extract entity nodes from the CURRENT MESSAGE that are explicitly or implicitly mentioned:
|
||||
|
||||
Conversation:
|
||||
{json.dumps([ep['content'] for ep in context['previous_episodes']], indent=2)}
|
||||
<PREVIOUS MESSAGES>
|
||||
{json.dumps([ep for ep in context['previous_episodes']], indent=2)}
|
||||
</PREVIOUS MESSAGES>
|
||||
<CURRENT MESSAGE>
|
||||
{context["episode_content"]}
|
||||
</CURRENT MESSAGE>
|
||||
|
||||
{context['custom_prompt']}
|
||||
|
||||
Given the above conversation, extract entity nodes from the CURRENT MESSAGE that are explicitly or implicitly mentioned:
|
||||
|
||||
Guidelines:
|
||||
1. ALWAYS extract the speaker/actor as the first node. The speaker is the part before the colon in each line of dialogue.
|
||||
2. Extract other significant entities, concepts, or actors mentioned in the conversation.
|
||||
3. Provide concise but informative summaries for each extracted node.
|
||||
4. Avoid creating nodes for relationships or actions.
|
||||
5. Avoid creating nodes for temporal information like dates, times or years (these will be added to edges later).
|
||||
6. Be as explicit as possible in your node names, using full names and avoiding abbreviations.
|
||||
2. Extract other significant entities, concepts, or actors mentioned in the CURRENT MESSAGE.
|
||||
3. DO NOT create nodes for relationships or actions.
|
||||
4. DO NOT create nodes for temporal information like dates, times or years (these will be added to edges later).
|
||||
5. Be as explicit as possible in your node names, using full names.
|
||||
6. DO NOT extract entities mentioned only in PREVIOUS MESSAGES, those messages are only to provide context.
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"extracted_nodes": [
|
||||
{{
|
||||
"name": "Unique identifier for the node (use the speaker's name for speaker nodes)",
|
||||
"labels": ["Entity", "Speaker" for speaker nodes, "OptionalAdditionalLabel"],
|
||||
"summary": "Brief summary of the node's role or significance"
|
||||
}}
|
||||
]
|
||||
"extracted_node_names": ["Name of the extracted entity", ...],
|
||||
}}
|
||||
"""
|
||||
return [
|
||||
@ -117,33 +69,28 @@ Respond with a JSON object in the following format:
|
||||
|
||||
|
||||
def extract_json(context: dict[str, Any]) -> list[Message]:
|
||||
sys_prompt = """You are an AI assistant that extracts entity nodes from conversational text.
|
||||
sys_prompt = """You are an AI assistant that extracts entity nodes from JSON.
|
||||
Your primary task is to identify and extract relevant entities from JSON files"""
|
||||
|
||||
user_prompt = f"""
|
||||
Given the following source description, extract relevant entity nodes from the provided JSON:
|
||||
|
||||
Source Description:
|
||||
<SOURCE DESCRIPTION>:
|
||||
{context["source_description"]}
|
||||
|
||||
JSON:
|
||||
</SOURCE DESCRIPTION>
|
||||
<JSON>
|
||||
{context["episode_content"]}
|
||||
</JSON>
|
||||
|
||||
{context['custom_prompt']}
|
||||
|
||||
Given the above source description and JSON, extract relevant entity nodes from the provided JSON:
|
||||
|
||||
Guidelines:
|
||||
1. Always try to extract an entities that the JSON represents. This will often be something like a "name" or "user field
|
||||
2. Do NOT extract any properties that contain dates
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"extracted_nodes": [
|
||||
{{
|
||||
"name": "Unique identifier for the node (use the speaker's name for speaker nodes)",
|
||||
"labels": ["Entity", "Speaker" for speaker nodes, "OptionalAdditionalLabel"],
|
||||
"summary": "Brief summary of the node's role or significance"
|
||||
}}
|
||||
]
|
||||
"extracted_node_names": ["Name of the extracted entity", ...],
|
||||
}}
|
||||
"""
|
||||
return [
|
||||
@ -153,36 +100,26 @@ Respond with a JSON object in the following format:
|
||||
|
||||
|
||||
def extract_text(context: dict[str, Any]) -> list[Message]:
|
||||
sys_prompt = """You are an AI assistant that extracts entity nodes from conversational text. Your primary task is to identify and extract the speaker and other significant entities mentioned in the conversation."""
|
||||
sys_prompt = """You are an AI assistant that extracts entity nodes from text. Your primary task is to identify and extract the speaker and other significant entities mentioned in the provided text."""
|
||||
|
||||
user_prompt = f"""
|
||||
Given the following conversation, extract entity nodes from the CURRENT MESSAGE that are explicitly or implicitly mentioned:
|
||||
|
||||
<CONVERSATION>
|
||||
{json.dumps([ep['content'] for ep in context['previous_episodes']], indent=2)}
|
||||
</CONVERSATION>
|
||||
<CURRENT MESSAGE>
|
||||
<TEXT>
|
||||
{context["episode_content"]}
|
||||
</CURRENT MESSAGE>
|
||||
</TEXT>
|
||||
|
||||
{context['custom_prompt']}
|
||||
|
||||
Given the following text, extract entity nodes from the TEXT that are explicitly or implicitly mentioned:
|
||||
|
||||
Guidelines:
|
||||
1. Extract significant entities, concepts, or actors mentioned in the conversation.
|
||||
2. Provide concise but informative summaries for each extracted node.
|
||||
3. Avoid creating nodes for relationships or actions.
|
||||
4. Avoid creating nodes for temporal information like dates, times or years (these will be added to edges later).
|
||||
5. Be as explicit as possible in your node names, using full names and avoiding abbreviations.
|
||||
2. Avoid creating nodes for relationships or actions.
|
||||
3. Avoid creating nodes for temporal information like dates, times or years (these will be added to edges later).
|
||||
4. Be as explicit as possible in your node names, using full names and avoiding abbreviations.
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"extracted_nodes": [
|
||||
{{
|
||||
"name": "Unique identifier for the node (use the speaker's name for speaker nodes)",
|
||||
"labels": ["Entity", "OptionalAdditionalLabel"],
|
||||
"summary": "Brief summary of the node's role or significance"
|
||||
}}
|
||||
]
|
||||
"extracted_node_names": ["Name of the extracted entity", ...],
|
||||
}}
|
||||
"""
|
||||
return [
|
||||
@ -195,19 +132,20 @@ def reflexion(context: dict[str, Any]) -> list[Message]:
|
||||
sys_prompt = """You are an AI assistant that determines which entities have not been extracted from the given context"""
|
||||
|
||||
user_prompt = f"""
|
||||
Given the following conversation, current message, and list of extracted entities; determine if any entities haven't been
|
||||
extracted:
|
||||
|
||||
<CONVERSATION>:
|
||||
{json.dumps([ep['content'] for ep in context['previous_episodes']], indent=2)}
|
||||
</CONVERSATION>
|
||||
<PREVIOUS MESSAGES>
|
||||
{json.dumps([ep for ep in context['previous_episodes']], indent=2)}
|
||||
</PREVIOUS MESSAGES>
|
||||
<CURRENT MESSAGE>
|
||||
{context["episode_content"]}
|
||||
</CURRENT MESSAGE>
|
||||
|
||||
<EXTRACTED ENTITIES>
|
||||
{context["extracted_entities"]}
|
||||
</EXTRACTED ENTITIES>
|
||||
|
||||
Given the above previous messages, current message, and list of extracted entities; determine if any entities haven't been
|
||||
extracted:
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"missed_entities": [ "name of entity that wasn't extracted", ...]
|
||||
@ -220,8 +158,7 @@ Respond with a JSON object in the following format:
|
||||
|
||||
|
||||
versions: Versions = {
|
||||
'v1': v1,
|
||||
'v2': v2,
|
||||
'extract_message': extract_message,
|
||||
'extract_json': extract_json,
|
||||
'extract_text': extract_text,
|
||||
'reflexion': reflexion,
|
||||
|
||||
@ -22,11 +22,13 @@ from .models import Message, PromptFunction, PromptVersion
|
||||
|
||||
class Prompt(Protocol):
|
||||
summarize_pair: PromptVersion
|
||||
summarize_context: PromptVersion
|
||||
summary_description: PromptVersion
|
||||
|
||||
|
||||
class Versions(TypedDict):
|
||||
summarize_pair: PromptFunction
|
||||
summarize_context: PromptFunction
|
||||
summary_description: PromptFunction
|
||||
|
||||
|
||||
@ -53,6 +55,39 @@ def summarize_pair(context: dict[str, Any]) -> list[Message]:
|
||||
]
|
||||
|
||||
|
||||
def summarize_context(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
role='system',
|
||||
content='You are a helpful assistant that combines summaries with new conversation context.',
|
||||
),
|
||||
Message(
|
||||
role='user',
|
||||
content=f"""
|
||||
|
||||
<MESSAGES>
|
||||
{json.dumps(context['previous_episodes'], indent=2)}
|
||||
{json.dumps(context['episode_content'], indent=2)}
|
||||
</MESSAGES>
|
||||
|
||||
Given the above MESSAGES and the following ENTITY name, create a summary for the ENTITY. Your summary must only use
|
||||
information from the provided MESSAGES. Your summary should also only contain information relevant to the
|
||||
provided ENTITY.
|
||||
|
||||
<ENTITY>
|
||||
{context['node_name']}
|
||||
</ENTITY>
|
||||
|
||||
|
||||
Respond with a JSON object in the following format:
|
||||
{{
|
||||
"summary": "Entity summary"
|
||||
}}
|
||||
""",
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def summary_description(context: dict[str, Any]) -> list[Message]:
|
||||
return [
|
||||
Message(
|
||||
@ -76,4 +111,8 @@ def summary_description(context: dict[str, Any]) -> list[Message]:
|
||||
]
|
||||
|
||||
|
||||
versions: Versions = {'summarize_pair': summarize_pair, 'summary_description': summary_description}
|
||||
versions: Versions = {
|
||||
'summarize_pair': summarize_pair,
|
||||
'summarize_context': summarize_context,
|
||||
'summary_description': summary_description,
|
||||
}
|
||||
|
||||
@ -78,30 +78,25 @@ async def extract_edges(
|
||||
) -> list[EntityEdge]:
|
||||
start = time()
|
||||
|
||||
node_uuids_by_name_map = {node.name: node.uuid for node in nodes}
|
||||
|
||||
# Prepare context for LLM
|
||||
context = {
|
||||
'episode_content': episode.content,
|
||||
'episode_timestamp': (episode.valid_at.isoformat() if episode.valid_at else None),
|
||||
'nodes': [
|
||||
{'uuid': node.uuid, 'name': node.name, 'summary': node.summary} for node in nodes
|
||||
],
|
||||
'previous_episodes': [
|
||||
{
|
||||
'content': ep.content,
|
||||
'timestamp': ep.valid_at.isoformat() if ep.valid_at else None,
|
||||
}
|
||||
for ep in previous_episodes
|
||||
],
|
||||
'nodes': [node.name for node in nodes],
|
||||
'previous_episodes': [ep.content for ep in previous_episodes],
|
||||
'custom_prompt': '',
|
||||
}
|
||||
|
||||
facts_missed = True
|
||||
reflexion_iterations = 0
|
||||
while facts_missed and reflexion_iterations < MAX_REFLEXION_ITERATIONS:
|
||||
llm_response = await llm_client.generate_response(prompt_library.extract_edges.v2(context))
|
||||
llm_response = await llm_client.generate_response(
|
||||
prompt_library.extract_edges.edge(context)
|
||||
)
|
||||
edges_data = llm_response.get('edges', [])
|
||||
|
||||
context['extracted_facts'] = [edge_data['fact'] for edge_data in edges_data]
|
||||
context['extracted_facts'] = [edge_data.get('fact', '') for edge_data in edges_data]
|
||||
|
||||
reflexion_iterations += 1
|
||||
if reflexion_iterations < MAX_REFLEXION_ITERATIONS:
|
||||
@ -125,22 +120,25 @@ async def extract_edges(
|
||||
# Convert the extracted data into EntityEdge objects
|
||||
edges = []
|
||||
for edge_data in edges_data:
|
||||
if edge_data['target_node_uuid'] and edge_data['source_node_uuid']:
|
||||
edge = EntityEdge(
|
||||
source_node_uuid=edge_data['source_node_uuid'],
|
||||
target_node_uuid=edge_data['target_node_uuid'],
|
||||
name=edge_data['relation_type'],
|
||||
group_id=group_id,
|
||||
fact=edge_data['fact'],
|
||||
episodes=[episode.uuid],
|
||||
created_at=datetime.now(timezone.utc),
|
||||
valid_at=None,
|
||||
invalid_at=None,
|
||||
)
|
||||
edges.append(edge)
|
||||
logger.debug(
|
||||
f'Created new edge: {edge.name} from (UUID: {edge.source_node_uuid}) to (UUID: {edge.target_node_uuid})'
|
||||
)
|
||||
edge = EntityEdge(
|
||||
source_node_uuid=node_uuids_by_name_map.get(
|
||||
edge_data.get('source_entity_name', ''), ''
|
||||
),
|
||||
target_node_uuid=node_uuids_by_name_map.get(
|
||||
edge_data.get('target_entity_name', ''), ''
|
||||
),
|
||||
name=edge_data.get('relation_type', ''),
|
||||
group_id=group_id,
|
||||
fact=edge_data.get('fact', ''),
|
||||
episodes=[episode.uuid],
|
||||
created_at=datetime.now(timezone.utc),
|
||||
valid_at=None,
|
||||
invalid_at=None,
|
||||
)
|
||||
edges.append(edge)
|
||||
logger.debug(
|
||||
f'Created new edge: {edge.name} from (UUID: {edge.source_node_uuid}) to (UUID: {edge.target_node_uuid})'
|
||||
)
|
||||
|
||||
return edges
|
||||
|
||||
@ -165,7 +163,7 @@ async def dedupe_extracted_edges(
|
||||
],
|
||||
}
|
||||
|
||||
llm_response = await llm_client.generate_response(prompt_library.dedupe_edges.v1(context))
|
||||
llm_response = await llm_client.generate_response(prompt_library.dedupe_edges.edge(context))
|
||||
duplicate_data = llm_response.get('duplicates', [])
|
||||
logger.debug(f'Extracted unique edges: {duplicate_data}')
|
||||
|
||||
@ -320,7 +318,7 @@ async def dedupe_extracted_edge(
|
||||
'extracted_edges': extracted_edge_context,
|
||||
}
|
||||
|
||||
llm_response = await llm_client.generate_response(prompt_library.dedupe_edges.v3(context))
|
||||
llm_response = await llm_client.generate_response(prompt_library.dedupe_edges.edge(context))
|
||||
|
||||
is_duplicate: bool = llm_response.get('is_duplicate', False)
|
||||
uuid: str | None = llm_response.get('uuid', None)
|
||||
|
||||
@ -18,7 +18,6 @@ import asyncio
|
||||
import logging
|
||||
from datetime import datetime, timezone
|
||||
from time import time
|
||||
from typing import Any
|
||||
|
||||
from graphiti_core.helpers import MAX_REFLEXION_ITERATIONS
|
||||
from graphiti_core.llm_client import LLMClient
|
||||
@ -33,24 +32,20 @@ async def extract_message_nodes(
|
||||
episode: EpisodicNode,
|
||||
previous_episodes: list[EpisodicNode],
|
||||
custom_prompt='',
|
||||
) -> list[dict[str, Any]]:
|
||||
) -> list[str]:
|
||||
# Prepare context for LLM
|
||||
context = {
|
||||
'episode_content': episode.content,
|
||||
'episode_timestamp': episode.valid_at.isoformat(),
|
||||
'previous_episodes': [
|
||||
{
|
||||
'content': ep.content,
|
||||
'timestamp': ep.valid_at.isoformat(),
|
||||
}
|
||||
for ep in previous_episodes
|
||||
],
|
||||
'previous_episodes': [ep.content for ep in previous_episodes],
|
||||
'custom_prompt': custom_prompt,
|
||||
}
|
||||
|
||||
llm_response = await llm_client.generate_response(prompt_library.extract_nodes.v2(context))
|
||||
extracted_node_data = llm_response.get('extracted_nodes', [])
|
||||
return extracted_node_data
|
||||
llm_response = await llm_client.generate_response(
|
||||
prompt_library.extract_nodes.extract_message(context)
|
||||
)
|
||||
extracted_node_names = llm_response.get('extracted_node_names', [])
|
||||
return extracted_node_names
|
||||
|
||||
|
||||
async def extract_text_nodes(
|
||||
@ -58,31 +53,25 @@ async def extract_text_nodes(
|
||||
episode: EpisodicNode,
|
||||
previous_episodes: list[EpisodicNode],
|
||||
custom_prompt='',
|
||||
) -> list[dict[str, Any]]:
|
||||
) -> list[str]:
|
||||
# Prepare context for LLM
|
||||
context = {
|
||||
'episode_content': episode.content,
|
||||
'episode_timestamp': episode.valid_at.isoformat(),
|
||||
'previous_episodes': [
|
||||
{
|
||||
'content': ep.content,
|
||||
'timestamp': ep.valid_at.isoformat(),
|
||||
}
|
||||
for ep in previous_episodes
|
||||
],
|
||||
'previous_episodes': [ep.content for ep in previous_episodes],
|
||||
'custom_prompt': custom_prompt,
|
||||
}
|
||||
|
||||
llm_response = await llm_client.generate_response(
|
||||
prompt_library.extract_nodes.extract_text(context)
|
||||
)
|
||||
extracted_node_data = llm_response.get('extracted_nodes', [])
|
||||
return extracted_node_data
|
||||
extracted_node_names = llm_response.get('extracted_node_names', [])
|
||||
return extracted_node_names
|
||||
|
||||
|
||||
async def extract_json_nodes(
|
||||
llm_client: LLMClient, episode: EpisodicNode, custom_prompt=''
|
||||
) -> list[dict[str, Any]]:
|
||||
) -> list[str]:
|
||||
# Prepare context for LLM
|
||||
context = {
|
||||
'episode_content': episode.content,
|
||||
@ -94,8 +83,8 @@ async def extract_json_nodes(
|
||||
llm_response = await llm_client.generate_response(
|
||||
prompt_library.extract_nodes.extract_json(context)
|
||||
)
|
||||
extracted_node_data = llm_response.get('extracted_nodes', [])
|
||||
return extracted_node_data
|
||||
extracted_node_names = llm_response.get('extracted_node_names', [])
|
||||
return extracted_node_names
|
||||
|
||||
|
||||
async def extract_nodes_reflexion(
|
||||
@ -107,13 +96,7 @@ async def extract_nodes_reflexion(
|
||||
# Prepare context for LLM
|
||||
context = {
|
||||
'episode_content': episode.content,
|
||||
'previous_episodes': [
|
||||
{
|
||||
'content': ep.content,
|
||||
'timestamp': ep.valid_at.isoformat(),
|
||||
}
|
||||
for ep in previous_episodes
|
||||
],
|
||||
'previous_episodes': [ep.content for ep in previous_episodes],
|
||||
'extracted_entities': node_names,
|
||||
}
|
||||
|
||||
@ -131,27 +114,26 @@ async def extract_nodes(
|
||||
previous_episodes: list[EpisodicNode],
|
||||
) -> list[EntityNode]:
|
||||
start = time()
|
||||
extracted_node_data: list[dict[str, Any]] = []
|
||||
extracted_node_names: list[str] = []
|
||||
custom_prompt = ''
|
||||
entities_missed = True
|
||||
reflexion_iterations = 0
|
||||
while entities_missed and reflexion_iterations < MAX_REFLEXION_ITERATIONS:
|
||||
if episode.source == EpisodeType.message:
|
||||
extracted_node_data = await extract_message_nodes(
|
||||
extracted_node_names = await extract_message_nodes(
|
||||
llm_client, episode, previous_episodes, custom_prompt
|
||||
)
|
||||
elif episode.source == EpisodeType.text:
|
||||
extracted_node_data = await extract_text_nodes(
|
||||
extracted_node_names = await extract_text_nodes(
|
||||
llm_client, episode, previous_episodes, custom_prompt
|
||||
)
|
||||
elif episode.source == EpisodeType.json:
|
||||
extracted_node_data = await extract_json_nodes(llm_client, episode, custom_prompt)
|
||||
extracted_node_names = await extract_json_nodes(llm_client, episode, custom_prompt)
|
||||
|
||||
reflexion_iterations += 1
|
||||
if reflexion_iterations < MAX_REFLEXION_ITERATIONS:
|
||||
entity_names = [node_data['name'] for node_data in extracted_node_data]
|
||||
missing_entities = await extract_nodes_reflexion(
|
||||
llm_client, episode, previous_episodes, entity_names
|
||||
llm_client, episode, previous_episodes, extracted_node_names
|
||||
)
|
||||
|
||||
entities_missed = len(missing_entities) != 0
|
||||
@ -161,15 +143,15 @@ async def extract_nodes(
|
||||
custom_prompt += f'\n{entity},'
|
||||
|
||||
end = time()
|
||||
logger.debug(f'Extracted new nodes: {extracted_node_data} in {(end - start) * 1000} ms')
|
||||
logger.debug(f'Extracted new nodes: {extracted_node_names} in {(end - start) * 1000} ms')
|
||||
# Convert the extracted data into EntityNode objects
|
||||
new_nodes = []
|
||||
for node_data in extracted_node_data:
|
||||
for name in extracted_node_names:
|
||||
new_node = EntityNode(
|
||||
name=node_data['name'],
|
||||
name=name,
|
||||
group_id=episode.group_id,
|
||||
labels=node_data['labels'],
|
||||
summary=node_data['summary'],
|
||||
labels=['Entity'],
|
||||
summary='',
|
||||
created_at=datetime.now(timezone.utc),
|
||||
)
|
||||
new_nodes.append(new_node)
|
||||
@ -204,7 +186,7 @@ async def dedupe_extracted_nodes(
|
||||
'extracted_nodes': extracted_nodes_context,
|
||||
}
|
||||
|
||||
llm_response = await llm_client.generate_response(prompt_library.dedupe_nodes.v2(context))
|
||||
llm_response = await llm_client.generate_response(prompt_library.dedupe_nodes.node(context))
|
||||
|
||||
duplicate_data = llm_response.get('duplicates', [])
|
||||
|
||||
@ -232,13 +214,17 @@ async def resolve_extracted_nodes(
|
||||
llm_client: LLMClient,
|
||||
extracted_nodes: list[EntityNode],
|
||||
existing_nodes_lists: list[list[EntityNode]],
|
||||
episode: EpisodicNode | None = None,
|
||||
previous_episodes: list[EpisodicNode] | None = None,
|
||||
) -> tuple[list[EntityNode], dict[str, str]]:
|
||||
uuid_map: dict[str, str] = {}
|
||||
resolved_nodes: list[EntityNode] = []
|
||||
results: list[tuple[EntityNode, dict[str, str]]] = list(
|
||||
await asyncio.gather(
|
||||
*[
|
||||
resolve_extracted_node(llm_client, extracted_node, existing_nodes)
|
||||
resolve_extracted_node(
|
||||
llm_client, extracted_node, existing_nodes, episode, previous_episodes
|
||||
)
|
||||
for extracted_node, existing_nodes in zip(extracted_nodes, existing_nodes_lists)
|
||||
]
|
||||
)
|
||||
@ -252,14 +238,16 @@ async def resolve_extracted_nodes(
|
||||
|
||||
|
||||
async def resolve_extracted_node(
|
||||
llm_client: LLMClient, extracted_node: EntityNode, existing_nodes: list[EntityNode]
|
||||
llm_client: LLMClient,
|
||||
extracted_node: EntityNode,
|
||||
existing_nodes: list[EntityNode],
|
||||
episode: EpisodicNode | None = None,
|
||||
previous_episodes: list[EpisodicNode] | None = None,
|
||||
) -> tuple[EntityNode, dict[str, str]]:
|
||||
start = time()
|
||||
|
||||
# Prepare context for LLM
|
||||
existing_nodes_context = [
|
||||
{'uuid': node.uuid, 'name': node.name, 'summary': node.summary} for node in existing_nodes
|
||||
]
|
||||
existing_nodes_context = [{'uuid': node.uuid, 'name': node.name} for node in existing_nodes]
|
||||
|
||||
extracted_node_context = {
|
||||
'uuid': extracted_node.uuid,
|
||||
@ -270,13 +258,32 @@ async def resolve_extracted_node(
|
||||
context = {
|
||||
'existing_nodes': existing_nodes_context,
|
||||
'extracted_nodes': extracted_node_context,
|
||||
'episode_content': episode.content if episode is not None else '',
|
||||
'previous_episodes': [ep.content for ep in previous_episodes]
|
||||
if previous_episodes is not None
|
||||
else [],
|
||||
}
|
||||
|
||||
llm_response = await llm_client.generate_response(prompt_library.dedupe_nodes.v3(context))
|
||||
summary_context = {
|
||||
'node_name': extracted_node.name,
|
||||
'episode_content': episode.content if episode is not None else '',
|
||||
'previous_episodes': [ep.content for ep in previous_episodes]
|
||||
if previous_episodes is not None
|
||||
else [],
|
||||
}
|
||||
|
||||
llm_response, node_summary_response = await asyncio.gather(
|
||||
llm_client.generate_response(prompt_library.dedupe_nodes.node(context)),
|
||||
llm_client.generate_response(
|
||||
prompt_library.summarize_nodes.summarize_context(summary_context)
|
||||
),
|
||||
)
|
||||
|
||||
extracted_node.summary = node_summary_response.get('summary', '')
|
||||
|
||||
is_duplicate: bool = llm_response.get('is_duplicate', False)
|
||||
uuid: str | None = llm_response.get('uuid', None)
|
||||
summary = llm_response.get('summary', '')
|
||||
name = llm_response.get('name', '')
|
||||
|
||||
node = extracted_node
|
||||
uuid_map: dict[str, str] = {}
|
||||
@ -284,8 +291,14 @@ async def resolve_extracted_node(
|
||||
for existing_node in existing_nodes:
|
||||
if existing_node.uuid != uuid:
|
||||
continue
|
||||
summary_response = await llm_client.generate_response(
|
||||
prompt_library.summarize_nodes.summarize_pair(
|
||||
{'node_summaries': [extracted_node.summary, existing_node.summary]}
|
||||
)
|
||||
)
|
||||
node = existing_node
|
||||
node.summary = summary
|
||||
node.name = name
|
||||
node.summary = summary_response.get('summary', '')
|
||||
uuid_map[extracted_node.uuid] = existing_node.uuid
|
||||
|
||||
end = time()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user