Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBu
heardFromHosts.add(rctx.to);
heardFromHostsBitSet.set(rctx.bookieIndex, true);

/*
* Retain the response while this read op handles it. complete() returns true only when it
* transfers the buffers into request.entries. For digest failures, duplicate responses, or
* other incomplete paths, complete() returns false and this retained reference is released here.
*/
bufList.retain();
// if entry has completed don't handle twice
if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) {
Expand Down Expand Up @@ -160,32 +165,41 @@ boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) {
if (isComplete()) {
return false;
}
if (!complete.getAndSet(true)) {

/*
* Verify the whole batch before creating LedgerEntryImpl instances. If any entry fails
* digest verification, no partial entries are retained and readEntriesComplete() releases
* the retained ByteBufList after this method returns false.
*/
for (int i = 0; i < bufList.size(); i++) {
ByteBuf buffer = bufList.getBuffer(i);
try {
lh.macManager.verifyDigestAndReturnData(eId + i, buffer);
} catch (BKException.BKDigestMatchException e) {
clientCtx.getClientStats().getReadOpDmCounter().inc();
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
BKException.Code.DigestMatchException);
return false;
}
}

if (complete.compareAndSet(false, true)) {
rc = BKException.Code.OK;
for (int i = 0; i < bufList.size(); i++) {
ByteBuf buffer = bufList.getBuffer(i);
ByteBuf content;
try {
content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer);
} catch (BKException.BKDigestMatchException e) {
clientCtx.getClientStats().getReadOpDmCounter().inc();
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
BKException.Code.DigestMatchException);
return false;
}
rc = BKException.Code.OK;
/*
* The length is a long and it is the last field of the metadata of an entry.
* Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
*/
LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i);
LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i);
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
entryImpl.setEntryBuf(content);
entryImpl.setEntryBuf(buffer);
entries.add(entryImpl);
}
writeSet.recycle();
return true;
} else {
writeSet.recycle();
// Another response completed the request first; readEntriesComplete() releases bufList.
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,32 @@
package org.apache.bookkeeper.client;

import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.awaitility.Awaitility;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -289,4 +299,130 @@ public void testReadFailureWithFailedBookies() throws Exception {
lh.close();
newBk.close();
}

@Test
public void testDigestMismatchRetriesNextReplicaAndCompletes() throws Exception {
ClientConfiguration conf = new ClientConfiguration(baseClientConf)
.setUseV2WireProtocol(true)
.setReorderReadSequenceEnabled(false)
.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) {
byte[] data = "batch-digest-data".getBytes(StandardCharsets.UTF_8);
LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd);
writer.addEntry(data);
long ledgerId = writer.getId();
BookieId corruptReplica = writer.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
writer.close();

ServerConfiguration corruptConf = killBookie(corruptReplica);
startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf));

LedgerHandle reader = bk.openLedger(ledgerId, digestType, passwd);
BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 1, 1024, false);
readOp.submit();

Iterator<LedgerEntry> entries = readOp.future().get().iterator();
assertTrue(entries.hasNext());
LedgerEntry entry = entries.next();
assertArrayEquals(data, entry.getEntryBytes());
entry.close();
assertFalse(entries.hasNext());
reader.close();
}
}

@Test
public void testDigestMismatchAfterPartialVerificationDoesNotRetainEntries() throws Exception {
ClientConfiguration conf = new ClientConfiguration(baseClientConf)
.setUseV2WireProtocol(true)
.setReorderReadSequenceEnabled(false)
.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) {
byte[] entry0 = "batch-digest-entry-0".getBytes(StandardCharsets.UTF_8);
byte[] entry1 = "batch-digest-entry-1".getBytes(StandardCharsets.UTF_8);
LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd);
writer.addEntry(entry0);
writer.addEntry(entry1);
long ledgerId = writer.getId();
List<BookieId> ensemble = writer.getLedgerMetadata().getAllEnsembles().get(0L);
BookieId corruptReplica = ensemble.get(0);
BookieId retryReplica = ensemble.get(1);
writer.close();

CountDownLatch corruptReadLatch = new CountDownLatch(1);
ServerConfiguration corruptConf = killBookie(corruptReplica);
startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf, 1L, corruptReadLatch));

CountDownLatch retryLatch = new CountDownLatch(1);
sleepBookie(retryReplica, retryLatch);

LedgerHandle reader = null;
try {
reader = bk.openLedger(ledgerId, digestType, passwd);
BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 2, 2048, false);
readOp.submit();

assertTrue("corrupt replica did not read the corrupted entry",
corruptReadLatch.await(10, TimeUnit.SECONDS));
Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
assertNotNull(readOp.request);
BatchedReadOp.SequenceReadRequest request =
(BatchedReadOp.SequenceReadRequest) readOp.request;
assertTrue(request.nextReplicaIndexToReadFrom >= 2);
});
assertTrue("digest mismatch must not retain partially verified entries",
readOp.request.entries.isEmpty());

retryLatch.countDown();
Iterator<LedgerEntry> entries = readOp.future().get(10, TimeUnit.SECONDS).iterator();
assertTrue(entries.hasNext());
LedgerEntry first = entries.next();
assertArrayEquals(entry0, first.getEntryBytes());
first.close();
assertTrue(entries.hasNext());
LedgerEntry second = entries.next();
assertArrayEquals(entry1, second.getEntryBytes());
second.close();
assertFalse(entries.hasNext());
} finally {
retryLatch.countDown();
if (reader != null) {
reader.close();
}
}
}
}

static class CorruptReadBookie extends TestBookieImpl {
private final long corruptEntryId;
private final CountDownLatch corruptReadLatch;

CorruptReadBookie(ServerConfiguration conf) throws Exception {
this(conf, -1L, null);
}

CorruptReadBookie(ServerConfiguration conf, long corruptEntryId, CountDownLatch corruptReadLatch)
throws Exception {
super(conf);
this.corruptEntryId = corruptEntryId;
this.corruptReadLatch = corruptReadLatch;
}

@Override
public ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, Bookie.NoLedgerException, BookieException {
ByteBuf localBuf = super.readEntry(ledgerId, entryId);
if (corruptEntryId < 0 || corruptEntryId == entryId) {
for (int i = 0; i < localBuf.capacity(); i++) {
localBuf.setByte(i, 0);
}
if (corruptReadLatch != null) {
corruptReadLatch.countDown();
}
}
return localBuf;
}
}
}