data-sync

SKILL.md

Skill: Data Sync

Build data sync integrations that pull records from CRMs and databases into Personize memory using the @personize/sdk. The user provides their source system and filters; you scaffold the connector, batch-memorize records, and deploy on a schedule.


Prerequisites

  • Node.js 18+ and npm/pnpm
  • A Personize secret key (sk_live_...) — set as PERSONIZE_SECRET_KEY in .env
  • Credentials for the source system (Salesforce, HubSpot, Postgres, etc.)

When NOT to Use This Skill

  • Want no-code visual workflows → use no-code-pipelines
  • Want durable pipelines with retries and scheduling → use code-pipelines
  • Only need a single memorizeBatch() call → use entity-memory directly
  • Need to design personalization architecture first → use personalization

SDK Quick Reference

import { Personize } from '@personize/sdk';
const client = new Personize({ secretKey: process.env.PERSONIZE_SECRET_KEY! });
Method Description
client.me() Get org, user, plan, and rate limits
client.memory.memorize(opts) Store content in RAG memory (AI extraction + vectors) — single item
client.memory.memorizeBatch(opts) Unified batch sync — per-property extractMemories flag controls AI vs structured
client.memory.upsert(opts) Structured upsert — store properties without AI extraction (legacy)
client.memory.upsertBatch(opts) Batch structured upsert — memories array format (legacy)
client.memory.recall(opts) Semantic search across memories
client.memory.search(opts) Filter/export records with conditions
client.memory.smartDigest(opts) Get compiled context bundle for an entity (properties + memories)
client.ai.smartGuidelines(opts) Get relevant context variables for a message
client.ai.prompt(opts) Execute a prompt with tools
client.guidelines.list() List all governance variables
client.guidelines.create(payload) Create a variable
client.guidelines.update(id, payload) Update a variable
client.guidelines.delete(id) Delete a variable
client.guidelines.getStructure(id) Get variable headings
client.guidelines.getSection(id, opts) Get a section by header
client.collections.list/create/update/delete() Manage property collections (full CRUD)
client.agents.list() List available agents
client.agents.run(id, opts) Run an agent

The extractMemories Flag — Per-Property AI Control

When syncing records, each property in the mapping has an optional extractMemories flag:

  • extractMemories: true → AI extraction + vector embeddings (LanceDB) + structured storage (DynamoDB). Use for unstructured content that benefits from semantic search.
  • extractMemories: false (or omitted) → Structured storage only (DynamoDB). Use for structured fields like emails, phone numbers, IDs, dates.

This is a per-property decision, all handled in a single client.memory.memorizeBatch() call:

await client.memory.memorizeBatch({
    source: 'Hubspot',
    mapping: {
        entityType: 'contact',
        email: 'email',               // source field name containing email
        website: 'company_website',    // source field name containing website
        runName: 'hubspot-contact-sync',
        properties: {
            // Structured fields — no AI
            email:      { sourceField: 'email',      collectionId: 'col_standard', collectionName: 'Contacts Standard' },
            first_name: { sourceField: 'firstname',  collectionId: 'col_standard', collectionName: 'Contacts Standard' },
            last_name:  { sourceField: 'lastname',   collectionId: 'col_standard', collectionName: 'Contacts Standard' },
            phone:      { sourceField: 'phone',      collectionId: 'col_standard', collectionName: 'Contacts Standard' },
            company:    { sourceField: 'company',     collectionId: 'col_standard', collectionName: 'Contacts Standard' },
            // Unstructured content — AI extraction + vectors
            personalization_notes: {
                sourceField: 'research_report',
                collectionId: 'col_generated',
                collectionName: 'Generated Content',
                extractMemories: true,
            },
            email_1_body: {
                sourceField: 'email_body_1',
                collectionId: 'col_generated',
                collectionName: 'Generated Content',
                extractMemories: true,
            },
            call_script: {
                sourceField: 'call_script_1',
                collectionId: 'col_generated',
                collectionName: 'Generated Content',
                extractMemories: true,
            },
        },
    },
    rows: [
        { email: 'john@acme.com', firstname: 'John', lastname: 'Smith', phone: '+1-555-0123', company: 'Acme Corp', research_report: 'VP of Sales, interested in enterprise plan...', email_body_1: '<p>Hi John...</p>', call_script_1: null },
        { email: 'jane@techstart.com', firstname: 'Jane', lastname: 'Doe', phone: null, company: 'TechStart', research_report: 'CTO, needs 99.9% uptime SLA...', email_body_1: '<p>Hi Jane...</p>', call_script_1: 'Opening: mention their recent Series B...' },
    ],
    chunkSize: 1,
});

Per-Property Decision Guide

