Bläddra i källkod

feat(cache): add management API base URL and implement ad cache warmup service

Dave 3 månader sedan
förälder
incheckning
30e50d1567

+ 2 - 0
.env.app

@@ -29,6 +29,8 @@ APP_HOST=0.0.0.0
 APP_PORT=3301
 APP_CORS_ORIGIN=*
 
+# Mgnt API for cache sync notifications
+MGNT_API_BASE_URL=http://localhost:3300
 
 # JWT
 JWT_SECRET=047df8aaa3d17dc1173c5a9a3052ba66c2b0bd96937147eb643319a0c90d132f

+ 2 - 0
.env.app.dev

@@ -28,6 +28,8 @@ APP_HOST=0.0.0.0
 APP_PORT=3301
 APP_CORS_ORIGIN=*
 
+# Mgnt API for cache sync notifications
+MGNT_API_BASE_URL=http://localhost:3300
 
 # JWT
 JWT_SECRET=047df8aaa3d17dc1173c5a9a3052ba66c2b0bd96937147eb643319a0c90d132f

+ 2 - 0
.env.app.test

@@ -29,6 +29,8 @@ APP_HOST=0.0.0.0
 APP_PORT=3301
 APP_CORS_ORIGIN=*
 
+# Mgnt API for cache sync notifications
+MGNT_API_BASE_URL=http://localhost:3300
 
 # JWT
 JWT_SECRET=047df8aaa3d17dc1173c5a9a3052ba66c2b0bd96937147eb643319a0c90d132f

+ 2 - 0
apps/box-app-api/src/feature/ads/ad.module.ts

@@ -1,5 +1,6 @@
 // apps/box-app-api/src/feature/ads/ad.module.ts
 import { Module } from '@nestjs/common';
+import { HttpModule } from '@nestjs/axios';
 import { RedisModule } from '@box/db/redis/redis.module';
 import { SharedModule } from '@box/db/shared.module';
 import { AdService } from './ad.service';
