ソースを参照

refactor: remove video click and ad impression handling from StatsEventsConsumer

- Removed interfaces and handling logic for video click and ad impression messages.
- Updated StatsEventsConsumer to only handle ad click messages.
- Simplified queue and routing key configurations, fixing them to ad click only.
- Adjusted logging and counters accordingly.

feat: add AdsStatsController and StatsAdClickPublisherService

- Introduced AdsStatsController for managing ad statistics.
- Added StatsAdClickPublisherService for publishing ad click events.

chore: update AdsClickHistory schema to make fields optional

- Changed adType and machine fields to optional in AdsClickHistory model.

chore: clean up backup-prisma.md by removing unused AdsGlobalStats model

- Removed AdsGlobalStats model from backup documentation.

feat: enhance events schema with new models

- Added ProcessedMessage model for tracking processed messages.
- Introduced AdsGlobalStats model for storing ad statistics with necessary fields and indexes.
Dave 3 ヶ月 前
コミット
96fda1511b

+ 126 - 0
apps/box-stats-api/src/feature/stats-events/ads-stats.controller.ts

@@ -0,0 +1,126 @@
+import {
+  BadRequestException,
+  Body,
+  Controller,
+  Headers,
+  HttpCode,
+  Logger,
+  Post,
+  Req,
+} from '@nestjs/common';
+import {
+  ApiBody,
+  ApiHeaders,
+  ApiOperation,
+  ApiResponse,
+  ApiTags,
+  ApiBadRequestResponse,
+} from '@nestjs/swagger';
+import { Request } from 'express';
+import { StatsAdClickPublisherService } from './stats-ad-click.publisher.service';
+import { ApiPropertyOptional } from '@nestjs/swagger';
+import { IsOptional, IsNumber, IsString } from 'class-validator';
+
+export class AdClickRequestDto {
+  @ApiPropertyOptional({
+    description: '广告 Mongo ObjectId(二选一)',
+    example: 'xxxxxxxx',
+  })
+  @IsOptional()
+  @IsString()
+  adsId?: string;
+
+  @ApiPropertyOptional({
+    description: '广告业务 ID(二选一)',
+    example: 123,
+  })
+  @IsOptional()
+  @IsNumber()
+  adId?: number;
+}
+
+@ApiTags('Ads Stats')
+@Controller('stats')
+export class AdsStatsController {
+  private readonly logger = new Logger(AdsStatsController.name);
+
+  constructor(private readonly publisher: StatsAdClickPublisherService) {}
+
+  @Post('ad-click')
+  @HttpCode(202)
+  @ApiOperation({
+    summary: '广告点击上报',
+    description: '用于上报广告点击事件(异步处理,前端无需等待结果)',
+  })
+  @ApiHeaders([
+    {
+      name: 'x-token-uid',
+      required: true,
+      description: '设备唯一标识(UID)',
+    },
+    {
+      name: 'x-token-channelid',
+      required: true,
+      description: '渠道 ID',
+    },
+  ])
+  @ApiBody({
+    required: true,
+    description: '广告标识(二选一,只能提供一个)',
+    schema: {
+      type: 'object',
+      properties: {
+        adsId: {
+          type: 'string',
+          description: '广告 Mongo ObjectId',
+          example: 'xxxxxxxx',
+        },
+        adId: {
+          type: 'number',
+          description: '广告业务 ID',
+          example: 123,
+        },
+      },
+      oneOf: [{ required: ['adsId'] }, { required: ['adId'] }],
+    },
+  })
+  @ApiResponse({
+    status: 202,
+    description: '事件已接收',
+    schema: { example: { ok: true } },
+  })
+  @ApiBadRequestResponse({
+    description: '请求参数错误(Header 缺失或 adsId/adId 不合法)',
+  })
+  publishAdClick(
+    @Req() req: Request,
+    @Headers('x-token-uid') uid?: string,
+    @Headers('x-token-channelid') channelId?: string,
+    @Body() body?: AdClickRequestDto,
+  ): { ok: true } {
+    if (!uid || !channelId) {
+      throw new BadRequestException(
+        'x-token-uid and x-token-channelid headers are required',
+      );
+    }
+
+    const hasAdsId = Boolean(body?.adsId);
+    const hasAdId = body?.adId !== undefined;
+    if (hasAdsId === hasAdId) {
+      throw new BadRequestException(
+        'Exactly one of adsId or adId must be provided',
+      );
+    }
+
+    this.publisher.publishAdClick({
+      uid,
+      channelId,
+      adsId: body?.adsId,
+      adId: body?.adId,
+      headers: req.headers,
+      ipFallback: req.ip,
+    });
+
+    return { ok: true };
+  }
+}

+ 146 - 0
apps/box-stats-api/src/feature/stats-events/stats-ad-click.publisher.service.ts

@@ -0,0 +1,146 @@
+import {
+  Injectable,
+  Logger,
+  OnModuleDestroy,
+  OnModuleInit,
+} from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { Channel, Connection } from 'amqplib';
+import * as amqp from 'amqplib';
+import { randomUUID } from 'crypto';
+import { IncomingHttpHeaders } from 'http';
+
+export interface AdClickInput {
+  uid: string;
+  channelId: string;
+  adsId?: string;
+  adId?: number;
+  headers: IncomingHttpHeaders;
+  ipFallback?: string;
+}
+
+@Injectable()
+export class StatsAdClickPublisherService
+  implements OnModuleInit, OnModuleDestroy
+{
+  private readonly logger = new Logger(StatsAdClickPublisherService.name);
+
+  private connection?: Connection;
+  private channel?: Channel;
+  private url?: string;
+
+  private readonly exchange = 'stats.user';
+  private readonly routingKey = 'stats.ad.click';
+
+  constructor(private readonly config: ConfigService) {}
+
+  async onModuleInit(): Promise<void> {
+    this.url = this.config.get<string>('RABBITMQ_URL')?.trim();
+    if (!this.url) {
+      this.logger.warn(
+        'StatsAdClickPublisher is disabled: RABBITMQ_URL is not configured',
+      );
+      return;
+    }
+
+    try {
+      this.logger.log(
+        `Connecting to RabbitMQ at ${this.url} for ad click publish`,
+      );
+      const conn = await amqp.connect(this.url);
+      conn.on('error', (err) =>
+        this.logger.error('RabbitMQ connection error', err.stack ?? err),
+      );
+      conn.on('close', () => this.logger.warn('RabbitMQ connection closed'));
+      this.connection = conn;
+
+      const ch = await conn.createChannel();
+      ch.on('error', (err) =>
+        this.logger.error('RabbitMQ channel error', err.stack ?? err),
+      );
+      ch.on('close', () => this.logger.warn('RabbitMQ channel closed'));
+      await ch.assertExchange(this.exchange, 'topic', { durable: true });
+      this.channel = ch;
+      this.logger.log(
+        `StatsAdClickPublisher ready (exchange=${this.exchange})`,
+      );
+    } catch (error) {
+      this.logger.error(
+        'Failed to initialize StatsAdClickPublisher',
+        error instanceof Error ? error.stack : String(error),
+      );
+      this.channel = undefined;
+    }
+  }
+
+  publishAdClick(input: AdClickInput): void {
+    const messageId = randomUUID();
+    const clickedAtSec = Math.floor(Date.now() / 1000);
+
+    const headers = input.headers;
+    const ip =
+      (headers['cf-connecting-ip'] as string | undefined) ||
+      this.extractForwardedFor(headers) ||
+      input.ipFallback ||
+      'unknown';
+
+    const machine =
+      (headers['x-machine'] as string | undefined) ||
+      (headers['user-agent'] as string | undefined);
+
+    const payload: Record<string, unknown> = {
+      messageId,
+      clickedAtSec,
+      uid: input.uid,
+      channelId: input.channelId,
+      ip,
+    };
+
+    if (machine) payload.machine = machine;
+    if (input.adsId) payload.adsId = input.adsId;
+    else if (input.adId !== undefined) payload.adId = input.adId;
+
+    void this.publish(payload).catch((err) => {
+      this.logger.error(
+        'Failed to publish stats.ad.click event',
+        err instanceof Error ? err.stack : String(err),
+      );
+    });
+  }
+
+  private extractForwardedFor(
+    headers?: IncomingHttpHeaders,
+  ): string | undefined {
+    const header = headers?.['x-forwarded-for'];
+    if (!header) return undefined;
+    const value = Array.isArray(header) ? header[0] : header;
+    return value.split(',')[0].trim() || undefined;
+  }
+
+  async publish(payload: unknown): Promise<void> {
+    if (!this.channel) {
+      throw new Error('RabbitMQ channel not ready for stats.ad.click');
+    }
+
+    const buffer = Buffer.from(JSON.stringify(payload));
+
+    this.channel.publish(this.exchange, this.routingKey, buffer, {
+      persistent: true,
+      contentType: 'application/json',
+      timestamp: Date.now(),
+    });
+  }
+
+  async onModuleDestroy(): Promise<void> {
+    try {
+      await this.channel?.close();
+    } catch (err) {
+      this.logger.warn('Error closing stats ad click channel', err);
+    }
+    try {
+      await this.connection?.close();
+    } catch (err) {
+      this.logger.warn('Error closing stats ad click connection', err);
+    }
+  }
+}

