rabbitmq-publisher.service.ts 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854
  1. // apps/box-app-api/src/rabbitmq/rabbitmq-publisher.service.ts
  2. import {
  3. Injectable,
  4. Logger,
  5. OnModuleDestroy,
  6. OnModuleInit,
  7. } from '@nestjs/common';
  8. import { ConfigService } from '@nestjs/config';
  9. import { Connection, ConfirmChannel } from 'amqplib';
  10. import * as amqp from 'amqplib';
  11. import { UserLoginEventPayload } from '@box/common/events/user-login-event.dto';
  12. import { AdsClickEventPayload } from '@box/common/events/ads-click-event.dto';
  13. import { nowEpochMsBigInt } from '@box/common/time/time.util';
  14. import { RedisService } from '@box/db/redis/redis.service';
  /** Routing key used for `stats.ad.click` events (configurable, so any string). */
  type StatsAdClickRoutingKey = string;
  /** Routing key used for `stats.video.click` events (configurable, so any string). */
  type StatsVideoClickRoutingKey = string;
  /** Routing key used for `stats.ad.impression` events (configurable, so any string). */
  type StatsAdImpressionRoutingKey = string;
  /**
   * Payload of a `stats.ad.click` event, published through
   * `RabbitmqPublisherService.publishStatsAdClick`.
   */
  export interface StatsAdClickEventPayload {
    /** Unique message id; the publisher also uses it as its idempotency key. */
    messageId: string;
    /** Id of the user who clicked (presumably the app user id — confirm with producers). */
    uid: string;
    /** IP address attached to the click (presumably the client's). */
    ip: string;
    /** Id of the clicked ad. */
    adId: string;
    /** Ad type; free-form string at this layer. */
    adType: string;
    /**
     * When the click happened. JSON has no bigint, so the publisher serializes
     * it as a decimal string. NOTE(review): presumably epoch milliseconds — confirm.
     */
    clickedAt: bigint;
  }
  /**
   * Payload of a `stats.video.click` event, published through
   * `RabbitmqPublisherService.publishStatsVideoClick`.
   */
  export interface StatsVideoClickEventPayload {
    /** Unique message id; the publisher also uses it as its idempotency key. */
    messageId: string;
    /** Id of the user who clicked (presumably the app user id — confirm with producers). */
    uid: string;
    /** IP address attached to the click (presumably the client's). */
    ip: string;
    /** Id of the clicked video. */
    videoId: string;
    /**
     * When the click happened; serialized as a decimal string by the publisher.
     * NOTE(review): presumably epoch milliseconds — confirm.
     */
    clickedAt: bigint;
  }
  /**
   * Payload of a `stats.ad.impression` event, published through
   * `RabbitmqPublisherService.publishStatsAdImpression`.
   */
  export interface StatsAdImpressionEventPayload {
    /** Unique message id; the publisher also uses it as its idempotency key. */
    messageId: string;
    /** Id of the viewing user (presumably the app user id — confirm with producers). */
    uid: string;
    /** IP address attached to the impression (presumably the client's). */
    ip: string;
    /** Id of the displayed ad. */
    adId: string;
    /** Ad type; free-form string at this layer. */
    adType: string;
    /**
     * When the impression happened; serialized as a decimal string by the
     * publisher. NOTE(review): presumably epoch milliseconds — confirm.
     */
    impressionAt: bigint;
    /** Optional visible duration; the name suggests milliseconds. */
    visibleDurationMs?: number;
  }
  /**
   * Circuit breaker states.
   *
   * A const object plus a same-named union type: `CircuitBreakerState.OPEN`
   * still works as a value and `CircuitBreakerState` as a type, with the same
   * string values an enum would produce.
   */
  const CircuitBreakerState = {
    /** Normal operation. */
    CLOSED: 'CLOSED',
    /** Failing; requests are rejected. */
    OPEN: 'OPEN',
    /** Testing whether the service has recovered. */
    HALF_OPEN: 'HALF_OPEN',
  } as const;
  type CircuitBreakerState =
    (typeof CircuitBreakerState)[keyof typeof CircuitBreakerState];
  /** Tuning knobs for the publisher's circuit breaker. */
  interface CircuitBreakerConfig {
    /** Consecutive failures (while CLOSED) that open the circuit. */
    failureThreshold: number;
    /** Successes needed while HALF_OPEN before the circuit closes again. */
    successThreshold: number;
    /** Milliseconds an OPEN circuit waits before moving to HALF_OPEN. */
    timeout: number;
  }
  /**
   * NestJS provider that publishes user, ads and stats events to RabbitMQ.
   *
   * Every publish goes through a confirm channel, so it only resolves once the
   * broker has acknowledged the message.
   *
   * - `publishUserLogin` / `publishAdsClick`: skip (with a warning) when no
   *   channel is available; reject when the broker nacks or publish fails.
   * - `publishStats*` / `replayFallbackMessage`: never reject. They combine a
   *   circuit breaker, bounded retries, a Redis fallback queue, a dead-letter
   *   exchange and publisher-side idempotency (see
   *   `publishStatsEventWithFallback`).
   *
   * Connection and channel references are cleared as soon as amqplib reports
   * them closed, so "has a channel" reliably means "can try to publish".
   */
  @Injectable()
  export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
    /** Queue bound to the DLQ exchange with pattern `dlq.#`. */
    private static readonly dlqQueue = 'dlq.stats.events';

    private readonly logger = new Logger(RabbitmqPublisherService.name);
    private connection?: Connection;
    private channel?: ConfirmChannel;
    /** Broker URL read in onModuleInit; empty/undefined disables RabbitMQ. */
    private url?: string;
    private exchange!: string;
    private routingKeyLogin!: string;
    private routingKeyAdsClick!: string;
    private statsExchange!: string;
    private routingKeyStatsAdClick!: StatsAdClickRoutingKey;
    private routingKeyStatsVideoClick!: StatsVideoClickRoutingKey;
    private routingKeyStatsAdImpression!: StatsAdImpressionRoutingKey;
    private dlqExchange!: string;

    // Circuit breaker state
    private circuitState: CircuitBreakerState = CircuitBreakerState.CLOSED;
    private failureCount = 0;
    private successCount = 0;
    /** Epoch ms after which an OPEN circuit may move to HALF_OPEN. */
    private nextAttemptTime = 0;
    private readonly circuitConfig: CircuitBreakerConfig = {
      failureThreshold: 5, // Open after 5 failed publishes (each counted after all retries)
      successThreshold: 2, // Close after 2 successes while HALF_OPEN
      timeout: 60000, // Wait 60s before trying again (half-open)
    };

    // Reconnection state
    /** In-flight reconnection shared by concurrent callers; undefined when idle. */
    private reconnectPromise?: Promise<void>;
    /** Pending timer created by scheduleReconnection(); cleared on shutdown. */
    private reconnectTimer?: ReturnType<typeof setTimeout>;
    /** Set by onModuleDestroy so close events and timers stop reconnecting. */
    private isShuttingDown = false;

    // Retry configuration
    private readonly maxRetries = 3;
    /** Delay (ms) before retry N; with maxRetries = 3 only the first two are used. */
    private readonly retryDelays = [100, 500, 2000];

    /** Per-message expiration of stats events and TTL of the DLQ queue (24 h, ms). */
    private readonly messageTTL = 86400000;
    /** Lifetime of publisher idempotency markers in Redis (7 days, seconds). */
    private readonly idempotencyTTL = 604800;
    /** Lifetime of Redis fallback-queue entries (24 h). */
    private readonly fallbackTTL = 86400;

    constructor(
      private readonly config: ConfigService,
      private readonly redis: RedisService,
    ) {}

    /**
     * Read configuration and open the initial connection.
     *
     * Never throws. Without RABBITMQ_URL the circuit stays OPEN and stats go to
     * the Redis fallback queue only. If the initial connection fails, the
     * circuit opens and a background reconnection is scheduled.
     */
    async onModuleInit(): Promise<void> {
      this.url = this.config.get<string>('RABBITMQ_URL');
      this.exchange =
        this.config.get<string>('RABBITMQ_LOGIN_EXCHANGE') ?? 'stats.user';
      this.routingKeyLogin =
        this.config.get<string>('RABBITMQ_LOGIN_ROUTING_KEY') ?? 'user.login';
      this.routingKeyAdsClick =
        this.config.get<string>('RABBITMQ_ADS_CLICK_ROUTING_KEY') ?? 'ads.click';
      this.statsExchange =
        this.config.get<string>('RABBITMQ_STATS_EXCHANGE') ?? this.exchange;
      this.routingKeyStatsAdClick =
        this.config.get<string>('RABBITMQ_STATS_AD_CLICK_ROUTING_KEY') ??
        'stats.ad.click';
      this.routingKeyStatsVideoClick =
        this.config.get<string>('RABBITMQ_STATS_VIDEO_CLICK_ROUTING_KEY') ??
        'stats.video.click';
      this.routingKeyStatsAdImpression =
        this.config.get<string>('RABBITMQ_STATS_AD_IMPRESSION_ROUTING_KEY') ??
        'stats.ad.impression';
      this.dlqExchange =
        this.config.get<string>('RABBITMQ_DLQ_EXCHANGE') ?? 'dlq.stats';

      if (!this.url) {
        this.logger.error(
          'RABBITMQ_URL is not set. Stats will be stored in Redis fallback queue only.',
        );
        this.circuitState = CircuitBreakerState.OPEN;
        return;
      }

      try {
        // Never log credentials embedded in the URL.
        this.logger.log(
          `Connecting to RabbitMQ at ${RabbitmqPublisherService.redactUrl(this.url)} ...`,
        );
        await this.initializeConnection(this.url);
        this.logger.log('RabbitMQ connection initialized successfully');
      } catch (error) {
        this.logger.error(
          `Failed to initialize RabbitMQ connection: ${error}`,
          error instanceof Error ? error.stack : undefined,
        );
        // Opens the circuit for `timeout` ms and schedules a background
        // reconnection, so recovery does not depend on incoming traffic.
        this.openCircuit();
      }
    }

    /**
     * Connect, open a confirm channel, register lifecycle listeners and assert
     * the exchange/queue topology.
     *
     * Listeners compare the emitting object with the current reference. Events
     * from a connection/channel that was already replaced or deliberately
     * detached (reconnect, shutdown) are therefore ignored. On any setup failure
     * the partially opened connection is closed before the error is rethrown.
     *
     * @throws whatever amqplib throws while connecting or asserting topology
     */
    private async initializeConnection(url: string): Promise<void> {
      const connection = await amqp.connect(url);
      this.connection = connection;

      // amqplib requires an 'error' listener; 'close' always follows 'error'.
      connection.on('error', (err: Error) => {
        if (this.connection !== connection) return;
        this.logger.error(`RabbitMQ connection error: ${err.message}`, err.stack);
        this.openCircuit();
      });
      connection.on('close', () => {
        if (this.connection !== connection) return;
        this.logger.warn('RabbitMQ connection closed');
        // Channels die with their connection; drop both so health checks see it.
        this.connection = undefined;
        this.channel = undefined;
        this.openCircuit();
      });

      try {
        // Use a confirm channel so we know when the broker has accepted a message
        const channel = await connection.createConfirmChannel();
        this.channel = channel;

        channel.on('error', (err: Error) => {
          if (this.channel !== channel) return;
          this.logger.error(`RabbitMQ channel error: ${err.message}`, err.stack);
          this.openCircuit();
        });
        channel.on('close', () => {
          if (this.channel !== channel) return;
          this.logger.warn('RabbitMQ channel closed');
          this.channel = undefined;
          this.openCircuit();
        });

        await this.assertTopology(channel);
      } catch (error) {
        // Do not leak a half-initialised connection.
        if (this.connection === connection) {
          this.connection = undefined;
          this.channel = undefined;
        }
        await connection.close().catch(() => undefined);
        throw error;
      }

      this.logger.log(
        `RabbitMQ publisher ready. exchange="${this.exchange}", statsExchange="${this.statsExchange}", dlqExchange="${this.dlqExchange}"`,
      );
    }

    /**
     * Assert the durable topic exchanges and the dead-letter queue plus its binding.
     */
    private async assertTopology(channel: ConfirmChannel): Promise<void> {
      await channel.assertExchange(this.exchange, 'topic', { durable: true });
      if (this.statsExchange !== this.exchange) {
        await channel.assertExchange(this.statsExchange, 'topic', {
          durable: true,
        });
      }
      await channel.assertExchange(this.dlqExchange, 'topic', { durable: true });
      await channel.assertQueue(RabbitmqPublisherService.dlqQueue, {
        durable: true,
        arguments: {
          'x-message-ttl': this.messageTTL, // Messages expire after 24 hours
          'x-max-length': 100000, // Maximum 100k messages in DLQ
        },
      });
      // sendToDLQ() publishes with 'dlq.{original-routing-key}' (e.g. dlq.stats.ad.click),
      // so 'dlq.#' captures every DLQ message regardless of the original key.
      await channel.bindQueue(
        RabbitmqPublisherService.dlqQueue,
        this.dlqExchange,
        'dlq.#',
      );
    }

    /**
     * Stop reconnection and close the channel and connection.
     * Each close is attempted independently, so a failing channel close cannot
     * leak the connection.
     */
    async onModuleDestroy(): Promise<void> {
      this.isShuttingDown = true;
      if (this.reconnectTimer !== undefined) {
        clearTimeout(this.reconnectTimer);
        this.reconnectTimer = undefined;
      }
      await this.detachAndClose((error) =>
        this.logger.error(
          'Error while closing RabbitMQ connection',
          error instanceof Error ? error.stack : String(error),
        ),
      );
    }

    /**
     * Clear the current channel/connection references, then close both.
     *
     * References are cleared first, so the listeners registered in
     * initializeConnection() treat the resulting close events as stale.
     *
     * @param onError - receives close failures; they are ignored when omitted
     */
    private async detachAndClose(
      onError?: (error: unknown) => void,
    ): Promise<void> {
      const channel = this.channel;
      const connection = this.connection;
      this.channel = undefined;
      this.connection = undefined;
      try {
        await channel?.close();
      } catch (error) {
        onError?.(error);
      }
      try {
        await connection?.close();
      } catch (error) {
        onError?.(error);
      }
    }

    /**
     * Circuit breaker: Open circuit (stop attempting to send to RabbitMQ)
     * and schedule a reconnection attempt. No-op if already OPEN.
     */
    private openCircuit(): void {
      if (this.circuitState === CircuitBreakerState.OPEN) {
        return;
      }
      this.logger.warn(
        `Circuit breaker OPENED (failureCount=${this.failureCount}, successCount=${this.successCount}). Will retry after ${this.circuitConfig.timeout}ms`,
      );
      this.circuitState = CircuitBreakerState.OPEN;
      this.failureCount = 0;
      this.successCount = 0;
      this.nextAttemptTime = Date.now() + this.circuitConfig.timeout;
      this.scheduleReconnection();
    }

    /**
     * Circuit breaker: move to HALF_OPEN and reconnect if needed.
     * On return the state is CLOSED (reconnected), HALF_OPEN (connection was
     * already healthy) or OPEN (reconnection failed).
     */
    private async halfOpenCircuit(): Promise<void> {
      this.logger.log(
        `Circuit breaker HALF-OPEN (failureCount=${this.failureCount}, successCount=${this.successCount}). Testing connection...`,
      );
      this.circuitState = CircuitBreakerState.HALF_OPEN;
      this.successCount = 0;
      await this.reconnectIfNeeded();
    }

    /**
     * Circuit breaker: Close circuit (resume normal operation)
     */
    private closeCircuit(): void {
      this.logger.log(
        `Circuit breaker CLOSED (failureCount=${this.failureCount}, successCount=${this.successCount}). Resuming normal operation.`,
      );
      this.circuitState = CircuitBreakerState.CLOSED;
      this.failureCount = 0;
      this.successCount = 0;
    }

    /**
     * Record successful publish (for circuit breaker).
     * While HALF_OPEN, `successThreshold` successes close the circuit.
     */
    private recordSuccess(): void {
      this.failureCount = 0;
      if (this.circuitState === CircuitBreakerState.HALF_OPEN) {
        this.successCount++;
        if (this.successCount >= this.circuitConfig.successThreshold) {
          this.closeCircuit();
        }
      }
    }

    /**
     * Record failed publish (for circuit breaker).
     * Any failure while HALF_OPEN reopens the circuit. While CLOSED, the
     * circuit opens after `failureThreshold` failures without a success.
     */
    private recordFailure(): void {
      this.failureCount++;
      if (this.circuitState === CircuitBreakerState.HALF_OPEN) {
        this.openCircuit();
      } else if (
        this.circuitState === CircuitBreakerState.CLOSED &&
        this.failureCount >= this.circuitConfig.failureThreshold
      ) {
        this.openCircuit();
      }
    }

    /**
     * Decide whether a publish may be attempted now.
     *
     * - No broker URL configured: never.
     * - OPEN: only once the timeout has elapsed, and only if the resulting
     *   half-open reconnection did not reopen the circuit.
     * - CLOSED / HALF_OPEN: yes, after any in-flight reconnection settles.
     */
    private async canAttempt(): Promise<boolean> {
      if (!this.url) {
        return false;
      }
      if (this.circuitState === CircuitBreakerState.OPEN) {
        if (Date.now() < this.nextAttemptTime) {
          return false;
        }
        await this.halfOpenCircuit();
      } else if (this.reconnectPromise) {
        await this.reconnectPromise;
      }
      // A failed reconnection reopens the circuit; do not publish into it.
      return this.circuitState !== CircuitBreakerState.OPEN;
    }

    /**
     * Schedule one background half-open attempt (at most one timer at a time).
     * Skipped once shutdown has started.
     *
     * @param delayMs - delay before the attempt; defaults to the circuit timeout
     */
    private scheduleReconnection(
      delayMs: number = this.circuitConfig.timeout,
    ): void {
      if (this.reconnectTimer !== undefined || this.isShuttingDown) {
        return; // Already scheduled, or shutting down
      }
      this.logger.debug(`Scheduling reconnection attempt in ${delayMs}ms`);
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = undefined;
        if (
          this.isShuttingDown ||
          this.circuitState !== CircuitBreakerState.OPEN
        ) {
          return;
        }
        // The circuit may have been reopened after this timer was set.
        const remaining = this.nextAttemptTime - Date.now();
        if (remaining > 0) {
          this.scheduleReconnection(remaining);
          return;
        }
        this.halfOpenCircuit().catch((error: unknown) => {
          this.logger.error(
            `Scheduled RabbitMQ reconnection failed unexpectedly: ${error}`,
          );
        });
      }, delayMs);
    }

    /**
     * Reconnect to RabbitMQ if the connection or channel is gone.
     * Concurrent callers share (and await) the same in-flight attempt.
     * Never rejects; failures reopen the circuit.
     */
    private async reconnectIfNeeded(): Promise<void> {
      if (this.reconnectPromise) {
        this.logger.debug('Reconnection already in progress, waiting for it');
        return this.reconnectPromise;
      }
      if (this.connection && this.channel) {
        this.logger.debug(
          'Connection and channel are healthy, no reconnection needed',
        );
        return;
      }
      this.reconnectPromise = this.reconnect().finally(() => {
        this.reconnectPromise = undefined;
      });
      return this.reconnectPromise;
    }

    /**
     * One reconnection attempt: close whatever is left, connect again, and
     * close the circuit on success (if HALF_OPEN) or reopen it on failure.
     */
    private async reconnect(): Promise<void> {
      if (this.isShuttingDown) {
        return;
      }
      this.logger.log(
        `🔄 Starting RabbitMQ reconnection attempt (circuitState=${this.circuitState})...`,
      );
      const url = this.url;
      if (!url) {
        this.logger.error(
          '❌ Reconnection failed: RABBITMQ_URL is not set. Cannot reconnect to RabbitMQ.',
        );
        // Reopen so publishes go straight to the fallback queue instead of retrying.
        this.openCircuit();
        return;
      }
      try {
        // Detach before closing so the old objects' close listeners stay quiet.
        await this.detachAndClose();
        this.logger.debug(
          `🔌 Reconnecting to RabbitMQ at ${RabbitmqPublisherService.redactUrl(url)}...`,
        );
        await this.initializeConnection(url);
        if (this.isShuttingDown) {
          // onModuleDestroy ran while we were connecting; do not leak the new connection.
          await this.detachAndClose();
          return;
        }
        this.logger.log(
          `✅ RabbitMQ reconnection successful (hasConnection=${!!this.connection}, hasChannel=${!!this.channel})`,
        );
        if (this.circuitState === CircuitBreakerState.HALF_OPEN) {
          this.logger.log(
            'Reconnection successful during HALF_OPEN, closing circuit',
          );
          this.closeCircuit();
        }
      } catch (error) {
        this.logger.error(
          `❌ RabbitMQ reconnection failed (circuitState=${this.circuitState}): ${error}`,
          error instanceof Error ? error.stack : undefined,
        );
        if (this.circuitState === CircuitBreakerState.HALF_OPEN) {
          this.logger.warn(
            '⚠️ Reconnection failed during HALF_OPEN, reopening circuit',
          );
        }
        // No-op if a close/error listener already reopened it during setup.
        this.openCircuit();
      }
    }

    /**
     * Check publisher-level idempotency: has this message already been published?
     *
     * Prevents duplicate publishes from this publisher within a 7-day window.
     * This is NOT end-to-end idempotency; consumers must still deduplicate.
     * The check and the later mark are not atomic, so two concurrent publishes
     * of the same messageId can both pass this check.
     *
     * Redis key format: rabbitmq:publish-idempotency:{messageId}
     *
     * @param messageId - Unique message identifier
     * @returns true if the message was already published, false otherwise.
     *   On Redis errors returns false (prefer duplicates over data loss).
     */
    private async checkIdempotency(messageId: string): Promise<boolean> {
      try {
        const key = `rabbitmq:publish-idempotency:${messageId}`;
        const exists = await this.redis.exists(key);
        return exists > 0;
      } catch (error) {
        this.logger.error(
          `Failed to check publish idempotency for ${messageId}: ${error}`,
        );
        return false;
      }
    }

    /**
     * Mark a message as published (publisher-level idempotency), with the
     * `idempotencyTTL` expiry. Errors are logged and never fail the publish.
     *
     * @param messageId - Unique message identifier
     */
    private async markAsProcessed(messageId: string): Promise<void> {
      try {
        const key = `rabbitmq:publish-idempotency:${messageId}`;
        await this.redis.set(key, '1', this.idempotencyTTL);
      } catch (error) {
        this.logger.error(
          `Failed to mark ${messageId} as published (idempotency): ${error}`,
        );
      }
    }

    /**
     * Store a message in the Redis fallback queue under
     * `rabbitmq:fallback:{routingKey}:{messageId}` with a 24-hour TTL.
     * Failures are logged as CRITICAL and swallowed.
     */
    private async storeInFallbackQueue(
      routingKey: string,
      payload: unknown,
      messageId: string,
    ): Promise<void> {
      try {
        const fallbackKey = `rabbitmq:fallback:${routingKey}:${messageId}`;
        await this.redis.setJson(fallbackKey, payload, this.fallbackTTL);
        this.logger.warn(
          `Stored message ${messageId} in Redis fallback queue: ${fallbackKey}`,
        );
      } catch (error) {
        this.logger.error(
          `CRITICAL: Failed to store message ${messageId} in fallback queue: ${error}`,
          error instanceof Error ? error.stack : undefined,
        );
      }
    }

    /**
     * Send a message to the dead-letter exchange as `dlq.{routingKey}`.
     *
     * @returns true if the broker confirmed the DLQ message, false if no channel
     *   was available or the publish failed (failures are logged, never thrown)
     */
    private async sendToDLQ(
      routingKey: string,
      payload: unknown,
      reason: string,
    ): Promise<boolean> {
      const channel = this.channel;
      if (!channel) {
        this.logger.error(
          `Cannot send to DLQ: channel not available. Reason: ${reason}`,
        );
        return false;
      }
      const dlqRoutingKey = `dlq.${routingKey}`;
      try {
        await this.confirmPublish(
          channel,
          this.dlqExchange,
          dlqRoutingKey,
          this.toPayloadBuffer(payload),
          {
            persistent: true,
            contentType: 'application/json',
            headers: {
              'x-death-reason': reason,
              'x-death-timestamp': Date.now(),
            },
          },
        );
        this.logger.warn(
          `Sent message to DLQ: exchange="${this.dlqExchange}", routingKey="${dlqRoutingKey}", queue="${RabbitmqPublisherService.dlqQueue}". Reason: ${reason}`,
        );
        return true;
      } catch (error) {
        this.logger.error(
          `Failed to send message to DLQ (routingKey="${dlqRoutingKey}"): ${error}`,
          error instanceof Error ? error.stack : undefined,
        );
        return false;
      }
    }

    /**
     * Run `publishFn` up to `maxRetries` times (at least once), sleeping
     * `retryDelays[attempt]` between attempts. The last delay is reused if
     * there are more retries than delays.
     *
     * @throws the error of the final attempt
     */
    private async retryPublish(
      publishFn: () => Promise<void>,
      context: string,
    ): Promise<void> {
      const attempts = Math.max(1, this.maxRetries);
      for (let attempt = 0; attempt < attempts; attempt++) {
        try {
          await publishFn();
          return; // Success
        } catch (error) {
          if (attempt === attempts - 1) {
            this.logger.error(
              `Failed to publish after ${attempts} attempts (${context}): ${error}`,
            );
            throw error;
          }
          const delay =
            this.retryDelays[Math.min(attempt, this.retryDelays.length - 1)] ??
            0;
          this.logger.warn(
            `Publish attempt ${attempt + 1} failed (${context}). Retrying in ${delay}ms...`,
          );
          await new Promise<void>((resolve) => setTimeout(resolve, delay));
        }
      }
    }

    /**
     * Publish `content` on a confirm channel; resolves once the broker acks it.
     * Rejects when the broker nacks it, or when `publish` throws synchronously
     * (e.g. the channel was closed underneath us).
     */
    private confirmPublish(
      channel: ConfirmChannel,
      exchange: string,
      routingKey: string,
      content: Buffer,
      options: amqp.Options.Publish,
    ): Promise<void> {
      return new Promise<void>((resolve, reject) => {
        channel.publish(exchange, routingKey, content, options, (err) =>
          err ? reject(err) : resolve(),
        );
      });
    }

    /**
     * Publish a user.login event.
     * Skips (with a warning) when no channel is available.
     *
     * @throws the broker/publish error, after logging it
     */
    async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
      const channel = this.channel;
      if (!channel) {
        this.logger.warn(
          'RabbitMQ channel not ready. Skipping user.login publish.',
        );
        return;
      }
      try {
        await this.confirmPublish(
          channel,
          this.exchange,
          this.routingKeyLogin,
          this.toPayloadBuffer(event),
          { persistent: true, contentType: 'application/json' },
        );
      } catch (error) {
        const err = RabbitmqPublisherService.toError(error);
        this.logger.error(
          `Failed to publish user.login event for uid=${event.uid}: ${err.message}`,
          err.stack,
        );
        throw error;
      }
      this.logger.debug(`Published user.login event for uid=${event.uid}`);
    }

    /**
     * Publish an ads.click event.
     * Skips (with a warning) when no channel is available.
     *
     * @throws the broker/publish error, after logging it
     */
    async publishAdsClick(event: AdsClickEventPayload): Promise<void> {
      const channel = this.channel;
      if (!channel) {
        this.logger.warn(
          'RabbitMQ channel not ready. Skipping ads.click publish.',
        );
        return;
      }
      try {
        await this.confirmPublish(
          channel,
          this.exchange,
          this.routingKeyAdsClick,
          this.toPayloadBuffer(event),
          { persistent: true, contentType: 'application/json' },
        );
      } catch (error) {
        const err = RabbitmqPublisherService.toError(error);
        this.logger.error(
          `Failed to publish ads.click event for adsId=${event.adsId}: ${err.message}`,
          err.stack,
        );
        throw error;
      }
      this.logger.debug(`Published ads.click event for adsId=${event.adsId}`);
    }

    /**
     * Publish stats.ad.click event with full error handling (never rejects).
     */
    async publishStatsAdClick(event: StatsAdClickEventPayload): Promise<void> {
      return this.publishStatsEventWithFallback(
        this.routingKeyStatsAdClick,
        event,
        event.messageId,
        `stats.ad.click adId=${event.adId}`,
      );
    }

    /**
     * Publish stats.video.click event with full error handling (never rejects).
     */
    async publishStatsVideoClick(
      event: StatsVideoClickEventPayload,
    ): Promise<void> {
      return this.publishStatsEventWithFallback(
        this.routingKeyStatsVideoClick,
        event,
        event.messageId,
        `stats.video.click videoId=${event.videoId}`,
      );
    }

    /**
     * Publish stats.ad.impression event with full error handling (never rejects).
     */
    async publishStatsAdImpression(
      event: StatsAdImpressionEventPayload,
    ): Promise<void> {
      return this.publishStatsEventWithFallback(
        this.routingKeyStatsAdImpression,
        event,
        event.messageId,
        `stats.ad.impression adId=${event.adId}`,
      );
    }

    /**
     * PUBLIC API for replaying messages from the Redis fallback queue.
     * Used by RabbitmqFallbackReplayService to republish failed messages.
     *
     * If publishing still fails after all retries, the message goes to the DLQ
     * instead of back into the fallback queue, so a persistently failing message
     * is not replayed forever. It is only re-stored in the fallback queue when
     * the DLQ is unreachable too, or when the circuit is open and no attempt
     * was made. Both cases write the same fallback key the message came from.
     * Never rejects.
     *
     * @param routingKey - Original routing key (e.g., 'stats.ad.click')
     * @param payload - Original message payload
     * @param messageId - Original message ID (from payload.messageId)
     */
    async replayFallbackMessage(
      routingKey: string,
      payload: unknown,
      messageId: string,
    ): Promise<void> {
      return this.publishStatsEventWithFallback(
        routingKey,
        payload,
        messageId,
        `fallback-replay routingKey=${routingKey}`,
        true,
      );
    }

    /**
     * Publish with idempotency, circuit breaker, retry, fallback queue and DLQ.
     *
     * 1. Skip if this messageId was already published.
     * 2. Circuit OPEN → store in the Redis fallback queue.
     * 3. Publish with retries; on success record it for the breaker and mark
     *    the message as published.
     * 4. On final failure: normal publishes go to the fallback queue AND the
     *    DLQ; replays go to the DLQ only (fallback queue if the DLQ is down).
     *
     * Never throws (fire-and-forget for callers).
     *
     * @param isReplay - true when called from replayFallbackMessage
     */
    private async publishStatsEventWithFallback(
      routingKey: string,
      event: unknown,
      messageId: string,
      context: string,
      isReplay = false,
    ): Promise<void> {
      if (await this.checkIdempotency(messageId)) {
        this.logger.debug(`Skipping duplicate message ${messageId} (${context})`);
        return;
      }

      if (!(await this.canAttempt())) {
        this.logger.warn(
          `Circuit breaker OPEN. Storing ${messageId} in fallback queue (${context})`,
        );
        await this.storeInFallbackQueue(routingKey, event, messageId);
        return;
      }

      try {
        await this.retryPublish(
          () => this.publishStatsEvent(routingKey, event, context),
          context,
        );
      } catch (error) {
        this.recordFailure();
        this.logger.error(
          `All retry attempts failed for ${messageId} (${context}): ${error}`,
        );
        const reason = `Max retries exceeded: ${error}`;
        if (isReplay) {
          const sentToDlq = await this.sendToDLQ(routingKey, event, reason);
          if (!sentToDlq) {
            // Last resort: keep the message rather than lose it.
            await this.storeInFallbackQueue(routingKey, event, messageId);
          }
        } else {
          await this.storeInFallbackQueue(routingKey, event, messageId);
          await this.sendToDLQ(routingKey, event, reason);
        }
        return; // Don't throw - fire-and-forget pattern
      }

      this.recordSuccess();
      await this.markAsProcessed(messageId);
      this.logger.debug(`Successfully published ${messageId} (${context})`);
    }

    /**
     * Single publish attempt of a stats event to the stats exchange
     * (used by the retry mechanism).
     *
     * @throws Error('RabbitMQ channel not ready') when there is no channel,
     *   or the broker/publish error (after logging it)
     */
    private async publishStatsEvent(
      routingKey: string,
      event: unknown,
      context: string,
    ): Promise<void> {
      const channel = this.channel;
      if (!channel) {
        throw new Error('RabbitMQ channel not ready');
      }
      const payloadBuffer = this.toPayloadBuffer(event);
      try {
        await this.confirmPublish(
          channel,
          this.statsExchange,
          routingKey,
          payloadBuffer,
          {
            persistent: true,
            contentType: 'application/json',
            // NOTE(review): AMQP 0-9-1 defines `timestamp` in seconds; this sends
            // epoch milliseconds. Confirm consumers expect ms before changing it.
            timestamp: Number(nowEpochMsBigInt()),
            expiration: this.messageTTL.toString(), // Message TTL
          },
        );
      } catch (error) {
        const err = RabbitmqPublisherService.toError(error);
        this.logger.error(
          `Failed to publish stats event (${context}): ${err.message}`,
          err.stack,
        );
        throw error;
      }
      this.logger.debug(
        `Published stats event (${context}) to ${this.statsExchange}/${routingKey}`,
      );
    }

    /**
     * JSON-encode a payload, writing bigint values as decimal strings
     * (plain JSON.stringify throws on bigint).
     */
    private toPayloadBuffer(event: unknown): Buffer {
      const json = JSON.stringify(event, (_, value) =>
        typeof value === 'bigint' ? value.toString() : value,
      );
      return Buffer.from(json);
    }

    /**
     * Get circuit breaker status (for monitoring).
     * `hasConnection`/`hasChannel` are cleared as soon as amqplib reports a close.
     */
    getCircuitStatus(): {
      state: CircuitBreakerState;
      failureCount: number;
      successCount: number;
      hasConnection: boolean;
      hasChannel: boolean;
      isReconnecting: boolean;
      nextAttemptTime: number;
    } {
      return {
        state: this.circuitState,
        failureCount: this.failureCount,
        successCount: this.successCount,
        hasConnection: !!this.connection,
        hasChannel: !!this.channel,
        isReconnecting: this.reconnectPromise !== undefined,
        nextAttemptTime: this.nextAttemptTime,
      };
    }

    /** Mask the password of `scheme://user:password@host...` URLs for logging. */
    private static redactUrl(url: string): string {
      return url.replace(/^([a-z][a-z0-9+.-]*:\/\/[^:/@]*):[^@/]*@/i, '$1:***@');
    }

    /** Normalise an unknown thrown value to an Error for logging. */
    private static toError(error: unknown): Error {
      return error instanceof Error ? error : new Error(String(error));
    }
  }