rust-async
Solution Patterns
Pattern 1: Stream Processing
use futures::stream::{Stream, StreamExt}; // chunks/for_each live in futures' StreamExt

async fn process_stream(stream: impl Stream<Item = Data>) {
    stream
        .chunks(100) // Batch items to cut per-item overhead
        .for_each(|batch| async {
            process_batch(batch).await;
        })
        .await;
}
When to use: Processing continuous data flows (websockets, file streams, API pagination).
Key insight: Streams are async iterators - pull-based, lazy evaluation.
Pattern 2: Backpressure Control
use tokio::sync::Semaphore;
use std::sync::Arc;
let semaphore = Arc::new(Semaphore::new(10)); // Max 10 permits = max 10 in flight
let stream = tokio_stream::iter(0..1000)
    .map(|i| {
        let permit = semaphore.clone().acquire_owned();
        async move {
            // Hold the permit for the duration of process(i)
            let _permit = permit.await.expect("semaphore closed");
            process(i).await
        }
    })
    .buffer_unordered(100); // Up to 100 futures buffered; the semaphore caps actual work at 10
When to use: Prevent overwhelming downstream systems or resource exhaustion.
Trade-offs: Adds latency but prevents overload.
Pattern 3: Select Multiplexing
use tokio::select;
use tokio::time::{sleep, Duration};
async fn multiplex(mut receiver: tokio::sync::mpsc::Receiver<Msg>) {
    loop {
        select! {
            msg = receiver.recv() => {
                if let Some(msg) = msg {
                    handle(msg).await;
                } else {
                    break; // Channel closed
                }
            }
            _ = sleep(Duration::from_secs(5)) => {
                // Periodic timeout handling
                check_health().await;
            }
            else => break, // All branches disabled
        }
    }
}
When to use: Waiting on multiple async operations, first-to-complete wins.
Gotcha: All branches must be cancellation-safe.
Pattern 4: Task Cancellation
use tokio::time::timeout;
use std::time::Duration;
async fn with_timeout() -> Result<Value, TimeoutError> {
    timeout(Duration::from_secs(5), long_operation()).await
        .map_err(|_| TimeoutError)
}
// Cooperative cancellation
let task = tokio::spawn(async move {
    loop {
        // Yield point: abort() can only take effect at an .await
        tokio::task::yield_now().await;
        // Do work
        if work().await.is_err() {
            return;
        }
    }
});
// Cancel task
task.abort();
let _ = task.await; // Err(e) where e.is_cancelled() == true
When to use: Operations with time limits or user-requested cancellation.
Key insight: Cancellation is cooperative - requires yield points.
Workflow
Step 1: Choose Stream vs Iterator
- Sync data source? → Use Iterator (more efficient)
- Async data source (network, DB)? → Use Stream
- Need backpressure? → Definitely Stream
Step 2: Design Concurrency Strategy
- Sequential processing? → for_each / fold
- Limited concurrency? → buffer_unordered(N) + Semaphore
- Unlimited (dangerous)? → Use with extreme caution
Step 3: Handle Cancellation
- Long-running task? → Add timeout wrapper
- User-initiated? → Implement abort signal
- Resource cleanup? → Use Drop or explicit cleanup
Join vs Try_Join
Join - Wait for All
use tokio::join;
// All operations run concurrently, wait for all to complete
let (a, b, c) = join!(
    fetch_user(),
    fetch_posts(),
    fetch_comments()
);
// All values available, even if some operations failed
Use when: All results needed regardless of individual failures.
Try_Join - Fail Fast
use tokio::try_join;
// Stop on first error
let (a, b) = try_join!(
    async_op_a(),
    async_op_b()
)?;
// Both succeeded, or error from first failure
Use when: All operations must succeed, fail fast on errors.
Combined Pattern
async fn fetch_dashboard() -> Result<Dashboard, Error> {
    let (user, posts, comments) = try_join!(
        fetch_user(),
        fetch_posts(),
        fetch_comments()
    )?;
    Ok(Dashboard { user, posts, comments })
}
Common Errors & Solutions
| Error | Cause | Solution |
|---|---|---|
| `.await` forgotten | Future not polled | Check all async fn calls have `.await` |
| Cancellation unhandled | Task aborted mid-operation | Implement cooperative cancellation |
| Missing backpressure | Unbounded concurrency | Use `Semaphore` or `buffer_unordered` |
| Deadlock | Lock held across `.await` | Minimize lock scope, drop before await |
| Async drop unsupported | Drop in async context | Use spawn for cleanup or blocking drop |
Backpressure Strategies
Strategy 1: Semaphore-Based
let sem = Arc::new(Semaphore::new(10));
stream
    .map(|item| {
        let sem = sem.clone();
        async move {
            let _permit = sem.acquire().await.expect("semaphore closed");
            process(item).await
        }
    })
    .buffer_unordered(10)
