2929#include <lcbio/timer-cxx.h>
3030#include <lcbio/ssl.h>
3131#include "ctx-log-inl.h"
32+ #include "mc/compress.h"
3233
33- #include <cstring >
34+ #include <stdio.h >
3435
3536#define LOGFMT CTX_LOGFMT
3637#define LOGID (p ) CTX_LOGID(p->ioctx)
@@ -51,13 +52,14 @@ struct CccpProvider : public Provider {
5152 * because we have received a successful response.
5253 */
5354 void stop_current_request (bool is_clean );
54- lcb_STATUS schedule_next_request (lcb_STATUS err , bool can_rollover );
55+ lcb_STATUS schedule_next_request (lcb_STATUS err , bool can_rollover , bool skip_if_push_supported );
56+ lcb_STATUS expect_config_with_version (const lcb_host_t * origin , config_version version );
5557 lcb_STATUS mcio_error (lcb_STATUS err );
5658 void on_timeout ()
5759 {
5860 mcio_error (LCB_ERR_TIMEOUT );
5961 }
60- lcb_STATUS update (const char * host , const char * data );
62+ lcb_STATUS update (const char * host , const std :: string & config_json );
6163 void request_config ();
6264 void on_io_read ();
6365
@@ -96,6 +98,14 @@ struct CccpProvider : public Provider {
9698 lcb ::io ::ConnectionRequest * creq {};
9799 lcbio_CTX * ioctx ;
98100 CccpCookie * cmdcookie ;
101+ /*
102+ * The version, that the library has seen in the notifications, but hasn't fetched yet.
103+ *
104+ * For example, if KV engine sends clustermap notification while this config provider is waiting for response, the
105+ * version number from the notification will be recorded here, and once the current operation will be completed, the
106+ * provider will make new request if the expected version is newer than the current one.
107+ */
108+ config_version expected_config_version {-1 , -1 };
99109};
100110
101111struct CccpCookie {
@@ -149,17 +159,104 @@ void CccpProvider::stop_current_request(bool is_clean)
149159 }
150160}
151161
152- lcb_STATUS CccpProvider ::schedule_next_request (lcb_STATUS err , bool can_rollover )
162+ /**
163+ * Tell the provider that the newer configuration might be existing on the server side.
164+ *
165+ * If there is no request in-flight, the library will do PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG (0xb5) immediately,
166+ * otherwise update internal state with the version number and return;
167+ *
168+ * @param origin the hostname of the connection that was notification about configuration update
169+ * @param requested the version of the configuration from the KV notification
170+ * @return
171+ */
172+ lcb_STATUS CccpProvider ::expect_config_with_version (const lcb_host_t * origin , config_version requested )
173+ {
174+
175+ auto current = parent -> get_current_version ();
176+ auto previous_expected = expected_config_version ;
177+ if (current < requested && previous_expected < requested ) {
178+ /*
179+ * The requested version is newer than we've seen so far (both current and previously requested)
180+ */
181+ expected_config_version = requested ;
182+ if (has_pending_request ()) {
183+ /*
184+ * Only one configuration request could be in-flight, but the version is stored in expected_config_version
185+ * already and will be checked once current request completes.
186+ */
187+ lcb_log (LOGARGS (this , DEBUG ),
188+ "Configuration request is in flight " LCB_HOST_FMT ": expected=%" PRId64 ":%" PRId64
189+ ", current=%" PRId64 ":%" PRId64 ", requested=%" PRId64 ":%" PRId64 ,
190+ LCB_HOST_ARG (parent -> settings , origin ), previous_expected .epoch , previous_expected .revision ,
191+ current .epoch , current .revision , requested .epoch , requested .revision );
192+ return LCB_SUCCESS ;
193+ }
194+ /*
195+ * Request new configuration immediately
196+ */
197+ return schedule_next_request (LCB_SUCCESS , /* can_rollover */ true, /* skip_if_push_supported */ false);
198+ } else {
199+ /*
200+ * The config provider already seen this revision and probably already applied it and using as the current.
201+ */
202+ lcb_log (LOGARGS (this , TRACE ),
203+ "Ignore configuration request " LCB_HOST_FMT ": expected=%" PRId64 ":%" PRId64 ", current=%" PRId64
204+ ":%" PRId64 ", requested=%" PRId64 ":%" PRId64 ,
205+ LCB_HOST_ARG (parent -> settings , origin ), previous_expected .epoch , previous_expected .revision ,
206+ current .epoch , current .revision , requested .epoch , requested .revision );
207+ }
208+ return LCB_SUCCESS ;
209+ }
210+
211+ lcb_STATUS CccpProvider ::schedule_next_request (lcb_STATUS err , bool can_rollover , bool skip_if_push_supported )
153212{
154- lcb_host_t * next_host = nodes -> next (can_rollover );
213+ if (nodes -> empty ()) {
214+ timer .cancel ();
215+ parent -> provider_failed (this , err );
216+ return err ;
217+ }
218+
219+ if (skip_if_push_supported && nodes -> all_hosts_support_config_push ()) {
220+ /* all nodes support configuration push, and this function invoked from periodic poller, so nothing has to be
221+ * done here */
222+ parent -> stop ();
223+ return LCB_SUCCESS ;
224+ }
225+
226+ lcb ::Server * server {nullptr };
227+ lcb_host_t * next_host = nodes -> next (can_rollover , skip_if_push_supported );
155228 if (!next_host ) {
156229 timer .cancel ();
157230 parent -> provider_failed (this , err );
158231 return err ;
159232 }
160233
161- lcb ::Server * server = instance -> find_server (* next_host );
162- if (server ) {
234+ do {
235+ /* try to find connected socket that corresponds the hostname */
236+ server = instance -> find_server (* next_host );
237+ if (server != nullptr && server -> supports_config_push ()) {
238+ /* mark the address in the list, that it supports configuration push, so that it could be skipped later */
239+ next_host -> supports_config_push = true;
240+ if (skip_if_push_supported ) {
241+ next_host = nodes -> next (can_rollover , skip_if_push_supported );
242+ } else {
243+ break ;
244+ }
245+ } else {
246+ break ;
247+ }
248+ } while (next_host != nullptr );
249+
250+ /* there is no connected sockets */
251+ if (server != nullptr ) {
252+ if (skip_if_push_supported && server -> supports_config_push ()) {
253+ /* we found the server, but at the same time all sockets support push, so we can stop polling and just
254+ * expect notifications from KV engine */
255+ lcb_log (LOGARGS (this , DEBUG ), "Stop background polling, as all nodes support configuration push" );
256+ parent -> stop ();
257+ return LCB_SUCCESS ;
258+ }
259+
163260 cmdcookie = new CccpCookie (this );
164261 lcb_log (LOGARGS (this , TRACE ), "Re-Issuing CCCP Command on server struct %p (" LCB_HOST_FMT ")" , (void * )server ,
165262 LCB_HOST_ARG (this -> parent -> settings , next_host ));
@@ -169,10 +266,10 @@ lcb_STATUS CccpProvider::schedule_next_request(lcb_STATUS err, bool can_rollover
169266 instance -> select_bucket (cmdcookie , server );
170267 }
171268 cmdcookie -> incref ();
172- instance -> request_config (cmdcookie , server );
269+ instance -> request_config (cmdcookie , server , parent -> get_current_version () );
173270
174271 } else {
175-
272+ /* initiate new connection */
176273 lcb_log (LOGARGS (this , INFO ), "Requesting connection to node " LCB_HOST_FMT " for CCCP configuration" ,
177274 LCB_HOST_ARG (this -> parent -> settings , next_host ));
178275 creq = instance -> memd_sockpool -> get (* next_host , settings ().config_node_timeout , on_connected , this );
@@ -194,18 +291,28 @@ lcb_STATUS CccpProvider::mcio_error(lcb_STATUS err)
194291 parent -> provider_failed (this , err );
195292 return err ;
196293 } else {
197- return schedule_next_request (err , false);
294+ return schedule_next_request (err , /* can_rollover */ false, /* skip_if_push_supported */ false);
198295 }
199296}
200297
201298/** Update the configuration from a server. */
202- lcb_STATUS lcb ::clconfig ::cccp_update (Provider * provider , const char * host , const char * data )
299+ lcb_STATUS lcb ::clconfig ::cccp_update (Provider * provider , const char * host , const std ::string & config_json )
300+ {
301+ return static_cast < CccpProvider * > (provider )-> update (host , config_json );
302+ }
303+
304+ lcb_STATUS lcb ::clconfig ::schedule_get_config (Provider * provider , const lcb_host_t * origin , config_version version )
203305{
204- return static_cast < CccpProvider * > (provider )-> update ( host , data );
306+ return static_cast < CccpProvider * > (provider )-> expect_config_with_version ( origin , version );
205307}
206308
207- lcb_STATUS CccpProvider ::update (const char * host , const char * data )
309+ lcb_STATUS CccpProvider ::update (const char * host , const std :: string & config_json )
208310{
311+ if (config_json .empty ()) {
312+ // ignore empty payloads, in case of brief mode
313+ parent -> stop ();
314+ return LCB_SUCCESS ;
315+ }
209316 lcbvb_CONFIG * vbc ;
210317 int rv ;
211318 ConfigInfo * new_config ;
@@ -214,11 +321,11 @@ lcb_STATUS CccpProvider::update(const char *host, const char *data)
214321 if (!vbc ) {
215322 return LCB_ERR_NO_MEMORY ;
216323 }
217- rv = lcbvb_load_json_ex (vbc , data , host , & LCBT_SETTING (this -> parent , network ));
324+ rv = lcbvb_load_json_ex (vbc , config_json . c_str () , host , & LCBT_SETTING (this -> parent , network ));
218325
219326 if (rv ) {
220327 lcb_log (LOGARGS (this , ERROR ), LOGFMT "Failed to parse config" , LOGID (this ));
221- lcb_log_badconfig (LOGARGS (this , ERROR ), vbc , data );
328+ lcb_log_badconfig (LOGARGS (this , ERROR ), vbc , config_json . c_str () );
222329 lcbvb_destroy (vbc );
223330 return LCB_ERR_PROTOCOL_ERROR ;
224331 }
@@ -238,6 +345,10 @@ lcb_STATUS CccpProvider::update(const char *host, const char *data)
238345 /** TODO: Figure out the comparison vector */
239346 config = new_config ;
240347 parent -> provider_got_config (this , new_config );
348+
349+ if (parent -> get_current_version () < expected_config_version ) {
350+ return schedule_next_request (LCB_SUCCESS , /* can_rollover */ true, /* skip_if_push_supported */ false);
351+ }
241352 return LCB_SUCCESS ;
242353}
243354
@@ -248,8 +359,8 @@ void lcb::clconfig::select_status(const void *cookie_, lcb_STATUS err)
248359 cookie -> decref ();
249360}
250361
251- void lcb ::clconfig ::cccp_update (const void * cookie_ , lcb_STATUS err , const void * bytes , size_t nbytes ,
252- const lcb_host_t * origin )
362+ void lcb ::clconfig ::cccp_update (const void * cookie_ , lcb_STATUS err , const lcb_host_t * origin ,
363+ const std :: string & config_json )
253364{
254365 auto * cookie = reinterpret_cast < CccpCookie * > (const_cast < void * > (cookie_ ));
255366 CccpProvider * cccp = cookie -> parent ;
@@ -269,8 +380,7 @@ void lcb::clconfig::cccp_update(const void *cookie_, lcb_STATUS err, const void
269380 }
270381
271382 if (err == LCB_SUCCESS ) {
272- std ::string ss (reinterpret_cast < const char * > (bytes ), nbytes );
273- err = cccp -> update (origin -> host , ss .c_str ());
383+ err = cccp -> update (origin -> host , config_json );
274384 }
275385
276386 if (err != LCB_SUCCESS && was_active ) {
@@ -312,7 +422,7 @@ lcb_STATUS CccpProvider::refresh()
312422 return LCB_ERR_BUSY ;
313423 }
314424
315- return schedule_next_request (LCB_SUCCESS , true);
425+ return schedule_next_request (LCB_SUCCESS , /* can_rollover */ true, /* skip_if_push_supported */ true);
316426}
317427
318428bool CccpProvider ::pause ()
@@ -339,6 +449,9 @@ CccpProvider::~CccpProvider()
339449
340450void CccpProvider ::configure_nodes (const lcb ::Hostlist & nodes_ )
341451{
452+ /* note, that provider assumes that none of the nodes supports configuration push. It will be checked later, when
453+ * the address will be selected to fetch configuration. It allows to handle downgrade scenario, when newer server
454+ * version replaced with older, that do not support configuration push */
342455 nodes -> assign (nodes_ );
343456 if (parent -> settings -> randomize_bootstrap_nodes ) {
344457 nodes -> randomize ();
@@ -422,7 +535,7 @@ void CccpProvider::on_io_read()
422535 return_error (LCB_ERR_PROTOCOL_ERROR );
423536 }
424537
425- std :: string jsonstr ( resp . value (), resp .vallen () );
538+ auto jsonstr = resp .inflated_value ( );
426539 std ::string hoststr (lcbio_get_host (lcbio_ctx_sock (ioctx ))-> host );
427540
428541 resp .release (ioctx );
@@ -433,14 +546,17 @@ void CccpProvider::on_io_read()
433546 if (err == LCB_SUCCESS ) {
434547 timer .cancel ();
435548 } else {
436- schedule_next_request (LCB_ERR_PROTOCOL_ERROR , false);
549+ schedule_next_request (LCB_ERR_PROTOCOL_ERROR , /* can_rollover */ false, /* skip_if_push_supported */ false);
437550 }
438551
439552#undef return_error
440553}
441554
442555void CccpProvider ::request_config ()
443556{
557+ lcb_log (LOGARGS (this , TRACE ), "Attempting to retrieve cluster map via CCCP (timeout=%uus)" ,
558+ settings ().config_node_timeout );
559+
444560 lcb ::MemcachedRequest req (PROTOCOL_BINARY_CMD_GET_CLUSTER_CONFIG );
445561 req .opaque (0xF00D );
446562 lcbio_ctx_put (ioctx , req .data (), req .size ());
0 commit comments