Skip to content

Commit a3ed6a0

Browse files
committed
CCBC-1597: Update threading example, reduce global state
* Json::Reader uses static global variable, this patch replaces calls to it with Json::CharReaderBuilder, which is re-entrant. * Constants for tracing system defined as mutable static strings, this patch replaces it with const static strings. * Updated examples for thread-safe usage is updated to SDK3 API and added them to the build pipeline Change-Id: Id4341b9ece379f8ad8b8ba76051c67383310c324 Reviewed-on: https://review.couchbase.org/c/libcouchbase/+/190334 Reviewed-by: Brett Lawson <brett19@gmail.com> Tested-by: Build Bot <build@couchbase.com>
1 parent d2e3e76 commit a3ed6a0

50 files changed

Lines changed: 489 additions & 219 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

doc/example/threads.c

Lines changed: 0 additions & 93 deletions
This file was deleted.

doc/mainpage.h

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@
8484
*
8585
* @example example/minimal/query.c
8686
* Shows N1QL query API. Also because queries executed in a loop, the sample might be used as simple benchmark (more sofisticated shipped with cbc tools, as cbc-n1qlback)
87+
*
88+
* @example example/threads-shared/threads-shared.c
89+
* Shows how to protect single `lcb_INSTANCE` when it is shared between multiple threads.
90+
*
91+
* @example example/threads-private/threads-private.c
92+
* Shows how to bind `lcb_INSTANCE` to each thread, and how to use custom logger in the thread-safe way.
8793
*/
8894

8995
/**
@@ -119,18 +125,30 @@
119125
* @}
120126
*/
121127

122-
123128
/**
124129
* @page lcb_thrsafe Thread Safety
125130
*
126-
* The library uses no internal locking and is thus not safe to be used
127-
* concurrently from multiple threads. As the library contains no globals
128-
* you may call into the library from multiple threads so long as the same data
129-
* structure (specifically, the same `lcb_INSTANCE *`) is not used.
131+
* This library is not designed to be thread-safe. However, it should be safe to use one `lcb_INSTANCE` object per
132+
* thread, with some caveats and careful consideration.
133+
*
134+
* 1. You must be certain that the `lcb_INSTANCE` is not shared with other threads. For performance, there are no
135+
* internal locks or other thread safety mechanisms to protect internal data structures.
136+
*
137+
* 2. Also for performance reasons, the default logger is not thread-safe, and is not bound to a single `lcb_INSTANCE`.
138+
* Therefore, a multi-threaded application must also override the logger with a thread-safe version or use a separate
139+
* logger for each instance (see example @ref example/threads-private/threads-private.c).
140+
*
141+
* 3. Likewise, any other shared instances that are registered with the library (e.g., `lcb_io_opt_t`) must also be
142+
* protected in a similar manner. Such instances must either be protected and made thread-safe internally or a new
143+
* instance per `lcb_INSTANCE` can be provided.
144+
*
145+
* As with any multi-threaded application extra testing and analysis should be done using a tool like
146+
* <a href="https://valgrind.org/docs/manual/drd-manual.html">Valgrind/DRD</a>,
147+
* <a href="https://clang.llvm.org/docs/ThreadSanitizer.html">ThreadSanitizer</a> or similar.
130148
*
131-
* @include doc/example/threads.c
149+
* * @ref example/threads-shared/threads-shared.c - this example shows how to protect single `lcb_INSTANCE` when it is
150+
* shared between multiple threads.
132151
*
133-
* In this quick mockup example, the same `lcb_INSTANCE *` is being used from multiple
134-
* threads and thus requires locking. Now if each thread created its own `lcb_INSTANCE *`
135-
* it would be free to operate upon it without locking.
152+
* * @ref example/threads-private/threads-private.c - this example shows how to bind `lcb_INSTANCE` to each thread, and
153+
* how to use custom logger in the thread-safe way.
136154
*/

