2020 */
2121package eu .openanalytics .containerproxy .service .hearbeat ;
2222
23+ import com .google .common .collect .ArrayListMultimap ;
24+ import com .google .common .collect .ListMultimap ;
25+ import com .google .common .collect .Multimaps ;
2326import eu .openanalytics .containerproxy .service .session .ISessionService ;
2427import eu .openanalytics .containerproxy .util .ChannelActiveListener ;
2528import eu .openanalytics .containerproxy .util .DelegatingStreamSinkConduit ;
2932import org .apache .logging .log4j .LogManager ;
3033import org .apache .logging .log4j .Logger ;
3134import org .springframework .context .annotation .Lazy ;
35+ import org .springframework .context .event .EventListener ;
3236import org .springframework .core .env .Environment ;
3337import org .springframework .scheduling .annotation .Async ;
38+ import org .springframework .security .web .session .HttpSessionDestroyedEvent ;
3439import org .xnio .StreamConnection ;
3540import org .xnio .conduits .ConduitStreamSinkChannel ;
3641import org .xnio .conduits .ConduitStreamSourceChannel ;
@@ -70,7 +75,7 @@ public enum HeartbeatSource {
7075 */
7176 INTERNAL ,
7277 /**
73- * Hearbeat send because of a fallback heartbeat request.
78+ * Heartbeat send because of a fallback heartbeat request.
7479 */
7580 FALLBACK
7681 }
@@ -87,6 +92,10 @@ public enum HeartbeatSource {
8792
8893 private final List <IHeartbeatProcessor > heartbeatProcessors ;
8994
95+ // keep track of the HeartbeatConnector for every SessionId so that the websocket connection can be closed
96+ // when the user logs out from that session. This is required for apps that keep running even if when the user signs out.
97+ private final ListMultimap <String , HeartbeatConnector > heartbeatConnectors = Multimaps .synchronizedListMultimap (ArrayListMultimap .create ());
98+
9099 public HeartbeatService (List <IHeartbeatProcessor > heartbeatProcessors ) {
91100 this .heartbeatProcessors = heartbeatProcessors ;
92101 }
@@ -95,10 +104,11 @@ public void attachHeartbeatChecker(HttpServerExchange exchange, String proxyId)
95104 if (exchange .isUpgrade ()) {
96105 // For websockets, attach a ping-pong listener to the underlying TCP channel.
97106 String sessionId = sessionService .extractSessionIdFromExchange (exchange );
98- HeartbeatConnector connector = new HeartbeatConnector (proxyId , sessionId );
99- // Delay the wrapping, because Undertow will make changes to the channel while the upgrade is being performed.
100107 HttpServerConnection httpConn = (HttpServerConnection ) exchange .getConnection ();
108+ HeartbeatConnector connector = new HeartbeatConnector (proxyId , sessionId , httpConn .getChannel ());
109+ // Delay the wrapping, because Undertow will make changes to the channel while the upgrade is being performed.
101110 heartbeatExecutor .schedule (() -> connector .wrapChannels (httpConn .getChannel ()), 3000 , TimeUnit .MILLISECONDS );
111+ heartbeatConnectors .put (sessionId , connector );
102112 } else {
103113 // For regular HTTP requests, just trigger one heartbeat.
104114 self .heartbeatReceived (HeartbeatSource .HTTP_REQUEST , proxyId , null );
@@ -125,27 +135,39 @@ public long getHeartbeatRate() {
125135 return environment .getProperty (ActiveProxiesService .PROP_RATE , Long .class , ActiveProxiesService .DEFAULT_RATE );
126136 }
127137
138+ @ EventListener
139+ public void onSessionDestroyedEvent (HttpSessionDestroyedEvent event ) {
140+ // stop every websocket connection started by the session
141+ heartbeatConnectors .get (event .getId ()).forEach (HeartbeatConnector ::closeConnection );
142+ // remove the session from the map
143+ heartbeatConnectors .removeAll (event .getId ());
144+ }
145+
128146 private class HeartbeatConnector {
129147
130148 private final String proxyId ;
131149
132150 private final String sessionId ;
133151
134- private HeartbeatConnector (String proxyId , String sessionId ) {
152+ private StreamConnection streamConnection ;
153+
154+ private HeartbeatConnector (String proxyId , String sessionId , StreamConnection streamConnection ) {
135155 this .proxyId = proxyId ;
136156 this .sessionId = sessionId ;
157+ this .streamConnection = streamConnection ;
137158 }
138159
139160 private void wrapChannels (StreamConnection streamConn ) {
140161 if (!streamConn .isOpen ()) return ;
162+ this .streamConnection = streamConn ; // save final streamConnection
141163
142164 ConduitStreamSinkChannel sinkChannel = streamConn .getSinkChannel ();
143165 ChannelActiveListener writeListener = new ChannelActiveListener ();
144166 DelegatingStreamSinkConduit conduitWrapper = new DelegatingStreamSinkConduit (sinkChannel .getConduit (), writeListener );
145167 sinkChannel .setConduit (conduitWrapper );
146168
147169 ConduitStreamSourceChannel sourceChannel = streamConn .getSourceChannel ();
148- DelegatingStreamSourceConduit srcConduitWrapper = new DelegatingStreamSourceConduit (sourceChannel .getConduit (), data -> checkPong ( data ) );
170+ DelegatingStreamSourceConduit srcConduitWrapper = new DelegatingStreamSourceConduit (sourceChannel .getConduit (), this :: checkPong );
149171 sourceChannel .setConduit (srcConduitWrapper );
150172
151173 heartbeatExecutor .schedule (() -> sendPing (writeListener , streamConn ), getHeartbeatRate (), TimeUnit .MILLISECONDS );
@@ -179,6 +201,20 @@ private void checkPong(byte[] response) {
179201 self .heartbeatReceived (HeartbeatSource .WEBSOCKET_PONG , proxyId , sessionId );
180202 }
181203 }
204+
205+ /**
206+ * Closes the WebSocket connection associated with this connector.
207+ */
208+ public void closeConnection () {
209+ try {
210+ if (streamConnection != null ) {
211+ streamConnection .close ();
212+ }
213+ } catch (Throwable e ) {
214+ // ignore error since we cannot do anything about it anyway
215+ }
216+ }
217+
182218 }
183219
184220}
0 commit comments