Skip to content

Commit ba5bb82

Browse files
committed
Refactor protocol message handling to be async
Converted applyProtocolMessageAsClient and applyProtocolMessageAsRelay to async functions and updated all usages and tests accordingly. Storage writeMessages is now async, and related code paths in Sync and Relay modules have been updated to handle promises.
1 parent bd0a47c commit ba5bb82

6 files changed

Lines changed: 235 additions & 221 deletions

File tree

packages/common/src/Evolu/Protocol.ts

Lines changed: 129 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -789,109 +789,122 @@ export type ApplyProtocolMessageAsClientResult =
789789

790790
export const applyProtocolMessageAsClient =
791791
(deps: StorageDep) =>
792-
(
792+
async (
793793
inputMessage: Uint8Array,
794794
options: ApplyProtocolMessageAsClientOptions = {},
795-
): Result<ApplyProtocolMessageAsClientResult, ProtocolError> =>
796-
tryDecodeProtocolData<ApplyProtocolMessageAsClientResult, ProtocolError>(
797-
inputMessage,
798-
(input) => {
799-
const [requestedVersion, ownerId] = decodeVersionAndOwner(input);
800-
const version = options.version ?? protocolVersion;
801-
802-
if (requestedVersion !== version) {
803-
return err<ProtocolUnsupportedVersionError>({
804-
type: "ProtocolUnsupportedVersionError",
805-
unsupportedVersion: requestedVersion,
806-
isInitiator: version < requestedVersion,
807-
ownerId,
808-
});
809-
}
795+
): Promise<
796+
Result<
797+
ApplyProtocolMessageAsClientResult,
798+
| ProtocolInvalidDataError
799+
| ProtocolSyncError
800+
| ProtocolUnsupportedVersionError
801+
| ProtocolWriteError
802+
| ProtocolWriteKeyError
803+
>
804+
> => {
805+
// try-catch instead of Result for performance and stacktraces
806+
try {
807+
const input = createBuffer(inputMessage);
808+
const [requestedVersion, ownerId] = decodeVersionAndOwner(input);
809+
const version = options.version ?? protocolVersion;
810810

811-
const messageType = input.shift() as MessageType;
812-
assert(
813-
messageType === MessageType.Response ||
814-
messageType === MessageType.Broadcast,
815-
"Invalid MessageType",
816-
);
811+
if (requestedVersion !== version) {
812+
return err<ProtocolUnsupportedVersionError>({
813+
type: "ProtocolUnsupportedVersionError",
814+
unsupportedVersion: requestedVersion,
815+
isInitiator: version < requestedVersion,
816+
ownerId,
817+
});
818+
}
817819

818-
if (messageType === MessageType.Response) {
819-
const errorCode = input.shift() as ProtocolErrorCode;
820-
if (errorCode !== ProtocolErrorCode.NoError) {
821-
switch (errorCode) {
822-
case ProtocolErrorCode.WriteKeyError:
823-
return err<ProtocolWriteKeyError>({
824-
type: "ProtocolWriteKeyError",
825-
ownerId,
826-
});
827-
case ProtocolErrorCode.WriteError:
828-
return err<ProtocolWriteError>({
829-
type: "ProtocolWriteError",
830-
ownerId,
831-
});
832-
case ProtocolErrorCode.SyncError:
833-
return err<ProtocolSyncError>({
834-
type: "ProtocolSyncError",
835-
ownerId,
836-
});
837-
default:
838-
throw new ProtocolDecodeError(
839-
`Invalid ProtocolErrorCode: ${errorCode}`,
840-
);
841-
}
820+
const messageType = input.shift() as MessageType;
821+
assert(
822+
messageType === MessageType.Response ||
823+
messageType === MessageType.Broadcast,
824+
"Invalid MessageType",
825+
);
826+
827+
if (messageType === MessageType.Response) {
828+
const errorCode = input.shift() as ProtocolErrorCode;
829+
if (errorCode !== ProtocolErrorCode.NoError) {
830+
switch (errorCode) {
831+
case ProtocolErrorCode.WriteKeyError:
832+
return err<ProtocolWriteKeyError>({
833+
type: "ProtocolWriteKeyError",
834+
ownerId,
835+
});
836+
case ProtocolErrorCode.WriteError:
837+
return err<ProtocolWriteError>({
838+
type: "ProtocolWriteError",
839+
ownerId,
840+
});
841+
case ProtocolErrorCode.SyncError:
842+
return err<ProtocolSyncError>({
843+
type: "ProtocolSyncError",
844+
ownerId,
845+
});
846+
default:
847+
throw new ProtocolDecodeError(
848+
`Invalid ProtocolErrorCode: ${errorCode}`,
849+
);
842850
}
843851
}
852+
}
844853

845-
const messages = decodeMessages(input);
846-
const binaryOwnerId = ownerIdToBinaryOwnerId(ownerId);
854+
const messages = decodeMessages(input);
855+
const binaryOwnerId = ownerIdToBinaryOwnerId(ownerId);
847856

848-
// TODO: async writeMessages via the main thread validation and processing
849-
// pipeline. storage.writeMessages will use mutex/asyncqueue imho
850-
if (
851-
isNonEmptyReadonlyArray(messages) &&
852-
!deps.storage.writeMessages(binaryOwnerId, messages)
853-
) {
854-
return ok({ type: "no-response" });
855-
}
857+
if (
858+
isNonEmptyReadonlyArray(messages) &&
859+
!(await deps.storage.writeMessages(binaryOwnerId, messages))
860+
) {
861+
return ok({ type: "no-response" });
862+
}
856863

857-
// Now: No writeKey, no sync.
858-
// TODO: Allow to sync SharedReadonlyOwner
859-
// Without local changes, writeKey will not be required.
860-
// With local changes, writeKey will be required and if not provided,
861-
// the sync should stop.
862-
// getWriteKey should be moved to sync fn.
863-
const writeKey = options.getWriteKey?.(ownerId);
864-
if (writeKey == null) {
865-
return ok({ type: "no-response" });
866-
}
864+
// Now: No writeKey, no sync.
865+
// TODO: Allow to sync SharedReadonlyOwner
866+
// Without local changes, writeKey will not be required.
867+
// With local changes, writeKey will be required and if not provided,
868+
// the sync should stop.
869+
// getWriteKey should be moved to sync fn.
870+
const writeKey = options.getWriteKey?.(ownerId);
871+
if (writeKey == null) {
872+
return ok({ type: "no-response" });
873+
}
867874

868-
if (messageType === MessageType.Broadcast) {
869-
return ok({ type: "broadcast" });
870-
}
875+
if (messageType === MessageType.Broadcast) {
876+
return ok({ type: "broadcast" });
877+
}
871878

872-
const ranges = decodeRanges(input);
879+
const ranges = decodeRanges(input);
873880

874-
if (!isNonEmptyReadonlyArray(ranges)) {
875-
return ok({ type: "no-response" });
876-
}
881+
if (!isNonEmptyReadonlyArray(ranges)) {
882+
return ok({ type: "no-response" });
883+
}
877884

878-
const output = createProtocolMessageBuffer(ownerId, {
879-
messageType: MessageType.Request,
880-
writeKey,
881-
totalMaxSize: options.totalMaxSize,
882-
rangesMaxSize: options.rangesMaxSize,
883-
});
885+
const output = createProtocolMessageBuffer(ownerId, {
886+
messageType: MessageType.Request,
887+
writeKey,
888+
totalMaxSize: options.totalMaxSize,
889+
rangesMaxSize: options.rangesMaxSize,
890+
});
884891

885-
const syncResult = sync(deps)(ranges, output, binaryOwnerId);
892+
const syncResult = sync(deps)(ranges, output, binaryOwnerId);
886893

887-
// Client sync error (handled via Storage) or no changes.
888-
if (!syncResult.ok || !syncResult.value) {
889-
return ok({ type: "no-response" });
890-
}
894+
// Client sync error (handled via Storage) or no changes.
895+
if (!syncResult.ok || !syncResult.value) {
896+
return ok({ type: "no-response" });
897+
}
891898

892-
return ok({ type: "response", message: output.unwrap() });
893-
},
894-
);
899+
return ok({ type: "response", message: output.unwrap() });
900+
} catch (error) {
901+
return err<ProtocolInvalidDataError>({
902+
type: "ProtocolInvalidDataError",
903+
data: inputMessage,
904+
error,
905+
});
906+
}
907+
};
895908

896909
export interface ApplyProtocolMessageAsRelayOptions {
897910
/** To subscribe an owner for broadcasting. */
@@ -923,16 +936,17 @@ export interface ApplyProtocolMessageAsRelayResult {
923936

924937
export const applyProtocolMessageAsRelay =
925938
(deps: StorageDep) =>
926-
(
939+
async (
927940
inputMessage: Uint8Array,
928941
options: ApplyProtocolMessageAsRelayOptions = {},
929942
/** For testing purposes only; should not be used in production. */
930943
version = protocolVersion,
931-
): Result<ApplyProtocolMessageAsRelayResult, ProtocolInvalidDataError> =>
932-
tryDecodeProtocolData<
933-
ApplyProtocolMessageAsRelayResult,
934-
ProtocolInvalidDataError
935-
>(inputMessage, (input) => {
944+
): Promise<
945+
Result<ApplyProtocolMessageAsRelayResult, ProtocolInvalidDataError>
946+
> => {
947+
// try-catch instead of Result for performance and stacktraces
948+
try {
949+
const input = createBuffer(inputMessage);
936950
const [requestedVersion, ownerId] = decodeVersionAndOwner(input);
937951
const binaryOwnerId = ownerIdToBinaryOwnerId(ownerId);
938952

@@ -1021,19 +1035,15 @@ export const applyProtocolMessageAsRelay =
10211035
options.broadcast(ownerId, broadcastBuffer.unwrap());
10221036
}
10231037

1024-
const messagesWritten = deps.storage.writeMessages(
1025-
binaryOwnerId,
1026-
messages,
1027-
);
1028-
1029-
if (!messagesWritten)
1038+
if (!(await deps.storage.writeMessages(binaryOwnerId, messages))) {
10301039
return ok({
10311040
type: "response",
10321041
message: createProtocolMessageBuffer(ownerId, {
10331042
messageType: MessageType.Response,
10341043
errorCode: ProtocolErrorCode.WriteError,
10351044
}).unwrap(),
10361045
});
1046+
}
10371047
}
10381048

10391049
const ranges = decodeRanges(input);
@@ -1062,27 +1072,14 @@ export const applyProtocolMessageAsRelay =
10621072

10631073
// Non-initiators always respond to provide sync completion feedback,
10641074
return ok({ type: "response", message });
1065-
});
1066-
1067-
/**
1068-
* Wraps Evolu Protocol decoding functions, which use exceptions instead of
1069-
* {@link Result} to provide stack traces for debugging and reduce allocation
1070-
* overhead in success cases.
1071-
*/
1072-
const tryDecodeProtocolData = <T, E>(
1073-
data: Uint8Array,
1074-
callback: (buffer: Buffer) => Result<T, E | ProtocolInvalidDataError>,
1075-
) => {
1076-
try {
1077-
return callback(createBuffer(data));
1078-
} catch (error: unknown) {
1079-
return err<ProtocolInvalidDataError>({
1080-
type: "ProtocolInvalidDataError",
1081-
data,
1082-
error,
1083-
});
1084-
}
1085-
};
1075+
} catch (error) {
1076+
return err<ProtocolInvalidDataError>({
1077+
type: "ProtocolInvalidDataError",
1078+
data: inputMessage,
1079+
error,
1080+
});
1081+
}
1082+
};
10861083

10871084
const decodeVersionAndOwner = (input: Buffer): [NonNegativeInt, OwnerId] => {
10881085
// This structure must never change across protocol versions. The version
@@ -1611,11 +1608,10 @@ export const decryptAndDecodeDbChange =
16111608
| SymmetricCryptoDecryptError
16121609
| ProtocolInvalidDataError
16131610
| ProtocolTimestampMismatchError
1614-
> =>
1615-
tryDecodeProtocolData<
1616-
DbChange,
1617-
SymmetricCryptoDecryptError | ProtocolTimestampMismatchError
1618-
>(message.change, (buffer) => {
1611+
> => {
1612+
// try-catch instead of Result for performance and stacktraces
1613+
try {
1614+
const buffer = createBuffer(message.change);
16191615
const nonce = buffer.shiftN(deps.symmetricCrypto.nonceLength);
16201616

16211617
const ciphertextLength = decodeLength(buffer);
@@ -1666,7 +1662,14 @@ export const decryptAndDecodeDbChange =
16661662
const dbChange = { table, id, values };
16671663

16681664
return ok(dbChange);
1669-
});
1665+
} catch (error) {
1666+
return err<ProtocolInvalidDataError>({
1667+
type: "ProtocolInvalidDataError",
1668+
data: message.change,
1669+
error,
1670+
});
1671+
}
1672+
};
16701673

16711674
/**
16721675
* Encodes a non-negative integer into a variable-length integer format. It's

packages/common/src/Evolu/Relay.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,9 @@ export const createRelayStorage =
108108
return true;
109109
},
110110

111-
writeMessages: (ownerId, messages) => {
111+
// https://eslint.org/docs/latest/rules/require-await#when-not-to-use-it
112+
// eslint-disable-next-line @typescript-eslint/require-await
113+
writeMessages: async (ownerId, messages) => {
112114
const result = deps.sqlite.transaction(() => {
113115
for (const message of messages) {
114116
const insertTimestampResult =

0 commit comments

Comments
 (0)