Bläddra i källkod

feat: integrate CacheSyncModule into Ads, Category, and Channel features

- Added CacheSyncModule to AdsModule, CategoryModule, and ChannelModule for cache synchronization.
- Updated AdsService, CategoryService, and ChannelService to utilize CacheSyncService for auto-scheduling cache refreshes upon create, update, and delete operations.
- Introduced ad pool configuration and types in common library for better ad management.
- Enhanced RedisService with pipeline and atomic swap methods for efficient cache operations.
- Updated Prisma schema to include AdType enum for better type safety and clarity in ad management.
Dave 4 månader sedan
förälder
incheckning
73d7723d4f

+ 2 - 2
.env.app

@@ -4,10 +4,10 @@ APP_ENV=test
 # Prisma Config
 # MONGO_URL="mongodb://boxuser:dwR%3D%29whu2Ze@localhost:27017/box_admin?authSource=admin"
 # Dave local
-# MONGO_URL="mongodb://admin:ZXcv%21%21996@localhost:27017/box_admin?authSource=admin&replicaSet=rs0"
+MONGO_URL="mongodb://admin:ZXcv%21%21996@localhost:27017/box_admin?authSource=admin&replicaSet=rs0"
 
 # office dev env
-MONGO_URL="mongodb://msAdmin:Fl1%2A29MJe%26jLvj@192.168.0.100:27017/box_admin?authSource=admin&replicaSet=rs0"
+# MONGO_URL="mongodb://msAdmin:Fl1%2A29MJe%26jLvj@192.168.0.100:27017/box_admin?authSource=admin&replicaSet=rs0"
 
 # Redis Config
 REDIS_HOST=127.0.0.1

+ 4 - 4
.env.mgnt

@@ -6,12 +6,12 @@ APP_ENV=test
 # MONGO_URL="mongodb://boxuser:dwR%3D%29whu2Ze@localhost:27017/box_admin?authSource=admin"
 
 # dave local
-# MYSQL_URL="mysql://root:123456@localhost:3306/box_admin"
-# MONGO_URL="mongodb://admin:ZXcv%21%21996@localhost:27017/box_admin?authSource=admin"
+MYSQL_URL="mysql://root:123456@localhost:3306/box_admin"
+MONGO_URL="mongodb://admin:ZXcv%21%21996@localhost:27017/box_admin?authSource=admin"
 
 # office dev env
-MYSQL_URL="mysql://root:123456@192.168.0.100:3306/box_admin"
-MONGO_URL="mongodb://msAdmin:Fl1%2A29MJe%26jLvj@192.168.0.100:27017/box_admin?authSource=admin&replicaSet=rs0"
+# MYSQL_URL="mysql://root:123456@192.168.0.100:3306/box_admin"
+# MONGO_URL="mongodb://msAdmin:Fl1%2A29MJe%26jLvj@192.168.0.100:27017/box_admin?authSource=admin&replicaSet=rs0"
 
 # Redis Config
 # REDIS_HOST=127.0.0.1

+ 2 - 0
apps/box-mgnt-api/src/app.module.ts

@@ -1,6 +1,7 @@
 import { Module, OnModuleInit } from '@nestjs/common';
 import { ConfigModule, ConfigService } from '@nestjs/config';
 import { DevtoolsModule } from '@nestjs/devtools-integration';
+import { ScheduleModule } from '@nestjs/schedule';
 import dayjs from 'dayjs';
 import timezone from 'dayjs/plugin/timezone.js';
 import utc from 'dayjs/plugin/utc.js';
@@ -24,6 +25,7 @@ import { RedisModule } from '@box/db/redis/redis.module';
       validate: validateEnvironment,
     }),
     ConfigModule.forFeature(appConfigFactory),
+    ScheduleModule.forRoot(),
 
     RedisModule.forRootAsync({
       imports: [ConfigModule],

+ 14 - 0
apps/box-mgnt-api/src/cache-sync/cache-sync-debug.controller.ts

@@ -85,4 +85,18 @@ export class CacheSyncDebugController {
       message: `Processed up to ${limit} pending actions (see logs).`,
     };
   }
+
+  /**
+   * POST /mgnt-debug/cache-sync/warm
+   * Pre-warm all critical cache keys (channels, categories, ad pools).
+   */
+  @Post('warm')
+  async warmCache() {
+    await this.cacheSyncService.warmCache();
+    this.logger.log('Cache warming completed');
+    return {
+      ok: true,
+      message: 'Cache warming completed (see logs for timing).',
+    };
+  }
 }

+ 236 - 135
apps/box-mgnt-api/src/cache-sync/cache-sync.service.ts

@@ -1,10 +1,18 @@
 // apps/box-mgnt-api/src/cache-sync/cache-sync.service.ts
 import { Injectable, Logger } from '@nestjs/common';
+import { Cron, CronExpression } from '@nestjs/schedule';
 import { Prisma as MysqlPrisma, CacheSyncAction } from '@prisma/mysql/client';
 import { MysqlPrismaService } from '@box/db/prisma/mysql-prisma.service';
 import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
 import { RedisService } from '@box/db/redis/redis.service';
