Skip to content

Commit 6e18d38

Browse files
committed
Forward HDFS logging to GridFTP logging.
1 parent 24d6dff commit 6e18d38

1 file changed

Lines changed: 87 additions & 0 deletions

File tree

src/gridftp_hdfs.c

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,90 @@ gridftp_check_core()
137137
}
138138
}
139139

140+
/*
141+
* Simple thread target - continuously drain
142+
*/
143+
static void
144+
hdfs_forward_log(void *user_arg)
145+
{
146+
int *pipe_r = (int *)user_arg;
147+
FILE *fpipe = fdopen(*pipe_r, "r");
148+
if (!fpipe)
149+
{
150+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Unable to reopen forwarding log descriptor at fd %d: (errno=%d, %s)\n", *pipe_r, errno, strerror(errno));
151+
return;
152+
}
153+
char line_buffer[1024];
154+
while (fgets(line_buffer, 1024, fpipe))
155+
{
156+
if (!strncmp(line_buffer, "\tat ", 4)) {continue;}
157+
else if ((line_buffer[0] == '\0') || (line_buffer[0] == '\n')) {continue;}
158+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, line_buffer);
159+
}
160+
fclose(fpipe);
161+
free(user_arg);
162+
}
163+
164+
/*
165+
* Open stderr as a pipe which is continuously drained
166+
* via a separate thread (forwarding to the globus logging system).
167+
*/
168+
static void
169+
setup_hdfs_logging()
170+
{
171+
char fd2_path[PATH_MAX];
172+
ssize_t bytes_in_path;
173+
if ((-1 == (bytes_in_path = readlink("/dev/fd/2", fd2_path, PATH_MAX-1))) && (errno != ENOENT) && (errno != EACCES))
174+
{
175+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Unable to check /dev/fd/2 as eUID %d (UID %d) to see if it is /dev/null. (errno=%d, %s)\n", geteuid(), getuid(), errno, strerror(errno));
176+
return;
177+
}
178+
if (bytes_in_path >= 0) {fd2_path[bytes_in_path] = '\0';}
179+
if ((bytes_in_path != -1) && strcmp("/dev/null", fd2_path))
180+
{
181+
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP, "stderr does not point to /dev/null; not redirecting HDFS output.\n");
182+
return;
183+
}
184+
185+
int err;
186+
pthread_attr_t attr;
187+
if ((err = pthread_attr_init(&attr)))
188+
{
189+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Unable to initialize pthread attribute: (errno=%d, %s).\n", err, sterror(err));
190+
return;
191+
}
192+
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
193+
pthread_t thread_id;
194+
195+
int pipe_fds[2];
196+
if (-1 == pipe2(pipe_fds, O_CLOEXEC))
197+
{
198+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Failed to open pipes for HDFS logging: (errno=%d, %s).\n", errno, strerror(errno));
199+
return;
200+
}
201+
if (-1 == dup3(pipe_fds[1], 2, O_CLOEXEC))
202+
{
203+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Failed to reopen stderr for HDFS logging: (errno=%d, %s).\n", errno, strerror(errno));
204+
return;
205+
}
206+
207+
int *pipe_ptr = malloc(sizeof(int));
208+
if (pipe_ptr == NULL)
209+
{
210+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Failed to allocate pointer for pipe.\n");
211+
return;
212+
}
213+
*pipe_ptr = pipe_fds[0];
214+
if ((err = pthread_create(&thread_id, &attr, hdfs_forward_log, pipe_ptr)))
215+
{
216+
globus_gfs_log_message(GLOBUS_GFS_LOG_ERR, "Failed to launch thread for monitoring HDFS logging: (errno=%d, %s).\n", errno, strerror(errno));
217+
free(pipe_ptr);
218+
return;
219+
}
220+
221+
222+
}
223+
140224
/*
141225
* Called when the HDFS module is activated.
142226
* Completely boilerplate.
@@ -449,6 +533,9 @@ hdfs_start(
449533
snprintf(hdfs_handle->syslog_msg, 255, "%s %s %%s %%i %%i", hdfs_handle->local_host, hdfs_handle->remote_host);
450534
}
451535

536+
// Forward the contents of stderr to the globus logging system.
537+
setup_hdfs_logging();
538+
452539
// Determine the maximum number of buffers; default to 200.
453540
char * max_buffer_char = getenv("GRIDFTP_BUFFER_COUNT");
454541
if (max_buffer_char != NULL) {

0 commit comments

Comments
 (0)