| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104 |
- // provider-video-sync.service.ts
- import { Injectable, Logger } from '@nestjs/common';
- import { HttpService } from '@nestjs/axios';
- import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
- import { firstValueFrom } from 'rxjs';
- import { EntityType } from '@prisma/mongo/client';
/**
 * Options accepted by `ProviderVideoSyncService.syncFromProvider`.
 * Every field is optional; unknown extra keys are tolerated by the index
 * signature.
 */
export interface ProviderVideoSyncOptions {
  /** Provider identifier. Currently only used in the start-of-run log line. */
  providerCode?: string;

  /**
   * First page to request for a full sync (defaults to 1).
   * Incremental runs ignore this and always start from page 1.
   * NOTE(review): the page number is not persisted in SyncState, so a full
   * sync does not resume automatically; pass `pageNum` to continue manually.
   */
  pageNum?: number;

  /**
   * Page size sent to the provider. Defaults to 500 and is clamped to 1..500.
   */
  pageSize?: number;

  /**
   * Provider search param.
   * - status: defaults to "Completed" when omitted
   * - updatedAt: ISO string filter "updated after"; on incremental runs it
   *   takes precedence over the stored checkpoint
   * Any other keys are forwarded to the provider unchanged.
   */
  param?: {
    status?: string;
    updatedAt?: string;
    [k: string]: any;
  };

  /**
   * fullSync:
   * - true: no `param.updatedAt` filter is sent; paging starts at `pageNum`
   * - false (default): incremental run filtered by the updatedAt cursor;
   *   pageNum forced to 1
   */
  fullSync?: boolean;

  /**
   * If true: ignore the stored checkpoint and start fresh.
   * - fullSync: pageNum from `options.pageNum` or 1
   * - incremental: updatedAt cursor from `options.param.updatedAt` (if provided)
   */
  resetState?: boolean;

  [key: string]: any;
}
/** Summary returned by a sync run. */
export interface ProviderVideoSyncResult {
  /** Number of raw records received from the provider. */
  imported: number;
  /** Option B: always 0 (records are upserted, so creates are not tracked). */
  created: number;
  /** Option B: equals the number of successful upserts. */
  updated: number;
  /** Number of records that could not be stored. */
  skipped: number;
  /** Sample of per-record failures (the service reports at most 10). */
  errors?: { id?: string; error: string }[];
}
/**
 * Shape of one record as returned by the provider search API.
 * Only `id` and the three timestamps are required; everything else is
 * optional and receives a default during normalization.
 */
interface RawProviderVideo {
  // --- identity ---
  id: string;
  srcId?: number;
  title?: string;
  checkSum?: string;

  // --- classification ---
  type?: string;
  formatType?: number;
  contentType?: number;

  // --- cover / presentation ---
  coverType?: number;
  coverImg?: string;
  coverImgNew?: string;
  videoTime?: number;
  publish?: string;
  country?: string;

  // --- tagging ---
  firstTag?: string;
  secondTags?: string[] | null;
  mediaSet?: string | null;

  // --- descriptive metadata ---
  preFileName?: string;
  status?: string;
  desc?: string;
  size?: number;
  bango?: string;
  actors?: string[] | null;
  studio?: string;
  /** Required; parsed into a Date during normalization. */
  addedTime: string;
  appids?: number[] | null;
  japanNames?: string[] | null;

  // --- file / encoding details ---
  filename?: string;
  fieldNameFs?: string;
  ext?: string;
  taskId?: string;
  width?: number;
  height?: number;
  ratio?: number;
  frameRate?: string;
  syBitRate?: string;
  vidBitRate?: string;

  // --- record timestamps (required; parsed into Dates) ---
  createdAt: string;
  updatedAt: string;

