From 03f8e90109b79bd5007b6b829c7ae3e2d4fdd1b3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Michal=20Voc=C5=AF?= Date: Mon, 9 Mar 2009 15:08:56 +0000 Subject: [PATCH] enable reading collection events --- org.glite.lb.common/src/lb_perftest.c | 199 +++++++++++++++++++++++++++------- 1 file changed, 158 insertions(+), 41 deletions(-) diff --git a/org.glite.lb.common/src/lb_perftest.c b/org.glite.lb.common/src/lb_perftest.c index 2c87459..7c3607c 100644 --- a/org.glite.lb.common/src/lb_perftest.c +++ b/org.glite.lb.common/src/lb_perftest.c @@ -18,19 +18,29 @@ #include "lb_perftest.h" #include "il_msg.h" +typedef struct { + char *event; + int job_index; +} job_events_t; + static pthread_mutex_t perftest_lock = PTHREAD_MUTEX_INITIALIZER; static struct timeval endtime; static char *termination_string; -static char **events; /* in fact it is *events[] */ -static char **jobids; + +static int njobs = 0; +static int nsubjobs = 0; +static job_events_t *events; static int nevents; -static int njobs = 0; +static char **jobids; + static int cur_event = 0; static int cur_job = 0; +static int cur_group = 0; static char *test_user; static char *test_name; static char *dest_host; static int dest_port; +static int group_size = 1; #define EVENTS_BUFSIZ 16 #define BUFFSZ 1024 @@ -66,7 +76,6 @@ _strnstr(const char *big, const char *little, size_t max) /* * reading lines (pasted from load.c) */ - static int read_line(int fd, char **buff, int *maxsize) @@ -118,7 +127,7 @@ read_line(int fd, char **buff, int *maxsize) *buff = (char *)tmp; } (*buff)[++i] = '\0'; - return 1; + return i; } } @@ -133,31 +142,99 @@ read_line(int fd, char **buff, int *maxsize) */ static int -read_events(int fd, char ***evts /* *(*evts)[] */) +read_events(int fd) { void *tmp; - int maxlinesize; - char **e = NULL, *line = NULL; + char *line = NULL; + int linesize; size_t i = 0, max = EVENTS_BUFSIZ; + char **subjobids = NULL; + int len; + + if ((events = calloc(max, sizeof(job_events_t))) == NULL) goto nomem; + if ((subjobids = calloc(max, sizeof(char*))) == NULL) goto nomem; + nsubjobs = 0; - if ((e = malloc(max * sizeof(char *))) == NULL) goto nomem; + while ((len = read_line(fd, &line, &linesize)) > 0) { + char *d, *p, *q; - while (read_line(fd, &line, &maxlinesize)) { if (i + 1 >= max) { max <<= 1; - if ((tmp = realloc(e, max * sizeof(char *))) == NULL) goto nomem; - e = (char **)tmp; + if ((tmp = realloc(events, max * sizeof(job_events_t))) == NULL) goto nomem; + events = (job_events_t*)tmp; } - if ((e[i] = strdup(line)) == NULL) goto nomem; + /* if ((e[i] = strdup(line)) == NULL) goto nomem; */ + /* allocate memory for event, reserve enough space for variable keys */ + if((d = malloc(len + 1)) == NULL) + goto nomem; + events[i].event = d; + events[i].job_index = 0; + + /* copy key by key, look for JOBID, PARENT, SEED and NSUBJOBS */ + for(p = q = line; q < line + len ; p = q) { + + /* look for next DG. */ + /* XXX - this works only for "nice" events with no spurious DG. */ + q = strstr(p+3, "DG."); + if(q) { + } else { + /* no further key, copy till the end */ + q = line + len; + } + if(strncmp(p, "DG.JOBID", 8) == 0) { + /* omit, will be filled in later */ + /* if we know jobid, we can fill in job_index */ + char *v; + + v = strchr(p+10, '"'); + if(v) { + int m; + + *v = 0; + /* lookup this jobid in subjobids */ + for(m = 0; (m < nsubjobs + 1) && subjobids[m]; m++) { + if(strcmp(p+10, subjobids[m]) == 0) + break; + } + if(m >= nsubjobs + 1) m = 0; + if(subjobids[m] == NULL) { + subjobids[m] = strdup(p+10); + } + events[i].job_index = m; + *v = '"'; + } + } else if(strncmp(p, "DG.REGJOB.PARENT", 16) == 0) { + /* omit, will be filled in later */ + } else if(strncmp(p, "DG.REGJOB.SEED", 14) == 0) { + /* omit for now */ + } else if(strncmp(p, "DG.REGJOB.NSUBJOBS", 18) == 0) { + int val; + + strncpy(d, p, q - p); + d += q - p; + /* scan the number of subjobs */ + val = atoi(p + 20); + if(nsubjobs == 0 && val > 0) { + nsubjobs = val; + subjobids = realloc(subjobids, (nsubjobs+1)*sizeof(*subjobids)); + if(subjobids == NULL) goto nomem; + } + } else { + /* some other key */ + strncpy(d, p, q - p); + d += q - p; + } + } + *d = 0; free(line); line = NULL; i++; } - *evts = e; return i; nomem: - if(e) free(e); + /* XXX: should free all events[i].event */ + if(events) free(events); if(line) free(line); fprintf(stderr, "read_events: insufficient memory\n"); return -1; @@ -171,6 +248,9 @@ glite_wll_perftest_init(const char *host, const char *filename, int n) { + edg_wll_Context ctx; + + edg_wll_InitContext(&ctx); if(trio_asprintf(&termination_string, EDG_WLL_FORMAT_USERTAG, PERFTEST_END_TAG_NAME, PERFTEST_END_TAG_VALUE) < 0) @@ -224,24 +304,28 @@ glite_wll_perftest_init(const char *host, return(-1); } - if((nevents=read_events(fd, &events)) < 0) + if((nevents=read_events(fd)) < 0) return(-1); close(fd); - fprintf(stderr, "PERFTEST_JOB_SIZE=%d\n", nevents); - fprintf(stderr, "PERFTEST_NUM_JOBS=%d\n", njobs); + fprintf(stderr, "PERFTEST_NUM_JOBS=%d\n", njobs * (nsubjobs + 1)); } - /* generate jobids[0..njobs-1] */ - jobids = calloc(njobs, sizeof(char*)); + /* we suppose nsubjobs was filled in by read_events() */ + + + /* generate jobids[0..njobs-1, 0..nsubjobs] */ + jobids = calloc(njobs*(nsubjobs + 1), sizeof(char*)); if(jobids == NULL) { fprintf(stderr, "glite_wll_perftest_init: not enough memory for job id's\n"); return(-1); } while (--n >= 0) { - edg_wlc_JobId jobid; + glite_jobid_t jobid; + glite_jobid_t *subjobid; + int i; if(glite_wll_perftest_createJobId(dest_host, dest_port, @@ -250,17 +334,32 @@ glite_wll_perftest_init(const char *host, n, &jobid) != 0) { fprintf(stderr, "produceJobId: error creating jobid\n"); - if(pthread_mutex_unlock(&perftest_lock) < 0) - abort(); return(-1); } - if((jobids[n]=edg_wlc_JobIdUnparse(jobid)) == NULL) { + if((jobids[n*(nsubjobs+1)]=edg_wlc_JobIdUnparse(jobid)) == NULL) { fprintf(stderr, "produceJobId: error unparsing jobid\n"); - if(pthread_mutex_unlock(&perftest_lock) < 0) - abort(); return(-1); } - }; + glite_jobid_free(jobid); + + /* generate subjob ids */ + if(nsubjobs > 0) { + if(edg_wll_GenerateSubjobIds(ctx, jobid, nsubjobs, test_name, + &subjobid) < 0) { + fprintf(stderr, "produceJobId: error generating subjob ids\n"); + return -1; + } + } + for(i = 1; i <= nsubjobs; i++) { + if((jobids[n*(nsubjobs+1) + i] = edg_wlc_JobIdUnparse(subjobid[i])) == NULL) { + fprintf(stderr, "produceJobId: error unparsing jobid\n"); + return(-1); + } + glite_jobid_free(subjobid[i]); + } + } + + return(0); } @@ -285,18 +384,21 @@ glite_wll_perftest_produceJobId() return(NULL); } - jobid = jobids[cur_job++]; + jobid = jobids[(nsubjobs+1)*cur_job++]; if(cur_job >= njobs) cur_job = -1; + if(pthread_mutex_unlock(&perftest_lock) < 0) + abort(); + return(jobid); } /** - * This produces (njobs*nevents + 1) events, one event for each call. - * For every nevents (one job) new jobid is generated and inserted into + * This produces (njobs*nsubjobs*nevents + 1) events, one event for each call. + * For every nevents (one subjob) new jobid is inserted into * event. The last event is termination - usertag. */ int @@ -304,7 +406,7 @@ glite_wll_perftest_produceEventString(char **event, char **jobid) { static int first = 1; char *e; - int len; + int len, cur_subjob; assert(event != NULL); @@ -318,8 +420,11 @@ glite_wll_perftest_produceEventString(char **event, char **jobid) return(0); } + /* use index to get current subjob */ + cur_subjob = events[cur_event].job_index; + /* did we send all events? */ - if(cur_event >= nevents) { + if(cur_group*group_size + cur_job > njobs) { /* construct termination event */ if((len=trio_asprintf(&e, EDG_WLL_FORMAT_COMMON EDG_WLL_FORMAT_USERTAG "\n", @@ -339,11 +444,14 @@ glite_wll_perftest_produceEventString(char **event, char **jobid) abort(); return(-1); } - *jobid = jobids[cur_job]; + *jobid = jobids[cur_job*(nsubjobs+1)]; /* and refuse to produce more */ cur_job = -1; cur_event = -1; + cur_subjob = -1; + + } else { @@ -358,23 +466,32 @@ glite_wll_perftest_produceEventString(char **event, char **jobid) } /* return current event with jobid filled in */ - if((len=trio_asprintf(&e, "DG.JOBID=\"%s\" %s", - jobids[cur_job], events[cur_event])) < 0) { + if((len=trio_asprintf(&e, "DG.JOBID=\"%s\" DG.REGJOB.PARENT=\"%s\" DG.REGJOB.SEED=\"%s\" DG.REGJOB.NSUBJOBS=\"%d\" %s", + jobids[cur_job*(nsubjobs+1) + cur_subjob], + (nsubjobs > 0) ? jobids[cur_job*(nsubjobs+1)] : "", + test_name, + nsubjobs, + events[cur_event].event)) < 0) { fprintf(stderr, "produceEventString: error generating event\n"); if(pthread_mutex_unlock(&perftest_lock) < 0) abort(); return(-1); } - *jobid = jobids[cur_job]; + *jobid = jobids[cur_job*(nsubjobs+1) + cur_subjob]; + + /* advance to the next job and/or event */ + if(++cur_job % group_size == 0) { + if(++cur_event >= nevents) { + cur_event = 0; + cur_group++; + } + cur_job = 0; + } + } *event = e; - /* advance to the next job and/or event */ - if(++cur_job >= njobs) { - cur_event++; - cur_job = 0; - } if(pthread_mutex_unlock(&perftest_lock) < 0) abort(); -- 1.8.2.3