Source Field extractMemories Reason
Email, Phone, Address omit (false) Structured, no AI needed
Name, Title, Company omit (false) Structured lookup fields
Research Reports, Notes true Unstructured, benefits from semantic search
Email Bodies, Call Scripts true Generated content, AI extracts facts
Revenue, Employee Count omit (false) Numeric, filter/sort only
Dates (DOB, Created) omit (false) Structured date fields

Single-Item Memorize

For ad-hoc single-item AI memorization (outside a batch sync), use client.memory.memorize():

await client.memory.memorize({
    content: 'John mentioned interest in enterprise plan during our call on March 15th.',
    speaker: 'Sales Call',
    enhanced: true,
    tags: ['crm-sync'],
    email: 'john@acme.com',
});

Integration Pattern (Step-by-Step)

Follow these steps when the user asks to connect a data source to Personize.

Step 1: Initialize the project

mkdir personize-sync && cd personize-sync
npm init -y
npm install @personize/sdk dotenv
npm install -D typescript ts-node @types/node

Create tsconfig.json:

{
    "compilerOptions": {
        "target": "ES2020",
        "module": "commonjs",
        "outDir": "dist",
        "rootDir": "src",
        "strict": true,
        "esModuleInterop": true,
        "resolveJsonModule": true,
        "declaration": true
    },
    "include": ["src"]
}

Create .env:

PERSONIZE_SECRET_KEY=sk_live_...
# Add source-specific credentials below

Create src/personize.ts (shared client):

import { Personize } from '@personize/sdk';
import 'dotenv/config';

export const client = new Personize({
    secretKey: process.env.PERSONIZE_SECRET_KEY!,
});

Step 2: Check auth and read rate limits

Always start every sync script by verifying the key and reading plan limits:

const { data } = await client.me();
const perMinute = data!.plan.limits.maxApiCallsPerMinute;
const perMonth = data!.plan.limits.maxApiCallsPerMonth;
const batchSize = Math.floor(perMinute * 0.9); // leave headroom
console.log(`Plan: ${data!.plan.name}${perMinute}/min, ${perMonth}/month`);

Step 3: Connect to the source and fetch rows

Install the source-specific client library and write a fetchRows() function that returns flat key-value objects — each row is a plain object whose keys match the sourceField names you'll use in the mapping (Step 5).

See the template files for source-specific fetch patterns:

  • Salesforce: See templates/salesforce.md
  • HubSpot: See templates/hubspot.md
  • Postgres/MySQL: See templates/postgres.md

Example output shape (keys match CRM field names):

const rows = [
    { email: 'john@acme.com', firstname: 'John', lastname: 'Smith', phone: '+1-555-0123', company: 'Acme Corp', notes: 'VP of Sales, interested in enterprise plan...' },
    { email: 'jane@techstart.com', firstname: 'Jane', lastname: 'Doe', phone: null, company: 'TechStart', notes: 'CTO, needs 99.9% uptime SLA...' },
];

Important: The row keys must exactly match the sourceField values in the mapping. Do not transform the data — pass the raw CRM field values.

Step 4: Discover collections

Before building the property mapping, fetch the available collections from the user's account:

const collections = await client.collections.list();
for (const col of collections.data?.actions || []) {
    console.log(`${col.payload.collectionId}${col.payload.collectionName}`);
}

Use the actual collectionId and collectionName values from this output in the mapping below. Do not use placeholder IDs.

Step 5: Build the property mapping

Define which source fields map to which Personize properties, and which should have AI extraction. Use extractMemories: true for unstructured content. See the "extractMemories Flag" section above.

import { BatchMemorizeMapping } from '@personize/sdk';

const mapping: BatchMemorizeMapping = {
    entityType: 'contact',
    email: 'email',          // source field containing email
    website: 'website',      // source field containing website URL
    runName: `crm-sync-${Date.now()}`,
    properties: {
        // Structured fields — stored directly (use collectionId from Step 4)
        email:      { sourceField: 'email',     collectionId: 'col_YOUR_STANDARD', collectionName: 'Contacts Standard Schema' },
        first_name: { sourceField: 'firstname', collectionId: 'col_YOUR_STANDARD', collectionName: 'Contacts Standard Schema' },
        last_name:  { sourceField: 'lastname',  collectionId: 'col_YOUR_STANDARD', collectionName: 'Contacts Standard Schema' },
        company:    { sourceField: 'company',   collectionId: 'col_YOUR_STANDARD', collectionName: 'Contacts Standard Schema' },
        // Unstructured fields — AI extraction + vectors
        notes: {
            sourceField: 'notes',
            collectionId: 'col_YOUR_GENERATED',
            collectionName: 'Generated Content',
            extractMemories: true,
        },
    },
};

Important: Replace col_YOUR_STANDARD and col_YOUR_GENERATED with the actual collectionId values from Step 4. The sourceField values must match the keys in your row objects from Step 3.

