ソースを参照

feat(homepage): add endpoint to retrieve tag list from Redis cache
feat(sync-videomedia): enhance second tags upsert logic with error handling and concurrency

Dave 1 ヶ月 前
コミット
718d4a6fb1

+ 10 - 0
apps/box-app-api/src/feature/homepage/homepage.controller.ts

@@ -48,6 +48,16 @@ export class HomepageController {
     return this.homepageService.getCategoryList();
   }
 
+  @Get('tags')
+  @ApiOperation({
+    summary: '获取分类列表',
+    description:
+      '返回 Redis 中的完整分类缓存(box:app:category:all),按 seq 升序。',
+  })
+  async getTagList(): Promise<any> {
+    return this.homepageService.getTagList();
+  }
+
   // @Get('tags')
   // @ApiOperation({
   //   summary: '获取标签列表',

+ 86 - 71
apps/box-mgnt-api/src/mgnt-backend/feature/provider-video-sync/provider-video-sync.service.ts

@@ -766,85 +766,100 @@ export class ProviderVideoSyncService {
   private async upsertSecondTagsFromVideos_NoUniqueName(
     normalizedVideos: Array<{ secondTags?: string[] }>,
   ): Promise<UpsertTagsResult> {
-    const set = new Set<string>();
-
-    for (const v of normalizedVideos) {
-      const tags = v.secondTags ?? [];
-      for (const t of tags) {
-        if (typeof t !== 'string') continue;
-        const name = t.trim();
-        if (!name) continue;
-        set.add(name);
+    try {
+      const set = new Set<string>();
+
+      for (const v of normalizedVideos) {
+        const tags = v.secondTags ?? [];
+        for (const t of tags) {
+          if (typeof t !== 'string') continue;
+          const name = t.trim();
+          if (!name) continue;
+          set.add(name);
+        }
       }
-    }
-
-    const names = Array.from(set);
-    if (!names.length)
-      return { unique: 0, upserted: 0, skipped: 0, errors: [] };
-
-    // Concurrency limit to reduce race collisions and DB pressure
-    const CONCURRENCY = 20;
-    let idx = 0;
-
-    let upserted = 0;
-    let skipped = 0;
-    const errors: Array<{ name: string; error: string }> = [];
 
-    const worker = async () => {
-      while (true) {
-        const current = idx;
-        idx += 1;
-        if (current >= names.length) return;
-
-        const name = names[current];
-
-        try {
-          // 1) check existence by name (NOT unique)
-          const exists = await this.mongo.tag.findFirst({
-            where: { name },
-            select: { id: true },
-          });
-
-          if (exists?.id) {
-            // already exists
-            continue;
+      const names = Array.from(set);
+      if (!names.length)
+        return { unique: 0, upserted: 0, skipped: 0, errors: [] };
+
+      // Concurrency limit to reduce race collisions and DB pressure
+      const CONCURRENCY = 20;
+      let idx = 0;
+
+      let upserted = 0;
+      let skipped = 0;
+      const errors: Array<{ name: string; error: string }> = [];
+
+      const worker = async () => {
+        while (true) {
+          const current = idx;
+          idx += 1;
+          if (current >= names.length) return;
+
+          const name = names[current];
+
+          try {
+            // 1) check existence by name (NOT unique)
+            const exists = await this.mongo.tag.findFirst({
+              where: { name },
+              select: { id: true },
+            });
+
+            if (exists?.id) {
+              // already exists
+              continue;
+            }
+
+            // 2) create if not exists
+            await this.mongo.tag.create({
+              data: {
+                name,
+                // If your Tag schema requires seconds fields:
+                // createdAt: Math.floor(Date.now() / 1000),
+                // updatedAt: Math.floor(Date.now() / 1000),
+              },
+            });
+
+            upserted += 1;
+          } catch (e: any) {
+            // If another worker created it after our check, create may fail (duplicate on some index)
+            // We treat that as skipped (safe).
+            const msg = e?.message ?? 'Tag create failed';
+            skipped += 1;
+            errors.push({ name, error: msg });
           }
+        }
+      };
 
-          // 2) create if not exists
-          await this.mongo.tag.create({
-            data: {
-              name,
-              // If your Tag schema requires seconds fields:
-              // createdAt: Math.floor(Date.now() / 1000),
-              // updatedAt: Math.floor(Date.now() / 1000),
-            },
-          });
+      await Promise.all(Array.from({ length: CONCURRENCY }, () => worker()));
 
-          upserted += 1;
-        } catch (e: any) {
-          // If another worker created it after our check, create may fail (duplicate on some index)
-          // We treat that as skipped (safe).
-          const msg = e?.message ?? 'Tag create failed';
-          skipped += 1;
-          errors.push({ name, error: msg });
-        }
+      if (errors.length) {
+        this.logger.warn(
+          `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
+            errors.slice(0, 3),
+          )}`,
+        );
+      } else {
+        this.logger.log(
+          `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
+        );
       }
-    };
-
-    await Promise.all(Array.from({ length: CONCURRENCY }, () => worker()));
 
-    if (errors.length) {
-      this.logger.warn(
-        `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
-          errors.slice(0, 3),
-        )}`,
-      );
-    } else {
-      this.logger.log(
-        `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
+      return { unique: names.length, upserted, skipped, errors };
+    } catch (error: any) {
+      const message = error?.message ?? 'Unhandled tag upsert error';
+      const trace = error?.stack ?? undefined;
+      this.logger.error(
+        `[upsertSecondTagsFromVideos_NoUniqueName] ${message}`,
+        trace,
       );
+      return {
+        unique: 0,
+        upserted: 0,
+        skipped: 0,
+        errors: [{ name: 'global', error: message }],
+      };
     }
-
-    return { unique: names.length, upserted, skipped, errors };
   }
 }

+ 135 - 27
apps/box-mgnt-api/src/mgnt-backend/feature/sync-videomedia/sync-videomedia.service.ts

@@ -89,6 +89,17 @@ export class SyncVideomediaService {
     );
     this.logger.debug('[syncFromJson] First record sample:', normalized[0]);
 
+    const hasSecondTags = normalized.some(
+      (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
+    );
+
+    if (hasSecondTags) {
+      // this.logger.log(
+      //   `[syncFromJson] Extracted secondTags from ${normalized.length} records`,
+      // );
+      await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
+    }
+
     // Batch processing - try to create each record individually and catch duplicate errors
     const BATCH_SIZE = 100;
     let created = 0;
@@ -105,38 +116,31 @@ export class SyncVideomediaService {
       await Promise.all(
         batch.map(async (record) => {
           try {
-            // Try to create the record
+            const exists = await this.mongo.videoMedia.findUnique({
+              where: { id: record.id },
+              select: { id: true },
+            });
+
+            if (exists?.id) {
+              const { id, ...updateData } = record;
+              await this.mongo.videoMedia.update({
+                where: { id },
+                data: updateData,
+              });
+              updated++;
+              this.logger.debug(`[syncFromJson] Updated record: ${id}`);
+              return;
+            }
+
             await this.mongo.videoMedia.create({ data: record });
             created++;
             this.logger.debug(`[syncFromJson] Created record: ${record.id}`);
           } catch (error: any) {
-            this.logger.debug(
-              `[syncFromJson] Create failed for ${record.id}: ${error.code} ${error.message?.substring(0, 100)}`,
+            this.logger.error(
+              `[syncFromJson] Failed for ${record.id}: ${error.message}`,
             );
-            // If duplicate key error (code 11000), try to update
-            if (error.code === 11000 || error.message?.includes('duplicate')) {
-              try {
-                const { id, ...updateData } = record;
-                await this.mongo.videoMedia.update({
-                  where: { id },
-                  data: updateData,
-                });
-                updated++;
-                this.logger.debug(`[syncFromJson] Updated record: ${id}`);
-              } catch (updateError: any) {
-                this.logger.error(
-                  `[syncFromJson] Update failed for ${record.id}: ${updateError.message}`,
-                );
-                skipped++;
-                errors.push({ id: record.id, error: updateError.message });
-              }
-            } else {
-              this.logger.error(
-                `[syncFromJson] Skipped ${record.id}: ${error.message}`,
-              );
-              skipped++;
-              errors.push({ id: record.id, error: error.message });
-            }
+            skipped++;
+            errors.push({ id: record.id, error: error.message });
           }
         }),
       );
@@ -158,6 +162,110 @@ export class SyncVideomediaService {
     };
   }
 
+  private async upsertSecondTagsFromVideos_NoUniqueName(
+    normalizedVideos: Array<{ secondTags?: string[] }>,
+  ): Promise<any> {
+    try {
+      const set = new Set<string>();
+
+      for (const v of normalizedVideos) {
+        const tags = v.secondTags ?? [];
+        for (const t of tags) {
+          if (typeof t !== 'string') continue;
+          const name = t.trim();
+          if (!name) continue;
+          set.add(name);
+        }
+      }
+
+      // this.logger.log(
+      //   `[upsertSecondTagsFromVideos] secondTags found in: ${normalizedVideos}`,
+      // );
+
+      const names = Array.from(set);
+      if (!names.length)
+        return { unique: 0, upserted: 0, skipped: 0, errors: [] };
+
+      // Concurrency limit to reduce race collisions and DB pressure
+      const CONCURRENCY = 20;
+      let idx = 0;
+
+      let upserted = 0;
+      let skipped = 0;
+      const errors: Array<{ name: string; error: string }> = [];
+
+      const worker = async () => {
+        while (true) {
+          const current = idx;
+          idx += 1;
+          if (current >= names.length) return;
+
+          const name = names[current];
+
+          try {
+            // 1) check existence by name (NOT unique)
+            const exists = await this.mongo.tag.findFirst({
+              where: { name },
+              select: { id: true },
+            });
+
+            if (exists?.id) {
+              // already exists
+              continue;
+            }
+
+            // 2) create if not exists
+            await this.mongo.tag.create({
+              data: {
+                name,
+                // If your Tag schema requires seconds fields:
+                // createdAt: Math.floor(Date.now() / 1000),
+                // updatedAt: Math.floor(Date.now() / 1000),
+              },
+            });
+
+            upserted += 1;
+          } catch (e: any) {
+            // If another worker created it after our check, create may fail (duplicate on some index)
+            // We treat that as skipped (safe).
+            const msg = e?.message ?? 'Tag create failed';
+            skipped += 1;
+            errors.push({ name, error: msg });
+          }
+        }
+      };
+
+      await Promise.all(Array.from({ length: CONCURRENCY }, () => worker()));
+
+      if (errors.length) {
+        this.logger.warn(
+          `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
+            errors.slice(0, 3),
+          )}`,
+        );
+      } else {
+        this.logger.log(
+          `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
+        );
+      }
+
+      return { unique: names.length, upserted, skipped, errors };
+    } catch (error: any) {
+      const message = error?.message ?? 'Unhandled tag upsert error';
+      const trace = error?.stack ?? undefined;
+      this.logger.error(
+        `[upsertSecondTagsFromVideos_NoUniqueName] ${message}`,
+        trace,
+      );
+      return {
+        unique: 0,
+        upserted: 0,
+        skipped: 0,
+        errors: [{ name: 'global', error: message }],
+      };
+    }
+  }
+
   /**
    * Extracts the list of items from different possible JSON shapes.
    */