import {
  Injectable,
  Logger,
  OnModuleDestroy,
  OnModuleInit,
} from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import * as amqp from 'amqplib';
import { Channel, Connection, ConsumeMessage } from 'amqplib';
import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
import { nowEpochMsBigInt } from '@box/common/time/time.util';

/** Fields shared by every stats message consumed from the broker. */
interface BaseStatsMessage {
  messageId: string;
  uid: string;
  ip: string;
  userAgent: string;
  appVersion?: string;
  os?: string;
  createAt: string | number | bigint;
  updateAt: string | number | bigint;
}

/** Payload of an ad-click event (routing key `stats.ad.click`). */
interface AdClickMessage extends BaseStatsMessage {
  adsId: string; // Ad ID (from publisher)
  adId?: string; // Alternative field name (for backward compatibility)
  channelId: string;
  scene?: string;
  slot?: string;
  adType: string;
  clickedAt?: string | number | bigint;
  clickAt?: string | number | bigint; // Publisher sends this
  machine?: string;
}

/**
 * Turns a caught value into the trace string handed to the Nest logger:
 * the stack for `Error` instances, the string form for anything else.
 */
function errorTrace(err: unknown): string | undefined {
  return err instanceof Error ? err.stack : String(err);
}

/** Reads the `code` property of a caught value, if it has one. */
function errorCode(err: unknown): unknown {
  return (err as { code?: unknown } | null | undefined)?.code;
}

/**
 * Consumes ad-click events from RabbitMQ and persists them through Prisma.
 *
 * Each message is first recorded in `processedMessage` (used for
 * de-duplication), then written to `adClickEvents`. Messages are acknowledged
 * manually; anything that cannot be processed is nack'ed without requeue.
 */
