@@ -628,99 +628,77 @@ protected void enableFlowControl()
628628 }
629629 }
630630
631+ /** Maximum number of times to retry a partial write before giving up. */
632+ private static final int MAX_SEND_RETRIES = 10 ;
633+
631634 /**
632635 * Send a buffer through the transport.
633- * NOTE: TCP sent buffer/sending has to be synchronized.
636+ * NOTE: TCP sent buffer/sending has to be synchronized.
634637 * @param buffer buffer to be sent
635- * @throws IOException
638+ * @throws IOException
636639 */
637- public void send (ByteBuffer buffer , boolean asyncCloseOnError ) throws IOException
640+ public void send (ByteBuffer buffer ) throws IOException
638641 {
639642 sendLock .lock ();
640643 try
641644 {
642- noSyncSend (buffer , asyncCloseOnError );
645+ noSyncSend (buffer );
643646 }
644- finally
647+ finally
645648 {
646649 sendLock .unlock ();
647650 }
648651 }
649652
650653 /**
651- * Send a buffer through the transport.
652- * NOTE: TCP sent buffer/sending has to be synchronized.
653- * @param buffer buffer to be sent
654- * @throws IOException
654+ * Send a buffer through the transport without acquiring the send lock
655+ * (caller is responsible for holding it).
656+ *
657+ * <p>Flips the buffer, then loops until all bytes are written. If the
658+ * kernel TCP send buffer is full, waits briefly and retries. Throws
659+ * {@link IOException} (and closes the transport) if the send buffer
660+ * remains full after {@value #MAX_SEND_RETRIES} retries, the channel is
661+ * already closed, or the calling thread is interrupted.
662+ *
663+ * @param buffer fully-written buffer to send (will be flipped)
664+ * @throws IOException on write error, persistent backpressure, or interrupt
655665 */
656- // TODO optimize !!!
657- private void noSyncSend (ByteBuffer buffer , boolean asyncCloseOnError ) throws IOException
666+ private void noSyncSend (ByteBuffer buffer ) throws IOException
658667 {
659668 try
660669 {
661- // prepare buffer
662670 buffer .flip ();
663671
664- final int SEND_BUFFER_LIMIT = 16000 ;
665- int bufferLimit = buffer .limit ();
666-
667- // TODO remove?!
668- context .getLogger ().finest ("Sending " + bufferLimit + " bytes to " + socketAddress + "." );
672+ context .getLogger ().finest ("Sending " + buffer .limit () + " bytes to " + socketAddress + "." );
669673
670- // limit sending large buffers, split the into parts
671- int parts = (buffer .limit ()-1 ) / SEND_BUFFER_LIMIT + 1 ;
672- for (int part = 1 ; part <= parts ; part ++)
674+ int tries = 0 ;
675+ while (buffer .hasRemaining ())
673676 {
674- if (parts > 1 )
675- {
676- buffer .limit (Math .min (part * SEND_BUFFER_LIMIT , bufferLimit ));
677- context .getLogger ().finest ("[Parted] Sending (part " + part + "/" + parts + ") " + (buffer .limit ()-buffer .position ()) + " bytes to " + socketAddress + "." );
678- }
679-
680- final int TRIES = 10 ;
681- for (int tries = 0 ; /* tries <= TRIES */ ; tries ++)
677+ int bytesSent = channel .write (buffer );
678+ if (bytesSent < 0 )
679+ throw new IOException ("channel closed" );
680+
681+ if (buffer .hasRemaining ())
682682 {
683-
684- // send
685- int bytesSent = channel .write (buffer );
686- if (bytesSent < 0 )
687- throw new IOException ("bytesSent < 0" );
688-
689- // bytesSend == buffer.position(), so there is no need for flip()
690- if (buffer .position () != buffer .limit ())
691- {
692- if (closed )
693- throw new IOException ("transport closed on the client side" );
694-
695- if (tries >= TRIES )
696- {
697- context .getLogger ().warning ("Failed to send message to " + socketAddress + " - buffer full, will retry." );
698-
699- //if (tries >= 2*TRIES)
700- // throw new IOException("TCP send buffer persistently full, disconnecting!");
701-
702- }
703-
704- // wait for a while to let the OS drain the TCP send buffer
705- // channel.socket().getOutputStream().flush() was here but throws
706- // IllegalBlockingModeException in non-blocking mode and is a no-op for TCP output)
707- context .getLogger ().finest ("Send buffer full for " + socketAddress + ", waiting..." );
708- try {
709- Thread .sleep (Math .min (15000 ,10 +tries *100 ));
710- } catch (InterruptedException e ) {
711- // noop
712- }
713- continue ;
683+ if (closed )
684+ throw new IOException ("transport closed on the client side" );
685+
686+ if (++tries > MAX_SEND_RETRIES )
687+ throw new IOException ("TCP send buffer persistently full, disconnecting " + socketAddress );
688+
689+ context .getLogger ().finest ("Send buffer full for " + socketAddress
690+ + ", waiting (attempt " + tries + "/" + MAX_SEND_RETRIES + ")..." );
691+ try {
692+ Thread .sleep (Math .min (15000 , 10 + tries * 100 ));
693+ } catch (InterruptedException e ) {
694+ Thread .currentThread ().interrupt ();
695+ throw new IOException ("interrupted while sending to " + socketAddress );
714696 }
715- else
716- break ;
717697 }
718-
719698 }
720699 }
721- catch (IOException ioex )
700+ catch (IOException ioex )
722701 {
723- // close connection
724702 close (true );
725703 throw ioex ;
726704 }
@@ -833,7 +811,7 @@ public boolean flushInternal()
833811 }
834812
835813 try {
836- send (buf , false );
814+ send (buf );
837815 }
838816 finally {
839817 // return back to the cache
@@ -884,7 +862,7 @@ public void submit(Request requestMessage) throws IOException {
884862 {
885863 try
886864 {
887- noSyncSend (message , true );
865+ noSyncSend (message );
888866 return ;
889867 }
890868 finally
0 commit comments