example/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,3 +103,8 @@ ENDIF()
103103
IF(HAVE_LIBUV AND LCB_BUILD_LIBUV)
104104
ADD_EXAMPLE(libuv-direct libuvdirect ${LIBUV_LIBRARIES} ${LIBUV_INCLUDE_DIR})
105105
ENDIF()
106+
107+
IF(NOT WIN32)
108+
ADD_EXAMPLE(threads-shared threads-shared pthread "")
109+
ADD_EXAMPLE(threads-private threads-private pthread "")
110+
ENDIF()
Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
/* -*- Mode: C; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2017-Present Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include <unistd.h>
19+
#include <pthread.h>
20+
#include <string.h>
21+
#include <sys/types.h>
22+
#ifdef __linux__
23+
#include <sys/syscall.h>
24+
#endif
25+
26+
#include <libcouchbase/couchbase.h>
27+
28+
static void get_callback(const lcb_INSTANCE *instance, int cbtype, const lcb_RESPGET *resp)
29+
{
30+
lcb_STATUS rc = lcb_respget_status(resp);
31+
const char *key;
32+
size_t nkey;
33+
lcb_respget_key(resp, &key, &nkey);
34+
fprintf(stderr, "GET \"%.*s\", %s\n", (int)nkey, key, lcb_strerror_short(rc));
35+
36+
(void)instance;
37+
(void)cbtype;
38+
}
39+
40+
typedef struct {
41+
lcb_LOGGER *base;
42+
lcb_LOG_SEVERITY min_level;
43+
char path[1024];
44+
FILE *file;
45+
} my_LOGGER;
46+
47+
static const char *level_str(lcb_LOG_SEVERITY severity)
48+
{
49+
switch (severity) {
50+
case LCB_LOG_TRACE:
51+
return "TRACE";
52+
case LCB_LOG_DEBUG:
53+
return "DEBUG";
54+
case LCB_LOG_INFO:
55+
return "INFO ";
56+
case LCB_LOG_WARN:
57+
return "WARN ";
58+
case LCB_LOG_ERROR:
59+
return "ERROR";
60+
case LCB_LOG_FATAL:
61+
return "FATAL";
62+
default:
63+
return "";
64+
}
65+
}
66+
static void log_callback(const lcb_LOGGER *logger, uint64_t iid, const char *subsys, lcb_LOG_SEVERITY severity,
67+
const char *srcfile, int srcline, const char *fmt, va_list ap)
68+
{
69+
my_LOGGER *wrapper = NULL;
70+
lcb_logger_cookie(logger, (void **)&wrapper);
71+
if (wrapper == NULL) {
72+
return;
73+
}
74+
if (severity < wrapper->min_level) {
75+
return;
76+
}
77+
78+
uint64_t tid = 0;
79+
#if defined(__APPLE__)
80+
pthread_threadid_np(NULL, &tid);
81+
#elif defined(__linux__)
82+
tid = syscall(SYS_gettid);
83+
#endif
84+
char buf[1024] = {0};
85+
int written = snprintf(buf, sizeof(buf), "%s [thread=0x%08llx, instance=0x%08llx] ", level_str(severity), tid, iid);
86+
vsnprintf(buf + written, sizeof(buf) - written, fmt, ap);
87+
88+
fprintf(wrapper->file, "%s\n", buf);
89+
(void)srcfile;
90+
(void)srcline;
91+
(void)subsys;
92+
}
93+
94+
/*
95+
* This function uses an instance per thread. Since no other thread is using the instance, locking is not required
96+
*/
97+
static void *thread_func_unlocked(void *arg)
98+
{
99+
lcb_INSTANCE *instance = (lcb_INSTANCE *)arg;
100+
101+
lcb_connect(instance);
102+
lcb_wait(instance, LCB_WAIT_DEFAULT);
103+
lcb_install_callback(instance, LCB_CALLBACK_GET, (lcb_RESPCALLBACK)get_callback);
104+
105+
const char *key = "key";
106+
lcb_CMDGET *cmd = NULL;
107+
lcb_cmdget_create(&cmd);
108+
lcb_cmdget_key(cmd, key, strlen(key));
109+
110+
lcb_STATUS rc = lcb_get(instance, NULL, cmd);
111+
lcb_cmdget_destroy(cmd);
112+
if (rc != LCB_SUCCESS) {
113+
fprintf(stderr, "Could not schedule GET \"%.*s\", %s\n", (int)strlen(key), key, lcb_strerror_short(rc));
114+
} else {
115+
lcb_wait(instance, LCB_WAIT_DEFAULT);
116+
}
117+
return NULL;
118+
}
119+
120+
/**
121+
*
122+
* This example demonstrates strategy, where every thread has associated lcb_INSTANCE, which is never shared
123+
*
124+
* Key observations here:
125+
*
126+
* 1. more resources will be consumed by the library (memory, descriptors, etc)
127+
* 2. the application will be able to use network more efficiently
128+
* 3. the application must supply thread-safe version of the logger, or create new logger object for each of the
129+
* lcb_INSTANCE.
130+
*
131+
* As with any multi-threaded application, it requires extra testing and analysis to meet all performance and
132+
* correctness requirements.
133+
*
134+
* See threads-shared.c for the alternative approach.
135+
*
136+
*/
137+
int main(int argc, const char *argv[])
138+
{
139+
#define number_of_threads 10
140+
const char *connection_string = (argc > 1) ? argv[1] : "couchbase://127.0.0.1/default";
141+
const char *username = (argc > 2) ? argv[2] : "Administrator";
142+
const char *password = (argc > 3) ? argv[3] : "password";
143+
144+
pthread_t thrs[number_of_threads];
145+
146+
// multiple threads with independent instances
147+
lcb_INSTANCE *instances[number_of_threads];
148+
my_LOGGER loggers[number_of_threads];
149+
150+
for (int ii = 0; ii < number_of_threads; ii++) {
151+
lcb_CREATEOPTS *options = NULL;
152+
lcb_createopts_create(&options, LCB_TYPE_BUCKET);
153+
lcb_createopts_connstr(options, connection_string, strlen(connection_string));
154+
lcb_createopts_credentials(options, username, strlen(username), password, strlen(password));
155+
156+
// let each thread write logs to separate file
157+
loggers[ii].min_level = LCB_LOG_TRACE;
158+
snprintf(loggers[ii].path, sizeof(loggers[ii].path), "/tmp/lcb-%03d.log", ii);
159+
loggers[ii].file = fopen(loggers[ii].path, "a+");
160+
lcb_logger_create(&loggers[ii].base, &loggers[ii]);
161+
lcb_logger_callback(loggers[ii].base, log_callback);
162+
lcb_createopts_logger(options, loggers[ii].base);
163+
164+
lcb_create(&instances[ii], options);
165+
}
166+
167+
for (int ii = 0; ii < number_of_threads; ii++) {
168+
pthread_create(&thrs[ii], NULL, thread_func_unlocked, instances[ii]);
169+
}
170+
171+
for (int ii = 0; ii < number_of_threads; ii++) {
172+
void *ign;
173+
pthread_join(thrs[ii], &ign);
174+
lcb_destroy(instances[ii]);
175+
fclose(loggers[ii].file);
176+
}
177+
178+
return 0;
179+
}

0 commit comments

Comments
 (0)