enable reading collection events
authorMichal Voců <michal@ruk.cuni.cz>
Mon, 9 Mar 2009 15:08:56 +0000 (15:08 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Mon, 9 Mar 2009 15:08:56 +0000 (15:08 +0000)
org.glite.lb.common/src/lb_perftest.c

index 2c87459..7c3607c 100644 (file)
 #include "lb_perftest.h"
 #include "il_msg.h"
 
+typedef struct {
+  char *event;
+  int   job_index;
+} job_events_t;
+
 static pthread_mutex_t perftest_lock = PTHREAD_MUTEX_INITIALIZER;
 static struct timeval endtime;
 static char *termination_string;
-static char **events; /* in fact it is *events[] */
-static char **jobids;
+
+static int njobs = 0; 
+static int nsubjobs = 0; 
+static job_events_t *events;
 static int nevents;
-static int njobs = 0;
+static char **jobids;
+
 static int cur_event = 0;
 static int cur_job = 0;
+static int cur_group = 0;
 static char *test_user;
 static char *test_name;
 static char *dest_host;
 static int dest_port;
+static int group_size = 1;
 
 #define EVENTS_BUFSIZ 16
 #define BUFFSZ        1024
@@ -66,7 +76,6 @@ _strnstr(const char *big, const char *little, size_t max)
 /* 
  * reading lines (pasted from load.c)
  */
-
 static 
 int 
 read_line(int fd, char **buff, int *maxsize)
@@ -118,7 +127,7 @@ read_line(int fd, char **buff, int *maxsize)
                                        *buff = (char *)tmp;
                                }
                                (*buff)[++i] = '\0';
-                                return 1;
+                                return i;
                        }
                 }
 
@@ -133,31 +142,99 @@ read_line(int fd, char **buff, int *maxsize)
  */
 static 
 int 
