Skip to content

Commit 586b210

Browse files
authored
Sleeps after eventual scan RPC failures (#6315)
Fixes two problems. First the scan server client side plugins were not computing a sleep time based on observed errors. This could cause aggressive retries with scans. Modified the provided plugins to compute this. Second the batch scanner code was not properly collecting the information needed by scan server client side plugin to know if errors happened. The batch scanner code was not collecting information for all tablets, just a somewhat random subset of them. Corrected it to collect for all tablets. Also made the batch scanner code properly report failed tablets to the client side scan server plugin. Both changes together fix #6313. Manually tested the batch scanner code changes by adding logs and running test to ensure the correct sleeps were happening. Also manually tested the scanner to ensure it was working correctly and sleeping as expected after errors.
1 parent 9961765 commit 586b210

7 files changed

Lines changed: 224 additions & 51 deletions

File tree

core/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,11 @@
174174
<artifactId>junit-jupiter-api</artifactId>
175175
<scope>test</scope>
176176
</dependency>
177+
<dependency>
178+
<groupId>org.junit.jupiter</groupId>
179+
<artifactId>junit-jupiter-params</artifactId>
180+
<scope>test</scope>
181+
</dependency>
177182
<dependency>
178183
<groupId>org.lz4</groupId>
179184
<artifactId>lz4-java</artifactId>

core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptImpl.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,8 @@ public Result getResult() {
4242
return result;
4343
}
4444

45+
@Override
46+
public String toString() {
47+
return "server:" + server + " result:" + result;
48+
}
4549
}

core/src/main/java/org/apache/accumulo/core/clientImpl/ScanServerAttemptsImpl.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import java.util.List;
2727
import java.util.Map;
2828
import java.util.Map.Entry;
29+
import java.util.Set;
2930

