Skip to content

Commit 652a4d5

Browse files
CyrillKirill Sizov
andauthored
IGNITE-28543 Throttle VacuumTxStateReplicaRequests (#7987)
Co-authored-by: Kirill Sizov <sizov.kirill.y@gmail.com>
1 parent bd26df6 commit 652a4d5

2 files changed

Lines changed: 239 additions & 24 deletions

File tree

modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java

Lines changed: 61 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.util.ArrayList;
2727
import java.util.HashSet;
28+
import java.util.Iterator;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Set;
@@ -60,6 +61,9 @@ public class PersistentTxStateVacuumizer {
6061

6162
private static final String VACUUM_THROTTLE_KEY = "vacuum-failed";
6263

64+
/** Maximum number of transaction IDs per vacuum request to avoid serialization timeouts. */
65+
static final int VACUUM_BATCH_SIZE = 1000;
66+
6367
private static final TxMessagesFactory TX_MESSAGES_FACTORY = new TxMessagesFactory();
6468

6569
private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
@@ -134,30 +138,13 @@ public CompletableFuture<PersistentTxStateVacuumResult> vacuumPersistentTxStates
134138
return nullCompletedFuture();
135139
}
136140

137-
VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
138-
.enlistmentConsistencyToken(replicaMeta.getStartTime().longValue())
139-
.groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartitionId))
140-
.transactionIds(filteredTxIds)
141-
.build();
142-
143-
return replicaService.invoke(localNode, request).whenComplete((v, e) -> {
144-
if (e == null) {
145-
successful.addAll(filteredTxIds);
146-
vacuumizedPersistentTxnStatesCount.addAndGet(filteredTxIds.size());
147-
} else if (expectedException(e)) {
148-
// We can log the exceptions without further handling because failed requests' txns are not added
149-
// to the set of successful and will be retried. PrimaryReplicaMissException can be considered as
150-
// a part of regular flow and doesn't need to be logged. NodeStoppingException should be ignored as
151-
// vacuumization will be retried after restart.
152-
LOG.debug("Failed to vacuum tx states from the persistent storage.", e);
153-
} else {
154-
// In general, even though this vacuum round has completed unsuccessfully,
155-
// due to ReplicationTimeoutException for instance,
156-
// it does not mean that correctness is violated, and we need to shutdown the node.
157-
// Perhaps the next attempt will be successful.
158-
throttledLogger.warn(VACUUM_THROTTLE_KEY, "Failed to vacuum tx states from the persistent storage.", e);
159-
}
160-
});
141+
return sendBatchedVacuumRequests(
142+
replicaMeta.getStartTime().longValue(),
143+
commitPartitionId,
144+
filteredTxIds,
145+
successful,
146+
vacuumizedPersistentTxnStatesCount
147+
);
161148
} else {
162149
successful.addAll(txs.stream().map(v -> v.txId).collect(toSet()));
163150

@@ -172,6 +159,56 @@ public CompletableFuture<PersistentTxStateVacuumResult> vacuumPersistentTxStates
172159
.handle((unused, unusedEx) -> new PersistentTxStateVacuumResult(successful, vacuumizedPersistentTxnStatesCount.get()));
173160
}
174161

