Skip to content

Commit ad04729

Browse files
committed
CCBC-1607: fix collection id encoding in mixed cluster
Change-Id: I1893cfa1b67ca9457ad7dd0a14eda5695c5792c5 Reviewed-on: https://review.couchbase.org/c/libcouchbase/+/194901 Tested-by: Build Bot <build@couchbase.com> Reviewed-by: Brett Lawson <brett19@gmail.com>
1 parent 95e4542 commit ad04729

5 files changed

Lines changed: 141 additions & 19 deletions

File tree

src/handler.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,7 +342,7 @@ template <typename T>
342342
void invoke_callback(const mc_PACKET *pkt, lcb_INSTANCE *instance, T *resp, lcb_CALLBACK_TYPE cbtype)
343343
{
344344
if (instance != nullptr) {
345-
std::string collection_path = instance->collcache->id_to_name(mcreq_get_cid(instance, pkt));
345+
std::string collection_path = instance->collcache->id_to_name(mcreq_get_cid(instance, pkt, NULL));
346346
if (!collection_path.empty()) {
347347
size_t dot = collection_path.find('.');
348348
if (dot != std::string::npos) {

src/mc/mcreq.c

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ lcb_STATUS mcreq_reserve_key(mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t h
109109
/* copy collection ID prefix */
110110
if (ncid) {
111111
memcpy(SPAN_BUFFER(&packet->kh_span) + hdrsize, cid, ncid);
112+
packet->flags |= MCREQ_F_HASCID;
112113
}
113114
/**
114115
* Copy the key into the packet starting at the extras end
@@ -243,10 +244,89 @@ void mcreq_reenqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
243244
sllist_insert_sorted(reqs, &packet->slnode, pkt_tmo_compar);
244245
}
245246

247+
static void check_collection_id(mc_PIPELINE *pipeline, mc_PACKET *packet)
248+
{
249+
if ((packet->flags & MCREQ_F_NOCID) != 0) {
250+
return;
251+
}
252+
253+
// before adding packet to pipeline lets see if we need add or remove collection id prefix
254+
char *header_and_key = SPAN_BUFFER(&packet->kh_span);
255+
protocol_binary_request_header *request = (protocol_binary_request_header *)header_and_key;
256+
257+
uint16_t key_length;
258+
uint8_t flexible_extras_length = 0;
259+
260+
if (request->request.magic == PROTOCOL_BINARY_AREQ) {
261+
flexible_extras_length = request->request.keylen & 0xff;
262+
key_length = request->request.keylen >> 8;
263+
} else {
264+
key_length = ntohs(request->request.keylen);
265+
}
266+
if (key_length == 0) {
267+
return;
268+
}
269+
270+
char *key = header_and_key + sizeof(*request) + request->request.extlen + flexible_extras_length;
271+
uint32_t collection_id = 0;
272+
273+
uint16_t collection_id_length = 0;
274+
if ((packet->flags & MCREQ_F_HASCID) != 0) {
275+
collection_id_length = (uint16_t)leb128_decode((const uint8_t *)key, key_length, &collection_id);
276+
}
277+
278+
switch (pipeline->collections) {
279+
case MCREQ_COLLECTIONS_SUPPORTED:
280+
// the pipeline had negotiated collections feature with kv engine, so we have to encode collection id
281+
// prefix
282+
if (collection_id_length == 0) {
283+
// but collection id prefix was not encoded, we should assume default collection and prepend zero as
284+
// a collection identifier
285+
mcreq_set_cid(pipeline, packet, 0);
286+
}
287+
break;
288+
289+
case MCREQ_COLLECTIONS_UNSUPPORTTED:
290+
// the pipeline been told that the kv engine instance does not support collections
291+
if (collection_id_length != 0) {
292+
// but the packet has encoded collection id
293+
if (collection_id == 0) {
294+
// strip it if it is default collection
295+
request->request.bodylen = htonl(ntohl(request->request.bodylen) - collection_id_length);
296+
uint16_t new_key_length = key_length - collection_id_length;
297+
if (request->request.magic == PROTOCOL_BINARY_AREQ) {
298+
request->request.keylen = (new_key_length << 8U) | (flexible_extras_length & 0xffU);
299+
} else {
300+
request->request.keylen = htons(new_key_length);
301+
}
302+
303+
// shift the key content to the left
304+
for (int i = 0; i < new_key_length; ++i) {
305+
key[i] = key[i + collection_id_length];
306+
}
307+
} else {
308+
fprintf(
309+
stderr,
310+
"custom collection id has been dispatched to the node, that does not support collections\n");
311+
// TODO log error
312+
}
313+
}
314+
break;
315+
316+
default:
317+
// the pipeline hadn't completed handshake yet, so trust global settings, and let operation be fixed
318+
// when it will be retried in case of misprediction.
319+
fprintf(stderr, "collections has not been negotiated for the pipeline yet\n");
320+
break;
321+
}
322+
}
323+
246324
void mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
247325
{
248326
nb_SPAN *vspan = &packet->u_value.single;
249327
sllist_append(&pipeline->requests, &packet->slnode);
328+
329+
check_collection_id(pipeline, packet);
250330
netbuf_enqueue_span(&pipeline->nbmgr, &packet->kh_span, packet);
251331
MC_INCR_METRIC(pipeline, bytes_queued, packet->kh_span.size);
252332

@@ -593,9 +673,10 @@ void mcreq_set_cid(mc_PIPELINE *pipeline, mc_PACKET *packet, uint32_t cid)
593673
netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
594674
}
595675
CREATE_STANDALONE_SPAN(&packet->kh_span, kdata, new_size);
676+
packet->flags |= MCREQ_F_HASCID;
596677
}
597678

598-
uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet)
679+
uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet, int *cid_set)
599680
{
600681
uint8_t ffext = 0;
601682
uint16_t nk = 0;
@@ -605,6 +686,10 @@ uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet)
605686
char *kh = SPAN_BUFFER(&packet->kh_span);
606687
char *k = NULL;
607688

689+
if (cid_set != NULL) {
690+
*cid_set = 0;
691+
}
692+
608693
memcpy(&req, kh, sizeof(req));
609694
if (req.request.magic == PROTOCOL_BINARY_AREQ) {
610695
ffext = req.request.keylen & 0xff;
@@ -616,6 +701,9 @@ uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet)
616701
if ((packet->flags & MCREQ_F_NOCID) == 0 && instance && LCBT_SETTING(instance, use_collections)) {
617702
ncid = leb128_decode((uint8_t *)k, nk, &cid);
618703
if (ncid) {
704+
if (cid_set != NULL) {
705+
*cid_set = 1;
706+
}
619707
return cid;
620708
}
621709
}
@@ -702,6 +790,7 @@ int mcreq_pipeline_init(mc_PIPELINE *pipeline)
702790
pipeline->index = 0;
703791
memset(&pipeline->ctxqueued, 0, sizeof pipeline->ctxqueued);
704792
pipeline->buf_done_callback = NULL;
793+
pipeline->collections = MCREQ_COLLECTIONS_UNKNOWN;
705794

706795
netbuf_default_settings(&settings);
707796

src/mc/mcreq.h

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -295,9 +295,20 @@ typedef enum {
295295
* The request has "replace" store semantics.
296296
* Utilized during error translation to map DOCUMENT_EXISTS to CAS_MISMATCH (see make_error() in handler.cc)
297297
*/
298-
MCREQ_F_REPLACE_SEMANTICS = 1u << 11u
298+
MCREQ_F_REPLACE_SEMANTICS = 1u << 11u,
299+
300+
/**
301+
* collection id is prepended to the key
302+
*/
303+
MCREQ_F_HASCID = 1u << 12u,
299304
} mcreq_flags;
300305

306+
typedef enum {
307+
MCREQ_COLLECTIONS_UNKNOWN = 0,
308+
MCREQ_COLLECTIONS_SUPPORTED = 1,
309+
MCREQ_COLLECTIONS_UNSUPPORTTED = 2,
310+
} mcreq_collections_support;
311+
301312
/** @brief mask of flags indicating user-allocated buffers */
302313
#define MCREQ_UBUF_FLAGS (MCREQ_F_KEY_NOCOPY | MCREQ_F_VALUE_NOCOPY)
303314
/** @brief mask of flags indicating response state of the packet */
@@ -425,6 +436,8 @@ typedef struct mc_pipeline_st {
425436
/** Allocator for packet structures */
426437
nb_MGR reqpool;
427438

439+
mcreq_collections_support collections;
440+
428441
/** Optional metrics structure for server */
429442
struct lcb_SERVERMETRICS_st *metrics;
430443
} mc_PIPELINE;
@@ -690,7 +703,7 @@ uint32_t mcreq_get_bodysize(const mc_PACKET *packet);
690703
*/
691704
uint32_t mcreq_get_size(const mc_PACKET *packet);
692705

693-
uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet);
706+
uint32_t mcreq_get_cid(lcb_INSTANCE *instance, const mc_PACKET *packet, int *cid_set);
694707

695708
void mcreq_set_cid(mc_PIPELINE *pipeline, mc_PACKET *packet, uint32_t cid);
696709

@@ -704,6 +717,8 @@ uint16_t mcreq_get_vbucket(const mc_PACKET *packet);
704717
/** Initializes a single pipeline object */
705718
int mcreq_pipeline_init(mc_PIPELINE *pipeline);
706719

720+
int mcreq_pipeline_support_collections(mc_PIPELINE *pipeline);
721+
707722
/** Cleans up any initialization from pipeline_init */
708723
void mcreq_pipeline_cleanup(mc_PIPELINE *pipeline);
709724

src/mcserver/mcserver.cc

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,20 @@ bool Server::handle_unknown_collection(MemcachedResponse &resp, mc_PACKET *oldpk
273273
return true;
274274
}
275275

276-
uint32_t cid = mcreq_get_cid(instance, oldpkt);
276+
int cid_set = 0;
277+
uint32_t cid = mcreq_get_cid(instance, oldpkt, &cid_set);
278+
if ((collections == MCREQ_COLLECTIONS_UNSUPPORTTED && cid_set) /* we need to strip collection and retry */
279+
|| (collections == MCREQ_COLLECTIONS_SUPPORTED && !cid_set) /* we need to prepend collection and retry */) {
280+
lcb_log(LOGARGS_T(WARN),
281+
LOGFMT
282+
"UNKNOWN_COLLECTION. Packet=%p (M=0x%x, S=%u, OP=0x%x), CID=%u, collections=%d (set=%d). Retrying",
283+
LOGID_T(), (void *)oldpkt, (int)req.request.magic, oldpkt->opaque, (int)req.request.opcode,
284+
(unsigned)cid, (int)collections, cid_set);
285+
mc_PACKET *newpkt = mcreq_renew_packet(oldpkt);
286+
newpkt->flags &= ~MCREQ_STATE_FLAGS;
287+
instance->retryq->ucadd((mc_EXPACKET *)newpkt, LCB_ERR_TIMEOUT, orig_status);
288+
return true;
289+
}
277290
std::string name = instance->collcache->id_to_name(cid);
278291

279292
packet_wrapper wrapper;
@@ -1082,14 +1095,17 @@ void Server::handle_connected(lcbio_SOCKET *sock, lcb_STATUS err, lcbio_OSERR sy
10821095
sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_CLUSTERMAP_CHANGE_NOTIFICATION_BRIEF);
10831096
config_with_known_version =
10841097
sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_GET_CLUSTER_CONFIG_WITH_KNOWN_VERSION);
1098+
collections = sessinfo->has_feature(PROTOCOL_BINARY_FEATURE_COLLECTIONS) ? MCREQ_COLLECTIONS_SUPPORTED
1099+
: MCREQ_COLLECTIONS_UNSUPPORTTED;
10851100
lcb_log(
10861101
LOGARGS_T(TRACE),
1087-
R"(<%s:%s> (SRV=%p) Got new KV connection (json=%s, snappy=%s, mt=%s, durability=%s, config_push=%s, config_ver=%s, bucket=%s "%s"%s%s))",
1088-
curhost->host, curhost->port, (void *)this, jsonsupport ? "yes" : "no", compsupport ? "yes" : "no",
1089-
mutation_tokens ? "yes" : "no", new_durability ? "yes" : "no",
1090-
clustermap_change_notification ? "yes" : "no", config_with_known_version ? "yes" : "no",
1091-
selected_bucket ? "yes" : "no", selected_bucket ? bucket.c_str() : "-",
1092-
try_to_select_bucket ? " selecting " : "", try_to_select_bucket ? settings->bucket : "");
1102+
R"(<%s:%s> (SRV=%p) Got new KV connection (collections=%s, json=%s, snappy=%s, mt=%s, durability=%s, config_push=%s, config_ver=%s, bucket=%s "%s"%s%s))",
1103+
curhost->host, curhost->port, (void *)this, collections == MCREQ_COLLECTIONS_SUPPORTED ? "yes" : "no",
1104+
jsonsupport ? "yes" : "no", compsupport ? "yes" : "no", mutation_tokens ? "yes" : "no",
1105+
new_durability ? "yes" : "no", clustermap_change_notification ? "yes" : "no",
1106+
config_with_known_version ? "yes" : "no", selected_bucket ? "yes" : "no",
1107+
selected_bucket ? bucket.c_str() : "-", try_to_select_bucket ? " selecting " : "",
1108+
try_to_select_bucket ? settings->bucket : "");
10931109
}
10941110

