|
|
@@ -2,6 +2,7 @@
|
|
|
import { Injectable, Logger } from '@nestjs/common';
|
|
|
import { HttpService } from '@nestjs/axios';
|
|
|
import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
|
|
|
+import { SysConfigReaderService } from '@box/core/sys-config/sys-config-reader.service';
|
|
|
import { firstValueFrom } from 'rxjs';
|
|
|
import { EntityType } from '@prisma/mongo/client';
|
|
|
|
|
|
@@ -114,6 +115,11 @@ type SyncCursor = {
|
|
|
updatedAtCursor?: string;
|
|
|
};
|
|
|
|
|
|
// Pagination metadata extracted from a provider probe response.
// Both fields are optional because the provider may disclose a total record
// count, a total page count, both, or neither (see probeProviderForPaging /
// extractNumberFromPaths for where these values come from).
type ProviderPagingInfo = {
  total?: number; // total record count, when disclosed
  totalPages?: number; // total page count, when disclosed
};
|
|
|
+
|
|
|
// Outcome of a single video upsert: success, or a failure carrying the
// (optional) record id plus an error message for summary reporting.
type UpsertOutcome =
  | { ok: true }
  | { ok: false; error: { id?: string; error: string } };
|
|
|
@@ -131,25 +137,31 @@ export class ProviderVideoSyncService {
|
|
|
|
|
|
  // Summary of the most recent sync run; null until a run completes.
  private lastSyncSummary: ProviderVideoSyncResult | null = null;

  // Hard ceiling for the provider page size (requested sizes are clamped).
  private readonly MAX_PAGE_SIZE = 500;
  // Page size used when neither options.pageSize nor sysConfig itemsLimit is set.
  private readonly DEFAULT_PAGE_SIZE = 500;
  // Number of records upserted per Promise.all batch.
  private readonly BATCH_SIZE = 100;
  // Max records ingested bottom→top on the first-ever run (no checkpoint yet).
  private readonly BASELINE_PARTIAL_COUNT = 20000;

  constructor(
    private readonly mongo: MongoPrismaService,
    private readonly httpService: HttpService,
    private readonly sysConfigReader: SysConfigReaderService,
  ) {}
|
|
|
|
|
|
async syncFromProvider(
|
|
|
options: ProviderVideoSyncOptions = {},
|
|
|
): Promise<ProviderVideoSyncResult> {
|
|
|
- const providerCode = options.providerCode ?? this.DEFAULT_PROVIDER_CODE;
|
|
|
+ const providerConfig = await this.sysConfigReader.getProviderConfig();
|
|
|
+ const providerCode = options.providerCode ?? providerConfig.providerCode;
|
|
|
+ const providerApiUrl = providerConfig.apiUrl;
|
|
|
+ if (!providerApiUrl) {
|
|
|
+ throw new Error(
|
|
|
+ 'sysConfig.provider.apiUrl is required for provider sync',
|
|
|
+ );
|
|
|
+ }
|
|
|
|
|
|
- const requestedPageSize = options.pageSize ?? this.DEFAULT_PAGE_SIZE;
|
|
|
+ const defaultPageSize = providerConfig.itemsLimit ?? this.DEFAULT_PAGE_SIZE;
|
|
|
+ const requestedPageSize = options.pageSize ?? defaultPageSize;
|
|
|
const pageSize = this.clampInt(requestedPageSize, 1, this.MAX_PAGE_SIZE);
|
|
|
|
|
|
const fullSync = options.fullSync ?? false;
|
|
|
@@ -166,14 +178,15 @@ export class ProviderVideoSyncService {
|
|
|
);
|
|
|
|
|
|
// Load cursor from SyncState (or fresh if resetState)
|
|
|
- const { cursor: initialCursor } = await this.loadCursor({
|
|
|
- entity,
|
|
|
- pageSize,
|
|
|
- resetState,
|
|
|
- overridePageNum: options.pageNum,
|
|
|
- optionUpdatedAt,
|
|
|
- fullSync,
|
|
|
- });
|
|
|
+ const { cursor: initialCursor, checkpointUpdatedAtCursor } =
|
|
|
+ await this.loadCursor({
|
|
|
+ entity,
|
|
|
+ pageSize,
|
|
|
+ resetState,
|
|
|
+ overridePageNum: options.pageNum,
|
|
|
+ optionUpdatedAt,
|
|
|
+ fullSync,
|
|
|
+ });
|
|
|
|
|
|
// Counters (Option B: created always 0, updated counts successful upserts)
|
|
|
let imported = 0;
|
|
|
@@ -195,16 +208,34 @@ export class ProviderVideoSyncService {
|
|
|
updatedAtCursor: initialCursor.updatedAtCursor,
|
|
|
};
|
|
|
|
|
|
+ const effectiveUpdatedAtCursor = fullSync
|
|
|
+ ? undefined
|
|
|
+ : (options.param?.updatedAt ?? checkpointUpdatedAtCursor);
|
|
|
+
|
|
|
+ const shouldRunBaselinePartial = !fullSync && !checkpointUpdatedAtCursor;
|
|
|
+
|
|
|
try {
|
|
|
+ if (shouldRunBaselinePartial) {
|
|
|
+ const baselineResult = await this.runBaselinePartialIfNeeded({
|
|
|
+ entity,
|
|
|
+ cursor: initialCursor,
|
|
|
+ paramStatus,
|
|
|
+ optionsParam: options.param,
|
|
|
+ apiUrl: providerApiUrl,
|
|
|
+ });
|
|
|
+ if (baselineResult) {
|
|
|
+ this.lastSyncSummary = baselineResult;
|
|
|
+ return baselineResult;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
while (true) {
|
|
|
const body = this.buildProviderBody({
|
|
|
pageNum,
|
|
|
pageSize: cursor.pageSize,
|
|
|
status: paramStatus,
|
|
|
// fullSync: no updatedAt filter
|
|
|
- updatedAt: fullSync
|
|
|
- ? undefined
|
|
|
- : (cursor.updatedAtCursor ?? optionUpdatedAt),
|
|
|
+ updatedAt: fullSync ? undefined : effectiveUpdatedAtCursor,
|
|
|
extraParam: options.param,
|
|
|
});
|
|
|
|
|
|
@@ -212,7 +243,7 @@ export class ProviderVideoSyncService {
|
|
|
`[syncFromProvider] POST pageNum=${pageNum} pageSize=${cursor.pageSize} status=${paramStatus} updatedAt=${fullSync ? '(none)' : (body.param.updatedAt ?? '(none)')}`,
|
|
|
);
|
|
|
|
|
|
- const rawList = await this.fetchPage(body);
|
|
|
+ const rawList = await this.fetchPage(providerApiUrl, body);
|
|
|
if (!rawList.length) {
|
|
|
this.logger.log(
|
|
|
`[syncFromProvider] No more records (pageNum=${pageNum}). Stop.`,
|
|
|
@@ -223,20 +254,16 @@ export class ProviderVideoSyncService {
|
|
|
// - incremental: advance updatedAtCursor to maxUpdatedAtSeen, keep pageNum=1
|
|
|
const fullSyncCompleted = fullSync;
|
|
|
|
|
|
- if (fullSync) {
|
|
|
- cursor.pageNum = 1;
|
|
|
- // Optional visibility: store last seen updatedAt as updatedAtCursor too
|
|
|
- if (maxUpdatedAtSeen)
|
|
|
- cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
|
|
|
- } else {
|
|
|
- cursor.pageNum = 1;
|
|
|
- if (maxUpdatedAtSeen)
|
|
|
- cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
|
|
|
+ if (!fullSync && maxUpdatedAtSeen && imported > 0) {
|
|
|
+ await this.saveCheckpoint({
|
|
|
+ entity,
|
|
|
+ updatedAtCursor: maxUpdatedAtSeen.toISOString(),
|
|
|
+ fullSyncCompleted: false,
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
await this.saveCursor({
|
|
|
entity,
|
|
|
- cursor,
|
|
|
fullSyncCompleted,
|
|
|
});
|
|
|
|
|
|
@@ -252,60 +279,18 @@ export class ProviderVideoSyncService {
|
|
|
}
|
|
|
|
|
|
imported += rawList.length;
|
|
|
-
|
|
|
- const normalized = rawList.map((item) => this.normalizeItem(item));
|
|
|
-
|
|
|
- const hasSecondTags = normalized.some(
|
|
|
- (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
|
|
|
+ const processed = await this.processProviderRawList(
|
|
|
+ rawList,
|
|
|
+ maxUpdatedAtSeen,
|
|
|
);
|
|
|
-
|
|
|
- if (hasSecondTags) {
|
|
|
- await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
|
|
|
- }
|
|
|
-
|
|
|
- // update maxUpdatedAtSeen for cursor advance (incremental correctness)
|
|
|
- for (const n of normalized) {
|
|
|
- const d = n.updatedAt as Date;
|
|
|
- if (!maxUpdatedAtSeen || d.getTime() > maxUpdatedAtSeen.getTime()) {
|
|
|
- maxUpdatedAtSeen = d;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Upsert in batches (Option B)
|
|
|
- for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
|
|
|
- const batch = normalized.slice(i, i + this.BATCH_SIZE);
|
|
|
-
|
|
|
- // eslint-disable-next-line no-await-in-loop
|
|
|
- const outcomes = await Promise.all(
|
|
|
- batch.map((r) => this.upsertOne(r)),
|
|
|
- );
|
|
|
-
|
|
|
- const okCount = outcomes.filter((o) => o.ok).length;
|
|
|
- const fail = outcomes.filter((o) => !o.ok) as Array<
|
|
|
- Extract<UpsertOutcome, { ok: false }>
|
|
|
- >;
|
|
|
-
|
|
|
- updated += okCount;
|
|
|
- skipped += fail.length;
|
|
|
- for (const f of fail) errors.push(f.error);
|
|
|
- }
|
|
|
+ updated += processed.updated;
|
|
|
+ skipped += processed.skipped;
|
|
|
+ errors.push(...processed.errors);
|
|
|
+ maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
|
|
|
|
|
|
// Persist progress so we can resume on crash
|
|
|
- if (fullSync) {
|
|
|
- cursor.pageNum = pageNum + 1;
|
|
|
- // Optional: keep moving max updatedAt for visibility
|
|
|
- if (maxUpdatedAtSeen)
|
|
|
- cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
|
|
|
- } else {
|
|
|
- // incremental resumes by updatedAtCursor, so keep pageNum=1
|
|
|
- cursor.pageNum = 1;
|
|
|
- if (maxUpdatedAtSeen)
|
|
|
- cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
|
|
|
- }
|
|
|
-
|
|
|
await this.saveCursor({
|
|
|
entity,
|
|
|
- cursor,
|
|
|
fullSyncCompleted: false,
|
|
|
});
|
|
|
|
|
|
@@ -318,13 +303,8 @@ export class ProviderVideoSyncService {
|
|
|
|
|
|
// Best-effort cursor persistence
|
|
|
try {
|
|
|
- if (!fullSync && maxUpdatedAtSeen) {
|
|
|
- cursor.pageNum = 1;
|
|
|
- cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
|
|
|
- }
|
|
|
await this.saveCursor({
|
|
|
entity,
|
|
|
- cursor,
|
|
|
fullSyncCompleted: false,
|
|
|
});
|
|
|
} catch (saveErr: any) {
|
|
|
@@ -348,6 +328,173 @@ export class ProviderVideoSyncService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
  /**
   * One-time "baseline partial" import for the first-ever incremental run,
   * i.e. when no updatedAt checkpoint has been persisted yet.
   *
   * Probes the provider for total/pages, then walks pages from the LAST page
   * backwards (bottom→top) until roughly BASELINE_PARTIAL_COUNT records are
   * covered, upserting each page. Finally commits the max updatedAt seen as
   * the checkpoint so subsequent runs are pure incremental.
   *
   * Returns a sync summary when the baseline ran, or null when it was
   * skipped (checkpoint already present, or no paging info disclosed).
   */
  private async runBaselinePartialIfNeeded(args: {
    entity: EntityType;
    cursor: SyncCursor;
    paramStatus: string;
    apiUrl: string;
    optionsParam?: ProviderVideoSyncOptions['param'];
  }): Promise<ProviderVideoSyncResult | null> {
    // Guard: only run when there is truly no updatedAt cursor yet.
    if (!args.cursor || args.cursor.updatedAtCursor !== undefined) {
      return null;
    }

    // Probe page 1 (no updatedAt filter) purely to learn total/pages.
    const probeBody = this.buildProviderBody({
      pageNum: 1,
      pageSize: args.cursor.pageSize,
      status: args.paramStatus,
      updatedAt: undefined,
      extraParam: args.optionsParam,
    });

    const pagination = await this.probeProviderForPaging(
      args.apiUrl,
      probeBody,
    );

    // Derive totalPages when the provider only discloses a record count.
    let totalPages = pagination.totalPages;
    if (totalPages === undefined && pagination.total !== undefined) {
      totalPages = Math.max(
        0,
        Math.ceil(pagination.total / args.cursor.pageSize),
      );
    }

    if (!totalPages || totalPages < 1) {
      this.logger.warn(
        '[syncFromProvider] Baseline partial skipped because provider did not disclose total/pages; cannot compute bottom→top range.',
      );
      return null;
    }

    // Number of trailing pages needed to cover BASELINE_PARTIAL_COUNT records.
    const pagesNeeded = Math.min(
      totalPages,
      Math.ceil(this.BASELINE_PARTIAL_COUNT / args.cursor.pageSize),
    );

    if (pagesNeeded <= 0) {
      return null;
    }

    // Walk from the last page down to endPage (bottom→top).
    const startPage = totalPages;
    const endPage = Math.max(1, totalPages - pagesNeeded + 1);

    this.logger.log(
      `[syncFromProvider] Baseline partial (first-ever) running pages ${startPage} down to ${endPage}`,
    );

    let imported = 0;
    let updated = 0;
    let skipped = 0;
    const errors: Array<{ id?: string; error: string }> = [];
    let maxUpdatedAtSeen: Date | null = null;

    for (let page = startPage; page >= endPage; page -= 1) {
      const body = this.buildProviderBody({
        pageNum: page,
        pageSize: args.cursor.pageSize,
        status: args.paramStatus,
        updatedAt: undefined,
        extraParam: args.optionsParam,
      });

      this.logger.log(`[syncFromProvider] param body ${JSON.stringify(body)} `);

      const rawList = await this.fetchPage(args.apiUrl, body);
      if (!rawList.length) {
        // An empty page is not fatal here (totals may drift while we walk);
        // keep going toward endPage.
        this.logger.log(
          `[syncFromProvider] Baseline partial page ${page} returned 0 records; continuing.`,
        );
        continue;
      }

      imported += rawList.length;
      const processed = await this.processProviderRawList(
        rawList,
        maxUpdatedAtSeen,
      );
      updated += processed.updated;
      skipped += processed.skipped;
      errors.push(...processed.errors);
      maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
    }

    // Commit the checkpoint only if something was actually ingested, so a
    // failed/empty baseline is retried on the next run.
    if (maxUpdatedAtSeen && imported > 0) {
      await this.saveCheckpoint({
        entity: args.entity,
        updatedAtCursor: maxUpdatedAtSeen.toISOString(),
        fullSyncCompleted: false,
      });
    }

    return {
      imported,
      created: 0, // Option B: created is always 0; successes count as updated
      updated,
      skipped,
      errors: errors.length ? errors.slice(0, 10) : undefined,
    };
  }
|
|
|
+
|
|
|
+ private async probeProviderForPaging(
|
|
|
+ apiUrl: string,
|
|
|
+ body: {
|
|
|
+ pageNum: number;
|
|
|
+ pageSize: number;
|
|
|
+ param: Record<string, any>;
|
|
|
+ },
|
|
|
+ ): Promise<ProviderPagingInfo> {
|
|
|
+ try {
|
|
|
+ const response = await firstValueFrom(
|
|
|
+ this.httpService.post(apiUrl, body, {
|
|
|
+ headers: { 'Content-Type': 'application/json' },
|
|
|
+ timeout: 30000,
|
|
|
+ }),
|
|
|
+ );
|
|
|
+
|
|
|
+ return {
|
|
|
+ total: this.extractNumberFromPaths(response.data, [
|
|
|
+ 'total',
|
|
|
+ 'data.total',
|
|
|
+ 'data.totalCount',
|
|
|
+ 'data.pageInfo.total',
|
|
|
+ 'data.pageInfo.totalCount',
|
|
|
+ ]),
|
|
|
+ totalPages: this.extractNumberFromPaths(response.data, [
|
|
|
+ 'pages',
|
|
|
+ 'data.pages',
|
|
|
+ 'data.totalPages',
|
|
|
+ 'data.pageInfo.pages',
|
|
|
+ 'data.pageInfo.totalPages',
|
|
|
+ ]),
|
|
|
+ };
|
|
|
+ } catch (error: any) {
|
|
|
+ this.logger.error(
|
|
|
+ `[probeProviderForPaging] Provider API call failed: ${error?.message ?? error}`,
|
|
|
+ );
|
|
|
+ throw new Error(`Provider API error: ${error?.message ?? 'unknown'}`);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private extractNumberFromPaths(
|
|
|
+ data: any,
|
|
|
+ paths: string[],
|
|
|
+ ): number | undefined {
|
|
|
+ if (!data || typeof data !== 'object') return undefined;
|
|
|
+ for (const path of paths) {
|
|
|
+ const value = path
|
|
|
+ .split('.')
|
|
|
+ .reduce<any>(
|
|
|
+ (obj, key) => (obj && typeof obj === 'object' ? obj[key] : undefined),
|
|
|
+ data,
|
|
|
+ );
|
|
|
+ if (value === undefined || value === null) continue;
|
|
|
+ const num = typeof value === 'number' ? value : Number(value);
|
|
|
+ if (Number.isFinite(num)) return num;
|
|
|
+ }
|
|
|
+ return undefined;
|
|
|
+ }
|
|
|
+
|
|
|
  /** Returns the summary of the most recent sync run, or null if none has run yet. */
  getLastSyncSummary(): ProviderVideoSyncResult | null {
    return this.lastSyncSummary;
  }
|
|
|
@@ -386,21 +533,52 @@ export class ProviderVideoSyncService {
|
|
|
param,
|
|
|
};
|
|
|
}
|
|
|
-
|
|
|
- private async fetchPage(body: {
|
|
|
- pageNum: number;
|
|
|
- pageSize: number;
|
|
|
- param: Record<string, any>;
|
|
|
- }): Promise<RawProviderVideo[]> {
|
|
|
+ private async fetchPage(
|
|
|
+ apiUrl: string,
|
|
|
+ body: {
|
|
|
+ pageNum: number;
|
|
|
+ pageSize: number;
|
|
|
+ param: Record<string, any>;
|
|
|
+ },
|
|
|
+ ): Promise<RawProviderVideo[]> {
|
|
|
try {
|
|
|
+ // Provider expects { data: "<json string>" } (based on code=400 Field=data expecting string)
|
|
|
+ const wrappedBody = {
|
|
|
+ data: JSON.stringify({
|
|
|
+ pageNum: body.pageNum,
|
|
|
+ pageSize: body.pageSize,
|
|
|
+ param: body.param,
|
|
|
+ }),
|
|
|
+ };
|
|
|
+
|
|
|
const response = await firstValueFrom(
|
|
|
- this.httpService.post(this.PROVIDER_API_URL, body, {
|
|
|
+ this.httpService.post(apiUrl, wrappedBody, {
|
|
|
headers: { 'Content-Type': 'application/json' },
|
|
|
- timeout: 30000,
|
|
|
+ timeout: 30_000,
|
|
|
}),
|
|
|
);
|
|
|
|
|
|
- const list = this.extractList(response.data);
|
|
|
+ // Axios response unwrap: providerJson is the actual provider payload
|
|
|
+ const providerJson = (response as any)?.data ?? response;
|
|
|
+
|
|
|
+ // Log a small preview for debugging (avoid huge logs)
|
|
|
+ this.logger.log(
|
|
|
+ `[fetchPage] Provider response preview: ${JSON.stringify(
|
|
|
+ providerJson,
|
|
|
+ ).slice(0, 400)}...`,
|
|
|
+ );
|
|
|
+
|
|
|
+ // Fail fast on provider errors (prevents "successful" runs with empty lists)
|
|
|
+ const code = (providerJson as any)?.code;
|
|
|
+ if (code !== 200) {
|
|
|
+ const msg = (providerJson as any)?.msg ?? 'unknown';
|
|
|
+ const tip = (providerJson as any)?.tip ?? '';
|
|
|
+ throw new Error(
|
|
|
+ `Provider error code=${code} msg=${msg}${tip ? ` tip=${tip}` : ''}`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ const list = this.extractList(providerJson);
|
|
|
this.logger.log(`[fetchPage] Received ${list.length} items`);
|
|
|
return list;
|
|
|
} catch (error: any) {
|
|
|
@@ -411,11 +589,101 @@ export class ProviderVideoSyncService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private async processProviderRawList(
|
|
|
+ rawList: RawProviderVideo[],
|
|
|
+ currentMaxUpdatedAt: Date | null,
|
|
|
+ ): Promise<{
|
|
|
+ updated: number;
|
|
|
+ skipped: number;
|
|
|
+ errors: Array<{ id?: string; error: string }>;
|
|
|
+ maxUpdatedAtSeen: Date | null;
|
|
|
+ }> {
|
|
|
+ if (!rawList.length) {
|
|
|
+ return {
|
|
|
+ updated: 0,
|
|
|
+ skipped: 0,
|
|
|
+ errors: [],
|
|
|
+ maxUpdatedAtSeen: currentMaxUpdatedAt,
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ const normalized = rawList.map((item) => this.normalizeItem(item));
|
|
|
+
|
|
|
+ const hasSecondTags = normalized.some(
|
|
|
+ (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
|
|
|
+ );
|
|
|
+
|
|
|
+ if (hasSecondTags) {
|
|
|
+ await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
|
|
|
+ }
|
|
|
+
|
|
|
+ let maxUpdatedAtSeen = currentMaxUpdatedAt;
|
|
|
+ for (const n of normalized) {
|
|
|
+ const d = n.updatedAt as Date;
|
|
|
+ if (!maxUpdatedAtSeen || d.getTime() > maxUpdatedAtSeen.getTime()) {
|
|
|
+ maxUpdatedAtSeen = d;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ let updated = 0;
|
|
|
+ let skipped = 0;
|
|
|
+ const errors: Array<{ id?: string; error: string }> = [];
|
|
|
+
|
|
|
+ for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
|
|
|
+ const batch = normalized.slice(i, i + this.BATCH_SIZE);
|
|
|
+
|
|
|
+ // eslint-disable-next-line no-await-in-loop
|
|
|
+ const outcomes = await Promise.all(batch.map((r) => this.upsertOne(r)));
|
|
|
+
|
|
|
+ const okCount = outcomes.filter((o) => o.ok).length;
|
|
|
+ const fail = outcomes.filter((o) => !o.ok) as Array<
|
|
|
+ Extract<UpsertOutcome, { ok: false }>
|
|
|
+ >;
|
|
|
+
|
|
|
+ updated += okCount;
|
|
|
+ skipped += fail.length;
|
|
|
+ for (const f of fail) errors.push(f.error);
|
|
|
+ }
|
|
|
+
|
|
|
+ return {
|
|
|
+ updated,
|
|
|
+ skipped,
|
|
|
+ errors,
|
|
|
+ maxUpdatedAtSeen,
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ private debugRespShape(resp: unknown) {
|
|
|
+ const r: any = resp as any;
|
|
|
+ const keys = r && typeof r === 'object' ? Object.keys(r).slice(0, 12) : [];
|
|
|
+ const dataKeys =
|
|
|
+ r?.data && typeof r.data === 'object'
|
|
|
+ ? Object.keys(r.data).slice(0, 12)
|
|
|
+ : [];
|
|
|
+ const dataDataKeys =
|
|
|
+ r?.data?.data && typeof r.data.data === 'object'
|
|
|
+ ? Object.keys(r.data.data).slice(0, 12)
|
|
|
+ : [];
|
|
|
+ this.logger.warn(
|
|
|
+ `[debugRespShape] topKeys=${JSON.stringify(keys)} dataKeys=${JSON.stringify(
|
|
|
+ dataKeys,
|
|
|
+ )} dataDataKeys=${JSON.stringify(dataDataKeys)} hasStatus=${Boolean(r?.status)} hasCode=${Boolean(
|
|
|
+ r?.code,
|
|
|
+ )} hasDataCode=${Boolean(r?.data?.code)}`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
private extractList(apiResponse: unknown): RawProviderVideo[] {
|
|
|
const data = apiResponse as any;
|
|
|
|
|
|
if (Array.isArray(data)) return data as RawProviderVideo[];
|
|
|
|
|
|
+ // ✅ axios response: { data: { code, data: { total, list } } }
|
|
|
+ if (data?.data?.data?.list && Array.isArray(data.data.data.list)) {
|
|
|
+ return data.data.data.list as RawProviderVideo[];
|
|
|
+ }
|
|
|
+
|
|
|
+ // provider json directly: { code, data: { total, list } }
|
|
|
if (data?.data?.list && Array.isArray(data.data.list)) {
|
|
|
return data.data.list as RawProviderVideo[];
|
|
|
}
|
|
|
@@ -551,9 +819,7 @@ export class ProviderVideoSyncService {
|
|
|
overridePageNum?: number;
|
|
|
optionUpdatedAt?: string;
|
|
|
fullSync: boolean;
|
|
|
- }): Promise<{ cursor: SyncCursor; hasState: boolean }> {
|
|
|
- const nowSec = Math.floor(Date.now() / 1000);
|
|
|
-
|
|
|
+ }): Promise<{ cursor: SyncCursor; checkpointUpdatedAtCursor?: string }> {
|
|
|
if (args.resetState) {
|
|
|
return {
|
|
|
cursor: {
|
|
|
@@ -561,41 +827,25 @@ export class ProviderVideoSyncService {
|
|
|
pageSize: args.pageSize,
|
|
|
updatedAtCursor: args.fullSync ? undefined : args.optionUpdatedAt,
|
|
|
},
|
|
|
- hasState: false,
|
|
|
+ checkpointUpdatedAtCursor: undefined,
|
|
|
};
|
|
|
}
|
|
|
|
|
|
- const state = await this.mongo.syncState.upsert({
|
|
|
- where: { entity: args.entity },
|
|
|
- update: {
|
|
|
- updatedAt: nowSec,
|
|
|
- },
|
|
|
- create: {
|
|
|
- entity: args.entity,
|
|
|
- referId: null,
|
|
|
- lastRunAt: null,
|
|
|
- lastFullSyncAt: null,
|
|
|
- createdAt: nowSec,
|
|
|
- updatedAt: nowSec,
|
|
|
- },
|
|
|
- });
|
|
|
-
|
|
|
- const parsed = this.safeParseCursor(state.referId);
|
|
|
+ const checkpoint = await this.loadCheckpoint(args.entity);
|
|
|
|
|
|
const cursor: SyncCursor = {
|
|
|
- pageNum: args.overridePageNum ?? parsed?.pageNum ?? 1,
|
|
|
+ pageNum: args.overridePageNum ?? 1,
|
|
|
pageSize: args.pageSize,
|
|
|
updatedAtCursor: args.fullSync
|
|
|
? undefined
|
|
|
- : (parsed?.updatedAtCursor ?? args.optionUpdatedAt),
|
|
|
+ : (checkpoint.updatedAtCursor ?? args.optionUpdatedAt),
|
|
|
};
|
|
|
|
|
|
- return { cursor, hasState: Boolean(state.referId) };
|
|
|
+ return { cursor, checkpointUpdatedAtCursor: checkpoint.updatedAtCursor };
|
|
|
}
|
|
|
|
|
|
private async saveCursor(args: {
|
|
|
entity: EntityType;
|
|
|
- cursor: SyncCursor;
|
|
|
fullSyncCompleted: boolean;
|
|
|
}) {
|
|
|
const now = new Date();
|
|
|
@@ -604,7 +854,6 @@ export class ProviderVideoSyncService {
|
|
|
await this.mongo.syncState.update({
|
|
|
where: { entity: args.entity },
|
|
|
data: {
|
|
|
- referId: JSON.stringify(args.cursor),
|
|
|
lastRunAt: now,
|
|
|
lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
|
|
|
updatedAt: nowSec,
|
|
|
@@ -612,38 +861,45 @@ export class ProviderVideoSyncService {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
  /**
   * Loads the persisted incremental checkpoint (updatedAtCursor) for this
   * entity from SyncState.referId.
   *
   * Side effect: upserts the SyncState row — creating it with null fields on
   * first contact and bumping updatedAt (epoch seconds) on every load — so
   * the later update() calls in saveCursor/saveCheckpoint always have a row.
   */
  private async loadCheckpoint(entity: EntityType): Promise<{
    updatedAtCursor?: string;
  }> {
    // SyncState timestamps are stored as epoch seconds.
    const nowSec = Math.floor(Date.now() / 1000);

    const state = await this.mongo.syncState.upsert({
      where: { entity },
      update: {
        updatedAt: nowSec,
      },
      create: {
        entity,
        referId: null,
        lastRunAt: null,
        lastFullSyncAt: null,
        createdAt: nowSec,
        updatedAt: nowSec,
      },
    });

    // referId holds a JSON-encoded cursor; safeParseCursor tolerates null/garbage.
    const parsed = this.safeParseCursor(state.referId);
    return { updatedAtCursor: parsed?.updatedAtCursor };
  }
|
|
|
+
|
|
|
private async saveCheckpoint(args: {
|
|
|
entity: EntityType;
|
|
|
- nextUpdatedAtCursor?: string; // ONLY when batch completed
|
|
|
+ updatedAtCursor?: string | null;
|
|
|
fullSyncCompleted: boolean;
|
|
|
}) {
|
|
|
const now = new Date();
|
|
|
const nowSec = Math.floor(Date.now() / 1000);
|
|
|
|
|
|
- // Build referId safely (do not overwrite blindly)
|
|
|
- const state = await this.mongo.syncState.findUnique({
|
|
|
- where: { entity: args.entity },
|
|
|
- select: { referId: true },
|
|
|
- });
|
|
|
-
|
|
|
- let persisted: { updatedAtCursor?: string } = {};
|
|
|
- if (state?.referId) {
|
|
|
- try {
|
|
|
- persisted = JSON.parse(state.referId);
|
|
|
- } catch {
|
|
|
- persisted = {};
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Only commit updatedAtCursor when explicitly provided
|
|
|
- if (args.nextUpdatedAtCursor) {
|
|
|
- persisted.updatedAtCursor = args.nextUpdatedAtCursor;
|
|
|
- }
|
|
|
-
|
|
|
await this.mongo.syncState.update({
|
|
|
where: { entity: args.entity },
|
|
|
data: {
|
|
|
- referId: JSON.stringify(persisted),
|
|
|
+ referId:
|
|
|
+ args.updatedAtCursor !== undefined && args.updatedAtCursor !== null
|
|
|
+ ? JSON.stringify({ updatedAtCursor: args.updatedAtCursor })
|
|
|
+ : null,
|
|
|
lastRunAt: now,
|
|
|
lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
|
|
|
updatedAt: nowSec,
|
|
|
@@ -766,85 +1022,100 @@ export class ProviderVideoSyncService {
|
|
|
  /**
   * Creates Tag rows for every distinct, non-empty secondTags name found in
   * the given normalized videos. The Tag.name column is NOT unique, so this
   * uses a check-then-create pattern under a small worker pool; a concurrent
   * duplicate create is treated as skipped rather than fatal.
   *
   * Never throws: any unhandled error is logged and reported in the returned
   * result (errors=[{name:'global', ...}]) so video upserts can proceed.
   */
  private async upsertSecondTagsFromVideos_NoUniqueName(
    normalizedVideos: Array<{ secondTags?: string[] }>,
  ): Promise<UpsertTagsResult> {
    try {
      // Collect distinct, trimmed, non-empty tag names.
      const set = new Set<string>();

      for (const v of normalizedVideos) {
        const tags = v.secondTags ?? [];
        for (const t of tags) {
          if (typeof t !== 'string') continue;
          const name = t.trim();
          if (!name) continue;
          set.add(name);
        }
      }

      const names = Array.from(set);
      if (!names.length)
        return { unique: 0, upserted: 0, skipped: 0, errors: [] };

      // Concurrency limit to reduce race collisions and DB pressure
      const CONCURRENCY = 20;
      // Shared index into `names`; workers claim entries cooperatively
      // (safe here because JS is single-threaded between awaits).
      let idx = 0;

      let upserted = 0;
      let skipped = 0;
      const errors: Array<{ name: string; error: string }> = [];

      const worker = async () => {
        while (true) {
          const current = idx;
          idx += 1;
          if (current >= names.length) return;

          const name = names[current];

          try {
            // 1) check existence by name (NOT unique)
            const exists = await this.mongo.tag.findFirst({
              where: { name },
              select: { id: true },
            });

            if (exists?.id) {
              // already exists
              continue;
            }

            // 2) create if not exists
            await this.mongo.tag.create({
              data: {
                name,
                // If your Tag schema requires seconds fields:
                // createdAt: Math.floor(Date.now() / 1000),
                // updatedAt: Math.floor(Date.now() / 1000),
              },
            });

            upserted += 1;
          } catch (e: any) {
            // If another worker created it after our check, create may fail (duplicate on some index)
            // We treat that as skipped (safe).
            const msg = e?.message ?? 'Tag create failed';
            skipped += 1;
            errors.push({ name, error: msg });
          }
        }
      };

      await Promise.all(Array.from({ length: CONCURRENCY }, () => worker()));

      if (errors.length) {
        this.logger.warn(
          `[upsertSecondTagsFromVideos] errors=${errors.length}, sample=${JSON.stringify(
            errors.slice(0, 3),
          )}`,
        );
      } else {
        this.logger.log(
          `[upsertSecondTagsFromVideos] unique=${names.length} created=${upserted}`,
        );
      }

      return { unique: names.length, upserted, skipped, errors };
    } catch (error: any) {
      // Global fallback: report the failure in the result instead of throwing,
      // so the surrounding video sync is not aborted by a tag problem.
      const message = error?.message ?? 'Unhandled tag upsert error';
      const trace = error?.stack ?? undefined;
      this.logger.error(
        `[upsertSecondTagsFromVideos_NoUniqueName] ${message}`,
        trace,
      );
      return {
        unique: 0,
        upserted: 0,
        skipped: 0,
        errors: [{ name: 'global', error: message }],
      };
    }
  }
|
|
}
|