| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121 |
- // provider-video-sync.service.ts
- import { Injectable, Logger } from '@nestjs/common';
- import { HttpService } from '@nestjs/axios';
- import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
- import { SysConfigReaderService } from '@box/core/sys-config/sys-config-reader.service';
- import { firstValueFrom } from 'rxjs';
- import { EntityType } from '@prisma/mongo/client';
/**
 * Options for `ProviderVideoSyncService.syncFromProvider`.
 * All fields are optional; extra keys are accepted but not read by the sync.
 */
export interface ProviderVideoSyncOptions {
  /** Provider code shown in the sync logs; defaults to the configured providerCode. */
  providerCode?: string;
  /**
   * First page to request on a full sync (default 1). Incremental runs always
   * start at page 1 and ignore this value. The page reached is NOT persisted,
   * so an interrupted full sync does not resume on its own — pass the page
   * explicitly to continue from where it stopped.
   */
  pageNum?: number;
  /**
   * Records per provider page. Defaults to the configured provider itemsLimit,
   * falling back to 500; always clamped to the range 1..500.
   */
  pageSize?: number;
  /**
   * Provider search filters, sent as `param` in the request body.
   * - status: defaults to "Completed".
   * - updatedAt: ISO string filter "updated after", used on incremental runs;
   *   when given it takes precedence over the stored checkpoint. Ignored for
   *   full syncs.
   * - any other key is forwarded to the provider unchanged.
   */
  param?: {
    status?: string;
    updatedAt?: string;
    [k: string]: any;
  };
  /**
   * - true: no updatedAt filter; pages through every matching record starting
   *   at pageNum, and stamps lastFullSyncAt once a page comes back empty.
   * - false (default): incremental; starts at page 1 filtered by updatedAt and,
   *   on completion, advances the stored checkpoint to the newest updatedAt
   *   imported. While no checkpoint exists, a one-off baseline import of the
   *   provider's last pages is attempted instead.
   */
  fullSync?: boolean;
  /**
   * If true, the stored checkpoint is not read for this run.
   * - full sync: starts at pageNum or 1 (same as without a reset).
   * - incremental: filters only by param.updatedAt; without it, the baseline
   *   import is attempted again.
   */
  resetState?: boolean;
  /** Extra keys are accepted but not read by the sync. */
  [key: string]: any;
}
/** Summary returned by a provider sync run. */
export interface ProviderVideoSyncResult {
  /** Raw records received from the provider during the run. */
  imported: number;
  /** Always 0 (Option B): every successful write is counted under `updated`. */
  created: number;
  /** Successful upserts (Option B). */
  updated: number;
  /** Records that were not written. */
  skipped: number;
  /** Error samples; left undefined when the run had none. */
  errors?: { id?: string; error: string }[];
}
/**
 * One video record exactly as the provider API returns it (before
 * normalization). Only `id` and the three datetime strings are required;
 * `null` is allowed where the provider may send it explicitly.
 * (Field groups below are by name only.)
 */
interface RawProviderVideo {
  // --- Required: identifier and datetime strings ---
  id: string;
  addedTime: string;
  createdAt: string;
  updatedAt: string;

  // --- Descriptive fields ---
  srcId?: number;
  title?: string;
  desc?: string;
  type?: string;
  status?: string;
  publish?: string;
  country?: string;
  bango?: string;
  studio?: string;
  actors?: null | string[];
  japanNames?: null | string[];
  firstTag?: string;
  secondTags?: null | string[];
  mediaSet?: null | string;
  appids?: null | number[];

  // --- Classification / cover fields ---
  formatType?: number;
  contentType?: number;
  coverType?: number;
  coverImg?: string;
  coverImgNew?: string;

  // --- File / media fields ---
  checkSum?: string;
  preFileName?: string;
  filename?: string;
  fieldNameFs?: string;
  ext?: string;
  size?: number;
  videoTime?: number;
  width?: number;
  height?: number;
  ratio?: number;
  frameRate?: string;
  syBitRate?: string;
  vidBitRate?: string;
  infoTsName?: string;

  // --- Processing fields ---
  taskId?: string;
  proxyUpload?: null | number;
  isAdd?: boolean;
  retry?: number;
  notifySignal?: boolean;
  mergeRetry?: number;
  compressRetry?: number;
  segmentRetry?: number;
  linodeRetry?: number;
  failReason?: string;
  deleteDisk?: boolean;
}
/** In-memory paging position for a single sync run. */
type SyncCursor = {
  /** 1-based provider page number. */
  pageNum: number;
  /** Records requested per provider page. */
  pageSize: number;
  /** ISO timestamp for the incremental updatedAt filter, when one applies. */
  updatedAtCursor?: string;
};
/**
 * Paging metadata the provider may disclose in a response.
 * Either field can be missing.
 */
