This document describes the enhanced observability features added to the RabbitMQ publisher and stats aggregation services.
The RabbitMQ publisher implements three tiers of resilience based on event criticality:
Events: stats.ad.click, stats.video.click, stats.ad.impression
Features:
Why: These events are critical for analytics and recommendations. Data loss is unacceptable.
Events: user.login, ads.click
Features:
Why: These events are less critical. Losing a few during outages is acceptable. Simpler implementation reduces overhead.
Warning Logs:
[WARN] ⚠️ Circuit breaker OPEN. Dropping user.login event for uid=user123 (non-critical event, no fallback)
[WARN] ⚠️ Circuit breaker OPEN. Dropping ads.click event for adsId=ad456 (non-critical event, no fallback)
Monitoring Impact: Track these warning logs to understand data loss during RabbitMQ outages:
# Count dropped user.login events
grep "Dropping user.login event" app.log | wc -l
# Count dropped ads.click events
grep "Dropping ads.click event" app.log | wc -l
Events: (none currently, but pattern available for future use)
Features:
Why: For truly ephemeral events where loss doesn't matter (e.g., real-time presence indicators).
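The tier assignments above could be kept in a single lookup so that each publish path can check which resilience profile applies. This is a minimal sketch, not the publisher's actual code: `ResilienceTier`, `EVENT_TIERS`, and `tierFor` are hypothetical names, and the tier numbers assume the tiers are numbered in the order they are described. Only the routing keys come from this document.

```typescript
// Hypothetical names; only the routing keys are taken from this document.
type ResilienceTier = 1 | 2 | 3;

const EVENT_TIERS = new Map<string, ResilienceTier>([
  // Tier 1: critical stats events, data loss is unacceptable
  ["stats.ad.click", 1],
  ["stats.video.click", 1],
  ["stats.ad.impression", 1],
  // Tier 2: less critical events, dropped while the circuit is open
  ["user.login", 2],
  ["ads.click", 2],
  // Tier 3: no events are assigned yet
]);

// Returns the tier for a routing key, or undefined if the key is not registered.
function tierFor(routingKey: string): ResilienceTier | undefined {
  return EVENT_TIERS.get(routingKey);
}
```

Returning `undefined` for unregistered keys, rather than defaulting to a tier, is a design choice of this sketch; it forces new events to be classified explicitly.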
All circuit breaker state transitions now include failureCount and successCount in log messages:
Circuit Opened:
[WARN] Circuit breaker OPENED (failureCount=5, successCount=0). Will retry after 60000ms
Circuit Half-Open:
[INFO] Circuit breaker HALF-OPEN (failureCount=0, successCount=0). Testing connection...
Circuit Closed:
[INFO] Circuit breaker CLOSED (failureCount=0, successCount=2). Resuming normal operation.
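The transitions and log lines above can be reproduced with a small state machine. The sketch below is an illustration under assumptions, not the service's implementation: the class name `CircuitBreakerSketch` is hypothetical, and the thresholds (5 failures to open, 2 successes to close, 60000 ms before retrying) are inferred from the sample logs rather than confirmed configuration. It is also synchronous, whereas the publisher awaits `canAttempt()`.

```typescript
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";

class CircuitBreakerSketch {
  state: CircuitState = "CLOSED";
  failureCount = 0;
  successCount = 0;
  nextAttemptTime = 0;
  readonly logs: string[] = [];
  // Assumed thresholds, inferred from the sample log lines
  private readonly failureThreshold = 5;
  private readonly successThreshold = 2;
  private readonly openTimeoutMs = 60000;

  // Returns false while OPEN and the timeout has not elapsed yet.
  canAttempt(now: number = Date.now()): boolean {
    if (this.state === "OPEN") {
      if (now < this.nextAttemptTime) return false;
      this.state = "HALF_OPEN";
      this.failureCount = 0;
      this.successCount = 0;
      this.log("INFO", "HALF-OPEN", "Testing connection...");
    }
    return true;
  }

  // Opens the circuit on any HALF_OPEN failure or once the threshold is hit.
  recordFailure(now: number = Date.now()): void {
    this.failureCount++;
    if (this.state === "HALF_OPEN" || this.failureCount >= this.failureThreshold) {
      this.state = "OPEN";
      this.nextAttemptTime = now + this.openTimeoutMs;
      this.log("WARN", "OPENED", `Will retry after ${this.openTimeoutMs}ms`);
    }
  }

  // Closes the circuit after enough consecutive HALF_OPEN successes.
  recordSuccess(): void {
    if (this.state === "HALF_OPEN") {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = "CLOSED";
        this.failureCount = 0;
        this.log("INFO", "CLOSED", "Resuming normal operation.");
      }
    } else if (this.state === "CLOSED") {
      this.failureCount = 0;
    }
  }

  private log(level: string, transition: string, detail: string): void {
    this.logs.push(
      `[${level}] Circuit breaker ${transition} ` +
        `(failureCount=${this.failureCount}, successCount=${this.successCount}). ${detail}`,
    );
  }
}
```

Collecting log lines in an array instead of calling a logger keeps the sketch testable; a real service would route them through its logger.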
Reconnection attempts now have detailed structured logs with emojis for easy visual scanning:
Reconnection Start:
[INFO] 🔄 Starting RabbitMQ reconnection attempt (circuitState=HALF_OPEN)...
Connection Progress:
[DEBUG] 🔌 Reconnecting to RabbitMQ at amqp://localhost:5672...
Reconnection Success:
[INFO] ✅ RabbitMQ reconnection successful (hasConnection=true, hasChannel=true)
Reconnection Failure:
[ERROR] ❌ RabbitMQ reconnection failed (circuitState=HALF_OPEN): Connection refused
[WARN] ⚠️ Reconnection failed during HALF_OPEN, reopening circuit
URL Not Configured:
[ERROR] ❌ Reconnection failed: RABBITMQ_URL is not set. Cannot reconnect to RabbitMQ.
The getCircuitStatus() method now returns comprehensive health information:
interface CircuitStatus {
state: 'CLOSED' | 'OPEN' | 'HALF_OPEN';
failureCount: number;
successCount: number;
hasConnection: boolean; // ✨ NEW
hasChannel: boolean; // ✨ NEW
isReconnecting: boolean; // ✨ NEW
nextAttemptTime: number; // ✨ NEW (timestamp in ms)
}
Example Response:
{
"state": "CLOSED",
"failureCount": 0,
"successCount": 2,
"hasConnection": true,
"hasChannel": true,
"isReconnecting": false,
"nextAttemptTime": 0
}
A new internal-only HTTP endpoint provides RabbitMQ health monitoring:
Endpoint: GET /api/v1/internal/rabbitmq/status
Access Control:
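- Development/staging: always enabled, no auth required
- Production: disabled by default; enabled only when ENABLE_INTERNAL_ENDPOINTS=true
Response Structure: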
{
"timestamp": "2025-12-08T12:00:00.000Z",
"rabbitmq": {
"circuitBreaker": {
"state": "CLOSED",
"failureCount": 0,
"successCount": 0,
"nextAttemptTime": null
},
"connection": {
"hasConnection": true,
"hasChannel": true,
"isReconnecting": false
},
"health": "healthy"
}
}
Health Status Values:
- "healthy" - Connection & channel active, circuit CLOSED
- "reconnecting" - Actively attempting to reconnect
- "unhealthy" - Connection/channel down or circuit OPEN/HALF_OPEN
Production Disabled Response:
{
"error": "Internal endpoints are disabled in production",
"hint": "Set ENABLE_INTERNAL_ENDPOINTS=true to enable"
}
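The three health status values listed earlier could be derived from the circuit status fields roughly as follows. This is a sketch under assumptions: `deriveHealth` and `HealthInputs` are hypothetical names, and checking `isReconnecting` first is an assumed precedence, since the document does not say which value wins when a reconnect is in progress while the circuit is OPEN.

```typescript
type CircuitState = "CLOSED" | "OPEN" | "HALF_OPEN";
type Health = "healthy" | "reconnecting" | "unhealthy";

// Subset of the CircuitStatus fields needed to classify health
interface HealthInputs {
  state: CircuitState;
  hasConnection: boolean;
  hasChannel: boolean;
  isReconnecting: boolean;
}

function deriveHealth(s: HealthInputs): Health {
  // Assumed precedence: an in-progress reconnect is reported before anything else
  if (s.isReconnecting) return "reconnecting";
  // Healthy only when connection and channel are up and the circuit is CLOSED
  if (s.hasConnection && s.hasChannel && s.state === "CLOSED") return "healthy";
  // Connection/channel down, or circuit OPEN/HALF_OPEN
  return "unhealthy";
}
```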
Example Usage:
# Development/Staging (no auth required)
curl http://localhost:3000/api/v1/internal/rabbitmq/status
# Production (requires ENABLE_INTERNAL_ENDPOINTS=true)
curl http://prod.example.com/api/v1/internal/rabbitmq/status
Module Configuration:
// rabbitmq.module.ts
@Module({
controllers: [
RabbitmqFallbackReplayController,
RabbitmqStatusController // ✨ NEW
],
// ...
})
The publishUserLogin() and publishAdsClick() methods have been refactored to share the robust publishing pipeline used by stats events, while maintaining their simpler Tier 2 resilience profile.
Before:
async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
if (!this.channel) {
this.logger.warn('RabbitMQ channel not ready. Skipping user.login publish.');
return;
}
// Direct publish, no retries, no circuit breaker
}
After:
async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
// Check circuit breaker
if (!(await this.canAttempt())) {
this.logger.warn(
`⚠️ Circuit breaker OPEN. Dropping user.login event for uid=${event.uid} (non-critical event, no fallback)`
);
return;
}
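// Use shared retry logic; `context` labels the event in retry logs (illustrative value)
const context = `user.login uid=${event.uid}`;
try {
await this.retryPublish(async () => {
await this.publishUserLoginCore(event);
}, context);
// Record the outcome for the circuit breaker
this.recordSuccess();
} catch (error) {
// Retries exhausted: count the failure toward opening the circuit
this.recordFailure();
}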
}
// Core publish logic separated for retry wrapper
private async publishUserLoginCore(event: UserLoginEventPayload): Promise<void> {
// Actual RabbitMQ publish
}
1. Shared Resilience Logic:
- canAttempt() for circuit breaker checks
- retryPublish() for exponential backoff retries
- recordSuccess() / recordFailure() for circuit state management
2. Consistent Error Handling:
3. Easier Monitoring:
4. Future Extensibility:
Callers are NOT affected:
- Return type is still Promise<void>
Internal improvements:
Normal Operation:
[DEBUG] Published user.login event for uid=user123
[DEBUG] Published ads.click event for adsId=ad456
Retry After Transient Failure:
[WARN] Publish attempt 1 failed (user.login uid=user123). Retrying in 100ms...
[DEBUG] Published user.login event for uid=user123
Circuit Breaker Active:
[WARN] ⚠️ Circuit breaker OPEN. Dropping user.login event for uid=user123 (non-critical event, no fallback)
[WARN] ⚠️ Circuit breaker OPEN. Dropping ads.click event for adsId=ad456 (non-critical event, no fallback)
Total Failure After Retries:
[ERROR] Failed to publish user.login after 3 retries for uid=user123: Connection refused
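The retry behaviour visible in these logs can be sketched as a generic exponential backoff helper. This is not the publisher's `retryPublish()`; `retryWithBackoff` is a hypothetical stand-in, and the limit of 3 attempts, the 100 ms base delay, and the doubling factor are assumptions read off the sample log lines. The `log` and `sleep` parameters are injected only so the helper can be exercised without real delays.

```typescript
async function retryWithBackoff<T>(
  op: () => Promise<T>,
  context: string,
  maxAttempts: number = 3, // assumed limit
  baseDelayMs: number = 100, // assumed base delay
  log: (line: string) => void = (line) => console.warn(line),
  sleep: (ms: number) => Promise<void> = (ms) =>
    new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await op();
    } catch (err) {
      lastError = err;
      if (attempt < maxAttempts) {
        // Delay doubles on each failed attempt: 100ms, 200ms, 400ms, ...
        const delay = baseDelayMs * Math.pow(2, attempt - 1);
        log(`[WARN] Publish attempt ${attempt} failed (${context}). Retrying in ${delay}ms...`);
        await sleep(delay);
      }
    }
  }
  // All attempts failed: surface the last error to the caller
  throw lastError;
}
```

A caller would wrap this in `try`/`catch` and record the outcome on the circuit breaker, so that repeated exhaustion of retries eventually opens the circuit.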
Both ads and video aggregation services now provide comprehensive summary logs after each run.
Log Level: INFO (not spammy, one line per run)
Format:
[INFO] 📊 Ads aggregation complete: updated=150, errors=2, scores(min=0.0234, max=2.4567, avg=0.8923), zeroScores=5
Metrics Included:
- updated - Number of AdsGlobalStats records successfully updated
- errors - Number of ads that failed to aggregate
- scores.min - Minimum computed score across all ads
- scores.max - Maximum computed score across all ads
- scores.avg - Average computed score across all ads
- zeroScores - Count of ads with computedScore = 0
Debug-Level Details: Individual ad processing still logs at DEBUG level:
[DEBUG] Aggregated adId=ad123: impressions=1000, clicks=50, CTR=0.0476, score=1.2345
Log Level: INFO
Format:
[INFO] 📊 Video aggregation complete: updated=320, errors=1, scores(min=0.0156, max=3.1234, avg=1.0234), zeroScores=12
Metrics Included:
- updated - Number of VideoGlobalStats records successfully updated
- errors - Number of videos that failed to aggregate
- scores.min - Minimum computed score across all videos
- scores.max - Maximum computed score across all videos
- scores.avg - Average computed score across all videos
- zeroScores - Count of videos with computedScore = 0
Debug-Level Details:
[DEBUG] Aggregated videoId=video456: impressions=500, clicks=500, CTR=0.4000, score=2.3456
Both methods now track scores during aggregation:
const scores: number[] = [];
let zeroScoreCount = 0;
for (const id of ids) {
const score = await this.aggregateSingle(id, cutoffTime);
scores.push(score);
if (score === 0) zeroScoreCount++;
}
const scoreStats =
scores.length > 0
? {
min: Math.min(...scores),
max: Math.max(...scores),
avg: scores.reduce((sum, s) => sum + s, 0) / scores.length,
}
: { min: 0, max: 0, avg: 0 };
The private aggregation methods now return the computed score:
private async aggregateSingleAd(adId: string, cutoffTime: bigint): Promise<number> {
// ... compute metrics ...
const computedScore = this.computeScore(popularity, ctr, recency);
// ... upsert to database ...
return computedScore; // ✨ NEW
}
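Putting the score tracking and the summary format together, one way to build the summary line is shown below. This is a sketch, not the services' code: `summarizeScores` and `formatSummary` are hypothetical names, and the four-decimal formatting is inferred from the sample log lines. The single-pass loop is a deliberate swap for `Math.min(...scores)`, which can exceed the engine's argument limit when the array is very large.

```typescript
interface ScoreSummary {
  min: number;
  max: number;
  avg: number;
  zeroScores: number;
}

// Single pass over the scores; returns all zeros for an empty run.
function summarizeScores(scores: readonly number[]): ScoreSummary {
  if (scores.length === 0) return { min: 0, max: 0, avg: 0, zeroScores: 0 };
  let min = Infinity;
  let max = -Infinity;
  let sum = 0;
  let zeroScores = 0;
  for (const s of scores) {
    if (s < min) min = s;
    if (s > max) max = s;
    sum += s;
    if (s === 0) zeroScores++;
  }
  return { min, max, avg: sum / scores.length, zeroScores };
}

// Builds the one-line INFO summary in the format shown in this document.
function formatSummary(
  kind: "Ads" | "Video",
  updated: number,
  errors: number,
  scores: readonly number[],
): string {
  const s = summarizeScores(scores);
  return (
    `📊 ${kind} aggregation complete: updated=${updated}, errors=${errors}, ` +
    `scores(min=${s.min.toFixed(4)}, max=${s.max.toFixed(4)}, avg=${s.avg.toFixed(4)}), ` +
    `zeroScores=${s.zeroScores}`
  );
}
```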
RabbitMQ Internal Endpoint:
# Enable in production (default: disabled)
ENABLE_INTERNAL_ENDPOINTS=true
# Or check NODE_ENV
NODE_ENV=production # endpoint disabled by default
NODE_ENV=development # endpoint always enabled
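The gating rule these variables describe can be expressed as one predicate. This is a hedged sketch: `internalEndpointsEnabled` is a hypothetical helper, and treating every non-production `NODE_ENV` value (including an unset one) as enabled is an assumption; the document only states the behaviour for development, staging, and production.

```typescript
// Takes the environment as a parameter so it can be tested without process.env.
function internalEndpointsEnabled(env: Record<string, string | undefined>): boolean {
  // Assumption: anything other than "production" behaves like development
  if (env["NODE_ENV"] !== "production") return true;
  // In production the endpoint is off unless explicitly enabled
  return env["ENABLE_INTERNAL_ENDPOINTS"] === "true";
}
```

In the running service this would typically be called with `process.env`, for example from a guard in front of the status controller.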
Stats Aggregation (existing):
# CTR smoothing parameters
STATS_CTR_ALPHA=1
STATS_CTR_BETA=2
# Scoring weights
STATS_WEIGHT_POPULARITY=0.5
STATS_WEIGHT_CTR=0.3
STATS_WEIGHT_RECENCY=0.2
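As an illustration of how the scoring weights feed into the score, the sketch below applies the formula quoted in the troubleshooting section, score = w1*log(1+impressions) + w2*ctr + w3*recency. Several things here are assumptions: `weightsFromEnv` and `computeScoreSketch` are hypothetical names, the mapping of w1/w2/w3 to the popularity, CTR, and recency weights follows the order of the variables, and the CTR is taken as an input because the smoothing formula behind STATS_CTR_ALPHA and STATS_CTR_BETA is not given in this document.

```typescript
interface ScoreWeights {
  popularity: number;
  ctr: number;
  recency: number;
}

// Defaults mirror the values shown in the configuration
const DEFAULT_WEIGHTS: ScoreWeights = { popularity: 0.5, ctr: 0.3, recency: 0.2 };

// Parses one numeric variable, falling back when it is unset or not a number.
function numberOr(raw: string | undefined, fallback: number): number {
  const parsed = raw === undefined ? NaN : Number(raw);
  return Number.isFinite(parsed) ? parsed : fallback;
}

function weightsFromEnv(env: Record<string, string | undefined>): ScoreWeights {
  return {
    popularity: numberOr(env["STATS_WEIGHT_POPULARITY"], DEFAULT_WEIGHTS.popularity),
    ctr: numberOr(env["STATS_WEIGHT_CTR"], DEFAULT_WEIGHTS.ctr),
    recency: numberOr(env["STATS_WEIGHT_RECENCY"], DEFAULT_WEIGHTS.recency),
  };
}

// score = w1*log(1+impressions) + w2*ctr + w3*recency
function computeScoreSketch(
  impressions: number,
  ctr: number,
  recency: number,
  w: ScoreWeights = DEFAULT_WEIGHTS,
): number {
  return w.popularity * Math.log(1 + impressions) + w.ctr * ctr + w.recency * recency;
}
```

Under this formula an item scores exactly 0 only when impressions, CTR, and recency are all 0, which is worth keeping in mind when interpreting the zeroScores count.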
Recommended Metrics to Track:
Circuit State Duration
- grep "Circuit breaker OPENED" app.log
Reconnection Failures
- grep "❌ RabbitMQ reconnection failed" app.log
Connection Health
- /api/v1/internal/rabbitmq/status every 60 seconds
- health !== "healthy" for > 2 minutes
Failure Count Trends
- failureCount from circuit transition logs
Example Alert Query:
# Check if circuit has been OPEN for too long
tail -n 1000 app.log | grep "Circuit breaker OPENED" | tail -n 1
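The connection health rule above (poll the status endpoint every 60 seconds, alert when health is not "healthy" for more than 2 minutes) needs a little state between polls. The following is a minimal sketch of that bookkeeping; `UnhealthyTracker` is a hypothetical name, and the caller is assumed to fetch the endpoint and pass in the `health` field and the current time.

```typescript
class UnhealthyTracker {
  // Timestamp (ms) of the first non-healthy sample in the current streak
  private unhealthySince: number | null = null;
  private readonly thresholdMs: number;

  constructor(thresholdMs: number = 120000) {
    this.thresholdMs = thresholdMs;
  }

  // Records one poll result; returns true when an alert should fire.
  observe(health: string, nowMs: number): boolean {
    if (health === "healthy") {
      this.unhealthySince = null; // streak ends
      return false;
    }
    if (this.unhealthySince === null) this.unhealthySince = nowMs;
    return nowMs - this.unhealthySince > this.thresholdMs;
  }
}
```

Both "unhealthy" and "reconnecting" count toward the streak here, matching the `health !== "healthy"` condition.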
Recommended Metrics to Track:
Zero Score Count
- zeroScores > 10% of total items
- grep "📊.*zeroScores=" app.log
Error Rate
- errors / (updated + errors) > 0.05 (5%)
- errors and updated from summary logs
Score Distribution
- min, max, avg trends over time
- avg drops significantly (e.g., > 50% decrease)
Processing Time
Example Log Analysis:
# Extract ads aggregation metrics
grep "📊 Ads aggregation complete" app.log | tail -n 10
# Extract video aggregation metrics
grep "📊 Video aggregation complete" app.log | tail -n 10
# Sum zeroScores across today's runs (UTC date; assumes each log line carries a YYYY-MM-DD timestamp)
grep "📊.*zeroScores=" app.log | \
grep "$(date -u +%Y-%m-%d)" | \
awk -F'zeroScores=' '{sum+=$2} END {print sum}'
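For alerting beyond ad hoc shell pipelines, the summary line can be parsed and checked against the thresholds listed above. This is a sketch under assumptions: `parseSummary` and `checkRun` are hypothetical names, the regular expression matches the summary format shown in this document, and "total items" is taken to mean `updated + errors`.

```typescript
interface RunSummary {
  updated: number;
  errors: number;
  min: number;
  max: number;
  avg: number;
  zeroScores: number;
}

const SUMMARY_RE =
  /aggregation complete: updated=(\d+), errors=(\d+), scores\(min=([\d.]+), max=([\d.]+), avg=([\d.]+)\), zeroScores=(\d+)/;

// Returns null for lines that are not aggregation summaries.
function parseSummary(line: string): RunSummary | null {
  const m = SUMMARY_RE.exec(line);
  if (m === null) return null;
  return {
    updated: Number(m[1]),
    errors: Number(m[2]),
    min: Number(m[3]),
    max: Number(m[4]),
    avg: Number(m[5]),
    zeroScores: Number(m[6]),
  };
}

// Applies the documented thresholds: error rate > 5%, zero scores > 10%.
function checkRun(s: RunSummary): string[] {
  const alerts: string[] = [];
  const total = s.updated + s.errors; // assumed definition of "total items"
  if (total === 0) return alerts;
  if (s.errors / total > 0.05) alerts.push("error-rate");
  if (s.zeroScores / total > 0.1) alerts.push("zero-scores");
  return alerts;
}
```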
| Level | What Gets Logged |
|---|---|
| ERROR | Connection failures, reconnection failures, publish errors |
| WARN | Circuit opened, reconnection needed during HALF_OPEN |
| INFO | Circuit state changes, reconnection success, module initialization |
| DEBUG | Individual message publishes, connection health checks, reconnection progress |
Recommended Production Level: INFO
| Level | What Gets Logged |
|---|---|
| ERROR | Individual item aggregation failures (with stack trace) |
| WARN | (not currently used) |
| INFO | Start/end of aggregation runs, summary statistics |
| DEBUG | Per-item aggregation details (CTR, score, etc.) |
Recommended Production Level: INFO
Test in Development:
# Should return full status
curl http://localhost:3000/api/v1/internal/rabbitmq/status | jq
# Verify health status reflects circuit state
# 1. Stop RabbitMQ
# 2. Generate some events (triggers circuit breaker)
# 3. Check endpoint - should show "unhealthy"
# 4. Start RabbitMQ
# 5. Wait 60s (circuit timeout)
# 6. Check endpoint - should show "healthy"
Test in Production:
# Without ENABLE_INTERNAL_ENDPOINTS (should be blocked)
curl https://prod.example.com/api/v1/internal/rabbitmq/status
# Expected: {"error": "Internal endpoints are disabled in production", ...}
# With ENABLE_INTERNAL_ENDPOINTS=true (should work)
export ENABLE_INTERNAL_ENDPOINTS=true
# Restart app
curl https://prod.example.com/api/v1/internal/rabbitmq/status
# Expected: {"timestamp": "...", "rabbitmq": {...}}
Trigger Manual Aggregation:
# Trigger ads aggregation
curl -X POST http://localhost:3001/api/v1/internal/stats/aggregate-ads
# Check logs for summary
tail -f logs/box-stats-api.log | grep "📊 Ads aggregation"
# Trigger video aggregation
curl -X POST http://localhost:3001/api/v1/internal/stats/aggregate-videos
# Check logs for summary
tail -f logs/box-stats-api.log | grep "📊 Video aggregation"
Verify Metrics:
# Count items with zero scores
# Insert test data with 0 impressions/clicks
# Run aggregation
# Verify zeroScores count matches
Problem: Circuit stays OPEN indefinitely
Diagnosis:
grep "Circuit breaker" app.log | tail -n 20
Common Causes:
Solution:
- docker ps | grep rabbitmq
- telnet localhost 5672

Problem: Status endpoint returns unhealthy but RabbitMQ is running
Diagnosis:
curl http://localhost:3000/api/v1/internal/rabbitmq/status | jq .rabbitmq.connection
Common Causes:
Solution:
- hasConnection and hasChannel separately

Problem: High zeroScores count
Diagnosis:
// Find items with zero scores in database (MongoDB shell)
db.adsGlobalStats.find({ computedScore: 0 }).limit(10)
Common Causes:
Solution:
- score = w1*log(1+impressions) + w2*ctr + w3*recency

Problem: Average score dropping over time
Diagnosis:
# Extract avg scores over time (one value per aggregation run, oldest first)
grep "📊.*avg=" app.log | \
grep -o "avg=[0-9.]*" | \
cut -d= -f2
Common Causes:
Solution:
- zeroScores count increased

- failureCount/successCount
- getCircuitStatus() with connection health flags
- /api/v1/internal/rabbitmq/status
Visibility Improvements:
Operations Benefits: