@@ -50,12 +50,14 @@ public class SQLCursor implements FrameIterator {
5050
5151 private final int id ;
5252 private ResultSet target ;
53- private boolean done ;
5453 private ExecutorService exec = SQLJoinThreadPool .getThreadPool ();
55- private ArrayList <FrameJoinTask > tasks ;
54+ private List <FrameApiJoin > tasks ;
55+ private List <FrameApiJoin > tasks_ ;
56+ private Map <Integer , Map <String , FrameApiJoin >> joins ;
5657 private FrameData bdnext ;
5758 private FrameHolder current ;
5859 private int ptr = 0 ;
60+ private boolean sent ;
5961 private Cursor cur ;
6062 private Session s ;
6163 private ArrayList <SQLColumn > rscols ;
@@ -66,6 +68,7 @@ public class SQLCursor implements FrameIterator {
6668 private final FrameIterator lbi ;
6769 private final FrameIterator rbi ;
6870 private final List <Integer > objectIds ;
71+ private String rightType ;
6972 private boolean leftFS ;
7073 private final boolean furtherUseUC ;
7174 private final SQLColumn joinedCC ;
@@ -104,7 +107,9 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition
104107 }
105108 current = new FrameHolder (target );
106109
107- tasks = new ArrayList <FrameJoinTask >();
110+ tasks = new ArrayList <FrameApiJoin >();
111+ tasks_ = new ArrayList <FrameApiJoin >();
112+ joins = new HashMap <>();
108113
109114 //rebuild column set for sqlcursor iterator
110115 final SQLCursor cursor_ = lbi .getType () == FrameIterator .TYPE_CURSOR ? (SQLCursor ) lbi : rbi != null && rbi .getType () == FrameIterator .TYPE_CURSOR ? (SQLCursor ) rbi : null ;
@@ -173,29 +178,48 @@ public int getObjectId() {
173178 return target .getObjectId ();
174179 }
175180
181+ protected FrameJoinTask buildFrameJoinTask (int nodeId , FrameApi bd1 , FrameApi bd2 ) {
182+ return new FrameJoinTask (cur , bd1 , bd2 , target , rscols , nc , id , nodeId , last , lbi .isLeftfs (), hmap , s );
183+ }
184+
176185 public void build () throws InternalException , IOException , ClassNotFoundException , InvocationTargetException , NoSuchMethodException , InstantiationException , IllegalAccessException {
177186 final Integer [] ns = TransportContext .getInstance ().getOnlineNodesWithLocal ();
187+ for (Integer nodeId : ns ) {
188+ if (nodeId != Config .getConfig ().LOCAL_NODE_ID ) {
189+ joins .put (nodeId , new HashMap <>());
190+ }
191+ }
178192 int i = 0 ;
179193 //int tnode = 0;
180194
181195 while (lbi .hasNextFrame ()) {
182- FrameApi bd1 = lbi .nextFrame ();
196+ final FrameApi bd1 = lbi .nextFrame ();
183197
184198 if (rbi == null ) {
185- tasks .add (new FrameJoinTask ( cur , bd1 , null , target , rscols , nc , id , ns [i ], last , lbi . isLeftfs () , null , s ));
199+ tasks .add (new FrameApiJoin ( ns [i ], this , bd1 , null , joins ));
186200 i ++;
187201 if (i == ns .length ) { i = 0 ; }
188202 } else {
189203 while (rbi .hasNextFrame ()) {
190- FrameApi bd2 = rbi .nextFrame ();
191- tasks .add (new FrameJoinTask (cur , bd1 , bd2 , target , rscols , nc , id , ns [i ], last , lbi .isLeftfs (), hmap , s ));
204+ final FrameApi bd2 = rbi .nextFrame ();
205+ if (rightType == null ) {
206+ rightType = bd2 .getClass ().getSimpleName ();
207+ }
208+ tasks .add (new FrameApiJoin (ns [i ], this , bd1 , bd2 , joins ));
192209 i ++;
193210 if (i == ns .length ) { i = 0 ; }
194211 }
195212 rbi .resetIterator ();
196213 }
197214 }
198- logger .debug ("SQL cursor is build: tasks amount = " +tasks .size ()+", use NC check = " +last );
215+ logger .debug ("SQL cursor is build: local tasks amount = " +tasks .size ()+", use NC check = " +last );
216+ if (!sent ) {
217+ for (Map .Entry <Integer , Map <String , FrameApiJoin >> entry : joins .entrySet ()) {
218+ final RemoteTask rt = new RemoteTask (cur , entry .getKey (), entry .getValue (), rightType );
219+ exec .submit (rt );
220+ }
221+ sent = true ;
222+ }
199223 }
200224
201225 public void stream () throws Exception {
@@ -241,27 +265,38 @@ public synchronized FrameData nextFrame() {
241265 }
242266
243267 private synchronized FrameData nextFrame2 () throws InternalException {
244- boolean done = !(ptr < tasks .size ());
268+ boolean done = !(ptr < tasks .size ());
245269 FrameData ret = current .getFrame (done );
246270
247- while (!done &&ret ==null ) {
271+ while (!done && ret ==null ) {
272+ final ArrayList <Future <FrameApiJoin >> flist = new ArrayList <Future <FrameApiJoin >>();
248273 try {
249- ArrayList <Future <List <Object >>> flist = new ArrayList <Future <List <Object >>>();
250- for (int i =0 ; i <BATCH_SIZE ; i ++) {
251- if ((ptr + i )<tasks .size ()) {
252- FrameJoinTask jt = tasks .get (ptr + i );
253- flist .add (exec .submit (jt ));
274+ for (int i =0 ; i < BATCH_SIZE ; i ++) {
275+ if ((ptr + i ) < tasks .size ()) {
276+ final FrameApiJoin j = tasks .get (ptr + i );
277+ flist .add (exec .submit (j ));
254278 }
255279 }
256- for (Future <List <Object >> f : flist ) {
257- List <Object > ol = f .get ();
258- logger .debug ("SQL cursor next frame: the jointask call returned " + ol .size () + " records" );
259- for (Object o : ol ) {
260- target .persist (o , s );
280+ for (Future <FrameApiJoin > f : flist ) {
281+ final FrameApiJoin j = f .get ();
282+ final List <Object > ol = j .getResult ();
283+ if (ol == null ) {
284+ tasks_ .add (j );
285+ } else {
286+ logger .debug ("SQL cursor next frame: the jointask call returned " + ol .size () + " records" );
287+ for (Object o : ol ) {
288+ target .persist (o , s );
289+ }
261290 }
262291 }
263292 ptr = ptr + BATCH_SIZE ;
264- done = !(ptr <tasks .size ());
293+ done = !(ptr < tasks .size ());
294+ if (done && tasks_ .size () > 0 ) {
295+ done = false ;
296+ tasks = tasks_ ;
297+ tasks_ = new ArrayList <>();
298+ ptr = 0 ;
299+ }
265300 ret = current .getFrame (done );
266301 } catch (Exception e ) {
267302 e .printStackTrace ();
0 commit comments