Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.hugegraph.backend.id.SnowflakeIdGenerator;
import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.AbstractSerializer;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.serializer.SerializerFactory;
import org.apache.hugegraph.backend.store.BackendFeatures;
import org.apache.hugegraph.backend.store.BackendProviderFactory;
Expand Down Expand Up @@ -233,6 +234,8 @@ public StandardHugeGraph(HugeConfig config) {

LockUtil.init(this.spaceGraphName());

BytesBuffer.initMaxBufferCapacity(
config.get(CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY));
MemoryManager.setMemoryMode(
MemoryManager.MemoryMode.fromValue(config.get(CoreOptions.MEMORY_MODE)));
MemoryManager.setMaxMemoryCapacityInBytes(config.get(CoreOptions.MAX_MEMORY_CAPACITY));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,15 @@ public final class BytesBuffer extends OutputStream {

public static final int DEFAULT_CAPACITY = 64;
public static final int MAX_BUFFER_CAPACITY = 128 * 1024 * 1024; // 128M
public static final int MAX_BUFFER_CAPACITY_UPPER_BOUND = (int) Bytes.GB;

public static final int BUF_EDGE_ID = 128;
public static final int BUF_PROPERTY = 64;

public static final byte[] BYTES_EMPTY = new byte[0];

private static volatile Integer maxBufferCapacity;

private ByteBuffer buffer;
private final boolean resize;

Expand All @@ -91,10 +94,11 @@ public BytesBuffer() {
}

public BytesBuffer(int capacity) {
if (capacity > MAX_BUFFER_CAPACITY) {
int maxCapacity = maxBufferCapacity();
if (capacity > maxCapacity) {
E.checkArgument(false,
"Capacity %s exceeds max buffer capacity: %s",
capacity, MAX_BUFFER_CAPACITY);
capacity, maxCapacity);
}
this.buffer = ByteBuffer.allocate(capacity);
this.resize = true;
Expand Down Expand Up @@ -122,6 +126,30 @@ public static BytesBuffer wrap(byte[] array, int offset, int length) {
return new BytesBuffer(ByteBuffer.wrap(array, offset, length));
}

public static int maxBufferCapacity() {
Integer capacity = maxBufferCapacity;
return capacity != null ? capacity : MAX_BUFFER_CAPACITY;
}

public static synchronized void initMaxBufferCapacity(int capacity) {
E.checkArgument(capacity >= DEFAULT_CAPACITY &&
capacity <= MAX_BUFFER_CAPACITY_UPPER_BOUND,
"Max buffer capacity must be in range [%s, %s], " +
"but got %s",
DEFAULT_CAPACITY, MAX_BUFFER_CAPACITY_UPPER_BOUND,
capacity);

if (maxBufferCapacity == null) {
maxBufferCapacity = capacity;
return;
}

E.checkArgument(maxBufferCapacity == capacity,
"The process-wide serializer buffer max capacity has " +
"been initialized to %s, but got conflicting value %s",
maxBufferCapacity, capacity);
}

public ByteBuffer asByteBuffer() {
return this.buffer;
}
Expand Down Expand Up @@ -173,14 +201,18 @@ private void require(int size) {
E.checkState(false, "Can't resize for wrapped buffer");
}

// Extra capacity as buffer
int newCapacity = size + this.buffer.limit() + DEFAULT_CAPACITY;
if (newCapacity > MAX_BUFFER_CAPACITY) {
int maxCapacity = maxBufferCapacity();
long requiredCapacity = (long) this.buffer.position() + size;
if (requiredCapacity > maxCapacity) {
E.checkArgument(false,
"Capacity %s exceeds max buffer capacity: %s",
newCapacity, MAX_BUFFER_CAPACITY);
requiredCapacity, maxCapacity);
}
ByteBuffer newBuffer = ByteBuffer.allocate(newCapacity);

// Extra capacity as buffer
long newCapacity = Math.min(requiredCapacity + DEFAULT_CAPACITY,
maxCapacity);
ByteBuffer newBuffer = ByteBuffer.allocate((int) newCapacity);
this.buffer.flip();
newBuffer.put(this.buffer);
this.buffer = newBuffer;
Expand Down Expand Up @@ -318,7 +350,6 @@ public byte[] readBytes() {

public BytesBuffer writeBigBytes(byte[] bytes) {
if (bytes.length > BLOB_LEN_MAX) {
// TODO: note the max blob size should be 128MB (due to MAX_BUFFER_CAPACITY)
E.checkArgument(false,
"The max length of bytes is %s, but got %s",
BLOB_LEN_MAX, bytes.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hugegraph.config;

import org.apache.hugegraph.backend.query.Query;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.backend.tx.GraphTransaction;
import org.apache.hugegraph.type.define.CollectionType;
import org.apache.hugegraph.util.Bytes;
Expand Down Expand Up @@ -79,6 +80,15 @@ public class CoreOptions extends OptionHolder {
disallowEmpty(),
"text"
);
public static final ConfigOption<Integer> SERIALIZER_BUFFER_MAX_CAPACITY =
new ConfigOption<>(
"serializer.buffer_max_capacity",
"The process-wide max capacity of one serialization " +
"buffer in bytes.",
rangeInt(BytesBuffer.DEFAULT_CAPACITY,
BytesBuffer.MAX_BUFFER_CAPACITY_UPPER_BOUND),
BytesBuffer.MAX_BUFFER_CAPACITY
);
public static final ConfigOption<Boolean> RAFT_MODE =
new ConfigOption<>(
"raft.mode",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public static BytesBuffer decompress(byte[] bytes, int blockSize, float bufferRa
LZ4FastDecompressor decompressor = factory.fastDecompressor();
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
int initBufferSize = Math.min(Math.round(bytes.length * ratio),
BytesBuffer.MAX_BUFFER_CAPACITY);
BytesBuffer.maxBufferCapacity());
BytesBuffer buf = new BytesBuffer(initBufferSize);
LZ4BlockInputStream lzInput = new LZ4BlockInputStream(bais, decompressor);
int count;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ edge.cache_type=l2
# version before 1.7.0 of apache hugegraph for your application.
backend=hstore
serializer=binary
# The process-wide max capacity of one serialization buffer in bytes
#serializer.buffer_max_capacity=134217728

store=hugegraph

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ edge.cache_type=l2
# if you want to use Cassandra/MySql/PG... as backend, please use version < 1.7.0
backend=rocksdb
serializer=binary
# The process-wide max capacity of one serialization buffer in bytes
#serializer.buffer_max_capacity=134217728

store=hugegraph

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hugegraph.unit.serializer;

import java.awt.Point;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Iterator;
Expand All @@ -27,24 +28,47 @@
import java.util.TimeZone;
import java.util.UUID;

import org.apache.hugegraph.HugeFactory;
import org.apache.hugegraph.HugeGraph;
import org.apache.hugegraph.backend.id.Id;
import org.apache.hugegraph.backend.id.IdGenerator;
import org.apache.hugegraph.backend.id.IdGenerator.UuidId;
import org.apache.hugegraph.backend.serializer.BytesBuffer;
import org.apache.hugegraph.config.CoreOptions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.schema.PropertyKey;
import org.apache.hugegraph.testutil.Assert;
import org.apache.hugegraph.type.define.Cardinality;
import org.apache.hugegraph.type.define.DataType;
import org.apache.hugegraph.unit.BaseUnitTest;
import org.apache.hugegraph.unit.FakeObjects;
import org.apache.hugegraph.util.Blob;
import org.apache.hugegraph.util.LZ4Util;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

public class BytesBufferTest extends BaseUnitTest {

@Before
public void setUp() throws Exception {
resetMaxBufferCapacity();
}

@After
public void tearDown() throws Exception {
resetMaxBufferCapacity();
}

private static void resetMaxBufferCapacity() throws Exception {
Field field = BytesBuffer.class.getDeclaredField("maxBufferCapacity");
field.setAccessible(true);
field.set(null, null);
}

@Test
public void testAllocate() {
Assert.assertEquals(0, BytesBuffer.allocate(0).array().length);
Expand All @@ -69,6 +93,128 @@ public void testAllocate() {
buf0.bytes());
}

@Test
public void testAllocateWithMaxBufferCapacity() {
Assert.assertEquals(BytesBuffer.MAX_BUFFER_CAPACITY,
BytesBuffer.maxBufferCapacity());

BytesBuffer.initMaxBufferCapacity(128);
Assert.assertEquals(128, BytesBuffer.maxBufferCapacity());
BytesBuffer.initMaxBufferCapacity(128);
Assert.assertEquals(128, BytesBuffer.maxBufferCapacity());
Assert.assertEquals(128, BytesBuffer.allocate(128).array().length);

Assert.assertThrows(IllegalArgumentException.class, () -> {
BytesBuffer.allocate(129);
}, e -> {
Assert.assertContains("Capacity 129 exceeds max buffer " +
"capacity: 128",
e.getMessage());
});
}

@Test
public void testResizeWithMaxBufferCapacity() {
BytesBuffer.initMaxBufferCapacity(128);
BytesBuffer buffer = BytesBuffer.allocate(64);
buffer.write(new byte[64]);
buffer.write((byte) 1);
Assert.assertEquals(65, buffer.bytes().length);
Assert.assertEquals(128, buffer.array().length);

buffer.write(new byte[63]);

Assert.assertThrows(IllegalArgumentException.class, () -> {
buffer.write((byte) 1);
}, e -> {
Assert.assertContains("Capacity 129 exceeds max buffer " +
"capacity: 128",
e.getMessage());
});
}

@Test
public void testSetMaxBufferCapacityWithInvalidValue() {
Assert.assertThrows(IllegalArgumentException.class, () -> {
BytesBuffer.initMaxBufferCapacity(
BytesBuffer.DEFAULT_CAPACITY - 1);
}, e -> {
Assert.assertContains("Max buffer capacity must be in range",
e.getMessage());
});

Assert.assertThrows(IllegalArgumentException.class, () -> {
BytesBuffer.initMaxBufferCapacity(
BytesBuffer.MAX_BUFFER_CAPACITY_UPPER_BOUND + 1);
}, e -> {
Assert.assertContains("Max buffer capacity must be in range",
e.getMessage());
});
}

@Test
public void testLZ4DecompressWithMaxBufferCapacity() {
byte[] bytes = genBytes(256);
BytesBuffer compressed = LZ4Util.compress(bytes, 64);

BytesBuffer.initMaxBufferCapacity(128);

Assert.assertThrows(IllegalArgumentException.class, () -> {
LZ4Util.decompress(compressed.bytes(), 64);
}, e -> {
Assert.assertContains("exceeds max buffer capacity: 128",
e.getMessage());
});
}

@Test
public void testMaxBufferCapacityFromGraphConfig() throws Exception {
HugeConfig config = FakeObjects.newConfig();
config.setProperty(CoreOptions.STORE.name(), "buffer_capacity");
config.setProperty(CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY.name(),
256);

HugeGraph graph = HugeFactory.open(config);
try {
Assert.assertEquals(256, BytesBuffer.maxBufferCapacity());
} finally {
graph.close();
}
}

@Test
public void testMaxBufferCapacityRejectsConflictingGraphConfig()
throws Exception {
HugeConfig config1 = FakeObjects.newConfig();
config1.setProperty(CoreOptions.STORE.name(), "buffer_capacity_1");
config1.setProperty(CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY.name(),
256);

HugeGraph graph1 = HugeFactory.open(config1);
try {
Assert.assertEquals(256, BytesBuffer.maxBufferCapacity());

HugeConfig config2 = FakeObjects.newConfig();
config2.setProperty(CoreOptions.STORE.name(),
"buffer_capacity_2");
config2.setProperty(
CoreOptions.SERIALIZER_BUFFER_MAX_CAPACITY.name(), 512);

Assert.assertThrows(IllegalArgumentException.class, () -> {
HugeFactory.open(config2);
}, e -> {
Assert.assertContains("process-wide serializer buffer max " +
"capacity has been initialized to 256",
e.getMessage());
Assert.assertContains("conflicting value 512",
e.getMessage());
});
Assert.assertEquals(256, BytesBuffer.maxBufferCapacity());
} finally {
graph1.close();
}
}

@Test
public void testWrap() {
BytesBuffer buf4 = BytesBuffer.wrap(new byte[]{1, 2, 3, 4});
Expand Down
Loading