2626import java .util .concurrent .atomic .AtomicBoolean ;
2727import java .util .concurrent .atomic .AtomicReference ;
2828
29+ import org .eclipse .rdf4j .http .client .QueryCircuitBreaker ;
30+ import org .eclipse .rdf4j .http .client .QueryCircuitBreakerHandle ;
31+ import org .eclipse .rdf4j .http .client .QueryExecutionContext ;
2932import org .eclipse .rdf4j .http .client .QueryExplanationRequestContext ;
3033import org .eclipse .rdf4j .http .protocol .Protocol ;
3134import org .eclipse .rdf4j .http .server .repository .ExplainQueryResultView ;
@@ -311,6 +314,56 @@ void explainRequestIdShouldPropagateToProxyRepositoriesAndForwardCancel() throws
311314 }
312315 }
313316
317+ @ Test
318+ void breakerCancellationShouldForwardTrackedExplainCancelToProxyRepository () throws Exception {
319+ RepositoryResolver repositoryResolver = mock (RepositoryResolver .class );
320+ HTTPRepository repository = mock (HTTPRepository .class );
321+ RepositoryConnection connection = mock (RepositoryConnection .class );
322+ TupleQuery tupleQuery = mock (TupleQuery .class );
323+ DefaultQueryRequestHandler handler = new DefaultQueryRequestHandler (repositoryResolver );
324+ ExecutorService executor = Executors .newSingleThreadExecutor ();
325+
326+ CountDownLatch explainStarted = new CountDownLatch (1 );
327+ CountDownLatch explainInterrupted = new CountDownLatch (1 );
328+
329+ MockHttpServletRequest explainRequest = newAsyncExplainRequest ("req-breaker" );
330+ MockHttpServletResponse explainResponse = new MockHttpServletResponse ();
331+
332+ when (repositoryResolver .getRepository (explainRequest )).thenReturn (repository );
333+ when (repositoryResolver .getRepositoryConnection (explainRequest , repository )).thenReturn (connection );
334+ when (connection .prepareQuery (QueryLanguage .SPARQL , SELECT_ALL_QUERY , null )).thenReturn (tupleQuery );
335+ when (tupleQuery .explain (Explanation .Level .Optimized )).thenAnswer (invocation -> {
336+ QueryCircuitBreakerHandle handle = QueryExecutionContext .getHandle ();
337+ assertThat (handle ).isNotNull ();
338+ QueryCircuitBreaker .getInstance ().markHeavy (handle , "GROUP_BY" );
339+ explainStarted .countDown ();
340+ try {
341+ new CountDownLatch (1 ).await ();
342+ return mock (Explanation .class );
343+ } catch (InterruptedException e ) {
344+ explainInterrupted .countDown ();
345+ throw new QueryInterruptedException ("interrupted" , e );
346+ }
347+ });
348+
349+ try {
350+ withBreakerDisabled (() -> {
351+ Future <ModelAndView > explainFuture = executor
352+ .submit (() -> handler .handleQueryRequest (explainRequest , RequestMethod .POST , explainResponse ));
353+
354+ assertThat (explainStarted .await (5 , TimeUnit .SECONDS )).isTrue ();
355+ withCriticalBreakerProperties (this ::triggerCriticalBreakerCancellation );
356+
357+ assertThat (explainInterrupted .await (5 , TimeUnit .SECONDS )).isTrue ();
358+ assertThatThrownBy (() -> explainFuture .get (5 , TimeUnit .SECONDS ))
359+ .hasCauseInstanceOf (org .eclipse .rdf4j .http .server .ServerHTTPException .class );
360+ verify (repository ).cancelQueryExplanation ("req-breaker" );
361+ });
362+ } finally {
363+ executor .shutdownNow ();
364+ }
365+ }
366+
314367 private static MockHttpServletRequest newAsyncExplainRequest (String explainRequestId ) {
315368 MockHttpServletRequest request = new MockHttpServletRequest ();
316369 request .setMethod (RequestMethod .POST .name ());
@@ -330,6 +383,47 @@ private static MockHttpServletRequest newCancelExplainRequest(String explainRequ
330383 return request ;
331384 }
332385
386+ private void triggerCriticalBreakerCancellation () {
387+ QueryCircuitBreaker breaker = QueryCircuitBreaker .getInstance ();
388+ QueryCircuitBreakerHandle handle = breaker .register (QueryCircuitBreakerHandle .Source .SERVER , "repo" ,
389+ "critical-admission" );
390+ try {
391+ assertThatThrownBy (() -> breaker .beforeExecution (handle ))
392+ .isInstanceOf (QueryCircuitBreaker .CircuitBreakerException .class );
393+ } finally {
394+ breaker .complete (handle );
395+ }
396+ }
397+
398+ private void withBreakerDisabled (ThrowingRunnable action ) throws Exception {
399+ String previousEnabled = System .getProperty (BREAKER_ENABLED );
400+ try {
401+ System .setProperty (BREAKER_ENABLED , "false" );
402+ action .run ();
403+ } finally {
404+ restoreProperty (BREAKER_ENABLED , previousEnabled );
405+ }
406+ }
407+
408+ private void withCriticalBreakerProperties (ThrowingRunnable action ) throws Exception {
409+ String previousEnabled = System .getProperty (BREAKER_ENABLED );
410+ String previousWarnFreeMb = System .getProperty (BREAKER_WARN_FREE_MB );
411+ String previousHighFreeMb = System .getProperty (BREAKER_HIGH_FREE_MB );
412+ String previousCriticalFreeMb = System .getProperty (BREAKER_CRITICAL_FREE_MB );
413+ try {
414+ System .setProperty (BREAKER_ENABLED , "true" );
415+ System .setProperty (BREAKER_WARN_FREE_MB , Integer .toString (Integer .MAX_VALUE ));
416+ System .setProperty (BREAKER_HIGH_FREE_MB , Integer .toString (Integer .MAX_VALUE ));
417+ System .setProperty (BREAKER_CRITICAL_FREE_MB , Integer .toString (Integer .MAX_VALUE ));
418+ action .run ();
419+ } finally {
420+ restoreProperty (BREAKER_ENABLED , previousEnabled );
421+ restoreProperty (BREAKER_WARN_FREE_MB , previousWarnFreeMb );
422+ restoreProperty (BREAKER_HIGH_FREE_MB , previousHighFreeMb );
423+ restoreProperty (BREAKER_CRITICAL_FREE_MB , previousCriticalFreeMb );
424+ }
425+ }
426+
333427 private void withBreakerProperties (ThrowingRunnable action ) throws Exception {
334428 String previousEnabled = System .getProperty (BREAKER_ENABLED );
335429 String previousWarnFreeMb = System .getProperty (BREAKER_WARN_FREE_MB );
0 commit comments