Skip to content

Commit 3e21d89

Browse files
committed
CCBC-1610: fix memory management for collection id prefix
* fix deallocation of standalone buffers when wiping packets * use managed buffer when setting collection id prefix for the command Change-Id: Idb99f6a972f89be0d2254b767e35e7f8d695938c Reviewed-on: https://review.couchbase.org/c/libcouchbase/+/196405 Reviewed-by: Brett Lawson <brett19@gmail.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 6e411d4 commit 3e21d89

2 files changed

Lines changed: 56 additions & 39 deletions

File tree

src/mc/mcreq.c

Lines changed: 54 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
#include "sllist-inl.h"
2121
#include "internal.h"
2222

23+
#define LOGARGS(pipeline, lvl) \
24+
((lcb_INSTANCE *)((pipeline)->parent->cqdata))->settings, "mcreq", LCB_LOG_##lvl, __FILE__, __LINE__
25+
2326
#define PKT_HDRSIZE(pkt) (MCREQ_PKT_BASESIZE + (pkt)->extlen)
2427

2528
lcb_STATUS mcreq_reserve_header(mc_PIPELINE *pipeline, mc_PACKET *packet, uint8_t hdrsize)
@@ -304,19 +307,19 @@ static void check_collection_id(mc_PIPELINE *pipeline, mc_PACKET *packet)
304307
for (int i = 0; i < new_key_length; ++i) {
305308
key[i] = key[i + collection_id_length];
306309
}
307-
} else {
308-
fprintf(
309-
stderr,
310-
"custom collection id has been dispatched to the node, that does not support collections\n");
311-
// TODO log error
310+
} else if (pipeline->parent->cqdata) {
311+
lcb_log(LOGARGS(pipeline, DEBUG),
312+
"Custom collection id has been dispatched to the node, that does not support collections");
312313
}
313314
}
314315
break;
315316

