Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions apps/backend/src/__tests__/resume.socket.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import { EventEmitter } from 'events';

// ── Mocks ────────────────────────────────────────────────────────────────────

vi.mock('../db/index.js', () => ({
db: {
query: {
conversationMembers: { findFirst: vi.fn(), findMany: vi.fn() },
messages: { findFirst: vi.fn() },
userDevices: { findMany: vi.fn() },
},
insert: vi.fn(),
update: vi.fn(),
delete: vi.fn(),
},
}));

vi.mock('../db/schema.js', () => ({
conversations: {},
conversationMembers: {},
messages: {},
messageEnvelopes: {},
userDevices: {},
}));

vi.mock('../lib/conversationCache.js', () => ({
invalidateConversationCaches: vi.fn().mockResolvedValue(undefined),
}));

// Truthy redis so the resume handler takes the replay path.
vi.mock('../lib/redis.js', () => ({ redis: {} }));

vi.mock('drizzle-orm', () => ({
and: vi.fn(),
eq: vi.fn(),
lt: vi.fn(),
desc: vi.fn(),
sql: vi.fn(),
inArray: vi.fn(),
}));

const mockReadMissed = vi.fn();
const mockPublish = vi.fn().mockResolvedValue(undefined);
vi.mock('../services/resumeStream.js', () => ({
readMissedEvents: mockReadMissed,
publishEphemeral: mockPublish,
}));

// ── Helpers ──────────────────────────────────────────────────────────────────

function makeSocket(userId: string) {
const emitter = new EventEmitter();
const emitted: { event: string; data: unknown }[] = [];
return Object.assign(emitter, {
auth: { userId, deviceId: 'device-1' },
emit: vi.fn((event: string, data: unknown) => {
emitted.push({ event, data });
}),
join: vi.fn(),
emitted,
});
}

function makeIo() {
const emitFn = vi.fn();
return { to: vi.fn(() => ({ emit: emitFn, volatile: { emit: emitFn } })) };
}

async function getHandler(socket: EventEmitter, io: unknown) {
const { registerMessagingHandlers } = await import('../socket/messaging.js');
registerMessagingHandlers(io as never, socket as never);
return socket.listeners('resume')[0] as (p: unknown) => Promise<void>;
}

const USER_ID = 'user-1';

beforeEach(() => {
vi.clearAllMocks();
});

describe('resume socket event', () => {
it('replays the missed ephemeral events and signals a message sync', async () => {
mockReadMissed.mockResolvedValue([
{ id: '1-0', type: 'read_receipt', data: { conversationId: 'c1' } },
{ id: '2-0', type: 'presence_update', data: { online: true } },
]);

const socket = makeSocket(USER_ID);
const handler = await getHandler(socket, makeIo());

await handler({ lastEventId: '0-1' });

expect(socket.emit).toHaveBeenCalledWith('ephemeral_replay', {
id: '1-0',
type: 'read_receipt',
data: { conversationId: 'c1' },
});
expect(socket.emit).toHaveBeenCalledWith('ephemeral_replay', {
id: '2-0',
type: 'presence_update',
data: { online: true },
});
// Durable messages are recovered via sync, never via the resume stream.
expect(socket.emit).toHaveBeenCalledWith('resume_complete', {
lastEventId: '2-0',
syncRequired: true,
});
});

it('reads from an exclusive cursor so replay is idempotent', async () => {
mockReadMissed.mockResolvedValue([]);

const socket = makeSocket(USER_ID);
const handler = await getHandler(socket, makeIo());

await handler({ lastEventId: '7-0' });

expect(mockReadMissed).toHaveBeenCalledWith(expect.anything(), USER_ID, '7-0');
// Nothing missed: no replays, cursor unchanged, still asks for a sync.
const replays = socket.emitted.filter((e) => e.event === 'ephemeral_replay');
expect(replays).toHaveLength(0);
expect(socket.emit).toHaveBeenCalledWith('resume_complete', {
lastEventId: '7-0',
syncRequired: true,
});
});

it('treats a missing lastEventId as a full replay from the start', async () => {
mockReadMissed.mockResolvedValue([]);

const socket = makeSocket(USER_ID);
const handler = await getHandler(socket, makeIo());

await handler({});

expect(mockReadMissed).toHaveBeenCalledWith(expect.anything(), USER_ID, '');
expect(socket.emit).toHaveBeenCalledWith('resume_complete', {
lastEventId: null,
syncRequired: true,
});
});
});
110 changes: 110 additions & 0 deletions apps/backend/src/__tests__/resumeStream.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { describe, it, expect, vi, beforeEach } from 'vitest';
import {
recordEphemeralEvent,
publishEphemeral,
readMissedEvents,
eventStreamKey,
RESUME_STREAM_TTL_SECONDS,
RESUME_STREAM_MAXLEN,
} from '../services/resumeStream.js';

function makeRedis() {
return {
xadd: vi.fn().mockResolvedValue('1-0'),
expire: vi.fn().mockResolvedValue(1),
xrange: vi.fn().mockResolvedValue([]),
};
}

beforeEach(() => {
vi.clearAllMocks();
});