-read_events(int fd, char ***evts /* *(*evts)[] */
+read_events(int fd) 
 {
        void *tmp;
-       int maxlinesize;
-       char **e = NULL, *line = NULL;
+       char *line = NULL;
+       int linesize;
        size_t i = 0, max = EVENTS_BUFSIZ;
+       char **subjobids = NULL;
+       int len;
+
+       if ((events = calloc(max, sizeof(job_events_t))) == NULL) goto nomem;
+       if ((subjobids = calloc(max, sizeof(char*))) == NULL) goto nomem;
+       nsubjobs = 0;
 
-       if ((e = malloc(max * sizeof(char *))) == NULL) goto nomem;
+       while ((len = read_line(fd, &line, &linesize)) > 0) {
+               char *d, *p, *q;
 
-       while (read_line(fd, &line, &maxlinesize)) {
                if (i + 1 >= max) {
                        max <<= 1;
-                       if ((tmp = realloc(e, max * sizeof(char *))) == NULL) goto nomem;
-                       e = (char **)tmp;
+                       if ((tmp = realloc(events, max * sizeof(job_events_t))) == NULL) goto nomem;
+                       events = (job_events_t*)tmp;
                }
-               if ((e[i] = strdup(line)) == NULL) goto nomem;
+               /* if ((e[i] = strdup(line)) == NULL) goto nomem; */
+               /* allocate memory for event, reserve enough space for variable keys */
+               if((d = malloc(len + 1)) == NULL) 
+                       goto nomem;
+               events[i].event = d;
+               events[i].job_index = 0; 
+               
+               /* copy key by key, look for JOBID, PARENT, SEED and NSUBJOBS */
+               for(p = q = line; q < line + len ; p = q) {
+
+                       /* look for next DG. */
+                       /* XXX - this works only for "nice" events with no spurious DG. */
+                       q = strstr(p+3, "DG.");
+                       if(q) {
+                       } else {
+                               /* no further key, copy till the end */
+                               q = line + len;
+                       }
+                       if(strncmp(p, "DG.JOBID", 8) == 0) {
+                               /* omit, will be filled in later */
+                               /* if we know jobid, we can fill in job_index */
+                               char *v;
+
+                               v = strchr(p+10, '"');
+                               if(v) {
+                                       int m;
+
+                                       *v = 0;
+                                       /* lookup this jobid in subjobids */
+                                       for(m = 0; (m < nsubjobs + 1) && subjobids[m]; m++) {
+                                               if(strcmp(p+10, subjobids[m]) == 0) 
+                                                       break;
+                                       }
+                                       if(m >= nsubjobs + 1) m = 0;
+                                       if(subjobids[m] == NULL) {
+                                               subjobids[m] = strdup(p+10);
+                                       }
+                                       events[i].job_index = m;
+                                       *v = '"';
+                               }
+                       } else if(strncmp(p, "DG.REGJOB.PARENT", 16) == 0) {
+                               /* omit, will be filled in later */
+                       } else if(strncmp(p, "DG.REGJOB.SEED", 14) == 0) {
+                               /* omit for now */
+                       } else if(strncmp(p, "DG.REGJOB.NSUBJOBS", 18) == 0) {
+                               int val;
+
+                               strncpy(d, p, q - p);
+                               d += q - p;
+                               /* scan the number of subjobs */
+                               val = atoi(p + 20);
+                               if(nsubjobs == 0 && val > 0) {
+                                       nsubjobs = val;
+                                       subjobids = realloc(subjobids, (nsubjobs+1)*sizeof(*subjobids));
+                                       if(subjobids == NULL) goto nomem;
+                               }
+                       } else {
+                               /* some other key */
+                               strncpy(d, p, q - p);
+                               d += q - p;
+                       }
+               }
+               *d = 0;
                free(line); line = NULL;
                i++;
        }
 
-       *evts = e;
        return i;
 
 nomem:
-       if(e) free(e);
+       /* XXX: should free all events[i].event */
+       if(events) free(events);
        if(line) free(line);
        fprintf(stderr, "read_events: insufficient memory\n");
        return -1;
@@ -171,6 +248,9 @@ glite_wll_perftest_init(const char *host,
                        const char *filename, 
                        int n)
 {
+       edg_wll_Context ctx;
+
+       edg_wll_InitContext(&ctx);
 
        if(trio_asprintf(&termination_string, EDG_WLL_FORMAT_USERTAG,
                    PERFTEST_END_TAG_NAME, PERFTEST_END_TAG_VALUE) < 0)
@@ -224,24 +304,28 @@ glite_wll_perftest_init(const char *host,
                        return(-1);
                }
                
-               if((nevents=read_events(fd, &events)) < 0)
+               if((nevents=read_events(fd)) < 0)
                        return(-1);
 
                close(fd);
 
-
                fprintf(stderr, "PERFTEST_JOB_SIZE=%d\n", nevents);
-               fprintf(stderr, "PERFTEST_NUM_JOBS=%d\n", njobs);
+               fprintf(stderr, "PERFTEST_NUM_JOBS=%d\n", njobs * (nsubjobs + 1));
        }
 
-       /* generate jobids[0..njobs-1] */
-       jobids = calloc(njobs, sizeof(char*));
+       /* we suppose nsubjobs was filled in by read_events() */
+
+
+       /* generate jobids[0..njobs-1, 0..nsubjobs] */
+       jobids = calloc(njobs*(nsubjobs + 1), sizeof(char*));
        if(jobids == NULL) {
                fprintf(stderr, "glite_wll_perftest_init: not enough memory for job id's\n");
                return(-1);
        }
         while (--n >= 0) {
-               edg_wlc_JobId jobid;
+               glite_jobid_t jobid;
+               glite_jobid_t *subjobid;
+               int i;
 
                if(glite_wll_perftest_createJobId(dest_host,
                                                  dest_port,
@@ -250,17 +334,32 @@ glite_wll_perftest_init(const char *host,
                                                  n,
                                                  &jobid) != 0) {
                        fprintf(stderr, "produceJobId: error creating jobid\n");
-                       if(pthread_mutex_unlock(&perftest_lock) < 0)
-                               abort();
                        return(-1);
                }
-               if((jobids[n]=edg_wlc_JobIdUnparse(jobid)) == NULL) {
+               if((jobids[n*(nsubjobs+1)]=edg_wlc_JobIdUnparse(jobid)) == NULL) {
                        fprintf(stderr, "produceJobId: error unparsing jobid\n");
-                       if(pthread_mutex_unlock(&perftest_lock) < 0)
-                               abort();
                        return(-1);
                }
-       };
+               glite_jobid_free(jobid);
+
+               /* generate subjob ids */
+               if(nsubjobs > 0) {
+                       if(edg_wll_GenerateSubjobIds(ctx, jobid, nsubjobs, test_name,
+                                                    &subjobid) < 0) {
+                               fprintf(stderr, "produceJobId: error generating subjob ids\n");
+                               return -1;
+                       }
+               }
+               for(i = 1; i <= nsubjobs; i++) {
+                       if((jobids[n*(nsubjobs+1) + i] = edg_wlc_JobIdUnparse(subjobid[i])) == NULL) {
+                               fprintf(stderr, "produceJobId: error unparsing jobid\n");
+                               return(-1);
+                       }
+                       glite_jobid_free(subjobid[i]);
+               }
+       }
+
+                       
        return(0);
 }
 
@@ -285,18 +384,21 @@ glite_wll_perftest_produceJobId()
                return(NULL);
        }
 
-       jobid = jobids[cur_job++];
+       jobid = jobids[(nsubjobs+1)*cur_job++];
 
        if(cur_job >= njobs) 
                cur_job = -1;
                
+       if(pthread_mutex_unlock(&perftest_lock) < 0)
+               abort();
+
        return(jobid);
 }
 
 
 /**
- * This produces (njobs*nevents + 1) events, one event for each call.
- * For every nevents (one job) new jobid is generated and inserted into 
+ * This produces (njobs*nsubjobs*nevents + 1) events, one event for each call.
+ * For every nevents (one subjob) new jobid is inserted into 
  * event. The last event is termination - usertag.
  */
 int
@@ -304,7 +406,7 @@ glite_wll_perftest_produceEventString(char **event, char **jobid)
 {
        static int first = 1;
        char *e;
-       int len;
+       int len, cur_subjob;
 
        assert(event != NULL);
 
@@ -318,8 +420,11 @@ glite_wll_perftest_produceEventString(char **event, char **jobid)
                return(0);
        }
 
+       /* use index to get current subjob */
+       cur_subjob = events[cur_event].job_index;
+
        /* did we send all events? */
-       if(cur_event >= nevents) {
+       if(cur_group*group_size + cur_job > njobs) {
                
                /* construct termination event */
                if((len=trio_asprintf(&e, EDG_WLL_FORMAT_COMMON EDG_WLL_FORMAT_USERTAG "\n",
@@ -339,11 +444,14 @@ glite_wll_perftest_produceEventString(char **event, char **jobid)
                                abort();
                        return(-1);
                }
-               *jobid = jobids[cur_job];
+               *jobid = jobids[cur_job*(nsubjobs+1)];
 
                /* and refuse to produce more */
                cur_job = -1;
                cur_event = -1;
+               cur_subjob = -1;
+
+               
 
        } else {
 
@@ -358,23 +466,32 @@ glite_wll_perftest_produceEventString(char **event, char **jobid)
                }
                
                /* return current event with jobid filled in */
-               if((len=trio_asprintf(&e, "DG.JOBID=\"%s\" %s", 
-                                     jobids[cur_job], events[cur_event])) < 0) {
+               if((len=trio_asprintf(&e, "DG.JOBID=\"%s\" DG.REGJOB.PARENT=\"%s\" DG.REGJOB.SEED=\"%s\" DG.REGJOB.NSUBJOBS=\"%d\" %s", 
+                                     jobids[cur_job*(nsubjobs+1) + cur_subjob], 
+                                     (nsubjobs > 0) ? jobids[cur_job*(nsubjobs+1)] : "",
+                                     test_name,
+                                     nsubjobs,
+                                     events[cur_event].event)) < 0) {
                        fprintf(stderr, "produceEventString: error generating event\n");
                        if(pthread_mutex_unlock(&perftest_lock) < 0)
                                abort();
                        return(-1);
                }
-               *jobid = jobids[cur_job];
+               *jobid = jobids[cur_job*(nsubjobs+1) + cur_subjob];
+
+               /* advance to the next job and/or event */
+               if(++cur_job % group_size == 0) {
+                       if(++cur_event >= nevents) {
+                               cur_event = 0;
+                               cur_group++;
+                       }
+                       cur_job = 0;
+               }
+
        }
 
        *event = e;
 
-       /* advance to the next job and/or event */
-       if(++cur_job >= njobs) {
-               cur_event++;
-               cur_job = 0;
-       }
 
        if(pthread_mutex_unlock(&perftest_lock) < 0)
                abort();