- using functions edg_wll_log_event_write() and edg_wll_log_event_send() calls
authorJiří Škrábal <nykolas@ics.muni.cz>
Thu, 3 Mar 2005 13:07:09 +0000 (13:07 +0000)
committerJiří Škrábal <nykolas@ics.muni.cz>
Thu, 3 Mar 2005 13:07:09 +0000 (13:07 +0000)
  from common library

org.glite.lb.logger/src/logd_proto.c
org.glite.lb.server/src/il_lbproxy.c

index 463266a..6693ea0 100644 (file)
@@ -219,79 +219,6 @@ int do_listen(int port)
        return sock;
 }
 
-/*!
- *----------------------------------------------------------------------
- * Write to socket
- * Needn't write entire buffer. Timeout is applicable only for non-blocking
- * connections 
- * \param sock IN: connection to work with
- * \param buf IN: buffer
- * \param bufsize IN: max size to write
- * \param timeout INOUT: max time allowed for operation, remaining time on return
- * \retval bytes written (>0) on success
- * \retval -1 on write error
- *----------------------------------------------------------------------
- */
-static ssize_t edg_wll_socket_write(int sock,const void *buf,size_t bufsize,struct timeval *timeout)
-{ 
-        ssize_t        len = 0, ret = 0;
-       fd_set  fds;
-       struct timeval  to,before,after;
-
-       if (timeout) {
-               memcpy(&to,timeout,sizeof to);
-               gettimeofday(&before,NULL);
-       }
-        len = write(sock,buf,bufsize);
-        while (len <= 0) {
-               FD_ZERO(&fds);
-               FD_SET(sock,&fds);
-               if ((ret=select(sock+1,&fds,NULL,NULL,timeout?&to:NULL)) < 0) {
-                       edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write(): error selecting socket\n");
-                       SYSTEM_ERROR("select"); 
-                       break;
-               }
-                len = write(sock,buf,bufsize);
-        }
-        if (timeout) {
-           gettimeofday(&after,NULL);
-           tv_sub(after,before);
-           tv_sub(*timeout,after);
-           if (timeout->tv_sec < 0) {
-                timeout->tv_sec = 0;
-                timeout->tv_usec = 0;
-           }
-       }
-        return len;
-} 
-
-/*!
- *----------------------------------------------------------------------
- * Write specified amount of data to socket
- * Attempts to call edg_wll_socket_write() untill the entire request is satisfied
- * (or times out).
- * \param sock IN: connection to work with
- * \param buf IN: buffer
- * \param bufsize IN: max size to write
- * \param timeout INOUT: max time allowed for operation, remaining time on return
- * \param total OUT: bytes actually written
- * \retval bytes written (>0) on success
- * \retval -1 on write error
- *----------------------------------------------------------------------
- */
-static ssize_t edg_wll_socket_write_full(int sock,void *buf,size_t bufsize,struct timeval *timeout,ssize_t *total)
-{
-        ssize_t        len;
-        *total = 0;
-
-        while (*total < bufsize) {
-                len = edg_wll_socket_write(sock,buf+*total,bufsize-*total,timeout);
-                if (len < 0) return len;
-                *total += len;
-        }
-        return 0;
-}
-
 /*
  *----------------------------------------------------------------------
  *
@@ -545,82 +472,16 @@ int edg_wll_log_proto_server(edg_wll_GssConnection *con, char *name, char *prefi
 
                i = 0;
 open_event_file:
+
                /* fopen and properly handle the filelock */