316317
default:
317318
// the pipeline hadn't completed handshake yet, so trust global settings, and let operation be fixed
318319
// when it will be retried in case of misprediction.
319-
fprintf(stderr, "collections has not been negotiated for the pipeline yet\n");
320+
if (pipeline->parent->cqdata) {
321+
lcb_log(LOGARGS(pipeline, DEBUG), "Collections has not been negotiated for the pipeline yet");
322+
}
320323
break;
321324
}
322325
}
@@ -354,7 +357,7 @@ void mcreq_enqueue_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
354357
void mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
355358
{
356359
if (!(packet->flags & MCREQ_F_KEY_NOCOPY)) {
357-
if (packet->flags & MCREQ_F_DETACHED) {
360+
if ((packet->flags & MCREQ_F_DETACHED) || IS_STANDALONE_SPAN(&packet->kh_span)) {
358361
free(SPAN_BUFFER(&packet->kh_span));
359362
} else {
360363
netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
@@ -373,7 +376,7 @@ void mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
373376
return;
374377
}
375378

376-
if (packet->flags & MCREQ_F_DETACHED) {
379+
if ((packet->flags & MCREQ_F_DETACHED) || IS_STANDALONE_SPAN(&packet->kh_span)) {
377380
free(SPAN_BUFFER(&packet->u_value.single));
378381
} else {
379382
netbuf_mblock_release(&pipeline->nbmgr, &packet->u_value.single);
@@ -628,51 +631,63 @@ lcb_STATUS mcreq_basic_packet(mc_CMDQUEUE *queue, const lcb_KEYBUF *key, uint32_
628631

629632
void mcreq_set_cid(mc_PIPELINE *pipeline, mc_PACKET *packet, uint32_t cid)
630633
{
631-
uint8_t ffext = 0;
632-
uint16_t nk = 0;
633-
uint8_t nbuf = 0;
634-
uint8_t buf[5] = {0};
635-
uint32_t old;
636-
int nold;
634+
nb_SPAN old_span = packet->kh_span;
635+
636+
// extract header
637+
char *header_and_key = SPAN_BUFFER(&old_span);
637638
protocol_binary_request_header req;
638-
char *kh = SPAN_BUFFER(&packet->kh_span);
639-
char *k = NULL;
639+
memcpy(&req, header_and_key, sizeof(req));
640640

641-
memcpy(&req, kh, sizeof(req));
641+
// extract key
642+
uint16_t key_length = 0;
643+
uint8_t flexible_extras_length = 0;
642644
if (req.request.magic == PROTOCOL_BINARY_AREQ) {
643-
ffext = req.request.keylen & 0xff;
644-
nk = req.request.keylen >> 8;
645+
flexible_extras_length = req.request.keylen & 0xff;
646+
key_length = req.request.keylen >> 8;
645647
} else {
646-
nk = ntohs(req.request.keylen);
648+
key_length = ntohs(req.request.keylen);
647649
}
648-
size_t nhdr = sizeof(req) + req.request.extlen + ffext;
649-
k = kh + nhdr;
650-
nold = leb128_decode((uint8_t *)k, nk, &old);
651-
nbuf = leb128_encode(cid, buf);
650+
size_t header_size = sizeof(req) + req.request.extlen + flexible_extras_length;
651+
char *key = header_and_key + header_size;
652+
653+
// parse old collection id and determine its length
654+
uint32_t old_collection_id;
655+
int old_collection_id_length = leb128_decode((uint8_t *)key, key_length, &old_collection_id);
652656

653-
int diff = (int)nbuf - (int)nold;
654-
size_t new_size = packet->kh_span.size + diff;
657+
// encode new collection id
658+
uint8_t collection_id[5] = {0};
659+
int collection_id_length = leb128_encode(cid, collection_id);
660+
661+
// fix field lengths in the packet
662+
int diff = collection_id_length - old_collection_id_length;
663+
size_t new_header_and_key_size = old_span.size + diff;
655664
req.request.bodylen = htonl(ntohl(req.request.bodylen) + diff);
656-
size_t new_klen = nk + diff;
665+
size_t new_klen = key_length + diff;
657666
if (req.request.magic == PROTOCOL_BINARY_AREQ) {
658-
req.request.keylen = (new_klen << 8) | (ffext & 0xff);
667+
req.request.keylen = (new_klen << 8) | (flexible_extras_length & 0xff);
659668
} else {
660669
req.request.keylen = htons(new_klen);
661670
}
662-
char *kdata = (char *)calloc(new_size, sizeof(char));
663-
char *ptr = kh;
664-
memcpy(kdata, ptr, nhdr);
665-
memcpy(kdata, req.bytes, sizeof(req.bytes));
666-
ptr += nhdr + nold;
667-
memcpy(kdata + nhdr, buf, nbuf);
668-
memcpy(kdata + nhdr + nbuf, ptr, new_size - nbuf - nhdr);
669-
if (packet->kh_span.offset == NETBUF_INVALID_OFFSET) {
671+
672+
// copy old header fields, with only collection id updated
673+
netbuf_mblock_reserve(&pipeline->nbmgr, &packet->kh_span);
674+
char *new_header_and_key = SPAN_BUFFER(&packet->kh_span);
675+
const char *ptr = header_and_key;
676+
memcpy(new_header_and_key, ptr, header_size);
677+
memcpy(new_header_and_key, req.bytes, sizeof(req.bytes));
678+
ptr += header_size + old_collection_id_length;
679+
memcpy(new_header_and_key + header_size, collection_id, collection_id_length);
680+
memcpy(new_header_and_key + header_size + collection_id_length, ptr,
681+
new_header_and_key_size - collection_id_length - header_size);
682+
683+
// deallocate the old span
684+
if (IS_STANDALONE_SPAN(&old_span)) {
670685
/* standalone buffer */
671-
free(SPAN_BUFFER(&packet->kh_span));
686+
free(SPAN_BUFFER(&old_span));
672687
} else {
673-
netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
688+
netbuf_mblock_release(&pipeline->nbmgr, &old_span);
674689
}
675-
CREATE_STANDALONE_SPAN(&packet->kh_span, kdata, new_size);
690+
676691
packet->flags |= MCREQ_F_HASCID;
677692
}
678693

src/netbuf/netbuf.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ typedef struct {
8888
(span)->offset = NETBUF_INVALID_OFFSET; \
8989
(span)->size = len;
9090

91+
#define IS_STANDALONE_SPAN(span) (span)->offset == NETBUF_INVALID_OFFSET
92+
9193
/** @private */
9294
typedef struct {
9395
sllist_node slnode;

0 commit comments

Comments
 (0)