Files
fabrikanabytok/apps/fabrikanabytok/lib/websocket/socket-server.ts
2025-11-28 20:47:00 +01:00

714 lines
21 KiB
TypeScript

import type { Server as SocketIOServer, Socket } from "socket.io"
import { logger } from "../utils/logger"
import { verifyToken } from "../utils/jwt"
import { logActivity } from "../db/analytics"
import { getConfig } from "../config"
import { getDb } from "../db/mongodb"
import {
trackActivity,
trackFeatureUsage,
checkSubscriptionLimit,
detectSuspiciousActivity
} from "../utils/activity-tracker"
import type { UserActivityEvent, FeatureUsageEvent } from "../types/socket.types"
// Socket.IO server instance shared by the emit helpers below; stays null until
// setupSocketIO() assigns it.
let io: SocketIOServer | null = null
// IP address -> clientIds currently registered from that address. Read by the
// handshake middleware to enforce the per-IP connection limit.
const connectionsByIP = new Map<string, Set<string>>()
// clientId -> the socket registered under that id by the connection handler.
const connectionsByClientId = new Map<string, Socket>()
// room -> clientIds that joined it through the "subscribe" event (also holds
// rooms pre-created through "admin:create_room"). Reported by "admin:stats".
const roomSubscriptions = new Map<string, Set<string>>()
/**
 * Attaches the handshake middleware and every real-time event handler to the
 * given Socket.IO server and stores the server in the module-level `io` used
 * by the emit helpers.
 *
 * Handshake middleware:
 *  - copies `auth.clientId` (falling back to the socket id) and the remote
 *    address onto `socket.data`;
 *  - rejects the connection when the address already has the configured
 *    maximum number of distinct clientIds registered;
 *  - verifies an optional `auth.token`; an invalid token is logged and the
 *    socket continues unauthenticated.
 *
 * Connection handler:
 *  - registers the socket in the bookkeeping maps and joins the role rooms,
 *    `public`, and the personal `user-<id>` room;
 *  - registers the tracking, subscription, AI-coach, workspace, design,
 *    webinar and (super admin only) administration events.
 *
 * Every listener is wrapped so that a malformed payload or a failing
 * dependency is logged instead of surfacing as an uncaught exception or an
 * unhandled promise rejection.
 *
 * NOTE(review): `clientId` and several `userId` payload fields are supplied by
 * the client. The `user-<id>`, `ai-coach-<id>` and `design:*` identities are
 * therefore only as trustworthy as the client unless a verified token is
 * present — confirm whether unauthenticated access to those rooms is intended.
 *
 * @param socketIO The Socket.IO server to configure.
 */
