merge 1.5
author Aleš Křenek <ljocha@ics.muni.cz>
Wed, 15 Mar 2006 18:12:21 +0000 (18:12 +0000)
committer Aleš Křenek <ljocha@ics.muni.cz>
Wed, 15 Mar 2006 18:12:21 +0000 (18:12 +0000)
14 files changed:
org.glite.lb.server/config/startup
org.glite.lb.server/project/build.number
org.glite.lb.server/project/configure.properties.xml
org.glite.lb.server/project/version.properties
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/il_lbproxy.c
org.glite.lb.server/src/jobstat.h
org.glite.lb.server/src/jobstat_supp.c
org.glite.lb.server/src/process_event.c
org.glite.lb.server/src/query.c
org.glite.lb.server/src/request.c
org.glite.lb.server/src/store.c.T
org.glite.lb.server/src/stored_master.c

index 10b1a6d..d712223 100755 (executable)
@@ -1,7 +1,7 @@
 #!/bin/sh
 
 GLITE_LOCATION=${GLITE_LOCATION:-/opt/glite}
-GLITE_LOCATION_VAR=${GLITE_LOCATION_VAR:-${GLITE_LOCATION}/var}
+GLITE_LOCATION_VAR=${GLITE_LOCATION_VAR:-/var/glite}
 
 [ -f /etc/glite.conf ] && . /etc/glite.conf
 [ -f $GLITE_LOCATION/etc/glite-wms.conf ] && . $GLITE_LOCATION/etc/glite-wms.conf
@@ -34,6 +34,13 @@ start()
                fi
        fi
 
