Skip to content

Commit ac371f9

Browse files
committed
improvements
1 parent 5cb275b commit ac371f9

11 files changed

Lines changed: 368 additions & 37 deletions

File tree

core/queryalgebra/evaluation/src/main/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/FilterSelectivityTelemetry.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ static void annotate(Filter filter, EvaluationStatistics statistics,
4646
String source = filter.getStringMetricPlanned(TelemetryMetricNames.FILTER_SELECTIVITY_SOURCE);
4747
if (!isValidPassRatio(passRatio)) {
4848
passRatio = estimate == null ? -1.0d : estimate.getPassRatio();
49-
source = estimate == null ? null : estimate.getSource().name().toLowerCase();
49+
source = sourceName(estimate);
5050
}
5151
if (!isValidPassRatio(passRatio)) {
5252
double cardinalityPassRatio = estimateCardinalityPassRatio(filter, statistics);
@@ -72,6 +72,13 @@ static void annotate(Filter filter, EvaluationStatistics statistics,
7272
}
7373
}
7474

75+
private static String sourceName(EvaluationStatistics.FilterPassEstimate estimate) {
76+
if (estimate == null || estimate.getSource() == EvaluationStatistics.FilterPassEstimate.Source.UNKNOWN) {
77+
return null;
78+
}
79+
return estimate.getSource().name().toLowerCase();
80+
}
81+
7582
private static double confidenceScore(EvaluationStatistics.FilterPassEstimate estimate, String source) {
7683
if (estimate != null && estimate.getEvidenceCount() >= 0L) {
7784
long evidenceCount = estimate.getEvidenceCount();

core/queryalgebra/evaluation/src/test/java/org/eclipse/rdf4j/query/algebra/evaluation/optimizer/FilterSelectivityTelemetryTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
package org.eclipse.rdf4j.query.algebra.evaluation.optimizer;
1313

1414
import static org.junit.jupiter.api.Assertions.assertEquals;
15+
import static org.junit.jupiter.api.Assertions.assertNull;
1516

1617
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
1718
import org.eclipse.rdf4j.query.algebra.Compare;
@@ -42,6 +43,8 @@ void skipsCardinalityFallbackForComplexFilterInput() {
4243
assertEquals(0, statistics.cardinalityCalls,
4344
"Telemetry should not recursively estimate full cardinality for complex filter inputs");
4445
assertEquals(-1.0d, filter.getDoubleMetricPlanned(TelemetryMetricNames.PLANNED_FILTER_PASS_RATIO));
46+
assertNull(filter.getStringMetricPlanned(TelemetryMetricNames.FILTER_SELECTIVITY_SOURCE),
47+
"Unsupported filter estimates should not render as an explicit unknown selectivity source");
4548
}
4649

4750
@Test

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SketchBasedJoinEstimator.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1983,8 +1983,14 @@ public synchronized long rebuild() {
19831983
tgt.clear(); // wipe everything (add + del)
19841984
rebuildEpoch.incrementAndGet(); // mark rebuild in progress (odd epoch)
19851985
long seen = 0L;
1986-
long l = System.currentTimeMillis();
1986+
long startMillis = System.currentTimeMillis();
1987+
long startNanos = System.nanoTime();
19871988
long lastLoggedRebuildMillion = -1L;
1989+
boolean stoppedEarly = false;
1990+
String targetBuffer = rebuildIntoA ? "bufA" : "bufB";
1991+
logger.info(
1992+
"RdfJoinEstimator: Rebuilding sketches started: targetBuffer={}, targetSlot={}, previousCurrentSlot={}, rebuildRequestVersion={}",
1993+
targetBuffer, targetSlot, previousCurrentSlot, requestVersion);
19881994
try {
19891995
try (
19901996
SailDataset ds = sailStore.getExplicitSailSource().dataset(IsolationLevels.READ_COMMITTED);
@@ -1994,6 +2000,7 @@ public synchronized long rebuild() {
19942000

19952001
while (it.hasNext()) {
19962002
if (Thread.currentThread().isInterrupted() || !running && Thread.currentThread() == refresher) {
2003+
stoppedEarly = true;
19972004
break;
19982005
}
19992006
Statement st = it.next();
@@ -2015,15 +2022,18 @@ public synchronized long rebuild() {
20152022
lastLoggedRebuildMillion = seenMillion;
20162023
logger.debug(
20172024
"RdfJoinEstimator: Rebuilding {}, seen {} million triples so far. Elapsed: {} s.",
2018-
rebuildIntoA ? "bufA" : "bufB",
2025+
targetBuffer,
20192026
seenMillion,
2020-
(System.currentTimeMillis() - l) / 1000);
2027+
(System.currentTimeMillis() - startMillis) / 1000);
20212028
}
20222029
}
20232030

20242031
}
20252032
} catch (OutOfMemoryError pressureFailure) {
2026-
2033+
logger.info(
2034+
"RdfJoinEstimator: Rebuilding sketches stopped under memory pressure: targetBuffer={}, targetSlot={}, scannedTriples={}, elapsedMillis={}",
2035+
targetBuffer, targetSlot, seen,
2036+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
20272037
return seen;
20282038
} catch (Throwable e) {
20292039
throw e;
@@ -2067,9 +2077,16 @@ public synchronized long rebuild() {
20672077
lastRebuildPublishMs = System.currentTimeMillis();
20682078
churnSampler.reset();
20692079

2080+
logger.info(
2081+
"RdfJoinEstimator: Rebuilding sketches finished: targetBuffer={}, targetSlot={}, previousCurrentSlot={}, scannedTriples={}, stoppedEarly={}, elapsedMillis={}",
2082+
targetBuffer, targetSlot, previousCurrentSlot, seen, stoppedEarly,
2083+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
20702084
return seen;
20712085
} catch (OutOfMemoryError pressureFailure) {
2072-
2086+
logger.info(
2087+
"RdfJoinEstimator: Rebuilding sketches stopped under memory pressure: targetBuffer={}, targetSlot={}, scannedTriples={}, elapsedMillis={}",
2088+
targetBuffer, targetSlot, seen,
2089+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
20732090
return seen;
20742091
} finally {
20752092
if ((rebuildEpoch.get() & 1L) != 0L) {

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SketchJoinOrderPlanner.java

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,6 @@ final class SketchJoinOrderPlanner {
125125
private final double[][] localFilterPassRatioByVarMask;
126126
private final boolean[][] localFilterPassRatioLoadedByVarMask;
127127
private final Set<Value>[][] finiteBindingVariableValuesByState;
128-
private final Map<Long, Long> variablesMaskMemo = new HashMap<>();
129128
private final Map<Long, Long> boundVariableMaskMemo = new HashMap<>();
130129
private final Map<Long, Set<String>> variableMaskSetMemo = new HashMap<>();
131130
private final Map<AccessShapeKey, SketchBasedJoinEstimator.AccessShape> accessShapeMemo = new HashMap<>();
@@ -3305,29 +3304,6 @@ private double deferredFilterConnectionWeight(JoinOrderPlanner.FilterConstraint
33053304
return deferredFilterOrderingWeight(filter);
33063305
}
33073306

3308-
private long variablesMask(long mask) {
3309-
int stateIndex = stateMemoIndex(mask);
3310-
if (stateIndex >= 0) {
3311-
long variables = variablesMaskByState[stateIndex];
3312-
if (variables == UNCOMPUTED_MASK) {
3313-
variables = computeVariablesMask(mask);
3314-
variablesMaskByState[stateIndex] = variables;
3315-
}
3316-
return variables;
3317-
}
3318-
return variablesMaskMemo.computeIfAbsent(mask, this::computeVariablesMask);
3319-
}
3320-
3321-
private long computeVariablesMask(long mask) {
3322-
long variables = 0L;
3323-
while (mask != 0L) {
3324-
int i = Long.numberOfTrailingZeros(mask);
3325-
mask &= mask - 1L;
3326-
variables |= joinVarMasks[i];
3327-
}
3328-
return variables;
3329-
}
3330-
33313307
private Set<String> boundVariables(long mask) {
33323308
return variableNames(boundVariableMask(mask));
33333309
}

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbFilterSelectivityStats.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,14 @@
6161
import org.eclipse.rdf4j.sail.lmdb.TxnManager.Txn;
6262
import org.eclipse.rdf4j.sail.lmdb.config.LmdbStoreConfig;
6363
import org.eclipse.rdf4j.sail.lmdb.model.LmdbValue;
64+
import org.slf4j.Logger;
65+
import org.slf4j.LoggerFactory;
6466

6567
class LmdbFilterSelectivityStats
6668
implements JoinStatsProvider, SketchBasedJoinEstimator.PatternFilterSamplingEstimator {
6769

70+
private static final Logger logger = LoggerFactory.getLogger(LmdbFilterSelectivityStats.class);
71+
6872
private static final SimpleValueFactory VF = SimpleValueFactory.getInstance();
6973
private static final String SIDECAR_SUFFIX = ".filters";
7074
private static final int PERSIST_VERSION = 3;
@@ -305,27 +309,53 @@ int runBackgroundSamplingCycle(long maxMillis) {
305309
if (!backgroundRawSamplingEnabled || maxMillis <= 0L) {
306310
return 0;
307311
}
312+
int pendingBefore = pendingBackgroundSamplingRequests();
313+
if (pendingBefore == 0) {
314+
return 0;
315+
}
316+
long startNanos = System.nanoTime();
317+
logger.info("LMDB background filter sampling cycle started: pendingRequests={}, maxMillis={}", pendingBefore,
318+
maxMillis);
308319
long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(maxMillis);
320+
int attemptedCount = 0;
309321
int sampledCount = 0;
310322
while (!samplingDeadlineExceeded(deadlineNanos)) {
311323
List<BackgroundSamplingRequest> requests = drainBackgroundSamplingRequests(1);
312324
if (requests.isEmpty()) {
313325
break;
314326
}
315-
SampledPassRatio sampled = sampleFilterPassRatio(requests.getFirst().toSamplingCandidate(), false,
316-
deadlineNanos);
327+
BackgroundSamplingRequest request = requests.getFirst();
328+
attemptedCount++;
329+
SampledPassRatio sampled = sampleFilterPassRatio(request.toSamplingCandidate(), false, deadlineNanos);
317330
if (!isUsableSampledPassRatio(sampled)) {
331+
logger.info(
332+
"LMDB background filter sampling skipped request: predicate={}, filterKey={}, votes={}, foregroundNeeded={}, expectedRuntimeRows={}, expectedBenefitRows={}",
333+
request.predicateIri(), request.filterKey(), request.voteCount(), request.foregroundNeeded(),
334+
request.expectedRuntimeRows(), request.expectedBenefitRows());
318335
continue;
319336
}
320337
synchronized (this) {
321-
sampledByFilter.put(requests.getFirst().key, sampled);
338+
sampledByFilter.put(request.key, sampled);
322339
dirty = true;
323340
}
341+
logger.info(
342+
"LMDB background filter sampling sampled request: predicate={}, filterKey={}, passRatio={}, sampleSize={}, votes={}, foregroundNeeded={}, expectedRuntimeRows={}, expectedBenefitRows={}",
343+
request.predicateIri(), request.filterKey(), sampled.passRatio, sampled.sampleSize,
344+
request.voteCount(), request.foregroundNeeded(), request.expectedRuntimeRows(),
345+
request.expectedBenefitRows());
324346
sampledCount++;
325347
}
348+
logger.info(
349+
"LMDB background filter sampling cycle finished: attemptedRequests={}, sampledRequests={}, remainingRequests={}, elapsedMillis={}",
350+
attemptedCount, sampledCount, pendingBackgroundSamplingRequests(),
351+
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
326352
return sampledCount;
327353
}
328354

355+
synchronized int pendingBackgroundSamplingRequests() {
356+
return backgroundSamplingRequests.size();
357+
}
358+
329359
synchronized void persistIfDirty() {
330360
if (!dirty) {
331361
return;
@@ -571,6 +601,10 @@ private synchronized void voteBackgroundSamplingRequest(SamplingCandidate candid
571601
if (request == null) {
572602
request = new BackgroundSamplingRequest(candidate);
573603
backgroundSamplingRequests.put(candidate.key, request);
604+
logger.info(
605+
"LMDB background filter sampling queued request: predicate={}, filterKey={}, expectedRuntimeRows={}, expectedBenefitRows={}",
606+
request.predicateIri(), request.filterKey(), request.expectedRuntimeRows(),
607+
request.expectedBenefitRows());
574608
}
575609
request.vote(candidate, foregroundNeeded, ++backgroundSamplingSequence);
576610
trimBackgroundSamplingRequests();
@@ -588,6 +622,10 @@ private void trimBackgroundSamplingRequests() {
588622
return;
589623
}
590624
backgroundSamplingRequests.remove(lowest.key);
625+
logger.info(
626+
"LMDB background filter sampling dropped queued request: predicate={}, filterKey={}, votes={}, expectedRuntimeRows={}, expectedBenefitRows={}",
627+
lowest.predicateIri(), lowest.filterKey(), lowest.voteCount(), lowest.expectedRuntimeRows(),
628+
lowest.expectedBenefitRows());
591629
}
592630
}
593631

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSailStore.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,14 @@ private void cancelAndDrainScheduledEstimatorPersist() {
506506

507507
private void startBackgroundFilterSampling() {
508508
if (filterSelectivityStats == null || backgroundRawSamplingMaxMillisPerCycle <= 0L) {
509+
logger.info(
510+
"LMDB background filter sampling not scheduled: filterStatsPresent={}, maxMillisPerCycle={}",
511+
filterSelectivityStats != null, backgroundRawSamplingMaxMillisPerCycle);
509512
return;
510513
}
514+
logger.info(
515+
"LMDB background filter sampling scheduled: initialDelayMillis={}, delayMillis={}, maxMillisPerCycle={}",
516+
estimatorPersistDelayMillis, estimatorPersistDelayMillis, backgroundRawSamplingMaxMillisPerCycle);
511517
backgroundSamplingFuture = estimatorPersistExec.scheduleWithFixedDelay(() -> {
512518
try {
513519
runBackgroundFilterSamplingCycle(backgroundRawSamplingMaxMillisPerCycle);
@@ -518,14 +524,36 @@ private void startBackgroundFilterSampling() {
518524
}
519525

520526
int runBackgroundFilterSamplingCycle(long maxMillis) {
521-
if (filterSelectivityStats == null || maxMillis <= 0L || storeTxnStarted.get()) {
527+
if (filterSelectivityStats == null || maxMillis <= 0L) {
528+
logger.info(
529+
"LMDB background filter sampling cycle skipped: filterStatsPresent={}, maxMillis={}",
530+
filterSelectivityStats != null, maxMillis);
531+
return 0;
532+
}
533+
int pendingRequests = filterSelectivityStats.pendingBackgroundSamplingRequests();
534+
if (storeTxnStarted.get()) {
535+
if (pendingRequests > 0) {
536+
logger.info(
537+
"LMDB background filter sampling cycle skipped: store transaction active, pendingRequests={}",
538+
pendingRequests);
539+
}
522540
return 0;
523541
}
524542
if (!sinkStoreAccessLock.tryLock()) {
543+
if (pendingRequests > 0) {
544+
logger.info(
545+
"LMDB background filter sampling cycle skipped: sink store access lock unavailable, pendingRequests={}",
546+
pendingRequests);
547+
}
525548
return 0;
526549
}
527550
try {
528551
if (storeTxnStarted.get()) {
552+
if (pendingRequests > 0) {
553+
logger.info(
554+
"LMDB background filter sampling cycle skipped: store transaction started after lock, pendingRequests={}",
555+
pendingRequests);
556+
}
529557
return 0;
530558
}
531559
int sampled = filterSelectivityStats.runBackgroundSamplingCycle(maxMillis);

core/sail/lmdb/src/main/java/org/eclipse/rdf4j/sail/lmdb/LmdbSketchJoinOptimizer.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2503,15 +2503,24 @@ private TupleExpr applyFilter(TupleExpr root, DeferredFilter deferredFilter, Str
25032503
}
25042504
filter.setStringMetricPlanned(TelemetryMetricNames.DEFERRED_FILTER_SCOPE, placement);
25052505
filter.setStringMetricPlanned(TelemetryMetricNames.OPTIMIZER_STRATEGY, "lmdb-sketch-filter-" + placement);
2506-
filter.setStringMetricPlanned(TelemetryMetricNames.FILTER_SELECTIVITY_SOURCE,
2507-
deferredFilter.passEstimate.getSource().name().toLowerCase());
2506+
String source = filterSelectivitySource(deferredFilter.passEstimate);
2507+
if (source != null) {
2508+
filter.setStringMetricPlanned(TelemetryMetricNames.FILTER_SELECTIVITY_SOURCE, source);
2509+
}
25082510
if (deferredFilter.passEstimate.getEvidenceCount() >= 0L) {
25092511
filter.setLongMetricPlanned(TelemetryMetricNames.PLANNED_FILTER_EVIDENCE_COUNT,
25102512
deferredFilter.passEstimate.getEvidenceCount());
25112513
}
25122514
return filter;
25132515
}
25142516

2517+
private String filterSelectivitySource(EvaluationStatistics.FilterPassEstimate estimate) {
2518+
if (estimate == null || estimate.getSource() == EvaluationStatistics.FilterPassEstimate.Source.UNKNOWN) {
2519+
return null;
2520+
}
2521+
return estimate.getSource().name().toLowerCase();
2522+
}
2523+
25152524
private TupleExpr relocateRootFilterToRightBindingPrefix(TupleExpr root, DeferredFilter deferredFilter) {
25162525
if (LmdbJoinPlanSupport.containsExists(deferredFilter.condition) || !(root instanceof Join join)) {
25172526
return null;

0 commit comments

Comments
 (0)