Skip to content

Commit c9b3652

Browse files
committed
IGNITE-24963 Final cleanup + WW
1 parent b4fd60d commit c9b3652

21 files changed

Lines changed: 68 additions & 123 deletions

File tree

modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Set;
3131
import java.util.concurrent.CompletableFuture;
3232
import java.util.concurrent.TimeUnit;
33-
import java.util.concurrent.TimeoutException;
3433
import java.util.function.Function;
3534
import org.jetbrains.annotations.Nullable;
3635

modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.ignite.internal.lang.IgniteInternalException;
2424
import org.apache.ignite.internal.replicator.ReplicationGroupId;
2525
import org.apache.ignite.tx.RetriableReplicaRequestException;
26-
import org.apache.ignite.tx.RetriableTransactionException;
2726

2827
/**
2928
* The exception is thrown when some issue happened during a replication.

modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItDataConsistencyTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void testDataConsistency() throws InterruptedException {
156156
readThread.join(3_000);
157157
}
158158

159-
// TODO unregisted from timeout tracker killed transactions!!!!
159+
// TODO IGNITE-28464 unregister from expiration tracker.
160160
validate();
161161
}
162162

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionInflights.java

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,13 @@
1717

1818
package org.apache.ignite.internal.table.distributed.replicator;
1919

20+
import static java.util.concurrent.atomic.AtomicLongFieldUpdater.newUpdater;
2021
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
2122

2223
import java.util.UUID;
2324
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.ConcurrentHashMap;
25-
import java.util.concurrent.atomic.AtomicLong;
26+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
2627
import java.util.function.Predicate;
2728
import org.apache.ignite.internal.partition.replicator.network.replication.RequestType;
2829
import org.jetbrains.annotations.Nullable;
@@ -35,6 +36,9 @@ public class PartitionInflights {
3536
/** Hint for maximum concurrent txns. */
3637
private static final int MAX_CONCURRENT_TXNS_HINT = 1024;
3738

39+
/** Field updater for inflights. */
40+
private static final AtomicLongFieldUpdater<CleanupContext> UPDATER = newUpdater(CleanupContext.class, "inflights");
41+
3842
/** Txn contexts. */
3943
private final ConcurrentHashMap<UUID, CleanupContext> txCtxMap = new ConcurrentHashMap<>(MAX_CONCURRENT_TXNS_HINT);
4044

