rust-actor

SKILL.md

Solution Patterns

Pattern 1: Basic Actor Implementation

use tokio::sync::mpsc::{channel, Sender, Receiver};
use std::collections::HashMap;

// Actor trait
trait Actor: Send + 'static {
    type Message: Send + 'static;
    type Error: std::error::Error;

    fn receive(&mut self, ctx: &mut Context<Self>, msg: Self::Message);
}

// Actor context
struct Context<A: Actor> {
    mailbox: Receiver<A::Message>,
    sender: Sender<A::Message>,
    state: ActorState,
    supervisor: Option<SupervisorAddr>,
}

#[derive(Debug, Clone)]
enum ActorState {
    Starting,
    Running,
    Restarting,
    Stopping,
    Stopped,
}

// Address handle for sending messages
#[derive(Clone)]
struct Addr<A: Actor> {
    sender: Sender<A::Message>,
}

impl<A: Actor> Addr<A> {
    pub async fn send(&self, msg: A::Message) -> Result<(), SendError> {
        self.sender.send(msg).await
            .map_err(|_| SendError::Disconnected)
    }
}

// Example actor
struct CounterActor {
    count: usize,
}

#[derive(Debug)]
enum CounterMessage {
    Increment,
    Decrement,
    GetCount(Sender<usize>),
}

impl Actor for CounterActor {
    type Message = CounterMessage;
    type Error = std::io::Error;

    fn receive(&mut self, ctx: &mut Context<Self>, msg: Self::Message) {
        match msg {
            CounterMessage::Increment => {
                self.count += 1;
            }
            CounterMessage::Decrement => {
                self.count = self.count.saturating_sub(1);
            }
            CounterMessage::GetCount(reply) => {
                let _ = reply.try_send(self.count);
            }
        }
    }
}

Pattern 2: Request-Response Pattern

use tokio::sync::oneshot;
use std::time::Duration;

// Request wrapper with response channel
struct Request<M, R> {
    payload: M,
    response: oneshot::Sender<R>,
}

// Synchronous request with timeout
async fn request<A: Actor, R>(
    actor: &Addr<A>,
    msg: A::Message,
    timeout: Duration,
) -> Result<R, RequestError> {
    let (tx, rx) = oneshot::channel();

    let request = Request {
        payload: msg,
        response: tx,
    };

    actor.send(request).await
        .map_err(|_| RequestError::SendFailed)?;

    tokio::time::timeout(timeout, rx).await
        .map_err(|_| RequestError::Timeout)?
        .map_err(|_| RequestError::Canceled)
}

// Usage example
async fn example_request_response() {
    let (tx, rx) = oneshot::channel();

    let addr = counter_actor.start();
    addr.send(CounterMessage::GetCount(tx)).await.unwrap();

    let count = rx.await.unwrap();
    println!("Count: {}", count);
}

Pattern 3: Supervision Tree

use std::collections::HashMap;

#[derive(Debug, Clone)]
enum SupervisionStrategy {
    OneForOne,    // Only restart failed child
    AllForOne,    // Restart all children if one fails
    RestForOne,   // Restart failed child and all after it
}

struct Supervisor {
    children: HashMap<ChildId, Child>,
    strategy: SupervisionStrategy,
    max_restarts: u32,
    window: Duration,
}

struct Child {
    id: ChildId,
    addr: Box<dyn std::any::Any + Send>,
    restart_count: u32,
    last_restart: Option<Instant>,
    spec: ChildSpec,
}

struct ChildSpec {
    factory: Box<dyn Fn() -> Box<dyn std::any::Any + Send>>,
    restart_strategy: RestartStrategy,
}

#[derive(Debug, Clone)]
enum RestartStrategy {
    Permanent,   // Always restart
    Temporary,   // Never restart
    Transient,   // Restart only on abnormal exit
}

impl Supervisor {
    fn new(strategy: SupervisionStrategy, max_restarts: u32, window: Duration) -> Self {
        Self {
            children: HashMap::new(),
            strategy,
            max_restarts,
            window,
        }
    }

    async fn handle_child_error(&mut self, child_id: ChildId, error: &dyn std::error::Error) {
        log::warn!("Child {} failed: {}", child_id, error);

        match self.strategy {
            SupervisionStrategy::OneForOne => {
                self.restart_child(child_id).await;
            }
            SupervisionStrategy::AllForOne => {
                for id in self.children.keys().cloned().collect::<Vec<_>>() {
                    self.stop_child(id).await;
                }
                for id in self.children.keys().cloned().collect::<Vec<_>>() {
                    self.restart_child(id).await;
                }
            }
            SupervisionStrategy::RestForOne => {
                let ids: Vec<_> = self.children.keys()
                    .filter(|&&id| id >= child_id)
                    .cloned()
                    .collect();

                for id in ids {
                    self.stop_child(id).await;
                    self.restart_child(id).await;
                }
            }
        }
    }

