import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { ConfigService } from '@nestjs/config';
import { StatsAggregationService } from './stats-aggregation.service';

/** Counters read from an aggregation result when logging its summary line. */
type AggregationResult = {
  successCount: number;
  totalProcessed: number;
  errorCount: number;
};

/**
 * Triggers the ads and video stats aggregations every five minutes.
 *
 * Behaviour is driven by two configuration keys read in `onModuleInit`:
 * - `STATS_AGGREGATION_ENABLED`: the scheduler stays enabled unless the value
 *   is one of `false`, `0`, `off`, `no` (case-insensitive).
 * - `STATS_AGGREGATION_WINDOW_DAYS`: positive integer forwarded to the
 *   aggregation service as `windowDays`; when unset or invalid, `undefined`
 *   is forwarded (logged as "all time").
 *
 * Each job skips a tick while its own previous run is still in progress.
 */
@Injectable()
export class StatsAggregationScheduler implements OnModuleInit {
  private readonly logger = new Logger(StatsAggregationScheduler.name);

  private enabled = true;
  private windowDays?: number;

  // guardrails: avoid overlapping runs + spam
  private runningAds = false;
  private runningVideo = false;

  constructor(
    private readonly configService: ConfigService,
    private readonly statsAggregation: StatsAggregationService,
  ) {}

  /**
   * Reads the scheduler configuration and logs the resulting state.
   *
   * Evaluated on module init rather than in the constructor (safer for
   * startup ordering).
   */
  onModuleInit(): void {
    const enabledRaw = this.readSetting(
      'STATS_AGGREGATION_ENABLED',
    )?.toLowerCase();

    // default: enabled (unless explicitly "false" or "0" etc.)
    this.enabled = !['false', '0', 'off', 'no'].includes(enabledRaw ?? '');

    const daysRaw = this.readSetting('STATS_AGGREGATION_WINDOW_DAYS');
    if (daysRaw) {
      // Require digits only: Number.parseInt would silently accept "7abc"
      // (-> 7) or "1.5" (-> 1), which is not a "positive integer".
      const parsed = /^\+?\d+$/.test(daysRaw) ? Number(daysRaw) : Number.NaN;

      if (Number.isSafeInteger(parsed) && parsed > 0) {
        this.windowDays = parsed;
      } else {
        this.logger.warn(
          `Invalid STATS_AGGREGATION_WINDOW_DAYS="${daysRaw}" (expected positive integer). Falling back to "all time".`,
        );
        this.windowDays = undefined;
      }
    }

    if (this.enabled) {
      this.logger.log(
        `📊 Stats aggregation scheduler enabled (windowDays=${
          this.windowDays ?? 'all time'
        }, interval=${CronExpression.EVERY_5_MINUTES})`,
      );
    } else {
      this.logger.warn(
        `📊 Stats aggregation scheduler DISABLED (STATS_AGGREGATION_ENABLED="${
          enabledRaw ?? ''
        }")`,
      );
    }
  }

  /**
   * Cron entry point for the ads aggregation.
   *
   * Does nothing when the scheduler is disabled, and skips the tick (with a
   * warning) when the previous ads run has not finished. Failures of the
   * aggregation are logged, not rethrown.
   */
  @Cron(CronExpression.EVERY_5_MINUTES, { name: 'stats-aggregation-ads' })
  async runAdsAggregation(): Promise<void> {
    if (!this.enabled) return;

    if (this.runningAds) {
      this.logger.warn(
        '⏭️ Skip ads aggregation: previous run still in progress',
      );
      return;
    }

    this.runningAds = true;
    try {
      await this.runAndReport(
        'Ads',
        async () =>
          (await this.statsAggregation.aggregateAdsStats({
            windowDays: this.windowDays,
          })) as AggregationResult,
      );
    } finally {
      // Always release the guard, otherwise every later tick would be skipped.
      this.runningAds = false;
    }
  }

  /**
   * Cron entry point for the video aggregation.
   *
   * Does nothing when the scheduler is disabled, and skips the tick (with a
   * warning) when the previous video run has not finished. Failures of the
   * aggregation are logged, not rethrown.
   */
  @Cron(CronExpression.EVERY_5_MINUTES, { name: 'stats-aggregation-video' })
  async runVideoAggregation(): Promise<void> {
    if (!this.enabled) return;

    if (this.runningVideo) {
      this.logger.warn(
        '⏭️ Skip video aggregation: previous run still in progress',
      );
      return;
    }

    this.runningVideo = true;
    try {
      await this.runAndReport(
        'Video',
        async () =>
          (await this.statsAggregation.aggregateVideoStats({
            windowDays: this.windowDays,
          })) as AggregationResult,
      );
    } finally {
      // Always release the guard, otherwise every later tick would be skipped.
      this.runningVideo = false;
    }
  }

  /**
   * Reads a configuration value and normalises it to trimmed text.
   *
   * Booleans and numbers are converted to their string form so that a config
   * source yielding `false` or `0` does not crash on `.trim()`; any other
   * non-string value is treated as unset.
   *
   * @param key Configuration key to look up.
   * @returns The trimmed textual value, or `undefined` when unset.
   */
  private readSetting(key: string): string | undefined {
    const raw = this.configService.get<unknown>(key);

    if (typeof raw === 'string') return raw.trim();
    if (typeof raw === 'number' || typeof raw === 'boolean') return String(raw);
    return undefined;
  }

  /**
   * Runs one aggregation, logging its start, its summary and any failure.
   *
   * Errors thrown by `aggregate` are logged with their stack and swallowed so
   * a failing run does not surface as an unhandled rejection of the cron job.
   *
   * @param label Job name used in log messages.
   * @param aggregate Callback performing the actual aggregation.
   */
  private async runAndReport(
    label: 'Ads' | 'Video',
    aggregate: () => Promise<AggregationResult>,
  ): Promise<void> {
    const start = Date.now();

    try {
      this.logger.log(
        `⏰ ${label} aggregation start (windowDays=${this.windowDays ?? 'all time'})`,
      );

      const result = await aggregate();
      const ms = Date.now() - start;

      this.logger.log(
        `✅ ${label} aggregation done in ${ms}ms (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
      );
    } catch (err: unknown) {
      const ms = Date.now() - start;

      this.logger.error(
        `❌ ${label} aggregation failed after ${ms}ms: ${
          err instanceof Error ? err.message : String(err)
        }`,
        err instanceof Error ? err.stack : undefined,
      );
    }
  }
}