export function setupSocketIO(socketIO: SocketIOServer) {
  io = socketIO

  // Activity logging is best-effort: it must never block handler registration
  // and a failed write must not become an unhandled rejection.
  const recordActivity = (...args: Parameters<typeof logActivity>): void => {
    void Promise.resolve()
      .then(() => logActivity(...args))
      .catch((error: unknown) => {
        logger.error("Failed to record socket activity:", error as Record<string, any>)
      })
  }

  // Client-supplied timestamps are untrusted; fall back to "now" when the
  // value does not produce a valid date.
  const toEventDate = (timestamp: unknown): Date => {
    const parsed = new Date(timestamp as number)
    return Number.isNaN(parsed.getTime()) ? new Date() : parsed
  }

  socketIO.use(async (socket, next) => {
    try {
      const token = socket.handshake.auth.token
      const clientId = socket.handshake.auth.clientId || socket.id
      const ipAddress = socket.handshake.address
      socket.data.clientId = clientId
      socket.data.ipAddress = ipAddress

      const existingConnections = connectionsByIP.get(ipAddress) || new Set<string>()
      const config = await getConfig()
      const maxConnectionsPerIP = config.websocket?.maxConnectionsPerIP || 5
      // A clientId that is already registered for this address (a reconnect)
      // does not count against the limit.
      if (existingConnections.size >= maxConnectionsPerIP && !existingConnections.has(clientId)) {
        logger.warn(`Max connections reached for IP: ${ipAddress}`)
        return next(new Error("Maximum connections per IP reached"))
      }

      if (token) {
        try {
          const decoded = await verifyToken(token)
          socket.data.user = decoded
          socket.data.isAdmin = decoded.role === "admin" || decoded.role === "superadmin"
          socket.data.isSuperAdmin = decoded.role === "superadmin"
        } catch {
          // An invalid token downgrades the socket to anonymous rather than
          // rejecting the connection.
          logger.warn("Invalid token provided", { clientId })
        }
      }
      next()
    } catch (error) {
      logger.error("Socket authentication error:", error as Record<string, any>)
      next(new Error("Authentication failed"))
    }
  })

  socketIO.on("connection", (socket: Socket) => {
    const clientId = socket.data.clientId
    const ipAddress = socket.data.ipAddress
    const isAdmin = socket.data.isAdmin || false
    const isSuperAdmin = socket.data.isSuperAdmin || false

    const reportHandlerError = (event: string, error: unknown): void => {
      logger.error(`Socket handler "${event}" failed for client ${clientId}:`, error as Record<string, any>)
    }

    // Wraps a listener so synchronous throws (e.g. destructuring a missing
    // payload) and rejected promises are logged instead of escaping.
    const guard =
      <TArgs extends any[]>(event: string, handler: (...args: TArgs) => unknown) =>
      (...args: any[]): void => {
        try {
          void Promise.resolve(handler(...(args as TArgs))).catch((error: unknown) => {
            reportHandlerError(event, error)
          })
        } catch (error) {
          reportHandlerError(event, error)
        }
      }

    logger.info(`Client connected: ${clientId} from ${ipAddress} (Admin: ${isAdmin}, SuperAdmin: ${isSuperAdmin})`)

    let ipClients = connectionsByIP.get(ipAddress)
    if (!ipClients) {
      ipClients = new Set<string>()
      connectionsByIP.set(ipAddress, ipClients)
    }
    ipClients.add(clientId)
    connectionsByClientId.set(clientId, socket)

    if (isSuperAdmin) {
      socket.join("superadmin")
    }
    if (isAdmin) {
      socket.join("admin")
    }
    socket.join("public")

    // Join user's personal notification channel for gamification and other user-specific events
    const userId = socket.data.user?.id || socket.data.clientId
    socket.join(`user-${userId}`)
    logger.info(`User ${userId} joined personal notification channel`)

    // Not awaited: listeners below must be attached in the same tick as the
    // connection event, otherwise early client events would be dropped.
    recordActivity("websocket_connection", "Client connected", {
      type: "websocket_connection",
      metadata: { clientId, ipAddress, isAdmin, isSuperAdmin },
    })

    socket.on(
      "ping",
      guard("ping", (callback?: (payload: unknown) => void) => {
        if (typeof callback === "function") {
          callback({
            pong: true,
            timestamp: Date.now(),
            transport: socket.conn.transport.name, // WebSocket or polling
          })
        }
      }),
    )

    // ===== ACTIVITY TRACKING EVENTS =====
    socket.on(
      "track:activity",
      guard(
        "track:activity",
        async (data: { eventType: string; eventData: any; timestamp: number; page?: string }) => {
          const userId = socket.data.user?.id || socket.data.clientId
          const activityEvent: UserActivityEvent = {
            userId,
            sessionId: socket.id,
            eventType: data.eventType as any,
            eventData: data.eventData,
            timestamp: toEventDate(data.timestamp),
            page: data.page,
            userAgent: socket.handshake.headers["user-agent"],
            ipAddress: socket.data.ipAddress,
          }
          await trackActivity(activityEvent)

          // Check for suspicious patterns
          const suspiciousCheck = await detectSuspiciousActivity(userId, socket.id)
          if (suspiciousCheck.suspicious) {
            socket.emit("security:warning", {
              reasons: suspiciousCheck.reasons,
              severity: "medium",
            })
            // Notify admins
            socketIO.to("admin").emit("security:alert", {
              userId,
              reasons: suspiciousCheck.reasons,
              timestamp: Date.now(),
            })
          }
        },
      ),
    )

    socket.on(
      "track:feature",
      guard(
        "track:feature",
        async (data: { feature: string; action: string; metadata?: any; timestamp: number }) => {
          const userId = socket.data.user?.id || socket.data.clientId

          // Check subscription limits BEFORE allowing action
          const limitCheck = await checkSubscriptionLimit(userId, data.feature)
          if (!limitCheck.allowed) {
            socket.emit("subscription:limit_reached", {
              feature: data.feature,
              currentUsage: limitCheck.currentUsage,
              limit: limitCheck.limit,
              message: `You've reached your ${data.feature} limit. Upgrade to continue.`,
            })
            return
          }

          const featureEvent: FeatureUsageEvent = {
            userId,
            feature: data.feature,
            action: data.action,
            metadata: data.metadata,
            subscriptionTier: socket.data.user?.subscriptionTier || "free",
            timestamp: toEventDate(data.timestamp),
          }
          await trackFeatureUsage(featureEvent)

          // Warn user if approaching limit (80%)
          if (limitCheck.percentage >= 80 && limitCheck.percentage < 100) {
            socket.emit("subscription:usage_warning", {
              feature: data.feature,
              currentUsage: limitCheck.currentUsage,
              limit: limitCheck.limit,
              percentage: limitCheck.percentage,
            })
          }
        },
      ),
    )
    // ===== END ACTIVITY TRACKING EVENTS =====

    socket.on(
      "subscribe",
      guard("subscribe", (channel: string) => {
        const config = getConfigSync()
        const allowedChannels = config.websocket?.allowedChannels || ["notifications", "updates"]
        // The role rooms carry privileged traffic (e.g. "security:alert"), so
        // being listed in allowedChannels is not enough to join them.
        const lacksRole = (channel === "admin" && !isAdmin) || (channel === "superadmin" && !isSuperAdmin)
        if (!allowedChannels.includes(channel) || lacksRole) {
          socket.emit("error", { message: "Channel not allowed" })
          return
        }

        socket.join(channel)
        let subscribers = roomSubscriptions.get(channel)
        if (!subscribers) {
          subscribers = new Set<string>()
          roomSubscriptions.set(channel, subscribers)
        }
        subscribers.add(clientId)
        logger.info(`Client ${clientId} subscribed to ${channel}`)
        socket.emit("subscribed", { channel })
      }),
    )

    socket.on(
      "unsubscribe",
      guard("unsubscribe", (channel: string) => {
        socket.leave(channel)
        roomSubscriptions.get(channel)?.delete(clientId)
        logger.info(`Client ${clientId} unsubscribed from ${channel}`)
        socket.emit("unsubscribed", { channel })
      }),
    )

    // NOTE(review): the room name is derived from the client-supplied userId,
    // so any client can join another user's AI coach room — confirm whether
    // this should be restricted to the authenticated identity.
    socket.on(
      "ai:coach:start",
      guard("ai:coach:start", (data: { userId: string }) => {
        logger.info(`AI Coach session started for user: ${data.userId}`)
        socket.join(`ai-coach-${data.userId}`)
      }),
    )

    socket.on(
      "ai:coach:stop",
      guard("ai:coach:stop", (data: { userId: string }) => {
        logger.info(`AI Coach session stopped for user: ${data.userId}`)
        socket.leave(`ai-coach-${data.userId}`)
      }),
    )

    socket.on(
      "workspace:join",
      guard("workspace:join", async (data: { workspaceId: string }) => {
        const { workspaceId } = data
        socket.join(`workspace-${workspaceId}`)
        const userName = socket.data.user?.name || "Anonymous"
        const userId = socket.data.user?.id || socket.data.clientId

        // Notify others in the workspace
        socket.to(`workspace-${workspaceId}`).emit("workspace:user-joined", {
          userId,
          userName,
          timestamp: Date.now(),
        })

        // Send current users list to the joining user
        const socketsInRoom = await socketIO.in(`workspace-${workspaceId}`).fetchSockets()
        const users = socketsInRoom.map((s) => ({
          userId: s.data.user?.id || s.data.clientId,
          userName: s.data.user?.name || "Anonymous",
          socketId: s.id,
        }))
        socket.emit("workspace:users", { users })
        logger.info(`User ${userName} joined workspace ${workspaceId}`)
      }),
    )

    socket.on(
      "workspace:leave",
      guard("workspace:leave", (data: { workspaceId: string }) => {
        const { workspaceId } = data
        const userName = socket.data.user?.name || "Anonymous"
        const userId = socket.data.user?.id || socket.data.clientId
        socket.leave(`workspace-${workspaceId}`)
        socket.to(`workspace-${workspaceId}`).emit("workspace:user-left", {
          userId,
          userName,
          timestamp: Date.now(),
        })
        logger.info(`User ${userName} left workspace ${workspaceId}`)
      }),
    )

    socket.on(
      "workspace:send-message",
      guard("workspace:send-message", async (data: { workspaceId: string; message: string }) => {
        const { workspaceId, message } = data
        const userName = socket.data.user?.name || "Anonymous"
        const userId = socket.data.user?.id || socket.data.clientId
        const messageData = {
          userId,
          userName,
          message,
          createdAt: new Date().toISOString(),
        }

        // Broadcast to all users in the workspace including sender
        socketIO.to(`workspace-${workspaceId}`).emit("workspace:message", messageData)

        // Save message to database; a persistence failure does not undo the
        // broadcast, it is only logged.
        try {
          const db = await getDb()
          await db?.collection("workspace_messages")?.insertOne({
            workspaceId,
            ...messageData,
          })
        } catch (error) {
          logger.error("Failed to save workspace message:", error as Record<string, any>)
        }
      }),
    )

    socket.on(
      "workspace:document-update",
      guard(
        "workspace:document-update",
        (data: { workspaceId: string; documentId: string; content: any; cursorPosition?: any }) => {
          const { workspaceId, documentId, content, cursorPosition } = data
          const userName = socket.data.user?.name || "Anonymous"
          const userId = socket.data.user?.id || socket.data.clientId

          // Broadcast document update to others (not sender)
          socket.to(`workspace-${workspaceId}`).emit("workspace:document-updated", {
            documentId,
            content,
            userId,
            userName,
            cursorPosition,
            timestamp: Date.now(),
          })
        },
      ),
    )

    socket.on(
      "workspace:cursor-move",
      guard("workspace:cursor-move", (data: { workspaceId: string; documentId: string; position: any }) => {
        const { workspaceId, documentId, position } = data
        const userName = socket.data.user?.name || "Anonymous"
        const userId = socket.data.user?.id || socket.data.clientId

        // Broadcast cursor position to others
        socket.to(`workspace-${workspaceId}`).emit("workspace:cursor-moved", {
          documentId,
          userId,
          userName,
          position,
          timestamp: Date.now(),
        })
      }),
    )

    socket.on(
      "workspace:whiteboard-update",
      guard("workspace:whiteboard-update", (data: { workspaceId: string; shapes: any }) => {
        const { workspaceId, shapes } = data

        // Broadcast whiteboard update to others
        socket.to(`workspace-${workspaceId}`).emit("workspace:whiteboard-updated", {
          shapes,
          timestamp: Date.now(),
        })
      }),
    )

    socket.on(
      "workspace:typing-start",
      guard("workspace:typing-start", (data: { workspaceId: string }) => {
        const { workspaceId } = data
        const userName = socket.data.user?.name || "Anonymous"
        const userId = socket.data.user?.id || socket.data.clientId
        socket.to(`workspace-${workspaceId}`).emit("workspace:user-typing", {
          userId,
          userName,
          isTyping: true,
        })
      }),
    )

    socket.on(
      "workspace:typing-stop",
      guard("workspace:typing-stop", (data: { workspaceId: string }) => {
        const { workspaceId } = data
        const userName = socket.data.user?.name || "Anonymous"
        const userId = socket.data.user?.id || socket.data.clientId
        socket.to(`workspace-${workspaceId}`).emit("workspace:user-typing", {
          userId,
          userName,
          isTyping: false,
        })
      }),
    )

    // ===== PLANNER COLLABORATION EVENTS =====
    // NOTE(review): the design:* events relay the userId/userName sent in the
    // payload rather than the identity on socket.data — confirm whether
    // clients should be able to choose these values.
    socket.on(
      "design:join",
      guard("design:join", async (data: { designId: string; userId: string; userName: string }) => {
        const { designId, userId, userName } = data
        const room = `design-${designId}`
        socket.join(room)

        // Notify others in the design
        socket.to(room).emit("design:user-joined", {
          userId,
          userName,
          timestamp: Date.now(),
        })

        // Send current users list (everyone in the room except the joiner)
        const socketsInRoom = await socketIO.in(room).fetchSockets()
        const users = socketsInRoom
          .filter((s) => s.id !== socket.id)
          .map((s) => ({
            userId: s.data.user?.id || s.data.clientId,
            userName: s.data.user?.name || "Anonymous",
            socketId: s.id,
          }))
        socket.emit("design:users", { users })
        logger.info(`User ${userName} joined design ${designId}`)
      }),
    )

    socket.on(
      "design:leave",
      guard("design:leave", (data: { designId: string; userId: string }) => {
        const { designId, userId } = data
        const room = `design-${designId}`
        const userName = socket.data.user?.name || "Anonymous"
        socket.leave(room)
        socket.to(room).emit("design:user-left", {
          userId,
          userName,
          timestamp: Date.now(),
        })
        logger.info(`User ${userName} left design ${designId}`)
      }),
    )

    socket.on(
      "design:cursor-move",
      guard(
        "design:cursor-move",
        (data: {
          designId: string
          position: [number, number, number]
          userId: string
          userName: string
          timestamp: number
        }) => {
          const { designId, position, userId, userName, timestamp } = data
          const room = `design-${designId}`

          // Get user color (assign based on userId hash)
          const color = getUserColor(userId)

          // Broadcast to others
          socket.to(room).emit("design:cursor-moved", {
            userId,
            userName,
            position,
            color,
            timestamp,
          })
        },
      ),
    )

    socket.on(
      "design:object-update",
      guard(
        "design:object-update",
        (data: { designId: string; objectId: string; updates: any; userId: string; userName: string }) => {
          const { designId, objectId, updates, userId, userName } = data
          const room = `design-${designId}`

          // Broadcast to others
          socket.to(room).emit("design:object-updated", {
            objectId,
            updates,
            userId,
            userName,
            timestamp: Date.now(),
          })
          logger.info(`User ${userName} updated object ${objectId} in design ${designId}`)
        },
      ),
    )

    socket.on(
      "design:object-add",
      guard("design:object-add", (data: { designId: string; object: any; userId: string }) => {
        const { designId, object, userId } = data
        const room = `design-${designId}`
        socket.to(room).emit("design:object-added", {
          object,
          userId,
          timestamp: Date.now(),
        })
      }),
    )

    socket.on(
      "design:object-remove",
      guard("design:object-remove", (data: { designId: string; objectId: string; userId: string }) => {
        const { designId, objectId, userId } = data
        const room = `design-${designId}`
        socket.to(room).emit("design:object-removed", {
          objectId,
          userId,
          timestamp: Date.now(),
        })
      }),
    )

    socket.on(
      "design:selection-change",
      guard(
        "design:selection-change",
        (data: { designId: string; objectId: string | null; userId: string; userName: string }) => {
          const { designId, objectId, userId, userName } = data
          const room = `design-${designId}`
          const color = getUserColor(userId)
          socket.to(room).emit("design:selection-changed", {
            userId,
            userName,
            objectId,
            color,
            timestamp: Date.now(),
          })
        },
      ),
    )
    // ===== END PLANNER COLLABORATION EVENTS =====

    socket.on(
      "webinar:join",
      guard("webinar:join", (data: { webinarId: string; userId: string }) => {
        const room = `webinar:${data.webinarId}`
        socket.join(room)

        // Broadcast to room (including the joiner). The display name comes
        // from the verified token when there is one; socket.data.userName is
        // kept as a fallback for callers that set it elsewhere.
        socketIO.to(room).emit("webinar:participant-joined", {
          userId: data.userId,
          userName: socket.data.user?.name || socket.data.userName || "Anonymous",
          role: socket.data.role || "attendee",
        })
        logger.info(`[v0] User ${data.userId} joined webinar ${data.webinarId}`)
      }),
    )

    socket.on(
      "webinar:leave",
      guard("webinar:leave", (data: { webinarId: string; userId: string }) => {
        const room = `webinar:${data.webinarId}`
        socket.leave(room)
        socketIO.to(room).emit("webinar:participant-left", {
          userId: data.userId,
        })
        logger.info(`[v0] User ${data.userId} left webinar ${data.webinarId}`)
      }),
    )

    // Administration events exist only on sockets whose verified token carries
    // the superadmin role.
    if (isSuperAdmin) {
      socket.on(
        "admin:broadcast",
        guard("admin:broadcast", (data: { channel: string; event: string; message: any }) => {
          socketIO.to(data.channel).emit(data.event, data.message)
          logger.info(`SuperAdmin broadcast to ${data.channel}: ${data.event}`)
          recordActivity("admin_broadcast", "SuperAdmin broadcast message", {
            type: "admin_broadcast",
            metadata: { channel: data.channel, event: data.event, clientId },
          })
        }),
      )

      socket.on(
        "admin:stats",
        guard("admin:stats", (callback?: (payload: unknown) => void) => {
          const stats = {
            totalConnections: socketIO.engine.clientsCount,
            connectionsByIP: Array.from(connectionsByIP.entries()).map(([ip, clients]) => ({
              ip,
              count: clients.size,
              clients: Array.from(clients),
            })),
            rooms: Array.from(roomSubscriptions.entries()).map(([room, clients]) => ({
              room,
              count: clients.size,
              clients: Array.from(clients),
            })),
            timestamp: Date.now(),
          }
          if (typeof callback === "function") {
            callback(stats)
          }
        }),
      )

      socket.on(
        "admin:disconnect_client",
        guard("admin:disconnect_client", (targetClientId: string) => {
          const targetSocket = connectionsByClientId.get(targetClientId)
          if (targetSocket) {
            targetSocket.disconnect(true)
            logger.info(`SuperAdmin disconnected client: ${targetClientId}`)
            recordActivity("admin_disconnect", "SuperAdmin disconnected client", {
              type: "admin_disconnect",
              metadata: { targetClientId, adminClientId: clientId },
            })
          }
        }),
      )

      socket.on(
        "admin:create_room",
        guard("admin:create_room", (roomName: string) => {
          if (!roomSubscriptions.has(roomName)) {
            roomSubscriptions.set(roomName, new Set<string>())
            logger.info(`SuperAdmin created room: ${roomName}`)
            socket.emit("room_created", { room: roomName })
          }
        }),
      )

      socket.on(
        "admin:emit_to_client",
        guard("admin:emit_to_client", (data: { clientId: string; event: string; message: any }) => {
          const targetSocket = connectionsByClientId.get(data.clientId)
          if (targetSocket) {
            targetSocket.emit(data.event, data.message)
            logger.info(`SuperAdmin emitted to client ${data.clientId}: ${data.event}`)
          }
        }),
      )
    }

    socket.on(
      "disconnect",
      guard("disconnect", (reason: string) => {
        logger.info(`Client disconnected: ${clientId} (${reason})`)

        // A client that reconnected under the same clientId may already have
        // replaced this socket in the registry; its entries must survive the
        // late disconnect of the old socket.
        const registered = connectionsByClientId.get(clientId)
        const superseded = registered !== undefined && registered !== socket

        if (!superseded) {
          connectionsByClientId.delete(clientId)
          // Clean up room subscriptions
          roomSubscriptions.forEach((clients) => {
            clients.delete(clientId)
          })
        }

        // Release the per-IP slot unless the replacement socket still holds it
        // from the same address.
        if (!superseded || registered.data.ipAddress !== ipAddress) {
          const ipConnections = connectionsByIP.get(ipAddress)
          if (ipConnections) {
            ipConnections.delete(clientId)
            if (ipConnections.size === 0) {
              connectionsByIP.delete(ipAddress)
            }
          }
        }

        recordActivity("websocket_disconnection", "Client disconnected", {
          type: "websocket_disconnection",
          metadata: { clientId, ipAddress, reason },
        })
      }),
    )
  })

  logger.info("Socket.IO server setup complete")
}
/**
 * Returns the Socket.IO server stored by setupSocketIO().
 *
 * @throws Error when no server has been stored yet.
 */
