Skip to content

Commit 198b548

Browse files
committed
Add support for resizable buffers and on-demand checksumming.
1 parent c17b23c commit 198b548

1 file changed

Lines changed: 112 additions & 21 deletions

File tree

src/gridftp_hdfs_cksm.c

Lines changed: 112 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
#define CVMFS_CHUNK_SIZE (24*1024*1024)
1616

1717
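// Output buffers start at OUTPUT_BUFFER_STARTING_SIZE and are grown on demand by concatenate(); OUTPUT_BUFFER_SIZE bounds a single hdfsRead() when loading a checksum file.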
18-
#define OUTPUT_BUFFER_SIZE (128*1024)
18+
#define OUTPUT_BUFFER_STARTING_SIZE (4*1024)
19+
#define OUTPUT_BUFFER_SIZE (256*1024)
1920

2021
// CRC table taken from POSIX description of algorithm.
2122
static uint32_t const crctab[256] =
@@ -74,6 +75,59 @@ static uint32_t const crctab[256] =
7475
0xa2f33668, 0xbcb4666d, 0xb8757bda, 0xb5365d03, 0xb1f740b4
7576
};
7677

78+
/*
79+
* Helper function to concatenate formatted strings onto a buffer.
80+
*
81+
* - `buffer` is the current base of the memory buffer as returned by malloc;
82+
* this memory may be realloc'd to grow the total string space.
83+
* - `offset` is the offset into the buffer where writing should occur.
84+
* - `length` is the total length of the memory buffer.
85+
*
86+
* If buffer, offset, or length are NULL, it is considered an error.
87+
*
88+
* On error, buffer is free'd and NULL is returned.
89+
*
90+
* On success, the current buffer location is returned and offset / length are updated
91+
* appropriately.
92+
*/
93+
static
94+
char *concatenate(char *buffer, globus_off_t *offset, globus_size_t *length, const char *format, ...) {
95+
if (buffer == NULL) {
96+
return NULL;
97+
}
98+
if (length == 0) {
99+
return buffer;
100+
}
101+
102+
globus_size_t remaining_length = *length - *offset;
103+
char *cur = buffer + *offset;
104+
105+
va_list args;
106+
va_start(args, format);
107+
int rc = vsnprintf(cur, remaining_length, format, args);
108+
if (rc < 0) {
109+
free(buffer);
110+
return NULL;
111+
} else if (rc >= remaining_length) {
112+
// Resize the buffer; add in an extra KB to avoid having to resize often.
113+
*length = *length - remaining_length + rc + 1 + 1024;
114+
remaining_length = *length - *offset;
115+
cur = realloc(buffer, *length);
116+
if (cur == NULL) {
117+
free(buffer);
118+
va_end(args);
119+
return NULL;
120+
}
121+
buffer = cur;
122+
cur = buffer + *offset;
123+
rc = vsnprintf(cur, remaining_length, format, args);
124+
assert((rc >= 0) && (rc < remaining_length));
125+
}
126+
va_end(args);
127+
*offset += rc;
128+
return buffer;
129+
}
130+
77131
/*
78132
* Taken from globus_gridftp_server_file.c
79133
* Assume md5_human is length digest_length*2+1
@@ -126,21 +180,23 @@ static void emit_cvmfs_chunk(hdfs_handle_t *hdfs_handle) {
126180
}
127181

128182
/*
 * Build the CVMFS graft description for the handle and store it in
 * hdfs_handle->cvmfs_graft.
 *
 * The resulting string has the shape
 *   size=<offset>;checksum=<file sha1>;chunk_offsets=0[,<offset>...];chunk_checksums=<sha1>[,<sha1>...]
 * When fewer than two chunks were recorded, the whole-file SHA1 doubles as
 * the single chunk checksum.
 *
 * The string lives in a malloc'd buffer that concatenate() grows on demand.
 * If the initial allocation or any concatenate() step fails, cvmfs_graft
 * ends up NULL.
 */
static void emit_cvmfs_graft(hdfs_handle_t *hdfs_handle) {
    globus_size_t capacity = OUTPUT_BUFFER_STARTING_SIZE;
    globus_off_t written = 0;
    char *graft = malloc(capacity);

    if (graft == NULL) {
        hdfs_handle->cvmfs_graft = NULL;
        return;
    }

    graft = concatenate(graft, &written, &capacity, "size=%lld;checksum=%s", hdfs_handle->offset, hdfs_handle->file_sha1_human);

    // Single-chunk (or chunk-less) files: one offset, one checksum.
    if (hdfs_handle->chunk_count < 2) {
        graft = concatenate(graft, &written, &capacity, ";chunk_offsets=0;chunk_checksums=%s", hdfs_handle->file_sha1_human);
        hdfs_handle->cvmfs_graft = graft;
        return;
    }

    // Multi-chunk files: comma-separated offsets first, then the matching
    // comma-separated checksums. concatenate() passes a NULL buffer through,
    // so a failure in any step simply propagates to the end.
    graft = concatenate(graft, &written, &capacity, ";chunk_offsets=0");
    for (unsigned int chunk = 1; chunk < hdfs_handle->chunk_count; chunk++) {
        graft = concatenate(graft, &written, &capacity, ",%lld", hdfs_handle->chunk_offsets[chunk]);
    }

    graft = concatenate(graft, &written, &capacity, ";chunk_checksums=%s", hdfs_handle->chunk_sha1_human[0]);
    for (unsigned int chunk = 1; chunk < hdfs_handle->chunk_count; chunk++) {
        graft = concatenate(graft, &written, &capacity, ",%s", hdfs_handle->chunk_sha1_human[chunk]);
    }

    hdfs_handle->cvmfs_graft = graft;
}
@@ -296,27 +352,29 @@ globus_result_t hdfs_save_checksum(hdfs_handle_t *hdfs_handle) {
296352
return rc;
297353
}
298354

299-
char *buffer = malloc(OUTPUT_BUFFER_SIZE);
300-
unsigned int length = 0;
355+
globus_size_t size = OUTPUT_BUFFER_STARTING_SIZE;
356+
globus_off_t offset = 0;
357+
char *buffer = malloc(size);
301358
if (hdfs_handle->cksm_types & HDFS_CKSM_TYPE_CKSUM) {
302-
length += snprintf(buffer, OUTPUT_BUFFER_SIZE, "CKSUM:%u\n", hdfs_handle->cksum);
359+
buffer = concatenate(buffer, &offset, &size, "CKSUM:%u\n", hdfs_handle->cksum);
303360
}
304361
if (hdfs_handle->cksm_types & HDFS_CKSM_TYPE_CRC32) {
305-
length += snprintf(buffer+length, OUTPUT_BUFFER_SIZE-length, "CRC32:%u\n", hdfs_handle->crc32);
362+
buffer = concatenate(buffer, &offset, &size, "CRC32:%u\n", hdfs_handle->crc32);
306363
}
307364
if (hdfs_handle->cksm_types & HDFS_CKSM_TYPE_ADLER32) {
308-
length += snprintf(buffer+length, OUTPUT_BUFFER_SIZE-length, "ADLER32:%s\n", hdfs_handle->adler32_human);
365+
buffer = concatenate(buffer, &offset, &size, "ADLER32:%s\n", hdfs_handle->adler32_human);
309366
}
310367
if (hdfs_handle->cksm_types & HDFS_CKSM_TYPE_MD5) {
311368
hdfs_handle->md5_output_human[MD5_DIGEST_LENGTH*2] = '\0';
312-
length += snprintf(buffer+length, OUTPUT_BUFFER_SIZE-length, "MD5:%s\n", hdfs_handle->md5_output_human);
369+
buffer = concatenate(buffer, &offset, &size, "MD5:%s\n", hdfs_handle->md5_output_human);
313370
}
314371
if (hdfs_handle->cksm_types & HDFS_CKSM_TYPE_CVMFS) {
315-
length += snprintf(buffer+length, OUTPUT_BUFFER_SIZE-length, "CVMFS:%s\n", hdfs_handle->cvmfs_graft);
372+
buffer = concatenate(buffer, &offset, &size, "CVMFS:%s\n", hdfs_handle->cvmfs_graft);
316373
}
317-
318-
// Returns # of bytes, -1 on err
319-
if (hdfsWrite(fs, fh, buffer, length) < 0) {
374+
if (buffer == NULL) {
375+
MemoryError(hdfs_handle, "Failed to allocate checksum string", rc);
376+
// Returns # of bytes, -1 on err
377+
} else if (hdfsWrite(fs, fh, buffer, size) < 0) {
320378
SystemError(hdfs_handle, "Failed to write checksum file", rc);
321379
}
322380

@@ -441,14 +499,44 @@ globus_result_t hdfs_get_checksum_internal(hdfs_handle_t *hdfs_handle, const cha
441499
}
442500
}
443501

444-
char buffer[OUTPUT_BUFFER_SIZE], cksm[OUTPUT_BUFFER_SIZE], *val;
445-
buffer[OUTPUT_BUFFER_SIZE-1] = '\0';
446-
if (hdfsRead(fs, fh, buffer, OUTPUT_BUFFER_SIZE-1) <= 0) {
502+
globus_size_t size = OUTPUT_BUFFER_STARTING_SIZE;
503+
char *buffer = malloc(size);
504+
char *read_buffer = malloc(OUTPUT_BUFFER_STARTING_SIZE);
505+
globus_off_t off = 0;
506+
if (!read_buffer || !buffer) {
507+
MemoryError(hdfs_handle, "Failed to allocate checksum read buffers", rc);
508+
return rc;
509+
}
510+
tSize retval = 0;
511+
do {
512+
do {
513+
errno = 0; // Some versions of libhdfs forget to clear errno internally.
514+
retval = hdfsRead(fs, fh, read_buffer, OUTPUT_BUFFER_SIZE-1);
515+
} while ((retval < 0) && errno == EINTR);
516+
517+
if (retval > 0) {
518+
buffer = concatenate(buffer, &off, &size, "%s", read_buffer);
519+
}
520+
} while (retval > 0);
521+
522+
if (retval < 0) {
447523
SystemError(hdfs_handle, "Failed to read checksum file", rc);
524+
free(buffer);
525+
free(read_buffer);
526+
return rc;
448527
}
528+
449529
unsigned length = 0;
450530
const char * ptr = buffer;
531+
char *cksm = malloc(length);
532+
char *val;
451533
*cksm_value = NULL;
534+
if (!cksm || !buffer) {
535+
MemoryError(hdfs_handle, "Failed to allocate checksum parse buffers", rc);
536+
free(buffer);
537+
free(read_buffer);
538+
return rc;
539+
}
452540
// Raise your hand if you hate string parsing in C.
453541
while (sscanf(ptr, "%s%n", cksm, &length) == 1) {
454542
//globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP, "Checksum line: %s.\n", cksm);
@@ -513,6 +601,9 @@ globus_result_t hdfs_get_checksum_internal(hdfs_handle_t *hdfs_handle, const cha
513601
if (hdfs_handle->pathname) {
514602
free(hdfs_handle->pathname);
515603
}
604+
free(cksm);
605+
free(buffer);
606+
free(read_buffer);
516607
// Note we purposely leak the filesystem handle (fs), as Hadoop has disconnect issues.
517608
return rc;
518609
}

0 commit comments

Comments
 (0)