file transfer state automaton stubs + simple client for registrantion
authorMiloš Mulač <mulac@civ.zcu.cz>
Wed, 16 Dec 2009 15:53:45 +0000 (15:53 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Wed, 16 Dec 2009 15:53:45 +0000 (15:53 +0000)
org.glite.lb.client/Makefile
org.glite.lb.client/examples/job_status.c
org.glite.lb.client/examples/reg_sandbox_transfer.c [new file with mode: 0644]
org.glite.lb.state-machine/Makefile
org.glite.lb.state-machine/src/process_event.c
org.glite.lb.state-machine/src/process_event_file_transfer.c [new file with mode: 0644]
org.glite.lb.state-machine/src/process_event_file_transfer_collection.c [new file with mode: 0644]

index 3e2bccf..370c20f 100644 (file)
@@ -143,7 +143,7 @@ PLUSLIB:=libglite_lb_clientpp_${nothrflavour}.la
 THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la
 
 TOOLS:=${LB_PERF_TOOLS}
-EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c
+EXAMPLES_SRC:=log_usertag_proxy.c job_log.c job_reg.c feed_shark.c query_ext.c query_seq_code.c stats.c abort_job.c change_acl.c stresslog.c flood_proxy.c dagids.c stress_context.c parse_eventsfile.c test_changed_jdl.c reg_sandbox_transfer.c
 EXAMPLES:=${EXAMPLES_SRC:.c=}
 
 # TODO: migrate them here from branch_RC31_3
index ac5cd67..bf4b33f 100644 (file)
@@ -222,6 +222,12 @@ static void printstat(edg_wll_JobStat stat, int level)
                case EDG_WLL_STAT_CREAM:
                        printf("%sjobtype : CREAM\n", ind);
                         break;
+               case EDG_WLL_STAT_FILE_TRANSFER:
+                       printf("%sjobtype : FILE_TRANSFER\n", ind);
+                        break;
+               case EDG_WLL_STAT_FILE_TRANSFER_COLLECTION:
+                       printf("%sjobtype : FILE_TRANSFER_COLLECTION\n", ind);
+                        break;
                default:
                        printf("%sjobtype : UNKNOWN\n", ind);
                        break;
diff --git a/org.glite.lb.client/examples/reg_sandbox_transfer.c b/org.glite.lb.client/examples/reg_sandbox_transfer.c
new file mode 100644 (file)
index 0000000..52275ea
--- /dev/null
@@ -0,0 +1,147 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include "glite/jobid/cjobid.h"
+#ifdef BUILDING_LB_CLIENT
+#include "producer.h"
+#else
+#include "glite/lb/producer.h"
+#endif
+#include "glite/lb/events.h"
+
+extern char *optarg;
+extern int opterr,optind;
+
+static void usage(char *me)
+{
+       fprintf(stderr,"usage: %s [-m bkserver] [-x|-X non-default_sock_path] [-j dg_jobid] [-s source_id] [-n num_subjobs [-S]] [-e seed]\n", me);
+}
+
+int main(int argc, char *argv[])
+{
+       char *src = NULL,*job = NULL,*server = NULL,*seq, *seed = NULL;
+       int lbproxy = 0;
+       int done = 0, num_subjobs = 0, reg_subjobs = 0, i, type;
+       edg_wll_Context ctx;
+       edg_wlc_JobId   jobid,*subjobs;
+
+
+       edg_wll_InitContext(&ctx);
+       opterr = 0;
+
+       do {
+               switch (getopt(argc,argv,"xX:s:j:m:n:Se:")) {
+                       case 'x': lbproxy = 1; break;
+                       case 'X': lbproxy = 1; 
+                                 edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_STORE_SOCK, optarg);
+                                 break;
+                       case 's': src = (char *) strdup(optarg); break;
+                       case 'j': job = (char *) strdup(optarg); break;
+                       case 'm': server = strdup(optarg); break;
+                       case 'n': num_subjobs = atoi(optarg); printf("Not implemented yet \n"); exit(1);  break;
+                       case 'S': reg_subjobs = 1; break;
+                       case 'e': seed = strdup(optarg); break;
+                       case '?': usage(argv[0]); exit(EINVAL);
+                       case -1: done = 1; break;
+               }
+       } while (!done);
+
+       if ((num_subjobs <= 0) && (reg_subjobs) ) {
+               usage(argv[0]);
+               exit(EINVAL);
+       }       
+
+       if (!job && !server) {
+               fprintf(stderr,"%s: either -m server or -j jobid has to be specified\n",argv[0]);
+               exit(1);
+       }
+
+       if (!src) {
+               fprintf(stderr,"%s: -s required\n",argv[0]);
+               exit(1);
+       }
+
+       if (!job) {
+               char *p = strchr(server,':');
+               if (p) *p=0;
+               edg_wlc_JobIdCreate(server,p?atoi(p+1):0,&jobid);
+               job = edg_wlc_JobIdUnparse(jobid);
+               printf("new jobid: %s\n",job);
+       }
+       else if ((errno = edg_wlc_JobIdParse(job,&jobid))) {
+               perror(job);
+               exit(1);
+       }
+
+       edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,edg_wll_StringToSource(src));
+
+       type = num_subjobs ? EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION : EDG_WLL_REGJOB_FILE_TRANSFER;
+
+       if (lbproxy) {
+               if (edg_wll_RegisterJobProxy(ctx,jobid,type,
+                       "", "NS",
+                       num_subjobs,seed,&subjobs))
+               {
+                       char    *et,*ed;
+                       edg_wll_Error(ctx,&et,&ed);
+                       fprintf(stderr,"edg_wll_RegisterJobProxy(%s): %s (%s)\n",job,et,ed);
+                       exit(1);
+               }
+       } else {
+               if (edg_wll_RegisterJob(ctx,jobid,type,
+                       "", "NS",
+                       num_subjobs,seed,&subjobs))
+               {
+                       char    *et,*ed;
+                       edg_wll_Error(ctx,&et,&ed);
+                       fprintf(stderr,"edg_wll_RegisterJob(%s): %s (%s)\n",job,et,ed);
+                       exit(1);
+               }
+       }
+
+       seq = edg_wll_GetSequenceCode(ctx);
+       printf("\n%s=\"%s\"\n",num_subjobs?"EDG_WL_FILE_TRANSFER_COLLECTION_JOBID":"EDG_WL_FILE_TRANSFER_JOBID",job);
+       printf("EDG_WL_SEQUENCE=\"%s\"\n",seq);
+       free(seq);
+       free(job);
+
+       if (num_subjobs) for (i=0; subjobs[i]; i++) {
+               char    *job_s = edg_wlc_JobIdUnparse(subjobs[i]);
+               printf("EDG_WL_SUB_JOBID[%d]=\"%s\"\n",i,job_s);
+               free(job_s);
+       }
+
+       if (reg_subjobs) {
+               char ** jdls = (char**) calloc(num_subjobs+1, sizeof(char*));
+
+               for (i=0; subjobs[i]; i++) {
+                       asprintf(jdls+i, "JDL of subjob #%d\n", i+1);
+               }
+
+               if (lbproxy) {
+                       if (edg_wll_RegisterSubjobsProxy(ctx, jobid, (const char **) jdls, "NS", subjobs)) {
+                               char    *et,*ed;
+                               edg_wll_Error(ctx,&et,&ed);
+                               fprintf(stderr,"edg_wll_RegisterSubjobsProxy: %s (%s)\n", et, ed);
+                               exit(1);
+                       }
+               } else {
+                       if (edg_wll_RegisterSubjobs(ctx, jobid, (const char **) jdls, "NS", subjobs)) {
+                               char    *et,*ed;
+                               edg_wll_Error(ctx,&et,&ed);
+                               fprintf(stderr,"edg_wll_RegisterSubjobs: %s (%s)\n", et, ed);
+                               exit(1);
+                       }
+               }
+
+               for (i=0; subjobs[i]; i++) free(jdls[i]);
+       }
+
+       edg_wll_FreeContext(ctx);
+
+       return 0;
+}
index fc147a7..8f7f9cf 100644 (file)
@@ -47,8 +47,8 @@ COMMON_LIBS:= -L${stagedir}/lib  -lglite_lb_common_${nothrflavour} -lglite_secur
 PLUGIN_LIBS:= -L${stagedir}/lib -lglite_lb_common_${nothrflavour}\
        ${classadslib} -lstdc++ ${expatlib} -lexpat\
 
-PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo process_event_cream.lo lb_plugin.lo
-MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o process_event_cream.o
+PLUGIN_LOBJS:=seqcode_aux.lo process_event.lo process_event_pbs.lo process_event_condor.lo process_event_cream.lo lb_plugin.lo  process_event_file_transfer.lo process_event_file_transfer_collection.lo
+MACHINE_OBJS:=seqcode_aux.o process_event.o process_event_pbs.o process_event_condor.o process_event_cream.o process_event_file_transfer.o process_event_file_transfer_collection.o
 
 PLUGIN_LIB=glite_lb_plugin.la
 MACHINE_LIB=libglite_lb_statemachine.a
index 00ac728..0173701 100644 (file)
@@ -29,6 +29,8 @@ static int processEvent_glite(intJobStat *js, edg_wll_Event *e, int ev_seq, int
 int processEvent_PBS(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
 int processEvent_Condor(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
 int processEvent_Cream(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
+int processEvent_FileTransferCollection(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring);
 
 int add_stringlist(char ***lptr, const char *new_item);
 
@@ -56,6 +58,12 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                        case EDG_WLL_REGJOB_CREAM:
                                js->pub.jobtype = EDG_WLL_STAT_CREAM;
                                break;
+                       case EDG_WLL_REGJOB_FILE_TRANSFER:
+                               js->pub.jobtype = EDG_WLL_STAT_FILE_TRANSFER;
+                               break;
+                       case EDG_WLL_REGJOB_FILE_TRANSFER_COLLECTION:
+                               js->pub.jobtype = EDG_WLL_STAT_FILE_TRANSFER_COLLECTION;
+                               break;
                        default:
                                trio_asprintf(errstring,"unknown job type %d in registration",e->regJob.jobtype);
                                return RET_FAIL;
@@ -72,6 +80,10 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                        return processEvent_Condor(js,e,ev_seq,strict,errstring);
                case EDG_WLL_STAT_CREAM: 
                        return processEvent_Cream(js,e,ev_seq,strict,errstring);
+               case EDG_WLL_STAT_FILE_TRANSFER:
+                       return processEvent_FileTransfer(js,e,ev_seq,strict,errstring);
+               case EDG_WLL_STAT_FILE_TRANSFER_COLLECTION:
+                       return processEvent_FileTransferCollection(js,e,ev_seq,strict,errstring);
                case -1: return RET_UNREG;
                default: 
                        trio_asprintf(errstring,"undefined job type %d",js->pub.jobtype);
diff --git a/org.glite.lb.state-machine/src/process_event_file_transfer.c b/org.glite.lb.state-machine/src/process_event_file_transfer.c
new file mode 100644 (file)
index 0000000..494cbec
--- /dev/null
@@ -0,0 +1,80 @@
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <syslog.h>
+
+#include "glite/lb/context-int.h"
+
+#include "glite/lbu/trio.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+/* TBD: share in whole logging or workload */
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+static int compare_timestamps(struct timeval a, struct timeval b)
+{
+       if ( (a.tv_sec > b.tv_sec) || 
+               ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1;
+       if ( (a.tv_sec < b.tv_sec) ||
+                ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1;
+       return 0;
+}
+
+
+// XXX move this defines into some common place to be reusable
+#define USABLE(res) ((res) == RET_OK)
+#define USABLE_DATA(res) (1)
+#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
+#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } }
+
+int processEvent_FileTransfer(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
+{
+       edg_wll_JobStatCode     old_state = js->pub.state;
+       int                     res = RET_OK;
+
+
+/*     not used in FT
+       if ((js->last_seqcode != NULL) &&
+                       (edg_wll_compare_pbs_seq(js->last_seqcode, e->any.seqcode) > 0) ) {
+               res = RET_LATE; 
+       }
+*/
+       switch (e->any.type) {
+               case EDG_WLL_EVENT_REGJOB:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_SUBMITTED;
+                       }
+                       if (USABLE_DATA(res)) {
+                               ;
+                       }
+                       break;
+               default:
+                       break;
+       }
+
+       if (USABLE(res)) {
+               rep(js->last_seqcode, e->any.seqcode);
+
+               js->pub.lastUpdateTime = e->any.timestamp;
+               if (old_state != js->pub.state) {
+                       js->pub.stateEnterTime = js->pub.lastUpdateTime;
+                       js->pub.stateEnterTimes[1 + js->pub.state]
+                               = (int)js->pub.lastUpdateTime.tv_sec;
+               }
+       }
+       if (! js->pub.location) js->pub.location = strdup("this is FILE TRANSFER");
+
+
+       return RET_OK;
+}
+
diff --git a/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c b/org.glite.lb.state-machine/src/process_event_file_transfer_collection.c
new file mode 100644 (file)
index 0000000..3d7a758
--- /dev/null
@@ -0,0 +1,80 @@
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <assert.h>
+#include <syslog.h>
+
+#include "glite/lb/context-int.h"
+
+#include "glite/lbu/trio.h"
+
+#include "intjobstat.h"
+#include "seqcode_aux.h"
+
+/* TBD: share in whole logging or workload */
+#ifdef __GNUC__
+#define UNUSED_VAR __attribute__((unused))
+#else
+#define UNUSED_VAR
+#endif
+
+static int compare_timestamps(struct timeval a, struct timeval b)
+{
+       if ( (a.tv_sec > b.tv_sec) || 
+               ((a.tv_sec == b.tv_sec) && (a.tv_usec > b.tv_usec)) ) return 1;
+       if ( (a.tv_sec < b.tv_sec) ||
+                ((a.tv_sec == b.tv_sec) && (a.tv_usec < b.tv_usec)) ) return -1;
+       return 0;
+}
+
+
+// XXX move this defines into some common place to be reusable
+#define USABLE(res) ((res) == RET_OK)
+#define USABLE_DATA(res) (1)
+#define rep(a,b) { free(a); a = (b == NULL) ? NULL : strdup(b); }
+#define rep_cond(a,b) { if (b) { free(a); a = strdup(b); } }
+
+int processEvent_FileTransferCollection(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char **errstring)
+{
+       edg_wll_JobStatCode     old_state = js->pub.state;
+       int                     res = RET_OK;
+
+
+/*     not used in FTC
+       if ((js->last_seqcode != NULL) &&
+                       (edg_wll_compare_pbs_seq(js->last_seqcode, e->any.seqcode) > 0) ) {
+               res = RET_LATE; 
+       }
+*/
+       switch (e->any.type) {
+               case EDG_WLL_EVENT_REGJOB:
+                       if (USABLE(res)) {
+                               js->pub.state = EDG_WLL_JOB_SUBMITTED;
+                       }
+                       if (USABLE_DATA(res)) {
+                               ;
+                       }
+                       break;
+               default:
+                       break;
+       }
+
+       if (USABLE(res)) {
+               rep(js->last_seqcode, e->any.seqcode);
+
+               js->pub.lastUpdateTime = e->any.timestamp;
+               if (old_state != js->pub.state) {
+                       js->pub.stateEnterTime = js->pub.lastUpdateTime;
+                       js->pub.stateEnterTimes[1 + js->pub.state]
+                               = (int)js->pub.lastUpdateTime.tv_sec;
+               }
+       }
+       if (! js->pub.location) js->pub.location = strdup("this is FILE TRANSFER");
+
+
+       return RET_OK;
+}
+