export function getSocketIO(): SocketIOServer {
  if (io) {
    return io
  }
  throw new Error("Socket.IO server not initialized")
}
/** Emits `event` with `data` to every socket in room `channel`; does nothing before the server is set up. */
export function emitToChannel(channel: string, event: string, data: any) {
  io?.to(channel).emit(event, data)
}
/** Emits `event` with `data` to the "admin" room; does nothing before the server is set up. */
export function emitToAdmin(event: string, data: any) {
  io?.to("admin").emit(event, data)
}
/** Emits `event` with `data` to the "superadmin" room; does nothing before the server is set up. */
export function emitToSuperAdmin(event: string, data: any) {
  io?.to("superadmin").emit(event, data)
}
/** Emits `event` with `data` to every connected socket; does nothing before the server is set up. */
export function emitToAll(event: string, data: any) {
  io?.emit(event, data)
}
/** Emits `event` with `data` to the socket registered under `clientId`, if there is one. */
export function emitToClient(clientId: string, event: string, data: any) {
  const target = connectionsByClientId.get(clientId)
  if (!target) {
    return
  }
  target.emit(event, data)
}
/**
 * Emits `event` with `data` to the personal room `user-<userId>` and logs the
 * emission. Preferred over emitToClient for authenticated users. Does nothing
 * before the server is set up.
 */
