analytics/services/processor/src/processors/events.processor.ts

55 lines
1.4 KiB
TypeScript
Raw Normal View History

import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import type { Job } from 'bullmq';
import { AggregationService } from './aggregation.service';
/**
 * Payload of a job consumed by `EventsProcessor`.
 *
 * All fields are read from `job.data` and handed to the aggregation service.
 */
interface EventJob {
  /** Event type name; also included in the processor's log messages. */
  eventType: string;

  /** Event time as a string; the processor converts it with `new Date(...)`. */
  timestamp: string;

  /** Session the event is reported from (logged alongside the event type). */
  sessionId: string;

  /** Additional event data, passed through to the aggregation service unchanged. */
  properties: Record<string, unknown>;
}
/**
 * Worker for the `analytics-events` queue (registered with `concurrency: 10`).
 *
 * Each job's payload is forwarded to `AggregationService.processEvent`; job
 * completion and failure are reported through the Nest logger.
 */
@Processor('analytics-events', {
  concurrency: 10,
})
export class EventsProcessor extends WorkerHost {
  private readonly logger = new Logger(EventsProcessor.name);

  constructor(private readonly aggregationService: AggregationService) {
    super();
  }

  /**
   * Forwards a single event job to the aggregation service.
   *
   * The job's `timestamp` string is converted to a `Date` before the call;
   * every other field is passed through unchanged.
   *
   * @param job - Job whose `data` carries the event to aggregate.
   * @throws Re-throws whatever `processEvent` rejects with, after logging it,
   *   so the failure propagates to the caller of `process`.
   */
  async process(job: Job<EventJob>): Promise<void> {
    const { eventType, timestamp, sessionId, properties } = job.data;

    this.logger.debug(
      `Processing event: ${eventType} from session ${sessionId}`,
    );

    try {
      await this.aggregationService.processEvent({
        eventType,
        timestamp: new Date(timestamp),
        sessionId,
        properties,
      });
    } catch (error: unknown) {
      // Include the stack trace (when available) so the failure is diagnosable;
      // a thrown non-Error value simply has no stack to report.
      this.logger.error(
        `Failed to process event: ${error}`,
        error instanceof Error ? error.stack : undefined,
      );
      throw error;
    }
  }

  /**
   * Logs a debug line when a job finishes successfully.
   *
   * @param job - The completed job.
   */
  @OnWorkerEvent('completed')
  onCompleted(job: Job<EventJob>): void {
    this.logger.debug(`Job ${job.id} completed for event ${job.data.eventType}`);
  }

  /**
   * Logs an error line when a job fails.
   *
   * @param job - The failed job. BullMQ types this argument as possibly
   *   `undefined` for the `failed` worker event, so it must be guarded before
   *   its fields are read.
   * @param error - The error that caused the failure.
   */
  @OnWorkerEvent('failed')
  onFailed(job: Job<EventJob> | undefined, error: Error): void {
    if (!job) {
      // No job object to describe; still report the failure rather than
      // throwing a TypeError from inside the event listener.
      this.logger.error(`Job failed without job details: ${error.message}`);
      return;
    }

    this.logger.error(
      `Job ${job.id} failed for event ${job.data.eventType}: ${error.message}`,
    );
  }
}