  // --- processing flags and retry counters ---
  proxyUpload?: number | null;
  isAdd?: boolean;
  retry?: number;
  notifySignal?: boolean;
  mergeRetry?: number;
  compressRetry?: number;
  segmentRetry?: number;
  linodeRetry?: number;
  failReason?: string;
  deleteDisk?: boolean;
  infoTsName?: string;
}
/** In-memory paging position for a sync run. */
type SyncCursor = {
  /** Page number to request. */
  pageNum: number;
  /** Page size to request. */
  pageSize: number;
  /** ISO timestamp used as the "updated after" filter on incremental runs. */
  updatedAtCursor?: string;
};
/** Paging totals reported by the provider; either value may be absent. */
type ProviderPagingInfo = {
  /** Total number of matching records. */
  total?: number;
  /** Total number of pages. */
  totalPages?: number;
};
/** Result of a single record upsert: success, or failure with error details. */
type UpsertOutcome =
  | { ok: true }
  | {
      ok: false;
      error: { id?: string; error: string };
    };
/** Outcome of syncing secondTags into the Tag collection. */
type UpsertTagsResult = {
  /** Number of distinct tag names found in the batch. */
  unique: number;
  /** Number of tags written. */
  upserted: number;
  /** Number of tags whose write failed. */
  skipped: number;
  /** Per-tag failure details. */
  errors: { name: string; error: string }[];
};
- @Injectable()
- export class ProviderVideoSyncService {
- private readonly logger = new Logger(ProviderVideoSyncService.name);
- private lastSyncSummary: ProviderVideoSyncResult | null = null;
- private readonly PROVIDER_API_URL =
- 'https://vm.rvakc.xyz/api/web/mediafile/search';
- private readonly DEFAULT_PROVIDER_CODE = 'RVAKC';
- private readonly MAX_PAGE_SIZE = 500;
- private readonly DEFAULT_PAGE_SIZE = 500;
- private readonly BATCH_SIZE = 100;
- private readonly BASELINE_PARTIAL_COUNT = 20000;
- constructor(
- private readonly mongo: MongoPrismaService,
- private readonly httpService: HttpService,
- ) {}
- async syncFromProvider(
- options: ProviderVideoSyncOptions = {},
- ): Promise<ProviderVideoSyncResult> {
- const providerCode = options.providerCode ?? this.DEFAULT_PROVIDER_CODE;
- const requestedPageSize = options.pageSize ?? this.DEFAULT_PAGE_SIZE;
- const pageSize = this.clampInt(requestedPageSize, 1, this.MAX_PAGE_SIZE);
- const fullSync = options.fullSync ?? false;
- const resetState = options.resetState ?? false;
- const paramStatus = options.param?.status ?? 'Completed';
- const optionUpdatedAt = options.param?.updatedAt;
- // Only one entity exists in your enum now
- const entity = EntityType.VIDEO;
- this.logger.log(
- `[syncFromProvider] Start provider=${providerCode} entity=${entity} fullSync=${fullSync} pageSize=${pageSize} resetState=${resetState}`,
- );
- // Load cursor from SyncState (or fresh if resetState)
- const { cursor: initialCursor, checkpointUpdatedAtCursor } =
- await this.loadCursor({
- entity,
- pageSize,
- resetState,
- overridePageNum: options.pageNum,
- optionUpdatedAt,
- fullSync,
- });
- // Counters (Option B: created always 0, updated counts successful upserts)
- let imported = 0;
- let updated = 0;
- let skipped = 0;
- const created = 0;
- const errors: Array<{ id?: string; error: string }> = [];
- // Track max updatedAt seen (for incremental cursor advancement)
- let maxUpdatedAtSeen: Date | null = null;
- // Full sync resumes with pageNum; incremental always starts at 1
- let pageNum = fullSync ? initialCursor.pageNum : 1;
- // Keep a working cursor that we will persist as we go
- const cursor: SyncCursor = {
- pageNum,
- pageSize: initialCursor.pageSize,
- updatedAtCursor: initialCursor.updatedAtCursor,
- };
- const effectiveUpdatedAtCursor = fullSync
- ? undefined
- : (options.param?.updatedAt ?? checkpointUpdatedAtCursor);
- const shouldRunBaselinePartial = !fullSync && !checkpointUpdatedAtCursor;
- try {
- if (shouldRunBaselinePartial) {
- const baselineResult = await this.runBaselinePartialIfNeeded({
- entity,
- cursor: initialCursor,
- paramStatus,
- optionsParam: options.param,
- });
- if (baselineResult) {
- this.lastSyncSummary = baselineResult;
- return baselineResult;
- }
- }
- while (true) {
- const body = this.buildProviderBody({
- pageNum,
- pageSize: cursor.pageSize,
- status: paramStatus,
- // fullSync: no updatedAt filter
- updatedAt: fullSync ? undefined : effectiveUpdatedAtCursor,
- extraParam: options.param,
- });
- this.logger.log(
- `[syncFromProvider] POST pageNum=${pageNum} pageSize=${cursor.pageSize} status=${paramStatus} updatedAt=${fullSync ? '(none)' : (body.param.updatedAt ?? '(none)')}`,
- );
- const rawList = await this.fetchPage(body);
- if (!rawList.length) {
- this.logger.log(
- `[syncFromProvider] No more records (pageNum=${pageNum}). Stop.`,
- );
- // On completion:
- // - fullSync: reset pageNum to 1 and set lastFullSyncAt
- // - incremental: advance updatedAtCursor to maxUpdatedAtSeen, keep pageNum=1
- const fullSyncCompleted = fullSync;
- if (!fullSync && maxUpdatedAtSeen && imported > 0) {
- await this.saveCheckpoint({
- entity,
- updatedAtCursor: maxUpdatedAtSeen.toISOString(),
- fullSyncCompleted: false,
- });
- }
- await this.saveCursor({
- entity,
- fullSyncCompleted,
- });
- const result: ProviderVideoSyncResult = {
- imported,
- created,
- updated,
- skipped,
- errors: errors.length ? errors.slice(0, 10) : undefined,
- };
- this.lastSyncSummary = result;
- return result;
- }
- imported += rawList.length;
- const processed = await this.processProviderRawList(
- rawList,
- maxUpdatedAtSeen,
- );
- updated += processed.updated;
- skipped += processed.skipped;
- errors.push(...processed.errors);
- maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
- // Persist progress so we can resume on crash
- await this.saveCursor({
- entity,
- fullSyncCompleted: false,
- });
- pageNum += 1;
- }
- } catch (e: any) {
- this.logger.error(
- `[syncFromProvider] Unexpected error: ${e?.message ?? e}`,
- );
- // Best-effort cursor persistence
- try {
- await this.saveCursor({
- entity,
- fullSyncCompleted: false,
- });
- } catch (saveErr: any) {
- this.logger.error(
- `[syncFromProvider] Failed to persist cursor after error: ${saveErr?.message ?? saveErr}`,
- );
- }
- const result: ProviderVideoSyncResult = {
- imported,
- created,
- updated,
- skipped,
- errors: [
- ...(errors.length ? errors.slice(0, 9) : []),
- { error: e?.message ?? 'Unexpected error' },
- ],
- };
- this.lastSyncSummary = result;
- return result;
- }
- }
- private async runBaselinePartialIfNeeded(args: {
- entity: EntityType;
- cursor: SyncCursor;
- paramStatus: string;
- optionsParam?: ProviderVideoSyncOptions['param'];
- }): Promise<ProviderVideoSyncResult | null> {
- if (!args.cursor || args.cursor.updatedAtCursor !== undefined) {
- return null;
- }
- const probeBody = this.buildProviderBody({
- pageNum: 1,
- pageSize: args.cursor.pageSize,
- status: args.paramStatus,
- updatedAt: undefined,
- extraParam: args.optionsParam,
- });
- const pagination = await this.probeProviderForPaging(probeBody);
- let totalPages = pagination.totalPages;
- if (totalPages === undefined && pagination.total !== undefined) {
- totalPages = Math.max(
- 0,
- Math.ceil(pagination.total / args.cursor.pageSize),
- );
- }
- if (!totalPages || totalPages < 1) {
- this.logger.warn(
- '[syncFromProvider] Baseline partial skipped because provider did not disclose total/pages; cannot compute bottom→top range.',
- );
- return null;
- }
- const pagesNeeded = Math.min(
- totalPages,
- Math.ceil(this.BASELINE_PARTIAL_COUNT / args.cursor.pageSize),
- );
- if (pagesNeeded <= 0) {
- return null;
- }
- const startPage = totalPages;
- const endPage = Math.max(1, totalPages - pagesNeeded + 1);
- this.logger.log(
- `[syncFromProvider] Baseline partial (first-ever) running pages ${startPage} down to ${endPage}`,
- );
- let imported = 0;
- let updated = 0;
- let skipped = 0;
- const errors: Array<{ id?: string; error: string }> = [];
- let maxUpdatedAtSeen: Date | null = null;
- for (let page = startPage; page >= endPage; page -= 1) {
- const body = this.buildProviderBody({
- pageNum: page,
- pageSize: args.cursor.pageSize,
- status: args.paramStatus,
- updatedAt: undefined,
- extraParam: args.optionsParam,
- });
- this.logger.log(`[syncFromProvider] param body ${JSON.stringify(body)} `);
- const rawList = await this.fetchPage(body);
- if (!rawList.length) {
- this.logger.log(
- `[syncFromProvider] Baseline partial page ${page} returned 0 records; continuing.`,
- );
- continue;
- }
- imported += rawList.length;
- const processed = await this.processProviderRawList(
- rawList,
- maxUpdatedAtSeen,
- );
- updated += processed.updated;
- skipped += processed.skipped;
- errors.push(...processed.errors);
- maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
- }
- if (maxUpdatedAtSeen && imported > 0) {
- await this.saveCheckpoint({
- entity: args.entity,
- updatedAtCursor: maxUpdatedAtSeen.toISOString(),
- fullSyncCompleted: false,
- });
- }
- return {
- imported,
- created: 0,
- updated,
- skipped,
- errors: errors.length ? errors.slice(0, 10) : undefined,
- };
- }
- private async probeProviderForPaging(body: {
- pageNum: number;
- pageSize: number;
- param: Record<string, any>;
- }): Promise<ProviderPagingInfo> {
- try {
- const response = await firstValueFrom(
- this.httpService.post(this.PROVIDER_API_URL, body, {
- headers: { 'Content-Type': 'application/json' },
- timeout: 30000,
- }),
- );
- return {
- total: this.extractNumberFromPaths(response.data, [
- 'total',
- 'data.total',
- 'data.totalCount',
- 'data.pageInfo.total',
- 'data.pageInfo.totalCount',
- ]),
- totalPages: this.extractNumberFromPaths(response.data, [
- 'pages',
- 'data.pages',
- 'data.totalPages',
- 'data.pageInfo.pages',
- 'data.pageInfo.totalPages',
- ]),
- };
- } catch (error: any) {
- this.logger.error(
- `[probeProviderForPaging] Provider API call failed: ${error?.message ?? error}`,
- );
- throw new Error(`Provider API error: ${error?.message ?? 'unknown'}`);
- }
- }
- private extractNumberFromPaths(
- data: any,
- paths: string[],
- ): number | undefined {
- if (!data || typeof data !== 'object') return undefined;
- for (const path of paths) {
- const value = path
- .split('.')
- .reduce<any>(
- (obj, key) => (obj && typeof obj === 'object' ? obj[key] : undefined),
- data,
- );
- if (value === undefined || value === null) continue;
- const num = typeof value === 'number' ? value : Number(value);
- if (Number.isFinite(num)) return num;
- }
- return undefined;
- }
- getLastSyncSummary(): ProviderVideoSyncResult | null {
- return this.lastSyncSummary;
- }
- private buildProviderBody(args: {
- pageNum: number;
- pageSize: number;
- status: string;
- updatedAt?: string;
- extraParam?: ProviderVideoSyncOptions['param'];
- }) {
- // Provider contract:
- // {
- // pageNum: 1,
- // pageSize: 200,
- // param: { status: "Completed", updatedAt: "ISO" }
- // }
- const param: Record<string, any> = {
- status: args.status,
- };
- // Keep only if present (incremental)
- if (args.updatedAt) param.updatedAt = args.updatedAt;
- // Merge any extraParam fields, but status/updatedAt above remain authoritative
- if (args.extraParam && typeof args.extraParam === 'object') {
- for (const [k, v] of Object.entries(args.extraParam)) {
- if (k === 'status' || k === 'updatedAt') continue;
- param[k] = v;
- }
- }
- return {
- pageNum: args.pageNum,
- pageSize: args.pageSize,
- param,
- };
- }
- private async fetchPage(body: {
- pageNum: number;
- pageSize: number;
- param: Record<string, any>;
- }): Promise<RawProviderVideo[]> {
- try {
- // Provider expects { data: "<json string>" } (based on code=400 Field=data expecting string)
- const wrappedBody = {
- data: JSON.stringify({
- pageNum: body.pageNum,
- pageSize: body.pageSize,
- param: body.param,
- }),
- };
- const response = await firstValueFrom(
- this.httpService.post(this.PROVIDER_API_URL, wrappedBody, {
- headers: { 'Content-Type': 'application/json' },
- timeout: 30_000,
- }),
- );
- // Axios response unwrap: providerJson is the actual provider payload
- const providerJson = (response as any)?.data ?? response;
- // Log a small preview for debugging (avoid huge logs)
- this.logger.log(
- `[fetchPage] Provider response preview: ${JSON.stringify(
- providerJson,
- ).slice(0, 400)}...`,
- );
- // Fail fast on provider errors (prevents "successful" runs with empty lists)
- const code = (providerJson as any)?.code;
- if (code !== 200) {
- const msg = (providerJson as any)?.msg ?? 'unknown';
- const tip = (providerJson as any)?.tip ?? '';
- throw new Error(
- `Provider error code=${code} msg=${msg}${tip ? ` tip=${tip}` : ''}`,
- );
- }
- const list = this.extractList(providerJson);
- this.logger.log(`[fetchPage] Received ${list.length} items`);
- return list;
- } catch (error: any) {
- this.logger.error(
- `[fetchPage] Provider API call failed: ${error?.message ?? error}`,
- );
- throw new Error(`Provider API error: ${error?.message ?? 'unknown'}`);
- }
- }
- private async processProviderRawList(
- rawList: RawProviderVideo[],
- currentMaxUpdatedAt: Date | null,
- ): Promise<{
- updated: number;
- skipped: number;
- errors: Array<{ id?: string; error: string }>;
- maxUpdatedAtSeen: Date | null;
- }> {
- if (!rawList.length) {
- return {
- updated: 0,
- skipped: 0,
- errors: [],
- maxUpdatedAtSeen: currentMaxUpdatedAt,
- };
- }
- const normalized = rawList.map((item) => this.normalizeItem(item));
- const hasSecondTags = normalized.some(
- (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
- );
- if (hasSecondTags) {
- await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
- }
- let maxUpdatedAtSeen = currentMaxUpdatedAt;
- for (const n of normalized) {
- const d = n.updatedAt as Date;
- if (!maxUpdatedAtSeen || d.getTime() > maxUpdatedAtSeen.getTime()) {
- maxUpdatedAtSeen = d;
- }
- }
- let updated = 0;
- let skipped = 0;
- const errors: Array<{ id?: string; error: string }> = [];
- for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
- const batch = normalized.slice(i, i + this.BATCH_SIZE);
- // eslint-disable-next-line no-await-in-loop
- const outcomes = await Promise.all(batch.map((r) => this.upsertOne(r)));
- const okCount = outcomes.filter((o) => o.ok).length;
- const fail = outcomes.filter((o) => !o.ok) as Array<
- Extract<UpsertOutcome, { ok: false }>
- >;
- updated += okCount;
- skipped += fail.length;
- for (const f of fail) errors.push(f.error);
- }
- return {
- updated,
- skipped,
- errors,
- maxUpdatedAtSeen,
- };
- }
- private debugRespShape(resp: unknown) {
- const r: any = resp as any;
- const keys = r && typeof r === 'object' ? Object.keys(r).slice(0, 12) : [];
- const dataKeys =
- r?.data && typeof r.data === 'object'
- ? Object.keys(r.data).slice(0, 12)
- : [];
- const dataDataKeys =
- r?.data?.data && typeof r.data.data === 'object'
- ? Object.keys(r.data.data).slice(0, 12)
- : [];
- this.logger.warn(
- `[debugRespShape] topKeys=${JSON.stringify(keys)} dataKeys=${JSON.stringify(
- dataKeys,
- )} dataDataKeys=${JSON.stringify(dataDataKeys)} hasStatus=${Boolean(r?.status)} hasCode=${Boolean(
- r?.code,
- )} hasDataCode=${Boolean(r?.data?.code)}`,
- );
- }
- private extractList(apiResponse: unknown): RawProviderVideo[] {
- const data = apiResponse as any;
- if (Array.isArray(data)) return data as RawProviderVideo[];
- // ✅ axios response: { data: { code, data: { total, list } } }
- if (data?.data?.data?.list && Array.isArray(data.data.data.list)) {
- return data.data.data.list as RawProviderVideo[];
- }
- // provider json directly: { code, data: { total, list } }
- if (data?.data?.list && Array.isArray(data.data.list)) {
- return data.data.list as RawProviderVideo[];
- }
- if (data?.list && Array.isArray(data.list)) {
- return data.list as RawProviderVideo[];
- }
- if (data?.data?.records && Array.isArray(data.data.records)) {
- return data.data.records as RawProviderVideo[];
- }
- this.logger.warn(
- '[extractList] Unexpected API response structure, defaulting to empty list',
- );
- return [];
- }
- private normalizeItem(item: RawProviderVideo) {
- if (!item.id) throw new Error('Each item must have an id');
- if (!item.addedTime || !item.createdAt || !item.updatedAt) {
- throw new Error(`Item ${item.id} is missing required datetime fields`);
- }
- const addedTime = new Date(item.addedTime);
- const createdAt = new Date(item.createdAt);
- const updatedAt = new Date(item.updatedAt);
- if (
- isNaN(addedTime.getTime()) ||
- isNaN(createdAt.getTime()) ||
- isNaN(updatedAt.getTime())
- ) {
- throw new Error(`Item ${item.id} has invalid datetime format`);
- }
- return {
- id: item.id, // confirmed Mongo ObjectId string
- srcId: item.srcId ?? 0,
- title: item.title ?? '',
- checkSum: item.checkSum ?? '',
- type: item.type ?? '',
- formatType: item.formatType ?? 0,
- contentType: item.contentType ?? 0,
- coverType: item.coverType ?? 0,
- coverImg: item.coverImg ?? '',
- coverImgNew: item.coverImgNew ?? '',
- videoTime: item.videoTime ?? 0,
- publish: item.publish ?? '',
- country: item.country ?? '',
- firstTag: item.firstTag ?? '',
- secondTags: item.secondTags ?? [],
- mediaSet: item.mediaSet ?? '',
- preFileName: item.preFileName ?? '',
- status: item.status ?? '',
- desc: item.desc ?? '',
- size: BigInt(item.size ?? 0),
- bango: item.bango ?? '',
- actors: item.actors ?? [],
- studio: item.studio ?? '',
- addedTime,
- appids: item.appids ?? [],
- japanNames: item.japanNames ?? [],
- filename: item.filename ?? '',
- fieldNameFs: item.fieldNameFs ?? '',
- ext: item.ext ?? '',
- taskId: item.taskId ?? '',
- width: item.width ?? 0,
- height: item.height ?? 0,
- ratio: item.ratio ?? 0,
- frameRate: item.frameRate ?? '',
- syBitRate: item.syBitRate ?? '',
- vidBitRate: item.vidBitRate ?? '',
- proxyUpload: item.proxyUpload ?? 0,
- isAdd: item.isAdd ?? false,
- retry: item.retry ?? 0,
- notifySignal: item.notifySignal ?? false,
- mergeRetry: item.mergeRetry ?? 0,
- compressRetry: item.compressRetry ?? 0,
- segmentRetry: item.segmentRetry ?? 0,
- linodeRetry: item.linodeRetry ?? 0,
- failReason: item.failReason ?? '',
- deleteDisk: item.deleteDisk ?? false,
- infoTsName: item.infoTsName ?? '',
- createdAt,
- updatedAt,
- };
- }
- private async upsertOne(record: any): Promise<UpsertOutcome> {
- const id = record?.id as string | undefined;
- if (!id) return { ok: false, error: { error: 'Missing id' } };
- try {
- const { id: _, ...updateData } = record;
- await this.mongo.videoMedia.upsert({
- where: { id },
- create: record,
- update: updateData,
- });
- return { ok: true };
- } catch (e: any) {
- return {
- ok: false,
- error: { id, error: e?.message ?? 'Upsert failed' },
- };
- }
- }
- private async loadCursor(args: {
- entity: EntityType;
- pageSize: number;
- resetState: boolean;
- overridePageNum?: number;
- optionUpdatedAt?: string;
- fullSync: boolean;
- }): Promise<{ cursor: SyncCursor; checkpointUpdatedAtCursor?: string }> {
- if (args.resetState) {
- return {
- cursor: {
- pageNum: args.overridePageNum ?? 1,
- pageSize: args.pageSize,
- updatedAtCursor: args.fullSync ? undefined : args.optionUpdatedAt,
- },
- checkpointUpdatedAtCursor: undefined,
- };
- }
- const checkpoint = await this.loadCheckpoint(args.entity);
- const cursor: SyncCursor = {
- pageNum: args.overridePageNum ?? 1,
- pageSize: args.pageSize,
- updatedAtCursor: args.fullSync
- ? undefined
- : (checkpoint.updatedAtCursor ?? args.optionUpdatedAt),
- };
- return { cursor, checkpointUpdatedAtCursor: checkpoint.updatedAtCursor };
- }
- private async saveCursor(args: {
- entity: EntityType;
- fullSyncCompleted: boolean;
- }) {
- const now = new Date();
- const nowSec = Math.floor(Date.now() / 1000);
- await this.mongo.syncState.update({
- where: { entity: args.entity },
- data: {
- lastRunAt: now,
- lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
- updatedAt: nowSec,
- },
- });
- }
- private async loadCheckpoint(entity: EntityType): Promise<{
- updatedAtCursor?: string;
- }> {
- const nowSec = Math.floor(Date.now() / 1000);
- const state = await this.mongo.syncState.upsert({
- where: { entity },
- update: {
- updatedAt: nowSec,
- },
- create: {
- entity,
- referId: null,
- lastRunAt: null,
- lastFullSyncAt: null,
- createdAt: nowSec,
- updatedAt: nowSec,
- },
- });
- const parsed = this.safeParseCursor(state.referId);
- return { updatedAtCursor: parsed?.updatedAtCursor };
- }
- private async saveCheckpoint(args: {
- entity: EntityType;
- updatedAtCursor?: string | null;
- fullSyncCompleted: boolean;
- }) {
- const now = new Date();
- const nowSec = Math.floor(Date.now() / 1000);
- await this.mongo.syncState.update({
- where: { entity: args.entity },
- data: {
- referId:
- args.updatedAtCursor !== undefined && args.updatedAtCursor !== null
- ? JSON.stringify({ updatedAtCursor: args.updatedAtCursor })
- : null,
- lastRunAt: now,
- lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
- updatedAt: nowSec,
- },
- });
- }
- private safeParseCursor(
- raw: string | null | undefined,
- ): Partial<SyncCursor> | null {
- if (!raw) return null;
- try {
- const parsed = JSON.parse(raw) as any;
- if (!parsed || typeof parsed !== 'object') return null;
- return parsed as Partial<SyncCursor>;
- } catch {
- return null;
- }
- }
- private clampInt(n: number, min: number, max: number): number {
- const x = Number.isFinite(n) ? Math.trunc(n) : min;
- return Math.max(min, Math.min(max, x));
- }
- /**
- * Extract secondTags (string[]) from normalized video records and upsert into Tag collection.
- * - Dedup in-memory per call for performance
- * - Trims whitespace, filters empty
- * - Option B performance-first: upsert without pre-check
- */
- // private async upsertSecondTagsFromVideos(
- // normalizedVideos: Array<{ secondTags?: string[] }>,
- // ): Promise<UpsertTagsResult> {
- // // 1) Collect + normalize
- // const set = new Set<string>();
- // for (const v of normalizedVideos) {
- // const tags = v.secondTags ?? [];
- // for (const t of tags) {
- // if (typeof t !== 'string') continue;
- // const name = t.trim();
- // if (!name) continue;
- // set.add(name);
- // }
- // }
- // const names = Array.from(set);
- // if (!names.length) {
- // return { unique: 0, upserted: 0, skipped: 0, errors: [] };
- // }
- // // 2) Upsert in chunks (avoid massive Promise.all)
- // const CHUNK = 200;
- // let upserted = 0;
- // let skipped = 0;
- // const errors: Array<{ name: string; error: string }> = [];
- // for (let i = 0; i < names.length; i += CHUNK) {
- // const batch = names.slice(i, i + CHUNK);
- // // eslint-disable-next-line no-await-in-loop
- // const outcomes = await Promise.all(
- // batch.map(async (name) => {
- // try {
- // // 🔁 Adjust `where/create/update` if your Tag schema differs
- // await this.mongo.tag.upsert({
- // where: { name },
- // create: {
- // name,
- // // If Tag requires createdAt/updatedAt ints (seconds), uncomment:
- // // createdAt: Math.floor(Date.now() / 1000),
- // // updatedAt: Math.floor(Date.now() / 1000),
- // },
- // update: {
- // // keep it minimal; optionally touch updatedAt
- // // updatedAt: Math.floor(Date.now() / 1000),
- // },
- // });
- // return { ok: true as const };
- // } catch (e: any) {
- // return {
- // ok: false as const,
- // error: e?.message ?? 'Tag upsert failed',
- // };
- // }
- // }),
- // );
- // for (let j = 0; j < outcomes.length; j += 1) {
- // const o = outcomes[j];
- // if (o.ok) upserted += 1;
- // else {
- // skipped += 1;
- // errors.push({ name: batch[j], error: o.error });
- // }
- // }
- // }
- // if (errors.length) {
- // this.logger.warn(
- // `[upsertSecondTagsFromVideos] tag upsert errors=${errors.length}, sample=${JSON.stringify(
- // errors.slice(0, 3),
- // )}`,
- // );
- // } else {
- // this.logger.log(
- // `[upsertSecondTagsFromVideos] Upserted tags=${upserted} (unique=${names.length})`,
- // );
- // }
- // return {
- // unique: names.length,
- // upserted,
- // skipped,
- // errors,
- // };
- // }
- private async upsertSecondTagsFromVideos_NoUniqueName(
- normalizedVideos: Array<{ secondTags?: string[] }>,
- ): Promise<UpsertTagsResult> {
- try {
- const set = new Set<string>();
- for (const v of normalizedVideos) {
- const tags = v.secondTags ?? [];
- for (const t of tags) {
- if (typeof t !== 'string') continue;
- const name = t.trim();
- if (!name) continue;
- set.add(name);
- }
- }
- const names = Array.from(set);
- if (!names.length)
- return { unique: 0, upserted: 0, skipped: 0, errors: [] };
- // Concurrency limit to reduce race collisions and DB pressure
- const CONCURRENCY = 20;
- let idx = 0;
- let upserted = 0;
- let skipped = 0;
- const errors: Array<{ name: string; error: string }> = [];
- const worker = async () => {
- while (true) {
- const current = idx;
- idx += 1;
- if (current >= names.length) return;
- const name = names[current];
- try {
- // 1) check existence by name (NOT unique)
- const exists = await this.mongo.tag.findFirst({
- where: { name },
- select: { id: true },
- });
- if (exists?.id) {
- // already exists
- continue;
- }
- // 2) create if not exists
- await this.mongo.tag.create({
- data: {
- name,
- // If your Tag schema requires seconds fields:
- // createdAt: Math.floor(Date.now() / 1000),
- // updatedAt: Math.floor(Date.now() / 1000),
- },
- });
- upserted += 1;
- } catch (e: any) {
- // If another worker created it after our check, create may fail (duplicate on some index)
- // We treat that as skipped (safe).
- const msg = e?.message ?? 'Tag create failed';
- skipped += 1;
- errors.push({ name, error: msg });
- }
- }
- };
- await Promise.all(Array.from({ length: CONCURRENCY }, () => worker()));
- if (errors.length) {
- this.logger.warn(
- `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
- errors.slice(0, 3),
- )}`,
- );
- } else {
- this.logger.log(
- `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
- );
- }
- return { unique: names.length, upserted, skipped, errors };
- } catch (error: any) {
- const message = error?.message ?? 'Unhandled tag upsert error';
- const trace = error?.stack ?? undefined;
- this.logger.error(
- `[upsertSecondTagsFromVideos_NoUniqueName] ${message}`,
- trace,
- );
- return {
- unique: 0,
- upserted: 0,
- skipped: 0,
- errors: [{ name: 'global', error: message }],
- };
- }
- }
- }
|