162+
private CompletableFuture<Void> sendBatchedVacuumRequests(
163+
long enlistmentConsistencyToken,
164+
ZonePartitionId commitPartitionId,
165+
Set<UUID> txIds,
166+
Set<UUID> successful,
167+
AtomicInteger vacuumizedCount
168+
) {
169+
List<CompletableFuture<?>> batchFutures = new ArrayList<>();
170+
Iterator<UUID> it = txIds.iterator();
171+
172+
while (it.hasNext()) {
173+
Set<UUID> batch = new HashSet<>(Math.min(VACUUM_BATCH_SIZE, txIds.size()));
174+
175+
for (int j = 0; j < VACUUM_BATCH_SIZE && it.hasNext(); j++) {
176+
batch.add(it.next());
177+
}
178+
179+
batchFutures.add(sendVacuumBatch(enlistmentConsistencyToken, commitPartitionId, batch, successful, vacuumizedCount));
180+
}
181+
182+
return allOf(batchFutures);
183+
}
184+
185+
private CompletableFuture<?> sendVacuumBatch(
186+
long enlistmentConsistencyToken,
187+
ZonePartitionId commitPartitionId,
188+
Set<UUID> batch,
189+
Set<UUID> successful,
190+
AtomicInteger vacuumizedCount
191+
) {
192+
VacuumTxStateReplicaRequest request = TX_MESSAGES_FACTORY.vacuumTxStateReplicaRequest()
193+
.enlistmentConsistencyToken(enlistmentConsistencyToken)
194+
.groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, commitPartitionId))
195+
.transactionIds(batch)
196+
.build();
197+
198+
return replicaService.invoke(localNode, request).whenComplete((v, e) -> {
199+
if (e == null) {
200+
successful.addAll(batch);
201+
vacuumizedCount.addAndGet(batch.size());
202+
} else if (expectedException(e)) {
203+
// Failed requests' txns are not added to the set of successful and will be retried.
204+
LOG.debug("Failed to vacuum tx states from the persistent storage.", e);
205+
} else {
206+
throttledLogger.warn(VACUUM_THROTTLE_KEY,
207+
"Failed to vacuum tx states from the persistent storage.", e);
208+
}
209+
});
210+
}
211+
175212
private static boolean expectedException(Throwable e) {
176213
return hasCause(e,
177214
PrimaryReplicaMissException.class,
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.tx.impl;
19+
20+
import static java.util.concurrent.CompletableFuture.completedFuture;
21+
import static org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VACUUM_BATCH_SIZE;
22+
import static org.hamcrest.MatcherAssert.assertThat;
23+
import static org.hamcrest.Matchers.everyItem;
24+
import static org.hamcrest.Matchers.lessThanOrEqualTo;
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertTrue;
27+
import static org.mockito.ArgumentMatchers.any;
28+
import static org.mockito.Mockito.lenient;
29+
import static org.mockito.Mockito.when;
30+
31+
import java.util.HashSet;
32+
import java.util.Map;
33+
import java.util.Set;
34+
import java.util.UUID;
35+
import java.util.concurrent.CompletableFuture;
36+
import org.apache.ignite.internal.hlc.HybridClockImpl;
37+
import org.apache.ignite.internal.hlc.TestClockService;
38+
import org.apache.ignite.internal.network.InternalClusterNode;
39+
import org.apache.ignite.internal.placementdriver.PlacementDriver;
40+
import org.apache.ignite.internal.placementdriver.TestReplicaMetaImpl;
41+
import org.apache.ignite.internal.replicator.ReplicaService;
42+
import org.apache.ignite.internal.replicator.ZonePartitionId;
43+
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
44+
import org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.PersistentTxStateVacuumResult;
45+
import org.apache.ignite.internal.tx.impl.PersistentTxStateVacuumizer.VacuumizableTx;
46+
import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
47+
import org.junit.jupiter.api.BeforeEach;
48+
import org.junit.jupiter.api.Test;
49+
import org.junit.jupiter.api.extension.ExtendWith;
50+
import org.mockito.ArgumentCaptor;
51+
import org.mockito.Captor;
52+
import org.mockito.Mock;
53+
import org.mockito.junit.jupiter.MockitoExtension;
54+
55+
@ExtendWith(MockitoExtension.class)
56+
class PersistentTxStateVacuumizerTest extends BaseIgniteAbstractTest {
57+
@Mock
58+
private ReplicaService replicaService;
59+
60+
@Mock
61+
private InternalClusterNode localNode;
62+
63+
@Mock
64+
private PlacementDriver placementDriver;
65+
66+
@Captor
67+
private ArgumentCaptor<VacuumTxStateReplicaRequest> requestCaptor;
68+
69+
private PersistentTxStateVacuumizer vacuumizer;
70+
71+
@BeforeEach
72+
void setUp() {
73+
UUID localNodeId = UUID.randomUUID();
74+
when(localNode.id()).thenReturn(localNodeId);
75+
when(localNode.name()).thenReturn("test-node");
76+
77+
TestReplicaMetaImpl replicaMeta = new TestReplicaMetaImpl(localNode.name(), localNodeId);
78+
79+
when(placementDriver.getPrimaryReplica(any(), any()))
80+
.thenReturn(completedFuture(replicaMeta));
81+
lenient().when(replicaService.invoke(any(InternalClusterNode.class), requestCaptor.capture()))
82+
.thenReturn(completedFuture(null));
83+
84+
vacuumizer = new PersistentTxStateVacuumizer(
85+
replicaService,
86+
localNode,
87+
new TestClockService(new HybridClockImpl()),
88+
placementDriver
89+
);
90+
}
91+
92+
@Test
93+
void smallBatchSentAsSingleRequest() {
94+
int count = 10;
95+
Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);
96+
97+
PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join();
98+
99+
assertEquals(count, result.vacuumizedPersistentTxnStatesCount);
100+
assertEquals(count, result.txnsToVacuum.size());
101+
assertEquals(1, requestCaptor.getAllValues().size());
102+
assertEquals(count, requestCaptor.getValue().transactionIds().size());
103+
}
104+
105+
@Test
106+
void largeBatchIsSplitIntoMultipleRequests() {
107+
int count = VACUUM_BATCH_SIZE * 3 + 1;
108+
Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);
109+
110+
PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join();
111+
112+
assertEquals(count, result.vacuumizedPersistentTxnStatesCount);
113+
assertEquals(count, result.txnsToVacuum.size());
114+
115+
// Should be split into 4 requests.
116+
assertEquals(4, requestCaptor.getAllValues().size());
117+
118+
// Each request should have at most VACUUM_BATCH_SIZE tx IDs.
119+
assertThat(
120+
requestCaptor.getAllValues().stream().map(r -> r.transactionIds().size()).toList(),
121+
everyItem(lessThanOrEqualTo(VACUUM_BATCH_SIZE))
122+
);
123+
124+
// All tx IDs should be covered.
125+
Set<UUID> allSentIds = new HashSet<>();
126+
for (VacuumTxStateReplicaRequest req : requestCaptor.getAllValues()) {
127+
allSentIds.addAll(req.transactionIds());
128+
}
129+
assertEquals(count, allSentIds.size());
130+
}
131+
132+
@Test
133+
void partialBatchFailureDoesNotAffectOtherBatches() {
134+
int count = VACUUM_BATCH_SIZE * 2;
135+
Map<ZonePartitionId, Set<VacuumizableTx>> txIds = createTxIds(count);
136+
137+
// First invocation succeeds, second fails.
138+
when(replicaService.invoke(any(InternalClusterNode.class), any(VacuumTxStateReplicaRequest.class)))
139+
.thenReturn(completedFuture(null))
140+
.thenReturn(CompletableFuture.failedFuture(new RuntimeException("test failure")));
141+
142+
PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join();
143+
144+
// Only the first batch's tx IDs should be in the successful set.
145+
assertEquals(VACUUM_BATCH_SIZE, result.vacuumizedPersistentTxnStatesCount);
146+
assertEquals(VACUUM_BATCH_SIZE, result.txnsToVacuum.size());
147+
}
148+
149+
@Test
150+
void txsWithoutCleanupTimestampAreSuccessfulWithoutRequest() {
151+
ZonePartitionId partitionId = new ZonePartitionId(1, 0);
152+
Set<VacuumizableTx> txs = new HashSet<>();
153+
154+
// Tx without cleanup timestamp — should go directly to successful.
155+
UUID txWithoutCleanup = UUID.randomUUID();
156+
txs.add(new VacuumizableTx(txWithoutCleanup, null));
157+
158+
Map<ZonePartitionId, Set<VacuumizableTx>> txIds = Map.of(partitionId, txs);
159+
160+
PersistentTxStateVacuumResult result = vacuumizer.vacuumPersistentTxStates(txIds).join();
161+
162+
assertTrue(result.txnsToVacuum.contains(txWithoutCleanup));
163+
assertEquals(0, result.vacuumizedPersistentTxnStatesCount);
164+
// No requests should be sent since the only tx has no cleanup timestamp.
165+
assertEquals(0, requestCaptor.getAllValues().size());
166+
}
167+
168+
private static Map<ZonePartitionId, Set<VacuumizableTx>> createTxIds(int count) {
169+
ZonePartitionId partitionId = new ZonePartitionId(1, 0);
170+
Set<VacuumizableTx> txs = new HashSet<>();
171+
172+
for (int i = 0; i < count; i++) {
173+
txs.add(new VacuumizableTx(UUID.randomUUID(), 1L));
174+
}
175+
176+
return Map.of(partitionId, txs);
177+
}
178+
}

0 commit comments

Comments
 (0)