Pros: Precise control, easy to reason about. Cons: Semaphore overhead.
Strategy 2: Buffered Stream
stream
    .chunks(100)
    .for_each_concurrent(5, |batch| async move {
        process_batch(batch).await
    })
    .await
Pros: Simple, built into StreamExt. Cons: Less fine-grained control.
Strategy 3: Channel-Based
let (tx, mut rx) = mpsc::channel(100); // Buffer size = backpressure

// Producer respects backpressure: send() waits while the buffer is full
tx.send(item).await?;

// Consumer pulls at its own pace
while let Some(item) = rx.recv().await {
    process(item).await;
}
Pros: Natural backpressure from the bounded channel. Cons: Extra copy/move overhead.
Performance Tips
| Pattern | Performance Insight |
|---|---|
| `select!` | More lightweight than multiple `tokio::spawn` |
| `buffer_unordered` | More flexible than `for_each_concurrent` |
| `.chunks()` | Reduces per-item overhead for bulk operations |
| Lock-free at await | Never hold locks across `.await` points |
| `spawn_blocking` | Use for CPU-bound work in async context |
Advanced: Future Trait
Implementing Future
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Instant;

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if Instant::now() >= self.when {
            Poll::Ready(())
        } else {
            // Simplistic: waking immediately makes the executor busy-poll
            // until the deadline. A real timer registers the waker with a
            // timer driver and wakes once, at the right time.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}
When to implement: Custom async primitives, advanced control flow.
Gotcha: Must properly handle wakeup notifications.
Review Checklist
When reviewing async code:
- All async functions are properly `.await`ed
- Backpressure mechanisms in place for streams
- Cancellation handled cooperatively (yield points)
- No locks held across `.await` points
- Stream processing uses appropriate concurrency limits
- Error propagation uses `?` or proper handling
- `select!` branches are cancellation-safe
- Long-running tasks have timeout protection
- Resource cleanup happens even on cancellation
- CPU-bound work uses `spawn_blocking`
Verification Commands
# Check async code compilation
cargo check
# Run async tests
cargo test
# Check for common async mistakes
cargo clippy -- -W clippy::await_holding_lock
# Test with tokio-console for debugging
RUSTFLAGS="--cfg tokio_unstable" cargo run
# Profile async runtime
cargo flamegraph --bin your-app
Common Pitfalls
1. Forgotten Await
Symptom: Future never executes, unexpected behavior
// ❌ Bad: future not awaited
async fn bad() {
    fetch_data(); // Returns a Future that is never polled!
}

// ✅ Good
async fn good() {
    fetch_data().await; // Actually runs
}
2. Unbounded Concurrency
Symptom: Resource exhaustion, system overload
// ❌ Bad: all operations run concurrently
let futures: Vec<_> = urls.iter()
    .map(|url| fetch(url))
    .collect();
let results = join_all(futures).await;

// ✅ Good: limited concurrency
use futures::stream::{self, StreamExt};
let results = stream::iter(urls)
    .map(|url| fetch(url))
    .buffer_unordered(10) // Max 10 concurrent
    .collect::<Vec<_>>()
    .await;
3. Lock Across Await
Symptom: Deadlock, "future cannot be sent between threads safely"
// ❌ Bad: lock held during await
let guard = mutex.lock().await;
some_async_op().await; // DANGER: other tasks block on the mutex
drop(guard);

// ✅ Good: drop lock before await
let value = {
    let guard = mutex.lock().await;
    guard.clone()
}; // lock dropped here
some_async_op().await;
4. Async Drop
Symptom: Cannot await in Drop impl
// ❌ Bad: async operation in Drop
impl Drop for Resource {
    fn drop(&mut self) {
        // Cannot await here!
        self.cleanup().await; // Won't compile
    }
}

// ✅ Good: explicit async cleanup, called before the value is dropped
impl Resource {
    async fn cleanup(self) {
        // Async cleanup logic
    }
}

// Or spawn the cleanup as a task from Drop
impl Drop for Resource {
    fn drop(&mut self) {
        let handle = self.handle.take();
        tokio::spawn(async move {
            if let Some(h) = handle {
                h.cleanup().await;
            }
        });
    }
}
Related Skills
- rust-concurrency - Thread safety, Send/Sync basics
- rust-async-pattern - Async architecture patterns
- rust-ownership - Lifetime issues in async contexts
- rust-pin - Pin and self-referential types
- rust-performance - Async performance optimization
- rust-web - Async web frameworks (axum, actix)
Localized Reference
- Chinese version: SKILL_ZH.md - complete Chinese edition containing all content