// provider-video-sync.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
import { firstValueFrom } from 'rxjs';
import { EntityType } from '@prisma/mongo/client';

export interface ProviderVideoSyncOptions {
  providerCode?: string;
  /**
   * Optional starting-page override.
   * - fullSync: when omitted, resumes from the page stored in SyncState (only if it
   *   was recorded with the same pageSize), otherwise starts at 1
   * - incremental: ignored; always pageNum=1
   */
  pageNum?: number;
  /**
   * Default 500, clamped to [1, 500].
   */
  pageSize?: number;
  /**
   * Provider search param.
   * - status: defaults to "Completed"
   * - updatedAt: ISO string filter "updated after"; for incremental runs it takes
   *   precedence over the stored cursor
   * Any other keys are forwarded to the provider unchanged.
   */
  param?: {
    status?: string;
    updatedAt?: string;
    [k: string]: any;
  };
  /**
   * fullSync:
   * - true: no param.updatedAt filter; resumes from the stored pageNum
   * - false: filter by updatedAt (param.updatedAt, else the stored cursor); pageNum forced to 1
   */
  fullSync?: boolean;
  /**
   * If true: ignore the stored cursor and start fresh.
   * - fullSync: pageNum from options.pageNum or 1
   * - incremental: updatedAtCursor from options.param.updatedAt (if provided)
   */
  resetState?: boolean;
  [key: string]: any;
}

export interface ProviderVideoSyncResult {
  imported: number;
  created: number; // Option B: always 0
  updated: number; // Option B: equals successful upserts
  skipped: number;
  /** At most the first 10 errors of the run. */
  errors?: Array<{ id?: string; error: string }>;
}

interface RawProviderVideo {
  id: string;
  srcId?: number;
  title?: string;
  checkSum?: string;
  type?: string;
  formatType?: number;
  contentType?: number;
  coverType?: number;
  coverImg?: string;
  coverImgNew?: string;
  videoTime?: number;
  publish?: string;
  country?: string;
  firstTag?: string;
  secondTags?: string[] | null;
  mediaSet?: string | null;
  preFileName?: string;
  status?: string;
  desc?: string;
  size?: number;
  bango?: string;
  actors?: string[] | null;
  studio?: string;
  addedTime: string;
  appids?: number[] | null;
  japanNames?: string[] | null;
  filename?: string;
  fieldNameFs?: string;
  ext?: string;
  taskId?: string;
  width?: number;
  height?: number;
  ratio?: number;
  frameRate?: string;
  syBitRate?: string;
  vidBitRate?: string;
  createdAt: string;
  updatedAt: string;
  proxyUpload?: number | null;
  isAdd?: boolean;
  retry?: number;
  notifySignal?: boolean;
  mergeRetry?: number;
  compressRetry?: number;
  segmentRetry?: number;
  linodeRetry?: number;
  failReason?: string;
  deleteDisk?: boolean;
  infoTsName?: string;
}

/**
 * Sync position. A Partial of this shape is persisted as JSON in SyncState.referId:
 * `updatedAtCursor` for incremental runs, `pageNum`/`pageSize` for resuming a full sync.
 */
type SyncCursor = {
  pageNum: number;
  pageSize: number;
  updatedAtCursor?: string;
};

type ProviderPagingInfo = {
  total?: number;
  totalPages?: number;
};

/** Request shape built by buildProviderBody (before wrapping for the wire). */
type ProviderRequestBody = {
  pageNum: number;
  pageSize: number;
  param: Record<string, any>;
};

type SyncError = { id?: string; error: string };

type UpsertOutcome = { ok: true } | { ok: false; error: SyncError };

type UpsertTagsResult = {
  unique: number;
  upserted: number;
  skipped: number;
  errors: Array<{ name: string; error: string }>;
};

/** Type guard used to drop `null` placeholders from mapped arrays. */
function isNotNull<T>(value: T | null): value is T {
  return value !== null;
}

@Injectable()
export class ProviderVideoSyncService {
  private readonly logger = new Logger(ProviderVideoSyncService.name);
  private lastSyncSummary: ProviderVideoSyncResult | null = null;

  private readonly PROVIDER_API_URL =
    'https://vm.rvakc.xyz/api/web/mediafile/search';
  private readonly DEFAULT_PROVIDER_CODE = 'RVAKC';
  private readonly MAX_PAGE_SIZE = 500;
  private readonly DEFAULT_PAGE_SIZE = 500;
  private readonly BATCH_SIZE = 100;
  private readonly BASELINE_PARTIAL_COUNT = 20000;
  private readonly REQUEST_TIMEOUT_MS = 30_000;
  private readonly MAX_REPORTED_ERRORS = 10;

  constructor(
    private readonly mongo: MongoPrismaService,
    private readonly httpService: HttpService,
  ) {}

