merge of changes betwen merge_for_3_1_src and merge_30_31_src
authorAleš Křenek <ljocha@ics.muni.cz>
Tue, 15 Aug 2006 20:06:05 +0000 (20:06 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Tue, 15 Aug 2006 20:06:05 +0000 (20:06 +0000)
org.glite.lb.client/Makefile
org.glite.lb.client/examples/flood_proxy.c [new file with mode: 0644]
org.glite.lb.client/examples/lbmon.c [new file with mode: 0644]
org.glite.lb.client/project/configure.properties.xml
org.glite.lb.client/src/connection.c
org.glite.lb.client/src/connection.h
org.glite.lb.client/src/producer.c

index 7fe5927..cec8b5a 100644 (file)
@@ -69,29 +69,22 @@ CFLAGS:=${DEBUG} \
        -I${top_srcdir}/src -I${top_srcdir}/interface \
        -I${stagedir}/include \
        -I${glite_location}/include \
-       -I${expat_prefix}/include \
-       -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \
        ${COVERAGE_FLAGS} \
        -D_GNU_SOURCE
 
+#      -I${expat_prefix}/include \
+#      -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \
 
 CXXFLAGS:=${CFLAGS}
 
-ifneq (${mysql_prefix},/usr)
-       ifeq ($(shell echo ${mysql_version} | cut -d. -f1,2),4.1)
-               mysqlib := -L${mysql_prefix}/lib/mysql
-       else
-               mysqlib := -L${mysql_prefix}/lib
-       endif
-endif
+#ifneq (${expat_prefix},/usr)
+#      EXPAT_LIBS:=-L${expat_prefix}/lib
+#endif
+#EXPAT_LIBS := ${EXPAT_LIBS} -lexpat
 
-ifneq (${expat_prefix},/usr)
-       EXPAT_LIBS:=-L${expat_prefix}/lib
-endif
-EXPAT_LIBS := ${EXPAT_LIBS} -lexpat
-
-EXT_LIB:= ${mysqlib} -lmysqlclient -lz\
-       ${EXPAT_LIBS}
+EXT_LIB:= 
+#${mysqlib} -lmysqlclient -lz\
+#      ${EXPAT_LIBS}
 
 TEST_LIBS:=-L${cppunit_prefix}/lib -lcppunit
 TEST_INC:=-I${cppunit_prefix}/include
@@ -136,9 +129,9 @@ PLUSLIB:=libglite_lb_clientpp_${nothrflavour}.la
 THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la
 
 TOOLS:=dump load purge lb_dump_exporter
-EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog
+EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog lbmon flood_proxy
 
-EXAMPLES_CL=user_jobs job_status 
+EXAMPLES_CL=user_jobs job_status
 FAKE_EXAMPLES:=job_log_fake
 
 MAN_GZ:=glite-lb-logevent.1.gz
@@ -237,7 +230,7 @@ ${MAN_GZ}: ${MAN}
 default: all
 
 
-compile all: ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ}
+compile all: check_version ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ}
 
 examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS}
 
@@ -283,7 +276,7 @@ install:
        ${INSTALL} -m 644 ${top_srcdir}/LICENSE ${PREFIX}/share/doc/${package}-${version}
        ${INSTALL} -m 644 ${top_srcdir}/doc/README-fake ${top_srcdir}/doc/README-notify ${PREFIX}/share/doc/${package}-${version}
        mkdir -p ${PREFIX}/examples
-       for p in logevent ; do \
+       for p in logevent; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/bin/glite-lb-$$p"; \
        done
        for p in ${TOOLS} ; do \