+       [ -z "$GLITE_LB_EXPORT_DUMPDIR" ] && GLITE_LB_EXPORT_DUMPDIR=/tmp/dump
+       purgedir="--dump-prefix $GLITE_LB_EXPORT_DUMPDIR"
+       [ -d "$GLITE_LB_EXPORT_DUMPDIR" ] || mkdir "$GLITE_LB_EXPORT_DUMPDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_DUMPDIR"
+
+       [ -z "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] && GLITE_LB_EXPORT_JPREG_MAILDIR=/tmp/jpreg
+       maildir="--jpreg-dir $GLITE_LB_EXPORT_JPREG_MAILDIR"
+       [ -d "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] || mkdir "$GLITE_LB_EXPORT_JPREG_MAILDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_JPREG_MAILDIR"
 
        [ -z "$creds" ] && echo $0: WARNING: No credentials specified. Using default lookup which is dangerous. >&2
 
@@ -43,7 +50,8 @@ start()
        su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-lb-bkserverd \
                --notif-il-sock=/tmp/glite-lb-notif.sock \
                --notif-il-fprefix=/var/tmp/glite-lb-notif \
-               $creds -i $pidfile $port" && echo " done" || echo " FAILED"
+               $creds -i $pidfile $port $purgedir $maildir" \
+       && echo " done" || echo " FAILED"
 
        echo -n Starting glite-lb-notif-interlogd ...
        su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-lb-notif-interlogd \
@@ -79,23 +87,29 @@ stop()
 
 status()
 {
+       retval=0
+
        if netstat -an --unix | grep "^unix .* LISTEN.* /tmp/glite-lb-notif.sock$" >/dev/null 2>&1 ;then
                echo glite-lb-notif-interlogd running
        else
                echo glite-lb-notif-interlogd not running
-               return 1
+               retval=1
        fi
 
        if [ -f $pidfile ]; then
                pid=`cat $pidfile`
                if ps p $pid >/dev/null 2>&1; then
                        echo glite-lb-bkserverd running as $pid 
-                       return 0
+               else
+                       echo glite-lb-bkserverd not running
+                       retval=1
                fi
+       else
+               echo glite-lb-bkserverd not running
+               retval=1
        fi
 
-       echo glite-lb-bkserverd not running
-       return 1
+       return $retval
 }
 
 case x$1 in
index 4380560..196dd55 100644 (file)
@@ -1,2 +1,2 @@
-#Sat Oct 15 06:56:05 CEST 2005
-module.build=154
+#Wed Mar 15 04:58:37 CET 2006
+module.build=0235
index f533759..c5b2837 100644 (file)
 
        Revision history:
        $Log$
+       Revision 1.8.4.2  2006/02/20 09:14:30  zsalvet
+       Revert unfinished changes committed by mistake.
+       
+       Revision 1.8  2005/09/19 15:24:20  akrenek
+       "The gigantic merge"; from release 1.4 branch to HEAD
+       
        Revision 1.7.2.1  2005/08/09 15:02:10  jskrabal
        - build with broken gsoap 2.7.0 repository package
        
index ac0be14..d6d8219 100644 (file)
@@ -1,4 +1,3 @@
 #Fri Sep 02 14:18:35 CEST 2005
-module.version=1.4.0
-module.build=2
-module.age=1
+module.version=1.3.7
+module.age=0
index 3fdc08c..1a34af5 100644 (file)
@@ -40,6 +40,7 @@
 #include "glite/lb/context.h"
 #include "glite/lb/mini_http.h"
 #include "glite/lb/context-int.h"
+#include "glite/lb/lb_maildir.h"
 
 #include "lb_http.h"
 #include "lb_proto.h"
@@ -69,14 +70,12 @@ extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context);
 
 #define CON_QUEUE              20      /* accept() */
 #define SLAVE_OVERLOAD         10      /* queue items per slave */
-#define CLNT_TIMEOUT           10      /* keep idle connection that many seconds */
-#define TOTAL_CLNT_TIMEOUT     60      /* one client may ask one slave multiple times */
+#define CONNECT_TIMEOUT                30
+#define IDLE_TIMEOUT           10      /* keep idle connection that many seconds */
+#define REQUEST_TIMEOUT                120     /* one client may ask one slave multiple times */
                                        /* but only limited time to avoid DoS attacks */
-#define CLNT_REJECT_TIMEOUT    100000  /* time limit for client rejection in !usec! */
 #define DNS_TIMEOUT            5       /* how long wait for DNS lookup */
 #define SLAVE_CONNS_MAX                500     /* commit suicide after that many connections */
-#define MASTER_TIMEOUT         30      /* maximal time of one-round of master network communication */
-#define SLAVE_TIMEOUT          30      /* maximal time of one-round of slave network communication */
 
 #ifndef EDG_PURGE_STORAGE
 #define EDG_PURGE_STORAGE      "/tmp/purge"
@@ -277,8 +276,7 @@ int main(int argc, char *argv[])
        int                                     opt;
        char                            pidfile[PATH_MAX] = EDG_BKSERVERD_PIDFILE,
                                           *port,
-                                          *name,
-                                          *tmps;
+                                          *name;
 #ifdef GLITE_LB_SERVER_WITH_WS
        char                       *ws_port;
 #endif /* GLITE_LB_SERVER_WITH_WS */
@@ -412,9 +410,9 @@ int main(int argc, char *argv[])
        if (check_mkdir(dumpStorage)) exit(1);
        if (check_mkdir(purgeStorage)) exit(1);
        if ( jpreg ) {
-               if ( edg_wll_MaildirInit(jpregDir, &tmps) ) {
-                       dprintf(("[%d] %s\n", getpid(), tmps));
-                       if (!debug) syslog(LOG_CRIT, tmps);
+               if ( edg_wll_MaildirInit(jpregDir) ) {
+                       dprintf(("[%d] edg_wll_MaildirInit failed: %s\n", getpid(), lbm_errdesc));
+                       if (!debug) syslog(LOG_CRIT, "edg_wll_MaildirInit failed: %s", lbm_errdesc);
                        exit(1);
                }
        }
@@ -577,14 +575,12 @@ a.sin_addr.s_addr = INADDR_ANY;
        glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, slaves);
        glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_OVERLOAD, SLAVE_OVERLOAD);
        glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_CONNS_MAX, SLAVE_CONNS_MAX);
-       /* XXX
-        * not final version - yet!
-        */
-       to = (struct timeval){CLNT_TIMEOUT, 0};
+
+       to = (struct timeval){CONNECT_TIMEOUT, 0};
        glite_srvbones_set_param(GLITE_SBPARAM_CONNECT_TIMEOUT, &to);
-       to = (struct timeval){CLNT_TIMEOUT, 0};
+       to = (struct timeval){REQUEST_TIMEOUT, 0};
        glite_srvbones_set_param(GLITE_SBPARAM_REQUEST_TIMEOUT, &to);
