Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 32 additions & 8 deletions apps/backend/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,23 @@ export const messages = pgTable(
senderId: uuid('sender_id')
.notNull()
.references(() => users.id, { onDelete: 'cascade' }),
content: text('content').notNull(),
senderDeviceId: uuid('sender_device_id')
.notNull()
.references(() => userDevices.id, { onDelete: 'cascade' }),
createdAt: timestamp('created_at').notNull().defaultNow(),
deletedAt: timestamp('deleted_at'),
},
(table) => [
index('messages_content_search_idx').using(
'gin',
sql`to_tsvector('english', ${table.content})`,
),
],
);

export const messageEnvelopes = pgTable('message_envelopes', {
id: uuid('id').primaryKey().defaultRandom(),
messageId: uuid('message_id')
.notNull()
.references(() => messages.id, { onDelete: 'cascade' }),
content: text('content').notNull(),
createdAt: timestamp('created_at').notNull().defaultNow(),
});

// ─── Devices & prekeys (issues #158, #159, #162) ─────────────────────────────
//
// Each user may register multiple devices. Each device has an Ed25519 identity
Expand Down Expand Up @@ -259,12 +264,24 @@ export const conversationMembersRelations = relations(conversationMembers, ({ on
user: one(users, { fields: [conversationMembers.userId], references: [users.id] }),
}));

