From f67584b8defe0319cf54d17ec7a16c8cd9dcda0b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Fri, 2 Jun 2006 13:03:17 +0000 Subject: [PATCH] * use modified il_string interface * faster read_event --- org.glite.lb.logger/src/event_store.c | 10 +- org.glite.lb.logger/src/il_master.c | 21 ++-- org.glite.lb.logger/src/input_queue_socket.c | 143 ++++++++++++++++++++++----- org.glite.lb.logger/src/interlogd.c | 2 +- org.glite.lb.logger/src/interlogd.h | 8 +- org.glite.lb.logger/src/perftest_il.sh | 9 +- org.glite.lb.logger/src/server_msg.c | 16 +-- 7 files changed, 159 insertions(+), 50 deletions(-) diff --git a/org.glite.lb.logger/src/event_store.c b/org.glite.lb.logger/src/event_store.c index feab102..365406b 100644 --- a/org.glite.lb.logger/src/event_store.c +++ b/org.glite.lb.logger/src/event_store.c @@ -426,8 +426,14 @@ event_store_recover(struct event_store *es) ret = -1; /* create message for server */ - msg = server_msg_create(event_s, last); - free(event_s); + { + il_octet_string_t e; + + e.data = event_s; + e.len = strlen(event_s); + msg = server_msg_create(&e, last); + free(event_s); + } if(msg == NULL) { il_log(LOG_ALERT, " event file corrupted! Please move it to quarantine (ie. somewhere else) and restart interlogger.\n"); break; diff --git a/org.glite.lb.logger/src/il_master.c b/org.glite.lb.logger/src/il_master.c index e127af6..c09499d 100644 --- a/org.glite.lb.logger/src/il_master.c +++ b/org.glite.lb.logger/src/il_master.c @@ -128,7 +128,7 @@ parse_cmd(char *event, char **job_id_s, long *receipt, int *timeout) static int -handle_cmd(char *event, long offset) +handle_cmd(il_octet_string_t *event, long offset) { char *job_id_s; struct event_queue *eq; @@ -139,7 +139,7 @@ handle_cmd(char *event, long offset) struct timeval tv; /* parse command */ - if(parse_cmd(event, &job_id_s, &receipt, &timeout) < 0) + if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0) return(0); #if defined(INTERLOGD_FLUSH) @@ -304,7 +304,7 @@ cmd_error: static int -handle_msg(char *event, long offset) +handle_msg(il_octet_string_t *event, long offset) { struct server_msg *msg = NULL; #if !defined(IL_NOTIFICATIONS) @@ -386,10 +386,13 @@ loop() { /* receive events */ while(1) { - char *msg; + il_octet_string_t msg; long offset; int ret; + if(killflg) + exit(0); + clear_error(); if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0) { @@ -408,17 +411,17 @@ loop() } #ifdef PERF_EMPTY - glite_wll_perftest_consumeEventString(msg); - free(msg); + glite_wll_perftest_consumeEventString(msg.data); + free(msg.data); continue; #endif #ifdef INTERLOGD_HANDLE_CMD - ret = handle_cmd(msg, offset); + ret = handle_cmd(&msg, offset); if(ret == 0) #endif - ret = handle_msg(msg, offset); - free(msg); + ret = handle_msg(&msg, offset); + free(msg.data); if(ret < 0) switch (error_get_maj()) { case IL_SYS: diff --git a/org.glite.lb.logger/src/input_queue_socket.c b/org.glite.lb.logger/src/input_queue_socket.c index 71fc89d..19c81e3 100644 --- a/org.glite.lb.logger/src/input_queue_socket.c +++ b/org.glite.lb.logger/src/input_queue_socket.c @@ -68,32 +68,96 @@ void input_queue_detach() } +#define DEFAULT_CHUNK_SIZE 1024 + static -char * -read_event(int sock, long *offset) +int +read_event(int sock, long *offset, il_octet_string_t *msg) { char *buffer, *p, *n; - int len, alen; - char buf[256]; + int len, alen, i, chunk_size = DEFAULT_CHUNK_SIZE; + static char buf[1024]; + + msg->data = NULL; + msg->len = 0; /* receive offset */ len = recv(sock, offset, sizeof(*offset), MSG_NOSIGNAL); if(len < sizeof(*offset)) { set_error(IL_PROTO, errno, "read_event: error reading offset"); - return(NULL); + return(-1); } /* receive event string */ - buffer=malloc(1024); + buffer=malloc(8*chunk_size); if(buffer == NULL) { set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); - return(NULL); + return(-1); } p = buffer; + alen = 8*chunk_size; - alen = 1024; + /* Variables used here: + - buffer points to allocated memory, + - alen is the allocated memory size, + - p points to the first free location in buffer, + - len is the amount actually read by recv, + - i is the amount of data belonging to the current event (including separator). + - n points to event separator or is NULL + Hence: + (p - buffer) gives the amount of valid data read so far, + (alen - (p - buffer)) is the free space, + */ + +#if 1 + /* Reading events - optimized version. Attempts to increase chunks read by recv + * when there are more data, reads directly into destination memory (instead of + * copying from static buffer) etc. + * + * For some reason it is not much faster than the old variant. + */ + do { + /* prepare at least chunk_size bytes for next data */ + if(alen - (p - buffer) < chunk_size) { + alen += (chunk_size < 8192) ? 8192 : 8*chunk_size; + n = realloc(buffer, alen); + if(n == NULL) { + free(buffer); + set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); + return(-1); + } + p = n + (p - buffer); + buffer = n; + } + + /* read chunk */ + if((len=recv(sock, p, chunk_size, MSG_PEEK | MSG_NOSIGNAL)) > 0) { + /* find the end of event, if any */ + /* faster (and dirty) way of doing strnchr (which is not in libc, anyway) */ + if((n=memccpy(p, p, EVENT_SEPARATOR, len)) != NULL) { + i = n - p; /* length including separator */ + } else { + i = len; + /* long event, huh? try reading more data at once */ + chunk_size += 1024; + } + /* remove the relevant data from input */ + /* i > 0 */ + if(recv(sock, p, i, MSG_NOSIGNAL) != i) { + set_error(IL_SYS, errno, "read_event: error reading data"); + free(buffer); + return(-1); + } + /* move the pointer to the first free place, separator is considered free space */ + p = (n == NULL) ? p + len : n - 1; + } + } while ( (len > 0) && (n == NULL) ); + +#else + /* Reading events - original version. + * Appears to behave quite good, anyway. + */ while((len=recv(sock, buf, sizeof(buf), MSG_PEEK | MSG_NOSIGNAL)) > 0) { - int i; /* we have to be prepared for sizeof(buf) bytes */ if(alen - (p - buffer) < (int)sizeof(buf)) { @@ -102,27 +166,41 @@ read_event(int sock, long *offset) if(n == NULL) { free(buffer); set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); - return(NULL); + return(-1); } p = p - buffer + n; buffer = n; } /* copy all relevant bytes from buffer */ - for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) - *p++ = buf[i]; + n = (char*)memccpy(p, buf, EVENT_SEPARATOR, len); + if(n) { + /* separator found */ + n--; /* but do not preserve it */ + i = n - p; + p = n; + } else { + /* separator not found */ + i = len; + p += len; + } + /* This was definitely slowing us down: + * for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++) + * *p++ = buf[i]; + */ /* remove the data from queue */ if(i > 0) if(recv(sock, buf, i, MSG_NOSIGNAL) != i) { set_error(IL_SYS, errno, "read_event: error reading data"); free(buffer); - return(NULL); + return(-1); } if(i < len) /* the event is complete */ break; } +#endif /* terminate buffer */ *p = 0; @@ -130,16 +208,17 @@ read_event(int sock, long *offset) if(len < 0) { set_error(IL_SYS, errno, "read_event: error reading data"); free(buffer); - return(NULL); + return(-1); } /* if len == 0, we have not encountered EVENT_SEPARATOR and thus the event is not complete */ if(len == 0) { set_error(IL_PROTO, errno, "read_event: error reading data - premature EOF"); free(buffer); - return(NULL); + return(-1); } +#if 0 /* this is probably not necessary at all: either len <=0, which was covered before, or 0 <= i < len => p > buffer; @@ -148,10 +227,13 @@ read_event(int sock, long *offset) if(p == buffer) { set_error(IL_PROTO, errno, "read_event: error reading data - no data received"); free(buffer); - return(NULL); + return(-1); } +#endif - return(buffer); + msg->data = buffer; + msg->len = p - buffer; + return(msg->len); } @@ -161,12 +243,13 @@ read_event(int sock, long *offset) */ #ifdef PERF_EVENTS_INLINE int -input_queue_get(char **buffer, long *offset, int timeout) +input_queue_get(il_octet_string *buffer, long *offset, int timeout) { static long o = 0; int len; - len = glite_wll_perftest_produceEventString(buffer); + len = glite_wll_perftest_produceEventString(&buffer->data); + buffer->len = len; if(len) { o += len; *offset = o; @@ -177,7 +260,7 @@ input_queue_get(char **buffer, long *offset, int timeout) } #else int -input_queue_get(char **buffer, long *offset, int timeout) +input_queue_get(il_octet_string_t *buffer, long *offset, int timeout) { fd_set fds; struct timeval tv; @@ -198,11 +281,17 @@ input_queue_get(char **buffer, long *offset, int timeout) return(0); case -1: /* error */ - set_error(IL_SYS, errno, "input_queue_get: error waiting for event"); - return(-1); - + switch(errno) { + case EINTR: + il_log(LOG_DEBUG, " interrupted while waiting for event!\n"); + return(0); + + default: + set_error(IL_SYS, errno, "input_queue_get: error waiting for event"); + return(-1); + } default: - break; + break; } if((accepted=accept(sock, NULL, NULL)) < 0) { @@ -210,16 +299,16 @@ input_queue_get(char **buffer, long *offset, int timeout) return(-1); } - *buffer = read_event(accepted, offset); + read_event(accepted, offset, buffer); close(accepted); - if(*buffer == NULL) { + if(buffer->data == NULL) { if(error_get_maj() != IL_OK) return(-1); else return(0); } - return(strlen(*buffer)); + return(buffer->len); } #endif diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c index d5b0c32..aff7510 100644 --- a/org.glite.lb.logger/src/interlogd.c +++ b/org.glite.lb.logger/src/interlogd.c @@ -30,7 +30,7 @@ /* The name the program was run with, stripped of any leading path. */ char *program_name; -static int killflg = 0; +int killflg = 0; int TIMEOUT = DEFAULT_TIMEOUT; diff --git a/org.glite.lb.logger/src/interlogd.h b/org.glite.lb.logger/src/interlogd.h index 9f91a0d..846d9ea 100644 --- a/org.glite.lb.logger/src/interlogd.h +++ b/org.glite.lb.logger/src/interlogd.h @@ -5,6 +5,7 @@ #include "il_error.h" #include "glite/security/glite_gss.h" +#include "glite/lb/il_msg.h" #include #include @@ -68,6 +69,7 @@ extern char *cert_file; extern char *key_file; extern char *CAcert_dir; extern int bs_only; +extern int killflg; #ifdef LB_PERF extern int nosend; #ifdef PERF_EVENTS_INLINE @@ -138,9 +140,9 @@ struct event_queue { /* server msg methods */ -struct server_msg *server_msg_create(char *, long); +struct server_msg *server_msg_create(il_octet_string_t *, long); struct server_msg *server_msg_copy(struct server_msg *); -int server_msg_init(struct server_msg *, char *); +int server_msg_init(struct server_msg *, il_octet_string_t *); #if defined(INTERLOGD_EMS) int server_msg_is_priority(struct server_msg *); #endif @@ -178,7 +180,7 @@ int event_queue_cond_unlock(struct event_queue *); /* input queue */ int input_queue_attach(); void input_queue_detach(); -int input_queue_get(char **, long *, int); +int input_queue_get(il_octet_string_t *, long *, int); /* queue management functions */ int queue_list_init(char *); diff --git a/org.glite.lb.logger/src/perftest_il.sh b/org.glite.lb.logger/src/perftest_il.sh index dcf0ff2..e9562f9 100644 --- a/org.glite.lb.logger/src/perftest_il.sh +++ b/org.glite.lb.logger/src/perftest_il.sh @@ -22,10 +22,17 @@ run_test il $numjobs j=0 while [[ $j -lt 4 ]] do - echo -e -n "\t ${PERFTEST_EV_THROUGHPUT[$j]}" + echo -e -n "\t ${PERFTEST_THROUGHPUT[$j]}" j=$((j+1)) done echo "" +#j=0 +#while [[ $j -lt 4 ]] +#do +# echo -e -n "\t (${PERFTEST_EV_THROUGHPUT[$j]})" +# j=$((j+1)) +#done +#echo "" # diff --git a/org.glite.lb.logger/src/server_msg.c b/org.glite.lb.logger/src/server_msg.c index 1a90e72..b7a005b 100644 --- a/org.glite.lb.logger/src/server_msg.c +++ b/org.glite.lb.logger/src/server_msg.c @@ -12,10 +12,11 @@ static int -create_msg(char *event, char **buffer, long *receipt) +create_msg(il_octet_string_t *ev, char **buffer, long *receipt) { char *p; int len; - + char *event = ev->data; + *receipt = 0; #if defined(INTERLOGD_EMS) @@ -56,7 +57,7 @@ create_msg(char *event, char **buffer, long *receipt) } #endif - len = encode_il_msg(buffer, event); + len = encode_il_msg(buffer, ev); if(len < 0) { set_error(IL_NOMEM, ENOMEM, "create_msg: out of memory allocating message"); return(-1); @@ -66,7 +67,7 @@ create_msg(char *event, char **buffer, long *receipt) struct server_msg * -server_msg_create(char *event, long offset) +server_msg_create(il_octet_string_t *event, long offset) { struct server_msg *msg; @@ -121,7 +122,7 @@ server_msg_copy(struct server_msg *src) int -server_msg_init(struct server_msg *msg, char *event) +server_msg_init(struct server_msg *msg, il_octet_string_t *event) { #if defined(IL_NOTIFICATIONS) edg_wll_Context context; @@ -130,6 +131,7 @@ server_msg_init(struct server_msg *msg, char *event) #endif assert(msg != NULL); + assert(event != NULL); memset(msg, 0, sizeof(*msg)); @@ -164,10 +166,10 @@ server_msg_init(struct server_msg *msg, char *event) if(msg->len < 0) { return(-1); } - msg->job_id_s = edg_wll_GetJobId(event); + msg->job_id_s = edg_wll_GetJobId(event->data); #endif /* remember to add event separator to the length */ - msg->ev_len = strlen(event) + 1; + msg->ev_len = event->len + 1; if(msg->job_id_s == NULL) { set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id"); -- 1.8.2.3