Procházet zdrojové kódy

refactor(stats-events): improve RabbitMQ connection handling and message processing

- Reorganized RabbitMQ connection setup and message consumption logic for clarity and error handling.
- Added detailed logging for connection status and message processing statistics.
- Enhanced error handling for malformed messages and persistence errors.
- Updated TypeScript target version from ES2019 to ES2020 for improved language features.
Dave před 3 měsíci
rodič
revize
417de8034f

+ 214 - 41
apps/box-stats-api/src/feature/rabbitmq/rabbitmq-consumer.service.ts

@@ -5,8 +5,8 @@ import {
   OnModuleInit,
 } from '@nestjs/common';
 import { ConfigService } from '@nestjs/config';
-import { Connection, Channel, ConsumeMessage } from 'amqplib';
 import * as amqp from 'amqplib';
+import { Channel, Connection, ConsumeMessage } from 'amqplib';
 import { UserLoginService } from '../user-login/user-login.service';
 import { UserLoginEventPayload } from '@box/common/events/user-login-event.dto';
 
@@ -17,83 +17,256 @@ export class RabbitmqConsumerService implements OnModuleInit, OnModuleDestroy {
   private connection?: Connection;
   private channel?: Channel;
 
+  private consumerTag?: string;
+
+  // Cache the resolved connection settings so log messages and cleanup can reference them.
+  private url?: string;
+  private exchange = 'stats.user';
+  private queue = 'stats.user.login.q';
+  private routingKey = 'user.login';
+
   constructor(
     private readonly config: ConfigService,
     private readonly userLoginHistoryService: UserLoginService,
   ) {}
 
-  async onModuleInit() {
-    const url = this.config.get<string>('RABBITMQ_URL');
-    const exchange =
-      this.config.get<string>('RABBITMQ_LOGIN_EXCHANGE') ?? 'stats.user';
-    const queue =
-      this.config.get<string>('RABBITMQ_LOGIN_QUEUE') ?? 'stats.user.login.q';
-    const routingKey =
-      this.config.get<string>('RABBITMQ_LOGIN_ROUTING_KEY') ?? 'user.login';
-
-    if (!url) {
-      this.logger.error('RABBITMQ_URL is not set');
+  async onModuleInit(): Promise<void> {
+    this.url = this.config.get<string>('RABBITMQ_URL')?.trim();
+    this.exchange =
+      this.config.get<string>('RABBITMQ_LOGIN_EXCHANGE')?.trim() ||
+      this.exchange;
+    this.queue =
+      this.config.get<string>('RABBITMQ_LOGIN_QUEUE')?.trim() || this.queue;
+    this.routingKey =
+      this.config.get<string>('RABBITMQ_LOGIN_ROUTING_KEY')?.trim() ||
+      this.routingKey;
+
+    if (!this.url) {
+      // IMPORTANT: we log and return so that the app can still boot.
+      // If you want to fail-fast and crash the app, throw instead.
+      this.logger.error(
+        'RabbitMQ consumer is DISABLED: RABBITMQ_URL is not set',
+      );
       return;
     }
 
-    this.logger.log(`Connecting to RabbitMQ at ${url} ...`);
+    // Log once with all resolved params (helps debug “no Nest logs” cases too)
+    this.logger.log(
+      [
+        'RabbitMQ consumer bootstrap:',
+        `url=${this.maskAmqpUrl(this.url)}`,
+        `exchange=${this.exchange}`,
+        `queue=${this.queue}`,
+        `routingKey=${this.routingKey}`,
+      ].join(' '),
+    );
+
+    try {
+      await this.connectAndConsume();
+      this.logger.log('RabbitMQ consumer READY');
+    } catch (err) {
+      // Don’t let a connection problem silently kill startup visibility.
+      // You can either return (app keeps running) or throw (fail-fast).
+      this.logger.error(
+        'RabbitMQ consumer FAILED to start (connection/assert/consume error)',
+        err instanceof Error ? err.stack : String(err),
+      );
+
+      // Make sure partial resources get cleaned up.
+      await this.safeClose();
 
-    this.connection = await amqp.connect(url);
-    this.channel = await this.connection.createChannel();
+      // Choose ONE behavior:
+      // 1) keep app running but consumer disabled:
+      return;
 
-    await this.channel.assertExchange(exchange, 'topic', { durable: true });
-    await this.channel.assertQueue(queue, { durable: true });
-    await this.channel.bindQueue(queue, exchange, routingKey);
+      // 2) OR crash the app so you notice immediately:
+      // throw err;
+    }
+  }
+
+  private async connectAndConsume(): Promise<void> {
+    if (!this.url) throw new Error('RABBITMQ_URL missing at connect time');
 
+    // connect
     this.logger.log(
-      `Consuming queue="${queue}" exchange="${exchange}" routingKey="${routingKey}"`,
+      `Connecting to RabbitMQ at ${this.maskAmqpUrl(this.url)} ...`,
     );
+    const conn = await amqp.connect(this.url);
+
+    // connection lifecycle logs
+    conn.on('error', (e) => {
+      // amqplib can emit 'error' without closing
+      this.logger.error(
+        'RabbitMQ connection error event',
+        e instanceof Error ? e.stack : String(e),
+      );
+    });
+
+    conn.on('close', () => {
+      this.logger.warn('RabbitMQ connection closed');
+    });
+
+    this.connection = conn;
+
+    // channel
+    const ch = await conn.createChannel();
+
+    ch.on('error', (e) => {
+      this.logger.error(
+        'RabbitMQ channel error event',
+        e instanceof Error ? e.stack : String(e),
+      );
+    });
 
-    await this.channel.consume(queue, (msg) => this.handleMessage(msg), {
-      noAck: false,
+    ch.on('close', () => {
+      this.logger.warn('RabbitMQ channel closed');
     });
+
+    this.channel = ch;
+
+    // QoS: avoid flooding if handler is slower than producer
+    await ch.prefetch(20);
+
+    // asserts/bind
+    this.logger.log(
+      `Asserting exchange="${this.exchange}" queue="${this.queue}" and binding routingKey="${this.routingKey}"`,
+    );
+
+    await ch.assertExchange(this.exchange, 'topic', { durable: true });
+    await ch.assertQueue(this.queue, { durable: true });
+    await ch.bindQueue(this.queue, this.exchange, this.routingKey);
+
+    // consume
+    this.logger.log(
+      `Consuming: queue="${this.queue}" exchange="${this.exchange}" routingKey="${this.routingKey}" noAck=false`,
+    );
+
+    const res = await ch.consume(
+      this.queue,
+      (msg) => void this.handleMessage(msg),
+      { noAck: false },
+    );
+
+    this.consumerTag = res.consumerTag;
+    this.logger.log(`Consumer started (consumerTag="${this.consumerTag}")`);
   }
 
   private async handleMessage(msg: ConsumeMessage | null): Promise<void> {
-    if (!msg || !this.channel) return;
+    const ch = this.channel;
+    if (!msg || !ch) return;
 
-    const content = msg.content.toString();
+    const content = msg.content.toString('utf8');
 
     try {
-      const payload = JSON.parse(content) as UserLoginEventPayload;
+      const payload = this.safeParseJson<UserLoginEventPayload>(content);
+
+      if (!payload) {
+        this.logger.warn(
+          `Drop message: invalid JSON (deliveryTag=${msg.fields.deliveryTag}) content=${content}`,
+        );
+        ch.ack(msg);
+        return;
+      }
 
       // Basic sanity checks
       if (!payload.uid || !payload.ip || !payload.loginAt) {
         this.logger.warn(
-          `Invalid user.login payload, missing uid/ip/loginAt: ${content}`,
+          `Drop message: missing uid/ip/loginAt (deliveryTag=${msg.fields.deliveryTag}) content=${content}`,
         );
-        this.channel.ack(msg); // Don't retry poison messages
+        ch.ack(msg);
         return;
       }
 
       await this.userLoginHistoryService.recordLogin(payload);
-      this.channel.ack(msg);
-    } catch (error) {
+
+      ch.ack(msg);
+    } catch (err) {
       this.logger.error(
-        `Failed to process message: ${content}`,
-        error instanceof Error ? error.stack : String(error),
+        `Failed to process message (deliveryTag=${msg.fields.deliveryTag}) content=${content}`,
+        err instanceof Error ? err.stack : String(err),
       );
 
-      // For now: drop bad messages to avoid endless loops
-      // Later: route to DLQ.
-      this.channel.nack(msg, false, false);
+      // Drop bad messages to avoid endless loops (later: DLQ)
+      ch.nack(msg, false, false);
+    }
+  }
+
+  async onModuleDestroy(): Promise<void> {
+    this.logger.log('RabbitMQ consumer shutting down...');
+    await this.safeClose();
+    this.logger.log('RabbitMQ consumer shutdown complete');
+  }
+
+  private async safeClose(): Promise<void> {
+    // Try cancel consumer first (best-effort)
+    if (this.channel && this.consumerTag) {
+      try {
+        await this.channel.cancel(this.consumerTag);
+        this.logger.log(
+          `Consumer cancelled (consumerTag="${this.consumerTag}")`,
+        );
+      } catch (err) {
+        this.logger.warn(
+          `Failed to cancel consumer (consumerTag="${this.consumerTag}")`,
+          err instanceof Error ? err.stack : String(err),
+        );
+      } finally {
+        this.consumerTag = undefined;
+      }
+    }
+
+    // Close channel
+    if (this.channel) {
+      try {
+        await this.channel.close();
+      } catch (err) {
+        this.logger.warn(
+          'Error while closing RabbitMQ channel',
+          err instanceof Error ? err.stack : String(err),
+        );
+      } finally {
+        this.channel = undefined;
+      }
+    }
+
+    // Close connection
+    if (this.connection) {
+      try {
+        await this.connection.close();
+      } catch (err) {
+        this.logger.warn(
+          'Error while closing RabbitMQ connection',
+          err instanceof Error ? err.stack : String(err),
+        );
+      } finally {
+        this.connection = undefined;
+      }
+    }
+  }
+
+  private safeParseJson<T>(raw: string): T | null {
+    try {
+      return JSON.parse(raw) as T;
+    } catch {
+      return null;
     }
   }
 
-  async onModuleDestroy() {
+  private maskAmqpUrl(url: string): string {
+    // Mask credentials if present: amqp://user:pass@host/vhost
+    // Keep host + vhost for debugging.
     try {
-      await this.channel?.close();
-      await this.connection?.close();
-    } catch (error) {
-      this.logger.error(
-        'Error while closing RabbitMQ connection',
-        error instanceof Error ? error.stack : String(error),
-      );
+      const u = new URL(url);
+      if (u.username || u.password) {
+        const masked = new URL(url);
+        masked.username = u.username ? '***' : '';
+        masked.password = u.password ? '***' : '';
+        return masked.toString();
+      }
+      return url;
+    } catch {
+      // Non-standard URL formats: best effort
+      return url.replace(/\/\/([^:/@]+):([^@]+)@/g, '//***:***@');
     }
   }
 }

+ 104 - 37
apps/box-stats-api/src/feature/stats-events/stats-aggregation.scheduler.ts

@@ -1,79 +1,146 @@
-import { Injectable, Logger } from '@nestjs/common';
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
 import { Cron, CronExpression } from '@nestjs/schedule';
 import { ConfigService } from '@nestjs/config';
 import { StatsAggregationService } from './stats-aggregation.service';
 
+type AggregationResult = {
+  successCount: number;
+  totalProcessed: number;
+  errorCount: number;
+};
+
 @Injectable()
-export class StatsAggregationScheduler {
+export class StatsAggregationScheduler implements OnModuleInit {
   private readonly logger = new Logger(StatsAggregationScheduler.name);
-  private readonly enabled: boolean;
-  private readonly windowDays?: number;
+
+  private enabled = true;
+  private windowDays?: number;
+
+  // Guardrails: prevent a cron tick from overlapping a still-running aggregation (and the resulting log spam).
+  private runningAds = false;
+  private runningVideo = false;
 
   constructor(
     private readonly configService: ConfigService,
     private readonly statsAggregation: StatsAggregationService,
-  ) {
-    this.enabled =
-      this.configService.get<string>('STATS_AGGREGATION_ENABLED') !== 'false';
-    const days = this.configService.get<string>(
-      'STATS_AGGREGATION_WINDOW_DAYS',
-    );
-    this.windowDays = days ? parseInt(days, 10) : undefined;
+  ) {}
+
+  onModuleInit(): void {
+    // Evaluate config on module init (safer than constructor for startup ordering)
+    const enabledRaw = this.configService
+      .get<string>('STATS_AGGREGATION_ENABLED')
+      ?.trim()
+      .toLowerCase();
+
+    // default: enabled (unless explicitly "false" or "0" etc.)
+    this.enabled = !['false', '0', 'off', 'no'].includes(enabledRaw ?? '');
+
+    const daysRaw = this.configService
+      .get<string>('STATS_AGGREGATION_WINDOW_DAYS')
+      ?.trim();
+
+    if (daysRaw) {
+      const parsed = Number.parseInt(daysRaw, 10);
+      if (Number.isFinite(parsed) && parsed > 0) {
+        this.windowDays = parsed;
+      } else {
+        this.logger.warn(
+          `Invalid STATS_AGGREGATION_WINDOW_DAYS="${daysRaw}" (expected positive integer). Falling back to "all time".`,
+        );
+        this.windowDays = undefined;
+      }
+    }
 
     if (this.enabled) {
       this.logger.log(
-        `📊 Stats aggregation scheduler enabled (window: ${this.windowDays ?? 'all time'})`,
+        `📊 Stats aggregation scheduler enabled (windowDays=${
+          this.windowDays ?? 'all time'
+        }, interval=${CronExpression.EVERY_5_MINUTES})`,
       );
     } else {
-      this.logger.warn('Stats aggregation scheduler is DISABLED');
+      this.logger.warn(
+        `📊 Stats aggregation scheduler DISABLED (STATS_AGGREGATION_ENABLED="${enabledRaw ?? ''}")`,
+      );
     }
   }
 
-  @Cron(CronExpression.EVERY_5_MINUTES)
-  async runAdsAggregation() {
+  @Cron(CronExpression.EVERY_5_MINUTES, { name: 'stats-aggregation-ads' })
+  async runAdsAggregation(): Promise<void> {
     if (!this.enabled) return;
 
-    const startTime = Date.now();
-    this.logger.log('⏰ Starting scheduled ads stats aggregation...');
+    if (this.runningAds) {
+      this.logger.warn(
+        '⏭️  Skip ads aggregation: previous run still in progress',
+      );
+      return;
+    }
+
+    this.runningAds = true;
+    const start = Date.now();
+
+    this.logger.log(
+      `⏰ Ads aggregation start (windowDays=${this.windowDays ?? 'all time'})`,
+    );
 
     try {
-      const result = await this.statsAggregation.aggregateAdsStats({
+      const result = (await this.statsAggregation.aggregateAdsStats({
         windowDays: this.windowDays,
-      });
-      const duration = Date.now() - startTime;
+      })) as AggregationResult;
+
+      const ms = Date.now() - start;
       this.logger.log(
-        `✅ Ads stats aggregation completed in ${duration}ms: ${result.successCount}/${result.totalProcessed} records updated (${result.errorCount} errors)`,
+        `✅ Ads aggregation done in ${ms}ms (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
       );
-    } catch (error: any) {
-      const duration = Date.now() - startTime;
+    } catch (err) {
+      const ms = Date.now() - start;
       this.logger.error(
-        `❌ Ads stats aggregation failed after ${duration}ms: ${error?.message ?? error}`,
-        error?.stack,
+        `❌ Ads aggregation failed after ${ms}ms: ${
+          err instanceof Error ? err.message : String(err)
+        }`,
+        err instanceof Error ? err.stack : undefined,
       );
+    } finally {
+      this.runningAds = false;
     }
   }
 
-  @Cron(CronExpression.EVERY_5_MINUTES)
-  async runVideoAggregation() {
+  @Cron(CronExpression.EVERY_5_MINUTES, { name: 'stats-aggregation-video' })
+  async runVideoAggregation(): Promise<void> {
     if (!this.enabled) return;
 
-    const startTime = Date.now();
-    this.logger.log('⏰ Starting scheduled video stats aggregation...');
+    if (this.runningVideo) {
+      this.logger.warn(
+        '⏭️  Skip video aggregation: previous run still in progress',
+      );
+      return;
+    }
+
+    this.runningVideo = true;
+    const start = Date.now();
+
+    this.logger.log(
+      `⏰ Video aggregation start (windowDays=${this.windowDays ?? 'all time'})`,
+    );
 
     try {
-      const result = await this.statsAggregation.aggregateVideoStats({
+      const result = (await this.statsAggregation.aggregateVideoStats({
         windowDays: this.windowDays,
-      });
-      const duration = Date.now() - startTime;
+      })) as AggregationResult;
+
+      const ms = Date.now() - start;
       this.logger.log(
-        `✅ Video stats aggregation completed in ${duration}ms: ${result.successCount}/${result.totalProcessed} records updated (${result.errorCount} errors)`,
+        `✅ Video aggregation done in ${ms}ms (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
       );
-    } catch (error: any) {
-      const duration = Date.now() - startTime;
+    } catch (err) {
+      const ms = Date.now() - start;
       this.logger.error(
-        `❌ Video stats aggregation failed after ${duration}ms: ${error?.message ?? error}`,
-        error?.stack,
+        `❌ Video aggregation failed after ${ms}ms: ${
+          err instanceof Error ? err.message : String(err)
+        }`,
+        err instanceof Error ? err.stack : undefined,
       );
+    } finally {
+      this.runningVideo = false;
     }
   }
 }

+ 285 - 134
apps/box-stats-api/src/feature/stats-events/stats-aggregation.service.ts

@@ -35,27 +35,24 @@ export class StatsAggregationService {
     private readonly redis: RedisService,
     private readonly mainMongo: MongoPrismaService,
   ) {
-    // Load CTR smoothing parameters (Laplace smoothing)
-    this.ctrAlpha = parseFloat(
-      this.configService.get<string>('STATS_CTR_ALPHA') ?? '1',
-    );
-    this.ctrBeta = parseFloat(
-      this.configService.get<string>('STATS_CTR_BETA') ?? '2',
-    );
+    this.ctrAlpha = this.readFiniteNumber('STATS_CTR_ALPHA', 1);
+    this.ctrBeta = this.readFiniteNumber('STATS_CTR_BETA', 2);
 
-    // Load scoring weights (must sum to meaningful proportion, normalize if needed)
-    this.weightPopularity = parseFloat(
-      this.configService.get<string>('STATS_WEIGHT_POPULARITY') ?? '0.5',
-    );
-    this.weightCtr = parseFloat(
-      this.configService.get<string>('STATS_WEIGHT_CTR') ?? '0.3',
-    );
-    this.weightRecency = parseFloat(
-      this.configService.get<string>('STATS_WEIGHT_RECENCY') ?? '0.2',
-    );
+    const wPop = this.readFiniteNumber('STATS_WEIGHT_POPULARITY', 0.5);
+    const wCtr = this.readFiniteNumber('STATS_WEIGHT_CTR', 0.3);
+    const wRec = this.readFiniteNumber('STATS_WEIGHT_RECENCY', 0.2);
+
+    const normalized = this.normalizeWeights(wPop, wCtr, wRec);
+
+    this.weightPopularity = normalized.popularity;
+    this.weightCtr = normalized.ctr;
+    this.weightRecency = normalized.recency;
 
     this.logger.log(
-      `📊 Scoring config: CTR(α=${this.ctrAlpha}, β=${this.ctrBeta}), Weights(pop=${this.weightPopularity}, ctr=${this.weightCtr}, rec=${this.weightRecency})`,
+      `📊 Scoring config loaded: CTR(α=${this.ctrAlpha}, β=${this.ctrBeta}), ` +
+        `Weights(pop=${this.weightPopularity.toFixed(4)}, ctr=${this.weightCtr.toFixed(
+          4,
+        )}, rec=${this.weightRecency.toFixed(4)})`,
     );
   }
 
@@ -66,57 +63,84 @@ export class StatsAggregationService {
   async aggregateAdsStats(
     options: AggregationOptions = {},
   ): Promise<AggregationResult> {
-    const client = this.prisma as any;
+    const runId = this.newRunId('ads');
+    const startedAtMs = Date.now();
+
     const { windowDays } = options;
+    const cutoffTime = this.computeCutoffTime(windowDays);
 
     this.logger.log(
-      `Starting ads stats aggregation (window: ${windowDays ?? 'all time'})`,
+      `[${runId}] Starting ads stats aggregation (window=${windowDays ?? 'all time'}, cutoff=${cutoffTime.toString()})`,
     );
 
-    const cutoffTime =
-      windowDays !== undefined
-        ? nowEpochMsBigInt() - BigInt(windowDays * 24 * 60 * 60 * 1000)
-        : BigInt(0);
-
-    // Get all unique adIds from click and impression events
-    const adIds = await this.getUniqueAdIds(cutoffTime);
+    let adIds: string[] = [];
+    try {
+      adIds = await this.getUniqueAdIds(cutoffTime);
+    } catch (err) {
+      this.logger.error(
+        `[${runId}] Failed to load unique adIds`,
+        err instanceof Error ? err.stack : String(err),
+      );
+      return { totalProcessed: 0, successCount: 0, errorCount: 1 };
+    }
 
-    this.logger.log(`Found ${adIds.length} unique ads to aggregate`);
+    this.logger.log(`[${runId}] Found ${adIds.length} unique ads to aggregate`);
 
     let successCount = 0;
     let errorCount = 0;
-    const scores: number[] = [];
+
+    // score stats without storing whole array
+    let scoreMin = Number.POSITIVE_INFINITY;
+    let scoreMax = Number.NEGATIVE_INFINITY;
+    let scoreSum = 0;
+    let scoreCount = 0;
     let zeroScoreCount = 0;
 
-    for (const adId of adIds) {
+    for (let i = 0; i < adIds.length; i++) {
+      const adId = adIds[i];
+
       try {
-        const score = await this.aggregateSingleAd(adId, cutoffTime);
-        scores.push(score);
+        const score = await this.aggregateSingleAd(adId, cutoffTime, runId);
+
+        scoreCount++;
+        scoreSum += score;
+        if (score < scoreMin) scoreMin = score;
+        if (score > scoreMax) scoreMax = score;
         if (score === 0) zeroScoreCount++;
+
         successCount++;
-      } catch (error: any) {
+      } catch (err) {
         errorCount++;
         this.logger.error(
-          `Failed to aggregate stats for adId=${adId}: ${error?.message ?? error}`,
-          error?.stack,
+          `[${runId}] Failed to aggregate adId=${adId}`,
+          err instanceof Error ? err.stack : String(err),
+        );
+      }
+
+      // light progress log every 500 items (helps “silent startup” / long runs)
+      if ((i + 1) % 500 === 0) {
+        this.logger.log(
+          `[${runId}] Progress ${i + 1}/${adIds.length} (ok=${successCount}, err=${errorCount})`,
         );
       }
     }
 
-    // Calculate score statistics
-    const scoreStats =
-      scores.length > 0
+    const durationMs = Date.now() - startedAtMs;
+
+    const stats =
+      scoreCount > 0
         ? {
-            min: Math.min(...scores),
-            max: Math.max(...scores),
-            avg: scores.reduce((sum, s) => sum + s, 0) / scores.length,
+            min: Number.isFinite(scoreMin) ? scoreMin : 0,
+            max: Number.isFinite(scoreMax) ? scoreMax : 0,
+            avg: scoreSum / scoreCount,
           }
         : { min: 0, max: 0, avg: 0 };
 
     this.logger.log(
-      `📊 Ads aggregation complete: updated=${successCount}, errors=${errorCount}, ` +
-        `scores(min=${scoreStats.min.toFixed(4)}, max=${scoreStats.max.toFixed(4)}, avg=${scoreStats.avg.toFixed(4)}), ` +
-        `zeroScores=${zeroScoreCount}`,
+      `[${runId}] ✅ Ads aggregation complete in ${durationMs}ms: updated=${successCount}, errors=${errorCount}, ` +
+        `scores(min=${stats.min.toFixed(4)}, max=${stats.max.toFixed(4)}, avg=${stats.avg.toFixed(
+          4,
+        )}), zeroScores=${zeroScoreCount}`,
     );
 
     return {
@@ -132,56 +156,88 @@ export class StatsAggregationService {
   async aggregateVideoStats(
     options: AggregationOptions = {},
   ): Promise<AggregationResult> {
-    const client = this.prisma as any;
+    const runId = this.newRunId('video');
+    const startedAtMs = Date.now();
+
     const { windowDays } = options;
+    const cutoffTime = this.computeCutoffTime(windowDays);
 
     this.logger.log(
-      `Starting video stats aggregation (window: ${windowDays ?? 'all time'})`,
+      `[${runId}] Starting video stats aggregation (window=${windowDays ?? 'all time'}, cutoff=${cutoffTime.toString()})`,
     );
 
-    const cutoffTime =
-      windowDays !== undefined
-        ? nowEpochMsBigInt() - BigInt(windowDays * 24 * 60 * 60 * 1000)
-        : BigInt(0);
-
-    const videoIds = await this.getUniqueVideoIds(cutoffTime);
+    let videoIds: string[] = [];
+    try {
+      videoIds = await this.getUniqueVideoIds(cutoffTime);
+    } catch (err) {
+      this.logger.error(
+        `[${runId}] Failed to load unique videoIds`,
+        err instanceof Error ? err.stack : String(err),
+      );
+      return { totalProcessed: 0, successCount: 0, errorCount: 1 };
+    }
 
-    this.logger.log(`Found ${videoIds.length} unique videos to aggregate`);
+    this.logger.log(
+      `[${runId}] Found ${videoIds.length} unique videos to aggregate`,
+    );
 
     let successCount = 0;
     let errorCount = 0;
-    const scores: number[] = [];
+
+    let scoreMin = Number.POSITIVE_INFINITY;
+    let scoreMax = Number.NEGATIVE_INFINITY;
+    let scoreSum = 0;
+    let scoreCount = 0;
     let zeroScoreCount = 0;
 
-    for (const videoId of videoIds) {
+    for (let i = 0; i < videoIds.length; i++) {
+      const videoId = videoIds[i];
+
       try {
-        const score = await this.aggregateSingleVideo(videoId, cutoffTime);
-        scores.push(score);
+        const score = await this.aggregateSingleVideo(
+          videoId,
+          cutoffTime,
+          runId,
+        );
+
+        scoreCount++;
+        scoreSum += score;
+        if (score < scoreMin) scoreMin = score;
+        if (score > scoreMax) scoreMax = score;
         if (score === 0) zeroScoreCount++;
+
         successCount++;
-      } catch (error: any) {
+      } catch (err) {
         errorCount++;
         this.logger.error(
-          `Failed to aggregate stats for videoId=${videoId}: ${error?.message ?? error}`,
-          error?.stack,
+          `[${runId}] Failed to aggregate videoId=${videoId}`,
+          err instanceof Error ? err.stack : String(err),
+        );
+      }
+
+      if ((i + 1) % 500 === 0) {
+        this.logger.log(
+          `[${runId}] Progress ${i + 1}/${videoIds.length} (ok=${successCount}, err=${errorCount})`,
         );
       }
     }
 
-    // Calculate score statistics
-    const scoreStats =
-      scores.length > 0
+    const durationMs = Date.now() - startedAtMs;
+
+    const stats =
+      scoreCount > 0
         ? {
-            min: Math.min(...scores),
-            max: Math.max(...scores),
-            avg: scores.reduce((sum, s) => sum + s, 0) / scores.length,
+            min: Number.isFinite(scoreMin) ? scoreMin : 0,
+            max: Number.isFinite(scoreMax) ? scoreMax : 0,
+            avg: scoreSum / scoreCount,
           }
         : { min: 0, max: 0, avg: 0 };
 
     this.logger.log(
-      `📊 Video aggregation complete: updated=${successCount}, errors=${errorCount}, ` +
-        `scores(min=${scoreStats.min.toFixed(4)}, max=${scoreStats.max.toFixed(4)}, avg=${scoreStats.avg.toFixed(4)}), ` +
-        `zeroScores=${zeroScoreCount}`,
+      `[${runId}] ✅ Video aggregation complete in ${durationMs}ms: updated=${successCount}, errors=${errorCount}, ` +
+        `scores(min=${stats.min.toFixed(4)}, max=${stats.max.toFixed(4)}, avg=${stats.avg.toFixed(
+          4,
+        )}), zeroScores=${zeroScoreCount}`,
     );
 
     return {
@@ -207,8 +263,10 @@ export class StatsAggregationService {
     });
 
     const allAdIds = new Set<string>();
-    clickAdIds.forEach((item: any) => allAdIds.add(item.adId));
-    impressionAdIds.forEach((item: any) => allAdIds.add(item.adId));
+    clickAdIds.forEach((item: any) => item?.adId && allAdIds.add(item.adId));
+    impressionAdIds.forEach(
+      (item: any) => item?.adId && allAdIds.add(item.adId),
+    );
 
     return Array.from(allAdIds);
   }
@@ -222,12 +280,13 @@ export class StatsAggregationService {
       distinct: ['videoId'],
     });
 
-    return videoIds.map((item: any) => item.videoId);
+    return videoIds.map((item: any) => item.videoId).filter(Boolean);
   }
 
   private async aggregateSingleAd(
     adId: string,
     cutoffTime: bigint,
+    runId: string,
   ): Promise<number> {
     const client = this.prisma as any;
 
@@ -260,20 +319,18 @@ export class StatsAggregationService {
       orderBy: { impressionAt: 'asc' },
     });
 
-    const allTimes = [
-      ...clickTimes.map((t: any) => t.clickedAt),
-      ...impressionTimes.map((t: any) => t.impressionAt),
+    const allTimes: bigint[] = [
+      ...clickTimes.map((t: any) => t.clickedAt).filter((v: any) => v != null),
+      ...impressionTimes
+        .map((t: any) => t.impressionAt)
+        .filter((v: any) => v != null),
     ];
 
-    if (allTimes.length === 0) {
-      // No events, skip
-      return 0;
-    }
+    if (allTimes.length === 0) return 0;
 
     const firstSeenAt = allTimes.reduce((min, val) => (val < min ? val : min));
     const lastSeenAt = allTimes.reduce((max, val) => (val > max ? val : max));
 
-    // Compute metrics using updated formulas
     const computedPopularity = this.computePopularity(impressions);
     const computedCtr = this.computeCtr(clicks, impressions);
     const computedRecency = this.computeRecency(firstSeenAt);
@@ -285,7 +342,6 @@ export class StatsAggregationService {
 
     const now = nowEpochMsBigInt();
 
-    // Upsert into AdsGlobalStats
     await client.adsGlobalStats.upsert({
       where: { adId },
       update: {
@@ -314,11 +370,12 @@ export class StatsAggregationService {
     });
 
     this.logger.debug(
-      `Aggregated adId=${adId}: impressions=${impressions}, clicks=${clicks}, CTR=${computedCtr.toFixed(4)}, score=${computedScore.toFixed(4)}`,
+      `[${runId}] adId=${adId} imp=${impressions} clk=${clicks} ctr=${computedCtr.toFixed(
+        4,
+      )} score=${computedScore.toFixed(4)}`,
     );
 
-    // Sync score to Redis sorted sets
-    await this.syncAdScoreToRedis(adId, computedScore);
+    await this.syncAdScoreToRedis(adId, computedScore, runId);
 
     return computedScore;
   }
@@ -326,10 +383,11 @@ export class StatsAggregationService {
   private async aggregateSingleVideo(
     videoId: string,
     cutoffTime: bigint,
+    runId: string,
   ): Promise<number> {
     const client = this.prisma as any;
 
-    // Count clicks (we don't have impressions for videos yet, so just clicks)
+    // Count clicks (no impressions for videos yet)
     const clicks = await client.videoClickEvents.count({
       where: {
         videoId,
@@ -337,9 +395,7 @@ export class StatsAggregationService {
       },
     });
 
-    // For videos, we can treat clicks as a proxy for both impressions and clicks
-    // Or set impressions = 0 if no separate impression tracking
-    const impressions = clicks; // Adjust as needed
+    const impressions = clicks;
 
     const clickTimes = await client.videoClickEvents.findMany({
       where: { videoId, clickedAt: { gte: cutoffTime } },
@@ -347,11 +403,14 @@ export class StatsAggregationService {
       orderBy: { clickedAt: 'asc' },
     });
 
-    if (clickTimes.length === 0) {
-      return 0;
-    }
+    if (clickTimes.length === 0) return 0;
+
+    const allTimes: bigint[] = clickTimes
+      .map((t: any) => t.clickedAt)
+      .filter((v: any) => v != null);
+
+    if (allTimes.length === 0) return 0;
 
-    const allTimes = clickTimes.map((t: any) => t.clickedAt);
     const firstSeenAt = allTimes.reduce((min, val) => (val < min ? val : min));
     const lastSeenAt = allTimes.reduce((max, val) => (val > max ? val : max));
 
@@ -394,11 +453,12 @@ export class StatsAggregationService {
     });
 
     this.logger.debug(
-      `Aggregated videoId=${videoId}: impressions=${impressions}, clicks=${clicks}, CTR=${computedCtr.toFixed(4)}, score=${computedScore.toFixed(4)}`,
+      `[${runId}] videoId=${videoId} imp=${impressions} clk=${clicks} ctr=${computedCtr.toFixed(
+        4,
+      )} score=${computedScore.toFixed(4)}`,
     );
 
-    // Sync score to Redis sorted sets
-    await this.syncVideoScoreToRedis(videoId, computedScore);
+    await this.syncVideoScoreToRedis(videoId, computedScore, runId);
 
     return computedScore;
   }
@@ -408,33 +468,44 @@ export class StatsAggregationService {
    * Formula: log(1 + impressions)
    */
   private computePopularity(impressions: number): number {
-    return Math.log(1 + impressions);
+    return Math.log(1 + Math.max(0, impressions));
   }
 
   /**
-   * Smoothed CTR to avoid division by zero and reduce variance on low-volume items.
+   * Smoothed CTR
    * Formula: (clicks + alpha) / (impressions + beta)
-   * Default: alpha=1, beta=2 (Laplace smoothing)
    */
   private computeCtr(clicks: number, impressions: number): number {
-    return (clicks + this.ctrAlpha) / (impressions + this.ctrBeta);
+    const c = Math.max(0, clicks);
+    const i = Math.max(0, impressions);
+    const denom = i + this.ctrBeta;
+
+    // denom should never be 0 because beta defaults to 2, but guard anyway
+    if (!Number.isFinite(denom) || denom <= 0) return 0;
+
+    return (c + this.ctrAlpha) / denom;
   }
 
   /**
    * Recency score based on age since first appearance.
-   * More recent = higher score.
    * Formula: 1 / (1 + ageDays)
    */
   private computeRecency(firstSeenAt: bigint): number {
     const now = nowEpochMsBigInt();
-    const ageMs = Number(now - firstSeenAt);
+
+    // If clocks/time data is weird, avoid negative ages blowing up score
+    let ageMsBig = now - firstSeenAt;
+    if (ageMsBig < BigInt(0)) ageMsBig = BigInt(0);
+
+    // Convert safely to number for division; clamp if extremely large
+    const ageMs = this.bigIntToSafeNumber(ageMsBig);
+
     const ageDays = ageMs / (1000 * 60 * 60 * 24);
     return 1 / (1 + ageDays);
   }
 
   /**
-   * Composite score combining popularity, CTR, and recency.
-   * Formula: w1 * popularity + w2 * ctr + w3 * recency
+   * Composite score: w1 * popularity + w2 * ctr + w3 * recency
    */
   private computeScore(
     popularity: number,
@@ -448,72 +519,152 @@ export class StatsAggregationService {
     );
   }
 
-  /**
-   * Sync ad score to Redis sorted sets:
-   * - ads:global:score (all ads)
-   * - ads:tag:<tagId>:score (per tag, if ads have tags in the future)
-   */
-  private async syncAdScoreToRedis(adId: string, score: number): Promise<void> {
+  private async syncAdScoreToRedis(
+    adId: string,
+    score: number,
+    runId: string,
+  ): Promise<void> {
     try {
-      // Global sorted set for all ads
       const client = (this.redis as any).ensureClient();
       await client.zadd('ads:global:score', score, adId);
 
-      // TODO: If ads have tags in the future, fetch tagIds and add to tag-based sets
-      // For now, ads don't have tagIds in the schema, so skip tag-based syncing
-
       this.logger.debug(
-        `Synced adId=${adId} score=${score.toFixed(4)} to Redis`,
+        `[${runId}] Redis sync adId=${adId} score=${score.toFixed(4)} ok`,
       );
-    } catch (error: any) {
+    } catch (err) {
       this.logger.error(
-        `Failed to sync ad score to Redis for adId=${adId}: ${error?.message ?? error}`,
-        error?.stack,
+        `[${runId}] Redis sync FAILED for adId=${adId}`,
+        err instanceof Error ? err.stack : String(err),
       );
-      // Don't fail the whole aggregation job
     }
   }
 
-  /**
-   * Sync video score to Redis sorted sets:
-   * - video:global:score (all videos)
-   * - video:tag:<tagId>:score (per tag)
-   */
   private async syncVideoScoreToRedis(
     videoId: string,
     score: number,
+    runId: string,
   ): Promise<void> {
     try {
       const client = (this.redis as any).ensureClient();
 
-      // Global sorted set for all videos
       await client.zadd('video:global:score', score, videoId);
 
-      // Fetch tagIds from main Mongo DB
       const video = await this.mainMongo.videoMedia.findUnique({
         where: { id: videoId },
         select: { tagIds: true },
       });
 
-      if (video && video.tagIds && video.tagIds.length > 0) {
-        // Add to tag-based sorted sets
-        for (const tagId of video.tagIds) {
+      const tagIds = video?.tagIds ?? [];
+      if (Array.isArray(tagIds) && tagIds.length > 0) {
+        for (const tagId of tagIds) {
           await client.zadd(`video:tag:${tagId}:score`, score, videoId);
         }
+
         this.logger.debug(
-          `Synced videoId=${videoId} score=${score.toFixed(4)} to Redis (${video.tagIds.length} tags)`,
+          `[${runId}] Redis sync videoId=${videoId} score=${score.toFixed(
+            4,
+          )} ok (tags=${tagIds.length})`,
         );
       } else {
         this.logger.debug(
-          `Synced videoId=${videoId} score=${score.toFixed(4)} to Redis (no tags)`,
+          `[${runId}] Redis sync videoId=${videoId} score=${score.toFixed(
+            4,
+          )} ok (no tags)`,
         );
       }
-    } catch (error: any) {
+    } catch (err) {
       this.logger.error(
-        `Failed to sync video score to Redis for videoId=${videoId}: ${error?.message ?? error}`,
-        error?.stack,
+        `[${runId}] Redis sync FAILED for videoId=${videoId}`,
+        err instanceof Error ? err.stack : String(err),
       );
-      // Don't fail the whole aggregation job
     }
   }
+
+  // -------------------------
+  // helpers (config + safety)
+  // -------------------------
+
+  private computeCutoffTime(windowDays?: number): bigint {
+    if (windowDays === undefined) return BigInt(0);
+
+    const days = Number(windowDays);
+    if (!Number.isFinite(days) || days <= 0) return BigInt(0);
+
+    const ms = BigInt(Math.floor(days * 24 * 60 * 60 * 1000));
+    const now = nowEpochMsBigInt();
+    const cutoff = now - ms;
+
+    return cutoff > BigInt(0) ? cutoff : BigInt(0);
+  }
+
+  private readFiniteNumber(key: string, fallback: number): number {
+    const raw = this.configService.get<string>(key);
+
+    if (raw == null || raw.trim() === '') return fallback;
+
+    const n = Number.parseFloat(raw.trim());
+    if (!Number.isFinite(n)) {
+      this.logger.warn(
+        `Config ${key}="${raw}" is not a finite number; using ${fallback}`,
+      );
+      return fallback;
+    }
+
+    return n;
+  }
+
+  private normalizeWeights(
+    popularity: number,
+    ctr: number,
+    recency: number,
+  ): {
+    popularity: number;
+    ctr: number;
+    recency: number;
+  } {
+    const wp = Number.isFinite(popularity) && popularity >= 0 ? popularity : 0;
+    const wc = Number.isFinite(ctr) && ctr >= 0 ? ctr : 0;
+    const wr = Number.isFinite(recency) && recency >= 0 ? recency : 0;
+
+    const sum = wp + wc + wr;
+
+    if (sum <= 0) {
+      this.logger.warn(
+        `Invalid weights (sum<=0). Falling back to defaults (0.5,0.3,0.2)`,
+      );
+      return { popularity: 0.5, ctr: 0.3, recency: 0.2 };
+    }
+
+    // Normalize to sum=1 so config mistakes don’t blow up scoring scale
+    const np = wp / sum;
+    const nc = wc / sum;
+    const nr = wr / sum;
+
+    if (Math.abs(sum - 1) > 1e-9) {
+      this.logger.warn(
+        `Weights normalized from (pop=${wp}, ctr=${wc}, rec=${wr}, sum=${sum}) to ` +
+          `(pop=${np.toFixed(4)}, ctr=${nc.toFixed(4)}, rec=${nr.toFixed(4)})`,
+      );
+    }
+
+    return { popularity: np, ctr: nc, recency: nr };
+  }
+
+  private bigIntToSafeNumber(v: bigint): number {
+    const MAX_SAFE = BigInt(Number.MAX_SAFE_INTEGER);
+    if (v <= BigInt(0)) return 0;
+
+    if (v > MAX_SAFE) return Number.MAX_SAFE_INTEGER;
+
+    return Number(v);
+  }
+
+  private newRunId(prefix: string): string {
+    // tiny unique-ish id for logs
+    const ts = Date.now().toString(36);
+    const rnd = Math.floor(Math.random() * 1_000_000)
+      .toString(36)
+      .padStart(4, '0');
+    return `${prefix}-${ts}-${rnd}`;
+  }
 }

+ 291 - 104
apps/box-stats-api/src/feature/stats-events/stats-events.consumer.ts

@@ -5,8 +5,8 @@ import {
   OnModuleInit,
 } from '@nestjs/common';
 import { ConfigService } from '@nestjs/config';
-import { Connection, Channel, ConsumeMessage } from 'amqplib';
 import * as amqp from 'amqplib';
+import { Channel, Connection, ConsumeMessage } from 'amqplib';
 import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
 import { nowEpochMsBigInt } from '@box/common/time/time.util';
 
@@ -25,18 +25,17 @@ interface AdClickMessage extends BaseStatsMessage {
   adsId: string; // Ad ID (from publisher)
   adId?: string; // Alternative field name (for backward compatibility)
   channelId: string;
-  scene?: string; // Optional - from ad placement context
-  slot?: string; // Optional - from ad placement context
+  scene?: string;
+  slot?: string;
   adType: string;
-  clickedAt?: string | number | bigint; // Alternative field name
+  clickedAt?: string | number | bigint;
   clickAt?: string | number | bigint; // Publisher sends this
-  machine?: string; // Device info: brand and system version
+  machine?: string;
 }
 
 interface VideoClickMessage extends BaseStatsMessage {
   videoId: string;
-  // channelId: string;
-  machine: string; // Device info: brand and system version
+  machine: string;
   categoryId?: string;
   scene: string;
   clickedAt: string | number | bigint;
@@ -44,13 +43,12 @@ interface VideoClickMessage extends BaseStatsMessage {
 
 interface AdImpressionMessage extends BaseStatsMessage {
   adId: string;
-  // channelId: string;
   scene: string;
   slot: string;
   adType: string;
   impressionAt: string | number | bigint;
   visibleDurationMs?: number;
-  machine?: string; // Device info: brand and system version
+  machine?: string;
 }
 
 @Injectable()
@@ -60,14 +58,32 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
   private connection?: Connection;
   private channel?: Channel;
 
+  private consumerTags: string[] = [];
+
   private counters = {
     adClick: 0,
     videoClick: 0,
     adImpression: 0,
+    parseError: 0,
+    malformed: 0,
+    duplicate: 0,
+    persistError: 0,
   };
 
   private logInterval?: NodeJS.Timeout;
 
+  // keep config for log clarity + cleanup
+  private url?: string;
+  private exchange = 'stats.user';
+
+  private queueAdClick = 'stats.ad.click';
+  private queueVideoClick = 'stats.video.click';
+  private queueAdImpression = 'stats.ad.impression';
+
+  private routingKeyAdClick = 'stats.ad.click';
+  private routingKeyVideoClick = 'stats.video.click';
+  private routingKeyAdImpression = 'stats.ad.impression';
+
   constructor(
     private readonly config: ConfigService,
     private readonly prisma: PrismaMongoService,
@@ -78,74 +94,156 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
   }
 
   async onModuleInit(): Promise<void> {
-    const url = this.config.get<string>('RABBITMQ_URL');
-    const exchange =
-      this.config.get<string>('RABBITMQ_STATS_EXCHANGE') ?? 'stats.user';
-
-    const queueAdClick =
-      this.config.get<string>('RABBITMQ_STATS_AD_CLICK_QUEUE') ??
-      'stats.ad.click';
-    const queueVideoClick =
-      this.config.get<string>('RABBITMQ_STATS_VIDEO_CLICK_QUEUE') ??
-      'stats.video.click';
-    const queueAdImpression =
-      this.config.get<string>('RABBITMQ_STATS_AD_IMPRESSION_QUEUE') ??
-      'stats.ad.impression';
-
-    const routingKeyAdClick =
-      this.config.get<string>('RABBITMQ_STATS_AD_CLICK_ROUTING_KEY') ??
-      'stats.ad.click';
-    const routingKeyVideoClick =
-      this.config.get<string>('RABBITMQ_STATS_VIDEO_CLICK_ROUTING_KEY') ??
-      'stats.video.click';
-    const routingKeyAdImpression =
-      this.config.get<string>('RABBITMQ_STATS_AD_IMPRESSION_ROUTING_KEY') ??
-      'stats.ad.impression';
-
-    if (!url) {
-      this.logger.error('RABBITMQ_URL is not set');
+    this.url = this.config.get<string>('RABBITMQ_URL')?.trim() || undefined;
+    this.exchange =
+      this.config.get<string>('RABBITMQ_STATS_EXCHANGE')?.trim() ||
+      this.exchange;
+
+    this.queueAdClick =
+      this.config.get<string>('RABBITMQ_STATS_AD_CLICK_QUEUE')?.trim() ||
+      this.queueAdClick;
+    this.queueVideoClick =
+      this.config.get<string>('RABBITMQ_STATS_VIDEO_CLICK_QUEUE')?.trim() ||
+      this.queueVideoClick;
+    this.queueAdImpression =
+      this.config.get<string>('RABBITMQ_STATS_AD_IMPRESSION_QUEUE')?.trim() ||
+      this.queueAdImpression;
+
+    this.routingKeyAdClick =
+      this.config.get<string>('RABBITMQ_STATS_AD_CLICK_ROUTING_KEY')?.trim() ||
+      this.routingKeyAdClick;
+    this.routingKeyVideoClick =
+      this.config
+        .get<string>('RABBITMQ_STATS_VIDEO_CLICK_ROUTING_KEY')
+        ?.trim() || this.routingKeyVideoClick;
+    this.routingKeyAdImpression =
+      this.config
+        .get<string>('RABBITMQ_STATS_AD_IMPRESSION_ROUTING_KEY')
+        ?.trim() || this.routingKeyAdImpression;
+
+    if (!this.url) {
+      // If you want to fail-fast and stop app boot: throw new Error(...)
+      this.logger.error(
+        'StatsEventsConsumer is DISABLED: RABBITMQ_URL is not set',
+      );
       return;
     }
 
-    this.logger.log(`Connecting to RabbitMQ at ${url} ...`);
+    this.logger.log(
+      [
+        'StatsEventsConsumer bootstrap:',
+        `url=${this.maskAmqpUrl(this.url)}`,
+        `exchange=${this.exchange}`,
+        `queues=[${this.queueAdClick}, ${this.queueVideoClick}, ${this.queueAdImpression}]`,
+        `routingKeys=[${this.routingKeyAdClick}, ${this.routingKeyVideoClick}, ${this.routingKeyAdImpression}]`,
+      ].join(' '),
+    );
 
-    this.connection = await amqp.connect(url);
-    this.channel = await this.connection.createChannel();
+    try {
+      await this.connectAndConsume();
+      this.logger.log('🚀 StatsEventsConsumer READY');
+
+      this.logInterval = setInterval(() => {
+        this.logger.log(
+          `📊 Ingestion stats: ` +
+            `adClick=${this.counters.adClick}, videoClick=${this.counters.videoClick}, adImpression=${this.counters.adImpression}, ` +
+            `duplicate=${this.counters.duplicate}, malformed=${this.counters.malformed}, parseError=${this.counters.parseError}, persistError=${this.counters.persistError}`,
+        );
+      }, 60_000);
+    } catch (err) {
+      this.logger.error(
+        'StatsEventsConsumer FAILED to start (connection/assert/consume error)',
+        err instanceof Error ? err.stack : String(err),
+      );
+      await this.safeClose();
+      // keep app running, consumer disabled
+      return;
+      // OR fail-fast:
+      // throw err;
+    }
+  }
 
-    await this.channel.assertExchange(exchange, 'topic', { durable: true });
+  private async connectAndConsume(): Promise<void> {
+    if (!this.url) throw new Error('RABBITMQ_URL missing at connect time');
 
-    await this.assertAndBind(queueAdClick, exchange, routingKeyAdClick);
-    await this.assertAndBind(queueVideoClick, exchange, routingKeyVideoClick);
-    await this.assertAndBind(
-      queueAdImpression,
-      exchange,
-      routingKeyAdImpression,
+    this.logger.log(
+      `Connecting to RabbitMQ at ${this.maskAmqpUrl(this.url)} ...`,
     );
+    const conn = await amqp.connect(this.url);
+
+    conn.on('error', (e) => {
+      this.logger.error(
+        'RabbitMQ connection error event',
+        e instanceof Error ? e.stack : String(e),
+      );
+    });
 
-    await this.channel.consume(queueAdClick, (msg) => this.handleAdClick(msg), {
-      noAck: false,
+    conn.on('close', () => {
+      this.logger.warn('RabbitMQ connection closed');
     });
-    await this.channel.consume(
-      queueVideoClick,
-      (msg) => this.handleVideoClick(msg),
-      { noAck: false },
+
+    this.connection = conn;
+
+    const ch = await conn.createChannel();
+
+    ch.on('error', (e) => {
+      this.logger.error(
+        'RabbitMQ channel error event',
+        e instanceof Error ? e.stack : String(e),
+      );
+    });
+
+    ch.on('close', () => {
+      this.logger.warn('RabbitMQ channel closed');
+    });
+
+    this.channel = ch;
+
+    // QoS to avoid handler overload
+    await ch.prefetch(200);
+
+    this.logger.log(`Asserting exchange="${this.exchange}" type=topic durable`);
+    await ch.assertExchange(this.exchange, 'topic', { durable: true });
+
+    await this.assertAndBind(
+      this.queueAdClick,
+      this.exchange,
+      this.routingKeyAdClick,
     );
-    await this.channel.consume(
-      queueAdImpression,
-      (msg) => this.handleAdImpression(msg),
-      { noAck: false },
+    await this.assertAndBind(
+      this.queueVideoClick,
+      this.exchange,
+      this.routingKeyVideoClick,
+    );
+    await this.assertAndBind(
+      this.queueAdImpression,
+      this.exchange,
+      this.routingKeyAdImpression,
     );
 
     this.logger.log(
-      `StatsEventsConsumer consuming queues: [${queueAdClick}, ${queueVideoClick}, ${queueAdImpression}] on exchange=${exchange}`,
+      `Consuming queues (noAck=false): ${this.queueAdClick}, ${this.queueVideoClick}, ${this.queueAdImpression}`,
     );
-    this.logger.log('🚀 StatsEventsConsumer started successfully');
 
-    this.logInterval = setInterval(() => {
-      this.logger.log(
-        `📊 Ingestion stats: adClick=${this.counters.adClick}, videoClick=${this.counters.videoClick}, adImpression=${this.counters.adImpression}`,
-      );
-    }, 60000);
+    const c1 = await ch.consume(
+      this.queueAdClick,
+      (msg) => void this.handleAdClick(msg),
+      { noAck: false },
+    );
+    const c2 = await ch.consume(
+      this.queueVideoClick,
+      (msg) => void this.handleVideoClick(msg),
+      { noAck: false },
+    );
+    const c3 = await ch.consume(
+      this.queueAdImpression,
+      (msg) => void this.handleAdImpression(msg),
+      { noAck: false },
+    );
+
+    this.consumerTags = [c1.consumerTag, c2.consumerTag, c3.consumerTag];
+
+    this.logger.log(`Consumer started (tags=${this.consumerTags.join(', ')})`);
   }
 
   private async assertAndBind(
@@ -153,16 +251,23 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     exchange: string,
     routingKey: string,
   ): Promise<void> {
-    await this.channel!.assertQueue(queue, { durable: true });
-    await this.channel!.bindQueue(queue, exchange, routingKey);
+    const ch = this.channel;
+    if (!ch) throw new Error('RabbitMQ channel not initialized');
+
+    this.logger.log(`Assert+bind queue="${queue}" routingKey="${routingKey}"`);
+    await ch.assertQueue(queue, { durable: true });
+    await ch.bindQueue(queue, exchange, routingKey);
   }
 
   private parseJson<T>(msg: ConsumeMessage): T | null {
     try {
-      return JSON.parse(msg.content.toString()) as T;
+      return JSON.parse(msg.content.toString('utf8')) as T;
     } catch (error) {
+      this.counters.parseError++;
       this.logger.error(
-        `Failed to parse message: ${msg.content.toString()}`,
+        `Failed to parse message (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
+          'utf8',
+        )}`,
         error instanceof Error ? error.stack : String(error),
       );
       return null;
@@ -173,7 +278,13 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     if (value === undefined || value === null) return BigInt(0);
     if (typeof value === 'bigint') return value;
     if (typeof value === 'number') return BigInt(Math.trunc(value));
-    return BigInt(value);
+    const s = String(value).trim();
+    if (!s) return BigInt(0);
+    try {
+      return BigInt(s);
+    } catch {
+      return BigInt(0);
+    }
   }
 
   private async markProcessed(
@@ -182,6 +293,7 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
   ): Promise<'new' | 'duplicate' | 'error'> {
     const now = nowEpochMsBigInt();
     const client = this.prisma as any;
+
     try {
       await client.processedMessage.create({
         data: {
@@ -194,14 +306,16 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
       return 'new';
     } catch (error: any) {
       if (error?.code === 'P2002') {
+        this.counters.duplicate++;
         this.logger.debug(
           `Duplicate message ignored: messageId=${messageId}, eventType=${eventType}`,
         );
         return 'duplicate';
       }
+
       this.logger.error(
-        `Failed to mark processed messageId=${messageId}: ${error?.message ?? error}`,
-        error?.stack,
+        `Failed to mark processed messageId=${messageId}, eventType=${eventType}`,
+        error instanceof Error ? error.stack : String(error),
       );
       return 'error';
     }
@@ -212,9 +326,12 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     try {
       await client.processedMessage.delete({ where: { messageId } });
     } catch (error: any) {
+      // ignore "not found"
       if (error?.code !== 'P2025') {
         this.logger.warn(
-          `Cleanup processed message failed for messageId=${messageId}: ${error?.message ?? error}`,
+          `Cleanup processed message failed for messageId=${messageId}: ${
+            error?.message ?? error
+          }`,
         );
       }
     }
@@ -230,10 +347,10 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
   private async handleAdClick(msg: ConsumeMessage | null): Promise<void> {
     if (!msg) return;
-    const payload = this.parseJson<AdClickMessage>(msg);
 
-    // Validate required fields (use adsId from publisher, adId for backward compatibility)
+    const payload = this.parseJson<AdClickMessage>(msg);
     const adId = payload?.adId || payload?.adsId;
+
     if (
       !payload ||
       !payload.uid ||
@@ -241,16 +358,21 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
       !payload.channelId ||
       !payload.machine
     ) {
+      this.counters.malformed++;
       this.logger.warn(
-        `Malformed ad.click message (missing uid, adId, channelId, or machine), dropping: ${msg.content.toString()}`,
+        `Malformed ad.click message, dropping (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
+          'utf8',
+        )}`,
       );
       this.nackDrop(msg);
       return;
     }
 
-    // For deduplication, generate a simple messageId if not provided
+    // If publisher doesn't provide messageId, generate stable-ish one
+    // NOTE: This is still best-effort; ideally publisher always sends messageId.
     const messageId =
-      payload.messageId || `${Date.now()}-${adId}-${payload.uid}`;
+      payload.messageId ||
+      `${this.toBigInt(payload.clickAt ?? payload.clickedAt) || nowEpochMsBigInt()}-${adId}-${payload.uid}`;
 
     const status = await this.markProcessed(messageId, 'stats.ad.click');
     if (status === 'duplicate') {
@@ -266,13 +388,12 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
       const client = this.prisma as any;
       const now = nowEpochMsBigInt();
 
-      // Determine the click timestamp (use clickAt from publisher, or clickedAt for backward compatibility)
       const clickTime = payload.clickAt || payload.clickedAt || now;
 
       await client.adClickEvents.create({
         data: {
           uid: payload.uid,
-          adId: adId,
+          adId,
           adType: payload.adType,
           clickedAt: this.toBigInt(clickTime),
           ip: payload.ip,
@@ -282,12 +403,14 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
           updateAt: this.toBigInt(payload.updateAt || now),
         },
       });
+
       this.counters.adClick++;
       this.ack(msg);
-    } catch (error: any) {
+    } catch (error) {
+      this.counters.persistError++;
       this.logger.error(
-        `Failed to persist ad.click messageId=${messageId}: ${error?.message ?? error}`,
-        error?.stack,
+        `Failed to persist ad.click messageId=${messageId} (deliveryTag=${msg.fields.deliveryTag})`,
+        error instanceof Error ? error.stack : String(error),
       );
       await this.cleanupProcessed(messageId);
       this.nackDrop(msg);
@@ -296,17 +419,21 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
   private async handleVideoClick(msg: ConsumeMessage | null): Promise<void> {
     if (!msg) return;
+
     const payload = this.parseJson<VideoClickMessage>(msg);
+
     if (
       !payload ||
       !payload.messageId ||
       !payload.uid ||
       !payload.videoId ||
-      //!payload.channelId ||
       !payload.machine
     ) {
+      this.counters.malformed++;
       this.logger.warn(
-        `Malformed video.click message (missing required fields), dropping: ${msg?.content.toString()}`,
+        `Malformed video.click message, dropping (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
+          'utf8',
+        )}`,
       );
       this.nackDrop(msg);
       return;
@@ -327,11 +454,11 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
     try {
       const client = this.prisma as any;
+
       await client.videoClickEvents.create({
         data: {
           uid: payload.uid,
           videoId: payload.videoId,
-          // channelId: payload.channelId,
           machine: payload.machine,
           categoryId: payload.categoryId ?? null,
           scene: payload.scene,
@@ -344,12 +471,14 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
           updateAt: this.toBigInt(payload.updateAt),
         },
       });
+
       this.counters.videoClick++;
       this.ack(msg);
-    } catch (error: any) {
+    } catch (error) {
+      this.counters.persistError++;
       this.logger.error(
-        `Failed to persist video.click messageId=${payload.messageId}: ${error?.message ?? error}`,
-        error?.stack,
+        `Failed to persist video.click messageId=${payload.messageId} (deliveryTag=${msg.fields.deliveryTag})`,
+        error instanceof Error ? error.stack : String(error),
       );
       await this.cleanupProcessed(payload.messageId);
       this.nackDrop(msg);
@@ -358,17 +487,21 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
   private async handleAdImpression(msg: ConsumeMessage | null): Promise<void> {
     if (!msg) return;
+
     const payload = this.parseJson<AdImpressionMessage>(msg);
+
     if (
       !payload ||
       !payload.messageId ||
       !payload.uid ||
       !payload.adId ||
-      // !payload.channelId ||
       !payload.machine
     ) {
+      this.counters.malformed++;
       this.logger.warn(
-        `Malformed ad.impression message (missing required fields), dropping: ${msg?.content.toString()}`,
+        `Malformed ad.impression message, dropping (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
+          'utf8',
+        )}`,
       );
       this.nackDrop(msg);
       return;
@@ -389,28 +522,31 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
     try {
       const client = this.prisma as any;
+
       await client.adImpressionEvents.create({
         data: {
           uid: payload.uid,
           adId: payload.adId,
           adType: payload.adType,
           impressionAt: this.toBigInt(payload.impressionAt),
-          visibleDurationMs: payload.visibleDurationMs
-            ? BigInt(payload.visibleDurationMs)
-            : null,
+          visibleDurationMs:
+            payload.visibleDurationMs != null
+              ? BigInt(payload.visibleDurationMs)
+              : null,
           ip: payload.ip,
-          // channelId: payload.channelId ?? null,
           machine: payload.machine,
           createAt: this.toBigInt(payload.createAt),
           updateAt: this.toBigInt(payload.updateAt),
         },
       });
+
       this.counters.adImpression++;
       this.ack(msg);
-    } catch (error: any) {
+    } catch (error) {
+      this.counters.persistError++;
       this.logger.error(
-        `Failed to persist ad.impression messageId=${payload.messageId}: ${error?.message ?? error}`,
-        error?.stack,
+        `Failed to persist ad.impression messageId=${payload.messageId} (deliveryTag=${msg.fields.deliveryTag})`,
+        error instanceof Error ? error.stack : String(error),
       );
       await this.cleanupProcessed(payload.messageId);
       this.nackDrop(msg);
@@ -418,17 +554,68 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
   }
 
   async onModuleDestroy(): Promise<void> {
-    if (this.logInterval) {
-      clearInterval(this.logInterval);
+    if (this.logInterval) clearInterval(this.logInterval);
+
+    this.logger.log('StatsEventsConsumer shutting down...');
+    await this.safeClose();
+    this.logger.log('StatsEventsConsumer shutdown complete');
+  }
+
+  private async safeClose(): Promise<void> {
+    // cancel consumers first (best-effort)
+    if (this.channel && this.consumerTags.length > 0) {
+      for (const tag of this.consumerTags) {
+        try {
+          await this.channel.cancel(tag);
+        } catch (err) {
+          this.logger.warn(
+            `Failed to cancel consumer tag="${tag}"`,
+            err instanceof Error ? err.stack : String(err),
+          );
+        }
+      }
+      this.consumerTags = [];
+    }
+
+    if (this.channel) {
+      try {
+        await this.channel.close();
+      } catch (err) {
+        this.logger.warn(
+          'Error while closing RabbitMQ channel',
+          err instanceof Error ? err.stack : String(err),
+        );
+      } finally {
+        this.channel = undefined;
+      }
     }
+
+    if (this.connection) {
+      try {
+        await this.connection.close();
+      } catch (err) {
+        this.logger.warn(
+          'Error while closing RabbitMQ connection',
+          err instanceof Error ? err.stack : String(err),
+        );
+      } finally {
+        this.connection = undefined;
+      }
+    }
+  }
+
+  private maskAmqpUrl(url: string): string {
     try {
-      await this.channel?.close();
-      await this.connection?.close();
-    } catch (error) {
-      this.logger.error(
-        'Error while closing RabbitMQ connection',
-        error instanceof Error ? error.stack : String(error),
-      );
+      const u = new URL(url);
+      if (u.username || u.password) {
+        const masked = new URL(url);
+        masked.username = u.username ? '***' : '';
+        masked.password = u.password ? '***' : '';
+        return masked.toString();
+      }
+      return url;
+    } catch {
+      return url.replace(/\/\/([^:/@]+):([^@]+)@/g, '//***:***@');
     }
   }
 }

+ 1 - 1
tsconfig.base.json

@@ -5,7 +5,7 @@
     "rootDir": ".",
     "outDir": "./dist",
     "module": "commonjs",
-    "target": "ES2019",
+    "target": "ES2020",
     "moduleResolution": "node",
     "declaration": true,
     "removeComments": true,