s = edg_wll_StatToString(stat.state);
/* print whole flat structure */
printf("%sstate : %s\n", ind, s);
+ if (stat.pbs_state) printf("%sPBS state : %s\n", ind, stat.pbs_state);
printf("%sjobId : %s\n", ind, j = edg_wlc_JobIdUnparse(stat.jobId));
printf("%sowner : %s\n", ind, stat.owner);
switch (stat.jobtype) {
case EDG_WLL_STAT_COLLECTION:
printf("%sjobtype : COLLECTION\n", ind);
break;
+ case EDG_WLL_STAT_PBS:
+ printf("%sjobtype : PBS\n", ind);
+ break;
default:
break;
}
for (i=0; stat.children_states[i].state; i++)
printstat(stat.children_states[i], level+1);
printf("%schildren_hist :\n",ind);
- if (stat.children_hist)
+ if (stat.children && stat.children_hist)
for (i=1; i<=stat.children_hist[0]; i++)
printf("%s%14s %d\n", ind, edg_wll_StatToString(i-1),stat.children_hist[i]);
}
if (ret) ret = enc_strlist(ret, stat->possible_destinations);
if (ret) ret = enc_strlist(ret, stat->possible_ce_nodes);
if (ret) ret = enc_int_array(ret, stat->children_hist, EDG_WLL_NUMBER_OF_STATCODES);
+ if (ret) ret = enc_string(ret, stat->pbs_state);
+ if (ret) ret = enc_string(ret, stat->pbs_queue);
+ if (ret) ret = enc_string(ret, stat->pbs_owner);
+ if (ret) ret = enc_string(ret, stat->pbs_name);
+ if (ret) ret = enc_string(ret, stat->pbs_scheduler);
+ if (ret) ret = enc_string(ret, stat->pbs_dest_host);
+ if (ret) ret = enc_int(ret, stat->pbs_pid);
+ if (ret) ret = enc_int(ret, stat->pbs_exit_status);
return ret;
}
if (tmp_in != NULL) stat->possible_ce_nodes = dec_strlist(tmp_in, &tmp_in);
if (tmp_in != NULL) {
stat->children_hist = (int*)calloc(EDG_WLL_NUMBER_OF_STATCODES+1, sizeof(int));
- dec_int_array(tmp_in, &tmp_in, stat->children_hist); }
+ dec_int_array(tmp_in, &tmp_in, stat->children_hist);
+ }
+ if (tmp_in != NULL) stat->pbs_state = dec_string(tmp_in, &tmp_in);
+ if (tmp_in != NULL) stat->pbs_queue = dec_string(tmp_in, &tmp_in);
+ if (tmp_in != NULL) stat->pbs_owner = dec_string(tmp_in, &tmp_in);
+ if (tmp_in != NULL) stat->pbs_name = dec_string(tmp_in, &tmp_in);
+ if (tmp_in != NULL) stat->pbs_scheduler = dec_string(tmp_in, &tmp_in);
+ if (tmp_in != NULL) stat->pbs_dest_host = dec_string(tmp_in, &tmp_in);
+ if (tmp_in != NULL) stat->pbs_pid = dec_int(tmp_in, &tmp_in);
+ if (tmp_in != NULL) stat->pbs_exit_status = dec_int(tmp_in, &tmp_in);
*rest = tmp_in;
if (ret) ret = enc_string(ret, stat->deep_resubmit_seqcode);
if (ret) ret = enc_branch_states(ret, stat->branch_states);
if (ret) ret = enc_int_array(ret, stat->children_done_hist, EDG_WLL_NUMBER_OF_DONE_CODES-1);
+ if (ret) ret = enc_timeval(ret, stat->last_pbs_event_timestamp);
return ret;
}
if (tmp_in != NULL) {
dec_int_array(tmp_in, &tmp_in, stat->children_done_hist);
}
+ if (tmp_in != NULL) {
+ stat->last_pbs_event_timestamp = dec_timeval(tmp_in, &tmp_in);
+ }
} else if (tmp_in != NULL) {
edg_wll_FreeStatus(pubstat);
free(pubstat);
#define UNUSED_VAR
#endif
+static int compare_timestamps(struct timeval a, struct timeval b)
+{
+ if ( (a.tv_sec > b.tv_sec) ||
+ ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1;
+ if ( (a.tv_sec < b.tv_sec) ||
+ ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1;
+ return 0;
+}
+
+// XXX move this defines into some common place to be reusable
+#define USABLE(res) ((res) == RET_OK)
+#define USABLE_DATA(res) (1)
+#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
+
int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
{
+ edg_wll_JobStatCode old_state = js->pub.state;
+ int res = RET_OK;
+
+
fputs("processEvent_PBS()",stderr);
+ if (compare_timestamps(js->last_pbs_event_timestamp, e->any.timestamp) > 0)
+ res = RET_LATE;
+
+ switch (e->any.type) {
+ case EDG_WLL_EVENT_REGJOB:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_SUBMITTED;
+ rep(js->pub.pbs_state, "Q");
+ }
+ if (USABLE_DATA(res)) {
+ }
+ break;
+ case EDG_WLL_EVENT_PBSREG:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_SUBMITTED;
+ rep(js->pub.pbs_state, "Q");
+ }
+ if (USABLE_DATA(res)) {
+ js->pub.pbs_queue = strdup(e->pBSReg.queue);
+ }
+ break;
+ case EDG_WLL_EVENT_PBSQUEUED:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_WAITING;
+ rep(js->pub.pbs_state, "Q");
+ }
+ if (USABLE_DATA(res)) {
+ if (!js->pub.pbs_queue)
+ strdup(e->pBSQueued.queue);
+ assert(!strcmp(js->pub.pbs_queue, e->pBSQueued.queue));
+ rep(js->pub.pbs_owner,e->pBSQueued.owner);
+ rep(js->pub.pbs_name,e->pBSQueued.name);
+ }
+ break;
+ case EDG_WLL_EVENT_PBSPLAN:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_READY;
+ rep(js->pub.pbs_state, "Q");
+ }
+ if (USABLE_DATA(res)) {
+ }
+ break;
+ case EDG_WLL_EVENT_PBSRUN:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_RUNNING;
+ rep(js->pub.pbs_state, "R");
+ }
+ if (USABLE_DATA(res)) {
+ rep(js->pub.pbs_scheduler, e->pBSRun.scheduler);
+ rep(js->pub.pbs_dest_host, e->pBSRun.dest_host);
+ js->pub.pbs_pid = e->pBSRun.pid;
+ }
+ break;
+ case EDG_WLL_EVENT_PBSDONE:
+ if (USABLE(res)) {
+ js->pub.state = EDG_WLL_JOB_DONE;
+ rep(js->pub.pbs_state, "C");
+ }
+ if (USABLE_DATA(res)) {
+ js->pub.pbs_exit_status = e->pBSDone.exit_status;
+ }
+ break;
+ default:
+ break;
+ }
+
+ if (USABLE(res)) {
+ js->pub.lastUpdateTime = e->any.timestamp;
+ if (old_state != js->pub.state) {
+ js->pub.stateEnterTime = js->pub.lastUpdateTime;
+ js->pub.stateEnterTimes[1 + js->pub.state]
+ = (int)js->pub.lastUpdateTime.tv_sec;
+ }
+ }
if (! js->pub.location) js->pub.location = strdup("this is PBS");
return RET_OK;
}
bool suspended Job is suspended
string suspend_reason Reason for the suspend
+string pbs_state Job state which would probably return PBS qstat (Q/R/C/....)
+string pbs_queue Name of queue in which is job queued
+string pbs_owner Owner of job
+string pbs_name Name of job
+string pbs_scheduler Name of pbs scheduler
+string pbs_dest_host Hostname of node whre job is running
+int pbs_pid PID of running job
+int pbs_exit_status Job exit status
+
@type Submitted Entered by the user to the User Interface or registered by Job Partitioner.
@type Waiting Accepted by WMS, waiting for resource allocation.
@type Ready Matching resources found.