outbox-pattern
SKILL.md
Outbox Pattern Implementation
Overview
The Outbox pattern ensures reliable event processing:
- Atomic persistence - Events saved in same transaction as aggregate
- Guaranteed delivery - Events processed even if app crashes
- Eventual consistency - Async processing with retry
- Idempotency - Handle duplicate processing gracefully
Quick Reference
| Component | Purpose |
|---|---|
OutboxMessage |
Persisted event entity |
OutboxMessageConfiguration |
EF Core mapping |
ProcessOutboxMessagesJob |
Background processor (Quartz) |
IdempotentDomainEventHandler |
Deduplicated handler wrapper |
OutboxConsumer |
Alternative direct DB poller |
Outbox Structure
/Infrastructure/
├── Outbox/
│ ├── OutboxMessage.cs
│ ├── OutboxMessageConfiguration.cs
│ ├── ProcessOutboxMessagesJob.cs
│ ├── ProcessOutboxMessagesJobSetup.cs
│ └── IdempotentDomainEventHandler.cs
└── ApplicationDbContext.cs
Template: Outbox Message Entity
// src/{name}.infrastructure/Outbox/OutboxMessage.cs
namespace {name}.infrastructure.outbox;
/// <summary>
/// Represents a domain event stored for reliable delivery
/// </summary>
public sealed class OutboxMessage
{
public OutboxMessage()
{
}
public OutboxMessage(Guid id, string type, string content, DateTime occurredOnUtc)
{
Id = id;
Type = type;
Content = content;
OccurredOnUtc = occurredOnUtc;
}
/// <summary>
/// Unique identifier for this message
/// </summary>
public Guid Id { get; set; }
/// <summary>
/// Assembly-qualified type name of the domain event
/// </summary>
public string Type { get; set; } = string.Empty;
/// <summary>
/// JSON-serialized event content
/// </summary>
public string Content { get; set; } = string.Empty;
/// <summary>
/// When the event originally occurred
/// </summary>
public DateTime OccurredOnUtc { get; set; }
/// <summary>
/// When the message was successfully processed (null if not yet processed)
/// </summary>
public DateTime? ProcessedOnUtc { get; set; }
/// <summary>
/// Error message if processing failed
/// </summary>
public string? Error { get; set; }
/// <summary>
/// Number of processing attempts
/// </summary>
public int RetryCount { get; set; }
}
Template: EF Core Configuration
// src/{name}.infrastructure/Outbox/OutboxMessageConfiguration.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;
namespace {name}.infrastructure.outbox;
internal sealed class OutboxMessageConfiguration
: IEntityTypeConfiguration<OutboxMessage>
{
public void Configure(EntityTypeBuilder<OutboxMessage> builder)
{
builder.ToTable("outbox_message");
builder.HasKey(o => o.Id);
builder.Property(o => o.Id)
.ValueGeneratedNever();
builder.Property(o => o.Type)
.HasMaxLength(500)
.IsRequired();
builder.Property(o => o.Content)
.HasColumnType("jsonb") // PostgreSQL JSONB
.IsRequired();
builder.Property(o => o.OccurredOnUtc)
.IsRequired();
builder.Property(o => o.ProcessedOnUtc);
builder.Property(o => o.Error)
.HasColumnType("text");
builder.Property(o => o.RetryCount)
.HasDefaultValue(0);
// Index for efficient polling of unprocessed messages
builder.HasIndex(o => o.ProcessedOnUtc)
.HasFilter("processed_on_utc IS NULL")
.HasDatabaseName("ix_outbox_message_unprocessed");
// Index for cleanup of old processed messages
builder.HasIndex(o => o.ProcessedOnUtc)
.HasFilter("processed_on_utc IS NOT NULL")
.HasDatabaseName("ix_outbox_message_processed");
}
}
Template: DbContext Integration
// src/{name}.infrastructure/ApplicationDbContext.cs
using System.Text.Json;
using Microsoft.EntityFrameworkCore;
using {name}.domain.abstractions;
using {name}.infrastructure.outbox;
namespace {name}.infrastructure;
public sealed class ApplicationDbContext : DbContext, IUnitOfWork
{
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
WriteIndented = false
};
public ApplicationDbContext(DbContextOptions<ApplicationDbContext> options)
: base(options)
{
}
public DbSet<OutboxMessage> OutboxMessages => Set<OutboxMessage>();
protected override void OnModelCreating(ModelBuilder modelBuilder)
{
modelBuilder.ApplyConfigurationsFromAssembly(typeof(ApplicationDbContext).Assembly);
base.OnModelCreating(modelBuilder);
}
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
// ═══════════════════════════════════════════════════════════════
// CRITICAL: Add domain events to outbox BEFORE SaveChanges
// This ensures atomic persistence - events saved in same transaction
// ═══════════════════════════════════════════════════════════════
ConvertDomainEventsToOutboxMessages();
return await base.SaveChangesAsync(cancellationToken);
}
private void ConvertDomainEventsToOutboxMessages()
{
// Get all entities with domain events
var entitiesWithEvents = ChangeTracker
.Entries<Entity>()
.Where(e => e.Entity.GetDomainEvents().Any())
.Select(e => e.Entity)
.ToList();
// Extract all domain events
var domainEvents = entitiesWithEvents
.SelectMany(e => e.GetDomainEvents())
.ToList();
// Clear events from entities (they're now in outbox)
foreach (var entity in entitiesWithEvents)
{
entity.ClearDomainEvents();
}
// Convert to outbox messages
foreach (var domainEvent in domainEvents)
{
var outboxMessage = new OutboxMessage
{
Id = Guid.NewGuid(),
Type = domainEvent.GetType().AssemblyQualifiedName!,
Content = JsonSerializer.Serialize(
domainEvent,
domainEvent.GetType(),
JsonOptions),
OccurredOnUtc = DateTime.UtcNow
};
OutboxMessages.Add(outboxMessage);
}
}
}
Template: Outbox Processor Job (Quartz)
// src/{name}.infrastructure/Outbox/ProcessOutboxMessagesJob.cs
using System.Text.Json;
using MediatR;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Quartz;
using {name}.domain.abstractions;
namespace {name}.infrastructure.outbox;
/// <summary>
/// Background job that processes outbox messages
/// Uses Quartz for scheduling with configurable interval
/// </summary>
[DisallowConcurrentExecution] // Prevent parallel execution
public sealed class ProcessOutboxMessagesJob : IJob
{
private const int BatchSize = 20;
private const int MaxRetries = 3;
private static readonly JsonSerializerOptions JsonOptions = new()
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
private readonly ApplicationDbContext _dbContext;
private readonly IPublisher _publisher;
private readonly ILogger<ProcessOutboxMessagesJob> _logger;
public ProcessOutboxMessagesJob(
ApplicationDbContext dbContext,
IPublisher publisher,
ILogger<ProcessOutboxMessagesJob> logger)
{
_dbContext = dbContext;
_publisher = publisher;
_logger = logger;
}
public async Task Execute(IJobExecutionContext context)
{
_logger.LogDebug("Starting outbox message processing...");
var messages = await GetUnprocessedMessages(context.CancellationToken);
if (!messages.Any())
{
_logger.LogDebug("No outbox messages to process");
return;
}
_logger.LogInformation(
"Processing {Count} outbox messages",
messages.Count);
foreach (var message in messages)
{
await ProcessMessage(message, context.CancellationToken);
}
await _dbContext.SaveChangesAsync(context.CancellationToken);
_logger.LogInformation("Completed outbox message processing");
}
private async Task<List<OutboxMessage>> GetUnprocessedMessages(
CancellationToken cancellationToken)
{
return await _dbContext.OutboxMessages
.Where(m => m.ProcessedOnUtc == null)
.Where(m => m.RetryCount < MaxRetries)
.OrderBy(m => m.OccurredOnUtc)
.Take(BatchSize)
.ToListAsync(cancellationToken);
}
private async Task ProcessMessage(
OutboxMessage message,
CancellationToken cancellationToken)
{
try
{
_logger.LogDebug(
"Processing outbox message {MessageId} of type {Type}",
message.Id,
message.Type);
// Resolve the event type
var eventType = Type.GetType(message.Type);
if (eventType is null)
{
_logger.LogError(
"Could not resolve type {Type} for message {MessageId}",
message.Type,
message.Id);
message.Error = $"Could not resolve type: {message.Type}";
message.ProcessedOnUtc = DateTime.UtcNow;
return;
}
// Deserialize the event
var domainEvent = JsonSerializer.Deserialize(
message.Content,
eventType,
JsonOptions) as IDomainEvent;
if (domainEvent is null)
{
_logger.LogError(
"Could not deserialize message {MessageId}",
message.Id);
message.Error = "Could not deserialize message content";
message.ProcessedOnUtc = DateTime.UtcNow;
return;
}
// Publish to MediatR handlers
await _publisher.Publish(domainEvent, cancellationToken);
// Mark as processed
message.ProcessedOnUtc = DateTime.UtcNow;
message.Error = null;
_logger.LogInformation(
"Successfully processed outbox message {MessageId}",
message.Id);
}
catch (Exception ex)
{
_logger.LogError(
ex,
"Error processing outbox message {MessageId}. Retry count: {RetryCount}",
message.Id,
message.RetryCount);
message.RetryCount++;
message.Error = ex.ToString();
// Mark as processed if max retries exceeded
if (message.RetryCount >= MaxRetries)
{
message.ProcessedOnUtc = DateTime.UtcNow;
_logger.LogError(
"Outbox message {MessageId} exceeded max retries and has been marked as failed",
message.Id);
}
}
}
}
Template: Job Configuration
// src/{name}.infrastructure/Outbox/ProcessOutboxMessagesJobSetup.cs
using Microsoft.Extensions.Options;
using Quartz;
namespace {name}.infrastructure.outbox;
internal sealed class ProcessOutboxMessagesJobSetup
: IConfigureOptions<QuartzOptions>
{
public void Configure(QuartzOptions options)
{
var jobKey = JobKey.Create(nameof(ProcessOutboxMessagesJob));
options
.AddJob<ProcessOutboxMessagesJob>(jobBuilder =>
jobBuilder.WithIdentity(jobKey))
.AddTrigger(triggerBuilder =>
triggerBuilder
.ForJob(jobKey)
.WithSimpleSchedule(schedule =>
schedule
.WithIntervalInSeconds(10) // Poll every 10 seconds
.RepeatForever()));
}
}
Template: Idempotent Event Handler Wrapper
// src/{name}.infrastructure/Outbox/IdempotentDomainEventHandler.cs
using MediatR;
using Microsoft.EntityFrameworkCore;
using {name}.domain.abstractions;
namespace {name}.infrastructure.outbox;
/// <summary>
/// Wrapper that ensures domain events are processed only once
/// Uses a separate tracking table to detect duplicates
/// </summary>
public abstract class IdempotentDomainEventHandler<TEvent>
: INotificationHandler<TEvent>
where TEvent : IDomainEvent
{
private readonly ApplicationDbContext _dbContext;
protected IdempotentDomainEventHandler(ApplicationDbContext dbContext)
{
_dbContext = dbContext;
}
public async Task Handle(TEvent notification, CancellationToken cancellationToken)
{
var handlerName = GetType().Name;
var eventId = notification.Id;
// Check if already processed
var alreadyProcessed = await _dbContext
.Set<OutboxMessageConsumer>()
.AnyAsync(
c => c.EventId == eventId && c.HandlerName == handlerName,
cancellationToken);
if (alreadyProcessed)
{
return; // Skip duplicate processing
}
// Process the event
await HandleAsync(notification, cancellationToken);
// Mark as processed
_dbContext.Set<OutboxMessageConsumer>().Add(new OutboxMessageConsumer
{
Id = Guid.NewGuid(),
EventId = eventId,
HandlerName = handlerName,
ProcessedOnUtc = DateTime.UtcNow
});
await _dbContext.SaveChangesAsync(cancellationToken);
}
protected abstract Task HandleAsync(TEvent notification, CancellationToken cancellationToken);
}
/// <summary>
/// Tracks which handlers have processed which events
/// </summary>
public sealed class OutboxMessageConsumer
{
public Guid Id { get; set; }
public Guid EventId { get; set; }
public string HandlerName { get; set; } = string.Empty;
public DateTime ProcessedOnUtc { get; set; }
}
Template: Cleanup Job
// src/{name}.infrastructure/Outbox/CleanupOutboxMessagesJob.cs
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Logging;
using Quartz;
namespace {name}.infrastructure.outbox;
/// <summary>
/// Cleans up old processed outbox messages
/// Runs daily to prevent table bloat
/// </summary>
[DisallowConcurrentExecution]
public sealed class CleanupOutboxMessagesJob : IJob
{
private const int RetentionDays = 7;
private const int BatchSize = 1000;
private readonly ApplicationDbContext _dbContext;
private readonly ILogger<CleanupOutboxMessagesJob> _logger;
public CleanupOutboxMessagesJob(
ApplicationDbContext dbContext,
ILogger<CleanupOutboxMessagesJob> logger)
{
_dbContext = dbContext;
_logger = logger;
}
public async Task Execute(IJobExecutionContext context)
{
var cutoffDate = DateTime.UtcNow.AddDays(-RetentionDays);
_logger.LogInformation(
"Cleaning up outbox messages processed before {CutoffDate}",
cutoffDate);
var totalDeleted = 0;
int deletedInBatch;
do
{
deletedInBatch = await _dbContext.OutboxMessages
.Where(m => m.ProcessedOnUtc != null)
.Where(m => m.ProcessedOnUtc < cutoffDate)
.Take(BatchSize)
.ExecuteDeleteAsync(context.CancellationToken);
totalDeleted += deletedInBatch;
} while (deletedInBatch == BatchSize);
_logger.LogInformation(
"Cleaned up {Count} old outbox messages",
totalDeleted);
}
}
Template: Registration
// src/{name}.infrastructure/DependencyInjection.cs
private static void AddBackgroundJobs(
IServiceCollection services,
IConfiguration configuration)
{
services.AddQuartz(configure =>
{
// Use persistent job store for production
configure.UsePersistentStore(options =>
{
options.UsePostgres(configuration.GetConnectionString("Database")!);
options.UseJsonSerializer();
});
});
services.AddQuartzHostedService(options =>
{
options.WaitForJobsToComplete = true;
});
// Register job configurations
services.ConfigureOptions<ProcessOutboxMessagesJobSetup>();
services.ConfigureOptions<CleanupOutboxMessagesJobSetup>();
}
Database Migration
-- Create outbox_message table
CREATE TABLE outbox_message (
id UUID PRIMARY KEY,
type VARCHAR(500) NOT NULL,
content JSONB NOT NULL,
occurred_on_utc TIMESTAMP NOT NULL,
processed_on_utc TIMESTAMP NULL,
error TEXT NULL,
retry_count INTEGER NOT NULL DEFAULT 0
);
-- Index for unprocessed messages (most important)
CREATE INDEX ix_outbox_message_unprocessed
ON outbox_message (occurred_on_utc)
WHERE processed_on_utc IS NULL;
-- Index for cleanup of old messages
CREATE INDEX ix_outbox_message_processed
ON outbox_message (processed_on_utc)
WHERE processed_on_utc IS NOT NULL;
-- Optional: Consumer tracking table for idempotency
CREATE TABLE outbox_message_consumer (
id UUID PRIMARY KEY,
event_id UUID NOT NULL,
handler_name VARCHAR(500) NOT NULL,
processed_on_utc TIMESTAMP NOT NULL
);
CREATE UNIQUE INDEX ix_outbox_consumer_event_handler
ON outbox_message_consumer (event_id, handler_name);
Critical Rules
- Same transaction - Events saved with aggregate in one transaction
- Idempotent handlers - Must handle duplicate delivery
- Order not guaranteed - Events may process out of order
- Retry with backoff - Don't hammer failing events
- Cleanup old messages - Prevent table bloat
- Monitor failures - Alert on max retries exceeded
- Type serialization - Use
AssemblyQualifiedNamefor deserialize - JSON serialization - Consistent options for serialize/deserialize
- Batch processing - Don't process one at a time
- Disable concurrent execution - Prevent duplicate processing
Anti-Patterns to Avoid
// ❌ WRONG: Publishing events directly (not reliable)
await _publisher.Publish(new UserCreatedEvent(user.Id));
await _unitOfWork.SaveChangesAsync(); // Event lost if save fails!
// ✅ CORRECT: Events converted to outbox in SaveChanges
user.RaiseDomainEvent(new UserCreatedEvent(user.Id));
await _unitOfWork.SaveChangesAsync(); // Events saved atomically
// ❌ WRONG: Non-idempotent handler
public async Task Handle(UserCreatedEvent e, CancellationToken ct)
{
await _emailService.SendWelcomeEmail(e.UserId); // Sent twice on retry!
}
// ✅ CORRECT: Idempotent handler
public async Task Handle(UserCreatedEvent e, CancellationToken ct)
{
if (await _emailLog.ExistsAsync(e.UserId, "welcome"))
return; // Already sent
await _emailService.SendWelcomeEmail(e.UserId);
await _emailLog.RecordAsync(e.UserId, "welcome");
}
// ❌ WRONG: Processing one message at a time
foreach (var message in allMessages) // Could be millions!
{
await ProcessMessage(message);
}
// ✅ CORRECT: Batch with limit
var messages = await _dbContext.OutboxMessages
.Where(m => m.ProcessedOnUtc == null)
.Take(20) // Batch size
.ToListAsync();
Related Skills
domain-events-generator- Domain events that go into outboxquartz-background-jobs- Background job schedulingdotnet-clean-architecture- Infrastructure layer setup
Weekly Installs
5
Repository
ronnythedev/dot…e-skillsGitHub Stars
42
First Seen
Mar 1, 2026
Security Audits
Installed on
github-copilot5
opencode4
gemini-cli4
codex4
kimi-cli4
amp4