From: Michal Voců Date: Mon, 21 Apr 2008 15:43:04 +0000 (+0000) Subject: added handling of http messages X-Git-Tag: glite-yaim-lb_R_4_0_2_1~77 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=143e6f22952924684ec49cdb0120588c9fd430be;p=jra1mw.git added handling of http messages --- diff --git a/org.glite.lb.logger/src/event_store_http.c b/org.glite.lb.logger/src/event_store_http.c new file mode 100644 index 0000000..0363ec2 --- /dev/null +++ b/org.glite.lb.logger/src/event_store_http.c @@ -0,0 +1,1126 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#include + +#include "glite/lb/events_parse.h" + +#include "interlogd.h" + +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +static char *file_prefix = NULL; + + +struct event_store_list { + struct event_store *es; + struct event_store_list *next; +}; + + +static struct event_store_list *store_list; +static pthread_rwlock_t store_list_lock = PTHREAD_RWLOCK_INITIALIZER; + + +/* ---------------- + * helper functions + * ---------------- + */ +static +char * +jobid2eventfile(const char *job_id_s) +{ + char *buffer; + + if(job_id_s) { + asprintf(&buffer, "%s.%s", file_prefix, job_id_s); + } else + asprintf(&buffer, "%s.default", file_prefix); + + return(buffer); +} + + +static +char * +jobid2controlfile(char *job_id_s) +{ + char *buffer; + char *hash; + + if(job_id_s) { + asprintf(&buffer, "%s.%s.ctl", file_prefix, job_id_s); + } else + asprintf(&buffer, "%s.default.ctl", file_prefix); + + return(buffer); +} + +struct file_reader_data { + int fd; + size_t max_len; + size_t pos; +}; + +#define IL_RD_VALUE(a,b) ((struct file_reader_data*)(a))->b + +static +int +file_reader(void *user_data, char *buffer, const int len) +{ + int l, m, ret; + + m = IL_RD_VALUE(user_data, max_len) - IL_RD_VALUE(user_data, pos); + l = (len > m) ? m : len; + if(l > 0) + ret = read(IL_RD_VALUE(user_data, fd), buffer, l); + else + ret = 0; + IL_RD_VALUE(user_data, pos) += ret; + return ret; +} + + +static +int +read_event_string(FILE *file, il_http_message_t *msg) +{ + struct file_reader_data rd; + char s_len[20]; + int len, ret; + int fd = fileno(file); + + len = read(fd, s_len, sizeof(s_len)); + if(len != sizeof(s_len)) { + if(len < 0) + set_error(IL_SYS, errno, "read_event_string: error reading record header"); + else + set_error(IL_SYS, EIO, "read_event_string: record header too short"); + return -1; + } + if(s_len[0] != 0 || s_len[sizeof(s_len) - 1] != 0) { + set_error(IL_SYS, EINVAL, "read_event_string: invalid record header"); + return -1; + } + len = atoi(s_len + 1); + if(len < 0) { + set_error(IL_SYS, EINVAL, "read_event_string: invalid record length in header"); + return -1; + } + rd.fd = fd; + rd.max_len = len; + rd.pos = 0; + ret = receive_http(&rd, file_reader, msg); + return ret; +} + + + +/* ------------------------------ + * event_store 'member' functions + * ------------------------------ + */ +static +int +event_store_free(struct event_store *es) +{ + assert(es != NULL); + + if(es->job_id_s) free(es->job_id_s); + if(es->event_file_name) free(es->event_file_name); + if(es->control_file_name) free(es->control_file_name); + pthread_rwlock_destroy(&es->use_lock); + pthread_rwlock_destroy(&es->update_lock); + free(es); + + return(0); +} + + +static +struct event_store * +event_store_create(char *job_id_s) +{ + struct event_store *es; + + es = malloc(sizeof(*es)); + if(es == NULL) { + set_error(IL_NOMEM, ENOMEM, "event_store_create: error allocating room for structure"); + return(NULL); + } + + memset(es, 0, sizeof(*es)); + + il_log(LOG_DEBUG, " creating event store for id %s\n", job_id_s); + + es->job_id_s = strdup(job_id_s); + es->event_file_name = jobid2eventfile(job_id_s); + es->control_file_name = jobid2controlfile(job_id_s); + + if(pthread_rwlock_init(&es->update_lock, NULL)) + abort(); + if(pthread_rwlock_init(&es->use_lock, NULL)) + abort(); + + return(es); +} + + +static +int +event_store_lock_ro(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_rdlock(&es->update_lock)) + abort(); + + return(0); +} + + +static +int +event_store_lock(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_wrlock(&es->update_lock)) + abort(); + + return(0); +} + + +static +int +event_store_unlock(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_unlock(&es->update_lock)) + abort(); + return(0); +} + + +static +int +event_store_read_ctl(struct event_store *es) +{ + FILE *ctl_file; + + assert(es != NULL); + + event_store_lock(es); + if((ctl_file = fopen(es->control_file_name, "r")) == NULL) { + /* no control file, new event file */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + } else { + /* read last seen and last committed counts */ + fscanf(ctl_file, "%*s\n%ld\n%ld\n", + &es->last_committed_ls, + &es->last_committed_bs); + fclose(ctl_file); + } + event_store_unlock(es); + + return(0); +} + + +static +int +event_store_write_ctl(struct event_store *es) +{ + FILE *ctl; + + assert(es != NULL); + + ctl = fopen(es->control_file_name, "w"); + if(ctl == NULL) { + set_error(IL_SYS, errno, "event_store_write_ctl: error opening control file"); + return(-1); + } + + if(fprintf(ctl, "%s\n%ld\n%ld\n", + es->job_id_s, + es->last_committed_ls, + es->last_committed_bs) < 0) { + set_error(IL_SYS, errno, "event_store_write_ctl: error writing control record"); + return(-1); + } + + if(fclose(ctl) < 0) { + set_error(IL_SYS, errno, "event_store_write_ctl: error closing control file"); + return(-1); + } + + return(0); +} + + +/* + * event_store_qurantine() + * - rename damaged event store file + * - essentially does the same actions as cleanup, but the event store + * does not have to be empty + * returns 0 on success, -1 on error + */ +static +int +event_store_quarantine(struct event_store *es) +{ + int num; + char newname[MAXPATHLEN+1]; + + /* find available qurantine name */ + /* we give it at most 1024 tries */ + for(num = 0; num < 1024; num++) { + struct stat st; + + snprintf(newname, MAXPATHLEN, "%s.quarantine.%d", es->event_file_name, num); + newname[MAXPATHLEN] = 0; + if(stat(newname, &st) < 0) { + if(errno == ENOENT) { + /* file not found */ + break; + } else { + /* some other error with name, probably permanent */ + set_error(IL_SYS, errno, "event_store_qurantine: error looking for qurantine filename"); + return(-1); + + } + } else { + /* the filename is used already */ + } + } + if(num >= 1024) { + /* new name not found */ + /* XXX - is there more suitable error? */ + set_error(IL_SYS, ENOSPC, "event_store_quarantine: exhausted number of retries looking for quarantine filename"); + return(-1); + } + + /* actually rename the file */ + il_log(LOG_DEBUG, " renaming damaged event file from %s to %s\n", + es->event_file_name, newname); + if(rename(es->event_file_name, newname) < 0) { + set_error(IL_SYS, errno, "event_store_quarantine: error renaming event file"); + return(-1); + } + + /* clear the counters */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + es->offset = 0; + + return(0); +} + + +/* + * event_store_recover() + * - recover after restart or catch up when events missing in IPC + * - if offset > 0, read everything behind it + * - if offset == 0, read everything behind min(last_committed_bs, last_committed_es) + */ +int +event_store_recover(struct event_store *es) +{ + struct event_queue *eq_l = NULL, *eq_b = NULL; + struct server_msg *msg; + il_http_message_t hmsg; + char *event_s; + int fd, ret; + long last; + FILE *ef; + struct flock efl; + char err_msg[128]; + struct stat stbuf; + + assert(es != NULL); + +#if defined(IL_NOTIFICATIONS) + /* destination queue has to be found for each message separately */ +#else + /* find bookkepping server queue */ + eq_b = queue_list_get(es->job_id_s); + if(eq_b == NULL) + return(-1); +#endif + +#if !defined(IL_NOTIFICATIONS) + /* get log server queue */ + eq_l = queue_list_get(NULL); +#endif + + event_store_lock(es); + + il_log(LOG_DEBUG, " reading events from %s\n", es->event_file_name); + + /* open event file */ + ef = fopen(es->event_file_name, "r"); + if(ef == NULL) { + snprintf(err_msg, sizeof(err_msg), + "event_store_recover: error opening event file %s", + es->event_file_name); + set_error(IL_SYS, errno, err_msg); + event_store_unlock(es); + return(-1); + } + + /* lock the file for reading (we should not read while dglogd is writing) */ + fd = fileno(ef); + efl.l_type = F_RDLCK; + efl.l_whence = SEEK_SET; + efl.l_start = 0; + efl.l_len = 0; + if(fcntl(fd, F_SETLKW, &efl) < 0) { + snprintf(err_msg, sizeof(err_msg), + "event_store_recover: error locking event file %s", + es->event_file_name); + set_error(IL_SYS, errno, err_msg); + event_store_unlock(es); + fclose(ef); + return(-1); + } + + /* check the file modification time and size to avoid unnecessary operations */ + memset(&stbuf, 0, sizeof(stbuf)); + if(fstat(fd, &stbuf) < 0) { + il_log(LOG_ERR, " could not stat event file %s: %s\n", es->event_file_name, strerror(errno)); + fclose(ef); + event_store_unlock(es); + return -1; + } else { + if((es->offset == stbuf.st_size) && (es->last_modified == stbuf.st_mtime)) { + il_log(LOG_DEBUG, " event file not modified since last visit, skipping\n"); + fclose(ef); + event_store_unlock(es); + return(0); + } + } + + while(1) { /* try, try, try */ + + /* get the position in file to be sought */ + if(es->offset) + last = es->offset; + else { + last = es->last_committed_bs; + } + + il_log(LOG_DEBUG, " setting starting file position to %ld\n", last); + il_log(LOG_DEBUG, " bytes sent to destination: %d\n", es->last_committed_bs); + + if(last > 0) { + int c; + + /* skip all committed or already enqueued events */ + /* be careful - check, if the offset really points to the + beginning of event string */ + if(fseek(ef, last, SEEK_SET) < 0) { + set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); + event_store_unlock(es); + fclose(ef); + return(-1); + } + /* the new event MUST start with 0 */ + if((c=fgetc(ef)) != 0) { + /* Houston, we have got a problem */ + il_log(LOG_WARNING, + " file position %ld does not point at the beginning of event string, backing off!\n", + last); + /* now, where were we? */ + if(es->offset) { + /* next try will be with + last_commited_bs */ + es->offset = 0; + } else { + /* this is really weird... back off completely */ + es->last_committed_ls = es->last_committed_bs = 0; + } + } else { + /* OK, break out of the loop */ + fseek(ef, -1, SEEK_CUR); /* should ungetc, but we are reading with read... */ + break; + } + } else { + /* this breaks out of the loop, we are starting at + * the beginning of file + */ + if(fseek(ef, 0, SEEK_SET) < 0) { + set_error(IL_SYS, errno, "event_store_recover: error setting position for read"); + event_store_unlock(es); + fclose(ef); + return(-1); + } + break; + } + } + + /* enqueue all remaining events */ + ret = 1; + msg = NULL; + while(read_event_string(ef, &hmsg) >= 0) { + + /* last holds the starting position of event_s in file */ + il_log(LOG_DEBUG, " reading event at %ld\n", last); + + /* break from now on means there was some error */ + ret = -1; + + /* create message for server */ + msg = server_msg_create(&hmsg, last); + if(msg == NULL) { + il_log(LOG_ALERT, " event file corrupted! I will try to move it to quarantine (ie. rename it).\n"); + /* actually do not bother if quarantine succeeded or not - we could not do more */ + event_store_quarantine(es); + fclose(ef); + event_store_unlock(es); + return(-1); + } + msg->es = es; + + /* first enqueue to the LS */ + if(!bs_only && (last >= es->last_committed_ls)) { + + il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last); + +#if !defined(IL_NOTIFICATIONS) + if(enqueue_msg(eq_l, msg) < 0) + break; +#endif + } + +#ifdef IL_NOTIFICATIONS + eq_b = queue_list_get(msg->dest); +#endif + + /* now enqueue to the BS, if neccessary */ + if((eq_b != eq_l) && + (last >= es->last_committed_bs)) { + + il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last); + + if(enqueue_msg(eq_b, msg) < 0) + break; + } + server_msg_free(msg); + msg = NULL; + + /* now last is also the offset behind the last successfully queued event */ + last = ftell(ef); + + /* ret == 0 means EOF or incomplete event found */ + ret = 0; + + } /* while */ + + /* due to this little assignment we had to lock the event_store for writing */ + es->offset = last; + es->last_modified = stbuf.st_mtime; + il_log(LOG_DEBUG, " event store offset set to %ld\n", last); + + if(msg) + server_msg_free(msg); + + fclose(ef); + il_log(LOG_DEBUG, " finished reading events with %d\n", ret); + + event_store_unlock(es); + return(ret); +} + + +/* + * event_store_sync() + * - check the position of event and fill holes from file + * - return 1 if the event is new, + * 0 if it was seen before, + * -1 if there was an error + */ +int +event_store_sync(struct event_store *es, long offset) +{ + assert(es != NULL); + + /* all events actually come through socket before going to file, + so nothing can be found in file that was not seen here */ + return 1; +} + + +int +event_store_next(struct event_store *es, long offset, int len) +{ + assert(es != NULL); + + /* offsets are good only to detect losses (differences between socket and file), + which is not possible now */ + return 0; +} + + +/* + * event_store_commit() + * + */ +int +event_store_commit(struct event_store *es, int len, int ls) +{ + assert(es != NULL); + + event_store_lock(es); + + if(ls) + es->last_committed_ls += len; + else { + es->last_committed_bs += len; + if (bs_only) es->last_committed_ls += len; + } + + if(event_store_write_ctl(es) < 0) { + event_store_unlock(es); + return(-1); + } + + event_store_unlock(es); + + + return(0); +} + + +/* + * event_store_clean() + * - remove the event files (event and ctl), if they are not needed anymore + * - returns 0 if event_store is in use, 1 if it was removed and -1 on error + * + * Q: How do we know that we can safely remove the files? + * A: When all events from file have been committed both by LS and BS. + */ +static +int +event_store_clean(struct event_store *es) +{ + long last; + int fd; + FILE *ef; + struct flock efl; + + assert(es != NULL); + + /* prevent sender threads from updating */ + event_store_lock(es); + + il_log(LOG_DEBUG, " trying to cleanup event store %s\n", es->job_id_s); + il_log(LOG_DEBUG, " bytes sent to logging server: %d\n", es->last_committed_ls); + il_log(LOG_DEBUG, " bytes sent to bookkeeping server: %d\n", es->last_committed_bs); + + /* preliminary check to avoid opening event file */ + /* if the positions differ, some events still have to be sent */ + if(es->last_committed_ls != es->last_committed_bs) { + event_store_unlock(es); + il_log(LOG_DEBUG, " not all events sent, cleanup aborted\n"); + return(0); + } + + /* the file can only be removed when all the events were succesfully sent + (ie. committed both by LS and BS */ + /* That also implies that the event queues are 'empty' at the moment. */ + ef = fopen(es->event_file_name, "r+"); + if(ef == NULL) { + /* if we can not open the event store, it is an error and the struct should be removed */ + /* XXX - is it true? */ + event_store_unlock(es); + il_log(LOG_ERR, " event_store_clean: error opening event file: %s\n", strerror(errno)); + return(1); + } + + fd = fileno(ef); + + /* prevent local-logger from writing into event file */ + efl.l_type = F_WRLCK; + efl.l_whence = SEEK_SET; + efl.l_start = 0; + efl.l_len = 0; + if(fcntl(fd, F_SETLK, &efl) < 0) { + il_log(LOG_DEBUG, " could not lock event file, cleanup aborted\n"); + fclose(ef); + event_store_unlock(es); + if(errno != EACCES && + errno != EAGAIN) { + set_error(IL_SYS, errno, "event_store_clean: error locking event file"); + return(-1); + } + return(0); + } + + /* now the file should not contain partially written event, so it is safe + to get offset behind last event by seeking the end of file */ + if(fseek(ef, 0, SEEK_END) < 0) { + set_error(IL_SYS, errno, "event_store_clean: error seeking the end of file"); + event_store_unlock(es); + fclose(ef); + return(-1); + } + + last = ftell(ef); + il_log(LOG_DEBUG, " total bytes in file: %d\n", last); + + if(es->last_committed_ls < last) { + fclose(ef); + event_store_unlock(es); + il_log(LOG_DEBUG, " events still waiting in queue, cleanup aborted\n"); + return(0); + } else if( es->last_committed_ls > last) { + il_log(LOG_WARNING, " warning: event file seems to shrink!\n"); + /* XXX - in that case we can not continue because there may be + some undelivered events referring to that event store */ + fclose(ef); + event_store_unlock(es); + return(0); + } + + /* now we are sure that all events were sent and the event queues are empty */ + il_log(LOG_INFO, " removing event file %s\n", es->event_file_name); + + /* remove the event file */ + unlink(es->event_file_name); + unlink(es->control_file_name); + + /* clear the counters */ + es->last_committed_ls = 0; + es->last_committed_bs = 0; + es->offset = 0; + + /* unlock the event_store even if it is going to be removed */ + event_store_unlock(es); + + /* close the event file (that unlocks it as well) */ + fclose(ef); + + /* indicate that it is safe to remove this event_store */ + return(1); +} + + +/* -------------------------------- + * event store management functions + * -------------------------------- + */ +struct event_store * +event_store_find(char *job_id_s) +{ + struct event_store_list *q, *p; + struct event_store *es; + + if(pthread_rwlock_wrlock(&store_list_lock)) { + abort(); + } + + es = NULL; + + q = NULL; + p = store_list; + + while(p) { + if(strcmp(p->es->job_id_s, job_id_s) == 0) { + es = p->es; + if(pthread_rwlock_rdlock(&es->use_lock)) + abort(); + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(es); + } + + q = p; + p = p->next; + } + + es = event_store_create(job_id_s); + if(es == NULL) { + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(NULL); + } + + p = malloc(sizeof(*p)); + if(p == NULL) { + set_error(IL_NOMEM, ENOMEM, "event_store_find: no room for new event store"); + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + return(NULL); + } + + p->next = store_list; + store_list = p; + + p->es = es; + + if(pthread_rwlock_rdlock(&es->use_lock)) + abort(); + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(es); +} + + +int +event_store_release(struct event_store *es) +{ + assert(es != NULL); + + if(pthread_rwlock_unlock(&es->use_lock)) + abort(); + il_log(LOG_DEBUG, " released lock on %s\n", es->job_id_s); + return(0); +} + + +event_store_from_file(char *filename) +{ + struct event_store *es; + FILE *event_file; + char *job_id_s = NULL, *p; + il_http_message_t hmsg; + int ret; + + il_log(LOG_INFO, " attaching to event file: %s\n", filename); + + if(strstr(filename, "quarantine") != NULL) { + il_log(LOG_INFO, " file name belongs to quarantine, not touching that.\n"); + return(0); + } + + event_file = fopen(filename, "r"); + if(event_file == NULL) { + set_error(IL_SYS, errno, "event_store_from_file: error opening event file"); + return(-1); + } + ret = read_event_string(event_file, &hmsg); + fclose(event_file); + if(ret < 0) + return(0); + + /* get id aka dest */ + job_id_s = hmsg.host; + + il_log(LOG_DEBUG, " message dest: '%s'\n", job_id_s); + if(job_id_s == NULL) { + il_log(LOG_NOTICE, " skipping file, could not parse event\n"); + ret = 0; + goto out; + } + + es=event_store_find(job_id_s); + + if(es == NULL) { + ret = -1; + goto out; + } + + if((es->last_committed_ls == 0) && + (es->last_committed_bs == 0) && + (es->offset == 0)) { + ret = event_store_read_ctl(es); + } else + ret = 0; + + event_store_release(es); + +out: + if(hmsg.data) free(hmsg.data); + if(job_id_s) free(job_id_s); + return(ret); +} + + +int +event_store_init(char *prefix) +{ + if(file_prefix == NULL) { + file_prefix = strdup(prefix); + store_list = NULL; + } + + /* read directory and get a list of event files */ + { + int len; + + char *p, *dir; + DIR *event_dir; + struct dirent *entry; + + + /* get directory name */ + p = strrchr(file_prefix, '/'); + if(p == NULL) { + dir = strdup("."); + p = ""; + len = 0; + } else { + *p = '\0'; + dir = strdup(file_prefix); + *p++ = '/'; + len = strlen(p); + } + + event_dir = opendir(dir); + if(event_dir == NULL) { + free(dir); + set_error(IL_SYS, errno, "event_store_init: error opening event directory"); + return(-1); + } + + while((entry=readdir(event_dir))) { + char *s; + + /* skip all files that do not match prefix */ + if(strncmp(entry->d_name, p, len) != 0) + continue; + + /* skip all control files */ + if((s=strstr(entry->d_name, ".ctl")) != NULL && + s[4] == '\0') + continue; + + s = malloc(strlen(dir) + strlen(entry->d_name) + 2); + if(s == NULL) { + free(dir); + set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for file name"); + return(-1); + } + + *s = '\0'; + strcat(s, dir); + strcat(s, "/"); + strcat(s, entry->d_name); + + if(event_store_from_file(s) < 0) { + free(dir); + free(s); + closedir(event_dir); + return(-1); + } + + free(s); + } + closedir(event_dir); + + /* one more pass - this time remove stale .ctl files */ + event_dir = opendir(dir); + if(event_dir == NULL) { + free(dir); + set_error(IL_SYS, errno, "event_store_init: error opening event directory"); + return(-1); + } + + while((entry=readdir(event_dir))) { + char *s; + + /* skip all files that do not match prefix */ + if(strncmp(entry->d_name, p, len) != 0) + continue; + + /* find all control files */ + if((s=strstr(entry->d_name, ".ctl")) != NULL && + s[4] == '\0') { + char *ef; + struct stat st; + + /* is there corresponding event file? */ + ef = malloc(strlen(dir) + strlen(entry->d_name) + 2); + if(ef == NULL) { + free(dir); + set_error(IL_NOMEM, ENOMEM, "event_store_init: no room for event file name"); + return(-1); + } + + s[0] = 0; + *ef = '\0'; + strcat(ef, dir); + strcat(ef, "/"); + strcat(ef, entry->d_name); + s[0] = '.'; + + if(stat(ef, &st) == 0) { + /* something is there */ + /* XXX - it could be something else than event file, but do not bother now */ + } else { + /* could not stat file, remove ctl */ + strcat(ef, s); + il_log(LOG_DEBUG, " removing stale file %s\n", ef); + if(unlink(ef)) + il_log(LOG_ERR, " could not remove file %s: %s\n", ef, strerror(errno)); + + } + free(ef); + + } + } + closedir(event_dir); + free(dir); + } + + return(0); +} + + +int +event_store_recover_all() +{ + struct event_store_list *sl; + + + if(pthread_rwlock_rdlock(&store_list_lock)) + abort(); + + /* recover all event stores */ + sl = store_list; + while(sl != NULL) { + + /* recover this event store */ + /* no need to lock use_lock in event_store, the store_list_lock is in place */ + if(event_store_recover(sl->es) < 0) { + il_log(LOG_ERR, " error recovering event store %s:\n %s\n", sl->es->event_file_name, error_get_msg()); + clear_error(); + } + sl = sl->next; + } + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(0); +} + + +#if 0 +int +event_store_remove(struct event_store *es) +{ + struct event_store_list *p, **q; + + assert(es != NULL); + + switch(event_store_clean(es)) { + case 0: + il_log(LOG_DEBUG, " event store not removed, still used\n"); + return(0); + + case 1: + if(pthread_rwlock_wrlock(&store_list_lock) < 0) { + set_error(IL_SYS, errno, " event_store_remove: error locking event store list"); + return(-1); + } + + p = store_list; + q = &store_list; + + while(p) { + if(p->es == es) { + (*q) = p->next; + event_store_free(es); + free(p); + break; + } + q = &(p->next); + p = p->next; + } + + if(pthread_rwlock_unlock(&store_list_lock) < 0) { + set_error(IL_SYS, errno, " event_store_remove: error unlocking event store list"); + return(-1); + } + return(1); + + default: + return(-1); + } + /* not reached */ + return(0); +} +#endif + +int +event_store_cleanup() +{ + struct event_store_list *sl; + struct event_store_list *slnext; + struct event_store_list **prev; + + /* try to remove event files */ + + if(pthread_rwlock_wrlock(&store_list_lock)) + abort(); + + sl = store_list; + prev = &store_list; + + while(sl != NULL) { + int ret; + + slnext = sl->next; + + /* one event store at time */ + ret = pthread_rwlock_trywrlock(&sl->es->use_lock); + if(ret == EBUSY) { + il_log(LOG_DEBUG, " event_store %s is in use by another thread\n", + sl->es->job_id_s); + sl = slnext; + continue; + } else if (ret < 0) + abort(); + + switch(event_store_clean(sl->es)) { + + case 1: + /* remove this event store */ + (*prev) = slnext; + event_store_free(sl->es); + free(sl); + break; + + case -1: + il_log(LOG_ERR, " error removing event store %s (file %s):\n %s\n", + sl->es->job_id_s, sl->es->event_file_name, error_get_msg()); + /* event_store_release(sl->es); */ + clear_error(); + /* go on to the next */ + + default: + event_store_release(sl->es); + prev = &(sl->next); + break; + } + + sl = slnext; + } + + if(pthread_rwlock_unlock(&store_list_lock)) + abort(); + + return(0); +} + diff --git a/org.glite.lb.logger/src/http.c b/org.glite.lb.logger/src/http.c new file mode 100644 index 0000000..415e86e --- /dev/null +++ b/org.glite.lb.logger/src/http.c @@ -0,0 +1,196 @@ +#ident "$Header$" + +#include +#include + +#include "interlogd.h" + + +int +parse_request(const char *s, il_http_message_t *msg) +{ + if(!strncasecmp(s, "HTTP", 4)) { + msg->msg_type = IL_HTTP_REPLY; + } else if(!strncasecmp(s, "POST", 4)) { + msg->msg_type = IL_HTTP_POST; + } else if(!strncasecmp(s, "GET", 3)) { + msg->msg_type = IL_HTTP_GET; + } else { + msg->msg_type = IL_HTTP_OTHER; + } + if(msg->msg_type == IL_HTTP_REPLY) { + char *p = strchr(s, ' '); + + if(!p) goto parse_end; + p++; + msg->reply_code=atoi(p); + p = strchr(p, ' '); + if(!p) goto parse_end; + p++; + msg->reply_string = strdup(p); + + parse_end: + ; + } +} + + +int +parse_header(const char *s, il_http_message_t *msg) +{ + if(!strncasecmp(s, "Content-Length:", 15)) { + msg->content_length = atoi(s + 15); + } else if(!strncasecmp(s, "Host:", 5)) { + const char *p = s + 4; + while(*++p == ' '); /* skip spaces */ + msg->host = strdup(p); + } + return(0); +} + + +#define DEFAULT_CHUNK_SIZE 1024 + +// read what is available and parse what can be parsed +// returns the result of read operation of the underlying connection, +// ie. the number of bytes read or error code +int +receive_http(void *user_data, int (*reader)(void *, char *, const int), il_http_message_t *msg) +{ + static enum { NONE, IN_REQUEST, IN_HEADERS, IN_BODY } state = NONE; + int len, alen, clen, i, buffer_free, min_buffer_free = DEFAULT_CHUNK_SIZE; + char *buffer, *p, *s, *cr; + + msg->data = NULL; + msg->len = 0; + state = IN_REQUEST; + alen = 0; + buffer = NULL; + buffer_free = 0; + p = NULL; + s = NULL; + + do { + /* p - first empty position in buffer + alen - size of allocated buffer + len - number of bytes received in last read + s - points behind last scanned CRLF or at buffer start + buffer_free = alen - (p - buffer) + */ + + /* prepare at least chunk_size bytes for next data */ + if(buffer_free < min_buffer_free) { + char *n; + + alen += min_buffer_free; + n = realloc(buffer, alen); + if(n == NULL) { + free(buffer); + set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); + return(-1); + } + buffer_free += min_buffer_free; + p = n + (p - buffer); + s = n + (s - buffer); + buffer = n; + } + + if(buffer_free > 0) { + len = (*reader)(user_data, p, buffer_free); + if(len < 0) { + // error + free(buffer); + // set_error(IL_SYS, errno, "receive_http: error reading data"); + return -1; + } else if(len == 0) { + // EOF + free(buffer); + set_error(IL_PROTO, errno, "receive_http: error reading data - premature EOF"); + return -1; + } + buffer_free -= len; + p+= len; + } + + + switch(state) { + + // parse buffer, look for CRLFs + // s - start scan position + // p - start of current token + // cr - current CRLF position + + case IN_REQUEST: + if((s < p - 1) && + (cr = (char*)memchr(s, '\r', p - s - 1)) && + (cr[1] == '\n')) { + *cr = 0; + parse_request(s, msg); + *cr = '\r'; + // change state + state = IN_HEADERS; + // start new tokens (cr < p - 1 -> s < p + 1 <-> s <= p) + s = cr + 2; + } else { + break; + } + + case IN_HEADERS: + while((state != IN_BODY) && + (s < p - 1) && + (cr = (char*)memchr(s, '\r', p - s - 1)) && + (cr[1] == '\n')) { + if(s == cr) { /* do not consider request starting with CRLF */ + // found CRLFCRLF + state = IN_BODY; + } else { + *cr = 0; + parse_header(s, msg); + *cr = '\r'; + } + // next scan starts after CRLF + s = cr + 2; + } + if(state == IN_BODY) { + // we found body + // content-length should be set at the moment + if(msg->content_length > 0) { + int need_free = msg->content_length - (p - s); + char *n; + + alen += need_free - buffer_free + 1; + n = realloc(buffer, alen); + if(n == NULL) { + free(buffer); + set_error(IL_NOMEM, ENOMEM, "read_event: no room for event"); + return(-1); + } + buffer_free = need_free; + min_buffer_free = 0; + p = n + (p - buffer); + s = n + (s - buffer); + buffer = n; + } else { + // report error + free(buffer); + set_error(IL_PROTO, EINVAL, "receive_http: error reading data - no content length specified\n"); + return -1; + } + } + break; + + case IN_BODY: + if(buffer_free == 0) { + // finished reading + *p = 0; + state = NONE; + } + break; + } + } while(state != NONE); + + msg->data = buffer; + msg->len = p - buffer; + + return 0; +} diff --git a/org.glite.lb.logger/src/input_queue_socket_http.c b/org.glite.lb.logger/src/input_queue_socket_http.c new file mode 100644 index 0000000..53a519d --- /dev/null +++ b/org.glite.lb.logger/src/input_queue_socket_http.c @@ -0,0 +1,292 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "interlogd.h" + +static const int SOCK_QUEUE_MAX = 50; +extern char *socket_path; +extern char *file_prefix; + +static int sock; +static int accepted; + +static +int plain_reader(void *user_data, char *buffer, const int len) +{ + return (recv(*(int*)user_data, buffer, len, MSG_NOSIGNAL)); +} + + +int +input_queue_attach() +{ + struct sockaddr_un saddr; + + if((sock=socket(PF_UNIX, SOCK_STREAM, 0)) < 0) { + set_error(IL_SYS, errno, "input_queue_attach: error creating socket"); + return(-1); + } + + memset(&saddr, 0, sizeof(saddr)); + saddr.sun_family = AF_UNIX; + strcpy(saddr.sun_path, socket_path); + + /* test for the presence of the socket and another instance + of interlogger listening */ + if(connect(sock, (struct sockaddr *)&saddr, sizeof(saddr.sun_path)) < 0) { + if(errno == ECONNREFUSED) { + /* socket present, but no one at the other end; remove it */ + il_log(LOG_WARNING, " removing stale input socket %s\n", socket_path); + unlink(socket_path); + } + /* ignore other errors for now */ + } else { + /* connection was successful, so bail out - there is + another interlogger running */ + set_error(IL_SYS, EADDRINUSE, "input_queue_attach: another instance of interlogger is running"); + return(-1); + } + + if(bind(sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + set_error(IL_SYS, errno, "input_queue_attach: error binding socket"); + return(-1); + } + + if (listen(sock, SOCK_QUEUE_MAX)) { + set_error(IL_SYS, errno, "input_queue_attach: error listening on socket"); + return -1; + } + + return(0); +} + + +void input_queue_detach() +{ + if (sock >= 0) + close(sock); + unlink(socket_path); +} + + +int +store_to_file(il_http_message_t *msg, long *offset) { + char s_len[20]; + char filename[PATH_MAX]; + FILE *outfile; + int i, filedesc; + int ret = -1; + + if(msg->host == NULL) { + set_error(IL_PROTO, EINVAL, "store_to_file: no message destination specified"); + } + + snprintf(filename, sizeof(filename), "%s.%s", file_prefix, msg->host); + filename[sizeof(filename) - 1] = 0; + snprintf(s_len+1, sizeof(s_len)-1, "%18d\n", msg->len); + s_len[sizeof(s_len) - 1] = 0; + s_len[0] = 0; + +try_again: + if((outfile = fopen(filename, "a")) == NULL) { + set_error(IL_SYS, errno, "store_to_file: error opening file"); + goto cleanup; + } + if((filedesc = fileno(outfile)) < 0) { + set_error(IL_SYS, errno, "store_to_file: error getting file descriptor"); + goto cleanup; + } + + for(i = 0; i < 5; i++) { + struct flock filelock; + int filelock_status; + struct stat statbuf; + + filelock.l_type = F_WRLCK; + filelock.l_whence = SEEK_SET; + filelock.l_start = 0; + filelock.l_len = 0; + + if((filelock_status=fcntl(filedesc, F_SETLK, &filelock)) < 0) { + switch(errno) { + case EAGAIN: + case EACCES: + case EINTR: + if((i+1) < 5) sleep(1); + break; + default: + set_error(IL_SYS, errno, "store_to_file: error locking file"); + goto cleanup; + } + } else { + if(stat(filename, &statbuf)) { + if(errno == ENOENT) { + fclose(outfile); + goto try_again; + } else { + set_error(IL_SYS, errno, "store_file: could not stat file"); + goto cleanup; + } + } else { + /* success */ + break; + } + } + } + + if(i == 5) { + set_error(IL_SYS, ETIMEDOUT, "store_to_file: timed out trying to lock file"); + goto cleanup; + } + if(fseek(outfile, 0, SEEK_END) < 0) { + set_error(IL_SYS, errno, "store_to_file: error seeking at end of file"); + goto cleanup; + } + if((*offset=ftell(outfile)) < 0) { + set_error(IL_SYS, errno, "store_to_file: error getting current position"); + goto cleanup; + } + if(fwrite(s_len, sizeof(s_len), 1, outfile) != 1) { + set_error(IL_SYS, errno, "store_to_file: error writing data header to file"); + goto cleanup; + } + if(fwrite(msg->data, msg->len, 1, outfile) != 1) { + set_error(IL_SYS, errno, "store_to_file: error writing data to file"); + goto cleanup; + } + ret = 0; + fflush(outfile); + +cleanup: + if(outfile) fclose(outfile); + return ret; +} + + +int +send_reply(int sd) +{ + const char reply[] = "" + "" + " " + " " + " " + " " + ""; + + return(write(sd, reply, sizeof(reply))); +} + + +/* + * Returns: -1 on error, 0 if no message available, message length otherwise + * + */ +#ifdef PERF_EVENTS_INLINE +int +input_queue_get(il_octet_string_t **buffer, long *offset, int timeout) +{ + static long o = 0; + int len; + char *jobid; + static il_octet_string_t my_buffer; + + assert(buffer != NULL); + + *buffer = &my_buffer; + + len = glite_wll_perftest_produceEventString(&my_buffer.data, &jobid); + my_buffer.len = len; + if(len) { + o += len; + *offset = o; + } else if (len == 0) { + sleep(timeout); + } + return(len); +} +#else +int +input_queue_get(il_octet_string_t **buffer, long *offset, int timeout) +{ + fd_set fds; + struct timeval tv; + int msg_len; + static il_http_message_t msg; + + assert(buffer != NULL); + + *buffer = (il_octet_string_t *)&msg; + + FD_ZERO(&fds); + FD_SET(sock, &fds); + + tv.tv_sec = timeout; + tv.tv_usec = 0; + + msg_len = select(sock + 1, &fds, NULL, NULL, timeout >= 0 ? &tv : NULL); + switch(msg_len) { + + case 0: /* timeout */ + return(0); + + case -1: /* error */ + switch(errno) { + case EINTR: + il_log(LOG_DEBUG, " interrupted while waiting for event!\n"); + return(0); + + default: + set_error(IL_SYS, errno, "input_queue_get: error waiting for event"); + return(-1); + } + default: + break; + } + + if((accepted=accept(sock, NULL, NULL)) < 0) { + set_error(IL_SYS, errno, "input_queue_get: error accepting connection"); + return(-1); + } + + msg_len = receive_http(&accepted, plain_reader, &msg); + + if(msg_len < 0) { + close(accepted); + if(error_get_maj() != IL_OK) + return -1; + else + return 0; + } + + if(store_to_file(&msg, offset) < 0) { + close(accepted); + return -1; + } + + send_reply(accepted); + close(accepted); + return(msg.len); +} +#endif + diff --git a/org.glite.lb.logger/src/queue_mgr_http.c b/org.glite.lb.logger/src/queue_mgr_http.c new file mode 100644 index 0000000..59eba4a --- /dev/null +++ b/org.glite.lb.logger/src/queue_mgr_http.c @@ -0,0 +1,164 @@ +#ident "$Header$" + +#include +#include +#include + +#include "glite/jobid/cjobid.h" +#include "glite/lb/context.h" + +#include "interlogd.h" + +struct queue_list { + struct event_queue *queue; + char *dest; + struct queue_list *next; + time_t expires; +}; + +static struct event_queue *log_queue; +static struct queue_list *queues; + + +static +int +queue_list_create() +{ + queues = NULL; + + return(0); +} + + +static +int +queue_list_find(struct queue_list *ql, const char *dest, struct queue_list **el, struct queue_list **prev) +{ + struct queue_list *q, *p; + + assert(el != NULL); + + *el = NULL; + if(prev) + *prev = NULL; + + if(ql == NULL) + return(0); + + q = NULL; + p = ql; + + while(p) { + if(strcmp(p->dest, dest) == 0) { + *el = p; + if(prev) + *prev = q; + return(1); + } + + q = p; + p = p->next; + }; + + return(0); +} + + +static +int +queue_list_add(struct queue_list **ql, const char *dest, struct event_queue *eq) +{ + struct queue_list *el; + + assert(dest != NULL); + assert(eq != NULL); + assert(ql != NULL); + + el = malloc(sizeof(*el)); + if(el == NULL) { + set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough room for new queue"); + return(-1); + } + + el->dest = strdup(dest); + if(el->dest == NULL) { + free(el); + set_error(IL_NOMEM, ENOMEM, "queue_list_add: not enough memory for new queue"); + return(-1); + } + el->queue = eq; + el->next = queues; + *ql = el; + return 0; +} + + +struct event_queue * +queue_list_get(char *job_id_s) +{ + char *dest; + struct queue_list *q; + struct event_queue *eq; + dest = job_id_s; + + if(dest == NULL) + return(NULL); + + if(queue_list_find(queues, dest, &q, NULL)) { + return(q->queue); + } else { + eq = event_queue_create(dest); + if(eq) + queue_list_add(&queues, dest, eq); + return(eq); + } +} + + +int +queue_list_is_log(struct event_queue *eq) +{ + return(eq == queue_list_get(NULL)); +} + + +int +queue_list_init(char *ls) +{ + return(queue_list_create()); +} + + +static struct queue_list *current; + + +struct event_queue * +queue_list_first() +{ + current = queues; + return(current ? current->queue : NULL); +} + + +struct event_queue * +queue_list_next() +{ + current = current ? current->next : NULL; + return(current ? current->queue : NULL); +} + + +int +queue_list_remove_queue(struct event_queue *eq) +{ + assert(eq != NULL); + + free(eq); + return(1); +} + + + +/* Local Variables: */ +/* c-indentation-style: gnu */ +/* End: */ diff --git a/org.glite.lb.logger/src/send_event_http.c b/org.glite.lb.logger/src/send_event_http.c new file mode 100644 index 0000000..6eedcaf --- /dev/null +++ b/org.glite.lb.logger/src/send_event_http.c @@ -0,0 +1,282 @@ +#ident "$Header$" + +#include +#include +#include +#ifdef HAVE_UNISTD_H +#include +#endif +#include +#include +#include +#include + + +/* + * - L/B server protocol handling routines + */ + +#include "glite/jobid/cjobid.h" +#include "glite/lb/il_string.h" +#include "glite/lb/context.h" + +#include "interlogd.h" + +struct reader_data { + edg_wll_GssConnection *gss; + struct timeval *timeout; +}; + + +static +int +gss_reader(void *user_data, char *buffer, int max_len) +{ + int ret; + struct reader_data *data = (struct reader_data *)user_data; + edg_wll_GssStatus gss_stat; + + ret = edg_wll_gss_read(data->gss, buffer, max_len, data->timeout, &gss_stat); + if(ret < 0) { + char *gss_err = NULL; + + if(ret == EDG_WLL_GSS_ERROR_GSS) { + edg_wll_gss_get_error(&gss_stat, "get_reply", &gss_err); + set_error(IL_DGGSS, ret, gss_err); + free(gss_err); + } else + set_error(IL_DGGSS, ret, "get_reply"); + } + return(ret); +} + + +/* + * Read reply from server. + * Returns: -1 - error reading message, + * code > 0 - http status code from server + */ +static +int +get_reply(struct event_queue *eq, char **buf, int *code_min) +{ + int ret, code; + int len; + struct timeval tv; + struct reader_data data; + il_http_message_t msg; + + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + data.gss = &eq->gss; + data.timeout = &tv; + len = receive_http(&data, gss_reader, &msg); + if(len < 0) { + set_error(IL_PROTO, LB_PROTO, "get_reply: error reading server reply"); + return(-1); + } + if(msg.data) free(msg.data); + if(msg.reply_string) *buf = msg.reply_string; + *code_min = 0; /* XXX fill in flag for fault */ + return(msg.reply_code); +} + + + +/* + * Returns: 0 - not connected, timeout set, 1 - OK + */ +int +event_queue_connect(struct event_queue *eq) +{ + int ret; + struct timeval tv; + edg_wll_GssStatus gss_stat; + cred_handle_t *local_cred_handle; + + assert(eq != NULL); + +#ifdef LB_PERF + if(!nosend) { +#endif + + if(eq->gss.context == NULL) { + + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + + /* get pointer to the credentials */ + if(pthread_mutex_lock(&cred_handle_lock) < 0) + abort(); + local_cred_handle = cred_handle; + local_cred_handle->counter++; + if(pthread_mutex_unlock(&cred_handle_lock) < 0) + abort(); + + il_log(LOG_DEBUG, " trying to connect to %s:%d\n", eq->dest_name, eq->dest_port); + ret = edg_wll_gss_connect(local_cred_handle->creds, eq->dest_name, eq->dest_port, &tv, &eq->gss, &gss_stat); + if(pthread_mutex_lock(&cred_handle_lock) < 0) + abort(); + /* check if we need to release the credentials */ + --local_cred_handle->counter; + if(local_cred_handle != cred_handle && local_cred_handle->counter == 0) { + edg_wll_gss_release_cred(&local_cred_handle->creds, NULL); + free(local_cred_handle); + il_log(LOG_DEBUG, " freed credentials, not used anymore\n"); + } + if(pthread_mutex_unlock(&cred_handle_lock) < 0) + abort(); + + if(ret < 0) { + char *gss_err = NULL; + + if (ret == EDG_WLL_GSS_ERROR_GSS) + edg_wll_gss_get_error(&gss_stat, "event_queue_connect: edg_wll_gss_connect", &gss_err); + set_error(IL_DGGSS, ret, + (ret == EDG_WLL_GSS_ERROR_GSS) ? gss_err : "event_queue_connect: edg_wll_gss_connect"); + if (gss_err) free(gss_err); + eq->gss.context = NULL; + eq->timeout = TIMEOUT; + return(0); + } + } + +#ifdef LB_PERF + } +#endif + + return(1); +} + + +int +event_queue_close(struct event_queue *eq) +{ + assert(eq != NULL); + +#ifdef LB_PERF + if(!nosend) { +#endif + + if(eq->gss.context != NULL) { + edg_wll_gss_close(&eq->gss, NULL); + eq->gss.context = NULL; + } +#ifdef LB_PERF + } +#endif + return(0); +} + + +/* + * Send all events from the queue. + * Returns: -1 - system error, 0 - not sent, 1 - queue empty + */ +int +event_queue_send(struct event_queue *eq) +{ + int events_sent = 0; + assert(eq != NULL); + +#ifdef LB_PERF + if(!nosend) { +#endif + if(eq->gss.context == NULL) + return(0); +#ifdef LB_PERF + } +#endif + + /* feed the server with events */ + while (!event_queue_empty(eq)) { + struct server_msg *msg; + char *rep; + int ret, code, code_min; + size_t bytes_sent; + struct timeval tv; + edg_wll_GssStatus gss_stat; + + clear_error(); + + if(event_queue_get(eq, &msg) < 0) + return(-1); + + il_log(LOG_DEBUG, " trying to deliver event at offset %d for job %s\n", msg->offset, msg->job_id_s); + +#ifdef LB_PERF + if(!nosend) { +#endif + /* XXX: ljocha -- does it make sense to send empty messages ? */ + if (msg->len) { + tv.tv_sec = TIMEOUT; + tv.tv_usec = 0; + ret = edg_wll_gss_write_full(&eq->gss, msg->msg, msg->len, &tv, &bytes_sent, &gss_stat); + if(ret < 0) { + if (ret == EDG_WLL_GSS_ERROR_ERRNO && errno == EPIPE && events_sent > 0) { + eq->timeout = 0; + } else { + il_log(LOG_ERR, "send_event: %s\n", error_get_msg()); + eq->timeout = TIMEOUT; + } + return(0); + } + if((code = get_reply(eq, &rep, &code_min)) < 0) { + /* could not get the reply properly, so try again later */ + if (events_sent>0) + eq->timeout = 1; + else { + eq->timeout = TIMEOUT; + il_log(LOG_ERR, " error reading server %s reply:\n %s\n", eq->dest_name, error_get_msg()); + } + return(0); + } + } + else { code = 200; code_min = 0; rep = strdup("not sending empty message"); } +#ifdef LB_PERF + } else { + glite_wll_perftest_consumeEventIlMsg(msg->msg+17); + code = 200; + rep = strdup("OK"); + } +#endif + + il_log(LOG_DEBUG, " event sent, server %s replied with %d, %s\n", eq->dest_name, code, rep); + free(rep); + + /* the reply is back here, decide what to do with message */ + /* HTTP error codes: + 1xx - informational (eg. 100 Continue) + 2xx - successful (eg. 200 OK) + 3xx - redirection (eg. 301 Moved Permanently) + 4xx - client error (eq. 400 Bad Request) + 5xx - server error (eq. 500 Internal Server Error) + */ + if(code >= 500 && code < 600) { + + /* non fatal errors (for us), try to deliver later */ + eq->timeout = TIMEOUT; + return(0); + } + + /* the message was consumed (successfully or not) */ + /* update the event pointer */ + if(event_store_commit(msg->es, msg->ev_len, queue_list_is_log(eq)) < 0) + /* failure committing message, this is bad */ + return(-1); + + event_queue_remove(eq); + events_sent++; + } /* while */ + + return(1); + +} /* send_events */ + + +/* this is just not used */ +int +send_confirmation(long lllid, int code) +{ + return 0; +} diff --git a/org.glite.lb.logger/src/server_msg_http.c b/org.glite.lb.logger/src/server_msg_http.c new file mode 100644 index 0000000..37be900 --- /dev/null +++ b/org.glite.lb.logger/src/server_msg_http.c @@ -0,0 +1,127 @@ +#ident "$Header$" + +#include +#include +#include + +#include "interlogd.h" +#include "glite/lb/il_msg.h" +#include "glite/lb/events_parse.h" +#include "glite/lb/context.h" + +static +int +create_msg(il_http_message_t *ev, char **buffer, long *receipt, time_t *expires) +{ + char *event = ev->data; + + *receipt = 0; + *expires = 0; + + *buffer = ev->data; + return ev->len;; +} + + +struct server_msg * +server_msg_create(il_octet_string_t *event, long offset) +{ + struct server_msg *msg; + + msg = malloc(sizeof(*msg)); + if(msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_create: out of memory allocating message"); + return(NULL); + } + + if(server_msg_init(msg, event) < 0) { + server_msg_free(msg); + return(NULL); + } + msg->offset = offset; + + return(msg); +} + + +struct server_msg * +server_msg_copy(struct server_msg *src) +{ + struct server_msg *msg; + + msg = malloc(sizeof(*msg)); + if(msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating message"); + return(NULL); + } + + msg->msg = malloc(src->len); + if(msg->msg == NULL) { + set_error(IL_NOMEM, ENOMEM, "server_msg_copy: out of memory allocating server message"); + server_msg_free(msg); + return(NULL); + } + msg->len = src->len; + memcpy(msg->msg, src->msg, src->len); + + msg->job_id_s = strdup(src->job_id_s); + msg->ev_len = src->ev_len; + msg->es = src->es; + msg->receipt_to = src->receipt_to; + msg->offset = src->offset; +#if defined(IL_NOTIFICATIONS) + msg->dest_name = strdup(src->dest_name); + msg->dest_port = src->dest_port; + msg->dest = strdup(src->dest); +#endif + msg->expires = src->expires; + return(msg); +} + + +int +server_msg_init(struct server_msg *msg, il_octet_string_t *event) +{ + il_http_message_t *hmsg = (il_http_message_t *)event; + + assert(msg != NULL); + assert(event != NULL); + + memset(msg, 0, sizeof(*msg)); + + + msg->job_id_s = hmsg->host; + if(msg->job_id_s == NULL) { + set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id"); + return -1; + } + msg->len = create_msg(hmsg, &msg->msg, &msg->receipt_to, &msg->expires); + if(msg->len < 0) + return -1; + /* set this to indicate new data owner */ + hmsg->data = NULL; + msg->ev_len = hmsg->len; /* XXX: add lentgh size too */ + return 0; + +} + + +int +server_msg_is_priority(struct server_msg *msg) +{ + assert(msg != NULL); + + return(msg->receipt_to != 0); +} + + +int +server_msg_free(struct server_msg *msg) +{ + assert(msg != NULL); + + if(msg->msg) free(msg->msg); + if(msg->job_id_s) free(msg->job_id_s); + free(msg); + return 0; +}