Skip to content

Commit 91cdf60

Browse files
committed
Merge branch 'develop' into sketch-based-optimizer-4-new-unified-planners-and-optimizers2
# Conflicts: # core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java
2 parents da9f9af + c6de073 commit 91cdf60

6 files changed

Lines changed: 813 additions & 35 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/TxnManager.java

Lines changed: 175 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
import static org.lwjgl.system.MemoryStack.stackPush;
1616
import static org.lwjgl.system.MemoryUtil.NULL;
1717
import static org.lwjgl.util.lmdb.LMDB.MDB_RDONLY;
18+
import static org.lwjgl.util.lmdb.LMDB.MDB_READERS_FULL;
19+
import static org.lwjgl.util.lmdb.LMDB.MDB_SUCCESS;
20+
import static org.lwjgl.util.lmdb.LMDB.mdb_reader_check;
1821
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_abort;
1922
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_begin;
2023
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_renew;
2124
import static org.lwjgl.util.lmdb.LMDB.mdb_txn_reset;
2225

2326
import java.io.Closeable;
2427
import java.io.IOException;
28+
import java.nio.IntBuffer;
29+
import java.util.ArrayList;
2530
import java.util.IdentityHashMap;
31+
import java.util.List;
2632

2733
import org.eclipse.rdf4j.common.concurrent.locks.StampedLongAdderLockManager;
2834
import org.eclipse.rdf4j.sail.SailException;
@@ -35,6 +41,9 @@
3541
*/
3642
class TxnManager {
3743

44+
private static final int READERS_FULL_RETRIES = 500;
45+
private static final long READERS_FULL_WAIT_MILLIS = 10L;
46+
3847
private final Mode mode;
3948
private final IdentityHashMap<Txn, Boolean> active = new IdentityHashMap<>();
4049
private final long[] pool;
@@ -52,12 +61,66 @@ private long startReadTxn() throws IOException {
5261
long readTxn;
5362
try (MemoryStack stack = stackPush()) {
5463
PointerBuffer pp = stack.mallocPointer(1);
55-
E(mdb_txn_begin(env, NULL, MDB_RDONLY, pp));
64+
int rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp);
65+
if (rc == MDB_READERS_FULL) {
66+
rc = retryStartReadTxn(stack, pp);
67+
}
68+
E(rc);
5669
readTxn = pp.get(0);
5770
}
5871
return readTxn;
5972
}
6073

