2424#include "capi/cmd_get.hh"
2525#include "capi/cmd_get_replica.hh"
2626
27+ #include <random>
28+ #include <algorithm>
29+
2730LIBCOUCHBASE_API lcb_STATUS lcb_respgetreplica_status (const lcb_RESPGETREPLICA * resp )
2831{
2932 return resp -> ctx .rc ;
@@ -139,6 +142,12 @@ LIBCOUCHBASE_API lcb_STATUS lcb_cmdgetreplica_key(lcb_CMDGETREPLICA *cmd, const
139142 return cmd -> key (std ::string (key , key_len ));
140143}
141144
145+ LIBCOUCHBASE_API lcb_STATUS lcb_cmdgetreplica_read_preference (lcb_CMDGETREPLICA * cmd ,
146+ lcb_REPLICA_READ_PREFERENCE preference )
147+ {
148+ return cmd -> read_preference (preference );
149+ }
150+
142151LIBCOUCHBASE_API lcb_STATUS lcb_cmdgetreplica_on_behalf_of (lcb_CMDGETREPLICA * cmd , const char * data , size_t data_len )
143152{
144153 return cmd -> on_behalf_of (std ::string (data , data_len ));
@@ -243,6 +252,90 @@ RGetCookie::RGetCookie(void *cookie_, lcb_INSTANCE *instance_, get_replica_mode
243252{
244253}
245254
255+ struct readable_node {
256+ bool is_replica ;
257+ std ::size_t node_index ; // index of the node in configuration
258+ std ::size_t replica_index ; // zero-based index of the replica
259+ };
260+
261+ template < typename COMMAND >
262+ static std ::vector < readable_node > select_effective_node_indexes (lcb_INSTANCE * instance , COMMAND cmd )
263+ {
264+ mc_CMDQUEUE * cq = & instance -> cmdq ;
265+ int vbid ;
266+ int active_index ;
267+ lcb_KEYBUF keybuf {LCB_KV_COPY , {cmd -> key ().c_str (), cmd -> key ().size ()}};
268+ mcreq_map_key (cq , & keybuf , MCREQ_PKT_BASESIZE , & vbid , & active_index );
269+
270+ if (active_index < 0 ) {
271+ return {};
272+ }
273+
274+ std ::vector < readable_node > effective_nodes {};
275+
276+ if (cmd -> mode () == get_replica_mode ::select ) {
277+ int replica_index = cmd -> selected_replica_index ();
278+ int node_index = lcbvb_vbreplica (cq -> config , vbid , replica_index );
279+ if (node_index >= 0 ) {
280+ effective_nodes .push_back (
281+ {true, static_cast < std ::size_t > (node_index ), static_cast < std ::size_t > (replica_index )});
282+ }
283+ return effective_nodes ;
284+ }
285+
286+ std ::string preferred_server_group {};
287+ bool use_preferred_server_group {cmd -> read_preference () == LCB_REPLICA_READ_PREFERENCE_SELECTED_SERVER_GROUP };
288+
289+ if (use_preferred_server_group ) {
290+ if (instance -> settings -> preferred_server_group == nullptr ) {
291+ return {};
292+ } else {
293+ preferred_server_group = instance -> settings -> preferred_server_group ;
294+ }
295+ }
296+
297+ /* Make sure they're all online */
298+ for (std ::size_t ii = 0 ; ii < LCBT_NREPLICAS (instance ); ii ++ ) {
299+ int node_index = lcbvb_vbreplica (cq -> config , vbid , ii );
300+ if (node_index < 0 ) {
301+ if (cmd -> mode () == get_replica_mode ::all ) {
302+ return {};
303+ }
304+ continue ;
305+ }
306+ if (use_preferred_server_group ) {
307+ const char * server_group = cq -> config -> servers [node_index ].server_group ;
308+ if (server_group != nullptr && preferred_server_group == server_group ) {
309+ effective_nodes .push_back ({true, static_cast < std ::size_t > (node_index ), ii });
310+ }
311+ } else {
312+ effective_nodes .push_back ({true, static_cast < std ::size_t > (node_index ), ii });
313+ }
314+ }
315+
316+ if (cmd -> mode () == get_replica_mode ::any ) {
317+ if (effective_nodes .empty ()) {
318+ return {};
319+ }
320+ static std ::random_device rd ;
321+ std ::mt19937 g (rd ());
322+ std ::shuffle (effective_nodes .begin (), effective_nodes .end (), g );
323+ return {effective_nodes .front ()};
324+ }
325+
326+ /* read from active */
327+ if (use_preferred_server_group ) {
328+ const char * active_server_group = cq -> config -> servers [active_index ].server_group ;
329+ if (active_server_group != nullptr && preferred_server_group == active_server_group ) {
330+ effective_nodes .push_back ({false, static_cast < std ::size_t > (active_index ), 0xff });
331+ }
332+ } else {
333+ effective_nodes .push_back ({false, static_cast < std ::size_t > (active_index ), 0xff });
334+ }
335+
336+ return effective_nodes ;
337+ }
338+
246339static lcb_STATUS get_replica_validate (lcb_INSTANCE * instance , const lcb_CMDGETREPLICA * cmd )
247340{
248341 if (cmd -> key ().empty ()) {
@@ -283,6 +376,13 @@ static lcb_STATUS get_replica_validate(lcb_INSTANCE *instance, const lcb_CMDGETR
283376 break ;
284377
285378 case get_replica_mode ::all :
379+ if (cmd -> read_preference () == LCB_REPLICA_READ_PREFERENCE_SELECTED_SERVER_GROUP ) {
380+ auto effective_nodes = select_effective_node_indexes (instance , cmd );
381+ if (effective_nodes .empty ()) {
382+ return LCB_ERR_DOCUMENT_UNRETRIEVABLE ;
383+ }
384+ }
385+
286386 r0 = 0 ;
287387 r1 = LCBT_NREPLICAS (instance );
288388 /* Make sure they're all online */
@@ -309,7 +409,6 @@ static lcb_STATUS get_replica_schedule(lcb_INSTANCE *instance, std::shared_ptr<l
309409 */
310410 mc_CMDQUEUE * cq = & instance -> cmdq ;
311411 int vbid , ixtmp ;
312- protocol_binary_request_header req {};
313412 unsigned r0 = 0 , r1 = 0 ;
314413
315414 lcb_KEYBUF keybuf {LCB_KV_COPY , {cmd -> key ().c_str (), cmd -> key ().size ()}};
@@ -355,6 +454,11 @@ static lcb_STATUS get_replica_schedule(lcb_INSTANCE *instance, std::shared_ptr<l
355454 return LCB_ERR_NO_MATCHING_SERVER ;
356455 }
357456
457+ const auto effective_nodes = select_effective_node_indexes (instance , cmd );
458+ if (effective_nodes .empty ()) {
459+ return LCB_ERR_DOCUMENT_UNRETRIEVABLE ;
460+ }
461+
358462 std ::vector < std ::uint8_t > framing_extras ;
359463 if (cmd -> want_impersonation ()) {
360464 lcb_STATUS err = lcb ::flexible_framing_extras ::encode_impersonate_user (cmd -> impostor (), framing_extras );
@@ -376,68 +480,67 @@ static lcb_STATUS get_replica_schedule(lcb_INSTANCE *instance, std::shared_ptr<l
376480 rck -> start + cmd -> timeout_or_default_in_nanoseconds (LCB_US2NS (LCBT_SETTING (instance , operation_timeout )));
377481
378482 /* Initialize the packet */
483+ protocol_binary_request_header req {};
379484 req .request .magic = framing_extras .empty () ? PROTOCOL_BINARY_REQ : PROTOCOL_BINARY_AREQ ;
380- req .request .opcode = PROTOCOL_BINARY_CMD_GET_REPLICA ;
381485 req .request .datatype = PROTOCOL_BINARY_RAW_BYTES ;
382486 req .request .vbucket = htons (static_cast < std ::uint16_t > (vbid ));
383487 req .request .cas = 0 ;
384488 req .request .extlen = 0 ;
385489
386490 auto ffextlen = static_cast < std ::uint8_t > (framing_extras .size ());
387491
388- rck -> r_cur = r0 ;
389- do {
390- int curix ;
391- mc_PIPELINE * pl ;
392- mc_PACKET * pkt ;
393-
394- curix = lcbvb_vbreplica ( cq -> config , vbid , r0 );
395- /* XXX: this is always expected to be in range. For the FIRST mode
396- * it will seek to the first valid index (checked above), and for the
397- * ALL mode, it will fail if not all replicas are already online
398- * (also checked above) */
399- pl = cq -> pipelines [curix ];
400- pkt = mcreq_allocate_packet (pl );
401- if (!pkt ) {
402- delete rck ;
403- return LCB_ERR_NO_MEMORY ;
404- }
492+ for ( auto node : effective_nodes ) {
493+ if ( node . is_replica ) {
494+ req . request . opcode = PROTOCOL_BINARY_CMD_GET_REPLICA ;
495+ rck -> r_cur = node . replica_index ;
496+ mc_PIPELINE * pl ;
497+ mc_PACKET * pkt ;
498+
499+ /* XXX: this is always expected to be in range. For the FIRST mode
500+ * it will seek to the first valid index (checked above), and for the
501+ * ALL mode, it will fail if not all replicas are already online
502+ * (also checked above) */
503+ pl = cq -> pipelines [node . node_index ];
504+ pkt = mcreq_allocate_packet (pl );
505+ if (!pkt ) {
506+ delete rck ;
507+ return LCB_ERR_NO_MEMORY ;
508+ }
405509
406- pkt -> u_rdata .exdata = rck ;
407- pkt -> flags |= MCREQ_F_REQEXT ;
408-
409- mcreq_reserve_key (pl , pkt , sizeof (req .bytes ) + ffextlen , & keybuf , cmd -> collection ().collection_id ());
410- size_t nkey = pkt -> kh_span .size - MCREQ_PKT_BASESIZE + pkt -> extlen ;
411- req .request .keylen = htons ((uint16_t )nkey );
412- req .request .bodylen = htonl ((uint32_t )nkey + framing_extras .size ());
413- req .request .opaque = pkt -> opaque ;
414- rck -> remaining ++ ;
415- mcreq_write_hdr (pkt , & req );
416- if (!framing_extras .empty ()) {
417- memcpy (SPAN_BUFFER (& pkt -> kh_span ) + sizeof (req .bytes ), framing_extras .data (), framing_extras .size ());
418- }
419- mcreq_sched_add (pl , pkt );
420- } while ( ++ r0 < r1 );
421-
422- if ( cmd -> need_get_active ()) {
423- req . request . opcode = PROTOCOL_BINARY_CMD_GET ;
424- mc_PIPELINE * pl ;
425- mc_PACKET * pkt ;
426- lcb_STATUS err = mcreq_basic_packet ( cq , & keybuf , cmd -> collection (). collection_id (), & req , 0 , ffextlen , & pkt ,
427- & pl , MCREQ_BASICPACKET_F_FALLBACKOK ) ;
428- if ( err != LCB_SUCCESS ) {
429- delete rck ;
430- return err ;
431- }
432- req . request . opaque = pkt -> opaque ;
433- pkt -> u_rdata . exdata = rck ;
434- pkt -> flags |= MCREQ_F_REQEXT ;
435- rck -> remaining ++ ;
436- mcreq_write_hdr ( pkt , & req );
437- if (! framing_extras . empty ()) {
438- memcpy ( SPAN_BUFFER ( & pkt -> kh_span ) + sizeof ( req . bytes ), framing_extras . data (), framing_extras . size () );
510+ pkt -> u_rdata .exdata = rck ;
511+ pkt -> flags |= MCREQ_F_REQEXT ;
512+
513+ mcreq_reserve_key (pl , pkt , sizeof (req .bytes ) + ffextlen , & keybuf , cmd -> collection ().collection_id ());
514+ size_t nkey = pkt -> kh_span .size - MCREQ_PKT_BASESIZE + pkt -> extlen ;
515+ req .request .keylen = htons ((uint16_t )nkey );
516+ req .request .bodylen = htonl ((uint32_t )nkey + framing_extras .size ());
517+ req .request .opaque = pkt -> opaque ;
518+ rck -> remaining ++ ;
519+ mcreq_write_hdr (pkt , & req );
520+ if (!framing_extras .empty ()) {
521+ memcpy (SPAN_BUFFER (& pkt -> kh_span ) + sizeof (req .bytes ), framing_extras .data (), framing_extras .size ());
522+ }
523+ mcreq_sched_add (pl , pkt );
524+ } else {
525+ req . request . opcode = PROTOCOL_BINARY_CMD_GET ;
526+ mc_PIPELINE * pl ;
527+ mc_PACKET * pkt ;
528+ lcb_STATUS err = mcreq_basic_packet ( cq , & keybuf , cmd -> collection (). collection_id (), & req , 0 , ffextlen , & pkt ,
529+ & pl , MCREQ_BASICPACKET_F_FALLBACKOK ) ;
530+ if ( err != LCB_SUCCESS ) {
531+ delete rck ;
532+ return err ;
533+ }
534+ req . request . opaque = pkt -> opaque ;
535+ pkt -> u_rdata . exdata = rck ;
536+ pkt -> flags |= MCREQ_F_REQEXT ;
537+ rck -> remaining ++ ;
538+ mcreq_write_hdr ( pkt , & req ) ;
539+ if (! framing_extras . empty ()) {
540+ memcpy ( SPAN_BUFFER ( & pkt -> kh_span ) + sizeof ( req . bytes ), framing_extras . data (), framing_extras . size () );
541+ }
542+ mcreq_sched_add ( pl , pkt );
439543 }
440- mcreq_sched_add (pl , pkt );
441544 }
442545
443546 MAYBE_SCHEDLEAVE (instance )
0 commit comments