From 7cad1ef0dcd94fd87f5c8aadc6720363785fe7fc Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Wed, 15 Mar 2006 18:12:21 +0000 Subject: [PATCH] merge 1.5 --- org.glite.lb.server/config/startup | 26 +++- org.glite.lb.server/project/build.number | 4 +- .../project/configure.properties.xml | 6 + org.glite.lb.server/project/version.properties | 5 +- org.glite.lb.server/src/bkserverd.c | 89 +++--------- org.glite.lb.server/src/db_store.c | 6 +- org.glite.lb.server/src/il_lbproxy.c | 11 +- org.glite.lb.server/src/jobstat.h | 2 + org.glite.lb.server/src/jobstat_supp.c | 35 +++++ org.glite.lb.server/src/process_event.c | 161 +++++++++++++++++---- org.glite.lb.server/src/query.c | 7 +- org.glite.lb.server/src/request.c | 2 +- org.glite.lb.server/src/store.c.T | 18 ++- org.glite.lb.server/src/stored_master.c | 4 +- 14 files changed, 246 insertions(+), 130 deletions(-) diff --git a/org.glite.lb.server/config/startup b/org.glite.lb.server/config/startup index 10b1a6d..d712223 100755 --- a/org.glite.lb.server/config/startup +++ b/org.glite.lb.server/config/startup @@ -1,7 +1,7 @@ #!/bin/sh GLITE_LOCATION=${GLITE_LOCATION:-/opt/glite} -GLITE_LOCATION_VAR=${GLITE_LOCATION_VAR:-${GLITE_LOCATION}/var} +GLITE_LOCATION_VAR=${GLITE_LOCATION_VAR:-/var/glite} [ -f /etc/glite.conf ] && . /etc/glite.conf [ -f $GLITE_LOCATION/etc/glite-wms.conf ] && . $GLITE_LOCATION/etc/glite-wms.conf @@ -34,6 +34,13 @@ start() fi fi + [ -z "$GLITE_LB_EXPORT_DUMPDIR" ] && GLITE_LB_EXPORT_DUMPDIR=/tmp/dump + purgedir="--dump-prefix $GLITE_LB_EXPORT_DUMPDIR" + [ -d "$GLITE_LB_EXPORT_DUMPDIR" ] || mkdir "$GLITE_LB_EXPORT_DUMPDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_DUMPDIR" + + [ -z "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] && GLITE_LB_EXPORT_JPREG_MAILDIR=/tmp/jpreg + maildir="--jpreg-dir $GLITE_LB_EXPORT_JPREG_MAILDIR" + [ -d "$GLITE_LB_EXPORT_JPREG_MAILDIR" ] || mkdir "$GLITE_LB_EXPORT_JPREG_MAILDIR" && chown $GLITE_USER:$GLITE_GROUP "$GLITE_LB_EXPORT_JPREG_MAILDIR" [ -z "$creds" ] && echo $0: WARNING: No credentials specified. Using default lookup which is dangerous. >&2 @@ -43,7 +50,8 @@ start() su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-lb-bkserverd \ --notif-il-sock=/tmp/glite-lb-notif.sock \ --notif-il-fprefix=/var/tmp/glite-lb-notif \ - $creds -i $pidfile $port" && echo " done" || echo " FAILED" + $creds -i $pidfile $port $purgedir $maildir" \ + && echo " done" || echo " FAILED" echo -n Starting glite-lb-notif-interlogd ... su - $GLITE_USER -c "$GLITE_LOCATION/bin/glite-lb-notif-interlogd \ @@ -79,23 +87,29 @@ stop() status() { + retval=0 + if netstat -an --unix | grep "^unix .* LISTEN.* /tmp/glite-lb-notif.sock$" >/dev/null 2>&1 ;then echo glite-lb-notif-interlogd running else echo glite-lb-notif-interlogd not running - return 1 + retval=1 fi if [ -f $pidfile ]; then pid=`cat $pidfile` if ps p $pid >/dev/null 2>&1; then echo glite-lb-bkserverd running as $pid - return 0 + else + echo glite-lb-bkserverd not running + retval=1 fi + else + echo glite-lb-bkserverd not running + retval=1 fi - echo glite-lb-bkserverd not running - return 1 + return $retval } case x$1 in diff --git a/org.glite.lb.server/project/build.number b/org.glite.lb.server/project/build.number index 4380560..196dd55 100644 --- a/org.glite.lb.server/project/build.number +++ b/org.glite.lb.server/project/build.number @@ -1,2 +1,2 @@ -#Sat Oct 15 06:56:05 CEST 2005 -module.build=154 +#Wed Mar 15 04:58:37 CET 2006 +module.build=0235 diff --git a/org.glite.lb.server/project/configure.properties.xml b/org.glite.lb.server/project/configure.properties.xml index f533759..c5b2837 100644 --- a/org.glite.lb.server/project/configure.properties.xml +++ b/org.glite.lb.server/project/configure.properties.xml @@ -20,6 +20,12 @@ Revision history: $Log$ + Revision 1.8.4.2 2006/02/20 09:14:30 zsalvet + Revert unfinished changes committed by mistake. + + Revision 1.8 2005/09/19 15:24:20 akrenek + "The gigantic merge"; from release 1.4 branch to HEAD + Revision 1.7.2.1 2005/08/09 15:02:10 jskrabal - build with broken gsoap 2.7.0 repository package diff --git a/org.glite.lb.server/project/version.properties b/org.glite.lb.server/project/version.properties index ac0be14..d6d8219 100644 --- a/org.glite.lb.server/project/version.properties +++ b/org.glite.lb.server/project/version.properties @@ -1,4 +1,3 @@ #Fri Sep 02 14:18:35 CEST 2005 -module.version=1.4.0 -module.build=2 -module.age=1 +module.version=1.3.7 +module.age=0 diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index 3fdc08c..1a34af5 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -40,6 +40,7 @@ #include "glite/lb/context.h" #include "glite/lb/mini_http.h" #include "glite/lb/context-int.h" +#include "glite/lb/lb_maildir.h" #include "lb_http.h" #include "lb_proto.h" @@ -69,14 +70,12 @@ extern edg_wll_ErrorCode edg_wll_Close(edg_wll_Context); #define CON_QUEUE 20 /* accept() */ #define SLAVE_OVERLOAD 10 /* queue items per slave */ -#define CLNT_TIMEOUT 10 /* keep idle connection that many seconds */ -#define TOTAL_CLNT_TIMEOUT 60 /* one client may ask one slave multiple times */ +#define CONNECT_TIMEOUT 30 +#define IDLE_TIMEOUT 10 /* keep idle connection that many seconds */ +#define REQUEST_TIMEOUT 120 /* one client may ask one slave multiple times */ /* but only limited time to avoid DoS attacks */ -#define CLNT_REJECT_TIMEOUT 100000 /* time limit for client rejection in !usec! */ #define DNS_TIMEOUT 5 /* how long wait for DNS lookup */ #define SLAVE_CONNS_MAX 500 /* commit suicide after that many connections */ -#define MASTER_TIMEOUT 30 /* maximal time of one-round of master network communication */ -#define SLAVE_TIMEOUT 30 /* maximal time of one-round of slave network communication */ #ifndef EDG_PURGE_STORAGE #define EDG_PURGE_STORAGE "/tmp/purge" @@ -277,8 +276,7 @@ int main(int argc, char *argv[]) int opt; char pidfile[PATH_MAX] = EDG_BKSERVERD_PIDFILE, *port, - *name, - *tmps; + *name; #ifdef GLITE_LB_SERVER_WITH_WS char *ws_port; #endif /* GLITE_LB_SERVER_WITH_WS */ @@ -412,9 +410,9 @@ int main(int argc, char *argv[]) if (check_mkdir(dumpStorage)) exit(1); if (check_mkdir(purgeStorage)) exit(1); if ( jpreg ) { - if ( edg_wll_MaildirInit(jpregDir, &tmps) ) { - dprintf(("[%d] %s\n", getpid(), tmps)); - if (!debug) syslog(LOG_CRIT, tmps); + if ( edg_wll_MaildirInit(jpregDir) ) { + dprintf(("[%d] edg_wll_MaildirInit failed: %s\n", getpid(), lbm_errdesc)); + if (!debug) syslog(LOG_CRIT, "edg_wll_MaildirInit failed: %s", lbm_errdesc); exit(1); } } @@ -577,14 +575,12 @@ a.sin_addr.s_addr = INADDR_ANY; glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, slaves); glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_OVERLOAD, SLAVE_OVERLOAD); glite_srvbones_set_param(GLITE_SBPARAM_SLAVE_CONNS_MAX, SLAVE_CONNS_MAX); - /* XXX - * not final version - yet! - */ - to = (struct timeval){CLNT_TIMEOUT, 0}; + + to = (struct timeval){CONNECT_TIMEOUT, 0}; glite_srvbones_set_param(GLITE_SBPARAM_CONNECT_TIMEOUT, &to); - to = (struct timeval){CLNT_TIMEOUT, 0}; + to = (struct timeval){REQUEST_TIMEOUT, 0}; glite_srvbones_set_param(GLITE_SBPARAM_REQUEST_TIMEOUT, &to); - to = (struct timeval){TOTAL_CLNT_TIMEOUT, 0}; + to = (struct timeval){IDLE_TIMEOUT, 0}; glite_srvbones_set_param(GLITE_SBPARAM_IDLE_TIMEOUT, &to); glite_srvbones_run(bk_clnt_data_init, service_table, sizofa(service_table), debug); @@ -694,7 +690,6 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) OM_uint32 min_stat, maj_stat; struct timeval dns_to = {DNS_TIMEOUT, 0}, - total_to = { TOTAL_CLNT_TIMEOUT,0 }, conn_start, now; struct sockaddr_in a; int alen; @@ -743,13 +738,8 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) ctx->rgma_export = rgma_export; memcpy(ctx->purge_timeout, purge_timeout, sizeof(ctx->purge_timeout)); - ctx->p_tmp_timeout.tv_sec = SLAVE_TIMEOUT; - ctx->p_tmp_timeout.tv_usec = 0; - if ( total_to.tv_sec < ctx->p_tmp_timeout.tv_sec ) - { - ctx->p_tmp_timeout.tv_sec = total_to.tv_sec; - ctx->p_tmp_timeout.tv_usec = total_to.tv_usec; - } + ctx->p_tmp_timeout.tv_sec = timeout->tv_sec; + ctx->p_tmp_timeout.tv_usec = timeout->tv_usec; ctx->poolSize = 1; ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool)); @@ -763,7 +753,6 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) gettimeofday(&conn_start, 0); - /* not a critical operation, do not waste all SLAVE_TIMEOUT */ h_errno = asyn_gethostbyaddr(&name, (char *)&a.sin_addr.s_addr,sizeof(a.sin_addr.s_addr), AF_INET, &dns_to); switch ( h_errno ) { @@ -985,6 +974,7 @@ int bk_accept_store(int conn, struct timeval *timeout, void *cdata) case ETIMEDOUT: case EDG_WLL_ERROR_GSS: case EPIPE: + case EIO: dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd); /* fallthrough @@ -1141,51 +1131,10 @@ int bk_accept_ws(int conn, struct timeval *timeout, void *cdata) } if ( err ) { - char *errt, *errd; - int ret; - - - errt = errd = NULL; - switch ( (ret = edg_wll_Error(ctx, &errt, &errd)) ) { - case ETIMEDOUT: - case EDG_WLL_ERROR_GSS: - case EPIPE: - dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); - if (!debug) syslog(LOG_ERR,"%s (%s)", errt, errd); - /* "recoverable" error - return (>0) - * fallthrough - */ - case ENOTCONN: - /* "recoverable" error - return (>0) - * return ENOTCONN to tell bones to clean up - */ - free(errt); free(errd); - return ret; - break; - - case ENOENT: - case EINVAL: - case EPERM: - case EEXIST: - case EDG_WLL_ERROR_NOINDEX: - case E2BIG: - dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); - if ( !debug ) syslog(LOG_ERR,"%s (%s)", errt, errd); - /* - * no action for non-fatal errors - */ - break; - - default: - dprintf(("[%d] %s (%s)\n", getpid(), errt, errd)); - if (!debug) syslog(LOG_CRIT,"%s (%s)",errt,errd); - /* - * unknown error - do rather return (<0) (slave will be killed) - */ - return -1; - } - free(errt); free(errd); - return 1; + // soap_print_fault(struct soap *soap, FILE *fd) maybe useful here + dprintf(("[%d] SOAP error (bk_accept_ws) \n", getpid())); + if (!debug) syslog(LOG_CRIT,"SOAP error (bk_accept_ws)"); + return ECANCELED; } return 0; diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index fc61e47..13a56fd 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -89,10 +89,12 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) switch ( ev->any.type ) { case EDG_WLL_EVENT_CLEAR: case EDG_WLL_EVENT_ABORT: - case EDG_WLL_EVENT_CANCEL: - case EDG_WLL_EVENT_DONE: edg_wll_PurgeServerProxy(ctx, ev->any.jobId); break; + case EDG_WLL_EVENT_CANCEL: + if (ev->cancel.status_code == EDG_WLL_CANCEL_DONE) + edg_wll_PurgeServerProxy(ctx, ev->any.jobId); + break; default: break; } } else { diff --git a/org.glite.lb.server/src/il_lbproxy.c b/org.glite.lb.server/src/il_lbproxy.c index 4b682e7..487c847 100644 --- a/org.glite.lb.server/src/il_lbproxy.c +++ b/org.glite.lb.server/src/il_lbproxy.c @@ -18,7 +18,6 @@ edg_wll_EventSendProxy( const edg_wlc_JobId jobid, const char *event) { - struct timeval timeout; long filepos; char *jobid_s, *event_file = NULL; @@ -28,9 +27,6 @@ edg_wll_EventSendProxy( edg_wll_ResetError(ctx); - timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX; - timeout.tv_usec = 0; - jobid_s = edg_wlc_JobIdGetUnique(jobid); if ( !jobid_s ) { edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()"); @@ -44,13 +40,16 @@ edg_wll_EventSendProxy( } if ( edg_wll_log_event_write(ctx, event_file, event, - FCNTL_ATTEMPTS, FCNTL_TIMEOUT, &filepos) ) { + (ctx->p_tmp_timeout.tv_sec > FCNTL_ATTEMPTS ? + ctx->p_tmp_timeout.tv_sec : FCNTL_ATTEMPTS), + FCNTL_TIMEOUT, &filepos) ) { + edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_write()"); _err(1); } if ( edg_wll_log_event_send(ctx, lbproxy_ilog_socket_path, filepos, - event, strlen(event), 1, &timeout) ) { + event, strlen(event), 1, &ctx->p_tmp_timeout) ) { edg_wll_UpdateError(ctx, 0, "edg_wll_log_event_send()"); _err(-1); } diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index 181a85d..352af06 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -64,9 +64,11 @@ void write2rgma_status(edg_wll_JobStat *); int before_deep_resubmission(const char *, const char *); int same_branch(const char *, const char *); int component_seqcode(const char *a, edg_wll_Source index); +char * set_component_seqcode(char *s,edg_wll_Source index,int val); int processEvent(intJobStat *, edg_wll_Event *, int, int, char **); int add_stringlist(char ***, const char *); int edg_wll_compare_seq(const char *, const char *); void init_intJobStat(intJobStat *p); + diff --git a/org.glite.lb.server/src/jobstat_supp.c b/org.glite.lb.server/src/jobstat_supp.c index 58d4a51..6fb09ef 100644 --- a/org.glite.lb.server/src/jobstat_supp.c +++ b/org.glite.lb.server/src/jobstat_supp.c @@ -658,6 +658,41 @@ int component_seqcode(const char *a, edg_wll_Source index) return(c[index]); } +char * set_component_seqcode(char *s,edg_wll_Source index,int val) +{ + unsigned int c[EDG_WLL_SOURCE__LAST]; + int res; + char *ret; + + res = sscanf(s, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d", + &c[EDG_WLL_SOURCE_USER_INTERFACE], + &c[EDG_WLL_SOURCE_NETWORK_SERVER], + &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + &c[EDG_WLL_SOURCE_BIG_HELPER], + &c[EDG_WLL_SOURCE_JOB_SUBMISSION], + &c[EDG_WLL_SOURCE_LOG_MONITOR], + &c[EDG_WLL_SOURCE_LRMS], + &c[EDG_WLL_SOURCE_APPLICATION]); + if (res != EDG_WLL_SOURCE__LAST-1) { + syslog(LOG_ERR, "unparsable sequence code %s\n", s); + fprintf(stderr, "unparsable sequence code %s\n", s); + return NULL; + } + + c[index] = val; + trio_asprintf(&ret,"UI=%06d:NS=%010d:WM=%06d:BH=%010d:JSS=%06d" + ":LM=%06d:LRMS=%06d:APP=%06d", + c[EDG_WLL_SOURCE_USER_INTERFACE], + c[EDG_WLL_SOURCE_NETWORK_SERVER], + c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + c[EDG_WLL_SOURCE_BIG_HELPER], + c[EDG_WLL_SOURCE_JOB_SUBMISSION], + c[EDG_WLL_SOURCE_LOG_MONITOR], + c[EDG_WLL_SOURCE_LRMS], + c[EDG_WLL_SOURCE_APPLICATION]); + return ret; +} + int before_deep_resubmission(const char *a, const char *b) { if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) < diff --git a/org.glite.lb.server/src/process_event.c b/org.glite.lb.server/src/process_event.c index ab13ed4..7b0fb1d 100644 --- a/org.glite.lb.server/src/process_event.c +++ b/org.glite.lb.server/src/process_event.c @@ -1,8 +1,11 @@ +#ident "$Header$" + #include #include #include #include #include +#include #include "glite/lb/producer.h" #include "glite/lb/context-int.h" @@ -166,6 +169,17 @@ static void load_branch_state(intJobStat *js) } } +// clear branches (deep resub. or abort) +static void reset_branch(intJobStat *js, edg_wll_Event *e) +{ + js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB; + free_stringlist(&js->pub.possible_destinations); + free_stringlist(&js->pub.possible_ce_nodes); + free_branch_state(&js->branch_states); + js->pub.payload_running = 0; + rep(js->branch_tag_seqcode, NULL); + rep(js->deep_resubmit_seqcode, e->any.seqcode); +} static char* location_string(const char *source, const char *host, const char *instance) { @@ -174,6 +188,12 @@ static char* location_string(const char *source, const char *host, const char *i return ret; } +static int after_enter_wm(const char *es,const char *js) +{ + return component_seqcode(es,EDG_WLL_SOURCE_NETWORK_SERVER) > + component_seqcode(js,EDG_WLL_SOURCE_NETWORK_SERVER); +} + static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUSED_VAR) { @@ -186,11 +206,15 @@ static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUS return RET_FATAL; } +// (?) || (0 && 1) => true if (res == RET_OK) #define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict)) + +// (?) || (1 && 1) => always true #define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict)) + #define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH) #define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE) - +#define PARSABLE_SEQCODE(code) (component_seqcode((code),0) >= 0) int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) { @@ -200,7 +224,7 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char int res = RET_OK, fine_res = RET_OK; - + int lm_favour_lrms = 0; if (old_state == EDG_WLL_JOB_ABORTED || old_state == EDG_WLL_JOB_CANCELLED || @@ -208,6 +232,15 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char res = RET_LATE; } +/* new event coming from NS => forget about any resubmission loops */ + if (e->type != EDG_WLL_EVENT_CANCEL && + js->last_seqcode && + after_enter_wm(e->any.seqcode,js->last_seqcode)) + { + rep(js->branch_tag_seqcode,NULL); + rep(js->deep_resubmit_seqcode,NULL); + rep(js->last_branch_seqcode,NULL); + } if (js->deep_resubmit_seqcode && before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) { @@ -216,7 +249,7 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char } else if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) { - if ((js->last_seqcode != NULL) && + if ((js->last_branch_seqcode != NULL) && edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) { res = RET_LATE; } @@ -248,7 +281,9 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char e->any.timestamp.tv_sec; res = RET_LATE; } - new_state = EDG_WLL_JOB_SCHEDULED; break; + new_state = EDG_WLL_JOB_SCHEDULED; + lm_favour_lrms = 1; + break; default: goto bad_event; break; } @@ -307,7 +342,9 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char new_state = EDG_WLL_JOB_WAITING; break; case EDG_WLL_SOURCE_LOG_MONITOR: if (LRMS_STATE(old_state)) res = RET_LATE; - new_state = EDG_WLL_JOB_READY; break; + new_state = EDG_WLL_JOB_READY; + lm_favour_lrms = 1; + break; case EDG_WLL_SOURCE_LRMS: new_state = EDG_WLL_JOB_SCHEDULED; break; default: @@ -480,17 +517,62 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char if (USABLE_BRANCH(fine_res)) { rep(js->pub.ce_node, e->running.node); } - if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + /* why? if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { */ if (e->running.node) { update_branch_state(e->any.seqcode, NULL, e->running.node, NULL, &js->branch_states); add_stringlist(&js->pub.possible_ce_nodes, e->running.node); } - } + /* } */ } break; case EDG_WLL_EVENT_REALLYRUNNING: + /* consistence check -- should not receive two contradicting ReallyRunning's within single + deep resub cycle */ + if (fine_res == RET_BADBRANCH) { + syslog(LOG_ERR,"ReallyRunning on bad branch %s", + e->any.source == EDG_WLL_SOURCE_LOG_MONITOR ? e->reallyRunning.wn_seq : e->any.seqcode); + break; + } + /* select the branch unless TOOOLD, i.e. before deep resubmission */ + if (!(res == RET_LATE && fine_res == RET_TOOOLD)) { + if (e->any.source == EDG_WLL_SOURCE_LRMS) { + rep(js->branch_tag_seqcode, e->any.seqcode); + if (res == RET_OK) { + rep(js->last_branch_seqcode, e->any.seqcode); + js->pub.state = EDG_WLL_JOB_RUNNING; + } + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + if (!js->branch_tag_seqcode) { + if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { + rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); + } else + goto bad_event; + } + if (!js->last_branch_seqcode) { + if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { + if (res == RET_OK) { + rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); + js->pub.state = EDG_WLL_JOB_RUNNING; + } + } else + goto bad_event; + } + } + + /* XXX: best effort -- if we are lucky, ReallyRunning is on the last shallow cycle, + so we take in account events processed so far */ + if (res == RET_LATE && !js->last_branch_seqcode) { + if (same_branch(js->last_seqcode,js->branch_tag_seqcode)) + rep(js->last_branch_seqcode,js->last_seqcode); + } + + js->pub.payload_running = 1; + load_branch_state(js); + } +#if 0 if (USABLE_DATA(res, strict)) { js->pub.state = EDG_WLL_JOB_RUNNING; free(js->pub.location); @@ -504,12 +586,22 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char rep(js->last_branch_seqcode, e->any.seqcode); } if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { - rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); - rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); + if (!js->branch_tag_seqcode) { + if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { + rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); + } else + goto bad_event; + } + if (!js->last_branch_seqcode) { + if (PARSABLE_SEQCODE(e->reallyRunning.wn_seq)) { + rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); + } else + goto bad_event; + } } - load_branch_state(js); } +#endif break; case EDG_WLL_EVENT_RESUBMISSION: if (USABLE(res, strict)) { @@ -524,18 +616,13 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char else if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB && e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) { - js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB; - free_stringlist(&js->pub.possible_destinations); - free_stringlist(&js->pub.possible_ce_nodes); - free_branch_state(&js->branch_states); - js->pub.payload_running = 0; - rep(js->branch_tag_seqcode, NULL); - rep(js->deep_resubmit_seqcode, e->any.seqcode); + reset_branch(js, e); } else if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) { js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW; - rep(js->deep_resubmit_seqcode, NULL); + // deep resubmit stays forever deadline for events + // rep(js->deep_resubmit_seqcode, NULL); } } break; @@ -613,11 +700,15 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char } break; case EDG_WLL_EVENT_ABORT: + // XXX: accept Abort from WM in every case + // setting res make USABLE macro true (awful !!) + if (e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) res = RET_OK; if (USABLE(res, strict)) { js->pub.state = EDG_WLL_JOB_ABORTED; rep(js->pub.reason, e->abort.reason); rep(js->pub.location, "none"); - js->pub.payload_running = 0; + + reset_branch(js, e); } break; @@ -723,6 +814,28 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char js->pub.stateEnterTimes[1 + js->pub.state] = (int)js->pub.lastUpdateTime.tv_sec; } + if (e->any.type == EDG_WLL_EVENT_CANCEL) { + rep(js->last_cancel_seqcode, e->any.seqcode); + } else { + +/* the first set of LM events (Accept, Transfer/* -> LRMS) + should not should shift the state (to Ready, Scheduled) but NOT to + update js->last_seqcode completely, in order not to block following + LRMS events which are likely to arrive later but should still affect + job state (as there may be no more LM events due to the Condor bug). + However, don't ignore the incoming seqcode completely, to catch up + with possibly delayed WM/JSS events */ + + if (lm_favour_lrms) { + free(js->last_seqcode); + js->last_seqcode = set_component_seqcode(e->any.seqcode,EDG_WLL_SOURCE_LOG_MONITOR,0); + } + else rep(js->last_seqcode, e->any.seqcode); + } + + if (fine_res == RET_GOODBRANCH) { + rep(js->last_branch_seqcode, e->any.seqcode); + } } if (USABLE_DATA(res,strict)) { @@ -737,16 +850,6 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char } } - if (e->any.type == EDG_WLL_EVENT_CANCEL) { - rep(js->last_cancel_seqcode, e->any.seqcode); - } else { - rep(js->last_seqcode, e->any.seqcode); - } - - if (fine_res == RET_GOODBRANCH) { - rep(js->last_branch_seqcode, e->any.seqcode); - } - return res; bad_event: diff --git a/org.glite.lb.server/src/query.c b/org.glite.lb.server/src/query.c index 5d86547..196c9be 100644 --- a/org.glite.lb.server/src/query.c +++ b/org.glite.lb.server/src/query.c @@ -78,7 +78,8 @@ int edg_wll_QueryEventsServer( goto cleanup; if (event_conditions && *event_conditions && (*event_conditions)->attr && - !(event_where = ec_to_head_where(ctx,event_conditions))) + !(event_where = ec_to_head_where(ctx,event_conditions)) && + edg_wll_Error(ctx,NULL,NULL) != 0) goto cleanup; if ( job_conditions && *job_conditions && (*job_conditions)->attr && @@ -863,7 +864,7 @@ static char *jc_to_head_where( free(aux); } else - trio_asprintf(&tmps, "%s OR s.%s %s s.%s", conds, cname, opToString(jc[m][n].op), dbt); + trio_asprintf(&tmps, "%s OR s.%s %s %s", conds, cname, opToString(jc[m][n].op), dbt); free(conds); conds = tmps; @@ -872,7 +873,7 @@ static char *jc_to_head_where( { trio_asprintf(&aux, "%s", dbt); dbt = edg_wll_TimeToDB(jc[m][n].value2.t.tv_sec); - trio_asprintf(&conds, "(%s >= s.%s AND s.%s <= %s)", cname, aux, cname, dbt); + trio_asprintf(&conds, "(s.%s >= %s AND s.%s <= %s)", cname, aux, cname, dbt); free(aux); } else diff --git a/org.glite.lb.server/src/request.c b/org.glite.lb.server/src/request.c index a3288a4..f0364ac 100644 --- a/org.glite.lb.server/src/request.c +++ b/org.glite.lb.server/src/request.c @@ -26,7 +26,7 @@ handle_request(edg_wll_Context ctx,char *buf) ret = decode_il_msg(&event, buf); if(ret < 0) { - edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"reading event string"); + edg_wll_SetError(ctx,EDG_WLL_IL_PROTO,"decoding event string failed"); return EDG_WLL_IL_PROTO; } diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index b2bcbc4..1130c93 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -128,7 +128,7 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq) free(stmt); } - free(stmt); + free(stmt); stmt = NULL; if ((err = store_seq(ctx,e,next)) || (err = store_flesh(ctx,e,jobid,next))) { /* attempt to cleanup, ignore new errors */ @@ -333,11 +333,13 @@ static int store_flesh(edg_wll_Context ctx,edg_wll_Event *e,char *jobid,int no) for (i=0; iany.src_instance); - if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) err = edg_wll_Error(ctx,NULL,NULL); - free(stmt); + if (!err) { + trio_asprintf(&stmt,"insert into short_fields(jobid,event,name,value) " + "values ('%|Ss',%d,'SRC_INSTANCE','%|Ss')", + jobid,no,e->any.src_instance); + if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) err = edg_wll_Error(ctx,NULL,NULL); + free(stmt); + } return err; } @@ -490,9 +492,11 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) e2.any.host = strdup(ctx->srvName); e2.any.level = e->level; e2.any.priority = e->priority; - e2.any.seqcode = NULL; /* XXX: I'm not sure :-( */ + e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL); e2.any.user = strdup(e->user); e2.any.source = e->source; + e2.any.src_instance = strdup(ctx->isProxy ? + "L&B proxy" : "L&B server"); e2.regJob.ns = strdup(e->ns); edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent); e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE; diff --git a/org.glite.lb.server/src/stored_master.c b/org.glite.lb.server/src/stored_master.c index 748d851..c2ca48d 100644 --- a/org.glite.lb.server/src/stored_master.c +++ b/org.glite.lb.server/src/stored_master.c @@ -58,8 +58,10 @@ int edg_wll_StoreProto(edg_wll_Context ctx) edg_wll_ResetError(ctx); ret = read_il_data(ctx, &buf, gss_reader); + if (ret == EDG_WLL_GSS_ERROR_EOF) + return edg_wll_SetError(ctx,ENOTCONN,"client side"); if(ret < 0) - return(ret); + return edg_wll_SetError(ctx,EIO,"interlogger protocol"); handle_request(ctx,buf); free(buf); -- 1.8.2.3