skills/patricio0312rev/skills/etl-sync-job-builder

etl-sync-job-builder

SKILL.md

ETL/Sync Job Builder

Build reliable, incremental data synchronization pipelines.

ETL Job Pattern

// jobs/sync-users.ts
/**
 * Static description of a sync job: what it is called, where data comes
 * from, where it goes, and when it runs.
 */
interface SyncJob {
  /** Name identifying the job. */
  name: string;
  /** Kind of system the data is read from. */
  source: "database" | "api" | "file";
  /** Kind of system the data is written to. */
  destination: "database" | "warehouse" | "s3";
  // NOTE(review): presumably a cron expression such as "0 * * * *" — confirm with callers.
  schedule: string;
}

/**
 * Base class for incremental ETL jobs driven by a persisted watermark.
 *
 * `run()` reads the last watermark, extracts records changed since then,
 * transforms them, loads them, and finally advances the watermark.
 * Subclasses customize the pipeline by overriding `extract`, `transform`
 * and `load`, which is why those hooks are `protected` rather than `private`.
 */
export class ETLJob {
  /**
   * @param name Job name used in log output.
   * @param watermarkKey Key of the `syncWatermark` row that stores this job's progress.
   */
  constructor(private name: string, private watermarkKey: string) {}

  /**
   * Executes one extract → transform → load cycle.
   *
   * The watermark is only advanced after a successful load, so a failed run
   * is retried from the previous watermark on the next invocation.
   *
   * @throws Re-throws any error raised by a pipeline step after logging it.
   */
  async run(): Promise<void> {
    console.log(`🔄 Starting ${this.name}...`);

    try {
      // 1. Get last watermark
      const lastSync = await this.getWatermark();
      console.log(`  Last sync: ${lastSync}`);

      // Capture the next watermark BEFORE extracting. Using the time at which
      // the load finishes would skip every record modified while this run was
      // in progress. Records changed during the run may be picked up twice,
      // so `load` should be idempotent.
      // NOTE(review): assumes the job clock and the source's `updated_at`
      // clock are reasonably in sync — confirm for remote sources.
      const syncStartedAt = new Date();

      // 2. Extract data
      const data = await this.extract(lastSync);
      console.log(`  Extracted ${data.length} records`);

      // 3. Transform data
      const transformed = await this.transform(data);

      // 4. Load data
      await this.load(transformed);

      // 5. Update watermark
      await this.updateWatermark(syncStartedAt);

      console.log(`${this.name} complete`);
    } catch (error) {
      console.error(`${this.name} failed:`, error);
      throw error;
    }
  }

  /**
   * Returns the records changed after `since`. The base implementation
   * returns no records; subclasses override it.
   */
  protected async extract(since: Date): Promise<unknown[]> {
    // Extract logic
    return [];
  }

  /**
   * Maps extracted records to the shape expected by `load`. The base
   * implementation returns the input unchanged.
   */
  protected async transform(data: unknown[]): Promise<unknown[]> {
    // Transform logic
    return data;
  }

  /** Writes transformed records to the destination. The base implementation does nothing. */
  protected async load(data: unknown[]): Promise<void> {
    // Load logic
  }

  /**
   * Reads this job's watermark, falling back to the Unix epoch when no
   * watermark row exists yet (i.e. the first run sees every record).
   */
  private async getWatermark(): Promise<Date> {
    const watermark = await prisma.syncWatermark.findUnique({
      where: { key: this.watermarkKey },
    });
    return watermark?.lastSync ?? new Date(0);
  }

  /** Creates or updates this job's watermark row with `timestamp`. */
  private async updateWatermark(timestamp: Date): Promise<void> {
    await prisma.syncWatermark.upsert({
      where: { key: this.watermarkKey },
      create: { key: this.watermarkKey, lastSync: timestamp },
      update: { lastSync: timestamp },
    });
  }
}

Watermark Strategy

// Track sync progress
// One row per sync job, recording how far that job has progressed.
model SyncWatermark {
  // Watermark key; primary key of the table.
  key      String   @id
  // Timestamp stored as the job's sync position.
  lastSync DateTime
  // Optional free-form JSON attached to the watermark.
  metadata Json?

  // Secondary index on lastSync.
  @@index([lastSync])
}
// Incremental sync using watermark
/**
 * Incrementally copies orders from the source database to the destination.
 *
 * Only orders whose `updated_at` is later than the stored "orders_sync"
 * watermark are fetched. Each batch is written with upserts so that both
 * new AND updated orders reach the destination; re-running the sync is
 * safe because an upsert of an unchanged row is a no-op in effect.
 * The watermark is advanced to the newest `updated_at` that was synced.
 */
