|
|
@@ -9,6 +9,21 @@ type AggregationResult = {
|
|
|
errorCount: number;
|
|
|
};
|
|
|
|
|
|
+type RerunRangeCapable = {
|
|
|
+ rerunRange: (args: {
|
|
|
+ fromSec: number;
|
|
|
+ toSec: number;
|
|
|
+ dryRun?: boolean;
|
|
|
+ }) => Promise<unknown>;
|
|
|
+};
|
|
|
+
|
|
|
+type DailyRefreshCapable = {
|
|
|
+ refreshDailyDerivedFromHourly: (args: {
|
|
|
+ fromSec: number;
|
|
|
+ toSec: number;
|
|
|
+ }) => Promise<void>;
|
|
|
+};
|
|
|
+
|
|
|
@Injectable()
|
|
|
export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
private readonly logger = new Logger(StatsAggregationScheduler.name);
|
|
|
@@ -17,7 +32,11 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
private windowDays?: number;
|
|
|
|
|
|
// guardrails: avoid overlapping runs + spam
|
|
|
- private runningAds = false;
|
|
|
+ private runningHourly = false;
|
|
|
+ private runningDaily = false;
|
|
|
+
|
|
|
+ // run a little after boundary so late events can land
|
|
|
+ private runDelaySec = 90;
|
|
|
|
|
|
constructor(
|
|
|
private readonly configService: ConfigService,
|
|
|
@@ -25,13 +44,11 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
) {}
|
|
|
|
|
|
onModuleInit(): void {
|
|
|
- // Evaluate config on module init (safer than constructor for startup ordering)
|
|
|
const enabledRaw = this.configService
|
|
|
.get<string>('STATS_AGGREGATION_ENABLED')
|
|
|
?.trim()
|
|
|
.toLowerCase();
|
|
|
|
|
|
- // default: enabled (unless explicitly "false" or "0" etc.)
|
|
|
this.enabled = !['false', '0', 'off', 'no'].includes(enabledRaw ?? '');
|
|
|
|
|
|
const daysRaw = this.configService
|
|
|
@@ -50,11 +67,26 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ const delayRaw = this.configService
|
|
|
+ .get<string>('STATS_AGGREGATION_RUN_DELAY_SEC')
|
|
|
+ ?.trim();
|
|
|
+
|
|
|
+ if (delayRaw) {
|
|
|
+ const parsed = Number.parseInt(delayRaw, 10);
|
|
|
+ if (Number.isFinite(parsed) && parsed >= 0 && parsed <= 900) {
|
|
|
+ this.runDelaySec = parsed;
|
|
|
+ } else {
|
|
|
+ this.logger.warn(
|
|
|
+ `Invalid STATS_AGGREGATION_RUN_DELAY_SEC="${delayRaw}" (expected 0..900). Using default ${this.runDelaySec}s.`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
if (this.enabled) {
|
|
|
this.logger.log(
|
|
|
`📊 Stats aggregation scheduler enabled (windowDays=${
|
|
|
this.windowDays ?? 'all time'
|
|
|
- }, interval=${CronExpression.EVERY_5_MINUTES})`,
|
|
|
+ }, hourly=${CronExpression.EVERY_HOUR}, daily=00:10, delaySec=${this.runDelaySec})`,
|
|
|
);
|
|
|
} else {
|
|
|
this.logger.warn(
|
|
|
@@ -63,43 +95,158 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Cron(CronExpression.EVERY_5_MINUTES, { name: 'stats-aggregation-ads' })
|
|
|
- async runAdsAggregation(): Promise<void> {
|
|
|
+ /**
|
|
|
+ * Hourly is the unit of truth.
|
|
|
+ * Runs slightly after the hour, but still processes the previous full hour window.
|
|
|
+ */
|
|
|
+ @Cron(CronExpression.EVERY_HOUR, { name: 'stats-aggregation-hourly' })
|
|
|
+ async runHourly(): Promise<void> {
|
|
|
if (!this.enabled) return;
|
|
|
|
|
|
- if (this.runningAds) {
|
|
|
+ if (this.runningHourly) {
|
|
|
this.logger.warn(
|
|
|
- '⏭️ Skip ads aggregation: previous run still in progress',
|
|
|
+ '⏭️ Skip hourly aggregation: previous run still in progress',
|
|
|
);
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- this.runningAds = true;
|
|
|
- const start = Date.now();
|
|
|
+ this.runningHourly = true;
|
|
|
+ const startedAt = Date.now();
|
|
|
+
|
|
|
+ const nowSec = Math.floor(Date.now() / 1000);
|
|
|
+ const effectiveNowSec = nowSec - this.runDelaySec;
|
|
|
|
|
|
- this.logger.log(
|
|
|
- `⏰ Ads aggregation start (windowDays=${this.windowDays ?? 'all time'})`,
|
|
|
- );
|
|
|
+ const { fromSec, toSec } = this.prevHourWindow(effectiveNowSec);
|
|
|
+ const tag = `⏰ Hourly aggregation [${fromSec},${toSec})`;
|
|
|
+
|
|
|
+ this.logger.log(`${tag} start`);
|
|
|
|
|
|
try {
|
|
|
- const result = (await this.statsAggregation.aggregateAdsStats({
|
|
|
- windowDays: this.windowDays,
|
|
|
- })) as AggregationResult;
|
|
|
+ // Preferred path: exact hour rerun (idempotent)
|
|
|
+ if (this.hasRerunRange(this.statsAggregation)) {
|
|
|
+ await this.statsAggregation.rerunRange({
|
|
|
+ fromSec,
|
|
|
+ toSec,
|
|
|
+ dryRun: false,
|
|
|
+ });
|
|
|
+ this.logger.log(`${tag} rerunRange done`);
|
|
|
+ } else {
|
|
|
+ // Backward-compatible fallback
|
|
|
+ const result = (await this.statsAggregation.aggregateAdsStats({
|
|
|
+ windowDays: this.windowDays,
|
|
|
+ })) as AggregationResult;
|
|
|
|
|
|
- const ms = Date.now() - start;
|
|
|
- this.logger.log(
|
|
|
- `✅ Ads aggregation done in ${ms}ms (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
|
|
|
+ this.logger.warn(
|
|
|
+ `${tag} used fallback aggregateAdsStats(windowDays=${
|
|
|
+ this.windowDays ?? 'all time'
|
|
|
+ }) (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ // Optional daily refresh triggered after hourly
|
|
|
+ if (this.hasDailyRefresh(this.statsAggregation)) {
|
|
|
+ await this.statsAggregation.refreshDailyDerivedFromHourly({
|
|
|
+ fromSec,
|
|
|
+ toSec,
|
|
|
+ });
|
|
|
+ this.logger.log(`${tag} daily refresh done`);
|
|
|
+ }
|
|
|
+
|
|
|
+ const ms = Date.now() - startedAt;
|
|
|
+ this.logger.log(`${tag} ✅ done in ${ms}ms`);
|
|
|
+ } catch (err: any) {
|
|
|
+ const ms = Date.now() - startedAt;
|
|
|
+ this.logger.error(
|
|
|
+ `${tag} ❌ failed after ${ms}ms: ${err?.message || String(err)}`,
|
|
|
+ err?.stack,
|
|
|
+ );
|
|
|
+ } finally {
|
|
|
+ this.runningHourly = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Daily refresh is derived; safe to run once a day as a "catch-all".
|
|
|
+ * 00:10 (GMT+8 business) — note: Cron uses server TZ; adjust if needed.
|
|
|
+ */
|
|
|
+ @Cron('10 0 * * *', { name: 'stats-aggregation-daily' })
|
|
|
+ async runDailyCatchAll(): Promise<void> {
|
|
|
+ if (!this.enabled) return;
|
|
|
+
|
|
|
+ if (this.runningDaily) {
|
|
|
+ this.logger.warn(
|
|
|
+ '⏭️ Skip daily catch-all: previous run still in progress',
|
|
|
+ );
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!this.hasDailyRefresh(this.statsAggregation)) {
|
|
|
+ // No-op if your service doesn’t support daily refresh yet
|
|
|
+ this.logger.warn(
|
|
|
+ 'ℹ️ Daily catch-all skipped: refreshDailyDerivedFromHourly not implemented',
|
|
|
);
|
|
|
- } catch (err) {
|
|
|
- const ms = Date.now() - start;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ this.runningDaily = true;
|
|
|
+ const startedAt = Date.now();
|
|
|
+
|
|
|
+ // Refresh “yesterday” (GMT+8 aligned) to catch late events
|
|
|
+ const nowSec = Math.floor(Date.now() / 1000);
|
|
|
+ const yesterdayStartSec = this.floorToDayGmt8(nowSec) - 86400;
|
|
|
+ const yesterdayEndSec = yesterdayStartSec + 86400;
|
|
|
+ const tag = `🗓️ Daily catch-all (GMT+8) [${yesterdayStartSec},${yesterdayEndSec})`;
|
|
|
+
|
|
|
+ this.logger.log(`${tag} start`);
|
|
|
+
|
|
|
+ try {
|
|
|
+ await this.statsAggregation.refreshDailyDerivedFromHourly({
|
|
|
+ fromSec: yesterdayStartSec,
|
|
|
+ toSec: yesterdayEndSec,
|
|
|
+ });
|
|
|
+
|
|
|
+ const ms = Date.now() - startedAt;
|
|
|
+ this.logger.log(`${tag} ✅ done in ${ms}ms`);
|
|
|
+ } catch (err: any) {
|
|
|
+ const ms = Date.now() - startedAt;
|
|
|
this.logger.error(
|
|
|
- `❌ Ads aggregation failed after ${ms}ms: ${
|
|
|
- err instanceof Error ? err.message : String(err)
|
|
|
- }`,
|
|
|
- err instanceof Error ? err.stack : undefined,
|
|
|
+ `${tag} ❌ failed after ${ms}ms: ${err?.message || String(err)}`,
|
|
|
+ err?.stack,
|
|
|
);
|
|
|
} finally {
|
|
|
- this.runningAds = false;
|
|
|
+ this.runningDaily = false;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private prevHourWindow(nowSec: number): { fromSec: number; toSec: number } {
|
|
|
+ const endSec = this.floorToHour(nowSec);
|
|
|
+ return { fromSec: endSec - 3600, toSec: endSec };
|
|
|
+ }
|
|
|
+
|
|
|
+ private floorToHour(sec: number): number {
|
|
|
+ return sec - (sec % 3600);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * GMT+8 day bucket start in UTC seconds:
|
|
|
+ * shift +8h, floor to day, shift back
|
|
|
+ */
|
|
|
+ private floorToDayGmt8(secUtc: number): number {
|
|
|
+ const shift = 8 * 3600;
|
|
|
+ const shifted = secUtc + shift;
|
|
|
+ const dayStartShifted = shifted - (shifted % 86400);
|
|
|
+ return dayStartShifted - shift;
|
|
|
+ }
|
|
|
+
|
|
|
+ private hasRerunRange(
|
|
|
+ svc: StatsAggregationService,
|
|
|
+ ): svc is StatsAggregationService & RerunRangeCapable {
|
|
|
+ return typeof (svc as any)?.rerunRange === 'function';
|
|
|
+ }
|
|
|
+
|
|
|
+ private hasDailyRefresh(
|
|
|
+ svc: StatsAggregationService,
|
|
|
+ ): svc is StatsAggregationService & DailyRefreshCapable {
|
|
|
+ return typeof (svc as any)?.refreshDailyDerivedFromHourly === 'function';
|
|
|
+ }
|
|
|
}
|