static
int
-handle_cmd(char *event, long offset)
+handle_cmd(il_octet_string_t *event, long offset)
{
char *job_id_s;
struct event_queue *eq;
struct timeval tv;
/* parse command */
- if(parse_cmd(event, &job_id_s, &receipt, &timeout) < 0)
+ if(parse_cmd(event->data, &job_id_s, &receipt, &timeout) < 0)
return(0);
#if defined(INTERLOGD_FLUSH)
static
int
-handle_msg(char *event, long offset)
+handle_msg(il_octet_string_t *event, long offset)
{
struct server_msg *msg = NULL;
#if !defined(IL_NOTIFICATIONS)
{
/* receive events */
while(1) {
- char *msg;
+ il_octet_string_t msg;
long offset;
int ret;
+ if(killflg)
+ exit(0);
+
clear_error();
if((ret = input_queue_get(&msg, &offset, INPUT_TIMEOUT)) < 0)
{
}
#ifdef PERF_EMPTY
- glite_wll_perftest_consumeEventString(msg);
- free(msg);
+ glite_wll_perftest_consumeEventString(msg.data);
+ free(msg.data);
continue;
#endif
#ifdef INTERLOGD_HANDLE_CMD
- ret = handle_cmd(msg, offset);
+ ret = handle_cmd(&msg, offset);
if(ret == 0)
#endif
- ret = handle_msg(msg, offset);
- free(msg);
+ ret = handle_msg(&msg, offset);
+ free(msg.data);
if(ret < 0)
switch (error_get_maj()) {
case IL_SYS:
}
+#define DEFAULT_CHUNK_SIZE 1024
+
static
-char *
-read_event(int sock, long *offset)
+int
+read_event(int sock, long *offset, il_octet_string_t *msg)
{
char *buffer, *p, *n;
- int len, alen;
- char buf[256];
+ int len, alen, i, chunk_size = DEFAULT_CHUNK_SIZE;
+ static char buf[1024];
+
+ msg->data = NULL;
+ msg->len = 0;
/* receive offset */
len = recv(sock, offset, sizeof(*offset), MSG_NOSIGNAL);
if(len < sizeof(*offset)) {
set_error(IL_PROTO, errno, "read_event: error reading offset");
- return(NULL);
+ return(-1);
}
/* receive event string */
- buffer=malloc(1024);
+ buffer=malloc(8*chunk_size);
if(buffer == NULL) {
set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
- return(NULL);
+ return(-1);
}
p = buffer;
+ alen = 8*chunk_size;
- alen = 1024;
+ /* Variables used here:
+ - buffer points to allocated memory,
+ - alen is the allocated memory size,
+ - p points to the first free location in buffer,
+ - len is the amount actually read by recv,
+ - i is the amount of data belonging to the current event (including separator).
+ - n points to event separator or is NULL
+ Hence:
+ (p - buffer) gives the amount of valid data read so far,
+ (alen - (p - buffer)) is the free space,
+ */
+
+#if 1
+ /* Reading events - optimized version. Attempts to increase chunks read by recv
+ * when there are more data, reads directly into destination memory (instead of
+ * copying from static buffer) etc.
+ *
+ * For some reason it is not much faster than the old variant.
+ */
+ do {
+ /* prepare at least chunk_size bytes for next data */
+ if(alen - (p - buffer) < chunk_size) {
+ alen += (chunk_size < 8192) ? 8192 : 8*chunk_size;
+ n = realloc(buffer, alen);
+ if(n == NULL) {
+ free(buffer);
+ set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
+ return(-1);
+ }
+ p = n + (p - buffer);
+ buffer = n;
+ }
+
+ /* read chunk */
+ if((len=recv(sock, p, chunk_size, MSG_PEEK | MSG_NOSIGNAL)) > 0) {
+ /* find the end of event, if any */
+ /* faster (and dirty) way of doing strnchr (which is not in libc, anyway) */
+ if((n=memccpy(p, p, EVENT_SEPARATOR, len)) != NULL) {
+ i = n - p; /* length including separator */
+ } else {
+ i = len;
+ /* long event, huh? try reading more data at once */
+ chunk_size += 1024;
+ }
+ /* remove the relevant data from input */
+ /* i > 0 */
+ if(recv(sock, p, i, MSG_NOSIGNAL) != i) {
+ set_error(IL_SYS, errno, "read_event: error reading data");
+ free(buffer);
+ return(-1);
+ }
+ /* move the pointer to the first free place, separator is considered free space */
+ p = (n == NULL) ? p + len : n - 1;
+ }
+ } while ( (len > 0) && (n == NULL) );
+
+#else
+ /* Reading events - original version.
+ * Appears to behave quite good, anyway.
+ */
while((len=recv(sock, buf, sizeof(buf), MSG_PEEK | MSG_NOSIGNAL)) > 0) {
- int i;
/* we have to be prepared for sizeof(buf) bytes */
if(alen - (p - buffer) < (int)sizeof(buf)) {
if(n == NULL) {
free(buffer);
set_error(IL_NOMEM, ENOMEM, "read_event: no room for event");
- return(NULL);
+ return(-1);
}
p = p - buffer + n;
buffer = n;
}
/* copy all relevant bytes from buffer */
- for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++)
- *p++ = buf[i];
+ n = (char*)memccpy(p, buf, EVENT_SEPARATOR, len);
+ if(n) {
+ /* separator found */
+ n--; /* but do not preserve it */
+ i = n - p;
+ p = n;
+ } else {
+ /* separator not found */
+ i = len;
+ p += len;
+ }
+ /* This was definitely slowing us down:
+ * for(i=0; (i < len) && (buf[i] != EVENT_SEPARATOR); i++)
+ * *p++ = buf[i];
+ */
/* remove the data from queue */
if(i > 0)
if(recv(sock, buf, i, MSG_NOSIGNAL) != i) {
set_error(IL_SYS, errno, "read_event: error reading data");
free(buffer);
- return(NULL);
+ return(-1);
}
if(i < len)
/* the event is complete */
break;
}
+#endif
/* terminate buffer */
*p = 0;
if(len < 0) {
set_error(IL_SYS, errno, "read_event: error reading data");
free(buffer);
- return(NULL);
+ return(-1);
}
/* if len == 0, we have not encountered EVENT_SEPARATOR and thus the event is not complete */
if(len == 0) {
set_error(IL_PROTO, errno, "read_event: error reading data - premature EOF");
free(buffer);
- return(NULL);
+ return(-1);
}
+#if 0
/* this is probably not necessary at all:
either len <=0, which was covered before,
or 0 <= i < len => p > buffer;
if(p == buffer) {
set_error(IL_PROTO, errno, "read_event: error reading data - no data received");
free(buffer);
- return(NULL);
+ return(-1);
}
+#endif
- return(buffer);
+ msg->data = buffer;
+ msg->len = p - buffer;
+ return(msg->len);
}
*/
#ifdef PERF_EVENTS_INLINE
int
-input_queue_get(char **buffer, long *offset, int timeout)
+input_queue_get(il_octet_string *buffer, long *offset, int timeout)
{
static long o = 0;
int len;
- len = glite_wll_perftest_produceEventString(buffer);
+ len = glite_wll_perftest_produceEventString(&buffer->data);
+ buffer->len = len;
if(len) {
o += len;
*offset = o;
}
#else
int
-input_queue_get(char **buffer, long *offset, int timeout)
+input_queue_get(il_octet_string_t *buffer, long *offset, int timeout)
{
fd_set fds;
struct timeval tv;
return(0);
case -1: /* error */
- set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
- return(-1);
-
+ switch(errno) {
+ case EINTR:
+ il_log(LOG_DEBUG, " interrupted while waiting for event!\n");
+ return(0);
+
+ default:
+ set_error(IL_SYS, errno, "input_queue_get: error waiting for event");
+ return(-1);
+ }
default:
- break;
+ break;
}
if((accepted=accept(sock, NULL, NULL)) < 0) {
return(-1);
}
- *buffer = read_event(accepted, offset);
+ read_event(accepted, offset, buffer);
close(accepted);
- if(*buffer == NULL) {
+ if(buffer->data == NULL) {
if(error_get_maj() != IL_OK)
return(-1);
else
return(0);
}
- return(strlen(*buffer));
+ return(buffer->len);
}
#endif
static
int
-create_msg(char *event, char **buffer, long *receipt)
+create_msg(il_octet_string_t *ev, char **buffer, long *receipt)
{
char *p; int len;
-
+ char *event = ev->data;
+
*receipt = 0;
#if defined(INTERLOGD_EMS)
}
#endif
- len = encode_il_msg(buffer, event);
+ len = encode_il_msg(buffer, ev);
if(len < 0) {
set_error(IL_NOMEM, ENOMEM, "create_msg: out of memory allocating message");
return(-1);
struct server_msg *
-server_msg_create(char *event, long offset)
+server_msg_create(il_octet_string_t *event, long offset)
{
struct server_msg *msg;
int
-server_msg_init(struct server_msg *msg, char *event)
+server_msg_init(struct server_msg *msg, il_octet_string_t *event)
{
#if defined(IL_NOTIFICATIONS)
edg_wll_Context context;
#endif
assert(msg != NULL);
+ assert(event != NULL);
memset(msg, 0, sizeof(*msg));
if(msg->len < 0) {
return(-1);
}
- msg->job_id_s = edg_wll_GetJobId(event);
+ msg->job_id_s = edg_wll_GetJobId(event->data);
#endif
/* remember to add event separator to the length */
- msg->ev_len = strlen(event) + 1;
+ msg->ev_len = event->len + 1;
if(msg->job_id_s == NULL) {
set_error(IL_LBAPI, EDG_WLL_ERROR_PARSE_BROKEN_ULM, "server_msg_init: error getting id");