Sinks in Effect

Overview

A Sink is a consumer of stream elements that produces a result:

Sink<A, In, L, E, R>;
// A  - Result type (what sink produces)
// In - Input element type (what sink consumes)
// L  - Leftover type (unconsumed elements)
// E  - Error type
// R  - Required environment

Sinks are the counterpart to Streams - while Streams produce data, Sinks consume it.
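
As a minimal end-to-end sketch of that relationship (assuming the `effect` package is installed; `overviewProgram` and `overviewResult` are illustrative names, not part of the library), the program below runs one stream into two built-in sinks:

```typescript
import { Chunk, Effect, Sink, Stream } from "effect"

// Stream.run feeds a stream into a sink and returns the sink's result as an Effect.
const overviewProgram = Effect.gen(function* () {
  const numbers = Stream.make(1, 2, 3, 4, 5)
  // Sink.sum consumes every element and produces their total.
  const total = yield* numbers.pipe(Stream.run(Sink.sum))
  // Sink.take(2) stops after two elements and produces them as a Chunk.
  const firstTwo = yield* numbers.pipe(Stream.run(Sink.take<number>(2)))
  return { total, firstTwo: Chunk.toReadonlyArray(firstTwo) }
})

const overviewResult = Effect.runSync(overviewProgram)
console.log(overviewResult)
```

The stream is only a description, so it can be run twice with different sinks.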

Built-in Sinks

Collecting Elements

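import { Chunk, Effect, Sink, Stream } from "effect";

// The `yield*` snippets in this document run inside Effect.gen(function* () { ... }).

const all = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.collectAll<number>()));

// Collect into a plain array by mapping the collected Chunk
const array = yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.collectAll<number>().pipe(Sink.map((chunk) => Chunk.toReadonlyArray(chunk)))));

const firstThree = yield* Stream.range(1, 100).pipe(Stream.run(Sink.collectAllN<number>(3)));

const whileSmall = yield* Stream.iterate(1, (n) => n + 1).pipe(Stream.run(Sink.collectAllWhile((n: number) => n < 5)));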

Aggregation Sinks

const total = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.sum)); // 15

const count = yield* Stream.make("a", "b", "c").pipe(Stream.run(Sink.count)); // 3

// head and last are functions; both produce an Option
const first = yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.head<number>())); // Option.some(1)

const last = yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.last<number>())); // Option.some(3)

const taken = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.take<number>(3))); // Chunk(1, 2, 3)

Folding Sinks

const product = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.foldLeft(1, (acc, n: number) => acc * n)));

const sumUntil100 = yield* Stream.iterate(1, (n) => n + 1).pipe(
  Stream.run(
    Sink.fold(
      0,
      (sum) => sum < 100, // keep consuming while this holds
      (sum, n: number) => sum + n,
    ),
  ),
);

const foldWithLog = Sink.foldEffect(
  0,
  (sum) => sum < 100,
  (sum, n: number) =>
    Effect.gen(function* () {
      yield* Effect.log(`Adding ${n} to ${sum}`);
      return sum + n;
    }),
);
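
An effectful fold is run like any other sink. The sketch below is a smaller variant of the sink above (the names `loggedSum` and `loggedSumResult`, and the threshold of 10, are illustrative choices) applied to an infinite stream. The sink stops pulling as soon as the predicate fails:

```typescript
import { Effect, Sink, Stream } from "effect"

// Keep folding while the running sum is below 10; log before each addition.
const loggedSum = Sink.foldEffect(
  0,
  (sum) => sum < 10,
  (sum, n: number) => Effect.log(`Adding ${n} to ${sum}`).pipe(Effect.as(sum + n)),
)

// The stream is infinite, but the sink stops consuming after 1 + 2 + 3 + 4.
const loggedSumResult = Effect.runSync(
  Stream.iterate(1, (n) => n + 1).pipe(Stream.run(loggedSum)),
)
console.log(loggedSumResult)
```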

Side Effect Sinks

yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.forEach((n: number) => Effect.log(`Got: ${n}`))));

yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.drain));

Creating Custom Sinks

Sink.foldLeft

// Keep a running maximum: the first argument is the initial state,
// the second combines the state with each element.
const maxSink = Sink.foldLeft(Number.NEGATIVE_INFINITY, (max, n: number) => (n > max ? n : max));

const max = yield* Stream.make(3, 1, 4, 1, 5, 9).pipe(Stream.run(maxSink)); // 9

Sink.fromEffect

const logAndReturn = <A>(label: string) =>
  Sink.fromEffect(
    Effect.gen(function* () {
      yield* Effect.log(`Starting ${label}`);
      return [] as A[];
    }),
  );

Sink.fromPush

For more control over the sink lifecycle:

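// fromPush takes an Effect that builds the push function. The push function
// receives Option.some(chunk) for input and Option.none() at end of stream.
// Succeeding with void asks for more input; failing with
// [Either.right(result), leftovers] completes the sink (Either.left(error) fails it).
const sumViaPush = Sink.fromPush(
  Effect.sync(() => {
    let total = 0; // state private to one run of the sink
    return (input: Option.Option<Chunk.Chunk<number>>) =>
      Option.match(input, {
        // Stream ended: finish with the accumulated result and no leftovers
        onNone: () => Effect.fail([Either.right(total), Chunk.empty<never>()] as const),
        // Process chunk, then ask for more input
        onSome: (chunk) =>
          Effect.sync(() => {
            total += Chunk.reduce(chunk, 0, (a, b) => a + b);
          }),
      });
  }),
);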

Sink Operations

Transforming Sinks

const doubledSum = Sink.sum.pipe(Sink.map((sum) => sum * 2));

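// mapInput transforms each input before it reaches the sink
const lengthSum = Sink.sum.pipe(Sink.mapInput((s: string) => s.length));

// dimap transforms both the input and the result
const processStrings = Sink.sum.pipe(
  Sink.dimap({
    onInput: (s: string) => s.length,
    onDone: (sum) => `Total length: ${sum}`,
  }),
);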
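
To make the effect of adapting both ends of a sink concrete, here is a small runnable sketch (the names `totalLength` and `lengthReport` are illustrative). It turns the numeric `Sink.sum` into a sink that accepts strings and reports a formatted message:

```typescript
import { Effect, Sink, Stream } from "effect"

// onInput converts each string to its length before Sink.sum sees it;
// onDone formats the final sum.
const totalLength = Sink.sum.pipe(
  Sink.dimap({
    onInput: (s: string) => s.length,
    onDone: (sum: number) => `Total length: ${sum}`,
  }),
)

const lengthReport = Effect.runSync(
  Stream.make("a", "bb", "ccc").pipe(Stream.run(totalLength)),
)
console.log(lengthReport) // Total length: 6
```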

Combining Sinks

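// { concurrent: true } feeds both sinks the same input; without it,
// zip runs the second sink only after the first has finished.
const sumAndCount = Sink.zip(Sink.sum, Sink.count, { concurrent: true });

const [sum, count] = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(sumAndCount));

// race completes with whichever sink finishes first
const firstOrSum = Sink.race(Sink.head<number>(), Sink.sum.pipe(Sink.map(Option.some)));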

Filtering

const sumPositive = Sink.sum.pipe(Sink.filterInput((n: number) => n > 0));

const result = yield* Stream.make(-1, 2, -3, 4, -5).pipe(Stream.run(sumPositive)); // 6

Leftovers

Sinks can leave unconsumed elements:

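const takeThree = Sink.take<number>(3);

// collectLeftover returns the result together with whatever the sink did not consume
const [taken, rest] = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(takeThree.pipe(Sink.collectLeftover)));
// taken: Chunk(1, 2, 3), rest: Chunk(4, 5)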

Sink Concurrency

Parallel Sinks

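// zip is binary: chain it with { concurrent: true } so all three sinks see the same input
const parallelSinks = Sink.sum.pipe(Sink.zip(Sink.count, { concurrent: true }), Sink.zip(Sink.collectAll<number>(), { concurrent: true }));

const [[sum, count], all] = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(parallelSinks));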

Chunked Processing

const chunkedSum = Sink.foldChunks(
  0,
  () => true,
  (sum, chunk: Chunk.Chunk<number>) => sum + Chunk.reduce(chunk, 0, (a, b) => a + b),
);
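
A chunk-level fold is run the same way as an element-level one. The sketch below (with illustrative names `sumByChunk` and `chunkedTotal`) builds a stream from two explicit chunks so the folding function receives whole chunks rather than single elements:

```typescript
import { Chunk, Effect, Sink, Stream } from "effect"

// The folding function is called once per chunk, not once per element.
const sumByChunk = Sink.foldChunks(
  0,
  () => true, // never stop early
  (sum, chunk: Chunk.Chunk<number>) => sum + Chunk.reduce(chunk, 0, (a, b) => a + b),
)

const chunkedTotal = Effect.runSync(
  Stream.fromChunks(Chunk.make(1, 2, 3), Chunk.make(4, 5)).pipe(Stream.run(sumByChunk)),
)
console.log(chunkedTotal) // 15
```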

Common Patterns

Batched Database Insert

const batchInsert = (batchSize: number) =>
  Sink.collectAllN<Record>(batchSize).pipe(
    Sink.mapEffect((batch) => Effect.tryPromise(() => db.insertMany(Chunk.toArray(batch)))),
  );

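// Stream.run would finish after the first batch; transduce re-applies the sink until the stream ends
yield* recordStream.pipe(Stream.transduce(batchInsert(100)), Stream.runDrain);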
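
The batching pattern can be tried without a database. In the sketch below, `insertedBatches` and `insertMany` are hypothetical in-memory stand-ins for `db.insertMany`, and the sink is applied repeatedly with `Stream.transduce` so that every element ends up in some batch:

```typescript
import { Chunk, Effect, Sink, Stream } from "effect"

// Hypothetical in-memory stand-in for a database table.
const insertedBatches: Array<ReadonlyArray<number>> = []
const insertMany = (rows: ReadonlyArray<number>) =>
  Effect.sync(() => {
    insertedBatches.push(rows)
  })

// Collect up to batchSize elements, then hand them to insertMany.
const batchSink = (batchSize: number) =>
  Sink.collectAllN<number>(batchSize).pipe(
    Sink.mapEffect((batch) => insertMany(Chunk.toReadonlyArray(batch))),
  )

// transduce re-applies the sink until the stream is exhausted.
Effect.runSync(
  Stream.make(1, 2, 3, 4, 5, 6, 7).pipe(Stream.transduce(batchSink(3)), Stream.runDrain),
)

// Flatten the recorded batches to check that nothing was dropped.
const insertedRows = insertedBatches.reduce<Array<number>>((acc, b) => acc.concat(b), [])
console.log(insertedRows)
```

Running the same sink once with `Stream.run` would complete after the first batch, which is why the sketch uses `Stream.transduce`.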

Aggregation Pipeline

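// Zip concurrently so every sink sees the whole stream in a single pass
const both = { concurrent: true } as const;

const stats = Sink.zip(
  Sink.sum,
  Sink.zip(Sink.count, Sink.zip(Sink.head<number>(), Sink.last<number>(), both), both),
  both,
).pipe(
  Sink.map(([sum, [count, [first, last]]]) => ({
    sum,
    count,
    average: count > 0 ? sum / count : 0,
    first,
    last,
  })),
);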

Write to File

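import { FileSystem } from "@effect/platform";

const writeToFile = (path: string) =>
  Sink.forEach((line: string) =>
    Effect.gen(function* () {
      const fs = yield* FileSystem.FileSystem;
      // Open in append mode so each line is added to the end of the file
      yield* fs.writeFileString(path, line + "\n", { flag: "a" });
    }),
  );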

Best Practices

  1. Use built-in sinks when possible - Optimized and tested
  2. Combine sinks with zip and { concurrent: true } - Run multiple aggregations in one pass
  3. Use foldChunks for efficiency - Process chunks, not elements
  4. Handle leftovers - Consider unconsumed elements
  5. Prefer a Sink over Stream.runForEach when the consumer must be reused or combined - More composable
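
As a sketch of the single-pass advice in item 2 (the names `countAndSum` and `onePass` are illustrative), two aggregations can share one traversal of the stream when they are zipped concurrently:

```typescript
import { Effect, Sink, Stream } from "effect"

// With { concurrent: true } both sinks receive every element of the stream.
const countAndSum = Sink.zip(Sink.count, Sink.sum, { concurrent: true })

const onePass = Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(countAndSum))

Effect.runPromise(onePass).then(([count, sum]) => {
  console.log({ count, sum, average: sum / count })
})
```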

Additional Resources

For comprehensive sink documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.

Search for these sections:

  • "Creating Sinks" for sink construction
  • "Sink Operations" for transformations
  • "Sink Concurrency" for parallel processing
  • "Leftovers" for handling unconsumed elements