    async fn restart_child(&mut self, child_id: ChildId) -> bool {
        if let Some(child) = self.children.get_mut(&child_id) {
            child.restart_count += 1;

            // Check restart rate limit
            if self.should_give_up(child) {
                log::error!("Child {} exceeded max restarts, giving up", child_id);
                self.stop_child(child_id).await;
                return false;
            }

            child.last_restart = Some(Instant::now());
            log::info!("Restarting child {}", child_id);

            // Factory creates new instance
            let new_instance = (child.spec.factory)();
            child.addr = new_instance;

            true
        } else {
            false
        }
    }

    fn should_give_up(&self, child: &Child) -> bool {
        if child.restart_count > self.max_restarts {
            if let Some(last_restart) = child.last_restart {
                if last_restart.elapsed() < self.window {
                    return true;
                }
            }
        }
        false
    }

    async fn stop_child(&mut self, child_id: ChildId) {
        if let Some(child) = self.children.remove(&child_id) {
            log::info!("Stopping child {}", child_id);
            // Send stop signal
        }
    }
}

Pattern 4: Deadlock Prevention with Bounded Mailboxes

use tokio::sync::mpsc;

struct BoundedMailbox<A: Actor> {
    receiver: mpsc::Receiver<A::Message>,
    sender: mpsc::Sender<A::Message>,
    capacity: usize,
}

impl<A: Actor> BoundedMailbox<A> {
    fn new(capacity: usize) -> Self {
        let (sender, receiver) = mpsc::channel(capacity);
        Self {
            receiver,
            sender,
            capacity,
        }
    }

    fn capacity(&self) -> usize {
        self.capacity
    }

    async fn send_with_backpressure(&self, msg: A::Message) -> Result<(), SendError> {
        // Will wait if mailbox is full (backpressure)
        self.sender.send(msg).await
            .map_err(|_| SendError::Disconnected)
    }

    fn try_send(&self, msg: A::Message) -> Result<(), TrySendError<A::Message>> {
        // Returns immediately if mailbox is full
        self.sender.try_send(msg)
            .map_err(|e| match e {
                mpsc::error::TrySendError::Full(msg) => TrySendError::Full(msg),
                mpsc::error::TrySendError::Closed(msg) => TrySendError::Disconnected(msg),
            })
    }
}

// Usage
async fn example_bounded_mailbox() {
    let mailbox: BoundedMailbox<CounterActor> = BoundedMailbox::new(100);

    // This will block if mailbox is full
    mailbox.send_with_backpressure(CounterMessage::Increment).await.unwrap();

    // This returns error immediately if full
    match mailbox.try_send(CounterMessage::Increment) {
        Ok(_) => println!("Sent"),
        Err(TrySendError::Full(_)) => println!("Mailbox full"),
        Err(TrySendError::Disconnected(_)) => println!("Actor stopped"),
    }
}

Pattern 5: Actor Lifecycle Management

trait LifecycleHandler: Actor {
    fn pre_start(&mut self, ctx: &mut Context<Self>) {
        // Initialize resources
        log::info!("Actor starting");
    }

    fn post_start(&mut self, ctx: &mut Context<Self>) {
        // Start timers, establish connections
        log::info!("Actor started");
    }

    fn pre_restart(&mut self, ctx: &mut Context<Self>, error: &dyn std::error::Error) {
        // Clean up resources before restart
        log::warn!("Actor restarting due to: {}", error);
    }

    fn post_restart(&mut self, ctx: &mut Context<Self>) {
        // Reinitialize after restart
        log::info!("Actor restarted");
    }

    fn post_stop(&mut self) {
        // Save state, close connections
        log::info!("Actor stopped");
    }
}

// Example with lifecycle hooks
struct DatabaseActor {
    connection: Option<DatabaseConnection>,
}

impl LifecycleHandler for DatabaseActor {
    fn pre_start(&mut self, ctx: &mut Context<Self>) {
        // Establish database connection
        self.connection = Some(DatabaseConnection::new());
    }

    fn pre_restart(&mut self, ctx: &mut Context<Self>, error: &dyn std::error::Error) {
        // Close existing connection
        if let Some(conn) = self.connection.take() {
            conn.close();
        }
    }

    fn post_stop(&mut self) {
        // Ensure connection is closed
        if let Some(conn) = self.connection.take() {
            conn.close();
        }
    }
}

Actor vs Thread Model

Feature Thread Model Actor Model
State sharing Shared memory + locks Isolated, message passing
Deadlock risk High (lock ordering) Low (message queues)
Scalability Limited by thread count Millions of actors possible
Fault handling Manual Supervision trees
Debugging Hard (race conditions) Easier (message sequence)
Memory Shared Isolated per actor

Workflow

Step 1: Design Actor Hierarchy

Design questions:
  → What state needs isolation? Each isolated state = 1 actor
  → What operations need sequential processing? Group in same actor
  → What can fail independently? Separate actors with supervision
  → What needs to scale? Use actor pool pattern

Step 2: Choose Messaging Pattern

