Просмотр исходного кода

feat(provider-video-sync): enhance sync options with pagination and state management
feat(prisma): add sync state and notification models for video synchronization

Dave 1 месяц назад
Родитель
Сommit
c3842abf5b

+ 397 - 193
apps/box-mgnt-api/src/mgnt-backend/feature/provider-video-sync/provider-video-sync.service.ts

@@ -3,18 +3,55 @@ import { Injectable, Logger } from '@nestjs/common';
 import { HttpService } from '@nestjs/axios';
 import { MongoPrismaService } from '@box/db/prisma/mongo-prisma.service';
 import { firstValueFrom } from 'rxjs';
+import { EntityType } from '@prisma/mongo/client';
 
 export interface ProviderVideoSyncOptions {
   providerCode?: string;
-  page?: number;
+
+  /**
+   * Optional override. In normal usage, we resume from SyncState cursor:
+   * - fullSync: pageNum resumes
+   * - incremental: always pageNum=1
+   */
+  pageNum?: number;
+
+  /**
+   * Default 200, hard-capped to 500.
+   */
   pageSize?: number;
+
+  /**
+   * Provider search param.
+   * - status: required in your business rule ("Completed")
+   * - updatedAt: ISO string filter "updated after"
+   */
+  param?: {
+    status?: string;
+    updatedAt?: string;
+    [k: string]: any;
+  };
+
+  /**
+   * fullSync:
+   * - true: no param.updatedAt; resume using stored pageNum
+   * - false: use param.updatedAt (cursor); pageNum forced to 1
+   */
+  fullSync?: boolean;
+
+  /**
+   * If true: ignore stored cursor and start fresh.
+   * - fullSync: pageNum from options.pageNum or 1
+   * - incremental: updatedAtCursor from options.param.updatedAt (if provided)
+   */
+  resetState?: boolean;
+
   [key: string]: any;
 }
 
 export interface ProviderVideoSyncResult {
   imported: number;
-  created: number;
-  updated: number;
+  created: number; // Option B: always 0
+  updated: number; // Option B: equals successful upserts
   skipped: number;
   errors?: Array<{ id?: string; error: string }>;
 }
@@ -71,6 +108,16 @@ interface RawProviderVideo {
   infoTsName?: string;
 }
 
