{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# GraphRAG Quickstart" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prerequisite installs to run the quickstart notebook\n", "Install 3rd party packages that are not part of the Python Standard Library" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "! pip install devtools pandas python-magic requests tqdm" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import getpass\n", "import json\n", "import sys\n", "import time\n", "from pathlib import Path\n", "\n", "import magic\n", "import pandas as pd\n", "import requests\n", "from devtools import pprint\n", "from tqdm import tqdm" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configuration - API Key, file directions and API endpoints" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Get API Key for API Management Service\n", "For authentication, the API requires a *subscription key* to be passed in the header of all requests. To find this key, visit the Azure Portal. The API subscription key will be located under ` --> --> --> --> Primary Key`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ocp_apim_subscription_key = getpass.getpass(\n", " \"Enter the subscription key to the GraphRag APIM:\"\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Setup directories and API endpoint\n", "\n", "The following parameters are required to access and use the GraphRAG solution accelerator API:\n", "* file_directory\n", "* storage_name\n", "* index_name\n", "* endpoint\n", "\n", "For demonstration purposes, you may use the provided `get-wiki-articles.py` script to download a small set of wikipedia articles or provide your own data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\"\"\"\n", "These parameters must be defined by the user:\n", "\n", "- file_directory: local directory where data files of interest are stored.\n", "- storage_name: unique name for an Azure blob storage container where files will be uploaded.\n", "- index_name: unique name for a single knowledge graph construction. Multiple indexes can be created from the same blob container of data.\n", "- apim_url: the endpoint URL for GraphRAG service (this is the Gateway URL found in the APIM resource).\n", "\"\"\"\n", "\n", "file_directory = \"\"\n", "storage_name = \"\"\n", "index_name = \"\"\n", "apim_url = \"\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "assert (\n", " file_directory != \"\" and storage_name != \"\" and index_name != \"\" and apim_url != \"\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\"\"\"\n", "\"Ocp-Apim-Subscription-Key\": \n", " This is a custom HTTP header used by Azure API Management service (APIM) to \n", " authenticate API requests. 
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\"\"\"\n", "\"Ocp-Apim-Subscription-Key\":\n", "    This is a custom HTTP header used by the Azure API Management (APIM) service to\n", "    authenticate API requests. The value for this key should be set to the subscription\n", "    key provided by the Azure APIM instance in your GraphRAG resource group.\n", "\"\"\"\n", "\n", "headers = {\"Ocp-Apim-Subscription-Key\": ocp_apim_subscription_key}" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Upload Files to Azure Blob Storage" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def upload_files(\n", "    file_directory: str,\n", "    storage_name: str,\n", "    batch_size: int = 100,\n", "    overwrite: bool = True,\n", "    max_retries: int = 5,\n", ") -> requests.Response:\n", "    \"\"\"\n", "    Upload files to a blob storage container.\n", "\n", "    Args:\n", "        file_directory - a local directory of .txt files to upload. All files must be in utf-8 encoding.\n", "        storage_name - a unique name for the Azure storage container.\n", "        batch_size - the number of files to upload in a single batch.\n", "        overwrite - whether or not to overwrite files if they already exist in the storage container.\n", "        max_retries - the maximum number of times to retry uploading a batch of files if the API is busy.\n", "\n", "    NOTE: Uploading files may sometimes fail if the blob container was recently deleted\n", "    (i.e. a few seconds before). The solution in practice is to sleep a few seconds and try again.\n", "    \"\"\"\n", "    url = apim_url + \"/data\"\n", "\n", "    def upload_batch(\n", "        files: list, storage_name: str, overwrite: bool, max_retries: int\n", "    ) -> requests.Response:\n", "        for _ in range(max_retries):\n", "            response = requests.post(\n", "                url=url,\n", "                files=files,\n", "                params={\"storage_name\": storage_name, \"overwrite\": overwrite},\n", "                headers=headers,\n", "            )\n", "            # API may be busy, retry\n", "            if response.status_code == 500:\n", "                print(\"API busy. Sleeping and will try again.\")\n", "                time.sleep(10)\n", "                continue\n", "            return response\n", "        return response\n", "\n", "    batch_files = []\n", "    accepted_file_types = [\"text/plain\"]\n", "    filepaths = list(Path(file_directory).iterdir())\n", "    for file in tqdm(filepaths):\n", "        # validate that the path is a regular file with a .txt extension and an accepted MIME type\n", "        if (\n", "            not file.is_file()\n", "            or file.suffix != \".txt\"\n", "            or magic.from_file(str(file), mime=True) not in accepted_file_types\n", "        ):\n", "            print(f\"Skipping invalid file: {file}\")\n", "            continue\n", "        # open and decode file as utf-8, ignoring bad characters\n", "        batch_files.append(\n", "            (\"files\", open(file=file, mode=\"r\", encoding=\"utf-8\", errors=\"ignore\"))\n", "        )\n", "        # upload the batch once it reaches batch_size files\n", "        if len(batch_files) == batch_size:\n", "            response = upload_batch(batch_files, storage_name, overwrite, max_retries)\n", "            # if response is not ok, return early\n", "            if not response.ok:\n", "                return response\n", "            batch_files.clear()\n", "    # upload any remaining files\n", "    if len(batch_files) > 0:\n", "        response = upload_batch(batch_files, storage_name, overwrite, max_retries)\n", "    return response" ] },
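{ "cell_type": "markdown", "metadata": {}, "source": [ "If you have no data on hand yet, the next cell is a minimal, optional sketch that writes one small UTF-8 text file into `file_directory` so the upload can be smoke-tested. The filename and sentence are placeholders, not files the accelerator provides; skip the cell if you already populated the directory." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional: write a single sample .txt file so upload_files() has input.\n", "# The filename and text below are placeholders; skip this cell if\n", "# file_directory already contains your own UTF-8 .txt files.\n", "sample_dir = Path(file_directory)\n", "sample_dir.mkdir(parents=True, exist_ok=True)\n", "(sample_dir / \"sample.txt\").write_text(\n", "    \"GraphRAG builds a knowledge graph over input documents to support queries.\",\n", "    encoding=\"utf-8\",\n", ")" ] },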
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = upload_files(\n", "    file_directory=file_directory,\n", "    storage_name=storage_name,\n", "    batch_size=100,\n", "    overwrite=True,\n", ")\n", "if not response.ok:\n", "    print(response.text)\n", "else:\n", "    print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create an Index\n", "\n", "After the data files have been uploaded, you can construct a knowledge graph by creating a search index. If no entity configuration is provided, a default entity configuration that has been shown to generally work well will be used." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def build_index(\n", "    storage_name: str,\n", "    index_name: str,\n", ") -> requests.Response:\n", "    \"\"\"Create a search index.\n", "\n", "    This function kicks off a job that builds a knowledge graph (KG) index from files located in a blob storage container.\n", "    \"\"\"\n", "    url = apim_url + \"/index\"\n", "    request = {\"storage_name\": storage_name, \"index_name\": index_name}\n", "    return requests.post(url, params=request, headers=headers)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = build_index(\n", "    storage_name=storage_name,\n", "    index_name=index_name,\n", ")\n", "print(response)\n", "if response.ok:\n", "    print(response.text)\n", "else:\n", "    print(f\"Failed to submit job.\\nStatus: {response.text}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Check the status of an indexing job\n", "\n", "Please wait for your index to reach 100 percent completion before continuing to the next section to run queries." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def index_status(index_name: str) -> requests.Response:\n", "    \"\"\"Check the status and progress of an indexing job.\"\"\"\n", "    url = apim_url + f\"/index/status/{index_name}\"\n", "    return requests.get(url, headers=headers)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = index_status(index_name)\n", "\n", "pprint(response.json())" ] },
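{ "cell_type": "markdown", "metadata": {}, "source": [ "Rather than re-running the status cell by hand, you can poll until the job finishes. The cell below is a minimal sketch: the `percent_complete` field name is an assumption about the shape of the status payload, so verify it against the output printed above and adjust if your deployment reports progress differently." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Poll the status endpoint until the indexing job reports completion.\n", "# NOTE: \"percent_complete\" is an assumed field name - check it against\n", "# the payload printed by the previous cell.\n", "while True:\n", "    status = index_status(index_name).json()\n", "    pprint(status)\n", "    if status.get(\"percent_complete\", 0) >= 100:\n", "        break\n", "    time.sleep(30)  # indexing can take a while, so poll sparingly" ] },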
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Query\n", "\n", "After an indexing job has completed, the knowledge graph is ready to query. Two types of queries (global and local) are currently supported. In addition, you can issue a query over a single index or across multiple indexes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def parse_query_response(\n", "    response: requests.Response, return_context_data: bool = False\n", ") -> requests.Response | dict[str, list[dict]]:\n", "    \"\"\"Helper function to parse the result out of a query response.\n", "\n", "    Prints the response['result'] value and optionally\n", "    returns the associated context data.\n", "    \"\"\"\n", "    if response.ok:\n", "        print(json.loads(response.text)[\"result\"])\n", "        if return_context_data:\n", "            return json.loads(response.text)[\"context_data\"]\n", "        return response\n", "    else:\n", "        print(response.reason)\n", "        print(response.content)\n", "        return response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Global Query\n", "\n", "Global search queries are resource-intensive, but give good responses to questions that require an understanding of the dataset as a whole." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def global_search(index_name: str | list[str], query: str) -> requests.Response:\n", "    \"\"\"Run a global query over the knowledge graph(s) associated with one or more indexes.\"\"\"\n", "    url = apim_url + \"/query/global\"\n", "    request = {\"index_name\": index_name, \"query\": query}\n", "    return requests.post(url, json=request, headers=headers)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# pass in a single index name as a string, or set index_name=[\"myindex1\", \"myindex2\"] to query across multiple indexes\n", "global_response = global_search(\n", "    index_name=index_name, query=\"Summarize the main topics of this data\"\n", ")\n", "# print the result and save the context data in a variable\n", "global_response_data = parse_query_response(global_response, return_context_data=True)\n", "global_response_data" ] },
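{ "cell_type": "markdown", "metadata": {}, "source": [ "The context data that comes back with an answer is often easier to scan as tables. The next cell is a minimal sketch that assumes `global_response_data` is a dict mapping record types (e.g. `reports`) to lists of records; the exact shape may vary by API version." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Render each context-data table with pandas (imported earlier).\n", "# Assumes global_response_data maps record-type names to lists of dict\n", "# records; adjust if your deployment returns a different shape.\n", "if isinstance(global_response_data, dict):\n", "    for table_name, records in global_response_data.items():\n", "        print(table_name)\n", "        display(pd.DataFrame(records))" ] },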
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Local Query\n", "\n", "Local search queries are best suited for narrowly focused questions that require an understanding of specific entities mentioned in the documents (e.g. \"What are the healing properties of chamomile?\")." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def local_search(index_name: str | list[str], query: str) -> requests.Response:\n", "    \"\"\"Run a local query over the knowledge graph(s) associated with one or more indexes.\"\"\"\n", "    url = apim_url + \"/query/local\"\n", "    request = {\"index_name\": index_name, \"query\": query}\n", "    return requests.post(url, json=request, headers=headers)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "# pass in a single index name as a string, or set index_name=[\"myindex1\", \"myindex2\"] to query across multiple indexes\n", "local_response = local_search(\n", "    index_name=index_name, query=\"Who are the primary actors in these communities?\"\n", ")\n", "# print the result and save the context data in a variable\n", "local_response_data = parse_query_response(local_response, return_context_data=True)\n", "local_response_data" ] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 2 }