sinks
Sinks in Effect
Overview
A Sink is a consumer of stream elements that produces a result:
Sink<A, In, L, E, R>;
// A - Result type (what sink produces)
// In - Input element type (what sink consumes)
// L - Leftover type (unconsumed elements)
// E - Error type
// R - Required environment
Sinks are the counterpart to Streams - while Streams produce data, Sinks consume it.
Built-in Sinks
Collecting Elements
import { Stream, Sink } from "effect";
const all = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.collectAll()));
const array = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.collectAllToArray()));
const firstThree = yield * Stream.range(1, 100).pipe(Stream.run(Sink.collectAllN(3)));
const whileSmall = yield * Stream.iterate(1, (n) => n + 1).pipe(Stream.run(Sink.collectAllWhile((n) => n < 5)));
Aggregation Sinks
const total = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.sum));
const count = yield * Stream.make("a", "b", "c").pipe(Stream.run(Sink.count));
const first = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.head));
const last = yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.last));
const taken = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.take(3)));
Folding Sinks
const product = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.foldLeft(1, (acc, n) => acc * n)));
const sumUntil100 =
yield *
Stream.iterate(1, (n) => n + 1).pipe(
Stream.run(
Sink.fold(
0,
(sum) => sum < 100,
(sum, n) => sum + n,
),
),
);
const foldWithLog = Sink.foldEffect(
0,
(sum) => sum < 100,
(sum, n) =>
Effect.gen(function* () {
yield* Effect.log(`Adding ${n} to ${sum}`);
return sum + n;
}),
);
Side Effect Sinks
yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.forEach((n) => Effect.log(`Got: ${n}`))));
yield * Stream.make(1, 2, 3).pipe(Stream.run(Sink.drain));
Creating Custom Sinks
Sink.make
const maxSink = Sink.make<number, number, never, never, never>(
// Initial state
Number.NEGATIVE_INFINITY,
// Process each element
(max, n) => (n > max ? n : max),
// Extract result
(max) => max,
);
const max = yield * Stream.make(3, 1, 4, 1, 5, 9).pipe(Stream.run(maxSink)); // 9
Sink.fromEffect
const logAndReturn = <A>(label: string) =>
Sink.fromEffect(
Effect.gen(function* () {
yield* Effect.log(`Starting ${label}`);
return [] as A[];
}),
);
Sink.fromPush
For more control over the sink lifecycle:
const customSink = Sink.fromPush<number, number, never, never>((input) =>
Effect.sync(() =>
Option.match(input, {
onNone: () => Either.left(finalResult), // Stream ended
onSome: (chunk) => {
// Process chunk
// Return Either.right to continue, Either.left to finish
return Either.right(undefined);
},
}),
),
);
Sink Operations
Transforming Sinks
const doubledSum = Sink.sum.pipe(Sink.map((sum) => sum * 2));
const lengthSum = Sink.sum.pipe(Sink.contramap((s: string) => s.length));
const processStrings = Sink.sum.pipe(
Sink.dimap(
(s: string) => s.length,
(sum) => `Total length: ${sum}`,
),
);
Combining Sinks
const sumAndCount = Sink.zip(Sink.sum, Sink.count);
const [sum, count] = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(sumAndCount));
const firstOrSum = Sink.race(Sink.head, Sink.sum.pipe(Sink.map(Option.some)));
Filtering
const sumPositive = Sink.sum.pipe(Sink.filterInput((n: number) => n > 0));
const result = yield * Stream.make(-1, 2, -3, 4, -5).pipe(Stream.run(sumPositive));
Leftovers
Sinks can leave unconsumed elements:
const takeThree = Sink.take<number>(3);
const [first, rest] =
yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(Sink.take<number>(3).pipe(Sink.collectLeftover)));
Sink Concurrency
Parallel Sinks
const parallelSinks = Sink.zipPar(Sink.sum, Sink.count, Sink.collectAll<number>());
const [sum, count, all] = yield * Stream.make(1, 2, 3, 4, 5).pipe(Stream.run(parallelSinks));
Chunked Processing
const chunkedSum = Sink.foldChunks(
0,
() => true,
(sum, chunk: Chunk.Chunk<number>) => sum + Chunk.reduce(chunk, 0, (a, b) => a + b),
);
Common Patterns
Batched Database Insert
const batchInsert = (batchSize: number) =>
Sink.collectAllN<Record>(batchSize).pipe(
Sink.mapEffect((batch) => Effect.tryPromise(() => db.insertMany(Chunk.toArray(batch)))),
);
yield * recordStream.pipe(Stream.run(batchInsert(100)));
Aggregation Pipeline
const stats = Sink.zip(Sink.sum, Sink.zip(Sink.count, Sink.zip(Sink.head, Sink.last))).pipe(
Sink.map(([sum, [count, [first, last]]]) => ({
sum,
count,
average: count > 0 ? sum / count : 0,
first,
last,
})),
);
Write to File
const writeToFile = (path: string) =>
Sink.forEach((line: string) =>
Effect.gen(function* () {
const fs = yield* FileSystem;
yield* fs.appendFileString(path, line + "\n");
}),
);
Best Practices
- Use built-in sinks when possible - Optimized and tested
- Combine sinks with zip - Run multiple aggregations in one pass
- Use foldChunks for efficiency - Process chunks, not elements
- Handle leftovers - Consider unconsumed elements
- Prefer Sink over forEach - More composable
Additional Resources
For comprehensive sink documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Creating Sinks" for sink construction
- "Sink Operations" for transformations
- "Sink Concurrency" for parallel processing
- "Leftovers" for handling unconsumed elements
More from andrueandersoncs/claude-skill-effect-ts
schema
This skill should be used when the user asks about "Effect Schema", "Schema.Struct", "Schema.decodeUnknown", "data validation", "parsing", "Schema.transform", "Schema filters", "Schema annotations", "JSON Schema", "Schema.Class", "Schema branded types", "encoding", "decoding", "Schema.parseJson", or needs to understand how Effect handles data validation and transformation.
13testing
This skill should be used when the user asks about "Effect testing", "@effect/vitest", "it.effect", "it.live", "it.scoped", "it.layer", "it.prop", "Schema Arbitrary", "property-based testing", "fast-check", "TestClock", "testing effects", "mocking services", "test layers", "TestContext", "Effect.provide test", "time testing", "Effect test utilities", "unit testing Effect", "generating test data", "flakyTest", "test coverage", "100% coverage", "service testing", "test doubles", "mock services", or needs to understand how to test Effect-based code.
13traits
This skill should be used when the user asks about "Effect Equal", "Effect Hash", "Equivalence", "Order", "structural equality", "custom equality", "comparing objects", "sorting", "Equal.equals", "Hash.hash", "Equivalence.make", "Order.lessThan", "comparable types", or needs to understand how Effect handles equality, hashing, and ordering of values.
12configuration
This skill should be used when the user asks about "Effect Config", "environment variables", "configuration management", "Config.string", "Config.number", "ConfigProvider", "Config.nested", "Config.withDefault", "Config.redacted", "sensitive values", "config validation", "loading config from JSON", "config schema", or needs to understand how Effect handles application configuration.
12concurrency
This skill should be used when the user asks about "Effect concurrency", "fibers", "Fiber", "forking", "Effect.fork", "Effect.forkDaemon", "parallel execution", "Effect.all concurrency", "Deferred", "Queue", "PubSub", "Semaphore", "Latch", "fiber interruption", "Effect.race", "Effect.raceAll", "concurrent effects", or needs to understand how Effect handles parallel and concurrent execution.
11observability
This skill should be used when the user asks about "Effect logging", "Effect.log", "Effect metrics", "Effect tracing", "spans", "telemetry", "Metric.counter", "Metric.gauge", "Metric.histogram", "OpenTelemetry", "structured logging", "log levels", "Effect.logDebug", "Effect.logInfo", "Effect.logWarning", "Effect.logError", or needs to understand how Effect handles logging, metrics, and distributed tracing.
10