type ProviderPagingInfo = {
  /** Total number of matching records. */
  total?: number;
  /** Total number of pages. */
  totalPages?: number;
};
/**
 * Result of upserting one video: success, or the error sample to report
 * (discriminated on `ok`).
 */
type UpsertOutcome =
  | { ok: true }
  | {
      ok: false;
      error: { id?: string; error: string };
    };
/** Counters produced when syncing video secondTags into the Tag collection. */
type UpsertTagsResult = {
  /** Distinct tag names found in the input. */
  unique: number;
  /** Tags written. */
  upserted: number;
  /** Tags whose write failed. */
  skipped: number;
  /** Failure details, keyed by tag name. */
  errors: { name: string; error: string }[];
};
/**
 * Imports videos from the external provider API into `videoMedia`, creating
 * any new second tags in `tag`, and records run bookkeeping in `syncState`.
 *
 * Incremental runs filter by an updatedAt checkpoint stored as JSON in
 * `syncState.referId`; full syncs page through every record matching the
 * status filter.
 */
@Injectable()
export class ProviderVideoSyncService {
  private readonly logger = new Logger(ProviderVideoSyncService.name);

  /** Summary of the most recent run, exposed via getLastSyncSummary(). */
  private lastSyncSummary: ProviderVideoSyncResult | null = null;

  /** Hard upper bound on pageSize, whatever the caller or config asks for. */
  private readonly MAX_PAGE_SIZE = 500;

  /** Page size used when neither options.pageSize nor config itemsLimit is set. */
  private readonly DEFAULT_PAGE_SIZE = 500;

  /** Number of video upserts awaited together in one Promise.all round. */
  private readonly BATCH_SIZE = 100;

  /** Target record count for the first-ever incremental (baseline) import. */
  private readonly BASELINE_PARTIAL_COUNT = 20000;

  /** Timeout for every provider POST, in milliseconds. */
  private readonly PROVIDER_TIMEOUT_MS = 30_000;

  constructor(
    private readonly mongo: MongoPrismaService,
    private readonly httpService: HttpService,
    private readonly sysConfigReader: SysConfigReaderService,
  ) {}