-               if ((outfile = fopen(outfilename,"a")) == NULL) {
-                       edg_wll_ll_log(LOG_INFO,"error.\n");
-                       SYSTEM_ERROR("fopen"); 
-                       answer = errno; 
-                       goto edg_wll_log_proto_server_end;
-               } else {
-                       edg_wll_ll_log(LOG_INFO,".");
-               }
-               if ((filedesc = fileno(outfile)) == -1) {
-                       edg_wll_ll_log(LOG_INFO,"error.\n");
-                       SYSTEM_ERROR("fileno"); 
-                       answer = errno; 
-                       fclose(outfile); 
-                       goto edg_wll_log_proto_server_end;
-               } else {
-                       edg_wll_ll_log(LOG_INFO,".");
-               }
-               filelock.l_type = F_WRLCK;
-               filelock.l_whence = SEEK_SET;
-               filelock.l_start = 0;
-               filelock.l_len = 0;
-               if ((filelock_status=fcntl(filedesc,F_SETLK,&filelock) < 0) && (i < FCNTL_ATTEMPTS)) {
-                       fclose(outfile);
-                       edg_wll_ll_log(LOG_DEBUG,"\nWaiting %d seconds for filelock to open...\n",FCNTL_TIMEOUT);
-                       sleep(FCNTL_TIMEOUT);
-                       i++;
-                       goto open_event_file;
-               }
-               if (filelock_status < 0) {
-                       edg_wll_ll_log(LOG_INFO,"error.\n");
-                       SYSTEM_ERROR("fcntl"); 
-                       answer = errno; 
-                       fclose(outfile); 
-                       goto edg_wll_log_proto_server_end;
-               } else {
-                       edg_wll_ll_log(LOG_INFO,".");
-               }
-               if (fseek(outfile, 0, SEEK_END) == -1) {
-                       SYSTEM_ERROR("fseek"); 
-                       answer = errno; 
-                       fclose(outfile); 
-                       goto edg_wll_log_proto_server_end;
-               }
-               if ((filepos=ftell(outfile)) == -1) {
-                       SYSTEM_ERROR("ftell"); 
-                       answer = errno; 
-                       fclose(outfile); 
-                       goto edg_wll_log_proto_server_end;
-               }
-               /* write, flush and sync */
-               if (fputs(msg,outfile) == EOF) {
-                       edg_wll_ll_log(LOG_INFO,"error.\n");
-                       SYSTEM_ERROR("fputs"); 
-                       answer = errno; 
-                       fclose(outfile); 
-                       goto edg_wll_log_proto_server_end;
-               }       
-               if (fflush(outfile) == EOF) {
-                       edg_wll_ll_log(LOG_INFO,"error.\n");
-                       SYSTEM_ERROR("fflush"); 
-                       answer = errno; 
-                       fclose(outfile); 
-                       goto edg_wll_log_proto_server_end;
-               }
-               if (fsync(filedesc) < 0) { /* synchronize */
+               if ( edg_wll_log_event_write(context, outfilename, msg, FCNTL_ATTEMPTS, FCNTL_TIMEOUT, &filepos) ) {
+                       char *errd;
+                       answer = edg_wll_Error(context, NULL, &errd);
                        edg_wll_ll_log(LOG_INFO,"error.\n");
-                       SYSTEM_ERROR("fsync"); 
-                       answer = errno; 
-                       fclose(outfile); 
+                       SYSTEM_ERROR(errd);
+                       free(errd);
                        goto edg_wll_log_proto_server_end;
-               } else {
-                       edg_wll_ll_log(LOG_INFO,"o.k.\n");
-               }
-               /* close and unlock */
-               fclose(outfile); 
+               } else edg_wll_ll_log(LOG_INFO,"o.k.");
        } else {
                filepos = 0;
        }
