Adapt RGMA export file format and file handling to lcg-mon-job-status
authorZdeněk Salvet <salvet@ics.muni.cz>
Fri, 17 Mar 2006 11:47:02 +0000 (11:47 +0000)
committerZdeněk Salvet <salvet@ics.muni.cz>
Fri, 17 Mar 2006 11:47:02 +0000 (11:47 +0000)
use. Socket notification code is still available and is nonblocking now.
C ClassAd API is used to retrieve VO from JDL.

org.glite.lb.server/Makefile
org.glite.lb.server/project/configure.properties.xml
org.glite.lb.server/src/write2rgma.c

index b81524e..f9c15fa 100644 (file)
@@ -17,6 +17,7 @@ thrflavour=gcc32pthr
 expat_prefix=/opt/expat
 cares_prefix=/opt/c-ares
 gsoap_prefix=/opt/gsoap
+classads_prefix=/opt/classads
 
 -include Makefile.inc
 
@@ -61,6 +62,7 @@ CFLAGS:= \
        -I${expat_prefix}/include \
        -I${cares_prefix}/include \
        -I${gsoap_prefix}/include -I${gsoap_prefix}/ \
+       -I${classads_prefix}/include \
        ${COVERAGE_FLAGS} \
        -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \
        -I${globus_prefix}/include/${nothrflavour} \
@@ -117,9 +119,14 @@ endif
 #      -lvomsc${vomsflavour} \
 #      ${GLOBUS_LIBS}
 
+ifneq (${classads_prefix},/usr)
+       classadslib := -L${classads_prefix}/lib -lclassad
+endif
+
 EXT_LIBS:=  \
        ${mysqlib} -lmysqlclient -lz\
        ${GRIDSITE_LIBS} \
+       ${classadslib} \
        -lvomsc${vomsflavour}
 
 SRVBONES_LIB:= -L${stagedir}/lib -lglite_lb_server_bones
@@ -191,7 +198,7 @@ LIB_OBJS_BK:= \
 STATIC_LIB_BK:=libglite_lb_bkserver.a
 
 glite_lb_bkserverd: ${NSMAP} ${BKSERVER_OBJS}
-       ${LINK} -o $@ ${BKSERVER_OBJS} ${BKSERVER_LIBS}
+       ${LINKXX} -o $@ ${BKSERVER_OBJS} ${BKSERVER_LIBS}
 
 glite_lb_bkindex: ${INDEX_OBJS}
        ${LINK} -o $@ ${INDEX_OBJS} ${INDEX_LIBS}
index 6a1250a..5a2df46 100644 (file)
@@ -20,6 +20,9 @@
 
        Revision history:
        $Log$
+       Revision 1.10  2006/03/15 18:13:51  akrenek
+       cares
+       
        Revision 1.9  2006/03/15 18:12:21  akrenek
        merge 1.5
        
@@ -101,6 +104,7 @@ mysql_version=${ext.mysql.version}
 cppunit=${with.cppunit.prefix}
 gsoap_prefix=${with.gsoap.prefix}
 gsoap_version=${ext.gsoap.version}
+classads_prefix=${with.classads.prefix}
                        </echo>
            </target>
        </project>
index 2d291fb..4dd8065 100755 (executable)
@@ -11,6 +11,8 @@
 #include <sys/uio.h>
 
 
+#include <cclassad.h>
+
 #include "glite/lb/trio.h"
 #include "glite/lb/producer.h"
 #include "glite/lb/context-int.h"
 #include "jobstat.h"
 
 static int     rgma_fd = -1;
-static int     rgma_sock = -1;
-static struct sockaddr_un rgma_saddr;
 
