diff --git a/hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template b/hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template index 005031fe60..f97e365748 100644 --- a/hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template +++ b/hugegraph-cluster-test/hugegraph-clustertest-dist/src/assembly/static/conf/hugegraph.properties.template @@ -45,7 +45,6 @@ store=hugegraph pd.peers=$PD_PEERS_LIST$ # task config -task.scheduler_type=local task.schedule_period=10 task.retry=0 task.wait_timeout=10 diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java index 278542854b..f699ac199c 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java @@ -556,9 +556,9 @@ public class ServerOptions extends OptionHolder { public static final ConfigOption SERVER_ID = new ConfigOption<>( "server.id", - "The id of hugegraph-server.", - disallowEmpty(), - "server-1" + "The id of hugegraph-server, auto-generated if not specified.", + null, + "" ); public static final ConfigOption SERVER_ROLE = new ConfigOption<>( diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java index 770e75cc74..cfd1d93f57 100644 --- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java +++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java @@ -58,7 +58,6 @@ import org.apache.hugegraph.backend.BackendException; import org.apache.hugegraph.backend.cache.Cache; import org.apache.hugegraph.backend.cache.CacheManager; -import org.apache.hugegraph.backend.id.IdGenerator; import org.apache.hugegraph.backend.store.AbstractBackendStoreProvider; import org.apache.hugegraph.backend.store.BackendStoreInfo; import org.apache.hugegraph.config.ConfigOption; @@ -68,6 +67,7 @@ import org.apache.hugegraph.config.TypedOption; import org.apache.hugegraph.event.EventHub; import org.apache.hugegraph.exception.ExistedException; +import org.apache.hugegraph.exception.NotFoundException; import org.apache.hugegraph.exception.NotSupportException; import org.apache.hugegraph.io.HugeGraphSONModule; import org.apache.hugegraph.k8s.K8sDriver; @@ -195,8 +195,6 @@ public final class GraphManager { public GraphManager(HugeConfig conf, EventHub hub) { LOG.info("Init graph manager"); E.checkArgumentNotNull(conf, "The config can't be null"); - String server = conf.get(ServerOptions.SERVER_ID); - String role = conf.get(ServerOptions.SERVER_ROLE); this.config = conf; this.url = conf.get(ServerOptions.REST_SERVER_URL); @@ -206,10 +204,6 @@ public GraphManager(HugeConfig conf, EventHub hub) { conf.get(ServerOptions.SERVER_DEPLOY_IN_K8S); this.startIgnoreSingleGraphError = conf.get( ServerOptions.SERVER_START_IGNORE_SINGLE_GRAPH_ERROR); - E.checkArgument(server != null && !server.isEmpty(), - "The server name can't be null or empty"); - E.checkArgument(role != null && !role.isEmpty(), - "The server role can't be null or empty"); this.graphsDir = conf.get(ServerOptions.GRAPHS); this.cluster = conf.get(ServerOptions.CLUSTER); this.graphSpaces = new ConcurrentHashMap<>(); @@ -1557,6 +1551,9 @@ private void loadGraph(String name, String graphConfPath) { String raftGroupPeers = this.conf.get(ServerOptions.RAFT_GROUP_PEERS); config.addProperty(ServerOptions.RAFT_GROUP_PEERS.name(), raftGroupPeers); + + this.transferPdPeersConfig(config); + this.transferRoleWorkerConfig(config); Graph graph = GraphFactory.open(config); @@ -1575,6 +1572,19 @@ private void loadGraph(String name, String graphConfPath) { } } + private void transferPdPeersConfig(HugeConfig config) { + if (config.containsKey(CoreOptions.PD_PEERS.name())) { + return; + } + + String backend = config.get(CoreOptions.BACKEND); + boolean needPdPeers = this.conf.get(ServerOptions.USE_PD) || + "hstore".equals(backend); + if (needPdPeers) { + config.addProperty(CoreOptions.PD_PEERS.name(), this.pdPeers); + } + } + private void transferRoleWorkerConfig(HugeConfig config) { config.setProperty(RoleElectionOptions.NODE_EXTERNAL_URL.name(), this.conf.get(ServerOptions.REST_SERVER_URL)); @@ -1635,23 +1645,14 @@ private void checkBackendVersionOrExit(HugeConfig config) { } private void initNodeRole() { - String id = config.get(ServerOptions.SERVER_ID); + boolean enableRoleElection = config.get( + ServerOptions.ENABLE_SERVER_ROLE_ELECTION); + E.checkArgument(!enableRoleElection, + "The server.role_election is no longer supported"); + String role = config.get(ServerOptions.SERVER_ROLE); - E.checkArgument(StringUtils.isNotEmpty(id), - "The server name can't be null or empty"); - E.checkArgument(StringUtils.isNotEmpty(role), - "The server role can't be null or empty"); NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase()); - boolean supportRoleElection = !nodeRole.computer() && - this.supportRoleElection() && - config.get(ServerOptions.ENABLE_SERVER_ROLE_ELECTION); - if (supportRoleElection) { - // Init any server as Worker role, then do role election - nodeRole = NodeRole.WORKER; - } - - this.globalNodeRoleInfo.initNodeId(IdGenerator.of(id)); this.globalNodeRoleInfo.initNodeRole(nodeRole); } @@ -1937,26 +1938,29 @@ public Set getServiceUrls(String graphSpace, String service, public HugeGraph graph(String graphSpace, String name) { String key = String.join(DELIMITER, graphSpace, name); Graph graph = this.graphs.get(key); - if (graph == null && isPDEnabled()) { - Map> configs = - this.metaManager.graphConfigs(graphSpace); - // If current server registered graph space is not DEFAULT, only load graph creation - // under registered graph space - if (!configs.containsKey(key) || - (!"DEFAULT".equals(this.serviceGraphSpace) && - !graphSpace.equals(this.serviceGraphSpace))) { - return null; + if (graph == null) { + if (isPDEnabled()) { + Map> configs = + this.metaManager.graphConfigs(graphSpace); + // If current server registered graph space is not DEFAULT, only load graph creation + // under registered graph space + if (!configs.containsKey(key) || + (!"DEFAULT".equals(this.serviceGraphSpace) && + !graphSpace.equals(this.serviceGraphSpace))) { + return null; + } + Map config = configs.get(key); + String creator = String.valueOf(config.get("creator")); + Date createTime = parseDate(config.get("create_time")); + Date updateTime = parseDate(config.get("update_time")); + HugeGraph graph1 = this.createGraph(graphSpace, name, + creator, config, false); + graph1.createTime(createTime); + graph1.updateTime(updateTime); + this.graphs.put(key, graph1); + return graph1; } - Map config = configs.get(key); - String creator = String.valueOf(config.get("creator")); - Date createTime = parseDate(config.get("create_time")); - Date updateTime = parseDate(config.get("update_time")); - HugeGraph graph1 = this.createGraph(graphSpace, name, - creator, config, false); - graph1.createTime(createTime); - graph1.updateTime(updateTime); - this.graphs.put(key, graph1); - return graph1; + throw new NotFoundException(String.format("Graph '%s' does not exist", name)); } else if (graph instanceof HugeGraph) { return (HugeGraph) graph; } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java index f8f24ab626..338824e233 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java @@ -176,7 +176,6 @@ public class StandardHugeGraph implements HugeGraph { private final BackendStoreProvider storeProvider; private final TinkerPopTransaction tx; private final RamTable ramtable; - private final String schedulerType; private volatile boolean started; private volatile boolean closed; private volatile GraphMode mode; @@ -229,7 +228,6 @@ public StandardHugeGraph(HugeConfig config) { this.closed = false; this.mode = GraphMode.NONE; this.readMode = GraphReadMode.OLTP_ONLY; - this.schedulerType = config.get(CoreOptions.SCHEDULER_TYPE); LockUtil.init(this.spaceGraphName()); @@ -315,6 +313,7 @@ public String backend() { return this.storeProvider.type(); } + @Override public BackendStoreInfo backendStoreInfo() { // Just for trigger Tx.getOrNewTransaction, then load 3 stores // TODO: pass storeProvider.metaStore() @@ -332,11 +331,10 @@ public void serverStarted(GlobalMasterInfo nodeInfo) { LOG.info("Init system info for graph '{}'", this.spaceGraphName()); this.initSystemInfo(); - LOG.info("Init server info [{}-{}] for graph '{}'...", - nodeInfo.nodeId(), nodeInfo.nodeRole(), this.spaceGraphName()); - this.serverInfoManager().initServerInfo(nodeInfo); - - this.initRoleStateMachine(nodeInfo.nodeId()); + if (nodeInfo != null && nodeInfo.nodeId() != null) { + this.serverInfoManager().initServerInfo(nodeInfo); + this.initRoleStateMachine(nodeInfo.nodeId()); + } // TODO: check necessary? LOG.info("Check olap property-key tables for graph '{}'", this.spaceGraphName()); @@ -465,6 +463,7 @@ public void updateTime(Date updateTime) { this.updateTime = updateTime; } + @Override public void waitStarted() { // Just for trigger Tx.getOrNewTransaction, then load 3 stores this.schemaTransaction(); @@ -481,9 +480,7 @@ public void initBackend() { try { this.storeProvider.init(); /* - * NOTE: The main goal is to write the serverInfo to the central - * node, such as etcd, and also create the system schema in memory, - * which has no side effects + * NOTE: Create system schema in memory, which has no side effects. */ this.initSystemInfo(); } finally { @@ -524,8 +521,7 @@ public void truncateBackend() { LockUtil.lock(this.spaceGraphName(), LockUtil.GRAPH_LOCK); try { this.storeProvider.truncate(); - // TODO: remove this after serverinfo saved in etcd - this.serverStarted(this.serverInfoManager().globalNodeRoleInfo()); + this.serverStarted(null); } finally { LockUtil.unlock(this.spaceGraphName(), LockUtil.GRAPH_LOCK); } @@ -547,7 +543,6 @@ public KvStore kvStore() { public void initSystemInfo() { try { this.taskScheduler().init(); - this.serverInfoManager().init(); this.authManager().init(); } finally { this.closeTx(); @@ -1632,7 +1627,9 @@ public void submitEphemeralJob(EphemeralJob job) { @Override public String schedulerType() { - return StandardHugeGraph.this.schedulerType; + // Use distributed scheduler for hstore backend, otherwise use local + // After the merger of rocksdb and hstore, consider whether to change this logic + return StandardHugeGraph.this.isHstore() ? "distributed" : "local"; } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java index ba4d4a1c0e..72a2da9324 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/config/CoreOptions.java @@ -303,13 +303,7 @@ public class CoreOptions extends OptionHolder { rangeInt(1, 500), 1 ); - public static final ConfigOption SCHEDULER_TYPE = - new ConfigOption<>( - "task.scheduler_type", - "The type of scheduler used in distribution system.", - allowValues("local", "distributed"), - "local" - ); + public static final ConfigOption TASK_SYNC_DELETION = new ConfigOption<>( "task.sync_deletion", diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java index c345c50e60..4856744459 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/GlobalMasterInfo.java @@ -22,7 +22,7 @@ import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.util.E; -// TODO: rename to GlobalNodeRoleInfo +// TODO: We need to completely delete the startup of master-worker public final class GlobalMasterInfo { private static final NodeInfo NO_MASTER = new NodeInfo(false, ""); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java index dbbea6d91e..74515dacec 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/masterelection/StandardRoleListener.java @@ -17,12 +17,12 @@ package org.apache.hugegraph.masterelection; -import java.util.Objects; - import org.apache.hugegraph.task.TaskManager; import org.apache.hugegraph.util.Log; import org.slf4j.Logger; +import java.util.Objects; + public class StandardRoleListener implements RoleListener { private static final Logger LOG = Log.logger(StandardRoleListener.class); @@ -36,7 +36,6 @@ public class StandardRoleListener implements RoleListener { public StandardRoleListener(TaskManager taskManager, GlobalMasterInfo roleInfo) { this.taskManager = taskManager; - this.taskManager.enableRoleElection(); this.roleInfo = roleInfo; this.selfIsMaster = false; } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java index b4bba2ea12..b12bd3a088 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/DistributedTaskScheduler.java @@ -19,7 +19,9 @@ import java.util.Iterator; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -48,6 +50,7 @@ import org.slf4j.Logger; public class DistributedTaskScheduler extends TaskAndResultScheduler { + private static final Logger LOG = Log.logger(DistributedTaskScheduler.class); private final long schedulePeriod; private final ExecutorService taskDbExecutor; @@ -118,6 +121,11 @@ private static boolean sleep(long ms) { public void cronSchedule() { // Perform periodic scheduling tasks + // Check closed flag first to exit early + if (this.closed.get()) { + return; + } + if (!this.graph.started() || this.graph.closed()) { return; } @@ -253,6 +261,10 @@ public Future schedule(HugeTask task) { return this.ephemeralTaskExecutor.submit(task); } + // Validate task state before saving to ensure correct exception type + E.checkState(task.type() != null, "Task type can't be null"); + E.checkState(task.name() != null, "Task name can't be null"); + // Process schema task // Handle gremlin task // Handle OLAP calculation tasks @@ -284,14 +296,41 @@ protected void initTaskParams(HugeTask task) { } } + /** + * Note: This method will update the status of the input task. + * + * @param task + * @param + */ @Override public void cancel(HugeTask task) { - // Update status to CANCELLING - if (!task.completed()) { - // Task not completed, can only execute status not CANCELLING - this.updateStatus(task.id(), null, TaskStatus.CANCELLING); + E.checkArgumentNotNull(task, "Task can't be null"); + + if (task.completed() || task.cancelling()) { + return; + } + + LOG.info("Cancel task '{}' in status {}", task.id(), task.status()); + + // Check if task is running locally, cancel it directly if so + HugeTask runningTask = this.runningTasks.get(task.id()); + if (runningTask != null) { + boolean cancelled = runningTask.cancel(true); + if (cancelled) { + task.overwriteStatus(TaskStatus.CANCELLED); + } + LOG.info("Cancel local running task '{}' result: {}", task.id(), cancelled); + return; + } + + // Task not running locally, update status to CANCELLING + // for cronSchedule() or other nodes to handle + TaskStatus currentStatus = task.status(); + if (!this.updateStatus(task.id(), currentStatus, TaskStatus.CANCELLING)) { + LOG.info("Failed to cancel task '{}', status may have changed from {}", + task.id(), currentStatus); } else { - LOG.info("cancel task({}) error, task has completed", task.id()); + task.overwriteStatus(TaskStatus.CANCELLING); } } @@ -316,14 +355,18 @@ protected HugeTask deleteFromDB(Id id) { @Override public HugeTask delete(Id id, boolean force) { - if (!force) { - // Change status to DELETING, perform the deletion operation through automatic - // scheduling. + HugeTask task = this.taskWithoutResult(id); + + if (!force && !task.completed()) { + // Check task status: can't delete running tasks without force this.updateStatus(id, null, TaskStatus.DELETING); return null; - } else { - return this.deleteFromDB(id); + // Already in DELETING status, delete directly from DB + // Completed tasks can also be deleted directly } + + // Delete from DB directly for completed/DELETING tasks or force=true + return this.deleteFromDB(id); } @Override @@ -353,6 +396,21 @@ public boolean close() { cronFuture.cancel(false); } + // Wait behind the scheduler thread to ensure any running cron task is completed + try { + Future barrier = this.schedulerExecutor.submit(() -> { + // pass + }); + barrier.get(schedulePeriod + 5, TimeUnit.SECONDS); + } catch (TimeoutException e) { + LOG.warn("Cron task did not complete in time when closing scheduler"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting for cron task to complete", e); + } catch (ExecutionException e) { + LOG.warn("Exception while waiting for cron task to complete", e); + } + if (!this.taskDbExecutor.isShutdown()) { this.call(() -> { try { @@ -363,7 +421,10 @@ public boolean close() { this.graph.closeTx(); }); } - return true; + + //todo: serverInfoManager section should be removed in the future. + return this.serverManager().close(); + //return true; } @Override @@ -437,9 +498,7 @@ public void waitUntilAllTasksCompleted(long seconds) @Override public void checkRequirement(String op) { - if (!this.serverManager().selfIsMaster()) { - throw new HugeException("Can't %s task on non-master server", op); - } + // Distributed scheduler uses task locks to coordinate workers. } @Override diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java index 71feb3f688..f0485f6656 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java @@ -209,14 +209,6 @@ public static HugeServerInfo fromVertex(Vertex vertex) { return serverInfo; } - public boolean suitableFor(HugeTask task, long now) { - if (task.computer() != this.role.computer()) { - return false; - } - return this.updateTime.getTime() + EXPIRED_INTERVAL >= now && - this.load() + task.load() <= this.maxLoad; - } - public static Schema schema(HugeGraphParams graph) { return new Schema(graph); } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java index 1560ec0b8f..e88c68ea95 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java @@ -17,55 +17,26 @@ package org.apache.hugegraph.task; -import static org.apache.hugegraph.backend.query.Query.NO_LIMIT; - -import java.util.Collection; -import java.util.Iterator; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import org.apache.hugegraph.HugeException; -import org.apache.hugegraph.HugeGraph; import org.apache.hugegraph.HugeGraphParams; import org.apache.hugegraph.backend.id.Id; import org.apache.hugegraph.backend.id.IdGenerator; -import org.apache.hugegraph.backend.page.PageInfo; -import org.apache.hugegraph.backend.query.Condition; -import org.apache.hugegraph.backend.query.ConditionQuery; -import org.apache.hugegraph.backend.query.QueryResults; import org.apache.hugegraph.backend.tx.GraphTransaction; import org.apache.hugegraph.exception.ConnectionException; -import org.apache.hugegraph.iterator.ListIterator; -import org.apache.hugegraph.iterator.MapperIterator; import org.apache.hugegraph.masterelection.GlobalMasterInfo; -import org.apache.hugegraph.schema.PropertyKey; -import org.apache.hugegraph.schema.VertexLabel; -import org.apache.hugegraph.structure.HugeVertex; -import org.apache.hugegraph.type.HugeType; -import org.apache.hugegraph.type.define.HugeKeys; import org.apache.hugegraph.type.define.NodeRole; -import org.apache.hugegraph.util.DateUtil; import org.apache.hugegraph.util.E; -import org.apache.hugegraph.util.Log; -import org.apache.tinkerpop.gremlin.structure.Vertex; -import org.slf4j.Logger; - -import com.google.common.collect.ImmutableMap; public class ServerInfoManager { - private static final Logger LOG = Log.logger(ServerInfoManager.class); - - public static final long MAX_SERVERS = 100000L; - public static final long PAGE_SIZE = 10L; - private final HugeGraphParams graph; private final ExecutorService dbExecutor; private volatile GlobalMasterInfo globalNodeInfo; - private volatile boolean onlySingleNode; private volatile boolean closed; public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) { @@ -77,18 +48,16 @@ public ServerInfoManager(HugeGraphParams graph, ExecutorService dbExecutor) { this.globalNodeInfo = null; - this.onlySingleNode = false; this.closed = false; } public void init() { - HugeServerInfo.schema(this.graph).initSchemaIfNeeded(); + // ServerInfo is soft-disabled; keep this method for compatibility. } public synchronized boolean close() { this.closed = true; if (!this.dbExecutor.isShutdown()) { - this.removeSelfServerInfo(); this.call(() -> { try { this.tx().close(); @@ -106,53 +75,14 @@ public synchronized void initServerInfo(GlobalMasterInfo nodeInfo) { E.checkArgument(nodeInfo != null, "The global node info can't be null"); this.globalNodeInfo = nodeInfo; - - Id serverId = this.selfNodeId(); - HugeServerInfo existed = this.serverInfo(serverId); - if (existed != null && existed.alive()) { - final long now = DateUtil.now().getTime(); - if (existed.expireTime() > now + 30 * 1000) { - LOG.info("The node time maybe skew very much: {}", existed); - throw new HugeException("The server with name '%s' maybe skew very much", serverId); - } - try { - Thread.sleep(existed.expireTime() - now + 1); - } catch (InterruptedException e) { - throw new HugeException("Interrupted when waiting for server info expired", e); - } - } - E.checkArgument(existed == null || !existed.alive(), - "The server with name '%s' already in cluster", serverId); - - if (nodeInfo.nodeRole().master()) { - String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; - do { - Iterator servers = this.serverInfos(PAGE_SIZE, page); - while (servers.hasNext()) { - existed = servers.next(); - E.checkArgument(!existed.role().master() || !existed.alive(), - "Already existed master '%s' in current cluster", - existed.id()); - } - if (page != null) { - page = PageInfo.pageInfo(servers); - } - } while (page != null); - } - - // TODO: save ServerInfo to AuthServer - this.saveServerInfo(this.selfNodeId(), this.selfNodeRole()); } public synchronized void changeServerRole(NodeRole nodeRole) { - if (this.closed) { + if (this.closed || this.globalNodeInfo == null) { return; } this.globalNodeInfo.changeNodeRole(nodeRole); - - // TODO: save ServerInfo to AuthServer - this.saveServerInfo(this.selfNodeId(), this.selfNodeRole()); } public GlobalMasterInfo globalNodeRoleInfo() { @@ -163,9 +93,13 @@ public Id selfNodeId() { if (this.globalNodeInfo == null) { return null; } + Id nodeId = this.globalNodeInfo.nodeId(); + if (nodeId == null) { + return null; + } // Scope server id to graph to avoid cross-graph collision return IdGenerator.of(this.graph.spaceGraphName() + "/" + - this.globalNodeInfo.nodeId().asString()); + nodeId.asString()); } public NodeRole selfNodeRole() { @@ -179,98 +113,8 @@ public boolean selfIsMaster() { return this.selfNodeRole() != null && this.selfNodeRole().master(); } - public boolean onlySingleNode() { - // Only exists one node in the whole master - return this.onlySingleNode; - } - public synchronized void heartbeat() { - assert this.graphIsReady(); - - HugeServerInfo serverInfo = this.selfServerInfo(); - if (serverInfo != null) { - // Update heartbeat time for this server - serverInfo.updateTime(DateUtil.now()); - this.save(serverInfo); - return; - } - - /* ServerInfo is missing */ - if (this.selfNodeId() == null) { - // Ignore if ServerInfo is not initialized - LOG.info("ServerInfo is missing: {}, may not be initialized yet", this.selfNodeId()); - return; - } - if (this.selfIsMaster()) { - // On the master node, just wait for ServerInfo re-init - LOG.warn("ServerInfo is missing: {}, may be cleared before", this.selfNodeId()); - return; - } - /* - * Missing server info on non-master node, may be caused by graph - * truncated on master node then synced by raft. - * TODO: we just patch it here currently, to be improved. - */ - serverInfo = this.saveServerInfo(this.selfNodeId(), this.selfNodeRole()); - assert serverInfo != null; - } - - public synchronized void decreaseLoad(int load) { - assert load > 0 : load; - HugeServerInfo serverInfo = this.selfServerInfo(); - serverInfo.increaseLoad(-load); - this.save(serverInfo); - } - - public int calcMaxLoad() { - // TODO: calc max load based on CPU and Memory resources - return 10000; - } - - protected boolean graphIsReady() { - return !this.closed && this.graph.started() && this.graph.initialized(); - } - - protected synchronized HugeServerInfo pickWorkerNode(Collection servers, - HugeTask task) { - HugeServerInfo master = null; - HugeServerInfo serverWithMinLoad = null; - int minLoad = Integer.MAX_VALUE; - boolean hasWorkerNode = false; - long now = DateUtil.now().getTime(); - - // Iterate servers to find suitable one - for (HugeServerInfo server : servers) { - if (!server.alive()) { - continue; - } - if (server.role().master()) { - master = server; - continue; - } - hasWorkerNode = true; - if (!server.suitableFor(task, now)) { - continue; - } - if (server.load() < minLoad) { - minLoad = server.load(); - serverWithMinLoad = server; - } - } - - boolean singleNode = !hasWorkerNode; - if (singleNode != this.onlySingleNode) { - LOG.info("Switch only_single_node to {}", singleNode); - this.onlySingleNode = singleNode; - } - - // Only schedule to master if there are no workers and master are suitable - if (!hasWorkerNode) { - if (master != null && master.suitableFor(task, now)) { - serverWithMinLoad = master; - } - } - return serverWithMinLoad; + // ServerInfo heartbeat is deprecated for local scheduling. } private GraphTransaction tx() { @@ -278,57 +122,6 @@ private GraphTransaction tx() { return this.graph.systemTransaction(); } - private HugeServerInfo saveServerInfo(Id serverId, NodeRole serverRole) { - HugeServerInfo serverInfo = new HugeServerInfo(serverId, serverRole); - serverInfo.maxLoad(this.calcMaxLoad()); - this.save(serverInfo); - - LOG.info("Init server info: {}", serverInfo); - return serverInfo; - } - - private Id save(HugeServerInfo serverInfo) { - return this.call(() -> { - // Construct vertex from server info - HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph); - if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) { - throw new HugeException("Schema is missing for %s '%s'", - HugeServerInfo.P.SERVER, serverInfo); - } - HugeVertex vertex = this.tx().constructVertex(false, serverInfo.asArray()); - // Add or update server info in backend store - vertex = this.tx().addVertex(vertex); - return vertex.id(); - }); - } - - private int save(Collection serverInfos) { - return this.call(() -> { - if (serverInfos.isEmpty()) { - return 0; - } - HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph); - if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) { - throw new HugeException("Schema is missing for %s", HugeServerInfo.P.SERVER); - } - // Save server info in batch - GraphTransaction tx = this.tx(); - int updated = 0; - for (HugeServerInfo server : serverInfos) { - if (!server.updated()) { - continue; - } - HugeVertex vertex = tx.constructVertex(false, server.asArray()); - tx.addVertex(vertex); - updated++; - } - // NOTE: actually it is auto-commit, to be improved - tx.commitOrRollback(); - - return updated; - }); - } - private V call(Callable callable) { assert !Thread.currentThread().getName().startsWith( "server-info-db-worker") : "can't call by itself"; @@ -342,110 +135,4 @@ private V call(Callable callable) { e, e.toString()); } } - - private HugeServerInfo selfServerInfo() { - HugeServerInfo selfServerInfo = this.serverInfo(this.selfNodeId()); - if (selfServerInfo == null && this.selfNodeId() != null) { - LOG.warn("ServerInfo is missing: {}", this.selfNodeId()); - } - return selfServerInfo; - } - - private HugeServerInfo serverInfo(Id serverId) { - return this.call(() -> { - Iterator vertices = this.tx().queryServerInfos(serverId); - Vertex vertex = QueryResults.one(vertices); - if (vertex == null) { - return null; - } - return HugeServerInfo.fromVertex(vertex); - }); - } - - private HugeServerInfo removeSelfServerInfo() { - /* - * Check this.selfServerId != null to avoid graph.initialized() call. - * NOTE: graph.initialized() may throw exception if we can't connect to - * backend store, initServerInfo() is not called in this case, so - * this.selfServerId is null at this time. - */ - if (this.selfNodeId() != null && this.graph.initialized()) { - return this.removeServerInfo(this.selfNodeId()); - } - return null; - } - - private HugeServerInfo removeServerInfo(Id serverId) { - if (serverId == null) { - return null; - } - LOG.info("Remove server info: {}", serverId); - return this.call(() -> { - Iterator vertices = this.tx().queryServerInfos(serverId); - Vertex vertex = QueryResults.one(vertices); - if (vertex == null) { - return null; - } - this.tx().removeVertex((HugeVertex) vertex); - return HugeServerInfo.fromVertex(vertex); - }); - } - - protected void updateServerInfos(Collection serverInfos) { - this.save(serverInfos); - } - - protected Collection allServerInfos() { - Iterator infos = this.serverInfos(NO_LIMIT, null); - try (ListIterator iter = new ListIterator<>( - MAX_SERVERS, infos)) { - return iter.list(); - } catch (Exception e) { - throw new HugeException("Failed to close server info iterator", e); - } - } - - protected Iterator serverInfos(String page) { - return this.serverInfos(ImmutableMap.of(), PAGE_SIZE, page); - } - - protected Iterator serverInfos(long limit, String page) { - return this.serverInfos(ImmutableMap.of(), limit, page); - } - - private Iterator serverInfos(Map conditions, - long limit, String page) { - return this.call(() -> { - ConditionQuery query; - if (this.graph.backendStoreFeatures().supportsTaskAndServerVertex()) { - query = new ConditionQuery(HugeType.SERVER); - } else { - query = new ConditionQuery(HugeType.VERTEX); - } - if (page != null) { - query.page(page); - } - - HugeGraph graph = this.graph.graph(); - VertexLabel vl = graph.vertexLabel(HugeServerInfo.P.SERVER); - query.eq(HugeKeys.LABEL, vl.id()); - for (Map.Entry entry : conditions.entrySet()) { - PropertyKey pk = graph.propertyKey(entry.getKey()); - query.query(Condition.eq(pk.id(), entry.getValue())); - } - query.showHidden(true); - if (limit != NO_LIMIT) { - query.limit(limit); - } - Iterator vertices = this.tx().queryServerInfos(query); - Iterator servers = - new MapperIterator<>(vertices, HugeServerInfo::fromVertex); - // Convert iterator to list to avoid across thread tx accessed - return QueryResults.toList(servers); - }); - } - - private boolean supportsPaging() { - return this.graph.graph().backendStoreFeatures().supportsQueryByPage(); - } } diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java index 5f60792af1..79dd98c0f4 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java @@ -18,7 +18,6 @@ package org.apache.hugegraph.task; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -125,11 +124,9 @@ private TaskTransaction tx() { // NOTE: only the owner thread can access task tx if (this.taskTx == null) { /* - * NOTE: don't synchronized(this) due to scheduler thread hold - * this lock through scheduleTasks(), then query tasks and wait - * for db-worker thread after call(), the tx may not be initialized - * but can't catch this lock, then cause deadlock. - * We just use this.serverManager as a monitor here + * NOTE: don't synchronized(this) to avoid potential deadlock + * when multiple threads are accessing task transaction. + * We use this.serverManager as a monitor here for thread safety. */ synchronized (this.serverManager) { if (this.taskTx == null) { @@ -146,9 +143,9 @@ private TaskTransaction tx() { @Override public void restoreTasks() { - Id selfServer = this.serverManager().selfNodeId(); List> taskList = new ArrayList<>(); // Restore 'RESTORING', 'RUNNING' and 'QUEUED' tasks in order. + // Single-node mode: restore all pending tasks without server filtering for (TaskStatus status : TaskStatus.PENDING_STATUSES) { String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; do { @@ -156,9 +153,7 @@ public void restoreTasks() { for (iter = this.findTask(status, PAGE_SIZE, page); iter.hasNext(); ) { HugeTask task = iter.next(); - if (selfServer.equals(task.server())) { - taskList.add(task); - } + taskList.add(task); } if (page != null) { page = PageInfo.pageInfo(iter); @@ -211,30 +206,9 @@ public Future schedule(HugeTask task) { return this.submitTask(task); } - // Check this is on master for normal task schedule - this.checkOnMasterNode("schedule"); - if (this.serverManager().onlySingleNode() && !task.computer()) { - /* - * Speed up for single node, submit the task immediately, - * this code can be removed without affecting code logic - */ - task.status(TaskStatus.QUEUED); - task.server(this.serverManager().selfNodeId()); - this.save(task); - return this.submitTask(task); - } else { - /* - * Just set the SCHEDULING status and save the task, - * it will be scheduled by periodic scheduler worker - */ - task.status(TaskStatus.SCHEDULING); - this.save(task); - - // Notify master server to schedule and execute immediately - TaskManager.instance().notifyNewTask(task); - - return task; - } + task.status(TaskStatus.QUEUED); + this.save(task); + return this.submitTask(task); } private Future submitTask(HugeTask task) { @@ -273,7 +247,6 @@ public void initTaskCallable(HugeTask task) { @Override public synchronized void cancel(HugeTask task) { E.checkArgumentNotNull(task, "Task can't be null"); - this.checkOnMasterNode("cancel"); if (task.completed() || task.cancelling()) { return; @@ -281,31 +254,15 @@ public synchronized void cancel(HugeTask task) { LOG.info("Cancel task '{}' in status {}", task.id(), task.status()); - if (task.server() == null) { - // The task not scheduled to workers, set canceled immediately - assert task.status().code() < TaskStatus.QUEUED.code(); - if (task.status(TaskStatus.CANCELLED)) { - this.save(task); - return; - } - } else if (task.status(TaskStatus.CANCELLING)) { - // The task scheduled to workers, let the worker node to cancel + HugeTask memTask = this.tasks.get(task.id()); + if (memTask != null) { + boolean cancelled = memTask.cancel(true); + LOG.info("Task '{}' cancel result: {}", task.id(), cancelled); + return; + } + + if (task.status(TaskStatus.CANCELLED)) { this.save(task); - assert task.server() != null : task; - assert this.serverManager().selfIsMaster(); - if (!task.server().equals(this.serverManager().selfNodeId())) { - /* - * Remove the task from memory if it's running on worker node, - * but keep the task in memory if it's running on master node. - * Cancel-scheduling will read the task from backend store, if - * removed this instance from memory, there will be two task - * instances with the same id, and can't cancel the real task that - * is running but removed from memory. - */ - this.remove(task); - } - // Notify master server to schedule and execute immediately - TaskManager.instance().notifyNewTask(task); return; } @@ -318,128 +275,11 @@ public ServerInfoManager serverManager() { return this.serverManager; } - protected synchronized void scheduleTasksOnMaster() { - // Master server schedule all scheduling tasks to suitable worker nodes - Collection serverInfos = this.serverManager().allServerInfos(); - String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; - do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page); - while (tasks.hasNext()) { - HugeTask task = tasks.next(); - if (task.server() != null) { - // Skip if already scheduled - continue; - } - - if (!this.serverManager.selfIsMaster()) { - return; - } - - HugeServerInfo server = this.serverManager().pickWorkerNode(serverInfos, task); - if (server == null) { - LOG.info("The master can't find suitable servers to " + - "execute task '{}', wait for next schedule", task.id()); - continue; - } - - // Found suitable server, update task status - assert server.id() != null; - task.server(server.id()); - task.status(TaskStatus.SCHEDULED); - this.save(task); - - // Update server load in memory, it will be saved at the ending - server.increaseLoad(task.load()); - - LOG.info("Scheduled task '{}' to server '{}'", task.id(), server.id()); - } - if (page != null) { - page = PageInfo.pageInfo(tasks); - } - } while (page != null); - - // Save to store - this.serverManager().updateServerInfos(serverInfos); - } - - protected void executeTasksOnWorker(Id server) { - String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; - do { - Iterator> tasks = this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page); - while (tasks.hasNext()) { - HugeTask task = tasks.next(); - this.initTaskCallable(task); - Id taskServer = task.server(); - if (taskServer == null) { - LOG.warn("Task '{}' may not be scheduled", task.id()); - continue; - } - HugeTask memTask = this.tasks.get(task.id()); - if (memTask != null) { - assert memTask.status().code() > task.status().code(); - continue; - } - if (taskServer.equals(server)) { - task.status(TaskStatus.QUEUED); - this.save(task); - this.submitTask(task); - } - } - if (page != null) { - page = PageInfo.pageInfo(tasks); - } - } while (page != null); - } - - protected void cancelTasksOnWorker(Id server) { - String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null; - do { - Iterator> tasks = this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page); - while (tasks.hasNext()) { - HugeTask task = tasks.next(); - Id taskServer = task.server(); - if (taskServer == null) { - LOG.warn("Task '{}' may not be scheduled", task.id()); - continue; - } - if (!taskServer.equals(server)) { - continue; - } - /* - * Task may be loaded from backend store and not initialized. - * like: A task is completed but failed to save in the last - * step, resulting in the status of the task not being - * updated to storage, the task is not in memory, so it's not - * initialized when canceled. - */ - HugeTask memTask = this.tasks.get(task.id()); - if (memTask != null) { - task = memTask; - } else { - this.initTaskCallable(task); - } - boolean cancelled = task.cancel(true); - LOG.info("Server '{}' cancel task '{}' with cancelled={}", - server, task.id(), cancelled); - } - if (page != null) { - page = PageInfo.pageInfo(tasks); - } - } while (page != null); - } - @Override public void taskDone(HugeTask task) { this.remove(task); - - Id selfServerId = this.serverManager().selfNodeId(); - try { - this.serverManager().decreaseLoad(task.load()); - } catch (Throwable e) { - LOG.error("Failed to decrease load for task '{}' on server '{}'", - task.id(), selfServerId, e); - } - LOG.debug("Task '{}' done on server '{}'", task.id(), selfServerId); + // Single-node mode: no need to manage load + LOG.debug("Task '{}' done", task.id()); } protected void remove(HugeTask task) { @@ -738,10 +578,9 @@ public V call(Callable callable) { } } + @Deprecated private void checkOnMasterNode(String op) { - if (!this.serverManager().selfIsMaster()) { - throw new HugeException("Can't %s task on non-master server", op); - } + // Single-node mode: all operations are allowed, no role check needed } private boolean supportsPaging() { diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java index 2ba3fd8a6d..6c99ef156d 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskAndResultScheduler.java @@ -46,6 +46,7 @@ * Base class of task & result scheduler */ public abstract class TaskAndResultScheduler implements TaskScheduler { + /** * Which graph the scheduler belongs to */ @@ -61,8 +62,8 @@ public abstract class TaskAndResultScheduler implements TaskScheduler { private final ServerInfoManager serverManager; public TaskAndResultScheduler( - HugeGraphParams graph, - ExecutorService serverInfoDbExecutor) { + HugeGraphParams graph, + ExecutorService serverInfoDbExecutor) { E.checkNotNull(graph, "graph"); this.graph = graph; @@ -90,7 +91,7 @@ public void save(HugeTask task) { // Save result outcome if (rawResult != null) { HugeTaskResult result = - new HugeTaskResult(HugeTaskResult.genId(task.id())); + new HugeTaskResult(HugeTaskResult.genId(task.id())); result.result(rawResult); this.call(() -> { @@ -164,7 +165,7 @@ protected Iterator> queryTask(Map conditions, } Iterator vertices = this.tx().queryTaskInfos(query); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, HugeTask::fromVertex); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); @@ -180,16 +181,16 @@ protected Iterator> queryTask(Map conditions, protected Iterator> queryTask(List ids) { ListIterator> ts = this.call( - () -> { - Object[] idArray = ids.toArray(new Id[ids.size()]); - Iterator vertices = this.tx() - .queryTaskInfos(idArray); - Iterator> tasks = - new MapperIterator<>(vertices, - HugeTask::fromVertex); - // Convert iterator to list to avoid across thread tx accessed - return QueryResults.toList(tasks); - }); + () -> { + Object[] idArray = ids.toArray(new Id[ids.size()]); + Iterator vertices = this.tx() + .queryTaskInfos(idArray); + Iterator> tasks = + new MapperIterator<>(vertices, + HugeTask::fromVertex); + // Convert iterator to list to avoid across thread tx accessed + return QueryResults.toList(tasks); + }); Iterator results = queryTaskResult(ids); @@ -201,7 +202,7 @@ protected Iterator> queryTask(List ids) { return new MapperIterator<>(ts, (task) -> { HugeTaskResult taskResult = - resultCaches.get(HugeTaskResult.genId(task.id())); + resultCaches.get(HugeTaskResult.genId(task.id())); if (taskResult != null) { task.result(taskResult); } @@ -219,6 +220,10 @@ protected HugeTask taskWithoutResult(Id id) { return HugeTask.fromVertex(vertex); }); + if (result == null) { + throw new NotFoundException("Can't find task with id '%s'", id); + } + return result; } @@ -227,7 +232,7 @@ protected Iterator> tasksWithoutResult(List ids) { Object[] idArray = ids.toArray(new Id[ids.size()]); Iterator vertices = this.tx().queryTaskInfos(idArray); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, HugeTask::fromVertex); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); @@ -250,7 +255,7 @@ protected Iterator> queryTaskWithoutResult(String key, } protected Iterator> queryTaskWithoutResult(Map conditions, long limit, String page) { + Object> conditions, long limit, String page) { return this.call(() -> { ConditionQuery query = new ConditionQuery(HugeType.TASK); if (page != null) { @@ -268,7 +273,7 @@ protected Iterator> queryTaskWithoutResult(Map vertices = this.tx().queryTaskInfos(query); Iterator> tasks = - new MapperIterator<>(vertices, HugeTask::fromVertex); + new MapperIterator<>(vertices, HugeTask::fromVertex); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); @@ -277,7 +282,7 @@ protected Iterator> queryTaskWithoutResult(Map { Iterator vertices = - this.tx().queryTaskInfos(HugeTaskResult.genId(taskid)); + this.tx().queryTaskInfos(HugeTaskResult.genId(taskid)); Vertex vertex = QueryResults.one(vertices); if (vertex == null) { return null; @@ -292,12 +297,12 @@ protected HugeTaskResult queryTaskResult(Id taskid) { protected Iterator queryTaskResult(List taskIds) { return this.call(() -> { Object[] idArray = - taskIds.stream().map(HugeTaskResult::genId).toArray(); + taskIds.stream().map(HugeTaskResult::genId).toArray(); Iterator vertices = this.tx() .queryTaskInfos(idArray); Iterator tasks = - new MapperIterator<>(vertices, - HugeTaskResult::fromVertex); + new MapperIterator<>(vertices, + HugeTaskResult::fromVertex); // Convert iterator to list to avoid across thread tx accessed return QueryResults.toList(tasks); }); diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index 277822a386..ec063754d8 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -18,7 +18,6 @@ package org.apache.hugegraph.task; import java.util.Map; -import java.util.Queue; import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -29,11 +28,9 @@ import org.apache.hugegraph.HugeException; import org.apache.hugegraph.HugeGraphParams; import org.apache.hugegraph.concurrent.PausableScheduledThreadPool; -import org.apache.hugegraph.type.define.NodeRole; import org.apache.hugegraph.util.Consumers; import org.apache.hugegraph.util.E; import org.apache.hugegraph.util.ExecutorUtil; -import org.apache.hugegraph.util.LockUtil; import org.apache.hugegraph.util.Log; import org.slf4j.Logger; @@ -76,8 +73,6 @@ public final class TaskManager { private final ExecutorService ephemeralTaskExecutor; private final PausableScheduledThreadPool distributedSchedulerExecutor; - private boolean enableRoleElected = false; - public static TaskManager instance() { return MANAGER; } @@ -102,11 +97,6 @@ private TaskManager(int pool) { // For a schedule task to run, just one thread is ok this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool( 1, TASK_SCHEDULER); - // Start after 10x period time waiting for HugeGraphServer startup - this.schedulerExecutor.scheduleWithFixedDelay(this::scheduleOrExecuteJob, - 10 * SCHEDULE_PERIOD, - SCHEDULE_PERIOD, - TimeUnit.MILLISECONDS); } public void addScheduler(HugeGraphParams graph) { @@ -230,14 +220,6 @@ private void closeDistributedSchedulerTx(HugeGraphParams graph) { } } - public void pauseScheduledThreadPool() { - this.schedulerExecutor.pauseSchedule(); - } - - public void resumeScheduledThreadPool() { - this.schedulerExecutor.resumeSchedule(); - } - public TaskScheduler getScheduler(HugeGraphParams graph) { return this.schedulers.get(graph); } @@ -349,125 +331,12 @@ public int pendingTasks() { return size; } - public void enableRoleElection() { - this.enableRoleElected = true; - } - public void onAsRoleMaster() { - try { - for (TaskScheduler entry : this.schedulers.values()) { - ServerInfoManager serverInfoManager = entry.serverManager(); - if (serverInfoManager != null) { - serverInfoManager.changeServerRole(NodeRole.MASTER); - } else { - LOG.warn("ServerInfoManager is null for graph {}", entry.spaceGraphName()); - } - } - } catch (Throwable e) { - LOG.error("Exception occurred when change to master role", e); - throw e; - } + // ServerInfo based role propagation is deprecated. } public void onAsRoleWorker() { - try { - for (TaskScheduler entry : this.schedulers.values()) { - ServerInfoManager serverInfoManager = entry.serverManager(); - if (serverInfoManager != null) { - serverInfoManager.changeServerRole(NodeRole.WORKER); - } else { - LOG.warn("ServerInfoManager is null for graph {}", entry.spaceGraphName()); - } - } - } catch (Throwable e) { - LOG.error("Exception occurred when change to worker role", e); - throw e; - } - } - - void notifyNewTask(HugeTask task) { - Queue queue = this.schedulerExecutor - .getQueue(); - if (queue.size() <= 1) { - /* - * Notify to schedule tasks initiatively when have new task - * It's OK to not notify again if there are more than one task in - * queue(like two, one is timer task, one is immediate task), - * we don't want too many immediate tasks to be inserted into queue, - * one notify will cause all the tasks to be processed. - */ - this.schedulerExecutor.submit(this::scheduleOrExecuteJob); - } - } - - private void scheduleOrExecuteJob() { - // Called by scheduler timer - try { - for (TaskScheduler entry : this.schedulers.values()) { - // Maybe other threads close&remove scheduler at the same time - synchronized (entry) { - this.scheduleOrExecuteJobForGraph(entry); - } - } - } catch (Throwable e) { - LOG.error("Exception occurred when schedule job", e); - } - } - - private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) { - E.checkNotNull(scheduler, "scheduler"); - - if (scheduler instanceof StandardTaskScheduler) { - StandardTaskScheduler standardTaskScheduler = (StandardTaskScheduler) (scheduler); - ServerInfoManager serverManager = scheduler.serverManager(); - String spaceGraphName = scheduler.spaceGraphName(); - - LockUtil.lock(spaceGraphName, LockUtil.GRAPH_LOCK); - try { - /* - * Skip if: - * graph is closed (iterate schedulers before graph is closing) - * or - * graph is not initialized(maybe truncated or cleared). - * - * If graph is closing by other thread, current thread get - * serverManager and try lock graph, at the same time other - * thread deleted the lock-group, current thread would get - * exception 'LockGroup xx does not exists'. - * If graph is closed, don't call serverManager.initialized() - * due to it will reopen graph tx. - */ - if (!serverManager.graphIsReady()) { - return; - } - - // Update server heartbeat - serverManager.heartbeat(); - - /* - * Master will schedule tasks to suitable servers. - * Note a Worker may become to a Master, so elected-Master also needs to - * execute tasks assigned by previous Master when enableRoleElected=true. - * However, when enableRoleElected=false, a Master is only set by the - * config assignment, assigned-Master always stays the same state. - */ - if (serverManager.selfIsMaster()) { - standardTaskScheduler.scheduleTasksOnMaster(); - if (!this.enableRoleElected && !serverManager.onlySingleNode()) { - // assigned-Master + non-single-node don't need to execute tasks - return; - } - } - - // Execute queued tasks scheduled to current server - standardTaskScheduler.executeTasksOnWorker(serverManager.selfNodeId()); - - // Cancel tasks scheduled to current server - standardTaskScheduler.cancelTasksOnWorker(serverManager.selfNodeId()); - } finally { - LockUtil.unlock(spaceGraphName, LockUtil.GRAPH_LOCK); - } - } + // ServerInfo based role propagation is deprecated. } private static final ThreadLocal CONTEXTS = new ThreadLocal<>(); diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template index d3834baf5c..fd2782a87d 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hstore.properties.template @@ -31,7 +31,6 @@ store=hugegraph pd.peers=127.0.0.1:8686 # task config -task.scheduler_type=local task.schedule_period=10 task.retry=0 task.wait_timeout=10 diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties index b77cacb2de..3727919bbb 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/graphs/hugegraph.properties @@ -30,7 +30,6 @@ store=hugegraph #pd.peers=127.0.0.1:8686 # task config -task.scheduler_type=local task.schedule_period=10 task.retry=0 task.wait_timeout=10 diff --git a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/rest-server.properties b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/rest-server.properties index ad3e2700f8..eba2ed1f5d 100644 --- a/hugegraph-server/hugegraph-dist/src/assembly/static/conf/rest-server.properties +++ b/hugegraph-server/hugegraph-dist/src/assembly/static/conf/rest-server.properties @@ -23,9 +23,6 @@ arthas.disabled_commands=jad #auth.admin_pa=pa #auth.graph_store=hugegraph -# lightweight load balancing (TODO: legacy mode, remove soon) -server.id=server-1 -server.role=master # use pd # usePD=true diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java index 4fae0f76c6..5c34236857 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/MultiGraphsTest.java @@ -248,7 +248,7 @@ public void testCreateGraphsWithInvalidNames() { @Test public void testCreateGraphsWithSameName() { - List graphs = openGraphs("g", "g", "G"); + List graphs = openGraphs("gg", "gg", "GG"); HugeGraph g1 = graphs.get(0); HugeGraph g2 = graphs.get(1); HugeGraph g3 = graphs.get(2); diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java index 212ccc0588..3811a46f02 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/core/TaskCoreTest.java @@ -17,8 +17,8 @@ package org.apache.hugegraph.core; -import java.util.Arrays; import java.util.Iterator; +import java.util.List; import java.util.Random; import java.util.concurrent.TimeoutException; @@ -33,6 +33,7 @@ import org.apache.hugegraph.job.GremlinJob; import org.apache.hugegraph.job.JobBuilder; import org.apache.hugegraph.task.HugeTask; +import org.apache.hugegraph.task.StandardTaskScheduler; import org.apache.hugegraph.task.TaskCallable; import org.apache.hugegraph.task.TaskScheduler; import org.apache.hugegraph.task.TaskStatus; @@ -76,12 +77,14 @@ public void testTask() throws TimeoutException { Assert.assertEquals(id, task.id()); Assert.assertFalse(task.completed()); - Assert.assertThrows(IllegalArgumentException.class, () -> { - scheduler.delete(id, false); - }, e -> { - Assert.assertContains("Can't delete incomplete task '88888'", - e.getMessage()); - }); + if (scheduler.getClass().equals(StandardTaskScheduler.class)) { + Assert.assertThrows(IllegalArgumentException.class, () -> { + scheduler.delete(id, false); + }, e -> { + Assert.assertContains("Can't delete incomplete task '88888'", + e.getMessage()); + }); + } task = scheduler.waitUntilTaskCompleted(task.id(), 10); Assert.assertEquals(id, task.id()); @@ -89,7 +92,7 @@ public void testTask() throws TimeoutException { Assert.assertEquals(TaskStatus.SUCCESS, task.status()); Assert.assertEquals("test-task", scheduler.task(id).name()); - Assert.assertEquals("test-task", scheduler.tasks(Arrays.asList(id)) + Assert.assertEquals("test-task", scheduler.tasks(List.of(id)) .next().name()); Iterator> iter = scheduler.tasks(ImmutableList.of(id)); @@ -196,13 +199,18 @@ public Object execute() throws Exception { Assert.assertEquals("test", task.type()); Assert.assertFalse(task.completed()); - HugeTask task2 = scheduler.waitUntilTaskCompleted(task.id(), 10); + // Ephemeral tasks are node-local and not persisted to DB. + // Use Future.get() to wait for completion instead of ID-based lookup. + try { + task.get(10, java.util.concurrent.TimeUnit.SECONDS); + } catch (Exception e) { + throw new RuntimeException("Ephemeral task execution failed", e); + } + Assert.assertEquals(TaskStatus.SUCCESS, task.status()); Assert.assertEquals("{\"k1\":13579,\"k2\":\"24680\"}", task.result()); - Assert.assertEquals(TaskStatus.SUCCESS, task2.status()); - Assert.assertEquals("{\"k1\":13579,\"k2\":\"24680\"}", task2.result()); - + // Ephemeral tasks are not stored in DB, so these should throw NotFoundException Assert.assertThrows(NotFoundException.class, () -> { scheduler.waitUntilTaskCompleted(task.id(), 10); }); @@ -557,7 +565,12 @@ public void testGremlinJobAndCancel() throws TimeoutException { scheduler.cancel(task); task = scheduler.task(task.id()); - Assert.assertEquals(TaskStatus.CANCELLING, task.status()); + // For DistributedTaskScheduler, local cancel may result in CANCELLED directly + // (task thread updates status after being interrupted) + // or CANCELLING (if task hasn't processed the interrupt yet) + Assert.assertTrue("Task status should be CANCELLING or CANCELLED, but was " + task.status(), + task.status() == TaskStatus.CANCELLING || + task.status() == TaskStatus.CANCELLED); task = scheduler.waitUntilTaskCompleted(task.id(), 10); Assert.assertEquals(TaskStatus.CANCELLED, task.status()); @@ -629,46 +642,51 @@ public void testGremlinJobAndRestore() throws Exception { scheduler.cancel(task); task = scheduler.task(task.id()); - Assert.assertEquals(TaskStatus.CANCELLING, task.status()); + Assert.assertTrue("Task status should be CANCELLING or CANCELLED, but was " + task.status(), + task.status() == TaskStatus.CANCELLING || + task.status() == TaskStatus.CANCELLED); task = scheduler.waitUntilTaskCompleted(task.id(), 10); Assert.assertEquals(TaskStatus.CANCELLED, task.status()); Assert.assertTrue("progress=" + task.progress(), 0 < task.progress() && task.progress() < 10); Assert.assertEquals(0, task.retries()); - Assert.assertEquals(null, task.result()); + Assert.assertNull(task.result()); HugeTask finalTask = task; - Assert.assertThrows(IllegalArgumentException.class, () -> { - Whitebox.invoke(scheduler.getClass(), "restore", scheduler, - finalTask); - }, e -> { - Assert.assertContains("No need to restore completed task", - e.getMessage()); - }); - HugeTask task2 = scheduler.task(task.id()); - Assert.assertThrows(IllegalArgumentException.class, () -> { + // because Distributed do nothing in restore, so only test StandardTaskScheduler here + if (scheduler.getClass().equals(StandardTaskScheduler.class)) { + Assert.assertThrows(IllegalArgumentException.class, () -> { + Whitebox.invoke(scheduler.getClass(), "restore", scheduler, + finalTask); + }, e -> { + Assert.assertContains("No need to restore completed task", + e.getMessage()); + }); + + HugeTask task2 = scheduler.task(task.id()); + Assert.assertThrows(IllegalArgumentException.class, () -> { + Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); + }, e -> { + Assert.assertContains("No need to restore completed task", + e.getMessage()); + }); + + Whitebox.setInternalState(task2, "status", TaskStatus.RUNNING); Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); - }, e -> { - Assert.assertContains("No need to restore completed task", - e.getMessage()); - }); - - Whitebox.setInternalState(task2, "status", TaskStatus.RUNNING); - Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); - Assert.assertThrows(IllegalArgumentException.class, () -> { - Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); - }, e -> { - Assert.assertContains("is already in the queue", e.getMessage()); - }); - - scheduler.waitUntilTaskCompleted(task2.id(), 10); - sleepAWhile(500); - Assert.assertEquals(10, task2.progress()); - Assert.assertEquals(1, task2.retries()); - Assert.assertEquals("100", task2.result()); + Assert.assertThrows(IllegalArgumentException.class, () -> { + Whitebox.invoke(scheduler.getClass(), "restore", scheduler, task2); + }, e -> { + Assert.assertContains("is already in the queue", e.getMessage()); + }); + scheduler.waitUntilTaskCompleted(task2.id(), 10); + sleepAWhile(500); + Assert.assertEquals(10, task2.progress()); + Assert.assertEquals(1, task2.retries()); + Assert.assertEquals("100", task2.result()); + } } private HugeTask runGremlinJob(String gremlin) { diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java index 0780b03a64..07b3541aca 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/UnitTestSuite.java @@ -48,6 +48,7 @@ import org.apache.hugegraph.unit.core.SerialEnumTest; import org.apache.hugegraph.unit.core.ServerInfoManagerTest; import org.apache.hugegraph.unit.core.SystemSchemaStoreTest; +import org.apache.hugegraph.unit.core.TaskSchedulerServerInfoTest; import org.apache.hugegraph.unit.core.TraversalUtilTest; import org.apache.hugegraph.unit.id.EdgeIdTest; import org.apache.hugegraph.unit.id.IdTest; @@ -129,6 +130,7 @@ PageStateTest.class, SystemSchemaStoreTest.class, ServerInfoManagerTest.class, + TaskSchedulerServerInfoTest.class, RoleElectionStateMachineTest.class, HugeGraphAuthProxyTest.class, SchemaElementTest.class, diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/ServerInfoManagerTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/ServerInfoManagerTest.java index 85815bbc19..607d81de91 100644 --- a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/ServerInfoManagerTest.java +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/ServerInfoManagerTest.java @@ -50,6 +50,20 @@ public void setup() { this.hugegraphManager = new ServerInfoManager(hugegraphParams, executor); } + @Test + public void testInitDoesNotAccessBackendStore() { + HugeGraphParams graphParams = Mockito.mock(HugeGraphParams.class); + ExecutorService executor = Mockito.mock(ExecutorService.class); + ServerInfoManager manager = new ServerInfoManager(graphParams, executor); + + manager.init(); + + Mockito.verify(graphParams, Mockito.never()).systemTransaction(); + Mockito.verify(graphParams, Mockito.never()).backendStoreFeatures(); + Mockito.verify(graphParams, Mockito.never()).graph(); + Mockito.verify(graphParams, Mockito.never()).closeTx(); + } + @Test public void testSelfNodeIdScopedByGraphWithSameNodeId() { GlobalMasterInfo nodeInfo = GlobalMasterInfo.master("server-1"); @@ -73,4 +87,22 @@ public void testSelfNodeIdScopedByGraphWithSameNodeId() { public void testSelfNodeIdReturnsNullWhenNotInitialized() { Assert.assertNull(this.sysGraphManager.selfNodeId()); } + + @Test + public void testSelfNodeIdReturnsNullWhenNodeIdMissing() { + Whitebox.setInternalState(this.sysGraphManager, + "globalNodeInfo", new GlobalMasterInfo()); + + Assert.assertNull(this.sysGraphManager.selfNodeId()); + } + + @Test + public void testInitServerInfoDoesNotAccessBackendStore() { + GlobalMasterInfo nodeInfo = GlobalMasterInfo.master("server-1"); + + this.sysGraphManager.initServerInfo(nodeInfo); + + Assert.assertEquals("DEFAULT-~sys_graph/server-1", + this.sysGraphManager.selfNodeId().asString()); + } } diff --git a/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/TaskSchedulerServerInfoTest.java b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/TaskSchedulerServerInfoTest.java new file mode 100644 index 0000000000..225801608b --- /dev/null +++ b/hugegraph-server/hugegraph-test/src/main/java/org/apache/hugegraph/unit/core/TaskSchedulerServerInfoTest.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hugegraph.unit.core; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.commons.configuration2.PropertiesConfiguration; +import org.apache.hugegraph.HugeGraph; +import org.apache.hugegraph.HugeGraphParams; +import org.apache.hugegraph.concurrent.PausableScheduledThreadPool; +import org.apache.hugegraph.config.CoreOptions; +import org.apache.hugegraph.config.HugeConfig; +import org.apache.hugegraph.config.ServerOptions; +import org.apache.hugegraph.core.GraphManager; +import org.apache.hugegraph.event.EventHub; +import org.apache.hugegraph.task.DistributedTaskScheduler; +import org.apache.hugegraph.testutil.Assert; +import org.apache.hugegraph.testutil.Whitebox; +import org.apache.hugegraph.util.ExecutorUtil; +import org.junit.Test; +import org.mockito.Mockito; + +public class TaskSchedulerServerInfoTest { + + @Test + public void testDistributedCheckRequirementDoesNotNeedServerInfo() { + HugeGraph graph = Mockito.mock(HugeGraph.class); + Mockito.when(graph.graphSpace()).thenReturn("DEFAULT"); + + HugeGraphParams params = Mockito.mock(HugeGraphParams.class); + Mockito.when(params.graph()).thenReturn(graph); + Mockito.when(params.name()).thenReturn("hugegraph"); + Mockito.when(params.spaceGraphName()).thenReturn("DEFAULT-hugegraph"); + Mockito.when(params.configuration()).thenReturn(newConfig()); + + PausableScheduledThreadPool schedulerExecutor = + ExecutorUtil.newPausableScheduledThreadPool( + 1, "distributed-scheduler-test-%d"); + ExecutorService taskDbExecutor = Executors.newSingleThreadExecutor(); + ExecutorService schemaTaskExecutor = Executors.newSingleThreadExecutor(); + ExecutorService olapTaskExecutor = Executors.newSingleThreadExecutor(); + ExecutorService gremlinTaskExecutor = Executors.newSingleThreadExecutor(); + ExecutorService ephemeralTaskExecutor = Executors.newSingleThreadExecutor(); + ExecutorService serverInfoDbExecutor = Executors.newSingleThreadExecutor(); + + DistributedTaskScheduler scheduler = new DistributedTaskScheduler( + params, schedulerExecutor, taskDbExecutor, schemaTaskExecutor, + olapTaskExecutor, gremlinTaskExecutor, ephemeralTaskExecutor, + serverInfoDbExecutor); + try { + scheduler.checkRequirement("schedule"); + } finally { + schedulerExecutor.shutdownNow(); + taskDbExecutor.shutdownNow(); + schemaTaskExecutor.shutdownNow(); + olapTaskExecutor.shutdownNow(); + gremlinTaskExecutor.shutdownNow(); + ephemeralTaskExecutor.shutdownNow(); + serverInfoDbExecutor.shutdownNow(); + } + } + + @Test + public void testGraphManagerDoesNotGenerateServerIdWhenElectionDisabled() { + HugeConfig config = newConfig(); + + GraphManager manager = new GraphManager(config, new EventHub("test")); + + Assert.assertEquals("", config.get(ServerOptions.SERVER_ID)); + Assert.assertNull(manager.globalNodeRoleInfo().nodeId()); + } + + @Test + public void testGraphManagerRejectsRoleElection() { + PropertiesConfiguration conf = new PropertiesConfiguration(); + conf.setProperty(ServerOptions.ENABLE_SERVER_ROLE_ELECTION.name(), true); + HugeConfig config = new HugeConfig(conf); + + Assert.assertThrows(IllegalArgumentException.class, () -> { + new GraphManager(config, new EventHub("test")); + }, e -> { + Assert.assertContains("The server.role_election is no longer supported", + e.getMessage()); + }); + } + + @Test + public void testGraphManagerDoesNotInjectPdPeersForStandaloneRocksDB() { + HugeConfig serverConfig = newConfig(); + GraphManager manager = new GraphManager(serverConfig, new EventHub("test")); + HugeConfig graphConfig = newGraphConfig("rocksdb"); + + Whitebox.invoke(manager.getClass(), "transferPdPeersConfig", + manager, graphConfig); + + Assert.assertFalse(graphConfig.containsKey(CoreOptions.PD_PEERS.name())); + } + + @Test + public void testGraphManagerInjectsPdPeersForHStoreGraph() { + HugeConfig serverConfig = newConfig(); + GraphManager manager = new GraphManager(serverConfig, new EventHub("test")); + HugeConfig graphConfig = newGraphConfig("hstore"); + + Whitebox.invoke(manager.getClass(), "transferPdPeersConfig", + manager, graphConfig); + + Assert.assertEquals(serverConfig.get(ServerOptions.PD_PEERS), + graphConfig.get(CoreOptions.PD_PEERS)); + } + + private static HugeConfig newConfig() { + return new HugeConfig(new PropertiesConfiguration()); + } + + private static HugeConfig newGraphConfig(String backend) { + PropertiesConfiguration conf = new PropertiesConfiguration(); + conf.setProperty(CoreOptions.BACKEND.name(), backend); + return new HugeConfig(conf); + } +} diff --git a/hugegraph-struct/src/main/java/org/apache/hugegraph/options/CoreOptions.java b/hugegraph-struct/src/main/java/org/apache/hugegraph/options/CoreOptions.java index 849539419b..caf0146bb9 100644 --- a/hugegraph-struct/src/main/java/org/apache/hugegraph/options/CoreOptions.java +++ b/hugegraph-struct/src/main/java/org/apache/hugegraph/options/CoreOptions.java @@ -295,13 +295,7 @@ public class CoreOptions extends OptionHolder { rangeInt(1, 500), 1 ); - public static final ConfigOption SCHEDULER_TYPE = - new ConfigOption<>( - "task.scheduler_type", - "The type of scheduler used in distribution system.", - allowValues("local", "distributed"), - "local" - ); + public static final ConfigOption TASK_SYNC_DELETION = new ConfigOption<>( "task.sync_deletion",