RabbitMQ Fallback Replay Service

Overview

The RabbitmqFallbackReplayService automatically replays messages stored in the Redis fallback queue once RabbitMQ recovers from an outage, so messages buffered during the outage are republished without manual intervention.

How It Works

Message Flow

Normal Operation:
  Event → RabbitMQ → Success ✅

Circuit Breaker OPEN (RabbitMQ down):
  Event → Fallback Queue (Redis) + DLQ 📦

RabbitMQ Recovers:
  Circuit Breaker → HALF_OPEN → CLOSED ✅
  Replay Service → Read Fallback Queue → Republish → Success ✅

Architecture

graph TD
    A[Message Fails] --> B[Stored in Redis Fallback Queue]
    B --> C{RabbitMQ Recovered?}
    C -->|No| D[Wait for CRON Job]
    D --> C
    C -->|Yes| E[Replay Service Triggered]
    E --> F[SCAN Redis Keys]
    F --> G[Parse Routing Key + MessageID]
    G --> H[Read Payload]
    H --> I{Replay Success?}
    I -->|Yes| J[Delete from Fallback]
    I -->|No| K[Keep in Fallback + DLQ]
    J --> L[Process Next Message]
    K --> L

Configuration

Environment Variables

# Enable/disable automatic CRON job replay
RABBITMQ_FALLBACK_REPLAY_ENABLED=true   # Enable automatic replay
RABBITMQ_FALLBACK_REPLAY_ENABLED=false  # Disable (manual trigger only)
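
A minimal sketch of how the flag might be interpreted, assuming that only the exact value "true" enables the CRON job and that an unset variable counts as disabled. isReplayEnabled and EnvSource are hypothetical names used for illustration; the real service may read the value differently (for example through NestJS ConfigService).

```typescript
// Hypothetical helper, not the service's actual code.
type EnvSource = Record<string, string | undefined>;

function isReplayEnabled(env: EnvSource): boolean {
  const raw = env['RABBITMQ_FALLBACK_REPLAY_ENABLED'];
  // Assumption: only the exact string "true" enables automatic replay;
  // anything else (including an unset variable) leaves it disabled.
  return raw === 'true';
}
```

Passing the environment in as an argument keeps the check easy to unit test without touching process-wide state.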

CRON Schedule

  • Automatic Replay: Every 5 minutes when RABBITMQ_FALLBACK_REPLAY_ENABLED=true
  • Batch Size: 100 messages per CRON run (manual runs can set a different limit via the API)

Redis Key Format

Fallback messages are stored with this pattern:

rabbitmq:fallback:{routingKey}:{messageId}

Examples:

rabbitmq:fallback:stats.ad.click:a1b2c3d4-uuid
rabbitmq:fallback:stats.video.click:e5f6g7h8-uuid
rabbitmq:fallback:stats.ad.impression:i9j0k1l2-uuid
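
The key format can be built and parsed with two small helpers. This is a sketch under the assumption that the messageId never contains a colon, so the last colon in the key separates it from the routing key; buildFallbackKey and parseFallbackKey are hypothetical names.

```typescript
const FALLBACK_PREFIX = 'rabbitmq:fallback:';

interface FallbackKeyParts {
  routingKey: string;
  messageId: string;
}

function buildFallbackKey(routingKey: string, messageId: string): string {
  return `${FALLBACK_PREFIX}${routingKey}:${messageId}`;
}

// Returns null for keys that do not match the documented pattern.
function parseFallbackKey(key: string): FallbackKeyParts | null {
  if (!key.startsWith(FALLBACK_PREFIX)) return null;
  const rest = key.slice(FALLBACK_PREFIX.length);
  // Assumption: messageId has no colon, so split on the LAST colon.
  const sep = rest.lastIndexOf(':');
  if (sep <= 0 || sep === rest.length - 1) return null;
  return { routingKey: rest.slice(0, sep), messageId: rest.slice(sep + 1) };
}
```

Splitting on the last colon rather than the third keeps the parser working even if a routing key were ever to contain a colon.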

Payload Structure:

{
  "messageId": "a1b2c3d4-uuid",
  "uid": "user123",
  "adId": "ad456",
  "adType": "banner",
  "clickedAt": "2025-01-01T12:00:00Z",
  "ip": "192.168.1.1"
}

TTL: 24 hours (86400 seconds)
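
Redis removes a key once its TTL elapses, so a message that is not replayed within 24 hours is no longer present in the fallback queue. The sketch below types the example payload and works through the TTL arithmetic. The field names come from the example above (other routing keys may carry different fields), and remainingTtlSeconds is a hypothetical helper.

```typescript
// Illustrative type based on the example payload for an ad click event.
interface AdClickFallbackPayload {
  messageId: string;
  uid: string;
  adId: string;
  adType: string;
  clickedAt: string; // ISO 8601 timestamp
  ip: string;
}

const FALLBACK_TTL_SECONDS = 24 * 60 * 60; // 86400

// Hypothetical helper: seconds left before Redis expires a message stored
// at `storedAtMs`. Returns 0 once the TTL has elapsed.
function remainingTtlSeconds(storedAtMs: number, nowMs: number): number {
  const elapsedSeconds = Math.floor((nowMs - storedAtMs) / 1000);
  return Math.max(0, FALLBACK_TTL_SECONDS - elapsedSeconds);
}
```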

API Endpoints

All endpoints require JWT authentication (@UseGuards(JwtAuthGuard)).

1. Manual Replay

POST /rabbitmq/fallback/replay?limit={number}

Manually trigger replay of fallback messages.

Query Parameters:

  • limit (optional): Maximum messages to replay (default: 100)

Response:

{
  "scanned": 150, // Total keys found
  "replayed": 95, // Successfully replayed
  "failed": 5, // Failed to replay
  "deleted": 95 // Successfully deleted from fallback
}

Example:

curl -X POST \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  "http://localhost:3000/rabbitmq/fallback/replay?limit=200"

2. Get Status

GET /rabbitmq/fallback/status

Get current replay service status.

Response:

{
  "enabled": true, // CRON job enabled
  "isReplaying": false // Currently running
}

3. Get Queue Size

GET /rabbitmq/fallback/queue-size

Get count of messages currently in fallback queue.

Response:

{
  "count": 42
}
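
The three endpoints above can be wrapped in a small typed client. This is a sketch, not part of the service: createFallbackClient and HttpCall are hypothetical names, and the HTTP function is injected so the wrapper can be exercised with a fake instead of a live server.

```typescript
interface ReplayStats { scanned: number; replayed: number; failed: number; deleted: number; }
interface ReplayStatus { enabled: boolean; isReplaying: boolean; }

// Minimal subset of the Fetch API that this sketch relies on.
type HttpCall = (
  url: string,
  init: { method: string; headers: Record<string, string> },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

// Hypothetical client wrapper around the documented endpoints.
function createFallbackClient(http: HttpCall, baseUrl: string, token: string) {
  const call = async (method: string, path: string): Promise<unknown> => {
    const res = await http(`${baseUrl}${path}`, {
      method,
      headers: { Authorization: `Bearer ${token}` }, // all endpoints require a JWT
    });
    if (!res.ok) throw new Error(`${method} ${path} failed with HTTP ${res.status}`);
    return res.json();
  };
  return {
    replay: (limit = 100) =>
      call('POST', `/rabbitmq/fallback/replay?limit=${limit}`) as Promise<ReplayStats>,
    status: () => call('GET', '/rabbitmq/fallback/status') as Promise<ReplayStatus>,
    queueSize: async () =>
      ((await call('GET', '/rabbitmq/fallback/queue-size')) as { count: number }).count,
  };
}
```

In a runtime that provides a global fetch, it can be passed as the http argument, since it accepts a URL string plus an init object and resolves to an object with ok, status, and json().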

Service Methods

Public API

replayOnce(limit: number = 100)

Manually trigger replay of fallback messages.

Parameters:

  • limit - Maximum number of messages to replay (default: 100)

Returns:

{
  scanned: number; // Total keys found
  replayed: number; // Successfully replayed
  failed: number; // Failed to replay
  deleted: number; // Successfully deleted
}

Example:

const stats = await replayService.replayOnce(200);
console.log(`Replayed ${stats.replayed}/${stats.scanned} messages`);
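
A minimal sketch of how the returned statistics might be aggregated, assuming the fallback keys have already been collected and that a per-message replay function resolves to { success, deleted }. replayBatch and its parameters are hypothetical names, not the service's actual internals.

```typescript
interface ReplayStats { scanned: number; replayed: number; failed: number; deleted: number; }
interface ReplayOutcome { success: boolean; deleted: boolean; }

async function replayBatch(
  keys: string[],
  limit: number,
  replayMessage: (key: string) => Promise<ReplayOutcome>,
): Promise<ReplayStats> {
  // scanned counts every key found, even those beyond the batch limit.
  const stats: ReplayStats = { scanned: keys.length, replayed: 0, failed: 0, deleted: 0 };
  for (const key of keys.slice(0, limit)) {
    try {
      const outcome = await replayMessage(key);
      if (outcome.success) stats.replayed++;
      else stats.failed++;
      if (outcome.deleted) stats.deleted++;
    } catch {
      // Assumption: an unexpected error counts as a failed replay
      // and does not abort the rest of the batch.
      stats.failed++;
    }
  }
  return stats;
}
```

Messages are processed one at a time here, which keeps the load on a recovering broker predictable at the cost of throughput.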

getStatus()

Get current replay service status.

Returns:

{
  enabled: boolean; // CRON job enabled
  isReplaying: boolean; // Currently running
}

getFallbackQueueSize()

Get count of messages in fallback queue using cursor-based SCAN.

Returns: Promise<number>
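
A sketch of cursor-based counting, assuming a single SCAN step is available as a function that takes a cursor and returns the next cursor plus a page of keys. ScanStep and countFallbackKeys are hypothetical names; the abstraction exists only so the example does not depend on a specific Redis client.

```typescript
// One SCAN step: takes a cursor, returns [nextCursor, keys].
type ScanStep = (
  cursor: string,
  pattern: string,
  count: number,
) => Promise<[string, string[]]>;

async function countFallbackKeys(
  scanStep: ScanStep,
  pattern = 'rabbitmq:fallback:*',
): Promise<number> {
  // SCAN may return the same key more than once, so de-duplicate.
  const seen = new Set<string>();
  let cursor = '0';
  do {
    const [nextCursor, keys] = await scanStep(cursor, pattern, 100);
    for (const key of keys) seen.add(key);
    cursor = nextCursor;
  } while (cursor !== '0'); // a returned cursor of '0' ends the iteration
  return seen.size;
}
```

Holding every key in a Set trades memory for an exact count; for very large queues a plain counter that tolerates occasional duplicates may be the better choice.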

Internal Implementation

automaticReplay() - CRON Job

Runs every 5 minutes when RABBITMQ_FALLBACK_REPLAY_ENABLED=true.

  • Replays up to 100 messages per run
  • Skips if already running (prevents concurrent execution)
  • Logs summary statistics at INFO level

replayMessage(key: string) - Private

Replays a single message from the fallback queue.

Process:

  1. Parse key format: rabbitmq:fallback:{routingKey}:{messageId}
  2. Extract routingKey and messageId
  3. Read payload from Redis using redis.getJson()
  4. Call publisher.replayFallbackMessage(routingKey, payload, messageId)
  5. Delete key on success using redis.del()

Returns:

{
  success: boolean; // Replay succeeded
  deleted: boolean; // Key deleted from fallback
}
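
The five steps above can be sketched as a standalone function. The RedisLike and PublisherLike interfaces are assumed minimal shapes for illustration (in particular, replayFallbackMessage is assumed to resolve to true when RabbitMQ accepted the message); the real services may differ.

```typescript
// Assumed minimal shapes of the collaborators.
interface RedisLike {
  getJson(key: string): Promise<unknown>;
  del(key: string): Promise<number>;
}
interface PublisherLike {
  replayFallbackMessage(routingKey: string, payload: unknown, messageId: string): Promise<boolean>;
}
interface ReplayOutcome { success: boolean; deleted: boolean; }

async function replayMessage(
  key: string,
  redis: RedisLike,
  publisher: PublisherLike,
): Promise<ReplayOutcome> {
  // Steps 1-2: split the key into routingKey and messageId (last colon).
  const prefix = 'rabbitmq:fallback:';
  const rest = key.startsWith(prefix) ? key.slice(prefix.length) : '';
  const sep = rest.lastIndexOf(':');
  if (sep <= 0 || sep === rest.length - 1) return { success: false, deleted: false };
  const routingKey = rest.slice(0, sep);
  const messageId = rest.slice(sep + 1);

  // Step 3: read the payload; it may have expired since the key was scanned.
  const payload = await redis.getJson(key);
  if (payload === null || payload === undefined) return { success: false, deleted: false };

  // Step 4: republish; on failure the key stays in place for a later run.
  const accepted = await publisher.replayFallbackMessage(routingKey, payload, messageId);
  if (!accepted) return { success: false, deleted: false };

  // Step 5: delete only after a successful publish.
  const removed = await redis.del(key);
  return { success: true, deleted: removed > 0 };
}
```

Deleting only after the publish succeeds means a crash between the two steps can cause a duplicate publish rather than a lost message, which is why publisher idempotency matters.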

Integration

Module Setup

// apps/box-app-api/src/rabbitmq/rabbitmq.module.ts
@Module({
  imports: [ConfigModule, RedisModule, AuthModule],
  controllers: [RabbitmqFallbackReplayController],
  providers: [RabbitmqPublisherService, RabbitmqFallbackReplayService],
  exports: [RabbitmqPublisherService, RabbitmqFallbackReplayService],
})
export class RabbitmqModule {}

App Module Setup

// apps/box-app-api/src/app.module.ts
@Module({
  imports: [
    ScheduleModule.forRoot(), // Required for @Cron decorator
    RabbitmqModule,
    // ... other modules
  ],
})
export class AppModule {}

Error Handling

Infinite Loop Prevention

The replay service prevents infinite loops through the context parameter, which lets the publisher distinguish a replay publish from a normal one:

// Normal publish
context = 'stats.ad.click adId=123';

// Replay publish
context = 'fallback-replay routingKey=stats.ad.click';

Behavior:

  • Normal publish failures → Store to fallback + DLQ
  • Replay publish failures → Only DLQ (no re-storage to fallback)
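
The behavior above can be sketched as a single decision function, assuming a replay publish is recognised by the "fallback-replay" prefix of its context string as in the example. actionsOnPublishFailure and FailureAction are hypothetical names.

```typescript
type FailureAction = 'store-in-fallback' | 'send-to-dlq';

// Assumption: the context prefix is the only signal that marks a replay.
function actionsOnPublishFailure(context: string): FailureAction[] {
  const isReplay = context.startsWith('fallback-replay');
  // A failed replay goes to the DLQ only; it is never written back
  // to the fallback queue.
  return isReplay ? ['send-to-dlq'] : ['store-in-fallback', 'send-to-dlq'];
}
```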

Circuit Breaker Integration

Replay respects the circuit breaker state:

  • OPEN: Skip replay (RabbitMQ still down)
  • HALF_OPEN: Attempt replay (testing recovery)
  • CLOSED: Full replay (RabbitMQ healthy)
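
A sketch of how a run might gate on the breaker state. It assumes HALF_OPEN and CLOSED use the same batch limit, which this document does not specify; selectKeysForReplay and CircuitState are hypothetical names.

```typescript
type CircuitState = 'OPEN' | 'HALF_OPEN' | 'CLOSED';

// Hypothetical gate: returns the keys a run should attempt for a given state.
function selectKeysForReplay(
  state: CircuitState,
  pendingKeys: string[],
  limit: number,
): string[] {
  if (state === 'OPEN') return []; // RabbitMQ still down: skip this run
  // HALF_OPEN and CLOSED both attempt replay, capped at the batch limit.
  return pendingKeys.slice(0, limit);
}
```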

Concurrent Execution Prevention

The service uses an isReplaying flag to prevent concurrent runs:

if (this.isReplaying) {
  this.logger.warn('Replay already in progress, skipping this run');
  return { scanned: 0, replayed: 0, failed: 0, deleted: 0 };
}
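
The snippet above shows only the guard check. A fuller sketch of the pattern follows, assuming the flag is reset in a finally block so that a failed run cannot leave the service locked. ReplayGuard and runExclusive are hypothetical names, not the service's actual code.

```typescript
class ReplayGuard {
  private isReplaying = false;

  // Runs `task` unless another run is in progress, in which case
  // `skippedValue` is returned immediately.
  async runExclusive<T>(task: () => Promise<T>, skippedValue: T): Promise<T> {
    if (this.isReplaying) return skippedValue;
    this.isReplaying = true;
    try {
      return await task();
    } finally {
      // Always release the flag, even when the task throws.
      this.isReplaying = false;
    }
  }
}
```

An in-memory flag only protects a single process. If several application instances run the CRON job, they would need a shared lock (for example in Redis) to get the same guarantee.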

Performance Considerations

SCAN vs KEYS

The service uses cursor-based SCAN instead of KEYS to avoid blocking Redis:

// ✅ Good: incremental cursor iteration, bounded work per call
const pattern = 'rabbitmq:fallback:*';
let cursor = '0';
do {
  const result = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
  cursor = result[0]; // next cursor; '0' means the iteration is complete
  const keys = result[1];
  // Process keys...
} while (cursor !== '0');

// ❌ Bad: walks the entire keyspace in a single blocking call
const keys = await this.redis.keys('rabbitmq:fallback:*');

Batch Processing

  • SCAN Batch Size: 100 keys per iteration
  • Replay Batch Size: Configurable via limit parameter
  • Default Limit: 100 messages per CRON run

Logging

  • DEBUG Level: Individual message details
  • INFO Level: Summary statistics only
  • ERROR Level: Failures with stack traces

Example Output:

[INFO] Starting fallback replay (limit: 100)...
[INFO] Found 150 fallback messages, processing 100...
[INFO] Fallback replay completed: scanned=150, replayed=95, failed=5, deleted=95, duration=2345ms

Monitoring

Health Checks

Monitor these metrics:

  1. Queue Size: /rabbitmq/fallback/queue-size
    • Alert if > 1000 messages
  2. Replay Status: /rabbitmq/fallback/status
    • Alert if enabled=false in production
  3. Replay Success Rate: replayed / (replayed + failed)
    • Alert if < 90%
    • Do not divide by scanned: it counts every key found and can exceed the batch limit
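
The thresholds above can be sketched as a single check. The success rate here is computed over attempted messages (replayed + failed), an assumption made because scanned counts every key found and can exceed the batch limit. evaluateAlerts, ReplayHealthSnapshot, and the alert names are hypothetical.

```typescript
interface ReplayHealthSnapshot {
  queueSize: number; // from GET /rabbitmq/fallback/queue-size
  enabled: boolean;  // from GET /rabbitmq/fallback/status
  replayed: number;  // from the latest replay summary
  failed: number;    // from the latest replay summary
}

function evaluateAlerts(snapshot: ReplayHealthSnapshot, isProduction: boolean): string[] {
  const alerts: string[] = [];
  if (snapshot.queueSize > 1000) alerts.push('queue-size-high');
  if (isProduction && !snapshot.enabled) alerts.push('replay-disabled');
  const attempted = snapshot.replayed + snapshot.failed;
  // Skip the rate check when nothing was attempted (avoids dividing by zero).
  if (attempted > 0 && snapshot.replayed / attempted < 0.9) alerts.push('success-rate-low');
  return alerts;
}
```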

Log Analysis

Search for these patterns:

# Successful replays
grep "Fallback replay completed" app.log

# Failed replays
grep "Error replaying message" app.log

# Circuit breaker blocking replays
grep "Circuit breaker OPEN" app.log

Troubleshooting

Problem: Messages stuck in fallback queue

Diagnosis:

curl -H "Authorization: Bearer TOKEN" \
  http://localhost:3000/rabbitmq/fallback/queue-size

Solutions:

  1. Check RabbitMQ health
  2. Check circuit breaker state
  3. Manually trigger replay with higher limit:

    curl -X POST -H "Authorization: Bearer TOKEN" \
     "http://localhost:3000/rabbitmq/fallback/replay?limit=1000"
    

Problem: Replay failing continuously

Diagnosis: Check logs for error patterns:

grep "Error replaying message" app.log | tail -n 20

Common Causes:

  • RabbitMQ still unhealthy (circuit breaker OPEN)
  • Invalid payload format
  • Network connectivity issues

Solutions:

  1. Verify RabbitMQ is fully recovered
  2. Check circuit breaker state in logs
  3. Inspect DLQ for problematic messages

Problem: CRON job not running

Diagnosis:

curl -H "Authorization: Bearer TOKEN" \
  http://localhost:3000/rabbitmq/fallback/status

Solutions:

  1. Check environment variable:

    echo $RABBITMQ_FALLBACK_REPLAY_ENABLED  # Should be "true"
    
  2. Verify ScheduleModule is imported in AppModule

  3. Check application logs for CRON job messages

Best Practices

Production Deployment

  1. Enable CRON Job:

    RABBITMQ_FALLBACK_REPLAY_ENABLED=true
    
  2. Monitor Queue Size:

    • Set up alerts for queue size > 1000
    • Indicates persistent RabbitMQ issues
  3. Rate Limit Manual Triggers:

    • Avoid triggering replay with limit > 1000
    • Can overwhelm recovering RabbitMQ broker
  4. Review DLQ Regularly:

    • Messages that fail replay go to DLQ
    • Inspect for systematic issues

Development/Testing

  1. Disable CRON Job:

    RABBITMQ_FALLBACK_REPLAY_ENABLED=false
    
  2. Test Manual Replay:

    curl -X POST -H "Authorization: Bearer TOKEN" \
     "http://localhost:3000/rabbitmq/fallback/replay?limit=10"
    
  3. Simulate Failures:

    • Stop RabbitMQ → Generate events → Check fallback queue
    • Start RabbitMQ → Trigger manual replay → Verify success

Complete Recovery Flow

sequenceDiagram
    participant App as Application
    participant CB as Circuit Breaker
    participant RQ as Redis Fallback Queue
    participant MQ as RabbitMQ
    participant DLQ as Dead Letter Queue
    participant RS as Replay Service

    Note over MQ: RabbitMQ goes down
    App->>CB: publishStatsEvent()
    CB->>CB: Record failure
    CB->>CB: State → OPEN
    App->>RQ: storeInFallbackQueue()
    App->>DLQ: sendToDLQ()

    Note over MQ: RabbitMQ recovers
    RS->>RQ: SCAN for fallback keys
    RQ-->>RS: Return keys
    RS->>RS: Parse routingKey & messageId
    RS->>RQ: getJson(key)
    RQ-->>RS: Return payload
    RS->>CB: replayFallbackMessage()
    CB->>CB: Check state (HALF_OPEN)
    CB->>MQ: Publish message
    MQ-->>CB: ACK
    CB->>CB: Record success
    CB->>CB: State → CLOSED
    CB-->>RS: Success
    RS->>RQ: del(key)
    RQ-->>RS: Deleted

Summary

The RabbitmqFallbackReplayService provides automated recovery from RabbitMQ outages:

  • Automatic: CRON job runs every 5 minutes
  • Manual Control: API endpoints for on-demand replay
  • Safe: Prevents infinite loops and concurrent execution
  • Efficient: Uses SCAN instead of KEYS
  • Observable: Status API and detailed logging
  • Production-Ready: Integrated with circuit breaker and error handling

Data Loss Prevention: 6 layers of protection

  1. Retry with exponential backoff
  2. Circuit breaker with auto-recovery
  3. Redis fallback queue ← This service
  4. Dead Letter Queue
  5. Publisher idempotency
  6. Message TTL