Skip to content

Commit 483d40f

Browse files
committed
Merge pull request #3 from bbockelm/load_limits
Implement per-user and global transfer limits.
2 parents b734a95 + 9ae1629 commit 483d40f

2 files changed

Lines changed: 182 additions & 3 deletions

File tree

src/Makefile.am

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ JNIHDIR=@JNIHDIR@
55
INCLUDES= -I$(JNIHDIR) -I$(JNIHDIR)/linux @INCLUDE@
66

77
#libglobus_gridftp_server_hdfs_la_SOURCES = globus_gridftp_server_hdfs.c
8-
libglobus_gridftp_server_hdfs_la_LDFLAGS = -lhdfs -lcrypto -lz
8+
libglobus_gridftp_server_hdfs_la_LDFLAGS = -lhdfs -lcrypto -lz -lrt
99
libglobus_gridftp_server_hdfs_la_SOURCES = \
1010
gridftp_hdfs.h \
1111
gridftp_hdfs_error.h \

src/gridftp_hdfs.c

Lines changed: 181 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <sys/syscall.h>
1111
#include <signal.h>
1212
#include <execinfo.h>
13+
#include <stdio.h>
14+
#include <fcntl.h>
1315

1416
#include "gridftp_hdfs.h"
1517

@@ -32,6 +34,10 @@ static globus_mutex_t g_hdfs_mutex;
3234
static pthread_t g_thread_id;
3335
static int g_thread_pipe_fd;
3436

37+
static globus_result_t check_connection_limits(const hdfs_handle_t *, int, int);
38+
static int dumb_sem_open(const char *fname, int flags, mode_t mode, int value);
39+
static int dumb_sem_timedwait(int fd, int value, int secs);
40+
3541
static void hdfs_trev(globus_gfs_event_info_t *, void *);
3642
inline void set_done(hdfs_handle_t *, globus_result_t);
3743
static int hdfs_activate(void);
@@ -556,6 +562,8 @@ hdfs_start(
556562
int load_limit = 20;
557563
int replicas;
558564
int port;
565+
int user_transfer_limit = -1;
566+
int transfer_limit = -1;
559567

560568
hdfs_handle = (hdfs_handle_t *)globus_malloc(sizeof(hdfs_handle_t));
561569
memset(hdfs_handle, 0, sizeof(hdfs_handle_t));
@@ -612,6 +620,34 @@ hdfs_start(
612620
char * port_char = getenv("GRIDFTP_HDFS_PORT");
613621
char * mount_point_char = getenv("GRIDFTP_HDFS_MOUNT_POINT");
614622
char * load_limit_char = getenv("GRIDFTP_LOAD_LIMIT");
623+
char * global_transfer_limit_char = getenv("GRIDFTP_TRANSFER_LIMIT");
624+
char * default_user_limit_char = getenv("GRIDFTP_DEFAULT_USER_TRANSFER_LIMIT");
625+
626+
char specific_limit_env_var[256];
627+
snprintf(specific_limit_env_var, 255, "GRIDFTP_%s_USER_TRANSFER_LIMIT", hdfs_handle->username);
628+
specific_limit_env_var[255] = '\0';
629+
int idx;
630+
for (idx=0; idx<256; idx++) {
631+
if (specific_limit_env_var[idx] == '\0') {break;}
632+
specific_limit_env_var[idx] = toupper(specific_limit_env_var[idx]);
633+
}
634+
char * specific_user_limit_char = getenv(specific_limit_env_var);
635+
636+
if (!specific_user_limit_char) {
637+
specific_user_limit_char = default_user_limit_char;
638+
}
639+
if (specific_user_limit_char) {
640+
user_transfer_limit = atoi(specific_user_limit_char);
641+
}
642+
if (global_transfer_limit_char) {
643+
transfer_limit = atoi(global_transfer_limit_char);
644+
}
645+
646+
if (load_limit_char != NULL) {
647+
load_limit = atoi(load_limit_char);
648+
if (load_limit < 1)
649+
load_limit = 20;
650+
}
615651

616652
// Get our hostname
617653
hdfs_handle->local_host = globus_malloc(256);
@@ -697,7 +733,7 @@ hdfs_start(
697733
double load;
698734
int ctr = 0;
699735
while (fd >= 0) {
700-
if (ctr == 120)
736+
if (ctr == 10)
701737
break;
702738
ctr += 1;
703739
nbytes = read(fd, buf, bufsize);
@@ -710,13 +746,26 @@ hdfs_start(
710746
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP, "Detected system load %.2f.\n", load);
711747
if ((load >= load_limit) && (load < 1000)) {
712748
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP, "Preventing gridftp transfer startup due to system load of %.2f.\n", load);
713-
sleep(5);
749+
sleep(5*ctr);
714750
} else {
715751
break;
716752
}
717753
close(fd);
718754
fd = open("/proc/loadavg", O_RDONLY);
719755
}
756+
if (load > load_limit) {
757+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "Failing transfer due to load %.2f over limit %d\n", load, load_limit);
758+
GenericError(hdfs_handle, "Server is cancelling transfer due to over-load limit", rc);
759+
finished_info.result = rc;
760+
globus_gridftp_server_operation_finished(op, rc, &finished_info);
761+
return;
762+
}
763+
764+
if ((rc = check_connection_limits(hdfs_handle, user_transfer_limit, transfer_limit)) != GLOBUS_SUCCESS) {
765+
finished_info.result = rc;
766+
globus_gridftp_server_operation_finished(op, rc, &finished_info);
767+
return;
768+
}
720769

721770
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO,
722771
"Start gridftp server; hadoop nameserver %s, port %i, replicas %i.\n",
@@ -860,3 +909,133 @@ set_close_done(
860909
}
861910
}
862911

