Outbox Pattern
AGENIUM implements the outbox pattern for reliable, at-least-once message delivery.
What is the Outbox Pattern?
The outbox pattern ensures messages are never lost, even if:
- The network fails mid-send
- The remote agent is temporarily unavailable
- Your agent crashes before the message is acknowledged
How It Works
┌──────────────────────────────────────────────┐
│ 1. Enqueue message to outbox (SQLite) │
└──────────────┬───────────────────────────────┘
│
▼
┌──────────────────────────────────────────────┐
│ 2. Attempt send over network │
└──────────────┬───────────────────────────────┘
│
┌──────┴───────┐
│ │
Success Failure
│ │
▼ ▼
┌────────────┐ ┌──────────────────┐
│ 3a. Mark │ │ 3b. Retry later │
│ SENT │ │ (exponential │
│ │ │ backoff) │
└──────┬─────┘ └──────────────────┘
│
▼
┌──────────────────────────────────────────────┐
│ 4. Wait for ACK from remote │
└──────────────┬───────────────────────────────┘
│
┌──────┴────────┐
│ │
ACK received Timeout (retry)
│ │
▼ ▼
┌────────────┐ ┌──────────────────┐
│ 5. Mark │ │ Retry send │
│ ACKED │ │ │
│ (delete) │ │ │
└────────────┘ └──────────────────┘Database Schema
sql
CREATE TABLE outbox (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
message_id TEXT NOT NULL UNIQUE,
frame_json TEXT NOT NULL,
created_at INTEGER NOT NULL,
last_attempt INTEGER,
retry_count INTEGER DEFAULT 0,
state TEXT DEFAULT 'PENDING',
last_error TEXT
);
CREATE INDEX idx_outbox_state ON outbox(state);
CREATE INDEX idx_outbox_session ON outbox(session_id);States
| State | Description |
|---|---|
PENDING | Queued, not yet sent |
SENDING | Send in progress |
SENT | Sent, awaiting ACK |
ACKED | Acknowledged (can delete) |
FAILED | Max retries exceeded |
Outbox Manager
Creating the Manager
typescript
import { createOutboxManager, createDatabase } from 'agenium';
const db = createDatabase('alice', './data');
db.open();
const outbox = createOutboxManager(db);
// Set send function
outbox.setSendFunction(async (sessionId, frame) => {
// Actual network send
const response = await client.request(host, port, {
method: 'POST',
path: '/message',
body: JSON.stringify(frame)
});
return {
success: response.status === 200,
error: response.status !== 200 ? `HTTP ${response.status}` : undefined,
isRetryable: response.status >= 500 // Retry on server errors
};
});
// Start background processing
outbox.start();Enqueuing Messages
typescript
// Enqueue a request
const result = outbox.enqueue(sessionId, requestFrame);
if (result.success) {
console.log('Message queued:', result.messageId);
} else {
console.error('Queue full!', result.error);
}Delivery Guarantees
- At-least-once: Messages delivered 1+ times (never zero)
- Ordering: Messages sent in order within a session
- Idempotency: Receivers must handle duplicates
Retry Strategy
Exponential Backoff
typescript
const RETRY_DELAYS = [
1_000, // 1 second
2_000, // 2 seconds
4_000, // 4 seconds
8_000, // 8 seconds
16_000, // 16 seconds
32_000, // 32 seconds
60_000 // 1 minute (max)
];
function getRetryDelay(retryCount: number): number {
const index = Math.min(retryCount, RETRY_DELAYS.length - 1);
return RETRY_DELAYS[index];
}Max Retries
typescript
const MAX_RETRIES = 10;
if (retryCount >= MAX_RETRIES) {
// Mark as FAILED
outbox.markFailed(messageId, 'Max retries exceeded');
// Notify application
emit('message_failed', { messageId, sessionId, error: 'Max retries' });
}Acknowledgments (ACK)
Receiving ACK
When a message is successfully processed, the receiver sends an ACK event:
typescript
// Receiver side
agent.onRequest('method', async (params, sessionId) => {
// Process request...
// Automatic ACK sent after processing
return { result: 'ok' };
});
// Or for events:
agent.onEvent('notification', async (data, sessionId, messageId) => {
// Process event...
// Send ACK
await agent.event(sessionId, 'ACK', { msgId: messageId });
});Processing ACK
typescript
// Built-in ACK handler
agent.onEvent('ACK', (data, sessionId) => {
const msgId = data?.msgId;
if (msgId && outbox) {
outbox.ack(msgId); // Mark as acknowledged
}
});Deduplication
Recipients must deduplicate messages:
Server-Side Deduplication
typescript
// Check if already processed
if (db.isDuplicate(messageId, sessionId)) {
console.log('Duplicate message, skipping');
return { ok: true, duplicate: true };
}
// Mark as processed
db.markProcessed(messageId, sessionId);
// Process the message
await handleMessage(frame);Deduplication Table
sql
CREATE TABLE processed_messages (
message_id TEXT NOT NULL,
session_id TEXT NOT NULL,
processed_at INTEGER NOT NULL,
PRIMARY KEY (message_id, session_id)
);
-- Auto-cleanup old entries (> 7 days)
DELETE FROM processed_messages
WHERE processed_at < ?;Events
The outbox emits events for monitoring:
typescript
outbox.on('sent', (info) => {
console.log('Message sent:', info.messageId);
});
outbox.on('failed', (info) => {
console.error('Message failed:', info.messageId, info.error);
});
outbox.on('acked', (info) => {
console.log('Message acknowledged:', info.messageId);
});
outbox.on('retry', (info) => {
console.log('Retrying message:', info.messageId, 'attempt', info.retryCount);
});Statistics
Monitor outbox health:
typescript
const stats = outbox.getStats();
console.log(stats);
/*
{
pending: 5, // Messages queued
sending: 2, // Currently sending
sent: 10, // Sent, awaiting ACK
acked: 1234, // Total acknowledged
failed: 3, // Max retries exceeded
totalRetries: 45 // Total retry attempts
}
*/Configuration
Queue Limits
typescript
const outbox = createOutboxManager(db, {
maxPending: 1000, // Max messages in PENDING state
maxRetries: 10, // Give up after 10 retries
processingInterval: 1000 // Check queue every 1s
});Backpressure
When queue is full, new messages are rejected:
typescript
const result = outbox.enqueue(sessionId, frame);
if (!result.success) {
if (result.error === 'QUEUE_FULL') {
console.error('Outbox full, apply backpressure!');
// Options:
// 1. Wait and retry
await sleep(1000);
return outbox.enqueue(sessionId, frame);
// 2. Drop message (not recommended)
// 3. Push to external queue (Redis, etc.)
}
}Advanced Usage
Manual Retry
typescript
// Manually retry a failed message
const message = db.getOutboxMessage(messageId);
if (message.state === 'FAILED') {
// Reset retry count
db.updateOutboxState(messageId, 'PENDING', 0);
// Will be picked up by next processing cycle
}Priority Queuing
typescript
// Add priority field to schema
ALTER TABLE outbox ADD COLUMN priority INTEGER DEFAULT 0;
// Process high-priority messages first
const messages = db.prepare(`
SELECT * FROM outbox
WHERE state = 'PENDING'
ORDER BY priority DESC, created_at ASC
LIMIT 10
`).all();Dead Letter Queue
typescript
// Move failed messages to separate table
CREATE TABLE dead_letter_queue AS SELECT * FROM outbox WHERE 0;
// After max retries
if (retryCount >= MAX_RETRIES) {
db.prepare('INSERT INTO dead_letter_queue SELECT * FROM outbox WHERE id = ?')
.run(messageId);
db.prepare('DELETE FROM outbox WHERE id = ?').run(messageId);
}Error Recovery
Agent Restart
1. Agent crashes with messages in outbox
2. On restart, outbox manager loads pending messages
3. Resumes sending from where it left off
4. No messages lost!typescript
// Automatic on startup
outbox.start();
// Loads all PENDING and SENT messages
// Resumes processingNetwork Partition
1. Network fails during send
2. Message stays in PENDING state
3. Retry with exponential backoff
4. Network recovers
5. Message delivered successfullyRemote Agent Unavailable
1. Remote agent is down
2. All sends fail with connection errors
3. Messages accumulate in outbox
4. Retries continue with backoff
5. Remote agent comes back online
6. All pending messages deliveredBest Practices
1. Always Enable Persistence
typescript
// ✅ Recommended
const agent = createAgent('alice', {
persistence: true // Enables outbox
});
// ❌ Not recommended (messages can be lost)
const agent = createAgent('alice', {
persistence: false
});2. Handle Duplicates
typescript
// ✅ Idempotent handler
agent.onRequest('update_status', async (params, sessionId) => {
// Check if already processed
const existing = db.getStatus(params.id);
if (existing?.version >= params.version) {
return { ok: true, duplicate: true };
}
// Process update
db.updateStatus(params.id, params.status, params.version);
return { ok: true };
});3. Monitor Queue Depth
typescript
setInterval(() => {
const stats = outbox.getStats();
if (stats.pending > 100) {
console.warn('Outbox depth high:', stats.pending);
}
if (stats.failed > 10) {
console.error('Too many failed messages!');
}
}, 60_000);4. Clean Up Old Messages
typescript
// Periodically delete old ACKed messages
setInterval(() => {
const cutoff = Date.now() - 7 * 24 * 60 * 60 * 1000; // 7 days
db.prepare('DELETE FROM outbox WHERE state = "ACKED" AND created_at < ?')
.run(cutoff);
}, 24 * 60 * 60 * 1000); // Daily5. Implement Circuit Breakers
typescript
// Don't retry if remote agent is down
const breaker = new CircuitBreaker({ failureThreshold: 5 });
outbox.setSendFunction(async (sessionId, frame) => {
return breaker.execute(() => actualSend(sessionId, frame));
});Performance Considerations
Batch Processing
typescript
// Process multiple messages in parallel
const pending = db.getPendingMessages(10);
await Promise.all(
pending.map(msg => outbox.send(msg))
);Batching Sends
typescript
// Group multiple messages into one request
const batch = db.getPendingMessages(50);
const batchFrame = {
type: 'BATCH',
messages: batch.map(m => m.frame)
};
await client.request(host, port, {
path: '/batch',
body: JSON.stringify(batchFrame)
});Index Optimization
sql
-- Speed up queue queries
CREATE INDEX idx_outbox_processing
ON outbox(state, created_at);
-- Speed up ACK lookups
CREATE INDEX idx_outbox_message_id
ON outbox(message_id);See Also
- Sessions - Session lifecycle
- Transport - Network layer
- Configuration - Timeout and retry settings