return sock;
}
-/*!
- *----------------------------------------------------------------------
- * Write to socket
- * Needn't write entire buffer. Timeout is applicable only for non-blocking
- * connections
- * \param sock IN: connection to work with
- * \param buf IN: buffer
- * \param bufsize IN: max size to write
- * \param timeout INOUT: max time allowed for operation, remaining time on return
- * \retval bytes written (>0) on success
- * \retval -1 on write error
- *----------------------------------------------------------------------
- */
-static ssize_t edg_wll_socket_write(int sock,const void *buf,size_t bufsize,struct timeval *timeout)
-{
- ssize_t len = 0, ret = 0;
- fd_set fds;
- struct timeval to,before,after;
-
- if (timeout) {
- memcpy(&to,timeout,sizeof to);
- gettimeofday(&before,NULL);
- }
- len = write(sock,buf,bufsize);
- while (len <= 0) {
- FD_ZERO(&fds);
- FD_SET(sock,&fds);
- if ((ret=select(sock+1,&fds,NULL,NULL,timeout?&to:NULL)) < 0) {
- edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write(): error selecting socket\n");
- SYSTEM_ERROR("select");
- break;
- }
- len = write(sock,buf,bufsize);
- }
- if (timeout) {
- gettimeofday(&after,NULL);
- tv_sub(after,before);
- tv_sub(*timeout,after);
- if (timeout->tv_sec < 0) {
- timeout->tv_sec = 0;
- timeout->tv_usec = 0;
- }
- }
- return len;
-}
-
-/*!
- *----------------------------------------------------------------------
- * Write specified amount of data to socket
- * Attempts to call edg_wll_socket_write() untill the entire request is satisfied
- * (or times out).
- * \param sock IN: connection to work with
- * \param buf IN: buffer
- * \param bufsize IN: max size to write
- * \param timeout INOUT: max time allowed for operation, remaining time on return
- * \param total OUT: bytes actually written
- * \retval bytes written (>0) on success
- * \retval -1 on write error
- *----------------------------------------------------------------------
- */
-static ssize_t edg_wll_socket_write_full(int sock,void *buf,size_t bufsize,struct timeval *timeout,ssize_t *total)
-{
- ssize_t len;
- *total = 0;
-
- while (*total < bufsize) {
- len = edg_wll_socket_write(sock,buf+*total,bufsize-*total,timeout);
- if (len < 0) return len;
- *total += len;
- }
- return 0;
-}
-
/*
*----------------------------------------------------------------------
*
i = 0;
open_event_file:
+
/* fopen and properly handle the filelock */
- if ((outfile = fopen(outfilename,"a")) == NULL) {
- edg_wll_ll_log(LOG_INFO,"error.\n");
- SYSTEM_ERROR("fopen");
- answer = errno;
- goto edg_wll_log_proto_server_end;
- } else {
- edg_wll_ll_log(LOG_INFO,".");
- }
- if ((filedesc = fileno(outfile)) == -1) {
- edg_wll_ll_log(LOG_INFO,"error.\n");
- SYSTEM_ERROR("fileno");
- answer = errno;
- fclose(outfile);
- goto edg_wll_log_proto_server_end;
- } else {
- edg_wll_ll_log(LOG_INFO,".");
- }
- filelock.l_type = F_WRLCK;
- filelock.l_whence = SEEK_SET;
- filelock.l_start = 0;
- filelock.l_len = 0;
- if ((filelock_status=fcntl(filedesc,F_SETLK,&filelock) < 0) && (i < FCNTL_ATTEMPTS)) {
- fclose(outfile);
- edg_wll_ll_log(LOG_DEBUG,"\nWaiting %d seconds for filelock to open...\n",FCNTL_TIMEOUT);
- sleep(FCNTL_TIMEOUT);
- i++;
- goto open_event_file;
- }
- if (filelock_status < 0) {
- edg_wll_ll_log(LOG_INFO,"error.\n");
- SYSTEM_ERROR("fcntl");
- answer = errno;
- fclose(outfile);
- goto edg_wll_log_proto_server_end;
- } else {
- edg_wll_ll_log(LOG_INFO,".");
- }
- if (fseek(outfile, 0, SEEK_END) == -1) {
- SYSTEM_ERROR("fseek");
- answer = errno;
- fclose(outfile);
- goto edg_wll_log_proto_server_end;
- }
- if ((filepos=ftell(outfile)) == -1) {
- SYSTEM_ERROR("ftell");
- answer = errno;
- fclose(outfile);
- goto edg_wll_log_proto_server_end;
- }
- /* write, flush and sync */
- if (fputs(msg,outfile) == EOF) {
- edg_wll_ll_log(LOG_INFO,"error.\n");
- SYSTEM_ERROR("fputs");
- answer = errno;
- fclose(outfile);
- goto edg_wll_log_proto_server_end;
- }
- if (fflush(outfile) == EOF) {
- edg_wll_ll_log(LOG_INFO,"error.\n");
- SYSTEM_ERROR("fflush");
- answer = errno;
- fclose(outfile);
- goto edg_wll_log_proto_server_end;
- }
- if (fsync(filedesc) < 0) { /* synchronize */
+ if ( edg_wll_log_event_write(context, outfilename, msg, FCNTL_ATTEMPTS, FCNTL_TIMEOUT, &filepos) ) {
+ char *errd;
+ answer = edg_wll_Error(context, NULL, &errd);
edg_wll_ll_log(LOG_INFO,"error.\n");
- SYSTEM_ERROR("fsync");
- answer = errno;
- fclose(outfile);
+ SYSTEM_ERROR(errd);
+ free(errd);
goto edg_wll_log_proto_server_end;
- } else {
- edg_wll_ll_log(LOG_INFO,"o.k.\n");
- }
- /* close and unlock */
- fclose(outfile);
+ } else edg_wll_ll_log(LOG_INFO,"o.k.");
} else {
filepos = 0;
}
/* send message via IPC (UNIX socket) */
if (!noipc) {
- struct sockaddr_un saddr;
- edg_wll_ll_log(LOG_INFO,"The message will be send via IPC (UNIX socket):\n");
-
- /* initialize socket */
- edg_wll_ll_log(LOG_DEBUG,"Initializing UNIX socket...\n");
-
- edg_wll_ll_log(LOG_DEBUG,"- Getting UNIX socket descriptor...");
- msg_sock = socket(PF_UNIX, SOCK_STREAM, 0);
- if(msg_sock < 0) {
- edg_wll_ll_log(LOG_DEBUG,"error.\n");
- SYSTEM_ERROR("socket");
- answer = errno;
- goto edg_wll_log_proto_server_end;
- } else {
- edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
- }
+ edg_wll_ll_log(LOG_DEBUG,
+ "Sending via IPC (UNIX socket \"%s\")\n\t"
+ "the message position %ld (%d bytes)...",
+ outfilename, filepos, sizeof(filepos));
- edg_wll_ll_log(LOG_DEBUG,"- Setting UNIX socket parameters...");
- memset(&saddr, 0, sizeof(saddr));
- saddr.sun_family = AF_UNIX;
- strcpy(saddr.sun_path, socket_path);
- edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
-
- edg_wll_ll_log(LOG_DEBUG,"-- adding O_NONBLOCK to socket parameters...");
- if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 ||
- fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) {
- edg_wll_ll_log(LOG_DEBUG,"error.\n");
- SYSTEM_ERROR("fcntl");
- answer = errno;
- close(msg_sock);
- goto edg_wll_log_proto_server_end;
- } else {
- edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
- }
-
- /* for priority messages initialize also another socket for confirmation */
if (event->any.priority) {
edg_wll_ll_log(LOG_DEBUG,"- Initializing 2nd UNIX socket for priority messages confirmation...");
if(init_confirmation() < 0) {
}
}
- edg_wll_ll_log(LOG_DEBUG,"Connecting to UNIX socket...");
- for (i=0; i < CONNECT_ATTEMPTS; i++) {
- if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
- if ((errno == EAGAIN) || (errno == ETIMEDOUT)) {
- edg_wll_ll_log(LOG_DEBUG,".");
- sleep(CONNECT_TIMEOUT);
- continue;
- } else if (errno == EISCONN) {
- edg_wll_ll_log(LOG_DEBUG,"warning.\n");
- edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n");
- break;
- } else {
- edg_wll_ll_log(LOG_DEBUG,"error.\n");
- SYSTEM_ERROR("connect");
- answer = errno;
- close(msg_sock);
- goto edg_wll_log_proto_server_end_1;
- }
- } else {
- edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
- break;
- }
- }
-
- edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message position %ld (%d bytes)...", filepos, sizeof(filepos));
- count = 0;
- if (edg_wll_socket_write_full(msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) {
- edg_wll_ll_log(LOG_DEBUG,"error.\n");
- edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error,\n");
- answer = errno;
- close(msg_sock);
- goto edg_wll_log_proto_server_end_1;
- } else {
- edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
- }
-
- edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message itself (%d bytes)...",msg_size);
- if (edg_wll_socket_write_full(msg_sock, msg, msg_size, &timeout, &count) < 0) {
- edg_wll_ll_log(LOG_DEBUG,"error.\n");
- edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error.");
- answer = errno;
- close(msg_sock);
+ if ( edg_wll_log_event_send(context, socket_path, filepos, msg, msg_size, CONNECT_ATTEMPTS, &timeout) ) {
+ char *errd;
+ answer = edg_wll_Error(context, NULL, &errd);
+ edg_wll_ll_log(LOG_INFO,"error.\n");
+ SYSTEM_ERROR(errd);
+ free(errd);
goto edg_wll_log_proto_server_end_1;
- } else {
- edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
- }
-
- close(msg_sock);
+ } else edg_wll_ll_log(LOG_INFO,"o.k.");
if (event->any.priority) {
edg_wll_ll_log(LOG_INFO,"Waiting for confirmation...");
-/**
- * il_lbproxy.c
- * - implementation of IL API calls for LB proxy
- *
- */
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <sys/types.h>
-#include <fcntl.h>
-#include <string.h>
-#include <errno.h>
-
#include "glite/lb/context-int.h"
-#include "glite/lb/events_parse.h"
-#include "glite/lb/escape.h"
#include "glite/lb/log_proto.h"
-#include "glite/lb/lb_plain_io.h"
#include "il_lbproxy.h"
-#include "lb_xml_parse.h"
-
-
#define FCNTL_ATTEMPTS 5
#define FCNTL_TIMEOUT 1
-#define FILE_PREFIX "/tmp/lbproxy_events"
-#define DEFAULT_SOCKET "/tmp/lbproxy_interlogger.sock"
+#define FILE_PREFIX "/tmp/lbproxy_events"
+#define DEFAULT_SOCKET "/tmp/lbproxy_interlogger.sock"
char *lbproxy_ilog_socket_path = DEFAULT_SOCKET;
char *lbproxy_ilog_file_prefix = FILE_PREFIX;
-#define tv_sub(a,b) {\
- (a).tv_usec -= (b).tv_usec;\
- (a).tv_sec -= (b).tv_sec;\
- if ((a).tv_usec < 0) {\
- (a).tv_sec--;\
- (a).tv_usec += 1000000;\
- }\
-}
-static int event_save_to_file_proxy(
- edg_wll_Context ctx,
- const char *event_file,
- const char *ulm_data,
- long *filepos)
+int
+edg_wll_EventSendProxy(
+ edg_wll_Context ctx,
+ const edg_wlc_JobId jobid,
+ const char *event)
{
- FILE *outfile;
- struct flock filelock;
- int filedesc,
- i, filelock_status=-1;
+ struct timeval timeout;
+ long filepos;
+ char *jobid_s,
+ *event_file = NULL;
- for( i = 0; i < FCNTL_ATTEMPTS; i++ ) {
- /* fopen and properly handle the filelock */
- if ( (outfile = fopen(event_file, "a")) == NULL ) {
- edg_wll_SetError(ctx, errno, "fopen()");
- goto out;
- }
- if ( (filedesc = fileno(outfile)) == -1 ) {
- edg_wll_SetError(ctx, errno, "fileno()");
- goto out1;
- }
- filelock.l_type = F_WRLCK;
- filelock.l_whence = SEEK_SET;
- filelock.l_start = 0;
- filelock.l_len = 0;
- filelock_status=fcntl(filedesc, F_SETLK, &filelock);
- if(filelock_status < 0) {
- switch(errno) {
- case EAGAIN:
- case EACCES:
- case EINTR:
- /* lock is held by someone else */
- sleep(FCNTL_TIMEOUT);
- break;
- default:
- /* other error */
- edg_wll_SetError(ctx, errno, "fcntl()");
- goto out1;
- }
- } else {
- /* lock acquired, break out of the loop */
- break;
- }
- }
- if (fseek(outfile, 0, SEEK_END) == -1) { edg_wll_SetError(ctx, errno, "fseek()"); goto out1; }
- if ((*filepos=ftell(outfile)) == -1) { edg_wll_SetError(ctx, errno, "ftell()"); goto out1; }
- if (fputs(ulm_data, outfile) == EOF) { edg_wll_SetError(ctx, errno, "fputs()"); goto out1; }
- if (fflush(outfile) == EOF) { edg_wll_SetError(ctx, errno, "fflush()"); goto out1; }
- if (fsync(filedesc) < 0) { edg_wll_SetError(ctx, errno, "fsync()"); goto out1; }
-
-out1:
- fclose(outfile);
-
-out:
- return edg_wll_Error(ctx, NULL, NULL)?
- edg_wll_UpdateError(ctx, 0, "event_save_to_file_proxy()"): 0;
-}
-
-
-static int event_send_socket_proxy(
- edg_wll_Context ctx,
- long filepos,
- const char *ulm_data)
-{
- edg_wll_PlainConnection conn;
- struct sockaddr_un saddr;
- int flags;
- struct timeval timeout;
-
+ edg_wll_ResetError(ctx);
timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX;
timeout.tv_usec = 0;
- if ( (conn.sock = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 )
- edg_wll_SetError(ctx, errno, "socket()");
-
- memset(&saddr, 0, sizeof(saddr));
- saddr.sun_family = AF_UNIX;
- strcpy(saddr.sun_path, lbproxy_ilog_socket_path);
-
- if ( (flags = fcntl(conn.sock, F_GETFL, 0)) < 0
- || fcntl(conn.sock, F_SETFL, flags | O_NONBLOCK) < 0) {
- edg_wll_SetError(ctx, errno, "fcntl()");
+ jobid_s = edg_wlc_JobIdGetUnique(jobid);
+ if ( !jobid_s ) {
+ edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()");
goto out;
}
- if ( connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
- if(errno != EISCONN) { edg_wll_SetError(ctx, errno, "connect()"); goto out; }
+ asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s);
+ if ( !event_file ) {
+ edg_wll_SetError(ctx, ENOMEM, "asprintf()");
+ goto out;
}
- if ( edg_wll_plain_write_full(&conn, &filepos, sizeof(filepos), &timeout) < 0
- || edg_wll_plain_write_full(&conn, (void*)ulm_data, strlen(ulm_data), &timeout) < 0 ) {
- edg_wll_SetError(ctx, errno, "edg_wll_plain_write_full()");
+ if ( edg_wll_log_event_write(ctx, event_file, event,
+ FCNTL_ATTEMPTS, FCNTL_TIMEOUT, &filepos) ) {
+ edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_write()");
goto out;
}
-out:
- close(conn.sock);
- return edg_wll_Error(ctx, NULL, NULL)?
- edg_wll_UpdateError(ctx, 0, "event_send_socket_proxy()"): 0;
-}
-
-
-int edg_wll_EventSendProxy(edg_wll_Context ctx, const edg_wlc_JobId jobid, const char *event)
-{
- long filepos;
- char *jobid_s, *event_file = NULL;
-
-
- edg_wll_ResetError(ctx);
- jobid_s = edg_wlc_JobIdGetUnique(jobid);
- if ( !jobid_s ) { edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()"); goto out; }
-
- asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s);
- if ( !event_file ) { edg_wll_SetError(ctx, ENOMEM, "asprintf()"); goto out; }
-
- if ( event_save_to_file_proxy(ctx, event_file, event, &filepos)
- || event_send_socket_proxy(ctx, filepos, event) ) goto out;
+ if ( edg_wll_log_event_send(ctx, lbproxy_ilog_socket_path, filepos,
+ event, strlen(event), 1, &timeout) ) {
+ edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_send()");
+ goto out;
+ }
out:
if ( jobid_s ) free(jobid_s);