@@ -58,7 +62,7 @@ public class PartitionInflights {
5862
if (ctx.finishFut != null || testPred.test(txId)) {
5963
res[0] = false;
6064
} else {
61-
ctx.inflights.incrementAndGet();
65+
UPDATER.incrementAndGet(ctx);
6266
if (requestType.isWrite()) {
6367
ctx.hasWrites = true;
6468
}
@@ -76,7 +80,7 @@ public class PartitionInflights {
7680
* @param ctx Cleanup context.
7781
*/
7882
static void removeInflight(CleanupContext ctx) {
79-
long val = ctx.inflights.decrementAndGet();
83+
long val = UPDATER.decrementAndGet(ctx);
8084

8185
if (ctx.finishFut != null && val == 0) {
8286
ctx.finishFut.complete(null);
@@ -96,11 +100,11 @@ static void removeInflight(CleanupContext ctx) {
96100
}
97101

98102
if (ctx.finishFut == null) {
99-
ctx.finishFut = ctx.inflights.get() == 0 ? nullCompletedFuture() : new CompletableFuture<>();
103+
ctx.finishFut = UPDATER.get(ctx) == 0 ? nullCompletedFuture() : new CompletableFuture<>();
100104
}
101105

102106
// Avoiding a data race with a concurrent decrementing thread, which might not see finishFut publication.
103-
if (ctx.inflights.get() == 0 && !ctx.finishFut.isDone()) {
107+
if (UPDATER.get(ctx) == 0 && !ctx.finishFut.isDone()) {
104108
ctx.finishFut = nullCompletedFuture();
105109
}
106110

@@ -132,18 +136,8 @@ public boolean contains(UUID txId) {
132136
*/
133137
public static class CleanupContext {
134138
volatile CompletableFuture<Void> finishFut;
135-
AtomicLong inflights = new AtomicLong(0); // TODO atomic updater
139+
volatile long inflights = 0;
136140
volatile boolean hasWrites = false;
137-
138-
// void addInflight() {
139-
// inflights.incrementAndGet();
140-
// }
141-
//
142-
// void removeInflight(UUID txId) {
143-
// //assert inflights > 0 : format("No inflights, cannot remove any [txId={}, ctx={}]", txId, this);
144-
//
145-
// inflights.decrementAndGet();
146-
// }
147141
}
148142

149143
@TestOnly

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -263,8 +263,6 @@ public class PartitionReplicaListener implements ReplicaTableProcessor {
263263
/** Factory for creating replica command messages. */
264264
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
265265

266-
private static final int CONCURRENCY = Runtime.getRuntime().availableProcessors();
267-
268266
private final ZonePartitionId replicationGroupId;
269267

270268
private final int tableId;

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/TraceableFuture.java

Lines changed: 0 additions & 20 deletions
This file was deleted.

modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -649,7 +649,7 @@ private <R> CompletableFuture<R> trackingInvoke(
649649
if (req.isWrite()) {
650650
// Track only write requests from explicit transactions.
651651
if (!tx.remote() && !transactionInflights.addInflight(tx.id())) {
652-
// TODO can add inflight even if the error
652+
// TODO IGNITE-28461 fail fast if TxContext.err != null.
653653
return failedFuture(tx.enlistFailedException());
654654
}
655655

modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -408,6 +408,10 @@ public void testLockOrdering() throws InterruptedException {
408408
InternalTransaction tx3 = (InternalTransaction) igniteTransactions.begin();
409409
InternalTransaction tx4 = (InternalTransaction) igniteTransactions.begin();
410410

411+
assertTrue(tx3.id().compareTo(tx4.id()) < 0);
412+
assertTrue(tx2.id().compareTo(tx3.id()) < 0);
413+
assertTrue(tx1.id().compareTo(tx2.id()) < 0);
414+
411415
boolean reversed = txManager(accounts).lockManager().policy().reverse();
412416
if (reversed) {
413417
InternalTransaction tmp = tx1;
@@ -419,10 +423,6 @@ public void testLockOrdering() throws InterruptedException {
419423
tx3 = tmp;
420424
}
421425

422-
assertTrue(tx3.id().compareTo(tx4.id()) < 0);
423-
assertTrue(tx2.id().compareTo(tx3.id()) < 0);
424-
assertTrue(tx1.id().compareTo(tx2.id()) < 0);
425-
426426
RecordView<Tuple> acc0 = accounts.recordView();
427427
RecordView<Tuple> acc2 = accounts.recordView();
428428
RecordView<Tuple> acc3 = accounts.recordView();
@@ -978,34 +978,25 @@ public void testGetAllAbort() throws TransactionException {
978978
public void testGetAllConflict() throws Exception {
979979
accounts.recordView().upsertAll(null, List.of(makeValue(1, 100.), makeValue(2, 200.)));
980980

981-
InternalTransaction tx1 = (InternalTransaction) igniteTransactions.begin();
982-
InternalTransaction tx2 = (InternalTransaction) igniteTransactions.begin();
981+
InternalTransaction older = (InternalTransaction) igniteTransactions.begin();
982+
InternalTransaction younger = (InternalTransaction) igniteTransactions.begin();
983983

984984
boolean reversed = txManager(accounts).lockManager().policy().reverse();
985985

986-
Transaction owner = reversed ? tx2 : tx1;
987-
Transaction waiter = reversed ? tx1 : tx2;
988-
989986
RecordView<Tuple> txAcc = accounts.recordView();
990987
RecordView<Tuple> txAcc2 = accounts.recordView();
991988

992-
txAcc2.upsert(owner, makeValue(1, 300.));
993-
txAcc.upsert(waiter, makeValue(2, 400.));
994-
995-
CompletableFuture<List<Tuple>> fut = txAcc.getAllAsync(waiter, List.of(makeKey(2), makeKey(1)));
996-
ensureFutureNotCompleted(fut, 100);
989+
txAcc2.upsert(older, makeValue(1, 300.));
990+
txAcc.upsert(younger, makeValue(2, 400.));
997991

998-
validateBalance(txAcc2.getAll(owner, List.of(makeKey(2), makeKey(1))), 200., 300.);
999-
validateBalance(txAcc2.getAll(owner, List.of(makeKey(1), makeKey(2))), 300., 200.);
992+
// Triggers a conflict, which invalidates younger transaction.
993+
txAcc.getAllAsync(reversed ? younger : older, List.of(makeKey(2), makeKey(1)));
994+
assertTrue(waitForCondition(() -> TxState.ABORTED == younger.state(), 5_000), younger.state().toString());
1000995

1001-
assertTrue(waitForCondition(() -> TxState.ABORTED == tx2.state(), 5_000), tx2.state().toString());
996+
validateBalance(txAcc2.getAll(older, List.of(makeKey(2), makeKey(1))), 200., 300.);
997+
validateBalance(txAcc2.getAll(older, List.of(makeKey(1), makeKey(2))), 300., 200.);
1002998

1003-
owner.commit();
1004-
try {
1005-
waiter.rollback();
1006-
} catch (TransactionException e) {
1007-
// Expected.
1008-
}
999+
older.commit();
10091000

10101001
validateBalance(accounts.recordView().getAll(null, List.of(makeKey(2), makeKey(1))), 200., 300.);
10111002
}

modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionKilledException.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,6 @@ public TransactionKilledException(UUID txId, TxManager txManager) {
4545
this.txId = txId;
4646
}
4747

48-
public TransactionKilledException(UUID txId) {
49-
super(
50-
TX_KILLED_ERR,
51-
"Transaction is killed " + txId
52-
);
53-
this.txId = txId;
54-
}
55-
5648
/**
5749
* Returns a transaction id.
5850
*

modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@
3333
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
3434
import org.apache.ignite.internal.tx.metrics.ResourceVacuumMetrics;
3535
import org.apache.ignite.internal.tx.metrics.TransactionMetricsSource;
36-
import org.apache.ignite.tx.Transaction;
37-
import org.apache.ignite.tx.TransactionOptions;
3836
import org.jetbrains.annotations.Nullable;
3937
import org.jetbrains.annotations.TestOnly;
4038

0 commit comments

Comments
 (0)