PowerPlatform Dataverse SDK Guide

May 28, 2026 · View on GitHub

Overview

Use the PowerPlatform Dataverse Client Python SDK to interact with Microsoft Dataverse.

Key Concepts

Schema Names vs Display Names

  • Standard tables: lowercase (e.g., "account", "contact")
  • Custom tables: include customization prefix (e.g., "new_Product", "cr123_Invoice")
  • Custom columns: include customization prefix (e.g., "new_Price", "cr123_Status")
  • ALWAYS use schema names (logical names), NOT display names

Operation Namespaces

  • client.records -- CRUD and OData queries
  • client.query -- query and search operations
  • client.tables -- table metadata, columns, and relationships
  • client.files -- file upload operations
  • client.batch -- batch multiple operations into a single HTTP request

Bulk Operations

The SDK supports Dataverse's native bulk operations: Pass lists to create(), update() for automatic bulk processing, for delete(), set use_bulk_delete when passing lists to use bulk operation

Paging

  • Control page size with page_size parameter on records.list(), records.list_pages(), or QueryBuilder.page_size()
  • Use top parameter to limit total records returned
  • Preferred: client.query.builder(table)....execute_pages() — composable where(col(...)) filters, formatted values, expand with nested selects, full pagination control
  • Simple streaming shortcut: records.list_pages(table, *, filter, select, top, orderby, expand, page_size, count, include_annotations) — string-based OData filter only, yields one QueryResult per page
  • execute(by_page=True/False) is deprecated and emits UserWarning; use execute_pages() instead
  • QueryBuilder.to_dataframe() is deprecated; use .execute().to_dataframe() instead

QueryResult

  • Returned by records.list(), records.retrieve(), execute(), and each page from list_pages() / execute_pages()
  • Iterable: for record in result — each item is a dict-like Record
  • .to_dataframe() — convert to pandas DataFrame
  • .first() — return the first record or None (safe: returns None on empty result)
  • result[n] — index access returns a Record; result[n:m] returns a QueryResult
  • len(result) — number of records in this result/page

DataFrame Support

  • DataFrame operations are accessed via the client.dataframe namespace: client.dataframe.create(), client.dataframe.update(), client.dataframe.delete()client.dataframe.get() is deprecated; use client.query.builder(table).where(...).execute().to_dataframe() instead

Common Operations

Import

from azure.identity import (
    InteractiveBrowserCredential,
    ClientSecretCredential,
    CertificateCredential,
    AzureCliCredential
)
from PowerPlatform.Dataverse.client import DataverseClient

Client Initialization

# Development options
credential = InteractiveBrowserCredential()
credential = AzureCliCredential()

# Production options
credential = ClientSecretCredential(tenant_id, client_id, client_secret)
credential = CertificateCredential(tenant_id, client_id, cert_path)

# Create client with context manager (recommended -- enables HTTP connection pooling)
# No trailing slash on URL!
with DataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
    ...  # all operations here
# Session closed, caches cleared automatically

# Or without context manager:
client = DataverseClient("https://yourorg.crm.dynamics.com", credential)

CRUD Operations

Create Records

# Single record
account_id = client.records.create("account", {"name": "Contoso Ltd", "telephone1": "555-0100"})

# Bulk create (uses CreateMultiple API automatically)
contacts = [
    {"firstname": "John", "lastname": "Doe"},
    {"firstname": "Jane", "lastname": "Smith"}
]
contact_ids = client.records.create("contact", contacts)

Read Records

# Get single record by ID
account = client.records.retrieve("account", account_id, select=["name", "telephone1"])

# With expand — fetch a related record in the same HTTP request
account = client.records.retrieve(
    "account", account_id,
    select=["name"],
    expand=["primarycontactid"],
)
contact = (account.get("primarycontactid") or {})
print(contact.get("fullname"))