@@ -635,42 +496,11 @@ open_event_file:
 
        /* send message via IPC (UNIX socket) */
        if (!noipc) {
-               struct sockaddr_un saddr;
-               edg_wll_ll_log(LOG_INFO,"The message will be send via IPC (UNIX socket):\n");
-
-               /* initialize socket */
-               edg_wll_ll_log(LOG_DEBUG,"Initializing UNIX socket...\n");
-
-               edg_wll_ll_log(LOG_DEBUG,"- Getting UNIX socket descriptor...");
-               msg_sock = socket(PF_UNIX, SOCK_STREAM, 0);
-               if(msg_sock < 0) {
-                       edg_wll_ll_log(LOG_DEBUG,"error.\n");
-                       SYSTEM_ERROR("socket"); 
-                       answer = errno; 
-                       goto edg_wll_log_proto_server_end;
-               } else {
-                       edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
-               }
+               edg_wll_ll_log(LOG_DEBUG,
+                                       "Sending via IPC (UNIX socket \"%s\")\n\t"
+                                       "the message position %ld (%d bytes)...",
+                                       outfilename, filepos, sizeof(filepos));
 
-               edg_wll_ll_log(LOG_DEBUG,"- Setting UNIX socket parameters...");
-               memset(&saddr, 0, sizeof(saddr));
-               saddr.sun_family = AF_UNIX;
-               strcpy(saddr.sun_path, socket_path);
-               edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
-
-               edg_wll_ll_log(LOG_DEBUG,"-- adding O_NONBLOCK to socket parameters...");
-               if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 ||
-                       fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) {
-                       edg_wll_ll_log(LOG_DEBUG,"error.\n");
-                       SYSTEM_ERROR("fcntl"); 
-                       answer = errno; 
-                       close(msg_sock); 
-                       goto edg_wll_log_proto_server_end;
-               } else {
-                       edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
-               }
-
-               /* for priority messages initialize also another socket for confirmation */
                if (event->any.priority) {
                        edg_wll_ll_log(LOG_DEBUG,"- Initializing 2nd UNIX socket for priority messages confirmation...");
                        if(init_confirmation() < 0) { 
@@ -682,54 +512,14 @@ open_event_file:
                        }
                }         
 
