Skip to content

Commit fca7beb

Browse files
Merge pull request #13 from yuriy-glotanov/master
stream prototype
2 parents 1f87093 + 0f20889 commit fca7beb

11 files changed

Lines changed: 130 additions & 10 deletions

File tree

src/main/java/su/interference/core/SyncQueue.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ this software and associated documentation files (the "Software"), to deal in
3131
import org.slf4j.LoggerFactory;
3232
import su.interference.persistent.*;
3333
import su.interference.exception.*;
34+
import su.interference.sql.SQLCursor;
3435
import su.interference.transport.TransportSyncTask;
3536

3637
/**
@@ -66,6 +67,7 @@ private synchronized boolean syncFramesFromQueue() throws Exception {
6667
FreeFrame fb = null;
6768
try {
6869
frames.add(new SyncFrame((Frame) entry.getValue(), s, fb));
70+
SQLCursor.addStreamFrame(((Frame) entry.getValue()).getFrameData());
6971
} catch (MissingSyncFrameException e) {
7072
logger.debug("Unable to sync frame "+((Frame) entry.getValue()).getPtr()+" because removed by freeing");
7173
}

src/main/java/su/interference/persistent/Cursor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public class Cursor implements Serializable {
4848
@Transient
4949
public static final int SLAVE_TYPE = 2;
5050
@Transient
51+
public static final int STREAM_TYPE = 3;
52+
@Transient
5153
public static final int STATE_IDLE = 1;
5254
@Transient
5355
public static final int STATE_PREPARED = 2;
@@ -211,4 +213,9 @@ public void setState(int state) {
211213
public void startStatement(long tranId) throws Exception {
212214
this.session.startStatement(tranId);
213215
}
216+
217+
public boolean isStream() {
218+
return this.type == STREAM_TYPE ? true : false;
219+
}
220+
214221
}

src/main/java/su/interference/persistent/Table.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,6 +1164,10 @@ protected DataChunk persist (final Object o, final Session s, final LLT extllt)
11641164
}
11651165
}
11661166

1167+
public Object poll() {
1168+
return null;
1169+
}
1170+
11671171
private void persistIndexes(DataChunk c, Session s, LLT llt) throws Exception {
11681172
for (IndexDescript ids : this.getIndexNames()) {
11691173
final Table ixt = Instance.getInstance().getTableByName("su.interference.persistent."+ids.getName());

src/main/java/su/interference/sql/ResultList.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ public DataChunk persist(Object o, Session s) {
5353
return null;
5454
}
5555

56+
public Object poll() {
57+
return null;
58+
}
59+
5660
public List<Chunk> getAll(Session s) {
5761
return null;
5862
}

src/main/java/su/interference/sql/ResultSet.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ this software and associated documentation files (the "Software"), to deal in
4141

4242
public interface ResultSet {
4343
DataChunk persist(Object o, Session s) throws Exception;
44+
Object poll();
4445
List<Chunk> getAll(Session s) throws Exception;
4546
ArrayList<Object> getAll(Session s, int ptr) throws Exception;
4647
int getObjectId();

src/main/java/su/interference/sql/ResultSetImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ public DataChunk persist(Object o, Session s) throws Exception {
5656
return null;
5757
}
5858

59+
public Object poll() {
60+
return null;
61+
}
62+
5963
public List<Chunk> getAll(Session s) throws Exception {
6064
return null;
6165
}

src/main/java/su/interference/sql/SQLCursor.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,10 @@ this software and associated documentation files (the "Software"), to deal in
3434

3535
import java.lang.reflect.Field;
3636
import java.net.MalformedURLException;
37-
import java.util.List;
38-
import java.util.UUID;
37+
import java.util.*;
38+
import java.util.concurrent.ConcurrentLinkedQueue;
3939
import java.util.concurrent.ExecutorService;
4040
import java.util.concurrent.Future;
41-
import java.util.ArrayList;
4241
import java.lang.reflect.InvocationTargetException;
4342
import java.io.IOException;
4443

@@ -72,6 +71,7 @@ public class SQLCursor implements FrameIterator {
7271
private final SQLColumn joinedCC;
7372
private SQLColumn extJoinedCC;
7473

74+
private static Map<Integer, ConcurrentLinkedQueue<FrameApi>> sfmap = new HashMap();
7575
private static final int BATCH_SIZE = 4;
7676
private final static Logger logger = LoggerFactory.getLogger(SQLCursor.class);
7777

@@ -88,13 +88,19 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition
8888
this.last = last;
8989
this.peristent = !cur.getSqlStmt().isEntityResult();
9090

91+
if (cur.isStream()) {
92+
sfmap.put(lbi.getObjectId(), new ConcurrentLinkedQueue<FrameApi>());
93+
}
94+
9195
//todo wrong case - column set must be rebuilded for prevent bad indexes intersect cases
9296
//todo need to refactor SQLJoin - extJoinedCC must be set on construct time
9397

9498
if (cur.getType() == Cursor.SLAVE_TYPE && cur.getResultTargetName() != null && this.id == 1) {
9599
target = this.peristent ? s.registerTable(cur.getResultTargetName(), s, rscols, null, null, ixflag && last) : new ResultList(cur.getSqlStmt().getEntityTable());
96-
} else {
100+
} else if (cur.getType() == Cursor.MASTER_TYPE) {
97101
target = this.peristent ? s.registerTable("su.interference.persistent.R$" + UUID.randomUUID().toString().replace('-', '$'), s, rscols, null, null, ixflag && last) : new ResultList(cur.getSqlStmt().getEntityTable());
102+
} else if (cur.getType() == Cursor.STREAM_TYPE) {
103+
target = new StreamQueue();
98104
}
99105
current = new FrameHolder(target);
100106

@@ -192,6 +198,37 @@ public void build() throws InternalException, IOException, ClassNotFoundExceptio
192198
logger.debug("SQL cursor is build: tasks amount = "+tasks.size()+", use NC check = "+last);
193199
}
194200

201+
public void stream() throws Exception {
202+
final Queue<FrameApi> q = sfmap.get(lbi.getObjectId());
203+
boolean cnue = true;
204+
if (!cur.isStream()) {
205+
logger.error("wrong stream method call: SQL statement is not a stream");
206+
}
207+
if (q == null) {
208+
throw new RuntimeException("internal error: queue not exist for object id = "+lbi.getObjectId());
209+
}
210+
while (cnue) {
211+
FrameApi f = q.poll();
212+
if (f != null) {
213+
FrameJoinTask task = new FrameJoinTask(cur, f, null, target, rscols, nc, id, Config.getConfig().LOCAL_NODE_ID, last, lbi.isLeftfs(), null, s);
214+
final Future<List<Object>> ft = exec.submit(task);
215+
for (Object o : ft.get()) {
216+
target.persist(o, s);
217+
}
218+
}
219+
if (q.peek() == null) {
220+
Thread.sleep(1000);
221+
}
222+
}
223+
}
224+
225+
public static void addStreamFrame(FrameApi f) {
226+
final Queue<FrameApi> q = sfmap.get(f.getObjectId());
227+
if (q != null) {
228+
q.add(f);
229+
}
230+
}
231+
195232
// execute single task (on local node only)
196233
public List<Object> execute(FrameApi bd1, FrameApi bd2) throws Exception {
197234
final FrameJoinTask task = new FrameJoinTask(cur, bd1, bd2, target, rscols, nc, id, Config.getConfig().LOCAL_NODE_ID, last, lbi.isLeftfs(), hmap, s);

src/main/java/su/interference/sql/SQLJoin.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,11 @@ public ResultSet executeJoin(Session s, int mode, ArrayList<SQLSetValue> uset) t
124124
SQLCursor last = null;
125125
for (SQLCursor sqlc : preparedCursors) {
126126
last = sqlc;
127-
sqlc.build();
127+
if (cur.isStream()) {
128+
sqlc.stream();
129+
} else {
130+
sqlc.build();
131+
}
128132
}
129133

130134
temp = last.flushTarget();

src/main/java/su/interference/sql/SQLJoinThreadPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ this software and associated documentation files (the "Software"), to deal in
3434

3535
public class SQLJoinThreadPool {
3636

37-
private static ExecutorService exec = Executors.newFixedThreadPool(8);
37+
private static ExecutorService exec = Executors.newFixedThreadPool(4);
3838

3939
public static ExecutorService getThreadPool() {
4040
return exec;

src/main/java/su/interference/sql/SQLSelect.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -148,16 +148,16 @@ private final void parseSQL (String s, Cursor cur, Session sn) throws Exception
148148
final String sql = s.trim();
149149
final String SQL = s.toUpperCase().trim();
150150

151-
cursor = cur==null?new Cursor(sql, Cursor.MASTER_TYPE):cur;
151+
cursor = cur==null?new Cursor(sql, this.stream ? Cursor.STREAM_TYPE : Cursor.MASTER_TYPE):cur;
152152
cursor.setSqlStmt(this);
153153

154154
if (!SQL.startsWith(SELECT_CLAUSE)) {
155155
throw new InvalidSQLStatement();
156156
}
157-
if (SQL.indexOf(STREAM_CLAUSE) >6 ) {
157+
if (SQL.indexOf(STREAM_CLAUSE) == SELECT_CLAUSE.length()) {
158158
this.stream = true;
159159
}
160-
if (SQL.indexOf(DISTINCT_CLAUSE) >6 ) {
160+
if (SQL.indexOf(DISTINCT_CLAUSE) == SELECT_CLAUSE.length()) {
161161
this.distinct = true;
162162
}
163163
if (SQL.indexOf(FROM_CLAUSE) < SELECT_CLAUSE.length() + 1) {
@@ -186,7 +186,10 @@ private final void parseSQL (String s, Cursor cur, Session sn) throws Exception
186186
}
187187
}
188188

189-
final String[] clds = sql.substring(SELECT_CLAUSE.length(), SQL.indexOf(FROM_CLAUSE)).trim().split(",");
189+
final int cldsStart = stream ? SELECT_CLAUSE.length() + STREAM_CLAUSE.length() :
190+
distinct ? SELECT_CLAUSE.length() + DISTINCT_CLAUSE.length() :
191+
SELECT_CLAUSE.length();
192+
final String[] clds = sql.substring(cldsStart, SQL.indexOf(FROM_CLAUSE)).trim().split(",");
190193
final String[] tbls = sql.substring(SQL.indexOf(FROM_CLAUSE)+6, baselen).trim().split(",");
191194

192195
final int wpos = SQL.indexOf(WHERE_CLAUSE);

0 commit comments

Comments
 (0)