- forwarding events prom LB Proxy to LB server added
authorJiří Škrábal <nykolas@ics.muni.cz>
Thu, 9 Dec 2004 14:35:23 +0000 (14:35 +0000)
committerJiří Škrábal <nykolas@ics.muni.cz>
Thu, 9 Dec 2004 14:35:23 +0000 (14:35 +0000)
org.glite.lb.server/Makefile
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/il_lbproxy.c
org.glite.lb.server/src/il_lbproxy.h

index c4d2927..ac6cd76 100644 (file)
@@ -117,8 +117,8 @@ LB_PROXY_OBJS:= lbproxy.o il_lbproxy.o get_events.o index.o jobstat.o seqcode.o
        stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \
        notification.o il_notification.o notif_match.o
 
-SERVER_OBJS:= bkserverd.o get_events.o index.o jobstat.o jobstat_supp.o seqcode.o \
-       write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \
+SERVER_OBJS:= bkserverd.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o \
+       seqcode.o write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \
        lb_xml_parse_V21.o \
        lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \
        stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \
index b4a1c1c..8ff5fb5 100644 (file)
@@ -11,6 +11,7 @@
 #include "store.h"
 #include "lbs_db.h"
 #include "lock.h"
+#include "il_lbproxy.h"
 
 extern int debug;
 
@@ -65,13 +66,19 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
                        ev->changeACL.permission, ev->changeACL.permission_type,
                        ev->changeACL.operation);
   else
-    err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq,&newstat);
+    err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, ctx->isProxy? NULL: &newstat);
 
   if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err;
   if (err) goto err;
 
-  if (!ctx->isProxy && newstat.state) {
-         edg_wll_NotifMatch(ctx,&newstat);
+  if ( ctx->isProxy ) {
+       /*
+        *      send event to the proper BK server
+        */
+       if (   ev->any.type != EDG_WLL_EVENT_REGJOB
+               && edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) goto err;
+  } else if ( newstat.state ) {
+         edg_wll_NotifMatch(ctx, &newstat);
          edg_wll_FreeStatus(&newstat);
   }
 
index e4c228b..0d768f6 100644 (file)
@@ -16,6 +16,7 @@
 #include "glite/lb/events_parse.h"
 #include "glite/lb/escape.h"
 #include "glite/lb/log_proto.h"
+#include "glite/lb/lb_plain_io.h"
 
 #include "il_lbproxy.h"
 #include "lb_xml_parse.h"
@@ -39,85 +40,26 @@ char *lbproxy_ilog_file_prefix = FILE_PREFIX;
        }\
 }
 