-       to = (struct timeval){TOTAL_CLNT_TIMEOUT, 0};
+       to = (struct timeval){IDLE_TIMEOUT, 0};
        glite_srvbones_set_param(GLITE_SBPARAM_IDLE_TIMEOUT, &to);
 
        glite_srvbones_run(bk_clnt_data_init, service_table, sizofa(service_table), debug);
@@ -694,7 +690,6 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
        OM_uint32                       min_stat,
                                                maj_stat;
        struct timeval          dns_to = {DNS_TIMEOUT, 0},
-                                               total_to = { TOTAL_CLNT_TIMEOUT,0 },
                                                conn_start, now;
        struct sockaddr_in      a;
        int                                     alen;
@@ -743,13 +738,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
        ctx->rgma_export = rgma_export;
        memcpy(ctx->purge_timeout, purge_timeout, sizeof(ctx->purge_timeout));
 
-       ctx->p_tmp_timeout.tv_sec = SLAVE_TIMEOUT;
-       ctx->p_tmp_timeout.tv_usec = 0;
-       if ( total_to.tv_sec < ctx->p_tmp_timeout.tv_sec )
-       {
-               ctx->p_tmp_timeout.tv_sec = total_to.tv_sec;
-               ctx->p_tmp_timeout.tv_usec = total_to.tv_usec;
-       }
+       ctx->p_tmp_timeout.tv_sec = timeout->tv_sec;
+       ctx->p_tmp_timeout.tv_usec = timeout->tv_usec;
        
        ctx->poolSize = 1;
        ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool));
@@ -763,7 +753,6 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
 
        gettimeofday(&conn_start, 0);
 
-       /* not a critical operation, do not waste all SLAVE_TIMEOUT */
        h_errno = asyn_gethostbyaddr(&name, (char *)&a.sin_addr.s_addr,sizeof(a.sin_addr.s_addr), AF_INET, &dns_to);
        switch ( h_errno )
        {
@@ -985,6 +974,7 @@ int bk_accept_store(int conn, struct timeval *timeout, void *cdata)
                case ETIMEDOUT:
                case EDG_WLL_ERROR_GSS:
                case EPIPE:
+               case EIO:
                        dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
                        if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd);
                        /*      fallthrough
@@ -1141,51 +1131,10 @@ int bk_accept_ws(int conn, struct timeval *timeout, void *cdata)
        }
 
        if ( err ) {
-               char    *errt, *errd;
-               int             ret;
-
-               
-               errt = errd = NULL;
-               switch ( (ret = edg_wll_Error(ctx, &errt, &errd)) ) {
-               case ETIMEDOUT:
-               case EDG_WLL_ERROR_GSS:
-               case EPIPE:
-                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
-                       if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd);
-                       /*      "recoverable" error - return (>0)
-                        *      fallthrough
-                        */
-               case ENOTCONN:
-                       /*      "recoverable" error - return (>0)
-                        *      return ENOTCONN to tell bones to clean up
-                        */
-                       free(errt); free(errd);
-                       return ret;
-                       break;
-
-               case ENOENT:
-               case EINVAL:
-               case EPERM:
-               case EEXIST:
-               case EDG_WLL_ERROR_NOINDEX:
-               case E2BIG:
-                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
-                       if ( !debug ) syslog(LOG_ERR,"%s (%s)", errt, errd);
-                       /*
-                        *      no action for non-fatal errors
-                        */
-                       break;
-                       
-               default:
-                       dprintf(("[%d] %s (%s)\n", getpid(), errt, errd));
-                       if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd);
-                       /*
-                        *      unknown error - do rather return (<0) (slave will be killed)
-                        */
-                       return -1;
-               } 
-               free(errt); free(errd);
-               return 1;
+               // soap_print_fault(struct soap *soap, FILE *fd) maybe useful here
+               dprintf(("[%d] SOAP error (bk_accept_ws) \n", getpid()));
+               if (!debug) syslog(LOG_CRIT,"SOAP error (bk_accept_ws)");
+               return ECANCELED;
        }
 
        return 0;
