From b68b3dd2e08f0d1812c4f667e5855aaaa3a3057e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Wed, 16 Dec 2009 15:53:45 +0000 Subject: [PATCH] file transfer state automaton stubs + simple client for registrantion --- org.glite.lb.client/Makefile | 2 +- org.glite.lb.client/examples/job_status.c | 6 + .../examples/reg_sandbox_transfer.c | 147 +++++++++++++++++++++ org.glite.lb.state-machine/Makefile | 4 +- org.glite.lb.state-machine/src/process_event.c | 12 ++ .../src/process_event_file_transfer.c | 80 +++++++++++ .../src/process_event_file_transfer_collection.c | 80 +++++++++++ 7 files changed, 328 insertions(+), 3 deletions(-) create mode 100644 org.glite.lb.client/examples/reg_sandbox_transfer.c create mode 100644 org.glite.lb.state-machine/src/process_event_file_transfer.c create mode 100644 org.glite.lb.state-machine/src/process_event_file_transfer_collection.c diff --git a/org.glite.lb.client/Makefile b/org.glite.lb.client/Makefile index 3e2bccf..370c20f 100644 --- a/org.glite.lb.client/Makefile +++ b/org.glite.lb.client/Makefile @@ -143,7 +143,7 @@ PLUSLIB:=libglite_lb_clientpp_${nothrflavour}.la THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la TOOLS:=${LB_PERF_TOOLS} -EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c +EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c reg_sandbox_transfer.c EXAMPLES:=${EXAMPLES_SRC:.c=} # TODO: migrate them here from branch_RC31_3 diff --git a/org.glite.lb.client/examples/job_status.c b/org.glite.lb.client/examples/job_status.c index ac5cd67..bf4b33f 100644 --- a/org.glite.lb.client/examples/job_status.c +++ b/org.glite.lb.client/examples/job_status.c @@ -222,6 +222,12 @@ static void printstat(edg_wll_JobStat stat, int level) case EDG_WLL_STAT_CREAM: printf("%sjobtype : CREAM\n", ind); break; + case EDG_WLL_STAT_FILE_TRANSFER: + printf("%sjobtype : FILE_TRANSFER\n", ind); + break; + case EDG_WLL_STAT_FILE_TRANSFER_COLLECTION: + printf("%sjobtype : FILE_TRANSFER_COLLECTION\n", ind); + break; default: printf("%sjobtype : UNKNOWN\n", ind); break; diff --git a/org.glite.lb.client/examples/reg_sandbox_transfer.c b/org.glite.lb.client/examples/reg_sandbox_transfer.c new file mode 100644 index 0000000..52275ea --- /dev/null +++ b/org.glite.lb.client/examples/reg_sandbox_transfer.c @@ -0,0 +1,147 @@ +#include +#include +#include +#include +#include +#include + +#include "glite/jobid/cjobid.h" +#ifdef BUILDING_LB_CLIENT +#include "producer.h" +#else +#include "glite/lb/producer.h" +#endif +#include "glite/lb/events.h" + +extern char *optarg; +extern int opterr,optind; + +static void usage(char *me) +{ + fprintf(stderr,"usage: %s [-m bkserver] [-x|-X non-default_sock_path] [-j dg_jobid] [-s source_id] [-n num_subjobs [-S]] [-e seed]\n", me); +} + +int main(int argc, char *argv[]) +{ + char *src = NULL,*job = NULL,*server = NULL,*seq, *seed = NULL; + int lbproxy = 0; + int done = 0, num_subjobs = 0, reg_subjobs = 0, i, type; + edg_wll_Context ctx; + edg_wlc_JobId jobid,*subjobs; + + + edg_wll_InitContext(&ctx); + opterr = 0; + + do { + switch (getopt(argc,argv,"xX:s:j:m:n:Se:")) { + case 'x': lbproxy = 1; break; + case 'X': lbproxy = 1; + edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_STORE_SOCK, optarg); + break; + case 's': src = (char *) strdup(optarg); break; + case 'j': job = (char *) strdup(optarg); break; + case 'm': server = strdup(optarg); break; + case 'n': num_subjobs = atoi(optarg); printf("Not implemented yet \n"); exit(1); break; + case 'S': reg_subjobs = 1; break; + case 'e': seed = strdup(optarg); break; + case '?': usage(argv[0]); exit(EINVAL); + case -1: done = 1; break; + } + } while (!done); + + if ((num_subjobs <= 0) && (reg_subjobs) ) { + usage(argv[0]); + exit(EINVAL); + } + + if (!job && !server) { + fprintf(stderr,"%s: either -m server or -j jobid has to be specified\n",argv[0]); + exit(1); + } + + if (!src) { + fprintf(stderr,"%s: -s required\n",argv[0]); + exit(1); + } + + if (!job) { + char *p = strchr(server,':'); + if (p) *p=0; + edg_wlc_JobIdCreate(server,p?atoi(p+1):0,&jobid); + job = edg_wlc_JobIdUnparse(jobid); + printf("new jobid: %s\n",job); + } + else if ((errno = edg_wlc_JobIdParse(job,&jobid))) { + perror(job); + exit(1); + } + + edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,edg_wll_StringToSource(src)); + + type = num_subjobs ? EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION : EDG_WLL_REGJOB_FILE_TRANSFER; + + if (lbproxy) { + if (edg_wll_RegisterJobProxy(ctx,jobid,type, + "", "NS", + num_subjobs,seed,&subjobs)) + { + char *et,*ed; + edg_wll_Error(ctx,&et,&ed); + fprintf(stderr,"edg_wll_RegisterJobProxy(%s): %s (%s)\n",job,et,ed); + exit(1); + } + } else { + if (edg_wll_RegisterJob(ctx,jobid,type, + "", "NS", + num_subjobs,seed,&subjobs)) + { + char *et,*ed; + edg_wll_Error(ctx,&et,&ed); + fprintf(stderr,"edg_wll_RegisterJob(%s): %s (%s)\n",job,et,ed); + exit(1); + } + } + + seq = edg_wll_GetSequenceCode(ctx); + printf("\n%s=\"%s\"\n",num_subjobs?"EDG_WL_FILE_TRANSFER_COLLECTION_JOBID":"EDG_WL_FILE_TRANSFER_JOBID",job); + printf("EDG_WL_SEQUENCE=\"%s\"\n",seq); + free(seq); + free(job); + + if (num_subjobs) for (i=0; subjobs[i]; i++) { + char *job_s = edg_wlc_JobIdUnparse(subjobs[i]); + printf("EDG_WL_SUB_JOBID[%d]=\"%s\"\n",i,job_s); + free(job_s); + } + + if (reg_subjobs) { + char ** jdls = (char**) calloc(num_subjobs+1, sizeof(char*)); + + for (i=0; subjobs[i]; i++) { + asprintf(jdls+i, "JDL of subjob #%d\n", i+1); + } + + if (lbproxy) { + if (edg_wll_RegisterSubjobsProxy(ctx, jobid, (const char **) jdls, "NS", subjobs)) { + char *et,*ed; + edg_wll_Error(ctx,&et,&ed); + fprintf(stderr,"edg_wll_RegisterSubjobsProxy: %s (%s)\n", et, ed); + exit(1); + } + } else { + if (edg_wll_RegisterSubjobs(ctx, jobid, (const char **) jdls, "NS", subjobs)) { + char *et,*ed; + edg_wll_Error(ctx,&et,&ed); + fprintf(stderr,"edg_wll_RegisterSubjobs: %s (%s)\n", et, ed); + exit(1); + } + } + + for (i=0; subjobs[i]; i++) free(jdls[i]); + } + + edg_wll_FreeContext(ctx); + + return 0; +} diff --git a/org.glite.lb.state-machine/Makefile b/org.glite.lb.state-machine/Makefile index fc147a7..8f7f9cf 100644 --- a/org.glite.lb.state-machine/Makefile +++ b/org.glite.lb.state-machine/Makefile @@ -47,8 +47,8 @@ COMMON_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour} -lglite_secur PLUGIN_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}\ ${classadslib} -lstdc++ ${expatlib} -lexpat\ -PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo process_event_cream.lo lb_plugin.lo -MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o process_event_cream.o +PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo process_event_cream.lo lb_plugin.lo process_event_file_transfer.lo process_event_file_transfer_collection.lo +MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o process_event_cream.o process_event_file_transfer.o process_event_file_transfer_collection.o PLUGIN_LIB=glite_lb_plugin.la MACHINE_LIB=libglite_lb_statemachine.a diff --git a/org.glite.lb.state-machine/src/process_event.c b/org.glite.lb.state-machine/src/process_event.c index 00ac728..0173701 100644 --- a/org.glite.lb.state-machine/src/process_event.c +++ b/org.glite.lb.state-machine/src/process_event.c @@ -29,6 +29,8 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring); int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring); int processEvent_Cream(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring); +int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring); +int processEvent_FileTransferCollection(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring); int add_stringlist(char ***lptr, const char *new_item); @@ -56,6 +58,12 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char case EDG_WLL_REGJOB_CREAM: js->pub.jobtype = EDG_WLL_STAT_CREAM; break; + case EDG_WLL_REGJOB_FILE_TRANSFER: + js->pub.jobtype = EDG_WLL_STAT_FILE_TRANSFER; + break; + case EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION: + js->pub.jobtype = EDG_WLL_STAT_FILE_TRANSFER_COLLECTION; + break; default: trio_asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype); return RET_FAIL; @@ -72,6 +80,10 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char return processEvent_Condor(js,e,ev_seq,strict,errstring); case EDG_WLL_STAT_CREAM: return processEvent_Cream(js,e,ev_seq,strict,errstring); + case EDG_WLL_STAT_FILE_TRANSFER: + return processEvent_FileTransfer(js,e,ev_seq,strict,errstring); + case EDG_WLL_STAT_FILE_TRANSFER_COLLECTION: + return processEvent_FileTransferCollection(js,e,ev_seq,strict,errstring); case -1: return RET_UNREG; default: trio_asprintf(errstring,"undefined job type %d",js->pub.jobtype); diff --git a/org.glite.lb.state-machine/src/process_event_file_transfer.c b/org.glite.lb.state-machine/src/process_event_file_transfer.c new file mode 100644 index 0000000..494cbec --- /dev/null +++ b/org.glite.lb.state-machine/src/process_event_file_transfer.c @@ -0,0 +1,80 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include + +#include "glite/lb/context-int.h" + +#include "glite/lbu/trio.h" + +#include "intjobstat.h" +#include "seqcode_aux.h" + +/* TBD: share in whole logging or workload */ +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +static int compare_timestamps(struct timeval a, struct timeval b) +{ + if ( (a.tv_sec > b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1; + if ( (a.tv_sec < b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1; + return 0; +} + + +// XXX move this defines into some common place to be reusable +#define USABLE(res) ((res) == RET_OK) +#define USABLE_DATA(res) (1) +#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } +#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } } + +int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) +{ + edg_wll_JobStatCode old_state = js->pub.state; + int res = RET_OK; + + +/* not used in FT + if ((js->last_seqcode != NULL) && + (edg_wll_compare_pbs_seq(js->last_seqcode, e->any.seqcode) > 0) ) { + res = RET_LATE; + } +*/ + switch (e->any.type) { + case EDG_WLL_EVENT_REGJOB: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + } + if (USABLE_DATA(res)) { + ; + } + break; + default: + break; + } + + if (USABLE(res)) { + rep(js->last_seqcode, e->any.seqcode); + + js->pub.lastUpdateTime = e->any.timestamp; + if (old_state != js->pub.state) { + js->pub.stateEnterTime = js->pub.lastUpdateTime; + js->pub.stateEnterTimes[1 + js->pub.state] + = (int)js->pub.lastUpdateTime.tv_sec; + } + } + if (! js->pub.location) js->pub.location = strdup("this is FILE TRANSFER"); + + + return RET_OK; +} + diff --git a/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c b/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c new file mode 100644 index 0000000..3d7a758 --- /dev/null +++ b/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c @@ -0,0 +1,80 @@ +#ident "$Header$" + +#include +#include +#include +#include +#include +#include + +#include "glite/lb/context-int.h" + +#include "glite/lbu/trio.h" + +#include "intjobstat.h" +#include "seqcode_aux.h" + +/* TBD: share in whole logging or workload */ +#ifdef __GNUC__ +#define UNUSED_VAR __attribute__((unused)) +#else +#define UNUSED_VAR +#endif + +static int compare_timestamps(struct timeval a, struct timeval b) +{ + if ( (a.tv_sec > b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1; + if ( (a.tv_sec < b.tv_sec) || + ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1; + return 0; +} + + +// XXX move this defines into some common place to be reusable +#define USABLE(res) ((res) == RET_OK) +#define USABLE_DATA(res) (1) +#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); } +#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } } + +int processEvent_FileTransferCollection(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring) +{ + edg_wll_JobStatCode old_state = js->pub.state; + int res = RET_OK; + + +/* not used in FTC + if ((js->last_seqcode != NULL) && + (edg_wll_compare_pbs_seq(js->last_seqcode, e->any.seqcode) > 0) ) { + res = RET_LATE; + } +*/ + switch (e->any.type) { + case EDG_WLL_EVENT_REGJOB: + if (USABLE(res)) { + js->pub.state = EDG_WLL_JOB_SUBMITTED; + } + if (USABLE_DATA(res)) { + ; + } + break; + default: + break; + } + + if (USABLE(res)) { + rep(js->last_seqcode, e->any.seqcode); + + js->pub.lastUpdateTime = e->any.timestamp; + if (old_state != js->pub.state) { + js->pub.stateEnterTime = js->pub.lastUpdateTime; + js->pub.stateEnterTimes[1 + js->pub.state] + = (int)js->pub.lastUpdateTime.tv_sec; + } + } + if (! js->pub.location) js->pub.location = strdup("this is FILE TRANSFER"); + + + return RET_OK; +} + -- 1.8.2.3