Pārlūkot izejas kodu

refactor: streamline stats aggregation by consolidating rerun logic and enhancing hourly aggregation handling

Dave 1 mēnesi atpakaļ
vecāks
revīzija
90b60fceee

+ 1 - 49
apps/box-stats-api/src/feature/stats-events/stats-aggregation.scheduler.ts

@@ -3,20 +3,6 @@ import { Cron, CronExpression } from '@nestjs/schedule';
 import { ConfigService } from '@nestjs/config';
 import { StatsAggregationService } from './stats-aggregation.service';
 
-type AggregationResult = {
-  successCount: number;
-  totalProcessed: number;
-  errorCount: number;
-};
-
-type RerunRangeCapable = {
-  rerunRange: (args: {
-    fromSec: number;
-    toSec: number;
-    dryRun?: boolean;
-  }) => Promise<unknown>;
-};
-
 type DailyRefreshCapable = {
   refreshDailyDerivedFromHourly: (args: {
     fromSec: number;
@@ -141,35 +127,7 @@ export class StatsAggregationScheduler implements OnModuleInit {
     this.logger.log(`${tag} start`);
 
     try {
-      // Preferred path: exact hour rerun (idempotent)
-      if (this.hasRerunRange(this.statsAggregation)) {
-        await this.statsAggregation.rerunRange({
-          fromSec,
-          toSec,
-          dryRun: false,
-        });
-        this.logger.log(`${tag} rerunRange done`);
-      } else {
-        // Backward-compatible fallback
-        const result = (await this.statsAggregation.aggregateAdsStats({
-          windowDays: this.windowDays,
-        })) as AggregationResult;
-
-        this.logger.warn(
-          `${tag} used fallback aggregateAdsStats(windowDays=${
-            this.windowDays ?? 'all time'
-          }) (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
-        );
-      }
-
-      // Optional daily refresh triggered after hourly
-      if (this.hasDailyRefresh(this.statsAggregation)) {
-        await this.statsAggregation.refreshDailyDerivedFromHourly({
-          fromSec,
-          toSec,
-        });
-        this.logger.log(`${tag} daily refresh done`);
-      }
+      await this.statsAggregation.aggregateHourWindow({ fromSec, toSec });
 
       this.lastHourlyWindowToSec = toSec;
 
@@ -259,12 +217,6 @@ export class StatsAggregationScheduler implements OnModuleInit {
     return dayStartShifted - shift;
   }
 
-  private hasRerunRange(
-    svc: StatsAggregationService,
-  ): svc is StatsAggregationService & RerunRangeCapable {
-    return typeof (svc as any)?.rerunRange === 'function';
-  }
-
   private hasDailyRefresh(
     svc: StatsAggregationService,
   ): svc is StatsAggregationService & DailyRefreshCapable {

+ 27 - 1
apps/box-stats-api/src/feature/stats-events/stats-aggregation.service.ts

@@ -2,6 +2,8 @@
 import { Injectable, Logger } from '@nestjs/common';
 import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
 import { nowSecBigInt } from '@box/common/time/time.util';
+import { StatsHourlyAggregationService } from './stats.hourly.aggregation.service';
+import { StatsDailyAggregationService } from './stats.daily.aggregation.service';
 
 interface AggregationOptions {
   windowDays?: number; // kept for API compatibility, but ignored per new requirements
@@ -21,7 +23,11 @@ type AdIdentity =
 export class StatsAggregationService {
   private readonly logger = new Logger(StatsAggregationService.name);
 
-  constructor(private readonly prisma: PrismaMongoService) {}
+  constructor(
+    private readonly prisma: PrismaMongoService,
+    private readonly statsHourly: StatsHourlyAggregationService,
+    private readonly statsDaily: StatsDailyAggregationService,
+  ) {}
 
   async aggregateAdsStats(
     _options: AggregationOptions = {},
@@ -198,6 +204,26 @@ export class StatsAggregationService {
     );
   }
 
+  async aggregateHourWindow(args: {
+    fromSec: number;
+    toSec: number;
+  }): Promise<void> {
+    const { fromSec, toSec } = args;
+    const windowLen = toSec - fromSec;
+    if (windowLen !== 3600) {
+      this.logger.warn(
+        `aggregateHourWindow expected 1h window but got length=${windowLen} [${fromSec},${toSec})`,
+      );
+    }
+
+    await this.statsHourly.aggregateAdsHourly(fromSec, toSec);
+    await this.statsHourly.aggregateChannelHourly(fromSec, toSec);
+    await this.statsDaily.aggregateAdsDaily(fromSec, toSec);
+    await this.statsDaily.aggregateChannelDaily(fromSec, toSec);
+
+    this.logger.log(`aggregateHourWindow done window=[${fromSec},${toSec})`);
+  }
+
   private identityKey(idn: AdIdentity): string {
     return idn.kind === 'adsId' ? `adsId:${idn.adsId}` : `adId:${idn.adId}`;
   }

+ 6 - 10
apps/box-stats-api/src/feature/stats-events/stats-rerun.service.ts

@@ -1,16 +1,12 @@
 import { Injectable, Logger } from '@nestjs/common';
-import { StatsHourlyAggregationService } from './stats.hourly.aggregation.service';
-import { StatsDailyAggregationService } from './stats.daily.aggregation.service';
+import { StatsAggregationService } from './stats-aggregation.service';
 import { StatsRerunDto } from './dto/stats-rerun.dto';
 
 @Injectable()
 export class StatsRerunService {
   private readonly logger = new Logger(StatsRerunService.name);
 
-  constructor(
-    private readonly statsHourly: StatsHourlyAggregationService,
-    private readonly statsDaily: StatsDailyAggregationService,
-  ) {}
+  constructor(private readonly statsAggregation: StatsAggregationService) {}
 
   buildHours(fromSec: number, toSec: number): number[] {
     const hours: number[] = [];
@@ -45,10 +41,10 @@ export class StatsRerunService {
     while (cursor < dto.toSec) {
       const startSec = cursor;
       const endSec = cursor + 3600;
-      await this.statsHourly.aggregateAdsHourly(startSec, endSec);
-      await this.statsHourly.aggregateChannelHourly(startSec, endSec);
-      await this.statsDaily.aggregateAdsDaily(startSec, endSec);
-      await this.statsDaily.aggregateChannelDaily(startSec, endSec);
+      await this.statsAggregation.aggregateHourWindow({
+        fromSec: startSec,
+        toSec: endSec,
+      });
       cursor = endSec;
     }