Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions apps/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
"license": "ISC",
"packageManager": "pnpm@10.28.1",
"dependencies": {
"@aws-sdk/client-s3": "^3.500.0",
"@aws-sdk/s3-request-presigner": "^3.500.0",
"@socket.io/redis-adapter": "^8.3.0",
"@stellar/stellar-sdk": "^15.1.0",
"cors": "^2.8.6",
Expand Down
2 changes: 2 additions & 0 deletions apps/backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { devicesRouter } from './routes/devices.js';
import { messagesRouter } from './routes/messages.js';
import { usersRouter } from './routes/users.js';
import { treasuryRouter } from './routes/treasury.js';
import { uploadsRouter } from './routes/uploads.js';
import { requireAuth, type AuthRequest } from './middleware/auth.js';

const packageJson = JSON.parse(
Expand Down Expand Up @@ -51,6 +52,7 @@ app.use('/devices', devicesRouter);
app.use('/messages', messagesRouter);
app.use('/users', usersRouter);
app.use('/treasury', treasuryRouter);
app.use('/uploads', uploadsRouter);

app.get('/me', requireAuth, (req, res) => {
res.json({ user: (req as AuthRequest).auth });
Expand Down
52 changes: 52 additions & 0 deletions apps/backend/src/cron/garbageCollection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { eq, and, lt } from 'drizzle-orm';
import { db } from '../db/index.js';
import { files } from '../db/schema.js';
import { DeleteObjectCommand } from '@aws-sdk/client-s3';
import { s3Client, S3_BUCKET } from '../lib/s3.js';

const ONE_HOUR_MS = 60 * 60 * 1000;

export async function garbageCollectPendingFiles() {
try {
const oneHourAgo = new Date(Date.now() - ONE_HOUR_MS);

// Find unconfirmed pending files older than 1 hour
const pendingFiles = await db.query.files.findMany({
where: and(
eq(files.status, 'pending'),
lt(files.createdAt, oneHourAgo)
)
});

if (pendingFiles.length === 0) return;

for (const file of pendingFiles) {
try {
// Attempt to delete from S3 just in case the file was uploaded but not confirmed
const command = new DeleteObjectCommand({
Bucket: S3_BUCKET,
Key: file.objectKey,
});
await s3Client.send(command).catch(err => {
console.warn(`Failed to delete object from S3: ${file.objectKey}`, err);
});

// Delete from database
await db.delete(files).where(eq(files.id, file.id));

console.log(`Garbage collected pending file: ${file.id}`);
} catch (err) {
console.error(`Error garbage collecting file ${file.id}:`, err);
}
}
} catch (error) {
console.error('Error in garbage collection:', error);
}
}

export function startGarbageCollectionCron() {
// Run every 10 minutes
setInterval(() => {
void garbageCollectPendingFiles();
}, 10 * 60 * 1000);
}
25 changes: 25 additions & 0 deletions apps/backend/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,21 @@ export const conversationMembers = pgTable('conversation_members', {
joinedAt: timestamp('joined_at').notNull().defaultNow(),
});

export const fileStatusEnum = pgEnum('file_status', ['pending', 'ready']);

export const files = pgTable('files', {
id: uuid('id').primaryKey().defaultRandom(),
uploaderId: uuid('uploader_id')
.notNull()
.references(() => users.id, { onDelete: 'cascade' }),
objectKey: text('object_key').notNull(),
status: fileStatusEnum('status').notNull().default('pending'),
size: integer('size').notNull(),
sha256: text('sha256'),
createdAt: timestamp('created_at').notNull().defaultNow(),
updatedAt: timestamp('updated_at').notNull().defaultNow(),
});

export const messages = pgTable(
'messages',
{
Expand All @@ -68,6 +83,7 @@ export const messages = pgTable(
.notNull()
.references(() => users.id, { onDelete: 'cascade' }),
content: text('content').notNull(),
fileId: uuid('file_id').references(() => files.id, { onDelete: 'set null' }),
createdAt: timestamp('created_at').notNull().defaultNow(),
deletedAt: timestamp('deleted_at'),
},
Expand Down Expand Up @@ -236,6 +252,7 @@ export const usersRelations = relations(users, ({ many }) => ({
wallets: many(wallets),
memberships: many(conversationMembers),
messages: many(messages),
files: many(files),
transfers: many(tokenTransfers),
devices: many(devices),
}));
Expand Down Expand Up @@ -265,6 +282,12 @@ export const messagesRelations = relations(messages, ({ one }) => ({
references: [conversations.id],
}),
sender: one(users, { fields: [messages.senderId], references: [users.id] }),
file: one(files, { fields: [messages.fileId], references: [files.id] }),
}));

export const filesRelations = relations(files, ({ one, many }) => ({
uploader: one(users, { fields: [files.uploaderId], references: [users.id] }),
messages: many(messages),
}));

export const tokenTransfersRelations = relations(tokenTransfers, ({ one }) => ({
Expand Down Expand Up @@ -311,3 +334,5 @@ export type SignedPreKey = typeof signedPreKeys.$inferSelect;
export type NewSignedPreKey = typeof signedPreKeys.$inferInsert;
export type OneTimePreKey = typeof oneTimePreKeys.$inferSelect;
export type NewOneTimePreKey = typeof oneTimePreKeys.$inferInsert;
export type File = typeof files.$inferSelect;
export type NewFile = typeof files.$inferInsert;
3 changes: 3 additions & 0 deletions apps/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
runForever as runStellarListener,
} from './services/stellarListener.js';
import { loadEnv } from './config.js';
import { startGarbageCollectionCron } from './cron/garbageCollection.js';

dotenv.config();

Expand Down Expand Up @@ -123,6 +124,8 @@ httpServer.listen(PORT, () => {
// Redis is unreachable; on failure we fall back to the in-process adapter.
void attachRedisAdapter();

startGarbageCollectionCron();

// #46 — Stellar transfer event listener. Only spin up when the contract
// id is configured so local-dev and unit-test runs don't try to talk to
// Soroban RPC. The listener never throws out of runForever, so a failed
Expand Down
14 changes: 14 additions & 0 deletions apps/backend/src/lib/s3.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { S3Client } from '@aws-sdk/client-s3';

export const s3Client = new S3Client({
region: process.env.AWS_REGION || 'us-east-1',
credentials: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID || '',
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY || '',
},
// If using MinIO or R2, an endpoint can be provided
endpoint: process.env.S3_ENDPOINT,
forcePathStyle: process.env.S3_FORCE_PATH_STYLE === 'true',
});

export const S3_BUCKET = process.env.S3_BUCKET || 'clicked-uploads';
127 changes: 127 additions & 0 deletions apps/backend/src/routes/uploads.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { Router } from 'express';
import type { IRouter } from 'express';
import { eq, and } from 'drizzle-orm';
import { HeadObjectCommand, PutObjectCommand } from '@aws-sdk/client-s3';
import { getSignedUrl } from '@aws-sdk/s3-request-presigner';
import { z } from 'zod';
import { db } from '../db/index.js';
import { files } from '../db/schema.js';
import { requireAuth, type AuthRequest } from '../middleware/auth.js';
import { s3Client, S3_BUCKET } from '../lib/s3.js';
import { validate } from '../middleware/validate.js';

export const uploadsRouter: IRouter = Router();

uploadsRouter.use(requireAuth);

const RequestUploadSchema = z.object({
size: z.number().int().positive(),
sha256: z.string().optional(),
mimeType: z.string().optional().default('application/octet-stream'),
});

// 1. Request upload URL
uploadsRouter.post('/', validate(RequestUploadSchema), async (req: AuthRequest, res) => {
const userId = req.auth!.userId;
const { size, sha256, mimeType } = req.body as z.infer<typeof RequestUploadSchema>;

try {
// We insert a pending file record
const [file] = await db
.insert(files)
.values({
uploaderId: userId,
objectKey: `uploads/${userId}/${Date.now()}_${Math.random().toString(36).substring(7)}`,
size,
sha256,
status: 'pending',
})
.returning();

// Generate a presigned URL for upload
const command = new PutObjectCommand({
Bucket: S3_BUCKET,
Key: file.objectKey,
ContentType: mimeType,
...(sha256 ? { ChecksumSHA256: sha256 } : {}),
});

const presignedUrl = await getSignedUrl(s3Client, command, { expiresIn: 3600 });

res.status(201).json({
fileId: file.id,
uploadUrl: presignedUrl,
objectKey: file.objectKey,
});
} catch (error) {
console.error('Error creating presigned URL:', error);
res.status(500).json({ error: 'Failed to request upload' });
}
});

// 2. Complete/confirm upload
uploadsRouter.post('/:id/complete', async (req: AuthRequest, res) => {
const userId = req.auth!.userId;
const fileId = req.params['id'] as string;

try {
const file = await db.query.files.findFirst({
where: and(eq(files.id, fileId), eq(files.uploaderId, userId)),
});

if (!file) {
res.status(404).json({ error: 'File not found' });
return;
}

if (file.status === 'ready') {
res.status(200).json({ fileId: file.id, status: 'ready' });
return;
}

// Server HEADs the object
const command = new HeadObjectCommand({
Bucket: S3_BUCKET,
Key: file.objectKey,
});

const headResponse = await s3Client.send(command);

// Verify size matches
if (headResponse.ContentLength !== file.size) {
res.status(400).json({
error: 'Size mismatch',
expected: file.size,
actual: headResponse.ContentLength
});
return;
}

// Optionally verify ciphertext sha256 (if provided in the database and by S3)
// S3 might return it in ChecksumSHA256 depending on how it was uploaded
if (file.sha256 && headResponse.ChecksumSHA256 && headResponse.ChecksumSHA256 !== file.sha256) {
res.status(400).json({
error: 'Hash mismatch',
expected: file.sha256,
actual: headResponse.ChecksumSHA256
});
return;
}

// Flip the file to ready
const [updatedFile] = await db
.update(files)
.set({ status: 'ready', updatedAt: new Date() })
.where(eq(files.id, fileId))
.returning();

res.status(200).json(updatedFile);
} catch (error: any) {
if (error.name === 'NotFound') {
res.status(400).json({ error: 'Object not found in storage. Ensure upload is complete.' });
return;
}
console.error('Error completing upload:', error);
res.status(500).json({ error: 'Failed to complete upload' });
}
});
26 changes: 20 additions & 6 deletions apps/backend/src/socket/messaging.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { Server } from 'socket.io';
import { and, eq, lt, desc, sql } from 'drizzle-orm';
import { db } from '../db/index.js';
import { conversations, conversationMembers, messages } from '../db/schema.js';
import { conversations, conversationMembers, messages, files } from '../db/schema.js';
import type { AuthSocket } from '../middleware/socketAuth.js';
import { invalidateConversationCaches } from '../lib/conversationCache.js';
import { serializeMessage } from '../lib/messages.js';
Expand Down Expand Up @@ -37,11 +37,11 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
// ── send_message ───────────────────────────────────────────────────────────
// Payload: { conversationId: string; content: string }
// Persists the message and broadcasts it to all room members.
socket.on('send_message', async (payload: { conversationId: string; content: string }) => {
const { conversationId, content } = payload;
socket.on('send_message', async (payload: { conversationId: string; content: string; fileId?: string }) => {
const { conversationId, content, fileId } = payload;

if (!content?.trim()) {
socket.emit('error', { event: 'send_message', message: 'Content must not be empty' });
if (!content?.trim() && !fileId) {
socket.emit('error', { event: 'send_message', message: 'Content or file must be provided' });
return;
}

Expand All @@ -57,9 +57,23 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
return;
}

if (fileId) {
const file = await db.query.files.findFirst({
where: eq(files.id, fileId),
});
if (!file) {
socket.emit('error', { event: 'send_message', message: 'File not found' });
return;
}
if (file.status !== 'ready') {
socket.emit('error', { event: 'send_message', message: 'File is not ready' });
return;
}
}

const [message] = await db
.insert(messages)
.values({ conversationId, senderId: userId, content: content.trim() })
.values({ conversationId, senderId: userId, content: content?.trim() ?? '', fileId })
.returning();

io.to(conversationId).emit('new_message', message);
Expand Down
Loading