// apps/box-app-api/src/rabbitmq/rabbitmq-publisher.service.ts
import {
  Injectable,
  Logger,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { Connection, ConfirmChannel } from 'amqplib';
import * as amqp from 'amqplib';
import { UserLoginEventPayload } from '@box/common/events/user-login-event.dto';
import { AdsClickEventPayload } from '@box/common/events/ads-click-event.dto';
import { nowEpochMsBigInt } from '@box/common/time/time.util';
import { RedisService } from '@box/db/redis/redis.service';

type StatsAdClickRoutingKey = string;
type StatsVideoClickRoutingKey = string;
type StatsAdImpressionRoutingKey = string;

/** Options argument accepted by amqplib's `ConfirmChannel#publish`. */
type PublishOptions = Parameters<ConfirmChannel['publish']>[3];

/** Payload of a `stats.ad.click` event. */
export interface StatsAdClickEventPayload {
  messageId: string;
  uid: string;
  adId: string;
  adType: string;
  clickedAt: bigint;
  ip: string;
  channelId?: string;
  machine?: string;
}

/** Payload of a `stats.video.click` event. */
export interface StatsVideoClickEventPayload {
  messageId: string;
  uid: string;
  videoId: string;
  clickedAt: bigint;
  ip: string;
}

/** Payload of a `stats.ad.impression` event. */
export interface StatsAdImpressionEventPayload {
  messageId: string;
  uid: string;
  adId: string;
  adType: string;
  impressionAt: bigint;
  visibleDurationMs?: number;
  ip: string;
  channelId?: string;
  machine?: string;
}

// Circuit breaker states
enum CircuitBreakerState {
  CLOSED = 'CLOSED', // Normal operation
  OPEN = 'OPEN', // Failing, reject requests
  HALF_OPEN = 'HALF_OPEN', // Testing if service recovered
}

interface CircuitBreakerConfig {
  failureThreshold: number; // Number of failures to open circuit
  successThreshold: number; // Number of successes to close circuit
  timeout: number; // Time in ms to wait before trying again (half-open)
}

/**
 * RabbitMQ Publisher Service
 *
 * Responsible for publishing analytics events to RabbitMQ with comprehensive error handling:
 *
 * PRIMARY RESPONSIBILITIES:
 * 1. Publish stats events (stats.ad.click, stats.video.click, stats.ad.impression)
 * 2. Publish user activity events (user.login, ads.click) with simplified resilience
 * 3. Maintain connection health with automatic reconnection
 *
 * RESILIENCE FEATURES:
 * - Circuit Breaker: Prevents overwhelming failed RabbitMQ (CLOSED/OPEN/HALF_OPEN states)
 * - Retry Logic: up to 3 attempts; the waits between attempts are 100ms and 500ms
 *   (the third configured delay, 2000ms, is only used if `maxRetries` is raised)
 * - Redis Fallback Queue: 24-hour TTL for stats events during outages
 * - Dead Letter Queue (DLQ): Manual inspection of permanently failed messages
 * - Publisher Idempotency: 7-day deduplication window using Redis (stats events only)
 * - Message TTL: 24-hour expiration for stats events
 * - Automatic Reconnection: Self-healing when RabbitMQ recovers
 *
 * EVENT TIERS:
 * - Tier 1 (Full Resilience): stats.* events with fallback/DLQ/idempotency
 * - Tier 2 (Partial Resilience): user.login, ads.click with circuit breaker + retries only
 *
 * OBSERVABILITY:
 * - Structured logs with circuit state, retry counts, and routing context
 * - Health status endpoint via getCircuitStatus()
 * - Clear warnings when events are dropped due to circuit breaker
 *
 * @see OBSERVABILITY_ENHANCEMENTS.md for monitoring guide
 * @see RABBITMQ_FALLBACK_REPLAY.md for recovery procedures
 */
@Injectable()
export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(RabbitmqPublisherService.name);

  private connection?: Connection;
  private channel?: ConfirmChannel;

  private exchange!: string;
  private routingKeyLogin!: string;
  private routingKeyAdsClick!: string;
  private statsExchange!: string;
  private routingKeyStatsAdClick!: StatsAdClickRoutingKey;
  private routingKeyStatsVideoClick!: StatsVideoClickRoutingKey;
  private routingKeyStatsAdImpression!: StatsAdImpressionRoutingKey;
  private dlqExchange!: string;

  // Circuit breaker state
  private circuitState: CircuitBreakerState = CircuitBreakerState.CLOSED;
  private failureCount = 0;
  private successCount = 0;
  private nextAttemptTime = 0;
  private readonly circuitConfig: CircuitBreakerConfig = {
    failureThreshold: 5, // Open circuit after 5 failures
    successThreshold: 2, // Close circuit after 2 successes
    timeout: 60000, // Wait 60s before trying again
  };

  // Reconnection state
  private isReconnecting = false;
  private reconnectionScheduled = false;
  // Handle of the pending reconnection timer, kept so shutdown can cancel it
  private reconnectTimer?: ReturnType<typeof setTimeout>;
  // Set by onModuleDestroy(); suppresses reconnection once the module is going away
  private isShuttingDown = false;

  // Retry configuration
  private readonly maxRetries = 3;
  private readonly retryDelays = [100, 500, 2000]; // Exponential backoff

  // Message TTL (24 hours for fallback queue)
  private readonly messageTTL = 86400000; // 24 hours in ms
  private readonly idempotencyTTL = 604800; // 7 days in seconds

  constructor(
    private readonly config: ConfigService,
    private readonly redis: RedisService,
  ) {}

  /**
   * Reads exchange/routing-key configuration and opens the initial connection.
   *
   * A missing RABBITMQ_URL or a failed connection does not throw: the circuit
   * is left OPEN so stats events go to the Redis fallback queue, and a later
   * publish attempt (after the circuit timeout) triggers a reconnection.
   */
  async onModuleInit(): Promise<void> {
    const url = this.config.get<string>('RABBITMQ_URL');

    this.exchange =
      this.config.get<string>('RABBITMQ_LOGIN_EXCHANGE') ?? 'stats.user';
    this.routingKeyLogin =
      this.config.get<string>('RABBITMQ_LOGIN_ROUTING_KEY') ?? 'user.login';
    this.routingKeyAdsClick =
      this.config.get<string>('RABBITMQ_ADS_CLICK_ROUTING_KEY') ?? 'ads.click';
    this.statsExchange =
      this.config.get<string>('RABBITMQ_STATS_EXCHANGE') ?? this.exchange;
    this.routingKeyStatsAdClick =
      this.config.get<string>('RABBITMQ_STATS_AD_CLICK_ROUTING_KEY') ??
      'stats.ad.click';
    this.routingKeyStatsVideoClick =
      this.config.get<string>('RABBITMQ_STATS_VIDEO_CLICK_ROUTING_KEY') ??
      'stats.video.click';
    this.routingKeyStatsAdImpression =
      this.config.get<string>('RABBITMQ_STATS_AD_IMPRESSION_ROUTING_KEY') ??
      'stats.ad.impression';
    this.dlqExchange =
      this.config.get<string>('RABBITMQ_DLQ_EXCHANGE') ?? 'dlq.stats';

    if (!url) {
      this.logger.error(
        'RABBITMQ_URL is not set. Stats will be stored in Redis fallback queue only.',
      );
      this.circuitState = CircuitBreakerState.OPEN;
      // Same back-off as a failed connection, so the first publish does not
      // immediately burn a full retry cycle against a channel that cannot exist.
      this.nextAttemptTime = Date.now() + this.circuitConfig.timeout;
      return;
    }

    try {
      // The URL usually embeds credentials; never log it verbatim.
      this.logger.log(`Connecting to RabbitMQ at ${this.redactUrl(url)} ...`);
      await this.initializeConnection(url);
      this.logger.log('RabbitMQ connection initialized successfully');
    } catch (error) {
      this.logger.error(
        `Failed to initialize RabbitMQ connection: ${error}`,
        this.stackOf(error),
      );
      this.circuitState = CircuitBreakerState.OPEN;
      this.nextAttemptTime = Date.now() + this.circuitConfig.timeout;
    }
  }

  /**
   * Opens the connection and confirm channel, wires the health handlers and
   * asserts the exchanges plus the DLQ queue/binding.
   *
   * The 'error'/'close' handlers are bound to the specific connection/channel
   * objects created here. They ignore events from objects this service has
   * already replaced or detached, so closing an old connection during a
   * reconnect (or during shutdown) cannot reopen the circuit.
   *
   * @param url - AMQP connection URL
   * @throws whatever amqplib rejects with when connecting or asserting fails
   */
  private async initializeConnection(url: string): Promise<void> {
    const connection = await amqp.connect(url);
    this.connection = connection;

    // Handle connection errors
    connection.on('error', (err: unknown) => {
      if (this.connection !== connection) {
        return; // Event from a connection we no longer own
      }
      this.logger.error(`RabbitMQ connection error: ${err}`, this.stackOf(err));
      this.openCircuit();
    });

    connection.on('close', () => {
      if (this.connection !== connection) {
        return; // Event from a connection we no longer own
      }
      this.logger.warn('RabbitMQ connection closed');
      // Drop the dead handles so reconnectIfNeeded() sees that work is needed.
      this.connection = undefined;
      this.channel = undefined;
      this.openCircuit();
    });

    // Use a confirm channel so we know when broker has accepted the message
    const channel = await connection.createConfirmChannel();
    this.channel = channel;

    // Handle channel errors
    channel.on('error', (err: unknown) => {
      if (this.channel !== channel) {
        return; // Event from a channel we no longer own
      }
      this.logger.error(`RabbitMQ channel error: ${err}`, this.stackOf(err));
      this.openCircuit();
    });

    channel.on('close', () => {
      if (this.channel !== channel) {
        return; // Event from a channel we no longer own
      }
      this.logger.warn('RabbitMQ channel closed');
      // A closed channel cannot publish; clearing it forces a reconnect.
      this.channel = undefined;
      this.openCircuit();
    });

    // Assert the publishing exchanges
    await channel.assertExchange(this.exchange, 'topic', {
      durable: true,
    });

    if (this.statsExchange !== this.exchange) {
      await channel.assertExchange(this.statsExchange, 'topic', {
        durable: true,
      });
    }

    // Assert Dead Letter Exchange
    await channel.assertExchange(this.dlqExchange, 'topic', {
      durable: true,
    });

    // Assert DLQ queue for stats events
    await channel.assertQueue('dlq.stats.events', {
      durable: true,
      arguments: {
        'x-message-ttl': this.messageTTL, // Messages expire after 24 hours
        'x-max-length': 100000, // Maximum 100k messages in DLQ
      },
    });

    // Bind DLQ queue to DLQ exchange
    // Routing convention: sendToDLQ() publishes with 'dlq.{original-routing-key}' format
    // Examples: dlq.stats.ad.click, dlq.stats.video.click, dlq.stats.ad.impression
    // Pattern 'dlq.#' matches all DLQ messages regardless of their original routing key
    await channel.bindQueue('dlq.stats.events', this.dlqExchange, 'dlq.#');

    this.logger.log(
      `RabbitMQ publisher ready. exchange="${this.exchange}", statsExchange="${this.statsExchange}", dlqExchange="${this.dlqExchange}"`,
    );
  }

  /**
   * Cancels any pending reconnection and closes the channel and connection.
   *
   * The handles are detached before closing so the resulting 'close' events
   * are ignored by the handlers and cannot schedule a reconnection after
   * shutdown. Each close is attempted independently; a failure is logged and
   * never rethrown.
   */
  async onModuleDestroy(): Promise<void> {
    this.isShuttingDown = true;

    if (this.reconnectTimer !== undefined) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = undefined;
    }
    this.reconnectionScheduled = false;

    const channel = this.channel;
    const connection = this.connection;
    this.channel = undefined;
    this.connection = undefined;

    try {
      await channel?.close();
    } catch (error: unknown) {
      this.logger.error(
        'Error while closing RabbitMQ channel',
        this.stackOf(error),
      );
    }

    try {
      await connection?.close();
    } catch (error: unknown) {
      this.logger.error(
        'Error while closing RabbitMQ connection',
        this.stackOf(error),
      );
    }
  }

  /**
   * Circuit breaker: Open circuit (stop attempting to send to RabbitMQ)
   *
   * No-op when the circuit is already OPEN. Otherwise resets both counters,
   * arms `nextAttemptTime` and schedules a reconnection attempt.
   */
  private openCircuit(): void {
    if (this.circuitState === CircuitBreakerState.OPEN) {
      return;
    }

    this.logger.warn(
      `Circuit breaker OPENED (failureCount=${this.failureCount}, successCount=${this.successCount}). Will retry after ${this.circuitConfig.timeout}ms`,
    );
    this.circuitState = CircuitBreakerState.OPEN;
    this.failureCount = 0;
    this.successCount = 0;
    this.nextAttemptTime = Date.now() + this.circuitConfig.timeout;

    // Schedule reconnection attempt
    this.scheduleReconnection();
  }

  /**
   * Circuit breaker: Move to half-open state (test if service recovered)
   *
   * The state is switched synchronously, before the reconnection is awaited,
   * so concurrent callers observe HALF_OPEN rather than starting a second
   * transition.
   */
  private async halfOpenCircuit(): Promise<void> {
    this.logger.log(
      `Circuit breaker HALF-OPEN (failureCount=${this.failureCount}, successCount=${this.successCount}). Testing connection...`,
    );
    this.circuitState = CircuitBreakerState.HALF_OPEN;
    this.successCount = 0;

    // Attempt reconnection before allowing publish attempts
    await this.reconnectIfNeeded();
  }

  /**
   * Circuit breaker: Close circuit (resume normal operation)
   */
  private closeCircuit(): void {
    this.logger.log(
      `Circuit breaker CLOSED (failureCount=${this.failureCount}, successCount=${this.successCount}). Resuming normal operation.`,
    );
    this.circuitState = CircuitBreakerState.CLOSED;
    this.failureCount = 0;
    this.successCount = 0;
  }

  /**
   * Record successful publish (for circuit breaker)
   *
   * Resets the failure counter; while HALF_OPEN it also counts toward
   * `successThreshold` and closes the circuit once that is reached.
   */
  private recordSuccess(): void {
    this.failureCount = 0;

    if (this.circuitState !== CircuitBreakerState.HALF_OPEN) {
      return;
    }

    this.successCount++;
    if (this.successCount >= this.circuitConfig.successThreshold) {
      this.closeCircuit();
    }
  }

  /**
   * Record failed publish (for circuit breaker)
   *
   * A single failure while HALF_OPEN reopens the circuit; while CLOSED the
   * circuit opens once `failureThreshold` failures have accumulated.
   */
  private recordFailure(): void {
    this.failureCount++;

    const probeFailed = this.circuitState === CircuitBreakerState.HALF_OPEN;
    const thresholdReached =
      this.circuitState === CircuitBreakerState.CLOSED &&
      this.failureCount >= this.circuitConfig.failureThreshold;

    if (probeFailed || thresholdReached) {
      this.openCircuit();
    }
  }

  /**
   * Check if circuit breaker allows request
   *
   * @returns true when a publish may be attempted. While a reconnection is in
   *   flight, or when the half-open reconnection attempt itself failed and
   *   reopened the circuit, this returns false so callers take their
   *   "circuit open" path instead of retrying against a missing channel.
   */
  private async canAttempt(): Promise<boolean> {
    // Snapshot into a local so the later re-read of this.circuitState is not
    // narrowed by these comparisons.
    const state = this.circuitState;

    if (state === CircuitBreakerState.CLOSED) {
      return true;
    }

    if (state === CircuitBreakerState.HALF_OPEN) {
      // No usable channel exists until the in-flight reconnection finishes.
      return !this.isReconnecting;
    }

    // OPEN state: check if timeout elapsed
    if (Date.now() < this.nextAttemptTime) {
      return false;
    }

    await this.halfOpenCircuit();
    // halfOpenCircuit() may have reopened the circuit if reconnecting failed.
    return this.circuitState !== CircuitBreakerState.OPEN;
  }

  /**
   * Schedule a reconnection attempt after circuit timeout
   *
   * At most one timer is pending at a time, and none is scheduled once the
   * module is shutting down.
   */
  private scheduleReconnection(): void {
    if (this.isShuttingDown || this.reconnectionScheduled) {
      return; // Shutting down, or already scheduled
    }

    this.reconnectionScheduled = true;
    this.logger.debug(
      `Scheduling reconnection attempt in ${this.circuitConfig.timeout}ms`,
    );

    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = undefined;
      this.reconnectionScheduled = false;

      if (
        this.isShuttingDown ||
        this.circuitState !== CircuitBreakerState.OPEN
      ) {
        return;
      }

      // Nothing awaits a timer callback, so a rejection must be handled here.
      this.halfOpenCircuit().catch((error: unknown) => {
        this.logger.error(
          `Scheduled reconnection attempt failed: ${error}`,
          this.stackOf(error),
        );
      });
    }, this.circuitConfig.timeout);
  }

  /**
   * Reconnect to RabbitMQ if connection or channel is closed/undefined
   *
   * On success during HALF_OPEN the circuit is closed immediately; on failure
   * (including a missing RABBITMQ_URL) during HALF_OPEN it is reopened.
   * Never throws.
   */
  private async reconnectIfNeeded(): Promise<void> {
    if (this.isShuttingDown) {
      this.logger.debug('Module is shutting down, skipping reconnection');
      return;
    }

    // Check if reconnection is needed. The 'close' handlers clear these
    // references, so an undefined handle is the primary signal; the
    // `destroyed` probe is kept as an additional best-effort check.
    const connectionClosed =
      !this.connection || this.connection.connection?.destroyed;
    const channelClosed = !this.channel;

    if (!connectionClosed && !channelClosed) {
      this.logger.debug(
        'Connection and channel are healthy, no reconnection needed',
      );
      return;
    }

    // Prevent concurrent reconnection attempts
    if (this.isReconnecting) {
      this.logger.debug('Reconnection already in progress, skipping');
      return;
    }

    this.isReconnecting = true;
    this.logger.log(
      `🔄 Starting RabbitMQ reconnection attempt (circuitState=${this.circuitState})...`,
    );

    try {
      // Get current URL from config
      const url = this.config.get<string>('RABBITMQ_URL');
      if (!url) {
        this.logger.error(
          '❌ Reconnection failed: RABBITMQ_URL is not set. Cannot reconnect to RabbitMQ.',
        );
        // Do not stay HALF_OPEN without a channel: every publish would burn a
        // full retry cycle before failing.
        if (this.circuitState === CircuitBreakerState.HALF_OPEN) {
          this.openCircuit();
        }
        return;
      }

      // Detach the old handles first so their 'close' events are ignored,
      // then close them best-effort.
      const staleChannel = this.channel;
      const staleConnection = this.connection;
      this.channel = undefined;
      this.connection = undefined;

      try {
        await staleChannel?.close();
      } catch {
        // Ignore errors on close
      }

      try {
        await staleConnection?.close();
      } catch {
        // Ignore errors on close
      }

      // Reinitialize connection
      this.logger.debug(
        `🔌 Reconnecting to RabbitMQ at ${this.redactUrl(url)}...`,
      );
      await this.initializeConnection(url);

      this.logger.log(
        `✅ RabbitMQ reconnection successful (hasConnection=${!!this.connection}, hasChannel=${!!this.channel})`,
      );

      // Close circuit if reconnection succeeded
      if (this.circuitState === CircuitBreakerState.HALF_OPEN) {
        this.logger.log(
          'Reconnection successful during HALF_OPEN, closing circuit',
        );
        this.closeCircuit();
      }
    } catch (error) {
      this.logger.error(
        `❌ RabbitMQ reconnection failed (circuitState=${this.circuitState}): ${error}`,
        this.stackOf(error),
      );

      // Keep circuit open on reconnection failure
      if (this.circuitState === CircuitBreakerState.HALF_OPEN) {
        this.logger.warn(
          '⚠️ Reconnection failed during HALF_OPEN, reopening circuit',
        );
        this.openCircuit();
      }
    } finally {
      this.isReconnecting = false;
    }
  }

  /**
   * Check publisher-level idempotency: Has this message already been published?
   *
   * This prevents duplicate publishes from the publisher side within a 7-day window.
   * This is NOT end-to-end idempotency - consumers must perform their own duplicate
   * detection on the receiving end based on their business logic.
   *
   * Redis key format: rabbitmq:publish-idempotency:{messageId}
   * TTL: 7 days (604800 seconds)
   *
   * Note: the check here and the later markAsProcessed() are two separate Redis
   * calls, so two concurrent publishes of the same messageId can both pass.
   *
   * @param messageId - Unique message identifier (UUID)
   * @returns true if message was already published, false otherwise
   *
   * Note: On Redis errors, returns false (prefer duplicates over data loss)
   */
  private async checkIdempotency(messageId: string): Promise<boolean> {
    try {
      const key = `rabbitmq:publish-idempotency:${messageId}`;
      const exists = await this.redis.exists(key);
      return exists > 0;
    } catch (error) {
      this.logger.error(
        `Failed to check publish idempotency for ${messageId}: ${error}`,
      );
      // On Redis error, allow the message (better to have duplicate than lose data)
      return false;
    }
  }

  /**
   * Mark message as published (for publisher-level idempotency)
   *
   * Records that this messageId has been successfully published to RabbitMQ.
   * This prevents duplicate publishes from retry logic or circuit breaker recovery.
   *
   * Consumers still need to implement their own idempotency checks when processing
   * messages, as network issues or broker failures could cause duplicates downstream.
   *
   * Redis key format: rabbitmq:publish-idempotency:{messageId}
   * TTL: 7 days (604800 seconds)
   *
   * @param messageId - Unique message identifier (UUID)
   *
   * Note: Errors are logged but do not fail the publish operation
   */
  private async markAsProcessed(messageId: string): Promise<void> {
    try {
      const key = `rabbitmq:publish-idempotency:${messageId}`;
      await this.redis.set(key, '1', this.idempotencyTTL);
    } catch (error) {
      this.logger.error(
        `Failed to mark ${messageId} as published (idempotency): ${error}`,
      );
    }
  }

  /**
   * Store message in Redis fallback queue
   *
   * The payload is first normalised to the same JSON form that goes on the
   * wire (BigInt → string). Stats payloads carry `bigint` timestamps, and a
   * plain JSON.stringify inside the Redis helper would throw on them and lose
   * the message. NOTE(review): RedisService.setJson is not visible from this
   * file; the normalisation is safe whichever way it serialises.
   *
   * @param routingKey - Original routing key, part of the Redis key
   * @param payload - Event payload to persist
   * @param messageId - Message identifier, part of the Redis key
   *
   * Note: Errors are logged as CRITICAL but never thrown
   */
  private async storeInFallbackQueue(
    routingKey: string,
    payload: unknown,
    messageId: string,
  ): Promise<void> {
    try {
      const fallbackKey = `rabbitmq:fallback:${routingKey}:${messageId}`;
      const serializable: unknown = JSON.parse(
        this.toPayloadBuffer(payload).toString('utf8'),
      );
      // Redis TTL is in seconds; derive it from the shared 24-hour message TTL.
      const ttlSeconds = Math.floor(this.messageTTL / 1000);
      await this.redis.setJson(fallbackKey, serializable, ttlSeconds);
      this.logger.warn(
        `Stored message ${messageId} in Redis fallback queue: ${fallbackKey}`,
      );
    } catch (error) {
      this.logger.error(
        `CRITICAL: Failed to store message ${messageId} in fallback queue: ${error}`,
        this.stackOf(error),
      );
    }
  }

  /**
   * Send message to Dead Letter Queue
   *
   * Publishes to the DLQ exchange with routing key `dlq.{routingKey}` and
   * records the reason and time in message headers.
   *
   * @param routingKey - Original routing key of the failed message
   * @param payload - Original event payload
   * @param reason - Human-readable failure reason
   *
   * Note: Errors are logged but never thrown
   */
  private async sendToDLQ(
    routingKey: string,
    payload: unknown,
    reason: string,
  ): Promise<void> {
    if (!this.channel) {
      this.logger.error(
        `Cannot send to DLQ: channel not available. Reason: ${reason}`,
      );
      return;
    }

    const dlqRoutingKey = `dlq.${routingKey}`;

    try {
      await this.publishWithConfirm(
        this.dlqExchange,
        dlqRoutingKey,
        this.toPayloadBuffer(payload),
        {
          persistent: true,
          contentType: 'application/json',
          headers: {
            'x-death-reason': reason,
            'x-death-timestamp': Date.now(),
          },
        },
      );

      this.logger.warn(
        `Sent message to DLQ: exchange="${this.dlqExchange}", routingKey="${dlqRoutingKey}", queue="dlq.stats.events". Reason: ${reason}`,
      );
    } catch (error) {
      this.logger.error(
        `Failed to send message to DLQ (routingKey="${dlqRoutingKey}"): ${error}`,
        this.stackOf(error),
      );
    }
  }

  /**
   * Publish one message on the confirm channel and settle when the broker
   * confirms or rejects it.
   *
   * @param exchange - Target exchange
   * @param routingKey - Routing key
   * @param content - Serialized message body
   * @param options - amqplib publish options
   * @throws Error('RabbitMQ channel not ready') when there is no channel;
   *   otherwise rejects with the error passed to the confirm callback or
   *   thrown synchronously by `publish`
   */
  private publishWithConfirm(
    exchange: string,
    routingKey: string,
    content: Buffer,
    options: PublishOptions,
  ): Promise<void> {
    const channel = this.channel;
    if (!channel) {
      return Promise.reject(new Error('RabbitMQ channel not ready'));
    }

    // A synchronous throw from publish() inside the executor becomes a rejection.
    return new Promise<void>((resolve, reject) => {
      channel.publish(exchange, routingKey, content, options, (err) => {
        if (err) {
          reject(err);
          return;
        }
        resolve();
      });
    });
  }

  /**
   * Retry logic with exponential backoff
   *
   * Calls `publishFn` up to `maxRetries` times, sleeping `retryDelays[i]`
   * after the i-th failure. No sleep follows the final attempt.
   *
   * @param publishFn - The publish operation to attempt
   * @param context - Short description used in log lines
   * @throws the error of the last attempt when every attempt fails
   */
  private async retryPublish(
    publishFn: () => Promise<void>,
    context: string,
  ): Promise<void> {
    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      try {
        await publishFn();
        return; // Success
      } catch (error) {
        const isLastAttempt = attempt === this.maxRetries - 1;

        if (isLastAttempt) {
          this.logger.error(
            `Failed to publish after ${this.maxRetries} attempts (${context}): ${error}`,
          );
          throw error;
        }

        // Fall back to the last configured delay if maxRetries ever exceeds
        // the number of configured delays.
        const delay =
          this.retryDelays[attempt] ??
          this.retryDelays[this.retryDelays.length - 1] ??
          0;
        this.logger.warn(
          `Publish attempt ${attempt + 1} failed (${context}). Retrying in ${delay}ms...`,
        );
        await new Promise<void>((resolve) => setTimeout(resolve, delay));
      }
    }
  }

  /**
   * Publish a user.login event.
   *
   * This is a less-critical event compared to stats events:
   * - Uses circuit breaker to avoid overwhelming failed RabbitMQ
   * - Includes retry logic (up to 3 attempts with backoff)
   * - Does NOT use Redis fallback or idempotency (keep it simple)
   * - Drops events when circuit is OPEN with clear warning logs
   *
   * Never throws (fire-and-forget).
   */
  async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
    // Check circuit breaker before attempting publish
    if (!(await this.canAttempt())) {
      this.logger.warn(
        `⚠️ Circuit breaker OPEN. Dropping user.login event for uid=${event.uid} (non-critical event, no fallback)`,
      );
      return; // Drop event to maintain fire-and-forget behavior
    }

    const context = `user.login uid=${event.uid}`;

    try {
      await this.retryPublish(
        () => this.publishUserLoginCore(event),
        context,
      );

      // Success!
      this.recordSuccess();
      this.logger.debug(`Published user.login event for uid=${event.uid}`);
    } catch (error) {
      // All retries failed
      this.recordFailure();
      this.logger.error(
        `Failed to publish user.login after ${this.maxRetries} retries for uid=${event.uid}: ${error}`,
      );
      // Don't throw - maintain fire-and-forget behavior
    }
  }

  /**
   * Core user.login publish logic (used by retry wrapper)
   *
   * @throws when the channel is missing or the broker rejects the message
   */
  private async publishUserLoginCore(
    event: UserLoginEventPayload,
  ): Promise<void> {
    // toPayloadBuffer keeps serialization consistent with stats events
    // (identical output for payloads without BigInt fields).
    await this.publishWithConfirm(
      this.exchange,
      this.routingKeyLogin,
      this.toPayloadBuffer(event),
      {
        persistent: true,
        contentType: 'application/json',
      },
    );
  }

  /**
   * Publish an ads.click event.
   *
   * This is a less-critical event compared to stats events:
   * - Uses circuit breaker to avoid overwhelming failed RabbitMQ
   * - Includes retry logic (up to 3 attempts with backoff)
   * - Does NOT use Redis fallback or idempotency (keep it simple)
   * - Drops events when circuit is OPEN with clear warning logs
   *
   * Never throws (fire-and-forget).
   */
  async publishAdsClick(event: AdsClickEventPayload): Promise<void> {
    // Check circuit breaker before attempting publish
    if (!(await this.canAttempt())) {
      this.logger.warn(
        `⚠️ Circuit breaker OPEN. Dropping ads.click event for adsId=${event.adsId} (non-critical event, no fallback)`,
      );
      return; // Drop event to maintain fire-and-forget behavior
    }

    const context = `ads.click adsId=${event.adsId}`;

    try {
      await this.retryPublish(
        () => this.publishAdsClickCore(event),
        context,
      );

      // Success!
      this.recordSuccess();
      this.logger.debug(`Published ads.click event for adsId=${event.adsId}`);
    } catch (error) {
      // All retries failed
      this.recordFailure();
      this.logger.error(
        `Failed to publish ads.click after ${this.maxRetries} retries for adsId=${event.adsId}: ${error}`,
      );
      // Don't throw - maintain fire-and-forget behavior
    }
  }

  /**
   * Core ads.click publish logic (used by retry wrapper)
   *
   * @throws when the channel is missing or the broker rejects the message
   */
  private async publishAdsClickCore(
    event: AdsClickEventPayload,
  ): Promise<void> {
    // toPayloadBuffer keeps serialization consistent with stats events
    // (identical output for payloads without BigInt fields).
    await this.publishWithConfirm(
      this.exchange,
      this.routingKeyAdsClick,
      this.toPayloadBuffer(event),
      {
        persistent: true,
        contentType: 'application/json',
      },
    );
  }

  /**
   * Publish stats.ad.click event with full error handling
   */
  async publishStatsAdClick(event: StatsAdClickEventPayload): Promise<void> {
    return this.publishStatsEventWithFallback(
      this.routingKeyStatsAdClick,
      event,
      event.messageId,
      `stats.ad.click adId=${event.adId}`,
    );
  }

  /**
   * Publish stats.video.click event with full error handling
   */
  async publishStatsVideoClick(
    event: StatsVideoClickEventPayload,
  ): Promise<void> {
    return this.publishStatsEventWithFallback(
      this.routingKeyStatsVideoClick,
      event,
      event.messageId,
      `stats.video.click videoId=${event.videoId}`,
    );
  }

  /**
   * Publish stats.ad.impression event with full error handling
   */
  async publishStatsAdImpression(
    event: StatsAdImpressionEventPayload,
  ): Promise<void> {
    return this.publishStatsEventWithFallback(
      this.routingKeyStatsAdImpression,
      event,
      event.messageId,
      `stats.ad.impression adId=${event.adId}`,
    );
  }

  /**
   * PUBLIC API for replaying messages from Redis fallback queue
   * Used by RabbitmqFallbackReplayService to republish failed messages
   *
   * This goes through the same pipeline as a first-time publish: idempotency
   * check, circuit breaker, retries, and on failure fallback storage + DLQ.
   *
   * NOTE(review): earlier documentation stated that failed replays are NOT
   * written back to the fallback queue. The implementation does write them
   * back (same Redis key, TTL refreshed to 24 hours) via
   * publishStatsEventWithFallback(). Behavior is left unchanged here because
   * the replay service's key handling is not visible from this file; confirm
   * which contract the replay service relies on before changing it.
   *
   * @param routingKey - Original routing key (e.g., 'stats.ad.click')
   * @param payload - Original message payload
   * @param messageId - Original message ID (from payload.messageId)
   */
  async replayFallbackMessage(
    routingKey: string,
    payload: unknown,
    messageId: string,
  ): Promise<void> {
    // Use the same internal publish logic, but with a special context
    // to indicate this is a replay from fallback queue
    return this.publishStatsEventWithFallback(
      routingKey,
      payload,
      messageId,
      `fallback-replay routingKey=${routingKey}`,
    );
  }

  /**
   * Enhanced publish with circuit breaker, retry, fallback queue, DLQ, and idempotency
   *
   * Never throws (fire-and-forget).
   *
   * @param routingKey - Routing key on the stats exchange
   * @param event - Event payload
   * @param messageId - Identifier used for idempotency and fallback keys
   * @param context - Short description used in log lines
   */
  private async publishStatsEventWithFallback(
    routingKey: string,
    event: unknown,
    messageId: string,
    context: string,
  ): Promise<void> {
    // 1. Check idempotency
    const alreadyProcessed = await this.checkIdempotency(messageId);
    if (alreadyProcessed) {
      this.logger.debug(`Skipping duplicate message ${messageId} (${context})`);
      return;
    }

    // 2. Check circuit breaker
    if (!(await this.canAttempt())) {
      this.logger.warn(
        `Circuit breaker OPEN. Storing messageId=${messageId} in fallback queue (${context}, routingKey=${routingKey})`,
      );
      await this.storeInFallbackQueue(routingKey, event, messageId);
      return;
    }

    // 3. Attempt to publish with retry logic
    try {
      await this.retryPublish(
        () => this.publishStatsEvent(routingKey, event, context),
        context,
      );

      // Success!
      this.recordSuccess();
      await this.markAsProcessed(messageId);

      this.logger.debug(`Successfully published ${messageId} (${context})`);
    } catch (error) {
      // All retries failed
      this.recordFailure();
      this.logger.error(
        `All retry attempts failed for messageId=${messageId} (${context}, routingKey=${routingKey}): ${error}`,
      );

      // 4. Store in fallback queue
      await this.storeInFallbackQueue(routingKey, event, messageId);

      // 5. Send to DLQ for manual inspection
      await this.sendToDLQ(routingKey, event, `Max retries exceeded: ${error}`);

      // Don't throw error - fire-and-forget pattern
    }
  }

  /**
   * Core publish logic (used by retry mechanism)
   *
   * Publishes to the stats exchange as a persistent JSON message with a
   * timestamp property and a per-message expiration of `messageTTL`.
   *
   * @throws when the channel is missing or the broker rejects the message
   */
  private async publishStatsEvent(
    routingKey: string,
    event: unknown,
    context: string,
  ): Promise<void> {
    if (!this.channel) {
      throw new Error('RabbitMQ channel not ready');
    }

    const payloadBuffer = this.toPayloadBuffer(event);

    // Safely convert BigInt timestamp to Number for RabbitMQ
    // Note: RabbitMQ expects milliseconds since epoch as Number
    const timestamp = this.safeNumberFromBigInt(
      nowEpochMsBigInt(),
      'timestamp',
    );

    try {
      await this.publishWithConfirm(
        this.statsExchange,
        routingKey,
        payloadBuffer,
        {
          persistent: true,
          contentType: 'application/json',
          timestamp,
          expiration: this.messageTTL.toString(), // Message TTL
        },
      );
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.error(
        `Failed to publish stats event (${context}, routingKey=${routingKey}): ${message}`,
        this.stackOf(error),
      );
      throw error;
    }

    this.logger.debug(
      `Published stats event (${context}) to ${this.statsExchange}/${routingKey}`,
    );
  }

  /**
   * Convert event payload to Buffer for RabbitMQ publish
   *
   * BigInt Handling:
   * - JSON.stringify does not natively support BigInt (throws TypeError)
   * - We use a replacer function to convert BigInt → string
   * - This ensures timestamps like clickedAt, impressionAt serialize correctly
   * - Consumer side must parse these string timestamps back to appropriate numeric types
   *
   * Security Note:
   * - Payload size is not explicitly limited here
   * - Consider adding size checks if payloads grow unexpectedly large
   *
   * @throws TypeError when the payload has no JSON representation
   *   (e.g. `undefined` or a function)
   */
  private toPayloadBuffer(event: unknown): Buffer {
    // Timestamps in ms are ~13 digits. Anything above MAX_SAFE_INTEGER * 1000
    // (~9.0e18) is far outside any plausible timestamp and likely corrupted.
    const suspiciousThreshold = BigInt(Number.MAX_SAFE_INTEGER) * BigInt(1000);

    const json: string | undefined = JSON.stringify(
      event,
      (key: string, value: unknown) => {
        if (typeof value !== 'bigint') {
          return value;
        }

        // Convert BigInt to string because JSON.stringify doesn't support BigInt
        if (value > suspiciousThreshold) {
          this.logger.warn(
            `Suspiciously large BigInt in payload: key=${key}, value=${value.toString()}`,
          );
        }
        return value.toString();
      },
    );

    // JSON.stringify returns undefined (not a string) for undefined/functions.
    if (json === undefined) {
      throw new TypeError('Event payload is not JSON-serializable');
    }

    return Buffer.from(json);
  }

  /**
   * Safely convert BigInt to Number with overflow detection
   *
   * JavaScript Number type uses double-precision (53-bit mantissa):
   * - Safe integer range: -(2^53 - 1) to (2^53 - 1)
   * - Timestamps in milliseconds since epoch fit easily (13-14 digits)
   * - But corruption or bugs could produce unsafe values
   *
   * @param value - BigInt value to convert
   * @param context - Context for error logging (e.g., 'timestamp', 'count')
   * @returns Number representation, or 0 if unsafe
   */
  private safeNumberFromBigInt(value: bigint, context: string): number {
    const upperBound = BigInt(Number.MAX_SAFE_INTEGER);
    const lowerBound = BigInt(Number.MIN_SAFE_INTEGER);

    const withinSafeRange = value >= lowerBound && value <= upperBound;
    if (withinSafeRange) {
      return Number(value);
    }

    this.logger.warn(
      `BigInt value out of safe range for ${context}: ${value.toString()}. Using 0 as fallback.`,
    );
    return 0; // Safe fallback
  }

  /**
   * Return the URL with its password masked so it is safe to log.
   *
   * @param url - Connection URL, possibly containing credentials
   * @returns the URL with the password replaced by `***`, or a placeholder
   *   when the string cannot be parsed as a URL
   */
  private redactUrl(url: string): string {
    try {
      const parsed = new URL(url);
      if (parsed.password) {
        parsed.password = '***';
      }
      return parsed.toString();
    } catch {
      return '<unparseable RABBITMQ_URL>';
    }
  }

  /** Stack trace of `error` when it is an Error, otherwise undefined. */
  private stackOf(error: unknown): string | undefined {
    return error instanceof Error ? error.stack : undefined;
  }

  /**
   * Get circuit breaker status (for monitoring)
   */
  getCircuitStatus(): {
    state: CircuitBreakerState;
    failureCount: number;
    successCount: number;
    hasConnection: boolean;
    hasChannel: boolean;
    isReconnecting: boolean;
    nextAttemptTime: number;
  } {
    return {
      state: this.circuitState,
      failureCount: this.failureCount,
      successCount: this.successCount,
      hasConnection: !!(
        this.connection && !this.connection.connection?.destroyed
      ),
      hasChannel: !!this.channel,
      isReconnecting: this.isReconnecting,
      nextAttemptTime: this.nextAttemptTime,
    };
  }
}