@@ -106,7 +106,7 @@ abstract class AbstractH2StreamMultiplexer implements Identifiable, HttpConnecti
106106
107107 private static final long CONNECTION_WINDOW_LOW_MARK = 10 * 1024 * 1024 ;
108108
109- enum ConnectionHandshake { READY , ACTIVE , GRACEFUL_SHUTDOWN , SHUTDOWN }
109+ enum ConnectionHandshake { READY , ACTIVE , DRAINING , GRACEFUL_SHUTDOWN , SHUTDOWN }
110110 enum SettingsHandshake { READY , TRANSMITTED , ACKED }
111111
112112 private final ProtocolIOSession ioSession ;
@@ -127,6 +127,7 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
127127 private final AtomicInteger connOutputWindow ;
128128 private final AtomicInteger outputRequests ;
129129 private final H2StreamListener streamListener ;
130+ private final boolean serverSide ;
130131
131132 private ConnectionHandshake connState = ConnectionHandshake .READY ;
132133 private SettingsHandshake localSettingState = SettingsHandshake .READY ;
@@ -142,6 +143,9 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
142143
143144 private EndpointDetails endpointDetails ;
144145 private boolean goAwayReceived ;
146+ private int shutdownLastStreamId ;
147+ private int lastProcessedRemoteStreamId ;
148+ private boolean drainPingSent ;
145149
146150 private volatile boolean peerNoRfc7540Priorities ;
147151
@@ -201,6 +205,10 @@ enum SettingsHandshake { READY, TRANSMITTED, ACKED }
201205 this .streamListener = streamListener ;
202206 this .lastActivityTime = System .currentTimeMillis ();
203207 this .validateAfterInactivity = validateAfterInactivity ;
208+ this .shutdownLastStreamId = 0 ;
209+ this .lastProcessedRemoteStreamId = 0 ;
210+ this .drainPingSent = false ;
211+ this .serverSide = this .streams .isSameSide (2 );
204212 }
205213
206214 @ Override
@@ -435,6 +443,11 @@ private void incrementInputCapacity(
435443 }
436444 }
437445
446+ private boolean isAcceptingNewRemoteStreams () {
447+ return connState .compareTo (ConnectionHandshake .ACTIVE ) <= 0
448+ || serverSide && connState == ConnectionHandshake .DRAINING ;
449+ }
450+
438451 void requestSessionOutput () {
439452 outputRequests .incrementAndGet ();
440453 ioSession .setEvent (SelectionKey .OP_WRITE );
@@ -506,6 +519,16 @@ public final void onOutput() throws HttpException, IOException {
506519 ioSession .getLock ().unlock ();
507520 }
508521
522+ if (serverSide
523+ && connState == ConnectionHandshake .DRAINING
524+ && !drainPingSent
525+ && outputBuffer .isEmpty ()
526+ && outputQueue .isEmpty ()) {
527+ drainPingSent = true ;
528+ executePing (new PingCommand (createGracefulShutdownPingHandler ()));
529+ return ;
530+ }
531+
509532 if (connState .compareTo (ConnectionHandshake .SHUTDOWN ) < 0 ) {
510533
511534 if (connOutputWindow .get () > 0 && remoteSettingState == SettingsHandshake .ACKED ) {
@@ -589,16 +612,16 @@ public final void onOutput() throws HttpException, IOException {
589612 streams .dropStreamId (stream .getId ());
590613 it .remove ();
591614 } else {
592- if (streams .isSameSide (stream .getId ()) || stream .getId () <= streams . getLastRemoteId () ) {
615+ if (streams .isSameSide (stream .getId ()) || shutdownLastStreamId == 0 || stream .getId () <= shutdownLastStreamId ) {
593616 liveStreams ++;
594617 }
595618 }
596619 }
597- if (liveStreams == 0 ) {
620+ if (shutdownLastStreamId != Integer . MAX_VALUE && liveStreams == 0 ) {
598621 connState = ConnectionHandshake .SHUTDOWN ;
599622 }
600623 }
601- if (connState .compareTo (ConnectionHandshake .GRACEFUL_SHUTDOWN ) >= 0 ) {
624+ if (connState .compareTo (ConnectionHandshake .DRAINING ) >= 0 ) {
602625 for (;;) {
603626 final Command command = ioSession .poll ();
604627 if (command == null ) {
@@ -628,6 +651,11 @@ public final void onOutput() throws HttpException, IOException {
628651 }
629652
630653 public final void onTimeout (final Timeout timeout ) throws HttpException , IOException {
654+ if (serverSide && connState == ConnectionHandshake .DRAINING ) {
655+ completeGracefulShutdown ();
656+ return ;
657+ }
658+
631659 connState = ConnectionHandshake .SHUTDOWN ;
632660
633661 final RawFrame goAway ;
@@ -663,15 +691,69 @@ private void executeShutdown(final ShutdownCommand shutdownCommand) throws IOExc
663691 if (shutdownCommand .getType () == CloseMode .IMMEDIATE ) {
664692 streams .shutdownAndReleaseAll ();
665693 connState = ConnectionHandshake .SHUTDOWN ;
666- } else {
667- if (connState .compareTo (ConnectionHandshake .ACTIVE ) <= 0 ) {
668- final RawFrame goAway = frameFactory .createGoAway (streams .getLastRemoteId (), H2Error .NO_ERROR , "Graceful shutdown" );
694+ return ;
695+ }
696+ if (connState .compareTo (ConnectionHandshake .ACTIVE ) <= 0 ) {
697+ if (serverSide ) {
698+ shutdownLastStreamId = Integer .MAX_VALUE ;
699+ drainPingSent = false ;
700+ final RawFrame goAway = frameFactory .createGoAway (
701+ shutdownLastStreamId ,
702+ H2Error .NO_ERROR ,
703+ "Graceful shutdown" );
704+ commitFrame (goAway );
705+ connState = ConnectionHandshake .DRAINING ;
706+ requestSessionOutput ();
707+ } else {
708+ final RawFrame goAway = frameFactory .createGoAway (
709+ streams .getLastRemoteId (),
710+ H2Error .NO_ERROR ,
711+ "Graceful shutdown" );
669712 commitFrame (goAway );
670713 connState = streams .isEmpty () ? ConnectionHandshake .SHUTDOWN : ConnectionHandshake .GRACEFUL_SHUTDOWN ;
671714 }
672715 }
673716 }
674717
718+ private void completeGracefulShutdown () throws IOException {
719+ if (!serverSide || connState != ConnectionHandshake .DRAINING ) {
720+ return ;
721+ }
722+ shutdownLastStreamId = lastProcessedRemoteStreamId ;
723+ final RawFrame goAway = frameFactory .createGoAway (shutdownLastStreamId , H2Error .NO_ERROR , "Graceful shutdown" );
724+ commitFrame (goAway );
725+ connState = ConnectionHandshake .GRACEFUL_SHUTDOWN ;
726+ }
727+
728+ private AsyncPingHandler createGracefulShutdownPingHandler () {
729+ final ByteBuffer data = ByteBuffer .allocate (8 );
730+ data .putLong (System .nanoTime ());
731+ data .flip ();
732+ return new AsyncPingHandler () {
733+
734+ @ Override
735+ public ByteBuffer getData () {
736+ return data .asReadOnlyBuffer ();
737+ }
738+
739+ @ Override
740+ public void consumeResponse (final ByteBuffer feedback ) throws IOException {
741+ if (connState == ConnectionHandshake .DRAINING ) {
742+ completeGracefulShutdown ();
743+ }
744+ }
745+
746+ @ Override
747+ public void failed (final Exception cause ) {
748+ }
749+
750+ @ Override
751+ public void cancel () {
752+ }
753+
754+ };
755+ }
756+
675757 private void executePing (final PingCommand pingCommand ) throws IOException {
676758 final AsyncPingHandler handler = pingCommand .getHandler ();
677759 pingHandlers .add (handler );
@@ -817,8 +899,11 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
817899 }
818900
819901 final H2StreamChannel channel = createChannel (streamId );
820- if (connState . compareTo ( ConnectionHandshake . ACTIVE ) <= 0 ) {
902+ if (isAcceptingNewRemoteStreams () ) {
821903 stream = streams .createActive (channel , incomingRequest (channel ));
904+ if (serverSide ) {
905+ lastProcessedRemoteStreamId = Math .max (lastProcessedRemoteStreamId , streamId );
906+ }
822907 streams .resetIfExceedsMaxConcurrentLimit (stream , localConfig .getMaxConcurrentStreams ());
823908 } else {
824909 channel .localReset (H2Error .REFUSED_STREAM );
@@ -1026,8 +1111,11 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
10261111
10271112 final H2StreamChannel channel = createChannel (promisedStreamId );
10281113 final H2Stream promisedStream ;
1029- if (connState . compareTo ( ConnectionHandshake . ACTIVE ) <= 0 ) {
1114+ if (isAcceptingNewRemoteStreams () ) {
10301115 promisedStream = streams .createReserved (channel , incomingPushPromise (channel , stream .getPushHandlerFactory ()));
1116+ if (serverSide ) {
1117+ lastProcessedRemoteStreamId = Math .max (lastProcessedRemoteStreamId , promisedStreamId );
1118+ }
10311119 } else {
10321120 channel .localReset (H2Error .REFUSED_STREAM );
10331121 promisedStream = streams .createActive (channel , NoopH2StreamHandler .INSTANCE );
@@ -1053,17 +1141,18 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
10531141 final int errorCode = payload .getInt ();
10541142 goAwayReceived = true ;
10551143 if (errorCode == H2Error .NO_ERROR .getCode ()) {
1056- if (connState .compareTo (ConnectionHandshake .ACTIVE ) <= 0 ) {
1057- for (final Iterator <H2Stream > it = streams .iterator (); it .hasNext (); ) {
1058- final H2Stream stream = it .next ();
1059- final int activeStreamId = stream .getId ();
1060- if (!streams .isSameSide (activeStreamId ) && activeStreamId > processedLocalStreamId ) {
1061- stream .fail (new RequestNotExecutedException ());
1062- it .remove ();
1063- }
1144+ for (final Iterator <H2Stream > it = streams .iterator (); it .hasNext (); ) {
1145+ final H2Stream stream = it .next ();
1146+ final int activeStreamId = stream .getId ();
1147+ if (!streams .isSameSide (activeStreamId ) && activeStreamId > processedLocalStreamId ) {
1148+ stream .fail (new RequestNotExecutedException ());
1149+ it .remove ();
10641150 }
10651151 }
1066- connState = streams .isEmpty () ? ConnectionHandshake .SHUTDOWN : ConnectionHandshake .GRACEFUL_SHUTDOWN ;
1152+ if (connState != ConnectionHandshake .DRAINING ) {
1153+ shutdownLastStreamId = processedLocalStreamId ;
1154+ connState = ConnectionHandshake .GRACEFUL_SHUTDOWN ;
1155+ }
10671156 } else {
10681157 for (final Iterator <H2Stream > it = streams .iterator (); it .hasNext (); ) {
10691158 final H2Stream stream = it .next ();
0 commit comments