Skip to content

Commit 4eae393

Browse files
committed
fixed memory usage
1 parent 6445d18 commit 4eae393

10 files changed

Lines changed: 958 additions & 330 deletions

File tree

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SketchBasedJoinEstimator.java

Lines changed: 396 additions & 194 deletions
Large diffs are not rendered by default.

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/TupleSketchOps.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,30 @@ static ArrayOfDoublesSketch intersectMin(ArrayOfDoublesSketch left, ArrayOfDoubl
109109
return intersectMinStats(left, right, k).sketch();
110110
}
111111

112+
static ArrayOfDoublesSketch subtractPositive(ArrayOfDoublesSketch additions, ArrayOfDoublesSketch deletions,
113+
int k) {
114+
if (additions == null || additions.getRetainedEntries() == 0) {
115+
return newSketch(k).compact();
116+
}
117+
if (deletions == null || deletions.getRetainedEntries() == 0) {
118+
return additions;
119+
}
120+
DirectLookupTable deletionLookup = buildLookupTable(deletions);
121+
ArrayOfDoublesUpdatableSketch result = newSketch(k);
122+
double[] scratch = new double[1];
123+
ArrayOfDoublesSketchIterator iterator = additions.iterator();
124+
while (iterator.next()) {
125+
double value = Math.max(0.0d, iterator.getValues()[0])
126+
- Math.max(0.0d, deletionLookup.find(iterator.getKey()));
127+
if (value <= 0.0d) {
128+
continue;
129+
}
130+
scratch[0] = value;
131+
result.update(iterator.getKey(), scratch);
132+
}
133+
return result.compact();
134+
}
135+
112136
static IntersectionStats intersectProductStats(ArrayOfDoublesSketch left, ArrayOfDoublesSketch right, int k) {
113137
return intersect(left, right, k, MULTIPLY_POSITIVE_SUMMARIES, true);
114138
}

core/sail/base/src/test/java/org/eclipse/rdf4j/sail/base/SketchBasedJoinEstimatorAdaptiveIncrementalQueueTest.java

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import static org.junit.jupiter.api.Assertions.assertTrue;
1818

1919
import java.lang.reflect.Field;
20+
import java.nio.file.Files;
21+
import java.nio.file.Path;
2022
import java.util.ArrayList;
2123
import java.util.List;
2224
import java.util.concurrent.TimeUnit;
@@ -96,7 +98,7 @@ void partialStatementQueueFlushesOnDemand() throws Exception {
9698
}
9799