3031
import org.apache.accumulo.core.data.TabletId;
32+
import org.apache.accumulo.core.dataImpl.KeyExtent;
33+
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
3134
import org.apache.accumulo.core.spi.scan.ScanServerAttempt;
3235
import org.slf4j.Logger;
3336
import org.slf4j.LoggerFactory;
@@ -47,14 +50,31 @@ public class ScanServerAttemptsImpl {
4750

4851
ScanServerAttemptReporter createReporter(String server, TabletId tablet) {
4952
return result -> {
50-
LOG.trace("Received result: {}", result);
53+
LOG.trace("Received result: {} {} {}", result, tablet, server);
5154
synchronized (attempts) {
5255
attempts.computeIfAbsent(tablet, k -> new ArrayList<>())
5356
.add(new ScanServerAttemptImpl(result, server));
5457
}
5558
};
5659
}
5760

61+
public interface BatchAttemptReporter {
62+
void report(Set<KeyExtent> extents, ScanServerAttempt.Result result);
63+
}
64+
65+
BatchAttemptReporter createReporter(String server) {
66+
return (tablets, result) -> {
67+
LOG.trace("Received result: {} {} {}", result, tablets, server);
68+
synchronized (attempts) {
69+
var attempt = new ScanServerAttemptImpl(result, server);
70+
tablets.forEach(extent -> {
71+
var tablet = new TabletIdImpl(extent);
72+
attempts.computeIfAbsent(tablet, k -> new ArrayList<>()).add(attempt);
73+
});
74+
}
75+
};
76+
}
77+
5878
/**
5979
* Creates and returns a snapshot of {@link ScanServerAttempt} objects that were added before this
6080
* call

core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReaderIterator.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.accumulo.core.client.TableDeletedException;
5353
import org.apache.accumulo.core.client.TableNotFoundException;
5454
import org.apache.accumulo.core.client.TimedOutException;
55+
import org.apache.accumulo.core.clientImpl.ScanServerAttemptsImpl.BatchAttemptReporter;
5556
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
5657
import org.apache.accumulo.core.data.Column;
5758
import org.apache.accumulo.core.data.Key;
@@ -365,12 +366,12 @@ private class QueryTask implements Runnable {
365366
private final List<Column> columns;
366367
private int semaphoreSize;
367368
private final long busyTimeout;
368-
private final ScanServerAttemptReporter reporter;
369+
private final BatchAttemptReporter reporter;
369370
private final Duration scanServerSelectorDelay;
370371

371372
QueryTask(String tsLocation, Map<KeyExtent,List<Range>> tabletsRanges,
372373
Map<KeyExtent,List<Range>> failures, ResultReceiver receiver, List<Column> columns,
373-
long busyTimeout, ScanServerAttemptReporter reporter, Duration scanServerSelectorDelay) {
374+
long busyTimeout, BatchAttemptReporter reporter, Duration scanServerSelectorDelay) {
374375
this.tsLocation = tsLocation;
375376
this.tabletsRanges = tabletsRanges;
376377
this.receiver = receiver;
@@ -401,6 +402,10 @@ public void run() {
401402
options, authorizations, timeoutTracker, busyTimeout);
402403

403404
if (!tsFailures.isEmpty()) {
405+
// On scan servers routine failures that occur on tservers, like not serving tablet or a
406+
// tablet closing, are not expected. So for scan server record any failures seen as an
407+
// error.
408+
reporter.report(tsFailures.keySet(), ScanServerAttempt.Result.ERROR);
404409
locator.invalidateCache(tsFailures.keySet());
405410
synchronized (failures) {
406411
failures.putAll(tsFailures);
@@ -422,7 +427,7 @@ public void run() {
422427
if (e.getCause() instanceof ScanServerBusyException) {
423428
result = ScanServerAttempt.Result.BUSY;
424429
}
425-
reporter.report(result);
430+
reporter.report(tabletsRanges.keySet(), result);
426431
} catch (AccumuloSecurityException e) {
427432
e.setTableInfo(getTableInfo());
428433
log.debug("AccumuloSecurityException thrown", e);
@@ -496,7 +501,6 @@ public void run() {
496501
}
497502
}
498503
}
499-
500504
}
501505

502506
private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
@@ -506,7 +510,7 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
506510

507511
long busyTimeout = 0;
508512
Duration scanServerSelectorDelay = null;
509-
Map<String,ScanServerAttemptReporter> reporters = Map.of();
513+
Map<String,BatchAttemptReporter> reporters = Map.of();
510514

511515
if (options.getConsistencyLevel().equals(ConsistencyLevel.EVENTUAL)) {
512516
var scanServerData = rebinToScanServers(binnedRanges, startTime);
@@ -564,7 +568,7 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
564568
final Map<KeyExtent,List<Range>> tabletsRanges = binnedRanges.get(tsLocation);
565569
if (maxTabletsPerRequest == Integer.MAX_VALUE || tabletsRanges.size() == 1) {
566570
QueryTask queryTask = new QueryTask(tsLocation, tabletsRanges, failures, receiver, columns,
567-
busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
571+
busyTimeout, reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay);
568572
queryTasks.add(queryTask);
569573
} else {
570574
HashMap<KeyExtent,List<Range>> tabletSubset = new HashMap<>();
@@ -573,15 +577,16 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
573577
if (tabletSubset.size() >= maxTabletsPerRequest) {
574578
QueryTask queryTask =
575579
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, busyTimeout,
576-
reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
580+
reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay);
577581
queryTasks.add(queryTask);
578582
tabletSubset = new HashMap<>();
579583
}
580584
}
581585

582586
if (!tabletSubset.isEmpty()) {
583-
QueryTask queryTask = new QueryTask(tsLocation, tabletSubset, failures, receiver, columns,
584-
busyTimeout, reporters.getOrDefault(tsLocation, r -> {}), scanServerSelectorDelay);
587+
QueryTask queryTask =
588+
new QueryTask(tsLocation, tabletSubset, failures, receiver, columns, busyTimeout,
589+
reporters.getOrDefault(tsLocation, (t, r) -> {}), scanServerSelectorDelay);
585590
queryTasks.add(queryTask);
586591
}
587592
}
@@ -599,7 +604,7 @@ private void doLookups(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
599604
private static class ScanServerData {
600605
Map<String,Map<KeyExtent,List<Range>>> binnedRanges;
601606
ScanServerSelections actions;
602-
Map<String,ScanServerAttemptReporter> reporters;
607+
Map<String,BatchAttemptReporter> reporters;
603608
}
604609

605610
private ScanServerData rebinToScanServers(Map<String,Map<KeyExtent,List<Range>>> binnedRanges,
@@ -652,7 +657,7 @@ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWa
652657

653658
Map<String,Map<KeyExtent,List<Range>>> binnedRanges2 = new HashMap<>();
654659

655-
Map<String,ScanServerAttemptReporter> reporters = new HashMap<>();
660+
Map<String,BatchAttemptReporter> reporters = new HashMap<>();
656661

657662
for (TabletIdImpl tabletId : tabletIds) {
658663
KeyExtent extent = tabletId.toKeyExtent();
@@ -672,7 +677,7 @@ public <T> Optional<T> waitUntil(Supplier<Optional<T>> condition, Duration maxWa
672677
rangeMap.put(extent, ranges);
673678

674679
var server = serverToUse;
675-
reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server, tabletId));
680+
reporters.computeIfAbsent(serverToUse, k -> scanAttempts.createReporter(server));
676681
}
677682

678683
ScanServerData ssd = new ScanServerData();

core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerHostSelector.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020

2121
import static org.apache.accumulo.core.spi.scan.RendezvousHasher.Mode.HOST;
2222

23+
import java.time.Duration;
2324
import java.util.ArrayList;
25+
import java.util.Collection;
2426
import java.util.HashMap;
2527
import java.util.HashSet;
2628
import java.util.List;
@@ -82,8 +84,7 @@ public class ConfigurableScanServerHostSelector extends ConfigurableScanServerSe
8284
/**
8385
* @return map of previous failure keyed on host name with a set of servers per host
8486
*/
85-
Map<String,Set<String>> computeFailuresByHost(TabletId tablet, SelectorParameters params) {
86-
var attempts = params.getAttempts(tablet);
87+
Map<String,Set<String>> computeFailuresByHost(Collection<? extends ScanServerAttempt> attempts) {
8788
if (attempts.isEmpty()) {
8889
return Map.of();
8990
}
@@ -152,13 +153,25 @@ private List<String> getServersForHostAttempt(int hostAttempt, TabletId tablet,
152153
}
153154

154155
@Override
155-
int selectServers(SelectorParameters params, Profile profile, RendezvousHasher rhasher,
156-
Map<TabletId,String> serversToUse) {
156+
ScanServerSelections selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
157+
RendezvousHasher rhasher) {
157158

158159
int maxHostAttempt = 0;
160+
int maxTabletErrors = 0;
161+
162+
HashMap<TabletId,String> serversToUse = new HashMap<>();
159163

160164
for (TabletId tablet : params.getTablets()) {
161-
Map<String,Set<String>> prevFailures = computeFailuresByHost(tablet, params);
165+
var attempts = params.getAttempts(tablet);
166+
Map<String,Set<String>> prevFailures = computeFailuresByHost(attempts);
167+
168+
int tabletErrors = 0;
169+
for (var attempt : attempts) {
170+
if (attempt.getResult() == ScanServerAttempt.Result.ERROR) {
171+
tabletErrors++;
172+
}
173+
}
174+
maxTabletErrors = Math.max(tabletErrors, maxTabletErrors);
162175

163176
for (int hostAttempt = 0; hostAttempt < profile.getAttemptPlans().size(); hostAttempt++) {
164177
maxHostAttempt = Math.max(hostAttempt, maxHostAttempt);
@@ -183,6 +196,24 @@ int selectServers(SelectorParameters params, Profile profile, RendezvousHasher r
183196
}
184197
}
185198

186-
return maxHostAttempt;
199+
Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxHostAttempt));
200+
Duration delay = computeDelay(maxTabletErrors);
201+
202+
return new ScanServerSelections() {
203+
@Override
204+
public String getScanServer(TabletId tabletId) {
205+
return serversToUse.get(tabletId);
206+
}
207+
208+
@Override
209+
public Duration getDelay() {
210+
return delay;
211+
}
212+
213+
@Override
214+
public Duration getBusyTimeout() {
215+
return busyTO;
216+
}
217+
};
187218
}
188219
}

core/src/main/java/org/apache/accumulo/core/spi/scan/ConfigurableScanServerSelector.java

Lines changed: 48 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.ArrayList;
2727
import java.util.Collection;
2828
import java.util.HashMap;
29+
import java.util.HashSet;
2930
import java.util.List;
3031
import java.util.Map;
3132
import java.util.Optional;
@@ -424,34 +425,27 @@ public Duration getBusyTimeout() {
424425
};
425426
}
426427