async function syncOrdersIncremental() {
  const watermarkKey = "orders_sync";
  const batchSize = 100;

  // Get last sync time (epoch when the job has never run)
  const watermark = await prisma.syncWatermark.findUnique({
    where: { key: watermarkKey },
  });

  const lastSync = watermark?.lastSync ?? new Date(0);

  // Fetch only new/updated records, oldest change first so the last
  // element carries the newest timestamp.
  const changedOrders = await sourceDb.order.findMany({
    where: {
      updated_at: { gt: lastSync },
    },
    orderBy: { updated_at: "asc" },
  });

  console.log(`📦 Syncing ${changedOrders.length} orders...`);

  // Process in batches
  for (let i = 0; i < changedOrders.length; i += batchSize) {
    const batch = changedOrders.slice(i, i + batchSize);

    // Upsert instead of createMany({ skipDuplicates: true }): skipping
    // duplicates would silently discard every UPDATED order that already
    // exists in the destination. The batch is applied in one transaction.
    // NOTE(review): assumes `id` is the unique key of the destination order model — confirm.
    await destinationDb.$transaction(
      batch.map((order) => {
        const { id, ...fields } = order;
        return destinationDb.order.upsert({
          where: { id },
          create: order,
          update: fields,
        });
      })
    );
  }

  // Update watermark to latest record
  if (changedOrders.length > 0) {
    const latestTimestamp = changedOrders[changedOrders.length - 1].updated_at;

    await prisma.syncWatermark.upsert({
      where: { key: watermarkKey },
      create: { key: watermarkKey, lastSync: latestTimestamp },
      update: { lastSync: latestTimestamp },
    });
  }

  console.log(`✅ Sync complete`);
}

Idempotent Upsert Pattern

// Idempotent sync - safe to run multiple times
/**
 * Upserts each user one at a time, keyed by `id`.
 *
 * A missing user is created from the full record; an existing user has its
 * `email`, `name` and `updated_at` overwritten. Running it again with the
 * same input leaves the stored rows unchanged.
 */
async function syncUsersIdempotent(users: User[]) {
  for (const incoming of users) {
    const { id, email, name, updated_at } = incoming;
    const changes = { email, name, updated_at };

    await prisma.user.upsert({
      where: { id },
      create: incoming,
      update: changes,
    });
  }
}

// Batch upsert for better performance
/**
 * Upserts all users in a single PostgreSQL statement using
 * `INSERT ... ON CONFLICT (id) DO UPDATE`.
 *
 * An existing row is only overwritten when the incoming `updated_at` is
 * newer than the stored one.
 *
 * The input is de-duplicated by `id` first (keeping the entry with the
 * latest `updated_at`, later entries winning ties) because PostgreSQL
 * rejects an `ON CONFLICT DO UPDATE` statement that would touch the same
 * row twice. An empty input returns without querying the database.
 */
async function syncUsersBatch(users: User[]) {
  if (users.length === 0) return;

  const latestById = new Map<User["id"], User>();
  for (const user of users) {
    const seen = latestById.get(user.id);
    if (seen === undefined || seen.updated_at <= user.updated_at) {
      latestById.set(user.id, user);
    }
  }
  const rows = [...latestById.values()];

  // PostgreSQL: Use ON CONFLICT
  // Each column is passed as one array parameter and expanded row-wise by UNNEST.
  await prisma.$executeRaw`
    INSERT INTO users (id, email, name, updated_at)
    SELECT * FROM UNNEST(
      ${rows.map((u) => u.id)}::bigint[],
      ${rows.map((u) => u.email)}::text[],
      ${rows.map((u) => u.name)}::text[],
      ${rows.map((u) => u.updated_at)}::timestamp[]
    )
    ON CONFLICT (id) DO UPDATE SET
      email = EXCLUDED.email,
      name = EXCLUDED.name,
      updated_at = EXCLUDED.updated_at
    WHERE users.updated_at < EXCLUDED.updated_at
  `;
}

Retry Logic with Exponential Backoff

/**
 * Runs `operation`, retrying with exponential backoff when it rejects.
 *
 * The wait before retry `k` (1-based) is `baseDelay * 2^(k - 1)` milliseconds.
 *
 * @param operation Async operation to attempt.
 * @param maxRetries Number of retries after the first attempt. Fractional
 *   values are rounded down; negative or NaN values are treated as 0, so the
 *   operation always runs at least once.
 * @param baseDelay Delay in milliseconds before the first retry.
 * @returns The value `operation` resolves with.
 * @throws The error of the final attempt when every attempt fails.
 */
