From 1e504446fa1a44b37910bfd93b9b3a713c189a5e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Mon, 7 Mar 2011 10:33:56 +0000 Subject: [PATCH] framework for supporting general workflows --- org.glite.lb.client/examples/job_reg.c | 13 ++++++++----- org.glite.lb.client/examples/pbs_running.sh | 11 +++++++++++ org.glite.lb.client/src/EventAttrNames.pl | 2 ++ org.glite.lb.client/src/StatusAttrNames.pl | 4 ++++ org.glite.lb.client/src/logevent.c.T | 2 +- org.glite.lb.state-machine/Makefile | 2 +- org.glite.lb.state-machine/src/process_event.c | 5 +++++ org.glite.lb.types/events.T | 10 ++++++++++ org.glite.lb.types/status.T | 11 +++++++++++ 9 files changed, 53 insertions(+), 7 deletions(-) create mode 100644 org.glite.lb.client/examples/pbs_running.sh diff --git a/org.glite.lb.client/examples/job_reg.c b/org.glite.lb.client/examples/job_reg.c index 6cdd141..4fa8ed1 100644 --- a/org.glite.lb.client/examples/job_reg.c +++ b/org.glite.lb.client/examples/job_reg.c @@ -35,14 +35,14 @@ extern int opterr,optind; static void usage(char *me) { - fprintf(stderr,"usage: %s [-m bkserver] [-x|-X non-default_sock_path] [-j dg_jobid] [-s source_id] [-n num_subjobs [-S][-C]] [-P] [-l jdl_file] [-e seed] [-E]\n", me); + fprintf(stderr,"usage: %s [-m bkserver] [-x|-X non-default_sock_path] [-j dg_jobid] [-s source_id] [-n num_subjobs [-S][-C]] [-P] [-W] [-l jdl_file] [-e seed] [-E]\n", me); } int main(int argc, char *argv[]) { char *src = NULL,*job = NULL,*server = NULL,*seq,*jdl = NULL, *seed = NULL; int lbproxy = 0; - int done = 0,num_subjobs = 0,reg_subjobs = 0,i, collection = 0, pbs=0, cream=0, type, flags=0; + int done = 0,num_subjobs = 0,reg_subjobs = 0,i, collection = 0, pbs=0, cream=0, type, flags=0, workflow = 0; edg_wll_Context ctx; edg_wlc_JobId jobid,*subjobs; @@ -51,7 +51,7 @@ int main(int argc, char *argv[]) opterr = 0; do { - switch (getopt(argc,argv,"xX:s:j:m:n:SCl:e:PcE")) { + switch (getopt(argc,argv,"xX:s:j:m:n:SCl:e:PcEW")) { case 'x': lbproxy = 1; break; case 'X': lbproxy = 1; edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_STORE_SOCK, optarg); @@ -67,6 +67,7 @@ int main(int argc, char *argv[]) case 'l': jdl = (char *) strdup(optarg); break; case 'e': seed = strdup(optarg); break; case 'E': flags = flags | EDG_WLL_LOGLFLAG_EXCL; break; + case 'W': workflow = 1; break; case '?': usage(argv[0]); exit(EINVAL); case -1: done = 1; break; } @@ -119,13 +120,15 @@ int main(int argc, char *argv[]) edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,edg_wll_StringToSource(src)); - type = pbs ? EDG_WLL_REGJOB_PBS + type = workflow ? EDG_WLL_REGJOB_WORKFLOW : + (pbs ? EDG_WLL_REGJOB_PBS : (cream ? EDG_WLL_REGJOB_CREAM : (num_subjobs ? (collection?EDG_WLL_REGJOB_COLLECTION:EDG_WLL_REGJOB_DAG) :EDG_WLL_REGJOB_SIMPLE ) - ); + ) + ); if (lbproxy) { if (edg_wll_RegisterJobProxy(ctx,jobid,type, diff --git a/org.glite.lb.client/examples/pbs_running.sh b/org.glite.lb.client/examples/pbs_running.sh new file mode 100644 index 0000000..c0878d1 --- /dev/null +++ b/org.glite.lb.client/examples/pbs_running.sh @@ -0,0 +1,11 @@ +#!/bin/sh + +jobreg=./glite-lb-job_reg +logevent=../bin/glite-lb-logevent + +eval `$jobreg -s userinterface -m alcarinque.ics.muni.cz:9000 -P` + +# TODO +# eval `$logevent -s server + + diff --git a/org.glite.lb.client/src/EventAttrNames.pl b/org.glite.lb.client/src/EventAttrNames.pl index 8963265..9a9cc29 100644 --- a/org.glite.lb.client/src/EventAttrNames.pl +++ b/org.glite.lb.client/src/EventAttrNames.pl @@ -115,4 +115,6 @@ FAILURE_REASON WMS_DN PAYLOAD_OWNER + NODE_ID + NODE_INSTANCE /; diff --git a/org.glite.lb.client/src/StatusAttrNames.pl b/org.glite.lb.client/src/StatusAttrNames.pl index 67b8d50..948487f 100644 --- a/org.glite.lb.client/src/StatusAttrNames.pl +++ b/org.glite.lb.client/src/StatusAttrNames.pl @@ -112,4 +112,8 @@ CREAM_ID PAYLOAD_OWNER ACCESS_RIGHTS + NODE_JOBS + WF_NODE + WF_NODE_IDS + WN_NODE_INSTANCE /; diff --git a/org.glite.lb.client/src/logevent.c.T b/org.glite.lb.client/src/logevent.c.T index ed7c22c..8aa7f09 100644 --- a/org.glite.lb.client/src/logevent.c.T +++ b/org.glite.lb.client/src/logevent.c.T @@ -341,7 +341,7 @@ static int flesh_seq(int event) { switch (event) { @@@{ - %seq = ( CREAM => 'CREAM', 'PBS' => 'PBS', 'gLite' => 'NORMAL', Condor=>'CONDOR', Transfer=>'NORMAL' ); + %seq = ( CREAM => 'CREAM', 'PBS' => 'PBS', 'gLite' => 'NORMAL', Condor=>'CONDOR', Transfer=>'NORMAL', Workflow=>'PBS' ); for my $t (sort { $event->{order}->{$a} <=> $event->{order}->{$b} } $event->getTypes) { diff --git a/org.glite.lb.state-machine/Makefile b/org.glite.lb.state-machine/Makefile index 7d8f6f7..20e28b0 100644 --- a/org.glite.lb.state-machine/Makefile +++ b/org.glite.lb.state-machine/Makefile @@ -58,7 +58,7 @@ PLUGIN_LIBS:= -L${stagedir}${prefix}/${libdir} -lglite_lb_common${default_flavou ${CLASSADS_LIBS} -lstdc++ ${EXPAT_LIBS} PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo process_event_cream.lo lb_plugin.lo process_event_file_transfer.lo process_event_file_transfer_collection.lo -MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o process_event_cream.o process_event_file_transfer.o process_event_file_transfer_collection.o intjobstat_supp.o +MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o process_event_cream.o process_event_file_transfer.o process_event_file_transfer_collection.o process_event_workflow.o intjobstat_supp.o PLUGIN_LIB=glite_lb_plugin.la MACHINE_LIB=libglite_lb_statemachine.a diff --git a/org.glite.lb.state-machine/src/process_event.c b/org.glite.lb.state-machine/src/process_event.c index 694f9fa..0163b17 100644 --- a/org.glite.lb.state-machine/src/process_event.c +++ b/org.glite.lb.state-machine/src/process_event.c @@ -81,6 +81,9 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char case EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION: js->pub.jobtype = EDG_WLL_STAT_FILE_TRANSFER_COLLECTION; break; + case EDG_WLL_REGJOB_WORKFLOW: + js->pub.jobtype = EDG_WLL_STAT_WORKFLOW; + break; default: trio_asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype); return RET_FAIL; @@ -101,6 +104,8 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char return processEvent_FileTransfer(js,e,ev_seq,strict,errstring); case EDG_WLL_STAT_FILE_TRANSFER_COLLECTION: return processEvent_FileTransferCollection(js,e,ev_seq,strict,errstring); + case EDG_WLL_STAT_WORKFLOW: + return processEvent_Workflow(js,e,ev_seq,strict,errstring); case -1: return RET_UNREG; default: trio_asprintf(errstring,"undefined job type %d",js->pub.jobtype); diff --git a/org.glite.lb.types/events.T b/org.glite.lb.types/events.T index 4c5d6f0..4e09c12 100644 --- a/org.glite.lb.types/events.T +++ b/org.glite.lb.types/events.T @@ -163,6 +163,7 @@ _code_ CREAM CREAM job _code_ FILE_TRANSFER_COLLECTION File transfer collection _code_ FILE_TRANSFER File transfer + _code_ WORKFLOW general workflow int nsubjobs Number of subjobs this job plans to spawn. _optional_ @@ -480,3 +481,12 @@ string compute_job Jobid of (compute) job the sandbox belongs. Exclusive with transfer_job. _optional_ + +@flesh Workflow + +@type WorkflowMember this job implements a particular workflow node + jobid parent WF id (if not known with registration) + _optional_ + string node_id Id of the WF node + int node_instance Instance (number) of the WF node execution + _optional_ diff --git a/org.glite.lb.types/status.T b/org.glite.lb.types/status.T index d64f2ad..6e8db9e 100644 --- a/org.glite.lb.types/status.T +++ b/org.glite.lb.types/status.T @@ -15,6 +15,7 @@ int jobtype Type of job _code_ CREAM CREAM job _code_ FILE_TRANSFER_COLLECTION job containing all file transfers (i.e. sandbox) _code_ FILE_TRANSFER subjob holding one file transfer + _code_ WORKFLOW general workflow jobid parent_job parent job of subjob string seed string used for generation of subjob IDs @@ -179,6 +180,16 @@ string ft_dest File transfer destination _pad_ 20 +string wf_node Id of WF node this job implements +int wn_node_instance Instance (number) of the WF node execution + +strlist wf_node_ids Static list of WF node Ids + _special_ XMLstructured +strlist node_jobs Current JobIds of WF nodes + _special_ XMLstructured + +_pad_ 20 + @type Submitted Entered by the user to the User Interface or registered by Job Partitioner. @type Waiting Accepted by WMS, waiting for resource allocation. @type Ready Matching resources found. -- 1.8.2.3