pipes-new-indexer
Pipes: New Indexer
Create new blockchain indexer projects using the Pipes CLI.
When to Use This Skill
Activate when user wants to:
- Create a new indexer from scratch
- Generate a project with templates (ERC20, Uniswap V3, etc.)
- Start indexing a new blockchain protocol
- Set up a fresh indexer with proper structure
- Build a Hyperliquid perpetual futures fills indexer
Overview
The Pipes CLI (@iankressin/pipes-cli) provides an interactive scaffolding tool that generates production-ready indexer projects with built-in templates for common use cases.
Available Templates
Use npx @iankressin/pipes-cli@latest init --schema to see the full list of available templates and their parameter schemas.
EVM Templates
erc20Transfers - Track ERC20 token transfers:
{"templateId": "erc20Transfers", "params": {"contractAddresses": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"], "range": {"from": "21000000"}}}
uniswapV3Swaps - Track Uniswap V3 swap events via factory pattern:
{"templateId": "uniswapV3Swaps", "params": {"factoryAddress": "0x1F98431c8aD98523631AE4a59f267346ea31F984", "range": {"from": "21000000"}}}
custom - Custom contract events (requires full ABI event objects, NOT just event names):
{"templateId": "custom", "params": {"contracts": [{"contractAddress": "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", "contractName": "WETH", "contractEvents": [{"name": "Deposit", "type": "event", "inputs": [{"name": "dst", "type": "address", "indexed": true}, {"name": "wad", "type": "uint256"}]}, {"name": "Withdrawal", "type": "event", "inputs": [{"name": "src", "type": "address", "indexed": true}, {"name": "wad", "type": "uint256"}]}], "range": {"from": "21000000"}}]}}
COMMON MISTAKE: Passing "contractEvents": ["Deposit", "Withdrawal"] (just names) will fail with Invalid input: expected array, received undefined. Each event must be a full ABI object with name, type, and inputs fields. For indexed parameters, include "indexed": true.
SVM (Solana) Templates
- custom - Start with a blank template for custom logic
NOTE: The SVM custom template may fail with Invalid input: expected array, received undefined. If so, scaffold the project manually (package.json, tsconfig, src/index.ts, docker-compose.yml).
Solana Program Types: Anchor vs Non-Anchor
Before writing any Solana indexer, determine if the program is Anchor or non-Anchor. This dictates your entire approach:
| | Anchor Programs | Non-Anchor Programs |
|---|---|---|
| Discriminator | `d8` (8 bytes, from `sha256("global:<name>")`) | `d1` (1 byte, Borsh enum index) |
| Typegen | Yes — `@subsquid/solana-typegen` | No — manual Borsh decoding |
| IDL available | On-chain or GitHub (`target/idl/*.json`) | No IDL — read the Rust source |
| Decoding | `instructions.swap.decode(ins)` → typed `{ accounts, data }` | Manual: read bytes from base58-decoded `data` |
| Examples | Jito Tip Distribution, Jupiter Lend, Orca Whirlpool | SPL Token, SPL Stake Pool, System Program |
How to detect: Query Portal for the program's instructions and check d8 values. If they're 8-byte hex (0x3ec6d6c1d59f6cd2), it's Anchor. If the meaningful discriminator is d1 (single byte like 0x0e), it's non-Anchor.
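For a programmatic check, here is a minimal sketch. The helper name and slot range are illustrative assumptions; the endpoint and request shape mirror the curl example in the non-Anchor section below:

```typescript
// Tally discriminators for a program over a small slot range.
// If the values are 8-byte hex strings, the program is Anchor.
async function tallyDiscriminators(programId: string): Promise<Map<string, number>> {
  const res = await fetch('https://portal.sqd.dev/datasets/solana-mainnet/stream', {
    method: 'POST',
    headers: { 'content-type': 'application/json', accept: 'application/x-ndjson' },
    body: JSON.stringify({
      type: 'solana',
      fromBlock: 280_000_000, // illustrative slot range
      toBlock: 280_010_000,
      instructions: [{ programId: [programId] }],
      fields: { instruction: { programId: true, d8: true, d1: true } },
    }),
  })
  const counts = new Map<string, number>()
  for (const line of (await res.text()).split('\n')) {
    if (!line.trim()) continue
    for (const ins of JSON.parse(line).instructions ?? []) {
      const d = ins.d8 ?? ins.d1 ?? '?'
      counts.set(d, (counts.get(d) ?? 0) + 1)
    }
  }
  return counts
}
```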
Anchor Programs: Use Typegen
@subsquid/solana-typegen generates typed instruction decoders from Anchor IDLs. Use this for all Anchor programs — never manually compute d8 discriminators.
Install:
npm install @subsquid/solana-typegen
Generate types from IDL (3 methods):
# Method 1: Local IDL JSON file
npx squid-solana-typegen src/abi whirlpool.json
# Method 2: Fetch IDL from on-chain (Anchor programs store IDLs on-chain)
npx squid-solana-typegen src/abi whirLbMiicVdio4qvUfM5KAg6Ct8VwpYzGff3uctyCc#whirlpool
# Method 3: Glob pattern for multiple IDLs
npx squid-solana-typegen src/abi ./idl/*
What it generates: A TypeScript module exporting:
- `programId` — the program's public key constant
- `instructions.<name>.d8` — 8-byte discriminator for each instruction
- `instructions.<name>.decode(ins)` — typed decoder returning `{ accounts, data }`
- `instructions.<name>.accountSelection(mapping)` — helper for account-position filtering
Usage with SolanaQueryBuilder:
import * as whirlpool from './abi/whirlpool'
import { solanaPortalStream, solanaQuery } from '@subsquid/pipes/solana'
const query = solanaQuery()
.addFields({
block: { number: true, timestamp: true },
transaction: { signatures: true },
instruction: { programId: true, accounts: true, data: true },
})
.addInstruction({
range: { from: 280000000 },
request: {
programId: [whirlpool.programId],
d8: [whirlpool.instructions.swap.d8],
// Account-position filtering (optional — replaces manual a0/a1 filters):
...whirlpool.instructions.swap.accountSelection({
whirlpool: ['7qbRF6YsyGuLUVs6Y1q64bdVrfe4ZcUUz1JRdoVNUJnm']
}),
transaction: true,
},
})
Decoding instructions in .pipe():
.pipe((blocks) => {
const swaps = []
for (const block of blocks) {
for (const ins of block.instructions ?? []) {
if (ins.programId === whirlpool.programId &&
ins.d8 === whirlpool.instructions.swap.d8) {
const decoded = whirlpool.instructions.swap.decode(ins)
// decoded.accounts — typed account addresses
// decoded.data — typed instruction parameters
swaps.push({
slot: block.header.number,
timestamp: new Date(block.header.timestamp * 1000).toISOString(),
...decoded.data,
})
}
}
}
return { swaps }
})
Where to find IDLs:
- On-chain (Anchor programs): Use `npx squid-solana-typegen src/abi <programId>#<name>` — this queries the chain directly
- GitHub: Most protocols publish IDLs in their repos (e.g., `target/idl/<program>.json`)
- Anchor IDL registries: Programs built with Anchor store IDLs at a PDA derived from the program ID
- Protocol docs: Some protocols link to their IDL files directly

When on-chain IDL is not available (still Anchor):
- Some Anchor programs don't store IDLs on-chain. The `<programId>#<name>` format will fail with "Failed to fetch IDL"
- Download the IDL from the protocol's GitHub repo (usually `target/idl/<program>.json`)
- Pass local files to typegen: `npx squid-solana-typegen src/abi ./idl/*.json`
Non-Anchor Programs: Manual d1 Decoding
Typegen does NOT work for non-Anchor programs (SPL Token, SPL Stake Pool, System Program, etc.). These use single-byte Borsh enum discriminators (d1) and require manual decoding.
Step 1: Discover d1 discriminators from Portal
# Query ALL instructions from the program in a small slot range
curl -s 'https://portal.sqd.dev/datasets/solana-mainnet/stream' \
-H 'content-type: application/json' \
-H 'accept: application/x-ndjson' \
-d '{"type":"solana","fromBlock":400000000,"toBlock":400010000,
"instructions":[{"programId":["SPoo1Ku8WFXoNDMHPsrGSTSG1Y47rzgn41SLUNakuHy"]}],
"fields":{"instruction":{"programId":true,"d1":true}}}' \
| python3 -c "import sys,json; counts={}
for l in sys.stdin:
for i in json.loads(l).get('instructions',[]):
d=i.get('d1','?'); counts[d]=counts.get(d,0)+1
for k,v in sorted(counts.items(),key=lambda x:-x[1]): print(f'{k}: {v}')"
Step 2: Map d1 values to instruction names from the Rust source
// SPL Stake Pool instruction discriminators (from instruction.rs enum order)
const INSTRUCTIONS: Record<string, string> = {
'0x09': 'DepositStake',
'0x0a': 'WithdrawStake',
'0x0e': 'DepositSol',
'0x10': 'WithdrawSol',
'0x18': 'DepositSolWithSlippage',
'0x19': 'WithdrawSolWithSlippage',
}
Step 3: Filter by d1 in SolanaQueryBuilder
const query = solanaQuery()
.addFields({
block: { number: true, timestamp: true },
transaction: { transactionIndex: true, signatures: true },
instruction: { transactionIndex: true, programId: true, data: true, accounts: true },
})
.addInstruction({
range: { from: FROM_SLOT },
request: {
programId: [SPL_STAKE_POOL],
d1: Object.keys(INSTRUCTIONS), // d1 filter, NOT d8
isCommitted: true,
transaction: true,
},
})
Step 4: Decode manually in .pipe()
// Base58 decode the instruction data, then read fields
const data = base58Decode(ins.data)
const d1Hex = '0x' + data[0].toString(16).padStart(2, '0')
const instrName = INSTRUCTIONS[d1Hex]
// Read u64 LE amount after the d1 byte (if present)
let amount = 0n
if (data.length >= 9) {
for (let i = 0; i < 8; i++) {
amount |= BigInt(data[1 + i]) << BigInt(i * 8)
}
}
// Pool account is typically a0
const pool = ins.accounts?.[0] ?? ''
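The `base58Decode` helper used above is not defined in the snippet. A minimal implementation, assuming the widely used `bs58` package (any base58 decoder works):

```typescript
import bs58 from 'bs58'

// Decode base58-encoded instruction data into raw bytes.
function base58Decode(data: string): Uint8Array {
  return bs58.decode(data)
}
```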
Real examples: SPL Stake Pool uses d1 with manual decoding (solana/006); Kamino Lend uses d8 discriminators but still requires manual decoding (solana/001).
Volume warning: Many non-Anchor programs (especially SPL Stake Pool) have very low on-chain instruction volume. Always check volume with Portal BEFORE building — see the "Pre-Build: Estimate Instruction Volume" section in the portal-query skill's Solana reference.
CPI (Cross-Program Invocation) — CRITICAL for many DeFi protocols:
- Many Solana DeFi protocols use a layered architecture where user-facing programs call core programs via CPI
- Example: Jupiter Lend's Lending/Vaults programs call the Liquidity program's `operate` instruction via CPI
- If your indexer returns zero data but Portal shows instructions exist, the instructions are likely CPI (inner instructions)
- Fix: Add `innerInstructions: true` to the `addInstruction()` request:

  ```typescript
  .addInstruction({
    range: { from: FROM_SLOT },
    request: {
      programId: [myProgram.programId],
      d8: [myProgram.instructions.operate.d8],
      isCommitted: true,
      transaction: true,
      innerInstructions: true, // Required for CPI calls
    },
  })
  ```

- How to detect: Query Portal for the program's instructions, then check the `instructionAddress` field — if all entries have `len > 1`, they're all CPI
- Without this flag, the indexer silently captures zero data — the Portal cross-reference in validate.ts will catch this
CRITICAL: Template IDs must use camelCase format. Each template has specific required params - check the schema.
Hyperliquid Fills (No CLI Template — Manual Setup)
The Pipes SDK supports Hyperliquid fills natively via @subsquid/pipes/hyperliquid, but there is no CLI template yet. You must scaffold the project manually.
Import:
import { hyperliquidFillsPortalStream, hyperliquidFillsQuery } from '@subsquid/pipes/hyperliquid'
Query builder pattern:
const query = hyperliquidFillsQuery()
.addRange({ from: 920000000 })
.addFields({
block: { number: true, timestamp: true },
fill: {
user: true, coin: true, px: true, sz: true,
side: true, dir: true, closedPnl: true,
fee: true, feeToken: true, crossed: true, startPosition: true,
},
})
.addFill({ range: { from: 920000000 }, request: { coin: ['BTC', 'ETH', 'SOL'] } })
CRITICAL: .addFill() requires a range parameter — passing only { request: {...} } will crash with Cannot read properties of undefined (reading 'from').
Choosing a start block: Blocks increment at ~1/second. Use current_block - (days × 86400) to estimate. For a 7-day window, subtract ~604,800 from the current head block. A 7-day BTC/ETH/SOL sync yields ~6M fills in ~2-3 minutes.
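The same arithmetic as a tiny helper (hypothetical name; assumes the roughly one-block-per-second rate noted above):

```typescript
// Estimate a start block for an N-day lookback window on Hyperliquid,
// where blocks advance at roughly one per second.
function startBlockForWindow(headBlock: number, days: number): number {
  return headBlock - days * 86_400 // 7 days => headBlock - 604_800
}
```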
Source (SDK 1.0+):
await hyperliquidFillsPortalStream({
id: 'hl-perps-fills', // required unique pipe ID
portal: 'https://portal.sqd.dev/datasets/hyperliquid-fills',
outputs: query, // query builder passed as outputs (replaces old `query` field)
})
Data shape: .pipe() receives Block[] directly (not { blocks }). Each block has header (number, timestamp) and fills array:
.pipe((blocks) => {
const fills = blocks.flatMap((block) =>
block.fills.map((fill) => ({
block_number: block.header.number,
timestamp: new Date(block.header.timestamp).toISOString(),
user: fill.user,
coin: fill.coin,
px: fill.px,
sz: fill.sz,
side: fill.side === 'B' ? 'Buy' : 'Sell',
dir: fill.dir,
closed_pnl: fill.closedPnl,
fee: fill.fee,
notional: fill.px * fill.sz,
sign: 1,
})),
)
return { fills }
})
Key differences from EVM indexers:
- No `evmDecoder` — use `hyperliquidFillsQuery()` + `.pipe()` directly
- Dataset starts at block 750,000,000 (not block 0)
- Timestamps are in milliseconds. Use `new Date(block.header.timestamp).toISOString()` for ClickHouse (with `date_time_input_format: 'best_effort'`). Unlike EVM indexers, you do NOT divide by 1000 — the ISO string approach handles it.
- `side` is `'B'` (buy) or `'S'` (sell) — single-character codes. The pipe transform maps these to `'Buy'`/`'Sell'` for ClickHouse, so SQL queries use the mapped values.
- `dir` values: `"Open Long"`, `"Close Long"`, `"Open Short"`, `"Close Short"`, `"Long > Short"`, `"Short > Long"`, `"Net Child Vaults"`
- All numeric values (`px`, `sz`, `closedPnl`, `fee`) are native floats, not BigInt
- `closedPnl` is only non-zero for closing trades
- `fee` can be negative (maker rebate)
Fill filter options (pass in addFill({ request: {...} })):
- `coin` — Filter by asset: `['BTC', 'ETH', 'SOL']`
- `user` — Filter by trader address: `['0x...']`
- `dir` — Filter by direction: `['Open Long', 'Close Short']`
- `feeToken` — Filter by fee denomination
- `builder` — Filter by builder address
ClickHouse schema for fills:
CREATE TABLE IF NOT EXISTS hl_fills (
block_number UInt64,
timestamp DateTime64(3, 'UTC'),
user LowCardinality(String),
coin LowCardinality(String),
px Float64,
sz Float64,
side LowCardinality(String),
dir LowCardinality(String),
closed_pnl Float64,
fee Float64,
notional Float64,
sign Int8
) ENGINE = CollapsingMergeTree(sign)
ORDER BY (coin, block_number, user, dir)
PARTITION BY toYYYYMM(timestamp);
Common use cases (tested):
- Whale tracker — filter by `user` to track specific whale addresses, monitor PnL and positions (894K fills for 5 whales in 60s)
- Multi-coin volume — filter by `coin` with 10+ assets including HYPE, DOGE, WIF (2.35M fills for 9 coins in 60s)
- Perps analytics — track all fills for majors, aggregate daily volume and long/short ratios (5M fills in 70s)
Coin tickers: Use uppercase symbols: BTC, ETH, SOL, HYPE, DOGE, WIF, ARB, SUI, AVAX, etc. Some tokens use prefixed tickers: kPEPE (not PEPE), kBONK, kFLOKI, cash:GOLD, cash:TSLA, xyz:GOLD. Not all tokens use obvious tickers — run a broad query first to discover available coins.
Computed fields: The notional field (fill.px * fill.sz) is NOT a native fill field — it's computed in the .pipe() transform. Native fields are listed in the addFields section above.
See references/HYPERLIQUID_GUIDE.md for the complete manual setup walkthrough, use case examples, and SQL queries.
Supported Sinks
- ClickHouse - High-performance analytics database (recommended)
- PostgreSQL - Relational database with Drizzle ORM
Note: Memory sink is listed in the schema but not yet implemented in the CLI.
Node.js Version Requirement
Use Node.js LTS (v20 or v22). Node.js v25.x has known zstd decompression bugs that cause random crashes during Portal data streaming. If node --version shows v25.x, switch to LTS:
# Option 1: nvm (if installed)
nvm install 22
nvm use 22
# Option 2: Homebrew (macOS)
brew install node@22
export PATH="/opt/homebrew/opt/node@22/bin:$PATH"
# Option 3: Direct download
# Visit https://nodejs.org/ and install the LTS version
If you cannot switch Node versions, the indexer may still work for smaller syncs. The zstd bug tends to crash on large syncs (millions of blocks). For quick tests with recent blocks, v25 often works.
SDK 1.0 Key Features
Time-Based Ranges
Ranges now accept ISO date strings and Date objects instead of block numbers. Dates are auto-resolved to block numbers via the Portal API:
evmDecoder({
range: { from: '2024-01-01' }, // ISO date string → resolved to block number
events: { transfers: erc20.events.Transfer },
})
// Date objects work too
evmDecoder({
range: { from: new Date('2024-01-01'), to: new Date('2024-02-01') },
events: { ... },
})
// Formatted block numbers
evmDecoder({ range: { from: '18_908_900' }, ... }) // underscores OK
// Latest block
evmDecoder({ range: { from: 'latest' }, ... }) // only for `from`
Validation: Inverted ranges (from > to) and unresolvable timestamps throw BlockRangeConfigurationError (E0002).
defineAbi — Use JSON ABIs Without Codegen
defineAbi() converts a standard JSON ABI into decoder objects at runtime — no squid-evm-typegen step needed:
import erc20Json from './erc20.json'
import { defineAbi } from '@subsquid/pipes'
const erc20 = defineAbi(erc20Json)
evmDecoder({
events: {
transfers: erc20.events.Transfer,
approvals: erc20.events.Approval,
},
})
Accepts: plain ABI array, as const literal (for full type inference), or Hardhat/Foundry artifact with .abi field. Uses @subsquid/evm-codec (10x faster than viem).
Query Builder Shorthands
Factory functions replace new *QueryBuilder():
| Old | New |
|---|---|
| `new EvmQueryBuilder()` | `evmQuery()` |
| `new SolanaQueryBuilder()` | `solanaQuery()` |
| `new HyperliquidFillsQueryBuilder()` | `hyperliquidFillsQuery()` |
Typed Error System
All framework errors carry unique codes linking to docs:
| Error | Code | When |
|---|---|---|
| `DefaultPipeIdError` | E0001 | `.pipeTo()` called without `id` on source |
| `BlockRangeConfigurationError` | E0002 | Inverted range, invalid date with `'latest'`, unresolvable timestamp |
Testing with @subsquid/pipes/testing/evm
Test pipe logic end-to-end without hitting a real portal. The testing library provides:
- `encodeEvent` — encode events with full type inference from viem ABIs
- `mockBlock` — build mock blocks with auto-generated metadata (number, hash, timestamp)
- `evmPortalMockStream` — spin up a mock portal HTTP server
- `resetMockBlockCounter` — reset block numbering between tests
Install: Requires vitest and viem as dev dependencies.
Basic test pattern:
import { commonAbis, evmDecoder, evmPortalStream } from '@subsquid/pipes/evm'
import {
type MockPortal,
encodeEvent,
evmPortalMockStream,
mockBlock,
resetMockBlockCounter,
} from '@subsquid/pipes/testing/evm'
// Helper to collect all stream output
async function readAll<T>(stream: AsyncIterable<{ data: T[] }>): Promise<T[]> {
const res: T[] = []
for await (const chunk of stream) {
res.push(...chunk.data)
}
return res
}
const ERC20_ABI = [
{
type: 'event' as const,
name: 'Transfer',
inputs: [
{ name: 'from', type: 'address', indexed: true },
{ name: 'to', type: 'address', indexed: true },
{ name: 'value', type: 'uint256', indexed: false },
],
},
] as const
const USDC = '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48' as const
const ALICE = '0x7a250d5630b4cf539739df2c5dacb4c659f2488d' as const
const BOB = '0xc82e11e709deb68f3631fc165ebd8b4e3fc3d18f' as const
Test 1: Decode events from mock blocks
let portal: MockPortal
beforeEach(() => resetMockBlockCounter())
afterEach(async () => await portal?.close())
it('should decode ERC20 transfers', async () => {
// 1. Encode events — args are fully typed from the ABI
const transfer1 = encodeEvent({
abi: ERC20_ABI,
eventName: 'Transfer',
address: USDC,
args: { from: ALICE, to: BOB, value: 1_000_000n },
})
// 2. Build mock blocks — metadata is auto-generated
portal = await evmPortalMockStream({
blocks: [
mockBlock({ transactions: [{ logs: [transfer1] }] }),
],
})
// 3. Create pipe exactly as production, but with mock portal URL
const stream = evmPortalStream({
id: 'test',
portal: portal.url,
outputs: evmDecoder({
range: { from: 0, to: 1 },
events: { transfers: commonAbis.erc20.events.Transfer },
}),
}).pipe((batch) => batch.transfers)
// 4. Collect and assert
const transfers = await readAll(stream)
expect(transfers).toHaveLength(1)
expect(transfers[0].event.from).toBe(ALICE)
expect(transfers[0].event.value).toBe(1_000_000n)
expect(transfers[0].contract).toBe(USDC)
})
Test 2: Custom pipe transformations
it('should test custom transformations', async () => {
const transfer = encodeEvent({
abi: ERC20_ABI, eventName: 'Transfer', address: USDC,
args: { from: ALICE, to: BOB, value: 2_000_000n },
})
portal = await evmPortalMockStream({
blocks: [mockBlock({ transactions: [{ logs: [transfer] }] })],
})
// Chain .pipe() transforms — same as production code
const stream = evmPortalStream({
id: 'test',
portal: portal.url,
outputs: evmDecoder({
range: { from: 0, to: 1 },
events: { transfers: commonAbis.erc20.events.Transfer },
}),
})
.pipe((batch) => batch.transfers)
.pipe((transfers) =>
transfers.map((t) => ({
from: t.event.from,
to: t.event.to,
amount: Number(t.event.value) / 1e6,
})),
)
const results = await readAll(stream)
expect(results[0]).toEqual({ from: ALICE, to: BOB, amount: 2 })
})
Key patterns:
- `encodeEvent` accepts `abi`, `eventName`, `address`, and typed `args`
- `mockBlock` auto-generates `number`, `hash`, `timestamp` — call `resetMockBlockCounter()` in `beforeEach`
- `evmPortalMockStream` returns `{ url, close() }` — use `portal.url` with `evmPortalStream`
- Chain `.pipe()` on the stream to test transformations
- Multiple event types: pass multiple events in `events: { transfers: ..., approvals: ... }` and access `batch.transfers`, `batch.approvals`
How to Use the CLI
CLI Known Issue: ora ESM/CJS Crash
The CLI init command crashes with (0 , import_ora.default) is not a function on every init call. This is because the CLI bundles ESM-only ora v6+ as CJS. The --schema and --version commands still work.
IMPORTANT: This happens on every init call, not just the first time. Additionally, npx may re-download the CLI at any time, overwriting any previous patch. You must patch before every init.
Automatic pre-flight patch (MANDATORY before every init):
Run this before EVERY npx @iankressin/pipes-cli@latest init call:
# Pre-flight: ensure CLI is cached, then patch ora
npx @iankressin/pipes-cli@latest --version 2>/dev/null || true
CLI_PATH=$(find ~/.npm/_npx -name "index.cjs" -path "*pipes-cli*" 2>/dev/null | head -1)
if [ -n "$CLI_PATH" ] && ! grep -q 'import_ora = { default: function' "$CLI_PATH"; then
sed -i.bak 's/var import_ora = __toESM(require("ora"), 1);/var import_ora = { default: function(opts) { var t = typeof opts === "string" ? opts : (opts \&\& opts.text) || ""; return { start: function(m) { console.log(m || t); return this; }, succeed: function(m) { console.log(m || t); return this; }, fail: function(m) { console.log(m || t); return this; }, stop: function() { return this; }, text: t }; } };/' "$CLI_PATH"
echo "ora patch applied"
else
echo "ora patch already in place (or CLI not found)"
fi
When using this skill programmatically, ALWAYS run the pre-flight patch as a single block immediately before the init command. Do not assume a previous patch is still in place.
Programmatic Mode (RECOMMENDED for Claude Code)
ALWAYS use programmatic mode with the published npm package:
npx @iankressin/pipes-cli@latest init --config '{
"projectFolder": "/path/to/my-indexer",
"packageManager": "npm",
"networkType": "evm",
"network": "ethereum-mainnet",
"templates": [{"templateId": "erc20Transfers", "params": {"contractAddresses": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"]}}],
"sink": "clickhouse"
}'
CRITICAL: Template IDs must use camelCase:
- Use `"uniswapV3Swaps"` NOT `"uniswap-v3-swaps"`
- Use `"erc20Transfers"` NOT `"erc20-transfers"`
Inspecting Available Templates
Before creating an indexer, inspect supported templates and their configuration:
npx @iankressin/pipes-cli@latest init --schema
This displays:
- All available template IDs (camelCase format)
- Required and optional parameters for each template
- Sink-specific configurations
- Network options
Template ID to Parameters Mapping
The --schema output uses an anyOf structure without discriminators. Here is the mapping:
| templateId | Required params | Description |
|---|---|---|
| `custom` | `contracts` (array of `{contractAddress, contractName, contractEvents, range}`) | Custom contract events with full ABI |
| `erc20Transfers` | `contractAddresses` (array of address strings), `range` | ERC20 Transfer events |
| `uniswapV3Swaps` | `factoryAddress` (single address string), `range` | Uniswap V3 swaps via factory pattern |
Known issue: The schema anyOf variants do not include const discriminators on templateId. Match by the params property names (contracts vs contractAddresses vs factoryAddress).
Critical Rule: NEVER MANUALLY CREATE INDEXER FILES
ALWAYS use the Pipes CLI programmatic mode. Manual file creation = YOLO mode = guaranteed problems.
If the CLI fails:
- Fix the CLI issue first
- Never work around it by creating files manually
- Manual creation bypasses all scaffolding, dependency setup, and configuration
Workflow for Helping Users
Step 0: Research Protocol Architecture (MANDATORY)
Before writing ANY code or generating the project:
1. Understand the protocol structure:
   - Visit the protocol's documentation
   - Identify contract relationships (vault vs underlying protocol, factory vs instances, etc.)
   - Determine which contract emits the events you need

2. Ask clarifying questions:
   - What blockchain do they want to index? (Ethereum, Polygon, Solana, etc.)
   - What does "track X" mean in this context? (e.g., "allocations" could mean rebalancing events OR actual positions)
   - Which contract emits the relevant events? (Don't assume - verify!)
   - Is there a specific contract, pool, or address? (Important for customization)
   - Time range needed? (Recent data only = faster, full history = slower)
   - Where should the data be stored? (ClickHouse, PostgreSQL, CSV)
   - What should the project be named?

3. Check for proxy contracts (MANDATORY for custom template):

   This is the #1 failure mode for DeFi protocol indexers. Most major DeFi protocols (Aave, Compound, Uniswap governance, etc.) use proxy contracts. If the target contract is a proxy, the CLI will fetch only the proxy ABI (typically just an `Upgraded` event), not the implementation ABI with the actual business-logic events.

   Before running the CLI, check EVERY target contract:

   ```bash
   # Check Etherscan for proxy status (replace chain/address as needed)
   # Look for: "This contract is a proxy" banner on the contract page
   # Or check the "Read as Proxy" tab — if it exists, the contract IS a proxy
   ```

   If the contract IS a proxy:
   - Find the implementation address (Etherscan "Read as Proxy" tab → implementation address)
   - You will need the implementation's ABI for event decoding
   - But you will use the proxy's address in the indexer config (events emit from the proxy)
   - Plan to run `evm-typegen` against the implementation address AFTER CLI generation:

     ```bash
     npx @subsquid/evm-typegen@latest <project>/src/contracts \
       <IMPLEMENTATION_ADDRESS> --chain-id <CHAIN_ID>
     ```

   - Then update the import in `src/index.ts` to use the implementation's generated file

   Real example — Aave V3 Pool:
   - Proxy: `0x87870Bca3F3fD6335C3F4ce8392D69350B4fA4E2` (this is what users provide)
   - CLI fetches the proxy ABI → only gets the `Upgraded` event (WRONG)
   - Implementation has `Supply`, `Borrow`, `Repay`, `Withdraw`, `LiquidationCall` events
   - Solution: Generate types from the implementation, keep the proxy address in the config

4. Verify your understanding:
   - Look at actual transactions on Etherscan to see which events are emitted
   - Check if there are multiple contracts involved
   - Understand the data flow between contracts
Step 1: Inspect Available Templates
Before generating the project, inspect the available templates:
npx @iankressin/pipes-cli@latest init --schema
This ensures you use the correct templateId and understand the required configuration.
Step 2: Run the CLI
npx @iankressin/pipes-cli@latest init --config '{
"projectFolder": "/path/to/my-indexer",
"packageManager": "npm",
"networkType": "evm",
"network": "ethereum-mainnet",
"templates": [{"templateId": "erc20Transfers", "params": {"contractAddresses": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"]}}],
"sink": "clickhouse"
}'
IMPORTANT: Use camelCase for templateId values. Every template requires a params object - check --schema for required fields.
Step 3: Post-generation Setup (AUTOMATED - Do this AFTER CLI succeeds)
WARNING: The CLI defaults to pipes as the database name. This WILL cause sync table conflicts if you run more than one indexer. Every indexer writes sync state to {database}.sync with id = 'stream'. Two indexers sharing a database = the second one resumes from the first's position, causing wrong data or missing events.
MANDATORY: Always create a project-specific database. Use the project name or a descriptive name (e.g., aave_v3_eth, usdc_transfers, uniswap_base):
# Step 3a: Derive a database name from the project
DB_NAME=$(basename <project-folder> | tr '-' '_')
# Examples: aave-v3-indexer → aave_v3_indexer, usdc-transfers → usdc_transfers
# Step 3b: Create the dedicated database
docker exec <container> clickhouse-client --password <pw> \
--query "CREATE DATABASE IF NOT EXISTS $DB_NAME"
# Step 3c: Update .env to use the dedicated database (NOT the default 'pipes')
sed -i '' "s/CLICKHOUSE_DATABASE=.*/CLICKHOUSE_DATABASE=$DB_NAME/" <project-folder>/.env
If using ClickHouse (Local Docker):
- Get the actual password from existing container OR use "default" if creating new
- Update the .env file with correct password AND database:
  ```bash
  sed -i '' 's/CLICKHOUSE_PASSWORD=.*/CLICKHOUSE_PASSWORD=<actual-password>/' <project-folder>/.env
  sed -i '' "s/CLICKHOUSE_DATABASE=.*/CLICKHOUSE_DATABASE=$DB_NAME/" <project-folder>/.env
  ```
- If you MUST reuse a database, clear the sync table first:
  ```bash
  docker exec <container-name> clickhouse-client --password <password> \
    --query "DROP TABLE IF EXISTS <db>.sync"
  ```
If using ClickHouse Cloud:
1. Configure .env for Cloud:

   ```bash
   CLICKHOUSE_URL=https://[service-id].[region].aws.clickhouse.cloud:8443
   CLICKHOUSE_DATABASE=pipes
   CLICKHOUSE_USER=default
   CLICKHOUSE_PASSWORD=[your-actual-cloud-password]
   ```

2. Create the database manually (CLI migrations don't create databases):
   - Go to https://clickhouse.cloud/
   - Navigate to your service
   - Click "SQL Console"
   - Run: `CREATE DATABASE IF NOT EXISTS pipes;`

3. Verify the connection before running the indexer:

   ```bash
   curl -X POST "https://[your-service-id].[region].aws.clickhouse.cloud:8443/" \
     --user "default:[your-password]" \
     -d "SELECT 1"
   ```
For complete deployment guide (local Docker or ClickHouse Cloud), see DEPLOYMENT.md.
Step 3.5: Post-Generation Verification Checklist
PROXY CHECK IS #1 PRIORITY. In real-world experience, 6 out of 9 indexers required manual proxy resolution. This is the most common failure mode. Always check proxy status BEFORE doing anything else (step 5 below).
After the CLI generates the project, verify these BEFORE running:
1. Factory address injected (uniswapV3Swaps template only):

   ```bash
   grep "address:" <project-folder>/src/index.ts
   # Should show your factory address, NOT an empty string ['']
   # Known bug: CLI drops the factoryAddress param — fix with:
   sed -i '' "s|address: \[''\]|address: ['<YOUR_FACTORY_ADDRESS>']|" <project-folder>/src/index.ts
   ```

2. ClickHouse password correct:

   ```bash
   grep CLICKHOUSE_PASSWORD <project-folder>/.env
   # CLI generates "password" — standalone Docker typically uses "default"
   # If using the generated docker-compose.yml, "password" is correct
   # If using an existing standalone container, change to match your container
   ```

3. Contract addresses present (custom template):

   ```bash
   grep "contracts:" <project-folder>/src/index.ts
   # Verify your contract address(es) appear in the generated code
   ```

4. Know your table names (custom template): The custom template creates one table per event, named `{contractName}_{eventName}` in snake_case. Example: Contract "WETH" with events "Deposit" and "Withdrawal" creates:
   - `weth_deposit`
   - `weth_withdrawal`

   There is NO combined `weth_events` table. Query each event table separately.

5. Proxy contract detection (custom template — CRITICAL): Check the generated contract file for a proxy-only ABI:

   ```bash
   grep "export const events" <project-folder>/src/contracts/*.ts
   # If you ONLY see "Upgraded" → it's a proxy contract!
   # The CLI fetched the proxy ABI, not the implementation.
   ```

   If a proxy is detected:
   - Find the implementation address on Etherscan ("Read as Proxy" tab)
   - Generate types from the implementation:

     ```bash
     npx @subsquid/evm-typegen@latest <project-folder>/src/contracts \
       <IMPLEMENTATION_ADDRESS> --chain-id <CHAIN_ID>
     ```

   - Update the import in `src/index.ts` to point to the implementation file
   - Keep the proxy address in the `contracts:` array (events emit from the proxy)

   See `references/ABI_GUIDE.md` for the detailed proxy handling guide.

   Non-standard proxy patterns (Aragon, Diamond, etc.): Some protocols use non-standard proxy patterns where `evm-typegen` cannot auto-resolve the implementation. Examples:
   - Lido uses Aragon's `AppProxyUpgradeable` — typegen returns only `ProxyDeposit`
   - Pendle Router is a Diamond proxy (EIP-2535) — typegen returns an empty `Contract` class with no events
   - Diamond proxies (EIP-2535) have multiple implementation facets — typegen sees only the diamond cut functions

   How to detect: If the generated contract file has `export class Contract extends ContractBase {}` with no `export const events`, it's likely a Diamond proxy. Also check: `grep "export const events" src/contracts/*.ts` — if empty, the ABI fetch failed.

   If typegen returns a proxy-only or empty ABI:
   - Check the contract source on Etherscan to find the actual event definitions
   - Query Portal for the contract's actual topic0 values to discover which events it emits
   - Manually create the contract types file using the `event()` helper from `@subsquid/evm-abi`:

     ```typescript
     import { event, indexed } from '@subsquid/evm-abi'
     import * as p from '@subsquid/evm-codec'

     export const events = {
       // Verify topic0 against on-chain data before using!
       MySwap: event(
         '0xabcdef...', // topic0 hash
         'MySwap(address,address,uint256,int256)',
         {
           caller_: indexed(p.address),
           market_: indexed(p.address),
           amount_: p.uint256,
           netFlow_: p.int256,
         },
       ),
     }
     ```

   - CRITICAL: Verify topic0 hashes against actual on-chain data. Query Portal for the contract's events in a recent block range and match the topic0 values. Wrong topic0 = silent zero results.
   - Watch for event signature version changes: Some protocols change event parameters across upgrades. The same event name can have different data layouts in different blocks, causing `EventDecodingError: Offset is outside the bounds of the DataView`. If this happens, either filter to blocks after the upgrade or remove the problematic event.
   - This is a last resort — try typegen with the implementation address first

6. Contract file naming (custom template): The generated contract file is named by address, not by `contractName`: `src/contracts/0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2.ts` (not `weth.ts`). This is cosmetic — the file works fine. The import in `src/index.ts` references it correctly.

7. Database exists:

   ```bash
   docker exec <container> clickhouse-client --password <pw> \
     --query "SHOW DATABASES" | grep <your-db-name>
   ```
Step 4: Customization
- For EVM contracts: Update contract addresses in the generated transformer
- For custom event handling: Modify the transformer logic
- For database schema: Edit the table definitions
- For ABI generation: See references/ABI_GUIDE.md
Step 5: Start and Validate
cd <project-folder>
npm run dev
VERIFY START BLOCK - Check the first log message shows your intended start block, not a resumed block.
NOTE: On the very first start, you will see an error about Unknown table expression identifier 'pipes.sync'. This is harmless — the SDK tries to read the sync table before it exists, then creates it. Ignore this error on first run.
Resume vs Fresh Start Decision Tree
When you see "Resuming from block X" in the logs, use this to decide what to do:
| Scenario | Action |
|---|---|
| Indexer crashed mid-sync and you want to continue | Keep it — resume is correct. Verify X is near where it stopped. |
| You changed the start block or contract address | Drop sync — old sync state is stale. |
| You're running a different indexer on the same database | Drop sync — the sync state belongs to a different indexer. Better yet, use a separate database. |
| This is a brand new project, first ever run | Drop sync — there shouldn't be one. If there is, the database was used by another indexer. |
| You want to re-index from scratch | Drop sync AND data tables — clean slate. |
How to drop sync:
docker exec <container> clickhouse-client --password <pw> \
--query "DROP TABLE IF EXISTS <database>.sync"
How to restart after a crash (resume is wanted):
cd <project-folder>
npm run dev
# First log line should say "Resuming from X" where X is near where it stopped
Complete Automation Script
Follow these steps IN ORDER for first-time setup:
Step 1: Check/setup database (ClickHouse example)
CLICKHOUSE_CONTAINER=$(docker ps --filter "name=clickhouse" --format "{{.Names}}" | head -n 1)
if [ -z "$CLICKHOUSE_CONTAINER" ]; then
echo "No ClickHouse found, starting new one..."
docker run -d --name clickhouse \
-p 8123:8123 -p 9000:9000 \
-e CLICKHOUSE_PASSWORD=default \
clickhouse/clickhouse-server
CLICKHOUSE_PASSWORD="default"
else
echo "Using existing ClickHouse: $CLICKHOUSE_CONTAINER"
CLICKHOUSE_PASSWORD=$(docker inspect $CLICKHOUSE_CONTAINER | grep CLICKHOUSE_PASSWORD | cut -d'"' -f4)
fi
DB_NAME=$(basename /path/to/my-new-indexer | tr '-' '_')
docker exec $CLICKHOUSE_CONTAINER clickhouse-client --query "CREATE DATABASE IF NOT EXISTS $DB_NAME"
Step 2: Generate the indexer project
npx @iankressin/pipes-cli@latest init --config '{
"projectFolder": "/path/to/my-new-indexer",
"packageManager": "npm",
"networkType": "evm",
"network": "ethereum-mainnet",
"templates": [{"templateId": "erc20Transfers", "params": {"contractAddresses": ["0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48"]}}],
"sink": "clickhouse"
}'
Step 3: Fix the .env file
cd /path/to/my-new-indexer
sed -i '' "s/CLICKHOUSE_PASSWORD=.*/CLICKHOUSE_PASSWORD=$CLICKHOUSE_PASSWORD/" .env
sed -i '' "s/CLICKHOUSE_DATABASE=.*/CLICKHOUSE_DATABASE=$DB_NAME/" .env
Step 4: Run the indexer
npm run dev
Performance Considerations
Sync Speed Factors
1. Start block range:
   - Smaller range = faster sync
   - 1M blocks: 5-10 minutes
   - 5M blocks: 30-60 minutes
   - Full chain: 2-4 hours

2. Filtering type:
   - Contract events (fastest): Events from specific contracts
   - Token pair filtering (medium): Factory pattern with filters
   - Address filtering (slowest): Requires scanning all transfers

3. Number of contracts tracked:
   - Fewer contracts = faster processing
   - Start with 1-3 key tokens, expand later if needed
Quick Testing Strategy
For fast iteration during development:
1. Start with recent blocks (last 1-2 weeks):

   ```typescript
   range: { from: '21,000,000' }
   ```

2. Test with limited contracts:

   ```typescript
   contracts: ['0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2'] // Just WETH
   ```

3. Once working, expand the range and contracts
Key SDK Patterns (Latest)
Event Parameter Filtering
Filter events by indexed parameters at the source level for maximum performance:
events: {
transfers: {
event: commonAbis.erc20.events.Transfer,
params: {
from: ['0x87482e84503639466fad82d1dce97f800a410945'],
to: '0x10b32a54eeb05d2c9cd1423b4ad90c3671a2ed5f',
},
},
}
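For context, the snippet above is the `events` field of `evmDecoder`. A complete decoder sketch (the range and addresses are illustrative):

```typescript
import { commonAbis, evmDecoder } from '@subsquid/pipes/evm'

// Only Transfer events matching BOTH param filters are decoded and shipped.
const transfersDecoder = evmDecoder({
  range: { from: 21_000_000 },
  events: {
    transfers: {
      event: commonAbis.erc20.events.Transfer,
      params: {
        from: ['0x87482e84503639466fad82d1dce97f800a410945'],
        to: '0x10b32a54eeb05d2c9cd1423b4ad90c3671a2ed5f',
      },
    },
  },
})
```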
Factory Pattern
Track dynamically created contracts (e.g., Uniswap pools, MetaMorpho vaults). Use contractFactory() inside the contracts field of evmDecoder:
import { evmDecoder, contractFactory, contractFactoryStore } from '@subsquid/pipes/evm'
const swaps = evmDecoder({
range: { from: '12,369,621' },
contracts: contractFactory({
address: ['0x1f98431c8ad98523631ae4a59f267346ea31f984'],
event: factoryEvents.PoolCreated,
childAddressField: 'pool',
database: await contractFactoryStore({ path: './factory-pools.sqlite' }),
}),
events: {
swaps: poolEvents.Swap,
},
})
Factory pattern key concepts:
- `address`: The factory contract(s) that emit creation events
- `event`: The creation event (e.g., `PoolCreated`, `CreateMetaMorpho`)
- `childAddressField`: The event parameter containing the child contract address (the example above uses `'pool'`)
- `database`: SQLite database to persist discovered child contracts across restarts
Accessing factory event metadata in .pipe():
Each decoded event includes a factory property with the creation event's data:
.pipe((deposits) => ({
deposits: deposits.map((d) => ({
vault: d.contract, // child contract address
vaultName: d.factory?.event.name ?? '', // from creation event
asset: d.factory?.event.asset ?? '', // from creation event
sender: d.event.sender, // from the decoded event itself
})),
}))
Use d.factory?.event.<param> to access any parameter from the factory creation event. This is how you enrich child contract events with metadata from deployment (names, symbols, assets, etc.).
CRITICAL: Factory cold-start behavior. The factory pattern only discovers child contracts from the range.from block forward. If the factory deployed 100 contracts before your start block, those are NOT tracked. This means:
- You may see zero data for 30-60+ seconds while the indexer catches up to a block where the factory creates a new child contract
- This is normal, not a bug — but it can be alarming during testing
- To track ALL child contracts, set `range.from` to the factory's deployment block
- For testing, a recent start block works, but expect a delay before data appears
Restarting a factory indexer from scratch: The factory pattern stores discovered contracts in a SQLite file. If you need a clean restart:
# 1. Kill the indexer
# 2. Delete the SQLite database
rm <project-folder>/*.sqlite
# 3. Drop ClickHouse sync + data tables
docker exec <container> clickhouse-client --password <pw> \
--query "DROP TABLE IF EXISTS <db>.sync; DROP TABLE IF EXISTS <db>.<table1>; DROP TABLE IF EXISTS <db>.<table2>"
# 4. Restart
cd <project-folder> && npm run dev
### Adapting Factory Pattern for Any Protocol
The `uniswapV3Swaps` template uses the factory pattern for Uniswap pools, but you can apply it to **any** protocol that deploys child contracts via a factory. Here's the general approach:
1. **Identify the factory contract** — the contract that deploys child contracts
2. **Find the creation event** — the event emitted when a child is deployed (e.g., `PoolCreated`, `CreateMetaMorpho`, `CreateVault`)
3. **Identify the address parameter** — the event field containing the new child contract's address
4. **Generate ABIs** for both the factory and the child contracts:
```bash
# Generate factory ABI
npx @subsquid/evm-typegen@latest src/contracts \
<FACTORY_ADDRESS> --chain-id 1
# Generate child contract ABI (use any known child address)
npx @subsquid/evm-typegen@latest src/contracts \
<ANY_CHILD_CONTRACT_ADDRESS> --chain-id 1
```

5. **Wire up the factory pattern** — replace the Uniswap-specific values with your protocol's:

   ```typescript
   contracts: contractFactory({
     address: ['<FACTORY_ADDRESS>'],
     event: factoryEvents.YourCreationEvent, // e.g., CreateMetaMorpho
     childAddressField: 'childAddress', // event param with child address
     database: await contractFactoryStore({ path: './your-factory.sqlite' }),
   }),
   events: {
     yourEvents: childContractEvents.YourEvent, // events on child contracts
   },
   ```
Real-world example — MetaMorpho vaults:
- Factory: `0x1897A8997241C1cD4bD0698647e4EB7213535c24` (MetaMorpho Factory V1.1)
- Creation event: `CreateMetaMorpho` with `metaMorpho` (indexed address)
- Child contracts: ERC-4626 vaults emitting `Deposit`, `Withdraw`, etc.
- Metadata from creation event: `name`, `symbol`, `asset` — accessible via `d.factory?.event`
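Putting it together for MetaMorpho, a sketch. It assumes ABIs generated with `evm-typegen`; the module names and start block are illustrative:

```typescript
import { contractFactory, contractFactoryStore, evmDecoder } from '@subsquid/pipes/evm'
// Generated with evm-typegen; file names are illustrative
import * as factoryAbi from './contracts/metaMorphoFactory'
import * as vaultAbi from './contracts/metaMorphoVault'

const deposits = evmDecoder({
  range: { from: '18_900_000' }, // illustrative start block
  contracts: contractFactory({
    address: ['0x1897A8997241C1cD4bD0698647e4EB7213535c24'],
    event: factoryAbi.events.CreateMetaMorpho,
    childAddressField: 'metaMorpho', // param carrying the new vault address
    database: await contractFactoryStore({ path: './metamorpho-vaults.sqlite' }),
  }),
  events: {
    deposits: vaultAbi.events.Deposit, // emitted by the ERC-4626 vaults
  },
})
```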
Topic0-Only Global Filtering (No Contracts Array)
Track events across all contracts that emit a specific event signature, without needing to know addresses upfront. Simply omit the contracts field from evmDecoder:
// Captures ReallocateSupply from ALL MetaMorpho vaults — no address list needed
const reallocations = evmDecoder({
range: { from: 18_900_000 },
events: {
ReallocateSupply: vaultAbi.ReallocateSupply,
ReallocateWithdraw: vaultAbi.ReallocateWithdraw,
},
// NO contracts field — topic0-only filtering
})
When to use topic0-only vs factory pattern:
| | Topic0-Only | Factory Pattern |
|---|---|---|
| Setup complexity | Minimal — just omit `contracts` | Requires factory ABI, SQLite DB |
| Cold-start delay | None — instant data | 30-60s while discovering child contracts |
| Metadata enrichment | No — only `d.contract` for emitter address | Yes — `d.factory?.event` gives creation params |
| False positives | Possible if event sig is generic | None — only tracks factory children |
| Best for | Unique events (ReallocateSupply, StrategyReported) | Generic events (Deposit, Transfer) on specific contracts |
Use topic0-only when:
- Event signature is unique to the protocol (won't match unrelated contracts)
- You want to capture events across ALL instances (all vaults, all pools)
- You don't need metadata from the factory creation event
- You want zero setup and instant data
Don't use topic0-only when:
- Event signature is generic (Transfer, Approval, Deposit) — matches too many unrelated contracts
- You need creation event metadata (vault names, asset addresses)
Real examples: Yearn V2 StrategyReported (evm/037), MetaMorpho ReallocateSupply/ReallocateWithdraw (evm/039)
Combining Multiple Decoders
Pass multiple named decoders as outputs to run them in a single pipeline (replaces old pipeComposite):
const stream = evmPortalStream({
id: 'my-indexer',
portal: '...',
outputs: {
transfers: evmDecoder({ events: { transfers: commonAbis.erc20.events.Transfer } }),
swaps: evmDecoder({ events: { swaps: uniswapV3Abi.events.Swap } }),
},
})
IMPORTANT: contracts field format. For static contracts, pass an array of address strings:
evmDecoder({ contracts: ['0xABC...', '0xDEF...'], events: { ... } }) // ✅ correct
evmDecoder({ contracts: [{ address: ['0xABC...'] }], events: { ... } }) // ❌ wrong — crashes with "contract.toLowerCase is not a function"
The object format is ONLY for the contractFactory() helper pattern.
Decoded Event Field Access in .pipe()
When using evmDecoder with a .pipe() transform, each decoded event d has these fields:
| Field | Type | Description |
|---|---|---|
| `d.event.*` | object | Decoded event parameters (e.g., `d.event.vault`, `d.event.assets`) |
| `d.block.number` | number | Block number |
| `d.rawEvent.transactionHash` | string | Transaction hash |
| `d.rawEvent.logIndex` | number | Log index within the block |
| `d.timestamp` | Date | Block timestamp as a JS Date object |
| `d.contract` | string | Contract address that emitted the event |
| `d.factory` | object/null | Factory metadata (only if using factory pattern) |
⚠️ Common mistake: Using `d.blockNumber` or `d.txHash` (which don't exist) instead of `d.block.number` and `d.rawEvent.transactionHash`. These silently return `undefined`, stored as `0` or `""` in ClickHouse.

⚠️ DateTime64(3) timestamp gotcha: When ClickHouse uses `DateTime64(3, 'UTC')`, pass ISO strings via `d.timestamp.toISOString()` with `date_time_input_format: 'best_effort'`. Passing epoch seconds (e.g., `1700392127`) to DateTime64(3) is misinterpreted as `1970-01-20`.
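An insert sketch for that case, assuming the `@clickhouse/client` package (the CLI-generated target may already configure this setting; table and values are illustrative):

```typescript
import { createClient } from '@clickhouse/client'

const client = createClient({ url: process.env.CLICKHOUSE_URL })

// ISO-8601 strings parse correctly into DateTime64(3) only with best_effort.
await client.insert({
  table: 'deposits',
  values: [{ block_number: 19_000_000, timestamp: '2024-01-15T12:34:56.000Z' }],
  format: 'JSONEachRow',
  clickhouse_settings: { date_time_input_format: 'best_effort' },
})
```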
This differs from the CLI-generated enrichEvents helper, which flattens everything into { blockNumber, txHash, timestamp, ...eventParams }.
Target Configuration (ClickHouse)
import { clickhouseTarget } from '@subsquid/pipes/targets/clickhouse'
stream.pipeTo(clickhouseTarget({
client: createClient({ url: process.env.CLICKHOUSE_URL }),
onData: async (ctx, data) => {
await ctx.insert('transfers', data.transfers)
},
onRollback: async (ctx, range) => {
// Handle chain reorgs
},
}))
Target Configuration (PostgreSQL with Drizzle)
import { drizzleTarget } from '@subsquid/pipes/targets/drizzle/node-postgres'
stream.pipeTo(drizzleTarget({
db: drizzle(pool),
tables: [transfersTable],
onData: async (ctx, data) => {
await ctx.db.insert(transfersTable).values(data.transfers)
},
}))
Timestamp Handling (CRITICAL)
ClickHouse DateTime expects seconds, but d.timestamp.getTime() returns milliseconds. If you don't divide by 1000, timestamps will show as 1970-01-28 instead of the correct date.
When using the auto-generated enrichEvents helper
The CLI-generated enrichEvents function (in src/utils/index.ts) handles this correctly — it divides by 1000 internally. If your indexer uses enrichEvents, timestamps are fine:
// enrichEvents does this internally:
timestamp: new Date(v.timestamp).getTime() / 1000 // ← correct
When writing manual .pipe() transforms
If you write a custom .pipe() transform (e.g., to access factory metadata), you handle timestamps yourself. You MUST divide by 1000:
// WRONG — produces 1970 dates in ClickHouse
.pipe((deposits) => ({
deposits: deposits.map((d) => ({
timestamp: d.timestamp.getTime(), // milliseconds! ClickHouse stores as year 1970
})),
}))
// CORRECT — produces proper dates
.pipe((deposits) => ({
deposits: deposits.map((d) => ({
timestamp: Math.floor(d.timestamp.getTime() / 1000), // seconds
})),
}))
When to use enrichEvents vs manual .pipe()
| Scenario | Approach |
|---|---|
| Standard contract events, no factory metadata needed | Use enrichEvents (auto-generated) — handles timestamps, block data, tx hash automatically |
| Need factory metadata (vault names, asset addresses from creation event) | Write manual .pipe() — access d.factory?.event fields, but handle timestamps yourself |
| Need to combine factory metadata + standard enrichment | Write manual .pipe() and include Math.floor(d.timestamp.getTime() / 1000) |
Pipes Best Practices
Single Source of Truth (Database-Centric)
State should live in the database, not in process memory:
- Prevents data loss from crashes or restarts
- Enables recovery and replay from any point
- Use idempotent inserts/updates
ClickHouse patterns:
- Materialized views for derived metrics
- SummingMergeTree for additive aggregations
- AggregatingMergeTree for complex metrics
- CollapsingMergeTree for reorg handling
PostgreSQL patterns:
- ON CONFLICT UPDATE clauses for state reconciliation
- Ensures safe re-runs without duplicates (see the sketch below)
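A minimal upsert sketch with Drizzle. The table is hypothetical; it shows the idempotent shape described above:

```typescript
import { drizzle } from 'drizzle-orm/node-postgres'
import { numeric, pgTable, text } from 'drizzle-orm/pg-core'
import { Pool } from 'pg'

// Hypothetical table tracking per-user balances
const balances = pgTable('balances', {
  user: text('user').primaryKey(),
  balance: numeric('balance').notNull(),
})

const db = drizzle(new Pool({ connectionString: process.env.DATABASE_URL }))

// Replaying the same batch overwrites instead of duplicating
await db
  .insert(balances)
  .values({ user: '0xabc...', balance: '100' })
  .onConflictDoUpdate({ target: balances.user, set: { balance: '100' } })
```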
Block-Aware Pipelines
Always start collection at or before contract deployment block:
- Critical for metrics requiring complete event history
- Ensures historical accuracy from block 0
- Check deployment block on Etherscan before configuring
Reorg Handling (ClickHouse)
For reorg-sensitive events, use CollapsingMergeTree:
// Schema with sign field
CREATE TABLE events (
...
sign Int8
) ENGINE = CollapsingMergeTree(sign)
ORDER BY (entity_id, block_number, tx_hash, event_type)
// Rollback handler
onRollback: (ctx, range) => {
// Insert sign=-1 records for rolled-back blocks
const rollbackRecords = events
.filter(e => e.block >= range.from)
.map(e => ({ ...e, sign: -1 }))
return ctx.insert(rollbackRecords)
}
Critical: ORDER BY must include ALL distinguishing fields to prevent unwanted event deduplication.
Event-Based vs. State Queries
Indexers track historical flow (event-based):
- Example: "User deposited 100, withdrew 110" = -10 net flow
- Good for: Transaction history, activity tracking, audit logs
RPC queries track current state:
- Example: "User currently holds 50 shares" = current balance
- Good for: Current positions, real-time snapshots
Important: Withdrawals including accrued interest can make event flows appear negative even when positions are positive. Use RPC for current balances, events for historical analysis.
Validation Requirements
Always validate indexed data before production use:
- Cross-reference sample transactions with block explorer
- Verify event counts match expected ranges
- Check for missing blocks or gaps
- Reconcile aggregated metrics with known totals (see the sketch below)
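A quick sanity-check sketch against ClickHouse, assuming the `@clickhouse/client` package and a `transfers` table with a `block_number` column (both illustrative):

```typescript
import { createClient } from '@clickhouse/client'

const client = createClient({ url: process.env.CLICKHOUSE_URL })

// Pull row count and block range to eyeball against the configured range
// and against the counts a block explorer reports for the same window.
const rs = await client.query({
  query: 'SELECT count() AS rows, min(block_number) AS first_block, max(block_number) AS last_block FROM transfers',
  format: 'JSONEachRow',
})
console.log(await rs.json()) // one row: { rows, first_block, last_block }
```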
Troubleshooting
CLI Issues
"Network timeout with npx"
- Check internet connection
- Try again or wait a moment
- Ensure npm registry is accessible
"Template 'uniswap-v3-swaps' not found"
- Use camelCase:
uniswapV3Swapsnotuniswap-v3-swaps - Run
npx @iankressin/pipes-cli@latest init --schemato see available templates
"Template ID not recognized"
- Run
--schemaflag to verify available templates and their exact IDs - Ensure you're using the latest CLI version with
@latest
Database Issues
"Authentication failed: password is incorrect"
- Check the actual password: `docker inspect <container> | grep CLICKHOUSE_PASSWORD`
- Update the .env file with the correct password

"Database pipes does not exist"
- Create it: `docker exec <container> clickhouse-client --query "CREATE DATABASE IF NOT EXISTS pipes"`

"port is already allocated"
- Use the existing container instead of starting a new one

"Indexer starts from wrong block / Missing data"
- MOST COMMON ISSUE: a shared sync table between projects
- Clear the sync table: `docker exec <container> clickhouse-client --query "DROP TABLE IF EXISTS pipes.sync"`
- Restart the indexer - it will now start from the configured block
Common DeFi Protocol Fork Patterns
Many DeFi protocols are forks of well-known codebases. Recognizing forks saves time — you can reuse the same event ABI with just a different proxy address.
Aave V3 forks (same Pool ABI — Supply, Withdraw, Borrow, Repay, LiquidationCall, FlashLoan events):
- SparkLend (Sky/MakerDAO ecosystem) — Pool: `0xC13e21B648A5Ee794902342038FF3aDAB66BE987`
- Radiant Capital, Seamless Protocol, Granary Finance, etc.
- Implementation ABI works across all forks — just update the proxy address
Uniswap V2 forks (same Pair/Factory ABI — Swap, Mint, Burn, Sync events):
- SushiSwap, PancakeSwap, TraderJoe, Camelot, etc.
Compound V2 forks (same cToken ABI — Mint, Redeem, Borrow, RepayBorrow, LiquidateBorrow):
- Venus, Benqi, Tectonic, etc.
How to detect: Check the protocol's docs for "forked from" mentions, or compare the contract's event signatures against known ABIs. If the topic0 hashes match, it's a fork.
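To compare signatures yourself, compute topic0 from the canonical event signature. A sketch using viem's hashing utilities:

```typescript
import { keccak256, stringToBytes } from 'viem'

// topic0 is the keccak256 hash of the canonical signature:
// event name + parameter types, no spaces, no parameter names.
const topic0 = keccak256(
  stringToBytes('Swap(address,address,int256,int256,uint160,uint128,int24)'),
)
// Uniswap V3 Swap => 0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67
// If a contract's emitted topic0 values match a known ABI, it's a fork.
```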
Related Skills
- ENVIRONMENT_SETUP.md - Verify environment first
- pipes-troubleshooting - Fix issues
- DEPLOYMENT.md - Local and cloud deployment
Related Documentation
This skill includes comprehensive reference documentation in the references/ directory:
- ENVIRONMENT_SETUP.md - Development environment setup guide, prerequisites check, platform-specific notes, and troubleshooting
- ABI_GUIDE.md - Fetching contract ABIs, `commonAbis` usage, proxy contract detection and handling, TypeScript type generation
- SCHEMA_GUIDE.md - ClickHouse engine selection, ORDER BY strategy, BigInt handling, partitioning patterns
- RESEARCH_CHECKLIST.md - Protocol research workflow, contract discovery, deployment block finding, common gotchas
- HYPERLIQUID_GUIDE.md - Complete Hyperliquid fills indexer setup, API reference, query builder usage, and performance benchmarks
How to Access
cat pipes-sdk/pipes-new-indexer/references/ABI_GUIDE.md
cat pipes-sdk/pipes-new-indexer/references/SCHEMA_GUIDE.md
cat pipes-sdk/pipes-new-indexer/references/RESEARCH_CHECKLIST.md
Additional Resources
For comprehensive patterns and workflows:
- PATTERNS.md - EVM patterns, troubleshooting, and performance optimization
Official Subsquid Documentation
- llms.txt - Quick reference for Pipes SDK
- llms-full.txt - Complete Subsquid documentation
- skill.md - Comprehensive Pipes SDK guide
- Available Datasets - All supported networks and chains
More from subsquid-labs/agent-skills:
- pipes-troubleshooting - Diagnoses and fixes runtime errors in blockchain indexers. Handles compilation errors, database issues, Portal API failures, and data quality problems.
- portal-query-evm-transactions - Query EVM transactions using SQD Portal. Track wallet activity, function calls, and contract interactions with transaction filters.
- pipes-performance - Analyzes and optimizes blockchain indexer sync performance to reduce sync time while maintaining data completeness.
- portal-query - Query blockchain data across 210+ chains using SQD Portal. Covers EVM logs/transactions/traces, Solana instructions, Substrate events/calls/extrinsics, Hyperliquid fills, and Bitcoin transactions/inputs/outputs with dataset discovery and verification.
- portal-query-evm-traces - Query EVM traces for internal transactions and contract deployments. Track CREATE operations, internal calls, and delegatecall patterns.
- portal-dataset-discovery - Find and verify correct SQD Portal dataset names for blockchain queries. Learn Portal naming conventions and dataset verification methods.