# Simple shortcut — use records.list() only for basic filter + select without composable logic.
# Follows @odata.nextLink automatically and loads all matching records into memory.
# For filtering, sorting, expansion, or formatted values, prefer client.query.builder() (see below).
result = client.records.list("account", filter="statecode eq 0", select=["name", "accountid"])
for record in result:
    print(record["name"])

Query Builder (Preferred for Filtering, Sorting, Expand, Formatted Values)

Use client.query.builder() for any query that goes beyond simple filter + select. It provides composable where(col(...)) expressions, formatted value support, nested expansion, and streaming — all with a fluent API.

from PowerPlatform.Dataverse.models.filters import col
from PowerPlatform.Dataverse.models.query_builder import ExpandOption

# Basic query with composable filter and sort
result = (client.query.builder("account")
          .select("accountid", "name", "statecode")
          .where(col("statecode") == 0)
          .order_by("name asc")
          .execute())
for record in result:
    print(record["name"])

# Composable filters — AND / OR / NOT using Python operators
result = (client.query.builder("contact")
          .select("fullname", "emailaddress1")
          .where((col("statecode") == 0) & (col("emailaddress1").contains("@contoso.com")))
          .execute())

# Formatted values — display labels for option sets, currency symbols, etc.
result = (client.query.builder("account")
          .select("accountid", "name", "industrycode")
          .where(col("statecode") == 0)
          .include_formatted_values()
          .execute())
for record in result:
    label = record.get("industrycode@OData.Community.Display.V1.FormattedValue")
    print(record["name"], label)

# Navigation property expansion with nested column select
result = (client.query.builder("account")
          .select("name")
          .expand(ExpandOption("primarycontactid").select("fullname", "emailaddress1"))
          .where(col("statecode") == 0)
          .execute())
for record in result:
    contact = record.get("primarycontactid", {})
    print(f"{record['name']} - {contact.get('fullname', 'N/A')}")

# Stream large result sets page-by-page (memory-efficient)
for page in (client.query.builder("account")
             .select("accountid", "name")
             .where(col("statecode") == 0)
             .order_by("name asc")
             .page_size(500)
             .execute_pages()):
    for record in page:
        print(record["name"])

# Convert query results to a DataFrame
df = (client.query.builder("account")
      .select("accountid", "name")
      .where(col("statecode") == 0)
      .execute()
      .to_dataframe())

# Limit total results
result = client.query.builder("account").select("name").top(100).execute()

# Simple streaming shortcut via records.list_pages() (string filter only, same params as records.list())
for page in client.records.list_pages("account", filter="statecode eq 0", select=["name"], page_size=500):
    for record in page:
        print(record["name"])

Create Records with Lookup Bindings (@odata.bind)

# Set lookup fields using @odata.bind with PascalCase navigation property names
# CORRECT: use the navigation property name (case-sensitive, must match $metadata)
guid = client.records.create("new_ticket", {
    "new_name": "TKT-001",
    "new_CustomerId@odata.bind": f"/new_customers({customer_id})",
    "new_AgentId@odata.bind": f"/new_agents({agent_id})",
})

# WRONG: lowercase navigation property causes 400 error
# "new_customerid@odata.bind" -> ODataException: undeclared property 'new_customerid'

Update Records

# Single update
client.records.update("account", account_id, {"telephone1": "555-0200"})

# Bulk update (broadcast same change to multiple records)
client.records.update("account", [id1, id2, id3], {"industry": "Technology"})

Upsert Records

Creates or updates records identified by alternate keys. Single item -> PATCH; multiple items -> UpsertMultiple bulk action.

Prerequisite: The table must have an alternate key configured in Dataverse for the columns used in alternate_key. Without it, Dataverse will reject the request with a 400 error.

from PowerPlatform.Dataverse.models.upsert import UpsertItem

# Single upsert
client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001"},
        record={"name": "Contoso Ltd", "telephone1": "555-0100"},
    )
])

