Skip to content

Commit 3306588

Browse files
committed
Refactor message application logic in Sync.ts
Simplifies and consolidates the logic for applying CRDT messages to tables and history by removing redundant helper functions and inlining their behavior directly in applyMessages. Updates related test snapshots to reflect the new query structure.
1 parent 208b1c9 commit 3306588

2 files changed

Lines changed: 74 additions & 127 deletions

File tree

packages/common/src/local-first/Sync.ts

Lines changed: 67 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -31,20 +31,13 @@ import {
3131
} from "../Sqlite.js";
3232
import { AbortError, createMutex } from "../Task.js";
3333
import { TimeDep } from "../Time.js";
34-
import {
35-
DateIso,
36-
IdBytes,
37-
idBytesToId,
38-
idToIdBytes,
39-
PositiveInt,
40-
} from "../Type.js";
34+
import { IdBytes, idBytesToId, idToIdBytes, PositiveInt } from "../Type.js";
4135
import { CreateWebSocketDep, WebSocket } from "../WebSocket.js";
4236
import type { AppOwnerDep, PostMessageDep } from "./Db.js";
4337
import {
4438
AppOwner,
4539
Owner,
4640
OwnerId,
47-
OwnerIdBytes,
4841
ownerIdBytesToOwnerId,
4942
ownerIdToOwnerIdBytes,
5043
OwnerTransport,
@@ -71,7 +64,6 @@ import {
7164
getOwnerUsage,
7265
getTimestampInsertStrategy,
7366
Storage,
74-
StorageInsertTimestampStrategy,
7567
StorageWriteError,
7668
updateOwnerUsage,
7769
} from "./Storage.js";
@@ -545,12 +537,6 @@ const createClientStorage =
545537
if (!applyMessagesResult.ok) return applyMessagesResult;
546538
}
547539

548-
// // Apply local mutations atomically with approved messages
549-
// for (const change of localMutations) {
550-
// const result = applyLocalOnlyChange(deps)(change);
551-
// if (!result.ok) return result;
552-
// }
553-
554540
return deps.clock.save(clockTimestamp);
555541
});
556542

