redis.service.ts 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249
  1. // libs/db/src/redis/redis.service.ts
  2. import { Inject, Injectable, Optional } from '@nestjs/common';
  3. import type { Redis } from 'ioredis';
  4. import { REDIS_CLIENT } from './redis.constants';
  /**
   * Injectable convenience wrapper around an ioredis client.
   *
   * The client is injected optionally (via the `REDIS_CLIENT` token), so the
   * service can be constructed without one; every operation then throws the
   * error raised by `ensureClient()`.
   */
  @Injectable()
  export class RedisService {
    /** COUNT hint sent with each SCAN call when listing keys by pattern. */
    private static readonly SCAN_BATCH_SIZE = 1000;

    /** Maximum number of keys passed to a single DEL in `deleteByPattern`. */
    private static readonly DELETE_CHUNK_SIZE = 1000;

    constructor(
      @Optional()
      @Inject(REDIS_CLIENT)
      private readonly client?: Redis,
    ) {}

    /**
     * Return the injected client, or fail loudly when none was provided.
     *
     * @throws Error when no client was injected.
     */
    private ensureClient(): Redis {
      if (!this.client) {
        throw new Error(
          'Redis client is not available. Did you import RedisModule.forRootAsync?',
        );
      }
      return this.client;
    }

    /**
     * Serialize a value to JSON, converting `bigint` values to decimal strings
     * (plain `JSON.stringify` throws on bigint).
     */
    private static toJson(value: unknown): string {
      return JSON.stringify(value, (_key, v: unknown) =>
        typeof v === 'bigint' ? v.toString() : v,
      );
    }

    /**
     * Queue a JSON `SET` on a pipeline, adding `EX ttlSeconds` only when a
     * positive TTL is given.
     */
    private static queueJsonSet(
      commands: ReturnType<Redis['pipeline']>,
      key: string,
      value: unknown,
      ttlSeconds?: number,
    ): void {
      const json = RedisService.toJson(value);
      if (ttlSeconds && ttlSeconds > 0) {
        commands.set(key, json, 'EX', ttlSeconds);
      } else {
        commands.set(key, json);
      }
    }

    /**
     * Inspect the reply of `pipeline.exec()` / `multi.exec()`.
     *
     * `exec()` resolves with one `[error, result]` tuple per queued command and
     * does not reject for individual command failures, so callers that ignore
     * the reply silently lose errors.
     *
     * @throws Error when the reply is `null` (nothing was executed).
     * @throws the first per-command error found in the reply.
     */
    private static assertExecOk(
      results: ReadonlyArray<readonly [Error | null, unknown?]> | null,
      operation: string,
    ): void {
      if (!results) {
        throw new Error(`Redis ${operation} failed: EXEC returned no results`);
      }
      for (const [error] of results) {
        if (error) throw error;
      }
    }

    /** Read the string stored at `key`; resolves to `null` when the client returns `null`. */
    async get(key: string): Promise<string | null> {
      const client = this.ensureClient();
      return client.get(key);
    }

    /**
     * Store a string at `key`.
     *
     * @param ttlSeconds Expiry in seconds; applied (as `EX`) only when positive,
     *   otherwise the key is written without an expiry argument.
     */
    async set(
      key: string,
      value: string,
      ttlSeconds?: number,
    ): Promise<'OK' | null> {
      const client = this.ensureClient();
      if (ttlSeconds && ttlSeconds > 0) {
        return client.set(key, value, 'EX', ttlSeconds);
      }
      return client.set(key, value);
    }

    /**
     * Delete the given keys.
     *
     * @returns The client's DEL reply, or `0` without contacting Redis when no
     *   keys are passed.
     */
    async del(...keys: string[]): Promise<number> {
      const client = this.ensureClient();
      if (!keys.length) return 0;
      return client.del(...keys);
    }

    /** Resolve to `true` when the EXISTS reply for `key` is `1`. */
    async exists(key: string): Promise<boolean> {
      const client = this.ensureClient();
      const result = await client.exists(key);
      return result === 1;
    }

    /**
     * Add members to the set at `key`.
     *
     * @returns The client's SADD reply, or `0` without contacting Redis when no
     *   members are passed.
     */
    async sadd(key: string, ...members: string[]): Promise<number> {
      const client = this.ensureClient();
      if (!members.length) return 0;
      return client.sadd(key, ...members);
    }

    /** Return the client's SRANDMEMBER reply for `key` (single-member form). */
    async srandmember(key: string): Promise<string | null> {
      const client = this.ensureClient();
      return client.srandmember(key);
    }

    /** Return the client's SCARD reply for `key`. */
    async scard(key: string): Promise<number> {
      const client = this.ensureClient();
      return client.scard(key);
    }

    /** Set a TTL on `key`; resolves to `true` when the EXPIRE reply is `1`. */
    async expire(key: string, ttlSeconds: number): Promise<boolean> {
      const client = this.ensureClient();
      const result = await client.expire(key, ttlSeconds);
      return result === 1;
    }

    /** Increment the counter at `key` and return the client's INCR reply. */
    async incr(key: string): Promise<number> {
      const client = this.ensureClient();
      return client.incr(key);
    }

    /** Send PING and return the client's reply. */
    async ping(): Promise<string> {
      const client = this.ensureClient();
      return client.ping();
    }

    // Helper for JSON values

    /**
     * Serialize `value` as JSON (bigints become strings) and store it via `set`.
     *
     * @param ttlSeconds Optional expiry in seconds; see `set`.
     */
    async setJson<T>(
      key: string,
      value: T,
      ttlSeconds?: number,
    ): Promise<'OK' | null> {
      return this.set(key, RedisService.toJson(value), ttlSeconds);
    }

    /**
     * Read `key` and parse it as JSON.
     *
     * The parsed value is cast to `T` without runtime validation.
     *
     * @returns `null` when the stored value is missing or empty, or when it is
     *   not valid JSON (parse errors are deliberately swallowed).
     */
    async getJson<T>(key: string): Promise<T | null> {
      const raw = await this.get(key);
      if (!raw) return null;
      try {
        return JSON.parse(raw) as T;
      } catch {
        return null;
      }
    }

    /**
     * List keys matching a glob-style pattern (for cache-sync).
     *
     * Iterates with cursor-based SCAN instead of KEYS so a large keyspace is
     * walked in batches rather than in one blocking command. SCAN may report a
     * key more than once, so results are de-duplicated.
     */
    async keys(pattern: string): Promise<string[]> {
      const client = this.ensureClient();
      const found = new Set<string>();
      let cursor = '0';
      do {
        const [nextCursor, batch] = await client.scan(
          cursor,
          'MATCH',
          pattern,
          'COUNT',
          RedisService.SCAN_BATCH_SIZE,
        );
        for (const key of batch) found.add(key);
        cursor = nextCursor;
      } while (cursor !== '0'); // a returned cursor of "0" ends the iteration
      return [...found];
    }

    /**
     * Delete all keys matching a pattern.
     *
     * Keys are removed in chunks so a very large match set is never spread into
     * a single DEL call.
     *
     * @returns Sum of the DEL replies (`0` when nothing matched).
     */
    async deleteByPattern(pattern: string): Promise<number> {
      const matched = await this.keys(pattern);
      let removed = 0;
      for (
        let offset = 0;
        offset < matched.length;
        offset += RedisService.DELETE_CHUNK_SIZE
      ) {
        const chunk = matched.slice(
          offset,
          offset + RedisService.DELETE_CHUNK_SIZE,
        );
        removed += await this.del(...chunk);
      }
      return removed;
    }

    // ─────────────────────────────────────────────
    // List operations
    // ─────────────────────────────────────────────

    /**
     * Replace the Redis LIST at `key` with `items` (right push, order kept).
     *
     * The DEL and RPUSH are sent in one MULTI/EXEC transaction, so the delete
     * and the push are applied together. An empty `items` array just deletes
     * the key.
     *
     * @throws the first command error reported by EXEC.
     */
    async rpushList(key: string, items: string[]): Promise<void> {
      const client = this.ensureClient();
      const transaction = client.multi();
      // Delete old key first
      transaction.del(key);
      // Push all items if any exist
      if (items.length > 0) {
        transaction.rpush(key, ...items);
      }
      RedisService.assertExecOk(await transaction.exec(), 'rpushList');
    }

    /**
     * Rebuild the Redis ZSET at `key` from the given members and scores.
     *
     * The DEL and every ZADD are sent in one MULTI/EXEC transaction. An empty
     * `members` array just deletes the key.
     *
     * @throws the first command error reported by EXEC.
     */
    async zadd(
      key: string,
      members: Array<{ score: number; member: string }>,
    ): Promise<void> {
      const client = this.ensureClient();
      const transaction = client.multi();
      // Delete old key first
      transaction.del(key);
      for (const { score, member } of members) {
        transaction.zadd(key, score, member);
      }
      RedisService.assertExecOk(await transaction.exec(), 'zadd');
    }

    /**
     * Get a range of elements from a Redis LIST by index.
     * Returns elements from start to stop (inclusive, 0-based).
     * Use 0 to -1 to get all elements.
     */
    async lrange(key: string, start: number, stop: number): Promise<string[]> {
      const client = this.ensureClient();
      return client.lrange(key, start, stop);
    }

    /**
     * Get a range of elements from a Redis ZSET in reverse score order.
     * `start` and `stop` are rank indexes passed straight to ZREVRANGE.
     * Useful for pagination: zrevrange(key, (page-1)*pageSize, page*pageSize-1)
     */
    async zrevrange(key: string, start: number, stop: number): Promise<string[]> {
      const client = this.ensureClient();
      return client.zrevrange(key, start, stop);
    }

    // ─────────────────────────────────────────────
    // Pipelines & atomic swap helpers
    // ─────────────────────────────────────────────

    /**
     * Write many JSON values in a single pipeline (one round trip).
     *
     * This is a plain pipeline, not a transaction: the writes are batched but
     * not applied atomically. Does nothing for an empty `entries` array.
     *
     * @throws the first command error reported by the pipeline.
     */
    async pipelineSetJson(
      entries: Array<{ key: string; value: unknown; ttlSeconds?: number }>,
    ): Promise<void> {
      const client = this.ensureClient();
      if (!entries.length) return;
      const pipeline = client.pipeline();
      for (const { key, value, ttlSeconds } of entries) {
        RedisService.queueJsonSet(pipeline, key, value, ttlSeconds);
      }
      RedisService.assertExecOk(await pipeline.exec(), 'pipelineSetJson');
    }

    /**
     * Write to temporary keys and atomically swap them into place with RENAME.
     * This avoids readers seeing a partially-updated state.
     *
     * Stage 1 writes every value to a unique temp key in a pipeline. Stage 2
     * renames all temp keys onto their targets inside one MULTI/EXEC
     * transaction. If either stage fails, remaining temp keys are deleted
     * (best effort) and the original error is rethrown.
     *
     * @throws the first command error reported by either stage.
     */
    async atomicSwapJson(
      entries: Array<{ key: string; value: unknown; ttlSeconds?: number }>,
    ): Promise<void> {
      const client = this.ensureClient();
      if (!entries.length) return;

      const staged = entries.map((entry) => ({
        ...entry,
        tempKey: `${entry.key}:tmp:${Date.now()}:${Math.random().toString(36).slice(2)}`,
      }));

      try {
        // 1) Write all temp keys
        const writePipeline = client.pipeline();
        for (const { tempKey, value, ttlSeconds } of staged) {
          RedisService.queueJsonSet(writePipeline, tempKey, value, ttlSeconds);
        }
        RedisService.assertExecOk(
          await writePipeline.exec(),
          'atomicSwapJson (write)',
        );

        // 2) Atomically swap temp -> target via rename
        const renameTransaction = client.multi();
        for (const { key, tempKey } of staged) {
          renameTransaction.rename(tempKey, key);
        }
        RedisService.assertExecOk(
          await renameTransaction.exec(),
          'atomicSwapJson (rename)',
        );
      } catch (error) {
        // Best-effort cleanup so failed swaps do not leak temp keys; a cleanup
        // failure must not mask the original error.
        await client
          .del(...staged.map(({ tempKey }) => tempKey))
          .catch(() => undefined);
        throw error;
      }
    }
  }