Skip to content

Commit 4ef09c3

Browse files
authored
IGNITE-28395 Fix Lease updater accumulates concurrent in-flight invocations causing constant CAS failures (#7976)
1 parent 4e2e64b commit 4ef09c3

3 files changed

Lines changed: 173 additions & 5 deletions

File tree

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.placementdriver;
19+
20+
import static java.util.concurrent.TimeUnit.SECONDS;
21+
import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
22+
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
23+
import static org.apache.ignite.internal.testframework.IgniteTestUtils.sleep;
24+
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
25+
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
26+
import static org.hamcrest.MatcherAssert.assertThat;
27+
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
29+
import java.nio.charset.StandardCharsets;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
import org.apache.ignite.InitParametersBuilder;
34+
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
35+
import org.apache.ignite.internal.app.IgniteImpl;
36+
import org.apache.ignite.internal.catalog.CatalogService;
37+
import org.apache.ignite.internal.lang.ByteArray;
38+
import org.apache.ignite.internal.replicator.ZonePartitionId;
39+
import org.junit.jupiter.api.BeforeEach;
40+
import org.junit.jupiter.api.Test;
41+
42+
/**
43+
* Tests for the lease updater's inflight futures.
44+
*/
45+
public class ItLeaseUpdaterInflightTest extends ClusterPerTestIntegrationTest {
46+
private static final String TEST_ZONE = "TEST_ZONE";
47+
private static final String TEST_TABLE = "TEST_TABLE";
48+
private static final String LEASE_EXPIRATION_INTERVAL_MILLIS_STR = "2000";
49+
50+
@Override
51+
protected int initialNodes() {
52+
return 1;
53+
}
54+
55+
@BeforeEach
56+
public void setup() {
57+
sql("CREATE ZONE " + TEST_ZONE + " (partitions 1, replicas 1) storage profiles ['" + CatalogService.DEFAULT_STORAGE_PROFILE + "']");
58+
sql("CREATE TABLE " + TEST_TABLE + " (ID INT PRIMARY KEY, VAL VARCHAR(20)) ZONE " + TEST_ZONE);
59+
}
60+
61+
@Override
62+
protected void customizeInitParameters(InitParametersBuilder builder) {
63+
super.customizeInitParameters(builder);
64+
65+
builder.clusterConfiguration("ignite {"
66+
+ " replication.leaseExpirationIntervalMillis: " + LEASE_EXPIRATION_INTERVAL_MILLIS_STR
67+
+ "}");
68+
}
69+
70+
@Test
71+
public void test() {
72+
AtomicInteger msInflightCount = new AtomicInteger();
73+
AtomicBoolean stopped = new AtomicBoolean();
74+
75+
IgniteImpl node = anyNode();
76+
77+
int zoneId = unwrapTableImpl(node.tables().table(TEST_TABLE)).zoneId();
78+
ZonePartitionId partId = new ZonePartitionId(zoneId, 0);
79+
80+
ReplicaMeta replicaMeta = waitAndGetPrimaryReplica(node, partId);
81+
82+
log.info("Test: zoneId={}, leaseStartTime={}", zoneId, replicaMeta.getStartTime());
83+
84+
String testKey = "testKey";
85+
node.metaStorageManager().registerPrefixWatch(new ByteArray(testKey), event -> {
86+
sleep(10);
87+
88+
msInflightCount.decrementAndGet();
89+
90+
return nullCompletedFuture();
91+
});
92+
93+
runAsync(() -> {
94+
while (!stopped.get()) {
95+
if (msInflightCount.get() > 300) {
96+
continue;
97+
}
98+
99+
msInflightCount.incrementAndGet();
100+
node.metaStorageManager().put(new ByteArray(testKey), "testValue".getBytes(StandardCharsets.UTF_8));
101+
}
102+
});
103+
104+
try {
105+
sleep(Long.parseLong(LEASE_EXPIRATION_INTERVAL_MILLIS_STR) * 5);
106+
107+
ReplicaMeta newReplicaMeta = waitAndGetPrimaryReplica(node, partId);
108+
log.info("Test: newLease={}", newReplicaMeta);
109+
110+
assertEquals(replicaMeta.getStartTime().longValue(), newReplicaMeta.getStartTime().longValue());
111+
} finally {
112+
stopped.set(true);
113+
}
114+
}
115+
116+
private static ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, ZonePartitionId replicationGrpId) {
117+
CompletableFuture<ReplicaMeta> primaryReplicaFut = node.placementDriver().awaitPrimaryReplica(
118+
replicationGrpId,
119+
node.clock().now(),
120+
10,
121+
SECONDS
122+
);
123+
124+
assertThat(primaryReplicaFut, willCompleteSuccessfully());
125+
126+
return primaryReplicaFut.join();
127+
}
128+
}

modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.ignite.internal.placementdriver;
1919

20+
import static java.util.Collections.emptyMap;
2021
import static java.util.Objects.hash;
2122
import static java.util.Objects.requireNonNullElse;
2223
import static java.util.concurrent.CompletableFuture.completedFuture;
24+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
2325
import static org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
2426
import static org.apache.ignite.internal.metastorage.dsl.Conditions.notExists;
2527
import static org.apache.ignite.internal.metastorage.dsl.Conditions.or;
@@ -28,9 +30,11 @@
2830
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
2931
import static org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
3032
import static org.apache.ignite.internal.placementdriver.leases.Lease.emptyLease;
33+
import static org.apache.ignite.internal.util.ArrayUtils.BYTE_EMPTY_ARRAY;
3134
import static org.apache.ignite.internal.util.CollectionUtils.union;
3235
import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
3336
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
37+
import static org.apache.ignite.internal.util.IgniteUtils.newHashMap;
3438

