From dcd7b00bb5268f6c1b717223815f466523a607ba Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Thu, 17 Dec 2009 15:37:07 +0000 Subject: [PATCH] basic support for file transfers --- org.glite.lb.client/Makefile | 4 ++ org.glite.lb.client/examples/aborted_ft.l | 5 ++ org.glite.lb.client/examples/done_ft.l | 4 ++ org.glite.lb.client/examples/gen_sample_job | 5 ++ org.glite.lb.client/examples/job_status.c | 20 ++++++-- org.glite.lb.client/examples/running_ft.l | 4 ++ org.glite.lb.client/examples/submitted_ft_reg.l | 3 ++ org.glite.lb.server/src/jobstat_supp.c | 13 ++++++ org.glite.lb.state-machine/interface/intjobstat.h | 2 +- org.glite.lb.state-machine/src/process_event.c | 14 ++++++ .../src/process_event_file_transfer.c | 54 ++++++++++++++++++++++ 11 files changed, 124 insertions(+), 4 deletions(-) create mode 100644 org.glite.lb.client/examples/aborted_ft.l create mode 100644 org.glite.lb.client/examples/done_ft.l create mode 100644 org.glite.lb.client/examples/running_ft.l create mode 100644 org.glite.lb.client/examples/submitted_ft_reg.l diff --git a/org.glite.lb.client/Makefile b/org.glite.lb.client/Makefile index 370c20f..2e3903c 100644 --- a/org.glite.lb.client/Makefile +++ b/org.glite.lb.client/Makefile @@ -63,6 +63,10 @@ l_SRC = \ cream_reallyrunning.l \ cream_done.l \ cream_failed.l \ + submitted_ft_reg.l \ + running_ft.l \ + done_ft.l \ + aborted_ft.l \ # TODO: missing resubmission_deep # shallow_resub_complex.l shallow_resub_simple.l shallow_resub_simple2.l \ diff --git a/org.glite.lb.client/examples/aborted_ft.l b/org.glite.lb.client/examples/aborted_ft.l new file mode 100644 index 0000000..a592735 --- /dev/null +++ b/org.glite.lb.client/examples/aborted_ft.l @@ -0,0 +1,5 @@ +# macro definition for RUNNING state + +:running_ft: +-s Application, -e FileTransfer, --result=FAIL, --reason="problem with transfer" +-s JobController,-e Abort,--reason "just to test" diff --git a/org.glite.lb.client/examples/done_ft.l b/org.glite.lb.client/examples/done_ft.l new file mode 100644 index 0000000..4a5c71d --- /dev/null +++ b/org.glite.lb.client/examples/done_ft.l @@ -0,0 +1,4 @@ +# macro definition for RUNNING state + +:running_ft: +-s Application, -e FileTransfer, --result=OK, --reason="ok" diff --git a/org.glite.lb.client/examples/gen_sample_job b/org.glite.lb.client/examples/gen_sample_job index 3f532f0..9a56d59 100755 --- a/org.glite.lb.client/examples/gen_sample_job +++ b/org.glite.lb.client/examples/gen_sample_job @@ -106,6 +106,11 @@ awk -F, $NESTED \ /-e CREAMCancel/ { logit(); next;} /-e CREAMAbort/ { logit(); next;} +/-e FileTransferRegister/ { if (checkNOP(4) == 0) logit(); + next;} +/-e FileTransfer/ { logit(); next;} +/-e Sandbox/ { logit(); next;} + # shell escape (for sequence number branching) /^!/ { print substr($0,2,(length($0) - 1)); } diff --git a/org.glite.lb.client/examples/job_status.c b/org.glite.lb.client/examples/job_status.c index bf4b33f..03760e1 100644 --- a/org.glite.lb.client/examples/job_status.c +++ b/org.glite.lb.client/examples/job_status.c @@ -201,7 +201,7 @@ static void printstat(edg_wll_JobStat stat, int level) s = edg_wll_StatToString(stat.state); /* print whole flat structure */ printf("%sstate : %s\n", ind, s); - printf("%sjobId : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.jobId)); + printf("%sjobId : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.jobId)); free(j1); printf("%sowner : %s\n", ind, stat.owner); switch (stat.jobtype) { case EDG_WLL_STAT_SIMPLE: @@ -309,7 +309,10 @@ static void printstat(edg_wll_JobStat stat, int level) } printf("%ssandbox_retrieved : %d\n", ind, stat.sandbox_retrieved); printf("%sjw_status : %s\n", ind, edg_wll_JWStatToString(stat.jw_status)); - + + printf("%sisb_transfer : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.isb_transfer)); free(j1); + printf("%sosb_transfer : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.osb_transfer)); free(j1); + /* PBS state section */ if (stat.jobtype == EDG_WLL_STAT_PBS) { printf("%spbs_state : %s\n", ind, stat.pbs_state); @@ -347,9 +350,20 @@ static void printstat(edg_wll_JobStat stat, int level) free(cream_stat_name); } + /* File Transfer section */ + printf("%sft_compute_job : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.ft_compute_job)); free(j1); + if (stat.ft_sandbox_type == EDG_WLL_STAT_INPUT) + printf("%sft_sandbox_type : INPUT\n", ind); + else if (stat.ft_sandbox_type == EDG_WLL_STAT_OUTPUT) + printf("%sft_sandbox_type : OUTPUT\n", ind); + else + printf("%sft_sandbox_type : UNKNOWN\n", ind); + printf("%sft_src : %s\n", ind, stat.ft_src); + printf("%sft_dest : %s\n", ind, stat.ft_dest); + + printf("\n"); - free(j1); free(j2); free(s); } diff --git a/org.glite.lb.client/examples/running_ft.l b/org.glite.lb.client/examples/running_ft.l new file mode 100644 index 0000000..ad248c1 --- /dev/null +++ b/org.glite.lb.client/examples/running_ft.l @@ -0,0 +1,4 @@ +# macro definition for RUNNING state + +:submitted_ft_reg: +-s Application, -e FileTransfer, --result=START, --reason="started" diff --git a/org.glite.lb.client/examples/submitted_ft_reg.l b/org.glite.lb.client/examples/submitted_ft_reg.l new file mode 100644 index 0000000..81cb5fb --- /dev/null +++ b/org.glite.lb.client/examples/submitted_ft_reg.l @@ -0,0 +1,3 @@ +# macro definition for SUBMITTED state + +-s Application, -e FileTransferRegister, --src="muj.stroj:/data/file.tar", --dest="new_file.tar" diff --git a/org.glite.lb.server/src/jobstat_supp.c b/org.glite.lb.server/src/jobstat_supp.c index 3f71caf..ef39317 100644 --- a/org.glite.lb.server/src/jobstat_supp.c +++ b/org.glite.lb.server/src/jobstat_supp.c @@ -511,6 +511,8 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat) if (ret) ret = enc_strlist(ret, stat->user_fqans); if (ret) ret = enc_int(ret, stat->sandbox_retrieved); if (ret) ret = enc_int(ret, stat->jw_status); + if (ret) ret = enc_jobid(ret, stat->isb_transfer); + if (ret) ret = enc_jobid(ret, stat->osb_transfer); if (ret) ret = enc_string(ret, stat->pbs_state); if (ret) ret = enc_string(ret, stat->pbs_queue); if (ret) ret = enc_string(ret, stat->pbs_owner); @@ -547,6 +549,10 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat) if (ret) ret = enc_int(ret, stat->cream_cancelling); if (ret) ret = enc_int(ret, stat->cream_cpu_time); if (ret) ret = enc_int(ret, stat->cream_jw_status); + if (ret) ret = enc_jobid(ret, stat->ft_compute_job); + if (ret) ret = enc_int(ret, stat->ft_sandbox_type); + if (ret) ret = enc_string(ret, stat->ft_src); + if (ret) ret = enc_string(ret, stat->ft_dest); return ret; } @@ -608,6 +614,8 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest) if (tmp_in != NULL) stat->user_fqans = dec_strlist(tmp_in, &tmp_in); if (tmp_in != NULL) stat->sandbox_retrieved = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) stat->jw_status = dec_int(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->isb_transfer = dec_jobid(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->osb_transfer = dec_jobid(tmp_in, &tmp_in); if (tmp_in != NULL) stat->pbs_state = dec_string(tmp_in, &tmp_in); if (tmp_in != NULL) stat->pbs_queue = dec_string(tmp_in, &tmp_in); if (tmp_in != NULL) stat->pbs_owner = dec_string(tmp_in, &tmp_in); @@ -644,6 +652,11 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest) if (tmp_in != NULL) stat->cream_cancelling = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) stat->cream_cpu_time = dec_int(tmp_in, &tmp_in); if (tmp_in != NULL) stat->cream_jw_status = dec_int(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->ft_compute_job = dec_jobid(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->ft_sandbox_type = dec_int(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->ft_src = dec_string(tmp_in, &tmp_in); + if (tmp_in != NULL) stat->ft_dest = dec_string(tmp_in, &tmp_in); + *rest = tmp_in; diff --git a/org.glite.lb.state-machine/interface/intjobstat.h b/org.glite.lb.state-machine/interface/intjobstat.h index bb18f2f..0684ca0 100644 --- a/org.glite.lb.state-machine/interface/intjobstat.h +++ b/org.glite.lb.state-machine/interface/intjobstat.h @@ -14,7 +14,7 @@ /* where Z.XX is version from indent + 1 (version after commit), Y = Z+1 */ /* and DESCRIPTION is short hit why version changed */ -#define INTSTAT_VERSION "revision 2.8 - undef status time" +#define INTSTAT_VERSION "revision 2.9 - file transfers" // ".... MAX LENGTH 32 BYTES !! ...." // Internal error codes diff --git a/org.glite.lb.state-machine/src/process_event.c b/org.glite.lb.state-machine/src/process_event.c index 0173701..2af3def 100644 --- a/org.glite.lb.state-machine/src/process_event.c +++ b/org.glite.lb.state-machine/src/process_event.c @@ -950,6 +950,20 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int js->pub.done_code = e->collectionState.done_code; } break; + case EDG_WLL_EVENT_SANDBOX: + if (USABLE_DATA(res, strict)) { + if ((e->sandbox.variant == EDG_WLL_SANDBOX_INPUT) && e->sandbox.transfer_job) { + edg_wlc_JobIdFree(js->pub.isb_transfer); + edg_wlc_JobIdParse(e->sandbox.transfer_job,&js->pub.isb_transfer); + } + + if ((e->sandbox.variant == EDG_WLL_SANDBOX_OUTPUT) && e->sandbox.transfer_job) { + edg_wlc_JobIdFree(js->pub.osb_transfer); + edg_wlc_JobIdParse(e->sandbox.transfer_job,&js->pub.osb_transfer); + } + } + break; + default: goto bad_event; break; diff --git a/org.glite.lb.state-machine/src/process_event_file_transfer.c b/org.glite.lb.state-machine/src/process_event_file_transfer.c index 494cbec..144c3b5 100644 --- a/org.glite.lb.state-machine/src/process_event_file_transfer.c +++ b/org.glite.lb.state-machine/src/process_event_file_transfer.c @@ -58,6 +58,60 @@ int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int ; } break; + case EDG_WLL_EVENT_FILETRANSFERREGISTER: + if (USABLE(res)) { + ; + } + if (USABLE_DATA(res)) { + rep(js->pub.ft_src, e->fileTransferRegister.src); + rep(js->pub.ft_dest, e->fileTransferRegister.dest); + } + break; + case EDG_WLL_EVENT_FILETRANSFER: + if (USABLE(res)) { + switch (e->fileTransfer.result) { + case EDG_WLL_FILETRANSFER_START: + js->pub.state = EDG_WLL_JOB_RUNNING; + break; + case EDG_WLL_FILETRANSFER_OK: + js->pub.state = EDG_WLL_JOB_DONE; + js->pub.done_code = EDG_WLL_STAT_OK; + break; + case EDG_WLL_FILETRANSFER_FAIL: + js->pub.state = EDG_WLL_JOB_DONE; + js->pub.done_code = EDG_WLL_STAT_FAILED; + rep(js->pub.failure_reasons, e->fileTransfer.reason); + break; + default: + break; + } + } + if (USABLE_DATA(res)) { + ; + } + break; + case EDG_WLL_EVENT_ABORT: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_ABORTED; + js->pub.remove_from_proxy = 1; + rep(js->pub.reason, e->abort.reason); + rep(js->pub.location, "none"); + } + break; + case EDG_WLL_EVENT_SANDBOX: + if (USABLE_DATA(res)) { + if (e->sandbox.variant == EDG_WLL_SANDBOX_INPUT) + js->pub.ft_sandbox_type = EDG_WLL_STAT_INPUT; + + if (e->sandbox.variant == EDG_WLL_SANDBOX_OUTPUT) + js->pub.ft_sandbox_type = EDG_WLL_STAT_OUTPUT; + + if (e->sandbox.compute_job) { + edg_wlc_JobIdFree(js->pub.ft_compute_job); + edg_wlc_JobIdParse(e->sandbox.compute_job,&js->pub.ft_compute_job); + } + } + break; default: break; } -- 1.8.2.3