diff --git a/org.glite.lb.client/examples/flood_proxy.c b/org.glite.lb.client/examples/flood_proxy.c
new file mode 100644 (file)
index 0000000..f59e426
--- /dev/null
@@ -0,0 +1,77 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <signal.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+
+#include "glite/lb/producer.h"
+#include "glite/wmsutils/jobid/cjobid.h"
+
+static void slave();
+
+int main(int argc,char **argv)
+{
+       int     i,nproc;
+
+       if (argc != 2) {
+               fprintf(stderr,"usage: %s nproc\n",argv[0]);
+               return 1;
+       }
+       
+       nproc = atoi(argv[1]);
+       if (nproc < 1) {
+               fprintf(stderr,"%s: nproc must be >= 1\n",argv[0]);
+               return 1;
+       }
+
+       for (i=0; i<nproc; i++) {
+               switch (fork()) {
+                       case -1: perror("fork()"); return 1;
+                       case 0: slave();
+                       default: break;
+               }
+       }
+
+       while (nproc) {
+               int     stat;
+               wait(&stat);
+               if (WIFEXITED(stat)) nproc--;
+       }
+
+       puts("done");
+       return 0;
+}
+
+
+static void slave()
+{
+       edg_wll_Context ctx;
+       edg_wlc_JobId   job;
+       int     i,pid = getpid(),noent = 0;
+
+       for (i=0; i<100; i++) {
+               int     err;
+               char    *et,*ed;
+
+               edg_wll_InitContext(&ctx);
+               edg_wlc_JobIdParse("https://fake.server/fakejob",&job);
+
+               if ((err = edg_wll_SetLoggingJobProxy(ctx,job,NULL,"some user",0))) edg_wll_Error(ctx,&et,&ed);
+               else et = ed = "none";
+
+               printf("[%d] %d: %s (%s)\n",pid,i,
+                               err == 0 || err == ENOENT ? "OK" : et,
+                               ed);
+
+               if (err == ENOENT) noent++;
+
+               edg_wll_LogUserTagProxy(ctx,"test","x");
+
+               edg_wll_FreeContext(ctx);
+       }
+       printf("[%d] done, ENOENTs %d\n",pid,noent);
+       exit(0);
+}
diff --git a/org.glite.lb.client/examples/lbmon.c b/org.glite.lb.client/examples/lbmon.c
new file mode 100644 (file)
index 0000000..2d466b9
--- /dev/null
@@ -0,0 +1,137 @@
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <errno.h>
+#include <getopt.h>
+#include <time.h>
+
+#include "glite/lb/consumer.h"
+
+static void usage(char *);
+static int query_all(edg_wll_Context, int, struct timeval, edg_wll_JobStat **, edg_wlc_JobId **);
+static void dgerr(edg_wll_Context,char *);
+
+static char    *myname = NULL;
+static int     debug = 0, verbose = 0, lbproxy =0;
+static const char rcsid[] = "@(#)$Id$";
+
+static struct option const long_options[] = {
+        { "help", no_argument, 0, 'h' },
+        { "version", no_argument, 0, 'V' },
+        { "verbose", no_argument, 0, 'v' },
+        { "debug", no_argument, 0, 'd' },
+        { "lbproxy", required_argument, 0, 'x' },
+        { NULL, 0, NULL, 0}
+};
+
+int main(int argc,char *argv[]) {
+       edg_wll_Context         ctx;
+       edg_wll_JobStat         *statesOut = NULL;
+       edg_wlc_JobId           *jobsOut = NULL;
+       struct timeval time_now;
+
+       int i, result, opt, nJobs;
+       i = result = opt = 0;
+       gettimeofday(&time_now,0);
+
+       myname = argv[0];
+       fprintf(stdout,"\n");
+       /* get arguments */
+       while ((opt = getopt_long(argc,argv,
+               "h"  /* help */
+               "V"  /* version */
+               "v"  /* verbose */
+               "d"  /* debug */
+               "x", /* lbproxy */
+               long_options, (int *) 0)) != EOF) {
+
+               switch (opt) {
+                       case 'V': fprintf(stdout,"%s:\t%s\n",argv[0],rcsid); exit(0);
+                       case 'v': verbose = 1; break;
+                       case 'd': debug = 1; break;
+                       case 'x': lbproxy = 1; break;
+                       case 'h':
+                       default:
+                               usage(argv[0]); exit(0);
+               }
+       }
+       if ( edg_wll_InitContext(&ctx) ) {
+               fprintf(stderr,"%s: cannot initialize edg_wll_Context\n ",myname);
+               exit(1);
+       }
+
+       if ( (result = query_all(ctx, EDG_WLL_JOB_CLEARED, time_now, &statesOut, &jobsOut)) ) {
+               dgerr(ctx, "edg_wll_QueryJobs");
+       } else {
+               fprintf(stdout,"Number of jobs... ");
+       }
+
+       nJobs = 0;
+       if ( jobsOut ) {
+               for (i=0; jobsOut[i]; i++) edg_wlc_JobIdFree(jobsOut[i]); {
+                       nJobs++;
+                       free(jobsOut);
+               }
+       }
+       if ( statesOut ) {
+               for (i=0; statesOut[i].state; i++) edg_wll_FreeStatus(&statesOut[i]);
+                       free(statesOut);
+                }
+       edg_wll_FreeContext(ctx);
+
+
+       return result;
+}
+
+static void
+usage(char *name) {
+       fprintf(stderr,"Usage: %s [-x]\n", name);
+}
+
+static int
+query_all(edg_wll_Context ctx, int query_status, struct timeval query_time, edg_wll_JobStat **statesOut, edg_wlc_JobId **jobsOut) {
+       edg_wll_QueryRec        jc[3];
+       int                     ret;
+
+       memset(jc, 0, sizeof jc);
+
+       /* jobs in the state 'query_status' within last hour */
+       jc[0].attr = EDG_WLL_QUERY_ATTR_STATUS;
+       jc[0].op = EDG_WLL_QUERY_OP_EQUAL;
+       jc[0].value.i = query_status;
+       jc[1].attr = EDG_WLL_QUERY_ATTR_TIME;
+       jc[1].attr_id.state = query_status;
+       jc[1].op = EDG_WLL_QUERY_OP_WITHIN;
+       jc[1].value.t.tv_sec = query_time.tv_sec - 3600;
+       jc[1].value.t.tv_usec = query_time.tv_usec;
+       jc[1].value2.t.tv_sec = query_time.tv_sec;
+       jc[1].value2.t.tv_usec = query_time.tv_usec;
+       jc[2].attr = EDG_WLL_QUERY_ATTR_UNDEF;
+
+       if ( (ret = edg_wll_QueryJobs(ctx, jc, 0, jobsOut, statesOut)) ) {
+               if ( ret == E2BIG ) {
+                       int r;
+                       if ( edg_wll_GetParam(ctx, EDG_WLL_PARAM_QUERY_RESULTS, &r) ) return ret;
+                       if ( r != EDG_WLL_QUERYRES_LIMITED ) return ret;
+
+                       fprintf(stderr," edg_wll_QueryJobs() Warning: only limited result returned!\n");
+                       return 0;
+               } else return ret;
+       }
+
+       return ret;
+}
+
+static void
+dgerr(edg_wll_Context ctx,char *where) {
+       char    *etxt,*edsc;
+
+       edg_wll_Error(ctx,&etxt,&edsc);
+       fprintf(stderr,"%s: %s: %s",myname,where,etxt);
+       if (edsc) fprintf(stderr," (%s)",edsc);
+       putc('\n',stderr);
+       if(etxt) free(etxt); 
+       if(edsc) free(edsc);
+}
index 7fb2f41..8a32d8c 100644 (file)
@@ -20,6 +20,9 @@
 
        Revision history:
        $Log$