# Bulk upsert (uses UpsertMultiple API automatically)
client.records.upsert("account", [
    UpsertItem(alternate_key={"accountnumber": "ACC-001"}, record={"name": "Contoso Ltd"}),
    UpsertItem(alternate_key={"accountnumber": "ACC-002"}, record={"name": "Fabrikam Inc"}),
])

# Composite alternate key
client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001", "address1_postalcode": "98052"},
        record={"name": "Contoso Ltd"},
    )
])

# Plain dict syntax (no import needed)
client.records.upsert("account", [
    {"alternate_key": {"accountnumber": "ACC-001"}, "record": {"name": "Contoso Ltd"}}
])

Delete Records

# Single delete
client.records.delete("account", account_id)

# Bulk delete (uses BulkDelete API)
client.records.delete("account", [id1, id2, id3], use_bulk_delete=True)

DataFrame Operations

The SDK provides DataFrame wrappers for all CRUD operations via the client.dataframe namespace, using pandas DataFrames and Series as input/output.

Note: client.dataframe.get() is deprecated. Use client.query.builder(table).select(...).where(...).execute().to_dataframe() instead. QueryBuilder.to_dataframe() (without .execute()) is also deprecated — always call .execute() first.

import pandas as pd

# Query records -- returns a single DataFrame (GA pattern: .execute().to_dataframe())
from PowerPlatform.Dataverse.models.filters import col
df = client.query.builder("account").where(col("statecode") == 0).select("name").execute().to_dataframe()
print(f"Got {len(df)} rows")

# Limit results with top
df = client.query.builder("account").select("name").top(100).execute().to_dataframe()

# Via records.list() (simpler for basic queries)
df = client.records.list("account", filter="statecode eq 0", select=["name"]).to_dataframe()

# Create records from a DataFrame (returns a Series of GUIDs)
new_accounts = pd.DataFrame([
    {"name": "Contoso", "telephone1": "555-0100"},
    {"name": "Fabrikam", "telephone1": "555-0200"},
])
new_accounts["accountid"] = client.dataframe.create("account", new_accounts)

# Update records from a DataFrame (id_column identifies the GUID column)
new_accounts["telephone1"] = ["555-0199", "555-0299"]
client.dataframe.update("account", new_accounts, id_column="accountid")

# Clear a field by setting clear_nulls=True (by default, NaN/None fields are skipped)
df = pd.DataFrame([{"accountid": "guid-1", "websiteurl": None}])
client.dataframe.update("account", df, id_column="accountid", clear_nulls=True)

# Delete records by passing a Series of GUIDs
client.dataframe.delete("account", new_accounts["accountid"])

SQL Queries

SQL queries are read-only and support limited SQL syntax. A single SELECT statement with optional WHERE, TOP (integer literal), ORDER BY (column names only), and a simple table alias after FROM is supported. But JOIN and subqueries may not be. Refer to the Dataverse documentation for the current feature set.

results = client.query.sql(
    "SELECT TOP 10 accountid, name FROM account WHERE statecode = 0"
)
for record in results:
    print(record["name"])

FetchXML Queries

client.query.fetchxml(xml) returns an inert FetchXmlQuery object — no HTTP request is made until .execute() or .execute_pages() is called.

xml = """
<fetch top="50">
  <entity name="account">
    <attribute name="accountid" />
    <attribute name="name" />
    <filter>
      <condition attribute="statecode" operator="eq" value="0" />
    </filter>
  </entity>
</fetch>
"""

# Load all results into memory (simple, small-to-medium sets)
query = client.query.fetchxml(xml)
result = query.execute()              # returns QueryResult — all pages fetched upfront
for record in result:
    print(record["name"])

# Stream page-by-page (large sets or early exit)
for page in query.execute_pages():    # yields one QueryResult per HTTP page
    process(page.to_dataframe())

Table Management

Create Custom Tables

# Create table with columns (include customization prefix!)
table_info = client.tables.create(
    "new_Product",
    {
        "new_Code": "string",
        "new_Price": "decimal",
        "new_Active": "bool",
        "new_Quantity": "int",
    },
)

