Skip to content

Commit 4c6f403

Browse files
committed
Merge pull request 'Possible solution for #23249: prevent grey-out because of collision between pings and packets' (#12) from feature/23249 into develop
2 parents e2f2363 + 34f3d7e commit 4c6f403

3 files changed

Lines changed: 82 additions & 14 deletions

File tree

src/main/java/eu/openanalytics/containerproxy/service/HeartbeatService.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import eu.openanalytics.containerproxy.model.runtime.ProxyStatus;
4545
import eu.openanalytics.containerproxy.util.DelegatingStreamSinkConduit;
4646
import eu.openanalytics.containerproxy.util.DelegatingStreamSourceConduit;
47+
import eu.openanalytics.containerproxy.util.ChannelActiveListener;
4748
import io.undertow.server.HttpServerExchange;
4849
import io.undertow.server.protocol.http.HttpServerConnection;
4950

@@ -127,27 +128,38 @@ private void wrapChannels(StreamConnection streamConn) {
127128
if (!streamConn.isOpen()) return;
128129

129130
ConduitStreamSinkChannel sinkChannel = streamConn.getSinkChannel();
130-
DelegatingStreamSinkConduit conduitWrapper = new DelegatingStreamSinkConduit(sinkChannel.getConduit(), null);
131+
ChannelActiveListener writeListener = new ChannelActiveListener();
132+
DelegatingStreamSinkConduit conduitWrapper = new DelegatingStreamSinkConduit(sinkChannel.getConduit(), writeListener);
131133
sinkChannel.setConduit(conduitWrapper);
132134

133135
ConduitStreamSourceChannel sourceChannel = streamConn.getSourceChannel();
134136
DelegatingStreamSourceConduit srcConduitWrapper = new DelegatingStreamSourceConduit(sourceChannel.getConduit(), data -> checkPong(data));
135137
sourceChannel.setConduit(srcConduitWrapper);
136138

137-
heartbeatExecutor.schedule(() -> sendPing(streamConn), getHeartbeatRate(), TimeUnit.MILLISECONDS);
139+
heartbeatExecutor.schedule(() -> sendPing(writeListener, streamConn), getHeartbeatRate(), TimeUnit.MILLISECONDS);
138140
}
139141

140-
private void sendPing(StreamConnection streamConn) {
142+
private void sendPing(ChannelActiveListener writeListener, StreamConnection streamConn) {
143+
if (writeListener.isActive(getHeartbeatRate())) {
144+
// active means that data was written to the channel in the least heartbeat interval
145+
// therefore we don't send a ping now to not cause collisions
146+
147+
// reschedule ping
148+
heartbeatExecutor.schedule(() -> sendPing(writeListener, streamConn), getHeartbeatRate(), TimeUnit.MILLISECONDS);
149+
// mark as we received a heartbeat
150+
heartbeatReceived(proxyId);
151+
return;
152+
}
141153
if (!streamConn.isOpen()) return;
142154

143155
try {
144-
streamConn.getSinkChannel().write(ByteBuffer.wrap(WEBSOCKET_PING));
156+
((DelegatingStreamSinkConduit) streamConn.getSinkChannel().getConduit()).writeWithoutNotifying(ByteBuffer.wrap(WEBSOCKET_PING));
145157
streamConn.getSinkChannel().flush();
146158
} catch (IOException e) {
147159
// Ignore failure, keep trying as long as the stream connection is valid.
148160
}
149161

150-
heartbeatExecutor.schedule(() -> sendPing(streamConn), getHeartbeatRate(), TimeUnit.MILLISECONDS);
162+
heartbeatExecutor.schedule(() -> sendPing(writeListener, streamConn), getHeartbeatRate(), TimeUnit.MILLISECONDS);
151163
}
152164

153165
private void checkPong(byte[] response) {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
* ContainerProxy
3+
*
4+
* Copyright (C) 2016-2020 Open Analytics
5+
*
6+
* ===========================================================================
7+
*
8+
* This program is free software: you can redistribute it and/or modify
9+
* it under the terms of the Apache License as published by
10+
* The Apache Software Foundation, either version 2 of the License, or
11+
* (at your option) any later version.
12+
*
13+
* This program is distributed in the hope that it will be useful,
14+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
15+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16+
* Apache License for more details.
17+
*
18+
* You should have received a copy of the Apache License
19+
* along with this program. If not, see <http://www.apache.org/licenses/>
20+
*/
21+
package eu.openanalytics.containerproxy.util;
22+
23+
/**
24+
* A listener that keeps track of whether a channel is active.
25+
*
26+
*/
27+
public class ChannelActiveListener implements Runnable {
28+
29+
private long lastWrite = 0;
30+
31+
@Override
32+
public void run() {
33+
lastWrite = System.currentTimeMillis();
34+
}
35+
36+
/**
37+
* Checks whether the channel was active in the provided period.
38+
*/
39+
public boolean isActive(long period) {
40+
long diff = System.currentTimeMillis() - lastWrite;
41+
42+
// make sure the period is at least 5 seconds
43+
// this ensures that when the socket is active, the ping is delayed for at least 5 seconds
44+
if (period < 5000) {
45+
period = 5000;
46+
}
47+
48+
if (diff <= period) {
49+
return true;
50+
}
51+
return false;
52+
}
53+
54+
}

src/main/java/eu/openanalytics/containerproxy/util/DelegatingStreamSinkConduit.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@
3535
public class DelegatingStreamSinkConduit implements StreamSinkConduit {
3636

3737
private StreamSinkConduit delegate;
38-
private Consumer<byte[]> writeListener;
38+
private Runnable writeListener;
3939

40-
public DelegatingStreamSinkConduit(StreamSinkConduit delegate, Consumer<byte[]> writeListener) {
40+
41+
42+
public DelegatingStreamSinkConduit(StreamSinkConduit delegate, Runnable writeListener) {
4143
this.delegate = delegate;
4244
this.writeListener = writeListener;
4345
}
@@ -119,14 +121,14 @@ public long transferFrom(StreamSourceChannel source, long count, ByteBuffer thro
119121

120122
@Override
121123
public int write(ByteBuffer src) throws IOException {
122-
if (writeListener == null) {
123-
return delegate.write(src);
124-
} else {
125-
byte[] data = new byte[src.remaining()];
126-
src.get(data);
127-
writeListener.accept(data);
128-
return delegate.write(ByteBuffer.wrap(data));
124+
if (writeListener != null) {
125+
writeListener.run();
129126
}
127+
return delegate.write(src);
128+
}
129+
130+
public int writeWithoutNotifying(ByteBuffer src) throws IOException {
131+
return delegate.write(src);
130132
}
131133

132134
@Override

0 commit comments

Comments
 (0)