+       Revision 1.5  2006/03/15 18:02:37  akrenek
+       cares
+
        Revision 1.4  2006/03/15 17:59:04  akrenek
        merge 1.5
        
@@ -28,7 +31,7 @@
        
        Revision 1.3  2004/08/31 16:32:31  jpospi
        First attempt to producer test
-       
+
        Revision 1.2  2004/07/06 20:47:11  flammer
        Moved to configure.properties.xml
        
index 6202d21..4230b07 100644 (file)
@@ -17,7 +17,7 @@
 #include "glite/lb/context-int.h"
 #include "glite/lb/mini_http.h"
 
-
+#include "connection.h"
 
 static void CloseConnection(edg_wll_Context ctx, int conn_index)
 {
@@ -116,7 +116,7 @@ int edg_wll_close_proxy(edg_wll_Context ctx)
 {
        edg_wll_plain_close(&ctx->connProxy->conn);
 
-       return edg_wll_Error(ctx, NULL, NULL);
+       return edg_wll_ResetError(ctx);
 }
 
 
@@ -202,7 +202,11 @@ int edg_wll_open_proxy(edg_wll_Context ctx)
 {
        struct sockaddr_un      saddr;
        int                     flags;
-       
+       int     err;
+       char    *ed = NULL;
+       int     retries = 0;
+
+       edg_wll_ResetError(ctx);
 
        edg_wll_ResetError(ctx);
 
@@ -236,19 +240,48 @@ int edg_wll_open_proxy(edg_wll_Context ctx)
                goto err;
        }
 
-       if (connect(ctx->connProxy->conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
-               edg_wll_SetError(ctx, errno, "connect()");
+       while ((err = connect(ctx->connProxy->conn.sock, (struct sockaddr *)&saddr, sizeof(saddr))) < 0 &&
+                       errno == EAGAIN &&
+                       ctx->p_tmp_timeout.tv_sec >= 0 && ctx->p_tmp_timeout.tv_usec >= 0 &&
+                       !(ctx->p_tmp_timeout.tv_sec == 0 && ctx->p_tmp_timeout.tv_usec == 0)
+                       )
+       {
+               struct timespec ns = { 0, PROXY_CONNECT_RETRY * 1000000 /* 10 ms */ },rem;
+
+               nanosleep(&ns,&rem);
+
+               ctx->p_tmp_timeout.tv_usec -= ns.tv_nsec/1000;
+               ctx->p_tmp_timeout.tv_usec += rem.tv_nsec/1000;
+
+               ctx->p_tmp_timeout.tv_sec -= ns.tv_sec;
+               ctx->p_tmp_timeout.tv_sec += rem.tv_sec;
+
+               if (ctx->p_tmp_timeout.tv_usec < 0) {
+                       ctx->p_tmp_timeout.tv_usec += 1000000;
+                       ctx->p_tmp_timeout.tv_sec--;
+               }
+               retries++;
+       }
+
+       /* printf("retries %d\n",retries); */
+
+       if (err) {
+               if (errno == EAGAIN) edg_wll_SetError(ctx,ETIMEDOUT, "edg_wll_open_proxy()");
+               else edg_wll_SetError(ctx, errno, "connect()");
                goto err;
        }
 
-       return edg_wll_Error(ctx,NULL,NULL);    
+       return 0;
        
 err:
        /* some error occured; close created connection */
 
+       err = edg_wll_Error(ctx,NULL,&ed);
        edg_wll_close_proxy(ctx);
+       edg_wll_SetError(ctx,err,ed);
+       free(ed);
                
-       return edg_wll_Error(ctx,NULL,NULL);
+       return err;
 }
        
 
index 7de349d..1b12058 100644 (file)
@@ -1,7 +1,7 @@
 #ifndef __EDG_WORKLOAD_LOGGING_CLIENT_CONNECTION_H__
 #define __EDG_WORKLOAD_LOGGING_CLIENT_CONNECTION_H__
 
-#ident "$Header"
+#ident "$Header$"
 
 int edg_wll_close(edg_wll_Context ctx);
 int edg_wll_open(edg_wll_Context ctx);
@@ -13,5 +13,7 @@ int edg_wll_http_send_recv_proxy(edg_wll_Context, char *, const char * const *,
 
 int http_check_status(edg_wll_Context, char *);
 
+#define PROXY_CONNECT_RETRY 10 /* ms */
+
 
 #endif /* __EDG_WORKLOAD_LOGGING_CLIENT_CONNECTION_H__ */
index 0282c0a..4239d24 100644 (file)
@@ -27,6 +27,7 @@
 #include "glite/lb/escape.h"
 
 #include "prod_proto.h"
+#include "connection.h"
 
 static const char* socket_path="/tmp/lb_proxy_store.sock";
 
@@ -156,7 +157,7 @@ int edg_wll_DoLogEventProxy(
        edg_wll_LogLine logline)
 {
        int     answer;
-       int     flags;
+       int     flags,retries;
        char    *name_esc,*dguser;
        struct sockaddr_un saddr;
        edg_wll_PlainConnection conn;
@@ -187,6 +188,10 @@ int edg_wll_DoLogEventProxy(
                close(conn.sock);
                goto edg_wll_DoLogEventProxy_end;
        }
+
+/* non-retry variant (pre bug #18994)
+ * XXX: what is the EISCONN case good for? conn.sock is created above.
+ *
        if (connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
                if(errno != EISCONN) {
                        edg_wll_SetError(context,answer = errno,"connect()");
@@ -194,6 +199,40 @@ int edg_wll_DoLogEventProxy(
                        goto edg_wll_DoLogEventProxy_end;
                }
        }
+*/
+
+       retries = 0;
+       while ((answer = connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr))) < 0 &&
+                       errno == EAGAIN &&
+                       context->p_tmp_timeout.tv_sec >= 0 && context->p_tmp_timeout.tv_usec >= 0 &&
+                       !(context->p_tmp_timeout.tv_sec == 0 && context->p_tmp_timeout.tv_usec == 0)
+                       )
+       {
+               struct timespec ns = { 0, PROXY_CONNECT_RETRY * 1000000 /* 10 ms */ },rem;
+
+               nanosleep(&ns,&rem);
+
+               context->p_tmp_timeout.tv_usec -= ns.tv_nsec/1000;
+               context->p_tmp_timeout.tv_usec += rem.tv_nsec/1000;
+
+               context->p_tmp_timeout.tv_sec -= ns.tv_sec;
+               context->p_tmp_timeout.tv_sec += rem.tv_sec;
+
+               if (context->p_tmp_timeout.tv_usec < 0) {
+                       context->p_tmp_timeout.tv_usec += 1000000;
+                       context->p_tmp_timeout.tv_sec--;
+               }
+               retries++;
+       }
+
+       if (answer) {
+               if (errno == EAGAIN) edg_wll_SetError(context,answer = ETIMEDOUT,"edg_wll_DoLogEventProxy connect()");
+               else edg_wll_SetError(context,answer = errno,"connect()");
+               close(conn.sock);
+               goto edg_wll_DoLogEventProxy_end;
+       }
+
+/* just debug  if (retries) printf("edg_wll_DoLogEventProxy connect retries %d\n",retries); */
 
    /* add DG.USER to the message: */
         name_esc = edg_wll_LogEscape(context->p_user_lbproxy);