10951111
lcbio_CTXPROCS procs{};

src/retryq.cc

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -273,12 +273,13 @@ void RetryQueue::flush(bool throttle)
273273
fail(op, LCB_ERR_NO_MATCHING_SERVER, now);
274274
}
275275
} else {
276-
uint32_t cid = mcreq_get_cid(get_instance(), op->pkt);
276+
int cid_set = 0;
277+
uint32_t cid = mcreq_get_cid(get_instance(), op->pkt, &cid_set);
277278
lcb_log(LOGARGS(this, TRACE),
278-
"Flush PKT=%p to network. retries=%u, cid=%u, opaque=%u, IX=%d, spent=%" PRIu64
279+
"Flush PKT=%p to network. retries=%u, cid=%u (%s), opaque=%u, IX=%d, spent=%" PRIu64
279280
"us, deadline_in=%" PRIu64 "us",
280-
(void *)op->pkt, op->pkt->retries, cid, op->pkt->opaque, srvix, LCB_NS2US(now - op->start),
281-
LCB_NS2US(op->deadline - now));
281+
(void *)op->pkt, op->pkt->retries, cid, cid_set ? "set" : "unset", op->pkt->opaque, srvix,
282+
LCB_NS2US(now - op->start), LCB_NS2US(op->deadline - now));
282283
mc_PIPELINE *newpl = cq->pipelines[srvix];
283284
mcreq_enqueue_packet(newpl, op->pkt);
284285
newpl->flush_start(newpl);
@@ -422,12 +423,13 @@ void RetryQueue::add(mc_EXPACKET *pkt, const lcb_STATUS err, protocol_binary_res
422423
lcb_list_add_sorted(&schedops, static_cast<SchedNode *>(op), cmpfn_retry);
423424
lcb_list_add_sorted(&tmoops, static_cast<TmoNode *>(op), cmpfn_tmo);
424425

425-
uint32_t cid = mcreq_get_cid(get_instance(), &pkt->base);
426+
int cid_set = 0;
427+
uint32_t cid = mcreq_get_cid(get_instance(), op->pkt, &cid_set);
426428
lcb_log(LOGARGS(this, DEBUG),
427-
"Adding PKT=%p to retry queue. retries=%u, cid=%u, opaque=%u, now=%" PRIu64 "ms, spent=%" PRIu64
429+
"Adding PKT=%p to retry queue. retries=%u, cid=%u (%s), opaque=%u, now=%" PRIu64 "ms, spent=%" PRIu64
428430
"us, deadline_in=%" PRIu64 "us, status=0x%02x, rc=%s",
429-
(void *)pkt, pkt->base.retries, cid, pkt->base.opaque, LCB_NS2MS(now), LCB_NS2US(now - op->start),
430-
LCB_NS2US(op->deadline - now), status, lcb_strerror_short(err));
431+
(void *)pkt, pkt->base.retries, cid, cid_set ? "set" : "unset", pkt->base.opaque, LCB_NS2MS(now),
432+
LCB_NS2US(now - op->start), LCB_NS2US(op->deadline - now), status, lcb_strerror_short(err));
431433
schedule();
432434

433435
if (settings->metrics) {

0 commit comments

Comments
 (0)