return(buffer);
}
-struct file_reader_data {
- int fd;
- size_t max_len;
- size_t pos;
-};
-
-#define IL_RD_VALUE(a,b) ((struct file_reader_data*)(a))->b
-
static
int
file_reader(void *user_data, char *buffer, const int len)
{
- int l, m, ret;
+ size_t ret = 0;
- m = IL_RD_VALUE(user_data, max_len) - IL_RD_VALUE(user_data, pos);
- l = (len > m) ? m : len;
- if(l > 0)
- ret = read(IL_RD_VALUE(user_data, fd), buffer, l);
- else
- ret = 0;
- IL_RD_VALUE(user_data, pos) += ret;
+ if(len > 0) {
+ ret = fread(buffer, 1, len, (FILE*)user_data);
+ if(ret == 0 && ferror((FILE*)user_data)) {
+ return -1;
+ }
+ }
return ret;
}
int
read_event_string(FILE *file, il_http_message_t *msg)
{
- struct file_reader_data rd;
- char s_len[20];
int len, ret;
int fd = fileno(file);
-
- len = read(fd, s_len, sizeof(s_len));
- if(len != sizeof(s_len)) {
- if(len < 0)
- set_error(IL_SYS, errno, "read_event_string: error reading record header");
- else
- set_error(IL_SYS, EIO, "read_event_string: record header too short");
- return -1;
+ long start;
+
+ /* remember the start position */
+ start = ftell(file);
+ ret = receive_http(file, file_reader, msg);
+ if(ret < 0) return ret;
+ /* seek at the end of message in case the reader read ahead */
+ len = fseek(file, start + msg->len, SEEK_SET);
+ len = fgetc(file);
+ if(len != '\n') {
+ il_log(LOG_ERR, "error reading event from file, missing terminator character at %d, found %c(%d))\n",
+ start+msg->len, len, len);
+ if(msg->data) { free(msg->data); msg->data = NULL; }
+ if(msg->host) { free(msg->host); msg->host = NULL; }
+ return EINVAL;
}
- if(s_len[0] != 0 || s_len[sizeof(s_len) - 1] != 0) {
- set_error(IL_SYS, EINVAL, "read_event_string: invalid record header");
- return -1;
- }
- len = atoi(s_len + 1);
- if(len < 0) {
- set_error(IL_SYS, EINVAL, "read_event_string: invalid record length in header");
- return -1;
- }
- rd.fd = fd;
- rd.max_len = len;
- rd.pos = 0;
- ret = receive_http(&rd, file_reader, msg);
return ret;
}
/* skip all committed or already enqueued events */
/* be careful - check, if the offset really points to the
beginning of event string */
- if(fseek(ef, last, SEEK_SET) < 0) {
+ if(fseek(ef, last - 1, SEEK_SET) < 0) {
set_error(IL_SYS, errno, "event_store_recover: error setting position for read");
event_store_unlock(es);
fclose(ef);
return(-1);
}
- /* the new event MUST start with 0 */
- if((c=fgetc(ef)) != 0) {
+ /* the last enqueued event MUST end with \n */
+ if((c=fgetc(ef)) != '\n') {
/* Houston, we have got a problem */
il_log(LOG_WARNING,
" file position %ld does not point at the beginning of event string, backing off!\n",
}
} else {
/* OK, break out of the loop */
- fseek(ef, -1, SEEK_CUR); /* should ungetc, but we are reading with read... */
break;
}
} else {
ret = -1;
/* create message for server */
- msg = server_msg_create(&hmsg, last);
+ msg = server_msg_create((il_octet_string_t*)&hmsg, last);
if(msg == NULL) {
il_log(LOG_ALERT, " event file corrupted! I will try to move it to quarantine (ie. rename it).\n");
/* actually do not bother if quarantine succeeded or not - we could not do more */
/* first enqueue to the LS */
if(!bs_only && (last >= es->last_committed_ls)) {
- il_log(LOG_DEBUG, " queueing event at %ld to logging server\n", last);
+ il_log(LOG_DEBUG, " queueing event at %ld to server %s\n", last, eq_l->dest_name);
#if !defined(IL_NOTIFICATIONS)
if(enqueue_msg(eq_l, msg) < 0)
if((eq_b != eq_l) &&
(last >= es->last_committed_bs)) {
- il_log(LOG_DEBUG, " queueing event at %ld to bookkeeping server\n", last);
+ il_log(LOG_DEBUG, " queueing event at %ld to server %s\n", last, eq_b->dest_name);
if(enqueue_msg(eq_b, msg) < 0)
break;
int
event_store_sync(struct event_store *es, long offset)
{
+ int ret;
+
assert(es != NULL);
- /* all events actually come through socket before going to file,
- so nothing can be found in file that was not seen here */
- return 1;
+ /* all events are actually read from file, the event on socket
+ * is ignored and serves just to notify us about file change
+ */
+ ret = event_store_recover(es);
+ ret = (ret < 0) ? ret : 0;
+ return(ret);
}
#include <errno.h>
#include <assert.h>
#include <string.h>
-#include <unistd.h>
-#include <fcntl.h>
#include <stdio.h>
-#include <sys/stat.h>
-#include <unistd.h>
-#include <errno.h>
#include "interlogd.h"
}
-int
-store_to_file(il_http_message_t *msg, long *offset) {
- char s_len[20];
- char filename[PATH_MAX];
- FILE *outfile;
- int i, filedesc;
- int ret = -1;
-
- if(msg->host == NULL) {
- set_error(IL_PROTO, EINVAL, "store_to_file: no message destination specified");
- }
-
- snprintf(filename, sizeof(filename), "%s.%s", file_prefix, msg->host);
- filename[sizeof(filename) - 1] = 0;
- snprintf(s_len+1, sizeof(s_len)-1, "%18d\n", msg->len);
- s_len[sizeof(s_len) - 1] = 0;
- s_len[0] = 0;
-
-try_again:
- if((outfile = fopen(filename, "a")) == NULL) {
- set_error(IL_SYS, errno, "store_to_file: error opening file");
- goto cleanup;
- }
- if((filedesc = fileno(outfile)) < 0) {
- set_error(IL_SYS, errno, "store_to_file: error getting file descriptor");
- goto cleanup;
- }
-
- for(i = 0; i < 5; i++) {
- struct flock filelock;
- int filelock_status;
- struct stat statbuf;
-
- filelock.l_type = F_WRLCK;
- filelock.l_whence = SEEK_SET;
- filelock.l_start = 0;
- filelock.l_len = 0;
-
- if((filelock_status=fcntl(filedesc, F_SETLK, &filelock)) < 0) {
- switch(errno) {
- case EAGAIN:
- case EACCES:
- case EINTR:
- if((i+1) < 5) sleep(1);
- break;
- default:
- set_error(IL_SYS, errno, "store_to_file: error locking file");
- goto cleanup;
- }
- } else {
- if(stat(filename, &statbuf)) {
- if(errno == ENOENT) {
- fclose(outfile);
- goto try_again;
- } else {
- set_error(IL_SYS, errno, "store_file: could not stat file");
- goto cleanup;
- }
- } else {
- /* success */
- break;
- }
- }
- }
-
- if(i == 5) {
- set_error(IL_SYS, ETIMEDOUT, "store_to_file: timed out trying to lock file");
- goto cleanup;
- }
- if(fseek(outfile, 0, SEEK_END) < 0) {
- set_error(IL_SYS, errno, "store_to_file: error seeking at end of file");
- goto cleanup;
- }
- if((*offset=ftell(outfile)) < 0) {
- set_error(IL_SYS, errno, "store_to_file: error getting current position");
- goto cleanup;
- }
- if(fwrite(s_len, sizeof(s_len), 1, outfile) != 1) {
- set_error(IL_SYS, errno, "store_to_file: error writing data header to file");
- goto cleanup;
- }
- if(fwrite(msg->data, msg->len, 1, outfile) != 1) {
- set_error(IL_SYS, errno, "store_to_file: error writing data to file");
- goto cleanup;
- }
- ret = 0;
- fflush(outfile);
-
-cleanup:
- if(outfile) fclose(outfile);
- return ret;
-}
-
-
-int
-send_reply(int sd)
-{
- const char reply[] = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>"
- "<SOAP-ENV:Envelope"
- " xmlns:SOAP-ENV=\"http://schemas.xmlsoap.org/soap/envelope/\""
- " xmlns:SOAP-ENC=\"http://schemas.xmlsoap.org/soap/encoding/\""
- " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\""
- " xmlns:xsd=\"http://www.w3.org/2001/XMLSchema\""
- " xmlns:ns3=\"http://glite.org/wsdl/types/jp\""
- " xmlns:ns1=\"http://glite.org/wsdl/services/jp\""
- " xmlns:ns2=\"http://glite.org/wsdl/elements/jp\">"
- " <SOAP-ENV:Body>"
- " <ns2:UpdateJobsResponse>"
- " </ns2:UpdateJobsResponse>"
- " </SOAP-ENV:Body>"
- "</SOAP-ENV:Envelope>";
-
- return(write(sd, reply, sizeof(reply)));
-}
-
/*
* Returns: -1 on error, 0 if no message available, message length otherwise
return 0;
}
- if(store_to_file(&msg, offset) < 0) {
- close(accepted);
- return -1;
- }
-
- send_reply(accepted);
close(accepted);
+ *offset = -1;
return(msg.len);
}
#endif