  /**
   * Pull videos from the provider and upsert them into Mongo (videoMedia).
   *
   * - fullSync: pages through everything without an updatedAt filter and records the
   *   next page after each processed page, so an interrupted run can resume there.
   * - incremental: filters by updatedAt; once the provider returns an empty page the
   *   stored cursor advances to the largest updatedAt seen. When no cursor exists at
   *   all (first-ever run) a baseline partial import is attempted first.
   *
   * Provider and processing failures do not reject: they end the run and are reported
   * in the returned summary (also available via getLastSyncSummary()). Failures while
   * loading the stored cursor, before the run starts, do propagate.
   */
  async syncFromProvider(
    options: ProviderVideoSyncOptions = {},
  ): Promise<ProviderVideoSyncResult> {
    const providerCode = options.providerCode ?? this.DEFAULT_PROVIDER_CODE;
    const pageSize = this.clampInt(
      options.pageSize ?? this.DEFAULT_PAGE_SIZE,
      1,
      this.MAX_PAGE_SIZE,
    );
    const fullSync = options.fullSync ?? false;
    const resetState = options.resetState ?? false;
    const paramStatus = options.param?.status ?? 'Completed';
    const optionUpdatedAt = options.param?.updatedAt;

    // Only one entity exists in the enum for now
    const entity = EntityType.VIDEO;

    this.logger.log(
      `[syncFromProvider] Start provider=${providerCode} entity=${entity} fullSync=${fullSync} pageSize=${pageSize} resetState=${resetState}`,
    );

    // Load cursor from SyncState (or fresh if resetState)
    const { cursor: initialCursor, checkpointUpdatedAtCursor } =
      await this.loadCursor({
        entity,
        pageSize,
        resetState,
        overridePageNum: options.pageNum,
        optionUpdatedAt,
        fullSync,
      });

    // Counters (Option B: created always 0, updated counts successful upserts)
    let imported = 0;
    let updated = 0;
    let skipped = 0;
    const created = 0;
    const errors: SyncError[] = [];

    // Track max updatedAt seen (for incremental cursor advancement)
    let maxUpdatedAtSeen: Date | null = null;

    // Full sync resumes from the stored/overridden page; incremental always starts at 1
    let pageNum = fullSync ? initialCursor.pageNum : 1;

    // An explicit param.updatedAt wins over the stored checkpoint
    const effectiveUpdatedAtCursor = fullSync
      ? undefined
      : (optionUpdatedAt ?? checkpointUpdatedAtCursor);

    try {
      // First-ever incremental run: no cursor from options or from SyncState
      if (!fullSync && effectiveUpdatedAtCursor === undefined) {
        const baselineResult = await this.runBaselinePartialIfNeeded({
          entity,
          pageSize,
          paramStatus,
          optionsParam: options.param,
        });
        if (baselineResult) {
          this.lastSyncSummary = baselineResult;
          return baselineResult;
        }
      }

      while (true) {
        const body = this.buildProviderBody({
          pageNum,
          pageSize,
          status: paramStatus,
          // fullSync: effectiveUpdatedAtCursor is undefined, so no updatedAt filter
          updatedAt: effectiveUpdatedAtCursor,
          extraParam: options.param,
        });

        this.logger.log(
          `[syncFromProvider] POST pageNum=${pageNum} pageSize=${pageSize} status=${paramStatus} updatedAt=${fullSync ? '(none)' : (body.param.updatedAt ?? '(none)')}`,
        );

        const rawList = await this.fetchPage(body);

        if (!rawList.length) {
          this.logger.log(
            `[syncFromProvider] No more records (pageNum=${pageNum}). Stop.`,
          );

          // On completion:
          // - fullSync: stamp lastFullSyncAt and clear the stored resume page
          // - incremental: advance updatedAtCursor to maxUpdatedAtSeen
          if (!fullSync && maxUpdatedAtSeen && imported > 0) {
            await this.saveCheckpoint({
              entity,
              updatedAtCursor: maxUpdatedAtSeen.toISOString(),
              fullSyncCompleted: false,
            });
          }

          await this.saveCursor({
            entity,
            fullSyncCompleted: fullSync,
            resumeFrom: fullSync ? null : undefined,
          });

          const result: ProviderVideoSyncResult = {
            imported,
            created,
            updated,
            skipped,
            errors: errors.length
              ? errors.slice(0, this.MAX_REPORTED_ERRORS)
              : undefined,
          };
          this.lastSyncSummary = result;
          return result;
        }

        imported += rawList.length;

        const processed = await this.processProviderRawList(
          rawList,
          maxUpdatedAtSeen,
        );
        updated += processed.updated;
        skipped += processed.skipped;
        errors.push(...processed.errors);
        maxUpdatedAtSeen = processed.maxUpdatedAtSeen;

        // Persist progress so an interrupted full sync resumes at the next page
        await this.saveCursor({
          entity,
          fullSyncCompleted: false,
          resumeFrom: fullSync ? { pageNum: pageNum + 1, pageSize } : undefined,
        });

        pageNum += 1;
      }
    } catch (e: unknown) {
      const message = this.errorMessage(e);
      this.logger.error(
        `[syncFromProvider] Unexpected error: ${message ?? String(e)}`,
      );

      // Best-effort cursor persistence; a full sync retries the page that failed
      try {
        await this.saveCursor({
          entity,
          fullSyncCompleted: false,
          resumeFrom: fullSync ? { pageNum, pageSize } : undefined,
        });
      } catch (saveErr: unknown) {
        this.logger.error(
          `[syncFromProvider] Failed to persist cursor after error: ${this.errorMessage(saveErr) ?? String(saveErr)}`,
        );
      }

      const result: ProviderVideoSyncResult = {
        imported,
        created,
        updated,
        skipped,
        errors: [
          ...errors.slice(0, this.MAX_REPORTED_ERRORS - 1),
          { error: message ?? 'Unexpected error' },
        ],
      };
      this.lastSyncSummary = result;
      return result;
    }
  }

