Skip to content

Commit 7dfce5b

Browse files
authored
Merge pull request #87 from jacomago/resubscribe-bug
Fix ByteBuffer corruption on reconnect causing IndexOutOfBoundsException
2 parents 1a8c43e + 873db32 commit 7dfce5b

5 files changed

Lines changed: 394 additions & 66 deletions

File tree

src/core/com/cosylab/epics/caj/CAJContext.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,12 @@ public class CAJContext extends Context implements CAContext, CAJConstants, Conf
176176
*/
177177
protected int maxArrayBytes = 16384;
178178

179+
/**
180+
* Maximum number of times to retry a partial TCP send before giving up.
181+
* A value of -1 means retry indefinitely (the default).
182+
*/
183+
protected int maxSendRetries = -1;
184+
179185
/**
180186
* Minimum interval in seconds between CA search broadcasts. Default is 0.1 seconds
181187
*/
@@ -465,6 +471,11 @@ protected void loadConfiguration()
465471
catch (Exception ex)
466472
{ logger.log(Level.WARNING, "Cannot parse EPICS_CA_MAX_ARRAY_BYTES='" + tmp + "'", ex); }
467473

474+
tmp = System.getenv("EPICS_CA_MAX_SEND_RETRIES");
475+
if (tmp != null)
476+
try { maxSendRetries = Integer.parseInt(tmp); }
477+
catch (Exception ex)
478+
{ logger.log(Level.WARNING, "Cannot parse EPICS_CA_MAX_SEND_RETRIES='" + tmp + "'", ex); }
468479

469480
tmp = System.getenv("EPICS_CA_MAX_SEARCH_PERIOD");
470481
if (tmp != null)
@@ -485,9 +496,10 @@ protected void loadConfiguration()
485496
repeaterPort = jcaLibrary.getPropertyAsInt(contextClassName + ".repeater_port", repeaterPort);
486497
serverPort = jcaLibrary.getPropertyAsInt(contextClassName + ".server_port", serverPort);
487498
maxArrayBytes = jcaLibrary.getPropertyAsInt(contextClassName + ".max_array_bytes", maxArrayBytes);
499+
maxSendRetries = jcaLibrary.getPropertyAsInt(contextClassName + ".max_send_retries", maxSendRetries);
488500
maxSearchInterval = jcaLibrary.getPropertyAsFloat(contextClassName + ".max_search_interval", maxSearchInterval);
489501
eventDispatcherClassName = jcaLibrary.getProperty(contextClassName + ".event_dispatcher");
490-
502+
491503
// load CAJ specific configuration (overrides default)
492504
addressList = jcaLibrary.getProperty(thisClassName + ".addr_list", addressList);
493505
autoAddressList = jcaLibrary.getPropertyAsBoolean(thisClassName + ".auto_addr_list", autoAddressList);
@@ -500,6 +512,7 @@ protected void loadConfiguration()
500512
repeaterPort = jcaLibrary.getPropertyAsInt(thisClassName + ".repeater_port", repeaterPort);
501513
serverPort = jcaLibrary.getPropertyAsInt(thisClassName + ".server_port", serverPort);
502514
maxArrayBytes = jcaLibrary.getPropertyAsInt(thisClassName + ".max_array_bytes", maxArrayBytes);
515+
maxSendRetries = jcaLibrary.getPropertyAsInt(thisClassName + ".max_send_retries", maxSendRetries);
503516
minSearchInterval = jcaLibrary.getPropertyAsFloat(thisClassName + ".min_search_interval", minSearchInterval);
504517
maxSearchInterval = jcaLibrary.getPropertyAsFloat(thisClassName + ".max_search_interval", maxSearchInterval);
505518
}
@@ -580,6 +593,13 @@ public void configure(Configuration configuration)
580593
maxArrayBytes = configuration.getAttributeAsInteger("max_array_bytes", maxArrayBytes);
581594
}
582595

