pipes-new-indexer

Installation
SKILL.md

Pipes: New Indexer

Create new blockchain indexer projects using the Pipes CLI.

When to Use This Skill

Activate when user wants to:

  • Create a new indexer from scratch
  • Generate a project with templates (ERC20, Uniswap V3, etc.)
  • Start indexing a new blockchain protocol
  • Set up a fresh indexer with proper structure
  • Build a Hyperliquid perpetual futures fills indexer

Overview

The Pipes CLI (@iankressin/pipes-cli) provides an interactive scaffolding tool that generates production-ready indexer projects with built-in templates for common use cases.

Available Templates

Use npx @iankressin/pipes-cli@latest init --schema to see the full list of available templates and their parameter schemas.

EVM Templates

erc20Transfers - Track ERC20 token transfers:

{"templateId": "erc20Transfers", "params": {"contractAddresses": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"], "range": {"from": "21000000"}}}

uniswapV3Swaps - Track Uniswap V3 swap events via factory pattern:

{"templateId": "uniswapV3Swaps", "params": {"factoryAddress": "0x1F98431c8aD98523631AE4a59f267346ea31F984", "range": {"from": "21000000"}}}

custom - Custom contract events (requires full ABI event objects, NOT just event names):

{"templateId": "custom", "params": {"contracts": [{"contractAddress": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", "contractName": "WETH", "contractEvents": [{"name": "Deposit", "type": "event", "inputs": [{"name": "dst", "type": "address", "indexed": true}, {"name": "wad", "type": "uint256"}]}, {"name": "Withdrawal", "type": "event", "inputs": [{"name": "src", "type": "address", "indexed": true}, {"name": "wad", "type": "uint256"}]}], "range": {"from": "21000000"}}]}}

COMMON MISTAKE: Passing "contractEvents": ["Deposit", "Withdrawal"] (just names) will fail with Invalid input: expected array, received undefined. Each event must be a full ABI object with name, type, and inputs fields. For indexed parameters, include "indexed": true.

SVM (Solana) Templates

  • custom - Start with a blank template for custom logic

NOTE: The SVM custom template may fail with Invalid input: expected array, received undefined. If so, scaffold the project manually (package.json, tsconfig, src/index.ts, docker-compose.yml).

Solana Program Types: Anchor vs Non-Anchor

Before writing any Solana indexer, determine if the program is Anchor or non-Anchor. This dictates your entire approach:

Anchor Programs Non-Anchor Programs
Discriminator d8 (8 bytes, from sha256("global:<name>")) d1 (1 byte, Borsh enum index)
Typegen Yes — @subsquid/solana-typegen No — manual Borsh decoding
IDL available On-chain or GitHub (target/idl/*.json) No IDL — read Rust source
Decoding instructions.swap.decode(ins) → typed { accounts, data } Manual: read bytes from base58-decoded data
Examples Jito Tip Distribution, Jupiter Lend, Orca Whirlpool SPL Token, SPL Stake Pool, System Program

How to detect: Query Portal for the program's instructions and check d8 values. If they're 8-byte hex (0x3ec6d6c1d59f6cd2), it's Anchor. If the meaningful discriminator is d1 (single byte like 0x0e), it's non-Anchor.

Anchor Programs: Use Typegen

@subsquid/solana-typegen generates typed instruction decoders from Anchor IDLs. Use this for all Anchor programs — never manually compute d8 discriminators.

Install:

npm install @subsquid/solana-typegen

Generate types from IDL (3 methods):

# Method 1: Local IDL JSON file
npx squid-solana-typegen src/abi whirlpool.json

# Method 2: Fetch IDL from on-chain (Anchor programs store IDLs on-chain)
npx squid-solana-typegen src/abi whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc#whirlpool

# Method 3: Glob pattern for multiple IDLs
npx squid-solana-typegen src/abi ./idl/*

What it generates: A TypeScript module exporting:

  • programId — the program's public key constant
  • instructions.<name>.d8 — 8-byte discriminator for each instruction
  • instructions.<name>.decode(ins) — typed decoder returning { accounts, data }
  • instructions.<name>.accountSelection(mapping) — helper for account-position filtering

Usage with SolanaQueryBuilder:

import * as whirlpool from './abi/whirlpool'
import { solanaPortalStream, solanaQuery } from '@subsquid/pipes/solana'

const query = solanaQuery()
  .addFields({
    block: { number: true, timestamp: true },
    transaction: { signatures: true },
    instruction: { programId: true, accounts: true, data: true },
  })
  .addInstruction({
    range: { from: 280000000 },
    request: {
      programId: [whirlpool.programId],
      d8: [whirlpool.instructions.swap.d8],
      // Account-position filtering (optional — replaces manual a0/a1 filters):
      ...whirlpool.instructions.swap.accountSelection({
        whirlpool: ['7qbRF6YsyGuLUVs6Y1q64bdVrfe4ZcUUz1JRdoVNUJnm']
      }),
      transaction: true,
    },
  })

Decoding instructions in .pipe():

.pipe((blocks) => {
  const swaps = []
  for (const block of blocks) {
    for (const ins of block.instructions ?? []) {
      if (ins.programId === whirlpool.programId &&
          ins.d8 === whirlpool.instructions.swap.d8) {
        const decoded = whirlpool.instructions.swap.decode(ins)
        // decoded.accounts — typed account addresses
        // decoded.data — typed instruction parameters
        swaps.push({
          slot: block.header.number,
          timestamp: new Date(block.header.timestamp * 1000).toISOString(),
          ...decoded.data,
        })
      }
    }
  }
  return { swaps }
})

Where to find IDLs:

  1. On-chain (Anchor programs): Use npx squid-solana-typegen src/abi <programId>#<name> — this queries the chain directly
  2. GitHub: Most protocols publish IDLs in their repos (e.g., target/idl/<program>.json)
  3. Anchor IDL registries: Programs built with Anchor store IDLs at a PDA derived from the program ID
  4. Protocol docs: Some protocols link to their IDL files directly

When on-chain IDL is not available (still Anchor):

  • Some Anchor programs don't store IDLs on-chain. The programId#name format will fail with "Failed to fetch IDL"
  • Download the IDL from the protocol's GitHub repo (usually target/idl/<program>.json)
  • Pass local files to typegen: npx squid-solana-typegen src/abi ./idl/*.json

Non-Anchor Programs: Manual d1 Decoding

Typegen does NOT work for non-Anchor programs (SPL Token, SPL Stake Pool, System Program, etc.). These use single-byte Borsh enum discriminators (d1) and require manual decoding.

Step 1: Discover d1 discriminators from Portal

# Query ALL instructions from the program in a small slot range
curl -s 'https://portal.sqd.dev/datasets/solana-mainnet/stream' \
  -H 'content-type: application/json' \
  -H 'accept: application/x-ndjson' \
  -d '{"type":"solana","fromBlock":400000000,"toBlock":400010000,
       "instructions":[{"programId":["SPoo1Ku8WFXoNDMHPsrGSTSG1Y47rzgn41SLUNakuHy"]}],
       "fields":{"instruction":{"programId":true,"d1":true}}}' \
  | python3 -c "import sys,json; counts={}
for l in sys.stdin:
  for i in json.loads(l).get('instructions',[]):
    d=i.get('d1','?'); counts[d]=counts.get(d,0)+1
for k,v in sorted(counts.items(),key=lambda x:-x[1]): print(f'{k}: {v}')"

Step 2: Map d1 values to instruction names from the Rust source

// SPL Stake Pool instruction discriminators (from instruction.rs enum order)
const INSTRUCTIONS: Record<string, string> = {
  '0x09': 'DepositStake',
  '0x0a': 'WithdrawStake',
  '0x0e': 'DepositSol',
  '0x10': 'WithdrawSol',
  '0x18': 'DepositSolWithSlippage',
  '0x19': 'WithdrawSolWithSlippage',
}

Step 3: Filter by d1 in SolanaQueryBuilder

const query = solanaQuery()
  .addFields({
    block: { number: true, timestamp: true },
    transaction: { transactionIndex: true, signatures: true },
    instruction: { transactionIndex: true, programId: true, data: true, accounts: true },
  })
  .addInstruction({
    range: { from: FROM_SLOT },
    request: {
      programId: [SPL_STAKE_POOL],
      d1: Object.keys(INSTRUCTIONS),  // d1 filter, NOT d8
      isCommitted: true,
      transaction: true,
    },
  })

Step 4: Decode manually in .pipe()

// Base58 decode the instruction data, then read fields
const data = base58Decode(ins.data)
const d1Hex = '0x' + data[0].toString(16).padStart(2, '0')
const instrName = INSTRUCTIONS[d1Hex]

// Read u64 LE amount after the d1 byte (if present)
let amount = 0n
if (data.length >= 9) {
  for (let i = 0; i < 8; i++) {
    amount |= BigInt(data[1 + i]) << BigInt(i * 8)
  }
}

// Pool account is typically a0
const pool = ins.accounts?.[0] ?? ''

Real examples: SPL Stake Pool (solana/006), Kamino Lend uses d8 but manual decode (solana/001)

Volume warning: Many non-Anchor programs (especially SPL Stake Pool) have very low on-chain instruction volume. Always check volume with Portal BEFORE building — see the "Pre-Build: Estimate Instruction Volume" section in the portal-query skill's Solana reference.

CPI (Cross-Program Invocation) — CRITICAL for many DeFi protocols:

  • Many Solana DeFi protocols use a layered architecture where user-facing programs call core programs via CPI
  • Example: Jupiter Lend's Lending/Vaults programs call the Liquidity program's operate instruction via CPI
  • If your indexer returns zero data but Portal shows instructions exist, the instructions are likely CPI (inner instructions)
  • Fix: Add innerInstructions: true to the addInstruction() request:
    .addInstruction({
      range: { from: FROM_SLOT },
      request: {
        programId: [myProgram.programId],
        d8: [myProgram.instructions.operate.d8],
        isCommitted: true,
        transaction: true,
        innerInstructions: true,  // Required for CPI calls
      },
    })
    
  • How to detect: Query Portal for the program's instructions, then check instructionAddress field — if all entries have len > 1, they're all CPI
  • Without this flag, the indexer silently captures zero data — Portal cross-reference in validate.ts will catch this

CRITICAL: Template IDs must use camelCase format. Each template has specific required params - check the schema.

Hyperliquid Fills (No CLI Template — Manual Setup)

The Pipes SDK supports Hyperliquid fills natively via @subsquid/pipes/hyperliquid, but there is no CLI template yet. You must scaffold the project manually.

Import:

import { hyperliquidFillsPortalStream, hyperliquidFillsQuery } from '@subsquid/pipes/hyperliquid'

Query builder pattern:

const query = hyperliquidFillsQuery()
  .addRange({ from: 920000000 })
  .addFields({
    block: { number: true, timestamp: true },
    fill: {
      user: true, coin: true, px: true, sz: true,
      side: true, dir: true, closedPnl: true,
      fee: true, feeToken: true, crossed: true, startPosition: true,
    },
  })
  .addFill({ range: { from: 920000000 }, request: { coin: ['BTC', 'ETH', 'SOL'] } })

CRITICAL: .addFill() requires a range parameter — passing only { request: {...} } will crash with Cannot read properties of undefined (reading 'from').

Choosing a start block: Blocks increment at ~1/second. Use current_block - (days × 86400) to estimate. For a 7-day window, subtract ~604,800 from the current head block. A 7-day BTC/ETH/SOL sync yields ~6M fills in ~2-3 minutes.

Source (SDK 1.0+):

await hyperliquidFillsPortalStream({
  id: 'hl-perps-fills',  // required unique pipe ID
  portal: 'https://portal.sqd.dev/datasets/hyperliquid-fills',
  outputs: query,  // query builder passed as outputs (replaces old `query` field)
})

Data shape: .pipe() receives Block[] directly (not { blocks }). Each block has header (number, timestamp) and fills array:

.pipe((blocks) => {
  const fills = blocks.flatMap((block) =>
    block.fills.map((fill) => ({
      block_number: block.header.number,
      timestamp: new Date(block.header.timestamp).toISOString(),
      user: fill.user,
      coin: fill.coin,
      px: fill.px,
      sz: fill.sz,
      side: fill.side === 'B' ? 'Buy' : 'Sell',
      dir: fill.dir,
      closed_pnl: fill.closedPnl,
      fee: fill.fee,
      notional: fill.px * fill.sz,
      sign: 1,
    })),
  )
  return { fills }
})

Key differences from EVM indexers:

  • No evmDecoder — use hyperliquidFillsQuery() + .pipe() directly
  • Dataset starts at block 750,000,000 (not block 0)
  • Timestamps are in milliseconds. Use new Date(block.header.timestamp).toISOString() for ClickHouse (with date_time_input_format: 'best_effort'). Unlike EVM indexers, you do NOT divide by 1000 — the ISO string approach handles it.
  • side is 'B' (buy) or 'S' (sell) — single character codes. The pipe transform maps these to 'Buy'/'Sell' for ClickHouse, so SQL queries use the mapped values.
  • dir values: "Open Long", "Close Long", "Open Short", "Close Short", "Long > Short", "Short > Long", "Net Child Vaults"
  • All numeric values (px, sz, closedPnl, fee) are native floats, not BigInt
  • closedPnl is only non-zero for closing trades
  • fee can be negative (maker rebate)

Fill filter options (pass in addFill({ request: {...} })):

  • coin — Filter by asset: ['BTC', 'ETH', 'SOL']
  • user — Filter by trader address: ['0x...']
  • dir — Filter by direction: ['Open Long', 'Close Short']
  • feeToken — Filter by fee denomination
  • builder — Filter by builder address

ClickHouse schema for fills:

CREATE TABLE IF NOT EXISTS hl_fills (
    block_number UInt64,
    timestamp DateTime64(3, 'UTC'),
    user LowCardinality(String),
    coin LowCardinality(String),
    px Float64,
    sz Float64,
    side LowCardinality(String),
    dir LowCardinality(String),
    closed_pnl Float64,
    fee Float64,
    notional Float64,
    sign Int8
) ENGINE = CollapsingMergeTree(sign)
ORDER BY (coin, block_number, user, dir)
PARTITION BY toYYYYMM(timestamp);

Common use cases (tested):

  • Whale tracker — filter by user to track specific whale addresses, monitor PnL and positions (894K fills for 5 whales in 60s)
  • Multi-coin volume — filter by coin with 10+ assets including HYPE, DOGE, WIF (2.35M fills for 9 coins in 60s)
  • Perps analytics — track all fills for majors, aggregate daily volume and long/short ratios (5M fills in 70s)

Coin tickers: Use uppercase symbols: BTC, ETH, SOL, HYPE, DOGE, WIF, ARB, SUI, AVAX, etc. Some tokens use prefixed tickers: kPEPE (not PEPE), kBONK, kFLOKI, cash:GOLD, cash:TSLA, xyz:GOLD. Not all tokens use obvious tickers — run a broad query first to discover available coins.

Computed fields: The notional field (fill.px * fill.sz) is NOT a native fill field — it's computed in the .pipe() transform. Native fields are listed in the addFields section above.

See references/HYPERLIQUID_GUIDE.md for the complete manual setup walkthrough, use case examples, and SQL queries.

Supported Sinks

  • ClickHouse - High-performance analytics database (recommended)
  • PostgreSQL - Relational database with Drizzle ORM

Note: Memory sink is listed in the schema but not yet implemented in the CLI.

Node.js Version Requirement

Use Node.js LTS (v20 or v22). Node.js v25.x has known zstd decompression bugs that cause random crashes during Portal data streaming. If node --version shows v25.x, switch to LTS:

# Option 1: nvm (if installed)
nvm install 22
nvm use 22

# Option 2: Homebrew (macOS)
brew install node@22
export PATH="/opt/homebrew/opt/node@22/bin:$PATH"

# Option 3: Direct download
# Visit https://nodejs.org/ and install the LTS version

If you cannot switch Node versions, the indexer may still work for smaller syncs. The zstd bug tends to crash on large syncs (millions of blocks). For quick tests with recent blocks, v25 often works.

SDK 1.0 Key Features

Time-Based Ranges

Ranges now accept ISO date strings and Date objects instead of block numbers. Dates are auto-resolved to block numbers via the Portal API:

evmDecoder({
  range: { from: '2024-01-01' },  // ISO date string → resolved to block number
  events: { transfers: erc20.events.Transfer },
})

// Date objects work too
evmDecoder({
  range: { from: new Date('2024-01-01'), to: new Date('2024-02-01') },
  events: { ... },
})

// Formatted block numbers
evmDecoder({ range: { from: '18_908_900' }, ... })  // underscores OK

// Latest block
evmDecoder({ range: { from: 'latest' }, ... })  // only for `from`

Validation: Inverted ranges (from > to) and unresolvable timestamps throw BlockRangeConfigurationError (E0002).

defineAbi — Use JSON ABIs Without Codegen

defineAbi() converts a standard JSON ABI into decoder objects at runtime — no squid-evm-typegen step needed:

import erc20Json from './erc20.json'
import { defineAbi } from '@subsquid/pipes'

const erc20 = defineAbi(erc20Json)

evmDecoder({
  events: {
    transfers: erc20.events.Transfer,
    approvals: erc20.events.Approval,
  },
})

Accepts: plain ABI array, as const literal (for full type inference), or Hardhat/Foundry artifact with .abi field. Uses @subsquid/evm-codec (10x faster than viem).

Query Builder Shorthands

Factory functions replace new *QueryBuilder():

Old New
new EvmQueryBuilder() evmQuery()
new SolanaQueryBuilder() solanaQuery()
new HyperliquidFillsQueryBuilder() hyperliquidFillsQuery()

Typed Error System

All framework errors carry unique codes linking to docs:

Error Code When
DefaultPipeIdError E0001 .pipeTo() called without id on source
BlockRangeConfigurationError E0002 Inverted range, invalid date with 'latest', unresolvable timestamp

Testing with @subsquid/pipes/testing/evm

Test pipe logic end-to-end without hitting a real portal. The testing library provides:

  • encodeEvent — encode events with full type inference from viem ABIs
  • mockBlock — build mock blocks with auto-generated metadata (number, hash, timestamp)
  • evmPortalMockStream — spin up a mock portal HTTP server
  • resetMockBlockCounter — reset block numbering between tests

Install: Requires vitest and viem as dev dependencies.

Basic test pattern:

import { commonAbis, evmDecoder, evmPortalStream } from '@subsquid/pipes/evm'
import {
  type MockPortal,
  encodeEvent,
  evmPortalMockStream,
  mockBlock,
  resetMockBlockCounter,
} from '@subsquid/pipes/testing/evm'

// Helper to collect all stream output
async function readAll<T>(stream: AsyncIterable<{ data: T[] }>): Promise<T[]> {
  const res: T[] = []
  for await (const chunk of stream) {
    res.push(...chunk.data)
  }
  return res
}

const ERC20_ABI = [
  {
    type: 'event' as const,
    name: 'Transfer',
    inputs: [
      { name: 'from', type: 'address', indexed: true },
      { name: 'to', type: 'address', indexed: true },
      { name: 'value', type: 'uint256', indexed: false },
    ],
  },
] as const

const USDC = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' as const
const ALICE = '0x7a250d5630b4cf539739df2c5dacb4c659f2488d' as const
const BOB = '0xc82e11e709deb68f3631fc165ebd8b4e3fc3d18f' as const

Test 1: Decode events from mock blocks

let portal: MockPortal

beforeEach(() => resetMockBlockCounter())
afterEach(async () => await portal?.close())

it('should decode ERC20 transfers', async () => {
  // 1. Encode events — args are fully typed from the ABI
  const transfer1 = encodeEvent({
    abi: ERC20_ABI,
    eventName: 'Transfer',
    address: USDC,
    args: { from: ALICE, to: BOB, value: 1_000_000n },
  })

  // 2. Build mock blocks — metadata is auto-generated
  portal = await evmPortalMockStream({
    blocks: [
      mockBlock({ transactions: [{ logs: [transfer1] }] }),
    ],
  })

  // 3. Create pipe exactly as production, but with mock portal URL
  const stream = evmPortalStream({
    id: 'test',
    portal: portal.url,
    outputs: evmDecoder({
      range: { from: 0, to: 1 },
      events: { transfers: commonAbis.erc20.events.Transfer },
    }),
  }).pipe((batch) => batch.transfers)

  // 4. Collect and assert
  const transfers = await readAll(stream)
  expect(transfers).toHaveLength(1)
  expect(transfers[0].event.from).toBe(ALICE)
  expect(transfers[0].event.value).toBe(1_000_000n)
  expect(transfers[0].contract).toBe(USDC)
})

Test 2: Custom pipe transformations

it('should test custom transformations', async () => {
  const transfer = encodeEvent({
    abi: ERC20_ABI, eventName: 'Transfer', address: USDC,
    args: { from: ALICE, to: BOB, value: 2_000_000n },
  })

  portal = await evmPortalMockStream({
    blocks: [mockBlock({ transactions: [{ logs: [transfer] }] })],
  })

  // Chain .pipe() transforms — same as production code
  const stream = evmPortalStream({
    id: 'test',
    portal: portal.url,
    outputs: evmDecoder({
      range: { from: 0, to: 1 },
      events: { transfers: commonAbis.erc20.events.Transfer },
    }),
  })
    .pipe((batch) => batch.transfers)
    .pipe((transfers) =>
      transfers.map((t) => ({
        from: t.event.from,
        to: t.event.to,
        amount: Number(t.event.value) / 1e6,
      })),
    )

  const results = await readAll(stream)
  expect(results[0]).toEqual({ from: ALICE, to: BOB, amount: 2 })
})

Key patterns:

  • encodeEvent accepts abi, eventName, address, and typed args
  • mockBlock auto-generates number, hash, timestamp — call resetMockBlockCounter() in beforeEach
  • evmPortalMockStream returns { url, close() } — use portal.url with evmPortalStream
  • Chain .pipe() on the stream to test transformations
  • Multiple event types: pass multiple events in events: { transfers: ..., approvals: ... } and access batch.transfers, batch.approvals

How to Use the CLI

CLI Known Issue: ora ESM/CJS Crash

The CLI init command crashes with (0 , import_ora.default) is not a function on every init call. This is because the CLI bundles ESM-only ora v6+ as CJS. The --schema and --version commands still work.

IMPORTANT: This happens on every init call, not just the first time. Additionally, npx may re-download the CLI at any time, overwriting any previous patch. You must patch before every init.

Automatic pre-flight patch (MANDATORY before every init):

Run this before EVERY npx @iankressin/pipes-cli@latest init call:

# Pre-flight: ensure CLI is cached, then patch ora
npx @iankressin/pipes-cli@latest --version 2>/dev/null || true
CLI_PATH=$(find ~/.npm/_npx -name "index.cjs" -path "*pipes-cli*" 2>/dev/null | head -1)
if [ -n "$CLI_PATH" ] && ! grep -q 'import_ora = { default: function' "$CLI_PATH"; then
  sed -i.bak 's/var import_ora = __toESM(require("ora"), 1);/var import_ora = { default: function(opts) { var t = typeof opts === "string" ? opts : (opts \&\& opts.text) || ""; return { start: function(m) { console.log(m || t); return this; }, succeed: function(m) { console.log(m || t); return this; }, fail: function(m) { console.log(m || t); return this; }, stop: function() { return this; }, text: t }; } };/' "$CLI_PATH"
  echo "ora patch applied"
else
  echo "ora patch already in place (or CLI not found)"
fi

When using this skill programmatically, ALWAYS run the pre-flight patch as a single block immediately before the init command. Do not assume a previous patch is still in place.

Programmatic Mode (RECOMMENDED for Claude Code)

ALWAYS use programmatic mode with the published npm package:

npx @iankressin/pipes-cli@latest init --config '{
  "projectFolder": "/path/to/my-indexer",
  "packageManager": "npm",
  "networkType": "evm",
  "network": "ethereum-mainnet",
  "templates": [{"templateId": "erc20Transfers", "params": {"contractAddresses": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"]}}],
  "sink": "clickhouse"
}'

CRITICAL: Template IDs must use camelCase:

  • Use "uniswapV3Swaps" NOT "uniswap-v3-swaps"
  • Use "erc20Transfers" NOT "erc20-transfers"

Inspecting Available Templates

Before creating an indexer, inspect supported templates and their configuration:

npx @iankressin/pipes-cli@latest init --schema

This displays:

  • All available template IDs (camelCase format)
  • Required and optional parameters for each template
  • Sink-specific configurations
  • Network options

Template ID to Parameters Mapping

The --schema output uses an anyOf structure without discriminators. Here is the mapping:

templateId Required params Description
custom contracts (array of {contractAddress, contractName, contractEvents, range}) Custom contract events with full ABI
erc20Transfers contractAddresses (array of address strings), range ERC20 Transfer events
uniswapV3Swaps factoryAddress (single address string), range Uniswap V3 swaps via factory pattern

Known issue: The schema anyOf variants do not include const discriminators on templateId. Match by the params property names (contracts vs contractAddresses vs factoryAddress).

Critical Rule: NEVER MANUALLY CREATE INDEXER FILES

ALWAYS use the Pipes CLI programmatic mode. Manual file creation = YOLO mode = guaranteed problems.

If the CLI fails:

  • Fix the CLI issue first
  • Never work around it by creating files manually
  • Manual creation bypasses all scaffolding, dependency setup, and configuration

Workflow for Helping Users

Step 0: Research Protocol Architecture (MANDATORY)

Before writing ANY code or generating the project:

  1. Understand the protocol structure:

    • Visit the protocol's documentation
    • Identify contract relationships (vault vs underlying protocol, factory vs instances, etc.)
    • Determine which contract emits the events you need
  2. Ask clarifying questions:

    • What blockchain do they want to index? (Ethereum, Polygon, Solana, etc.)
    • What does "track X" mean in this context? (e.g., "allocations" could mean rebalancing events OR actual positions)
    • Which contract emits the relevant events? (Don't assume - verify!)
    • Is there a specific contract, pool, or address? (Important for customization)
    • Time range needed? (Recent data only = faster, full history = slower)
    • Where should the data be stored? (ClickHouse, PostgreSQL, CSV)
    • What should the project be named?
  3. Check for proxy contracts (MANDATORY for custom template):

    This is the #1 failure mode for DeFi protocol indexers. Most major DeFi protocols (Aave, Compound, Uniswap governance, etc.) use proxy contracts. If the target contract is a proxy, the CLI will fetch only the proxy ABI (typically just an Upgraded event), not the implementation ABI with the actual business logic events.

    Before running the CLI, check EVERY target contract:

    # Check Etherscan for proxy status (replace chain/address as needed)
    # Look for: "This contract is a proxy" banner on the contract page
    # Or check the "Read as Proxy" tab — if it exists, the contract IS a proxy
    

    If the contract IS a proxy:

    1. Find the implementation address (Etherscan "Read as Proxy" tab → implementation address)
    2. You will need the implementation's ABI for event decoding
    3. But you will use the proxy's address in the indexer config (events emit from the proxy)
    4. Plan to run evm-typegen against the implementation address AFTER CLI generation:
      npx @subsquid/evm-typegen@latest <project>/src/contracts \
        <IMPLEMENTATION_ADDRESS> --chain-id <CHAIN_ID>
      
    5. Then update the import in src/index.ts to use the implementation's generated file

    Real example — Aave V3 Pool:

    • Proxy: 0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2 (this is what users provide)
    • CLI fetches proxy ABI → only gets Upgraded event (WRONG)
    • Implementation has Supply, Borrow, Repay, Withdraw, LiquidationCall events
    • Solution: Generate types from implementation, keep proxy address in config
  4. Verify your understanding:

    • Look at actual transactions on Etherscan to see which events are emitted
    • Check if there are multiple contracts involved
    • Understand the data flow between contracts

Step 1: Inspect Available Templates

Before generating the project, inspect the available templates:

npx @iankressin/pipes-cli@latest init --schema

This ensures you use the correct templateId and understand the required configuration.

Step 2: Run the CLI

npx @iankressin/pipes-cli@latest init --config '{
  "projectFolder": "/path/to/my-indexer",
  "packageManager": "npm",
  "networkType": "evm",
  "network": "ethereum-mainnet",
  "templates": [{"templateId": "erc20Transfers", "params": {"contractAddresses": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"]}}],
  "sink": "clickhouse"
}'

IMPORTANT: Use camelCase for templateId values. Every template requires a params object - check --schema for required fields.

Step 3: Post-generation Setup (AUTOMATED - Do this AFTER CLI succeeds)

WARNING: The CLI defaults to pipes as the database name. This WILL cause sync table conflicts if you run more than one indexer. Every indexer writes sync state to {database}.sync with id = 'stream'. Two indexers sharing a database = the second one resumes from the first's position, causing wrong data or missing events.

MANDATORY: Always create a project-specific database. Use the project name or a descriptive name (e.g., aave_v3_eth, usdc_transfers, uniswap_base):

# Step 3a: Derive a database name from the project
DB_NAME=$(basename <project-folder> | tr '-' '_')
# Examples: aave-v3-indexer → aave_v3_indexer, usdc-transfers → usdc_transfers

# Step 3b: Create the dedicated database
docker exec <container> clickhouse-client --password <pw> \
  --query "CREATE DATABASE IF NOT EXISTS $DB_NAME"

# Step 3c: Update .env to use the dedicated database (NOT the default 'pipes')
sed -i '' "s/CLICKHOUSE_DATABASE=.*/CLICKHOUSE_DATABASE=$DB_NAME/" <project-folder>/.env

If using ClickHouse (Local Docker):

  • Get the actual password from existing container OR use "default" if creating new
  • Update the .env file with correct password AND database:
    sed -i '' 's/CLICKHOUSE_PASSWORD=.*/CLICKHOUSE_PASSWORD=<actual-password>/' <project-folder>/.env
    sed -i '' "s/CLICKHOUSE_DATABASE=.*/CLICKHOUSE_DATABASE=$DB_NAME/" <project-folder>/.env
    
  • If you MUST reuse a database, clear the sync table first:
    docker exec <container-name> clickhouse-client --password <password> \
      --query "DROP TABLE IF EXISTS <db>.sync"
    

If using ClickHouse Cloud:

  1. Configure .env for Cloud:

    CLICKHOUSE_URL=https://[service-id].[region].aws.clickhouse.cloud:8443
    CLICKHOUSE_DATABASE=pipes
    CLICKHOUSE_USER=default
    CLICKHOUSE_PASSWORD=[your-actual-cloud-password]
    
  2. Create database manually (CLI migrations don't create databases):

  3. Verify connection before running indexer:

    curl -X POST "https://[your-service-id].[region].aws.clickhouse.cloud:8443/" \
      --user "default:[your-password]" \
      -d "SELECT 1"
    

For complete deployment guide (local Docker or ClickHouse Cloud), see DEPLOYMENT.md.

Step 3.5: Post-Generation Verification Checklist

PROXY CHECK IS #1 PRIORITY. In real-world experience, 6 out of 9 indexers required manual proxy resolution. This is the most common failure mode. Always check proxy status BEFORE doing anything else (step 5 below).

After the CLI generates the project, verify these BEFORE running:

  1. Factory address injected (uniswapV3Swaps template only):

    grep "address:" <project-folder>/src/index.ts
    # Should show your factory address, NOT an empty string ['']
    # Known bug: CLI drops the factoryAddress param — fix with:
    sed -i '' "s|address: \[''\]|address: ['<YOUR_FACTORY_ADDRESS>']|" <project-folder>/src/index.ts
    
  2. ClickHouse password correct:

    grep CLICKHOUSE_PASSWORD <project-folder>/.env
    # CLI generates "password" — standalone Docker typically uses "default"
    # If using the generated docker-compose.yml, "password" is correct
    # If using an existing standalone container, change to match your container
    
  3. Contract addresses present (custom template):

    grep "contracts:" <project-folder>/src/index.ts
    # Verify your contract address(es) appear in the generated code
    
  4. Know your table names (custom template): The custom template creates one table per event, named {contractName}_{eventName} in snake_case. Example: Contract "WETH" with events "Deposit" and "Withdrawal" creates:

    • weth_deposit
    • weth_withdrawal

    There is NO combined weth_events table. Query each event table separately.

  5. Proxy contract detection (custom template — CRITICAL): Check the generated contract file for proxy-only ABI:

    grep "export const events" <project-folder>/src/contracts/*.ts
    # If you ONLY see "Upgraded" → it's a proxy contract!
    # The CLI fetched the proxy ABI, not the implementation.
    

    If proxy detected:

    1. Find implementation address on Etherscan ("Read as Proxy" tab)
    2. Generate types from implementation:
      npx @subsquid/evm-typegen@latest <project-folder>/src/contracts \
        <IMPLEMENTATION_ADDRESS> --chain-id <CHAIN_ID>
      
    3. Update import in src/index.ts to point to the implementation file
    4. Keep the proxy address in contracts: array (events emit from proxy)

    See references/ABI_GUIDE.md for detailed proxy handling guide.

    Non-standard proxy patterns (Aragon, Diamond, etc.): Some protocols use non-standard proxy patterns where evm-typegen cannot auto-resolve the implementation. Examples:

    • Lido uses Aragon's AppProxyUpgradeable — typegen returns only ProxyDeposit
    • Pendle Router is a Diamond proxy (EIP-2535) — typegen returns an empty Contract class with no events
    • Diamond proxies (EIP-2535) — multiple implementation facets, typegen sees only the diamond cut functions

    How to detect: If the generated contract file has export class Contract extends ContractBase {} with no export const events, it's likely a Diamond proxy. Also check: grep "export const events" src/contracts/*.ts — if empty, the ABI fetch failed.

    If typegen returns a proxy-only or empty ABI:

    1. Check the contract source on Etherscan to find the actual event definitions
    2. Query Portal for the contract's actual topic0 values to discover which events it emits
    3. Manually create the contract types file using the event() helper from @subsquid/evm-abi:
      import { event, indexed } from '@subsquid/evm-abi'
      import * as p from '@subsquid/evm-codec'
      
      export const events = {
        // Verify topic0 against on-chain data before using!
        MySwap: event(
          '0xabcdef...', // topic0 hash
          'MySwap(address,address,uint256,int256)',
          {
            caller_: indexed(p.address),
            market_: indexed(p.address),
            amount_: p.uint256,
            netFlow_: p.int256,
          },
        ),
      }
      
    4. CRITICAL: Verify topic0 hashes against actual on-chain data. Query Portal for the contract's events in a recent block range and match the topic0 values. Wrong topic0 = silent zero results.
    5. Watch for event signature version changes: Some protocols change event parameters across upgrades. The same event name can have different data layouts in different blocks, causing EventDecodingError: Offset is outside the bounds of the DataView. If this happens, either filter to blocks after the upgrade or remove the problematic event.
    6. This is a last resort — try typegen with the implementation address first
  6. Contract file naming (custom template): The generated contract file is named by address, not by contractName:

    src/contracts/0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2.ts  (not weth.ts)
    

    This is cosmetic — the file works fine. The import in src/index.ts references it correctly.

  7. Database exists:

    docker exec <container> clickhouse-client --password <pw> \
      --query "SHOW DATABASES" | grep <your-db-name>
    

Step 4: Customization

  • For EVM contracts: Update contract addresses in the generated transformer
  • For custom event handling: Modify the transformer logic
  • For database schema: Edit the table definitions
  • For ABI generation: See references/ABI_GUIDE.md

Step 5: Start and Validate

cd <project-folder>
npm run dev

VERIFY START BLOCK - Check the first log message shows your intended start block, not a resumed block.

NOTE: On the very first start, you will see an error about Unknown table expression identifier 'pipes.sync'. This is harmless — the SDK tries to read the sync table before it exists, then creates it. Ignore this error on first run.

Resume vs Fresh Start Decision Tree

When you see "Resuming from block X" in the logs, use this to decide what to do:

Scenario Action
Indexer crashed mid-sync and you want to continue Keep it — resume is correct. Verify X is near where it stopped.
You changed the start block or contract address Drop sync — old sync state is stale.
You're running a different indexer on the same database Drop sync — the sync state belongs to a different indexer. Better yet, use a separate database.
This is a brand new project, first ever run Drop sync — there shouldn't be one. If there is, the database was used by another indexer.
You want to re-index from scratch Drop sync AND data tables — clean slate.

How to drop sync:

docker exec <container> clickhouse-client --password <pw> \
  --query "DROP TABLE IF EXISTS <database>.sync"

How to restart after a crash (resume is wanted):

cd <project-folder>
npm run dev
# First log line should say "Resuming from X" where X is near where it stopped

Complete Automation Script

Follow these steps IN ORDER for first-time setup:

Step 1: Check/setup database (ClickHouse example)

CLICKHOUSE_CONTAINER=$(docker ps --filter "name=clickhouse" --format "{{.Names}}" | head -n 1)

if [ -z "$CLICKHOUSE_CONTAINER" ]; then
  echo "No ClickHouse found, starting new one..."
  docker run -d --name clickhouse \
    -p 8123:8123 -p 9000:9000 \
    -e CLICKHOUSE_PASSWORD=default \
    clickhouse/clickhouse-server
  CLICKHOUSE_PASSWORD="default"
else
  echo "Using existing ClickHouse: $CLICKHOUSE_CONTAINER"
  CLICKHOUSE_PASSWORD=$(docker inspect $CLICKHOUSE_CONTAINER | grep CLICKHOUSE_PASSWORD | cut -d'"' -f4)
fi

DB_NAME=$(basename /path/to/my-new-indexer | tr '-' '_')
docker exec $CLICKHOUSE_CONTAINER clickhouse-client --query "CREATE DATABASE IF NOT EXISTS $DB_NAME"

Step 2: Generate the indexer project

npx @iankressin/pipes-cli@latest init --config '{
  "projectFolder": "/path/to/my-new-indexer",
  "packageManager": "npm",
  "networkType": "evm",
  "network": "ethereum-mainnet",
  "templates": [{"templateId": "erc20Transfers", "params": {"contractAddresses": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"]}}],
  "sink": "clickhouse"
}'

Step 3: Fix the .env file

cd /path/to/my-new-indexer
sed -i '' "s/CLICKHOUSE_PASSWORD=.*/CLICKHOUSE_PASSWORD=$CLICKHOUSE_PASSWORD/" .env
sed -i '' "s/CLICKHOUSE_DATABASE=.*/CLICKHOUSE_DATABASE=$DB_NAME/" .env

Step 4: Run the indexer

npm run dev

Performance Considerations

Sync Speed Factors

  1. Start block range:

    • Smaller range = faster sync
    • 1M blocks: 5-10 minutes
    • 5M blocks: 30-60 minutes
    • Full chain: 2-4 hours
  2. Filtering type:

    • Contract events (fastest): Events from specific contracts
    • Token pair filtering (medium): Factory pattern with filters
    • Address filtering (slowest): Requires scanning all transfers
  3. Number of contracts tracked:

    • Fewer contracts = faster processing
    • Start with 1-3 key tokens, expand later if needed

Quick Testing Strategy

For fast iteration during development:

  1. Start with recent blocks (last 1-2 weeks):

    range: { from: '21,000,000' }
    
  2. Test with limited contracts:

    contracts: ['0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'] // Just WETH
    
  3. Once working, expand the range and contracts

Key SDK Patterns (Latest)

Event Parameter Filtering

Filter events by indexed parameters at the source level for maximum performance:

events: {
  transfers: {
    event: commonAbis.erc20.events.Transfer,
    params: {
      from: ['0x87482e84503639466fad82d1dce97f800a410945'],
      to: '0x10b32a54eeb05d2c9cd1423b4ad90c3671a2ed5f',
    },
  },
}

Factory Pattern

Track dynamically created contracts (e.g., Uniswap pools, MetaMorpho vaults). Use contractFactory() inside the contracts field of evmDecoder:

import { evmDecoder, contractFactory, contractFactoryStore } from '@subsquid/pipes/evm'

const swaps = evmDecoder({
  range: { from: '12,369,621' },
  contracts: contractFactory({
    address: ['0x1f98431c8ad98523631ae4a59f267346ea31f984'],
    event: factoryEvents.PoolCreated,
    childAddressField: 'pool',
    database: await contractFactoryStore({ path: './factory-pools.sqlite' }),
  }),
  events: {
    swaps: poolEvents.Swap,
  },
})

Factory pattern key concepts:

  • address: The factory contract(s) that emit creation events
  • event: The creation event (e.g., PoolCreated, CreateMetaMorpho)
  • parameter: The event parameter containing the child contract address
  • database: SQLite database to persist discovered child contracts across restarts

Accessing factory event metadata in .pipe(): Each decoded event includes a factory property with the creation event's data:

.pipe((deposits) => ({
  deposits: deposits.map((d) => ({
    vault: d.contract,                    // child contract address
    vaultName: d.factory?.event.name ?? '',  // from creation event
    asset: d.factory?.event.asset ?? '',     // from creation event
    sender: d.event.sender,               // from the decoded event itself
  })),
}))

Use d.factory?.event.<param> to access any parameter from the factory creation event. This is how you enrich child contract events with metadata from deployment (names, symbols, assets, etc.).

CRITICAL: Factory cold-start behavior. The factory pattern only discovers child contracts from the range.from block forward. If the factory deployed 100 contracts before your start block, those are NOT tracked. This means:

  • You may see zero data for 30-60+ seconds while the indexer catches up to a block where the factory creates a new child contract
  • This is normal, not a bug — but it can be alarming during testing
  • To track ALL child contracts, set range.from to the factory's deployment block
  • For testing, a recent start block works but expect a delay before data appears

Restarting a factory indexer from scratch: The factory pattern stores discovered contracts in a SQLite file. If you need a clean restart:

# 1. Kill the indexer
# 2. Delete the SQLite database
rm <project-folder>/*.sqlite
# 3. Drop ClickHouse sync + data tables
docker exec <container> clickhouse-client --password <pw> \
  --query "DROP TABLE IF EXISTS <db>.sync; DROP TABLE IF EXISTS <db>.<table1>; DROP TABLE IF EXISTS <db>.<table2>"
# 4. Restart
cd <project-folder> && npm run dev

### Adapting Factory Pattern for Any Protocol

The `uniswapV3Swaps` template uses the factory pattern for Uniswap pools, but you can apply it to **any** protocol that deploys child contracts via a factory. Here's the general approach:

1. **Identify the factory contract** — the contract that deploys child contracts
2. **Find the creation event** — the event emitted when a child is deployed (e.g., `PoolCreated`, `CreateMetaMorpho`, `CreateVault`)
3. **Identify the address parameter** — the event field containing the new child contract's address
4. **Generate ABIs** for both the factory and the child contracts:
   ```bash
   # Generate factory ABI
   npx @subsquid/evm-typegen@latest src/contracts \
     <FACTORY_ADDRESS> --chain-id 1

   # Generate child contract ABI (use any known child address)
   npx @subsquid/evm-typegen@latest src/contracts \
     <ANY_CHILD_CONTRACT_ADDRESS> --chain-id 1
  1. Wire up the factory pattern — replace the Uniswap-specific values with your protocol's:
    contracts: contractFactory({
      address: ['<FACTORY_ADDRESS>'],
      event: factoryEvents.YourCreationEvent,  // e.g., CreateMetaMorpho
      childAddressField: 'childAddress',        // event param with child address
      database: await contractFactoryStore({ path: './your-factory.sqlite' }),
    }),
    events: {
      yourEvents: childContractEvents.YourEvent,  // events on child contracts
    },
    

Real-world example — MetaMorpho vaults:

  • Factory: 0x1897A8997241C1cD4bD0698647e4EB7213535c24 (MetaMorpho Factory V1.1)
  • Creation event: CreateMetaMorpho with metaMorpho (indexed address)
  • Child contracts: ERC-4626 vaults emitting Deposit, Withdraw, etc.
  • Metadata from creation event: name, symbol, asset — accessible via d.factory?.event

Topic0-Only Global Filtering (No Contracts Array)

Track events across all contracts that emit a specific event signature, without needing to know addresses upfront. Simply omit the contracts field from evmDecoder:

// Captures ReallocateSupply from ALL MetaMorpho vaults — no address list needed
const reallocations = evmDecoder({
  range: { from: 18_900_000 },
  events: {
    ReallocateSupply: vaultAbi.ReallocateSupply,
    ReallocateWithdraw: vaultAbi.ReallocateWithdraw,
  },
  // NO contracts field — topic0-only filtering
})

When to use topic0-only vs factory pattern:

Topic0-Only Factory Pattern
Setup complexity Minimal — just omit contracts Requires factory ABI, SQLite DB
Cold-start delay None — instant data 30-60s while discovering child contracts
Metadata enrichment No — only d.contract for emitter address Yes — d.factory?.event gives creation params
False positives Possible if event sig is generic None — only tracks factory children
Best for Unique events (ReallocateSupply, StrategyReported) Generic events (Deposit, Transfer) on specific contracts

Use topic0-only when:

  • Event signature is unique to the protocol (won't match unrelated contracts)
  • You want to capture events across ALL instances (all vaults, all pools)
  • You don't need metadata from the factory creation event
  • You want zero setup and instant data

Don't use topic0-only when:

  • Event signature is generic (Transfer, Approval, Deposit) — matches too many unrelated contracts
  • You need creation event metadata (vault names, asset addresses)

Real examples: Yearn V2 StrategyReported (evm/037), MetaMorpho ReallocateSupply/ReallocateWithdraw (evm/039)

Combining Multiple Decoders

Pass multiple named decoders as outputs to run them in a single pipeline (replaces old pipeComposite):

const stream = evmPortalStream({
  id: 'my-indexer',
  portal: '...',
  outputs: {
    transfers: evmDecoder({ events: { transfers: commonAbis.erc20.events.Transfer } }),
    swaps: evmDecoder({ events: { swaps: uniswapV3Abi.events.Swap } }),
  },
})

IMPORTANT: contracts field format. For static contracts, pass an array of address strings:

evmDecoder({ contracts: ['0xABC...', '0xDEF...'], events: { ... } })  // ✅ correct
evmDecoder({ contracts: [{ address: ['0xABC...'] }], events: { ... } })  // ❌ wrong — crashes with "contract.toLowerCase is not a function"

The object format is ONLY for the contractFactory() helper pattern.

Decoded Event Field Access in .pipe()

When using evmDecoder with a .pipe() transform, each decoded event d has these fields:

Field Type Description
d.event.* object Decoded event parameters (e.g., d.event.vault, d.event.assets)
d.block.number number Block number
d.rawEvent.transactionHash string Transaction hash
d.rawEvent.logIndex number Log index within the block
d.timestamp Date Block timestamp as a JS Date object
d.contract string Contract address that emitted the event
d.factory object/null Factory metadata (only if using factory pattern)

⚠️ Common mistake: Using d.blockNumber or d.txHash (which don't exist) instead of d.block.number and d.rawEvent.transactionHash. These silently return undefined, stored as 0 or "" in ClickHouse.

⚠️ DateTime64(3) timestamp gotcha: When ClickHouse uses DateTime64(3, 'UTC'), pass ISO strings via d.timestamp.toISOString() with date_time_input_format: 'best_effort'. Passing epoch seconds (e.g., 1700392127) to DateTime64(3) is misinterpreted as 1970-01-20.

This differs from the CLI-generated enrichEvents helper, which flattens everything into { blockNumber, txHash, timestamp, ...eventParams }.

Target Configuration (ClickHouse)

import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'

stream.pipeTo(clickhouseTarget({
  client: createClient({ url: process.env.CLICKHOUSE_URL }),
  onData: async (ctx, data) => {
    await ctx.insert('transfers', data.transfers)
  },
  onRollback: async (ctx, range) => {
    // Handle chain reorgs
  },
}))

Target Configuration (PostgreSQL with Drizzle)

import { drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'

stream.pipeTo(drizzleTarget({
  db: drizzle(pool),
  tables: [transfersTable],
  onData: async (ctx, data) => {
    await ctx.db.insert(transfersTable).values(data.transfers)
  },
}))

Timestamp Handling (CRITICAL)

ClickHouse DateTime expects seconds, but d.timestamp.getTime() returns milliseconds. If you don't divide by 1000, timestamps will show as 1970-01-28 instead of the correct date.

When using the auto-generated enrichEvents helper

The CLI-generated enrichEvents function (in src/utils/index.ts) handles this correctly — it divides by 1000 internally. If your indexer uses enrichEvents, timestamps are fine:

// enrichEvents does this internally:
timestamp: new Date(v.timestamp).getTime() / 1000  // ← correct

When writing manual .pipe() transforms

If you write a custom .pipe() transform (e.g., to access factory metadata), you handle timestamps yourself. You MUST divide by 1000:

// WRONG — produces 1970 dates in ClickHouse
.pipe((deposits) => ({
  deposits: deposits.map((d) => ({
    timestamp: d.timestamp.getTime(),  // milliseconds! ClickHouse stores as year 1970
  })),
}))

// CORRECT — produces proper dates
.pipe((deposits) => ({
  deposits: deposits.map((d) => ({
    timestamp: Math.floor(d.timestamp.getTime() / 1000),  // seconds
  })),
}))

When to use enrichEvents vs manual .pipe()

Scenario Approach
Standard contract events, no factory metadata needed Use enrichEvents (auto-generated) — handles timestamps, block data, tx hash automatically
Need factory metadata (vault names, asset addresses from creation event) Write manual .pipe() — access d.factory?.event fields, but handle timestamps yourself
Need to combine factory metadata + standard enrichment Write manual .pipe() and include Math.floor(d.timestamp.getTime() / 1000)

Pipes Best Practices

Single Source of Truth (Database-Centric)

State should live in the database, not in process memory:

  • Prevents data loss from crashes or restarts
  • Enables recovery and replay from any point
  • Use idempotent inserts/updates

ClickHouse patterns:

  • Materialized views for derived metrics
  • SummingMergeTree for additive aggregations
  • AggregatingMergeTree for complex metrics
  • CollapsingMergeTree for reorg handling

PostgreSQL patterns:

  • ON CONFLICT UPDATE clauses for state reconciliation
  • Ensures safe re-runs without duplicates

Block-Aware Pipelines

Always start collection at or before contract deployment block:

  • Critical for metrics requiring complete event history
  • Ensures historical accuracy from block 0
  • Check deployment block on Etherscan before configuring

Reorg Handling (ClickHouse)

For reorg-sensitive events, use CollapsingMergeTree:

// Schema with sign field
CREATE TABLE events (
  ...
  sign Int8
) ENGINE = CollapsingMergeTree(sign)
ORDER BY (entity_id, block_number, tx_hash, event_type)

// Rollback handler
onRollback: (ctx, range) => {
  // Insert sign=-1 records for rolled-back blocks
  const rollbackRecords = events
    .filter(e => e.block >= range.from)
    .map(e => ({ ...e, sign: -1 }))

  return ctx.insert(rollbackRecords)
}

Critical: ORDER BY must include ALL distinguishing fields to prevent unwanted event deduplication.

Event-Based vs. State Queries

Indexers track historical flow (event-based):

  • Example: "User deposited 100, withdrew 110" = -10 net flow
  • Good for: Transaction history, activity tracking, audit logs

RPC queries track current state:

  • Example: "User currently holds 50 shares" = current balance
  • Good for: Current positions, real-time snapshots

Important: Withdrawals including accrued interest can make event flows appear negative even when positions are positive. Use RPC for current balances, events for historical analysis.

Validation Requirements

Always validate indexed data before production use:

  • Cross-reference sample transactions with block explorer
  • Verify event counts match expected ranges
  • Check for missing blocks or gaps
  • Reconcile aggregated metrics with known totals

Troubleshooting

CLI Issues

"Network timeout with npx"

  • Check internet connection
  • Try again or wait a moment
  • Ensure npm registry is accessible

"Template 'uniswap-v3-swaps' not found"

  • Use camelCase: uniswapV3Swaps not uniswap-v3-swaps
  • Run npx @iankressin/pipes-cli@latest init --schema to see available templates

"Template ID not recognized"

  • Run --schema flag to verify available templates and their exact IDs
  • Ensure you're using the latest CLI version with @latest

Database Issues

"Authentication failed: password is incorrect"

  • Check actual password: docker inspect <container> | grep CLICKHOUSE_PASSWORD
  • Update .env file with correct password

"Database pipes does not exist"

  • Create it: docker exec <container> clickhouse-client --query "CREATE DATABASE IF NOT EXISTS pipes"

"port is already allocated"

  • Use existing container instead of starting new one

"Indexer starts from wrong block / Missing data"

  • MOST COMMON ISSUE: Shared sync table between projects
  • Clear the sync table: docker exec <container> clickhouse-client --query "DROP TABLE IF EXISTS pipes.sync"
  • Restart the indexer - it will now start from the configured block

Common DeFi Protocol Fork Patterns

Many DeFi protocols are forks of well-known codebases. Recognizing forks saves time — you can reuse the same event ABI with just a different proxy address.

Aave V3 forks (same Pool ABI — Supply, Withdraw, Borrow, Repay, LiquidationCall, FlashLoan events):

  • SparkLend (Sky/MakerDAO ecosystem) — Pool: 0xC13e21B648A5Ee794902342038FF3aDAB66BE987
  • Radiant Capital, Seamless Protocol, Granary Finance, etc.
  • Implementation ABI works across all forks — just update the proxy address

Uniswap V2 forks (same Pair/Factory ABI — Swap, Mint, Burn, Sync events):

  • SushiSwap, PancakeSwap, TraderJoe, Camelot, etc.

Compound V2 forks (same cToken ABI — Mint, Redeem, Borrow, RepayBorrow, LiquidateBorrow):

  • Venus, Benqi, Tectonic, etc.

How to detect: Check the protocol's docs for "forked from" mentions, or compare the contract's event signatures against known ABIs. If the topic0 hashes match, it's a fork.

Related Skills

Related Documentation

This skill includes comprehensive reference documentation in the references/ directory:

  • ENVIRONMENT_SETUP.md - Development environment setup guide, prerequisites check, platform-specific notes, and troubleshooting
  • ABI_GUIDE.md - Fetching contract ABIs, commonAbis usage, proxy contract detection and handling, TypeScript type generation
  • SCHEMA_GUIDE.md - ClickHouse engine selection, ORDER BY strategy, BigInt handling, partitioning patterns
  • RESEARCH_CHECKLIST.md - Protocol research workflow, contract discovery, deployment block finding, common gotchas
  • HYPERLIQUID_GUIDE.md - Complete Hyperliquid fills indexer setup, API reference, query builder usage, and performance benchmarks

How to Access

cat pipes-sdk/pipes-new-indexer/references/ABI_GUIDE.md
cat pipes-sdk/pipes-new-indexer/references/SCHEMA_GUIDE.md
cat pipes-sdk/pipes-new-indexer/references/RESEARCH_CHECKLIST.md

Additional Resources

For comprehensive patterns and workflows:

  • PATTERNS.md - EVM patterns, troubleshooting, and performance optimization

Official Subsquid Documentation

Related skills
Installs
16
First Seen
Feb 23, 2026