@@ -34,11 +34,10 @@ this software and associated documentation files (the "Software"), to deal in
3434
3535import java .lang .reflect .Field ;
3636import java .net .MalformedURLException ;
37- import java .util .List ;
38- import java .util .UUID ;
37+ import java .util .* ;
38+ import java .util .concurrent . ConcurrentLinkedQueue ;
3939import java .util .concurrent .ExecutorService ;
4040import java .util .concurrent .Future ;
41- import java .util .ArrayList ;
4241import java .lang .reflect .InvocationTargetException ;
4342import java .io .IOException ;
4443
@@ -72,6 +71,7 @@ public class SQLCursor implements FrameIterator {
7271 private final SQLColumn joinedCC ;
7372 private SQLColumn extJoinedCC ;
7473
74+ private static Map <Integer , ConcurrentLinkedQueue <FrameApi >> sfmap = new HashMap ();
7575 private static final int BATCH_SIZE = 4 ;
7676 private final static Logger logger = LoggerFactory .getLogger (SQLCursor .class );
7777
@@ -88,13 +88,19 @@ public SQLCursor (int id, FrameIterator lbi, FrameIterator rbi, NestedCondition
8888 this .last = last ;
8989 this .peristent = !cur .getSqlStmt ().isEntityResult ();
9090
91+ if (cur .isStream ()) {
92+ sfmap .put (lbi .getObjectId (), new ConcurrentLinkedQueue <FrameApi >());
93+ }
94+
9195 //todo wrong case - column set must be rebuilded for prevent bad indexes intersect cases
9296 //todo need to refactor SQLJoin - extJoinedCC must be set on construct time
9397
9498 if (cur .getType () == Cursor .SLAVE_TYPE && cur .getResultTargetName () != null && this .id == 1 ) {
9599 target = this .peristent ? s .registerTable (cur .getResultTargetName (), s , rscols , null , null , ixflag && last ) : new ResultList (cur .getSqlStmt ().getEntityTable ());
96- } else {
100+ } else if ( cur . getType () == Cursor . MASTER_TYPE ) {
97101 target = this .peristent ? s .registerTable ("su.interference.persistent.R$" + UUID .randomUUID ().toString ().replace ('-' , '$' ), s , rscols , null , null , ixflag && last ) : new ResultList (cur .getSqlStmt ().getEntityTable ());
102+ } else if (cur .getType () == Cursor .STREAM_TYPE ) {
103+ target = new StreamQueue ();
98104 }
99105 current = new FrameHolder (target );
100106
@@ -192,6 +198,37 @@ public void build() throws InternalException, IOException, ClassNotFoundExceptio
192198 logger .debug ("SQL cursor is build: tasks amount = " +tasks .size ()+", use NC check = " +last );
193199 }
194200
201+ public void stream () throws Exception {
202+ final Queue <FrameApi > q = sfmap .get (lbi .getObjectId ());
203+ boolean cnue = true ;
204+ if (!cur .isStream ()) {
205+ logger .error ("wrong stream method call: SQL statement is not a stream" );
206+ }
207+ if (q == null ) {
208+ throw new RuntimeException ("internal error: queue not exist for object id = " +lbi .getObjectId ());
209+ }
210+ while (cnue ) {
211+ FrameApi f = q .poll ();
212+ if (f != null ) {
213+ FrameJoinTask task = new FrameJoinTask (cur , f , null , target , rscols , nc , id , Config .getConfig ().LOCAL_NODE_ID , last , lbi .isLeftfs (), null , s );
214+ final Future <List <Object >> ft = exec .submit (task );
215+ for (Object o : ft .get ()) {
216+ target .persist (o , s );
217+ }
218+ }
219+ if (q .peek () == null ) {
220+ Thread .sleep (1000 );
221+ }
222+ }
223+ }
224+
225+ public static void addStreamFrame (FrameApi f ) {
226+ final Queue <FrameApi > q = sfmap .get (f .getObjectId ());
227+ if (q != null ) {
228+ q .add (f );
229+ }
230+ }
231+
195232 // execute single task (on local node only)
196233 public List <Object > execute (FrameApi bd1 , FrameApi bd2 ) throws Exception {
197234 final FrameJoinTask task = new FrameJoinTask (cur , bd1 , bd2 , target , rscols , nc , id , Config .getConfig ().LOCAL_NODE_ID , last , lbi .isLeftfs (), hmap , s );
0 commit comments