From: Miloš Mulač Date: Thu, 1 Feb 2007 17:15:21 +0000 (+0000) Subject: first step towards PBS job support X-Git-Tag: gridsite-core_R_1_4_0~23 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=2bdd8d1555c918293a0747e20f9affcca4aa5a3c;p=jra1mw.git first step towards PBS job support - added PBS related fields to jobStat structure - very basic support in test client - preliminary state automaton --- diff --git a/org.glite.lb.client/examples/job_status.c b/org.glite.lb.client/examples/job_status.c index 2a55424..289183b 100644 --- a/org.glite.lb.client/examples/job_status.c +++ b/org.glite.lb.client/examples/job_status.c @@ -198,6 +198,7 @@ static void printstat(edg_wll_JobStat stat, int level) s = edg_wll_StatToString(stat.state); /* print whole flat structure */ printf("%sstate : %s\n", ind, s); + if (stat.pbs_state) printf("%sPBS state : %s\n", ind, stat.pbs_state); printf("%sjobId : %s\n", ind, j = edg_wlc_JobIdUnparse(stat.jobId)); printf("%sowner : %s\n", ind, stat.owner); switch (stat.jobtype) { @@ -210,6 +211,9 @@ static void printstat(edg_wll_JobStat stat, int level) case EDG_WLL_STAT_COLLECTION: printf("%sjobtype : COLLECTION\n", ind); break; + case EDG_WLL_STAT_PBS: + printf("%sjobtype : PBS\n", ind); + break; default: break; } @@ -227,7 +231,7 @@ static void printstat(edg_wll_JobStat stat, int level) for (i=0; stat.children_states[i].state; i++) printstat(stat.children_states[i], level+1); printf("%schildren_hist :\n",ind); - if (stat.children_hist) + if (stat.children && stat.children_hist) for (i=1; i<=stat.children_hist[0]; i++) printf("%s%14s %d\n", ind, edg_wll_StatToString(i-1),stat.children_hist[i]); } diff --git a/org.glite.lb.server/src/jobstat_supp.c b/org.glite.lb.server/src/jobstat_supp.c index dfff581..ba28baa 100644 --- a/org.glite.lb.server/src/jobstat_supp.c +++ b/org.glite.lb.server/src/jobstat_supp.c @@ -498,6 +498,14 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat) if (ret) ret = enc_strlist(ret, stat->possible_destinations); if (ret) ret = enc_strlist(ret, stat->possible_ce_nodes); if (ret) ret = enc_int_array(ret, stat->children_hist, EDG_WLL_NUMBER_OF_STATCODES); + if (ret) ret = enc_string(ret, stat->pbs_state); + if (ret) ret = enc_string(ret, stat->pbs_queue); + if (ret) ret = enc_string(ret, stat->pbs_owner); + if (ret) ret = enc_string(ret, stat->pbs_name); + if (ret) ret = enc_string(ret, stat->pbs_scheduler); + if (ret) ret = enc_string(ret, stat->pbs_dest_host); + if (ret) ret = enc_int(ret, stat->pbs_pid); + if (ret) ret = enc_int(ret, stat->pbs_exit_status); return ret; } @@ -548,7 +556,16 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest) if (tmp_in != NULL) stat->possible_ce_nodes = dec_strlist(tmp_in, &tmp_in); if (tmp_in != NULL) { stat->children_hist = (int*)calloc(EDG_WLL_NUMBER_OF_STATCODES+1, sizeof(int)); - dec_int_array(tmp_in, &tmp_in, stat->children_hist); } + dec_int_array(tmp_in, &tmp_in, stat->children_hist); + } + if (tmp_in != NULL) stat->pbs_state = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->pbs_queue = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->pbs_owner = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->pbs_name = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->pbs_scheduler = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->pbs_dest_host = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->pbs_pid = dec_int(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->pbs_exit_status = dec_int(tmp_in, &tmp_in); *rest = tmp_in; @@ -568,6 +585,7 @@ char *enc_intJobStat(char *old, intJobStat* stat) if (ret) ret = enc_string(ret, stat->deep_resubmit_seqcode); if (ret) ret = enc_branch_states(ret, stat->branch_states); if (ret) ret = enc_int_array(ret, stat->children_done_hist, EDG_WLL_NUMBER_OF_DONE_CODES-1); + if (ret) ret = enc_timeval(ret, stat->last_pbs_event_timestamp); return ret; } @@ -607,6 +625,9 @@ intJobStat* dec_intJobStat(char *in, char **rest) if (tmp_in != NULL) { dec_int_array(tmp_in, &tmp_in, stat->children_done_hist); } + if (tmp_in != NULL) { + stat->last_pbs_event_timestamp = dec_timeval(tmp_in, &tmp_in); + } } else if (tmp_in != NULL) { edg_wll_FreeStatus(pubstat); free(pubstat); diff --git a/org.glite.lb.server/src/process_event_pbs.c b/org.glite.lb.server/src/process_event_pbs.c index 264e9d1..f125ed0 100644 --- a/org.glite.lb.server/src/process_event_pbs.c +++ b/org.glite.lb.server/src/process_event_pbs.c @@ -20,10 +20,102 @@ #define UNUSED_VAR #endif +static int compare_timestamps(struct timeval a, struct timeval b) +{ + if ( (a.tv_sec > b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1; + if ( (a.tv_sec < b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1; + return 0; +} + +// XXX move this defines into some common place to be reusable +#define USABLE(res) ((res) == RET_OK) +#define USABLE_DATA(res) (1) +#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } + int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) { + edg_wll_JobStatCode old_state = js->pub.state; + int res = RET_OK; + + fputs("processEvent_PBS()",stderr); + if (compare_timestamps(js->last_pbs_event_timestamp, e->any.timestamp) > 0) + res = RET_LATE; + + switch (e->any.type) { + case EDG_WLL_EVENT_REGJOB: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + rep(js->pub.pbs_state, "Q"); + } + if (USABLE_DATA(res)) { + } + break; + case EDG_WLL_EVENT_PBSREG: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + rep(js->pub.pbs_state, "Q"); + } + if (USABLE_DATA(res)) { + js->pub.pbs_queue = strdup(e->pBSReg.queue); + } + break; + case EDG_WLL_EVENT_PBSQUEUED: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_WAITING; + rep(js->pub.pbs_state, "Q"); + } + if (USABLE_DATA(res)) { + if (!js->pub.pbs_queue) + strdup(e->pBSQueued.queue); + assert(!strcmp(js->pub.pbs_queue, e->pBSQueued.queue)); + rep(js->pub.pbs_owner,e->pBSQueued.owner); + rep(js->pub.pbs_name,e->pBSQueued.name); + } + break; + case EDG_WLL_EVENT_PBSPLAN: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_READY; + rep(js->pub.pbs_state, "Q"); + } + if (USABLE_DATA(res)) { + } + break; + case EDG_WLL_EVENT_PBSRUN: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_RUNNING; + rep(js->pub.pbs_state, "R"); + } + if (USABLE_DATA(res)) { + rep(js->pub.pbs_scheduler, e->pBSRun.scheduler); + rep(js->pub.pbs_dest_host, e->pBSRun.dest_host); + js->pub.pbs_pid = e->pBSRun.pid; + } + break; + case EDG_WLL_EVENT_PBSDONE: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_DONE; + rep(js->pub.pbs_state, "C"); + } + if (USABLE_DATA(res)) { + js->pub.pbs_exit_status = e->pBSDone.exit_status; + } + break; + default: + break; + } + + if (USABLE(res)) { + js->pub.lastUpdateTime = e->any.timestamp; + if (old_state != js->pub.state) { + js->pub.stateEnterTime = js->pub.lastUpdateTime; + js->pub.stateEnterTimes[1 + js->pub.state] + = (int)js->pub.lastUpdateTime.tv_sec; + } + } if (! js->pub.location) js->pub.location = strdup("this is PBS"); return RET_OK; } diff --git a/org.glite.lb/project/status.T b/org.glite.lb/project/status.T index 9d554c4..71237d3 100644 --- a/org.glite.lb/project/status.T +++ b/org.glite.lb/project/status.T @@ -75,6 +75,15 @@ strlist possible_ce_nodes CE nodes matching to possible_destinations bool suspended Job is suspended string suspend_reason Reason for the suspend +string pbs_state Job state which would probably return PBS qstat (Q/R/C/....) +string pbs_queue Name of queue in which is job queued +string pbs_owner Owner of job +string pbs_name Name of job +string pbs_scheduler Name of pbs scheduler +string pbs_dest_host Hostname of node whre job is running +int pbs_pid PID of running job +int pbs_exit_status Job exit status + @type Submitted Entered by the user to the User Interface or registered by Job Partitioner. @type Waiting Accepted by WMS, waiting for resource allocation. @type Ready Matching resources found.