From: Miloš Mulač Date: Thu, 9 Jun 2005 09:49:54 +0000 (+0000) Subject: shallow resubmission support in state automaton X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=6f49fb91be8cc162c9b7efb2752a89b2129a9351;p=jra1mw.git shallow resubmission support in state automaton - midway -> needs testing & improving --- diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index 4a2de3c..a2901f9 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -39,6 +39,9 @@ #define RET_BADSEQ 4 #define RET_SUSPECT 5 #define RET_IGNORE 6 +#define RET_BADBRANCH 7 +#define RET_GOODBRANCH 8 +#define RET_TOOOLD 9 #define RET_INTERNAL 100 #define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } @@ -50,6 +53,9 @@ static char* location_string(const char*, const char*, const char*); static int add_stringlist(char ***, const char *) UNUSED_VAR; static void free_stringlist(char ***); static int add_taglist(edg_wll_TagValue **, const char *, const char *); +static void update_branch_state(char *, char *, char *, char *, sr_container **); +static void free_branch_state(sr_container **); +static void load_branch_state(intJobStat *); int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int); @@ -77,7 +83,7 @@ static void destroy_intJobStat_extension(intJobStat *p) { free(p->last_seqcode); p->last_seqcode = NULL; free(p->last_cancel_seqcode); p->last_cancel_seqcode = NULL; - p->wontresub = 0; + p->resubmit_type = 0; } void destroy_intJobStat(intJobStat *p) @@ -412,6 +418,7 @@ static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUS #define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict)) #define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict)) +#define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH) #define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE) @@ -420,7 +427,10 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict edg_wll_JobStatCode old_state = js->pub.state; edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN; - int res = RET_OK; + int res = RET_OK, + fine_res = RET_OK; + + if (old_state == EDG_WLL_JOB_ABORTED || old_state == EDG_WLL_JOB_CANCELLED || @@ -428,11 +438,32 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict res = RET_LATE; } - if (js->last_seqcode != NULL && - edg_wll_compare_seq_full(e->any.seqcode, js->last_seqcode, js->wn_seqcode) < 0) { + + if (js->deep_resubmit_seqcode && + before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) { res = RET_LATE; + fine_res = RET_TOOOLD; } + if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived + if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) { + if ((js->last_seqcode != NULL) && + edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) { + res = RET_LATE; + } + fine_res = RET_GOODBRANCH; + } + else { + res = RET_LATE; + fine_res = RET_BADBRANCH; + } + } + else if ((js->last_seqcode != NULL) && + edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) { + res = RET_LATE; + } + + switch (e->any.type) { case EDG_WLL_EVENT_TRANSFER: if (e->transfer.result == EDG_WLL_TRANSFER_OK) { @@ -564,6 +595,8 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict new_state = EDG_WLL_JOB_WAITING; break; case EDG_WLL_SOURCE_WORKLOAD_MANAGER: if (LRMS_STATE(old_state)) res = RET_LATE; + update_branch_state(e->any.seqcode, NULL, + NULL, e->enQueued.job, &js->branch_states); new_state = EDG_WLL_JOB_READY; break; case EDG_WLL_SOURCE_LOG_MONITOR: new_state = EDG_WLL_JOB_WAITING; break; @@ -609,7 +642,10 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict case EDG_WLL_SOURCE_NETWORK_SERVER: rep(js->pub.jdl, e->enQueued.job); break; case EDG_WLL_SOURCE_WORKLOAD_MANAGER: - rep(js->pub.matched_jdl, e->enQueued.job); break; + if (USABLE_BRANCH(res)) { + rep(js->pub.matched_jdl, e->enQueued.job); + break; + } case EDG_WLL_SOURCE_LOG_MONITOR: /* no interim JDL here */ break; @@ -672,17 +708,34 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict e->running.node); } if (USABLE_DATA(res, strict)) { - rep(js->pub.ce_node, e->running.node); - if (e->running.node) - add_stringlist(&js->pub.possible_ce_nodes, e->running.node); + if (USABLE_BRANCH(fine_res)) { + rep(js->pub.ce_node, e->running.node); + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + if (e->running.node) { + update_branch_state(e->any.seqcode, NULL, + e->running.node, NULL, &js->branch_states); + add_stringlist(&js->pub.possible_ce_nodes, + e->running.node); + } + } } break; case EDG_WLL_EVENT_REALLYRUNNING: if (USABLE_DATA(res, strict)) { - rep(js->wn_seqcode, e->reallyRunning.wn_seq); + js->pub.payload_running = 1; + if (e->any.source == EDG_WLL_SOURCE_LRMS) { + rep(js->branch_tag_seqcode, e->any.seqcode); + rep(js->last_branch_seqcode, e->any.seqcode); + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); + rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); + } + + load_branch_state(js); } break; - case EDG_WLL_EVENT_RESUBMISSION: if (USABLE(res, strict)) { if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { @@ -691,17 +744,22 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict } if (USABLE_DATA(res, strict)) { if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { - js->wontresub = 1; + js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB; } - } - if (USABLE_DATA(res, strict)) { + else if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB) { + js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB; free_stringlist(&js->pub.possible_destinations); free_stringlist(&js->pub.possible_ce_nodes); - if (js->wn_seqcode) { - free(js->wn_seqcode); - js->wn_seqcode = NULL; - } + free_branch_state(&js->branch_states); + js->pub.payload_running = 0; + rep(js->branch_tag_seqcode, NULL); + rep(js->deep_resubmit_seqcode, e->any.seqcode); + } + else + if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) { + js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW; + rep(js->deep_resubmit_seqcode, NULL); } } break; @@ -714,6 +772,9 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict if (USABLE(res, strict)) { js->pub.state = EDG_WLL_JOB_DONE; rep(js->pub.reason, e->done.reason); + if (fine_res == RET_GOODBRANCH) + js->pub.payload_running = 0; + switch (e->done.status_code) { case EDG_WLL_DONE_CANCELLED: js->pub.state = EDG_WLL_JOB_CANCELLED; @@ -745,7 +806,7 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict break; case EDG_WLL_EVENT_CANCEL: if (js->last_cancel_seqcode != NULL && - edg_wll_compare_seq_full(e->any.seqcode, js->last_cancel_seqcode, js->wn_seqcode) < 0) { + edg_wll_compare_seq_full(e->any.seqcode, js->last_cancel_seqcode, js->branch_tag_seqcode) < 0) { res = RET_LATE; } if (USABLE(res, strict)) { @@ -775,6 +836,7 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict js->pub.state = EDG_WLL_JOB_ABORTED; rep(js->pub.reason, e->abort.reason); rep(js->pub.location, "none"); + js->pub.payload_running = 0; } break; @@ -809,10 +871,14 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict e->match.host, e->match.src_instance); } - if (USABLE_DATA(res, strict)) { + if (USABLE_DATA(res, strict) && USABLE_BRANCH(fine_res)) { rep(js->pub.destination, e->match.dest_id); - if (e->match.dest_id) - add_stringlist(&js->pub.possible_destinations, e->match.dest_id); + if (e->match.dest_id) { + update_branch_state(e->any.seqcode, e->match.dest_id, + NULL, NULL, &js->branch_states); + add_stringlist(&js->pub.possible_destinations, + e->match.dest_id); + } } break; case EDG_WLL_EVENT_PENDING: @@ -892,6 +958,8 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict rep(js->last_cancel_seqcode, e->any.seqcode); } else { rep(js->last_seqcode, e->any.seqcode); + if (fine_res == RET_GOODBRANCH) + rep(js->last_branch_seqcode, e->any.seqcode); } return res; @@ -1076,6 +1144,104 @@ static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char } } +static void update_branch_state(char *b, char *d, char *c, char *j, sr_container **bs) +{ + int i = 0, branch; + + + if (!b) + return; + else + branch = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER); + + if (*bs != NULL) { + while ((*bs)[i].branch) { + if (branch == (*bs)[i].branch) { + if (d) rep((*bs)[i].destination, d); + if (c) rep((*bs)[i].ce_node, c); + if (j) rep((*bs)[i].jdl, j); + + return; + } + i++; + } + } + + *bs = (sr_container *) realloc(*bs, (i+2)*sizeof(sr_container)); + memset(&((*bs)[i]), 0, 2*sizeof(sr_container)); + + (*bs)[i].branch = branch; + rep((*bs)[i].destination, d); + rep((*bs)[i].ce_node, c); + rep((*bs)[i].jdl, j); +} + + +static void free_branch_state(sr_container **bs) +{ + int i = 0; + + if (*bs == NULL) return; + + while ((*bs)[i].branch) { + free((*bs)[i].destination); + free((*bs)[i].ce_node); + free((*bs)[i].jdl); + i++; + } + free(*bs); + *bs = NULL; +} + +static int compare_branch_states(const void *a, const void *b) +{ + sr_container *c = (sr_container *) a; + sr_container *d = (sr_container *) b; + + if (c->branch < d->branch) return -1; + if (c->branch == d->branch) return 0; + if (c->branch > d->branch) return 1; +} + +static void load_branch_state(intJobStat *js) +{ + int i, j, branch; + + + if ( (!js->branch_tag_seqcode) || (!js->branch_states) ) return; + + branch = component_seqcode(js->branch_tag_seqcode, EDG_WLL_SOURCE_WORKLOAD_MANAGER); + + // count elements + i = 0; + while (js->branch_states[i].branch) i++; + + // sort them + qsort(js->branch_states, (size_t) i, sizeof(sr_container), + compare_branch_states); + + // find row corresponding to ReallyRunning WM seq.code (aka branch) + i = 0; + while (js->branch_states[i].branch) { + if (js->branch_states[i].branch == branch) break; + i++; + } + + // copy this and two before branches data to final state + // (the newer the more important - so i-th element is copied as last) + // (and may overwrite data from previous elements) + for (j = i - 2; j <= i; j++) { + if (j >= 0) { + if (js->branch_states[j].destination) + rep(js->pub.destination, js->branch_states[j].destination); + if (js->branch_states[j].ce_node) + rep(js->pub.ce_node, js->branch_states[j].ce_node); + if (js->branch_states[j].jdl) + rep(js->pub.matched_jdl, js->branch_states[j].jdl); + } + } +} + /* XXX more thorough malloc, calloc, and asprintf failure handling */ /* XXX indexes in {short,long}_fields */ /* XXX strict mode */ diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index 8c5dbee..e9bbe2f 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -7,13 +7,23 @@ #define INTSTAT_VERSION "release-2.0" +typedef struct _sr_container { // shallow resubmission container + int branch; + char *destination; + char *ce_node; + char *jdl; +} sr_container; + typedef struct _intJobStat { edg_wll_JobStat pub; - int wontresub; + int resubmit_type; char *last_seqcode; char *last_cancel_seqcode; - char *wn_seqcode; + char *branch_tag_seqcode; + char *last_branch_seqcode; + char *deep_resubmit_seqcode; + sr_container *branch_states; // branch zero terminated array /* int expect_mask; */ } intJobStat; @@ -29,3 +39,6 @@ char *enc_intJobStat(char *, intJobStat* ); void write2rgma_status(edg_wll_JobStat *); +int before_deep_resubmission(const char *, const char *); +int same_branch(const char *, const char *); +int component_seqcode(const char *a, edg_wll_Source index); diff --git a/org.glite.lb.server/src/jobstat_supp.c b/org.glite.lb.server/src/jobstat_supp.c index f32f638..09093c8 100644 --- a/org.glite.lb.server/src/jobstat_supp.c +++ b/org.glite.lb.server/src/jobstat_supp.c @@ -175,6 +175,81 @@ static char **dec_strlist(char *in, char **rest) return out; } +static char *enc_sr_container(char *old, sr_container *item) +{ + char *ret; + + if (item == NULL) { + asprintf(&ret,"%s-1 ", old); + free(old); + return ret; + } else { + asprintf(&ret,"%s1 ",old); + free(old); + if (ret == NULL) return ret; + } + do { + ret = enc_int(ret, (*item).branch); + ret = enc_string(ret, (*item).destination); + ret = enc_string(ret, (*item).ce_node); + ret = enc_string(ret, (*item).jdl); + } while ((*(item++)).branch != 0); + return ret; +} + +static sr_container *dec_sr_container(char *in, char **rest) +{ + sr_container *out; + int len = -1, b = 0; + char *tmp_in, *tmp_ret; + int scret; + + scret = sscanf(in, "%d", &len); + if (scret < 1) { + *rest = NULL; + return NULL; + } + if (len == -1) { + *rest = strchr(in, ' ') ? strchr(in, ' ') + 1 : NULL; + return NULL; + } + + len = 0; + tmp_in = in = strchr(in, ' ') + 1 ; + do { + b = dec_int(tmp_in, &tmp_in); + tmp_ret = dec_string(tmp_in, &tmp_in); free(tmp_ret); + if (!tmp_in) { *rest = tmp_in; return NULL; } + tmp_ret = dec_string(tmp_in, &tmp_in); free(tmp_ret); + if (!tmp_in) { *rest = tmp_in; return NULL; } + tmp_ret = dec_string(tmp_in, &tmp_in); free(tmp_ret); + if (!tmp_in) { *rest = tmp_in; return NULL; } + len++; + } while (b != 0); + + out = (sr_container *) calloc(len+1, sizeof(sr_container)); + + if (out) { + len = 0; + tmp_in = in; + + do { + out[len].branch = dec_int(tmp_in, &tmp_in); + out[len].destination = dec_string(tmp_in, &tmp_in); + out[len].ce_node = dec_string(tmp_in, &tmp_in); + out[len].jdl = dec_string(tmp_in, &tmp_in); + } while (out[len++].branch != 0); + *rest = tmp_in; + } + else + *rest = 0; + + return out; + +} + + + static char* enc_taglist(char *old, edg_wll_TagValue *item) { char *ret; @@ -199,8 +274,8 @@ static edg_wll_TagValue *dec_taglist(char *in, char **rest) { edg_wll_TagValue *out; int len = -1; - char *tmp_in, *tmp_ret, *tmp_ret2; - int scret; + char *tmp_in, *tmp_ret; + int scret, end = 0; scret = sscanf(in, "%d", &len); if (scret < 1) { @@ -215,12 +290,15 @@ static edg_wll_TagValue *dec_taglist(char *in, char **rest) len = 0; tmp_in = in = strchr(in, ' ') + 1 ; do { - tmp_ret2 = dec_string(tmp_in, &tmp_in); + tmp_ret = dec_string(tmp_in, &tmp_in); + if (tmp_ret) free(tmp_ret); + else end = 1; if (!tmp_in) { *rest = tmp_in; return NULL; } tmp_ret = dec_string(tmp_in, &tmp_in); + free(tmp_ret); if (!tmp_in) { *rest = tmp_in; return NULL; } len++; - } while (tmp_ret2 != NULL); + } while (end == 0); out = (edg_wll_TagValue *) malloc(len*sizeof(edg_wll_TagValue)); @@ -354,6 +432,7 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat) if (ret) ret = enc_timeval(ret, stat->lastUpdateTime); if (ret) ret = enc_int(ret, stat->expectUpdate); if (ret) ret = enc_string(ret, stat->expectFrom); + if (ret) ret = enc_int(ret, stat->payload_running); if (ret) ret = enc_strlist(ret, stat->possible_destinations); if (ret) ret = enc_strlist(ret, stat->possible_ce_nodes); @@ -401,6 +480,7 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest) if (tmp_in != NULL) stat->lastUpdateTime = dec_timeval(tmp_in, &tmp_in); if (tmp_in != NULL) stat->expectUpdate = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) stat->expectFrom = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->payload_running = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) stat->possible_destinations = dec_strlist(tmp_in, &tmp_in); if (tmp_in != NULL) stat->possible_ce_nodes = dec_strlist(tmp_in, &tmp_in); @@ -415,10 +495,13 @@ char *enc_intJobStat(char *old, intJobStat* stat) char *ret; ret = enc_JobStat(old, &stat->pub); - if (ret) ret = enc_int(ret, stat->wontresub); + if (ret) ret = enc_int(ret, stat->resubmit_type); if (ret) ret = enc_string(ret, stat->last_seqcode); if (ret) ret = enc_string(ret, stat->last_cancel_seqcode); - if (ret) ret = enc_string(ret, stat->wn_seqcode); + if (ret) ret = enc_string(ret, stat->branch_tag_seqcode); + if (ret) ret = enc_string(ret, stat->last_branch_seqcode); + if (ret) ret = enc_string(ret, stat->deep_resubmit_seqcode); + if (ret) ret = enc_sr_container(ret, stat->branch_states); return ret; } @@ -435,7 +518,7 @@ intJobStat* dec_intJobStat(char *in, char **rest) if (stat != NULL) { stat->pub = *pubstat; free(pubstat); - stat->wontresub = dec_int(tmp_in, &tmp_in); + stat->resubmit_type = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) { stat->last_seqcode = dec_string(tmp_in, &tmp_in); } @@ -443,7 +526,16 @@ intJobStat* dec_intJobStat(char *in, char **rest) stat->last_cancel_seqcode = dec_string(tmp_in, &tmp_in); } if (tmp_in != NULL) { - stat->wn_seqcode = dec_string(tmp_in, &tmp_in); + stat->branch_tag_seqcode = dec_string(tmp_in, &tmp_in); + } + if (tmp_in != NULL) { + stat->last_branch_seqcode = dec_string(tmp_in, &tmp_in); + } + if (tmp_in != NULL) { + stat->deep_resubmit_seqcode = dec_string(tmp_in, &tmp_in); + } + if (tmp_in != NULL) { + stat->branch_states = dec_sr_container(tmp_in, &tmp_in); } } else if (tmp_in != NULL) { edg_wll_FreeStatus(pubstat); @@ -615,7 +707,7 @@ int edg_wll_compare_seq_full(const char *a, const char *b, const char *wn) unsigned int c[EDG_WLL_SOURCE__LAST]; unsigned int d[EDG_WLL_SOURCE__LAST]; unsigned int w[EDG_WLL_SOURCE__LAST]; - int res, i; + int res; assert(EDG_WLL_SOURCE__LAST == 9); @@ -667,6 +759,17 @@ int edg_wll_compare_seq_full(const char *a, const char *b, const char *wn) } + if (c[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) { + if (d[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) + return(edg_wll_compare_seq(a,b)); + else + return 1; + } + else { + return -1; + } + +/* if ( (c[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) && (d[EDG_WLL_SOURCE_WORKLOAD_MANAGER] != w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) ) return 1; @@ -677,8 +780,50 @@ int edg_wll_compare_seq_full(const char *a, const char *b, const char *wn) return -1; return(edg_wll_compare_seq(a,b)); +*/ } +int component_seqcode(const char *a, edg_wll_Source index) +{ + unsigned int c[EDG_WLL_SOURCE__LAST]; + int res; + + res = sscanf(a, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d", + &c[EDG_WLL_SOURCE_USER_INTERFACE], + &c[EDG_WLL_SOURCE_NETWORK_SERVER], + &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], + &c[EDG_WLL_SOURCE_BIG_HELPER], + &c[EDG_WLL_SOURCE_JOB_SUBMISSION], + &c[EDG_WLL_SOURCE_LOG_MONITOR], + &c[EDG_WLL_SOURCE_LRMS], + &c[EDG_WLL_SOURCE_APPLICATION]); + if (res != EDG_WLL_SOURCE__LAST-1) { + syslog(LOG_ERR, "unparsable sequence code %s\n", a); + fprintf(stderr, "unparsable sequence code %s\n", a); + return -1; + } + + return(c[index]); +} + +int before_deep_resubmission(const char *a, const char *b) +{ + if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) < + component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) ) + return(1); + else + return(0); + +} + +int same_branch(const char *a, const char *b) +{ + if (component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER) == + component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER) ) + return(1); + else + return(0); +} int edg_wll_compare_seq(const char *a, const char *b) {