# Observability Enhancements for RabbitMQ and Stats Aggregation

This document describes the enhanced observability features added to the RabbitMQ publisher and stats aggregation services.

---

## Part A: RabbitMQ Publisher Observability

### Event Publishing Resilience Tiers

The RabbitMQ publisher implements **three tiers of resilience** based on event criticality:

#### Tier 1: Stats Events (Full Resilience)

**Events:** `stats.ad.click`, `stats.video.click`, `stats.ad.impression`

**Features:**

- ✅ Circuit breaker protection
- ✅ Retry logic (3 attempts with exponential backoff)
- ✅ Redis fallback queue (24-hour TTL)
- ✅ Dead Letter Queue (DLQ)
- ✅ Publisher-level idempotency (7-day window)
- ✅ Message TTL (24 hours)

**Why:** These events are critical for analytics and recommendations. Data loss is unacceptable.

#### Tier 2: User Activity Events (Partial Resilience)

**Events:** `user.login`, `ads.click`

**Features:**

- ✅ Circuit breaker protection
- ✅ Retry logic (3 attempts with exponential backoff)
- ❌ No Redis fallback queue
- ❌ No DLQ
- ❌ No idempotency checking
- ❌ No message TTL

**Why:** These events are less critical. Losing a few during outages is acceptable, and the simpler implementation reduces overhead.

**Warning Logs:**

```
[WARN] ⚠️ Circuit breaker OPEN. Dropping user.login event for uid=user123 (non-critical event, no fallback)
[WARN] ⚠️ Circuit breaker OPEN. Dropping ads.click event for adsId=ad456 (non-critical event, no fallback)
```

**Monitoring Impact:** Track these warning logs to understand data loss during RabbitMQ outages:

```bash
# Count dropped user.login events
grep "Dropping user.login event" app.log | wc -l

# Count dropped ads.click events
grep "Dropping ads.click event" app.log | wc -l
```

These counts cover only events dropped while the circuit is OPEN. Events that exhaust all retries are logged separately at ERROR level (see "Total Failure After Retries" under Example Logs).

#### Tier 3: Fire-and-Forget (No Resilience)

**Events:** None currently; the pattern is available for future use.

**Features:**

- ❌ No circuit breaker
- ❌ No retries
- ❌ No fallback
- ❌ Direct publish, drop on failure

**Why:** For truly ephemeral events where loss doesn't matter (e.g., real-time presence indicators).

---

### Enhanced Structured Logging

#### 1. Circuit Breaker State Transitions

All circuit breaker state transitions now include `failureCount` and `successCount` in their log messages:

**Circuit Opened:**

```
[WARN] Circuit breaker OPENED (failureCount=5, successCount=0). Will retry after 60000ms
```

**Circuit Half-Open:**

```
[INFO] Circuit breaker HALF-OPEN (failureCount=0, successCount=0). Testing connection...
```

**Circuit Closed:**

```
[INFO] Circuit breaker CLOSED (failureCount=0, successCount=2). Resuming normal operation.
```

#### 2. Reconnection Lifecycle Logging

Reconnection attempts now produce detailed structured logs, with emojis for easy visual scanning:

**Reconnection Start:**

```
[INFO] 🔄 Starting RabbitMQ reconnection attempt (circuitState=HALF_OPEN)...
```

**Connection Progress:**

```
[DEBUG] 🔌 Reconnecting to RabbitMQ at amqp://localhost:5672...
```

**Reconnection Success:**

```
[INFO] ✅ RabbitMQ reconnection successful (hasConnection=true, hasChannel=true)
```

**Reconnection Failure:**

```
[ERROR] ❌ RabbitMQ reconnection failed (circuitState=HALF_OPEN): Connection refused
[WARN] ⚠️ Reconnection failed during HALF_OPEN, reopening circuit
```

**URL Not Configured:**

```
[ERROR] ❌ Reconnection failed: RABBITMQ_URL is not set. Cannot reconnect to RabbitMQ.
```

### Extended Circuit Status API

The `getCircuitStatus()` method now reports connection and reconnection state alongside the circuit counters:

```typescript
interface CircuitStatus {
  state: 'CLOSED' | 'OPEN' | 'HALF_OPEN';
  failureCount: number;
  successCount: number;
  hasConnection: boolean; // ✨ NEW
  hasChannel: boolean; // ✨ NEW
  isReconnecting: boolean; // ✨ NEW
  nextAttemptTime: number; // ✨ NEW (timestamp in ms)
}
```

**Example Response:**

```json
{
  "state": "CLOSED",
  "failureCount": 0,
  "successCount": 2,
  "hasConnection": true,
  "hasChannel": true,
  "isReconnecting": false,
  "nextAttemptTime": 0
}
```

### Internal Status Endpoint

A new internal-only HTTP endpoint provides RabbitMQ health monitoring:

**Endpoint:** `GET /api/v1/internal/rabbitmq/status`

**Access Control:**

- Disabled by default in production (`NODE_ENV=production`)
- Enabled in production only with `ENABLE_INTERNAL_ENDPOINTS=true`
- Always accessible in development/staging environments

**Response Structure:**

```json
{
  "timestamp": "2025-12-08T12:00:00.000Z",
  "rabbitmq": {
    "circuitBreaker": {
      "state": "CLOSED",
      "failureCount": 0,
      "successCount": 0,
      "nextAttemptTime": null
    },
    "connection": {
      "hasConnection": true,
      "hasChannel": true,
      "isReconnecting": false
    },
    "health": "healthy"
  }
}
```

**Health Status Values:**

- `"healthy"` - Connection and channel active, circuit CLOSED
- `"reconnecting"` - Actively attempting to reconnect
- `"unhealthy"` - Connection or channel down, or circuit OPEN/HALF_OPEN

**Production Disabled Response:**

```json
{
  "error": "Internal endpoints are disabled in production",
  "hint": "Set ENABLE_INTERNAL_ENDPOINTS=true to enable"
}
```

**Example Usage:**

```bash
# Development/Staging (no auth required)
curl http://localhost:3000/api/v1/internal/rabbitmq/status

# Production (requires ENABLE_INTERNAL_ENDPOINTS=true)
curl http://prod.example.com/api/v1/internal/rabbitmq/status
```

**Module Configuration:**

```typescript
// rabbitmq.module.ts
import { Module } from '@nestjs/common';
// ... controller and provider imports ...

@Module({
  controllers: [
    RabbitmqFallbackReplayController,
    RabbitmqStatusController, // ✨ NEW
  ],
  // ...
})
export class RabbitmqModule {}
```

### Refactored User Activity Event Publishing

The `publishUserLogin()` and `publishAdsClick()` methods have been refactored to share the publishing pipeline used by stats events, while keeping their simpler Tier 2 resilience profile.

#### Code Structure

**Before:**

```typescript
async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
  if (!this.channel) {
    this.logger.warn('RabbitMQ channel not ready. Skipping user.login publish.');
    return;
  }
  // Direct publish, no retries, no circuit breaker
}
```

**After:**

```typescript
async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
  // Check circuit breaker: drop the event (no fallback) while the circuit is OPEN
  if (!(await this.canAttempt())) {
    this.logger.warn(
      `⚠️ Circuit breaker OPEN. Dropping user.login event for uid=${event.uid} (non-critical event, no fallback)`,
    );
    return;
  }

  const context = `user.login uid=${event.uid}`;
  try {
    // Use shared retry logic (exponential backoff between attempts)
    await this.retryPublish(async () => {
      await this.publishUserLoginCore(event);
    }, context);
    this.recordSuccess(); // feed the circuit breaker
  } catch (error) {
    this.recordFailure(); // feed the circuit breaker
    // Swallow the error so callers stay fire-and-forget
    this.logger.error(
      `Failed to publish user.login after 3 retries for uid=${event.uid}: ${(error as Error).message}`,
    );
  }
}

// Core publish logic, separated so the retry wrapper can call it repeatedly
private async publishUserLoginCore(event: UserLoginEventPayload): Promise<void> {
  // Actual RabbitMQ publish
}
```

#### Benefits of Refactoring

**1. Shared Resilience Logic:**

- Both methods now use `canAttempt()` for circuit breaker checks
- Both methods use `retryPublish()` for retries with exponential backoff
- Both methods use `recordSuccess()` / `recordFailure()` for circuit state management

**2. Consistent Error Handling:**

- Clear warning logs when the circuit breaker drops events
- Structured error messages with retry attempt counts
- Fire-and-forget behavior maintained (no exceptions thrown to callers)

**3. Easier Monitoring:**

- Consistent log format across all event types
- A single circuit breaker state governs both Tier 1 and Tier 2 events
- Clear indication of dropped events for impact analysis

**4. Future Extensibility:**

- Easy to upgrade to Tier 1 resilience (add fallback and idempotency)
- Easy to downgrade to Tier 3 (remove the circuit breaker)
- Consistent codebase for all publish methods

#### Behavioral Changes

**Callers are NOT affected:**

- Method signatures unchanged
- Still returns `Promise<void>`
- Still fire-and-forget (no exceptions thrown)
- Still safe to use in async contexts

**Internal improvements:**

- 3 retry attempts with delays of 100ms, 500ms, and 2000ms
- Circuit breaker prevents cascading failures
- Better logging for troubleshooting

#### Example Logs

**Normal Operation:**

```
[DEBUG] Published user.login event for uid=user123
[DEBUG] Published ads.click event for adsId=ad456
```

**Retry After Transient Failure:**

```
[WARN] Publish attempt 1 failed (user.login uid=user123). Retrying in 100ms...
[DEBUG] Published user.login event for uid=user123
```

**Circuit Breaker Active:**

```
[WARN] ⚠️ Circuit breaker OPEN. Dropping user.login event for uid=user123 (non-critical event, no fallback)
[WARN] ⚠️ Circuit breaker OPEN. Dropping ads.click event for adsId=ad456 (non-critical event, no fallback)
```

**Total Failure After Retries:**

```
[ERROR] Failed to publish user.login after 3 retries for uid=user123: Connection refused
```

---

## Part B: Stats Aggregation Observability

### Enhanced Summary Logging

Both the ads and video aggregation services now emit a summary log after each run.
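As a rough illustration, the summary line can be built from the per-item scores in a single pass. This is a sketch, not the services' actual code: `summarizeScores`, `formatAggregationSummary`, and `ScoreSummary` are hypothetical names, the four-decimal precision is inferred from the example log lines below, and the `[INFO]` prefix is assumed to come from the logger. A plain loop is used instead of `Math.min(...scores)` so that very large runs do not spread huge argument lists into one call.

```typescript
// Hypothetical helpers: compute min/max/avg/zero-count in one pass and format
// the one-line INFO summary in the shape of the documented examples.

interface ScoreSummary {
  min: number;
  max: number;
  avg: number;
  zeroScores: number;
}

function summarizeScores(scores: readonly number[]): ScoreSummary {
  if (scores.length === 0) {
    // Same fallback as the documented calculation: all-zero stats for an empty run
    return { min: 0, max: 0, avg: 0, zeroScores: 0 };
  }
  let min = Infinity;
  let max = -Infinity;
  let sum = 0;
  let zeroScores = 0;
  for (const s of scores) {
    if (s < min) min = s;
    if (s > max) max = s;
    sum += s;
    if (s === 0) zeroScores++;
  }
  return { min, max, avg: sum / scores.length, zeroScores };
}

function formatAggregationSummary(
  kind: 'Ads' | 'Video',
  updated: number,
  errors: number,
  stats: ScoreSummary,
): string {
  // Four decimal places, matching the example summary lines (assumption)
  const f = (n: number): string => n.toFixed(4);
  return (
    `📊 ${kind} aggregation complete: updated=${updated}, errors=${errors}, ` +
    `scores(min=${f(stats.min)}, max=${f(stats.max)}, avg=${f(stats.avg)}), ` +
    `zeroScores=${stats.zeroScores}`
  );
}
```

For example, `formatAggregationSummary('Ads', 3, 0, summarizeScores([0, 0.5, 1]))` yields `📊 Ads aggregation complete: updated=3, errors=0, scores(min=0.0000, max=1.0000, avg=0.5000), zeroScores=1`.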
#### Ads Aggregation Summary

**Log Level:** `INFO` (not spammy: one summary line per run)

**Format:**

```
[INFO] 📊 Ads aggregation complete: updated=150, errors=2, scores(min=0.0234, max=2.4567, avg=0.8923), zeroScores=5
```

**Metrics Included:**

- `updated` - Number of AdsGlobalStats records successfully updated
- `errors` - Number of ads that failed to aggregate
- `scores.min` - Minimum computed score across all ads
- `scores.max` - Maximum computed score across all ads
- `scores.avg` - Average computed score across all ads
- `zeroScores` - Count of ads with `computedScore = 0`

**Debug-Level Details:** Individual ad processing still logs at DEBUG level:

```
[DEBUG] Aggregated adId=ad123: impressions=1000, clicks=50, CTR=0.0476, score=1.2345
```

#### Video Aggregation Summary

**Log Level:** `INFO`

**Format:**

```
[INFO] 📊 Video aggregation complete: updated=320, errors=1, scores(min=0.0156, max=3.1234, avg=1.0234), zeroScores=12
```

**Metrics Included:**

- `updated` - Number of VideoGlobalStats records successfully updated
- `errors` - Number of videos that failed to aggregate
- `scores.min` - Minimum computed score across all videos
- `scores.max` - Maximum computed score across all videos
- `scores.avg` - Average computed score across all videos
- `zeroScores` - Count of videos with `computedScore = 0`

**Debug-Level Details:**

```
[DEBUG] Aggregated videoId=video456: impressions=500, clicks=500, CTR=0.4000, score=2.3456
```

### Implementation Details

#### Score Tracking

Both methods now track scores during aggregation:

```typescript
const scores: number[] = [];
let zeroScoreCount = 0;
let updated = 0;
let errors = 0;
for (const id of ids) {
  try {
    const score = await this.aggregateSingle(id, cutoffTime); // returns the computed score
    scores.push(score);
    if (score === 0) zeroScoreCount++;
    updated++;
  } catch {
    errors++; // a failed item is counted (and logged at ERROR level) without aborting the run
  }
}
```

#### Score Statistics Calculation

```typescript
const scoreStats =
  scores.length > 0
    ? {
        min: Math.min(...scores),
        max: Math.max(...scores),
        avg: scores.reduce((sum, s) => sum + s, 0) / scores.length,
      }
    : { min: 0, max: 0, avg: 0 };
```

#### Return Values

The private aggregation methods now return the computed score:

```typescript
private async aggregateSingleAd(adId: string, cutoffTime: bigint): Promise<number> {
  // ... compute metrics ...
  const computedScore = this.computeScore(popularity, ctr, recency);
  // ... upsert to database ...
  return computedScore; // ✨ NEW
}
```

---

## Configuration

### Environment Variables

**RabbitMQ Internal Endpoint:**

```bash
# Enable in production (default: disabled)
ENABLE_INTERNAL_ENDPOINTS=true

# Availability also depends on NODE_ENV
NODE_ENV=production   # endpoint disabled unless ENABLE_INTERNAL_ENDPOINTS=true
NODE_ENV=development  # endpoint always enabled
```

**Stats Aggregation (existing):**

```bash
# CTR smoothing parameters
STATS_CTR_ALPHA=1
STATS_CTR_BETA=2

# Scoring weights
STATS_WEIGHT_POPULARITY=0.5
STATS_WEIGHT_CTR=0.3
STATS_WEIGHT_RECENCY=0.2
```

---

## Monitoring & Alerts

### RabbitMQ Monitoring

**Recommended Metrics to Track:**

1. **Circuit State Duration**
   - Alert if the circuit stays OPEN for > 5 minutes
   - Pattern: `grep "Circuit breaker OPENED" app.log`
2. **Reconnection Failures**
   - Alert if reconnection fails > 3 times in 10 minutes
   - Pattern: `grep "❌ RabbitMQ reconnection failed" app.log`
3. **Connection Health**
   - Poll `/api/v1/internal/rabbitmq/status` every 60 seconds
   - Alert if `health !== "healthy"` for > 2 minutes
4. **Failure Count Trends**
   - Extract `failureCount` from circuit transition logs
   - Alert if consistently > 0 even when the circuit is CLOSED

**Example Alert Query:**

```bash
# Show the most recent OPENED and CLOSED transitions. If the OPENED line is newer,
# the circuit has not recovered; compare its timestamp with the current time.
grep "Circuit breaker OPENED" app.log | tail -n 1
grep "Circuit breaker CLOSED" app.log | tail -n 1
```

### Stats Aggregation Monitoring

**Recommended Metrics to Track:**

1. **Zero Score Count**
   - Alert if `zeroScores` > 10% of total items
   - Indicates potential data quality issues
   - Pattern: `grep "📊.*zeroScores=" app.log`
2. **Error Rate**
   - Alert if `errors / (updated + errors) > 0.05` (5%)
   - Pattern: extract `errors` and `updated` from the summary logs
3. **Score Distribution**
   - Monitor `min`, `max`, `avg` trends over time
   - Alert if `avg` drops significantly (e.g., > 50% decrease)
4. **Processing Time**
   - Use the existing start/end timestamps
   - Alert if aggregation takes longer than the expected duration

**Example Log Analysis:**

```bash
# Extract ads aggregation metrics (last 10 runs)
grep "📊 Ads aggregation complete" app.log | tail -n 10

# Extract video aggregation metrics (last 10 runs)
grep "📊 Video aggregation complete" app.log | tail -n 10

# Sum zeroScores across today's runs (assumes log lines contain a UTC YYYY-MM-DD date)
grep "📊.*zeroScores=" app.log | \
  grep "$(date -u +%Y-%m-%d)" | \
  awk -F'zeroScores=' '{sum+=$2} END {print sum+0}'
```

---

## Log Level Guide

### RabbitMQ Publisher

| Level | What Gets Logged                                                                                 |
| ----- | ------------------------------------------------------------------------------------------------ |
| ERROR | Connection failures, reconnection failures, publish failures after all retries                  |
| WARN  | Circuit opened, reconnection failure during HALF_OPEN, publish retries, dropped Tier 2 events   |
| INFO  | Circuit HALF-OPEN/CLOSED transitions, reconnection start and success, module initialization     |
| DEBUG | Individual message publishes, connection health checks, reconnection progress                   |

**Recommended Production Level:** `INFO`

- Captures all state changes and errors
- Minimal noise (successful publishes are logged only at DEBUG)

### Stats Aggregation

| Level | What Gets Logged                                        |
| ----- | ------------------------------------------------------- |
| ERROR | Individual item aggregation failures (with stack trace) |
| WARN  | (not currently used)                                    |
| INFO  | Start/end of aggregation runs, summary statistics       |
| DEBUG | Per-item aggregation details (CTR, score, etc.)         |

**Recommended Production Level:** `INFO`

- One summary line per aggregation run
- Summary statistics for monitoring
- Individual item details at DEBUG only

---

## Testing

### RabbitMQ Status Endpoint

**Test in Development:**

```bash
# Should return full status
curl http://localhost:3000/api/v1/internal/rabbitmq/status | jq .

# Verify health status reflects circuit state
# 1. Stop RabbitMQ
# 2. Generate some events (triggers circuit breaker)
# 3. Check endpoint - should show "unhealthy" (or "reconnecting" during an attempt)
# 4. Start RabbitMQ
# 5. Wait 60s (circuit timeout)
# 6. Check endpoint - should show "healthy"
```

**Test in Production:**

```bash
# Without ENABLE_INTERNAL_ENDPOINTS (should be blocked)
curl https://prod.example.com/api/v1/internal/rabbitmq/status
# Expected: {"error": "Internal endpoints are disabled in production", ...}

# With ENABLE_INTERNAL_ENDPOINTS=true (should work)
# Set ENABLE_INTERNAL_ENDPOINTS=true in the app's deployment environment, then restart the app
curl https://prod.example.com/api/v1/internal/rabbitmq/status
# Expected: {"timestamp": "...", "rabbitmq": {...}}
```

### Stats Aggregation Logs

**Trigger Manual Aggregation:**

```bash
# Trigger ads aggregation
curl -X POST http://localhost:3001/api/v1/internal/stats/aggregate-ads

# Check logs for summary
tail -f logs/box-stats-api.log | grep "📊 Ads aggregation"

# Trigger video aggregation
curl -X POST http://localhost:3001/api/v1/internal/stats/aggregate-videos

# Check logs for summary
tail -f logs/box-stats-api.log | grep "📊 Video aggregation"
```

**Verify Metrics:**

```bash
# Count items with zero scores:
# 1. Insert test data with 0 impressions/clicks
# 2. Run aggregation
# 3. Verify the zeroScores count matches
```

---

## Troubleshooting

### RabbitMQ Publisher

**Problem:** Circuit stays OPEN indefinitely

**Diagnosis:**

```bash
grep "Circuit breaker" app.log | tail -n 20
```

**Common Causes:**

- RabbitMQ still down
- Reconnection failing (check for "❌ RabbitMQ reconnection failed")
- `RABBITMQ_URL` not set (check for "RABBITMQ_URL is not set")
- Network connectivity issues

**Solution:**

1. Verify RabbitMQ is running: `docker ps | grep rabbitmq`
2. Check network: `telnet localhost 5672`
3. Review reconnection error logs
4. Restart the application if necessary

---

**Problem:** Status endpoint returns unhealthy but RabbitMQ is running

**Diagnosis:**

```bash
curl http://localhost:3000/api/v1/internal/rabbitmq/status | jq .rabbitmq
```

**Common Causes:**

- Connection established but channel creation failed
- Circuit breaker in OPEN/HALF_OPEN state
- Stale connection object

**Solution:**

1. Check `hasConnection` and `hasChannel` separately
2. Review logs for channel creation errors
3. Wait for the next reconnection cycle (60s)

### Stats Aggregation

**Problem:** High `zeroScores` count

**Diagnosis:**

```javascript
// Find items with zero scores (run in the MongoDB shell)
db.adsGlobalStats.find({ computedScore: 0 }).limit(10)
```

**Common Causes:**

- Items with no impressions or clicks (new items)
- Items older than the aggregation window
- Scoring formula issues

**Solution:**

1. Verify items have events in the time window
2. Check the scoring weights configuration
3. Review the formula: `score = w1*log(1+impressions) + w2*ctr + w3*recency`, where `w1`, `w2`, `w3` correspond to `STATS_WEIGHT_POPULARITY`, `STATS_WEIGHT_CTR`, `STATS_WEIGHT_RECENCY`

---

**Problem:** Average score dropping over time

**Diagnosis:**

```bash
# Print the aggregation type and avg score of each run, in log order
grep -E "📊 (Ads|Video) aggregation complete" app.log | \
  sed -E 's/.*📊 (Ads|Video) aggregation complete.*avg=([0-9.]+).*/\1 avg=\2/'
```

**Common Causes:**

- Increased number of low-quality items
- Recency decay (older items losing score)
- Changed scoring weights

**Solution:**

1. Review the score distribution (min, max, avg)
2. Check whether the `zeroScores` count increased
3. Verify the scoring weights haven't changed
4. Consider adjusting the weights if needed

---

## Summary

### Part A: RabbitMQ Publisher ✅

- ✅ Enhanced circuit state transition logs with `failureCount`/`successCount`
- ✅ Detailed reconnection lifecycle logs with structured messages
- ✅ Extended `getCircuitStatus()` with connection health flags
- ✅ New internal status endpoint `/api/v1/internal/rabbitmq/status`
- ✅ Environment-based access control for production safety

### Part B: Stats Aggregation ✅

- ✅ Summary logs after each aggregation run
- ✅ Score statistics (min, max, avg) included
- ✅ Zero score count tracking
- ✅ Count of updated vs. errored items
- ✅ INFO-level logs (not spammy)
- ✅ DEBUG-level per-item details preserved

### Impact

**Visibility Improvements:**

- Circuit breaker state changes are now self-explanatory
- Reconnection progress is traceable end-to-end
- RabbitMQ health can be monitored programmatically
- Stats aggregation quality is measurable per run

**Operations Benefits:**

- Faster troubleshooting with structured logs
- Proactive monitoring via the status endpoint
- Clear separation of summary (INFO) vs. detail (DEBUG) logs
- Production-safe internal endpoints with environment guards
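The environment guard and health classification described in the Internal Status Endpoint section are small enough to express as two pure functions. This is a minimal sketch under stated assumptions, not the controller's actual code: `internalEndpointsEnabled`, `classifyHealth`, and `ConnectionSnapshot` are hypothetical names, only the exact string `"true"` is assumed to enable the endpoints in production, and `"reconnecting"` is assumed to take precedence over `"unhealthy"` because the document lists the three values without an order.

```typescript
// Hypothetical helpers mirroring the documented access rule and health values
// of GET /api/v1/internal/rabbitmq/status.

type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';
type RabbitHealth = 'healthy' | 'reconnecting' | 'unhealthy';

interface ConnectionSnapshot {
  state: CircuitState;
  hasConnection: boolean;
  hasChannel: boolean;
  isReconnecting: boolean;
}

// Always enabled outside production; in production only when
// ENABLE_INTERNAL_ENDPOINTS is exactly "true" (assumed parsing rule).
function internalEndpointsEnabled(env: Record<string, string | undefined>): boolean {
  if (env.NODE_ENV !== 'production') return true;
  return env.ENABLE_INTERNAL_ENDPOINTS === 'true';
}

// Maps circuit/connection state to the documented health values.
// Assumption: an in-progress reconnection is reported before the unhealthy check.
function classifyHealth(s: ConnectionSnapshot): RabbitHealth {
  if (s.isReconnecting) return 'reconnecting';
  if (s.state === 'CLOSED' && s.hasConnection && s.hasChannel) return 'healthy';
  return 'unhealthy';
}
```

In a controller, `internalEndpointsEnabled(process.env)` could gate the route, and `classifyHealth` could be fed the fields returned by `getCircuitStatus()`. Keeping both as pure functions makes the access rule and the health mapping testable without a running broker.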