|
|
@@ -27,9 +27,17 @@ const CATEGORY_CACHE_TTL = 900; // 15 min
|
|
|
const AD_CACHE_TTL = 300; // 5 min (more dynamic)
|
|
|
const AD_POOL_TTL = 300; // 5 min
|
|
|
|
|
|
+/**
|
|
|
+ * CacheSyncService
|
|
|
+ * - Writes durable CacheSyncAction records in MySQL.
|
|
|
+ * - Rebuilds Redis caches for channels/categories/ads/pools consumed by app-api.
|
|
|
+ * - Retries transient failures with backoff using attempts + nextAttemptAt.
|
|
|
+ */
|
|
|
@Injectable()
|
|
|
export class CacheSyncService {
|
|
|
private readonly logger = new Logger(CacheSyncService.name);
|
|
|
+ private readonly maxAttempts = 5;
|
|
|
+ private readonly baseBackoffMs = 5000; // initial retry delay
|
|
|
|
|
|
private readonly actionHandlers: Partial<
|
|
|
Record<CacheEntityType, (action: CacheSyncAction) => Promise<void>>
|
|
|
@@ -56,7 +64,8 @@ export class CacheSyncService {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Core generic scheduler.
|
|
|
+ * Enqueue a cache-sync action with optional initial delay.
|
|
|
+ * Downstream processing relies on attempts/nextAttemptAt for retries.
|
|
|
*/
|
|
|
async scheduleAction(params: {
|
|
|
entityType: CacheEntityType;
|
|
|
@@ -169,6 +178,11 @@ export class CacheSyncService {
|
|
|
await this.processPendingOnce(50);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Pull a batch of pending actions (whose nextAttemptAt <= now) and process
|
|
|
+ * them with retry/backoff. Keeps PENDING actions in the queue until either
|
|
|
+ * success or we exhaust maxAttempts (then we mark GAVE_UP).
|
|
|
+ */
|
|
|
async processPendingOnce(limit = 20): Promise<void> {
|
|
|
const now = this.nowBigInt();
|
|
|
|
|
|
@@ -198,12 +212,11 @@ export class CacheSyncService {
|
|
|
err instanceof Error ? err.message : String(err ?? 'Unknown error');
|
|
|
|
|
|
this.logger.error(
|
|
|
- `Error processing CacheSyncAction id=${action.id}: ${message}`,
|
|
|
+ `Error processing CacheSyncAction id=${action.id} (attempt ${action.attempts + 1}/${this.maxAttempts}): ${message}`,
|
|
|
);
|
|
|
|
|
|
const attempts = action.attempts + 1;
|
|
|
- const maxAttempts = 5;
|
|
|
- const backoffMs = Math.min(60000, 5000 * attempts); // up to 60s
|
|
|
+ const backoffMs = this.calculateBackoffMs(attempts);
|
|
|
const updateTime = this.nowBigInt();
|
|
|
const nextAttemptAt = updateTime + BigInt(backoffMs);
|
|
|
|
|
|
@@ -211,7 +224,7 @@ export class CacheSyncService {
|
|
|
where: { id: action.id },
|
|
|
data: {
|
|
|
status:
|
|
|
- attempts >= maxAttempts
|
|
|
+ attempts >= this.maxAttempts
|
|
|
? CacheStatus.GAVE_UP
|
|
|
: CacheStatus.PENDING,
|
|
|
attempts,
|
|
|
@@ -220,6 +233,16 @@ export class CacheSyncService {
|
|
|
updatedAt: updateTime,
|
|
|
},
|
|
|
});
|
|
|
+
|
|
|
+ if (attempts >= this.maxAttempts) {
|
|
|
+ this.logger.warn(
|
|
|
+ `CacheSyncAction id=${action.id} reached max attempts (${this.maxAttempts}) and will not be retried.`,
|
|
|
+ );
|
|
|
+ } else {
|
|
|
+ this.logger.debug(
|
|
|
+ `CacheSyncAction id=${action.id} scheduled to retry in ${backoffMs}ms (nextAttemptAt=${nextAttemptAt}).`,
|
|
|
+ );
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -245,6 +268,15 @@ export class CacheSyncService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Exponential backoff with light jitter so multiple workers don't retry in
|
|
|
+ * lockstep. Capped at 60s to avoid unbounded delays.
|
|
|
+ */
|
|
|
+ private calculateBackoffMs(attempts: number): number {
|
|
|
+ const jitter = Math.floor(Math.random() * 500);
|
|
|
+ return Math.min(60000, this.baseBackoffMs * attempts + jitter);
|
|
|
+ }
|
|
|
+
|
|
|
private async markActionSuccess(action: CacheSyncAction): Promise<void> {
|
|
|
await this.mysqlPrisma.cacheSyncAction.update({
|
|
|
where: { id: action.id },
|
|
|
@@ -287,7 +319,8 @@ export class CacheSyncService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private async rebuildChannelsAll(): Promise<void> {
|
|
|
+ // Made public so checklist service can invoke directly when a key is missing.
|
|
|
+ async rebuildChannelsAll(): Promise<void> {
|
|
|
const channels = await this.mongoPrisma.channel.findMany({
|
|
|
where: {
|
|
|
// isDeleted: false,
|
|
|
@@ -358,7 +391,8 @@ export class CacheSyncService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private async rebuildCategoriesAll(): Promise<void> {
|
|
|
+ // Made public so checklist service can invoke directly when a key is missing.
|
|
|
+ async rebuildCategoriesAll(): Promise<void> {
|
|
|
const categories = await this.mongoPrisma.category.findMany({
|
|
|
where: {
|
|
|
// isDeleted: false,
|
|
|
@@ -432,6 +466,7 @@ export class CacheSyncService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ // Still private, only used internally for per-ad refresh logic.
|
|
|
private async rebuildSingleAdCache(
|
|
|
adId: string,
|
|
|
adType?: string,
|
|
|
@@ -541,7 +576,13 @@ export class CacheSyncService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private async rebuildAdPoolForPlacement(
|
|
|
+ /**
|
|
|
+ * Rebuild a single ad pool for a placement (scene + slot).
|
|
|
+ * Reads active ads for the adType and atomically swaps the cache key to avoid
|
|
|
+ * partially-written pools being read by app-api.
|
|
|
+ */
|
|
|
+ // Made public so checklist service can invoke targeted pool rebuild.
|
|
|
+ async rebuildAdPoolForPlacement(
|
|
|
adType: AdType,
|
|
|
scene: AdScene,
|
|
|
slot: AdSlot,
|