596+
// max. send retries (-1 = infinite)
597+
try {
598+
maxSendRetries = configuration.getChild("max_send_retries", false).getValueAsInteger();
599+
} catch(Exception ex) {
600+
maxSendRetries = configuration.getAttributeAsInteger("max_send_retries", maxSendRetries);
601+
}
602+
583603
// max. search interval
584604
try {
585605
maxSearchInterval = configuration.getChild("max_search_interval", false).getValueAsFloat();
@@ -1322,6 +1342,7 @@ public void printInfo(PrintStream out) throws IllegalStateException {
13221342
out.println("REPEATER_PORT : " + repeaterPort);
13231343
out.println("SERVER_PORT : " + serverPort);
13241344
out.println("MAX_ARRAY_BYTES : " + maxArrayBytes);
1345+
out.println("MAX_SEND_RETRIES : " + (maxSendRetries < 0 ? "infinite" : maxSendRetries));
13251346
out.println("MIN_SEARCH_INTERVAL : " + minSearchInterval);
13261347
out.println("MAX_SEARCH_INTERVAL : " + maxSearchInterval);
13271348
out.println("EVENT_DISPATCHER: " + eventDispatcher);
@@ -1438,6 +1459,14 @@ public int getMaxArrayBytes() {
14381459
return maxArrayBytes;
14391460
}
14401461

1462+
/**
1463+
* Get maximum number of TCP send retries (-1 means infinite).
1464+
* @return max send retries, or -1 for unlimited.
1465+
*/
1466+
public int getMaxSendRetries() {
1467+
return maxSendRetries;
1468+
}
1469+
14411470
/**
14421471
* Get repeater port.
14431472
* @return repeater port.

src/core/com/cosylab/epics/caj/impl/CATransport.java

Lines changed: 45 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -630,96 +630,76 @@ protected void enableFlowControl()
630630

631631
/**
632632
* Send a buffer through the transport.
633-
* NOTE: TCP sent buffer/sending has to be synchronized.
633+
* NOTE: TCP sent buffer/sending has to be synchronized.
634634
* @param buffer buffer to be sent
635-
* @throws IOException
635+
* @throws IOException
636636
*/
637-
public void send(ByteBuffer buffer, boolean asyncCloseOnError) throws IOException
637+
public void send(ByteBuffer buffer) throws IOException
638638
{
639639
sendLock.lock();
640640
try
641641
{
642-
noSyncSend(buffer, asyncCloseOnError);
642+
noSyncSend(buffer);
643643
}
644-
finally
644+
finally
645645
{
646646
sendLock.unlock();
647647
}
648648
}
649649

650650
/**
651-
* Send a buffer through the transport.
652-
* NOTE: TCP sent buffer/sending has to be synchronized.
653-
* @param buffer buffer to be sent
654-
* @throws IOException
651+
* Send a buffer through the transport without acquiring the send lock
652+
* (caller is responsible for holding it).
653+
*
654+
* <p>Flips the buffer, then loops until all bytes are written. If the
655+
* kernel TCP send buffer is full, waits briefly and retries. Throws
656+
* {@link IOException} (and closes the transport) if the send buffer
657+
* remains full after the configured max send retries (see
658+
* {@link CAJContext#getMaxSendRetries()}; -1 means retry indefinitely),
659+
* the channel is already closed, or the calling thread is interrupted.
660+
*
661+
* @param buffer fully-written buffer to send (will be flipped)
662+
* @throws IOException on write error, persistent backpressure, or interrupt
655663
*/
656-
// TODO optimize !!!
657-
private void noSyncSend(ByteBuffer buffer, boolean asyncCloseOnError) throws IOException
664+
private void noSyncSend(ByteBuffer buffer) throws IOException
658665
{
659666
try
660667
{
661-
// prepare buffer
662668
buffer.flip();
663669

664-
final int SEND_BUFFER_LIMIT = 16000;
665-
int bufferLimit = buffer.limit();
670+
context.getLogger().finest("Sending " + buffer.limit() + " bytes to " + socketAddress + ".");
666671

667-
// TODO remove?!
668-
context.getLogger().finest("Sending " + bufferLimit + " bytes to " + socketAddress + ".");
669-
670-
// limit sending large buffers, split the into parts
671-
int parts = (buffer.limit()-1) / SEND_BUFFER_LIMIT + 1;
672-
for (int part = 1; part <= parts; part++)
672+
int tries = 0;
673+
final int maxRetries = context.getMaxSendRetries();
674+
while (buffer.hasRemaining())
673675
{
674-
if (parts > 1)
675-
{
676-
buffer.limit(Math.min(part * SEND_BUFFER_LIMIT, bufferLimit));
677-
context.getLogger().finest("[Parted] Sending (part " + part + "/" + parts + ") " + (buffer.limit()-buffer.position()) + " bytes to " + socketAddress + ".");
678-
}
679-
680-
final int TRIES = 10;
681-
for (int tries = 0; /* tries <= TRIES */ ; tries++)
676+
int bytesSent = channel.write(buffer);
677+
if (bytesSent < 0)
678+
throw new IOException("channel closed");
679+
680+
if (buffer.hasRemaining())
682681
{
683-
684-
// send
685-
int bytesSent = channel.write(buffer);
686-
if (bytesSent < 0)
687-
throw new IOException("bytesSent < 0");
688-
689-
// bytesSend == buffer.position(), so there is no need for flip()
690-
if (buffer.position() != buffer.limit())
691-
{
692-
if (closed)
693-
throw new IOException("transport closed on the client side");
694-
695-
if (tries >= TRIES)
696-
{
697-
context.getLogger().warning("Failed to send message to " + socketAddress + " - buffer full, will retry.");
698-
699-
//if (tries >= 2*TRIES)
700-
// throw new IOException("TCP send buffer persistently full, disconnecting!");
701-
702-
}
703-
704-
// flush & wait for a while...
705-
context.getLogger().finest("Send buffer full for " + socketAddress + ", waiting...");
706-
channel.socket().getOutputStream().flush();
707-
try {
708-
Thread.sleep(Math.min(15000,10+tries*100));
709-
} catch (InterruptedException e) {
710-
// noop
711-
}
712-
continue;
682+
if (closed)
683+
throw new IOException("transport closed on the client side");
684+
685+
tries++;
686+
if (maxRetries >= 0 && tries > maxRetries)
687+
throw new IOException("TCP send buffer persistently full, disconnecting " + socketAddress);
688+
689+
context.getLogger().finest("Send buffer full for " + socketAddress
690+
+ ", waiting (attempt " + tries
691+
+ (maxRetries < 0 ? "" : "/" + maxRetries) + ")...");
692+
try {
693+
Thread.sleep(Math.min(15000, 10 + tries * 100));
694+
} catch (InterruptedException e) {
695+
Thread.currentThread().interrupt();
696+
throw new IOException("interrupted while sending to " + socketAddress);
713697
}
714-
else
715-
break;
716698
}
717-
718699
}
719700
}
720-
catch (IOException ioex)
701+
catch (IOException ioex)
721702
{
722-
// close connection
723703
close(true);
724704
throw ioex;
725705
}
@@ -832,7 +812,7 @@ public boolean flushInternal()
832812
}
833813

834814
try {
835-
send(buf, false);
815+
send(buf);
836816
}
837817
finally {
838818
// return back to the cache
@@ -883,7 +863,7 @@ public void submit(Request requestMessage) throws IOException {
883863
{
884864
try
885865
{
886-
noSyncSend(message, true);
866+
noSyncSend(message);
887867
return;
888868
}
889869
finally

src/core/com/cosylab/epics/caj/impl/requests/EventAddRequest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,18 @@ public void submit() throws IOException
163163
public void resubscribeSubscription(Transport transport) throws IOException
164164
{
165165
this.transport = transport;
166+
// Reset buffer state: a prior failed send (IOException mid-write in noSyncSend)
167+
// can leave position < capacity and limit = capacity. The next flip() would then
168+
// set limit = that partial position, causing putInt(8,…) to throw
169+
// IndexOutOfBoundsException on the reconnect after that.
170+
//
171+
// requestMessage is a fixed-size, single-purpose buffer allocated in the
172+
// constructor (see EventAddRequest() lines ~107/114) to hold exactly one
173+
// EventAdd message, so capacity() == message size is always true. Resetting
174+
// to position == limit == capacity restores the fully-written state, so the
175+
// flip() in Transport.submit() always produces position=0, limit=capacity.
176+
requestMessage.limit(requestMessage.capacity());
177+
requestMessage.position(requestMessage.capacity());
166178
// update channel sid
167179
requestMessage.putInt(8, channel.getServerChannelID());
168180
// immediate send (increase priority - all subsequent sends will be done immediately).

0 commit comments

Comments
 (0)