| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252 |
- import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
- import { Cron, CronExpression } from '@nestjs/schedule';
- import { ConfigService } from '@nestjs/config';
- import { StatsAggregationService } from './stats-aggregation.service';
/**
 * Summary counters expected back from the fallback `aggregateAdsStats` call.
 * The scheduler reports them in its log line as
 * `successCount/totalProcessed updated, errorCount errors`.
 */
interface AggregationResult {
  /** Count logged as the "updated" numerator. */
  successCount: number;
  /** Count logged as the "updated" denominator. */
  totalProcessed: number;
  /** Count logged as "errors". */
  errorCount: number;
}
/**
 * Optional capability: a service that can re-run aggregation for an explicit
 * `[fromSec, toSec)` range given in seconds. Used purely as a structural
 * type for runtime feature detection.
 */
interface RerunRangeCapable {
  /**
   * Re-runs aggregation for the given range.
   * `dryRun` may be omitted; the resolved value is not inspected by callers
   * that only know this shape.
   */
  rerunRange: (args: {
    fromSec: number;
    toSec: number;
    dryRun?: boolean;
  }) => Promise<unknown>;
}
/**
 * Optional capability: a service that can rebuild daily data derived from
 * hourly data for a `[fromSec, toSec)` range given in seconds. Used purely as
 * a structural type for runtime feature detection.
 */
interface DailyRefreshCapable {
  /** Refreshes the derived daily data covering the given range. */
  refreshDailyDerivedFromHourly: (args: {
    fromSec: number;
    toSec: number;
  }) => Promise<void>;
}
/**
 * Cron-driven scheduler for stats aggregation.
 *
 * - `runHourly` re-aggregates one full hour window (or falls back to the
 *   legacy `aggregateAdsStats` call when the service has no `rerunRange`).
 * - `runDailyCatchAll` refreshes the previous GMT+8 day of derived daily data.
 *
 * Configuration (read once in `onModuleInit`):
 * - `STATS_AGGREGATION_ENABLED`: `false` / `0` / `off` / `no` (case-insensitive)
 *   disables both jobs; anything else, including unset, leaves them enabled.
 * - `STATS_AGGREGATION_WINDOW_DAYS`: positive integer used only by the
 *   fallback path; unset or invalid means "all time".
 * - `STATS_AGGREGATION_RUN_DELAY_SEC`: integer in 0..900, default 90.
 */
@Injectable()
export class StatsAggregationScheduler implements OnModuleInit {
  private static readonly HOUR_SEC = 3600;
  private static readonly DAY_SEC = 86400;
  private static readonly GMT8_OFFSET_SEC = 8 * 3600;
  private static readonly MAX_RUN_DELAY_SEC = 900;
  private static readonly DISABLED_VALUES: readonly string[] = [
    'false',
    '0',
    'off',
    'no',
  ];

  private readonly logger = new Logger(StatsAggregationScheduler.name);

  private enabled = true;
  private windowDays?: number;

  // guardrails: avoid overlapping runs + spam
  private runningHourly = false;
  private runningDaily = false;

  // run a little after boundary so late events can land
  private runDelaySec = 90;

  constructor(
    private readonly configService: ConfigService,
    private readonly statsAggregation: StatsAggregationService,
  ) {}

  /**
   * Reads and validates the scheduler configuration, then logs the effective
   * settings. Invalid numeric values are reported with a warning and replaced
   * by their fallback ("all time" for the window, the current default for the
   * delay); they never abort start-up.
   */
  onModuleInit(): void {
    const enabledRaw = this.readConfigString(
      'STATS_AGGREGATION_ENABLED',
    )?.toLowerCase();
    this.enabled = !StatsAggregationScheduler.DISABLED_VALUES.includes(
      enabledRaw ?? '',
    );

    const daysRaw = this.readConfigString('STATS_AGGREGATION_WINDOW_DAYS');
    if (daysRaw) {
      const days = this.parseBoundedInt(daysRaw, 1, Number.MAX_SAFE_INTEGER);
      if (days === undefined) {
        this.logger.warn(
          `Invalid STATS_AGGREGATION_WINDOW_DAYS="${daysRaw}" (expected positive integer). Falling back to "all time".`,
        );
      }
      this.windowDays = days;
    }

    const delayRaw = this.readConfigString('STATS_AGGREGATION_RUN_DELAY_SEC');
    if (delayRaw) {
      const delay = this.parseBoundedInt(
        delayRaw,
        0,
        StatsAggregationScheduler.MAX_RUN_DELAY_SEC,
      );
      if (delay === undefined) {
        this.logger.warn(
          `Invalid STATS_AGGREGATION_RUN_DELAY_SEC="${delayRaw}" (expected 0..900). Using default ${this.runDelaySec}s.`,
        );
      } else {
        this.runDelaySec = delay;
      }
    }

    if (this.enabled) {
      this.logger.log(
        `📊 Stats aggregation scheduler enabled (windowDays=${
          this.windowDays ?? 'all time'
        }, hourly=${CronExpression.EVERY_HOUR}, daily=00:10, delaySec=${this.runDelaySec})`,
      );
    } else {
      this.logger.warn(
        `📊 Stats aggregation scheduler DISABLED (STATS_AGGREGATION_ENABLED="${enabledRaw ?? ''}")`,
      );
    }
  }

