Skip to content

Commit 42909c4

Browse files
committed
Fix cancellation races in shared async execution queue
1 parent c82096b commit 42909c4

7 files changed

Lines changed: 28 additions & 278 deletions

File tree

httpclient5-testing/src/test/java/org/apache/hc/client5/http/impl/async/InternalTestHttpAsyncExecRuntime.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.List;
3232
import java.util.concurrent.Future;
3333
import java.util.concurrent.atomic.AtomicBoolean;
34-
import java.util.concurrent.atomic.AtomicInteger;
3534

3635
import org.apache.hc.client5.http.HttpRoute;
3736
import org.apache.hc.client5.http.async.AsyncExecRuntime;
@@ -64,7 +63,7 @@ public final class InternalTestHttpAsyncExecRuntime extends InternalHttpAsyncExe
6463
public InternalTestHttpAsyncExecRuntime(final AsyncClientConnectionManager manager,
6564
final ConnectionInitiator connectionInitiator,
6665
final TlsConfig tlsConfig) {
67-
super(LOG, manager, connectionInitiator, null, tlsConfig, -1, new AtomicInteger());
66+
super(LOG, manager, connectionInitiator, null, tlsConfig);
6867
this.cancelled = new AtomicBoolean();
6968
}
7069

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -328,12 +328,12 @@ public final H2AsyncClientBuilder disableRequestPriority() {
328328
}
329329

330330
/**
331-
* Sets a hard cap on the number of requests allowed to be queued/in-flight
332-
* within the internal async execution pipeline. When the limit is reached,
333-
* new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
331+
* Sets the maximum number of concurrent request executions within the
332+
* internal async execution pipeline. Requests beyond this limit are queued
333+
* in FIFO order and dispatched as in-flight executions complete.
334334
* A value {@code <= 0} means unlimited (default).
335335
*
336-
* @param max maximum number of queued requests; {@code <= 0} to disable the cap
336+
* @param max maximum number of concurrent executions; {@code <= 0} to disable the cap
337337
* @return this builder
338338
* @since 5.7
339339
*/

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/HttpAsyncClientBuilder.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -910,12 +910,12 @@ public final HttpAsyncClientBuilder setTlsRequired(final boolean tlsRequired) {
910910
}
911911

912912
/**
913-
* Sets a hard cap on the number of requests allowed to be queued/in-flight
914-
* within the internal async execution pipeline. When the limit is reached,
915-
* new submissions fail fast with {@link java.util.concurrent.RejectedExecutionException}.
916-
* A value <= 0 means unlimited (default).
913+
* Sets the maximum number of concurrent request executions within the
914+
* internal async execution pipeline. Requests beyond this limit are queued
915+
* in FIFO order and dispatched as in-flight executions complete.
916+
* A value {@code <= 0} means unlimited (default).
917917
*
918-
* @param max maximum number of queued requests; <= 0 to disable the cap
918+
* @param max maximum number of concurrent executions; {@code <= 0} to disable the cap
919919
* @return this builder
920920
* @since 5.7
921921
*/

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalH2AsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@ public final class InternalH2AsyncClient extends InternalAbstractHttpAsyncClient
6767
private final InternalH2ConnPool connPool;
6868

6969
/**
70-
* One shared FIFO queue per client instance (Oleg).
71-
* null means "unlimited" / no throttling.
70+
* One shared FIFO queue per client instance.
71+
* {@code null} means unlimited / no throttling.
7272
*/
7373
private final SharedRequestExecutionQueue executionQueue;
7474

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -383,29 +383,26 @@ public Cancellable execute(
383383
return startExecution(id, exchangeHandler, context);
384384
}
385385

386-
final AtomicReference<Cancellable> activeCancelRef = new AtomicReference<>(Operations.nonCancellable());
386+
final ComplexCancellable complexCancellable = new ComplexCancellable();
387387

388388
final Cancellable queued = executionQueue.enqueue(
389389
() -> {
390390
final AsyncClientExchangeHandler wrapped =
391391
new ReleasingAsyncClientExchangeHandler(exchangeHandler, executionQueue::completed);
392392
try {
393393
final Cancellable cancellable = startExecution(id, wrapped, context);
394-
activeCancelRef.set(cancellable);
394+
complexCancellable.setDependency(cancellable);
395395
} catch (final RuntimeException ex) {
396396
wrapped.failed(ex);
397397
}
398398
},
399-
() -> {
400-
activeCancelRef.get().cancel();
401-
exchangeHandler.cancel();
402-
});
399+
exchangeHandler::cancel);
403400

404401
return () -> {
405402
if (queued.cancel()) {
406403
return true;
407404
}
408-
return activeCancelRef.get().cancel();
405+
return complexCancellable.cancel();
409406
};
410407
}
411408

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/SharedRequestExecutionQueue.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.concurrent.ConcurrentLinkedQueue;
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.concurrent.atomic.AtomicInteger;
33+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
3334

3435
import org.apache.hc.core5.annotation.Internal;
3536
import org.apache.hc.core5.concurrent.Cancellable;
@@ -83,10 +84,9 @@ private void drain() {
8384
if (entry == null) {
8485
break;
8586
}
86-
if (entry.isCancelled()) {
87+
if (!entry.tryStart()) {
8788
continue;
8889
}
89-
entry.markStarted();
9090
inFlight.incrementAndGet();
9191
entry.run();
9292
}
@@ -102,36 +102,32 @@ private void drain() {
102102

103103
private static final class Entry implements Cancellable {
104104

105+
private static final int QUEUED = 0;
106+
private static final int STARTED = 1;
107+
private static final int CANCELLED = 2;
108+
109+
private static final AtomicIntegerFieldUpdater<Entry> STATE = AtomicIntegerFieldUpdater.newUpdater(Entry.class, "state");
110+
105111
private final Runnable task;
106112
private final Runnable onCancel;
107-
private final AtomicBoolean started;
108-
private final AtomicBoolean cancelled;
113+
private volatile int state;
109114

110115
Entry(final Runnable task, final Runnable onCancel) {
111116
this.task = task;
112117
this.onCancel = onCancel;
113-
this.started = new AtomicBoolean(false);
114-
this.cancelled = new AtomicBoolean(false);
115118
}
116119

117120
void run() {
118121
task.run();
119122
}
120123

121-
void markStarted() {
122-
started.set(true);
123-
}
124-
125-
boolean isCancelled() {
126-
return cancelled.get();
124+
boolean tryStart() {
125+
return STATE.compareAndSet(this, QUEUED, STARTED);
127126
}
128127

129128
@Override
130129
public boolean cancel() {
131-
if (started.get()) {
132-
return false;
133-
}
134-
if (cancelled.compareAndSet(false, true)) {
130+
if (STATE.compareAndSet(this, QUEUED, CANCELLED)) {
135131
onCancel.run();
136132
return true;
137133
}

0 commit comments

Comments
 (0)