Procházet zdrojové kódy

feat: implement stats rerun functionality with hourly and daily aggregation services

Dave před 3 měsíci
rodič
revize
6722b05d25

+ 60 - 0
apps/box-stats-api/src/feature/stats-events/dto/stats-rerun.dto.ts

@@ -0,0 +1,60 @@
+import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger';
+import {
+  IsBoolean,
+  IsInt,
+  IsOptional,
+  Min,
+  ValidatorConstraint,
+  ValidatorConstraintInterface,
+  ValidationArguments,
+  Validate,
+} from 'class-validator';
+
+const MAX_HOURS = 48;
+const HOUR_SECONDS = 3600;
+
+@ValidatorConstraint({ name: 'StatsRerunWindow', async: false })
+class StatsRerunWindowConstraint implements ValidatorConstraintInterface {
+  validate(value: unknown, args: ValidationArguments): boolean {
+    const dto = args.object as StatsRerunDto;
+    if (typeof dto.fromSec !== 'number' || typeof value !== 'number') {
+      return false;
+    }
+    if (value <= dto.fromSec) return false;
+    const diff = value - dto.fromSec;
+    if (diff % HOUR_SECONDS !== 0) return false;
+    return diff <= MAX_HOURS * HOUR_SECONDS;
+  }
+
+  defaultMessage(args: ValidationArguments): string {
+    return `Range must be positive, hourly aligned, and at most ${MAX_HOURS} hours`;
+  }
+}
+
+export class StatsRerunDto {
+  @ApiProperty({
+    description: 'Inclusive window start (epoch seconds)',
+    example: 1690000000,
+  })
+  @IsInt()
+  @Min(0)
+  fromSec: number;
+
+  @ApiProperty({
+    description: 'Exclusive window end (epoch seconds)',
+    example: 1690007200,
+  })
+  @IsInt()
+  @Validate(StatsRerunWindowConstraint)
+  @Min(0)
+  toSec: number;
+
+  @ApiPropertyOptional({
+    description:
+      'If true, only describe the hourly buckets without running aggregation',
+    default: false,
+  })
+  @IsOptional()
+  @IsBoolean()
+  dryRun?: boolean;
+}

+ 4 - 2
apps/box-stats-api/src/feature/stats-events/stats-ad-click.publisher.service.ts

@@ -74,7 +74,9 @@ export class StatsAdClickPublisherService
 
   publishAdClick(input: AdClickInput): void {
     const messageId = randomUUID();
-    const clickedAtSec = Math.floor(Date.now() / 1000);
+    const nowSec = Math.floor(Date.now() / 1000);
+    const clickedAtSec = nowSec;
+    const sentAtSec = nowSec;
 
     const headers = input.headers;
     const ip =
@@ -90,6 +92,7 @@ export class StatsAdClickPublisherService
     const payload: Record<string, unknown> = {
       messageId,
       clickedAtSec,
+      sentAtSec,
       uid: input.uid,
       channelId: input.channelId,
       ip,
@@ -125,7 +128,6 @@ export class StatsAdClickPublisherService
     this.channel.publish(this.exchange, this.routingKey, buffer, {
       persistent: true,
       contentType: 'application/json',
-      timestamp: Date.now(),
     });
   }
 

+ 0 - 1
apps/box-stats-api/src/feature/stats-events/stats-aggregation.scheduler.ts

@@ -102,5 +102,4 @@ export class StatsAggregationScheduler implements OnModuleInit {
       this.runningAds = false;
     }
   }
-
 }

+ 25 - 12
apps/box-stats-api/src/feature/stats-events/stats-events.consumer.ts

@@ -8,7 +8,7 @@ import { ConfigService } from '@nestjs/config';
 import * as amqp from 'amqplib';
 import { Channel, Connection, ConsumeMessage } from 'amqplib';
 import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
-import { nowEpochMsBigInt } from '@box/common/time/time.util';
+import { nowSecBigInt } from '@box/common/time/time.util';
 
 interface BaseStatsMessage {
   messageId: string;
@@ -213,11 +213,22 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
     }
   }
 
