#include "lb_perftest.h"
#include "il_msg.h"
+typedef struct {
+ char *event;
+ int job_index;
+} job_events_t;
+
static pthread_mutex_t perftest_lock = PTHREAD_MUTEX_INITIALIZER;
static struct timeval endtime;
static char *termination_string;
-static char **events; /* in fact it is *events[] */
-static char **jobids;
+
+static int njobs = 0;
+static int nsubjobs = 0;
+static job_events_t *events;
static int nevents;
-static int njobs = 0;
+static char **jobids;
+
static int cur_event = 0;
static int cur_job = 0;
+static int cur_group = 0;
static char *test_user;
static char *test_name;
static char *dest_host;
static int dest_port;
+static int group_size = 1;
#define EVENTS_BUFSIZ 16
#define BUFFSZ 1024
/*
* reading lines (pasted from load.c)
*/
-
static
int
read_line(int fd, char **buff, int *maxsize)
*buff = (char *)tmp;
}
(*buff)[++i] = '\0';
- return 1;
+ return i;
}
}
*/
static
int
-read_events(int fd, char ***evts /* *(*evts)[] */)
+read_events(int fd)
{
void *tmp;
- int maxlinesize;
- char **e = NULL, *line = NULL;
+ char *line = NULL;
+ int linesize;
size_t i = 0, max = EVENTS_BUFSIZ;
+ char **subjobids = NULL;
+ int len;
+
+ if ((events = calloc(max, sizeof(job_events_t))) == NULL) goto nomem;
+ if ((subjobids = calloc(max, sizeof(char*))) == NULL) goto nomem;
+ nsubjobs = 0;
- if ((e = malloc(max * sizeof(char *))) == NULL) goto nomem;
+ while ((len = read_line(fd, &line, &linesize)) > 0) {
+ char *d, *p, *q;
- while (read_line(fd, &line, &maxlinesize)) {
if (i + 1 >= max) {
max <<= 1;
- if ((tmp = realloc(e, max * sizeof(char *))) == NULL) goto nomem;
- e = (char **)tmp;
+ if ((tmp = realloc(events, max * sizeof(job_events_t))) == NULL) goto nomem;
+ events = (job_events_t*)tmp;
}
- if ((e[i] = strdup(line)) == NULL) goto nomem;
+ /* if ((e[i] = strdup(line)) == NULL) goto nomem; */
+ /* allocate memory for event, reserve enough space for variable keys */
+ if((d = malloc(len + 1)) == NULL)
+ goto nomem;
+ events[i].event = d;
+ events[i].job_index = 0;
+
+ /* copy key by key, look for JOBID, PARENT, SEED and NSUBJOBS */
+ for(p = q = line; q < line + len ; p = q) {
+
+ /* look for next DG. */
+ /* XXX - this works only for "nice" events with no spurious DG. */
+ q = strstr(p+3, "DG.");
+ if(q) {
+ } else {
+ /* no further key, copy till the end */
+ q = line + len;
+ }
+ if(strncmp(p, "DG.JOBID", 8) == 0) {
+ /* omit, will be filled in later */
+ /* if we know jobid, we can fill in job_index */
+ char *v;
+
+ v = strchr(p+10, '"');
+ if(v) {
+ int m;
+
+ *v = 0;
+ /* lookup this jobid in subjobids */
+ for(m = 0; (m < nsubjobs + 1) && subjobids[m]; m++) {
+ if(strcmp(p+10, subjobids[m]) == 0)
+ break;
+ }
+ if(m >= nsubjobs + 1) m = 0;
+ if(subjobids[m] == NULL) {
+ subjobids[m] = strdup(p+10);
+ }
+ events[i].job_index = m;
+ *v = '"';
+ }
+ } else if(strncmp(p, "DG.REGJOB.PARENT", 16) == 0) {
+ /* omit, will be filled in later */
+ } else if(strncmp(p, "DG.REGJOB.SEED", 14) == 0) {
+ /* omit for now */
+ } else if(strncmp(p, "DG.REGJOB.NSUBJOBS", 18) == 0) {
+ int val;
+
+ strncpy(d, p, q - p);
+ d += q - p;
+ /* scan the number of subjobs */
+ val = atoi(p + 20);
+ if(nsubjobs == 0 && val > 0) {
+ nsubjobs = val;
+ subjobids = realloc(subjobids, (nsubjobs+1)*sizeof(*subjobids));
+ if(subjobids == NULL) goto nomem;
+ }
+ } else {
+ /* some other key */
+ strncpy(d, p, q - p);
+ d += q - p;
+ }
+ }
+ *d = 0;
free(line); line = NULL;
i++;
}
- *evts = e;
return i;
nomem:
- if(e) free(e);
+ /* XXX: should free all events[i].event */
+ if(events) free(events);
if(line) free(line);
fprintf(stderr, "read_events: insufficient memory\n");
return -1;
const char *filename,
int n)
{
+ edg_wll_Context ctx;
+
+ edg_wll_InitContext(&ctx);
if(trio_asprintf(&termination_string, EDG_WLL_FORMAT_USERTAG,
PERFTEST_END_TAG_NAME, PERFTEST_END_TAG_VALUE) < 0)
return(-1);
}
- if((nevents=read_events(fd, &events)) < 0)
+ if((nevents=read_events(fd)) < 0)
return(-1);
close(fd);
-
fprintf(stderr, "PERFTEST_JOB_SIZE=%d\n", nevents);
- fprintf(stderr, "PERFTEST_NUM_JOBS=%d\n", njobs);
+ fprintf(stderr, "PERFTEST_NUM_JOBS=%d\n", njobs * (nsubjobs + 1));
}
- /* generate jobids[0..njobs-1] */
- jobids = calloc(njobs, sizeof(char*));
+ /* we suppose nsubjobs was filled in by read_events() */
+
+
+ /* generate jobids[0..njobs-1, 0..nsubjobs] */
+ jobids = calloc(njobs*(nsubjobs + 1), sizeof(char*));
if(jobids == NULL) {
fprintf(stderr, "glite_wll_perftest_init: not enough memory for job id's\n");
return(-1);
}
while (--n >= 0) {
- edg_wlc_JobId jobid;
+ glite_jobid_t jobid;
+ glite_jobid_t *subjobid;
+ int i;
if(glite_wll_perftest_createJobId(dest_host,
dest_port,
n,
&jobid) != 0) {
fprintf(stderr, "produceJobId: error creating jobid\n");
- if(pthread_mutex_unlock(&perftest_lock) < 0)
- abort();
return(-1);
}
- if((jobids[n]=edg_wlc_JobIdUnparse(jobid)) == NULL) {
+ if((jobids[n*(nsubjobs+1)]=edg_wlc_JobIdUnparse(jobid)) == NULL) {
fprintf(stderr, "produceJobId: error unparsing jobid\n");
- if(pthread_mutex_unlock(&perftest_lock) < 0)
- abort();
return(-1);
}
- };
+ glite_jobid_free(jobid);
+
+ /* generate subjob ids */
+ if(nsubjobs > 0) {
+ if(edg_wll_GenerateSubjobIds(ctx, jobid, nsubjobs, test_name,
+ &subjobid) < 0) {
+ fprintf(stderr, "produceJobId: error generating subjob ids\n");
+ return -1;
+ }
+ }
+ for(i = 1; i <= nsubjobs; i++) {
+ if((jobids[n*(nsubjobs+1) + i] = edg_wlc_JobIdUnparse(subjobid[i])) == NULL) {
+ fprintf(stderr, "produceJobId: error unparsing jobid\n");
+ return(-1);
+ }
+ glite_jobid_free(subjobid[i]);
+ }
+ }
+
+
return(0);
}
return(NULL);
}
- jobid = jobids[cur_job++];
+ jobid = jobids[(nsubjobs+1)*cur_job++];
if(cur_job >= njobs)
cur_job = -1;
+ if(pthread_mutex_unlock(&perftest_lock) < 0)
+ abort();
+
return(jobid);
}
/**
- * This produces (njobs*nevents + 1) events, one event for each call.
- * For every nevents (one job) new jobid is generated and inserted into
+ * This produces (njobs*nsubjobs*nevents + 1) events, one event for each call.
+ * For every nevents (one subjob) new jobid is inserted into
* event. The last event is termination - usertag.
*/
int
{
static int first = 1;
char *e;
- int len;
+ int len, cur_subjob;
assert(event != NULL);
return(0);
}
+ /* use index to get current subjob */
+ cur_subjob = events[cur_event].job_index;
+
/* did we send all events? */
- if(cur_event >= nevents) {
+ if(cur_group*group_size + cur_job > njobs) {
/* construct termination event */
if((len=trio_asprintf(&e, EDG_WLL_FORMAT_COMMON EDG_WLL_FORMAT_USERTAG "\n",
abort();
return(-1);
}
- *jobid = jobids[cur_job];
+ *jobid = jobids[cur_job*(nsubjobs+1)];
/* and refuse to produce more */
cur_job = -1;
cur_event = -1;
+ cur_subjob = -1;
+
+
} else {
}
/* return current event with jobid filled in */
- if((len=trio_asprintf(&e, "DG.JOBID=\"%s\" %s",
- jobids[cur_job], events[cur_event])) < 0) {
+ if((len=trio_asprintf(&e, "DG.JOBID=\"%s\" DG.REGJOB.PARENT=\"%s\" DG.REGJOB.SEED=\"%s\" DG.REGJOB.NSUBJOBS=\"%d\" %s",
+ jobids[cur_job*(nsubjobs+1) + cur_subjob],
+ (nsubjobs > 0) ? jobids[cur_job*(nsubjobs+1)] : "",
+ test_name,
+ nsubjobs,
+ events[cur_event].event)) < 0) {
fprintf(stderr, "produceEventString: error generating event\n");
if(pthread_mutex_unlock(&perftest_lock) < 0)
abort();
return(-1);
}
- *jobid = jobids[cur_job];
+ *jobid = jobids[cur_job*(nsubjobs+1) + cur_subjob];
+
+ /* advance to the next job and/or event */
+ if(++cur_job % group_size == 0) {
+ if(++cur_event >= nevents) {
+ cur_event = 0;
+ cur_group++;
+ }
+ cur_job = 0;
+ }
+
}
*event = e;
- /* advance to the next job and/or event */
- if(++cur_job >= njobs) {
- cur_event++;
- cur_job = 0;
- }
if(pthread_mutex_unlock(&perftest_lock) < 0)
abort();