瀏覽代碼

feat(stats-reporting): add StatsAdsCacheService for managing ad statistics cache

Dave 1 月之前
父節點
當前提交
eb016321c6
共有 2 個文件被更改,包括 98 次插入1 次删除
  1. 2 1
      apps/box-mgnt-api/src/app.module.ts
  2. 96 0
      apps/box-mgnt-api/src/cache-sync/stats-ads-cache.service.ts

+ 2 - 1
apps/box-mgnt-api/src/app.module.ts

@@ -18,6 +18,7 @@ import { CacheSyncModule } from './cache-sync/cache-sync.module';
 import { DevVideoCacheModule } from './dev/dev-video-cache.module';
 import { RedisModule } from '@box/db/redis/redis.module';
 import { CoreModule } from '@box/core/core.module';
+import { StatsAdsCacheService } from './cache-sync/stats-ads-cache.service';
 import path from 'path';
 
 /**
@@ -61,7 +62,7 @@ const isProd = process.env.NODE_ENV === 'production';
       http: process.env.NODE_ENV === 'development',
     }),
   ],
-  providers: [],
+  providers: [StatsAdsCacheService],
 })
 export class AppModule implements OnModuleInit {
   onModuleInit() {

+ 96 - 0
apps/box-mgnt-api/src/cache-sync/stats-ads-cache.service.ts

@@ -0,0 +1,96 @@
+import { Inject, Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { Redis } from 'ioredis';
+import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
+import { REDIS_CLIENT } from '@box/db/redis/redis.constants';
+
+@Injectable()
+export class StatsAdsCacheService implements OnModuleInit {
+  private readonly logger = new Logger(StatsAdsCacheService.name);
+  private readonly cacheKey = 'box:stats:ads';
+  private readonly placeholderField = '__stats_ads_empty__';
+
+  constructor(
+    private readonly configService: ConfigService,
+    private readonly mongoPrisma: MongoPrismaService,
+    @Inject(REDIS_CLIENT)
+    private readonly redisClient: Redis,
+  ) {}
+
+  async onModuleInit(): Promise<void> {
+    try {
+      await this.rebuildCache();
+    } catch (error) {
+      this.logger.error(
+        'Failed to build box:stats:ads cache on startup',
+        error instanceof Error ? error.stack : String(error),
+      );
+      throw error;
+    }
+  }
+
+  private getTtlSeconds(): number {
+    const raw = this.configService.get<string>('STATS_ADS_HASH_TTL_SECONDS');
+    if (!raw) return 0;
+    const parsed = Number(raw);
+    if (!Number.isFinite(parsed) || parsed <= 0) {
+      return 0;
+    }
+    return Math.floor(parsed);
+  }
+
+  private async rebuildCache(): Promise<void> {
+    // This hash exists so the read-only box-stats-api pipeline can resolve adType/adId without any writes or rebuild triggers.
+    const ads = await this.mongoPrisma.ads.findMany({
+      select: {
+        id: true,
+        adId: true,
+        adType: true,
+      },
+    });
+
+    const ttlSeconds = this.getTtlSeconds();
+    const payloads: Record<string, string> = {};
+
+    for (const ad of ads) {
+      payloads[ad.id] = JSON.stringify({
+        adType: ad.adType,
+        adId: ad.adId,
+      });
+    }
+
+    const tempKey = `${this.cacheKey}:tmp:${Date.now()}:${Math.random()
+      .toString(36)
+      .slice(2)}`;
+
+    // StatsEventsConsumer is intentionally read-only (see docs/stats-consumer-enrichment-flow.md) so we own every write to this key.
+    const pipeline = this.redisClient.pipeline();
+    pipeline.del(tempKey);
+
+    if (Object.keys(payloads).length > 0) {
+      pipeline.hset(tempKey, payloads);
+    } else {
+      pipeline.hset(tempKey, this.placeholderField, '1');
+    }
+
+    pipeline.rename(tempKey, this.cacheKey);
+    pipeline.hdel(this.cacheKey, this.placeholderField);
+
+    if (ttlSeconds > 0) {
+      pipeline.expire(this.cacheKey, ttlSeconds);
+    }
+
+    const results = await pipeline.exec();
+    const firstError = results.find(([error]) => error instanceof Error) as
+      | [Error, unknown]
+      | undefined;
+
+    if (firstError) {
+      throw firstError[0];
+    }
+
+    this.logger.log(
+      `Rebuilt ${this.cacheKey} hash with ${ads.length} entries (ttl=${ttlSeconds})`,
+    );
+  }
+}