|
@@ -1,8 +1,9 @@
|
|
|
import { Injectable, Logger } from '@nestjs/common';
|
|
import { Injectable, Logger } from '@nestjs/common';
|
|
|
-import { Prisma } from '@prisma/mysql/client'; // adjust if your generated types are in a different package
|
|
|
|
|
|
|
+import { Prisma as MysqlPrisma, CacheSyncAction } from '@prisma/mysql/client'; // adjust if your generated types are in a different package
|
|
|
import { RedisService } from '../redis/redis.service';
|
|
import { RedisService } from '../redis/redis.service';
|
|
|
// ⬇️ Adjust this import to your actual db lib (e.g., '@box/db')
|
|
// ⬇️ Adjust this import to your actual db lib (e.g., '@box/db')
|
|
|
import { MysqlPrismaService } from '@box/db/prisma/mysql-prisma.service'; // TODO: change to your real path
|
|
import { MysqlPrismaService } from '@box/db/prisma/mysql-prisma.service'; // TODO: change to your real path
|
|
|
|
|
+import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service'; // TODO: change to your real path
|
|
|
|
|
|
|
|
import {
|
|
import {
|
|
|
CacheEntityType,
|
|
CacheEntityType,
|
|
@@ -16,7 +17,8 @@ export class CacheSyncService {
|
|
|
private readonly logger = new Logger(CacheSyncService.name);
|
|
private readonly logger = new Logger(CacheSyncService.name);
|
|
|
|
|
|
|
|
constructor(
|
|
constructor(
|
|
|
- private readonly prisma: MysqlPrismaService,
|
|
|
|
|
|
|
+ private readonly mysqlPrisma: MysqlPrismaService, // for CacheSyncAction (MySQL)
|
|
|
|
|
+ private readonly mongoPrisma: MongoPrismaService, // for Channel, Category, Ads, VideoMedia (Mongo)
|
|
|
private readonly redis: RedisService,
|
|
private readonly redis: RedisService,
|
|
|
) {}
|
|
) {}
|
|
|
|
|
|
|
@@ -27,24 +29,19 @@ export class CacheSyncService {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Core generic scheduler.
|
|
* Core generic scheduler.
|
|
|
- * Domain code should call this via convenience helpers,
|
|
|
|
|
- * but it's reusable for any entityType/operation.
|
|
|
|
|
*/
|
|
*/
|
|
|
async scheduleAction(params: {
|
|
async scheduleAction(params: {
|
|
|
entityType: CacheEntityType;
|
|
entityType: CacheEntityType;
|
|
|
operation: CacheOperation;
|
|
operation: CacheOperation;
|
|
|
entityId?: bigint | number | null;
|
|
entityId?: bigint | number | null;
|
|
|
- payload?: CachePayload | Prisma.JsonValue | null;
|
|
|
|
|
|
|
+ payload?: CachePayload | MysqlPrisma.JsonValue | null;
|
|
|
delayMs?: number; // optional backoff for first attempt
|
|
delayMs?: number; // optional backoff for first attempt
|
|
|
}): Promise<void> {
|
|
}): Promise<void> {
|
|
|
const { entityType, operation, entityId, payload, delayMs } = params;
|
|
const { entityType, operation, entityId, payload, delayMs } = params;
|
|
|
const now = this.nowBigInt();
|
|
const now = this.nowBigInt();
|
|
|
- const nextAttemptAt =
|
|
|
|
|
- delayMs && delayMs > 0
|
|
|
|
|
- ? now + BigInt(delayMs) // schedule in the future
|
|
|
|
|
- : now;
|
|
|
|
|
|
|
+ const nextAttemptAt = delayMs && delayMs > 0 ? now + BigInt(delayMs) : now;
|
|
|
|
|
|
|
|
- await this.prisma.cacheSyncAction.create({
|
|
|
|
|
|
|
+ await this.mysqlPrisma.cacheSyncAction.create({
|
|
|
data: {
|
|
data: {
|
|
|
entityType,
|
|
entityType,
|
|
|
operation,
|
|
operation,
|
|
@@ -52,7 +49,7 @@ export class CacheSyncService {
|
|
|
status: CacheStatus.PENDING,
|
|
status: CacheStatus.PENDING,
|
|
|
attempts: 0,
|
|
attempts: 0,
|
|
|
nextAttemptAt,
|
|
nextAttemptAt,
|
|
|
- payload: (payload ?? null) as Prisma.JsonValue,
|
|
|
|
|
|
|
+ payload: (payload ?? null) as MysqlPrisma.JsonValue,
|
|
|
createdAt: now,
|
|
createdAt: now,
|
|
|
updatedAt: now,
|
|
updatedAt: now,
|
|
|
},
|
|
},
|
|
@@ -63,7 +60,7 @@ export class CacheSyncService {
|
|
|
);
|
|
);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- // Convenience helpers — you can add more as needed
|
|
|
|
|
|
|
+ // Convenience helpers — used by mgnt services or debug controller
|
|
|
|
|
|
|
|
async scheduleChannelRefreshAll(): Promise<void> {
|
|
async scheduleChannelRefreshAll(): Promise<void> {
|
|
|
await this.scheduleAction({
|
|
await this.scheduleAction({
|
|
@@ -101,12 +98,12 @@ export class CacheSyncService {
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* Minimal processing loop (single batch).
|
|
* Minimal processing loop (single batch).
|
|
|
- * Later you can move this into a @Cron job using @nestjs/schedule.
|
|
|
|
|
|
|
+ * Later you can move this into a @Cron job.
|
|
|
*/
|
|
*/
|
|
|
async processPendingOnce(limit = 20): Promise<void> {
|
|
async processPendingOnce(limit = 20): Promise<void> {
|
|
|
const now = this.nowBigInt();
|
|
const now = this.nowBigInt();
|
|
|
|
|
|
|
|
- const actions = await this.prisma.cacheSyncAction.findMany({
|
|
|
|
|
|
|
+ const actions = await this.mysqlPrisma.cacheSyncAction.findMany({
|
|
|
where: {
|
|
where: {
|
|
|
status: CacheStatus.PENDING,
|
|
status: CacheStatus.PENDING,
|
|
|
nextAttemptAt: {
|
|
nextAttemptAt: {
|
|
@@ -134,9 +131,9 @@ export class CacheSyncService {
|
|
|
|
|
|
|
|
const attempts = action.attempts + 1;
|
|
const attempts = action.attempts + 1;
|
|
|
const maxAttempts = 5;
|
|
const maxAttempts = 5;
|
|
|
- const backoffMs = Math.min(60000, 5000 * attempts); // simple backoff up to 60s
|
|
|
|
|
|
|
+ const backoffMs = Math.min(60000, 5000 * attempts); // up to 60s
|
|
|
|
|
|
|
|
- await this.prisma.cacheSyncAction.update({
|
|
|
|
|
|
|
+ await this.mysqlPrisma.cacheSyncAction.update({
|
|
|
where: { id: action.id },
|
|
where: { id: action.id },
|
|
|
data: {
|
|
data: {
|
|
|
status:
|
|
status:
|
|
@@ -154,45 +151,40 @@ export class CacheSyncService {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
- * For now this just demonstrates Redis usage + marks SUCCESS.
|
|
|
|
|
- * Later you'll plug in real logic (channels/categories/ads/videos).
|
|
|
|
|
|
|
+ * Main dispatcher: decide what to do for each action.
|
|
|
*/
|
|
*/
|
|
|
- private async handleSingleAction(
|
|
|
|
|
- action: Prisma.CacheSyncActionGetPayload<unknown>, // loose type; you can refine
|
|
|
|
|
- ): Promise<void> {
|
|
|
|
|
- // TODO: replace this with real logic per entityType/operation
|
|
|
|
|
- // e.g., rebuild channels:all, ads pools, etc.
|
|
|
|
|
-
|
|
|
|
|
- // Example: write a simple trace into Redis so you can see it's working.
|
|
|
|
|
- const redisKey = `mgnt:cache-sync:last-processed:${action.id}`;
|
|
|
|
|
- await this.redis.setJson(
|
|
|
|
|
- redisKey,
|
|
|
|
|
- {
|
|
|
|
|
- id: action.id,
|
|
|
|
|
- entityType: action.entityType,
|
|
|
|
|
- operation: action.operation,
|
|
|
|
|
- processedAt: Date.now(),
|
|
|
|
|
- },
|
|
|
|
|
- 3600,
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ private async handleSingleAction(action: CacheSyncAction): Promise<void> {
|
|
|
|
|
+ // 1) Dispatch based on entityType + operation
|
|
|
|
|
+ switch (action.entityType as CacheEntityType) {
|
|
|
|
|
+ case CacheEntityType.CHANNEL:
|
|
|
|
|
+ await this.handleChannelAction(action);
|
|
|
|
|
+ break;
|
|
|
|
|
+
|
|
|
|
|
+ case CacheEntityType.CATEGORY:
|
|
|
|
|
+ await this.handleCategoryAction(action);
|
|
|
|
|
+ break;
|
|
|
|
|
+
|
|
|
|
|
+ case CacheEntityType.AD:
|
|
|
|
|
+ await this.handleAdAction(action);
|
|
|
|
|
+ break;
|
|
|
|
|
+
|
|
|
|
|
+ case CacheEntityType.AD_POOL:
|
|
|
|
|
+ await this.handleAdPoolAction(action);
|
|
|
|
|
+ break;
|
|
|
|
|
+
|
|
|
|
|
+ case CacheEntityType.VIDEO_LIST:
|
|
|
|
|
+ await this.handleVideoListAction(action);
|
|
|
|
|
+ break;
|
|
|
|
|
|
|
|
- // Here is where you'd branch by entityType/operation, e.g.:
|
|
|
|
|
- //
|
|
|
|
|
- // switch (action.entityType) {
|
|
|
|
|
- // case CacheEntityType.CHANNEL:
|
|
|
|
|
- // if (action.operation === CacheOperation.REFRESH_ALL) {
|
|
|
|
|
- // await this.rebuildChannelsAll();
|
|
|
|
|
- // }
|
|
|
|
|
- // break;
|
|
|
|
|
- // case CacheEntityType.AD:
|
|
|
|
|
- // if (action.operation === CacheOperation.REFRESH) {
|
|
|
|
|
- // await this.refreshAdById(action.entityId!);
|
|
|
|
|
- // }
|
|
|
|
|
- // break;
|
|
|
|
|
- // // ...
|
|
|
|
|
- // }
|
|
|
|
|
-
|
|
|
|
|
- await this.prisma.cacheSyncAction.update({
|
|
|
|
|
|
|
+ default:
|
|
|
|
|
+ this.logger.warn(
|
|
|
|
|
+ `Unknown entityType for CacheSyncAction id=${action.id}: ${action.entityType}`,
|
|
|
|
|
+ );
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // 2) If we get here without throwing, mark SUCCESS
|
|
|
|
|
+ await this.mysqlPrisma.cacheSyncAction.update({
|
|
|
where: { id: action.id },
|
|
where: { id: action.id },
|
|
|
data: {
|
|
data: {
|
|
|
status: CacheStatus.SUCCESS,
|
|
status: CacheStatus.SUCCESS,
|
|
@@ -206,4 +198,111 @@ export class CacheSyncService {
|
|
|
`Processed CacheSyncAction id=${action.id}, entityType=${action.entityType}, operation=${action.operation}`,
|
|
`Processed CacheSyncAction id=${action.id}, entityType=${action.entityType}, operation=${action.operation}`,
|
|
|
);
|
|
);
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
+ // CHANNELS
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ private async handleChannelAction(action: CacheSyncAction): Promise<void> {
|
|
|
|
|
+ switch (action.operation as CacheOperation) {
|
|
|
|
|
+ case CacheOperation.REFRESH_ALL:
|
|
|
|
|
+ await this.rebuildChannelsAll();
|
|
|
|
|
+ break;
|
|
|
|
|
+ default:
|
|
|
|
|
+ this.logger.warn(
|
|
|
|
|
+ `Unsupported CHANNEL operation for action id=${action.id}: ${action.operation}`,
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private async rebuildChannelsAll(): Promise<void> {
|
|
|
|
|
+ // TODO: adjust to your actual Mongo Prisma model name & fields
|
|
|
|
|
+ const channels = await this.mongoPrisma.channel.findMany({
|
|
|
|
|
+ where: {
|
|
|
|
|
+ // e.g. only active / not deleted; adjust if needed
|
|
|
|
|
+ // isDeleted: false,
|
|
|
|
|
+ },
|
|
|
|
|
+ orderBy: {
|
|
|
|
|
+ // adjust to your schema; example:
|
|
|
|
|
+ // sortOrder: 'asc',
|
|
|
|
|
+ // createdAt: 'asc',
|
|
|
|
|
+ id: 'asc',
|
|
|
|
|
+ },
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // Store full list; app-api can consume as needed
|
|
|
|
|
+ await this.redis.setJson('channels:all', channels);
|
|
|
|
|
+
|
|
|
|
|
+ this.logger.log(`Rebuilt channels:all with ${channels.length} item(s).`);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
+ // CATEGORIES
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ private async handleCategoryAction(action: CacheSyncAction): Promise<void> {
|
|
|
|
|
+ switch (action.operation as CacheOperation) {
|
|
|
|
|
+ case CacheOperation.REFRESH_ALL:
|
|
|
|
|
+ await this.rebuildCategoriesAll();
|
|
|
|
|
+ break;
|
|
|
|
|
+ default:
|
|
|
|
|
+ this.logger.warn(
|
|
|
|
|
+ `Unsupported CATEGORY operation for action id=${action.id}: ${action.operation}`,
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private async rebuildCategoriesAll(): Promise<void> {
|
|
|
|
|
+ // TODO: adjust to your actual Mongo Prisma model name & fields
|
|
|
|
|
+ const categories = await this.mongoPrisma.category.findMany({
|
|
|
|
|
+ where: {
|
|
|
|
|
+ // e.g. only active / not deleted; adjust if needed
|
|
|
|
|
+ // isDeleted: false,
|
|
|
|
|
+ },
|
|
|
|
|
+ orderBy: {
|
|
|
|
|
+ // adjust to your schema
|
|
|
|
|
+ // sortOrder: 'asc',
|
|
|
|
|
+ // createdAt: 'asc',
|
|
|
|
|
+ id: 'asc',
|
|
|
|
|
+ },
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ await this.redis.setJson('categories:all', categories);
|
|
|
|
|
+
|
|
|
|
|
+ this.logger.log(
|
|
|
|
|
+ `Rebuilt categories:all with ${categories.length} item(s).`,
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
+ // ADS (placeholders for now)
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ private async handleAdAction(action: CacheSyncAction): Promise<void> {
|
|
|
|
|
+ // TODO: implement real ad-by-id refresh using this.mongoPrisma.ads & Redis
|
|
|
|
|
+ this.logger.debug(
|
|
|
|
|
+ `handleAdAction placeholder for id=${action.entityId}, operation=${action.operation}`,
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ private async handleAdPoolAction(action: CacheSyncAction): Promise<void> {
|
|
|
|
|
+ // const payload = action.payload as CachePayload | null;
|
|
|
|
|
+ // const adType = payload?.type;
|
|
|
|
|
+ // TODO: implement real pool rebuild logic
|
|
|
|
|
+ this.logger.debug(
|
|
|
|
|
+ `handleAdPoolAction placeholder, operation=${action.operation}`,
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
+ // VIDEO LISTS (placeholder)
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ private async handleVideoListAction(action: CacheSyncAction): Promise<void> {
|
|
|
|
|
+ // const payload = action.payload as CachePayload | null;
|
|
|
|
|
+ // TODO: implement lists rebuild for HOME / CHANNEL / TRENDING
|
|
|
|
|
+ this.logger.debug(
|
|
|
|
|
+ `handleVideoListAction placeholder, operation=${action.operation}`,
|
|
|
|
|
+ );
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|