mirror of
https://github.com/getzep/graphiti.git
synced 2025-06-27 02:00:02 +00:00

* Bump version from 0.9.0 to 0.9.1 in pyproject.toml and update google-genai dependency to >=0.1.0 * Bump version from 0.9.1 to 0.9.2 in pyproject.toml * Update google-genai dependency version to >=0.8.0 in pyproject.toml * loc file * Update pyproject.toml to version 0.9.3, restructure dependencies, and modify author format. Remove outdated Google API key note from README.md. * upgrade poetry and ruff * Update README.md to include installation instructions for Graphiti with Google Gemini support * fix to deps since poetry doesn't fully implement PEP 735 * Refactor string formatting in various files to use single quotes for consistency and improve readability. This includes updates in agent.ipynb, quickstart.py, multiple prompt files, and ingest.py and retrieve.py modules. * Remove optional dependencies from pyproject.toml to streamline project requirements.
112 lines
3.3 KiB
Python
112 lines
3.3 KiB
Python
import asyncio
|
|
from contextlib import asynccontextmanager
|
|
from functools import partial
|
|
|
|
from fastapi import APIRouter, FastAPI, status
|
|
from graphiti_core.nodes import EpisodeType # type: ignore
|
|
from graphiti_core.utils.maintenance.graph_data_operations import clear_data # type: ignore
|
|
|
|
from graph_service.dto import AddEntityNodeRequest, AddMessagesRequest, Message, Result
|
|
from graph_service.zep_graphiti import ZepGraphitiDep
|
|
|
|
|
|
class AsyncWorker:
    """Run queued async jobs one at a time on a single background task.

    Jobs are zero-argument callables returning an awaitable. They are put on
    ``queue`` by producers and awaited sequentially by ``worker``.
    """

    def __init__(self):
        # FIFO of pending jobs (zero-argument callables returning awaitables).
        self.queue = asyncio.Queue()
        # Background task running ``worker``; ``None`` until ``start`` is called.
        self.task = None

    async def worker(self):
        """Consume jobs from the queue until the task is cancelled.

        A job that raises is reported and skipped so that one failing job
        does not terminate the worker and strand every job queued after it.
        """
        while True:
            try:
                job = await self.queue.get()
            except asyncio.CancelledError:
                break

            # Report only once a job has really been dequeued, so the size
            # shown is the number of jobs still waiting.
            print(f'Got a job: (size of remaining queue: {self.queue.qsize()})')
            try:
                await job()
            except asyncio.CancelledError:
                break
            except Exception as exc:
                # Keep the worker alive; the failed job is dropped.
                print(f'Job failed: {exc!r}')
            finally:
                # Balance the ``get`` above so ``queue.join()`` can complete.
                self.queue.task_done()

    async def start(self):
        """Spawn the background task that runs ``worker``."""
        self.task = asyncio.create_task(self.worker())

    async def stop(self):
        """Cancel the background task, wait for it, and discard pending jobs."""
        if self.task:
            self.task.cancel()
            await self.task
        # Anything still queued will never run; drop it.
        while not self.queue.empty():
            self.queue.get_nowait()
|
|
|
|
|
|
# Module-level worker instance: started/stopped by `lifespan` and fed by the
# `/messages` route.
async_worker = AsyncWorker()
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(_: FastAPI):
    """Start the background worker on entry and stop it on exit.

    The application argument is unused. The worker is stopped in a
    ``finally`` clause so that it is cancelled even when an exception is
    raised into the context manager while the application is running.
    """
    await async_worker.start()
    try:
        yield
    finally:
        await async_worker.stop()
|
|
|
|
|
|
# Router holding the routes defined below; `lifespan` is passed as its
# lifespan handler.
router = APIRouter(lifespan=lifespan)
|
|
|
|
|
|
@router.post('/messages', status_code=status.HTTP_202_ACCEPTED)
async def add_messages(
    request: AddMessagesRequest,
    graphiti: ZepGraphitiDep,
):
    """Queue every message in the request for background ingestion.

    Each message becomes one job on the shared worker queue; the job adds the
    message as an episode through ``graphiti.add_episode``. The handler
    returns once all jobs are enqueued, not once they have run.
    """

    async def ingest_message(message: Message):
        # Episode body format: "<role>(<role_type>): <content>".
        body = f'{message.role or ""}({message.role_type}): {message.content}'
        await graphiti.add_episode(
            uuid=message.uuid,
            group_id=request.group_id,
            name=message.name,
            episode_body=body,
            reference_time=message.timestamp,
            source=EpisodeType.message,
            source_description=message.source_description,
        )

    pending = async_worker.queue
    for message in request.messages:
        job = partial(ingest_message, message)
        await pending.put(job)

    return Result(message='Messages added to processing queue', success=True)
|
|
|
|
|
|
@router.post('/entity-node', status_code=status.HTTP_201_CREATED)
async def add_entity_node(
    request: AddEntityNodeRequest,
    graphiti: ZepGraphitiDep,
):
    """Save an entity node built from the request and return the result.

    The request's ``uuid``, ``group_id``, ``name`` and ``summary`` are passed
    to ``graphiti.save_entity_node``; whatever it returns is the response.
    """
    return await graphiti.save_entity_node(
        uuid=request.uuid,
        group_id=request.group_id,
        name=request.name,
        summary=request.summary,
    )
|
|
|
|
|
|
@router.delete('/entity-edge/{uuid}', status_code=status.HTTP_200_OK)
async def delete_entity_edge(uuid: str, graphiti: ZepGraphitiDep):
    """Delete the entity edge with the given ``uuid`` and report success."""
    await graphiti.delete_entity_edge(uuid)
    outcome = Result(message='Entity Edge deleted', success=True)
    return outcome
|
|
|
|
|
|
@router.delete('/group/{group_id}', status_code=status.HTTP_200_OK)
async def delete_group(group_id: str, graphiti: ZepGraphitiDep):
    """Delete the group identified by ``group_id`` and report success."""
    await graphiti.delete_group(group_id)
    outcome = Result(message='Group deleted', success=True)
    return outcome
|
|
|
|
|
|
@router.delete('/episode/{uuid}', status_code=status.HTTP_200_OK)
async def delete_episode(uuid: str, graphiti: ZepGraphitiDep):
    """Delete the episodic node with the given ``uuid`` and report success."""
    await graphiti.delete_episodic_node(uuid)
    outcome = Result(message='Episode deleted', success=True)
    return outcome
|
|
|
|
|
|
@router.post('/clear', status_code=status.HTTP_200_OK)
async def clear(
    graphiti: ZepGraphitiDep,
):
    """Clear the graph data and rebuild indices and constraints.

    Calls ``clear_data`` on the graphiti driver, then
    ``graphiti.build_indices_and_constraints()``, and reports success.
    """
    driver = graphiti.driver
    await clear_data(driver)
    await graphiti.build_indices_and_constraints()
    outcome = Result(message='Graph cleared', success=True)
    return outcome
|