+  private normalizeTimestampToSeconds(
+    value: string | number | bigint | undefined,
+    fallbackSec: bigint,
+  ): bigint {
+    if (value === undefined || value === null) return fallbackSec;
+    if (typeof value === 'string' && value.trim() === '') return fallbackSec;
+
+    const normalized = this.toBigInt(value);
+    return normalized >= 1_000_000_000_000n ? normalized / 1000n : normalized;
+  }
+
   private async markProcessed(
     messageId: string,
     eventType: string,
   ): Promise<'new' | 'duplicate' | 'error'> {
-    const now = nowEpochMsBigInt();
+    const nowSec = nowSecBigInt();
     const client = this.prisma as any;
 
     try {
@@ -225,8 +236,8 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
         data: {
           messageId,
           eventType,
-          processedAt: now,
-          createdAt: now,
+          processedAt: nowSec,
+          createdAt: nowSec,
         },
       });
       return 'new';
@@ -297,9 +308,14 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
     // If publisher doesn't provide messageId, generate stable-ish one
     // NOTE: This is still best-effort; ideally publisher always sends messageId.
+    const nowSec = nowSecBigInt();
+    const clickTime = this.normalizeTimestampToSeconds(
+      payload.clickAt ?? payload.clickedAt,
+      nowSec,
+    );
+
     const messageId =
-      payload.messageId ||
-      `${this.toBigInt(payload.clickAt ?? payload.clickedAt) || nowEpochMsBigInt()}-${adsId}-${payload.uid}`;
+      payload.messageId || `${clickTime}-${adsId}-${payload.uid}`;
 
     const status = await this.markProcessed(messageId, 'stats.ad.click');
     if (status === 'duplicate') {
@@ -313,9 +329,6 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
 
     try {
       const client = this.prisma as any;
-      const now = nowEpochMsBigInt();
-
-      const clickTime = payload.clickAt || payload.clickedAt || now;
 
       this.logger.log(
         `Processing ad.click messageId=${messageId} (deliveryTag=${msg.fields.deliveryTag})`,
@@ -331,12 +344,12 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
           adsId: adsId,
           adId: adId,
           adType: payload.adType,
-          clickedAt: this.toBigInt(clickTime),
+          clickedAt: clickTime,
           ip: payload.ip,
           channelId: payload.channelId,
           machine: payload.machine,
-          createAt: this.toBigInt(payload.createAt || now),
-          updateAt: this.toBigInt(payload.updateAt || now),
+          createAt: this.normalizeTimestampToSeconds(payload.createAt, nowSec),
+          updateAt: this.normalizeTimestampToSeconds(payload.updateAt, nowSec),
         },
       });
 

+ 6 - 0
apps/box-stats-api/src/feature/stats-events/stats-events.module.ts

@@ -8,6 +8,9 @@ import { AdsStatsController } from './ads-stats.controller';
 import { StatsAdClickPublisherService } from './stats-ad-click.publisher.service';
 import { StatsAggregationService } from './stats-aggregation.service';
 import { StatsAggregationScheduler } from './stats-aggregation.scheduler';
+import { StatsHourlyAggregationService } from './stats.hourly.aggregation.service';
+import { StatsDailyAggregationService } from './stats.daily.aggregation.service';
+import { StatsRerunService } from './stats-rerun.service';
 
 @Module({
   imports: [ConfigModule, PrismaMongoModule, SharedModule],
@@ -17,6 +20,9 @@ import { StatsAggregationScheduler } from './stats-aggregation.scheduler';
     StatsAdClickPublisherService,
     StatsAggregationService,
     StatsAggregationScheduler,
+    StatsHourlyAggregationService,
+    StatsDailyAggregationService,
+    StatsRerunService,
   ],
   exports: [StatsAggregationService],
 })

+ 18 - 1
apps/box-stats-api/src/feature/stats-events/stats-internal.controller.ts

