API Reference
April 29, 2026
Complete reference for all public classes and methods exported by graphrag_sdk.
Table of Contents
- GraphRAG (Facade)
- Connection
- Providers
- Data Models
- Schema
- Ingestion Strategies
- Ingestion Pipeline
- Retrieval Strategies
- Reranking Strategies
- Storage
- Context
- Exceptions
GraphRAG (Facade)
The main entry point. Three primary operations: ingest(), retrieve(), and completion().
from graphrag_sdk import GraphRAG
Constructor
GraphRAG(
    connection: FalkorDBConnection | ConnectionConfig,
    llm: LLMInterface,
    embedder: Embedder,
    schema: GraphSchema | None = None,
    retrieval_strategy: RetrievalStrategy | None = None,
)
| Parameter | Type | Default | Description |
|---|---|---|---|
| connection | FalkorDBConnection \| ConnectionConfig | required | Database connection or config to create one |
| llm | LLMInterface | required | LLM provider |
| embedder | Embedder | required | Embedding provider |
| schema | GraphSchema \| None | None | Schema constraints for extraction (empty = unconstrained) |
| retrieval_strategy | RetrievalStrategy \| None | None | Default retrieval strategy (uses MultiPathRetrieval if None) |
Public attributes: llm, embedder, schema, graph_store, vector_store
ingest()
async def ingest(
    source: str | list[str],
    *,
    text: str | None = None,
    loader: LoaderStrategy | None = None,
    chunker: ChunkingStrategy | None = None,
    extractor: ExtractionStrategy | None = None,
    resolver: ResolutionStrategy | None = None,
    max_concurrent: int = 3,
    ctx: Context | None = None,
) -> IngestionResult | list[IngestionResult]
Build a knowledge graph from one or more sources. Auto-detects loader from file extension. When a list of sources is provided, documents are ingested in parallel with bounded concurrency.
| Parameter | Type | Default | Description |
|---|---|---|---|
| source | str \| list[str] | required | File path (or list of paths) for ingestion |
| text | str \| None | None | Raw text (single source only; skips loader) |
| loader | LoaderStrategy \| None | None | Custom loader (auto-detect if None) |
| chunker | ChunkingStrategy \| None | None | Custom chunker (FixedSizeChunking(1000) if None) |
| extractor | ExtractionStrategy \| None | None | Custom extractor (GraphExtraction if None) |
| resolver | ResolutionStrategy \| None | None | Custom resolver (ExactMatchResolution if None) |
| max_concurrent | int | 3 | Max parallel ingestions (list input only) |
| ctx | Context \| None | None | Execution context |
Returns: IngestionResult for a single source, list[IngestionResult] for multiple sources.
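The bounded concurrency described for list input can be illustrated with a plain asyncio semaphore. This is a minimal sketch of the general technique, not the SDK's implementation: ingest_one and ingest_many are hypothetical stand-ins, and whether the SDK returns results in input order is an assumption made here for illustration.

```python
import asyncio


async def ingest_one(source: str) -> str:
    """Hypothetical stand-in for ingesting a single document."""
    await asyncio.sleep(0.01)  # simulate I/O-bound work (LLM calls, DB writes)
    return f"ingested:{source}"


async def ingest_many(sources: list[str], max_concurrent: int = 3) -> list[str]:
    """Ingest every source, running at most max_concurrent at a time."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def bounded(source: str) -> str:
        async with semaphore:  # waits here while max_concurrent tasks are active
            return await ingest_one(source)

    # gather() returns results in the same order as the input sources
    return await asyncio.gather(*(bounded(s) for s in sources))


results = asyncio.run(
    ingest_many(["a.txt", "b.txt", "c.txt", "d.txt"], max_concurrent=2)
)
print(results)
```

A semaphore keeps the number of in-flight documents fixed regardless of list length, which is usually what matters when each document triggers many LLM and database calls.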
retrieve()
async def retrieve(
    question: str,
    *,
    strategy: RetrievalStrategy | None = None,
    reranker: RerankingStrategy | None = None,
    ctx: Context | None = None,
) -> RetrieverResult
Retrieve context from the knowledge graph without generating an answer. Use this to inspect retrieved context or pass it to your own LLM.
| Parameter | Type | Default | Description |
|---|---|---|---|
| question | str | required | The user's question |
| strategy | RetrievalStrategy \| None | None | Override retrieval strategy |
| reranker | RerankingStrategy \| None | None | Optional reranking after retrieval |
| ctx | Context \| None | None | Execution context |
Returns: RetrieverResult
completion()
async def completion(
    question: str,
    *,
    history: list[ChatMessage | dict[str, str]] | None = None,
    strategy: RetrievalStrategy | None = None,
    reranker: RerankingStrategy | None = None,
    prompt_template: str | None = None,
    return_context: bool = False,
    ctx: Context | None = None,
) -> RagResult
Full RAG pipeline: retrieve context and generate an answer. When history is provided, messages are passed natively to the LLM provider's multi-turn chat API.
| Parameter | Type | Default | Description |
|---|---|---|---|
| question | str | required | The user's question |
| history | list[ChatMessage \| dict] \| None | None | Conversation history (see below) |
| strategy | RetrievalStrategy \| None | None | Override retrieval strategy |
| reranker | RerankingStrategy \| None | None | Optional reranking after retrieval |
| prompt_template | str \| None | None | Custom prompt for single-turn (must contain {context} and {question}). Ignored when history is provided. |
| return_context | bool | False | Include retriever results in output |
| ctx | Context \| None | None | Execution context |
Returns: RagResult
Conversation history:
History accepts a list of ChatMessage objects or plain dicts with role and content keys. Supported roles: "system", "user", "assistant". Invalid roles raise ValueError.
from graphrag_sdk import ChatMessage
# Using ChatMessage objects
result = await rag.completion(
    "What happened next?",
    history=[
        ChatMessage(role="user", content="Who is Alice?"),
        ChatMessage(role="assistant", content="Alice is an engineer."),
    ],
)
# Using plain dicts
result = await rag.completion(
    "Tell me more.",
    history=[
        {"role": "user", "content": "What is Acme?"},
        {"role": "assistant", "content": "A tech company."},
    ],
)
When history is provided, completion() builds a native messages list: [system_prompt, *history, user_question] and calls LLMInterface.ainvoke_messages(). Without history, it uses the single-turn ainvoke() path.
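The message layout above can be sketched in a few lines of plain Python. This is only an illustration under stated assumptions: build_messages is a hypothetical helper, messages are modeled as plain dicts, and how the SDK actually composes the system prompt from retrieved context is not specified here.

```python
VALID_ROLES = {"system", "user", "assistant"}


def build_messages(
    system_prompt: str,
    history: list[dict[str, str]],
    question: str,
) -> list[dict[str, str]]:
    """Return [system_prompt, *history, user_question] as role/content dicts."""
    for message in history:
        role = message.get("role")
        if role not in VALID_ROLES:
            # Mirrors the documented behavior: invalid roles raise ValueError
            raise ValueError(f"Invalid role: {role!r}")
    return [
        {"role": "system", "content": system_prompt},
        *history,
        {"role": "user", "content": question},
    ]


messages = build_messages(
    "Answer using the retrieved context.",  # assumed system prompt text
    [
        {"role": "user", "content": "Who is Alice?"},
        {"role": "assistant", "content": "Alice is an engineer."},
    ],
    "What happened next?",
)
print([m["role"] for m in messages])
```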
query() (deprecated)
async def query(question: str, **kwargs) -> RagResult
Deprecated. Use completion() for the full RAG pipeline or retrieve() for retrieval-only. Emits a DeprecationWarning and delegates to completion().
deduplicate_entities()
async def deduplicate_entities(
    *,
    fuzzy: bool = False,
    similarity_threshold: float = 0.9,
    batch_size: int = 500,
) -> int
Post-ingestion entity deduplication. Groups entities by (normalized name, label) to prevent cross-type merging (e.g. Person "Paris" and Location "Paris" stay separate).
- Phase 1 (always): Exact name match -- keeps longest description, remaps RELATES and MENTIONED_IN edges, deletes duplicates.
- Phase 2 (optional, fuzzy=True): Embedding-based -- embeds entity names, finds near-duplicates by cosine similarity.
Call once after all documents are ingested.
Returns: Number of duplicate entities merged.
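The Phase 1 grouping rule can be sketched in memory as follows. This is not the SDK's code: entities are modeled as plain dicts, plan_exact_merges is a hypothetical helper, and the normalization (lowercase, collapsed whitespace) is an assumption. The real method also remaps edges in the graph, which is omitted here.

```python
from collections import defaultdict


def normalize(name: str) -> str:
    """Assumed normalization: lowercase and collapse whitespace."""
    return " ".join(name.lower().split())


def plan_exact_merges(entities: list[dict]) -> dict[str, str]:
    """Map each duplicate entity id to the id of the entity that survives."""
    groups = defaultdict(list)
    for entity in entities:
        # Grouping by (name, label) keeps Person "Paris" apart from Location "Paris"
        groups[(normalize(entity["name"]), entity["label"])].append(entity)

    remap: dict[str, str] = {}
    for group in groups.values():
        # Keep the entity with the longest description
        keeper = max(group, key=lambda e: len(e.get("description", "")))
        for entity in group:
            if entity is not keeper:
                remap[entity["id"]] = keeper["id"]
    return remap


entities = [
    {"id": "e1", "name": "Paris", "label": "Person", "description": "A person."},
    {"id": "e2", "name": " paris ", "label": "Person",
     "description": "A person named Paris who lives in Lyon."},
    {"id": "e3", "name": "Paris", "label": "Location",
     "description": "Capital of France."},
]
remap = plan_exact_merges(entities)
print(remap)
```

The returned mapping is the information needed to rewrite edge endpoints before deleting the duplicates.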
finalize()
async def finalize() -> dict[str, Any]
Run all post-ingestion steps after all documents are ingested. Bundles:
- deduplicate_entities() -- global exact-name dedup
- backfill_entity_embeddings() -- name-only embeddings
- embed_relationships() -- fact text embeddings on RELATES edges
- ensure_indices() -- all indexes
Returns: Dict with counts: entities_deduplicated, entities_embedded, relationships_embedded, indexes.
Sync Wrappers
def retrieve_sync(question: str, **kwargs) -> RetrieverResult
def completion_sync(question: str, **kwargs) -> RagResult
def ingest_sync(source: str | list[str], **kwargs) -> IngestionResult | list[IngestionResult]
def finalize_sync() -> dict[str, Any]
def query_sync(question: str, **kwargs) -> RagResult # deprecated
Convenience methods that run the async versions with asyncio.run().
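The wrapper pattern is simple enough to show in isolation. In this sketch, retrieve and retrieve_sync are hypothetical stand-ins rather than the SDK methods. One consequence of using asyncio.run() is worth knowing: it raises RuntimeError when called from a thread that already has a running event loop, so a wrapper built this way suits scripts but not code that is already async.

```python
import asyncio


async def retrieve(question: str) -> str:
    """Hypothetical async operation standing in for an SDK call."""
    await asyncio.sleep(0)
    return f"context for: {question}"


def retrieve_sync(question: str) -> str:
    """Blocking wrapper: starts an event loop, runs the coroutine, closes the loop."""
    return asyncio.run(retrieve(question))


print(retrieve_sync("Who is Alice?"))
```

Inside an async framework or a notebook with a running loop, awaiting the async method directly avoids that error.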
Connection
from graphrag_sdk import ConnectionConfig, FalkorDBConnection
ConnectionConfig
@dataclass
class ConnectionConfig:
    host: str = "localhost"
    port: int = 6379
    username: str | None = None
    password: str | None = None
    graph_name: str = "knowledge_graph"
    max_connections: int = 16
    retry_count: int = 3
    retry_delay: float = 1.0
    query_timeout_ms: int = 10_000
    pool_timeout: int = 30
FalkorDBConnection
FalkorDBConnection(config: ConnectionConfig | None = None)
| Method/Property | Description |
|---|---|
| await conn.query(cypher, params=None, timeout=None) | Execute a Cypher query |
| await conn.close() | Close the connection pool |
| conn.graph | Lazy property returning the AsyncGraph handle |
Providers
from graphrag_sdk import LLMInterface, Embedder, LLMBatchItem
from graphrag_sdk import LiteLLM, LiteLLMEmbedder, OpenRouterLLM, OpenRouterEmbedder
LLMInterface (ABC)
LLMInterface(model_name: str, model_params: dict | None = None, max_concurrency: int = 12)
| Method | Signature | Description |
|---|---|---|
| invoke | (prompt: str, **kwargs) -> LLMResponse | Sync text generation (abstract) |
| ainvoke | (prompt: str, *, max_retries=3, **kwargs) -> LLMResponse | Async with retry + backoff |
| ainvoke_messages | (messages: list[ChatMessage], *, max_retries=3, **kwargs) -> LLMResponse | Multi-turn native messages (see below) |
| invoke_with_model | (prompt: str, response_model: Type[BaseModel], **kwargs) -> BaseModel | Structured output |
| ainvoke_with_model | (prompt: str, response_model: Type[BaseModel], *, max_retries=3) -> BaseModel | Async structured output |
| abatch_invoke | (prompts: list[str], *, max_concurrency=None, max_retries=3) -> list[LLMBatchItem] | Concurrent batch |
ainvoke_messages() is used by completion() when conversation history is provided. The default implementation concatenates messages into a single prompt string and calls ainvoke(), so custom providers work without changes. LiteLLM and OpenRouterLLM override this with native multi-turn implementations.
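The fallback behavior can be pictured with a toy provider. This is a sketch under assumptions, not the SDK's default implementation: EchoLLM and messages_to_prompt are hypothetical, messages are plain dicts, and the "role: content" line format used for concatenation is a guess at one reasonable layout.

```python
import asyncio


def messages_to_prompt(messages: list[dict[str, str]]) -> str:
    """Flatten a message list into one prompt string (assumed format)."""
    return "\n".join(f"{m['role']}: {m['content']}" for m in messages)


class EchoLLM:
    """Hypothetical provider that only implements single-turn generation."""

    async def ainvoke(self, prompt: str) -> str:
        return prompt.upper()

    async def ainvoke_messages(self, messages: list[dict[str, str]]) -> str:
        # Fallback path: concatenate, then reuse the single-turn method
        return await self.ainvoke(messages_to_prompt(messages))


reply = asyncio.run(
    EchoLLM().ainvoke_messages(
        [
            {"role": "user", "content": "hi"},
            {"role": "assistant", "content": "hello"},
        ]
    )
)
print(reply)
```

A provider with a native chat endpoint would override ainvoke_messages and pass the list through unchanged instead of flattening it.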
Embedder (ABC)
| Method | Signature | Description |
|---|---|---|
| model_name | @property -> str | Embedding model identifier (abstract) |
| embed_query | (text: str, **kwargs) -> list[float] | Single text embedding (abstract) |
| aembed_query | (text: str, **kwargs) -> list[float] | Async single (default: thread pool) |
| embed_documents | (texts: list[str], **kwargs) -> list[list[float]] | Batch (default: sequential) |
| aembed_documents | (texts: list[str], **kwargs) -> list[list[float]] | Async batch (default: thread pool) |
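The documented defaults (sequential batch, thread pool for the async variants) suggest that a custom embedder only needs to supply the abstract members. The sketch below mimics that shape with a stand-in base class; SketchEmbedder and LengthEmbedder are hypothetical, and asyncio.to_thread is assumed as the thread-pool mechanism rather than taken from the SDK source.

```python
import asyncio
from abc import ABC, abstractmethod


class SketchEmbedder(ABC):
    """Stand-in base class mirroring the documented default behaviors."""

    @abstractmethod
    def embed_query(self, text: str) -> list[float]: ...

    async def aembed_query(self, text: str) -> list[float]:
        # Default: run the blocking call in a worker thread
        return await asyncio.to_thread(self.embed_query, text)

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        # Default: embed one text after another
        return [self.embed_query(text) for text in texts]

    async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
        return await asyncio.to_thread(self.embed_documents, texts)


class LengthEmbedder(SketchEmbedder):
    """Toy embedder: [character count, space count]. Not a real model."""

    def embed_query(self, text: str) -> list[float]:
        return [float(len(text)), float(text.count(" "))]


vectors = asyncio.run(LengthEmbedder().aembed_documents(["a b", "abc d e"]))
print(vectors)
```

Providers with a true batch endpoint would typically override embed_documents to send one request instead of looping.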
LLMBatchItem
@dataclass
class LLMBatchItem:
    index: int
    response: LLMResponse | None = None
    error: Exception | None = None

    @property
    def ok(self) -> bool: ...  # True if response is not None
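Because each batch item carries either a response or an error, callers can separate successes from failures by checking ok. The sketch below uses a stand-in dataclass with a plain string response; BatchItem and split_batch are hypothetical names for illustration only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class BatchItem:
    """Stand-in with the same shape as LLMBatchItem (response simplified to str)."""

    index: int
    response: Optional[str] = None
    error: Optional[Exception] = None

    @property
    def ok(self) -> bool:
        return self.response is not None


def split_batch(items: list[BatchItem]) -> tuple[dict[int, str], list[int]]:
    """Return (answers keyed by prompt index, indices of failed prompts)."""
    answers = {item.index: item.response for item in items if item.ok}
    failed = [item.index for item in items if not item.ok]
    return answers, failed


items = [
    BatchItem(0, response="Paris"),
    BatchItem(1, error=TimeoutError("timed out")),
    BatchItem(2, response="Berlin"),
]
answers, failed = split_batch(items)
print(answers, failed)
```

Keeping the index on each item lets failed prompts be retried without losing their position in the original list.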
LiteLLM
LiteLLM(model: str, *, api_key=None, api_base=None, api_version=None, temperature=0.0, max_tokens=None, **kwargs)
LiteLLMEmbedder
LiteLLMEmbedder(model: str, *, api_key=None, api_base=None, api_version=None, **kwargs)
OpenRouterLLM
OpenRouterLLM(model: str, *, api_key=None, temperature=0.0, max_tokens=None, extra_headers=None)
OpenRouterEmbedder
OpenRouterEmbedder(model: str, *, api_key=None, extra_headers=None)
Data Models
All models extend DataModel (Pydantic BaseModel with extra="allow").
from graphrag_sdk import GraphNode, GraphRelationship, GraphData
from graphrag_sdk import TextChunk, TextChunks
from graphrag_sdk import DocumentInfo, DocumentOutput
from graphrag_sdk import IngestionResult, RagResult
from graphrag_sdk import RetrieverResult, RetrieverResultItem
from graphrag_sdk import ResolutionResult, SearchType
GraphNode
class GraphNode(DataModel):
    id: str  # Unique identifier
    label: str  # Node label (Person, Place, etc.)
    properties: dict[str, Any] = {}  # Key-value properties
    embedding_properties: dict[str, list[float]] | None = None
GraphRelationship
class GraphRelationship(DataModel):
    start_node_id: str
    end_node_id: str
    type: str  # Edge type (RELATES for all extracted rels)
    properties: dict[str, Any] = {}  # Includes rel_type, fact, keywords, etc.
    embedding_properties: dict[str, list[float]] | None = None
GraphData
class GraphData(DataModel):
    nodes: list[GraphNode] = []
    relationships: list[GraphRelationship] = []
TextChunk
class TextChunk(DataModel):
    text: str
    index: int
    metadata: dict[str, Any] = {}
    uid: str  # Auto-generated UUID
TextChunks
class TextChunks(DataModel):
    chunks: list[TextChunk] = []
DocumentInfo
class DocumentInfo(DataModel):
    path: str | None = None
    uid: str  # Auto-generated UUID
    metadata: dict[str, Any] = {}
DocumentOutput
class DocumentOutput(DataModel):
    text: str
    document_info: DocumentInfo = DocumentInfo()
IngestionResult
class IngestionResult(DataModel):
    document_info: DocumentInfo = DocumentInfo()
    nodes_created: int = 0
    relationships_created: int = 0
    chunks_indexed: int = 0
    metadata: dict[str, Any] = {}
RagResult
class RagResult(DataModel):
    answer: str
    retriever_result: RetrieverResult | None = None  # Populated when return_context=True
    metadata: dict[str, Any] = {}  # Contains model, num_context_items, strategy
RetrieverResult
class RetrieverResult(DataModel):
    items: list[RetrieverResultItem] = []
    metadata: dict[str, Any] = {}
RetrieverResultItem
class RetrieverResultItem(DataModel):
    content: str
    metadata: dict[str, Any] = {}
    score: float | None = None
ResolutionResult
class ResolutionResult(DataModel):
    nodes: list[GraphNode] = []
    relationships: list[GraphRelationship] = []
    merged_count: int = 0
ChatMessage
from graphrag_sdk import ChatMessage
class ChatMessage(DataModel):
    role: Literal["system", "user", "assistant"]
    content: str

    def to_dict(self) -> dict[str, str]: ...  # {"role": ..., "content": ...}
Validated message type for multi-turn conversations. Used by completion(history=...) and LLMInterface.ainvoke_messages(). Invalid roles raise a validation error on construction.
LLMMessage is a backward-compatible alias for ChatMessage.
LLMResponse
class LLMResponse(DataModel):
    content: str
    tool_calls: list[dict[str, Any]] | None = None
SearchType
class SearchType(str, Enum):
    VECTOR = "vector"
    FULLTEXT = "fulltext"
    HYBRID = "hybrid"
Extraction Models
class ExtractedEntity(DataModel):
    name: str
    type: str
    description: str = ""
    source_chunk_ids: list[str] = []

class ExtractedRelation(DataModel):
    source: str
    target: str
    type: str
    keywords: str = ""
    description: str = ""
    weight: float = 1.0
    source_chunk_ids: list[str] = []

class EntityMention(DataModel):
    chunk_id: str
    entity_id: str

class ExtractionOutput(DataModel):
    entities: list[ExtractedEntity] = []
    relations: list[ExtractedRelation] = []
    mentions: list[EntityMention] = []
compute_entity_id()
def compute_entity_id(name: str, entity_type: str = "") -> str
Deterministic entity ID from normalized name and optional type. When entity_type is provided, appends a __type suffix to prevent cross-type collisions (e.g. paris__person vs paris__location). Without entity_type, returns just the normalized name for backwards compatibility.
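A rough sketch of how such an ID could be derived is shown below. It reproduces the paris__person example, but the function name sketch_entity_id and the exact normalization steps (trim, lowercase, collapse whitespace) are assumptions; the SDK's real normalization may differ.

```python
def sketch_entity_id(name: str, entity_type: str = "") -> str:
    """Illustrative stand-in for compute_entity_id (normalization is assumed)."""
    normalized = " ".join(name.strip().lower().split())
    if entity_type:
        # The type suffix keeps same-named entities of different types apart
        return f"{normalized}__{entity_type.strip().lower()}"
    return normalized


print(sketch_entity_id("Paris", "Person"))
print(sketch_entity_id("Paris", "Location"))
print(sketch_entity_id("  Paris "))
```

Because the ID depends only on the name and type, the same entity extracted from different chunks or documents resolves to the same node.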
Schema
from graphrag_sdk import GraphSchema, EntityType, RelationType
EntityType
class EntityType(DataModel):
    label: str  # e.g. "Person"
    description: str | None = None  # Helps LLM understand what to extract
    properties: list[PropertyType] = []  # Optional property definitions
RelationType
class RelationType(DataModel):
    label: str  # e.g. "WORKS_AT"
    description: str | None = None
    patterns: list[tuple[str, str]] = []  # Allowed (source_label, target_label) pairs
PropertyType
class PropertyType(DataModel):
    name: str
    type: str = "STRING"  # STRING, INTEGER, FLOAT, BOOLEAN, DATE, LIST
    description: str | None = None
    required: bool = False
GraphSchema
class GraphSchema(DataModel):
    entities: list[EntityType] = []
    relations: list[RelationType] = []
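The patterns field lists the allowed (source_label, target_label) pairs for a relation. One way such a constraint could be checked is sketched below with a stand-in dataclass; RelationRule and is_allowed are hypothetical, and treating an empty pattern list as "any pair allowed" is an assumption, not documented behavior.

```python
from dataclasses import dataclass, field


@dataclass
class RelationRule:
    """Stand-in with the same label/patterns shape as RelationType."""

    label: str
    patterns: list[tuple[str, str]] = field(default_factory=list)


def is_allowed(rule: RelationRule, source_label: str, target_label: str) -> bool:
    """Check an extracted relation's endpoint labels against the rule."""
    if not rule.patterns:
        return True  # assumption: no patterns means unconstrained
    return (source_label, target_label) in rule.patterns


works_at = RelationRule("WORKS_AT", [("Person", "Company")])
print(is_allowed(works_at, "Person", "Company"))
print(is_allowed(works_at, "Company", "Person"))
```

Patterns are directional, so ("Person", "Company") does not permit the reverse pair.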
Ingestion Strategies
LoaderStrategy (ABC)
class LoaderStrategy(ABC):
    @abstractmethod
    async def load(self, source: str, ctx: Context) -> DocumentOutput: ...
Built-in: TextLoader(encoding="utf-8"), PdfLoader()
ChunkingStrategy (ABC)
class ChunkingStrategy(ABC):
    @abstractmethod
    async def chunk(self, text: str, ctx: Context) -> TextChunks: ...
Built-in: FixedSizeChunking(chunk_size=1000, chunk_overlap=100)
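Fixed-size chunking with overlap is a sliding window over the text. The sketch below shows the general technique with the same parameter names; fixed_size_chunks is a hypothetical function, and measuring size in characters is an assumption (the built-in strategy may count differently).

```python
def fixed_size_chunks(
    text: str, chunk_size: int = 1000, chunk_overlap: int = 100
) -> list[str]:
    """Split text into windows of chunk_size that overlap by chunk_overlap."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far the window advances each time
    chunks: list[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the window already reached the end of the text
    return chunks


chunks = fixed_size_chunks("abcdefghij", chunk_size=4, chunk_overlap=1)
print(chunks)
```

The overlap repeats the tail of each chunk at the head of the next, which helps when a sentence or entity mention straddles a chunk boundary.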
ExtractionStrategy (ABC)
class ExtractionStrategy(ABC):
    @abstractmethod
    async def extract(self, chunks: TextChunks, schema: GraphSchema, ctx: Context) -> GraphData: ...
Built-in:
GraphExtraction(llm, *, entity_extractor=None, coref_resolver=None, entity_types=None, max_concurrency=None)
Entity Extractors (step 1 backends for GraphExtraction):
- GLiNERExtractor(threshold=0.75, model_name="urchade/gliner_medium-v2.1") -- default, local NER
- LLMExtractor(llm, threshold=0.75) -- LLM-based NER
- Subclass EntityExtractor for custom backends
ResolutionStrategy (ABC)
class ResolutionStrategy(ABC):
    @abstractmethod
    async def resolve(self, graph_data: GraphData, ctx: Context) -> ResolutionResult: ...
Built-in:
- ExactMatchResolution(resolve_property="id")
- DescriptionMergeResolution(llm=None, force_summary_threshold=3, max_summary_tokens=500)
Ingestion Pipeline
from graphrag_sdk import IngestionPipeline
IngestionPipeline(
    loader: LoaderStrategy,
    chunker: ChunkingStrategy,
    extractor: ExtractionStrategy,
    resolver: ResolutionStrategy,
    graph_store: GraphStore,
    vector_store: VectorStore,
    schema: GraphSchema | None = None,
    embedder: Embedder | None = None,
)
| Method | Signature | Description |
|---|---|---|
| run | (source, ctx=None, *, text=None, document_info=None) -> IngestionResult | Execute the full 9-step pipeline |
Retrieval Strategies
RetrievalStrategy (ABC)
Uses the Template Method pattern: search() is the public entry point, and subclasses implement the abstract _execute() hook.
class RetrievalStrategy(ABC):
    def __init__(self, graph_store=None, vector_store=None): ...
    async def search(self, query, ctx=None, **kwargs) -> RetrieverResult: ...
    @abstractmethod
    async def _execute(self, query, ctx, **kwargs) -> RawSearchResult: ...
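The Template Method shape can be shown with a self-contained toy. In this sketch the base class owns the public search() flow and a subclass fills in _execute(); SketchStrategy and KeywordStrategy are hypothetical, and the validation and result formatting done inside search() are assumptions about what such a wrapper might do.

```python
import asyncio
from abc import ABC, abstractmethod


class SketchStrategy(ABC):
    """Stand-in base class: fixed search() flow around a subclass hook."""

    async def search(self, query: str) -> dict:
        if not query.strip():
            raise ValueError("query must not be empty")
        raw = await self._execute(query)  # subclass-specific step
        return {
            "items": [{"content": content} for content in raw],
            "metadata": {"strategy": type(self).__name__},
        }

    @abstractmethod
    async def _execute(self, query: str) -> list[str]: ...


class KeywordStrategy(SketchStrategy):
    """Toy strategy: return documents containing any query term."""

    def __init__(self, documents: list[str]):
        self.documents = documents

    async def _execute(self, query: str) -> list[str]:
        terms = query.lower().split()
        return [d for d in self.documents if any(t in d.lower() for t in terms)]


strategy = KeywordStrategy(["Alice works at Acme.", "Bob lives in Paris."])
result = asyncio.run(strategy.search("alice"))
print(result)
```

Callers only ever use search(), so shared behavior stays in one place while each strategy varies only the retrieval step.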
LocalRetrieval
LocalRetrieval(graph_store, vector_store, embedder, top_k=5, include_entities=True)
MultiPathRetrieval
MultiPathRetrieval(
    graph_store, vector_store, embedder, llm,
    *,
    chunk_top_k=15,
    max_entities=30,
    max_relationships=20,
    rel_top_k=15,
    keyword_limit=10,
)
Reranking Strategies
RerankingStrategy (ABC)
class RerankingStrategy(ABC):
    @abstractmethod
    async def rerank(self, query, result: RetrieverResult, ctx: Context) -> RetrieverResult: ...
CosineReranker
CosineReranker(embedder: Embedder, top_k: int = 15)
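Cosine reranking scores each retrieved item by the cosine similarity between its embedding and the query embedding, then keeps the top_k. The sketch below shows that idea on precomputed vectors; cosine and rerank are hypothetical helpers, and the real CosineReranker presumably obtains the vectors from the embedder, which is skipped here.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def rerank(
    query_vector: list[float],
    items: list[tuple[str, list[float]]],
    top_k: int = 15,
) -> list[tuple[str, float]]:
    """Return (content, score) pairs sorted by descending similarity."""
    scored = [(content, cosine(query_vector, vector)) for content, vector in items]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:top_k]


ranked = rerank(
    [1.0, 0.0],
    [("a", [1.0, 0.0]), ("b", [0.0, 1.0]), ("c", [1.0, 1.0])],
    top_k=2,
)
print(ranked)
```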
Storage
from graphrag_sdk import GraphStore, VectorStore
GraphStore
GraphStore(connection: FalkorDBConnection)
| Method | Signature | Description |
|---|---|---|
| upsert_nodes | (nodes: list[GraphNode]) -> int | Batched MERGE, returns count |
| upsert_relationships | (rels: list[GraphRelationship]) -> int | Batched MERGE with label hints |
| get_connected_entities | (chunk_id, max_hops=1) -> list[dict] | N-hop entity traversal |
| query_raw | (cypher, params=None) -> Any | Raw Cypher execution |
| get_statistics | () -> dict | Node/edge counts, types, density |
| delete_all | () -> None | Delete all data |
VectorStore
VectorStore(connection, embedder=None, index_name="chunk_embeddings", embedding_dimension=256, similarity_function="cosine")
| Method | Signature | Description |
|---|---|---|
| create_vector_index | (label="Chunk", property="embedding") -> None | Create vector index |
| create_entity_vector_index | () -> None | Create entity vector index |
| create_fulltext_index | (label="Chunk", *properties) -> None | Create fulltext index |
| ensure_indices | () -> None | Create all standard indices |
| index_chunks | (chunks: TextChunks) -> int | Embed and store chunk vectors |
| backfill_entity_embeddings | () -> int | Embed all entities missing vectors |
| embed_relationships | () -> int | Embed fact text on RELATES edges |
| search | (query_vector, top_k=5, label="Chunk") -> list[dict] | Vector similarity search |
| search_entities | (query_vector, top_k=5) -> list[dict] | Entity vector search |
| search_relationships | (query_vector, top_k=15) -> list[dict] | RELATES edge vector search |
| fulltext_search | (query, top_k=5, label="Chunk") -> list[dict] | Fulltext keyword search |
Context
from graphrag_sdk import Context
Execution context for logging and budget tracking.
Context(tenant_id: str = "default", latency_budget_ms: float = 60000.0)
| Method/Property | Description |
|---|---|
| ctx.log(message, log_level=logging.INFO) | Log a message |
| ctx.budget_exceeded | True if elapsed time > latency_budget_ms |
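A latency budget check of this kind can be built from a monotonic clock. The sketch below is an assumption about the mechanism, not the SDK's Context: SketchContext and its elapsed_ms property are hypothetical, and the clock is assumed to start when the object is created.

```python
import time


class SketchContext:
    """Stand-in execution context that tracks elapsed time against a budget."""

    def __init__(self, tenant_id: str = "default", latency_budget_ms: float = 60000.0):
        self.tenant_id = tenant_id
        self.latency_budget_ms = latency_budget_ms
        self._start = time.monotonic()  # assumed: timing starts at construction

    @property
    def elapsed_ms(self) -> float:
        return (time.monotonic() - self._start) * 1000.0

    @property
    def budget_exceeded(self) -> bool:
        return self.elapsed_ms > self.latency_budget_ms


ctx = SketchContext(latency_budget_ms=1.0)
time.sleep(0.05)  # simulate slow work
if ctx.budget_exceeded:
    print("budget exceeded, skipping optional steps")
```

A long-running strategy could check the flag between stages and skip optional work, such as reranking, once the budget is spent.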
Exceptions
from graphrag_sdk import GraphRAGError
| Exception | When Raised |
|---|---|
| GraphRAGError | Base exception for all SDK errors |
| LoaderError | File loading failures |
| ChunkingError | Text splitting failures |
| ExtractionError | LLM extraction or JSON parsing failures |
| ResolutionError | Entity deduplication failures |
| RetrieverError | Retrieval execution failures |
| DatabaseError | FalkorDB connection or query failures |
| IngestionError | Pipeline orchestration failures |
| SchemaValidationError | Schema constraint violations |