* added event source (reading from file and generating events)
authorMichal Voců <michal@ruk.cuni.cz>
Wed, 17 May 2006 15:10:13 +0000 (15:10 +0000)
committerMichal Voců <michal@ruk.cuni.cz>
Wed, 17 May 2006 15:10:13 +0000 (15:10 +0000)
org.glite.lb.common/src/lb_perftest.c

index d672bc6..07676af 100644 (file)
@@ -5,6 +5,9 @@
 #include <assert.h>
 #include <pthread.h>
 #include <string.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
 
 #include "lb_perftest.h"
 #include "glite/lb/producer.h"
 static pthread_mutex_t perftest_lock = PTHREAD_MUTEX_INITIALIZER;
 static struct timeval endtime;
 static char *termination_string;
+static char *events[];
+static int nevents;
+static int njobs = 0;
+static int cur_event = 0;
+static int cur_job = 0;
 
+#define EVENTS_BUFSIZ 16
+#define BUFFSZ        1024
+
+
+/* 
+ * reading lines (pasted from load.c)
+ */
+
+static 
 int 
-glite_wll_perftest_init()
+read_line(int fd, char **buff, size_t *maxsize)
 {
+        int             ct, i;
+        void            *tmp;
+
+
+        if ( *buff == NULL )
+        {
+                *buff = malloc(BUFFSZ);
+                if ( *buff == NULL )
+                        return -1;
+                *maxsize = BUFFSZ;
+        }
+
+        i = 0;
+        while ( 1 )
+        {
+                if (i >= *maxsize) {
+                        (*maxsize) *= 2;
+                        if ((tmp = realloc(*buff, *maxsize)) == NULL) return -1;
+                        *buff = (char *)tmp;
+                }
+                if ( (ct = read(fd, (*buff)+i, 1)) == -1 )
+                        return -1;
+
+                if ( ct == 0 )
+                        return 0;
+
+                if ( (*buff)[i] == '\n' )
+                {
+                        (*buff)[i--] = '\0';
+                        while ( (i != -1) && isspace((*buff)[i]) ) i--;
+                        if ( i == -1 )
+                        {
+                                /**     empty line
+                                 */
+                                i = 0;
+                                continue;
+                        }
+                        else
+                                return 1;
+                }
+
+                i++;
+        }
+
+}
+
+
+/* 
+ * read events from the stream
+ */
+static 
+int 
+read_events(int fd, char *(*events[])) 
+{
+       void *tmp;
+       size_t maxlinesize;
+       char **e = NULL, *line = NULL;
+       size_t i = 0, max = EVENTS_BUFSIZ;
+
+       if ((e = malloc(max * sizeof(char *))) == NULL) goto nomem;
+
+       while (read_line(fd, &line, &maxlinesize)) {
+               if (i + 1 >= max) {
+                       max <<= 1;
+                       if ((tmp = realloc(e, max * sizeof(char *))) == NULL) goto nomem;
+                       e = (char **)tmp;
+               }
+               if ((e[i] = strdup(line)) == NULL) goto nomem;
+               free(line); line = NULL;
+               i++;
+       }
+
+       *events = e;
+       return i;
+
+nomem:
+       if(e) free(e);
+       if(line) free(line);
+       fprintf(stderr, "read_events: insufficient memory\n");
+       return -1;
+}
+
+
+int 
+glite_wll_perftest_init(const char *host,
+                       int port,
+                       const char *user,
+                       const char *testname,
+                       const char *filename, 
+                       int n)
+{
+
        if(trio_asprintf(&termination_string, EDG_WLL_FORMAT_USERTAG,
                    PERFTEST_END_TAG_NAME, PERFTEST_END_TAG_VALUE) < 0)
                return(-1);
 
+       /* if we are asked to read events in, read them */
+       if(filename) {
+               int fd;
+               
+               if((fd=open(filename, O_RDONLY)) < 0) {
+                       fprintf(stderr, "glite_wll_perftest_init: Could not open event file %s: %s",
+                               filename, strerror(errno));
+                       return(-1);
+               }
+               
+               if((nevents=read_events(&events)) < 0)
+                       return(-1);
+
+               close(fd);
+
+               /* reset event source */
+               cur_event = cur_job = 0;
+
+               njobs = n;
+       }
+
+       return(0);
+}
+
+
+static char *cur_jobid;
+
+/**
+ * This produces (njobs*nevents + 1) events, one event for each call.
+ * For every nevents (one job) new jobid is generated and inserted into 
+ * event. The last event is termination - usertag.
+ */
+int
+glite_wll_perftest_produceEventString(char **event)
+{
+       char *e;
+
+       assert(event != NULL);
+
+       if(pthread_mutex_lock(&perftest_lock) < 0)
+               abort();
+
+
+       /* did we send all jobs? */
+       if(cur_job >= njobs) {
+               
+               /* construct termination event */
+               
+               /* and reset counters back to beginning */
+               cur_job = 0;
+               cur_event = 0;
+
+       } else {
+
+               /* are we starting new job? */
+               if(cur_event == 0) {
+                       edg_wlc_JobId jobid;
+                       
+                       /* generate new jobid */
+                       if(cur_jobid) free(cur_jobid);
+                       if(glite_wll_perftest_createJobId(dest_host,
+                                                         dest_port,
+                                                         test_user,
+                                                         test_name,
+                                                         cur_job,
+                                                         &jobid) != 0) {
+                               fprintf(stderr, "produceEventString: error creating jobid\n");
+                               return(-1)
+                                       }
+                       if((cur_jobid=edg_wlc_JobIdUnparse(jobid)) == NULL) {
+                               fprintf(stderr, "produceEventString: error unparsing jobid\n");
+                               return(-1);
+                       }
+               }
+               
+               /* return current event with jobid filled in */
+               if(trio_asprintf(&e, "DG.JOBID=\"%s\" %s", cur_jobid, events[cur_event]) < 0) {
+                       fprintf(stderr, "produceEventString: error generating event\n");
+                       return(-1);
+               }
+       }
+
+       *event = e;
+
+       /* advance to the next event */
+       if(++cur_event >= nevents) {
+               cur_job++;
+               cur_event = 0;
+       }
+
+       if(pthread_mutex_unlock(&perftest_lock) < 0)
+               abort();
+
        return(0);
 }