async function syncWithRetry<T>(
  operation: () => Promise<T>,
  maxRetries: number = 3,
  baseDelay: number = 1000
): Promise<T> {
  // Normalize so the "last attempt" check below is always reachable and the
  // caller gets the real error instead of a generic one.
  const retries = Number.isNaN(maxRetries)
    ? 0
    : Math.max(0, Math.floor(maxRetries));

  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation();
    } catch (error: unknown) {
      if (attempt === retries) throw error;

      const delay = baseDelay * 2 ** attempt;
      console.log(`  Retry ${attempt + 1}/${retries} after ${delay}ms`);
      // Inline sleep: the file defines no `sleep` helper.
      await new Promise<void>((resolve) => setTimeout(resolve, delay));
    }
  }

  // Not reachable after normalization; kept so every code path returns or throws.
  throw new Error("Max retries exceeded");
}

// Usage
// Up to 3 retries, starting with a 1000 ms delay.
await syncWithRetry(async () => syncOrders(), 3, 1000);

Change Data Capture (CDC)

// Listen to database changes
import { PrismaClient } from "@prisma/client";

// Prisma client instance used by the snippets in this file.
const prisma = new PrismaClient();

/**
 * Sets up change capture for the `orders` table.
 *
 * Ensures the `orders_publication` publication exists, then subscribes to
 * the `orders_changed` NOTIFY channel and forwards INSERT/UPDATE payloads to
 * `syncOrder`. Safe to call on every startup.
 *
 * NOTE(review): the publication (logical replication) and LISTEN/NOTIFY are
 * two separate PostgreSQL mechanisms. Nothing here consumes the publication,
 * and something outside this snippet must issue `NOTIFY orders_changed` —
 * confirm which mechanism is intended.
 */
async function setupCDC() {
  // CREATE PUBLICATION has no IF NOT EXISTS form, so check the catalog first;
  // otherwise every run after the first fails with "already exists".
  const existing = await prisma.$queryRaw`
    SELECT 1 FROM pg_publication WHERE pubname = 'orders_publication'
  `;

  if (!Array.isArray(existing) || existing.length === 0) {
    await prisma.$executeRaw`
      CREATE PUBLICATION orders_publication FOR TABLE orders;
    `;
  }

  // Subscribe to changes (using pg library)
  // NOTE(review): `pg.connect()` is not part of current node-postgres
  // (use `new Client()` + `connect()`) — confirm the pg version in use.
  const client = await pg.connect();

  // Register the handler before LISTEN so no notification is dropped.
  client.on("notification", async (msg) => {
    try {
      // NOTIFY may be sent without a payload.
      if (!msg.payload) return;

      const change = JSON.parse(msg.payload);

      if (change.operation === "INSERT" || change.operation === "UPDATE") {
        await syncOrder(change.data);
      }
    } catch (error) {
      // The emitter does not await this async handler, so an escaping error
      // would surface as an unhandled promise rejection.
      console.error("Failed to handle orders_changed notification:", error);
    }
  });

  // Await so a failed subscription rejects setupCDC instead of being lost.
  await client.query("LISTEN orders_changed;");
}

Conflict Resolution

/** Available strategies for reconciling a source record with a destination record. */
interface ConflictResolution {
  strategy: "source-wins" | "dest-wins" | "latest-wins" | "merge";
}

/**
 * Picks (or builds) the record to keep when the same entity exists in both
 * the source and the destination.
 *
 * - `source-wins`: returns `sourceRecord`.
 * - `dest-wins`: returns `destRecord`.
 * - `latest-wins`: returns the record with the later `updated_at`; the
 *   destination wins ties. Timestamps are normalized to epoch milliseconds,
 *   so `Date` objects, ISO strings and numbers can be mixed. When one record
 *   is missing, the other is returned.
 * - `merge`: returns `destRecord` overlaid with every non-null field of
 *   `sourceRecord`.
 *
 * @throws Error when `strategy` is not one of the known values.
 */
