From b744d2dd0c25205e0ee86c1b1f5163e3e687c9c6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 8 Jun 2026 19:19:15 +0100 Subject: [PATCH 1/4] Fix flaky TestPartialStateJoin/Outgoing_device_list_updates from a leave-handler race `WithWaitForLeave` registered a PDU handler (whose callback feeds `leaveChannel`) but then decided whether to wait by peeking at `room.CurrentState`. Because an incoming PDU is added to the room (`HandleTransactionRequests` calls `room.AddEvent`, updating `CurrentState`) *before* the PDU callback runs, the test goroutine could observe the updated state, take the "already seen" shortcut and return, firing the deferred `removePDUHandler()` in the window before the callback ran. The now-handler-less callback then flagged the (expected) leave as an unexpected PDU, failing the test. Wait on the channel fed by the handler instead of polling `room.CurrentState`, so the handler stays registered until its own callback has run. Every caller passes a joined user and an action that performs the leave, so the leave always arrives fresh via that callback. Make this contract explicit: snapshot membership before the action and fail if the user has already left (rather than silently treating it as "already seen"), and fail on timeout, since neither should happen in correct usage. The action still runs in the already-left case so any cleanup it performs is preserved. The `HandleTransactionRequests` ordering (callback after `room.AddEvent`) is left unchanged, as other tests rely on it (e.g. `federation_redaction_test.go` reads `serverRoom.Timeline` immediately after a callback-driven waiter). --- ...federation_room_join_partial_state_test.go | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/tests/msc3902/federation_room_join_partial_state_test.go b/tests/msc3902/federation_room_join_partial_state_test.go index 9e11b163d..3bbda80ae 100644 --- a/tests/msc3902/federation_room_join_partial_state_test.go +++ b/tests/msc3902/federation_room_join_partial_state_test.go @@ -141,23 +141,34 @@ func (s *server) WithWaitForLeave( ) defer removePDUHandler() - leaveAction() - + // `WithWaitForLeave` expects the user to be in the room and to leave as a + // result of the action, so that the leave arrives as a PDU our handler + // matches. Snapshot the membership *before* the action: if the user has + // already left, no such PDU is coming, which is a test bug rather than + // something to wait for. memberEvent := room.CurrentState("m.room.member", userID) - membership := "" + membership := "leave" if memberEvent != nil { membership, _ = memberEvent.Membership() } - if membership == "leave" { - t.Logf("%s has already seen %s leave test room %s.", s.ServerName(), userID, room.RoomID) - } else { - select { - case <-leaveChannel: - t.Logf("%s saw %s leave test room %s.", s.ServerName(), userID, room.RoomID) - break - case <-time.After(1 * time.Second): - t.Errorf("%s timed out waiting for %s to leave test room %s.", s.ServerName(), userID, room.RoomID) - } + alreadyLeft := membership == "leave" + if alreadyLeft { + t.Errorf("%s: %s had already left test room %s before WithWaitForLeave ran.", s.ServerName(), userID, room.RoomID) + } + + leaveAction() + + // Wait on the channel fed by our PDU handler rather than polling + // `room.CurrentState`: the room's current state is updated (by + // `room.AddEvent`) *before* the PDU callback runs, so returning on a + // `CurrentState` check could deregister our handler in the window before + // the callback fires, making the (expected) leave look unexpected to + // `HandleTransactionRequests`. + select { + case <-leaveChannel: + t.Logf("%s saw %s leave test room %s.", s.ServerName(), userID, room.RoomID) + case <-time.After(1 * time.Second): + t.Errorf("%s timed out waiting for %s to leave test room %s.", s.ServerName(), userID, room.RoomID) } } From b48b4ebe15922c0873039e41525839073d855fc7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Jun 2026 20:59:46 +0100 Subject: [PATCH 2/4] Update tests/msc3902/federation_room_join_partial_state_test.go Co-authored-by: Eric Eastwood --- tests/msc3902/federation_room_join_partial_state_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/msc3902/federation_room_join_partial_state_test.go b/tests/msc3902/federation_room_join_partial_state_test.go index 3bbda80ae..0557fdd87 100644 --- a/tests/msc3902/federation_room_join_partial_state_test.go +++ b/tests/msc3902/federation_room_join_partial_state_test.go @@ -153,7 +153,7 @@ func (s *server) WithWaitForLeave( } alreadyLeft := membership == "leave" if alreadyLeft { - t.Errorf("%s: %s had already left test room %s before WithWaitForLeave ran.", s.ServerName(), userID, room.RoomID) + t.Fatalf("%s: %s had already left test room %s before WithWaitForLeave ran.", s.ServerName(), userID, room.RoomID) } leaveAction() From a5030682057fc20234d44a5dac19dcc1615f378a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jun 2026 16:30:04 +0100 Subject: [PATCH 3/4] Fixup withWaitForLeave We need to always run the `leaveAction`, as the rest of the code assumes it will always run. We also need to handle the case the user isn't in the room anymore, as that is a valid situation too. --- ...federation_room_join_partial_state_test.go | 96 +++++++++++++------ 1 file changed, 69 insertions(+), 27 deletions(-) diff --git a/tests/msc3902/federation_room_join_partial_state_test.go b/tests/msc3902/federation_room_join_partial_state_test.go index 0557fdd87..e145237fa 100644 --- a/tests/msc3902/federation_room_join_partial_state_test.go +++ b/tests/msc3902/federation_room_join_partial_state_test.go @@ -123,10 +123,15 @@ func (s *server) AddEDUHandler(eduHandler func(gomatrixserverlib.EDU) bool) func } } -// WithWaitForLeave runs the given action and waits for the user to leave the room. +// WithWaitForLeave runs the given action and, when the resulting leave is +// expected to reach this server, waits for it. `leaveAction` is always run; the +// wait is skipped when `user` had already left the room (per their own +// homeserver, so the action produces no new leave) or when this server isn't in +// the room (so the leave won't be federated to us). func (s *server) WithWaitForLeave( - t *testing.T, room *federation.ServerRoom, userID string, leaveAction func(), + t *testing.T, room *federation.ServerRoom, user *client.CSAPI, leaveAction func(), ) { + userID := user.UserID leaveChannel := make(chan gomatrixserverlib.PDU, 10) removePDUHandler := s.AddPDUHandler( func(e gomatrixserverlib.PDU) bool { @@ -141,29 +146,37 @@ func (s *server) WithWaitForLeave( ) defer removePDUHandler() - // `WithWaitForLeave` expects the user to be in the room and to leave as a - // result of the action, so that the leave arrives as a PDU our handler - // matches. Snapshot the membership *before* the action: if the user has - // already left, no such PDU is coming, which is a test bug rather than - // something to wait for. - memberEvent := room.CurrentState("m.room.member", userID) - membership := "leave" - if memberEvent != nil { - membership, _ = memberEvent.Membership() - } - alreadyLeft := membership == "leave" - if alreadyLeft { - t.Fatalf("%s: %s had already left test room %s before WithWaitForLeave ran.", s.ServerName(), userID, room.RoomID) - } + // We need to check if the user (on their homeserver) thinks they're in the + // room, before performing the `leaveAction` (to avoid races). If they are + // in the room we need to wait until the server sees the leave, but if they + // aren't no leave event will be sent out. + userInRoom := userIsJoinedTo(t, user, room.RoomID) leaveAction() - // Wait on the channel fed by our PDU handler rather than polling + if !userInRoom { + // The user had already left, so the action produced no new leave and + // none is coming: don't wait. + t.Logf("%s is not joined to test room %s; not waiting for them to leave.", userID, room.RoomID) + return + } + + if !s.isInRoom(room) { + // The homeserver only federates the leave to servers that are in the + // room. If we aren't, no leave PDU is coming to us, so don't block until + // the timeout. + t.Logf("%s is not in test room %s; not waiting for %s to leave.", s.ServerName(), room.RoomID, userID) + return + } + + // Otherwise the action triggered the leave, which arrives as a PDU our + // handler matches. Wait on its channel rather than polling // `room.CurrentState`: the room's current state is updated (by // `room.AddEvent`) *before* the PDU callback runs, so returning on a - // `CurrentState` check could deregister our handler in the window before - // the callback fires, making the (expected) leave look unexpected to - // `HandleTransactionRequests`. + // `CurrentState` check could deregister our handler in the window before the + // callback fires, making the (expected) leave look unexpected to + // `HandleTransactionRequests`. This returns as soon as the leave arrives; the + // timeout is only a ceiling for declaring failure. select { case <-leaveChannel: t.Logf("%s saw %s leave test room %s.", s.ServerName(), userID, room.RoomID) @@ -172,6 +185,35 @@ func (s *server) WithWaitForLeave( } } +// isInRoom reports whether this Complement server has a joined user in the room, +// according to its own `ServerRoom` view. The server reliably tracks its own +// users' membership (it created their join/leave events), so this answers "will +// the homeserver federate events in this room to us?". +func (s *server) isInRoom(room *federation.ServerRoom) bool { + for _, serverInRoom := range room.ServersInRoom() { + if serverInRoom == s.ServerName() { + return true + } + } + return false +} + +// userIsJoinedTo reports whether the user is currently joined to the room, +// according to the user's own homeserver. This is the authoritative answer to +// "will a leave still be sent?", unlike a Complement server's local `ServerRoom` +// view which may not track the user's membership at all. +func userIsJoinedTo(t *testing.T, user *client.CSAPI, roomID string) bool { + t.Helper() + res := user.MustDo(t, "GET", []string{"_matrix", "client", "v3", "joined_rooms"}) + joinedRooms := gjson.ParseBytes(client.ParseJSON(t, res)).Get("joined_rooms") + for _, joinedRoom := range joinedRooms.Array() { + if joinedRoom.Str == roomID { + return true + } + } + return false +} + // Wait for the server to receive the event with given event ID. func (s *server) WaitForEvent( t *testing.T, room *federation.ServerRoom, eventID string, @@ -2260,7 +2302,7 @@ func TestPartialStateJoin(t *testing.T) { // @t23alice:hs1 joins the room. psjResult := beginPartialStateJoin(t, server1, room, alice) - defer server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { psjResult.Destroy(t) }) + defer server2.WithWaitForLeave(t, server2Room, alice, func() { psjResult.Destroy(t) }) // Both homeservers should receive device list updates. renameDevice(t, alice, "A new device name 1") @@ -2307,7 +2349,7 @@ func TestPartialStateJoin(t *testing.T) { ) // NB: We register the `psjResult.Destroy()` cleanup twice. This is alright because it // is idempotent. Here we wait for server 2 to observe the leave too. - defer server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { psjResult.Destroy(t) }) + defer server2.WithWaitForLeave(t, server2Room, alice, func() { psjResult.Destroy(t) }) joinEvent := room.CurrentState("m.room.member", server2.UserID("elsie")) server1.MustSendTransaction(t, deployment, deployment.GetFullyQualifiedHomeserverName(t, "hs1"), []json.RawMessage{joinEvent.JSON()}, nil) awaitEventViaSync(t, alice, room.RoomID, joinEvent.EventID(), "") @@ -2484,7 +2526,7 @@ func TestPartialStateJoin(t *testing.T) { syncToken = awaitEventViaSync(t, alice, partialStateRoom.RoomID, leaveEvent.EventID(), syncToken) leaveSharedRoom = func() { - server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { + server2.WithWaitForLeave(t, server2Room, alice, func() { alice.MustLeaveRoom(t, roomID) }) } @@ -2544,7 +2586,7 @@ func TestPartialStateJoin(t *testing.T) { // @t26alice:hs1 joins the room, followed by @elsie:server2. // @elsie:server2 is kicked with an invalid event. syncToken, server2Room, psjResult := setupIncorrectlyAcceptedKick(t, deployment, alice, server1, server2, deviceListUpdateChannel1, deviceListUpdateChannel2, room) - defer server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { psjResult.Destroy(t) }) + defer server2.WithWaitForLeave(t, server2Room, alice, func() { psjResult.Destroy(t) }) // @t26alice:hs1 sends out a device list update which is missed by @elsie:server2. // @elsie:server2 must receive missed device list updates once the partial state join finishes. @@ -2604,7 +2646,7 @@ func TestPartialStateJoin(t *testing.T) { federation.WithPartialState(), ) psjResult := beginPartialStateJoin(t, server1, room, alice) - defer server2.WithWaitForLeave(t, server2Room, alice.UserID, func() { psjResult.Destroy(t) }) + defer server2.WithWaitForLeave(t, server2Room, alice, func() { psjResult.Destroy(t) }) // @t28alice:hs1 sends out a device list update which is missed by @elsie:server2. // @elsie:server2 must receive missed device list updates once the partial state join finishes. @@ -3007,7 +3049,7 @@ func TestPartialStateJoin(t *testing.T) { }, client.SyncJoinedTo(server.UserID("charlie"), otherRoomID), ) - defer server.WithWaitForLeave(t, otherRoom, alice.UserID, func() { alice.MustLeaveRoom(t, otherRoomID) }) + defer server.WithWaitForLeave(t, otherRoom, alice, func() { alice.MustLeaveRoom(t, otherRoomID) }) // Depending on the homeserver implementation, @t31alice:hs1 must have been told that either: // * charlie updated their device list, or @@ -4433,7 +4475,7 @@ func (psj *partialStateJoinResult) Destroy(t *testing.T) { psj.Server.WithWaitForLeave( t, psj.ServerRoom, - psj.User.UserID, + psj.User, func() { psj.User.MustLeaveRoom(t, psj.ServerRoom.RoomID) }, ) } From d9da09afce45931d702cfc60baaa3c3272b3b04d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 11 Jun 2026 11:38:26 +0100 Subject: [PATCH 4/4] Update tests/msc3902/federation_room_join_partial_state_test.go Co-authored-by: Eric Eastwood --- tests/msc3902/federation_room_join_partial_state_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/msc3902/federation_room_join_partial_state_test.go b/tests/msc3902/federation_room_join_partial_state_test.go index e145237fa..c37adffd9 100644 --- a/tests/msc3902/federation_room_join_partial_state_test.go +++ b/tests/msc3902/federation_room_join_partial_state_test.go @@ -199,9 +199,7 @@ func (s *server) isInRoom(room *federation.ServerRoom) bool { } // userIsJoinedTo reports whether the user is currently joined to the room, -// according to the user's own homeserver. This is the authoritative answer to -// "will a leave still be sent?", unlike a Complement server's local `ServerRoom` -// view which may not track the user's membership at all. +// according to the user's own homeserver. func userIsJoinedTo(t *testing.T, user *client.CSAPI, roomID string) bool { t.Helper() res := user.MustDo(t, "GET", []string{"_matrix", "client", "v3", "joined_rooms"})