  /**
   * Runs one sync pass against the provider and returns a summary.
   *
   * - Full sync (`fullSync: true`): pages through all records matching
   *   `param.status` without an updatedAt filter, starting at
   *   `options.pageNum ?? 1`, and stamps `lastFullSyncAt` when a page comes
   *   back empty.
   * - Incremental (default): starts at page 1, filtered by `param.updatedAt`
   *   if given, else by the stored checkpoint; on completion the checkpoint
   *   advances to the newest updatedAt seen. While no checkpoint exists, a
   *   one-off baseline import of the provider's last pages is attempted first.
   *
   * A missing provider apiUrl and failures while loading sync state are
   * thrown. Failures during paging end the run early and are reported as the
   * last entry of `errors`. Every returned summary is also kept for
   * getLastSyncSummary().
   *
   * @param options Optional overrides (see ProviderVideoSyncOptions).
   * @returns Counters plus at most 10 error samples.
   */
  async syncFromProvider(
    options: ProviderVideoSyncOptions = {},
  ): Promise<ProviderVideoSyncResult> {
    const providerConfig = await this.sysConfigReader.getProviderConfig();
    const providerCode = options.providerCode ?? providerConfig.providerCode;
    const providerApiUrl = providerConfig.apiUrl;
    if (!providerApiUrl) {
      throw new Error(
        'sysConfig.provider.apiUrl is required for provider sync',
      );
    }

    const defaultPageSize = providerConfig.itemsLimit ?? this.DEFAULT_PAGE_SIZE;
    const requestedPageSize = options.pageSize ?? defaultPageSize;
    const pageSize = this.clampInt(requestedPageSize, 1, this.MAX_PAGE_SIZE);
    const fullSync = options.fullSync ?? false;
    const resetState = options.resetState ?? false;
    const paramStatus = options.param?.status ?? 'Completed';
    const optionUpdatedAt = options.param?.updatedAt;
    // Only one entity exists in the EntityType enum today.
    const entity = EntityType.VIDEO;

    this.logger.log(
      `[syncFromProvider] Start provider=${providerCode} entity=${entity} fullSync=${fullSync} pageSize=${pageSize} resetState=${resetState}`,
    );

    const { cursor: initialCursor, checkpointUpdatedAtCursor } =
      await this.loadCursor({
        entity,
        pageSize,
        resetState,
        overridePageNum: options.pageNum,
        optionUpdatedAt,
        fullSync,
      });

    // Counters (Option B: created is always 0; updated counts successful upserts).
    let imported = 0;
    let updated = 0;
    let skipped = 0;
    const created = 0;
    const errors: Array<{ id?: string; error: string }> = [];
    // Newest updatedAt seen this run; becomes the next incremental checkpoint.
    let maxUpdatedAtSeen: Date | null = null;

    // Full syncs start at options.pageNum (default 1); incremental runs at 1.
    let pageNum = fullSync ? initialCursor.pageNum : 1;
    const runPageSize = initialCursor.pageSize;
    // Incremental filter: an explicit param.updatedAt wins over the checkpoint.
    const effectiveUpdatedAtCursor = fullSync
      ? undefined
      : (optionUpdatedAt ?? checkpointUpdatedAtCursor);
    const shouldRunBaselinePartial = !fullSync && !checkpointUpdatedAtCursor;

    try {
      if (shouldRunBaselinePartial) {
        const baselineResult = await this.runBaselinePartialIfNeeded({
          entity,
          cursor: initialCursor,
          paramStatus,
          optionsParam: options.param,
          apiUrl: providerApiUrl,
        });
        if (baselineResult) {
          this.lastSyncSummary = baselineResult;
          return baselineResult;
        }
      }

      while (true) {
        const body = this.buildProviderBody({
          pageNum,
          pageSize: runPageSize,
          status: paramStatus,
          // Undefined for full syncs, so no updatedAt filter is sent.
          updatedAt: effectiveUpdatedAtCursor,
          extraParam: options.param,
        });
        this.logger.log(
          `[syncFromProvider] POST pageNum=${pageNum} pageSize=${runPageSize} status=${paramStatus} updatedAt=${body.param.updatedAt ?? '(none)'}`,
        );

        const rawList = await this.fetchPage(providerApiUrl, body);

        if (!rawList.length) {
          this.logger.log(
            `[syncFromProvider] No more records (pageNum=${pageNum}). Stop.`,
          );
          // Completion: incremental runs advance the updatedAt checkpoint;
          // full syncs stamp lastFullSyncAt.
          if (!fullSync && maxUpdatedAtSeen && imported > 0) {
            await this.saveCheckpoint({
              entity,
              updatedAtCursor: maxUpdatedAtSeen.toISOString(),
              fullSyncCompleted: false,
            });
          }
          await this.saveCursor({ entity, fullSyncCompleted: fullSync });

          const result: ProviderVideoSyncResult = {
            imported,
            created,
            updated,
            skipped,
            errors: errors.length ? errors.slice(0, 10) : undefined,
          };
          this.lastSyncSummary = result;
          return result;
        }

        imported += rawList.length;
        const processed = await this.processProviderRawList(
          rawList,
          maxUpdatedAtSeen,
        );
        updated += processed.updated;
        skipped += processed.skipped;
        errors.push(...processed.errors);
        maxUpdatedAtSeen = processed.maxUpdatedAtSeen;

        // Touch lastRunAt after each page. Only timestamps are written; the
        // page number itself is not persisted.
        await this.saveCursor({ entity, fullSyncCompleted: false });
        pageNum += 1;
      }
    } catch (e: unknown) {
      this.logger.error(
        `[syncFromProvider] Unexpected error: ${this.errorMessage(e)}`,
      );
      // Best effort: still record that a run happened.
      try {
        await this.saveCursor({ entity, fullSyncCompleted: false });
      } catch (saveErr: unknown) {
        this.logger.error(
          `[syncFromProvider] Failed to persist cursor after error: ${this.errorMessage(saveErr)}`,
        );
      }
      const result: ProviderVideoSyncResult = {
        imported,
        created,
        updated,
        skipped,
        // Keep the total at 10 samples, with the fatal error last.
        errors: [
          ...errors.slice(0, 9),
          { error: this.errorMessage(e, 'Unexpected error') },
        ],
      };
      this.lastSyncSummary = result;
      return result;
    }
  }