Message patterns:
  → Fire-and-forget: Async send, no response
  → Request-response: Oneshot channel for reply
  → Streaming: Channel for multiple responses
  → Broadcast: Multiple recipients

Step 3: Set Up Supervision

Supervision strategy:
  → OneForOne: Independent actors (default choice)
  → AllForOne: Tightly coupled actors needing consistent state
  → RestForOne: Sequential dependencies

Restart policy:
  → Permanent: Critical actors (always restart)
  → Temporary: One-time tasks (never restart)
  → Transient: Restart on errors only

Review Checklist

When implementing actor systems:

  • Each actor has clear single responsibility
  • Mailboxes have bounded capacity (prevent memory leaks)
  • Message types are Send + 'static
  • No shared mutable state between actors
  • Supervision strategy appropriate for error handling
  • Actor lifecycle properly managed (cleanup in post_stop)
  • No circular message dependencies (deadlock risk)
  • Timeouts on request-response patterns
  • Monitoring tracks mailbox size and message latency
  • Backpressure handled when mailbox is full

Verification Commands

# Run tests with actor system
cargo test --test actor_tests

# Check for deadlocks with timeout
cargo test --test deadlock_tests -- --test-threads=1 --nocapture

# Profile actor message throughput
cargo bench --bench actor_bench

# Check memory usage under load
cargo run --release --bin load_test

# Monitor actor lifecycle events
RUST_LOG=debug cargo run

Common Pitfalls

1. Circular Message Dependencies (Deadlock)

Symptom: Actors waiting for each other's responses

// ❌ Bad: Actor A waits for Actor B, Actor B waits for Actor A
async fn actor_a_handler(&mut self, msg: Message) {
    let response = self.actor_b.request(msg).await;  // Blocks
    // Actor A is blocked, can't process Actor B's request
}

async fn actor_b_handler(&mut self, msg: Message) {
    let response = self.actor_a.request(msg).await;  // Blocks
    // Deadlock!
}

// ✅ Good: Use timeouts and avoid circular dependencies
async fn actor_a_handler(&mut self, msg: Message) {
    match tokio::time::timeout(
        Duration::from_secs(5),
        self.actor_b.request(msg)
    ).await {
        Ok(response) => { /* handle response */ }
        Err(_) => { /* timeout, handle error */ }
    }
}

// Better: redesign to avoid circular dependency

2. Unbounded Mailbox Growth

Symptom: Memory grows unbounded, OOM crashes

// ❌ Bad: unbounded channel
let (tx, rx) = mpsc::unbounded_channel();

// Slow consumer can't keep up, mailbox grows forever

// ✅ Good: bounded channel with backpressure
let (tx, rx) = mpsc::channel(100);  // Max 100 messages

// Sender will wait when mailbox is full (backpressure)
tx.send(msg).await?;

3. Blocking Operations in Actor

Symptom: Actor becomes unresponsive, messages pile up

// ❌ Bad: blocking I/O in actor
impl Actor for MyActor {
    fn receive(&mut self, ctx: &mut Context<Self>, msg: Self::Message) {
        // Blocks entire actor!
        let data = std::fs::read("file.txt").unwrap();
        // Other messages can't be processed
    }
}

// ✅ Good: use async I/O or spawn blocking task
impl Actor for MyActor {
    fn receive(&mut self, ctx: &mut Context<Self>, msg: Self::Message) {
        let addr = ctx.address();
        tokio::spawn(async move {
            // Runs in separate task
            let data = tokio::fs::read("file.txt").await.unwrap();
            addr.send(ProcessData(data)).await;
        });
        // Actor continues processing messages
    }
}

Actix Framework Example

use actix::{Actor, Handler, Message, Context};

struct MyActor {
    counter: usize,
}

impl Actor for MyActor {
    type Context = Context<Self>;

    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("Actor started");
    }

    fn stopped(&mut self, _ctx: &mut Self::Context) {
        println!("Actor stopped");
    }
}

#[derive(Message)]
#[rtype(result = "usize")]
struct Increment;

impl Handler<Increment> for MyActor {
    type Result = usize;

    fn handle(&mut self, _msg: Increment, _ctx: &mut Self::Context) -> Self::Result {
        self.counter += 1;
        self.counter
    }
}

// Usage
#[actix_rt::main]
async fn main() {
    let actor = MyActor { counter: 0 }.start();
    let result = actor.send(Increment).await.unwrap();
    println!("Counter: {}", result);
}

Related Skills

  • rust-concurrency - Concurrency primitives and patterns
  • rust-async - Async message handling
  • rust-error - Error propagation in actor systems
  • rust-channel - Channel-based communication
  • rust-performance - Actor system optimization

Localized Reference

  • Chinese version: SKILL_ZH.md - 完整中文版本,包含所有内容
Weekly Installs
7
GitHub Stars
20
First Seen
Jan 30, 2026
Installed on
gemini-cli5
claude-code4
github-copilot4
codex4
amp4
cline4