912+
/*************************************************************************
913+
* check_connection_limits
914+
* -----------------------
915+
* Make sure the number of concurrent connections to HDFS is below a certain
916+
* threshold. If we are over-threshold, wait for a fixed amount of time (1
917+
* minute) and fail the transfer.
918+
* Implementation baed on named POSIX semaphores.
919+
*************************************************************************/
920+
globus_result_t
921+
check_connection_limits(const hdfs_handle_t *hdfs_handle, int user_transfer_limit, int transfer_limit)
922+
{
923+
GlobusGFSName(check_connection_limit);
924+
globus_result_t result = GLOBUS_SUCCESS;
925+
926+
int user_lock_count = 0;
927+
if (user_transfer_limit > 0) {
928+
char user_sem_name[256];
929+
snprintf(user_sem_name, 255, "/dev/shm/gridftp-hdfs-%s-%d", hdfs_handle->username, user_transfer_limit);
930+
user_sem_name[255] = '\0';
931+
int usem = dumb_sem_open(user_sem_name, O_CREAT, 0600, user_transfer_limit);
932+
if (usem == -1) {
933+
SystemError(hdfs_handle, "Failure when determining user connection limit", result);
934+
return result;
935+
}
936+
if (-1 == (user_lock_count = dumb_sem_timedwait(usem, user_transfer_limit, 60))) {
937+
if (errno == ETIMEDOUT) {
938+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "Failing transfer for %s due to user connection limit of %d.\n", hdfs_handle->username, user_transfer_limit);
939+
char * failure_msg = (char *)globus_malloc(1024);
940+
snprintf(failure_msg, 1024, "Server over the user connection limit of %d", user_transfer_limit);
941+
failure_msg[1023] = '\0';
942+
GenericError(hdfs_handle, failure_msg, result);
943+
globus_free(failure_msg);
944+
} else {
945+
SystemError(hdfs_handle, "Failed to check user connection semaphore", result);
946+
}
947+
return result;
948+
}
949+
// NOTE: We now purposely leak the semaphore. It will be automatically closed when
950+
// the server process finishes this connection.
951+
}
952+
953+
int global_lock_count = 0;
954+
if (transfer_limit > 0) {
955+
char global_sem_name[256];
956+
snprintf(global_sem_name, 255, "/dev/shm//gridftp-hdfs-overall-%d", transfer_limit);
957+
global_sem_name[255] = '\0';
958+
int gsem = dumb_sem_open(global_sem_name, O_CREAT, 0666, transfer_limit);
959+
if (gsem == -1) {
960+
SystemError(hdfs_handle, "Failure when determining global connection limit", result);
961+
return result;
962+
}
963+
if (-1 == (global_lock_count=dumb_sem_timedwait(gsem, transfer_limit, 60))) {
964+
if (errno == ETIMEDOUT) {
965+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "Failing transfer for %s due to global connection limit of %d (user has %d transfers).\n", hdfs_handle->username, transfer_limit, user_lock_count);
966+
char * failure_msg = (char *)globus_malloc(1024);
967+
snprintf(failure_msg, 1024, "Server over the global connection limit of %d (user has %d transfers)", transfer_limit, user_lock_count);
968+
failure_msg[1023] = '\0';
969+
GenericError(hdfs_handle, failure_msg, result);
970+
globus_free(failure_msg);
971+
} else {
972+
SystemError(hdfs_handle, "Failed to check global connection semaphore", result);
973+
}
974+
return result;
975+
}
976+
// NOTE: We now purposely leak the semaphore. It will be automatically closed when
977+
// the server process finishes this connection.
978+
}
979+
if ((transfer_limit > 0) || (user_transfer_limit > 0)) {
980+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "Proceeding with transfer; user %s has %d active transfers (limit %d); server has %d active transfers (limit %d).\n", hdfs_handle->username, user_lock_count, user_transfer_limit, global_lock_count, transfer_limit);
981+
}
982+
983+
return result;
984+
}
985+
986+
/*************************************************************************
 *  dumb_sem_open
 *  -------------
 *  Open the file that backs a "dumb semaphore" and make sure it is at least
 *  `value` bytes long (dumb_sem_timedwait locks one byte per slot).
 *
 *  fname - path of the backing file.
 *  flags - extra open(2) flags (e.g. O_CREAT); O_RDWR is always added.
 *  mode  - permission bits used on creation and re-applied with fchmod.
 *  value - number of slots, i.e. the number of bytes to allocate.
 *
 *  Returns the open descriptor, or -1 with errno set on failure.
 *************************************************************************/