  /**
   * First-ever incremental import. Instead of paging from the top, it imports
   * about BASELINE_PARTIAL_COUNT records from the provider's last pages
   * (walking from the last page toward the first). It then stores the newest
   * updatedAt seen as the checkpoint.
   *
   * @returns The run summary, or null when the baseline does not apply (the
   *   cursor already has an updatedAt) or the provider did not report
   *   total/pages; the caller then falls back to normal paging.
   * @throws When a provider call fails.
   */
  private async runBaselinePartialIfNeeded(args: {
    entity: EntityType;
    cursor: SyncCursor;
    paramStatus: string;
    apiUrl: string;
    optionsParam?: ProviderVideoSyncOptions['param'];
  }): Promise<ProviderVideoSyncResult | null> {
    if (!args.cursor || args.cursor.updatedAtCursor !== undefined) {
      return null;
    }

    const probeBody = this.buildProviderBody({
      pageNum: 1,
      pageSize: args.cursor.pageSize,
      status: args.paramStatus,
      updatedAt: undefined,
      extraParam: args.optionsParam,
    });
    const pagination = await this.probeProviderForPaging(
      args.apiUrl,
      probeBody,
    );

    // Prefer the reported page count; otherwise derive it from the total.
    let totalPages = pagination.totalPages;
    if (totalPages === undefined && pagination.total !== undefined) {
      totalPages = Math.max(
        0,
        Math.ceil(pagination.total / args.cursor.pageSize),
      );
    }
    if (!totalPages || totalPages < 1) {
      this.logger.warn(
        '[syncFromProvider] Baseline partial skipped because provider did not disclose total/pages; cannot compute bottom→top range.',
      );
      return null;
    }

    const pagesNeeded = Math.min(
      totalPages,
      Math.ceil(this.BASELINE_PARTIAL_COUNT / args.cursor.pageSize),
    );
    if (pagesNeeded <= 0) {
      return null;
    }
    const startPage = totalPages;
    const endPage = Math.max(1, totalPages - pagesNeeded + 1);
    this.logger.log(
      `[syncFromProvider] Baseline partial (first-ever) running pages ${startPage} down to ${endPage}`,
    );

    let imported = 0;
    let updated = 0;
    let skipped = 0;
    const errors: Array<{ id?: string; error: string }> = [];
    let maxUpdatedAtSeen: Date | null = null;

    for (let page = startPage; page >= endPage; page -= 1) {
      const body = this.buildProviderBody({
        pageNum: page,
        pageSize: args.cursor.pageSize,
        status: args.paramStatus,
        updatedAt: undefined,
        extraParam: args.optionsParam,
      });
      this.logger.log(`[syncFromProvider] param body ${JSON.stringify(body)} `);
      const rawList = await this.fetchPage(args.apiUrl, body);
      if (!rawList.length) {
        this.logger.log(
          `[syncFromProvider] Baseline partial page ${page} returned 0 records; continuing.`,
        );
        continue;
      }
      imported += rawList.length;
      const processed = await this.processProviderRawList(
        rawList,
        maxUpdatedAtSeen,
      );
      updated += processed.updated;
      skipped += processed.skipped;
      errors.push(...processed.errors);
      maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
    }

    if (maxUpdatedAtSeen && imported > 0) {
      await this.saveCheckpoint({
        entity: args.entity,
        updatedAtCursor: maxUpdatedAtSeen.toISOString(),
        fullSyncCompleted: false,
      });
    }

    return {
      imported,
      created: 0,
      updated,
      skipped,
      errors: errors.length ? errors.slice(0, 10) : undefined,
    };
  }

  /**
   * Requests one page only to read the provider's paging metadata.
   * Uses the same wrapped request format as fetchPage. A plain JSON body is
   * rejected by the provider, which previously made the baseline import
   * silently skip.
   *
   * @returns total/totalPages when found at any of the known response paths.
   * @throws Error("Provider API error: …") when the call or the provider fails.
   */
  private async probeProviderForPaging(
    apiUrl: string,
    body: {
      pageNum: number;
      pageSize: number;
      param: Record<string, any>;
    },
  ): Promise<ProviderPagingInfo> {
    try {
      const providerJson = await this.postToProvider(
        'probeProviderForPaging',
        apiUrl,
        body,
      );
      return {
        total: this.extractNumberFromPaths(providerJson, [
          'total',
          'data.total',
          'data.totalCount',
          'data.pageInfo.total',
          'data.pageInfo.totalCount',
        ]),
        totalPages: this.extractNumberFromPaths(providerJson, [
          'pages',
          'data.pages',
          'data.totalPages',
          'data.pageInfo.pages',
          'data.pageInfo.totalPages',
        ]),
      };
    } catch (error: unknown) {
      this.logger.error(
        `[probeProviderForPaging] Provider API call failed: ${this.errorMessage(error)}`,
      );
      throw new Error(
        `Provider API error: ${this.errorMessage(error, 'unknown')}`,
      );
    }
  }

  /**
   * Returns the first finite number found at any of the dot-separated `paths`
   * inside `data` (values are coerced with Number()), or undefined.
   */
  private extractNumberFromPaths(
    data: any,
    paths: string[],
  ): number | undefined {
    if (!data || typeof data !== 'object') return undefined;
    for (const path of paths) {
      const value = path
        .split('.')
        .reduce<any>(
          (obj, key) => (obj && typeof obj === 'object' ? obj[key] : undefined),
          data,
        );
      if (value === undefined || value === null) continue;
      const num = typeof value === 'number' ? value : Number(value);
      if (Number.isFinite(num)) return num;
    }
    return undefined;
  }

  /**
   * Returns the summary of the most recent syncFromProvider run that returned
   * a result, or null if there has been none.
   */
  getLastSyncSummary(): ProviderVideoSyncResult | null {
    return this.lastSyncSummary;
  }

