diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java index ebba79faea8..2fdb70195c3 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BatchedReadOp.java @@ -112,6 +112,11 @@ public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBu heardFromHosts.add(rctx.to); heardFromHostsBitSet.set(rctx.bookieIndex, true); + /* + * Retain the response while this read op handles it. complete() returns true only when it + * transfers the buffers into request.entries. For digest failures, duplicate responses, or + * other incomplete paths, complete() returns false and this retained reference is released here. + */ bufList.retain(); // if entry has completed don't handle twice if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) { @@ -160,32 +165,41 @@ boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) { if (isComplete()) { return false; } - if (!complete.getAndSet(true)) { + + /* + * Verify the whole batch before creating LedgerEntryImpl instances. If any entry fails + * digest verification, no partial entries are retained and readEntriesComplete() releases + * the retained ByteBufList after this method returns false. + */ + for (int i = 0; i < bufList.size(); i++) { + ByteBuf buffer = bufList.getBuffer(i); + try { + lh.macManager.verifyDigestAndReturnData(eId + i, buffer); + } catch (BKException.BKDigestMatchException e) { + clientCtx.getClientStats().getReadOpDmCounter().inc(); + logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", + BKException.Code.DigestMatchException); + return false; + } + } + + if (complete.compareAndSet(false, true)) { + rc = BKException.Code.OK; for (int i = 0; i < bufList.size(); i++) { ByteBuf buffer = bufList.getBuffer(i); - ByteBuf content; - try { - content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer); - } catch (BKException.BKDigestMatchException e) { - clientCtx.getClientStats().getReadOpDmCounter().inc(); - logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch", - BKException.Code.DigestMatchException); - return false; - } - rc = BKException.Code.OK; /* * The length is a long and it is the last field of the metadata of an entry. * Consequently, we have to subtract 8 from METADATA_LENGTH to get the length. */ - LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i); + LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i); entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8)); - entryImpl.setEntryBuf(content); + entryImpl.setEntryBuf(buffer); entries.add(entryImpl); } writeSet.recycle(); return true; } else { - writeSet.recycle(); + // Another response completed the request first; readEntriesComplete() releases bufList. return false; } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java index 1bb95ed0478..7e11d2f70a1 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestBatchedRead.java @@ -21,22 +21,32 @@ package org.apache.bookkeeper.client; import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.bookie.Bookie; +import org.apache.bookkeeper.bookie.BookieException; +import org.apache.bookkeeper.bookie.TestBookieImpl; import org.apache.bookkeeper.client.BKException.Code; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.conf.ClientConfiguration; +import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.net.BookieId; import org.apache.bookkeeper.test.BookKeeperClusterTestCase; +import org.awaitility.Awaitility; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -289,4 +299,130 @@ public void testReadFailureWithFailedBookies() throws Exception { lh.close(); newBk.close(); } + + @Test + public void testDigestMismatchRetriesNextReplicaAndCompletes() throws Exception { + ClientConfiguration conf = new ClientConfiguration(baseClientConf) + .setUseV2WireProtocol(true) + .setReorderReadSequenceEnabled(false) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + + try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) { + byte[] data = "batch-digest-data".getBytes(StandardCharsets.UTF_8); + LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd); + writer.addEntry(data); + long ledgerId = writer.getId(); + BookieId corruptReplica = writer.getLedgerMetadata().getAllEnsembles().get(0L).get(0); + writer.close(); + + ServerConfiguration corruptConf = killBookie(corruptReplica); + startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf)); + + LedgerHandle reader = bk.openLedger(ledgerId, digestType, passwd); + BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 1, 1024, false); + readOp.submit(); + + Iterator entries = readOp.future().get().iterator(); + assertTrue(entries.hasNext()); + LedgerEntry entry = entries.next(); + assertArrayEquals(data, entry.getEntryBytes()); + entry.close(); + assertFalse(entries.hasNext()); + reader.close(); + } + } + + @Test + public void testDigestMismatchAfterPartialVerificationDoesNotRetainEntries() throws Exception { + ClientConfiguration conf = new ClientConfiguration(baseClientConf) + .setUseV2WireProtocol(true) + .setReorderReadSequenceEnabled(false) + .setMetadataServiceUri(zkUtil.getMetadataServiceUri()); + + try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) { + byte[] entry0 = "batch-digest-entry-0".getBytes(StandardCharsets.UTF_8); + byte[] entry1 = "batch-digest-entry-1".getBytes(StandardCharsets.UTF_8); + LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd); + writer.addEntry(entry0); + writer.addEntry(entry1); + long ledgerId = writer.getId(); + List ensemble = writer.getLedgerMetadata().getAllEnsembles().get(0L); + BookieId corruptReplica = ensemble.get(0); + BookieId retryReplica = ensemble.get(1); + writer.close(); + + CountDownLatch corruptReadLatch = new CountDownLatch(1); + ServerConfiguration corruptConf = killBookie(corruptReplica); + startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf, 1L, corruptReadLatch)); + + CountDownLatch retryLatch = new CountDownLatch(1); + sleepBookie(retryReplica, retryLatch); + + LedgerHandle reader = null; + try { + reader = bk.openLedger(ledgerId, digestType, passwd); + BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 2, 2048, false); + readOp.submit(); + + assertTrue("corrupt replica did not read the corrupted entry", + corruptReadLatch.await(10, TimeUnit.SECONDS)); + Awaitility.await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> { + assertNotNull(readOp.request); + BatchedReadOp.SequenceReadRequest request = + (BatchedReadOp.SequenceReadRequest) readOp.request; + assertTrue(request.nextReplicaIndexToReadFrom >= 2); + }); + assertTrue("digest mismatch must not retain partially verified entries", + readOp.request.entries.isEmpty()); + + retryLatch.countDown(); + Iterator entries = readOp.future().get(10, TimeUnit.SECONDS).iterator(); + assertTrue(entries.hasNext()); + LedgerEntry first = entries.next(); + assertArrayEquals(entry0, first.getEntryBytes()); + first.close(); + assertTrue(entries.hasNext()); + LedgerEntry second = entries.next(); + assertArrayEquals(entry1, second.getEntryBytes()); + second.close(); + assertFalse(entries.hasNext()); + } finally { + retryLatch.countDown(); + if (reader != null) { + reader.close(); + } + } + } + } + + static class CorruptReadBookie extends TestBookieImpl { + private final long corruptEntryId; + private final CountDownLatch corruptReadLatch; + + CorruptReadBookie(ServerConfiguration conf) throws Exception { + this(conf, -1L, null); + } + + CorruptReadBookie(ServerConfiguration conf, long corruptEntryId, CountDownLatch corruptReadLatch) + throws Exception { + super(conf); + this.corruptEntryId = corruptEntryId; + this.corruptReadLatch = corruptReadLatch; + } + + @Override + public ByteBuf readEntry(long ledgerId, long entryId) + throws IOException, Bookie.NoLedgerException, BookieException { + ByteBuf localBuf = super.readEntry(ledgerId, entryId); + if (corruptEntryId < 0 || corruptEntryId == entryId) { + for (int i = 0; i < localBuf.capacity(); i++) { + localBuf.setByte(i, 0); + } + if (corruptReadLatch != null) { + corruptReadLatch.countDown(); + } + } + return localBuf; + } + } }