From 25ca368fd3792738aa22f423428342842af4880d Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Tue, 15 Aug 2006 20:06:05 +0000 Subject: [PATCH] merge of changes betwen merge_for_3_1_src and merge_30_31_src --- org.glite.lb.client/Makefile | 33 ++--- org.glite.lb.client/examples/flood_proxy.c | 77 ++++++++++++ org.glite.lb.client/examples/lbmon.c | 137 +++++++++++++++++++++ .../project/configure.properties.xml | 5 +- org.glite.lb.client/src/connection.c | 47 +++++-- org.glite.lb.client/src/connection.h | 4 +- org.glite.lb.client/src/producer.c | 41 +++++- 7 files changed, 314 insertions(+), 30 deletions(-) create mode 100644 org.glite.lb.client/examples/flood_proxy.c create mode 100644 org.glite.lb.client/examples/lbmon.c diff --git a/org.glite.lb.client/Makefile b/org.glite.lb.client/Makefile index 7fe5927..cec8b5a 100644 --- a/org.glite.lb.client/Makefile +++ b/org.glite.lb.client/Makefile @@ -69,29 +69,22 @@ CFLAGS:=${DEBUG} \ -I${top_srcdir}/src -I${top_srcdir}/interface \ -I${stagedir}/include \ -I${glite_location}/include \ - -I${expat_prefix}/include \ - -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \ ${COVERAGE_FLAGS} \ -D_GNU_SOURCE +# -I${expat_prefix}/include \ +# -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \ CXXFLAGS:=${CFLAGS} -ifneq (${mysql_prefix},/usr) - ifeq ($(shell echo ${mysql_version} | cut -d. -f1,2),4.1) - mysqlib := -L${mysql_prefix}/lib/mysql - else - mysqlib := -L${mysql_prefix}/lib - endif -endif +#ifneq (${expat_prefix},/usr) +# EXPAT_LIBS:=-L${expat_prefix}/lib +#endif +#EXPAT_LIBS := ${EXPAT_LIBS} -lexpat -ifneq (${expat_prefix},/usr) - EXPAT_LIBS:=-L${expat_prefix}/lib -endif -EXPAT_LIBS := ${EXPAT_LIBS} -lexpat - -EXT_LIB:= ${mysqlib} -lmysqlclient -lz\ - ${EXPAT_LIBS} +EXT_LIB:= +#${mysqlib} -lmysqlclient -lz\ +# ${EXPAT_LIBS} TEST_LIBS:=-L${cppunit_prefix}/lib -lcppunit TEST_INC:=-I${cppunit_prefix}/include @@ -136,9 +129,9 @@ PLUSLIB:=libglite_lb_clientpp_${nothrflavour}.la THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la TOOLS:=dump load purge lb_dump_exporter -EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog +EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog lbmon flood_proxy -EXAMPLES_CL=user_jobs job_status +EXAMPLES_CL=user_jobs job_status FAKE_EXAMPLES:=job_log_fake MAN_GZ:=glite-lb-logevent.1.gz @@ -237,7 +230,7 @@ ${MAN_GZ}: ${MAN} default: all -compile all: ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ} +compile all: check_version ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ} examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} @@ -283,7 +276,7 @@ install: ${INSTALL} -m 644 ${top_srcdir}/LICENSE ${PREFIX}/share/doc/${package}-${version} ${INSTALL} -m 644 ${top_srcdir}/doc/README-fake ${top_srcdir}/doc/README-notify ${PREFIX}/share/doc/${package}-${version} mkdir -p ${PREFIX}/examples - for p in logevent ; do \ + for p in logevent; do \ ${INSTALL} -m 755 "$$p" "${PREFIX}/bin/glite-lb-$$p"; \ done for p in ${TOOLS} ; do \ diff --git a/org.glite.lb.client/examples/flood_proxy.c b/org.glite.lb.client/examples/flood_proxy.c new file mode 100644 index 0000000..f59e426 --- /dev/null +++ b/org.glite.lb.client/examples/flood_proxy.c @@ -0,0 +1,77 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "glite/lb/producer.h" +#include "glite/wmsutils/jobid/cjobid.h" + +static void slave(); + +int main(int argc,char **argv) +{ + int i,nproc; + + if (argc != 2) { + fprintf(stderr,"usage: %s nproc\n",argv[0]); + return 1; + } + + nproc = atoi(argv[1]); + if (nproc < 1) { + fprintf(stderr,"%s: nproc must be >= 1\n",argv[0]); + return 1; + } + + for (i=0; i +#include +#include +#include +#include +#include + +#include "glite/lb/consumer.h" + +static void usage(char *); +static int query_all(edg_wll_Context, int, struct timeval, edg_wll_JobStat **, edg_wlc_JobId **); +static void dgerr(edg_wll_Context,char *); + +static char *myname = NULL; +static int debug = 0, verbose = 0, lbproxy =0; +static const char rcsid[] = "@(#)$Id$"; + +static struct option const long_options[] = { + { "help", no_argument, 0, 'h' }, + { "version", no_argument, 0, 'V' }, + { "verbose", no_argument, 0, 'v' }, + { "debug", no_argument, 0, 'd' }, + { "lbproxy", required_argument, 0, 'x' }, + { NULL, 0, NULL, 0} +}; + +int main(int argc,char *argv[]) { + edg_wll_Context ctx; + edg_wll_JobStat *statesOut = NULL; + edg_wlc_JobId *jobsOut = NULL; + struct timeval time_now; + + int i, result, opt, nJobs; + i = result = opt = 0; + gettimeofday(&time_now,0); + + myname = argv[0]; + fprintf(stdout,"\n"); + /* get arguments */ + while ((opt = getopt_long(argc,argv, + "h" /* help */ + "V" /* version */ + "v" /* verbose */ + "d" /* debug */ + "x", /* lbproxy */ + long_options, (int *) 0)) != EOF) { + + switch (opt) { + case 'V': fprintf(stdout,"%s:\t%s\n",argv[0],rcsid); exit(0); + case 'v': verbose = 1; break; + case 'd': debug = 1; break; + case 'x': lbproxy = 1; break; + case 'h': + default: + usage(argv[0]); exit(0); + } + } + if ( edg_wll_InitContext(&ctx) ) { + fprintf(stderr,"%s: cannot initialize edg_wll_Context\n ",myname); + exit(1); + } + + if ( (result = query_all(ctx, EDG_WLL_JOB_CLEARED, time_now, &statesOut, &jobsOut)) ) { + dgerr(ctx, "edg_wll_QueryJobs"); + } else { + fprintf(stdout,"Number of jobs... "); + } + + nJobs = 0; + if ( jobsOut ) { + for (i=0; jobsOut[i]; i++) edg_wlc_JobIdFree(jobsOut[i]); { + nJobs++; + free(jobsOut); + } + } + if ( statesOut ) { + for (i=0; statesOut[i].state; i++) edg_wll_FreeStatus(&statesOut[i]); + free(statesOut); + } + edg_wll_FreeContext(ctx); + + + return result; +} + +static void +usage(char *name) { + fprintf(stderr,"Usage: %s [-x]\n", name); +} + +static int +query_all(edg_wll_Context ctx, int query_status, struct timeval query_time, edg_wll_JobStat **statesOut, edg_wlc_JobId **jobsOut) { + edg_wll_QueryRec jc[3]; + int ret; + + memset(jc, 0, sizeof jc); + + /* jobs in the state 'query_status' within last hour */ + jc[0].attr = EDG_WLL_QUERY_ATTR_STATUS; + jc[0].op = EDG_WLL_QUERY_OP_EQUAL; + jc[0].value.i = query_status; + jc[1].attr = EDG_WLL_QUERY_ATTR_TIME; + jc[1].attr_id.state = query_status; + jc[1].op = EDG_WLL_QUERY_OP_WITHIN; + jc[1].value.t.tv_sec = query_time.tv_sec - 3600; + jc[1].value.t.tv_usec = query_time.tv_usec; + jc[1].value2.t.tv_sec = query_time.tv_sec; + jc[1].value2.t.tv_usec = query_time.tv_usec; + jc[2].attr = EDG_WLL_QUERY_ATTR_UNDEF; + + if ( (ret = edg_wll_QueryJobs(ctx, jc, 0, jobsOut, statesOut)) ) { + if ( ret == E2BIG ) { + int r; + if ( edg_wll_GetParam(ctx, EDG_WLL_PARAM_QUERY_RESULTS, &r) ) return ret; + if ( r != EDG_WLL_QUERYRES_LIMITED ) return ret; + + fprintf(stderr," edg_wll_QueryJobs() Warning: only limited result returned!\n"); + return 0; + } else return ret; + } + + return ret; +} + +static void +dgerr(edg_wll_Context ctx,char *where) { + char *etxt,*edsc; + + edg_wll_Error(ctx,&etxt,&edsc); + fprintf(stderr,"%s: %s: %s",myname,where,etxt); + if (edsc) fprintf(stderr," (%s)",edsc); + putc('\n',stderr); + if(etxt) free(etxt); + if(edsc) free(edsc); +} diff --git a/org.glite.lb.client/project/configure.properties.xml b/org.glite.lb.client/project/configure.properties.xml index 7fb2f41..8a32d8c 100644 --- a/org.glite.lb.client/project/configure.properties.xml +++ b/org.glite.lb.client/project/configure.properties.xml @@ -20,6 +20,9 @@ Revision history: $Log$ + Revision 1.5 2006/03/15 18:02:37 akrenek + cares + Revision 1.4 2006/03/15 17:59:04 akrenek merge 1.5 @@ -28,7 +31,7 @@ Revision 1.3 2004/08/31 16:32:31 jpospi First attempt to producer test - + Revision 1.2 2004/07/06 20:47:11 flammer Moved to configure.properties.xml diff --git a/org.glite.lb.client/src/connection.c b/org.glite.lb.client/src/connection.c index 6202d21..4230b07 100644 --- a/org.glite.lb.client/src/connection.c +++ b/org.glite.lb.client/src/connection.c @@ -17,7 +17,7 @@ #include "glite/lb/context-int.h" #include "glite/lb/mini_http.h" - +#include "connection.h" static void CloseConnection(edg_wll_Context ctx, int conn_index) { @@ -116,7 +116,7 @@ int edg_wll_close_proxy(edg_wll_Context ctx) { edg_wll_plain_close(&ctx->connProxy->conn); - return edg_wll_Error(ctx, NULL, NULL); + return edg_wll_ResetError(ctx); } @@ -202,7 +202,11 @@ int edg_wll_open_proxy(edg_wll_Context ctx) { struct sockaddr_un saddr; int flags; - + int err; + char *ed = NULL; + int retries = 0; + + edg_wll_ResetError(ctx); edg_wll_ResetError(ctx); @@ -236,19 +240,48 @@ int edg_wll_open_proxy(edg_wll_Context ctx) goto err; } - if (connect(ctx->connProxy->conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { - edg_wll_SetError(ctx, errno, "connect()"); + while ((err = connect(ctx->connProxy->conn.sock, (struct sockaddr *)&saddr, sizeof(saddr))) < 0 && + errno == EAGAIN && + ctx->p_tmp_timeout.tv_sec >= 0 && ctx->p_tmp_timeout.tv_usec >= 0 && + !(ctx->p_tmp_timeout.tv_sec == 0 && ctx->p_tmp_timeout.tv_usec == 0) + ) + { + struct timespec ns = { 0, PROXY_CONNECT_RETRY * 1000000 /* 10 ms */ },rem; + + nanosleep(&ns,&rem); + + ctx->p_tmp_timeout.tv_usec -= ns.tv_nsec/1000; + ctx->p_tmp_timeout.tv_usec += rem.tv_nsec/1000; + + ctx->p_tmp_timeout.tv_sec -= ns.tv_sec; + ctx->p_tmp_timeout.tv_sec += rem.tv_sec; + + if (ctx->p_tmp_timeout.tv_usec < 0) { + ctx->p_tmp_timeout.tv_usec += 1000000; + ctx->p_tmp_timeout.tv_sec--; + } + retries++; + } + + /* printf("retries %d\n",retries); */ + + if (err) { + if (errno == EAGAIN) edg_wll_SetError(ctx,ETIMEDOUT, "edg_wll_open_proxy()"); + else edg_wll_SetError(ctx, errno, "connect()"); goto err; } - return edg_wll_Error(ctx,NULL,NULL); + return 0; err: /* some error occured; close created connection */ + err = edg_wll_Error(ctx,NULL,&ed); edg_wll_close_proxy(ctx); + edg_wll_SetError(ctx,err,ed); + free(ed); - return edg_wll_Error(ctx,NULL,NULL); + return err; } diff --git a/org.glite.lb.client/src/connection.h b/org.glite.lb.client/src/connection.h index 7de349d..1b12058 100644 --- a/org.glite.lb.client/src/connection.h +++ b/org.glite.lb.client/src/connection.h @@ -1,7 +1,7 @@ #ifndef __EDG_WORKLOAD_LOGGING_CLIENT_CONNECTION_H__ #define __EDG_WORKLOAD_LOGGING_CLIENT_CONNECTION_H__ -#ident "$Header" +#ident "$Header$" int edg_wll_close(edg_wll_Context ctx); int edg_wll_open(edg_wll_Context ctx); @@ -13,5 +13,7 @@ int edg_wll_http_send_recv_proxy(edg_wll_Context, char *, const char * const *, int http_check_status(edg_wll_Context, char *); +#define PROXY_CONNECT_RETRY 10 /* ms */ + #endif /* __EDG_WORKLOAD_LOGGING_CLIENT_CONNECTION_H__ */ diff --git a/org.glite.lb.client/src/producer.c b/org.glite.lb.client/src/producer.c index 0282c0a..4239d24 100644 --- a/org.glite.lb.client/src/producer.c +++ b/org.glite.lb.client/src/producer.c @@ -27,6 +27,7 @@ #include "glite/lb/escape.h" #include "prod_proto.h" +#include "connection.h" static const char* socket_path="/tmp/lb_proxy_store.sock"; @@ -156,7 +157,7 @@ int edg_wll_DoLogEventProxy( edg_wll_LogLine logline) { int answer; - int flags; + int flags,retries; char *name_esc,*dguser; struct sockaddr_un saddr; edg_wll_PlainConnection conn; @@ -187,6 +188,10 @@ int edg_wll_DoLogEventProxy( close(conn.sock); goto edg_wll_DoLogEventProxy_end; } + +/* non-retry variant (pre bug #18994) + * XXX: what is the EISCONN case good for? conn.sock is created above. + * if (connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { if(errno != EISCONN) { edg_wll_SetError(context,answer = errno,"connect()"); @@ -194,6 +199,40 @@ int edg_wll_DoLogEventProxy( goto edg_wll_DoLogEventProxy_end; } } +*/ + + retries = 0; + while ((answer = connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr))) < 0 && + errno == EAGAIN && + context->p_tmp_timeout.tv_sec >= 0 && context->p_tmp_timeout.tv_usec >= 0 && + !(context->p_tmp_timeout.tv_sec == 0 && context->p_tmp_timeout.tv_usec == 0) + ) + { + struct timespec ns = { 0, PROXY_CONNECT_RETRY * 1000000 /* 10 ms */ },rem; + + nanosleep(&ns,&rem); + + context->p_tmp_timeout.tv_usec -= ns.tv_nsec/1000; + context->p_tmp_timeout.tv_usec += rem.tv_nsec/1000; + + context->p_tmp_timeout.tv_sec -= ns.tv_sec; + context->p_tmp_timeout.tv_sec += rem.tv_sec; + + if (context->p_tmp_timeout.tv_usec < 0) { + context->p_tmp_timeout.tv_usec += 1000000; + context->p_tmp_timeout.tv_sec--; + } + retries++; + } + + if (answer) { + if (errno == EAGAIN) edg_wll_SetError(context,answer = ETIMEDOUT,"edg_wll_DoLogEventProxy connect()"); + else edg_wll_SetError(context,answer = errno,"connect()"); + close(conn.sock); + goto edg_wll_DoLogEventProxy_end; + } + +/* just debug if (retries) printf("edg_wll_DoLogEventProxy connect retries %d\n",retries); */ /* add DG.USER to the message: */ name_esc = edg_wll_LogEscape(context->p_user_lbproxy); -- 1.8.2.3