Sinks in Effect
Overview
A Sink is a consumer of stream elements that produces a result:
Sink<A, In, L, E, R>;
// A - Result type (what sink produces)
// In - Input element type (what sink consumes)
// L - Leftover type (unconsumed elements)
// E - Error type
// R - Required environment
Sinks are the counterpart to Streams - while Streams produce data, Sinks consume it.
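The snippets in this document use `yield*`, so they are meant to sit inside an `Effect.gen` block. A minimal, self-contained sketch of running a stream into a sink follows; the name `program` is only illustrative.

```typescript
import { Effect, Sink, Stream } from "effect";

// Stream.run feeds the stream's elements into the sink and
// returns the sink's result wrapped in an Effect.
const program = Effect.gen(function* () {
  const total = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.sum));
  return total;
});

Effect.runPromise(program).then((total) => console.log(total)); // 15
```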
Built-in Sinks
Collecting Elements
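import { Chunk, Effect, Either, Option, Sink, Stream } from "effect";
// Collect every element into a Chunk
const all = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.collectAll<number>()));
// Collect into a plain array by mapping the collected Chunk
const array = yield* Stream.make(1, 2, 3).pipe(
  Stream.run(Sink.collectAll<number>().pipe(Sink.map((chunk) => Chunk.toReadonlyArray(chunk)))),
);
// Collect at most the first 3 elements
const firstThree = yield* Stream.range(1, 100).pipe(Stream.run(Sink.collectAllN<number>(3)));
// Collect while the predicate holds: 1, 2, 3, 4
const whileSmall = yield* Stream.iterate(1, (n) => n + 1).pipe(Stream.run(Sink.collectAllWhile((n: number) => n < 5)));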
Aggregation Sinks
const total = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.sum)); // 15
const count = yield* Stream.make("a", "b", "c").pipe(Stream.run(Sink.count)); // 3
const first = yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.head<number>())); // Option.some(1)
const last = yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.last<number>())); // Option.some(3)
const taken = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.take<number>(3))); // Chunk(1, 2, 3)
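`Sink.head` and `Sink.last` are functions that return a sink, and the sink's result is an `Option` because the stream may be empty. A small sketch of handling both cases, where the fallback value `-1` is an arbitrary choice for illustration:

```typescript
import { Effect, Option, Sink, Stream } from "effect";

const program = Effect.gen(function* () {
  // Non-empty stream: the result is Option.some(first element)
  const first = yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.head<number>()));
  // Empty stream: the result is Option.none()
  const missing = yield* Stream.fromIterable<number>([]).pipe(Stream.run(Sink.head<number>()));
  return {
    first: Option.getOrElse(first, () => -1),
    missing: Option.getOrElse(missing, () => -1),
  };
});

Effect.runPromise(program).then((result) => console.log(result)); // { first: 1, missing: -1 }
```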
Folding Sinks
const product = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.foldLeft(1, (acc: number, n: number) => acc * n)));
// Sink.fold stops as soon as the predicate on the state returns false
const sumUntil100 = yield* Stream.iterate(1, (n) => n + 1).pipe(
  Stream.run(
    Sink.fold(
      0,
      (sum) => sum < 100,
      (sum, n: number) => sum + n,
    ),
  ),
);
// Sink.foldEffect is the same fold with an effectful step function
const foldWithLog = Sink.foldEffect(
  0,
  (sum) => sum < 100,
  (sum, n: number) =>
    Effect.gen(function* () {
      yield* Effect.log(`Adding ${n} to ${sum}`);
      return sum + n;
    }),
);
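To make the termination rule of `Sink.fold` concrete, here is a runnable sketch of the running-sum example. The predicate is checked against the state after each step, so the element that pushes the sum past the limit is still included in the result.

```typescript
import { Effect, Sink, Stream } from "effect";

// The stream is infinite; the sink decides when to stop.
const sumUntil100 = Stream.iterate(1, (n) => n + 1).pipe(
  Stream.run(
    Sink.fold(
      0,
      (sum) => sum < 100, // keep consuming while the running sum is below 100
      (sum, n: number) => sum + n,
    ),
  ),
);

// 1 + 2 + ... + 13 = 91 (continue), adding 14 gives 105 (stop)
Effect.runPromise(sumUntil100).then((sum) => console.log(sum)); // 105
```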
Side Effect Sinks
yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.forEach((n) => Effect.log(`Got: ${n}`))));
yield* Stream.make(1, 2, 3).pipe(Stream.run(Sink.drain));
Creating Custom Sinks
Sink.foldLeft
Build a custom stateful sink from an initial state and a step function:
const maxSink = Sink.foldLeft(
  // Initial state
  Number.NEGATIVE_INFINITY,
  // Process each element; the final state is the result
  (max: number, n: number) => (n > max ? n : max),
);
const max = yield* Stream.make(3, 1, 4, 1, 5, 9).pipe(Stream.run(maxSink)); // 9
Sink.fromEffect
const logAndReturn = <A>(label: string) =>
Sink.fromEffect(
Effect.gen(function* () {
yield* Effect.log(`Starting ${label}`);
return [] as A[];
}),
);
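Sink.fromPush
For more control over the sink lifecycle, build the sink from a push function. It receives Option.some(chunk) for each chunk and Option.none() when the stream ends, and it finishes by failing with a [result, leftovers] tuple:
const sumViaPush = Sink.fromPush<number, never, number, never, never>(
  Effect.sync(() => {
    let total = 0; // state private to one run of the sink
    return (input: Option.Option<Chunk.Chunk<number>>) =>
      Option.match(input, {
        // Stream ended: finish with Either.right(result) and no leftovers
        onNone: () => Effect.fail([Either.right(total), Chunk.empty<never>()] as const),
        // Chunk received: succeeding with void asks for more input
        onSome: (chunk) =>
          Effect.sync(() => {
            total += Chunk.reduce(chunk, 0, (a, b) => a + b);
          }),
      });
  }),
);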
Sink Operations
Transforming Sinks
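// Transform the result
const doubledSum = Sink.sum.pipe(Sink.map((sum) => sum * 2));
// Transform the input (Sink.mapInput is the current name for contramap)
const lengthSum = Sink.sum.pipe(Sink.mapInput((s: string) => s.length));
// Transform both input and result
const processStrings = Sink.sum.pipe(
  Sink.dimap({
    onInput: (s: string) => s.length,
    onDone: (sum: number) => `Total length: ${sum}`,
  }),
);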
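A runnable sketch of input and output transformations on `Sink.sum`, using `Sink.mapInput` and the options form of `Sink.dimap`. The sample words and the names `describeLengths` and `program` are illustrative.

```typescript
import { Effect, Sink, Stream } from "effect";

// mapInput adapts what the sink accepts: strings go in, their lengths are summed.
const lengthSum = Sink.sum.pipe(Sink.mapInput((s: string) => s.length));

// dimap adapts both sides: input through onInput, result through onDone.
const describeLengths = Sink.sum.pipe(
  Sink.dimap({
    onInput: (s: string) => s.length,
    onDone: (sum: number) => `Total length: ${sum}`,
  }),
);

const program = Effect.gen(function* () {
  const words = Stream.make("effect", "sink", "stream"); // lengths 6, 4, 6
  const total = yield* words.pipe(Stream.run(lengthSum));
  const label = yield* words.pipe(Stream.run(describeLengths));
  return { total, label };
});

Effect.runPromise(program).then((result) => console.log(result));
// { total: 16, label: "Total length: 16" }
```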
Combining Sinks
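// Without { concurrent: true }, Sink.zip runs the sinks one after the other,
// and Sink.sum would consume all input before Sink.count starts
const sumAndCount = Sink.zip(Sink.sum, Sink.count, { concurrent: true });
const [sum, count] = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(sumAndCount));
// Sink.race keeps the result of whichever sink finishes first
const firstOrSum = Sink.race(Sink.head<number>(), Sink.sum.pipe(Sink.map(Option.some)));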
Filtering
const sumPositive = Sink.sum.pipe(Sink.filterInput((n: number) => n > 0));
const result = yield* Stream.make(-1, 2, -3, 4, -5).pipe(Stream.run(sumPositive)); // 6
Leftovers
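A sink may finish before consuming all of its input. Sink.collectLeftover returns the unconsumed elements alongside the result:
const takeThree = Sink.take<number>(3);
const [first, rest] = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(takeThree.pipe(Sink.collectLeftover)));
// first: Chunk(1, 2, 3), rest: Chunk(4, 5)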
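The leftover behaviour can be checked with a small runnable sketch. The conversion to plain arrays is only there to make the output easy to read.

```typescript
import { Chunk, Effect, Sink, Stream } from "effect";

const program = Stream.make(1, 2, 3, 4, 5).pipe(
  // take(3) finishes after three elements; collectLeftover pairs its
  // result with whatever it left unconsumed.
  Stream.run(Sink.take<number>(3).pipe(Sink.collectLeftover)),
  Effect.map(([taken, leftover]) => ({
    taken: Chunk.toReadonlyArray(taken),
    leftover: Chunk.toReadonlyArray(leftover),
  })),
);

Effect.runPromise(program).then((result) => console.log(result));
// { taken: [1, 2, 3], leftover: [4, 5] }
```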
Sink Concurrency
Parallel Sinks
const countAndAll = Sink.zip(Sink.count, Sink.collectAll<number>(), { concurrent: true });
const parallelSinks = Sink.zip(Sink.sum, countAndAll, { concurrent: true });
const [sum, [count, all]] = yield* Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(parallelSinks));
Chunked Processing
const chunkedSum = Sink.foldChunks(
0,
() => true,
(sum, chunk: Chunk.Chunk<number>) => sum + Chunk.reduce(chunk, 0, (a, b) => a + b),
);
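A runnable sketch of the chunked fold, assuming the input arrives as two chunks built with `Stream.fromChunks`. The step function is called once per chunk rather than once per element.

```typescript
import { Chunk, Effect, Sink, Stream } from "effect";

const chunkedSum = Sink.foldChunks(
  0,
  () => true, // never stop early; consume the whole stream
  (sum: number, chunk: Chunk.Chunk<number>) => sum + Chunk.reduce(chunk, 0, (a, b) => a + b),
);

const program = Stream.fromChunks(Chunk.make(1, 2), Chunk.make(3, 4, 5)).pipe(Stream.run(chunkedSum));

Effect.runPromise(program).then((sum) => console.log(sum)); // 15
```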
Common Patterns
Batched Database Insert
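// Record stands for the application's row type and db for its database client
const batchInsert = (batchSize: number) =>
  Sink.collectAllN<Record>(batchSize).pipe(
    Sink.mapEffect((batch) => Effect.tryPromise(() => db.insertMany(Chunk.toArray(batch)))),
  );
// Stream.run would stop after the first batch; Stream.transduce reapplies the sink until the stream ends
yield* recordStream.pipe(Stream.transduce(batchInsert(100)), Stream.runDrain);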
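A sink built on `Sink.collectAllN` completes once it has collected one batch, so `Stream.run` returns after the first batch. One way to process every batch is to group the stream first and run a `Sink.forEach` over the groups. In this sketch `insertMany` is a hypothetical in-memory stand-in for a database call; it only records batch sizes.

```typescript
import { Chunk, Effect, Sink, Stream } from "effect";

const program = Effect.gen(function* () {
  const batchSizes: number[] = [];
  // Hypothetical stand-in for db.insertMany
  const insertMany = (rows: ReadonlyArray<number>) =>
    Effect.sync(() => {
      batchSizes.push(rows.length);
    });

  yield* Stream.make(1, 2, 3, 4, 5).pipe(
    Stream.grouped(2), // emits chunks of up to 2 elements, including the final partial one
    Stream.run(Sink.forEach((batch: Chunk.Chunk<number>) => insertMany(Chunk.toReadonlyArray(batch)))),
  );
  return batchSizes;
});

Effect.runPromise(program).then((sizes) => console.log(sizes)); // [2, 2, 1]
```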
Aggregation Pipeline
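// Each zip needs { concurrent: true } so that all sinks see the same input
const firstAndLast = Sink.zip(Sink.head<number>(), Sink.last<number>(), { concurrent: true });
const countFirstLast = Sink.zip(Sink.count, firstAndLast, { concurrent: true });
const stats = Sink.zip(Sink.sum, countFirstLast, { concurrent: true }).pipe(
  Sink.map(([sum, [count, [first, last]]]) => ({
    sum,
    count,
    average: count > 0 ? sum / count : 0,
    first,
    last,
  })),
);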
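The same statistics can also be gathered with a single `Sink.foldLeft` that carries all aggregates in one state value. This is an alternative sketch, not the approach above; the `Stats` interface and the names `emptyStats` and `statsSink` are assumptions made for illustration.

```typescript
import { Effect, Option, Sink, Stream } from "effect";

interface Stats {
  readonly sum: number;
  readonly count: number;
  readonly first: Option.Option<number>;
  readonly last: Option.Option<number>;
}

const emptyStats: Stats = { sum: 0, count: 0, first: Option.none(), last: Option.none() };

const statsSink = Sink.foldLeft(emptyStats, (s: Stats, n: number): Stats => ({
  sum: s.sum + n,
  count: s.count + 1,
  first: Option.isSome(s.first) ? s.first : Option.some(n), // keep the first value seen
  last: Option.some(n), // always overwrite with the latest value
})).pipe(
  Sink.map((s: Stats) => ({ ...s, average: s.count > 0 ? s.sum / s.count : 0 })),
);

const program = Stream.make(3, 1, 4).pipe(Stream.run(statsSink));

Effect.runPromise(program).then((stats) => console.log(stats.sum, stats.count, stats.average));
```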
Write to File
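// Requires: import { FileSystem } from "@effect/platform"
const writeToFile = (path: string) =>
  Sink.forEach((line: string) =>
    Effect.gen(function* () {
      const fs = yield* FileSystem.FileSystem;
      // flag "a" appends to the file instead of overwriting it
      yield* fs.writeFileString(path, line + "\n", { flag: "a" });
    }),
  );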
Best Practices
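- Use built-in sinks when possible - They are optimized and tested
- Combine sinks with zip and { concurrent: true } - Runs multiple aggregations in one pass over the same input
- Use foldChunks for efficiency - Processes whole chunks instead of single elements
- Handle leftovers - A sink that finishes early leaves unconsumed elements behind
- Prefer a Sink over Stream.runForEach when the consumer is reused or combined - Sinks compose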
Additional Resources
For comprehensive sink documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Creating Sinks" for sink construction
- "Sink Operations" for transformations
- "Sink Concurrency" for parallel processing
- "Leftovers" for handling unconsumed elements