From abc60b754083542c2a860f10d7671bb984e137a3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Mon, 8 Mar 2010 13:27:29 +0000 Subject: [PATCH] Further tuning sandbox transfer support - better (hopefully final) naming of events fields - client convenience command to register the sandbox transfer and setup the link to the computational job and vice versa - shell example how to integrate the stuff inside job wrapper --- org.glite.lb.client/Makefile | 14 +- .../examples/reg_sandbox_transfer.c | 147 -------------------- org.glite.lb.client/examples/sandbox_transfers.sh | 73 ++++++++++ org.glite.lb.client/src/EventAttrNames.pl | 2 +- org.glite.lb.client/src/logevent.c.T | 2 +- org.glite.lb.client/src/register_sandbox.c | 149 +++++++++++++++++++++ org.glite.lb.state-machine/src/process_event.c | 4 +- .../src/process_event_file_transfer.c | 4 +- org.glite.lb.types/events.T | 5 +- 9 files changed, 241 insertions(+), 159 deletions(-) delete mode 100644 org.glite.lb.client/examples/reg_sandbox_transfer.c create mode 100644 org.glite.lb.client/examples/sandbox_transfers.sh create mode 100644 org.glite.lb.client/src/register_sandbox.c diff --git a/org.glite.lb.client/Makefile b/org.glite.lb.client/Makefile index 34a1dc0..4954218 100644 --- a/org.glite.lb.client/Makefile +++ b/org.glite.lb.client/Makefile @@ -147,7 +147,7 @@ PLUSLIB:=libglite_lb_clientpp_${nothrflavour}.la THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la TOOLS:=${LB_PERF_TOOLS} -EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c reg_sandbox_transfer.c done_failed_events.c +EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c done_failed_events.c EXAMPLES:=${EXAMPLES_SRC:.c=} # TODO: migrate them here from branch_RC31_3 @@ -177,9 +177,9 @@ version_info:=-version-info ${shell \ perl -e '$$,=":"; @F=split "\\.","${version}"; print $$F[0]+$$F[1]+${offset},$$F[2],$$F[1]' } ifdef LB_STANDALONE -compile all: generate ${LIB} ${THRLIB} ${TOOLS} logevent notify examples ${MAN_GZ} ${MAN8_GZ} +compile all: generate ${LIB} ${THRLIB} ${TOOLS} logevent notify register_sandbox examples ${MAN_GZ} ${MAN8_GZ} else -compile all: check_version generate ${LIB} ${THRLIB} ${PLUSLIB} ${THRPLUSLIB} ${TOOLS} logevent notify examples ${MAN_GZ} ${MAN8_GZ} +compile all: check_version generate ${LIB} ${THRLIB} ${PLUSLIB} ${THRPLUSLIB} ${TOOLS} logevent notify register_sandbox examples ${MAN_GZ} ${MAN8_GZ} endif generate: ${GEN_HDRS} @@ -208,6 +208,9 @@ logevent: logevent.o args.o notify: notify.o ${LINKXX} -o $@ notify.o ${PLUSLIB} ${EXT_LIB} +register_sandbox: %: %.o + ${LINK} -o $@ $@.o ${LIB} ${EXT_LIB} + ${TOOLS} ${EXAMPLES}: %: %.o ${LINK} -o $@ $< ${LIB} ${EXT_LIB} @@ -341,7 +344,7 @@ ifndef LB_STANDALONE # cp -r C CPP ${PREFIX}/share/doc/${package}-${version} # cp -r ${top_srcdir}/doc/api/{Makefile,api.tex,fig} ${PREFIX}/share/doc/${package}-${version}/api endif - for p in logevent notify; do \ + for p in logevent notify register_sandbox; do \ ${INSTALL} -m 755 "$$p" "${PREFIX}/bin/glite-lb-$$p"; \ done for p in ${TOOLS} ; do \ @@ -358,13 +361,14 @@ endif ${INSTALL} -m 644 ${top_srcdir}/examples/README.examples "${PREFIX}/share/doc/${package}-${version}/examples/" ${INSTALL} -m 755 ${top_srcdir}/src/export.sh "${PREFIX}/sbin/glite-lb-export.sh" ${INSTALL} -m 755 ${top_srcdir}/examples/notify.pl ${PREFIX}/examples/glite-lb-notify.pl + ${INSTALL} -m 755 ${top_srcdir}/examples/sandbox_transfers.sh ${PREFIX}/examples/glite-lb-sandbox_transfers.sh ${INSTALL} -m 644 ${MAN_GZ} ${PREFIX}/share/man/man1 ${INSTALL} -m 644 ${MAN8_GZ} ${PREFIX}/share/man/man8 ${INSTALL} -m 644 ${top_srcdir}/m4/*.m4 ${PREFIX}/share/build/m4/ clean: rm -rvf *.o *.lo .libs lib* *.c *.cpp *.h *.dox producer_test notify C/ CPP/ - rm -rvf ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} ${MAN_GZ} ${MAN8_GZ} + rm -rvf ${LIB} ${THRLIB} ${TOOLS} logevent register_sandbox ${PLUSLIB} ${THRPLUSLIB} ${MAN_GZ} ${MAN8_GZ} rm -rvf ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} rm -f EventAttrNames.pl rm -rvf log.xml project/ rpmbuild/ RPMS/ tgz/ diff --git a/org.glite.lb.client/examples/reg_sandbox_transfer.c b/org.glite.lb.client/examples/reg_sandbox_transfer.c deleted file mode 100644 index 52275ea..0000000 --- a/org.glite.lb.client/examples/reg_sandbox_transfer.c +++ /dev/null @@ -1,147 +0,0 @@ -#include -#include -#include -#include -#include -#include - -#include "glite/jobid/cjobid.h" -#ifdef BUILDING_LB_CLIENT -#include "producer.h" -#else -#include "glite/lb/producer.h" -#endif -#include "glite/lb/events.h" - -extern char *optarg; -extern int opterr,optind; - -static void usage(char *me) -{ - fprintf(stderr,"usage: %s [-m bkserver] [-x|-X non-default_sock_path] [-j dg_jobid] [-s source_id] [-n num_subjobs [-S]] [-e seed]\n", me); -} - -int main(int argc, char *argv[]) -{ - char *src = NULL,*job = NULL,*server = NULL,*seq, *seed = NULL; - int lbproxy = 0; - int done = 0, num_subjobs = 0, reg_subjobs = 0, i, type; - edg_wll_Context ctx; - edg_wlc_JobId jobid,*subjobs; - - - edg_wll_InitContext(&ctx); - opterr = 0; - - do { - switch (getopt(argc,argv,"xX:s:j:m:n:Se:")) { - case 'x': lbproxy = 1; break; - case 'X': lbproxy = 1; - edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_STORE_SOCK, optarg); - break; - case 's': src = (char *) strdup(optarg); break; - case 'j': job = (char *) strdup(optarg); break; - case 'm': server = strdup(optarg); break; - case 'n': num_subjobs = atoi(optarg); printf("Not implemented yet \n"); exit(1); break; - case 'S': reg_subjobs = 1; break; - case 'e': seed = strdup(optarg); break; - case '?': usage(argv[0]); exit(EINVAL); - case -1: done = 1; break; - } - } while (!done); - - if ((num_subjobs <= 0) && (reg_subjobs) ) { - usage(argv[0]); - exit(EINVAL); - } - - if (!job && !server) { - fprintf(stderr,"%s: either -m server or -j jobid has to be specified\n",argv[0]); - exit(1); - } - - if (!src) { - fprintf(stderr,"%s: -s required\n",argv[0]); - exit(1); - } - - if (!job) { - char *p = strchr(server,':'); - if (p) *p=0; - edg_wlc_JobIdCreate(server,p?atoi(p+1):0,&jobid); - job = edg_wlc_JobIdUnparse(jobid); - printf("new jobid: %s\n",job); - } - else if ((errno = edg_wlc_JobIdParse(job,&jobid))) { - perror(job); - exit(1); - } - - edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,edg_wll_StringToSource(src)); - - type = num_subjobs ? EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION : EDG_WLL_REGJOB_FILE_TRANSFER; - - if (lbproxy) { - if (edg_wll_RegisterJobProxy(ctx,jobid,type, - "", "NS", - num_subjobs,seed,&subjobs)) - { - char *et,*ed; - edg_wll_Error(ctx,&et,&ed); - fprintf(stderr,"edg_wll_RegisterJobProxy(%s): %s (%s)\n",job,et,ed); - exit(1); - } - } else { - if (edg_wll_RegisterJob(ctx,jobid,type, - "", "NS", - num_subjobs,seed,&subjobs)) - { - char *et,*ed; - edg_wll_Error(ctx,&et,&ed); - fprintf(stderr,"edg_wll_RegisterJob(%s): %s (%s)\n",job,et,ed); - exit(1); - } - } - - seq = edg_wll_GetSequenceCode(ctx); - printf("\n%s=\"%s\"\n",num_subjobs?"EDG_WL_FILE_TRANSFER_COLLECTION_JOBID":"EDG_WL_FILE_TRANSFER_JOBID",job); - printf("EDG_WL_SEQUENCE=\"%s\"\n",seq); - free(seq); - free(job); - - if (num_subjobs) for (i=0; subjobs[i]; i++) { - char *job_s = edg_wlc_JobIdUnparse(subjobs[i]); - printf("EDG_WL_SUB_JOBID[%d]=\"%s\"\n",i,job_s); - free(job_s); - } - - if (reg_subjobs) { - char ** jdls = (char**) calloc(num_subjobs+1, sizeof(char*)); - - for (i=0; subjobs[i]; i++) { - asprintf(jdls+i, "JDL of subjob #%d\n", i+1); - } - - if (lbproxy) { - if (edg_wll_RegisterSubjobsProxy(ctx, jobid, (const char **) jdls, "NS", subjobs)) { - char *et,*ed; - edg_wll_Error(ctx,&et,&ed); - fprintf(stderr,"edg_wll_RegisterSubjobsProxy: %s (%s)\n", et, ed); - exit(1); - } - } else { - if (edg_wll_RegisterSubjobs(ctx, jobid, (const char **) jdls, "NS", subjobs)) { - char *et,*ed; - edg_wll_Error(ctx,&et,&ed); - fprintf(stderr,"edg_wll_RegisterSubjobs: %s (%s)\n", et, ed); - exit(1); - } - } - - for (i=0; subjobs[i]; i++) free(jdls[i]); - } - - edg_wll_FreeContext(ctx); - - return 0; -} diff --git a/org.glite.lb.client/examples/sandbox_transfers.sh b/org.glite.lb.client/examples/sandbox_transfers.sh new file mode 100644 index 0000000..9b6269c --- /dev/null +++ b/org.glite.lb.client/examples/sandbox_transfers.sh @@ -0,0 +1,73 @@ +#!/bin/sh + + +# Example script illustrating injection of LB commands into Job Wrapper +# to log sandbox transfer related events + +# It can be used as a submitted job to check the functionality, however, +# in this way it interferes with sequence codes logged by JW itself. +# Therefore it should be taken as an example only; the commands from here +# should end up in the generated job wrapper instead. + + +# register sandbox transfers with LB +# sets environment variables: +# GLITE_WMS_SEQUENCE_CODE updated seq. code for the job itself +# GLITE_LB_ISB_JOBID +# GLITE_LB_ISB_SEQUENCE + +eval `glite-lb-register_sandbox \ + --jobid $GLITE_WMS_JOBID \ + --input \ + --from http://users.machine/path/to/sandbox.file \ + --to file://where/it/is/sandbox.file \ + --sequence $GLITE_WMS_SEQUENCE_CODE` + + +eval `glite-lb-register_sandbox \ + --jobid $GLITE_WMS_JOBID \ + --output \ + --from file://where/it/is/sandbox.file2 \ + --to http://users.machine/path/to/sandbox.file2 \ + --sequence $GLITE_WMS_SEQUENCE_CODE` + +# ISB transfer +GLITE_LB_ISB_SEQUENCE=`glite-lb-logevent \ + --source LRMS \ + --jobid $GLITE_LB_ISB_JOBID \ + --sequence $GLITE_LB_ISB_SEQUENCE \ + --event FileTransfer \ + --result START` + +# it takes looong +sleep 60 + +GLITE_LB_ISB_SEQUENCE=`glite-lb-logevent \ + --source LRMS \ + --jobid $GLITE_LB_ISB_JOBID \ + --sequence $GLITE_LB_ISB_SEQUENCE \ + --event FileTransfer \ + --result OK` + +# or FAIL with --reason "because of bad weather" + +# job payload here +sleep 120 + + +GLITE_LB_OSB_SEQUENCE=`glite-lb-logevent \ + --source LRMS \ + --jobid $GLITE_LB_OSB_JOBID \ + --sequence $GLITE_LB_OSB_SEQUENCE \ + --event FileTransfer \ + --result START` + +sleep 60 + +GLITE_LB_OSB_SEQUENCE=`glite-lb-logevent \ + --source LRMS \ + --jobid $GLITE_LB_OSB_JOBID \ + --sequence $GLITE_LB_OSB_SEQUENCE \ + --event FileTransfer \ + --result OK` + diff --git a/org.glite.lb.client/src/EventAttrNames.pl b/org.glite.lb.client/src/EventAttrNames.pl index bddf11c..2bd5593 100644 --- a/org.glite.lb.client/src/EventAttrNames.pl +++ b/org.glite.lb.client/src/EventAttrNames.pl @@ -90,7 +90,7 @@ ORIG_TIMESTAMP SRC DEST - VARIANT + SANDBOX_TYPE TRANSFER_JOB COMPUTE_JOB DEST_URL diff --git a/org.glite.lb.client/src/logevent.c.T b/org.glite.lb.client/src/logevent.c.T index a68c0ef..0f3f4c0 100644 --- a/org.glite.lb.client/src/logevent.c.T +++ b/org.glite.lb.client/src/logevent.c.T @@ -313,7 +313,7 @@ static int flesh_seq(int event) { switch (event) { @@@{ - %seq = ( CREAM => 'CREAM', 'PBS' => 'PBS', 'gLite' => 'NORMAL', Condor=>'CONDOR' ); + %seq = ( CREAM => 'CREAM', 'PBS' => 'PBS', 'gLite' => 'NORMAL', Condor=>'CONDOR', Transfer=>'NORMAL' ); for my $t (sort { $event->{order}->{$a} <=> $event->{order}->{$b} } $event->getTypes) { diff --git a/org.glite.lb.client/src/register_sandbox.c b/org.glite.lb.client/src/register_sandbox.c new file mode 100644 index 0000000..07622b8 --- /dev/null +++ b/org.glite.lb.client/src/register_sandbox.c @@ -0,0 +1,149 @@ +#include +#include +#include +#include + +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include + + +#include "glite/jobid/cjobid.h" +#ifdef BUILDING_LB_CLIENT +#include "producer.h" +#else +#include "glite/lb/producer.h" +#endif +#include "glite/lb/events.h" + +static void usage(char *me) +{ + fprintf(stderr,"usage %s: \n" + " -j,--jobid jobid\n" + " -i,--input or -o,--output (mutually exclusive)\n" + " -f,--from URI\n" + " -t,--to URI\n" + " [ -s,--source lb-source ] (default LRMS)\n" + " [ -c,--sequence lb-seqcode ]\n" + ,me); +} + +#define check_log(fun,_jobid,arg) \ + if (fun arg) { \ + char *et,*ed; \ + edg_wll_Error(ctx,&et,&ed); \ + fprintf(stderr,#fun "(%s): %s (%s)\n",_jobid,et,ed); \ + exit(1); \ + } + + +int main(int argc,char **argv) +{ + struct option opts[] = { + { "jobid",1,NULL,'j' }, + { "input",0,NULL,'i' }, + { "output",0,NULL,'o' }, + { "from",1,NULL,'f' }, + { "to",1,NULL,'t' }, + { "source",1,NULL,'s' }, + { "sequence",1,NULL,'c' }, + }; + + char *jobid_s = NULL, *ftjobid_s = NULL, *from = NULL, *to = NULL, *source_s = "LRMS", + *sequence = NULL, *srv, *uniq, type_c = 'x'; + + unsigned int port; + int o; + + glite_jobid_t jobid, ftjobid; + edg_wll_Source source; + + enum edg_wll_SandboxSandbox_type type = + EDG_WLL_SANDBOX_SANDBOX_TYPE_UNDEFINED; + + edg_wll_Context ctx; + + while ((o = getopt_long(argc,argv,"j:iof:t:s:c:",opts,NULL)) != EOF) switch(o) { + case 'j': jobid_s = optarg; break; + case 'i': if (type != EDG_WLL_SANDBOX_SANDBOX_TYPE_UNDEFINED) { usage(argv[0]); exit(1); } + type = EDG_WLL_SANDBOX_INPUT; + break; + case 'o': if (type != EDG_WLL_SANDBOX_SANDBOX_TYPE_UNDEFINED) { usage(argv[0]); exit(1); } + type = EDG_WLL_SANDBOX_OUTPUT; + break; + case 'f': from = optarg; break; + case 't': to = optarg; break; + case 's': source_s = optarg; break; + case 'c': sequence = optarg; break; + default: usage(argv[0]); exit(1); + } + + edg_wll_InitContext(&ctx); + + if (!jobid_s || type == EDG_WLL_SANDBOX_SANDBOX_TYPE_UNDEFINED + || !from || !to) + { + usage(argv[0]); + exit(1); + } + + type_c = type == EDG_WLL_SANDBOX_INPUT ? 'I' : 'O'; + + if (glite_jobid_parse(jobid_s,&jobid)) { + fprintf(stderr,"%s: can't parse\n",jobid_s); + exit(1); + } + + if ((source = edg_wll_StringToSource(source_s)) == EDG_WLL_SOURCE_NONE) { + fprintf(stderr,"%s: invalid source\n",source_s); + exit(1); +} +; + edg_wll_SetParam(ctx, EDG_WLL_PARAM_SOURCE, source); + + glite_jobid_getServerParts(jobid,&srv,&port); + glite_jobid_create(srv,port,&ftjobid); + uniq = glite_jobid_getUnique(ftjobid); + glite_jobid_free(ftjobid); + + asprintf(&ftjobid_s,"https://%s:%d/FT:%s",srv,port,uniq); + assert(glite_jobid_parse(ftjobid_s,&ftjobid) == 0); + + edg_wll_SetLoggingJob(ctx,ftjobid,NULL,EDG_WLL_SEQ_NORMAL); + + check_log(edg_wll_RegisterJob,ftjobid_s,( + ctx, + ftjobid, EDG_WLL_REGJOB_FILE_TRANSFER, + "n/a","n/a", + 0,NULL,NULL)); + + check_log(edg_wll_LogFileTransferRegister,ftjobid_s,(ctx,from,to)); + check_log(edg_wll_LogSandbox,ftjobid_s,( + ctx, + edg_wll_SandboxSandbox_typeToString(type), + NULL, + jobid_s)); + + printf("GLITE_LB_%cSB_JOBID=\"%s\"\nGLITE_LB_%cSB_SEQUENCE=\"%s\"\n", + type_c,ftjobid_s, + type_c,edg_wll_GetSequenceCode(ctx)); + + if (edg_wll_SetLoggingJob(ctx,jobid,sequence,EDG_WLL_SEQ_NORMAL)) { + char *et,*ed; + edg_wll_Error(ctx,&et,&ed); + fprintf(stderr,"edg_wll_SetLoggingJob(%s,%s): %s (%s)\n", + jobid_s,sequence,et,ed); + exit(1); + } + + check_log(edg_wll_LogSandbox,jobid_s,( + ctx, + edg_wll_SandboxSandbox_typeToString(type), + ftjobid_s, + NULL)); + + printf("GLITE_WMS_SEQUENCE_CODE=\"%s\"\n",edg_wll_GetSequenceCode(ctx)); + + return 0; +} diff --git a/org.glite.lb.state-machine/src/process_event.c b/org.glite.lb.state-machine/src/process_event.c index 2af3def..5d91734 100644 --- a/org.glite.lb.state-machine/src/process_event.c +++ b/org.glite.lb.state-machine/src/process_event.c @@ -952,12 +952,12 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int break; case EDG_WLL_EVENT_SANDBOX: if (USABLE_DATA(res, strict)) { - if ((e->sandbox.variant == EDG_WLL_SANDBOX_INPUT) && e->sandbox.transfer_job) { + if ((e->sandbox.sandbox_type == EDG_WLL_SANDBOX_INPUT) && e->sandbox.transfer_job) { edg_wlc_JobIdFree(js->pub.isb_transfer); edg_wlc_JobIdParse(e->sandbox.transfer_job,&js->pub.isb_transfer); } - if ((e->sandbox.variant == EDG_WLL_SANDBOX_OUTPUT) && e->sandbox.transfer_job) { + if ((e->sandbox.sandbox_type == EDG_WLL_SANDBOX_OUTPUT) && e->sandbox.transfer_job) { edg_wlc_JobIdFree(js->pub.osb_transfer); edg_wlc_JobIdParse(e->sandbox.transfer_job,&js->pub.osb_transfer); } diff --git a/org.glite.lb.state-machine/src/process_event_file_transfer.c b/org.glite.lb.state-machine/src/process_event_file_transfer.c index 144c3b5..db2364b 100644 --- a/org.glite.lb.state-machine/src/process_event_file_transfer.c +++ b/org.glite.lb.state-machine/src/process_event_file_transfer.c @@ -100,10 +100,10 @@ int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int break; case EDG_WLL_EVENT_SANDBOX: if (USABLE_DATA(res)) { - if (e->sandbox.variant == EDG_WLL_SANDBOX_INPUT) + if (e->sandbox.sandbox_type == EDG_WLL_SANDBOX_INPUT) js->pub.ft_sandbox_type = EDG_WLL_STAT_INPUT; - if (e->sandbox.variant == EDG_WLL_SANDBOX_OUTPUT) + if (e->sandbox.sandbox_type == EDG_WLL_SANDBOX_OUTPUT) js->pub.ft_sandbox_type = EDG_WLL_STAT_OUTPUT; if (e->sandbox.compute_job) { diff --git a/org.glite.lb.types/events.T b/org.glite.lb.types/events.T index 0cf542b..feef0de 100644 --- a/org.glite.lb.types/events.T +++ b/org.glite.lb.types/events.T @@ -434,6 +434,8 @@ string LRMS_jobid _optional_ +@flesh Transfer + @type FileTransferRegister register file transfer string src Source of file transfer. string dest Destination of file transfer. @@ -445,9 +447,10 @@ _code_ OK The file was transfered successfully. _code_ FAIL The file transfer failed (see reason) string reason The reason of file transfer failure. + _optional_ @type Sandbox event for logging relationship between (compute) job and (file) transfer job - int variant Type of sandbox. + int sandbox_type Type of sandbox. _code_ INPUT _code_ OUTPUT string transfer_job Jobid of transfer job (collection) containing set of file transfers (subjobs). -- 1.8.2.3