-import { CacheKeys } from '@box/common/cache/cache-keys'; // 👈 new import
+import { CacheKeys } from '@box/common/cache/cache-keys';
+import { ADTYPE_POOLS } from '@box/common/ads/ad-pool-config';
+import type {
+  AdType,
+  AdPoolEntry,
+  AdScene,
+  AdSlot,
+} from '@box/common/ads/ad-types';
 
 import {
   CacheEntityType,
@@ -13,59 +21,26 @@ import {
   CachePayload,
 } from './cache-sync.types';
 
-interface AdPoolPlacement {
-  scene: string;
-  slot: string;
-}
-
-/**
- * Mapping from adType (AdsModule.adType) → one or more pool placements.
- * Each placement becomes a Redis key:
- *   app:adpool:<scene>:<slot>:<adType>
- *
- * Adjust these mappings later if your design changes.
- */
-const ADTYPE_POOLS: Record<string, AdPoolPlacement[]> = {
-  // 启动页广告
-  STARTUP: [{ scene: 'home', slot: 'startup' }],
-
-  // 首页轮播
-  CAROUSEL: [{ scene: 'home', slot: 'carousel' }],
-
-  // 弹窗类(详情页)
-  POPUP_ICON: [{ scene: 'detail', slot: 'popup' }],
-  POPUP_IMAGE: [{ scene: 'detail', slot: 'popup' }],
-  POPUP_OFFICIAL: [{ scene: 'detail', slot: 'popup' }],
-
-  // 瀑布流(首页)
-  WATERFALL_ICON: [{ scene: 'home', slot: 'waterfall' }],
-  WATERFALL_TEXT: [{ scene: 'home', slot: 'waterfall' }],
-  WATERFALL_VIDEO: [{ scene: 'home', slot: 'waterfall' }],
-
-  // 悬浮(全局)
-  FLOATING_BOTTOM: [{ scene: 'global', slot: 'floating_bottom' }],
-  FLOATING_EDGE: [{ scene: 'global', slot: 'floating_edge' }],
-
-  // 顶部 banner
-  BANNER: [{ scene: 'home', slot: 'top' }],
-
-  // 片头(前贴片)
-  PREROLL: [{ scene: 'player', slot: 'preroll' }],
-
-  // 暂停广告
-  PAUSE: [{ scene: 'player', slot: 'pause' }],
-};
-
-interface AdPoolEntry {
-  id: string;
-  // You can later change this to real weight if you add a weight field.
-  weight: number;
-}
+// Cache TTL (seconds)
+const CHANNEL_CACHE_TTL = 900; // 15 min
+const CATEGORY_CACHE_TTL = 900; // 15 min
+const AD_CACHE_TTL = 300; // 5 min (more dynamic)
+const AD_POOL_TTL = 300; // 5 min
 
 @Injectable()
 export class CacheSyncService {
   private readonly logger = new Logger(CacheSyncService.name);
 
+  private readonly actionHandlers: Partial<
+    Record<CacheEntityType, (action: CacheSyncAction) => Promise<void>>
+  > = {
+    [CacheEntityType.CHANNEL]: this.handleChannelAction.bind(this),
+    [CacheEntityType.CATEGORY]: this.handleCategoryAction.bind(this),
+    [CacheEntityType.AD]: this.handleAdAction.bind(this),
+    [CacheEntityType.AD_POOL]: this.handleAdPoolAction.bind(this),
+    [CacheEntityType.VIDEO_LIST]: this.handleVideoListAction.bind(this),
+  };
+
   constructor(
     // MySQL: durable queue of actions
     private readonly mysqlPrisma: MysqlPrismaService,
@@ -94,6 +69,27 @@ export class CacheSyncService {
     const now = this.nowBigInt();
     const nextAttemptAt = delayMs && delayMs > 0 ? now + BigInt(delayMs) : now;
 
+    // Basic dedupe: avoid piling up identical PENDING actions.
+    const existing = await this.mysqlPrisma.cacheSyncAction.findFirst({
+      where: {
+        entityType,
+        operation,
+        status: CacheStatus.PENDING,
+        entityId: entityId != null ? BigInt(entityId) : null,
+        // If payload carries a 'type' or 'adId', match them when present.
+        // Prisma can't query inside JSON easily across drivers; we keep it simple:
+        // only dedupe when entityId matches or when entityId is null (for pools),
+        // and rely on operation granularity.
+      },
+    });
+
+    if (existing) {
+      this.logger.debug(
+        `Deduped CacheSyncAction: entityType=${entityType}, operation=${operation}, entityId=${entityId ?? 'null'}`,
+      );
+      return;
+    }
+
     await this.mysqlPrisma.cacheSyncAction.create({
       data: {
         entityType,
@@ -162,10 +158,17 @@ export class CacheSyncService {
   }
 
   // ─────────────────────────────────────────────
-  // Minimal processing loop (single batch).
-  // Later you can move this into a @Cron job.
+  // Queue processing: cron + manual
   // ─────────────────────────────────────────────
 
+  /**
+   * Cron job: process pending actions every 10 seconds.
+   */
+  @Cron(CronExpression.EVERY_10_SECONDS)
+  async processQueueCron() {
+    await this.processPendingOnce(50);
+  }
+
   async processPendingOnce(limit = 20): Promise<void> {
     const now = this.nowBigInt();
 
@@ -201,6 +204,8 @@ export class CacheSyncService {
         const attempts = action.attempts + 1;
         const maxAttempts = 5;
         const backoffMs = Math.min(60000, 5000 * attempts); // up to 60s
+        const updateTime = this.nowBigInt();
+        const nextAttemptAt = updateTime + BigInt(backoffMs);
 
         await this.mysqlPrisma.cacheSyncAction.update({
           where: { id: action.id },
@@ -211,8 +216,8 @@ export class CacheSyncService {
                 : CacheStatus.PENDING,
             attempts,
             lastError: message,
-            nextAttemptAt: this.nowBigInt() + BigInt(backoffMs),
-            updatedAt: this.nowBigInt(),
+            nextAttemptAt,
+            updatedAt: updateTime,
           },
         });
       }
@@ -223,34 +228,24 @@ export class CacheSyncService {
    * Main dispatcher: decide what to do for each action.
    */
   private async handleSingleAction(action: CacheSyncAction): Promise<void> {
-    switch (action.entityType as CacheEntityType) {
-      case CacheEntityType.CHANNEL:
-        await this.handleChannelAction(action);
-        break;
-
-      case CacheEntityType.CATEGORY:
-        await this.handleCategoryAction(action);
-        break;
+    const handler = this.actionHandlers[action.entityType as CacheEntityType];
 
-      case CacheEntityType.AD:
-        await this.handleAdAction(action);
-        break;
+    if (!handler) {
+      this.logger.warn(
+        `Unknown entityType for CacheSyncAction id=${action.id}: ${action.entityType}`,
+      );
+    } else {
+      await handler(action);
+    }
 
-      case CacheEntityType.AD_POOL:
-        await this.handleAdPoolAction(action);
-        break;
+    await this.markActionSuccess(action);
 
-      case CacheEntityType.VIDEO_LIST:
-        await this.handleVideoListAction(action);
-        break;
-
-      default:
-        this.logger.warn(
-          `Unknown entityType for CacheSyncAction id=${action.id}: ${action.entityType}`,
-        );
-        break;
-    }
+    this.logger.debug(
+      `Processed CacheSyncAction id=${action.id}, entityType=${action.entityType}, operation=${action.operation}`,
+    );
+  }
 
+  private async markActionSuccess(action: CacheSyncAction): Promise<void> {
     await this.mysqlPrisma.cacheSyncAction.update({
       where: { id: action.id },
       data: {
@@ -260,10 +255,6 @@ export class CacheSyncService {
         updatedAt: this.nowBigInt(),
       },
     });
-
-    this.logger.debug(
-      `Processed CacheSyncAction id=${action.id}, entityType=${action.entityType}, operation=${action.operation}`,
-    );
   }
 
   // ─────────────────────────────────────────────
@@ -275,6 +266,20 @@ export class CacheSyncService {
       case CacheOperation.REFRESH_ALL:
         await this.rebuildChannelsAll();
         break;
+      case CacheOperation.INVALIDATE: {
+        const payload = action.payload as CachePayload | null;
+        const id = (payload as any)?.id as string | number | undefined;
+        if (id != null) {
+          await this.redis.del(CacheKeys.appChannelById(id));
+          this.logger.log(
+            `Invalidated channel by id key=${CacheKeys.appChannelById(id)}`,
+          );
+        } else {
+          await this.redis.del(CacheKeys.appChannelAll);
+          this.logger.log(`Invalidated ${CacheKeys.appChannelAll}`);
+        }
+        break;
+      }
       default:
         this.logger.warn(
           `Unsupported CHANNEL operation for action id=${action.id}: ${action.operation}`,
@@ -292,10 +297,34 @@ export class CacheSyncService {
       },
     });
 
-    await this.redis.setJson(CacheKeys.appChannelAll, channels);
+    const sanitized = channels.map((c) => ({
+      id: c.id,
+      name: c.name,
+      landingUrl: c.landingUrl,
+      videoCdn: c.videoCdn ?? null,
+      coverCdn: c.coverCdn ?? null,
+      clientName: c.clientName ?? null,
+      clientNotice: c.clientNotice ?? null,
+      remark: c.remark ?? null,
+      createAt:
+        typeof c.createAt === 'bigint'
+          ? Number(c.createAt)
+          : (c as any).createAt,
+      updateAt:
+        typeof c.updateAt === 'bigint'
+          ? Number(c.updateAt)
+          : (c as any).updateAt,
+    }));
+
+    const start = Date.now();
+    await this.redis.setJson(
+      CacheKeys.appChannelAll,
+      sanitized,
+      CHANNEL_CACHE_TTL,
+    );
 
     this.logger.log(
-      `Rebuilt ${CacheKeys.appChannelAll} with ${channels.length} item(s).`,
+      `Rebuilt ${CacheKeys.appChannelAll} with ${channels.length} item(s), ${Date.now() - start}ms`,
     );
   }
 
@@ -308,6 +337,20 @@ export class CacheSyncService {
       case CacheOperation.REFRESH_ALL:
         await this.rebuildCategoriesAll();
         break;
+      case CacheOperation.INVALIDATE: {
+        const payload = action.payload as CachePayload | null;
+        const id = (payload as any)?.id as string | number | undefined;
+        if (id != null) {
+          await this.redis.del(CacheKeys.appCategoryById(id));
+          this.logger.log(
+            `Invalidated category by id key=${CacheKeys.appCategoryById(id)}`,
+          );
+        } else {
+          await this.redis.del(CacheKeys.appCategoryAll);
+          this.logger.log(`Invalidated ${CacheKeys.appCategoryAll}`);
+        }
+        break;
+      }
       default:
         this.logger.warn(
           `Unsupported CATEGORY operation for action id=${action.id}: ${action.operation}`,
@@ -325,10 +368,32 @@ export class CacheSyncService {
       },
     });
 
-    await this.redis.setJson(CacheKeys.appCategoryAll, categories);
+    const sanitized = categories.map((c) => ({
+      id: c.id,
+      name: c.name,
+      subtitle: c.subtitle ?? null,
+      channelId: c.channelId,
+      seq: c.seq,
+      status: c.status,
+      createAt:
+        typeof c.createAt === 'bigint'
+          ? Number(c.createAt)
+          : (c as any).createAt,
+      updateAt:
+        typeof c.updateAt === 'bigint'
+          ? Number(c.updateAt)
+          : (c as any).updateAt,
+    }));
+
+    const start = Date.now();
+    await this.redis.setJson(
+      CacheKeys.appCategoryAll,
+      sanitized,
+      CATEGORY_CACHE_TTL,
+    );
 
     this.logger.log(
-      `Rebuilt ${CacheKeys.appCategoryAll} with ${categories.length} item(s).`,
+      `Rebuilt ${CacheKeys.appCategoryAll} with ${categories.length} item(s), ${Date.now() - start}ms`,
     );
   }
 
@@ -349,7 +414,18 @@ export class CacheSyncService {
     const adId = payload.adId;
     const adType = payload.type;
 
-    await this.rebuildSingleAdCache(adId, adType);
+    switch (action.operation as CacheOperation) {
+      case CacheOperation.INVALIDATE: {
+        const key = CacheKeys.appAdById(adId);
+        await this.redis.del(key);
+        this.logger.log(`Invalidated per-ad cache key=${key}`);
+        break;
+      }
+      case CacheOperation.REFRESH:
+      default:
+        await this.rebuildSingleAdCache(adId, adType);
+        break;
+    }
 
     this.logger.debug(
       `handleAdAction: rebuilt per-ad cache for adId=${adId}, adType=${adType ?? 'N/A'}, action id=${action.id}`,
@@ -403,24 +479,15 @@ export class CacheSyncService {
     // For now, let’s store the full ad + its module's adType.
     const cachedAd = {
       id: ad.id,
-      channelId: ad.channelId,
-      adsModuleId: ad.adsModuleId,
       advertiser: ad.advertiser,
       title: ad.title,
-      adsContent: ad.adsContent,
-      adsCoverImg: ad.adsCoverImg,
-      adsUrl: ad.adsUrl,
-      startDt: ad.startDt,
-      expiryDt: ad.expiryDt,
-      seq: ad.seq,
-      status: ad.status,
-      createAt: ad.createAt,
-      updateAt: ad.updateAt,
-      // include adType from AdsModule so app-api can know its type
+      adsContent: ad.adsContent ?? null,
+      adsCoverImg: ad.adsCoverImg ?? null,
+      adsUrl: ad.adsUrl ?? null,
       adType: ad.adsModule?.adType ?? adType ?? null,
     };
 
-    await this.redis.setJson(cacheKey, cachedAd);
+    await this.redis.setJson(cacheKey, cachedAd, AD_CACHE_TTL);
 
     this.logger.log(
       `rebuildSingleAdCache: updated per-ad cache for adId=${adId}, key=${cacheKey}`,
@@ -429,7 +496,7 @@ export class CacheSyncService {
 
   private async handleAdPoolAction(action: CacheSyncAction): Promise<void> {
     const payload = action.payload as CachePayload | null;
-    const adType = payload?.type;
+    const adType = payload?.type as AdType | undefined;
 
     if (!adType) {
       this.logger.warn(
@@ -438,7 +505,7 @@ export class CacheSyncService {
       return;
     }
 
-    const placements = ADTYPE_POOLS[adType];
+    const placements = ADTYPE_POOLS[adType] ?? [];
 
     if (!placements || placements.length === 0) {
       this.logger.warn(
@@ -447,64 +514,67 @@ export class CacheSyncService {
       return;
     }
 
-    this.logger.log(
-      `handleAdPoolAction: rebuilding ${placements.length} pool(s) for adType=${adType}, action id=${action.id}`,
-    );
-
-    for (const placement of placements) {
-      await this.rebuildAdPoolForPlacement(
-        adType,
-        placement.scene,
-        placement.slot,
-      );
+    switch (action.operation as CacheOperation) {
+      case CacheOperation.INVALIDATE: {
+        // remove all pools for this adType
+        const pattern = `app:adpool:*:*:${adType}`;
+        const deleted = await this.redis.deleteByPattern(pattern);
+        this.logger.log(
+          `Invalidated ${deleted} pool key(s) for adType=${adType} using pattern=${pattern}`,
+        );
+        break;
+      }
+      case CacheOperation.REBUILD_POOL:
+      default: {
+        this.logger.log(
+          `handleAdPoolAction: rebuilding ${placements.length} pool(s) for adType=${adType}, action id=${action.id}`,
+        );
+        for (const placement of placements) {
+          await this.rebuildAdPoolForPlacement(
+            adType,
+            placement.scene,
+            placement.slot,
+          );
+        }
+        break;
+      }
     }
   }
 
   private async rebuildAdPoolForPlacement(
-    adType: string,
-    scene: string,
-    slot: string,
+    adType: AdType,
+    scene: AdScene,
+    slot: AdSlot,
   ): Promise<void> {
     const now = this.nowBigInt();
 
-    // NOTE:
-    // - status: 1 means enabled
-    // - startDt <= now
-    // - expiryDt == 0 (no expiry) OR expiryDt >= now
-    //
-    // Adjust the expiry logic if your business rule is different.
     const ads = await this.mongoPrisma.ads.findMany({
       where: {
         status: 1,
-        startDt: {
-          lte: now,
-        },
-        OR: [
-          { expiryDt: BigInt(0) }, // "no expiry" if you use 0 as sentinel
-          { expiryDt: { gte: now } },
-        ],
+        startDt: { lte: now },
+        OR: [{ expiryDt: BigInt(0) }, { expiryDt: { gte: now } }],
         adsModule: {
-          adType, // join AdsModule on adType
+          is: { adType },
         },
       },
-      orderBy: {
-        seq: 'asc', // IMPORTANT: you said Ads list must be ordered by seq
-      },
+      orderBy: { seq: 'asc' },
     });
 
     const poolEntries: AdPoolEntry[] = ads.map((ad) => ({
       id: ad.id,
-      // For now use a flat weight.
-      // Later you can map weight from seq or a dedicated weight field.
       weight: 1,
     }));
 
     const key = CacheKeys.appAdPool(scene, slot, adType);
 
-    await this.redis.setJson(key, poolEntries);
+    // Atomic swap to avoid partial-read windows
+    const start = Date.now();
+    await this.redis.atomicSwapJson([
+      { key, value: poolEntries, ttlSeconds: AD_POOL_TTL },
+    ]);
 
     this.logger.log(
-      `Rebuilt ad pool ${key} with ${poolEntries.length} ad(s) for adType=${adType}, scene=${scene}, slot=${slot}.`,
+      `Rebuilt ad pool ${key} with ${poolEntries.length} ad(s) for adType=${adType}, scene=${scene}, slot=${slot}, ${Date.now() - start}ms`,
     );
   }
 
@@ -519,4 +589,35 @@ export class CacheSyncService {
       `handleVideoListAction placeholder, operation=${action.operation}`,
     );
   }
+
+  // ─────────────────────────────────────────────
+  // Cache warming
+  // ─────────────────────────────────────────────
+
+  /**
+   * Pre-warm critical cache keys on startup or on-demand.
+   * Call this from app startup or via admin endpoint.
+   */
+  async warmCache(): Promise<void> {
+    this.logger.log('Cache warming started');
+    const start = Date.now();
+
+    await Promise.all([
+      this.rebuildChannelsAll(),
+      this.rebuildCategoriesAll(),
+      this.warmAdPools(),
+    ]);
+
+    this.logger.log(`Cache warming complete in ${Date.now() - start}ms`);
+  }
+
+  private async warmAdPools(): Promise<void> {
+    const allAdTypes = Object.keys(ADTYPE_POOLS) as AdType[];
+    for (const adType of allAdTypes) {
+      const placements = ADTYPE_POOLS[adType];
+      for (const { scene, slot } of placements) {
+        await this.rebuildAdPoolForPlacement(adType, scene, slot);
+      }
+    }
+  }
 }

+ 2 - 1
apps/box-mgnt-api/src/mgnt-backend/feature/ads/ads.module.ts

@@ -1,10 +1,11 @@
 import { Module } from '@nestjs/common';
 import { PrismaModule } from '@box/db/prisma/prisma.module';
+import { CacheSyncModule } from '../../../cache-sync/cache-sync.module';
 import { AdsService } from './ads.service';
 import { AdsController } from './ads.controller';
 
 @Module({
-  imports: [PrismaModule],
+  imports: [PrismaModule, CacheSyncModule],
   providers: [AdsService],
   controllers: [AdsController],
   exports: [AdsService],

+ 29 - 3
apps/box-mgnt-api/src/mgnt-backend/feature/ads/ads.service.ts

@@ -5,12 +5,16 @@ import {
 } from '@nestjs/common';
 import { PrismaClientKnownRequestError } from '@prisma/client/runtime/library';
 import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
+import { CacheSyncService } from '../../../cache-sync/cache-sync.service';
 import { CreateAdsDto, ListAdsDto, UpdateAdsDto } from './ads.dto';
 import { CommonStatus } from '../common/status.enum';
 
 @Injectable()
 export class AdsService {
-  constructor(private readonly mongoPrismaService: MongoPrismaService) {}
+  constructor(
+    private readonly mongoPrismaService: MongoPrismaService,
+    private readonly cacheSyncService: CacheSyncService,
+  ) {}
 
   /**
    * Current epoch time in milliseconds.
@@ -61,7 +65,7 @@ export class AdsService {
 
     const now = this.now();
 
-    return this.mongoPrismaService.ads.create({
+    const ad = await this.mongoPrismaService.ads.create({
       data: {
         channelId: dto.channelId,
         adsModuleId: dto.adsModuleId,
@@ -79,6 +83,11 @@ export class AdsService {
       },
       include: { channel: true, adsModule: true },
     });
+
+    // Auto-schedule cache refresh (per-ad + pool)
+    await this.cacheSyncService.scheduleAdRefresh(ad.id, ad.adsModule.adType);
+
+    return ad;
   }
 
   async update(dto: UpdateAdsDto) {
@@ -109,11 +118,16 @@ export class AdsService {
     }
 
     try {
-      return await this.mongoPrismaService.ads.update({
+      const ad = await this.mongoPrismaService.ads.update({
         where: { id: dto.id },
         data,
         include: { channel: true, adsModule: true },
       });
+
+      // Auto-schedule cache refresh (per-ad + pool)
+      await this.cacheSyncService.scheduleAdRefresh(ad.id, ad.adsModule.adType);
+
+      return ad;
     } catch (e) {
       if (e instanceof PrismaClientKnownRequestError && e.code === 'P2025') {
         throw new NotFoundException('Ads not found');
@@ -183,7 +197,19 @@ export class AdsService {
 
   async remove(id: string) {
     try {
+      // Fetch ad first to get adType for cache invalidation
+      const ad = await this.mongoPrismaService.ads.findUnique({
+        where: { id },
+        include: { adsModule: true },
+      });
+
       await this.mongoPrismaService.ads.delete({ where: { id } });
+
+      // Auto-schedule cache refresh if ad existed
+      if (ad?.adsModule?.adType) {
+        await this.cacheSyncService.scheduleAdRefresh(id, ad.adsModule.adType);
+      }
+
       return { message: 'Deleted' };
     } catch (e) {
       if (e instanceof PrismaClientKnownRequestError && e.code === 'P2025') {

+ 2 - 1
apps/box-mgnt-api/src/mgnt-backend/feature/category/category.module.ts

@@ -1,10 +1,11 @@
 import { Module } from '@nestjs/common';
 import { PrismaModule } from '@box/db/prisma/prisma.module';
+import { CacheSyncModule } from '../../../cache-sync/cache-sync.module';
 import { CategoryService } from './category.service';
 import { CategoryController } from './category.controller';
 
 @Module({
-  imports: [PrismaModule],
+  imports: [PrismaModule, CacheSyncModule],
   providers: [CategoryService],
   controllers: [CategoryController],
   exports: [CategoryService],

+ 21 - 3
apps/box-mgnt-api/src/mgnt-backend/feature/category/category.service.ts

@@ -1,6 +1,7 @@
 import { Injectable, NotFoundException } from '@nestjs/common';
 import { PrismaClientKnownRequestError } from '@prisma/client/runtime/library';
 import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
+import { CacheSyncService } from '../../../cache-sync/cache-sync.service';
 import {
   CreateCategoryDto,
   ListCategoryDto,
@@ -10,7 +11,10 @@ import { CommonStatus } from '../common/status.enum';
 
 @Injectable()
 export class CategoryService {
-  constructor(private readonly mongoPrismaService: MongoPrismaService) {}
+  constructor(
+    private readonly mongoPrismaService: MongoPrismaService,
+    private readonly cacheSyncService: CacheSyncService,
+  ) {}
 
   /**
    * Current epoch time in milliseconds.
@@ -41,7 +45,7 @@ export class CategoryService {
     await this.assertChannelExists(dto.channelId);
     const now = this.now();
 
-    return this.mongoPrismaService.category.create({
+    const category = await this.mongoPrismaService.category.create({
       data: {
         name: dto.name,
         subtitle: dto.subtitle?.trim() ?? null,
@@ -52,6 +56,11 @@ export class CategoryService {
         updateAt: now,
       },
     });
+
+    // Auto-schedule cache refresh
+    await this.cacheSyncService.scheduleCategoryRefreshAll();
+
+    return category;
   }
 
   async update(dto: UpdateCategoryDto) {
@@ -73,10 +82,15 @@ export class CategoryService {
     }
 
     try {
-      return await this.mongoPrismaService.category.update({
+      const category = await this.mongoPrismaService.category.update({
         where: { id: dto.id },
         data,
       });
+
+      // Auto-schedule cache refresh
+      await this.cacheSyncService.scheduleCategoryRefreshAll();
+
+      return category;
     } catch (e) {
       if (e instanceof PrismaClientKnownRequestError && e.code === 'P2025') {
         throw new NotFoundException('Category not found');
@@ -136,6 +150,10 @@ export class CategoryService {
   async remove(id: string) {
     try {
       await this.mongoPrismaService.category.delete({ where: { id } });
+
+      // Auto-schedule cache refresh
+      await this.cacheSyncService.scheduleCategoryRefreshAll();
+
       return { message: 'Deleted' };
     } catch (e) {
       if (e instanceof PrismaClientKnownRequestError && e.code === 'P2025') {

+ 2 - 1
apps/box-mgnt-api/src/mgnt-backend/feature/channel/channel.module.ts

@@ -1,10 +1,11 @@
 import { Module } from '@nestjs/common';
 import { PrismaModule } from '@box/db/prisma/prisma.module';
+import { CacheSyncModule } from '../../../cache-sync/cache-sync.module';
 import { ChannelService } from './channel.service';
 import { ChannelController } from './channel.controller';
 
 @Module({
-  imports: [PrismaModule],
+  imports: [PrismaModule, CacheSyncModule],
   providers: [ChannelService],
   controllers: [ChannelController],
   exports: [ChannelService],

+ 21 - 3
apps/box-mgnt-api/src/mgnt-backend/feature/channel/channel.service.ts

@@ -1,6 +1,7 @@
 import { Injectable, NotFoundException } from '@nestjs/common';
 import { PrismaClientKnownRequestError } from '@prisma/client/runtime/library';
 import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
+import { CacheSyncService } from '../../../cache-sync/cache-sync.service';
 import {
   CreateChannelDto,
   ListChannelDto,
@@ -9,7 +10,10 @@ import {
 
 @Injectable()
 export class ChannelService {
-  constructor(private readonly mongoPrismaService: MongoPrismaService) {}
+  constructor(
+    private readonly mongoPrismaService: MongoPrismaService,
+    private readonly cacheSyncService: CacheSyncService,
+  ) {}
 
   /**
    * Current epoch time in milliseconds.
@@ -31,7 +35,7 @@ export class ChannelService {
   async create(dto: CreateChannelDto) {
     const now = this.now();
 
-    return this.mongoPrismaService.channel.create({
+    const channel = await this.mongoPrismaService.channel.create({
       data: {
         name: dto.name,
         landingUrl: dto.landingUrl,
@@ -44,13 +48,18 @@ export class ChannelService {
         updateAt: now,
       },
     });
+
+    // Auto-schedule cache refresh
+    await this.cacheSyncService.scheduleChannelRefreshAll();
+
+    return channel;
   }
 
   async update(dto: UpdateChannelDto) {
     const now = this.now();
 
     try {
-      return await this.mongoPrismaService.channel.update({
+      const channel = await this.mongoPrismaService.channel.update({
         where: { id: dto.id },
         data: {
           name: dto.name,
@@ -63,6 +72,11 @@ export class ChannelService {
           updateAt: now,
         },
       });
+
+      // Auto-schedule cache refresh
+      await this.cacheSyncService.scheduleChannelRefreshAll();
+
+      return channel;
     } catch (e) {
       if (e instanceof PrismaClientKnownRequestError && e.code === 'P2025') {
         throw new NotFoundException('Channel not found');
@@ -116,6 +130,10 @@ export class ChannelService {
   async remove(id: string) {
     try {
       await this.mongoPrismaService.channel.delete({ where: { id } });
+
+      // Auto-schedule cache refresh
+      await this.cacheSyncService.scheduleChannelRefreshAll();
+
       return { message: 'Deleted' };
     } catch (e) {
       if (e instanceof PrismaClientKnownRequestError && e.code === 'P2025') {

+ 31 - 0
libs/common/src/ads/ad-pool-config.ts

@@ -0,0 +1,31 @@
+// libs/common/src/ads/ad-pool-config.ts
+import { AdType } from '@prisma/mongo/client';
+import type { AdPoolPlacement } from './ad-types';
+
+export const ADTYPE_POOLS: Record<AdType, AdPoolPlacement[]> = {
+  [AdType.STARTUP]: [{ scene: 'home', slot: 'startup' }],
+
+  [AdType.CAROUSEL]: [{ scene: 'home', slot: 'carousel' }],
+
+  [AdType.POPUP_ICON]: [{ scene: 'home', slot: 'middle' }],
+
+  [AdType.POPUP_IMAGE]: [{ scene: 'home', slot: 'middle' }],
+
+  [AdType.POPUP_OFFICIAL]: [{ scene: 'home', slot: 'middle' }],
+
+  [AdType.WATERFALL_ICON]: [{ scene: 'home', slot: 'waterfall' }],
+
+  [AdType.WATERFALL_TEXT]: [{ scene: 'home', slot: 'waterfall' }],
+
+  [AdType.WATERFALL_VIDEO]: [{ scene: 'home', slot: 'waterfall' }],
+
+  [AdType.FLOATING_BOTTOM]: [{ scene: 'home', slot: 'floating' }],
+
+  [AdType.FLOATING_EDGE]: [{ scene: 'home', slot: 'edge' }],
+
+  [AdType.BANNER]: [{ scene: 'home', slot: 'top' }],
+
+  [AdType.PREROLL]: [{ scene: 'video_detail', slot: 'preroll' }],
+
+  [AdType.PAUSE]: [{ scene: 'video_detail', slot: 'pause_overlay' }],
+};

+ 46 - 0
libs/common/src/ads/ad-types.ts

@@ -0,0 +1,46 @@
+// libs/common/src/ads/ad-types.ts
+import { AdType as PrismaAdType } from '@prisma/mongo/client';
+
+/**
+ * Canonical ad type.
+ * This is exactly the same as the Prisma enum in AdsModule.adType.
+ */
+export type AdType = PrismaAdType;
+
+/**
+ * Ad placement scene (业务场景).
+ * You can extend this as you add more screens.
+ */
+export type AdScene = 'home' | 'video_detail' | 'category' | 'search';
+
+/**
+ * Ad placement slot (页面中的位置).
+ * You can add more as needed, just keep it centralized here.
+ */
+export type AdSlot =
+  | 'startup'
+  | 'top'
+  | 'carousel'
+  | 'middle'
+  | 'bottom'
+  | 'floating'
+  | 'edge'
+  | 'waterfall'
+  | 'preroll'
+  | 'pause_overlay';
+
+/**
+ * One placement where ads of a given AdType can appear.
+ */
+export interface AdPoolPlacement {
+  scene: AdScene;
+  slot: AdSlot;
+}
+
+/**
+ * One entry in the Redis ad pool.
+ */
+export interface AdPoolEntry {
+  id: string; // Ads.id (Mongo ObjectId string)
+  weight: number;
+}

+ 12 - 0
libs/common/src/ads/cache-keys.ts

@@ -0,0 +1,12 @@
+// libs/common/src/ads/cache-keys.ts
+import type { AdType, AdScene, AdSlot } from './ad-types';
+
+export const CacheKeys = {
+  appAdPool(scene: AdScene, slot: AdSlot, adType: AdType): string {
+    return `app:adpool:${scene}:${slot}:${adType}`;
+  },
+
+  appAd(id: string): string {
+    return `app:ad:${id}`;
+  },
+} as const;

+ 69 - 1
libs/db/src/redis/redis.service.ts

@@ -71,7 +71,9 @@ export class RedisService {
     value: T,
     ttlSeconds?: number,
   ): Promise<'OK' | null> {
-    const json = JSON.stringify(value);
+    const json = JSON.stringify(value, (_, v) =>
+      typeof v === 'bigint' ? v.toString() : v,
+    );
     return this.set(key, json, ttlSeconds);
   }
 
@@ -97,4 +99,70 @@ export class RedisService {
     if (!keys.length) return 0;
     return this.del(...keys);
   }
+
+  // ─────────────────────────────────────────────
+  // Pipelines & atomic swap helpers
+  // ─────────────────────────────────────────────
+
+  async pipelineSetJson(
+    entries: Array<{ key: string; value: unknown; ttlSeconds?: number }>,
+  ): Promise<void> {
+    const client = this.ensureClient();
+    if (!entries.length) return;
+    const pipeline = client.pipeline();
+    for (const { key, value, ttlSeconds } of entries) {
+      const json = JSON.stringify(value, (_, v) =>
+        typeof v === 'bigint' ? v.toString() : v,
+      );
+      if (ttlSeconds && ttlSeconds > 0) {
+        pipeline.set(key, json, 'EX', ttlSeconds);
+      } else {
+        pipeline.set(key, json);
+      }
+    }
+    await pipeline.exec();
+  }
+
+  /**
+   * Write to temporary keys and atomically swap them into place with RENAME.
+   * This avoids readers seeing a partially-updated state.
+   */
+  async atomicSwapJson(
+    entries: Array<{ key: string; value: unknown; ttlSeconds?: number }>,
+  ): Promise<void> {
+    const client = this.ensureClient();
+    if (!entries.length) return;
+    const tempEntries = entries.map((e) => ({
+      ...e,
+      tempKey: `${e.key}:tmp:${Date.now()}:${Math.random().toString(36).slice(2)}`,
+    }));
+
+    // 1) Write all temp keys
+    const writePipeline = client.pipeline();
+    for (const { tempKey, value, ttlSeconds } of tempEntries as Array<{
+      tempKey: string;
+      value: unknown;
+      ttlSeconds?: number;
+    }>) {
+      const json = JSON.stringify(value, (_, v) =>
+        typeof v === 'bigint' ? v.toString() : v,
+      );
+      if (ttlSeconds && ttlSeconds > 0) {
+        writePipeline.set(tempKey, json, 'EX', ttlSeconds);
+      } else {
+        writePipeline.set(tempKey, json);
+      }
+    }
+    await writePipeline.exec();
+
+    // 2) Atomically swap temp -> target via rename
+    const renamePipeline = client.pipeline();
+    for (const { key, tempKey } of tempEntries as Array<{
+      key: string;
+      tempKey: string;
+    }>) {
+      renamePipeline.rename(tempKey, key);
+    }
+    await renamePipeline.exec();
+  }
 }

+ 1 - 0
package.json

@@ -44,6 +44,7 @@
     "@nestjs/passport": "^10.0.3",
     "@nestjs/platform-express": "^10.4.20",
     "@nestjs/platform-fastify": "^10.3.8",
+    "@nestjs/schedule": "^6.0.1",
     "@nestjs/swagger": "^7.0.0",
     "@prisma/client": "^5.15.0",
     "axios": "^1.6.8",

+ 35 - 0
pnpm-lock.yaml

@@ -56,6 +56,9 @@ importers:
       '@nestjs/platform-fastify':
         specifier: ^10.3.8
         version: 10.4.20(@fastify/static@6.12.0)(@nestjs/common@10.4.20(class-transformer@0.5.1)(class-validator@0.14.2)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@10.4.20)
+      '@nestjs/schedule':
+        specifier: ^6.0.1
+        version: 6.0.1(@nestjs/common@10.4.20(class-transformer@0.5.1)(class-validator@0.14.2)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@10.4.20)
       '@nestjs/swagger':
         specifier: ^7.0.0
         version: 7.4.2(@fastify/static@6.12.0)(@nestjs/common@10.4.20(class-transformer@0.5.1)(class-validator@0.14.2)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@10.4.20)(class-transformer@0.5.1)(class-validator@0.14.2)(reflect-metadata@0.2.2)
@@ -1271,6 +1274,12 @@ packages:
       '@fastify/view':
         optional: true
 
+  '@nestjs/schedule@6.0.1':
+    resolution: {integrity: sha512-v3yO6cSPAoBSSyH67HWnXHzuhPhSNZhRmLY38JvCt2sqY8sPMOODpcU1D79iUMFf7k16DaMEbL4Mgx61ZhiC8Q==}
+    peerDependencies:
+      '@nestjs/common': ^10.0.0 || ^11.0.0
+      '@nestjs/core': ^10.0.0 || ^11.0.0
+
   '@nestjs/schematics@10.2.3':
     resolution: {integrity: sha512-4e8gxaCk7DhBxVUly2PjYL4xC2ifDFexCqq1/u4TtivLGXotVk0wHdYuPYe1tHTHuR1lsOkRbfOCpkdTnigLVg==}
     peerDependencies:
@@ -1799,6 +1808,9 @@ packages:
   '@types/lodash@4.17.20':
     resolution: {integrity: sha512-H3MHACvFUEiujabxhaI/ImO6gUrd8oOurg7LQtS7mbwIXA/cUqWrvBsaeJ23aZEPk1TAYkurjfMbSELfoCXlGA==}
 
+  '@types/luxon@3.7.1':
+    resolution: {integrity: sha512-H3iskjFIAn5SlJU7OuxUmTEpebK6TKB8rxZShDslBMZJ5u9S//KM1sbdAisiSrqwLQncVjnpi2OK2J51h+4lsg==}
+
   '@types/methods@1.1.4':
     resolution: {integrity: sha512-ymXWVrDiCxTBE3+RIrrP533E70eA+9qu7zdWoHuOmGujkYtzf4HQF96b8nwHLqhuf4ykX61IGRIB38CC6/sImQ==}
 
@@ -2677,6 +2689,10 @@ packages:
   create-require@1.1.1:
     resolution: {integrity: sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==}
 
+  cron@4.3.3:
+    resolution: {integrity: sha512-B/CJj5yL3sjtlun6RtYHvoSB26EmQ2NUmhq9ZiJSyKIM4K/fqfh9aelDFlIayD2YMeFZqWLi9hHV+c+pq2Djkw==}
+    engines: {node: '>=18.x'}
+
   cross-spawn@5.1.0:
     resolution: {integrity: sha512-pTgQJ5KC0d2hcY8eyL1IzlBPYjTkyH72XRZPnLyKus2mBfNjQs3klqbJU2VILqZryAZUt9JOb3h/mWMy23/f5A==}
 
@@ -3877,6 +3893,10 @@ packages:
   lru-cache@5.1.1:
     resolution: {integrity: sha512-KpNARQA3Iwv+jTA0utUVVbrh+Jlrr1Fv0e56GGzAFOXN7dk/FviaDW8LHmK52DlcH4WP2n6gI8vN1aesBFgo9w==}
 
+  luxon@3.7.2:
+    resolution: {integrity: sha512-vtEhXh/gNjI9Yg1u4jX/0YVPMvxzHuGgCm6tC5kZyb08yjGWGnqAjGJvcXbqQR2P3MyMEFnRbpcdFS6PBcLqew==}
+    engines: {node: '>=12'}
+
   magic-string@0.30.8:
     resolution: {integrity: sha512-ISQTe55T2ao7XtlAStud6qwYPZjE4GK1S/BeVPus4jrq6JuOnQ00YKQC581RWhR122W7msZV263KzVeLoqidyQ==}
     engines: {node: '>=12'}
@@ -6755,6 +6775,12 @@ snapshots:
     optionalDependencies:
       '@fastify/static': 6.12.0
 
+  '@nestjs/schedule@6.0.1(@nestjs/common@10.4.20(class-transformer@0.5.1)(class-validator@0.14.2)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/core@10.4.20)':
+    dependencies:
+      '@nestjs/common': 10.4.20(class-transformer@0.5.1)(class-validator@0.14.2)(reflect-metadata@0.2.2)(rxjs@7.8.2)
+      '@nestjs/core': 10.4.20(@nestjs/common@10.4.20(class-transformer@0.5.1)(class-validator@0.14.2)(reflect-metadata@0.2.2)(rxjs@7.8.2))(@nestjs/platform-express@10.4.20)(reflect-metadata@0.2.2)(rxjs@7.8.2)
+      cron: 4.3.3
+
   '@nestjs/schematics@10.2.3(chokidar@3.6.0)(typescript@5.7.2)':
     dependencies:
       '@angular-devkit/core': 17.3.11(chokidar@3.6.0)
@@ -7432,6 +7458,8 @@ snapshots:
 
   '@types/lodash@4.17.20': {}
 
+  '@types/luxon@3.7.1': {}
+
   '@types/methods@1.1.4': {}
 
   '@types/mime@1.3.5': {}
@@ -8394,6 +8422,11 @@ snapshots:
 
   create-require@1.1.1: {}
 
+  cron@4.3.3:
+    dependencies:
+      '@types/luxon': 3.7.1
+      luxon: 3.7.2
+
   cross-spawn@5.1.0:
     dependencies:
       lru-cache: 4.1.5
@@ -9933,6 +9966,8 @@ snapshots:
     dependencies:
       yallist: 3.1.1
 
+  luxon@3.7.2: {}
+
   magic-string@0.30.8:
     dependencies:
       '@jridgewell/sourcemap-codec': 1.5.5

+ 26 - 6
prisma/mongo/schema/ads-module.prisma

@@ -1,11 +1,31 @@
+// Ad types used as Redis keys and AdsModule identifiers
+enum AdType {
+  STARTUP         // 启动页: 启动页(10:21)
+  CAROUSEL        // 轮播: 轮播(2:1)
+
+  POPUP_ICON      // 弹窗-图标: 弹窗-图标(1:1)
+  POPUP_IMAGE     // 弹窗-图片: 弹窗-图片(2:3)
+  POPUP_OFFICIAL  // 弹窗-官方: 弹窗-官方(2:3)
+
+  WATERFALL_ICON  // 瀑布流-图标: 瀑布流-图标(1:1)
+  WATERFALL_TEXT  // 瀑布流-文字: 瀑布流-文字
+  WATERFALL_VIDEO // 瀑布流-视频: 瀑布流-视频(8:5)
+
+  FLOATING_BOTTOM // 悬浮-底部: 悬浮-底部(1:1)
+  FLOATING_EDGE   // 悬浮-边缘: 悬浮-边缘(1:1)
+
+  BANNER          // banner: banner(4:1)
+  PREROLL         // 片头: 片头(8:5)
+  PAUSE           // 暂停: 暂停(2:1)
+}
+
 model AdsModule {
-  id          String    @id @default(auto()) @map("_id") @db.ObjectId
-  adType      String    @unique
-  adsModule   String    @unique     /// 广告模块
-  moduleDesc  String?               /// 模块简介
-  seq         Int       @default(0)
+  id          String   @id @map("_id") @default(auto()) @db.ObjectId
+  adType      AdType   @unique     /// Redis key & module type
+  adsModule   String   @unique     /// 展示名称 (e.g. 启动页 / 轮播 / 弹窗-图标)
+  moduleDesc  String?              /// 模块简介 + 比例 (例如: 启动页(10:21))
+  seq         Int      @default(0)
   ads         Ads[]
 
   @@map("adsModule")
 }
-

+ 2 - 2
prisma/mongo/schema/ads.prisma

@@ -1,7 +1,7 @@
 model Ads {
   id           String     @id @map("_id") @default(auto()) @db.ObjectId
-  channelId    String     @db.ObjectId        // 渠道 ID
-  adsModuleId  String                        // 广告模块 Id (banner/startup/轮播等)
+  channelId    String     @db.ObjectId       // 渠道 ID
+  adsModuleId  String     @db.ObjectId       // 广告模块 Id (banner/startup/轮播等)
   advertiser   String                        // 广告商 (业务上限制 max 20 字符)
   title        String                        // 标题 (业务上限制 max 20 字符)
   adsContent   String?                       // 广告文案 (业务上限制 max 500 字符)