Skip to content

Commit 4d3635e

Browse files
committed
CCBC-1671: Send requests to all replica nodes for strategy "ANY"
This change makes implementation of get_any_replica behave the same way as other SDKs. The old behavior: "get any replica" would send requests sequentially one by one. So that next request will be dispatched only when previous fails. The new behavior: "get any replica" send requests to active and replica nodes simulaneously and returns only first response to the user, discarding everything else. Change-Id: Idc6db89b8a0b99623369a712fdf261c8ba201927 Reviewed-on: https://review.couchbase.org/c/libcouchbase/+/232672 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Sergey Avseyev <sergey.avseyev@gmail.com>
1 parent f3eb280 commit 4d3635e

2 files changed

Lines changed: 24 additions & 68 deletions

File tree

src/operations/get_replica.cc

Lines changed: 22 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ struct RGetCookie : mc_REQDATAEX {
171171
unsigned r_cur{0};
172172
unsigned r_max;
173173
int remaining{0};
174+
bool skip{false};
174175
int vbucket;
175176
get_replica_mode strategy;
176177
lcb_INSTANCE *instance;
@@ -210,35 +211,10 @@ static void rget_callback(mc_PIPELINE *pipeline, mc_PACKET *pkt, lcb_CALLBACK_TY
210211
}
211212
callback(instance, LCB_CALLBACK_GETREPLICA, (const lcb_RESPBASE *)resp);
212213
} else {
213-
mc_CMDQUEUE *cq = &instance->cmdq;
214-
mc_PIPELINE *nextpl = nullptr;
215-
216-
/** FIRST */
217-
do {
218-
int nextix;
219-
rck->r_cur++;
220-
nextix = lcbvb_vbreplica(cq->config, rck->vbucket, rck->r_cur);
221-
if (nextix > -1 && nextix < (int)cq->npipelines) {
222-
/* have a valid next index? */
223-
nextpl = cq->pipelines[nextix];
224-
break;
225-
}
226-
} while (rck->r_cur < rck->r_max);
227-
228-
if (err == LCB_SUCCESS || rck->r_cur == rck->r_max || nextpl == nullptr) {
214+
if (!rck->skip && (err == LCB_SUCCESS || rck->remaining == 1)) {
215+
rck->skip = true;
229216
resp->rflags |= LCB_RESP_F_FINAL;
230217
callback(instance, LCB_CALLBACK_GETREPLICA, (lcb_RESPBASE *)resp);
231-
/* refcount=1 . Free this now */
232-
rck->remaining = 1;
233-
} else if (err != LCB_SUCCESS) {
234-
mc_PACKET *newpkt = mcreq_renew_packet(pkt);
235-
newpkt->flags &= ~MCREQ_STATE_FLAGS;
236-
mcreq_sched_add(nextpl, newpkt);
237-
/* Use this, rather than lcb_sched_leave(), because this is being
238-
* invoked internally by the library. */
239-
mcreq_sched_leave(cq, 1);
240-
/* wait */
241-
rck->remaining = 2;
242218
}
243219
}
244220
rck->decref();
@@ -306,16 +282,6 @@ static std::vector<readable_node> select_effective_node_indexes(lcb_INSTANCE *in
306282
}
307283
}
308284

