Outbox Pattern

AGENIUM implements the outbox pattern for reliable, at-least-once message delivery.

What is the Outbox Pattern?

The outbox pattern ensures messages are never lost, even if:

  • The network fails mid-send
  • The remote agent is temporarily unavailable
  • Your agent crashes before the message is acknowledged

How It Works

┌──────────────────────────────────────────────┐
│  1. Enqueue message to outbox (SQLite)       │
└──────────────┬───────────────────────────────┘
               │
               ▼
┌──────────────────────────────────────────────┐
│  2. Attempt send over network                │
└──────────────┬───────────────────────────────┘
               │
        ┌──────┴───────┐
        │              │
    Success          Failure
        │              │
        ▼              ▼
┌────────────┐  ┌──────────────────┐
│  3a. Mark  │  │  3b. Retry later │
│     SENT   │  │  (exponential    │
│            │  │   backoff)       │
└──────┬─────┘  └──────────────────┘
       │
       ▼
┌──────────────────────────────────────────────┐
│  4. Wait for ACK from remote                 │
└──────────────┬───────────────────────────────┘
               │
        ┌──────┴────────┐
        │               │
   ACK received    Timeout (retry)
        │               │
        ▼               ▼
┌────────────┐  ┌──────────────────┐
│  5. Mark   │  │  Retry send      │
│    ACKED   │  │                  │
│  (delete)  │  │                  │
└────────────┘  └──────────────────┘

Database Schema

sql
CREATE TABLE outbox (
  id INTEGER PRIMARY KEY AUTOINCREMENT,
  session_id TEXT NOT NULL,
  message_id TEXT NOT NULL UNIQUE,
  frame_json TEXT NOT NULL,
  created_at INTEGER NOT NULL,
  last_attempt INTEGER,
  retry_count INTEGER DEFAULT 0,
  state TEXT DEFAULT 'PENDING',
  last_error TEXT
);

CREATE INDEX idx_outbox_state ON outbox(state);
CREATE INDEX idx_outbox_session ON outbox(session_id);
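
For TypeScript code that reads or writes this table, a row could be modeled roughly as below. This is a sketch: `OutboxRow` and `newOutboxRow` are illustrative names and are not claimed to be AGENIUM exports. The fields mirror the columns above, and timestamps are assumed to be epoch milliseconds, as in the `Date.now()` cutoff used in the cleanup example later on this page.

```typescript
// Hypothetical row type mirroring the outbox table (not an AGENIUM export).
interface OutboxRow {
  id?: number;                 // assigned by SQLite (AUTOINCREMENT)
  session_id: string;
  message_id: string;          // UNIQUE: one row per message
  frame_json: string;          // serialized frame
  created_at: number;          // epoch milliseconds (assumed unit)
  last_attempt: number | null; // null until the first send attempt
  retry_count: number;
  state: 'PENDING' | 'SENDING' | 'SENT' | 'ACKED' | 'FAILED';
  last_error: string | null;
}

// Build a fresh row using the same defaults the schema declares.
function newOutboxRow(
  sessionId: string,
  messageId: string,
  frame: unknown,
  now: number
): OutboxRow {
  return {
    session_id: sessionId,
    message_id: messageId,
    frame_json: JSON.stringify(frame),
    created_at: now,
    last_attempt: null,
    retry_count: 0,
    state: 'PENDING',
    last_error: null,
  };
}
```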

States

State     Description
PENDING   Queued, not yet sent
SENDING   Send in progress
SENT      Sent, awaiting ACK
ACKED     Acknowledged (can delete)
FAILED    Max retries exceeded
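
The states and the flow diagram together suggest a small state machine. The transition table below is inferred from the diagram and from the manual-retry example later on this page. It is an assumption about how the states relate, not a description of AGENIUM's internal code.

```typescript
type OutboxState = 'PENDING' | 'SENDING' | 'SENT' | 'ACKED' | 'FAILED';

// Allowed transitions, inferred from the flow diagram (assumption).
const TRANSITIONS: Record<OutboxState, readonly OutboxState[]> = {
  PENDING: ['SENDING'],
  SENDING: ['SENT', 'PENDING', 'FAILED'], // sent, retry later, or retries exhausted
  SENT: ['ACKED', 'PENDING'],             // ACK received, or ACK timeout and resend
  ACKED: [],                              // terminal: the row can be deleted
  FAILED: ['PENDING'],                    // manual retry
};

function canTransition(from: OutboxState, to: OutboxState): boolean {
  return TRANSITIONS[from].indexOf(to) !== -1;
}
```

Guarding every state update with a check like `canTransition` makes it harder for a late ACK or a stale retry to move a row backwards by accident.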

Outbox Manager

Creating the Manager

typescript
import { createOutboxManager, createDatabase } from 'agenium';

const db = createDatabase('alice', './data');
db.open();

const outbox = createOutboxManager(db);

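// Set send function (assumes `client`, `host`, and `port` are defined elsewhere)
outbox.setSendFunction(async (sessionId, frame) => {
  try {
    // Actual network send
    const response = await client.request(host, port, {
      method: 'POST',
      path: '/message',
      body: JSON.stringify(frame)
    });

    const ok = response.status === 200;
    return {
      success: ok,
      error: ok ? undefined : `HTTP ${response.status}`,
      isRetryable: response.status >= 500  // Retry on server errors
    };
  } catch (err) {
    // If the request itself throws (timeout, connection refused), report it as retryable
    return { success: false, error: String(err), isRetryable: true };
  }
});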

// Start background processing
outbox.start();

Enqueuing Messages

typescript
// Enqueue a request
const result = outbox.enqueue(sessionId, requestFrame);

if (result.success) {
  console.log('Message queued:', result.messageId);
} else {
  console.error('Enqueue failed:', result.error);
}

Delivery Guarantees

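  • At-least-once: Each message is delivered one or more times, or ends in the FAILED state once retries are exhausted; it is never silently dropped
  • Ordering: Messages within a session are sent in order
  • Idempotency: Receivers must handle duplicates, because a retry can redeliver a message whose ACK was lost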

Retry Strategy

Exponential Backoff

typescript
const RETRY_DELAYS = [
  1_000,    // 1 second
  2_000,    // 2 seconds
  4_000,    // 4 seconds
  8_000,    // 8 seconds
  16_000,   // 16 seconds
  32_000,   // 32 seconds
  60_000    // 1 minute (max)
];

function getRetryDelay(retryCount: number): number {
  const index = Math.min(retryCount, RETRY_DELAYS.length - 1);
  return RETRY_DELAYS[index];
}
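
To see what this schedule means in practice, the sketch below adds two helpers on top of the same delay table. `isDue` and `totalBackoff` are hypothetical names introduced here for illustration. How AGENIUM actually decides that a retry is due is not shown in the source.

```typescript
const RETRY_DELAYS = [1_000, 2_000, 4_000, 8_000, 16_000, 32_000, 60_000];

function getRetryDelay(retryCount: number): number {
  // Clamp to the table so large retry counts reuse the 1 minute cap.
  const index = Math.min(Math.max(retryCount, 0), RETRY_DELAYS.length - 1);
  return RETRY_DELAYS[index] ?? 60_000;
}

// A message is due once the delay for its current retry count has elapsed.
function isDue(lastAttempt: number | null, retryCount: number, now: number): boolean {
  if (lastAttempt === null) return true; // never attempted
  return now - lastAttempt >= getRetryDelay(retryCount);
}

// Total time spent waiting if `attempts` consecutive retries all fail.
function totalBackoff(attempts: number): number {
  let total = 0;
  for (let i = 0; i < attempts; i++) total += getRetryDelay(i);
  return total;
}
```

Under this sketch, `totalBackoff(10)` is 303,000 ms (1 + 2 + 4 + 8 + 16 + 32 + 4 × 60 seconds), so with ten retries a message to an unreachable agent is given up on after roughly five minutes.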

Max Retries

typescript
const MAX_RETRIES = 10;

if (retryCount >= MAX_RETRIES) {
  // Mark as FAILED
  outbox.markFailed(messageId, 'Max retries exceeded');
  
  // Notify application
  emit('message_failed', { messageId, sessionId, error: 'Max retries' });
}
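
The retry limit and the `isRetryable` flag returned by the send function can be combined into one decision. The sketch below is an assumption about how the two interact: the source does not say what happens on a non-retryable failure, so treating it as an immediate FAILED is a guess. `stateAfterFailure` is a hypothetical helper name.

```typescript
interface SendResult {
  success: boolean;
  error?: string;
  isRetryable?: boolean;
}

// Decide where a message goes after a failed send.
// `retryCount` is the count including the attempt that just failed.
function stateAfterFailure(
  result: SendResult,
  retryCount: number,
  maxRetries: number = 10
): 'PENDING' | 'FAILED' {
  if (result.isRetryable === false) return 'FAILED'; // assumed: no point retrying
  return retryCount >= maxRetries ? 'FAILED' : 'PENDING';
}
```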

Acknowledgments (ACK)

Receiving ACK

When a message is successfully processed, the receiver sends an ACK event:

typescript
// Receiver side
agent.onRequest('method', async (params, sessionId) => {
  // Process request...
  
  // Automatic ACK sent after processing
  return { result: 'ok' };
});

// Or for events:
agent.onEvent('notification', async (data, sessionId, messageId) => {
  // Process event...
  
  // Send ACK
  await agent.event(sessionId, 'ACK', { msgId: messageId });
});

Processing ACK

typescript
// Built-in ACK handler
agent.onEvent('ACK', (data, sessionId) => {
  const msgId = data?.msgId;
  if (msgId && outbox) {
    outbox.ack(msgId);  // Mark as acknowledged
  }
});
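
The flow diagram also shows a timeout branch: a message that was sent but never acknowledged is sent again. A minimal sketch of how such messages could be found is shown below. `findAckTimeouts` and the `ackTimeoutMs` setting are hypothetical, since the source does not name an ACK timeout option.

```typescript
interface SentRow {
  message_id: string;
  state: string;
  last_attempt: number | null; // epoch milliseconds of the last send
}

// Return the ids of SENT messages whose ACK is overdue.
function findAckTimeouts(rows: SentRow[], now: number, ackTimeoutMs: number): string[] {
  return rows
    .filter(
      (r) =>
        r.state === 'SENT' &&
        r.last_attempt !== null &&
        now - r.last_attempt >= ackTimeoutMs
    )
    .map((r) => r.message_id);
}
```

Messages returned here would go back to PENDING and be resent, which is exactly the case where the receiver may see a duplicate.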

Deduplication

Recipients must deduplicate messages:

Server-Side Deduplication

typescript
// Check if already processed
if (db.isDuplicate(messageId, sessionId)) {
  console.log('Duplicate message, skipping');
  return { ok: true, duplicate: true };
}

// Process the message first, so a crash mid-handler leaves it eligible for redelivery
await handleMessage(frame);

// Mark as processed only after the handler succeeds
db.markProcessed(messageId, sessionId);

Deduplication Table

sql
CREATE TABLE processed_messages (
  message_id TEXT NOT NULL,
  session_id TEXT NOT NULL,
  processed_at INTEGER NOT NULL,
  PRIMARY KEY (message_id, session_id)
);

-- Periodic cleanup of entries older than 7 days (bind ? to the cutoff timestamp)
DELETE FROM processed_messages 
WHERE processed_at < ?;
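
The same check, mark, and cleanup cycle can be tried without a database. The sketch below uses an in-memory map as a stand-in for `processed_messages`. `ProcessedStore` and `handleOnce` are illustrative names, and the handler is synchronous for brevity.

```typescript
// In-memory stand-in for the processed_messages table (illustrative only).
class ProcessedStore {
  private readonly seen = new Map<string, number>(); // key -> processed_at (ms)

  private key(messageId: string, sessionId: string): string {
    return sessionId + '\u0000' + messageId; // mirrors the composite primary key
  }

  isDuplicate(messageId: string, sessionId: string): boolean {
    return this.seen.has(this.key(messageId, sessionId));
  }

  markProcessed(messageId: string, sessionId: string, now: number): void {
    this.seen.set(this.key(messageId, sessionId), now);
  }

  // Equivalent of: DELETE FROM processed_messages WHERE processed_at < cutoff
  cleanup(cutoff: number): number {
    let removed = 0;
    this.seen.forEach((processedAt, k) => {
      if (processedAt < cutoff) {
        this.seen.delete(k);
        removed += 1;
      }
    });
    return removed;
  }
}

// Run the handler at most once per (messageId, sessionId) pair.
function handleOnce(
  store: ProcessedStore,
  messageId: string,
  sessionId: string,
  now: number,
  handler: () => void
): 'processed' | 'duplicate' {
  if (store.isDuplicate(messageId, sessionId)) return 'duplicate';
  handler(); // if this throws, the message is not marked and can be redelivered
  store.markProcessed(messageId, sessionId, now);
  return 'processed';
}
```

The retention window has to be longer than the longest time a sender might keep retrying. Otherwise a late retry arrives after its dedup entry has been cleaned up and is processed twice.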

Events

The outbox emits events for monitoring:

typescript
outbox.on('sent', (info) => {
  console.log('Message sent:', info.messageId);
});

outbox.on('failed', (info) => {
  console.error('Message failed:', info.messageId, info.error);
});

outbox.on('acked', (info) => {
  console.log('Message acknowledged:', info.messageId);
});

outbox.on('retry', (info) => {
  console.log('Retrying message:', info.messageId, 'attempt', info.retryCount);
});

Statistics

Monitor outbox health:

typescript
const stats = outbox.getStats();
console.log(stats);
/*
{
  pending: 5,       // Messages queued
  sending: 2,       // Currently sending
  sent: 10,         // Sent, awaiting ACK
  acked: 1234,      // Total acknowledged
  failed: 3,        // Max retries exceeded
  totalRetries: 45  // Total retry attempts
}
*/

Configuration

Queue Limits

typescript
const outbox = createOutboxManager(db, {
  maxPending: 1000,        // Max messages in PENDING state
  maxRetries: 10,          // Give up after 10 retries
  processingInterval: 1000 // Check queue every 1s
});

Backpressure

When the queue is full, new messages are rejected:

typescript
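// Option 1: wait and retry once.
// (Option 2, dropping the message, is not recommended;
//  option 3 is pushing to an external queue such as Redis.)
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

async function enqueueWithBackpressure(sessionId: string, frame: unknown) {
  const result = outbox.enqueue(sessionId, frame);

  if (!result.success && result.error === 'QUEUE_FULL') {
    console.error('Outbox full, apply backpressure!');
    await sleep(1000);
    return outbox.enqueue(sessionId, frame);
  }

  return result;
}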
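
The `maxPending` limit and the `QUEUE_FULL` rejection can be modeled with a small bounded queue. The sketch below is an in-memory illustration of the idea, not AGENIUM's implementation. `BoundedOutbox` and `release` are hypothetical names.

```typescript
interface EnqueueResult {
  success: boolean;
  messageId?: string;
  error?: string;
}

// In-memory stand-in for the outbox that enforces only the maxPending limit.
class BoundedOutbox {
  private readonly maxPending: number;
  private readonly pending = new Map<string, unknown>(); // messageId -> frame
  private nextId = 1;

  constructor(maxPending: number) {
    this.maxPending = maxPending;
  }

  enqueue(frame: unknown): EnqueueResult {
    if (this.pending.size >= this.maxPending) {
      // Reject instead of growing without bound; the caller applies backpressure.
      return { success: false, error: 'QUEUE_FULL' };
    }
    const messageId = 'msg-' + this.nextId++;
    this.pending.set(messageId, frame);
    return { success: true, messageId };
  }

  // Called once a message leaves PENDING, which frees a slot.
  release(messageId: string): boolean {
    return this.pending.delete(messageId);
  }
}
```

Rejecting at enqueue time pushes the decision back to the caller, which is usually the only place that knows whether a message can be delayed, dropped, or rerouted.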

Advanced Usage

Manual Retry

typescript
// Manually retry a failed message
const message = db.getOutboxMessage(messageId);

if (message?.state === 'FAILED') {
  // Reset retry count
  db.updateOutboxState(messageId, 'PENDING', 0);
  
  // Will be picked up by next processing cycle
}

Priority Queuing

typescript
// Add priority field to schema (run once, as a migration)
db.prepare('ALTER TABLE outbox ADD COLUMN priority INTEGER DEFAULT 0').run();

// Process high-priority messages first
const messages = db.prepare(`
  SELECT * FROM outbox 
  WHERE state = 'PENDING' 
  ORDER BY priority DESC, created_at ASC 
  LIMIT 10
`).all();
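
For code that already holds pending messages in memory, the same ordering can be expressed as a comparator. This is a sketch with illustrative names (`QueuedMessage`, `byPriorityThenAge`, `nextBatch`) that mirrors the `ORDER BY priority DESC, created_at ASC LIMIT 10` clause above.

```typescript
interface QueuedMessage {
  message_id: string;
  priority: number;   // higher value is sent first
  created_at: number; // epoch milliseconds
}

// Same ordering as: ORDER BY priority DESC, created_at ASC
function byPriorityThenAge(a: QueuedMessage, b: QueuedMessage): number {
  if (a.priority !== b.priority) return b.priority - a.priority;
  return a.created_at - b.created_at;
}

// Pick the next batch without mutating the caller's array.
function nextBatch(pending: QueuedMessage[], limit: number = 10): QueuedMessage[] {
  return pending.slice().sort(byPriorityThenAge).slice(0, limit);
}
```

Ordering by priority means a newer high-priority message can overtake an older message in the same session, so this trades away strict in-session ordering for the messages it reorders.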

Dead Letter Queue

typescript
// Move failed messages to a separate table with the same columns
db.prepare('CREATE TABLE IF NOT EXISTS dead_letter_queue AS SELECT * FROM outbox WHERE 0').run();

// After max retries
if (retryCount >= MAX_RETRIES) {
  // messageId matches the TEXT message_id column, not the integer id
  db.prepare('INSERT INTO dead_letter_queue SELECT * FROM outbox WHERE message_id = ?')
    .run(messageId);

  db.prepare('DELETE FROM outbox WHERE message_id = ?').run(messageId);
}

Error Recovery

Agent Restart

1. Agent crashes with messages in outbox
2. On restart, outbox manager loads pending messages
3. Resumes sending from where it left off
4. No messages are lost

typescript
// Automatic on startup
outbox.start();

// Loads all PENDING and SENT messages
// Resumes processing
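
The source says PENDING and SENT messages are loaded on startup but does not say what happens to a row that was still in SENDING when the agent crashed. One plausible approach, shown here purely as an assumption, is to treat such a row as interrupted and put it back to PENDING. `recoverOnStartup` is a hypothetical helper.

```typescript
type OutboxState = 'PENDING' | 'SENDING' | 'SENT' | 'ACKED' | 'FAILED';

interface RecoverableRow {
  message_id: string;
  state: OutboxState;
}

// Return the rows that should resume processing after a restart.
function recoverOnStartup(rows: RecoverableRow[]): RecoverableRow[] {
  const resumable: RecoverableRow[] = [];
  for (const row of rows) {
    if (row.state === 'SENDING') {
      // Assumption: an interrupted send is retried from PENDING.
      resumable.push({ message_id: row.message_id, state: 'PENDING' });
    } else if (row.state === 'PENDING' || row.state === 'SENT') {
      resumable.push(row);
    }
    // ACKED and FAILED rows are left alone.
  }
  return resumable;
}
```

Resending an interrupted message may deliver it twice, which is acceptable under at-least-once delivery as long as the receiver deduplicates.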

Network Partition

1. Network fails during send
2. Message stays in PENDING state
3. Retry with exponential backoff
4. Network recovers
5. Message delivered successfully

Remote Agent Unavailable

1. Remote agent is down
2. All sends fail with connection errors
3. Messages accumulate in outbox
4. Retries continue with backoff
5. Remote agent comes back online
6. All pending messages delivered

Best Practices

1. Always Enable Persistence

typescript
// ✅ Recommended
const agent = createAgent('alice', {
  persistence: true  // Enables outbox
});

// ❌ Not recommended (messages can be lost)
const agent = createAgent('alice', {
  persistence: false
});

2. Handle Duplicates

typescript
// ✅ Idempotent handler
agent.onRequest('update_status', async (params, sessionId) => {
  // Check if already processed
  const existing = db.getStatus(params.id);
  if (existing?.version >= params.version) {
    return { ok: true, duplicate: true };
  }
  
  // Process update
  db.updateStatus(params.id, params.status, params.version);
  return { ok: true };
});

3. Monitor Queue Depth

typescript
setInterval(() => {
  const stats = outbox.getStats();
  
  if (stats.pending > 100) {
    console.warn('Outbox depth high:', stats.pending);
  }
  
  if (stats.failed > 10) {
    console.error('Too many failed messages!');
  }
}, 60_000);
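
Pulling the threshold checks into a pure function makes them easy to unit test and to reuse from an alerting hook. The sketch below assumes the stats shape shown in the Statistics section. `checkHealth` and its `limits` parameter are hypothetical names.

```typescript
interface OutboxStats {
  pending: number;
  sending: number;
  sent: number;
  acked: number;
  failed: number;
  totalRetries: number;
}

interface HealthLimits {
  maxPending: number;
  maxFailed: number;
}

// Return a list of human-readable alerts; an empty list means healthy.
function checkHealth(
  stats: OutboxStats,
  limits: HealthLimits = { maxPending: 100, maxFailed: 10 }
): string[] {
  const alerts: string[] = [];
  if (stats.pending > limits.maxPending) {
    alerts.push('Outbox depth high: ' + stats.pending);
  }
  if (stats.failed > limits.maxFailed) {
    alerts.push('Too many failed messages: ' + stats.failed);
  }
  return alerts;
}
```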

4. Clean Up Old Messages

typescript
// Periodically delete old ACKed messages
setInterval(() => {
  const cutoff = Date.now() - 7 * 24 * 60 * 60 * 1000; // 7 days
  db.prepare("DELETE FROM outbox WHERE state = 'ACKED' AND created_at < ?")
    .run(cutoff);
}, 24 * 60 * 60 * 1000); // Daily

5. Implement Circuit Breakers

typescript
// Fail fast while the remote agent is down
// (CircuitBreaker and actualSend are assumed to be defined elsewhere)
const breaker = new CircuitBreaker({ failureThreshold: 5 });

outbox.setSendFunction(async (sessionId, frame) => {
  return breaker.execute(() => actualSend(sessionId, frame));
});
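
The snippet above does not show where `CircuitBreaker` comes from. A minimal sketch of one is given below so the idea is concrete. The `resetTimeoutMs` and `now` options are assumptions added for illustration, and a production system would more likely use an established library.

```typescript
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class CircuitBreaker {
  private failures = 0;
  private openedAt: number | null = null;
  private readonly failureThreshold: number;
  private readonly resetTimeoutMs: number;
  private readonly now: () => number;

  constructor(opts: { failureThreshold: number; resetTimeoutMs?: number; now?: () => number }) {
    this.failureThreshold = opts.failureThreshold;
    this.resetTimeoutMs = opts.resetTimeoutMs ?? 30_000;
    this.now = opts.now ?? (() => Date.now());
  }

  state(): BreakerState {
    if (this.openedAt === null) return 'CLOSED';
    return this.now() - this.openedAt >= this.resetTimeoutMs ? 'HALF_OPEN' : 'OPEN';
  }

  recordSuccess(): void {
    this.failures = 0;
    this.openedAt = null;
  }

  recordFailure(): void {
    this.failures += 1;
    // Opens on reaching the threshold; a failure while HALF_OPEN re-opens it.
    if (this.failures >= this.failureThreshold) this.openedAt = this.now();
  }

  // Run fn unless the breaker is OPEN; HALF_OPEN lets calls through as trials.
  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state() === 'OPEN') throw new Error('Circuit open');
    try {
      const result = await fn();
      this.recordSuccess();
      return result;
    } catch (err) {
      this.recordFailure();
      throw err;
    }
  }
}
```

Because `execute` throws while the circuit is open, a send function built on it would probably need to catch that error and return a retryable failure result rather than letting the exception escape.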

Performance Considerations

Batch Processing

typescript
// Process multiple messages in parallel
// (note: parallel sends can complete out of order within a session)
const pending = db.getPendingMessages(10);

await Promise.all(
  pending.map(msg => outbox.send(msg))
);

Batching Sends

typescript
// Group multiple messages into one request
const batch = db.getPendingMessages(50);

const batchFrame = {
  type: 'BATCH',
  messages: batch.map(m => m.frame)
};

await client.request(host, port, {
  method: 'POST',
  path: '/batch',
  body: JSON.stringify(batchFrame)
});
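
When more messages are pending than fit in one request, the backlog can be split into fixed-size groups first. The `chunk` helper below is a generic sketch and not part of AGENIUM. The batch size of 50 above would be passed as `size`.

```typescript
// Split a list into consecutive groups of at most `size` items, preserving order.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const groups: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    groups.push(items.slice(i, i + size));
  }
  return groups;
}
```

Sending the groups one after another, rather than in parallel, keeps messages in their original order across batches.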

Index Optimization

sql
-- Speed up queue queries
CREATE INDEX idx_outbox_processing 
  ON outbox(state, created_at);

-- ACK lookups by message_id already use the automatic index that
-- SQLite creates for the UNIQUE constraint, so no extra index is needed.

Released under the MIT License.