427-
Map<TabletId,String> serversToUse = new HashMap<>();
428+
return selectServers(params, profile, rhasher);
429+
}
428430

429-
int maxAttempts = selectServers(params, profile, rhasher, serversToUse);
431+
protected Duration computeDelay(int errorAttempts) {
432+
if (errorAttempts == 0) {
433+
return Duration.ZERO;
434+
} else {
435+
return Duration.ofMillis((long) Math.min(30_000, 100 * Math.pow(2, (errorAttempts - 1))));
436+
}
437+
}
430438

431-
Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(maxAttempts));
439+
ScanServerSelections selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
440+
RendezvousHasher rhasher) {
441+
int attempts = 0;
442+
int errorAttempts = 0;
432443

433-
return new ScanServerSelections() {
434-
@Override
435-
public String getScanServer(TabletId tabletId) {
436-
return serversToUse.get(tabletId);
437-
}
444+
HashMap<TabletId,String> serversToUse = new HashMap<>();
438445

439-
@Override
440-
public Duration getDelay() {
441-
return Duration.ZERO;
442-
}
443-
444-
@Override
445-
public Duration getBusyTimeout() {
446-
return busyTO;
447-
}
448-
};
449-
}
450-
451-
int selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
452-
RendezvousHasher rhasher, Map<TabletId,String> serversToUse) {
453-
int attempts = params.getTablets().stream()
454-
.mapToInt(tablet -> params.getAttempts(tablet).size()).max().orElse(0);
446+
for (TabletId tablet : params.getTablets()) {
447+
attempts = Math.max(attempts, params.getAttempts(tablet).size());
448+
}
455449

456450
int numServers = profile.getNumServers(attempts,
457451
rhasher.getSnapshot().getServersForGroup(profile.group).size());
@@ -461,9 +455,17 @@ int selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
461455

462456
var tabletAttempts = params.getAttempts(tablet);
463457
if (!tabletAttempts.isEmpty()) {
458+
HashSet<String> attemptServers = new HashSet<>();
459+
int errorCount = 0;
460+
for (var attempt : tabletAttempts) {
461+
attemptServers.add(attempt.getServer());
462+
if (attempt.getResult() == ScanServerAttempt.Result.ERROR) {
463+
errorCount++;
464+
}
465+
}
466+
errorAttempts = Math.max(errorCount, errorAttempts);
464467
// remove servers that failed in previous attempts
465-
var attemptServers =
466-
tabletAttempts.stream().map(ScanServerAttempt::getServer).collect(Collectors.toSet());
468+
467469
var copy = rendezvousServers.stream().filter(server -> !attemptServers.contains(server))
468470
.collect(Collectors.toList());
469471
if (!copy.isEmpty()) {
@@ -475,6 +477,25 @@ int selectServers(ScanServerSelector.SelectorParameters params, Profile profile,
475477
String serverToUse = rendezvousServers.get(RANDOM.nextInt(rendezvousServers.size()));
476478
serversToUse.put(tablet, serverToUse);
477479
}
478-
return attempts;
480+
481+
Duration busyTO = Duration.ofMillis(profile.getBusyTimeout(attempts));
482+
Duration delay = computeDelay(errorAttempts);
483+
484+
return new ScanServerSelections() {
485+
@Override
486+
public String getScanServer(TabletId tabletId) {
487+
return serversToUse.get(tabletId);
488+
}
489+
490+
@Override
491+
public Duration getDelay() {
492+
return delay;
493+
}
494+
495+
@Override
496+
public Duration getBusyTimeout() {
497+
return busyTO;
498+
}
499+
};
479500
}
480501
}

0 commit comments

Comments
 (0)