Observability Enhancements for RabbitMQ and Stats Aggregation

This document describes the enhanced observability features added to the RabbitMQ publisher and stats aggregation services.


Part A: RabbitMQ Publisher Observability

Event Publishing Resilience Tiers

The RabbitMQ publisher implements three tiers of resilience based on event criticality:

Tier 1: Stats Events (Full Resilience)

Events: stats.ad.click, stats.video.click, stats.ad.impression

Features:

  • ✅ Circuit breaker protection
  • ✅ Retry logic (3 attempts with exponential backoff)
  • ✅ Redis fallback queue (24-hour TTL)
  • ✅ Dead Letter Queue (DLQ)
  • ✅ Publisher-level idempotency (7-day window)
  • ✅ Message TTL (24 hours)

Why: These events are critical for analytics and recommendations. Data loss is unacceptable.

Tier 2: User Activity Events (Partial Resilience)

Events: user.login, ads.click

Features:

  • ✅ Circuit breaker protection
  • ✅ Retry logic (3 attempts with exponential backoff)
  • ❌ No Redis fallback queue
  • ❌ No DLQ
  • ❌ No idempotency checking
  • ❌ No message TTL

Why: These events are less critical. Losing a few during outages is acceptable. Simpler implementation reduces overhead.

Warning Logs:

[WARN] ⚠️  Circuit breaker OPEN. Dropping user.login event for uid=user123 (non-critical event, no fallback)
[WARN] ⚠️  Circuit breaker OPEN. Dropping ads.click event for adsId=ad456 (non-critical event, no fallback)

Monitoring Impact: Track these warning logs to understand data loss during RabbitMQ outages:

# Count dropped user.login events
grep "Dropping user.login event" app.log | wc -l

# Count dropped ads.click events
grep "Dropping ads.click event" app.log | wc -l

Tier 3: Fire-and-Forget (No Resilience)

Events: (none currently, but pattern available for future use)

Features:

  • ❌ No circuit breaker
  • ❌ No retries
  • ❌ No fallback
  • ❌ Direct publish, drop on failure

Why: For truly ephemeral events where loss doesn't matter (e.g., real-time presence indicators).


Enhanced Structured Logging

1. Circuit Breaker State Transitions

All circuit breaker state transitions now include failureCount and successCount in log messages:

Circuit Opened:

[WARN] Circuit breaker OPENED (failureCount=5, successCount=0). Will retry after 60000ms

Circuit Half-Open:

[INFO] Circuit breaker HALF-OPEN (failureCount=0, successCount=0). Testing connection...

Circuit Closed:

[INFO] Circuit breaker CLOSED (failureCount=0, successCount=2). Resuming normal operation.
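
The transitions above can be sketched as a small state machine. This is an illustrative sketch only, not the publisher's actual code: the class name `SketchCircuitBreaker`, the injected clock, and the thresholds (5 failures to open, 2 successes to close, 60000 ms before a probe) are assumptions inferred from the sample log values.

```typescript
type CircuitState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';

// Hypothetical thresholds, inferred from the sample logs (not confirmed settings).
const FAILURE_THRESHOLD = 5;
const SUCCESS_THRESHOLD = 2;
const OPEN_TIMEOUT_MS = 60_000;

class SketchCircuitBreaker {
  state: CircuitState = 'CLOSED';
  failureCount = 0;
  successCount = 0;
  nextAttemptTime = 0;
  readonly logs: string[] = []; // stands in for the real logger

  constructor(private readonly now: () => number = () => Date.now()) {}

  // Returns false while OPEN; moves to HALF_OPEN once the timeout has elapsed.
  canAttempt(): boolean {
    if (this.state === 'OPEN' && this.now() >= this.nextAttemptTime) {
      this.failureCount = 0;
      this.successCount = 0;
      this.transition('HALF_OPEN', 'Testing connection...');
    }
    return this.state !== 'OPEN';
  }

  recordSuccess(): void {
    if (this.state === 'HALF_OPEN') {
      this.successCount++;
      if (this.successCount >= SUCCESS_THRESHOLD) {
        this.failureCount = 0;
        this.transition('CLOSED', 'Resuming normal operation.');
      }
    } else {
      this.failureCount = 0; // a success while CLOSED clears the failure streak
    }
  }

  recordFailure(): void {
    this.failureCount++;
    // Any failure during HALF_OPEN reopens the circuit immediately.
    if (this.state === 'HALF_OPEN' || this.failureCount >= FAILURE_THRESHOLD) {
      this.successCount = 0;
      this.nextAttemptTime = this.now() + OPEN_TIMEOUT_MS;
      this.transition('OPEN', `Will retry after ${OPEN_TIMEOUT_MS}ms`);
    }
  }

  private transition(next: CircuitState, note: string): void {
    this.state = next;
    const label = next === 'OPEN' ? 'OPENED' : next === 'HALF_OPEN' ? 'HALF-OPEN' : 'CLOSED';
    this.logs.push(
      `Circuit breaker ${label} (failureCount=${this.failureCount}, successCount=${this.successCount}). ${note}`,
    );
  }
}
```

Putting both counters into every transition message means a single log line is enough to see why the circuit moved, without correlating earlier entries.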

2. Reconnection Lifecycle Logging

Reconnection attempts now have detailed structured logs with emojis for easy visual scanning:

Reconnection Start:

[INFO] 🔄 Starting RabbitMQ reconnection attempt (circuitState=HALF_OPEN)...

Connection Progress:

[DEBUG] 🔌 Reconnecting to RabbitMQ at amqp://localhost:5672...

Reconnection Success:

[INFO] ✅ RabbitMQ reconnection successful (hasConnection=true, hasChannel=true)

Reconnection Failure:

[ERROR] ❌ RabbitMQ reconnection failed (circuitState=HALF_OPEN): Connection refused
[WARN] ⚠️  Reconnection failed during HALF_OPEN, reopening circuit

URL Not Configured:

[ERROR] ❌ Reconnection failed: RABBITMQ_URL is not set. Cannot reconnect to RabbitMQ.

Extended Circuit Status API

The getCircuitStatus() method now returns comprehensive health information:

interface CircuitStatus {
  state: 'CLOSED' | 'OPEN' | 'HALF_OPEN';
  failureCount: number;
  successCount: number;
  hasConnection: boolean; // ✨ NEW
  hasChannel: boolean; // ✨ NEW
  isReconnecting: boolean; // ✨ NEW
  nextAttemptTime: number; // ✨ NEW (timestamp in ms)
}

Example Response:

{
  "state": "CLOSED",
  "failureCount": 0,
  "successCount": 2,
  "hasConnection": true,
  "hasChannel": true,
  "isReconnecting": false,
  "nextAttemptTime": 0
}

Internal Status Endpoint

A new internal-only HTTP endpoint provides RabbitMQ health monitoring:

Endpoint: GET /api/v1/internal/rabbitmq/status

Access Control:

  • Disabled by default in production (NODE_ENV=production)
  • Enable in production with: ENABLE_INTERNAL_ENDPOINTS=true
  • Always accessible in development/staging environments
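
The access rules above reduce to a small predicate. The sketch below is a hypothetical helper (`internalEndpointsEnabled` and `EnvLike` are illustrative names, not taken from the codebase), and it assumes the flag is enabled only by the exact string `true`.

```typescript
// Hypothetical shape of the relevant environment variables.
interface EnvLike {
  NODE_ENV?: string;
  ENABLE_INTERNAL_ENDPOINTS?: string;
}

// Returns true when the internal status endpoint should be served.
function internalEndpointsEnabled(env: EnvLike): boolean {
  // Development, staging, and unset NODE_ENV: always accessible.
  if (env.NODE_ENV !== 'production') return true;
  // Production: disabled unless explicitly enabled (exact 'true' is an assumption).
  return env.ENABLE_INTERNAL_ENDPOINTS === 'true';
}
```

In a Node.js service this could be called with `process.env` from a guard or at the top of the controller handler.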

Response Structure:

{
  "timestamp": "2025-12-08T12:00:00.000Z",
  "rabbitmq": {
    "circuitBreaker": {
      "state": "CLOSED",
      "failureCount": 0,
      "successCount": 0,
      "nextAttemptTime": null
    },
    "connection": {
      "hasConnection": true,
      "hasChannel": true,
      "isReconnecting": false
    },
    "health": "healthy"
  }
}

Health Status Values:

  • "healthy" - Connection & channel active, circuit CLOSED
  • "reconnecting" - Actively attempting to reconnect
  • "unhealthy" - Connection/channel down or circuit OPEN/HALF_OPEN
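
One way to derive the health value from the circuit status is sketched below. The function name `deriveHealth` and the `StatusLike` type are illustrative, and the precedence (an active reconnection attempt is reported before anything else) is an assumption; the actual controller may order the checks differently.

```typescript
type Health = 'healthy' | 'reconnecting' | 'unhealthy';

// Subset of the circuit status fields needed for the health decision.
interface StatusLike {
  state: 'CLOSED' | 'OPEN' | 'HALF_OPEN';
  hasConnection: boolean;
  hasChannel: boolean;
  isReconnecting: boolean;
}

function deriveHealth(s: StatusLike): Health {
  // Assumed precedence: an in-flight reconnection wins over the other checks.
  if (s.isReconnecting) return 'reconnecting';
  // Healthy requires a connection, a channel, and a CLOSED circuit.
  if (s.hasConnection && s.hasChannel && s.state === 'CLOSED') return 'healthy';
  // Everything else: connection/channel down, or circuit OPEN/HALF_OPEN.
  return 'unhealthy';
}
```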

Production Disabled Response:

{
  "error": "Internal endpoints are disabled in production",
  "hint": "Set ENABLE_INTERNAL_ENDPOINTS=true to enable"
}

Example Usage:

# Development/Staging (no auth required)
curl http://localhost:3000/api/v1/internal/rabbitmq/status

# Production (requires ENABLE_INTERNAL_ENDPOINTS=true)
curl https://prod.example.com/api/v1/internal/rabbitmq/status

Module Configuration:

// rabbitmq.module.ts
@Module({
  controllers: [
    RabbitmqFallbackReplayController,
    RabbitmqStatusController  // ✨ NEW
  ],
  // ...
})

Refactored User Activity Event Publishing

The publishUserLogin() and publishAdsClick() methods have been refactored to share the robust publishing pipeline used by stats events, while maintaining their simpler Tier 2 resilience profile.

Code Structure

Before:

async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
  if (!this.channel) {
    this.logger.warn('RabbitMQ channel not ready. Skipping user.login publish.');
    return;
  }
  // Direct publish, no retries, no circuit breaker
}

After:

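async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
  // Check circuit breaker
  if (!(await this.canAttempt())) {
    this.logger.warn(
      `⚠️  Circuit breaker OPEN. Dropping user.login event for uid=${event.uid} (non-critical event, no fallback)`
    );
    return;
  }

  // Label used in retry log messages (exact shape assumed)
  const context = `user.login uid=${event.uid}`;

  try {
    // Use shared retry logic
    await this.retryPublish(async () => {
      await this.publishUserLoginCore(event);
    }, context);

    // Record success for circuit breaker
    this.recordSuccess();
  } catch (err) {
    // Record failure for circuit breaker; log and swallow (fire-and-forget)
    this.recordFailure();
    this.logger.error(
      `Failed to publish user.login after 3 retries for uid=${event.uid}: ${(err as Error).message}`
    );
  }
}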

// Core publish logic separated for retry wrapper
private async publishUserLoginCore(event: UserLoginEventPayload): Promise<void> {
  // Actual RabbitMQ publish
}

Benefits of Refactoring

1. Shared Resilience Logic:

  • Both methods now use canAttempt() for circuit breaker checks
  • Both methods use retryPublish() for exponential backoff retries
  • Both methods use recordSuccess() / recordFailure() for circuit state management

2. Consistent Error Handling:

  • Clear warning logs when circuit breaker drops events
  • Structured error messages with retry attempt counts
  • Fire-and-forget behavior maintained (no exceptions thrown)

3. Easier Monitoring:

  • Consistent log format across all event types
  • Circuit breaker state applies to all events
  • Clear indication of dropped events for impact analysis

4. Future Extensibility:

  • Easy to upgrade to Tier 1 resilience (just add fallback/idempotency)
  • Easy to downgrade to Tier 3 (remove circuit breaker and retries)
  • Consistent codebase for all publish methods

Behavioral Changes

Callers are NOT affected:

  • Method signatures unchanged
  • Still returns Promise<void>
  • Still fire-and-forget (no exceptions thrown)
  • Still safe to use in async contexts

Internal improvements:

  • 3 retry attempts with delays: 100ms, 500ms, 2000ms
  • Circuit breaker prevents cascading failures
  • Better logging for troubleshooting
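
A retry wrapper using that delay schedule might look like the sketch below. It is not the actual `retryPublish()` implementation: the name `retryPublishSketch`, the injected `log` and `sleep` parameters, and the reading of the schedule as one initial attempt followed by up to three retries are all assumptions made for illustration.

```typescript
// Delay schedule from the list above; how it maps onto attempts is an assumption.
const RETRY_DELAYS_MS = [100, 500, 2000];

type Sleep = (ms: number) => Promise<void>;
const defaultSleep: Sleep = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function retryPublishSketch(
  publish: () => Promise<void>,
  context: string,
  log: (msg: string) => void,
  sleep: Sleep = defaultSleep, // injectable so tests do not have to wait
): Promise<void> {
  for (let attempt = 1; ; attempt++) {
    try {
      await publish();
      return; // published successfully
    } catch (err) {
      // All delays used up: rethrow so the caller can record the failure.
      if (attempt > RETRY_DELAYS_MS.length) throw err;
      const delay = RETRY_DELAYS_MS[attempt - 1];
      log(`Publish attempt ${attempt} failed (${context}). Retrying in ${delay}ms...`);
      await sleep(delay);
    }
  }
}
```

The wrapper rethrows after the last retry, so it is the calling publish method that catches the error, records the circuit breaker failure, and keeps the fire-and-forget contract.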

Example Logs

Normal Operation:

[DEBUG] Published user.login event for uid=user123
[DEBUG] Published ads.click event for adsId=ad456

Retry After Transient Failure:

[WARN] Publish attempt 1 failed (user.login uid=user123). Retrying in 100ms...
[DEBUG] Published user.login event for uid=user123

Circuit Breaker Active:

[WARN] ⚠️  Circuit breaker OPEN. Dropping user.login event for uid=user123 (non-critical event, no fallback)
[WARN] ⚠️  Circuit breaker OPEN. Dropping ads.click event for adsId=ad456 (non-critical event, no fallback)

Total Failure After Retries:

[ERROR] Failed to publish user.login after 3 retries for uid=user123: Connection refused

Part B: Stats Aggregation Observability

Enhanced Summary Logging

Both ads and video aggregation services now provide comprehensive summary logs after each run.

Ads Aggregation Summary

Log Level: INFO (not spammy, one line per run)

Format:

[INFO] 📊 Ads aggregation complete: updated=150, errors=2, scores(min=0.0234, max=2.4567, avg=0.8923), zeroScores=5

Metrics Included:

  • updated - Number of AdsGlobalStats records successfully updated
  • errors - Number of ads that failed to aggregate
  • scores.min - Minimum computed score across all ads
  • scores.max - Maximum computed score across all ads
  • scores.avg - Average computed score across all ads
  • zeroScores - Count of ads with computedScore = 0

Debug-Level Details: Individual ad processing still logs at DEBUG level:

[DEBUG] Aggregated adId=ad123: impressions=1000, clicks=50, CTR=0.0476, score=1.2345

Video Aggregation Summary

Log Level: INFO

Format:

[INFO] 📊 Video aggregation complete: updated=320, errors=1, scores(min=0.0156, max=3.1234, avg=1.0234), zeroScores=12

Metrics Included:

  • updated - Number of VideoGlobalStats records successfully updated
  • errors - Number of videos that failed to aggregate
  • scores.min - Minimum computed score across all videos
  • scores.max - Maximum computed score across all videos
  • scores.avg - Average computed score across all videos
  • zeroScores - Count of videos with computedScore = 0

Debug-Level Details:

[DEBUG] Aggregated videoId=video456: impressions=500, clicks=200, CTR=0.4000, score=2.3456

Implementation Details

Score Tracking

Both methods now track scores during aggregation:

const scores: number[] = [];
let zeroScoreCount = 0;

for (const id of ids) {
  const score = await this.aggregateSingle(id, cutoffTime);
  scores.push(score);
  if (score === 0) zeroScoreCount++;
}

Score Statistics Calculation

const scoreStats =
  scores.length > 0
    ? {
        min: Math.min(...scores),
        max: Math.max(...scores),
        avg: scores.reduce((sum, s) => sum + s, 0) / scores.length,
      }
    : { min: 0, max: 0, avg: 0 };
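
For illustration, the tracked scores could be turned into the one-line summary as sketched below. The function name `formatAggregationSummary` is hypothetical, and the four-decimal formatting is inferred from the sample log lines rather than confirmed. The sketch uses a single loop instead of spreading the array into `Math.min`/`Math.max`, which avoids argument-count limits on very large arrays.

```typescript
function formatAggregationSummary(
  kind: 'Ads' | 'Video',
  updated: number,
  errors: number,
  scores: number[],
): string {
  let min = Infinity;
  let max = -Infinity;
  let sum = 0;
  let zeroScores = 0;
  for (const s of scores) {
    if (s < min) min = s;
    if (s > max) max = s;
    sum += s;
    if (s === 0) zeroScores++;
  }
  // Fall back to zeros when nothing was aggregated, as in the snippet above.
  const stats =
    scores.length > 0 ? { min, max, avg: sum / scores.length } : { min: 0, max: 0, avg: 0 };
  return (
    `📊 ${kind} aggregation complete: updated=${updated}, errors=${errors}, ` +
    `scores(min=${stats.min.toFixed(4)}, max=${stats.max.toFixed(4)}, avg=${stats.avg.toFixed(4)}), ` +
    `zeroScores=${zeroScores}`
  );
}
```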

Return Values

The private aggregation methods now return the computed score:

private async aggregateSingleAd(adId: string, cutoffTime: bigint): Promise<number> {
  // ... compute metrics ...
  const computedScore = this.computeScore(popularity, ctr, recency);
  // ... upsert to database ...
  return computedScore;  // ✨ NEW
}

Configuration

Environment Variables

RabbitMQ Internal Endpoint:

# Enable in production (default: disabled)
ENABLE_INTERNAL_ENDPOINTS=true

# Or check NODE_ENV
NODE_ENV=production  # endpoint disabled by default
NODE_ENV=development # endpoint always enabled

Stats Aggregation (existing):

# CTR smoothing parameters
STATS_CTR_ALPHA=1
STATS_CTR_BETA=2

# Scoring weights
STATS_WEIGHT_POPULARITY=0.5
STATS_WEIGHT_CTR=0.3
STATS_WEIGHT_RECENCY=0.2

Monitoring & Alerts

RabbitMQ Monitoring

Recommended Metrics to Track:

  1. Circuit State Duration

    • Alert if circuit stays OPEN for > 5 minutes
    • Pattern: grep "Circuit breaker OPENED" app.log
  2. Reconnection Failures

    • Alert if reconnection fails > 3 times in 10 minutes
    • Pattern: grep "❌ RabbitMQ reconnection failed" app.log
  3. Connection Health

    • Poll /api/v1/internal/rabbitmq/status every 60 seconds
    • Alert if health !== "healthy" for > 2 minutes
  4. Failure Count Trends

    • Extract failureCount from circuit transition logs
    • Alert if consistently > 0 even when circuit is CLOSED

Example Alert Query:

# Show the most recent circuit-open event (compare its time against the 5-minute threshold)
tail -n 1000 app.log | grep "Circuit breaker OPENED" | tail -n 1

Stats Aggregation Monitoring

Recommended Metrics to Track:

  1. Zero Score Count

    • Alert if zeroScores > 10% of total items
    • Indicates potential data quality issues
    • Pattern: grep "📊.*zeroScores=" app.log
  2. Error Rate

    • Alert if errors / (updated + errors) > 0.05 (5%)
    • Pattern: Extract errors and updated from summary logs
  3. Score Distribution

    • Monitor min, max, avg trends over time
    • Alert if avg drops significantly (e.g., > 50% decrease)
  4. Processing Time

    • Use existing start/end timestamps
    • Alert if aggregation takes > expected duration
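
The zero-score and error-rate thresholds above can be checked directly against a summary log line. The sketch below is a hypothetical helper pair (`parseSummary`, `evaluateAlerts`); it assumes the summary format shown earlier and treats `updated + errors` as the total item count, which is an interpretation rather than something the services define.

```typescript
interface RunSummary {
  updated: number;
  errors: number;
  zeroScores: number;
}

// Extracts the counters from an aggregation summary line; null if the line does not match.
function parseSummary(line: string): RunSummary | null {
  const m = /updated=(\d+), errors=(\d+),.*zeroScores=(\d+)/.exec(line);
  if (!m) return null;
  return { updated: Number(m[1]), errors: Number(m[2]), zeroScores: Number(m[3]) };
}

// Returns the names of the alerts that fire for one run.
function evaluateAlerts(s: RunSummary): string[] {
  const alerts: string[] = [];
  const total = s.updated + s.errors; // assumed definition of "total items"
  if (total === 0) return alerts;
  if (s.errors / total > 0.05) alerts.push('error-rate');
  if (s.zeroScores / total > 0.1) alerts.push('zero-scores');
  return alerts;
}
```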

Example Log Analysis:

# Extract ads aggregation metrics
grep "📊 Ads aggregation complete" app.log | tail -n 10

# Extract video aggregation metrics
grep "📊 Video aggregation complete" app.log | tail -n 10

# Sum zero scores across today's runs (UTC date; assumes each log line carries a date stamp)
grep "📊.*zeroScores=" app.log | \
  grep "$(date -u +%Y-%m-%d)" | \
  awk -F'zeroScores=' '{sum+=$2} END {print sum}'

Log Level Guide

RabbitMQ Publisher

Level   What Gets Logged
ERROR   Connection failures, reconnection failures, publish errors
WARN    Circuit opened, reconnection failed during HALF_OPEN, dropped Tier 2 events, publish retries
INFO    Circuit state changes, reconnection success, module initialization
DEBUG   Individual message publishes, connection health checks, reconnection progress

Recommended Production Level: INFO

  • Captures all state changes and errors
  • Minimal noise (no per-message logs)

Stats Aggregation

Level   What Gets Logged
ERROR   Individual item aggregation failures (with stack trace)
WARN    (not currently used)
INFO    Start/end of aggregation runs, summary statistics
DEBUG   Per-item aggregation details (CTR, score, etc.)

Recommended Production Level: INFO

  • One summary line per aggregation run
  • Summary statistics for monitoring
  • Individual item details at DEBUG only

Testing

RabbitMQ Status Endpoint

Test in Development:

# Should return full status
curl http://localhost:3000/api/v1/internal/rabbitmq/status | jq

# Verify health status reflects circuit state
# 1. Stop RabbitMQ
# 2. Generate some events (triggers circuit breaker)
# 3. Check endpoint - should show "unhealthy"
# 4. Start RabbitMQ
# 5. Wait 60s (circuit timeout)
# 6. Check endpoint - should show "healthy"

Test in Production:

# Without ENABLE_INTERNAL_ENDPOINTS (should be blocked)
curl https://prod.example.com/api/v1/internal/rabbitmq/status
# Expected: {"error": "Internal endpoints are disabled in production", ...}

# With ENABLE_INTERNAL_ENDPOINTS=true (should work)
export ENABLE_INTERNAL_ENDPOINTS=true
# Restart app
curl https://prod.example.com/api/v1/internal/rabbitmq/status
# Expected: {"timestamp": "...", "rabbitmq": {...}}

Stats Aggregation Logs

Trigger Manual Aggregation:

# Trigger ads aggregation
curl -X POST http://localhost:3001/api/v1/internal/stats/aggregate-ads

# Check logs for summary
tail -f logs/box-stats-api.log | grep "📊 Ads aggregation"

# Trigger video aggregation
curl -X POST http://localhost:3001/api/v1/internal/stats/aggregate-videos

# Check logs for summary
tail -f logs/box-stats-api.log | grep "📊 Video aggregation"

Verify Metrics:

# Count items with zero scores
# Insert test data with 0 impressions/clicks
# Run aggregation
# Verify zeroScores count matches

Troubleshooting

RabbitMQ Publisher

Problem: Circuit stays OPEN indefinitely

Diagnosis:

grep "Circuit breaker" app.log | tail -n 20

Common Causes:

  • RabbitMQ still down
  • Reconnection failing (check for "❌ RabbitMQ reconnection failed")
  • Network connectivity issues

Solution:

  1. Verify RabbitMQ is running: docker ps | grep rabbitmq
  2. Check network: telnet localhost 5672
  3. Review reconnection error logs
  4. Restart application if necessary

Problem: Status endpoint returns unhealthy but RabbitMQ is running

Diagnosis:

curl http://localhost:3000/api/v1/internal/rabbitmq/status | jq .rabbitmq.connection

Common Causes:

  • Connection established but channel creation failed
  • Circuit breaker in OPEN/HALF_OPEN state
  • Stale connection object

Solution:

  1. Check hasConnection and hasChannel separately
  2. Review logs for channel creation errors
  3. Wait for next reconnection cycle (60s)

Stats Aggregation

Problem: High zeroScores count

Diagnosis:

// Find items with zero scores in database (MongoDB shell)
db.adsGlobalStats.find({ computedScore: 0 }).limit(10)

Common Causes:

  • Items with no impressions or clicks (new items)
  • Items older than aggregation window
  • Scoring formula issues

Solution:

  1. Verify items have events in the time window
  2. Check scoring weights configuration
  3. Review formula: score = w1*log(1+impressions) + w2*ctr + w3*recency
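
To see why an item scores zero, the formula can be evaluated by hand. The sketch below is not the service's `computeScore()`: the name `computeScoreSketch`, the `Weights` type, the use of the natural logarithm, and the mapping of w1, w2, w3 to the popularity, CTR, and recency weights are assumptions. The default weights mirror the STATS_WEIGHT_* values listed under Configuration.

```typescript
interface Weights {
  popularity: number; // assumed to be w1
  ctr: number;        // assumed to be w2
  recency: number;    // assumed to be w3
}

// Defaults taken from the STATS_WEIGHT_* values under Configuration.
const DEFAULT_WEIGHTS: Weights = { popularity: 0.5, ctr: 0.3, recency: 0.2 };

function computeScoreSketch(
  impressions: number,
  ctr: number,
  recency: number,
  w: Weights = DEFAULT_WEIGHTS,
): number {
  const popularity = Math.log(1 + impressions); // natural log assumed
  return w.popularity * popularity + w.ctr * ctr + w.recency * recency;
}
```

Under these assumptions an item with no impressions, a CTR of 0, and a recency of 0 scores exactly 0, which is the usual explanation for new or expired items showing up in zeroScores.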

Problem: Average score dropping over time

Diagnosis:

# Extract avg scores over time (prints the aggregation type and its avg, one run per line)
grep "📊.*avg=" app.log | \
  sed -E 's/.*(Ads|Video) aggregation complete.*avg=([0-9.]+).*/\1 \2/'

Common Causes:

  • Increased number of low-quality items
  • Recency decay (older items losing score)
  • Changed scoring weights

Solution:

  1. Review score distribution (min, max, avg)
  2. Check if zeroScores count increased
  3. Verify scoring weights haven't changed
  4. Consider adjusting weights if needed

Summary

Part A: RabbitMQ Publisher ✅

  • ✅ Enhanced circuit state transition logs with failureCount/successCount
  • ✅ Detailed reconnection lifecycle logs with structured messages
  • ✅ Extended getCircuitStatus() with connection health flags
  • ✅ New internal status endpoint /api/v1/internal/rabbitmq/status
  • ✅ Environment-based access control for production safety

Part B: Stats Aggregation ✅

  • ✅ Summary logs after each aggregation run
  • ✅ Score statistics (min, max, avg) included
  • ✅ Zero score count tracking
  • ✅ Count of updated vs errored items
  • ✅ INFO-level logs (not spammy)
  • ✅ DEBUG-level per-item details preserved

Impact

Visibility Improvements:

  • Circuit breaker state changes are now self-explanatory
  • Reconnection progress is traceable end-to-end
  • RabbitMQ health can be monitored programmatically
  • Stats aggregation quality is measurable per run

Operations Benefits:

  • Faster troubleshooting with structured logs
  • Proactive monitoring via status endpoint
  • Clear separation of summary (INFO) vs detail (DEBUG) logs
  • Production-safe internal endpoints with environment guards