diff --git a/apps/backend/src/__tests__/resume.socket.test.ts b/apps/backend/src/__tests__/resume.socket.test.ts new file mode 100644 index 0000000..8786b0f --- /dev/null +++ b/apps/backend/src/__tests__/resume.socket.test.ts @@ -0,0 +1,143 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { EventEmitter } from 'events'; + +// ── Mocks ──────────────────────────────────────────────────────────────────── + +vi.mock('../db/index.js', () => ({ + db: { + query: { + conversationMembers: { findFirst: vi.fn(), findMany: vi.fn() }, + messages: { findFirst: vi.fn() }, + userDevices: { findMany: vi.fn() }, + }, + insert: vi.fn(), + update: vi.fn(), + delete: vi.fn(), + }, +})); + +vi.mock('../db/schema.js', () => ({ + conversations: {}, + conversationMembers: {}, + messages: {}, + messageEnvelopes: {}, + userDevices: {}, +})); + +vi.mock('../lib/conversationCache.js', () => ({ + invalidateConversationCaches: vi.fn().mockResolvedValue(undefined), +})); + +// Truthy redis so the resume handler takes the replay path. +vi.mock('../lib/redis.js', () => ({ redis: {} })); + +vi.mock('drizzle-orm', () => ({ + and: vi.fn(), + eq: vi.fn(), + lt: vi.fn(), + desc: vi.fn(), + sql: vi.fn(), + inArray: vi.fn(), +})); + +const mockReadMissed = vi.fn(); +const mockPublish = vi.fn().mockResolvedValue(undefined); +vi.mock('../services/resumeStream.js', () => ({ + readMissedEvents: mockReadMissed, + publishEphemeral: mockPublish, +})); + +// ── Helpers ────────────────────────────────────────────────────────────────── + +function makeSocket(userId: string) { + const emitter = new EventEmitter(); + const emitted: { event: string; data: unknown }[] = []; + return Object.assign(emitter, { + auth: { userId, deviceId: 'device-1' }, + emit: vi.fn((event: string, data: unknown) => { + emitted.push({ event, data }); + }), + join: vi.fn(), + emitted, + }); +} + +function makeIo() { + const emitFn = vi.fn(); + return { to: vi.fn(() => ({ emit: emitFn, volatile: { emit: emitFn } })) }; +} + +async function getHandler(socket: EventEmitter, io: unknown) { + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + return socket.listeners('resume')[0] as (p: unknown) => Promise; +} + +const USER_ID = 'user-1'; + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe('resume socket event', () => { + it('replays the missed ephemeral events and signals a message sync', async () => { + mockReadMissed.mockResolvedValue([ + { id: '1-0', type: 'read_receipt', data: { conversationId: 'c1' } }, + { id: '2-0', type: 'presence_update', data: { online: true } }, + ]); + + const socket = makeSocket(USER_ID); + const handler = await getHandler(socket, makeIo()); + + await handler({ lastEventId: '0-1' }); + + expect(socket.emit).toHaveBeenCalledWith('ephemeral_replay', { + id: '1-0', + type: 'read_receipt', + data: { conversationId: 'c1' }, + }); + expect(socket.emit).toHaveBeenCalledWith('ephemeral_replay', { + id: '2-0', + type: 'presence_update', + data: { online: true }, + }); + // Durable messages are recovered via sync, never via the resume stream. + expect(socket.emit).toHaveBeenCalledWith('resume_complete', { + lastEventId: '2-0', + syncRequired: true, + }); + }); + + it('reads from an exclusive cursor so replay is idempotent', async () => { + mockReadMissed.mockResolvedValue([]); + + const socket = makeSocket(USER_ID); + const handler = await getHandler(socket, makeIo()); + + await handler({ lastEventId: '7-0' }); + + expect(mockReadMissed).toHaveBeenCalledWith(expect.anything(), USER_ID, '7-0'); + // Nothing missed: no replays, cursor unchanged, still asks for a sync. + const replays = socket.emitted.filter((e) => e.event === 'ephemeral_replay'); + expect(replays).toHaveLength(0); + expect(socket.emit).toHaveBeenCalledWith('resume_complete', { + lastEventId: '7-0', + syncRequired: true, + }); + }); + + it('treats a missing lastEventId as a full replay from the start', async () => { + mockReadMissed.mockResolvedValue([]); + + const socket = makeSocket(USER_ID); + const handler = await getHandler(socket, makeIo()); + + await handler({}); + + expect(mockReadMissed).toHaveBeenCalledWith(expect.anything(), USER_ID, ''); + expect(socket.emit).toHaveBeenCalledWith('resume_complete', { + lastEventId: null, + syncRequired: true, + }); + }); +}); diff --git a/apps/backend/src/__tests__/resumeStream.test.ts b/apps/backend/src/__tests__/resumeStream.test.ts new file mode 100644 index 0000000..6eb7a2c --- /dev/null +++ b/apps/backend/src/__tests__/resumeStream.test.ts @@ -0,0 +1,110 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { + recordEphemeralEvent, + publishEphemeral, + readMissedEvents, + eventStreamKey, + RESUME_STREAM_TTL_SECONDS, + RESUME_STREAM_MAXLEN, +} from '../services/resumeStream.js'; + +function makeRedis() { + return { + xadd: vi.fn().mockResolvedValue('1-0'), + expire: vi.fn().mockResolvedValue(1), + xrange: vi.fn().mockResolvedValue([]), + }; +} + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe('eventStreamKey', () => { + it('namespaces per user', () => { + expect(eventStreamKey('u1')).toBe('resume:events:u1'); + }); +}); + +describe('recordEphemeralEvent', () => { + it('appends a capped entry, refreshes TTL, and returns the stream id', async () => { + const redis = makeRedis(); + + const id = await recordEphemeralEvent(redis as never, 'u1', { + type: 'read_receipt', + data: { conversationId: 'c1' }, + }); + + expect(id).toBe('1-0'); + expect(redis.xadd).toHaveBeenCalledWith( + 'resume:events:u1', + 'MAXLEN', + '~', + RESUME_STREAM_MAXLEN, + '*', + 'type', + 'read_receipt', + 'data', + JSON.stringify({ conversationId: 'c1' }), + ); + expect(redis.expire).toHaveBeenCalledWith('resume:events:u1', RESUME_STREAM_TTL_SECONDS); + }); +}); + +describe('publishEphemeral', () => { + it('does nothing when Redis is unavailable', async () => { + await expect( + publishEphemeral(null, ['u1'], { type: 'presence_update', data: {} }), + ).resolves.toBeUndefined(); + }); + + it('records once per unique recipient', async () => { + const redis = makeRedis(); + + await publishEphemeral(redis as never, ['u1', 'u1', 'u2'], { + type: 'presence_update', + data: { online: true }, + }); + + expect(redis.xadd).toHaveBeenCalledTimes(2); + }); + + it('is a no-op for an empty recipient list', async () => { + const redis = makeRedis(); + await publishEphemeral(redis as never, [], { type: 'system', data: {} }); + expect(redis.xadd).not.toHaveBeenCalled(); + }); +}); + +describe('readMissedEvents', () => { + it('reads the whole stream when no cursor is supplied', async () => { + const redis = makeRedis(); + redis.xrange.mockResolvedValue([ + ['1-0', ['type', 'read_receipt', 'data', '{"conversationId":"c1"}']], + ['2-0', ['type', 'presence_update', 'data', '{"online":true}']], + ]); + + const out = await readMissedEvents(redis as never, 'u1', ''); + + expect(redis.xrange).toHaveBeenCalledWith('resume:events:u1', '-', '+'); + expect(out).toEqual([ + { id: '1-0', type: 'read_receipt', data: { conversationId: 'c1' } }, + { id: '2-0', type: 'presence_update', data: { online: true } }, + ]); + }); + + it('uses an exclusive lower bound so replay is idempotent', async () => { + const redis = makeRedis(); + await readMissedEvents(redis as never, 'u1', '5-0'); + expect(redis.xrange).toHaveBeenCalledWith('resume:events:u1', '(5-0', '+'); + }); + + it('tolerates malformed payloads without throwing', async () => { + const redis = makeRedis(); + redis.xrange.mockResolvedValue([['9-0', ['type', 'system', 'data', 'not-json']]]); + + const out = await readMissedEvents(redis as never, 'u1', ''); + + expect(out).toEqual([{ id: '9-0', type: 'system', data: {} }]); + }); +}); diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index 3335e09..a7a18ce 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -3,9 +3,10 @@ import { Server } from 'socket.io'; import { createAdapter } from '@socket.io/redis-adapter'; import { createClient } from 'redis'; import dotenv from 'dotenv'; -import { eq } from 'drizzle-orm'; +import { eq, inArray } from 'drizzle-orm'; import { db } from './db/index.js'; import { conversationMembers, users } from './db/schema.js'; +import { publishEphemeral } from './services/resumeStream.js'; import { socketAuthMiddleware, type AuthSocket } from './middleware/socketAuth.js'; import { registerMessagingHandlers } from './socket/messaging.js'; import { app } from './app.js'; @@ -46,6 +47,28 @@ const io = new Server(httpServer, { setSocketServer(io); +// Record a presence change on the resume streams of everyone who shares a +// conversation with this user (#200), so members who are offline at the moment +// of the change can replay it when they reconnect. Best-effort and Redis-only. +async function recordPresenceForCoMembers( + userId: string, + online: boolean, + conversationIds: string[], +): Promise { + if (!appRedis || conversationIds.length === 0) { + return; + } + const coMembers = await db.query.conversationMembers.findMany({ + where: inArray(conversationMembers.conversationId, conversationIds), + columns: { userId: true }, + }); + await publishEphemeral( + appRedis, + coMembers.map((m) => m.userId).filter((id) => id !== userId), + { type: 'presence_update', data: { userId, online } }, + ); +} + io.use(socketAuthMiddleware); io.on('connection', async (socket: AuthSocket) => { @@ -124,6 +147,11 @@ io.on('connection', async (socket: AuthSocket) => { io.to(m.conversationId).emit('user_online', { userId }); io.to(m.conversationId).emit('presence_update', { userId, online: true }); } + await recordPresenceForCoMembers( + userId, + true, + memberships.map((m) => m.conversationId), + ); } } @@ -157,6 +185,11 @@ io.on('connection', async (socket: AuthSocket) => { io.to(m.conversationId).emit('user_offline', { userId }); io.to(m.conversationId).emit('presence_update', { userId, online: false }); } + await recordPresenceForCoMembers( + userId, + false, + memberships.map((m) => m.conversationId), + ); } } } diff --git a/apps/backend/src/services/resumeStream.ts b/apps/backend/src/services/resumeStream.ts new file mode 100644 index 0000000..554b9de --- /dev/null +++ b/apps/backend/src/services/resumeStream.ts @@ -0,0 +1,125 @@ +/** + * Resume protocol — missed ephemeral-event replay (#200). + * + * Lightweight, non-durable events (read/delivery receipts, presence changes, + * system notices) are appended to a short-lived per-user Redis stream as they + * are emitted live. When a device reconnects it sends `resume { lastEventId }` + * and the gateway replays everything recorded after that id, then tells the + * client to run a full envelope sync for the durable messages it missed. + * + * Durable chat messages live in Postgres and are deliberately NEVER written to + * this stream — they are recovered through message/envelope sync, keeping the + * stream cheap and bounded. + * + * The stream is keyed per user; each device tracks its own `lastEventId` + * cursor, so two devices of the same user resume independently. Redis stream + * ids are monotonic and unique, which makes them the natural event id clients + * persist. Replay uses an exclusive range, so re-issuing `resume` with an + * advanced cursor never re-delivers an event the client already saw. + */ +import type { Redis } from 'ioredis'; + +/** Streams expire after this many seconds of inactivity. Long enough to cover + * transient disconnects (network blips, app backgrounding) without retaining + * ephemeral chatter indefinitely. */ +export const RESUME_STREAM_TTL_SECONDS = 300; + +/** Hard cap on backlog length per user. Approximate trimming (`MAXLEN ~`) keeps + * XADD O(1) while bounding memory. */ +export const RESUME_STREAM_MAXLEN = 500; + +export interface EphemeralEvent { + type: string; + data: Record; +} + +export interface ReplayedEvent { + id: string; + type: string; + data: Record; +} + +export function eventStreamKey(userId: string): string { + return `resume:events:${userId}`; +} + +/** + * Append a single ephemeral event to a user's replay stream and return the + * generated stream id. The id is what the client stores as `lastEventId`. + */ +export async function recordEphemeralEvent( + redis: Redis, + userId: string, + event: EphemeralEvent, +): Promise { + const key = eventStreamKey(userId); + const id = await redis.xadd( + key, + 'MAXLEN', + '~', + RESUME_STREAM_MAXLEN, + '*', + 'type', + event.type, + 'data', + JSON.stringify(event.data), + ); + await redis.expire(key, RESUME_STREAM_TTL_SECONDS); + return id; +} + +/** + * Fan an ephemeral event out to every recipient's stream. Recording is + * best-effort: a Redis failure for one recipient must not block live delivery + * or the others. No-op when Redis is unavailable. + */ +export async function publishEphemeral( + redis: Redis | null, + recipientUserIds: string[], + event: EphemeralEvent, +): Promise { + if (!redis || recipientUserIds.length === 0) { + return; + } + const client = redis; + await Promise.allSettled( + [...new Set(recipientUserIds)].map((userId) => recordEphemeralEvent(client, userId, event)), + ); +} + +/** + * Read every ephemeral event recorded after `lastEventId` (exclusive). When + * `lastEventId` is empty the whole retained stream is returned. The exclusive + * lower bound is what makes replay idempotent. + */ +export async function readMissedEvents( + redis: Redis, + userId: string, + lastEventId: string, +): Promise { + const key = eventStreamKey(userId); + const start = lastEventId ? `(${lastEventId}` : '-'; + const entries = await redis.xrange(key, start, '+'); + + return entries.map(([id, fields]) => ({ + id, + type: readField(fields, 'type'), + data: parseData(readField(fields, 'data')), + })); +} + +function readField(fields: string[], name: string): string { + const idx = fields.indexOf(name); + return idx >= 0 ? (fields[idx + 1] ?? '') : ''; +} + +function parseData(raw: string): Record { + if (!raw) { + return {}; + } + try { + return JSON.parse(raw) as Record; + } catch { + return {}; + } +} diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index 47b1471..844ce3f 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -12,6 +12,7 @@ import type { AuthSocket } from '../middleware/socketAuth.js'; import { invalidateConversationCaches } from '../lib/conversationCache.js'; import { serializeMessage } from '../lib/messages.js'; import { redis } from '../lib/redis.js'; +import { publishEphemeral, readMissedEvents } from '../services/resumeStream.js'; const PAGE_SIZE = 30; @@ -257,9 +258,49 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void ); io.to(conversationId).volatile.emit('read_receipt', { userId, lastReadMessageId }); + + // Persist this receipt to each member's resume stream so a member who is + // offline right now can replay it on reconnect. The receipt is ephemeral + // (Redis only) — the underlying messages are recovered via envelope sync. + // Skip the member lookup entirely when there is no stream to write to. + if (redis) { + const members = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.conversationId, conversationId), + columns: { userId: true }, + }); + await publishEphemeral( + redis, + members.map((member) => member.userId), + { type: 'read_receipt', data: { conversationId, userId, lastReadMessageId } }, + ); + } }, ); + // ── resume ─────────────────────────────────────────────────────────────────── + // Payload: { lastEventId?: string } + // On reconnect, replay the lightweight ephemeral events this device missed + // (receipts, presence, system notices) from its short-lived Redis stream, then + // tell the client to run a full envelope sync for durable messages — which live + // in Postgres and are intentionally never placed on the resume stream. + socket.on('resume', async (payload: { lastEventId?: string }) => { + if (!redis) { + // No replay backend available; the client must fall back to a full sync. + socket.emit('resume_complete', { lastEventId: null, syncRequired: true }); + return; + } + + const lastEventId = typeof payload?.lastEventId === 'string' ? payload.lastEventId : ''; + + const missed = await readMissedEvents(redis, userId, lastEventId); + for (const event of missed) { + socket.emit('ephemeral_replay', { id: event.id, type: event.type, data: event.data }); + } + + const newCursor = missed.length > 0 ? missed[missed.length - 1]!.id : lastEventId || null; + socket.emit('resume_complete', { lastEventId: newCursor, syncRequired: true }); + }); + // ── create_conversation ──────────────────────────────────────────────────── // Payload: { type: 'dm'|'group'; name?: string; memberIds: string[] } // Creates a conversation and adds all members (including caller).