# RabbitMQ Fallback Replay Service

## Overview

The **RabbitmqFallbackReplayService** automatically replays messages stored in the Redis fallback queue once RabbitMQ recovers from an outage, so recovery requires no manual intervention.

## How It Works

### Message Flow

```
Normal Operation:
  Event → RabbitMQ → Success ✅

Circuit Breaker OPEN (RabbitMQ down):
  Event → Fallback Queue (Redis) → DLQ 📦

RabbitMQ Recovers:
  Circuit Breaker → HALF_OPEN → CLOSED ✅
  Replay Service → Read Fallback Queue → Republish → Success ✅
```

### Architecture

```mermaid
graph TD
    A[Message Fails] --> B[Stored in Redis Fallback Queue]
    B --> C{RabbitMQ Recovered?}
    C -->|No| D[Wait for CRON Job]
    D --> C
    C -->|Yes| E[Replay Service Triggered]
    E --> F[SCAN Redis Keys]
    F --> G[Parse Routing Key + MessageID]
    G --> H[Read Payload]
    H --> I{Replay Success?}
    I -->|Yes| J[Delete from Fallback]
    I -->|No| K[Keep in Fallback + DLQ]
    J --> L[Process Next Message]
    K --> L
```

## Configuration

### Environment Variables

```bash
# Enable/disable automatic CRON job replay
RABBITMQ_FALLBACK_REPLAY_ENABLED=true   # Enable automatic replay
RABBITMQ_FALLBACK_REPLAY_ENABLED=false  # Disable (manual trigger only)
```

### CRON Schedule

- **Automatic Replay**: Every 5 minutes when `RABBITMQ_FALLBACK_REPLAY_ENABLED=true`
- **Batch Size**: 100 messages per run (configurable via API)

## Redis Key Format

Fallback messages are stored under keys with this pattern:

```
rabbitmq:fallback:{routingKey}:{messageId}
```

**Examples:**

```
rabbitmq:fallback:stats.ad.click:a1b2c3d4-uuid
rabbitmq:fallback:stats.video.click:e5f6g7h8-uuid
rabbitmq:fallback:stats.ad.impression:i9j0k1l2-uuid
```

**Payload Structure:**

```json
{
  "messageId": "a1b2c3d4-uuid",
  "uid": "user123",
  "adId": "ad456",
  "adType": "banner",
  "clickedAt": "2025-01-01T12:00:00Z",
  "ip": "192.168.1.1"
}
```

**TTL:** 24 hours (86400 seconds)

## API Endpoints

All endpoints require JWT authentication (`@UseGuards(JwtAuthGuard)`).

### 1. Manual Replay

**POST** `/rabbitmq/fallback/replay?limit={number}`

Manually trigger a replay of fallback messages.

**Query Parameters:**

- `limit` (optional): Maximum messages to replay (default: 100)

**Response:**

```jsonc
{
  "scanned": 150,  // Total keys found
  "replayed": 100, // Successfully replayed
  "failed": 5,     // Failed to replay
  "deleted": 100   // Successfully deleted from fallback
}
```

**Example:**

```bash
curl -X POST \
  -H "Authorization: Bearer YOUR_JWT_TOKEN" \
  "http://localhost:3000/rabbitmq/fallback/replay?limit=200"
```

### 2. Get Status

**GET** `/rabbitmq/fallback/status`

Get the current replay service status.

**Response:**

```jsonc
{
  "enabled": true,      // CRON job enabled
  "isReplaying": false  // Currently running
}
```

### 3. Get Queue Size

**GET** `/rabbitmq/fallback/queue-size`

Get the number of messages currently in the fallback queue.

**Response:**

```json
{
  "count": 42
}
```

## Service Methods

### Public API

#### `replayOnce(limit: number = 100)`

Manually trigger a replay of fallback messages.

**Parameters:**

- `limit` - Maximum number of messages to replay (default: 100)

**Returns:**

```typescript
{
  scanned: number;  // Total keys found
  replayed: number; // Successfully replayed
  failed: number;   // Failed to replay
  deleted: number;  // Successfully deleted
}
```

**Example:**

```typescript
const stats = await replayService.replayOnce(200);
console.log(`Replayed ${stats.replayed}/${stats.scanned} messages`);
```

#### `getStatus()`

Get the current replay service status.

**Returns:**

```typescript
{
  enabled: boolean;     // CRON job enabled
  isReplaying: boolean; // Currently running
}
```

#### `getFallbackQueueSize()`

Get the number of messages in the fallback queue using cursor-based SCAN.

**Returns:** `Promise<number>`

### Internal Implementation

#### `automaticReplay()` - CRON Job

Runs every 5 minutes when `RABBITMQ_FALLBACK_REPLAY_ENABLED=true`.

- Replays up to 100 messages per run
- Skips if already running (prevents concurrent execution)
- Logs summary statistics at INFO level

#### `replayMessage(key: string)` - Private

Replays a single message from the fallback queue.

**Process:**

1. Parse key format: `rabbitmq:fallback:{routingKey}:{messageId}`
2. Extract `routingKey` and `messageId`
3. Read payload from Redis using `redis.getJson()`
4. Call `publisher.replayFallbackMessage(routingKey, payload, messageId)`
5. Delete key on success using `redis.del()`

**Returns:**

```typescript
{
  success: boolean; // Replay succeeded
  deleted: boolean; // Key deleted from fallback
}
```

## Integration

### Module Setup

```typescript
// apps/box-app-api/src/rabbitmq/rabbitmq.module.ts
import { Module } from '@nestjs/common';
// Imports of ConfigModule, RedisModule, AuthModule, the controller, and the services omitted

@Module({
  imports: [ConfigModule, RedisModule, AuthModule],
  controllers: [RabbitmqFallbackReplayController],
  providers: [RabbitmqPublisherService, RabbitmqFallbackReplayService],
  exports: [RabbitmqPublisherService, RabbitmqFallbackReplayService],
})
export class RabbitmqModule {}
```

### App Module Setup

```typescript
// apps/box-app-api/src/app.module.ts
import { Module } from '@nestjs/common';
import { ScheduleModule } from '@nestjs/schedule';

@Module({
  imports: [
    ScheduleModule.forRoot(), // Required for @Cron decorator
    RabbitmqModule,
    // ... other modules
  ],
})
export class AppModule {}
```

## Error Handling

### Infinite Loop Prevention

The replay service prevents infinite loops through the **context** parameter:

```typescript
// Normal publish
context = 'stats.ad.click adId=123';

// Replay publish
context = 'fallback-replay routingKey=stats.ad.click';
```

**Behavior:**

- Normal publish failures → Store to fallback + DLQ
- Replay publish failures → **Only DLQ** (no re-storage to fallback)

### Circuit Breaker Integration

Replay respects the circuit breaker state:

- **OPEN**: Skip replay (RabbitMQ still down)
- **HALF_OPEN**: Attempt replay (testing recovery)
- **CLOSED**: Full replay (RabbitMQ healthy)

### Concurrent Execution Prevention

The service uses an `isReplaying` flag to prevent concurrent runs:

```typescript
if (this.isReplaying) {
  this.logger.warn('Replay already in progress, skipping this run');
  return { scanned: 0, replayed: 0, failed: 0, deleted: 0 };
}
```

## Performance Considerations

### SCAN vs KEYS

The service uses cursor-based **SCAN** instead of **KEYS** to avoid blocking Redis:

```typescript
// ✅ Good: Non-blocking cursor iteration
let cursor = '0';
do {
  const result = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
  cursor = result[0];
  const keys = result[1];
  // Process keys...
} while (cursor !== '0');

// ❌ Bad: Blocks Redis during execution
const keys = await this.redis.keys('rabbitmq:fallback:*');
```

### Batch Processing

- **SCAN Batch Size**: 100 keys per iteration
- **Replay Batch Size**: Configurable via `limit` parameter
- **Default Limit**: 100 messages per CRON run

### Logging

- **DEBUG Level**: Individual message details
- **INFO Level**: Summary statistics only
- **ERROR Level**: Failures with stack traces

**Example Output:**

```
[INFO] Starting fallback replay (limit: 100)...
[INFO] Found 150 fallback messages, processing 100...
[INFO] Fallback replay completed: scanned=150, replayed=95, failed=5, deleted=95, duration=2345ms
```

## Monitoring

### Health Checks

Monitor these metrics:

1. **Queue Size**: `/rabbitmq/fallback/queue-size`
   - Alert if > 1000 messages
2. **Replay Status**: `/rabbitmq/fallback/status`
   - Alert if `enabled=false` in production
3. **Replay Success Rate**: `replayed / scanned`
   - Alert if < 90%

### Log Analysis

Search for these patterns:

```bash
# Successful replays
grep "Fallback replay completed" app.log

# Failed replays
grep "Error replaying message" app.log

# Circuit breaker blocking replays
grep "Circuit breaker OPEN" app.log
```

## Troubleshooting

### Problem: Messages stuck in fallback queue

**Diagnosis:**

```bash
curl -H "Authorization: Bearer TOKEN" \
  http://localhost:3000/rabbitmq/fallback/queue-size
```

**Solutions:**

1. Check RabbitMQ health
2. Check circuit breaker state
3. Manually trigger replay with a higher limit:
   ```bash
   curl -X POST -H "Authorization: Bearer TOKEN" \
     "http://localhost:3000/rabbitmq/fallback/replay?limit=1000"
   ```

### Problem: Replay failing continuously

**Diagnosis:**

Check logs for error patterns:

```bash
grep "Error replaying message" app.log | tail -n 20
```

**Common Causes:**

- RabbitMQ still unhealthy (circuit breaker OPEN)
- Invalid payload format
- Network connectivity issues

**Solutions:**

1. Verify RabbitMQ is fully recovered
2. Check circuit breaker state in logs
3. Inspect DLQ for problematic messages

### Problem: CRON job not running

**Diagnosis:**

```bash
curl -H "Authorization: Bearer TOKEN" \
  http://localhost:3000/rabbitmq/fallback/status
```

**Solutions:**

1. Check the environment variable:
   ```bash
   echo "$RABBITMQ_FALLBACK_REPLAY_ENABLED"
   # Should be "true"
   ```
2. Verify ScheduleModule is imported in AppModule
3. Check application logs for CRON job messages

## Best Practices

### Production Deployment

1. **Enable CRON Job:**
   ```bash
   RABBITMQ_FALLBACK_REPLAY_ENABLED=true
   ```
2. **Monitor Queue Size:**
   - Set up alerts for queue size > 1000
   - A persistently large queue indicates ongoing RabbitMQ issues
3. **Rate Limit Manual Triggers:**
   - Avoid triggering replay with limit > 1000
   - Large replays can overwhelm a recovering RabbitMQ broker
4. **Review DLQ Regularly:**
   - Messages that fail replay go to DLQ
   - Inspect for systematic issues

### Development/Testing

1. **Disable CRON Job:**
   ```bash
   RABBITMQ_FALLBACK_REPLAY_ENABLED=false
   ```
2. **Test Manual Replay:**
   ```bash
   curl -X POST -H "Authorization: Bearer TOKEN" \
     "http://localhost:3000/rabbitmq/fallback/replay?limit=10"
   ```
3. **Simulate Failures:**
   - Stop RabbitMQ → Generate events → Check fallback queue
   - Start RabbitMQ → Trigger manual replay → Verify success

## Complete Recovery Flow

```mermaid
sequenceDiagram
    participant App as Application
    participant CB as Circuit Breaker
    participant RQ as Redis Fallback Queue
    participant MQ as RabbitMQ
    participant DLQ as Dead Letter Queue
    participant RS as Replay Service

    Note over MQ: RabbitMQ goes down
    App->>CB: publishStatsEvent()
    CB->>CB: Record failure
    CB->>CB: State → OPEN
    App->>RQ: storeInFallbackQueue()
    App->>DLQ: sendToDLQ()

    Note over MQ: RabbitMQ recovers
    RS->>RQ: SCAN for fallback keys
    RQ-->>RS: Return keys
    RS->>RS: Parse routingKey & messageId
    RS->>RQ: getJson(key)
    RQ-->>RS: Return payload
    RS->>CB: replayFallbackMessage()
    CB->>CB: Check state (HALF_OPEN)
    CB->>MQ: Publish message
    MQ-->>CB: ACK
    CB->>CB: Record success
    CB->>CB: State → CLOSED
    CB-->>RS: Success
    RS->>RQ: del(key)
    RQ-->>RS: Deleted
```

## Summary

The RabbitmqFallbackReplayService provides **automated recovery** from RabbitMQ outages:

- ✅ **Automatic**: CRON job runs every 5 minutes
- ✅ **Manual Control**: API endpoints for on-demand replay
- ✅ **Safe**: Prevents infinite loops and concurrent execution
- ✅ **Efficient**: Uses SCAN instead of KEYS
- ✅ **Observable**: Status API and detailed logging
- ✅ **Production-Ready**: Integrated with circuit breaker and error handling

**Data Loss Prevention:** 6 layers of protection

1. Retry with exponential backoff
2. Circuit breaker with auto-recovery
3. Redis fallback queue ← **This service**
4. Dead Letter Queue
5. Publisher idempotency
6. Message TTL
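
## Appendix: Parsing a Fallback Key

The key format `rabbitmq:fallback:{routingKey}:{messageId}` used by `replayMessage()` can be split back into its parts as sketched below. This is a minimal illustration, not the service's actual code: `parseFallbackKey`, `ParsedFallbackKey`, and `FALLBACK_PREFIX` are hypothetical names, and the sketch assumes the message ID never contains a colon, so the last colon separates it from the routing key.

```typescript
// Hypothetical helper: the service's real parsing code is not shown in this document.
const FALLBACK_PREFIX = 'rabbitmq:fallback:';

interface ParsedFallbackKey {
  routingKey: string;
  messageId: string;
}

// Returns the parsed parts, or null when the key does not match the expected format.
function parseFallbackKey(key: string): ParsedFallbackKey | null {
  if (!key.startsWith(FALLBACK_PREFIX)) {
    return null;
  }
  const rest = key.slice(FALLBACK_PREFIX.length);

  // Assumption: messageId contains no ':' so the last colon is the separator.
  const separator = rest.lastIndexOf(':');
  if (separator <= 0 || separator === rest.length - 1) {
    return null; // Missing routing key or missing message ID
  }

  return {
    routingKey: rest.slice(0, separator),
    messageId: rest.slice(separator + 1),
  };
}

const example = parseFallbackKey('rabbitmq:fallback:stats.ad.click:a1b2c3d4-uuid');
console.log(example);
// → { routingKey: 'stats.ad.click', messageId: 'a1b2c3d4-uuid' }
```

Returning `null` for malformed keys lets a caller count the key as failed and move on to the next one instead of throwing in the middle of a replay batch.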