|
@@ -34,9 +34,10 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
// guardrails: avoid overlapping runs + spam
|
|
// guardrails: avoid overlapping runs + spam
|
|
|
private runningHourly = false;
|
|
private runningHourly = false;
|
|
|
private runningDaily = false;
|
|
private runningDaily = false;
|
|
|
|
|
+ private lastHourlyWindowToSec?: number;
|
|
|
|
|
|
|
|
// run a little after boundary so late events can land
|
|
// run a little after boundary so late events can land
|
|
|
- private runDelaySec = 90;
|
|
|
|
|
|
|
+ private runDelaySec = 300;
|
|
|
|
|
|
|
|
constructor(
|
|
constructor(
|
|
|
private readonly configService: ConfigService,
|
|
private readonly configService: ConfigService,
|
|
@@ -86,7 +87,7 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
this.logger.log(
|
|
this.logger.log(
|
|
|
`📊 Stats aggregation scheduler enabled (windowDays=${
|
|
`📊 Stats aggregation scheduler enabled (windowDays=${
|
|
|
this.windowDays ?? 'all time'
|
|
this.windowDays ?? 'all time'
|
|
|
- }, hourly=${CronExpression.EVERY_HOUR}, daily=00:10, delaySec=${this.runDelaySec})`,
|
|
|
|
|
|
|
+ }, hourly=${CronExpression.EVERY_5_MINUTES}, daily=00:10, delaySec=${this.runDelaySec})`,
|
|
|
);
|
|
);
|
|
|
} else {
|
|
} else {
|
|
|
this.logger.warn(
|
|
this.logger.warn(
|
|
@@ -99,7 +100,7 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
* Hourly is the unit of truth.
|
|
* Hourly is the unit of truth.
|
|
|
* Runs slightly after the hour, but still processes the previous full hour window.
|
|
* Runs slightly after the hour, but still processes the previous full hour window.
|
|
|
*/
|
|
*/
|
|
|
- @Cron(CronExpression.EVERY_HOUR, { name: 'stats-aggregation-hourly' })
|
|
|
|
|
|
|
+ @Cron(CronExpression.EVERY_5_MINUTES, { name: 'stats-aggregation-hourly' })
|
|
|
async runHourly(): Promise<void> {
|
|
async runHourly(): Promise<void> {
|
|
|
if (!this.enabled) return;
|
|
if (!this.enabled) return;
|
|
|
|
|
|
|
@@ -114,9 +115,26 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
const startedAt = Date.now();
|
|
const startedAt = Date.now();
|
|
|
|
|
|
|
|
const nowSec = Math.floor(Date.now() / 1000);
|
|
const nowSec = Math.floor(Date.now() / 1000);
|
|
|
- const effectiveNowSec = nowSec - this.runDelaySec;
|
|
|
|
|
|
|
+ const currentHourStartSec = this.floorToHourGmt8(nowSec);
|
|
|
|
|
+ const gateOpenSec = currentHourStartSec + this.runDelaySec;
|
|
|
|
|
|
|
|
- const toSec = this.floorToHour(effectiveNowSec);
|
|
|
|
|
|
|
+ if (nowSec < gateOpenSec) {
|
|
|
|
|
+ this.logger.debug(
|
|
|
|
|
+ `⏰ Hourly aggregation gate not yet open (now=${nowSec}, openAt=${gateOpenSec}, delaySec=${this.runDelaySec}).`,
|
|
|
|
|
+ );
|
|
|
|
|
+ this.runningHourly = false;
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (this.lastHourlyWindowToSec === currentHourStartSec) {
|
|
|
|
|
+ this.logger.debug(
|
|
|
|
|
+ `⏰ Hourly aggregation already processed window [${currentHourStartSec - 3600},${currentHourStartSec}) this hour.`,
|
|
|
|
|
+ );
|
|
|
|
|
+ this.runningHourly = false;
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ const toSec = currentHourStartSec;
|
|
|
const fromSec = toSec - 3600;
|
|
const fromSec = toSec - 3600;
|
|
|
const tag = `⏰ Hourly aggregation [${fromSec},${toSec})`;
|
|
const tag = `⏰ Hourly aggregation [${fromSec},${toSec})`;
|
|
|
|
|
|
|
@@ -153,6 +171,8 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
this.logger.log(`${tag} daily refresh done`);
|
|
this.logger.log(`${tag} daily refresh done`);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ this.lastHourlyWindowToSec = toSec;
|
|
|
|
|
+
|
|
|
const ms = Date.now() - startedAt;
|
|
const ms = Date.now() - startedAt;
|
|
|
this.logger.log(`${tag} ✅ done in ${ms}ms`);
|
|
this.logger.log(`${tag} ✅ done in ${ms}ms`);
|
|
|
} catch (err: any) {
|
|
} catch (err: any) {
|
|
@@ -221,6 +241,13 @@ export class StatsAggregationScheduler implements OnModuleInit {
|
|
|
return sec - (sec % 3600);
|
|
return sec - (sec % 3600);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ private floorToHourGmt8(secUtc: number): number {
|
|
|
|
|
+ const shift = 8 * 3600;
|
|
|
|
|
+ const shifted = secUtc + shift;
|
|
|
|
|
+ const hourStartShifted = shifted - (shifted % 3600);
|
|
|
|
|
+ return hourStartShifted - shift;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* GMT+8 day bucket start in UTC seconds:
|
|
* GMT+8 day bucket start in UTC seconds:
|
|
|
* shift +8h, floor to day, shift back
|
|
* shift +8h, floor to day, shift back
|