index fc61e47..13a56fd 100644 (file)
@@ -89,10 +89,12 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
        switch ( ev->any.type ) {
        case EDG_WLL_EVENT_CLEAR:
        case EDG_WLL_EVENT_ABORT:
-       case EDG_WLL_EVENT_CANCEL:
-       case EDG_WLL_EVENT_DONE:
                edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
                break;
+       case EDG_WLL_EVENT_CANCEL:
+               if (ev->cancel.status_code == EDG_WLL_CANCEL_DONE) 
+                       edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
+               break;
        default: break;
        }
   } else {
index 4b682e7..487c847 100644 (file)
@@ -18,7 +18,6 @@ edg_wll_EventSendProxy(
        const edg_wlc_JobId             jobid,
        const char                         *event)
 {
-       struct timeval  timeout;
        long                    filepos;
        char               *jobid_s,
                                   *event_file = NULL;
@@ -28,9 +27,6 @@ edg_wll_EventSendProxy(
 
        edg_wll_ResetError(ctx);
 
-       timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX;
-       timeout.tv_usec = 0;    
-
        jobid_s = edg_wlc_JobIdGetUnique(jobid);
        if ( !jobid_s ) {
                edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()");
@@ -44,13 +40,16 @@ edg_wll_EventSendProxy(
        }
 
        if ( edg_wll_log_event_write(ctx, event_file, event,
-                                               FCNTL_ATTEMPTS, FCNTL_TIMEOUT, &filepos) ) {
+                                       (ctx->p_tmp_timeout.tv_sec > FCNTL_ATTEMPTS ?
+                                               ctx->p_tmp_timeout.tv_sec : FCNTL_ATTEMPTS),
+                                       FCNTL_TIMEOUT, &filepos) ) {
+
                edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_write()");
                _err(1);
        }
 
        if ( edg_wll_log_event_send(ctx, lbproxy_ilog_socket_path, filepos,
-                                               event, strlen(event), 1, &timeout) ) {
+                                               event, strlen(event), 1, &ctx->p_tmp_timeout) ) {
                edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_send()");
                _err(-1);
        }
index 181a85d..352af06 100644 (file)
@@ -64,9 +64,11 @@ void write2rgma_status(edg_wll_JobStat *);
 int before_deep_resubmission(const char *, const char *);
 int same_branch(const char *, const char *);
 int component_seqcode(const char *a, edg_wll_Source index);
+char * set_component_seqcode(char *s,edg_wll_Source index,int val);
 int processEvent(intJobStat *, edg_wll_Event *, int, int, char **);
 
 int add_stringlist(char ***, const char *);
 int edg_wll_compare_seq(const char *, const char *);
 
 void init_intJobStat(intJobStat *p);
+
index 58d4a51..6fb09ef 100644 (file)
@@ -658,6 +658,41 @@ int component_seqcode(const char *a, edg_wll_Source index)
        return(c[index]);       
 }
 
+/*
+ * Return a copy of sequence code `s` in which the counter belonging to
+ * source `index` is replaced by `val`; all other counters are kept.
+ *
+ * `s` must contain all eight "UI=..:NS=..: ... :APP=.." fields, otherwise
+ * the problem is logged (syslog + stderr) and NULL is returned.  NULL is
+ * also returned for a NULL `s` or an `index` outside c[].
+ *
+ * The result is allocated by trio_asprintf(); the caller is expected to
+ * free() it (NOTE(review): assumes trio_asprintf leaves `ret` untouched
+ * on failure, in which case NULL is returned -- confirm).
+ */
+char * set_component_seqcode(char *s,edg_wll_Source index,int val)
+{
+       /* plain int so that storage matches the %d conversions used below */
+       int             c[EDG_WLL_SOURCE__LAST];
+       int             res;
+       char            *ret = NULL;
+
+       /* refuse input that would make sscanf() or c[index] misbehave */
+       if (s == NULL || (int)index < 0 || (int)index >= EDG_WLL_SOURCE__LAST)
+               return NULL;
+
+       res =  sscanf(s, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d",
+                       &c[EDG_WLL_SOURCE_USER_INTERFACE],
+                       &c[EDG_WLL_SOURCE_NETWORK_SERVER],
+                       &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+                       &c[EDG_WLL_SOURCE_BIG_HELPER],
+                       &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+                       &c[EDG_WLL_SOURCE_LOG_MONITOR],
+                       &c[EDG_WLL_SOURCE_LRMS],
+                       &c[EDG_WLL_SOURCE_APPLICATION]);
+       /* every field has to be converted, a partial parse is an error */
+       if (res != EDG_WLL_SOURCE__LAST-1) {
+               syslog(LOG_ERR, "unparsable sequence code %s\n", s);
+               fprintf(stderr, "unparsable sequence code %s\n", s);
+               return NULL;
+       }
+
+       c[index] = val;
+       trio_asprintf(&ret,"UI=%06d:NS=%010d:WM=%06d:BH=%010d:JSS=%06d"
+                                ":LM=%06d:LRMS=%06d:APP=%06d",
+                        c[EDG_WLL_SOURCE_USER_INTERFACE],
+                        c[EDG_WLL_SOURCE_NETWORK_SERVER],
+                        c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+                        c[EDG_WLL_SOURCE_BIG_HELPER],
+                        c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+                        c[EDG_WLL_SOURCE_LOG_MONITOR],
+                        c[EDG_WLL_SOURCE_LRMS],
+                        c[EDG_WLL_SOURCE_APPLICATION]);
+       return ret;
+}
+
 int before_deep_resubmission(const char *a, const char *b)
 {
        if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) < 
index ab13ed4..7b0fb1d 100644 (file)
@@ -1,8 +1,11 @@
+#ident "$Header$"
+
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
 #include <errno.h>
 #include <assert.h>
+#include <syslog.h>
 
 #include "glite/lb/producer.h"
 #include "glite/lb/context-int.h"
@@ -166,6 +169,17 @@ static void load_branch_state(intJobStat *js)
        }
 }
 
