Skip to content

Commit f6f2e5c

Browse files
author
Egor Kuts
committed
ignite-28305 review
1 parent 0da1807 commit f6f2e5c

30 files changed

Lines changed: 377 additions & 231 deletions

File tree

modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,9 @@ public static class Replicator {
519519

520520
/** Replica is absent on the node and the node is not in assignments for this replica. */
521521
public static final int REPLICA_ABSENT_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 11);
522+
523+
/** Node is overloaded: in-flight partition operation byte limit reached. */
524+
public static final int REPLICA_OVERLOADED_ERR = REPLICATOR_ERR_GROUP.registerErrorCode((short) 12);
522525
}
523526

524527
/** Storage error group. */

modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/TestServer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@
4747
import org.apache.ignite.internal.sql.engine.QueryProcessor;
4848
import org.apache.ignite.internal.table.IgniteTablesInternal;
4949
import org.apache.ignite.internal.tx.TxManager;
50-
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
5150
import org.apache.ignite.network.NetworkAddress;
5251
import org.jetbrains.annotations.Nullable;
5352
import org.junit.jupiter.api.TestInfo;
@@ -145,7 +144,6 @@ ClientHandlerModule start(TestInfo testInfo) {
145144
EventLog.NOOP,
146145
new TestLowWatermark(),
147146
Runnable::run,
148-
new PartitionOperationInFlightLimiter(0),
149147
() -> true
150148
);
151149

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@
7474
import org.apache.ignite.internal.table.IgniteTablesInternal;
7575
import org.apache.ignite.internal.tx.TxManager;
7676
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
77-
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
7877
import org.apache.ignite.lang.IgniteException;
7978
import org.jetbrains.annotations.Nullable;
8079
import org.jetbrains.annotations.TestOnly;
@@ -164,8 +163,6 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
164163

165164
private final Executor partitionOperationsExecutor;
166165

167-
private final PartitionOperationInFlightLimiter partitionOperationInFlightLimiter;
168-
169166
private final ConcurrentHashMap<String, CompletableFuture<PlatformComputeConnection>> computeExecutors = new ConcurrentHashMap<>();
170167