# With solution assignment and custom primary column
table_info = client.tables.create(
    "new_Product",
    {"new_Code": "string", "new_Price": "decimal"},
    solution="MyPublisher",
    primary_column="new_ProductCode",
)

Supported Column Types

Types on the same line map to the same exact format under the hood

  • "string" or "text" - Single line of text
  • "memo" or "multiline" - Multiple lines of text (4000 character default)
  • "int" or "integer" - Whole number
  • "decimal" or "money" - Decimal number
  • "float" or "double" - Floating point number
  • "bool" or "boolean" - Yes/No
  • "datetime" or "date" - Date
  • "file" - File column
  • Enum subclass - Local option set (picklist)

Manage Columns

# Add columns to existing table (must include customization prefix!)
client.tables.add_columns("new_Product", {
    "new_Category": "string",
    "new_InStock": "bool",
})

# Remove columns
client.tables.remove_columns("new_Product", ["new_Category"])

Inspect Tables

# Get single table information
table_info = client.tables.get("new_Product")
print(f"Logical name: {table_info['table_logical_name']}")
print(f"Entity set: {table_info['entity_set_name']}")

# List all tables
tables = client.tables.list()
for table in tables:
    print(table)

Delete Tables

client.tables.delete("new_Product")

Relationship Management

Create One-to-Many Relationship

from PowerPlatform.Dataverse.models.relationship import (
    LookupAttributeMetadata,
    OneToManyRelationshipMetadata,
    Label,
    LocalizedLabel,
    CascadeConfiguration,
)
from PowerPlatform.Dataverse.common.constants import CASCADE_BEHAVIOR_REMOVE_LINK

lookup = LookupAttributeMetadata(
    schema_name="new_DepartmentId",
    display_name=Label(
        localized_labels=[LocalizedLabel(label="Department", language_code=1033)]
    ),
)

relationship = OneToManyRelationshipMetadata(
    schema_name="new_Department_Employee",
    referenced_entity="new_department",
    referencing_entity="new_employee",
    referenced_attribute="new_departmentid",
    cascade_configuration=CascadeConfiguration(
        delete=CASCADE_BEHAVIOR_REMOVE_LINK,
    ),
)

result = client.tables.create_one_to_many_relationship(lookup, relationship)
print(f"Created lookup field: {result.lookup_schema_name}")

Create Many-to-Many Relationship

from PowerPlatform.Dataverse.models.relationship import ManyToManyRelationshipMetadata

relationship = ManyToManyRelationshipMetadata(
    schema_name="new_employee_project",
    entity1_logical_name="new_employee",
    entity2_logical_name="new_project",
)

result = client.tables.create_many_to_many_relationship(relationship)
print(f"Created: {result.relationship_schema_name}")

Convenience Method for Lookup Fields

result = client.tables.create_lookup_field(
    referencing_table="new_order",
    lookup_field_name="new_AccountId",
    referenced_table="account",
    display_name="Account",
    required=True,
)

Query and Delete Relationships

# Get relationship metadata
rel = client.tables.get_relationship("new_Department_Employee")
if rel:
    print(f"Found: {rel.relationship_schema_name}")

# Delete relationship
client.tables.delete_relationship(result.relationship_id)

File Operations

# Upload file to a file column
client.files.upload(
    table="account",
    record_id=account_id,
    file_column="new_Document",  # If the file column doesn't exist, it will be created automatically
    path="/path/to/document.pdf",
)

Batch Operations

Use client.batch to send multiple operations in one HTTP request. All batch methods return None; results arrive via BatchResult after execute().