@@ -1,4 +1,4 @@
-import { Controller, Get, Logger, Post, Query } from '@nestjs/common';
+import { Controller, Get, Logger, Post, Query, Body } from '@nestjs/common';
 import {
   ApiOperation,
   ApiResponse,
@@ -8,6 +8,8 @@ import {
 } from '@nestjs/swagger';
 import { StatsEventsConsumer } from './stats-events.consumer';
 import { StatsAggregationService } from './stats-aggregation.service';
+import { StatsRerunService } from './stats-rerun.service';
+import { StatsRerunDto } from './dto/stats-rerun.dto';
 
 @ApiExcludeController()
 @ApiTags('内部统计')
@@ -18,6 +20,7 @@ export class StatsInternalController {
   constructor(
     private readonly statsEventsConsumer: StatsEventsConsumer,
     private readonly statsAggregation: StatsAggregationService,
+    private readonly statsRerunService: StatsRerunService,
   ) {}
 
   @Get('ingestion')
@@ -70,4 +73,18 @@ export class StatsInternalController {
 
     return { status: 'started', windowDays: days };
   }
+
+  @Post('rerun')
+  @ApiOperation({
+    summary: 'Re-run stats aggregation for a specific hourly range',
+    description:
+      'Internal endpoint to repair/backfill stats by replaying hourly buckets. Daily totals are refreshed during the hourly pass.',
+  })
+  @ApiResponse({
+    status: 200,
+    description: 'Details on processed hours or strategy',
+  })
+  async rerunAggregation(@Body() dto: StatsRerunDto) {
+    return this.statsRerunService.rerun(dto);
+  }
 }

+ 64 - 0
apps/box-stats-api/src/feature/stats-events/stats-rerun.service.ts

@@ -0,0 +1,64 @@
+import { Injectable, Logger } from '@nestjs/common';
+import { StatsHourlyAggregationService } from './stats.hourly.aggregation.service';
+import { StatsDailyAggregationService } from './stats.daily.aggregation.service';
+import { StatsRerunDto } from './dto/stats-rerun.dto';
+
+@Injectable()
+export class StatsRerunService {
+  private readonly logger = new Logger(StatsRerunService.name);
+
+  constructor(
+    private readonly statsHourly: StatsHourlyAggregationService,
+    private readonly statsDaily: StatsDailyAggregationService,
+  ) {}
+
+  buildHours(fromSec: number, toSec: number): number[] {
+    const hours: number[] = [];
+    for (let cursor = fromSec; cursor < toSec; cursor += 3600) {
+      hours.push(cursor);
+    }
+    return hours;
+  }
+
+  async rerun(
+    dto: StatsRerunDto,
+  ): Promise<
+    | { fromSec: number; toSec: number; hours: number[]; count: number }
+    | { fromSec: number; toSec: number; hoursProcessed: number }
+  > {
+    const totalHours = (dto.toSec - dto.fromSec) / 3600;
+    this.logger.log(
+      `Stats rerun requested [${dto.fromSec},${dto.toSec}) hours=${totalHours} dryRun=${dto.dryRun ?? false}`,
+    );
+
+    if (dto.dryRun) {
+      const hours = this.buildHours(dto.fromSec, dto.toSec);
+      return {
+        fromSec: dto.fromSec,
+        toSec: dto.toSec,
+        hours,
+        count: hours.length,
+      };
+    }
+
+    let cursor = dto.fromSec;
+    while (cursor < dto.toSec) {
+      const startSec = cursor;
+      const endSec = cursor + 3600;
+      await this.statsHourly.aggregateAdsHourly(startSec, endSec);
+      await this.statsHourly.aggregateChannelHourly(startSec, endSec);
+      await this.statsDaily.aggregateAdsDaily(startSec, endSec);
+      await this.statsDaily.aggregateChannelDaily(startSec, endSec);
+      cursor = endSec;
+    }
+
+    this.logger.log(
+      `Stats rerun completed [${dto.fromSec},${dto.toSec}) hours=${totalHours}`,
+    );
+    return {
+      fromSec: dto.fromSec,
+      toSec: dto.toSec,
+      hoursProcessed: totalHours,
+    };
+  }
+}

+ 218 - 0
apps/box-stats-api/src/feature/stats-events/stats.daily.aggregation.service.ts

@@ -0,0 +1,218 @@
+// box-stats-api/src/feature/stats-events/stats.daily.aggregation.service.ts
+import { Injectable, Logger } from '@nestjs/common';
+import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
+
+type AdsDailyAggRow = {
+  adsId: string;
+  clicks: string;
+};
+
+type ChannelDailyAggRow = {
+  channelId: string;
+  dayStartAt: number;
+  total: number;
+  uniqueUsers: number;
+};
+
+@Injectable()
+export class StatsDailyAggregationService {
+  private readonly logger = new Logger(StatsDailyAggregationService.name);
+
+  constructor(private readonly prisma: PrismaMongoService) {}
+
+  async aggregateAdsDaily(startSec: number, endSec: number): Promise<number> {
+    const dayStarts = this.getAffectedDayStarts(startSec, endSec);
+    let totalRows = 0;
+
+    const nowSec = Math.floor(Date.now() / 1000);
+
+    for (const dayStartAt of dayStarts) {
+      const dayEndAt = dayStartAt + 86400;
+      const pipeline = [
+        {
+          $match: {
+            adsId: { $ne: null },
+            $expr: {
+              $and: [
+                { $gte: [{ $toLong: '$hourStartAt' }, dayStartAt] },
+                { $lt: [{ $toLong: '$hourStartAt' }, dayEndAt] },
+              ],
+            },
+          },
+        },
+        {
+          $project: {
+            adsId: { $toString: '$adsId' },
+            clicks: 1,
+          },
+        },
+        {
+          $group: {
+            _id: '$adsId',
+            clicks: { $sum: { $toLong: '$clicks' } },
+          },
+        },
+        {
+          $project: {
+            _id: 0,
+            adsId: '$_id',
+            clicks: { $toString: '$clicks' },
+          },
+        },
+      ] as const;
+
+      const rows = (await (this.prisma.adsHourlyStats as any).aggregateRaw({
+        pipeline,
+      })) as AdsDailyAggRow[];
+
+      totalRows += rows.length;
+
+      for (const row of rows) {
+        await this.prisma.adsDailyStats.upsert({
+          where: {
+            adsId_dayStartAt: {
+              adsId: row.adsId,
+              dayStartAt: BigInt(dayStartAt),
+            },
+          },
+          update: {
+            clicks: BigInt(row.clicks),
+            updateAt: BigInt(nowSec),
+          },
+          create: {
+            adsId: row.adsId,
+            dayStartAt: BigInt(dayStartAt),
+            clicks: BigInt(row.clicks),
+            createAt: BigInt(nowSec),
+            updateAt: BigInt(nowSec),
+          },
+        });
+      }
+    }
+
+    this.logger.log(
+      `AdsDaily aggregation: window=[${startSec},${endSec}) dayStarts=[${dayStarts.join(
+        ',',
+      )}] rows=${totalRows}`,
+    );
+    return totalRows;
+  }
+
+  async aggregateChannelDaily(
+    startSec: number,
+    endSec: number,
+  ): Promise<number> {
+    const pipeline = [
+      {
+        $match: {
+          $expr: {
+            $and: [
+              { $gte: [{ $toLong: '$createAt' }, startSec] },
+              { $lt: [{ $toLong: '$createAt' }, endSec] },
+            ],
+          },
+        },
+      },
+      {
+        $project: {
+          channelId: 1,
+          uid: 1,
+          dayStartAt: {
+            $toLong: {
+              $divide: [
+                {
+                  $toLong: {
+                    $dateTrunc: {
+                      date: { $toDate: { $multiply: ['$createAt', 1000] } },
+                      unit: 'day',
+                      timezone: '+08:00',
+                    },
+                  },
+                },
+                1000,
+              ],
+            },
+          },
+        },
+      },
+      {
+        $group: {
+          _id: {
+            channelId: '$channelId',
+            dayStartAt: '$dayStartAt',
+            uid: '$uid',
+          },
+          cnt: { $sum: 1 },
+        },
+      },
+      {
+        $group: {
+          _id: {
+            channelId: '$_id.channelId',
+            dayStartAt: '$_id.dayStartAt',
+          },
+          total: { $sum: '$cnt' },
+          uniqueUsers: { $sum: 1 },
+        },
+      },
+      {
+        $project: {
+          _id: 0,
+          channelId: '$_id.channelId',
+          dayStartAt: '$_id.dayStartAt',
+          total: 1,
+          uniqueUsers: 1,
+        },
+      },
+    ] as const;
+
+    const rows = (await (this.prisma.userLoginHistory as any).aggregateRaw({
+      pipeline,
+    })) as ChannelDailyAggRow[];
+
+    const nowSec = Math.floor(Date.now() / 1000);
+
+    for (const row of rows) {
+      await this.prisma.channelDailyUserStats.upsert({
+        where: {
+          channelId_dayStartAt: {
+            channelId: row.channelId,
+            dayStartAt: BigInt(row.dayStartAt),
+          },
+        },
+        update: {
+          total: BigInt(row.total),
+          uniqueUsers: BigInt(row.uniqueUsers),
+          updateAt: BigInt(nowSec),
+        },
+        create: {
+          channelId: row.channelId,
+          dayStartAt: BigInt(row.dayStartAt),
+          total: BigInt(row.total),
+          uniqueUsers: BigInt(row.uniqueUsers),
+          createAt: BigInt(nowSec),
+          updateAt: BigInt(nowSec),
+        },
+      });
+    }
+
+    this.logger.log(
+      `ChannelDaily aggregation: window=[${startSec},${endSec}) buckets=${rows.length}`,
+    );
+    return rows.length;
+  }
+
+  private getAffectedDayStarts(startSec: number, endSec: number): number[] {
+    const firstDay = this.getGmt8DayStart(startSec);
+    const lastMoment = endSec - 1;
+    const lastDay = this.getGmt8DayStart(lastMoment);
+    return firstDay === lastDay ? [firstDay] : [firstDay, lastDay];
+  }
+
+  private getGmt8DayStart(epochSec: number): number {
+    const offset = 8 * 3600;
+    const shifted = epochSec + offset;
+    const dayStartShifted = Math.floor(shifted / 86400) * 86400;
+    return dayStartShifted - offset;
+  }
+}

+ 230 - 0
apps/box-stats-api/src/feature/stats-events/stats.hourly.aggregation.service.ts

@@ -0,0 +1,230 @@
+// box-stats-api/src/feature/stats-events/stats.hourly.aggregation.service.ts
+import { Injectable, Logger } from '@nestjs/common';
+import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
+import { computeHourlyWindowUtcSec } from '../time-helper';
+
+type AdsHourlyAggRow = {
+  adsId: string;
+  hourStartAt: number; // seconds
+  clicks: number;
+};
+
+type ChannelHourlyAggRow = {
+  channelId: string;
+  hourStartAt: number; // seconds
+  total: number;
+  uniqueUsers: number;
+};
+
+@Injectable()
+export class StatsHourlyAggregationService {
+  private readonly logger = new Logger(StatsHourlyAggregationService.name);
+
+  constructor(private readonly prisma: PrismaMongoService) {}
+
+  async aggregateAdsHourly(startSec: number, endSec: number): Promise<number> {
+    const pipeline = [
+      {
+        $match: {
+          adsId: { $ne: null },
+          $expr: {
+            $and: [
+              { $gte: [{ $toLong: '$clickedAt' }, startSec] },
+              { $lt: [{ $toLong: '$clickedAt' }, endSec] },
+            ],
+          },
+        },
+      },
+      {
+        $project: {
+          adsId: { $toString: '$adsId' },
+          // Convert seconds -> Date(ms), truncate to hour in GMT+8, convert back to seconds
+          hourStartAt: {
+            $toLong: {
+              $divide: [
+                {
+                  $toLong: {
+                    $dateTrunc: {
+                      date: { $toDate: { $multiply: ['$clickedAt', 1000] } },
+                      unit: 'hour',
+                      timezone: '+08:00',
+                    },
+                  },
+                },
+                1000,
+              ],
+            },
+          },
+        },
+      },
+      {
+        $group: {
+          _id: { adsId: '$adsId', hourStartAt: '$hourStartAt' },
+          clicks: { $sum: 1 },
+        },
+      },
+      {
+        $project: {
+          _id: 0,
+          adsId: '$_id.adsId',
+          hourStartAt: '$_id.hourStartAt',
+          clicks: 1,
+        },
+      },
+    ] as const;
+
+    const rows = (await (this.prisma.adClickEvents as any).aggregateRaw({
+      pipeline,
+    })) as AdsHourlyAggRow[];
+
+    const nowSec = Math.floor(Date.now() / 1000);
+
+    for (const row of rows) {
+      await this.prisma.adsHourlyStats.upsert({
+        where: {
+          adsId_hourStartAt: {
+            adsId: row.adsId,
+            hourStartAt: BigInt(row.hourStartAt),
+          },
+        },
+        update: {
+          clicks: BigInt(row.clicks),
+          updateAt: BigInt(nowSec),
+        },
+        create: {
+          adsId: row.adsId,
+          hourStartAt: BigInt(row.hourStartAt),
+          clicks: BigInt(row.clicks),
+          createAt: BigInt(nowSec),
+          updateAt: BigInt(nowSec),
+        },
+      });
+    }
+
+    this.logger.log(
+      `AdsHourly aggregation: window=[${startSec},${endSec}) buckets=${rows.length}`,
+    );
+    return rows.length;
+  }
+
+  async aggregateChannelHourly(
+    startSec: number,
+    endSec: number,
+  ): Promise<number> {
+    const pipeline = [
+      {
+        $match: {
+          $expr: {
+            $and: [
+              { $gte: [{ $toLong: '$createAt' }, startSec] },
+              { $lt: [{ $toLong: '$createAt' }, endSec] },
+            ],
+          },
+        },
+      },
+      {
+        $project: {
+          channelId: 1,
+          uid: 1,
+          hourStartAt: {
+            $toLong: {
+              $divide: [
+                {
+                  $toLong: {
+                    $dateTrunc: {
+                      date: { $toDate: { $multiply: ['$createAt', 1000] } },
+                      unit: 'hour',
+                      timezone: '+08:00',
+                    },
+                  },
+                },
+                1000,
+              ],
+            },
+          },
+        },
+      },
+      // 1) unique key per user per channel per hour
+      {
+        $group: {
+          _id: {
+            channelId: '$channelId',
+            hourStartAt: '$hourStartAt',
+            uid: '$uid',
+          },
+          cnt: { $sum: 1 },
+        },
+      },
+      // 2) roll up to channel/hour
+      {
+        $group: {
+          _id: {
+            channelId: '$_id.channelId',
+            hourStartAt: '$_id.hourStartAt',
+          },
+          total: { $sum: '$cnt' },
+          uniqueUsers: { $sum: 1 },
+        },
+      },
+      {
+        $project: {
+          _id: 0,
+          channelId: '$_id.channelId',
+          hourStartAt: '$_id.hourStartAt',
+          total: 1,
+          uniqueUsers: 1,
+        },
+      },
+    ] as const;
+
+    const rows = (await (this.prisma.userLoginHistory as any).aggregateRaw({
+      pipeline,
+    })) as ChannelHourlyAggRow[];
+
+    const nowSec = Math.floor(Date.now() / 1000);
+
+    for (const row of rows) {
+      await this.prisma.channelHourlyUserStats.upsert({
+        where: {
+          channelId_hourStartAt: {
+            channelId: row.channelId,
+            hourStartAt: BigInt(row.hourStartAt),
+          },
+        },
+        update: {
+          total: BigInt(row.total),
+          uniqueUsers: BigInt(row.uniqueUsers),
+          updateAt: BigInt(nowSec),
+        },
+        create: {
+          channelId: row.channelId,
+          hourStartAt: BigInt(row.hourStartAt),
+          total: BigInt(row.total),
+          uniqueUsers: BigInt(row.uniqueUsers),
+          createAt: BigInt(nowSec),
+          updateAt: BigInt(nowSec),
+        },
+      });
+    }
+
+    this.logger.log(
+      `ChannelHourly aggregation: window=[${startSec},${endSec}) buckets=${rows.length}`,
+    );
+    return rows.length;
+  }
+
+  async aggregateRecentHourly(lookbackHours = 2): Promise<void> {
+    const nowSec = Math.floor(Date.now() / 1000);
+    const { startSec, endSec } = computeHourlyWindowUtcSec(
+      nowSec,
+      lookbackHours,
+    );
+
+    await this.aggregateAdsHourly(startSec, endSec);
+    await this.aggregateChannelHourly(startSec, endSec);
+
+    this.logger.log(
+      `Hourly aggregation window computed via time-helper: lookback=${lookbackHours} window=[${startSec},${endSec})`,
+    );
+  }
+}

+ 19 - 0
apps/box-stats-api/src/feature/time-helper.ts

@@ -0,0 +1,19 @@
+// box-stats-api/src/feature/time-helper.ts
+const TZ_OFFSET_SEC = 8 * 3600;
+
+export function getGmt8HourStartUtcSec(epochSec: number): number {
+  const shifted = epochSec + TZ_OFFSET_SEC;
+  const hourStartShifted = Math.floor(shifted / 3600) * 3600;
+  return hourStartShifted - TZ_OFFSET_SEC; // still UTC epoch seconds
+}
+
+export function getGmt8CurrentHourStartUtcSec(nowSec: number): number {
+  return getGmt8HourStartUtcSec(nowSec);
+}
+
+export function computeHourlyWindowUtcSec(nowSec: number, lookbackHours = 2) {
+  const currentHourStartUtc = getGmt8CurrentHourStartUtcSec(nowSec);
+  const startSec = currentHourStartUtc - lookbackHours * 3600;
+  const endSec = nowSec; // realtime
+  return { startSec, endSec };
+}

+ 81 - 0
prisma/mongo-stats/schema/daily-hourly-stats.prisma

@@ -0,0 +1,81 @@
+model AdsHourlyStats {
+  id          String @id @map("_id") @default(auto()) @db.ObjectId
+
+  adsId       String @db.ObjectId
+
+  // Hour bucket start time (epoch seconds, GMT+8 aligned in code)
+  hourStartAt BigInt
+
+  clicks      BigInt @default(0)
+
+  createAt    BigInt
+  updateAt    BigInt
+
+  @@index([hourStartAt])
+  @@unique([adsId, hourStartAt])
+
+  @@map("adsHourlyStats")
+}
+
+model AdsDailyStats {
+  id          String @id @map("_id") @default(auto()) @db.ObjectId
+
+  adsId       String @db.ObjectId
+
+  // Day bucket start time (epoch seconds, GMT+8 aligned in code)
+  dayStartAt  BigInt
+
+  clicks      BigInt @default(0)
+
+  createAt    BigInt
+  updateAt    BigInt
+
+  @@index([dayStartAt])
+  @@unique([adsId, dayStartAt])
+
+  @@map("adsDailyStats")
+}
+
+model ChannelHourlyUserStats {
+  id          String @id @map("_id") @default(auto()) @db.ObjectId
+
+  channelId   String
+
+  // Hour bucket start time (epoch seconds, GMT+8 aligned in code)
+  hourStartAt BigInt
+
+  // Total events in this hour (e.g. login count)
+  total       BigInt @default(0)
+
+  // Unique user count (dedup by uid during aggregation)
+  uniqueUsers BigInt @default(0)
+
+  createAt    BigInt
+  updateAt    BigInt
+
+  @@index([hourStartAt])
+  @@unique([channelId, hourStartAt])
+
+  @@map("channelHourlyUserStats")
+}
+
+model ChannelDailyUserStats {
+  id          String @id @map("_id") @default(auto()) @db.ObjectId
+
+  channelId   String
+
+  // Day bucket start time (epoch seconds, GMT+8 aligned in code)
+  dayStartAt  BigInt
+
+  total       BigInt @default(0)
+  uniqueUsers BigInt @default(0)
+
+  createAt    BigInt
+  updateAt    BigInt
+
+
+  @@index([dayStartAt])
+  @@unique([channelId, dayStartAt])
+
+  @@map("channelDailyUserStats")
+}

+ 2 - 2
prisma/mongo-stats/schema/events.prisma

@@ -33,8 +33,8 @@ model ProcessedMessage {
   id          String @id @map("_id") @default(auto()) @db.ObjectId
   messageId   String @unique // 去重用的唯一消息 ID
   eventType   String // 事件类型(如 stats.ad.click)
-  processedAt BigInt // 处理时间(epoch ms)
-  createdAt   BigInt // 创建时间(epoch ms)
+  processedAt BigInt // 处理时间 (epoch seconds)
+  createdAt   BigInt // 创建时间 (epoch seconds)
 
   @@map("processedMessages")
 }