|
|
@@ -9,6 +9,7 @@ import * as amqp from 'amqplib';
|
|
|
import { Channel, Connection, ConsumeMessage } from 'amqplib';
|
|
|
import { PrismaMongoService } from '../../prisma/prisma-mongo.service';
|
|
|
import { nowSecBigInt } from '@box/common/time/time.util';
|
|
|
+import { RedisService } from '@box/db/redis/redis.service';
|
|
|
|
|
|
interface BaseStatsMessage {
|
|
|
messageId: string;
|
|
|
@@ -33,6 +34,8 @@ interface AdClickMessage extends BaseStatsMessage {
|
|
|
machine?: string;
|
|
|
}
|
|
|
|
|
|
+const AD_META_KEY_PREFIX = 'box:app:ad:meta:';
|
|
|
+
|
|
|
@Injectable()
|
|
|
export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
|
|
|
private readonly logger = new Logger(StatsEventsConsumer.name);
|
|
|
@@ -62,6 +65,7 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
|
|
|
constructor(
|
|
|
private readonly config: ConfigService,
|
|
|
private readonly prisma: PrismaMongoService,
|
|
|
+ private readonly redis: RedisService,
|
|
|
) {}
|
|
|
|
|
|
getCounters() {
|
|
|
@@ -328,6 +332,17 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
+ const resolvedAdType = await this.resolveAdTypeForAdsId(adsId);
|
|
|
+
|
|
|
+ if (!resolvedAdType) {
|
|
|
+ this.logger.error(
|
|
|
+ `Invalid ad metadata for adsId=${adsId}, uid=${payload.uid}, channelId=${payload.channelId}, messageId=${messageId}; dropping event per Task 2 – stats-consumer-enrichment-flow.md`,
|
|
|
+ );
|
|
|
+ this.counters.malformed++;
|
|
|
+ this.ack(msg);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
const client = this.prisma as any;
|
|
|
|
|
|
this.logger.log(
|
|
|
@@ -343,7 +358,7 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
|
|
|
uid: payload.uid,
|
|
|
adsId: adsId,
|
|
|
adId: adId,
|
|
|
- adType: payload.adType,
|
|
|
+ adType: resolvedAdType,
|
|
|
clickedAt: clickTime,
|
|
|
ip: payload.ip,
|
|
|
channelId: payload.channelId,
|
|
|
@@ -366,6 +381,48 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private async resolveAdTypeForAdsId(adsId: string): Promise<string | null> {
|
|
|
+ // Task 2 – stats-consumer-enrichment-flow.md: Redis read-only, Mongo fallback, hostile adsId drop path.
|
|
|
+ const cacheKey = `${AD_META_KEY_PREFIX}${adsId}`;
|
|
|
+ let cachedValue: string | null = null;
|
|
|
+
|
|
|
+ try {
|
|
|
+ cachedValue = await this.redis.get(cacheKey);
|
|
|
+ } catch (err) {
|
|
|
+ this.logger.debug(
|
|
|
+ `Redis lookup failed for key=${cacheKey}, falling back to Mongo`,
|
|
|
+ err instanceof Error ? err.stack : String(err),
|
|
|
+ );
|
|
|
+ }
|
|
|
+
|
|
|
+ if (cachedValue) {
|
|
|
+ const trimmed = cachedValue.trim();
|
|
|
+ if (trimmed) {
|
|
|
+ if (trimmed.startsWith('{')) {
|
|
|
+ try {
|
|
|
+ const parsed = JSON.parse(trimmed);
|
|
|
+ if (parsed && typeof parsed === 'object' && 'adType' in parsed) {
|
|
|
+ return String(parsed.adType);
|
|
|
+ }
|
|
|
+ } catch {
|
|
|
+ // ignore parse failures and treat value as raw string below
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!trimmed.startsWith('{')) {
|
|
|
+ return trimmed;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ const client = this.prisma as any;
|
|
|
+ const adRecord = await client.ads.findUnique({
|
|
|
+ where: { id: adsId },
|
|
|
+ select: { adType: true },
|
|
|
+ });
|
|
|
+
|
|
|
+ return adRecord?.adType ?? null;
|
|
|
+ }
|
|
|
+
|
|
|
async onModuleDestroy(): Promise<void> {
|
|
|
if (this.logInterval) clearInterval(this.logInterval);
|
|
|
|