CDC (Change Data Capture) - Internal Feature Map
Overview
CDC tracks INSERT/UPDATE/DELETE changes on database tables by writing change records into a
dedicated CDC table (turso_cdc by default). It is per-connection, enabled via PRAGMA, and
operates at the bytecode generation (translate) layer. The sync engine consumes CDC records
to push local changes to the remote.
Architecture Diagram
         User SQL (INSERT/UPDATE/DELETE/DDL)
                          |
                          v
┌────────────────────────────────────────────────────┐
│ Translate layer (core/translate/)                  │
│ ┌────────────────────────────────────────────────┐ │
│ │ prepare_cdc_if_necessary()                     │ │
│ │   - checks CaptureDataChangesInfo              │ │
│ │   - opens CDC table cursor (OpenWrite)         │ │
│ │   - skips if target == CDC table itself        │ │
│ └────────────────────────────────────────────────┘ │
│ ┌────────────────────────────────────────────────┐ │
│ │ emit_cdc_insns()                               │ │
│ │   - writes (change_id, change_time,            │ │
│ │     change_type, table_name, id,               │ │
│ │     before, after, updates) into CDC tbl       │ │
│ └────────────────────────────────────────────────┘ │
│ + emit_cdc_full_record() / emit_cdc_patch_record() │
└────────────────────────────────────────────────────┘
                          |
                          v
         CDC table (turso_cdc or custom name)
                          |
                          v
┌────────────────────────────────────────────────────┐
│ Sync engine (sync/engine/)                         │
│ DatabaseTape reads CDC table → DatabaseChange      │
│ → apply/revert → push to remote                    │
└────────────────────────────────────────────────────┘
Core Data Types
CaptureDataChangesMode + CaptureDataChangesInfo — core/lib.rs
CDC behavior is controlled by two types:
#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

enum CaptureDataChangesMode {
    Id,     // capture only rowid
    Before, // capture before-image
    After,  // capture after-image
    Full,   // before + after + updates
}

struct CaptureDataChangesInfo {
    mode: CaptureDataChangesMode,
    table: String,               // CDC table name
    version: Option<CdcVersion>, // schema version (V1 or V2)
}
The connection stores Option<CaptureDataChangesInfo> — None means CDC is off.
Key methods on CdcVersion:
- has_commit_record(): self >= V2, gates COMMIT record emission
- Display/FromStr: round-trips "v1" ↔ V1, "v2" ↔ V2
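
A minimal sketch of how these methods might look, built on the enum shown above. The String error type and the exact trait bodies are assumptions for illustration, not a copy of the code in core/lib.rs.

```rust
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, Eq, PartialEq, Ord, PartialOrd)]
#[repr(u8)]
pub enum CdcVersion {
    V1 = 1,
    V2 = 2,
}

impl CdcVersion {
    /// COMMIT records exist from V2 onward. The derived Ord follows
    /// declaration order, so this is a plain comparison.
    pub fn has_commit_record(self) -> bool {
        self >= CdcVersion::V2
    }
}

impl fmt::Display for CdcVersion {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(match self {
            CdcVersion::V1 => "v1",
            CdcVersion::V2 => "v2",
        })
    }
}

impl FromStr for CdcVersion {
    // Assumption: a plain String error; the real error type may differ.
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "v1" => Ok(CdcVersion::V1),
            "v2" => Ok(CdcVersion::V2),
            other => Err(format!("unknown CDC version: {}", other)),
        }
    }
}
```

Because gating is an ordering comparison, a later version would pass has_commit_record() without touching this method.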
Key methods on CaptureDataChangesInfo:
- parse(value: &str, version: Option<CdcVersion>): parses the PRAGMA argument "<mode>[,<table_name>]"; returns None for "off"
- cdc_version(): returns CdcVersion (panics if version is None). Single accessor replacing the old is_v1()/is_v2()/version() methods.
- has_before()/has_after()/has_updates(): mode capability checks
- mode_name(): returns the mode as a string
Convenience trait CaptureDataChangesExt on Option<CaptureDataChangesInfo> provides:
- has_before()/has_after()/has_updates(): delegate to the inner value, return false for None
- table(): returns Option<&str>, None when CDC is off
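
The parsing and delegation described above can be sketched as follows. This is an illustration under assumptions: the type names (Mode, CdcInfo, CdcExt) are shortened stand-ins, the version argument is omitted, the mode strings other than "full" and "off" are guesses, and the Result return type is hypothetical.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Mode {
    Id,
    Before,
    After,
    Full,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcInfo {
    pub mode: Mode,
    pub table: String,
}

const DEFAULT_TABLE: &str = "turso_cdc";

impl CdcInfo {
    /// Parses "<mode>[,<table_name>]". Ok(None) means CDC is off.
    pub fn parse(value: &str) -> Result<Option<CdcInfo>, String> {
        let (mode, table) = match value.split_once(',') {
            Some((m, t)) => (m.trim(), t.trim()),
            None => (value.trim(), DEFAULT_TABLE),
        };
        let mode = match mode {
            "off" => return Ok(None),
            "id" => Mode::Id,         // assumed spelling
            "before" => Mode::Before, // assumed spelling
            "after" => Mode::After,   // assumed spelling
            "full" => Mode::Full,
            other => return Err(format!("unknown CDC mode: {}", other)),
        };
        Ok(Some(CdcInfo { mode, table: table.to_string() }))
    }

    pub fn has_before(&self) -> bool {
        matches!(self.mode, Mode::Before | Mode::Full)
    }

    pub fn has_after(&self) -> bool {
        matches!(self.mode, Mode::After | Mode::Full)
    }

    pub fn has_updates(&self) -> bool {
        self.mode == Mode::Full
    }
}

/// Convenience trait so callers can query Option<CdcInfo> directly.
pub trait CdcExt {
    fn has_before(&self) -> bool;
    fn has_after(&self) -> bool;
    fn has_updates(&self) -> bool;
    fn table(&self) -> Option<&str>;
}

impl CdcExt for Option<CdcInfo> {
    fn has_before(&self) -> bool {
        self.as_ref().map_or(false, CdcInfo::has_before)
    }
    fn has_after(&self) -> bool {
        self.as_ref().map_or(false, CdcInfo::has_after)
    }
    fn has_updates(&self) -> bool {
        self.as_ref().map_or(false, CdcInfo::has_updates)
    }
    fn table(&self) -> Option<&str> {
        self.as_ref().map(|info| info.table.as_str())
    }
}
```

With the extension trait, translate-layer code can ask "does CDC need a before-image?" without first matching on whether CDC is enabled.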
CDC Table Schema v1
Default table name: turso_cdc (constant TURSO_CDC_DEFAULT_TABLE_NAME)
CREATE TABLE turso_cdc (
    change_id   INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time INTEGER,  -- unixepoch()
    change_type INTEGER,  -- 1=INSERT, 0=UPDATE, -1=DELETE
    table_name  TEXT,
    id          <untyped>, -- rowid of changed row
    before      BLOB,     -- binary record (before-image)
    after       BLOB,     -- binary record (after-image)
    updates     BLOB      -- binary record of per-column changes
);
CDC Table Schema v2 (current)
CREATE TABLE turso_cdc (
    change_id     INTEGER PRIMARY KEY AUTOINCREMENT,
    change_time   INTEGER,  -- unixepoch()
    change_type   INTEGER,  -- 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    table_name    TEXT,
    id            <untyped>, -- rowid of changed row
    before        BLOB,     -- binary record (before-image)
    after         BLOB,     -- binary record (after-image)
    updates       BLOB,     -- binary record of per-column changes
    change_txn_id INTEGER   -- transaction ID (groups rows into transactions)
);
v2 adds:
- change_txn_id column: groups CDC rows by transaction. Assigned via the conn_txn_id(candidate) opcode, which gets or sets a per-connection transaction ID.
- change_type=2 (COMMIT) records: mark transaction boundaries. Emitted once per statement in autocommit mode, or on explicit COMMIT.
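
To illustrate how a consumer could use these two additions, the sketch below splits v2 rows into committed transactions at each COMMIT record. CdcRow and group_committed are hypothetical names, and only the columns needed for grouping are modeled.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcRow {
    pub change_id: i64,
    pub change_type: i64, // 1=INSERT, 0=UPDATE, -1=DELETE, 2=COMMIT
    pub change_txn_id: i64,
}

const CHANGE_TYPE_COMMIT: i64 = 2;

/// Splits rows (assumed ordered by change_id) into committed transactions.
/// Rows after the last COMMIT record are returned separately as pending.
pub fn group_committed(rows: &[CdcRow]) -> (Vec<Vec<CdcRow>>, Vec<CdcRow>) {
    let mut committed = Vec::new();
    let mut current = Vec::new();
    for row in rows {
        if row.change_type == CHANGE_TYPE_COMMIT {
            // A COMMIT record closes the group collected so far.
            committed.push(std::mem::take(&mut current));
        } else {
            current.push(row.clone());
        }
    }
    (committed, current)
}
```

Rows within one returned group are expected to share a change_txn_id; a stricter consumer could assert that instead of relying on ordering alone.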
The CDC table is created at runtime by the InitCdcVersion opcode via CREATE TABLE IF NOT EXISTS.
CDC Version Table
When CDC is first enabled, a version tracking table is created:
CREATE TABLE turso_cdc_version (
    table_name TEXT PRIMARY KEY,
    version    TEXT NOT NULL
);
Current version: CDC_VERSION_CURRENT = CdcVersion::V2 (defined in core/lib.rs, re-exported from core/translate/pragma.rs)
Version Detection in InitCdcVersion
The InitCdcVersion opcode detects v1 vs v2 by checking whether the CDC table already exists before creating it:
- If CDC table already exists but has no version row → v1 (pre-existing table from before version tracking)
- If CDC table doesn't exist → create with current version (v2)
- If version row already exists → use that version as-is
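
The three rules above reduce to a small decision function. This is a sketch only: resolve_version is a hypothetical name, and the real opcode reaches the same result through SQL (INSERT OR IGNORE followed by a read-back) rather than a pure function.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CdcVersion {
    V1,
    V2,
}

const CDC_VERSION_CURRENT: CdcVersion = CdcVersion::V2;

/// Decides which schema version applies to a CDC table.
/// `table_existed`: whether the CDC table existed before InitCdcVersion ran.
/// `version_row`: the version already recorded in turso_cdc_version, if any.
pub fn resolve_version(table_existed: bool, version_row: Option<CdcVersion>) -> CdcVersion {
    match (version_row, table_existed) {
        // A recorded version always wins.
        (Some(v), _) => v,
        // Table predates version tracking, so it has the v1 layout.
        (None, true) => CdcVersion::V1,
        // Fresh table: created with the current schema.
        (None, false) => CDC_VERSION_CURRENT,
    }
}
```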
DatabaseChange — sync/engine/src/types.rs:229-249
Sync engine's Rust representation of a CDC row. Has into_apply() and into_revert() methods
for forward/backward replay.
OperationMode — core/translate/emitter.rs
Used by emit_cdc_insns() to determine change_type value:
- INSERT → 1
- UPDATE/SELECT → 0
- DELETE → -1
- COMMIT → 2 (v2 only, emitted by emit_cdc_commit_insns)
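
The mapping can be written as a single match. The OperationMode variants here are simplified stand-ins (the real enum in core/translate/emitter.rs may carry payloads), and change_type_for is a hypothetical helper name.

```rust
/// Simplified stand-in for OperationMode; the real variants may carry data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OperationMode {
    Insert,
    Update,
    Select,
    Delete,
}

/// change_type written for COMMIT records (v2 only).
pub const CHANGE_TYPE_COMMIT: i64 = 2;

/// Hypothetical helper: the change_type value stored for a per-row record.
pub fn change_type_for(mode: OperationMode) -> i64 {
    match mode {
        OperationMode::Insert => 1,
        OperationMode::Update | OperationMode::Select => 0,
        OperationMode::Delete => -1,
    }
}
```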
Entry Points
1. PRAGMA — Enable/Disable CDC
Set: core/translate/pragma.rs
- Checks MVCC is not enabled (CDC and MVCC are mutually exclusive)
- Parses mode string via CaptureDataChangesInfo::parse() with CDC_VERSION_CURRENT
- Emits a single InitCdcVersion opcode: all CDC setup (table creation, version tracking, state change) happens at execution time
Get (read current mode): core/translate/pragma.rs
- Returns 3 columns: mode, table, version
- When off: returns ("off", NULL, NULL)
- When active: returns (mode_name, table, version)
Pragma registration: core/pragma.rs — UnstableCaptureDataChangesConn with columns ["mode", "table", "version"]
2. Connection State
Field: core/connection.rs — capture_data_changes: RwLock<Option<CaptureDataChangesInfo>>
Getter: get_capture_data_changes_info() — returns read guard
Setter: set_capture_data_changes_info(opts: Option<CaptureDataChangesInfo>)
Default: initialized as None (CDC off)
3. ProgramBuilder Integration
Field: core/vdbe/builder.rs — capture_data_changes_info: Option<CaptureDataChangesInfo>
Accessor: capture_data_changes_info() — returns &Option<CaptureDataChangesInfo>
Passed from: core/translate/mod.rs — read from connection when creating builder
4. PrepareContext
Field: core/vdbe/mod.rs — capture_data_changes: Option<CaptureDataChangesInfo>
Set from: PrepareContext::from_connection() — clones from connection.get_capture_data_changes_info()
5. InitCdcVersion Opcode — core/vdbe/execute.rs
Always emitted by PRAGMA SET. Handles all CDC setup at execution time:
- For "off": stores None in state.pending_cdc_info, returns early
- Checks if CDC table already exists (for v1 backward compatibility)
- Creates CDC table (CREATE TABLE IF NOT EXISTS <cdc_table_name> ...), using the v2 schema with the change_txn_id column
- Creates version table (CREATE TABLE IF NOT EXISTS turso_cdc_version ...)
- Inserts version row: if CDC table pre-existed → "v1", otherwise → current version ("v2"). Uses INSERT OR IGNORE to preserve existing version rows.
- Reads back actual version from the table
- Stores computed CaptureDataChangesInfo in state.pending_cdc_info
The connection's CDC state is not applied in the opcode. Instead, pending_cdc_info is applied in halt() only after the transaction commits successfully. This ensures atomicity: if any step fails and the transaction rolls back, the connection's CDC state remains unchanged.
All table creation is done via nested conn.prepare()/run_ignore_rows() calls rather than bytecode emission, because the PRAGMA plan can't contain DML against tables that don't exist yet in the schema.
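
The deferred-apply pattern can be sketched as below. The types are simplified stand-ins: the real connection field sits behind an RwLock, and the nested Option used here to distinguish "no change requested" from "turn CDC off" is an assumption about how pending_cdc_info could be modeled.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CdcInfo {
    pub mode: String,
    pub table: String,
}

#[derive(Default)]
pub struct Connection {
    pub cdc: Option<CdcInfo>, // None means CDC is off
}

#[derive(Default)]
pub struct ProgramState {
    // Outer Option: was a change requested by the PRAGMA?
    // Inner Option: the new state, where None means "off".
    pub pending_cdc_info: Option<Option<CdcInfo>>,
}

/// Called when the program halts. The pending state reaches the
/// connection only if the transaction committed.
pub fn halt(conn: &mut Connection, state: &mut ProgramState, committed: bool) {
    let pending = state.pending_cdc_info.take();
    if committed {
        if let Some(info) = pending {
            conn.cdc = info;
        }
    }
}
```

On rollback the pending value is simply dropped, so the connection never observes a CDC configuration whose tables were not actually created.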
Bytecode Emission (core/translate/emitter.rs)
These are the core CDC code generation functions:
| Function | Purpose |
|---|---|
| prepare_cdc_if_necessary() | Opens CDC table cursor if CDC is active and target != CDC table |
| emit_cdc_full_record() | Reads all columns from cursor into a MakeRecord (for before/after image) |
| emit_cdc_patch_record() | Builds record from in-flight register values (for after-image of INSERT/UPDATE) |
| emit_cdc_insns() | Writes a single CDC row per changed row (INSERT/UPDATE/DELETE). Called per-row inside DML loops. |
| emit_cdc_commit_insns() | Writes a COMMIT record (change_type=2) into CDC table (v2 only). Raw emission, no autocommit check. |
| emit_cdc_autocommit_commit() | End-of-statement COMMIT emission. Checks is_autocommit() at runtime and only emits COMMIT if in autocommit mode. v2 only. |
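
The guard in prepare_cdc_if_necessary() amounts to a predicate like the one below. should_capture is a hypothetical name, and the exact string comparison is an assumption (the real check may normalize case or compare schema objects instead of names).

```rust
const CDC_VERSION_TABLE: &str = "turso_cdc_version";

/// Returns true when a write to `target` should also produce CDC rows.
/// `cdc_table` is None when CDC is off for this connection.
pub fn should_capture(cdc_table: Option<&str>, target: &str) -> bool {
    match cdc_table {
        None => false,
        // Never capture writes to the CDC table itself or to the
        // version-tracking table, otherwise CDC would record its own writes.
        Some(cdc) => target != cdc && target != CDC_VERSION_TABLE,
    }
}
```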
COMMIT Emission Strategy (v2)
Per-row call sites use emit_cdc_insns() (no COMMIT). End-of-statement sites call emit_cdc_autocommit_commit() which checks is_autocommit() at runtime:
- Autocommit mode: emits a COMMIT record after the statement completes
- Explicit transaction (BEGIN...COMMIT): skips per-statement COMMIT; the explicit COMMIT statement emits the COMMIT record via emit_cdc_commit_insns()
This ensures multi-row statements like INSERT INTO t VALUES (1),(2),(3) produce one COMMIT at the end, not one per row.
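
A small simulation of this rule makes the resulting record stream concrete. Stmt, Record and simulate are hypothetical; the real code emits bytecode that performs the autocommit check at runtime rather than producing values like these.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Record {
    Change,
    Commit,
}

pub enum Stmt {
    Begin,
    Dml { rows: usize },
    Commit,
}

/// Returns the sequence of CDC records a v2 connection would write.
pub fn simulate(stmts: &[Stmt]) -> Vec<Record> {
    let mut autocommit = true;
    let mut out = Vec::new();
    for stmt in stmts {
        match stmt {
            Stmt::Begin => autocommit = false,
            Stmt::Dml { rows } => {
                // Per-row site: one change record per affected row.
                out.extend(std::iter::repeat(Record::Change).take(*rows));
                // End-of-statement site: COMMIT only in autocommit mode.
                if autocommit {
                    out.push(Record::Commit);
                }
            }
            Stmt::Commit => {
                // Explicit COMMIT always writes the COMMIT record.
                out.push(Record::Commit);
                autocommit = true;
            }
        }
    }
    out
}
```

A three-row INSERT in autocommit mode yields three change records followed by one COMMIT; the same rows spread over two statements inside BEGIN...COMMIT also end with a single COMMIT.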
Integration Points — Where CDC Records Are Emitted
INSERT — core/translate/insert.rs
- Per-row: emit_cdc_insns() after insert, and before delete for REPLACE/conflict
- End-of-statement: emit_cdc_autocommit_commit() in emit_epilogue() after the insert loop
UPDATE — core/translate/emitter.rs
- Per-row: captures before-image, after-image via patch record, emits emit_cdc_insns()
- End-of-statement: emit_cdc_autocommit_commit() after the update loop
DELETE — core/translate/emitter.rs
- Per-row: captures before-image and emits emit_cdc_insns()
- End-of-statement: emit_cdc_autocommit_commit() after the delete loop
UPSERT (ON CONFLICT DO UPDATE) — core/translate/upsert.rs
- Per-row: emit_cdc_insns() for all three cases: pure insert, update after conflict, replace
- No end-of-statement COMMIT of its own: upsert shares INSERT's epilogue
Schema Changes (DDL) — core/translate/schema.rs
- CREATE TABLE: emit_cdc_insns() (insert into sqlite_schema) + emit_cdc_autocommit_commit()
- DROP TABLE: emit_cdc_insns() per-row in metadata loop + emit_cdc_autocommit_commit() after loop
- CREATE INDEX: emit_cdc_insns() + emit_cdc_autocommit_commit() (core/translate/schema.rs)
- DROP INDEX: emit_cdc_insns() per-row + emit_cdc_autocommit_commit() after loop (core/translate/index.rs)
DDL in explicit transactions (BEGIN; CREATE TABLE t(x); COMMIT) does NOT emit per-statement COMMIT — the autocommit check prevents it.
ALTER TABLE — core/translate/update.rs
- Sets cdc_update_alter_statement on the update plan when CDC has updates mode
Views/Triggers — Explicitly excluded
- core/translate/view.rs: passes None for CDC cursor
- core/translate/trigger.rs: passes None for CDC cursor
Subqueries — No CDC
- core/translate/subquery.rs sets cdc_cursor_id: None
Helper Functions (for reading CDC data)
table_columns_json_array(table_name) — core/function.rs, core/vdbe/execute.rs
Returns JSON array of column names for a table. Used to interpret binary records.
bin_record_json_object(columns_json, blob) — core/function.rs, core/vdbe/execute.rs
Decodes a binary record (from before/after/updates columns) into a JSON object using column names.
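
Since the before/after/updates blobs use SQLite's record format, the decoding step behind bin_record_json_object can be approximated as below. This is a standalone sketch of the SQLite record layout (header of varint serial types, then the values), not the function in core/vdbe/execute.rs; Value, read_varint and decode_record are hypothetical names, and pairing the decoded values with column names is left to the caller.

```rust
use std::convert::TryInto;

#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    Int(i64),
    Float(f64),
    Text(String),
    Blob(Vec<u8>),
}

/// Reads a SQLite varint: big-endian, 7 bits per byte, at most 9 bytes
/// (the 9th byte contributes all 8 bits).
fn read_varint(buf: &[u8], pos: &mut usize) -> Option<u64> {
    let mut v: u64 = 0;
    for i in 0..9 {
        let b = *buf.get(*pos)?;
        *pos += 1;
        if i == 8 {
            return Some((v << 8) | b as u64);
        }
        v = (v << 7) | (b & 0x7f) as u64;
        if b & 0x80 == 0 {
            return Some(v);
        }
    }
    None
}

/// Decodes one record into its column values; None on malformed input.
pub fn decode_record(buf: &[u8]) -> Option<Vec<Value>> {
    let mut pos = 0;
    // The header length counts the bytes of the length varint itself.
    let header_len = read_varint(buf, &mut pos)? as usize;
    let mut types = Vec::new();
    while pos < header_len {
        types.push(read_varint(buf, &mut pos)?);
    }
    let mut body = header_len;
    let mut out = Vec::with_capacity(types.len());
    for t in types {
        // Payload size implied by the serial type.
        let len = match t {
            0 | 8 | 9 => 0,
            1..=4 => t as usize,
            5 => 6,
            6 | 7 => 8,
            10 | 11 => return None, // reserved serial types
            n => ((n - 12) / 2) as usize,
        };
        let bytes = buf.get(body..body.checked_add(len)?)?;
        body += len;
        out.push(match t {
            0 => Value::Null,
            8 => Value::Int(0),
            9 => Value::Int(1),
            1..=6 => {
                // Big-endian two's complement, sign-extended to 64 bits.
                let mut v: i64 = if bytes[0] & 0x80 != 0 { -1 } else { 0 };
                for &b in bytes {
                    v = (v << 8) | b as i64;
                }
                Value::Int(v)
            }
            7 => Value::Float(f64::from_be_bytes(bytes.try_into().ok()?)),
            n if n % 2 == 0 => Value::Blob(bytes.to_vec()),
            _ => Value::Text(String::from_utf8(bytes.to_vec()).ok()?),
        });
    }
    Some(out)
}
```

Zipping the result with the names from table_columns_json_array gives the column-to-value pairs that the JSON helper exposes.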
Sync Engine Integration
The sync engine is the primary consumer of CDC data.
DatabaseTape — sync/engine/src/database_tape.rs
- CDC config: DEFAULT_CDC_TABLE_NAME = "turso_cdc", DEFAULT_CDC_MODE = "full"
- PRAGMA name: CDC_PRAGMA_NAME = "unstable_capture_data_changes_conn"
- Initialization: connect() sets CDC pragma and caches cdc_version from turso_cdc_version table. Must be called before iterate_changes().
- Version caching: cdc_version: RwLock<Option<CdcVersion>> is set by connect() and read by iterate_changes(). Panics if not set.
- Iterator: DatabaseChangesIterator reads CDC table in batches, emits DatabaseTapeOperation. For v2, real COMMIT records from the table are emitted. For v1, a synthetic Commit is appended at end of batch. ignore_schema_changes: true (default) filters out sqlite_schema row changes but not COMMIT records.
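
The iterator's version-dependent behavior can be sketched as a batch conversion. TapeOp, CdcRow and batch_to_ops are hypothetical simplifications of DatabaseTapeOperation and the iterator internals; appending the v1 synthetic Commit only to non-empty batches is an assumption.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TapeOp {
    RowChange { change_id: i64 },
    Commit,
}

pub struct CdcRow {
    pub change_id: i64,
    pub change_type: i64, // 2 = COMMIT (v2 only)
    pub table_name: String,
}

/// Converts one batch of CDC rows into tape operations.
/// `has_commit_record` is true for v2 tables.
pub fn batch_to_ops(
    rows: &[CdcRow],
    has_commit_record: bool,
    ignore_schema_changes: bool,
) -> Vec<TapeOp> {
    let mut ops = Vec::new();
    for row in rows {
        if row.change_type == 2 {
            // Real COMMIT records pass through and are never filtered.
            ops.push(TapeOp::Commit);
            continue;
        }
        if ignore_schema_changes && row.table_name == "sqlite_schema" {
            continue;
        }
        ops.push(TapeOp::RowChange { change_id: row.change_id });
    }
    if !has_commit_record && !rows.is_empty() {
        // v1 has no COMMIT rows, so close the batch with a synthetic one.
        ops.push(TapeOp::Commit);
    }
    ops
}
```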
Sync Operations — sync/engine/src/database_sync_operations.rs
- Change counting: SELECT COUNT(*) FROM turso_cdc WHERE change_id > ?
Sync Engine — sync/engine/src/database_sync_engine.rs
- Initialization: open_db() calls main_tape.connect(coro) to ensure CDC is set up and version is cached before any iterate_changes() calls.
- During apply_changes, checks if CDC table existed, re-creates it after sync
Replay Generator — sync/engine/src/database_replay_generator.rs
- Requires updates column to be populated (full mode)
Bindings CDC Surface
All bindings expose cdc_operations as part of sync stats:
| Binding | File |
|---|---|
| Python | bindings/python/src/turso_sync.rs |
| JavaScript | bindings/javascript/sync/src/lib.rs |
| JS (generator) | bindings/javascript/sync/src/generator.rs |
| Go | bindings/go/bindings_sync.go |
| React Native | bindings/react-native/src/types.ts |
| SDK Kit (C header) | sync/sdk-kit/turso_sync.h |
| SDK Kit (Rust) | sync/sdk-kit/src/bindings.rs |
Tests
- Integration tests: tests/integration/functions/test_cdc.rs covers all modes, CRUD, transactions, schema changes, version table, backward compatibility. Registered in tests/integration/functions/mod.rs.
- Sync engine tests: sync/engine/src/database_tape.rs covers CDC table reads, tape iteration, replay of schema changes.
- JS binding tests: bindings/javascript/sync/packages/{wasm,native}/promise.test.ts
Run: cargo test -- test_cdc (integration) or cargo test -p turso_sync_engine -- database_tape (sync engine).
User-facing Documentation
- CLI manual page: cli/manuals/cdc.md, accessible via .manual cdc in the REPL
- Database manual: docs/manual.md, CDC section linked in TOC
Key Design Decisions
- Per-connection, not per-database. Each connection has its own CDC mode and can target different tables.
- Bytecode-level implementation. CDC instructions are emitted alongside the actual DML bytecode during translation, with no runtime hooks or triggers.
- Self-exclusion. Changes to the CDC table and turso_cdc_version table are never captured (checked in prepare_cdc_if_necessary).
- Schema changes tracked. DDL operations are recorded as changes to the sqlite_schema table.
- Binary record format. Before/after/updates columns use SQLite's MakeRecord format (same as B-tree payload).
- Transaction-aware. CDC writes happen within the same transaction as the DML, so rollback naturally discards CDC entries.
- Version tracking. CDC schema version is recorded in the turso_cdc_version table and carried in CaptureDataChangesInfo.version for future schema evolution.
- Atomic PRAGMA. Connection CDC state is deferred via pending_cdc_info in ProgramState and applied only at Halt. If the PRAGMA's disk writes fail and the transaction rolls back, the connection state stays unchanged.
- Per-statement COMMIT (v2). COMMIT records are emitted once per statement (not per row), using emit_cdc_autocommit_commit(), which checks is_autocommit() at runtime. In explicit transactions, only the final COMMIT emits a COMMIT CDC record.
- Backward-compatible version detection. Pre-existing v1 CDC tables (without turso_cdc_version) are detected by checking table existence before creation. Existing tables get CdcVersion::V1 inserted into the version table.
- Typed version enum. The CdcVersion enum with #[repr(u8)] and Ord/PartialOrd enables feature gating via integer comparison (has_commit_record() = self >= V2). Display/FromStr handles the database round-trip.
- CDC and MVCC mutual exclusion. Enabling CDC when MVCC is active (or vice versa) returns an error. Checked at PRAGMA set time and journal mode switch time.