Skip to content

Commit 90eb60f

Browse files
committed
QPIDJMS-600 Ensure session and connection close await async sends
Session and Connection close should be awaiting the outcome of async send completions before returning. This change allows them to await up to the close timeout value before moving on and failing any completions that are not completed after that point. Several tests added to cover this behavior.
1 parent e0c4039 commit 90eb60f

11 files changed

Lines changed: 552 additions & 210 deletions

qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,27 +29,6 @@
2929
import java.util.concurrent.atomic.AtomicLong;
3030
import java.util.concurrent.atomic.AtomicReference;
3131

32-
import jakarta.jms.Connection;
33-
import jakarta.jms.ConnectionConsumer;
34-
import jakarta.jms.ConnectionMetaData;
35-
import jakarta.jms.Destination;
36-
import jakarta.jms.ExceptionListener;
37-
import jakarta.jms.IllegalStateException;
38-
import jakarta.jms.InvalidClientIDException;
39-
import jakarta.jms.InvalidDestinationException;
40-
import jakarta.jms.JMSException;
41-
import jakarta.jms.JMSRuntimeException;
42-
import jakarta.jms.Queue;
43-
import jakarta.jms.QueueConnection;
44-
import jakarta.jms.QueueSession;
45-
import jakarta.jms.ServerSessionPool;
46-
import jakarta.jms.Session;
47-
import jakarta.jms.TemporaryQueue;
48-
import jakarta.jms.TemporaryTopic;
49-
import jakarta.jms.Topic;
50-
import jakarta.jms.TopicConnection;
51-
import jakarta.jms.TopicSession;
52-
5332
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
5433
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
5534
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
@@ -91,6 +70,27 @@
9170
import org.slf4j.Logger;
9271
import org.slf4j.LoggerFactory;
9372

73+
import jakarta.jms.Connection;
74+
import jakarta.jms.ConnectionConsumer;
75+
import jakarta.jms.ConnectionMetaData;
76+
import jakarta.jms.Destination;
77+
import jakarta.jms.ExceptionListener;
78+
import jakarta.jms.IllegalStateException;
79+
import jakarta.jms.InvalidClientIDException;
80+
import jakarta.jms.InvalidDestinationException;
81+
import jakarta.jms.JMSException;
82+
import jakarta.jms.JMSRuntimeException;
83+
import jakarta.jms.Queue;
84+
import jakarta.jms.QueueConnection;
85+
import jakarta.jms.QueueSession;
86+
import jakarta.jms.ServerSessionPool;
87+
import jakarta.jms.Session;
88+
import jakarta.jms.TemporaryQueue;
89+
import jakarta.jms.TemporaryTopic;
90+
import jakarta.jms.Topic;
91+
import jakarta.jms.TopicConnection;
92+
import jakarta.jms.TopicSession;
93+
9494
/**
9595
* Implementation of a JMS Connection
9696
*/
@@ -916,6 +916,14 @@ void pull(JmsConsumerId consumerId, long timeout, ProviderSynchronization synchr
916916
}
917917
}
918918

919+
ProviderFuture newProviderFuture() {
920+
return newProviderFuture(null);
921+
}
922+
923+
ProviderFuture newProviderFuture(ProviderSynchronization synchronization) {
924+
return provider.newProviderFuture(synchronization);
925+
}
926+
919927
//----- Property setters and getters -------------------------------------//
920928

921929
@Override

qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java

Lines changed: 79 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,36 @@
3939
import java.util.concurrent.locks.ReentrantLock;
4040
import java.util.function.Consumer;
4141

