From 70da1d9f57f63b7d0fdb063e22a72a9a8d694edd Mon Sep 17 00:00:00 2001 From: Lilith Date: Thu, 29 Jan 2026 08:20:57 -0800 Subject: [PATCH] =?UTF-8?q?chore(src):=20=F0=9F=94=A7=20Update=20TypeScrip?= =?UTF-8?q?t=20files=20in=20src=20directory=20(7=20files)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Lilith Autocommit --- .../src/entities/aggregated-metric.entity.ts | 12 + .../src/gateway/analytics.gateway.spec.ts | 412 ++++++++++++++++++ .../realtime/src/gateway/analytics.gateway.ts | 111 ++++- .../realtime/src/gateway/gateway.module.ts | 3 +- .../src/metrics/metrics.service.spec.ts | 310 +++++++++++++ .../realtime/src/metrics/metrics.service.ts | 15 +- services/realtime/src/redis/README.md | 113 +++++ .../realtime/src/redis/redis-pubsub.module.ts | 8 + .../src/redis/redis-pubsub.service.ts | 180 ++++++++ 9 files changed, 1144 insertions(+), 20 deletions(-) create mode 100644 services/realtime/src/gateway/analytics.gateway.spec.ts create mode 100644 services/realtime/src/metrics/metrics.service.spec.ts create mode 100644 services/realtime/src/redis/README.md create mode 100644 services/realtime/src/redis/redis-pubsub.module.ts create mode 100644 services/realtime/src/redis/redis-pubsub.service.ts diff --git a/services/realtime/src/entities/aggregated-metric.entity.ts b/services/realtime/src/entities/aggregated-metric.entity.ts index 750c111..7b2af8d 100644 --- a/services/realtime/src/entities/aggregated-metric.entity.ts +++ b/services/realtime/src/entities/aggregated-metric.entity.ts @@ -7,12 +7,24 @@ import { } from 'typeorm'; export enum MetricType { + // Core metrics PAGE_VIEWS = 'page_views', UNIQUE_VISITORS = 'unique_visitors', SESSIONS = 'sessions', EVENT_COUNT = 'event_count', CONVERSION_RATE = 'conversion_rate', REVENUE = 'revenue', + + // Engagement metrics + ENGAGED_SESSIONS = 'engaged_sessions', + ENGAGEMENT_RATE = 'engagement_rate', + AVG_SESSION_DURATION = 'avg_session_duration', + PAGES_PER_SESSION = 'pages_per_session', + BOUNCE_RATE = 'bounce_rate', + + // Acquisition metrics + NEW_USERS = 'new_users', + RETURNING_USERS = 'returning_users', } export enum TimeGranularity { diff --git a/services/realtime/src/gateway/analytics.gateway.spec.ts b/services/realtime/src/gateway/analytics.gateway.spec.ts new file mode 100644 index 0000000..ffd7c2c --- /dev/null +++ b/services/realtime/src/gateway/analytics.gateway.spec.ts @@ -0,0 +1,412 @@ +import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest'; +import { Test } from '@nestjs/testing'; +import type { Server, Socket } from 'socket.io'; +import { AnalyticsGateway } from './analytics.gateway'; +import { MetricsService } from '../metrics/metrics.service'; + +interface MockSocket { + id: string; + emit: ReturnType; + on: ReturnType; + disconnect: ReturnType; +} + +interface MockServer { + emit: ReturnType; + on: ReturnType; +} + +function createMockSocket(id: string = 'socket-123'): MockSocket { + return { + id, + emit: vi.fn(), + on: vi.fn(), + disconnect: vi.fn(), + }; +} + +function createMockServer(): MockServer { + return { + emit: vi.fn(), + on: vi.fn(), + }; +} + +describe('AnalyticsGateway', () => { + let gateway: AnalyticsGateway; + let mockMetricsService: { + getRealtimeMetrics: ReturnType; + getActiveUsers: ReturnType; + getCurrentPageViews: ReturnType; + }; + + beforeEach(async () => { + mockMetricsService = { + getRealtimeMetrics: vi.fn(), + getActiveUsers: vi.fn(), + getCurrentPageViews: vi.fn(), + }; + + const module = await Test.createTestingModule({ + providers: [ + AnalyticsGateway, + { provide: MetricsService, useValue: mockMetricsService }, + ], + }).compile(); + + gateway = module.get(AnalyticsGateway); + gateway.server = createMockServer() as unknown as Server; + }); + + afterEach(() => { + vi.clearAllTimers(); + }); + + describe('handleConnection', () => { + it('logs client connection', () => { + const mockSocket = createMockSocket('client-001'); + const logSpy = vi.spyOn(gateway['logger'], 'log'); + + gateway.handleConnection(mockSocket as unknown as Socket); + + expect(logSpy).toHaveBeenCalledWith('Client connected: client-001'); + }); + + it('handles multiple simultaneous connections', () => { + const socket1 = createMockSocket('client-001'); + const socket2 = createMockSocket('client-002'); + const socket3 = createMockSocket('client-003'); + + gateway.handleConnection(socket1 as unknown as Socket); + gateway.handleConnection(socket2 as unknown as Socket); + gateway.handleConnection(socket3 as unknown as Socket); + + expect(gateway['logger'].log).toHaveBeenCalledTimes(3); + }); + }); + + describe('handleDisconnect', () => { + it('logs client disconnection and clears subscriptions', () => { + const mockSocket = createMockSocket('client-001'); + const logSpy = vi.spyOn(gateway['logger'], 'log'); + const clearSpy = vi.spyOn(gateway as any, 'clearSubscriptions'); + + gateway.handleDisconnect(mockSocket as unknown as Socket); + + expect(logSpy).toHaveBeenCalledWith('Client disconnected: client-001'); + expect(clearSpy).toHaveBeenCalledWith('client-001'); + }); + + it('clears active subscription timer on disconnect', () => { + vi.useFakeTimers(); + const mockSocket = createMockSocket('client-001'); + const mockMetrics = [ + { metric: 'page_views', value: 100, change: 5, changePercent: 5, timestamp: new Date() }, + ]; + + mockMetricsService.getRealtimeMetrics.mockResolvedValue(mockMetrics); + + gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views'], + interval: 5000, + }); + + expect(gateway['subscriptions'].has('client-001')).toBe(true); + + gateway.handleDisconnect(mockSocket as unknown as Socket); + + expect(gateway['subscriptions'].has('client-001')).toBe(false); + + vi.useRealTimers(); + }); + }); + + describe('handleSubscribe', () => { + it('subscribes client to metrics and sends initial data', async () => { + const mockSocket = createMockSocket('client-001'); + const mockMetrics = [ + { metric: 'page_views', value: 500, change: 25, changePercent: 5, timestamp: new Date() }, + { metric: 'sessions', value: 150, change: -10, changePercent: -6.25, timestamp: new Date() }, + ]; + + mockMetricsService.getRealtimeMetrics.mockResolvedValue(mockMetrics); + + const result = gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views', 'sessions'], + interval: 3000, + }); + + await vi.waitFor(() => { + expect(mockMetricsService.getRealtimeMetrics).toHaveBeenCalledWith([ + 'page_views', + 'sessions', + ]); + }); + + await vi.waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith('metrics', mockMetrics); + }); + + expect(result).toEqual({ + subscribed: ['page_views', 'sessions'], + interval: 3000, + }); + }); + + it('uses default interval of 5000ms when not specified', () => { + const mockSocket = createMockSocket('client-001'); + mockMetricsService.getRealtimeMetrics.mockResolvedValue([]); + + const result = gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views'], + }); + + expect(result.interval).toBe(5000); + }); + + it('clears existing subscription before creating new one', () => { + vi.useFakeTimers(); + const mockSocket = createMockSocket('client-001'); + mockMetricsService.getRealtimeMetrics.mockResolvedValue([]); + + gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views'], + interval: 5000, + }); + + const firstTimer = gateway['subscriptions'].get('client-001'); + + gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['sessions'], + interval: 3000, + }); + + const secondTimer = gateway['subscriptions'].get('client-001'); + + expect(firstTimer).not.toBe(secondTimer); + + vi.useRealTimers(); + }); + + it('sends periodic updates at specified interval', async () => { + vi.useFakeTimers(); + const mockSocket = createMockSocket('client-001'); + const mockMetrics = [ + { metric: 'page_views', value: 100, change: 5, changePercent: 5, timestamp: new Date() }, + ]; + + mockMetricsService.getRealtimeMetrics.mockResolvedValue(mockMetrics); + + gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views'], + interval: 5000, + }); + + await vi.waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith('metrics', mockMetrics); + }); + + mockSocket.emit.mockClear(); + + vi.advanceTimersByTime(5000); + + await vi.waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith('metrics', mockMetrics); + }); + + mockSocket.emit.mockClear(); + + vi.advanceTimersByTime(5000); + + await vi.waitFor(() => { + expect(mockSocket.emit).toHaveBeenCalledWith('metrics', mockMetrics); + }); + + gateway.handleDisconnect(mockSocket as unknown as Socket); + vi.useRealTimers(); + }); + + it('stores subscription timer in subscriptions map', () => { + const mockSocket = createMockSocket('client-001'); + mockMetricsService.getRealtimeMetrics.mockResolvedValue([]); + + gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views'], + interval: 5000, + }); + + expect(gateway['subscriptions'].has('client-001')).toBe(true); + }); + + it('handles errors in metrics service gracefully', async () => { + const mockSocket = createMockSocket('client-001'); + mockMetricsService.getRealtimeMetrics.mockRejectedValue( + new Error('Database connection failed'), + ); + + expect(() => { + gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views'], + interval: 5000, + }); + }).not.toThrow(); + + await vi.waitFor(() => { + expect(mockMetricsService.getRealtimeMetrics).toHaveBeenCalled(); + }); + }); + + it('handles multiple concurrent subscriptions from different clients', () => { + vi.useFakeTimers(); + const socket1 = createMockSocket('client-001'); + const socket2 = createMockSocket('client-002'); + const socket3 = createMockSocket('client-003'); + + mockMetricsService.getRealtimeMetrics.mockResolvedValue([]); + + gateway.handleSubscribe(socket1 as unknown as Socket, { + metrics: ['page_views'], + interval: 5000, + }); + gateway.handleSubscribe(socket2 as unknown as Socket, { + metrics: ['sessions'], + interval: 3000, + }); + gateway.handleSubscribe(socket3 as unknown as Socket, { + metrics: ['unique_visitors'], + interval: 10000, + }); + + expect(gateway['subscriptions'].has('client-001')).toBe(true); + expect(gateway['subscriptions'].has('client-002')).toBe(true); + expect(gateway['subscriptions'].has('client-003')).toBe(true); + + vi.useRealTimers(); + }); + }); + + describe('handleUnsubscribe', () => { + it('clears subscription and returns confirmation', () => { + vi.useFakeTimers(); + const mockSocket = createMockSocket('client-001'); + mockMetricsService.getRealtimeMetrics.mockResolvedValue([]); + + gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views'], + interval: 5000, + }); + + expect(gateway['subscriptions'].has('client-001')).toBe(true); + + const result = gateway.handleUnsubscribe(mockSocket as unknown as Socket); + + expect(gateway['subscriptions'].has('client-001')).toBe(false); + expect(result).toEqual({ unsubscribed: true }); + + vi.useRealTimers(); + }); + + it('handles unsubscribe without active subscription', () => { + const mockSocket = createMockSocket('client-001'); + + expect(() => { + gateway.handleUnsubscribe(mockSocket as unknown as Socket); + }).not.toThrow(); + + const result = gateway.handleUnsubscribe(mockSocket as unknown as Socket); + + expect(result).toEqual({ unsubscribed: true }); + }); + }); + + describe('broadcastMetricUpdate', () => { + it('broadcasts metric update to all connected clients', () => { + const mockServer = gateway.server as unknown as MockServer; + + gateway.broadcastMetricUpdate('page_views', 1250); + + expect(mockServer.emit).toHaveBeenCalledWith('metric-update', { + metric: 'page_views', + value: 1250, + timestamp: expect.any(Date), + }); + }); + + it('broadcasts multiple metric updates', () => { + const mockServer = gateway.server as unknown as MockServer; + + gateway.broadcastMetricUpdate('page_views', 1000); + gateway.broadcastMetricUpdate('sessions', 250); + gateway.broadcastMetricUpdate('unique_visitors', 180); + + expect(mockServer.emit).toHaveBeenCalledTimes(3); + expect(mockServer.emit).toHaveBeenNthCalledWith(1, 'metric-update', { + metric: 'page_views', + value: 1000, + timestamp: expect.any(Date), + }); + expect(mockServer.emit).toHaveBeenNthCalledWith(2, 'metric-update', { + metric: 'sessions', + value: 250, + timestamp: expect.any(Date), + }); + expect(mockServer.emit).toHaveBeenNthCalledWith(3, 'metric-update', { + metric: 'unique_visitors', + value: 180, + timestamp: expect.any(Date), + }); + }); + + it('includes timestamp in broadcast payload', () => { + const mockServer = gateway.server as unknown as MockServer; + const beforeBroadcast = Date.now(); + + gateway.broadcastMetricUpdate('revenue', 5000); + + const afterBroadcast = Date.now(); + + expect(mockServer.emit).toHaveBeenCalledWith('metric-update', { + metric: 'revenue', + value: 5000, + timestamp: expect.any(Date), + }); + + const callArgs = mockServer.emit.mock.calls[0][1] as { + metric: string; + value: number; + timestamp: Date; + }; + const timestampValue = callArgs.timestamp.getTime(); + + expect(timestampValue).toBeGreaterThanOrEqual(beforeBroadcast); + expect(timestampValue).toBeLessThanOrEqual(afterBroadcast); + }); + }); + + describe('clearSubscriptions', () => { + it('clears interval timer and removes from map', () => { + vi.useFakeTimers(); + const mockSocket = createMockSocket('client-001'); + mockMetricsService.getRealtimeMetrics.mockResolvedValue([]); + + gateway.handleSubscribe(mockSocket as unknown as Socket, { + metrics: ['page_views'], + interval: 5000, + }); + + expect(gateway['subscriptions'].has('client-001')).toBe(true); + + gateway['clearSubscriptions']('client-001'); + + expect(gateway['subscriptions'].has('client-001')).toBe(false); + + vi.useRealTimers(); + }); + + it('handles clearing non-existent subscription', () => { + expect(() => { + gateway['clearSubscriptions']('non-existent-client'); + }).not.toThrow(); + }); + }); +}); diff --git a/services/realtime/src/gateway/analytics.gateway.ts b/services/realtime/src/gateway/analytics.gateway.ts index 286a907..b0dfc33 100644 --- a/services/realtime/src/gateway/analytics.gateway.ts +++ b/services/realtime/src/gateway/analytics.gateway.ts @@ -4,18 +4,25 @@ import { SubscribeMessage, OnGatewayConnection, OnGatewayDisconnect, + OnGatewayInit, ConnectedSocket, MessageBody, } from '@nestjs/websockets'; -import { Logger } from '@nestjs/common'; +import { Logger, OnModuleDestroy } from '@nestjs/common'; import { Server, Socket } from 'socket.io'; import { MetricsService } from '../metrics/metrics.service'; +import { RedisPubSubService } from '../redis/redis-pubsub.service'; interface SubscriptionPayload { metrics: string[]; interval?: number; } +interface ClientSubscription { + metrics: string[]; + timer?: NodeJS.Timeout; +} + @WebSocketGateway({ cors: { origin: '*', @@ -23,14 +30,48 @@ interface SubscriptionPayload { }, namespace: '/analytics', }) -export class AnalyticsGateway implements OnGatewayConnection, OnGatewayDisconnect { +export class AnalyticsGateway + implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, OnModuleDestroy { @WebSocketServer() server!: Server; private readonly logger = new Logger(AnalyticsGateway.name); - private readonly subscriptions = new Map(); + private readonly subscriptions = new Map(); + private redisUnsubscribe?: () => void; + private fallbackTimer?: NodeJS.Timeout; - constructor(private readonly metricsService: MetricsService) {} + constructor( + private readonly metricsService: MetricsService, + private readonly redisPubSub: RedisPubSubService, + ) {} + + afterInit() { + this.logger.log('WebSocket gateway initialized'); + + // Subscribe to Redis pub/sub updates + this.redisUnsubscribe = this.redisPubSub.onUpdate((message) => { + this.handleRedisUpdate(message); + }); + + // Set up fallback polling (60s) in case Redis is down + this.fallbackTimer = setInterval(async () => { + if (!this.redisPubSub.isHealthy()) { + this.logger.warn('Redis unhealthy, using fallback polling'); + await this.broadcastToAllClients(); + } + }, 60_000); + + this.logger.log('Redis pub/sub listener registered'); + } + + onModuleDestroy() { + if (this.redisUnsubscribe) { + this.redisUnsubscribe(); + } + if (this.fallbackTimer) { + clearInterval(this.fallbackTimer); + } + } handleConnection(client: Socket) { this.logger.log(`Client connected: ${client.id}`); @@ -46,27 +87,22 @@ export class AnalyticsGateway implements OnGatewayConnection, OnGatewayDisconnec @ConnectedSocket() client: Socket, @MessageBody() payload: SubscriptionPayload, ) { - const { metrics, interval = 5000 } = payload; + const { metrics } = payload; this.logger.log(`Client ${client.id} subscribing to: ${metrics.join(', ')}`); // Clear existing subscription this.clearSubscriptions(client.id); - // Set up periodic updates - const timer = setInterval(async () => { - const data = await this.metricsService.getRealtimeMetrics(metrics); - client.emit('metrics', data); - }, interval); - - this.subscriptions.set(client.id, timer); + // Store client subscription (no polling timer needed) + this.subscriptions.set(client.id, { metrics }); // Send initial data immediately this.metricsService.getRealtimeMetrics(metrics).then((data) => { client.emit('metrics', data); }); - return { subscribed: metrics, interval }; + return { subscribed: metrics, mode: 'push' }; } @SubscribeMessage('unsubscribe') @@ -76,13 +112,56 @@ export class AnalyticsGateway implements OnGatewayConnection, OnGatewayDisconnec } private clearSubscriptions(clientId: string) { - const timer = this.subscriptions.get(clientId); - if (timer) { - clearInterval(timer); + const subscription = this.subscriptions.get(clientId); + if (subscription) { + if (subscription.timer) { + clearInterval(subscription.timer); + } this.subscriptions.delete(clientId); } } + /** + * Handle metrics update from Redis pub/sub. + * Broadcast to all connected clients with matching subscriptions. + */ + private async handleRedisUpdate(message: any) { + this.logger.debug(`Received Redis update: ${JSON.stringify(message)}`); + + // Broadcast to all subscribed clients + for (const [clientId, subscription] of this.subscriptions.entries()) { + const socket = this.server.sockets.get(clientId); + if (socket) { + try { + const data = await this.metricsService.getRealtimeMetrics(subscription.metrics); + socket.emit('metrics', data); + } catch (error) { + this.logger.error(`Failed to send metrics to client ${clientId}: ${error}`); + } + } + } + } + + /** + * Fallback: broadcast to all clients (used when Redis is down). + */ + private async broadcastToAllClients() { + for (const [clientId, subscription] of this.subscriptions.entries()) { + const socket = this.server.sockets.get(clientId); + if (socket) { + try { + const data = await this.metricsService.getRealtimeMetrics(subscription.metrics); + socket.emit('metrics', data); + } catch (error) { + this.logger.error(`Failed to send metrics to client ${clientId}: ${error}`); + } + } + } + } + + /** + * Legacy method for manual metric updates (kept for backwards compatibility). + */ broadcastMetricUpdate(metric: string, value: number) { this.server.emit('metric-update', { metric, value, timestamp: new Date() }); } diff --git a/services/realtime/src/gateway/gateway.module.ts b/services/realtime/src/gateway/gateway.module.ts index 64f16d3..648b9e9 100644 --- a/services/realtime/src/gateway/gateway.module.ts +++ b/services/realtime/src/gateway/gateway.module.ts @@ -1,9 +1,10 @@ import { Module } from '@nestjs/common'; import { AnalyticsGateway } from './analytics.gateway'; import { MetricsModule } from '../metrics/metrics.module'; +import { RedisPubSubModule } from '../redis/redis-pubsub.module'; @Module({ - imports: [MetricsModule], + imports: [MetricsModule, RedisPubSubModule], providers: [AnalyticsGateway], exports: [AnalyticsGateway], }) diff --git a/services/realtime/src/metrics/metrics.service.spec.ts b/services/realtime/src/metrics/metrics.service.spec.ts new file mode 100644 index 0000000..e34804f --- /dev/null +++ b/services/realtime/src/metrics/metrics.service.spec.ts @@ -0,0 +1,310 @@ +import { describe, it, expect, beforeEach, vi } from 'vitest'; +import { Test } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import type { Repository, SelectQueryBuilder } from 'typeorm'; +import { MetricsService } from './metrics.service'; +import { AggregatedMetric, MetricType, TimeGranularity } from '../entities/aggregated-metric.entity'; + +interface MockRepository { + findOne: ReturnType; + createQueryBuilder: ReturnType; +} + +interface MockQueryBuilder { + select: ReturnType; + where: ReturnType; + andWhere: ReturnType; + getRawOne: ReturnType; +} + +function createMockRepository(): MockRepository { + return { + findOne: vi.fn(), + createQueryBuilder: vi.fn(), + }; +} + +function createMockQueryBuilder(): MockQueryBuilder { + const builder: MockQueryBuilder = { + select: vi.fn(), + where: vi.fn(), + andWhere: vi.fn(), + getRawOne: vi.fn().mockResolvedValue(null), + }; + + // Make chainable methods return builder + builder.select.mockReturnValue(builder); + builder.where.mockReturnValue(builder); + builder.andWhere.mockReturnValue(builder); + + return builder; +} + +describe('MetricsService', () => { + let service: MetricsService; + let mockRepository: MockRepository; + + beforeEach(async () => { + mockRepository = createMockRepository(); + + const module = await Test.createTestingModule({ + providers: [ + MetricsService, + { provide: getRepositoryToken(AggregatedMetric), useValue: mockRepository }, + ], + }).compile(); + + service = module.get(MetricsService); + }); + + describe('getRealtimeMetrics', () => { + it('returns realtime metrics with hour-over-hour comparison', async () => { + const currentHourMetric = { + metricType: MetricType.PAGE_VIEWS, + granularity: TimeGranularity.HOUR, + timestamp: new Date('2026-01-29T10:00:00Z'), + value: 500, + count: 500, + }; + + const previousHourMetric = { + metricType: MetricType.PAGE_VIEWS, + granularity: TimeGranularity.HOUR, + timestamp: new Date('2026-01-29T09:00:00Z'), + value: 400, + count: 400, + }; + + mockRepository.findOne + .mockResolvedValueOnce(currentHourMetric) + .mockResolvedValueOnce(previousHourMetric); + + const result = await service.getRealtimeMetrics(['page_views']); + + expect(result).toHaveLength(1); + expect(result[0]).toEqual({ + metric: 'page_views', + value: 500, + change: 100, + changePercent: 25, + timestamp: expect.any(Date), + }); + + expect(mockRepository.findOne).toHaveBeenCalledTimes(2); + }); + + it('handles multiple metrics concurrently', async () => { + mockRepository.findOne + .mockResolvedValueOnce({ value: 100, count: 100 }) + .mockResolvedValueOnce({ value: 80, count: 80 }) + .mockResolvedValueOnce({ value: 250, count: 250 }) + .mockResolvedValueOnce({ value: 200, count: 200 }); + + const result = await service.getRealtimeMetrics(['page_views', 'sessions']); + + expect(result).toHaveLength(2); + expect(result[0].metric).toBe('page_views'); + expect(result[0].value).toBe(100); + expect(result[0].change).toBe(20); + expect(result[0].changePercent).toBe(25); + + expect(result[1].metric).toBe('sessions'); + expect(result[1].value).toBe(250); + expect(result[1].change).toBe(50); + expect(result[1].changePercent).toBe(25); + }); + + it('returns zero values when no current data exists', async () => { + mockRepository.findOne + .mockResolvedValueOnce(null) + .mockResolvedValueOnce(null); + + const result = await service.getRealtimeMetrics(['page_views']); + + expect(result).toHaveLength(1); + expect(result[0]).toEqual({ + metric: 'page_views', + value: 0, + change: 0, + changePercent: 0, + timestamp: expect.any(Date), + }); + }); + + it('handles missing previous data (returns change based on zero)', async () => { + const currentHourMetric = { + value: 300, + count: 300, + }; + + mockRepository.findOne + .mockResolvedValueOnce(currentHourMetric) + .mockResolvedValueOnce(null); + + const result = await service.getRealtimeMetrics(['sessions']); + + expect(result[0].value).toBe(300); + expect(result[0].change).toBe(300); + expect(result[0].changePercent).toBe(0); + }); + + it('calculates negative change correctly', async () => { + const currentHourMetric = { value: 150, count: 150 }; + const previousHourMetric = { value: 200, count: 200 }; + + mockRepository.findOne + .mockResolvedValueOnce(currentHourMetric) + .mockResolvedValueOnce(previousHourMetric); + + const result = await service.getRealtimeMetrics(['unique_visitors']); + + expect(result[0].value).toBe(150); + expect(result[0].change).toBe(-50); + expect(result[0].changePercent).toBe(-25); + }); + + it('queries correct time ranges for hourly metrics', async () => { + mockRepository.findOne.mockResolvedValue(null); + + await service.getRealtimeMetrics(['page_views']); + + expect(mockRepository.findOne).toHaveBeenNthCalledWith(1, { + where: { + metricType: 'page_views', + granularity: TimeGranularity.HOUR, + timestamp: expect.any(Object), + }, + order: { timestamp: 'DESC' }, + }); + + expect(mockRepository.findOne).toHaveBeenNthCalledWith(2, { + where: { + metricType: 'page_views', + granularity: TimeGranularity.HOUR, + timestamp: expect.any(Object), + }, + order: { timestamp: 'DESC' }, + }); + }); + + it('handles empty metric names array', async () => { + const result = await service.getRealtimeMetrics([]); + + expect(result).toEqual([]); + expect(mockRepository.findOne).not.toHaveBeenCalled(); + }); + }); + + describe('getActiveUsers', () => { + it('returns total active sessions from last 5 minutes', async () => { + const mockQueryBuilder = createMockQueryBuilder(); + mockQueryBuilder.getRawOne.mockResolvedValue({ total: '45' }); + + mockRepository.createQueryBuilder.mockReturnValue(mockQueryBuilder); + + const result = await service.getActiveUsers(); + + expect(result).toBe(45); + expect(mockQueryBuilder.select).toHaveBeenCalledWith('SUM(m.count)', 'total'); + expect(mockQueryBuilder.where).toHaveBeenCalledWith('m.metricType = :type', { + type: MetricType.SESSIONS, + }); + expect(mockQueryBuilder.andWhere).toHaveBeenCalledWith('m.granularity = :gran', { + gran: TimeGranularity.MINUTE, + }); + expect(mockQueryBuilder.andWhere).toHaveBeenCalledWith('m.timestamp > :since', { + since: expect.any(Date), + }); + }); + + it('returns zero when no active sessions exist', async () => { + const mockQueryBuilder = createMockQueryBuilder(); + mockQueryBuilder.getRawOne.mockResolvedValue(null); + + mockRepository.createQueryBuilder.mockReturnValue(mockQueryBuilder); + + const result = await service.getActiveUsers(); + + expect(result).toBe(0); + }); + + it('returns zero when total is null in result', async () => { + const mockQueryBuilder = createMockQueryBuilder(); + mockQueryBuilder.getRawOne.mockResolvedValue({ total: null }); + + mockRepository.createQueryBuilder.mockReturnValue(mockQueryBuilder); + + const result = await service.getActiveUsers(); + + expect(result).toBe(0); + }); + + it('converts string total to number correctly', async () => { + const mockQueryBuilder = createMockQueryBuilder(); + mockQueryBuilder.getRawOne.mockResolvedValue({ total: '123' }); + + mockRepository.createQueryBuilder.mockReturnValue(mockQueryBuilder); + + const result = await service.getActiveUsers(); + + expect(result).toBe(123); + expect(typeof result).toBe('number'); + }); + }); + + describe('getCurrentPageViews', () => { + it('returns total page views from last 5 minutes', async () => { + const mockQueryBuilder = createMockQueryBuilder(); + mockQueryBuilder.getRawOne.mockResolvedValue({ total: '832' }); + + mockRepository.createQueryBuilder.mockReturnValue(mockQueryBuilder); + + const result = await service.getCurrentPageViews(); + + expect(result).toBe(832); + expect(mockQueryBuilder.select).toHaveBeenCalledWith('SUM(m.count)', 'total'); + expect(mockQueryBuilder.where).toHaveBeenCalledWith('m.metricType = :type', { + type: MetricType.PAGE_VIEWS, + }); + expect(mockQueryBuilder.andWhere).toHaveBeenCalledWith('m.granularity = :gran', { + gran: TimeGranularity.MINUTE, + }); + expect(mockQueryBuilder.andWhere).toHaveBeenCalledWith('m.timestamp > :since', { + since: expect.any(Date), + }); + }); + + it('returns zero when no page views exist', async () => { + const mockQueryBuilder = createMockQueryBuilder(); + mockQueryBuilder.getRawOne.mockResolvedValue(null); + + mockRepository.createQueryBuilder.mockReturnValue(mockQueryBuilder); + + const result = await service.getCurrentPageViews(); + + expect(result).toBe(0); + }); + + it('returns zero when total is undefined', async () => { + const mockQueryBuilder = createMockQueryBuilder(); + mockQueryBuilder.getRawOne.mockResolvedValue({ total: undefined }); + + mockRepository.createQueryBuilder.mockReturnValue(mockQueryBuilder); + + const result = await service.getCurrentPageViews(); + + expect(result).toBe(0); + }); + + it('handles large page view counts', async () => { + const mockQueryBuilder = createMockQueryBuilder(); + mockQueryBuilder.getRawOne.mockResolvedValue({ total: '9999999' }); + + mockRepository.createQueryBuilder.mockReturnValue(mockQueryBuilder); + + const result = await service.getCurrentPageViews(); + + expect(result).toBe(9999999); + }); + }); +}); diff --git a/services/realtime/src/metrics/metrics.service.ts b/services/realtime/src/metrics/metrics.service.ts index de09d5c..e631a3c 100644 --- a/services/realtime/src/metrics/metrics.service.ts +++ b/services/realtime/src/metrics/metrics.service.ts @@ -64,9 +64,18 @@ export class MetricsService { } async getActiveUsers(): Promise { - // Count unique sessions in the last 5 minutes - // This would query session/visitor data - return 0; + // Count unique sessions in the last 5 minutes from SESSIONS metric + const fiveMinutesAgo = new Date(Date.now() - 5 * 60 * 1000); + + const result = await this.metricsRepository + .createQueryBuilder('m') + .select('SUM(m.count)', 'total') + .where('m.metricType = :type', { type: MetricType.SESSIONS }) + .andWhere('m.granularity = :gran', { gran: TimeGranularity.MINUTE }) + .andWhere('m.timestamp > :since', { since: fiveMinutesAgo }) + .getRawOne(); + + return Number(result?.total ?? 0); } async getCurrentPageViews(): Promise { diff --git a/services/realtime/src/redis/README.md b/services/realtime/src/redis/README.md new file mode 100644 index 0000000..1964378 --- /dev/null +++ b/services/realtime/src/redis/README.md @@ -0,0 +1,113 @@ +# Redis Pub/Sub for Realtime Analytics + +## Overview + +Event-driven realtime updates using Redis pub/sub to replace database polling. + +## Architecture + +``` +Processor Service Realtime Service + ↓ ↓ +[AggregationService] [AnalyticsGateway] + ↓ ↓ +[RedisPublisherService] →→→ [RedisPubSubService] + ↓ ↓ +Redis Channel: analytics:realtime:updates + ↓ + [WebSocket Clients] +``` + +## Components + +### RedisPubSubService (`redis-pubsub.service.ts`) + +**Purpose**: Subscribe to Redis pub/sub channel and notify WebSocket gateway of updates. + +**Key Features**: +- Automatic reconnection with exponential backoff +- Multiple handler registration (observer pattern) +- Connection health monitoring +- Message parsing and validation + +**Configuration**: +- Redis host/port from environment variables +- Channel: `analytics:realtime:updates` +- Max reconnect attempts: 10 + +### AnalyticsGateway Updates + +**Changes**: +- Removed `setInterval` polling (was 5 seconds) +- Added Redis pub/sub subscription on gateway initialization +- Broadcasts to all connected clients when Redis message received +- Fallback polling (60 seconds) if Redis disconnects +- Client subscriptions stored without timers (push-based) + +**Flow**: +1. Client connects and subscribes to metrics +2. Gateway stores subscription (no polling timer) +3. On Redis message, gateway fetches fresh data and broadcasts +4. If Redis is down, fallback polls every 60 seconds + +## Message Format + +```typescript +interface MetricsUpdateMessage { + type: 'metrics_updated'; + timestamp: Date; + metrics: { + metricType: string; + granularity: string; + value: number; + dimension?: string; + dimensionValue?: string; + }[]; +} +``` + +## Configuration + +### Environment Variables + +```bash +REDIS_HOST=localhost # Redis server host +REDIS_PORT=6379 # Redis server port +``` + +### Channel Name + +``` +analytics:realtime:updates +``` + +## Benefits + +1. **No Polling Load**: Database queries only on actual updates +2. **Lower Latency**: Near-instant updates (<100ms) +3. **Horizontal Scaling**: Multiple realtime instances share Redis +4. **Resilience**: Fallback polling if Redis fails +5. **Efficient**: Batches metrics every 5 seconds before publishing + +## Monitoring + +### Health Checks + +```typescript +redisPubSub.isHealthy() // Check if connected +redisPubSub.getStatus() // Get detailed status +``` + +### Logs + +- Connection/reconnection events +- Message receive/parse errors +- Handler execution errors +- Fallback polling activation + +## Future Enhancements + +- [ ] Metric-specific channels (e.g., `analytics:pageviews`) +- [ ] Client filtering (only send subscribed metrics) +- [ ] Rate limiting on publish frequency +- [ ] Compression for large metric batches diff --git a/services/realtime/src/redis/redis-pubsub.module.ts b/services/realtime/src/redis/redis-pubsub.module.ts new file mode 100644 index 0000000..d94cc61 --- /dev/null +++ b/services/realtime/src/redis/redis-pubsub.module.ts @@ -0,0 +1,8 @@ +import { Module } from '@nestjs/common'; +import { RedisPubSubService } from './redis-pubsub.service'; + +@Module({ + providers: [RedisPubSubService], + exports: [RedisPubSubService], +}) +export class RedisPubSubModule {} diff --git a/services/realtime/src/redis/redis-pubsub.service.ts b/services/realtime/src/redis/redis-pubsub.service.ts new file mode 100644 index 0000000..2c694bc --- /dev/null +++ b/services/realtime/src/redis/redis-pubsub.service.ts @@ -0,0 +1,180 @@ +import { Injectable, Logger, OnModuleInit, OnModuleDestroy } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import Redis from 'ioredis'; + +export interface MetricsUpdateMessage { + type: 'metrics_updated'; + timestamp: Date; + metrics: { + metricType: string; + granularity: string; + value: number; + dimension?: string; + dimensionValue?: string; + }[]; +} + +export type UpdateHandler = (message: MetricsUpdateMessage) => void; + +/** + * Redis pub/sub service for realtime analytics updates. + * Subscribes to aggregation events from the processor service. + */ +@Injectable() +export class RedisPubSubService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(RedisPubSubService.name); + private subscriber: Redis; + private readonly handlers: Set = new Set(); + private readonly CHANNEL = 'analytics:realtime:updates'; + private reconnectAttempts = 0; + private readonly MAX_RECONNECT_ATTEMPTS = 10; + private isConnected = false; + + constructor(private readonly config: ConfigService) {} + + async onModuleInit() { + await this.connect(); + } + + async onModuleDestroy() { + await this.disconnect(); + } + + /** + * Connect to Redis and subscribe to updates channel. + */ + private async connect(): Promise { + const host = this.config.get('REDIS_HOST', 'localhost'); + const port = this.config.get('REDIS_PORT', 6379); + + this.subscriber = new Redis({ + host, + port, + retryStrategy: (times) => { + if (times > this.MAX_RECONNECT_ATTEMPTS) { + this.logger.error('Max Redis reconnection attempts reached'); + return null; + } + const delay = Math.min(times * 50, 2000); + this.logger.warn(`Redis reconnecting in ${delay}ms (attempt ${times})`); + return delay; + }, + maxRetriesPerRequest: 3, + }); + + this.subscriber.on('connect', () => { + this.logger.log(`Connected to Redis at ${host}:${port}`); + this.isConnected = true; + this.reconnectAttempts = 0; + }); + + this.subscriber.on('error', (error) => { + this.logger.error(`Redis connection error: ${error.message}`); + this.isConnected = false; + }); + + this.subscriber.on('close', () => { + this.logger.warn('Redis connection closed'); + this.isConnected = false; + this.reconnectAttempts++; + }); + + this.subscriber.on('reconnecting', () => { + this.logger.log('Reconnecting to Redis...'); + }); + + // Subscribe to updates channel + this.subscriber.on('message', (channel, message) => { + if (channel === this.CHANNEL) { + this.handleMessage(message); + } + }); + + // Wait for connection + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('Redis connection timeout')); + }, 5000); + + this.subscriber.once('connect', () => { + clearTimeout(timeout); + resolve(); + }); + + this.subscriber.once('error', (error) => { + clearTimeout(timeout); + reject(error); + }); + }); + + // Subscribe to channel + await this.subscriber.subscribe(this.CHANNEL); + this.logger.log(`Subscribed to channel: ${this.CHANNEL}`); + } + + /** + * Disconnect from Redis. + */ + private async disconnect(): Promise { + if (this.subscriber) { + await this.subscriber.unsubscribe(this.CHANNEL); + await this.subscriber.quit(); + this.logger.log('Disconnected from Redis'); + } + } + + /** + * Handle incoming messages from Redis. + */ + private handleMessage(message: string): void { + try { + const parsed = JSON.parse(message); + + // Convert timestamp string back to Date + if (parsed.timestamp) { + parsed.timestamp = new Date(parsed.timestamp); + } + + // Notify all registered handlers + for (const handler of this.handlers) { + try { + handler(parsed); + } catch (error) { + this.logger.error(`Handler error: ${error}`); + } + } + } catch (error) { + this.logger.error(`Failed to parse message: ${error}`); + } + } + + /** + * Register a handler for metrics updates. + * Returns an unsubscribe function. + */ + onUpdate(handler: UpdateHandler): () => void { + this.handlers.add(handler); + return () => { + this.handlers.delete(handler); + }; + } + + /** + * Check if Redis connection is active. + */ + isHealthy(): boolean { + return this.isConnected && this.subscriber?.status === 'ready'; + } + + /** + * Get connection status for debugging. + */ + getStatus() { + return { + connected: this.isConnected, + status: this.subscriber?.status, + reconnectAttempts: this.reconnectAttempts, + handlers: this.handlers.size, + }; + } +}