data-sync
Skill: Data Sync
Build data sync integrations that pull records from CRMs and databases into Personize memory using the @personize/sdk. The user provides their source system and filters; you scaffold the connector, batch-memorize records, and deploy on a schedule.
Prerequisites
- Node.js 18+ and npm/pnpm
- A Personize secret key (`sk_live_...`) — set as `PERSONIZE_SECRET_KEY` in `.env`
- Credentials for the source system (Salesforce, HubSpot, Postgres, etc.)
When NOT to Use This Skill
- Want no-code visual workflows → use no-code-pipelines
- Want durable pipelines with retries and scheduling → use code-pipelines
- Only need a single `memorizeBatch()` call → use entity-memory directly
- Need to design personalization architecture first → use personalization
SDK Quick Reference
import { Personize } from '@personize/sdk';
const client = new Personize({ secretKey: process.env.PERSONIZE_SECRET_KEY! });
| Method | Description |
|---|---|
| `client.me()` | Get org, user, plan, and rate limits |
| `client.memory.memorize(opts)` | Store content in RAG memory (AI extraction + vectors) — single item |
| `client.memory.memorizeBatch(opts)` | Unified batch sync — per-property `extractMemories` flag controls AI vs structured |
| `client.memory.upsert(opts)` | Structured upsert — store properties without AI extraction (legacy) |
| `client.memory.upsertBatch(opts)` | Batch structured upsert — `memories` array format (legacy) |
| `client.memory.recall(opts)` | Semantic search across memories |
| `client.memory.search(opts)` | Filter/export records with conditions |
| `client.memory.smartDigest(opts)` | Get compiled context bundle for an entity (properties + memories) |
| `client.ai.smartGuidelines(opts)` | Get relevant context variables for a message |
| `client.ai.prompt(opts)` | Execute a prompt with tools |
| `client.guidelines.list()` | List all governance variables |
| `client.guidelines.create(payload)` | Create a variable |
| `client.guidelines.update(id, payload)` | Update a variable |
| `client.guidelines.delete(id)` | Delete a variable |
| `client.guidelines.getStructure(id)` | Get variable headings |
| `client.guidelines.getSection(id, opts)` | Get a section by header |
| `client.collections.list/create/update/delete()` | Manage property collections (full CRUD) |
| `client.agents.list()` | List available agents |
| `client.agents.run(id, opts)` | Run an agent |
The extractMemories Flag — Per-Property AI Control
When syncing records, each property in the mapping has an optional `extractMemories` flag:
- `extractMemories: true` → AI extraction + vector embeddings (LanceDB) + structured storage (DynamoDB). Use for unstructured content that benefits from semantic search.
- `extractMemories: false` (or omitted) → Structured storage only (DynamoDB). Use for structured fields like emails, phone numbers, IDs, dates.
The decision is made per property, and everything is handled in a single `client.memory.memorizeBatch()` call:
await client.memory.memorizeBatch({
source: 'Hubspot',
mapping: {
entityType: 'contact',
email: 'email', // source field name containing email
website: 'company_website', // source field name containing website
runName: 'hubspot-contact-sync',
properties: {
// Structured fields — no AI
email: { sourceField: 'email', collectionId: 'col_standard', collectionName: 'Contacts Standard' },
first_name: { sourceField: 'firstname', collectionId: 'col_standard', collectionName: 'Contacts Standard' },
last_name: { sourceField: 'lastname', collectionId: 'col_standard', collectionName: 'Contacts Standard' },
phone: { sourceField: 'phone', collectionId: 'col_standard', collectionName: 'Contacts Standard' },
company: { sourceField: 'company', collectionId: 'col_standard', collectionName: 'Contacts Standard' },
// Unstructured content — AI extraction + vectors
personalization_notes: {
sourceField: 'research_report',
collectionId: 'col_generated',
collectionName: 'Generated Content',
extractMemories: true,
},
email_1_body: {
sourceField: 'email_body_1',
collectionId: 'col_generated',
collectionName: 'Generated Content',
extractMemories: true,
},
call_script: {
sourceField: 'call_script_1',
collectionId: 'col_generated',
collectionName: 'Generated Content',
extractMemories: true,
},
},
},
rows: [
{ email: 'john@acme.com', firstname: 'John', lastname: 'Smith', phone: '+1-555-0123', company: 'Acme Corp', research_report: 'VP of Sales, interested in enterprise plan...', email_body_1: '<p>Hi John...</p>', call_script_1: null },
{ email: 'jane@techstart.com', firstname: 'Jane', lastname: 'Doe', phone: null, company: 'TechStart', research_report: 'CTO, needs 99.9% uptime SLA...', email_body_1: '<p>Hi Jane...</p>', call_script_1: 'Opening: mention their recent Series B...' },
],
chunkSize: 1,
});
Per-Property Decision Guide
| Source Field | extractMemories | Reason |
|---|---|---|
| Email, Phone, Address | omit (false) | Structured, no AI needed |
| Name, Title, Company | omit (false) | Structured lookup fields |
| Research Reports, Notes | true | Unstructured, benefits from semantic search |
| Email Bodies, Call Scripts | true | Generated content, AI extracts facts |
| Revenue, Employee Count | omit (false) | Numeric, filter/sort only |
| Dates (DOB, Created) | omit (false) | Structured date fields |
Single-Item Memorize
For ad-hoc single-item AI memorization (outside a batch sync), use client.memory.memorize():
await client.memory.memorize({
content: 'John mentioned interest in enterprise plan during our call on March 15th.',
speaker: 'Sales Call',
enhanced: true,
tags: ['crm-sync'],
email: 'john@acme.com',
});
Integration Pattern (Step-by-Step)
Follow these steps when the user asks to connect a data source to Personize.
Step 1: Initialize the project
mkdir personize-sync && cd personize-sync
npm init -y
npm install @personize/sdk dotenv
npm install -D typescript ts-node @types/node
Create tsconfig.json:
{
"compilerOptions": {
"target": "ES2020",
"module": "commonjs",
"outDir": "dist",
"rootDir": "src",
"strict": true,
"esModuleInterop": true,
"resolveJsonModule": true,
"declaration": true
},
"include": ["src"]
}
Create .env:
PERSONIZE_SECRET_KEY=sk_live_...
# Add source-specific credentials below
Create src/personize.ts (shared client):
import { Personize } from '@personize/sdk';
import 'dotenv/config';
export const client = new Personize({
secretKey: process.env.PERSONIZE_SECRET_KEY!,
});
Step 2: Check auth and read rate limits
Always start every sync script by verifying the key and reading plan limits:
const { data } = await client.me();
const perMinute = data!.plan.limits.maxApiCallsPerMinute;
const perMonth = data!.plan.limits.maxApiCallsPerMonth;
const batchSize = Math.floor(perMinute * 0.9); // leave headroom
console.log(`Plan: ${data!.plan.name} — ${perMinute}/min, ${perMonth}/month`);
Step 3: Connect to the source and fetch rows
Install the source-specific client library and write a fetchRows() function that returns flat key-value objects — each row is a plain object whose keys match the sourceField names you'll use in the mapping (Step 5).
See the template files for source-specific fetch patterns:
- Salesforce: See `templates/salesforce.md`
- HubSpot: See `templates/hubspot.md`
- Postgres/MySQL: See `templates/postgres.md`
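For illustration, a minimal fetchRows() sketch for a Postgres source (the contacts table, column names, and DATABASE_URL variable are assumptions; see templates/postgres.md for the full pattern):
// Illustrative Postgres fetch using the 'pg' client; adapt table and columns to your schema.
import { Pool } from 'pg';
import 'dotenv/config';
const pool = new Pool({ connectionString: process.env.DATABASE_URL });
export async function fetchRows(): Promise<Record<string, any>[]> {
  // Each key in the returned objects must match a sourceField name used in the mapping (Step 5).
  const { rows } = await pool.query(
    'SELECT email, firstname, lastname, phone, company, notes FROM contacts'
  );
  return rows;
}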
Example output shape (keys match CRM field names):
const rows = [
{ email: 'john@acme.com', firstname: 'John', lastname: 'Smith', phone: '+1-555-0123', company: 'Acme Corp', notes: 'VP of Sales, interested in enterprise plan...' },
{ email: 'jane@techstart.com', firstname: 'Jane', lastname: 'Doe', phone: null, company: 'TechStart', notes: 'CTO, needs 99.9% uptime SLA...' },
];
Important: The row keys must exactly match the sourceField values in the mapping. Do not transform the data — pass the raw CRM field values.
Step 4: Discover collections
Before building the property mapping, fetch the available collections from the user's account:
const collections = await client.collections.list();
for (const col of collections.data?.actions || []) {
console.log(`${col.payload.collectionId} — ${col.payload.collectionName}`);
}
Use the actual collectionId and collectionName values from this output in the mapping below. Do not use placeholder IDs.
Step 5: Build the property mapping
Define which source fields map to which Personize properties, and which should have AI extraction. Use extractMemories: true for unstructured content. See the "extractMemories Flag" section above.
import { BatchMemorizeMapping } from '@personize/sdk';
const mapping: BatchMemorizeMapping = {
entityType: 'contact',
email: 'email', // source field containing email
website: 'website', // source field containing website URL
runName: `crm-sync-${Date.now()}`,
properties: {
// Structured fields — stored directly (use collectionId from Step 4)
email: { sourceField: 'email', collectionId: 'col_YOUR_STANDARD', collectionName: 'Contacts Standard Schema' },
first_name: { sourceField: 'firstname', collectionId: 'col_YOUR_STANDARD', collectionName: 'Contacts Standard Schema' },
last_name: { sourceField: 'lastname', collectionId: 'col_YOUR_STANDARD', collectionName: 'Contacts Standard Schema' },
company: { sourceField: 'company', collectionId: 'col_YOUR_STANDARD', collectionName: 'Contacts Standard Schema' },
// Unstructured fields — AI extraction + vectors
notes: {
sourceField: 'notes',
collectionId: 'col_YOUR_GENERATED',
collectionName: 'Generated Content',
extractMemories: true,
},
},
};
Important: Replace col_YOUR_STANDARD and col_YOUR_GENERATED with the actual collectionId values from Step 4. The sourceField values must match the keys in your row objects from Step 3.
Step 6: Batch sync with rate-limit awareness
Send rows through memorizeBatch(). The API handles both structured storage and AI extraction in a single call based on each property's extractMemories flag.
async function batchSync(rows: Record<string, any>[], mapping: BatchMemorizeMapping, batchSize: number) {
for (let i = 0; i < rows.length; i += batchSize) {
const batch = rows.slice(i, i + batchSize);
try {
const result = await client.memory.memorizeBatch({
source: 'CRM Sync',
mapping,
rows: batch,
chunkSize: 1,
});
console.log(`Batch ${Math.floor(i / batchSize) + 1}: ${batch.length} rows — success: ${result.success}`);
} catch (err: any) {
if (err?.response?.status === 429) {
const retryAfter = err.response.data?.retryAfterSeconds || 60;
console.log(`Rate limited. Waiting ${retryAfter}s...`);
await new Promise(r => setTimeout(r, retryAfter * 1000));
i -= batchSize; // retry this batch
continue;
}
throw err;
}
// Wait for rate limit window to reset before next batch
if (i + batchSize < rows.length) {
console.log('Waiting 62s for rate limit window...');
await new Promise(r => setTimeout(r, 62_000));
}
}
}
Step 7: Verify with export or recall
After syncing, verify the data landed correctly:
// Semantic recall — find a specific record
const result = await client.memory.recall({
query: 'What do we know about John Smith?',
limit: 5,
});
console.log(result.data);
// Filter export — list all synced records
const exported = await client.memory.search({
type: 'Contact',
returnRecords: true,
pageSize: 10,
});
console.log(`Found ${exported.data?.totalMatched} records`);
Step 8: Deploy as a scheduled job
Add build scripts to package.json:
{
"scripts": {
"build": "tsc",
"sync": "node dist/sync.js",
"sync:dev": "npx ts-node src/sync.ts"
}
}
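For reference, a minimal src/sync.ts entry point might tie the earlier steps together like this (a sketch only; the ./source, ./mapping, and ./batchSync module names are assumptions, so organize the helpers from Steps 3, 5, and 6 however you prefer):
// src/sync.ts: illustrative wiring of the earlier steps into one runnable script.
import { client } from './personize';
import { fetchRows } from './source';    // Step 3 helper (assumed module name)
import { mapping } from './mapping';     // Step 5 mapping (assumed module name)
import { batchSync } from './batchSync'; // Step 6 helper (assumed module name)
async function main() {
  // Step 2: verify auth and read plan limits before syncing.
  const { data } = await client.me();
  const batchSize = Math.floor(data!.plan.limits.maxApiCallsPerMinute * 0.9);
  // Step 3: pull rows from the source system.
  const rows = await fetchRows();
  console.log(`Fetched ${rows.length} rows`);
  // Step 6: push them to Personize in rate-limit-aware batches.
  await batchSync(rows, mapping, batchSize);
}
main().catch((err) => {
  console.error(err);
  process.exit(1);
});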
See the deploy/ folder for deployment configs:
- Render: Use `deploy/render.yaml` — cron service with schedule
- GitHub Actions: Use `deploy/github-action.yml` — cron workflow
- Docker: Use `deploy/Dockerfile` — for any container platform
Rate Limit Handling
When a rate limit is hit, the API returns HTTP 429:
{
"success": false,
"error": "rate_limit_exceeded",
"message": "Per-minute limit reached (60/min). Retry after 60 seconds.",
"limit": 60,
"current": 60,
"window": "per_minute",
"retryAfterSeconds": 60
}
Handle 429 errors with retry logic:
async function memorizeWithRetry(data: any, maxRetries = 3) {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await client.memory.memorize(data);
} catch (err: any) {
if (err?.response?.status === 429) {
const retryAfter = err.response.data?.retryAfterSeconds || 60;
console.log(`Rate limited. Retrying in ${retryAfter}s (attempt ${attempt + 1}/${maxRetries})`);
await new Promise(r => setTimeout(r, retryAfter * 1000));
} else {
throw err;
}
}
}
throw new Error('Max retries exceeded');
}
Plan limits reference
| Plan | Per Minute | Per Month |
|---|---|---|
| Free | 60 | 10,000 |
| Starter | 120 | 50,000 |
| Pro | 300 | 250,000 |
| Enterprise | 1,000 | 2,000,000 |
Always call client.me() first to get the actual limits for the user's plan.
After Sync: Connect Governance and Context
Data in memory is only useful when agents can access it with governance. After your sync is running:
- Set up guidelines — use the `governance` skill to create org rules that govern how agents use this data
- Assemble context — use `smartGuidelines()` + `smartDigest()` + `recall()` to build governed context for generation
- Wire pipelines — use `personalization` or `code-pipelines` to build pipelines that consume this data
See the `collaboration` skill's `reference/architecture.md` for the three-layer model (Guidelines + Memory + Workspace).
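For illustration, assembling governed context for a single contact might look roughly like this (a sketch only; the option shapes for smartDigest() and smartGuidelines() are assumptions, so check the SDK reference for exact signatures):
import { client } from './personize';
// Rough sketch: option names for smartDigest()/smartGuidelines() are assumed, not confirmed.
const digest = await client.memory.smartDigest({ email: 'john@acme.com' });        // compiled entity bundle (properties + memories)
const guidelines = await client.ai.smartGuidelines({ message: 'Draft a follow-up email for John at Acme' }); // relevant governance variables
const memories = await client.memory.recall({ query: 'pricing discussions with John at Acme', limit: 5 });   // targeted semantic recall
// Combine the three and feed them into your generation step (e.g. client.ai.prompt()).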
Advanced Patterns
For incremental sync, adding multiple sync sources, batch export/recall with filter operators, a complete end-to-end example script, and using memories via MCP — see reference/advanced-patterns.md.
Constraints
Keywords follow RFC 2119: MUST = non-negotiable, SHOULD = strong default (override with stated reasoning), MAY = agent discretion.
- MUST call `client.me()` at the start of every sync script to verify auth and read rate limits -- because starting a batch sync without knowing limits causes 429 errors and partial data states with no automatic resume.
- MUST ensure row object keys exactly match the `sourceField` values in the property mapping -- because mismatched keys cause silent null writes with no error from the API (a pre-flight check sketch follows this list).
- MUST use actual `collectionId` values from `client.collections.list()`, not placeholder strings -- because placeholder IDs cause API errors or writes to wrong collections.
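As an optional pre-flight guard for the sourceField constraint above, you might validate a sample row against the mapping before syncing (a sketch only; this helper is not part of @personize/sdk):
// Illustrative helper (not part of @personize/sdk): confirm every sourceField in the
// mapping exists as a key on a sample row before starting a batch sync.
function assertSourceFieldsPresent(
  mapping: { properties: Record<string, { sourceField: string }> },
  rows: Record<string, any>[],
): void {
  const sample = rows[0] ?? {};
  const missing = Object.values(mapping.properties)
    .map((p) => p.sourceField)
    .filter((field) => !(field in sample));
  if (missing.length > 0) {
    throw new Error(`Rows are missing sourceField keys: ${missing.join(', ')}`);
  }
}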