Skip to content

Commit 45f0d12

Browse files
committed
Add HTTP/2 CONNECT tunnel support over HTTP/2 proxy connections. Implements multiplexing-safe CONNECT tunneling by adapting a single HTTP/2 proxy stream to a ProtocolIOSession that can be upgraded with TLS and an optional protocol handler. T
1 parent 23bb1c8 commit 45f0d12

13 files changed

Lines changed: 3292 additions & 0 deletions

File tree

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.core5.http2.nio.support;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.List;
32+
import java.util.concurrent.atomic.AtomicBoolean;
33+
34+
import org.apache.hc.core5.concurrent.FutureCallback;
35+
import org.apache.hc.core5.http.ConnectionClosedException;
36+
import org.apache.hc.core5.http.EntityDetails;
37+
import org.apache.hc.core5.http.Header;
38+
import org.apache.hc.core5.http.HttpException;
39+
import org.apache.hc.core5.http.HttpRequest;
40+
import org.apache.hc.core5.http.HttpResponse;
41+
import org.apache.hc.core5.http.Method;
42+
import org.apache.hc.core5.http.StreamControl;
43+
import org.apache.hc.core5.http.HttpRequestInterceptor;
44+
import org.apache.hc.core5.http.impl.BasicEntityDetails;
45+
import org.apache.hc.core5.http.message.BasicHttpRequest;
46+
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
47+
import org.apache.hc.core5.http.nio.CapacityChannel;
48+
import org.apache.hc.core5.http.nio.DataStreamChannel;
49+
import org.apache.hc.core5.http.nio.RequestChannel;
50+
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
51+
import org.apache.hc.core5.http.protocol.HttpContext;
52+
import org.apache.hc.core5.io.CloseMode;
53+
import org.apache.hc.core5.net.NamedEndpoint;
54+
import org.apache.hc.core5.net.URIAuthority;
55+
import org.apache.hc.core5.reactor.IOEventHandler;
56+
import org.apache.hc.core5.reactor.IOEventHandlerFactory;
57+
import org.apache.hc.core5.reactor.IOSession;
58+
import org.apache.hc.core5.reactor.ProtocolIOSession;
59+
import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer;
60+
import org.apache.hc.core5.util.Timeout;
61+
62+
/**
63+
* Exchange handler that establishes an HTTP/2 CONNECT tunnel and exposes
64+
* the resulting data stream as a {@link ProtocolIOSession}.
65+
*
66+
* @since 5.5
67+
*/
68+
final class H2OverH2TunnelExchangeHandler implements AsyncClientExchangeHandler {
69+
70+
private final IOSession physicalSession;
71+
private final NamedEndpoint targetEndpoint;
72+
private final Timeout connectTimeout;
73+
private final boolean secure;
74+
private final TlsStrategy tlsStrategy;
75+
private final HttpRequestInterceptor connectRequestInterceptor;
76+
private final IOEventHandlerFactory protocolStarter;
77+
private final FutureCallback<ProtocolIOSession> callback;
78+
79+
private final AtomicBoolean done;
80+
81+
private volatile DataStreamChannel dataChannel;
82+
private volatile CapacityChannel capacityChannel;
83+
private volatile StreamControl streamControl;
84+
private volatile H2TunnelProtocolIOSession tunnelSession;
85+
86+
H2OverH2TunnelExchangeHandler(
87+
final IOSession physicalSession,
88+
final NamedEndpoint targetEndpoint,
89+
final Timeout connectTimeout,
90+
final boolean secure,
91+
final TlsStrategy tlsStrategy,
92+
final HttpRequestInterceptor connectRequestInterceptor,
93+
final IOEventHandlerFactory protocolStarter,
94+
final FutureCallback<ProtocolIOSession> callback) {
95+
this.physicalSession = physicalSession;
96+
this.targetEndpoint = targetEndpoint;
97+
this.connectTimeout = connectTimeout;
98+
this.secure = secure;
99+
this.tlsStrategy = tlsStrategy;
100+
this.connectRequestInterceptor = connectRequestInterceptor;
101+
this.protocolStarter = protocolStarter;
102+
this.callback = callback;
103+
this.done = new AtomicBoolean(false);
104+
}
105+
106+
void initiated(final StreamControl streamControl) {
107+
this.streamControl = streamControl;
108+
final H2TunnelProtocolIOSession tunnel = this.tunnelSession;
109+
if (tunnel != null) {
110+
tunnel.bindStreamControl(streamControl);
111+
}
112+
}
113+
114+
@Override
115+
public void releaseResources() {
116+
}
117+
118+
@Override
119+
public void failed(final Exception cause) {
120+
fail(cause);
121+
}
122+
123+
@Override
124+
public void cancel() {
125+
fail(new ConnectionClosedException("Tunnel setup cancelled"));
126+
}
127+
128+
@Override
129+
public void produceRequest(final RequestChannel requestChannel, final HttpContext context) throws HttpException, IOException {
130+
final HttpRequest connectRequest = new BasicHttpRequest(Method.CONNECT.name(), (String) null);
131+
connectRequest.setAuthority(new URIAuthority(targetEndpoint));
132+
if (connectRequestInterceptor != null) {
133+
connectRequestInterceptor.process(connectRequest, null, context);
134+
}
135+
requestChannel.sendRequest(connectRequest, new BasicEntityDetails(-1, null), context);
136+
}
137+
138+
@Override
139+
public int available() {
140+
final H2TunnelProtocolIOSession tunnel = this.tunnelSession;
141+
return tunnel != null ? tunnel.available() : 0;
142+
}
143+
144+
@Override
145+
public void produce(final DataStreamChannel channel) throws IOException {
146+
this.dataChannel = channel;
147+
final H2TunnelProtocolIOSession tunnel = this.tunnelSession;
148+
if (tunnel != null) {
149+
tunnel.attachChannel(channel);
150+
tunnel.onOutputReady();
151+
}
152+
}
153+
154+
@Override
155+
public void consumeInformation(final HttpResponse response, final HttpContext context) {
156+
}
157+
158+
@Override
159+
public void consumeResponse(
160+
final HttpResponse response,
161+
final EntityDetails entityDetails,
162+
final HttpContext context) throws HttpException, IOException {
163+
164+
final int status = response.getCode();
165+
if (status < 200 || status >= 300) {
166+
throw new TunnelRefusedException(response);
167+
}
168+
169+
if (entityDetails == null) {
170+
throw new HttpException("CONNECT response does not provide a tunneled data stream");
171+
}
172+
173+
if (this.tunnelSession != null) {
174+
return;
175+
}
176+
177+
final H2TunnelProtocolIOSession tunnel =
178+
new H2TunnelProtocolIOSession(physicalSession, targetEndpoint, connectTimeout, streamControl);
179+
180+
final DataStreamChannel currentChannel = this.dataChannel;
181+
if (currentChannel != null) {
182+
tunnel.attachChannel(currentChannel);
183+
}
184+
final CapacityChannel currentCapacity = this.capacityChannel;
185+
if (currentCapacity != null) {
186+
tunnel.updateCapacityChannel(currentCapacity);
187+
}
188+
this.tunnelSession = tunnel;
189+
190+
if (secure) {
191+
tlsStrategy.upgrade(
192+
tunnel,
193+
targetEndpoint,
194+
null,
195+
connectTimeout,
196+
new FutureCallback<TransportSecurityLayer>() {
197+
198+
@Override
199+
public void completed(final TransportSecurityLayer transportSecurityLayer) {
200+
try {
201+
startProtocol(tunnel);
202+
complete(tunnel);
203+
} catch (final Exception ex) {
204+
fail(ex);
205+
}
206+
}
207+
208+
@Override
209+
public void failed(final Exception ex) {
210+
fail(ex);
211+
}
212+
213+
@Override
214+
public void cancelled() {
215+
fail(new ConnectionClosedException("Tunnel TLS upgrade cancelled"));
216+
}
217+
});
218+
} else {
219+
startProtocol(tunnel);
220+
complete(tunnel);
221+
}
222+
}
223+
224+
private void startProtocol(final H2TunnelProtocolIOSession tunnel) throws IOException {
225+
if (protocolStarter == null) {
226+
return;
227+
}
228+
final IOEventHandler protocolHandler = protocolStarter.createHandler(tunnel, null);
229+
tunnel.upgrade(protocolHandler);
230+
protocolHandler.connected(tunnel);
231+
}
232+
233+
@Override
234+
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
235+
this.capacityChannel = capacityChannel;
236+
final H2TunnelProtocolIOSession tunnel = this.tunnelSession;
237+
if (tunnel != null) {
238+
tunnel.updateCapacityChannel(capacityChannel);
239+
}
240+
}
241+
242+
@Override
243+
public void consume(final ByteBuffer src) throws IOException {
244+
final H2TunnelProtocolIOSession tunnel = this.tunnelSession;
245+
if (tunnel != null && src != null && src.hasRemaining()) {
246+
tunnel.onInput(src);
247+
}
248+
}
249+
250+
@Override
251+
public void streamEnd(final List<? extends Header> trailers) {
252+
final H2TunnelProtocolIOSession tunnel = this.tunnelSession;
253+
if (tunnel != null) {
254+
tunnel.onRemoteStreamEnd();
255+
} else {
256+
closeTransport(CloseMode.GRACEFUL);
257+
}
258+
if (done.compareAndSet(false, true) && callback != null) {
259+
callback.failed(new ConnectionClosedException("Tunnel stream closed"));
260+
}
261+
}
262+
263+
private void closeTransport(final CloseMode closeMode) {
264+
final H2TunnelProtocolIOSession tunnel = this.tunnelSession;
265+
if (tunnel != null) {
266+
tunnel.close(closeMode);
267+
return;
268+
}
269+
final StreamControl currentStreamControl = this.streamControl;
270+
if (currentStreamControl != null) {
271+
currentStreamControl.cancel();
272+
}
273+
}
274+
275+
private void fail(final Exception cause) {
276+
closeTransport(CloseMode.IMMEDIATE);
277+
if (done.compareAndSet(false, true) && callback != null) {
278+
callback.failed(cause);
279+
}
280+
}
281+
282+
private void complete(final H2TunnelProtocolIOSession tunnel) {
283+
if (done.compareAndSet(false, true) && callback != null) {
284+
callback.completed(tunnel);
285+
}
286+
}
287+
}

0 commit comments

Comments
 (0)