  /**
   * Hourly is the unit of truth.
   *
   * Processes the last full hour window that ended at or before
   * `now - runDelaySec`. Never rejects because of an aggregation failure:
   * errors are logged and swallowed so the next tick can retry.
   *
   * NOTE(review): the window is anchored at `now - runDelaySec`. If this
   * handler fires exactly on the hour (which CronExpression.EVERY_HOUR
   * presumably does — confirm), any positive delay moves the anchor into the
   * preceding hour, so the processed window ends one hour before the boundary
   * that just passed. Confirm that this extra hour of lag is intended.
   */
  @Cron(CronExpression.EVERY_HOUR, { name: 'stats-aggregation-hourly' })
  async runHourly(): Promise<void> {
    if (!this.enabled) return;

    if (this.runningHourly) {
      this.logger.warn(
        '⏭️ Skip hourly aggregation: previous run still in progress',
      );
      return;
    }

    const startedAt = Date.now();
    const nowSec = Math.floor(startedAt / 1000);
    const { fromSec, toSec } = this.prevHourWindow(nowSec - this.runDelaySec);
    const tag = `⏰ Hourly aggregation [${fromSec},${toSec})`;

    this.runningHourly = true;
    try {
      // Logged inside the try so a throwing logger cannot leave the
      // re-entrancy flag stuck at true.
      this.logger.log(`${tag} start`);

      // Preferred path: exact hour rerun (idempotent)
      if (this.hasRerunRange(this.statsAggregation)) {
        await this.statsAggregation.rerunRange({
          fromSec,
          toSec,
          dryRun: false,
        });
        this.logger.log(`${tag} rerunRange done`);
      } else {
        // Backward-compatible fallback
        const result = (await this.statsAggregation.aggregateAdsStats({
          windowDays: this.windowDays,
        })) as AggregationResult;
        this.logger.warn(
          `${tag} used fallback aggregateAdsStats(windowDays=${
            this.windowDays ?? 'all time'
          }) (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
        );
      }

      // Optional daily refresh triggered after hourly
      if (this.hasDailyRefresh(this.statsAggregation)) {
        await this.statsAggregation.refreshDailyDerivedFromHourly({
          fromSec,
          toSec,
        });
        this.logger.log(`${tag} daily refresh done`);
      }

      this.logger.log(`${tag} ✅ done in ${Date.now() - startedAt}ms`);
    } catch (err: unknown) {
      this.logFailure(tag, startedAt, err);
    } finally {
      this.runningHourly = false;
    }
  }

  /**
   * Daily refresh is derived; safe to run once a day as a "catch-all".
   * 00:10 (GMT+8 business) — note: Cron uses server TZ; adjust if needed.
   *
   * Refreshes the full previous GMT+8 day. A no-op (with a warning) when the
   * service does not implement `refreshDailyDerivedFromHourly`. Failures are
   * logged and swallowed.
   */
  @Cron('10 0 * * *', { name: 'stats-aggregation-daily' })
  async runDailyCatchAll(): Promise<void> {
    if (!this.enabled) return;

    if (this.runningDaily) {
      this.logger.warn(
        '⏭️ Skip daily catch-all: previous run still in progress',
      );
      return;
    }

    if (!this.hasDailyRefresh(this.statsAggregation)) {
      // No-op if your service doesn’t support daily refresh yet
      this.logger.warn(
        'ℹ️ Daily catch-all skipped: refreshDailyDerivedFromHourly not implemented',
      );
      return;
    }

    const startedAt = Date.now();

    // Refresh “yesterday” (GMT+8 aligned) to catch late events
    const nowSec = Math.floor(startedAt / 1000);
    const yesterdayStartSec =
      this.floorToDayGmt8(nowSec) - StatsAggregationScheduler.DAY_SEC;
    const yesterdayEndSec =
      yesterdayStartSec + StatsAggregationScheduler.DAY_SEC;
    const tag = `🗓️ Daily catch-all (GMT+8) [${yesterdayStartSec},${yesterdayEndSec})`;

    this.runningDaily = true;
    try {
      // Logged inside the try so a throwing logger cannot leave the
      // re-entrancy flag stuck at true.
      this.logger.log(`${tag} start`);

      await this.statsAggregation.refreshDailyDerivedFromHourly({
        fromSec: yesterdayStartSec,
        toSec: yesterdayEndSec,
      });

      this.logger.log(`${tag} ✅ done in ${Date.now() - startedAt}ms`);
    } catch (err: unknown) {
      this.logFailure(tag, startedAt, err);
    } finally {
      this.runningDaily = false;
    }
  }

  /**
   * Reads a config value as a trimmed string.
   * `ConfigService.get<T>` does not validate the runtime type, so values
   * that arrive as numbers or booleans are coerced instead of crashing on
   * `.trim()`. Returns `undefined` when the key is unset.
   */
  private readConfigString(key: string): string | undefined {
    const raw = this.configService.get<unknown>(key);
    if (raw === undefined || raw === null) return undefined;
    return String(raw).trim();
  }

  /**
   * Strictly parses a base-10 non-negative integer within `[min, max]`.
   * Unlike `Number.parseInt`, input with any non-digit character
   * (e.g. "30d", "1.5", "-1") is rejected. Returns `undefined` when invalid.
   */
  private parseBoundedInt(
    raw: string,
    min: number,
    max: number,
  ): number | undefined {
    if (!/^\d+$/.test(raw)) return undefined;
    const value = Number(raw);
    const inRange = Number.isSafeInteger(value) && value >= min && value <= max;
    return inRange ? value : undefined;
  }

  /** Logs a failed run with its elapsed time, message and stack (if any). */
  private logFailure(tag: string, startedAt: number, err: unknown): void {
    const ms = Date.now() - startedAt;
    const { message, stack } = this.describeError(err);
    this.logger.error(`${tag} ❌ failed after ${ms}ms: ${message}`, stack);
  }

  /**
   * Extracts a printable message and optional stack from an arbitrary thrown
   * value. Falls back to `String(err)` when there is no usable `message`.
   */
  private describeError(err: unknown): { message: string; stack?: string } {
    if (typeof err === 'object' && err !== null) {
      const { message, stack } = err as { message?: unknown; stack?: unknown };
      return {
        message: typeof message === 'string' && message ? message : String(err),
        stack: typeof stack === 'string' ? stack : undefined,
      };
    }
    return { message: String(err) };
  }

  /** Returns the full hour window `[fromSec, toSec)` that ended at or before `nowSec`. */
  private prevHourWindow(nowSec: number): { fromSec: number; toSec: number } {
    const endSec = this.floorToHour(nowSec);
    return { fromSec: endSec - StatsAggregationScheduler.HOUR_SEC, toSec: endSec };
  }

  /** Rounds a second count down to a multiple of 3600 (for non-negative input). */
  private floorToHour(sec: number): number {
    return sec - (sec % StatsAggregationScheduler.HOUR_SEC);
  }

  /**
   * GMT+8 day bucket start in UTC seconds:
   * shift +8h, floor to day, shift back
   */
  private floorToDayGmt8(secUtc: number): number {
    const shift = StatsAggregationScheduler.GMT8_OFFSET_SEC;
    const shifted = secUtc + shift;
    const dayStartShifted = shifted - (shifted % StatsAggregationScheduler.DAY_SEC);
    return dayStartShifted - shift;
  }

  /** Runtime feature check: does the service expose a `rerunRange` function? */
  private hasRerunRange(
    svc: StatsAggregationService,
  ): svc is StatsAggregationService & RerunRangeCapable {
    return this.hasMethod(svc, 'rerunRange');
  }

  /** Runtime feature check: does the service expose `refreshDailyDerivedFromHourly`? */
  private hasDailyRefresh(
    svc: StatsAggregationService,
  ): svc is StatsAggregationService & DailyRefreshCapable {
    return this.hasMethod(svc, 'refreshDailyDerivedFromHourly');
  }

  /** True when `target` is a non-null object whose `name` member is a function. */
  private hasMethod(target: unknown, name: string): boolean {
    return (
      typeof target === 'object' &&
      target !== null &&
      typeof (target as Record<string, unknown>)[name] === 'function'
    );
  }
}
|