Further tuning sandbox transfer support
authorAleš Křenek <ljocha@ics.muni.cz>
Mon, 8 Mar 2010 13:27:29 +0000 (13:27 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Mon, 8 Mar 2010 13:27:29 +0000 (13:27 +0000)
- better (hopefully final) naming of events fields
- client convenience command to register the sandbox transfer and setup the
  link to the computational job and vice versa
- shell example how to integrate the stuff inside job wrapper

org.glite.lb.client/Makefile
org.glite.lb.client/examples/reg_sandbox_transfer.c [deleted file]
org.glite.lb.client/examples/sandbox_transfers.sh [new file with mode: 0644]
org.glite.lb.client/src/EventAttrNames.pl
org.glite.lb.client/src/logevent.c.T
org.glite.lb.client/src/register_sandbox.c [new file with mode: 0644]
org.glite.lb.state-machine/src/process_event.c
org.glite.lb.state-machine/src/process_event_file_transfer.c
org.glite.lb.types/events.T

index 34a1dc0..4954218 100644 (file)
@@ -147,7 +147,7 @@ PLUSLIB:=libglite_lb_clientpp_${nothrflavour}.la
 THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la
 
 TOOLS:=${LB_PERF_TOOLS}
-EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c reg_sandbox_transfer.c done_failed_events.c
+EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c done_failed_events.c
 EXAMPLES:=${EXAMPLES_SRC:.c=}
 
 # TODO: migrate them here from branch_RC31_3
@@ -177,9 +177,9 @@ version_info:=-version-info ${shell \
        perl -e '$$,=":"; @F=split "\\.","${version}"; print $$F[0]+$$F[1]+${offset},$$F[2],$$F[1]' }
 
 ifdef LB_STANDALONE
-compile all: generate ${LIB} ${THRLIB} ${TOOLS} logevent notify examples ${MAN_GZ} ${MAN8_GZ}
+compile all: generate ${LIB} ${THRLIB} ${TOOLS} logevent notify register_sandbox examples ${MAN_GZ} ${MAN8_GZ}
 else
-compile all: check_version generate ${LIB} ${THRLIB} ${PLUSLIB} ${THRPLUSLIB} ${TOOLS} logevent notify examples ${MAN_GZ} ${MAN8_GZ}
+compile all: check_version generate ${LIB} ${THRLIB} ${PLUSLIB} ${THRPLUSLIB} ${TOOLS} logevent notify register_sandbox examples ${MAN_GZ} ${MAN8_GZ}
 endif
 
 generate: ${GEN_HDRS}
@@ -208,6 +208,9 @@ logevent: logevent.o args.o
 notify: notify.o
        ${LINKXX} -o $@ notify.o ${PLUSLIB} ${EXT_LIB} 
 
+register_sandbox: %: %.o
+       ${LINK} -o $@ $@.o ${LIB} ${EXT_LIB}
+
 ${TOOLS} ${EXAMPLES}: %: %.o
        ${LINK} -o $@ $< ${LIB} ${EXT_LIB} 
 
@@ -341,7 +344,7 @@ ifndef LB_STANDALONE
 #      cp -r C CPP ${PREFIX}/share/doc/${package}-${version}
 #       cp -r ${top_srcdir}/doc/api/{Makefile,api.tex,fig} ${PREFIX}/share/doc/${package}-${version}/api
 endif
-       for p in logevent notify; do \
+       for p in logevent notify register_sandbox; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/bin/glite-lb-$$p"; \
        done
        for p in ${TOOLS} ; do \
@@ -358,13 +361,14 @@ endif
        ${INSTALL} -m 644 ${top_srcdir}/examples/README.examples "${PREFIX}/share/doc/${package}-${version}/examples/"
        ${INSTALL} -m 755 ${top_srcdir}/src/export.sh "${PREFIX}/sbin/glite-lb-export.sh"
        ${INSTALL} -m 755 ${top_srcdir}/examples/notify.pl ${PREFIX}/examples/glite-lb-notify.pl
+       ${INSTALL} -m 755 ${top_srcdir}/examples/sandbox_transfers.sh ${PREFIX}/examples/glite-lb-sandbox_transfers.sh
        ${INSTALL} -m 644 ${MAN_GZ} ${PREFIX}/share/man/man1
        ${INSTALL} -m 644 ${MAN8_GZ} ${PREFIX}/share/man/man8
        ${INSTALL} -m 644 ${top_srcdir}/m4/*.m4 ${PREFIX}/share/build/m4/
 
 clean:
        rm -rvf *.o *.lo .libs lib* *.c *.cpp *.h *.dox producer_test notify C/ CPP/
-       rm -rvf ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} ${MAN_GZ} ${MAN8_GZ}
+       rm -rvf ${LIB} ${THRLIB} ${TOOLS} logevent  register_sandbox ${PLUSLIB} ${THRPLUSLIB} ${MAN_GZ} ${MAN8_GZ}
        rm -rvf ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS}
        rm -f EventAttrNames.pl
        rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/
diff --git a/org.glite.lb.client/examples/reg_sandbox_transfer.c b/org.glite.lb.client/examples/reg_sandbox_transfer.c
deleted file mode 100644 (file)
index 52275ea..0000000
+++ /dev/null
@@ -1,147 +0,0 @@
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <string.h>
-#include <errno.h>
-#include <fcntl.h>
-
-#include "glite/jobid/cjobid.h"
-#ifdef BUILDING_LB_CLIENT
-#include "producer.h"
-#else
-#include "glite/lb/producer.h"
-#endif
-#include "glite/lb/events.h"
-
-extern char *optarg;
-extern int opterr,optind;
-
-static void usage(char *me)
-{
-       fprintf(stderr,"usage: %s [-m bkserver] [-x|-X non-default_sock_path] [-j dg_jobid] [-s source_id] [-n num_subjobs [-S]] [-e seed]\n", me);
-}
-
-int main(int argc, char *argv[])
-{
-       char *src = NULL,*job = NULL,*server = NULL,*seq, *seed = NULL;
-       int lbproxy = 0;
-       int done = 0, num_subjobs = 0, reg_subjobs = 0, i, type;
-       edg_wll_Context ctx;
-       edg_wlc_JobId   jobid,*subjobs;
-
-
-       edg_wll_InitContext(&ctx);
-       opterr = 0;
-
-       do {
-               switch (getopt(argc,argv,"xX:s:j:m:n:Se:")) {
-                       case 'x': lbproxy = 1; break;
-                       case 'X': lbproxy = 1; 
-                                 edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_STORE_SOCK, optarg);
-                                 break;
-                       case 's': src = (char *) strdup(optarg); break;
-                       case 'j': job = (char *) strdup(optarg); break;
-                       case 'm': server = strdup(optarg); break;
-                       case 'n': num_subjobs = atoi(optarg); printf("Not implemented yet \n"); exit(1);  break;
-                       case 'S': reg_subjobs = 1; break;
-                       case 'e': seed = strdup(optarg); break;
-                       case '?': usage(argv[0]); exit(EINVAL);
-                       case -1: done = 1; break;
-               }
-       } while (!done);
-
-       if ((num_subjobs <= 0) && (reg_subjobs) ) {
-               usage(argv[0]);
-               exit(EINVAL);
-       }       
-
-       if (!job && !server) {
-               fprintf(stderr,"%s: either -m server or -j jobid has to be specified\n",argv[0]);
-               exit(1);
-       }
-
-       if (!src) {
-               fprintf(stderr,"%s: -s required\n",argv[0]);
-               exit(1);
-       }
-
-       if (!job) {
-               char *p = strchr(server,':');
-               if (p) *p=0;
-               edg_wlc_JobIdCreate(server,p?atoi(p+1):0,&jobid);
-               job = edg_wlc_JobIdUnparse(jobid);
-               printf("new jobid: %s\n",job);
-       }
-       else if ((errno = edg_wlc_JobIdParse(job,&jobid))) {
-               perror(job);
-               exit(1);
-       }
-
-       edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,edg_wll_StringToSource(src));
-
-       type = num_subjobs ? EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION : EDG_WLL_REGJOB_FILE_TRANSFER;
-
-       if (lbproxy) {
-               if (edg_wll_RegisterJobProxy(ctx,jobid,type,
-                       "", "NS",
-                       num_subjobs,seed,&subjobs))
-               {
-                       char    *et,*ed;
-                       edg_wll_Error(ctx,&et,&ed);
-                       fprintf(stderr,"edg_wll_RegisterJobProxy(%s): %s (%s)\n",job,et,ed);
-                       exit(1);
-               }
-       } else {
-               if (edg_wll_RegisterJob(ctx,jobid,type,
-                       "", "NS",
-                       num_subjobs,seed,&subjobs))
-               {
-                       char    *et,*ed;
-                       edg_wll_Error(ctx,&et,&ed);
-                       fprintf(stderr,"edg_wll_RegisterJob(%s): %s (%s)\n",job,et,ed);
-                       exit(1);
-               }
-       }
-
-       seq = edg_wll_GetSequenceCode(ctx);
-       printf("\n%s=\"%s\"\n",num_subjobs?"EDG_WL_FILE_TRANSFER_COLLECTION_JOBID":"EDG_WL_FILE_TRANSFER_JOBID",job);
-       printf("EDG_WL_SEQUENCE=\"%s\"\n",seq);
-       free(seq);
-       free(job);
-
-       if (num_subjobs) for (i=0; subjobs[i]; i++) {
-               char    *job_s = edg_wlc_JobIdUnparse(subjobs[i]);
-               printf("EDG_WL_SUB_JOBID[%d]=\"%s\"\n",i,job_s);
-               free(job_s);
-       }
-
-       if (reg_subjobs) {
-               char ** jdls = (char**) calloc(num_subjobs+1, sizeof(char*));
-
-               for (i=0; subjobs[i]; i++) {
-                       asprintf(jdls+i, "JDL of subjob #%d\n", i+1);
-               }
-
-               if (lbproxy) {
-                       if (edg_wll_RegisterSubjobsProxy(ctx, jobid, (const char **) jdls, "NS", subjobs)) {
-                               char    *et,*ed;
-                               edg_wll_Error(ctx,&et,&ed);
-                               fprintf(stderr,"edg_wll_RegisterSubjobsProxy: %s (%s)\n", et, ed);
-                               exit(1);
-                       }
-               } else {
-                       if (edg_wll_RegisterSubjobs(ctx, jobid, (const char **) jdls, "NS", subjobs)) {
-                               char    *et,*ed;
-                               edg_wll_Error(ctx,&et,&ed);
-                               fprintf(stderr,"edg_wll_RegisterSubjobs: %s (%s)\n", et, ed);
-                               exit(1);
-                       }
-               }
-
-               for (i=0; subjobs[i]; i++) free(jdls[i]);
-       }
-
-       edg_wll_FreeContext(ctx);
-
-       return 0;
-}
diff --git a/org.glite.lb.client/examples/sandbox_transfers.sh b/org.glite.lb.client/examples/sandbox_transfers.sh
new file mode 100644 (file)
index 0000000..9b6269c
--- /dev/null
@@ -0,0 +1,73 @@
+#!/bin/sh
+
+
+# Example script illustrating injection of LB commands into Job Wrapper
+# to log sandbox transfer related events
+
+# It can be used as a submitted job to check the functionality, however,
+# in this way it interferes with sequence codes logged by JW itself.
+# Therefore it should be taken as an example only; the commands from here
+# should end up in the generated job wrapper instead.
+
+
+# register sandbox transfers with LB
+# sets environment variables:
+#      GLITE_WMS_SEQUENCE_CODE updated seq. code for the job itself
+#      GLITE_LB_ISB_JOBID
+#      GLITE_LB_ISB_SEQUENCE
+
+eval `glite-lb-register_sandbox \
+       --jobid $GLITE_WMS_JOBID        \
+       --input \
+       --from http://users.machine/path/to/sandbox.file \
+       --to file://where/it/is/sandbox.file \
+       --sequence $GLITE_WMS_SEQUENCE_CODE`
+
+
+eval `glite-lb-register_sandbox \
+       --jobid $GLITE_WMS_JOBID        \
+       --output \
+       --from file://where/it/is/sandbox.file2 \
+       --to http://users.machine/path/to/sandbox.file2 \
+       --sequence $GLITE_WMS_SEQUENCE_CODE`
+
+# ISB transfer 
+GLITE_LB_ISB_SEQUENCE=`glite-lb-logevent \
+       --source LRMS \
+       --jobid $GLITE_LB_ISB_JOBID \
+       --sequence $GLITE_LB_ISB_SEQUENCE \
+       --event FileTransfer \
+       --result START`
+
+# it takes looong
+sleep 60
+
+GLITE_LB_ISB_SEQUENCE=`glite-lb-logevent \
+       --source LRMS \
+       --jobid $GLITE_LB_ISB_JOBID \
+       --sequence $GLITE_LB_ISB_SEQUENCE \
+       --event FileTransfer \
+       --result OK`
+
+# or FAIL with --reason "because of bad weather"
+
+# job payload here
+sleep 120
+
+
+GLITE_LB_OSB_SEQUENCE=`glite-lb-logevent \
+       --source LRMS \
+       --jobid $GLITE_LB_OSB_JOBID \
+       --sequence $GLITE_LB_OSB_SEQUENCE \
+       --event FileTransfer \
+       --result START`
+
+sleep 60
+
+GLITE_LB_OSB_SEQUENCE=`glite-lb-logevent \
+       --source LRMS \
+       --jobid $GLITE_LB_OSB_JOBID \
+       --sequence $GLITE_LB_OSB_SEQUENCE \
+       --event FileTransfer \
+       --result OK`
+
index bddf11c..2bd5593 100644 (file)
@@ -90,7 +90,7 @@
        ORIG_TIMESTAMP
        SRC
        DEST
-       VARIANT
+       SANDBOX_TYPE
        TRANSFER_JOB
        COMPUTE_JOB
        DEST_URL
index a68c0ef..0f3f4c0 100644 (file)
@@ -313,7 +313,7 @@ static int flesh_seq(int event)
 {
        switch (event) {
 @@@{
-       %seq = ( CREAM => 'CREAM', 'PBS' => 'PBS', 'gLite' => 'NORMAL', Condor=>'CONDOR' );
+       %seq = ( CREAM => 'CREAM', 'PBS' => 'PBS', 'gLite' => 'NORMAL', Condor=>'CONDOR', Transfer=>'NORMAL' );
        for my $t (sort { $event->{order}->{$a} <=> $event->{order}->{$b} }
               $event->getTypes)
        {
diff --git a/org.glite.lb.client/src/register_sandbox.c b/org.glite.lb.client/src/register_sandbox.c
new file mode 100644 (file)
index 0000000..07622b8
--- /dev/null
@@ -0,0 +1,149 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <assert.h>
+
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+#include <getopt.h>
+
+
+#include "glite/jobid/cjobid.h"
+#ifdef BUILDING_LB_CLIENT
+#include "producer.h"
+#else
+#include "glite/lb/producer.h"
+#endif
+#include "glite/lb/events.h"
+
+static void usage(char *me) 
+{
+       fprintf(stderr,"usage %s: \n"
+                       "       -j,--jobid jobid\n"
+                       "       -i,--input or -o,--output       (mutually exclusive)\n"
+                       "       -f,--from URI\n"
+                       "       -t,--to URI\n"
+                       "       [ -s,--source lb-source ]       (default LRMS)\n"
+                       "       [ -c,--sequence lb-seqcode ]\n"
+                       ,me);
+}
+
+#define check_log(fun,_jobid,arg) \
+       if (fun arg) {  \
+               char    *et,*ed;        \
+               edg_wll_Error(ctx,&et,&ed);     \
+               fprintf(stderr,#fun "(%s): %s (%s)\n",_jobid,et,ed);    \
+               exit(1);        \
+       }
+
+
+int main(int argc,char **argv)
+{
+       struct option opts[] = {
+               { "jobid",1,NULL,'j' },
+               { "input",0,NULL,'i' },
+               { "output",0,NULL,'o' },
+               { "from",1,NULL,'f' },
+               { "to",1,NULL,'t' },
+               { "source",1,NULL,'s' },
+               { "sequence",1,NULL,'c' },
+       };
+
+       char    *jobid_s = NULL, *ftjobid_s = NULL, *from = NULL, *to = NULL, *source_s = "LRMS",
+               *sequence = NULL, *srv, *uniq, type_c = 'x';
+
+       unsigned int    port;
+       int     o;
+
+       glite_jobid_t   jobid, ftjobid;
+       edg_wll_Source  source;
+
+       enum edg_wll_SandboxSandbox_type type =
+               EDG_WLL_SANDBOX_SANDBOX_TYPE_UNDEFINED;
+
+       edg_wll_Context ctx;
+
+       while ((o = getopt_long(argc,argv,"j:iof:t:s:c:",opts,NULL)) != EOF) switch(o) {
+               case 'j':       jobid_s = optarg; break;
+               case 'i':       if (type != EDG_WLL_SANDBOX_SANDBOX_TYPE_UNDEFINED) { usage(argv[0]); exit(1); }
+                               type = EDG_WLL_SANDBOX_INPUT;
+                               break;
+               case 'o':       if (type != EDG_WLL_SANDBOX_SANDBOX_TYPE_UNDEFINED) { usage(argv[0]); exit(1); }
+                               type = EDG_WLL_SANDBOX_OUTPUT;
+                               break;
+               case 'f':       from = optarg; break;
+               case 't':       to = optarg; break;
+               case 's':       source_s = optarg; break;
+               case 'c':       sequence = optarg; break;
+               default:        usage(argv[0]); exit(1);
+       }
+
+       edg_wll_InitContext(&ctx);
+
+       if (!jobid_s || type == EDG_WLL_SANDBOX_SANDBOX_TYPE_UNDEFINED
+                       || !from || !to)
+       {
+               usage(argv[0]); 
+               exit(1);
+       }
+
+       type_c = type == EDG_WLL_SANDBOX_INPUT ? 'I' : 'O';
+
+       if (glite_jobid_parse(jobid_s,&jobid)) {
+               fprintf(stderr,"%s: can't parse\n",jobid_s);
+               exit(1);
+       }
+
+       if ((source = edg_wll_StringToSource(source_s)) == EDG_WLL_SOURCE_NONE) {
+               fprintf(stderr,"%s: invalid source\n",source_s);
+               exit(1);
+}
+;
+       edg_wll_SetParam(ctx, EDG_WLL_PARAM_SOURCE, source);
+
+       glite_jobid_getServerParts(jobid,&srv,&port);
+       glite_jobid_create(srv,port,&ftjobid);
+       uniq = glite_jobid_getUnique(ftjobid);
+       glite_jobid_free(ftjobid);
+
+       asprintf(&ftjobid_s,"https://%s:%d/FT:%s",srv,port,uniq);
+       assert(glite_jobid_parse(ftjobid_s,&ftjobid) == 0);
+
+       edg_wll_SetLoggingJob(ctx,ftjobid,NULL,EDG_WLL_SEQ_NORMAL);
+
+       check_log(edg_wll_RegisterJob,ftjobid_s,(
+               ctx,
+               ftjobid, EDG_WLL_REGJOB_FILE_TRANSFER,
+               "n/a","n/a",
+               0,NULL,NULL));
+               
+       check_log(edg_wll_LogFileTransferRegister,ftjobid_s,(ctx,from,to));
+       check_log(edg_wll_LogSandbox,ftjobid_s,(
+                               ctx,
+                               edg_wll_SandboxSandbox_typeToString(type),
+                               NULL,
+                               jobid_s));
+
+       printf("GLITE_LB_%cSB_JOBID=\"%s\"\nGLITE_LB_%cSB_SEQUENCE=\"%s\"\n",
+                       type_c,ftjobid_s,
+                       type_c,edg_wll_GetSequenceCode(ctx));
+
+       if (edg_wll_SetLoggingJob(ctx,jobid,sequence,EDG_WLL_SEQ_NORMAL)) {
+               char    *et,*ed;
+               edg_wll_Error(ctx,&et,&ed);
+               fprintf(stderr,"edg_wll_SetLoggingJob(%s,%s): %s (%s)\n",
+                               jobid_s,sequence,et,ed);
+               exit(1);
+       }
+
+       check_log(edg_wll_LogSandbox,jobid_s,(
+                               ctx,
+                               edg_wll_SandboxSandbox_typeToString(type),
+                               ftjobid_s,
+                               NULL));
+
+       printf("GLITE_WMS_SEQUENCE_CODE=\"%s\"\n",edg_wll_GetSequenceCode(ctx));
+
+       return 0;
+}
index 2af3def..5d91734 100644 (file)
@@ -952,12 +952,12 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                        break;
                case EDG_WLL_EVENT_SANDBOX:
                        if (USABLE_DATA(res, strict)) {
-                               if ((e->sandbox.variant == EDG_WLL_SANDBOX_INPUT) && e->sandbox.transfer_job) {
+                               if ((e->sandbox.sandbox_type == EDG_WLL_SANDBOX_INPUT) && e->sandbox.transfer_job) {
                                        edg_wlc_JobIdFree(js->pub.isb_transfer);
                                        edg_wlc_JobIdParse(e->sandbox.transfer_job,&js->pub.isb_transfer);
                                }
 
-                               if ((e->sandbox.variant == EDG_WLL_SANDBOX_OUTPUT) && e->sandbox.transfer_job) {
+                               if ((e->sandbox.sandbox_type == EDG_WLL_SANDBOX_OUTPUT) && e->sandbox.transfer_job) {
                                        edg_wlc_JobIdFree(js->pub.osb_transfer);
                                        edg_wlc_JobIdParse(e->sandbox.transfer_job,&js->pub.osb_transfer);
                                }
index 144c3b5..db2364b 100644 (file)
@@ -100,10 +100,10 @@ int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int
                        break;
                case EDG_WLL_EVENT_SANDBOX:
                        if (USABLE_DATA(res)) {
-                               if (e->sandbox.variant == EDG_WLL_SANDBOX_INPUT)
+                               if (e->sandbox.sandbox_type == EDG_WLL_SANDBOX_INPUT)
                                        js->pub.ft_sandbox_type = EDG_WLL_STAT_INPUT;
 
-                               if (e->sandbox.variant == EDG_WLL_SANDBOX_OUTPUT)
+                               if (e->sandbox.sandbox_type == EDG_WLL_SANDBOX_OUTPUT)
                                        js->pub.ft_sandbox_type = EDG_WLL_STAT_OUTPUT;
 
                                if (e->sandbox.compute_job) {
index 0cf542b..feef0de 100644 (file)
        string LRMS_jobid
        _optional_
 
+@flesh Transfer
+
 @type FileTransferRegister     register file transfer
        string  src             Source of file transfer.
        string  dest            Destination of file transfer.
         _code_  OK              The file was transfered successfully.
         _code_  FAIL            The file transfer failed (see reason)
        string  reason          The reason of file transfer failure.
+       _optional_
 
 @type Sandbox          event for logging relationship between (compute) job and (file) transfer job
-       int     variant         Type of sandbox.
+       int     sandbox_type            Type of sandbox.
        _code_  INPUT
        _code_  OUTPUT
        string  transfer_job    Jobid of transfer job (collection) containing set of file transfers (subjobs).