# Build a batch request
batch = client.batch.new()
batch.records.create("account", {"name": "Contoso"})
batch.records.update("account", account_id, {"telephone1": "555-0100"})
batch.records.retrieve("account", account_id, select=["name"], expand=["primarycontactid"], include_annotations="OData.Community.Display.V1.FormattedValue")  # single record with expand
batch.records.list("account", filter="statecode eq 0", select=["name"], orderby=["name asc"], top=50, page_size=25, count=True)  # multi-record, single page
batch.query.sql("SELECT TOP 5 name FROM account")

result = batch.execute()
for item in result.responses:
    if item.is_success:
        print(f"[OK] {item.status_code} entity_id={item.entity_id}")
        if item.data:
            # GET responses populate item.data with the parsed JSON record
            print(item.data.get("name"))
    else:
        print(f"[ERR] {item.status_code}: {item.error_message}")

# Transactional changeset (all succeed or roll back)
with batch.changeset() as cs:
    ref = cs.records.create("contact", {"firstname": "Alice"})
    cs.records.update("account", account_id, {"primarycontactid@odata.bind": ref})

# Continue on error
result = batch.execute(continue_on_error=True)
print(f"Succeeded: {len(result.succeeded)}, Failed: {len(result.failed)}")

BatchResult properties:

  • result.responses -- list of BatchItemResponse in submission order
  • result.succeeded -- responses with 2xx status codes
  • result.failed -- responses with non-2xx status codes
  • result.has_errors -- True if any response failed
  • result.entity_ids -- GUIDs from OData-EntityId headers (creates and updates)

Batch limitations:

  • Maximum 1000 operations per batch
  • batch.records.get() is deprecated; use batch.records.retrieve() for single records
  • batch.records.list() returns a single page (no pagination); use top to bound results
  • flush_cache() is not supported in batch

Error Handling

The SDK provides structured exceptions with detailed error information:

from PowerPlatform.Dataverse.core.errors import (
    DataverseError,
    HttpError,
    ValidationError,
    MetadataError,
    SQLParseError
)
from PowerPlatform.Dataverse.client import DataverseClient

try:
    client.records.retrieve("account", "invalid-id")
except HttpError as e:
    print(f"HTTP {e.status_code}: {e.message}")
    print(f"Error code: {e.code}")
    print(f"Subcode: {e.subcode}")
    if e.is_transient:
        print("This error may be retryable")
except ValidationError as e:
    print(f"Validation error: {e.message}")

Common Error Patterns

Authentication failures:

  • Check environment URL format (no trailing slash)
  • Verify credentials have Dataverse permissions
  • Ensure app registration is properly configured

404 Not Found:

  • Verify table schema name is correct (lowercase for standard tables)
  • Check record ID exists
  • Ensure using schema names, not display names
  • Cache issue could happen, so retry might help, especially for metadata creation

400 Bad Request:

  • Check filter/expand parameters use correct case
  • Verify column names exist and are spelled correctly
  • Ensure custom columns include customization prefix
  • For @odata.bind errors ("undeclared property"): the navigation property name before @odata.bind is case-sensitive and must match the entity's $metadata exactly (e.g., new_CustomerId@odata.bind for custom lookups, parentaccountid@odata.bind for system lookups). The SDK preserves @odata.bind key casing.

Best Practices

Performance Optimization

  1. Prefer client.query.builder() for any non-trivial query — use the builder for filtering, sorting, expansion, or formatted values; records.list() is a convenience shortcut for simple filter+select only
  2. Use bulk operations - Pass lists to create/update/delete for automatic optimization
  3. Specify select fields - Limit returned columns to reduce payload size
  4. Control page size - Use top and page_size parameters appropriately; use execute_pages() for large sets
  5. Reuse client instances - Don't create new clients for each operation
  6. Use production credentials - ClientSecretCredential or CertificateCredential for unattended operations
  7. Error handling - Implement retry logic for transient errors (e.is_transient)
  8. Always include customization prefix for custom tables/columns
  9. Use lowercase for column names, match $metadata for navigation properties - Column names in $select/$filter/record payloads use lowercase LogicalNames. Navigation properties in $expand and @odata.bind keys are case-sensitive and must match the entity's $metadata (PascalCase for custom lookups like new_CustomerId, lowercase for system lookups like parentaccountid)
  10. Test in non-production environments first
  11. Use named constants - Import cascade behavior constants from PowerPlatform.Dataverse.common.constants