describe('eventStreamKey', () => {
it('namespaces per user', () => {
expect(eventStreamKey('u1')).toBe('resume:events:u1');
});
});

describe('recordEphemeralEvent', () => {
it('appends a capped entry, refreshes TTL, and returns the stream id', async () => {
const redis = makeRedis();

const id = await recordEphemeralEvent(redis as never, 'u1', {
type: 'read_receipt',
data: { conversationId: 'c1' },
});

expect(id).toBe('1-0');
expect(redis.xadd).toHaveBeenCalledWith(
'resume:events:u1',
'MAXLEN',
'~',
RESUME_STREAM_MAXLEN,
'*',
'type',
'read_receipt',
'data',
JSON.stringify({ conversationId: 'c1' }),
);
expect(redis.expire).toHaveBeenCalledWith('resume:events:u1', RESUME_STREAM_TTL_SECONDS);
});
});

describe('publishEphemeral', () => {
it('does nothing when Redis is unavailable', async () => {
await expect(
publishEphemeral(null, ['u1'], { type: 'presence_update', data: {} }),
).resolves.toBeUndefined();
});

it('records once per unique recipient', async () => {
const redis = makeRedis();

await publishEphemeral(redis as never, ['u1', 'u1', 'u2'], {
type: 'presence_update',
data: { online: true },
});

expect(redis.xadd).toHaveBeenCalledTimes(2);
});

it('is a no-op for an empty recipient list', async () => {
const redis = makeRedis();
await publishEphemeral(redis as never, [], { type: 'system', data: {} });
expect(redis.xadd).not.toHaveBeenCalled();
});
});

describe('readMissedEvents', () => {
it('reads the whole stream when no cursor is supplied', async () => {
const redis = makeRedis();
redis.xrange.mockResolvedValue([
['1-0', ['type', 'read_receipt', 'data', '{"conversationId":"c1"}']],
['2-0', ['type', 'presence_update', 'data', '{"online":true}']],
]);

const out = await readMissedEvents(redis as never, 'u1', '');

expect(redis.xrange).toHaveBeenCalledWith('resume:events:u1', '-', '+');
expect(out).toEqual([
{ id: '1-0', type: 'read_receipt', data: { conversationId: 'c1' } },
{ id: '2-0', type: 'presence_update', data: { online: true } },
]);
});

it('uses an exclusive lower bound so replay is idempotent', async () => {
const redis = makeRedis();
await readMissedEvents(redis as never, 'u1', '5-0');
expect(redis.xrange).toHaveBeenCalledWith('resume:events:u1', '(5-0', '+');
});

it('tolerates malformed payloads without throwing', async () => {
const redis = makeRedis();
redis.xrange.mockResolvedValue([['9-0', ['type', 'system', 'data', 'not-json']]]);

const out = await readMissedEvents(redis as never, 'u1', '');

expect(out).toEqual([{ id: '9-0', type: 'system', data: {} }]);
});
});
35 changes: 34 additions & 1 deletion apps/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import { Server } from 'socket.io';
import { createAdapter } from '@socket.io/redis-adapter';
import { createClient } from 'redis';
import dotenv from 'dotenv';
import { eq } from 'drizzle-orm';
import { eq, inArray } from 'drizzle-orm';
import { db } from './db/index.js';
import { conversationMembers, users } from './db/schema.js';
import { publishEphemeral } from './services/resumeStream.js';
import { socketAuthMiddleware, type AuthSocket } from './middleware/socketAuth.js';
import { registerMessagingHandlers } from './socket/messaging.js';
import { app } from './app.js';
Expand Down Expand Up @@ -46,6 +47,28 @@ const io = new Server(httpServer, {

setSocketServer(io);

// Record a presence change on the resume streams of everyone who shares a
// conversation with this user (#200), so members who are offline at the moment
// of the change can replay it when they reconnect. Best-effort and Redis-only.
async function recordPresenceForCoMembers(
userId: string,
online: boolean,
conversationIds: string[],
): Promise<void> {
if (!appRedis || conversationIds.length === 0) {
return;
}
const coMembers = await db.query.conversationMembers.findMany({
where: inArray(conversationMembers.conversationId, conversationIds),
columns: { userId: true },
});
await publishEphemeral(
appRedis,
coMembers.map((m) => m.userId).filter((id) => id !== userId),
{ type: 'presence_update', data: { userId, online } },
);
}

io.use(socketAuthMiddleware);

io.on('connection', async (socket: AuthSocket) => {
Expand Down Expand Up @@ -124,6 +147,11 @@ io.on('connection', async (socket: AuthSocket) => {
io.to(m.conversationId).emit('user_online', { userId });
io.to(m.conversationId).emit('presence_update', { userId, online: true });
}
await recordPresenceForCoMembers(
userId,
true,
memberships.map((m) => m.conversationId),
);
}
}

Expand Down Expand Up @@ -157,6 +185,11 @@ io.on('connection', async (socket: AuthSocket) => {
io.to(m.conversationId).emit('user_offline', { userId });
io.to(m.conversationId).emit('presence_update', { userId, online: false });
}
await recordPresenceForCoMembers(
userId,
false,
memberships.map((m) => m.conversationId),
);
}
}
}
Expand Down
Loading
Loading