1414import java .util .Objects ;
1515import java .util .concurrent .atomic .AtomicBoolean ;
1616import java .util .concurrent .atomic .AtomicLong ;
17+ import java .util .concurrent .atomic .AtomicReference ;
1718
1819import org .eclipse .rdf4j .repository .RepositoryConnection ;
1920import org .slf4j .Logger ;
@@ -39,14 +40,12 @@ public enum Source {
3940 private final long startTimeMillis ;
4041 private final Runnable remoteCancel ;
4142 private final AtomicBoolean active = new AtomicBoolean (true );
42- private final AtomicBoolean cancelRequested = new AtomicBoolean ( false );
43+ private final AtomicReference < Cancellation > cancellation = new AtomicReference <>( );
4344 private final AtomicLong lastHeavyCheckpointMillis = new AtomicLong (-1 );
4445
4546 private volatile Thread workerThread ;
4647 private volatile RepositoryConnection repositoryConnection ;
4748 private volatile String lastHeavyOperator ;
48- private volatile QueryPressureState cancellationState = QueryPressureState .NORMAL ;
49- private volatile String cancellationReason ;
5049
5150 QueryCircuitBreakerHandle (String executionId , Source source , String repositoryId , String queryHash ,
5251 long startTimeMillis , Runnable remoteCancel ) {
@@ -83,7 +82,7 @@ public boolean isActive() {
8382 }
8483
8584 public boolean isCancelRequested () {
86- return cancelRequested .get ();
85+ return cancellation .get () != null ;
8786 }
8887
8988 public long getLastHeavyCheckpointMillis () {
@@ -95,11 +94,13 @@ public String getLastHeavyOperator() {
9594 }
9695
9796 public QueryPressureState getCancellationState () {
98- return cancellationState ;
97+ Cancellation current = cancellation .get ();
98+ return current != null ? current .state : QueryPressureState .NORMAL ;
9999 }
100100
101101 public String getCancellationReason () {
102- return cancellationReason ;
102+ Cancellation current = cancellation .get ();
103+ return current != null ? current .reason : null ;
103104 }
104105
105106 public boolean hasHeavyCheckpoint () {
@@ -125,13 +126,10 @@ void markHeavy(String operator, long checkpointTimeMillis) {
125126 }
126127
127128 boolean requestCancel (QueryPressureState state , String reason ) {
128- if (!active .get () || !cancelRequested .compareAndSet (false , true )) {
129+ if (!active .get () || !cancellation .compareAndSet (null , new Cancellation ( state , reason ) )) {
129130 return false ;
130131 }
131132
132- cancellationState = state ;
133- cancellationReason = reason ;
134-
135133 Thread activeWorker = workerThread ;
136134 if (activeWorker != null && activeWorker != Thread .currentThread ()) {
137135 activeWorker .interrupt ();
@@ -168,4 +166,15 @@ private void runRemoteCancel() {
168166 LOGGER .debug ("Error while forwarding breaker-triggered cancellation" , e );
169167 }
170168 }
169+
170+ private static final class Cancellation {
171+
172+ private final QueryPressureState state ;
173+ private final String reason ;
174+
175+ private Cancellation (QueryPressureState state , String reason ) {
176+ this .state = state ;
177+ this .reason = reason ;
178+ }
179+ }
171180}
0 commit comments