  /**
   * Builds the (unwrapped) provider request body:
   * `{ pageNum, pageSize, param: { status, updatedAt?, ...extraParam } }`.
   * `status` and `updatedAt` from the args always win over extraParam.
   */
  private buildProviderBody(args: {
    pageNum: number;
    pageSize: number;
    status: string;
    updatedAt?: string;
    extraParam?: ProviderVideoSyncOptions['param'];
  }) {
    const param: Record<string, any> = {
      status: args.status,
    };
    // Only present on incremental requests.
    if (args.updatedAt) param.updatedAt = args.updatedAt;
    // Merge extra fields, but never let them override status/updatedAt.
    if (args.extraParam && typeof args.extraParam === 'object') {
      for (const [k, v] of Object.entries(args.extraParam)) {
        if (k === 'status' || k === 'updatedAt') continue;
        param[k] = v;
      }
    }
    return {
      pageNum: args.pageNum,
      pageSize: args.pageSize,
      param,
    };
  }

  /**
   * POSTs one paging request to the provider and returns its JSON payload.
   *
   * The provider expects the request wrapped as `{ data: "<json string>" }`
   * (a plain body was rejected with code=400 Field=data). It reports failures
   * in-band via `code`, so any code other than 200 is thrown, which keeps a
   * failed call from looking like an empty page.
   *
   * @param logTag Prefix for the response-preview log line.
   */
  private async postToProvider(
    logTag: string,
    apiUrl: string,
    body: {
      pageNum: number;
      pageSize: number;
      param: Record<string, any>;
    },
  ): Promise<unknown> {
    const wrappedBody = {
      data: JSON.stringify({
        pageNum: body.pageNum,
        pageSize: body.pageSize,
        param: body.param,
      }),
    };
    const response = await firstValueFrom(
      this.httpService.post(apiUrl, wrappedBody, {
        headers: { 'Content-Type': 'application/json' },
        timeout: this.PROVIDER_TIMEOUT_MS,
      }),
    );
    // Unwrap the Axios response to the provider's own JSON payload.
    const providerJson: unknown = response?.data ?? response;

    // Log only a short preview to keep logs small.
    this.logger.log(
      `[${logTag}] Provider response preview: ${this.previewJson(providerJson)}...`,
    );

    const payload = providerJson as
      | { code?: unknown; msg?: unknown; tip?: unknown }
      | null
      | undefined;
    const code = payload?.code;
    if (code !== 200) {
      const msg = payload?.msg ?? 'unknown';
      const tip = payload?.tip ?? '';
      throw new Error(
        `Provider error code=${code} msg=${msg}${tip ? ` tip=${tip}` : ''}`,
      );
    }
    return providerJson;
  }

  /** Serializes a value for logging, truncated to maxLength; never throws. */
  private previewJson(value: unknown, maxLength = 400): string {
    try {
      return String(JSON.stringify(value)).slice(0, maxLength);
    } catch {
      // e.g. circular structures (a raw Axios response) or BigInt values.
      return '[unserializable]';
    }
  }

  /**
   * Fetches one page of raw provider records.
   *
   * @returns The records of the page (possibly empty).
   * @throws Error("Provider API error: …") on transport or provider errors.
   */
  private async fetchPage(
    apiUrl: string,
    body: {
      pageNum: number;
      pageSize: number;
      param: Record<string, any>;
    },
  ): Promise<RawProviderVideo[]> {
    try {
      const providerJson = await this.postToProvider('fetchPage', apiUrl, body);
      const list = this.extractList(providerJson);
      this.logger.log(`[fetchPage] Received ${list.length} items`);
      return list;
    } catch (error: unknown) {
      this.logger.error(
        `[fetchPage] Provider API call failed: ${this.errorMessage(error)}`,
      );
      throw new Error(
        `Provider API error: ${this.errorMessage(error, 'unknown')}`,
      );
    }
  }