-               edg_wll_ll_log(LOG_DEBUG,"Connecting to UNIX socket...");
-               for (i=0; i < CONNECT_ATTEMPTS; i++) {
-                       if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
-                               if ((errno == EAGAIN) || (errno == ETIMEDOUT)) {
-                                       edg_wll_ll_log(LOG_DEBUG,"."); 
-                                       sleep(CONNECT_TIMEOUT);
-                                       continue;
-                               } else if (errno == EISCONN) {
-                                       edg_wll_ll_log(LOG_DEBUG,"warning.\n");
-                                       edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n");
-                                       break;
-                               } else {
-                                       edg_wll_ll_log(LOG_DEBUG,"error.\n");
-                                       SYSTEM_ERROR("connect"); 
-                                       answer = errno; 
-                                       close(msg_sock); 
-                                       goto edg_wll_log_proto_server_end_1;
-                               }
-                       } else {
-                               edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
-                               break;
-                       }
-               }
-
-               edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message position %ld (%d bytes)...", filepos, sizeof(filepos));
-               count = 0;
-               if (edg_wll_socket_write_full(msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) {
-                       edg_wll_ll_log(LOG_DEBUG,"error.\n");
-                       edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error,\n");
-                       answer = errno; 
-                       close(msg_sock); 
-                       goto edg_wll_log_proto_server_end_1;
-               } else {
-                       edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
-               }
-
-               edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message itself (%d bytes)...",msg_size);
-               if (edg_wll_socket_write_full(msg_sock, msg, msg_size, &timeout, &count) < 0) {
-                       edg_wll_ll_log(LOG_DEBUG,"error.\n");
-                       edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error."); 
-                       answer = errno; 
-                       close(msg_sock); 
+               if ( edg_wll_log_event_send(context, socket_path, filepos, msg, msg_size, CONNECT_ATTEMPTS, &timeout) ) {
+                       char *errd;
+                       answer = edg_wll_Error(context, NULL, &errd);
+                       edg_wll_ll_log(LOG_INFO,"error.\n");
+                       SYSTEM_ERROR(errd);
+                       free(errd);
                        goto edg_wll_log_proto_server_end_1;
-               } else {
-                       edg_wll_ll_log(LOG_DEBUG,"o.k.\n");
-               }
-
-               close(msg_sock);
+               } else edg_wll_ll_log(LOG_INFO,"o.k.");
 
                if (event->any.priority) {
                        edg_wll_ll_log(LOG_INFO,"Waiting for confirmation...");
index 0d768f6..10775ee 100644 (file)
-/**
- * il_lbproxy.c
- *   - implementation of IL API calls for LB proxy
- *
- */
-#include <unistd.h>
-#include <fcntl.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <sys/types.h>
-#include <fcntl.h>
-#include <string.h>
-#include <errno.h>
-
 #include "glite/lb/context-int.h"
-#include "glite/lb/events_parse.h"
-#include "glite/lb/escape.h"
 #include "glite/lb/log_proto.h"
-#include "glite/lb/lb_plain_io.h"
 
 #include "il_lbproxy.h"
-#include "lb_xml_parse.h"
-
-
 
 #define FCNTL_ATTEMPTS         5
 #define FCNTL_TIMEOUT          1
-#define FILE_PREFIX             "/tmp/lbproxy_events"
-#define DEFAULT_SOCKET          "/tmp/lbproxy_interlogger.sock"
+#define FILE_PREFIX                    "/tmp/lbproxy_events"
+#define DEFAULT_SOCKET         "/tmp/lbproxy_interlogger.sock"
 
 char *lbproxy_ilog_socket_path = DEFAULT_SOCKET;
 char *lbproxy_ilog_file_prefix = FILE_PREFIX;
 
-#define tv_sub(a,b) {\
-       (a).tv_usec -= (b).tv_usec;\
-       (a).tv_sec -= (b).tv_sec;\
-       if ((a).tv_usec < 0) {\
-               (a).tv_sec--;\
-               (a).tv_usec += 1000000;\
-       }\
-}
 
-static int event_save_to_file_proxy(
-       edg_wll_Context     ctx,
-       const char         *event_file,
-       const char         *ulm_data,
-       long               *filepos)
+int
+edg_wll_EventSendProxy(
+       edg_wll_Context                 ctx,
+       const edg_wlc_JobId             jobid,
+       const char                         *event)
 {
-       FILE               *outfile;
-       struct flock    filelock;
-       int                             filedesc,
-                                       i, filelock_status=-1;
+       struct timeval  timeout;
+       long                    filepos;
+       char               *jobid_s,
+                                  *event_file = NULL;
 
 
-       for( i = 0; i < FCNTL_ATTEMPTS; i++ ) {
-               /* fopen and properly handle the filelock */
-               if ( (outfile = fopen(event_file, "a")) == NULL ) {
-                       edg_wll_SetError(ctx, errno, "fopen()");
-                       goto out;
-               }
-               if ( (filedesc = fileno(outfile)) == -1 ) {
-                       edg_wll_SetError(ctx, errno, "fileno()");
-                       goto out1;
-               }
-               filelock.l_type = F_WRLCK;
-               filelock.l_whence = SEEK_SET;
-               filelock.l_start = 0;
-               filelock.l_len = 0;
-               filelock_status=fcntl(filedesc, F_SETLK, &filelock);
-               if(filelock_status < 0) {
-                       switch(errno) {
-                       case EAGAIN:
-                       case EACCES:
-                       case EINTR:
-                               /* lock is held by someone else */
-                               sleep(FCNTL_TIMEOUT);
-                               break;
-                       default:
-                               /* other error */
-                               edg_wll_SetError(ctx, errno, "fcntl()");
-                               goto out1;
-                       }
-               } else {
-                       /* lock acquired, break out of the loop */
-                       break;
-               }
-       }
-       if (fseek(outfile, 0, SEEK_END) == -1) { edg_wll_SetError(ctx, errno, "fseek()"); goto out1; }
-       if ((*filepos=ftell(outfile)) == -1) { edg_wll_SetError(ctx, errno, "ftell()"); goto out1; }
-       if (fputs(ulm_data, outfile) == EOF) { edg_wll_SetError(ctx, errno, "fputs()"); goto out1; }       
-       if (fflush(outfile) == EOF) { edg_wll_SetError(ctx, errno, "fflush()"); goto out1; }
-       if (fsync(filedesc) < 0) { edg_wll_SetError(ctx, errno, "fsync()"); goto out1; } 
-
-out1:
-       fclose(outfile); 
-
-out:
-       return edg_wll_Error(ctx, NULL, NULL)?
-                       edg_wll_UpdateError(ctx, 0, "event_save_to_file_proxy()"): 0;
-}
-
-
-static int event_send_socket_proxy(
-       edg_wll_Context         ctx,
-       long                            filepos,
-       const char                 *ulm_data)
-{
-       edg_wll_PlainConnection conn;
-       struct sockaddr_un              saddr;
-       int                                             flags;
-       struct timeval                  timeout;
-
+       edg_wll_ResetError(ctx);
 
        timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX;
        timeout.tv_usec = 0;    
 
-       if ( (conn.sock = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 )
-               edg_wll_SetError(ctx, errno, "socket()");
-
-       memset(&saddr, 0, sizeof(saddr));
-       saddr.sun_family = AF_UNIX;
-       strcpy(saddr.sun_path, lbproxy_ilog_socket_path);
-
-       if (   (flags = fcntl(conn.sock, F_GETFL, 0)) < 0
-               || fcntl(conn.sock, F_SETFL, flags | O_NONBLOCK) < 0) {
-               edg_wll_SetError(ctx, errno, "fcntl()");
+       jobid_s = edg_wlc_JobIdGetUnique(jobid);
+       if ( !jobid_s ) {
+               edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()");
                goto out;
        }
 
-       if ( connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
-               if(errno != EISCONN) { edg_wll_SetError(ctx, errno, "connect()"); goto out; }
+       asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s);
+       if ( !event_file ) {
+               edg_wll_SetError(ctx, ENOMEM, "asprintf()");
+               goto out;
        }
 
-       if (   edg_wll_plain_write_full(&conn, &filepos, sizeof(filepos), &timeout) < 0
-               || edg_wll_plain_write_full(&conn, (void*)ulm_data, strlen(ulm_data), &timeout) < 0 ) {
-               edg_wll_SetError(ctx, errno, "edg_wll_plain_write_full()");
+       if ( edg_wll_log_event_write(ctx, event_file, event,
+                                               FCNTL_ATTEMPTS, FCNTL_TIMEOUT, &filepos) ) {
+               edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_write()");
                goto out;
        }
 
-out:
-       close(conn.sock);
-       return edg_wll_Error(ctx, NULL, NULL)?
-                       edg_wll_UpdateError(ctx, 0, "event_send_socket_proxy()"): 0;
-}
-
-
-int edg_wll_EventSendProxy(edg_wll_Context ctx, const edg_wlc_JobId jobid, const char *event)
-{
-       long    filepos;
-       char   *jobid_s, *event_file = NULL;
-
-
-       edg_wll_ResetError(ctx);
-       jobid_s = edg_wlc_JobIdGetUnique(jobid);
-       if ( !jobid_s ) { edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()"); goto out; }
-
-       asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s);
-       if ( !event_file ) { edg_wll_SetError(ctx, ENOMEM, "asprintf()"); goto out; }
-
-       if (   event_save_to_file_proxy(ctx, event_file, event, &filepos)
-               || event_send_socket_proxy(ctx, filepos, event) ) goto out;
+       if ( edg_wll_log_event_send(ctx, lbproxy_ilog_socket_path, filepos,
+                                               event, strlen(event), 1, &timeout) ) {
+               edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_send()");
+               goto out;
+       }
 
 out:
        if ( jobid_s ) free(jobid_s);