Skip to content

Commit 9ae1629

Browse files
committed
Implement per-user and global transfer limits.
1 parent 8fe65ee commit 9ae1629

2 files changed

Lines changed: 182 additions & 3 deletions

File tree

src/Makefile.am

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ JNIHDIR=@JNIHDIR@
55
INCLUDES= -I$(JNIHDIR) -I$(JNIHDIR)/linux @INCLUDE@
66

77
#libglobus_gridftp_server_hdfs_la_SOURCES = globus_gridftp_server_hdfs.c
8-
libglobus_gridftp_server_hdfs_la_LDFLAGS = -lhdfs -lcrypto -lz
8+
libglobus_gridftp_server_hdfs_la_LDFLAGS = -lhdfs -lcrypto -lz -lrt
99
libglobus_gridftp_server_hdfs_la_SOURCES = \
1010
gridftp_hdfs.h \
1111
gridftp_hdfs_error.h \

src/gridftp_hdfs.c

Lines changed: 181 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <sys/syscall.h>
1111
#include <signal.h>
1212
#include <execinfo.h>
13+
#include <stdio.h>
14+
#include <fcntl.h>
1315

1416
#include "gridftp_hdfs.h"
1517

@@ -32,6 +34,10 @@ static globus_mutex_t g_hdfs_mutex;
3234
static pthread_t g_thread_id;
3335
static int g_thread_pipe_fd;
3436

