stats-aggregation.scheduler.ts 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
  2. import { Cron, CronExpression } from '@nestjs/schedule';
  3. import { ConfigService } from '@nestjs/config';
  4. import { StatsAggregationService } from './stats-aggregation.service';
  5. type AggregationResult = {
  6. successCount: number;
  7. totalProcessed: number;
  8. errorCount: number;
  9. };
  10. type RerunRangeCapable = {
  11. rerunRange: (args: {
  12. fromSec: number;
  13. toSec: number;
  14. dryRun?: boolean;
  15. }) => Promise<unknown>;
  16. };
  17. type DailyRefreshCapable = {
  18. refreshDailyDerivedFromHourly: (args: {
  19. fromSec: number;
  20. toSec: number;
  21. }) => Promise<void>;
  22. };
  23. @Injectable()
  24. export class StatsAggregationScheduler implements OnModuleInit {
  25. private readonly logger = new Logger(StatsAggregationScheduler.name);
  26. private enabled = true;
  27. private windowDays?: number;
  28. // guardrails: avoid overlapping runs + spam
  29. private runningHourly = false;
  30. private runningDaily = false;
  31. // run a little after boundary so late events can land
  32. private runDelaySec = 90;
  33. constructor(
  34. private readonly configService: ConfigService,
  35. private readonly statsAggregation: StatsAggregationService,
  36. ) {}
  37. onModuleInit(): void {
  38. const enabledRaw = this.configService
  39. .get<string>('STATS_AGGREGATION_ENABLED')
  40. ?.trim()
  41. .toLowerCase();
  42. this.enabled = !['false', '0', 'off', 'no'].includes(enabledRaw ?? '');
  43. const daysRaw = this.configService
  44. .get<string>('STATS_AGGREGATION_WINDOW_DAYS')
  45. ?.trim();
  46. if (daysRaw) {
  47. const parsed = Number.parseInt(daysRaw, 10);
  48. if (Number.isFinite(parsed) && parsed > 0) {
  49. this.windowDays = parsed;
  50. } else {
  51. this.logger.warn(
  52. `Invalid STATS_AGGREGATION_WINDOW_DAYS="${daysRaw}" (expected positive integer). Falling back to "all time".`,
  53. );
  54. this.windowDays = undefined;
  55. }
  56. }
  57. const delayRaw = this.configService
  58. .get<string>('STATS_AGGREGATION_RUN_DELAY_SEC')
  59. ?.trim();
  60. if (delayRaw) {
  61. const parsed = Number.parseInt(delayRaw, 10);
  62. if (Number.isFinite(parsed) && parsed >= 0 && parsed <= 900) {
  63. this.runDelaySec = parsed;
  64. } else {
  65. this.logger.warn(
  66. `Invalid STATS_AGGREGATION_RUN_DELAY_SEC="${delayRaw}" (expected 0..900). Using default ${this.runDelaySec}s.`,
  67. );
  68. }
  69. }
  70. if (this.enabled) {
  71. this.logger.log(
  72. `📊 Stats aggregation scheduler enabled (windowDays=${
  73. this.windowDays ?? 'all time'
  74. }, hourly=${CronExpression.EVERY_HOUR}, daily=00:10, delaySec=${this.runDelaySec})`,
  75. );
  76. } else {
  77. this.logger.warn(
  78. `📊 Stats aggregation scheduler DISABLED (STATS_AGGREGATION_ENABLED="${enabledRaw ?? ''}")`,
  79. );
  80. }
  81. }
  82. /**
  83. * Hourly is the unit of truth.
  84. * Runs slightly after the hour, but still processes the previous full hour window.
  85. */
  86. @Cron(CronExpression.EVERY_HOUR, { name: 'stats-aggregation-hourly' })
  87. async runHourly(): Promise<void> {
  88. if (!this.enabled) return;
  89. if (this.runningHourly) {
  90. this.logger.warn(
  91. '⏭️ Skip hourly aggregation: previous run still in progress',
  92. );
  93. return;
  94. }
  95. this.runningHourly = true;
  96. const startedAt = Date.now();
  97. const nowSec = Math.floor(Date.now() / 1000);
  98. const effectiveNowSec = nowSec - this.runDelaySec;
  99. const { fromSec, toSec } = this.prevHourWindow(effectiveNowSec);
  100. const tag = `⏰ Hourly aggregation [${fromSec},${toSec})`;
  101. this.logger.log(`${tag} start`);
  102. try {
  103. // Preferred path: exact hour rerun (idempotent)
  104. if (this.hasRerunRange(this.statsAggregation)) {
  105. await this.statsAggregation.rerunRange({
  106. fromSec,
  107. toSec,
  108. dryRun: false,
  109. });
  110. this.logger.log(`${tag} rerunRange done`);
  111. } else {
  112. // Backward-compatible fallback
  113. const result = (await this.statsAggregation.aggregateAdsStats({
  114. windowDays: this.windowDays,
  115. })) as AggregationResult;
  116. this.logger.warn(
  117. `${tag} used fallback aggregateAdsStats(windowDays=${
  118. this.windowDays ?? 'all time'
  119. }) (${result.successCount}/${result.totalProcessed} updated, ${result.errorCount} errors)`,
  120. );
  121. }
  122. // Optional daily refresh triggered after hourly
  123. if (this.hasDailyRefresh(this.statsAggregation)) {
  124. await this.statsAggregation.refreshDailyDerivedFromHourly({
  125. fromSec,
  126. toSec,
  127. });
  128. this.logger.log(`${tag} daily refresh done`);
  129. }
  130. const ms = Date.now() - startedAt;
  131. this.logger.log(`${tag} ✅ done in ${ms}ms`);
  132. } catch (err: any) {
  133. const ms = Date.now() - startedAt;
  134. this.logger.error(
  135. `${tag} ❌ failed after ${ms}ms: ${err?.message || String(err)}`,
  136. err?.stack,
  137. );
  138. } finally {
  139. this.runningHourly = false;
  140. }
  141. }
  142. /**
  143. * Daily refresh is derived; safe to run once a day as a "catch-all".
  144. * 00:10 (GMT+8 business) — note: Cron uses server TZ; adjust if needed.
  145. */
  146. @Cron('10 0 * * *', { name: 'stats-aggregation-daily' })
  147. async runDailyCatchAll(): Promise<void> {
  148. if (!this.enabled) return;
  149. if (this.runningDaily) {
  150. this.logger.warn(
  151. '⏭️ Skip daily catch-all: previous run still in progress',
  152. );
  153. return;
  154. }
  155. if (!this.hasDailyRefresh(this.statsAggregation)) {
  156. // No-op if your service doesn’t support daily refresh yet
  157. this.logger.warn(
  158. 'ℹ️ Daily catch-all skipped: refreshDailyDerivedFromHourly not implemented',
  159. );
  160. return;
  161. }
  162. this.runningDaily = true;
  163. const startedAt = Date.now();
  164. // Refresh “yesterday” (GMT+8 aligned) to catch late events
  165. const nowSec = Math.floor(Date.now() / 1000);
  166. const yesterdayStartSec = this.floorToDayGmt8(nowSec) - 86400;
  167. const yesterdayEndSec = yesterdayStartSec + 86400;
  168. const tag = `🗓️ Daily catch-all (GMT+8) [${yesterdayStartSec},${yesterdayEndSec})`;
  169. this.logger.log(`${tag} start`);
  170. try {
  171. await this.statsAggregation.refreshDailyDerivedFromHourly({
  172. fromSec: yesterdayStartSec,
  173. toSec: yesterdayEndSec,
  174. });
  175. const ms = Date.now() - startedAt;
  176. this.logger.log(`${tag} ✅ done in ${ms}ms`);
  177. } catch (err: any) {
  178. const ms = Date.now() - startedAt;
  179. this.logger.error(
  180. `${tag} ❌ failed after ${ms}ms: ${err?.message || String(err)}`,
  181. err?.stack,
  182. );
  183. } finally {
  184. this.runningDaily = false;
  185. }
  186. }
  187. private prevHourWindow(nowSec: number): { fromSec: number; toSec: number } {
  188. const endSec = this.floorToHour(nowSec);
  189. return { fromSec: endSec - 3600, toSec: endSec };
  190. }
  191. private floorToHour(sec: number): number {
  192. return sec - (sec % 3600);
  193. }
  194. /**
  195. * GMT+8 day bucket start in UTC seconds:
  196. * shift +8h, floor to day, shift back
  197. */
  198. private floorToDayGmt8(secUtc: number): number {
  199. const shift = 8 * 3600;
  200. const shifted = secUtc + shift;
  201. const dayStartShifted = shifted - (shifted % 86400);
  202. return dayStartShifted - shift;
  203. }
  204. private hasRerunRange(
  205. svc: StatsAggregationService,
  206. ): svc is StatsAggregationService & RerunRangeCapable {
  207. return typeof (svc as any)?.rerunRange === 'function';
  208. }
  209. private hasDailyRefresh(
  210. svc: StatsAggregationService,
  211. ): svc is StatsAggregationService & DailyRefreshCapable {
  212. return typeof (svc as any)?.refreshDailyDerivedFromHourly === 'function';
  213. }
  214. }