@@ -37,6 +37,7 @@ this software and associated documentation files (the "Software"), to deal in
3737import java .util .*;
3838import java .util .concurrent .ConcurrentLinkedQueue ;
3939import java .util .concurrent .ExecutorService ;
40+ import java .util .concurrent .Executors ;
4041import java .util .concurrent .Future ;
4142import java .lang .reflect .InvocationTargetException ;
4243import java .io .IOException ;
@@ -51,9 +52,11 @@ public class SQLCursor implements FrameIterator {
5152 private final int id ;
5253 private ResultSet target ;
5354 private ExecutorService exec = SQLJoinThreadPool .getThreadPool ();
55+ private ExecutorService exec2 = SQLJoinThreadPool .getThreadPool2 ();
56+ private ExecutorService remotepool = Executors .newCachedThreadPool ();
57+ private ExecutorService streampool = Executors .newCachedThreadPool ();
5458 private List <FrameApiJoin > tasks ;
5559 private List <FrameApiJoin > tasks_ ;
56- private Map <Integer , Map <String , FrameApiJoin >> joins ;
5760 private FrameData bdnext ;
5861 private FrameHolder current ;
5962 private int ptr = 0 ;
@@ -100,16 +103,16 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition
100103
101104 if (cur .getType () == Cursor .SLAVE_TYPE && cur .getResultTargetName () != null && this .id == 1 ) {
102105 target = this .peristent ? s .registerTable (cur .getResultTargetName (), s , rscols , null , null , ixflag && last ) : new ResultList (cur .getSqlStmt ().getEntityTable ());
103- } else if (cur .getType () == Cursor .MASTER_TYPE ) {
104- target = this .peristent ? s .registerTable ("su.interference.persistent.R$" + UUID .randomUUID ().toString ().replace ('-' , '$' ), s , rscols , null , null , ixflag && last ) : new ResultList (cur .getSqlStmt ().getEntityTable ());
105106 } else if (cur .getType () == Cursor .STREAM_TYPE ) {
106107 target = new StreamQueue ();
108+ } else
109+ {
110+ target = this .peristent ? s .registerTable ("su.interference.persistent.R$" + UUID .randomUUID ().toString ().replace ('-' , '$' ), s , rscols , null , null , ixflag && last ) : new ResultList (cur .getSqlStmt ().getEntityTable ());
107111 }
108112 current = new FrameHolder (target );
109113
110114 tasks = new ArrayList <FrameApiJoin >();
111115 tasks_ = new ArrayList <FrameApiJoin >();
112- joins = new HashMap <>();
113116
114117 //rebuild column set for sqlcursor iterator
115118 final SQLCursor cursor_ = lbi .getType () == FrameIterator .TYPE_CURSOR ? (SQLCursor ) lbi : rbi != null && rbi .getType () == FrameIterator .TYPE_CURSOR ? (SQLCursor ) rbi : null ;
@@ -184,19 +187,14 @@ protected FrameJoinTask buildFrameJoinTask(int nodeId, FrameApi bd1, FrameApi bd
184187
185188 public void build () throws InternalException , IOException , ClassNotFoundException , InvocationTargetException , NoSuchMethodException , InstantiationException , IllegalAccessException {
186189 final Integer [] ns = TransportContext .getInstance ().getOnlineNodesWithLocal ();
187- for (Integer nodeId : ns ) {
188- if (nodeId != Config .getConfig ().LOCAL_NODE_ID ) {
189- joins .put (nodeId , new HashMap <>());
190- }
191- }
192190 int i = 0 ;
193191 //int tnode = 0;
194192
195193 while (lbi .hasNextFrame ()) {
196194 final FrameApi bd1 = lbi .nextFrame ();
197195
198196 if (rbi == null ) {
199- tasks .add (new FrameApiJoin (ns [i ], this , bd1 , null , joins ));
197+ tasks .add (new FrameApiJoin (ns [i ], this , bd1 , null ));
200198 i ++;
201199 if (i == ns .length ) { i = 0 ; }
202200 } else {
@@ -205,45 +203,67 @@ public void build() throws InternalException, IOException, ClassNotFoundExceptio
205203 if (rightType == null ) {
206204 rightType = bd2 .getClass ().getSimpleName ();
207205 }
208- tasks .add (new FrameApiJoin (ns [i ], this , bd1 , bd2 , joins ));
206+ tasks .add (new FrameApiJoin (ns [i ], this , bd1 , bd2 ));
209207 i ++;
210208 if (i == ns .length ) { i = 0 ; }
211209 }
212210 rbi .resetIterator ();
213211 }
214212 }
215- logger .debug ("SQL cursor is build: local tasks amount = " +tasks .size ()+", use NC check = " +last );
213+ logger .debug ("SQL cursor is build: tasks amount = " +tasks .size ()+", use NC check = " +last );
216214 if (!sent ) {
217- for (Map .Entry <Integer , Map <String , FrameApiJoin >> entry : joins .entrySet ()) {
218- final RemoteTask rt = new RemoteTask (cur , entry .getKey (), entry .getValue (), rightType );
219- exec .submit (rt );
215+ for (Integer nodeId : ns ) {
216+ if (nodeId != Config .getConfig ().LOCAL_NODE_ID ) {
217+ final Map <String , FrameApiJoin > joins = new HashMap <>();
218+ for (FrameApiJoin j : tasks ) {
219+ if (j .getNodeId () == nodeId ) {
220+ joins .put (j .getKey (), j );
221+ }
222+ }
223+ final RemoteTask rt = new RemoteTask (cur , nodeId , joins , rightType );
224+ remotepool .submit (rt );
225+ }
220226 }
221227 sent = true ;
222228 }
223229 }
224230
225231 public void stream () throws Exception {
226232 final Queue <FrameApi > q = sfmap .get (lbi .getObjectId ());
227- boolean cnue = true ;
228233 if (!cur .isStream ()) {
229234 logger .error ("wrong stream method call: SQL statement is not a stream" );
230235 }
231236 if (q == null ) {
232237 throw new RuntimeException ("internal error: queue not exist for object id = " +lbi .getObjectId ());
233238 }
234- while (cnue ) {
235- FrameApi f = q .poll ();
236- if (f != null ) {
237- FrameJoinTask task = new FrameJoinTask (cur , f , null , target , rscols , nc , id , Config .getConfig ().LOCAL_NODE_ID , last , lbi .isLeftfs (), null , s );
238- final Future <List <Object >> ft = exec .submit (task );
239- for (Object o : ft .get ()) {
240- target .persist (o , s );
239+ if (!(target instanceof StreamQueue )) {
240+ throw new RuntimeException ("internal error: wrong target type for object id = " +lbi .getObjectId ());
241+ }
242+ s .setStream (true );
243+ Runnable r = new Runnable () {
244+ @ Override
245+ public void run () {
246+ try {
247+ while (((StreamQueue ) target ).isRunning ()) {
248+ FrameApi f = q .poll ();
249+ if (f != null ) {
250+ FrameJoinTask task = new FrameJoinTask (cur , f , null , target , rscols , nc , id , Config .getConfig ().LOCAL_NODE_ID , last , lbi .isLeftfs (), null , s );
251+ final Future <List <Object >> ft = exec .submit (task );
252+ for (Object o : ft .get ()) {
253+ target .persist (o , s );
254+ }
255+ }
256+ if (q .peek () == null ) {
257+ Thread .sleep (100 );
258+ }
259+ }
260+ } catch (Exception e ) {
261+ ((StreamQueue ) target ).stop (s );
262+ throw new RuntimeException (e );
241263 }
242264 }
243- if (q .peek () == null ) {
244- Thread .sleep (1000 );
245- }
246- }
265+ };
266+ streampool .submit (r );
247267 }
248268
249269 public static void addStreamFrame (FrameApi f ) {
@@ -270,21 +290,33 @@ private synchronized FrameData nextFrame2() throws InternalException {
270290
271291 while (!done && ret ==null ) {
272292 final ArrayList <Future <FrameApiJoin >> flist = new ArrayList <Future <FrameApiJoin >>();
293+ final ArrayList <Future <FrameApiJoin >> flist2 = new ArrayList <Future <FrameApiJoin >>();
273294 try {
274295 for (int i =0 ; i < BATCH_SIZE ; i ++) {
275296 if ((ptr + i ) < tasks .size ()) {
276297 final FrameApiJoin j = tasks .get (ptr + i );
277- flist .add (exec .submit (j ));
298+ if (j .getNodeId () == Config .getConfig ().LOCAL_NODE_ID ) {
299+ flist .add (exec .submit (j ));
300+ } else {
301+ flist2 .add (exec2 .submit (j ));
302+ }
278303 }
279304 }
280305 for (Future <FrameApiJoin > f : flist ) {
281306 final FrameApiJoin j = f .get ();
282- final List <Object > ol = j .getResult ();
283- if (ol == null ) {
284- tasks_ .add (j );
307+ logger .debug ("SQL cursor next frame: the jointask call returned " + j .getResult ().size () + " records" );
308+ for (Object o : j .getResult ()) {
309+ target .persist (o , s );
310+ }
311+ }
312+ for (Future <FrameApiJoin > f : flist2 ) {
313+ final FrameApiJoin j = f .get ();
314+ if (j .isFailed ()) {
315+ FrameApiJoin j_ = new FrameApiJoin (Config .getConfig ().LOCAL_NODE_ID , this , j .getBd1 (), j .getBd2 ());
316+ tasks_ .add (j_ );
285317 } else {
286- logger .debug ("SQL cursor next frame: the jointask call returned " + ol .size () + " records" );
287- for (Object o : ol ) {
318+ logger .debug ("SQL cursor next frame: the jointask call returned " + j . getResult () .size () + " records" );
319+ for (Object o : j . getResult () ) {
288320 target .persist (o , s );
289321 }
290322 }
0 commit comments