  /**
   * Normalizes one provider page, creates any new second tags, and upserts the
   * videos in batches of BATCH_SIZE.
   *
   * Items that fail normalizeItem are counted as skipped and reported in
   * `errors` rather than aborting the run. Otherwise one malformed record
   * would fail every incremental run that fetches it, and the checkpoint
   * could never advance past it.
   *
   * @param currentMaxUpdatedAt Newest updatedAt seen so far in this run.
   * @returns Upsert counters, per-item errors, and the newest updatedAt among
   *   this page's valid items (or currentMaxUpdatedAt if none is newer).
   */
  private async processProviderRawList(
    rawList: RawProviderVideo[],
    currentMaxUpdatedAt: Date | null,
  ): Promise<{
    updated: number;
    skipped: number;
    errors: Array<{ id?: string; error: string }>;
    maxUpdatedAtSeen: Date | null;
  }> {
    if (!rawList.length) {
      return {
        updated: 0,
        skipped: 0,
        errors: [],
        maxUpdatedAtSeen: currentMaxUpdatedAt,
      };
    }

    let updated = 0;
    let skipped = 0;
    const errors: Array<{ id?: string; error: string }> = [];

    const normalized = rawList.flatMap((item) => {
      try {
        return [this.normalizeItem(item)];
      } catch (e: unknown) {
        skipped += 1;
        errors.push({
          id: item?.id,
          error: this.errorMessage(e, 'Invalid provider item'),
        });
        return [];
      }
    });

    const hasSecondTags = normalized.some(
      (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
    );
    if (hasSecondTags) {
      await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
    }

    let maxUpdatedAtSeen = currentMaxUpdatedAt;
    for (const n of normalized) {
      const d = n.updatedAt;
      if (!maxUpdatedAtSeen || d.getTime() > maxUpdatedAtSeen.getTime()) {
        maxUpdatedAtSeen = d;
      }
    }

    for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
      const batch = normalized.slice(i, i + this.BATCH_SIZE);
      // eslint-disable-next-line no-await-in-loop
      const outcomes = await Promise.all(batch.map((r) => this.upsertOne(r)));
      for (const outcome of outcomes) {
        if (outcome.ok) {
          updated += 1;
        } else {
          skipped += 1;
          errors.push(outcome.error);
        }
      }
    }

    return {
      updated,
      skipped,
      errors,
      maxUpdatedAtSeen,
    };
  }

  /**
   * Finds the record array in a provider response. It accepts a bare array or
   * a `list`/`records` field at the nesting levels checked below. Returns []
   * (with a warning) when none matches.
   */
  private extractList(apiResponse: unknown): RawProviderVideo[] {
    const data = apiResponse as any;
    if (Array.isArray(data)) return data as RawProviderVideo[];
    // Axios response: { data: { code, data: { total, list } } }
    if (data?.data?.data?.list && Array.isArray(data.data.data.list)) {
      return data.data.data.list as RawProviderVideo[];
    }
    // Provider JSON directly: { code, data: { total, list } }
    if (data?.data?.list && Array.isArray(data.data.list)) {
      return data.data.list as RawProviderVideo[];
    }
    if (data?.list && Array.isArray(data.list)) {
      return data.list as RawProviderVideo[];
    }
    if (data?.data?.records && Array.isArray(data.data.records)) {
      return data.data.records as RawProviderVideo[];
    }
    this.logger.warn(
      '[extractList] Unexpected API response structure, defaulting to empty list',
    );
    return [];
  }

  /**
   * Validates a raw provider record and maps it onto the videoMedia shape.
   * Missing optional fields default to ''/0/false/[], datetime strings become
   * Dates, and size becomes a BigInt.
   *
   * @throws Error when id is missing or a required datetime is missing or
   *   unparseable; BigInt() also throws for a non-integer size.
   */
  private normalizeItem(item: RawProviderVideo) {
    if (!item.id) throw new Error('Each item must have an id');
    if (!item.addedTime || !item.createdAt || !item.updatedAt) {
      throw new Error(`Item ${item.id} is missing required datetime fields`);
    }

    const addedTime = new Date(item.addedTime);
    const createdAt = new Date(item.createdAt);
    const updatedAt = new Date(item.updatedAt);
    if (
      Number.isNaN(addedTime.getTime()) ||
      Number.isNaN(createdAt.getTime()) ||
      Number.isNaN(updatedAt.getTime())
    ) {
      throw new Error(`Item ${item.id} has invalid datetime format`);
    }

    return {
      id: item.id, // provider id is used directly as the Mongo ObjectId string
      srcId: item.srcId ?? 0,
      title: item.title ?? '',
      checkSum: item.checkSum ?? '',
      type: item.type ?? '',
      formatType: item.formatType ?? 0,
      contentType: item.contentType ?? 0,
      coverType: item.coverType ?? 0,
      coverImg: item.coverImg ?? '',
      coverImgNew: item.coverImgNew ?? '',
      videoTime: item.videoTime ?? 0,
      publish: item.publish ?? '',
      country: item.country ?? '',
      firstTag: item.firstTag ?? '',
      secondTags: item.secondTags ?? [],
      mediaSet: item.mediaSet ?? '',
      preFileName: item.preFileName ?? '',
      status: item.status ?? '',
      desc: item.desc ?? '',
      size: BigInt(item.size ?? 0),
      bango: item.bango ?? '',
      actors: item.actors ?? [],
      studio: item.studio ?? '',
      addedTime,
      appids: item.appids ?? [],
      japanNames: item.japanNames ?? [],
      filename: item.filename ?? '',
      fieldNameFs: item.fieldNameFs ?? '',
      ext: item.ext ?? '',
      taskId: item.taskId ?? '',
      width: item.width ?? 0,
      height: item.height ?? 0,
      ratio: item.ratio ?? 0,
      frameRate: item.frameRate ?? '',
      syBitRate: item.syBitRate ?? '',
      vidBitRate: item.vidBitRate ?? '',
      proxyUpload: item.proxyUpload ?? 0,
      isAdd: item.isAdd ?? false,
      retry: item.retry ?? 0,
      notifySignal: item.notifySignal ?? false,
      mergeRetry: item.mergeRetry ?? 0,
      compressRetry: item.compressRetry ?? 0,
      segmentRetry: item.segmentRetry ?? 0,
      linodeRetry: item.linodeRetry ?? 0,
      failReason: item.failReason ?? '',
      deleteDisk: item.deleteDisk ?? false,
      infoTsName: item.infoTsName ?? '',
      createdAt,
      updatedAt,
    };
  }

