Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions backend/src/controllers/stream.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,34 @@ function sumStringI128(values: string[]): string {
*/
export const createStream = async (req: Request, res: Response) => {
try {
const callerPublicKey = (req as AuthenticatedRequest).user?.publicKey;
if (!callerPublicKey) {
return res.status(401).json({ error: 'Unauthorized', message: 'Authentication required' });
}

const { streamId, sender, recipient, tokenAddress, ratePerSecond, depositedAmount, startTime } = req.body;

// Issue #809: validate identity fields before any DB write.
if (typeof sender !== 'string' || sender.length === 0) {
return res.status(400).json({ error: 'Invalid sender: must be a non-empty string' });
}
if (typeof recipient !== 'string' || recipient.length === 0) {
return res.status(400).json({ error: 'Invalid recipient: must be a non-empty string' });
}
if (typeof tokenAddress !== 'string' || tokenAddress.length === 0) {
return res.status(400).json({ error: 'Invalid tokenAddress: must be a non-empty string' });
}

// Issue #809: the authenticated wallet may only create/modify streams it owns.
// Without this, any logged-in wallet could POST an arbitrary `sender` and have
// it persisted, or flip another owner's cancelled stream back to active.
if (sender !== callerPublicKey) {
return res.status(403).json({
error: 'Forbidden',
message: 'sender must match the authenticated wallet',
});
}

const parsedStreamId = Number.parseInt(streamId, 10);
const parsedStartTime = Number.parseInt(startTime, 10);
const parsedRatePerSecond = BigInt(ratePerSecond);
Expand All @@ -91,6 +117,18 @@ export const createStream = async (req: Request, res: Response) => {

const endTime = parsedStartTime + Number(parsedDepositedAmount / parsedRatePerSecond);

// Issue #809: never let the upsert update branch touch a stream owned by a
// different wallet. The caller is already proven to equal `sender` above, so
// reject any existing row whose sender differs — this blocks reactivating or
// overwriting someone else's (e.g. cancelled) stream.
const existing = await prisma.stream.findUnique({ where: { streamId: parsedStreamId } });
if (existing && existing.sender !== callerPublicKey) {
return res.status(403).json({
error: 'Forbidden',
message: 'Cannot modify a stream owned by another wallet',
});
}

const stream = await prisma.stream.upsert({
where: { streamId: parsedStreamId },
update: {
Expand Down
49 changes: 49 additions & 0 deletions backend/tests/stream.controller.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ describe('Stream Controller', () => {
query: {},
params: {},
};
// Authenticated caller matches body.sender by default (Issue #809).
(req as any).user = { publicKey: 'GSENDER' };
res = {
status: vi.fn().mockReturnThis(),
json: vi.fn().mockReturnThis(),
Expand All @@ -70,6 +72,7 @@ describe('Stream Controller', () => {

describe('createStream', () => {
it('should create a stream successfully', async () => {
(prisma.stream.findUnique as any).mockResolvedValue(null);
(prisma.stream.upsert as any).mockResolvedValue({ streamId: 123 });

await createStream(req as Request, res as Response);
Expand All @@ -78,6 +81,52 @@ describe('Stream Controller', () => {
expect(prisma.stream.upsert).toHaveBeenCalled();
});

it('should return 401 when the request is unauthenticated', async () => {
(req as any).user = undefined;

await createStream(req as Request, res as Response);

expect(res.status).toHaveBeenCalledWith(401);
expect(prisma.stream.upsert).not.toHaveBeenCalled();
});

it('should return 403 when the caller is not the body sender (Issue #809)', async () => {
(req as any).user = { publicKey: 'GATTACKER' };

await createStream(req as Request, res as Response);

expect(res.status).toHaveBeenCalledWith(403);
expect(prisma.stream.upsert).not.toHaveBeenCalled();
});

it('should reject 400 when sender is missing (Issue #809)', async () => {
delete req.body.sender;
(req as any).user = { publicKey: 'GSENDER' };

await createStream(req as Request, res as Response);

expect(res.status).toHaveBeenCalledWith(400);
expect(prisma.stream.upsert).not.toHaveBeenCalled();
});

it('should return 403 and not reactivate a cancelled stream owned by another wallet (Issue #809)', async () => {
// A victim previously created (and cancelled) this stream.
(prisma.stream.findUnique as any).mockResolvedValue({
streamId: 123,
sender: 'GVICTIM',
isActive: false,
});
// Attacker authenticates as themselves and sets sender to their own key,
// trying to hijack the victim's streamId.
req.body.sender = 'GATTACKER';
(req as any).user = { publicKey: 'GATTACKER' };

await createStream(req as Request, res as Response);

expect(res.status).toHaveBeenCalledWith(403);
expect(prisma.stream.upsert).not.toHaveBeenCalled();
});

it('should return 400 for invalid streamId', async () => {
req.body.streamId = 'abc';
await createStream(req as Request, res as Response);
Expand Down
Loading