fix(server): fix the scheduler and the scheduler selection logic#2937
fix(server): fix the scheduler and the scheduler selection logic#2937Tsukilc wants to merge 7 commits into
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #2937 +/- ##
============================================
- Coverage 35.96% 1.57% -34.39%
+ Complexity 338 43 -295
============================================
Files 803 781 -22
Lines 68053 65288 -2765
Branches 8907 8410 -497
============================================
- Hits 24472 1026 -23446
- Misses 40955 64176 +23221
+ Partials 2626 86 -2540 ☔ View full report in Codecov by Harness. 🚀 New features to boost your workflow:
|
47ec466 to
68b906a
Compare
There was a problem hiding this comment.
Pull request overview
This PR disables the master-worker scheduling logic in StandardTaskScheduler, transitioning to a simplified task execution model. The scheduler type is now auto-determined based on storage backend (distributed for hstore, local otherwise), and server.id is auto-generated if not specified.
Changes:
- Removed SCHEDULER_TYPE configuration option and master-role scheduling logic from TaskManager and StandardTaskScheduler
- Removed server.id and server.role configuration requirements from rest-server.properties
- Added auto-generation of server.id using UUID when not explicitly configured
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| CoreOptions.java (hugegraph-struct) | Removed SCHEDULER_TYPE configuration option |
| CoreOptions.java (hugegraph-core) | Removed SCHEDULER_TYPE configuration option |
| TaskCoreTest.java | Updated tests with conditional assertions for DistributedTaskScheduler and code formatting changes |
| rest-server.properties | Removed server.id and server.role configuration entries |
| hugegraph.properties | Removed task.scheduler_type configuration entry |
| hstore.properties.template | Removed task.scheduler_type configuration entry |
| TaskManager.java | Removed scheduling thread pool and master/worker role-based scheduling logic |
| StandardTaskScheduler.java | Simplified to remove multi-node task distribution, server filtering, and load balancing |
| ServerInfoManager.java | Removed pickWorkerNode, updateServerInfos, and load management methods |
| HugeServerInfo.java | Removed suitableFor method for task-server matching |
| StandardRoleListener.java | Removed enableRoleElection call during initialization |
| GlobalMasterInfo.java | Updated TODO comment |
| StandardHugeGraph.java | Changed schedulerType to be dynamically determined based on backend type |
| GraphManager.java | Added auto-generation of server.id and removed validation requirements |
| ServerOptions.java | Changed SERVER_ID default to empty string and updated description |
| hugegraph.properties.template | Removed task.scheduler_type from cluster test configuration |
| server2-conf/hugegraph.properties | Removed task.scheduler_type from Docker configuration |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| HugeTask<?> memTask = this.tasks.get(task.id()); | ||
| if (memTask != null) { | ||
| boolean cancelled = memTask.cancel(true); | ||
| LOG.info("Task '{}' cancel result: {}", task.id(), cancelled); |
There was a problem hiding this comment.
The cancel method no longer saves the task to storage when cancelling a task that's in memory. If the task is running in memory and gets cancelled via memTask.cancel(true), the cancelled status won't be persisted to storage until the task naturally completes. This could cause issues if the server restarts before the task finishes - the task would be restored and re-executed. Consider saving the task status to storage after successfully calling memTask.cancel(true).
| LOG.info("Task '{}' cancel result: {}", task.id(), cancelled); | |
| LOG.info("Task '{}' cancel result: {}", task.id(), cancelled); | |
| if (cancelled) { | |
| this.save(memTask); | |
| } |
There was a problem hiding this comment.
this.callable.cancelled()会负责调用 taskSchduler 持久化
5416720 to
f8fc58a
Compare
|
|
||
| if (callable instanceof TaskCallable.SysTaskCallable) { | ||
| ((TaskCallable.SysTaskCallable<?>) callable).params(this.graph); | ||
| } |
There was a problem hiding this comment.
在 DistributedTaskScheduler.delete() 方法第 286-305 行,删除逻辑与原实现存在重大差异:
原逻辑:
force=false: 设置状态为 DELETING,返回 nullforce=true: 直接从数据库删除
新逻辑:
if (!force) {
if (!task.completed() && task.status() != TaskStatus.DELETING) {
throw new IllegalArgumentException(
String.format("Can't delete incomplete task '%s' in status %s, " +
"Please try to cancel the task first",
id, task.status()));
}
}
return this.deleteFromDB(id);问题:
- 移除了 DELETING 状态的设置逻辑,这可能破坏依赖定时清理的代码
- 非强制删除现在会直接删除完成的任务,而不是先标记为 DELETING
- 与
StandardTaskScheduler的实现可能不一致
建议:重新考虑删除流程,保持与原有逻辑的兼容性,或在 PR 描述中明确说明此行为变更
| LOG.info("cancel task({}) error, task has completed", task.id()); | ||
| task.overwriteStatus(TaskStatus.CANCELLING); | ||
| } | ||
| } |
There was a problem hiding this comment.
第 334 行的 TODO 注释过于模糊:
//todo: serverInfoManager section should be removed in the future.
return this.serverManager().close();
//return true;问题:
- 未说明为什么要移除 serverInfoManager
- 未说明移除的时间节点或前提条件
- 注释掉的代码应该删除,而不是保留
建议:
| } | |
| // TODO(issue-XXX): Remove serverInfoManager.close() after migrating to | |
| // pure single-node architecture. Currently kept for backward compatibility. | |
| return this.serverManager().close(); |
代码审查总结感谢提交这个重要的架构简化 PR!我已经详细审查了所有变更,除了已经在具体代码行发布的评论外,还有以下关键问题需要关注:
|
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 12 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @Override | ||
| public String schedulerType() { | ||
| return StandardHugeGraph.this.schedulerType; | ||
| // Use distributed scheduler for hstore backend, otherwise use local |
There was a problem hiding this comment.
The schedulerType() method now determines the scheduler type based on whether the backend is hstore. However, this logic change is undocumented. Consider adding a comment explaining why hstore backends require distributed scheduling while other backends use local scheduling, as this is an important architectural decision.
| // Use distributed scheduler for hstore backend, otherwise use local | |
| /* | |
| * HStore is a distributed backend: data and tasks may be handled by | |
| * multiple graph servers that must coordinate scheduling and state. | |
| * For this reason we require a distributed task scheduler when the | |
| * backend is hstore so that jobs can be balanced and recovered | |
| * across nodes. For other backends, the graph is served by a single | |
| * server instance and tasks are executed locally, so a local | |
| * in-process scheduler is sufficient and avoids the overhead of | |
| * distributed coordination. | |
| */ |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 20 out of 20 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
027b71a to
e3eb7fc
Compare
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
Due to the lack of activity, the current pr is marked as stale and will be closed after 180 days, any update will remove the stale label |
imbajin
left a comment
There was a problem hiding this comment.
I found a few remaining scheduler compatibility/test-coverage issues on the current head. The existing current-head Copilot comments for the graph exception type and cron shutdown ordering are still valid, so I added +1 reactions there instead of duplicating them.
| return StandardHugeGraph.this.schedulerType; | ||
| // Use distributed scheduler for hstore backend, otherwise use local | ||
| // After the merger of rocksdb and hstore, consider whether to change this logic | ||
| return StandardHugeGraph.this.isHstore() ? "distributed" : "local"; |
There was a problem hiding this comment.
task.scheduler_type=distributed will silently fall back to local after upgrade. HugeConfig only warns for an unregistered option, and TaskManager.addScheduler() still trusts only graph.schedulerType() here, so there is no migration guard or fail-fast path. Please either keep the old option as an override during migration, or fail fast when the removed option is still configured, and add a compatibility regression test for that upgrade path.
| Assert.assertContains("Can't delete incomplete task '88888'", | ||
| e.getMessage()); | ||
| }); | ||
| if (scheduler.getClass().equals(StandardTaskScheduler.class)) { |
There was a problem hiding this comment.
DistributedTaskScheduler, even though this PR changes DistributedTaskScheduler.delete(). Please add an explicit distributed case instead of skipping it: what should delete(id, false) do for a running task, what should happen for a completed task, and what final state should be persisted. Otherwise the test no longer proves the changed distributed delete semantics.
|
|
||
| HugeTask<Object> task2 = scheduler.task(task.id()); | ||
| Assert.assertThrows(IllegalArgumentException.class, () -> { | ||
| // because Distributed do nothing in restore, so only test StandardTaskScheduler here |
There was a problem hiding this comment.
CANCELLING || CANCELLED. That leaves the most sensitive distributed state transitions unpinned. Please add distributed-specific regression coverage for the observable cancel state, the final persisted state, and the restore/close behavior instead of excluding the scheduler here.
imbajin
left a comment
There was a problem hiding this comment.
Found two high-confidence issues in the distributed scheduler path. Existing Copilot comments already cover the graph exception-type and cronFuture waiting issues, so I am not duplicating those.
|
|
||
| if (!force && !task.completed()) { | ||
| // Check task status: can't delete running tasks without force | ||
| this.updateStatus(id, null, TaskStatus.DELETING); |
There was a problem hiding this comment.
Deleting a running task can race with the task's own cancellation callbacks and resurrect the task record. In this branch the scheduler cancels the in-memory task and immediately removes the task vertex from DB, but HugeTask.cancel() invokes TaskCallable.cancelled(), and both UserJob.cancelled() / SysJob.cancelled() call save(). Since taskDone() is now a no-op, a long-running job deleted via delete(id, false) can be removed here and then written back as CANCELLED during cancellation cleanup. Please make deletion win over later task saves, or defer physical deletion until the runner has fully stopped and can no longer persist the task.
| // Only inject if not already configured in graph config | ||
| if (!config.containsKey("pd.peers")) { | ||
| String pdPeers = this.conf.get(ServerOptions.PD_PEERS); | ||
| config.addProperty("pd.peers", pdPeers); |
There was a problem hiding this comment.
Non-blocking: this now injects server-level pd.peers into every graph config when the graph config does not contain it. Please guard this with a non-empty/PD-enabled check, or add coverage for standalone RocksDB graphs, so a default or empty server-level PD setting does not silently change graph startup behavior.
| @Override | ||
| public <V> HugeTask<V> delete(Id id, boolean force) { | ||
| if (!force) { | ||
| // Change status to DELETING, perform the deletion operation through automatic | ||
| // scheduling. | ||
| HugeTask<?> task = this.taskWithoutResult(id); | ||
|
|
||
| if (!force && !task.completed()) { |
| this.distributedSchedulerExecutor = | ||
| ExecutorUtil.newPausableScheduledThreadPool(1, DISTRIBUTED_TASK_SCHEDULER); | ||
|
|
||
| // For a schedule task to run, just one thread is ok | ||
| this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool( | ||
| 1, TASK_SCHEDULER); | ||
| // Start after 10x period time waiting for HugeGraphServer startup | ||
| this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob, | ||
| 10 * SCHEDULE_PERIOD, | ||
| SCHEDULE_PERIOD, | ||
| TimeUnit.MILLISECONDS); | ||
| } |
| import java.util.Iterator; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.CancellationException; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ExecutionException; |
| // Check task status: can't delete running tasks without force | ||
| this.updateStatus(id, null, TaskStatus.DELETING); | ||
| return null; | ||
| } else { | ||
| return this.deleteFromDB(id); | ||
| // Already in DELETING status, delete directly from DB | ||
| // Completed tasks can also be deleted directly |
| import org.apache.hugegraph.task.TaskManager; | ||
| import org.apache.hugegraph.util.Log; | ||
| import org.slf4j.Logger; | ||
|
|
||
| import java.util.Objects; | ||
|
|
| graph1.updateTime(updateTime); | ||
| this.graphs.put(key, graph1); | ||
| return graph1; | ||
| throw new NotFoundException(String.format("Graph '%s' does not exist", name)); |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 22 out of 22 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java:1
ServerInfoManageris described as 'soft-disabled', butclose()still opens/closes the system transaction (this.tx()) and callsgraph.closeTx(), which can touch the backend store during shutdown. This can introduce shutdown-time failures (e.g.,ConnectionException) or unwanted backend dependency when ServerInfo is intentionally disabled. Consider makingclose()avoid backend interaction in this compatibility mode (e.g., only mark closed + stop executors), or guard the tx-close path behind a feature flag / explicit enablement.
/*
| LOG.info("Cancel task '{}' in status {}", task.id(), task.status()); | ||
|
|
||
| // Check if task is running locally, cancel it directly if so | ||
| HugeTask<?> runningTask = this.runningTasks.get(task.id()); | ||
| if (runningTask != null) { | ||
| boolean cancelled = runningTask.cancel(true); | ||
| if (cancelled) { | ||
| task.overwriteStatus(TaskStatus.CANCELLED); | ||
| } | ||
| LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled); | ||
| return; | ||
| } |
| public HugeGraph graph(String graphSpace, String name) { | ||
| String key = String.join(DELIMITER, graphSpace, name); | ||
| Graph graph = this.graphs.get(key); | ||
| if (graph == null && isPDEnabled()) { | ||
| Map<String, Map<String, Object>> configs = | ||
| this.metaManager.graphConfigs(graphSpace); | ||
| // If current server registered graph space is not DEFAULT, only load graph creation | ||
| // under registered graph space | ||
| if (!configs.containsKey(key) || | ||
| (!"DEFAULT".equals(this.serviceGraphSpace) && | ||
| !graphSpace.equals(this.serviceGraphSpace))) { | ||
| return null; | ||
| if (graph == null) { | ||
| if (isPDEnabled()) { | ||
| Map<String, Map<String, Object>> configs = | ||
| this.metaManager.graphConfigs(graphSpace); | ||
| // If current server registered graph space is not DEFAULT, only load graph creation | ||
| // under registered graph space | ||
| if (!configs.containsKey(key) || | ||
| (!"DEFAULT".equals(this.serviceGraphSpace) && | ||
| !graphSpace.equals(this.serviceGraphSpace))) { | ||
| return null; | ||
| } | ||
| Map<String, Object> config = configs.get(key); | ||
| String creator = String.valueOf(config.get("creator")); | ||
| Date createTime = parseDate(config.get("create_time")); | ||
| Date updateTime = parseDate(config.get("update_time")); | ||
| HugeGraph graph1 = this.createGraph(graphSpace, name, | ||
| creator, config, false); | ||
| graph1.createTime(createTime); | ||
| graph1.updateTime(updateTime); | ||
| this.graphs.put(key, graph1); | ||
| return graph1; | ||
| } | ||
| Map<String, Object> config = configs.get(key); | ||
| String creator = String.valueOf(config.get("creator")); | ||
| Date createTime = parseDate(config.get("create_time")); | ||
| Date updateTime = parseDate(config.get("update_time")); | ||
| HugeGraph graph1 = this.createGraph(graphSpace, name, | ||
| creator, config, false); | ||
| graph1.createTime(createTime); | ||
| graph1.updateTime(updateTime); | ||
| this.graphs.put(key, graph1); | ||
| return graph1; | ||
| throw new NotFoundException(String.format("Graph '%s' does not exist", name)); |
| public <V> void restoreTasks() { | ||
| Id selfServer = this.serverManager().selfNodeId(); | ||
| List<HugeTask<V>> taskList = new ArrayList<>(); | ||
| // Restore 'RESTORING', 'RUNNING' and 'QUEUED' tasks in order. | ||
| // Single-node mode: restore all pending tasks without server filtering | ||
| for (TaskStatus status : TaskStatus.PENDING_STATUSES) { | ||
| String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; | ||
| do { | ||
| Iterator<HugeTask<V>> iter; | ||
| for (iter = this.findTask(status, PAGE_SIZE, page); | ||
| iter.hasNext(); ) { | ||
| HugeTask<V> task = iter.next(); | ||
| if (selfServer.equals(task.server())) { | ||
| taskList.add(task); | ||
| } | ||
| taskList.add(task); | ||
| } |
New Features
Server IDs now support automatic generation, eliminating the need for manual configuration.
Refactoring
Significantly simplified the task scheduling architecture by adopting a single-node scheduling path by default, removing multi-node scheduling and role-election–related controls.
Streamlined server information and scheduling management logic by eliminating redundant multi-node–related workflows.
Configuration Changes
Removed explicit scheduling type configuration; the scheduling mode is now determined by the runtime environment.
Bug Fixes
Adjusted the handling logic for task cancellation/deletion and missing tasks, improving exception handling and state management.
Tests
Updated test cases to accommodate the new scheduling behavior and serialization differences.