file trasfer collections -- initial implementation
authorJiří Filipovič <fila@ics.muni.cz>
Tue, 22 Feb 2011 09:38:45 +0000 (09:38 +0000)
committerJiří Filipovič <fila@ics.muni.cz>
Tue, 22 Feb 2011 09:38:45 +0000 (09:38 +0000)
org.glite.lb.client/Makefile
org.glite.lb.client/interface/producer.h.T
org.glite.lb.client/src/producer.c
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/lb_html.c
org.glite.lb.state-machine/src/process_event.c
org.glite.lb.state-machine/src/process_event_file_transfer.c
org.glite.lb.state-machine/src/process_event_file_transfer_collection.c

index 3955278..43b8dce 100644 (file)
@@ -157,7 +157,7 @@ endif
 lib_LTLIBRARIES=${c_LIBRARIES} ${plus_LIBRARIES}
 
 TOOLS:=${LB_PERF_TOOLS}
-EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c stats-duration-fromto.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c done_failed_events.c
+EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c stats-duration-fromto.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c done_failed_events.c sandbox_collection_reg_dummy.c
 EXAMPLES:=${EXAMPLES_SRC:.c=}
 
 # TODO: migrate them here from branch_RC31_3
index 05648c5..91177aa 100644 (file)
@@ -361,6 +361,22 @@ extern int edg_wll_RegisterSubjobsProxy(
        glite_jobid_t const *   subjobs
 );
 