+// Forget all per-branch information of the job; used on deep
+// resubmission and on abort.  The seqcode of the triggering event `e`
+// is stored as the new deep-resubmission mark.
+static void reset_branch(intJobStat *js, edg_wll_Event *e) 
+{
+       // plain flags first
+       js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB;
+       js->pub.payload_running = 0;
+
+       // drop everything collected for the individual branches
+       free_stringlist(&js->pub.possible_destinations);
+       free_stringlist(&js->pub.possible_ce_nodes);
+       free_branch_state(&js->branch_states);
+
+       // no branch is selected any more; events are now compared
+       // against the seqcode of this event
+       rep(js->branch_tag_seqcode, NULL);
+       rep(js->deep_resubmit_seqcode, e->any.seqcode);
+}
 
 static char* location_string(const char *source, const char *host, const char *instance)
 {
@@ -174,6 +188,12 @@ static char* location_string(const char *source, const char *host, const char *i
        return ret;
 }
 
+/* Nonzero iff the NETWORK_SERVER component of the event seqcode `es`
+ * is strictly greater than that of the job's seqcode `js`. */
+static int after_enter_wm(const char *es,const char *js)
+{
+       int     event_ns = component_seqcode(es,EDG_WLL_SOURCE_NETWORK_SERVER);
+       int     job_ns = component_seqcode(js,EDG_WLL_SOURCE_NETWORK_SERVER);
+
+       return event_ns > job_ns;
+}
+
 
 static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUSED_VAR)
 {
@@ -186,11 +206,15 @@ static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUS
        return RET_FATAL;
 }
 
+// (?) || (0 && 1)  =>  true if (res == RET_OK)
 #define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict))
+
+// (?) || (1 && 1)  =>  always true
 #define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict))
+
 #define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH)
 #define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE)