42+
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
43+
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
44+
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
45+
import org.apache.qpid.jms.message.JmsMessage;
46+
import org.apache.qpid.jms.message.JmsMessageTransformation;
47+
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
48+
import org.apache.qpid.jms.meta.JmsConsumerId;
49+
import org.apache.qpid.jms.meta.JmsConsumerInfo;
50+
import org.apache.qpid.jms.meta.JmsProducerId;
51+
import org.apache.qpid.jms.meta.JmsProducerInfo;
52+
import org.apache.qpid.jms.meta.JmsResource.ResourceState;
53+
import org.apache.qpid.jms.meta.JmsSessionId;
54+
import org.apache.qpid.jms.meta.JmsSessionInfo;
55+
import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
56+
import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
57+
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
58+
import org.apache.qpid.jms.policy.JmsPresettlePolicy;
59+
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
60+
import org.apache.qpid.jms.provider.Provider;
61+
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
62+
import org.apache.qpid.jms.provider.ProviderException;
63+
import org.apache.qpid.jms.provider.ProviderFuture;
64+
import org.apache.qpid.jms.provider.ProviderSynchronization;
65+
import org.apache.qpid.jms.selector.SelectorParser;
66+
import org.apache.qpid.jms.selector.filter.FilterException;
67+
import org.apache.qpid.jms.util.NoOpExecutor;
68+
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
69+
import org.slf4j.Logger;
70+
import org.slf4j.LoggerFactory;
71+
4272
import jakarta.jms.BytesMessage;
4373
import jakarta.jms.CompletionListener;
4474
import jakarta.jms.DeliveryMode;
@@ -71,36 +101,6 @@
71101
import jakarta.jms.TopicSession;
72102
import jakarta.jms.TopicSubscriber;
73103

74-
import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
75-
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
76-
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
77-
import org.apache.qpid.jms.message.JmsMessage;
78-
import org.apache.qpid.jms.message.JmsMessageTransformation;
79-
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
80-
import org.apache.qpid.jms.meta.JmsConsumerId;
81-
import org.apache.qpid.jms.meta.JmsConsumerInfo;
82-
import org.apache.qpid.jms.meta.JmsProducerId;
83-
import org.apache.qpid.jms.meta.JmsProducerInfo;
84-
import org.apache.qpid.jms.meta.JmsResource.ResourceState;
85-
import org.apache.qpid.jms.meta.JmsSessionId;
86-
import org.apache.qpid.jms.meta.JmsSessionInfo;
87-
import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
88-
import org.apache.qpid.jms.policy.JmsMessageIDPolicy;
89-
import org.apache.qpid.jms.policy.JmsPrefetchPolicy;
90-
import org.apache.qpid.jms.policy.JmsPresettlePolicy;
91-
import org.apache.qpid.jms.policy.JmsRedeliveryPolicy;
92-
import org.apache.qpid.jms.provider.Provider;
93-
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
94-
import org.apache.qpid.jms.provider.ProviderException;
95-
import org.apache.qpid.jms.provider.ProviderFuture;
96-
import org.apache.qpid.jms.provider.ProviderSynchronization;
97-
import org.apache.qpid.jms.selector.SelectorParser;
98-
import org.apache.qpid.jms.selector.filter.FilterException;
99-
import org.apache.qpid.jms.util.NoOpExecutor;
100-
import org.apache.qpid.jms.util.QpidJMSThreadFactory;
101-
import org.slf4j.Logger;
102-
import org.slf4j.LoggerFactory;
103-
104104
/**
105105
* JMS Session implementation
106106
*/
@@ -126,6 +126,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
126126
private final ReentrantLock sendLock = new ReentrantLock();
127127
private volatile ThreadPoolExecutor deliveryExecutor;
128128
private volatile ThreadPoolExecutor completionExcecutor;
129+
private volatile ProviderFuture asyncSendsCompletion;
129130
private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
130131
private boolean deliveryThreadCheckEnabled = true;
131132
private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
@@ -351,6 +352,7 @@ protected boolean shutdown(Throwable cause) throws JMSException {
351352
for (JmsMessageProducer producer : new ArrayList<JmsMessageProducer>(this.producers.values())) {
352353
producer.shutdown(cause);
353354
}
355+
354356
} catch (JMSException jmsEx) {
355357
shutdownError = jmsEx;
356358
}
@@ -367,30 +369,52 @@ protected boolean shutdown(Throwable cause) throws JMSException {
367369
}
368370
}
369371

