|
|
@@ -1,9 +1,9 @@
|
|
|
+// apps/box-mgnt-api/src/cache-sync/cache-sync.service.ts
|
|
|
import { Injectable, Logger } from '@nestjs/common';
|
|
|
-import { Prisma as MysqlPrisma, CacheSyncAction } from '@prisma/mysql/client'; // adjust if your generated types are in a different package
|
|
|
-import { RedisService } from '../redis/redis.service';
|
|
|
-// ⬇️ Adjust this import to your actual db lib (e.g., '@box/db')
|
|
|
-import { MysqlPrismaService } from '@box/db/prisma/mysql-prisma.service'; // TODO: change to your real path
|
|
|
-import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service'; // TODO: change to your real path
|
|
|
+import { Prisma as MysqlPrisma, CacheSyncAction } from '@prisma/mysql/client';
|
|
|
+import { MysqlPrismaService } from '@box/db/prisma/mysql-prisma.service';
|
|
|
+import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
|
|
|
+import { RedisService } from '@box/db/redis/redis.service';
|
|
|
|
|
|
import {
|
|
|
CacheEntityType,
|
|
|
@@ -17,8 +17,11 @@ export class CacheSyncService {
|
|
|
private readonly logger = new Logger(CacheSyncService.name);
|
|
|
|
|
|
constructor(
|
|
|
- private readonly mysqlPrisma: MysqlPrismaService, // for CacheSyncAction (MySQL)
|
|
|
- private readonly mongoPrisma: MongoPrismaService, // for Channel, Category, Ads, VideoMedia (Mongo)
|
|
|
+ // MySQL: durable queue of actions
|
|
|
+ private readonly mysqlPrisma: MysqlPrismaService,
|
|
|
+ // MongoDB: actual content sources (channels, categories, ads, videos)
|
|
|
+ private readonly mongoPrisma: MongoPrismaService,
|
|
|
+ // Redis: cache store consumed by box-app-api
|
|
|
private readonly redis: RedisService,
|
|
|
) {}
|
|
|
|
|
|
@@ -60,7 +63,9 @@ export class CacheSyncService {
|
|
|
);
|
|
|
}
|
|
|
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
// Convenience helpers — used by mgnt services or debug controller
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
|
|
|
async scheduleChannelRefreshAll(): Promise<void> {
|
|
|
await this.scheduleAction({
|
|
|
@@ -96,10 +101,11 @@ export class CacheSyncService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Minimal processing loop (single batch).
|
|
|
- * Later you can move this into a @Cron job.
|
|
|
- */
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
+ // Minimal processing loop (single batch).
|
|
|
+ // Later you can move this into a @Cron job.
|
|
|
+ // ─────────────────────────────────────────────
|
|
|
+
|
|
|
async processPendingOnce(limit = 20): Promise<void> {
|
|
|
const now = this.nowBigInt();
|
|
|
|
|
|
@@ -124,9 +130,12 @@ export class CacheSyncService {
|
|
|
for (const action of actions) {
|
|
|
try {
|
|
|
await this.handleSingleAction(action);
|
|
|
- } catch (err: any) {
|
|
|
+ } catch (err: unknown) {
|
|
|
+ const message =
|
|
|
+ err instanceof Error ? err.message : String(err ?? 'Unknown error');
|
|
|
+
|
|
|
this.logger.error(
|
|
|
- `Error processing CacheSyncAction id=${action.id}: ${err?.message ?? err}`,
|
|
|
+ `Error processing CacheSyncAction id=${action.id}: ${message}`,
|
|
|
);
|
|
|
|
|
|
const attempts = action.attempts + 1;
|
|
|
@@ -141,7 +150,7 @@ export class CacheSyncService {
|
|
|
? CacheStatus.GAVE_UP
|
|
|
: CacheStatus.PENDING,
|
|
|
attempts,
|
|
|
- lastError: err?.message ?? String(err),
|
|
|
+ lastError: message,
|
|
|
nextAttemptAt: this.nowBigInt() + BigInt(backoffMs),
|
|
|
updatedAt: this.nowBigInt(),
|
|
|
},
|
|
|
@@ -154,7 +163,6 @@ export class CacheSyncService {
|
|
|
* Main dispatcher: decide what to do for each action.
|
|
|
*/
|
|
|
private async handleSingleAction(action: CacheSyncAction): Promise<void> {
|
|
|
- // 1) Dispatch based on entityType + operation
|
|
|
switch (action.entityType as CacheEntityType) {
|
|
|
case CacheEntityType.CHANNEL:
|
|
|
await this.handleChannelAction(action);
|
|
|
@@ -183,7 +191,6 @@ export class CacheSyncService {
|
|
|
break;
|
|
|
}
|
|
|
|
|
|
- // 2) If we get here without throwing, mark SUCCESS
|
|
|
await this.mysqlPrisma.cacheSyncAction.update({
|
|
|
where: { id: action.id },
|
|
|
data: {
|
|
|
@@ -216,21 +223,21 @@ export class CacheSyncService {
|
|
|
}
|
|
|
|
|
|
private async rebuildChannelsAll(): Promise<void> {
|
|
|
- // TODO: adjust to your actual Mongo Prisma model name & fields
|
|
|
const channels = await this.mongoPrisma.channel.findMany({
|
|
|
where: {
|
|
|
// e.g. only active / not deleted; adjust if needed
|
|
|
// isDeleted: false,
|
|
|
},
|
|
|
orderBy: {
|
|
|
- // adjust to your schema; example:
|
|
|
+ // adjust to your schema
|
|
|
// sortOrder: 'asc',
|
|
|
// createdAt: 'asc',
|
|
|
id: 'asc',
|
|
|
},
|
|
|
});
|
|
|
|
|
|
- // Store full list; app-api can consume as needed
|
|
|
+ // NOTE:
|
|
|
+ // Actual Redis key will be "box:channels:all" if REDIS_KEY_PREFIX="box:".
|
|
|
await this.redis.setJson('channels:all', channels);
|
|
|
|
|
|
this.logger.log(`Rebuilt channels:all with ${channels.length} item(s).`);
|
|
|
@@ -253,7 +260,6 @@ export class CacheSyncService {
|
|
|
}
|
|
|
|
|
|
private async rebuildCategoriesAll(): Promise<void> {
|
|
|
- // TODO: adjust to your actual Mongo Prisma model name & fields
|
|
|
const categories = await this.mongoPrisma.category.findMany({
|
|
|
where: {
|
|
|
// e.g. only active / not deleted; adjust if needed
|
|
|
@@ -279,7 +285,7 @@ export class CacheSyncService {
|
|
|
// ─────────────────────────────────────────────
|
|
|
|
|
|
private async handleAdAction(action: CacheSyncAction): Promise<void> {
|
|
|
- // TODO: implement real ad-by-id refresh using this.mongoPrisma.ads & Redis
|
|
|
+ // TODO: implement real ad-by-id refresh using this.mongoPrisma.ad & Redis
|
|
|
this.logger.debug(
|
|
|
`handleAdAction placeholder for id=${action.entityId}, operation=${action.operation}`,
|
|
|
);
|