-
+#define PARSABLE_SEQCODE(code) (component_seqcode((code),0) >= 0)
 
 int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
 {
@@ -200,7 +224,7 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
        int                     res = RET_OK,
                                fine_res = RET_OK;
                                
-
+       int     lm_favour_lrms = 0;
 
        if (old_state == EDG_WLL_JOB_ABORTED ||
                old_state == EDG_WLL_JOB_CANCELLED ||
@@ -208,6 +232,15 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                res = RET_LATE;
        }
 
+/* new event coming from NS => forget about any resubmission loops */
+       if (e->type != EDG_WLL_EVENT_CANCEL && 
+               js->last_seqcode &&
+               after_enter_wm(e->any.seqcode,js->last_seqcode))
+       {
+               rep(js->branch_tag_seqcode,NULL); 
+               rep(js->deep_resubmit_seqcode,NULL); 
+               rep(js->last_branch_seqcode,NULL); 
+       }
 
        if (js->deep_resubmit_seqcode && 
                        before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) {
@@ -216,7 +249,7 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
        }
        else if (js->branch_tag_seqcode) {              // ReallyRunning ev. arrived
                if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) {
-                       if ((js->last_seqcode != NULL) &&
+                       if ((js->last_branch_seqcode != NULL) &&
                                edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) {
                                res = RET_LATE;
                        }
@@ -248,7 +281,9 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                                                                e->any.timestamp.tv_sec;
                                                        res = RET_LATE;
                                                }
-                                               new_state = EDG_WLL_JOB_SCHEDULED; break;
+                                               new_state = EDG_WLL_JOB_SCHEDULED;
+                                               lm_favour_lrms = 1;
+                                               break;
                                        default:
                                                goto bad_event; break;
                                }
@@ -307,7 +342,9 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                                        new_state = EDG_WLL_JOB_WAITING; break;
                                case EDG_WLL_SOURCE_LOG_MONITOR:
                                        if (LRMS_STATE(old_state)) res = RET_LATE;
-                                       new_state = EDG_WLL_JOB_READY; break;
+                                       new_state = EDG_WLL_JOB_READY; 
+                                       lm_favour_lrms = 1;
+                                       break;
                                case EDG_WLL_SOURCE_LRMS:
                                        new_state = EDG_WLL_JOB_SCHEDULED; break;
                                default:
@@ -480,17 +517,62 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                                if (USABLE_BRANCH(fine_res)) {
                                        rep(js->pub.ce_node, e->running.node);
                                }
-                               if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
+                               /* why? if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { */
                                        if (e->running.node) {
                                                update_branch_state(e->any.seqcode, NULL,
                                                        e->running.node, NULL, &js->branch_states);
                                                add_stringlist(&js->pub.possible_ce_nodes,
                                                        e->running.node);
                                        }
-                               }
+                               /* } */
                        }
                        break;
                case EDG_WLL_EVENT_REALLYRUNNING:
+                       /* consistency check -- should not receive two contradicting ReallyRunning's within a single
+                          deep resub cycle */
+                       if (fine_res == RET_BADBRANCH) {
+                               syslog(LOG_ERR,"ReallyRunning on bad branch %s",
+                                               e->any.source == EDG_WLL_SOURCE_LOG_MONITOR ? e->reallyRunning.wn_seq : e->any.seqcode);
+                               break;
+                       }
+                       /* select the branch unless TOOOLD, i.e. before deep resubmission */
+                       if (!(res == RET_LATE && fine_res == RET_TOOOLD)) {
+                               if (e->any.source == EDG_WLL_SOURCE_LRMS) {
+                                       rep(js->branch_tag_seqcode, e->any.seqcode);
+                                       if (res == RET_OK) {
+                                               rep(js->last_branch_seqcode, e->any.seqcode);
+                                               js->pub.state = EDG_WLL_JOB_RUNNING;
+                                       }
+                               }
+                               if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
+                                       if (!js->branch_tag_seqcode) {
+                                               if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { 
+                                                       rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq);
+                                               } else
+                                                       goto bad_event;
+                                       }
+                                       if (!js->last_branch_seqcode) {
+                                               if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
+                                                       if (res == RET_OK) {
+                                                               rep(js->last_branch_seqcode, e->reallyRunning.wn_seq);
+                                                               js->pub.state = EDG_WLL_JOB_RUNNING;
+                                                       }
+                                               } else
+                                                       goto bad_event;
+                                       }
+                               }
+
+                               /* XXX: best effort -- if we are lucky, ReallyRunning is on the last shallow cycle,
+                                       so we take in account events processed so far */
+                               if (res == RET_LATE && !js->last_branch_seqcode) {
+                                       if (same_branch(js->last_seqcode,js->branch_tag_seqcode))
+                                               rep(js->last_branch_seqcode,js->last_seqcode);
+                               }
+
+                               js->pub.payload_running = 1;
+                               load_branch_state(js);
+                       }
+#if 0
                        if (USABLE_DATA(res, strict)) {
                                js->pub.state = EDG_WLL_JOB_RUNNING;
                                free(js->pub.location);
@@ -504,12 +586,22 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                                        rep(js->last_branch_seqcode, e->any.seqcode);
                                }
                                if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