@@ -686,35 +672,88 @@ const applyMessages =
686672
): Result<void, SqliteError> => {
687673
const ownerIdBytes = ownerIdToOwnerIdBytes(ownerId);
688674

689-
const usageResult = getOwnerUsage(deps)(
675+
const usage = getOwnerUsage(deps)(
690676
ownerIdBytes,
691677
timestampToTimestampBytes(firstInArray(messages).timestamp),
692678
);
693-
if (!usageResult.ok) return usageResult;
679+
if (!usage.ok) return usage;
680+
681+
let { firstTimestamp, lastTimestamp } = usage.value;
682+
683+
for (const { timestamp, change } of messages) {
684+
const dateIso = timestampToDateIso(timestamp);
685+
const timestampBytes = timestampToTimestampBytes(timestamp);
686+
const idBytes = idToIdBytes(change.id);
687+
688+
const values = [...objectToEntries(change.values)];
694689

695-
let { firstTimestamp, lastTimestamp } = usageResult.value;
690+
// SystemColumns are not encoded in change.values.
691+
if (change.isInsert) {
692+
values.push(["createdAt", dateIso]);
693+
}
694+
if (change.isDelete !== null) {
695+
values.push(["isDeleted", booleanToSqliteBoolean(change.isDelete)]);
696+
}
697+
// No `ownerId` and `updatedAt` because they are evolu_history columns.
698+
699+
for (const [column, value] of values) {
700+
const updateAppTable = deps.sqlite.exec(sql.prepared`
701+
with
702+
existingTimestamp as (
703+
select 1
704+
from evolu_history
705+
where
706+
"ownerId" = ${ownerIdBytes}
707+
and "table" = ${change.table}
708+
and "id" = ${idBytes}
709+
and "column" = ${column}
710+
and "timestamp" >= ${timestampBytes}
711+
limit 1
712+
)
713+
insert into ${sql.identifier(change.table)}
714+
("ownerId", "id", ${sql.identifier(column)}, "updatedAt")
715+
select ${ownerId}, ${change.id}, ${value}, ${dateIso}
716+
where not exists (select 1 from existingTimestamp)
717+
on conflict ("ownerId", "id") do update
718+
set
719+
${sql.identifier(column)} = ${value},
720+
"updatedAt" = ${dateIso}
721+
where not exists (select 1 from existingTimestamp);
722+
`);
696723

697-
for (const message of messages) {
698-
const date = timestampToDateIso(message.timestamp);
699-
const result1 = applyMessageToAppTable(deps)(ownerIdBytes, message, date);
700-
if (!result1.ok) return result1;
724+
if (!updateAppTable.ok) return updateAppTable;
725+
726+
const insertHistory = deps.sqlite.exec(sql.prepared`
727+
insert into evolu_history
728+
("ownerId", "table", "id", "column", "value", "timestamp")
729+
values
730+
(
731+
${ownerIdBytes},
732+
${change.table},
733+
${idBytes},
734+
${column},
735+
${value},
736+
${timestampBytes}
737+
)
738+
on conflict do nothing;
739+
`);
701740

702-
const timestamp = timestampToTimestampBytes(message.timestamp);
741+
if (!insertHistory.ok) return insertHistory;
742+
}
703743

704744
let strategy;
705745
[strategy, firstTimestamp, lastTimestamp] = getTimestampInsertStrategy(
706-
timestamp,
746+
timestampBytes,
707747
firstTimestamp,
708748
lastTimestamp,
709749
);
710750

711-
const result2 = applyMessageToTimestampAndHistoryTables(deps)(
751+
const insertTimestamp = deps.storage.insertTimestamp(
712752
ownerIdBytes,
713-
message,
753+
timestampBytes,
714754
strategy,
715-
date,
716755
);
717-
if (!result2.ok) return result2;
756+
if (!insertTimestamp.ok) return insertTimestamp;
718757
}
719758

720759
/**
@@ -733,98 +772,6 @@ const applyMessages =
733772
return ok();
734773
};
735774

736-
const applyMessageToAppTable =
737-
(deps: SqliteDep) =>
738-
(
739-
ownerIdBytes: OwnerIdBytes,
740-
message: CrdtMessage,
741-
date: DateIso,
742-
): Result<void, SqliteError> => {
743-
const ownerId = ownerIdBytesToOwnerId(ownerIdBytes);
744-
const columns = dbChangeToColumns(message.change, date);
745-
746-
for (const [column, value] of columns) {
747-
const result = deps.sqlite.exec(sql.prepared`
748-
with
749-
existingTimestamp as (
750-
select 1
751-
from evolu_history
752-
where
753-
"ownerId" = ${ownerIdBytes}
754-
and "table" = ${message.change.table}
755-
and "id" = ${idToIdBytes(message.change.id)}
756-
and "column" = ${column}
757-
and "timestamp" >= ${timestampToTimestampBytes(message.timestamp)}
758-
limit 1
759-
)
760-
insert into ${sql.identifier(message.change.table)}
761-
("ownerId", "id", ${sql.identifier(column)}, updatedAt)
762-
select ${ownerId}, ${message.change.id}, ${value}, ${date}
763-
where not exists (select 1 from existingTimestamp)
764-
on conflict ("ownerId", "id") do update
765-
set
766-
${sql.identifier(column)} = ${value},
767-
updatedAt = ${date}
768-
where not exists (select 1 from existingTimestamp);
769-
`);
770-
771-
if (!result.ok) return result;
772-
}
773-
774-
return ok();
775-
};
776-
777-
export const applyMessageToTimestampAndHistoryTables =
778-
(deps: ClientStorageDep & SqliteDep) =>
779-
(
780-
ownerId: OwnerIdBytes,
781-
message: CrdtMessage,
782-
strategy: StorageInsertTimestampStrategy,
783-
date: DateIso,
784-
): Result<void, SqliteError> => {
785-
const timestamp = timestampToTimestampBytes(message.timestamp);
786-
const id = idToIdBytes(message.change.id);
787-
788-
const result = deps.storage.insertTimestamp(ownerId, timestamp, strategy);
789-
if (!result.ok) return result;
790-
791-
const columns = dbChangeToColumns(message.change, date);
792-
793-
for (const [column, value] of columns) {
794-
const result = deps.sqlite.exec(sql.prepared`
795-
insert into evolu_history
796-
("ownerId", "table", "id", "column", "value", "timestamp")
797-
values
798-
(
799-
${ownerId},
800-
${message.change.table},
801-
${id},
802-
${column},
803-
${value},
804-
${timestamp}
805-
)
806-
on conflict do nothing;
807-
`);
808-
if (!result.ok) return result;
809-
}
810-
811-
return ok();
812-
};
813-
814-
const dbChangeToColumns = (
815-
change: DbChange,
816-
date: DateIso,
817-
): Array<[string, SqliteValue | DateIso]> => {
818-
const entries = [...objectToEntries(change.values)];
819-
if (change.isInsert) {
820-
entries.push(["createdAt", date]);
821-
}
822-
if (change.isDelete !== null) {
823-
entries.push(["isDeleted", booleanToSqliteBoolean(change.isDelete)]);
824-
}
825-
return entries;
826-
};
827-
828775
/**
829776
* TODO: Rework for the new owners API.
830777
*

packages/common/test/Evolu/__snapshots__/Db.test.ts.snap

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ exports[`sends messages when socket is opened 2`] = `
9999
"create table "_localTable" ( "id" text, "createdAt" any, "updatedAt" any, "is...",
100100
"select storedBytes, firstTimestamp, lastTimestamp from evolu_usage where owne...",
101101
"with existingTimestamp as ( select 1 from evolu_history where "ownerId" = ? a...",
102+
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
102103
"with existingTimestamp as ( select 1 from evolu_history where "ownerId" = ? a...",
104+
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
103105
"with existingTimestamp as ( select 1 from evolu_history where "ownerId" = ? a...",
106+
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
104107
"insert into evolu_timestamp (ownerId, l, t, h1, h2, c) values (?, 1, ?, ?, ?,...",
105108
"with p(l, t, h1, h2) as ( select ( select max(l) + 1 from evolu_timestamp whe...",
106-
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
107-
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
108-
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
109109
"insert into evolu_usage ("ownerId", "storedBytes", "firstTimestamp", "lastTim...",
110110
"update evolu_config set "clock" = ?;",
111111
"with ml(ml) as ( select max(l) from evolu_timestamp where ownerId = ? ), sc(l...",
@@ -131,11 +131,11 @@ exports[`sync mutations 9`] = `
131131
"create table "_localTable" ( "id" text, "createdAt" any, "updatedAt" any, "is...",
132132
"select storedBytes, firstTimestamp, lastTimestamp from evolu_usage where owne...",
133133
"with existingTimestamp as ( select 1 from evolu_history where "ownerId" = ? a...",
134+
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
134135
"with existingTimestamp as ( select 1 from evolu_history where "ownerId" = ? a...",
136+
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
135137
"insert into evolu_timestamp (ownerId, t, l) values (?, ?, ?) on conflict do n...",
136138
"with c0(b, cl, pt, nt, h1, h2, c) as ( select 0, ( select max(l) from evolu_t...",
137-
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
138-
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
139139
"insert into evolu_usage ("ownerId", "storedBytes", "firstTimestamp", "lastTim...",
140140
"update evolu_config set "clock" = ?;",
141141
"select * from "testTable" where "isDeleted" is null",
@@ -150,8 +150,8 @@ exports[`sync mutations 9`] = `
150150
"select * from "_localTable";",
151151
"select storedBytes, firstTimestamp, lastTimestamp from evolu_usage where owne...",
152152
"with existingTimestamp as ( select 1 from evolu_history where "ownerId" = ? a...",
153-
"insert into evolu_timestamp (ownerId, l, t, h1, h2, c) values (?, 1, ?, ?, ?,...",
154153
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
154+
"insert into evolu_timestamp (ownerId, l, t, h1, h2, c) values (?, 1, ?, ?, ?,...",
155155
"insert into evolu_usage ("ownerId", "storedBytes", "firstTimestamp", "lastTim...",
156156
"update evolu_config set "clock" = ?;",
157157
"select * from "testTable" where "isDeleted" is null",
@@ -166,8 +166,8 @@ exports[`sync mutations 9`] = `
166166
"select * from "_localTable";",
167167
"select storedBytes, firstTimestamp, lastTimestamp from evolu_usage where owne...",
168168
"with existingTimestamp as ( select 1 from evolu_history where "ownerId" = ? a...",
169-
"insert into evolu_timestamp (ownerId, l, t, h1, h2, c) values (?, 1, ?, ?, ?,...",
170169
"insert into evolu_history ("ownerId", "table", "id", "column", "value", "time...",
170+
"insert into evolu_timestamp (ownerId, l, t, h1, h2, c) values (?, 1, ?, ?, ?,...",
171171
"insert into evolu_usage ("ownerId", "storedBytes", "firstTimestamp", "lastTim...",
172172
"update evolu_config set "clock" = ?;",
173173
"select * from "testTable" where "isDeleted" is null",

0 commit comments

Comments
 (0)