@@ -9,6 +10,7 @@ import { RabbitmqModule } from '../../rabbitmq/rabbitmq.module';
 
 @Module({
   imports: [
+    HttpModule, // 👈 for notifying mgnt-api cache sync
     RedisModule, // 👈 make RedisService available here
     SharedModule, // 👈 make MongoPrismaService available here
     AuthModule, // 👈 make JwtAuthGuard available here

+ 110 - 9
apps/box-app-api/src/feature/ads/ad.service.ts

@@ -1,5 +1,7 @@
 // apps/box-app-api/src/feature/ads/ad.service.ts
 import { Injectable, Logger, NotFoundException } from '@nestjs/common';
+import { ConfigService } from '@nestjs/config';
+import { HttpService } from '@nestjs/axios';
 import { RedisService } from '@box/db/redis/redis.service';
 import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
 import { CacheKeys } from '@box/common/cache/cache-keys';
@@ -58,12 +60,20 @@ export interface GetAdForPlacementParams {
 @Injectable()
 export class AdService {
   private readonly logger = new Logger(AdService.name);
+  private readonly mgntApiBaseUrl: string;
 
   constructor(
     private readonly redis: RedisService,
     private readonly mongoPrisma: MongoPrismaService,
     private readonly rabbitmqPublisher: RabbitmqPublisherService,
-  ) {}
+    private readonly configService: ConfigService,
+    private readonly httpService: HttpService,
+  ) {
+    // Get mgnt-api base URL for cache rebuild notifications
+    this.mgntApiBaseUrl =
+      this.configService.get<string>('MGNT_API_BASE_URL') ||
+      'http://localhost:3300';
+  }
 
   /**
    * Core method for app-api:
@@ -364,9 +374,9 @@ export class AdService {
   }
 
   /**
-   * Get an ad by ID and validate it's enabled and within date range.
-   * Returns the ad with its relationships (channel, adsModule) loaded.
-   * Returns null if ad is not found, disabled, or outside date range.
+   * Get an ad by ID from Redis cache with MongoDB fallback.
+   * Returns the cached ad data if found in Redis, otherwise queries MongoDB.
+   * Note: Cache validation (status, date range) is handled during cache rebuild or query.
    */
   async getAdByIdValidated(adsId: string): Promise<{
     id: string;
@@ -377,9 +387,31 @@ export class AdService {
     advertiser: string;
     title: string;
   } | null> {
-    const now = BigInt(Date.now());
+    const adKey = CacheKeys.appAdById(adsId);
 
     try {
+      // Try Redis cache first
+      const cachedAd = await this.redis.getJson<CachedAd | null>(adKey);
+
+      if (cachedAd) {
+        // Cache hit - return cached data
+        return {
+          id: cachedAd.id,
+          channelId: cachedAd.channelId ?? '',
+          adsModuleId: cachedAd.adsModuleId ?? '',
+          adType: cachedAd.adType ?? '',
+          adsUrl: cachedAd.adsUrl,
+          advertiser: cachedAd.advertiser ?? '',
+          title: cachedAd.title ?? '',
+        };
+      }
+
+      // Cache miss - fallback to MongoDB
+      this.logger.debug(
+        `Ad cache miss: adsId=${adsId}, key=${adKey}, falling back to MongoDB`,
+      );
+
+      const now = BigInt(Date.now());
       const ad = await this.mongoPrisma.ads.findUnique({
         where: { id: adsId },
         include: {
@@ -389,7 +421,7 @@ export class AdService {
       });
 
       if (!ad) {
-        this.logger.debug(`Ad not found: adsId=${adsId}`);
+        this.logger.debug(`Ad not found in MongoDB: adsId=${adsId}`);
         return null;
       }
 
@@ -411,18 +443,48 @@ export class AdService {
         return null;
       }
 
+      // Cache the ad for future requests (fire-and-forget)
+      const cacheData: CachedAd = {
+        id: ad.id,
+        channelId: ad.channelId,
+        adsModuleId: ad.adsModuleId,
+        advertiser: ad.advertiser,
+        title: ad.title,
+        adsContent: ad.adsContent,
+        adsCoverImg: ad.adsCoverImg,
+        adsUrl: ad.adsUrl,
+        adType: ad.adsModule.adType,
+      };
+
+      // Warm cache in Redis (fire-and-forget)
+      this.redis.setJson(adKey, cacheData, 300).catch((err) => {
+        this.logger.warn(
+          `Failed to warm Redis cache for adsId=${adsId}: ${err instanceof Error ? err.message : String(err)}`,
+        );
+      });
+
+      // Also notify mgnt-api to persist cache rebuild for durability (fire-and-forget)
+      // This ensures the ad remains cached even if Redis is cleared
+      this.notifyCacheSyncForAdRefresh(ad.id, ad.adsModule.adType).catch(
+        (err) => {
+          this.logger.debug(
+            `Failed to notify mgnt-api for cache rebuild: ${err instanceof Error ? err.message : String(err)}`,
+          );
+        },
+      );
+
       return {
         id: ad.id,
         channelId: ad.channelId,
         adsModuleId: ad.adsModuleId,
         adType: ad.adsModule.adType,
         adsUrl: ad.adsUrl,
-        advertiser: ad.advertiser,
-        title: ad.title,
+        advertiser: ad.advertiser ?? '',
+        title: ad.title ?? '',
       };
     } catch (err) {
       this.logger.error(
-        `Error fetching ad by ID: adsId=${adsId}`,
+        `Error fetching ad: adsId=${adsId}, key=${adKey}`,
         err instanceof Error ? err.stack : String(err),
       );
       return null;
@@ -581,4 +643,43 @@ export class AdService {
       `Initiated stats.ad.impression publish for adId=${body.adId}, uid=${uid}`,
     );
   }
+
+  /**
+   * Notify mgnt-api to schedule a cache rebuild for a specific ad.
+   * This is called when an ad is loaded from MongoDB (cache miss),
+   * so that mgnt-api can persist the cache rebuild for durability.
+   * Uses fire-and-forget with timeout to avoid blocking requests.
+   *
+   * @param adId - The ad ID to refresh
+   * @param adType - The ad type for pool rebuild
+   */
+  private async notifyCacheSyncForAdRefresh(
+    adId: string,
+    adType: string,
+  ): Promise<void> {
+    try {
+      const url = `${this.mgntApiBaseUrl}/mgnt-debug/cache-sync/ad/refresh`;
+      const timeout = 5000; // 5 second timeout
+      const controller = new AbortController();
+      const timeoutId = setTimeout(() => controller.abort(), timeout);
+
+      const response = await this.httpService.axiosRef.post(
+        url,
+        { adId, adType },
+        { signal: controller.signal },
+      );
+
+      clearTimeout(timeoutId);
+
+      this.logger.debug(
+        `Successfully notified mgnt-api to rebuild cache for adId=${adId}`,
+      );
+    } catch (err) {
+      // Log but don't fail - this is best-effort
+      const errorMsg = err instanceof Error ? err.message : String(err);
+      this.logger.debug(
+        `Failed to notify mgnt-api for cache rebuild (adId=${adId}): ${errorMsg}`,
+      );
+    }
+  }
 }

+ 3 - 1
apps/box-mgnt-api/src/cache-sync/cache-sync.service.ts

@@ -568,9 +568,11 @@ export class CacheSyncService {
 
     // Decide what to store in per-ad cache.
     // You can store the full ad document or a trimmed DTO.
-    // For now, lets store the full ad + its module's adType.
+    // For now, let's store the full ad + its module's adType.
     const cachedAd = {
       id: ad.id,
+      channelId: ad.channelId,
+      adsModuleId: ad.adsModuleId,
       advertiser: ad.advertiser,
       title: ad.title,
       adsContent: ad.adsContent ?? null,

+ 2 - 1
apps/box-mgnt-api/src/mgnt-backend/feature/video-media/video-media.module.ts

@@ -2,10 +2,11 @@ import { Module } from '@nestjs/common';
 import { VideoMediaService } from './video-media.service';
 import { VideoMediaController } from './video-media.controller';
 import { PrismaModule } from '@box/db/prisma/prisma.module';
+import { CacheSyncModule } from '../../../cache-sync/cache-sync.module';
 import { ImageUploadModule } from '../image-upload/image-upload.module';
 
 @Module({
-  imports: [PrismaModule, ImageUploadModule],
+  imports: [PrismaModule, CacheSyncModule, ImageUploadModule],
   controllers: [VideoMediaController],
   providers: [VideoMediaService],
   exports: [VideoMediaService],

+ 60 - 0
apps/box-mgnt-api/src/mgnt-backend/feature/video-media/video-media.service.ts

@@ -5,6 +5,8 @@ import {
 } from '@nestjs/common';
 import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
 import { ImageUploadService } from '../image-upload/image-upload.service';
+import { CacheSyncService } from '../../../cache-sync/cache-sync.service';
+import { CacheEntityType } from '../../../cache-sync/cache-sync.types';
 import {
   VideoMediaListQueryDto,
   UpdateVideoMediaManageDto,
@@ -17,6 +19,7 @@ export class VideoMediaService {
   constructor(
     private readonly prisma: MongoPrismaService,
     private readonly imageUploadService: ImageUploadService,
+    private readonly cacheSyncService: CacheSyncService,
   ) {}
 
   async findAll(query: VideoMediaListQueryDto): Promise<any> {
@@ -191,6 +194,22 @@ export class VideoMediaService {
       data: updateData,
     });
 
+    // Refresh category video lists cache if category changed or affected
+    if (video.categoryId) {
+      await this.cacheSyncService.scheduleAction({
+        entityType: CacheEntityType.VIDEO_LIST,
+        operation: 'REFRESH',
+        payload: { categoryId: video.categoryId },
+      } as any);
+    }
+    if (updateData.categoryId && updateData.categoryId !== video.categoryId) {
+      await this.cacheSyncService.scheduleAction({
+        entityType: CacheEntityType.VIDEO_LIST,
+        operation: 'REFRESH',
+        payload: { categoryId: updateData.categoryId },
+      } as any);
+    }
+
     return this.findOne(id);
   }
 
@@ -219,6 +238,15 @@ export class VideoMediaService {
       },
     });
 
+    // Refresh category video lists cache if video has a category
+    if (video.categoryId) {
+      await this.cacheSyncService.scheduleAction({
+        entityType: CacheEntityType.VIDEO_LIST,
+        operation: 'REFRESH',
+        payload: { categoryId: video.categoryId },
+      } as any);
+    }
+
     return {
       id,
       listStatus: dto.listStatus,
@@ -238,6 +266,12 @@ export class VideoMediaService {
     const editedAt = BigInt(Date.now());
     const updatedAt = new Date();
 
+    // Fetch affected videos to get their categories for cache refresh
+    const affectedVideos = await this.prisma.videoMedia.findMany({
+      where: { id: { in: dto.ids } },
+      select: { categoryId: true },
+    });
+
     const result = await this.prisma.videoMedia.updateMany({
       where: { id: { in: dto.ids } },
       data: {
@@ -247,6 +281,23 @@ export class VideoMediaService {
       },
     });
 
+    // Refresh cache for all affected categories (fire-and-forget)
+    const affectedCategoryIds = [
+      ...new Set(
+        affectedVideos
+          .map((v) => v.categoryId)
+          .filter((cid) => cid !== null && cid !== undefined) as string[],
+      ),
+    ];
+
+    for (const categoryId of affectedCategoryIds) {
+      await this.cacheSyncService.scheduleAction({
+        entityType: CacheEntityType.VIDEO_LIST,
+        operation: 'REFRESH',
+        payload: { categoryId },
+      } as any);
+    }
+
     return {
       affected: result.count,
       listStatus: dto.listStatus,
@@ -287,6 +338,15 @@ export class VideoMediaService {
       },
     });
 
+    // Refresh category video lists cache if video has a category
+    if (video.categoryId) {
+      await this.cacheSyncService.scheduleAction({
+        entityType: CacheEntityType.VIDEO_LIST,
+        operation: 'REFRESH',
+        payload: { categoryId: video.categoryId },
+      } as any);
+    }
+
     return {
       id: updated.id,
       coverImg: updated.coverImg,

+ 169 - 0
libs/core/src/ad/ad-cache-warmup.service.ts

@@ -0,0 +1,169 @@
+import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
+import { CacheKeys } from '@box/common/cache/cache-keys';
+import { RedisService } from '@box/db/redis/redis.service';
+import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
+import type { AdType } from '@box/common/ads/ad-types';
+
+interface CachedAd {
+  id: string;
+  channelId: string;
+  adsModuleId: string;
+  advertiser: string;
+  title: string;
+  adsContent: string | null;
+  adsCoverImg: string | null;
+  adsUrl: string | null;
+  adType: string;
+}
+
+/**
+ * AdCacheWarmupService
+ *
+ * Responsible for warming up individual ad caches (app:ad:by-id:${adId})
+ * on application startup or on-demand.
+ *
+ * This complements AdPoolWarmupService which handles ad pools.
+ */
+@Injectable()
+export class AdCacheWarmupService implements OnModuleInit {
+  private readonly logger = new Logger(AdCacheWarmupService.name);
+  private readonly AD_CACHE_TTL = 300; // 5 minutes
+
+  constructor(
+    private readonly redis: RedisService,
+    private readonly mongoPrisma: MongoPrismaService,
+  ) {}
+
+  async onModuleInit(): Promise<void> {
+    try {
+      this.logger.log('Individual ad cache warmup starting...');
+      await this.warmupAllAdCaches();
+    } catch (err) {
+      this.logger.error(
+        'Individual ad cache warmup encountered an error but will not block startup',
+        err instanceof Error ? err.stack : String(err),
+      );
+    }
+  }
+
+  /**
+   * Warm up all active ads' individual caches.
+   * Only caches ads that are enabled and within their date range.
+   */
+  async warmupAllAdCaches(): Promise<void> {
+    const startTime = Date.now();
+    const now = BigInt(Date.now());
+
+    try {
+      // Fetch all active ads
+      const ads = await this.mongoPrisma.ads.findMany({
+        where: {
+          status: 1, // enabled
+          startDt: { lte: now },
+          OR: [{ expiryDt: BigInt(0) }, { expiryDt: { gte: now } }],
+        },
+        include: {
+          adsModule: { select: { adType: true } },
+        },
+      });
+
+      this.logger.log(`Found ${ads.length} active ads to cache`);
+
+      let successCount = 0;
+      let errorCount = 0;
+
+      // Cache each ad individually
+      for (const ad of ads) {
+        try {
+          await this.cacheAd(ad.id, {
+            id: ad.id,
+            channelId: ad.channelId,
+            adsModuleId: ad.adsModuleId,
+            advertiser: ad.advertiser,
+            title: ad.title,
+            adsContent: ad.adsContent ?? null,
+            adsCoverImg: ad.adsCoverImg ?? null,
+            adsUrl: ad.adsUrl ?? null,
+            adType: ad.adsModule.adType,
+          });
+          successCount++;
+        } catch (err) {
+          errorCount++;
+          this.logger.warn(
+            `Failed to cache ad ${ad.id}: ${err instanceof Error ? err.message : String(err)}`,
+          );
+        }
+      }
+
+      const duration = Date.now() - startTime;
+      this.logger.log(
+        `Ad cache warmup completed: ${successCount} cached, ${errorCount} errors, ${duration}ms`,
+      );
+    } catch (err) {
+      this.logger.error(
+        'Ad cache warmup failed',
+        err instanceof Error ? err.stack : String(err),
+      );
+      throw err;
+    }
+  }
+
+  /**
+   * Cache a single ad by ID
+   */
+  private async cacheAd(adId: string, cachedAd: CachedAd): Promise<void> {
+    const key = CacheKeys.appAdById(adId);
+    await this.redis.setJson(key, cachedAd, this.AD_CACHE_TTL);
+  }
+
+  /**
+   * Warm up a single ad cache (used for on-demand refresh)
+   */
+  async warmupSingleAd(adId: string): Promise<void> {
+    const now = BigInt(Date.now());
+
+    const ad = await this.mongoPrisma.ads.findUnique({
+      where: { id: adId },
+      include: {
+        adsModule: { select: { adType: true } },
+      },
+    });
+
+    if (!ad) {
+      // Ad doesn't exist - remove from cache
+      const key = CacheKeys.appAdById(adId);
+      await this.redis.del(key);
+      this.logger.debug(`Ad ${adId} not found, removed from cache`);
+      return;
+    }
+
+    // Check if ad is active
+    const isActive =
+      ad.status === 1 &&
+      ad.startDt <= now &&
+      (ad.expiryDt === BigInt(0) || ad.expiryDt >= now);
+
+    if (!isActive) {
+      // Ad is not active - remove from cache
+      const key = CacheKeys.appAdById(adId);
+      await this.redis.del(key);
+      this.logger.debug(`Ad ${adId} is not active, removed from cache`);
+      return;
+    }
+
+    // Cache the ad
+    await this.cacheAd(adId, {
+      id: ad.id,
+      channelId: ad.channelId,
+      adsModuleId: ad.adsModuleId,
+      advertiser: ad.advertiser,
+      title: ad.title,
+      adsContent: ad.adsContent ?? null,
+      adsCoverImg: ad.adsCoverImg ?? null,
+      adsUrl: ad.adsUrl ?? null,
+      adType: ad.adsModule.adType,
+    });
+
+    this.logger.debug(`Cached ad ${adId}`);
+  }
+}

+ 4 - 1
libs/core/src/cache/cache-manager.module.ts

@@ -3,6 +3,7 @@ import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
 import { AdPoolService } from '../ad/ad-pool.service';
 import { AdPoolBuilder } from '../ad/ad-pool.builder';
 import { AdPoolWarmupService } from '../ad/ad-pool-warmup.service';
+import { AdCacheWarmupService } from '../ad/ad-cache-warmup.service';
 import { CategoryCacheService } from './category/category-cache.service';
 import { CategoryCacheBuilder } from './category/category-cache.builder';
 import { CategoryWarmupService } from './category/category-warmup.service';
@@ -21,10 +22,11 @@ import { VideoListCacheBuilder } from './video/list/video-list-cache.builder';
     // Shared data sources
     MongoPrismaService,
 
-    // Ad pools
+    // Ad pools & individual ad caches
     AdPoolService,
     AdPoolBuilder,
     AdPoolWarmupService,
+    AdCacheWarmupService,
 
     // Categories
     CategoryCacheService,
@@ -49,6 +51,7 @@ import { VideoListCacheBuilder } from './video/list/video-list-cache.builder';
   exports: [
     AdPoolService,
     AdPoolBuilder,
+    AdCacheWarmupService,
     CategoryCacheService,
     CategoryCacheBuilder,
     TagCacheService,