  /**
   * Upserts one normalized video by id (create with the full record, update
   * with everything except id). Never throws; failures come back as
   * `{ ok: false }` with an error sample.
   */
  private async upsertOne(record: any): Promise<UpsertOutcome> {
    const id = record?.id as string | undefined;
    if (!id) return { ok: false, error: { error: 'Missing id' } };

    try {
      const { id: _, ...updateData } = record;
      await this.mongo.videoMedia.upsert({
        where: { id },
        create: record,
        update: updateData,
      });
      return { ok: true };
    } catch (e: unknown) {
      return {
        ok: false,
        error: { id, error: this.errorMessage(e, 'Upsert failed') },
      };
    }
  }

  /**
   * Builds the starting cursor for a run. The stored checkpoint is returned
   * separately so the caller can tell "no checkpoint yet" apart from an
   * explicit param.updatedAt. Both paths make sure the SyncState row exists,
   * because saveCursor/saveCheckpoint use `update`.
   */
  private async loadCursor(args: {
    entity: EntityType;
    pageSize: number;
    resetState: boolean;
    overridePageNum?: number;
    optionUpdatedAt?: string;
    fullSync: boolean;
  }): Promise<{ cursor: SyncCursor; checkpointUpdatedAtCursor?: string }> {
    if (args.resetState) {
      // Ignore the stored checkpoint, but still create the row if missing:
      // without it every later `update` fails (e.g. resetState on a fresh DB).
      await this.ensureSyncState(args.entity);
      return {
        cursor: {
          pageNum: args.overridePageNum ?? 1,
          pageSize: args.pageSize,
          updatedAtCursor: args.fullSync ? undefined : args.optionUpdatedAt,
        },
        checkpointUpdatedAtCursor: undefined,
      };
    }

    const checkpoint = await this.loadCheckpoint(args.entity);
    const cursor: SyncCursor = {
      pageNum: args.overridePageNum ?? 1,
      pageSize: args.pageSize,
      updatedAtCursor: args.fullSync
        ? undefined
        : (checkpoint.updatedAtCursor ?? args.optionUpdatedAt),
    };
    return { cursor, checkpointUpdatedAtCursor: checkpoint.updatedAtCursor };
  }

  /**
   * Records that a run happened: lastRunAt, plus lastFullSyncAt when a full
   * sync completed, and updatedAt in epoch seconds. No paging position is
   * stored. Requires the SyncState row to exist (uses `update`).
   */
  private async saveCursor(args: {
    entity: EntityType;
    fullSyncCompleted: boolean;
  }) {
    const now = new Date();
    const nowSec = Math.floor(Date.now() / 1000);
    await this.mongo.syncState.update({
      where: { entity: args.entity },
      data: {
        lastRunAt: now,
        lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
        updatedAt: nowSec,
      },
    });
  }

  /**
   * Returns the SyncState row for `entity`, creating an empty one when it does
   * not exist yet. An existing row only gets its `updatedAt` (epoch seconds)
   * refreshed.
   */
  private async ensureSyncState(entity: EntityType) {
    const nowSec = Math.floor(Date.now() / 1000);
    return this.mongo.syncState.upsert({
      where: { entity },
      update: {
        updatedAt: nowSec,
      },
      create: {
        entity,
        referId: null,
        lastRunAt: null,
        lastFullSyncAt: null,
        createdAt: nowSec,
        updatedAt: nowSec,
      },
    });
  }

  /**
   * Reads the incremental checkpoint stored as JSON in `syncState.referId`,
   * creating the SyncState row if needed. Non-string cursor values are
   * ignored.
   */
  private async loadCheckpoint(entity: EntityType): Promise<{
    updatedAtCursor?: string;
  }> {
    const state = await this.ensureSyncState(entity);
    const parsed = this.safeParseCursor(state.referId);
    const stored = parsed?.updatedAtCursor;
    return {
      updatedAtCursor: typeof stored === 'string' ? stored : undefined,
    };
  }

