From f1428a2f752fecd7519c0ecd6f8126b15ef19947 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Fri, 10 Jun 2005 08:41:35 +0000 Subject: [PATCH] - data version increased - relax on resubmit from LogMonitor - clean up --- org.glite.lb.server/src/get_events.h | 1 - org.glite.lb.server/src/jobstat.c | 35 ++++++----- org.glite.lb.server/src/jobstat.h | 11 ++-- org.glite.lb.server/src/jobstat_supp.c | 105 +++------------------------------ 4 files changed, 36 insertions(+), 116 deletions(-) diff --git a/org.glite.lb.server/src/get_events.h b/org.glite.lb.server/src/get_events.h index 1f64629..cfa07c1 100644 --- a/org.glite.lb.server/src/get_events.h +++ b/org.glite.lb.server/src/get_events.h @@ -25,7 +25,6 @@ int edg_wll_QueryJobsServer(edg_wll_Context, const edg_wll_QueryRec **, int, edg void edg_wll_SortEvents(edg_wll_Event *); int edg_wll_compare_seq(const char *, const char *); -int edg_wll_compare_seq_full(const char *, const char *, const char *); #ifdef __cplusplus } diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index dd54b19..48071d3 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -53,8 +53,8 @@ static char* location_string(const char*, const char*, const char*); static int add_stringlist(char ***, const char *) UNUSED_VAR; static void free_stringlist(char ***); static int add_taglist(edg_wll_TagValue **, const char *, const char *); -static void update_branch_state(char *, char *, char *, char *, sr_container **); -static void free_branch_state(sr_container **); +static void update_branch_state(char *, char *, char *, char *, branch_state **); +static void free_branch_state(branch_state **); static void load_branch_state(intJobStat *); @@ -444,8 +444,7 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict res = RET_LATE; fine_res = RET_TOOOLD; } - - if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived + else if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) { if ((js->last_seqcode != NULL) && edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) { @@ -753,7 +752,8 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB; } else - if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB) { + if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB && + e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) { js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB; free_stringlist(&js->pub.possible_destinations); free_stringlist(&js->pub.possible_ce_nodes); @@ -811,10 +811,15 @@ static int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict } break; case EDG_WLL_EVENT_CANCEL: - if (js->last_cancel_seqcode != NULL && - edg_wll_compare_seq_full(e->any.seqcode, js->last_cancel_seqcode, js->branch_tag_seqcode) < 0) { - res = RET_LATE; + if (fine_res != RET_BADBRANCH) { + if (js->last_cancel_seqcode != NULL && + edg_wll_compare_seq(e->any.seqcode, js->last_cancel_seqcode) < 0) { + res = RET_LATE; + } } + else { + res = RET_LATE; + } if (USABLE(res, strict)) { switch (e->cancel.status_code) { case EDG_WLL_CANCEL_REQ: @@ -1154,7 +1159,7 @@ static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char } } -static void update_branch_state(char *b, char *d, char *c, char *j, sr_container **bs) +static void update_branch_state(char *b, char *d, char *c, char *j, branch_state **bs) { int i = 0, branch; @@ -1177,8 +1182,8 @@ static void update_branch_state(char *b, char *d, char *c, char *j, sr_container } } - *bs = (sr_container *) realloc(*bs, (i+2)*sizeof(sr_container)); - memset(&((*bs)[i]), 0, 2*sizeof(sr_container)); + *bs = (branch_state *) realloc(*bs, (i+2)*sizeof(branch_state)); + memset(&((*bs)[i]), 0, 2*sizeof(branch_state)); (*bs)[i].branch = branch; rep((*bs)[i].destination, d); @@ -1187,7 +1192,7 @@ static void update_branch_state(char *b, char *d, char *c, char *j, sr_container } -static void free_branch_state(sr_container **bs) +static void free_branch_state(branch_state **bs) { int i = 0; @@ -1205,8 +1210,8 @@ static void free_branch_state(sr_container **bs) static int compare_branch_states(const void *a, const void *b) { - sr_container *c = (sr_container *) a; - sr_container *d = (sr_container *) b; + branch_state *c = (branch_state *) a; + branch_state *d = (branch_state *) b; if (c->branch < d->branch) return -1; if (c->branch == d->branch) return 0; @@ -1227,7 +1232,7 @@ static void load_branch_state(intJobStat *js) while (js->branch_states[i].branch) i++; // sort them - qsort(js->branch_states, (size_t) i, sizeof(sr_container), + qsort(js->branch_states, (size_t) i, sizeof(branch_state), compare_branch_states); // find row corresponding to ReallyRunning WM seq.code (aka branch) diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index e9bbe2f..bb6fff5e 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -5,14 +5,17 @@ * (includes edg_wll_JobStat API structure) */ -#define INTSTAT_VERSION "release-2.0" +#define INTSTAT_VERSION "release-3.0_shallow" -typedef struct _sr_container { // shallow resubmission container +// shallow resubmission container - holds state of each branch +// (useful when state restore is needed after ReallyRunning event) +// +typedef struct _branch_state { int branch; char *destination; char *ce_node; char *jdl; -} sr_container; +} branch_state; typedef struct _intJobStat { @@ -23,7 +26,7 @@ typedef struct _intJobStat { char *branch_tag_seqcode; char *last_branch_seqcode; char *deep_resubmit_seqcode; - sr_container *branch_states; // branch zero terminated array + branch_state *branch_states; // branch zero terminated array /* int expect_mask; */ } intJobStat; diff --git a/org.glite.lb.server/src/jobstat_supp.c b/org.glite.lb.server/src/jobstat_supp.c index 09093c8..e3bbe79 100644 --- a/org.glite.lb.server/src/jobstat_supp.c +++ b/org.glite.lb.server/src/jobstat_supp.c @@ -175,7 +175,7 @@ static char **dec_strlist(char *in, char **rest) return out; } -static char *enc_sr_container(char *old, sr_container *item) +static char *enc_branch_states(char *old, branch_state *item) { char *ret; @@ -197,9 +197,9 @@ static char *enc_sr_container(char *old, sr_container *item) return ret; } -static sr_container *dec_sr_container(char *in, char **rest) +static branch_state *dec_branch_states(char *in, char **rest) { - sr_container *out; + branch_state *out; int len = -1, b = 0; char *tmp_in, *tmp_ret; int scret; @@ -227,7 +227,7 @@ static sr_container *dec_sr_container(char *in, char **rest) len++; } while (b != 0); - out = (sr_container *) calloc(len+1, sizeof(sr_container)); + out = (branch_state *) calloc(len+1, sizeof(branch_state)); if (out) { len = 0; @@ -248,8 +248,6 @@ static sr_container *dec_sr_container(char *in, char **rest) } - - static char* enc_taglist(char *old, edg_wll_TagValue *item) { char *ret; @@ -291,14 +289,14 @@ static edg_wll_TagValue *dec_taglist(char *in, char **rest) tmp_in = in = strchr(in, ' ') + 1 ; do { tmp_ret = dec_string(tmp_in, &tmp_in); - if (tmp_ret) free(tmp_ret); + if (tmp_ret) free(tmp_ret); else end = 1; if (!tmp_in) { *rest = tmp_in; return NULL; } - tmp_ret = dec_string(tmp_in, &tmp_in); + tmp_ret = dec_string(tmp_in, &tmp_in); free(tmp_ret); if (!tmp_in) { *rest = tmp_in; return NULL; } len++; - } while (end == 0); + } while (!end); out = (edg_wll_TagValue *) malloc(len*sizeof(edg_wll_TagValue)); @@ -483,8 +481,6 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest) if (tmp_in != NULL) stat->payload_running = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) stat->possible_destinations = dec_strlist(tmp_in, &tmp_in); if (tmp_in != NULL) stat->possible_ce_nodes = dec_strlist(tmp_in, &tmp_in); - - *rest = tmp_in; return stat; @@ -501,7 +497,7 @@ char *enc_intJobStat(char *old, intJobStat* stat) if (ret) ret = enc_string(ret, stat->branch_tag_seqcode); if (ret) ret = enc_string(ret, stat->last_branch_seqcode); if (ret) ret = enc_string(ret, stat->deep_resubmit_seqcode); - if (ret) ret = enc_sr_container(ret, stat->branch_states); + if (ret) ret = enc_branch_states(ret, stat->branch_states); return ret; } @@ -535,7 +531,7 @@ intJobStat* dec_intJobStat(char *in, char **rest) stat->deep_resubmit_seqcode = dec_string(tmp_in, &tmp_in); } if (tmp_in != NULL) { - stat->branch_states = dec_sr_container(tmp_in, &tmp_in); + stat->branch_states = dec_branch_states(tmp_in, &tmp_in); } } else if (tmp_in != NULL) { edg_wll_FreeStatus(pubstat); @@ -700,89 +696,6 @@ edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context ctx, void *job_index_c return edg_wll_Error(ctx, NULL, NULL); } - -/* compare events taking into account shallow resubmission */ -int edg_wll_compare_seq_full(const char *a, const char *b, const char *wn) -{ - unsigned int c[EDG_WLL_SOURCE__LAST]; - unsigned int d[EDG_WLL_SOURCE__LAST]; - unsigned int w[EDG_WLL_SOURCE__LAST]; - int res; - - assert(EDG_WLL_SOURCE__LAST == 9); - - if (wn == NULL) return(edg_wll_compare_seq(a,b)); - - res = sscanf(a, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d", - &c[EDG_WLL_SOURCE_USER_INTERFACE], - &c[EDG_WLL_SOURCE_NETWORK_SERVER], - &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER], - &c[EDG_WLL_SOURCE_BIG_HELPER], - &c[EDG_WLL_SOURCE_JOB_SUBMISSION], - &c[EDG_WLL_SOURCE_LOG_MONITOR], - &c[EDG_WLL_SOURCE_LRMS], - &c[EDG_WLL_SOURCE_APPLICATION]); - if (res != EDG_WLL_SOURCE__LAST-1) { - syslog(LOG_ERR, "unparsable sequence code %s\n", a); - fprintf(stderr, "unparsable sequence code %s\n", a); - return -1; - } - - res = sscanf(b, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d", - &d[EDG_WLL_SOURCE_USER_INTERFACE], - &d[EDG_WLL_SOURCE_NETWORK_SERVER], - &d[EDG_WLL_SOURCE_WORKLOAD_MANAGER], - &d[EDG_WLL_SOURCE_BIG_HELPER], - &d[EDG_WLL_SOURCE_JOB_SUBMISSION], - &d[EDG_WLL_SOURCE_LOG_MONITOR], - &d[EDG_WLL_SOURCE_LRMS], - &d[EDG_WLL_SOURCE_APPLICATION]); - if (res != EDG_WLL_SOURCE__LAST-1) { - syslog(LOG_ERR, "unparsable sequence code %s\n", b); - fprintf(stderr, "unparsable sequence code %s\n", b); - return 1; - } - - res = sscanf(wn, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d", - &w[EDG_WLL_SOURCE_USER_INTERFACE], - &w[EDG_WLL_SOURCE_NETWORK_SERVER], - &w[EDG_WLL_SOURCE_WORKLOAD_MANAGER], - &w[EDG_WLL_SOURCE_BIG_HELPER], - &w[EDG_WLL_SOURCE_JOB_SUBMISSION], - &w[EDG_WLL_SOURCE_LOG_MONITOR], - &w[EDG_WLL_SOURCE_LRMS], - &w[EDG_WLL_SOURCE_APPLICATION]); - if (res != EDG_WLL_SOURCE__LAST-1) { - syslog(LOG_ERR, "unparsable sequence code %s\n", wn); - fprintf(stderr, "unparsable sequence code %s\n", wn); - return 1; - } - - - if (c[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) { - if (d[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) - return(edg_wll_compare_seq(a,b)); - else - return 1; - } - else { - return -1; - } - -/* - if ( (c[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) && - (d[EDG_WLL_SOURCE_WORKLOAD_MANAGER] != w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) ) - return 1; - - - if ( (c[EDG_WLL_SOURCE_WORKLOAD_MANAGER] != w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) && - (d[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) ) - return -1; - - return(edg_wll_compare_seq(a,b)); -*/ -} - int component_seqcode(const char *a, edg_wll_Source index) { unsigned int c[EDG_WLL_SOURCE__LAST]; -- 1.8.2.3