|
|
@@ -57,6 +57,37 @@ interface CircuitBreakerConfig {
|
|
|
timeout: number; // Time in ms to wait before trying again (half-open)
|
|
|
}
|
|
|
|
|
|
+/**
|
|
|
+ * RabbitMQ Publisher Service
|
|
|
+ *
|
|
|
+ * Responsible for publishing analytics events to RabbitMQ with comprehensive error handling:
|
|
|
+ *
|
|
|
+ * PRIMARY RESPONSIBILITIES:
|
|
|
+ * 1. Publish stats events (stats.ad.click, stats.video.click, stats.ad.impression)
|
|
|
+ * 2. Publish user activity events (user.login, ads.click) with simplified resilience
|
|
|
+ * 3. Maintain connection health with automatic reconnection
|
|
|
+ *
|
|
|
+ * RESILIENCE FEATURES:
|
|
|
+ * - Circuit Breaker: Prevents overwhelming failed RabbitMQ (CLOSED/OPEN/HALF_OPEN states)
|
|
|
+ * - Retry Logic: 3 attempts with exponential backoff (100ms, 500ms, 2000ms)
|
|
|
+ * - Redis Fallback Queue: 24-hour TTL for stats events during outages
|
|
|
+ * - Dead Letter Queue (DLQ): Manual inspection of permanently failed messages
|
|
|
+ * - Publisher Idempotency: 7-day deduplication window using Redis (stats events only)
|
|
|
+ * - Message TTL: 24-hour expiration for stats events
|
|
|
+ * - Automatic Reconnection: Self-healing when RabbitMQ recovers
|
|
|
+ *
|
|
|
+ * EVENT TIERS:
|
|
|
+ * - Tier 1 (Full Resilience): stats.* events with fallback/DLQ/idempotency
|
|
|
+ * - Tier 2 (Partial Resilience): user.login, ads.click with circuit breaker + retries only
|
|
|
+ *
|
|
|
+ * OBSERVABILITY:
|
|
|
+ * - Structured logs with circuit state, retry counts, and routing context
|
|
|
+ * - Health status endpoint via getCircuitStatus()
|
|
|
+ * - Clear warnings when events are dropped due to circuit breaker
|
|
|
+ *
|
|
|
+ * @see OBSERVABILITY_ENHANCEMENTS.md for monitoring guide
|
|
|
+ * @see RABBITMQ_FALLBACK_REPLAY.md for recovery procedures
|
|
|
+ */
|
|
|
@Injectable()
|
|
|
export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
private readonly logger = new Logger(RabbitmqPublisherService.name);
|
|
|
@@ -585,13 +616,50 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
|
|
|
/**
|
|
|
* Publish a user.login event.
|
|
|
+ *
|
|
|
+ * This is a less-critical event compared to stats events:
|
|
|
+ * - Uses circuit breaker to avoid overwhelming failed RabbitMQ
|
|
|
+ * - Includes retry logic (3 attempts with exponential backoff)
|
|
|
+ * - Does NOT use Redis fallback or idempotency (keep it simple)
|
|
|
+ * - Drops events when circuit is OPEN with clear warning logs
|
|
|
*/
|
|
|
async publishUserLogin(event: UserLoginEventPayload): Promise<void> {
|
|
|
- if (!this.channel) {
|
|
|
+ // Check circuit breaker before attempting publish
|
|
|
+ if (!(await this.canAttempt())) {
|
|
|
this.logger.warn(
|
|
|
- 'RabbitMQ channel not ready. Skipping user.login publish.',
|
|
|
+ `⚠️ Circuit breaker OPEN. Dropping user.login event for uid=${event.uid} (non-critical event, no fallback)`,
|
|
|
);
|
|
|
- return;
|
|
|
+ return; // Drop event silently to maintain fire-and-forget behavior
|
|
|
+ }
|
|
|
+
|
|
|
+ const context = `user.login uid=${event.uid}`;
|
|
|
+
|
|
|
+ try {
|
|
|
+ await this.retryPublish(async () => {
|
|
|
+ await this.publishUserLoginCore(event);
|
|
|
+ }, context);
|
|
|
+
|
|
|
+ // Success!
|
|
|
+ this.recordSuccess();
|
|
|
+ this.logger.debug(`Published user.login event for uid=${event.uid}`);
|
|
|
+ } catch (error) {
|
|
|
+ // All retries failed
|
|
|
+ this.recordFailure();
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to publish user.login after ${this.maxRetries} retries for uid=${event.uid}: ${error}`,
|
|
|
+ );
|
|
|
+ // Don't throw - maintain fire-and-forget behavior
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Core user.login publish logic (used by retry wrapper)
|
|
|
+ */
|
|
|
+ private async publishUserLoginCore(
|
|
|
+ event: UserLoginEventPayload,
|
|
|
+ ): Promise<void> {
|
|
|
+ if (!this.channel) {
|
|
|
+ throw new Error('RabbitMQ channel not ready');
|
|
|
}
|
|
|
|
|
|
const payloadBuffer = Buffer.from(JSON.stringify(event));
|
|
|
@@ -607,13 +675,8 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
},
|
|
|
(err) => {
|
|
|
if (err) {
|
|
|
- this.logger.error(
|
|
|
- `Failed to publish user.login event for uid=${event.uid}: ${err.message}`,
|
|
|
- err.stack,
|
|
|
- );
|
|
|
return reject(err);
|
|
|
}
|
|
|
- this.logger.debug(`Published user.login event for uid=${event.uid}`);
|
|
|
resolve();
|
|
|
},
|
|
|
);
|
|
|
@@ -622,13 +685,50 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
|
|
|
/**
|
|
|
* Publish an ads.click event.
|
|
|
+ *
|
|
|
+ * This is a less-critical event compared to stats events:
|
|
|
+ * - Uses circuit breaker to avoid overwhelming failed RabbitMQ
|
|
|
+ * - Includes retry logic (3 attempts with exponential backoff)
|
|
|
+ * - Does NOT use Redis fallback or idempotency (keep it simple)
|
|
|
+ * - Drops events when circuit is OPEN with clear warning logs
|
|
|
*/
|
|
|
async publishAdsClick(event: AdsClickEventPayload): Promise<void> {
|
|
|
- if (!this.channel) {
|
|
|
+ // Check circuit breaker before attempting publish
|
|
|
+ if (!(await this.canAttempt())) {
|
|
|
this.logger.warn(
|
|
|
- 'RabbitMQ channel not ready. Skipping ads.click publish.',
|
|
|
+ `⚠️ Circuit breaker OPEN. Dropping ads.click event for adsId=${event.adsId} (non-critical event, no fallback)`,
|
|
|
);
|
|
|
- return;
|
|
|
+ return; // Drop event silently to maintain fire-and-forget behavior
|
|
|
+ }
|
|
|
+
|
|
|
+ const context = `ads.click adsId=${event.adsId}`;
|
|
|
+
|
|
|
+ try {
|
|
|
+ await this.retryPublish(async () => {
|
|
|
+ await this.publishAdsClickCore(event);
|
|
|
+ }, context);
|
|
|
+
|
|
|
+ // Success!
|
|
|
+ this.recordSuccess();
|
|
|
+ this.logger.debug(`Published ads.click event for adsId=${event.adsId}`);
|
|
|
+ } catch (error) {
|
|
|
+ // All retries failed
|
|
|
+ this.recordFailure();
|
|
|
+ this.logger.error(
|
|
|
+ `Failed to publish ads.click after ${this.maxRetries} retries for adsId=${event.adsId}: ${error}`,
|
|
|
+ );
|
|
|
+ // Don't throw - maintain fire-and-forget behavior
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Core ads.click publish logic (used by retry wrapper)
|
|
|
+ */
|
|
|
+ private async publishAdsClickCore(
|
|
|
+ event: AdsClickEventPayload,
|
|
|
+ ): Promise<void> {
|
|
|
+ if (!this.channel) {
|
|
|
+ throw new Error('RabbitMQ channel not ready');
|
|
|
}
|
|
|
|
|
|
const payloadBuffer = Buffer.from(JSON.stringify(event));
|
|
|
@@ -644,15 +744,8 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
},
|
|
|
(err) => {
|
|
|
if (err) {
|
|
|
- this.logger.error(
|
|
|
- `Failed to publish ads.click event for adsId=${event.adsId}: ${err.message}`,
|
|
|
- err.stack,
|
|
|
- );
|
|
|
return reject(err);
|
|
|
}
|
|
|
- this.logger.debug(
|
|
|
- `Published ads.click event for adsId=${event.adsId}`,
|
|
|
- );
|
|
|
resolve();
|
|
|
},
|
|
|
);
|
|
|
@@ -744,7 +837,7 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
// 2. Check circuit breaker
|
|
|
if (!(await this.canAttempt())) {
|
|
|
this.logger.warn(
|
|
|
- `Circuit breaker OPEN. Storing ${messageId} in fallback queue (${context})`,
|
|
|
+ `Circuit breaker OPEN. Storing messageId=${messageId} in fallback queue (${context}, routingKey=${routingKey})`,
|
|
|
);
|
|
|
await this.storeInFallbackQueue(routingKey, event, messageId);
|
|
|
return;
|
|
|
@@ -765,7 +858,7 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
this.recordFailure();
|
|
|
|
|
|
this.logger.error(
|
|
|
- `All retry attempts failed for ${messageId} (${context}): ${error}`,
|
|
|
+ `All retry attempts failed for messageId=${messageId} (${context}, routingKey=${routingKey}): ${error}`,
|
|
|
);
|
|
|
|
|
|
// 4. Store in fallback queue
|
|
|
@@ -792,6 +885,13 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
|
|
|
const payloadBuffer = this.toPayloadBuffer(event);
|
|
|
|
|
|
+ // Safely convert BigInt timestamp to Number for RabbitMQ
|
|
|
+ // Note: RabbitMQ expects milliseconds since epoch as Number
|
|
|
+ const timestamp = this.safeNumberFromBigInt(
|
|
|
+ nowEpochMsBigInt(),
|
|
|
+ 'timestamp',
|
|
|
+ );
|
|
|
+
|
|
|
return new Promise((resolve, reject) => {
|
|
|
this.channel!.publish(
|
|
|
this.statsExchange,
|
|
|
@@ -800,13 +900,13 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
{
|
|
|
persistent: true,
|
|
|
contentType: 'application/json',
|
|
|
- timestamp: Number(nowEpochMsBigInt()),
|
|
|
+ timestamp,
|
|
|
expiration: this.messageTTL.toString(), // Message TTL
|
|
|
},
|
|
|
(err) => {
|
|
|
if (err) {
|
|
|
this.logger.error(
|
|
|
- `Failed to publish stats event (${context}): ${err.message}`,
|
|
|
+ `Failed to publish stats event (${context}, routingKey=${routingKey}): ${err.message}`,
|
|
|
err.stack,
|
|
|
);
|
|
|
return reject(err);
|
|
|
@@ -820,14 +920,66 @@ export class RabbitmqPublisherService implements OnModuleInit, OnModuleDestroy {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Convert event payload to Buffer for RabbitMQ publish
|
|
|
+ *
|
|
|
+ * BigInt Handling:
|
|
|
+ * - JSON.stringify does not natively support BigInt (throws TypeError)
|
|
|
+ * - We use a replacer function to convert BigInt → string
|
|
|
+ * - This ensures timestamps like clickedAt, impressionAt serialize correctly
|
|
|
+ * - Consumer side must parse these string timestamps back to appropriate numeric types
|
|
|
+ *
|
|
|
+ * Security Note:
|
|
|
+ * - Payload size is not explicitly limited here
|
|
|
+ * - RabbitMQ has max message size (default 128MB)
|
|
|
+ * - Consider adding size checks if payloads grow unexpectedly large
|
|
|
+ */
|
|
|
private toPayloadBuffer(event: unknown): Buffer {
|
|
|
- const json = JSON.stringify(event, (_, value) =>
|
|
|
- typeof value === 'bigint' ? value.toString() : value,
|
|
|
- );
|
|
|
+ const json = JSON.stringify(event, (key, value) => {
|
|
|
+ // Convert BigInt to string because JSON.stringify doesn't support BigInt
|
|
|
+ if (typeof value === 'bigint') {
|
|
|
+ // Defensive check: warn if BigInt is suspiciously large
|
|
|
+ // JavaScript Number.MAX_SAFE_INTEGER = 2^53 - 1 = 9007199254740991
|
|
|
+ // Timestamps in ms are ~13 digits, so 16+ digits might indicate corruption
|
|
|
+ if (value > BigInt(Number.MAX_SAFE_INTEGER) * BigInt(1000)) {
|
|
|
+ this.logger.warn(
|
|
|
+ `Suspiciously large BigInt in payload: key=${key}, value=${value.toString()}`,
|
|
|
+ );
|
|
|
+ }
|
|
|
+ return value.toString();
|
|
|
+ }
|
|
|
+ return value;
|
|
|
+ });
|
|
|
return Buffer.from(json);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
+ * Safely convert BigInt to Number with overflow detection
|
|
|
+ *
|
|
|
+ * JavaScript Number type uses double-precision (53-bit mantissa):
|
|
|
+ * - Safe integer range: -(2^53 - 1) to (2^53 - 1)
|
|
|
+ * - Timestamps in milliseconds since epoch fit easily (13-14 digits)
|
|
|
+ * - But corruption or bugs could produce unsafe values
|
|
|
+ *
|
|
|
+ * @param value - BigInt value to convert
|
|
|
+ * @param context - Context for error logging (e.g., 'timestamp', 'count')
|
|
|
+ * @returns Number representation, or 0 if unsafe
|
|
|
+ */
|
|
|
+ private safeNumberFromBigInt(value: bigint, context: string): number {
|
|
|
+ const MAX_SAFE = BigInt(Number.MAX_SAFE_INTEGER);
|
|
|
+ const MIN_SAFE = BigInt(Number.MIN_SAFE_INTEGER);
|
|
|
+
|
|
|
+ if (value > MAX_SAFE || value < MIN_SAFE) {
|
|
|
+ this.logger.warn(
|
|
|
+ `BigInt value out of safe range for ${context}: ${value.toString()}. Using 0 as fallback.`,
|
|
|
+ );
|
|
|
+ return 0; // Safe fallback
|
|
|
+ }
|
|
|
+
|
|
|
+ return Number(value);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
* Get circuit breaker status (for monitoring)
|
|
|
*/
|
|
|
getCircuitStatus(): {
|