Skip to content

Commit 44635f6

Browse files
committed
Add tests for writeMessages in Relay
1 parent 09348a0 commit 44635f6

2 files changed

Lines changed: 262 additions & 5 deletions

File tree

packages/common/test/Evolu/Relay.test.ts

Lines changed: 257 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,26 @@
1-
import { expect, test } from "vitest";
1+
import { assert, describe, expect, test } from "vitest";
22
import {
33
EncryptedCrdtMessage,
44
EncryptedDbChange,
55
} from "../../src/Evolu/Storage.js";
6-
import { sql, timestampBytesToTimestamp } from "../../src/index.js";
6+
import { createInitialTimestamp } from "../../src/Evolu/Timestamp.js";
7+
import {
8+
constFalse,
9+
err,
10+
NonNegativeInt,
11+
OwnerIdBytes,
12+
sql,
13+
SqliteDep,
14+
timestampBytesToTimestamp,
15+
wait,
16+
} from "../../src/index.js";
717
import {
818
testCreateRelayStorageAndSqliteDeps,
19+
testDeps,
920
testOwner,
1021
testOwner2,
1122
testOwnerIdBytes,
23+
testOwnerIdBytes2,
1224
} from "../_deps.js";
1325
import { testTimestampsAsc } from "./_fixtures.js";
1426

