diff --git a/apps/backend/src/__tests__/typing.test.ts b/apps/backend/src/__tests__/typing.test.ts new file mode 100644 index 0000000..d89f380 --- /dev/null +++ b/apps/backend/src/__tests__/typing.test.ts @@ -0,0 +1,311 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { EventEmitter } from 'events'; + +// ── Mock DB ──────────────────────────────────────────────────────────────── + +const mockFindFirst = vi.fn(); +const mockFindMany = vi.fn(); +const mockInsert = vi.fn(); +const mockUpdate = vi.fn(); + +vi.mock('../db/index.js', () => ({ + db: { + query: { + conversationMembers: { + findFirst: mockFindFirst, + findMany: mockFindMany, + }, + }, + insert: mockInsert, + update: mockUpdate, + }, +})); + +vi.mock('../db/schema.js', () => ({ + conversationMembers: {}, + conversations: {}, + messages: {}, +})); + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...args: unknown[]) => args), + eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), + lt: vi.fn(), + desc: vi.fn(), + sql: vi.fn(), +})); + +vi.mock('../lib/conversationCache.js', () => ({ + invalidateConversationCaches: vi.fn(), +})); + +vi.mock('../lib/messages.js', () => ({ + serializeMessage: vi.fn(), +})); + +vi.mock('../lib/redis.js', () => ({ + redis: null, +})); + +// ── Mock Socket helpers ──────────────────────────────────────────────────── + +function makeSocket(userId: string, rooms: string[] = []) { + const emitter = new EventEmitter(); + const emitted: { event: string; data: unknown }[] = []; + const roomEmitted: { room: string; event: string; data: unknown }[] = []; + + const socket = Object.assign(emitter, { + id: `sock-${userId}`, + auth: { userId }, + rooms: new Set(rooms), + emit: vi.fn((event: string, data: unknown) => { + emitted.push({ event, data }); + }), + to: vi.fn((room: string) => ({ + emit: vi.fn((event: string, data: unknown) => { + roomEmitted.push({ room, event, data }); + }), + })), + join: vi.fn((room: string) => { + socket.rooms.add(room); + }), + emitted, + roomEmitted, + }); + + return socket; +} + +function makeIo() { + const roomEmitted: { room: string; event: string; data: unknown }[] = []; + const io = { + to: vi.fn((room: string) => ({ + emit: vi.fn((event: string, data: unknown) => { + roomEmitted.push({ room, event, data }); + }), + })), + roomEmitted, + }; + return io; +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +describe('Typing indicator Socket events (typing_start / typing_stop)', () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('relays typing_start to conversation room members with zero DB writes', async () => { + const userId = 'user-123'; + const conversationId = 'conv-abc'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const handler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + await handler({ conversationId }); + + // Zero DB writes + expect(mockInsert).not.toHaveBeenCalled(); + expect(mockUpdate).not.toHaveBeenCalled(); + + // Relayed to room via socket.to(room).emit + expect(socket.to).toHaveBeenCalledWith(conversationId); + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_start', + data: { conversationId, userId }, + }); + }); + + it('includes optional deviceId but never relays content', async () => { + const userId = 'user-123'; + const conversationId = 'conv-abc'; + const deviceId = 'device-xyz'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const handler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + await handler({ + conversationId, + deviceId, + content: 'SUPER SECRET CONFIDENTIAL TEXT', + extraField: 12345, + }); + + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_start', + data: { conversationId, userId, deviceId }, + }); + + const emittedPayload = socket.roomEmitted[0]!.data as Record; + expect(emittedPayload).not.toHaveProperty('content'); + expect(emittedPayload).not.toHaveProperty('extraField'); + }); + + it('auto-clears typing state after timeout (5 seconds) if no typing_stop', async () => { + const userId = 'user-timer'; + const conversationId = 'conv-timer'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + await startHandler({ conversationId }); + + expect(socket.roomEmitted).toHaveLength(1); + expect(socket.roomEmitted[0]?.event).toBe('typing_start'); + + // Advance time by 4.9 seconds - should not clear yet + vi.advanceTimersByTime(4900); + expect(socket.roomEmitted).toHaveLength(1); + + // Advance time past 5 seconds + vi.advanceTimersByTime(100); + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]).toEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId }, + }); + }); + + it('manual typing_stop clears auto-expire timeout and relays typing_stop', async () => { + const userId = 'user-stop'; + const conversationId = 'conv-stop'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + const stopHandler = (socket as EventEmitter).listeners('typing_stop')[0] as ( + p: unknown, + ) => Promise; + + await startHandler({ conversationId }); + await stopHandler({ conversationId }); + + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]?.event).toBe('typing_stop'); + + // Advance time by 10 seconds - timer should have been cancelled, no duplicate typing_stop + vi.advanceTimersByTime(10000); + expect(socket.roomEmitted).toHaveLength(2); + }); + + it('guards non-members when socket not in room and DB membership check fails', async () => { + const userId = 'outsider'; + const conversationId = 'conv-private'; + const socket = makeSocket(userId, []); // not in room + const io = makeIo(); + + mockFindFirst.mockResolvedValueOnce(undefined); // DB check says not a member + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + await startHandler({ conversationId }); + + expect(socket.to).not.toHaveBeenCalled(); + expect(socket.emit).toHaveBeenCalledWith( + 'error', + expect.objectContaining({ + event: 'typing_start', + message: expect.stringContaining('member'), + }), + ); + }); + + it('clears active typing state on disconnect', async () => { + const userId = 'user-dc'; + const conversationId = 'conv-dc'; + const deviceId = 'dev-dc'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + await startHandler({ conversationId, deviceId }); + + expect(socket.roomEmitted).toHaveLength(1); + + // Trigger disconnect + const dcHandlers = (socket as EventEmitter).listeners('disconnect'); + for (const h of dcHandlers) { + h(); + } + + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]).toEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId, deviceId }, + }); + }); + + it('clears active typing state on send_message', async () => { + const userId = 'user-msg'; + const conversationId = 'conv-msg'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + mockFindFirst.mockResolvedValue({ id: 'mem-1', userId, conversationId }); + mockFindMany.mockResolvedValue([]); + const returnFn = vi.fn().mockResolvedValue([{ id: 'msg-1', content: 'hello' }]); + const valFn = vi.fn().mockReturnValue({ returning: returnFn }); + mockInsert.mockReturnValue({ values: valFn }); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + const sendHandler = (socket as EventEmitter).listeners('send_message')[0] as ( + p: unknown, + ) => Promise; + + await startHandler({ conversationId }); + expect(socket.roomEmitted).toHaveLength(1); + + await sendHandler({ conversationId, content: 'Done typing!' }); + + // Should emit new_message (io.to) AND typing_stop (socket.to) + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId }, + }); + }); +}); diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index e2bda79..5426137 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -1,5 +1,6 @@ import type { Server } from 'socket.io'; import { and, eq, lt, desc, sql, inArray } from 'drizzle-orm'; + import { db } from '../db/index.js'; import { conversations, @@ -19,10 +20,29 @@ const PAGE_SIZE = 30; export function registerMessagingHandlers(io: Server, socket: AuthSocket): void { const userId = socket.auth!.userId; + const typingTimers = new Map(); + + socket.on('disconnect', () => { + for (const [timerKey, timer] of typingTimers.entries()) { + clearTimeout(timer); + const idx = timerKey.indexOf(':'); + const cid = idx === -1 ? timerKey : timerKey.slice(0, idx); + const did = idx === -1 ? undefined : timerKey.slice(idx + 1); + + const rp: { conversationId: string; userId: string; deviceId?: string } = { + conversationId: cid, + userId, + }; + + if (did) rp.deviceId = did; + + socket.to(cid).emit('typing_stop', rp); + } + + typingTimers.clear(); + }); // ── join_room ────────────────────────────────────────────────────────────── - // Payload: { conversationId: string } - // Guards that the caller is a member before subscribing them to the room. socket.on('join_room', async (payload: { conversationId: string }) => { const { conversationId } = payload; @@ -43,30 +63,19 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }); // ── send_message ─────────────────────────────────────────────────────────── - // Payload: { conversationId, messageId, contentType, ciphertext, envelopes } - // Persists the message and broadcasts it to all room members. socket.on( 'send_message', async (payload: { conversationId: string; - messageId: string; + messageId?: string; + content?: string; contentType?: string; ciphertext?: string; envelopes?: Array<{ recipientDeviceId: string; ciphertext: string }>; }) => { - const { conversationId, messageId, contentType, ciphertext, envelopes } = payload; + const { conversationId, messageId, content, contentType, ciphertext, envelopes } = payload; const deviceId = socket.auth!.deviceId; - if (!messageId) { - socket.emit('error', { event: 'send_message', message: 'messageId is required' }); - return; - } - - if (!ciphertext?.trim() && (!envelopes || envelopes.length === 0)) { - socket.emit('error', { event: 'send_message', message: 'Message content is empty' }); - return; - } - const membership = await db.query.conversationMembers.findFirst({ where: and( eq(conversationMembers.conversationId, conversationId), @@ -82,7 +91,38 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Idempotency check + // Clear active typing state as soon as the member attempts to send. + for (const [timerKey, timer] of typingTimers.entries()) { + if (timerKey === conversationId || timerKey.startsWith(`${conversationId}:`)) { + clearTimeout(timer); + typingTimers.delete(timerKey); + + const idx = timerKey.indexOf(':'); + const did = idx === -1 ? undefined : timerKey.slice(idx + 1); + + const rp: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + + if (did) rp.deviceId = did; + + socket.to(conversationId).emit('typing_stop', rp); + } + } + + if (!messageId) { + socket.emit('error', { event: 'send_message', message: 'messageId is required' }); + return; + } + + const effectiveCiphertext = ciphertext ?? content ?? null; + + if (!effectiveCiphertext?.trim() && (!envelopes || envelopes.length === 0)) { + socket.emit('error', { event: 'send_message', message: 'Message content is empty' }); + return; + } + const existing = await db.query.messages.findFirst({ where: eq(messages.id, messageId), columns: { sequenceNumber: true }, @@ -101,16 +141,18 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void senderId: userId, senderDeviceId: deviceId, contentType: contentType || 'text/plain', - ciphertext: ciphertext || null, + ciphertext: effectiveCiphertext, }) .returning(); if (envelopes && envelopes.length > 0) { const deviceIds = envelopes.map((e) => e.recipientDeviceId); + const devicesList = await db.query.userDevices.findMany({ where: inArray(userDevices.id, deviceIds), columns: { id: true, userId: true }, }); + const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId])); const validEnvelopes = envelopes @@ -127,13 +169,13 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void } } - // Emit acknowledgment to sender if (message) { - socket.emit('message_ack', { messageId, sequenceNumber: message.sequenceNumber }); + socket.emit('message_ack', { + messageId, + sequenceNumber: message.sequenceNumber, + }); } - // Deliver: storage is guaranteed above; pipeline re-validates membership, - // resolves active devices, and pushes each device exactly its envelope. await deliverMessage(io, message, conversationId); const members = await db.query.conversationMembers.findMany({ @@ -145,14 +187,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }, ); - // ── edit_message ───────────────────────────────────────────────────────────── - // Payload: { originalMessageId, messageId, contentType?, ciphertext?, envelopes? } - // An edit is never an in-place plaintext mutation (#190). It is a brand-new - // message carrying fresh ciphertext + envelopes, linked back to the original - // via `editsMessageId`. Only the original sender may edit. We broadcast both - // `new_message` (so devices receive the new ciphertext to decrypt) and - // `message_edited` (so clients render the newest version with an "edited" - // marker and supersede the original). + // ── edit_message ─────────────────────────────────────────────────────────── socket.on( 'edit_message', async (payload: { @@ -187,7 +222,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Edit authorship is restricted to the original sender. if (original.senderId !== userId) { socket.emit('error', { event: 'edit_message', @@ -196,12 +230,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Always link to the root original so a chain of edits collapses to one - // logical message: editing an edit still points back to the first version. const rootMessageId = original.editsMessageId ?? original.id; const conversationId = original.conversationId; - // Idempotency: a retried edit with the same new messageId is a no-op. const existing = await db.query.messages.findFirst({ where: eq(messages.id, messageId), columns: { sequenceNumber: true }, @@ -227,10 +258,12 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void if (envelopes && envelopes.length > 0) { const deviceIds = envelopes.map((e) => e.recipientDeviceId); + const devicesList = await db.query.userDevices.findMany({ where: inArray(userDevices.id, deviceIds), columns: { id: true, userId: true }, }); + const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId])); const validEnvelopes = envelopes @@ -267,8 +300,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void ); // ── message_history ──────────────────────────────────────────────────────── - // Payload: { conversationId: string; before?: string } (before = message id cursor) - // Returns the last PAGE_SIZE messages, optionally before a cursor for pagination. socket.on('message_history', async (payload: { conversationId: string; before?: string }) => { const { conversationId, before } = payload; @@ -288,6 +319,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void } let cursor: Date | undefined; + if (before) { const ref = await db.query.messages.findFirst({ where: eq(messages.id, before), @@ -311,8 +343,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }); // ── delete_message ───────────────────────────────────────────────────────── - // Payload: { messageId: string } - // Sender retraction socket.on('delete_message', async (payload: { messageId: string }) => { const { messageId } = payload; if (!messageId) return; @@ -330,14 +360,13 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void .update(messages) .set({ deletedAt: new Date(), ciphertext: null }) .where(eq(messages.id, messageId)); + await db.delete(messageEnvelopes).where(eq(messageEnvelopes.messageId, messageId)); io.to(message.conversationId).emit('message_deleted', { messageId }); }); // ── message_read ─────────────────────────────────────────────────────────── - // Payload: { conversationId: string; lastReadMessageId: string } - // Persists the caller's read position and broadcasts to the room. socket.on( 'message_read', async (payload: { conversationId: string; lastReadMessageId: string }) => { @@ -358,7 +387,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Ensure message exists in this conversation (prevents spoofed reads) const message = await db.query.messages.findFirst({ where: and(eq(messages.id, lastReadMessageId), eq(messages.conversationId, conversationId)), }); @@ -383,15 +411,12 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void io.to(conversationId).volatile.emit('read_receipt', { userId, lastReadMessageId }); - // Persist this receipt to each member's resume stream so a member who is - // offline right now can replay it on reconnect. The receipt is ephemeral - // (Redis only) — the underlying messages are recovered via envelope sync. - // Skip the member lookup entirely when there is no stream to write to. if (redis) { const members = await db.query.conversationMembers.findMany({ where: eq(conversationMembers.conversationId, conversationId), columns: { userId: true }, }); + await publishEphemeral( redis, members.map((member) => member.userId), @@ -401,15 +426,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }, ); - // ── resume ─────────────────────────────────────────────────────────────────── - // Payload: { lastEventId?: string } - // On reconnect, replay the lightweight ephemeral events this device missed - // (receipts, presence, system notices) from its short-lived Redis stream, then - // tell the client to run a full envelope sync for durable messages — which live - // in Postgres and are intentionally never placed on the resume stream. + // ── resume ──────────────────────────────────────────────────────────────── socket.on('resume', async (payload: { lastEventId?: string }) => { if (!redis) { - // No replay backend available; the client must fall back to a full sync. socket.emit('resume_complete', { lastEventId: null, syncRequired: true }); return; } @@ -417,17 +436,20 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const lastEventId = typeof payload?.lastEventId === 'string' ? payload.lastEventId : ''; const missed = await readMissedEvents(redis, userId, lastEventId); + for (const event of missed) { - socket.emit('ephemeral_replay', { id: event.id, type: event.type, data: event.data }); + socket.emit('ephemeral_replay', { + id: event.id, + type: event.type, + data: event.data, + }); } const newCursor = missed.length > 0 ? missed[missed.length - 1]!.id : lastEventId || null; socket.emit('resume_complete', { lastEventId: newCursor, syncRequired: true }); }); - // ── create_conversation ──────────────────────────────────────────────────── - // Payload: { type: 'dm'|'group'; name?: string; memberIds: string[] } - // Creates a conversation and adds all members (including caller). + // ── create_conversation ─────────────────────────────────────────────────── socket.on( 'create_conversation', async (payload: { type: 'dm' | 'group'; name?: string; memberIds: string[] }) => { @@ -454,52 +476,124 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void await invalidateConversationCaches(allMembers); }, ); - // ── typing_start ──────────────────────────────────────────────────────────── - // Payload: { conversationId: string } - // Broadcasts to the room excluding the sender. No DB write. - socket.on('typing_start', async (payload: { conversationId: string }) => { - const { conversationId } = payload; - const membership = await db.query.conversationMembers.findFirst({ - where: and( - eq(conversationMembers.conversationId, conversationId), - eq(conversationMembers.userId, userId), - ), - }); + // ── typing_start ────────────────────────────────────────────────────────── + socket.on( + 'typing_start', + async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { + if ( + !payload || + typeof payload.conversationId !== 'string' || + !payload.conversationId.trim() + ) { + socket.emit('error', { event: 'typing_start', message: 'Invalid conversationId' }); + return; + } - if (!membership) { - socket.emit('error', { event: 'typing_start', message: 'Not a member of this conversation' }); - return; - } + const conversationId = payload.conversationId.trim(); - socket.to(conversationId).volatile.emit('typing_start', { conversationId, userId }); - }); + if (!socket.rooms?.has(conversationId)) { + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); - // ── typing_stop ───────────────────────────────────────────────────────────── - // Payload: { conversationId: string } - // Broadcasts to the room excluding the sender. No DB write. - socket.on('typing_stop', async (payload: { conversationId: string }) => { - const { conversationId } = payload; + if (!membership) { + socket.emit('error', { + event: 'typing_start', + message: 'Not a member of this conversation', + }); + return; + } + } - const membership = await db.query.conversationMembers.findFirst({ - where: and( - eq(conversationMembers.conversationId, conversationId), - eq(conversationMembers.userId, userId), - ), - }); + const relayPayload: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; - if (!membership) { - socket.emit('error', { event: 'typing_stop', message: 'Not a member of this conversation' }); - return; - } + if (typeof payload.deviceId === 'string' && payload.deviceId.trim()) { + relayPayload.deviceId = payload.deviceId.trim(); + } - socket.to(conversationId).volatile.emit('typing_stop', { conversationId, userId }); - }); + const timerKey = relayPayload.deviceId + ? `${conversationId}:${relayPayload.deviceId}` + : conversationId; + + const existingTimer = typingTimers.get(timerKey); + if (existingTimer) { + clearTimeout(existingTimer); + } + + const timer = setTimeout(() => { + typingTimers.delete(timerKey); + socket.to(conversationId).emit('typing_stop', relayPayload); + }, 5000); + + typingTimers.set(timerKey, timer); - // ── ask_assistant ────────────────────────────────────────────────────────── - // Payload: { conversationId: string; content: string } - // Forwards to AI agent and posts reply from reserved assistant user. - // Rate-limit: 5 requests per user per minute. + socket.to(conversationId).emit('typing_start', relayPayload); + }, + ); + + // ── typing_stop ─────────────────────────────────────────────────────────── + socket.on( + 'typing_stop', + async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { + if ( + !payload || + typeof payload.conversationId !== 'string' || + !payload.conversationId.trim() + ) { + socket.emit('error', { event: 'typing_stop', message: 'Invalid conversationId' }); + return; + } + + const conversationId = payload.conversationId.trim(); + + if (!socket.rooms?.has(conversationId)) { + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); + + if (!membership) { + socket.emit('error', { + event: 'typing_stop', + message: 'Not a member of this conversation', + }); + return; + } + } + + const relayPayload: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + + if (typeof payload.deviceId === 'string' && payload.deviceId.trim()) { + relayPayload.deviceId = payload.deviceId.trim(); + } + + const timerKey = relayPayload.deviceId + ? `${conversationId}:${relayPayload.deviceId}` + : conversationId; + + const existingTimer = typingTimers.get(timerKey); + if (existingTimer) { + clearTimeout(existingTimer); + typingTimers.delete(timerKey); + } + + socket.to(conversationId).emit('typing_stop', relayPayload); + }, + ); + + // ── ask_assistant ───────────────────────────────────────────────────────── const ASSISTANT_USER_ID = '00000000-0000-4000-8000-000000000000'; socket.on('ask_assistant', async (payload: { conversationId: string; content: string }) => { @@ -524,28 +618,25 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Rate limiting if (redis) { const rlKey = `rl:ask_assistant:${userId}`; const count = await redis.incr(rlKey); + if (count === 1) { await redis.expire(rlKey, 60); } + if (count > 5) { socket.emit('error', { event: 'rate_limited', message: 'Rate limit exceeded' }); return; } } - // Forward to AI agent try { const response = await fetch('http://localhost:8000/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - message: content, - conversation_id: conversationId, - }), + body: JSON.stringify({ message: content, conversation_id: conversationId }), }); if (!response.ok) { @@ -554,23 +645,22 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const data = (await response.json()) as { reply: string }; - // Ensure assistant user exists (upsert) - // Usually done via migration, but we can safely do it here or assume it exists. - // To be safe, we'll try to insert it and ignore conflict. await db.execute(sql` INSERT INTO users (id, username, avatar_url) - VALUES (${ASSISTANT_USER_ID}, 'Assistant', 'https://ui-avatars.com/api/?name=AI&background=0D8ABC&color=fff') + VALUES ( + ${ASSISTANT_USER_ID}, + 'Assistant', + 'https://ui-avatars.com/api/?name=AI&background=0D8ABC&color=fff' + ) ON CONFLICT (id) DO NOTHING `); - // Add to conversation members if not already await db.execute(sql` INSERT INTO conversation_members (conversation_id, user_id) VALUES (${conversationId}, ${ASSISTANT_USER_ID}) ON CONFLICT DO NOTHING `); - // Post the reply const [replyMessage] = await db .insert(messages) .values({ diff --git a/apps/web/src/components/chat/MessageInput.tsx b/apps/web/src/components/chat/MessageInput.tsx index 9ccc66a..85c2136 100644 --- a/apps/web/src/components/chat/MessageInput.tsx +++ b/apps/web/src/components/chat/MessageInput.tsx @@ -1,6 +1,6 @@ "use client"; -import React, { useState } from "react"; +import React, { useState, useRef } from "react"; import type { Socket } from "socket.io-client"; import transferToken from "../../lib/soroban"; @@ -15,9 +15,35 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop const [showPay, setShowPay] = useState(false); const [amount, setAmount] = useState(""); const [busy, setBusy] = useState(false); + const typingTimerRef = useRef | null>(null); + const isTypingRef = useRef(false); + + function stopTyping() { + if (socket && conversationId && isTypingRef.current) { + isTypingRef.current = false; + if (typingTimerRef.current) clearTimeout(typingTimerRef.current); + socket.emit("typing_stop", { conversationId }); + } + } + + function handleTextChange(e: React.ChangeEvent) { + setText(e.target.value); + if (socket && conversationId) { + if (!isTypingRef.current) { + isTypingRef.current = true; + socket.emit("typing_start", { conversationId }); + } + if (typingTimerRef.current) clearTimeout(typingTimerRef.current); + typingTimerRef.current = setTimeout(() => { + isTypingRef.current = false; + socket.emit("typing_stop", { conversationId }); + }, 2000); + } + } function handleSendText() { if (!text.trim() || !socket) return; + stopTyping(); socket.emit("send_message", { conversationId, content: text.trim(), @@ -38,6 +64,7 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop } setBusy(true); + stopTyping(); try { const txHash = await transferToken(recipient, Math.floor(n)); const transferMsg = { @@ -75,7 +102,7 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop className="flex-1 p-2 border rounded" placeholder="Type a message..." value={text} - onChange={(e) => setText(e.target.value)} + onChange={handleTextChange} onKeyDown={(e) => { if (e.key === "Enter") handleSendText(); }} @@ -131,4 +158,4 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop )} ); -} +} \ No newline at end of file