74+
private int retryStartReadTxn(MemoryStack stack, PointerBuffer pp) throws IOException {
75+
int rc = MDB_READERS_FULL;
76+
for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) {
77+
closePooledReaders();
78+
checkForDeadReaders(stack);
79+
waitForTrackedReaderToClose(null);
80+
rc = mdb_txn_begin(env, NULL, MDB_RDONLY, pp);
81+
}
82+
return rc;
83+
}
84+
85+
private void checkForDeadReaders(MemoryStack stack) throws IOException {
86+
IntBuffer dead = stack.mallocInt(1);
87+
E(mdb_reader_check(env, dead));
88+
}
89+
90+
private void closePooledReaders() {
91+
if (mode == Mode.RESET) {
92+
synchronized (pool) {
93+
while (poolIndex >= 0) {
94+
long txn = pool[poolIndex];
95+
pool[poolIndex--] = 0;
96+
mdb_txn_abort(txn);
97+
}
98+
}
99+
}
100+
}
101+
102+
private void waitForTrackedReaderToClose(Txn excludedTxn) throws IOException {
103+
synchronized (active) {
104+
if (hasTrackedReaders(excludedTxn)) {
105+
try {
106+
active.wait(READERS_FULL_WAIT_MILLIS);
107+
} catch (InterruptedException e) {
108+
Thread.currentThread().interrupt();
109+
throw new IOException(e);
110+
}
111+
}
112+
}
113+
}
114+
115+
private boolean hasTrackedReaders(Txn excludedTxn) {
116+
for (Txn txn : active.keySet()) {
117+
if (txn != excludedTxn) {
118+
return true;
119+
}
120+
}
121+
return false;
122+
}
123+
61124
/**
62125
* Wraps an existing transaction into a txn reference object.
63126
*
@@ -114,7 +177,12 @@ long createReadTxnInternal() throws IOException {
114177
if (txn == 0) {
115178
txn = startReadTxn();
116179
} else {
117-
mdb_txn_renew(txn);
180+
try {
181+
renewReadTxn(txn, null);
182+
} catch (IOException e) {
183+
mdb_txn_abort(txn);
184+
throw e;
185+
}
118186
}
119187
} else {
120188
txn = startReadTxn();
@@ -148,27 +216,36 @@ StampedLongAdderLockManager lockManager() {
148216
}
149217

150218
void activate() throws IOException {
151-
synchronized (active) {
152-
for (Txn txn : active.keySet()) {
153-
txn.setActive(true);
154-
}
219+
for (Txn txn : activeTransactions()) {
220+
txn.setActive(true);
155221
}
156222
}
157223

158224
void deactivate() throws IOException {
159-
synchronized (active) {
160-
for (Txn txn : active.keySet()) {
161-
txn.setActive(false);
162-
}
225+
for (Txn txn : activeTransactions()) {
226+
txn.setActive(false);
163227
}
164228
}
165229

166230
void reset() throws IOException {
231+
for (Txn txn : activeTransactions()) {
232+
txn.reset();
233+
}
234+
}
235+
236+
private List<Txn> activeTransactions() {
167237
synchronized (active) {
168-
for (var entry : active.entrySet()) {
169-
if (Boolean.TRUE.equals(entry.getValue())) {
170-
entry.getKey().reset();
171-
}
238+
return new ArrayList<>(active.keySet());
239+
}
240+
}
241+
242+
private void updateActiveState(Txn txn, boolean isActive) {
243+
synchronized (active) {
244+
if (active.containsKey(txn)) {
245+
active.put(txn, isActive);
246+
}
247+
if (!isActive) {
248+
active.notifyAll();
172249
}
173250
}
174251
}
@@ -181,8 +258,10 @@ enum Mode {
181258

182259
class Txn implements Closeable, AutoCloseable {
183260

184-
private final long txn;
261+
private long txn;
185262
private long version;
263+
private boolean txnActive = true;
264+
private boolean closed;
186265

187266
Txn(long txn) {
188267
this.txn = txn;
@@ -196,12 +275,18 @@ StampedLongAdderLockManager lockManager() {
196275
return lockManager;
197276
}
198277

199-
private void free(long txn) {
278+
private void free(boolean resetTxn) {
279+
if (txn == 0) {
280+
return;
281+
}
282+
200283
switch (mode) {
201284
case RESET:
202285
synchronized (pool) {
203286
if (poolIndex < pool.length - 1) {
204-
mdb_txn_reset(txn);
287+
if (resetTxn) {
288+
mdb_txn_reset(txn);
289+
}
205290
pool[++poolIndex] = txn;
206291
} else {
207292
mdb_txn_abort(txn);
@@ -214,39 +299,103 @@ private void free(long txn) {
214299
case NONE:
215300
break;
216301
}
302+
txn = 0;
217303
}
218304

219305
@Override
220-
public void close() {
221-
synchronized (active) {
222-
active.remove(this);
306+
public synchronized void close() {
307+
if (closed) {
308+
return;
309+
}
310+
closed = true;
311+
synchronized (TxnManager.this.active) {
312+
TxnManager.this.active.remove(this);
313+
}
314+
try {
315+
free(txnActive);
316+
} finally {
317+
synchronized (TxnManager.this.active) {
318+
TxnManager.this.active.notifyAll();
319+
}
223320
}
224-
free(txn);
225321
}
226322

227323
/**
228324
* Resets current transaction as it points to "old" data.
229325
*/
230-
void reset() throws IOException {
231-
mdb_txn_reset(txn);
232-
E(mdb_txn_renew(txn));
326+
synchronized void reset() throws IOException {
327+
if (closed) {
328+
return;
329+
}
330+
if (txnActive) {
331+
mdb_txn_reset(txn);
332+
txnActive = false;
333+
updateActiveState(this, false);
334+
activate();
335+
}
233336
version++;
234337
}
235338

236339
/**
237340
* Triggers active state of current transaction.
238341
*/
239-
void setActive(boolean active) throws IOException {
342+
synchronized void setActive(boolean active) throws IOException {
343+
if (closed) {
344+
return;
345+
}
240346
if (active) {
241-
E(mdb_txn_renew(txn));
347+
activate();
242348
version++;
243349
} else {
350+
deactivate();
351+
}
352+
}
353+
354+
private void activate() throws IOException {
355+
if (!txnActive) {
356+
if (txn == 0) {
357+
txn = startReadTxn();
358+
} else {
359+
renewReadTxn(txn, this);
360+
}
361+
txnActive = true;
362+
updateActiveState(this, true);
363+
}
364+
}
365+
366+
private void deactivate() {
367+
if (txnActive && txn != 0) {
244368
mdb_txn_reset(txn);
245369
}
370+
txnActive = false;
371+
updateActiveState(this, false);
246372
}
247373

248374
long version() {
249375
return version;
250376
}
251377
}
378+
379+
private void renewReadTxn(long txn, Txn excludedTxn) throws IOException {
380+
int rc = mdb_txn_renew(txn);
381+
if (rc == MDB_READERS_FULL) {
382+
rc = retryRenewReadTxn(txn, excludedTxn);
383+
}
384+
if (rc != MDB_SUCCESS) {
385+
E(rc);
386+
}
387+
}
388+
389+
private int retryRenewReadTxn(long txn, Txn excludedTxn) throws IOException {
390+
int rc = MDB_READERS_FULL;
391+
try (MemoryStack stack = stackPush()) {
392+
for (int i = 0; i < READERS_FULL_RETRIES && rc == MDB_READERS_FULL; i++) {
393+
closePooledReaders();
394+
checkForDeadReaders(stack);
395+
waitForTrackedReaderToClose(excludedTxn);
396+
rc = mdb_txn_renew(txn);
397+
}
398+
}
399+
return rc;
400+
}
252401
}

0 commit comments

Comments
 (0)