+type SyncCursor = {
+  pageNum: number;
+  pageSize: number;
+  updatedAtCursor?: string;
+};
+
+type UpsertOutcome =
+  | { ok: true }
+  | { ok: false; error: { id?: string; error: string } };
+
 @Injectable()
 export class ProviderVideoSyncService {
   private readonly logger = new Logger(ProviderVideoSyncService.name);
@@ -80,218 +127,279 @@ export class ProviderVideoSyncService {
   private readonly PROVIDER_API_URL =
     'https://vm.rvakc.xyz/api/web/mediafile/search';
 
+  private readonly DEFAULT_PROVIDER_CODE = 'RVAKC';
+  private readonly MAX_PAGE_SIZE = 500;
+  private readonly DEFAULT_PAGE_SIZE = 200;
+  private readonly BATCH_SIZE = 100;
+
   constructor(
     private readonly mongo: MongoPrismaService,
     private readonly httpService: HttpService,
   ) {}
 
-  /**
-   * Sync video media from provider(s).
-   * Fetches data from external provider API, normalizes it, and upserts to MongoDB.
-   *
-   * @param options Configuration options (providerCode, page, pageSize, etc.)
-   * @returns Sync result with counts and errors
-   */
   async syncFromProvider(
     options: ProviderVideoSyncOptions = {},
   ): Promise<ProviderVideoSyncResult> {
-    this.logger.log('[syncFromProvider] Starting provider video sync');
-    this.logger.debug('[syncFromProvider] Options:', options);
+    const providerCode = options.providerCode ?? this.DEFAULT_PROVIDER_CODE;
 
-    const { providerCode = 'RVAKC', page = 1, pageSize = 100 } = options;
+    const requestedPageSize = options.pageSize ?? this.DEFAULT_PAGE_SIZE;
+    const pageSize = this.clampInt(requestedPageSize, 1, this.MAX_PAGE_SIZE);
 
-    try {
-      // Fetch data from provider API
-      this.logger.log(
-        `[syncFromProvider] Fetching from provider API: ${this.PROVIDER_API_URL}`,
-      );
+    const fullSync = options.fullSync ?? false;
+    const resetState = options.resetState ?? false;
 
-      let rawList: RawProviderVideo[] = [];
+    const paramStatus = options.param?.status ?? 'Completed';
+    const optionUpdatedAt = options.param?.updatedAt;
 
-      try {
-        const response = await firstValueFrom(
-          this.httpService.get(this.PROVIDER_API_URL, {
-            params: {
-              page,
-              pageSize,
-              providerCode,
-            },
-            timeout: 30000,
-          }),
-        );
+    // Only one entity exists in your enum now
+    const entity = EntityType.VIDEO;
 
-        rawList = this.extractList(response.data);
-        this.logger.log(
-          `[syncFromProvider] Extracted ${rawList.length} items from provider API`,
-        );
-      } catch (error: any) {
-        this.logger.error(
-          `[syncFromProvider] Provider API call failed: ${error.message}`,
-        );
-        const result: ProviderVideoSyncResult = {
-          imported: 0,
-          created: 0,
-          updated: 0,
-          skipped: 0,
-          errors: [{ error: `Provider API error: ${error.message}` }],
-        };
-        this.lastSyncSummary = result;
-        return result;
-      }
+    this.logger.log(
+      `[syncFromProvider] Start provider=${providerCode} entity=${entity} fullSync=${fullSync} pageSize=${pageSize} resetState=${resetState}`,
+    );
 
-      if (!rawList.length) {
-        this.logger.log('[syncFromProvider] No videos from provider to sync');
-        const result: ProviderVideoSyncResult = {
-          imported: 0,
-          created: 0,
-          updated: 0,
-          skipped: 0,
-        };
-        this.lastSyncSummary = result;
-        return result;
-      }
+    // Load cursor from SyncState (or fresh if resetState)
+    const { cursor: initialCursor } = await this.loadCursor({
+      entity,
+      pageSize,
+      resetState,
+      overridePageNum: options.pageNum,
+      optionUpdatedAt,
+      fullSync,
+    });
+
+    // Counters (Option B: created always 0, updated counts successful upserts)
+    let imported = 0;
+    let updated = 0;
+    let skipped = 0;
+    const created = 0;
+    const errors: Array<{ id?: string; error: string }> = [];
+
+    // Track max updatedAt seen (for incremental cursor advancement)
+    let maxUpdatedAtSeen: Date | null = null;
+
+    // Full sync resumes with pageNum; incremental always starts at 1
+    let pageNum = fullSync ? initialCursor.pageNum : 1;
+
+    // Keep a working cursor that we will persist as we go
+    const cursor: SyncCursor = {
+      pageNum,
+      pageSize: initialCursor.pageSize,
+      updatedAtCursor: initialCursor.updatedAtCursor,
+    };
 
-      // Normalize items
-      let normalized: any[] = [];
+    try {
+      while (true) {
+        const body = this.buildProviderBody({
+          pageNum,
+          pageSize: cursor.pageSize,
+          status: paramStatus,
+          // fullSync: no updatedAt filter
+          updatedAt: fullSync
+            ? undefined
+            : (cursor.updatedAtCursor ?? optionUpdatedAt),
+          extraParam: options.param,
+        });
 
-      try {
-        normalized = rawList.map((item) => this.normalizeItem(item));
         this.logger.log(
-          `[syncFromProvider] Ready to import ${normalized.length} records`,
+          `[syncFromProvider] POST pageNum=${pageNum} pageSize=${cursor.pageSize} status=${paramStatus} updatedAt=${fullSync ? '(none)' : (body.param.updatedAt ?? '(none)')}`,
         );
-        this.logger.debug(
-          '[syncFromProvider] First record sample:',
-          normalized[0],
-        );
-      } catch (error: any) {
-        this.logger.error(
-          `[syncFromProvider] Normalization failed: ${error.message}`,
-        );
-        const result: ProviderVideoSyncResult = {
-          imported: rawList.length,
-          created: 0,
-          updated: 0,
-          skipped: rawList.length,
-          errors: [{ error: `Normalization error: ${error.message}` }],
-        };
-        this.lastSyncSummary = result;
-        return result;
-      }
 
-      // Batch processing - try to create each record individually and catch duplicate errors
-      const BATCH_SIZE = 100;
-      let created = 0;
-      let updated = 0;
-      let skipped = 0;
-      const errors: Array<{ id?: string; error: string }> = [];
-
-      for (let i = 0; i < normalized.length; i += BATCH_SIZE) {
-        const batch = normalized.slice(i, i + BATCH_SIZE);
-        this.logger.debug(
-          `[syncFromProvider] Processing batch ${i / BATCH_SIZE + 1}, size: ${batch.length}`,
-        );
+        const rawList = await this.fetchPage(body);
+        if (!rawList.length) {
+          this.logger.log(
+            `[syncFromProvider] No more records (pageNum=${pageNum}). Stop.`,
+          );
+
+          // On completion:
+          // - fullSync: reset pageNum to 1 and set lastFullSyncAt
+          // - incremental: advance updatedAtCursor to maxUpdatedAtSeen, keep pageNum=1
+          const fullSyncCompleted = fullSync;
+
+          if (fullSync) {
+            cursor.pageNum = 1;
+            // Optional visibility: store last seen updatedAt as updatedAtCursor too
+            if (maxUpdatedAtSeen)
+              cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
+          } else {
+            cursor.pageNum = 1;
+            if (maxUpdatedAtSeen)
+              cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
+          }
+
+          await this.saveCursor({
+            entity,
+            cursor,
+            fullSyncCompleted,
+          });
+
+          const result: ProviderVideoSyncResult = {
+            imported,
+            created,
+            updated,
+            skipped,
+            errors: errors.length ? errors.slice(0, 10) : undefined,
+          };
+          this.lastSyncSummary = result;
+          return result;
+        }
+
+        imported += rawList.length;
+
+        const normalized = rawList.map((item) => this.normalizeItem(item));
+
+        // update maxUpdatedAtSeen for cursor advance (incremental correctness)
+        for (const n of normalized) {
+          const d = n.updatedAt as Date;
+          if (!maxUpdatedAtSeen || d.getTime() > maxUpdatedAtSeen.getTime()) {
+            maxUpdatedAtSeen = d;
+          }
+        }
+
+        // Upsert in batches (Option B)
+        for (let i = 0; i < normalized.length; i += this.BATCH_SIZE) {
+          const batch = normalized.slice(i, i + this.BATCH_SIZE);
+
+          // eslint-disable-next-line no-await-in-loop
+          const outcomes = await Promise.all(
+            batch.map((r) => this.upsertOne(r)),
+          );
+
+          const okCount = outcomes.filter((o) => o.ok).length;
+          const fail = outcomes.filter((o) => !o.ok) as Array<
+            Extract<UpsertOutcome, { ok: false }>
+          >;
+
+          updated += okCount;
+          skipped += fail.length;
+          for (const f of fail) errors.push(f.error);
+        }
+
+        // Persist progress so we can resume on crash
+        if (fullSync) {
+          cursor.pageNum = pageNum + 1;
+          // Optional: keep moving max updatedAt for visibility
+          if (maxUpdatedAtSeen)
+            cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
+        } else {
+          // incremental resumes by updatedAtCursor, so keep pageNum=1
+          cursor.pageNum = 1;
+          if (maxUpdatedAtSeen)
+            cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
+        }
+
+        await this.saveCursor({
+          entity,
+          cursor,
+          fullSyncCompleted: false,
+        });
+
+        pageNum += 1;
+      }
+    } catch (e: any) {
+      this.logger.error(
+        `[syncFromProvider] Unexpected error: ${e?.message ?? e}`,
+      );
 
-        // eslint-disable-next-line no-await-in-loop
-        await Promise.all(
-          batch.map(async (record) => {
-            try {
-              // Try to create the record
-              await this.mongo.videoMedia.create({ data: record });
-              // eslint-disable-next-line no-plusplus
-              created++;
-              this.logger.debug(
-                `[syncFromProvider] Created record: ${record.id}`,
-              );
-            } catch (error: any) {
-              this.logger.debug(
-                `[syncFromProvider] Create failed for ${record.id}: ${error.code} ${error.message?.substring(0, 100)}`,
-              );
-              // If duplicate key error (code 11000), try to update
-              if (
-                error.code === 11000 ||
-                error.message?.includes('duplicate')
-              ) {
-                try {
-                  const { id, ...updateData } = record;
-                  await this.mongo.videoMedia.update({
-                    where: { id },
-                    data: updateData,
-                  });
-                  // eslint-disable-next-line no-plusplus
-                  updated++;
-                  this.logger.debug(`[syncFromProvider] Updated record: ${id}`);
-                } catch (updateError: any) {
-                  this.logger.error(
-                    `[syncFromProvider] Update failed for ${record.id}: ${updateError.message}`,
-                  );
-                  // eslint-disable-next-line no-plusplus
-                  skipped++;
-                  errors.push({ id: record.id, error: updateError.message });
-                }
-              } else {
-                this.logger.error(
-                  `[syncFromProvider] Skipped ${record.id}: ${error.message}`,
-                );
-                // eslint-disable-next-line no-plusplus
-                skipped++;
-                errors.push({ id: record.id, error: error.message });
-              }
-            }
-          }),
+      // Best-effort cursor persistence
+      try {
+        if (!fullSync && maxUpdatedAtSeen) {
+          cursor.pageNum = 1;
+          cursor.updatedAtCursor = maxUpdatedAtSeen.toISOString();
+        }
+        await this.saveCursor({
+          entity,
+          cursor,
+          fullSyncCompleted: false,
+        });
+      } catch (saveErr: any) {
+        this.logger.error(
+          `[syncFromProvider] Failed to persist cursor after error: ${saveErr?.message ?? saveErr}`,
         );
       }
 
       const result: ProviderVideoSyncResult = {
-        imported: normalized.length,
+        imported,
         created,
         updated,
         skipped,
-        errors: errors.length > 0 ? errors.slice(0, 10) : undefined,
-      };
-
-      this.logger.log(
-        `[syncFromProvider] Sync complete: ${created} created, ${updated} updated, ${skipped} skipped`,
-      );
-      if (errors.length > 0) {
-        this.logger.log('[syncFromProvider] Errors:', errors.slice(0, 5));
-      }
-
-      this.lastSyncSummary = result;
-      return result;
-    } catch (error: any) {
-      this.logger.error(
-        `[syncFromProvider] Unexpected error: ${error.message}`,
-      );
-      const result: ProviderVideoSyncResult = {
-        imported: 0,
-        created: 0,
-        updated: 0,
-        skipped: 0,
-        errors: [{ error: error.message || 'Unexpected error' }],
+        errors: [
+          ...(errors.length ? errors.slice(0, 9) : []),
+          { error: e?.message ?? 'Unexpected error' },
+        ],
       };
       this.lastSyncSummary = result;
       return result;
     }
   }
 
-  /**
-   * Get the last sync summary.
-   * Returns null if no sync has been performed yet.
-   */
   getLastSyncSummary(): ProviderVideoSyncResult | null {
     return this.lastSyncSummary;
   }
 
-  /**
-   * Extracts the list of items from different possible API response shapes.
-   * Supports: { list: [...] }, { data: { list: [...] } }, or direct array
-   */
+  private buildProviderBody(args: {
+    pageNum: number;
+    pageSize: number;
+    status: string;
+    updatedAt?: string;
+    extraParam?: ProviderVideoSyncOptions['param'];
+  }) {
+    // Provider contract:
+    // {
+    //   pageNum: 1,
+    //   pageSize: 200,
+    //   param: { status: "Completed", updatedAt: "ISO" }
+    // }
+    const param: Record<string, any> = {
+      status: args.status,
+    };
+
+    // Keep only if present (incremental)
+    if (args.updatedAt) param.updatedAt = args.updatedAt;
+
+    // Merge any extraParam fields, but status/updatedAt above remain authoritative
+    if (args.extraParam && typeof args.extraParam === 'object') {
+      for (const [k, v] of Object.entries(args.extraParam)) {
+        if (k === 'status' || k === 'updatedAt') continue;
+        param[k] = v;
+      }
+    }
+
+    return {
+      pageNum: args.pageNum,
+      pageSize: args.pageSize,
+      param,
+    };
+  }
+
+  private async fetchPage(body: {
+    pageNum: number;
+    pageSize: number;
+    param: Record<string, any>;
+  }): Promise<RawProviderVideo[]> {
+    try {
+      const response = await firstValueFrom(
+        this.httpService.post(this.PROVIDER_API_URL, body, {
+          headers: { 'Content-Type': 'application/json' },
+          timeout: 30000,
+        }),
+      );
+
+      const list = this.extractList(response.data);
+      this.logger.log(`[fetchPage] Received ${list.length} items`);
+      return list;
+    } catch (error: any) {
+      this.logger.error(
+        `[fetchPage] Provider API call failed: ${error?.message ?? error}`,
+      );
+      throw new Error(`Provider API error: ${error?.message ?? 'unknown'}`);
+    }
+  }
+
   private extractList(apiResponse: unknown): RawProviderVideo[] {
     const data = apiResponse as any;
 
-    if (Array.isArray(data)) {
-      return data as RawProviderVideo[];
-    }
+    if (Array.isArray(data)) return data as RawProviderVideo[];
 
     if (data?.data?.list && Array.isArray(data.data.list)) {
       return data.data.list as RawProviderVideo[];
@@ -301,21 +409,18 @@ export class ProviderVideoSyncService {
       return data.list as RawProviderVideo[];
     }
 
+    if (data?.data?.records && Array.isArray(data.data.records)) {
+      return data.data.records as RawProviderVideo[];
+    }
+
     this.logger.warn(
       '[extractList] Unexpected API response structure, defaulting to empty list',
     );
     return [];
   }
 
-  /**
-   * Maps RawProviderVideo to Prisma videoMedia create/update input.
-   * Applies defaults and type conversions to match the Prisma model.
-   */
   private normalizeItem(item: RawProviderVideo) {
-    // Basic validation
-    if (!item.id) {
-      throw new Error('Each item must have an id');
-    }
+    if (!item.id) throw new Error('Each item must have an id');
     if (!item.addedTime || !item.createdAt || !item.updatedAt) {
       throw new Error(`Item ${item.id} is missing required datetime fields`);
     }
@@ -333,7 +438,7 @@ export class ProviderVideoSyncService {
     }
 
     return {
-      id: item.id, // String mapped to Mongo ObjectId via @db.ObjectId
+      id: item.id, // confirmed Mongo ObjectId string
 
       srcId: item.srcId ?? 0,
       title: item.title ?? '',
@@ -353,30 +458,21 @@ export class ProviderVideoSyncService {
       country: item.country ?? '',
       firstTag: item.firstTag ?? '',
 
-      // null → []
       secondTags: item.secondTags ?? [],
-
-      // null → ""
       mediaSet: item.mediaSet ?? '',
-
       preFileName: item.preFileName ?? '',
 
       status: item.status ?? '',
       desc: item.desc ?? '',
 
-      // number → BigInt
       size: BigInt(item.size ?? 0),
 
       bango: item.bango ?? '',
-
-      // null → []
       actors: item.actors ?? [],
-
       studio: item.studio ?? '',
 
       addedTime,
       appids: item.appids ?? [],
-
       japanNames: item.japanNames ?? [],
 
       filename: item.filename ?? '',
@@ -410,4 +506,112 @@ export class ProviderVideoSyncService {
       updatedAt,
     };
   }
+
+  private async upsertOne(record: any): Promise<UpsertOutcome> {
+    const id = record?.id as string | undefined;
+    if (!id) return { ok: false, error: { error: 'Missing id' } };
+
+    try {
+      const { id: _, ...updateData } = record;
+
+      await this.mongo.videoMedia.upsert({
+        where: { id },
+        create: record,
+        update: updateData,
+      });
+
+      return { ok: true };
+    } catch (e: any) {
+      return {
+        ok: false,
+        error: { id, error: e?.message ?? 'Upsert failed' },
+      };
+    }
+  }
+
+  private async loadCursor(args: {
+    entity: EntityType;
+    pageSize: number;
+    resetState: boolean;
+    overridePageNum?: number;
+    optionUpdatedAt?: string;
+    fullSync: boolean;
+  }): Promise<{ cursor: SyncCursor; hasState: boolean }> {
+    const nowSec = Math.floor(Date.now() / 1000);
+
+    if (args.resetState) {
+      return {
+        cursor: {
+          pageNum: args.overridePageNum ?? 1,
+          pageSize: args.pageSize,
+          updatedAtCursor: args.fullSync ? undefined : args.optionUpdatedAt,
+        },
+        hasState: false,
+      };
+    }
+
+    const state = await this.mongo.syncState.upsert({
+      where: { entity: args.entity },
+      update: {
+        updatedAt: nowSec,
+      },
+      create: {
+        entity: args.entity,
+        referId: null,
+        lastRunAt: null,
+        lastFullSyncAt: null,
+        createdAt: nowSec,
+        updatedAt: nowSec,
+      },
+    });
+
+    const parsed = this.safeParseCursor(state.referId);
+
+    const cursor: SyncCursor = {
+      pageNum: args.overridePageNum ?? parsed?.pageNum ?? 1,
+      pageSize: args.pageSize,
+      updatedAtCursor: args.fullSync
+        ? undefined
+        : (parsed?.updatedAtCursor ?? args.optionUpdatedAt),
+    };
+
+    return { cursor, hasState: Boolean(state.referId) };
+  }
+
+  private async saveCursor(args: {
+    entity: EntityType;
+    cursor: SyncCursor;
+    fullSyncCompleted: boolean;
+  }) {
+    const now = new Date();
+    const nowSec = Math.floor(Date.now() / 1000);
+
+    await this.mongo.syncState.update({
+      where: { entity: args.entity },
+      data: {
+        referId: JSON.stringify(args.cursor),
+        lastRunAt: now,
+        lastFullSyncAt: args.fullSyncCompleted ? now : undefined,
+        updatedAt: nowSec,
+      },
+    });
+  }
+
+  private safeParseCursor(
+    raw: string | null | undefined,
+  ): Partial<SyncCursor> | null {
+    if (!raw) return null;
+    try {
+      const parsed = JSON.parse(raw) as any;
+      if (!parsed || typeof parsed !== 'object') return null;
+      return parsed as Partial<SyncCursor>;
+    } catch {
+      return null;
+    }
+  }
+
+  private clampInt(n: number, min: number, max: number): number {
+    const x = Number.isFinite(n) ? Math.trunc(n) : min;
+    return Math.max(min, Math.min(max, x));
+  }
 }

+ 1 - 1
prisma/mongo/schema/category.prisma

@@ -3,7 +3,7 @@ model Category {
   name        String                          // 分类名称
   subtitle    String?                         // 副标题
   seq         Int        @default(0)         // 排序
-  status      Int                              // 状态 0: 禁用; 1: 启用
+  status      Int        @default(1)        // 状态 0: 禁用; 1: 启用
 
   createAt    BigInt     @default(0)          // 创建时间
   updateAt    BigInt     @default(0)          // 更新时间

+ 172 - 0
prisma/mongo/schema/video-sync.prisma

@@ -0,0 +1,172 @@
+enum EntityType {
+  VIDEO
+}
+
+enum ChangeType {
+  CREATED  // type = 0
+  UPDATED  // type = 1
+  DELETED  // type = 2
+}
+
+model SyncState {
+  id             String    @id @default(auto()) @map("_id") @db.ObjectId
+  entity         EntityType
+  referId        String?
+  lastRunAt      DateTime?
+  lastFullSyncAt DateTime?
+  createdAt      Int
+  updatedAt      Int
+
+  @@unique([entity])
+  @@map("syncState")
+}
+
+model SyncChangeNotification {
+  id                  String     @id @default(auto()) @map("_id") @db.ObjectId
+  entity              EntityType
+  changeType          Int
+  externalId          String?
+  nonce               String?
+  timestamp           BigInt?
+  sign                String?
+  processed           Boolean    @default(false)
+  processedAt         DateTime?
+  statusCode          Int?
+  errorMsg            String?
+  rawBody             Json?
+  createdAt           Int
+
+  // NEW: explicit notify discriminator (1=new order, 2=status update)
+  notifyType          Int?
+  // NEW: free-form notes / debug info
+  notes               String?
+
+  @@index([entity, externalId])
+  @@map("syncChangeNotification")
+}
+
+model SyncRequestSignature {
+  id         String   @id @default(auto()) @map("_id") @db.ObjectId
+  nonce      String   @unique
+  timestamp  BigInt
+  sign       String
+  endpoint   String
+  createdAt  Int
+
+  @@map("syncRequestSignature")
+}
+
+enum SyncRunType {
+  FULL
+  NOTIFY
+  RETRY
+}
+
+enum SyncAction {
+  CREATED
+  UPDATED
+  DELETED
+  NOOP        // fetched but nothing changed
+  FAILED
+}
+
+model SyncRun {
+  id              String      @id @default(auto()) @map("_id") @db.ObjectId
+  entity          EntityType
+  type            SyncRunType
+  isInitial       Boolean     @default(false)
+  referIdStart    String?
+  referIdEnd      String?
+  pageSize        Int?
+  processedCount  Int         @default(0)
+  createdCount    Int         @default(0)
+  updatedCount    Int         @default(0)
+  deletedCount    Int         @default(0)
+  failedCount     Int         @default(0)
+  startedAt       DateTime
+  finishedAt      DateTime?
+  status          Int?        // 0 success, non-zero error code
+  errorMsg        String?
+  notes           String?     // NEW: free-form reason/context
+  createdAt       Int
+  updatedAt       Int
+
+  records         SyncRecord[] @relation("RunToRecords")
+
+  @@index([entity, type, startedAt])
+  @@map("syncRun")
+}
+
+model SyncRecord {
+  id              String       @id @default(auto()) @map("_id") @db.ObjectId
+  runId           String
+  run             SyncRun      @relation("RunToRecords", fields: [runId], references: [id])
+  entity          EntityType
+  externalId      String
+  action          SyncAction
+  source          SyncRunType
+  processedAt     DateTime?    // CHANGED: allow pending records
+  notificationId  String?
+
+  // Snapshots / Audit
+  before          Json?
+  after           Json?
+  diff            Json?
+  checksumBefore  String?
+  checksumAfter   String?
+
+  // NEW: evidence & normalization
+  payloadRaw      String?      // verbatim upstream JSON string
+  payloadHash     String?      // sha256(payloadRaw)
+  normalized      Json?        // our mapped DTO snapshot
+  resultHash      String?      // sha256(normalized)
+  mediaAudit      Json?        // { photos: [...], video?: {...} }
+  hold            Boolean      @default(false) // legal hold (skip retention)
+  prevRecordId    String?      @db.ObjectId    // chain to previous record for same externalId
+
+  // Status semantics (existing)
+  status          Int?         // 0=success, 1=transient_failed(pending retry), 2=permanent_failed
+  errorMsg        String?
+  createdAt       Int
+  updatedAt       Int?
+
+
+  // ---------------- NEW: Job identity & retry scheduling ----------------
+  /// "SubmitOrder" | "SubmitStatus" (null for older/other record types)
+  jobType         String?
+  attempt         Int          @default(0)
+  maxAttempts     Int          @default(3)
+  /// epoch seconds for backoff scheduling
+  nextRunAt       Int?
+  /// De-dupe key; recommend `${jobType}:${externalId}`
+  dedupeKey       String?
+
+  // ---------------- NEW: HTTP request/response audit --------------------
+  requestUrl      String?
+  requestMethod   String?      // e.g., "POST"
+  requestHeaders  Json?
+  requestBody     Json?
+  responseCode    Int?
+  responseBody    String?
+  latencyMs       Int?
+
+  // ----------- NEW: Idempotency & future auth scaffolding ---------------
+  idempotencyKey  String?
+  securityNonce   String?
+  securityTs      BigInt?
+  securitySign    String?
+
+  @@index([entity, externalId, processedAt])
+  @@index([runId])
+  @@index([entity, action, processedAt])
+  @@index([entity, externalId, createdAt])  // quick newest lookups
+
+  // NEW: scheduler hot-path index
+  @@index([status, nextRunAt])
+
+  // NEW: safe uniqueness for one active logical job per order per type
+  // (compound unique is safe with nulls in Mongo—docs missing any field don't collide)
+  @@unique([entity, externalId, jobType])
+
+  @@map("syncRecord")
+}