streams

SKILL.md

Streams in Effect

Overview

Effect Streams provide:

  • Lazy evaluation - Elements produced on demand
  • Resource safety - Automatic cleanup
  • Backpressure - Producer/consumer coordination
  • Composition - Transform, filter, merge streams
  • Error handling - Typed errors in stream pipeline
Stream<A, E, R>;
// Produces values of type A
// May fail with error E
// Requires environment R

Creating Streams

From Values

import { Stream } from "effect";

const numbers = Stream.make(1, 2, 3, 4, 5);

const fromArray = Stream.fromIterable([1, 2, 3]);

const empty = Stream.empty;

const single = Stream.succeed(42);

const infinite = Stream.iterate(1, (n) => n + 1);

From Effects

const fromEffect = Stream.fromEffect(fetchData());

const polling = Stream.repeatEffect(checkStatus());

const scheduled = Stream.repeatEffectWithSchedule(checkStatus(), Schedule.spaced("5 seconds"));

From Async Sources

// From async iterable
const fromAsyncIterable = Stream.fromAsyncIterable(asyncGenerator(), (error) => new StreamError({ cause: error }));

// From callback/event emitter
const fromCallback = Stream.async<number, never>((emit) => {
  const handler = (value: number) => emit.single(value);
  eventEmitter.on("data", handler);
  return Effect.sync(() => eventEmitter.off("data", handler));
});

// From queue
const fromQueue = Stream.fromQueue(queue);

Generating Streams

const naturals = Stream.unfold(1, (n) => Option.some([n, n + 1]));

const range = Stream.range(1, 100);

const repeated = Stream.repeat(Stream.succeed("ping")).pipe(Stream.take(5));

Transforming Streams

map - Transform Elements

const doubled = numbers.pipe(Stream.map((n) => n * 2));

const enriched = users.pipe(Stream.mapEffect((user) => fetchProfile(user.id)));

const parallel = items.pipe(Stream.mapEffect(process, { concurrency: 10 }));

filter - Select Elements

const evens = numbers.pipe(Stream.filter((n) => n % 2 === 0));

const valid = items.pipe(Stream.filterEffect((item) => validate(item)));

flatMap - Nested Streams

const expanded = numbers.pipe(Stream.flatMap((n) => Stream.make(n, n * 10, n * 100)));
// 1, 10, 100, 2, 20, 200, ...

take/drop

const first5 = numbers.pipe(Stream.take(5));
const skip5 = numbers.pipe(Stream.drop(5));
const firstWhile = numbers.pipe(Stream.takeWhile((n) => n < 10));
const dropWhile = numbers.pipe(Stream.dropWhile((n) => n < 10));

Combining Streams

concat - Sequential

const combined = Stream.concat(stream1, stream2);
// or
const combined = stream1.pipe(Stream.concat(stream2));

merge - Interleaved

// Run both streams concurrently; elements are emitted as they arrive (order is nondeterministic)
const merged = Stream.merge(stream1, stream2);

// Merge multiple
const allMerged = Stream.mergeAll([s1, s2, s3], { concurrency: 3 });

zip - Pair Elements

const zipped = Stream.zip(names, ages);
// Stream<[string, number]>

// With function
const combined = Stream.zipWith(names, ages, (name, age) => ({ name, age }));

interleave

const interleaved = Stream.interleave(stream1, stream2);
// a1, b1, a2, b2, ...

Consuming Streams

Running to Collection

const array = yield * Stream.runCollect(numbers);

const first = yield * Stream.runHead(numbers);

const sum = yield * Stream.runFold(numbers, 0, (acc, n) => acc + n);

Running for Effects

yield * numbers.pipe(Stream.runForEach((n) => Effect.log(`Got: ${n}`)));

yield * numbers.pipe(Stream.runDrain);

Running to Sink

import { Sink } from "effect";

const sum = yield * numbers.pipe(Stream.run(Sink.sum));

const array = yield * numbers.pipe(Stream.run(Sink.collectAll()));

Chunking

Streams process elements in chunks for efficiency:

const chunked = numbers.pipe(Stream.grouped(10));

const processed = numbers.pipe(Stream.mapChunks((chunk) => Chunk.map(chunk, (n) => n * 2)));

const rechunked = numbers.pipe(Stream.rechunk(100));

Error Handling

const safe = stream.pipe(Stream.catchAll((error) => Stream.succeed(fallbackValue)));

const handled = stream.pipe(Stream.catchTag("NetworkError", (error) => Stream.succeed(cachedValue)));

const resilient = stream.pipe(Stream.retry(Schedule.exponential("1 second")));

const withFallback = stream.pipe(Stream.orElse(() => fallbackStream));

Resource Management

// Stream with resource lifecycle
const fileStream = Stream.acquireRelease(
  Effect.sync(() => fs.openSync("data.txt", "r")),
  (fd) => Effect.sync(() => fs.closeSync(fd)),
).pipe(
  Stream.flatMap((fd) =>
    Stream.repeatEffectOption(
      Effect.sync(() => {
        const buffer = Buffer.alloc(1024);
        const bytes = fs.readSync(fd, buffer);
        return bytes > 0 ? Option.some(buffer.slice(0, bytes)) : Option.none();
      }),
    ),
  ),
);

// Scoped streams
const scoped = Stream.scoped(Effect.acquireRelease(openConnection, closeConnection));

Sinks

Sinks consume stream elements:

import { Sink } from "effect";

Sink.sum;
Sink.count;
Sink.head;
Sink.last;
Sink.collectAll();
Sink.forEach(f);

const maxSink = Sink.foldLeft(Number.NEGATIVE_INFINITY, (max, n: number) => Math.max(max, n));

Common Patterns

Batched Processing

const batchProcess = stream.pipe(
  Stream.grouped(100),
  Stream.mapEffect((batch) => Effect.tryPromise(() => api.processBatch(Chunk.toArray(batch)))),
);

Rate Limiting

const rateLimited = stream.pipe(
  Stream.throttle({
    units: 1,
    duration: "100 millis",
    strategy: "shape",
  }),
);

Debouncing

const debounced = stream.pipe(Stream.debounce("500 millis"));

Windowing

// Time- and size-bounded windows: emit a chunk after 1000 elements or 1 second, whichever comes first
const windows = stream.pipe(Stream.groupedWithin(1000, "1 second"));

Best Practices

  1. Use chunking for efficiency - Batch operations when possible
  2. Handle backpressure - Bound memory between fast producers and slow consumers (e.g. Stream.buffer with an explicit capacity and strategy)
  3. Clean up resources - Use acquireRelease for external resources
  4. Process in parallel - Use the concurrency option of mapEffect
  5. Handle errors early - Catch/retry before final consumption

Additional Resources

For comprehensive stream documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.

Search for these sections:

  • "Creating Streams" for stream construction
  • "Consuming Streams" for running streams
  • "Operations" for transformations
  • "Error Handling in Streams" for error patterns
  • "Resourceful Streams" for resource management
  • "Sink" for custom sinks
Weekly Installs
8
GitHub Stars
5
First Seen
Jan 24, 2026
Installed on
claude-code: 7
opencode: 6
gemini-cli: 6
github-copilot: 6
codex: 6
antigravity: 5