@@ -62,7 +62,7 @@ import {
6262 ProtocolTimestampMismatchError ,
6363 SubscriptionFlags ,
6464} from "./Protocol.js" ;
65- import { MutationChange } from "./Schema.js" ;
65+ import { DbSchemaDep , MutationChange } from "./Schema.js" ;
6666import {
6767 BaseSqliteStorage ,
6868 CrdtMessage ,
@@ -157,6 +157,7 @@ export const createSync =
157157 deps : ClockDep &
158158 ConsoleDep &
159159 CreateWebSocketDep &
160+ DbSchemaDep &
160161 PostMessageDep &
161162 RandomBytesDep &
162163 RandomDep &
@@ -445,6 +446,7 @@ export interface ClientStorageDep {
445446const createClientStorage =
446447 (
447448 deps : ClockDep &
449+ DbSchemaDep &
448450 GetSyncOwnerDep &
449451 RandomDep &
450452 SqliteDep &
@@ -687,7 +689,7 @@ export const applyLocalOnlyChange =
687689 } ;
688690
689691const applyMessages =
690- ( deps : ClientStorageDep & ClockDep & RandomDep & SqliteDep ) =>
692+ ( deps : ClientStorageDep & ClockDep & DbSchemaDep & RandomDep & SqliteDep ) =>
691693 (
692694 ownerId : OwnerId ,
693695 messages : NonEmptyReadonlyArray < CrdtMessage > ,
@@ -702,10 +704,24 @@ const applyMessages =
702704
703705 let { firstTimestamp, lastTimestamp } = usage . value ;
704706
705- for ( const { timestamp, change } of messages ) {
706- const timestampBytes = timestampToTimestampBytes ( timestamp ) ;
707- const idBytes = idToIdBytes ( change . id ) ;
708- const columns = dbChangeToColumns ( change , timestampToDateIso ( timestamp ) ) ;
707+ // const tableColumnsMap = new Map(
708+ // deps.dbSchema.tables.map((table) => [table.name, new Set(table.columns)]),
709+ // );
710+
711+ for ( const message of messages ) {
712+ // const tableColumns = tableColumnsMap.get(message.change.table);
713+ // const isValidMessage =
714+ // tableColumns != null &&
715+ // new Set(Object.keys(message.change.values)).isSubsetOf(tableColumns);
716+
717+ // console.log({ isValidMessage });
718+
719+ const timestampBytes = timestampToTimestampBytes ( message . timestamp ) ;
720+ const idBytes = idToIdBytes ( message . change . id ) ;
721+ const columns = dbChangeToColumns (
722+ message . change ,
723+ timestampToDateIso ( message . timestamp ) ,
724+ ) ;
709725
710726 for ( const [ column , value ] of columns ) {
711727 const updateAppTable = deps . sqlite . exec ( sql . prepared `
@@ -715,15 +731,15 @@ const applyMessages =
715731 from evolu_history
716732 where
717733 "ownerId" = ${ ownerIdBytes }
718- and "table" = ${ change . table }
734+ and "table" = ${ message . change . table }
719735 and "id" = ${ idBytes }
720736 and "column" = ${ column }
721737 and "timestamp" >= ${ timestampBytes }
722738 limit 1
723739 )
724- insert into ${ sql . identifier ( change . table ) }
740+ insert into ${ sql . identifier ( message . change . table ) }
725741 ("ownerId", "id", ${ sql . identifier ( column ) } )
726- select ${ ownerId } , ${ change . id } , ${ value }
742+ select ${ ownerId } , ${ message . change . id } , ${ value }
727743 where not exists (select 1 from existingTimestamp)
728744 on conflict ("ownerId", "id") do update
729745 set ${ sql . identifier ( column ) } = ${ value }
@@ -738,7 +754,7 @@ const applyMessages =
738754 values
739755 (
740756 ${ ownerIdBytes } ,
741- ${ change . table } ,
757+ ${ message . change . table } ,
742758 ${ idBytes } ,
743759 ${ column } ,
744760 ${ value } ,
@@ -766,9 +782,8 @@ const applyMessages =
766782 }
767783
768784 /**
769- * TODO: Implement proper storedBytes tracking for client using encrypted
770- * message sizes (need to figure out how to reuse received or postpone
771- * client...).
785+ * TODO: Implement proper storedBytes tracking for client using received and
786+ * sent encrypted message sizes.
772787 */
773788 const updateUsage = updateOwnerUsage ( deps ) (
774789 ownerIdBytes ,
0 commit comments