// box-app-api/src/feature/video/video.service.ts import { Injectable, Logger } from '@nestjs/common'; import { RedisService } from '@box/db/redis/redis.service'; import { PrismaMongoService } from '../../prisma/prisma-mongo.service'; import { tsCacheKeys } from '@box/common/cache/ts-cache-key.provider'; import type { VideoHomeSectionKey } from '@box/common/cache/ts-cache-key.provider'; import { RawVideoPayloadRow, toVideoPayload, VideoPayload, parseVideoPayload, VideoCacheHelper, } from '@box/common/cache/video-cache.helper'; import { VideoCategoryDto, VideoTagDto, VideoDetailDto, VideoPageDto, VideoCategoryWithTagsResponseDto, VideoListRequestDto, VideoListResponseDto, VideoSearchByTagRequestDto, VideoClickDto, RecommendedVideosDto, VideoItemDto, } from './dto'; import { RabbitmqPublisherService, StatsVideoClickEventPayload, } from '../../rabbitmq/rabbitmq-publisher.service'; import { randomUUID } from 'crypto'; import { nowEpochMsBigInt } from '@box/common/time/time.util'; import { CategoryType, RECOMMENDED_CATEGORY_ID, RECOMMENDED_CATEGORY_NAME, } from '../homepage/homepage.constants'; import { CategoryDto } from '../homepage/dto/homepage.dto'; /** * VideoService provides read-only access to video data from Redis cache. * All data is prebuilt and maintained by box-mgnt-api cache builders. * Follows the new Redis cache semantics where: * - Video list keys store video IDs only (not JSON objects) * - Tag metadata keys store tag JSON objects * - Video details are fetched separately using video IDs */ @Injectable() export class VideoService { private readonly logger = new Logger(VideoService.name); private readonly cacheHelper: VideoCacheHelper; constructor( private readonly redis: RedisService, private readonly mongoPrisma: PrismaMongoService, private readonly rabbitmqPublisher: RabbitmqPublisherService, ) { this.cacheHelper = new VideoCacheHelper(redis); } /** * Get video detail by videoId. * Reads from appVideoDetailKey (JSON). */ async getVideoDetail(videoId: string): Promise { try { const key = tsCacheKeys.video.detail(videoId); const cached = await this.redis.getJson(key); return cached ?? null; } catch (err) { this.logger.error( `Error fetching video detail for videoId=${videoId}`, err instanceof Error ? err.stack : String(err), ); return null; } } /** * Get home section videos for a channel. * Reads from appVideoHomeSectionKey (LIST of videoIds). * Returns video details for each ID. */ async getHomeSectionVideos( channelId: string, section: VideoHomeSectionKey, ): Promise { try { const key = tsCacheKeys.video.homeSection(channelId, section); // Use helper to read all video IDs from the LIST const videoIds = await this.cacheHelper.getVideoIdList(key); if (!videoIds || videoIds.length === 0) { return []; } // Fetch details for all videoIds const details = await this.getVideoDetailsBatch(videoIds); return details.filter((d) => d !== null) as VideoDetailDto[]; } catch (err) { this.logger.error( `Error fetching home section videos for channelId=${channelId}, section=${section}`, err instanceof Error ? err.stack : String(err), ); return []; } } private async getVideoDetailsBatch( videoIds: string[], ): Promise<(VideoDetailDto | null)[]> { if (!videoIds || videoIds.length === 0) { return []; } try { const keys = videoIds.map((id) => tsCacheKeys.video.detail(id)); const results: (VideoDetailDto | null)[] = []; // Fetch all in parallel for (const key of keys) { const cached = await this.redis.getJson(key); results.push(cached ?? null); } return results; } catch (err) { this.logger.error( `Error fetching video details batch`, err instanceof Error ? err.stack : String(err), ); return videoIds.map(() => null); } } private async getVideoPayloadsByIds( videoIds: string[], ): Promise { if (!videoIds || videoIds.length === 0) { return []; } try { const keys = videoIds.map((id) => tsCacheKeys.video.payload(id)); const cached = await this.redis.mget(keys); const payloadMap = new Map(); const missing = new Set(); cached.forEach((raw, idx) => { const id = videoIds[idx]; if (!raw) { missing.add(id); return; } const parsed = parseVideoPayload(raw); if (!parsed) { missing.add(id); return; } payloadMap.set(id, parsed); }); if (missing.size > 0) { const records = await this.mongoPrisma.videoMedia.findMany({ where: { id: { in: Array.from(missing) } }, select: { id: true, title: true, coverImg: true, coverImgNew: true, videoTime: true, country: true, firstTag: true, secondTags: true, preFileName: true, desc: true, size: true, updatedAt: true, filename: true, fieldNameFs: true, ext: true, }, }); if (records.length > 0) { const pipelineEntries = records.map((row: RawVideoPayloadRow) => ({ key: tsCacheKeys.video.payload(row.id), value: toVideoPayload(row), })); await this.redis.pipelineSetJson(pipelineEntries); for (const row of records) { payloadMap.set(row.id, toVideoPayload(row)); missing.delete(row.id); } } } return videoIds .map((id) => payloadMap.get(id)) .filter((payload): payload is VideoPayload => Boolean(payload)); } catch (err) { this.logger.error( `Error fetching video payloads for ids=${videoIds.join(',')}`, err instanceof Error ? err.stack : String(err), ); return []; } } /** * Get paginated list of videos for a category with optional tag filtering. * Reads video IDs from Redis cache, fetches full details from MongoDB, * and returns paginated results. */ async getVideoList(dto: VideoListRequestDto): Promise { const { page, size, tagName } = dto; const categoryId = dto.categoryId; let key: string; let tagId: string | undefined; // If tagName is provided but categoryId is not, fallback to searchVideosByTagName if (tagName && !categoryId) { this.logger.debug( `tagName provided without categoryId, falling back to searchVideosByTagName`, ); return this.searchVideosByTagName({ page, size, tagName }); } // Validate categoryId is provided when no tagName if (!categoryId) { this.logger.debug(`categoryId is required for getVideoList`); return { page, size, total: 0, tagName, items: [], }; } // Step 1: Resolve the Redis key if (!tagName) { // No tag filter - use category list key = tsCacheKeys.video.categoryList(categoryId); } else { // Tag filter - need to find tag ID first try { const tagKey = tsCacheKeys.tag.metadataByCategory(categoryId); const tags = await this.cacheHelper.getTagListForCategory(tagKey); if (!tags || tags.length === 0) { this.logger.debug( `No tags found for categoryId=${categoryId}, tagName=${tagName}`, ); return { page, size, total: 0, tagName, items: [], }; } const tag = tags.find((t) => t.name === tagName || t.id === tagName); if (!tag) { this.logger.debug( `Tag not found: categoryId=${categoryId}, tagName=${tagName}`, ); return { page, size, total: 0, tagName, items: [], }; } tagId = tag.id; key = tsCacheKeys.video.tagList(categoryId, tagId); } catch (err) { this.logger.error( `Error fetching tag for categoryId=${categoryId}, tagName=${tagName}`, err instanceof Error ? err.stack : String(err), ); return { page, size, total: 0, tagName, items: [], }; } } type VideoPayloadWithTags = RawVideoPayloadRow & { tagIds?: string[] }; // Step 2: Compute pagination indices const start = (page - 1) * size; const stop = start + size - 1; let total = 0; let pageVideoIds: string[] = []; let fallbackRecords: VideoPayloadWithTags[] = []; let videoTagMap = new Map(); let listKeyExists: boolean; try { listKeyExists = (await this.redis.exists(key)) > 0; } catch (err) { this.logger.error( `Error checking list key existence for key=${key}`, err instanceof Error ? err.stack : String(err), ); return { page, size, total: 0, tagName, items: [], }; } if (listKeyExists) { try { total = await this.redis.llen(key); } catch (err) { this.logger.error( `Error getting list length for key=${key}`, err instanceof Error ? err.stack : String(err), ); return { page, size, total: 0, tagName, items: [], }; } if (total === 0) { this.logger.debug(`Empty video list for key=${key}`); return { page, size, total: 0, tagName, items: [], }; } if (start >= total) { this.logger.debug( `Page out of range: page=${page}, size=${size}, total=${total}, key=${key}`, ); return { page, size, total, tagName, items: [], }; } try { pageVideoIds = await this.redis.lrange(key, start, stop); } catch (err) { this.logger.error( `Error fetching video IDs from key=${key}`, err instanceof Error ? err.stack : String(err), ); return { page, size, total, tagName, items: [], }; } if (!pageVideoIds || pageVideoIds.length === 0) { this.logger.debug(`No video IDs found for key=${key}`); return { page, size, total, tagName, items: [], }; } } else { this.logger.debug(`Cache miss for video list key=${key}`); try { const where = tagId ? { categoryIds: { has: categoryId }, status: 'Completed', tagIds: { has: tagId }, } : { categoryIds: { has: categoryId }, status: 'Completed', }; fallbackRecords = (await this.mongoPrisma.videoMedia.findMany({ where, orderBy: [{ addedTime: 'desc' }, { createdAt: 'desc' }], select: { id: true, title: true, coverImg: true, coverImgNew: true, videoTime: true, country: true, firstTag: true, secondTags: true, preFileName: true, desc: true, size: true, updatedAt: true, filename: true, fieldNameFs: true, ext: true, tagIds: true, }, })) as VideoPayloadWithTags[]; } catch (err) { this.logger.error( `Error fetching videos from MongoDB for fallback key=${key}`, err instanceof Error ? err.stack : String(err), ); return { page, size, total: 0, tagName, items: [], }; } const allVideoIds = fallbackRecords.map((video) => video.id); total = allVideoIds.length; try { await this.cacheHelper.saveVideoIdList(key, allVideoIds); } catch (err) { this.logger.error( `Error saving video ID list for key=${key}`, err instanceof Error ? err.stack : String(err), ); } const entries = fallbackRecords.map((video) => ({ key: tsCacheKeys.video.payload(video.id), value: toVideoPayload(video), })); try { await this.redis.pipelineSetJson(entries); } catch (err) { this.logger.error( `Error writing payload cache for fallback key=${key}`, err instanceof Error ? err.stack : String(err), ); } if (total === 0) { this.logger.debug(`No videos found for fallback key=${key}`); return { page, size, total: 0, tagName, items: [], }; } if (start >= total) { this.logger.debug( `Page out of range: page=${page}, size=${size}, total=${total}, key=${key}`, ); return { page, size, total, tagName, items: [], }; } const slicedIds = allVideoIds.slice(start, stop + 1); pageVideoIds = slicedIds; videoTagMap = new Map( fallbackRecords.map((video) => [ video.id, Array.isArray(video.tagIds) ? video.tagIds : [], ]), ); } if (!pageVideoIds.length) { return { page, size, total, tagName, items: [], }; } if (!videoTagMap.size) { try { const tagRows = await this.mongoPrisma.videoMedia.findMany({ where: { id: { in: pageVideoIds } }, select: { id: true, tagIds: true }, }); for (const row of tagRows) { videoTagMap.set(row.id, Array.isArray(row.tagIds) ? row.tagIds : []); } } catch (err) { this.logger.error( `Error fetching video tag IDs for ids=${pageVideoIds.join(',')}`, err instanceof Error ? err.stack : String(err), ); } } const allTagIds = new Set(); for (const ids of videoTagMap.values()) { if (Array.isArray(ids)) { for (const tid of ids) { allTagIds.add(tid); } } } let tagsById = new Map(); if (allTagIds.size > 0) { try { const tagsList = await this.mongoPrisma.tag.findMany({ where: { id: { in: Array.from(allTagIds) } }, select: { id: true, name: true }, }); tagsById = new Map(tagsList.map((tag) => [tag.id, tag.name])); } catch (err) { this.logger.error( `Error fetching tags from MongoDB`, err instanceof Error ? err.stack : String(err), ); } } let category; try { category = await this.mongoPrisma.category.findUnique({ where: { id: categoryId }, }); } catch (err) { this.logger.error( `Error fetching category for categoryId=${categoryId}`, err instanceof Error ? err.stack : String(err), ); } const payloads = await this.getVideoPayloadsByIds(pageVideoIds); const payloadMap = new Map( payloads.map((payload) => [payload.id, payload]), ); const items = pageVideoIds .map((videoId) => { const payload = payloadMap.get(videoId); if (!payload) { this.logger.debug(`Video payload missing for videoId=${videoId}`); return null; } const tags: string[] = []; const videoTagIds = videoTagMap.get(videoId); if (videoTagIds && Array.isArray(videoTagIds)) { for (const tid of videoTagIds) { const tagName = tagsById.get(tid); if (tagName) { tags.push(tagName); } } } return { id: payload.id, title: payload.title ?? '', coverImg: payload.coverImg ?? undefined, duration: payload.videoTime ?? undefined, categoryId, name: category?.name ?? '', subtitle: category?.subtitle ?? undefined, tags, updateAt: payload.updatedAt ?? new Date().toISOString(), }; }) .filter((item): item is NonNullable => item !== null); return { page, size, total, tagName, items, }; } async searchVideosByTagName( dto: VideoSearchByTagRequestDto, ): Promise { const { page, size, tagName } = dto; // Step 1: Load all categories let categories: Array<{ id: string; name: string; subtitle?: string; }>; try { const categoriesKey = tsCacheKeys.category.all(); categories = await this.redis.getJson< Array<{ id: string; name: string; subtitle?: string; }> >(categoriesKey); if (!categories || categories.length === 0) { // Fallback to MongoDB if Redis cache is empty this.logger.debug( 'Categories not found in Redis, fetching from MongoDB', ); const categoriesFromDb = await this.mongoPrisma.category.findMany({ select: { id: true, name: true, subtitle: true, }, }); categories = categoriesFromDb.map((c) => ({ id: c.id, name: c.name ?? '', subtitle: c.subtitle ?? undefined, })); } } catch (err) { this.logger.error( 'Error loading categories for tag search', err instanceof Error ? err.stack : String(err), ); return { page, size, total: 0, tagName, items: [], }; } if (!categories || categories.length === 0) { this.logger.debug('No categories found'); return { page, size, total: 0, tagName, items: [], }; } // Step 2 & 3: For each category, find matching tags and collect (categoryId, tagId) pairs const categoryTagPairs: Array<{ categoryId: string; tagId: string }> = []; for (const category of categories) { try { const tagKey = tsCacheKeys.tag.metadataByCategory(category.id); const tagsMetadata = await this.cacheHelper.getTagListForCategory(tagKey); const matchingTags = (tagsMetadata ?? []).filter( (t) => t.name === tagName, ); for (const tag of matchingTags) { categoryTagPairs.push({ categoryId: category.id, tagId: tag.id, }); } } catch (err) { this.logger.debug( `Error fetching tags for categoryId=${category.id}`, err instanceof Error ? err.stack : String(err), ); // Continue with next category } } if (categoryTagPairs.length === 0) { this.logger.debug(`No categories found with tag: ${tagName}`); return { page, size, total: 0, tagName, items: [], }; } // Step 4: For each (categoryId, tagId) pair, read all video IDs const allVideoIds: string[] = []; for (const pair of categoryTagPairs) { try { const key = tsCacheKeys.video.tagList(pair.categoryId, pair.tagId); const videoIds = await this.redis.lrange(key, 0, -1); if (videoIds && videoIds.length > 0) { allVideoIds.push(...videoIds); } } catch (err) { this.logger.debug( `Error reading video IDs for categoryId=${pair.categoryId}, tagId=${pair.tagId}`, err instanceof Error ? err.stack : String(err), ); // Continue with next pair } } if (allVideoIds.length === 0) { this.logger.debug(`No videos found for tag: ${tagName}`); return { page, size, total: 0, tagName, items: [], }; } // Step 5: Deduplicate and compute total const uniqueVideoIds = Array.from(new Set(allVideoIds)); const total = uniqueVideoIds.length; // Step 6: Apply in-memory pagination const start = (page - 1) * size; const end = start + size; const pagedIds = uniqueVideoIds.slice(start, end); if (pagedIds.length === 0) { return { page, size, total, tagName, items: [], }; } // Step 7: Fetch videos from MongoDB let videos: Awaited< ReturnType >; try { videos = await this.mongoPrisma.videoMedia.findMany({ where: { id: { in: pagedIds }, }, }); } catch (err) { this.logger.error( `Error fetching videos from MongoDB for tag search`, err instanceof Error ? err.stack : String(err), ); return { page, size, total, tagName, items: [], }; } if (!videos || videos.length === 0) { return { page, size, total, tagName, items: [], }; } // Fetch category data for each video const categoryIdSet = new Set(); for (const video of videos) { if (video.categoryIds && Array.isArray(video.categoryIds)) { for (const cid of video.categoryIds) { categoryIdSet.add(cid); } } } const categoryIdsList = Array.from(categoryIdSet); const categoriesMap = new Map( categories .filter((c) => categoryIdsList.includes(c.id)) .map((c) => [c.id, c]), ); // Fetch all tags for mapping tag IDs to names const allTagIds = Array.from( new Set( videos.flatMap((v) => v.tagIds && Array.isArray(v.tagIds) ? v.tagIds : [], ), ), ); const tagsById = new Map(); try { if (allTagIds.length > 0) { const tags = await this.mongoPrisma.tag.findMany({ where: { id: { in: allTagIds }, }, select: { id: true, name: true, }, }); for (const tag of tags) { if (tag.name) { tagsById.set(tag.id, tag.name); } } } } catch (err) { this.logger.error( 'Error fetching tags for search results', err instanceof Error ? err.stack : String(err), ); // Continue without tag names } // Step 8: Map to VideoListItemDto (maintain order of pagedIds) const videoMap = new Map(videos.map((v) => [v.id, v])); const items = pagedIds .map((videoId) => { const video = videoMap.get(videoId); if (!video) { return null; } // Use first category ID from categoryIds array const firstCategoryId = Array.isArray(video.categoryIds) && video.categoryIds.length > 0 ? video.categoryIds[0] : undefined; const category = firstCategoryId ? categoriesMap.get(firstCategoryId) : undefined; // Map tag IDs to tag names const tags: string[] = []; if (video.tagIds && Array.isArray(video.tagIds)) { for (const tid of video.tagIds) { const tagName = tagsById.get(tid); if (tagName) { tags.push(tagName); } } } return { id: video.id, title: video.title ?? '', coverImg: video.coverImg ?? undefined, duration: video.videoTime ?? undefined, categoryId: firstCategoryId ?? '', name: category?.name ?? '', subtitle: category?.subtitle ?? undefined, tags, updateAt: video.updatedAt?.toString() ?? new Date().toISOString(), }; }) .filter((item): item is NonNullable => item !== null); return { page, size, total, tagName, items, }; } /** * Record video click event. * Publishes a stats.video.click event to RabbitMQ for analytics processing. * Uses fire-and-forget pattern for non-blocking operation. * * @param uid - User ID from JWT * @param body - Video click data from client * @param ip - Client IP address * @param userAgent - User agent string (unused but kept for compatibility) */ async recordVideoClick( uid: string, body: VideoClickDto, ip: string, userAgent: string, ): Promise { const clickedAt = nowEpochMsBigInt(); const payload: StatsVideoClickEventPayload = { messageId: randomUUID(), uid, videoId: body.videoId, clickedAt, ip, }; // Fire-and-forget: don't await, log errors asynchronously this.rabbitmqPublisher.publishStatsVideoClick(payload).catch((error) => { const message = error instanceof Error ? error.message : String(error); const stack = error instanceof Error ? error.stack : undefined; this.logger.error( `Failed to publish stats.video.click for videoId=${body.videoId}, uid=${uid}: ${message}`, stack, ); }); this.logger.debug( `Initiated stats.video.click publish for videoId=${body.videoId}, uid=${uid}`, ); } /** * Fisher-Yates shuffle for random ordering */ private shuffle(array: T[]): T[] { const shuffled = [...array]; for (let i = shuffled.length - 1; i > 0; i--) { const j = Math.floor(Math.random() * (i + 1)); [shuffled[i], shuffled[j]] = [shuffled[j], shuffled[i]]; } return shuffled; } /** * Get video categories for homepage * Returns shuffled categories with "推荐" as first item */ async getCategories(): Promise { try { const categories = await this.mongoPrisma.category.findMany({ where: { status: 1, // active only }, orderBy: { seq: 'asc', }, }); // Shuffle regular categories (keep recommended first) const recommended: CategoryDto = { id: RECOMMENDED_CATEGORY_ID, name: RECOMMENDED_CATEGORY_NAME, type: CategoryType.RECOMMENDED, isDefault: true, seq: 0, }; const regular = this.shuffle( categories.map((c, idx) => ({ id: c.id, name: c.name, type: CategoryType.REGULAR, isDefault: false, seq: idx + 1, })), ); return [recommended, ...regular]; } catch (error) { this.logger.warn( 'Category collection not found or error fetching categories', ); return [ { id: RECOMMENDED_CATEGORY_ID, name: RECOMMENDED_CATEGORY_NAME, type: CategoryType.RECOMMENDED, isDefault: true, seq: 0, }, ]; } } /** * Get recommended videos (7 random videos for homepage) */ async getRecommendedVideos(): Promise { try { // Try to fetch from Redis cache first const cached = await this.redis.getJson( tsCacheKeys.video.recommended(), ); if (cached && Array.isArray(cached) && cached.length > 0) { this.logger.debug( `[getRecommendedVideos] Returning ${cached.length} videos from cache`, ); return { items: cached, total: cached.length, }; } // Fallback to MongoDB if cache miss this.logger.warn( '[getRecommendedVideos] Cache miss, falling back to MongoDB', ); const videos = await this.mongoPrisma.videoMedia.aggregateRaw({ pipeline: [ { $match: { status: 'Completed' } }, { $sample: { size: 7 } }, ], }); const items = (Array.isArray(videos) ? videos : []).map((v: any) => this.mapVideoToDto(v), ); return { items, total: items.length, }; } catch (error) { this.logger.warn('Error fetching recommended videos, returning empty'); return { items: [], total: 0, }; } } /** * Map raw video from MongoDB to VideoItemDto */ mapVideoToDto(video: any): VideoItemDto { return { id: video._id?.$oid ?? video._id?.toString() ?? video.id, title: video.title ?? '', coverImg: video.coverImg ?? undefined, coverImgNew: video.coverImgNew ?? undefined, videoTime: video.videoTime ?? undefined, publish: video.publish ?? undefined, secondTags: Array.isArray(video.secondTags) ? video.secondTags : [], updatedAt: video.updatedAt?.$date ? new Date(video.updatedAt.$date) : video.updatedAt ? new Date(video.updatedAt) : undefined, filename: video.filename ?? undefined, fieldNameFs: video.fieldNameFs ?? undefined, width: video.width ?? undefined, height: video.height ?? undefined, tags: Array.isArray(video.tags) ? video.tags : [], preFileName: video.preFileName ?? undefined, actors: Array.isArray(video.actors) ? video.actors : [], size: video.size !== undefined && video.size !== null ? String(video.size) : undefined, }; } /** * Read the cached video list key built by box-mgnt-api. */ async getVideoListFromCache(): Promise { const key = tsCacheKeys.video.list(); try { const raw = await this.redis.get(key); if (!raw) { return []; } const parsed = JSON.parse(raw); if (Array.isArray(parsed)) { return parsed; } } catch (err) { this.logger.error( `Failed to read video list cache (${key})`, err instanceof Error ? err.stack : String(err), ); } return []; } /** * Search the cached video list by secondTags, with fallback for videos that have no secondTags. */ async searchVideosBySecondTags(tags?: string): Promise { const videos = await this.getVideoListFromCache(); if (!tags) { return videos; } const requestedTags = tags .split(',') .map((tag) => tag.trim()) .filter((tag) => tag.length > 0); if (requestedTags.length === 0) { return videos; } const tagSet = new Set(requestedTags); return videos.filter((video) => this.matchesSecondTags(video, tagSet)); } private matchesSecondTags( video: VideoItemDto, filters: Set, ): boolean { const secondTags = Array.isArray(video.secondTags) ? video.secondTags .map((tag) => tag?.trim()) .filter( (tag): tag is string => typeof tag === 'string' && tag.length > 0, ) : []; if (secondTags.length === 0) { // return true; } return secondTags.some((tag) => filters.has(tag)); } }