  /**
   * Baseline import for the first-ever incremental run. Walks pages from the provider's
   * last page backwards until about BASELINE_PARTIAL_COUNT records' worth of pages are
   * covered. Which records sit on those pages depends on the provider's ordering.
   * Afterwards the largest updatedAt seen is stored as the incremental cursor.
   *
   * @returns the run summary, or null when the provider does not disclose total/pages
   *   (the caller then falls back to the regular paging loop).
   * @throws when a provider request fails (handled by syncFromProvider).
   */
  private async runBaselinePartialIfNeeded(args: {
    entity: EntityType;
    pageSize: number;
    paramStatus: string;
    optionsParam?: ProviderVideoSyncOptions['param'];
  }): Promise<ProviderVideoSyncResult | null> {
    const probeBody = this.buildProviderBody({
      pageNum: 1,
      pageSize: args.pageSize,
      status: args.paramStatus,
      updatedAt: undefined,
      extraParam: args.optionsParam,
    });

    const pagination = await this.probeProviderForPaging(probeBody);
    let totalPages = pagination.totalPages;
    if (totalPages === undefined && pagination.total !== undefined) {
      totalPages = Math.max(0, Math.ceil(pagination.total / args.pageSize));
    }

    if (!totalPages || totalPages < 1) {
      this.logger.warn(
        '[syncFromProvider] Baseline partial skipped because provider did not disclose total/pages; cannot compute bottom→top range.',
      );
      return null;
    }

    // pageSize >= 1 and totalPages >= 1, so at least one page is always needed
    const pagesNeeded = Math.min(
      totalPages,
      Math.ceil(this.BASELINE_PARTIAL_COUNT / args.pageSize),
    );

    const startPage = totalPages;
    const endPage = Math.max(1, totalPages - pagesNeeded + 1);

    this.logger.log(
      `[syncFromProvider] Baseline partial (first-ever) running pages ${startPage} down to ${endPage}`,
    );

    let imported = 0;
    let updated = 0;
    let skipped = 0;
    const errors: SyncError[] = [];
    let maxUpdatedAtSeen: Date | null = null;

    for (let page = startPage; page >= endPage; page -= 1) {
      const body = this.buildProviderBody({
        pageNum: page,
        pageSize: args.pageSize,
        status: args.paramStatus,
        updatedAt: undefined,
        extraParam: args.optionsParam,
      });
      this.logger.log(`[syncFromProvider] param body ${JSON.stringify(body)} `);

      const rawList = await this.fetchPage(body);
      if (!rawList.length) {
        this.logger.log(
          `[syncFromProvider] Baseline partial page ${page} returned 0 records; continuing.`,
        );
        continue;
      }

      imported += rawList.length;
      const processed = await this.processProviderRawList(
        rawList,
        maxUpdatedAtSeen,
      );
      updated += processed.updated;
      skipped += processed.skipped;
      errors.push(...processed.errors);
      maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
    }

    if (maxUpdatedAtSeen && imported > 0) {
      await this.saveCheckpoint({
        entity: args.entity,
        updatedAtCursor: maxUpdatedAtSeen.toISOString(),
        fullSyncCompleted: false,
      });
    }

    return {
      imported,
      created: 0,
      updated,
      skipped,
      errors: errors.length
        ? errors.slice(0, this.MAX_REPORTED_ERRORS)
        : undefined,
    };
  }

