streams
SKILL.md
Streams in Effect
Overview
Effect Streams provide:
- Lazy evaluation - Elements produced on demand
- Resource safety - Automatic cleanup
- Backpressure - Producer/consumer coordination
- Composition - Transform, filter, merge streams
- Error handling - Typed errors in stream pipeline
Stream<A, E, R>;
// Produces values of type A
// May fail with error E
// Requires environment R
Creating Streams
From Values
import { Stream } from "effect";
const numbers = Stream.make(1, 2, 3, 4, 5);
const fromArray = Stream.fromIterable([1, 2, 3]);
const empty = Stream.empty;
const single = Stream.succeed(42);
const infinite = Stream.iterate(1, (n) => n + 1);
From Effects
const fromEffect = Stream.fromEffect(fetchData());
const polling = Stream.repeatEffect(checkStatus());
const scheduled = Stream.repeatEffectWithSchedule(checkStatus(), Schedule.spaced("5 seconds"));
From Async Sources
// From async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(asyncGenerator(), (error) => new StreamError({ cause: error }));
// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
const handler = (value: number) => emit.single(value);
eventEmitter.on("data", handler);
return Effect.sync(() => eventEmitter.off("data", handler));
});
// From queue
const fromQueue = Stream.fromQueue(queue);
Generating Streams
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]));
const range = Stream.range(1, 100);
const repeated = Stream.repeat(Stream.succeed("ping")).pipe(Stream.take(5));
Transforming Streams
map - Transform Elements
const doubled = numbers.pipe(Stream.map((n) => n * 2));
const enriched = users.pipe(Stream.mapEffect((user) => fetchProfile(user.id)));
const parallel = items.pipe(Stream.mapEffect(process, { concurrency: 10 }));
filter - Select Elements
const evens = numbers.pipe(Stream.filter((n) => n % 2 === 0));
const valid = items.pipe(Stream.filterEffect((item) => validate(item)));
flatMap - Nested Streams
const expanded = numbers.pipe(Stream.flatMap((n) => Stream.make(n, n * 10, n * 100)));
// 1, 10, 100, 2, 20, 200, ...
take/drop
const first5 = numbers.pipe(Stream.take(5));
const skip5 = numbers.pipe(Stream.drop(5));
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10));
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10));
Combining Streams
concat - Sequential
const combined = Stream.concat(stream1, stream2);
// or
const combined = stream1.pipe(Stream.concat(stream2));
merge - Interleaved
// Interleave elements from both
const merged = Stream.merge(stream1, stream2);
// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 });
zip - Pair Elements
const zipped = Stream.zip(names, ages);
// Stream<[string, number]>
// With function
const combined = Stream.zipWith(names, ages, (name, age) => ({ name, age }));
interleave
const interleaved = Stream.interleave(stream1, stream2);
// a1, b1, a2, b2, ...
Consuming Streams
Running to Collection
const array = yield * Stream.runCollect(numbers);
const first = yield * Stream.runHead(numbers);
const sum = yield * Stream.runFold(numbers, 0, (acc, n) => acc + n);
Running for Effects
yield * numbers.pipe(Stream.runForEach((n) => Effect.log(`Got: ${n}`)));
yield * numbers.pipe(Stream.runDrain);
Running to Sink
import { Sink } from "effect";
const sum = yield * numbers.pipe(Stream.run(Sink.sum));
const array = yield * numbers.pipe(Stream.run(Sink.collectAll()));
Chunking
Streams process elements in chunks for efficiency:
const chunked = numbers.pipe(Stream.grouped(10));
const processed = numbers.pipe(Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2)));
const rechunked = numbers.pipe(Stream.rechunk(100));
Error Handling
const safe = stream.pipe(Stream.catchAll((error) => Stream.succeed(fallbackValue)));
const handled = stream.pipe(Stream.catchTag("NetworkError", (error) => Stream.succeed(cachedValue)));
const resilient = stream.pipe(Stream.retry(Schedule.exponential("1 second")));
const withFallback = stream.pipe(Stream.orElse(() => fallbackStream));
Resource Management
// Stream with resource lifecycle
const fileStream = Stream.acquireRelease(
Effect.sync(() => fs.openSync("data.txt", "r")),
(fd) => Effect.sync(() => fs.closeSync(fd)),
).pipe(
Stream.flatMap((fd) =>
Stream.repeatEffectOption(
Effect.sync(() => {
const buffer = Buffer.alloc(1024);
const bytes = fs.readSync(fd, buffer);
return bytes > 0 ? Option.some(buffer.slice(0, bytes)) : Option.none();
}),
),
),
);
// Scoped streams
const scoped = Stream.scoped(Effect.acquireRelease(openConnection, closeConnection));
Sinks
Sinks consume stream elements:
import { Sink } from "effect";
Sink.sum;
Sink.count;
Sink.head;
Sink.last;
Sink.collectAll();
Sink.forEach(f);
const maxSink = Sink.foldLeft(Number.NEGATIVE_INFINITY, (max, n: number) => Math.max(max, n));
Common Patterns
Batched Processing
const batchProcess = stream.pipe(
Stream.grouped(100),
Stream.mapEffect((batch) => Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))),
);
Rate Limiting
const rateLimited = stream.pipe(
Stream.throttle({
units: 1,
duration: "100 millis",
strategy: "shape",
}),
);
Debouncing
const debounced = stream.pipe(Stream.debounce("500 millis"));
Windowing
// Time-based windows
const windows = stream.pipe(Stream.groupedWithin(1000, "1 second"));
Best Practices
- Use chunking for efficiency - Batch operations when possible
- Handle backpressure - Use appropriate buffer strategies
- Clean up resources - Use acquireRelease for external resources
- Process in parallel - Use concurrency option in mapEffect
- Handle errors early - Catch/retry before final consumption
Additional Resources
For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Creating Streams" for stream construction
- "Consuming Streams" for running streams
- "Operations" for transformations
- "Error Handling in Streams" for error patterns
- "Resourceful Streams" for resource management
- "Sink" for custom sinks
Weekly Installs
8
Repository
andrueandersonc…ffect-tsGitHub Stars
5
First Seen
Jan 24, 2026
Security Audits
Installed on
claude-code7
opencode6
gemini-cli6
github-copilot6
codex6
antigravity5