From fb39b1a754fe8d64e49e11a714235960d4a6ec3d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Fri, 16 Sep 2005 11:18:49 +0000 Subject: [PATCH] reshuffle quite lot of code to avoid unresolved symbols in the LB JP plugin --- org.glite.lb.server/Makefile | 22 +- org.glite.lb.server/src/bkindex.c | 62 +++ org.glite.lb.server/src/get_events.h | 1 - org.glite.lb.server/src/jobstat.c | 781 ------------------------------- org.glite.lb.server/src/jobstat.h | 6 + org.glite.lb.server/src/jobstat_supp.c | 61 --- org.glite.lb.server/src/lbs_db.c | 27 -- org.glite.lb.server/src/lbs_db_supp.c | 30 ++ org.glite.lb.server/src/process_event.c | 795 ++++++++++++++++++++++++++++++++ 9 files changed, 905 insertions(+), 880 deletions(-) create mode 100644 org.glite.lb.server/src/lbs_db_supp.c create mode 100644 org.glite.lb.server/src/process_event.c diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index f040d45..197bfe0 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -108,11 +108,14 @@ EXT_LIBS:= -L${ares_prefix}/lib -lares \ SRVBONES_LIB:= -L${stagedir}/lib -lglite_lb_server_bones COMMON_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} -lglite_security_gss_${nothrflavour} PLUGIN_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}\ + ${GRIDSITE_LIBS} -lvomsc${vomsflavour} \ ${expatlib} -lexpat\ +PLUGIN_LOBJS:= lb_plugin.lo jobstat_supp.lo process_event.lo lbs_db_supp.lo + BKSERVER_BASE_OBJS:= \ - bkserverd.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o \ - seqcode.o write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \ + bkserverd.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o process_event.o \ + seqcode.o write2rgma.o lbs_db.o lbs_db_supp.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \ lb_xml_parse_V21.o \ lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \ stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ @@ -140,8 +143,8 @@ else ${EXT_LIBS} endif -INDEX_OBJS:= index.o index_parse.o jobstat_supp.o lbs_db.o openserver.o \ - jobstat.o query.o lock.o get_events.o write2rgma.o index_lex.o \ +INDEX_OBJS:= index.o index_parse.o jobstat_supp.o lbs_db.o lbs_db_supp.o openserver.o \ + jobstat.o process_event.o query.o lock.o get_events.o write2rgma.o index_lex.o \ lb_authz.o store.o bkindex.o INDEX_LIBS:= ${SRVBONES_LIB} ${COMMON_LIBS} ${EXT_LIBS} @@ -157,8 +160,8 @@ WS_CLIENT_LIBS:= -L${stagedir}/lib \ HDRS=index.h lb_authz.h lbs_db.h store.h LIB_OBJS_BK:= \ - il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o \ - seqcode.o write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \ + il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o process_event.o \ + seqcode.o write2rgma.o lbs_db.o lbs_db_supp.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \ lb_xml_parse_V21.o \ lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \ stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ @@ -171,8 +174,8 @@ glite_lb_bkserverd: ${NSMAP} ${BKSERVER_OBJS} glite_lb_bkindex: ${INDEX_OBJS} ${LINK} -o $@ ${INDEX_OBJS} ${INDEX_LIBS} -glite_lb_plugin.la: lb_plugin.lo - ${SOLINK} -o $@ lb_plugin.lo ${PLUGIN_LIBS} +glite_lb_plugin.la: ${PLUGIN_LOBJS} + ${SOLINK} -o $@ ${PLUGIN_LOBJS} ${PLUGIN_LIBS} default all: compile @@ -318,10 +321,9 @@ clean: ${AT3} $< >$@ || rm -f $@ chmod -w $@ >/dev/null -%.o: %.c +%.o %.lo: %.c ${COMPILE} -c $< - test_query_events.o: %.o: %.cpp ${CXX} -c ${CFLAGS} ${GLOBUSINC} ${TEST_INC} $< diff --git a/org.glite.lb.server/src/bkindex.c b/org.glite.lb.server/src/bkindex.c index fe8014e..2b45116 100644 --- a/org.glite.lb.server/src/bkindex.c +++ b/org.glite.lb.server/src/bkindex.c @@ -303,3 +303,65 @@ static void usage(const char *me) " [file] is applicable only without -d and -R\n", me); } + +/* + * Set values of index columns in state table (after index reconfiguration) + */ + +edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context ctx, void *job_index_cols) { + + edg_wll_Stmt sh, sh2; + int njobs, ret = -1; + intJobStat *stat; + edg_wlc_JobId jobid; + char *res[5]; + char *rest; + char *icvalues, *stmt; + int i; + + edg_wll_ResetError(ctx); + if (!job_index_cols) return 0; + + if ((njobs = edg_wll_ExecStmt(ctx, "select s.jobid,s.int_status,s.seq,s.version,j.dg_jobid" + " from states s, jobs j where s.jobid=j.jobid",&sh)) < 0) { + edg_wll_FreeStmt(&sh); + return edg_wll_Error(ctx, NULL, NULL); + } + while ((ret=edg_wll_FetchRow(sh,res)) >0) { + if (strcmp(res[3], INTSTAT_VERSION)) { + stat = NULL; + if (!edg_wlc_JobIdParse(res[4], &jobid)) { + if ((stat = malloc(sizeof(intJobStat))) != NULL) { + if (edg_wll_intJobStatus(ctx, jobid, 0, stat, 1)) { + free(stat); + stat = NULL; + } + } + edg_wlc_JobIdFree(jobid); + } + } else { + stat = dec_intJobStat(res[1], &rest); + if (rest == NULL) stat = NULL; + } + if (stat == NULL) { + edg_wll_FreeStmt(&sh); + return edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, + "cannot decode int_status from states DB table"); + } + + edg_wll_IColumnsSQLPart(ctx, job_index_cols, stat, 0, NULL, &icvalues); + trio_asprintf(&stmt, "update states set seq=%s%s where jobid='%|Ss'", res[2], icvalues, res[0]); + ret = edg_wll_ExecStmt(ctx, stmt, &sh2); + edg_wll_FreeStmt(&sh2); + + for (i = 0; i < 5; i++) free(res[i]); + destroy_intJobStat(stat); free(stat); + free(stmt); free(icvalues); + + if (ret < 0) return edg_wll_Error(ctx, NULL, NULL); + + } + edg_wll_FreeStmt(&sh); + return edg_wll_Error(ctx, NULL, NULL); +} + diff --git a/org.glite.lb.server/src/get_events.h b/org.glite.lb.server/src/get_events.h index cfa07c1..eee4e89 100644 --- a/org.glite.lb.server/src/get_events.h +++ b/org.glite.lb.server/src/get_events.h @@ -24,7 +24,6 @@ int edg_wll_QueryEventsServer(edg_wll_Context,int,const edg_wll_QueryRec **,cons int edg_wll_QueryJobsServer(edg_wll_Context, const edg_wll_QueryRec **, int, edg_wlc_JobId **, edg_wll_JobStat **); void edg_wll_SortEvents(edg_wll_Event *); -int edg_wll_compare_seq(const char *, const char *); #ifdef __cplusplus } diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index eaf29b4..9846fe6 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -32,19 +32,10 @@ #endif -#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } #define mov(a,b) { free(a); a = b; b = NULL; } static void warn (const char* format, ...) UNUSED_VAR ; static char *job_owner(edg_wll_Context,char *); -static char* location_string(const char*, const char*, const char*); -static int add_stringlist(char ***, const char *) UNUSED_VAR; -static void free_stringlist(char ***); -static int add_taglist(edg_wll_TagValue **, const char *, const char *); -static void update_branch_state(char *, char *, char *, char *, branch_state **); -static void free_branch_state(branch_state **); -static void load_branch_state(intJobStat *); - int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int); edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int); @@ -67,20 +58,6 @@ static void init_intJobStat(intJobStat *p) /* TBD: generate */ } -static void destroy_intJobStat_extension(intJobStat *p) -{ - free(p->last_seqcode); p->last_seqcode = NULL; - free(p->last_cancel_seqcode); p->last_cancel_seqcode = NULL; - p->resubmit_type = EDG_WLL_RESUBMISSION_UNDEFINED; -} - -void destroy_intJobStat(intJobStat *p) -{ - edg_wll_FreeStatus(&p->pub); - destroy_intJobStat_extension(p); - memset(p, 0, sizeof(intJobStat)); -} - #if 0 static int eval_expect_update(intJobStat *, int *, char **); #endif @@ -391,585 +368,6 @@ int edg_wll_intJobStatus( } -static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUSED_VAR) -{ - char *str; - - str = edg_wll_EventToString(e->any.type); - fprintf(stderr, "edg_wll_JobStatus: bad event: type %d (%s)\n", - e->any.type, (str == NULL) ? "unknown" : str); - free(str); - return RET_FATAL; -} - -#define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict)) -#define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict)) -#define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH) -#define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE) - - -int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) -{ - - edg_wll_JobStatCode old_state = js->pub.state; - edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN; - int res = RET_OK, - fine_res = RET_OK; - - - - if (old_state == EDG_WLL_JOB_ABORTED || - old_state == EDG_WLL_JOB_CANCELLED || - old_state == EDG_WLL_JOB_CLEARED) { - res = RET_LATE; - } - - - if (js->deep_resubmit_seqcode && - before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) { - res = RET_LATE; - fine_res = RET_TOOOLD; - } - else if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived - if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) { - if ((js->last_seqcode != NULL) && - edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) { - res = RET_LATE; - } - fine_res = RET_GOODBRANCH; - } - else { - res = RET_LATE; - fine_res = RET_BADBRANCH; - } - } - else if ((js->last_seqcode != NULL) && - edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) { - res = RET_LATE; - } - - - switch (e->any.type) { - case EDG_WLL_EVENT_TRANSFER: - if (e->transfer.result == EDG_WLL_TRANSFER_OK) { - switch (e->transfer.source) { - case EDG_WLL_SOURCE_USER_INTERFACE: - new_state = EDG_WLL_JOB_WAITING; break; - case EDG_WLL_SOURCE_JOB_SUBMISSION: - /* if (LRMS_STATE(old_state)) res = RET_LATE; */ - new_state = EDG_WLL_JOB_READY; break; - case EDG_WLL_SOURCE_LOG_MONITOR: - if (LRMS_STATE(old_state)) { - js->pub.stateEnterTimes[1 + EDG_WLL_JOB_SCHEDULED] = - e->any.timestamp.tv_sec; - res = RET_LATE; - } - new_state = EDG_WLL_JOB_SCHEDULED; break; - default: - goto bad_event; break; - } - } else if (e->transfer.result == EDG_WLL_TRANSFER_FAIL) { - /* transfer failed */ - switch (e->transfer.source) { - case EDG_WLL_SOURCE_USER_INTERFACE: - new_state = EDG_WLL_JOB_SUBMITTED; break; - case EDG_WLL_SOURCE_JOB_SUBMISSION: - if (LRMS_STATE(old_state)) res = RET_LATE; - new_state = EDG_WLL_JOB_READY; break; - case EDG_WLL_SOURCE_LOG_MONITOR: - if (LRMS_STATE(old_state)) res = RET_LATE; - new_state = EDG_WLL_JOB_READY; break; - default: - goto bad_event; break; - } - } else { - /* e->transfer.result == EDG_WLL_TRANSFER_START */ - res = RET_IGNORE; - } - if (USABLE(res, strict)) { - js->pub.state = new_state; - rep(js->pub.reason, e->transfer.reason); - - free(js->pub.location); - if (e->transfer.result == EDG_WLL_TRANSFER_OK) { - js->pub.location = location_string( - edg_wll_SourceToString(e->transfer.destination), - e->transfer.dest_host, - e->transfer.dest_instance); - } else { - js->pub.location = location_string( - edg_wll_SourceToString(e->transfer.source), - e->transfer.host, - e->transfer.src_instance); - } - } - if (USABLE_DATA(res, strict)) { - switch (e->transfer.source) { - case EDG_WLL_SOURCE_USER_INTERFACE: - rep(js->pub.jdl, e->transfer.job); break; - case EDG_WLL_SOURCE_JOB_SUBMISSION: - rep(js->pub.condor_jdl, e->transfer.job); break; - case EDG_WLL_SOURCE_LOG_MONITOR: - rep(js->pub.rsl, e->transfer.job); break; - default: - goto bad_event; break; - - } - } - break; - case EDG_WLL_EVENT_ACCEPTED: - switch (e->accepted.source) { - case EDG_WLL_SOURCE_NETWORK_SERVER: - new_state = EDG_WLL_JOB_WAITING; break; - case EDG_WLL_SOURCE_LOG_MONITOR: - if (LRMS_STATE(old_state)) res = RET_LATE; - new_state = EDG_WLL_JOB_READY; break; - case EDG_WLL_SOURCE_LRMS: - new_state = EDG_WLL_JOB_SCHEDULED; break; - default: - goto bad_event; break; - } - if (USABLE(res, strict)) { - js->pub.state = new_state; - free(js->pub.location); - js->pub.location = location_string( - edg_wll_SourceToString(e->accepted.source), - e->accepted.host, - e->accepted.src_instance); - } - if (USABLE_DATA(res, strict)) { - switch (e->accepted.source) { - case EDG_WLL_SOURCE_NETWORK_SERVER: - break; /* no WM id */ - case EDG_WLL_SOURCE_LOG_MONITOR: - rep(js->pub.condorId, e->accepted.local_jobid); break; - case EDG_WLL_SOURCE_LRMS: - /* XXX localId */ - rep(js->pub.globusId, e->accepted.local_jobid); break; - default: - goto bad_event; break; - } - } - break; - case EDG_WLL_EVENT_REFUSED: - switch (e->refused.source) { - case EDG_WLL_SOURCE_NETWORK_SERVER: - new_state = EDG_WLL_JOB_SUBMITTED; break; - case EDG_WLL_SOURCE_LOG_MONITOR: - new_state = EDG_WLL_JOB_READY; break; - case EDG_WLL_SOURCE_LRMS: - new_state = EDG_WLL_JOB_READY; break; - default: - goto bad_event; break; - } - if (USABLE(res, strict)) { - js->pub.state = new_state; - rep(js->pub.reason, e->refused.reason); - - free(js->pub.location); - js->pub.location = location_string( - edg_wll_SourceToString(e->refused.from), - e->refused.from_host, - e->refused.from_instance); - } - break; - case EDG_WLL_EVENT_ENQUEUED: - if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) { - switch (e->enQueued.source) { - case EDG_WLL_SOURCE_NETWORK_SERVER: - new_state = EDG_WLL_JOB_WAITING; break; - case EDG_WLL_SOURCE_WORKLOAD_MANAGER: - if (LRMS_STATE(old_state)) res = RET_LATE; - update_branch_state(e->any.seqcode, NULL, - NULL, e->enQueued.job, &js->branch_states); - new_state = EDG_WLL_JOB_READY; break; - case EDG_WLL_SOURCE_LOG_MONITOR: - new_state = EDG_WLL_JOB_WAITING; break; - default: - goto bad_event; break; - } - } else if (e->enQueued.result == EDG_WLL_ENQUEUED_FAIL) { - switch (e->enQueued.source) { - case EDG_WLL_SOURCE_NETWORK_SERVER: - new_state = EDG_WLL_JOB_WAITING; break; - case EDG_WLL_SOURCE_WORKLOAD_MANAGER: - new_state = EDG_WLL_JOB_WAITING; break; - case EDG_WLL_SOURCE_LOG_MONITOR: - new_state = old_state; break; - default: - goto bad_event; break; - } - } else { - /* e->enQueued.result == EDG_WLL_ENQUEUED_START */ - res = RET_IGNORE; - } - if (USABLE(res, strict)) { - js->pub.state = new_state; - rep(js->pub.reason, e->enQueued.reason); - - free(js->pub.location); - if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) { - js->pub.location = location_string( - e->enQueued.queue, - e->enQueued.host, - e->enQueued.src_instance); - if (e->enQueued.source == EDG_WLL_SOURCE_LOG_MONITOR) - js->pub.resubmitted = 1; - } else { - js->pub.location = location_string( - edg_wll_SourceToString(e->enQueued.source), - e->enQueued.host, - e->enQueued.src_instance); - } - } - if (USABLE_DATA(res, strict)) { - switch (e->enQueued.source) { - case EDG_WLL_SOURCE_NETWORK_SERVER: - rep(js->pub.jdl, e->enQueued.job); break; - case EDG_WLL_SOURCE_WORKLOAD_MANAGER: - if (USABLE_BRANCH(res)) { - rep(js->pub.matched_jdl, e->enQueued.job); - } - break; - case EDG_WLL_SOURCE_LOG_MONITOR: - /* no interim JDL here */ - break; - default: - goto bad_event; break; - } - } - break; - case EDG_WLL_EVENT_DEQUEUED: - switch (e->deQueued.source) { - case EDG_WLL_SOURCE_WORKLOAD_MANAGER: - new_state = EDG_WLL_JOB_WAITING; break; - case EDG_WLL_SOURCE_JOB_SUBMISSION: - if (LRMS_STATE(old_state)) res = RET_LATE; - new_state = EDG_WLL_JOB_READY; break; - default: - goto bad_event; break; - } - if (USABLE(res, strict)) { - js->pub.state = new_state; - free(js->pub.location); - js->pub.location = location_string( - edg_wll_SourceToString(e->deQueued.source), - e->deQueued.host, - e->deQueued.src_instance); - } - if (USABLE_DATA(res, strict)) { - /* no WM/JSS local jobid */ - } - break; - case EDG_WLL_EVENT_HELPERCALL: - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_WAITING; - free(js->pub.location); - js->pub.location = location_string( - e->helperCall.helper_name, - e->helperCall.host, - e->helperCall.src_instance); - /* roles and params used only for debugging */ - } - break; - case EDG_WLL_EVENT_HELPERRETURN: - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_WAITING; - free(js->pub.location); - js->pub.location = location_string( - edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), - e->helperReturn.host, - e->helperReturn.src_instance); - /* roles and retvals used only for debugging */ - } - break; - case EDG_WLL_EVENT_RUNNING: - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_RUNNING; - free(js->pub.location); - js->pub.location = location_string( - edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS), - "worknode", - e->running.node); - } - if (USABLE_DATA(res, strict)) { - if (USABLE_BRANCH(fine_res)) { - rep(js->pub.ce_node, e->running.node); - } - if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { - if (e->running.node) { - update_branch_state(e->any.seqcode, NULL, - e->running.node, NULL, &js->branch_states); - add_stringlist(&js->pub.possible_ce_nodes, - e->running.node); - } - } - } - break; - case EDG_WLL_EVENT_REALLYRUNNING: - if (USABLE_DATA(res, strict)) { - js->pub.state = EDG_WLL_JOB_RUNNING; - free(js->pub.location); - js->pub.location = location_string( - edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS), - "worknode", - e->running.node); - js->pub.payload_running = 1; - if (e->any.source == EDG_WLL_SOURCE_LRMS) { - rep(js->branch_tag_seqcode, e->any.seqcode); - rep(js->last_branch_seqcode, e->any.seqcode); - } - if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { - rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); - rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); - } - - load_branch_state(js); - } - break; - case EDG_WLL_EVENT_RESUBMISSION: - if (USABLE(res, strict)) { - if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { - rep(js->pub.reason, e->resubmission.reason); - } - } - if (USABLE_DATA(res, strict)) { - if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { - js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB; - } - else - if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB && - e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) { - js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB; - free_stringlist(&js->pub.possible_destinations); - free_stringlist(&js->pub.possible_ce_nodes); - free_branch_state(&js->branch_states); - js->pub.payload_running = 0; - rep(js->branch_tag_seqcode, NULL); - rep(js->deep_resubmit_seqcode, e->any.seqcode); - } - else - if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) { - js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW; - rep(js->deep_resubmit_seqcode, NULL); - } - } - break; - case EDG_WLL_EVENT_DONE: - if (e->any.source == EDG_WLL_SOURCE_LRMS) { - /* Done from JobWrapper is not sufficient for transition - * to DONE state according its current definition */ - break; - } - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_DONE; - rep(js->pub.reason, e->done.reason); - if (fine_res == RET_GOODBRANCH) { - js->pub.payload_running = 0; - } - switch (e->done.status_code) { - case EDG_WLL_DONE_CANCELLED: - js->pub.state = EDG_WLL_JOB_CANCELLED; - case EDG_WLL_DONE_OK: - rep(js->pub.location, "none"); break; - default: - free(js->pub.location); - js->pub.location = location_string( - edg_wll_SourceToString(e->done.source), - e->done.host, - e->done.src_instance); - } - } - if (USABLE_DATA(res, strict)) { - switch (e->done.status_code) { - case EDG_WLL_DONE_OK: - js->pub.exit_code = e->done.exit_code; - js->pub.done_code = EDG_WLL_STAT_OK; break; - case EDG_WLL_DONE_CANCELLED: - js->pub.exit_code = 0; - js->pub.done_code = EDG_WLL_STAT_CANCELLED; break; - case EDG_WLL_DONE_FAILED: - js->pub.exit_code = 0; - js->pub.done_code = EDG_WLL_STAT_FAILED; break; - default: - goto bad_event; break; - } - } - break; - case EDG_WLL_EVENT_CANCEL: - if (fine_res != RET_BADBRANCH) { - if (js->last_cancel_seqcode != NULL && - edg_wll_compare_seq(e->any.seqcode, js->last_cancel_seqcode) < 0) { - res = RET_LATE; - } - } - else { - res = RET_LATE; - } - if (USABLE(res, strict)) { - switch (e->cancel.status_code) { - case EDG_WLL_CANCEL_REQ: - js->pub.cancelling = 1; break; - case EDG_WLL_CANCEL_DONE: - js->pub.state = EDG_WLL_JOB_CANCELLED; - rep(js->pub.reason, e->cancel.reason); - rep(js->last_seqcode, e->any.seqcode); - rep(js->pub.location, "none"); - /* fall though */ - case EDG_WLL_CANCEL_ABORT: - js->pub.cancelling = 0; break; - default: - /* do nothing */ - break; - - } - } - if (USABLE_DATA(res, strict)) { - rep(js->pub.cancelReason, e->cancel.reason); - } - break; - case EDG_WLL_EVENT_ABORT: - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_ABORTED; - rep(js->pub.reason, e->abort.reason); - rep(js->pub.location, "none"); - js->pub.payload_running = 0; - } - break; - - case EDG_WLL_EVENT_CLEAR: - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_CLEARED; - rep(js->pub.location, "none"); - switch (e->clear.reason) { - case EDG_WLL_CLEAR_USER: - rep(js->pub.reason, "user retrieved output sandbox"); - break; - case EDG_WLL_CLEAR_TIMEOUT: - rep(js->pub.reason, "timed out, resource purge forced"); - break; - case EDG_WLL_CLEAR_NOOUTPUT: - rep(js->pub.reason, "no output was generated"); - break; - default: - goto bad_event; break; - - } - } - break; - case EDG_WLL_EVENT_PURGE: - /* ignore, meta-information only */ - break; - case EDG_WLL_EVENT_MATCH: - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_WAITING; - js->pub.location = location_string( - edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), - e->match.host, - e->match.src_instance); - } - if (USABLE_DATA(res, strict)) { - if (USABLE_BRANCH(fine_res)) { - rep(js->pub.destination, e->match.dest_id); - } - if (e->match.dest_id) { - update_branch_state(e->any.seqcode, e->match.dest_id, - NULL, NULL, &js->branch_states); - add_stringlist(&js->pub.possible_destinations, - e->match.dest_id); - } - } - break; - case EDG_WLL_EVENT_PENDING: - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_WAITING; - rep(js->pub.reason, e->pending.reason); - js->pub.location = location_string( - edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), - e->match.host, - e->match.src_instance); - } - break; - case EDG_WLL_EVENT_REGJOB: - if (USABLE(res, strict)) { - js->pub.state = EDG_WLL_JOB_SUBMITTED; - } - if (USABLE_DATA(res, strict)) { - rep(js->pub.jdl, e->regJob.jdl); - edg_wlc_JobIdFree(js->pub.parent_job); - edg_wlc_JobIdDup(e->regJob.parent, - &js->pub.parent_job); - rep(js->pub.network_server, e->regJob.ns); - js->pub.children_num = e->regJob.nsubjobs; - if (e->regJob.jobtype == EDG_WLL_REGJOB_DAG - || e->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED) { - js->pub.jobtype = EDG_WLL_STAT_DAG; - } - rep(js->pub.seed, e->regJob.seed); - } - break; - case EDG_WLL_EVENT_USERTAG: - if (USABLE_DATA(res, strict)) { - if (e->userTag.name != NULL && e->userTag.value != NULL) { - add_taglist(&js->pub.user_tags, - e->userTag.name, e->userTag.value); - } else { - goto bad_event; - } - } - break; - case EDG_WLL_EVENT_LISTENER: - /* ignore, listener port is not part of job status */ - break; - case EDG_WLL_EVENT_CURDESCR: - case EDG_WLL_EVENT_CHKPT: - case EDG_WLL_EVENT_CHANGEACL: - /* ignore, only for event log */ - break; - - default: - goto bad_event; - break; - } - - if (USABLE(res,strict)) { - js->pub.lastUpdateTime = e->any.timestamp; - if (old_state != js->pub.state) { - js->pub.stateEnterTime = js->pub.lastUpdateTime; - js->pub.stateEnterTimes[1 + js->pub.state] - = (int)js->pub.lastUpdateTime.tv_sec; - } - } - - if (USABLE_DATA(res,strict)) { - if (e->any.source == EDG_WLL_SOURCE_NETWORK_SERVER && - js->pub.network_server == NULL) { - char *inst; - inst = e->any.src_instance; - asprintf(&js->pub.network_server, "%s%s%s", - e->any.host, - inst != NULL ? ":" : " ", - inst != NULL ? inst : ""); - } - } - - if (e->any.type == EDG_WLL_EVENT_CANCEL) { - rep(js->last_cancel_seqcode, e->any.seqcode); - } else { - rep(js->last_seqcode, e->any.seqcode); - } - - if (fine_res == RET_GOODBRANCH) { - rep(js->last_branch_seqcode, e->any.seqcode); - } - - return res; - -bad_event: - badEvent(js,e,ev_seq); - return RET_SUSPECT; -} - /* * Helper for warning printouts */ @@ -1067,185 +465,6 @@ static int eval_expect_update(intJobStat *js, int* went_through, char **expect_f } #endif -static char* location_string(const char *source, const char *host, const char *instance) -{ - char *ret; - asprintf(&ret, "%s/%s/%s", source, host, instance); - return ret; -} - -static int add_stringlist(char ***lptr, const char *new_item) -{ - char **itptr; - int i; - - if (*lptr == NULL) { - itptr = (char **) malloc(2*sizeof(char *)); - itptr[0] = strdup(new_item); - itptr[1] = NULL; - *lptr = itptr; - return 1; - } else { - for (i = 0, itptr = *lptr; itptr[i] != NULL; i++); - itptr = (char **) realloc(*lptr, (i+2)*sizeof(char *)); - if (itptr != NULL) { - itptr[i] = strdup(new_item); - itptr[i+1] = NULL; - *lptr = itptr; - return 1; - } else { - return 0; - } - } -} - -static void free_stringlist(char ***lptr) -{ - char **itptr; - int i; - - if (*lptr) { - for (i = 0, itptr = *lptr; itptr[i] != NULL; i++) - free(itptr[i]); - free(itptr); - *lptr = NULL; - } -} - -static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char *new_item2) -{ - edg_wll_TagValue *itptr; - int i; - - if (*lptr == NULL) { - itptr = (edg_wll_TagValue *) calloc(2,sizeof(edg_wll_TagValue)); - itptr[0].tag = strdup(new_item); - itptr[0].value = strdup(new_item2); - *lptr = itptr; - return 1; - } else { - for (i = 0, itptr = *lptr; itptr[i].tag != NULL; i++) - if ( !strcasecmp(itptr[i].tag, new_item) ) - { - free(itptr[i].value); - itptr[i].value = strdup(new_item2); - return 1; - } - itptr = (edg_wll_TagValue *) realloc(*lptr, (i+2)*sizeof(edg_wll_TagValue)); - if (itptr != NULL) { - itptr[i].tag = strdup(new_item); - itptr[i].value = strdup(new_item2); - itptr[i+1].tag = NULL; - itptr[i+1].value = NULL; - *lptr = itptr; - return 1; - } else { - return 0; - } - } -} - -static void update_branch_state(char *b, char *d, char *c, char *j, branch_state **bs) -{ - int i = 0, branch; - - - if (!b) - return; - else - branch = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER); - - if (*bs != NULL) { - while ((*bs)[i].branch) { - if (branch == (*bs)[i].branch) { - if (d) rep((*bs)[i].destination, d); - if (c) rep((*bs)[i].ce_node, c); - if (j) rep((*bs)[i].jdl, j); - - return; - } - i++; - } - } - - *bs = (branch_state *) realloc(*bs, (i+2)*sizeof(branch_state)); - memset(&((*bs)[i]), 0, 2*sizeof(branch_state)); - - (*bs)[i].branch = branch; - rep((*bs)[i].destination, d); - rep((*bs)[i].ce_node, c); - rep((*bs)[i].jdl, j); -} - - -static void free_branch_state(branch_state **bs) -{ - int i = 0; - - if (*bs == NULL) return; - - while ((*bs)[i].branch) { - free((*bs)[i].destination); - free((*bs)[i].ce_node); - free((*bs)[i].jdl); - i++; - } - free(*bs); - *bs = NULL; -} - -static int compare_branch_states(const void *a, const void *b) -{ - branch_state *c = (branch_state *) a; - branch_state *d = (branch_state *) b; - - if (c->branch < d->branch) return -1; - if (c->branch == d->branch) return 0; - if (c->branch > d->branch) return 1; -} - -static void load_branch_state(intJobStat *js) -{ - int i, j, branch; - - - if ( (!js->branch_tag_seqcode) || (!js->branch_states) ) return; - - branch = component_seqcode(js->branch_tag_seqcode, EDG_WLL_SOURCE_WORKLOAD_MANAGER); - - // count elements - i = 0; - while (js->branch_states[i].branch) i++; - - // sort them - qsort(js->branch_states, (size_t) i, sizeof(branch_state), - compare_branch_states); - - // find row corresponding to ReallyRunning WM seq.code (aka branch) - i = 0; - while (js->branch_states[i].branch) { - if (js->branch_states[i].branch == branch) break; - i++; - } - - // copy this and two before branches data to final state - // (each field - dest,ce,jdl - comes from different event) - // (and these events have most likely different WM seq.codes) - // (even belonging into one logical branch) - // (the newer the more important - so i-th element is copied as last) - // (and may overwrite data from previous elements) - for (j = i - 2; j <= i; j++) { - if (j >= 0) { - if (js->branch_states[j].destination) - rep(js->pub.destination, js->branch_states[j].destination); - if (js->branch_states[j].ce_node) - rep(js->pub.ce_node, js->branch_states[j].ce_node); - if (js->branch_states[j].jdl) - rep(js->pub.matched_jdl, js->branch_states[j].jdl); - } - } -} - /* XXX more thorough malloc, calloc, and asprintf failure handling */ /* XXX indexes in {short,long}_fields */ /* XXX strict mode */ diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index d8a7a41..7460523 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -49,6 +49,8 @@ typedef struct _intJobStat { } intJobStat; void destroy_intJobStat(intJobStat *); +void destroy_intJobStat_extension(intJobStat *p); + edg_wll_ErrorCode edg_wll_IColumnsSQLPart(edg_wll_Context, void *, intJobStat *, int , char **, char **); edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context, void *); @@ -63,3 +65,7 @@ int before_deep_resubmission(const char *, const char *); int same_branch(const char *, const char *); int component_seqcode(const char *a, edg_wll_Source index); int processEvent(intJobStat *, edg_wll_Event *, int, int, char **); + +int add_stringlist(char ***, const char *); +int edg_wll_compare_seq(const char *, const char *); + diff --git a/org.glite.lb.server/src/jobstat_supp.c b/org.glite.lb.server/src/jobstat_supp.c index e3bbe79..84a9cc3 100644 --- a/org.glite.lb.server/src/jobstat_supp.c +++ b/org.glite.lb.server/src/jobstat_supp.c @@ -635,67 +635,6 @@ edg_wll_ErrorCode edg_wll_IColumnsSQLPart(edg_wll_Context ctx, } -/* - * Set values of index columns in state table (after index reconfiguration) - */ - -edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context ctx, void *job_index_cols) { - - edg_wll_Stmt sh, sh2; - int njobs, ret = -1; - intJobStat *stat; - edg_wlc_JobId jobid; - char *res[5]; - char *rest; - char *icvalues, *stmt; - int i; - - edg_wll_ResetError(ctx); - if (!job_index_cols) return 0; - - if ((njobs = edg_wll_ExecStmt(ctx, "select s.jobid,s.int_status,s.seq,s.version,j.dg_jobid" - " from states s, jobs j where s.jobid=j.jobid",&sh)) < 0) { - edg_wll_FreeStmt(&sh); - return edg_wll_Error(ctx, NULL, NULL); - } - while ((ret=edg_wll_FetchRow(sh,res)) >0) { - if (strcmp(res[3], INTSTAT_VERSION)) { - stat = NULL; - if (!edg_wlc_JobIdParse(res[4], &jobid)) { - if ((stat = malloc(sizeof(intJobStat))) != NULL) { - if (edg_wll_intJobStatus(ctx, jobid, 0, stat, 1)) { - free(stat); - stat = NULL; - } - } - edg_wlc_JobIdFree(jobid); - } - } else { - stat = dec_intJobStat(res[1], &rest); - if (rest == NULL) stat = NULL; - } - if (stat == NULL) { - edg_wll_FreeStmt(&sh); - return edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, - "cannot decode int_status from states DB table"); - } - - edg_wll_IColumnsSQLPart(ctx, job_index_cols, stat, 0, NULL, &icvalues); - trio_asprintf(&stmt, "update states set seq=%s%s where jobid='%|Ss'", res[2], icvalues, res[0]); - ret = edg_wll_ExecStmt(ctx, stmt, &sh2); - edg_wll_FreeStmt(&sh2); - - for (i = 0; i < 5; i++) free(res[i]); - destroy_intJobStat(stat); free(stat); - free(stmt); free(icvalues); - - if (ret < 0) return edg_wll_Error(ctx, NULL, NULL); - - } - edg_wll_FreeStmt(&sh); - return edg_wll_Error(ctx, NULL, NULL); -} - int component_seqcode(const char *a, edg_wll_Source index) { unsigned int c[EDG_WLL_SOURCE__LAST]; diff --git a/org.glite.lb.server/src/lbs_db.c b/org.glite.lb.server/src/lbs_db.c index a54f133..515d596 100644 --- a/org.glite.lb.server/src/lbs_db.c +++ b/org.glite.lb.server/src/lbs_db.c @@ -175,33 +175,6 @@ void edg_wll_FreeStmt(edg_wll_Stmt *stmt) } } - -char *edg_wll_TimeToDB(time_t t) -{ - struct tm *tm = gmtime(&t); - char tbuf[256]; - - sprintf(tbuf,"'%4d-%02d-%02d %02d:%02d:%02d'",tm->tm_year+1900,tm->tm_mon+1, - tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec); - - return strdup(tbuf); -} - -time_t edg_wll_DBToTime(char *t) -{ - struct tm tm; - - memset(&tm,0,sizeof(tm)); - setenv("TZ","UTC",1); tzset(); - sscanf(t,"%4d-%02d-%02d %02d:%02d:%02d", - &tm.tm_year,&tm.tm_mon,&tm.tm_mday, - &tm.tm_hour,&tm.tm_min,&tm.tm_sec); - tm.tm_year -= 1900; - tm.tm_mon--; - - return mktime(&tm); -} - int edg_wll_DBCheckVersion(edg_wll_Context ctx) { MYSQL *m = (MYSQL *) ctx->mysql; diff --git a/org.glite.lb.server/src/lbs_db_supp.c b/org.glite.lb.server/src/lbs_db_supp.c new file mode 100644 index 0000000..6aacf39 --- /dev/null +++ b/org.glite.lb.server/src/lbs_db_supp.c @@ -0,0 +1,30 @@ +#include +#include +#include +#include + +char *edg_wll_TimeToDB(time_t t) +{ + struct tm *tm = gmtime(&t); + char tbuf[256]; + + sprintf(tbuf,"'%4d-%02d-%02d %02d:%02d:%02d'",tm->tm_year+1900,tm->tm_mon+1, + tm->tm_mday,tm->tm_hour,tm->tm_min,tm->tm_sec); + + return strdup(tbuf); +} + +time_t edg_wll_DBToTime(char *t) +{ + struct tm tm; + + memset(&tm,0,sizeof(tm)); + setenv("TZ","UTC",1); tzset(); + sscanf(t,"%4d-%02d-%02d %02d:%02d:%02d", + &tm.tm_year,&tm.tm_mon,&tm.tm_mday, + &tm.tm_hour,&tm.tm_min,&tm.tm_sec); + tm.tm_year -= 1900; + tm.tm_mon--; + + return mktime(&tm); +} diff --git a/org.glite.lb.server/src/process_event.c b/org.glite.lb.server/src/process_event.c new file mode 100644 index 0000000..ab13ed4 --- /dev/null +++ b/org.glite.lb.server/src/process_event.c @@ -0,0 +1,795 @@ +#include +#include +#include +#include +#include + +#include "glite/lb/producer.h" +#include "glite/lb/context-int.h" + +#include "jobstat.h" + +/* TBD: share in whole logging or workload */ +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } + +static void free_stringlist(char ***lptr) +{ + char **itptr; + int i; + + if (*lptr) { + for (i = 0, itptr = *lptr; itptr[i] != NULL; i++) + free(itptr[i]); + free(itptr); + *lptr = NULL; + } +} + +static int add_taglist(edg_wll_TagValue **lptr, const char *new_item, const char *new_item2) +{ + edg_wll_TagValue *itptr; + int i; + + if (*lptr == NULL) { + itptr = (edg_wll_TagValue *) calloc(2,sizeof(edg_wll_TagValue)); + itptr[0].tag = strdup(new_item); + itptr[0].value = strdup(new_item2); + *lptr = itptr; + return 1; + } else { + for (i = 0, itptr = *lptr; itptr[i].tag != NULL; i++) + if ( !strcasecmp(itptr[i].tag, new_item) ) + { + free(itptr[i].value); + itptr[i].value = strdup(new_item2); + return 1; + } + itptr = (edg_wll_TagValue *) realloc(*lptr, (i+2)*sizeof(edg_wll_TagValue)); + if (itptr != NULL) { + itptr[i].tag = strdup(new_item); + itptr[i].value = strdup(new_item2); + itptr[i+1].tag = NULL; + itptr[i+1].value = NULL; + *lptr = itptr; + return 1; + } else { + return 0; + } + } +} + + +static void update_branch_state(char *b, char *d, char *c, char *j, branch_state **bs) +{ + int i = 0, branch; + + + if (!b) + return; + else + branch = component_seqcode(b, EDG_WLL_SOURCE_WORKLOAD_MANAGER); + + if (*bs != NULL) { + while ((*bs)[i].branch) { + if (branch == (*bs)[i].branch) { + if (d) rep((*bs)[i].destination, d); + if (c) rep((*bs)[i].ce_node, c); + if (j) rep((*bs)[i].jdl, j); + + return; + } + i++; + } + } + + *bs = (branch_state *) realloc(*bs, (i+2)*sizeof(branch_state)); + memset(&((*bs)[i]), 0, 2*sizeof(branch_state)); + + (*bs)[i].branch = branch; + rep((*bs)[i].destination, d); + rep((*bs)[i].ce_node, c); + rep((*bs)[i].jdl, j); +} + + +static void free_branch_state(branch_state **bs) +{ + int i = 0; + + if (*bs == NULL) return; + + while ((*bs)[i].branch) { + free((*bs)[i].destination); + free((*bs)[i].ce_node); + free((*bs)[i].jdl); + i++; + } + free(*bs); + *bs = NULL; +} + +static int compare_branch_states(const void *a, const void *b) +{ + branch_state *c = (branch_state *) a; + branch_state *d = (branch_state *) b; + + if (c->branch < d->branch) return -1; + if (c->branch == d->branch) return 0; + /* avoid warning: if (c->branch > d->branch) */ return 1; +} + +static void load_branch_state(intJobStat *js) +{ + int i, j, branch; + + + if ( (!js->branch_tag_seqcode) || (!js->branch_states) ) return; + + branch = component_seqcode(js->branch_tag_seqcode, EDG_WLL_SOURCE_WORKLOAD_MANAGER); + + // count elements + i = 0; + while (js->branch_states[i].branch) i++; + + // sort them + qsort(js->branch_states, (size_t) i, sizeof(branch_state), + compare_branch_states); + + // find row corresponding to ReallyRunning WM seq.code (aka branch) + i = 0; + while (js->branch_states[i].branch) { + if (js->branch_states[i].branch == branch) break; + i++; + } + + // copy this and two before branches data to final state + // (each field - dest,ce,jdl - comes from different event) + // (and these events have most likely different WM seq.codes) + // (even belonging into one logical branch) + // (the newer the more important - so i-th element is copied as last) + // (and may overwrite data from previous elements) + for (j = i - 2; j <= i; j++) { + if (j >= 0) { + if (js->branch_states[j].destination) + rep(js->pub.destination, js->branch_states[j].destination); + if (js->branch_states[j].ce_node) + rep(js->pub.ce_node, js->branch_states[j].ce_node); + if (js->branch_states[j].jdl) + rep(js->pub.matched_jdl, js->branch_states[j].jdl); + } + } +} + + +static char* location_string(const char *source, const char *host, const char *instance) +{ + char *ret; + asprintf(&ret, "%s/%s/%s", source, host, instance); + return ret; +} + + +static int badEvent(intJobStat *js UNUSED_VAR, edg_wll_Event *e, int ev_seq UNUSED_VAR) +{ + char *str; + + str = edg_wll_EventToString(e->any.type); + fprintf(stderr, "edg_wll_JobStatus: bad event: type %d (%s)\n", + e->any.type, (str == NULL) ? "unknown" : str); + free(str); + return RET_FATAL; +} + +#define USABLE(res,strict) ((res) == RET_OK || ( (res) == RET_SOON && !strict)) +#define USABLE_DATA(res,strict) ((res) == RET_OK || ( (res) != RET_FATAL && !strict)) +#define USABLE_BRANCH(fine_res) ((fine_res) != RET_TOOOLD && (fine_res) != RET_BADBRANCH) +#define LRMS_STATE(state) ((state) == EDG_WLL_JOB_RUNNING || (state) == EDG_WLL_JOB_DONE) + + +int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) +{ + + edg_wll_JobStatCode old_state = js->pub.state; + edg_wll_JobStatCode new_state = EDG_WLL_JOB_UNKNOWN; + int res = RET_OK, + fine_res = RET_OK; + + + + if (old_state == EDG_WLL_JOB_ABORTED || + old_state == EDG_WLL_JOB_CANCELLED || + old_state == EDG_WLL_JOB_CLEARED) { + res = RET_LATE; + } + + + if (js->deep_resubmit_seqcode && + before_deep_resubmission(e->any.seqcode, js->deep_resubmit_seqcode)) { + res = RET_LATE; + fine_res = RET_TOOOLD; + } + else if (js->branch_tag_seqcode) { // ReallyRunning ev. arrived + if (same_branch(e->any.seqcode, js->branch_tag_seqcode)) { + if ((js->last_seqcode != NULL) && + edg_wll_compare_seq(e->any.seqcode, js->last_branch_seqcode) < 0) { + res = RET_LATE; + } + fine_res = RET_GOODBRANCH; + } + else { + res = RET_LATE; + fine_res = RET_BADBRANCH; + } + } + else if ((js->last_seqcode != NULL) && + edg_wll_compare_seq(e->any.seqcode, js->last_seqcode) < 0) { + res = RET_LATE; + } + + + switch (e->any.type) { + case EDG_WLL_EVENT_TRANSFER: + if (e->transfer.result == EDG_WLL_TRANSFER_OK) { + switch (e->transfer.source) { + case EDG_WLL_SOURCE_USER_INTERFACE: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_JOB_SUBMISSION: + /* if (LRMS_STATE(old_state)) res = RET_LATE; */ + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + if (LRMS_STATE(old_state)) { + js->pub.stateEnterTimes[1 + EDG_WLL_JOB_SCHEDULED] = + e->any.timestamp.tv_sec; + res = RET_LATE; + } + new_state = EDG_WLL_JOB_SCHEDULED; break; + default: + goto bad_event; break; + } + } else if (e->transfer.result == EDG_WLL_TRANSFER_FAIL) { + /* transfer failed */ + switch (e->transfer.source) { + case EDG_WLL_SOURCE_USER_INTERFACE: + new_state = EDG_WLL_JOB_SUBMITTED; break; + case EDG_WLL_SOURCE_JOB_SUBMISSION: + if (LRMS_STATE(old_state)) res = RET_LATE; + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + if (LRMS_STATE(old_state)) res = RET_LATE; + new_state = EDG_WLL_JOB_READY; break; + default: + goto bad_event; break; + } + } else { + /* e->transfer.result == EDG_WLL_TRANSFER_START */ + res = RET_IGNORE; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + rep(js->pub.reason, e->transfer.reason); + + free(js->pub.location); + if (e->transfer.result == EDG_WLL_TRANSFER_OK) { + js->pub.location = location_string( + edg_wll_SourceToString(e->transfer.destination), + e->transfer.dest_host, + e->transfer.dest_instance); + } else { + js->pub.location = location_string( + edg_wll_SourceToString(e->transfer.source), + e->transfer.host, + e->transfer.src_instance); + } + } + if (USABLE_DATA(res, strict)) { + switch (e->transfer.source) { + case EDG_WLL_SOURCE_USER_INTERFACE: + rep(js->pub.jdl, e->transfer.job); break; + case EDG_WLL_SOURCE_JOB_SUBMISSION: + rep(js->pub.condor_jdl, e->transfer.job); break; + case EDG_WLL_SOURCE_LOG_MONITOR: + rep(js->pub.rsl, e->transfer.job); break; + default: + goto bad_event; break; + + } + } + break; + case EDG_WLL_EVENT_ACCEPTED: + switch (e->accepted.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + if (LRMS_STATE(old_state)) res = RET_LATE; + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LRMS: + new_state = EDG_WLL_JOB_SCHEDULED; break; + default: + goto bad_event; break; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(e->accepted.source), + e->accepted.host, + e->accepted.src_instance); + } + if (USABLE_DATA(res, strict)) { + switch (e->accepted.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + break; /* no WM id */ + case EDG_WLL_SOURCE_LOG_MONITOR: + rep(js->pub.condorId, e->accepted.local_jobid); break; + case EDG_WLL_SOURCE_LRMS: + /* XXX localId */ + rep(js->pub.globusId, e->accepted.local_jobid); break; + default: + goto bad_event; break; + } + } + break; + case EDG_WLL_EVENT_REFUSED: + switch (e->refused.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + new_state = EDG_WLL_JOB_SUBMITTED; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LRMS: + new_state = EDG_WLL_JOB_READY; break; + default: + goto bad_event; break; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + rep(js->pub.reason, e->refused.reason); + + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(e->refused.from), + e->refused.from_host, + e->refused.from_instance); + } + break; + case EDG_WLL_EVENT_ENQUEUED: + if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) { + switch (e->enQueued.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_WORKLOAD_MANAGER: + if (LRMS_STATE(old_state)) res = RET_LATE; + update_branch_state(e->any.seqcode, NULL, + NULL, e->enQueued.job, &js->branch_states); + new_state = EDG_WLL_JOB_READY; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + new_state = EDG_WLL_JOB_WAITING; break; + default: + goto bad_event; break; + } + } else if (e->enQueued.result == EDG_WLL_ENQUEUED_FAIL) { + switch (e->enQueued.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_WORKLOAD_MANAGER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_LOG_MONITOR: + new_state = old_state; break; + default: + goto bad_event; break; + } + } else { + /* e->enQueued.result == EDG_WLL_ENQUEUED_START */ + res = RET_IGNORE; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + rep(js->pub.reason, e->enQueued.reason); + + free(js->pub.location); + if (e->enQueued.result == EDG_WLL_ENQUEUED_OK) { + js->pub.location = location_string( + e->enQueued.queue, + e->enQueued.host, + e->enQueued.src_instance); + if (e->enQueued.source == EDG_WLL_SOURCE_LOG_MONITOR) + js->pub.resubmitted = 1; + } else { + js->pub.location = location_string( + edg_wll_SourceToString(e->enQueued.source), + e->enQueued.host, + e->enQueued.src_instance); + } + } + if (USABLE_DATA(res, strict)) { + switch (e->enQueued.source) { + case EDG_WLL_SOURCE_NETWORK_SERVER: + rep(js->pub.jdl, e->enQueued.job); break; + case EDG_WLL_SOURCE_WORKLOAD_MANAGER: + if (USABLE_BRANCH(res)) { + rep(js->pub.matched_jdl, e->enQueued.job); + } + break; + case EDG_WLL_SOURCE_LOG_MONITOR: + /* no interim JDL here */ + break; + default: + goto bad_event; break; + } + } + break; + case EDG_WLL_EVENT_DEQUEUED: + switch (e->deQueued.source) { + case EDG_WLL_SOURCE_WORKLOAD_MANAGER: + new_state = EDG_WLL_JOB_WAITING; break; + case EDG_WLL_SOURCE_JOB_SUBMISSION: + if (LRMS_STATE(old_state)) res = RET_LATE; + new_state = EDG_WLL_JOB_READY; break; + default: + goto bad_event; break; + } + if (USABLE(res, strict)) { + js->pub.state = new_state; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(e->deQueued.source), + e->deQueued.host, + e->deQueued.src_instance); + } + if (USABLE_DATA(res, strict)) { + /* no WM/JSS local jobid */ + } + break; + case EDG_WLL_EVENT_HELPERCALL: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_WAITING; + free(js->pub.location); + js->pub.location = location_string( + e->helperCall.helper_name, + e->helperCall.host, + e->helperCall.src_instance); + /* roles and params used only for debugging */ + } + break; + case EDG_WLL_EVENT_HELPERRETURN: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_WAITING; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), + e->helperReturn.host, + e->helperReturn.src_instance); + /* roles and retvals used only for debugging */ + } + break; + case EDG_WLL_EVENT_RUNNING: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_RUNNING; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS), + "worknode", + e->running.node); + } + if (USABLE_DATA(res, strict)) { + if (USABLE_BRANCH(fine_res)) { + rep(js->pub.ce_node, e->running.node); + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + if (e->running.node) { + update_branch_state(e->any.seqcode, NULL, + e->running.node, NULL, &js->branch_states); + add_stringlist(&js->pub.possible_ce_nodes, + e->running.node); + } + } + } + break; + case EDG_WLL_EVENT_REALLYRUNNING: + if (USABLE_DATA(res, strict)) { + js->pub.state = EDG_WLL_JOB_RUNNING; + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_LRMS), + "worknode", + e->running.node); + js->pub.payload_running = 1; + if (e->any.source == EDG_WLL_SOURCE_LRMS) { + rep(js->branch_tag_seqcode, e->any.seqcode); + rep(js->last_branch_seqcode, e->any.seqcode); + } + if (e->any.source == EDG_WLL_SOURCE_LOG_MONITOR) { + rep(js->branch_tag_seqcode, e->reallyRunning.wn_seq); + rep(js->last_branch_seqcode, e->reallyRunning.wn_seq); + } + + load_branch_state(js); + } + break; + case EDG_WLL_EVENT_RESUBMISSION: + if (USABLE(res, strict)) { + if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { + rep(js->pub.reason, e->resubmission.reason); + } + } + if (USABLE_DATA(res, strict)) { + if (e->resubmission.result == EDG_WLL_RESUBMISSION_WONTRESUB) { + js->resubmit_type = EDG_WLL_RESUBMISSION_WONTRESUB; + } + else + if (e->resubmission.result == EDG_WLL_RESUBMISSION_WILLRESUB && + e->any.source == EDG_WLL_SOURCE_WORKLOAD_MANAGER) { + js->resubmit_type = EDG_WLL_RESUBMISSION_WILLRESUB; + free_stringlist(&js->pub.possible_destinations); + free_stringlist(&js->pub.possible_ce_nodes); + free_branch_state(&js->branch_states); + js->pub.payload_running = 0; + rep(js->branch_tag_seqcode, NULL); + rep(js->deep_resubmit_seqcode, e->any.seqcode); + } + else + if (e->resubmission.result == EDG_WLL_RESUBMISSION_SHALLOW) { + js->resubmit_type = EDG_WLL_RESUBMISSION_SHALLOW; + rep(js->deep_resubmit_seqcode, NULL); + } + } + break; + case EDG_WLL_EVENT_DONE: + if (e->any.source == EDG_WLL_SOURCE_LRMS) { + /* Done from JobWrapper is not sufficient for transition + * to DONE state according its current definition */ + break; + } + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_DONE; + rep(js->pub.reason, e->done.reason); + if (fine_res == RET_GOODBRANCH) { + js->pub.payload_running = 0; + } + switch (e->done.status_code) { + case EDG_WLL_DONE_CANCELLED: + js->pub.state = EDG_WLL_JOB_CANCELLED; + case EDG_WLL_DONE_OK: + rep(js->pub.location, "none"); break; + default: + free(js->pub.location); + js->pub.location = location_string( + edg_wll_SourceToString(e->done.source), + e->done.host, + e->done.src_instance); + } + } + if (USABLE_DATA(res, strict)) { + switch (e->done.status_code) { + case EDG_WLL_DONE_OK: + js->pub.exit_code = e->done.exit_code; + js->pub.done_code = EDG_WLL_STAT_OK; break; + case EDG_WLL_DONE_CANCELLED: + js->pub.exit_code = 0; + js->pub.done_code = EDG_WLL_STAT_CANCELLED; break; + case EDG_WLL_DONE_FAILED: + js->pub.exit_code = 0; + js->pub.done_code = EDG_WLL_STAT_FAILED; break; + default: + goto bad_event; break; + } + } + break; + case EDG_WLL_EVENT_CANCEL: + if (fine_res != RET_BADBRANCH) { + if (js->last_cancel_seqcode != NULL && + edg_wll_compare_seq(e->any.seqcode, js->last_cancel_seqcode) < 0) { + res = RET_LATE; + } + } + else { + res = RET_LATE; + } + if (USABLE(res, strict)) { + switch (e->cancel.status_code) { + case EDG_WLL_CANCEL_REQ: + js->pub.cancelling = 1; break; + case EDG_WLL_CANCEL_DONE: + js->pub.state = EDG_WLL_JOB_CANCELLED; + rep(js->pub.reason, e->cancel.reason); + rep(js->last_seqcode, e->any.seqcode); + rep(js->pub.location, "none"); + /* fall though */ + case EDG_WLL_CANCEL_ABORT: + js->pub.cancelling = 0; break; + default: + /* do nothing */ + break; + + } + } + if (USABLE_DATA(res, strict)) { + rep(js->pub.cancelReason, e->cancel.reason); + } + break; + case EDG_WLL_EVENT_ABORT: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_ABORTED; + rep(js->pub.reason, e->abort.reason); + rep(js->pub.location, "none"); + js->pub.payload_running = 0; + } + break; + + case EDG_WLL_EVENT_CLEAR: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_CLEARED; + rep(js->pub.location, "none"); + switch (e->clear.reason) { + case EDG_WLL_CLEAR_USER: + rep(js->pub.reason, "user retrieved output sandbox"); + break; + case EDG_WLL_CLEAR_TIMEOUT: + rep(js->pub.reason, "timed out, resource purge forced"); + break; + case EDG_WLL_CLEAR_NOOUTPUT: + rep(js->pub.reason, "no output was generated"); + break; + default: + goto bad_event; break; + + } + } + break; + case EDG_WLL_EVENT_PURGE: + /* ignore, meta-information only */ + break; + case EDG_WLL_EVENT_MATCH: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_WAITING; + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), + e->match.host, + e->match.src_instance); + } + if (USABLE_DATA(res, strict)) { + if (USABLE_BRANCH(fine_res)) { + rep(js->pub.destination, e->match.dest_id); + } + if (e->match.dest_id) { + update_branch_state(e->any.seqcode, e->match.dest_id, + NULL, NULL, &js->branch_states); + add_stringlist(&js->pub.possible_destinations, + e->match.dest_id); + } + } + break; + case EDG_WLL_EVENT_PENDING: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_WAITING; + rep(js->pub.reason, e->pending.reason); + js->pub.location = location_string( + edg_wll_SourceToString(EDG_WLL_SOURCE_WORKLOAD_MANAGER), + e->match.host, + e->match.src_instance); + } + break; + case EDG_WLL_EVENT_REGJOB: + if (USABLE(res, strict)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + } + if (USABLE_DATA(res, strict)) { + rep(js->pub.jdl, e->regJob.jdl); + edg_wlc_JobIdFree(js->pub.parent_job); + edg_wlc_JobIdDup(e->regJob.parent, + &js->pub.parent_job); + rep(js->pub.network_server, e->regJob.ns); + js->pub.children_num = e->regJob.nsubjobs; + if (e->regJob.jobtype == EDG_WLL_REGJOB_DAG + || e->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED) { + js->pub.jobtype = EDG_WLL_STAT_DAG; + } + rep(js->pub.seed, e->regJob.seed); + } + break; + case EDG_WLL_EVENT_USERTAG: + if (USABLE_DATA(res, strict)) { + if (e->userTag.name != NULL && e->userTag.value != NULL) { + add_taglist(&js->pub.user_tags, + e->userTag.name, e->userTag.value); + } else { + goto bad_event; + } + } + break; + case EDG_WLL_EVENT_LISTENER: + /* ignore, listener port is not part of job status */ + break; + case EDG_WLL_EVENT_CURDESCR: + case EDG_WLL_EVENT_CHKPT: + case EDG_WLL_EVENT_CHANGEACL: + /* ignore, only for event log */ + break; + + default: + goto bad_event; + break; + } + + if (USABLE(res,strict)) { + js->pub.lastUpdateTime = e->any.timestamp; + if (old_state != js->pub.state) { + js->pub.stateEnterTime = js->pub.lastUpdateTime; + js->pub.stateEnterTimes[1 + js->pub.state] + = (int)js->pub.lastUpdateTime.tv_sec; + } + } + + if (USABLE_DATA(res,strict)) { + if (e->any.source == EDG_WLL_SOURCE_NETWORK_SERVER && + js->pub.network_server == NULL) { + char *inst; + inst = e->any.src_instance; + asprintf(&js->pub.network_server, "%s%s%s", + e->any.host, + inst != NULL ? ":" : " ", + inst != NULL ? inst : ""); + } + } + + if (e->any.type == EDG_WLL_EVENT_CANCEL) { + rep(js->last_cancel_seqcode, e->any.seqcode); + } else { + rep(js->last_seqcode, e->any.seqcode); + } + + if (fine_res == RET_GOODBRANCH) { + rep(js->last_branch_seqcode, e->any.seqcode); + } + + return res; + +bad_event: + badEvent(js,e,ev_seq); + return RET_SUSPECT; +} + +int add_stringlist(char ***lptr, const char *new_item) +{ + char **itptr; + int i; + + if (*lptr == NULL) { + itptr = (char **) malloc(2*sizeof(char *)); + itptr[0] = strdup(new_item); + itptr[1] = NULL; + *lptr = itptr; + return 1; + } else { + for (i = 0, itptr = *lptr; itptr[i] != NULL; i++); + itptr = (char **) realloc(*lptr, (i+2)*sizeof(char *)); + if (itptr != NULL) { + itptr[i] = strdup(new_item); + itptr[i+1] = NULL; + *lptr = itptr; + return 1; + } else { + return 0; + } + } +} + +void destroy_intJobStat_extension(intJobStat *p) +{ + free(p->last_seqcode); p->last_seqcode = NULL; + free(p->last_cancel_seqcode); p->last_cancel_seqcode = NULL; + p->resubmit_type = EDG_WLL_RESUBMISSION_UNDEFINED; +} + +void destroy_intJobStat(intJobStat *p) +{ + edg_wll_FreeStatus(&p->pub); + destroy_intJobStat_extension(p); + memset(p, 0, sizeof(intJobStat)); +} + -- 1.8.2.3