  /**
   * Ask the provider for paging metadata (total records and/or total pages).
   * Uses the same wire format and error checking as fetchPage.
   *
   * @throws Error("Provider API error: …") on transport or provider-level errors.
   */
  private async probeProviderForPaging(
    body: ProviderRequestBody,
  ): Promise<ProviderPagingInfo> {
    try {
      const providerJson = await this.postToProvider(
        body,
        'probeProviderForPaging',
      );
      return {
        total: this.extractNumberFromPaths(providerJson, [
          'total',
          'data.total',
          'data.totalCount',
          'data.pageInfo.total',
          'data.pageInfo.totalCount',
        ]),
        totalPages: this.extractNumberFromPaths(providerJson, [
          'pages',
          'data.pages',
          'data.totalPages',
          'data.pageInfo.pages',
          'data.pageInfo.totalPages',
        ]),
      };
    } catch (error: unknown) {
      const message = this.errorMessage(error);
      this.logger.error(
        `[probeProviderForPaging] Provider API call failed: ${message ?? String(error)}`,
      );
      throw new Error(`Provider API error: ${message ?? 'unknown'}`);
    }
  }

  /**
   * Return the first finite number found at any of the dot-separated `paths` inside
   * `data` (numeric strings are converted with Number()), or undefined.
   */
  private extractNumberFromPaths(
    data: unknown,
    paths: string[],
  ): number | undefined {
    if (!data || typeof data !== 'object') return undefined;

    for (const path of paths) {
      let value: unknown = data;
      for (const key of path.split('.')) {
        value =
          value && typeof value === 'object'
            ? (value as Record<string, unknown>)[key]
            : undefined;
      }
      if (value === undefined || value === null) continue;
      const num = typeof value === 'number' ? value : Number(value);
      if (Number.isFinite(num)) return num;
    }
    return undefined;
  }

  getLastSyncSummary(): ProviderVideoSyncResult | null {
    return this.lastSyncSummary;
  }

  /**
   * Build the provider request:
   *   { pageNum, pageSize, param: { status, updatedAt?, ...extraParam } }
   * `status` and `updatedAt` always come from the explicit arguments; every other
   * key of `extraParam` is copied through unchanged.
   */
  private buildProviderBody(args: {
    pageNum: number;
    pageSize: number;
    status: string;
    updatedAt?: string;
    extraParam?: ProviderVideoSyncOptions['param'];
  }): ProviderRequestBody {
    const param: Record<string, any> = {
      status: args.status,
    };

    // Keep only if present (incremental)
    if (args.updatedAt) param.updatedAt = args.updatedAt;

    // Merge any extraParam fields, but status/updatedAt above remain authoritative
    if (args.extraParam && typeof args.extraParam === 'object') {
      for (const [k, v] of Object.entries(args.extraParam)) {
        if (k === 'status' || k === 'updatedAt') continue;
        param[k] = v;
      }
    }

    return {
      pageNum: args.pageNum,
      pageSize: args.pageSize,
      param,
    };
  }

  /**
   * POST a request to the provider and return its JSON payload.
   *
   * The provider expects `{ data: "<JSON string of the request>" }`. Sending the body
   * unwrapped earned a code=400 "Field=data" error expecting a string.
   *
   * @param label log prefix identifying the caller.
   * @throws Error when the HTTP call fails or the payload's `code` is not 200.
   */
  private async postToProvider(
    body: ProviderRequestBody,
    label: string,
  ): Promise<unknown> {
    const wrappedBody = {
      data: JSON.stringify({
        pageNum: body.pageNum,
        pageSize: body.pageSize,
        param: body.param,
      }),
    };

    const response = await firstValueFrom(
      this.httpService.post(this.PROVIDER_API_URL, wrappedBody, {
        headers: { 'Content-Type': 'application/json' },
        timeout: this.REQUEST_TIMEOUT_MS,
      }),
    );

    // Axios response unwrap: providerJson is the actual provider payload
    const providerJson: unknown = (response as any)?.data ?? response;

    // Log a small preview for debugging (avoid huge logs)
    this.logger.log(
      `[${label}] Provider response preview: ${JSON.stringify(
        providerJson,
      ).slice(0, 400)}...`,
    );

    // Fail fast on provider errors (prevents "successful" runs with empty lists)
    const envelope = providerJson as
      | { code?: unknown; msg?: unknown; tip?: unknown }
      | null
      | undefined;
    const code = envelope?.code;
    if (code !== 200) {
      const msg = envelope?.msg ?? 'unknown';
      const tip = envelope?.tip ?? '';
      throw new Error(
        `Provider error code=${code} msg=${msg}${tip ? ` tip=${tip}` : ''}`,
      );
    }

    return providerJson;
  }

  /**
   * Fetch one page of videos.
   *
   * @returns the page items (empty when the list is missing from the payload).
   * @throws Error("Provider API error: …") on transport or provider-level errors.
   */
  private async fetchPage(
    body: ProviderRequestBody,
  ): Promise<RawProviderVideo[]> {
    try {
      const providerJson = await this.postToProvider(body, 'fetchPage');
      const list = this.extractList(providerJson);
      this.logger.log(`[fetchPage] Received ${list.length} items`);
      return list;
    } catch (error: unknown) {
      const message = this.errorMessage(error);
      this.logger.error(
        `[fetchPage] Provider API call failed: ${message ?? String(error)}`,
      );
      throw new Error(`Provider API error: ${message ?? 'unknown'}`);
    }
  }

