Streaming Patterns
Primary Primitive: iter.Seq2[T, error]
// Stream adapts the provider's streaming client to an iter.Seq2 (Go 1.23+).
// Imports needed: context, errors, io, iter, plus the project's schema package.
func (m *Model) Stream(ctx context.Context, msgs []schema.Message) iter.Seq2[schema.StreamChunk, error] {
	return func(yield func(schema.StreamChunk, error) bool) {
		stream, err := m.client.Stream(ctx, msgs)
		if err != nil {
			yield(schema.StreamChunk{}, err)
			return
		}
		defer stream.Close()
		for {
			select {
			case <-ctx.Done(): // caller cancelled: report why and stop
				yield(schema.StreamChunk{}, ctx.Err())
				return
			default:
			}
			chunk, err := stream.Recv()
			if errors.Is(err, io.EOF) {
				return // stream finished normally
			}
			if err != nil {
				yield(schema.StreamChunk{}, err)
				return
			}
			if !yield(convertChunk(chunk), nil) {
				return // consumer stopped
			}
		}
	}
}
Composition
- Pipe: func Pipe[A, B any](first iter.Seq2[A, error], transform func(A) (B, error)) iter.Seq2[B, error]
- Collect: stream to slice, func Collect[T any](stream iter.Seq2[T, error]) ([]T, error)
- Invoke from Stream: Stream, collect, return last.
- Fan-out: iter.Pull2() to get next/stop, broadcast to N consumers.
- BufferedStream: Channel-backed buffer for backpressure.
Rules
- Public API: iter.Seq2[T, error], never <-chan.
- Internal goroutine communication: channels are fine.
- Always check context cancellation in producers.
- yield returning false = consumer stopped; respect immediately.
- Use iter.Pull2 only when pull semantics are genuinely needed.
Repository: lookatitude/beluga-ai
GitHub Stars: 9
First Seen: Feb 10, 2026