framework for supporting general workflows
authorAleš Křenek <ljocha@ics.muni.cz>
Mon, 7 Mar 2011 10:33:56 +0000 (10:33 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Mon, 7 Mar 2011 10:33:56 +0000 (10:33 +0000)
org.glite.lb.client/examples/job_reg.c
org.glite.lb.client/examples/pbs_running.sh [new file with mode: 0644]
org.glite.lb.client/src/EventAttrNames.pl
org.glite.lb.client/src/StatusAttrNames.pl
org.glite.lb.client/src/logevent.c.T
org.glite.lb.state-machine/Makefile
org.glite.lb.state-machine/src/process_event.c
org.glite.lb.types/events.T
org.glite.lb.types/status.T

index 6cdd141..4fa8ed1 100644 (file)
@@ -35,14 +35,14 @@ extern int opterr,optind;
 
 static void usage(char *me)
 {
-       fprintf(stderr,"usage: %s [-m bkserver] [-x|-X non-default_sock_path] [-j dg_jobid] [-s source_id] [-n num_subjobs [-S][-C]] [-P] [-l jdl_file] [-e seed] [-E]\n", me);
+       fprintf(stderr,"usage: %s [-m bkserver] [-x|-X non-default_sock_path] [-j dg_jobid] [-s source_id] [-n num_subjobs [-S][-C]] [-P] [-W] [-l jdl_file] [-e seed] [-E]\n", me);
 }
 
 int main(int argc, char *argv[])
 {
        char *src = NULL,*job = NULL,*server = NULL,*seq,*jdl = NULL, *seed = NULL;
        int lbproxy = 0;
-       int done = 0,num_subjobs = 0,reg_subjobs = 0,i, collection = 0, pbs=0, cream=0, type, flags=0;
+       int done = 0,num_subjobs = 0,reg_subjobs = 0,i, collection = 0, pbs=0, cream=0, type, flags=0, workflow = 0;
        edg_wll_Context ctx;
        edg_wlc_JobId   jobid,*subjobs;
 
@@ -51,7 +51,7 @@ int main(int argc, char *argv[])
        opterr = 0;
 
        do {
-               switch (getopt(argc,argv,"xX:s:j:m:n:SCl:e:PcE")) {
+               switch (getopt(argc,argv,"xX:s:j:m:n:SCl:e:PcEW")) {
                        case 'x': lbproxy = 1; break;
                        case 'X': lbproxy = 1; 
                                  edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_STORE_SOCK, optarg);
@@ -67,6 +67,7 @@ int main(int argc, char *argv[])
                        case 'l': jdl = (char *) strdup(optarg); break;
                        case 'e': seed = strdup(optarg); break;
                        case 'E': flags = flags | EDG_WLL_LOGLFLAG_EXCL; break;
+                       case 'W': workflow = 1; break;
                        case '?': usage(argv[0]); exit(EINVAL);
                        case -1: done = 1; break;
                }
@@ -119,13 +120,15 @@ int main(int argc, char *argv[])
 
        edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,edg_wll_StringToSource(src));
 
-       type = pbs ? EDG_WLL_REGJOB_PBS
+       type = workflow ? EDG_WLL_REGJOB_WORKFLOW : 
+               (pbs ? EDG_WLL_REGJOB_PBS
                : (cream ? EDG_WLL_REGJOB_CREAM
                        : (num_subjobs ?
                                (collection?EDG_WLL_REGJOB_COLLECTION:EDG_WLL_REGJOB_DAG)
                                :EDG_WLL_REGJOB_SIMPLE
                          )
-                 );
+                 )
+               );
 
        if (lbproxy) {
                if (edg_wll_RegisterJobProxy(ctx,jobid,type,
diff --git a/org.glite.lb.client/examples/pbs_running.sh b/org.glite.lb.client/examples/pbs_running.sh
new file mode 100644 (file)
index 0000000..c0878d1
--- /dev/null
@@ -0,0 +1,11 @@
+#!/bin/sh
+
+jobreg=./glite-lb-job_reg
+logevent=../bin/glite-lb-logevent
+
+eval `$jobreg -s userinterface -m alcarinque.ics.muni.cz:9000 -P`
+
+# TODO
+# eval `$logevent -s server 
+
+
index 8963265..9a9cc29 100644 (file)
        FAILURE_REASON
        WMS_DN
        PAYLOAD_OWNER
+       NODE_ID
+       NODE_INSTANCE
 /;
index 67b8d50..948487f 100644 (file)
        CREAM_ID
        PAYLOAD_OWNER
        ACCESS_RIGHTS
+       NODE_JOBS
+       WF_NODE
+       WF_NODE_IDS
+       WN_NODE_INSTANCE
 /;
index ed7c22c..8aa7f09 100644 (file)
@@ -341,7 +341,7 @@ static int flesh_seq(int event)
 {
        switch (event) {
 @@@{
-       %seq = ( CREAM => 'CREAM', 'PBS' => 'PBS', 'gLite' => 'NORMAL', Condor=>'CONDOR', Transfer=>'NORMAL' );
+       %seq = ( CREAM => 'CREAM', 'PBS' => 'PBS', 'gLite' => 'NORMAL', Condor=>'CONDOR', Transfer=>'NORMAL', Workflow=>'PBS' );
        for my $t (sort { $event->{order}->{$a} <=> $event->{order}->{$b} }
               $event->getTypes)
        {
index 7d8f6f7..20e28b0 100644 (file)
@@ -58,7 +58,7 @@ PLUGIN_LIBS:= -L${stagedir}${prefix}/${libdir} -lglite_lb_common${default_flavou
        ${CLASSADS_LIBS} -lstdc++ ${EXPAT_LIBS}
 
 PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo process_event_cream.lo lb_plugin.lo  process_event_file_transfer.lo process_event_file_transfer_collection.lo
-MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o process_event_cream.o process_event_file_transfer.o process_event_file_transfer_collection.o intjobstat_supp.o
+MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o process_event_cream.o process_event_file_transfer.o process_event_file_transfer_collection.o process_event_workflow.o intjobstat_supp.o
 
 PLUGIN_LIB=glite_lb_plugin.la
 MACHINE_LIB=libglite_lb_statemachine.a
index 694f9fa..0163b17 100644 (file)
@@ -81,6 +81,9 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                        case EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION:
                                js->pub.jobtype = EDG_WLL_STAT_FILE_TRANSFER_COLLECTION;
                                break;
+                       case EDG_WLL_REGJOB_WORKFLOW:
+                               js->pub.jobtype = EDG_WLL_STAT_WORKFLOW;
+                               break;
                        default:
                                trio_asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype);
                                return RET_FAIL;
@@ -101,6 +104,8 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                        return processEvent_FileTransfer(js,e,ev_seq,strict,errstring);
                case EDG_WLL_STAT_FILE_TRANSFER_COLLECTION:
                        return processEvent_FileTransferCollection(js,e,ev_seq,strict,errstring);
+               case EDG_WLL_STAT_WORKFLOW:
+                       return processEvent_Workflow(js,e,ev_seq,strict,errstring);
                case -1: return RET_UNREG;
                default: 
                        trio_asprintf(errstring,"undefined job type %d",js->pub.jobtype);
index 4c5d6f0..4e09c12 100644 (file)
        _code_  CREAM           CREAM job
         _code_ FILE_TRANSFER_COLLECTION File transfer collection
         _code_ FILE_TRANSFER            File transfer
+       _code_  WORKFLOW        general workflow
        
        int     nsubjobs        Number of subjobs this job plans to spawn.
        _optional_
        string  compute_job     Jobid of (compute) job the sandbox belongs. Exclusive with transfer_job.
        _optional_      
 
+
+@flesh Workflow
+
+@type WorkflowMember   this job implements a particular workflow node
+       jobid   parent          WF id (if not known with registration)
+       _optional_
+       string  node_id         Id of the WF node
+       int     node_instance   Instance (number) of the WF node execution
+       _optional_
index d64f2ad..6e8db9e 100644 (file)
@@ -15,6 +15,7 @@ int   jobtype         Type of job
        _code_ CREAM                    CREAM job
        _code_ FILE_TRANSFER_COLLECTION job containing all file transfers (i.e. sandbox)
        _code_ FILE_TRANSFER            subjob holding one file transfer
+       _code_ WORKFLOW                 general workflow
 jobid  parent_job                      parent job of subjob
 
 string seed            string used for generation of subjob IDs
@@ -179,6 +180,16 @@ string     ft_dest                 File transfer destination
 
 _pad_  20
 
+string wf_node                 Id of WF node this job implements
+int    wn_node_instance        Instance (number) of the WF node execution
+
+strlist        wf_node_ids             Static list of WF node Ids
+       _special_       XMLstructured
+strlist node_jobs              Current JobIds of WF nodes
+       _special_       XMLstructured
+
+_pad_  20
+
 @type Submitted                Entered by the user to the User Interface or registered by Job Partitioner.
 @type Waiting          Accepted by WMS, waiting for resource allocation.
 @type Ready            Matching resources found.