  /**
   * Normalize and upsert one page of provider items.
   *
   * Items that fail normalization are counted as skipped and reported instead of
   * aborting the page. Distinct secondTags are ensured in the Tag collection before
   * the videos are upserted in batches of BATCH_SIZE.
   *
   * @param currentMaxUpdatedAt running maximum carried over from earlier pages.
   * @returns per-page counters, errors, and the updated running maximum of updatedAt.
   */
  private async processProviderRawList(
    rawList: RawProviderVideo[],
    currentMaxUpdatedAt: Date | null,
  ): Promise<{
    updated: number;
    skipped: number;
    errors: SyncError[];
    maxUpdatedAtSeen: Date | null;
  }> {
    let updated = 0;
    let skipped = 0;
    const errors: SyncError[] = [];

    // One malformed record (missing/invalid dates, non-integer size for BigInt, …)
    // must not abort the whole page — and with it the whole run.
    const normalized = rawList
      .map((item) => {
        try {
          return this.normalizeItem(item);
        } catch (e: unknown) {
          skipped += 1;
          errors.push({
            id: item?.id,
            error: this.errorMessage(e) ?? 'Invalid provider item',
          });
          return null;
        }
      })
      .filter(isNotNull);

    if (!normalized.length) {
      return { updated, skipped, errors, maxUpdatedAtSeen: currentMaxUpdatedAt };
    }

    const hasSecondTags = normalized.some(
      (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
    );
    if (hasSecondTags) {
      await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
    }

    // NOTE(review): the maximum includes items whose upsert fails below, so the
    // incremental cursor can advance past them — confirm that is acceptable.
    let maxUpdatedAtSeen = currentMaxUpdatedAt;
    for (const n of normalized) {
      if (
        !maxUpdatedAtSeen ||
        n.updatedAt.getTime() > maxUpdatedAtSeen.getTime()
      ) {
        maxUpdatedAtSeen = n.updatedAt;
      }
    }

    for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
      const batch = normalized.slice(i, i + this.BATCH_SIZE);
      // eslint-disable-next-line no-await-in-loop
      const outcomes = await Promise.all(batch.map((r) => this.upsertOne(r)));

      for (const outcome of outcomes) {
        if (outcome.ok) {
          updated += 1;
        } else {
          skipped += 1;
          errors.push(
            (outcome as Extract<UpsertOutcome, { ok: false }>).error,
          );
        }
      }
    }

    return {
      updated,
      skipped,
      errors,
      maxUpdatedAtSeen,
    };
  }

  /**
   * Diagnostic helper: logs the top-level, `data` and `data.data` keys of a response.
   * Not called anywhere in this class; kept for ad-hoc debugging of payload shapes.
   */
  private debugRespShape(resp: unknown) {
    const r: any = resp as any;
    const keys = r && typeof r === 'object' ? Object.keys(r).slice(0, 12) : [];
    const dataKeys =
      r?.data && typeof r.data === 'object'
        ? Object.keys(r.data).slice(0, 12)
        : [];
    const dataDataKeys =
      r?.data?.data && typeof r.data.data === 'object'
        ? Object.keys(r.data.data).slice(0, 12)
        : [];

    this.logger.warn(
      `[debugRespShape] topKeys=${JSON.stringify(keys)} dataKeys=${JSON.stringify(
        dataKeys,
      )} dataDataKeys=${JSON.stringify(dataDataKeys)} hasStatus=${Boolean(r?.status)} hasCode=${Boolean(
        r?.code,
      )} hasDataCode=${Boolean(r?.data?.code)}`,
    );
  }

  /**
   * Locate the item array inside a provider payload, trying the known shapes in order;
   * logs a warning and returns [] when none match.
   */
  private extractList(apiResponse: unknown): RawProviderVideo[] {
    const data = apiResponse as any;

    if (Array.isArray(data)) return data as RawProviderVideo[];

    // axios response: { data: { code, data: { total, list } } }
    if (data?.data?.data?.list && Array.isArray(data.data.data.list)) {
      return data.data.data.list as RawProviderVideo[];
    }

    // provider json directly: { code, data: { total, list } }
    if (data?.data?.list && Array.isArray(data.data.list)) {
      return data.data.list as RawProviderVideo[];
    }

    if (data?.list && Array.isArray(data.list)) {
      return data.list as RawProviderVideo[];
    }

    if (data?.data?.records && Array.isArray(data.data.records)) {
      return data.data.records as RawProviderVideo[];
    }

    this.logger.warn(
      '[extractList] Unexpected API response structure, defaulting to empty list',
    );
    return [];
  }