-                                       rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq);
-                                       rep(js->last_branch_seqcode, e->reallyRunning.wn_seq);
+                                       if (!js->branch_tag_seqcode) {
+                                               if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { 
+                                                       rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq);
+                                               } else
+                                                       goto bad_event;
+                                       }
+                                       if (!js->last_branch_seqcode) {
+                                               if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) {
+                                                       rep(js->last_branch_seqcode, e->reallyRunning.wn_seq);
+                                               } else
+                                                       goto bad_event;
+                                       }
                                }
-
                                load_branch_state(js);
                        }
+#endif
                        break;
                case EDG_WLL_EVENT_RESUBMISSION:
                        if (USABLE(res, strict)) {
@@ -524,18 +616,13 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                                else 
                                if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB &&
                                                e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) {
-                                       js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB;
-                                       free_stringlist(&js->pub.possible_destinations);
-                                       free_stringlist(&js->pub.possible_ce_nodes);
-                                       free_branch_state(&js->branch_states);
-                                       js->pub.payload_running = 0;
-                                       rep(js->branch_tag_seqcode, NULL);
-                                       rep(js->deep_resubmit_seqcode, e->any.seqcode);
+                                       reset_branch(js, e);
                                }
                                else
                                if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) {
                                        js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW;
-                                       rep(js->deep_resubmit_seqcode, NULL);
+                                       // the deep resubmit seqcode stays as a permanent deadline for events
+                                       // rep(js->deep_resubmit_seqcode, NULL);
                                }
                        }
                        break;
@@ -613,11 +700,15 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                        }
                        break;
                case EDG_WLL_EVENT_ABORT:
+                       // XXX: accept Abort from WM in every case
+                       //      setting res makes the USABLE macro true (awful !!)
+                       if (e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) res = RET_OK;
                        if (USABLE(res, strict)) {
                                js->pub.state = EDG_WLL_JOB_ABORTED;
                                rep(js->pub.reason, e->abort.reason);
                                rep(js->pub.location, "none");
-                               js->pub.payload_running = 0;
+
+                               reset_branch(js, e);
                        }
                        break;
 
@@ -723,6 +814,28 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                        js->pub.stateEnterTimes[1 + js->pub.state]
                                = (int)js->pub.lastUpdateTime.tv_sec;
                }
+               if (e->any.type == EDG_WLL_EVENT_CANCEL) {
+                       rep(js->last_cancel_seqcode, e->any.seqcode);
+               } else {
+
+/* the first set of LM events (Accept, Transfer/ * -> LRMS)
+   should shift the state (to Ready, Scheduled) but should NOT
+   update js->last_seqcode completely, in order not to block following
+   LRMS events which are likely to arrive later but should still affect
+   job state (as there may be no more LM events due to the Condor bug).
+   However, don't ignore the incoming seqcode completely, to catch up
+   with possibly delayed WM/JSS events */
+
+                       if (lm_favour_lrms) {
+                               free(js->last_seqcode);
+                               js->last_seqcode = set_component_seqcode(e->any.seqcode,EDG_WLL_SOURCE_LOG_MONITOR,0);
+                       }
+                       else rep(js->last_seqcode, e->any.seqcode);
+               }
+
+               if (fine_res == RET_GOODBRANCH) {
+                       rep(js->last_branch_seqcode, e->any.seqcode);
+               }
        }
 
        if (USABLE_DATA(res,strict)) {
@@ -737,16 +850,6 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                }
        }
        
-       if (e->any.type == EDG_WLL_EVENT_CANCEL) {
-               rep(js->last_cancel_seqcode, e->any.seqcode);
-       } else {
-               rep(js->last_seqcode, e->any.seqcode);
-       }
-
-       if (fine_res == RET_GOODBRANCH) {
-               rep(js->last_branch_seqcode, e->any.seqcode);
-       }
-
        return res;
 
 bad_event:
