provider-video-sync.service.ts 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121
  1. // provider-video-sync.service.ts
  2. import { Injectable, Logger } from '@nestjs/common';
  3. import { HttpService } from '@nestjs/axios';
  4. import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
  5. import { SysConfigReaderService } from '@box/core/sys-config/sys-config-reader.service';
  6. import { firstValueFrom } from 'rxjs';
  7. import { EntityType } from '@prisma/mongo/client';
  /**
   * Options accepted by ProviderVideoSyncService.syncFromProvider.
   */
  export interface ProviderVideoSyncOptions {
    /**
     * Provider code reported in the start-of-run log line.
     * Defaults to the configured `provider.providerCode`.
     */
    providerCode?: string;
    /**
     * Starting page for a full sync (default 1).
     * Incremental runs ignore this and always start at page 1.
     * The page number is NOT persisted between runs, so an interrupted full sync
     * does not resume automatically; pass the page explicitly to continue.
     */
    pageNum?: number;
    /**
     * Items requested per provider page.
     * Defaults to the configured `provider.itemsLimit`, else 500.
     * The value is truncated to an integer and clamped to [1, 500].
     */
    pageSize?: number;
    /**
     * Provider search filter, sent as the request's `param` object.
     * - status: defaults to "Completed".
     * - updatedAt: ISO string for the provider-side "updated after" filter. On
     *   incremental runs it takes precedence over the stored checkpoint. It is
     *   ignored when `fullSync` is true.
     * - any other keys are forwarded to the provider unchanged.
     */
    param?: {
      status?: string;
      updatedAt?: string;
      [k: string]: any;
    };
    /**
     * - true: page through everything with no `updatedAt` filter, starting at
     *   `pageNum`. `lastFullSyncAt` is stamped when the last page is reached.
     * - false (default): incremental run from page 1, filtered by
     *   `param.updatedAt ?? stored checkpoint`. The checkpoint advances on completion.
     */
    fullSync?: boolean;
    /**
     * If true, the stored updatedAt checkpoint is not read for this run.
     * - fullSync: starts at `pageNum` (or 1), same as usual.
     * - incremental: uses `param.updatedAt` when given. Otherwise the run is
     *   treated like a first-ever run, and the baseline partial sync is attempted.
     */
    resetState?: boolean;
    /** Extra keys are accepted for compatibility but not read by syncFromProvider. */
    [key: string]: any;
  }
  /** Counters describing one provider sync run. */
  export interface ProviderVideoSyncResult {
    /** Raw items received from the provider, summed over all pages. */
    imported: number;
    /** Always 0: every write is an upsert counted under `updated` ("Option B"). */
    created: number;
    /** Successful upserts ("Option B"). */
    updated: number;
    /** Items that were not written. */
    skipped: number;
    /** A capped sample of failures; omitted when there were none. */
    errors?: { id?: string; error: string }[];
  }
  /**
   * One video item as returned by the provider API, before normalization.
   * Fields are grouped by requiredness and type only. normalizeItem substitutes
   * ''/0/false/[] for any optional field that is absent.
   */
  interface RawProviderVideo {
    // Required: normalizeItem rejects items missing any of these.
    id: string;
    addedTime: string;
    createdAt: string;
    updatedAt: string;

    // Optional strings.
    title?: string;
    checkSum?: string;
    type?: string;
    coverImg?: string;
    coverImgNew?: string;
    publish?: string;
    country?: string;
    firstTag?: string;
    preFileName?: string;
    status?: string;
    desc?: string;
    bango?: string;
    studio?: string;
    filename?: string;
    fieldNameFs?: string;
    ext?: string;
    taskId?: string;
    frameRate?: string;
    syBitRate?: string;
    vidBitRate?: string;
    failReason?: string;
    infoTsName?: string;

    // Optional numbers.
    srcId?: number;
    formatType?: number;
    contentType?: number;
    coverType?: number;
    videoTime?: number;
    size?: number;
    width?: number;
    height?: number;
    ratio?: number;
    retry?: number;
    mergeRetry?: number;
    compressRetry?: number;
    segmentRetry?: number;
    linodeRetry?: number;

    // Optional booleans.
    isAdd?: boolean;
    notifySignal?: boolean;
    deleteDisk?: boolean;

    // Optional and explicitly nullable.
    secondTags?: Array<string> | null;
    actors?: Array<string> | null;
    japanNames?: Array<string> | null;
    appids?: Array<number> | null;
    mediaSet?: string | null;
    proxyUpload?: number | null;
  }
  /** Starting position for a sync run, produced by loadCursor. */
  type SyncCursor = {
    /** 1-based provider page number. */
    pageNum: number;
    /** Items requested per provider page. */
    pageSize: number;
    /** ISO timestamp used as the provider `updatedAt` filter (incremental runs only). */
    updatedAtCursor?: string;
  };
  /** Paging metadata read from a provider response; either value may be absent. */
  type ProviderPagingInfo = {
    /** Total matching items, if the provider reports it. */
    total?: number;
    /** Total pages, if the provider reports it. */
    totalPages?: number;
  };
  /** Result of upserting one record; failures carry the record id (when known) and message. */
  type UpsertOutcome =
    | { ok: true }
    | { ok: false; error: { error: string; id?: string } };
  /** Summary of upserting secondTags names into the Tag collection. */
  type UpsertTagsResult = {
    /** Distinct tag names considered. */
    unique: number;
    /** Names written successfully. */
    upserted: number;
    /** Names that failed. */
    skipped: number;
    /** One entry per failed name. */
    errors: { name: string; error: string }[];
  };
  120. @Injectable()
  121. export class ProviderVideoSyncService {
  private readonly logger = new Logger(ProviderVideoSyncService.name);

  /** Result cached by the most recent syncFromProvider return (null before that). */
  private lastSyncSummary: ProviderVideoSyncResult | null = null;

  /** Upper bound for any requested page size. */
  private readonly MAX_PAGE_SIZE = 500;
  /** Page size used when neither options.pageSize nor provider.itemsLimit is set. */
  private readonly DEFAULT_PAGE_SIZE = 500;
  /** Number of upserts awaited together in one Promise.all batch. */
  private readonly BATCH_SIZE = 100;
  /** Target record count for the first-ever baseline run (rounded up to whole pages). */
  private readonly BASELINE_PARTIAL_COUNT = 20000;

  /**
   * @param mongo Prisma client for the Mongo datasource (videoMedia, syncState).
   * @param httpService HTTP client used to call the provider API.
   * @param sysConfigReader Source of the provider configuration (apiUrl, itemsLimit, ...).
   */
  constructor(
    private readonly mongo: MongoPrismaService,
    private readonly httpService: HttpService,
    private readonly sysConfigReader: SysConfigReaderService,
  ) {}
  /**
   * Pulls videos from the provider API and upserts them into `videoMedia`.
   *
   * - Full sync (`fullSync: true`): pages through every record matching
   *   `param.status` with no `updatedAt` filter, starting at
   *   `options.pageNum ?? 1`. `lastFullSyncAt` is stamped when an empty page
   *   is reached.
   * - Incremental (default): always starts at page 1 and filters by
   *   `param.updatedAt ?? <stored checkpoint>`. When an empty page is reached
   *   and at least one item was received, the checkpoint advances to the
   *   newest `updatedAt` reported by processProviderRawList.
   * - If no checkpoint is stored, an incremental run first tries the baseline
   *   partial sync. If that returns a result, the result is returned as-is.
   *
   * After each page only `lastRunAt`/`updatedAt` are written to SyncState. No
   * page number is persisted, so an interrupted full sync does not resume on
   * its own.
   *
   * Errors raised after cursor loading (baseline, paging, persistence) are
   * caught and reported as the last entry of `errors`. Config lookup and
   * cursor loading happen before that guard and can still throw.
   *
   * @returns counters for this run; `errors` holds at most 10 entries. The
   *   same object is cached for getLastSyncSummary().
   */
  async syncFromProvider(
    options: ProviderVideoSyncOptions = {},
  ): Promise<ProviderVideoSyncResult> {
    const providerConfig = await this.sysConfigReader.getProviderConfig();
    const providerCode = options.providerCode ?? providerConfig.providerCode;
    const providerApiUrl = providerConfig.apiUrl;
    if (!providerApiUrl) {
      throw new Error(
        'sysConfig.provider.apiUrl is required for provider sync',
      );
    }

    const defaultPageSize = providerConfig.itemsLimit ?? this.DEFAULT_PAGE_SIZE;
    const requestedPageSize = options.pageSize ?? defaultPageSize;
    const pageSize = this.clampInt(requestedPageSize, 1, this.MAX_PAGE_SIZE);
    const fullSync = options.fullSync ?? false;
    const resetState = options.resetState ?? false;
    const paramStatus = options.param?.status ?? 'Completed';
    const optionUpdatedAt = options.param?.updatedAt;
    // EntityType currently has a single member.
    const entity = EntityType.VIDEO;

    this.logger.log(
      `[syncFromProvider] Start provider=${providerCode} entity=${entity} fullSync=${fullSync} pageSize=${pageSize} resetState=${resetState}`,
    );

    // Stored checkpoint, or a fresh cursor when resetState is set.
    const { cursor: initialCursor, checkpointUpdatedAtCursor } =
      await this.loadCursor({
        entity,
        pageSize,
        resetState,
        overridePageNum: options.pageNum,
        optionUpdatedAt,
        fullSync,
      });

    // Results report at most 10 errors (the catch path reports 9 plus the fatal
    // one), so there is no point retaining more over a long run.
    const MAX_REPORTED_ERRORS = 10;
    let imported = 0;
    let updated = 0;
    let skipped = 0;
    const created = 0; // Option B: every write is an upsert counted in `updated`
    const errors: Array<{ id?: string; error: string }> = [];
    const recordErrors = (
      batch: Array<{ id?: string; error: string }>,
    ): void => {
      for (const err of batch) {
        if (errors.length >= MAX_REPORTED_ERRORS) return;
        errors.push(err);
      }
    };

    // Newest updatedAt seen so far; becomes the next incremental checkpoint.
    let maxUpdatedAtSeen: Date | null = null;
    // Full sync starts at the requested page; incremental always starts at 1.
    let pageNum = fullSync ? initialCursor.pageNum : 1;
    const requestPageSize = initialCursor.pageSize;
    const effectiveUpdatedAtCursor = fullSync
      ? undefined
      : (optionUpdatedAt ?? checkpointUpdatedAtCursor);
    const shouldRunBaselinePartial = !fullSync && !checkpointUpdatedAtCursor;

    try {
      if (shouldRunBaselinePartial) {
        const baselineResult = await this.runBaselinePartialIfNeeded({
          entity,
          cursor: initialCursor,
          paramStatus,
          optionsParam: options.param,
          apiUrl: providerApiUrl,
        });
        if (baselineResult) {
          this.lastSyncSummary = baselineResult;
          return baselineResult;
        }
      }

      while (true) {
        const body = this.buildProviderBody({
          pageNum,
          pageSize: requestPageSize,
          status: paramStatus,
          // Already undefined for full sync: no updatedAt filter.
          updatedAt: effectiveUpdatedAtCursor,
          extraParam: options.param,
        });
        this.logger.log(
          `[syncFromProvider] POST pageNum=${pageNum} pageSize=${requestPageSize} status=${paramStatus} updatedAt=${fullSync ? '(none)' : (body.param.updatedAt ?? '(none)')}`,
        );
        const rawList = await this.fetchPage(providerApiUrl, body);

        if (!rawList.length) {
          this.logger.log(
            `[syncFromProvider] No more records (pageNum=${pageNum}). Stop.`,
          );
          // Incremental: move the updatedAt checkpoint forward if anything arrived.
          if (!fullSync && maxUpdatedAtSeen && imported > 0) {
            await this.saveCheckpoint({
              entity,
              updatedAtCursor: maxUpdatedAtSeen.toISOString(),
              fullSyncCompleted: false,
            });
          }
          // Full sync: this also stamps lastFullSyncAt.
          await this.saveCursor({ entity, fullSyncCompleted: fullSync });

          const result: ProviderVideoSyncResult = {
            imported,
            created,
            updated,
            skipped,
            errors: errors.length ? errors.slice(0, 10) : undefined,
          };
          this.lastSyncSummary = result;
          return result;
        }

        imported += rawList.length;
        const processed = await this.processProviderRawList(
          rawList,
          maxUpdatedAtSeen,
        );
        updated += processed.updated;
        skipped += processed.skipped;
        recordErrors(processed.errors);
        maxUpdatedAtSeen = processed.maxUpdatedAtSeen;

        // Touch lastRunAt after each page (the page number itself is not stored).
        await this.saveCursor({ entity, fullSyncCompleted: false });
        pageNum += 1;
      }
    } catch (e: any) {
      this.logger.error(
        `[syncFromProvider] Unexpected error: ${e?.message ?? e}`,
      );
      // Best effort: record lastRunAt even though the run failed.
      try {
        await this.saveCursor({ entity, fullSyncCompleted: false });
      } catch (saveErr: any) {
        this.logger.error(
          `[syncFromProvider] Failed to persist cursor after error: ${saveErr?.message ?? saveErr}`,
        );
      }
      const result: ProviderVideoSyncResult = {
        imported,
        created,
        updated,
        skipped,
        errors: [
          ...(errors.length ? errors.slice(0, 9) : []),
          { error: e?.message ?? 'Unexpected error' },
        ],
      };
      this.lastSyncSummary = result;
      return result;
    }
  }
  /**
   * First-ever incremental run: syncs roughly BASELINE_PARTIAL_COUNT records
   * taken from the highest-numbered provider pages. It walks from the last
   * page downward, then stores the newest `updatedAt` seen as the checkpoint.
   *
   * Returns null, letting the caller fall back to its normal loop, when:
   * - the cursor already carries an `updatedAtCursor`, or
   * - the provider reports neither a total nor a page count.
   */
  private async runBaselinePartialIfNeeded(args: {
    entity: EntityType;
    cursor: SyncCursor;
    paramStatus: string;
    apiUrl: string;
    optionsParam?: ProviderVideoSyncOptions['param'];
  }): Promise<ProviderVideoSyncResult | null> {
    const { cursor, paramStatus, apiUrl, optionsParam } = args;
    if (!cursor || cursor.updatedAtCursor !== undefined) {
      return null;
    }
    const pageSize = cursor.pageSize;
    const bodyForPage = (pageNum: number) =>
      this.buildProviderBody({
        pageNum,
        pageSize,
        status: paramStatus,
        updatedAt: undefined,
        extraParam: optionsParam,
      });

    const paging = await this.probeProviderForPaging(apiUrl, bodyForPage(1));
    let totalPages: number | undefined;
    if (paging.totalPages !== undefined) {
      totalPages = paging.totalPages;
    } else if (paging.total !== undefined) {
      totalPages = Math.max(0, Math.ceil(paging.total / pageSize));
    }
    if (!totalPages || totalPages < 1) {
      this.logger.warn(
        '[syncFromProvider] Baseline partial skipped because provider did not disclose total/pages; cannot compute bottom→top range.',
      );
      return null;
    }

    const pageBudget = Math.ceil(this.BASELINE_PARTIAL_COUNT / pageSize);
    const pagesNeeded = Math.min(totalPages, pageBudget);
    if (pagesNeeded <= 0) {
      return null;
    }
    const startPage = totalPages;
    const endPage = Math.max(1, totalPages - pagesNeeded + 1);
    this.logger.log(
      `[syncFromProvider] Baseline partial (first-ever) running pages ${startPage} down to ${endPage}`,
    );

    let imported = 0;
    let updated = 0;
    let skipped = 0;
    const errors: Array<{ id?: string; error: string }> = [];
    let maxUpdatedAtSeen: Date | null = null;

    for (let page = startPage; page >= endPage; page -= 1) {
      const body = bodyForPage(page);
      this.logger.log(`[syncFromProvider] param body ${JSON.stringify(body)} `);
      const rawList = await this.fetchPage(apiUrl, body);
      if (rawList.length === 0) {
        this.logger.log(
          `[syncFromProvider] Baseline partial page ${page} returned 0 records; continuing.`,
        );
        continue;
      }
      imported += rawList.length;
      const pageResult = await this.processProviderRawList(
        rawList,
        maxUpdatedAtSeen,
      );
      updated += pageResult.updated;
      skipped += pageResult.skipped;
      errors.push(...pageResult.errors);
      maxUpdatedAtSeen = pageResult.maxUpdatedAtSeen;
    }

    if (maxUpdatedAtSeen && imported > 0) {
      await this.saveCheckpoint({
        entity: args.entity,
        updatedAtCursor: maxUpdatedAtSeen.toISOString(),
        fullSyncCompleted: false,
      });
    }

    return {
      imported,
      created: 0,
      updated,
      skipped,
      errors: errors.length ? errors.slice(0, 10) : undefined,
    };
  }
  /**
   * Sends one provider request and reads paging metadata (total item count
   * and/or page count) from the response, trying several response layouts.
   *
   * The request uses the same `{ data: "<json string>" }` envelope as
   * fetchPage. Per the note there, the provider answers code=400
   * (Field=data expecting string) to a bare JSON body, and such a reply
   * carries no paging metadata.
   *
   * @returns `{}` (paging unknown) when the provider replies with a non-200
   *   `code`, so the caller skips the baseline instead of trusting an error payload.
   * @throws Error("Provider API error: ...") on transport/HTTP failures.
   */
  private async probeProviderForPaging(
    apiUrl: string,
    body: {
      pageNum: number;
      pageSize: number;
      param: Record<string, any>;
    },
  ): Promise<ProviderPagingInfo> {
    try {
      const wrappedBody = {
        data: JSON.stringify({
          pageNum: body.pageNum,
          pageSize: body.pageSize,
          param: body.param,
        }),
      };
      const response = await firstValueFrom(
        this.httpService.post(apiUrl, wrappedBody, {
          headers: { 'Content-Type': 'application/json' },
          timeout: 30_000,
        }),
      );
      const providerJson: any = response.data;

      if (providerJson?.code !== 200) {
        this.logger.warn(
          `[probeProviderForPaging] Provider returned code=${providerJson?.code} msg=${providerJson?.msg ?? 'unknown'}; paging info unavailable`,
        );
        return {};
      }

      return {
        total: this.extractNumberFromPaths(providerJson, [
          'total',
          'data.total',
          'data.totalCount',
          'data.pageInfo.total',
          'data.pageInfo.totalCount',
        ]),
        totalPages: this.extractNumberFromPaths(providerJson, [
          'pages',
          'data.pages',
          'data.totalPages',
          'data.pageInfo.pages',
          'data.pageInfo.totalPages',
        ]),
      };
    } catch (error: any) {
      this.logger.error(
        `[probeProviderForPaging] Provider API call failed: ${error?.message ?? error}`,
      );
      throw new Error(`Provider API error: ${error?.message ?? 'unknown'}`);
    }
  }
  /**
   * Returns the first finite number found at any of the dot-separated `paths`
   * inside `data`, or undefined when none matches.
   * Non-number values are coerced with Number(), so numeric strings are
   * accepted (and '' becomes 0). Missing/null values and non-finite results
   * are skipped.
   */
  private extractNumberFromPaths(
    data: any,
    paths: string[],
  ): number | undefined {
    if (!data || typeof data !== 'object') return undefined;

    const readPath = (path: string): any => {
      let node: any = data;
      for (const segment of path.split('.')) {
        node = node && typeof node === 'object' ? node[segment] : undefined;
      }
      return node;
    };

    for (const path of paths) {
      const raw = readPath(path);
      if (raw == null) continue;
      const candidate = typeof raw === 'number' ? raw : Number(raw);
      if (Number.isFinite(candidate)) return candidate;
    }
    return undefined;
  }
  /**
   * Returns the result cached by the most recent syncFromProvider call.
   * Returns null if no call has produced a result yet.
   */
  getLastSyncSummary(): ProviderVideoSyncResult | null {
    const summary = this.lastSyncSummary;
    return summary;
  }
  /**
   * Builds the provider request body:
   *   { pageNum, pageSize, param: { status, updatedAt?, ...extraParam } }
   * `status` and `updatedAt` come only from the explicit arguments. Same-named
   * keys in `extraParam` are dropped; every other extra key is copied into
   * `param` after them. `updatedAt` is included only when truthy.
   */
  private buildProviderBody(args: {
    pageNum: number;
    pageSize: number;
    status: string;
    updatedAt?: string;
    extraParam?: ProviderVideoSyncOptions['param'];
  }) {
    const param: Record<string, any> = { status: args.status };
    if (args.updatedAt) {
      param.updatedAt = args.updatedAt;
    }

    const extra = args.extraParam;
    if (extra && typeof extra === 'object') {
      Object.entries(extra)
        .filter(([key]) => key !== 'status' && key !== 'updatedAt')
        .forEach(([key, value]) => {
          param[key] = value;
        });
    }

    return { pageNum: args.pageNum, pageSize: args.pageSize, param };
  }
  /**
   * POSTs one page request to the provider and returns its item list.
   *
   * The body is sent as `{ data: "<json string>" }`. Earlier provider replies
   * (code=400, Field=data expecting string) showed it expects that envelope.
   *
   * @throws Error("Provider API error: ...") on transport failures and on any
   *   reply whose `code` is not 200, so a failing provider never looks like an
   *   empty (completed) page.
   */
  private async fetchPage(
    apiUrl: string,
    body: {
      pageNum: number;
      pageSize: number;
      param: Record<string, any>;
    },
  ): Promise<RawProviderVideo[]> {
    try {
      const { pageNum, pageSize, param } = body;
      const envelope = { data: JSON.stringify({ pageNum, pageSize, param }) };

      const httpResponse = await firstValueFrom(
        this.httpService.post(apiUrl, envelope, {
          headers: { 'Content-Type': 'application/json' },
          timeout: 30_000,
        }),
      );
      // Unwrap the Axios response to the provider's own JSON payload.
      const payload: any = (httpResponse as any)?.data ?? httpResponse;

      // Log only a short prefix; full pages can be large.
      const preview = JSON.stringify(payload).slice(0, 400);
      this.logger.log(`[fetchPage] Provider response preview: ${preview}...`);

      const code = payload?.code;
      if (code !== 200) {
        const msg = payload?.msg ?? 'unknown';
        const tip = payload?.tip ?? '';
        const tipSuffix = tip ? ` tip=${tip}` : '';
        throw new Error(`Provider error code=${code} msg=${msg}${tipSuffix}`);
      }

      const items = this.extractList(payload);
      this.logger.log(`[fetchPage] Received ${items.length} items`);
      return items;
    } catch (error: any) {
      this.logger.error(
        `[fetchPage] Provider API call failed: ${error?.message ?? error}`,
      );
      throw new Error(`Provider API error: ${error?.message ?? 'unknown'}`);
    }
  }
  /**
   * Handles one provider page.
   * 1. Normalizes each item.
   * 2. Upserts their secondTags into the Tag collection.
   * 3. Upserts the videos in batches of BATCH_SIZE; each batch is awaited
   *    with Promise.all.
   *
   * Items that fail normalization (missing id/datetimes, unparseable dates,
   * ...) are counted as skipped and reported in `errors`. They no longer
   * abort the whole page.
   *
   * @param rawList provider items for a single page
   * @param currentMaxUpdatedAt running maximum `updatedAt` carried across pages
   * @returns per-page counters and the new running maximum. Only successfully
   *   upserted items contribute to `maxUpdatedAtSeen`, so a failed write of the
   *   newest item does not push the incremental checkpoint past it.
   *   NOTE(review): an older item that failed can still end up behind the
   *   checkpoint when a newer item in the same run succeeded.
   */
  private async processProviderRawList(
    rawList: RawProviderVideo[],
    currentMaxUpdatedAt: Date | null,
  ): Promise<{
    updated: number;
    skipped: number;
    errors: Array<{ id?: string; error: string }>;
    maxUpdatedAtSeen: Date | null;
  }> {
    let updated = 0;
    let skipped = 0;
    const errors: Array<{ id?: string; error: string }> = [];
    let maxUpdatedAtSeen = currentMaxUpdatedAt;

    if (!rawList.length) {
      return { updated, skipped, errors, maxUpdatedAtSeen };
    }

    // Normalize item by item so one malformed record is isolated.
    const normalized = rawList.flatMap((item) => {
      try {
        return [this.normalizeItem(item)];
      } catch (e: any) {
        skipped += 1;
        errors.push({ id: item?.id, error: e?.message ?? 'Invalid item' });
        return [];
      }
    });

    const hasSecondTags = normalized.some(
      (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
    );
    if (hasSecondTags) {
      await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
    }

    for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
      const batch = normalized.slice(i, i + this.BATCH_SIZE);
      // eslint-disable-next-line no-await-in-loop
      const results = await Promise.all(
        batch.map(async (record) => ({
          record,
          outcome: await this.upsertOne(record),
        })),
      );
      for (const { record, outcome } of results) {
        if (!outcome.ok) {
          skipped += 1;
          errors.push(outcome.error);
          continue;
        }
        updated += 1;
        const ts = record.updatedAt;
        if (!maxUpdatedAtSeen || ts.getTime() > maxUpdatedAtSeen.getTime()) {
          maxUpdatedAtSeen = ts;
        }
      }
    }

    return { updated, skipped, errors, maxUpdatedAtSeen };
  }
  /**
   * Debugging aid that logs (at warn level):
   * - up to 12 keys of `resp`, of `resp.data` and of `resp.data.data`;
   * - whether `status`, `code` and `data.code` are truthy.
   */
  private debugRespShape(resp: unknown) {
    const r: any = resp as any;
    const firstKeys = (value: unknown): string[] =>
      value && typeof value === 'object' ? Object.keys(value).slice(0, 12) : [];

    const keys = firstKeys(r);
    const dataKeys = firstKeys(r?.data);
    const dataDataKeys = firstKeys(r?.data?.data);
    const hasStatus = Boolean(r?.status);
    const hasCode = Boolean(r?.code);
    const hasDataCode = Boolean(r?.data?.code);

    this.logger.warn(
      `[debugRespShape] topKeys=${JSON.stringify(keys)} dataKeys=${JSON.stringify(dataKeys)} dataDataKeys=${JSON.stringify(dataDataKeys)} hasStatus=${hasStatus} hasCode=${hasCode} hasDataCode=${hasDataCode}`,
    );
  }
  /**
   * Finds the item array inside a provider response.
   * Known layouts are tried in order; the first array found wins:
   * 1. a bare array
   * 2. `{ data: { code, data: { total, list } } }` (Axios response not yet unwrapped)
   * 3. `{ code, data: { total, list } }` (provider JSON)
   * 4. `{ list }`
   * 5. `{ data: { records } }`
   * Logs a warning and returns [] when nothing matches.
   */
  private extractList(apiResponse: unknown): RawProviderVideo[] {
    const data = apiResponse as any;
    if (Array.isArray(data)) return data as RawProviderVideo[];

    const locators: Array<(d: any) => unknown> = [
      (d) => d?.data?.data?.list,
      (d) => d?.data?.list,
      (d) => d?.list,
      (d) => d?.data?.records,
    ];
    for (const locate of locators) {
      const found = locate(data);
      if (Array.isArray(found)) return found as RawProviderVideo[];
    }

    this.logger.warn(
      '[extractList] Unexpected API response structure, defaulting to empty list',
    );
    return [];
  }
  /**
   * Validates one raw provider item and maps it to the `videoMedia` write shape.
   * Absent optional fields fall back to '' / 0 / false / [] so every column is
   * populated. `size` is converted to bigint.
   *
   * @throws Error if `id`, or any of `addedTime`/`createdAt`/`updatedAt`, is
   *   missing or unparseable. Also throws when `size` cannot be converted to
   *   BigInt (e.g. NaN or ±Infinity).
   */
  private normalizeItem(item: RawProviderVideo) {
    if (!item.id) throw new Error('Each item must have an id');
    if (!item.addedTime || !item.createdAt || !item.updatedAt) {
      throw new Error(`Item ${item.id} is missing required datetime fields`);
    }
    const addedTime = new Date(item.addedTime);
    const createdAt = new Date(item.createdAt);
    const updatedAt = new Date(item.updatedAt);
    if (
      isNaN(addedTime.getTime()) ||
      isNaN(createdAt.getTime()) ||
      isNaN(updatedAt.getTime())
    ) {
      throw new Error(`Item ${item.id} has invalid datetime format`);
    }

    // BigInt() throws a RangeError for any non-integer number, so a fractional
    // size would reject the whole item. Truncate finite numbers first; other
    // values (NaN, ±Infinity, non-numbers) go through BigInt() as before.
    const rawSize = item.size ?? 0;
    const size =
      typeof rawSize === 'number' && Number.isFinite(rawSize)
        ? BigInt(Math.trunc(rawSize))
        : BigInt(rawSize);

    return {
      id: item.id, // confirmed Mongo ObjectId string
      srcId: item.srcId ?? 0,
      title: item.title ?? '',
      checkSum: item.checkSum ?? '',
      type: item.type ?? '',
      formatType: item.formatType ?? 0,
      contentType: item.contentType ?? 0,
      coverType: item.coverType ?? 0,
      coverImg: item.coverImg ?? '',
      coverImgNew: item.coverImgNew ?? '',
      videoTime: item.videoTime ?? 0,
      publish: item.publish ?? '',
      country: item.country ?? '',
      firstTag: item.firstTag ?? '',
      secondTags: item.secondTags ?? [],
      mediaSet: item.mediaSet ?? '',
      preFileName: item.preFileName ?? '',
      status: item.status ?? '',
      desc: item.desc ?? '',
      size,
      bango: item.bango ?? '',
      actors: item.actors ?? [],
      studio: item.studio ?? '',
      addedTime,
      appids: item.appids ?? [],
      japanNames: item.japanNames ?? [],
      filename: item.filename ?? '',
      fieldNameFs: item.fieldNameFs ?? '',
      ext: item.ext ?? '',
      taskId: item.taskId ?? '',
      width: item.width ?? 0,
      height: item.height ?? 0,
      ratio: item.ratio ?? 0,
      frameRate: item.frameRate ?? '',
      syBitRate: item.syBitRate ?? '',
      vidBitRate: item.vidBitRate ?? '',
      proxyUpload: item.proxyUpload ?? 0,
      isAdd: item.isAdd ?? false,
      retry: item.retry ?? 0,
      notifySignal: item.notifySignal ?? false,
      mergeRetry: item.mergeRetry ?? 0,
      compressRetry: item.compressRetry ?? 0,
      segmentRetry: item.segmentRetry ?? 0,
      linodeRetry: item.linodeRetry ?? 0,
      failReason: item.failReason ?? '',
      deleteDisk: item.deleteDisk ?? false,
      infoTsName: item.infoTsName ?? '',
      createdAt,
      updatedAt,
    };
  }
  /**
   * Upserts one normalized record into `videoMedia`, keyed by `id`.
   * Creates with the full record and updates with every field except `id`.
   * Never throws: a missing id or any Prisma error becomes `{ ok: false }`.
   */
  private async upsertOne(record: any): Promise<UpsertOutcome> {
    const id: string | undefined = record?.id;
    if (!id) {
      return { ok: false, error: { error: 'Missing id' } };
    }
    try {
      const { id: _omittedId, ...fieldsWithoutId } = record;
      await this.mongo.videoMedia.upsert({
        where: { id },
        create: record,
        update: fieldsWithoutId,
      });
      return { ok: true };
    } catch (e: any) {
      const message = e?.message ?? 'Upsert failed';
      return { ok: false, error: { id, error: message } };
    }
  }
  /**
   * Builds the starting cursor for a run.
   *
   * - pageNum: `overridePageNum ?? 1`; it is never read from storage.
   * - updatedAtCursor: always undefined for a full sync. Otherwise it is the
   *   stored checkpoint ?? `optionUpdatedAt`, or just `optionUpdatedAt` when
   *   `resetState` is set.
   * - checkpointUpdatedAtCursor: the stored checkpoint. It is undefined with
   *   `resetState`, which skips the SyncState read entirely.
   */
  private async loadCursor(args: {
    entity: EntityType;
    pageSize: number;
    resetState: boolean;
    overridePageNum?: number;
    optionUpdatedAt?: string;
    fullSync: boolean;
  }): Promise<{ cursor: SyncCursor; checkpointUpdatedAtCursor?: string }> {
    const pageNum = args.overridePageNum ?? 1;

    if (args.resetState) {
      const freshCursor: SyncCursor = {
        pageNum,
        pageSize: args.pageSize,
        updatedAtCursor: args.fullSync ? undefined : args.optionUpdatedAt,
      };
      return { cursor: freshCursor, checkpointUpdatedAtCursor: undefined };
    }

    const { updatedAtCursor: stored } = await this.loadCheckpoint(args.entity);
    const resumedCursor: SyncCursor = {
      pageNum,
      pageSize: args.pageSize,
      updatedAtCursor: args.fullSync ? undefined : (stored ?? args.optionUpdatedAt),
    };
    return { cursor: resumedCursor, checkpointUpdatedAtCursor: stored };
  }
  /**
   * Records run bookkeeping on the SyncState row for `entity`:
   * - `lastRunAt`;
   * - `updatedAt`, in epoch seconds;
   * - `lastFullSyncAt`, when a full sync just completed.
   * `referId` (the updatedAt checkpoint) is left untouched, and no page
   * number is stored.
   *
   * Uses upsert rather than update. With `resetState: true`, loadCursor never
   * calls loadCheckpoint (the only place that creates the row). On a fresh
   * database a plain `update` would therefore fail with "record not found".
   */
  private async saveCursor(args: {
    entity: EntityType;
    fullSyncCompleted: boolean;
  }) {
    const now = new Date();
    const nowSec = Math.floor(now.getTime() / 1000);
    await this.mongo.syncState.upsert({
      where: { entity: args.entity },
      update: {
        lastRunAt: now,
        // undefined leaves the stored value unchanged on update
        lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
        updatedAt: nowSec,
      },
      create: {
        entity: args.entity,
        referId: null,
        lastRunAt: now,
        lastFullSyncAt: args.fullSyncCompleted ? now : null,
        createdAt: nowSec,
        updatedAt: nowSec,
      },
    });
  }
  /**
   * Ensures the SyncState row for `entity` exists and returns its stored
   * checkpoint.
   * - If absent, the row is created with null referId/lastRunAt/lastFullSyncAt.
   * - If present, only its epoch-second `updatedAt` is bumped.
   * The checkpoint is the `updatedAtCursor` stored as JSON in `referId`.
   */
  private async loadCheckpoint(entity: EntityType): Promise<{
    updatedAtCursor?: string;
  }> {
    const nowSec = Math.floor(Date.now() / 1000);
    const row = await this.mongo.syncState.upsert({
      where: { entity },
      create: {
        entity,
        referId: null,
        lastRunAt: null,
        lastFullSyncAt: null,
        createdAt: nowSec,
        updatedAt: nowSec,
      },
      update: { updatedAt: nowSec },
    });
    const stored = this.safeParseCursor(row.referId);
    return { updatedAtCursor: stored?.updatedAtCursor };
  }
  /**
   * Stores the incremental checkpoint for `entity` in `referId`, as
   * `{"updatedAtCursor": "..."}`; a null/undefined cursor clears it to null.
   * Also updates `lastRunAt`, `updatedAt` (epoch seconds) and, when
   * `fullSyncCompleted`, `lastFullSyncAt`.
   *
   * Uses upsert rather than update. The SyncState row may not exist yet: with
   * `resetState: true`, loadCursor skips loadCheckpoint, which is what
   * normally creates it. A plain `update` would then fail with "record not
   * found".
   */
  private async saveCheckpoint(args: {
    entity: EntityType;
    updatedAtCursor?: string | null;
    fullSyncCompleted: boolean;
  }) {
    const now = new Date();
    const nowSec = Math.floor(now.getTime() / 1000);
    const referId =
      args.updatedAtCursor != null
        ? JSON.stringify({ updatedAtCursor: args.updatedAtCursor })
        : null;
    await this.mongo.syncState.upsert({
      where: { entity: args.entity },
      update: {
        referId,
        lastRunAt: now,
        // undefined leaves the stored value unchanged on update
        lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
        updatedAt: nowSec,
      },
      create: {
        entity: args.entity,
        referId,
        lastRunAt: now,
        lastFullSyncAt: args.fullSyncCompleted ? now : null,
        createdAt: nowSec,
        updatedAt: nowSec,
      },
    });
  }
  /**
   * Parses the JSON cursor stored in SyncState.referId.
   *
   * Only correctly typed fields are kept:
   * - numeric `pageNum` and `pageSize`;
   * - a string `updatedAtCursor`.
   * A wrongly typed value (e.g. a numeric `updatedAtCursor`) is therefore
   * never forwarded to the provider as an `updatedAt` filter.
   *
   * @returns null for empty input, invalid JSON, or a non-object (including
   *   arrays); otherwise the validated subset of fields.
   */
  private safeParseCursor(
    raw: string | null | undefined,
  ): Partial<SyncCursor> | null {
    if (!raw) return null;

    let parsed: unknown;
    try {
      parsed = JSON.parse(raw);
    } catch {
      return null;
    }
    if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
      return null;
    }

    const obj = parsed as Record<string, unknown>;
    const cursor: Partial<SyncCursor> = {};
    if (typeof obj.pageNum === 'number') cursor.pageNum = obj.pageNum;
    if (typeof obj.pageSize === 'number') cursor.pageSize = obj.pageSize;
    if (typeof obj.updatedAtCursor === 'string') {
      cursor.updatedAtCursor = obj.updatedAtCursor;
    }
    return cursor;
  }
  /**
   * Truncates `n` toward zero and clamps it into [min, max].
   * Non-finite input (NaN, ±Infinity) is treated as `min`.
   */
  private clampInt(n: number, min: number, max: number): number {
    let value = min;
    if (Number.isFinite(n)) {
      value = Math.trunc(n);
    }
    value = Math.min(max, value);
    return Math.max(min, value);
  }
  798. /**
  799. * Extract secondTags (string[]) from normalized video records and upsert into Tag collection.
  800. * - Dedup in-memory per call for performance
  801. * - Trims whitespace, filters empty
  802. * - Option B performance-first: upsert without pre-check
  803. */
  804. // private async upsertSecondTagsFromVideos(
  805. // normalizedVideos: Array<{ secondTags?: string[] }>,
  806. // ): Promise<UpsertTagsResult> {
  807. // // 1) Collect + normalize
  808. // const set = new Set<string>();
  809. // for (const v of normalizedVideos) {
  810. // const tags = v.secondTags ?? [];
  811. // for (const t of tags) {
  812. // if (typeof t !== 'string') continue;
  813. // const name = t.trim();
  814. // if (!name) continue;
  815. // set.add(name);
  816. // }
  817. // }
  818. // const names = Array.from(set);
  819. // if (!names.length) {
  820. // return { unique: 0, upserted: 0, skipped: 0, errors: [] };
  821. // }
  822. // // 2) Upsert in chunks (avoid massive Promise.all)
  823. // const CHUNK = 200;
  824. // let upserted = 0;
  825. // let skipped = 0;
  826. // const errors: Array<{ name: string; error: string }> = [];
  827. // for (let i = 0; i < names.length; i += CHUNK) {
  828. // const batch = names.slice(i, i + CHUNK);
  829. // // eslint-disable-next-line no-await-in-loop
  830. // const outcomes = await Promise.all(
  831. // batch.map(async (name) => {
  832. // try {
  833. // // 🔁 Adjust `where/create/update` if your Tag schema differs
  834. // await this.mongo.tag.upsert({
  835. // where: { name },
  836. // create: {
  837. // name,
  838. // // If Tag requires createdAt/updatedAt ints (seconds), uncomment:
  839. // // createdAt: Math.floor(Date.now() / 1000),
  840. // // updatedAt: Math.floor(Date.now() / 1000),
  841. // },
  842. // update: {
  843. // // keep it minimal; optionally touch updatedAt
  844. // // updatedAt: Math.floor(Date.now() / 1000),
  845. // },
  846. // });
  847. // return { ok: true as const };
  848. // } catch (e: any) {
  849. // return {
  850. // ok: false as const,
  851. // error: e?.message ?? 'Tag upsert failed',
  852. // };
  853. // }
  854. // }),
  855. // );
  856. // for (let j = 0; j < outcomes.length; j += 1) {
  857. // const o = outcomes[j];
  858. // if (o.ok) upserted += 1;
  859. // else {
  860. // skipped += 1;
  861. // errors.push({ name: batch[j], error: o.error });
  862. // }
  863. // }
  864. // }
  865. // if (errors.length) {
  866. // this.logger.warn(
  867. // `[upsertSecondTagsFromVideos] tag upsert errors=${errors.length}, sample=${JSON.stringify(
  868. // errors.slice(0, 3),
  869. // )}`,
  870. // );
  871. // } else {
  872. // this.logger.log(
  873. // `[upsertSecondTagsFromVideos] Upserted tags=${upserted} (unique=${names.length})`,
  874. // );
  875. // }
  876. // return {
  877. // unique: names.length,
  878. // upserted,
  879. // skipped,
  880. // errors,
  881. // };
  882. // }
  /**
   * Create a Tag for every distinct `secondTags` name found in `normalizedVideos`
   * that does not already exist.
   *
   * Tag.name is not treated as unique, so each name is looked up with `findFirst`
   * before `create`. A small pool of async workers pulls names from a shared
   * cursor, which bounds the number of in-flight DB calls.
   *
   * Never throws:
   * - per-name failures are recorded in `errors` and counted in `skipped`;
   * - any unexpected failure is logged and reported as a single `global` error.
   *
   * @param normalizedVideos Records whose `secondTags` are read. Null/undefined
   *   records and non-array `secondTags` values are ignored. Non-string entries
   *   and names that are empty after trimming are dropped.
   * @returns `unique`: distinct trimmed names seen; `upserted`: tags created by
   *   this call; `skipped`: names whose check/create failed. Names that already
   *   existed count toward `unique` only.
   */
  private async upsertSecondTagsFromVideos_NoUniqueName(
    normalizedVideos: Array<{ secondTags?: string[] }>,
  ): Promise<UpsertTagsResult> {
    try {
      // 1) Collect distinct, trimmed, non-empty names (first-seen order).
      const set = new Set<string>();
      for (const v of normalizedVideos ?? []) {
        const tags: unknown = v?.secondTags;
        // Guard against untyped upstream data: iterating a stray string would
        // otherwise yield one "tag" per character.
        if (!Array.isArray(tags)) continue;
        for (const t of tags) {
          if (typeof t !== 'string') continue;
          const name = t.trim();
          if (name) set.add(name);
        }
      }
      const names = Array.from(set);
      if (!names.length) {
        return { unique: 0, upserted: 0, skipped: 0, errors: [] };
      }

      // 2) Check-then-create with bounded concurrency (reduces DB pressure).
      const CONCURRENCY = 20;
      const workerCount = Math.min(CONCURRENCY, names.length);
      let nextIndex = 0;
      let upserted = 0;
      let skipped = 0;
      const errors: Array<{ name: string; error: string }> = [];

      const worker = async (): Promise<void> => {
        while (true) {
          // The cursor is read and advanced synchronously (before any await),
          // so no two workers ever claim the same name.
          const name = names[nextIndex];
          nextIndex += 1;
          if (name === undefined) return; // cursor ran past the end

          try {
            // Existence check by name (NOT a unique field).
            const exists = await this.mongo.tag.findFirst({
              where: { name },
              select: { id: true },
            });
            if (exists?.id) continue; // already exists

            await this.mongo.tag.create({
              data: {
                name,
                // If your Tag schema requires seconds fields:
                // createdAt: Math.floor(Date.now() / 1000),
                // updatedAt: Math.floor(Date.now() / 1000),
              },
            });
            upserted += 1;
          } catch (e: unknown) {
            // Names are deduped within this call, so a collision here can only come
            // from a concurrent caller creating the same name between our check and
            // create (surfacing as a duplicate error if some index enforces it).
            // Record it and move on rather than failing the whole batch.
            const msg = e instanceof Error ? e.message : 'Tag create failed';
            skipped += 1;
            errors.push({ name, error: msg });
          }
        }
      };

      await Promise.all(Array.from({ length: workerCount }, () => worker()));

      if (errors.length) {
        this.logger.warn(
          `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
            errors.slice(0, 3),
          )}`,
        );
      } else {
        this.logger.log(
          `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
        );
      }
      return { unique: names.length, upserted, skipped, errors };
    } catch (error: unknown) {
      const message =
        error instanceof Error ? error.message : 'Unhandled tag upsert error';
      const trace = error instanceof Error ? error.stack : undefined;
      this.logger.error(
        `[upsertSecondTagsFromVideos_NoUniqueName] ${message}`,
        trace,
      );
      return {
        unique: 0,
        upserted: 0,
        skipped: 0,
        errors: [{ name: 'global', error: message }],
      };
    }
  }
  969. }