From 00991066e3a8957e1fccabdbb9508ecae726aa99 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20Filipovi=C4=8D?= Date: Tue, 22 Feb 2011 09:38:45 +0000 Subject: [PATCH] file trasfer collections -- initial implementation --- org.glite.lb.client/Makefile | 2 +- org.glite.lb.client/interface/producer.h.T | 16 +++++++ org.glite.lb.client/src/producer.c | 51 +++++++++++++++++----- org.glite.lb.server/src/db_store.c | 3 +- org.glite.lb.server/src/jobstat.c | 28 +++++++++++- org.glite.lb.server/src/lb_html.c | 7 ++- org.glite.lb.state-machine/src/process_event.c | 4 ++ .../src/process_event_file_transfer.c | 9 +++- .../src/process_event_file_transfer_collection.c | 26 ++++++++++- 9 files changed, 129 insertions(+), 17 deletions(-) diff --git a/org.glite.lb.client/Makefile b/org.glite.lb.client/Makefile index 3955278..43b8dce 100644 --- a/org.glite.lb.client/Makefile +++ b/org.glite.lb.client/Makefile @@ -157,7 +157,7 @@ endif lib_LTLIBRARIES=${c_LIBRARIES} ${plus_LIBRARIES} TOOLS:=${LB_PERF_TOOLS} -EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c stats-duration-fromto.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c done_failed_events.c +EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c stats-duration-fromto.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c done_failed_events.c sandbox_collection_reg_dummy.c EXAMPLES:=${EXAMPLES_SRC:.c=} # TODO: migrate them here from branch_RC31_3 diff --git a/org.glite.lb.client/interface/producer.h.T b/org.glite.lb.client/interface/producer.h.T index 05648c5..91177aa 100644 --- a/org.glite.lb.client/interface/producer.h.T +++ b/org.glite.lb.client/interface/producer.h.T @@ -361,6 +361,22 @@ extern int edg_wll_RegisterSubjobsProxy( glite_jobid_t const * subjobs ); +/** + * Register file transfer subjobs in a batch. + * \param[in,out] context context to work with + * \param[in] parent parent's jobId + * \param[in] ns network server contact + * \param[in] nsubjobs number of subjobs + * \param[in] subjobs array of jobid's in the same order + */ + +extern int edg_wll_RegisterFTSubjobs( + edg_wll_Context ctx, + glite_jobid_const_t parent, + const char * ns, + int nsubjobs, + edg_wlc_JobId const * subjobs +); /** * Change ACL for given job. diff --git a/org.glite.lb.client/src/producer.c b/org.glite.lb.client/src/producer.c index 9f31eb8..1f09a71 100644 --- a/org.glite.lb.client/src/producer.c +++ b/org.glite.lb.client/src/producer.c @@ -833,7 +833,8 @@ static int edg_wll_RegisterJobMaster( } if ((type == EDG_WLL_REGJOB_DAG || type == EDG_WLL_REGJOB_PARTITIONED || - type == EDG_WLL_REGJOB_COLLECTION) + type == EDG_WLL_REGJOB_COLLECTION || + type == EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION) && num_subjobs > 0) { err = edg_wll_GenerateSubjobIds(ctx,job,num_subjobs,seed,subjobs); edg_wll_SetSequenceCode(ctx, NULL, EDG_WLL_SEQ_NORMAL); @@ -1039,8 +1040,10 @@ static int edg_wll_RegisterSubjobsMaster( edg_wll_Context ctx, int logging_flags, glite_jobid_const_t parent, + enum edg_wll_RegJobJobtype type, char const * const * jdls, const char * ns, + int nsubjobs, edg_wlc_JobId const * subjobs) { char const * const *pjdl; @@ -1049,6 +1052,7 @@ static int edg_wll_RegisterSubjobsMaster( char * oldctxseq; int errcode = 0; char * errdesc = NULL; + int i; if (edg_wll_GetLoggingJob(ctx, &oldctxjob)) return edg_wll_Error(ctx, NULL, NULL); oldctxseq = edg_wll_GetSequenceCode(ctx); @@ -1056,14 +1060,30 @@ static int edg_wll_RegisterSubjobsMaster( pjdl = jdls; psubjob = subjobs; - while (*pjdl != NULL) { - if (edg_wll_RegisterJobMaster(ctx, logging_flags, - *psubjob, EDG_WLL_REGJOB_SIMPLE, *pjdl, - ns, parent, 0, NULL, NULL, NULL) != 0) { - errcode = edg_wll_Error(ctx, NULL, &errdesc); - goto edg_wll_registersubjobsmaster_end; + if (type == EDG_WLL_REGJOB_SIMPLE) + while (*pjdl != NULL) { + if (edg_wll_RegisterJobMaster(ctx, logging_flags, + *psubjob, EDG_WLL_REGJOB_SIMPLE, *pjdl, + ns, parent, 0, NULL, NULL, NULL) != 0) { + errcode = edg_wll_Error(ctx, NULL, &errdesc); + goto edg_wll_registersubjobsmaster_end; + } + pjdl++; psubjob++; } - pjdl++; psubjob++; + else if (type == EDG_WLL_REGJOB_FILE_TRANSFER) + for (i = 0; i < nsubjobs; i++){ + if (edg_wll_RegisterJobMaster(ctx, logging_flags, + *psubjob, EDG_WLL_REGJOB_FILE_TRANSFER, NULL, + ns, parent, 0, NULL, NULL, NULL) != 0) { + errcode = edg_wll_Error(ctx, NULL, &errdesc); + goto edg_wll_registersubjobsmaster_end; + } + psubjob++; + } + else { + errcode = 1; + errdesc = strdup("Unsupported job type."); + goto edg_wll_registersubjobsmaster_end; } edg_wll_registersubjobsmaster_end: @@ -1090,7 +1110,7 @@ int edg_wll_RegisterSubjobs( edg_wlc_JobId const * subjobs) { return edg_wll_RegisterSubjobsMaster(ctx,EDG_WLL_LOGFLAG_LOCAL, - parent, jdls, ns, subjobs); + parent, EDG_WLL_REGJOB_SIMPLE, jdls, ns, 0, subjobs); } /** @@ -1107,8 +1127,19 @@ int edg_wll_RegisterSubjobsProxy( edg_wlc_JobId const * subjobs) { return edg_wll_RegisterSubjobsMaster(ctx,EDG_WLL_LOGFLAG_PROXY, - parent, jdls, ns, subjobs); + parent, EDG_WLL_REGJOB_SIMPLE, jdls, ns, 0, subjobs); + +} +int edg_wll_RegisterFTSubjobs( + edg_wll_Context ctx, + glite_jobid_const_t parent, + const char * ns, + int nsubjobs, + edg_wlc_JobId const * subjobs) +{ + return edg_wll_RegisterSubjobsMaster(ctx,EDG_WLL_LOGFLAG_LOCAL, + parent, EDG_WLL_REGJOB_FILE_TRANSFER, NULL, ns, nsubjobs, subjobs); } /** diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index 60cd29f..7a834a6 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -132,7 +132,8 @@ db_store(edg_wll_Context ctx, char *event) if (ev->any.type == EDG_WLL_EVENT_REGJOB && (ev->regJob.jobtype == EDG_WLL_REGJOB_DAG || ev->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED || - ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) && + ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION /*|| + ev->regJob.jobtype == EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION*/) && ev->regJob.nsubjobs > 0) { if (register_subjobs_embryonic(ctx,&ev->regJob)) goto rollback; diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index 47d684a..c62b2d4 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -197,7 +197,9 @@ int edg_wll_JobStatusServer( } #if DAG_ENABLE - if (stat->jobtype == EDG_WLL_STAT_DAG || stat->jobtype == EDG_WLL_STAT_COLLECTION) { + if (stat->jobtype == EDG_WLL_STAT_DAG || + stat->jobtype == EDG_WLL_STAT_COLLECTION || + stat->jobtype == EDG_WLL_STAT_FILE_TRANSFER_COLLECTION) { // XXX: The users does not want any histogram. What do we do about it? // if ((!(flags & EDG_WLL_STAT_CHILDHIST_FAST))&&(!(flags & EDG_WLL_STAT_CHILDHIST_THOROUGH))) { /* No Histogram */ @@ -1157,6 +1159,23 @@ static edg_wll_JobStatCode process_Histogram(intJobStat *pis) return EDG_WLL_JOB_WAITING; } +static edg_wll_JobStatCode process_FT_Histogram(intJobStat *pis) +{ + if (intJobStat_to_JobStat(pis)->children_hist[class_to_statCode(SUBJOB_CLASS_RUNNING)+1] > 0) { + return EDG_WLL_JOB_RUNNING; + } + else if (intJobStat_to_JobStat(pis)->children_hist[class_to_statCode(SUBJOB_CLASS_DONE)+1] + == intJobStat_to_JobStat(pis)->children_num) { + return EDG_WLL_JOB_DONE; + } + else if (intJobStat_to_JobStat(pis)->children_hist[class_to_statCode(SUBJOB_CLASS_DONE)+1] + + intJobStat_to_JobStat(pis)->children_hist[class_to_statCode(SUBJOB_CLASS_ABORTED)+1] + == intJobStat_to_JobStat(pis)->children_num) + return EDG_WLL_JOB_ABORTED; + else + return EDG_WLL_JOB_WAITING; +} + static edg_wll_ErrorCode update_parent_status(edg_wll_Context ctx, edg_wll_JobStat *subjob_stat_old, intJobStat *cis, edg_wll_Event *ce) { intJobStat *pis = NULL; @@ -1189,6 +1208,13 @@ static edg_wll_ErrorCode update_parent_status(edg_wll_Context ctx, edg_wll_JobSt goto err; } } + else if (intJobStat_to_JobStat(pis)->jobtype == EDG_WLL_STAT_FILE_TRANSFER_COLLECTION) { + parent_new_state = process_FT_Histogram(pis); + if (intJobStat_to_JobStat(pis)->state != parent_new_state) { + if (log_collectionState_event(ctx, parent_new_state, EDG_WLL_STAT_OK, cis, pis, ce)) + goto err; + } + } } err: diff --git a/org.glite.lb.server/src/lb_html.c b/org.glite.lb.server/src/lb_html.c index 71f984d..0f47bdf 100644 --- a/org.glite.lb.server/src/lb_html.c +++ b/org.glite.lb.server/src/lb_html.c @@ -390,7 +390,7 @@ int edg_wll_FileTransferStatusToHTML(edg_wll_Context ctx UNUSED_VAR, edg_wll_Job { char *pomA = NULL, *pomB = NULL, *lbstat, *children; int pomL = 0, i; - char *chid,*chcj,*chsbt; + char *chid,*chcj,*chpar,*chsbt=NULL; children = strdup(""); @@ -402,6 +402,9 @@ int edg_wll_FileTransferStatusToHTML(edg_wll_Context ctx UNUSED_VAR, edg_wll_Job chcj = edg_wlc_JobIdUnparse(stat.ft_compute_job); TRL("Compute job", "%s", chcj, NULL); free(chcj); + chpar = edg_wlc_JobIdUnparse(stat.parent_job); + TRL("Parent job", "%s", chpar, NULL); + free(chpar); if (stat.jobtype == EDG_WLL_STAT_FILE_TRANSFER){ switch(stat.ft_sandbox_type){ @@ -409,7 +412,7 @@ int edg_wll_FileTransferStatusToHTML(edg_wll_Context ctx UNUSED_VAR, edg_wll_Job break; case EDG_WLL_STAT_OUTPUT: chsbt = strdup("OUTPUT"); break; - default: chsbt = NULL; + default: break; } TR("Sandbox type", "%s", chsbt, NULL); diff --git a/org.glite.lb.state-machine/src/process_event.c b/org.glite.lb.state-machine/src/process_event.c index 445f55f..694f9fa 100644 --- a/org.glite.lb.state-machine/src/process_event.c +++ b/org.glite.lb.state-machine/src/process_event.c @@ -930,6 +930,10 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int js->pub.jobtype = EDG_WLL_STAT_COLLECTION; js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num; break; + case EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION: + js->pub.jobtype = EDG_WLL_STAT_FILE_TRANSFER_COLLECTION; + js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num; + break; default: break; } diff --git a/org.glite.lb.state-machine/src/process_event_file_transfer.c b/org.glite.lb.state-machine/src/process_event_file_transfer.c index bb3d813..e4e03d5 100644 --- a/org.glite.lb.state-machine/src/process_event_file_transfer.c +++ b/org.glite.lb.state-machine/src/process_event_file_transfer.c @@ -72,7 +72,8 @@ int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int js->pub.state = EDG_WLL_JOB_SUBMITTED; } if (USABLE_DATA(res)) { - ; + edg_wlc_JobIdFree(js->pub.parent_job); + edg_wlc_JobIdDup(e->regJob.parent, &js->pub.parent_job); } break; case EDG_WLL_EVENT_FILETRANSFERREGISTER: @@ -127,6 +128,12 @@ int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int edg_wlc_JobIdFree(js->pub.ft_compute_job); edg_wlc_JobIdParse(e->sandbox.compute_job,&js->pub.ft_compute_job); } +#if 0 + else if (e->sandbox.transfer_job) { /* mutually exclusive with compute_job */ + edg_wlc_JobIdFree(js->pub.parent_job); + edg_wlc_JobIdParse(e->sandbox.transfer_job, &js->pub.parent_job); + } +#endif } break; default: diff --git a/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c b/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c index e5bb78f..95b5650 100644 --- a/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c +++ b/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c @@ -57,6 +57,7 @@ static int compare_timestamps(struct timeval a, struct timeval b) int processEvent_FileTransferCollection(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) { edg_wll_JobStatCode old_state = js->pub.state; + edg_wll_JobStatCode new_state; int res = RET_OK; @@ -72,9 +73,32 @@ int processEvent_FileTransferCollection(intJobStat *js, edg_wll_Event *e, int ev js->pub.state = EDG_WLL_JOB_SUBMITTED; } if (USABLE_DATA(res)) { - ; + js->pub.children_num = e->regJob.nsubjobs; + js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num; } break; + case EDG_WLL_EVENT_SANDBOX: + if (USABLE_DATA(res)) { + if (e->sandbox.sandbox_type == EDG_WLL_SANDBOX_INPUT) + js->pub.ft_sandbox_type = EDG_WLL_STAT_INPUT; + + if (e->sandbox.sandbox_type == EDG_WLL_SANDBOX_OUTPUT) + js->pub.ft_sandbox_type = EDG_WLL_STAT_OUTPUT; + + if (e->sandbox.compute_job) { + edg_wlc_JobIdFree(js->pub.ft_compute_job); + edg_wlc_JobIdParse(e->sandbox.compute_job,&js->pub.ft_compute_job); + } + } + break; + case EDG_WLL_EVENT_COLLECTIONSTATE: + new_state = edg_wll_StringToStat(e->collectionState.state); + if (USABLE(res)) { + js->pub.state = new_state; + if (new_state == EDG_WLL_JOB_DONE) + js->pub.done_code = e->collectionState.done_code; + } + break; default: break; } -- 1.8.2.3