@@ -1165,6 +1165,286 @@ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception {
11651165
11661166 }
11671167
1168+ @ Test
1169+ void testResetIfExpiredResetsStreamPastDeadline () throws Exception {
1170+ final H2Config h2Config = H2Config .custom ().build ();
1171+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl (
1172+ protocolIOSession ,
1173+ FRAME_FACTORY ,
1174+ StreamIdGenerator .ODD ,
1175+ httpProcessor ,
1176+ CharCodingConfig .DEFAULT ,
1177+ h2Config ,
1178+ h2StreamListener ,
1179+ () -> streamHandler );
1180+
1181+ final H2StreamChannel channel = mux .createChannel (1 );
1182+ final H2Stream stream = mux .createStream (channel , streamHandler );
1183+
1184+ stream .setTimeout (Timeout .ofMilliseconds (50 ));
1185+ stream .activate ();
1186+
1187+ // Push last activity into the past so the timeout is definitely expired
1188+ final Field lastActivityField = H2Stream .class .getDeclaredField ("lastActivityNanos" );
1189+ lastActivityField .setAccessible (true );
1190+ lastActivityField .set (stream , System .nanoTime () - TimeUnit .MILLISECONDS .toNanos (100 ));
1191+
1192+ Assertions .assertTrue (mux .resetIfExpired (stream , System .nanoTime ()));
1193+
1194+ Mockito .verify (streamHandler ).failed (exceptionCaptor .capture ());
1195+ Assertions .assertInstanceOf (H2StreamTimeoutException .class , exceptionCaptor .getValue ());
1196+ Assertions .assertTrue (stream .isLocalClosed ());
1197+ Assertions .assertTrue (stream .isClosed ());
1198+ }
1199+
1200+ @ Test
1201+ void testResetIfExpiredIgnoresStreamBeforeDeadline () throws Exception {
1202+ final H2Config h2Config = H2Config .custom ().build ();
1203+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl (
1204+ protocolIOSession ,
1205+ FRAME_FACTORY ,
1206+ StreamIdGenerator .ODD ,
1207+ httpProcessor ,
1208+ CharCodingConfig .DEFAULT ,
1209+ h2Config ,
1210+ h2StreamListener ,
1211+ () -> streamHandler );
1212+
1213+ final H2StreamChannel channel = mux .createChannel (1 );
1214+ final H2Stream stream = mux .createStream (channel , streamHandler );
1215+
1216+ stream .setTimeout (Timeout .ofMinutes (1 ));
1217+ stream .activate ();
1218+
1219+ Assertions .assertFalse (mux .resetIfExpired (stream , System .nanoTime ()));
1220+ Assertions .assertFalse (stream .isLocalClosed ());
1221+
1222+ Mockito .verify (streamHandler , Mockito .never ()).failed (ArgumentMatchers .any ());
1223+ }
1224+
1225+ @ Test
1226+ void testExpiredStreamResetOnInboundData () throws Exception {
1227+ Mockito .when (protocolIOSession .write (ArgumentMatchers .any (ByteBuffer .class )))
1228+ .thenAnswer (invocation -> {
1229+ final ByteBuffer buffer = invocation .getArgument (0 , ByteBuffer .class );
1230+ final int remaining = buffer .remaining ();
1231+ buffer .position (buffer .limit ());
1232+ return remaining ;
1233+ });
1234+ Mockito .doNothing ().when (protocolIOSession ).setEvent (ArgumentMatchers .anyInt ());
1235+ Mockito .doNothing ().when (protocolIOSession ).clearEvent (ArgumentMatchers .anyInt ());
1236+
1237+ final H2Config h2Config = H2Config .custom ().build ();
1238+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl (
1239+ protocolIOSession ,
1240+ FRAME_FACTORY ,
1241+ StreamIdGenerator .EVEN ,
1242+ httpProcessor ,
1243+ CharCodingConfig .DEFAULT ,
1244+ h2Config ,
1245+ h2StreamListener ,
1246+ () -> streamHandler );
1247+
1248+ // Encode request headers
1249+ final ByteArrayBuffer headerBuf = new ByteArrayBuffer (200 );
1250+ final HPackEncoder encoder = new HPackEncoder (h2Config .getHeaderTableSize (),
1251+ CharCodingSupport .createEncoder (CharCodingConfig .DEFAULT ));
1252+ final List <Header > headers = Arrays .asList (
1253+ new BasicHeader (":method" , "GET" ),
1254+ new BasicHeader (":scheme" , "http" ),
1255+ new BasicHeader (":path" , "/" ),
1256+ new BasicHeader (":authority" , "www.example.com" ));
1257+ encoder .encodeHeaders (headerBuf , headers , h2Config .isCompressionEnabled ());
1258+
1259+ final WritableByteChannelMock writableChannel = new WritableByteChannelMock (1024 );
1260+ final FrameOutputBuffer outBuffer = new FrameOutputBuffer (16 * 1024 );
1261+
1262+ // Send HEADERS (endHeaders=true, endStream=false) to create stream 1
1263+ final RawFrame headerFrame = FRAME_FACTORY .createHeaders (1 ,
1264+ ByteBuffer .wrap (headerBuf .array (), 0 , headerBuf .length ()), true , false );
1265+ outBuffer .write (headerFrame , writableChannel );
1266+ mux .onInput (ByteBuffer .wrap (writableChannel .toByteArray ()));
1267+
1268+ Mockito .verify (streamHandler ).consumeHeader (headersCaptor .capture (), ArgumentMatchers .eq (false ));
1269+ Assertions .assertFalse (headersCaptor .getValue ().isEmpty ());
1270+
1271+ // Retrieve the stream and set a short timeout
1272+ final Field streamsField = AbstractH2StreamMultiplexer .class .getDeclaredField ("streams" );
1273+ streamsField .setAccessible (true );
1274+ final H2Streams h2Streams = (H2Streams ) streamsField .get (mux );
1275+ final H2Stream stream = h2Streams .lookupValid (1 );
1276+ stream .setTimeout (Timeout .ofMilliseconds (50 ));
1277+
1278+ // Push last activity into the past so the timeout is expired
1279+ final Field lastActivityField = H2Stream .class .getDeclaredField ("lastActivityNanos" );
1280+ lastActivityField .setAccessible (true );
1281+ lastActivityField .set (stream , System .nanoTime () - TimeUnit .MILLISECONDS .toNanos (100 ));
1282+
1283+ // Send DATA frame for the expired stream
1284+ writableChannel .reset ();
1285+ final RawFrame dataFrame = FRAME_FACTORY .createData (1 ,
1286+ ByteBuffer .wrap ("hello" .getBytes (StandardCharsets .US_ASCII )), true );
1287+ outBuffer .write (dataFrame , writableChannel );
1288+ mux .onInput (ByteBuffer .wrap (writableChannel .toByteArray ()));
1289+
1290+ // The handler must receive a timeout failure, not data
1291+ Mockito .verify (streamHandler ).failed (exceptionCaptor .capture ());
1292+ Assertions .assertInstanceOf (H2StreamTimeoutException .class , exceptionCaptor .getValue ());
1293+ Mockito .verify (streamHandler , Mockito .never ()).consumeData (
1294+ ArgumentMatchers .any (ByteBuffer .class ), ArgumentMatchers .anyBoolean ());
1295+ }
1296+
1297+ @ Test
1298+ void testExpiredStreamResetOnInboundHeaders () throws Exception {
1299+ Mockito .when (protocolIOSession .write (ArgumentMatchers .any (ByteBuffer .class )))
1300+ .thenAnswer (invocation -> {
1301+ final ByteBuffer buffer = invocation .getArgument (0 , ByteBuffer .class );
1302+ final int remaining = buffer .remaining ();
1303+ buffer .position (buffer .limit ());
1304+ return remaining ;
1305+ });
1306+ Mockito .doNothing ().when (protocolIOSession ).setEvent (ArgumentMatchers .anyInt ());
1307+ Mockito .doNothing ().when (protocolIOSession ).clearEvent (ArgumentMatchers .anyInt ());
1308+
1309+ final H2Config h2Config = H2Config .custom ().build ();
1310+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl (
1311+ protocolIOSession ,
1312+ FRAME_FACTORY ,
1313+ StreamIdGenerator .EVEN ,
1314+ httpProcessor ,
1315+ CharCodingConfig .DEFAULT ,
1316+ h2Config ,
1317+ h2StreamListener ,
1318+ () -> streamHandler );
1319+
1320+ // Encode request headers
1321+ final ByteArrayBuffer headerBuf = new ByteArrayBuffer (200 );
1322+ final HPackEncoder encoder = new HPackEncoder (h2Config .getHeaderTableSize (),
1323+ CharCodingSupport .createEncoder (CharCodingConfig .DEFAULT ));
1324+ final List <Header > headers = Arrays .asList (
1325+ new BasicHeader (":method" , "POST" ),
1326+ new BasicHeader (":scheme" , "http" ),
1327+ new BasicHeader (":path" , "/" ),
1328+ new BasicHeader (":authority" , "www.example.com" ));
1329+ encoder .encodeHeaders (headerBuf , headers , h2Config .isCompressionEnabled ());
1330+
1331+ final WritableByteChannelMock writableChannel = new WritableByteChannelMock (1024 );
1332+ final FrameOutputBuffer outBuffer = new FrameOutputBuffer (16 * 1024 );
1333+
1334+ // Send initial HEADERS (endHeaders=true, endStream=false) to create stream 1
1335+ final RawFrame headerFrame = FRAME_FACTORY .createHeaders (1 ,
1336+ ByteBuffer .wrap (headerBuf .array (), 0 , headerBuf .length ()), true , false );
1337+ outBuffer .write (headerFrame , writableChannel );
1338+ mux .onInput (ByteBuffer .wrap (writableChannel .toByteArray ()));
1339+
1340+ Mockito .verify (streamHandler ).consumeHeader (headersCaptor .capture (), ArgumentMatchers .eq (false ));
1341+ Assertions .assertFalse (headersCaptor .getValue ().isEmpty ());
1342+
1343+ // Retrieve the stream and set a short timeout
1344+ final Field streamsField = AbstractH2StreamMultiplexer .class .getDeclaredField ("streams" );
1345+ streamsField .setAccessible (true );
1346+ final H2Streams h2Streams = (H2Streams ) streamsField .get (mux );
1347+ final H2Stream stream = h2Streams .lookupValid (1 );
1348+ stream .setTimeout (Timeout .ofMilliseconds (50 ));
1349+
1350+ // Push last activity into the past so the timeout is expired
1351+ final Field lastActivityField = H2Stream .class .getDeclaredField ("lastActivityNanos" );
1352+ lastActivityField .setAccessible (true );
1353+ lastActivityField .set (stream , System .nanoTime () - TimeUnit .MILLISECONDS .toNanos (100 ));
1354+
1355+ // Send trailing HEADERS (endHeaders=true, endStream=true) for the expired stream
1356+ writableChannel .reset ();
1357+ final ByteArrayBuffer trailerBuf = new ByteArrayBuffer (64 );
1358+ encoder .encodeHeaders (trailerBuf , Arrays .asList (
1359+ new BasicHeader ("x-checksum" , "abc123" )), h2Config .isCompressionEnabled ());
1360+ final RawFrame trailerFrame = FRAME_FACTORY .createHeaders (1 ,
1361+ ByteBuffer .wrap (trailerBuf .array (), 0 , trailerBuf .length ()), true , true );
1362+ outBuffer .write (trailerFrame , writableChannel );
1363+ mux .onInput (ByteBuffer .wrap (writableChannel .toByteArray ()));
1364+
1365+ // The handler must receive a timeout failure, not a second header consumption
1366+ Mockito .verify (streamHandler ).failed (exceptionCaptor .capture ());
1367+ Assertions .assertInstanceOf (H2StreamTimeoutException .class , exceptionCaptor .getValue ());
1368+ // consumeHeader was called exactly once (the initial HEADERS), not twice
1369+ Mockito .verify (streamHandler , Mockito .times (1 )).consumeHeader (
1370+ ArgumentMatchers .anyList (), ArgumentMatchers .anyBoolean ());
1371+ }
1372+
1373+ @ Test
1374+ void testExpiredStreamResetOnInboundContinuation () throws Exception {
1375+ Mockito .when (protocolIOSession .write (ArgumentMatchers .any (ByteBuffer .class )))
1376+ .thenAnswer (invocation -> {
1377+ final ByteBuffer buffer = invocation .getArgument (0 , ByteBuffer .class );
1378+ final int remaining = buffer .remaining ();
1379+ buffer .position (buffer .limit ());
1380+ return remaining ;
1381+ });
1382+ Mockito .doNothing ().when (protocolIOSession ).setEvent (ArgumentMatchers .anyInt ());
1383+ Mockito .doNothing ().when (protocolIOSession ).clearEvent (ArgumentMatchers .anyInt ());
1384+
1385+ final H2Config h2Config = H2Config .custom ().build ();
1386+ final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl (
1387+ protocolIOSession ,
1388+ FRAME_FACTORY ,
1389+ StreamIdGenerator .EVEN ,
1390+ httpProcessor ,
1391+ CharCodingConfig .DEFAULT ,
1392+ h2Config ,
1393+ h2StreamListener ,
1394+ () -> streamHandler );
1395+
1396+ // Encode request headers
1397+ final ByteArrayBuffer headerBuf = new ByteArrayBuffer (200 );
1398+ final HPackEncoder encoder = new HPackEncoder (h2Config .getHeaderTableSize (),
1399+ CharCodingSupport .createEncoder (CharCodingConfig .DEFAULT ));
1400+ final List <Header > headers = Arrays .asList (
1401+ new BasicHeader (":method" , "GET" ),
1402+ new BasicHeader (":scheme" , "http" ),
1403+ new BasicHeader (":path" , "/" ),
1404+ new BasicHeader (":authority" , "www.example.com" ));
1405+ encoder .encodeHeaders (headerBuf , headers , h2Config .isCompressionEnabled ());
1406+
1407+ // Split encoded headers: first part in HEADERS, remainder in CONTINUATION
1408+ final int split = headerBuf .length () / 2 ;
1409+ final WritableByteChannelMock writableChannel = new WritableByteChannelMock (1024 );
1410+ final FrameOutputBuffer outBuffer = new FrameOutputBuffer (16 * 1024 );
1411+
1412+ // Send HEADERS (endHeaders=false, endStream=false) to create stream 1
1413+ final RawFrame headerFrame = FRAME_FACTORY .createHeaders (1 ,
1414+ ByteBuffer .wrap (headerBuf .array (), 0 , split ), false , false );
1415+ outBuffer .write (headerFrame , writableChannel );
1416+ mux .onInput (ByteBuffer .wrap (writableChannel .toByteArray ()));
1417+
1418+ // Stream created but consumeHeader not yet called (waiting for CONTINUATION)
1419+ Mockito .verify (streamHandler , Mockito .never ()).consumeHeader (
1420+ ArgumentMatchers .anyList (), ArgumentMatchers .anyBoolean ());
1421+
1422+ // Retrieve the stream and set a short timeout
1423+ final Field streamsField = AbstractH2StreamMultiplexer .class .getDeclaredField ("streams" );
1424+ streamsField .setAccessible (true );
1425+ final H2Streams h2Streams = (H2Streams ) streamsField .get (mux );
1426+ final H2Stream stream = h2Streams .lookupValid (1 );
1427+ stream .setTimeout (Timeout .ofMilliseconds (50 ));
1428+
1429+ // Push last activity into the past so the timeout is expired
1430+ final Field lastActivityField = H2Stream .class .getDeclaredField ("lastActivityNanos" );
1431+ lastActivityField .setAccessible (true );
1432+ lastActivityField .set (stream , System .nanoTime () - TimeUnit .MILLISECONDS .toNanos (100 ));
1433+
1434+ // Send CONTINUATION (endHeaders=true) for the expired stream
1435+ writableChannel .reset ();
1436+ final RawFrame continuationFrame = FRAME_FACTORY .createContinuation (1 ,
1437+ ByteBuffer .wrap (headerBuf .array (), split , headerBuf .length () - split ), true );
1438+ outBuffer .write (continuationFrame , writableChannel );
1439+ mux .onInput (ByteBuffer .wrap (writableChannel .toByteArray ()));
1440+
1441+ // The handler must receive a timeout failure, not header consumption
1442+ Mockito .verify (streamHandler ).failed (exceptionCaptor .capture ());
1443+ Assertions .assertInstanceOf (H2StreamTimeoutException .class , exceptionCaptor .getValue ());
1444+ Mockito .verify (streamHandler , Mockito .never ()).consumeHeader (
1445+ ArgumentMatchers .anyList (), ArgumentMatchers .anyBoolean ());
1446+ }
1447+
11681448 @ Test
11691449 void testOutboundTrailersWithPseudoHeaderRejected () throws Exception {
11701450 final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl (
0 commit comments