Docs update (#1408)

* Fix footer contrast

* Fix broken links

* Remove a few unneeded examples

* Point python API example to the whole folder

* Convert schema bullets to tables
Nathan Evans 2024-11-14 19:26:29 -08:00 committed by GitHub
parent ec9cdcce4d
commit 425dbc60e3
37 changed files with 94 additions and 1098 deletions

View File

@ -3,7 +3,7 @@
The following template can be used and stored as a `.env` file in the directory where you're pointing
the `--root` parameter on your Indexing Pipeline execution.
For details about how to run the Indexing Pipeline, refer to the [Index CLI](../index/cli.md) documentation.
For details about how to run the Indexing Pipeline, refer to the [Index CLI](../cli.md) documentation.
## .env File Template

View File

@ -81,5 +81,5 @@ Make sure you have python3.10-dev installed or more generally `python<version>-d
### LLM call constantly exceeds TPM, RPM or time limits
`GRAPHRAG_LLM_THREAD_COUNT` and `GRAPHRAG_EMBEDDING_THREAD_COUNT` are both set to 50 by default. You can modify this values
`GRAPHRAG_LLM_THREAD_COUNT` and `GRAPHRAG_EMBEDDING_THREAD_COUNT` are both set to 50 by default. You can modify these values
to reduce concurrency. Please refer to the [Configuration Documents](config/overview.md).
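For example, one way to dial this down (a minimal sketch, not the only approach; the same values could equally live in your `.env` file) is to export lower thread counts before starting the run:

```python
import os

# A sketch of reducing indexing concurrency so LLM and embedding calls
# stay under the provider's TPM/RPM limits. The variable names come from
# the troubleshooting note above; the value 10 is an arbitrary example.
os.environ["GRAPHRAG_LLM_THREAD_COUNT"] = "10"
os.environ["GRAPHRAG_EMBEDDING_THREAD_COUNT"] = "10"
```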

View File

@ -85,7 +85,7 @@ deployment_name: <azure_model_deployment_name>
- For more details about configuring GraphRAG, see the [configuration documentation](config/overview.md).
- To learn more about Initialization, refer to the [Initialization documentation](config/init.md).
- For more details about using the CLI, refer to the [CLI documentation](query/cli.md).
- For more details about using the CLI, refer to the [CLI documentation](cli.md).
## Running the Indexing pipeline

View File

@ -4,86 +4,113 @@ The default pipeline produces a series of output tables that align with the [con
## Shared fields
All tables have two identifier fields:
- id: str - Generated UUID, assuring global uniqueness
- human_readable_id: int - This is an incremented short ID created per-run. For example, we use this short ID with generated summaries that print citations so they are easy to cross-reference visually.
| name | type | description |
| ----------------- | ---- | ----------- |
| id | str | Generated UUID, assuring global uniqueness |
| human_readable_id | int | This is an incremented short ID created per-run. For example, we use this short ID with generated summaries that print citations so they are easy to cross-reference visually. |
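As a quick illustration (the `output/` parquet location is an assumption based on the default file storage, not part of this schema), these shared fields can be inspected on any of the tables:

```python
import pandas as pd

# Assumed default location of the pipeline's parquet outputs; adjust the
# path to wherever your storage configuration writes them.
entities = pd.read_parquet("output/create_final_entities.parquet")

# Every output table carries the same two identifiers: a globally unique
# UUID and a short per-run increment used for readable citations.
print(entities[["id", "human_readable_id"]].head())
```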
## create_final_communities
This is a list of the final communities generated by Leiden. Communities are strictly hierarchical, subdividing into children as the cluster affinity is narrowed.
- community: int - Leiden-generated cluster ID for the community. Note that these increment with depth, so they are unique through all levels of the community hierarchy. For this table, human_readable_id is a copy of the community ID rather than a plain increment.
- level: int - Depth of the community in the hierarchy.
- title: str - Friendly name of the community.
- entity_ids - List of entities that are members of the community.
- relationship_ids - List of relationships that are wholly within the community (source and target are both in the community).
- text_unit_ids - List of text units represented within the community.
- period - Date of ingest, used for incremental update merges.
- size - Size of the community (entity count), used for incremental update merges.
| name | type | description |
| ---------------- | ----- | ----------- |
| community | int | Leiden-generated cluster ID for the community. Note that these increment with depth, so they are unique through all levels of the community hierarchy. For this table, human_readable_id is a copy of the community ID rather than a plain increment. |
| level | int | Depth of the community in the hierarchy. |
| title | str | Friendly name of the community. |
| entity_ids | str[] | List of entities that are members of the community. |
| relationship_ids | str[] | List of relationships that are wholly within the community (source and target are both in the community). |
| text_unit_ids | str[] | List of text units represented within the community. |
| period | str | Date of ingest, used for incremental update merges. ISO8601 |
| size | int | Size of the community (entity count), used for incremental update merges. |
## create_final_community_reports
This is the list of summarized reports for each community.
- community: int - Short ID of the community this report applies to.
- level: int - Level of the community this report applies to.
- title: str - LM-generated title for the report.
- summary: str - LM-generated summary of the report.
- full_content: str - LM-generated full report.
- rank: float - LM-derived relevance ranking of the report based on member entity salience
- rank_explanation - LM-derived explanation of the rank.
- findings: dict - LM-derived list of the top 5-10 insights from the community. Contains `summary` and `explanation` values.
- full_content_json - Full JSON output as returned by the LM. Most fields are extracted into columns, but this JSON is sent for query summarization so we leave it to allow for prompt tuning to add fields/content by end users.
- period - Date of ingest, used for incremental update merges.
- size - Size of the community (entity count), used for incremental update merges.
| name | type | description |
| ----------------- | ----- | ----------- |
| community | int | Short ID of the community this report applies to. |
| level | int | Level of the community this report applies to. |
| title | str | LM-generated title for the report. |
| summary | str | LM-generated summary of the report. |
| full_content | str | LM-generated full report. |
| rank | float | LM-derived relevance ranking of the report based on member entity salience. |
| rank_explanation | str | LM-derived explanation of the rank. |
| findings | dict | LM-derived list of the top 5-10 insights from the community. Contains `summary` and `explanation` values. |
| full_content_json | json | Full JSON output as returned by the LM. Most fields are extracted into columns, but this JSON is sent for query summarization so we leave it to allow for prompt tuning to add fields/content by end users. |
| period | str | Date of ingest, used for incremental update merges. ISO8601 |
| size | int | Size of the community (entity count), used for incremental update merges. |
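As a hedged sketch (output path assumed; the nested value handling follows the column descriptions above), the summarized reports can be browsed directly from the parquet output:

```python
import pandas as pd

# Assumed default output location; adjust to your storage configuration.
reports = pd.read_parquet("output/create_final_community_reports.parquet")

# Each row is one community report; findings holds the top insights,
# each with `summary` and `explanation` values as described above.
for _, report in reports.head(3).iterrows():
    print(f"[{report['community']}] {report['title']} (rank {report['rank']})")
    for finding in report["findings"]:
        print("  -", finding["summary"])
```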
## create_final_covariates
(Optional) If claim extraction is turned on, this is a list of the extracted covariates. Note that claims are typically oriented around identifying malicious behavior such as fraud, so they are not useful for all datasets.
- covariate_type: str - This is always "claim" with our default covariates.
- type: str - Nature of the claim type.
- description: str - LM-generated description of the behavior.
- subject_id: str - Name of the source entity (that is performing the claimed behavior).
- object_id: str - Name of the target entity (that the claimed behavior is performed on).
- status: str [TRUE, FALSE, SUSPECTED] - LM-derived assessment of the correctness of the claim.
- start_date: str (ISO8601) - LM-derived start of the claimed activity.
- end_date: str (ISO8601) - LM-derived end of the claimed activity.
- source_text: str - Short string of text containing the claimed behavior.
- text_unit_id: str - ID of the text unit the claim text was extracted from.
| name | type | description |
| -------------- | ---- | ----------- |
| covariate_type | str | This is always "claim" with our default covariates. |
| type | str | Nature of the claim type. |
| description | str | LM-generated description of the behavior. |
| subject_id | str | Name of the source entity (that is performing the claimed behavior). |
| object_id | str | Name of the target entity (that the claimed behavior is performed on). |
| status | str | LM-derived assessment of the correctness of the claim. One of [TRUE, FALSE, SUSPECTED] |
| start_date | str | LM-derived start of the claimed activity. ISO8601 |
| end_date | str | LM-derived end of the claimed activity. ISO8601 |
| source_text | str | Short string of text containing the claimed behavior. |
| text_unit_id | str | ID of the text unit the claim text was extracted from. |
## create_final_documents
List of document content after import.
- title: str - Filename, unless otherwise configured during CSV import.
- text: str - Full text of the document.
- text_unit_ids: str[] - List of text units (chunks) that were parsed from the document.
- attributes: dict (optional) - If specified during CSV import, this is a dict of attributes for the document.
# create_final_entities
| name | type | description |
| ------------- | ----- | ----------- |
| title | str | Filename, unless otherwise configured during CSV import. |
| text | str | Full text of the document. |
| text_unit_ids | str[] | List of text units (chunks) that were parsed from the document. |
| attributes | dict | (optional) If specified during CSV import, this is a dict of attributes for the document. |
## create_final_entities
List of all entities found in the data by the LM.
- title: str - Name of the entity.
- type: str - Type of the entity. By default this will be "organization", "person", "geo", or "event" unless configured differently or auto-tuning is used.
- description: str - Textual description of the entity. Entities may be found in many text units, so this is an LM-derived summary of all descriptions.
- text_unit_ids: str[] - List of the text units containing the entity.
# create_final_nodes
| name | type | description |
| ------------- | ----- | ----------- |
| title | str | Name of the entity. |
| type | str | Type of the entity. By default this will be "organization", "person", "geo", or "event" unless configured differently or auto-tuning is used. |
| description | str | Textual description of the entity. Entities may be found in many text units, so this is an LM-derived summary of all descriptions. |
| text_unit_ids | str[] | List of the text units containing the entity. |
## create_final_nodes
This is graph-related information for the entities. It contains only information relevant to the graph such as community. There is an entry for each entity at every community level it is found within, so you may see "duplicate" entities.
Note that the ID fields match those in create_final_entities and can be used for joining if additional information about a node is required.
- title: str - Name of the referenced entity. Duplicated from create_final_entities for convenient cross-referencing.
- community: int - Leiden community the node is found within. Entities are not always assigned a community (they may not be close enough to any), so they may have a ID of -1.
- level: int - Level of the community the entity is in.
- degree: int - Node degree (connectedness) in the graph.
- x: float - X position of the node for visual layouts. If graph embeddings and UMAP are not turned on, this will be 0.
- y: float - Y position of the node for visual layouts. If graph embeddings and UMAP are not turned on, this will be 0.
| name | type | description |
| --------- | ----- | ----------- |
| title | str | Name of the referenced entity. Duplicated from create_final_entities for convenient cross-referencing. |
| community | int | Leiden community the node is found within. Entities are not always assigned a community (they may not be close enough to any), so they may have an ID of -1. |
| level | int | Level of the community the entity is in. |
| degree | int | Node degree (connectedness) in the graph. |
| x | float | X position of the node for visual layouts. If graph embeddings and UMAP are not turned on, this will be 0. |
| y | float | Y position of the node for visual layouts. If graph embeddings and UMAP are not turned on, this will be 0. |
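Because the ID fields here match create_final_entities (see the note above), a minimal sketch of joining the two, assuming the default parquet output location and the shared `id` column, might look like this:

```python
import pandas as pd

# Assumed default output location; adjust to your storage configuration.
nodes = pd.read_parquet("output/create_final_nodes.parquet")
entities = pd.read_parquet("output/create_final_entities.parquet")

# Nodes repeat once per community level, so this join fans each entity's
# type and description out to every level that entity appears in.
nodes_with_details = nodes.merge(
    entities[["id", "type", "description"]],
    on="id",
    how="left",
)
print(nodes_with_details[["title", "community", "level", "type"]].head())
```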
## create_final_relationships
List of all entity-to-entity relationships found in the data by the LM. This is also the _edge list_ for the graph.
- source: str - Name of the source entity.
- target: str - Name of the target entity.
- description: str - LM-derived description of the relationship. Also see note for entity descriptions.
- weight: float - Weight of the edge in the graph. This is summed from an LM-derived "strength" measure for each relationship instance.
- combined_degree: int - Sum of source and target node degrees.
- text_unit_ids: str[] - List of text units the relationship was found within.
| name | type | description |
| --------------- | ----- | ----------- |
| source | str | Name of the source entity. |
| target | str | Name of the target entity. |
| description | str | LM-derived description of the relationship. Also see note for entity descriptions. |
| weight | float | Weight of the edge in the graph. This is summed from an LM-derived "strength" measure for each relationship instance. |
| combined_degree | int | Sum of source and target node degrees. |
| text_unit_ids | str[] | List of text units the relationship was found within. |
## create_final_text_units
List of all text chunks parsed from the input documents.
- text: str - Raw full text of the chunk.
- n_tokens: int - Number of tokens in the chunk. This should normally match the `chunk_size` config parameter, except for the last chunk which is often shorter.
- document_ids: str[] - List of document IDs the chunk came from. This is normally only 1 due to our default groupby, but for very short text documents (e.g., microblogs) it can be configured so text units span multiple documents.
- entity_ids: str[] - List of entities found in the text unit.
- relationships_ids: str[] - List of relationships found in the text unit.
- covariate_ids: str[] - Optional list of covariates found in the text unit.
| name | type | description |
| ----------------- | ----- | ----------- |
| text | str | Raw full text of the chunk. |
| n_tokens | int | Number of tokens in the chunk. This should normally match the `chunk_size` config parameter, except for the last chunk which is often shorter. |
| document_ids | str[] | List of document IDs the chunk came from. This is normally only 1 due to our default groupby, but for very short text documents (e.g., microblogs) it can be configured so text units span multiple documents. |
| entity_ids | str[] | List of entities found in the text unit. |
| relationships_ids | str[] | List of relationships found in the text unit. |
| covariate_ids | str[] | Optional list of covariates found in the text unit. |
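As one example of using these link columns (again assuming the default parquet output location), `entity_ids` can be exploded to count how many entities were extracted from each chunk:

```python
import pandas as pd

# Assumed default output location; adjust to your storage configuration.
text_units = pd.read_parquet("output/create_final_text_units.parquet")

# Each row is one chunk; entity_ids links back to create_final_entities.
entities_per_chunk = (
    text_units.explode("entity_ids")
    .groupby("id")["entity_ids"]
    .nunique()
    .sort_values(ascending=False)
)
print(entities_per_chunk.head())
```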

View File

@ -39,35 +39,7 @@ yarn run:index --config your_pipeline.yml # custom config mode
### Python API
```python
from graphrag.index import run_pipeline
from graphrag.index.config import PipelineWorkflowReference
workflows: list[PipelineWorkflowReference] = [
PipelineWorkflowReference(
steps=[
{
# built-in verb
"verb": "derive", # https://github.com/microsoft/datashaper/blob/main/python/datashaper/datashaper/verbs/derive.py
"args": {
"column1": "col1", # from above
"column2": "col2", # from above
"to": "col_multiplied", # new column name
"operator": "*", # multiply the two columns
},
# Since we're trying to act on the default input, we don't need explicitly to specify an input
}
]
),
]
dataset = pd.DataFrame([{"col1": 2, "col2": 4}, {"col1": 5, "col2": 10}])
outputs = []
async for output in await run_pipeline(dataset=dataset, workflows=workflows):
outputs.append(output)
pipeline_result = outputs[-1]
print(pipeline_result)
```
Please see the [examples folder](https://github.com/microsoft/graphrag/blob/main/examples/README.md) for a handful of functional pipelines illustrating how to create and run them via a custom settings.yml or through custom Python scripts.
## Further Reading

View File

@ -3,6 +3,8 @@
--md-code-hl-color: #3772d9;
--md-code-hl-comment-color: #6b6b6b;
--md-code-hl-operator-color: #6b6b6b;
--md-footer-fg-color--light: #ffffff;
--md-footer-fg-color--lighter: #ffffff;
}
[data-md-color-scheme="slate"] {
@ -10,6 +12,8 @@
--md-code-hl-color: #246be5;
--md-code-hl-constant-color: #9a89ed;
--md-code-hl-number-color: #f16e5f;
--md-footer-fg-color--light: #ffffff;
--md-footer-fg-color--lighter: #ffffff;
}
.md-tabs__item--active {

View File

@ -1,2 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

View File

@ -1,22 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
from datashaper import TableContainer, VerbInput
def str_append(
input: VerbInput, source_column: str, target_column: str, string_to_append: str
):
"""A custom verb that appends a string to a column"""
# by convention, we typically use "column" as the input column name and "to" as the output column name, but you can use whatever you want
# just as long as the "args" in the workflow reference match the function signature
input_data = input.get_input()
output_df = input_data.copy()
output_df[target_column] = output_df[source_column].apply(
lambda x: f"{x}{string_to_append}"
)
return TableContainer(table=output_df)
custom_verbs = {
"str_append": str_append,
}

View File

@ -1,7 +0,0 @@
workflows:
- steps:
- verb: "str_append" # should be the key that you pass to the custom_verbs dict below
args:
source_column: "col1"
target_column: "col_1_custom"
string_to_append: " - custom verb"

View File

@ -1,84 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
import pandas as pd
from examples.custom_set_of_available_verbs.custom_verb_definitions import custom_verbs
from graphrag.index import run_pipeline, run_pipeline_with_config
from graphrag.index.config import PipelineWorkflowReference
# Our fake dataset
dataset = pd.DataFrame([{"col1": 2, "col2": 4}, {"col1": 5, "col2": 10}])
async def run_with_config():
"""Run a pipeline with a config file"""
# load pipeline.yml in this directory
config_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "./pipeline.yml"
)
outputs = []
async for output in run_pipeline_with_config(
config_or_path=config_path, dataset=dataset
):
outputs.append(output)
pipeline_result = outputs[-1]
if pipeline_result.result is not None:
# Should look something like this, which should be identical to the python example:
# col1 col2 col_1_custom
# 0 2 4 2 - custom verb
# 1 5 10 5 - custom verb
print(pipeline_result.result)
else:
print("No results!")
async def run_python():
workflows: list[PipelineWorkflowReference] = [
PipelineWorkflowReference(
name="my_workflow",
steps=[
{
"verb": "str_append", # should be the key that you pass to the custom_verbs dict below
"args": {
"source_column": "col1", # from above
"target_column": "col_1_custom", # new column name,
"string_to_append": " - custom verb", # The string to append to the column
},
# Since we're trying to act on the default input, we don't need explicitly to specify an input
}
],
),
]
# Run the pipeline
outputs = []
async for output in run_pipeline(
dataset=dataset,
workflows=workflows,
additional_verbs=custom_verbs,
):
outputs.append(output)
# Find the result from the workflow we care about
pipeline_result = next(
(output for output in outputs if output.workflow == "my_workflow"), None
)
if pipeline_result is not None and pipeline_result.result is not None:
# Should look something like this:
# col1 col2 col_1_custom
# 0 2 4 2 - custom verb
# 1 5 10 5 - custom verb
print(pipeline_result.result)
else:
print("No results!")
if __name__ == "__main__":
asyncio.run(run_python())
asyncio.run(run_with_config())

View File

@ -1,2 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

View File

@ -1,36 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
from graphrag.index.workflows import WorkflowDefinitions
# Sets up the list of custom workflows that can be used in a pipeline
# The idea being that you can have a pool of workflows that can be used in any number of
# your pipelines
custom_workflows: WorkflowDefinitions = {
"my_workflow": lambda config: [
{
"verb": "derive",
"args": {
"column1": "col1", # looks for col1 in the dataset
"column2": "col2", # looks for col2 in the dataset
"to": config.get(
# Allow the user to specify the output column name,
# otherwise default to "output_column"
"derive_output_column",
"output_column",
), # new column name,
"operator": "*",
},
}
],
"my_unused_workflow": lambda _config: [
{
"verb": "derive",
"args": {
"column1": "col1", # looks for col1 in the dataset
"column2": "col2", # looks for col2 in the dataset
"to": "unused_output_column",
"operator": "*",
},
}
],
}

View File

@ -1,4 +0,0 @@
workflows:
- name: my_workflow
config:
derive_output_column: "col_1_multiplied"

View File

@ -1,85 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
import pandas as pd
from examples.custom_set_of_available_workflows.custom_workflow_definitions import (
custom_workflows,
)
from graphrag.index import run_pipeline, run_pipeline_with_config
from graphrag.index.config import PipelineWorkflowReference
sample_data_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "../_sample_data/"
)
# our fake dataset
dataset = pd.DataFrame([{"col1": 2, "col2": 4}, {"col1": 5, "col2": 10}])
async def run_with_config():
"""Run a pipeline with a config file"""
# load pipeline.yml in this directory
config_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "./pipeline.yml"
)
# Grab the last result from the pipeline, should be our entity extraction
tables = []
async for table in run_pipeline_with_config(
config_or_path=config_path,
dataset=dataset,
additional_workflows=custom_workflows,
):
tables.append(table)
pipeline_result = tables[-1]
if pipeline_result.result is not None:
# Should look something like this:
# col1 col2 col_1_multiplied
# 0 2 4 8
# 1 5 10 50
print(pipeline_result.result)
else:
print("No results!")
async def run_python():
"""Run a pipeline using the python API"""
# Define the actual workflows to be run, this is identical to the python api
# but we're defining the workflows to be run via python instead of via a config file
workflows: list[PipelineWorkflowReference] = [
# run my_workflow against the dataset, notice we're only using the "my_workflow" workflow
# and not the "my_unused_workflow" workflow
PipelineWorkflowReference(
name="my_workflow", # should match the name of the workflow in the custom_workflows dict above
config={ # pass in a config
# set the derive_output_column to be "col_1_multiplied", this will be passed to the workflow definition above
"derive_output_column": "col_1_multiplied"
},
),
]
# Grab the last result from the pipeline, should be our entity extraction
tables = []
async for table in run_pipeline(
workflows, dataset=dataset, additional_workflows=custom_workflows
):
tables.append(table)
pipeline_result = tables[-1]
if pipeline_result.result is not None:
# Should look something like this:
# col1 col2 col_1_multiplied
# 0 2 4 8
# 1 5 10 50
print(pipeline_result.result)
else:
print("No results!")
if __name__ == "__main__":
asyncio.run(run_python())
asyncio.run(run_with_config())

View File

@ -1,2 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

View File

@ -1,2 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

View File

@ -1,16 +0,0 @@
workflows:
- name: "entity_extraction"
config:
entity_extract:
strategy:
type: "graph_intelligence"
llm:
type: "openai_chat"
# create a .env file in the same directory as this pipeline.yml file
# end add the following lines to it:
# EXAMPLE_OPENAI_API_KEY="YOUR_API_KEY"
api_key: !ENV ${EXAMPLE_OPENAI_API_KEY:None} # None is the default
model: !ENV ${EXAMPLE_OPENAI_MODEL:gpt-3.5-turbo} # gpt-3.5-turbo is the default
max_tokens: !ENV ${EXAMPLE_OPENAI_MAX_TOKENS:2500} # 2500 is the default
temperature: !ENV ${EXAMPLE_OPENAI_TEMPERATURE:0} # 0 is the default

View File

@ -1,111 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
from graphrag.index import run_pipeline, run_pipeline_with_config
from graphrag.index.config import PipelineCSVInputConfig, PipelineWorkflowReference
from graphrag.index.input import load_input
sample_data_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "../../_sample_data/"
)
shared_dataset = asyncio.run(
load_input(
PipelineCSVInputConfig(
file_pattern=".*\\.csv$",
base_dir=sample_data_dir,
source_column="author",
text_column="message",
timestamp_column="date(yyyyMMddHHmmss)",
timestamp_format="%Y%m%d%H%M%S",
title_column="message",
),
)
)
async def run_with_config():
"""Run a pipeline with a config file"""
# We're cheap, and this is an example, lets just do 10
dataset = shared_dataset.head(10)
# load pipeline.yml in this directory
config_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "./pipeline.yml"
)
# Grab the last result from the pipeline, should be our entity extraction
tables = []
async for table in run_pipeline_with_config(
config_or_path=config_path, dataset=dataset
):
tables.append(table)
pipeline_result = tables[-1]
# Print the entities. This will be a row for each text unit, each with a list of entities,
# This should look pretty close to the python version, but since we're using an LLM
# it will be a little different depending on how it feels about the text
if pipeline_result.result is not None:
print(pipeline_result.result["entities"].to_list())
else:
print("No results!")
async def run_python():
if (
"EXAMPLE_OPENAI_API_KEY" not in os.environ
and "OPENAI_API_KEY" not in os.environ
):
msg = "Please set EXAMPLE_OPENAI_API_KEY or OPENAI_API_KEY environment variable to run this example"
raise Exception(msg)
# We're cheap, and this is an example, lets just do 10
dataset = shared_dataset.head(10)
workflows: list[PipelineWorkflowReference] = [
PipelineWorkflowReference(
name="entity_extraction",
config={
"entity_extract": {
"strategy": {
"type": "graph_intelligence",
"llm": {
"type": "openai_chat",
"api_key": os.environ.get(
"EXAMPLE_OPENAI_API_KEY",
os.environ.get("OPENAI_API_KEY", None),
),
"model": os.environ.get(
"EXAMPLE_OPENAI_MODEL", "gpt-3.5-turbo"
),
"max_tokens": os.environ.get(
"EXAMPLE_OPENAI_MAX_TOKENS", 2500
),
"temperature": os.environ.get(
"EXAMPLE_OPENAI_TEMPERATURE", 0
),
},
}
}
},
)
]
# Grab the last result from the pipeline, should be our entity extraction
tables = []
async for table in run_pipeline(dataset=dataset, workflows=workflows):
tables.append(table)
pipeline_result = tables[-1]
# Print the entities. This will be a row for each text unit, each with a list of entities
if pipeline_result.result is not None:
print(pipeline_result.result["entities"].to_list())
else:
print("No results!")
if __name__ == "__main__":
asyncio.run(run_python())
asyncio.run(run_with_config())

View File

@ -1,2 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

View File

@ -1,6 +0,0 @@
workflows:
- name: "entity_extraction"
config:
entity_extract:
strategy:
type: "nltk"

View File

@ -1,78 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
from graphrag.index import run_pipeline, run_pipeline_with_config
from graphrag.index.config import PipelineCSVInputConfig, PipelineWorkflowReference
from graphrag.index.input import load_input
sample_data_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "../../_sample_data/"
)
shared_dataset = asyncio.run(
load_input(
PipelineCSVInputConfig(
file_pattern=".*\\.csv$",
base_dir=sample_data_dir,
source_column="author",
text_column="message",
timestamp_column="date(yyyyMMddHHmmss)",
timestamp_format="%Y%m%d%H%M%S",
title_column="message",
),
)
)
async def run_with_config():
"""Run a pipeline with a config file"""
# We're cheap, and this is an example, lets just do 10
dataset = shared_dataset.head(10)
# load pipeline.yml in this directory
config_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "./pipeline.yml"
)
# Grab the last result from the pipeline, should be our entity extraction
tables = []
async for table in run_pipeline_with_config(
config_or_path=config_path, dataset=dataset
):
tables.append(table)
pipeline_result = tables[-1]
# Print the entities. This will be a row for each text unit, each with a list of entities
if pipeline_result.result is not None:
print(pipeline_result.result["entities"].to_list())
else:
print("No results!")
async def run_python():
dataset = shared_dataset.head(10)
workflows: list[PipelineWorkflowReference] = [
PipelineWorkflowReference(
name="entity_extraction",
config={"entity_extract": {"strategy": {"type": "nltk"}}},
)
]
# Grab the last result from the pipeline, should be our entity extraction
tables = []
async for table in run_pipeline(dataset=dataset, workflows=workflows):
tables.append(table)
pipeline_result = tables[-1]
# Print the entities. This will be a row for each text unit, each with a list of entities
if pipeline_result.result is not None:
print(pipeline_result.result["entities"].to_list())
else:
print("No results!")
if __name__ == "__main__":
asyncio.run(run_python())
asyncio.run(run_with_config())

View File

@ -1,2 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

View File

@ -1,23 +0,0 @@
workflows:
- name: aggregate_workflow
steps:
- verb: "aggregate" # https://github.com/microsoft/datashaper/blob/main/python/datashaper/datashaper/verbs/aggregate.py
args:
groupby: "type"
column: "col_multiplied"
to: "aggregated_output"
operation: "sum"
input:
source: "workflow:derive_workflow" # reference the derive_workflow, cause this one requires that one to run first
# Notice, these are out of order, the indexing engine will figure out the right order to run them in
- name: derive_workflow
steps:
- verb: "derive" # https://github.com/microsoft/datashaper/blob/main/python/datashaper/datashaper/verbs/derive.py
args:
column1: "col1" # from above
column2: "col2" # from above
to: "col_multiplied" # new column name
operator: "*" # multiply the two columns,
# Since we're trying to act on the dataset, we don't need explicitly to specify an input
# "input": { "source": "source" } # use the dataset as the input to this verb. This is the default, so you can omit it.

View File

@ -1,102 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
import pandas as pd
from graphrag.index import run_pipeline, run_pipeline_with_config
from graphrag.index.config import PipelineWorkflowReference
# Our fake dataset
dataset = pd.DataFrame([
{"type": "A", "col1": 2, "col2": 4},
{"type": "A", "col1": 5, "col2": 10},
{"type": "A", "col1": 15, "col2": 26},
{"type": "B", "col1": 6, "col2": 15},
])
async def run_with_config():
"""Run a pipeline with a config file"""
# load pipeline.yml in this directory
config_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "./pipeline.yml"
)
tables = []
async for table in run_pipeline_with_config(
config_or_path=config_path, dataset=dataset
):
tables.append(table)
pipeline_result = tables[-1]
if pipeline_result.result is not None:
# Should look something like this, which should be identical to the python example:
# type aggregated_output
# 0 A 448
# 1 B 90
print(pipeline_result.result)
else:
print("No results!")
async def run_python():
workflows: list[PipelineWorkflowReference] = [
PipelineWorkflowReference(
name="aggregate_workflow",
steps=[
{
"verb": "aggregate", # https://github.com/microsoft/datashaper/blob/main/python/datashaper/datashaper/verbs/aggregate.py
"args": {
"groupby": "type",
"column": "col_multiplied",
"to": "aggregated_output",
"operation": "sum",
},
"input": {
"source": "workflow:derive_workflow", # reference the derive_workflow, cause this one requires that one to run first
# Notice, these are out of order, the indexing engine will figure out the right order to run them in
},
}
],
),
PipelineWorkflowReference(
name="derive_workflow",
steps=[
{
# built-in verb
"verb": "derive", # https://github.com/microsoft/datashaper/blob/main/python/datashaper/datashaper/verbs/derive.py
"args": {
"column1": "col1", # from above
"column2": "col2", # from above
"to": "col_multiplied", # new column name
"operator": "*", # multiply the two columns,
},
# Since we're trying to act on the default input, we don't need explicitly to specify an input
}
],
),
]
# Grab the last result from the pipeline, should be our aggregate_workflow since it should be the last one to run
tables = []
async for table in run_pipeline(dataset=dataset, workflows=workflows):
tables.append(table)
pipeline_result = tables[-1]
if pipeline_result.result is not None:
# Should look something like this:
# type aggregated_output
# 0 A 448
# 1 B 90
# This is because we first in "derive_workflow" we multiply col1 and col2 together, then in "aggregate_workflow" we sum them up by type
print(pipeline_result.result)
else:
print("No results!")
if __name__ == "__main__":
asyncio.run(run_python())
asyncio.run(run_with_config())

View File

@ -1,2 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

View File

@ -1,4 +0,0 @@
workflows:
- !include workflows/workflow_1.yml
- !include workflows/workflow_2.yml
- !include workflows/workflow_3.yml

View File

@ -1,43 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
from graphrag.index import run_pipeline_with_config
from graphrag.index.config import PipelineCSVInputConfig
from graphrag.index.input import load_input
sample_data_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "./../_sample_data/"
)
async def run_with_config():
dataset = await load_input(
PipelineCSVInputConfig(
file_pattern=".*\\.csv$",
base_dir=sample_data_dir,
source_column="author",
text_column="message",
timestamp_column="date(yyyyMMddHHmmss)",
timestamp_format="%Y%m%d%H%M%S",
title_column="message",
),
)
# We're cheap, and this is an example, lets just do 10
dataset = dataset.head(2)
# run the pipeline with the config, and override the dataset with the one we just created
# and grab the last result from the pipeline, should be the last workflow that was run (our nodes)
pipeline_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "./pipeline.yml"
)
async for result in run_pipeline_with_config(pipeline_path, dataset=dataset):
print(f"Workflow {result.workflow} result\n: ")
print(result.result)
if __name__ == "__main__":
asyncio.run(run_with_config())

View File

@ -1 +0,0 @@
value_from_shared_file

View File

@ -1,6 +0,0 @@
name: workflow_1
steps:
- verb: fill
args:
to: "col_workflow_1"
value: 1

View File

@ -1,17 +0,0 @@
name: workflow_2
steps:
- verb: fill
args:
to: "col_workflow_2"
value: 2
input:
# workflow_2 is dependent on workflow_1
# so in workflow_2 output, you'll also see the output from workflow_1
source: "workflow:workflow_1"
# Example of pulling in values from a shared file
- verb: fill
args:
to: "col_from_shared_file"
value: !include ./shared/shared_fill_value.txt

View File

@ -1,6 +0,0 @@
name: workflow_3
steps:
- verb: fill
args:
to: "col_workflow_3"
value: 3

View File

@ -1,2 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License

View File

@ -1,64 +0,0 @@
input:
file_type: csv
base_dir: ../../_sample_data
file_pattern: .*\.csv$
source_column: "author"
text_column: "message"
timestamp_column: "date(yyyyMMddHHmmss)"
timestamp_format: "%Y%m%d%H%M%S"
title_column: "message"
# Limit to 10, we're not rich
post_process:
- verb: sample
args:
size: 10
input:
source: source
workflows:
# This workflow reference here is only necessary
# because we want to customize the how the entity_extraction workflow is configured
# otherwise, it can be omitted, but you're stuck with the default configuration for entity_extraction
- name: entity_extraction
config:
entity_extract:
strategy:
type: graph_intelligence
llm:
type: openai_chat
api_key: !ENV ${EXAMPLE_OPENAI_API_KEY}
model: !ENV ${EXAMPLE_OPENAI_MODEL:gpt-3.5-turbo}
max_tokens: !ENV ${EXAMPLE_OPENAI_MAX_TOKENS:2500}
temperature: !ENV ${EXAMPLE_OPENAI_TEMPERATURE:0}
- name: entity_graph
config:
cluster_graph:
strategy:
type: leiden
embed_graph:
strategy:
type: node2vec
num_walks: 10
walk_length: 40
window_size: 2
iterations: 3
random_seed: 597832
layout_graph:
strategy:
type: umap
# This is an anonymous workflow, it doesn't have a name
- steps:
# Unpack the nodes from the graph
- verb: graph.unpack
args:
column: positioned_graph
type: nodes
input:
# This is saying use the output of the entity_graph workflow as the input to this step
source: workflow:entity_graph

View File

@ -1,46 +0,0 @@
workflows:
# This workflow reference here is only necessary
# because we want to customize the how the entity_extraction workflow is configured
# otherwise, it can be omitted, but you're stuck with the default configuration for entity_extraction
- name: entity_extraction
config:
entity_extract:
strategy:
type: graph_intelligence
llm:
type: openai_chat
api_key: !ENV ${EXAMPLE_OPENAI_API_KEY}
model: !ENV ${EXAMPLE_OPENAI_MODEL:gpt-3.5-turbo}
max_tokens: !ENV ${EXAMPLE_OPENAI_MAX_TOKENS:2500}
temperature: !ENV ${EXAMPLE_OPENAI_TEMPERATURE:0}
- name: entity_graph
config:
cluster_graph:
strategy:
type: leiden
embed_graph:
strategy:
type: node2vec
num_walks: 10
walk_length: 40
window_size: 2
iterations: 3
random_seed: 597832
layout_graph:
strategy:
type: umap
# This is an anonymous workflow, it doesn't have a name
- steps:
# Unpack the nodes from the graph
- verb: graph.unpack
args:
column: positioned_graph
type: nodes
input:
# This is saying use the output of the entity_graph workflow as the input to this step
source: workflow:entity_graph

View File

@ -1,40 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
from graphrag.index import run_pipeline_with_config
async def main():
if (
"EXAMPLE_OPENAI_API_KEY" not in os.environ
and "OPENAI_API_KEY" not in os.environ
):
msg = "Please set EXAMPLE_OPENAI_API_KEY or OPENAI_API_KEY environment variable to run this example"
raise Exception(msg)
# run the pipeline with the config, and override the dataset with the one we just created
# and grab the last result from the pipeline, should be our entity extraction
pipeline_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"./pipelines/workflows_and_inputs.yml",
)
# run the pipeline with the config, and override the dataset with the one we just created
# and grab the last result from the pipeline, should be the last workflow that was run (our nodes)
tables = []
async for table in run_pipeline_with_config(pipeline_path):
tables.append(table)
pipeline_result = tables[-1]
# The output will contain a list of positioned nodes
if pipeline_result.result is not None:
top_nodes = pipeline_result.result.head(10)
print("pipeline result", top_nodes)
else:
print("No results!")
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,131 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
from typing import Any
from datashaper import NoopWorkflowCallbacks, Progress
from graphrag.index import run_pipeline_with_config
from graphrag.index.cache import InMemoryCache, PipelineCache
from graphrag.index.storage import MemoryPipelineStorage
async def main():
if (
"EXAMPLE_OPENAI_API_KEY" not in os.environ
and "OPENAI_API_KEY" not in os.environ
):
msg = "Please set EXAMPLE_OPENAI_API_KEY or OPENAI_API_KEY environment variable to run this example"
raise Exception(msg)
# run the pipeline with the config, and override the dataset with the one we just created
# and grab the last result from the pipeline, should be our entity extraction
pipeline_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"./pipelines/workflows_and_inputs.yml",
)
# Create our custom storage
custom_storage = ExampleStorage()
# Create our custom reporter
custom_reporter = ExampleReporter()
# Create our custom cache
custom_cache = ExampleCache()
# run the pipeline with the config, and override the dataset with the one we just created
# and grab the last result from the pipeline, should be the last workflow that was run (our nodes)
pipeline_result = []
async for result in run_pipeline_with_config(
pipeline_path,
storage=custom_storage,
callbacks=custom_reporter,
cache=custom_cache,
):
pipeline_result.append(result)
pipeline_result = pipeline_result[-1]
# The output will contain a list of positioned nodes
if pipeline_result.result is not None:
top_nodes = pipeline_result.result.head(10)
print("pipeline result", top_nodes)
else:
print("No results!")
class ExampleStorage(MemoryPipelineStorage):
"""Example of a custom storage handler"""
async def get(
self, key: str, as_bytes: bool | None = None, encoding: str | None = None
) -> Any:
print(f"ExampleStorage.get {key}")
return await super().get(key, as_bytes)
async def set(
self, key: str, value: str | bytes | None, encoding: str | None = None
) -> None:
print(f"ExampleStorage.set {key}")
return await super().set(key, value)
async def has(self, key: str) -> bool:
print(f"ExampleStorage.has {key}")
return await super().has(key)
async def delete(self, key: str) -> None:
print(f"ExampleStorage.delete {key}")
return await super().delete(key)
async def clear(self) -> None:
print("ExampleStorage.clear")
return await super().clear()
class ExampleCache(InMemoryCache):
"""Example of a custom cache handler"""
async def get(self, key: str) -> Any:
print(f"ExampleCache.get {key}")
return await super().get(key)
async def set(self, key: str, value: Any, debug_data: dict | None = None) -> None:
print(f"ExampleCache.set {key}")
return await super().set(key, value, debug_data)
async def has(self, key: str) -> bool:
print(f"ExampleCache.has {key}")
return await super().has(key)
async def delete(self, key: str) -> None:
print(f"ExampleCache.delete {key}")
return await super().delete(key)
async def clear(self) -> None:
print("ExampleCache.clear")
return await super().clear()
def child(self, name: str) -> PipelineCache:
print(f"ExampleCache.child {name}")
return ExampleCache(name)
class ExampleReporter(NoopWorkflowCallbacks):
"""Example of a custom reporter. This will print out all of the status updates from the pipeline."""
def progress(self, progress: Progress):
print("ExampleReporter.progress: ", progress)
def error(self, message: str, details: dict[str, Any] | None = None):
print("ExampleReporter.error: ", message)
def warning(self, message: str, details: dict[str, Any] | None = None):
print("ExampleReporter.warning: ", message)
def log(self, message: str, details: dict[str, Any] | None = None):
print("ExampleReporter.log: ", message)
if __name__ == "__main__":
asyncio.run(main())

View File

@ -1,59 +0,0 @@
# Copyright (c) 2024 Microsoft Corporation.
# Licensed under the MIT License
import asyncio
import os
from graphrag.index import run_pipeline_with_config
from graphrag.index.config import PipelineCSVInputConfig
from graphrag.index.input import load_input
sample_data_dir = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "../_sample_data/"
)
async def main():
if (
"EXAMPLE_OPENAI_API_KEY" not in os.environ
and "OPENAI_API_KEY" not in os.environ
):
msg = "Please set EXAMPLE_OPENAI_API_KEY or OPENAI_API_KEY environment variable to run this example"
raise Exception(msg)
dataset = await load_input(
PipelineCSVInputConfig(
file_pattern=".*\\.csv$",
base_dir=sample_data_dir,
source_column="author",
text_column="message",
timestamp_column="date(yyyyMMddHHmmss)",
timestamp_format="%Y%m%d%H%M%S",
title_column="message",
),
)
# We're cheap, and this is an example, lets just do 10
dataset = dataset.head(10)
# run the pipeline with the config, and override the dataset with the one we just created
# and grab the last result from the pipeline, should be the last workflow that was run (our nodes)
pipeline_path = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "./pipelines/workflows_only.yml"
)
tables = []
async for table in run_pipeline_with_config(pipeline_path, dataset=dataset):
tables.append(table)
pipeline_result = tables[-1]
# The output will contain a list of positioned nodes
if pipeline_result.result is not None:
top_nodes = pipeline_result.result.head(10)
print(
"pipeline result\ncols: ", pipeline_result.result.columns, "\n", top_nodes
)
else:
print("No results!")
if __name__ == "__main__":
asyncio.run(main())