Step 6: Batch sync with rate-limit awareness

Send rows through memorizeBatch(). The API handles both structured storage and AI extraction in a single call based on each property's extractMemories flag.

async function batchSync(rows: Record<string, any>[], mapping: BatchMemorizeMapping, batchSize: number) {
    for (let i = 0; i < rows.length; i += batchSize) {
        const batch = rows.slice(i, i + batchSize);

        try {
            const result = await client.memory.memorizeBatch({
                source: 'CRM Sync',
                mapping,
                rows: batch,
                chunkSize: 1,
            });

            console.log(`Batch ${Math.floor(i / batchSize) + 1}: ${batch.length} rows — success: ${result.success}`);
        } catch (err: any) {
            if (err?.response?.status === 429) {
                const retryAfter = err.response.data?.retryAfterSeconds || 60;
                console.log(`Rate limited. Waiting ${retryAfter}s...`);
                await new Promise(r => setTimeout(r, retryAfter * 1000));
                i -= batchSize; // retry this batch
                continue;
            }
            throw err;
        }

        // Wait for rate limit window to reset before next batch
        if (i + batchSize < rows.length) {
            console.log('Waiting 62s for rate limit window...');
            await new Promise(r => setTimeout(r, 62_000));
        }
    }
}

Step 7: Verify with export or recall

After syncing, verify the data landed correctly:

// Semantic recall — find a specific record
const result = await client.memory.recall({
    query: 'What do we know about John Smith?',
    limit: 5,
});
console.log(result.data);

// Filter export — list all synced records
const exported = await client.memory.search({
    type: 'Contact',
    returnRecords: true,
    pageSize: 10,
});
console.log(`Found ${exported.data?.totalMatched} records`);

Step 8: Deploy as a scheduled job

Add build scripts to package.json:

{
    "scripts": {
        "build": "tsc",
        "sync": "node dist/sync.js",
        "sync:dev": "npx ts-node src/sync.ts"
    }
}

See the deploy/ folder for deployment configs:

  • Render: Use deploy/render.yaml — cron service with schedule
  • GitHub Actions: Use deploy/github-action.yml — cron workflow
  • Docker: Use deploy/Dockerfile — for any container platform

Rate Limit Handling

When a rate limit is hit, the API returns HTTP 429:

{
    "success": false,
    "error": "rate_limit_exceeded",
    "message": "Per-minute limit reached (60/min). Retry after 60 seconds.",
    "limit": 60,
    "current": 60,
    "window": "per_minute",
    "retryAfterSeconds": 60
}

Handle 429 errors with retry logic:

async function memorizeWithRetry(data: any, maxRetries = 3) {
    for (let attempt = 0; attempt <= maxRetries; attempt++) {
        try {
            return await client.memory.memorize(data);
        } catch (err: any) {
            if (err?.response?.status === 429) {
                const retryAfter = err.response.data?.retryAfterSeconds || 60;
                console.log(`Rate limited. Retrying in ${retryAfter}s (attempt ${attempt + 1}/${maxRetries})`);
                await new Promise(r => setTimeout(r, retryAfter * 1000));
            } else {
                throw err;
            }
        }
    }
    throw new Error('Max retries exceeded');
}

Plan limits reference

Plan Per Minute Per Month
Free 60 10,000
Starter 120 50,000
Pro 300 250,000
Enterprise 1,000 2,000,000

Always call client.me() first to get the actual limits for the user's plan.


After Sync: Connect Governance and Context

Data in memory is only useful when agents can access it with governance. After your sync is running:

  1. Set up guidelines — use the governance skill to create org rules that govern how agents use this data
  2. Assemble context — use smartGuidelines() + smartDigest() + recall() to build governed context for generation
  3. Wire pipelines — use personalization or code-pipelines to build pipelines that consume this data

See the collaboration skill's reference/architecture.md for the three-layer model (Guidelines + Memory + Workspace).


Advanced Patterns

For incremental sync, adding multiple sync sources, batch export/recall with filter operators, a complete end-to-end example script, and using memories via MCP — see reference/advanced-patterns.md.


Constraints

Keywords follow RFC 2119: MUST = non-negotiable, SHOULD = strong default (override with stated reasoning), MAY = agent discretion.

  1. MUST call client.me() at the start of every sync script to verify auth and read rate limits -- because starting a batch sync without knowing limits causes 429 errors and partial data states with no automatic resume.
  2. MUST ensure row object keys exactly match the sourceField values in the property mapping -- because mismatched keys cause silent null writes with no error from the API.
  3. MUST use actual collectionId values from client.collections.list(), not placeholder strings -- because placeholder IDs cause API errors or writes to wrong collections.
Weekly Installs
1
First Seen
12 days ago
Installed on
mcpjam1
claude-code1
replit1
junie1
windsurf1
zencoder1