diff --git a/services/realtime/src/app.module.ts b/services/realtime/src/app.module.ts new file mode 100644 index 0000000..4a65270 --- /dev/null +++ b/services/realtime/src/app.module.ts @@ -0,0 +1,35 @@ +import { Module } from '@nestjs/common'; +import { ConfigModule, ConfigService } from '@nestjs/config'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { HealthModule } from './health/health.module'; +import { GatewayModule } from './gateway/gateway.module'; +import { MetricsModule } from './metrics/metrics.module'; + +@Module({ + imports: [ + ConfigModule.forRoot({ + isGlobal: true, + envFilePath: ['.env.local', '.env'], + }), + + TypeOrmModule.forRootAsync({ + inject: [ConfigService], + useFactory: (config: ConfigService) => ({ + type: 'postgres', + host: config.get('DATABASE_HOST', 'localhost'), + port: config.get('DATABASE_PORT', 5432), + username: config.get('DATABASE_USER', 'analytics'), + password: config.get('DATABASE_PASSWORD', 'analytics'), + database: config.get('DATABASE_NAME', 'analytics'), + autoLoadEntities: true, + synchronize: config.get('NODE_ENV') !== 'production', + logging: config.get('NODE_ENV') !== 'production', + }), + }), + + HealthModule, + GatewayModule, + MetricsModule, + ], +}) +export class AppModule {} diff --git a/services/realtime/src/entities/aggregated-metric.entity.ts b/services/realtime/src/entities/aggregated-metric.entity.ts new file mode 100644 index 0000000..750c111 --- /dev/null +++ b/services/realtime/src/entities/aggregated-metric.entity.ts @@ -0,0 +1,67 @@ +import { + Entity, + Column, + PrimaryGeneratedColumn, + CreateDateColumn, + Index, +} from 'typeorm'; + +export enum MetricType { + PAGE_VIEWS = 'page_views', + UNIQUE_VISITORS = 'unique_visitors', + SESSIONS = 'sessions', + EVENT_COUNT = 'event_count', + CONVERSION_RATE = 'conversion_rate', + REVENUE = 'revenue', +} + +export enum TimeGranularity { + MINUTE = 'minute', + HOUR = 'hour', + DAY = 'day', + WEEK = 'week', + MONTH = 'month', +} + +@Entity('aggregated_metrics') +@Index(['metricType', 'granularity', 'timestamp']) +@Index(['metricType', 'dimension', 'dimensionValue', 'timestamp']) +export class AggregatedMetric { + @PrimaryGeneratedColumn('uuid') + id!: string; + + @Column({ + type: 'enum', + enum: MetricType, + }) + @Index() + metricType!: MetricType; + + @Column({ + type: 'enum', + enum: TimeGranularity, + }) + granularity!: TimeGranularity; + + @Column({ type: 'timestamptz' }) + @Index() + timestamp!: Date; + + @Column({ type: 'decimal', precision: 20, scale: 4, default: 0 }) + value!: number; + + @Column({ type: 'bigint', default: 0 }) + count!: number; + + @Column({ nullable: true }) + dimension?: string; + + @Column({ nullable: true }) + dimensionValue?: string; + + @Column({ type: 'jsonb', nullable: true }) + metadata?: Record; + + @CreateDateColumn({ type: 'timestamptz' }) + createdAt!: Date; +} diff --git a/services/realtime/src/gateway/analytics.gateway.ts b/services/realtime/src/gateway/analytics.gateway.ts new file mode 100644 index 0000000..286a907 --- /dev/null +++ b/services/realtime/src/gateway/analytics.gateway.ts @@ -0,0 +1,89 @@ +import { + WebSocketGateway, + WebSocketServer, + SubscribeMessage, + OnGatewayConnection, + OnGatewayDisconnect, + ConnectedSocket, + MessageBody, +} from '@nestjs/websockets'; +import { Logger } from '@nestjs/common'; +import { Server, Socket } from 'socket.io'; +import { MetricsService } from '../metrics/metrics.service'; + +interface SubscriptionPayload { + metrics: string[]; + interval?: number; +} + +@WebSocketGateway({ + cors: { + origin: '*', + credentials: true, + }, + namespace: '/analytics', +}) +export class AnalyticsGateway implements OnGatewayConnection, OnGatewayDisconnect { + @WebSocketServer() + server!: Server; + + private readonly logger = new Logger(AnalyticsGateway.name); + private readonly subscriptions = new Map(); + + constructor(private readonly metricsService: MetricsService) {} + + handleConnection(client: Socket) { + this.logger.log(`Client connected: ${client.id}`); + } + + handleDisconnect(client: Socket) { + this.logger.log(`Client disconnected: ${client.id}`); + this.clearSubscriptions(client.id); + } + + @SubscribeMessage('subscribe') + handleSubscribe( + @ConnectedSocket() client: Socket, + @MessageBody() payload: SubscriptionPayload, + ) { + const { metrics, interval = 5000 } = payload; + + this.logger.log(`Client ${client.id} subscribing to: ${metrics.join(', ')}`); + + // Clear existing subscription + this.clearSubscriptions(client.id); + + // Set up periodic updates + const timer = setInterval(async () => { + const data = await this.metricsService.getRealtimeMetrics(metrics); + client.emit('metrics', data); + }, interval); + + this.subscriptions.set(client.id, timer); + + // Send initial data immediately + this.metricsService.getRealtimeMetrics(metrics).then((data) => { + client.emit('metrics', data); + }); + + return { subscribed: metrics, interval }; + } + + @SubscribeMessage('unsubscribe') + handleUnsubscribe(@ConnectedSocket() client: Socket) { + this.clearSubscriptions(client.id); + return { unsubscribed: true }; + } + + private clearSubscriptions(clientId: string) { + const timer = this.subscriptions.get(clientId); + if (timer) { + clearInterval(timer); + this.subscriptions.delete(clientId); + } + } + + broadcastMetricUpdate(metric: string, value: number) { + this.server.emit('metric-update', { metric, value, timestamp: new Date() }); + } +} diff --git a/services/realtime/src/gateway/gateway.module.ts b/services/realtime/src/gateway/gateway.module.ts new file mode 100644 index 0000000..64f16d3 --- /dev/null +++ b/services/realtime/src/gateway/gateway.module.ts @@ -0,0 +1,10 @@ +import { Module } from '@nestjs/common'; +import { AnalyticsGateway } from './analytics.gateway'; +import { MetricsModule } from '../metrics/metrics.module'; + +@Module({ + imports: [MetricsModule], + providers: [AnalyticsGateway], + exports: [AnalyticsGateway], +}) +export class GatewayModule {} diff --git a/services/realtime/src/health/health.controller.ts b/services/realtime/src/health/health.controller.ts new file mode 100644 index 0000000..8019b9a --- /dev/null +++ b/services/realtime/src/health/health.controller.ts @@ -0,0 +1,16 @@ +import { Controller, Get } from '@nestjs/common'; +import { HealthCheck, HealthCheckService, TypeOrmHealthIndicator } from '@nestjs/terminus'; + +@Controller('health') +export class HealthController { + constructor( + private health: HealthCheckService, + private db: TypeOrmHealthIndicator, + ) {} + + @Get() + @HealthCheck() + check() { + return this.health.check([() => this.db.pingCheck('database')]); + } +} diff --git a/services/realtime/src/health/health.module.ts b/services/realtime/src/health/health.module.ts new file mode 100644 index 0000000..0208ef7 --- /dev/null +++ b/services/realtime/src/health/health.module.ts @@ -0,0 +1,9 @@ +import { Module } from '@nestjs/common'; +import { TerminusModule } from '@nestjs/terminus'; +import { HealthController } from './health.controller'; + +@Module({ + imports: [TerminusModule], + controllers: [HealthController], +}) +export class HealthModule {} diff --git a/services/realtime/src/main.ts b/services/realtime/src/main.ts new file mode 100644 index 0000000..db4692c --- /dev/null +++ b/services/realtime/src/main.ts @@ -0,0 +1,21 @@ +import { NestFactory } from '@nestjs/core'; +import { Logger } from '@nestjs/common'; +import { AppModule } from './app.module'; + +async function bootstrap() { + const logger = new Logger('RealtimeService'); + const app = await NestFactory.create(AppModule); + + app.enableCors({ + origin: process.env.CORS_ORIGIN ?? '*', + credentials: true, + }); + + const port = process.env.PORT ?? 3004; + await app.listen(port); + + logger.log(`Analytics realtime service running on port ${port}`); + logger.log(`WebSocket gateway available at ws://localhost:${port}`); +} + +bootstrap(); diff --git a/services/realtime/src/metrics/metrics.module.ts b/services/realtime/src/metrics/metrics.module.ts new file mode 100644 index 0000000..4d9f362 --- /dev/null +++ b/services/realtime/src/metrics/metrics.module.ts @@ -0,0 +1,11 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { MetricsService } from './metrics.service'; +import { AggregatedMetric } from '../entities/aggregated-metric.entity'; + +@Module({ + imports: [TypeOrmModule.forFeature([AggregatedMetric])], + providers: [MetricsService], + exports: [MetricsService], +}) +export class MetricsModule {} diff --git a/services/realtime/src/metrics/metrics.service.ts b/services/realtime/src/metrics/metrics.service.ts new file mode 100644 index 0000000..de09d5c --- /dev/null +++ b/services/realtime/src/metrics/metrics.service.ts @@ -0,0 +1,85 @@ +import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository, MoreThan } from 'typeorm'; +import { AggregatedMetric, MetricType, TimeGranularity } from '../entities/aggregated-metric.entity'; + +export interface RealtimeMetric { + metric: string; + value: number; + change: number; + changePercent: number; + timestamp: Date; +} + +@Injectable() +export class MetricsService { + constructor( + @InjectRepository(AggregatedMetric) + private readonly metricsRepository: Repository, + ) {} + + async getRealtimeMetrics(metricNames: string[]): Promise { + const now = new Date(); + const oneHourAgo = new Date(now.getTime() - 60 * 60 * 1000); + const twoHoursAgo = new Date(now.getTime() - 2 * 60 * 60 * 1000); + + const results: RealtimeMetric[] = []; + + for (const metricName of metricNames) { + // Get current hour's data + const currentData = await this.metricsRepository.findOne({ + where: { + metricType: metricName as MetricType, + granularity: TimeGranularity.HOUR, + timestamp: MoreThan(oneHourAgo), + }, + order: { timestamp: 'DESC' }, + }); + + // Get previous hour's data for comparison + const previousData = await this.metricsRepository.findOne({ + where: { + metricType: metricName as MetricType, + granularity: TimeGranularity.HOUR, + timestamp: MoreThan(twoHoursAgo), + }, + order: { timestamp: 'DESC' }, + }); + + const currentValue = currentData ? Number(currentData.value) : 0; + const previousValue = previousData ? Number(previousData.value) : 0; + const change = currentValue - previousValue; + const changePercent = previousValue > 0 ? (change / previousValue) * 100 : 0; + + results.push({ + metric: metricName, + value: currentValue, + change, + changePercent, + timestamp: now, + }); + } + + return results; + } + + async getActiveUsers(): Promise { + // Count unique sessions in the last 5 minutes + // This would query session/visitor data + return 0; + } + + async getCurrentPageViews(): Promise { + const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); + + const result = await this.metricsRepository + .createQueryBuilder('m') + .select('SUM(m.count)', 'total') + .where('m.metricType = :type', { type: MetricType.PAGE_VIEWS }) + .andWhere('m.granularity = :gran', { gran: TimeGranularity.MINUTE }) + .andWhere('m.timestamp > :since', { since: fiveMinutesAgo }) + .getRawOne(); + + return Number(result?.total ?? 0); + } +}