Streams in Effect
Overview
Effect Streams provide:
- Lazy evaluation - Elements produced on demand
- Resource safety - Automatic cleanup
- Backpressure - Producer/consumer coordination
- Composition - Transform, filter, merge streams
- Error handling - Typed errors in stream pipeline
Stream<A, E, R>;
// Produces values of type A
// May fail with error E
// Requires environment R
Creating Streams
From Values
import { Stream } from "effect";
const numbers = Stream.make(1, 2, 3, 4, 5);
const fromArray = Stream.fromIterable([1, 2, 3]);
const empty = Stream.empty;
const single = Stream.succeed(42);
const infinite = Stream.iterate(1, (n) => n + 1);
From Effects
const fromEffect = Stream.fromEffect(fetchData());
const polling = Stream.repeatEffect(checkStatus());
const scheduled = Stream.repeatEffectWithSchedule(checkStatus(), Schedule.spaced("5 seconds"));
From Async Sources
// From async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(asyncGenerator(), (error) => new StreamError({ cause: error }));
// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
const handler = (value: number) => emit.single(value);
eventEmitter.on("data", handler);
return Effect.sync(() => eventEmitter.off("data", handler));
});
// From queue
const fromQueue = Stream.fromQueue(queue);
Generating Streams
const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]));
const range = Stream.range(1, 100);
const repeated = Stream.forever(Stream.succeed("ping")).pipe(Stream.take(5));
Transforming Streams
map - Transform Elements
const doubled = numbers.pipe(Stream.map((n) => n * 2));
const enriched = users.pipe(Stream.mapEffect((user) => fetchProfile(user.id)));
const parallel = items.pipe(Stream.mapEffect(process, { concurrency: 10 }));
filter - Select Elements
const evens = numbers.pipe(Stream.filter((n) => n % 2 === 0));
const valid = items.pipe(Stream.filterEffect((item) => validate(item)));
flatMap - Nested Streams
const expanded = numbers.pipe(Stream.flatMap((n) => Stream.make(n, n * 10, n * 100)));
// 1, 10, 100, 2, 20, 200, ...
take/drop
const first5 = numbers.pipe(Stream.take(5));
const skip5 = numbers.pipe(Stream.drop(5));
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10));
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10));
Combining Streams
concat - Sequential
const sequential = Stream.concat(stream1, stream2);
// or, pipeable form
const alsoSequential = stream1.pipe(Stream.concat(stream2));
merge - Interleaved
// Interleave elements from both
const merged = Stream.merge(stream1, stream2);
// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 });
zip - Pair Elements
const zipped = Stream.zip(names, ages);
// Stream<[string, number]>
// With function
const combined = Stream.zipWith(names, ages, (name, age) => ({ name, age }));
interleave
const interleaved = Stream.interleave(stream1, stream2);
// a1, b1, a2, b2, ...
Consuming Streams
Running to Collection
// Inside Effect.gen(function* () { ... }):
const chunk = yield* Stream.runCollect(numbers); // Chunk<number>
const first = yield* Stream.runHead(numbers); // Option<number>
const sum = yield* Stream.runFold(numbers, 0, (acc, n) => acc + n);
Running for Effects
yield* numbers.pipe(Stream.runForEach((n) => Effect.log(`Got: ${n}`)));
yield* numbers.pipe(Stream.runDrain);
Running to Sink
import { Sink } from "effect";
const sum = yield* numbers.pipe(Stream.run(Sink.sum));
const collected = yield* numbers.pipe(Stream.run(Sink.collectAll())); // Chunk<number>
Chunking
Streams process elements in chunks for efficiency:
const chunked = numbers.pipe(Stream.grouped(10));
const processed = numbers.pipe(Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2)));
const rechunked = numbers.pipe(Stream.rechunk(100));
Error Handling
const safe = stream.pipe(Stream.catchAll((error) => Stream.succeed(fallbackValue)));
const handled = stream.pipe(Stream.catchTag("NetworkError", (error) => Stream.succeed(cachedValue)));
const resilient = stream.pipe(Stream.retry(Schedule.exponential("1 second")));
const withFallback = stream.pipe(Stream.orElse(() => fallbackStream));
Resource Management
// Stream with resource lifecycle
const fileStream = Stream.acquireRelease(
Effect.sync(() => fs.openSync("data.txt", "r")),
(fd) => Effect.sync(() => fs.closeSync(fd)),
).pipe(
Stream.flatMap((fd) =>
Stream.repeatEffectOption(
Effect.sync(() => {
const buffer = Buffer.alloc(1024);
const bytes = fs.readSync(fd, buffer);
return bytes > 0 ? Option.some(buffer.subarray(0, bytes)) : Option.none();
}),
),
),
);
// Scoped streams
const scoped = Stream.scoped(Effect.acquireRelease(openConnection, closeConnection));
Sinks
Sinks consume stream elements:
import { Sink } from "effect";
Sink.sum;
Sink.count;
Sink.head;
Sink.last;
Sink.collectAll();
Sink.forEach(f);
const maxSink = Sink.foldLeft(Number.NEGATIVE_INFINITY, (max, n: number) => Math.max(max, n));
Common Patterns
Batched Processing
const batchProcess = stream.pipe(
Stream.grouped(100),
Stream.mapEffect((batch) => Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))),
);
Rate Limiting
const rateLimited = stream.pipe(
Stream.throttle({
units: 1,
duration: "100 millis",
strategy: "shape",
}),
);
Debouncing
const debounced = stream.pipe(Stream.debounce("500 millis"));
Windowing
// Size-or-time windows: emit when 1000 elements have been collected or 1 second elapses
const windows = stream.pipe(Stream.groupedWithin(1000, "1 second"));
Best Practices
- Use chunking for efficiency - Batch operations when possible
- Handle backpressure - Use appropriate buffer strategies
- Clean up resources - Use acquireRelease for external resources
- Process in parallel - Use concurrency option in mapEffect
- Handle errors early - Catch/retry before final consumption
Additional Resources
For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Creating Streams" for stream construction
- "Consuming Streams" for running streams
- "Operations" for transformations
- "Error Handling in Streams" for error patterns
- "Resourceful Streams" for resource management
- "Sink" for custom sinks