  /**
   * Convert a raw provider item into the videoMedia record shape.
   * Missing optional fields get empty defaults ('' / 0 / false / []).
   *
   * @throws Error when `id` is missing, when addedTime/createdAt/updatedAt are missing
   *   or unparseable, or (RangeError from BigInt) when `size` is not an integer.
   */
  private normalizeItem(item: RawProviderVideo) {
    if (!item.id) throw new Error('Each item must have an id');
    if (!item.addedTime || !item.createdAt || !item.updatedAt) {
      throw new Error(`Item ${item.id} is missing required datetime fields`);
    }

    const addedTime = new Date(item.addedTime);
    const createdAt = new Date(item.createdAt);
    const updatedAt = new Date(item.updatedAt);

    if (
      isNaN(addedTime.getTime()) ||
      isNaN(createdAt.getTime()) ||
      isNaN(updatedAt.getTime())
    ) {
      throw new Error(`Item ${item.id} has invalid datetime format`);
    }

    return {
      id: item.id, // used as the videoMedia id in upsertOne (expected to be a Mongo ObjectId string)
      srcId: item.srcId ?? 0,
      title: item.title ?? '',
      checkSum: item.checkSum ?? '',
      type: item.type ?? '',
      formatType: item.formatType ?? 0,
      contentType: item.contentType ?? 0,
      coverType: item.coverType ?? 0,
      coverImg: item.coverImg ?? '',
      coverImgNew: item.coverImgNew ?? '',
      videoTime: item.videoTime ?? 0,
      publish: item.publish ?? '',
      country: item.country ?? '',
      firstTag: item.firstTag ?? '',
      secondTags: item.secondTags ?? [],
      mediaSet: item.mediaSet ?? '',
      preFileName: item.preFileName ?? '',
      status: item.status ?? '',
      desc: item.desc ?? '',
      size: BigInt(item.size ?? 0),
      bango: item.bango ?? '',
      actors: item.actors ?? [],
      studio: item.studio ?? '',
      addedTime,
      appids: item.appids ?? [],
      japanNames: item.japanNames ?? [],
      filename: item.filename ?? '',
      fieldNameFs: item.fieldNameFs ?? '',
      ext: item.ext ?? '',
      taskId: item.taskId ?? '',
      width: item.width ?? 0,
      height: item.height ?? 0,
      ratio: item.ratio ?? 0,
      frameRate: item.frameRate ?? '',
      syBitRate: item.syBitRate ?? '',
      vidBitRate: item.vidBitRate ?? '',
      proxyUpload: item.proxyUpload ?? 0,
      isAdd: item.isAdd ?? false,
      retry: item.retry ?? 0,
      notifySignal: item.notifySignal ?? false,
      mergeRetry: item.mergeRetry ?? 0,
      compressRetry: item.compressRetry ?? 0,
      segmentRetry: item.segmentRetry ?? 0,
      linodeRetry: item.linodeRetry ?? 0,
      failReason: item.failReason ?? '',
      deleteDisk: item.deleteDisk ?? false,
      infoTsName: item.infoTsName ?? '',
      createdAt,
      updatedAt,
    };
  }

  /**
   * Upsert one normalized record into videoMedia by id. Never throws; failures are
   * returned as `{ ok: false }` outcomes.
   */
  // `any` at the Prisma boundary: the record is passed straight to create/update.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  private async upsertOne(record: any): Promise<UpsertOutcome> {
    const id = record?.id as string | undefined;
    if (!id) return { ok: false, error: { error: 'Missing id' } };

