Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 10 additions & 12 deletions prisma/schema/webhook.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ enum WebhookEventType {
}

model Webhook {
id String @id @default(cuid())
creatorId String
callbackUrl String
events WebhookEventType[]
isActive Boolean @default(true)
isFailing Boolean @default(false)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

id String @id @default(cuid())
creatorId String
callbackUrl String
events WebhookEventType[]
isActive Boolean @default(true)
isFailing Boolean @default(false)
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
events_dispatched WebhookEvent[]

@@index([creatorId])
Expand All @@ -37,9 +36,8 @@ model WebhookEvent {
lastError String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

webhook Webhook @relation(fields: [webhookId], references: [id], onDelete: Cascade)
webhook Webhook @relation(fields: [webhookId], references: [id], onDelete: Cascade)

@@index([webhookId])
@@index([status])
}
}
2 changes: 1 addition & 1 deletion src/modules/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ router.use(CREATORS_BASE, webhookRouter);
router.use('/wallets', walletsRouter);
router.use('/alerts', alertsRouter);

export default router;
export default router;
90 changes: 90 additions & 0 deletions src/modules/webhook/webhook.controllers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { AsyncController } from '../../types/auth.types';
import { RegisterWebhookSchema, SimulateTradeSchema } from './webhook.schemas';
import { upsertWebhookSubscription, findMatchingSubscriptions } from './webhook.service';
import { sendSuccess, sendValidationError } from '../../utils/api-response.utils';
import { prisma } from '../../utils/prisma.utils';
import { logger } from '../../utils/logger.utils';

export const httpRegisterWebhook: AsyncController = async (req, res, next) => {
try {
const parsed = RegisterWebhookSchema.safeParse(req.body);
if (!parsed.success) {
return sendValidationError(
res,
'Invalid webhook registration payload',
parsed.error.issues.map(issue => ({
field: issue.path.join('.'),
message: issue.message,
}))
);
}

const subscription = await upsertWebhookSubscription(
parsed.data.url,
parsed.data.events
);

sendSuccess(res, subscription);
} catch (error) {
next(error);
}
};

export const httpSimulateTrade: AsyncController = async (req, res, next) => {
try {
const parsed = SimulateTradeSchema.safeParse(req.body);
if (!parsed.success) {
return sendValidationError(
res,
'Invalid trade simulation payload',
parsed.error.issues.map(issue => ({
field: issue.path.join('.'),
message: issue.message,
}))
);
}

const { type, amount, price, creatorId, actor } = parsed.data;

// 1. Create corresponding Activity record
const activityType = type === 'buy' ? 'KEY_BOUGHT' : 'KEY_SOLD';
const activity = await prisma.activity.create({
data: {
type: activityType as any,
actor,
creatorId,
payload: { amount, price },
},
});

// 2. Query subscriptions subscribed to this type
const subscriptions = await findMatchingSubscriptions(type);

// 3. Deliver webhook payloads
await Promise.all(
subscriptions.map(async (sub: any) => {
try {
await fetch(sub.url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
event_type: type,
activity,
}),
});
} catch (err: any) {
logger.error(
{ err: err.message, url: sub.url },
'Failed to deliver webhook'
);
}
})
);

sendSuccess(res, { activity, deliveredTo: subscriptions.map((s: any) => s.url) });
} catch (error) {
next(error);
}
};
252 changes: 252 additions & 0 deletions src/modules/webhook/webhook.integration.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
import http from 'http';
import { httpRegisterWebhook, httpSimulateTrade } from './webhook.controllers';
import { prisma } from '../../utils/prisma.utils';

// Mock the prisma client to avoid needing a live DB connection
jest.mock('../../utils/prisma.utils', () => ({
prisma: {
activity: {
create: jest.fn(),
},
webhookSubscription: {
upsert: jest.fn(),
findMany: jest.fn(),
},
},
}));

const activityCreateMock = prisma.activity.create as jest.Mock;
const webhookUpsertMock = (prisma as any).webhookSubscription.upsert as jest.Mock;
const webhookFindManyMock = (prisma as any).webhookSubscription.findMany as jest.Mock;

// Helper to mock Express req, res, next
function makeReq(body: any = {}): any {
return { body };
}

function makeRes(): any {
const res: any = {};
res.status = jest.fn().mockReturnValue(res);
res.json = jest.fn().mockReturnValue(res);
return res;
}

function makeNext() {
return jest.fn();
}

describe('Webhook Integration Tests', () => {
let mockServer: http.Server;
let mockServerUrl: string;
let receivedPayloads: any[] = [];

beforeAll((done) => {
// Start a mock HTTP server to receive webhook payloads
mockServer = http.createServer((req, res) => {
let body = '';
req.on('data', chunk => {
body += chunk;
});
req.on('end', () => {
try {
receivedPayloads.push(JSON.parse(body));
} catch (_e) {
// Ignore non-json bodies
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ ok: true }));
});
});

mockServer.listen(0, '127.0.0.1', () => {
const address = mockServer.address() as any;
mockServerUrl = `http://127.0.0.1:${address.port}/webhook`;
done();
});
});

