Skip to content

Commit 727d87d

Browse files
pre-release 2020.3 sql rs refactoring
1 parent 84914b7 commit 727d87d

20 files changed

Lines changed: 440 additions & 324 deletions

src/main/java/su/interference/core/Chunk.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public interface Chunk extends Comparable {
4545
int getBytesAmount();
4646
Comparable getId (Session s) throws InvocationTargetException, NoSuchMethodException, IllegalAccessException;
4747
ValueSet getDcs();
48+
boolean isTerminate();
49+
void setTerminate(boolean terminate);
4850
Object getEntity() throws ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException;
4951
Object getUndoEntity() throws ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException;
5052
void updateEntity(Object o) throws InternalException, ClassNotFoundException, InstantiationException, IllegalAccessException, NoSuchMethodException, InvocationTargetException, MalformedURLException;

src/main/java/su/interference/core/DataChunk.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class DataChunk implements Chunk {
6363
private Object undoentity;
6464
private DataChunk source;
6565
private UndoChunk uc;
66+
private boolean terminate;
6667
private final CustomSerializer sr = new CustomSerializer();
6768

6869
//returns datacolumn set
@@ -882,6 +883,14 @@ protected synchronized void undo(Session s, LLT llt) throws Exception {
882883
}
883884
}
884885

886+
public boolean isTerminate() {
887+
return terminate;
888+
}
889+
890+
public void setTerminate(boolean terminate) {
891+
this.terminate = terminate;
892+
}
893+
885894
private byte[] getBytesFromHexString(String s) {
886895
byte b[] = new byte[s.length()/2];
887896
for (int i=0; i<s.length(); i+=2) {

src/main/java/su/interference/core/Instance.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,7 @@ private void startProcesses(Session s) throws Exception {
435435
}
436436

437437
private void stopProcesses(Session s) throws Exception {
438-
final Table t = getTableByName("su.interference.persistent.Process");
439-
final ArrayList<Object> pss = t.getAll(s, 0);
438+
final List<Process> pss = getProcesses();
440439
for (Object p : pss) {
441440
final Process ps = (Process) p;
442441
ps.stop();
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package su.interference.core;
2+
3+
import java.util.concurrent.Callable;
4+
5+
public interface ManagedCallable<T> extends Callable<T> {
6+
void stop();
7+
}

src/main/java/su/interference/core/ManageProcess.java renamed to src/main/java/su/interference/core/ManagedProcess.java

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,34 @@
1-
/**
2-
The MIT License (MIT)
3-
4-
Copyright (c) 2010-2019 head systems, ltd
5-
6-
Permission is hereby granted, free of charge, to any person obtaining a copy of
7-
this software and associated documentation files (the "Software"), to deal in
8-
the Software without restriction, including without limitation the rights to
9-
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
10-
the Software, and to permit persons to whom the Software is furnished to do so,
11-
subject to the following conditions:
12-
13-
The above copyright notice and this permission notice shall be included in all
14-
copies or substantial portions of the Software.
15-
16-
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17-
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
18-
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
19-
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
20-
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21-
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22-
23-
*/
24-
25-
package su.interference.core;
26-
27-
/**
28-
* @author Yuriy Glotanov
29-
* @since 1.0
30-
*/
31-
32-
public interface ManageProcess {
33-
void stop() throws InterruptedException;
34-
}
1+
/**
2+
The MIT License (MIT)
3+
4+
Copyright (c) 2010-2019 head systems, ltd
5+
6+
Permission is hereby granted, free of charge, to any person obtaining a copy of
7+
this software and associated documentation files (the "Software"), to deal in
8+
the Software without restriction, including without limitation the rights to
9+
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
10+
the Software, and to permit persons to whom the Software is furnished to do so,
11+
subject to the following conditions:
12+
13+
The above copyright notice and this permission notice shall be included in all
14+
copies or substantial portions of the Software.
15+
16+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
18+
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
19+
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
20+
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
21+
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22+
23+
*/
24+
25+
package su.interference.core;
26+
27+
/**
28+
* @author Yuriy Glotanov
29+
* @since 1.0
30+
*/
31+
32+
public interface ManagedProcess {
33+
void stop() throws InterruptedException;
34+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package su.interference.core;
2+
3+
import java.util.concurrent.LinkedBlockingQueue;
4+
5+
public class RetrieveQueue {
6+
private final LinkedBlockingQueue<Chunk> q;
7+
private final ManagedCallable r;
8+
private boolean retrieve = true;
9+
10+
public RetrieveQueue(LinkedBlockingQueue<Chunk> q, ManagedCallable r) {
11+
this.q = q;
12+
this.r = r;
13+
}
14+
15+
public Object poll() {
16+
try {
17+
if (retrieve) {
18+
Chunk c = q.take();
19+
if (c.isTerminate()) {
20+
retrieve = false;
21+
return null;
22+
}
23+
return c.getEntity();
24+
}
25+
} catch (Exception e) {
26+
e.printStackTrace();
27+
}
28+
return null;
29+
}
30+
31+
public Chunk cpoll() {
32+
try {
33+
if (retrieve) {
34+
Chunk c = q.take();
35+
if (c.isTerminate()) {
36+
retrieve = false;
37+
return null;
38+
}
39+
return c;
40+
}
41+
} catch (Exception e) {
42+
e.printStackTrace();
43+
}
44+
return null;
45+
}
46+
47+
public void stop() {
48+
retrieve = false;
49+
if (r != null) {
50+
r.stop();
51+
}
52+
}
53+
54+
public LinkedBlockingQueue<Chunk> getQ() {
55+
return q;
56+
}
57+
58+
public ManagedCallable getR() {
59+
return r;
60+
}
61+
62+
public boolean isRetrieve() {
63+
return retrieve;
64+
}
65+
}

src/main/java/su/interference/core/SyncQueue.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ this software and associated documentation files (the "Software"), to deal in
4141
* @since 1.0
4242
*/
4343

44-
public class SyncQueue implements Runnable, ManageProcess {
44+
public class SyncQueue implements Runnable, ManagedProcess {
4545

4646
private volatile boolean f = true;
4747
private volatile boolean running = false;

src/main/java/su/interference/persistent/Process.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void start (Runnable r, Session s) {
9494

9595
public void stop () throws InterruptedException {
9696
if (th!=null&&ro!=null) {
97-
((ManageProcess)ro).stop();
97+
((ManagedProcess)ro).stop();
9898
th.join();
9999
}
100100
}

src/main/java/su/interference/persistent/Session.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ this software and associated documentation files (the "Software"), to deal in
4545
import java.util.concurrent.ConcurrentHashMap;
4646
import java.util.concurrent.ExecutorService;
4747
import java.util.concurrent.Executors;
48+
import java.util.concurrent.Future;
4849
import javax.persistence.*;
4950

5051
/**
@@ -111,7 +112,11 @@ public static int getCLASS_ID() {
111112
private static ThreadLocal<Session> contextSession = new ThreadLocal<Session>();
112113

113114
@Transient
114-
private final ExecutorService streampool = Executors.newCachedThreadPool();
115+
private static final ExecutorService rqpool = Executors.newCachedThreadPool();
116+
@Transient
117+
private volatile RetrieveQueue retrieveQueue;
118+
@Transient
119+
private static final ExecutorService streampool = Executors.newCachedThreadPool();
115120
@Transient
116121
private final Map<Long, Integer> streammap = new ConcurrentHashMap<>();
117122
@Transient
@@ -168,7 +173,7 @@ private void startStatement(LLT llt) throws Exception {
168173
//this.getTransaction().startStatement(this, llt);
169174
}
170175

171-
public void setTransaction(Transaction transaction) {
176+
protected void setTransaction(Transaction transaction) {
172177
this.transaction = transaction;
173178
}
174179

@@ -269,27 +274,36 @@ public EntityFactory getEntityFactory(Class c) {
269274
return new EntityFactory(c, this);
270275
}
271276

272-
public List<Object> getAll (Class c) throws Exception {
277+
public Object find (Class c, long id) throws Exception {
273278
final Table t = Instance.getInstance().getTableByName(c.getName());
274279
if (t != null) {
275280
this.startStatement();
276-
// t.lockTable(this);
277-
final List<Object> r = t.getAll(this, 0);
278-
// t.unlockTable(this);
279-
return r;
281+
return t.getChunkById(id, this).getEntity();
280282
}
281283
return null;
282284
}
283285

284-
public Object find (Class c, long id) throws Exception {
285-
final Table t = Instance.getInstance().getTableByName(c.getName());
286+
protected synchronized RetrieveQueue getContentQueue(Table t) {
286287
if (t != null) {
287-
this.startStatement();
288-
return t.getChunkById(id, this).getEntity();
288+
retrieveQueue = t.getContentQueue(this);
289+
Future f = rqpool.submit(retrieveQueue.getR());
290+
return retrieveQueue;
289291
}
290292
return null;
291293
}
292294

295+
public void closeQueue() {
296+
if (this.retrieveQueue != null) {
297+
retrieveQueue.stop();
298+
}
299+
retrieveQueue = null;
300+
}
301+
302+
public void close() {
303+
rqpool.shutdownNow();
304+
streampool.isShutdown();
305+
}
306+
293307
//todo uncommitted data not retrieved
294308
public void stream (Class c, StreamCallable task) {
295309
final Map<Long, Long> retrieved = new HashMap<>();
@@ -318,17 +332,7 @@ public void run() {
318332
});
319333
}
320334

321-
public List<Object> ngetAll (Class c) throws Exception {
322-
final Table t = Instance.getInstance().getTableByName(c.getName());
323-
if (t != null) {
324-
// t.lockTable(this);
325-
final List<Object> r = t.getAll(this, 0);
326-
// t.unlockTable(this);
327-
return r;
328-
}
329-
return null;
330-
}
331-
335+
@Deprecated
332336
public Object nfind (Class c, long id) throws InternalException, NoSuchMethodException, InvocationTargetException, IOException, InvalidFrameHeader, InvalidFrame, EmptyFrameHeaderFound, IncorrectUndoChunkFound, ClassNotFoundException, InstantiationException, IllegalAccessException {
333337
final Table t = Instance.getInstance().getTableByName(c.getName());
334338
if (t != null) {
@@ -544,6 +548,10 @@ public static void setDntmSession(Session dntmSession) {
544548
Session.dntmSession = dntmSession;
545549
}
546550

551+
public RetrieveQueue getRetrieveQueue() {
552+
return retrieveQueue;
553+
}
554+
547555
public void streamFramePtr(Frame f, int ptr) {
548556
streammap.put(f.getAllocFile()+f.getAllocPointer(), ptr);
549557
}

0 commit comments

Comments
 (0)