3539
import java.util.ArrayList;
3640
import java.util.Collection;
@@ -135,6 +139,14 @@ public class LeaseUpdater {
135139

136140
private final Executor throttledLogExecutor;
137141

142+
private CompletableFuture<?> leaseUpdateFuture = nullCompletedFuture();
143+
144+
/**
145+
* Leases cache for updating leases via {@link MetaStorageManager#invoke}. It is renewed right before the lease update, because leases
146+
* in {@link LeaseTracker} may be stale a bit, which is critical for invoke.
147+
*/
148+
private volatile Leases leases = new Leases(emptyMap(), BYTE_EMPTY_ARRAY);
149+
138150
/**
139151
* Constructor.
140152
*
@@ -388,6 +400,8 @@ public void run() {
388400

389401
try {
390402
if (active()) {
403+
waitForInflight();
404+
391405
updateLeaseBatchInternal();
392406
}
393407
} catch (Throwable e) {
@@ -408,6 +422,28 @@ public void run() {
408422
}
409423
}
410424

425+
private void waitForInflight() {
426+
try {
427+
leaseUpdateFuture.get(replicationConfiguration.leaseExpirationIntervalMillis().value() / 2, MILLISECONDS);
428+
} catch (Exception e) {
429+
LOG.info("Could not wait for the previous lease update to complete, proceeding with the next update attempt.", e);
430+
}
431+
432+
var entry = msManager.getLocally(PLACEMENTDRIVER_LEASES_KEY);
433+
434+
if (entry != null && entry.value() != null) {
435+
LeaseBatch leaseBatch = LeaseBatch.fromBytes(entry.value());
436+
Map<ReplicationGroupId, Lease> newLeasesMap = newHashMap(leaseBatch.leases().size());
437+
for (Lease lease : leaseBatch.leases()) {
438+
newLeasesMap.put(lease.replicationGroupId(), lease);
439+
}
440+
441+
leases = new Leases(newLeasesMap, entry.value());
442+
} else {
443+
leases = leaseTracker.leasesLatest();
444+
}
445+
}
446+
411447
/** Updates leases in Meta storage. This method is supposed to be used in the busy lock. */
412448
private void updateLeaseBatchInternal() {
413449
HybridTimestamp currentTime = clockService.current();
@@ -418,7 +454,7 @@ private void updateLeaseBatchInternal() {
418454

419455
HybridTimestamp newExpirationTimestamp = new HybridTimestamp(currentTime.getPhysical() + leaseExpirationInterval, 0);
420456

421-
Leases leasesCurrent = leaseTracker.leasesLatest();
457+
Leases leasesCurrent = leases;
422458
Map<ReplicationGroupId, LeaseAgreement> toBeNegotiated = new HashMap<>();
423459
Map<ReplicationGroupId, Lease> renewedLeases = new HashMap<>(leasesCurrent.leaseByGroupId().size());
424460

@@ -505,11 +541,12 @@ private void updateLeaseBatchInternal() {
505541
// so we must start a negotiation round from the beginning; the same we do for the groups that don't have
506542
// leaseholders at all.
507543
if (isLeaseOutdated) {
508-
LOG.info("Lease is expired, creating a new one [groupId={}, lease={}, candidate={}]", grpId, lease, candidate);
509-
510544
// New lease is granted.
511545
Lease newLease = writeNewLease(grpId, candidate, renewedLeases);
512546

547+
LOG.info("Lease is expired, creating a new one [groupId={}, oldLease={}, newLease={}, candidate={}]",
548+
grpId, lease, newLease, candidate);
549+
513550
boolean force = !lease.isProlongable() && lease.proposedCandidate() != null;
514551

515552
toBeNegotiated.put(grpId, new LeaseAgreement(newLease, force));
@@ -553,7 +590,7 @@ private void updateLeaseBatchInternal() {
553590

554591
byte[] renewedValue = new LeaseBatch(renewedLeases.values()).bytes();
555592

556-
msManager.invoke(
593+
leaseUpdateFuture = msManager.invoke(
557594
or(notExists(key), value(key).eq(leasesCurrent.leasesBytes())),
558595
put(key, renewedValue),
559596
noop()

modules/replicator/src/main/java/org/apache/ignite/internal/replicator/configuration/ReplicationConfigurationSchema.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ public class ReplicationConfigurationSchema {
3131
/** Default value for {@link #idleSafeTimePropagationDurationMillis}. */
3232
public static final long DEFAULT_IDLE_SAFE_TIME_PROP_DURATION = TimeUnit.SECONDS.toMillis(1);
3333

34+
/** Default value for {@link #leaseExpirationIntervalMillis}. */
35+
public static final long DEFAULT_LEASE_EXPIRATION_INTERVAL_MILLIS = 5000;
36+
3437
/** Default value for {@link #batchSizeBytes}. */
3538
public static final int DEFAULT_BATCH_SIZE_BYTES = 8192;
3639

@@ -57,7 +60,7 @@ public class ReplicationConfigurationSchema {
5760
@Value(hasDefault = true)
5861
@Range(min = 2000, max = 120000)
5962
@PublicName(legacyNames = "leaseExpirationInterval")
60-
public long leaseExpirationIntervalMillis = 5_000;
63+
public long leaseExpirationIntervalMillis = DEFAULT_LEASE_EXPIRATION_INTERVAL_MILLIS;
6164

6265
@Value(hasDefault = true)
6366
@Range(max = 10_000)

0 commit comments

Comments
 (0)