dv-data


Skill: Data — Create, Update, Delete, and Bulk Import

This skill uses Python exclusively. Do not use Node.js, JavaScript, or any other language for Dataverse scripting. If you are about to run npm install or write a .js file, STOP — you are going off-rails. See the overview skill's Hard Rules.

Preview Before Running — Scope by Operation Type

Two rules, different strictness:

  • Destructive / stateful operations (create, update, delete, bulk-import, upsert records) — preview the action in plain prose: which table, how many records, and which environment, using placeholders (<ENV_URL>) for anything unknown. Reference the documented snippet by name rather than pasting the full block inline. Ask for confirmation and missing values in the same turn.
  • Read-only or scoped-reference operations (schema inspection, looking up a record count, explaining what you'll run from a documented snippet) — a one-sentence prose preview is enough.

Key principle: the user should be able to evaluate what's about to happen from your first response. A bare "which environment?" fails that test; a one-line prose preview passes it.


Use the official Microsoft Power Platform Dataverse Client Python SDK for all data write operations.

  • Official SDK: https://github.com/microsoft/PowerPlatform-DataverseClient-Python
  • PyPI package: PowerPlatform-Dataverse-Client (this is the only official one — do not use dataverse-api or other unofficial packages)
  • Status: Preview — breaking changes are possible

Skill boundaries

| Need | Use instead |
|---|---|
| Query or read records | dv-query |
| Create tables, columns, relationships, forms, views | dv-metadata |
| Export or deploy solutions | dv-solution |

Before Writing ANY Script — Check MCP First

If MCP tools are available (create_record, update_record) and the task is ≤10 records, use MCP directly — no script needed. Only write a Python script when the task requires: bulk operations (10+ records), data transformation, retry logic, CSV import, or operations the SDK supports that MCP cannot (upsert, file uploads). Sequential MCP tool calls are not "multi-step logic" — use MCP for those.
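The routing rule above can be reduced to a tiny predicate. This is an illustrative sketch only (the function and parameter names are mine, not part of the skill):

```python
def use_mcp(record_count, needs_transform=False, needs_retry=False,
            from_csv=False, sdk_only_op=False):
    """Decision rule from the text: MCP handles small, plain writes;
    anything bulk, transformed, retried, CSV-driven, or SDK-only
    (upsert, file upload) needs a Python script."""
    return (record_count <= 10
            and not (needs_transform or needs_retry or from_csv or sdk_only_op))

print(use_mcp(3))                    # small, simple write: MCP
print(use_mcp(500))                  # bulk: script
print(use_mcp(2, sdk_only_op=True))  # upsert/file upload: script
```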

SDK-First Rule

If an operation is in the "supports" list below, you MUST use the SDK — not urllib, requests, or raw HTTP.

Correct imports (always preceded by sys.path.insert in a full script — see Setup below):

from auth import get_credential, load_env
from PowerPlatform.Dataverse.client import DataverseClient

WRONG for SDK-supported operations:

from auth import get_token, load_env  # WRONG for SDK-supported ops
import requests                        # WRONG for SDK-supported ops

get_token() and requests exist ONLY for operations the SDK does not support (forms, views, $apply, N:N $expand, unbound actions) — see dv-query and dv-metadata.


What This SDK Supports (Data Operations)

  • Record writes: create, update, delete
  • Record reads within write workflows (e.g., lookup resolution) — for standalone queries see dv-query
  • Upsert (with alternate key support)
  • Bulk operations: CreateMultiple, UpdateMultiple, UpsertMultiple
  • File column uploads (chunked for files >128MB)
  • Context manager with HTTP connection pooling

What This SDK Does NOT Support

Use raw Web API (get_token()) for:

  • Forms (FormXml) — see dv-metadata
  • Views (SavedQueries) — see dv-metadata
  • Global option sets — see dv-metadata
  • N:N record association ($ref POST) — use raw Web API (POST /api/data/v9.2/<entity>(<id>)/<nav-property>/$ref)
  • N:N $expand — see dv-query
  • $apply aggregation — see dv-query
  • Unbound actions (e.g., InstallSampleData)
  • DeleteMultiple, general OData batching
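For the N:N association case, a sketch of the raw Web API request shape (the URL path follows the list above; the standard OData associate body is an @odata.id reference; the helper name and placeholder values are mine, and the actual POST needs get_token() as described in dv-query):

```python
import json

def assoc_ref(env_url, entity_set, guid, nav_prop, other_set, other_guid):
    """Build URL and body for an N:N associate:
    POST <entity-set>(<id>)/<nav-property>/$ref with an @odata.id body
    pointing at the record to associate."""
    url = f"{env_url}/api/data/v9.2/{entity_set}({guid})/{nav_prop}/$ref"
    body = {"@odata.id": f"{env_url}/api/data/v9.2/{other_set}({other_guid})"}
    return url, json.dumps(body)

url, body = assoc_ref("https://org.crm.dynamics.com", "new_tickets", "<guid>",
                      "new_ticket_tag", "new_tags", "<tag-guid>")
print(url)
```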

Setup

import os, sys
sys.path.insert(0, os.path.join(os.getcwd(), "scripts"))
from auth import get_credential, load_env
from PowerPlatform.Dataverse.client import DataverseClient

load_env()
client = DataverseClient(
    base_url=os.environ["DATAVERSE_URL"],
    credential=get_credential(),
)

get_credential() returns ClientSecretCredential (if CLIENT_ID + CLIENT_SECRET are in .env) or DeviceCodeCredential (interactive fallback). See scripts/auth.py.

For scripts that run to completion: wrap in with DataverseClient(...) as client: for automatic connection cleanup (recommended since b6). For notebooks and interactive sessions, the explicit client above is simpler.


Field Name Casing Rule

Getting this wrong causes 400 errors.

| Property type | Convention | Example | When used |
|---|---|---|---|
| Structural (columns) | LogicalName — always lowercase | new_name, new_priority | Record payload keys |
| Navigation (lookups) | Navigation Property Name — case-sensitive, matches $metadata | new_AccountId | @odata.bind keys |

The SDK lowercases structural keys automatically but preserves @odata.bind key casing.
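To make the rule concrete, here is a standalone mimic of that normalization. This is not the SDK's code, just an illustration of the observable behavior:

```python
def normalize_keys(payload):
    """Mimic the documented behavior: structural keys are lowercased,
    @odata.bind keys pass through with their casing intact."""
    return {(k if k.endswith("@odata.bind") else k.lower()): v
            for k, v in payload.items()}

sent = normalize_keys({
    "new_Name": "Ticket 001",                          # sent as new_name
    "new_AccountId@odata.bind": "/accounts(<guid>)",   # casing preserved
})
print(sent)
```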


Create a Record

guid = client.records.create("new_ticket", {
    "new_name": "Ticket 001",
    "new_priority": 100000002,          # choice column — integer value, not string
    "new_AccountId@odata.bind": "/accounts(<account-guid>)",
})
print(f"Created: {guid}")

@odata.bind notes:

  • Key is the Navigation Property Name: new_AccountId@odata.bind (the SDK preserves casing automatically as of b6, but matching the schema name is still the correct form)
  • Value is "/<EntitySetName>(<guid>)" — e.g., "/accounts(<guid>)"
  • If you just created the lookup column, wait 5–10 seconds before inserting. Metadata propagation delays cause "Invalid property" errors.
  • Choice columns use integer values, not strings: "new_priority": 100000002 (not "High")

Common @odata.bind patterns

| Lookup | Correct key | Wrong |
|---|---|---|
| Custom: new_AccountId | new_AccountId@odata.bind | new_accountid@odata.bind |
| System polymorphic: customerid | customerid_account@odata.bind | customerid@odata.bind |
| System: parentcustomerid | parentcustomerid_account@odata.bind | _parentcustomerid_value@odata.bind |

Find the Navigation Property Name

After creating a lookup via SDK: result.lookup_schema_name is the navigation property name.

For existing system tables, query:

GET /api/data/v9.2/EntityDefinitions(LogicalName='<entity>')/ManyToOneRelationships
  ?$select=ReferencingEntityNavigationPropertyName,ReferencedEntity
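A small builder for that metadata query (the endpoint path and $select fields are from the snippet above; the function name is mine — send the resulting URL with get_token() as in the schema-inspection script later in this skill):

```python
import urllib.parse

def nav_props_url(env_url, entity):
    """URL for listing an entity's N:1 relationships, to find the
    ReferencingEntityNavigationPropertyName used in @odata.bind keys."""
    params = urllib.parse.urlencode({
        "$select": "ReferencingEntityNavigationPropertyName,ReferencedEntity",
    })
    return (f"{env_url}/api/data/v9.2/EntityDefinitions"
            f"(LogicalName='{entity}')/ManyToOneRelationships?{params}")

print(nav_props_url("https://org.crm.dynamics.com", "new_ticket"))
```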

Update a Record

client.records.update("new_ticket", "<record-guid>",
    {"new_status": 100000001})

Delete a Record

client.records.delete("new_ticket", "<record-guid>")

Bulk Create (SDK uses CreateMultiple internally)

records = [{"new_name": f"Ticket {i}", "new_priority": 100000000} for i in range(500)]
guids = client.records.create("new_ticket", records)
print(f"Created {len(guids)} records")

Volume guidance: MCP create_record for 1-10 records. SDK for 10+ records.

Important: The SDK sends all records in a single POST to CreateMultiple. It does not chunk automatically. Dataverse has no fixed record count limit — the constraints are payload size and request timeout (SDK default: 120s for POST). For larger datasets, you must chunk in your script. The bulk_upsert and bulk_create helpers below use adaptive chunking: start at 1,000, double on success (up to 4,000), halve on payload/timeout failure, and cap at the last successful size. Tables with few columns can handle larger chunks than tables with many columns.
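The adaptive policy can be isolated as a pure function. This is a sketch of the rule described above, not the helpers' exact code:

```python
def adapt(current, max_size, ok, floor=100):
    """One step of the adaptive chunking policy: double on success (up to
    max_size), halve on payload/timeout failure (down to floor) and lower
    the cap so the size does not oscillate back up.
    Returns (next_size, next_max)."""
    if ok:
        return min(current * 2, max_size), max_size
    new = max(current // 2, floor)
    return new, new

size, cap = 1000, 4000
size, cap = adapt(size, cap, ok=True)    # 1000 -> 2000
size, cap = adapt(size, cap, ok=True)    # 2000 -> 4000
size, cap = adapt(size, cap, ok=False)   # payload failure: halve and cap
print(size, cap)
```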


Bulk Update

# Broadcast same change to multiple records
client.records.update("new_ticket",
    [id1, id2, id3],
    {"new_status": 100000001})

DataFrame Write-Back

To create or update records from a pandas DataFrame, use the client.dataframe namespace. This is documented in dv-query (alongside client.dataframe.get()) but is a write operation — include it in your data write workflow:

# Update records — DataFrame must include the primary key column
client.dataframe.update("opportunity", df_updates, id_column="opportunityid")

# Create records — returns a Series of new GUIDs
guids = client.dataframe.create("opportunity", df_new_records)

See dv-query for the full client.dataframe reference including client.dataframe.get().


Upsert (Alternate Keys)

Idempotent — re-running the same import does not create duplicates. The alternate key must be defined on the table first — see dv-metadata.

Do NOT include alternate key columns in the record body. The alternate key identifies the record; the record body contains the data to set. If the same column appears in both, UpsertMultiple fails with "An unexpected error occurred" (single upsert tolerates it, bulk does not).

from PowerPlatform.Dataverse.models.upsert import UpsertItem

client.records.upsert("account", [
    UpsertItem(
        alternate_key={"accountnumber": "ACC-001"},
        record={"name": "Contoso Ltd", "description": "Primary account"},
    ),
    UpsertItem(
        alternate_key={"accountnumber": "ACC-002"},
        record={"name": "Fabrikam Inc"},
    ),
])

Bulk Import from CSV

For imports that may be re-run (most real-world cases), use UpsertItem with alternate keys instead of create() — see the Multi-Table Import section below. The create() pattern here is for one-shot loads only.

| Volume | Tool | Why |
|---|---|---|
| 1–10 records | MCP create_record | Simple, no script |
| 10+ records | SDK client.records.create(table, list) | Uses CreateMultiple; chunk large datasets (start at 1K, adapt) |

import csv, os, sys
sys.path.insert(0, os.path.join(os.getcwd(), "scripts"))
from auth import get_credential, load_env
from PowerPlatform.Dataverse.client import DataverseClient

load_env()
client = DataverseClient(base_url=os.environ["DATAVERSE_URL"], credential=get_credential())

with open("data/customers.csv", newline="", encoding="utf-8") as f:
    rows = list(csv.DictReader(f))

records = [{"new_name": row["name"], "new_email": row["email"]} for row in rows]

# SDK sends all in one POST — chunk to avoid payload/timeout limits
# Start at 1000; for narrow tables (few columns) you can go higher
chunk_size = 1000
for i in range(0, len(records), chunk_size):
    guids = client.records.create("new_customer", records[i:i + chunk_size])
    print(f"Imported {i + len(guids)}/{len(records)} customers", flush=True)

Lookup resolution during import

If the CSV has a human-readable key (e.g., customer_email) but Dataverse needs a GUID, pre-resolve with a lookup dict:

# Build email -> GUID map first
email_to_guid = {}
for page in client.records.get("new_customer", select=["new_customerid", "new_email"]):
    for r in page:
        email_to_guid[r["new_email"]] = r["new_customerid"]

# Use it during import
records = []
for row in rows:
    customer_guid = email_to_guid.get(row["customer_email"])
    if not customer_guid:
        print(f"Skipping row — unknown email: {row['customer_email']}", flush=True)
        continue
    records.append({
        "new_channel": row["channel"],
        "new_CustomerId@odata.bind": f"/new_customers({customer_guid})",  # verify entity set name via EntityDefinitions
    })

guids = client.records.create("new_interaction", records)

Required field discovery for system tables

Before bulk-creating in a system table (account, contact, opportunity):

  1. Create a single test record with your intended minimal payload
  2. If HttpError 400 is raised, the error message names the missing required field
  3. Some required fields are plugin-enforced and not visible in describe_table
  4. Delete the test record, then proceed with bulk create
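The probe can be written SDK-agnostically by passing the create/delete callables in. A sketch (in the real script pass client.records.create / client.records.delete and catch HttpError specifically rather than Exception):

```python
def probe_required_fields(create, delete, table, payload):
    """Create one test record with the intended minimal payload. On failure,
    return the error text (a 400 names the missing required field); on
    success, delete the probe record before the bulk create.
    Returns (guid_or_None, error_or_None)."""
    try:
        guid = create(table, payload)
    except Exception as e:              # real code: except HttpError as e
        return None, str(e)
    delete(table, guid)                 # clean up the probe record
    return guid, None
```

If the second element is non-None, add the named field to the payload and probe again before bulk-creating.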

Multi-Table Import with FK Dependencies

When importing data across multiple tables with foreign key relationships, follow this sequence:

  1. Create tables with source ID columns (prefix_Src*Id) — see dv-metadata
  2. Create alternate keys on the source ID columns — see dv-metadata "Alternate Keys" section
  3. Create lookup relationships — see dv-metadata
  4. Import data in dependency order using UpsertItem with alternate keys (safe for re-runs)

Using upsert from the start means partial failures, retries, and re-runs never create duplicates. The alternate key lets Dataverse match records by the source system's ID instead of GUIDs.

Deciding which alternate key to create:

  • Database source (SQLite, SQL Server): Read the schema to identify primary keys. The source PK maps directly to the Dataverse alternate key. Agent can decide without asking.
  • Excel/CSV source: Inspect the data for columns with all-unique values (df[col].nunique() == len(df)). Look for naming conventions (*_ID, *_Code). Propose the candidate to the user and confirm — "Column Employee_ID has 500 unique values across 500 rows. Use this as the key?" Do not create the key without confirmation, since uniqueness in current data doesn't guarantee it's the intended business key.
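For the Excel/CSV case, the uniqueness scan is one set comparison per column. A stdlib equivalent of the nunique() check (the helper name is mine), operating on csv.DictReader-style rows:

```python
def candidate_keys(rows):
    """Columns whose values are unique across every row — alternate-key
    candidates to propose to the user (never create a key without
    confirmation; uniqueness today is not proof of the business key)."""
    if not rows:
        return []
    n = len(rows)
    return [col for col in rows[0] if len({r[col] for r in rows}) == n]

rows = [
    {"Employee_ID": "E1", "Name": "Ada", "Dept": "ENG"},
    {"Employee_ID": "E2", "Name": "Lin", "Dept": "ENG"},
    {"Employee_ID": "E3", "Name": "Ada", "Dept": "OPS"},
]
print(candidate_keys(rows))   # only Employee_ID is unique in this sample
```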

import os, sys, csv, time
sys.path.insert(0, os.path.join(os.getcwd(), "scripts"))
from auth import get_credential, load_env
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.models.upsert import UpsertItem
from PowerPlatform.Dataverse.core.errors import HttpError
from concurrent.futures import ThreadPoolExecutor, as_completed

load_env()
client = DataverseClient(base_url=os.environ["DATAVERSE_URL"], credential=get_credential())

def bind(entity_set, guid):
    """Build an @odata.bind value. entity_set must be the actual EntitySetName, not a guess."""
    return f"/{entity_set}({guid})"

# IMPORTANT: EntitySetName is NOT always logical_name + 's'.
# Dataverse uses English pluralization: country -> countries, city -> cities,
# winby -> winbies, extraruns -> extrarunses.
# Always query the actual names before building @odata.bind values:
#   GET /api/data/v9.2/EntityDefinitions?$select=LogicalName,EntitySetName

def bulk_upsert(logical_name, items, chunk_size=1000, retries=3):
    """Upsert items in adaptive chunks with retry. Starts at chunk_size, doubles on
    success (up to max_size), halves on size/timeout failure. Caps at last successful
    size to avoid oscillation. Safe for re-runs."""
    import requests as req_lib  # for timeout exception types
    current_size = chunk_size
    max_size = 4000
    i = 0
    while i < len(items):
        chunk = items[i:i + current_size]
        for attempt in range(retries):
            try:
                client.records.upsert(logical_name, chunk)
                print(f"  {logical_name}: {i + len(chunk)}/{len(items)} (chunk={current_size})", flush=True)
                i += len(chunk)
                current_size = min(current_size * 2, max_size)  # ramp up
                break
            except HttpError as e:
                if e.status_code == 429:
                    time.sleep(5 * (attempt + 1))
                    continue
                if e.status_code in (413, 500) and current_size > 100:
                    current_size = max(current_size // 2, 100)
                    max_size = current_size
                    print(f"  {logical_name}: chunk capped at {current_size}", flush=True)
                    break  # retry same offset with smaller chunk
                raise
            except req_lib.exceptions.RequestException:
                # Network timeout — SDK default is 120s for POST
                if current_size > 100:
                    current_size = max(current_size // 2, 100)
                    max_size = current_size
                    print(f"  {logical_name}: timeout, chunk capped at {current_size}", flush=True)
                    break
                raise
        else:
            # All retries exhausted (persistent 429): log and skip this chunk
            print(f"  {logical_name}: skipping chunk at offset {i} after {retries} retries", flush=True)
            i += len(chunk)

def build_map(logical_name, src_col, id_col):
    """Query Dataverse to build source_id -> GUID map after upsert."""
    result = {}
    for page in client.records.get(logical_name, select=[src_col, id_col]):
        for r in page:
            src_val = r.get(src_col)
            if src_val is not None:
                result[src_val] = r[id_col]
    return result

def upsert_table(logical_name, items, chunk_size=1000):
    """Upsert one table — used as target for ThreadPoolExecutor."""
    bulk_upsert(logical_name, items, chunk_size)
    return logical_name

Import data in dependency levels — parallelize tables within each level:

Tables at the same dependency level are independent of each other and can be imported concurrently. Tables at different levels must be sequential (Level 1 needs Level 0's GUIDs for @odata.bind).

# --- Level 0: All lookup tables concurrently (no FK dependencies) ---
# Alternate keys must already exist. See dv-metadata "Alternate Keys".
level0 = {
    "prefix_country": [UpsertItem(
        alternate_key={"prefix_srccountryid": r["id"]},
        record={"prefix_name": r["name"]},  # key cols must NOT be in record body
    ) for r in country_rows],
    "prefix_team": [UpsertItem(...) for r in team_rows],
    # ... all other Level 0 tables
}

# For composite-key tables (e.g., line items with multi-column PK):
# ALL key columns go in alternate_key, NONE of them in record.
line_items = [UpsertItem(
    alternate_key={
        "prefix_srcorderid": r["order_id"],
        "prefix_srclineno": r["line_no"],
    },
    record={  # only non-key columns here
        "prefix_name": f"Order-{r['order_id']}-Line-{r['line_no']}",
        "prefix_quantity": r["qty"],
        "prefix_unitprice": r["price"],
    },
) for r in order_line_rows]

level0 = {
    # ... lookup tables as above
}

with ThreadPoolExecutor(max_workers=len(level0)) as pool:
    futures = {pool.submit(upsert_table, t, items): t for t, items in level0.items()}
    for f in as_completed(futures):
        table = futures[f]
        try:
            f.result()
            print(f"  {table}: done", flush=True)
        except Exception as e:
            print(f"  {table}: FAILED — {e}", flush=True)
            # Continue — don't kill other tables. Re-run later (upsert is idempotent).

# Build lookup maps by querying back (upsert doesn't return GUIDs)
country_map = build_map("prefix_country", "prefix_srccountryid", "prefix_countryid")
team_map = build_map("prefix_team", "prefix_srcteamid", "prefix_teamid")

# --- Level 1: Tables referencing Level 0, imported concurrently ---
# Build items with @odata.bind using Level 0 maps, then import in parallel
# ... repeat pattern for each level

Key rules:

  • Parallelize across tables at the same level — they share no data pages or indexes. Use ThreadPoolExecutor with one worker per table.
  • Sequential between levels — Level 1 needs Level 0's GUIDs for @odata.bind.
  • Sequential chunks within each table — concurrent writes to the same table cause SQL deadlocks (error 1205).
  • Use UpsertItem with the source system's PK as the alternate key — idempotent, safe for re-runs and partial failures.
  • Do NOT put alternate key columns in the record body. UpsertMultiple fails with "An unexpected error" if key columns appear in both. Single upsert tolerates it; bulk does not.
  • Catch per-table failures in ThreadPoolExecutor — wrap f.result() in try/except. One table failing must not kill the entire executor and prevent other tables from completing.
  • Build GUID maps by querying Dataverse after each level (upsert doesn't return GUIDs).
  • Start with chunk_size=1000 and let the adaptive helper ramp up. Dataverse has no fixed record limit — the constraints are payload size and timeout. Narrow tables (few columns) can handle 2000-4000 per chunk.
  • flush=True on all print statements for real-time progress on Windows.
  • If a source row references a missing lookup ID, skip the row and log it.

Do NOT parallelize chunks within a single table. Concurrent UpsertMultiple/CreateMultiple calls to the same table cause SQL Server deadlocks because concurrent inserts contend on shared data pages and index pages — even though the records are different.

Post-Import Verification

After all levels are imported, verify record counts match the source. Count by iterating pages with a single-column select (memory-efficient — no need to load full DataFrames just for counts):

def count_records(logical_name, id_col):
    return sum(len(page) for page in client.records.get(logical_name, select=[id_col]))

# Build expected counts from source data (e.g., len(rows) per table from earlier import phases)
expected = {"prefix_department": 12, "prefix_employee": 500, "prefix_timesheet": 15000}
for table, exp in expected.items():
    actual = count_records(table, table + "id")  # e.g., prefix_department -> prefix_departmentid
    status = "OK" if actual == exp else f"MISMATCH ({actual})"
    print(f"  {table}: {status} (expected {exp})", flush=True)

For deeper verification (spot-check data values, not just counts), use client.dataframe.get() — see dv-query.

First-Time Import (when you are certain no re-runs are needed)

If you control the environment and are certain the tables are empty, client.records.create() is faster than upsert (no existence check). But if the import fails partway through, re-running will create duplicates. Use this only for one-shot loads into fresh environments:

def bulk_create(logical_name, records, chunk_size=1000):
    """Import via create with adaptive chunking — faster but NOT safe for re-runs."""
    import requests as req_lib
    all_guids = []
    current_size = chunk_size
    max_size = 4000
    i = 0
    while i < len(records):
        chunk = records[i:i + current_size]
        try:
            guids = client.records.create(logical_name, chunk)
            all_guids.extend(guids)
            print(f"  {logical_name}: {i + len(chunk)}/{len(records)} (chunk={current_size})", flush=True)
            i += len(chunk)
            current_size = min(current_size * 2, max_size)
        except HttpError as e:
            if e.status_code in (413, 500) and current_size > 100:
                current_size = max(current_size // 2, 100)
                max_size = current_size
                print(f"  {logical_name}: chunk capped at {current_size}", flush=True)
            else:
                raise
        except req_lib.exceptions.RequestException:
            if current_size > 100:
                current_size = max(current_size // 2, 100)
                max_size = current_size
                print(f"  {logical_name}: timeout, chunk capped at {current_size}", flush=True)
            else:
                raise
    return all_guids

Error Handling

from PowerPlatform.Dataverse.core.errors import HttpError

try:
    guid = client.records.create("new_ticket", {"new_name": "Test"})
except HttpError as e:
    print(f"Status {e.status_code}: {e.message}")
    if e.details:
        print(f"Details: {e.details}")
    # 400 — bad field name, @odata.bind format, or missing required field
    # 403 — check security roles
    # 404 — table or record not found
    # 429 — rate limited; SDK retries automatically, reduce batch size if persistent

Windows Scripting Notes

  • ASCII only in .py files — curly quotes and em dashes cause SyntaxError on Windows.
  • No python -c for multiline code — write a .py file instead.
  • Generate GUIDs in scripts: str(uuid.uuid4()), not shell backtick substitution.

Sample Data Generation

Generate and insert realistic sample records into any Dataverse table. Useful for development, demos, and testing.

Use the Python SDK (client.records.create()) — not raw urllib or requests.

Agentic Flow

Step 1: Confirm environment and count

Before creating anything, confirm:

  • Target environment — run pac auth list to show the active environment
  • Record count — default is 5 records unless the user specifies otherwise
  • Table name — get the logical name (e.g., account, contact, cr123_customtable)

Step 2: Inspect the table schema

Use the EntityDefinitions metadata API to discover required columns and their types. Two non-obvious bits:

  • $filter=AttributeOf eq null — without this, each lookup column returns a duplicated sub-attribute row (e.g. a primarycontactid Lookup plus a _primarycontactid_value pair), which makes the list twice as long and confuses downstream code.
  • UserLocalizedLabel is null on unlocalized columns — dereference safely before reading .Label, otherwise custom tables without display names crash the loop.

import os, sys, json, urllib.request, urllib.parse
sys.path.insert(0, os.path.join(os.getcwd(), "scripts"))
from auth import get_token, load_env  # SDK does not support EntityDefinitions metadata

load_env()
env_url = os.environ["DATAVERSE_URL"].rstrip("/")
TABLE = "account"   # or any other table logical name

params = urllib.parse.urlencode({
    "$select": "LogicalName,AttributeType,RequiredLevel,DisplayName",
    "$filter": "AttributeOf eq null",
})
req = urllib.request.Request(
    f"{env_url}/api/data/v9.2/EntityDefinitions(LogicalName='{TABLE}')/Attributes?{params}",
    headers={"Authorization": f"Bearer {get_token()}", "Accept": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    attrs = json.loads(resp.read())["value"]

for a in attrs:
    dn = (a.get("DisplayName") or {}).get("UserLocalizedLabel")
    label = dn["Label"] if dn else a["LogicalName"]
    if a["RequiredLevel"]["Value"] == "ApplicationRequired":
        print(f"REQUIRED  {a['LogicalName']:30s} {a['AttributeType']:15s} {label}", flush=True)

Filter or group the raw attrs list however the task needs — don't assume the printed shape above; inline code downstream should consume attrs directly.

Step 3: Generate realistic data (by AttributeType)

Use the schema from Step 2 to pick a generator per column. No separate script — the agent writes this inline per request so it matches the actual table.

| AttributeType | Generate |
|---|---|
| String / Memo | Realistic text based on column name (e.g., name -> company names) |
| Integer / Decimal / Money | Random values within MinValue/MaxValue |
| Boolean | Alternate true/false |
| DateTime | Recent dates in ISO 8601 format |
| Picklist / Status | Integer option values (e.g., industrycode: 1) |
| Lookup | Skip by default — only set if user provides valid record IDs |
| Uniqueidentifier (non-PK) | Skip — let Dataverse auto-generate |

Step 4: Create the records inline via the SDK — schema-driven

This template is table-agnostic by design. It reads the attrs list from Step 2 and dispatches by AttributeType — no table-specific field names or hardcoded sample values baked in. Use it as a starting point; override/extend fake() when the table has domain-specific needs (e.g., look up real picklist option values, generate industry-appropriate company names for account, etc.).

import os, sys, random, datetime
sys.path.insert(0, os.path.join(os.getcwd(), "scripts"))
from auth import get_credential, load_env
from PowerPlatform.Dataverse.client import DataverseClient

load_env()
env_url = os.environ["DATAVERSE_URL"].rstrip("/")
client = DataverseClient(base_url=env_url, credential=get_credential())

TABLE = "account"   # any table logical name from Step 1
COUNT = 5           # confirmed with user
# `attrs` = the list returned by Step 2's EntityDefinitions query (re-run Step 2 if needed)

SKIP_TYPES = {"Lookup", "Uniqueidentifier", "EntityName", "State", "Status", "Owner", "Customer"}

def fake(attr, i):
    """Context-based value by AttributeType + PII-safe heuristics on column name."""
    name, t = attr["LogicalName"], attr["AttributeType"]
    if t in ("String", "Memo"):
        if "email" in name: return f"user{i}@example.com"
        if any(s in name for s in ("phone", "telephone", "fax")): return f"555-01{i:02d}"
        if "url" in name or "website" in name: return f"https://example.com/{name}/{i}"
        return f"Sample {name} {i}"
    if t in ("Integer", "BigInt"):          return random.randint(1, 1000)
    if t in ("Decimal", "Double", "Money"): return round(random.uniform(1, 10_000), 2)
    if t == "Boolean":                       return bool(i % 2)
    if t == "DateTime":
        d = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=i)
        return d.isoformat(timespec="seconds").replace("+00:00", "Z")
    if t in ("Picklist", "Status"):
        return 1   # placeholder — for real picklists, look up valid OptionSet values first
    return None    # skip Lookup, Uniqueidentifier, and anything unhandled

required = [a for a in attrs
            if a["RequiredLevel"]["Value"] == "ApplicationRequired"
            and a["AttributeType"] not in SKIP_TYPES]

records = []
for i in range(COUNT):
    rec = {}
    for a in required:
        v = fake(a, i)
        if v is not None:
            rec[a["LogicalName"]] = v
    records.append(rec)

# CreateMultiple for count >= 10, individual creates otherwise.
if COUNT >= 10:
    ids = client.records.create(TABLE, records)
    print(f"Created {len(ids)} records via CreateMultiple", flush=True)
else:
    ids = [client.records.create(TABLE, r) for r in records]
    print(f"Created {len(ids)} records individually", flush=True)

print(f"View: {env_url}/main.aspx?pagetype=entitylist&etn={TABLE}", flush=True)

Why schema-driven and not a hardcoded 5-account template: a template that bakes in account-shaped columns (name, telephone1, revenue, numberofemployees) biases the agent toward copy-paste-then-hack whenever the user asks for contact or cr123_project records. The fake() function above dispatches per-attribute so the same snippet produces correct fields for any table. Override fake() when you need domain-specific values — e.g. real company names for account.name, valid status-reason integers for a custom picklist.

Safety Rules for Sample Data

  • Always confirm the target environment and record count
  • Use .example.com domains for emails — never real domains
  • Use 555-01xx phone numbers — obviously fake
  • Skip lookup fields unless user explicitly asks
  • Skip system fields: createdon, modifiedon, ownerid, statecode, statuscode