370-
// Ensure that no asynchronous completion sends remain blocked after close.
372+
try {
373+
if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
374+
acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
375+
}
376+
} catch (Exception e) {
377+
LOG.trace("Exception during session shutdown cleanup acknowledgement", e);
378+
}
379+
380+
// Ensure that no asynchronous completion sends remain blocked after close but wait
381+
// using the close timeout for the asynchronous sends to complete normally.
382+
final ExecutorService completionExecutor = getCompletionExecutor();
383+
371384
synchronized (sessionInfo) {
385+
// Producers are now quiesced and we can await completion of asynchronous sends
386+
// that are still pending a result or timeout once we've done a quick check to
387+
// see if any are actually pending or have completed already.
388+
asyncSendsCompletion = connection.newProviderFuture();
389+
390+
completionExecutor.execute(() -> {
391+
if (asyncSendQueue.isEmpty()) {
392+
asyncSendsCompletion.onSuccess();
393+
}
394+
});
395+
}
396+
397+
try {
398+
asyncSendsCompletion.sync(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
399+
} catch (Exception ex) {
400+
LOG.trace("Exception during wait for asynchronous sends to complete", ex);
401+
} finally {
372402
if (cause == null) {
373403
cause = new JMSException("Session closed remotely before message transfer result was notified");
374404
}
375405

376-
getCompletionExecutor().execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
377-
getCompletionExecutor().shutdown();
406+
// as a last task we want to fail any stragglers in the asynchronous send queue and then
407+
// shutdown the queue to prevent any more submissions while the cleanup goes on.
408+
completionExecutor.execute(new FailOrCompleteAsyncCompletionsTask(JmsExceptionSupport.create(cause)));
409+
completionExecutor.shutdown();
378410
}
379411

380412
try {
381-
getCompletionExecutor().awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
413+
completionExecutor.awaitTermination(connection.getCloseTimeout(), TimeUnit.MILLISECONDS);
382414
} catch (InterruptedException e) {
383415
LOG.trace("Session close awaiting send completions was interrupted");
384416
}
385417

386-
try {
387-
if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
388-
acknowledge(ACK_TYPE.SESSION_SHUTDOWN);
389-
}
390-
} catch (Exception e) {
391-
LOG.trace("Exception during session shutdown cleanup acknowledgement", e);
392-
}
393-
394418
if (shutdownError != null) {
395419
throw shutdownError;
396420
}
@@ -856,11 +880,12 @@ protected void send(JmsMessageProducer producer, Destination dest, Message msg,
856880
}
857881

858882
private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
859-
sendLock.lock();
860-
861883
JmsMessage outbound = null;
884+
sendLock.lock();
862885

863886
try {
887+
checkClosed();
888+
864889
original.setJMSDeliveryMode(deliveryMode);
865890
original.setJMSPriority(priority);
866891
original.setJMSRedelivered(false);
@@ -909,7 +934,7 @@ private void send(JmsMessageProducer producer, JmsDestination destination, Messa
909934
}
910935

911936
outbound.getFacade().setDeliveryTime(deliveryTime, hasDelay);
912-
if(!isJmsMessage) {
937+
if (!isJmsMessage) {
913938
// If the original was a foreign message, we still need to update it too.
914939
setForeignMessageDeliveryTime(original, deliveryTime);
915940
}
@@ -977,7 +1002,7 @@ public void onPendingFailure(ProviderException cause) {
9771002
}
9781003
} catch (JMSException jmsEx) {
9791004
// Ensure that on failure case the message is returned to usable state for another send attempt.
980-
if(outbound != null) {
1005+
if (outbound != null) {
9811006
outbound.onSendComplete();
9821007
}
9831008
throw jmsEx;
@@ -1511,6 +1536,10 @@ public void run() {
15111536
if (producerId == null) {
15121537
asyncSendQueue.clear();
15131538
}
1539+
1540+
if (closed.get() && asyncSendsCompletion != null && asyncSendQueue.isEmpty()) {
1541+
asyncSendsCompletion.onSuccess();
1542+
}
15141543
}
15151544
}
15161545

@@ -1577,6 +1606,10 @@ public void run() {
15771606
}
15781607
}
15791608
}
1609+
1610+
if (closed.get() && asyncSendsCompletion != null && asyncSendQueue.isEmpty()) {
1611+
asyncSendsCompletion.onSuccess();
1612+
}
15801613
} catch (Exception ex) {
15811614
LOG.error("Async completion task encountered unexpected failure", ex);
15821615
}

qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/BytesMessageIntegrationTest.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,6 @@
3030
import java.util.Arrays;
3131
import java.util.Map;
3232

33-
import jakarta.jms.BytesMessage;
34-
import jakarta.jms.CompletionListener;
35-
import jakarta.jms.Connection;
36-
import jakarta.jms.DeliveryMode;
37-
import jakarta.jms.JMSException;
38-
import jakarta.jms.Message;
39-
import jakarta.jms.MessageConsumer;
40-
import jakarta.jms.MessageFormatException;
41-
import jakarta.jms.MessageNotWriteableException;
42-
import jakarta.jms.MessageProducer;
43-
import jakarta.jms.Queue;
44-
import jakarta.jms.Session;
45-
4633
import org.apache.qpid.jms.JmsConnection;
4734
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
4835
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -62,6 +49,19 @@
6249
import org.junit.jupiter.api.Test;
6350
import org.junit.jupiter.api.Timeout;
6451

52+
import jakarta.jms.BytesMessage;
53+
import jakarta.jms.CompletionListener;
54+
import jakarta.jms.Connection;
55+
import jakarta.jms.DeliveryMode;
56+
import jakarta.jms.JMSException;
57+
import jakarta.jms.Message;
58+
import jakarta.jms.MessageConsumer;
59+
import jakarta.jms.MessageFormatException;
60+
import jakarta.jms.MessageNotWriteableException;
61+
import jakarta.jms.MessageProducer;
62+
import jakarta.jms.Queue;
63+
import jakarta.jms.Session;
64+
6565
public class BytesMessageIntegrationTest extends QpidJmsTestCase {
6666
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
6767

@@ -581,7 +581,7 @@ public void testAsyncSendDoesNotMarksBytesMessageReadOnly() throws Exception {
581581
@Timeout(20)
582582
public void testAsyncCompletionSendMarksBytesMessageReadOnly() throws Exception {
583583
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
584-
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
584+
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
585585
connection.setSendTimeout(15000);
586586

587587
testPeer.expectBegin();

qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,6 @@
3030
import java.util.LinkedHashMap;
3131
import java.util.Map;
3232

33-
import jakarta.jms.CompletionListener;
34-
import jakarta.jms.Connection;
35-
import jakarta.jms.DeliveryMode;
36-
import jakarta.jms.MapMessage;
37-
import jakarta.jms.Message;
38-
import jakarta.jms.MessageConsumer;
39-
import jakarta.jms.MessageFormatException;
40-
import jakarta.jms.MessageNotWriteableException;
41-
import jakarta.jms.MessageProducer;
42-
import jakarta.jms.Queue;
43-
import jakarta.jms.Session;
44-
4533
import org.apache.qpid.jms.JmsConnection;
4634
import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
4735
import org.apache.qpid.jms.test.QpidJmsTestCase;
@@ -58,6 +46,18 @@
5846
import org.junit.jupiter.api.Test;
5947
import org.junit.jupiter.api.Timeout;
6048

49+
import jakarta.jms.CompletionListener;
50+
import jakarta.jms.Connection;
51+
import jakarta.jms.DeliveryMode;
52+
import jakarta.jms.MapMessage;
53+
import jakarta.jms.Message;
54+
import jakarta.jms.MessageConsumer;
55+
import jakarta.jms.MessageFormatException;
56+
import jakarta.jms.MessageNotWriteableException;
57+
import jakarta.jms.MessageProducer;
58+
import jakarta.jms.Queue;
59+
import jakarta.jms.Session;
60+
6161
public class MapMessageIntegrationTest extends QpidJmsTestCase {
6262
private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
6363

@@ -440,7 +440,7 @@ public void testAsyncSendDoesNotMarkMapMessageReadOnly() throws Exception {
440440
@Timeout(20)
441441
public void testAsyncCompletionSendMarksMapMessageReadOnly() throws Exception {
442442
try(TestAmqpPeer testPeer = new TestAmqpPeer();) {
443-
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
443+
JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer, "jms.closeTimeout=50");
444444
connection.setSendTimeout(15000);
445445

446446
testPeer.expectBegin();

0 commit comments

Comments
 (0)