171168
@TestOnly
@@ -189,7 +186,6 @@ public class ClientHandlerModule implements IgniteComponent, PlatformComputeTran
189186
* @param eventLog Event log.
190187
* @param lowWatermark Low watermark.
191188
* @param partitionOperationsExecutor Executor for a partition operation.
192-
* @param partitionOperationInFlightLimiter In-flight limiter for partition operations.
193189
* @param ddlBatchingSuggestionEnabled Boolean supplier indicates whether the suggestion related DDL batching is enabled.
194190
*/
195191
public ClientHandlerModule(
@@ -211,7 +207,6 @@ public ClientHandlerModule(
211207
EventLog eventLog,
212208
LowWatermark lowWatermark,
213209
Executor partitionOperationsExecutor,
214-
PartitionOperationInFlightLimiter partitionOperationInFlightLimiter,
215210
Supplier<Boolean> ddlBatchingSuggestionEnabled
216211
) {
217212
assert igniteTables != null;
@@ -257,7 +252,6 @@ public ClientHandlerModule(
257252
this.clientConnectorConfiguration = clientConnectorConfiguration;
258253
this.ddlBatchingSuggestionEnabled = ddlBatchingSuggestionEnabled;
259254
this.partitionOperationsExecutor = partitionOperationsExecutor;
260-
this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter;
261255
}
262256

263257
/** {@inheritDoc} */
@@ -477,7 +471,6 @@ private ClientInboundMessageHandler createInboundMessageHandler(
477471
connectionId,
478472
primaryReplicaTracker,
479473
partitionOperationsExecutor,
480-
partitionOperationInFlightLimiter,
481474
SUPPORTED_FEATURES,
482475
Map.of(),
483476
computeExecutors::remove,

modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,6 @@
153153
import org.apache.ignite.internal.jdbc.proto.JdbcQueryCursorHandler;
154154
import org.apache.ignite.internal.lang.IgniteExceptionMapperUtil;
155155
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
156-
import org.apache.ignite.internal.lang.ReplicaOverloadedException;
157156
import org.apache.ignite.internal.logger.IgniteLogger;
158157
import org.apache.ignite.internal.logger.Loggers;
159158
import org.apache.ignite.internal.network.ClusterService;
@@ -181,7 +180,6 @@
181180
import org.apache.ignite.internal.tx.TransactionKilledException;
182181
import org.apache.ignite.internal.tx.TxManager;
183182
import org.apache.ignite.internal.util.ExceptionUtils;
184-
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
185183
import org.apache.ignite.lang.CancelHandle;
186184
import org.apache.ignite.lang.ErrorGroups.Compute;
187185
import org.apache.ignite.lang.ErrorGroups.Sql;
@@ -280,8 +278,6 @@ public class ClientInboundMessageHandler
280278

281279
private final Executor partitionOperationsExecutor;
282280

283-
private final PartitionOperationInFlightLimiter partitionOperationInFlightLimiter;
284-
285281
private final BitSet features;
286282

287283
private final Map<HandshakeExtension, Object> extensions;
@@ -313,7 +309,6 @@ public class ClientInboundMessageHandler
313309
* @param connectionId Connection ID.
314310
* @param primaryReplicaTracker Primary replica tracker.
315311
* @param partitionOperationsExecutor Partition operations executor.
316-
* @param partitionOperationInFlightLimiter In-flight limiter for partition operations.
317312
* @param features Features.
318313
* @param extensions Extensions.
319314
* @param eventLog Event log.
@@ -335,7 +330,6 @@ public ClientInboundMessageHandler(
335330
long connectionId,
336331
ClientPrimaryReplicaTracker primaryReplicaTracker,
337332
Executor partitionOperationsExecutor,
338-
PartitionOperationInFlightLimiter partitionOperationInFlightLimiter,
339333
BitSet features,
340334
Map<HandshakeExtension, Object> extensions,
341335
Function<String, CompletableFuture<PlatformComputeConnection>> computeConnectionFunc,
@@ -379,7 +373,6 @@ public ClientInboundMessageHandler(
379373
this.eventLog = eventLog;
380374
this.primaryReplicaTracker = primaryReplicaTracker;
381375
this.partitionOperationsExecutor = partitionOperationsExecutor;
382-
this.partitionOperationInFlightLimiter = partitionOperationInFlightLimiter;
383376
this.handshakeEventLoopSwitcher = handshakeEventLoopSwitcher;
384377

385378
jdbcQueryCursorHandler = new JdbcQueryCursorHandlerImpl(resources);
@@ -427,7 +420,6 @@ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
427420
@Override
428421
public void channelRead(ChannelHandlerContext ctx, Object msg) {
429422
ByteBuf byteBuf = (ByteBuf) msg;
430-
431423
// Each inbound handler in a pipeline has to release the received messages.
432424
var unpacker = new ClientMessageUnpacker(byteBuf);
433425

@@ -889,28 +881,18 @@ private void processOperation(ChannelHandlerContext ctx, ClientMessageUnpacker i
889881
if (ClientOp.isPartitionOperation(opCode)) {
890882
long requestId0 = requestId;
891883
int opCode0 = opCode;
892-
if (!partitionOperationInFlightLimiter.tryAcquire()) {
893-
in.close();
884+
partitionOperationsExecutor.execute(() -> {
885+
try {
886+
processOperationInternal(ctx, in, requestId0, opCode0, guard);
887+
} catch (Throwable t) {
888+
in.close();
894889

895-
writeError(requestId0, opCode0, new ReplicaOverloadedException(), ctx, false, guard);
890+
writeError(requestId0, opCode0, t, ctx, false, guard);
896891

897-
metrics.requestsFailedIncrement();
898-
} else {
899-
partitionOperationsExecutor.execute(() -> {
900-
try {
901-
processOperationInternal(ctx, in, requestId0, opCode0, guard);
902-
} catch (Throwable t) {
903-
in.close();
904-
905-
writeError(requestId0, opCode0, t, ctx, false, guard);
906-
907-
metrics.requestsFailedIncrement();
908-
metrics.requestsActiveDecrement();
909-
} finally {
910-
partitionOperationInFlightLimiter.release();
911-
}
912-
});
913-
}
892+
metrics.requestsFailedIncrement();
893+
metrics.requestsActiveDecrement();
894+
}
895+
});
914896
} else {
915897
processOperationInternal(ctx, in, requestId, opCode, guard);
916898
}

modules/client/src/test/java/org/apache/ignite/client/TestClientHandlerModule.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@
6565
import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
6666
import org.apache.ignite.internal.security.authentication.AuthenticationManager;
6767
import org.apache.ignite.internal.table.IgniteTablesInternal;
68-
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
6968
import org.apache.ignite.lang.IgniteException;
7069
import org.jetbrains.annotations.Nullable;
7170

@@ -273,7 +272,6 @@ protected void initChannel(Channel ch) {
273272
new TestLowWatermark()
274273
),
275274
Runnable::run,
276-
new PartitionOperationInFlightLimiter(0),
277275
features,
278276
randomExtensions(),
279277
unused -> null,

modules/client/src/test/java/org/apache/ignite/client/TestServer.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import org.apache.ignite.internal.security.authentication.AuthenticationManagerImpl;
8080
import org.apache.ignite.internal.security.configuration.SecurityConfiguration;
8181
import org.apache.ignite.internal.table.IgniteTablesInternal;
82-
import org.apache.ignite.internal.util.PartitionOperationInFlightLimiter;
8382
import org.apache.ignite.network.NetworkAddress;
8483
import org.jetbrains.annotations.Nullable;
8584
import org.mockito.Mockito;
@@ -291,7 +290,6 @@ public void log(String type, Supplier<Event> eventProvider) {
291290
EventLog.NOOP,
292291
new TestLowWatermark(),
293292
Runnable::run,
294-
new PartitionOperationInFlightLimiter(0),
295293
() -> true
296294
);
297295

modules/core/src/main/java/org/apache/ignite/internal/lang/ReplicaOverloadedException.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,17 @@
1717

1818
package org.apache.ignite.internal.lang;
1919

20-
import static org.apache.ignite.lang.ErrorGroups.Replicator.GROUP_OVERLOADED_ERR;
20+
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_OVERLOADED_ERR;
2121

2222
/**
23-
* Thrown when the node has reached the maximum number of in-flight partition operations
24-
* ({@code replication.maxInFlightPartitionOperationsPerCore}) and cannot accept new requests.
23+
* Thrown when the node has reached the in-flight partition operation byte limit
24+
* ({@code replication.partitionOperationHeapUsagePercent}) and cannot accept new requests.
2525
*/
2626
public class ReplicaOverloadedException extends IgniteInternalException {
2727
private static final long serialVersionUID = -6023736883539658779L;
2828

2929
/** Constructor. */
3030
public ReplicaOverloadedException() {
31-
super(GROUP_OVERLOADED_ERR, "Node is overloaded: max in-flight partition operations limit reached.");
31+
super(REPLICA_OVERLOADED_ERR, "Node is overloaded: in-flight partition operation byte limit reached.");
3232
}
3333
}

modules/core/src/main/java/org/apache/ignite/internal/util/PartitionOperationInFlightLimiter.java

Lines changed: 0 additions & 105 deletions
This file was deleted.

0 commit comments

Comments
 (0)