int
dumb_sem_open(const char *fname, int flags, mode_t mode, int value) {
    int fd = open(fname, flags | O_RDWR, mode);
    if (-1 == fd) {
        return -1;
    }
    // posix_fallocate() reports failure through its return value (an errno
    // code, never -1) and does not set errno, so translate it here.
    int alloc_err = posix_fallocate(fd, 0, value);
    if (alloc_err != 0) {
        close(fd);
        errno = alloc_err;
        return -1;
    }
    // The mode given to open() is filtered through the umask, so apply the
    // requested bits explicitly.  Best effort: the result is deliberately
    // ignored (presumably this fails when the file belongs to another user).
    (void)fchmod(fd, mode);
    return fd;
}
998+
999+
/*************************************************************************
 *  dumb_sem_timedwait
 *  ------------------
 *  Try to claim one of `value` slots in the file behind `fd`.  Slot i is
 *  byte i of the file; a slot is claimed by taking a write lock on that
 *  byte with fcntl(F_SETLK).  Once a slot has been claimed, the remaining
 *  bytes are only probed with F_GETLK so that the number of busy slots can
 *  be reported.
 *
 *  If every slot is busy, sleep for half a second and try again, giving up
 *  once the whole-second part of the monotonic clock has advanced more
 *  than `secs` past the starting second.
 *
 *  Returns the number of slots seen as taken (including ours) on success.
 *  Returns -1 on failure, with errno set to ETIMEDOUT on timeout or left
 *  as set by fcntl otherwise.
 *************************************************************************/
int
dumb_sem_timedwait(int fd, int value, int secs) {
    struct timespec start, now, sleeptime;
    clock_gettime(CLOCK_MONOTONIC, &start);
    sleeptime.tv_sec = 0;
    sleeptime.tv_nsec = 500L * 1000L * 1000L; // 500 ms between attempts
    while (1) {
        int lock_count = 0;
        int need_lock = 1;
        for (int idx = 0; idx < value; idx++) {
            struct flock mylock;
            int rc;
            // An interrupted call says nothing about the state of the slot,
            // so repeat it rather than counting the slot as taken.
            do {
                memset(&mylock, '\0', sizeof(mylock));
                mylock.l_type = F_WRLCK;
                mylock.l_whence = SEEK_SET;
                mylock.l_start = idx;
                mylock.l_len = 1;
                rc = fcntl(fd, need_lock ? F_SETLK : F_GETLK, &mylock);
            } while (rc == -1 && errno == EINTR);

            if (rc == 0) {
                if (need_lock) { // We now have the lock.
                    need_lock = 0;
                    lock_count++;
                } else if (mylock.l_type != F_UNLCK) { // We're just seeing how many locks are taken.
                    lock_count++;
                }
                continue;
            }
            if (errno == EAGAIN || errno == EACCES) { // Slot held by someone else.
                lock_count++;
                continue;
            }
            return -1;
        }
        if (!need_lock) { // we were able to take a lock.
            return lock_count;
        }
        nanosleep(&sleeptime, NULL);
        clock_gettime(CLOCK_MONOTONIC, &now);
        if (now.tv_sec > start.tv_sec + secs) {
            errno = ETIMEDOUT;
            return -1;
        }
    }
}
1041+

0 commit comments

Comments
 (0)