| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411 |
- import {
- Injectable,
- Logger,
- OnModuleDestroy,
- OnModuleInit,
- } from '@nestjs/common';
- import { ConfigService } from '@nestjs/config';
- import * as amqp from 'amqplib';
- import { Channel, Connection, ConsumeMessage } from 'amqplib';
- import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
- import { nowEpochMsBigInt } from '@box/common/time/time.util';
- interface BaseStatsMessage {
- messageId: string;
- uid: string;
- ip: string;
- userAgent: string;
- appVersion?: string;
- os?: string;
- createAt: string | number | bigint;
- updateAt: string | number | bigint;
- }
- interface AdClickMessage extends BaseStatsMessage {
- adsId: string; // Ad ID (from publisher)
- adId?: string; // Alternative field name (for backward compatibility)
- channelId: string;
- scene?: string;
- slot?: string;
- adType: string;
- clickedAt?: string | number | bigint;
- clickAt?: string | number | bigint; // Publisher sends this
- machine?: string;
- }
- @Injectable()
- export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
- private readonly logger = new Logger(StatsEventsConsumer.name);
- private connection?: Connection;
- private channel?: Channel;
- private consumerTags: string[] = [];
- private counters = {
- adClick: 0,
- parseError: 0,
- malformed: 0,
- duplicate: 0,
- persistError: 0,
- };
- private logInterval?: NodeJS.Timeout;
- // keep config for log clarity + cleanup
- private url?: string;
- private exchange = 'stats.user';
- private queueAdClick = 'stats.ad.click';
- private routingKeyAdClick = 'stats.ad.click';
- constructor(
- private readonly config: ConfigService,
- private readonly prisma: PrismaMongoService,
- ) {}
- getCounters() {
- return { ...this.counters };
- }
- async onModuleInit(): Promise<void> {
- this.url = this.config.get<string>('RABBITMQ_URL')?.trim() || undefined;
- this.exchange =
- this.config.get<string>('RABBITMQ_STATS_EXCHANGE')?.trim() ||
- this.exchange;
- this.queueAdClick =
- this.config.get<string>('RABBITMQ_STATS_AD_CLICK_QUEUE')?.trim() ||
- this.queueAdClick;
- // Routing key is fixed to stats.ad.click; overrides removed
- if (!this.url) {
- // If you want to fail-fast and stop app boot: throw new Error(...)
- this.logger.error(
- 'StatsEventsConsumer is DISABLED: RABBITMQ_URL is not set',
- );
- return;
- }
- this.logger.log(
- [
- 'StatsEventsConsumer bootstrap:',
- `url=${this.maskAmqpUrl(this.url)}`,
- `exchange=${this.exchange}`,
- `queue=${this.queueAdClick}`,
- `routingKey=${this.routingKeyAdClick}`,
- ].join(' '),
- );
- try {
- await this.connectAndConsume();
- this.logger.log('🚀 StatsEventsConsumer READY');
- this.logInterval = setInterval(() => {
- this.logger.log(
- `📊 Ingestion stats: ` +
- `adClick=${this.counters.adClick}, duplicate=${this.counters.duplicate}, ` +
- `malformed=${this.counters.malformed}, parseError=${this.counters.parseError}, persistError=${this.counters.persistError}`,
- );
- }, 60_000);
- } catch (err) {
- this.logger.error(
- 'StatsEventsConsumer FAILED to start (connection/assert/consume error)',
- err instanceof Error ? err.stack : String(err),
- );
- await this.safeClose();
- // keep app running, consumer disabled
- return;
- // OR fail-fast:
- // throw err;
- }
- }
- private async connectAndConsume(): Promise<void> {
- if (!this.url) throw new Error('RABBITMQ_URL missing at connect time');
- this.logger.log(
- `Connecting to RabbitMQ at ${this.maskAmqpUrl(this.url)} ...`,
- );
- const conn = await amqp.connect(this.url);
- conn.on('error', (e) => {
- this.logger.error(
- 'RabbitMQ connection error event',
- e instanceof Error ? e.stack : String(e),
- );
- });
- conn.on('close', () => {
- this.logger.warn('RabbitMQ connection closed');
- });
- this.connection = conn;
- const ch = await conn.createChannel();
- ch.on('error', (e) => {
- this.logger.error(
- 'RabbitMQ channel error event',
- e instanceof Error ? e.stack : String(e),
- );
- });
- ch.on('close', () => {
- this.logger.warn('RabbitMQ channel closed');
- });
- this.channel = ch;
- // QoS to avoid handler overload
- await ch.prefetch(200);
- this.logger.log(`Asserting exchange="${this.exchange}" type=topic durable`);
- await ch.assertExchange(this.exchange, 'topic', { durable: true });
- this.logger.log(
- `Asserting queue="${this.queueAdClick}" routingKey="${this.routingKeyAdClick}"`,
- );
- await ch.assertQueue(this.queueAdClick, { durable: true });
- await ch.bindQueue(
- this.queueAdClick,
- this.exchange,
- this.routingKeyAdClick,
- );
- this.logger.log(`Consuming queue="${this.queueAdClick}" (noAck=false)`);
- const consumer = await ch.consume(
- this.queueAdClick,
- (msg) => void this.handleAdClick(msg),
- { noAck: false },
- );
- this.consumerTags = [consumer.consumerTag];
- this.logger.log(`Consumer started (tag=${consumer.consumerTag})`);
- }
- private parseJson<T>(msg: ConsumeMessage): T | null {
- try {
- return JSON.parse(msg.content.toString('utf8')) as T;
- } catch (error) {
- this.counters.parseError++;
- this.logger.error(
- `Failed to parse message (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
- 'utf8',
- )}`,
- error instanceof Error ? error.stack : String(error),
- );
- return null;
- }
- }
- private toBigInt(value: string | number | bigint | undefined): bigint {
- if (value === undefined || value === null) return BigInt(0);
- if (typeof value === 'bigint') return value;
- if (typeof value === 'number') return BigInt(Math.trunc(value));
- const s = String(value).trim();
- if (!s) return BigInt(0);
- try {
- return BigInt(s);
- } catch {
- return BigInt(0);
- }
- }
- private async markProcessed(
- messageId: string,
- eventType: string,
- ): Promise<'new' | 'duplicate' | 'error'> {
- const now = nowEpochMsBigInt();
- const client = this.prisma as any;
- try {
- await client.processedMessage.create({
- data: {
- messageId,
- eventType,
- processedAt: now,
- createdAt: now,
- },
- });
- return 'new';
- } catch (error: any) {
- if (error?.code === 'P2002') {
- this.counters.duplicate++;
- this.logger.debug(
- `Duplicate message ignored: messageId=${messageId}, eventType=${eventType}`,
- );
- return 'duplicate';
- }
- this.logger.error(
- `Failed to mark processed messageId=${messageId}, eventType=${eventType}`,
- error instanceof Error ? error.stack : String(error),
- );
- return 'error';
- }
- }
- private async cleanupProcessed(messageId: string): Promise<void> {
- const client = this.prisma as any;
- try {
- await client.processedMessage.delete({ where: { messageId } });
- } catch (error: any) {
- // ignore "not found"
- if (error?.code !== 'P2025') {
- this.logger.warn(
- `Cleanup processed message failed for messageId=${messageId}: ${
- error?.message ?? error
- }`,
- );
- }
- }
- }
- private ack(msg: ConsumeMessage): void {
- this.channel?.ack(msg);
- }
- private nackDrop(msg: ConsumeMessage): void {
- this.channel?.nack(msg, false, false);
- }
- private async handleAdClick(msg: ConsumeMessage | null): Promise<void> {
- if (!msg) return;
- const payload = this.parseJson<AdClickMessage>(msg);
- const adId = payload?.adId || payload?.adsId;
- if (
- !payload ||
- !payload.uid ||
- !adId ||
- !payload.channelId ||
- !payload.machine
- ) {
- this.counters.malformed++;
- this.logger.warn(
- `Malformed ad.click message, dropping (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
- 'utf8',
- )}`,
- );
- this.nackDrop(msg);
- return;
- }
- // If publisher doesn't provide messageId, generate stable-ish one
- // NOTE: This is still best-effort; ideally publisher always sends messageId.
- const messageId =
- payload.messageId ||
- `${this.toBigInt(payload.clickAt ?? payload.clickedAt) || nowEpochMsBigInt()}-${adId}-${payload.uid}`;
- const status = await this.markProcessed(messageId, 'stats.ad.click');
- if (status === 'duplicate') {
- this.ack(msg);
- return;
- }
- if (status === 'error') {
- this.nackDrop(msg);
- return;
- }
- try {
- const client = this.prisma as any;
- const now = nowEpochMsBigInt();
- const clickTime = payload.clickAt || payload.clickedAt || now;
- await client.adClickEvents.create({
- data: {
- uid: payload.uid,
- adId,
- adType: payload.adType,
- clickedAt: this.toBigInt(clickTime),
- ip: payload.ip,
- channelId: payload.channelId,
- machine: payload.machine,
- createAt: this.toBigInt(payload.createAt || now),
- updateAt: this.toBigInt(payload.updateAt || now),
- },
- });
- this.counters.adClick++;
- this.ack(msg);
- } catch (error) {
- this.counters.persistError++;
- this.logger.error(
- `Failed to persist ad.click messageId=${messageId} (deliveryTag=${msg.fields.deliveryTag})`,
- error instanceof Error ? error.stack : String(error),
- );
- await this.cleanupProcessed(messageId);
- this.nackDrop(msg);
- }
- }
- async onModuleDestroy(): Promise<void> {
- if (this.logInterval) clearInterval(this.logInterval);
- this.logger.log('StatsEventsConsumer shutting down...');
- await this.safeClose();
- this.logger.log('StatsEventsConsumer shutdown complete');
- }
- private async safeClose(): Promise<void> {
- // cancel consumers first (best-effort)
- if (this.channel && this.consumerTags.length > 0) {
- for (const tag of this.consumerTags) {
- try {
- await this.channel.cancel(tag);
- } catch (err) {
- this.logger.warn(
- `Failed to cancel consumer tag="${tag}"`,
- err instanceof Error ? err.stack : String(err),
- );
- }
- }
- this.consumerTags = [];
- }
- if (this.channel) {
- try {
- await this.channel.close();
- } catch (err) {
- this.logger.warn(
- 'Error while closing RabbitMQ channel',
- err instanceof Error ? err.stack : String(err),
- );
- } finally {
- this.channel = undefined;
- }
- }
- if (this.connection) {
- try {
- await this.connection.close();
- } catch (err) {
- this.logger.warn(
- 'Error while closing RabbitMQ connection',
- err instanceof Error ? err.stack : String(err),
- );
- } finally {
- this.connection = undefined;
- }
- }
- }
- private maskAmqpUrl(url: string): string {
- try {
- const u = new URL(url);
- if (u.username || u.password) {
- const masked = new URL(url);
- masked.username = u.username ? '***' : '';
- masked.password = u.password ? '***' : '';
- return masked.toString();
- }
- return url;
- } catch {
- return url.replace(/\/\/([^:/@]+):([^@]+)@/g, '//***:***@');
- }
- }
- }
|