basic support for file transfers
authorMiloš Mulač <mulac@civ.zcu.cz>
Thu, 17 Dec 2009 15:37:07 +0000 (15:37 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Thu, 17 Dec 2009 15:37:07 +0000 (15:37 +0000)
org.glite.lb.client/Makefile
org.glite.lb.client/examples/aborted_ft.l [new file with mode: 0644]
org.glite.lb.client/examples/done_ft.l [new file with mode: 0644]
org.glite.lb.client/examples/gen_sample_job
org.glite.lb.client/examples/job_status.c
org.glite.lb.client/examples/running_ft.l [new file with mode: 0644]
org.glite.lb.client/examples/submitted_ft_reg.l [new file with mode: 0644]
org.glite.lb.server/src/jobstat_supp.c
org.glite.lb.state-machine/interface/intjobstat.h
org.glite.lb.state-machine/src/process_event.c
org.glite.lb.state-machine/src/process_event_file_transfer.c

index 370c20f..2e3903c 100644 (file)
@@ -63,6 +63,10 @@ l_SRC = \
        cream_reallyrunning.l \
        cream_done.l \
        cream_failed.l \
+       submitted_ft_reg.l \
+       running_ft.l \
+       done_ft.l \
+       aborted_ft.l \
 
 # TODO: missing resubmission_deep
 #      shallow_resub_complex.l shallow_resub_simple.l shallow_resub_simple2.l \
diff --git a/org.glite.lb.client/examples/aborted_ft.l b/org.glite.lb.client/examples/aborted_ft.l
new file mode 100644 (file)
index 0000000..a592735
--- /dev/null
@@ -0,0 +1,5 @@
+# macro definition for RUNNING state
+
+:running_ft:
+-s Application, -e FileTransfer, --result=FAIL, --reason="problem with transfer"
+-s JobController,-e Abort,--reason "just to test"
diff --git a/org.glite.lb.client/examples/done_ft.l b/org.glite.lb.client/examples/done_ft.l
new file mode 100644 (file)
index 0000000..4a5c71d
--- /dev/null
@@ -0,0 +1,4 @@
+# macro definition for RUNNING state
+
+:running_ft:
+-s Application, -e FileTransfer, --result=OK, --reason="ok"
index 3f532f0..9a56d59 100755 (executable)
@@ -106,6 +106,11 @@ awk -F, $NESTED \
 /-e CREAMCancel/       { logit(); next;}
 /-e CREAMAbort/        { logit(); next;}
 
+/-e FileTransferRegister/ {  if (checkNOP(4) == 0) logit();
+                       next;}
+/-e FileTransfer/ { logit(); next;}
+/-e Sandbox/ { logit(); next;}
+
 # shell escape (for sequence number branching)
 
 /^!/           { print substr($0,2,(length($0) - 1)); }
index bf4b33f..03760e1 100644 (file)
@@ -201,7 +201,7 @@ static void printstat(edg_wll_JobStat stat, int level)
        s = edg_wll_StatToString(stat.state); 
 /* print whole flat structure */
        printf("%sstate : %s\n", ind, s);
-       printf("%sjobId : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.jobId));
+       printf("%sjobId : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.jobId)); free(j1);
        printf("%sowner : %s\n", ind, stat.owner);
        switch (stat.jobtype) {
                case EDG_WLL_STAT_SIMPLE:
@@ -309,7 +309,10 @@ static void printstat(edg_wll_JobStat stat, int level)
         }
        printf("%ssandbox_retrieved : %d\n", ind, stat.sandbox_retrieved);
        printf("%sjw_status : %s\n", ind, edg_wll_JWStatToString(stat.jw_status));
-
+       
+       printf("%sisb_transfer : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.isb_transfer)); free(j1);
+       printf("%sosb_transfer : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.osb_transfer)); free(j1);
+       
        /* PBS state section */
        if (stat.jobtype == EDG_WLL_STAT_PBS) {
                printf("%spbs_state : %s\n", ind, stat.pbs_state);
@@ -347,9 +350,20 @@ static void printstat(edg_wll_JobStat stat, int level)
                free(cream_stat_name);
        }
 
+       /* File Transfer section */
+       printf("%sft_compute_job : %s\n", ind, j1 = edg_wlc_JobIdUnparse(stat.ft_compute_job)); free(j1);
+       if (stat.ft_sandbox_type == EDG_WLL_STAT_INPUT)
+               printf("%sft_sandbox_type : INPUT\n", ind);
+       else  if (stat.ft_sandbox_type == EDG_WLL_STAT_OUTPUT)
+               printf("%sft_sandbox_type : OUTPUT\n", ind);
+       else
+               printf("%sft_sandbox_type : UNKNOWN\n", ind);
+       printf("%sft_src : %s\n", ind, stat.ft_src);
+       printf("%sft_dest : %s\n", ind, stat.ft_dest);
+       
+
        printf("\n");   
        
-       free(j1);
        free(j2);
        free(s);
 }
diff --git a/org.glite.lb.client/examples/running_ft.l b/org.glite.lb.client/examples/running_ft.l
new file mode 100644 (file)
index 0000000..ad248c1
--- /dev/null
@@ -0,0 +1,4 @@
+# macro definition for RUNNING state
+
+:submitted_ft_reg:
+-s Application, -e FileTransfer, --result=START, --reason="started"
diff --git a/org.glite.lb.client/examples/submitted_ft_reg.l b/org.glite.lb.client/examples/submitted_ft_reg.l
new file mode 100644 (file)
index 0000000..81cb5fb
--- /dev/null
@@ -0,0 +1,3 @@
+# macro definition for SUBMITTED state
+
+-s Application, -e FileTransferRegister, --src="muj.stroj:/data/file.tar", --dest="new_file.tar"
index 3f71caf..ef39317 100644 (file)
@@ -511,6 +511,8 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat)
        if (ret) ret = enc_strlist(ret, stat->user_fqans);
        if (ret) ret = enc_int(ret, stat->sandbox_retrieved);
        if (ret) ret = enc_int(ret, stat->jw_status);
+       if (ret) ret = enc_jobid(ret, stat->isb_transfer);
+       if (ret) ret = enc_jobid(ret, stat->osb_transfer);
        if (ret) ret = enc_string(ret, stat->pbs_state);
        if (ret) ret = enc_string(ret, stat->pbs_queue);
        if (ret) ret = enc_string(ret, stat->pbs_owner);
@@ -547,6 +549,10 @@ static char *enc_JobStat(char *old, edg_wll_JobStat* stat)
        if (ret) ret = enc_int(ret, stat->cream_cancelling);
        if (ret) ret = enc_int(ret, stat->cream_cpu_time);
        if (ret) ret = enc_int(ret, stat->cream_jw_status);
+       if (ret) ret = enc_jobid(ret, stat->ft_compute_job);
+       if (ret) ret = enc_int(ret, stat->ft_sandbox_type);
+       if (ret) ret = enc_string(ret, stat->ft_src);
+       if (ret) ret = enc_string(ret, stat->ft_dest);
 
        return ret;
 }
@@ -608,6 +614,8 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest)
         if (tmp_in != NULL) stat->user_fqans = dec_strlist(tmp_in, &tmp_in);
         if (tmp_in != NULL) stat->sandbox_retrieved = dec_int(tmp_in, &tmp_in);
         if (tmp_in != NULL) stat->jw_status = dec_int(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->isb_transfer = dec_jobid(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->osb_transfer = dec_jobid(tmp_in, &tmp_in);
         if (tmp_in != NULL) stat->pbs_state = dec_string(tmp_in, &tmp_in);
         if (tmp_in != NULL) stat->pbs_queue = dec_string(tmp_in, &tmp_in);
         if (tmp_in != NULL) stat->pbs_owner = dec_string(tmp_in, &tmp_in);
@@ -644,6 +652,11 @@ static edg_wll_JobStat* dec_JobStat(char *in, char **rest)
        if (tmp_in != NULL) stat->cream_cancelling = dec_int(tmp_in, &tmp_in);
        if (tmp_in != NULL) stat->cream_cpu_time = dec_int(tmp_in, &tmp_in);
        if (tmp_in != NULL) stat->cream_jw_status = dec_int(tmp_in, &tmp_in);
+        if (tmp_in != NULL) stat->ft_compute_job = dec_jobid(tmp_in, &tmp_in);
+       if (tmp_in != NULL) stat->ft_sandbox_type = dec_int(tmp_in, &tmp_in);
+       if (tmp_in != NULL) stat->ft_src = dec_string(tmp_in, &tmp_in);
+       if (tmp_in != NULL) stat->ft_dest = dec_string(tmp_in, &tmp_in);
+       
 
        *rest = tmp_in;
 
index bb18f2f..0684ca0 100644 (file)
@@ -14,7 +14,7 @@
 /* where Z.XX is version from indent + 1 (version after commit), Y = Z+1 */
 /* and DESCRIPTION is short hit why version changed            */
 
-#define INTSTAT_VERSION "revision 2.8 - undef status time"
+#define INTSTAT_VERSION "revision 2.9 - file transfers"
 //                      ".... MAX LENGTH 32 BYTES !! ...."
 
 // Internal error codes 
index 0173701..2af3def 100644 (file)
@@ -950,6 +950,20 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                                        js->pub.done_code = e->collectionState.done_code;
                        }
                        break;
+               case EDG_WLL_EVENT_SANDBOX:
+                       if (USABLE_DATA(res, strict)) {
+                               if ((e->sandbox.variant == EDG_WLL_SANDBOX_INPUT) && e->sandbox.transfer_job) {
+                                       edg_wlc_JobIdFree(js->pub.isb_transfer);
+                                       edg_wlc_JobIdParse(e->sandbox.transfer_job,&js->pub.isb_transfer);
+                               }
+
+                               if ((e->sandbox.variant == EDG_WLL_SANDBOX_OUTPUT) && e->sandbox.transfer_job) {
+                                       edg_wlc_JobIdFree(js->pub.osb_transfer);
+                                       edg_wlc_JobIdParse(e->sandbox.transfer_job,&js->pub.osb_transfer);
+                               }
+                       }
+                       break;
+
                default:
                        goto bad_event;
                        break;
index 494cbec..144c3b5 100644 (file)
@@ -58,6 +58,60 @@ int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                                ;
                        }
                        break;
+               case EDG_WLL_EVENT_FILETRANSFERREGISTER:
+                       if (USABLE(res)) {
+                               ;
+                       }
+                       if (USABLE_DATA(res)) {
+                               rep(js->pub.ft_src, e->fileTransferRegister.src);
+                               rep(js->pub.ft_dest, e->fileTransferRegister.dest);
+                       }
+                       break;
+               case EDG_WLL_EVENT_FILETRANSFER:
+                       if (USABLE(res)) {
+                               switch (e->fileTransfer.result) {
+                                       case EDG_WLL_FILETRANSFER_START:
+                                               js->pub.state = EDG_WLL_JOB_RUNNING;
+                                               break;
+                                       case EDG_WLL_FILETRANSFER_OK:
+                                               js->pub.state = EDG_WLL_JOB_DONE;
+                                               js->pub.done_code = EDG_WLL_STAT_OK;
+                                               break;
+                                       case EDG_WLL_FILETRANSFER_FAIL:
+                                               js->pub.state = EDG_WLL_JOB_DONE;
+                                               js->pub.done_code = EDG_WLL_STAT_FAILED;
+                                               rep(js->pub.failure_reasons, e->fileTransfer.reason);
+                                               break;
+                                       default:
+                                               break;
+                               }
+                       }
+                       if (USABLE_DATA(res)) {
+                               ;
+                       }
+                       break;
+               case EDG_WLL_EVENT_ABORT:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_ABORTED;
+                               js->pub.remove_from_proxy = 1;
+                               rep(js->pub.reason, e->abort.reason);
+                               rep(js->pub.location, "none");
+                       }
+                       break;
+               case EDG_WLL_EVENT_SANDBOX:
+                       if (USABLE_DATA(res)) {
+                               if (e->sandbox.variant == EDG_WLL_SANDBOX_INPUT)
+                                       js->pub.ft_sandbox_type = EDG_WLL_STAT_INPUT;
+
+                               if (e->sandbox.variant == EDG_WLL_SANDBOX_OUTPUT)
+                                       js->pub.ft_sandbox_type = EDG_WLL_STAT_OUTPUT;
+
+                               if (e->sandbox.compute_job) {
+                                       edg_wlc_JobIdFree(js->pub.ft_compute_job);
+                                       edg_wlc_JobIdParse(e->sandbox.compute_job,&js->pub.ft_compute_job);
+                               }
+                       }
+                       break;
                default:
                        break;
        }