From: Michal Voců Date: Fri, 4 Nov 2011 11:45:14 +0000 (+0000) Subject: new PBS seq code ordering and fixes, PBS state machine X-Git-Tag: merge_torque_to_head_src~9 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=99dcbb7ffd4cd9616d066df7957bae768cb95626;p=jra1mw.git new PBS seq code ordering and fixes, PBS state machine --- diff --git a/org.glite.lb.state-machine/interface/seqcode_aux.h b/org.glite.lb.state-machine/interface/seqcode_aux.h index daeb67c..19fbf3a 100644 --- a/org.glite.lb.state-machine/interface/seqcode_aux.h +++ b/org.glite.lb.state-machine/interface/seqcode_aux.h @@ -26,9 +26,8 @@ int before_deep_resubmission(const char *a, const char *b); int same_branch(const char *a, const char *b) ; int edg_wll_compare_pbs_seq(const char *a,const char *b); -#define edg_wll_compare_condor_seq edg_wll_compare_pbs_seq -edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) ; +int edg_wll_compare_condor_seq(const char *a,const char *b); edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) ; diff --git a/org.glite.lb.state-machine/src/process_event_pbs.c b/org.glite.lb.state-machine/src/process_event_pbs.c index 5e712e5..4f237da 100644 --- a/org.glite.lb.state-machine/src/process_event_pbs.c +++ b/org.glite.lb.state-machine/src/process_event_pbs.c @@ -56,6 +56,93 @@ static int compare_timestamps(struct timeval a, struct timeval b) #define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } #define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } } +/* maps PBS/Torque substates to LB job states */ +static int _PBSsubstate2lbstate[] = { + /* TRANSIN */ EDG_WLL_JOB_SUBMITTED, + /* TRANSICM */ EDG_WLL_JOB_SUBMITTED, + /* TRNOUT */ EDG_WLL_JOB_SUBMITTED, + /* TRNOUTCM */ EDG_WLL_JOB_SUBMITTED, + /* SUBSTATE04 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE05 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE06 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE07 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE08 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE09 */ EDG_WLL_JOB_UNKNOWN, + /* QUEUED */ EDG_WLL_JOB_WAITING, + /* PRESTAGEIN */ EDG_WLL_JOB_READY, + /* SUBSTATE12 */ EDG_WLL_JOB_UNKNOWN, + /* SYNCRES */ EDG_WLL_JOB_READY, + /* STAGEIN */ EDG_WLL_JOB_READY, + /* STAGEGO */ EDG_WLL_JOB_SCHEDULED, + /* STAGECMP */ EDG_WLL_JOB_READY, + /* SUBSTATE17 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE18 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE19 */ EDG_WLL_JOB_UNKNOWN, + /* HELD */ EDG_WLL_JOB_WAITING, + /* SYNCHOLD */ EDG_WLL_JOB_WAITING, + /* DEPNHOLD */ EDG_WLL_JOB_WAITING, + /* SUBSTATE23 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE24 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE25 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE26 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE27 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE28 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE29 */ EDG_WLL_JOB_UNKNOWN, + /* WAITING */ EDG_WLL_JOB_WAITING, + /* SUBSTATE31 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE32 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE33 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE34 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE35 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE36 */ EDG_WLL_JOB_UNKNOWN, + /* STAGEFAIL */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE38 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE39 */ EDG_WLL_JOB_UNKNOWN, + /* PRERUN */ EDG_WLL_JOB_SCHEDULED, + /* STARTING */ EDG_WLL_JOB_SCHEDULED, + /* RUNNING */ EDG_WLL_JOB_RUNNING, + /* SUSPEND */ EDG_WLL_JOB_WAITING, + /* SUBSTATE44 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE45 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE46 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE47 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE48 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE49 */ EDG_WLL_JOB_UNKNOWN, + /* EXITING */ EDG_WLL_JOB_RUNNING, + /* STAGEOUT */ EDG_WLL_JOB_RUNNING, + /* STAGEDEL */ EDG_WLL_JOB_RUNNING, + /* EXITED */ EDG_WLL_JOB_RUNNING, + /* ABORT */ EDG_WLL_JOB_CANCELLED, + /* SUBSTATE55 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE56 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE57 */ EDG_WLL_JOB_UNKNOWN, + /* OBIT */ EDG_WLL_JOB_RUNNING, + /* COMPLETE */ EDG_WLL_JOB_DONE, + /* RERUN */ EDG_WLL_JOB_WAITING, + /* RERUN1 */ EDG_WLL_JOB_WAITING, + /* RERUN2 */ EDG_WLL_JOB_WAITING, + /* RERUN3 */ EDG_WLL_JOB_WAITING, + /* SUBSTATE64 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE65 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE66 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE67 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE68 */ EDG_WLL_JOB_UNKNOWN, + /* SUBSTATE69 */ EDG_WLL_JOB_UNKNOWN, + /* RETURNSTD */ EDG_WLL_JOB_RUNNING +}; + + +/* maps PBS/Torque job states to status characters for display */ +static char *_PBSstate2char[] = { + /* TRANSIT -> */ "T", + /* QUEUED -> */ "Q", + /* HELD -> */ "H", + /* WAITING -> */ "W", + /* RUNNING -> */ "R", + /* EXITING -> */ "E", + /* COMPLETE-> */ "C" +}; + int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) { edg_wll_JobStatCode old_state = js->pub.state; @@ -68,162 +155,255 @@ int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, c } switch (e->any.type) { - case EDG_WLL_EVENT_REGJOB: - if (USABLE(res)) { - js->pub.state = EDG_WLL_JOB_SUBMITTED; - rep(js->pub.pbs_state, "Q"); + case EDG_WLL_EVENT_PBSINTERNALSTATECHANGE: + if(USABLE(res)) { + /* TODO: should we use this? Maybe to cross check... + * js->pub.state = _PBSsubstate2lbstate[e->PBSInternalStateChange.newsubstate]; + */ + rep(js->pub.pbs_state, _PBSstate2char[e->PBSInternalStateChange.newstate]); + js->pub.pbs_substate = e->PBSInternalStateChange.newsubstate; + } + break; + + case EDG_WLL_EVENT_REGJOB: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + } + if (USABLE_DATA(res)) { + /* this is going to be the first server taking care of the job */ + rep(js->pub.network_server, e->RegJob.ns); + } + break; + + case EDG_WLL_EVENT_PBSTRANSFER: + if(USABLE(res)) { + switch(e->PBSTransfer.result) { + case EDG_WLL_PBSTRANSFER_START: + break; + + case EDG_WLL_PBSTRANSFER_OK: + break; + + case EDG_WLL_PBSTRANSFER_REFUSED: + case EDG_WLL_PBSTRANSFER_FAIL: + break; + + default: + break; + } - if (USABLE_DATA(res)) { + } + if(USABLE_DATA(res)) { + /* job going to another server */ + switch(e->PBSTransfer.result) { + case EDG_WLL_PBSTRANSFER_OK: + /* update job location */ + switch(e->PBSTransfer.destination) { + case EDG_WLL_SOURCE_SERVER: + rep(js->pub); + break; + default: + /* where is it going? */ + break; + + } + break; + default: + break; } - break; - case EDG_WLL_EVENT_PBSQUEUED: - if (USABLE(res)) { - js->pub.state = EDG_WLL_JOB_WAITING; - rep(js->pub.pbs_state, "Q"); + } + break; + + case EDG_WLL_EVENT_PBSACCEPTED: + if(USABLE(res)) { + switch(e->any.source) { + case EDG_WLL_SOURCE_PBS_SERVER: + /* accepted by server means job is submitted */ + js->pub.state = EDG_WLL_JOB_SUBMITTED; + break; + + case EDG_WLL_SOURCE_PBS_SMOM: + case EDG_WLL_SOURCE_PBS_MOM: + /* accepted by MOM: job is going to run */ + js->pub.state = EDG_WLL_JOB_SCHEDULED; + break; + + default: + /* this would be weird */ + break; } - if (USABLE_DATA(res)) { + } + break; + + case EDG_WLL_EVENT_PBSREFUSED: + break; + + case EDG_WLL_EVENT_PBSQUEUED: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_WAITING; + } + if (USABLE_DATA(res)) { + if(e->any.source == EDG_WLL_SOURCE_PBS_SERVER) { + /* queue */ if (!js->pub.pbs_queue) js->pub.pbs_queue = strdup(e->PBSQueued.queue); assert(!strcmp(js->pub.pbs_queue, e->PBSQueued.queue)); - /* rep_cond(js->pub.pbs_owner,e->PBSQueued.owner); - rep_cond(js->pub.pbs_name,e->PBSQueued.name); */ - } - break; - case EDG_WLL_EVENT_PBSMATCH: - if (USABLE(res)) { - js->pub.state = EDG_WLL_JOB_READY; - rep(js->pub.pbs_state, "Q"); + + /* job owner */ + if(!js->pub.pbs_owner) + rep_cond(js->pub.pbs_owner, e->PBSQueued.owner); + /* job_name */ + if(!js->pub.pbs_name) + rep_cond(js->pub.pbs_name, e->PBSQueued.name); } - if (USABLE_DATA(res)) { - rep_cond(js->pub.pbs_dest_host,e->PBSMatch.dest_host); + } + break; + + case EDG_WLL_EVENT_PBSMATCH: + /* XXX - not used yet */ + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_READY; + } + if (USABLE_DATA(res)) { + rep_cond(js->pub.pbs_dest_host,e->PBSMatch.dest_host); + } + break; + + case EDG_WLL_EVENT_PBSPENDING: + /* XXX - not used yet */ + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_WAITING; + js->pbs_reruning = 0; // reset possible reruning flag + } + if (USABLE_DATA(res)) { + rep_cond(js->pub.pbs_reason,e->PBSPending.reason); + } + break; + + case EDG_WLL_EVENT_PBSWAITING: + break; + + case EDG_WLL_EVENT_PBSRUN: + if (USABLE(res)) { + switch (e->any.source) { + case EDG_WLL_SOURCE_PBS_SERVER: + js->pub.state = EDG_WLL_JOB_SCHEDULED; + break; + case EDG_WLL_SOURCE_PBS_MOM: + js->pub.state = EDG_WLL_JOB_RUNNING; + break; + default: + assert(0); // running event from strange source + break; } - break; - case EDG_WLL_EVENT_PBSPENDING: - if (USABLE(res)) { + } + if (USABLE_DATA(res)) { + /* session id */ + rep_cond(js->pub.pbs_scheduler, e->PBSRun.scheduler); + rep_cond(js->pub.pbs_dest_host, e->PBSRun.dest_host); + js->pub.pbs_pid = e->PBSRun.pid; + } + break; + + case EDG_WLL_EVENT_PBSRERUN: + if (USABLE(res)) { + switch (e->any.source) { + case EDG_WLL_SOURCE_PBS_SERVER: js->pub.state = EDG_WLL_JOB_WAITING; - rep(js->pub.pbs_state, "Q"); - js->pbs_reruning = 0; // reset possible reruning flag - } - if (USABLE_DATA(res)) { - rep_cond(js->pub.pbs_reason,e->PBSPending.reason); - } - break; - case EDG_WLL_EVENT_PBSRUN: - if (USABLE(res)) { - switch (get_pbs_event_source(e->any.seqcode)) { - case EDG_WLL_PBS_EVENT_SOURCE_SERVER: - js->pub.state = EDG_WLL_JOB_SCHEDULED; - rep(js->pub.pbs_state, "Q"); - break; - case EDG_WLL_PBS_EVENT_SOURCE_MOM: - js->pub.state = EDG_WLL_JOB_RUNNING; - rep(js->pub.pbs_state, "R"); - break; - default: - assert(0); // running event from strange source - break; - } - } - if (USABLE_DATA(res)) { - rep_cond(js->pub.pbs_scheduler, e->PBSRun.scheduler); - rep_cond(js->pub.pbs_dest_host, e->PBSRun.dest_host); - js->pub.pbs_pid = e->PBSRun.pid; - } - break; - case EDG_WLL_EVENT_PBSRERUN: - if (USABLE(res)) { - switch (get_pbs_event_source(e->any.seqcode)) { - case EDG_WLL_PBS_EVENT_SOURCE_SERVER: - js->pub.state = EDG_WLL_JOB_WAITING; - rep(js->pub.pbs_state, "Q"); - break; - case EDG_WLL_PBS_EVENT_SOURCE_MOM: - js->pub.state = EDG_WLL_JOB_WAITING; - rep(js->pub.pbs_state, "E"); - js->pbs_reruning = 1; - break; - default: - assert(0); // running event from strande source - break; - } - } - if (USABLE_DATA(res)) { - } - break; - case EDG_WLL_EVENT_PBSDONE: - if (USABLE(res)) { - switch (get_pbs_event_source(e->any.seqcode)) { - case EDG_WLL_PBS_EVENT_SOURCE_SERVER: - js->pub.state = EDG_WLL_JOB_DONE; - js->pub.done_code = EDG_WLL_STAT_OK; - rep(js->pub.pbs_state, "C"); - break; - case EDG_WLL_PBS_EVENT_SOURCE_MOM: - if (!js->pbs_reruning) { - js->pub.state = EDG_WLL_JOB_DONE; - js->pub.done_code = EDG_WLL_STAT_OK; - rep(js->pub.pbs_state, "C"); - } - break; - default: - assert(0); //done event from strange source - break; - } - } - if (USABLE_DATA(res)) { - js->pub.pbs_exit_status = e->PBSDone.exit_status; - } - break; - case EDG_WLL_EVENT_PBSRESOURCEUSAGE: - if (USABLE(res)) { - // signalize state done, done_code uknown - js->pub.state = EDG_WLL_JOB_DONE; - rep(js->pub.pbs_state, "C"); - } - if (USABLE_DATA(res)) { - char *new_resource_usage; - - trio_asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]", - (js->pub.pbs_resource_usage) ? js->pub.pbs_resource_usage : "", - (js->pub.pbs_resource_usage) ? "\n": "", - e->PBSResourceUsage.name, - e->PBSResourceUsage.quantity, - e->PBSResourceUsage.unit); - - if (js->pub.pbs_resource_usage) free(js->pub.pbs_resource_usage); - js->pub.pbs_resource_usage = new_resource_usage; - } - break; - case EDG_WLL_EVENT_PBSERROR: - if (USABLE(res)) { - js->pub.state = EDG_WLL_JOB_DONE; - js->pub.done_code = EDG_WLL_STAT_FAILED; - rep(js->pub.pbs_state, "C"); + break; + case EDG_WLL_SOURCE_PBS_MOM: + js->pub.state = EDG_WLL_JOB_WAITING; + js->pbs_reruning = 1; + break; + default: + assert(0); // running event from strande source + break; } - if (USABLE_DATA(res)) { - char *new_error_desc; + } + if (USABLE_DATA(res)) { + /* session id */ + } + break; - trio_asprintf(&new_error_desc,"%s%s\t%s", - (js->pub.pbs_error_desc) ? js->pub.pbs_error_desc : "", - (js->pub.pbs_error_desc) ? "\n" : "", - e->PBSError.error_desc); - - if (js->pub.pbs_error_desc) free(js->pub.pbs_error_desc); - js->pub.pbs_error_desc = new_error_desc; - } - break; - case EDG_WLL_EVENT_USERTAG: - if (USABLE_DATA(res)) { - if (e->userTag.name != NULL && e->userTag.value != NULL) { - add_taglist(e->userTag.name, e->userTag.value, e->any.seqcode, js); + case EDG_WLL_EVENT_PBSABORT: + break; + + case EDG_WLL_EVENT_PBSDONE: + if (USABLE(res)) { + switch (e->any.source) { + case EDG_WLL_SOURCE_PBS_SERVER: + js->pub.state = EDG_WLL_JOB_DONE; + js->pub.done_code = EDG_WLL_STAT_OK; + break; + case EDG_WLL_SOURCE_PBS_MOM: + if (!js->pbs_reruning) { + js->pub.state = EDG_WLL_JOB_DONE; + js->pub.done_code = EDG_WLL_STAT_OK; } + break; + default: + assert(0); //done event from strange source + break; } - break; + } + if (USABLE_DATA(res)) { + /* exit status */ + js->pub.pbs_exit_status = e->PBSDone.exit_status; + } + break; - default: - break; - } + case EDG_WLL_EVENT_PBSRESOURCEUSAGE: + if (USABLE(res)) { + // signalize state done, done_code uknown + js->pub.state = EDG_WLL_JOB_DONE; + } + if (USABLE_DATA(res)) { + char *new_resource_usage; + + /*trio_asprintf(&new_resource_usage,"%s%s\t%s = %f [%s]", + (js->pub.pbs_resource_usage) ? js->pub.pbs_resource_usage : "", + (js->pub.pbs_resource_usage) ? "\n": "", + e->PBSResourceUsage.name, + e->PBSResourceUsage.quantity, + e->PBSResourceUsage.unit); + */ + if (js->pub.pbs_resource_usage) free(js->pub.pbs_resource_usage); + js->pub.pbs_resource_usage = new_resource_usage; + } + break; + + case EDG_WLL_EVENT_PBSERROR: + /* XXX - not used yet */ + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_DONE; + js->pub.done_code = EDG_WLL_STAT_FAILED; + rep(js->pub.pbs_state, "C"); + } + if (USABLE_DATA(res)) { + char *new_error_desc; + + trio_asprintf(&new_error_desc,"%s%s\t%s", + (js->pub.pbs_error_desc) ? js->pub.pbs_error_desc : "", + (js->pub.pbs_error_desc) ? "\n" : "", + e->PBSError.error_desc); + + if (js->pub.pbs_error_desc) free(js->pub.pbs_error_desc); + js->pub.pbs_error_desc = new_error_desc; + } + break; + case EDG_WLL_EVENT_USERTAG: + if (USABLE_DATA(res)) { + if (e->userTag.name != NULL && e->userTag.value != NULL) { + add_taglist(e->userTag.name, e->userTag.value, e->any.seqcode, js); + } + } + break; + + default: + break; + } + /* XXX : just debug output - remove */ /*printf("processEvent_PBS(): %s (%s), state: %s --> %s\n ", @@ -238,7 +418,7 @@ int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, c if (USABLE(res)) { rep(js->last_seqcode, e->any.seqcode); - + js->pub.lastUpdateTime = e->any.timestamp; if (old_state != js->pub.state) { js->pub.stateEnterTime = js->pub.lastUpdateTime; @@ -247,8 +427,8 @@ int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, c } } if (! js->pub.location) js->pub.location = strdup("this is PBS"); - - + + return RET_OK; } diff --git a/org.glite.lb.state-machine/src/seqcode_aux.c b/org.glite.lb.state-machine/src/seqcode_aux.c index b19d2ec..041202c 100644 --- a/org.glite.lb.state-machine/src/seqcode_aux.c +++ b/org.glite.lb.state-machine/src/seqcode_aux.c @@ -52,7 +52,7 @@ limitations under the License. int component_seqcode(const char *a, edg_wll_Source index) { - unsigned int c[EDG_WLL_SEQ_FORMAT_NUMBER+1]; + unsigned int c[EDG_WLL_SOURCE__LAST]; int res; char sc[EDG_WLL_SEQ_SIZE]; @@ -80,7 +80,7 @@ int component_seqcode(const char *a, edg_wll_Source index) char * set_component_seqcode(char *a,edg_wll_Source index,int val) { - unsigned int c[EDG_WLL_SEQ_FORMAT_NUMBER+1]; + unsigned int c[EDG_WLL_SOURCE__LAST]; int res; char *ret; char sc[EDG_WLL_SEQ_SIZE]; @@ -139,70 +139,31 @@ int same_branch(const char *a, const char *b) int edg_wll_compare_pbs_seq(const char *a,const char *b) { - char timestamp_a[14], pos_a[10], src_a; - char timestamp_b[14], pos_b[10], src_b; - int ev_code_a, ev_code_b; - int res; + unsigned int c[EDG_WLL_SEQ_PBS_FORMAT_NUMBER+1], d[EDG_WLL_SEQ_PBS_FORMAT_NUMBER+1]; + int i, res; - res = sscanf(a,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_a, pos_a, &ev_code_a, &src_a); - - if (res != 4) { -/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", a); */ + res = sscanf(a, EDG_WLL_SEQ_PBS_FORMAT_SCANF, + &c[0], &c[1], &c[2], &c[3], &c[4]); + if(res != EDG_WLL_SEQ_PBS_FORMAT_NUMBER) { fprintf(stderr, "unparsable sequence code %s\n", a); return -1; } - - res = sscanf(b,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_b, pos_b, &ev_code_b, &src_b); - - if (res != 4) { -/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", b); */ + + res = sscanf(b, EDG_WLL_SEQ_PBS_FORMAT_SCANF, + &d[0], &d[1], &d[2], &d[3], &d[4]); + if(res != EDG_WLL_SEQ_PBS_FORMAT_NUMBER) { fprintf(stderr, "unparsable sequence code %s\n", b); return -1; } - /* wild card for PBSJobReg - this event should always come as firt one */ - /* bacause it hold job.type, which is necessary for further event processing */ - if (ev_code_a == EDG_WLL_EVENT_REGJOB) return -1; - if (ev_code_b == EDG_WLL_EVENT_REGJOB) return 1; - - /* sort event w.t.r. to timestamps */ - if ((res = strcmp(timestamp_a,timestamp_b)) != 0) { - return res; - } - else { - /* if timestamps equal, sort if w.t.r. to file position */ - /* if you both events come from the same log file */ - if (src_a == src_b) { - /* zero mean in fact duplicate events in log */ - return strcmp(pos_a,pos_b); - } - /* if the events come from diffrent log files */ - /* it is possible to prioritize some src log file */ - else { - /* prioritize events from pbs_mom */ - if (src_a == 'm') return 1; - if (src_b == 'm') return -1; - - /* then prioritize events from pbs_server */ - if (src_a == 's') return 1; - if (src_b == 's') return -1; - - /* other priorities comes here... */ - } + for (i = 0 ; i <= EDG_WLL_SEQ_PBS_FORMAT_NUMBER; i++) { + if (c[i] < d[i]) return -1; + if (c[i] > d[i]) return 1; } return 0; } -edg_wll_PBSEventSource get_pbs_event_source(const char *pbs_seq_num) { - switch (pbs_seq_num[EDG_WLL_SEQ_PBS_SIZE - 2]) { - case 'c': return(EDG_WLL_PBS_EVENT_SOURCE_SCHEDULER); - case 's': return(EDG_WLL_PBS_EVENT_SOURCE_SERVER); - case 'm': return(EDG_WLL_PBS_EVENT_SOURCE_MOM); - case 'a': return(EDG_WLL_PBS_EVENT_SOURCE_ACCOUNTING); - default: return(EDG_WLL_PBS_EVENT_SOURCE_UNDEF); - } -} edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) { switch (condor_seq_num[EDG_WLL_SEQ_CONDOR_SIZE - 2]) { @@ -219,15 +180,73 @@ edg_wll_CondorEventSource get_condor_event_source(const char *condor_seq_num) { } } + +int edg_wll_compare_condor_seq(const char *a, const char *b) { + char timestamp_a[14], pos_a[10], src_a; + char timestamp_b[14], pos_b[10], src_b; + int ev_code_a, ev_code_b; + int res; + + res = sscanf(a,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_a, pos_a, &ev_code_a, &src_a); + + if (res != 4) { +/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", a); */ + fprintf(stderr, "unparsable sequence code %s\n", a); + return -1; + } + + res = sscanf(b,"TIMESTAMP=%14s:POS=%10s:EV.CODE=%3d:SRC=%c", timestamp_b, pos_b, &ev_code_b, &src_b); + + if (res != 4) { +/* FIXME: syslog(LOG_ERR, "unparsable sequence code %s\n", b); */ + fprintf(stderr, "unparsable sequence code %s\n", b); + return -1; + } + + /* wild card for JobReg - this event should always come as firt one */ + /* bacause it hold job.type, which is necessary for further event processing */ + if (ev_code_a == EDG_WLL_EVENT_REGJOB) return -1; + if (ev_code_b == EDG_WLL_EVENT_REGJOB) return 1; + + /* sort event w.t.r. to timestamps */ + if ((res = strcmp(timestamp_a,timestamp_b)) != 0) { + return res; + } + else { + /* if timestamps equal, sort if w.t.r. to file position */ + /* if you both events come from the same log file */ + if (src_a == src_b) { + /* zero mean in fact duplicate events in log */ + return strcmp(pos_a,pos_b); + } + /* if the events come from diffrent log files */ + /* it is possible to prioritize some src log file */ + else { + /* prioritize events from pbs_mom */ + if (src_a == 'm') return 1; + if (src_b == 'm') return -1; + + /* then prioritize events from pbs_server */ + if (src_a == 's') return 1; + if (src_b == 's') return -1; + + /* other priorities comes here... */ + } + } + + return 0; +} + + int edg_wll_compare_seq(const char *a, const char *b) { - unsigned int c[EDG_WLL_SEQ_FORMAT_NUMBER+1]; - unsigned int d[EDG_WLL_SEQ_FORMAT_NUMBER+1]; + unsigned int c[EDG_WLL_SOURCE__LAST]; + unsigned int d[EDG_WLL_SOURCE__LAST]; int res, i; char sca[EDG_WLL_SEQ_SIZE], scb[EDG_WLL_SEQ_SIZE]; - if ( (strstr(a,"TIMESTAMP=") == a) && (strstr(b,"TIMESTAMP=") == b) ) + if ( (strstr(a,"SMOM=") == a) && (strstr(b,"SMOM=") == b) ) return edg_wll_compare_pbs_seq(a,b); if (!strstr(a, "LBS")) snprintf(sca,EDG_WLL_SEQ_SIZE,"%s:LBS=000000",a);