Skip to content

Commit 2229bf0

Browse files
committed
fixes after review
1 parent c744429 commit 2229bf0

2 files changed

Lines changed: 136 additions & 9 deletions

File tree

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ public void close() throws SailException {
416416
try {
417417
cancelAndDrainScheduledBackgroundSampling();
418418
cancelAndDrainScheduledEstimatorPersist();
419+
shutdownAndAwaitEstimatorPersistExecutor();
419420
if (sketchBasedJoinEstimator != null) {
420421
sketchBasedJoinEstimator.close();
421422
}
@@ -447,15 +448,7 @@ public void close() throws SailException {
447448
throw new InterruptedSailException(e);
448449
}
449450
} finally {
450-
estimatorPersistExec.shutdown();
451-
try {
452-
while (!estimatorPersistExec.awaitTermination(1, TimeUnit.SECONDS)) {
453-
logger.warn("Waiting for join estimator persist executor to terminate");
454-
}
455-
} catch (InterruptedException e) {
456-
Thread.currentThread().interrupt();
457-
throw new InterruptedSailException(e);
458-
}
451+
shutdownAndAwaitEstimatorPersistExecutor();
459452
tripleStore.close();
460453
}
461454
}
@@ -474,6 +467,18 @@ public void close() throws SailException {
474467
}
475468
}
476469

470+
private void shutdownAndAwaitEstimatorPersistExecutor() {
471+
estimatorPersistExec.shutdown();
472+
try {
473+
while (!estimatorPersistExec.awaitTermination(1, TimeUnit.SECONDS)) {
474+
logger.warn("Waiting for join estimator persist executor to terminate");
475+
}
476+
} catch (InterruptedException e) {
477+
Thread.currentThread().interrupt();
478+
throw new InterruptedSailException(e);
479+
}
480+
}
481+
477482
private void cancelAndDrainScheduledBackgroundSampling() {
478483
ScheduledFuture<?> future = backgroundSamplingFuture;
479484
if (future == null) {
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
13+
package org.eclipse.rdf4j.sail.lmdb;
14+
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.junit.jupiter.api.Assertions.assertFalse;
17+
import static org.junit.jupiter.api.Assertions.assertNull;
18+
import static org.junit.jupiter.api.Assertions.assertTrue;
19+
import static org.mockito.Mockito.doAnswer;
20+
import static org.mockito.Mockito.spy;
21+
22+
import java.io.File;
23+
import java.lang.reflect.Field;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.ScheduledFuture;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicReference;
29+
30+
import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig;
31+
import org.junit.jupiter.api.Test;
32+
import org.junit.jupiter.api.io.TempDir;
33+
34+
class LmdbSailStoreCloseTest {
35+
36+
@Test
37+
void closeDrainsRunningScheduledPersistBeforeClosingEstimator(@TempDir File dataDir) throws Exception {
38+
LmdbStoreConfig config = new LmdbStoreConfig("spoc").setBackgroundRawSamplingMaxMillisPerCycle(0L);
39+
LmdbStore store = new LmdbStore(dataDir, config);
40+
store.init();
41+
42+
LmdbSailStore backingStore = store.getBackingStore();
43+
LmdbFilterSelectivityStats filterStats = (LmdbFilterSelectivityStats) getField(backingStore,
44+
"filterSelectivityStats");
45+
LmdbFilterSelectivityStats filterStatsSpy = spy(filterStats);
46+
CountDownLatch filterPersistStarted = new CountDownLatch(1);
47+
doAnswer(invocation -> {
48+
filterPersistStarted.countDown();
49+
return invocation.callRealMethod();
50+
}).when(filterStatsSpy).persistIfDirty();
51+
setField(backingStore, "filterSelectivityStats", filterStatsSpy);
52+
53+
ScheduledExecutorService executor = (ScheduledExecutorService) getField(backingStore, "estimatorPersistExec");
54+
CountDownLatch taskStarted = new CountDownLatch(1);
55+
CountDownLatch releaseTask = new CountDownLatch(1);
56+
CountDownLatch taskFinished = new CountDownLatch(1);
57+
AtomicReference<Throwable> closeFailure = new AtomicReference<>();
58+
59+
ScheduledFuture<?> runningPersist = executor.schedule(() -> {
60+
taskStarted.countDown();
61+
try {
62+
assertTrue(releaseTask.await(5, TimeUnit.SECONDS),
63+
"Test timed out before releasing the scheduled persist task");
64+
} catch (InterruptedException e) {
65+
Thread.currentThread().interrupt();
66+
} finally {
67+
taskFinished.countDown();
68+
}
69+
}, 0L, TimeUnit.MILLISECONDS);
70+
setField(backingStore, "persistFuture", runningPersist);
71+
72+
assertTrue(taskStarted.await(5, TimeUnit.SECONDS), "Scheduled persist task should be running before close");
73+
74+
Thread closeThread = new Thread(() -> {
75+
try {
76+
store.shutDown();
77+
} catch (Throwable e) {
78+
closeFailure.set(e);
79+
}
80+
}, "LmdbSailStore-close-test");
81+
closeThread.start();
82+
try {
83+
awaitExecutorShutdown(executor);
84+
85+
assertFalse(isCountedDown(filterPersistStarted),
86+
"Filter stats must not persist while a scheduled persist task is still running");
87+
assertFalse(isCountedDown(taskFinished), "Scheduled persist task should still be blocked by the test");
88+
} finally {
89+
releaseTask.countDown();
90+
closeThread.join(TimeUnit.SECONDS.toMillis(5));
91+
}
92+
93+
assertFalse(closeThread.isAlive(), "Store shutdown should finish after releasing the scheduled task");
94+
assertEquals(0, taskFinished.getCount(), "Scheduled persist task should have finished");
95+
assertEquals(0, filterPersistStarted.getCount(), "Filter stats should persist after scheduled task drain");
96+
assertNull(closeFailure.get(), "Store shutdown should not fail");
97+
}
98+
99+
private static void awaitExecutorShutdown(ScheduledExecutorService executor) throws InterruptedException {
100+
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
101+
while (!executor.isShutdown() && System.nanoTime() < deadline) {
102+
Thread.sleep(10L);
103+
}
104+
assertTrue(executor.isShutdown(), "Store close should begin draining the estimator scheduler");
105+
}
106+
107+
private static boolean isCountedDown(CountDownLatch latch) {
108+
return latch.getCount() == 0;
109+
}
110+
111+
private static Object getField(Object target, String name) throws ReflectiveOperationException {
112+
Field field = target.getClass().getDeclaredField(name);
113+
field.setAccessible(true);
114+
return field.get(target);
115+
}
116+
117+
private static void setField(Object target, String name, Object value) throws ReflectiveOperationException {
118+
Field field = target.getClass().getDeclaredField(name);
119+
field.setAccessible(true);
120+
field.set(target, value);
121+
}
122+
}

0 commit comments

Comments
 (0)