import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { ConfigService } from '@nestjs/config';
import { StatsAggregationService } from './stats-aggregation.service';

/** Counters read from the result of the legacy `aggregateAdsStats` fallback. */
type AggregationResult = {
  successCount: number;
  totalProcessed: number;
  errorCount: number;
};

/**
 * Optional service capability: re-aggregate an explicit `[fromSec, toSec)` range.
 * The resolved value is never inspected by the scheduler, hence `unknown`.
 */
type RerunRangeCapable = {
  rerunRange: (args: {
    fromSec: number;
    toSec: number;
    dryRun?: boolean;
  }) => Promise<unknown>;
};

/**
 * Optional service capability: rebuild daily figures from hourly ones for a
 * `[fromSec, toSec)` range. The resolved value is never inspected.
 */
type DailyRefreshCapable = {
  refreshDailyDerivedFromHourly: (args: {
    fromSec: number;
    toSec: number;
  }) => Promise<unknown>;
};

const SECONDS_PER_HOUR = 3600;
const SECONDS_PER_DAY = 86400;

/**
 * Cron-driven entry point for stats aggregation.
 *
 * - `runHourly` aggregates one full-hour window on every hourly tick.
 * - `runDailyCatchAll` refreshes the previous GMT+8 day once a day.
 *
 * Behaviour is configured in `onModuleInit` from:
 * - `STATS_AGGREGATION_ENABLED` (`false` / `0` / `off` / `no` disable it),
 * - `STATS_AGGREGATION_WINDOW_DAYS` (positive integer; otherwise "all time"),
 * - `STATS_AGGREGATION_RUN_DELAY_SEC` (0..900; default 90).
 */
@Injectable()
export class StatsAggregationScheduler implements OnModuleInit {
  private readonly logger = new Logger(StatsAggregationScheduler.name);

  private enabled = true;
  private windowDays?: number;

  // Guardrails: never let two runs of the same job overlap.
  private runningHourly = false;
  private runningDaily = false;

  // Seconds subtracted from "now" before picking the hourly window, so that
  // late events for a just-closed hour can still land.
  private runDelaySec = 90;

  constructor(
    private readonly configService: ConfigService,
    private readonly statsAggregation: StatsAggregationService,
  ) {}

  /** Reads and validates configuration, then logs the effective settings. */
  onModuleInit(): void {
    const enabledRaw = this.readSetting(
      'STATS_AGGREGATION_ENABLED',
    )?.toLowerCase();
    this.enabled = !['false', '0', 'off', 'no'].includes(enabledRaw ?? '');

    const daysRaw = this.readSetting('STATS_AGGREGATION_WINDOW_DAYS');
    if (daysRaw) {
      const days = this.parseBoundedInt(
        daysRaw,
        1,
        Number.POSITIVE_INFINITY,
      );
      if (days === undefined) {
        this.logger.warn(
          `Invalid STATS_AGGREGATION_WINDOW_DAYS="${daysRaw}" (expected positive integer). Falling back to "all time".`,
        );
      }
      this.windowDays = days;
    }

    const delayRaw = this.readSetting('STATS_AGGREGATION_RUN_DELAY_SEC');
    if (delayRaw) {
      const delay = this.parseBoundedInt(delayRaw, 0, 900);
      if (delay === undefined) {
        this.logger.warn(
          `Invalid STATS_AGGREGATION_RUN_DELAY_SEC="${delayRaw}" (expected 0..900). Using default ${this.runDelaySec}s.`,
        );
      } else {
        this.runDelaySec = delay;
      }
    }

    if (this.enabled) {
      this.logger.log(
        `📊 Stats aggregation scheduler enabled (windowDays=${
          this.windowDays ?? 'all time'
        }, hourly=${CronExpression.EVERY_HOUR}, daily=00:10, delaySec=${this.runDelaySec})`,
      );
    } else {
      this.logger.warn(
        `📊 Stats aggregation scheduler DISABLED (STATS_AGGREGATION_ENABLED="${enabledRaw ?? ''}")`,
      );
    }
  }