async function syncWithConflictResolution(
  sourceRecord: any,
  destRecord: any,
  strategy: ConflictResolution["strategy"]
): Promise<any> {
  switch (strategy) {
    case "source-wins":
      return sourceRecord;

    case "dest-wins":
      return destRecord;

    case "latest-wins": {
      // Without the other side there is nothing to compare against.
      if (destRecord == null) return sourceRecord;
      if (sourceRecord == null) return destRecord;

      // Comparing a Date with an ISO string using `>` coerces the string to
      // NaN, which would make the destination win unconditionally.
      const toMillis = (value: unknown): number =>
        value instanceof Date
          ? value.getTime()
          : new Date(value as string | number).getTime();

      return toMillis(sourceRecord.updated_at) > toMillis(destRecord.updated_at)
        ? sourceRecord
        : destRecord;
    }

    case "merge":
      // Merge non-null fields
      return {
        ...destRecord,
        ...Object.fromEntries(
          Object.entries(sourceRecord ?? {}).filter(([, v]) => v != null)
        ),
      };

    default: {
      // Compile-time exhaustiveness check; at runtime an unexpected value
      // fails loudly instead of silently resolving to undefined.
      const unknownStrategy: never = strategy;
      throw new Error(
        `Unknown conflict resolution strategy: ${String(unknownStrategy)}`
      );
    }
  }
}

Monitoring & Observability

// Track sync job metrics
/** Metrics recorded for a single run of a sync job. */
interface SyncMetrics {
  /** Name of the job the metrics belong to. */
  jobName: string;
  /** When the run started. */
  startTime: Date;
  /** When the run ended. */
  endTime: Date;
  /** Count of records processed. */
  recordsProcessed: number;
  /** Count of records inserted. */
  recordsInserted: number;
  /** Count of records updated. */
  recordsUpdated: number;
  /** Count of records skipped. */
  recordsSkipped: number;
  /** Count of errors. */
  errors: number;
  /** Run duration in milliseconds. */
  durationMs: number;
}

/**
 * Persists one run's metrics as a `syncMetric` row, then prints a
 * human-readable summary to the console.
 *
 * @param metrics Metrics of the finished run.
 */
async function logSyncMetrics(metrics: SyncMetrics) {
  await prisma.syncMetric.create({ data: metrics });

  const {
    jobName,
    recordsProcessed,
    recordsInserted,
    recordsUpdated,
    errors,
    durationMs,
  } = metrics;

  // The leading empty entry and trailing two-space entry reproduce the
  // blank first line and indented last line of the summary block.
  const summary = [
    "",
    "📊 Sync Metrics",
    `  Job: ${jobName}`,
    `  Records: ${recordsProcessed}`,
    `  Inserted: ${recordsInserted}`,
    `  Updated: ${recordsUpdated}`,
    `  Errors: ${errors}`,
    `  Duration: ${durationMs}ms`,
    "  ",
  ].join("\n");

  console.log(summary);
}

Full ETL Job Example

// jobs/sync-orders-to-warehouse.ts
/**
 * ETL job that copies orders into the warehouse `orders_fact` table.
 *
 * Uses the job name "orders-etl" and the watermark key
 * "orders_warehouse_sync".
 */
export class OrdersETLJob extends ETLJob {
  constructor() {
    super("orders-etl", "orders_warehouse_sync");
  }

  /**
   * Fetches orders updated after `since`, including their items and user,
   * ordered by `updated_at` ascending.
   */
  async extract(since: Date): Promise<Order[]> {
    return prisma.order.findMany({
      where: {
        updated_at: { gt: since },
      },
      include: {
        items: true,
        user: true,
      },
      orderBy: { updated_at: "asc" },
    });
  }

  /**
   * Flattens each order into one warehouse fact row. `synced_at` is set to
   * the time of transformation.
   */
  async transform(orders: Order[]): Promise<WarehouseOrder[]> {
    return orders.map((order) => ({
      order_id: order.id,
      user_email: order.user.email,
      total_amount: order.total,
      item_count: order.items.length,
      status: order.status,
      order_date: order.created_at,
      synced_at: new Date(),
    }));
  }

  /**
   * Upserts fact rows into `orders_fact` in batches of 100.
   *
   * Every value is sent as a bound parameter: each column travels as one
   * array and is expanded row-wise by UNNEST. Interpolating a hand-built
   * `VALUES (...)` string into the tagged template does not work — Prisma
   * binds the whole string as a single parameter — and building the SQL by
   * string concatenation would expose the query to SQL injection through
   * values such as `user_email`.
   *
   * On conflict only `total_amount`, `status` and `synced_at` are refreshed.
   */
  async load(data: WarehouseOrder[]): Promise<void> {
    const batchSize = 100;

    for (let i = 0; i < data.length; i += batchSize) {
      const batch = data.slice(i, i + batchSize);

      // NOTE(review): the array casts assume the orders_fact column types
      // (bigint id, numeric amount, text status, timestamp dates) — adjust
      // them to the actual warehouse schema.
      await warehouseDb.$executeRaw`
        INSERT INTO orders_fact (
          order_id, user_email, total_amount, item_count,
          status, order_date, synced_at
        )
        SELECT * FROM UNNEST(
          ${batch.map((o) => o.order_id)}::bigint[],
          ${batch.map((o) => o.user_email)}::text[],
          ${batch.map((o) => o.total_amount)}::numeric[],
          ${batch.map((o) => o.item_count)}::int[],
          ${batch.map((o) => o.status)}::text[],
          ${batch.map((o) => o.order_date)}::timestamp[],
          ${batch.map((o) => o.synced_at)}::timestamp[]
        )
        ON CONFLICT (order_id) DO UPDATE SET
          total_amount = EXCLUDED.total_amount,
          status = EXCLUDED.status,
          synced_at = EXCLUDED.synced_at
      `;
    }
  }
}

