Skip to content

Commit fde0b71

Browse files
committed
fix: thread safety, crash safety, and code cleanup for S3 SAIL
- Catalog: volatile copy-on-write for thread-safe concurrent reads - S3SailStore: save catalog before values for crash-safe ordering; persist namespaces/values even when memTable is empty; delete old compaction files only after catalog is saved - QuadIndex: fix context=0 treated as wildcard in range scans - QuadStats: filter tombstones from stats computation - FileSystemObjectStore: atomic writes via temp-file-then-rename - L2DiskCache: volatile lastAccessNanos, synchronized eviction - Remove dead config fields (quadIndexes, blockSize, valueCacheSize, valueIdCacheSize) from S3StoreConfig and S3StoreSchema - Unify ALL_INDEXES with SortOrder.values() single source of truth - Fix inline FQNs and wildcard imports across main and test sources - Fix stale javadocs in CompactionPolicy, MemTable, S3SailDataset
1 parent eeabec9 commit fde0b71

32 files changed

Lines changed: 379 additions & 553 deletions

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3SailStore.java

Lines changed: 80 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import java.nio.file.Path;
1515
import java.util.ArrayList;
1616
import java.util.Comparator;
17+
import java.util.HashSet;
1718
import java.util.Iterator;
1819
import java.util.List;
1920
import java.util.Map;
@@ -55,6 +56,7 @@
5556
import org.eclipse.rdf4j.sail.s3.storage.ParquetFileBuilder;
5657
import org.eclipse.rdf4j.sail.s3.storage.ParquetQuadSource;
5758
import org.eclipse.rdf4j.sail.s3.storage.ParquetSchemas;
59+
import org.eclipse.rdf4j.sail.s3.storage.QuadEntry;
5860
import org.eclipse.rdf4j.sail.s3.storage.QuadIndex;
5961
import org.eclipse.rdf4j.sail.s3.storage.QuadStats;
6062
import org.eclipse.rdf4j.sail.s3.storage.RawEntrySource;
@@ -79,12 +81,19 @@
7981
*/
8082
class S3SailStore implements SailStore {
8183

82-
final Logger logger = LoggerFactory.getLogger(S3SailStore.class);
84+
private static final Logger logger = LoggerFactory.getLogger(S3SailStore.class);
8385

8486
private static final QuadIndex SPOC_INDEX = new QuadIndex("spoc");
85-
private static final QuadIndex OPSC_INDEX = new QuadIndex("opsc");
86-
private static final QuadIndex CSPO_INDEX = new QuadIndex("cspo");
87-
private static final List<QuadIndex> ALL_INDEXES = List.of(SPOC_INDEX, OPSC_INDEX, CSPO_INDEX);
87+
private static final List<QuadIndex> ALL_INDEXES;
88+
89+
static {
90+
ParquetSchemas.SortOrder[] orders = ParquetSchemas.SortOrder.values();
91+
List<QuadIndex> indexes = new ArrayList<>(orders.length);
92+
for (ParquetSchemas.SortOrder order : orders) {
93+
indexes.add(new QuadIndex(order.suffix()));
94+
}
95+
ALL_INDEXES = List.copyOf(indexes);
96+
}
8897

8998
private static final int DEFAULT_ROW_GROUP_SIZE = 8 * 1024 * 1024; // 8 MiB
9099
private static final int DEFAULT_PAGE_SIZE = 64 * 1024; // 64 KiB
@@ -218,18 +227,24 @@ private void flushToObjectStore() {
218227
return;
219228
}
220229

230+
// Always persist namespaces and values (they may have changed without any quad writes)
231+
valueStore.serialize(objectStore);
232+
namespaceStore.serialize(objectStore, jsonMapper);
233+
234+
if (memTable.size() == 0) {
235+
return; // no quads to flush — avoid wasting epoch numbers and S3 writes
236+
}
237+
221238
long epoch = epochCounter.getAndIncrement();
222239

223-
if (memTable.size() > 0) {
224-
// Freeze active MemTable and swap in fresh one
225-
MemTable frozen = memTable;
226-
frozen.freeze();
227-
memTable = new MemTable(SPOC_INDEX);
240+
// Freeze active MemTable and swap in fresh one
241+
MemTable frozen = memTable;
242+
frozen.freeze();
243+
memTable = new MemTable(SPOC_INDEX);
228244

229-
List<long[]> allQuads = collectQuads(frozen);
230-
QuadStats stats = QuadStats.fromQuads(allQuads);
231-
writeParquetFiles(epoch, allQuads, stats);
232-
}
245+
List<long[]> allQuads = collectQuads(frozen);
246+
QuadStats stats = QuadStats.fromQuads(allQuads);
247+
writeParquetFiles(epoch, allQuads, stats);
233248

234249
persistMetadata(epoch);
235250
runCompactionIfNeeded();
@@ -254,7 +269,7 @@ private static List<long[]> collectQuads(MemTable frozen) {
254269
private void writeParquetFiles(long epoch, List<long[]> allQuads, QuadStats stats) {
255270
for (QuadIndex sortIndex : ALL_INDEXES) {
256271
String sortSuffix = sortIndex.getFieldSeqString();
257-
List<ParquetFileBuilder.QuadEntry> sorted = sortQuadEntries(allQuads, sortIndex);
272+
List<QuadEntry> sorted = sortQuadEntries(allQuads, sortIndex);
258273

259274
ParquetSchemas.SortOrder sortOrder = ParquetSchemas.SortOrder.fromSuffix(sortSuffix);
260275
byte[] parquetData = ParquetFileBuilder.build(sorted, ParquetSchemas.QUAD_SCHEMA,
@@ -273,17 +288,20 @@ private void writeParquetFiles(long epoch, List<long[]> allQuads, QuadStats stat
273288
}
274289

275290
private void persistMetadata(long epoch) {
276-
valueStore.serialize(objectStore);
277-
namespaceStore.serialize(objectStore, jsonMapper);
291+
// Save catalog first: if we crash after catalog but before values,
292+
// on restart we have new nextValueId but old values — IDs are gaps (safe).
293+
// The reverse (values first, catalog second) risks ID reuse on crash (corruption).
278294
catalog.setNextValueId(valueStore.getNextId());
279295
catalog.setEpoch(epoch);
280296
catalog.save(objectStore, jsonMapper, epoch);
297+
valueStore.serialize(objectStore);
298+
namespaceStore.serialize(objectStore, jsonMapper);
281299
}
282300

283301
/**
284302
* Sorts quad entries according to the given sort index.
285303
*/
286-
private static List<ParquetFileBuilder.QuadEntry> sortQuadEntries(List<long[]> quads, QuadIndex sortIndex) {
304+
private static List<QuadEntry> sortQuadEntries(List<long[]> quads, QuadIndex sortIndex) {
287305
List<long[]> sorted = new ArrayList<>(quads);
288306
String seq = sortIndex.getFieldSeqString();
289307
sorted.sort((a, b) -> {
@@ -297,9 +315,9 @@ private static List<ParquetFileBuilder.QuadEntry> sortQuadEntries(List<long[]> q
297315
return 0;
298316
});
299317

300-
List<ParquetFileBuilder.QuadEntry> result = new ArrayList<>(sorted.size());
318+
List<QuadEntry> result = new ArrayList<>(sorted.size());
301319
for (long[] q : sorted) {
302-
result.add(new ParquetFileBuilder.QuadEntry(q[0], q[1], q[2], q[3], (byte) q[4]));
320+
result.add(new QuadEntry(q[0], q[1], q[2], q[3], (byte) q[4]));
303321
}
304322
return result;
305323
}
@@ -312,27 +330,40 @@ private void runCompactionIfNeeded() {
312330
return;
313331
}
314332

333+
List<Compactor.CompactionResult> results = new ArrayList<>();
315334
List<Catalog.ParquetFileInfo> files = catalog.getFiles();
316335

317336
// L0→L1 compaction
318337
if (compactionPolicy.shouldCompact(files, 0)) {
319338
List<Catalog.ParquetFileInfo> l0Files = CompactionPolicy.filesAtLevel(files, 0);
320339
long compactEpoch = epochCounter.getAndIncrement();
321-
compactor.compact(l0Files, 0, 1, compactEpoch, catalog);
340+
results.add(compactor.compact(l0Files, 0, 1, compactEpoch, catalog));
322341
files = catalog.getFiles();
323342
}
324343

325344
// L1→L2 compaction
326345
if (compactionPolicy.shouldCompact(files, 1)) {
327346
List<Catalog.ParquetFileInfo> l1Files = CompactionPolicy.filesAtLevel(files, 1);
328347
long compactEpoch = epochCounter.getAndIncrement();
329-
compactor.compact(l1Files, 1, 2, compactEpoch, catalog);
348+
results.add(compactor.compact(l1Files, 1, 2, compactEpoch, catalog));
330349
}
331350

332-
// Save catalog after compaction
333-
long epoch = epochCounter.getAndIncrement();
334-
catalog.setEpoch(epoch);
335-
catalog.save(objectStore, jsonMapper, epoch);
351+
if (!results.isEmpty()) {
352+
// Save catalog BEFORE deleting old files — crash-safe ordering
353+
long epoch = epochCounter.getAndIncrement();
354+
catalog.setEpoch(epoch);
355+
catalog.save(objectStore, jsonMapper, epoch);
356+
357+
// Now safe to delete old files
358+
for (Compactor.CompactionResult result : results) {
359+
for (String key : result.getDeletedKeys()) {
360+
objectStore.delete(key);
361+
if (cache != null) {
362+
cache.invalidate(key);
363+
}
364+
}
365+
}
366+
}
336367
}
337368

338369
private boolean hasPersistence() {
@@ -575,24 +606,9 @@ public void approve(Resource subj, IRI pred, Value obj, Resource ctx) throws Sai
575606
public void approveAll(Set<Statement> approved, Set<Resource> approvedContexts) {
576607
sinkStoreAccessLock.lock();
577608
try {
578-
for (Statement statement : approved) {
579-
Resource subj = statement.getSubject();
580-
IRI pred = statement.getPredicate();
581-
Value obj = statement.getObject();
582-
Resource context = statement.getContext();
583-
584-
long s = valueStore.storeValue(subj);
585-
long p = valueStore.storeValue(pred);
586-
long o = valueStore.storeValue(obj);
587-
long c = context == null ? 0 : valueStore.storeValue(context);
588-
589-
if (!explicit) {
590-
mayHaveInferred = true;
591-
}
592-
593-
memTable.put(s, p, o, c, explicit);
609+
for (Statement st : approved) {
610+
storeQuad(st.getSubject(), st.getPredicate(), st.getObject(), explicit, st.getContext());
594611
}
595-
596612
// Size-triggered flush
597613
if (objectStore != null && memTable.approximateSizeInBytes() >= memTableFlushSize) {
598614
flushToObjectStore();
@@ -621,21 +637,23 @@ public boolean supportsDeprecateByQuery() {
621637
private void addStatement(Resource subj, IRI pred, Value obj, boolean explicit, Resource context) {
622638
sinkStoreAccessLock.lock();
623639
try {
624-
long s = valueStore.storeValue(subj);
625-
long p = valueStore.storeValue(pred);
626-
long o = valueStore.storeValue(obj);
627-
long c = context == null ? 0 : valueStore.storeValue(context);
628-
629-
if (!explicit) {
630-
mayHaveInferred = true;
631-
}
632-
633-
memTable.put(s, p, o, c, explicit);
640+
storeQuad(subj, pred, obj, explicit, context);
634641
} finally {
635642
sinkStoreAccessLock.unlock();
636643
}
637644
}
638645

646+
private void storeQuad(Resource subj, IRI pred, Value obj, boolean explicit, Resource context) {
647+
long s = valueStore.storeValue(subj);
648+
long p = valueStore.storeValue(pred);
649+
long o = valueStore.storeValue(obj);
650+
long c = context == null ? 0 : valueStore.storeValue(context);
651+
if (!explicit) {
652+
mayHaveInferred = true;
653+
}
654+
memTable.put(s, p, o, c, explicit);
655+
}
656+
639657
private long removeStatements(Resource subj, IRI pred, Value obj, boolean explicit, Resource... contexts) {
640658
Objects.requireNonNull(contexts,
641659
"contexts argument may not be null; either the value should be cast to Resource or an empty array should be supplied");
@@ -735,7 +753,7 @@ protected Resource convert(long[] quad) {
735753
return val instanceof Resource ? (Resource) val : null;
736754
}
737755
}) {
738-
private final java.util.Set<Resource> seen = new java.util.HashSet<>();
756+
private final Set<Resource> seen = new HashSet<>();
739757

740758
@Override
741759
protected boolean accept(Resource ctx) {
@@ -755,10 +773,13 @@ public CloseableIteration<? extends Statement> getStatements(Resource subj, IRI
755773
return createStatementIterator(subj, pred, obj, explicit, contexts);
756774
}
757775

776+
/**
777+
* @throws UnsupportedOperationException always — ordered iteration is not supported
778+
*/
758779
@Override
759780
public CloseableIteration<? extends Statement> getStatements(StatementOrder statementOrder, Resource subj,
760781
IRI pred, Value obj, Resource... contexts) throws SailException {
761-
throw new UnsupportedOperationException("Not implemented yet");
782+
throw new UnsupportedOperationException("Ordered iteration is not supported by S3Store");
762783
}
763784

764785
@Override
@@ -796,6 +817,11 @@ public Statement next() {
796817
Resource subj = (Resource) valueStore.getValue(quad[0]);
797818
IRI pred = (IRI) valueStore.getValue(quad[1]);
798819
Value obj = valueStore.getValue(quad[2]);
820+
if (subj == null || pred == null || obj == null) {
821+
// Value ID exists in Parquet but not in value store — can happen after
822+
// crash recovery when catalog was saved but value file was not.
823+
return null;
824+
}
799825
Resource ctx = quad[3] == 0 ? null : (Resource) valueStore.getValue(quad[3]);
800826
return valueStore.createStatement(subj, pred, obj, ctx);
801827
}

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3Store.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
1919
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
2020
import org.eclipse.rdf4j.model.ValueFactory;
21+
import org.eclipse.rdf4j.model.impl.LinkedHashModel;
2122
import org.eclipse.rdf4j.query.algebra.evaluation.EvaluationStrategyFactory;
2223
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolver;
2324
import org.eclipse.rdf4j.query.algebra.evaluation.federation.FederatedServiceResolverClient;
@@ -135,7 +136,7 @@ protected void initializeInternal() throws SailException {
135136

136137
try {
137138
backingStore = new S3SailStore(config);
138-
this.store = new SnapshotSailStore(backingStore, () -> new org.eclipse.rdf4j.model.impl.LinkedHashModel()) {
139+
this.store = new SnapshotSailStore(backingStore, LinkedHashModel::new) {
139140

140141
@Override
141142
public SailSource getExplicitSailSource() {

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/S3ValueStore.java

Lines changed: 22 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -106,60 +106,26 @@ void serialize(ObjectStore objectStore) {
106106
try {
107107
ByteArrayOutputStream baos = new ByteArrayOutputStream();
108108
DataOutputStream out = new DataOutputStream(baos);
109+
ByteBuffer buf = ByteBuffer.allocate(9); // reusable scratch for varints
109110

110-
int count = idToValue.size();
111-
// Write count as varint
112-
ByteBuffer countBuf = ByteBuffer.allocate(9);
113-
Varint.writeUnsigned(countBuf, count);
114-
out.write(countBuf.array(), 0, countBuf.position());
111+
writeVarint(out, buf, idToValue.size());
115112

116113
for (Map.Entry<Long, Value> entry : idToValue.entrySet()) {
117-
long id = entry.getKey();
114+
writeVarint(out, buf, entry.getKey());
118115
Value val = entry.getValue();
119116

120-
// Write id as varint
121-
ByteBuffer idBuf = ByteBuffer.allocate(9);
122-
Varint.writeUnsigned(idBuf, id);
123-
out.write(idBuf.array(), 0, idBuf.position());
124-
125117
if (val instanceof IRI) {
126-
out.writeByte(0); // type = IRI
127-
byte[] payload = val.stringValue().getBytes(StandardCharsets.UTF_8);
128-
ByteBuffer lenBuf = ByteBuffer.allocate(9);
129-
Varint.writeUnsigned(lenBuf, payload.length);
130-
out.write(lenBuf.array(), 0, lenBuf.position());
131-
out.write(payload);
118+
out.writeByte(0);
119+
writeBytes(out, buf, val.stringValue().getBytes(StandardCharsets.UTF_8));
132120
} else if (val instanceof Literal) {
133-
out.writeByte(1); // type = Literal
121+
out.writeByte(1);
134122
Literal lit = (Literal) val;
135-
byte[] label = lit.getLabel().getBytes(StandardCharsets.UTF_8);
136-
byte[] dt = lit.getDatatype().stringValue().getBytes(StandardCharsets.UTF_8);
137-
String langStr = lit.getLanguage().orElse("");
138-
byte[] lang = langStr.getBytes(StandardCharsets.UTF_8);
139-
140-
ByteBuffer buf = ByteBuffer.allocate(9);
141-
142-
buf.clear();
143-
Varint.writeUnsigned(buf, label.length);
144-
out.write(buf.array(), 0, buf.position());
145-
out.write(label);
146-
147-
buf.clear();
148-
Varint.writeUnsigned(buf, dt.length);
149-
out.write(buf.array(), 0, buf.position());
150-
out.write(dt);
151-
152-
buf.clear();
153-
Varint.writeUnsigned(buf, lang.length);
154-
out.write(buf.array(), 0, buf.position());
155-
out.write(lang);
123+
writeBytes(out, buf, lit.getLabel().getBytes(StandardCharsets.UTF_8));
124+
writeBytes(out, buf, lit.getDatatype().stringValue().getBytes(StandardCharsets.UTF_8));
125+
writeBytes(out, buf, lit.getLanguage().orElse("").getBytes(StandardCharsets.UTF_8));
156126
} else if (val instanceof BNode) {
157-
out.writeByte(2); // type = BNode
158-
byte[] payload = ((BNode) val).getID().getBytes(StandardCharsets.UTF_8);
159-
ByteBuffer lenBuf = ByteBuffer.allocate(9);
160-
Varint.writeUnsigned(lenBuf, payload.length);
161-
out.write(lenBuf.array(), 0, lenBuf.position());
162-
out.write(payload);
127+
out.writeByte(2);
128+
writeBytes(out, buf, ((BNode) val).getID().getBytes(StandardCharsets.UTF_8));
163129
} else {
164130
throw new IllegalStateException("Unsupported value type: " + val.getClass());
165131
}
@@ -249,4 +215,15 @@ void deserialize(ObjectStore objectStore, long nextValueId) {
249215
public void close() {
250216
clear();
251217
}
218+
219+
private static void writeVarint(DataOutputStream out, ByteBuffer buf, long value) throws IOException {
220+
buf.clear();
221+
Varint.writeUnsigned(buf, value);
222+
out.write(buf.array(), 0, buf.position());
223+
}
224+
225+
private static void writeBytes(DataOutputStream out, ByteBuffer buf, byte[] data) throws IOException {
226+
writeVarint(out, buf, data.length);
227+
out.write(data);
228+
}
252229
}

core/sail/s3/src/main/java/org/eclipse/rdf4j/sail/s3/cache/L2DiskCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public void remove(String s3Key) {
108108
}
109109
}
110110

111-
private void evictIfNeeded(long incomingSize) {
111+
private synchronized void evictIfNeeded(long incomingSize) {
112112
while (currentSizeBytes.get() + incomingSize > maxSizeBytes && !index.isEmpty()) {
113113
// Find LRU entry
114114
String lruKey = null;
@@ -196,7 +196,7 @@ static class CacheEntry {
196196
public long sizeBytes;
197197

198198
@JsonProperty("lastAccessNanos")
199-
public long lastAccessNanos;
199+
public volatile long lastAccessNanos;
200200

201201
public CacheEntry() {
202202
// for Jackson deserialization

0 commit comments

Comments (0)