-static void write2rgma_sql(char *sqlstat)
+static void write2rgma_line(char *line, int old_style)
 {
        
-       char *rgma_fname;
        int slen;
        int sysret;
        struct iovec iov[2];
 
-       assert(sqlstat != NULL);
+       static int      rgma_sock = -1;
+       static char    *rgma_fname = NULL;
+       static struct sockaddr_un rgma_saddr;
+
+       static int      rgma_fd_locked = 0;
+        struct stat    stat_buf,stat_fbuf;
+        struct flock   filelock;
+        int            filelock_status;
+
+       assert(line != NULL);
 
+       if (rgma_fd < -1) return;
        if (rgma_fd == -1) {
-               rgma_fname = getenv("EDG_WL_RGMA_FILE");
-               if (rgma_fname == NULL) return;
+               rgma_fname = getenv("GLITE_WMS_LCGMON_FILE");
+               if (rgma_fname == NULL) {
+                       rgma_fname = getenv("EDG_WL_RGMA_FILE");
+                       if (rgma_fname == NULL) {
+                               rgma_fd = -2; 
+                               return;
+                       }
+               }
 
                rgma_fd = open(rgma_fname, O_WRONLY|O_CREAT|O_APPEND, 0777);
                if (rgma_fd == -1) return;
        }
 
-       slen = strlen(sqlstat);
-       iov[0].iov_base = &slen;
-       iov[0].iov_len = sizeof(slen);
-       iov[1].iov_base = sqlstat;
-       iov[1].iov_len = slen + 1;
-       if ((sysret = flock(rgma_fd, LOCK_SH)) != -1) {
-               sysret = writev(rgma_fd, iov, 2);
-               flock(rgma_fd, LOCK_UN);
+
+       slen = strlen(line);
+       if (old_style) {
+               iov[0].iov_base = &slen;
+               iov[0].iov_len = sizeof(slen);
+               iov[1].iov_base = line;
+               iov[1].iov_len = slen + 1;
+       } else {
+               iov[0].iov_base = line;
+               iov[0].iov_len = slen;
        }
        
-       if (sysret == -1) return;
+       if (old_style) {
+
+               if ((sysret = flock(rgma_fd, LOCK_SH)) != -1) {
+                       sysret = writev(rgma_fd, iov, 2);
+                       flock(rgma_fd, LOCK_UN);
+               }
+               if (sysret == -1) return;
+       } else {
+               if (!rgma_fd_locked) {
+                   do {
+                     filelock.l_type = F_WRLCK;
+                     filelock.l_whence = SEEK_SET;
+                     filelock.l_start = 0;
+                     filelock.l_len = 0;
+                   } while((filelock_status = fcntl(rgma_fd,F_SETLKW,&filelock))<0 && errno==EINTR);
+                   if (filelock_status<0) goto leave;
+               }
+               rgma_fd_locked = 1;
+               if (fstat(rgma_fd, &stat_fbuf)<0 || stat(rgma_fname, &stat_buf)<0)
+                       goto leave;
+               if (stat_fbuf.st_dev != stat_buf.st_dev || stat_fbuf.st_ino != stat_buf.st_ino)
+                       goto leave;
+                if (writev(rgma_fd, iov, 1) < 0) 
+                       goto leave;
+               fsync(rgma_fd);
+               do {
+                       filelock.l_type = F_UNLCK;
+                       filelock.l_whence = SEEK_SET;
+                       filelock.l_start = 0;
+                       filelock.l_len = 0;
+               } while((filelock_status = fcntl(rgma_fd,F_SETLKW,&filelock))<0 && errno==EINTR);
+               if (filelock_status < 0) goto leave;
+               rgma_fd_locked = 0;
+       leave:  
+               /* add explicit unlock, incase the later close should fail for some reason */
+               if (rgma_fd_locked) {
+                               do {
+                               filelock.l_type = F_UNLCK;
+                               filelock.l_whence = SEEK_SET;
+                               filelock.l_start = 0;
+                               filelock.l_len = 0;
+                       } while((filelock_status = fcntl(rgma_fd,F_SETLKW,&filelock))<0 && errno==EINTR);
+                       if (filelock_status == 0) {
+                               rgma_fd_locked = 0;
+                       }
+               }
+               /* close the statefile */
+               if (close(rgma_fd)==0) {
+                       rgma_fd = -1;
+                       rgma_fd_locked = 0;
+               }
+
+       }
+
+       if (rgma_sock < -1) return;
 
        if (rgma_sock == -1) {
                rgma_fname = getenv("EDG_WL_RGMA_SOCK");
-               if (rgma_fname == NULL) return;
-
-               if ((strlen(rgma_fname) + 1) > sizeof(rgma_saddr.sun_path))
-                       return ;
+               if (rgma_fname == NULL 
+                   || (strlen(rgma_fname) + 1) > sizeof(rgma_saddr.sun_path) ) {
+                       rgma_sock = -2;
+                       return;
+               }
 
                rgma_sock = socket(PF_UNIX, SOCK_DGRAM,0);
                if (rgma_sock == -1) return;
@@ -71,7 +143,7 @@ static void write2rgma_sql(char *sqlstat)
                strcpy(rgma_saddr.sun_path, rgma_fname);
        }
 
-       sendto(rgma_sock, &slen, 1, 0,
+       sendto(rgma_sock, &slen, 1, MSG_DONTWAIT,
                (struct sockaddr*) &rgma_saddr, SUN_LEN(&rgma_saddr));
 
        return;
@@ -81,17 +153,60 @@ void write2rgma_status(edg_wll_JobStat *stat)
 {
        char *stmt = NULL;
        char *string_jobid, *string_stat, *string_server;
+       char *string_vo = NULL;
+       int lcgmon = 0;
+
+       if (rgma_fd < -1) return;
+       if (getenv("GLITE_WMS_LCGMON_FILE")) lcgmon = 1;
 
        string_jobid = edg_wlc_JobIdUnparse(stat->jobId);
        string_stat = edg_wll_StatToString(stat->state);
        string_server = edg_wlc_JobIdGetServer(stat->jobId);
 
-       trio_asprintf(&stmt, "INSERT INTO JobStatusRaw VALUES ('%|Ss','%s','%|Ss','%|Ss', '%d%03d')",
-                       string_jobid, string_stat, stat->owner, string_server,
-                       stat->stateEnterTime.tv_sec, stat->stateEnterTime.tv_usec/1000);
-
-       if (stmt) write2rgma_sql(stmt);
+       if (stat->jdl != NULL) {
+               struct cclassad *ad;
+       
+               ad = cclassad_create(stat->jdl);
+               if (ad) {
+                       if (!cclassad_evaluate_to_string(ad, "VirtualOrganisation", &string_vo))
+                               string_vo = NULL;
+                       cclassad_delete(ad);
+               } 
+       }
 
+       trio_asprintf(&stmt,
+               "JOBID=%|Ss "
+               "OWNER=%|Ss "
+               "BKSERVER=%|Ss "
+               "NETWORKSERVER=%|Ss "
+               "VO=%|Ss "
+               "LASTUPDATETIME=%d "
+               "STATENAME=%s "
+               "STATEENTERTIME=%d "
+               "CONDORID=%|Ss "
+               "DESTINATION=%|Ss "
+               "EXITCODE=%d "
+               "DONECODE=%d "
+               "STATUSREASON=%|Ss"
+               "\n",
+               string_jobid,
+               stat->owner,
+               string_server,
+               (stat->network_server) ? (stat->network_server) : "",
+               (string_vo) ? string_vo : "",
+               stat->lastUpdateTime.tv_sec,
+               string_stat,
+               stat->stateEnterTime.tv_sec,
+               (stat->condorId) ? (stat->condorId) : "",
+               (stat->destination) ? (stat->destination) : "",
+               stat->exit_code,
+               stat->done_code,
+               (stat->reason) ? (stat->reason) : ""
+       );
+                               
+       if (stmt) write2rgma_line(stmt, !lcgmon);
+
+       free(string_vo);
        free(string_jobid);
        free(string_stat);
        free(string_server);
@@ -104,7 +219,7 @@ int main(int argc, char* argv[])
        setenv("EDG_WL_RGMA_FILE", "/tmp/rgma_statefile", 1);
        setenv("EDG_WL_RGMA_SOCK", "/tmp/rgma_statesock", 1);
  
-       write2rgma_sql("INSERT INTO JobStatusRaw VALUES ('Job_id9','CLEARED','Owner','BKServer', '1050024158210')");
+       write2rgma_line("123---789\n", 0);
        return 0;
 }
 #endif