From 7fdf1f2f27b579b58214dc915de14881ed32be52 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Wed, 3 Aug 2005 12:44:06 +0000 Subject: [PATCH] Merged the SHALLOW branch --- org.glite.lb.client/examples/gen_sample_job | 6 + org.glite.lb.client/examples/job_status.c | 12 ++ org.glite.lb.client/examples/ready.l | 2 +- org.glite.lb.client/examples/running.l | 2 +- org.glite.lb.common/src/xml_parse.c.T | 18 ++ org.glite.lb.server/Makefile | 5 +- org.glite.lb.server/src/jobstat.c | 240 +++++++++++++++++++++++++-- org.glite.lb.server/src/jobstat.h | 21 ++- org.glite.lb.server/src/jobstat_supp.c | 153 ++++++++++++++++- org.glite.lb.server/src/lb_xml_parse.c.T | 2 + org.glite.lb/project/dependencies.properties | 32 ++-- org.glite.lb/project/events.T | 11 +- org.glite.lb/project/status.T | 6 + 13 files changed, 473 insertions(+), 37 deletions(-) diff --git a/org.glite.lb.client/examples/gen_sample_job b/org.glite.lb.client/examples/gen_sample_job index cc9dfac..c24dc1c 100755 --- a/org.glite.lb.client/examples/gen_sample_job +++ b/org.glite.lb.client/examples/gen_sample_job @@ -93,6 +93,12 @@ awk -F, $NESTED \ /-e UserTag/ { if (checkNOP(4) == 0) logit(); next;} +/-e ReallyRunning/ { if (checkNOP(3) == 0) logit(); + next;} + +# shell escape (for sequence number branching) + +/^!/ { print substr($0,2,(length($0) - 1)); } # macro processing - macro name starts and ends with ':' # in a file gen_MACRONAME.txt is the macro describtion diff --git a/org.glite.lb.client/examples/job_status.c b/org.glite.lb.client/examples/job_status.c index 489d194..e894065 100644 --- a/org.glite.lb.client/examples/job_status.c +++ b/org.glite.lb.client/examples/job_status.c @@ -240,6 +240,18 @@ static void printstat(edg_wll_JobStat stat, int level) printf("%sexpectUpdate : %d\n", ind, stat.expectUpdate); printf("%sexpectFrom : %s\n", ind, stat.expectFrom); printf("%sacl : %s\n", ind, stat.acl); + printf("%spayload_running: %d\n", ind, stat.payload_running); + if (stat.possible_destinations) { + printf("%spossible_destinations : \n", ind); + for (i=0; stat.possible_destinations[i]; i++) + printf("%s\t%s \n", ind, stat.possible_destinations[i]); + } + if (stat.possible_ce_nodes) { + printf("%spossible_ce_nodes : \n", ind); + for (i=0; stat.possible_ce_nodes[i]; i++) + printf("%s\t%s \n", ind, stat.possible_ce_nodes[i]); + } + printf("\n"); free(j); diff --git a/org.glite.lb.client/examples/ready.l b/org.glite.lb.client/examples/ready.l index e4d9ac7..530a503 100644 --- a/org.glite.lb.client/examples/ready.l +++ b/org.glite.lb.client/examples/ready.l @@ -12,7 +12,7 @@ -s WorkloadManager,-e HelperCall, --helper_name="name of the called component",--helper_params="parameters of the call", --src_role=CALLING #-s Helper,-e HelperCall, --helper_name="name of the called component",--helper_params="parameters of the call", --src_role=CALLED #-s Helper,-e HelperReturn, --helper_name="name of the called component",--retval="returned data", --src_role=CALLED --s WorkloadManager,-e Match,--dest_id="Id of the destination CE/queue" +-s WorkloadManager,-e Match,--dest_id="${DESTINATION:-destination CE/queue}" -s WorkloadManager,-e HelperReturn, --helper_name="name of the called component",--retval="returned data", --src_role=CALLING -s WorkloadManager,-e EnQueued, --queue="destination queue", --job="job description in receiver language", --result=OK, --reason="detailed description of transfer" diff --git a/org.glite.lb.client/examples/running.l b/org.glite.lb.client/examples/running.l index 12d411a..60dfe6a 100644 --- a/org.glite.lb.client/examples/running.l +++ b/org.glite.lb.client/examples/running.l @@ -1,4 +1,4 @@ # macro definition for RUNNING state :scheduled: --s LogMonitor,-e Running,--node"worker node where the executable is run" +-s LogMonitor,-e Running,--node="${CE_NODE:-worker node}" diff --git a/org.glite.lb.common/src/xml_parse.c.T b/org.glite.lb.common/src/xml_parse.c.T index 749c4b8..cf265ff 100644 --- a/org.glite.lb.common/src/xml_parse.c.T +++ b/org.glite.lb.common/src/xml_parse.c.T @@ -320,6 +320,8 @@ static void startJobStatus(void *data, const char *el, const char **attr) case 1 : if (!strcmp("user_tags", el) || !strcmp("user_values", el) || !strcmp("children_hist", el) || !strcmp("stateEnterTimes", el) + || !strcmp("possible_destinations", el) + || !strcmp("possible_ce_nodes", el) || !strcmp("children_states", el) || !strcmp("children", el)) { XMLCtx->stat_begin = XML_GetCurrentByteIndex(XMLCtx->p); } @@ -937,6 +939,22 @@ static void endJobStat(void *data, const char *el) "children_hist",(int (*)()) edg_wll_StringToStat, &XMLCtx->jobStatSingleGlobal.children_hist); XMLCtx->stat_begin = 0; } + else if (!strcmp(el,"possible_destinations")) { + long len = (XML_GetCurrentByteIndex(XMLCtx->p) + XML_GetCurrentByteCount(XMLCtx->p)) + - XMLCtx->stat_begin; + + edg_wll_ParseStrList(XMLCtx->ctx, XMLCtx->message_body + XMLCtx->stat_begin, len, + "possible_destinations", "name", &XMLCtx->jobStatSingleGlobal.possible_destinations); + XMLCtx->stat_begin = 0; + } + else if (!strcmp(el,"possible_ce_nodes")) { + long len = (XML_GetCurrentByteIndex(XMLCtx->p) + XML_GetCurrentByteCount(XMLCtx->p)) + - XMLCtx->stat_begin; + + edg_wll_ParseStrList(XMLCtx->ctx, XMLCtx->message_body + XMLCtx->stat_begin, len, + "possible_ce_nodes", "name", &XMLCtx->jobStatSingleGlobal.possible_ce_nodes); + XMLCtx->stat_begin = 0; + } else if (!strcmp(el,"children")) { long len = (XML_GetCurrentByteIndex(XMLCtx->p) + XML_GetCurrentByteCount(XMLCtx->p)) - XMLCtx->stat_begin; diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index 9147b02..a1ce060 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -26,6 +26,7 @@ GLITE_LB_SERVER_WITH_WS=yes ifeq ($(GLITE_LB_SERVER_WITH_WS),yes) WS_CFLAGS=-DGLITE_LB_SERVER_WITH_WS + NSMAP=LoggingAndBookkeeping.nsmap else WS_CFLAGS= endif @@ -163,7 +164,7 @@ LIB_OBJS_BK:= \ notification.o il_notification.o notif_match.o stats.o STATIC_LIB_BK:=libglite_lb_bkserver.a -glite_lb_bkserverd: LoggingAndBookkeeping.nsmap ${BKSERVER_OBJS} +glite_lb_bkserverd: ${NSMAP} ${BKSERVER_OBJS} ${LINK} -o $@ ${BKSERVER_OBJS} ${BKSERVER_LIBS} glite_lb_bkindex: ${INDEX_OBJS} @@ -327,4 +328,6 @@ soap_version.h: perl -ne '$$. == 2 && /.*([0-9])\.([0-9])\.([0-9]).*/ && printf "#define GSOAP_VERSION %d%02d%02d\n",$$1,$$2,$$3' soapH.h >$@ -rm soapC.cpp soapH.h soapStub.h soapClient.cpp soapServer.cpp soapClientLib.cpp soapServerLib.cpp +ifeq ($(GLITE_LB_SERVER_WITH_WS),yes) bkserverd.o ws_fault.o: soap_version.h +endif diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index 00a8aa4..48071d3 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -39,6 +39,9 @@ #define RET_BADSEQ 4 #define RET_SUSPECT 5 #define RET_IGNORE 6 +#define RET_BADBRANCH 7 +#define RET_GOODBRANCH 8 +#define RET_TOOOLD 9 #define RET_INTERNAL 100 #define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } @@ -48,7 +51,11 @@ static void warn (const char* format, ...) UNUSED_VAR ; static char *job_owner(edg_wll_Context,char *); static char* location_string(const char*, const char*, const char*); static int add_stringlist(char ***, const char *) UNUSED_VAR; +static void free_stringlist(char ***); static int add_taglist(edg_wll_TagValue **, const char *, const char *); +static void update_branch_state(char *, char *, char *, char *, branch_state **); +static void free_branch_state(branch_state **); +static void load_branch_state(intJobStat *); int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int); @@ -76,7 +83,7 @@ static void destroy_intJobStat_extension(intJobStat *p) { free(p->last_seqcode); p->last_seqcode = NULL; free(p->last_cancel_seqcode); p->last_cancel_seqcode = NULL; - p->wontresub = 0; + p->resubmit_type = EDG_WLL_RESUBMISSION_UNDEFINED; } void destroy_intJobStat(intJobStat *p) @@ -411,6 +418,7 @@ static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUS #define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict)) #define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict)) +#define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH) #define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE) @@ -419,7 +427,10 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict edg_wll_JobStatCode old_state = js->pub.state; edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN; - int res = RET_OK; + int res = RET_OK, + fine_res = RET_OK; + + if (old_state == EDG_WLL_JOB_ABORTED || old_state == EDG_WLL_JOB_CANCELLED || @@ -427,11 +438,31 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict res = RET_LATE; } - if (js->last_seqcode != NULL && - edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) { + + if (js->deep_resubmit_seqcode && + before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) { + res = RET_LATE; + fine_res = RET_TOOOLD; + } + else if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived + if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) { + if ((js->last_seqcode != NULL) && + edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) { + res = RET_LATE; + } + fine_res = RET_GOODBRANCH; + } + else { + res = RET_LATE; + fine_res = RET_BADBRANCH; + } + } + else if ((js->last_seqcode != NULL) && + edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) { res = RET_LATE; } + switch (e->any.type) { case EDG_WLL_EVENT_TRANSFER: if (e->transfer.result == EDG_WLL_TRANSFER_OK) { @@ -563,6 +594,8 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict new_state = EDG_WLL_JOB_WAITING; break; case EDG_WLL_SOURCE_WORKLOAD_MANAGER: if (LRMS_STATE(old_state)) res = RET_LATE; + update_branch_state(e->any.seqcode, NULL, + NULL, e->enQueued.job, &js->branch_states); new_state = EDG_WLL_JOB_READY; break; case EDG_WLL_SOURCE_LOG_MONITOR: new_state = EDG_WLL_JOB_WAITING; break; @@ -608,7 +641,10 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict case EDG_WLL_SOURCE_NETWORK_SERVER: rep(js->pub.jdl, e->enQueued.job); break; case EDG_WLL_SOURCE_WORKLOAD_MANAGER: - rep(js->pub.matched_jdl, e->enQueued.job); break; + if (USABLE_BRANCH(res)) { + rep(js->pub.matched_jdl, e->enQueued.job); + } + break; case EDG_WLL_SOURCE_LOG_MONITOR: /* no interim JDL here */ break; @@ -671,7 +707,38 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict e->running.node); } if (USABLE_DATA(res, strict)) { - rep(js->pub.ce_node, e->running.node); + if (USABLE_BRANCH(fine_res)) { + rep(js->pub.ce_node, e->running.node); + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + if (e->running.node) { + update_branch_state(e->any.seqcode, NULL, + e->running.node, NULL, &js->branch_states); + add_stringlist(&js->pub.possible_ce_nodes, + e->running.node); + } + } + } + break; + case EDG_WLL_EVENT_REALLYRUNNING: + if (USABLE_DATA(res, strict)) { + js->pub.state = EDG_WLL_JOB_RUNNING; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS), + "worknode", + e->running.node); + js->pub.payload_running = 1; + if (e->any.source == EDG_WLL_SOURCE_LRMS) { + rep(js->branch_tag_seqcode, e->any.seqcode); + rep(js->last_branch_seqcode, e->any.seqcode); + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); + rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); + } + + load_branch_state(js); } break; case EDG_WLL_EVENT_RESUBMISSION: @@ -682,7 +749,23 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict } if (USABLE_DATA(res, strict)) { if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { - js->wontresub = 1; + js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB; + } + else + if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB && + e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) { + js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB; + free_stringlist(&js->pub.possible_destinations); + free_stringlist(&js->pub.possible_ce_nodes); + free_branch_state(&js->branch_states); + js->pub.payload_running = 0; + rep(js->branch_tag_seqcode, NULL); + rep(js->deep_resubmit_seqcode, e->any.seqcode); + } + else + if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) { + js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW; + rep(js->deep_resubmit_seqcode, NULL); } } break; @@ -695,6 +778,9 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict if (USABLE(res, strict)) { js->pub.state = EDG_WLL_JOB_DONE; rep(js->pub.reason, e->done.reason); + if (fine_res == RET_GOODBRANCH) { + js->pub.payload_running = 0; + } switch (e->done.status_code) { case EDG_WLL_DONE_CANCELLED: js->pub.state = EDG_WLL_JOB_CANCELLED; @@ -725,10 +811,15 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict } break; case EDG_WLL_EVENT_CANCEL: - if (js->last_cancel_seqcode != NULL && - edg_wll_compare_seq(e->any.seqcode, js->last_cancel_seqcode) < 0) { - res = RET_LATE; + if (fine_res != RET_BADBRANCH) { + if (js->last_cancel_seqcode != NULL && + edg_wll_compare_seq(e->any.seqcode, js->last_cancel_seqcode) < 0) { + res = RET_LATE; + } } + else { + res = RET_LATE; + } if (USABLE(res, strict)) { switch (e->cancel.status_code) { case EDG_WLL_CANCEL_REQ: @@ -756,6 +847,7 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict js->pub.state = EDG_WLL_JOB_ABORTED; rep(js->pub.reason, e->abort.reason); rep(js->pub.location, "none"); + js->pub.payload_running = 0; } break; @@ -791,7 +883,15 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict e->match.src_instance); } if (USABLE_DATA(res, strict)) { - rep(js->pub.destination, e->match.dest_id); + if (USABLE_BRANCH(fine_res)) { + rep(js->pub.destination, e->match.dest_id); + } + if (e->match.dest_id) { + update_branch_state(e->any.seqcode, e->match.dest_id, + NULL, NULL, &js->branch_states); + add_stringlist(&js->pub.possible_destinations, + e->match.dest_id); + } } break; case EDG_WLL_EVENT_PENDING: @@ -873,6 +973,10 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict rep(js->last_seqcode, e->any.seqcode); } + if (fine_res == RET_GOODBRANCH) { + rep(js->last_branch_seqcode, e->any.seqcode); + } + return res; bad_event: @@ -1009,6 +1113,19 @@ static int add_stringlist(char ***lptr, const char *new_item) } } +static void free_stringlist(char ***lptr) +{ + char **itptr; + int i; + + if (*lptr) { + for (i = 0, itptr = *lptr; itptr[i] != NULL; i++) + free(itptr[i]); + free(itptr); + *lptr = NULL; + } +} + static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char *new_item2) { edg_wll_TagValue *itptr; @@ -1042,6 +1159,107 @@ static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char } } +static void update_branch_state(char *b, char *d, char *c, char *j, branch_state **bs) +{ + int i = 0, branch; + + + if (!b) + return; + else + branch = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER); + + if (*bs != NULL) { + while ((*bs)[i].branch) { + if (branch == (*bs)[i].branch) { + if (d) rep((*bs)[i].destination, d); + if (c) rep((*bs)[i].ce_node, c); + if (j) rep((*bs)[i].jdl, j); + + return; + } + i++; + } + } + + *bs = (branch_state *) realloc(*bs, (i+2)*sizeof(branch_state)); + memset(&((*bs)[i]), 0, 2*sizeof(branch_state)); + + (*bs)[i].branch = branch; + rep((*bs)[i].destination, d); + rep((*bs)[i].ce_node, c); + rep((*bs)[i].jdl, j); +} + + +static void free_branch_state(branch_state **bs) +{ + int i = 0; + + if (*bs == NULL) return; + + while ((*bs)[i].branch) { + free((*bs)[i].destination); + free((*bs)[i].ce_node); + free((*bs)[i].jdl); + i++; + } + free(*bs); + *bs = NULL; +} + +static int compare_branch_states(const void *a, const void *b) +{ + branch_state *c = (branch_state *) a; + branch_state *d = (branch_state *) b; + + if (c->branch < d->branch) return -1; + if (c->branch == d->branch) return 0; + if (c->branch > d->branch) return 1; +} + +static void load_branch_state(intJobStat *js) +{ + int i, j, branch; + + + if ( (!js->branch_tag_seqcode) || (!js->branch_states) ) return; + + branch = component_seqcode(js->branch_tag_seqcode, EDG_WLL_SOURCE_WORKLOAD_MANAGER); + + // count elements + i = 0; + while (js->branch_states[i].branch) i++; + + // sort them + qsort(js->branch_states, (size_t) i, sizeof(branch_state), + compare_branch_states); + + // find row corresponding to ReallyRunning WM seq.code (aka branch) + i = 0; + while (js->branch_states[i].branch) { + if (js->branch_states[i].branch == branch) break; + i++; + } + + // copy this and two before branches data to final state + // (each field - dest,ce,jdl - comes from different event) + // (and these events have most likely different WM seq.codes) + // (even belonging into one logical branch) + // (the newer the more important - so i-th element is copied as last) + // (and may overwrite data from previous elements) + for (j = i - 2; j <= i; j++) { + if (j >= 0) { + if (js->branch_states[j].destination) + rep(js->pub.destination, js->branch_states[j].destination); + if (js->branch_states[j].ce_node) + rep(js->pub.ce_node, js->branch_states[j].ce_node); + if (js->branch_states[j].jdl) + rep(js->pub.matched_jdl, js->branch_states[j].jdl); + } + } +} + /* XXX more thorough malloc, calloc, and asprintf failure handling */ /* XXX indexes in {short,long}_fields */ /* XXX strict mode */ diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index 1b17c7b..bb6fff5e 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -5,14 +5,28 @@ * (includes edg_wll_JobStat API structure) */ -#define INTSTAT_VERSION "release-2.0" +#define INTSTAT_VERSION "release-3.0_shallow" + +// shallow resubmission container - holds state of each branch +// (useful when state restore is needed after ReallyRunning event) +// +typedef struct _branch_state { + int branch; + char *destination; + char *ce_node; + char *jdl; +} branch_state; typedef struct _intJobStat { edg_wll_JobStat pub; - int wontresub; + int resubmit_type; char *last_seqcode; char *last_cancel_seqcode; + char *branch_tag_seqcode; + char *last_branch_seqcode; + char *deep_resubmit_seqcode; + branch_state *branch_states; // branch zero terminated array /* int expect_mask; */ } intJobStat; @@ -28,3 +42,6 @@ char *enc_intJobStat(char *, intJobStat* ); void write2rgma_status(edg_wll_JobStat *); +int before_deep_resubmission(const char *, const char *); +int same_branch(const char *, const char *); +int component_seqcode(const char *a, edg_wll_Source index); diff --git a/org.glite.lb.server/src/jobstat_supp.c b/org.glite.lb.server/src/jobstat_supp.c index 3f4f3ad..e3bbe79 100644 --- a/org.glite.lb.server/src/jobstat_supp.c +++ b/org.glite.lb.server/src/jobstat_supp.c @@ -175,6 +175,79 @@ static char **dec_strlist(char *in, char **rest) return out; } +static char *enc_branch_states(char *old, branch_state *item) +{ + char *ret; + + if (item == NULL) { + asprintf(&ret,"%s-1 ", old); + free(old); + return ret; + } else { + asprintf(&ret,"%s1 ",old); + free(old); + if (ret == NULL) return ret; + } + do { + ret = enc_int(ret, (*item).branch); + ret = enc_string(ret, (*item).destination); + ret = enc_string(ret, (*item).ce_node); + ret = enc_string(ret, (*item).jdl); + } while ((*(item++)).branch != 0); + return ret; +} + +static branch_state *dec_branch_states(char *in, char **rest) +{ + branch_state *out; + int len = -1, b = 0; + char *tmp_in, *tmp_ret; + int scret; + + scret = sscanf(in, "%d", &len); + if (scret < 1) { + *rest = NULL; + return NULL; + } + if (len == -1) { + *rest = strchr(in, ' ') ? strchr(in, ' ') + 1 : NULL; + return NULL; + } + + len = 0; + tmp_in = in = strchr(in, ' ') + 1 ; + do { + b = dec_int(tmp_in, &tmp_in); + tmp_ret = dec_string(tmp_in, &tmp_in); free(tmp_ret); + if (!tmp_in) { *rest = tmp_in; return NULL; } + tmp_ret = dec_string(tmp_in, &tmp_in); free(tmp_ret); + if (!tmp_in) { *rest = tmp_in; return NULL; } + tmp_ret = dec_string(tmp_in, &tmp_in); free(tmp_ret); + if (!tmp_in) { *rest = tmp_in; return NULL; } + len++; + } while (b != 0); + + out = (branch_state *) calloc(len+1, sizeof(branch_state)); + + if (out) { + len = 0; + tmp_in = in; + + do { + out[len].branch = dec_int(tmp_in, &tmp_in); + out[len].destination = dec_string(tmp_in, &tmp_in); + out[len].ce_node = dec_string(tmp_in, &tmp_in); + out[len].jdl = dec_string(tmp_in, &tmp_in); + } while (out[len++].branch != 0); + *rest = tmp_in; + } + else + *rest = 0; + + return out; + +} + static char* enc_taglist(char *old, edg_wll_TagValue *item) { char *ret; @@ -199,8 +272,8 @@ static edg_wll_TagValue *dec_taglist(char *in, char **rest) { edg_wll_TagValue *out; int len = -1; - char *tmp_in, *tmp_ret, *tmp_ret2; - int scret; + char *tmp_in, *tmp_ret; + int scret, end = 0; scret = sscanf(in, "%d", &len); if (scret < 1) { @@ -215,12 +288,15 @@ static edg_wll_TagValue *dec_taglist(char *in, char **rest) len = 0; tmp_in = in = strchr(in, ' ') + 1 ; do { - tmp_ret2 = dec_string(tmp_in, &tmp_in); - if (!tmp_in) { *rest = tmp_in; return NULL; } tmp_ret = dec_string(tmp_in, &tmp_in); + if (tmp_ret) free(tmp_ret); + else end = 1; + if (!tmp_in) { *rest = tmp_in; return NULL; } + tmp_ret = dec_string(tmp_in, &tmp_in); + free(tmp_ret); if (!tmp_in) { *rest = tmp_in; return NULL; } len++; - } while (tmp_ret2 != NULL); + } while (!end); out = (edg_wll_TagValue *) malloc(len*sizeof(edg_wll_TagValue)); @@ -354,6 +430,9 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat) if (ret) ret = enc_timeval(ret, stat->lastUpdateTime); if (ret) ret = enc_int(ret, stat->expectUpdate); if (ret) ret = enc_string(ret, stat->expectFrom); + if (ret) ret = enc_int(ret, stat->payload_running); + if (ret) ret = enc_strlist(ret, stat->possible_destinations); + if (ret) ret = enc_strlist(ret, stat->possible_ce_nodes); return ret; } @@ -399,6 +478,9 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest) if (tmp_in != NULL) stat->lastUpdateTime = dec_timeval(tmp_in, &tmp_in); if (tmp_in != NULL) stat->expectUpdate = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) stat->expectFrom = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->payload_running = dec_int(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->possible_destinations = dec_strlist(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->possible_ce_nodes = dec_strlist(tmp_in, &tmp_in); *rest = tmp_in; return stat; @@ -409,9 +491,13 @@ char *enc_intJobStat(char *old, intJobStat* stat) char *ret; ret = enc_JobStat(old, &stat->pub); - if (ret) ret = enc_int(ret, stat->wontresub); + if (ret) ret = enc_int(ret, stat->resubmit_type); if (ret) ret = enc_string(ret, stat->last_seqcode); if (ret) ret = enc_string(ret, stat->last_cancel_seqcode); + if (ret) ret = enc_string(ret, stat->branch_tag_seqcode); + if (ret) ret = enc_string(ret, stat->last_branch_seqcode); + if (ret) ret = enc_string(ret, stat->deep_resubmit_seqcode); + if (ret) ret = enc_branch_states(ret, stat->branch_states); return ret; } @@ -428,13 +514,25 @@ intJobStat* dec_intJobStat(char *in, char **rest) if (stat != NULL) { stat->pub = *pubstat; free(pubstat); - stat->wontresub = dec_int(tmp_in, &tmp_in); + stat->resubmit_type = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) { stat->last_seqcode = dec_string(tmp_in, &tmp_in); } if (tmp_in != NULL) { stat->last_cancel_seqcode = dec_string(tmp_in, &tmp_in); } + if (tmp_in != NULL) { + stat->branch_tag_seqcode = dec_string(tmp_in, &tmp_in); + } + if (tmp_in != NULL) { + stat->last_branch_seqcode = dec_string(tmp_in, &tmp_in); + } + if (tmp_in != NULL) { + stat->deep_resubmit_seqcode = dec_string(tmp_in, &tmp_in); + } + if (tmp_in != NULL) { + stat->branch_states = dec_branch_states(tmp_in, &tmp_in); + } } else if (tmp_in != NULL) { edg_wll_FreeStatus(pubstat); free(pubstat); @@ -598,6 +696,47 @@ edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context ctx, void *job_index_c return edg_wll_Error(ctx, NULL, NULL); } +int component_seqcode(const char *a, edg_wll_Source index) +{ + unsigned int c[EDG_WLL_SOURCE__LAST]; + int res; + + res = sscanf(a, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d", + &c[EDG_WLL_SOURCE_USER_INTERFACE], + &c[EDG_WLL_SOURCE_NETWORK_SERVER], + &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + &c[EDG_WLL_SOURCE_BIG_HELPER], + &c[EDG_WLL_SOURCE_JOB_SUBMISSION], + &c[EDG_WLL_SOURCE_LOG_MONITOR], + &c[EDG_WLL_SOURCE_LRMS], + &c[EDG_WLL_SOURCE_APPLICATION]); + if (res != EDG_WLL_SOURCE__LAST-1) { + syslog(LOG_ERR, "unparsable sequence code %s\n", a); + fprintf(stderr, "unparsable sequence code %s\n", a); + return -1; + } + + return(c[index]); +} + +int before_deep_resubmission(const char *a, const char *b) +{ + if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) < + component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) ) + return(1); + else + return(0); + +} + +int same_branch(const char *a, const char *b) +{ + if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) == + component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) ) + return(1); + else + return(0); +} int edg_wll_compare_seq(const char *a, const char *b) { diff --git a/org.glite.lb.server/src/lb_xml_parse.c.T b/org.glite.lb.server/src/lb_xml_parse.c.T index 6732a04..6a13b2b 100644 --- a/org.glite.lb.server/src/lb_xml_parse.c.T +++ b/org.glite.lb.server/src/lb_xml_parse.c.T @@ -1625,6 +1625,8 @@ int edg_wll_JobStatusToXML(edg_wll_Context ctx, edg_wll_JobStat stat, char **mes gen "edg_wll_add_$ft\_to_XMLBody(&pomB, stat.$_, \"$_\", $n);\n"; } @@@} + if (stat.possible_destinations) edg_wll_add_strlist_to_XMLBody(&pomB, stat.possible_destinations, "possible_destinations", "name", "\t\t\t", NULL); + if (stat.possible_ce_nodes) edg_wll_add_strlist_to_XMLBody(&pomB, stat.possible_ce_nodes, "possible_ce_nodes", "name", "\t\t\t", NULL); if (stat.children) edg_wll_add_strlist_to_XMLBody(&pomB, stat.children, "children", "jobId", "\t\t\t", NULL); if (stat.children_hist) edg_wll_add_intlist_to_XMLBody(&pomB, stat.children_hist, "children_hist", edg_wll_StatToString, "\t\t\t", 1, stat.children_hist[0]); if (stat.children_states) edg_wll_add_stslist_to_XMLBody(ctx, &pomB, stat.children_states, "children_states", "", EDG_WLL_JOB_UNDEF); diff --git a/org.glite.lb/project/dependencies.properties b/org.glite.lb/project/dependencies.properties index 6d9ae2f..a2f7c3b 100644 --- a/org.glite.lb/project/dependencies.properties +++ b/org.glite.lb/project/dependencies.properties @@ -3,15 +3,25 @@ # System dependencies ################################################################### -org.glite.version = HEAD -org.glite.lb.version = HEAD + + + org.glite.version = glite_B_1_0_5_190 + org.glite.lb.version = glite-lb_branch_1_1_0_2_SHALLOW + + org.glite.lb.client-interface.version = glite-lb-client-interface_branch_1_2_1_3 + + org.glite.lb.ws-interface.version = glite-lb-ws-interface_branch_1_2_0_2 + + org.glite.lb.common.version = glite-lb-common_branch_1_4_0_2 + + org.glite.lb.client.version = glite-lb-client_branch_1_2_1_2 + + org.glite.lb.server-bones.version = glite-lb-server-bones_branch_2_0_0_2 + + org.glite.lb.logger.version = glite-lb-logger_branch_1_1_1_2 + + org.glite.lb.server.version = glite-lb-server_branch_1_2_4_2 + + org.glite.lb.proxy.version = glite-lb-proxy_branch_1_1_1_2 + -# Component dependencies tag = do not remove this line = -org.glite.lb.client-interface.version = HEAD -org.glite.lb.ws-interface.version = HEAD -org.glite.lb.common.version = HEAD -org.glite.lb.client.version = HEAD -org.glite.lb.server.version = HEAD -org.glite.lb.proxy.version = HEAD -org.glite.lb.server-bones.version = HEAD -org.glite.lb.logger.version = HEAD diff --git a/org.glite.lb/project/events.T b/org.glite.lb/project/events.T index cf8e8b1..607072d 100644 --- a/org.glite.lb/project/events.T +++ b/org.glite.lb/project/events.T @@ -86,13 +86,14 @@ _code_ CALLING The logging component is caller. _code_ CALLED The logging component is callee. -@type Running Executable started. +@type Running Job wrapper started. string node Worker node on which the job executable is being run. @type Resubmission Result of resubmission decision. - int result Result code of the resubmission decision (WILLRESUB or WONTRESUB). - _code_ WILLRESUB The job will be resubmitted. + int result Result code of the resubmission decision (WILLRESUB or WONTRESUB or SHALLOW). + _code_ WILLRESUB The job will be resubmitted (deep resubmission). _code_ WONTRESUB The job will not be resubmitted. + _code_ SHALLOW Shallow resubmission (user payload has not started yet) string reason Reason why the job will or will not be resubmitted. string tag Value of the attribute on which the decision to resubmit the job was based. @@ -186,3 +187,7 @@ string resource which resource int quantity how much string unit units (sec, kB, etc.) + +@type ReallyRunning User payload started + _optional_ + string wn_seq sequence code on the worker node diff --git a/org.glite.lb/project/status.T b/org.glite.lb/project/status.T index 10071ac..5c409a8 100644 --- a/org.glite.lb/project/status.T +++ b/org.glite.lb/project/status.T @@ -64,6 +64,12 @@ bool expectUpdate Some logged information has not arrived yet string expectFrom Sources of the missing information string acl ACL of the job +bool payload_running User payload started +strlist possible_destinations Possible job destinations + _special_ XMLstructured +strlist possible_ce_nodes CE nodes matching to possible_destinations + _special_ XMLstructured + @type Submitted entered by the user to the User Interface or registered by Job Partitioner @type Waiting Accepted by WMS, waiting for resource allocation @type Ready Matching resources found -- 1.8.2.3