    try {
      const { id: _, ...updateData } = record;
      await this.mongo.videoMedia.upsert({
        where: { id },
        create: record,
        update: updateData,
      });
      return { ok: true };
    } catch (e: unknown) {
      return {
        ok: false,
        error: { id, error: this.errorMessage(e) ?? 'Upsert failed' },
      };
    }
  }

  /**
   * Resolve the starting cursor for a run.
   *
   * The SyncState row is always ensured first (even with resetState), because
   * saveCursor/saveCheckpoint use `update`, which fails when the row does not exist.
   *
   * @returns the starting cursor plus the stored incremental checkpoint (undefined
   *   when resetState is set or nothing is stored).
   */
  private async loadCursor(args: {
    entity: EntityType;
    pageSize: number;
    resetState: boolean;
    overridePageNum?: number;
    optionUpdatedAt?: string;
    fullSync: boolean;
  }): Promise<{ cursor: SyncCursor; checkpointUpdatedAtCursor?: string }> {
    const stored = await this.loadCheckpoint(args.entity);

    const overridePageNum =
      args.overridePageNum === undefined
        ? undefined
        : this.clampInt(args.overridePageNum, 1, Number.MAX_SAFE_INTEGER);

    if (args.resetState) {
      return {
        cursor: {
          pageNum: overridePageNum ?? 1,
          pageSize: args.pageSize,
          updatedAtCursor: args.fullSync ? undefined : args.optionUpdatedAt,
        },
        checkpointUpdatedAtCursor: undefined,
      };
    }

    // A stored full-sync page is only meaningful for the page size it was recorded with
    const resumePageNum =
      args.fullSync &&
      stored.pageNum !== undefined &&
      stored.pageSize === args.pageSize
        ? stored.pageNum
        : undefined;

    const cursor: SyncCursor = {
      pageNum: overridePageNum ?? resumePageNum ?? 1,
      pageSize: args.pageSize,
      updatedAtCursor: args.fullSync
        ? undefined
        : (stored.updatedAtCursor ?? args.optionUpdatedAt),
    };

    return { cursor, checkpointUpdatedAtCursor: stored.updatedAtCursor };
  }

  /**
   * Record run bookkeeping (lastRunAt, lastFullSyncAt when completed).
   *
   * @param resumeFrom full-sync resume position: an object stores it, `null` clears
   *   it, `undefined` leaves the stored position untouched.
   */
  private async saveCursor(args: {
    entity: EntityType;
    fullSyncCompleted: boolean;
    resumeFrom?: { pageNum: number; pageSize: number } | null;
  }): Promise<void> {
    await this.writeSyncState({
      entity: args.entity,
      fullSyncCompleted: args.fullSyncCompleted,
      resumeFrom: args.resumeFrom,
    });
  }

  /**
   * Ensure the SyncState row for `entity` exists (touching its updatedAt) and return
   * the cursor fields stored in referId; empty when nothing valid is stored.
   */
  private async loadCheckpoint(entity: EntityType): Promise<Partial<SyncCursor>> {
    const nowSec = Math.floor(Date.now() / 1000);

    const state = await this.mongo.syncState.upsert({
      where: { entity },
      update: {
        updatedAt: nowSec,
      },
      create: {
        entity,
        referId: null,
        lastRunAt: null,
        lastFullSyncAt: null,
        createdAt: nowSec,
        updatedAt: nowSec,
      },
    });

    return this.safeParseCursor(state.referId) ?? {};
  }

  /**
   * Store the incremental updatedAt cursor (`null`/`undefined` clears it) and record the
   * run. Any stored full-sync resume position is preserved.
   */
  private async saveCheckpoint(args: {
    entity: EntityType;
    updatedAtCursor?: string | null;
    fullSyncCompleted: boolean;
  }): Promise<void> {
    await this.writeSyncState({
      entity: args.entity,
      fullSyncCompleted: args.fullSyncCompleted,
      updatedAtCursor: args.updatedAtCursor ?? null,
    });
  }

  /**
   * Single writer for the SyncState row.
   *
   * Cursor fields live together as JSON in referId, so they are read, merged and
   * rewritten here to keep an incremental checkpoint from erasing a full-sync resume
   * position (and vice versa). For every field `undefined` means "leave as stored"
   * and `null` means "clear". The row must already exist (loadCheckpoint ensures it).
   */
  private async writeSyncState(args: {
    entity: EntityType;
    fullSyncCompleted: boolean;
    updatedAtCursor?: string | null;
    resumeFrom?: { pageNum: number; pageSize: number } | null;
  }): Promise<void> {
    const now = new Date();
    const nowSec = Math.floor(now.getTime() / 1000);

    let referId: string | null | undefined;
    if (args.updatedAtCursor !== undefined || args.resumeFrom !== undefined) {
      const existing = await this.mongo.syncState.findUnique({
        where: { entity: args.entity },
        select: { referId: true },
      });
      const current = this.safeParseCursor(existing?.referId) ?? {};

      const next: Partial<SyncCursor> = {
        updatedAtCursor:
          args.updatedAtCursor === undefined
            ? current.updatedAtCursor
            : (args.updatedAtCursor ?? undefined),
        pageNum:
          args.resumeFrom === undefined
            ? current.pageNum
            : args.resumeFrom?.pageNum,
        pageSize:
          args.resumeFrom === undefined
            ? current.pageSize
            : args.resumeFrom?.pageSize,
      };

      // JSON.stringify drops undefined keys, so a cursor-only state is stored exactly
      // as before: {"updatedAtCursor":"…"}
      referId =
        next.updatedAtCursor !== undefined || next.pageNum !== undefined
          ? JSON.stringify(next)
          : null;
    }

    await this.mongo.syncState.update({
      where: { entity: args.entity },
      data: {
        ...(referId !== undefined ? { referId } : {}),
        lastRunAt: now,
        lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
        updatedAt: nowSec,
      },
    });
  }

  /**
   * Parse the cursor JSON stored in referId. Only well-typed fields are kept: a
   * non-empty string updatedAtCursor, and positive integer pageNum/pageSize. Returns
   * null for empty or unparseable input.
   */
  private safeParseCursor(
    raw: string | null | undefined,
  ): Partial<SyncCursor> | null {
    if (!raw) return null;

    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      return null;
    }
    if (!parsed || typeof parsed !== 'object') return null;

    const obj = parsed as Record<string, unknown>;
    const isPositiveInt = (v: unknown): v is number =>
      typeof v === 'number' && Number.isInteger(v) && v > 0;

    const cursor: Partial<SyncCursor> = {};
    const updatedAtCursor = obj.updatedAtCursor;
    if (typeof updatedAtCursor === 'string' && updatedAtCursor) {
      cursor.updatedAtCursor = updatedAtCursor;
    }
    const pageNum = obj.pageNum;
    if (isPositiveInt(pageNum)) cursor.pageNum = pageNum;
    const pageSize = obj.pageSize;
    if (isPositiveInt(pageSize)) cursor.pageSize = pageSize;
    return cursor;
  }

  /** Truncate `n` to an integer within [min, max]; non-finite input yields `min`. */
  private clampInt(n: number, min: number, max: number): number {
    const x = Number.isFinite(n) ? Math.trunc(n) : min;
    return Math.max(min, Math.min(max, x));
  }

  /** Best-effort message from an unknown thrown value (undefined when there is none). */
  private errorMessage(e: unknown): string | undefined {
    if (e instanceof Error) return e.message;
    if (typeof e === 'object' && e !== null && 'message' in e) {
      const { message } = e as { message: unknown };
      return message === undefined || message === null
        ? undefined
        : String(message);
    }
    return undefined;
  }

  /**
   * Collect secondTags from normalized videos and create the Tag rows that do not
   * exist yet.
   *
   * Tag.name is not a unique field (hence the `_NoUniqueName` suffix), so this looks a
   * name up with findFirst and then creates it, instead of a Prisma upsert (which
   * needs a unique field in `where`).
   * - Dedupes names in memory per call, trims whitespace, drops empty names
   * - Runs up to 20 lookup/create workers concurrently
   *
   * Never throws: per-name failures are counted as skipped, and unexpected failures are
   * logged and returned as a single 'global' error.
   *
   * NOTE(review): without a unique index, two sync runs executing at the same time can
   * still create duplicate tags.
   */
  private async upsertSecondTagsFromVideos_NoUniqueName(
    normalizedVideos: Array<{ secondTags?: string[] }>,
  ): Promise<UpsertTagsResult> {
    try {
      const uniqueNames = new Set<string>();
      for (const video of normalizedVideos) {
        for (const tag of video.secondTags ?? []) {
          if (typeof tag !== 'string') continue;
          const name = tag.trim();
          if (name) uniqueNames.add(name);
        }
      }

      const names = Array.from(uniqueNames);
      if (!names.length) {
        return { unique: 0, upserted: 0, skipped: 0, errors: [] };
      }

      // Concurrency limit to reduce race collisions and DB pressure
      const CONCURRENCY = 20;
      let nextIndex = 0;
      let upserted = 0;
      let skipped = 0;
      const errors: Array<{ name: string; error: string }> = [];

      const worker = async (): Promise<void> => {
        while (true) {
          // Claiming an index is synchronous, so each name is handled by exactly one worker
          const current = nextIndex;
          nextIndex += 1;
          const name = names[current];
          if (name === undefined) return;

          try {
            // 1) check existence by name (NOT unique)
            const exists = await this.mongo.tag.findFirst({
              where: { name },
              select: { id: true },
            });
            if (exists?.id) continue;

            // 2) create if not exists
            await this.mongo.tag.create({
              data: {
                name,
                // If your Tag schema requires seconds fields:
                // createdAt: Math.floor(Date.now() / 1000),
                // updatedAt: Math.floor(Date.now() / 1000),
              },
            });
            upserted += 1;
          } catch (e: unknown) {
            // If another writer created it after our check, create may fail (duplicate
            // on some index). We treat that as skipped (safe).
            skipped += 1;
            errors.push({
              name,
              error: this.errorMessage(e) ?? 'Tag create failed',
            });
          }
        }
      };

      await Promise.all(Array.from({ length: CONCURRENCY }, () => worker()));

      if (errors.length) {
        this.logger.warn(
          `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
            errors.slice(0, 3),
          )}`,
        );
      } else {
        this.logger.log(
          `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
        );
      }

      return { unique: names.length, upserted, skipped, errors };
    } catch (error: unknown) {
      const message =
        this.errorMessage(error) ?? 'Unhandled tag upsert error';
      const trace = error instanceof Error ? error.stack : undefined;
      this.logger.error(
        `[upsertSecondTagsFromVideos_NoUniqueName] ${message}`,
        trace,
      );
      return {
        unique: 0,
        upserted: 0,
        skipped: 0,
        errors: [{ name: 'global', error: message }],
      };
    }
  }
}