|
|
@@ -24,6 +24,7 @@ import {
|
|
|
// Cache TTL (seconds)
|
|
|
const CHANNEL_CACHE_TTL = 900; // 15 min
|
|
|
const CATEGORY_CACHE_TTL = 900; // 15 min
|
|
|
+const TAG_CACHE_TTL = 900; // 15 min
|
|
|
const AD_CACHE_TTL = 300; // 5 min (more dynamic)
|
|
|
const AD_POOL_TTL = 300; // 5 min
|
|
|
|
|
|
@@ -38,6 +39,7 @@ export class CacheSyncService {
|
|
|
private readonly logger = new Logger(CacheSyncService.name);
|
|
|
private readonly maxAttempts = 5;
|
|
|
private readonly baseBackoffMs = 5000; // initial retry delay
|
|
|
+ private readonly MAX_LAST_ERROR_LENGTH = 500; // Matches DB VARCHAR(500)
|
|
|
|
|
|
private readonly actionHandlers: Partial<
|
|
|
Record<CacheEntityType, (action: CacheSyncAction) => Promise<void>>
|
|
|
@@ -47,6 +49,7 @@ export class CacheSyncService {
|
|
|
[CacheEntityType.AD]: this.handleAdAction.bind(this),
|
|
|
[CacheEntityType.AD_POOL]: this.handleAdPoolAction.bind(this),
|
|
|
[CacheEntityType.VIDEO_LIST]: this.handleVideoListAction.bind(this),
|
|
|
+ [CacheEntityType.TAG]: this.handleTagAction.bind(this),
|
|
|
};
|
|
|
|
|
|
constructor(
|
|
|
@@ -64,6 +67,39 @@ export class CacheSyncService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * Build a safe error string that fits within database column constraints.
|
|
|
+ * Truncates if necessary to prevent P2000 errors.
|
|
|
+ */
|
|
|
+ private buildLastErrorString(
|
|
|
+ err: unknown,
|
|
|
+ maxLength = this.MAX_LAST_ERROR_LENGTH,
|
|
|
+ ): string {
|
|
|
+ let base = '';
|
|
|
+
|
|
|
+ if (err instanceof Error) {
|
|
|
+ base = `${err.name}: ${err.message}`;
|
|
|
+ if (err.stack) {
|
|
|
+ base += `\n${err.stack}`;
|
|
|
+ }
|
|
|
+ } else if (typeof err === 'string') {
|
|
|
+ base = err;
|
|
|
+ } else {
|
|
|
+ // Fallback stringify
|
|
|
+ try {
|
|
|
+ base = JSON.stringify(err);
|
|
|
+ } catch {
|
|
|
+ base = String(err);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (base.length > maxLength) {
|
|
|
+ return base.slice(0, maxLength - 20) + '... [truncated]';
|
|
|
+ }
|
|
|
+
|
|
|
+ return base;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Enqueue a cache-sync action with optional initial delay.
|
|
|
* Downstream processing relies on attempts/nextAttemptAt for retries.
|
|
|
*/
|
|
|
@@ -220,19 +256,63 @@ export class CacheSyncService {
|
|
|
const updateTime = this.nowBigInt();
|
|
|
const nextAttemptAt = updateTime + BigInt(backoffMs);
|
|
|
|
|
|
- await this.mysqlPrisma.cacheSyncAction.update({
|
|
|
- where: { id: action.id },
|
|
|
- data: {
|
|
|
- status:
|
|
|
- attempts >= this.maxAttempts
|
|
|
- ? CacheStatus.GAVE_UP
|
|
|
- : CacheStatus.PENDING,
|
|
|
- attempts,
|
|
|
- lastError: message,
|
|
|
- nextAttemptAt,
|
|
|
- updatedAt: updateTime,
|
|
|
- },
|
|
|
- });
|
|
|
+ const lastError = this.buildLastErrorString(err);
|
|
|
+
|
|
|
+ try {
|
|
|
+ await this.mysqlPrisma.cacheSyncAction.update({
|
|
|
+ where: { id: action.id },
|
|
|
+ data: {
|
|
|
+ status:
|
|
|
+ attempts >= this.maxAttempts
|
|
|
+ ? CacheStatus.GAVE_UP
|
|
|
+ : CacheStatus.PENDING,
|
|
|
+ attempts,
|
|
|
+ lastError,
|
|
|
+ nextAttemptAt,
|
|
|
+ updatedAt: updateTime,
|
|
|
+ },
|
|
|
+ });
|
|
|
+ } catch (updateErr) {
|
|
|
+ // Handle "value too long" for lastError (P2000) gracefully
|
|
|
+ if (
|
|
|
+ updateErr instanceof MysqlPrisma.PrismaClientKnownRequestError &&
|
|
|
+ updateErr.code === 'P2000' &&
|
|
|
+ updateErr.meta?.column_name === 'lastError'
|
|
|
+ ) {
|
|
|
+ // Try again with a minimal error message
|
|
|
+ const minimalError = (
|
|
|
+ err instanceof Error ? err.message : String(err)
|
|
|
+ ).slice(0, 200);
|
|
|
+
|
|
|
+ try {
|
|
|
+ await this.mysqlPrisma.cacheSyncAction.update({
|
|
|
+ where: { id: action.id },
|
|
|
+ data: {
|
|
|
+ status:
|
|
|
+ attempts >= this.maxAttempts
|
|
|
+ ? CacheStatus.GAVE_UP
|
|
|
+ : CacheStatus.PENDING,
|
|
|
+ attempts,
|
|
|
+ lastError: minimalError,
|
|
|
+ nextAttemptAt,
|
|
|
+ updatedAt: updateTime,
|
|
|
+ },
|
|
|
+ });
|
|
|
+ } catch (secondErr) {
|
|
|
+ // At this point we only log – we don't want our scheduler to die
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to persist minimal lastError for CacheSyncAction id=${action.id}`,
|
|
|
+ { originalError: err, updateErr, secondErr },
|
|
|
+ );
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // Log but don't rethrow - keep processing other actions
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to update CacheSyncAction id=${action.id} after error`,
|
|
|
+ { originalError: err, updateErr },
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
if (attempts >= this.maxAttempts) {
|
|
|
this.logger.warn(
|
|
|
@@ -298,17 +378,34 @@ export class CacheSyncService {
|
|
|
case CacheOperation.REFRESH_ALL:
|
|
|
await this.rebuildChannelsAll();
|
|
|
break;
|
|
|
+ case CacheOperation.REFRESH: {
|
|
|
+ const payload = action.payload as CachePayload | null;
|
|
|
+ const channelId = (payload as any)?.channelId as string | undefined;
|
|
|
+ if (channelId) {
|
|
|
+ await this.rebuildChannelWithCategories(channelId);
|
|
|
+ } else {
|
|
|
+ this.logger.warn(
|
|
|
+ `handleChannelAction REFRESH: missing channelId for action id=${action.id}`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
case CacheOperation.INVALIDATE: {
|
|
|
const payload = action.payload as CachePayload | null;
|
|
|
const id = (payload as any)?.id as string | number | undefined;
|
|
|
- if (id != null) {
|
|
|
- await this.redis.del(CacheKeys.appChannelById(id));
|
|
|
- this.logger.log(
|
|
|
- `Invalidated channel by id key=${CacheKeys.appChannelById(id)}`,
|
|
|
- );
|
|
|
- } else {
|
|
|
- await this.redis.del(CacheKeys.appChannelAll);
|
|
|
- this.logger.log(`Invalidated ${CacheKeys.appChannelAll}`);
|
|
|
+ try {
|
|
|
+ if (id != null) {
|
|
|
+ await this.redis.del(CacheKeys.appChannelById(id));
|
|
|
+ this.logger.log(
|
|
|
+ `Invalidated channel by id key=${CacheKeys.appChannelById(id)}`,
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ await this.redis.del(CacheKeys.appChannelAll);
|
|
|
+ this.logger.log(`Invalidated ${CacheKeys.appChannelAll}`);
|
|
|
+ }
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error('Failed to invalidate channel cache', err);
|
|
|
+ throw err;
|
|
|
}
|
|
|
break;
|
|
|
}
|
|
|
@@ -321,44 +418,49 @@ export class CacheSyncService {
|
|
|
|
|
|
// Made public so checklist service can invoke directly when a key is missing.
|
|
|
async rebuildChannelsAll(): Promise<void> {
|
|
|
- const channels = await this.mongoPrisma.channel.findMany({
|
|
|
- where: {
|
|
|
- // isDeleted: false,
|
|
|
- },
|
|
|
- orderBy: {
|
|
|
- id: 'asc',
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
- const sanitized = channels.map((c) => ({
|
|
|
- id: c.id,
|
|
|
- name: c.name,
|
|
|
- landingUrl: c.landingUrl,
|
|
|
- videoCdn: c.videoCdn ?? null,
|
|
|
- coverCdn: c.coverCdn ?? null,
|
|
|
- clientName: c.clientName ?? null,
|
|
|
- clientNotice: c.clientNotice ?? null,
|
|
|
- remark: c.remark ?? null,
|
|
|
- createAt:
|
|
|
- typeof c.createAt === 'bigint'
|
|
|
- ? Number(c.createAt)
|
|
|
- : (c as any).createAt,
|
|
|
- updateAt:
|
|
|
- typeof c.updateAt === 'bigint'
|
|
|
- ? Number(c.updateAt)
|
|
|
- : (c as any).updateAt,
|
|
|
- }));
|
|
|
+ try {
|
|
|
+ const channels = await this.mongoPrisma.channel.findMany({
|
|
|
+ where: {
|
|
|
+ // isDeleted: false,
|
|
|
+ },
|
|
|
+ orderBy: {
|
|
|
+ id: 'asc',
|
|
|
+ },
|
|
|
+ });
|
|
|
|
|
|
- const start = Date.now();
|
|
|
- await this.redis.setJson(
|
|
|
- CacheKeys.appChannelAll,
|
|
|
- sanitized,
|
|
|
- CHANNEL_CACHE_TTL,
|
|
|
- );
|
|
|
+ const sanitized = channels.map((c) => ({
|
|
|
+ id: c.id,
|
|
|
+ name: c.name,
|
|
|
+ landingUrl: c.landingUrl,
|
|
|
+ videoCdn: c.videoCdn ?? null,
|
|
|
+ coverCdn: c.coverCdn ?? null,
|
|
|
+ clientName: c.clientName ?? null,
|
|
|
+ clientNotice: c.clientNotice ?? null,
|
|
|
+ remark: c.remark ?? null,
|
|
|
+ createAt:
|
|
|
+ typeof c.createAt === 'bigint'
|
|
|
+ ? Number(c.createAt)
|
|
|
+ : (c as any).createAt,
|
|
|
+ updateAt:
|
|
|
+ typeof c.updateAt === 'bigint'
|
|
|
+ ? Number(c.updateAt)
|
|
|
+ : (c as any).updateAt,
|
|
|
+ }));
|
|
|
+
|
|
|
+ const start = Date.now();
|
|
|
+ await this.redis.setJson(
|
|
|
+ CacheKeys.appChannelAll,
|
|
|
+ sanitized,
|
|
|
+ CHANNEL_CACHE_TTL,
|
|
|
+ );
|
|
|
|
|
|
- this.logger.log(
|
|
|
- `Rebuilt ${CacheKeys.appChannelAll} with ${channels.length} item(s), ${Date.now() - start}ms`,
|
|
|
- );
|
|
|
+ this.logger.log(
|
|
|
+ `Rebuilt ${CacheKeys.appChannelAll} with ${channels.length} item(s), ${Date.now() - start}ms`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error('Failed to rebuild channels:all cache', err);
|
|
|
+ throw err; // Re-throw to trigger retry mechanism
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// ─────────────────────────────────────────────
|
|
|
@@ -370,17 +472,41 @@ export class CacheSyncService {
|
|
|
case CacheOperation.REFRESH_ALL:
|
|
|
await this.rebuildCategoriesAll();
|
|
|
break;
|
|
|
+ case CacheOperation.REFRESH: {
|
|
|
+ const payload = action.payload as CachePayload | null;
|
|
|
+ const categoryId = (payload as any)?.categoryId as string | undefined;
|
|
|
+ if (categoryId) {
|
|
|
+ await this.rebuildCategoryWithTags(categoryId);
|
|
|
+ // Load category to get channelId for channel-with-categories rebuild
|
|
|
+ const category = await this.mongoPrisma.category.findUnique({
|
|
|
+ where: { id: categoryId },
|
|
|
+ });
|
|
|
+ if (category) {
|
|
|
+ await this.rebuildChannelWithCategories(category.channelId);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.logger.warn(
|
|
|
+ `handleCategoryAction REFRESH: missing categoryId for action id=${action.id}`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
case CacheOperation.INVALIDATE: {
|
|
|
const payload = action.payload as CachePayload | null;
|
|
|
const id = (payload as any)?.id as string | number | undefined;
|
|
|
- if (id != null) {
|
|
|
- await this.redis.del(CacheKeys.appCategoryById(id));
|
|
|
- this.logger.log(
|
|
|
- `Invalidated category by id key=${CacheKeys.appCategoryById(id)}`,
|
|
|
- );
|
|
|
- } else {
|
|
|
- await this.redis.del(CacheKeys.appCategoryAll);
|
|
|
- this.logger.log(`Invalidated ${CacheKeys.appCategoryAll}`);
|
|
|
+ try {
|
|
|
+ if (id != null) {
|
|
|
+ await this.redis.del(CacheKeys.appCategoryById(id));
|
|
|
+ this.logger.log(
|
|
|
+ `Invalidated category by id key=${CacheKeys.appCategoryById(id)}`,
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ await this.redis.del(CacheKeys.appCategoryAll);
|
|
|
+ this.logger.log(`Invalidated ${CacheKeys.appCategoryAll}`);
|
|
|
+ }
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error('Failed to invalidate category cache', err);
|
|
|
+ throw err;
|
|
|
}
|
|
|
break;
|
|
|
}
|
|
|
@@ -393,43 +519,48 @@ export class CacheSyncService {
|
|
|
|
|
|
// Made public so checklist service can invoke directly when a key is missing.
|
|
|
async rebuildCategoriesAll(): Promise<void> {
|
|
|
- const categories = await this.mongoPrisma.category.findMany({
|
|
|
- where: {
|
|
|
- // isDeleted: false,
|
|
|
- status: 1, // only active categories
|
|
|
- },
|
|
|
- orderBy: {
|
|
|
- seq: 'asc',
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
- const sanitized = categories.map((c) => ({
|
|
|
- id: c.id,
|
|
|
- name: c.name,
|
|
|
- subtitle: c.subtitle ?? null,
|
|
|
- channelId: c.channelId,
|
|
|
- seq: c.seq,
|
|
|
- status: c.status,
|
|
|
- createAt:
|
|
|
- typeof c.createAt === 'bigint'
|
|
|
- ? Number(c.createAt)
|
|
|
- : (c as any).createAt,
|
|
|
- updateAt:
|
|
|
- typeof c.updateAt === 'bigint'
|
|
|
- ? Number(c.updateAt)
|
|
|
- : (c as any).updateAt,
|
|
|
- }));
|
|
|
+ try {
|
|
|
+ const categories = await this.mongoPrisma.category.findMany({
|
|
|
+ where: {
|
|
|
+ // isDeleted: false,
|
|
|
+ status: 1, // only active categories
|
|
|
+ },
|
|
|
+ orderBy: {
|
|
|
+ seq: 'asc',
|
|
|
+ },
|
|
|
+ });
|
|
|
|
|
|
- const start = Date.now();
|
|
|
- await this.redis.setJson(
|
|
|
- CacheKeys.appCategoryAll,
|
|
|
- sanitized,
|
|
|
- CATEGORY_CACHE_TTL,
|
|
|
- );
|
|
|
+ const sanitized = categories.map((c) => ({
|
|
|
+ id: c.id,
|
|
|
+ name: c.name,
|
|
|
+ subtitle: c.subtitle ?? null,
|
|
|
+ channelId: c.channelId,
|
|
|
+ seq: c.seq,
|
|
|
+ status: c.status,
|
|
|
+ createAt:
|
|
|
+ typeof c.createAt === 'bigint'
|
|
|
+ ? Number(c.createAt)
|
|
|
+ : (c as any).createAt,
|
|
|
+ updateAt:
|
|
|
+ typeof c.updateAt === 'bigint'
|
|
|
+ ? Number(c.updateAt)
|
|
|
+ : (c as any).updateAt,
|
|
|
+ }));
|
|
|
+
|
|
|
+ const start = Date.now();
|
|
|
+ await this.redis.setJson(
|
|
|
+ CacheKeys.appCategoryAll,
|
|
|
+ sanitized,
|
|
|
+ CATEGORY_CACHE_TTL,
|
|
|
+ );
|
|
|
|
|
|
- this.logger.log(
|
|
|
- `Rebuilt ${CacheKeys.appCategoryAll} with ${categories.length} item(s), ${Date.now() - start}ms`,
|
|
|
- );
|
|
|
+ this.logger.log(
|
|
|
+ `Rebuilt ${CacheKeys.appCategoryAll} with ${categories.length} item(s), ${Date.now() - start}ms`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error('Failed to rebuild categories:all cache', err);
|
|
|
+ throw err; // Re-throw to trigger retry mechanism
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// ─────────────────────────────────────────────
|
|
|
@@ -451,9 +582,17 @@ export class CacheSyncService {
|
|
|
|
|
|
switch (action.operation as CacheOperation) {
|
|
|
case CacheOperation.INVALIDATE: {
|
|
|
- const key = CacheKeys.appAdById(adId);
|
|
|
- await this.redis.del(key);
|
|
|
- this.logger.log(`Invalidated per-ad cache key=${key}`);
|
|
|
+ try {
|
|
|
+ const key = CacheKeys.appAdById(adId);
|
|
|
+ await this.redis.del(key);
|
|
|
+ this.logger.log(`Invalidated per-ad cache key=${key}`);
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to invalidate ad cache for adId=${adId}`,
|
|
|
+ err,
|
|
|
+ );
|
|
|
+ throw err;
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
case CacheOperation.REFRESH:
|
|
|
@@ -486,10 +625,17 @@ export class CacheSyncService {
|
|
|
|
|
|
if (!ad) {
|
|
|
// Ad no longer exists → ensure cache is cleared
|
|
|
- await this.redis.del(cacheKey);
|
|
|
- this.logger.log(
|
|
|
- `rebuildSingleAdCache: ad not found, removed cache key=${cacheKey}`,
|
|
|
- );
|
|
|
+ try {
|
|
|
+ await this.redis.del(cacheKey);
|
|
|
+ this.logger.log(
|
|
|
+ `rebuildSingleAdCache: ad not found, removed cache key=${cacheKey}`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to delete Redis key ${cacheKey} for missing ad`,
|
|
|
+ err,
|
|
|
+ );
|
|
|
+ }
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -503,10 +649,17 @@ export class CacheSyncService {
|
|
|
(ad.expiryDt === BigInt(0) || ad.expiryDt >= now);
|
|
|
|
|
|
if (!isActive) {
|
|
|
- await this.redis.del(cacheKey);
|
|
|
- this.logger.log(
|
|
|
- `rebuildSingleAdCache: adId=${adId} is not active (status/time window), removed cache key=${cacheKey}`,
|
|
|
- );
|
|
|
+ try {
|
|
|
+ await this.redis.del(cacheKey);
|
|
|
+ this.logger.log(
|
|
|
+ `rebuildSingleAdCache: adId=${adId} is not active (status/time window), removed cache key=${cacheKey}`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to delete Redis key ${cacheKey} for inactive ad`,
|
|
|
+ err,
|
|
|
+ );
|
|
|
+ }
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
@@ -523,11 +676,15 @@ export class CacheSyncService {
|
|
|
adType: ad.adsModule?.adType ?? adType ?? null,
|
|
|
};
|
|
|
|
|
|
- await this.redis.setJson(cacheKey, cachedAd, AD_CACHE_TTL);
|
|
|
-
|
|
|
- this.logger.log(
|
|
|
- `rebuildSingleAdCache: updated per-ad cache for adId=${adId}, key=${cacheKey}`,
|
|
|
- );
|
|
|
+ try {
|
|
|
+ await this.redis.setJson(cacheKey, cachedAd, AD_CACHE_TTL);
|
|
|
+ this.logger.log(
|
|
|
+ `rebuildSingleAdCache: updated per-ad cache for adId=${adId}, key=${cacheKey}`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(`Failed to set Redis cache for ad adId=${adId}`, err);
|
|
|
+ throw err;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private async handleAdPoolAction(action: CacheSyncAction): Promise<void> {
|
|
|
@@ -552,12 +709,21 @@ export class CacheSyncService {
|
|
|
|
|
|
switch (action.operation as CacheOperation) {
|
|
|
case CacheOperation.INVALIDATE: {
|
|
|
- // remove all pools for this adType
|
|
|
- const pattern = `app:adpool:*:*:${adType}`;
|
|
|
- const deleted = await this.redis.deleteByPattern(pattern);
|
|
|
- this.logger.log(
|
|
|
- `Invalidated ${deleted} pool key(s) for adType=${adType} using pattern=${pattern}`,
|
|
|
- );
|
|
|
+ try {
|
|
|
+ // remove all pools for this adType
|
|
|
+ // Pattern: app:adpool:*:*:<adType>
|
|
|
+ const pattern = `app:adpool:*:*:${adType}`;
|
|
|
+ const deleted = await this.redis.deleteByPattern(pattern);
|
|
|
+ this.logger.log(
|
|
|
+ `Invalidated ${deleted} pool key(s) for adType=${adType} using pattern=${pattern}`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to invalidate ad pools for adType=${adType}`,
|
|
|
+ err,
|
|
|
+ );
|
|
|
+ throw err;
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
case CacheOperation.REBUILD_POOL:
|
|
|
@@ -588,36 +754,44 @@ export class CacheSyncService {
|
|
|
scene: AdScene,
|
|
|
slot: AdSlot,
|
|
|
): Promise<void> {
|
|
|
- const now = this.nowBigInt();
|
|
|
-
|
|
|
- const ads = await this.mongoPrisma.ads.findMany({
|
|
|
- where: {
|
|
|
- status: 1,
|
|
|
- startDt: { lte: now },
|
|
|
- OR: [{ expiryDt: BigInt(0) }, { expiryDt: { gte: now } }],
|
|
|
- adsModule: {
|
|
|
- is: { adType },
|
|
|
+ try {
|
|
|
+ const now = this.nowBigInt();
|
|
|
+
|
|
|
+ const ads = await this.mongoPrisma.ads.findMany({
|
|
|
+ where: {
|
|
|
+ status: 1,
|
|
|
+ startDt: { lte: now },
|
|
|
+ OR: [{ expiryDt: BigInt(0) }, { expiryDt: { gte: now } }],
|
|
|
+ adsModule: {
|
|
|
+ is: { adType },
|
|
|
+ },
|
|
|
},
|
|
|
- },
|
|
|
- orderBy: { seq: 'asc' },
|
|
|
- });
|
|
|
+ orderBy: { seq: 'asc' },
|
|
|
+ });
|
|
|
|
|
|
- const poolEntries: AdPoolEntry[] = ads.map((ad) => ({
|
|
|
- id: ad.id,
|
|
|
- weight: 1,
|
|
|
- }));
|
|
|
+ const poolEntries: AdPoolEntry[] = ads.map((ad) => ({
|
|
|
+ id: ad.id,
|
|
|
+ weight: 1,
|
|
|
+ }));
|
|
|
|
|
|
- const key = CacheKeys.appAdPool(scene, slot, adType);
|
|
|
+ const key = CacheKeys.appAdPool(scene, slot, adType);
|
|
|
|
|
|
- // Atomic swap to avoid partial-read windows
|
|
|
- const start = Date.now();
|
|
|
- await this.redis.atomicSwapJson([
|
|
|
- { key, value: poolEntries, ttlSeconds: AD_POOL_TTL },
|
|
|
- ]);
|
|
|
+ // Atomic swap to avoid partial-read windows
|
|
|
+ const start = Date.now();
|
|
|
+ await this.redis.atomicSwapJson([
|
|
|
+ { key, value: poolEntries, ttlSeconds: AD_POOL_TTL },
|
|
|
+ ]);
|
|
|
|
|
|
- this.logger.log(
|
|
|
- `Rebuilt ad pool ${key} with ${poolEntries.length} ad(s) for adType=${adType}, scene=${scene}, slot=${slot}, ${Date.now() - start}ms`,
|
|
|
- );
|
|
|
+ this.logger.log(
|
|
|
+ `Rebuilt ad pool ${key} with ${poolEntries.length} ad(s) for adType=${adType}, scene=${scene}, slot=${slot}, ${Date.now() - start}ms`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to rebuild ad pool for adType=${adType}, scene=${scene}, slot=${slot}`,
|
|
|
+ err,
|
|
|
+ );
|
|
|
+ throw err;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// ─────────────────────────────────────────────
|
|
|
@@ -633,6 +807,263 @@ export class CacheSyncService {
|
|
|
}
|
|
|
|
|
|
// ─────────────────────────────────────────────
|
|
|
+ // TAGS
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
+
|
|
|
+ private async handleTagAction(action: CacheSyncAction): Promise<void> {
|
|
|
+ switch (action.operation as CacheOperation) {
|
|
|
+ case CacheOperation.REFRESH_ALL:
|
|
|
+ await this.rebuildTagAll();
|
|
|
+ break;
|
|
|
+ case CacheOperation.REFRESH: {
|
|
|
+ const payload = action.payload as CachePayload | null;
|
|
|
+ const categoryId = action.entityId || (payload as any)?.categoryId;
|
|
|
+ if (categoryId && categoryId !== 'null') {
|
|
|
+ await this.rebuildCategoryWithTags(categoryId);
|
|
|
+ } else {
|
|
|
+ this.logger.warn(
|
|
|
+ `handleTagAction REFRESH: missing categoryId for action id=${action.id}`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ case CacheOperation.INVALIDATE: {
|
|
|
+ const payload = action.payload as CachePayload | null;
|
|
|
+ const categoryId = action.entityId || (payload as any)?.categoryId;
|
|
|
+ if (categoryId && categoryId !== 'null') {
|
|
|
+ try {
|
|
|
+ await this.redis.del(CacheKeys.appCategoryWithTags(categoryId));
|
|
|
+ this.logger.log(
|
|
|
+ `Invalidated category with tags key=${CacheKeys.appCategoryWithTags(categoryId)}`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to invalidate category with tags cache for categoryId=${categoryId}`,
|
|
|
+ err,
|
|
|
+ );
|
|
|
+ throw err;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ this.logger.warn(
|
|
|
+ `handleTagAction INVALIDATE: missing categoryId for action id=${action.id}`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ default:
|
|
|
+ this.logger.warn(
|
|
|
+ `Unsupported TAG operation for action id=${action.id}: ${action.operation}`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Rebuild channel with its categories tree for a specific channel.
|
|
|
+ */
|
|
|
+ async rebuildChannelWithCategories(channelId: string): Promise<void> {
|
|
|
+ const cacheKey = CacheKeys.appChannelWithCategories(channelId);
|
|
|
+
|
|
|
+ try {
|
|
|
+ const channel = await this.mongoPrisma.channel.findUnique({
|
|
|
+ where: { id: channelId },
|
|
|
+ });
|
|
|
+
|
|
|
+ if (!channel) {
|
|
|
+ try {
|
|
|
+ await this.redis.del(cacheKey);
|
|
|
+ this.logger.warn(
|
|
|
+ `rebuildChannelWithCategories: channel not found, removed cache key=${cacheKey}`,
|
|
|
+ );
|
|
|
+ } catch (redisErr) {
|
|
|
+ this.logger.error(`Failed to delete Redis key ${cacheKey}`, redisErr);
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ const categories = await this.mongoPrisma.category.findMany({
|
|
|
+ where: { channelId: channel.id, status: 1 },
|
|
|
+ orderBy: [{ seq: 'asc' }, { name: 'asc' }],
|
|
|
+ });
|
|
|
+
|
|
|
+ const channelLite = {
|
|
|
+ id: channel.id,
|
|
|
+ name: channel.name,
|
|
|
+ landingUrl: channel.landingUrl,
|
|
|
+ videoCdn: channel.videoCdn ?? null,
|
|
|
+ coverCdn: channel.coverCdn ?? null,
|
|
|
+ clientName: channel.clientName ?? null,
|
|
|
+ clientNotice: channel.clientNotice ?? null,
|
|
|
+ createAt:
|
|
|
+ typeof channel.createAt === 'bigint'
|
|
|
+ ? Number(channel.createAt)
|
|
|
+ : (channel as any).createAt,
|
|
|
+ updateAt:
|
|
|
+ typeof channel.updateAt === 'bigint'
|
|
|
+ ? Number(channel.updateAt)
|
|
|
+ : (channel as any).updateAt,
|
|
|
+ };
|
|
|
+
|
|
|
+ const categoryLites = categories.map((c) => ({
|
|
|
+ id: c.id,
|
|
|
+ name: c.name,
|
|
|
+ subtitle: c.subtitle ?? null,
|
|
|
+ channelId: c.channelId,
|
|
|
+ seq: c.seq,
|
|
|
+ status: c.status,
|
|
|
+ createAt:
|
|
|
+ typeof c.createAt === 'bigint'
|
|
|
+ ? Number(c.createAt)
|
|
|
+ : (c as any).createAt,
|
|
|
+ updateAt:
|
|
|
+ typeof c.updateAt === 'bigint'
|
|
|
+ ? Number(c.updateAt)
|
|
|
+ : (c as any).updateAt,
|
|
|
+ }));
|
|
|
+
|
|
|
+ const payload = {
|
|
|
+ channel: channelLite,
|
|
|
+ categories: categoryLites,
|
|
|
+ schemaVersion: 1,
|
|
|
+ updatedAt: Date.now(),
|
|
|
+ };
|
|
|
+
|
|
|
+ const start = Date.now();
|
|
|
+ await this.redis.setJson(cacheKey, payload, CATEGORY_CACHE_TTL);
|
|
|
+
|
|
|
+ this.logger.log(
|
|
|
+ `Rebuilt ${cacheKey} with ${categories.length} category(ies), ${Date.now() - start}ms`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to rebuild channel with categories for channelId=${channelId}`,
|
|
|
+ err,
|
|
|
+ );
|
|
|
+ throw err; // Re-throw to trigger retry mechanism
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Rebuild category with its tags tree for a specific category.
|
|
|
+ */
|
|
|
+ async rebuildCategoryWithTags(categoryId: string): Promise<void> {
|
|
|
+ // Validate categoryId to prevent 'null' string or invalid ObjectID
|
|
|
+ if (!categoryId || categoryId === 'null' || categoryId === 'undefined') {
|
|
|
+ this.logger.warn(
|
|
|
+ `rebuildCategoryWithTags: invalid categoryId="${categoryId}"`,
|
|
|
+ );
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ const cacheKey = CacheKeys.appCategoryWithTags(categoryId);
|
|
|
+
|
|
|
+ try {
|
|
|
+ const category = await this.mongoPrisma.category.findUnique({
|
|
|
+ where: { id: categoryId },
|
|
|
+ });
|
|
|
+
|
|
|
+ if (!category) {
|
|
|
+ try {
|
|
|
+ await this.redis.del(cacheKey);
|
|
|
+ this.logger.warn(
|
|
|
+ `rebuildCategoryWithTags: category not found, removed cache key=${cacheKey}`,
|
|
|
+ );
|
|
|
+ } catch (redisErr) {
|
|
|
+ this.logger.error(`Failed to delete Redis key ${cacheKey}`, redisErr);
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ const tags = await this.mongoPrisma.tag.findMany({
|
|
|
+ where: { categoryId, status: 1 },
|
|
|
+ orderBy: [{ seq: 'asc' }, { name: 'asc' }],
|
|
|
+ });
|
|
|
+
|
|
|
+ const payload = {
|
|
|
+ category: {
|
|
|
+ id: category.id,
|
|
|
+ name: category.name,
|
|
|
+ subtitle: category.subtitle ?? null,
|
|
|
+ channelId: category.channelId,
|
|
|
+ seq: category.seq,
|
|
|
+ status: category.status,
|
|
|
+ createAt:
|
|
|
+ typeof category.createAt === 'bigint'
|
|
|
+ ? Number(category.createAt)
|
|
|
+ : (category as any).createAt,
|
|
|
+ updateAt:
|
|
|
+ typeof category.updateAt === 'bigint'
|
|
|
+ ? Number(category.updateAt)
|
|
|
+ : (category as any).updateAt,
|
|
|
+ },
|
|
|
+ tags: tags.map((t) => ({
|
|
|
+ id: t.id,
|
|
|
+ name: t.name,
|
|
|
+ channelId: t.channelId,
|
|
|
+ categoryId: t.categoryId,
|
|
|
+ seq: t.seq,
|
|
|
+ status: t.status,
|
|
|
+ createAt:
|
|
|
+ typeof t.createAt === 'bigint'
|
|
|
+ ? Number(t.createAt)
|
|
|
+ : (t as any).createAt,
|
|
|
+ updateAt:
|
|
|
+ typeof t.updateAt === 'bigint'
|
|
|
+ ? Number(t.updateAt)
|
|
|
+ : (t as any).updateAt,
|
|
|
+ })),
|
|
|
+ schemaVersion: 1,
|
|
|
+ updatedAt: Date.now(),
|
|
|
+ };
|
|
|
+
|
|
|
+ const start = Date.now();
|
|
|
+ await this.redis.setJson(cacheKey, payload, TAG_CACHE_TTL);
|
|
|
+
|
|
|
+ this.logger.log(
|
|
|
+ `Rebuilt ${cacheKey} with category and ${tags.length} tag(s), ${Date.now() - start}ms`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to rebuild category with tags for categoryId=${categoryId}`,
|
|
|
+ err,
|
|
|
+ );
|
|
|
+ throw err; // Re-throw to trigger retry mechanism
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Rebuild global tag:all suggestion pool.
|
|
|
+ */
|
|
|
+ async rebuildTagAll(): Promise<void> {
|
|
|
+ try {
|
|
|
+ const tags = await this.mongoPrisma.tag.findMany({
|
|
|
+ where: { status: 1 },
|
|
|
+ orderBy: [{ name: 'asc' }],
|
|
|
+ });
|
|
|
+
|
|
|
+ const payload = {
|
|
|
+ tags: tags.map((t) => ({
|
|
|
+ id: t.id,
|
|
|
+ name: t.name,
|
|
|
+ channelId: t.channelId,
|
|
|
+ categoryId: t.categoryId,
|
|
|
+ })),
|
|
|
+ schemaVersion: 1,
|
|
|
+ updatedAt: Date.now(),
|
|
|
+ };
|
|
|
+
|
|
|
+ const start = Date.now();
|
|
|
+ await this.redis.setJson(CacheKeys.appTagAll, payload, TAG_CACHE_TTL);
|
|
|
+
|
|
|
+ this.logger.log(
|
|
|
+ `Rebuilt ${CacheKeys.appTagAll} with ${tags.length} tag(s), ${Date.now() - start}ms`,
|
|
|
+ );
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.error('Failed to rebuild tag:all cache', err);
|
|
|
+ throw err; // Re-throw to trigger retry mechanism
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
// Cache warming
|
|
|
// ─────────────────────────────────────────────
|
|
|
|
|
|
@@ -647,6 +1078,7 @@ export class CacheSyncService {
|
|
|
await Promise.all([
|
|
|
this.rebuildChannelsAll(),
|
|
|
this.rebuildCategoriesAll(),
|
|
|
+ this.rebuildTagAll(),
|
|
|
this.warmAdPools(),
|
|
|
]);
|
|
|
|