  /**
   * Stores the incremental checkpoint in `referId` (null clears it) and
   * stamps lastRunAt / lastFullSyncAt / updatedAt. Requires the SyncState row
   * to exist (uses `update`).
   */
  private async saveCheckpoint(args: {
    entity: EntityType;
    updatedAtCursor?: string | null;
    fullSyncCompleted: boolean;
  }) {
    const now = new Date();
    const nowSec = Math.floor(Date.now() / 1000);
    await this.mongo.syncState.update({
      where: { entity: args.entity },
      data: {
        referId:
          args.updatedAtCursor !== undefined && args.updatedAtCursor !== null
            ? JSON.stringify({ updatedAtCursor: args.updatedAtCursor })
            : null,
        lastRunAt: now,
        lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
        updatedAt: nowSec,
      },
    });
  }

  /**
   * Parses the JSON stored in `syncState.referId`. Returns null when it is
   * empty, not valid JSON, or not an object. Field types are not checked
   * here.
   */
  private safeParseCursor(
    raw: string | null | undefined,
  ): Partial<SyncCursor> | null {
    if (!raw) return null;
    try {
      const parsed = JSON.parse(raw) as unknown;
      if (!parsed || typeof parsed !== 'object') return null;
      return parsed as Partial<SyncCursor>;
    } catch {
      return null;
    }
  }

  /** Truncates n to an integer and clamps it into [min, max]; non-finite n becomes min. */
  private clampInt(n: number, min: number, max: number): number {
    const x = Number.isFinite(n) ? Math.trunc(n) : min;
    return Math.max(min, Math.min(max, x));
  }

  /**
   * Extracts a readable message from a thrown value: its `message` when
   * present, otherwise `fallback` if given, otherwise `String(error)`.
   */
  private errorMessage(error: unknown, fallback?: string): string {
    if (typeof error === 'object' && error !== null && 'message' in error) {
      const { message } = error as { message: unknown };
      if (message !== undefined && message !== null) return String(message);
    }
    return fallback ?? String(error);
  }

  /**
   * Collects the distinct, trimmed, non-empty secondTags of a page of videos
   * and creates a Tag for each name that is not stored yet.
   *
   * Tag.name is not a unique key, so existence is checked with findFirst
   * before create. This check is not atomic: concurrent writers (e.g. two
   * overlapping sync runs) can still create duplicates. Up to 20 names are
   * processed concurrently. Never throws; failures are logged and returned.
   *
   * @returns unique = distinct names seen, upserted = tags created,
   *   skipped/errors = failed creates (or one "global" entry on an
   *   unexpected failure).
   */
  private async upsertSecondTagsFromVideos_NoUniqueName(
    normalizedVideos: Array<{ secondTags?: string[] }>,
  ): Promise<UpsertTagsResult> {
    try {
      const set = new Set<string>();
      for (const v of normalizedVideos) {
        for (const t of v.secondTags ?? []) {
          if (typeof t !== 'string') continue;
          const name = t.trim();
          if (name) set.add(name);
        }
      }
      const names = Array.from(set);
      if (!names.length) {
        return { unique: 0, upserted: 0, skipped: 0, errors: [] };
      }

      // Bounded concurrency keeps DB pressure reasonable.
      const CONCURRENCY = 20;
      let next = 0;
      let upserted = 0;
      let skipped = 0;
      const errors: Array<{ name: string; error: string }> = [];

      const worker = async (): Promise<void> => {
        // Each worker claims the next index synchronously (before any await),
        // so every name is handled by exactly one worker.
        while (next < names.length) {
          const name = names[next];
          next += 1;
          try {
            // 1) Existence check by name (name is NOT unique).
            const exists = await this.mongo.tag.findFirst({
              where: { name },
              select: { id: true },
            });
            if (exists?.id) continue;

            // 2) Create when missing.
            await this.mongo.tag.create({
              data: {
                name,
                // If the Tag schema requires seconds fields:
                // createdAt: Math.floor(Date.now() / 1000),
                // updatedAt: Math.floor(Date.now() / 1000),
              },
            });
            upserted += 1;
          } catch (e: unknown) {
            // e.g. another writer created the same name after our check.
            skipped += 1;
            errors.push({ name, error: this.errorMessage(e, 'Tag create failed') });
          }
        }
      };

      await Promise.all(Array.from({ length: CONCURRENCY }, () => worker()));

      if (errors.length) {
        this.logger.warn(
          `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
            errors.slice(0, 3),
          )}`,
        );
      } else {
        this.logger.log(
          `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
        );
      }
      return { unique: names.length, upserted, skipped, errors };
    } catch (error: unknown) {
      const message = this.errorMessage(error, 'Unhandled tag upsert error');
      const trace = error instanceof Error ? error.stack : undefined;
      this.logger.error(
        `[upsertSecondTagsFromVideos_NoUniqueName] ${message}`,
        trace,
      );
      return {
        unique: 0,
        upserted: 0,
        skipped: 0,
        errors: [{ name: 'global', error: message }],
      };
    }
  }
}
|