Skip to content

Commit b74914a

Browse files
committed
Ensure async data consumers can avoid NPE if they have been canceled / released from another thread at the same with concurrent data processing
1 parent c7e0317 commit b74914a

5 files changed

Lines changed: 58 additions & 18 deletions

File tree

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractAsyncRequesterConsumer.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,19 +113,27 @@ public void completed(final E entity) {
113113
@Override
114114
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
115115
final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
116-
dataConsumer.updateCapacity(capacityChannel);
116+
if (dataConsumer != null) {
117+
dataConsumer.updateCapacity(capacityChannel);
118+
} else {
119+
capacityChannel.update(Integer.MAX_VALUE);
120+
}
117121
}
118122

119123
@Override
120124
public final void consume(final ByteBuffer src) throws IOException {
121125
final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
122-
dataConsumer.consume(src);
126+
if (dataConsumer != null) {
127+
dataConsumer.consume(src);
128+
}
123129
}
124130

125131
@Override
126132
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
127133
final AsyncEntityConsumer<E> dataConsumer = dataConsumerRef.get();
128-
dataConsumer.streamEnd(trailers);
134+
if (dataConsumer != null) {
135+
dataConsumer.streamEnd(trailers);
136+
}
129137
}
130138

131139
@Override

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractClientExchangeHandler.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,17 +147,28 @@ public void cancel() {
147147

148148
@Override
149149
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
150-
responseConsumerRef.get().updateCapacity(capacityChannel);
150+
final AsyncResponseConsumer<T> responseConsumer = responseConsumerRef.get();
151+
if (responseConsumer != null) {
152+
responseConsumer.updateCapacity(capacityChannel);
153+
} else {
154+
capacityChannel.update(Integer.MAX_VALUE);
155+
}
151156
}
152157

153158
@Override
154159
public void consume(final ByteBuffer src) throws IOException {
155-
responseConsumerRef.get().consume(src);
160+
final AsyncResponseConsumer<T> responseConsumer = responseConsumerRef.get();
161+
if (responseConsumer != null) {
162+
responseConsumer.consume(src);
163+
}
156164
}
157165

158166
@Override
159167
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
160-
responseConsumerRef.get().streamEnd(trailers);
168+
final AsyncResponseConsumer<T> responseConsumer = responseConsumerRef.get();
169+
if (responseConsumer != null) {
170+
responseConsumer.streamEnd(trailers);
171+
}
161172
}
162173

163174
@Override

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/AbstractServerExchangeHandler.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -177,22 +177,27 @@ public void cancelled() {
177177
@Override
178178
public final void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
179179
final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
180-
Asserts.notNull(requestConsumer, "Data consumer");
181-
requestConsumer.updateCapacity(capacityChannel);
180+
if (requestConsumer != null) {
181+
requestConsumer.updateCapacity(capacityChannel);
182+
} else {
183+
capacityChannel.update(Integer.MAX_VALUE);
184+
}
182185
}
183186

184187
@Override
185188
public final void consume(final ByteBuffer src) throws IOException {
186189
final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
187-
Asserts.notNull(requestConsumer, "Data consumer");
188-
requestConsumer.consume(src);
190+
if (requestConsumer != null) {
191+
requestConsumer.consume(src);
192+
}
189193
}
190194

191195
@Override
192196
public final void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
193197
final AsyncRequestConsumer<T> requestConsumer = requestConsumerRef.get();
194-
Asserts.notNull(requestConsumer, "Data consumer");
195-
requestConsumer.streamEnd(trailers);
198+
if (requestConsumer != null) {
199+
requestConsumer.streamEnd(trailers);
200+
}
196201
}
197202

198203
@Override

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicRequestConsumer.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,19 +100,27 @@ public void completed(final T body) {
100100
@Override
101101
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
102102
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
103-
dataConsumer.updateCapacity(capacityChannel);
103+
if (dataConsumer != null) {
104+
dataConsumer.updateCapacity(capacityChannel);
105+
} else {
106+
capacityChannel.update(Integer.MAX_VALUE);
107+
}
104108
}
105109

106110
@Override
107111
public void consume(final ByteBuffer src) throws IOException {
108112
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
109-
dataConsumer.consume(src);
113+
if (dataConsumer != null) {
114+
dataConsumer.consume(src);
115+
}
110116
}
111117

112118
@Override
113119
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
114120
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
115-
dataConsumer.streamEnd(trailers);
121+
if (dataConsumer != null) {
122+
dataConsumer.streamEnd(trailers);
123+
}
116124
}
117125

118126
@Override

httpcore5/src/main/java/org/apache/hc/core5/http/nio/support/BasicResponseConsumer.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -104,19 +104,27 @@ public void informationResponse(final HttpResponse response, final HttpContext h
104104
@Override
105105
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
106106
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
107-
dataConsumer.updateCapacity(capacityChannel);
107+
if (dataConsumer != null) {
108+
dataConsumer.updateCapacity(capacityChannel);
109+
} else {
110+
capacityChannel.update(Integer.MAX_VALUE);
111+
}
108112
}
109113

110114
@Override
111115
public void consume(final ByteBuffer src) throws IOException {
112116
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
113-
dataConsumer.consume(src);
117+
if (dataConsumer != null) {
118+
dataConsumer.consume(src);
119+
}
114120
}
115121

116122
@Override
117123
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
118124
final AsyncEntityConsumer<T> dataConsumer = dataConsumerRef.get();
119-
dataConsumer.streamEnd(trailers);
125+
if (dataConsumer != null) {
126+
dataConsumer.streamEnd(trailers);
127+
}
120128
}
121129

122130
@Override

0 commit comments

Comments
 (0)