first step towards PBS job support
authorMiloš Mulač <mulac@civ.zcu.cz>
Thu, 1 Feb 2007 17:15:21 +0000 (17:15 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Thu, 1 Feb 2007 17:15:21 +0000 (17:15 +0000)
- added PBS related fields to jobStat structure
- very basic support in test client
- preliminary state automaton

org.glite.lb.client/examples/job_status.c
org.glite.lb.server/src/jobstat_supp.c
org.glite.lb.server/src/process_event_pbs.c
org.glite.lb/project/status.T

index 2a55424..289183b 100644 (file)
@@ -198,6 +198,7 @@ static void printstat(edg_wll_JobStat stat, int level)
        s = edg_wll_StatToString(stat.state); 
 /* print whole flat structure */
        printf("%sstate : %s\n", ind, s);
+       if (stat.pbs_state) printf("%sPBS state : %s\n", ind, stat.pbs_state);
        printf("%sjobId : %s\n", ind, j = edg_wlc_JobIdUnparse(stat.jobId));
        printf("%sowner : %s\n", ind, stat.owner);
        switch (stat.jobtype) {
@@ -210,6 +211,9 @@ static void printstat(edg_wll_JobStat stat, int level)
                case EDG_WLL_STAT_COLLECTION:
                        printf("%sjobtype : COLLECTION\n", ind);
                         break;
+               case EDG_WLL_STAT_PBS:
+                       printf("%sjobtype : PBS\n", ind);
+                        break;
                default:
                        break;
        }
@@ -227,7 +231,7 @@ static void printstat(edg_wll_JobStat stat, int level)
                        for  (i=0; stat.children_states[i].state; i++)
                                printstat(stat.children_states[i], level+1);
                printf("%schildren_hist :\n",ind);
-               if (stat.children_hist) 
+               if (stat.children && stat.children_hist) 
                        for (i=1; i<=stat.children_hist[0]; i++) 
                                printf("%s%14s  %d\n", ind, edg_wll_StatToString(i-1),stat.children_hist[i]);
        }
index dfff581..ba28baa 100644 (file)
@@ -498,6 +498,14 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat)
        if (ret) ret = enc_strlist(ret, stat->possible_destinations);
        if (ret) ret = enc_strlist(ret, stat->possible_ce_nodes);
        if (ret) ret = enc_int_array(ret, stat->children_hist, EDG_WLL_NUMBER_OF_STATCODES);
+       if (ret) ret = enc_string(ret, stat->pbs_state);
+       if (ret) ret = enc_string(ret, stat->pbs_queue);
+       if (ret) ret = enc_string(ret, stat->pbs_owner);
+       if (ret) ret = enc_string(ret, stat->pbs_name);
+       if (ret) ret = enc_string(ret, stat->pbs_scheduler);
+       if (ret) ret = enc_string(ret, stat->pbs_dest_host);
+       if (ret) ret = enc_int(ret, stat->pbs_pid);
+       if (ret) ret = enc_int(ret, stat->pbs_exit_status);
 
        return ret;
 }
