Skip to content

Commit f9b17bc

Browse files
fix wrong index scans within distibute queries
1 parent 1c76e95 commit f9b17bc

11 files changed

Lines changed: 147 additions & 66 deletions

File tree

src/main/java/su/interference/persistent/Session.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ public static int getCLASS_ID() {
111111

112112
@Transient
113113
private final ExecutorService streampool = Executors.newCachedThreadPool();
114+
@Transient
115+
private volatile boolean stream;
114116

115117
public static Session getContextSession() {
116118
return contextSession.get();
@@ -539,4 +541,11 @@ public static void setDntmSession(Session dntmSession) {
539541
Session.dntmSession = dntmSession;
540542
}
541543

544+
public boolean isStream() {
545+
return stream;
546+
}
547+
548+
public void setStream(boolean stream) {
549+
this.stream = stream;
550+
}
542551
}

src/main/java/su/interference/sql/FrameApiJoin.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -25,46 +25,53 @@ this software and associated documentation files (the "Software"), to deal in
2525
package su.interference.sql;
2626

2727
import su.interference.core.Config;
28+
import su.interference.metrics.Metrics;
2829

2930
import java.io.Serializable;
3031
import java.util.List;
31-
import java.util.Map;
3232
import java.util.concurrent.Callable;
33+
import java.util.concurrent.CountDownLatch;
3334

3435
/**
3536
* @author Yuriy Glotanov
3637
* @since 1.0
3738
*/
3839

3940
public class FrameApiJoin implements Serializable, Callable<FrameApiJoin> {
40-
private int nodeId;
41+
private final int nodeId;
4142
private final transient SQLCursor cur;
4243
private final transient FrameApi bd1;
4344
private final transient FrameApi bd2;
4445
private final transient FrameJoinTask frameJoinTask;
4546
private final long leftAllocId;
4647
private final long rightAllocId;
48+
private final transient CountDownLatch latch = new CountDownLatch(1);
4749
private List<Object> result;
50+
private boolean failed;
4851

49-
public FrameApiJoin(int nodeId, SQLCursor cur, FrameApi bd1, FrameApi bd2, Map<Integer, Map<String, FrameApiJoin>> joins) {
52+
public FrameApiJoin(int nodeId, SQLCursor cur, FrameApi bd1, FrameApi bd2) {
5053
this.nodeId = nodeId;
5154
this.cur = cur;
5255
this.bd1 = bd1;
5356
this.bd2 = bd2;
5457
this.leftAllocId = bd1.getAllocId();
55-
this.rightAllocId = bd2 == null ? 0 : bd2.getAllocId();
58+
this.rightAllocId = bd2 == null ? 0 : bd2 instanceof SQLIndexFrame ? 0 : bd2.getAllocId();
5659
if (nodeId == Config.getConfig().LOCAL_NODE_ID) {
5760
frameJoinTask = cur.buildFrameJoinTask(nodeId, bd1, bd2);
5861
} else {
5962
frameJoinTask = null;
60-
joins.get(nodeId).put(this.getKey(), this);
6163
}
6264
}
6365

6466
public FrameApiJoin call() throws Exception {
6567
if (nodeId == Config.getConfig().LOCAL_NODE_ID) {
66-
final FrameJoinTask frameJoinTask = cur.buildFrameJoinTask(nodeId, bd1, bd2);
68+
Metrics.get("localTask").start();
6769
result = frameJoinTask.call();
70+
Metrics.get("localTask").stop();
71+
} else {
72+
Metrics.get("remoteTask").start();
73+
latch.await();
74+
Metrics.get("remoteTask").stop();
6875
}
6976
return this;
7077
}
@@ -87,14 +94,6 @@ public int getNodeId() {
8794
return nodeId;
8895
}
8996

90-
public void setNodeId(int nodeId) {
91-
this.nodeId = nodeId;
92-
}
93-
94-
public FrameJoinTask getFrameJoinTask() {
95-
return frameJoinTask;
96-
}
97-
9897
public long getLeftAllocId() {
9998
return leftAllocId;
10099
}
@@ -103,11 +102,29 @@ public long getRightAllocId() {
103102
return rightAllocId;
104103
}
105104

105+
public FrameApi getBd1() {
106+
return bd1;
107+
}
108+
109+
public FrameApi getBd2() {
110+
return bd2;
111+
}
112+
106113
public List<Object> getResult() {
107114
return result;
108115
}
109116

110117
public void setResult(List<Object> result) {
111118
this.result = result;
119+
latch.countDown();
120+
}
121+
122+
public boolean isFailed() {
123+
return failed;
124+
}
125+
126+
public void setFailed(boolean failed) {
127+
this.failed = failed;
128+
latch.countDown();
112129
}
113130
}

src/main/java/su/interference/sql/FrameHolder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class FrameHolder {
4444
private final boolean persistent;
4545

4646
public FrameHolder(ResultSet t) {
47-
if (t.getClass().getName().equals("su.interference.persistent.Table")) {
47+
if (t != null && t.getClass().getName().equals("su.interference.persistent.Table")) {
4848
Table tt = (Table)t;
4949
cframes = new AtomicReference[tt.getLbs().length];
5050
for (int i=0; i<tt.getLbs().length; i++) {

src/main/java/su/interference/sql/FrameJoinTask.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,6 @@ public List<Object> call() throws Exception {
117117
}
118118
*/
119119

120-
Metrics.get("localTask").start();
121120
final ArrayList<Object> drs1 = bd1.getFrameEntities(s);
122121
final ArrayList<Object> drs2 = bd2==null?null:bd2.getFrameEntities(s);
123122

@@ -221,7 +220,10 @@ public List<Object> call() throws Exception {
221220
} else {
222221
//one table loop
223222
if (r == null) {
224-
res.add(o1); //target table is null -> result class is null -> returns generic entities
223+
//todo need to cast o1 to RS type
224+
//if (nc.checkNC(o1, sqlcid, last)) {
225+
res.add(o1); //target table is null -> result class is null -> returns generic entities
226+
//}
225227
} else {
226228
Object j = joinDataRecords(r, c1, c2, t1, t2, o1, null, cols, c1rs, s);
227229
if (nc.checkNC(j, sqlcid, last)) {
@@ -240,7 +242,6 @@ public List<Object> call() throws Exception {
240242
}
241243
}
242244
}
243-
Metrics.get("localTask").stop();
244245
Metrics.get("recordLCount").put(res.size());
245246
// logger.info(res.size()+" records returns from local node " + nodeId);
246247
return res;

src/main/java/su/interference/sql/RemoteTask.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,14 +55,15 @@ public RemoteTask(Cursor cur, int nodeId, Map<String, FrameApiJoin> joins, Strin
5555
}
5656

5757
public Boolean call() throws Exception {
58-
Metrics.get("remoteTask").start();
5958
final TransportEvent transportEvent = new SQLEvent(nodeId, cur.getCursorId(), joins, rightType, 0, null, null, 0, true);
6059
TransportContext.getInstance().send(transportEvent);
6160
transportEvent.getLatch().await();
62-
Metrics.get("remoteTask").stop();
6361
if (!transportEvent.isFail()) {
6462
final List<FrameApiJoin> rs = ((SQLEvent) transportEvent).getCallback().getResult().getResultSet();
6563
if (rs.size() != joins.size()) {
64+
for (Map.Entry<String, FrameApiJoin> entry : joins.entrySet()) {
65+
entry.getValue().setFailed(true);
66+
}
6667
throw new InternalException();
6768
}
6869
Metrics.get("recordRCount").put(rs.size());
@@ -71,7 +72,7 @@ public Boolean call() throws Exception {
7172
}
7273
} else {
7374
for (Map.Entry<String, FrameApiJoin> entry : joins.entrySet()) {
74-
entry.getValue().setNodeId(Config.getConfig().LOCAL_NODE_ID);
75+
entry.getValue().setFailed(true);
7576
}
7677
}
7778
return true;

src/main/java/su/interference/sql/SQLCursor.java

Lines changed: 65 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ this software and associated documentation files (the "Software"), to deal in
3737
import java.util.*;
3838
import java.util.concurrent.ConcurrentLinkedQueue;
3939
import java.util.concurrent.ExecutorService;
40+
import java.util.concurrent.Executors;
4041
import java.util.concurrent.Future;
4142
import java.lang.reflect.InvocationTargetException;
4243
import java.io.IOException;
@@ -51,9 +52,11 @@ public class SQLCursor implements FrameIterator {
5152
private final int id;
5253
private ResultSet target;
5354
private ExecutorService exec = SQLJoinThreadPool.getThreadPool();
55+
private ExecutorService exec2 = SQLJoinThreadPool.getThreadPool2();
56+
private ExecutorService remotepool = Executors.newCachedThreadPool();
57+
private ExecutorService streampool = Executors.newCachedThreadPool();
5458
private List<FrameApiJoin> tasks;
5559
private List<FrameApiJoin> tasks_;
56-
private Map<Integer, Map<String, FrameApiJoin>> joins;
5760
private FrameData bdnext;
5861
private FrameHolder current;
5962
private int ptr = 0;
@@ -100,16 +103,16 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition
100103

101104
if (cur.getType() == Cursor.SLAVE_TYPE && cur.getResultTargetName() != null && this.id == 1) {
102105
target = this.peristent ? s.registerTable(cur.getResultTargetName(), s, rscols, null, null, ixflag && last) : new ResultList(cur.getSqlStmt().getEntityTable());
103-
} else if (cur.getType() == Cursor.MASTER_TYPE) {
104-
target = this.peristent ? s.registerTable("su.interference.persistent.R$" + UUID.randomUUID().toString().replace('-', '$'), s, rscols, null, null, ixflag && last) : new ResultList(cur.getSqlStmt().getEntityTable());
105106
} else if (cur.getType() == Cursor.STREAM_TYPE) {
106107
target = new StreamQueue();
108+
} else
109+
{
110+
target = this.peristent ? s.registerTable("su.interference.persistent.R$" + UUID.randomUUID().toString().replace('-', '$'), s, rscols, null, null, ixflag && last) : new ResultList(cur.getSqlStmt().getEntityTable());
107111
}
108112
current = new FrameHolder(target);
109113

110114
tasks = new ArrayList<FrameApiJoin>();
111115
tasks_ = new ArrayList<FrameApiJoin>();
112-
joins = new HashMap<>();
113116

114117
//rebuild column set for sqlcursor iterator
115118
final SQLCursor cursor_ = lbi.getType() == FrameIterator.TYPE_CURSOR ? (SQLCursor) lbi : rbi != null && rbi.getType() == FrameIterator.TYPE_CURSOR ? (SQLCursor) rbi : null;
@@ -184,19 +187,14 @@ protected FrameJoinTask buildFrameJoinTask(int nodeId, FrameApi bd1, FrameApi bd
184187

185188
public void build() throws InternalException, IOException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
186189
final Integer[] ns = TransportContext.getInstance().getOnlineNodesWithLocal();
187-
for (Integer nodeId : ns) {
188-
if (nodeId != Config.getConfig().LOCAL_NODE_ID) {
189-
joins.put(nodeId, new HashMap<>());
190-
}
191-
}
192190
int i = 0;
193191
//int tnode = 0;
194192

195193
while (lbi.hasNextFrame()) {
196194
final FrameApi bd1 = lbi.nextFrame();
197195

198196
if (rbi == null) {
199-
tasks.add(new FrameApiJoin(ns[i], this, bd1, null, joins));
197+
tasks.add(new FrameApiJoin(ns[i], this, bd1, null));
200198
i++;
201199
if (i == ns.length) { i = 0; }
202200
} else {
@@ -205,45 +203,67 @@ public void build() throws InternalException, IOException, ClassNotFoundExceptio
205203
if (rightType == null) {
206204
rightType = bd2.getClass().getSimpleName();
207205
}
208-
tasks.add(new FrameApiJoin(ns[i], this, bd1, bd2, joins));
206+
tasks.add(new FrameApiJoin(ns[i], this, bd1, bd2));
209207
i++;
210208
if (i == ns.length) { i = 0; }
211209
}
212210
rbi.resetIterator();
213211
}
214212
}
215-
logger.debug("SQL cursor is build: local tasks amount = "+tasks.size()+", use NC check = "+last);
213+
logger.debug("SQL cursor is build: tasks amount = "+tasks.size()+", use NC check = "+last);
216214
if (!sent) {
217-
for (Map.Entry<Integer, Map<String, FrameApiJoin>> entry : joins.entrySet()) {
218-
final RemoteTask rt = new RemoteTask(cur, entry.getKey(), entry.getValue(), rightType);
219-
exec.submit(rt);
215+
for (Integer nodeId : ns) {
216+
if (nodeId != Config.getConfig().LOCAL_NODE_ID) {
217+
final Map<String, FrameApiJoin> joins = new HashMap<>();
218+
for (FrameApiJoin j : tasks) {
219+
if (j.getNodeId() == nodeId) {
220+
joins.put(j.getKey(), j);
221+
}
222+
}
223+
final RemoteTask rt = new RemoteTask(cur, nodeId, joins, rightType);
224+
remotepool.submit(rt);
225+
}
220226
}
221227
sent = true;
222228
}
223229
}
224230

225231
public void stream() throws Exception {
226232
final Queue<FrameApi> q = sfmap.get(lbi.getObjectId());
227-
boolean cnue = true;
228233
if (!cur.isStream()) {
229234
logger.error("wrong stream method call: SQL statement is not a stream");
230235
}
231236
if (q == null) {
232237
throw new RuntimeException("internal error: queue not exist for object id = "+lbi.getObjectId());
233238
}
234-
while (cnue) {
235-
FrameApi f = q.poll();
236-
if (f != null) {
237-
FrameJoinTask task = new FrameJoinTask(cur, f, null, target, rscols, nc, id, Config.getConfig().LOCAL_NODE_ID, last, lbi.isLeftfs(), null, s);
238-
final Future<List<Object>> ft = exec.submit(task);
239-
for (Object o : ft.get()) {
240-
target.persist(o, s);
239+
if (!(target instanceof StreamQueue)) {
240+
throw new RuntimeException("internal error: wrong target type for object id = "+lbi.getObjectId());
241+
}
242+
s.setStream(true);
243+
Runnable r = new Runnable() {
244+
@Override
245+
public void run() {
246+
try {
247+
while (((StreamQueue) target).isRunning()) {
248+
FrameApi f = q.poll();
249+
if (f != null) {
250+
FrameJoinTask task = new FrameJoinTask(cur, f, null, target, rscols, nc, id, Config.getConfig().LOCAL_NODE_ID, last, lbi.isLeftfs(), null, s);
251+
final Future<List<Object>> ft = exec.submit(task);
252+
for (Object o : ft.get()) {
253+
target.persist(o, s);
254+
}
255+
}
256+
if (q.peek() == null) {
257+
Thread.sleep(100);
258+
}
259+
}
260+
} catch (Exception e) {
261+
((StreamQueue) target).stop(s);
262+
throw new RuntimeException(e);
241263
}
242264
}
243-
if (q.peek() == null) {
244-
Thread.sleep(1000);
245-
}
246-
}
265+
};
266+
streampool.submit(r);
247267
}
248268

249269
public static void addStreamFrame(FrameApi f) {
@@ -270,21 +290,33 @@ private synchronized FrameData nextFrame2() throws InternalException {
270290

271291
while (!done && ret==null) {
272292
final ArrayList<Future<FrameApiJoin>> flist = new ArrayList<Future<FrameApiJoin>>();
293+
final ArrayList<Future<FrameApiJoin>> flist2 = new ArrayList<Future<FrameApiJoin>>();
273294
try {
274295
for (int i=0; i < BATCH_SIZE; i++) {
275296
if ((ptr + i) < tasks.size()) {
276297
final FrameApiJoin j = tasks.get(ptr + i);
277-
flist.add(exec.submit(j));
298+
if (j.getNodeId() == Config.getConfig().LOCAL_NODE_ID) {
299+
flist.add(exec.submit(j));
300+
} else {
301+
flist2.add(exec2.submit(j));
302+
}
278303
}
279304
}
280305
for (Future<FrameApiJoin> f : flist) {
281306
final FrameApiJoin j = f.get();
282-
final List<Object> ol = j.getResult();
283-
if (ol == null) {
284-
tasks_.add(j);
307+
logger.debug("SQL cursor next frame: the jointask call returned " + j.getResult().size() + " records");
308+
for (Object o : j.getResult()) {
309+
target.persist(o, s);
310+
}
311+
}
312+
for (Future<FrameApiJoin> f : flist2) {
313+
final FrameApiJoin j = f.get();
314+
if (j.isFailed()) {
315+
FrameApiJoin j_ = new FrameApiJoin(Config.getConfig().LOCAL_NODE_ID, this, j.getBd1(), j.getBd2());
316+
tasks_.add(j_);
285317
} else {
286-
logger.debug("SQL cursor next frame: the jointask call returned " + ol.size() + " records");
287-
for (Object o : ol) {
318+
logger.debug("SQL cursor next frame: the jointask call returned " + j.getResult().size() + " records");
319+
for (Object o : j.getResult()) {
288320
target.persist(o, s);
289321
}
290322
}

0 commit comments

Comments
 (0)