es-ingest

SKILL.md

ES Ingest

Stream-based ingestion and transformation of large data files into Elasticsearch. Built on node-es-transformer.

Features

  • Stream-based: Handle very large files (20-30 GB tested) without running out of memory
  • High throughput: Up to 20k documents/second on commodity hardware
  • Cross-version: Seamlessly migrate between ES 8.x and 9.x
  • Formats: NDJSON, CSV, Parquet, Arrow IPC
  • Transformations: Apply custom JavaScript transforms during ingestion
  • Reindexing: Copy and transform existing indices
  • Wildcards: Ingest multiple files matching a pattern (e.g., logs/*.json)
  • Document splitting: Transform one source document into multiple targets

Prerequisites

  • Elasticsearch 8.x or 9.x accessible (local or remote)
  • Node.js 22+ installed

Setup

Before first use, install dependencies:

cd {baseDir} && npm install

Basic Usage

Ingest a JSON file

{baseDir}/scripts/ingest.js --file data.json --target my-index

Stream NDJSON/CSV via stdin

# NDJSON
cat data.ndjson | {baseDir}/scripts/ingest.js --stdin --target my-index

# CSV
cat data.csv | {baseDir}/scripts/ingest.js --stdin --source-format csv --target my-index

Ingest CSV directly

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --target users

Ingest Parquet directly

{baseDir}/scripts/ingest.js --file users.parquet --source-format parquet --target users

Ingest Arrow IPC directly

{baseDir}/scripts/ingest.js --file users.arrow --source-format arrow --target users

CSV/Parquet/Arrow support requires node-es-transformer >= 1.2.0.

Ingest CSV with parser options

# csv-options.json
# {
#   "columns": true,
#   "delimiter": ";",
#   "trim": true
# }

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --csv-options csv-options.json --target users

Infer mappings/pipeline from CSV

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --target users

Infer mappings with options

# infer-options.json
# {
#   "sampleBytes": 200000,
#   "lines_to_sample": 2000
# }

{baseDir}/scripts/ingest.js --file users.csv --source-format csv --infer-mappings --infer-mappings-options infer-options.json --target users

Ingest with custom mappings

{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json

Ingest with transformation

{baseDir}/scripts/ingest.js --file data.json --target my-index --transform transform.js

Reindex from another index

{baseDir}/scripts/ingest.js --source-index old-index --target new-index

Cross-cluster reindex (ES 8.x → 9.x)

{baseDir}/scripts/ingest.js --source-index logs \
  --node https://es8.example.com:9200 --api-key es8-key \
  --target new-logs \
  --target-node https://es9.example.com:9200 --target-api-key es9-key

Command Reference

Required Options

--target <index>         # Target index name

Source Options (choose one)

--file <path>            # Source file (supports wildcards, e.g., logs/*.json)
--source-index <name>    # Source Elasticsearch index
--stdin                  # Read NDJSON/CSV from stdin

Elasticsearch Connection

--node <url>             # ES node URL (default: http://localhost:9200)
--api-key <key>          # API key authentication
--username <user>        # Basic auth username
--password <pass>        # Basic auth password

Target Connection (for cross-cluster)

--target-node <url>      # Target ES node URL (uses --node if not specified)
--target-api-key <key>   # Target API key
--target-username <user> # Target username
--target-password <pass> # Target password

Index Configuration

--mappings <file.json>          # Mappings file (auto-copy from source if reindexing)
--infer-mappings                # Infer mappings/pipeline from file/stream
--infer-mappings-options <file> # Options for inference (JSON file)
--delete-index                  # Delete target index if exists
--pipeline <name>               # Ingest pipeline name

Processing

--transform <file.js>    # Transform function (export as default or module.exports)
--query <file.json>      # Query file to filter source documents
--source-format <fmt>    # Source format: ndjson|csv|parquet|arrow (default: ndjson)
--csv-options <file>     # CSV parser options (JSON file)
--skip-header            # Skip first line (e.g., CSV header)

Performance

--buffer-size <kb>       # Buffer size in KB (default: 5120)
--search-size <n>        # Docs per search when reindexing (default: 100)
--total-docs <n>         # Total docs for progress bar (file/stream)
--stall-warn-seconds <n> # Stall warning threshold (default: 30)
--progress-mode <mode>   # Progress output: auto|line|newline (default: auto)
--debug-events           # Log pause/resume/stall events
--quiet                  # Disable progress bars

Transform Functions

Transform functions let you modify documents during ingestion. Create a JavaScript file that exports a transform function:

Basic Transform (transform.js)

// ES modules (default)
export default function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
    timestamp: new Date().toISOString(),
  };
}

// Or CommonJS
module.exports = function transform(doc) {
  return {
    ...doc,
    full_name: `${doc.first_name} ${doc.last_name}`,
  };
};

Skip Documents

Return null or undefined to skip a document:

export default function transform(doc) {
  // Skip invalid documents
  if (!doc.email || !doc.email.includes('@')) {
    return null;
  }
  return doc;
}

Split Documents

Return an array to create multiple target documents from one source:

export default function transform(doc) {
  // Split a tweet into multiple hashtag documents
  const hashtags = doc.text.match(/#\w+/g) || [];
  return hashtags.map(tag => ({
    hashtag: tag,
    tweet_id: doc.id,
    created_at: doc.created_at,
  }));
}

Mappings

Auto-Copy Mappings (Reindexing)

When reindexing, mappings are automatically copied from the source index:

{baseDir}/scripts/ingest.js --source-index old-logs --target new-logs

Custom Mappings (mappings.json)

{
  "properties": {
    "@timestamp": { "type": "date" },
    "message": { "type": "text" },
    "user": {
      "properties": {
        "name": { "type": "keyword" },
        "email": { "type": "keyword" }
      }
    }
  }
}
{baseDir}/scripts/ingest.js --file data.json --target my-index --mappings mappings.json

Query Filters

Filter source documents during reindexing with a query file:

Query File (filter.json)

{
  "range": {
    "@timestamp": {
      "gte": "2024-01-01",
      "lt": "2024-02-01"
    }
  }
}
{baseDir}/scripts/ingest.js \
  --source-index logs \
  --target filtered-logs \
  --query filter.json

Common Patterns

Pattern 1: Load CSV with custom mappings

# 1. Create mappings.json with your schema
cat > mappings.json << 'EOF'
{
  "properties": {
    "timestamp": { "type": "date" },
    "user_id": { "type": "keyword" },
    "action": { "type": "keyword" },
    "value": { "type": "double" }
  }
}
EOF

# 2. Ingest CSV (skip header row)
{baseDir}/scripts/ingest.js \
  --file events.csv \
  --target events \
  --mappings mappings.json \
  --skip-header

Pattern 2: Migrate ES 8.x → 9.x with transforms

# 1. Create transform.js to update document structure
cat > transform.js << 'EOF'
export default function transform(doc) {
  // Update field names for ES 9.x compatibility
  return {
    ...doc,
    '@timestamp': doc.timestamp,  // Rename field
    user: {
      id: doc.user_id,
      name: doc.user_name,
    },
  };
}
EOF

# 2. Migrate with transform
{baseDir}/scripts/ingest.js \
  --source-index logs \
  --node https://es8-cluster.example.com:9200 \
  --api-key $ES8_API_KEY \
  --target logs-v2 \
  --target-node https://es9-cluster.example.com:9200 \
  --target-api-key $ES9_API_KEY \
  --transform transform.js

Pattern 3: Reindex with filtering and deletion

# 1. Create filter query
cat > filter.json << 'EOF'
{
  "bool": {
    "must": [
      { "range": { "@timestamp": { "gte": "now-30d" } } }
    ],
    "must_not": [
      { "term": { "status": "deleted" } }
    ]
  }
}
EOF

# 2. Reindex with filter (delete old index first)
{baseDir}/scripts/ingest.js \
  --source-index logs-raw \
  --target logs-filtered \
  --query filter.json \
  --delete-index

Pattern 4: Batch ingest multiple files

# Ingest all JSON files in a directory
{baseDir}/scripts/ingest.js \
  --file "logs/*.json" \
  --target combined-logs \
  --mappings mappings.json

Pattern 5: Document enrichment during ingestion

# 1. Create enrichment transform
cat > enrich.js << 'EOF'
export default function transform(doc) {
  return {
    ...doc,
    enriched_at: new Date().toISOString(),
    source: 'batch-import',
    year: new Date(doc.timestamp).getFullYear(),
  };
}
EOF

# 2. Ingest with enrichment
{baseDir}/scripts/ingest.js \
  --file data.json \
  --target enriched-data \
  --transform enrich.js

Performance Tuning

For Large Files (>5GB)

# Increase buffer size for better throughput
{baseDir}/scripts/ingest.js \
  --file huge-file.json \
  --target my-index \
  --buffer-size 10240  # 10 MB buffer

For Slow Networks

# Reduce batch size to avoid timeouts
{baseDir}/scripts/ingest.js \
  --source-index remote-logs \
  --target local-logs \
  --search-size 50  # Smaller batches

Quiet Mode (for scripts)

# Disable progress bars for automated scripts
{baseDir}/scripts/ingest.js \
  --file data.json \
  --target my-index \
  --quiet

When to Use

Use this skill when you need to:

  • Load large files into Elasticsearch without memory issues
  • Migrate indices between ES versions (8.x ↔ 9.x)
  • Transform data during ingestion (enrich, split, filter)
  • Reindex with modifications (rename fields, restructure documents)
  • Batch process multiple files matching a pattern
  • Cross-cluster replication with transformations

When NOT to Use

Consider alternatives for:

Troubleshooting

Connection refused

Elasticsearch is not running or URL is incorrect:

# Test connection
curl http://localhost:9200

# Or with auth
curl -H "Authorization: ApiKey $API_KEY" https://es.example.com:9200

Out of memory errors

Reduce buffer size:

{baseDir}/scripts/ingest.js --file data.json --target my-index --buffer-size 2048

Transform function not loading

Ensure the transform file exports correctly:

// ✓ Correct (ES modules)
export default function transform(doc) { /* ... */ }

// ✓ Correct (CommonJS)
module.exports = function transform(doc) { /* ... */ }

// ✗ Wrong
function transform(doc) { /* ... */ }

Mapping conflicts

Delete and recreate the index:

{baseDir}/scripts/ingest.js \
  --file data.json \
  --target my-index \
  --mappings mappings.json \
  --delete-index

References

Weekly Installs
5
GitHub Stars
7
First Seen
Feb 14, 2026
Installed on
cursor4
pi3
gemini-cli3
github-copilot3
codex3
kimi-cli3