  /**
   * Hourly job: aggregates the last full hour that ended at or before
   * `now - runDelaySec`, then (if supported) refreshes daily data for the
   * same window. Failures are logged, never rethrown.
   *
   * NOTE(review): assuming `CronExpression.EVERY_HOUR` fires at minute 0, any
   * `runDelaySec > 0` places the delayed clock in the previous hour, so the
   * window handled is the hour *before last* (a 10:00 tick processes
   * [08:00, 09:00)). Confirm that extra hour of lag is intended; otherwise
   * schedule the job a few minutes past the hour.
   */
  @Cron(CronExpression.EVERY_HOUR, { name: 'stats-aggregation-hourly' })
  async runHourly(): Promise<void> {
    if (!this.enabled) return;
    if (this.runningHourly) {
      this.logger.warn(
        '⏭️ Skip hourly aggregation: previous run still in progress',
      );
      return;
    }

    this.runningHourly = true;
    const startedAt = Date.now();
    const effectiveNowSec = Math.floor(startedAt / 1000) - this.runDelaySec;
    const { fromSec, toSec } = this.prevHourWindow(effectiveNowSec);
    const tag = `⏰ Hourly aggregation [${fromSec},${toSec})`;
    this.logger.log(`${tag} start`);

    try {
      const svc = this.statsAggregation;

      if (this.hasRerunRange(svc)) {
        // Preferred path: re-run exactly this hour (the original author
        // describes rerunRange as idempotent).
        await svc.rerunRange({ fromSec, toSec, dryRun: false });
        this.logger.log(`${tag} rerunRange done`);
      } else {
        // Backward-compatible fallback: full windowed aggregation.
        const result = (await svc.aggregateAdsStats({
          windowDays: this.windowDays,
        })) as AggregationResult;
        this.logger.warn(
          `${tag} used fallback aggregateAdsStats(windowDays=${
            this.windowDays ?? 'all time'
          }) (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
        );
      }

      // Optional: bring the daily figures in line with the hour just written.
      if (this.hasDailyRefresh(svc)) {
        await svc.refreshDailyDerivedFromHourly({ fromSec, toSec });
        this.logger.log(`${tag} daily refresh done`);
      }

      const ms = Date.now() - startedAt;
      this.logger.log(`${tag} ✅ done in ${ms}ms`);
    } catch (err: unknown) {
      const ms = Date.now() - startedAt;
      const { message, stack } = this.describeError(err);
      this.logger.error(`${tag} ❌ failed after ${ms}ms: ${message}`, stack);
    } finally {
      this.runningHourly = false;
    }
  }

  /**
   * Daily catch-all: refreshes the previous GMT+8 calendar day from hourly
   * data to pick up late events. A no-op (with a warning) when the service
   * has no `refreshDailyDerivedFromHourly`. Failures are logged, not rethrown.
   *
   * The `10 0 * * *` schedule is evaluated in the server's time zone (per the
   * original author's note), while the refreshed window is always aligned to
   * GMT+8 days.
   */
  @Cron('10 0 * * *', { name: 'stats-aggregation-daily' })
  async runDailyCatchAll(): Promise<void> {
    if (!this.enabled) return;
    if (this.runningDaily) {
      this.logger.warn(
        '⏭️ Skip daily catch-all: previous run still in progress',
      );
      return;
    }

    const svc = this.statsAggregation;
    if (!this.hasDailyRefresh(svc)) {
      this.logger.warn(
        'ℹ️ Daily catch-all skipped: refreshDailyDerivedFromHourly not implemented',
      );
      return;
    }

    this.runningDaily = true;
    const startedAt = Date.now();
    const nowSec = Math.floor(startedAt / 1000);
    const yesterdayStartSec = this.floorToDayGmt8(nowSec) - SECONDS_PER_DAY;
    const yesterdayEndSec = yesterdayStartSec + SECONDS_PER_DAY;
    const tag = `🗓️ Daily catch-all (GMT+8) [${yesterdayStartSec},${yesterdayEndSec})`;
    this.logger.log(`${tag} start`);

    try {
      await svc.refreshDailyDerivedFromHourly({
        fromSec: yesterdayStartSec,
        toSec: yesterdayEndSec,
      });
      const ms = Date.now() - startedAt;
      this.logger.log(`${tag} ✅ done in ${ms}ms`);
    } catch (err: unknown) {
      const ms = Date.now() - startedAt;
      const { message, stack } = this.describeError(err);
      this.logger.error(`${tag} ❌ failed after ${ms}ms: ${message}`, stack);
    } finally {
      this.runningDaily = false;
    }
  }

  /** Returns the full hour `[fromSec, toSec)` that ended at or before `nowSec`. */
  private prevHourWindow(nowSec: number): { fromSec: number; toSec: number } {
    const endSec = this.floorToHour(nowSec);
    return { fromSec: endSec - SECONDS_PER_HOUR, toSec: endSec };
  }

  /** Rounds a UTC timestamp (seconds) down to the start of its hour. */
  private floorToHour(sec: number): number {
    return Math.floor(sec / SECONDS_PER_HOUR) * SECONDS_PER_HOUR;
  }

  /**
   * Start of the GMT+8 day containing `secUtc`, expressed in UTC seconds:
   * shift by +8h, floor to a day boundary, shift back.
   */
  private floorToDayGmt8(secUtc: number): number {
    const shift = 8 * SECONDS_PER_HOUR;
    const shifted = secUtc + shift;
    return Math.floor(shifted / SECONDS_PER_DAY) * SECONDS_PER_DAY - shift;
  }

  /** Type guard: does the injected service expose `rerunRange`? */
  private hasRerunRange(
    svc: StatsAggregationService,
  ): svc is StatsAggregationService & RerunRangeCapable {
    return this.hasMethod(svc, 'rerunRange');
  }

  /** Type guard: does the injected service expose `refreshDailyDerivedFromHourly`? */
  private hasDailyRefresh(
    svc: StatsAggregationService,
  ): svc is StatsAggregationService & DailyRefreshCapable {
    return this.hasMethod(svc, 'refreshDailyDerivedFromHourly');
  }

  /** True when `target` is a non-null object whose `name` member is a function. */
  private hasMethod(target: unknown, name: string): boolean {
    return (
      typeof target === 'object' &&
      target !== null &&
      typeof (target as Record<string, unknown>)[name] === 'function'
    );
  }

  /**
   * Reads a config value as a trimmed string.
   * Non-string values (e.g. a boolean from a custom config loader) are
   * stringified instead of crashing on `.trim()`; missing values yield
   * `undefined`.
   */
  private readSetting(key: string): string | undefined {
    const raw = this.configService.get<unknown>(key);
    return raw === undefined || raw === null ? undefined : String(raw).trim();
  }

  /**
   * Parses `raw` as a base-10 integer (same leniency as `Number.parseInt`)
   * and returns it only when it is finite and within `[min, max]`.
   */
  private parseBoundedInt(
    raw: string,
    min: number,
    max: number,
  ): number | undefined {
    const parsed = Number.parseInt(raw, 10);
    return Number.isFinite(parsed) && parsed >= min && parsed <= max
      ? parsed
      : undefined;
  }

  /**
   * Extracts a loggable message and optional stack from an arbitrary thrown
   * value without assuming it is an `Error`.
   */
  private describeError(err: unknown): { message: string; stack?: string } {
    const record =
      typeof err === 'object' && err !== null
        ? (err as Record<string, unknown>)
        : undefined;
    const rawMessage = record?.message;
    const rawStack = record?.stack;
    return {
      message: rawMessage ? String(rawMessage) : String(err),
      stack: typeof rawStack === 'string' ? rawStack : undefined,
    };
  }
}