concurrency
SKILL.md
Concurrency in Effect
Overview
Effect provides lightweight fiber-based concurrency:
- Fibers - Lightweight threads managed by Effect runtime
- Structured concurrency - Parent fibers supervise children
- Safe interruption - Clean cancellation with resource cleanup
- Concurrent primitives - Queue, Deferred, Semaphore, PubSub
Basic Parallel Execution
Effect.all with Concurrency
import { Effect } from "effect";
const results = yield * Effect.all([fetchUser(1), fetchUser(2), fetchUser(3)], { concurrency: "unbounded" });
const results = yield * Effect.all(tasks, { concurrency: 5 });
const results = yield * Effect.all(tasks);
Effect.forEach with Concurrency
const users = yield * Effect.forEach(userIds, (id) => fetchUser(id), { concurrency: 10 });
Fibers
Creating Fibers with fork
const program = Effect.gen(function* () {
const fiber = yield* Effect.fork(longRunningTask);
yield* doOtherWork();
const result = yield* Fiber.join(fiber);
});
Fork Variants
const fiber = yield * Effect.fork(task);
const fiber = yield * Effect.forkDaemon(task);
const fiber = yield * Effect.forkIn(scope)(task);
const fiber = yield * Effect.forkWithErrorHandler(task, onError);
Fiber Operations
import { Fiber } from "effect";
const result = yield * Fiber.join(fiber);
const exit = yield * Fiber.await(fiber);
yield * Fiber.interrupt(fiber);
const maybeResult = yield * Fiber.poll(fiber);
Racing
Effect.race - First to Complete
const fastest = yield * Effect.race(fetchFromServer1(), fetchFromServer2());
Effect.raceAll - Race Many
const fastest = yield * Effect.raceAll([fetchFromCDN1(), fetchFromCDN2(), fetchFromCDN3()]);
Effect.raceFirst - Include Failures
const first = yield * Effect.raceFirst(task1, task2);
Deferred - One-Time Promise
import { Deferred } from "effect";
const program = Effect.gen(function* () {
const deferred = yield* Deferred.make<string, never>();
const fiber = yield* Effect.fork(
Effect.gen(function* () {
const value = yield* Deferred.await(deferred);
yield* Effect.log(`Got: ${value}`);
}),
);
yield* Deferred.succeed(deferred, "Hello!");
yield* Fiber.join(fiber);
});
Queue - Concurrent Queue
import { Queue } from "effect";
const program = Effect.gen(function* () {
const queue = yield* Queue.bounded<number>(100);
yield* Effect.fork(Effect.forEach([1, 2, 3, 4, 5], (n) => Queue.offer(queue, n)));
const items = yield* Effect.forEach(Array.from({ length: 5 }), () => Queue.take(queue));
});
Queue Variants
const bounded = yield * Queue.bounded<number>(100);
const unbounded = yield * Queue.unbounded<number>();
const dropping = yield * Queue.dropping<number>(100);
const sliding = yield * Queue.sliding<number>(100);
PubSub - Publish/Subscribe
import { PubSub } from "effect";
const program = Effect.gen(function* () {
const pubsub = yield* PubSub.bounded<string>(100);
const sub1 = yield* PubSub.subscribe(pubsub);
const sub2 = yield* PubSub.subscribe(pubsub);
yield* PubSub.publish(pubsub, "Hello!");
const msg1 = yield* Queue.take(sub1);
const msg2 = yield* Queue.take(sub2);
});
Semaphore - Limit Concurrency
import { Effect } from "effect";
const program = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(3);
yield* Effect.forEach(tasks, (task) => semaphore.withPermits(1)(task), { concurrency: "unbounded" });
});
Latch - Coordination Point
import { Latch } from "effect";
const program = Effect.gen(function* () {
const latch = yield* Latch.make(false);
yield* Effect.fork(
Effect.forEach(
workers,
(worker) =>
Effect.gen(function* () {
yield* Latch.await(latch);
yield* worker.start();
}),
{ concurrency: "unbounded" },
),
);
yield* Latch.open(latch);
});
Interruption
Interrupting Fibers
const fiber = yield * Effect.fork(longTask);
yield * Fiber.interrupt(fiber);
Uninterruptible Regions
const critical = Effect.uninterruptible(
Effect.gen(function* () {
yield* beginTransaction();
yield* performOperations();
yield* commitTransaction();
}),
);
Interruptible Within Uninterruptible
const program = Effect.uninterruptible(
Effect.gen(function* () {
yield* criticalSetup();
// This part can be interrupted
yield* Effect.interruptible(longOperation);
yield* criticalTeardown();
}),
);
Supervision
Structured concurrency ensures child fibers are managed:
const parent = Effect.gen(function* () {
const child1 = yield* Effect.fork(task1);
const child2 = yield* Effect.fork(task2);
// If parent fails/interrupts, children are interrupted
yield* failingOperation();
});
// child1 and child2 automatically interrupted
Daemon Fibers
Escape supervision with daemon:
const daemon = yield * Effect.forkDaemon(backgroundTask);
Common Patterns
Timeout with Fallback
const withTimeout = task.pipe(Effect.timeout("5 seconds"), Effect.map(Option.getOrElse(() => defaultValue)));
Worker Pool
const workerPool = Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(numWorkers);
return (task: Effect.Effect<A>) => semaphore.withPermits(1)(task);
});
Parallel with Error Collection
const results =
yield *
Effect.all(tasks, {
concurrency: "unbounded",
mode: "either", // Collect all results
});
Best Practices
- Use Effect.all concurrency for simple parallelism
- Use Semaphore to limit concurrent operations
- Prefer structured concurrency over daemon fibers
- Handle interruption in long-running effects
- Use Queue for producer/consumer patterns
- Use Deferred for one-time coordination
Additional Resources
For comprehensive concurrency documentation, consult ${CLAUDE_PLUGIN_ROOT}/references/llms-full.txt.
Search for these sections:
- "Fibers" for fiber management
- "Basic Concurrency" for parallel execution
- "Deferred" for synchronization primitives
- "Queue" for concurrent queues
- "PubSub" for publish/subscribe
- "Semaphore" for concurrency limiting
Weekly Installs
8
Repository
andrueandersonc…ffect-tsGitHub Stars
5
First Seen
Jan 24, 2026
Security Audits
Installed on
claude-code7
opencode6
gemini-cli6
github-copilot6
codex6
antigravity5