1616import java .util .LinkedList ;
1717import java .util .List ;
1818import java .util .Set ;
19+ import java .util .concurrent .ExecutorService ;
20+ import java .util .concurrent .Executors ;
21+ import java .util .concurrent .Future ;
1922
2023import org .eclipse .rdf4j .common .iteration .CloseableIteration ;
2124import org .eclipse .rdf4j .common .iteration .EmptyIteration ;
@@ -62,6 +65,10 @@ private class BatchingServiceIteration extends JoinExecutorBase<BindingSet> {
6265
6366 private final Service service ;
6467
68+ private final ExecutorService threadExecutor = Executors .newSingleThreadExecutor ();
69+
70+ private final Future <?> querySubmissionTask ;
71+
6572 /**
6673 * @param inputBindings
6774 * @throws QueryEvaluationException
@@ -71,13 +78,26 @@ public BatchingServiceIteration(CloseableIteration<BindingSet> inputBindings,
7178 super (inputBindings , null , EmptyBindingSet .getInstance ());
7279 this .blockSize = blockSize ;
7380 this .service = service ;
74- run ();
81+
82+ // Set up a consumer task to send HTTP requests in parallel. This must be done in a
83+ // separate thread, because submitting HTTP requests may block if the HTTP pool is full.
84+ // In that case, we would enter a deadlock, with the main thread waiting for both the
85+ // pool to yield, and the consumer of the bindings to read from the queue.
86+ // See: https://github.com/eclipse-rdf4j/rdf4j/discussions/5120
87+ // Test case: https://github.com/tkuhn/rdf4j-timeout-test
88+ try {
89+ querySubmissionTask = threadExecutor .submit (this ::run );
90+ } catch (Exception e ) {
91+ throw new QueryEvaluationException ("Failed to start a thread for batched federated query submission" ,
92+ e );
93+ }
7594 }
7695
7796 @ Override
7897 protected void handleBindings () throws Exception {
98+ // Note: any exceptions here will be intercepted by the caller and tossed asynchronously
99+ // via the rightQueue.
79100 while (!isClosed () && leftIter .hasNext ()) {
80-
81101 ArrayList <BindingSet > blockBindings = new ArrayList <>(blockSize );
82102 for (int i = 0 ; i < blockSize ; i ++) {
83103 if (!leftIter .hasNext ()) {
@@ -87,9 +107,19 @@ protected void handleBindings() throws Exception {
87107 }
88108 CloseableIteration <BindingSet > materializedIter = new CollectionIteration <>(
89109 blockBindings );
110+ // evaluateInternal is BLOCKING if the HTTP pool is exhausted
90111 addResult (evaluateInternal (service , materializedIter , service .getBaseURI ()));
91112 }
92113 }
114+
115+ @ Override
116+ public void handleClose () throws QueryEvaluationException {
117+ super .handleClose ();
118+ if (querySubmissionTask != null ) {
119+ querySubmissionTask .cancel (true );
120+ }
121+ threadExecutor .shutdownNow ();
122+ }
93123 }
94124
95125 /**
@@ -631,4 +661,4 @@ private static void closeQuietly(RepositoryConnection conn) {
631661 logger .debug ("Details: " , t );
632662 }
633663 }
634- }
664+ }
0 commit comments