export const messagesRelations = relations(messages, ({ one }) => ({
export const messagesRelations = relations(messages, ({ one, many }) => ({
conversation: one(conversations, {
fields: [messages.conversationId],
references: [conversations.id],
}),
sender: one(users, { fields: [messages.senderId], references: [users.id] }),
senderDevice: one(userDevices, {
fields: [messages.senderDeviceId],
references: [userDevices.id],
}),
envelopes: many(messageEnvelopes),
}));

export const messageEnvelopesRelations = relations(messageEnvelopes, ({ one }) => ({
message: one(messages, {
fields: [messageEnvelopes.messageId],
references: [messages.id],
}),
}));

export const tokenTransfersRelations = relations(tokenTransfers, ({ one }) => ({
Expand All @@ -284,6 +301,11 @@ export const devicesRelations = relations(devices, ({ one, many }) => ({
oneTimePreKeys: many(oneTimePreKeys),
}));

export const userDevicesRelations = relations(userDevices, ({ one, many }) => ({
user: one(users, { fields: [userDevices.userId], references: [users.id] }),
messages: many(messages),
}));

export const signedPreKeysRelations = relations(signedPreKeys, ({ one }) => ({
device: one(devices, { fields: [signedPreKeys.deviceId], references: [devices.id] }),
}));
Expand All @@ -303,6 +325,8 @@ export type NewConversation = typeof conversations.$inferInsert;
export type ConversationMember = typeof conversationMembers.$inferSelect;
export type Message = typeof messages.$inferSelect;
export type NewMessage = typeof messages.$inferInsert;
export type MessageEnvelope = typeof messageEnvelopes.$inferSelect;
export type NewMessageEnvelope = typeof messageEnvelopes.$inferInsert;
export type TokenTransfer = typeof tokenTransfers.$inferSelect;
export type NewTokenTransfer = typeof tokenTransfers.$inferInsert;
export type Device = typeof devices.$inferSelect;
Expand Down
18 changes: 12 additions & 6 deletions apps/backend/src/lib/messages.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
type MessageLike = {
content: string | null;
deletedAt?: Date | null;
import type { Message, MessageEnvelope } from '../db/schema.js';

type MessageWithEnvelopes = Message & {
envelopes?: MessageEnvelope[];
};

export function serializeMessage<T extends MessageLike>(
export function serializeMessage<T extends MessageWithEnvelopes>(
message: T,
): Omit<T, 'deletedAt'> & { content: string | null } {
const { deletedAt, ...rest } = message;
const content = deletedAt
? null
: message.envelopes && message.envelopes.length > 0
? message.envelopes[0].content
: null;

return {
...rest,
content: deletedAt ? null : message.content,
};
content,
} as Omit<T, 'deletedAt'> & { content: string | null };
}
29 changes: 20 additions & 9 deletions apps/backend/src/routes/conversations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Router } from 'express';
import type { IRouter } from 'express';
import { asc, and, count, desc, eq, lt, sql, ne } from 'drizzle-orm';
import { db } from '../db/index.js';
import { conversationMembers, conversations, messages, tokenTransfers } from '../db/schema.js';
import { messageEnvelopes } from '../db/schema.js';
import { requireAuth, type AuthRequest } from '../middleware/auth.js';
import { redis, CONV_CACHE_TTL, convCacheKey } from '../lib/redis.js';
import { invalidateConversationCaches } from '../lib/conversationCache.js';
Expand All @@ -28,7 +28,11 @@ const conversationRelations = {
messages: {
orderBy: desc(messages.createdAt),
limit: 1,
with: { sender: { columns: { id: true, username: true, avatarUrl: true } } },
with: {
sender: { columns: { id: true, username: true, avatarUrl: true } },
envelopes: true,
senderDevice: true,
},
},
} as const;

Expand Down Expand Up @@ -230,7 +234,9 @@ conversationsRouter.get('/:id/members', async (req: AuthRequest, res) => {
with: {
user: {
columns: { id: true, username: true, avatarUrl: true },
with: { wallets: { columns: { address: true, isPrimary: true } } },
with: {
wallets: { columns: { address: true, isPrimary: true } },
},
},
},
})) as ConversationMemberPayload[];
Expand Down Expand Up @@ -471,7 +477,11 @@ conversationsRouter.get('/:id/messages', async (req: AuthRequest, res) => {
: eq(messages.conversationId, conversationId),
orderBy: desc(messages.createdAt),
limit: limit + 1,
with: { sender: { columns: { id: true, username: true, avatarUrl: true } } },
with: {
sender: { columns: { id: true, username: true, avatarUrl: true } },
envelopes: true,
senderDevice: true,
},
});

const hasMore = rows.length > limit;
Expand Down Expand Up @@ -528,19 +538,20 @@ conversationsRouter.get('/:id/search', async (req: AuthRequest, res) => {
${messages.id} AS "id",
${messages.conversationId} AS "conversationId",
${messages.senderId} AS "senderId",
${messages.content} AS "content",
${messageEnvelopes.content} AS "content",
${messages.createdAt} AS "createdAt",
ts_headline(
'english',
${messages.content},
${messageEnvelopes.content},
search_query.query,
'StartSel=<mark>, StopSel=</mark>, MaxWords=24, MinWords=8, ShortWord=3, HighlightAll=false'
) AS "snippet",
ts_rank_cd(to_tsvector('english', ${messages.content}), search_query.query) AS "rank"
FROM ${messages}, search_query
ts_rank_cd(to_tsvector('english', ${messageEnvelopes.content}), search_query.query) AS "rank"
FROM ${messages}
JOIN ${messageEnvelopes} ON ${messageEnvelopes.messageId} = ${messages.id}, search_query
WHERE ${messages.conversationId} = ${conversationId}
AND ${messages.deletedAt} IS NULL
AND search_query.query @@ to_tsvector('english', ${messages.content})
AND search_query.query @@ to_tsvector('english', ${messageEnvelopes.content})
ORDER BY "rank" DESC, ${messages.createdAt} DESC
LIMIT ${SEARCH_RESULT_LIMIT}
`);
Expand Down
79 changes: 73 additions & 6 deletions apps/backend/src/socket/messaging.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Server } from 'socket.io';
import { and, eq, lt, desc, sql } from 'drizzle-orm';
import { db } from '../db/index.js';
import { conversations, conversationMembers, messages } from '../db/schema.js';
import { conversations, conversationMembers, messages, messageEnvelopes, userDevices } from '../db/schema.js';
import type { AuthSocket } from '../middleware/socketAuth.js';
import { invalidateConversationCaches } from '../lib/conversationCache.js';
import { serializeMessage } from '../lib/messages.js';
Expand Down Expand Up @@ -57,12 +57,41 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
return;
}

// Get the user's primary device (or first active device)
const userDevice = await db.query.userDevices.findFirst({
where: and(eq(userDevices.userId, userId), eq(userDevices.revokedAt, null)),
});

if (!userDevice) {
socket.emit('error', { event: 'send_message', message: 'No active device found' });
return;
}

const [message] = await db
.insert(messages)
.values({ conversationId, senderId: userId, content: content.trim() })
.values({
conversationId,
senderId: userId,
senderDeviceId: userDevice.id,
})
.returning();

io.to(conversationId).emit('new_message', message);
// Create the message envelope with the content
await db.insert(messageEnvelopes).values({
messageId: message.id,
content: content.trim(),
});

// Fetch the complete message with envelopes
const completeMessage = await db.query.messages.findFirst({
where: eq(messages.id, message.id),
with: {
envelopes: true,
senderDevice: true,
},
});

io.to(conversationId).emit('new_message', completeMessage);

const members = await db.query.conversationMembers.findMany({
where: eq(conversationMembers.conversationId, conversationId),
Expand Down Expand Up @@ -107,7 +136,11 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
: eq(messages.conversationId, conversationId),
orderBy: desc(messages.createdAt),
limit: PAGE_SIZE,
with: { sender: { columns: { id: true, username: true, avatarUrl: true } } },
with: {
envelopes: true,
senderDevice: true,
sender: { columns: { id: true, username: true, avatarUrl: true } },
},
});

socket.emit('message_history', {
Expand Down Expand Up @@ -311,17 +344,51 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
ON CONFLICT DO NOTHING
`);

// Get or create an assistant device
let assistantDevice = await db.query.userDevices.findFirst({
where: eq(userDevices.userId, ASSISTANT_USER_ID),
});

if (!assistantDevice) {
const [newDevice] = await db
.insert(userDevices)
.values({
userId: ASSISTANT_USER_ID,
deviceId: 'assistant-device',
deviceName: 'Assistant',
platform: 'web',
identityPublicKey: 'assistant-public-key',
})
.returning();
assistantDevice = newDevice;
}

// Post the reply
const [replyMessage] = await db
.insert(messages)
.values({
conversationId,
senderId: ASSISTANT_USER_ID,
content: data.reply,
senderDeviceId: assistantDevice.id,
})
.returning();

io.to(conversationId).emit('new_message', replyMessage);
// Create the message envelope with the content
await db.insert(messageEnvelopes).values({
messageId: replyMessage.id,
content: data.reply,
});

// Fetch the complete message with envelopes
const completeMessage = await db.query.messages.findFirst({
where: eq(messages.id, replyMessage.id),
with: {
envelopes: true,
senderDevice: true,
},
});

io.to(conversationId).emit('new_message', completeMessage);

const members = await db.query.conversationMembers.findMany({
where: eq(conversationMembers.conversationId, conversationId),
Expand Down