@@ -548,7 +556,16 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest)
         if (tmp_in != NULL) stat->possible_ce_nodes = dec_strlist(tmp_in, &tmp_in);
         if (tmp_in != NULL) {
                            stat->children_hist = (int*)calloc(EDG_WLL_NUMBER_OF_STATCODES+1, sizeof(int));
-                           dec_int_array(tmp_in, &tmp_in, stat->children_hist); }
+                           dec_int_array(tmp_in, &tmp_in, stat->children_hist);
+       }
+        if (tmp_in != NULL) stat->pbs_state = dec_string(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->pbs_queue = dec_string(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->pbs_owner = dec_string(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->pbs_name = dec_string(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->pbs_scheduler = dec_string(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->pbs_dest_host = dec_string(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->pbs_pid = dec_int(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->pbs_exit_status = dec_int(tmp_in, &tmp_in);
 
        *rest = tmp_in;
 
@@ -568,6 +585,7 @@ char *enc_intJobStat(char *old, intJobStat* stat)
        if (ret) ret = enc_string(ret, stat->deep_resubmit_seqcode);
        if (ret) ret = enc_branch_states(ret, stat->branch_states);
        if (ret) ret = enc_int_array(ret, stat->children_done_hist, EDG_WLL_NUMBER_OF_DONE_CODES-1);
+       if (ret) ret = enc_timeval(ret, stat->last_pbs_event_timestamp);
        return ret;
 }
 
@@ -607,6 +625,9 @@ intJobStat* dec_intJobStat(char *in, char **rest)
                if (tmp_in != NULL) {
                        dec_int_array(tmp_in, &tmp_in, stat->children_done_hist);
                }
+               if (tmp_in != NULL) {
+                       stat->last_pbs_event_timestamp = dec_timeval(tmp_in, &tmp_in);
+               }
        } else if (tmp_in != NULL) {
                edg_wll_FreeStatus(pubstat);
                free(pubstat);
index 264e9d1..f125ed0 100644 (file)
 #define UNUSED_VAR
 #endif
 
+static int compare_timestamps(struct timeval a, struct timeval b)
+{
+       if ( (a.tv_sec > b.tv_sec) || 
+               ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1;
+       if ( (a.tv_sec < b.tv_sec) ||
+                ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1;
+       return 0;
+}
+
+// XXX move this defines into some common place to be reusable
+#define USABLE(res) ((res) == RET_OK)
+#define USABLE_DATA(res) (1)
+#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
+
 int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
 {
+       edg_wll_JobStatCode     old_state = js->pub.state;
+       int                     res = RET_OK;
+
+
        fputs("processEvent_PBS()",stderr);
 
+       if (compare_timestamps(js->last_pbs_event_timestamp, e->any.timestamp) > 0)
+               res = RET_LATE; 
+
+       switch (e->any.type) {
+               case EDG_WLL_EVENT_REGJOB:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_SUBMITTED;
+                               rep(js->pub.pbs_state, "Q");
+                       }
+                       if (USABLE_DATA(res)) {
+                       }
+                       break;
+               case EDG_WLL_EVENT_PBSREG:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_SUBMITTED;
+                               rep(js->pub.pbs_state, "Q");
+                       }
+                       if (USABLE_DATA(res)) {
+                               js->pub.pbs_queue = strdup(e->pBSReg.queue);
+                       }
+                       break;
+               case EDG_WLL_EVENT_PBSQUEUED:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_WAITING;
+                               rep(js->pub.pbs_state, "Q");
+                       }
+                       if (USABLE_DATA(res)) {
+                               if (!js->pub.pbs_queue)
+                                       strdup(e->pBSQueued.queue);
+                               assert(!strcmp(js->pub.pbs_queue, e->pBSQueued.queue));
+                               rep(js->pub.pbs_owner,e->pBSQueued.owner);
+                               rep(js->pub.pbs_name,e->pBSQueued.name);
+                       }
+                       break;
+               case EDG_WLL_EVENT_PBSPLAN:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_READY;
+                               rep(js->pub.pbs_state, "Q");
+                       }
+                       if (USABLE_DATA(res)) {
+                       }
+                       break;
+               case EDG_WLL_EVENT_PBSRUN:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_RUNNING;
+                               rep(js->pub.pbs_state, "R");
+                       }
+                       if (USABLE_DATA(res)) {
+                               rep(js->pub.pbs_scheduler, e->pBSRun.scheduler);
+                               rep(js->pub.pbs_dest_host, e->pBSRun.dest_host);
+                               js->pub.pbs_pid = e->pBSRun.pid;
+                       }
+                       break;
+               case EDG_WLL_EVENT_PBSDONE:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_DONE;
+                               rep(js->pub.pbs_state, "C");
+                       }
+                       if (USABLE_DATA(res)) {
+                               js->pub.pbs_exit_status =  e->pBSDone.exit_status;      
+                       }
+                       break;
+               default:
+                       break;
+       }
+
+       if (USABLE(res)) {
+               js->pub.lastUpdateTime = e->any.timestamp;
+               if (old_state != js->pub.state) {
+                       js->pub.stateEnterTime = js->pub.lastUpdateTime;
+                       js->pub.stateEnterTimes[1 + js->pub.state]
+                               = (int)js->pub.lastUpdateTime.tv_sec;
+               }
+       }
        if (! js->pub.location) js->pub.location = strdup("this is PBS");
        return RET_OK;
 }
index 9d554c4..71237d3 100644 (file)
@@ -75,6 +75,15 @@ strlist possible_ce_nodes    CE nodes matching to possible_destinations
 bool   suspended       Job is suspended
 string suspend_reason  Reason for the suspend
 
+string pbs_state       Job state which would probably return PBS qstat (Q/R/C/....)
+string  pbs_queue      Name of queue in which is job queued
+string  pbs_owner      Owner of job
+string  pbs_name       Name of job
+string  pbs_scheduler  Name of pbs scheduler
+string  pbs_dest_host  Hostname of node whre job is running
+int    pbs_pid         PID of running job
+int    pbs_exit_status Job exit status
+
 @type Submitted                Entered by the user to the User Interface or registered by Job Partitioner.
 @type Waiting          Accepted by WMS, waiting for resource allocation.
 @type Ready            Matching resources found.