| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249 |
- // libs/db/src/redis/redis.service.ts
- import { Inject, Injectable, Optional } from '@nestjs/common';
- import type { Redis } from 'ioredis';
- import { REDIS_CLIENT } from './redis.constants';
- @Injectable()
- export class RedisService {
- constructor(
- @Optional()
- @Inject(REDIS_CLIENT)
- private readonly client?: Redis,
- ) {}
- private ensureClient(): Redis {
- if (!this.client) {
- throw new Error(
- 'Redis client is not available. Did you import RedisModule.forRootAsync?',
- );
- }
- return this.client;
- }
- async get(key: string): Promise<string | null> {
- const client = this.ensureClient();
- return client.get(key);
- }
- async set(
- key: string,
- value: string,
- ttlSeconds?: number,
- ): Promise<'OK' | null> {
- const client = this.ensureClient();
- if (ttlSeconds && ttlSeconds > 0) {
- return client.set(key, value, 'EX', ttlSeconds);
- }
- return client.set(key, value);
- }
- async del(...keys: string[]): Promise<number> {
- const client = this.ensureClient();
- if (!keys.length) return 0;
- return client.del(...keys);
- }
- async exists(key: string): Promise<boolean> {
- const client = this.ensureClient();
- const result = await client.exists(key);
- return result === 1;
- }
- async sadd(key: string, ...members: string[]): Promise<number> {
- const client = this.ensureClient();
- if (!members.length) return 0;
- return client.sadd(key, ...members);
- }
- async srandmember(key: string): Promise<string | null> {
- const client = this.ensureClient();
- return client.srandmember(key);
- }
- async scard(key: string): Promise<number> {
- const client = this.ensureClient();
- return client.scard(key);
- }
- async expire(key: string, ttlSeconds: number): Promise<boolean> {
- const client = this.ensureClient();
- const result = await client.expire(key, ttlSeconds);
- return result === 1;
- }
- async incr(key: string): Promise<number> {
- const client = this.ensureClient();
- return client.incr(key);
- }
- async ping(): Promise<string> {
- const client = this.ensureClient();
- return client.ping();
- }
- // Helper for JSON values
- async setJson<T>(
- key: string,
- value: T,
- ttlSeconds?: number,
- ): Promise<'OK' | null> {
- const json = JSON.stringify(value, (_, v) =>
- typeof v === 'bigint' ? v.toString() : v,
- );
- return this.set(key, json, ttlSeconds);
- }
- async getJson<T>(key: string): Promise<T | null> {
- const raw = await this.get(key);
- if (!raw) return null;
- try {
- return JSON.parse(raw) as T;
- } catch {
- return null;
- }
- }
- // 🔎 New helper: list keys by pattern (for cache-sync)
- async keys(pattern: string): Promise<string[]> {
- const client = this.ensureClient();
- return client.keys(pattern);
- }
- // 🔥 New helper: delete all keys matching a pattern
- async deleteByPattern(pattern: string): Promise<number> {
- const keys = await this.keys(pattern);
- if (!keys.length) return 0;
- return this.del(...keys);
- }
- // ─────────────────────────────────────────────
- // List operations
- // ─────────────────────────────────────────────
- /**
- * Push items to a Redis LIST (right push).
- * Atomically deletes the old key first, then pushes all items.
- */
- async rpushList(key: string, items: string[]): Promise<void> {
- const client = this.ensureClient();
- const pipeline = client.pipeline();
- // Delete old key first
- pipeline.del(key);
- // Push all items if any exist
- if (items.length > 0) {
- pipeline.rpush(key, ...items);
- }
- await pipeline.exec();
- }
- /**
- * Build a Redis ZSET with members and scores.
- * Atomically deletes the old key first, then adds all members.
- */
- async zadd(
- key: string,
- members: Array<{ score: number; member: string }>,
- ): Promise<void> {
- const client = this.ensureClient();
- const pipeline = client.pipeline();
- // Delete old key first
- pipeline.del(key);
- // Add members if any exist
- if (members.length > 0) {
- for (const { score, member } of members) {
- pipeline.zadd(key, score, member);
- }
- }
- await pipeline.exec();
- }
- /**
- * Get a range of elements from a Redis LIST by index.
- * Returns elements from start to stop (inclusive, 0-based).
- * Use 0 to -1 to get all elements.
- */
- async lrange(key: string, start: number, stop: number): Promise<string[]> {
- const client = this.ensureClient();
- return client.lrange(key, start, stop);
- }
- /**
- * Get a range of elements from a Redis ZSET in reverse score order.
- * Returns elements from offset to offset+limit-1.
- * Useful for pagination: zrevrange(key, (page-1)*pageSize, page*pageSize-1)
- */
- async zrevrange(key: string, start: number, stop: number): Promise<string[]> {
- const client = this.ensureClient();
- return client.zrevrange(key, start, stop);
- }
- // ─────────────────────────────────────────────
- // Pipelines & atomic swap helpers
- // ───────────────────────────────────────────── async pipelineSetJson(
- entries: Array<{ key: string; value: unknown; ttlSeconds?: number }>,
- ): Promise<void> {
- const client = this.ensureClient();
- if (!entries.length) return;
- const pipeline = client.pipeline();
- for (const { key, value, ttlSeconds } of entries) {
- const json = JSON.stringify(value, (_, v) =>
- typeof v === 'bigint' ? v.toString() : v,
- );
- if (ttlSeconds && ttlSeconds > 0) {
- pipeline.set(key, json, 'EX', ttlSeconds);
- } else {
- pipeline.set(key, json);
- }
- }
- await pipeline.exec();
- }
- /**
- * Write to temporary keys and atomically swap them into place with RENAME.
- * This avoids readers seeing a partially-updated state.
- */
- async atomicSwapJson(
- entries: Array<{ key: string; value: unknown; ttlSeconds?: number }>,
- ): Promise<void> {
- const client = this.ensureClient();
- if (!entries.length) return;
- const tempEntries = entries.map((e) => ({
- ...e,
- tempKey: `${e.key}:tmp:${Date.now()}:${Math.random().toString(36).slice(2)}`,
- }));
- // 1) Write all temp keys
- const writePipeline = client.pipeline();
- for (const { tempKey, value, ttlSeconds } of tempEntries as Array<{
- tempKey: string;
- value: unknown;
- ttlSeconds?: number;
- }>) {
- const json = JSON.stringify(value, (_, v) =>
- typeof v === 'bigint' ? v.toString() : v,
- );
- if (ttlSeconds && ttlSeconds > 0) {
- writePipeline.set(tempKey, json, 'EX', ttlSeconds);
- } else {
- writePipeline.set(tempKey, json);
- }
- }
- await writePipeline.exec();
- // 2) Atomically swap temp -> target via rename
- const renamePipeline = client.pipeline();
- for (const { key, tempKey } of tempEntries as Array<{
- key: string;
- tempKey: string;
- }>) {
- renamePipeline.rename(tempKey, key);
- }
- await renamePipeline.exec();
- }
- }
|