index 5d86547..196c9be 100644 (file)
@@ -78,7 +78,8 @@ int edg_wll_QueryEventsServer(
                goto cleanup;
 
        if (event_conditions && *event_conditions && (*event_conditions)->attr &&
-               !(event_where = ec_to_head_where(ctx,event_conditions)))
+               !(event_where = ec_to_head_where(ctx,event_conditions)) &&
+               edg_wll_Error(ctx,NULL,NULL) != 0)
                goto cleanup;
 
        if ( job_conditions && *job_conditions && (*job_conditions)->attr &&
@@ -863,7 +864,7 @@ static char *jc_to_head_where(
                                        free(aux);
                                }
                                else
-                                       trio_asprintf(&tmps, "%s OR s.%s %s s.%s", conds, cname, opToString(jc[m][n].op), dbt);
+                                       trio_asprintf(&tmps, "%s OR s.%s %s %s", conds, cname, opToString(jc[m][n].op), dbt);
 
                                free(conds);
                                conds = tmps;
@@ -872,7 +873,7 @@ static char *jc_to_head_where(
                        {
                                trio_asprintf(&aux, "%s", dbt);
                                dbt = edg_wll_TimeToDB(jc[m][n].value2.t.tv_sec);
-                               trio_asprintf(&conds, "(%s >= s.%s AND s.%s <= %s)", cname, aux, cname, dbt);
+                               trio_asprintf(&conds, "(s.%s >= %s AND s.%s <= %s)", cname, aux, cname, dbt);
                                free(aux);
                        }
                        else
index a3288a4..f0364ac 100644 (file)
@@ -26,7 +26,7 @@ handle_request(edg_wll_Context ctx,char *buf)
 
   ret = decode_il_msg(&event, buf);
   if(ret < 0) {
-    edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string");
+    edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"decoding event string failed");
     return EDG_WLL_IL_PROTO;
   }
 
index b2bcbc4..1130c93 100644 (file)
@@ -128,7 +128,7 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
                free(stmt);
        }
 
-       free(stmt);
+       free(stmt); stmt = NULL;
        if ((err = store_seq(ctx,e,next)) ||
                (err = store_flesh(ctx,e,jobid,next))) {
        /* attempt to cleanup, ignore new errors */
@@ -333,11 +333,13 @@ static int store_flesh(edg_wll_Context ctx,edg_wll_Event *e,char *jobid,int no)
        for (i=0; i<sizeof(f)/sizeof(f[0]); i++) free(f[i].val);
 
 /* XXX: hardcoded, no other suitable place to store it */
-       trio_asprintf(&stmt,"insert into short_fields(jobid,event,name,value) "
-               "values ('%|Ss',%d,'SRC_INSTANCE','%|Ss')",
-               jobid,no,e->any.src_instance);
-       if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) err = edg_wll_Error(ctx,NULL,NULL);
-       free(stmt);
+       if (!err) {
+               trio_asprintf(&stmt,"insert into short_fields(jobid,event,name,value) "
+                       "values ('%|Ss',%d,'SRC_INSTANCE','%|Ss')",
+                       jobid,no,e->any.src_instance);
+               if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) err = edg_wll_Error(ctx,NULL,NULL);
+               free(stmt);
+       }
 
        return err;
 }
@@ -490,9 +492,11 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
                e2.any.host = strdup(ctx->srvName);
                e2.any.level = e->level;
                e2.any.priority = e->priority;
-               e2.any.seqcode = NULL;          /* XXX: I'm not sure :-( */
+               e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL);
                e2.any.user = strdup(e->user);
                e2.any.source = e->source;
+               e2.any.src_instance = strdup(ctx->isProxy ? 
+                       "L&B proxy" : "L&B server");
                e2.regJob.ns = strdup(e->ns);
                edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent);
                e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE;
index 748d851..c2ca48d 100644 (file)
@@ -58,8 +58,10 @@ int edg_wll_StoreProto(edg_wll_Context ctx)
 
        edg_wll_ResetError(ctx);
        ret = read_il_data(ctx, &buf, gss_reader);
+       if (ret == EDG_WLL_GSS_ERROR_EOF) 
+         return edg_wll_SetError(ctx,ENOTCONN,"client side");
        if(ret < 0) 
-         return(ret);
+         return edg_wll_SetError(ctx,EIO,"interlogger protocol");
 
        handle_request(ctx,buf);
        free(buf);