stats-events.consumer.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411
  1. import {
  2. Injectable,
  3. Logger,
  4. OnModuleDestroy,
  5. OnModuleInit,
  6. } from '@nestjs/common';
  7. import { ConfigService } from '@nestjs/config';
  8. import * as amqp from 'amqplib';
  9. import { Channel, Connection, ConsumeMessage } from 'amqplib';
  10. import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
  11. import { nowEpochMsBigInt } from '@box/common/time/time.util';
  12. interface BaseStatsMessage {
  13. messageId: string;
  14. uid: string;
  15. ip: string;
  16. userAgent: string;
  17. appVersion?: string;
  18. os?: string;
  19. createAt: string | number | bigint;
  20. updateAt: string | number | bigint;
  21. }
  22. interface AdClickMessage extends BaseStatsMessage {
  23. adsId: string; // Ad ID (from publisher)
  24. adId?: string; // Alternative field name (for backward compatibility)
  25. channelId: string;
  26. scene?: string;
  27. slot?: string;
  28. adType: string;
  29. clickedAt?: string | number | bigint;
  30. clickAt?: string | number | bigint; // Publisher sends this
  31. machine?: string;
  32. }
  33. @Injectable()
  34. export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
  35. private readonly logger = new Logger(StatsEventsConsumer.name);
  36. private connection?: Connection;
  37. private channel?: Channel;
  38. private consumerTags: string[] = [];
  39. private counters = {
  40. adClick: 0,
  41. parseError: 0,
  42. malformed: 0,
  43. duplicate: 0,
  44. persistError: 0,
  45. };
  46. private logInterval?: NodeJS.Timeout;
  47. // keep config for log clarity + cleanup
  48. private url?: string;
  49. private exchange = 'stats.user';
  50. private queueAdClick = 'stats.ad.click';
  51. private routingKeyAdClick = 'stats.ad.click';
  52. constructor(
  53. private readonly config: ConfigService,
  54. private readonly prisma: PrismaMongoService,
  55. ) {}
  56. getCounters() {
  57. return { ...this.counters };
  58. }
  59. async onModuleInit(): Promise<void> {
  60. this.url = this.config.get<string>('RABBITMQ_URL')?.trim() || undefined;
  61. this.exchange =
  62. this.config.get<string>('RABBITMQ_STATS_EXCHANGE')?.trim() ||
  63. this.exchange;
  64. this.queueAdClick =
  65. this.config.get<string>('RABBITMQ_STATS_AD_CLICK_QUEUE')?.trim() ||
  66. this.queueAdClick;
  67. // Routing key is fixed to stats.ad.click; overrides removed
  68. if (!this.url) {
  69. // If you want to fail-fast and stop app boot: throw new Error(...)
  70. this.logger.error(
  71. 'StatsEventsConsumer is DISABLED: RABBITMQ_URL is not set',
  72. );
  73. return;
  74. }
  75. this.logger.log(
  76. [
  77. 'StatsEventsConsumer bootstrap:',
  78. `url=${this.maskAmqpUrl(this.url)}`,
  79. `exchange=${this.exchange}`,
  80. `queue=${this.queueAdClick}`,
  81. `routingKey=${this.routingKeyAdClick}`,
  82. ].join(' '),
  83. );
  84. try {
  85. await this.connectAndConsume();
  86. this.logger.log('🚀 StatsEventsConsumer READY');
  87. this.logInterval = setInterval(() => {
  88. this.logger.log(
  89. `📊 Ingestion stats: ` +
  90. `adClick=${this.counters.adClick}, duplicate=${this.counters.duplicate}, ` +
  91. `malformed=${this.counters.malformed}, parseError=${this.counters.parseError}, persistError=${this.counters.persistError}`,
  92. );
  93. }, 60_000);
  94. } catch (err) {
  95. this.logger.error(
  96. 'StatsEventsConsumer FAILED to start (connection/assert/consume error)',
  97. err instanceof Error ? err.stack : String(err),
  98. );
  99. await this.safeClose();
  100. // keep app running, consumer disabled
  101. return;
  102. // OR fail-fast:
  103. // throw err;
  104. }
  105. }
  106. private async connectAndConsume(): Promise<void> {
  107. if (!this.url) throw new Error('RABBITMQ_URL missing at connect time');
  108. this.logger.log(
  109. `Connecting to RabbitMQ at ${this.maskAmqpUrl(this.url)} ...`,
  110. );
  111. const conn = await amqp.connect(this.url);
  112. conn.on('error', (e) => {
  113. this.logger.error(
  114. 'RabbitMQ connection error event',
  115. e instanceof Error ? e.stack : String(e),
  116. );
  117. });
  118. conn.on('close', () => {
  119. this.logger.warn('RabbitMQ connection closed');
  120. });
  121. this.connection = conn;
  122. const ch = await conn.createChannel();
  123. ch.on('error', (e) => {
  124. this.logger.error(
  125. 'RabbitMQ channel error event',
  126. e instanceof Error ? e.stack : String(e),
  127. );
  128. });
  129. ch.on('close', () => {
  130. this.logger.warn('RabbitMQ channel closed');
  131. });
  132. this.channel = ch;
  133. // QoS to avoid handler overload
  134. await ch.prefetch(200);
  135. this.logger.log(`Asserting exchange="${this.exchange}" type=topic durable`);
  136. await ch.assertExchange(this.exchange, 'topic', { durable: true });
  137. this.logger.log(
  138. `Asserting queue="${this.queueAdClick}" routingKey="${this.routingKeyAdClick}"`,
  139. );
  140. await ch.assertQueue(this.queueAdClick, { durable: true });
  141. await ch.bindQueue(
  142. this.queueAdClick,
  143. this.exchange,
  144. this.routingKeyAdClick,
  145. );
  146. this.logger.log(`Consuming queue="${this.queueAdClick}" (noAck=false)`);
  147. const consumer = await ch.consume(
  148. this.queueAdClick,
  149. (msg) => void this.handleAdClick(msg),
  150. { noAck: false },
  151. );
  152. this.consumerTags = [consumer.consumerTag];
  153. this.logger.log(`Consumer started (tag=${consumer.consumerTag})`);
  154. }
  155. private parseJson<T>(msg: ConsumeMessage): T | null {
  156. try {
  157. return JSON.parse(msg.content.toString('utf8')) as T;
  158. } catch (error) {
  159. this.counters.parseError++;
  160. this.logger.error(
  161. `Failed to parse message (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
  162. 'utf8',
  163. )}`,
  164. error instanceof Error ? error.stack : String(error),
  165. );
  166. return null;
  167. }
  168. }
  169. private toBigInt(value: string | number | bigint | undefined): bigint {
  170. if (value === undefined || value === null) return BigInt(0);
  171. if (typeof value === 'bigint') return value;
  172. if (typeof value === 'number') return BigInt(Math.trunc(value));
  173. const s = String(value).trim();
  174. if (!s) return BigInt(0);
  175. try {
  176. return BigInt(s);
  177. } catch {
  178. return BigInt(0);
  179. }
  180. }
  181. private async markProcessed(
  182. messageId: string,
  183. eventType: string,
  184. ): Promise<'new' | 'duplicate' | 'error'> {
  185. const now = nowEpochMsBigInt();
  186. const client = this.prisma as any;
  187. try {
  188. await client.processedMessage.create({
  189. data: {
  190. messageId,
  191. eventType,
  192. processedAt: now,
  193. createdAt: now,
  194. },
  195. });
  196. return 'new';
  197. } catch (error: any) {
  198. if (error?.code === 'P2002') {
  199. this.counters.duplicate++;
  200. this.logger.debug(
  201. `Duplicate message ignored: messageId=${messageId}, eventType=${eventType}`,
  202. );
  203. return 'duplicate';
  204. }
  205. this.logger.error(
  206. `Failed to mark processed messageId=${messageId}, eventType=${eventType}`,
  207. error instanceof Error ? error.stack : String(error),
  208. );
  209. return 'error';
  210. }
  211. }
  212. private async cleanupProcessed(messageId: string): Promise<void> {
  213. const client = this.prisma as any;
  214. try {
  215. await client.processedMessage.delete({ where: { messageId } });
  216. } catch (error: any) {
  217. // ignore "not found"
  218. if (error?.code !== 'P2025') {
  219. this.logger.warn(
  220. `Cleanup processed message failed for messageId=${messageId}: ${
  221. error?.message ?? error
  222. }`,
  223. );
  224. }
  225. }
  226. }
  227. private ack(msg: ConsumeMessage): void {
  228. this.channel?.ack(msg);
  229. }
  230. private nackDrop(msg: ConsumeMessage): void {
  231. this.channel?.nack(msg, false, false);
  232. }
  233. private async handleAdClick(msg: ConsumeMessage | null): Promise<void> {
  234. if (!msg) return;
  235. const payload = this.parseJson<AdClickMessage>(msg);
  236. const adId = payload?.adId || payload?.adsId;
  237. if (
  238. !payload ||
  239. !payload.uid ||
  240. !adId ||
  241. !payload.channelId ||
  242. !payload.machine
  243. ) {
  244. this.counters.malformed++;
  245. this.logger.warn(
  246. `Malformed ad.click message, dropping (deliveryTag=${msg.fields.deliveryTag}): ${msg.content.toString(
  247. 'utf8',
  248. )}`,
  249. );
  250. this.nackDrop(msg);
  251. return;
  252. }
  253. // If publisher doesn't provide messageId, generate stable-ish one
  254. // NOTE: This is still best-effort; ideally publisher always sends messageId.
  255. const messageId =
  256. payload.messageId ||
  257. `${this.toBigInt(payload.clickAt ?? payload.clickedAt) || nowEpochMsBigInt()}-${adId}-${payload.uid}`;
  258. const status = await this.markProcessed(messageId, 'stats.ad.click');
  259. if (status === 'duplicate') {
  260. this.ack(msg);
  261. return;
  262. }
  263. if (status === 'error') {
  264. this.nackDrop(msg);
  265. return;
  266. }
  267. try {
  268. const client = this.prisma as any;
  269. const now = nowEpochMsBigInt();
  270. const clickTime = payload.clickAt || payload.clickedAt || now;
  271. await client.adClickEvents.create({
  272. data: {
  273. uid: payload.uid,
  274. adId,
  275. adType: payload.adType,
  276. clickedAt: this.toBigInt(clickTime),
  277. ip: payload.ip,
  278. channelId: payload.channelId,
  279. machine: payload.machine,
  280. createAt: this.toBigInt(payload.createAt || now),
  281. updateAt: this.toBigInt(payload.updateAt || now),
  282. },
  283. });
  284. this.counters.adClick++;
  285. this.ack(msg);
  286. } catch (error) {
  287. this.counters.persistError++;
  288. this.logger.error(
  289. `Failed to persist ad.click messageId=${messageId} (deliveryTag=${msg.fields.deliveryTag})`,
  290. error instanceof Error ? error.stack : String(error),
  291. );
  292. await this.cleanupProcessed(messageId);
  293. this.nackDrop(msg);
  294. }
  295. }
  296. async onModuleDestroy(): Promise<void> {
  297. if (this.logInterval) clearInterval(this.logInterval);
  298. this.logger.log('StatsEventsConsumer shutting down...');
  299. await this.safeClose();
  300. this.logger.log('StatsEventsConsumer shutdown complete');
  301. }
  302. private async safeClose(): Promise<void> {
  303. // cancel consumers first (best-effort)
  304. if (this.channel && this.consumerTags.length > 0) {
  305. for (const tag of this.consumerTags) {
  306. try {
  307. await this.channel.cancel(tag);
  308. } catch (err) {
  309. this.logger.warn(
  310. `Failed to cancel consumer tag="${tag}"`,
  311. err instanceof Error ? err.stack : String(err),
  312. );
  313. }
  314. }
  315. this.consumerTags = [];
  316. }
  317. if (this.channel) {
  318. try {
  319. await this.channel.close();
  320. } catch (err) {
  321. this.logger.warn(
  322. 'Error while closing RabbitMQ channel',
  323. err instanceof Error ? err.stack : String(err),
  324. );
  325. } finally {
  326. this.channel = undefined;
  327. }
  328. }
  329. if (this.connection) {
  330. try {
  331. await this.connection.close();
  332. } catch (err) {
  333. this.logger.warn(
  334. 'Error while closing RabbitMQ connection',
  335. err instanceof Error ? err.stack : String(err),
  336. );
  337. } finally {
  338. this.connection = undefined;
  339. }
  340. }
  341. }
  342. private maskAmqpUrl(url: string): string {
  343. try {
  344. const u = new URL(url);
  345. if (u.username || u.password) {
  346. const masked = new URL(url);
  347. masked.username = u.username ? '***' : '';
  348. masked.password = u.password ? '***' : '';
  349. return masked.toString();
  350. }
  351. return url;
  352. } catch {
  353. return url.replace(/\/\/([^:/@]+):([^@]+)@/g, '//***:***@');
  354. }
  355. }
  356. }