From: Jiří Škrábal Date: Thu, 3 Mar 2005 13:07:09 +0000 (+0000) Subject: - using functions edg_wll_log_event_write() and edg_wll_log_event_send() calls X-Git-Tag: glite-deployment-lb_R_1_1_1~33 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=89730d27f13b0380dcdfb55fbf6007b5adc35d19;p=jra1mw.git - using functions edg_wll_log_event_write() and edg_wll_log_event_send() calls from common library --- diff --git a/org.glite.lb.logger/src/logd_proto.c b/org.glite.lb.logger/src/logd_proto.c index 463266a..6693ea0 100644 --- a/org.glite.lb.logger/src/logd_proto.c +++ b/org.glite.lb.logger/src/logd_proto.c @@ -219,79 +219,6 @@ int do_listen(int port) return sock; } -/*! - *---------------------------------------------------------------------- - * Write to socket - * Needn't write entire buffer. Timeout is applicable only for non-blocking - * connections - * \param sock IN: connection to work with - * \param buf IN: buffer - * \param bufsize IN: max size to write - * \param timeout INOUT: max time allowed for operation, remaining time on return - * \retval bytes written (>0) on success - * \retval -1 on write error - *---------------------------------------------------------------------- - */ -static ssize_t edg_wll_socket_write(int sock,const void *buf,size_t bufsize,struct timeval *timeout) -{ - ssize_t len = 0, ret = 0; - fd_set fds; - struct timeval to,before,after; - - if (timeout) { - memcpy(&to,timeout,sizeof to); - gettimeofday(&before,NULL); - } - len = write(sock,buf,bufsize); - while (len <= 0) { - FD_ZERO(&fds); - FD_SET(sock,&fds); - if ((ret=select(sock+1,&fds,NULL,NULL,timeout?&to:NULL)) < 0) { - edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write(): error selecting socket\n"); - SYSTEM_ERROR("select"); - break; - } - len = write(sock,buf,bufsize); - } - if (timeout) { - gettimeofday(&after,NULL); - tv_sub(after,before); - tv_sub(*timeout,after); - if (timeout->tv_sec < 0) { - timeout->tv_sec = 0; - timeout->tv_usec = 0; - } - } - return len; -} - -/*! - *---------------------------------------------------------------------- - * Write specified amount of data to socket - * Attempts to call edg_wll_socket_write() untill the entire request is satisfied - * (or times out). - * \param sock IN: connection to work with - * \param buf IN: buffer - * \param bufsize IN: max size to write - * \param timeout INOUT: max time allowed for operation, remaining time on return - * \param total OUT: bytes actually written - * \retval bytes written (>0) on success - * \retval -1 on write error - *---------------------------------------------------------------------- - */ -static ssize_t edg_wll_socket_write_full(int sock,void *buf,size_t bufsize,struct timeval *timeout,ssize_t *total) -{ - ssize_t len; - *total = 0; - - while (*total < bufsize) { - len = edg_wll_socket_write(sock,buf+*total,bufsize-*total,timeout); - if (len < 0) return len; - *total += len; - } - return 0; -} - /* *---------------------------------------------------------------------- * @@ -545,82 +472,16 @@ int edg_wll_log_proto_server(edg_wll_GssConnection *con, char *name, char *prefi i = 0; open_event_file: + /* fopen and properly handle the filelock */ - if ((outfile = fopen(outfilename,"a")) == NULL) { - edg_wll_ll_log(LOG_INFO,"error.\n"); - SYSTEM_ERROR("fopen"); - answer = errno; - goto edg_wll_log_proto_server_end; - } else { - edg_wll_ll_log(LOG_INFO,"."); - } - if ((filedesc = fileno(outfile)) == -1) { - edg_wll_ll_log(LOG_INFO,"error.\n"); - SYSTEM_ERROR("fileno"); - answer = errno; - fclose(outfile); - goto edg_wll_log_proto_server_end; - } else { - edg_wll_ll_log(LOG_INFO,"."); - } - filelock.l_type = F_WRLCK; - filelock.l_whence = SEEK_SET; - filelock.l_start = 0; - filelock.l_len = 0; - if ((filelock_status=fcntl(filedesc,F_SETLK,&filelock) < 0) && (i < FCNTL_ATTEMPTS)) { - fclose(outfile); - edg_wll_ll_log(LOG_DEBUG,"\nWaiting %d seconds for filelock to open...\n",FCNTL_TIMEOUT); - sleep(FCNTL_TIMEOUT); - i++; - goto open_event_file; - } - if (filelock_status < 0) { - edg_wll_ll_log(LOG_INFO,"error.\n"); - SYSTEM_ERROR("fcntl"); - answer = errno; - fclose(outfile); - goto edg_wll_log_proto_server_end; - } else { - edg_wll_ll_log(LOG_INFO,"."); - } - if (fseek(outfile, 0, SEEK_END) == -1) { - SYSTEM_ERROR("fseek"); - answer = errno; - fclose(outfile); - goto edg_wll_log_proto_server_end; - } - if ((filepos=ftell(outfile)) == -1) { - SYSTEM_ERROR("ftell"); - answer = errno; - fclose(outfile); - goto edg_wll_log_proto_server_end; - } - /* write, flush and sync */ - if (fputs(msg,outfile) == EOF) { - edg_wll_ll_log(LOG_INFO,"error.\n"); - SYSTEM_ERROR("fputs"); - answer = errno; - fclose(outfile); - goto edg_wll_log_proto_server_end; - } - if (fflush(outfile) == EOF) { - edg_wll_ll_log(LOG_INFO,"error.\n"); - SYSTEM_ERROR("fflush"); - answer = errno; - fclose(outfile); - goto edg_wll_log_proto_server_end; - } - if (fsync(filedesc) < 0) { /* synchronize */ + if ( edg_wll_log_event_write(context, outfilename, msg, FCNTL_ATTEMPTS, FCNTL_TIMEOUT, &filepos) ) { + char *errd; + answer = edg_wll_Error(context, NULL, &errd); edg_wll_ll_log(LOG_INFO,"error.\n"); - SYSTEM_ERROR("fsync"); - answer = errno; - fclose(outfile); + SYSTEM_ERROR(errd); + free(errd); goto edg_wll_log_proto_server_end; - } else { - edg_wll_ll_log(LOG_INFO,"o.k.\n"); - } - /* close and unlock */ - fclose(outfile); + } else edg_wll_ll_log(LOG_INFO,"o.k."); } else { filepos = 0; } @@ -635,42 +496,11 @@ open_event_file: /* send message via IPC (UNIX socket) */ if (!noipc) { - struct sockaddr_un saddr; - edg_wll_ll_log(LOG_INFO,"The message will be send via IPC (UNIX socket):\n"); - - /* initialize socket */ - edg_wll_ll_log(LOG_DEBUG,"Initializing UNIX socket...\n"); - - edg_wll_ll_log(LOG_DEBUG,"- Getting UNIX socket descriptor..."); - msg_sock = socket(PF_UNIX, SOCK_STREAM, 0); - if(msg_sock < 0) { - edg_wll_ll_log(LOG_DEBUG,"error.\n"); - SYSTEM_ERROR("socket"); - answer = errno; - goto edg_wll_log_proto_server_end; - } else { - edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); - } + edg_wll_ll_log(LOG_DEBUG, + "Sending via IPC (UNIX socket \"%s\")\n\t" + "the message position %ld (%d bytes)...", + outfilename, filepos, sizeof(filepos)); - edg_wll_ll_log(LOG_DEBUG,"- Setting UNIX socket parameters..."); - memset(&saddr, 0, sizeof(saddr)); - saddr.sun_family = AF_UNIX; - strcpy(saddr.sun_path, socket_path); - edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); - - edg_wll_ll_log(LOG_DEBUG,"-- adding O_NONBLOCK to socket parameters..."); - if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 || - fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) { - edg_wll_ll_log(LOG_DEBUG,"error.\n"); - SYSTEM_ERROR("fcntl"); - answer = errno; - close(msg_sock); - goto edg_wll_log_proto_server_end; - } else { - edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); - } - - /* for priority messages initialize also another socket for confirmation */ if (event->any.priority) { edg_wll_ll_log(LOG_DEBUG,"- Initializing 2nd UNIX socket for priority messages confirmation..."); if(init_confirmation() < 0) { @@ -682,54 +512,14 @@ open_event_file: } } - edg_wll_ll_log(LOG_DEBUG,"Connecting to UNIX socket..."); - for (i=0; i < CONNECT_ATTEMPTS; i++) { - if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { - if ((errno == EAGAIN) || (errno == ETIMEDOUT)) { - edg_wll_ll_log(LOG_DEBUG,"."); - sleep(CONNECT_TIMEOUT); - continue; - } else if (errno == EISCONN) { - edg_wll_ll_log(LOG_DEBUG,"warning.\n"); - edg_wll_ll_log(LOG_ERR,"The socket is already connected!\n"); - break; - } else { - edg_wll_ll_log(LOG_DEBUG,"error.\n"); - SYSTEM_ERROR("connect"); - answer = errno; - close(msg_sock); - goto edg_wll_log_proto_server_end_1; - } - } else { - edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); - break; - } - } - - edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message position %ld (%d bytes)...", filepos, sizeof(filepos)); - count = 0; - if (edg_wll_socket_write_full(msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) { - edg_wll_ll_log(LOG_DEBUG,"error.\n"); - edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error,\n"); - answer = errno; - close(msg_sock); - goto edg_wll_log_proto_server_end_1; - } else { - edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); - } - - edg_wll_ll_log(LOG_DEBUG,"Sending via IPC the message itself (%d bytes)...",msg_size); - if (edg_wll_socket_write_full(msg_sock, msg, msg_size, &timeout, &count) < 0) { - edg_wll_ll_log(LOG_DEBUG,"error.\n"); - edg_wll_ll_log(LOG_ERR,"edg_wll_socket_write_full(): error."); - answer = errno; - close(msg_sock); + if ( edg_wll_log_event_send(context, socket_path, filepos, msg, msg_size, CONNECT_ATTEMPTS, &timeout) ) { + char *errd; + answer = edg_wll_Error(context, NULL, &errd); + edg_wll_ll_log(LOG_INFO,"error.\n"); + SYSTEM_ERROR(errd); + free(errd); goto edg_wll_log_proto_server_end_1; - } else { - edg_wll_ll_log(LOG_DEBUG,"o.k.\n"); - } - - close(msg_sock); + } else edg_wll_ll_log(LOG_INFO,"o.k."); if (event->any.priority) { edg_wll_ll_log(LOG_INFO,"Waiting for confirmation..."); diff --git a/org.glite.lb.server/src/il_lbproxy.c b/org.glite.lb.server/src/il_lbproxy.c index 0d768f6..10775ee 100644 --- a/org.glite.lb.server/src/il_lbproxy.c +++ b/org.glite.lb.server/src/il_lbproxy.c @@ -1,164 +1,57 @@ -/** - * il_lbproxy.c - * - implementation of IL API calls for LB proxy - * - */ -#include -#include -#include -#include -#include -#include -#include -#include - #include "glite/lb/context-int.h" -#include "glite/lb/events_parse.h" -#include "glite/lb/escape.h" #include "glite/lb/log_proto.h" -#include "glite/lb/lb_plain_io.h" #include "il_lbproxy.h" -#include "lb_xml_parse.h" - - #define FCNTL_ATTEMPTS 5 #define FCNTL_TIMEOUT 1 -#define FILE_PREFIX "/tmp/lbproxy_events" -#define DEFAULT_SOCKET "/tmp/lbproxy_interlogger.sock" +#define FILE_PREFIX "/tmp/lbproxy_events" +#define DEFAULT_SOCKET "/tmp/lbproxy_interlogger.sock" char *lbproxy_ilog_socket_path = DEFAULT_SOCKET; char *lbproxy_ilog_file_prefix = FILE_PREFIX; -#define tv_sub(a,b) {\ - (a).tv_usec -= (b).tv_usec;\ - (a).tv_sec -= (b).tv_sec;\ - if ((a).tv_usec < 0) {\ - (a).tv_sec--;\ - (a).tv_usec += 1000000;\ - }\ -} -static int event_save_to_file_proxy( - edg_wll_Context ctx, - const char *event_file, - const char *ulm_data, - long *filepos) +int +edg_wll_EventSendProxy( + edg_wll_Context ctx, + const edg_wlc_JobId jobid, + const char *event) { - FILE *outfile; - struct flock filelock; - int filedesc, - i, filelock_status=-1; + struct timeval timeout; + long filepos; + char *jobid_s, + *event_file = NULL; - for( i = 0; i < FCNTL_ATTEMPTS; i++ ) { - /* fopen and properly handle the filelock */ - if ( (outfile = fopen(event_file, "a")) == NULL ) { - edg_wll_SetError(ctx, errno, "fopen()"); - goto out; - } - if ( (filedesc = fileno(outfile)) == -1 ) { - edg_wll_SetError(ctx, errno, "fileno()"); - goto out1; - } - filelock.l_type = F_WRLCK; - filelock.l_whence = SEEK_SET; - filelock.l_start = 0; - filelock.l_len = 0; - filelock_status=fcntl(filedesc, F_SETLK, &filelock); - if(filelock_status < 0) { - switch(errno) { - case EAGAIN: - case EACCES: - case EINTR: - /* lock is held by someone else */ - sleep(FCNTL_TIMEOUT); - break; - default: - /* other error */ - edg_wll_SetError(ctx, errno, "fcntl()"); - goto out1; - } - } else { - /* lock acquired, break out of the loop */ - break; - } - } - if (fseek(outfile, 0, SEEK_END) == -1) { edg_wll_SetError(ctx, errno, "fseek()"); goto out1; } - if ((*filepos=ftell(outfile)) == -1) { edg_wll_SetError(ctx, errno, "ftell()"); goto out1; } - if (fputs(ulm_data, outfile) == EOF) { edg_wll_SetError(ctx, errno, "fputs()"); goto out1; } - if (fflush(outfile) == EOF) { edg_wll_SetError(ctx, errno, "fflush()"); goto out1; } - if (fsync(filedesc) < 0) { edg_wll_SetError(ctx, errno, "fsync()"); goto out1; } - -out1: - fclose(outfile); - -out: - return edg_wll_Error(ctx, NULL, NULL)? - edg_wll_UpdateError(ctx, 0, "event_save_to_file_proxy()"): 0; -} - - -static int event_send_socket_proxy( - edg_wll_Context ctx, - long filepos, - const char *ulm_data) -{ - edg_wll_PlainConnection conn; - struct sockaddr_un saddr; - int flags; - struct timeval timeout; - + edg_wll_ResetError(ctx); timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX; timeout.tv_usec = 0; - if ( (conn.sock = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 ) - edg_wll_SetError(ctx, errno, "socket()"); - - memset(&saddr, 0, sizeof(saddr)); - saddr.sun_family = AF_UNIX; - strcpy(saddr.sun_path, lbproxy_ilog_socket_path); - - if ( (flags = fcntl(conn.sock, F_GETFL, 0)) < 0 - || fcntl(conn.sock, F_SETFL, flags | O_NONBLOCK) < 0) { - edg_wll_SetError(ctx, errno, "fcntl()"); + jobid_s = edg_wlc_JobIdGetUnique(jobid); + if ( !jobid_s ) { + edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()"); goto out; } - if ( connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { - if(errno != EISCONN) { edg_wll_SetError(ctx, errno, "connect()"); goto out; } + asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s); + if ( !event_file ) { + edg_wll_SetError(ctx, ENOMEM, "asprintf()"); + goto out; } - if ( edg_wll_plain_write_full(&conn, &filepos, sizeof(filepos), &timeout) < 0 - || edg_wll_plain_write_full(&conn, (void*)ulm_data, strlen(ulm_data), &timeout) < 0 ) { - edg_wll_SetError(ctx, errno, "edg_wll_plain_write_full()"); + if ( edg_wll_log_event_write(ctx, event_file, event, + FCNTL_ATTEMPTS, FCNTL_TIMEOUT, &filepos) ) { + edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_write()"); goto out; } -out: - close(conn.sock); - return edg_wll_Error(ctx, NULL, NULL)? - edg_wll_UpdateError(ctx, 0, "event_send_socket_proxy()"): 0; -} - - -int edg_wll_EventSendProxy(edg_wll_Context ctx, const edg_wlc_JobId jobid, const char *event) -{ - long filepos; - char *jobid_s, *event_file = NULL; - - - edg_wll_ResetError(ctx); - jobid_s = edg_wlc_JobIdGetUnique(jobid); - if ( !jobid_s ) { edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()"); goto out; } - - asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s); - if ( !event_file ) { edg_wll_SetError(ctx, ENOMEM, "asprintf()"); goto out; } - - if ( event_save_to_file_proxy(ctx, event_file, event, &filepos) - || event_send_socket_proxy(ctx, filepos, event) ) goto out; + if ( edg_wll_log_event_send(ctx, lbproxy_ilog_socket_path, filepos, + event, strlen(event), 1, &timeout) ) { + edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_send()"); + goto out; + } out: if ( jobid_s ) free(jobid_s);