From: Michal Voců Date: Tue, 19 Aug 2008 12:42:56 +0000 (+0000) Subject: added handling of JPPS created SOAP files X-Git-Tag: glite-yaim-myproxy_R_4_0_3_1~2 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=c7ce43a7de126dc4cda32f1211edf0ab2e7b975d;p=jra1mw.git added handling of JPPS created SOAP files --- diff --git a/org.glite.lb.logger/src/event_store_http.c b/org.glite.lb.logger/src/event_store_http.c index 0363ec2..a34e0a3 100644 --- a/org.glite.lb.logger/src/event_store_http.c +++ b/org.glite.lb.logger/src/event_store_http.c @@ -71,27 +71,18 @@ jobid2controlfile(char *job_id_s) return(buffer); } -struct file_reader_data { - int fd; - size_t max_len; - size_t pos; -}; - -#define IL_RD_VALUE(a,b) ((struct file_reader_data*)(a))->b - static int file_reader(void *user_data, char *buffer, const int len) { - int l, m, ret; + size_t ret = 0; - m = IL_RD_VALUE(user_data, max_len) - IL_RD_VALUE(user_data, pos); - l = (len > m) ? m : len; - if(l > 0) - ret = read(IL_RD_VALUE(user_data, fd), buffer, l); - else - ret = 0; - IL_RD_VALUE(user_data, pos) += ret; + if(len > 0) { + ret = fread(buffer, 1, len, (FILE*)user_data); + if(ret == 0 && ferror((FILE*)user_data)) { + return -1; + } + } return ret; } @@ -100,32 +91,24 @@ static int read_event_string(FILE *file, il_http_message_t *msg) { - struct file_reader_data rd; - char s_len[20]; int len, ret; int fd = fileno(file); - - len = read(fd, s_len, sizeof(s_len)); - if(len != sizeof(s_len)) { - if(len < 0) - set_error(IL_SYS, errno, "read_event_string: error reading record header"); - else - set_error(IL_SYS, EIO, "read_event_string: record header too short"); - return -1; + long start; + + /* remember the start position */ + start = ftell(file); + ret = receive_http(file, file_reader, msg); + if(ret < 0) return ret; + /* seek at the end of message in case the reader read ahead */ + len = fseek(file, start + msg->len, SEEK_SET); + len = fgetc(file); + if(len != '\n') { + il_log(LOG_ERR, "error reading event from file, missing terminator character at %d, found %c(%d))\n", + start+msg->len, len, len); + if(msg->data) { free(msg->data); msg->data = NULL; } + if(msg->host) { free(msg->host); msg->host = NULL; } + return EINVAL; } - if(s_len[0] != 0 || s_len[sizeof(s_len) - 1] != 0) { - set_error(IL_SYS, EINVAL, "read_event_string: invalid record header"); - return -1; - } - len = atoi(s_len + 1); - if(len < 0) { - set_error(IL_SYS, EINVAL, "read_event_string: invalid record length in header"); - return -1; - } - rd.fd = fd; - rd.max_len = len; - rd.pos = 0; - ret = receive_http(&rd, file_reader, msg); return ret; } @@ -436,14 +419,14 @@ event_store_recover(struct event_store *es) /* skip all committed or already enqueued events */ /* be careful - check, if the offset really points to the beginning of event string */ - if(fseek(ef, last, SEEK_SET) < 0) { + if(fseek(ef, last - 1, SEEK_SET) < 0) { set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); event_store_unlock(es); fclose(ef); return(-1); } - /* the new event MUST start with 0 */ - if((c=fgetc(ef)) != 0) { + /* the last enqueued event MUST end with \n */ + if((c=fgetc(ef)) != '\n') { /* Houston, we have got a problem */ il_log(LOG_WARNING, " file position %ld does not point at the beginning of event string, backing off!\n", @@ -459,7 +442,6 @@ event_store_recover(struct event_store *es) } } else { /* OK, break out of the loop */ - fseek(ef, -1, SEEK_CUR); /* should ungetc, but we are reading with read... */ break; } } else { @@ -488,7 +470,7 @@ event_store_recover(struct event_store *es) ret = -1; /* create message for server */ - msg = server_msg_create(&hmsg, last); + msg = server_msg_create((il_octet_string_t*)&hmsg, last); if(msg == NULL) { il_log(LOG_ALERT, " event file corrupted! I will try to move it to quarantine (ie. rename it).\n"); /* actually do not bother if quarantine succeeded or not - we could not do more */ @@ -502,7 +484,7 @@ event_store_recover(struct event_store *es) /* first enqueue to the LS */ if(!bs_only && (last >= es->last_committed_ls)) { - il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last); + il_log(LOG_DEBUG, " queueing event at %ld to server %s\n", last, eq_l->dest_name); #if !defined(IL_NOTIFICATIONS) if(enqueue_msg(eq_l, msg) < 0) @@ -518,7 +500,7 @@ event_store_recover(struct event_store *es) if((eq_b != eq_l) && (last >= es->last_committed_bs)) { - il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last); + il_log(LOG_DEBUG, " queueing event at %ld to server %s\n", last, eq_b->dest_name); if(enqueue_msg(eq_b, msg) < 0) break; @@ -560,11 +542,16 @@ event_store_recover(struct event_store *es) int event_store_sync(struct event_store *es, long offset) { + int ret; + assert(es != NULL); - /* all events actually come through socket before going to file, - so nothing can be found in file that was not seen here */ - return 1; + /* all events are actually read from file, the event on socket + * is ignored and serves just to notify us about file change + */ + ret = event_store_recover(es); + ret = (ret < 0) ? ret : 0; + return(ret); } diff --git a/org.glite.lb.logger/src/http.c b/org.glite.lb.logger/src/http.c index 415e86e..c9fb89b 100644 --- a/org.glite.lb.logger/src/http.c +++ b/org.glite.lb.logger/src/http.c @@ -61,8 +61,9 @@ receive_http(void *user_data, int (*reader)(void *, char *, const int), il_http_ int len, alen, clen, i, buffer_free, min_buffer_free = DEFAULT_CHUNK_SIZE; char *buffer, *p, *s, *cr; - msg->data = NULL; - msg->len = 0; + memset(msg, 0, sizeof(*msg)); + // msg->data = NULL; + // msg->len = 0; state = IN_REQUEST; alen = 0; buffer = NULL; diff --git a/org.glite.lb.logger/src/input_queue_socket_http.c b/org.glite.lb.logger/src/input_queue_socket_http.c index 53a519d..939c45f 100644 --- a/org.glite.lb.logger/src/input_queue_socket_http.c +++ b/org.glite.lb.logger/src/input_queue_socket_http.c @@ -7,12 +7,7 @@ #include #include #include -#include -#include #include -#include -#include -#include #include "interlogd.h" @@ -82,121 +77,6 @@ void input_queue_detach() } -int -store_to_file(il_http_message_t *msg, long *offset) { - char s_len[20]; - char filename[PATH_MAX]; - FILE *outfile; - int i, filedesc; - int ret = -1; - - if(msg->host == NULL) { - set_error(IL_PROTO, EINVAL, "store_to_file: no message destination specified"); - } - - snprintf(filename, sizeof(filename), "%s.%s", file_prefix, msg->host); - filename[sizeof(filename) - 1] = 0; - snprintf(s_len+1, sizeof(s_len)-1, "%18d\n", msg->len); - s_len[sizeof(s_len) - 1] = 0; - s_len[0] = 0; - -try_again: - if((outfile = fopen(filename, "a")) == NULL) { - set_error(IL_SYS, errno, "store_to_file: error opening file"); - goto cleanup; - } - if((filedesc = fileno(outfile)) < 0) { - set_error(IL_SYS, errno, "store_to_file: error getting file descriptor"); - goto cleanup; - } - - for(i = 0; i < 5; i++) { - struct flock filelock; - int filelock_status; - struct stat statbuf; - - filelock.l_type = F_WRLCK; - filelock.l_whence = SEEK_SET; - filelock.l_start = 0; - filelock.l_len = 0; - - if((filelock_status=fcntl(filedesc, F_SETLK, &filelock)) < 0) { - switch(errno) { - case EAGAIN: - case EACCES: - case EINTR: - if((i+1) < 5) sleep(1); - break; - default: - set_error(IL_SYS, errno, "store_to_file: error locking file"); - goto cleanup; - } - } else { - if(stat(filename, &statbuf)) { - if(errno == ENOENT) { - fclose(outfile); - goto try_again; - } else { - set_error(IL_SYS, errno, "store_file: could not stat file"); - goto cleanup; - } - } else { - /* success */ - break; - } - } - } - - if(i == 5) { - set_error(IL_SYS, ETIMEDOUT, "store_to_file: timed out trying to lock file"); - goto cleanup; - } - if(fseek(outfile, 0, SEEK_END) < 0) { - set_error(IL_SYS, errno, "store_to_file: error seeking at end of file"); - goto cleanup; - } - if((*offset=ftell(outfile)) < 0) { - set_error(IL_SYS, errno, "store_to_file: error getting current position"); - goto cleanup; - } - if(fwrite(s_len, sizeof(s_len), 1, outfile) != 1) { - set_error(IL_SYS, errno, "store_to_file: error writing data header to file"); - goto cleanup; - } - if(fwrite(msg->data, msg->len, 1, outfile) != 1) { - set_error(IL_SYS, errno, "store_to_file: error writing data to file"); - goto cleanup; - } - ret = 0; - fflush(outfile); - -cleanup: - if(outfile) fclose(outfile); - return ret; -} - - -int -send_reply(int sd) -{ - const char reply[] = "" - "" - " " - " " - " " - " " - ""; - - return(write(sd, reply, sizeof(reply))); -} - /* * Returns: -1 on error, 0 if no message available, message length otherwise @@ -279,13 +159,8 @@ input_queue_get(il_octet_string_t **buffer, long *offset, int timeout) return 0; } - if(store_to_file(&msg, offset) < 0) { - close(accepted); - return -1; - } - - send_reply(accepted); close(accepted); + *offset = -1; return(msg.len); } #endif diff --git a/org.glite.lb.logger/src/interlogd.c b/org.glite.lb.logger/src/interlogd.c index 0ee2bde..8bf4833 100644 --- a/org.glite.lb.logger/src/interlogd.c +++ b/org.glite.lb.logger/src/interlogd.c @@ -290,6 +290,12 @@ main (int argc, char **argv) exit(EXIT_FAILURE); } + if(!debug && + (daemon(0,0) < 0)) { + perror("daemon"); + exit(EXIT_FAILURE); + } + #ifdef LB_PERF /* this must be called after installing signal handlers */ glite_wll_perftest_init(NULL, /* host */ @@ -342,12 +348,6 @@ main (int argc, char **argv) exit(EXIT_FAILURE); } - if(!debug && - (daemon(0,0) < 0)) { - perror("daemon"); - exit(EXIT_FAILURE); - } - #ifndef PERF_EMPTY /* find all unsent events waiting in files */ #ifdef LB_PERF diff --git a/org.glite.lb.logger/src/send_event_http.c b/org.glite.lb.logger/src/send_event_http.c index 6eedcaf..3c90562 100644 --- a/org.glite.lb.logger/src/send_event_http.c +++ b/org.glite.lb.logger/src/send_event_http.c @@ -252,7 +252,7 @@ event_queue_send(struct event_queue *eq) 4xx - client error (eq. 400 Bad Request) 5xx - server error (eq. 500 Internal Server Error) */ - if(code >= 500 && code < 600) { + if(code >= 100 && code < 200) { /* non fatal errors (for us), try to deliver later */ eq->timeout = TIMEOUT; diff --git a/org.glite.lb.logger/src/server_msg_http.c b/org.glite.lb.logger/src/server_msg_http.c index 37be900..8bd3623 100644 --- a/org.glite.lb.logger/src/server_msg_http.c +++ b/org.glite.lb.logger/src/server_msg_http.c @@ -100,7 +100,8 @@ server_msg_init(struct server_msg *msg, il_octet_string_t *event) return -1; /* set this to indicate new data owner */ hmsg->data = NULL; - msg->ev_len = hmsg->len; /* XXX: add lentgh size too */ + hmsg->host = NULL; + msg->ev_len = hmsg->len + 1; /* must add separator size too */ return 0; }