CDC (Change Data Capture) - Internal Feature Map

Overview

CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a dedicated CDC table (turso_cdc by default). It is per-connection, enabled via PRAGMA, and operates at the bytecode generation (translate) layer. The sync engine consumes CDC records to push local changes to the remote.

Architecture Diagram

User SQL (INSERT/UPDATE/DELETE/DDL)
        |
        v
  ┌─────────────────────────────────────────────────┐
  │  Translate layer (core/translate/)              │
  │  ┌───────────────────────────────────────────┐  │
  │  │ prepare_cdc_if_necessary()                │  │
  │  │   - checks CaptureDataChangesInfo         │  │
  │  │   - opens CDC table cursor (OpenWrite)    │  │
  │  │   - skips if target == CDC table itself   │  │
  │  └───────────────────────────────────────────┘  │
  │  ┌───────────────────────────────────────────┐  │
  │  │ emit_cdc_insns()                          │  │
  │  │   - writes (change_id, change_time,       │  │
  │  │     change_type, table_name, id,          │  │
  │  │     before, after, updates) into CDC tbl  │  │
  │  └───────────────────────────────────────────┘  │
  │  + emit_cdc_full_record() /                     │
  │    emit_cdc_patch_record()                      │
  └─────────────────────────────────────────────────┘
        |
        v
  CDC table (turso_cdc or custom name)
        |
        v
  ┌─────────────────────────────────────────────────┐
  │  Sync engine (sync/engine/)                     │
  │  DatabaseTape reads CDC table → DatabaseChange  │
  │  → apply/revert → push to remote                │
  └─────────────────────────────────────────────────┘

Core Data Types

CaptureDataChangesMode + CaptureDataChangesInfo (core/lib.rs)

CDC behavior is controlled by two types:

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

enum CaptureDataChangesMode {
    Id,          // capture only rowid
    Before,      // capture before-image
    After,       // capture after-image
    Full,        // before + after + updates
}

struct CaptureDataChangesInfo {
    mode: CaptureDataChangesMode,
    table: String,                  // CDC table name
    version: Option<CdcVersion>,    // schema version (V1 or V2)
}

The connection stores Option<CaptureDataChangesInfo>; None means CDC is off.

Key methods on CdcVersion:

  • has_commit_record(): returns self >= V2; gates COMMIT record emission
  • Display/FromStr: round-trips "v1" ↔ V1 and "v2" ↔ V2
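As a minimal sketch of how these methods could fit together (not the code in core/lib.rs), the derived ordering on the enum is what makes the version gate a plain comparison. The String error type for FromStr is an assumption; the real error type is not shown here.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

impl CdcVersion {
    /// COMMIT records exist from V2 onward. The derived ordering compares
    /// discriminants, so V1 < V2.
    fn has_commit_record(self) -> bool {
        self >= CdcVersion::V2
    }
}

impl fmt::Display for CdcVersion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CdcVersion::V1 => write!(f, "v1"),
            CdcVersion::V2 => write!(f, "v2"),
        }
    }
}

impl FromStr for CdcVersion {
    // Assumption: a plain String error, chosen only for this sketch.
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "v1" => Ok(CdcVersion::V1),
            "v2" => Ok(CdcVersion::V2),
            other => Err(format!("unknown CDC version: {other}")),
        }
    }
}
```

Adding a later variant such as V3 would keep has_commit_record() true without touching the method, which is the point of gating on ordering rather than equality.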

Key methods on CaptureDataChangesInfo:

  • parse(value: &str, version: Option<CdcVersion>) — parses PRAGMA argument "<mode>[,<table_name>]", returns None for "off"
  • cdc_version() — returns CdcVersion (panics if version is None). Single accessor replacing old is_v1()/is_v2()/version() methods.
  • has_before() / has_after() / has_updates() — mode capability checks
  • mode_name() — returns mode as string

Convenience trait CaptureDataChangesExt on Option<CaptureDataChangesInfo> provides:

  • has_before() / has_after() / has_updates() — delegates to inner, returns false for None
  • table() — returns Option<&str>, None when CDC is off
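The parsing and capability checks above can be sketched as follows. This is an illustration under assumptions, not the actual implementation: the mode strings "id", "before" and "after" are assumed (only "full" and "off" appear in this document), the Result wrapper and String error are hypothetical, and the capability mapping is inferred from the enum comments.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CdcVersion { V1, V2 }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CaptureDataChangesMode { Id, Before, After, Full }

#[derive(Debug, Clone, PartialEq, Eq)]
struct CaptureDataChangesInfo {
    mode: CaptureDataChangesMode,
    table: String,
    version: Option<CdcVersion>,
}

const TURSO_CDC_DEFAULT_TABLE_NAME: &str = "turso_cdc";

impl CaptureDataChangesInfo {
    /// Parses "<mode>[,<table_name>]"; "off" yields Ok(None).
    /// Assumption: unknown modes are reported as a String error.
    fn parse(value: &str, version: Option<CdcVersion>) -> Result<Option<Self>, String> {
        let (mode, table) = match value.split_once(',') {
            Some((m, t)) => (m.trim(), t.trim()),
            None => (value.trim(), TURSO_CDC_DEFAULT_TABLE_NAME),
        };
        let mode = match mode {
            "off" => return Ok(None),
            "id" => CaptureDataChangesMode::Id,
            "before" => CaptureDataChangesMode::Before,
            "after" => CaptureDataChangesMode::After,
            "full" => CaptureDataChangesMode::Full,
            other => return Err(format!("unknown CDC mode: {other}")),
        };
        Ok(Some(Self { mode, table: table.to_string(), version }))
    }

    fn has_before(&self) -> bool {
        matches!(self.mode, CaptureDataChangesMode::Before | CaptureDataChangesMode::Full)
    }
    fn has_after(&self) -> bool {
        matches!(self.mode, CaptureDataChangesMode::After | CaptureDataChangesMode::Full)
    }
    fn has_updates(&self) -> bool {
        matches!(self.mode, CaptureDataChangesMode::Full)
    }
}

/// Lets callers query Option<CaptureDataChangesInfo> without unwrapping.
trait CaptureDataChangesExt {
    fn has_before(&self) -> bool;
    fn has_after(&self) -> bool;
    fn has_updates(&self) -> bool;
    fn table(&self) -> Option<&str>;
}

impl CaptureDataChangesExt for Option<CaptureDataChangesInfo> {
    fn has_before(&self) -> bool { self.as_ref().map_or(false, |i| i.has_before()) }
    fn has_after(&self) -> bool { self.as_ref().map_or(false, |i| i.has_after()) }
    fn has_updates(&self) -> bool { self.as_ref().map_or(false, |i| i.has_updates()) }
    fn table(&self) -> Option<&str> { self.as_ref().map(|i| i.table.as_str()) }
}
```

With the extension trait, translate-layer code can ask "does this connection need a before-image?" directly on the optional value, and a disabled connection simply answers false.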

CDC Table Schema v1

Default table name: turso_cdc (constant TURSO_CDC_DEFAULT_TABLE_NAME)

CREATE TABLE turso_cdc (
    change_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time INTEGER,        -- unixepoch()
    change_type INTEGER,        -- 1=INSERT, 0=UPDATE, -1=DELETE
    table_name  TEXT,
    id          <untyped>,      -- rowid of changed row
    before      BLOB,           -- binary record (before-image)
    after       BLOB,           -- binary record (after-image)
    updates     BLOB            -- binary record of per-column changes
);

CDC Table Schema v2 (current)

CREATE TABLE turso_cdc (
    change_id    INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time  INTEGER,        -- unixepoch()
    change_type  INTEGER,        -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    table_name   TEXT,
    id           <untyped>,      -- rowid of changed row
    before       BLOB,           -- binary record (before-image)
    after        BLOB,           -- binary record (after-image)
    updates      BLOB,           -- binary record of per-column changes
    change_txn_id INTEGER        -- transaction ID (groups rows into transactions)
);

v2 adds:

  • change_txn_id column — groups CDC rows by transaction. Assigned via conn_txn_id(candidate) opcode which get-or-sets a per-connection transaction ID.
  • change_type=2 (COMMIT) records — mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit COMMIT.

The CDC table is created at runtime by the InitCdcVersion opcode via CREATE TABLE IF NOT EXISTS.

CDC Version Table

When CDC is first enabled, a version tracking table is created:

CREATE TABLE turso_cdc_version (
    table_name TEXT PRIMARY KEY,
    version TEXT NOT NULL
);

Current version: CDC_VERSION_CURRENT = CdcVersion::V2 (defined in core/lib.rs, re-exported from core/translate/pragma.rs)

Version Detection in InitCdcVersion

The InitCdcVersion opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it:

  • If CDC table already exists but has no version row → v1 (pre-existing table from before version tracking)
  • If CDC table doesn't exist → create with current version (v2)
  • If version row already exists → use that version as-is
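The three cases above reduce to a small decision function. The sketch below is illustrative only: resolve_cdc_version and its parameters are hypothetical names, and the real opcode reaches the same result through INSERT OR IGNORE plus a read-back rather than a pure function.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum CdcVersion { V1, V2 }

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

/// Hypothetical helper: decides which version applies to a CDC table.
/// `table_existed` is whether the CDC table was present before setup ran;
/// `version_row` is the row already stored in turso_cdc_version, if any.
fn resolve_cdc_version(table_existed: bool, version_row: Option<CdcVersion>) -> CdcVersion {
    match (version_row, table_existed) {
        // A recorded version always wins.
        (Some(v), _) => v,
        // Table predates version tracking, so it has the v1 layout.
        (None, true) => CdcVersion::V1,
        // Fresh table: created with the current schema.
        (None, false) => CDC_VERSION_CURRENT,
    }
}
```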

DatabaseChange (sync/engine/src/types.rs:229-249)

Sync engine's Rust representation of a CDC row. Has into_apply() and into_revert() methods for forward/backward replay.

OperationMode (core/translate/emitter.rs)

Used by emit_cdc_insns() to determine change_type value:

  • INSERT → 1
  • UPDATE / SELECT → 0
  • DELETE → -1
  • COMMIT → 2 (v2 only, emitted by emit_cdc_commit_insns)
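For a consumer reading the CDC table, the same mapping can be expressed as a pair of conversions. ChangeKind and both functions are hypothetical names used only for this sketch; they are not types from the codebase.

```rust
/// Hypothetical consumer-side view of the change_type column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ChangeKind {
    Insert,
    Update,
    Delete,
    Commit, // only present in v2 tables
}

/// Encodes a change kind as the integer stored in change_type.
fn change_type_code(kind: ChangeKind) -> i64 {
    match kind {
        ChangeKind::Insert => 1,
        ChangeKind::Update => 0,
        ChangeKind::Delete => -1,
        ChangeKind::Commit => 2,
    }
}

/// Decodes a stored change_type; unknown codes yield None.
fn change_kind_from_code(code: i64) -> Option<ChangeKind> {
    match code {
        1 => Some(ChangeKind::Insert),
        0 => Some(ChangeKind::Update),
        -1 => Some(ChangeKind::Delete),
        2 => Some(ChangeKind::Commit),
        _ => None,
    }
}
```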

Entry Points

1. PRAGMA — Enable/Disable CDC

Set: core/translate/pragma.rs

  • Checks MVCC is not enabled (CDC and MVCC are mutually exclusive)
  • Parses mode string via CaptureDataChangesInfo::parse() with CDC_VERSION_CURRENT
  • Emits a single InitCdcVersion opcode — all CDC setup (table creation, version tracking, state change) happens at execution time

Get (read current mode): core/translate/pragma.rs

  • Returns 3 columns: mode, table, version
  • When off: returns ("off", NULL, NULL)
  • When active: returns (mode_name, table, version)
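The shape of the returned row can be sketched as below. CdcInfoView and pragma_get_row are hypothetical stand-ins; the real code emits registers in bytecode rather than returning a tuple.

```rust
/// Hypothetical, flattened view of the active CDC settings.
struct CdcInfoView {
    mode_name: &'static str,
    table: String,
    version: &'static str, // rendered via Display, e.g. "v2"
}

/// Builds the (mode, table, version) row; NULL columns are modelled as None.
fn pragma_get_row(info: Option<&CdcInfoView>) -> (String, Option<String>, Option<String>) {
    match info {
        None => ("off".to_string(), None, None),
        Some(i) => (
            i.mode_name.to_string(),
            Some(i.table.clone()),
            Some(i.version.to_string()),
        ),
    }
}
```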

Pragma registration: core/pragma.rs, UnstableCaptureDataChangesConn with columns ["mode", "table", "version"]

2. Connection State

  • Field: core/connection.rs, capture_data_changes: RwLock<Option<CaptureDataChangesInfo>>
  • Getter: get_capture_data_changes_info(), returns a read guard
  • Setter: set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)
  • Default: initialized as None (CDC off)

3. ProgramBuilder Integration

  • Field: core/vdbe/builder.rs, capture_data_changes_info: Option<CaptureDataChangesInfo>
  • Accessor: capture_data_changes_info(), returns &Option<CaptureDataChangesInfo>
  • Passed from: core/translate/mod.rs, read from the connection when creating the builder

4. PrepareContext

  • Field: core/vdbe/mod.rs, capture_data_changes: Option<CaptureDataChangesInfo>
  • Set from: PrepareContext::from_connection(), which clones from connection.get_capture_data_changes_info()

5. InitCdcVersion Opcode — core/vdbe/execute.rs

Always emitted by PRAGMA SET. Handles all CDC setup at execution time:

  1. For "off": stores None in state.pending_cdc_info, returns early
  2. Checks if CDC table already exists (for v1 backward compatibility)
  3. Creates CDC table (CREATE TABLE IF NOT EXISTS <cdc_table_name> ...) — v2 schema with change_txn_id column
  4. Creates version table (CREATE TABLE IF NOT EXISTS turso_cdc_version ...)
  5. Inserts version row: if CDC table pre-existed → "v1", otherwise → current version ("v2"). Uses INSERT OR IGNORE to preserve existing version rows.
  6. Reads back actual version from the table
  7. Stores computed CaptureDataChangesInfo in state.pending_cdc_info

The connection's CDC state is not applied in the opcode. Instead, pending_cdc_info is applied in halt() only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged.
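A minimal sketch of this deferral, under stated assumptions: CdcInfo and Connection are simplified stand-ins, the nested Option for pending_cdc_info is an assumed encoding (outer None meaning "this program does not change CDC state"), and the committed flag stands in for whatever halt() actually inspects.

```rust
#[derive(Debug, Clone, PartialEq)]
struct CdcInfo {
    mode: String,
    table: String,
}

/// Simplified stand-in for the connection's CDC field.
struct Connection {
    capture_data_changes: Option<CdcInfo>,
}

struct ProgramState {
    /// Assumed encoding: None = no pending change,
    /// Some(None) = turn CDC off, Some(Some(info)) = enable with `info`.
    pending_cdc_info: Option<Option<CdcInfo>>,
}

impl ProgramState {
    /// Applies the pending CDC state only if the transaction committed.
    /// On rollback the pending value is dropped and the connection is untouched.
    fn halt(&mut self, conn: &mut Connection, committed: bool) {
        let pending = self.pending_cdc_info.take();
        if committed {
            if let Some(new_state) = pending {
                conn.capture_data_changes = new_state;
            }
        }
    }
}
```

Keeping the new state in the program rather than on the connection means a failed CREATE TABLE or version insert cannot leave the connection claiming CDC is on while no CDC table exists.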

All table creation is done via nested conn.prepare()/run_ignore_rows() calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema.

Bytecode Emission (core/translate/emitter.rs)

These are the core CDC code generation functions:

Function                      Purpose
prepare_cdc_if_necessary()    Opens CDC table cursor if CDC is active and target != CDC table
emit_cdc_full_record()        Reads all columns from cursor into a MakeRecord (for before/after image)
emit_cdc_patch_record()       Builds record from in-flight register values (for after-image of INSERT/UPDATE)
emit_cdc_insns()              Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops.
emit_cdc_commit_insns()       Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check.
emit_cdc_autocommit_commit()  End-of-statement COMMIT emission. Checks is_autocommit() at runtime and only emits COMMIT if in autocommit mode. v2 only.
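The guard that prepare_cdc_if_necessary() applies before opening a cursor can be sketched as a predicate. should_capture and the constant name are hypothetical, and exact string comparison is an assumption; the real check may normalize or compare names case-insensitively.

```rust
// Assumption: constant name invented for this sketch; the table name itself
// comes from the version-table section of this document.
const CDC_VERSION_TABLE_NAME: &str = "turso_cdc_version";

/// Hypothetical predicate: should changes to `target_table` be recorded?
/// `cdc_table` is None when CDC is off for the connection.
fn should_capture(cdc_table: Option<&str>, target_table: &str) -> bool {
    match cdc_table {
        // CDC is off: nothing to record.
        None => false,
        // Never record writes to the CDC table itself or to the version table,
        // otherwise each CDC write would trigger another CDC write.
        Some(cdc) => target_table != cdc && target_table != CDC_VERSION_TABLE_NAME,
    }
}
```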

COMMIT Emission Strategy (v2)

Per-row call sites use emit_cdc_insns() (no COMMIT). End-of-statement sites call emit_cdc_autocommit_commit() which checks is_autocommit() at runtime:

  • Autocommit mode: emits a COMMIT record after the statement completes
  • Explicit transaction (BEGIN...COMMIT): skips per-statement COMMIT; the explicit COMMIT statement emits the COMMIT record via emit_cdc_commit_insns()

This ensures multi-row statements like INSERT INTO t VALUES (1),(2),(3) produce one COMMIT at the end, not one per row.
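To make the resulting record stream concrete, the following toy model replays a statement sequence and lists the change_type values that would land in the CDC table. Stmt and cdc_change_types are hypothetical, only INSERT is modelled, and the model assumes an explicit COMMIT always emits a COMMIT record.

```rust
/// Hypothetical statement model: Insert carries the number of rows inserted.
enum Stmt {
    Begin,
    Insert(usize),
    Commit,
}

/// Returns the change_type values written for a v2 CDC table:
/// 1 per inserted row, 2 for each COMMIT record.
fn cdc_change_types(stmts: &[Stmt]) -> Vec<i64> {
    let mut autocommit = true;
    let mut out = Vec::new();
    for stmt in stmts {
        match stmt {
            Stmt::Begin => autocommit = false,
            Stmt::Insert(rows) => {
                // Per-row records, emitted inside the DML loop.
                out.extend(std::iter::repeat(1).take(*rows));
                // End-of-statement COMMIT only when no explicit transaction is open.
                if autocommit {
                    out.push(2);
                }
            }
            Stmt::Commit => {
                // Explicit COMMIT closes the transaction with a single record.
                out.push(2);
                autocommit = true;
            }
        }
    }
    out
}
```

Under this model, a single three-row INSERT in autocommit mode yields `[1, 1, 1, 2]`, and so does `BEGIN; INSERT (1 row); INSERT (2 rows); COMMIT`, since the per-statement COMMIT is suppressed inside the explicit transaction.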

Integration Points — Where CDC Records Are Emitted

INSERT — core/translate/insert.rs

  • Per-row: emit_cdc_insns() after insert, and before delete for REPLACE/conflict
  • End-of-statement: emit_cdc_autocommit_commit() in emit_epilogue() after the insert loop

UPDATE — core/translate/emitter.rs

  • Per-row: captures before-image, after-image via patch record, emits emit_cdc_insns()
  • End-of-statement: emit_cdc_autocommit_commit() after the update loop

DELETE — core/translate/emitter.rs

  • Per-row: captures before-image and emits emit_cdc_insns()
  • End-of-statement: emit_cdc_autocommit_commit() after the delete loop

UPSERT (ON CONFLICT DO UPDATE) — core/translate/upsert.rs

  • Per-row: emit_cdc_insns() for all three cases: pure insert, update after conflict, replace
  • No separate end-of-statement COMMIT call: upsert shares INSERT's epilogue, which already calls emit_cdc_autocommit_commit()

Schema Changes (DDL) — core/translate/schema.rs

  • CREATE TABLE: emit_cdc_insns() (insert into sqlite_schema) + emit_cdc_autocommit_commit()
  • DROP TABLE: emit_cdc_insns() per-row in metadata loop + emit_cdc_autocommit_commit() after loop
  • CREATE INDEX: emit_cdc_insns() + emit_cdc_autocommit_commit() (core/translate/schema.rs)
  • DROP INDEX: emit_cdc_insns() per-row + emit_cdc_autocommit_commit() after loop (core/translate/index.rs)

DDL in explicit transactions (BEGIN; CREATE TABLE t(x); COMMIT) does NOT emit per-statement COMMIT — the autocommit check prevents it.

ALTER TABLE — core/translate/update.rs

  • Sets cdc_update_alter_statement on the update plan when CDC has updates mode

Views/Triggers — Explicitly excluded

  • core/translate/view.rs — passes None for CDC cursor
  • core/translate/trigger.rs — passes None for CDC cursor

Subqueries — No CDC

  • core/translate/subquery.rs sets cdc_cursor_id: None

Helper Functions (for reading CDC data)

table_columns_json_array(table_name) (core/function.rs, core/vdbe/execute.rs)

Returns JSON array of column names for a table. Used to interpret binary records.

bin_record_json_object(columns_json, blob) (core/function.rs, core/vdbe/execute.rs)

Decodes a binary record (from before/after/updates columns) into a JSON object using column names.

Sync Engine Integration

The sync engine is the primary consumer of CDC data.

DatabaseTape — sync/engine/src/database_tape.rs

  • CDC config: DEFAULT_CDC_TABLE_NAME = "turso_cdc", DEFAULT_CDC_MODE = "full"
  • PRAGMA name: CDC_PRAGMA_NAME = "unstable_capture_data_changes_conn"
  • Initialization: connect() sets CDC pragma and caches cdc_version from turso_cdc_version table. Must be called before iterate_changes().
  • Version caching: cdc_version: RwLock<Option<CdcVersion>> — set by connect(), read by iterate_changes(). Panics if not set.
  • Iterator: DatabaseChangesIterator reads CDC table in batches, emits DatabaseTapeOperation. For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at end of batch. ignore_schema_changes: true (default) filters out sqlite_schema row changes but not COMMIT records.
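The version-dependent commit handling in the iterator can be sketched as a pure function over one batch. CdcRow, TapeOp and batch_to_ops are simplified, hypothetical stand-ins for the real row and DatabaseTapeOperation types, and appending the synthetic Commit only for non-empty batches is an assumption.

```rust
/// Simplified CDC row: only the columns this sketch needs.
#[derive(Debug, Clone, PartialEq)]
struct CdcRow {
    change_id: i64,
    change_type: i64, // 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    table_name: String,
}

/// Simplified stand-in for the tape operation type.
#[derive(Debug, PartialEq)]
enum TapeOp {
    RowChange(i64), // carries change_id
    Commit,
}

/// Converts one batch of CDC rows into tape operations.
/// `has_commit_record` is true for v2 tables, false for v1.
fn batch_to_ops(rows: &[CdcRow], has_commit_record: bool, ignore_schema_changes: bool) -> Vec<TapeOp> {
    let mut ops = Vec::new();
    for row in rows {
        if row.change_type == 2 {
            // Real COMMIT records pass through even when schema rows are filtered.
            if has_commit_record {
                ops.push(TapeOp::Commit);
            }
            continue;
        }
        if ignore_schema_changes && row.table_name == "sqlite_schema" {
            continue;
        }
        ops.push(TapeOp::RowChange(row.change_id));
    }
    // v1 has no COMMIT rows, so a synthetic one closes the batch.
    if !has_commit_record && !rows.is_empty() {
        ops.push(TapeOp::Commit);
    }
    ops
}
```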

Sync Operations — sync/engine/src/database_sync_operations.rs

  • Change counting: SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?

Sync Engine — sync/engine/src/database_sync_engine.rs

  • Initialization: open_db() calls main_tape.connect(coro) to ensure CDC is set up and version is cached before any iterate_changes() calls.
  • apply_changes: checks whether the CDC table existed before applying and, if so, re-creates it after sync

Replay Generator — sync/engine/src/database_replay_generator.rs

  • Requires updates column to be populated (full mode)

Bindings CDC Surface

All bindings expose cdc_operations as part of sync stats:

Binding             File
Python              bindings/python/src/turso_sync.rs
JavaScript          bindings/javascript/sync/src/lib.rs
JS (generator)      bindings/javascript/sync/src/generator.rs
Go                  bindings/go/bindings_sync.go
React Native        bindings/react-native/src/types.ts
SDK Kit (C header)  sync/sdk-kit/turso_sync.h
SDK Kit (Rust)      sync/sdk-kit/src/bindings.rs

Tests

  • Integration tests: tests/integration/functions/test_cdc.rs — covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in tests/integration/functions/mod.rs.
  • Sync engine tests: sync/engine/src/database_tape.rs — CDC table reads, tape iteration, replay of schema changes.
  • JS binding tests: bindings/javascript/sync/packages/{wasm,native}/promise.test.ts

Run: cargo test -- test_cdc (integration) or cargo test -p turso_sync_engine -- database_tape (sync engine).

User-facing Documentation

  • CLI manual page: cli/manuals/cdc.md — accessible via .manual cdc in the REPL
  • Database manual: docs/manual.md — CDC section linked in TOC

Key Design Decisions

  1. Per-connection, not per-database. Each connection has its own CDC mode and can target different tables.
  2. Bytecode-level implementation. CDC instructions are emitted alongside the actual DML bytecode during translation — no runtime hooks or triggers.
  3. Self-exclusion. Changes to the CDC table and turso_cdc_version table are never captured (checked in prepare_cdc_if_necessary).
  4. Schema changes tracked. DDL operations are recorded as changes to sqlite_schema table.
  5. Binary record format. Before/after/updates columns use SQLite's MakeRecord format (same as B-tree payload).
  6. Transaction-aware. CDC writes happen within the same transaction as the DML, so rollback naturally discards CDC entries.
  7. Version tracking. CDC schema version is recorded in turso_cdc_version table and carried in CaptureDataChangesInfo.version for future schema evolution.
  8. Atomic PRAGMA. Connection CDC state is deferred via pending_cdc_info in ProgramState and applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged.
  9. Per-statement COMMIT (v2). COMMIT records are emitted once per statement (not per row), using emit_cdc_autocommit_commit() which checks is_autocommit() at runtime. In explicit transactions, only the final COMMIT emits a COMMIT CDC record.
  10. Backward-compatible version detection. Pre-existing v1 CDC tables (without turso_cdc_version) are detected by checking table existence before creation. Existing tables get CdcVersion::V1 inserted into the version table.
  11. Typed version enum. CdcVersion enum with #[repr(u8)] and Ord/PartialOrd enables feature gating via integer comparison (has_commit_record() = self >= V2). Display/FromStr handles database round-trip.
  12. CDC and MVCC mutual exclusion. Enabling CDC when MVCC is active (or vice versa) returns an error. Checked at PRAGMA set time and journal mode switch time.