diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index f8f24ab626..a8073b50fe 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -48,6 +48,7 @@ import org.apache.hugegraph.backend.id.SnowflakeIdGenerator; import org.apache.hugegraph.backend.query.Query; import org.apache.hugegraph.backend.serializer.AbstractSerializer; +import org.apache.hugegraph.backend.serializer.BytesBuffer; import org.apache.hugegraph.backend.serializer.SerializerFactory; import org.apache.hugegraph.backend.store.BackendFeatures; import org.apache.hugegraph.backend.store.BackendProviderFactory; @@ -231,8 +232,10 @@ public StandardHugeGraph(HugeConfig config) { this.readMode = GraphReadMode.OLTP_ONLY; this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE); - LockUtil.init(this.spaceGraphName()); - + // Init process-wide static configs before lock, so that validation + // failures won't leave stale lock groups in LockManager. + BytesBuffer.initMaxBufferCapacity( + config.get(CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY)); MemoryManager.setMemoryMode( MemoryManager.MemoryMode.fromValue(config.get(CoreOptions.MEMORY_MODE))); MemoryManager.setMaxMemoryCapacityInBytes(config.get(CoreOptions.MAX_MEMORY_CAPACITY)); @@ -240,6 +243,8 @@ public StandardHugeGraph(HugeConfig config) { config.get(CoreOptions.ONE_QUERY_MAX_MEMORY_CAPACITY)); RoundUtil.setAlignment(config.get(CoreOptions.MEMORY_ALIGNMENT)); + LockUtil.init(this.spaceGraphName()); + try { this.storeProvider = this.loadStoreProvider(); } catch (Exception e) { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/serializer/BytesBuffer.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/serializer/BytesBuffer.java index fb52347677..05434ed3bb 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/serializer/BytesBuffer.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/serializer/BytesBuffer.java @@ -77,12 +77,15 @@ public final class BytesBuffer extends OutputStream { public static final int DEFAULT_CAPACITY = 64; public static final int MAX_BUFFER_CAPACITY = 128 * 1024 * 1024; // 128M + public static final int MAX_BUFFER_CAPACITY_UPPER_BOUND = (int) Bytes.GB; public static final int BUF_EDGE_ID = 128; public static final int BUF_PROPERTY = 64; public static final byte[] BYTES_EMPTY = new byte[0]; + private static volatile Integer maxBufferCapacity; + private ByteBuffer buffer; private final boolean resize; @@ -91,10 +94,11 @@ public BytesBuffer() { } public BytesBuffer(int capacity) { - if (capacity > MAX_BUFFER_CAPACITY) { + int maxCapacity = maxBufferCapacity(); + if (capacity > maxCapacity) { E.checkArgument(false, "Capacity %s exceeds max buffer capacity: %s", - capacity, MAX_BUFFER_CAPACITY); + capacity, maxCapacity); } this.buffer = ByteBuffer.allocate(capacity); this.resize = true; @@ -122,6 +126,30 @@ public static BytesBuffer wrap(byte[] array, int offset, int length) { return new BytesBuffer(ByteBuffer.wrap(array, offset, length)); } + public static int maxBufferCapacity() { + Integer capacity = maxBufferCapacity; + return capacity != null ? capacity : MAX_BUFFER_CAPACITY; + } + + public static synchronized void initMaxBufferCapacity(int capacity) { + E.checkArgument(capacity >= DEFAULT_CAPACITY && + capacity <= MAX_BUFFER_CAPACITY_UPPER_BOUND, + "Max buffer capacity must be in range [%s, %s], " + + "but got %s", + DEFAULT_CAPACITY, MAX_BUFFER_CAPACITY_UPPER_BOUND, + capacity); + + if (maxBufferCapacity == null) { + maxBufferCapacity = capacity; + return; + } + + E.checkArgument(maxBufferCapacity == capacity, + "The process-wide serializer buffer max capacity has " + + "been initialized to %s, but got conflicting value %s", + maxBufferCapacity, capacity); + } + public ByteBuffer asByteBuffer() { return this.buffer; } @@ -173,14 +201,18 @@ private void require(int size) { E.checkState(false, "Can't resize for wrapped buffer"); } - // Extra capacity as buffer - int newCapacity = size + this.buffer.limit() + DEFAULT_CAPACITY; - if (newCapacity > MAX_BUFFER_CAPACITY) { + int maxCapacity = maxBufferCapacity(); + long requiredCapacity = (long) this.buffer.position() + size; + if (requiredCapacity > maxCapacity) { E.checkArgument(false, "Capacity %s exceeds max buffer capacity: %s", - newCapacity, MAX_BUFFER_CAPACITY); + requiredCapacity, maxCapacity); } - ByteBuffer newBuffer = ByteBuffer.allocate(newCapacity); + + // Extra capacity as buffer + long newCapacity = Math.min(requiredCapacity + DEFAULT_CAPACITY, + maxCapacity); + ByteBuffer newBuffer = ByteBuffer.allocate((int) newCapacity); this.buffer.flip(); newBuffer.put(this.buffer); this.buffer = newBuffer; @@ -318,7 +350,6 @@ public byte[] readBytes() { public BytesBuffer writeBigBytes(byte[] bytes) { if (bytes.length > BLOB_LEN_MAX) { - // TODO: note the max blob size should be 128MB (due to MAX_BUFFER_CAPACITY) E.checkArgument(false, "The max length of bytes is %s, but got %s", BLOB_LEN_MAX, bytes.length); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java index ba4d4a1c0e..07109580a0 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java @@ -18,6 +18,7 @@ package org.apache.hugegraph.config; import org.apache.hugegraph.backend.query.Query; +import org.apache.hugegraph.backend.serializer.BytesBuffer; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.type.define.CollectionType; import org.apache.hugegraph.util.Bytes; @@ -79,6 +80,15 @@ public class CoreOptions extends OptionHolder { disallowEmpty(), "text" ); + public static final ConfigOption SERIALIZER_BUFFER_MAX_CAPACITY = + new ConfigOption<>( + "serializer.buffer_max_capacity", + "The process-wide max capacity of one serialization " + + "buffer in bytes.", + rangeInt(BytesBuffer.DEFAULT_CAPACITY, + BytesBuffer.MAX_BUFFER_CAPACITY_UPPER_BOUND), + BytesBuffer.MAX_BUFFER_CAPACITY + ); public static final ConfigOption RAFT_MODE = new ConfigOption<>( "raft.mode", diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/LZ4Util.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/LZ4Util.java index 041afee7b0..44de9c60f6 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/LZ4Util.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/util/LZ4Util.java @@ -64,7 +64,7 @@ public static BytesBuffer decompress(byte[] bytes, int blockSize, float bufferRa LZ4FastDecompressor decompressor = factory.fastDecompressor(); ByteArrayInputStream bais = new ByteArrayInputStream(bytes); int initBufferSize = Math.min(Math.round(bytes.length * ratio), - BytesBuffer.MAX_BUFFER_CAPACITY); + BytesBuffer.maxBufferCapacity()); BytesBuffer buf = new BytesBuffer(initBufferSize); LZ4BlockInputStream lzInput = new LZ4BlockInputStream(bais, decompressor); int count; diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template index d3834baf5c..f627b62029 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template @@ -24,6 +24,8 @@ edge.cache_type=l2 # version before 1.7.0 of apache hugegraph for your application. backend=hstore serializer=binary +# The process-wide max capacity of one serialization buffer in bytes +#serializer.buffer_max_capacity=134217728 store=hugegraph diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties index b77cacb2de..bc15c39b39 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties @@ -23,6 +23,8 @@ edge.cache_type=l2 # if you want to use Cassandra/MySql/PG... as backend, please use version < 1.7.0 backend=rocksdb serializer=binary +# The process-wide max capacity of one serialization buffer in bytes +#serializer.buffer_max_capacity=134217728 store=hugegraph diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/serializer/BytesBufferTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/serializer/BytesBufferTest.java index fb25a5b255..f292808556 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/serializer/BytesBufferTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/serializer/BytesBufferTest.java @@ -18,6 +18,7 @@ package org.apache.hugegraph.unit.serializer; import java.awt.Point; +import java.lang.reflect.Field; import java.util.Arrays; import java.util.Calendar; import java.util.Iterator; @@ -27,10 +28,14 @@ import java.util.TimeZone; import java.util.UUID; +import org.apache.hugegraph.HugeFactory; +import org.apache.hugegraph.HugeGraph; import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.id.IdGenerator; import org.apache.hugegraph.backend.id.IdGenerator.UuidId; import org.apache.hugegraph.backend.serializer.BytesBuffer; +import org.apache.hugegraph.config.CoreOptions; +import org.apache.hugegraph.config.HugeConfig; import org.apache.hugegraph.schema.PropertyKey; import org.apache.hugegraph.testutil.Assert; import org.apache.hugegraph.type.define.Cardinality; @@ -38,6 +43,9 @@ import org.apache.hugegraph.unit.BaseUnitTest; import org.apache.hugegraph.unit.FakeObjects; import org.apache.hugegraph.util.Blob; +import org.apache.hugegraph.util.LZ4Util; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import com.google.common.collect.ImmutableList; @@ -45,6 +53,22 @@ public class BytesBufferTest extends BaseUnitTest { + @Before + public void setUp() throws Exception { + resetMaxBufferCapacity(); + } + + @After + public void tearDown() throws Exception { + resetMaxBufferCapacity(); + } + + private static void resetMaxBufferCapacity() throws Exception { + Field field = BytesBuffer.class.getDeclaredField("maxBufferCapacity"); + field.setAccessible(true); + field.set(null, null); + } + @Test public void testAllocate() { Assert.assertEquals(0, BytesBuffer.allocate(0).array().length); @@ -69,6 +93,128 @@ public void testAllocate() { buf0.bytes()); } + @Test + public void testAllocateWithMaxBufferCapacity() { + Assert.assertEquals(BytesBuffer.MAX_BUFFER_CAPACITY, + BytesBuffer.maxBufferCapacity()); + + BytesBuffer.initMaxBufferCapacity(128); + Assert.assertEquals(128, BytesBuffer.maxBufferCapacity()); + BytesBuffer.initMaxBufferCapacity(128); + Assert.assertEquals(128, BytesBuffer.maxBufferCapacity()); + Assert.assertEquals(128, BytesBuffer.allocate(128).array().length); + + Assert.assertThrows(IllegalArgumentException.class, () -> { + BytesBuffer.allocate(129); + }, e -> { + Assert.assertContains("Capacity 129 exceeds max buffer " + + "capacity: 128", + e.getMessage()); + }); + } + + @Test + public void testResizeWithMaxBufferCapacity() { + BytesBuffer.initMaxBufferCapacity(128); + BytesBuffer buffer = BytesBuffer.allocate(64); + buffer.write(new byte[64]); + buffer.write((byte) 1); + Assert.assertEquals(65, buffer.bytes().length); + Assert.assertEquals(128, buffer.array().length); + + buffer.write(new byte[63]); + + Assert.assertThrows(IllegalArgumentException.class, () -> { + buffer.write((byte) 1); + }, e -> { + Assert.assertContains("Capacity 129 exceeds max buffer " + + "capacity: 128", + e.getMessage()); + }); + } + + @Test + public void testSetMaxBufferCapacityWithInvalidValue() { + Assert.assertThrows(IllegalArgumentException.class, () -> { + BytesBuffer.initMaxBufferCapacity( + BytesBuffer.DEFAULT_CAPACITY - 1); + }, e -> { + Assert.assertContains("Max buffer capacity must be in range", + e.getMessage()); + }); + + Assert.assertThrows(IllegalArgumentException.class, () -> { + BytesBuffer.initMaxBufferCapacity( + BytesBuffer.MAX_BUFFER_CAPACITY_UPPER_BOUND + 1); + }, e -> { + Assert.assertContains("Max buffer capacity must be in range", + e.getMessage()); + }); + } + + @Test + public void testLZ4DecompressWithMaxBufferCapacity() { + byte[] bytes = genBytes(256); + BytesBuffer compressed = LZ4Util.compress(bytes, 64); + + BytesBuffer.initMaxBufferCapacity(128); + + Assert.assertThrows(IllegalArgumentException.class, () -> { + LZ4Util.decompress(compressed.bytes(), 64); + }, e -> { + Assert.assertContains("exceeds max buffer capacity: 128", + e.getMessage()); + }); + } + + @Test + public void testMaxBufferCapacityFromGraphConfig() throws Exception { + HugeConfig config = FakeObjects.newConfig(); + config.setProperty(CoreOptions.STORE.name(), "buffer_capacity"); + config.setProperty(CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY.name(), + 256); + + HugeGraph graph = HugeFactory.open(config); + try { + Assert.assertEquals(256, BytesBuffer.maxBufferCapacity()); + } finally { + graph.close(); + } + } + + @Test + public void testMaxBufferCapacityRejectsConflictingGraphConfig() + throws Exception { + HugeConfig config1 = FakeObjects.newConfig(); + config1.setProperty(CoreOptions.STORE.name(), "buffer_capacity_1"); + config1.setProperty(CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY.name(), + 256); + + HugeGraph graph1 = HugeFactory.open(config1); + try { + Assert.assertEquals(256, BytesBuffer.maxBufferCapacity()); + + HugeConfig config2 = FakeObjects.newConfig(); + config2.setProperty(CoreOptions.STORE.name(), + "buffer_capacity_2"); + config2.setProperty( + CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY.name(), 512); + + Assert.assertThrows(IllegalArgumentException.class, () -> { + HugeFactory.open(config2); + }, e -> { + Assert.assertContains("process-wide serializer buffer max " + + "capacity has been initialized to 256", + e.getMessage()); + Assert.assertContains("conflicting value 512", + e.getMessage()); + }); + Assert.assertEquals(256, BytesBuffer.maxBufferCapacity()); + } finally { + graph1.close(); + } + } + @Test public void testWrap() { BytesBuffer buf4 = BytesBuffer.wrap(new byte[]{1, 2, 3, 4});