refactor(server): optimize rockdb batch query perf#2982
Conversation
|
This CI failure is unrelated to the changes in this PR. The PR focuses on optimizing RocksDB batch query performance, and the failing build check does not involve the code modified here. |
|
Already rerun CI (also could check the tests could pass in local env) |
Temporarily use super.queryByIds() instead of getByIds() for batch version support.
|
@imbajin could you please take another look when you are convenient❤️ |
There was a problem hiding this comment.
Pull request overview
This PR introduces batched backend fetching for queryVerticesByIds() to reduce overhead when querying many vertex ids (e.g., g.V(id1, id2, ...)) by splitting backend id lookups into multiple IdQuery requests.
Changes:
- Collect backend-only vertex ids during
queryVerticesByIds()and issue backend queries in batches usingquery.batch_size. - Add per-batch capacity checks before executing backend queries.
- Minor formatting adjustment in RocksDB table iterator helper signature.
Reviewed changes
Copilot reviewed 1 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/backend/tx/GraphTransaction.java | Batch backend id queries in queryVerticesByIds() instead of building a single IdQuery for all backend ids. |
| hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java | Formatting-only change to method parameter indentation. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if (!backendIds.isEmpty()) { | ||
| // Query from backend store | ||
| query.mustSortByInput(false); | ||
| Iterator<HugeVertex> it = this.queryVerticesFromBackend(query); | ||
| QueryResults.fillMap(it, vertices); | ||
| final int batch = this.batchSize > 0 ? this.batchSize : backendIds.size(); | ||
| for (int i = 0; i < backendIds.size(); i += batch) { | ||
| int end = Math.min(i + batch, backendIds.size()); | ||
| IdQuery query = new IdQuery(type); |
There was a problem hiding this comment.
The PR title says this is a RocksDB optimization, but the functional change here is backend-agnostic batching in GraphTransaction (affecting all stores, especially RPC-based ones). Please align the PR title (or add RocksDB-specific changes) so the intent matches what’s actually being modified.
| @@ -799,17 +800,30 @@ protected Iterator<Vertex> queryVerticesByIds(Object[] vertexIds, boolean adjace | |||
| // Found from local tx | |||
| vertices.put(vertex.id(), vertex); | |||
| } else { | |||
| // Prepare to query from backend store | |||
| query.query(id); | |||
| // store the IDs queried from backend | |||
| backendIds.add(id); | |||
| } | |||
There was a problem hiding this comment.
This change allocates backendIds as a second full list of ids (in addition to ids), which doubles memory usage for large g.V(id1,id2,...) calls (up to Query.DEFAULT_CAPACITY). Consider batching/issuing backend IdQuery requests incrementally during the main loop (flush when reaching batch size) to avoid retaining all backend ids at once.
| final int batch = this.batchSize > 0 ? this.batchSize : backendIds.size(); | ||
| for (int i = 0; i < backendIds.size(); i += batch) { | ||
| int end = Math.min(i + batch, backendIds.size()); | ||
| IdQuery query = new IdQuery(type); | ||
| for (int j = i; j < end; j++) { | ||
| Id id = backendIds.get(j); | ||
| query.query(id); | ||
| } |
There was a problem hiding this comment.
With batching, duplicated ids that fall into different batches will trigger repeated backend reads/RPCs for the same id. You can keep the output behavior (duplicates preserved via ids) while deduplicating backend fetches (e.g., track a seen-set for backendIds or build per-batch unique ids) to avoid redundant backend queries.
| query.mustSortByInput(false); | ||
| Iterator<HugeVertex> it = this.queryVerticesFromBackend(query); | ||
| QueryResults.fillMap(it, vertices); | ||
| final int batch = this.batchSize > 0 ? this.batchSize : backendIds.size(); |
There was a problem hiding this comment.
final int batch = this.batchSize > 0 ? this.batchSize : backendIds.size(); is effectively always this.batchSize because query.batch_size is configured with a minimum of 1 (see CoreOptions.QUERY_BATCH_SIZE). Removing the dead fallback branch will simplify the code.
| final int batch = this.batchSize > 0 ? this.batchSize : backendIds.size(); | |
| final int batch = this.batchSize; |
| if (!backendIds.isEmpty()) { | ||
| // Query from backend store | ||
| query.mustSortByInput(false); | ||
| Iterator<HugeVertex> it = this.queryVerticesFromBackend(query); | ||
| QueryResults.fillMap(it, vertices); | ||
| final int batch = this.batchSize > 0 ? this.batchSize : backendIds.size(); | ||
| for (int i = 0; i < backendIds.size(); i += batch) { | ||
| int end = Math.min(i + batch, backendIds.size()); | ||
| IdQuery query = new IdQuery(type); | ||
| for (int j = i; j < end; j++) { | ||
| Id id = backendIds.get(j); | ||
| query.query(id); | ||
| } | ||
| // Single batch capacity check | ||
| Query.checkForceCapacity(query.idsSize()); | ||
|
|
||
| // Query from backend store | ||
| query.mustSortByInput(false); | ||
| Iterator<HugeVertex> it = this.queryVerticesFromBackend(query); | ||
| QueryResults.fillMap(it, vertices); | ||
| } |
There was a problem hiding this comment.
The new multi-batch path isn’t covered by tests. Please add a unit/integration test that exercises queryVerticesByIds() with vertexIds.length > query.batch_size, including (1) duplicates across a batch boundary and (2) mixed local-tx + backend ids, to ensure results and NotFoundException behavior remain unchanged.
| // NOTE: allowed duplicated vertices if query by duplicated ids | ||
| List<Id> ids = InsertionOrderUtil.newList(); | ||
| Map<Id, HugeVertex> vertices = new HashMap<>(vertexIds.length); | ||
| Set<Id> fetchedIds = InsertionOrderUtil.newSet(); |
There was a problem hiding this comment.
fetchedIds 之后,非相邻的重复 id 也会被全局去重;而旧逻辑里 IdQuery.query() 只会折叠相邻重复 id。最终返回结果看起来应该还是保持重复输出,但真实的后端访问路径已经变了。建议补一个回归测试,至少覆盖 超过 query.batch_size、跨 batch 的重复 id、缺失 id + checkMustExist 这几个组合场景,避免后面再改这里时把语义悄悄带偏。
| Map<Id, HugeVertex> vertices = new HashMap<>(vertexIds.length); | ||
| Set<Id> fetchedIds = InsertionOrderUtil.newSet(); | ||
| IdQuery batchQuery = null; | ||
| final int batchSize = this.batchSize; |
There was a problem hiding this comment.
GraphTransaction 通用层后,会影响所有 backend,而不只是 issue #2674 里提到的 RPC backend。
以 RocksDB 为例,当前 queryByIds() 仍然是逐 id 展开查询,并没有真正走 multi-get;现在强制按 query.batch_size 拆成多个 IdQuery,很可能只是增加额外的 query/iterator 次数。建议把这类分批策略下沉到具体 backend,或者至少通过 feature/store type 把它限定在 HBase/HStore 这类 RPC backend 上,避免把针对性优化变成全局行为变化。
PS: 后续我们应该让 RocksDB 使用上原生的 multi-get API (这应该是之前的 TODO)
There was a problem hiding this comment.
- GraphTransaction 不再按 batch_size 拆分 ID,一次性下发完整 IdQuery;
- RocksDBTables.Vertex/Edge 已覆写 queryByIds(),在 !session.hasChanges() 时走 multiGetAsList() 原生批量读取,脏 session 安全回退到逐 id scan;
| @Override | ||
| public byte[] get(String table, byte[] key) { | ||
| throw new AssertionError( | ||
| "reads should not be performed when hasChanges"); | ||
| } | ||
|
|
| try { | ||
| BackendColumnIterator iter = this.vertexTable.queryByIds(mockSession, ids); | ||
| // FlatMapperIterator is lazy; trigger evaluation to hit the mock | ||
| iter.hasNext(); | ||
| Assert.fail("queryByIds should fail when session has pending changes"); | ||
| } catch (AssertionError e) { | ||
| Assert.assertTrue(e.getMessage().contains("hasChanges")); | ||
| } | ||
| } |
| protected BackendColumnIterator queryByIdsWithGet(RocksDBSessions.Session session, | ||
| Collection<Id> ids) { | ||
| if (ids.size() == 1) { | ||
| return this.queryById(session, ids.iterator().next()); | ||
| } | ||
|
|
||
| if (!session.hasChanges()) { | ||
| return this.getByIds(session, ids); | ||
| } | ||
|
|
||
| // NOTE: this will lead to lazy create rocksdb iterator | ||
| return BackendColumnIterator.wrap(new FlatMapperIterator<>( | ||
| ids.iterator(), id -> this.queryById(session, id) | ||
| )); | ||
| } |
| @Override | ||
| public byte[] get(String table, byte[] key) { | ||
| throw new AssertionError( | ||
| "reads should not be performed when hasChanges"); | ||
| } | ||
|
|
| try { | ||
| BackendColumnIterator iter = this.vertexTable.queryByIds(mockSession, ids); | ||
| // FlatMapperIterator is lazy; trigger evaluation to hit the mock | ||
| iter.hasNext(); | ||
| Assert.fail("queryByIds should fail when session has pending changes"); | ||
| } catch (AssertionError e) { | ||
| Assert.assertTrue(e.getMessage().contains("hasChanges")); | ||
| } | ||
| } |
imbajin
left a comment
There was a problem hiding this comment.
Found one additional test coverage issue on the current head. I also +1'd the existing Copilot comments that already cover the hasChanges fallback problem.
| List<Id> ids = Arrays.asList(id1, id2, id1); | ||
| BackendColumnIterator iter = this.vertexTable.queryByIds(this.rocks.session(), ids); | ||
|
|
||
| Map<String, String> results = toResultMap(iter); |
There was a problem hiding this comment.
This test does not actually protect the duplicate-id behavior because toResultMap() collapses duplicate rows by key before the assertion. If queryByIds() regressed and returned v1, v2, v1, the map would still contain only two entries and this test would pass. Since this PR explicitly deduplicates ids before multi-get, please assert the iterator output sequence/count directly (for example collect the column names into a list) so duplicate backend results would fail the test.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated 1 comment.
Comments suppressed due to low confidence (1)
hugegraph-server/hugegraph-rocksdb/src/main/java/org/apache/hugegraph/backend/store/rocksdb/RocksDBTable.java:250
getByIds()checksids.size()==1before de-duplicating, so an input like[id1, id1]takes the multi-get path even though there's only one unique key. Since this PR explicitly de-dups ids for performance, it would be more consistent (and slightly cheaper) to de-duplicate first and then apply the single-key fast path.
protected BackendColumnIterator getByIds(RocksDBSessions.Session session,
Collection<Id> ids) {
if (ids.size() == 1) {
return this.getById(session, ids.iterator().next());
}
Collection<Id> uniqueIds = ids instanceof Set ? ids : new LinkedHashSet<>(ids);
List<byte[]> keys = new ArrayList<>(uniqueIds.size());
for (Id id : uniqueIds) {
keys.add(id.asBytes());
}
return session.get(this.table(), keys);
}
| @Override | ||
| protected BackendColumnIterator queryByIds(RocksDBSessions.Session session, | ||
| Collection<Id> ids) { | ||
| // TODO: use getByIds() after batch version multi-get is ready | ||
| return super.queryByIds(session, ids); | ||
| return this.queryByIdsWithGet(session, ids); | ||
| } |
There was a problem hiding this comment.
Catch! This PR is RocksDB-only. I’ve updated the description to change closes 2674 to related 2674
|
|
||
| protected BackendColumnIterator queryByIdsWithGet(RocksDBSessions.Session session, | ||
| Collection<Id> ids) { | ||
| E.checkState(!session.hasChanges(), |
There was a problem hiding this comment.
Non-blocking: enabling queryByIdsWithGet() for vertex/edge changes pending-write behavior: any session.hasChanges() now throws before even the single-id path. Please confirm all callers reach this only after commit/rollback, or keep the old per-id fallback for pending sessions and add a test through a public query path.
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2982 +/- ##
============================================
- Coverage 35.57% 31.14% -4.43%
+ Complexity 333 264 -69
============================================
Files 801 803 +2
Lines 67592 68232 +640
Branches 8790 8964 +174
============================================
- Hits 24045 21252 -2793
- Misses 40985 44570 +3585
+ Partials 2562 2410 -152 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
imbajin
left a comment
There was a problem hiding this comment.
当前 head 上还有一个阻塞合入的 correctness 问题:Edge 的公开查询路径可能因为 RocksDB table 层全局去重而吞掉非相邻重复 id。hasChanges() 行为变化已有现有线程覆盖,这里不重复发 inline。
|
|
||
| List<byte[]> keys = new ArrayList<>(ids.size()); | ||
| for (Id id : ids) { | ||
| Collection<Id> uniqueIds = ids instanceof Set ? ids : new LinkedHashSet<>(ids); |
There was a problem hiding this comment.
完整路径是:
graph.edges(e1, e2, e1)
|
v
GraphTransaction.queryEdgesByIds()
|
| IdQuery.query() 只折叠相邻重复 id
| [e1, e2, e1] => query.idsSize()==3, ids.size()==3
v
edges.isEmpty() && query.idsSize() == ids.size()
|
| 命中 fast path,直接返回 backend iterator
v
RocksDBTables.Edge.queryByIds()
|
v
RocksDBTable.getByIds()
|
| new LinkedHashSet<>(ids) 全局去重
v
[e1, e2, e1] => [e1, e2]
也就是说,GraphTransaction 只有在检测到重复 id 时才会回到按原始 ids 重建结果的慢路径;但当前 fast path 只能识别相邻重复,识别不了 e1, e2, e1 这种非相邻重复。旧的 scan/flat-map 路径会按输入 id 展开 3 次查询,新路径在这里被压成 2 个 key,最终返回结果会少一条。
建议不要在 Edge 的这条 fast path 上做 table-level 全局去重,或者在 transaction 层改成能检测任意重复 id 后再决定是否直接返回 backend iterator。
| this.rocks.session().put(this.vertexTable.table(), id2.asBytes(), getBytes("value2")); | ||
| this.commit(); | ||
|
|
||
| List<Id> ids = Arrays.asList(id1, id2, id1); |
There was a problem hiding this comment.
[id1, id2, id1] 固化成“RocksDB table 层去重后只返回 2 条”,但它没有覆盖真正会出问题的公开 Edge 查询路径。
需要补一个从 Graph API / GraphTransaction 进入的回归测试,至少覆盖非相邻重复 edge id:
Object e1 = ...;
Object e2 = ...;
List<Edge> edges = ImmutableList.copyOf(graph.edges(e1, e2, e1));
Assert.assertEquals(3, edges.size());图例:
helper test:
TestEdgeTable.queryByIds([e1, e2, e1])
-> 只验证 table 层去重行为
真实风险路径:
graph.edges(e1, e2, e1)
-> GraphTransaction fast path
-> RocksDB Edge queryByIds
-> table 层去重
-> 返回 2 条而不是 3 条
如果保留当前测试,它会让“后端吞掉重复 id”看起来像预期行为;但对 graph.edges(...) 的公开语义来说,重复输入应保留重复输出,现有 graph.edges(id, id) 测试已经固定了这个契约。
imbajin
left a comment
There was a problem hiding this comment.
Current implementation looks mostly converged. I’m leaving two non-CI follow-ups: one test assertion should be tightened, and the PR verification section should be filled in.
|
|
||
| // Graph API does not guarantee duplicate results for duplicate ids | ||
| List<Edge> edges = ImmutableList.copyOf(graph.edges(id1, id2, id1)); | ||
| Assert.assertTrue(edges.size() >= 2); |
There was a problem hiding this comment.
当前 graph.edges(id1, id2, id1) 只断言 edges.size() >= 2 且结果包含 id1/id2。如果实现再次退化成错误结果 [id1, id2],这个测试仍然会通过。
这里风险路径正是:
graph.edges(id1, id2, id1)
-> GraphTransaction fast path
-> RocksDB Edge queryByIds()
-> backend iterator 直接返回
建议把断言改成精确校验 3 条结果,并确认顺序或至少确认 id1 出现两次:
Assert.assertEquals(3, edges.size());
Assert.assertEquals(id1, edges.get(0).id());
Assert.assertEquals(id2, edges.get(1).id());
Assert.assertEquals(id1, edges.get(2).id());这样才能真正覆盖这次 PR 要修住的 Graph API multiplicity 行为。
There was a problem hiding this comment.
之前就是这样的,但是在 IdQuery.query(Id) 只跳过连续重复(和上一个相同的 ID)。[id1, id2, id1] 中第二个 id1 的上一个是 id2,不相等,所以没被跳过,query.idsSize() 返回 3,跟 ids.size() 相等,快路径被误触发。ci 无法通过
There was a problem hiding this comment.
这个 assert 放在 shared EdgeCoreTest 里确实不合适,因为 memory backend 当前会返回 2,CI failure 也验证了这一点。这个 core test 作为跨 backend smoke 保留弱断言可以接受;
RocksDB-specific multiplicity 现在由 RocksDBTableQueryByIdsTest 的 iterator sequence/count 覆盖 不再阻塞这个点。
|
这个 PR 已经来回调整过几轮,最终 scope 也收敛为 RocksDB-only multi-get 优化。建议把验证项补实,至少列出当前新增/应运行的测试,例如: 这样后续 reviewer 可以直接确认当前 head 的验证边界,避免继续围绕“是否真的覆盖了最终实现路径”反复确认。 |
done |

Purpose of the PR
Main Changes
The core change in this PR is a refactor/optimization of the RocksDB batch query path in three main areas: request coalescing, result backfilling, and shard-level parallelism control.
The focus is not on the API layer, but on how batch get is executed internally and how objects are reused, with the goal of reducing query latency and memory jitter under high concurrency.
Verifying these changes
RocksDBTableQueryByIdsTest
1.testVertexQueryByIdsWithAllExistingIds
2. testVertexQueryByIdsWithExistingAndMissingIdsMixed
3. testVertexQueryByIdsDuplicateIds
4. testEdgeOutQueryByIdsWithAllExistingIds
5. testEdgeInQueryByIdsWithAllExistingIds
6. testVertexQueryByIdsFailsWhenHasChanges
7. testPublicQueryMultiIdsFails
8. testPublicQuerySingleIdFails
EdgeCoreTest
9. testQueryEdgesByNonConsecutiveDuplicateIds — graph.edges(id1, id2, id1)
RocksDBTableQueryByIdsTest(8 tests) — covers multi-get path, duplicate ids, missing ids, hasChanges guardEdgeCoreTest#testQueryEdgesByNonConsecutiveDuplicateIds— verifygraph.edges(id1, id2, id1)via public APIEdgeCoreTest#testQueryEdgesByIdWithGraphAPI— verify adjacent duplicate ids (graph.edges(id, id))Does this PR potentially affect the following parts?
Documentation Status
Doc - TODODoc - DoneDoc - No Need