Skip to content

Commit 5f5a867

Browse files
committed
Fix forward compatibility by quarantining messages
1 parent ed9ddd2 commit 5f5a867

11 files changed

Lines changed: 722 additions & 425 deletions

File tree

.changeset/three-buses-play.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
"@evolu/common": patch
3+
---
4+
5+
Fix forward compatibility by quarantining messages with unknown schema
6+
7+
Messages with unknown tables or columns are now stored in `evolu_message_quarantine` table instead of being discarded. This fixes an issue where apps had to be updated to receive messages from newer versions. The quarantine table is queryable via `createQuery` and quarantined messages are automatically applied when the schema is updated.

packages/common/src/local-first/Db.ts

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import {
3232
} from "../Worker.js";
3333
import {
3434
AppOwner,
35+
AppOwnerDep,
3536
createAppOwner,
3637
createOwnerSecret,
3738
createOwnerWebSocketTransport,
@@ -63,6 +64,7 @@ import {
6364
createSync,
6465
SyncDep,
6566
SyncOwner,
67+
tryApplyQuarantinedMessages,
6668
} from "./Sync.js";
6769
import {
6870
Timestamp,
@@ -320,10 +322,6 @@ type DbWorkerDeps = Omit<
320322
SqliteDep &
321323
SyncDep;
322324

323-
export interface AppOwnerDep {
324-
readonly appOwner: AppOwner;
325-
}
326-
327325
export interface PostMessageDep {
328326
readonly postMessage: (message: DbWorkerOutput) => void;
329327
}
@@ -372,9 +370,7 @@ const createDbWorkerDeps = async (
372370
const dbSchema = getDbSchema(deps)();
373371
if (!dbSchema.ok) return dbSchema;
374372

375-
const dbIsInitialized = dbSchema.value.tables.some(
376-
(table) => table.name === "evolu_version",
377-
);
373+
const dbIsInitialized = "evolu_version" in dbSchema.value.tables;
378374

379375
let appOwner: AppOwner;
380376
let clock: Clock;
@@ -426,15 +422,21 @@ const createDbWorkerDeps = async (
426422
if (!result.ok) return result;
427423
}
428424

429-
const result1 = ensureDbSchema(deps)(initMessage.dbSchema, dbSchema.value);
430-
if (!result1.ok) return result1;
425+
{
426+
const result = ensureDbSchema(deps)(initMessage.dbSchema, dbSchema.value);
427+
if (!result.ok) return result;
428+
}
429+
430+
{
431+
const result = ensureMessageQuarantineTable(deps);
432+
if (!result.ok) return result;
433+
}
431434

432435
const sync = createSync({
433436
...deps,
434437
clock,
435438
symmetricCrypto: createSymmetricCrypto(platformDeps),
436439
timestampConfig: initMessage.config,
437-
postMessage,
438440
dbSchema: initMessage.dbSchema,
439441
})({
440442
appOwner,
@@ -448,6 +450,14 @@ const createDbWorkerDeps = async (
448450
});
449451
if (!sync.ok) return sync;
450452

453+
{
454+
const result = tryApplyQuarantinedMessages({
455+
...deps,
456+
dbSchema: initMessage.dbSchema,
457+
})();
458+
if (!result.ok) return result;
459+
}
460+
451461
sync.value.useOwner(true, appOwner);
452462

453463
return ok({
@@ -557,6 +567,41 @@ const initializeDb =
557567
return ok();
558568
};
559569

570+
/**
571+
* Ensures the quarantine table exists for storing messages with unknown schema.
572+
*
573+
* When a device receives sync messages containing tables or columns that don't
574+
* exist in its current schema (e.g., from a newer app version), those messages
575+
* are stored here instead of being discarded. This enables forward
576+
* compatibility:
577+
*
578+
* 1. Unknown data is preserved and can be applied when the app is updated
579+
* 2. Messages are still propagated to other devices that may understand them
580+
* 3. Partial messages work - known columns go to app tables, unknown to quarantine
581+
*
582+
* The `union all` query in `readDbChange` combines `evolu_history` and this
583+
* table, ensuring all data (known and unknown) is included when syncing to
584+
* other devices.
585+
*/
586+
const ensureMessageQuarantineTable = (
587+
deps: SqliteDep,
588+
): Result<void, SqliteError> => {
589+
const result = deps.sqlite.exec(sql`
590+
create table if not exists evolu_message_quarantine (
591+
"ownerId" blob not null,
592+
"timestamp" blob not null,
593+
"table" text not null,
594+
"id" blob not null,
595+
"column" text not null,
596+
"value" any,
597+
primary key ("ownerId", "timestamp", "table", "id", "column")
598+
)
599+
strict;
600+
`);
601+
if (!result.ok) return result;
602+
return ok();
603+
};
604+
560605
const handlers: Omit<MessageHandlers<DbWorkerInput, DbWorkerDeps>, "init"> = {
561606
getAppOwner: (deps) => () => {
562607
deps.postMessage({
@@ -628,19 +673,19 @@ const handlers: Omit<MessageHandlers<DbWorkerInput, DbWorkerDeps>, "init"> = {
628673
},
629674

630675
reset: (deps) => (message) => {
631-
const resetResult = deps.sqlite.transaction(() => {
676+
const result = deps.sqlite.transaction(() => {
632677
const dbSchema = getDbSchema(deps)();
633678
if (!dbSchema.ok) return dbSchema;
634679

635-
for (const table of dbSchema.value.tables) {
680+
for (const tableName in dbSchema.value.tables) {
636681
/**
637682
* The dropped table is completely removed from the database schema and
638683
* the disk file. The table can not be recovered. All indices and
639684
* triggers associated with the table are also deleted.
640685
* https://sqlite.org/lang_droptable.html
641686
*/
642687
const result = deps.sqlite.exec(sql`
643-
drop table ${sql.identifier(table.name)};
688+
drop table ${sql.identifier(tableName)};
644689
`);
645690
if (!result.ok) return result;
646691
}
@@ -659,8 +704,8 @@ const handlers: Omit<MessageHandlers<DbWorkerInput, DbWorkerDeps>, "init"> = {
659704
return ok();
660705
});
661706

662-
if (!resetResult.ok) {
663-
deps.postMessage({ type: "onError", error: resetResult.error });
707+
if (!result.ok) {
708+
deps.postMessage({ type: "onError", error: result.error });
664709
return;
665710
}
666711

packages/common/src/local-first/Evolu.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ export interface EvoluConfig extends Partial<DbConfig> {
101101
* URL to reload browser tabs after reset or restore.
102102
*
103103
* The default value is `/`.
104+
*
105+
* Note: This option will be moved to web platform deps in the next major
106+
* version.
104107
*/
105108
readonly reloadUrl?: string;
106109
}
@@ -836,15 +839,15 @@ const createEvoluInstance =
836839

837840
subscribeQuery: (query) => (listener) => {
838841
// Call the listener only if the result has been changed.
839-
let previousResult: unknown = null;
842+
let previousRows: unknown = null;
840843
const unsubscribe = subscribedQueries.subscribe(query)(() => {
841-
const result = evolu.getQueryRows(query);
842-
if (previousResult === result) return;
843-
previousResult = result;
844+
const rows = evolu.getQueryRows(query);
845+
if (previousRows === rows) return;
846+
previousRows = rows;
844847
listener();
845848
});
846849
return () => {
847-
previousResult = null;
850+
previousRows = null;
848851
unsubscribe();
849852
};
850853
},

packages/common/src/local-first/Owner.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,10 @@ export interface AppOwner extends Owner {
188188
readonly mnemonic?: Mnemonic | null;
189189
}
190190

191+
export interface AppOwnerDep {
192+
readonly appOwner: AppOwner;
193+
}
194+
191195
/** Creates an {@link AppOwner} from an {@link OwnerSecret}. */
192196
export const createAppOwner = (secret: OwnerSecret): AppOwner => ({
193197
...createOwner(secret),

packages/common/src/local-first/Schema.ts

Lines changed: 48 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
import * as Kysely from "kysely";
2-
import { mapObject, objectToEntries, ReadonlyRecord } from "../Object.js";
2+
import {
3+
createRecord,
4+
getProperty,
5+
mapObject,
6+
ReadonlyRecord,
7+
} from "../Object.js";
38
import { ok, Result } from "../Result.js";
49
import {
510
SafeSql,
@@ -30,6 +35,8 @@ import {
3035
omit,
3136
optional,
3237
OptionalType,
38+
record,
39+
set,
3340
String,
3441
TableId,
3542
Type,
@@ -42,6 +49,7 @@ import { AppOwner, OwnerId } from "./Owner.js";
4249
import { Query, Row } from "./Query.js";
4350
import type { CrdtMessage, DbChange } from "./Storage.js";
4451
import { Timestamp, TimestampBytes } from "./Timestamp.js";
52+
import { readonly } from "../Function.js";
4553

4654
/**
4755
* Defines the schema of an Evolu database.
@@ -169,12 +177,10 @@ export const evoluSchemaToDbSchema = (
169177
schema: EvoluSchema,
170178
indexesConfig?: IndexesConfig,
171179
): DbSchema => {
172-
const tables = objectToEntries(schema).map(([tableName, table]) => ({
173-
name: tableName,
174-
columns: objectToEntries(table)
175-
.filter(([k]) => k !== "id")
176-
.map(([k]) => k),
177-
}));
180+
const tables = mapObject(
181+
schema,
182+
(table) => new Set(Object.keys(table).filter((k) => k !== "id")),
183+
);
178184

179185
const indexes = indexesConfig
180186
? indexesConfig(createIndex).map(
@@ -206,6 +212,13 @@ export type CreateQuery<S extends EvoluSchema> = <R extends Row>(
206212
readonly column: string;
207213
readonly value: SqliteValue;
208214
};
215+
readonly evolu_message_quarantine: {
216+
readonly timestamp: TimestampBytes;
217+
readonly table: string;
218+
readonly id: IdBytes;
219+
readonly column: string;
220+
readonly value: SqliteValue;
221+
};
209222
}
210223
>,
211224
"selectFrom" | "fn" | "with" | "withRecursive"
@@ -231,7 +244,11 @@ export const SystemColumns = object({
231244
});
232245
export type SystemColumns = typeof SystemColumns.Type;
233246

234-
export const systemColumns = Object.keys(SystemColumns.props);
247+
export const systemColumns = readonly(
248+
new Set(Object.keys(SystemColumns.props)),
249+
);
250+
251+
export const systemColumnsWithId = readonly([...systemColumns, "id"]);
235252

236253
export type MutationKind = "insert" | "update" | "upsert";
237254

@@ -443,21 +460,17 @@ export type InferColumnErrors<
443460
>;
444461
}[keyof MutationMapping<T, M>];
445462

446-
export const DbTable = object({
447-
name: String,
448-
columns: array(String),
449-
});
450-
export type DbTable = typeof DbTable.Type;
451-
452463
export const DbIndex = object({ name: String, sql: String });
453464
export type DbIndex = typeof DbIndex.Type;
454465

455466
export const DbSchema = object({
456-
tables: array(DbTable),
467+
tables: record(String, set(String)),
457468
indexes: array(DbIndex),
458469
});
459470
export type DbSchema = typeof DbSchema.Type;
460471

472+
// TODO: Use a ref and update dbSchema on hot reloading to support
473+
// development workflows where schema changes without full app restart.
461474
export interface DbSchemaDep {
462475
readonly dbSchema: DbSchema;
463476
}
@@ -469,7 +482,7 @@ export const getDbSchema =
469482
DbSchema,
470483
SqliteError
471484
> => {
472-
const map = new Map<string, Array<string>>();
485+
const tables = createRecord<string, Set<string>>();
473486

474487
const tableAndColumnInfoRows = deps.sqlite.exec(sql`
475488
select
@@ -487,12 +500,9 @@ export const getDbSchema =
487500
tableName: string;
488501
columnName: string;
489502
};
490-
if (!map.has(tableName)) map.set(tableName, []);
491-
map.get(tableName)?.push(columnName);
503+
(tables[tableName] ??= new Set()).add(columnName);
492504
});
493505

494-
const tables = Array.from(map, ([name, columns]) => ({ name, columns }));
495-
496506
const indexesRows = deps.sqlite.exec(
497507
allIndexes
498508
? sql`
@@ -546,23 +556,19 @@ export const ensureDbSchema =
546556
currentSchema = dbSchema.value;
547557
}
548558

549-
newSchema.tables.forEach((newTable) => {
550-
const currentTable = currentSchema.tables.find(
551-
(t) => t.name === newTable.name,
552-
);
553-
if (!currentTable) {
554-
queries.push(createAppTable(newTable));
559+
for (const [tableName, newColumns] of Object.entries(newSchema.tables)) {
560+
const currentColumns = getProperty(currentSchema.tables, tableName);
561+
if (!currentColumns) {
562+
queries.push(createAppTable(tableName, newColumns));
555563
} else {
556-
newTable.columns
557-
.filter((newColumn) => !currentTable.columns.includes(newColumn))
558-
.forEach((newColumn) => {
559-
queries.push(sql`
560-
alter table ${sql.identifier(newTable.name)}
561-
add column ${sql.identifier(newColumn)} blob;
562-
`);
563-
});
564+
for (const newColumn of newColumns.difference(currentColumns)) {
565+
queries.push(sql`
566+
alter table ${sql.identifier(tableName)}
567+
add column ${sql.identifier(newColumn)} any;
568+
`);
569+
}
564570
}
565-
});
571+
}
566572

567573
// Remove current indexes that are not in the newSchema.
568574
currentSchema.indexes
@@ -595,19 +601,19 @@ export const ensureDbSchema =
595601
return ok();
596602
};
597603

598-
const createAppTable = (table: DbTable) => sql`
599-
create table ${sql.identifier(table.name)} (
604+
const createAppTable = (tableName: string, columns: ReadonlySet<string>) => sql`
605+
create table ${sql.identifier(tableName)} (
600606
"id" text,
601607
${sql.raw(
602-
`${systemColumns
603-
.concat(table.columns)
604-
.filter((c) => c !== "id")
608+
`${[...systemColumns, ...columns]
605609
// With strict tables and any type, data is preserved exactly as received
606610
// without any type affinity coercion. This allows storing any data type
607611
// while maintaining strict null enforcement for primary key columns.
612+
// TODO: Use proper SQLite types for system columns (text for createdAt,
613+
// updatedAt, ownerId, integer for isDeleted) instead of "any".
608614
.map((name) => `${sql.identifier(name).sql} any`)
609-
.join(", ")}`,
610-
)},
615+
.join(", ")}, `,
616+
)}
611617
primary key ("ownerId", "id")
612618
)
613619
without rowid, strict;

0 commit comments

Comments
 (0)