export function emitToUser(userId: string, event: string, data: any) {
  if (!io) {
    return
  }
  const personalRoom = `user-${userId}`
  io.to(personalRoom).emit(event, data)
  logger.info(`Emitted ${event} to user ${userId}`)
}
/** Emits `event` with `data` to the room `ai-coach-<userId>`; does nothing before the server is set up. */
export function emitToAICoach(userId: string, event: string, data: any) {
  io?.to(`ai-coach-${userId}`).emit(event, data)
}
/** Emits `event` with `data` to the room `workspace-<workspaceId>`; does nothing before the server is set up. */
export function emitToWorkspace(workspaceId: string, event: string, data: any) {
  io?.to(`workspace-${workspaceId}`).emit(event, data)
}
// Last configuration handed to updateCachedConfigSync(); null until then.
let cachedConfigSync: any = null

/**
 * Synchronous configuration lookup for code paths that cannot await.
 * Returns the cached configuration when one is set, otherwise a freshly built
 * default object.
 */
function getConfigSync() {
  if (cachedConfigSync) {
    return cachedConfigSync
  }
  return {
    websocket: {
      maxConnectionsPerIP: 5,
      allowedChannels: ["notifications", "updates", "chat", "admin", "workspace", "webinar"],
    },
  }
}

/** Replaces the configuration returned by getConfigSync(); a falsy value restores the defaults. */
export function updateCachedConfigSync(config: any) {
  cachedConfigSync = config
}
/**
 * Maps a user id to one of eight fixed palette colours so the same id always
 * gets the same colour.
 *
 * The id arrives from client payloads, so it may be missing or not a string at
 * runtime: `null`/`undefined` are treated as the empty string and any other
 * value is converted with `String()` instead of throwing.
 *
 * @param userId Identifier to hash.
 * @returns A hex colour string from the palette.
 */
function getUserColor(userId: string): string {
  const colors = [
    "#3b82f6", // blue
    "#22c55e", // green
    "#f59e0b", // amber
    "#ef4444", // red
    "#8b5cf6", // purple
    "#ec4899", // pink
    "#06b6d4", // cyan
    "#f97316", // orange
  ]
  const key = typeof userId === "string" ? userId : String(userId ?? "")

  // Simple string hash (hash * 31 + char code, with the shift done in 32-bit
  // arithmetic). The expression is kept exactly as before so existing ids keep
  // their colour.
  let hash = 0
  for (let i = 0; i < key.length; i++) {
    hash = key.charCodeAt(i) + ((hash << 5) - hash)
  }
  return colors[Math.abs(hash) % colors.length]
}