Skip to content

Commit 3f23e9d

Browse files
committed
Cleanup race condition that could lead to missed messages.
1 parent 6e18d38 commit 3f23e9d

1 file changed

Lines changed: 69 additions & 7 deletions

File tree

src/gridftp_hdfs.c

Lines changed: 69 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ char err_msg[MSG_SIZE];
2828
int local_io_block_size = 0;
2929
int local_io_count = 0;
3030

31+
static globus_mutex_t g_hdfs_mutex;
32+
static pthread_t g_thread_id;
33+
static int g_thread_pipe_fd;
34+
3135
static void hdfs_trev(globus_gfs_event_info_t *, void *);
3236
inline void set_done(hdfs_handle_t *, globus_result_t);
3337
static int hdfs_activate(void);
@@ -140,25 +144,34 @@ gridftp_check_core()
140144
/*
141145
* Simple thread target - continuously drain
142146
*/
143-
static void
147+
static void *
144148
hdfs_forward_log(void *user_arg)
145149
{
146150
int *pipe_r = (int *)user_arg;
147151
FILE *fpipe = fdopen(*pipe_r, "r");
148152
if (!fpipe)
149153
{
150154
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Unable to reopen forwarding log descriptor at fd %d: (errno=%d, %s)\n", *pipe_r, errno, strerror(errno));
151-
return;
155+
return NULL;
152156
}
157+
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP, "Starting HDFS log forwarder; messages from HDFS are prefixed with 'HDFS: '\n");
153158
char line_buffer[1024];
159+
unsigned log_count = 0;
154160
while (fgets(line_buffer, 1024, fpipe))
155161
{
156162
if (!strncmp(line_buffer, "\tat ", 4)) {continue;}
157163
else if ((line_buffer[0] == '\0') || (line_buffer[0] == '\n')) {continue;}
158-
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, line_buffer);
164+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "HDFS: %s", line_buffer);
165+
log_count++;
159166
}
160167
fclose(fpipe);
161168
free(user_arg);
169+
if (log_count)
170+
{
171+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Stopping HDFS log forwarder; %lu messages forwarded.\n", log_count);
172+
}
173+
globus_mutex_unlock(&g_hdfs_mutex);
174+
return NULL;
162175
}
163176

164177
/*
@@ -168,17 +181,25 @@ hdfs_forward_log(void *user_arg)
168181
static void
169182
setup_hdfs_logging()
170183
{
184+
if (globus_mutex_trylock(&g_hdfs_mutex))
185+
{
186+
// The logging thread has already been initialized.
187+
return;
188+
}
189+
171190
char fd2_path[PATH_MAX];
172191
ssize_t bytes_in_path;
173192
if ((-1 == (bytes_in_path = readlink("/dev/fd/2", fd2_path, PATH_MAX-1))) && (errno != ENOENT) && (errno != EACCES))
174193
{
175194
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Unable to check /dev/fd/2 as eUID %d (UID %d) to see if it is /dev/null. (errno=%d, %s)\n", geteuid(), getuid(), errno, strerror(errno));
195+
globus_mutex_unlock(&g_hdfs_mutex);
176196
return;
177197
}
178198
if (bytes_in_path >= 0) {fd2_path[bytes_in_path] = '\0';}
179199
if ((bytes_in_path != -1) && strcmp("/dev/null", fd2_path))
180200
{
181201
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP, "stderr does not point to /dev/null; not redirecting HDFS output.\n");
202+
globus_mutex_unlock(&g_hdfs_mutex);
182203
return;
183204
}
184205

@@ -187,33 +208,38 @@ setup_hdfs_logging()
187208
if ((err = pthread_attr_init(&attr)))
188209
{
189210
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Unable to initialize pthread attribute: (errno=%d, %s).\n", err, sterror(err));
211+
globus_mutex_unlock(&g_hdfs_mutex);
190212
return;
191213
}
192-
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
193-
pthread_t thread_id;
194214

195215
int pipe_fds[2];
196216
if (-1 == pipe2(pipe_fds, O_CLOEXEC))
197217
{
198218
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Failed to open pipes for HDFS logging: (errno=%d, %s).\n", errno, strerror(errno));
219+
globus_mutex_unlock(&g_hdfs_mutex);
199220
return;
200221
}
201222
if (-1 == dup3(pipe_fds[1], 2, O_CLOEXEC))
202223
{
203224
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Failed to reopen stderr for HDFS logging: (errno=%d, %s).\n", errno, strerror(errno));
225+
globus_mutex_unlock(&g_hdfs_mutex);
204226
return;
205227
}
228+
close(pipe_fds[1]);
229+
g_thread_pipe_fd = 2;
206230

207231
int *pipe_ptr = malloc(sizeof(int));
208232
if (pipe_ptr == NULL)
209233
{
210234
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Failed to allocate pointer for pipe.\n");
235+
globus_mutex_unlock(&g_hdfs_mutex);
211236
return;
212237
}
213238
*pipe_ptr = pipe_fds[0];
214-
if ((err = pthread_create(&thread_id, &attr, hdfs_forward_log, pipe_ptr)))
239+
if ((err = pthread_create(&g_thread_id, &attr, hdfs_forward_log, pipe_ptr)))
215240
{
216241
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Failed to launch thread for monitoring HDFS logging: (errno=%d, %s).\n", errno, strerror(errno));
242+
globus_mutex_unlock(&g_hdfs_mutex);
217243
free(pipe_ptr);
218244
return;
219245
}
@@ -223,11 +249,18 @@ setup_hdfs_logging()
223249

224250
/*
225251
* Called when the HDFS module is activated.
226-
* Completely boilerplate.
252+
* Initializes the global mutex.
227253
*/
228254
int
229255
hdfs_activate(void)
230256
{
257+
if (globus_mutex_init(&g_hdfs_mutex, GLOBUS_NULL)) {
258+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Unable to initialize global mutex");
259+
return 1;
260+
}
261+
g_thread_id = -1;
262+
g_thread_pipe_fd = -1;
263+
231264
globus_extension_registry_add(
232265
GLOBUS_GFS_DSI_REGISTRY,
233266
"hdfs",
@@ -244,6 +277,23 @@ hdfs_activate(void)
244277
int
245278
hdfs_deactivate(void)
246279
{
280+
if (g_thread_id > 0)
281+
{
282+
if (g_thread_pipe_fd >= 0)
283+
{
284+
fflush(stderr);
285+
close(g_thread_pipe_fd);
286+
}
287+
void *retval;
288+
pthread_join(g_thread_id, &retval);
289+
g_thread_id = -1;
290+
g_thread_pipe_fd = -1;
291+
}
292+
293+
globus_mutex_destroy(&g_hdfs_mutex);
294+
g_thread_id = -1;
295+
g_thread_pipe_fd = -1;
296+
247297
globus_extension_registry_remove(
248298
GLOBUS_GFS_DSI_REGISTRY, "hdfs");
249299

@@ -744,5 +794,17 @@ set_close_done(
744794
if ((hdfs_handle->done_status == GLOBUS_SUCCESS) && (rc != GLOBUS_SUCCESS)) {
745795
hdfs_handle->done_status = rc;
746796
}
797+
if (g_thread_id > 0)
798+
{
799+
if (g_thread_pipe_fd >= 0)
800+
{
801+
fflush(stderr);
802+
close(g_thread_pipe_fd);
803+
}
804+
void *retval;
805+
pthread_join(g_thread_id, &retval);
806+
g_thread_id = -1;
807+
g_thread_pipe_fd = -1;
808+
}
747809
}
748810

0 commit comments

Comments
 (0)