98100
@Test
99-
void bulkStatementAddsBypassPerStatementQueueAndApplyExactly() {
101+
void bulkStatementAddsUseEstimatorOwnedStatementQueue() {
100102
SketchBasedJoinEstimator estimator = newEstimator();
101103
IRI predicate = VF.createIRI("urn:adaptive:bulk:p");
102104
List<Statement> statements = new ArrayList<>();
@@ -106,8 +108,8 @@ void bulkStatementAddsBypassPerStatementQueueAndApplyExactly() {
106108

107109
estimator.addStatements(statements);
108110

109-
assertEquals(0, estimator.debugPendingIncrementalCount(),
110-
"Bulk committed adds should not fill the caller-side per-statement queue");
111+
assertEquals(512, estimator.debugPendingIncrementalCount(),
112+
"Bulk committed adds should use the same estimator-owned per-statement queue");
111113
estimator.debugFlushPendingIncremental();
112114
assertTrue(estimator.isReadyNonBlocking(), "Bulk exact ingest should leave the estimator ready");
113115
assertTrue(estimator.cardinalitySingle(SketchBasedJoinEstimator.Component.P, predicate.stringValue()) > 100.0,
@@ -133,6 +135,66 @@ void memoryPressureFallsBackToCoarseDeltaAndRebuild() throws Exception {
133135
}));
134136
}
135137

138+
@Test
139+
void bulkStatementAddsUsePeriodicMemoryMonitoringBeforeExactRetain() throws Exception {
140+
SketchBasedJoinEstimator estimator = newEstimator(SketchBasedJoinEstimator.Config.defaults()
141+
.withNominalEntries(64)
142+
.withThrottleEveryN(1)
143+
.withThrottleMillis(0)
144+
.withRefreshSleepMillis(5)
145+
.withMemoryMonitorCheckInterval(1)
146+
.withMemoryMonitorEstimatedOperationBytes(Long.MAX_VALUE / 2));
147+
IRI predicate = VF.createIRI("urn:adaptive:bulk-memory:p");
148+
List<Statement> statements = new ArrayList<>();
149+
for (int i = 0; i < 512; i++) {
150+
statements.add(statement("urn:adaptive:bulk-memory:" + i, predicate,
151+
"urn:adaptive:bulk-memory:o:" + i));
152+
}
153+
154+
estimator.addStatements(statements);
155+
156+
assertFalse(estimator.isReadyNonBlocking(),
157+
"Bulk memory monitor fallback should mark the estimator for rebuild");
158+
assertEquals(0, estimator.debugPendingIncrementalCount(),
159+
"Bulk fallback should not retain exact statement updates");
160+
assertEquals(0.0,
161+
estimator.cardinalitySingle(SketchBasedJoinEstimator.Component.P, predicate.stringValue()),
162+
"Bulk fallback should not publish partial exact sketch updates");
163+
}
164+
165+
@Test
166+
void rebuildUsesPeriodicMemoryMonitoringInsteadOfPublishingPartialSketches() throws Exception {
167+
StubSailStore store = new StubSailStore();
168+
IRI predicate = VF.createIRI("urn:adaptive:rebuild-memory:p");
169+
for (int i = 0; i < 8; i++) {
170+
store.add(statement("urn:adaptive:rebuild-memory:" + i, predicate,
171+
"urn:adaptive:rebuild-memory:o:" + i));
172+
}
173+
SketchBasedJoinEstimator estimator = new SketchBasedJoinEstimator(store,
174+
SketchBasedJoinEstimator.Config.defaults()
175+
.withNominalEntries(64)
176+
.withRefreshSleepMillis(5)
177+
.withMemoryMonitorCheckInterval(1)
178+
.withMemoryMonitorEstimatedOperationBytes(Long.MAX_VALUE / 2));
179+
180+
estimator.rebuild();
181+
182+
assertFalse(estimator.isReadyNonBlocking(),
183+
"Rebuild memory monitor fallback should leave the estimator awaiting a later rebuild");
184+
assertEquals(0.0,
185+
estimator.cardinalitySingle(SketchBasedJoinEstimator.Component.P, predicate.stringValue()),
186+
"Rebuild memory fallback should not publish partial sketches");
187+
}
188+
189+
@Test
190+
void estimatorDoesNotCatchOutOfMemoryError() throws Exception {
191+
String source = Files.readString(Path.of(
192+
"src/main/java/org/eclipse/rdf4j/sail/base/SketchBasedJoinEstimator.java"));
193+
194+
assertFalse(source.contains("catch (OutOfMemoryError"),
195+
"Estimator should use proactive memory monitoring instead of catching OutOfMemoryError");
196+
}
197+
136198
@Test
137199
void idleMonitorFlushesPartialQueueAndResetsGrowth() throws Exception {
138200
withProperty("incrementalQueueInitialLimit", "4",
@@ -153,13 +215,17 @@ void idleMonitorFlushesPartialQueueAndResetsGrowth() throws Exception {
153215
}
154216

155217
private static SketchBasedJoinEstimator newEstimator() {
156-
return new SketchBasedJoinEstimator(new StubSailStore(), SketchBasedJoinEstimator.Config.defaults()
218+
return newEstimator(SketchBasedJoinEstimator.Config.defaults()
157219
.withNominalEntries(64)
158220
.withThrottleEveryN(1)
159221
.withThrottleMillis(0)
160222
.withRefreshSleepMillis(5));
161223
}
162224

225+
private static SketchBasedJoinEstimator newEstimator(SketchBasedJoinEstimator.Config config) {
226+
return new SketchBasedJoinEstimator(new StubSailStore(), config);
227+
}
228+
163229
private static Statement statement(String subject, IRI predicate, String object) {
164230
return VF.createStatement(VF.createIRI(subject), predicate, VF.createIRI(object));
165231
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*******************************************************************************
2+
* Copyright (c) 2026 Eclipse RDF4J contributors.
3+
*
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Distribution License v1.0
6+
* which accompanies this distribution, and is available at
7+
* http://www.eclipse.org/org/documents/edl-v10.php.
8+
*
9+
* SPDX-License-Identifier: BSD-3-Clause
10+
*******************************************************************************/
11+
// Some portions generated by Codex
12+
13+
package org.eclipse.rdf4j.sail.base;
14+
15+
import static org.junit.jupiter.api.Assertions.assertEquals;
16+
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
17+
import static org.junit.jupiter.api.Assertions.assertNotNull;
18+
import static org.junit.jupiter.api.Assertions.assertThrows;
19+
import static org.junit.jupiter.api.Assertions.assertTrue;
20+
21+
import java.lang.reflect.Field;
22+
import java.util.EnumMap;
23+
24+
import org.eclipse.rdf4j.model.IRI;
25+
import org.eclipse.rdf4j.model.Statement;
26+
import org.eclipse.rdf4j.model.ValueFactory;
27+
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
28+
import org.junit.jupiter.api.Test;
29+
30+
class SketchBasedJoinEstimatorMemoryLayoutTest {
31+
32+
private static final ValueFactory VF = SimpleValueFactory.getInstance();
33+
34+
@Test
35+
void stateAllocatesDeleteOnlySketchContainers() throws Exception {
36+
SketchBasedJoinEstimator estimator = new SketchBasedJoinEstimator(new StubSailStore(), smallConfig());
37+
Object state = privateFieldValue(estimator, "bufA");
38+
39+
assertNotNull(privateFieldValue(state, "delSingleTriples"));
40+
assertNotNull(privateFieldValue(state, "delSingles"));
41+
assertNotNull(privateFieldValue(state, "delPairs"));
42+
}
43+
44+
@Test
45+
void deleteUpdatesUseTombstoneSketches() {
46+
SketchBasedJoinEstimator estimator = new SketchBasedJoinEstimator(new StubSailStore(), smallConfig());
47+
IRI predicate = VF.createIRI("urn:memory-layout:p");
48+
Statement statement = VF.createStatement(VF.createIRI("urn:memory-layout:s"), predicate,
49+
VF.createIRI("urn:memory-layout:o"));
50+
51+
estimator.addStatement(statement);
52+
estimator.debugFlushPendingIncremental();
53+
54+
assertTrue(estimator.cardinalitySingle(SketchBasedJoinEstimator.Component.P, predicate.stringValue()) > 0.0);
55+
56+
estimator.deleteStatement(statement);
57+
estimator.debugFlushPendingIncremental();
58+
59+
assertEquals(0.0, estimator.cardinalitySingle(SketchBasedJoinEstimator.Component.P, predicate.stringValue()),
60+
0.0001);
61+
assertTrue(
62+
estimator.debugResidentSketches()
63+
.stream()
64+
.anyMatch(SketchBasedJoinEstimator.DebugSketchAddress::isDelete),
65+
"Delete updates should create resident tombstone sketches");
66+
}
67+
68+
@Test
69+
void pairSketchesUseSingleLevelPackedSparseArrays() throws Exception {
70+
SketchBasedJoinEstimator estimator = new SketchBasedJoinEstimator(new StubSailStore(), smallConfig());
71+
Statement statement = VF.createStatement(VF.createIRI("urn:memory-layout:s"),
72+
VF.createIRI("urn:memory-layout:p"), VF.createIRI("urn:memory-layout:o"),
73+
VF.createIRI("urn:memory-layout:c"));
74+
75+
estimator.addStatement(statement);
76+
estimator.debugFlushPendingIncremental();
77+
78+
Object state = privateFieldValue(estimator, "bufA");
79+
EnumMap<?, ?> pairs = assertInstanceOf(EnumMap.class, privateFieldValue(state, "pairs"));
80+
Object spBuild = pairs.get(SketchBasedJoinEstimator.Pair.SP);
81+
82+
assertThrows(NoSuchFieldException.class, () -> spBuild.getClass().getDeclaredField("rows"),
83+
"Pair sketches should not use row maps or a two-level sparse structure");
84+
assertSingleLevelSparseArray(privateFieldValue(spBuild, "triples"), 3);
85+
assertSingleLevelSparseArray(privateFieldValue(spBuild, "comp1"), 3);
86+
assertSingleLevelSparseArray(privateFieldValue(spBuild, "comp2"), 3);
87+
}
88+
89+
private static SketchBasedJoinEstimator.Config smallConfig() {
90+
return SketchBasedJoinEstimator.Config.defaults()
91+
.withNominalEntries(64)
92+
.withSubjectBucketCount(8)
93+
.withPredicateBucketCount(8)
94+
.withObjectBucketCount(8)
95+
.withContextBucketCount(4)
96+
.withThrottleEveryN(1)
97+
.withThrottleMillis(0);
98+
}
99+
100+
private static Object privateFieldValue(Object target, String fieldName) throws Exception {
101+
Field field = target.getClass().getDeclaredField(fieldName);
102+
field.setAccessible(true);
103+
return field.get(target);
104+
}
105+
106+
private static void assertSingleLevelSparseArray(Object sparseArray, int expectedYBits) throws Exception {
107+
assertNotNull(sparseArray);
108+
assertInstanceOf(long[].class, privateFieldValue(sparseArray, "keys"));
109+
assertTrue(((long[]) privateFieldValue(sparseArray, "keys")).length > 0);
110+
assertInstanceOf(Object[].class, privateFieldValue(sparseArray, "values"));
111+
assertEquals(expectedYBits, (int) privateFieldValue(sparseArray, "yBits"));
112+
}
113+
}

0 commit comments

Comments
 (0)