From 6486f92c94ed5ae871d84c61c83e1705fa67eae9 Mon Sep 17 00:00:00 2001 From: "Huncho.Dev" Date: Sun, 28 Jun 2026 04:14:50 +0000 Subject: [PATCH 1/3] #219 Typing indicators (ephemeral, never stored) FIXED --- apps/backend/src/__tests__/typing.test.ts | 293 ++++++++++++++++++ apps/backend/src/routes/conversations.ts | 4 +- apps/backend/src/routes/treasury.ts | 5 +- apps/backend/src/socket/messaging.ts | 160 ++++++++-- apps/web/src/components/chat/MessageInput.tsx | 33 +- 5 files changed, 455 insertions(+), 40 deletions(-) create mode 100644 apps/backend/src/__tests__/typing.test.ts diff --git a/apps/backend/src/__tests__/typing.test.ts b/apps/backend/src/__tests__/typing.test.ts new file mode 100644 index 0000000..6907d73 --- /dev/null +++ b/apps/backend/src/__tests__/typing.test.ts @@ -0,0 +1,293 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest'; +import { EventEmitter } from 'events'; + +// ── Mock DB ──────────────────────────────────────────────────────────────── + +const mockFindFirst = vi.fn(); +const mockFindMany = vi.fn(); +const mockInsert = vi.fn(); +const mockUpdate = vi.fn(); + +vi.mock('../db/index.js', () => ({ + db: { + query: { + conversationMembers: { + findFirst: mockFindFirst, + findMany: mockFindMany, + }, + }, + insert: mockInsert, + update: mockUpdate, + }, +})); + +vi.mock('../db/schema.js', () => ({ + conversationMembers: {}, + conversations: {}, + messages: {}, +})); + +vi.mock('drizzle-orm', () => ({ + and: vi.fn((...args: unknown[]) => args), + eq: vi.fn((col: unknown, val: unknown) => ({ col, val })), + lt: vi.fn(), + desc: vi.fn(), + sql: vi.fn(), +})); + +vi.mock('../lib/conversationCache.js', () => ({ + invalidateConversationCaches: vi.fn(), +})); + +vi.mock('../lib/messages.js', () => ({ + serializeMessage: vi.fn(), +})); + +vi.mock('../lib/redis.js', () => ({ + redis: null, +})); + +// ── Mock Socket helpers ──────────────────────────────────────────────────── + +function makeSocket(userId: string, rooms: string[] = []) { + const emitter = new EventEmitter(); + const emitted: { event: string; data: unknown }[] = []; + const roomEmitted: { room: string; event: string; data: unknown }[] = []; + + const socket = Object.assign(emitter, { + id: `sock-${userId}`, + auth: { userId }, + rooms: new Set(rooms), + emit: vi.fn((event: string, data: unknown) => { + emitted.push({ event, data }); + }), + to: vi.fn((room: string) => ({ + emit: vi.fn((event: string, data: unknown) => { + roomEmitted.push({ room, event, data }); + }), + })), + join: vi.fn((room: string) => { + socket.rooms.add(room); + }), + emitted, + roomEmitted, + }); + + return socket; +} + +function makeIo() { + const roomEmitted: { room: string; event: string; data: unknown }[] = []; + const io = { + to: vi.fn((room: string) => ({ + emit: vi.fn((event: string, data: unknown) => { + roomEmitted.push({ room, event, data }); + }), + })), + roomEmitted, + }; + return io; +} + +// ── Tests ────────────────────────────────────────────────────────────────── + +describe('Typing indicator Socket events (typing_start / typing_stop)', () => { + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it('relays typing_start to conversation room members with zero DB writes', async () => { + const userId = 'user-123'; + const conversationId = 'conv-abc'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const handler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await handler({ conversationId }); + + // Zero DB writes + expect(mockInsert).not.toHaveBeenCalled(); + expect(mockUpdate).not.toHaveBeenCalled(); + + // Relayed to room via socket.to(room).emit + expect(socket.to).toHaveBeenCalledWith(conversationId); + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_start', + data: { conversationId, userId }, + }); + }); + + it('includes optional deviceId but never relays content', async () => { + const userId = 'user-123'; + const conversationId = 'conv-abc'; + const deviceId = 'device-xyz'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const handler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await handler({ + conversationId, + deviceId, + content: 'SUPER SECRET CONFIDENTIAL TEXT', + extraField: 12345, + }); + + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_start', + data: { conversationId, userId, deviceId }, + }); + + const emittedPayload = socket.roomEmitted[0]!.data as Record; + expect(emittedPayload).not.toHaveProperty('content'); + expect(emittedPayload).not.toHaveProperty('extraField'); + }); + + it('auto-clears typing state after timeout (5 seconds) if no typing_stop', async () => { + const userId = 'user-timer'; + const conversationId = 'conv-timer'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await startHandler({ conversationId }); + + expect(socket.roomEmitted).toHaveLength(1); + expect(socket.roomEmitted[0]?.event).toBe('typing_start'); + + // Advance time by 4.9 seconds - should not clear yet + vi.advanceTimersByTime(4900); + expect(socket.roomEmitted).toHaveLength(1); + + // Advance time past 5 seconds + vi.advanceTimersByTime(100); + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]).toEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId }, + }); + }); + + it('manual typing_stop clears auto-expire timeout and relays typing_stop', async () => { + const userId = 'user-stop'; + const conversationId = 'conv-stop'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const stopHandler = (socket as EventEmitter).listeners('typing_stop')[0] as (p: unknown) => Promise; + + await startHandler({ conversationId }); + await stopHandler({ conversationId }); + + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]?.event).toBe('typing_stop'); + + // Advance time by 10 seconds - timer should have been cancelled, no duplicate typing_stop + vi.advanceTimersByTime(10000); + expect(socket.roomEmitted).toHaveLength(2); + }); + + it('guards non-members when socket not in room and DB membership check fails', async () => { + const userId = 'outsider'; + const conversationId = 'conv-private'; + const socket = makeSocket(userId, []); // not in room + const io = makeIo(); + + mockFindFirst.mockResolvedValueOnce(undefined); // DB check says not a member + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await startHandler({ conversationId }); + + expect(socket.to).not.toHaveBeenCalled(); + expect(socket.emit).toHaveBeenCalledWith( + 'error', + expect.objectContaining({ + event: 'typing_start', + message: expect.stringContaining('member'), + }), + ); + }); + + it('clears active typing state on disconnect', async () => { + const userId = 'user-dc'; + const conversationId = 'conv-dc'; + const deviceId = 'dev-dc'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + await startHandler({ conversationId, deviceId }); + + expect(socket.roomEmitted).toHaveLength(1); + + // Trigger disconnect + const dcHandlers = (socket as EventEmitter).listeners('disconnect'); + for (const h of dcHandlers) { + h(); + } + + expect(socket.roomEmitted).toHaveLength(2); + expect(socket.roomEmitted[1]).toEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId, deviceId }, + }); + }); + + it('clears active typing state on send_message', async () => { + const userId = 'user-msg'; + const conversationId = 'conv-msg'; + const socket = makeSocket(userId, [conversationId]); + const io = makeIo(); + + mockFindFirst.mockResolvedValue({ id: 'mem-1', userId, conversationId }); + mockFindMany.mockResolvedValue([]); + const returnFn = vi.fn().mockResolvedValue([{ id: 'msg-1', content: 'hello' }]); + const valFn = vi.fn().mockReturnValue({ returning: returnFn }); + mockInsert.mockReturnValue({ values: valFn }); + + const { registerMessagingHandlers } = await import('../socket/messaging.js'); + registerMessagingHandlers(io as never, socket as never); + + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const sendHandler = (socket as EventEmitter).listeners('send_message')[0] as (p: unknown) => Promise; + + await startHandler({ conversationId }); + expect(socket.roomEmitted).toHaveLength(1); + + await sendHandler({ conversationId, content: 'Done typing!' }); + + // Should emit new_message (io.to) AND typing_stop (socket.to) + expect(socket.roomEmitted).toContainEqual({ + room: conversationId, + event: 'typing_stop', + data: { conversationId, userId }, + }); + }); +}); \ No newline at end of file diff --git a/apps/backend/src/routes/conversations.ts b/apps/backend/src/routes/conversations.ts index 822070f..dfcfb14 100644 --- a/apps/backend/src/routes/conversations.ts +++ b/apps/backend/src/routes/conversations.ts @@ -95,7 +95,7 @@ conversationsRouter.get('/', async (req: AuthRequest, res) => { with: { conversation: conversationRelations as never, }, - })) as unknown as Array<{ conversationId: string; conversation: ConversationPayload }>; + })) as unknown as Array<{ conversationId: string; isMuted: boolean; isArchived: boolean; conversation: ConversationPayload }>; // Single subquery for message counts — no N+1 const conversationIds = memberships.map((m) => m.conversationId); @@ -762,4 +762,4 @@ conversationsRouter.delete('/:id/leave', async (req: AuthRequest, res) => { await invalidateConversationCaches(members.map((member) => member.userId)); res.status(204).send(); -}); +}); \ No newline at end of file diff --git a/apps/backend/src/routes/treasury.ts b/apps/backend/src/routes/treasury.ts index 660f768..b9b28d4 100644 --- a/apps/backend/src/routes/treasury.ts +++ b/apps/backend/src/routes/treasury.ts @@ -1,9 +1,10 @@ import { Router } from 'express'; +import type { IRouter } from 'express'; import { z } from 'zod'; import { requireAuth, type AuthRequest } from '../middleware/auth.js'; import { validate } from '../middleware/validate.js'; -export const treasuryRouter = Router(); +export const treasuryRouter: IRouter = Router(); treasuryRouter.use(requireAuth); @@ -38,4 +39,4 @@ treasuryRouter.post('/propose', validate(proposeSchema), async (req, res) => { recipient, ttlLedgers: TTL_LEDGERS[ttl], }); -}); +}); \ No newline at end of file diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index 17d3bab..a3291d7 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -11,6 +11,23 @@ const PAGE_SIZE = 30; export function registerMessagingHandlers(io: Server, socket: AuthSocket): void { const userId = socket.auth!.userId; + const typingTimers = new Map(); + + socket.on('disconnect', () => { + for (const [timerKey, timer] of typingTimers.entries()) { + clearTimeout(timer); + const idx = timerKey.indexOf(':'); + const cid = idx === -1 ? timerKey : timerKey.slice(0, idx); + const did = idx === -1 ? undefined : timerKey.slice(idx + 1); + const rp: { conversationId: string; userId: string; deviceId?: string } = { + conversationId: cid, + userId, + }; + if (did) rp.deviceId = did; + socket.to(cid).emit('typing_stop', rp); + } + typingTimers.clear(); + }); // ── join_room ────────────────────────────────────────────────────────────── // Payload: { conversationId: string } @@ -64,6 +81,21 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void io.to(conversationId).emit('new_message', message); + for (const [timerKey, timer] of typingTimers.entries()) { + if (timerKey === conversationId || timerKey.startsWith(`${conversationId}:`)) { + clearTimeout(timer); + typingTimers.delete(timerKey); + const idx = timerKey.indexOf(':'); + const did = idx === -1 ? undefined : timerKey.slice(idx + 1); + const rp: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + if (did) rp.deviceId = did; + socket.to(conversationId).emit('typing_stop', rp); + } + } + const members = await db.query.conversationMembers.findMany({ where: eq(conversationMembers.conversationId, conversationId), columns: { userId: true }, @@ -196,46 +228,108 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }, ); // ── typing_start ──────────────────────────────────────────────────────────── - // Payload: { conversationId: string } - // Broadcasts to the room excluding the sender. No DB write. - socket.on('typing_start', async (payload: { conversationId: string }) => { - const { conversationId } = payload; + // Payload: { conversationId: string; deviceId?: string } + // Broadcasts to the room excluding the sender via Pub/Sub. Zero DB write. Auto-expires. + socket.on( + 'typing_start', + async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { + if (!payload || typeof payload.conversationId !== 'string' || !payload.conversationId.trim()) { + socket.emit('error', { event: 'typing_start', message: 'Invalid conversationId' }); + return; + } - const membership = await db.query.conversationMembers.findFirst({ - where: and( - eq(conversationMembers.conversationId, conversationId), - eq(conversationMembers.userId, userId), - ), - }); + const conversationId = payload.conversationId.trim(); - if (!membership) { - socket.emit('error', { event: 'typing_start', message: 'Not a member of this conversation' }); - return; - } + if (!socket.rooms?.has(conversationId)) { + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); - socket.to(conversationId).emit('typing_start', { conversationId, userId }); - }); + if (!membership) { + socket.emit('error', { + event: 'typing_start', + message: 'Not a member of this conversation', + }); + return; + } + } + + const relayPayload: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + if (typeof payload.deviceId === 'string' && payload.deviceId.trim()) { + relayPayload.deviceId = payload.deviceId.trim(); + } + + const timerKey = relayPayload.deviceId ? `${conversationId}:${relayPayload.deviceId}` : conversationId; + const existingTimer = typingTimers.get(timerKey); + if (existingTimer) { + clearTimeout(existingTimer); + } + + const timer = setTimeout(() => { + typingTimers.delete(timerKey); + socket.to(conversationId).emit('typing_stop', relayPayload); + }, 5000); + + typingTimers.set(timerKey, timer); + + socket.to(conversationId).emit('typing_start', relayPayload); + }, + ); // ── typing_stop ───────────────────────────────────────────────────────────── - // Payload: { conversationId: string } - // Broadcasts to the room excluding the sender. No DB write. - socket.on('typing_stop', async (payload: { conversationId: string }) => { - const { conversationId } = payload; + // Payload: { conversationId: string; deviceId?: string } + // Broadcasts to the room excluding the sender via Pub/Sub. Zero DB write. + socket.on( + 'typing_stop', + async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { + if (!payload || typeof payload.conversationId !== 'string' || !payload.conversationId.trim()) { + socket.emit('error', { event: 'typing_stop', message: 'Invalid conversationId' }); + return; + } - const membership = await db.query.conversationMembers.findFirst({ - where: and( - eq(conversationMembers.conversationId, conversationId), - eq(conversationMembers.userId, userId), - ), - }); + const conversationId = payload.conversationId.trim(); - if (!membership) { - socket.emit('error', { event: 'typing_stop', message: 'Not a member of this conversation' }); - return; - } + if (!socket.rooms?.has(conversationId)) { + const membership = await db.query.conversationMembers.findFirst({ + where: and( + eq(conversationMembers.conversationId, conversationId), + eq(conversationMembers.userId, userId), + ), + }); - socket.to(conversationId).emit('typing_stop', { conversationId, userId }); - }); + if (!membership) { + socket.emit('error', { + event: 'typing_stop', + message: 'Not a member of this conversation', + }); + return; + } + } + + const relayPayload: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + if (typeof payload.deviceId === 'string' && payload.deviceId.trim()) { + relayPayload.deviceId = payload.deviceId.trim(); + } + + const timerKey = relayPayload.deviceId ? `${conversationId}:${relayPayload.deviceId}` : conversationId; + const existingTimer = typingTimers.get(timerKey); + if (existingTimer) { + clearTimeout(existingTimer); + typingTimers.delete(timerKey); + } + + socket.to(conversationId).emit('typing_stop', relayPayload); + }, + ); // ── ask_assistant ────────────────────────────────────────────────────────── // Payload: { conversationId: string; content: string } @@ -334,4 +428,4 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void socket.emit('error', { event: 'ask_assistant', message: 'Failed to get AI reply' }); } }); -} +} \ No newline at end of file diff --git a/apps/web/src/components/chat/MessageInput.tsx b/apps/web/src/components/chat/MessageInput.tsx index 9ccc66a..85c2136 100644 --- a/apps/web/src/components/chat/MessageInput.tsx +++ b/apps/web/src/components/chat/MessageInput.tsx @@ -1,6 +1,6 @@ "use client"; -import React, { useState } from "react"; +import React, { useState, useRef } from "react"; import type { Socket } from "socket.io-client"; import transferToken from "../../lib/soroban"; @@ -15,9 +15,35 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop const [showPay, setShowPay] = useState(false); const [amount, setAmount] = useState(""); const [busy, setBusy] = useState(false); + const typingTimerRef = useRef | null>(null); + const isTypingRef = useRef(false); + + function stopTyping() { + if (socket && conversationId && isTypingRef.current) { + isTypingRef.current = false; + if (typingTimerRef.current) clearTimeout(typingTimerRef.current); + socket.emit("typing_stop", { conversationId }); + } + } + + function handleTextChange(e: React.ChangeEvent) { + setText(e.target.value); + if (socket && conversationId) { + if (!isTypingRef.current) { + isTypingRef.current = true; + socket.emit("typing_start", { conversationId }); + } + if (typingTimerRef.current) clearTimeout(typingTimerRef.current); + typingTimerRef.current = setTimeout(() => { + isTypingRef.current = false; + socket.emit("typing_stop", { conversationId }); + }, 2000); + } + } function handleSendText() { if (!text.trim() || !socket) return; + stopTyping(); socket.emit("send_message", { conversationId, content: text.trim(), @@ -38,6 +64,7 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop } setBusy(true); + stopTyping(); try { const txHash = await transferToken(recipient, Math.floor(n)); const transferMsg = { @@ -75,7 +102,7 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop className="flex-1 p-2 border rounded" placeholder="Type a message..." value={text} - onChange={(e) => setText(e.target.value)} + onChange={handleTextChange} onKeyDown={(e) => { if (e.key === "Enter") handleSendText(); }} @@ -131,4 +158,4 @@ export default function MessageInput({ conversationId, recipient, socket }: Prop )} ); -} +} \ No newline at end of file From 07d0d28efff976c5fd6aad8b925d755a3044e139 Mon Sep 17 00:00:00 2001 From: "Huncho.Dev" Date: Mon, 29 Jun 2026 09:38:44 +0000 Subject: [PATCH 2/3] fix(ci): format backend files --- apps/backend/src/__tests__/typing.test.ts | 38 +++++++++++++++++------ apps/backend/src/routes/conversations.ts | 7 ++--- apps/backend/src/routes/treasury.ts | 2 +- apps/backend/src/socket/messaging.ts | 22 ++++++++++--- 4 files changed, 48 insertions(+), 21 deletions(-) diff --git a/apps/backend/src/__tests__/typing.test.ts b/apps/backend/src/__tests__/typing.test.ts index 6907d73..d89f380 100644 --- a/apps/backend/src/__tests__/typing.test.ts +++ b/apps/backend/src/__tests__/typing.test.ts @@ -110,7 +110,9 @@ describe('Typing indicator Socket events (typing_start / typing_stop)', () => { const { registerMessagingHandlers } = await import('../socket/messaging.js'); registerMessagingHandlers(io as never, socket as never); - const handler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const handler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; await handler({ conversationId }); // Zero DB writes @@ -136,7 +138,9 @@ describe('Typing indicator Socket events (typing_start / typing_stop)', () => { const { registerMessagingHandlers } = await import('../socket/messaging.js'); registerMessagingHandlers(io as never, socket as never); - const handler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const handler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; await handler({ conversationId, deviceId, @@ -164,7 +168,9 @@ describe('Typing indicator Socket events (typing_start / typing_stop)', () => { const { registerMessagingHandlers } = await import('../socket/messaging.js'); registerMessagingHandlers(io as never, socket as never); - const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; await startHandler({ conversationId }); expect(socket.roomEmitted).toHaveLength(1); @@ -193,8 +199,12 @@ describe('Typing indicator Socket events (typing_start / typing_stop)', () => { const { registerMessagingHandlers } = await import('../socket/messaging.js'); registerMessagingHandlers(io as never, socket as never); - const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; - const stopHandler = (socket as EventEmitter).listeners('typing_stop')[0] as (p: unknown) => Promise; + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + const stopHandler = (socket as EventEmitter).listeners('typing_stop')[0] as ( + p: unknown, + ) => Promise; await startHandler({ conversationId }); await stopHandler({ conversationId }); @@ -218,7 +228,9 @@ describe('Typing indicator Socket events (typing_start / typing_stop)', () => { const { registerMessagingHandlers } = await import('../socket/messaging.js'); registerMessagingHandlers(io as never, socket as never); - const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; await startHandler({ conversationId }); expect(socket.to).not.toHaveBeenCalled(); @@ -241,7 +253,9 @@ describe('Typing indicator Socket events (typing_start / typing_stop)', () => { const { registerMessagingHandlers } = await import('../socket/messaging.js'); registerMessagingHandlers(io as never, socket as never); - const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; await startHandler({ conversationId, deviceId }); expect(socket.roomEmitted).toHaveLength(1); @@ -275,8 +289,12 @@ describe('Typing indicator Socket events (typing_start / typing_stop)', () => { const { registerMessagingHandlers } = await import('../socket/messaging.js'); registerMessagingHandlers(io as never, socket as never); - const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as (p: unknown) => Promise; - const sendHandler = (socket as EventEmitter).listeners('send_message')[0] as (p: unknown) => Promise; + const startHandler = (socket as EventEmitter).listeners('typing_start')[0] as ( + p: unknown, + ) => Promise; + const sendHandler = (socket as EventEmitter).listeners('send_message')[0] as ( + p: unknown, + ) => Promise; await startHandler({ conversationId }); expect(socket.roomEmitted).toHaveLength(1); @@ -290,4 +308,4 @@ describe('Typing indicator Socket events (typing_start / typing_stop)', () => { data: { conversationId, userId }, }); }); -}); \ No newline at end of file +}); diff --git a/apps/backend/src/routes/conversations.ts b/apps/backend/src/routes/conversations.ts index 75a71c6..f0ab8e1 100644 --- a/apps/backend/src/routes/conversations.ts +++ b/apps/backend/src/routes/conversations.ts @@ -104,12 +104,9 @@ conversationsRouter.get('/', async (req: AuthRequest, res) => { eq(conversationMembers.userId, userId), showArchived ? undefined : ne(conversationMembers.isArchived, true), ), - with: { + with: { conversation: getConversationRelations(req.auth!.deviceId) as never, }, - - })) as unknown as Array<{ conversationId: string; isMuted: boolean; isArchived: boolean; conversation: ConversationPayload }>; - })) as unknown as Array<{ conversationId: string; isMuted: boolean; @@ -730,4 +727,4 @@ conversationsRouter.delete('/:id/leave', async (req: AuthRequest, res) => { await invalidateConversationCaches(members.map((member) => member.userId)); res.status(204).send(); -}); \ No newline at end of file +}); diff --git a/apps/backend/src/routes/treasury.ts b/apps/backend/src/routes/treasury.ts index d42ed46..6abee31 100644 --- a/apps/backend/src/routes/treasury.ts +++ b/apps/backend/src/routes/treasury.ts @@ -43,4 +43,4 @@ treasuryRouter.post('/propose', validate(proposeSchema), async (req, res) => { recipient, ttlLedgers: TTL_LEDGERS[ttl], }); -}); \ No newline at end of file +}); diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index 5a6d8af..8fe77fe 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -499,7 +499,11 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void socket.on( 'typing_start', async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { - if (!payload || typeof payload.conversationId !== 'string' || !payload.conversationId.trim()) { + if ( + !payload || + typeof payload.conversationId !== 'string' || + !payload.conversationId.trim() + ) { socket.emit('error', { event: 'typing_start', message: 'Invalid conversationId' }); return; } @@ -532,7 +536,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void relayPayload.deviceId = payload.deviceId.trim(); } - const timerKey = relayPayload.deviceId ? `${conversationId}:${relayPayload.deviceId}` : conversationId; + const timerKey = relayPayload.deviceId + ? `${conversationId}:${relayPayload.deviceId}` + : conversationId; const existingTimer = typingTimers.get(timerKey); if (existingTimer) { clearTimeout(existingTimer); @@ -559,7 +565,11 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void socket.on( 'typing_stop', async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { - if (!payload || typeof payload.conversationId !== 'string' || !payload.conversationId.trim()) { + if ( + !payload || + typeof payload.conversationId !== 'string' || + !payload.conversationId.trim() + ) { socket.emit('error', { event: 'typing_stop', message: 'Invalid conversationId' }); return; } @@ -592,7 +602,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void relayPayload.deviceId = payload.deviceId.trim(); } - const timerKey = relayPayload.deviceId ? `${conversationId}:${relayPayload.deviceId}` : conversationId; + const timerKey = relayPayload.deviceId + ? `${conversationId}:${relayPayload.deviceId}` + : conversationId; const existingTimer = typingTimers.get(timerKey); if (existingTimer) { clearTimeout(existingTimer); @@ -705,4 +717,4 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void socket.emit('error', { event: 'ask_assistant', message: 'Failed to get AI reply' }); } }); -} \ No newline at end of file +} From 76876eba2013b3e8097ad547189dab9bc3ee3393 Mon Sep 17 00:00:00 2001 From: "Huncho.Dev" Date: Mon, 29 Jun 2026 11:12:05 +0000 Subject: [PATCH 3/3] fix(ci): resolve backend formatting and typing handler issues --- apps/backend/src/routes/conversations.ts | 3 +- apps/backend/src/routes/treasury.ts | 5 - apps/backend/src/socket/messaging.ts | 187 ++++++++++------------- 3 files changed, 78 insertions(+), 117 deletions(-) diff --git a/apps/backend/src/routes/conversations.ts b/apps/backend/src/routes/conversations.ts index f0ab8e1..673385b 100644 --- a/apps/backend/src/routes/conversations.ts +++ b/apps/backend/src/routes/conversations.ts @@ -104,7 +104,7 @@ conversationsRouter.get('/', async (req: AuthRequest, res) => { eq(conversationMembers.userId, userId), showArchived ? undefined : ne(conversationMembers.isArchived, true), ), - with: { + with: { conversation: getConversationRelations(req.auth!.deviceId) as never, }, })) as unknown as Array<{ @@ -114,7 +114,6 @@ conversationsRouter.get('/', async (req: AuthRequest, res) => { conversation: ConversationPayload; }>; - // Single subquery for message counts — no N+1 const conversationIds = memberships.map((m) => m.conversationId); const countRows = diff --git a/apps/backend/src/routes/treasury.ts b/apps/backend/src/routes/treasury.ts index 6abee31..f11d342 100644 --- a/apps/backend/src/routes/treasury.ts +++ b/apps/backend/src/routes/treasury.ts @@ -1,9 +1,4 @@ - -import { Router } from 'express'; -import type { IRouter } from 'express'; - import { Router, type IRouter } from 'express'; - import { z } from 'zod'; import { requireAuth, type AuthRequest } from '../middleware/auth.js'; import { validate } from '../middleware/validate.js'; diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index 8fe77fe..5426137 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -1,5 +1,6 @@ import type { Server } from 'socket.io'; import { and, eq, lt, desc, sql, inArray } from 'drizzle-orm'; + import { db } from '../db/index.js'; import { conversations, @@ -27,19 +28,21 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const idx = timerKey.indexOf(':'); const cid = idx === -1 ? timerKey : timerKey.slice(0, idx); const did = idx === -1 ? undefined : timerKey.slice(idx + 1); + const rp: { conversationId: string; userId: string; deviceId?: string } = { conversationId: cid, userId, }; + if (did) rp.deviceId = did; + socket.to(cid).emit('typing_stop', rp); } + typingTimers.clear(); }); // ── join_room ────────────────────────────────────────────────────────────── - // Payload: { conversationId: string } - // Guards that the caller is a member before subscribing them to the room. socket.on('join_room', async (payload: { conversationId: string }) => { const { conversationId } = payload; @@ -60,30 +63,19 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }); // ── send_message ─────────────────────────────────────────────────────────── - // Payload: { conversationId, messageId, contentType, ciphertext, envelopes } - // Persists the message and broadcasts it to all room members. socket.on( 'send_message', async (payload: { conversationId: string; - messageId: string; + messageId?: string; + content?: string; contentType?: string; ciphertext?: string; envelopes?: Array<{ recipientDeviceId: string; ciphertext: string }>; }) => { - const { conversationId, messageId, contentType, ciphertext, envelopes } = payload; + const { conversationId, messageId, content, contentType, ciphertext, envelopes } = payload; const deviceId = socket.auth!.deviceId; - if (!messageId) { - socket.emit('error', { event: 'send_message', message: 'messageId is required' }); - return; - } - - if (!ciphertext?.trim() && (!envelopes || envelopes.length === 0)) { - socket.emit('error', { event: 'send_message', message: 'Message content is empty' }); - return; - } - const membership = await db.query.conversationMembers.findFirst({ where: and( eq(conversationMembers.conversationId, conversationId), @@ -99,7 +91,38 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Idempotency check + // Clear active typing state as soon as the member attempts to send. + for (const [timerKey, timer] of typingTimers.entries()) { + if (timerKey === conversationId || timerKey.startsWith(`${conversationId}:`)) { + clearTimeout(timer); + typingTimers.delete(timerKey); + + const idx = timerKey.indexOf(':'); + const did = idx === -1 ? undefined : timerKey.slice(idx + 1); + + const rp: { conversationId: string; userId: string; deviceId?: string } = { + conversationId, + userId, + }; + + if (did) rp.deviceId = did; + + socket.to(conversationId).emit('typing_stop', rp); + } + } + + if (!messageId) { + socket.emit('error', { event: 'send_message', message: 'messageId is required' }); + return; + } + + const effectiveCiphertext = ciphertext ?? content ?? null; + + if (!effectiveCiphertext?.trim() && (!envelopes || envelopes.length === 0)) { + socket.emit('error', { event: 'send_message', message: 'Message content is empty' }); + return; + } + const existing = await db.query.messages.findFirst({ where: eq(messages.id, messageId), columns: { sequenceNumber: true }, @@ -110,27 +133,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - - for (const [timerKey, timer] of typingTimers.entries()) { - if (timerKey === conversationId || timerKey.startsWith(`${conversationId}:`)) { - clearTimeout(timer); - typingTimers.delete(timerKey); - const idx = timerKey.indexOf(':'); - const did = idx === -1 ? undefined : timerKey.slice(idx + 1); - const rp: { conversationId: string; userId: string; deviceId?: string } = { - conversationId, - userId, - }; - if (did) rp.deviceId = did; - socket.to(conversationId).emit('typing_stop', rp); - } - } - - const members = await db.query.conversationMembers.findMany({ - where: eq(conversationMembers.conversationId, conversationId), - columns: { userId: true }, - }); - const [message] = await db .insert(messages) .values({ @@ -139,17 +141,18 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void senderId: userId, senderDeviceId: deviceId, contentType: contentType || 'text/plain', - ciphertext: ciphertext || null, + ciphertext: effectiveCiphertext, }) .returning(); - if (envelopes && envelopes.length > 0) { const deviceIds = envelopes.map((e) => e.recipientDeviceId); + const devicesList = await db.query.userDevices.findMany({ where: inArray(userDevices.id, deviceIds), columns: { id: true, userId: true }, }); + const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId])); const validEnvelopes = envelopes @@ -166,13 +169,13 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void } } - // Emit acknowledgment to sender if (message) { - socket.emit('message_ack', { messageId, sequenceNumber: message.sequenceNumber }); + socket.emit('message_ack', { + messageId, + sequenceNumber: message.sequenceNumber, + }); } - // Deliver: storage is guaranteed above; pipeline re-validates membership, - // resolves active devices, and pushes each device exactly its envelope. await deliverMessage(io, message, conversationId); const members = await db.query.conversationMembers.findMany({ @@ -184,14 +187,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }, ); - // ── edit_message ───────────────────────────────────────────────────────────── - // Payload: { originalMessageId, messageId, contentType?, ciphertext?, envelopes? } - // An edit is never an in-place plaintext mutation (#190). It is a brand-new - // message carrying fresh ciphertext + envelopes, linked back to the original - // via `editsMessageId`. Only the original sender may edit. We broadcast both - // `new_message` (so devices receive the new ciphertext to decrypt) and - // `message_edited` (so clients render the newest version with an "edited" - // marker and supersede the original). + // ── edit_message ─────────────────────────────────────────────────────────── socket.on( 'edit_message', async (payload: { @@ -226,7 +222,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Edit authorship is restricted to the original sender. if (original.senderId !== userId) { socket.emit('error', { event: 'edit_message', @@ -235,12 +230,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Always link to the root original so a chain of edits collapses to one - // logical message: editing an edit still points back to the first version. const rootMessageId = original.editsMessageId ?? original.id; const conversationId = original.conversationId; - // Idempotency: a retried edit with the same new messageId is a no-op. const existing = await db.query.messages.findFirst({ where: eq(messages.id, messageId), columns: { sequenceNumber: true }, @@ -266,10 +258,12 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void if (envelopes && envelopes.length > 0) { const deviceIds = envelopes.map((e) => e.recipientDeviceId); + const devicesList = await db.query.userDevices.findMany({ where: inArray(userDevices.id, deviceIds), columns: { id: true, userId: true }, }); + const deviceToUser = new Map(devicesList.map((d) => [d.id, d.userId])); const validEnvelopes = envelopes @@ -306,8 +300,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void ); // ── message_history ──────────────────────────────────────────────────────── - // Payload: { conversationId: string; before?: string } (before = message id cursor) - // Returns the last PAGE_SIZE messages, optionally before a cursor for pagination. socket.on('message_history', async (payload: { conversationId: string; before?: string }) => { const { conversationId, before } = payload; @@ -327,6 +319,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void } let cursor: Date | undefined; + if (before) { const ref = await db.query.messages.findFirst({ where: eq(messages.id, before), @@ -350,8 +343,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }); // ── delete_message ───────────────────────────────────────────────────────── - // Payload: { messageId: string } - // Sender retraction socket.on('delete_message', async (payload: { messageId: string }) => { const { messageId } = payload; if (!messageId) return; @@ -369,14 +360,13 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void .update(messages) .set({ deletedAt: new Date(), ciphertext: null }) .where(eq(messages.id, messageId)); + await db.delete(messageEnvelopes).where(eq(messageEnvelopes.messageId, messageId)); io.to(message.conversationId).emit('message_deleted', { messageId }); }); // ── message_read ─────────────────────────────────────────────────────────── - // Payload: { conversationId: string; lastReadMessageId: string } - // Persists the caller's read position and broadcasts to the room. socket.on( 'message_read', async (payload: { conversationId: string; lastReadMessageId: string }) => { @@ -397,7 +387,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Ensure message exists in this conversation (prevents spoofed reads) const message = await db.query.messages.findFirst({ where: and(eq(messages.id, lastReadMessageId), eq(messages.conversationId, conversationId)), }); @@ -422,15 +411,12 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void io.to(conversationId).volatile.emit('read_receipt', { userId, lastReadMessageId }); - // Persist this receipt to each member's resume stream so a member who is - // offline right now can replay it on reconnect. The receipt is ephemeral - // (Redis only) — the underlying messages are recovered via envelope sync. - // Skip the member lookup entirely when there is no stream to write to. if (redis) { const members = await db.query.conversationMembers.findMany({ where: eq(conversationMembers.conversationId, conversationId), columns: { userId: true }, }); + await publishEphemeral( redis, members.map((member) => member.userId), @@ -440,15 +426,9 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }, ); - // ── resume ─────────────────────────────────────────────────────────────────── - // Payload: { lastEventId?: string } - // On reconnect, replay the lightweight ephemeral events this device missed - // (receipts, presence, system notices) from its short-lived Redis stream, then - // tell the client to run a full envelope sync for durable messages — which live - // in Postgres and are intentionally never placed on the resume stream. + // ── resume ──────────────────────────────────────────────────────────────── socket.on('resume', async (payload: { lastEventId?: string }) => { if (!redis) { - // No replay backend available; the client must fall back to a full sync. socket.emit('resume_complete', { lastEventId: null, syncRequired: true }); return; } @@ -456,17 +436,20 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const lastEventId = typeof payload?.lastEventId === 'string' ? payload.lastEventId : ''; const missed = await readMissedEvents(redis, userId, lastEventId); + for (const event of missed) { - socket.emit('ephemeral_replay', { id: event.id, type: event.type, data: event.data }); + socket.emit('ephemeral_replay', { + id: event.id, + type: event.type, + data: event.data, + }); } const newCursor = missed.length > 0 ? missed[missed.length - 1]!.id : lastEventId || null; socket.emit('resume_complete', { lastEventId: newCursor, syncRequired: true }); }); - // ── create_conversation ──────────────────────────────────────────────────── - // Payload: { type: 'dm'|'group'; name?: string; memberIds: string[] } - // Creates a conversation and adds all members (including caller). + // ── create_conversation ─────────────────────────────────────────────────── socket.on( 'create_conversation', async (payload: { type: 'dm' | 'group'; name?: string; memberIds: string[] }) => { @@ -493,9 +476,8 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void await invalidateConversationCaches(allMembers); }, ); - // ── typing_start ──────────────────────────────────────────────────────────── - // Payload: { conversationId: string; deviceId?: string } - // Broadcasts to the room excluding the sender via Pub/Sub. Zero DB write. Auto-expires. + + // ── typing_start ────────────────────────────────────────────────────────── socket.on( 'typing_start', async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { @@ -518,7 +500,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void ), }); - if (!membership) { socket.emit('error', { event: 'typing_start', @@ -532,6 +513,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void conversationId, userId, }; + if (typeof payload.deviceId === 'string' && payload.deviceId.trim()) { relayPayload.deviceId = payload.deviceId.trim(); } @@ -539,6 +521,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const timerKey = relayPayload.deviceId ? `${conversationId}:${relayPayload.deviceId}` : conversationId; + const existingTimer = typingTimers.get(timerKey); if (existingTimer) { clearTimeout(existingTimer); @@ -555,13 +538,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }, ); - socket.to(conversationId).volatile.emit('typing_start', { conversationId, userId }); - }); - - - // ── typing_stop ───────────────────────────────────────────────────────────── - // Payload: { conversationId: string; deviceId?: string } - // Broadcasts to the room excluding the sender via Pub/Sub. Zero DB write. + // ── typing_stop ─────────────────────────────────────────────────────────── socket.on( 'typing_stop', async (payload?: { conversationId?: string; deviceId?: string; [key: string]: unknown }) => { @@ -584,7 +561,6 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void ), }); - if (!membership) { socket.emit('error', { event: 'typing_stop', @@ -598,6 +574,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void conversationId, userId, }; + if (typeof payload.deviceId === 'string' && payload.deviceId.trim()) { relayPayload.deviceId = payload.deviceId.trim(); } @@ -605,6 +582,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const timerKey = relayPayload.deviceId ? `${conversationId}:${relayPayload.deviceId}` : conversationId; + const existingTimer = typingTimers.get(timerKey); if (existingTimer) { clearTimeout(existingTimer); @@ -615,14 +593,7 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }, ); - socket.to(conversationId).volatile.emit('typing_stop', { conversationId, userId }); - }); - - - // ── ask_assistant ────────────────────────────────────────────────────────── - // Payload: { conversationId: string; content: string } - // Forwards to AI agent and posts reply from reserved assistant user. - // Rate-limit: 5 requests per user per minute. + // ── ask_assistant ───────────────────────────────────────────────────────── const ASSISTANT_USER_ID = '00000000-0000-4000-8000-000000000000'; socket.on('ask_assistant', async (payload: { conversationId: string; content: string }) => { @@ -647,28 +618,25 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } - // Rate limiting if (redis) { const rlKey = `rl:ask_assistant:${userId}`; const count = await redis.incr(rlKey); + if (count === 1) { await redis.expire(rlKey, 60); } + if (count > 5) { socket.emit('error', { event: 'rate_limited', message: 'Rate limit exceeded' }); return; } } - // Forward to AI agent try { const response = await fetch('http://localhost:8000/chat', { method: 'POST', headers: { 'Content-Type': 'application/json' }, - body: JSON.stringify({ - message: content, - conversation_id: conversationId, - }), + body: JSON.stringify({ message: content, conversation_id: conversationId }), }); if (!response.ok) { @@ -677,23 +645,22 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void const data = (await response.json()) as { reply: string }; - // Ensure assistant user exists (upsert) - // Usually done via migration, but we can safely do it here or assume it exists. - // To be safe, we'll try to insert it and ignore conflict. await db.execute(sql` INSERT INTO users (id, username, avatar_url) - VALUES (${ASSISTANT_USER_ID}, 'Assistant', 'https://ui-avatars.com/api/?name=AI&background=0D8ABC&color=fff') + VALUES ( + ${ASSISTANT_USER_ID}, + 'Assistant', + 'https://ui-avatars.com/api/?name=AI&background=0D8ABC&color=fff' + ) ON CONFLICT (id) DO NOTHING `); - // Add to conversation members if not already await db.execute(sql` INSERT INTO conversation_members (conversation_id, user_id) VALUES (${conversationId}, ${ASSISTANT_USER_ID}) ON CONFLICT DO NOTHING `); - // Post the reply const [replyMessage] = await db .insert(messages) .values({