Skip to content

Commit da9f9af

Browse files
committed
improvements to ingestion
1 parent ac371f9 commit da9f9af

9 files changed

Lines changed: 862 additions & 241 deletions

File tree

core/model/src/main/java/org/eclipse/rdf4j/model/impl/AbstractMemoryOverflowModel.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -243,9 +243,13 @@ public boolean add(Resource subj, IRI pred, Value obj, Resource... contexts) {
243243
return add;
244244
}
245245

246+
long count = 0;
247+
246248
@Override
247249
public boolean add(Statement st) {
248-
checkMemoryOverflow();
250+
if (count++ % BATCH_SIZE == 0) {
251+
checkMemoryOverflow();
252+
}
249253
boolean add = getDelegate().add(st);
250254
if (add && memory instanceof DynamicModel) {
251255
((DynamicModel) memory).maybeRegisterLargeStatementSetForReuse((DynamicModel) memory);

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/Changeset.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import org.eclipse.rdf4j.common.annotation.Experimental;
3333
import org.eclipse.rdf4j.common.annotation.InternalUseOnly;
34+
import org.eclipse.rdf4j.common.transaction.IsolationLevel;
3435
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
3536
import org.eclipse.rdf4j.model.IRI;
3637
import org.eclipse.rdf4j.model.Model;
@@ -122,6 +123,7 @@ public abstract class Changeset implements SailSink, ModelFactory {
122123
private volatile boolean statementCleared;
123124

124125
private boolean closed;
126+
private IsolationLevel sinkIsolationLevel = IsolationLevels.NONE;
125127

126128
public static boolean isOrderIndependent(Changeset changeset1, Changeset changeset2) {
127129
Objects.requireNonNull(changeset1, "changeset1");
@@ -539,6 +541,7 @@ protected void setChangeset(Changeset from) {
539541
assert !closed;
540542
assert !from.closed;
541543

544+
this.sinkIsolationLevel = from.sinkIsolationLevel;
542545
this.observed = from.observed;
543546
this.approved = from.approved;
544547
this.approvedEmpty = from.approvedEmpty;
@@ -552,6 +555,14 @@ protected void setChangeset(Changeset from) {
552555
this.statementCleared = from.statementCleared;
553556
}
554557

558+
IsolationLevel getSinkIsolationLevel() {
559+
return sinkIsolationLevel;
560+
}
561+
562+
void setSinkIsolationLevel(IsolationLevel sinkIsolationLevel) {
563+
this.sinkIsolationLevel = Objects.requireNonNull(sinkIsolationLevel);
564+
}
565+
555566
/**
556567
* Create a shallow clone of this Changeset. The shallow clone does not clone the underlying data structures, this
557568
* means that any changes made to the original will potentially be reflected in the clone and vice versa.

core/sail/base/src/main/java/org/eclipse/rdf4j/sail/base/SailSourceBranch.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ public Model createEmptyModel() {
269269
return modelFactory.createEmptyModel();
270270
}
271271
};
272+
changeset.setSinkIsolationLevel(level);
272273
try {
273274
semaphore.lock();
274275
pending.add(changeset);
@@ -315,7 +316,7 @@ public void prepare() throws SailException {
315316
semaphore.lock();
316317
if (!changes.isEmpty()) {
317318
if (prepared == null && serializable == null) {
318-
prepared = backingSource.sink(IsolationLevels.NONE);
319+
prepared = backingSource.sink(backingSinkIsolationLevel());
319320
} else if (prepared == null) {
320321
prepared = serializable;
321322
}
@@ -327,6 +328,16 @@ public void prepare() throws SailException {
327328
}
328329
}
329330

331+
private IsolationLevel backingSinkIsolationLevel() {
332+
for (Changeset change : changes) {
333+
IsolationLevel level = change.getSinkIsolationLevel();
334+
if (!IsolationLevels.NONE.isCompatibleWith(level)) {
335+
return level;
336+
}
337+
}
338+
return IsolationLevels.NONE;
339+
}
340+
330341
@Override
331342
public void flush() throws SailException {
332343
try {

0 commit comments

Comments
 (0)