Async Client

The SDK ships a full async client, AsyncDataverseClient, under PowerPlatform.Dataverse.aio. Requires the [async] extra: pip install "PowerPlatform-Dataverse-Client[async]".

Note: snippets in this section are fragments. Every await line assumes it lives inside an async def main(): ... body with client and credential already constructed (see the Client Initialization block for the wrapper). Outside an async function, await is a SyntaxError.

Import

from azure.identity.aio import DefaultAzureCredential
from PowerPlatform.Dataverse.aio import AsyncDataverseClient

Client Initialization

# given: credential constructed (e.g. DefaultAzureCredential())

# Context manager (recommended -- closes session and clears caches automatically)
async with AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential) as client:
    ...  # all operations here

# Standalone (call aclose() in a finally block)
client = AsyncDataverseClient("https://yourorg.crm.dynamics.com", credential)
try:
    ...
finally:
    await client.aclose()

CRUD Operations

Every sync method has an async equivalent -- add await:

# given: client is an open AsyncDataverseClient

# Create
account_id = await client.records.create("account", {"name": "Contoso Ltd"})

# Read
account = await client.records.retrieve("account", account_id, select=["name", "telephone1"])

# Update
await client.records.update("account", account_id, {"telephone1": "555-0200"})

# Delete
await client.records.delete("account", account_id)

# Bulk create
ids = await client.records.create("account", [{"name": "A"}, {"name": "B"}])

Query Builder

# given: client is an open AsyncDataverseClient
from PowerPlatform.Dataverse.models.filters import col

# Collect all results
result = await (
    client.query.builder("account")
    .select("name", "telephone1")
    .where(col("statecode") == 0)
    .top(10)
    .execute()
)
for record in result:
    print(record["name"])

# Lazy page iteration (memory-efficient)
async for page in (
    client.query.builder("account")
    .select("name")
    .page_size(500)
    .execute_pages()
):
    for record in page:
        print(record["name"])

# SQL query
rows = await client.query.sql("SELECT TOP 5 name FROM account")

# FetchXML
xml = '<fetch top="5"><entity name="account"><attribute name="name"/></entity></fetch>'
rows = await client.query.fetchxml(xml).execute()

Batch and Changesets

# given: client is open; account_id from an earlier records.create

# Plain batch
batch = client.batch.new()
batch.records.create("account", {"name": "Alpha"})
result = await batch.execute()

# Atomic changeset
batch = client.batch.new()
async with batch.changeset() as cs:
    ref = cs.records.create("contact", {"firstname": "Alice"})
    cs.records.update("account", account_id, {"primarycontactid@odata.bind": ref})
result = await batch.execute()

DataFrame Operations

# given: client is an open AsyncDataverseClient
import pandas as pd

# Query to DataFrame
result = await (
    client.query.builder("account")
    .select("name", "telephone1")
    .where(col("statecode") == 0)
    .execute()
)
df = result.to_dataframe()

# Create from DataFrame
new_accounts = pd.DataFrame([{"name": "Contoso"}, {"name": "Fabrikam"}])
ids = await client.dataframe.create("account", new_accounts)

Additional Resources

Load these resources as needed during development:

Key Reminders

  1. Use client.query.builder() for queries — it's the primary query pattern; records.list() is a shortcut for trivial filter+select only
  2. Schema names are required - Never use display names
  3. Custom tables need prefixes - Include customization prefix (e.g., "new_")
  4. Filter is case-sensitive - Use lowercase logical names
  5. Bulk operations are encouraged - Pass lists for optimization
  6. No trailing slashes in URLs - Format: https://org.crm.dynamics.com
  7. Structured errors - Check is_transient for retry logic