309-
if (cmd->mode() == get_replica_mode::any) {
310-
if (effective_nodes.empty()) {
311-
return {};
312-
}
313-
static std::random_device rd;
314-
std::mt19937 g(rd());
315-
std::shuffle(effective_nodes.begin(), effective_nodes.end(), g);
316-
return {effective_nodes.front()};
317-
}
318-
319285
/* read from active, if it is accessible */
320286
if (active_index >= 0) {
321287
if (use_preferred_server_group) {
@@ -359,17 +325,6 @@ static lcb_STATUS get_replica_validate(lcb_INSTANCE *instance, const lcb_CMDGETR
359325
break;
360326

361327
case get_replica_mode::any:
362-
for (r0 = 0; r0 < LCBT_NREPLICAS(instance); r0++) {
363-
if (lcbvb_vbreplica(cq->config, vbid, r0) > -1) {
364-
r1 = r0;
365-
break;
366-
}
367-
}
368-
if (r0 == LCBT_NREPLICAS(instance)) {
369-
return LCB_ERR_NO_MATCHING_SERVER;
370-
}
371-
break;
372-
373328
case get_replica_mode::all:
374329
if (cmd->read_preference() == LCB_REPLICA_READ_PREFERENCE_SELECTED_SERVER_GROUP) {
375330
auto effective_nodes = select_effective_node_indexes(instance, cmd);
@@ -378,11 +333,18 @@ static lcb_STATUS get_replica_validate(lcb_INSTANCE *instance, const lcb_CMDGETR
378333
}
379334
}
380335

381-
r0 = 0;
382-
r1 = LCBT_NREPLICAS(instance);
383-
/* Make sure they're all online */
384-
for (unsigned ii = 0; ii < LCBT_NREPLICAS(instance); ii++) {
385-
if (lcbvb_vbreplica(cq->config, vbid, ii) < 0) {
336+
{
337+
bool no_replicas = true;
338+
r0 = 0;
339+
r1 = LCBT_NREPLICAS(instance);
340+
/* Make sure at least one of them is online */
341+
for (unsigned ii = 0; ii < LCBT_NREPLICAS(instance); ii++) {
342+
if (lcbvb_vbreplica(cq->config, vbid, ii) >= 0) {
343+
no_replicas = false;
344+
break;
345+
}
346+
}
347+
if (no_replicas) {
386348
return LCB_ERR_NO_MATCHING_SERVER;
387349
}
388350
}
@@ -422,27 +384,21 @@ static lcb_STATUS get_replica_schedule(lcb_INSTANCE *instance, std::shared_ptr<l
422384
break;
423385

424386
case get_replica_mode::all:
387+
case get_replica_mode::any: {
388+
bool no_replicas = true;
425389
r0 = 0;
426390
r1 = LCBT_NREPLICAS(instance);
427-
/* Make sure they're all online */
391+
/* Make sure at least one of them is online */
428392
for (unsigned ii = 0; ii < LCBT_NREPLICAS(instance); ii++) {
429-
if ((ixtmp = lcbvb_vbreplica(cq->config, vbid, ii)) < 0) {
430-
return LCB_ERR_NO_MATCHING_SERVER;
431-
}
432-
}
433-
break;
434-
435-
case get_replica_mode::any:
436-
for (r0 = 0; r0 < LCBT_NREPLICAS(instance); r0++) {
437-
if ((ixtmp = lcbvb_vbreplica(cq->config, vbid, r0)) > -1) {
438-
r1 = r0;
393+
if (lcbvb_vbreplica(cq->config, vbid, ii) >= 0) {
394+
no_replicas = false;
439395
break;
440396
}
441397
}
442-
if (r0 == LCBT_NREPLICAS(instance)) {
398+
if (no_replicas) {
443399
return LCB_ERR_NO_MATCHING_SERVER;
444400
}
445-
break;
401+
}
446402
}
447403

448404
if (r1 < r0 || r1 >= cq->npipelines) {

tests/iotests/t_get.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -488,13 +488,13 @@ TEST_F(GetUnitTest, testGetReplica)
488488
lcb_wait(instance, LCB_WAIT_DEFAULT);
489489
ASSERT_EQ(0, rck.remaining);
490490

491-
// Try with ALL again (should give an error)
491+
// Try with ALL again (should not give an error)
492492
lcb_cmdgetreplica_create(&rcmd, LCB_REPLICA_MODE_ALL);
493493
lcb_cmdgetreplica_key(rcmd, key.c_str(), key.size());
494494
lcb_sched_enter(instance);
495495
err = lcb_getreplica(instance, nullptr, rcmd);
496496
lcb_cmdgetreplica_destroy(rcmd);
497-
ASSERT_EQ(LCB_ERR_NO_MATCHING_SERVER, err);
497+
ASSERT_EQ(LCB_SUCCESS, err);
498498
lcb_sched_leave(instance);
499499

500500
vb->servers[2] = oldix;

0 commit comments

Comments
 (0)