// Run job
// Fire-and-forget: the returned promise is intentionally not awaited.
void new OrdersETLJob().run();

Scheduling

// Schedule ETL jobs
import cron from "node-cron";

/**
 * Registers `job` with node-cron under the given cron `expression`.
 *
 * - A rejected run is logged instead of escaping the cron callback as an
 *   unhandled promise rejection, so one failed run does not take the
 *   scheduler down.
 * - A tick is skipped while the previous run of the same job is still in
 *   progress, so two runs never work from the same watermark concurrently.
 *
 * @param expression Cron expression passed to `cron.schedule`.
 * @param name Job name used in log messages.
 * @param job Function that starts one run of the job.
 */
function scheduleJob(
  expression: string,
  name: string,
  job: () => Promise<unknown>
): void {
  let running = false;

  cron.schedule(expression, async () => {
    if (running) {
      console.warn(`Skipping ${name}: previous run is still in progress`);
      return;
    }

    running = true;
    try {
      await job();
    } catch (error) {
      console.error(`Scheduled job ${name} failed:`, error);
    } finally {
      running = false;
    }
  });
}

// Run every hour
scheduleJob("0 * * * *", "orders-etl", () => new OrdersETLJob().run());

// Run every 15 minutes
scheduleJob("*/15 * * * *", "users-incremental-sync", () =>
  syncUsersIncremental()
);

// Run nightly at 2 AM
scheduleJob("0 2 * * *", "full-data-sync", () => fullDataSync());

Error Handling & Recovery

/**
 * Processes records fetched from the current checkpoint, tolerating
 * per-record failures.
 *
 * Each record is handled in its own try/catch: an error raised while
 * processing it (or while saving the periodic checkpoint) is logged,
 * recorded via `logFailedRecord`, and the loop moves on. After every 100th
 * successfully processed record the checkpoint is set to that record's id,
 * and after the loop it is set to "completed".
 *
 * @throws Re-throws any error raised outside the per-record handling
 *   (fetching records, logging a failed record, the final checkpoint save).
 */
async function syncWithErrorHandling() {
  const resumeFrom = await getCheckpoint();
  let successCount = 0;

  try {
    const pending = await fetchRecords(resumeFrom);

    for (const record of pending) {
      try {
        await processRecord(record);
        successCount += 1;

        // Persist progress after every 100th success.
        const atCheckpointBoundary = successCount % 100 === 0;
        if (atCheckpointBoundary) {
          await saveCheckpoint(record.id);
        }
      } catch (error) {
        // Record the failure and keep going with the next record.
        console.error(`Failed to process record ${record.id}:`, error);
        await logFailedRecord(record.id, error);
      }
    }

    await saveCheckpoint("completed");
  } catch (error) {
    // Anything that reaches this point aborts the job; the error is
    // propagated to the caller after logging.
    console.error("Job failed:", error);
    throw error;
  }
}

Best Practices

  1. Incremental sync: Use watermarks, don't full-scan
  2. Idempotent operations: Safe to retry
  3. Batch processing: Process 100-1000 records at a time
  4. Checkpointing: Resume from failure point
  5. Retry with backoff: Handle transient failures
  6. Monitor metrics: Track job health
  7. Test thoroughly: Including failure scenarios

Output Checklist

  • ETL job class created
  • Watermark tracking implemented
  • Incremental sync logic
  • Idempotent upsert operations
  • Retry logic with backoff
  • Conflict resolution strategy
  • Monitoring and metrics
  • Error handling and recovery
  • Job scheduling configured
  • Testing including failure cases
Weekly Installs: 11
First Seen: 10 days ago
Installed on:
claude-code: 9
gemini-cli: 8
antigravity: 8
codex: 8
opencode: 8
windsurf: 7