Skip to content

Commit 5b61461

Browse files
stream prototype (further impl)
1 parent ca41411 commit 5b61461

5 files changed

Lines changed: 112 additions & 3 deletions

File tree

src/main/java/su/interference/core/Frame.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -529,12 +529,20 @@ public synchronized ArrayList<Chunk> getFrameChunks (Session s) throws IOExcepti
529529
}
530530
if (dataObject.isNoTran() || s.isStream()) {
531531

532+
final int streamptr = s.isStream() ? s.streamFramePtr(this) : 0;
533+
532534
for (Chunk c : data.getChunks()) {
533535
if (c.getHeader().getState()==Header.RECORD_NORMAL_STATE) {
534-
res.add(c);
536+
if (c.getHeader().getPtr() >= streamptr) {
537+
res.add(c);
538+
}
535539
}
536540
}
537541

542+
if (s.isStream()) {
543+
s.streamFramePtr(this, rowCntr);
544+
}
545+
538546
} else { //if (local()) {
539547

540548
final long tr = s.getTransaction().getTransId();

src/main/java/su/interference/persistent/Session.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ this software and associated documentation files (the "Software"), to deal in
4242
import java.lang.reflect.InvocationTargetException;
4343
import java.lang.reflect.Modifier;
4444
import java.net.MalformedURLException;
45+
import java.util.concurrent.ConcurrentHashMap;
4546
import java.util.concurrent.ExecutorService;
4647
import java.util.concurrent.Executors;
4748
import javax.persistence.*;
@@ -112,6 +113,8 @@ public static int getCLASS_ID() {
112113
@Transient
113114
private final ExecutorService streampool = Executors.newCachedThreadPool();
114115
@Transient
116+
private final Map<Long, Integer> streammap = new ConcurrentHashMap<>();
117+
@Transient
115118
private volatile boolean stream;
116119

117120
public static Session getContextSession() {
@@ -541,6 +544,14 @@ public static void setDntmSession(Session dntmSession) {
541544
Session.dntmSession = dntmSession;
542545
}
543546

547+
public void streamFramePtr(Frame f, int ptr) {
548+
streammap.put(f.getAllocFile()+f.getAllocPointer(), ptr);
549+
}
550+
551+
public int streamFramePtr(Frame f) {
552+
return streammap.get(f.getAllocFile()+f.getAllocPointer()) == null ? 0 : streammap.get(f.getAllocFile()+f.getAllocPointer());
553+
}
554+
544555
public boolean isStream() {
545556
return stream;
546557
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package su.interference.sql;
2+
3+
import su.interference.core.DataChunk;
4+
import su.interference.core.EntityContainer;
5+
import su.interference.persistent.Cursor;
6+
import su.interference.persistent.Session;
7+
import su.interference.persistent.Table;
8+
9+
import java.util.Queue;
10+
11+
public class FrameGroupTask implements Runnable {
12+
13+
private final Cursor cur;
14+
private final Queue<Object> q;
15+
private final ResultSet target;
16+
private final Table gtable;
17+
private final Session s;
18+
19+
public FrameGroupTask(Cursor cur, Queue<Object> q, ResultSet target, Table gtable, Session s) {
20+
this.cur = cur;
21+
this.q = q;
22+
this.target = target;
23+
this.gtable = gtable;
24+
this.s = s;
25+
}
26+
27+
@Override
28+
public void run() {
29+
30+
final boolean ixflag = cur.getSqlStmt().getCols().getOrderColumns().size() > 0;
31+
32+
try {
33+
DataChunk cdc = null;
34+
SQLGroup sqlg = null;
35+
while (((StreamQueue) target).isRunning()) {
36+
EntityContainer o = (EntityContainer) q.poll();
37+
if (o != null) {
38+
if (cdc != null) {
39+
if (o.getDataChunk().compare(cdc, cur.getSqlStmt().getCols().getGroupColumns().size()) == 0) { //cdc & c chunks grouped
40+
sqlg.add(o.getDataChunk());
41+
} else { // start next group
42+
DataChunk gdc = sqlg.getDC();
43+
Object oo = gdc.getEntity(gtable);
44+
target.persist(oo, s);
45+
sqlg = new SQLGroup(o.getDataChunk(), cur.getSqlStmt().getCols().getColumns());
46+
sqlg.add(o.getDataChunk());
47+
}
48+
} else {
49+
sqlg = new SQLGroup(o.getDataChunk(), cur.getSqlStmt().getCols().getColumns());
50+
sqlg.add(o.getDataChunk());
51+
}
52+
cdc = o.getDataChunk();
53+
54+
if (q.peek() == null) {
55+
Thread.sleep(100);
56+
}
57+
}
58+
}
59+
DataChunk gdc = sqlg.getDC();
60+
Object oo = gdc.getEntity(gtable);
61+
target.persist(oo, s);
62+
} catch (Exception e) {
63+
((StreamQueue) target).stop(s);
64+
throw new RuntimeException(e);
65+
}
66+
}
67+
68+
}

src/main/java/su/interference/sql/SQLCursor.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class SQLCursor implements FrameIterator {
5555
private ExecutorService exec2 = SQLJoinThreadPool.getThreadPool2();
5656
private ExecutorService remotepool = Executors.newCachedThreadPool();
5757
private ExecutorService streampool = Executors.newCachedThreadPool();
58+
private ExecutorService groupspool = Executors.newCachedThreadPool();
5859
private List<FrameApiJoin> tasks;
5960
private List<FrameApiJoin> tasks_;
6061
private FrameData bdnext;
@@ -229,6 +230,7 @@ public void build() throws InternalException, IOException, ClassNotFoundExceptio
229230

230231
public void stream() throws Exception {
231232
final Queue<FrameApi> q = sfmap.get(lbi.getObjectId());
233+
final Table gtable = Instance.getInstance().getTableById(lbi.getObjectId());
232234
if (!cur.isStream()) {
233235
logger.error("wrong stream method call: SQL statement is not a stream");
234236
}
@@ -242,14 +244,27 @@ public void stream() throws Exception {
242244
Runnable r = new Runnable() {
243245
@Override
244246
public void run() {
247+
Thread.currentThread().setName("Stream SQL thread "+Thread.currentThread().getId());
245248
try {
249+
final ConcurrentLinkedQueue<Object> q_in = new ConcurrentLinkedQueue<>();
250+
FrameGroupTask group = null;
246251
while (((StreamQueue) target).isRunning()) {
247252
FrameApi f = q.poll();
248253
if (f != null) {
249254
FrameJoinTask task = new FrameJoinTask(cur, f, null, target, rscols, nc, id, Config.getConfig().LOCAL_NODE_ID, last, lbi.isLeftfs(), null, s);
250255
final Future<List<Object>> ft = exec.submit(task);
251-
for (Object o : ft.get()) {
252-
target.persist(o, s);
256+
if (cur.getSqlStmt().isGroupedResult()) {
257+
if (group == null) {
258+
group = new FrameGroupTask(cur, q_in, target, gtable, s);
259+
groupspool.submit(group);
260+
}
261+
for (Object o : ft.get()) {
262+
q_in.add(o);
263+
}
264+
} else {
265+
for (Object o : ft.get()) {
266+
target.persist(o, s);
267+
}
253268
}
254269
}
255270
if (q.peek() == null) {

src/main/java/su/interference/sql/SQLSelect.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,13 @@ public void setEntityResult(boolean entityResult) {
383383
this.entityResult = entityResult;
384384
}
385385

386+
public boolean isGroupedResult() {
387+
if (cols.getGroupColumns().size() > 0 || cols.getFResultColumns().size() > 0) {
388+
return true;
389+
}
390+
return false;
391+
}
392+
386393
public Table getEntityTable() {
387394
return entityTable;
388395
}

0 commit comments

Comments
 (0)