+/**
+ * Register file transfer subjobs in a batch.
+ * \param[in,out] context       context to work with
+ * \param[in] parent            parent's jobId
+ * \param[in] ns                network server contact
+ * \param[in] nsubjobs         number of subjobs
+ * \param[in] subjobs           array of jobid's in the same order
+ */
+
+extern int edg_wll_RegisterFTSubjobs(
+        edg_wll_Context         ctx,
+        glite_jobid_const_t     parent,
+        const char *            ns,
+        int                     nsubjobs,
+        edg_wlc_JobId const *   subjobs
+);
 
 /**
  * Change ACL for given job.
index 9f31eb8..1f09a71 100644 (file)
@@ -833,7 +833,8 @@ static int edg_wll_RegisterJobMaster(
        }
        if ((type == EDG_WLL_REGJOB_DAG || 
             type == EDG_WLL_REGJOB_PARTITIONED ||
-            type == EDG_WLL_REGJOB_COLLECTION)
+            type == EDG_WLL_REGJOB_COLLECTION ||
+            type == EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION)
                && num_subjobs > 0) {
                err = edg_wll_GenerateSubjobIds(ctx,job,num_subjobs,seed,subjobs);
                edg_wll_SetSequenceCode(ctx, NULL, EDG_WLL_SEQ_NORMAL);
@@ -1039,8 +1040,10 @@ static int edg_wll_RegisterSubjobsMaster(
        edg_wll_Context         ctx,
        int                     logging_flags,
        glite_jobid_const_t     parent,
+       enum edg_wll_RegJobJobtype      type,
        char const * const *    jdls, 
        const char *            ns, 
+       int                     nsubjobs,
        edg_wlc_JobId const *   subjobs)
 {
        char const * const      *pjdl;
@@ -1049,6 +1052,7 @@ static int edg_wll_RegisterSubjobsMaster(
        char *                  oldctxseq;
        int                     errcode = 0;
        char *                  errdesc = NULL;
+       int                     i;
 
        if (edg_wll_GetLoggingJob(ctx, &oldctxjob)) return edg_wll_Error(ctx, NULL, NULL);
        oldctxseq = edg_wll_GetSequenceCode(ctx);
@@ -1056,14 +1060,30 @@ static int edg_wll_RegisterSubjobsMaster(
        pjdl = jdls;
        psubjob = subjobs;
        
-       while (*pjdl != NULL) {
-               if (edg_wll_RegisterJobMaster(ctx, logging_flags,
-                       *psubjob, EDG_WLL_REGJOB_SIMPLE, *pjdl,
-                       ns, parent, 0, NULL, NULL, NULL) != 0) {
-                       errcode = edg_wll_Error(ctx, NULL, &errdesc);
-                       goto edg_wll_registersubjobsmaster_end;
+       if (type == EDG_WLL_REGJOB_SIMPLE)
+               while (*pjdl != NULL) {
+                       if (edg_wll_RegisterJobMaster(ctx, logging_flags,
+                               *psubjob, EDG_WLL_REGJOB_SIMPLE, *pjdl,
+                               ns, parent, 0, NULL, NULL, NULL) != 0) {
+                               errcode = edg_wll_Error(ctx, NULL, &errdesc);
+                               goto edg_wll_registersubjobsmaster_end;
+                       }
+                       pjdl++; psubjob++;
                }
-               pjdl++; psubjob++;
+       else if (type == EDG_WLL_REGJOB_FILE_TRANSFER)
+               for (i = 0; i < nsubjobs; i++){
+                       if (edg_wll_RegisterJobMaster(ctx, logging_flags,
+                               *psubjob, EDG_WLL_REGJOB_FILE_TRANSFER, NULL,
+                               ns, parent, 0, NULL, NULL, NULL) != 0) {
+                               errcode = edg_wll_Error(ctx, NULL, &errdesc);
+                               goto edg_wll_registersubjobsmaster_end;
+                       }
+                       psubjob++;
+               }       
+       else {
+               errcode = 1;
+               errdesc = strdup("Unsupported job type.");
+               goto edg_wll_registersubjobsmaster_end;
        }
 
 edg_wll_registersubjobsmaster_end:
@@ -1090,7 +1110,7 @@ int edg_wll_RegisterSubjobs(
         edg_wlc_JobId const *   subjobs)
 {
        return edg_wll_RegisterSubjobsMaster(ctx,EDG_WLL_LOGFLAG_LOCAL,
-               parent, jdls, ns, subjobs);
+               parent, EDG_WLL_REGJOB_SIMPLE, jdls, ns, 0, subjobs);
 }
 
 /**
@@ -1107,8 +1127,19 @@ int edg_wll_RegisterSubjobsProxy(
         edg_wlc_JobId const *   subjobs)
 {
        return edg_wll_RegisterSubjobsMaster(ctx,EDG_WLL_LOGFLAG_PROXY,
-               parent, jdls, ns, subjobs);
+               parent, EDG_WLL_REGJOB_SIMPLE, jdls, ns, 0, subjobs);
+
+}
 
+int edg_wll_RegisterFTSubjobs(
+        edg_wll_Context         ctx,
+        glite_jobid_const_t     parent,
+        const char *            ns,
+       int                     nsubjobs,
+        edg_wlc_JobId const *   subjobs)
+{
+        return edg_wll_RegisterSubjobsMaster(ctx,EDG_WLL_LOGFLAG_LOCAL,
+                parent, EDG_WLL_REGJOB_FILE_TRANSFER, NULL, ns, nsubjobs, subjobs);
 }
 
 /**
index 60cd29f..7a834a6 100644 (file)
@@ -132,7 +132,8 @@ db_store(edg_wll_Context ctx, char *event)
        if (ev->any.type == EDG_WLL_EVENT_REGJOB &&
                (ev->regJob.jobtype == EDG_WLL_REGJOB_DAG ||
                 ev->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED ||
-                ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) &&
+                ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION /*||
+                ev->regJob.jobtype == EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION*/) &&
                ev->regJob.nsubjobs > 0) { 
 
                        if (register_subjobs_embryonic(ctx,&ev->regJob)) goto rollback;
index 47d684a..c62b2d4 100644 (file)
@@ -197,7 +197,9 @@ int edg_wll_JobStatusServer(
                }
 
        #if DAG_ENABLE
-               if (stat->jobtype == EDG_WLL_STAT_DAG || stat->jobtype == EDG_WLL_STAT_COLLECTION) {
+               if (stat->jobtype == EDG_WLL_STAT_DAG || 
+                       stat->jobtype == EDG_WLL_STAT_COLLECTION || 
+                       stat->jobtype == EDG_WLL_STAT_FILE_TRANSFER_COLLECTION) {
 
        //      XXX: The users does not want any histogram. What do we do about it? 
        //              if ((!(flags & EDG_WLL_STAT_CHILDHIST_FAST))&&(!(flags & EDG_WLL_STAT_CHILDHIST_THOROUGH))) { /* No Histogram */
@@ -1157,6 +1159,23 @@ static edg_wll_JobStatCode process_Histogram(intJobStat *pis)
                return EDG_WLL_JOB_WAITING;
 }
 
+static edg_wll_JobStatCode process_FT_Histogram(intJobStat *pis)
+{
+       if (intJobStat_to_JobStat(pis)->children_hist[class_to_statCode(SUBJOB_CLASS_RUNNING)+1] > 0) {
+                return EDG_WLL_JOB_RUNNING;
+        }
+       else if (intJobStat_to_JobStat(pis)->children_hist[class_to_statCode(SUBJOB_CLASS_DONE)+1] 
+               == intJobStat_to_JobStat(pis)->children_num) {
+               return EDG_WLL_JOB_DONE;
+       }
+       else if (intJobStat_to_JobStat(pis)->children_hist[class_to_statCode(SUBJOB_CLASS_DONE)+1]
+               + intJobStat_to_JobStat(pis)->children_hist[class_to_statCode(SUBJOB_CLASS_ABORTED)+1]
+               == intJobStat_to_JobStat(pis)->children_num)
+               return EDG_WLL_JOB_ABORTED;
+       else
+               return EDG_WLL_JOB_WAITING;
+}
+
 static edg_wll_ErrorCode update_parent_status(edg_wll_Context ctx, edg_wll_JobStat *subjob_stat_old, intJobStat *cis, edg_wll_Event *ce)
 {
        intJobStat              *pis = NULL;
@@ -1189,6 +1208,13 @@ static edg_wll_ErrorCode update_parent_status(edg_wll_Context ctx, edg_wll_JobSt
                                        goto err;
                        }
                }
+               else if (intJobStat_to_JobStat(pis)->jobtype == EDG_WLL_STAT_FILE_TRANSFER_COLLECTION) {
+                       parent_new_state = process_FT_Histogram(pis);
+                       if (intJobStat_to_JobStat(pis)->state != parent_new_state) {
+                               if (log_collectionState_event(ctx, parent_new_state, EDG_WLL_STAT_OK, cis, pis, ce))
+                                        goto err;
+                       }
+               }
        }
 
 err:
index 71f984d..0f47bdf 100644 (file)
@@ -390,7 +390,7 @@ int edg_wll_FileTransferStatusToHTML(edg_wll_Context ctx UNUSED_VAR, edg_wll_Job
 {
         char *pomA = NULL, *pomB = NULL, *lbstat, *children;
         int pomL = 0, i;
-        char    *chid,*chcj,*chsbt;
+        char    *chid,*chcj,*chpar,*chsbt=NULL;
 
        children = strdup("");
 
@@ -402,6 +402,9 @@ int edg_wll_FileTransferStatusToHTML(edg_wll_Context ctx UNUSED_VAR, edg_wll_Job
        chcj = edg_wlc_JobIdUnparse(stat.ft_compute_job);
        TRL("Compute job", "%s", chcj, NULL);
        free(chcj);
+       chpar = edg_wlc_JobIdUnparse(stat.parent_job);
+        TRL("Parent job", "%s", chpar, NULL);
+        free(chpar);
 
        if (stat.jobtype == EDG_WLL_STAT_FILE_TRANSFER){
                switch(stat.ft_sandbox_type){
@@ -409,7 +412,7 @@ int edg_wll_FileTransferStatusToHTML(edg_wll_Context ctx UNUSED_VAR, edg_wll_Job
                                break;
                        case EDG_WLL_STAT_OUTPUT: chsbt = strdup("OUTPUT");
                                break;
-                       default: chsbt = NULL;
+                       default:
                                break;
                }
                TR("Sandbox type", "%s", chsbt, NULL);
index 445f55f..694f9fa 100644 (file)
@@ -930,6 +930,10 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                                                js->pub.jobtype = EDG_WLL_STAT_COLLECTION;
                                                js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num;
                                                break;
+                                       case EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION:
+                                               js->pub.jobtype = EDG_WLL_STAT_FILE_TRANSFER_COLLECTION;
+                                               js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num;
+                                               break;
                                        default:
                                                break;
                                }
index bb3d813..e4e03d5 100644 (file)
@@ -72,7 +72,8 @@ int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                                js->pub.state = EDG_WLL_JOB_SUBMITTED;
                        }
                        if (USABLE_DATA(res)) {
-                               ;
+                               edg_wlc_JobIdFree(js->pub.parent_job);
+                               edg_wlc_JobIdDup(e->regJob.parent, &js->pub.parent_job);
                        }
                        break;
                case EDG_WLL_EVENT_FILETRANSFERREGISTER:
@@ -127,6 +128,12 @@ int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                                        edg_wlc_JobIdFree(js->pub.ft_compute_job);
                                        edg_wlc_JobIdParse(e->sandbox.compute_job,&js->pub.ft_compute_job);
                                }
+#if 0
+                               else if (e->sandbox.transfer_job) { /* mutually exclusive with compute_job */
+                                       edg_wlc_JobIdFree(js->pub.parent_job);
+                                       edg_wlc_JobIdParse(e->sandbox.transfer_job, &js->pub.parent_job);
+                               }
+#endif
                        }
                        break;
                default:
index e5bb78f..95b5650 100644 (file)
@@ -57,6 +57,7 @@ static int compare_timestamps(struct timeval a, struct timeval b)
 int processEvent_FileTransferCollection(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
 {
        edg_wll_JobStatCode     old_state = js->pub.state;
+       edg_wll_JobStatCode     new_state;
        int                     res = RET_OK;
 
 
@@ -72,9 +73,32 @@ int processEvent_FileTransferCollection(intJobStat *js, edg_wll_Event *e, int ev
                                js->pub.state = EDG_WLL_JOB_SUBMITTED;
                        }
                        if (USABLE_DATA(res)) {
-                               ;
+                               js->pub.children_num = e->regJob.nsubjobs;
+                               js->pub.children_hist[EDG_WLL_JOB_UNKNOWN+1] = js->pub.children_num;
                        }
                        break;
+               case EDG_WLL_EVENT_SANDBOX:
+                        if (USABLE_DATA(res)) {
+                                if (e->sandbox.sandbox_type == EDG_WLL_SANDBOX_INPUT)
+                                        js->pub.ft_sandbox_type = EDG_WLL_STAT_INPUT;
+
+                                if (e->sandbox.sandbox_type == EDG_WLL_SANDBOX_OUTPUT)
+                                        js->pub.ft_sandbox_type = EDG_WLL_STAT_OUTPUT;
+
+                                if (e->sandbox.compute_job) {
+                                        edg_wlc_JobIdFree(js->pub.ft_compute_job);
+                                        edg_wlc_JobIdParse(e->sandbox.compute_job,&js->pub.ft_compute_job);
+                                }
+                        }
+                        break;
+               case EDG_WLL_EVENT_COLLECTIONSTATE:
+                        new_state = edg_wll_StringToStat(e->collectionState.state);
+                        if (USABLE(res)) {
+                                js->pub.state = new_state;
+                                if (new_state == EDG_WLL_JOB_DONE)
+                                        js->pub.done_code = e->collectionState.done_code;
+                        }
+                        break;
                default:
                        break;
        }