Sfoglia il codice sorgente

feat(provider-video-sync): enhance sync functionality with improved pagination and checkpoint handling

Dave 3 mesi fa
parent
commit
8425201cf0

+ 24 - 7
apps/box-mgnt-api/src/mgnt-backend/feature/provider-video-sync/provider-video-sync.controller.ts

@@ -94,8 +94,8 @@ export class ProviderVideoSyncController {
     summary: 'Trigger provider video sync (full sync or incremental)',
     description: [
       'POST body supports optional parameters: providerCode, fullSync, resetState, pageNum, pageSize, param.',
-      'Incremental sync: send param.updatedAt (ISO string).',
-      'Full sync: set fullSync=true; service ignores param.updatedAt and resumes by pageNum cursor.',
+      'Full sync: set fullSync=true; service ignores updatedAt filters and starts from the provided pageNum without persisting page/pageSize cursors.',
+      'Incremental sync: by default uses the stored checkpoint updatedAtCursor, override with param.updatedAt; the first-ever incremental (no checkpoint) runs a bottom→top baseline partial before resuming.',
     ].join('\n'),
   })
   @ApiBody({ type: ProviderVideoSyncRunDto })
@@ -128,7 +128,7 @@ export class ProviderVideoSyncController {
   @ApiOperation({
     summary: 'Trigger incremental sync quickly (query params)',
     description:
-      'Convenience endpoint. Use /run for full control. Incremental uses param.updatedAt.',
+      'Convenience endpoint that reuses the same body parameters; incremental uses the stored checkpoint updatedAtCursor unless you override with param.updatedAt, and the first-ever incremental (no checkpoint) runs a bottom→top baseline partial before regular paging.',
   })
   @ApiQuery({
     name: 'updatedAt',
@@ -148,7 +148,7 @@ export class ProviderVideoSyncController {
       providerCode: providerCode || undefined,
       fullSync: false,
       resetState: resetState === 'true',
-      pageSize: pageSize ? Number(pageSize) : undefined,
+      pageSize: parsePageSize(pageSize),
       param: {
         status: 'Completed',
         updatedAt: updatedAt || undefined,
@@ -162,7 +162,7 @@ export class ProviderVideoSyncController {
   @ApiOperation({
     summary: 'Trigger full sync quickly (query params)',
     description:
-      'Convenience endpoint. Full sync ignores updatedAt filter and resumes by stored pageNum cursor.',
+      'Convenience endpoint. Full sync ignores updatedAt filters and starts at the provided pageNum without relying on a persisted page cursor.',
   })
   @ApiQuery({ name: 'pageSize', required: false, description: '1..500' })
   @ApiQuery({ name: 'providerCode', required: false })
@@ -182,8 +182,8 @@ export class ProviderVideoSyncController {
       providerCode: providerCode || undefined,
       fullSync: true,
       resetState: resetState === 'true',
-      pageSize: pageSize ? Number(pageSize) : undefined,
-      pageNum: pageNum ? Number(pageNum) : undefined,
+      pageSize: parsePageSize(pageSize),
+      pageNum: parsePositiveInt(pageNum),
       param: {
         status: 'Completed',
       },
@@ -192,3 +192,20 @@ export class ProviderVideoSyncController {
     return this.service.syncFromProvider(options);
   }
 }
+
+function parsePageSize(value?: string): number | undefined {
+  if (!value) return undefined;
+  const parsed = Number(value);
+  if (!Number.isFinite(parsed)) return undefined;
+  const clamped = Math.trunc(parsed);
+  if (clamped < 1) return undefined;
+  return Math.min(500, clamped);
+}
+
+function parsePositiveInt(value?: string): number | undefined {
+  if (!value) return undefined;
+  const parsed = Number(value);
+  if (!Number.isFinite(parsed)) return undefined;
+  const clamped = Math.trunc(parsed);
+  return clamped >= 1 ? clamped : undefined;
+}

+ 366 - 127
apps/box-mgnt-api/src/mgnt-backend/feature/provider-video-sync/provider-video-sync.service.ts

@@ -114,6 +114,11 @@ type SyncCursor = {
   updatedAtCursor?: string;
 };
 
+type ProviderPagingInfo = {
+  total?: number;
+  totalPages?: number;
+};
+
 type UpsertOutcome =
   | { ok: true }
   | { ok: false; error: { id?: string; error: string } };
@@ -136,8 +141,9 @@ export class ProviderVideoSyncService {
 
   private readonly DEFAULT_PROVIDER_CODE = 'RVAKC';
   private readonly MAX_PAGE_SIZE = 500;
-  private readonly DEFAULT_PAGE_SIZE = 200;
+  private readonly DEFAULT_PAGE_SIZE = 500;
   private readonly BATCH_SIZE = 100;
+  private readonly BASELINE_PARTIAL_COUNT = 20000;
 
   constructor(
     private readonly mongo: MongoPrismaService,
@@ -166,14 +172,15 @@ export class ProviderVideoSyncService {
     );
 
     // Load cursor from SyncState (or fresh if resetState)
-    const { cursor: initialCursor } = await this.loadCursor({
-      entity,
-      pageSize,
-      resetState,
-      overridePageNum: options.pageNum,
-      optionUpdatedAt,
-      fullSync,
-    });
+    const { cursor: initialCursor, checkpointUpdatedAtCursor } =
+      await this.loadCursor({
+        entity,
+        pageSize,
+        resetState,
+        overridePageNum: options.pageNum,
+        optionUpdatedAt,
+        fullSync,
+      });
 
     // Counters (Option B: created always 0, updated counts successful upserts)
     let imported = 0;
@@ -195,16 +202,33 @@ export class ProviderVideoSyncService {
       updatedAtCursor: initialCursor.updatedAtCursor,
     };
 
+    const effectiveUpdatedAtCursor = fullSync
+      ? undefined
+      : (options.param?.updatedAt ?? checkpointUpdatedAtCursor);
+
+    const shouldRunBaselinePartial = !fullSync && !checkpointUpdatedAtCursor;
+
     try {
+      if (shouldRunBaselinePartial) {
+        const baselineResult = await this.runBaselinePartialIfNeeded({
+          entity,
+          cursor: initialCursor,
+          paramStatus,
+          optionsParam: options.param,
+        });
+        if (baselineResult) {
+          this.lastSyncSummary = baselineResult;
+          return baselineResult;
+        }
+      }
+
       while (true) {
         const body = this.buildProviderBody({
           pageNum,
           pageSize: cursor.pageSize,
           status: paramStatus,
           // fullSync: no updatedAt filter
-          updatedAt: fullSync
-            ? undefined
-            : (cursor.updatedAtCursor ?? optionUpdatedAt),
+          updatedAt: fullSync ? undefined : effectiveUpdatedAtCursor,
           extraParam: options.param,
         });
 
@@ -223,20 +247,16 @@ export class ProviderVideoSyncService {
           // - incremental: advance updatedAtCursor to maxUpdatedAtSeen, keep pageNum=1
           const fullSyncCompleted = fullSync;
 
-          if (fullSync) {
-            cursor.pageNum = 1;
-            // Optional visibility: store last seen updatedAt as updatedAtCursor too
-            if (maxUpdatedAtSeen)
-              cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
-          } else {
-            cursor.pageNum = 1;
-            if (maxUpdatedAtSeen)
-              cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
+          if (!fullSync && maxUpdatedAtSeen && imported > 0) {
+            await this.saveCheckpoint({
+              entity,
+              updatedAtCursor: maxUpdatedAtSeen.toISOString(),
+              fullSyncCompleted: false,
+            });
           }
 
           await this.saveCursor({
             entity,
-            cursor,
             fullSyncCompleted,
           });
 
@@ -252,60 +272,18 @@ export class ProviderVideoSyncService {
         }
 
         imported += rawList.length;
-
-        const normalized = rawList.map((item) => this.normalizeItem(item));
-
-        const hasSecondTags = normalized.some(
-          (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
+        const processed = await this.processProviderRawList(
+          rawList,
+          maxUpdatedAtSeen,
         );
-
-        if (hasSecondTags) {
-          await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
-        }
-
-        // update maxUpdatedAtSeen for cursor advance (incremental correctness)
-        for (const n of normalized) {
-          const d = n.updatedAt as Date;
-          if (!maxUpdatedAtSeen || d.getTime() > maxUpdatedAtSeen.getTime()) {
-            maxUpdatedAtSeen = d;
-          }
-        }
-
-        // Upsert in batches (Option B)
-        for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
-          const batch = normalized.slice(i, i + this.BATCH_SIZE);
-
-          // eslint-disable-next-line no-await-in-loop
-          const outcomes = await Promise.all(
-            batch.map((r) => this.upsertOne(r)),
-          );
-
-          const okCount = outcomes.filter((o) => o.ok).length;
-          const fail = outcomes.filter((o) => !o.ok) as Array<
-            Extract<UpsertOutcome, { ok: false }>
-          >;
-
-          updated += okCount;
-          skipped += fail.length;
-          for (const f of fail) errors.push(f.error);
-        }
+        updated += processed.updated;
+        skipped += processed.skipped;
+        errors.push(...processed.errors);
+        maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
 
         // Persist progress so we can resume on crash
-        if (fullSync) {
-          cursor.pageNum = pageNum + 1;
-          // Optional: keep moving max updatedAt for visibility
-          if (maxUpdatedAtSeen)
-            cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
-        } else {
-          // incremental resumes by updatedAtCursor, so keep pageNum=1
-          cursor.pageNum = 1;
-          if (maxUpdatedAtSeen)
-            cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
-        }
-
         await this.saveCursor({
           entity,
-          cursor,
           fullSyncCompleted: false,
         });
 
@@ -318,13 +296,8 @@ export class ProviderVideoSyncService {
 
       // Best-effort cursor persistence
       try {
-        if (!fullSync && maxUpdatedAtSeen) {
-          cursor.pageNum = 1;
-          cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
-        }
         await this.saveCursor({
           entity,
-          cursor,
           fullSyncCompleted: false,
         });
       } catch (saveErr: any) {
@@ -348,6 +321,166 @@ export class ProviderVideoSyncService {
     }
   }
 
+  private async runBaselinePartialIfNeeded(args: {
+    entity: EntityType;
+    cursor: SyncCursor;
+    paramStatus: string;
+    optionsParam?: ProviderVideoSyncOptions['param'];
+  }): Promise<ProviderVideoSyncResult | null> {
+    if (!args.cursor || args.cursor.updatedAtCursor !== undefined) {
+      return null;
+    }
+
+    const probeBody = this.buildProviderBody({
+      pageNum: 1,
+      pageSize: args.cursor.pageSize,
+      status: args.paramStatus,
+      updatedAt: undefined,
+      extraParam: args.optionsParam,
+    });
+
+    const pagination = await this.probeProviderForPaging(probeBody);
+
+    let totalPages = pagination.totalPages;
+    if (totalPages === undefined && pagination.total !== undefined) {
+      totalPages = Math.max(
+        0,
+        Math.ceil(pagination.total / args.cursor.pageSize),
+      );
+    }
+
+    if (!totalPages || totalPages < 1) {
+      this.logger.warn(
+        '[syncFromProvider] Baseline partial skipped because provider did not disclose total/pages; cannot compute bottom→top range.',
+      );
+      return null;
+    }
+
+    const pagesNeeded = Math.min(
+      totalPages,
+      Math.ceil(this.BASELINE_PARTIAL_COUNT / args.cursor.pageSize),
+    );
+
+    if (pagesNeeded <= 0) {
+      return null;
+    }
+
+    const startPage = totalPages;
+    const endPage = Math.max(1, totalPages - pagesNeeded + 1);
+
+    this.logger.log(
+      `[syncFromProvider] Baseline partial (first-ever) running pages ${startPage} down to ${endPage}`,
+    );
+
+    let imported = 0;
+    let updated = 0;
+    let skipped = 0;
+    const errors: Array<{ id?: string; error: string }> = [];
+    let maxUpdatedAtSeen: Date | null = null;
+
+    for (let page = startPage; page >= endPage; page -= 1) {
+      const body = this.buildProviderBody({
+        pageNum: page,
+        pageSize: args.cursor.pageSize,
+        status: args.paramStatus,
+        updatedAt: undefined,
+        extraParam: args.optionsParam,
+      });
+
+      this.logger.log(`[syncFromProvider] param body ${JSON.stringify(body)} `);
+
+      const rawList = await this.fetchPage(body);
+      if (!rawList.length) {
+        this.logger.log(
+          `[syncFromProvider] Baseline partial page ${page} returned 0 records; continuing.`,
+        );
+        continue;
+      }
+
+      imported += rawList.length;
+      const processed = await this.processProviderRawList(
+        rawList,
+        maxUpdatedAtSeen,
+      );
+      updated += processed.updated;
+      skipped += processed.skipped;
+      errors.push(...processed.errors);
+      maxUpdatedAtSeen = processed.maxUpdatedAtSeen;
+    }
+
+    if (maxUpdatedAtSeen && imported > 0) {
+      await this.saveCheckpoint({
+        entity: args.entity,
+        updatedAtCursor: maxUpdatedAtSeen.toISOString(),
+        fullSyncCompleted: false,
+      });
+    }
+
+    return {
+      imported,
+      created: 0,
+      updated,
+      skipped,
+      errors: errors.length ? errors.slice(0, 10) : undefined,
+    };
+  }
+
+  private async probeProviderForPaging(body: {
+    pageNum: number;
+    pageSize: number;
+    param: Record<string, any>;
+  }): Promise<ProviderPagingInfo> {
+    try {
+      const response = await firstValueFrom(
+        this.httpService.post(this.PROVIDER_API_URL, body, {
+          headers: { 'Content-Type': 'application/json' },
+          timeout: 30000,
+        }),
+      );
+
+      return {
+        total: this.extractNumberFromPaths(response.data, [
+          'total',
+          'data.total',
+          'data.totalCount',
+          'data.pageInfo.total',
+          'data.pageInfo.totalCount',
+        ]),
+        totalPages: this.extractNumberFromPaths(response.data, [
+          'pages',
+          'data.pages',
+          'data.totalPages',
+          'data.pageInfo.pages',
+          'data.pageInfo.totalPages',
+        ]),
+      };
+    } catch (error: any) {
+      this.logger.error(
+        `[probeProviderForPaging] Provider API call failed: ${error?.message ?? error}`,
+      );
+      throw new Error(`Provider API error: ${error?.message ?? 'unknown'}`);
+    }
+  }
+
+  private extractNumberFromPaths(
+    data: any,
+    paths: string[],
+  ): number | undefined {
+    if (!data || typeof data !== 'object') return undefined;
+    for (const path of paths) {
+      const value = path
+        .split('.')
+        .reduce<any>(
+          (obj, key) => (obj && typeof obj === 'object' ? obj[key] : undefined),
+          data,
+        );
+      if (value === undefined || value === null) continue;
+      const num = typeof value === 'number' ? value : Number(value);
+      if (Number.isFinite(num)) return num;
+    }
+    return undefined;
+  }
+
   getLastSyncSummary(): ProviderVideoSyncResult | null {
     return this.lastSyncSummary;
   }
@@ -386,21 +519,49 @@ export class ProviderVideoSyncService {
       param,
     };
   }
-
   private async fetchPage(body: {
     pageNum: number;
     pageSize: number;
     param: Record<string, any>;
   }): Promise<RawProviderVideo[]> {
     try {
+      // Provider expects { data: "<json string>" } (based on code=400 Field=data expecting string)
+      const wrappedBody = {
+        data: JSON.stringify({
+          pageNum: body.pageNum,
+          pageSize: body.pageSize,
+          param: body.param,
+        }),
+      };
+
       const response = await firstValueFrom(
-        this.httpService.post(this.PROVIDER_API_URL, body, {
+        this.httpService.post(this.PROVIDER_API_URL, wrappedBody, {
           headers: { 'Content-Type': 'application/json' },
-          timeout: 30000,
+          timeout: 30_000,
         }),
       );
 
-      const list = this.extractList(response.data);
+      // Axios response unwrap: providerJson is the actual provider payload
+      const providerJson = (response as any)?.data ?? response;
+
+      // Log a small preview for debugging (avoid huge logs)
+      this.logger.log(
+        `[fetchPage] Provider response preview: ${JSON.stringify(
+          providerJson,
+        ).slice(0, 400)}...`,
+      );
+
+      // Fail fast on provider errors (prevents "successful" runs with empty lists)
+      const code = (providerJson as any)?.code;
+      if (code !== 200) {
+        const msg = (providerJson as any)?.msg ?? 'unknown';
+        const tip = (providerJson as any)?.tip ?? '';
+        throw new Error(
+          `Provider error code=${code} msg=${msg}${tip ? ` tip=${tip}` : ''}`,
+        );
+      }
+
+      const list = this.extractList(providerJson);
       this.logger.log(`[fetchPage] Received ${list.length} items`);
       return list;
     } catch (error: any) {
@@ -411,11 +572,101 @@ export class ProviderVideoSyncService {
     }
   }
 
+  private async processProviderRawList(
+    rawList: RawProviderVideo[],
+    currentMaxUpdatedAt: Date | null,
+  ): Promise<{
+    updated: number;
+    skipped: number;
+    errors: Array<{ id?: string; error: string }>;
+    maxUpdatedAtSeen: Date | null;
+  }> {
+    if (!rawList.length) {
+      return {
+        updated: 0,
+        skipped: 0,
+        errors: [],
+        maxUpdatedAtSeen: currentMaxUpdatedAt,
+      };
+    }
+
+    const normalized = rawList.map((item) => this.normalizeItem(item));
+
+    const hasSecondTags = normalized.some(
+      (v) => Array.isArray(v.secondTags) && v.secondTags.length > 0,
+    );
+
+    if (hasSecondTags) {
+      await this.upsertSecondTagsFromVideos_NoUniqueName(normalized);
+    }
+
+    let maxUpdatedAtSeen = currentMaxUpdatedAt;
+    for (const n of normalized) {
+      const d = n.updatedAt as Date;
+      if (!maxUpdatedAtSeen || d.getTime() > maxUpdatedAtSeen.getTime()) {
+        maxUpdatedAtSeen = d;
+      }
+    }
+
+    let updated = 0;
+    let skipped = 0;
+    const errors: Array<{ id?: string; error: string }> = [];
+
+    for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
+      const batch = normalized.slice(i, i + this.BATCH_SIZE);
+
+      // eslint-disable-next-line no-await-in-loop
+      const outcomes = await Promise.all(batch.map((r) => this.upsertOne(r)));
+
+      const okCount = outcomes.filter((o) => o.ok).length;
+      const fail = outcomes.filter((o) => !o.ok) as Array<
+        Extract<UpsertOutcome, { ok: false }>
+      >;
+
+      updated += okCount;
+      skipped += fail.length;
+      for (const f of fail) errors.push(f.error);
+    }
+
+    return {
+      updated,
+      skipped,
+      errors,
+      maxUpdatedAtSeen,
+    };
+  }
+
+  private debugRespShape(resp: unknown) {
+    const r: any = resp as any;
+    const keys = r && typeof r === 'object' ? Object.keys(r).slice(0, 12) : [];
+    const dataKeys =
+      r?.data && typeof r.data === 'object'
+        ? Object.keys(r.data).slice(0, 12)
+        : [];
+    const dataDataKeys =
+      r?.data?.data && typeof r.data.data === 'object'
+        ? Object.keys(r.data.data).slice(0, 12)
+        : [];
+    this.logger.warn(
+      `[debugRespShape] topKeys=${JSON.stringify(keys)} dataKeys=${JSON.stringify(
+        dataKeys,
+      )} dataDataKeys=${JSON.stringify(dataDataKeys)} hasStatus=${Boolean(r?.status)} hasCode=${Boolean(
+        r?.code,
+      )} hasDataCode=${Boolean(r?.data?.code)}`,
+    );
+  }
+
   private extractList(apiResponse: unknown): RawProviderVideo[] {
     const data = apiResponse as any;
 
     if (Array.isArray(data)) return data as RawProviderVideo[];
 
+    // ✅ axios response: { data: { code, data: { total, list } } }
+    if (data?.data?.data?.list && Array.isArray(data.data.data.list)) {
+      return data.data.data.list as RawProviderVideo[];
+    }
+
+    // provider json directly: { code, data: { total, list } }
     if (data?.data?.list && Array.isArray(data.data.list)) {
       return data.data.list as RawProviderVideo[];
     }
@@ -551,9 +802,7 @@ export class ProviderVideoSyncService {
     overridePageNum?: number;
     optionUpdatedAt?: string;
     fullSync: boolean;
-  }): Promise<{ cursor: SyncCursor; hasState: boolean }> {
-    const nowSec = Math.floor(Date.now() / 1000);
-
+  }): Promise<{ cursor: SyncCursor; checkpointUpdatedAtCursor?: string }> {
     if (args.resetState) {
       return {
         cursor: {
@@ -561,41 +810,25 @@ export class ProviderVideoSyncService {
           pageSize: args.pageSize,
           updatedAtCursor: args.fullSync ? undefined : args.optionUpdatedAt,
         },
-        hasState: false,
+        checkpointUpdatedAtCursor: undefined,
       };
     }
 
-    const state = await this.mongo.syncState.upsert({
-      where: { entity: args.entity },
-      update: {
-        updatedAt: nowSec,
-      },
-      create: {
-        entity: args.entity,
-        referId: null,
-        lastRunAt: null,
-        lastFullSyncAt: null,
-        createdAt: nowSec,
-        updatedAt: nowSec,
-      },
-    });
-
-    const parsed = this.safeParseCursor(state.referId);
+    const checkpoint = await this.loadCheckpoint(args.entity);
 
     const cursor: SyncCursor = {
-      pageNum: args.overridePageNum ?? parsed?.pageNum ?? 1,
+      pageNum: args.overridePageNum ?? 1,
       pageSize: args.pageSize,
       updatedAtCursor: args.fullSync
         ? undefined
-        : (parsed?.updatedAtCursor ?? args.optionUpdatedAt),
+        : (checkpoint.updatedAtCursor ?? args.optionUpdatedAt),
     };
 
-    return { cursor, hasState: Boolean(state.referId) };
+    return { cursor, checkpointUpdatedAtCursor: checkpoint.updatedAtCursor };
   }
 
   private async saveCursor(args: {
     entity: EntityType;
-    cursor: SyncCursor;
     fullSyncCompleted: boolean;
   }) {
     const now = new Date();
@@ -604,7 +837,6 @@ export class ProviderVideoSyncService {
     await this.mongo.syncState.update({
       where: { entity: args.entity },
       data: {
-        referId: JSON.stringify(args.cursor),
         lastRunAt: now,
         lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
         updatedAt: nowSec,
@@ -612,38 +844,45 @@ export class ProviderVideoSyncService {
     });
   }
 
+  private async loadCheckpoint(entity: EntityType): Promise<{
+    updatedAtCursor?: string;
+  }> {
+    const nowSec = Math.floor(Date.now() / 1000);
+
+    const state = await this.mongo.syncState.upsert({
+      where: { entity },
+      update: {
+        updatedAt: nowSec,
+      },
+      create: {
+        entity,
+        referId: null,
+        lastRunAt: null,
+        lastFullSyncAt: null,
+        createdAt: nowSec,
+        updatedAt: nowSec,
+      },
+    });
+
+    const parsed = this.safeParseCursor(state.referId);
+    return { updatedAtCursor: parsed?.updatedAtCursor };
+  }
+
   private async saveCheckpoint(args: {
     entity: EntityType;
-    nextUpdatedAtCursor?: string; // ONLY when batch completed
+    updatedAtCursor?: string | null;
     fullSyncCompleted: boolean;
   }) {
     const now = new Date();
     const nowSec = Math.floor(Date.now() / 1000);
 
-    // Build referId safely (do not overwrite blindly)
-    const state = await this.mongo.syncState.findUnique({
-      where: { entity: args.entity },
-      select: { referId: true },
-    });
-
-    let persisted: { updatedAtCursor?: string } = {};
-    if (state?.referId) {
-      try {
-        persisted = JSON.parse(state.referId);
-      } catch {
-        persisted = {};
-      }
-    }
-
-    // Only commit updatedAtCursor when explicitly provided
-    if (args.nextUpdatedAtCursor) {
-      persisted.updatedAtCursor = args.nextUpdatedAtCursor;
-    }
-
     await this.mongo.syncState.update({
       where: { entity: args.entity },
       data: {
-        referId: JSON.stringify(persisted),
+        referId:
+          args.updatedAtCursor !== undefined && args.updatedAtCursor !== null
+            ? JSON.stringify({ updatedAtCursor: args.updatedAtCursor })
+            : null,
         lastRunAt: now,
         lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
         updatedAt: nowSec,