|
@@ -22,13 +22,15 @@ interface BaseStatsMessage {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
interface AdClickMessage extends BaseStatsMessage {
|
|
interface AdClickMessage extends BaseStatsMessage {
|
|
|
- adId: string;
|
|
|
|
|
|
|
+ adsId: string; // Ad ID (from publisher)
|
|
|
|
|
+ adId?: string; // Alternative field name (for backward compatibility)
|
|
|
adsModuleId: string;
|
|
adsModuleId: string;
|
|
|
channelId: string;
|
|
channelId: string;
|
|
|
- scene: string;
|
|
|
|
|
- slot: string;
|
|
|
|
|
|
|
+ scene?: string; // Optional - from ad placement context
|
|
|
|
|
+ slot?: string; // Optional - from ad placement context
|
|
|
adType: string;
|
|
adType: string;
|
|
|
- clickedAt: string | number | bigint;
|
|
|
|
|
|
|
+ clickedAt?: string | number | bigint; // Alternative field name
|
|
|
|
|
+ clickAt?: string | number | bigint; // Publisher sends this
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
interface VideoClickMessage extends BaseStatsMessage {
|
|
interface VideoClickMessage extends BaseStatsMessage {
|
|
@@ -228,7 +230,10 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
|
|
|
private async handleAdClick(msg: ConsumeMessage | null): Promise<void> {
|
|
private async handleAdClick(msg: ConsumeMessage | null): Promise<void> {
|
|
|
if (!msg) return;
|
|
if (!msg) return;
|
|
|
const payload = this.parseJson<AdClickMessage>(msg);
|
|
const payload = this.parseJson<AdClickMessage>(msg);
|
|
|
- if (!payload || !payload.messageId || !payload.uid || !payload.adId) {
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // Validate required fields (use adsId from publisher, adId for backward compatibility)
|
|
|
|
|
+ const adId = payload?.adId || payload?.adsId;
|
|
|
|
|
+ if (!payload || !payload.uid || !adId) {
|
|
|
this.logger.warn(
|
|
this.logger.warn(
|
|
|
`Malformed ad.click message, dropping: ${msg.content.toString()}`,
|
|
`Malformed ad.click message, dropping: ${msg.content.toString()}`,
|
|
|
);
|
|
);
|
|
@@ -236,10 +241,11 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- const status = await this.markProcessed(
|
|
|
|
|
- payload.messageId,
|
|
|
|
|
- 'stats.ad.click',
|
|
|
|
|
- );
|
|
|
|
|
|
|
+ // For deduplication, generate a simple messageId if not provided
|
|
|
|
|
+ const messageId =
|
|
|
|
|
+ payload.messageId || `${Date.now()}-${adId}-${payload.uid}`;
|
|
|
|
|
+
|
|
|
|
|
+ const status = await this.markProcessed(messageId, 'stats.ad.click');
|
|
|
if (status === 'duplicate') {
|
|
if (status === 'duplicate') {
|
|
|
this.ack(msg);
|
|
this.ack(msg);
|
|
|
return;
|
|
return;
|
|
@@ -251,32 +257,30 @@ export class StatsEventsConsumer implements OnModuleInit, OnModuleDestroy {
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
|
const client = this.prisma as any;
|
|
const client = this.prisma as any;
|
|
|
|
|
+ const now = nowEpochMsBigInt();
|
|
|
|
|
+
|
|
|
|
|
+ // Determine the click timestamp (use clickAt from publisher, or clickedAt for backward compatibility)
|
|
|
|
|
+ const clickTime = payload.clickAt || payload.clickedAt || now;
|
|
|
|
|
+
|
|
|
await client.adClickEvents.create({
|
|
await client.adClickEvents.create({
|
|
|
data: {
|
|
data: {
|
|
|
uid: payload.uid,
|
|
uid: payload.uid,
|
|
|
- adId: payload.adId,
|
|
|
|
|
- adsModuleId: payload.adsModuleId,
|
|
|
|
|
- channelId: payload.channelId,
|
|
|
|
|
- scene: payload.scene,
|
|
|
|
|
- slot: payload.slot,
|
|
|
|
|
|
|
+ adId: adId,
|
|
|
adType: payload.adType,
|
|
adType: payload.adType,
|
|
|
- clickedAt: this.toBigInt(payload.clickedAt),
|
|
|
|
|
|
|
+ clickedAt: this.toBigInt(clickTime),
|
|
|
ip: payload.ip,
|
|
ip: payload.ip,
|
|
|
- userAgent: payload.userAgent,
|
|
|
|
|
- appVersion: payload.appVersion ?? null,
|
|
|
|
|
- os: payload.os ?? null,
|
|
|
|
|
- createAt: this.toBigInt(payload.createAt),
|
|
|
|
|
- updateAt: this.toBigInt(payload.updateAt),
|
|
|
|
|
|
|
+ createAt: this.toBigInt(payload.createAt || now),
|
|
|
|
|
+ updateAt: this.toBigInt(payload.updateAt || now),
|
|
|
},
|
|
},
|
|
|
});
|
|
});
|
|
|
this.counters.adClick++;
|
|
this.counters.adClick++;
|
|
|
this.ack(msg);
|
|
this.ack(msg);
|
|
|
} catch (error: any) {
|
|
} catch (error: any) {
|
|
|
this.logger.error(
|
|
this.logger.error(
|
|
|
- `Failed to persist ad.click messageId=${payload.messageId}: ${error?.message ?? error}`,
|
|
|
|
|
|
|
+ `Failed to persist ad.click messageId=${messageId}: ${error?.message ?? error}`,
|
|
|
error?.stack,
|
|
error?.stack,
|
|
|
);
|
|
);
|
|
|
- await this.cleanupProcessed(payload.messageId);
|
|
|
|
|
|
|
+ await this.cleanupProcessed(messageId);
|
|
|
this.nackDrop(msg);
|
|
this.nackDrop(msg);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|