Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 36 additions & 107 deletions apps/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,13 @@ import { createClient } from 'redis';
import dotenv from 'dotenv';
import { eq } from 'drizzle-orm';
import { db } from './db/index.js';
import { conversationMembers, users } from './db/schema.js';
import { conversationMembers } from './db/schema.js';
import { socketAuthMiddleware, type AuthSocket } from './middleware/socketAuth.js';
import { registerMessagingHandlers } from './socket/messaging.js';
import { app } from './app.js';
import { redis as appRedis } from './lib/redis.js';
import { setSocketServer } from './lib/socket.js';
import { setOnline, setOffline } from './services/presence.js';
import { startHeartbeatTimer, clearHeartbeatTimer } from './services/heartbeat.js';
import {
registerDeviceSocket,
unregisterDeviceSocket,
isDeviceRevoked,
startDeviceRevocationListener,
} from './services/deviceRevocation.js';
import {
checkRateLimit,
checkPayloadSize,
recordViolation,
clearViolations,
} from './services/rateLimit.js';
import { registerForBackpressure, unregisterForBackpressure } from './services/backpressure.js';
import { setOnline, setOffline, refreshPresence, isOnline } from './services/presence.js';
import {
buildRpcFetcher,
buildTreasuryRpcFetcher,
Expand All @@ -50,57 +36,8 @@ io.use(socketAuthMiddleware);

io.on('connection', async (socket: AuthSocket) => {
const userId = socket.auth!.userId;
const deviceId = socket.auth!.deviceId;
console.log('User connected:', userId, socket.id);

// Register socket for device-revocation tracking (cross-instance via Redis pub/sub).
if (appRedis) {
registerDeviceSocket(deviceId, socket.id);
}

// Start the server-side heartbeat watchdog (90 s timeout).
startHeartbeatTimer(socket, userId, deviceId, appRedis, io);

// Per-socket middleware: intercept every incoming event before handlers.
const EXCLUDED_EVENTS = new Set(['heartbeat']);
socket.use(async ([event, ...args], next) => {
// Skip internal heartbeat pings.
if (EXCLUDED_EVENTS.has(event)) {
return next();
}

// Reject events from a device that was revoked mid-session.
if (isDeviceRevoked(deviceId)) {
socket.emit('error', { event: 'device_revoked', message: 'Device has been revoked' });
socket.disconnect(true);
return;
}

// Enforce maximum payload size (configurable via MAX_PAYLOAD_SIZE env).
const payloadArgs = args.filter((a) => typeof a !== 'function');
const { valid, size } = checkPayloadSize(payloadArgs);
if (!valid) {
socket.emit('error', {
event: 'payload_too_large',
message: `Payload size ${size} exceeds limit`,
});
return;
}

// Per-socket rate limiting (configurable via SOCKET_RATE_LIMIT_PER_SEC env).
const { allowed } = await checkRateLimit(appRedis, socket.id);
if (!allowed) {
const violations = recordViolation(socket.id);
socket.emit('error', { event: 'rate_limited', message: 'Rate limit exceeded' });
if (violations >= 3) {
socket.disconnect(true);
}
return;
}

next();
});

// Auto-join all conversation rooms so the socket receives new_message events
// for every conversation the user belongs to (needed for unread badge tracking).
const memberships = await db.query.conversationMembers.findMany({
Expand All @@ -111,53 +48,52 @@ io.on('connection', async (socket: AuthSocket) => {
await socket.join(m.conversationId);
}

const user = await db.query.users.findFirst({
where: eq(users.id, userId),
columns: { presenceVisible: true },
});
const presenceVisible = user?.presenceVisible ?? true;

if (appRedis) {
await setOnline(appRedis, userId, socket.id);
if (presenceVisible) {
const shouldBroadcast = await setOnline(appRedis, userId, socket.id);
if (shouldBroadcast) {
for (const m of memberships) {
io.to(m.conversationId).emit('user_online', { userId });
io.to(m.conversationId).emit('presence_update', { userId, online: true });
io.to(m.conversationId).emit('presence_update', {
userId,
online: true,
status: 'online',
lastSeen: Date.now(),
});
}
}
}

registerMessagingHandlers(io, socket);
socket.on('heartbeat', async () => {
if (appRedis) {
await refreshPresence(appRedis, userId);
}
});

// Monitor send-buffer to detect slow/stalled consumers.
registerForBackpressure(socket);
registerMessagingHandlers(io, socket);

socket.on('disconnect', async () => {
console.log('User disconnected:', userId);
clearHeartbeatTimer(socket.id);
unregisterDeviceSocket(socket.id);
unregisterForBackpressure(socket);
clearViolations(socket.id);

if (appRedis) {
const fullyOffline = await setOffline(appRedis, userId, socket.id);
if (fullyOffline) {
const user = await db.query.users.findFirst({
where: eq(users.id, userId),
columns: { presenceVisible: true },
});
const presenceVisible = user?.presenceVisible ?? true;

if (presenceVisible) {
const memberships = await db.query.conversationMembers.findMany({
where: eq(conversationMembers.userId, userId),
columns: { conversationId: true },
});
for (const m of memberships) {
io.to(m.conversationId).emit('user_offline', { userId });
io.to(m.conversationId).emit('presence_update', { userId, online: false });
const startDebounce = await setOffline(appRedis, userId, socket.id);
if (startDebounce) {
setTimeout(async () => {
const currentlyOnline = await isOnline(appRedis, userId);
if (!currentlyOnline) {
const memberships = await db.query.conversationMembers.findMany({
where: eq(conversationMembers.userId, userId),
columns: { conversationId: true },
});
for (const m of memberships) {
io.to(m.conversationId).emit('user_offline', { userId });
io.to(m.conversationId).emit('presence_update', {
userId,
online: false,
status: 'offline',
lastSeen: Date.now(),
});
}
}
}
}, 3000);
}
}
});
Expand Down Expand Up @@ -204,13 +140,6 @@ httpServer.listen(PORT, () => {
// Redis is unreachable; on failure we fall back to the in-process adapter.
void attachRedisAdapter();

// Subscribe to device_revoked:* channels so any gateway instance can
// disconnect a revoked device's sockets within seconds, even when the
// revocation was issued on a different node.
if (appRedis) {
void startDeviceRevocationListener(appRedis, appRedis);
}

// #46 — Stellar transfer event listener. Only spin up when the contract
// id is configured so local-dev and unit-test runs don't try to talk to
// Soroban RPC. The listener never throws out of runForever, so a failed
Expand All @@ -236,4 +165,4 @@ if (stellarRpcUrl && tokenTransferContractId) {
console.log(
'[stellar-listener] STELLAR_RPC_URL or TOKEN_TRANSFER_CONTRACT_ID unset; listener disabled.',
);
}
}
25 changes: 16 additions & 9 deletions apps/backend/src/services/presence.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,22 @@ function presenceKey(userId: string): string {
return `presence:${userId}`;
}

/**
* Register a socket connection for a user. Adds the socketId to the
* user's presence set and sets/refreshes the TTL.
*/
export async function setOnline(redis: Redis, userId: string, socketId: string): Promise<void> {
export async function setOnline(redis: Redis, userId: string, socketId: string): Promise<boolean> {
const key = presenceKey(userId);
const debounceKey = `presence_debounce:${userId}`;

const count = await redis.scard(key);
await redis.sadd(key, socketId);
await redis.expire(key, PRESENCE_TTL);

if (count === 0) {
const debouncing = await redis.del(debounceKey);
if (debouncing === 1) {
return false; // Flap detected, don't broadcast online
}
return true; // First socket connected
}
return false;
}

/**
Expand All @@ -39,16 +47,15 @@ export async function refreshPresence(redis: Redis, userId: string): Promise<voi
}
}

/**
* Remove a socket connection from the user's presence set.
* Returns true if the user has gone fully offline (no remaining sockets).
*/
export async function setOffline(redis: Redis, userId: string, socketId: string): Promise<boolean> {
const key = presenceKey(userId);
const debounceKey = `presence_debounce:${userId}`;

await redis.srem(key, socketId);
const remaining = await redis.scard(key);
if (remaining === 0) {
await redis.del(key);
await redis.set(debounceKey, '1', 'EX', 3);
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,9 @@ export function ConversationListSidebar() {
handleOffline(data.userId);
}

function onPresenceUpdate(data: { userId: string; online: boolean }) {
if (data.online) {
function onPresenceUpdate(data: { userId: string; online?: boolean; status?: 'online' | 'offline'; lastSeen?: number }) {
const isOnline = data.status ? data.status === 'online' : !!data.online;
if (isOnline) {
handleOnline(data.userId);
} else {
handleOffline(data.userId);
Expand Down
Loading