37+
static globus_result_t check_connection_limits(const hdfs_handle_t *, int, int);
38+
static int dumb_sem_open(const char *fname, int flags, mode_t mode, int value);
39+
static int dumb_sem_timedwait(int fd, int value, int secs);
40+
3541
static void hdfs_trev(globus_gfs_event_info_t *, void *);
3642
inline void set_done(hdfs_handle_t *, globus_result_t);
3743
static int hdfs_activate(void);
@@ -504,6 +510,8 @@ hdfs_start(
504510
int load_limit = 20;
505511
int replicas;
506512
int port;
513+
int user_transfer_limit = -1;
514+
int transfer_limit = -1;
507515

508516
hdfs_handle = (hdfs_handle_t *)globus_malloc(sizeof(hdfs_handle_t));
509517
memset(hdfs_handle, 0, sizeof(hdfs_handle_t));
@@ -560,6 +568,34 @@ hdfs_start(
560568
char * port_char = getenv("GRIDFTP_HDFS_PORT");
561569
char * mount_point_char = getenv("GRIDFTP_HDFS_MOUNT_POINT");
562570
char * load_limit_char = getenv("GRIDFTP_LOAD_LIMIT");
571+
char * global_transfer_limit_char = getenv("GRIDFTP_TRANSFER_LIMIT");
572+
char * default_user_limit_char = getenv("GRIDFTP_DEFAULT_USER_TRANSFER_LIMIT");
573+
574+
char specific_limit_env_var[256];
575+
snprintf(specific_limit_env_var, 255, "GRIDFTP_%s_USER_TRANSFER_LIMIT", hdfs_handle->username);
576+
specific_limit_env_var[255] = '\0';
577+
int idx;
578+
for (idx=0; idx<256; idx++) {
579+
if (specific_limit_env_var[idx] == '\0') {break;}
580+
specific_limit_env_var[idx] = toupper(specific_limit_env_var[idx]);
581+
}
582+
char * specific_user_limit_char = getenv(specific_limit_env_var);
583+
584+
if (!specific_user_limit_char) {
585+
specific_user_limit_char = default_user_limit_char;
586+
}
587+
if (specific_user_limit_char) {
588+
user_transfer_limit = atoi(specific_user_limit_char);
589+
}
590+
if (global_transfer_limit_char) {
591+
transfer_limit = atoi(global_transfer_limit_char);
592+
}
593+
594+
if (load_limit_char != NULL) {
595+
load_limit = atoi(load_limit_char);
596+
if (load_limit < 1)
597+
load_limit = 20;
598+
}
563599

564600
// Get our hostname
565601
hdfs_handle->local_host = globus_malloc(256);
@@ -645,7 +681,7 @@ hdfs_start(
645681
double load;
646682
int ctr = 0;
647683
while (fd >= 0) {
648-
if (ctr == 120)
684+
if (ctr == 10)
649685
break;
650686
ctr += 1;
651687
nbytes = read(fd, buf, bufsize);
@@ -658,13 +694,26 @@ hdfs_start(
658694
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP, "Detected system load %.2f.\n", load);
659695
if ((load >= load_limit) && (load < 1000)) {
660696
globus_gfs_log_message(GLOBUS_GFS_LOG_DUMP, "Preventing gridftp transfer startup due to system load of %.2f.\n", load);
661-
sleep(5);
697+
sleep(5*ctr);
662698
} else {
663699
break;
664700
}
665701
close(fd);
666702
fd = open("/proc/loadavg", O_RDONLY);
667703
}
704+
if (load > load_limit) {
705+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "Failing transfer due to load %.2f over limit %d\n", load, load_limit);
706+
GenericError(hdfs_handle, "Server is cancelling transfer due to over-load limit", rc);
707+
finished_info.result = rc;
708+
globus_gridftp_server_operation_finished(op, rc, &finished_info);
709+
return;
710+
}
711+
712+
if ((rc = check_connection_limits(hdfs_handle, user_transfer_limit, transfer_limit)) != GLOBUS_SUCCESS) {
713+
finished_info.result = rc;
714+
globus_gridftp_server_operation_finished(op, rc, &finished_info);
715+
return;
716+
}
668717

669718
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO,
670719
"Start gridftp server; hadoop nameserver %s, port %i, replicas %i.\n",
@@ -808,3 +857,133 @@ set_close_done(
808857
}
809858
}
810859

860+
/*************************************************************************
861+
* check_connection_limits
862+
* -----------------------
863+
* Make sure the number of concurrent connections to HDFS is below a certain
864+
* threshold. If we are over-threshold, wait for a fixed amount of time (1
865+
* minute) and fail the transfer.
866+
* Implementation baed on named POSIX semaphores.
867+
*************************************************************************/
868+
globus_result_t
869+
check_connection_limits(const hdfs_handle_t *hdfs_handle, int user_transfer_limit, int transfer_limit)
870+
{
871+
GlobusGFSName(check_connection_limit);
872+
globus_result_t result = GLOBUS_SUCCESS;
873+
874+
int user_lock_count = 0;
875+
if (user_transfer_limit > 0) {
876+
char user_sem_name[256];
877+
snprintf(user_sem_name, 255, "/dev/shm/gridftp-hdfs-%s-%d", hdfs_handle->username, user_transfer_limit);
878+
user_sem_name[255] = '\0';
879+
int usem = dumb_sem_open(user_sem_name, O_CREAT, 0600, user_transfer_limit);
880+
if (usem == -1) {
881+
SystemError(hdfs_handle, "Failure when determining user connection limit", result);
882+
return result;
883+
}
884+
if (-1 == (user_lock_count = dumb_sem_timedwait(usem, user_transfer_limit, 60))) {
885+
if (errno == ETIMEDOUT) {
886+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "Failing transfer for %s due to user connection limit of %d.\n", hdfs_handle->username, user_transfer_limit);
887+
char * failure_msg = (char *)globus_malloc(1024);
888+
snprintf(failure_msg, 1024, "Server over the user connection limit of %d", user_transfer_limit);
889+
failure_msg[1023] = '\0';
890+
GenericError(hdfs_handle, failure_msg, result);
891+
globus_free(failure_msg);
892+
} else {
893+
SystemError(hdfs_handle, "Failed to check user connection semaphore", result);
894+
}
895+
return result;
896+
}
897+
// NOTE: We now purposely leak the semaphore. It will be automatically closed when
898+
// the server process finishes this connection.
899+
}
900+
901+
int global_lock_count = 0;
902+
if (transfer_limit > 0) {
903+
char global_sem_name[256];
904+
snprintf(global_sem_name, 255, "/dev/shm//gridftp-hdfs-overall-%d", transfer_limit);
905+
global_sem_name[255] = '\0';
906+
int gsem = dumb_sem_open(global_sem_name, O_CREAT, 0666, transfer_limit);
907+
if (gsem == -1) {
908+
SystemError(hdfs_handle, "Failure when determining global connection limit", result);
909+
return result;
910+
}
911+
if (-1 == (global_lock_count=dumb_sem_timedwait(gsem, transfer_limit, 60))) {
912+
if (errno == ETIMEDOUT) {
913+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "Failing transfer for %s due to global connection limit of %d (user has %d transfers).\n", hdfs_handle->username, transfer_limit, user_lock_count);
914+
char * failure_msg = (char *)globus_malloc(1024);
915+
snprintf(failure_msg, 1024, "Server over the global connection limit of %d (user has %d transfers)", transfer_limit, user_lock_count);
916+
failure_msg[1023] = '\0';
917+
GenericError(hdfs_handle, failure_msg, result);
918+
globus_free(failure_msg);
919+
} else {
920+
SystemError(hdfs_handle, "Failed to check global connection semaphore", result);
921+
}
922+
return result;
923+
}
924+
// NOTE: We now purposely leak the semaphore. It will be automatically closed when
925+
// the server process finishes this connection.
926+
}
927+
if ((transfer_limit > 0) || (user_transfer_limit > 0)) {
928+
globus_gfs_log_message(GLOBUS_GFS_LOG_INFO, "Proceeding with transfer; user %s has %d active transfers (limit %d); server has %d active transfers (limit %d).\n", hdfs_handle->username, user_lock_count, user_transfer_limit, global_lock_count, transfer_limit);
929+
}
930+
931+
return result;
932+
}
933+
934+
int
935+
dumb_sem_open(const char *fname, int flags, mode_t mode, int value) {
936+
int fd = open(fname, flags | O_RDWR, mode);
937+
if (-1 == fd) {
938+
return fd;
939+
}
940+
if (-1 == posix_fallocate(fd, 0, value)) {
941+
return -1;
942+
}
943+
fchmod(fd, mode);
944+
return fd;
945+
}
946+
947+
int
948+
dumb_sem_timedwait(int fd, int value, int secs) {
949+
struct timespec start, now, sleeptime;
950+
clock_gettime(CLOCK_MONOTONIC, &start);
951+
sleeptime.tv_sec = 0;
952+
sleeptime.tv_nsec = 500*1e6;
953+
while (1) {
954+
int idx = 0;
955+
int lock_count = 0;
956+
int need_lock = 1;
957+
for (idx=0; idx<value; idx++) {
958+
struct flock mylock; memset(&mylock, '\0', sizeof(mylock));
959+
mylock.l_type = F_WRLCK;
960+
mylock.l_whence = SEEK_SET;
961+
mylock.l_start = idx;
962+
mylock.l_len = 1;
963+
if (0 == fcntl(fd, need_lock ? F_SETLK : F_GETLK, &mylock)) {
964+
if (need_lock) { // We now have the lock.
965+
need_lock = 0;
966+
lock_count++;
967+
} else if (mylock.l_type != F_UNLCK) { // We're just seeing how many locks are taken.
968+
lock_count++;
969+
}
970+
continue;
971+
}
972+
if (errno == EAGAIN || errno == EACCES || errno == EINTR) {
973+
lock_count++;
974+
continue;
975+
}
976+
return -1;
977+
}
978+
if (!need_lock) { // we were able to take a lock.
979+
return lock_count;
980+
}
981+
nanosleep(&sleeptime, NULL);
982+
clock_gettime(CLOCK_MONOTONIC, &now);
983+
if (now.tv_sec > start.tv_sec + secs) {
984+
errno = ETIMEDOUT;
985+
return -1;
986+
}
987+
}
988+
}
989+

0 commit comments

Comments
 (0)