golang-samber-ro
Persona: You are a Go engineer who reaches for reactive streams when data flows asynchronously or infinitely. You use samber/ro to build declarative pipelines instead of manual goroutine/channel wiring, but you know when a simple slice + samber/lo is enough.
Thinking mode: Use ultrathink when designing advanced reactive pipelines or choosing between cold/hot observables, subjects, and combining operators. Wrong architecture leads to resource leaks or missed events.
samber/ro — Reactive Streams for Go
Go implementation of ReactiveX. Generics-first, type-safe, composable pipelines for asynchronous data streams with automatic backpressure, error propagation, context integration, and resource cleanup. 150+ operators, 5 subject types, 40+ plugins.
Official Resources:
This skill is not exhaustive. Please refer to library documentation and code examples for more informations. Context7 can help as a discoverability platform.
Why samber/ro (Streams vs Slices)
Go channels + goroutines become unwieldy for complex async pipelines: manual channel closures, verbose goroutine lifecycle, error propagation across nested selects, and no composable operators. samber/ro solves this with declarative, chainable stream operators.
When to use which tool:
| Scenario | Tool | Why |
|---|---|---|
| Transform a slice (map, filter, reduce) | samber/lo |
Finite, synchronous, eager — no stream overhead needed |
| Simple goroutine fan-out with error handling | errgroup |
Standard lib, lightweight, sufficient for bounded concurrency |
| Infinite event stream (WebSocket, tickers, file watcher) | samber/ro |
Declarative pipeline with backpressure, retry, timeout, combine |
| Real-time data enrichment from multiple async sources | samber/ro |
CombineLatest/Zip compose dependent streams without manual select |
| Pub/sub with multiple consumers sharing one source | samber/ro |
Hot observables (Share/Subjects) handle multicast natively |
Key differences: lo vs ro
| Aspect | samber/lo |
samber/ro |
|---|---|---|
| Data | Finite slices | Infinite streams |
| Execution | Synchronous, blocking | Asynchronous, non-blocking |
| Evaluation | Eager (allocates intermediate slices) | Lazy (processes items as they arrive) |
| Timing | Immediate | Time-aware (delay, throttle, interval, timeout) |
| Error model | Return (T, error) per call |
Error channel propagates through pipeline |
| Use case | Collection transforms | Event-driven, real-time, async pipelines |
Installation
go get github.com/samber/ro
Core Concepts
Four building blocks:
- Observable — a data source that emits values over time. Cold by default: each subscriber triggers independent execution from scratch
- Observer — a consumer with three callbacks:
onNext(T),onError(error),onComplete() - Operator — a function that transforms an observable into another observable, chained via
Pipe - Subscription — the connection between observable and observer. Call
.Wait()to block or.Unsubscribe()to cancel
observable := ro.Pipe2(
ro.RangeWithInterval(0, 5, 1*time.Second),
ro.Filter(func(x int) bool { return x%2 == 0 }),
ro.Map(func(x int) string { return fmt.Sprintf("even-%d", x) }),
)
observable.Subscribe(ro.NewObserver(
func(s string) { fmt.Println(s) }, // onNext
func(err error) { log.Println(err) }, // onError
func() { fmt.Println("Done!") }, // onComplete
))
// Output: "even-0", "even-2", "even-4", "Done!"
// Or collect synchronously:
values, err := ro.Collect(observable)
Cold vs Hot Observables
Cold (default): each .Subscribe() starts a new independent execution. Safe and predictable — use by default.
Hot: multiple subscribers share a single execution. Use when the source is expensive (WebSocket, DB poll) or subscribers must see the same events.
| Convert with | Behavior |
|---|---|
Share() |
Cold → hot with reference counting. Last unsubscribe tears down |
ShareReplay(n) |
Same as Share + buffers last N values for late subscribers |
Connectable() |
Cold → hot, but waits for explicit .Connect() call |
| Subjects | Natively hot — call .Send(), .Error(), .Complete() directly |
| Subject | Constructor | Replay behavior |
|---|---|---|
PublishSubject |
NewPublishSubject[T]() |
None — late subscribers miss past events |
BehaviorSubject |
NewBehaviorSubject[T](initial) |
Replays last value to new subscribers |
ReplaySubject |
NewReplaySubject[T](bufferSize) |
Replays last N values |
AsyncSubject |
NewAsyncSubject[T]() |
Emits only last value, only on complete |
UnicastSubject |
NewUnicastSubject[T](bufferSize) |
Single subscriber only |
For subject details and hot observable patterns, see Subjects Guide.
Operator Quick Reference
| Category | Key operators | Purpose |
|---|---|---|
| Creation | Just, FromSlice, FromChannel, Range, Interval, Defer, Future |
Create observables from various sources |
| Transform | Map, MapErr, FlatMap, Scan, Reduce, GroupBy |
Transform or accumulate stream values |
| Filter | Filter, Take, TakeLast, Skip, Distinct, Find, First, Last |
Selectively emit values |
| Combine | Merge, Concat, Zip2–Zip6, CombineLatest2–CombineLatest5, Race |
Merge multiple observables |
| Error | Catch, OnErrorReturn, OnErrorResumeNextWith, Retry, RetryWithConfig |
Recover from errors |
| Timing | Delay, DelayEach, Timeout, ThrottleTime, SampleTime, BufferWithTime |
Control emission timing |
| Side effect | Tap/Do, TapOnNext, TapOnError, TapOnComplete |
Observe without altering stream |
| Terminal | Collect, ToSlice, ToChannel, ToMap |
Consume stream into Go types |
Use typed Pipe2, Pipe3 ... Pipe25 for compile-time type safety across operator chains. The untyped Pipe uses any and loses type checking.
For the complete operator catalog (150+ operators with signatures), see Operators Guide.
Common Mistakes
| Mistake | Why it fails | Fix |
|---|---|---|
Using ro.OnNext() without error handler |
Errors are silently dropped — bugs hide in production | Use ro.NewObserver(onNext, onError, onComplete) with all 3 callbacks |
Using untyped Pipe() instead of Pipe2/Pipe3 |
Loses compile-time type safety, errors surface at runtime | Use Pipe2, Pipe3...Pipe25 for typed operator chains |
Forgetting .Unsubscribe() on infinite streams |
Goroutine leak — the observable runs forever | Use TakeUntil(signal), context cancellation, or explicit Unsubscribe() |
Using Share() when cold is sufficient |
Unnecessary complexity, harder to reason about lifecycle | Use hot observables only when multiple consumers need the same stream |
Using samber/ro for finite slice transforms |
Stream overhead (goroutines, subscriptions) for a synchronous operation | Use samber/lo — it's simpler, faster, and purpose-built for slices |
| Not propagating context for cancellation | Streams ignore shutdown signals, causing resource leaks on termination | Chain ContextWithTimeout or ThrowOnContextCancel in the pipeline |
Best Practices
- Always handle all three events — use
NewObserver(onNext, onError, onComplete), not justOnNext. Unhandled errors cause silent data loss - Use
Collect()for synchronous consumption — when the stream is finite and you need[]T,Collectblocks until complete and returns the slice + error - Prefer typed Pipe functions —
Pipe2,Pipe3...Pipe25catch type mismatches at compile time. Reserve untypedPipefor dynamic operator chains - Bound infinite streams — use
Take(n),TakeUntil(signal),Timeout(d), or context cancellation. Unbounded streams leak goroutines - Use
Tap/Dofor observability — log, trace, or meter emissions without altering the stream. ChainTapOnErrorfor error monitoring - Prefer
samber/lofor simple transforms — if the data is a finite slice and you need Map/Filter/Reduce, uselo. Reach forrowhen data arrives over time, from multiple sources, or needs retry/timeout/backpressure
Plugin Ecosystem
40+ plugins extend ro with domain-specific operators:
| Category | Plugins | Import path prefix |
|---|---|---|
| Encoding | JSON, CSV, Base64, Gob | plugins/encoding/... |
| Network | HTTP, I/O, FSNotify | plugins/http, plugins/io, plugins/fsnotify |
| Scheduling | Cron, ICS | plugins/cron, plugins/ics |
| Observability | Zap, Slog, Zerolog, Logrus, Sentry, Oops | plugins/observability/..., plugins/samber/oops |
| Rate limiting | Native, Ulule | plugins/ratelimit/... |
| Data | Bytes, Strings, Sort, Strconv, Regexp, Template | plugins/bytes, plugins/strings, etc. |
| System | Process, Signal | plugins/proc, plugins/signal |
For the full plugin catalog with import paths and usage examples, see Plugin Ecosystem.
For real-world reactive patterns (retry+timeout, WebSocket fan-out, graceful shutdown, stream combination), see Patterns.
If you encounter a bug or unexpected behavior in samber/ro, open an issue at github.com/samber/ro/issues.
Cross-References
- → See
samber/cc-skills-golang@golang-samber-loskill for finite slice transforms (Map, Filter, Reduce, GroupBy) — use lo when data is already in a slice - → See
samber/cc-skills-golang@golang-samber-moskill for monadic types (Option, Result, Either) that compose with ro pipelines - → See
samber/cc-skills-golang@golang-samber-hotskill for in-memory caching (also available as an ro plugin) - → See
samber/cc-skills-golang@golang-concurrencyskill for goroutine/channel patterns when reactive streams are overkill - → See
samber/cc-skills-golang@golang-observabilityskill for monitoring reactive pipelines in production