PowerPlatform Dataverse SDK Guide
May 28, 2026 · View on GitHub
Overview
Use the PowerPlatform Dataverse Client Python SDK to interact with Microsoft Dataverse.
Key Concepts
Schema Names vs Display Names
- Standard tables: lowercase (e.g.,
"account","contact") - Custom tables: include customization prefix (e.g.,
"new_Product","cr123_Invoice") - Custom columns: include customization prefix (e.g.,
"new_Price","cr123_Status") - ALWAYS use schema names (logical names), NOT display names
Operation Namespaces
client.records-- CRUD and OData queriesclient.query-- query and search operationsclient.tables-- table metadata, columns, and relationshipsclient.files-- file upload operationsclient.batch-- batch multiple operations into a single HTTP request
Bulk Operations
The SDK supports Dataverse's native bulk operations: Pass lists to create(), update() for automatic bulk processing, for delete(), set use_bulk_delete when passing lists to use bulk operation
Paging
- Control page size with
page_sizeparameter onrecords.list(),records.list_pages(), orQueryBuilder.page_size() - Use
topparameter to limit total records returned - Preferred:
client.query.builder(table)....execute_pages()— composablewhere(col(...))filters, formatted values, expand with nested selects, full pagination control - Simple streaming shortcut:
records.list_pages(table, *, filter, select, top, orderby, expand, page_size, count, include_annotations)— string-based OData filter only, yields oneQueryResultper page execute(by_page=True/False)is deprecated and emitsUserWarning; useexecute_pages()insteadQueryBuilder.to_dataframe()is deprecated; use.execute().to_dataframe()instead
QueryResult
- Returned by
records.list(),records.retrieve(),execute(), and each page fromlist_pages()/execute_pages() - Iterable:
for record in result— each item is adict-likeRecord .to_dataframe()— convert to pandas DataFrame.first()— return the first record orNone(safe: returnsNoneon empty result)result[n]— index access returns aRecord;result[n:m]returns aQueryResultlen(result)— number of records in this result/page
DataFrame Support
- DataFrame operations are accessed via the
client.dataframenamespace:client.dataframe.create(),client.dataframe.update(),client.dataframe.delete()—client.dataframe.get()is deprecated; useclient.query.builder(table).where(...).execute().to_dataframe()instead
Common Operations
Import
from azure.identity import (
InteractiveBrowserCredential,
ClientSecretCredential,
CertificateCredential,
AzureCliCredential
)
from PowerPlatform.Dataverse.client import DataverseClient
Client Initialization
# Development options
credential = InteractiveBrowserCredential()
credential = AzureCliCredential()
# Production options
credential = ClientSecretCredential(tenant_id, client_id, client_secret)
credential = CertificateCredential(tenant_id, client_id, cert_path)
# Create client with context manager (recommended -- enables HTTP connection pooling)
# No trailing slash on URL!
with DataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
... # all operations here
# Session closed, caches cleared automatically
# Or without context manager:
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)
CRUD Operations
Create Records
# Single record
account_id = client.records.create("account", {"name": "Contoso Ltd", "telephone1": "555-0100"})
# Bulk create (uses CreateMultiple API automatically)
contacts = [
{"firstname": "John", "lastname": "Doe"},
{"firstname": "Jane", "lastname": "Smith"}
]
contact_ids = client.records.create("contact", contacts)
Read Records
# Get single record by ID
account = client.records.retrieve("account", account_id, select=["name", "telephone1"])
# With expand — fetch a related record in the same HTTP request
account = client.records.retrieve(
"account", account_id,
select=["name"],
expand=["primarycontactid"],
)
contact = (account.get("primarycontactid") or {})
print(contact.get("fullname"))
# Simple shortcut — use records.list() only for basic filter + select without composable logic.
# Follows @odata.nextLink automatically and loads all matching records into memory.
# For filtering, sorting, expansion, or formatted values, prefer client.query.builder() (see below).
result = client.records.list("account", filter="statecode eq 0", select=["name", "accountid"])
for record in result:
print(record["name"])
Query Builder (Preferred for Filtering, Sorting, Expand, Formatted Values)
Use client.query.builder() for any query that goes beyond simple filter + select. It provides composable where(col(...)) expressions, formatted value support, nested expansion, and streaming — all with a fluent API.
from PowerPlatform.Dataverse.models.filters import col
from PowerPlatform.Dataverse.models.query_builder import ExpandOption
# Basic query with composable filter and sort
result = (client.query.builder("account")
.select("accountid", "name", "statecode")
.where(col("statecode") == 0)
.order_by("name asc")
.execute())
for record in result:
print(record["name"])
# Composable filters — AND / OR / NOT using Python operators
result = (client.query.builder("contact")
.select("fullname", "emailaddress1")
.where((col("statecode") == 0) & (col("emailaddress1").contains("@contoso.com")))
.execute())
# Formatted values — display labels for option sets, currency symbols, etc.
result = (client.query.builder("account")
.select("accountid", "name", "industrycode")
.where(col("statecode") == 0)
.include_formatted_values()
.execute())
for record in result:
label = record.get("industrycode@OData.Community.Display.V1.FormattedValue")
print(record["name"], label)
# Navigation property expansion with nested column select
result = (client.query.builder("account")
.select("name")
.expand(ExpandOption("primarycontactid").select("fullname", "emailaddress1"))
.where(col("statecode") == 0)
.execute())
for record in result:
contact = record.get("primarycontactid", {})
print(f"{record['name']} - {contact.get('fullname', 'N/A')}")
# Stream large result sets page-by-page (memory-efficient)
for page in (client.query.builder("account")
.select("accountid", "name")
.where(col("statecode") == 0)
.order_by("name asc")
.page_size(500)
.execute_pages()):
for record in page:
print(record["name"])
# Convert query results to a DataFrame
df = (client.query.builder("account")
.select("accountid", "name")
.where(col("statecode") == 0)
.execute()
.to_dataframe())
# Limit total results
result = client.query.builder("account").select("name").top(100).execute()
# Simple streaming shortcut via records.list_pages() (string filter only, same params as records.list())
for page in client.records.list_pages("account", filter="statecode eq 0", select=["name"], page_size=500):
for record in page:
print(record["name"])
Create Records with Lookup Bindings (@odata.bind)
# Set lookup fields using @odata.bind with PascalCase navigation property names
# CORRECT: use the navigation property name (case-sensitive, must match $metadata)
guid = client.records.create("new_ticket", {
"new_name": "TKT-001",
"new_CustomerId@odata.bind": f"/new_customers({customer_id})",
"new_AgentId@odata.bind": f"/new_agents({agent_id})",
})
# WRONG: lowercase navigation property causes 400 error
# "new_customerid@odata.bind" -> ODataException: undeclared property 'new_customerid'
Update Records
# Single update
client.records.update("account", account_id, {"telephone1": "555-0200"})
# Bulk update (broadcast same change to multiple records)
client.records.update("account", [id1, id2, id3], {"industry": "Technology"})
Upsert Records
Creates or updates records identified by alternate keys. Single item -> PATCH; multiple items -> UpsertMultiple bulk action.
Prerequisite: The table must have an alternate key configured in Dataverse for the columns used in
alternate_key. Without it, Dataverse will reject the request with a 400 error.
from PowerPlatform.Dataverse.models.upsert import UpsertItem
# Single upsert
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001"},
record={"name": "Contoso Ltd", "telephone1": "555-0100"},
)
])
# Bulk upsert (uses UpsertMultiple API automatically)
client.records.upsert("account", [
UpsertItem(alternate_key={"accountnumber": "ACC-001"}, record={"name": "Contoso Ltd"}),
UpsertItem(alternate_key={"accountnumber": "ACC-002"}, record={"name": "Fabrikam Inc"}),
])
# Composite alternate key
client.records.upsert("account", [
UpsertItem(
alternate_key={"accountnumber": "ACC-001", "address1_postalcode": "98052"},
record={"name": "Contoso Ltd"},
)
])
# Plain dict syntax (no import needed)
client.records.upsert("account", [
{"alternate_key": {"accountnumber": "ACC-001"}, "record": {"name": "Contoso Ltd"}}
])
Delete Records
# Single delete
client.records.delete("account", account_id)
# Bulk delete (uses BulkDelete API)
client.records.delete("account", [id1, id2, id3], use_bulk_delete=True)
DataFrame Operations
The SDK provides DataFrame wrappers for all CRUD operations via the client.dataframe namespace, using pandas DataFrames and Series as input/output.
Note:
client.dataframe.get()is deprecated. Useclient.query.builder(table).select(...).where(...).execute().to_dataframe()instead.QueryBuilder.to_dataframe()(without.execute()) is also deprecated — always call.execute()first.
import pandas as pd
# Query records -- returns a single DataFrame (GA pattern: .execute().to_dataframe())
from PowerPlatform.Dataverse.models.filters import col
df = client.query.builder("account").where(col("statecode") == 0).select("name").execute().to_dataframe()
print(f"Got {len(df)} rows")
# Limit results with top
df = client.query.builder("account").select("name").top(100).execute().to_dataframe()
# Via records.list() (simpler for basic queries)
df = client.records.list("account", filter="statecode eq 0", select=["name"]).to_dataframe()
# Create records from a DataFrame (returns a Series of GUIDs)
new_accounts = pd.DataFrame([
{"name": "Contoso", "telephone1": "555-0100"},
{"name": "Fabrikam", "telephone1": "555-0200"},
])
new_accounts["accountid"] = client.dataframe.create("account", new_accounts)
# Update records from a DataFrame (id_column identifies the GUID column)
new_accounts["telephone1"] = ["555-0199", "555-0299"]
client.dataframe.update("account", new_accounts, id_column="accountid")
# Clear a field by setting clear_nulls=True (by default, NaN/None fields are skipped)
df = pd.DataFrame([{"accountid": "guid-1", "websiteurl": None}])
client.dataframe.update("account", df, id_column="accountid", clear_nulls=True)
# Delete records by passing a Series of GUIDs
client.dataframe.delete("account", new_accounts["accountid"])
SQL Queries
SQL queries are read-only and support limited SQL syntax. A single SELECT statement with optional WHERE, TOP (integer literal), ORDER BY (column names only), and a simple table alias after FROM is supported. But JOIN and subqueries may not be. Refer to the Dataverse documentation for the current feature set.
results = client.query.sql(
"SELECT TOP 10 accountid, name FROM account WHERE statecode = 0"
)
for record in results:
print(record["name"])
FetchXML Queries
client.query.fetchxml(xml) returns an inert FetchXmlQuery object — no HTTP request is made until .execute() or .execute_pages() is called.
xml = """
<fetch top="50">
<entity name="account">
<attribute name="accountid" />
<attribute name="name" />
<filter>
<condition attribute="statecode" operator="eq" value="0" />
</filter>
</entity>
</fetch>
"""
# Load all results into memory (simple, small-to-medium sets)
query = client.query.fetchxml(xml)
result = query.execute() # returns QueryResult — all pages fetched upfront
for record in result:
print(record["name"])
# Stream page-by-page (large sets or early exit)
for page in query.execute_pages(): # yields one QueryResult per HTTP page
process(page.to_dataframe())
Table Management
Create Custom Tables
# Create table with columns (include customization prefix!)
table_info = client.tables.create(
"new_Product",
{
"new_Code": "string",
"new_Price": "decimal",
"new_Active": "bool",
"new_Quantity": "int",
},
)
# With solution assignment and custom primary column
table_info = client.tables.create(
"new_Product",
{"new_Code": "string", "new_Price": "decimal"},
solution="MyPublisher",
primary_column="new_ProductCode",
)
Supported Column Types
Types on the same line map to the same exact format under the hood
"string"or"text"- Single line of text"memo"or"multiline"- Multiple lines of text (4000 character default)"int"or"integer"- Whole number"decimal"or"money"- Decimal number"float"or"double"- Floating point number"bool"or"boolean"- Yes/No"datetime"or"date"- Date"file"- File column- Enum subclass - Local option set (picklist)
Manage Columns
# Add columns to existing table (must include customization prefix!)
client.tables.add_columns("new_Product", {
"new_Category": "string",
"new_InStock": "bool",
})
# Remove columns
client.tables.remove_columns("new_Product", ["new_Category"])
Inspect Tables
# Get single table information
table_info = client.tables.get("new_Product")
print(f"Logical name: {table_info['table_logical_name']}")
print(f"Entity set: {table_info['entity_set_name']}")
# List all tables
tables = client.tables.list()
for table in tables:
print(table)
Delete Tables
client.tables.delete("new_Product")
Relationship Management
Create One-to-Many Relationship
from PowerPlatform.Dataverse.models.relationship import (
LookupAttributeMetadata,
OneToManyRelationshipMetadata,
Label,
LocalizedLabel,
CascadeConfiguration,
)
from PowerPlatform.Dataverse.common.constants import CASCADE_BEHAVIOR_REMOVE_LINK
lookup = LookupAttributeMetadata(
schema_name="new_DepartmentId",
display_name=Label(
localized_labels=[LocalizedLabel(label="Department", language_code=1033)]
),
)
relationship = OneToManyRelationshipMetadata(
schema_name="new_Department_Employee",
referenced_entity="new_department",
referencing_entity="new_employee",
referenced_attribute="new_departmentid",
cascade_configuration=CascadeConfiguration(
delete=CASCADE_BEHAVIOR_REMOVE_LINK,
),
)
result = client.tables.create_one_to_many_relationship(lookup, relationship)
print(f"Created lookup field: {result.lookup_schema_name}")
Create Many-to-Many Relationship
from PowerPlatform.Dataverse.models.relationship import ManyToManyRelationshipMetadata
relationship = ManyToManyRelationshipMetadata(
schema_name="new_employee_project",
entity1_logical_name="new_employee",
entity2_logical_name="new_project",
)
result = client.tables.create_many_to_many_relationship(relationship)
print(f"Created: {result.relationship_schema_name}")
Convenience Method for Lookup Fields
result = client.tables.create_lookup_field(
referencing_table="new_order",
lookup_field_name="new_AccountId",
referenced_table="account",
display_name="Account",
required=True,
)
Query and Delete Relationships
# Get relationship metadata
rel = client.tables.get_relationship("new_Department_Employee")
if rel:
print(f"Found: {rel.relationship_schema_name}")
# Delete relationship
client.tables.delete_relationship(result.relationship_id)
File Operations
# Upload file to a file column
client.files.upload(
table="account",
record_id=account_id,
file_column="new_Document", # If the file column doesn't exist, it will be created automatically
path="/path/to/document.pdf",
)
Batch Operations
Use client.batch to send multiple operations in one HTTP request. All batch methods return None; results arrive via BatchResult after execute().
# Build a batch request
batch = client.batch.new()
batch.records.create("account", {"name": "Contoso"})
batch.records.update("account", account_id, {"telephone1": "555-0100"})
batch.records.retrieve("account", account_id, select=["name"], expand=["primarycontactid"], include_annotations="OData.Community.Display.V1.FormattedValue") # single record with expand
batch.records.list("account", filter="statecode eq 0", select=["name"], orderby=["name asc"], top=50, page_size=25, count=True) # multi-record, single page
batch.query.sql("SELECT TOP 5 name FROM account")
result = batch.execute()
for item in result.responses:
if item.is_success:
print(f"[OK] {item.status_code} entity_id={item.entity_id}")
if item.data:
# GET responses populate item.data with the parsed JSON record
print(item.data.get("name"))
else:
print(f"[ERR] {item.status_code}: {item.error_message}")
# Transactional changeset (all succeed or roll back)
with batch.changeset() as cs:
ref = cs.records.create("contact", {"firstname": "Alice"})
cs.records.update("account", account_id, {"primarycontactid@odata.bind": ref})
# Continue on error
result = batch.execute(continue_on_error=True)
print(f"Succeeded: {len(result.succeeded)}, Failed: {len(result.failed)}")
BatchResult properties:
result.responses-- list ofBatchItemResponsein submission orderresult.succeeded-- responses with 2xx status codesresult.failed-- responses with non-2xx status codesresult.has_errors-- True if any response failedresult.entity_ids-- GUIDs from OData-EntityId headers (creates and updates)
Batch limitations:
- Maximum 1000 operations per batch
batch.records.get()is deprecated; usebatch.records.retrieve()for single recordsbatch.records.list()returns a single page (no pagination); usetopto bound resultsflush_cache()is not supported in batch
Error Handling
The SDK provides structured exceptions with detailed error information:
from PowerPlatform.Dataverse.core.errors import (
DataverseError,
HttpError,
ValidationError,
MetadataError,
SQLParseError
)
from PowerPlatform.Dataverse.client import DataverseClient
try:
client.records.retrieve("account", "invalid-id")
except HttpError as e:
print(f"HTTP {e.status_code}: {e.message}")
print(f"Error code: {e.code}")
print(f"Subcode: {e.subcode}")
if e.is_transient:
print("This error may be retryable")
except ValidationError as e:
print(f"Validation error: {e.message}")
Common Error Patterns
Authentication failures:
- Check environment URL format (no trailing slash)
- Verify credentials have Dataverse permissions
- Ensure app registration is properly configured
404 Not Found:
- Verify table schema name is correct (lowercase for standard tables)
- Check record ID exists
- Ensure using schema names, not display names
- Cache issue could happen, so retry might help, especially for metadata creation
400 Bad Request:
- Check filter/expand parameters use correct case
- Verify column names exist and are spelled correctly
- Ensure custom columns include customization prefix
- For
@odata.binderrors ("undeclared property"): the navigation property name before@odata.bindis case-sensitive and must match the entity's$metadataexactly (e.g.,new_CustomerId@odata.bindfor custom lookups,parentaccountid@odata.bindfor system lookups). The SDK preserves@odata.bindkey casing.
Best Practices
Performance Optimization
- Prefer
client.query.builder()for any non-trivial query — use the builder for filtering, sorting, expansion, or formatted values;records.list()is a convenience shortcut for simple filter+select only - Use bulk operations - Pass lists to create/update/delete for automatic optimization
- Specify select fields - Limit returned columns to reduce payload size
- Control page size - Use
topandpage_sizeparameters appropriately; useexecute_pages()for large sets - Reuse client instances - Don't create new clients for each operation
- Use production credentials - ClientSecretCredential or CertificateCredential for unattended operations
- Error handling - Implement retry logic for transient errors (
e.is_transient) - Always include customization prefix for custom tables/columns
- Use lowercase for column names, match
$metadatafor navigation properties - Column names in$select/$filter/record payloads use lowercase LogicalNames. Navigation properties in$expandand@odata.bindkeys are case-sensitive and must match the entity's$metadata(PascalCase for custom lookups likenew_CustomerId, lowercase for system lookups likeparentaccountid) - Test in non-production environments first
- Use named constants - Import cascade behavior constants from
PowerPlatform.Dataverse.common.constants
Async Client
The SDK ships a full async client, AsyncDataverseClient, under PowerPlatform.Dataverse.aio. Requires the [async] extra: pip install "PowerPlatform-Dataverse-Client[async]".
Note: snippets in this section are fragments. Every
awaitline assumes it lives inside anasync def main(): ...body withclientandcredentialalready constructed (see the Client Initialization block for the wrapper). Outside an async function,awaitis aSyntaxError.
Import
from azure.identity.aio import DefaultAzureCredential
from PowerPlatform.Dataverse.aio import AsyncDataverseClient
Client Initialization
# given: credential constructed (e.g. DefaultAzureCredential())
# Context manager (recommended -- closes session and clears caches automatically)
async with AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
... # all operations here
# Standalone (call aclose() in a finally block)
client = AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential)
try:
...
finally:
await client.aclose()
CRUD Operations
Every sync method has an async equivalent -- add await:
# given: client is an open AsyncDataverseClient
# Create
account_id = await client.records.create("account", {"name": "Contoso Ltd"})
# Read
account = await client.records.retrieve("account", account_id, select=["name", "telephone1"])
# Update
await client.records.update("account", account_id, {"telephone1": "555-0200"})
# Delete
await client.records.delete("account", account_id)
# Bulk create
ids = await client.records.create("account", [{"name": "A"}, {"name": "B"}])
Query Builder
# given: client is an open AsyncDataverseClient
from PowerPlatform.Dataverse.models.filters import col
# Collect all results
result = await (
client.query.builder("account")
.select("name", "telephone1")
.where(col("statecode") == 0)
.top(10)
.execute()
)
for record in result:
print(record["name"])
# Lazy page iteration (memory-efficient)
async for page in (
client.query.builder("account")
.select("name")
.page_size(500)
.execute_pages()
):
for record in page:
print(record["name"])
# SQL query
rows = await client.query.sql("SELECT TOP 5 name FROM account")
# FetchXML
xml = '<fetch top="5"><entity name="account"><attribute name="name"/></entity></fetch>'
rows = await client.query.fetchxml(xml).execute()
Batch and Changesets
# given: client is open; account_id from an earlier records.create
# Plain batch
batch = client.batch.new()
batch.records.create("account", {"name": "Alpha"})
result = await batch.execute()
# Atomic changeset
batch = client.batch.new()
async with batch.changeset() as cs:
ref = cs.records.create("contact", {"firstname": "Alice"})
cs.records.update("account", account_id, {"primarycontactid@odata.bind": ref})
result = await batch.execute()
DataFrame Operations
# given: client is an open AsyncDataverseClient
import pandas as pd
# Query to DataFrame
result = await (
client.query.builder("account")
.select("name", "telephone1")
.where(col("statecode") == 0)
.execute()
)
df = result.to_dataframe()
# Create from DataFrame
new_accounts = pd.DataFrame([{"name": "Contoso"}, {"name": "Fabrikam"}])
ids = await client.dataframe.create("account", new_accounts)
Additional Resources
Load these resources as needed during development:
Key Reminders
- Use
client.query.builder()for queries — it's the primary query pattern;records.list()is a shortcut for trivial filter+select only - Schema names are required - Never use display names
- Custom tables need prefixes - Include customization prefix (e.g., "new_")
- Filter is case-sensitive - Use lowercase logical names
- Bulk operations are encouraged - Pass lists for optimization
- No trailing slashes in URLs - Format:
https://org.crm.dynamics.com - Structured errors - Check
is_transientfor retry logic