+ 0 - 40
apps/box-stats-api/src/feature/stats-events/stats-aggregation.scheduler.ts

@@ -18,7 +18,6 @@ export class StatsAggregationScheduler implements OnModuleInit {
 
   // guardrails: avoid overlapping runs + spam
   private runningAds = false;
-  private runningVideo = false;
 
   constructor(
     private readonly configService: ConfigService,
@@ -104,43 +103,4 @@ export class StatsAggregationScheduler implements OnModuleInit {
     }
   }
 
-  @Cron(CronExpression.EVERY_5_MINUTES, { name: 'stats-aggregation-video' })
-  async runVideoAggregation(): Promise<void> {
-    if (!this.enabled) return;
-
-    if (this.runningVideo) {
-      this.logger.warn(
-        '⏭️  Skip video aggregation: previous run still in progress',
-      );
-      return;
-    }
-
-    this.runningVideo = true;
-    const start = Date.now();
-
-    this.logger.log(
-      `⏰ Video aggregation start (windowDays=${this.windowDays ?? 'all time'})`,
-    );
-
-    try {
-      const result = (await this.statsAggregation.aggregateVideoStats({
-        windowDays: this.windowDays,
-      })) as AggregationResult;
-
-      const ms = Date.now() - start;
-      this.logger.log(
-        `✅ Video aggregation done in ${ms}ms (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
-      );
-    } catch (err) {
-      const ms = Date.now() - start;
-      this.logger.error(
-        `❌ Video aggregation failed after ${ms}ms: ${
-          err instanceof Error ? err.message : String(err)
-        }`,
-        err instanceof Error ? err.stack : undefined,
-      );
-    } finally {
-      this.runningVideo = false;
-    }
-  }
 }

+ 89 - 540
apps/box-stats-api/src/feature/stats-events/stats-aggregation.service.ts

@@ -1,13 +1,10 @@
 // box-stats-api/src/feature/stats-events/stats-aggregation.service.ts
 import { Injectable, Logger } from '@nestjs/common';
-import { ConfigService } from '@nestjs/config';
 import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
-import { RedisService } from '@box/db/redis/redis.service';
-import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
 import { nowEpochMsBigInt } from '@box/common/time/time.util';
 
 interface AggregationOptions {
-  windowDays?: number; // e.g., 30 for last 30 days, undefined for all time
+  windowDays?: number; // kept for API compatibility, but ignored per new requirements
 }
 
 interface AggregationResult {
@@ -16,651 +13,203 @@ interface AggregationResult {
   errorCount: number;
 }
 
+type AdIdentity =
+  | { kind: 'adsId'; adsId: string }
+  | { kind: 'adId'; adId: number };
+
 @Injectable()
 export class StatsAggregationService {
   private readonly logger = new Logger(StatsAggregationService.name);
 
-  // Smoothed CTR parameters
-  private readonly ctrAlpha: number;
-  private readonly ctrBeta: number;
-
-  // Scoring weights
-  private readonly weightPopularity: number;
-  private readonly weightCtr: number;
-  private readonly weightRecency: number;
-
-  constructor(
-    private readonly prisma: PrismaMongoService,
-    private readonly configService: ConfigService,
-    private readonly redis: RedisService,
-    private readonly mainMongo: MongoPrismaService,
-  ) {
-    this.ctrAlpha = this.readFiniteNumber('STATS_CTR_ALPHA', 1);
-    this.ctrBeta = this.readFiniteNumber('STATS_CTR_BETA', 2);
-
-    const wPop = this.readFiniteNumber('STATS_WEIGHT_POPULARITY', 0.5);
-    const wCtr = this.readFiniteNumber('STATS_WEIGHT_CTR', 0.3);
-    const wRec = this.readFiniteNumber('STATS_WEIGHT_RECENCY', 0.2);
-
-    const normalized = this.normalizeWeights(wPop, wCtr, wRec);
-
-    this.weightPopularity = normalized.popularity;
-    this.weightCtr = normalized.ctr;
-    this.weightRecency = normalized.recency;
-
-    this.logger.log(
-      `📊 Scoring config loaded: CTR(α=${this.ctrAlpha}, β=${this.ctrBeta}), ` +
-        `Weights(pop=${this.weightPopularity.toFixed(4)}, ctr=${this.weightCtr.toFixed(
-          4,
-        )}, rec=${this.weightRecency.toFixed(4)})`,
-    );
-  }
+  constructor(private readonly prisma: PrismaMongoService) {}
 
   /**
-   * Aggregate ad statistics from raw events.
-   * For now, this is a batch job that recalculates everything.
+   * Aggregate ads click statistics from raw events.
+   * Click-only stats collection: writes AdsGlobalStats(clicks, firstSeenAt, lastSeenAt, createAt, updateAt).
+   *
+   * Notes:
+   * - No scoring / recency / popularity.
+   * - No Redis sync.
+   * - windowDays is ignored (no cutoff filtering), per new requirements.
    */
   async aggregateAdsStats(
-    options: AggregationOptions = {},
+    _options: AggregationOptions = {},
   ): Promise<AggregationResult> {
     const runId = this.newRunId('ads');
     const startedAtMs = Date.now();
 
-    const { windowDays } = options;
-    const cutoffTime = this.computeCutoffTime(windowDays);
-
-    this.logger.log(
-      `[${runId}] Starting ads stats aggregation (window=${windowDays ?? 'all time'}, cutoff=${cutoffTime.toString()})`,
-    );
-
-    let adIds: string[] = [];
-    try {
-      adIds = await this.getUniqueAdIds(cutoffTime);
-    } catch (err) {
-      this.logger.error(
-        `[${runId}] Failed to load unique adIds`,
-        err instanceof Error ? err.stack : String(err),
-      );
-      return { totalProcessed: 0, successCount: 0, errorCount: 1 };
-    }
-
-    this.logger.log(`[${runId}] Found ${adIds.length} unique ads to aggregate`);
-
-    let successCount = 0;
-    let errorCount = 0;
-
-    // score stats without storing whole array
-    let scoreMin = Number.POSITIVE_INFINITY;
-    let scoreMax = Number.NEGATIVE_INFINITY;
-    let scoreSum = 0;
-    let scoreCount = 0;
-    let zeroScoreCount = 0;
-
-    for (let i = 0; i < adIds.length; i++) {
-      const adId = adIds[i];
-
-      try {
-        const score = await this.aggregateSingleAd(adId, cutoffTime, runId);
-
-        scoreCount++;
-        scoreSum += score;
-        if (score < scoreMin) scoreMin = score;
-        if (score > scoreMax) scoreMax = score;
-        if (score === 0) zeroScoreCount++;
-
-        successCount++;
-      } catch (err) {
-        errorCount++;
-        this.logger.error(
-          `[${runId}] Failed to aggregate adId=${adId}`,
-          err instanceof Error ? err.stack : String(err),
-        );
-      }
-
-      // light progress log every 500 items (helps “silent startup” / long runs)
-      if ((i + 1) % 500 === 0) {
-        this.logger.log(
-          `[${runId}] Progress ${i + 1}/${adIds.length} (ok=${successCount}, err=${errorCount})`,
-        );
-      }
-    }
-
-    const durationMs = Date.now() - startedAtMs;
-
-    const stats =
-      scoreCount > 0
-        ? {
-            min: Number.isFinite(scoreMin) ? scoreMin : 0,
-            max: Number.isFinite(scoreMax) ? scoreMax : 0,
-            avg: scoreSum / scoreCount,
-          }
-        : { min: 0, max: 0, avg: 0 };
-
-    this.logger.log(
-      `[${runId}] ✅ Ads aggregation complete in ${durationMs}ms: updated=${successCount}, errors=${errorCount}, ` +
-        `scores(min=${stats.min.toFixed(4)}, max=${stats.max.toFixed(4)}, avg=${stats.avg.toFixed(
-          4,
-        )}), zeroScores=${zeroScoreCount}`,
-    );
-
-    return {
-      totalProcessed: adIds.length,
-      successCount,
-      errorCount,
-    };
-  }
-
-  /**
-   * Aggregate video statistics from raw events.
-   */
-  async aggregateVideoStats(
-    options: AggregationOptions = {},
-  ): Promise<AggregationResult> {
-    const runId = this.newRunId('video');
-    const startedAtMs = Date.now();
-
-    const { windowDays } = options;
-    const cutoffTime = this.computeCutoffTime(windowDays);
-
-    this.logger.log(
-      `[${runId}] Starting video stats aggregation (window=${windowDays ?? 'all time'}, cutoff=${cutoffTime.toString()})`,
-    );
+    this.logger.log(`[${runId}] Starting ads clicks aggregation (all time)`);
 
-    let videoIds: string[] = [];
+    let identities: AdIdentity[] = [];
     try {
-      videoIds = await this.getUniqueVideoIds(cutoffTime);
+      identities = await this.getUniqueAdIdentities();
     } catch (err) {
       this.logger.error(
-        `[${runId}] Failed to load unique videoIds`,
+        `[${runId}] Failed to load unique ad identities`,
         err instanceof Error ? err.stack : String(err),
       );
       return { totalProcessed: 0, successCount: 0, errorCount: 1 };
     }
 
     this.logger.log(
-      `[${runId}] Found ${videoIds.length} unique videos to aggregate`,
+      `[${runId}] Found ${identities.length} unique ads to aggregate`,
     );
 
     let successCount = 0;
     let errorCount = 0;
 
-    let scoreMin = Number.POSITIVE_INFINITY;
-    let scoreMax = Number.NEGATIVE_INFINITY;
-    let scoreSum = 0;
-    let scoreCount = 0;
-    let zeroScoreCount = 0;
-
-    for (let i = 0; i < videoIds.length; i++) {
-      const videoId = videoIds[i];
+    for (let i = 0; i < identities.length; i++) {
+      const idn = identities[i];
 
       try {
-        const score = await this.aggregateSingleVideo(
-          videoId,
-          cutoffTime,
-          runId,
-        );
-
-        scoreCount++;
-        scoreSum += score;
-        if (score < scoreMin) scoreMin = score;
-        if (score > scoreMax) scoreMax = score;
-        if (score === 0) zeroScoreCount++;
-
+        await this.aggregateSingleAdClicks(idn, runId);
         successCount++;
       } catch (err) {
         errorCount++;
         this.logger.error(
-          `[${runId}] Failed to aggregate videoId=${videoId}`,
+          `[${runId}] Failed to aggregate ${this.identityLog(idn)}`,
           err instanceof Error ? err.stack : String(err),
         );
       }
 
       if ((i + 1) % 500 === 0) {
         this.logger.log(
-          `[${runId}] Progress ${i + 1}/${videoIds.length} (ok=${successCount}, err=${errorCount})`,
+          `[${runId}] Progress ${i + 1}/${identities.length} (ok=${successCount}, err=${errorCount})`,
         );
       }
     }
 
     const durationMs = Date.now() - startedAtMs;
 
-    const stats =
-      scoreCount > 0
-        ? {
-            min: Number.isFinite(scoreMin) ? scoreMin : 0,
-            max: Number.isFinite(scoreMax) ? scoreMax : 0,
-            avg: scoreSum / scoreCount,
-          }
-        : { min: 0, max: 0, avg: 0 };
-
     this.logger.log(
-      `[${runId}] ✅ Video aggregation complete in ${durationMs}ms: updated=${successCount}, errors=${errorCount}, ` +
-        `scores(min=${stats.min.toFixed(4)}, max=${stats.max.toFixed(4)}, avg=${stats.avg.toFixed(
-          4,
-        )}), zeroScores=${zeroScoreCount}`,
+      `[${runId}] ✅ Ads clicks aggregation complete in ${durationMs}ms: updated=${successCount}, errors=${errorCount}`,
     );
 
     return {
-      totalProcessed: videoIds.length,
+      totalProcessed: identities.length,
       successCount,
       errorCount,
     };
   }
 
-  private async getUniqueAdIds(cutoffTime: bigint): Promise<string[]> {
+  private async getUniqueAdIdentities(): Promise<AdIdentity[]> {
     const client = this.prisma as any;
 
-    const clickAdIds = await client.adClickEvents.findMany({
-      where: { clickedAt: { gte: cutoffTime } },
-      select: { adId: true },
-      distinct: ['adId'],
+    // We do 2 distinct queries to support both identifiers cleanly.
+    const byAdsId = await client.adClickEvents.findMany({
+      where: { adsId: { not: null } },
+      select: { adsId: true },
+      distinct: ['adsId'],
     });
 
-    const impressionAdIds = await client.adImpressionEvents.findMany({
-      where: { impressionAt: { gte: cutoffTime } },
+    const byAdId = await client.adClickEvents.findMany({
+      where: { adId: { not: null } },
       select: { adId: true },
       distinct: ['adId'],
     });
 
-    const allAdIds = new Set<string>();
-    clickAdIds.forEach((item: any) => item?.adId && allAdIds.add(item.adId));
-    impressionAdIds.forEach(
-      (item: any) => item?.adId && allAdIds.add(item.adId),
-    );
+    const identities: AdIdentity[] = [];
 
-    return Array.from(allAdIds);
-  }
+    for (const row of byAdsId) {
+      const adsId = row?.adsId as string | undefined;
+      if (adsId) identities.push({ kind: 'adsId', adsId });
+    }
 
-  private async getUniqueVideoIds(cutoffTime: bigint): Promise<string[]> {
-    const client = this.prisma as any;
+    for (const row of byAdId) {
+      const adId = row?.adId as number | undefined;
+      if (adId !== undefined && adId !== null)
+        identities.push({ kind: 'adId', adId });
+    }
 
-    const videoIds = await client.videoClickEvents.findMany({
-      where: { clickedAt: { gte: cutoffTime } },
-      select: { videoId: true },
-      distinct: ['videoId'],
+    // De-dupe in case some events contain both ids.
+    const seen = new Set<string>();
+    return identities.filter((idn) => {
+      const key = this.identityKey(idn);
+      if (seen.has(key)) return false;
+      seen.add(key);
+      return true;
     });
-
-    return videoIds.map((item: any) => item.videoId).filter(Boolean);
   }
 
-  private async aggregateSingleAd(
-    adId: string,
-    cutoffTime: bigint,
+  private async aggregateSingleAdClicks(
+    idn: AdIdentity,
     runId: string,
-  ): Promise<number> {
+  ): Promise<void> {
     const client = this.prisma as any;
 
-    // Count clicks
+    const whereId =
+      idn.kind === 'adsId' ? { adsId: idn.adsId } : { adId: idn.adId };
+
     const clicks = await client.adClickEvents.count({
-      where: {
-        adId,
-        clickedAt: { gte: cutoffTime },
-      },
+      where: whereId,
     });
 
-    // Count impressions
-    const impressions = await client.adImpressionEvents.count({
-      where: {
-        adId,
-        impressionAt: { gte: cutoffTime },
-      },
-    });
+    if (clicks <= 0) {
+      // No clicks: nothing to write/update
+      return;
+    }
 
-    // Get first/last seen times for clicks
-    const clickTimes = await client.adClickEvents.findMany({
-      where: { adId, clickedAt: { gte: cutoffTime } },
+    // Find first/last click time (epoch ms BigInt)
+    const firstRow = await client.adClickEvents.findFirst({
+      where: whereId,
       select: { clickedAt: true },
       orderBy: { clickedAt: 'asc' },
     });
 
-    const impressionTimes = await client.adImpressionEvents.findMany({
-      where: { adId, impressionAt: { gte: cutoffTime } },
-      select: { impressionAt: true },
-      orderBy: { impressionAt: 'asc' },
-    });
-
-    const allTimes: bigint[] = [
-      ...clickTimes.map((t: any) => t.clickedAt).filter((v: any) => v != null),
-      ...impressionTimes
-        .map((t: any) => t.impressionAt)
-        .filter((v: any) => v != null),
-    ];
-
-    if (allTimes.length === 0) return 0;
-
-    const firstSeenAt = allTimes.reduce((min, val) => (val < min ? val : min));
-    const lastSeenAt = allTimes.reduce((max, val) => (val > max ? val : max));
-
-    const computedPopularity = this.computePopularity(impressions);
-    const computedCtr = this.computeCtr(clicks, impressions);
-    const computedRecency = this.computeRecency(firstSeenAt);
-    const computedScore = this.computeScore(
-      computedPopularity,
-      computedCtr,
-      computedRecency,
-    );
-
-    const now = nowEpochMsBigInt();
-
-    await client.adsGlobalStats.upsert({
-      where: { adId },
-      update: {
-        impressions: BigInt(impressions),
-        clicks: BigInt(clicks),
-        lastSeenAt,
-        computedCtr,
-        computedPopularity,
-        computedRecency,
-        computedScore,
-        updateAt: now,
-      },
-      create: {
-        adId,
-        impressions: BigInt(impressions),
-        clicks: BigInt(clicks),
-        firstSeenAt,
-        lastSeenAt,
-        computedCtr,
-        computedPopularity,
-        computedRecency,
-        computedScore,
-        createAt: now,
-        updateAt: now,
-      },
-    });
-
-    this.logger.debug(
-      `[${runId}] adId=${adId} imp=${impressions} clk=${clicks} ctr=${computedCtr.toFixed(
-        4,
-      )} score=${computedScore.toFixed(4)}`,
-    );
-
-    await this.syncAdScoreToRedis(adId, computedScore, runId);
-
-    return computedScore;
-  }
-
-  private async aggregateSingleVideo(
-    videoId: string,
-    cutoffTime: bigint,
-    runId: string,
-  ): Promise<number> {
-    const client = this.prisma as any;
-
-    // Count clicks (no impressions for videos yet)
-    const clicks = await client.videoClickEvents.count({
-      where: {
-        videoId,
-        clickedAt: { gte: cutoffTime },
-      },
-    });
-
-    const impressions = clicks;
-
-    const clickTimes = await client.videoClickEvents.findMany({
-      where: { videoId, clickedAt: { gte: cutoffTime } },
+    const lastRow = await client.adClickEvents.findFirst({
+      where: whereId,
       select: { clickedAt: true },
-      orderBy: { clickedAt: 'asc' },
+      orderBy: { clickedAt: 'desc' },
     });
 
-    if (clickTimes.length === 0) return 0;
-
-    const allTimes: bigint[] = clickTimes
-      .map((t: any) => t.clickedAt)
-      .filter((v: any) => v != null);
-
-    if (allTimes.length === 0) return 0;
-
-    const firstSeenAt = allTimes.reduce((min, val) => (val < min ? val : min));
-    const lastSeenAt = allTimes.reduce((max, val) => (val > max ? val : max));
+    const firstSeenAt = firstRow?.clickedAt as bigint | undefined;
+    const lastSeenAt = lastRow?.clickedAt as bigint | undefined;
 
-    const computedPopularity = this.computePopularity(impressions);
-    const computedCtr = this.computeCtr(clicks, impressions);
-    const computedRecency = this.computeRecency(firstSeenAt);
-    const computedScore = this.computeScore(
-      computedPopularity,
-      computedCtr,
-      computedRecency,
-    );
+    if (!firstSeenAt || !lastSeenAt) {
+      return;
+    }
 
     const now = nowEpochMsBigInt();
 
-    await client.videoGlobalStats.upsert({
-      where: { videoId },
+    // Upsert AdsGlobalStats by identity.
+    // IMPORTANT: AdsGlobalStats.adsId and AdsGlobalStats.adId should be @unique to allow upsert.
+    const whereGlobal =
+      idn.kind === 'adsId' ? { adsId: idn.adsId } : { adId: idn.adId };
+
+    await client.adsGlobalStats.upsert({
+      where: whereGlobal,
       update: {
-        impressions: BigInt(impressions),
         clicks: BigInt(clicks),
         lastSeenAt,
-        computedCtr,
-        computedPopularity,
-        computedRecency,
-        computedScore,
         updateAt: now,
+        // impressions kept but not used in click-only mode
       },
       create: {
-        videoId,
-        impressions: BigInt(impressions),
+        adsId: idn.kind === 'adsId' ? idn.adsId : undefined,
+        adId: idn.kind === 'adId' ? idn.adId : undefined,
+        impressions: BigInt(0),
         clicks: BigInt(clicks),
         firstSeenAt,
         lastSeenAt,
-        computedCtr,
-        computedPopularity,
-        computedRecency,
-        computedScore,
         createAt: now,
         updateAt: now,
       },
     });
 
     this.logger.debug(
-      `[${runId}] videoId=${videoId} imp=${impressions} clk=${clicks} ctr=${computedCtr.toFixed(
-        4,
-      )} score=${computedScore.toFixed(4)}`,
+      `[${runId}] ${this.identityLog(idn)} clicks=${clicks} first=${firstSeenAt.toString()} last=${lastSeenAt.toString()}`,
     );
-
-    await this.syncVideoScoreToRedis(videoId, computedScore, runId);
-
-    return computedScore;
   }
 
-  /**
-   * Popularity score based on impressions (reach).
-   * Formula: log(1 + impressions)
-   */
-  private computePopularity(impressions: number): number {
-    return Math.log(1 + Math.max(0, impressions));
+  private identityKey(idn: AdIdentity): string {
+    return idn.kind === 'adsId' ? `adsId:${idn.adsId}` : `adId:${idn.adId}`;
   }
 
-  /**
-   * Smoothed CTR
-   * Formula: (clicks + alpha) / (impressions + beta)
-   */
-  private computeCtr(clicks: number, impressions: number): number {
-    const c = Math.max(0, clicks);
-    const i = Math.max(0, impressions);
-    const denom = i + this.ctrBeta;
-
-    // denom should never be 0 because beta defaults to 2, but guard anyway
-    if (!Number.isFinite(denom) || denom <= 0) return 0;
-
-    return (c + this.ctrAlpha) / denom;
-  }
-
-  /**
-   * Recency score based on age since first appearance.
-   * Formula: 1 / (1 + ageDays)
-   */
-  private computeRecency(firstSeenAt: bigint): number {
-    const now = nowEpochMsBigInt();
-
-    // If clocks/time data is weird, avoid negative ages blowing up score
-    let ageMsBig = now - firstSeenAt;
-    if (ageMsBig < BigInt(0)) ageMsBig = BigInt(0);
-
-    // Convert safely to number for division; clamp if extremely large
-    const ageMs = this.bigIntToSafeNumber(ageMsBig);
-
-    const ageDays = ageMs / (1000 * 60 * 60 * 24);
-    return 1 / (1 + ageDays);
-  }
-
-  /**
-   * Composite score: w1 * popularity + w2 * ctr + w3 * recency
-   */
-  private computeScore(
-    popularity: number,
-    ctr: number,
-    recency: number,
-  ): number {
-    return (
-      this.weightPopularity * popularity +
-      this.weightCtr * ctr +
-      this.weightRecency * recency
-    );
-  }
-
-  private async syncAdScoreToRedis(
-    adId: string,
-    score: number,
-    runId: string,
-  ): Promise<void> {
-    try {
-      const client = (this.redis as any).ensureClient();
-      await client.zadd('ads:global:score', score, adId);
-
-      this.logger.debug(
-        `[${runId}] Redis sync adId=${adId} score=${score.toFixed(4)} ok`,
-      );
-    } catch (err) {
-      this.logger.error(
-        `[${runId}] Redis sync FAILED for adId=${adId}`,
-        err instanceof Error ? err.stack : String(err),
-      );
-    }
-  }
-
-  private async syncVideoScoreToRedis(
-    videoId: string,
-    score: number,
-    runId: string,
-  ): Promise<void> {
-    try {
-      const client = (this.redis as any).ensureClient();
-
-      await client.zadd('video:global:score', score, videoId);
-
-      const video = await this.mainMongo.videoMedia.findUnique({
-        where: { id: videoId },
-        select: { tagIds: true },
-      });
-
-      const tagIds = video?.tagIds ?? [];
-      if (Array.isArray(tagIds) && tagIds.length > 0) {
-        for (const tagId of tagIds) {
-          await client.zadd(`video:tag:${tagId}:score`, score, videoId);
-        }
-
-        this.logger.debug(
-          `[${runId}] Redis sync videoId=${videoId} score=${score.toFixed(
-            4,
-          )} ok (tags=${tagIds.length})`,
-        );
-      } else {
-        this.logger.debug(
-          `[${runId}] Redis sync videoId=${videoId} score=${score.toFixed(
-            4,
-          )} ok (no tags)`,
-        );
-      }
-    } catch (err) {
-      this.logger.error(
-        `[${runId}] Redis sync FAILED for videoId=${videoId}`,
-        err instanceof Error ? err.stack : String(err),
-      );
-    }
-  }
-
-  // -------------------------
-  // helpers (config + safety)
-  // -------------------------
-
-  private computeCutoffTime(windowDays?: number): bigint {
-    if (windowDays === undefined) return BigInt(0);
-
-    const days = Number(windowDays);
-    if (!Number.isFinite(days) || days <= 0) return BigInt(0);
-
-    const ms = BigInt(Math.floor(days * 24 * 60 * 60 * 1000));
-    const now = nowEpochMsBigInt();
-    const cutoff = now - ms;
-
-    return cutoff > BigInt(0) ? cutoff : BigInt(0);
-  }
-
-  private readFiniteNumber(key: string, fallback: number): number {
-    const raw = this.configService.get<string>(key);
-
-    if (raw == null || raw.trim() === '') return fallback;
-
-    const n = Number.parseFloat(raw.trim());
-    if (!Number.isFinite(n)) {
-      this.logger.warn(
-        `Config ${key}="${raw}" is not a finite number; using ${fallback}`,
-      );
-      return fallback;
-    }
-
-    return n;
-  }
-
-  private normalizeWeights(
-    popularity: number,
-    ctr: number,
-    recency: number,
-  ): {
-    popularity: number;
-    ctr: number;
-    recency: number;
-  } {
-    const wp = Number.isFinite(popularity) && popularity >= 0 ? popularity : 0;
-    const wc = Number.isFinite(ctr) && ctr >= 0 ? ctr : 0;
-    const wr = Number.isFinite(recency) && recency >= 0 ? recency : 0;
-
-    const sum = wp + wc + wr;
-
-    if (sum <= 0) {
-      this.logger.warn(
-        `Invalid weights (sum<=0). Falling back to defaults (0.5,0.3,0.2)`,
-      );
-      return { popularity: 0.5, ctr: 0.3, recency: 0.2 };
-    }
-
-    // Normalize to sum=1 so config mistakes don’t blow up scoring scale
-    const np = wp / sum;
-    const nc = wc / sum;
-    const nr = wr / sum;
-
-    if (Math.abs(sum - 1) > 1e-9) {
-      this.logger.warn(
-        `Weights normalized from (pop=${wp}, ctr=${wc}, rec=${wr}, sum=${sum}) to ` +
-          `(pop=${np.toFixed(4)}, ctr=${nc.toFixed(4)}, rec=${nr.toFixed(4)})`,
-      );
-    }
-
-    return { popularity: np, ctr: nc, recency: nr };
-  }
-
-  private bigIntToSafeNumber(v: bigint): number {
-    const MAX_SAFE = BigInt(Number.MAX_SAFE_INTEGER);
-    if (v <= BigInt(0)) return 0;
-
-    if (v > MAX_SAFE) return Number.MAX_SAFE_INTEGER;
-
-    return Number(v);
+  private identityLog(idn: AdIdentity): string {
+    return idn.kind === 'adsId' ? `adsId=${idn.adsId}` : `adId=${idn.adId}`;
   }
 
   private newRunId(prefix: string): string {
-    // tiny unique-ish id for logs
     const ts = Date.now().toString(36);
     const rnd = Math.floor(Math.random() * 1_000_000)
       .toString(36)

+ 14 - 224
apps/box-stats-api/src/feature/stats-events/stats-events.consumer.ts

@@ -33,24 +33,6 @@ interface AdClickMessage extends BaseStatsMessage {
   machine?: string;
 }
 
-interface VideoClickMessage extends BaseStatsMessage {
-  videoId: string;
-  machine: string;
-  categoryId?: string;
-  scene: string;
-  clickedAt: string | number | bigint;
-}
-
-interface AdImpressionMessage extends BaseStatsMessage {
-  adId: string;
-  scene: string;
-  slot: string;
-  adType: string;
-  impressionAt: string | number | bigint;
-  visibleDurationMs?: number;
-  machine?: string;
-}
-
 @Injectable()
 export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
   private readonly logger = new Logger(StatsEventsConsumer.name);
@@ -62,8 +44,6 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
   private counters = {
     adClick: 0,
-    videoClick: 0,
-    adImpression: 0,
     parseError: 0,
     malformed: 0,
     duplicate: 0,
@@ -77,12 +57,7 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
   private exchange = 'stats.user';
 
   private queueAdClick = 'stats.ad.click';
-  private queueVideoClick = 'stats.video.click';
-  private queueAdImpression = 'stats.ad.impression';
-
   private routingKeyAdClick = 'stats.ad.click';
-  private routingKeyVideoClick = 'stats.video.click';
-  private routingKeyAdImpression = 'stats.ad.impression';
 
   constructor(
     private readonly config: ConfigService,
@@ -102,24 +77,7 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     this.queueAdClick =
       this.config.get<string>('RABBITMQ_STATS_AD_CLICK_QUEUE')?.trim() ||
       this.queueAdClick;
-    this.queueVideoClick =
-      this.config.get<string>('RABBITMQ_STATS_VIDEO_CLICK_QUEUE')?.trim() ||
-      this.queueVideoClick;
-    this.queueAdImpression =
-      this.config.get<string>('RABBITMQ_STATS_AD_IMPRESSION_QUEUE')?.trim() ||
-      this.queueAdImpression;
-
-    this.routingKeyAdClick =
-      this.config.get<string>('RABBITMQ_STATS_AD_CLICK_ROUTING_KEY')?.trim() ||
-      this.routingKeyAdClick;
-    this.routingKeyVideoClick =
-      this.config
-        .get<string>('RABBITMQ_STATS_VIDEO_CLICK_ROUTING_KEY')
-        ?.trim() || this.routingKeyVideoClick;
-    this.routingKeyAdImpression =
-      this.config
-        .get<string>('RABBITMQ_STATS_AD_IMPRESSION_ROUTING_KEY')
-        ?.trim() || this.routingKeyAdImpression;
+    // Routing key is fixed to stats.ad.click; overrides removed
 
     if (!this.url) {
       // If you want to fail-fast and stop app boot: throw new Error(...)
@@ -134,8 +92,8 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
         'StatsEventsConsumer bootstrap:',
         `url=${this.maskAmqpUrl(this.url)}`,
         `exchange=${this.exchange}`,
-        `queues=[${this.queueAdClick}, ${this.queueVideoClick}, ${this.queueAdImpression}]`,
-        `routingKeys=[${this.routingKeyAdClick}, ${this.routingKeyVideoClick}, ${this.routingKeyAdImpression}]`,
+        `queue=${this.queueAdClick}`,
+        `routingKey=${this.routingKeyAdClick}`,
       ].join(' '),
     );
 
@@ -146,8 +104,8 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
       this.logInterval = setInterval(() => {
         this.logger.log(
           `📊 Ingestion stats: ` +
-            `adClick=${this.counters.adClick}, videoClick=${this.counters.videoClick}, adImpression=${this.counters.adImpression}, ` +
-            `duplicate=${this.counters.duplicate}, malformed=${this.counters.malformed}, parseError=${this.counters.parseError}, persistError=${this.counters.persistError}`,
+            `adClick=${this.counters.adClick}, duplicate=${this.counters.duplicate}, ` +
+            `malformed=${this.counters.malformed}, parseError=${this.counters.parseError}, persistError=${this.counters.persistError}`,
         );
       }, 60_000);
     } catch (err) {
@@ -205,58 +163,26 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     this.logger.log(`Asserting exchange="${this.exchange}" type=topic durable`);
     await ch.assertExchange(this.exchange, 'topic', { durable: true });
 
-    await this.assertAndBind(
+    this.logger.log(
+      `Asserting queue="${this.queueAdClick}" routingKey="${this.routingKeyAdClick}"`,
+    );
+    await ch.assertQueue(this.queueAdClick, { durable: true });
+    await ch.bindQueue(
       this.queueAdClick,
       this.exchange,
       this.routingKeyAdClick,
     );
-    await this.assertAndBind(
-      this.queueVideoClick,
-      this.exchange,
-      this.routingKeyVideoClick,
-    );
-    await this.assertAndBind(
-      this.queueAdImpression,
-      this.exchange,
-      this.routingKeyAdImpression,
-    );
 
-    this.logger.log(
-      `Consuming queues (noAck=false): ${this.queueAdClick}, ${this.queueVideoClick}, ${this.queueAdImpression}`,
-    );
+    this.logger.log(`Consuming queue="${this.queueAdClick}" (noAck=false)`);
 
-    const c1 = await ch.consume(
+    const consumer = await ch.consume(
       this.queueAdClick,
       (msg) => void this.handleAdClick(msg),
       { noAck: false },
     );
-    const c2 = await ch.consume(
-      this.queueVideoClick,
-      (msg) => void this.handleVideoClick(msg),
-      { noAck: false },
-    );
-    const c3 = await ch.consume(
-      this.queueAdImpression,
-      (msg) => void this.handleAdImpression(msg),
-      { noAck: false },
-    );
 
-    this.consumerTags = [c1.consumerTag, c2.consumerTag, c3.consumerTag];
-
-    this.logger.log(`Consumer started (tags=${this.consumerTags.join(', ')})`);
-  }
-
-  private async assertAndBind(
-    queue: string,
-    exchange: string,
-    routingKey: string,
-  ): Promise<void> {
-    const ch = this.channel;
-    if (!ch) throw new Error('RabbitMQ channel not initialized');
-
-    this.logger.log(`Assert+bind queue="${queue}" routingKey="${routingKey}"`);
-    await ch.assertQueue(queue, { durable: true });
-    await ch.bindQueue(queue, exchange, routingKey);
+    this.consumerTags = [consumer.consumerTag];
+    this.logger.log(`Consumer started (tag=${consumer.consumerTag})`);
   }
 
   private parseJson<T>(msg: ConsumeMessage): T | null {
@@ -417,142 +343,6 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     }
   }
 
-  private async handleVideoClick(msg: ConsumeMessage | null): Promise<void> {
-    if (!msg) return;
-
-    const payload = this.parseJson<VideoClickMessage>(msg);
-
-    if (
-      !payload ||
-      !payload.messageId ||
-      !payload.uid ||
-      !payload.videoId ||
-      !payload.machine
-    ) {
-      this.counters.malformed++;
-      this.logger.warn(
-        `Malformed video.click message, dropping (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
-          'utf8',
-        )}`,
-      );
-      this.nackDrop(msg);
-      return;
-    }
-
-    const status = await this.markProcessed(
-      payload.messageId,
-      'stats.video.click',
-    );
-    if (status === 'duplicate') {
-      this.ack(msg);
-      return;
-    }
-    if (status === 'error') {
-      this.nackDrop(msg);
-      return;
-    }
-
-    try {
-      const client = this.prisma as any;
-
-      await client.videoClickEvents.create({
-        data: {
-          uid: payload.uid,
-          videoId: payload.videoId,
-          machine: payload.machine,
-          categoryId: payload.categoryId ?? null,
-          scene: payload.scene,
-          clickedAt: this.toBigInt(payload.clickedAt),
-          ip: payload.ip,
-          userAgent: payload.userAgent,
-          appVersion: payload.appVersion ?? null,
-          os: payload.os ?? null,
-          createAt: this.toBigInt(payload.createAt),
-          updateAt: this.toBigInt(payload.updateAt),
-        },
-      });
-
-      this.counters.videoClick++;
-      this.ack(msg);
-    } catch (error) {
-      this.counters.persistError++;
-      this.logger.error(
-        `Failed to persist video.click messageId=${payload.messageId} (deliveryTag=${msg.fields.deliveryTag})`,
-        error instanceof Error ? error.stack : String(error),
-      );
-      await this.cleanupProcessed(payload.messageId);
-      this.nackDrop(msg);
-    }
-  }
-
-  private async handleAdImpression(msg: ConsumeMessage | null): Promise<void> {
-    if (!msg) return;
-
-    const payload = this.parseJson<AdImpressionMessage>(msg);
-
-    if (
-      !payload ||
-      !payload.messageId ||
-      !payload.uid ||
-      !payload.adId ||
-      !payload.machine
-    ) {
-      this.counters.malformed++;
-      this.logger.warn(
-        `Malformed ad.impression message, dropping (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
-          'utf8',
-        )}`,
-      );
-      this.nackDrop(msg);
-      return;
-    }
-
-    const status = await this.markProcessed(
-      payload.messageId,
-      'stats.ad.impression',
-    );
-    if (status === 'duplicate') {
-      this.ack(msg);
-      return;
-    }
-    if (status === 'error') {
-      this.nackDrop(msg);
-      return;
-    }
-
-    try {
-      const client = this.prisma as any;
-
-      await client.adImpressionEvents.create({
-        data: {
-          uid: payload.uid,
-          adId: payload.adId,
-          adType: payload.adType,
-          impressionAt: this.toBigInt(payload.impressionAt),
-          visibleDurationMs:
-            payload.visibleDurationMs != null
-              ? BigInt(payload.visibleDurationMs)
-              : null,
-          ip: payload.ip,
-          machine: payload.machine,
-          createAt: this.toBigInt(payload.createAt),
-          updateAt: this.toBigInt(payload.updateAt),
-        },
-      });
-
-      this.counters.adImpression++;
-      this.ack(msg);
-    } catch (error) {
-      this.counters.persistError++;
-      this.logger.error(
-        `Failed to persist ad.impression messageId=${payload.messageId} (deliveryTag=${msg.fields.deliveryTag})`,
-        error instanceof Error ? error.stack : String(error),
-      );
-      await this.cleanupProcessed(payload.messageId);
-      this.nackDrop(msg);
-    }
-  }
-
   async onModuleDestroy(): Promise<void> {
     if (this.logInterval) clearInterval(this.logInterval);
 

+ 4 - 1
apps/box-stats-api/src/feature/stats-events/stats-events.module.ts

@@ -4,14 +4,17 @@ import { SharedModule } from '@box/db/shared.module';
 import { PrismaMongoModule } from '../../prisma/prisma-mongo.module';
 import { StatsEventsConsumer } from './stats-events.consumer';
 import { StatsInternalController } from './stats-internal.controller';
+import { AdsStatsController } from './ads-stats.controller';
+import { StatsAdClickPublisherService } from './stats-ad-click.publisher.service';
 import { StatsAggregationService } from './stats-aggregation.service';
 import { StatsAggregationScheduler } from './stats-aggregation.scheduler';
 
 @Module({
   imports: [ConfigModule, PrismaMongoModule, SharedModule],
-  controllers: [StatsInternalController],
+  controllers: [StatsInternalController, AdsStatsController],
   providers: [
     StatsEventsConsumer,
+    StatsAdClickPublisherService,
     StatsAggregationService,
     StatsAggregationScheduler,
   ],

+ 10 - 273
apps/box-stats-api/src/feature/stats-events/stats-internal.controller.ts

@@ -1,9 +1,15 @@
 import { Controller, Get, Logger, Post, Query } from '@nestjs/common';
-import { ApiOperation, ApiResponse, ApiTags, ApiQuery } from '@nestjs/swagger';
-import { RedisService } from '@box/db/redis/redis.service';
+import {
+  ApiOperation,
+  ApiResponse,
+  ApiTags,
+  ApiQuery,
+  ApiExcludeController,
+} from '@nestjs/swagger';
 import { StatsEventsConsumer } from './stats-events.consumer';
 import { StatsAggregationService } from './stats-aggregation.service';
 
+@ApiExcludeController()
 @ApiTags('内部统计')
 @Controller('internal/stats')
 export class StatsInternalController {
@@ -12,14 +18,12 @@ export class StatsInternalController {
   constructor(
     private readonly statsEventsConsumer: StatsEventsConsumer,
     private readonly statsAggregation: StatsAggregationService,
-    private readonly redis: RedisService,
   ) {}
 
   @Get('ingestion')
   @ApiOperation({
     summary: '统计事件摄取计数器',
-    description:
-      '返回已处理的广告点击、视频点击和广告曝光事件数量(内存计数器)。',
+    description: '返回已处理的广告点击事件数量(内存计数器)。',
   })
   @ApiResponse({
     status: 200,
@@ -27,8 +31,7 @@ export class StatsInternalController {
     schema: {
       example: {
         adClick: 123,
-        videoClick: 456,
-        adImpression: 789,
+        malformed: 4,
       },
     },
   })
@@ -59,7 +62,6 @@ export class StatsInternalController {
       `Manual ads aggregation triggered (window: ${days ?? 'all time'})`,
     );
 
-    // Run async without blocking
     this.statsAggregation
       .aggregateAdsStats({ windowDays: days })
       .catch((err) =>
@@ -68,269 +70,4 @@ export class StatsInternalController {
 
     return { status: 'started', windowDays: days };
   }
-
-  @Post('aggregate/videos')
-  @ApiOperation({
-    summary: '触发视频统计聚合',
-    description: '手动触发视频统计数据聚合(批处理任务)。',
-  })
-  @ApiQuery({
-    name: 'windowDays',
-    required: false,
-    description: '聚合时间窗口(天数),不设置则聚合全部时间',
-  })
-  @ApiResponse({
-    status: 200,
-    description: '聚合任务已启动',
-    schema: { example: { status: 'started' } },
-  })
-  async triggerVideoAggregation(@Query('windowDays') windowDays?: string) {
-    const days = windowDays ? parseInt(windowDays, 10) : undefined;
-    this.logger.log(
-      `Manual video aggregation triggered (window: ${days ?? 'all time'})`,
-    );
-
-    this.statsAggregation
-      .aggregateVideoStats({ windowDays: days })
-      .catch((err) =>
-        this.logger.error('Video aggregation failed', err?.stack ?? err),
-      );
-
-    return { status: 'started', windowDays: days };
-  }
-
-  @Get('debug/redis/videos/top')
-  @ApiOperation({
-    summary: '[DEBUG] 获取Redis中排名最高的视频',
-    description:
-      '从 video:global:score 排序集合中返回得分最高的N个视频ID及其分数。仅用于测试和调试。',
-  })
-  @ApiQuery({
-    name: 'limit',
-    required: false,
-    description: '返回数量限制(默认10)',
-  })
-  @ApiResponse({
-    status: 200,
-    description: '成功返回排名视频列表',
-    schema: {
-      example: [
-        { videoId: '6756abc123def', score: 2.45 },
-        { videoId: '6756xyz789abc', score: 1.89 },
-      ],
-    },
-  })
-  async getTopVideosFromRedis(@Query('limit') limit?: string) {
-    const count = limit ? parseInt(limit, 10) : 10;
-    this.logger.debug(`Fetching top ${count} videos from Redis`);
-
-    try {
-      const client = (this.redis as any).ensureClient();
-      // ZREVRANGE returns members in descending order by score
-      const results = await client.zrevrange(
-        'video:global:score',
-        0,
-        count - 1,
-        'WITHSCORES',
-      );
-
-      // Parse results: [member1, score1, member2, score2, ...]
-      const videos = [];
-      for (let i = 0; i < results.length; i += 2) {
-        videos.push({
-          videoId: results[i],
-          score: parseFloat(results[i + 1]),
-        });
-      }
-
-      this.logger.debug(`Found ${videos.length} videos in Redis`);
-      return videos;
-    } catch (error: any) {
-      this.logger.error(
-        `Failed to fetch top videos from Redis: ${error?.message ?? error}`,
-        error?.stack,
-      );
-      return { error: error?.message ?? 'Unknown error', videos: [] };
-    }
-  }
-
-  @Get('debug/redis/ads/top')
-  @ApiOperation({
-    summary: '[DEBUG] 获取Redis中排名最高的广告',
-    description:
-      '从 ads:global:score 排序集合中返回得分最高的N个广告ID及其分数。仅用于测试和调试。',
-  })
-  @ApiQuery({
-    name: 'limit',
-    required: false,
-    description: '返回数量限制(默认10)',
-  })
-  @ApiResponse({
-    status: 200,
-    description: '成功返回排名广告列表',
-    schema: {
-      example: [
-        { adId: '6756def456ghi', score: 3.12 },
-        { adId: '6756ghi789jkl', score: 2.67 },
-      ],
-    },
-  })
-  async getTopAdsFromRedis(@Query('limit') limit?: string) {
-    const count = limit ? parseInt(limit, 10) : 10;
-    this.logger.debug(`Fetching top ${count} ads from Redis`);
-
-    try {
-      const client = (this.redis as any).ensureClient();
-      const results = await client.zrevrange(
-        'ads:global:score',
-        0,
-        count - 1,
-        'WITHSCORES',
-      );
-
-      const ads = [];
-      for (let i = 0; i < results.length; i += 2) {
-        ads.push({
-          adId: results[i],
-          score: parseFloat(results[i + 1]),
-        });
-      }
-
-      this.logger.debug(`Found ${ads.length} ads in Redis`);
-      return ads;
-    } catch (error: any) {
-      this.logger.error(
-        `Failed to fetch top ads from Redis: ${error?.message ?? error}`,
-        error?.stack,
-      );
-      return { error: error?.message ?? 'Unknown error', ads: [] };
-    }
-  }
-
-  @Get('debug/redis/videos/tag/:tagId')
-  @ApiOperation({
-    summary: '[DEBUG] 获取特定标签下排名最高的视频',
-    description:
-      '从 video:tag:<tagId>:score 排序集合中返回得分最高的N个视频ID及其分数。仅用于测试和调试。',
-  })
-  @ApiQuery({
-    name: 'limit',
-    required: false,
-    description: '返回数量限制(默认10)',
-  })
-  @ApiResponse({
-    status: 200,
-    description: '成功返回排名视频列表',
-    schema: {
-      example: [
-        { videoId: '6756abc123def', score: 2.45 },
-        { videoId: '6756xyz789abc', score: 1.89 },
-      ],
-    },
-  })
-  async getTopVideosByTag(
-    @Query('limit') limit?: string,
-    @Query('tagId') tagId?: string,
-  ) {
-    if (!tagId) {
-      return { error: 'tagId query parameter is required', videos: [] };
-    }
-
-    const count = limit ? parseInt(limit, 10) : 10;
-    this.logger.debug(
-      `Fetching top ${count} videos for tag ${tagId} from Redis`,
-    );
-
-    try {
-      const client = (this.redis as any).ensureClient();
-      const key = `video:tag:${tagId}:score`;
-      const results = await client.zrevrange(key, 0, count - 1, 'WITHSCORES');
-
-      const videos = [];
-      for (let i = 0; i < results.length; i += 2) {
-        videos.push({
-          videoId: results[i],
-          score: parseFloat(results[i + 1]),
-        });
-      }
-
-      this.logger.debug(
-        `Found ${videos.length} videos for tag ${tagId} in Redis`,
-      );
-      return { tagId, videos };
-    } catch (error: any) {
-      this.logger.error(
-        `Failed to fetch top videos by tag from Redis: ${error?.message ?? error}`,
-        error?.stack,
-      );
-      return { error: error?.message ?? 'Unknown error', tagId, videos: [] };
-    }
-  }
-
-  @Get('debug/redis/stats')
-  @ApiOperation({
-    summary: '[DEBUG] 获取Redis排序集合统计信息',
-    description: '返回各个排序集合的大小和样本数据。仅用于测试和调试。',
-  })
-  @ApiResponse({
-    status: 200,
-    description: '成功返回统计信息',
-    schema: {
-      example: {
-        videoGlobal: { count: 1234, sample: [{ id: 'xxx', score: 2.5 }] },
-        adsGlobal: { count: 56, sample: [{ id: 'yyy', score: 1.8 }] },
-      },
-    },
-  })
-  async getRedisStats() {
-    this.logger.debug('Fetching Redis sorted set statistics');
-
-    try {
-      const client = (this.redis as any).ensureClient();
-
-      // Get counts
-      const videoCount = await client.zcard('video:global:score');
-      const adsCount = await client.zcard('ads:global:score');
-
-      // Get top 3 samples
-      const videoSample = await client.zrevrange(
-        'video:global:score',
-        0,
-        2,
-        'WITHSCORES',
-      );
-      const adsSample = await client.zrevrange(
-        'ads:global:score',
-        0,
-        2,
-        'WITHSCORES',
-      );
-
-      const parseScores = (results: string[]) => {
-        const items = [];
-        for (let i = 0; i < results.length; i += 2) {
-          items.push({ id: results[i], score: parseFloat(results[i + 1]) });
-        }
-        return items;
-      };
-
-      return {
-        videoGlobal: {
-          count: videoCount,
-          sample: parseScores(videoSample),
-        },
-        adsGlobal: {
-          count: adsCount,
-          sample: parseScores(adsSample),
-        },
-        timestamp: new Date().toISOString(),
-      };
-    } catch (error: any) {
-      this.logger.error(
-        `Failed to fetch Redis stats: ${error?.message ?? error}`,
-        error?.stack,
-      );
-      return { error: error?.message ?? 'Unknown error' };
-    }
-  }
 }

+ 2 - 2
prisma/mongo-stats/schema/ads-click-history.prisma

@@ -8,10 +8,10 @@ model AdsClickHistory {
   adId         Int      @db.Int                         // 广告 ID 自增数字唯一 ID
   clickAt      BigInt                                   // 点击时间 (epoch seconds)
 
-  adType       String                                   // 广告类型 (BANNER, STARTUP, etc.)
+  adType       String?                                  // 广告类型 (BANNER, STARTUP, etc.)
   appVersion   String?                                  // 客户端版本 (optional)
   os           String?                                  // iOS / Android / Web (optional)
-  machine      String                                   // 客户端提供 : 设备的信息,品牌及系统版本什么的 (required)
+  machine      String?                                  // 客户端提供 : 设备的信息,品牌及系统版本什么的 (required)
 
   // Indexes for common queries:
   // 1. Query all clicks for a specific ad

+ 0 - 28
prisma/mongo-stats/schema/backup-prisma.md

@@ -93,31 +93,3 @@ updateAt BigInt // 更新时间 (epoch)
 
 @@map("videoGlobalStats")
 }
-
-model AdsGlobalStats {
-id String @id @map("\_id") @default(auto()) @db.ObjectId
-adId String @unique @db.ObjectId // 广告 ID(唯一)
-impressions BigInt @default(0) // 曝光总数
-clicks BigInt @default(0) // 点击总数
-firstSeenAt BigInt // 首次出现时间 (epoch)
-lastSeenAt BigInt // 最后活跃时间 (epoch)
-computedCtr Float @default(0) // 计算的点击率 (clicks/impressions)
-computedPopularity Float @default(0) // 计算的热度得分
-computedRecency Float @default(0) // 计算的时效性得分
-computedScore Float @default(0) // 综合得分
-createAt BigInt // 创建时间 (epoch)
-updateAt BigInt // 更新时间 (epoch)
-
-// Query helpers
-// 1. adId has unique index via @unique
-// 2. 按综合得分排序(热门广告推荐)
-@@index([computedScore])
-// 3. 按时效性排序(最新热点)
-@@index([computedRecency, computedScore])
-// 4. 按点击率排序(高转化广告)
-@@index([computedCtr])
-// 5. 按最后活跃时间排序
-@@index([lastSeenAt])
-
-@@map("adsGlobalStats")
-}

+ 38 - 5
prisma/mongo-stats/schema/events.prisma

@@ -2,14 +2,14 @@ model AdClickEvents {
   id           String  @id @map("_id") @default(auto()) @db.ObjectId
 
   uid          String                          // 设备码(from JWT / device)
-  adsId        String  @db.ObjectId            // 广告 ID mongo objectId
-  adId         Int     @db.Int                 // 广告 ID 自增数字唯一 ID
-  adType       String                          // 广告类型 (BANNER/STARTUP/...)
+  adsId        String?  @db.ObjectId            // 广告 ID mongo objectId
+  adId         Int?     @db.Int                 // 广告 ID 自增数字唯一 ID
+  adType       String?                         // 广告类型 (BANNER/STARTUP/...)
 
   clickedAt    BigInt                          // 点击时间 (epoch)
   ip           String                          // 点击 IP
-  channelId   String                          // 用户自带渠道 Id (required)
-  machine      String                          // 客户端提供 : 设备的信息,品牌及系统版本什么的 (required)
+  channelId    String                          // 用户自带渠道 Id (required)
+  machine      String?                         // 客户端提供 : 设备的信息,品牌及系统版本什么的 (required)
 
   createAt     BigInt                          // 记录创建时间
   updateAt     BigInt                          // 记录更新时间
@@ -28,3 +28,36 @@ model AdClickEvents {
 
   @@map("adClickEvents")
 }
+
+model ProcessedMessage {
+  id          String @id @map("_id") @default(auto()) @db.ObjectId
+  messageId   String @unique // 去重用的唯一消息 ID
+  eventType   String // 事件类型(如 stats.ad.click)
+  processedAt BigInt // 处理时间(epoch ms)
+  createdAt   BigInt // 创建时间(epoch ms)
+
+  @@map("processedMessages")
+}
+
+model AdsGlobalStats {
+  id                 String @id @map("_id") @default(auto()) @db.ObjectId
+  adsId              String?  @db.ObjectId            // 广告 ID mongo objectId
+  adId               Int?     @db.Int                 // 广告 ID 自增数字唯一 ID
+  impressions        BigInt @default(0) // 曝光总数
+  clicks             BigInt @default(0) // 点击总数
+  firstSeenAt        BigInt // 首次出现时间 (epoch)
+  lastSeenAt         BigInt // 最后活跃时间 (epoch)
+  computedCtr        Float  @default(0) // 计算的点击率 (clicks/impressions)
+  computedPopularity Float  @default(0) // 计算的热度得分
+  computedRecency    Float  @default(0) // 计算的时效性得分
+  computedScore      Float  @default(0) // 综合得分
+  createAt           BigInt // 创建时间 (epoch)
+  updateAt           BigInt // 更新时间 (epoch)
+
+  @@index([computedScore])
+  @@index([computedRecency, computedScore])
+  @@index([computedCtr])
+  @@index([lastSeenAt])
+
+  @@map("adsGlobalStats")
+}