-int edg_wll_SendEventProxy(edg_wll_Context ctx, const char *owner)
+static int event_save_to_file_proxy(
+       edg_wll_Context     ctx,
+       const char         *event_file,
+       const char         *ulm_data,
+       long               *filepos)
 {
-       return 0;
-}
-
-static
-int
-notif_create_ulm(
-       edg_wll_Context context,
-       edg_wll_NotifId reg_id,
-       const char      *host,
-       const uint16_t  port,
-       const char      *owner,
-       const char      *notif_data,
-       char            **ulm_data,
-       char            **reg_id_s)
-{
-       int             ret;
-       edg_wll_Event   *event=NULL;
-
-       *ulm_data = NULL;
-       *reg_id_s = NULL;
-
-       event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION);
-
-       gettimeofday(&event->any.timestamp,0);
-       if (context->p_host) event->any.host = strdup(context->p_host);
-       event->any.level = context->p_level;
-       event->any.source = context->p_source;
-       if (context->p_instance) event->notification.src_instance = strdup(context->p_instance);
-       event->notification.notifId = reg_id;
-       if (owner) event->notification.owner = strdup(owner);
-       if (host) event->notification.dest_host = strdup(host);
-       event->notification.dest_port = port;
-       if (notif_data) event->notification.jobstat = strdup(notif_data);
-
-       if ((*ulm_data = edg_wll_UnparseNotifEvent(context,event)) == NULL) {
-               edg_wll_SetError(context, ret = ENOMEM, "edg_wll_UnparseNotifEvent()"); 
-               goto out;
-       }
-
-       if((*reg_id_s = edg_wll_NotifIdGetUnique(reg_id)) == NULL) {
-               edg_wll_SetError(context, ret = ENOMEM, "edg_wll_NotifIdGetUnique()");
-               goto out;
-       }
-
-       ret = 0;
-
-out:
-       if(event) { 
-               edg_wll_FreeEvent(event);
-               free(event);
-       }
-       if(ret) edg_wll_UpdateError(context, ret, "notif_create_ulm()");
-       return(ret);
-}
+       FILE               *outfile;
+       struct flock    filelock;
+       int                             filedesc,
+                                       i, filelock_status=-1;
 
 
-static
-int
-notif_save_to_file(edg_wll_Context     context,
-                  const char         *event_file,
-                  const char         *ulm_data,
-                  long               *filepos)
-{
-       int ret;
-       FILE *outfile;
-       int filedesc;
-       struct flock filelock;
-       int i, filelock_status=-1;
-
-       for(i=0; i < FCNTL_ATTEMPTS; i++) {
+       for( i = 0; i < FCNTL_ATTEMPTS; i++ ) {
                /* fopen and properly handle the filelock */
-               if ((outfile = fopen(event_file,"a")) == NULL) {
-                       edg_wll_SetError(context, ret = errno, "fopen()");
+               if ( (outfile = fopen(event_file, "a")) == NULL ) {
+                       edg_wll_SetError(ctx, errno, "fopen()");
                        goto out;
                }
-               if ((filedesc = fileno(outfile)) == -1) {
-                       edg_wll_SetError(context, ret = errno, "fileno()");
+               if ( (filedesc = fileno(outfile)) == -1 ) {
+                       edg_wll_SetError(ctx, errno, "fileno()");
                        goto out1;
                }
                filelock.l_type = F_WRLCK;
@@ -135,7 +77,7 @@ notif_save_to_file(edg_wll_Context     context,
                                break;
                        default:
                                /* other error */
-                               edg_wll_SetError(context, ret=errno, "fcntl()");
+                               edg_wll_SetError(ctx, errno, "fcntl()");
                                goto out1;
                        }
                } else {
@@ -143,231 +85,84 @@ notif_save_to_file(edg_wll_Context     context,
                        break;
                }
        }
-       if (fseek(outfile, 0, SEEK_END) == -1) {
-               edg_wll_SetError(context, ret = errno, "fseek()");
-               goto out1;
-       }
-       if ((*filepos=ftell(outfile)) == -1) {
-               edg_wll_SetError(context, ret = errno, "ftell()");
-               goto out1;
-       }
-       /* write, flush and sync */
-       if (fputs(ulm_data, outfile) == EOF) {
-               edg_wll_SetError(context, ret = errno, "fputs()");
-               goto out1;
-       }       
-       if (fflush(outfile) == EOF) {
-               edg_wll_SetError(context, ret = errno, "fflush()");
-               goto out1;
-       }
-       if (fsync(filedesc) < 0) { /* synchronize */
-               edg_wll_SetError(context, ret = errno, "fsync()");
-               goto out1;
-       } 
+       if (fseek(outfile, 0, SEEK_END) == -1) { edg_wll_SetError(ctx, errno, "fseek()"); goto out1; }
+       if ((*filepos=ftell(outfile)) == -1) { edg_wll_SetError(ctx, errno, "ftell()"); goto out1; }
+       if (fputs(ulm_data, outfile) == EOF) { edg_wll_SetError(ctx, errno, "fputs()"); goto out1; }       
+       if (fflush(outfile) == EOF) { edg_wll_SetError(ctx, errno, "fflush()"); goto out1; }
+       if (fsync(filedesc) < 0) { edg_wll_SetError(ctx, errno, "fsync()"); goto out1; } 
 
-       ret = 0;
 out1:
-       /* close and unlock */
        fclose(outfile); 
-out:
-       if(ret) edg_wll_UpdateError(context, ret, "notif_save_to_file()");
-       return(ret);
-}
 
-
-static 
-ssize_t 
-socket_write_full(edg_wll_Context  context,
-                 int              sock,
-                 void            *buf,
-                 size_t           bufsize,
-                 struct timeval  *timeout,
-                 ssize_t         *total)
-{
-       int ret = 0;
-        ssize_t        len;
-
-        *total = 0;
-        while (bufsize > 0) {
-
-               fd_set  fds;
-               struct timeval  to,before,after;
-               
-               if (timeout) {
-                       memcpy(&to, timeout, sizeof(to));
-                       gettimeofday(&before, NULL);
-               }
-
-               len = write(sock, buf, bufsize);
-               while (len <= 0) {
-                       FD_ZERO(&fds);
-                       FD_SET(sock, &fds);
-                       if (select(sock+1, &fds, NULL, NULL, timeout ? &to : NULL) < 0) {
-                               edg_wll_SetError(context, ret = errno, "select()");
-                               goto out;
-                       }
-                       len = write(sock, buf, bufsize);
-               }
-               if (timeout) {
-                       gettimeofday(&after,NULL);
-                       tv_sub(after, before);
-                       tv_sub(*timeout,after);
-                       if (timeout->tv_sec < 0) {
-                               timeout->tv_sec = 0;
-                               timeout->tv_usec = 0;
-                       }
-               }
-               
-                if (len < 0) {
-                       edg_wll_SetError(context, ret = errno, "write()");
-                       goto out;
-               }
-
-               bufsize -= len;
-               buf += len;
-                *total += len;
-        }
-
-       ret = 0;
 out:
-       if(ret) edg_wll_UpdateError(context, ret, "socket_write_full()");
-        return ret;
+       return edg_wll_Error(ctx, NULL, NULL)?
+                       edg_wll_UpdateError(ctx, 0, "event_save_to_file_proxy()"): 0;
 }
 
 
-static 
-int
-notif_send_socket(edg_wll_Context       context,
-                 long                  filepos,
-                 const char           *ulm_data)
+static int event_send_socket_proxy(
+       edg_wll_Context         ctx,
+       long                            filepos,
+       const char                 *ulm_data)
 {
-       int ret;
-       struct sockaddr_un saddr;
-       int msg_sock, flags;
-       size_t count;
-       struct timeval timeout;
+       edg_wll_PlainConnection conn;
+       struct sockaddr_un              saddr;
+       int                                             flags;
+       struct timeval                  timeout;
+
 
        timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX;
-        timeout.tv_usec = 0;   
+       timeout.tv_usec = 0;    
 
-       msg_sock = socket(PF_UNIX, SOCK_STREAM, 0);
-       if(msg_sock < 0) {
-               edg_wll_SetError(context, ret = errno, "socket()");
-               goto out;
-       }
+       if ( (conn.sock = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 )
+               edg_wll_SetError(ctx, errno, "socket()");
 
        memset(&saddr, 0, sizeof(saddr));
        saddr.sun_family = AF_UNIX;
        strcpy(saddr.sun_path, lbproxy_ilog_socket_path);
 
-       if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 ||
-           fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) {
-               edg_wll_SetError(context, ret = errno, "fcntl()");
-               goto out1;
-       }
-
-       if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
-               if(errno != EISCONN) {
-                       edg_wll_SetError(context, ret = errno, "connect()");
-                       goto out1;
-               }
+       if (   (flags = fcntl(conn.sock, F_GETFL, 0)) < 0
+               || fcntl(conn.sock, F_SETFL, flags | O_NONBLOCK) < 0) {
+               edg_wll_SetError(ctx, errno, "fcntl()");
+               goto out;
        }
 
-       if (socket_write_full(context, msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) {
-               ret = errno; 
-               goto out1;
+       if ( connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+               if(errno != EISCONN) { edg_wll_SetError(ctx, errno, "connect()"); goto out; }
        }
 
-       if (socket_write_full(context, msg_sock, (void*)ulm_data, strlen(ulm_data), &timeout, &count) < 0) {
-               ret = errno; 
-               goto out1;
+       if (   edg_wll_plain_write_full(&conn, &filepos, sizeof(filepos), &timeout) < 0
+               || edg_wll_plain_write_full(&conn, (void*)ulm_data, strlen(ulm_data), &timeout) < 0 ) {
+               edg_wll_SetError(ctx, errno, "edg_wll_plain_write_full()");
+               goto out;
        }
 
-       ret = 0;
-
-out1:
-       close(msg_sock);
 out:
-       if(ret) edg_wll_UpdateError(context, ret, "notif_send_socket()");
-       return(ret);
+       close(conn.sock);
+       return edg_wll_Error(ctx, NULL, NULL)?
+                       edg_wll_UpdateError(ctx, 0, "event_send_socket_proxy()"): 0;
 }
 
 
-int
-edg_wll_NotifSend_a(edg_wll_Context       context,
-                 edg_wll_NotifId       reg_id,
-                 const char           *host,
-                  int                   port,
-                 const char           *owner,
-                  const char           *notif_data)
+int edg_wll_EventSendProxy(edg_wll_Context ctx, const edg_wlc_JobId jobid, const char *event)
 {
-       int ret;
-       long filepos;
-       char *ulm_data, *reg_id_s, *event_file;
+       long    filepos;
+       char   *jobid_s, *event_file = NULL;
 
-       if((ret=notif_create_ulm(context, 
-                                reg_id, 
-                                host, 
-                                port, 
-                                owner, 
-                                notif_data,
-                                &ulm_data,
-                                &reg_id_s))) {
-               goto out;
-       }
 
-       asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, reg_id_s);
-       if(event_file == NULL) {
-               edg_wll_SetError(context, ret=ENOMEM, "asprintf()");
-               goto out;
-       }
+       edg_wll_ResetError(ctx);
+       jobid_s = edg_wlc_JobIdGetUnique(jobid);
+       if ( !jobid_s ) { edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()"); goto out; }
 
-       if((ret=notif_save_to_file(context,
-                                  event_file,
-                                  ulm_data,
-                                  &filepos))) {
-               goto out;
-       }
+       asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s);
+       if ( !event_file ) { edg_wll_SetError(ctx, ENOMEM, "asprintf()"); goto out; }
 
-       if((ret=notif_send_socket(context,
-                                 filepos,
-                                 ulm_data))) {
-               goto out;
-       }
-       ret = 0;
+       if (   event_save_to_file_proxy(ctx, event_file, event, &filepos)
+               || event_send_socket_proxy(ctx, filepos, event) ) goto out;
 
 out:
-       if(ulm_data) free(ulm_data);
-       if(reg_id_s) free(reg_id_s);
-       if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifSend()");
-       return(ret);
-}
-
+       if ( jobid_s ) free(jobid_s);
+       if ( event_file ) free(event_file);
 
-int
-edg_wll_NotifJobStatus_a(edg_wll_Context       context,
-                      edg_wll_NotifId  reg_id,
-                      const char      *host,
-                       int              port,
-                      const char      *owner,
-                      const edg_wll_JobStat notif_job_stat)
-{
-       int ret=0;
-       char   *xml_data, *xml_esc_data=NULL;
-
-       if(edg_wll_JobStatusToXML(context, notif_job_stat, &xml_data)) 
-               goto out;
-       
-       if((xml_esc_data = edg_wll_EscapeXML(xml_data)) == NULL) {
-               edg_wll_SetError(context, ret=ENOMEM, "edg_wll_EscapeXML()");
-               goto out;
-       }
-
-       ret=edg_wll_NotifSend(context, reg_id, host, port, owner, xml_esc_data);
-
-out:
-       if(xml_data) free(xml_data);
-       if(xml_esc_data) free(xml_esc_data);
-       if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifJobStatus()");
-       return(ret);
+       return  edg_wll_Error(ctx, NULL, NULL)?  edg_wll_UpdateError(ctx, 0, "edg_wll_EventSendProxy()"): 0;
 }
index 1f586e6..4107304 100644 (file)
@@ -8,9 +8,7 @@
 extern char *lbproxy_ilog_socket_path;
 extern char *lbproxy_ilog_file_prefix;
 
-int edg_wll_SendEventProxy(
-               edg_wll_Context         context,
-               const char                 *owner);
+int edg_wll_EventSendProxy(edg_wll_Context ctx, const edg_wlc_JobId jobid, const char *event);
 
 #ifdef __cplusplus
 }