Procházet zdrojové kódy

feat: update AdClickRequestDto and related services for required adsId and improved validation

Dave před 1 měsícem
rodič
revize
ad6e1bf249

+ 39 - 62
apps/box-stats-api/src/feature/stats-events/ads-stats.controller.ts

@@ -15,6 +15,7 @@ import {
   ApiResponse,
   ApiTags,
   ApiBadRequestResponse,
+  ApiProperty,
 } from '@nestjs/swagger';
 import { Request } from 'express';
 import { StatsAdClickPublisherService } from './stats-ad-click.publisher.service';
@@ -22,21 +23,20 @@ import { ApiPropertyOptional } from '@nestjs/swagger';
 import { IsOptional, IsNumber, IsString } from 'class-validator';
 
 export class AdClickRequestDto {
-  @ApiPropertyOptional({
-    description: '广告 Mongo ObjectId(二选一)',
-    example: 'xxxxxxxx',
-  })
-  @IsOptional()
+  @ApiProperty({ description: '用户唯一设备ID', example: 'xxxxxx' })
+  @IsString()
+  uid!: string;
+
+  @ApiProperty({ description: '渠道ID', example: 'AAA' })
   @IsString()
-  adsId?: string;
+  channelId!: string;
 
-  @ApiPropertyOptional({
-    description: '广告业务 ID(二选一)',
-    example: 123,
+  @ApiProperty({
+    description: '广告 Mongo ObjectId',
+    example: '64b7c2f967ce4d6799c047d1',
   })
-  @IsOptional()
-  @IsNumber()
-  adId?: number;
+  @IsString()
+  adsId!: string; // ✅ required
 }
 
 @ApiTags('Ads Stats')
@@ -50,77 +50,54 @@ export class AdsStatsController {
   @HttpCode(202)
   @ApiOperation({
     summary: '广告点击上报',
-    description: '用于上报广告点击事件(异步处理,前端无需等待结果)',
+    description: '用于广告点击统计。uid、channelId、adsId 必填。',
   })
-  @ApiHeaders([
-    {
-      name: 'x-token-uid',
-      required: true,
-      description: '设备唯一标识(UID)',
-    },
-    {
-      name: 'x-token-channelid',
-      required: true,
-      description: '渠道 ID',
-    },
-  ])
   @ApiBody({
-    required: true,
-    description: '广告标识(二选一,只能提供一个)',
-    schema: {
-      type: 'object',
-      properties: {
-        adsId: {
-          type: 'string',
-          description: '广告 Mongo ObjectId',
-          example: 'xxxxxxxx',
-        },
-        adId: {
-          type: 'number',
-          description: '广告业务 ID',
-          example: 123,
+    type: AdClickRequestDto,
+    description: 'uid、channelId、adsId 必填(adsId 为 Mongo ObjectId)',
+    examples: {
+      example: {
+        summary: '示例',
+        value: {
+          uid: 'xxxxxx',
+          channelId: 'AAA',
+          adsId: '64b7c2f967ce4d6799c047d1',
         },
       },
-      oneOf: [{ required: ['adsId'] }, { required: ['adId'] }],
     },
   })
   @ApiResponse({
     status: 202,
-    description: '事件已接收',
+    description: '已接收(异步处理)',
     schema: { example: { ok: true } },
   })
   @ApiBadRequestResponse({
-    description: '请求参数错误(Header 缺失或 adsId/adId 不合法)',
+    description: '参数错误(缺少 uid/channelId/adsId)',
   })
   publishAdClick(
     @Req() req: Request,
-    @Headers('x-token-uid') uid?: string,
-    @Headers('x-token-channelid') channelId?: string,
-    @Body() body?: AdClickRequestDto,
+    @Body() body: AdClickRequestDto,
   ): { ok: true } {
-    if (!uid || !channelId) {
-      throw new BadRequestException(
-        'x-token-uid and x-token-channelid headers are required',
-      );
-    }
-
-    const hasAdsId = Boolean(body?.adsId);
-    const hasAdId = body?.adId !== undefined;
-    if (hasAdsId === hasAdId) {
-      throw new BadRequestException(
-        'Exactly one of adsId or adId must be provided',
-      );
+    if (!body?.uid || !body?.channelId || !body?.adsId) {
+      throw new BadRequestException('uid, channelId and adsId are required');
     }
 
     this.publisher.publishAdClick({
-      uid,
-      channelId,
-      adsId: body?.adsId,
-      adId: body?.adId,
+      uid: body.uid,
+      channelId: body.channelId,
+      adsId: body.adsId,
       headers: req.headers,
-      ipFallback: req.ip,
+      ipFallback: req.ip, // ✅ raw fallback only
     });
 
     return { ok: true };
   }
+
+  private extractForwardedFor(headers: Request['headers']): string | undefined {
+    const header = headers?.['x-forwarded-for'];
+    if (!header) return undefined;
+    const value = Array.isArray(header) ? header[0] : header;
+    const first = value.split(',')[0]?.trim();
+    return first || undefined;
+  }
 }

+ 2 - 4
apps/box-stats-api/src/feature/stats-events/stats-ad-click.publisher.service.ts

@@ -13,8 +13,7 @@ import { IncomingHttpHeaders } from 'http';
 export interface AdClickInput {
   uid: string;
   channelId: string;
-  adsId?: string;
-  adId?: number;
+  adsId: string; // ✅ required
   headers: IncomingHttpHeaders;
   ipFallback?: string;
 }
@@ -97,8 +96,7 @@ export class StatsAdClickPublisherService
     };
 
     if (machine) payload.machine = machine;
-    if (input.adsId) payload.adsId = input.adsId;
-    else if (input.adId !== undefined) payload.adId = input.adId;
+    payload.adsId = input.adsId;
 
     void this.publish(payload).catch((err) => {
       this.logger.error(

+ 62 - 65
apps/box-stats-api/src/feature/stats-events/stats-aggregation.service.ts

@@ -23,15 +23,6 @@ export class StatsAggregationService {
 
   constructor(private readonly prisma: PrismaMongoService) {}
 
-  /**
-   * Aggregate ads click statistics from raw events.
-   * Click-only stats collection: writes AdsGlobalStats(clicks, firstSeenAt, lastSeenAt, createAt, updateAt).
-   *
-   * Notes:
-   * - No scoring / recency / popularity.
-   * - No Redis sync.
-   * - windowDays is ignored (no cutoff filtering), per new requirements.
-   */
   async aggregateAdsStats(
     _options: AggregationOptions = {},
   ): Promise<AggregationResult> {
@@ -40,41 +31,49 @@ export class StatsAggregationService {
 
     this.logger.log(`[${runId}] Starting ads clicks aggregation (all time)`);
 
-    let identities: AdIdentity[] = [];
+    let adsIds: string[] = [];
     try {
-      identities = await this.getUniqueAdIdentities();
+      adsIds = await this.getUniqueAdsIds();
     } catch (err) {
       this.logger.error(
-        `[${runId}] Failed to load unique ad identities`,
+        `[${runId}] Failed to load unique adsIds for aggregation`,
         err instanceof Error ? err.stack : String(err),
       );
       return { totalProcessed: 0, successCount: 0, errorCount: 1 };
     }
 
-    this.logger.log(
-      `[${runId}] Found ${identities.length} unique ads to aggregate`,
-    );
+    const total = adsIds.length;
+
+    if (total === 0) {
+      const durationMs = Date.now() - startedAtMs;
+      this.logger.log(
+        `[${runId}] ✅ No ads to aggregate (0). Done in ${durationMs}ms`,
+      );
+      return { totalProcessed: 0, successCount: 0, errorCount: 0 };
+    }
+
+    this.logger.log(`[${runId}] Found ${total} unique ads to aggregate`);
 
     let successCount = 0;
     let errorCount = 0;
 
-    for (let i = 0; i < identities.length; i++) {
-      const idn = identities[i];
+    for (let i = 0; i < total; i++) {
+      const adsId = adsIds[i];
 
       try {
-        await this.aggregateSingleAdClicks(idn, runId);
+        await this.aggregateSingleAdClicks(adsId, runId);
         successCount++;
       } catch (err) {
         errorCount++;
         this.logger.error(
-          `[${runId}] Failed to aggregate ${this.identityLog(idn)}`,
+          `[${runId}] Failed to aggregate adsId=${adsId}`,
           err instanceof Error ? err.stack : String(err),
         );
       }
 
-      if ((i + 1) % 500 === 0) {
+      if ((i + 1) % 500 === 0 || i + 1 === total) {
         this.logger.log(
-          `[${runId}] Progress ${i + 1}/${identities.length} (ok=${successCount}, err=${errorCount})`,
+          `[${runId}] Progress ${i + 1}/${total} (ok=${successCount}, err=${errorCount})`,
         );
       }
     }
@@ -86,12 +85,26 @@ export class StatsAggregationService {
     );
 
     return {
-      totalProcessed: identities.length,
+      totalProcessed: total,
       successCount,
       errorCount,
     };
   }
 
+  private async getUniqueAdsIds(): Promise<string[]> {
+    const client = this.prisma as any;
+
+    const rows = await client.adClickEvents.findMany({
+      where: { adsId: { not: null } },
+      select: { adsId: true },
+      distinct: ['adsId'],
+    });
+
+    return rows
+      .map((r: any) => r?.adsId as string | undefined)
+      .filter((v: string | undefined): v is string => Boolean(v));
+  }
+
   private async getUniqueAdIdentities(): Promise<AdIdentity[]> {
     const client = this.prisma as any;
 
@@ -132,63 +145,47 @@ export class StatsAggregationService {
   }
 
   private async aggregateSingleAdClicks(
-    idn: AdIdentity,
+    adsId: string,
     runId: string,
   ): Promise<void> {
     const client = this.prisma as any;
-
-    const whereId =
-      idn.kind === 'adsId' ? { adsId: idn.adsId } : { adId: idn.adId };
-
-    const clicks = await client.adClickEvents.count({
-      where: whereId,
-    });
-
-    if (clicks <= 0) {
-      // No clicks: nothing to write/update
-      return;
-    }
-
-    // Find first/last click time (epoch ms BigInt)
-    const firstRow = await client.adClickEvents.findFirst({
-      where: whereId,
-      select: { clickedAt: true },
-      orderBy: { clickedAt: 'asc' },
-    });
-
-    const lastRow = await client.adClickEvents.findFirst({
-      where: whereId,
-      select: { clickedAt: true },
-      orderBy: { clickedAt: 'desc' },
-    });
+    const whereId = { adsId };
+
+    const clicks: number = await client.adClickEvents.count({ where: whereId });
+    if (clicks <= 0) return;
+
+    // Fetch first + last click time (epoch ms BigInt)
+    const [firstRow, lastRow] = await Promise.all([
+      client.adClickEvents.findFirst({
+        where: whereId,
+        select: { clickedAt: true },
+        orderBy: { clickedAt: 'asc' },
+      }),
+      client.adClickEvents.findFirst({
+        where: whereId,
+        select: { clickedAt: true },
+        orderBy: { clickedAt: 'desc' },
+      }),
+    ]);
 
     const firstSeenAt = firstRow?.clickedAt as bigint | undefined;
     const lastSeenAt = lastRow?.clickedAt as bigint | undefined;
-
-    if (!firstSeenAt || !lastSeenAt) {
-      return;
-    }
+    if (!firstSeenAt || !lastSeenAt) return;
 
     const now = nowEpochMsBigInt();
-
-    // Upsert AdsGlobalStats by identity.
-    // IMPORTANT: AdsGlobalStats.adsId and AdsGlobalStats.adId should be @unique to allow upsert.
-    const whereGlobal =
-      idn.kind === 'adsId' ? { adsId: idn.adsId } : { adId: idn.adId };
+    const clicksBig = BigInt(clicks);
 
     await client.adsGlobalStats.upsert({
-      where: whereGlobal,
+      where: { adsId },
       update: {
-        clicks: BigInt(clicks),
+        clicks: clicksBig,
         lastSeenAt,
         updateAt: now,
-        // impressions kept but not used in click-only mode
       },
       create: {
-        adsId: idn.kind === 'adsId' ? idn.adsId : undefined,
-        adId: idn.kind === 'adId' ? idn.adId : undefined,
-        impressions: BigInt(0),
-        clicks: BigInt(clicks),
+        adsId,
+        impressions: 0n,
+        clicks: clicksBig,
         firstSeenAt,
         lastSeenAt,
         createAt: now,
@@ -197,7 +194,7 @@ export class StatsAggregationService {
     });
 
     this.logger.debug(
-      `[${runId}] ${this.identityLog(idn)} clicks=${clicks} first=${firstSeenAt.toString()} last=${lastSeenAt.toString()}`,
+      `[${runId}] adsId=${adsId} clicks=${clicks} first=${firstSeenAt.toString()} last=${lastSeenAt.toString()}`,
     );
   }
 

+ 14 - 4
apps/box-stats-api/src/feature/stats-events/stats-events.consumer.ts

@@ -275,12 +275,13 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     if (!msg) return;
 
     const payload = this.parseJson<AdClickMessage>(msg);
-    const adId = payload?.adId || payload?.adsId;
+    const adsId = payload.adsId;
+    const adId = payload.adId;
 
     if (
       !payload ||
       !payload.uid ||
-      !adId ||
+      !adsId ||
       !payload.channelId ||
       !payload.machine
     ) {
@@ -298,7 +299,7 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     // NOTE: This is still best-effort; ideally publisher always sends messageId.
     const messageId =
       payload.messageId ||
-      `${this.toBigInt(payload.clickAt ?? payload.clickedAt) || nowEpochMsBigInt()}-${adId}-${payload.uid}`;
+      `${this.toBigInt(payload.clickAt ?? payload.clickedAt) || nowEpochMsBigInt()}-${adsId}-${payload.uid}`;
 
     const status = await this.markProcessed(messageId, 'stats.ad.click');
     if (status === 'duplicate') {
@@ -316,10 +317,19 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
       const clickTime = payload.clickAt || payload.clickedAt || now;
 
+      this.logger.log(
+        `Processing ad.click messageId=${messageId} (deliveryTag=${msg.fields.deliveryTag})`,
+      );
+
+      this.logger.log(
+        `adClickEvents create: adsId=${adsId}, adId=${adId}, uid=${payload.uid}`,
+      );
+
       await client.adClickEvents.create({
         data: {
           uid: payload.uid,
-          adId,
+          adsId: adsId,
+          adId: adId,
           adType: payload.adType,
           clickedAt: this.toBigInt(clickTime),
           ip: payload.ip,

+ 3 - 3
prisma/mongo-stats/schema/events.prisma

@@ -40,9 +40,9 @@ model ProcessedMessage {
 }
 
 model AdsGlobalStats {
-  id                 String @id @map("_id") @default(auto()) @db.ObjectId
-  adsId              String?  @db.ObjectId            // 广告 ID mongo objectId
-  adId               Int?     @db.Int                 // 广告 ID 自增数字唯一 ID
+  id                 String   @id @map("_id") @default(auto()) @db.ObjectId
+  adsId              String   @unique @db.ObjectId  // ✅ required + unique
+  // adId               Int?     @db.Int                 // 广告 ID 自增数字唯一 ID
   impressions        BigInt @default(0) // 曝光总数
   clicks             BigInt @default(0) // 点击总数
   firstSeenAt        BigInt // 首次出现时间 (epoch)