@Injectable()
export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
  private readonly logger = new Logger(StatsEventsConsumer.name);

  private connection?: Connection;
  private channel?: Channel;
  private consumerTags: string[] = [];

  private counters = {
    adClick: 0,
    parseError: 0,
    malformed: 0,
    duplicate: 0,
    persistError: 0,
  };

  private logInterval?: NodeJS.Timeout;

  // keep config for log clarity + cleanup
  private url?: string;
  private exchange = 'stats.user';
  private queueAdClick = 'stats.ad.click';
  private routingKeyAdClick = 'stats.ad.click';

  constructor(
    private readonly config: ConfigService,
    private readonly prisma: PrismaMongoService,
  ) {}

  /** Returns a snapshot (shallow copy) of the ingestion counters. */
  getCounters() {
    return { ...this.counters };
  }

  /**
   * Reads configuration, connects to RabbitMQ and starts consuming.
   *
   * A missing `RABBITMQ_URL` or a startup failure disables the consumer but
   * does not throw, so the application keeps booting.
   */
  async onModuleInit(): Promise<void> {
    this.url = this.config.get<string>('RABBITMQ_URL')?.trim() || undefined;
    this.exchange =
      this.config.get<string>('RABBITMQ_STATS_EXCHANGE')?.trim() ||
      this.exchange;
    this.queueAdClick =
      this.config.get<string>('RABBITMQ_STATS_AD_CLICK_QUEUE')?.trim() ||
      this.queueAdClick;
    // Routing key is fixed to stats.ad.click; overrides removed

    if (!this.url) {
      // If you want to fail-fast and stop app boot: throw new Error(...)
      this.logger.error(
        'StatsEventsConsumer is DISABLED: RABBITMQ_URL is not set',
      );
      return;
    }

    this.logger.log(
      [
        'StatsEventsConsumer bootstrap:',
        `url=${this.maskAmqpUrl(this.url)}`,
        `exchange=${this.exchange}`,
        `queue=${this.queueAdClick}`,
        `routingKey=${this.routingKeyAdClick}`,
      ].join(' '),
    );

    try {
      await this.connectAndConsume();
      this.logger.log('🚀 StatsEventsConsumer READY');

      // Periodic counter dump (every 60 s) while the consumer is running.
      this.logInterval = setInterval(() => {
        this.logger.log(
          `📊 Ingestion stats: ` +
            `adClick=${this.counters.adClick}, duplicate=${this.counters.duplicate}, ` +
            `malformed=${this.counters.malformed}, parseError=${this.counters.parseError}, persistError=${this.counters.persistError}`,
        );
      }, 60_000);
    } catch (err) {
      this.logger.error(
        'StatsEventsConsumer FAILED to start (connection/assert/consume error)',
        errorTrace(err),
      );
      await this.safeClose();
      // keep app running, consumer disabled
      return;
      // OR fail-fast:
      // throw err;
    }
  }

  /**
   * Opens the connection and channel, declares the exchange/queue/binding
   * and registers the ad-click consumer.
   *
   * @throws Error when the URL is missing, or whatever amqplib rejects with
   *   while connecting, asserting or consuming.
   */
  private async connectAndConsume(): Promise<void> {
    if (!this.url) throw new Error('RABBITMQ_URL missing at connect time');

    this.logger.log(
      `Connecting to RabbitMQ at ${this.maskAmqpUrl(this.url)} ...`,
    );
    const conn = await amqp.connect(this.url);
    conn.on('error', (e) => {
      this.logger.error('RabbitMQ connection error event', errorTrace(e));
    });
    conn.on('close', () => {
      this.logger.warn('RabbitMQ connection closed');
    });
    this.connection = conn;

    const ch = await conn.createChannel();
    ch.on('error', (e) => {
      this.logger.error('RabbitMQ channel error event', errorTrace(e));
    });
    ch.on('close', () => {
      this.logger.warn('RabbitMQ channel closed');
    });
    this.channel = ch;

    // QoS to avoid handler overload
    await ch.prefetch(200);

    this.logger.log(`Asserting exchange="${this.exchange}" type=topic durable`);
    await ch.assertExchange(this.exchange, 'topic', { durable: true });

    this.logger.log(
      `Asserting queue="${this.queueAdClick}" routingKey="${this.routingKeyAdClick}"`,
    );
    await ch.assertQueue(this.queueAdClick, { durable: true });
    await ch.bindQueue(
      this.queueAdClick,
      this.exchange,
      this.routingKeyAdClick,
    );

    this.logger.log(`Consuming queue="${this.queueAdClick}" (noAck=false)`);
    const consumer = await ch.consume(
      this.queueAdClick,
      // handleAdClick never rejects, so discarding the promise is safe.
      (msg) => void this.handleAdClick(msg),
      { noAck: false },
    );
    this.consumerTags = [consumer.consumerTag];
    this.logger.log(`Consumer started (tag=${consumer.consumerTag})`);
  }

  /**
   * Decodes the message body as UTF-8 JSON.
   *
   * The result is only cast to `T`, not validated; callers must check the
   * fields they rely on.
   *
   * @returns the parsed value, or `null` when the body is not valid JSON
   *   (in which case `parseError` is incremented and the body is logged).
   */
  private parseJson<T>(msg: ConsumeMessage): T | null {
    try {
      return JSON.parse(msg.content.toString('utf8')) as T;
    } catch (error) {
      this.counters.parseError++;
      this.logger.error(
        `Failed to parse message (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
          'utf8',
        )}`,
        errorTrace(error),
      );
      return null;
    }
  }

  /**
   * Best-effort conversion to `bigint` that never throws.
   *
   * Nullish values, blank strings, strings `BigInt()` rejects and non-finite
   * numbers all yield 0; finite numbers are truncated towards zero.
   */
  private toBigInt(
    value: string | number | bigint | null | undefined,
  ): bigint {
    if (value == null) return BigInt(0);
    if (typeof value === 'bigint') return value;
    if (typeof value === 'number') {
      // BigInt() throws RangeError for NaN / ±Infinity (JSON such as 1e400
      // parses to Infinity), so map those to the same fallback as bad strings.
      return Number.isFinite(value) ? BigInt(Math.trunc(value)) : BigInt(0);
    }

    const text = String(value).trim();
    if (!text) return BigInt(0);
    try {
      return BigInt(text);
    } catch {
      return BigInt(0);
    }
  }

  /**
   * Records `messageId` in `processedMessage` so redeliveries can be detected.
   *
   * @returns `'new'` when the row was created, `'duplicate'` when Prisma
   *   reports error code P2002 (counted in `duplicate`), `'error'` for any
   *   other failure.
   */
  private async markProcessed(
    messageId: string,
    eventType: string,
  ): Promise<'new' | 'duplicate' | 'error'> {
    const now = nowEpochMsBigInt();
    // Model accessors are reached through `any`, as in the rest of this class.
    const client = this.prisma as any;
    try {
      await client.processedMessage.create({
        data: {
          messageId,
          eventType,
          processedAt: now,
          createdAt: now,
        },
      });
      return 'new';
    } catch (error: unknown) {
      // NOTE(review): relies on P2002 meaning "messageId already exists" —
      // confirm processedMessage.messageId is declared unique in the schema.
      if (errorCode(error) === 'P2002') {
        this.counters.duplicate++;
        this.logger.debug(
          `Duplicate message ignored: messageId=${messageId}, eventType=${eventType}`,
        );
        return 'duplicate';
      }
      this.logger.error(
        `Failed to mark processed messageId=${messageId}, eventType=${eventType}`,
        errorTrace(error),
      );
      return 'error';
    }
  }

  /**
   * Removes the de-duplication marker for `messageId` (used after a failed
   * persist). Never throws: error code P2025 is ignored, anything else is
   * logged as a warning.
   */
  private async cleanupProcessed(messageId: string): Promise<void> {
    const client = this.prisma as any;
    try {
      await client.processedMessage.delete({ where: { messageId } });
    } catch (error: unknown) {
      // ignore "not found"
      if (errorCode(error) !== 'P2025') {
        const detail =
          (error as { message?: unknown } | null | undefined)?.message ??
          error;
        this.logger.warn(
          `Cleanup processed message failed for messageId=${messageId}: ${detail}`,
        );
      }
    }
  }

  /**
   * Acknowledges `msg`. Failures are logged instead of thrown: the channel
   * call can throw (for example once the channel has closed) and callers
   * must not mistake that for a processing error.
   */
  private ack(msg: ConsumeMessage): void {
    try {
      this.channel?.ack(msg);
    } catch (err) {
      this.logger.warn(
        `Failed to ack message (deliveryTag=${msg.fields.deliveryTag})`,
        errorTrace(err),
      );
    }
  }

  /**
   * Rejects `msg` without requeue (`nack(msg, false, false)`). Failures are
   * logged instead of thrown, for the same reason as {@link ack}.
   */
  private nackDrop(msg: ConsumeMessage): void {
    try {
      this.channel?.nack(msg, false, false);
    } catch (err) {
      this.logger.warn(
        `Failed to nack message (deliveryTag=${msg.fields.deliveryTag})`,
        errorTrace(err),
      );
    }
  }

  /**
   * Consumer callback for the ad-click queue.
   *
   * It is invoked fire-and-forget, so it must never reject: any unexpected
   * error is logged and the message is dropped rather than left unsettled.
   */
  private async handleAdClick(msg: ConsumeMessage | null): Promise<void> {
    if (!msg) return;

    try {
      await this.processAdClick(msg);
    } catch (err) {
      // processAdClick settles the message on every path it handles itself
      // and ack/nackDrop do not throw, so reaching here means the message
      // has not been acked or nacked yet.
      this.logger.error(
        `Unexpected error while handling ad.click (deliveryTag=${msg.fields.deliveryTag})`,
        errorTrace(err),
      );
      this.nackDrop(msg);
    }
  }

  /** Validates, de-duplicates and persists one ad-click message. */
  private async processAdClick(msg: ConsumeMessage): Promise<void> {
    const payload = this.parseJson<AdClickMessage>(msg);
    const adId = payload?.adId || payload?.adsId;

    // A body that failed to parse arrives here as null and is therefore
    // counted under both `parseError` and `malformed`.
    if (
      !payload ||
      !payload.uid ||
      !adId ||
      !payload.channelId ||
      !payload.machine
    ) {
      this.counters.malformed++;
      this.logger.warn(
        `Malformed ad.click message, dropping (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
          'utf8',
        )}`,
      );
      this.nackDrop(msg);
      return;
    }

    // If publisher doesn't provide messageId, generate stable-ish one
    // NOTE: This is still best-effort; ideally publisher always sends messageId.
    const messageId =
      payload.messageId ||
      `${this.toBigInt(payload.clickAt ?? payload.clickedAt) || nowEpochMsBigInt()}-${adId}-${payload.uid}`;

    const status = await this.markProcessed(messageId, 'stats.ad.click');
    if (status === 'duplicate') {
      this.ack(msg);
      return;
    }
    if (status === 'error') {
      this.nackDrop(msg);
      return;
    }

    try {
      const client = this.prisma as any;
      const now = nowEpochMsBigInt();
      const clickTime = payload.clickAt || payload.clickedAt || now;

      await client.adClickEvents.create({
        data: {
          uid: payload.uid,
          adId,
          adType: payload.adType,
          clickedAt: this.toBigInt(clickTime),
          ip: payload.ip,
          channelId: payload.channelId,
          machine: payload.machine,
          createAt: this.toBigInt(payload.createAt || now),
          updateAt: this.toBigInt(payload.updateAt || now),
        },
      });
    } catch (error) {
      this.counters.persistError++;
      this.logger.error(
        `Failed to persist ad.click messageId=${messageId} (deliveryTag=${msg.fields.deliveryTag})`,
        errorTrace(error),
      );
      await this.cleanupProcessed(messageId);
      this.nackDrop(msg);
      return;
    }

    // Ack outside the persistence try-block: an ack failure must not be
    // treated as a persist error, which would delete the dedup marker of an
    // event that has already been stored.
    this.counters.adClick++;
    this.ack(msg);
  }

  /** Stops the periodic stats log and closes the broker resources. */
  async onModuleDestroy(): Promise<void> {
    if (this.logInterval) {
      clearInterval(this.logInterval);
      this.logInterval = undefined;
    }
    this.logger.log('StatsEventsConsumer shutting down...');
    await this.safeClose();
    this.logger.log('StatsEventsConsumer shutdown complete');
  }

  /**
   * Cancels consumers, then closes the channel and the connection.
   * Every step is best-effort: failures are logged as warnings and the
   * corresponding field is cleared regardless.
   */
  private async safeClose(): Promise<void> {
    // cancel consumers first (best-effort)
    if (this.channel) {
      for (const tag of this.consumerTags) {
        try {
          await this.channel.cancel(tag);
        } catch (err) {
          this.logger.warn(
            `Failed to cancel consumer tag="${tag}"`,
            errorTrace(err),
          );
        }
      }
    }
    // Tags are meaningless once we are closing, with or without a channel.
    this.consumerTags = [];

    if (this.channel) {
      try {
        await this.channel.close();
      } catch (err) {
        this.logger.warn(
          'Error while closing RabbitMQ channel',
          errorTrace(err),
        );
      } finally {
        this.channel = undefined;
      }
    }

    if (this.connection) {
      try {
        await this.connection.close();
      } catch (err) {
        this.logger.warn(
          'Error while closing RabbitMQ connection',
          errorTrace(err),
        );
      } finally {
        this.connection = undefined;
      }
    }
  }

  /**
   * Returns `url` with any username/password replaced by `***` so it can be
   * logged. Falls back to a regex substitution when `url` cannot be parsed
   * by `URL`.
   */
  private maskAmqpUrl(url: string): string {
    try {
      const u = new URL(url);
      if (u.username || u.password) {
        const masked = new URL(url);
        masked.username = u.username ? '***' : '';
        masked.password = u.password ? '***' : '';
        return masked.toString();
      }
      return url;
    } catch {
      return url.replace(/\/\/([^:/@]+):([^@]+)@/g, '//***:***@');
    }
  }
}