#define RET_BADSEQ 4
#define RET_SUSPECT 5
#define RET_IGNORE 6
+#define RET_BADBRANCH 7 /* fine-grained result: event is not on the branch tagged by branch_tag_seqcode */
+#define RET_GOODBRANCH 8 /* fine-grained result: event is on the branch tagged by branch_tag_seqcode */
+#define RET_TOOOLD 9 /* fine-grained result: event precedes the recorded deep resubmission */
#define RET_INTERNAL 100
#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } /* frees a, then stores a copy of b (or NULL); expands to a braced block and evaluates a and b twice */
static int add_stringlist(char ***, const char *) UNUSED_VAR;
static void free_stringlist(char ***);
static int add_taglist(edg_wll_TagValue **, const char *, const char *);
+static void update_branch_state(char *, char *, char *, char *, sr_container **); /* args: seqcode, destination, ce_node, jdl, branch-state array */
+static void free_branch_state(sr_container **); /* frees the array and its strings, then sets the pointer to NULL */
+static void load_branch_state(intJobStat *); /* copies stored per-branch data into the public job status */
int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int);
{
free(p->last_seqcode); p->last_seqcode = NULL;
free(p->last_cancel_seqcode); p->last_cancel_seqcode = NULL;
- p->wontresub = 0;
+ p->resubmit_type = 0;
}
void destroy_intJobStat(intJobStat *p)
#define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict))
#define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict))
+#define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH)
#define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE)
edg_wll_JobStatCode old_state = js->pub.state;
edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN;
- int res = RET_OK;
+ int res = RET_OK,
+ fine_res = RET_OK;
+
+
if (old_state == EDG_WLL_JOB_ABORTED ||
old_state == EDG_WLL_JOB_CANCELLED ||
res = RET_LATE;
}
- if (js->last_seqcode != NULL &&
- edg_wll_compare_seq_full(e->any.seqcode, js->last_seqcode, js->wn_seqcode) < 0) {
+
+ if (js->deep_resubmit_seqcode &&
+ before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) {
res = RET_LATE;
+ fine_res = RET_TOOOLD;
}
+ if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived
+ if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) {
+ if ((js->last_seqcode != NULL) &&
+ edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) {
+ res = RET_LATE;
+ }
+ fine_res = RET_GOODBRANCH;
+ }
+ else {
+ res = RET_LATE;
+ fine_res = RET_BADBRANCH;
+ }
+ }
+ else if ((js->last_seqcode != NULL) &&
+ edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) {
+ res = RET_LATE;
+ }
+
+
switch (e->any.type) {
case EDG_WLL_EVENT_TRANSFER:
if (e->transfer.result == EDG_WLL_TRANSFER_OK) {
new_state = EDG_WLL_JOB_WAITING; break;
case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
if (LRMS_STATE(old_state)) res = RET_LATE;
+ update_branch_state(e->any.seqcode, NULL,
+ NULL, e->enQueued.job, &js->branch_states);
new_state = EDG_WLL_JOB_READY; break;
case EDG_WLL_SOURCE_LOG_MONITOR:
new_state = EDG_WLL_JOB_WAITING; break;
case EDG_WLL_SOURCE_NETWORK_SERVER:
rep(js->pub.jdl, e->enQueued.job); break;
case EDG_WLL_SOURCE_WORKLOAD_MANAGER:
- rep(js->pub.matched_jdl, e->enQueued.job); break;
+ if (USABLE_BRANCH(res)) {
+ rep(js->pub.matched_jdl, e->enQueued.job);
+ break;
+ }
case EDG_WLL_SOURCE_LOG_MONITOR:
/* no interim JDL here */
break;
e->running.node);
}
if (USABLE_DATA(res, strict)) {
- rep(js->pub.ce_node, e->running.node);
- if (e->running.node)
- add_stringlist(&js->pub.possible_ce_nodes, e->running.node);
+ if (USABLE_BRANCH(fine_res)) {
+ rep(js->pub.ce_node, e->running.node);
+ }
+ if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
+ if (e->running.node) {
+ update_branch_state(e->any.seqcode, NULL,
+ e->running.node, NULL, &js->branch_states);
+ add_stringlist(&js->pub.possible_ce_nodes,
+ e->running.node);
+ }
+ }
}
break;
case EDG_WLL_EVENT_REALLYRUNNING:
if (USABLE_DATA(res, strict)) {
- rep(js->wn_seqcode, e->reallyRunning.wn_seq);
+ js->pub.payload_running = 1;
+ if (e->any.source == EDG_WLL_SOURCE_LRMS) {
+ rep(js->branch_tag_seqcode, e->any.seqcode);
+ rep(js->last_branch_seqcode, e->any.seqcode);
+ }
+ if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) {
+ rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq);
+ rep(js->last_branch_seqcode, e->reallyRunning.wn_seq);
+ }
+
+ load_branch_state(js);
}
break;
-
case EDG_WLL_EVENT_RESUBMISSION:
if (USABLE(res, strict)) {
if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) {
}
if (USABLE_DATA(res, strict)) {
if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) {
- js->wontresub = 1;
+ js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB;
}
- }
- if (USABLE_DATA(res, strict)) {
+ else
if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB) {
+ js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB;
free_stringlist(&js->pub.possible_destinations);
free_stringlist(&js->pub.possible_ce_nodes);
- if (js->wn_seqcode) {
- free(js->wn_seqcode);
- js->wn_seqcode = NULL;
- }
+ free_branch_state(&js->branch_states);
+ js->pub.payload_running = 0;
+ rep(js->branch_tag_seqcode, NULL);
+ rep(js->deep_resubmit_seqcode, e->any.seqcode);
+ }
+ else
+ if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) {
+ js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW;
+ rep(js->deep_resubmit_seqcode, NULL);
}
}
break;
if (USABLE(res, strict)) {
js->pub.state = EDG_WLL_JOB_DONE;
rep(js->pub.reason, e->done.reason);
+ if (fine_res == RET_GOODBRANCH)
+ js->pub.payload_running = 0;
+
switch (e->done.status_code) {
case EDG_WLL_DONE_CANCELLED:
js->pub.state = EDG_WLL_JOB_CANCELLED;
break;
case EDG_WLL_EVENT_CANCEL:
if (js->last_cancel_seqcode != NULL &&
- edg_wll_compare_seq_full(e->any.seqcode, js->last_cancel_seqcode, js->wn_seqcode) < 0) {
+ edg_wll_compare_seq_full(e->any.seqcode, js->last_cancel_seqcode, js->branch_tag_seqcode) < 0) {
res = RET_LATE;
}
if (USABLE(res, strict)) {
js->pub.state = EDG_WLL_JOB_ABORTED;
rep(js->pub.reason, e->abort.reason);
rep(js->pub.location, "none");
+ js->pub.payload_running = 0;
}
break;
e->match.host,
e->match.src_instance);
}
- if (USABLE_DATA(res, strict)) {
+ if (USABLE_DATA(res, strict) && USABLE_BRANCH(fine_res)) {
rep(js->pub.destination, e->match.dest_id);
- if (e->match.dest_id)
- add_stringlist(&js->pub.possible_destinations, e->match.dest_id);
+ if (e->match.dest_id) {
+ update_branch_state(e->any.seqcode, e->match.dest_id,
+ NULL, NULL, &js->branch_states);
+ add_stringlist(&js->pub.possible_destinations,
+ e->match.dest_id);
+ }
}
break;
case EDG_WLL_EVENT_PENDING:
rep(js->last_cancel_seqcode, e->any.seqcode);
} else {
rep(js->last_seqcode, e->any.seqcode);
+ if (fine_res == RET_GOODBRANCH)
+ rep(js->last_branch_seqcode, e->any.seqcode);
}
return res;
}
}
+/*
+ * Remember per-branch data for a job.
+ *
+ * The branch number is the WORKLOAD_MANAGER component of sequence code `b`.
+ * `*bs` is an array of rows terminated by a row whose `branch` is 0; it may
+ * be NULL when nothing has been stored yet.
+ *
+ *   b  - sequence code of the event; nothing happens when it is NULL
+ *   d  - destination to store, or NULL to leave it alone
+ *   c  - CE node to store, or NULL to leave it alone
+ *   j  - JDL to store, or NULL to leave it alone
+ *   bs - address of the array; updated when the array has to grow
+ *
+ * If a row for the branch exists, only the non-NULL values replace the
+ * stored ones.  Otherwise a new row is appended.  When the array cannot be
+ * grown the call is a no-op and `*bs` stays valid and unchanged.
+ */
+static void update_branch_state(char *b, char *d, char *c, char *j, sr_container **bs)
+{
+    int i = 0, branch;
+    sr_container *grown;
+
+    if (!b)
+        return;
+
+    branch = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+
+    /* Look for an existing row; after the loop `i` indexes the terminator. */
+    if (*bs != NULL) {
+        for (; (*bs)[i].branch; i++) {
+            if ((*bs)[i].branch != branch)
+                continue;
+            if (d) { rep((*bs)[i].destination, d); }
+            if (c) { rep((*bs)[i].ce_node, c); }
+            if (j) { rep((*bs)[i].jdl, j); }
+            return;
+        }
+    }
+
+    /*
+     * Room for the new row plus a fresh terminator.  Go through a temporary
+     * so that a failed realloc() neither leaks the old array nor leaves us
+     * writing through a NULL pointer.
+     */
+    grown = realloc(*bs, (i + 2) * sizeof *grown);
+    if (grown == NULL)
+        return;
+    *bs = grown;
+
+    /* Zero both rows: rep() frees the old field value before assigning. */
+    memset(&grown[i], 0, 2 * sizeof *grown);
+
+    grown[i].branch = branch;
+    rep(grown[i].destination, d);
+    rep(grown[i].ce_node, c);
+    rep(grown[i].jdl, j);
+}
+
+
+/*
+ * Dispose of a branch-state array.
+ *
+ * Frees the three strings of every row up to (not including) the row whose
+ * `branch` is 0, then the array itself, and resets `*bs` to NULL.
+ * A NULL `*bs` is accepted and left as is.
+ */
+static void free_branch_state(sr_container **bs)
+{
+    sr_container *row;
+
+    if (*bs == NULL) return;
+
+    for (row = *bs; row->branch; row++) {
+        free(row->destination);
+        free(row->ce_node);
+        free(row->jdl);
+    }
+
+    free(*bs);
+    *bs = NULL;
+}
+
+/*
+ * qsort() comparator ordering sr_container rows by ascending `branch`.
+ *
+ * Returns -1, 0 or 1 when the first row's branch is lower than, equal to or
+ * higher than the second one's.  Written as a single expression so every
+ * path returns a value and the const qualification of the arguments is kept.
+ */
+static int compare_branch_states(const void *a, const void *b)
+{
+    const sr_container *lhs = a;
+    const sr_container *rhs = b;
+
+    return (lhs->branch > rhs->branch) - (lhs->branch < rhs->branch);
+}
+
+/*
+ * Copy stored per-branch data into the public part of the job state.
+ *
+ * The wanted branch is the WORKLOAD_MANAGER component of
+ * js->branch_tag_seqcode.  The stored rows are sorted by branch number and
+ * the row of the wanted branch is located; if there is none, the position
+ * ends up on the terminating row.  The rows at position-2 .. position are
+ * then applied in that order, so later rows override earlier ones; NULL
+ * fields never overwrite anything.
+ *
+ * Does nothing when either the tag sequence code or the row array is missing.
+ */
+static void load_branch_state(intJobStat *js)
+{
+    sr_container *rows;
+    int count, pos, k, wanted;
+
+    if (js->branch_tag_seqcode == NULL || js->branch_states == NULL) return;
+
+    wanted = component_seqcode(js->branch_tag_seqcode, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+    rows = js->branch_states;
+
+    /* number of rows in front of the terminator */
+    count = 0;
+    while (rows[count].branch) count++;
+
+    qsort(rows, (size_t) count, sizeof(sr_container),
+          compare_branch_states);
+
+    /* position of the wanted branch, or of the terminator if absent */
+    for (pos = 0; rows[pos].branch; pos++) {
+        if (rows[pos].branch == wanted) break;
+    }
+
+    /* apply up to three rows, oldest first, clamped at the array start */
+    for (k = (pos < 2) ? 0 : pos - 2; k <= pos; k++) {
+        sr_container *row = &rows[k];
+
+        if (row->destination) {
+            rep(js->pub.destination, row->destination);
+        }
+        if (row->ce_node) {
+            rep(js->pub.ce_node, row->ce_node);
+        }
+        if (row->jdl) {
+            rep(js->pub.matched_jdl, row->jdl);
+        }
+    }
+}
+
/* XXX more thorough malloc, calloc, and asprintf failure handling */
/* XXX indexes in {short,long}_fields */
/* XXX strict mode */
return out;
}
+/*
+ * Append the encoding of a branch-state array to `old`.
+ *
+ * A NULL `item` is written as "-1 ".  Otherwise "1 " is written, followed by
+ * every row (branch, destination, ce_node, jdl) up to and including the
+ * terminating row whose `branch` is 0.
+ *
+ * `old` is always freed.  Returns the newly allocated string, or NULL as
+ * soon as any step fails; like the other encoders' call sites in this file,
+ * no further enc_*() call is made once the result is NULL.
+ */
+static char *enc_sr_container(char *old, sr_container *item)
+{
+    char *ret = NULL;
+    const char *marker = (item == NULL) ? "-1 " : "1 ";
+
+    /* asprintf() leaves the pointer unspecified on failure: normalise it. */
+    if (asprintf(&ret, "%s%s", old, marker) < 0)
+        ret = NULL;
+    free(old);
+
+    if (item == NULL || ret == NULL)
+        return ret;
+
+    for (;;) {
+        ret = enc_int(ret, item->branch);
+        if (ret) ret = enc_string(ret, item->destination);
+        if (ret) ret = enc_string(ret, item->ce_node);
+        if (ret) ret = enc_string(ret, item->jdl);
+
+        /* stop on failure, or once the terminating row has been written */
+        if (ret == NULL || item->branch == 0)
+            break;
+        item++;
+    }
+    return ret;
+}
+
+/*
+ * Decode a branch-state array written by enc_sr_container().
+ *
+ *   in   - text to decode, starting at the leading marker number
+ *   rest - receives the position just behind the decoded data, or NULL when
+ *          the input is malformed or memory cannot be allocated
+ *
+ * Returns a newly allocated array ending with a row whose `branch` is 0, or
+ * NULL when the marker is -1 (no array was stored; `*rest` is then still
+ * advanced past the marker) or on any error.
+ *
+ * The input is walked twice: once to validate it and count the rows, then
+ * again to fill the allocated array.
+ */
+static sr_container *dec_sr_container(char *in, char **rest)
+{
+    sr_container *out;
+    int len = -1, b = 0, k;
+    char *tmp_in, *tmp_ret, *sep;
+
+    if (sscanf(in, "%d", &len) < 1) {
+        *rest = NULL;
+        return NULL;
+    }
+
+    sep = strchr(in, ' ');
+    if (len == -1) {
+        *rest = sep ? sep + 1 : NULL;
+        return NULL;
+    }
+    /* Without a separator there is no row data to step onto. */
+    if (sep == NULL) {
+        *rest = NULL;
+        return NULL;
+    }
+
+    /* pass 1: validate and count rows, including the terminating one */
+    len = 0;
+    tmp_in = in = sep + 1;
+    do {
+        b = dec_int(tmp_in, &tmp_in);
+        /* never hand a NULL position to the next decoder */
+        if (!tmp_in) { *rest = NULL; return NULL; }
+        for (k = 0; k < 3; k++) {
+            tmp_ret = dec_string(tmp_in, &tmp_in);
+            free(tmp_ret);
+            if (!tmp_in) { *rest = NULL; return NULL; }
+        }
+        len++;
+    } while (b != 0);
+
+    out = calloc(len + 1, sizeof *out);
+    if (out == NULL) {
+        *rest = NULL;
+        return NULL;
+    }
+
+    /* pass 2: same walk over already validated input, keeping the values */
+    len = 0;
+    tmp_in = in;
+    do {
+        out[len].branch = dec_int(tmp_in, &tmp_in);
+        out[len].destination = dec_string(tmp_in, &tmp_in);
+        out[len].ce_node = dec_string(tmp_in, &tmp_in);
+        out[len].jdl = dec_string(tmp_in, &tmp_in);
+    } while (out[len++].branch != 0);
+    *rest = tmp_in;
+
+    return out;
+}
+
+
+
static char* enc_taglist(char *old, edg_wll_TagValue *item)
{
char *ret;
{
edg_wll_TagValue *out;
int len = -1;
- char *tmp_in, *tmp_ret, *tmp_ret2;
- int scret;
+ char *tmp_in, *tmp_ret;
+ int scret, end = 0;
scret = sscanf(in, "%d", &len);
if (scret < 1) {
len = 0;
tmp_in = in = strchr(in, ' ') + 1 ;
do {
- tmp_ret2 = dec_string(tmp_in, &tmp_in);
+ tmp_ret = dec_string(tmp_in, &tmp_in);
+ if (tmp_ret) free(tmp_ret);
+ else end = 1;
if (!tmp_in) { *rest = tmp_in; return NULL; }
tmp_ret = dec_string(tmp_in, &tmp_in);
+ free(tmp_ret);
if (!tmp_in) { *rest = tmp_in; return NULL; }
len++;
- } while (tmp_ret2 != NULL);
+ } while (end == 0);
out = (edg_wll_TagValue *) malloc(len*sizeof(edg_wll_TagValue));
if (ret) ret = enc_timeval(ret, stat->lastUpdateTime);
if (ret) ret = enc_int(ret, stat->expectUpdate);
if (ret) ret = enc_string(ret, stat->expectFrom);
+ if (ret) ret = enc_int(ret, stat->payload_running);
if (ret) ret = enc_strlist(ret, stat->possible_destinations);
if (ret) ret = enc_strlist(ret, stat->possible_ce_nodes);
if (tmp_in != NULL) stat->lastUpdateTime = dec_timeval(tmp_in, &tmp_in);
if (tmp_in != NULL) stat->expectUpdate = dec_int(tmp_in, &tmp_in);
if (tmp_in != NULL) stat->expectFrom = dec_string(tmp_in, &tmp_in);
+ if (tmp_in != NULL) stat->payload_running = dec_int(tmp_in, &tmp_in);
if (tmp_in != NULL) stat->possible_destinations = dec_strlist(tmp_in, &tmp_in);
if (tmp_in != NULL) stat->possible_ce_nodes = dec_strlist(tmp_in, &tmp_in);
char *ret;
ret = enc_JobStat(old, &stat->pub);
- if (ret) ret = enc_int(ret, stat->wontresub);
+ if (ret) ret = enc_int(ret, stat->resubmit_type);
if (ret) ret = enc_string(ret, stat->last_seqcode);
if (ret) ret = enc_string(ret, stat->last_cancel_seqcode);
- if (ret) ret = enc_string(ret, stat->wn_seqcode);
+ if (ret) ret = enc_string(ret, stat->branch_tag_seqcode);
+ if (ret) ret = enc_string(ret, stat->last_branch_seqcode);
+ if (ret) ret = enc_string(ret, stat->deep_resubmit_seqcode);
+ if (ret) ret = enc_sr_container(ret, stat->branch_states);
return ret;
}
if (stat != NULL) {
stat->pub = *pubstat;
free(pubstat);
- stat->wontresub = dec_int(tmp_in, &tmp_in);
+ stat->resubmit_type = dec_int(tmp_in, &tmp_in);
if (tmp_in != NULL) {
stat->last_seqcode = dec_string(tmp_in, &tmp_in);
}
stat->last_cancel_seqcode = dec_string(tmp_in, &tmp_in);
}
if (tmp_in != NULL) {
- stat->wn_seqcode = dec_string(tmp_in, &tmp_in);
+ stat->branch_tag_seqcode = dec_string(tmp_in, &tmp_in);
+ }
+ if (tmp_in != NULL) {
+ stat->last_branch_seqcode = dec_string(tmp_in, &tmp_in);
+ }
+ if (tmp_in != NULL) {
+ stat->deep_resubmit_seqcode = dec_string(tmp_in, &tmp_in);
+ }
+ if (tmp_in != NULL) {
+ stat->branch_states = dec_sr_container(tmp_in, &tmp_in);
}
} else if (tmp_in != NULL) {
edg_wll_FreeStatus(pubstat);
unsigned int c[EDG_WLL_SOURCE__LAST];
unsigned int d[EDG_WLL_SOURCE__LAST];
unsigned int w[EDG_WLL_SOURCE__LAST];
- int res, i;
+ int res;
assert(EDG_WLL_SOURCE__LAST == 9);
}
+ if (c[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) {
+ if (d[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER])
+ return(edg_wll_compare_seq(a,b));
+ else
+ return 1;
+ }
+ else {
+ return -1;
+ }
+
+/*
if ( (c[EDG_WLL_SOURCE_WORKLOAD_MANAGER] == w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) &&
(d[EDG_WLL_SOURCE_WORKLOAD_MANAGER] != w[EDG_WLL_SOURCE_WORKLOAD_MANAGER]) )
return 1;
return -1;
return(edg_wll_compare_seq(a,b));
+*/
}
+/*
+ * Extract one component of a sequence code.
+ *
+ *   a     - sequence code of the form
+ *           "UI=n:NS=n:WM=n:BH=n:JSS=n:LM=n:LRMS=n:APP=n"
+ *   index - source whose counter is wanted
+ *
+ * Returns the counter of the requested source.  Returns -1 when `a` is NULL,
+ * when `index` is outside the component table, or when `a` cannot be parsed
+ * (the latter is also reported to syslog and stderr).  A source inside the
+ * table that has no field in the sequence code yields 0.
+ */
+int component_seqcode(const char *a, edg_wll_Source index)
+{
+    /* int matches the %d conversions; zeroed so unparsed slots are defined */
+    int c[EDG_WLL_SOURCE__LAST] = { 0 };
+    int res;
+
+    if (a == NULL || (int) index < 0 || (int) index >= EDG_WLL_SOURCE__LAST)
+        return -1;
+
+    res = sscanf(a, "UI=%d:NS=%d:WM=%d:BH=%d:JSS=%d:LM=%d:LRMS=%d:APP=%d",
+            &c[EDG_WLL_SOURCE_USER_INTERFACE],
+            &c[EDG_WLL_SOURCE_NETWORK_SERVER],
+            &c[EDG_WLL_SOURCE_WORKLOAD_MANAGER],
+            &c[EDG_WLL_SOURCE_BIG_HELPER],
+            &c[EDG_WLL_SOURCE_JOB_SUBMISSION],
+            &c[EDG_WLL_SOURCE_LOG_MONITOR],
+            &c[EDG_WLL_SOURCE_LRMS],
+            &c[EDG_WLL_SOURCE_APPLICATION]);
+    if (res != EDG_WLL_SOURCE__LAST-1) {
+        syslog(LOG_ERR, "unparsable sequence code %s\n", a);
+        fprintf(stderr, "unparsable sequence code %s\n", a);
+        return -1;
+    }
+
+    return c[index];
+}
+
+/*
+ * Tell whether sequence code `a` has a lower WORKLOAD_MANAGER component
+ * than sequence code `b`.  Returns 1 if so, 0 otherwise.
+ */
+int before_deep_resubmission(const char *a, const char *b)
+{
+    return component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER)
+         < component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+}
+
+/*
+ * Tell whether two sequence codes carry the same WORKLOAD_MANAGER
+ * component.  Returns 1 when they do, 0 otherwise.
+ */
+int same_branch(const char *a, const char *b)
+{
+    return component_seqcode(a, EDG_WLL_SOURCE_WORKLOAD_MANAGER)
+        == component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER);
+}
int edg_wll_compare_seq(const char *a, const char *b)
{