@@ -57,3 +69,246 @@ test("deleteOwner", async () => {
5769
expect(countResult.ok && countResult.value.rows[0].count).toBe(0);
5870
}
5971
});
72+
73+
describe("writeMessages", () => {
74+
const createTestMessage = (length = 3): EncryptedCrdtMessage => ({
75+
timestamp: createInitialTimestamp(testDeps),
76+
change: new Uint8Array(length) as EncryptedDbChange,
77+
});
78+
79+
const getStoredBytes =
80+
(deps: SqliteDep) =>
81+
(ownerId: OwnerIdBytes): NonNegativeInt => {
82+
const usageResult = deps.sqlite.exec(sql`
83+
select storedBytes
84+
from evolu_usage
85+
where ownerId = ${ownerId};
86+
`);
87+
assert(usageResult.ok);
88+
return usageResult.value.rows[0].storedBytes as NonNegativeInt;
89+
};
90+
91+
const message = createTestMessage();
92+
93+
test("calculates storedBytes correctly", async () => {
94+
const { storage, sqlite } = await testCreateRelayStorageAndSqliteDeps();
95+
96+
await storage.writeMessages(testOwnerIdBytes, [message]);
97+
98+
expect(getStoredBytes({ sqlite })(testOwnerIdBytes)).toBe(3);
99+
});
100+
101+
test("accumulates storedBytes across multiple writes", async () => {
102+
const { storage, sqlite } = await testCreateRelayStorageAndSqliteDeps();
103+
104+
await storage.writeMessages(testOwnerIdBytes, [message]);
105+
await storage.writeMessages(testOwnerIdBytes, [message]);
106+
107+
expect(getStoredBytes({ sqlite })(testOwnerIdBytes)).toBe(6);
108+
});
109+
110+
test("prevents duplicate timestamp writes", async () => {
111+
const { storage, sqlite } = await testCreateRelayStorageAndSqliteDeps();
112+
113+
const result1 = await storage.writeMessages(testOwnerIdBytes, [message]);
114+
assert(result1.ok);
115+
116+
const result2 = await storage.writeMessages(testOwnerIdBytes, [message]);
117+
assert(result2.ok);
118+
119+
const countResult = sqlite.exec<{ count: number }>(sql`
120+
select count(*) as count
121+
from evolu_message
122+
where ownerId = ${testOwnerIdBytes};
123+
`);
124+
125+
assert(countResult.ok);
126+
expect(countResult.value.rows[0].count).toBe(1);
127+
});
128+
129+
test("mutex prevents concurrent writes for same owner", async () => {
130+
let concurrentAccess = false;
131+
let activeWrites = 0;
132+
133+
const { storage } = await testCreateRelayStorageAndSqliteDeps({
134+
isOwnerWithinQuota: async (_ownerId, _requiredBytes) => {
135+
activeWrites++;
136+
if (activeWrites > 1) {
137+
concurrentAccess = true;
138+
}
139+
await wait("1ms")(); // Simulate some work
140+
activeWrites--;
141+
return true;
142+
},
143+
});
144+
145+
const message1 = createTestMessage();
146+
const message2 = createTestMessage();
147+
148+
await Promise.all([
149+
storage.writeMessages(testOwnerIdBytes, [message1]),
150+
storage.writeMessages(testOwnerIdBytes, [message2]),
151+
]);
152+
153+
expect(concurrentAccess).toBe(false);
154+
expect(storage.getSize(testOwnerIdBytes)).toBe(2);
155+
});
156+
157+
test("allows concurrent writes for different owners", async () => {
158+
let activeWrites = 0;
159+
let maxConcurrentWrites = 0;
160+
161+
const { storage } = await testCreateRelayStorageAndSqliteDeps({
162+
isOwnerWithinQuota: async (_ownerId, _requiredBytes) => {
163+
activeWrites++;
164+
maxConcurrentWrites = Math.max(maxConcurrentWrites, activeWrites);
165+
await wait("1ms")(); // Simulate some work
166+
activeWrites--;
167+
return true;
168+
},
169+
});
170+
171+
const message1 = createTestMessage();
172+
const message2 = createTestMessage();
173+
174+
await Promise.all([
175+
storage.writeMessages(testOwnerIdBytes, [message1]),
176+
storage.writeMessages(testOwnerIdBytes2, [message2]),
177+
]);
178+
179+
expect(maxConcurrentWrites).toBe(2); // Both writes should be active simultaneously
180+
expect(storage.getSize(testOwnerIdBytes)).toBe(1);
181+
expect(storage.getSize(testOwnerIdBytes2)).toBe(1);
182+
});
183+
184+
test("transaction rollback on quota error", async () => {
185+
const { storage, sqlite } = await testCreateRelayStorageAndSqliteDeps({
186+
isOwnerWithinQuota: constFalse,
187+
});
188+
189+
const result = await storage.writeMessages(testOwnerIdBytes, [message]);
190+
191+
expect(result).toEqual(
192+
err({ type: "StorageQuotaError", ownerId: testOwner.id }),
193+
);
194+
195+
const messageCountResult = sqlite.exec<{ count: number }>(sql`
196+
select count(*) as count
197+
from evolu_message
198+
where ownerId = ${testOwnerIdBytes};
199+
`);
200+
201+
assert(messageCountResult.ok);
202+
expect(messageCountResult.value.rows[0].count).toBe(0);
203+
204+
const usageResult = sqlite.exec<{ count: number }>(sql`
205+
select count(*) as count
206+
from evolu_usage
207+
where ownerId = ${testOwnerIdBytes};
208+
`);
209+
210+
assert(usageResult.ok);
211+
expect(usageResult.value.rows[0].count).toBe(0);
212+
});
213+
214+
describe("isOwnerWithinQuota", () => {
215+
test("succeeds when isOwnerWithinQuota returns true", async () => {
216+
let quotaCheckCalled = false;
217+
let receivedOwnerId = "";
218+
let receivedBytes = 0;
219+
220+
const { storage } = await testCreateRelayStorageAndSqliteDeps({
221+
isOwnerWithinQuota: (ownerId, requiredBytes) => {
222+
quotaCheckCalled = true;
223+
receivedOwnerId = ownerId;
224+
receivedBytes = requiredBytes;
225+
return true;
226+
},
227+
});
228+
229+
const result = await storage.writeMessages(testOwnerIdBytes, [message]);
230+
231+
assert(result.ok);
232+
expect(quotaCheckCalled).toBe(true);
233+
expect(receivedOwnerId).toBe(testOwner.id);
234+
expect(receivedBytes).toBe(3);
235+
});
236+
237+
test("succeeds when async isOwnerWithinQuota returns true", async () => {
238+
let quotaCheckCalled = false;
239+
let receivedOwnerId = "";
240+
let receivedBytes = 0;
241+
242+
const { storage } = await testCreateRelayStorageAndSqliteDeps({
243+
isOwnerWithinQuota: async (ownerId, requiredBytes) => {
244+
await wait("1ms")();
245+
quotaCheckCalled = true;
246+
receivedOwnerId = ownerId;
247+
receivedBytes = requiredBytes;
248+
return true;
249+
},
250+
});
251+
252+
const result = await storage.writeMessages(testOwnerIdBytes, [message]);
253+
254+
assert(result.ok);
255+
expect(quotaCheckCalled).toBe(true);
256+
expect(receivedOwnerId).toBe(testOwner.id);
257+
expect(receivedBytes).toBe(3);
258+
});
259+
260+
test("fails when isOwnerWithinQuota returns false", async () => {
261+
const { storage } = await testCreateRelayStorageAndSqliteDeps({
262+
isOwnerWithinQuota: constFalse,
263+
});
264+
265+
const result = await storage.writeMessages(testOwnerIdBytes, [message]);
266+
267+
expect(result).toEqual(
268+
err({ type: "StorageQuotaError", ownerId: testOwner.id }),
269+
);
270+
});
271+
272+
test("fails when async isOwnerWithinQuota returns false", async () => {
273+
const { storage } = await testCreateRelayStorageAndSqliteDeps({
274+
isOwnerWithinQuota: async () => {
275+
await wait("1ms")();
276+
return false;
277+
},
278+
});
279+
280+
const result = await storage.writeMessages(testOwnerIdBytes, [message]);
281+
282+
expect(result).toEqual(
283+
err({ type: "StorageQuotaError", ownerId: testOwner.id }),
284+
);
285+
});
286+
287+
test("with quota check based on cumulative bytes", async () => {
288+
const quotaLimit = 100;
289+
290+
const { storage, sqlite } = await testCreateRelayStorageAndSqliteDeps({
291+
isOwnerWithinQuota: (_ownerId, requiredBytes) =>
292+
requiredBytes <= quotaLimit,
293+
});
294+
295+
const message1 = createTestMessage(50);
296+
const result1 = await storage.writeMessages(testOwnerIdBytes, [message1]);
297+
assert(result1.ok);
298+
299+
const message2 = createTestMessage(40);
300+
const result2 = await storage.writeMessages(testOwnerIdBytes, [message2]);
301+
assert(result2.ok);
302+
303+
const largeMessage = createTestMessage(20);
304+
const result3 = await storage.writeMessages(testOwnerIdBytes, [
305+
largeMessage,
306+
]);
307+
expect(result3).toEqual(
308+
err({ type: "StorageQuotaError", ownerId: testOwner.id }),
309+
);
310+
311+
expect(getStoredBytes({ sqlite })(testOwnerIdBytes)).toBe(90);
312+
});
313+
});
314+
});

packages/common/test/_deps.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
} from "../src/Evolu/Relay.js";
2020
import {
2121
createBaseSqliteStorageTables,
22+
StorageConfig,
2223
StorageDep,
2324
} from "../src/Evolu/Storage.js";
2425
import { constFalse, constVoid } from "../src/Function.js";
@@ -297,9 +298,9 @@ export const testCreateConsole = (): TestConsole => {
297298
};
298299
};
299300

300-
export const testCreateRelayStorageAndSqliteDeps = async (): Promise<
301-
StorageDep & SqliteDep
302-
> => {
301+
export const testCreateRelayStorageAndSqliteDeps = async (
302+
config?: Partial<StorageConfig>,
303+
): Promise<StorageDep & SqliteDep> => {
303304
const sqlite = await testCreateSqlite();
304305

305306
getOrThrow(createBaseSqliteStorageTables({ sqlite }));
@@ -318,6 +319,7 @@ export const testCreateRelayStorageAndSqliteDeps = async (): Promise<
318319
onStorageError: (error) => {
319320
throw new Error(error.type);
320321
},
322+
...config,
321323
});
322324

323325
return { sqlite, storage };

0 commit comments

Comments
 (0)