Skip to content

Commit c69f38f

Browse files
committed
Bug-fix: corrects message exchange cancellation logic in InternalHttpAsyncExecRuntime
* Fixes the problem with message exchanges over an existing persistent connection being non-cancellable * Response timeout not be applied to H2 endpoints * Test coverage
1 parent 30386d3 commit c69f38f

4 files changed

Lines changed: 421 additions & 22 deletions

File tree

Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.impl.async;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.List;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
35+
import org.apache.hc.client5.http.HttpRoute;
36+
import org.apache.hc.client5.http.async.AsyncExecRuntime;
37+
import org.apache.hc.client5.http.config.TlsConfig;
38+
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
39+
import org.apache.hc.client5.http.protocol.HttpClientContext;
40+
import org.apache.hc.core5.concurrent.BasicFuture;
41+
import org.apache.hc.core5.concurrent.Cancellable;
42+
import org.apache.hc.core5.concurrent.FutureContribution;
43+
import org.apache.hc.core5.http.EntityDetails;
44+
import org.apache.hc.core5.http.Header;
45+
import org.apache.hc.core5.http.HttpException;
46+
import org.apache.hc.core5.http.HttpHost;
47+
import org.apache.hc.core5.http.HttpResponse;
48+
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
49+
import org.apache.hc.core5.http.nio.CapacityChannel;
50+
import org.apache.hc.core5.http.nio.DataStreamChannel;
51+
import org.apache.hc.core5.http.nio.RequestChannel;
52+
import org.apache.hc.core5.http.protocol.HttpContext;
53+
import org.apache.hc.core5.reactor.ConnectionInitiator;
54+
import org.slf4j.Logger;
55+
import org.slf4j.LoggerFactory;
56+
57+
public final class InternalTestHttpAsyncExecRuntime extends InternalHttpAsyncExecRuntime {
58+
59+
private static final Logger LOG = LoggerFactory.getLogger(InternalTestHttpAsyncExecRuntime.class);
60+
61+
private final AtomicBoolean cancelled;
62+
63+
public InternalTestHttpAsyncExecRuntime(final AsyncClientConnectionManager manager,
64+
final ConnectionInitiator connectionInitiator,
65+
final TlsConfig tlsConfig) {
66+
super(LOG, manager, connectionInitiator, null, tlsConfig);
67+
this.cancelled = new AtomicBoolean();
68+
}
69+
70+
public Future<Boolean> leaseAndConnect(final HttpHost target, final HttpClientContext context) {
71+
final BasicFuture<Boolean> resultFuture = new BasicFuture<>(null);
72+
acquireEndpoint("test", new HttpRoute(target), null, context, new FutureContribution<AsyncExecRuntime>(resultFuture) {
73+
74+
@Override
75+
public void completed(final AsyncExecRuntime runtime) {
76+
if (!runtime.isEndpointConnected()) {
77+
runtime.connectEndpoint(context, new FutureContribution<AsyncExecRuntime>(resultFuture) {
78+
79+
@Override
80+
public void completed(final AsyncExecRuntime runtime) {
81+
resultFuture.completed(true);
82+
}
83+
84+
});
85+
} else {
86+
resultFuture.completed(true);
87+
}
88+
}
89+
90+
});
91+
return resultFuture;
92+
}
93+
94+
@Override
95+
public Cancellable execute(final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
96+
return super.execute(id, new AsyncClientExchangeHandler() {
97+
98+
public void cancel() {
99+
if (cancelled.compareAndSet(false, true)) {
100+
exchangeHandler.cancel();
101+
}
102+
}
103+
104+
public void failed(final Exception cause) {
105+
exchangeHandler.failed(cause);
106+
}
107+
108+
public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
109+
exchangeHandler.produceRequest(channel, context);
110+
}
111+
112+
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) throws HttpException, IOException {
113+
exchangeHandler.consumeResponse(response, entityDetails, context);
114+
}
115+
116+
public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
117+
exchangeHandler.consumeInformation(response, context);
118+
}
119+
120+
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
121+
exchangeHandler.updateCapacity(capacityChannel);
122+
}
123+
124+
public void consume(final ByteBuffer src) throws IOException {
125+
exchangeHandler.consume(src);
126+
}
127+
128+
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
129+
exchangeHandler.streamEnd(trailers);
130+
}
131+
132+
public void releaseResources() {
133+
exchangeHandler.releaseResources();
134+
}
135+
136+
public int available() {
137+
return exchangeHandler.available();
138+
}
139+
140+
public void produce(final DataStreamChannel channel) throws IOException {
141+
exchangeHandler.produce(channel);
142+
}
143+
144+
}, context);
145+
}
146+
147+
public boolean isAborted() {
148+
return cancelled.get();
149+
}
150+
151+
}
Lines changed: 217 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,217 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.testing.async;
28+
29+
import java.net.InetSocketAddress;
30+
import java.util.concurrent.CancellationException;
31+
import java.util.concurrent.Future;
32+
import java.util.function.Consumer;
33+
34+
import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
35+
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
36+
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
37+
import org.apache.hc.client5.http.async.methods.SimpleResponseConsumer;
38+
import org.apache.hc.client5.http.config.RequestConfig;
39+
import org.apache.hc.client5.http.config.TlsConfig;
40+
import org.apache.hc.client5.http.impl.async.InternalTestHttpAsyncExecRuntime;
41+
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
42+
import org.apache.hc.client5.http.protocol.HttpClientContext;
43+
import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
44+
import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
45+
import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
46+
import org.apache.hc.client5.testing.extension.async.TestAsyncResources;
47+
import org.apache.hc.client5.testing.extension.async.TestAsyncServer;
48+
import org.apache.hc.client5.testing.extension.async.TestAsyncServerBootstrap;
49+
import org.apache.hc.core5.concurrent.BasicFuture;
50+
import org.apache.hc.core5.concurrent.Cancellable;
51+
import org.apache.hc.core5.concurrent.FutureContribution;
52+
import org.apache.hc.core5.http.HttpHeaders;
53+
import org.apache.hc.core5.http.HttpHost;
54+
import org.apache.hc.core5.http.HttpRequest;
55+
import org.apache.hc.core5.http.URIScheme;
56+
import org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler;
57+
import org.apache.hc.core5.http.support.BasicRequestBuilder;
58+
import org.apache.hc.core5.http2.HttpVersionPolicy;
59+
import org.apache.hc.core5.pool.PoolStats;
60+
import org.apache.hc.core5.reactor.ConnectionInitiator;
61+
import org.apache.hc.core5.util.TimeValue;
62+
import org.apache.hc.core5.util.Timeout;
63+
import org.junit.jupiter.api.Assertions;
64+
import org.junit.jupiter.api.Test;
65+
import org.junit.jupiter.api.extension.RegisterExtension;
66+
67+
public class TestInternalHttpAsyncExecRuntime {
68+
69+
public static final Timeout TIMEOUT = Timeout.ofMinutes(1);
70+
71+
@RegisterExtension
72+
private final TestAsyncResources testResources;
73+
74+
public TestInternalHttpAsyncExecRuntime() {
75+
this.testResources = new TestAsyncResources(URIScheme.HTTP, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD, TIMEOUT);
76+
}
77+
78+
public void configureServer(final Consumer<TestAsyncServerBootstrap> serverCustomizer) {
79+
testResources.configureServer(serverCustomizer);
80+
}
81+
82+
public HttpHost startServer() throws Exception {
83+
final TestAsyncServer server = testResources.server();
84+
final InetSocketAddress inetSocketAddress = server.start();
85+
return new HttpHost(testResources.scheme().id, "localhost", inetSocketAddress.getPort());
86+
}
87+
88+
public TestAsyncClient startClient() throws Exception {
89+
final TestAsyncClient client = testResources.client();
90+
client.start();
91+
return client;
92+
}
93+
94+
static final int REQ_NUM = 5;
95+
96+
HttpRequest createRequest(final HttpHost target) {
97+
return BasicRequestBuilder.get()
98+
.setHttpHost(target)
99+
.setPath("/random/20000")
100+
.addHeader(HttpHeaders.HOST, target.toHostString())
101+
.build();
102+
}
103+
104+
@Test
105+
void testExecutionCancellation_http11HardCancellation_connectionMarkedNonReusable() throws Exception {
106+
configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
107+
final HttpHost target = startServer();
108+
109+
final TestAsyncClient client = startClient();
110+
final ConnectionInitiator connectionInitiator = client.getImplementation();
111+
final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager();
112+
for (int i = 0; i < REQ_NUM; i++) {
113+
final HttpClientContext context = HttpClientContext.create();
114+
115+
final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime(
116+
connectionManager,
117+
connectionInitiator,
118+
TlsConfig.custom()
119+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
120+
.build());
121+
final Future<Boolean> connectFuture = testRuntime.leaseAndConnect(target, context);
122+
Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
123+
124+
final BasicFuture<SimpleHttpResponse> resultFuture = new BasicFuture<>(null);
125+
final Cancellable cancellable = testRuntime.execute(
126+
"test-" + i,
127+
new BasicClientExchangeHandler<>(
128+
SimpleRequestProducer.create(SimpleRequestBuilder.get()
129+
.setHttpHost(target)
130+
.setPath("/random/20000")
131+
.addHeader(HttpHeaders.HOST, target.toHostString())
132+
.build()),
133+
SimpleResponseConsumer.create(),
134+
new FutureContribution<SimpleHttpResponse>(resultFuture) {
135+
136+
@Override
137+
public void completed(final SimpleHttpResponse result) {
138+
resultFuture.completed(result);
139+
}
140+
141+
}),
142+
context);
143+
// sleep a bit
144+
Thread.sleep(i % 10);
145+
cancellable.cancel();
146+
147+
// The message exchange is expected to get aborted
148+
try {
149+
resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
150+
} catch (final CancellationException expected) {
151+
}
152+
Assertions.assertTrue(testRuntime.isAborted());
153+
testRuntime.discardEndpoint();
154+
}
155+
}
156+
157+
@Test
158+
void testExecutionCancellation_http11NoHardCancellation_connectionAlive() throws Exception {
159+
configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
160+
final HttpHost target = startServer();
161+
162+
final TestAsyncClient client = startClient();
163+
final ConnectionInitiator connectionInitiator = client.getImplementation();
164+
final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager();
165+
for (int i = 0; i < REQ_NUM; i++) {
166+
final HttpClientContext context = HttpClientContext.create();
167+
context.setRequestConfig(RequestConfig.custom()
168+
.setHardCancellationEnabled(false)
169+
.build());
170+
171+
final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime(
172+
connectionManager,
173+
connectionInitiator,
174+
TlsConfig.custom()
175+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
176+
.build());
177+
final Future<Boolean> connectFuture = testRuntime.leaseAndConnect(target, context);
178+
Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
179+
180+
final BasicFuture<SimpleHttpResponse> resultFuture = new BasicFuture<>(null);
181+
final Cancellable cancellable = testRuntime.execute(
182+
"test-" + i,
183+
new BasicClientExchangeHandler<>(
184+
SimpleRequestProducer.create(SimpleRequestBuilder.get()
185+
.setHttpHost(target)
186+
.setPath("/random/20000")
187+
.addHeader(HttpHeaders.HOST, target.toHostString())
188+
.build()),
189+
SimpleResponseConsumer.create(),
190+
new FutureContribution<SimpleHttpResponse>(resultFuture) {
191+
192+
@Override
193+
public void completed(final SimpleHttpResponse result) {
194+
resultFuture.completed(result);
195+
}
196+
197+
}),
198+
context);
199+
// sleep a bit
200+
Thread.sleep(i % 10);
201+
cancellable.cancel();
202+
203+
// The message exchange should not get aborted and is expected to successfully complete
204+
final SimpleHttpResponse response = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
205+
Assertions.assertNotNull(response);
206+
Assertions.assertFalse(testRuntime.isAborted());
207+
// The underlying connection is expected to stay valid
208+
Assertions.assertTrue(testRuntime.isEndpointConnected());
209+
testRuntime.markConnectionReusable(null, TimeValue.ofMinutes(1));
210+
testRuntime.releaseEndpoint();
211+
212+
final PoolStats totalStats = connectionManager.getTotalStats();
213+
Assertions.assertTrue(totalStats.getAvailable() > 0);
214+
}
215+
}
216+
217+
}

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public HandlerEntry(final String uriPattern, final T handler) {
6666
}
6767

6868
private final URIScheme scheme;
69-
private final ServerProtocolLevel serverProtocolLevel;
69+
private ServerProtocolLevel serverProtocolLevel;
7070

7171
private final List<HandlerEntry<Supplier<AsyncServerExchangeHandler>>> handlerList;
7272
private Timeout timeout;
@@ -79,6 +79,10 @@ public TestAsyncServerBootstrap(final URIScheme scheme, final ServerProtocolLeve
7979
this.handlerList = new ArrayList<>();
8080
}
8181

82+
public void setServerProtocolLevel(final ServerProtocolLevel serverProtocolLevel) {
83+
this.serverProtocolLevel = serverProtocolLevel;
84+
}
85+
8286
public ServerProtocolLevel getProtocolLevel() {
8387
return serverProtocolLevel;
8488
}

0 commit comments

Comments
 (0)