The RabbitmqFallbackReplayService automatically replays messages stored in the Redis fallback queue once RabbitMQ recovers from an outage. With the CRON job enabled, recovery needs no manual intervention.
Normal Operation:
Event → RabbitMQ → Success ✅
Circuit Breaker OPEN (RabbitMQ down):
Event → Fallback Queue (Redis) → DLQ 📦
RabbitMQ Recovers:
Circuit Breaker → HALF_OPEN → CLOSED ✅
Replay Service → Read Fallback Queue → Republish → Success ✅
graph TD
A[Message Fails] --> B[Stored in Redis Fallback Queue]
B --> C{RabbitMQ Recovered?}
C -->|No| D[Wait for CRON Job]
D --> C
C -->|Yes| E[Replay Service Triggered]
E --> F[SCAN Redis Keys]
F --> G[Parse Routing Key + MessageID]
G --> H[Read Payload]
H --> I{Replay Success?}
I -->|Yes| J[Delete from Fallback]
I -->|No| K[Keep in Fallback + DLQ]
J --> L[Process Next Message]
K --> L
# Enable/disable automatic CRON job replay
RABBITMQ_FALLBACK_REPLAY_ENABLED=true # Enable automatic replay
RABBITMQ_FALLBACK_REPLAY_ENABLED=false # Disable (manual trigger only)
Fallback messages are stored with this pattern:
rabbitmq:fallback:{routingKey}:{messageId}
Examples:
rabbitmq:fallback:stats.ad.click:a1b2c3d4-uuid
rabbitmq:fallback:stats.video.click:e5f6g7h8-uuid
rabbitmq:fallback:stats.ad.impression:i9j0k1l2-uuid
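Because routing keys use dots and the fields are joined with colons, the routing key and message ID can be recovered from a key by splitting on the last colon. The helper below is a hypothetical sketch (parseFallbackKey and ParsedFallbackKey are illustrative names), assuming message IDs never contain a colon, which holds for UUIDs.

```typescript
// Hypothetical helper: split a fallback key into routing key and message ID.
// Assumes the message ID contains no ':' (true for UUIDs). Splitting on the LAST
// ':' keeps the result correct even if a routing key ever contained a colon.
const FALLBACK_PREFIX = 'rabbitmq:fallback:';

interface ParsedFallbackKey {
  routingKey: string;
  messageId: string;
}

function parseFallbackKey(key: string): ParsedFallbackKey | null {
  if (!key.startsWith(FALLBACK_PREFIX)) return null;
  const rest = key.slice(FALLBACK_PREFIX.length);
  const sep = rest.lastIndexOf(':');
  // Reject keys with an empty routing key or an empty message ID.
  if (sep <= 0 || sep === rest.length - 1) return null;
  return { routingKey: rest.slice(0, sep), messageId: rest.slice(sep + 1) };
}

console.log(parseFallbackKey('rabbitmq:fallback:stats.ad.click:a1b2c3d4-uuid'));
// { routingKey: 'stats.ad.click', messageId: 'a1b2c3d4-uuid' }
```

Returning null for malformed keys lets a replay loop skip a bad entry instead of publishing to an empty routing key.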
Payload Structure:
{
"messageId": "a1b2c3d4-uuid",
"uid": "user123",
"adId": "ad456",
"adType": "banner",
"clickedAt": "2025-01-01T12:00:00Z",
"ip": "192.168.1.1"
}
TTL: 24 hours (86400 seconds)
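Writing an entry combines the key pattern, the JSON payload and the 24-hour TTL in one Redis call. This is a minimal sketch, not the publisher's actual code: FallbackWriter is a hypothetical interface shaped like ioredis's set(key, value, 'EX', seconds) overload, AdClickPayload mirrors the example payload above, and MemoryWriter is an in-memory stand-in for illustration only.

```typescript
// Minimal sketch: store a payload under rabbitmq:fallback:{routingKey}:{messageId}
// with a 24 h expiry. FallbackWriter is a hypothetical interface, not a library type.
const FALLBACK_TTL_SECONDS = 24 * 60 * 60; // 86400

interface AdClickPayload {
  messageId: string;
  uid: string;
  adId: string;
  adType: string;
  clickedAt: string; // ISO-8601 timestamp
  ip: string;
}

interface FallbackWriter {
  set(key: string, value: string, mode: 'EX', seconds: number): Promise<unknown>;
}

function fallbackKey(routingKey: string, messageId: string): string {
  return `rabbitmq:fallback:${routingKey}:${messageId}`;
}

async function storeInFallbackQueue(
  redis: FallbackWriter,
  routingKey: string,
  payload: AdClickPayload,
): Promise<string> {
  const key = fallbackKey(routingKey, payload.messageId);
  // JSON-encode the payload and let Redis expire the key after 24 hours.
  await redis.set(key, JSON.stringify(payload), 'EX', FALLBACK_TTL_SECONDS);
  return key;
}

// In-memory stand-in that records each write and its TTL.
class MemoryWriter implements FallbackWriter {
  readonly entries = new Map<string, { value: string; ttl: number }>();

  async set(key: string, value: string, _mode: 'EX', seconds: number): Promise<'OK'> {
    this.entries.set(key, { value, ttl: seconds });
    return 'OK';
  }
}
```

Setting the TTL in the same SET call avoids a window where a key exists without an expiry.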
All endpoints require JWT authentication (@UseGuards(JwtAuthGuard)).
POST /rabbitmq/fallback/replay?limit={number}
Manually trigger replay of fallback messages.
Query Parameters:
limit (optional): Maximum messages to replay (default: 100)
Response:
{
"scanned": 150, // Total keys found
"replayed": 95, // Successfully replayed
"failed": 5, // Failed to replay
"deleted": 95 // Successfully deleted from fallback
}
Example:
curl -X POST \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
"http://localhost:3000/rabbitmq/fallback/replay?limit=200"
GET /rabbitmq/fallback/status
Get current replay service status.
Response:
{
"enabled": true, // CRON job enabled
"isReplaying": false // Currently running
}
GET /rabbitmq/fallback/queue-size
Get count of messages currently in fallback queue.
Response:
{
"count": 42
}
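The three endpoints can also be called from TypeScript instead of curl. The client below is a hypothetical sketch: FallbackReplayClient and FetchLike are illustrative names, and the caller injects a fetch implementation (the global fetch in Node 18+ should fit the FetchLike shape). It assumes the JWT is sent as a Bearer token, as in the curl examples.

```typescript
// Hypothetical typed client for the replay, status and queue-size endpoints.
interface ReplayStats { scanned: number; replayed: number; failed: number; deleted: number; }
interface ReplayStatus { enabled: boolean; isReplaying: boolean; }

// Minimal fetch shape so the sketch does not depend on DOM typings.
type FetchLike = (
  url: string,
  init: { method: string; headers: Record<string, string> },
) => Promise<{ ok: boolean; status: number; json(): Promise<unknown> }>;

class FallbackReplayClient {
  private readonly baseUrl: string;
  private readonly token: string;
  private readonly fetchFn: FetchLike;

  constructor(baseUrl: string, token: string, fetchFn: FetchLike) {
    this.baseUrl = baseUrl.replace(/\/+$/, ''); // drop trailing slashes
    this.token = token; // JWT accepted by JwtAuthGuard
    this.fetchFn = fetchFn;
  }

  private async call<T>(method: string, path: string): Promise<T> {
    const res = await this.fetchFn(`${this.baseUrl}${path}`, {
      method,
      headers: { Authorization: `Bearer ${this.token}` },
    });
    if (!res.ok) throw new Error(`${method} ${path} failed with HTTP ${res.status}`);
    return (await res.json()) as T;
  }

  // POST /rabbitmq/fallback/replay?limit={number}
  replay(limit = 100): Promise<ReplayStats> {
    return this.call<ReplayStats>('POST', `/rabbitmq/fallback/replay?limit=${encodeURIComponent(String(limit))}`);
  }

  // GET /rabbitmq/fallback/status
  status(): Promise<ReplayStatus> {
    return this.call<ReplayStatus>('GET', '/rabbitmq/fallback/status');
  }

  // GET /rabbitmq/fallback/queue-size, unwrapped to a plain number
  async queueSize(): Promise<number> {
    const body = await this.call<{ count: number }>('GET', '/rabbitmq/fallback/queue-size');
    return body.count;
  }
}
```

For example, `new FallbackReplayClient('http://localhost:3000', token, fetch).replay(200)` sends the same request as the curl example above.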
replayOnce(limit: number = 100)
Manually trigger replay of fallback messages.
Parameters:
limit - Maximum number of messages to replay (default: 100)
Returns:
{
scanned: number; // Total keys found
replayed: number; // Successfully replayed
failed: number; // Failed to replay
deleted: number; // Successfully deleted
}
Example:
const stats = await replayService.replayOnce(200);
console.log(`Replayed ${stats.replayed}/${stats.scanned} messages`);
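The counters returned by replayOnce can be understood through a small model of one run: every key found is counted as scanned, at most limit keys are attempted, and each attempt adds to replayed or failed (and to deleted when the key was removed). This is a sketch under assumptions: replayOnceSketch, listKeys and the injected replayMessage signature are illustrative, and the concurrency guard is left out.

```typescript
// Sketch of replayOnce bookkeeping with injected dependencies (names are illustrative).
interface ReplayStats { scanned: number; replayed: number; failed: number; deleted: number; }
interface ReplayResult { success: boolean; deleted: boolean; }

async function replayOnceSketch(
  listKeys: () => Promise<string[]>, // e.g. a SCAN loop over rabbitmq:fallback:*
  replayMessage: (key: string) => Promise<ReplayResult>,
  limit = 100,
): Promise<ReplayStats> {
  const keys = await listKeys();
  const stats: ReplayStats = { scanned: keys.length, replayed: 0, failed: 0, deleted: 0 };
  // Attempt at most `limit` keys; the remainder waits for the next run.
  for (const key of keys.slice(0, limit)) {
    try {
      const result = await replayMessage(key);
      if (result.success) stats.replayed++;
      else stats.failed++;
      if (result.deleted) stats.deleted++;
    } catch {
      // A thrown error counts as a failure; the key stays in Redis for a later run.
      stats.failed++;
    }
  }
  return stats;
}
```

Keys are replayed one at a time, which keeps the load on a broker that has only just recovered predictable; with 150 keys and the default limit, a run attempts 100 of them.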
getStatus()
Get current replay service status.
Returns:
{
enabled: boolean; // CRON job enabled
isReplaying: boolean; // Currently running
}
getFallbackQueueSize()
Get count of messages in fallback queue using cursor-based SCAN.
Returns: Promise<number>
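A SCAN-based count has one subtlety: Redis may return the same key more than once during a full iteration, so counting keys page by page can overshoot. The sketch below de-duplicates with a Set. ScanLike and countFallbackKeys are hypothetical names; ScanLike only mirrors the call shape of ioredis's scan(cursor, 'MATCH', pattern, 'COUNT', n).

```typescript
// Sketch: count fallback keys with a cursor loop, de-duplicating repeated keys.
type ScanLike = (
  cursor: string,
  matchToken: 'MATCH',
  pattern: string,
  countToken: 'COUNT',
  count: number,
) => Promise<[string, string[]]>;

async function countFallbackKeys(scan: ScanLike, pattern = 'rabbitmq:fallback:*'): Promise<number> {
  const seen = new Set<string>();
  let cursor = '0';
  do {
    // COUNT is a hint for how much work each call does, not an exact page size.
    const [next, keys] = await scan(cursor, 'MATCH', pattern, 'COUNT', 100);
    for (const key of keys) seen.add(key);
    cursor = next;
  } while (cursor !== '0'); // a returned cursor of '0' ends the iteration
  return seen.size;
}
```

The cost is one round trip per page, so for very large queues the count is best treated as a monitoring value rather than something to poll in a tight loop.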
automaticReplay() - CRON Job
Runs every 5 minutes when RABBITMQ_FALLBACK_REPLAY_ENABLED=true.
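The CRON handler only needs to decide whether to run. In NestJS, the handler would typically carry a decorator from @nestjs/schedule such as @Cron(CronExpression.EVERY_5_MINUTES); that wiring is omitted so the sketch runs without Nest. isReplayEnabled and shouldRunAutomaticReplay are hypothetical helpers, and treating "true" case-insensitively is an assumption.

```typescript
// Hypothetical gate for the scheduled handler.
type Env = Record<string, string | undefined>;

function isReplayEnabled(env: Env): boolean {
  // Assumption: "true" (ignoring case and surrounding whitespace) enables replay;
  // any other value, including an unset variable, disables it.
  return (env.RABBITMQ_FALLBACK_REPLAY_ENABLED ?? '').trim().toLowerCase() === 'true';
}

function shouldRunAutomaticReplay(env: Env, isReplaying: boolean): boolean {
  // Skip when disabled, or when a previous run (scheduled or manual) is still active.
  return isReplayEnabled(env) && !isReplaying;
}
```

In the service this would be called as `isReplayEnabled(process.env)`; when it returns false, replay still works through the manual endpoint.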
replayMessage(key: string) - Private
Replays a single message from the fallback queue.
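Process:
1. Parse the key rabbitmq:fallback:{routingKey}:{messageId} to extract routingKey and messageId
2. Read the payload with redis.getJson()
3. Republish with publisher.replayFallbackMessage(routingKey, payload, messageId)
4. Delete the key with redis.del() once the publish succeeds
Returns: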
{
success: boolean; // Replay succeeded
deleted: boolean; // Key deleted from fallback
}
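The four steps can be modeled in one function. This is a sketch, not the service's code: the method names getJson, del and replayFallbackMessage come from the steps above, but the FallbackStore and ReplayPublisher interfaces, the boolean publish result and null for a missing key are assumptions.

```typescript
// Sketch of replayMessage with Redis and the publisher behind hypothetical interfaces.
interface FallbackStore {
  getJson(key: string): Promise<unknown>; // assumed to resolve to null when the key is gone
  del(key: string): Promise<number>; // number of keys removed
}
interface ReplayPublisher {
  replayFallbackMessage(routingKey: string, payload: unknown, messageId: string): Promise<boolean>;
}
interface ReplayResult { success: boolean; deleted: boolean; }

const PREFIX = 'rabbitmq:fallback:';
const FAILED: ReplayResult = { success: false, deleted: false };

async function replayMessage(key: string, redis: FallbackStore, publisher: ReplayPublisher): Promise<ReplayResult> {
  // 1. Parse routingKey and messageId (split on the last ':').
  if (!key.startsWith(PREFIX)) return FAILED;
  const rest = key.slice(PREFIX.length);
  const sep = rest.lastIndexOf(':');
  if (sep <= 0 || sep === rest.length - 1) return FAILED;
  const routingKey = rest.slice(0, sep);
  const messageId = rest.slice(sep + 1);

  // 2. Read the payload; null usually means the 24 h TTL already expired.
  const payload = await redis.getJson(key);
  if (payload === null) return FAILED;

  // 3. Republish; on failure the key is kept so a later run can retry it.
  const published = await publisher.replayFallbackMessage(routingKey, payload, messageId);
  if (!published) return FAILED;

  // 4. Delete only after a successful publish.
  const removed = await redis.del(key);
  return { success: true, deleted: removed === 1 };
}
```

Deleting after publishing gives at-least-once delivery: if the process stops between steps 3 and 4, the next run republishes the same message, so consumers should tolerate duplicates by messageId.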
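// apps/box-app-api/src/rabbitmq/rabbitmq.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config'; // assumes the standard Nest config package
// RedisModule, AuthModule, the controller and both services are project-local imports (paths omitted)

@Module({
  imports: [ConfigModule, RedisModule, AuthModule],
  controllers: [RabbitmqFallbackReplayController],
  providers: [RabbitmqPublisherService, RabbitmqFallbackReplayService],
  exports: [RabbitmqPublisherService, RabbitmqFallbackReplayService],
})
export class RabbitmqModule {}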
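// apps/box-app-api/src/app.module.ts
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';
import { RabbitmqModule } from './rabbitmq/rabbitmq.module';

@Module({
  imports: [
    ScheduleModule.forRoot(), // Required for @Cron decorator
    RabbitmqModule,
    // ... other modules
  ],
})
export class AppModule {}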
The replay service prevents infinite loops through the context parameter:
// Normal publish
context = 'stats.ad.click adId=123';
// Replay publish
context = 'fallback-replay routingKey=stats.ad.click';
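One way the context string can stop the loop is for the failure path to branch on it: a failed normal publish is copied into the fallback queue, while a failed replay publish is not, because its original key is still in Redis. This is an illustrative sketch, not the publisher's actual code; onPublishFailure, FailureAction and the prefix check are hypothetical.

```typescript
// Illustrative sketch: decide what to do with a failed publish based on its context.
const REPLAY_CONTEXT_PREFIX = 'fallback-replay';

type FailureAction = 'store-in-fallback' | 'keep-existing-key';

function onPublishFailure(context: string): FailureAction {
  // Replays already have a fallback key, so writing another copy would let
  // a message bounce between the publisher and the fallback queue indefinitely.
  return context.startsWith(REPLAY_CONTEXT_PREFIX) ? 'keep-existing-key' : 'store-in-fallback';
}

console.log(onPublishFailure('stats.ad.click adId=123')); // store-in-fallback
console.log(onPublishFailure('fallback-replay routingKey=stats.ad.click')); // keep-existing-key
```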
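Behavior: replay publishes carry the fallback-replay context, so a replay that fails is not written back to the fallback queue as a new entry; the original key stays in Redis until a later run succeeds or its 24-hour TTL expires.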
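Replay respects the circuit breaker state: while the breaker is OPEN, replay publishes are blocked (logged as "Circuit breaker OPEN") and the messages stay in the fallback queue; in HALF_OPEN, a replay publish acts as a trial request, and a successful ACK moves the breaker back to CLOSED.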
The service uses an isReplaying flag to prevent concurrent runs:
if (this.isReplaying) {
this.logger.warn('Replay already in progress, skipping this run');
return { scanned: 0, replayed: 0, failed: 0, deleted: 0 };
}
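The guard above only works if the flag is always cleared, including when a run throws. The sketch below wraps the run in try/finally; ReplayGuard is a hypothetical name. Because the check and the assignment happen synchronously, two overlapping calls in the same Node.js process cannot both pass it, but each application instance has its own flag, so it does not coordinate multiple replicas.

```typescript
// Sketch of a single-flight guard around one replay run.
interface ReplayStats { scanned: number; replayed: number; failed: number; deleted: number; }

class ReplayGuard {
  private isReplaying = false;

  async run(task: () => Promise<ReplayStats>): Promise<ReplayStats> {
    if (this.isReplaying) {
      // Mirrors the early return above: report an empty run instead of waiting.
      return { scanned: 0, replayed: 0, failed: 0, deleted: 0 };
    }
    this.isReplaying = true; // set before the first await, so no other call can slip in
    try {
      return await task();
    } finally {
      this.isReplaying = false; // cleared even if the task throws
    }
  }

  get busy(): boolean {
    return this.isReplaying;
  }
}
```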
The service uses cursor-based SCAN instead of KEYS to avoid blocking Redis:
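// ✅ Good: incremental cursor iteration; each SCAN call does a bounded amount of work
const pattern = 'rabbitmq:fallback:*';
let cursor = '0';
do {
  // ioredis returns [nextCursor, keys]; COUNT is a hint, not an exact page size
  const [nextCursor, keys] = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
  cursor = nextCursor;
  // Process keys...
} while (cursor !== '0');

// ❌ Bad: KEYS walks the whole keyspace in one call and blocks Redis until it finishes
const keys = await this.redis.keys('rabbitmq:fallback:*');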
Each run processes at most limit messages; keys beyond the limit stay in Redis for the next run.
Example Output:
[INFO] Starting fallback replay (limit: 100)...
[INFO] Found 150 fallback messages, processing 100...
[INFO] Fallback replay completed: scanned=150, replayed=95, failed=5, deleted=95, duration=2345ms
Monitor these metrics:
Queue size: /rabbitmq/fallback/queue-size
Replay status: /rabbitmq/fallback/status (alert if enabled=false in production)
Success rate: replayed / scanned
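The success-rate metric can be computed from the stats returned by a replay run. Because scanned counts every key found while only up to limit keys are attempted, replayed / scanned also drops when a backlog simply has not been processed yet. This hypothetical helper therefore returns an attempted-based ratio next to it; treating an empty run as a rate of 1 is an assumption.

```typescript
// Hypothetical helper: two views of replay success for one run.
interface ReplayStats { scanned: number; replayed: number; failed: number; deleted: number; }

function replayRates(s: ReplayStats): { ofScanned: number; ofAttempted: number } {
  const attempted = s.replayed + s.failed;
  return {
    // Metric as listed above; an empty queue is treated as healthy (assumption).
    ofScanned: s.scanned === 0 ? 1 : s.replayed / s.scanned,
    // Ignores keys left for the next run because of the limit.
    ofAttempted: attempted === 0 ? 1 : s.replayed / attempted,
  };
}
```

For the run in the example log (scanned=150, replayed=95, failed=5), ofScanned is about 0.63 while ofAttempted is 0.95, which separates a backlog from genuine publish failures.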
Search for these patterns:
# Successful replays
grep "Fallback replay completed" app.log
# Failed replays
grep "Error replaying message" app.log
# Circuit breaker blocking replays
grep "Circuit breaker OPEN" app.log
Diagnosis:
curl -H "Authorization: Bearer TOKEN" \
http://localhost:3000/rabbitmq/fallback/queue-size
Solutions:
Manually trigger replay with higher limit:
curl -X POST -H "Authorization: Bearer TOKEN" \
"http://localhost:3000/rabbitmq/fallback/replay?limit=1000"
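For a large backlog, a single manual call may not be enough. The sketch below repeats bounded replays until the queue is empty, a run removes nothing (for example because the breaker is still OPEN), or a run cap is reached. drainFallbackQueue, its parameters and the defaults are hypothetical; replay can be any function that triggers one run, such as a call to the POST endpoint. A zero-stat response from the concurrency guard also stops the loop, since it reports scanned=0.

```typescript
// Hypothetical drain loop over repeated, bounded replay runs.
interface ReplayStats { scanned: number; replayed: number; failed: number; deleted: number; }

async function drainFallbackQueue(
  replay: (limit: number) => Promise<ReplayStats>,
  limit = 1000,
  maxRuns = 10,
): Promise<ReplayStats[]> {
  const runs: ReplayStats[] = [];
  for (let i = 0; i < maxRuns; i++) {
    const stats = await replay(limit);
    runs.push(stats);
    // Stop when nothing was found or nothing was removed in this pass.
    if (stats.scanned === 0 || stats.deleted === 0) break;
  }
  return runs;
}
```

Stopping on a pass that deletes nothing avoids hammering a broker that is still unavailable; the scheduled job will pick the remaining keys up later.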
Diagnosis: Check logs for error patterns:
grep "Error replaying message" app.log | tail -n 20
Common Causes:
Solutions:
Diagnosis:
curl -H "Authorization: Bearer TOKEN" \
http://localhost:3000/rabbitmq/fallback/status
Solutions:
Check environment variable:
echo $RABBITMQ_FALLBACK_REPLAY_ENABLED # Should be "true"
Verify ScheduleModule is imported in AppModule
Check application logs for CRON job messages
Enable CRON Job:
RABBITMQ_FALLBACK_REPLAY_ENABLED=true
Monitor Queue Size:
Rate Limit Manual Triggers:
Review DLQ Regularly:
Disable CRON Job:
RABBITMQ_FALLBACK_REPLAY_ENABLED=false
Test Manual Replay:
curl -X POST -H "Authorization: Bearer TOKEN" \
"http://localhost:3000/rabbitmq/fallback/replay?limit=10"
Simulate Failures:
sequenceDiagram
participant App as Application
participant CB as Circuit Breaker
participant RQ as Redis Fallback Queue
participant MQ as RabbitMQ
participant DLQ as Dead Letter Queue
participant RS as Replay Service
Note over MQ: RabbitMQ goes down
App->>CB: publishStatsEvent()
CB->>CB: Record failure
CB->>CB: State → OPEN
App->>RQ: storeInFallbackQueue()
App->>DLQ: sendToDLQ()
Note over MQ: RabbitMQ recovers
RS->>RQ: SCAN for fallback keys
RQ-->>RS: Return keys
RS->>RS: Parse routingKey & messageId
RS->>RQ: getJson(key)
RQ-->>RS: Return payload
RS->>CB: replayFallbackMessage()
CB->>CB: Check state (HALF_OPEN)
CB->>MQ: Publish message
MQ-->>CB: ACK
CB->>CB: Record success
CB->>CB: State → CLOSED
CB-->>RS: Success
RS->>RQ: del(key)
RQ-->>RS: Deleted
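The breaker transitions in the diagram (CLOSED to OPEN on failures, OPEN to HALF_OPEN after a cool-down, HALF_OPEN to CLOSED on a successful publish) can be modeled as a small state machine. This is a minimal sketch for illustration; the CircuitBreaker class, the failure threshold of 5 and the 30-second cool-down are assumptions, not the publisher's real API or settings.

```typescript
// Minimal circuit-breaker state machine (illustrative values and names).
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

class CircuitBreaker {
  private state: BreakerState = 'CLOSED';
  private failures = 0;
  private openedAt = 0;
  private readonly failureThreshold: number;
  private readonly coolDownMs: number;
  private readonly now: () => number;

  constructor(failureThreshold = 5, coolDownMs = 30_000, now: () => number = Date.now) {
    this.failureThreshold = failureThreshold;
    this.coolDownMs = coolDownMs;
    this.now = now; // injectable clock for testing
  }

  getState(): BreakerState {
    // An OPEN breaker becomes HALF_OPEN once the cool-down has elapsed.
    if (this.state === 'OPEN' && this.now() - this.openedAt >= this.coolDownMs) {
      this.state = 'HALF_OPEN';
    }
    return this.state;
  }

  canAttempt(): boolean {
    return this.getState() !== 'OPEN';
  }

  recordSuccess(): void {
    this.failures = 0;
    this.state = 'CLOSED';
  }

  recordFailure(): void {
    this.failures++;
    // A failed trial in HALF_OPEN, or too many failures while CLOSED, opens the breaker.
    if (this.state === 'HALF_OPEN' || this.failures >= this.failureThreshold) {
      this.state = 'OPEN';
      this.openedAt = this.now();
    }
  }
}
```

A replay run would call canAttempt() before each publish and record the outcome afterwards, which is why replays stay blocked while the breaker is OPEN.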
The RabbitmqFallbackReplayService provides automated recovery from RabbitMQ outages:
✅ Automatic: CRON job runs every 5 minutes
✅ Manual Control: API endpoints for on-demand replay
✅ Safe: Prevents infinite replay loops and concurrent runs within a single instance
✅ Efficient: Uses SCAN instead of KEYS
✅ Observable: Status API and detailed logging
✅ Production-Ready: Integrated with circuit breaker and error handling
Data Loss Prevention: 6 layers of protection