provider-video-sync.service.ts 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104
  1. // provider-video-sync.service.ts
  2. import { Injectable, Logger } from '@nestjs/common';
  3. import { HttpService } from '@nestjs/axios';
  4. import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
  5. import { firstValueFrom } from 'rxjs';
  6. import { EntityType } from '@prisma/mongo/client';
  /**
   * Options for ProviderVideoSyncService.syncFromProvider. Every field is optional.
   */
  export interface ProviderVideoSyncOptions {
    /** Provider identifier, used in log output; defaults to the service's DEFAULT_PROVIDER_CODE. */
    providerCode?: string;
    /**
     * Explicit 1-based start page for fullSync runs; takes precedence over any
     * stored position. Ignored by incremental runs, which always start at page 1.
     */
    pageNum?: number;
    /**
     * Records per provider page. Defaults to 500 and is clamped to [1, 500].
     */
    pageSize?: number;
    /**
     * Provider search filter, sent as `param` in the request body.
     * - status: defaults to "Completed" when omitted.
     * - updatedAt: ISO "updated after" filter for incremental runs. When set it
     *   takes precedence over the stored checkpoint; ignored when fullSync is true.
     * - any other keys are forwarded to the provider unchanged.
     */
    param?: {
      status?: string;
      updatedAt?: string;
      [k: string]: any;
    };
    /**
     * - true: no updatedAt filter; pages are walked starting from the resolved page.
     * - false (default): filter by updatedAt (caller value, else stored
     *   checkpoint); pageNum is forced to 1.
     */
    fullSync?: boolean;
    /**
     * If true, ignore the stored cursor and start fresh:
     * - fullSync: start from options.pageNum, or page 1.
     * - incremental: use options.param.updatedAt if provided.
     */
    resetState?: boolean;
    /** Additional keys are accepted but not read by syncFromProvider. */
    [key: string]: any;
  }
  /** Counters describing one sync run. */
  export interface ProviderVideoSyncResult {
    /** Records received from the provider during the run. */
    imported: number;
    /** Always 0: records are written via upsert, which does not say whether it created. */
    created: number;
    /** Records written successfully. */
    updated: number;
    /** Records that were not written (e.g. failed upserts). */
    skipped: number;
    /** The first few failures (at most 10); undefined when nothing failed. */
    errors?: Array<{ id?: string; error: string }>;
  }
  /**
   * One record from the provider's search response list.
   *
   * Only `id`, `addedTime`, `createdAt` and `updatedAt` are required; the rest
   * may be absent (some list-valued fields may also be null). Field grouping
   * below is by apparent purpose, inferred from names — confirm against the
   * provider's documentation.
   */
  interface RawProviderVideo {
    // --- identity & timestamps (datetime strings are parsed with `new Date`) ---
    id: string;
    srcId?: number;
    addedTime: string;
    createdAt: string;
    updatedAt: string;

    // --- catalog metadata ---
    title?: string;
    desc?: string;
    type?: string;
    status?: string;
    publish?: string;
    country?: string;
    firstTag?: string;
    secondTags?: string[] | null;
    mediaSet?: string | null;
    bango?: string;
    actors?: string[] | null;
    studio?: string;
    japanNames?: string[] | null;
    appids?: number[] | null;
    checkSum?: string;

    // --- format & cover ---
    formatType?: number;
    contentType?: number;
    coverType?: number;
    coverImg?: string;
    coverImgNew?: string;

    // --- file & encoding ---
    preFileName?: string;
    filename?: string;
    fieldNameFs?: string;
    ext?: string;
    size?: number;
    videoTime?: number;
    width?: number;
    height?: number;
    ratio?: number;
    frameRate?: string;
    syBitRate?: string;
    vidBitRate?: string;
    infoTsName?: string;

    // --- processing pipeline state ---
    taskId?: string;
    proxyUpload?: number | null;
    isAdd?: boolean;
    retry?: number;
    notifySignal?: boolean;
    mergeRetry?: number;
    compressRetry?: number;
    segmentRetry?: number;
    linodeRetry?: number;
    failReason?: string;
    deleteDisk?: boolean;
  }
  /** Paging position plus incremental filter for a single sync run. */
  type SyncCursor = {
    /** 1-based page number sent to the provider. */
    pageNum: number;
    /** Records requested per page (already clamped by the caller). */
    pageSize: number;
    /** ISO "updated after" filter for incremental runs; absent for fullSync. */
    updatedAtCursor?: string;
  };
  /**
   * Paging metadata read from a provider response; either value is absent
   * when the response does not expose it as a finite number.
   */
  type ProviderPagingInfo = {
    /** Total matching records. */
    total?: number;
    /** Total pages for the page size used in the request. */
    totalPages?: number;
  };
  /**
   * Result of upserting one video record: success, or the failure details
   * (record id when known plus the error message).
   */
  type UpsertOutcome =
    | {
        ok: true;
      }
    | {
        ok: false;
        error: {
          id?: string;
          error: string;
        };
      };
  /** Summary of one tag-creation pass over a batch of videos. */
  type UpsertTagsResult = {
    /** Distinct non-empty tag names found. */
    unique: number;
    /** Tags written by this pass. */
    upserted: number;
    /** Tags whose write failed. */
    skipped: number;
    /** Failure details keyed by tag name. */
    errors: Array<{ name: string; error: string }>;
  };
  /**
   * Syncs video media records from the external provider's search API into
   * Mongo (`videoMedia`) and tracks progress in `syncState` (one row per entity).
   *
   * Run modes (see ProviderVideoSyncOptions):
   * - fullSync: no `updatedAt` filter; walks pages starting at the explicit
   *   `pageNum`, else the stored resume page, else page 1.
   * - incremental: filters by `updatedAt` (caller value, else the stored
   *   checkpoint) and always starts at page 1. When no checkpoint exists yet, a
   *   one-off "baseline partial" over the last pages runs instead.
   *
   * Persisted state is JSON in `syncState.referId`:
   *   { updatedAtCursor?: string; pageNum?: number; pageSize?: number }
   * `updatedAtCursor` is the incremental checkpoint; `pageNum`/`pageSize` is the
   * next page an interrupted fullSync should fetch.
   */
  @Injectable()
  export class ProviderVideoSyncService {
    private readonly logger = new Logger(ProviderVideoSyncService.name);
    /** Result of the most recent run; null until a run finishes. */
    private lastSyncSummary: ProviderVideoSyncResult | null = null;
    private readonly PROVIDER_API_URL =
      'https://vm.rvakc.xyz/api/web/mediafile/search';
    private readonly DEFAULT_PROVIDER_CODE = 'RVAKC';
    private readonly MAX_PAGE_SIZE = 500;
    private readonly DEFAULT_PAGE_SIZE = 500;
    /** Number of videoMedia upserts issued concurrently. */
    private readonly BATCH_SIZE = 100;
    /** Approximate record count covered by the baseline partial (rounded up to whole pages). */
    private readonly BASELINE_PARTIAL_COUNT = 20000;
    /** HTTP timeout for every provider request, in milliseconds. */
    private readonly PROVIDER_TIMEOUT_MS = 30_000;
    /** Maximum number of entries kept in ProviderVideoSyncResult.errors. */
    private readonly MAX_REPORTED_ERRORS = 10;
    /** Concurrent workers used when creating missing tags. */
    private readonly TAG_CONCURRENCY = 20;

    constructor(
      private readonly mongo: MongoPrismaService,
      private readonly httpService: HttpService,
    ) {}

    /**
     * Runs one sync pass against the provider.
     *
     * Failures inside the paging loop (provider or DB) do not reject: they are
     * logged, progress is persisted best-effort, and the failure message is the
     * last entry of `errors`. Failures while loading the initial SyncState row
     * still reject.
     *
     * @param options run configuration; every field is optional.
     * @returns counters for this run (also available via getLastSyncSummary()).
     */
    async syncFromProvider(
      options: ProviderVideoSyncOptions = {},
    ): Promise<ProviderVideoSyncResult> {
      const providerCode = options.providerCode ?? this.DEFAULT_PROVIDER_CODE;
      const requestedPageSize = options.pageSize ?? this.DEFAULT_PAGE_SIZE;
      const pageSize = this.clampInt(requestedPageSize, 1, this.MAX_PAGE_SIZE);
      const fullSync = options.fullSync ?? false;
      const resetState = options.resetState ?? false;
      const paramStatus = options.param?.status ?? 'Completed';
      const optionUpdatedAt = options.param?.updatedAt;
      // Only one entity exists in the enum today.
      const entity = EntityType.VIDEO;

      this.logger.log(
        `[syncFromProvider] Start provider=${providerCode} entity=${entity} fullSync=${fullSync} pageSize=${pageSize} resetState=${resetState}`,
      );

      const { cursor: initialCursor, checkpointUpdatedAtCursor } =
        await this.loadCursor({
          entity,
          pageSize,
          resetState,
          overridePageNum: options.pageNum,
          optionUpdatedAt,
          fullSync,
        });

      let imported = 0;
      let updated = 0;
      let skipped = 0;
      const errors: Array<{ id?: string; error: string }> = [];
      // Newest updatedAt among successful upserts and oldest updatedAt among
      // failed upserts; together they bound the next checkpoint (resolveCheckpoint).
      let maxUpdatedAtSeen: Date | null = null;
      let earliestFailedUpdatedAt: Date | null = null;
      // fullSync starts at the resolved resume page; incremental always at 1.
      let pageNum = fullSync ? initialCursor.pageNum : 1;
      // An explicit caller updatedAt takes precedence over the stored checkpoint.
      const effectiveUpdatedAtCursor = fullSync
        ? undefined
        : (optionUpdatedAt ?? checkpointUpdatedAtCursor);
      const shouldRunBaselinePartial = !fullSync && !checkpointUpdatedAtCursor;

      try {
        if (shouldRunBaselinePartial) {
          const baselineResult = await this.runBaselinePartialIfNeeded({
            entity,
            cursor: initialCursor,
            paramStatus,
            optionsParam: options.param,
          });
          if (baselineResult) {
            this.lastSyncSummary = baselineResult;
            return baselineResult;
          }
        }

        while (true) {
          const body = this.buildProviderBody({
            pageNum,
            pageSize,
            status: paramStatus,
            updatedAt: effectiveUpdatedAtCursor, // undefined for fullSync
            extraParam: options.param,
          });
          this.logger.log(
            `[syncFromProvider] POST pageNum=${pageNum} pageSize=${pageSize} status=${paramStatus} updatedAt=${fullSync ? '(none)' : (body.param.updatedAt ?? '(none)')}`,
          );
          const rawList = await this.fetchPage(body);

          if (!rawList.length) {
            this.logger.log(
              `[syncFromProvider] No more records (pageNum=${pageNum}). Stop.`,
            );
            if (fullSync) {
              // Every page was walked: clear the resume position and stamp lastFullSyncAt.
              await this.saveCursor({
                entity,
                fullSyncCompleted: true,
                progress: null,
              });
            } else {
              const checkpoint = this.resolveCheckpoint(
                maxUpdatedAtSeen,
                earliestFailedUpdatedAt,
              );
              if (checkpoint) {
                await this.saveCheckpoint({
                  entity,
                  updatedAtCursor: checkpoint.toISOString(),
                  fullSyncCompleted: false,
                });
              } else {
                await this.saveCursor({ entity, fullSyncCompleted: false });
              }
            }
            const result = this.buildResult({ imported, updated, skipped }, errors);
            this.lastSyncSummary = result;
            return result;
          }

          imported += rawList.length;
          const processed = await this.processProviderRawList(
            rawList,
            maxUpdatedAtSeen,
            earliestFailedUpdatedAt,
          );
          updated += processed.updated;
          skipped += processed.skipped;
          this.appendErrors(errors, processed.errors);
          maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
          earliestFailedUpdatedAt = processed.earliestFailedUpdatedAt;

          pageNum += 1;
          // Persist progress; for fullSync this is the page to resume from after a crash.
          await this.saveCursor({
            entity,
            fullSyncCompleted: false,
            progress: fullSync ? { pageNum, pageSize } : undefined,
          });
        }
      } catch (e: any) {
        this.logger.error(
          `[syncFromProvider] Unexpected error: ${e?.message ?? e}`,
        );
        // Best-effort persistence: a fullSync resumes at the page that failed.
        try {
          await this.saveCursor({
            entity,
            fullSyncCompleted: false,
            progress: fullSync ? { pageNum, pageSize } : undefined,
          });
        } catch (saveErr: any) {
          this.logger.error(
            `[syncFromProvider] Failed to persist cursor after error: ${saveErr?.message ?? saveErr}`,
          );
        }
        const result = this.buildResult(
          { imported, updated, skipped },
          errors,
          e?.message ?? 'Unexpected error',
        );
        this.lastSyncSummary = result;
        return result;
      }
    }

    /**
     * First-ever incremental run: processes roughly BASELINE_PARTIAL_COUNT
     * records from the last pages (walking from the last page backwards) and
     * stores the resulting updatedAt checkpoint.
     *
     * @returns the run result, or null when the baseline does not apply (a
     *   cursor already exists) or the provider does not disclose paging totals.
     * @throws Error when the paging probe or a page fetch fails.
     */
    private async runBaselinePartialIfNeeded(args: {
      entity: EntityType;
      cursor: SyncCursor;
      paramStatus: string;
      optionsParam?: ProviderVideoSyncOptions['param'];
    }): Promise<ProviderVideoSyncResult | null> {
      if (!args.cursor || args.cursor.updatedAtCursor !== undefined) {
        return null;
      }
      const { pageSize } = args.cursor;
      const probeBody = this.buildProviderBody({
        pageNum: 1,
        pageSize,
        status: args.paramStatus,
        updatedAt: undefined,
        extraParam: args.optionsParam,
      });
      const pagination = await this.probeProviderForPaging(probeBody);
      let totalPages = pagination.totalPages;
      if (totalPages === undefined && pagination.total !== undefined) {
        totalPages = Math.max(0, Math.ceil(pagination.total / pageSize));
      }
      if (!totalPages || totalPages < 1) {
        this.logger.warn(
          '[syncFromProvider] Baseline partial skipped because provider did not disclose total/pages; cannot compute bottom→top range.',
        );
        return null;
      }
      const pagesNeeded = Math.min(
        totalPages,
        Math.ceil(this.BASELINE_PARTIAL_COUNT / pageSize),
      );
      if (pagesNeeded <= 0) {
        return null;
      }
      const startPage = totalPages;
      const endPage = Math.max(1, totalPages - pagesNeeded + 1);
      this.logger.log(
        `[syncFromProvider] Baseline partial (first-ever) running pages ${startPage} down to ${endPage}`,
      );

      let imported = 0;
      let updated = 0;
      let skipped = 0;
      const errors: Array<{ id?: string; error: string }> = [];
      let maxUpdatedAtSeen: Date | null = null;
      let earliestFailedUpdatedAt: Date | null = null;

      for (let page = startPage; page >= endPage; page -= 1) {
        const body = this.buildProviderBody({
          pageNum: page,
          pageSize,
          status: args.paramStatus,
          updatedAt: undefined,
          extraParam: args.optionsParam,
        });
        this.logger.log(`[syncFromProvider] param body ${JSON.stringify(body)} `);
        const rawList = await this.fetchPage(body);
        if (!rawList.length) {
          this.logger.log(
            `[syncFromProvider] Baseline partial page ${page} returned 0 records; continuing.`,
          );
          continue;
        }
        imported += rawList.length;
        const processed = await this.processProviderRawList(
          rawList,
          maxUpdatedAtSeen,
          earliestFailedUpdatedAt,
        );
        updated += processed.updated;
        skipped += processed.skipped;
        this.appendErrors(errors, processed.errors);
        maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
        earliestFailedUpdatedAt = processed.earliestFailedUpdatedAt;
      }

      const checkpoint = this.resolveCheckpoint(
        maxUpdatedAtSeen,
        earliestFailedUpdatedAt,
      );
      if (checkpoint) {
        await this.saveCheckpoint({
          entity: args.entity,
          updatedAtCursor: checkpoint.toISOString(),
          fullSyncCompleted: false,
        });
      }
      return this.buildResult({ imported, updated, skipped }, errors);
    }

    /**
     * Fetches page 1 and reads total / total-pages from the provider payload.
     * Uses the same wrapped request format and `code` check as fetchPage.
     *
     * @throws Error("Provider API error: …") on transport failure or code !== 200.
     */
    private async probeProviderForPaging(body: {
      pageNum: number;
      pageSize: number;
      param: Record<string, any>;
    }): Promise<ProviderPagingInfo> {
      const providerJson = await this.postToProvider(
        body,
        'probeProviderForPaging',
      );
      return {
        total: this.extractNumberFromPaths(providerJson, [
          'total',
          'data.total',
          'data.totalCount',
          'data.pageInfo.total',
          'data.pageInfo.totalCount',
        ]),
        totalPages: this.extractNumberFromPaths(providerJson, [
          'pages',
          'data.pages',
          'data.totalPages',
          'data.pageInfo.pages',
          'data.pageInfo.totalPages',
        ]),
      };
    }

    /**
     * Returns the first finite number found at any of the dotted `paths` in
     * `data` (numeric strings are converted), or undefined if none match.
     */
    private extractNumberFromPaths(
      data: any,
      paths: string[],
    ): number | undefined {
      if (!data || typeof data !== 'object') return undefined;
      for (const path of paths) {
        const value = path
          .split('.')
          .reduce<any>(
            (obj, key) => (obj && typeof obj === 'object' ? obj[key] : undefined),
            data,
          );
        if (value === undefined || value === null) continue;
        const num = typeof value === 'number' ? value : Number(value);
        if (Number.isFinite(num)) return num;
      }
      return undefined;
    }

    /** Result of the most recent syncFromProvider call, or null if none finished yet. */
    getLastSyncSummary(): ProviderVideoSyncResult | null {
      return this.lastSyncSummary;
    }

    /**
     * Builds the provider search body `{ pageNum, pageSize, param }`.
     * `status` and `updatedAt` (only when truthy) always come from the explicit
     * arguments; other `extraParam` keys are copied through.
     */
    private buildProviderBody(args: {
      pageNum: number;
      pageSize: number;
      status: string;
      updatedAt?: string;
      extraParam?: ProviderVideoSyncOptions['param'];
    }) {
      const param: Record<string, any> = {
        status: args.status,
      };
      if (args.updatedAt) param.updatedAt = args.updatedAt;
      if (args.extraParam && typeof args.extraParam === 'object') {
        for (const [k, v] of Object.entries(args.extraParam)) {
          if (k === 'status' || k === 'updatedAt') continue;
          param[k] = v;
        }
      }
      return {
        pageNum: args.pageNum,
        pageSize: args.pageSize,
        param,
      };
    }

    /**
     * POSTs one search request and returns the provider JSON payload.
     *
     * The provider expects the query serialized as a string under `data`
     * (it answered code=400 "Field=data expecting string" otherwise).
     *
     * @param logTag method name used as the log prefix.
     * @throws Error("Provider API error: …") on transport failure or code !== 200.
     */
    private async postToProvider(
      body: { pageNum: number; pageSize: number; param: Record<string, any> },
      logTag: string,
    ): Promise<any> {
      try {
        const wrappedBody = {
          data: JSON.stringify({
            pageNum: body.pageNum,
            pageSize: body.pageSize,
            param: body.param,
          }),
        };
        const response = await firstValueFrom(
          this.httpService.post(this.PROVIDER_API_URL, wrappedBody, {
            headers: { 'Content-Type': 'application/json' },
            timeout: this.PROVIDER_TIMEOUT_MS,
          }),
        );
        // Axios response unwrap: providerJson is the actual provider payload.
        const providerJson = (response as any)?.data ?? response;
        // Small preview only, to keep logs bounded.
        this.logger.log(
          `[${logTag}] Provider response preview: ${JSON.stringify(
            providerJson,
          ).slice(0, 400)}...`,
        );
        // Fail fast on provider errors (prevents "successful" runs with empty lists).
        const code = providerJson?.code;
        if (code !== 200) {
          const msg = providerJson?.msg ?? 'unknown';
          const tip = providerJson?.tip ?? '';
          throw new Error(
            `Provider error code=${code} msg=${msg}${tip ? ` tip=${tip}` : ''}`,
          );
        }
        return providerJson;
      } catch (error: any) {
        this.logger.error(
          `[${logTag}] Provider API call failed: ${error?.message ?? error}`,
        );
        throw new Error(`Provider API error: ${error?.message ?? 'unknown'}`);
      }
    }

    /**
     * Fetches one page and returns its record list.
     * @throws Error("Provider API error: …") on transport failure or code !== 200.
     */
    private async fetchPage(body: {
      pageNum: number;
      pageSize: number;
      param: Record<string, any>;
    }): Promise<RawProviderVideo[]> {
      const providerJson = await this.postToProvider(body, 'fetchPage');
      const list = this.extractList(providerJson);
      this.logger.log(`[fetchPage] Received ${list.length} items`);
      return list;
    }

    /**
     * Normalizes and upserts one page of provider records.
     *
     * Malformed items are skipped and reported rather than aborting the page.
     * Tags are ensured before the videos are written.
     *
     * @param currentMaxUpdatedAt newest successful updatedAt carried over from earlier pages.
     * @param currentEarliestFailedUpdatedAt oldest failed updatedAt carried over from earlier pages.
     * @returns counters for this page plus the updated max/min trackers.
     */
    private async processProviderRawList(
      rawList: RawProviderVideo[],
      currentMaxUpdatedAt: Date | null,
      currentEarliestFailedUpdatedAt: Date | null = null,
    ): Promise<{
      updated: number;
      skipped: number;
      errors: Array<{ id?: string; error: string }>;
      maxUpdatedAtSeen: Date | null;
      earliestFailedUpdatedAt: Date | null;
    }> {
      let updated = 0;
      let skipped = 0;
      const errors: Array<{ id?: string; error: string }> = [];
      let maxUpdatedAtSeen = currentMaxUpdatedAt;
      let earliestFailedUpdatedAt = currentEarliestFailedUpdatedAt;

      // Validate item by item: one malformed record must not fail the whole
      // page (and, since it would be re-fetched, every later run).
      const normalized = rawList.flatMap((item) => {
        try {
          return [this.normalizeItem(item)];
        } catch (e: any) {
          skipped += 1;
          errors.push({ id: item?.id, error: e?.message ?? 'Invalid provider item' });
          return [];
        }
      });
      if (!normalized.length) {
        return { updated, skipped, errors, maxUpdatedAtSeen, earliestFailedUpdatedAt };
      }

      const hasSecondTags = normalized.some(
        (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
      );
      if (hasSecondTags) {
        await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
      }

      for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
        const batch = normalized.slice(i, i + this.BATCH_SIZE);
        // eslint-disable-next-line no-await-in-loop
        const results = await Promise.all(
          batch.map(async (record) => ({
            record,
            outcome: await this.upsertOne(record),
          })),
        );
        for (const { record, outcome } of results) {
          const stamp = record.updatedAt;
          if (outcome.ok) {
            updated += 1;
            // Only successful writes may advance the incremental checkpoint.
            if (!maxUpdatedAtSeen || stamp.getTime() > maxUpdatedAtSeen.getTime()) {
              maxUpdatedAtSeen = stamp;
            }
          } else {
            skipped += 1;
            errors.push(outcome.error);
            if (
              !earliestFailedUpdatedAt ||
              stamp.getTime() < earliestFailedUpdatedAt.getTime()
            ) {
              earliestFailedUpdatedAt = stamp;
            }
          }
        }
      }

      return { updated, skipped, errors, maxUpdatedAtSeen, earliestFailedUpdatedAt };
    }

    /**
     * Chooses the incremental checkpoint to persist after a pass.
     *
     * Returns the newest successfully written updatedAt, except that it never
     * reaches the oldest failed record: then it returns 1 ms before that
     * failure so the next run (assuming an "updated after" provider filter)
     * requests it again. Re-upserting later records by id is harmless. Returns
     * null when nothing succeeded, leaving the stored checkpoint unchanged.
     */
    private resolveCheckpoint(
      maxSucceeded: Date | null,
      earliestFailed: Date | null,
    ): Date | null {
      if (!maxSucceeded) return null;
      if (!earliestFailed) return maxSucceeded;
      const beforeFailure = new Date(earliestFailed.getTime() - 1);
      return beforeFailure.getTime() < maxSucceeded.getTime()
        ? beforeFailure
        : maxSucceeded;
    }

    /** Appends `incoming` to `target` until `target` holds MAX_REPORTED_ERRORS entries. */
    private appendErrors(
      target: Array<{ id?: string; error: string }>,
      incoming: Array<{ id?: string; error: string }>,
    ): void {
      for (const err of incoming) {
        if (target.length >= this.MAX_REPORTED_ERRORS) return;
        target.push(err);
      }
    }

    /**
     * Builds the public result. With `fatalError`, it is appended as the last
     * error entry; the list is capped at MAX_REPORTED_ERRORS either way.
     */
    private buildResult(
      counts: { imported: number; updated: number; skipped: number },
      errors: Array<{ id?: string; error: string }>,
      fatalError?: string,
    ): ProviderVideoSyncResult {
      const reported =
        fatalError === undefined
          ? errors.slice(0, this.MAX_REPORTED_ERRORS)
          : [
              ...errors.slice(0, this.MAX_REPORTED_ERRORS - 1),
              { error: fatalError },
            ];
      return {
        imported: counts.imported,
        created: 0, // upsert does not report creation
        updated: counts.updated,
        skipped: counts.skipped,
        errors: reported.length ? reported : undefined,
      };
    }

    /**
     * Debug helper: logs the top-level keys of a response object and a few
     * nested ones. Not called in this file.
     */
    private debugRespShape(resp: unknown) {
      const r: any = resp as any;
      const keys = r && typeof r === 'object' ? Object.keys(r).slice(0, 12) : [];
      const dataKeys =
        r?.data && typeof r.data === 'object'
          ? Object.keys(r.data).slice(0, 12)
          : [];
      const dataDataKeys =
        r?.data?.data && typeof r.data.data === 'object'
          ? Object.keys(r.data.data).slice(0, 12)
          : [];
      this.logger.warn(
        `[debugRespShape] topKeys=${JSON.stringify(keys)} dataKeys=${JSON.stringify(
          dataKeys,
        )} dataDataKeys=${JSON.stringify(dataDataKeys)} hasStatus=${Boolean(r?.status)} hasCode=${Boolean(
          r?.code,
        )} hasDataCode=${Boolean(r?.data?.code)}`,
      );
    }

    /**
     * Finds the record array in a provider payload. Tries, in order: a bare
     * array, `data.data.list`, `data.list`, `list`, `data.records`. Returns []
     * (with a warning) when none match.
     */
    private extractList(apiResponse: unknown): RawProviderVideo[] {
      const data = apiResponse as any;
      if (Array.isArray(data)) return data as RawProviderVideo[];
      // Axios-style envelope: { data: { code, data: { total, list } } }
      if (data?.data?.data?.list && Array.isArray(data.data.data.list)) {
        return data.data.data.list as RawProviderVideo[];
      }
      // Provider JSON directly: { code, data: { total, list } }
      if (data?.data?.list && Array.isArray(data.data.list)) {
        return data.data.list as RawProviderVideo[];
      }
      if (data?.list && Array.isArray(data.list)) {
        return data.list as RawProviderVideo[];
      }
      if (data?.data?.records && Array.isArray(data.data.records)) {
        return data.data.records as RawProviderVideo[];
      }
      this.logger.warn(
        '[extractList] Unexpected API response structure, defaulting to empty list',
      );
      return [];
    }

    /**
     * Converts a provider record into the videoMedia shape: parses the three
     * datetimes, converts `size` to BigInt and fills defaults for missing fields.
     *
     * @throws Error when id or a datetime is missing/invalid. Also throws
     *   RangeError (from BigInt) when `size` is not an integer.
     */
    private normalizeItem(item: RawProviderVideo) {
      if (!item.id) throw new Error('Each item must have an id');
      if (!item.addedTime || !item.createdAt || !item.updatedAt) {
        throw new Error(`Item ${item.id} is missing required datetime fields`);
      }
      const addedTime = new Date(item.addedTime);
      const createdAt = new Date(item.createdAt);
      const updatedAt = new Date(item.updatedAt);
      if (
        isNaN(addedTime.getTime()) ||
        isNaN(createdAt.getTime()) ||
        isNaN(updatedAt.getTime())
      ) {
        throw new Error(`Item ${item.id} has invalid datetime format`);
      }
      return {
        id: item.id, // confirmed Mongo ObjectId string
        srcId: item.srcId ?? 0,
        title: item.title ?? '',
        checkSum: item.checkSum ?? '',
        type: item.type ?? '',
        formatType: item.formatType ?? 0,
        contentType: item.contentType ?? 0,
        coverType: item.coverType ?? 0,
        coverImg: item.coverImg ?? '',
        coverImgNew: item.coverImgNew ?? '',
        videoTime: item.videoTime ?? 0,
        publish: item.publish ?? '',
        country: item.country ?? '',
        firstTag: item.firstTag ?? '',
        secondTags: item.secondTags ?? [],
        mediaSet: item.mediaSet ?? '',
        preFileName: item.preFileName ?? '',
        status: item.status ?? '',
        desc: item.desc ?? '',
        size: BigInt(item.size ?? 0),
        bango: item.bango ?? '',
        actors: item.actors ?? [],
        studio: item.studio ?? '',
        addedTime,
        appids: item.appids ?? [],
        japanNames: item.japanNames ?? [],
        filename: item.filename ?? '',
        fieldNameFs: item.fieldNameFs ?? '',
        ext: item.ext ?? '',
        taskId: item.taskId ?? '',
        width: item.width ?? 0,
        height: item.height ?? 0,
        ratio: item.ratio ?? 0,
        frameRate: item.frameRate ?? '',
        syBitRate: item.syBitRate ?? '',
        vidBitRate: item.vidBitRate ?? '',
        proxyUpload: item.proxyUpload ?? 0,
        isAdd: item.isAdd ?? false,
        retry: item.retry ?? 0,
        notifySignal: item.notifySignal ?? false,
        mergeRetry: item.mergeRetry ?? 0,
        compressRetry: item.compressRetry ?? 0,
        segmentRetry: item.segmentRetry ?? 0,
        linodeRetry: item.linodeRetry ?? 0,
        failReason: item.failReason ?? '',
        deleteDisk: item.deleteDisk ?? false,
        infoTsName: item.infoTsName ?? '',
        createdAt,
        updatedAt,
      };
    }

    /**
     * Upserts one normalized record into videoMedia by id.
     * Never throws: failures are returned as `{ ok: false, error }`.
     */
    private async upsertOne(record: any): Promise<UpsertOutcome> {
      const id = record?.id as string | undefined;
      if (!id) return { ok: false, error: { error: 'Missing id' } };
      try {
        const { id: _, ...updateData } = record;
        await this.mongo.videoMedia.upsert({
          where: { id },
          create: record,
          update: updateData,
        });
        return { ok: true };
      } catch (e: any) {
        return {
          ok: false,
          error: { id, error: e?.message ?? 'Upsert failed' },
        };
      }
    }

    /**
     * Resolves the starting cursor for a run.
     *
     * - resetState: ignores stored state; page = override or 1, updatedAt =
     *   caller value (incremental only).
     * - otherwise: loads (creating if missing) the SyncState row. fullSync
     *   starts at the override page, else the stored resume page, else 1.
     *
     * `checkpointUpdatedAtCursor` is the stored incremental checkpoint (always
     * undefined on reset); callers use it to decide on the baseline partial.
     */
    private async loadCursor(args: {
      entity: EntityType;
      pageSize: number;
      resetState: boolean;
      overridePageNum?: number;
      optionUpdatedAt?: string;
      fullSync: boolean;
    }): Promise<{ cursor: SyncCursor; checkpointUpdatedAtCursor?: string }> {
      // Page numbers are 1-based; clamp bad overrides (0, negatives, NaN) to 1.
      const explicitPage =
        args.overridePageNum == null
          ? undefined
          : this.clampInt(args.overridePageNum, 1, Number.MAX_SAFE_INTEGER);

      if (args.resetState) {
        return {
          cursor: {
            pageNum: explicitPage ?? 1,
            pageSize: args.pageSize,
            updatedAtCursor: args.fullSync ? undefined : args.optionUpdatedAt,
          },
          checkpointUpdatedAtCursor: undefined,
        };
      }

      const checkpoint = await this.loadCheckpoint(args.entity);
      const resumePage = args.fullSync
        ? this.resumePageFor(checkpoint, args.pageSize)
        : 1;
      return {
        cursor: {
          pageNum: explicitPage ?? resumePage,
          pageSize: args.pageSize,
          updatedAtCursor: args.fullSync
            ? undefined
            : (checkpoint.updatedAtCursor ?? args.optionUpdatedAt),
        },
        checkpointUpdatedAtCursor: checkpoint.updatedAtCursor,
      };
    }

    /**
     * Converts a stored fullSync resume position to a page for `pageSize`.
     * Rounds down, so a changed page size may re-read records but never skips any.
     */
    private resumePageFor(
      stored: { pageNum?: number; pageSize?: number },
      pageSize: number,
    ): number {
      if (!stored.pageNum || stored.pageNum < 1) return 1;
      const storedSize = stored.pageSize ?? pageSize;
      const recordsDone = (stored.pageNum - 1) * storedSize;
      return Math.floor(recordsDone / pageSize) + 1;
    }

    /**
     * Records a run heartbeat (lastRunAt, lastFullSyncAt when completed) and,
     * optionally, the fullSync resume position.
     *
     * @param args.progress next page to fetch and its page size; `null` clears
     *   the stored position; omitted leaves it untouched.
     */
    private async saveCursor(args: {
      entity: EntityType;
      fullSyncCompleted: boolean;
      progress?: { pageNum: number; pageSize: number } | null;
    }) {
      let patch: Record<string, string | number | null> | undefined;
      if (args.progress === null) {
        patch = { pageNum: null, pageSize: null };
      } else if (args.progress !== undefined) {
        patch = { pageNum: args.progress.pageNum, pageSize: args.progress.pageSize };
      }
      await this.writeSyncState({
        entity: args.entity,
        fullSyncCompleted: args.fullSyncCompleted,
        patch,
      });
    }

    /**
     * Loads (creating if missing) the SyncState row for `entity` and returns
     * the validated stored state. Invalid or absent values come back undefined.
     */
    private async loadCheckpoint(entity: EntityType): Promise<{
      updatedAtCursor?: string;
      pageNum?: number;
      pageSize?: number;
    }> {
      const nowSec = Math.floor(Date.now() / 1000);
      const state = await this.mongo.syncState.upsert({
        where: { entity },
        update: {
          updatedAt: nowSec,
        },
        create: {
          entity,
          referId: null,
          lastRunAt: null,
          lastFullSyncAt: null,
          createdAt: nowSec,
          updatedAt: nowSec,
        },
      });
      const parsed = this.safeParseCursor(state.referId);
      const storedCursor = parsed?.updatedAtCursor;
      return {
        updatedAtCursor:
          typeof storedCursor === 'string' && storedCursor ? storedCursor : undefined,
        pageNum: this.positiveIntOrUndefined(parsed?.pageNum),
        pageSize: this.positiveIntOrUndefined(parsed?.pageSize),
      };
    }

    /**
     * Stores the incremental checkpoint (merged with any other stored state)
     * and records a run heartbeat. `null`/`undefined` removes the checkpoint.
     */
    private async saveCheckpoint(args: {
      entity: EntityType;
      updatedAtCursor?: string | null;
      fullSyncCompleted: boolean;
    }) {
      await this.writeSyncState({
        entity: args.entity,
        fullSyncCompleted: args.fullSyncCompleted,
        patch: { updatedAtCursor: args.updatedAtCursor ?? null },
      });
    }

    /**
     * Writes run timestamps and, when `patch` is given, merges it into the
     * JSON state in `referId` (a null value deletes that key).
     *
     * Uses upsert so it also works when the row was never created (e.g. after
     * resetState). The read-merge-write is not atomic; this assumes one sync
     * runs per entity at a time.
     */
    private async writeSyncState(args: {
      entity: EntityType;
      fullSyncCompleted: boolean;
      patch?: Record<string, string | number | null>;
    }): Promise<void> {
      const now = new Date();
      const nowSec = Math.floor(now.getTime() / 1000);
      // undefined => leave referId untouched (Prisma ignores undefined fields).
      let referId: string | null | undefined;
      if (args.patch) {
        const existing = await this.mongo.syncState.findUnique({
          where: { entity: args.entity },
          select: { referId: true },
        });
        const merged: Record<string, unknown> = {
          ...(this.safeParseCursor(existing?.referId) ?? {}),
        };
        for (const [key, value] of Object.entries(args.patch)) {
          if (value === null) delete merged[key];
          else merged[key] = value;
        }
        referId = Object.keys(merged).length ? JSON.stringify(merged) : null;
      }
      await this.mongo.syncState.upsert({
        where: { entity: args.entity },
        update: {
          referId,
          lastRunAt: now,
          lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
          updatedAt: nowSec,
        },
        create: {
          entity: args.entity,
          referId: referId ?? null,
          lastRunAt: now,
          lastFullSyncAt: args.fullSyncCompleted ? now : null,
          createdAt: nowSec,
          updatedAt: nowSec,
        },
      });
    }

    /** Parses stored `referId` JSON; returns null when empty, invalid or not an object. */
    private safeParseCursor(
      raw: string | null | undefined,
    ): Partial<SyncCursor> | null {
      if (!raw) return null;
      try {
        const parsed = JSON.parse(raw) as any;
        if (!parsed || typeof parsed !== 'object') return null;
        return parsed as Partial<SyncCursor>;
      } catch {
        return null;
      }
    }

    /** Returns `value` when it is an integer >= 1, otherwise undefined. */
    private positiveIntOrUndefined(value: unknown): number | undefined {
      return typeof value === 'number' && Number.isInteger(value) && value >= 1
        ? value
        : undefined;
    }

    /** Truncates `n` and clamps it to [min, max]; non-finite input becomes `min`. */
    private clampInt(n: number, min: number, max: number): number {
      const x = Number.isFinite(n) ? Math.trunc(n) : min;
      return Math.max(min, Math.min(max, x));
    }

    /**
     * Ensures a Tag exists for every distinct, trimmed, non-empty secondTag in
     * `normalizedVideos`.
     *
     * Tag.name is not treated as unique, so each name is looked up with
     * `findFirst` and created only when missing (check-then-create). Concurrent
     * writers (e.g. another sync run) can still race; a failed create is
     * counted in `skipped` and reported in `errors`. If Tag.name gains a unique
     * index, a single `tag.upsert({ where: { name } })` per name is simpler.
     *
     * Names are processed by TAG_CONCURRENCY workers. Never throws: unexpected
     * failures are logged and returned as a single `global` error.
     */
    private async upsertSecondTagsFromVideos_NoUniqueName(
      normalizedVideos: Array<{ secondTags?: string[] }>,
    ): Promise<UpsertTagsResult> {
      try {
        const set = new Set<string>();
        for (const v of normalizedVideos) {
          const tags = v.secondTags ?? [];
          for (const t of tags) {
            if (typeof t !== 'string') continue;
            const name = t.trim();
            if (!name) continue;
            set.add(name);
          }
        }
        const names = Array.from(set);
        if (!names.length)
          return { unique: 0, upserted: 0, skipped: 0, errors: [] };

        // Shared index: safe because the read+increment happens before any await.
        let idx = 0;
        let upserted = 0;
        let skipped = 0;
        const errors: Array<{ name: string; error: string }> = [];
        const worker = async () => {
          while (true) {
            const current = idx;
            idx += 1;
            if (current >= names.length) return;
            const name = names[current];
            try {
              const exists = await this.mongo.tag.findFirst({
                where: { name },
                select: { id: true },
              });
              if (exists?.id) continue;
              await this.mongo.tag.create({
                data: {
                  name,
                },
              });
              upserted += 1;
            } catch (e: any) {
              const msg = e?.message ?? 'Tag create failed';
              skipped += 1;
              errors.push({ name, error: msg });
            }
          }
        };
        await Promise.all(
          Array.from({ length: this.TAG_CONCURRENCY }, () => worker()),
        );
        if (errors.length) {
          this.logger.warn(
            `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
              errors.slice(0, 3),
            )}`,
          );
        } else {
          this.logger.log(
            `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
          );
        }
        return { unique: names.length, upserted, skipped, errors };
      } catch (error: any) {
        const message = error?.message ?? 'Unhandled tag upsert error';
        const trace = error?.stack ?? undefined;
        this.logger.error(
          `[upsertSecondTagsFromVideos_NoUniqueName] ${message}`,
          trace,
        );
        return {
          unique: 0,
          upserted: 0,
          skipped: 0,
          errors: [{ name: 'global', error: message }],
        };
      }
    }
  }