afterAll((done) => {
mockServer.close(done);
});

beforeEach(() => {
receivedPayloads = [];
jest.clearAllMocks();
});

it('should register a webhook successfully', async () => {
const req = makeReq({
url: mockServerUrl,
events: ['buy', 'sell'],
});
const res = makeRes();
const next = makeNext();

const mockSub = {
id: 'sub-123',
url: mockServerUrl,
events: ['buy', 'sell'],
createdAt: new Date(),
updatedAt: new Date(),
};
webhookUpsertMock.mockResolvedValue(mockSub);

await httpRegisterWebhook(req, res, next);

expect(res.status).not.toHaveBeenCalledWith(400);
expect(res.json).toHaveBeenCalledWith(
expect.objectContaining({
success: true,
data: expect.objectContaining({
url: mockServerUrl,
events: ['buy', 'sell'],
}),
})
);
expect(webhookUpsertMock).toHaveBeenCalledWith({
where: { url: mockServerUrl },
create: { url: mockServerUrl, events: ['buy', 'sell'] },
update: { events: ['buy', 'sell'] },
});
});

it('should deliver webhook with event_type: buy when a buy trade is simulated', async () => {
const req = makeReq({
type: 'buy',
amount: 5,
price: 15.5,
creatorId: 'creator-abc',
actor: 'user-xyz',
});
const res = makeRes();
const next = makeNext();

const mockActivity = {
id: 'activity-buy-1',
type: 'KEY_BOUGHT',
actor: 'user-xyz',
creatorId: 'creator-abc',
payload: { amount: 5, price: 15.5 },
createdAt: new Date(),
};

const mockSubscriptions = [
{
id: 'sub-123',
url: mockServerUrl,
events: ['buy', 'sell'],
createdAt: new Date(),
updatedAt: new Date(),
},
];

activityCreateMock.mockResolvedValue(mockActivity);
webhookFindManyMock.mockResolvedValue(mockSubscriptions);

await httpSimulateTrade(req, res, next);

expect(res.status).not.toHaveBeenCalledWith(400);
expect(res.json).toHaveBeenCalledWith(
expect.objectContaining({
success: true,
data: expect.objectContaining({
activity: expect.objectContaining({ id: 'activity-buy-1' }),
}),
})
);

// Verify webhook HTTP delivery
expect(receivedPayloads).toHaveLength(1);
expect(receivedPayloads[0]).toEqual({
event_type: 'buy',
activity: expect.objectContaining({
id: 'activity-buy-1',
type: 'KEY_BOUGHT',
}),
});
});

it('should deliver webhook with event_type: sell when a sell trade is simulated', async () => {
const req = makeReq({
type: 'sell',
amount: 2,
price: 8.0,
creatorId: 'creator-abc',
actor: 'user-xyz',
});
const res = makeRes();
const next = makeNext();

const mockActivity = {
id: 'activity-sell-1',
type: 'KEY_SOLD',
actor: 'user-xyz',
creatorId: 'creator-abc',
payload: { amount: 2, price: 8.0 },
createdAt: new Date(),
};

const mockSubscriptions = [
{
id: 'sub-123',
url: mockServerUrl,
events: ['buy', 'sell'],
createdAt: new Date(),
updatedAt: new Date(),
},
];

activityCreateMock.mockResolvedValue(mockActivity);
webhookFindManyMock.mockResolvedValue(mockSubscriptions);

await httpSimulateTrade(req, res, next);

expect(res.status).not.toHaveBeenCalledWith(400);
expect(res.json).toHaveBeenCalledWith(
expect.objectContaining({
success: true,
data: expect.objectContaining({
activity: expect.objectContaining({ id: 'activity-sell-1' }),
}),
})
);

// Verify webhook HTTP delivery
expect(receivedPayloads).toHaveLength(1);
expect(receivedPayloads[0]).toEqual({
event_type: 'sell',
activity: expect.objectContaining({
id: 'activity-sell-1',
type: 'KEY_SOLD',
}),
});
});

it('should not deliver webhook if subscription does not match event_type', async () => {
const req = makeReq({
type: 'sell',
amount: 1,
price: 10.0,
creatorId: 'creator-abc',
actor: 'user-xyz',
});
const res = makeRes();
const next = makeNext();

activityCreateMock.mockResolvedValue({
id: 'activity-sell-2',
type: 'KEY_SOLD',
actor: 'user-xyz',
creatorId: 'creator-abc',
payload: { amount: 1, price: 10.0 },
createdAt: new Date(),
});

// Mock findMany returning empty array (no subscriptions for sell event)
webhookFindManyMock.mockResolvedValue([]);

await httpSimulateTrade(req, res, next);

expect(receivedPayloads).toHaveLength(0);
});
});
9 changes: 9 additions & 0 deletions src/modules/webhook/webhook.routes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { Router } from 'express';
import { httpRegisterWebhook, httpSimulateTrade } from './webhook.controllers';

const router = Router();

router.post('/', httpRegisterWebhook);
router.post('/simulate-trade', httpSimulateTrade);

export default router;
Loading
Loading