implementing-io-pipelines
SKILL.md
.NET Streaming (System.IO.Pipelines)
A guide for System.IO.Pipelines API for high-performance I/O pipelines.
Quick Reference: See QUICKREF.md for essential patterns at a glance.
1. Core Concepts
| Concept | Description |
|---|---|
Pipe |
Memory buffer-based read/write pipe |
PipeReader |
Read data from pipe |
PipeWriter |
Write data to pipe |
ReadOnlySequence<T> |
Non-contiguous memory sequence |
2. Advantages
- Zero-copy: Minimizes unnecessary memory copying
- Backpressure control: Speed regulation between producer and consumer
- Memory pooling: Automatic buffer reuse
- Async I/O: Efficient asynchronous processing
3. Basic Usage
using System.IO.Pipelines;
public sealed class PipelineProcessor
{
public async Task ProcessAsync(Stream stream)
{
var pipe = new Pipe();
// Run Writer and Reader concurrently
var writing = FillPipeAsync(stream, pipe.Writer);
var reading = ReadPipeAsync(pipe.Reader);
await Task.WhenAll(writing, reading);
}
private async Task FillPipeAsync(Stream stream, PipeWriter writer)
{
const int minimumBufferSize = 512;
while (true)
{
// Acquire buffer from memory pool
Memory<byte> memory = writer.GetMemory(minimumBufferSize);
int bytesRead = await stream.ReadAsync(memory);
if (bytesRead == 0)
break;
// Notify bytes written
writer.Advance(bytesRead);
// Flush data and notify Reader
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
break;
}
// Signal write completion
await writer.CompleteAsync();
}
private async Task ReadPipeAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
// Process buffer
ProcessBuffer(buffer);
// Notify consumption up to processed position
reader.AdvanceTo(buffer.End);
if (result.IsCompleted)
break;
}
// Signal read completion
await reader.CompleteAsync();
}
}
4. Line-by-Line Parsing
private async Task ReadLinesAsync(PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
ReadOnlySequence<byte> buffer = result.Buffer;
while (TryReadLine(ref buffer, out ReadOnlySequence<byte> line))
{
ProcessLine(line);
}
// Notify unprocessed data position
reader.AdvanceTo(buffer.Start, buffer.End);
if (result.IsCompleted)
break;
}
await reader.CompleteAsync();
}
private bool TryReadLine(
ref ReadOnlySequence<byte> buffer,
out ReadOnlySequence<byte> line)
{
// Find newline
SequencePosition? position = buffer.PositionOf((byte)'\n');
if (position is null)
{
line = default;
return false;
}
// Slice up to newline
line = buffer.Slice(0, position.Value);
// Move buffer past newline
buffer = buffer.Slice(buffer.GetPosition(1, position.Value));
return true;
}
5. Processing ReadOnlySequence
private void ProcessBuffer(ReadOnlySequence<byte> buffer)
{
if (buffer.IsSingleSegment)
{
// Single segment - direct access
ProcessSpan(buffer.FirstSpan);
}
else
{
// Multiple segments - iteration required
foreach (var segment in buffer)
{
ProcessSpan(segment.Span);
}
}
}
6. Network I/O Integration
public async Task ProcessSocketAsync(Socket socket)
{
var pipe = new Pipe();
var writing = ReceiveAsync(socket, pipe.Writer);
var reading = ProcessAsync(pipe.Reader);
await Task.WhenAll(writing, reading);
}
private async Task ReceiveAsync(Socket socket, PipeWriter writer)
{
while (true)
{
Memory<byte> memory = writer.GetMemory(4096);
int bytesReceived = await socket.ReceiveAsync(
memory,
SocketFlags.None);
if (bytesReceived == 0)
break;
writer.Advance(bytesReceived);
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
break;
}
await writer.CompleteAsync();
}
7. PipeOptions Configuration
var pipeOptions = new PipeOptions(
pool: MemoryPool<byte>.Shared, // Memory pool
readerScheduler: PipeScheduler.ThreadPool, // Reader scheduler
writerScheduler: PipeScheduler.ThreadPool, // Writer scheduler
pauseWriterThreshold: 64 * 1024, // Writer pause threshold
resumeWriterThreshold: 32 * 1024, // Writer resume threshold
minimumSegmentSize: 4096, // Minimum segment size
useSynchronizationContext: false
);
var pipe = new Pipe(pipeOptions);
8. Required NuGet Package
<ItemGroup>
<!-- Included in BCL for .NET Core 3.0+ -->
<PackageReference Include="System.IO.Pipelines" Version="9.0.*" />
</ItemGroup>
9. Important Notes
AdvanceTo Call Required
// Must call AdvanceTo after ReadAsync
ReadResult result = await reader.ReadAsync();
// ... processing ...
reader.AdvanceTo(consumed, examined);
Buffer Lifetime
// ❌ Bad example: Saving buffer after ReadAsync
ReadOnlySequence<byte> saved;
var result = await reader.ReadAsync();
saved = result.Buffer; // Dangerous! Invalidated after AdvanceTo
// ✅ Good example: Copy needed data
var copy = result.Buffer.ToArray();
reader.AdvanceTo(result.Buffer.End);
Completion Calls
// Must call CompleteAsync for both Writer and Reader
await writer.CompleteAsync();
await reader.CompleteAsync();
10. References
Weekly Installs
3
Repository
christian289/do…audecodeGitHub Stars
16
First Seen
Feb 28, 2026
Security Audits
Installed on
opencode3
gemini-cli3
codebuddy3
github-copilot3
codex3
kimi-cli3