EDG_WLL_PARAM_QUERY_JOBS_LIMIT, /**< maximal query jobs result size */
EDG_WLL_PARAM_QUERY_EVENTS_LIMIT,/**< maximal query events result size */
EDG_WLL_PARAM_QUERY_RESULTS, /**< maximal query result size */
- EDG_WLL_PARAM_QUERY_CONNECTIONS,/**< maximal number of open connections in ctx->connPoll */
+ EDG_WLL_PARAM_CONNPOOL_SIZE, /**< maximal number of open connections in connectionsHandle.poolSize */
EDG_WLL_PARAM_NOTIF_SERVER, /**< default notification server name */
EDG_WLL_PARAM_NOTIF_SERVER_PORT,/**< default notification server port */
EDG_WLL_PARAM_NOTIF_TIMEOUT, /**< notif timeout */
EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog
EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog lbmon flood_proxy dagids stress_context
-EXAMPLES_CL=user_jobs job_status
+EXAMPLES_CL=user_jobs job_status
+EXAMPLES_CL_THR=user_jobs_threaded
FAKE_EXAMPLES:=job_log_fake
MAN_GZ:=glite-lb-logevent.1.gz
${EXAMPLES_CL}: %: %.o
${LINK} -o $@ $< ${LIB} ${COMMON_LIB} ${EXT_LIB}
+${EXAMPLES_CL_THR}: %: %.o
+ ${LINK} -o $@ $< ${THRLIB} ${COMMON_LIB_THR} ${EXT_LIB}
+
${FAKE_EXAMPLES}: %: %.o ${FAKELIB}
${LINK} -o $@ $< ${FAKELIB} ${TEST_LIBS} ${EXT_LIB}
compile all: check_version ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ}
endif
-examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS}
+examples: ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS}
fake: ${FAKE_EXAMPLES}
for p in ${TOOLS} ; do \
${INSTALL} -m 755 "$$p" "${PREFIX}/sbin/glite-lb-$$p"; \
done
- for p in ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} ; do \
+ for p in ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} ; do \
${INSTALL} -m 755 "$$p" "${PREFIX}/examples/glite-lb-$$p"; \
done
${INSTALL} -m 755 ${top_srcdir}/src/export.sh "${PREFIX}/sbin/glite-lb-export.sh"
--- /dev/null
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <getopt.h>
+
+#include <expat.h>
+
+#include <pthread.h>
+#include <globus_common.h>
+
+#include "glite/lb/context.h"
+#include "glite/lb/xml_conversions.h"
+#include "glite/lb/consumer.h"
+
+int use_proxy = 0;
+
+int (*user_jobs)(edg_wll_Context, edg_wlc_JobId **, edg_wll_JobStat **);
+
+
+
+static const char *get_opt_string = "hxj:t:p:r:s:w:";
+
+static struct option opts[] = {
+ {"help", 0, NULL, 'h'},
+ {"proxy", 0, NULL, 'x'},
+ {"owner", 1, NULL, 'o'},
+ {"num-threads", 1, NULL, 't'},
+ {"port_range", 1, NULL, 'p'},
+ {"repeat", 1, NULL, 'r'},
+ {"rand-start", 1, NULL, 's'},
+// {"rand-work", 1, NULL, 'w'},
+ {NULL, 0, NULL, 0}
+};
+
+
+
+static void usage(char *me)
+{
+ fprintf(stderr,"usage: %s [option]\n"
+ "\t-h, --help\t show this help\n"
+ "\t-x, --proxy\t contact proxy (not implemented yet)\n"
+ "\t-o, --owner DN\t show jobs of user with this DN\n"
+ "\t-t, --num-threads N\t number for threads to create\n"
+ "\t-p, --port-range N\t connect to server:port, server:port+10, \n"
+ "\t\t\t ... server:port+N*10 bkservers\n"
+ "\t-r, --repeat N\t repeat query in each slave N-times \n"
+ "\t-s, --rand-start N\t start threads in random interval <0,N> sec\n"
+// "\t-w, --rand-work N\t simulate random server respose time <0,N> sec\n"
+ "\n"
+ ,me);
+}
+
+
+typedef struct {
+ char *owner;
+ int proxy;
+ int rand_start;
+ int port_range;
+ int repeat;
+ char *argv_0;} thread_code_args;
+
+void *thread_code(thread_code_args *arguments) {
+
+ edg_wll_Context ctx;
+ char *errt,*errd;
+ edg_wlc_JobId *jobs = NULL;
+ edg_wll_JobStat *states = NULL;
+ int i,j,k,port;
+ long sl;
+
+
+ sl = (unsigned long) ((double) random()/ RAND_MAX * arguments->rand_start * 1000000);
+ printf("Thread [%d] - sleeping for %ld us\n",pthread_self(),sl);
+ usleep( sl );
+
+ user_jobs = edg_wll_UserJobs;
+
+ if (arguments->proxy) {
+ user_jobs = edg_wll_UserJobsProxy;
+ }
+
+ edg_wll_InitContext(&ctx);
+ if ( user_jobs == edg_wll_UserJobsProxy && arguments->owner )
+ edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, arguments->owner);
+
+ edg_wll_GetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT, &port);
+ // pthread_self tend to return even number, so /7 makes some odd numbers...
+ edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT,
+ port + ((long) pthread_self()/7 % arguments->port_range)*10 );
+
+ for (k=0; k<arguments->repeat; k++) {
+ if (user_jobs(ctx,&jobs,&states)) goto err;
+ for (i=0; states[i].state != EDG_WLL_JOB_UNDEF; i++) {
+ char *id = edg_wlc_JobIdUnparse(states[i].jobId),
+ *st = edg_wll_StatToString(states[i].state);
+
+ if (!states[i].parent_job) {
+ if (states[i].jobtype == EDG_WLL_STAT_SIMPLE) {
+ printf(" %s .... %s %s\n", id, st,
+ (states[i].state==EDG_WLL_JOB_DONE) ?
+ edg_wll_done_codeToString(states[i].done_code) : "" );
+ }
+ else if (states[i].jobtype == EDG_WLL_STAT_DAG) {
+ printf("DAG %s .... %s %s\n", id, st,
+ (states[i].state==EDG_WLL_JOB_DONE) ?
+ edg_wll_done_codeToString(states[i].done_code) : "");
+ for (j=0; states[j].state != EDG_WLL_JOB_UNDEF; j++) {
+ if (states[j].parent_job) {
+ char *par_id = edg_wlc_JobIdUnparse(states[j].parent_job);
+
+ if (!strcmp(id,par_id)) {
+ char *sub_id = edg_wlc_JobIdUnparse(states[j].jobId),
+ *sub_st = edg_wll_StatToString(states[j].state);
+
+ printf(" `- %s .... %s %s\n", sub_id, sub_st,
+ (states[j].state==EDG_WLL_JOB_DONE) ?
+ edg_wll_done_codeToString(states[j].done_code) : "");
+ free(sub_id);
+ free(sub_st);
+ }
+ free(par_id);
+ }
+ }
+ }
+ }
+
+ free(id);
+ free(st);
+ }
+
+ printf("\nFound %d jobs\n",i);
+ }
+err:
+ free(arguments->owner);
+ if (jobs) {
+ for (i=0; jobs[i]; i++) edg_wlc_JobIdFree(jobs[i]);
+ free(jobs);
+ }
+
+ if (states) {
+ for (i=0; states[i].state; i++) edg_wll_FreeStatus(&states[i]);
+ free(states);
+ }
+
+ if (edg_wll_Error(ctx,&errt,&errd)) {
+ fprintf(stderr,"%s: %s (%s)\n",arguments->argv_0,errt,errd);
+ edg_wll_FreeContext(ctx);
+ pthread_exit(NULL);
+ }
+
+ edg_wll_FreeContext(ctx);
+
+// printf("Thread %d exitting\n",(int)pthread_self());
+
+ pthread_exit(NULL);
+
+}
+
+int main(int argc,char **argv)
+{
+ thread_code_args arguments = { NULL, 0, 0, 1, 1,NULL };
+ int i,rc,status,opt;
+ int thr_num = 10; // default
+
+
+ while ((opt = getopt_long(argc,argv,get_opt_string,opts,NULL)) != EOF) switch (opt) {
+ case 'x': arguments.proxy = 1; break;
+ case 'o': arguments.owner = optarg; break;
+ case 't': thr_num = atoi(optarg); break;
+ case 'p': arguments.port_range = atoi(optarg); break;
+ case 'r': arguments.repeat = atoi(optarg); break;
+ case 's': arguments.rand_start = atoi(optarg); break;
+ default : usage(argv[0]); exit(0); break;
+ }
+
+ arguments.argv_0 = argv[0];
+
+ /* Do a thready work */
+ {
+ pthread_t threads[thr_num];
+ pthread_attr_t attr;
+
+ if (globus_module_activate(GLOBUS_COMMON_MODULE) != GLOBUS_SUCCESS) {
+ fputs("globus_module_activate()\n",stderr);
+ return 1;
+ }
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ for (i=0;i<thr_num;i++) {
+ printf("Running thread %d\n",i);
+ rc = pthread_create(&threads[i], &attr, (void*)thread_code, &arguments );
+ if (rc){
+ printf("ERROR; return code from pthread_create() is %d\n", rc);
+ exit(-1);
+ }
+ }
+
+
+
+ // thread_code(&arguments);
+
+ // pthread_exit(NULL)
+
+ pthread_attr_destroy(&attr);
+
+ for (i=0;i<thr_num;i++) {
+ // printf("*** Joining thread %d.\n",i);
+ rc=pthread_join(threads[i],(void **)&status);
+ // printf("*** Thread %d joined. (status %d ret. code %d)\n",i,status,rc);
+
+ }
+ }
+
+ printf("Threaded example main() exitting\n");
+
+
+// exit(0);
+// pthread_exit(NULL);
+}
+
#include "glite/lb/consumer.h"
#include "glite/lb/context-int.h"
#include "glite/lb/mini_http.h"
+#include "glite/lb/connpool.h"
#include "connection.h"
-static void CloseConnection(edg_wll_Context ctx, int conn_index)
+
+static void CloseConnection(edg_wll_Context ctx, int* conn_index)
{
/* close connection ad free its structures */
OM_uint32 min_stat;
+ int cIndex;
+
+ cIndex = *conn_index;
- assert(ctx->connOpened);
- assert(conn_index < ctx->connOpened);
+ assert(ctx->connections->connOpened);
- edg_wll_gss_close(&ctx->connPool[conn_index].gss, &ctx->p_tmp_timeout);
- if (ctx->connPool[conn_index].gsiCred)
- gss_release_cred(&min_stat, &ctx->connPool[conn_index].gsiCred);
- free(ctx->connPool[conn_index].peerName);
- free(ctx->connPool[conn_index].buf);
+ edg_wll_gss_close(&ctx->connections->connPool[cIndex].gss, &ctx->p_tmp_timeout);
+ if (ctx->connections->connPool[cIndex].gsiCred)
+ gss_release_cred(&min_stat, &ctx->connections->connPool[cIndex].gsiCred);
+ free(ctx->connections->connPool[cIndex].peerName);
+ free(ctx->connections->connPool[cIndex].buf);
- memset(ctx->connPool + conn_index, 0, sizeof(edg_wll_ConnPool));
+ memset(ctx->connections->connPool + cIndex, 0, sizeof(edg_wll_ConnPool));
- /* if deleted conn was not the last one -> there is a 'hole' and then */
- /* 'shake' together connections in pool, no holes are allowed */
- if (conn_index < ctx->connOpened - 1) {
- ctx->connPool[conn_index] = ctx->connPool[ctx->connOpened - 1];
- memset(ctx->connPool + ctx->connOpened - 1 , 0, sizeof(edg_wll_ConnPool));
- }
- ctx->connOpened--;
+ ctx->connections->connOpened--;
+
+ *conn_index = cIndex;
}
{
int i;
- for (i=0; i<ctx->connOpened;i++)
- if (!strcmp(name, ctx->connPool[i].peerName) &&
- (port == ctx->connPool[i].peerPort)) return i;
-
+ for (i=0; i<ctx->connections->poolSize;i++) {
+ if ((ctx->connections->connPool[i].peerName != NULL) &&
+ !strcmp(name, ctx->connections->connPool[i].peerName) &&
+ (port == ctx->connections->connPool[i].peerPort)) {
+
+ /* TryLock (next line) is in fact used only
+ to check the mutex status */
+ switch (edg_wll_connectionTryLock(ctx, i)) {
+ case 0:
+ /* Connection was not locked but now it is. Since we do not
+ really know wheter we are interested in that connection, we
+ are simply unlocking it now. */
+ edg_wll_connectionUnlock(ctx, i);
+ return i;
+
+ case EBUSY:
+ /* Connection locked. Do not consider it */
+ // try to find another free connection
+ break;
+ default:
+ /* Some obscure error occured. Need inspection */
+ perror("ConnectionIndex() - locking problem \n");
+ assert(0);
+ }
+ }
+ }
+
return -1;
}
static int AddConnection(edg_wll_Context ctx, char *name, int port)
{
- int index = ctx->connOpened;
-
- free(ctx->connPool[index].peerName); // should be empty; just to be sure
- ctx->connPool[index].peerName = strdup(ctx->srvName);
- ctx->connPool[index].peerPort = ctx->srvPort;
- ctx->connOpened++;
+ int i,index = -1;
+
+ for (i = 0; i < ctx->connections->poolSize; i++) {
+ if (ctx->connections->connPool[i].peerName == NULL) {
+ if (!edg_wll_connectionTryLock(ctx, i)) {
+ index = i; // This connection is free and was not locked. We may lock it and use it.
+ break;
+ }
+ }
+ }
+
+ if (index < 0) return -1;
+
+ free(ctx->connections->connPool[index].peerName); // should be empty; just to be sure
+ ctx->connections->connPool[index].peerName = strdup(ctx->srvName);
+ ctx->connections->connPool[index].peerPort = ctx->srvPort;
+ ctx->connections->connOpened++;
return index;
}
-static void ReleaseConnection(edg_wll_Context ctx, char *name, int port)
+static int ReleaseConnection(edg_wll_Context ctx, char *name, int port)
{
- int i, index = 0;
+ int i, index = 0, foundConnToDrop = 0;
long min;
- if (ctx->connOpened == 0) return; /* nothing to release */
+ edg_wll_ResetError(ctx);
+ if (ctx->connections->connOpened == 0) return 0; /* nothing to release */
if (name) {
if ((index = ConnectionIndex(ctx, name, port)) >= 0)
- CloseConnection(ctx, index);
+ CloseConnection(ctx, &index);
}
- else { /* free the oldest connection*/
- min = ctx->connPool[0].lastUsed.tv_sec;
- for (i=0; i<ctx->connOpened; i++) {
- if (ctx->connPool[i].lastUsed.tv_sec < min) {
- min = ctx->connPool[i].lastUsed.tv_sec;
- index = i;
+ else { /* free the oldest (unlocked) connection */
+ for (i=0; i<ctx->connections->poolSize; i++) {
+ assert(ctx->connections->connPool[i].peerName); // Full pool expected - accept non-NULL values only
+ if (!edg_wll_connectionTryLock(ctx, i)) {
+ edg_wll_connectionUnlock(ctx, i); // Connection unlocked. Consider releasing it
+ if (foundConnToDrop) { // This is not the first unlocked connection
+ if (ctx->connections->connPool[i].lastUsed.tv_sec < min) {
+ min = ctx->connections->connPool[i].lastUsed.tv_sec;
+ index = i;
+ foundConnToDrop++;
+ }
+ }
+ else { // This is the first unlocked connection we have found.
+ foundConnToDrop++;
+ index = i;
+ min = ctx->connections->connPool[i].lastUsed.tv_sec;
+ }
}
}
- CloseConnection(ctx, index);
+ if (!foundConnToDrop) return edg_wll_SetError(ctx,EAGAIN,"all connections in the connection pool are locked");
+ CloseConnection(ctx, &index);
}
+ return edg_wll_Error(ctx,NULL,NULL);
}
-int edg_wll_close(edg_wll_Context ctx)
+int edg_wll_close(edg_wll_Context ctx, int* connToUse)
{
edg_wll_ResetError(ctx);
- if (ctx->connToUse == -1) return 0;
+ if (*connToUse == -1) return 0;
- CloseConnection(ctx, ctx->connToUse);
+ CloseConnection(ctx, connToUse);
+
+ edg_wll_connectionUnlock(ctx, *connToUse); /* Forgetting the conn. Unlocking is safe. */
- ctx->connToUse = -1;
+ *connToUse = -1;
return edg_wll_Error(ctx,NULL,NULL);
}
-int edg_wll_open(edg_wll_Context ctx)
+int edg_wll_open(edg_wll_Context ctx, int* connToUse)
{
int index;
edg_wll_GssStatus gss_stat;
edg_wll_ResetError(ctx);
+ edg_wll_poolLock(); /* We are going to search the pool, it has better be locked */
+
if ( (index = ConnectionIndex(ctx, ctx->srvName, ctx->srvPort)) == -1 ) {
/* no such open connection in pool */
- if (ctx->connOpened == ctx->poolSize)
- ReleaseConnection(ctx, NULL, 0);
+ if (ctx->connections->connOpened == ctx->connections->poolSize)
+ if(ReleaseConnection(ctx, NULL, 0)) goto end;
index = AddConnection(ctx, ctx->srvName, ctx->srvPort);
+ if (index < 0) {
+ edg_wll_SetError(ctx,EAGAIN,"connection pool size exceeded");
+ goto end;
+ }
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Connection to %s:%d opened as No. %d in the pool\n",ctx->srvName,ctx->srvPort,index);
+ #endif
}
/* else - there is cached open connection, reuse it */
-
- ctx->connToUse = index;
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ else printf("Connection to %s:%d exists (No. %d in the pool) - reusing\n",ctx->srvName,ctx->srvPort,index);
+ #endif
+
+ *connToUse = index;
/* XXX support anonymous connections, perhaps add a flag to the connPool
* struct specifying whether or not this connection shall be authenticated
* to prevent from repeated calls to edg_wll_gss_acquire_cred_gsi() */
- if (!ctx->connPool[index].gsiCred &&
+ if (!ctx->connections->connPool[index].gsiCred &&
edg_wll_gss_acquire_cred_gsi(
ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_cert_filename,
ctx->p_proxy_filename ? ctx->p_proxy_filename : ctx->p_key_filename,
- &ctx->connPool[index].gsiCred, NULL, &gss_stat)) {
+ &ctx->connections->connPool[index].gsiCred, NULL, &gss_stat)) {
edg_wll_SetErrorGss(ctx, "failed to load GSI credentials", &gss_stat);
goto err;
}
- if (ctx->connPool[index].gss.context == GSS_C_NO_CONTEXT) {
- switch (edg_wll_gss_connect(ctx->connPool[index].gsiCred,
- ctx->connPool[index].peerName, ctx->connPool[index].peerPort,
- &ctx->p_tmp_timeout,&ctx->connPool[index].gss,
+ if (ctx->connections->connPool[index].gss.context == GSS_C_NO_CONTEXT) {
+ switch (edg_wll_gss_connect(ctx->connections->connPool[index].gsiCred,
+ ctx->connections->connPool[index].peerName, ctx->connections->connPool[index].peerPort,
+ &ctx->p_tmp_timeout,&ctx->connections->connPool[index].gss,
&gss_stat)) {
case EDG_WLL_GSS_OK:
err:
/* some error occured; close created connection
* and free all fields in connPool[index] */
- CloseConnection(ctx, index);
- ctx->connToUse = -1;
+ if (index >= 0) CloseConnection(ctx, &index);
+ *connToUse = -1;
ok:
+
+ if (*connToUse>-1) edg_wll_connectionTryLock(ctx, *connToUse); /* Just to be sure we have not forgotten to lock it */
+
+end:
+
+ edg_wll_poolUnlock(); /* One way or the other, there are no more pool-wide operations */
+
+// xxxxx
+
return edg_wll_Error(ctx,NULL,NULL);
}
{
int ec;
char *ed = NULL;
+ int connToUse = -1; //Index of the connection to use. Used to be a context member.
- if (edg_wll_open(ctx)) return edg_wll_Error(ctx,NULL,NULL);
-
- switch (edg_wll_http_send(ctx,request,req_head,req_body)) {
+ if (edg_wll_open(ctx,&connToUse)) return edg_wll_Error(ctx,NULL,NULL);
+
+ switch (edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse])) {
case ENOTCONN:
- edg_wll_close(ctx);
- if (edg_wll_open(ctx)
- || edg_wll_http_send(ctx,request,req_head,req_body))
+ edg_wll_close(ctx,&connToUse);
+ if (edg_wll_open(ctx,&connToUse)
+ || edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse]))
goto err;
/* fallthrough */
case 0: break;
default: goto err;
}
- switch (edg_wll_http_recv(ctx,response,resp_head,resp_body)) {
+ switch (edg_wll_http_recv(ctx,response,resp_head,resp_body,&ctx->connections->connPool[connToUse])) {
case ENOTCONN:
- edg_wll_close(ctx);
- if (edg_wll_open(ctx)
- || edg_wll_http_send(ctx,request,req_head,req_body)
- || edg_wll_http_recv(ctx,response,resp_head,resp_body))
+ edg_wll_close(ctx,&connToUse);
+ if (edg_wll_open(ctx,&connToUse)
+ || edg_wll_http_send(ctx,request,req_head,req_body,&ctx->connections->connPool[connToUse])
+ || edg_wll_http_recv(ctx,response,resp_head,resp_body,&ctx->connections->connPool[connToUse]))
goto err;
/* fallthrough */
case 0: break;
default: goto err;
}
- assert(ctx->connToUse >= 0);
- gettimeofday(&ctx->connPool[ctx->connToUse].lastUsed, NULL);
+ assert(connToUse >= 0);
+ gettimeofday(&ctx->connections->connPool[connToUse].lastUsed, NULL);
+
+ edg_wll_connectionUnlock(ctx, connToUse);
+
return 0;
err:
ec = edg_wll_Error(ctx,NULL,&ed);
- edg_wll_close(ctx);
+ edg_wll_close(ctx,&connToUse);
edg_wll_SetError(ctx,ec,ed);
free(ed);
return ec;
#ident "$Header$"
-int edg_wll_close(edg_wll_Context ctx);
-int edg_wll_open(edg_wll_Context ctx);
+int edg_wll_close(edg_wll_Context ctx,int *);
+int edg_wll_open(edg_wll_Context ctx,int *);
int edg_wll_http_send_recv(edg_wll_Context, char *, const char * const *, char *, char **, char ***, char **);
int edg_wll_close_proxy(edg_wll_Context ctx);
OBJS:=${JOBID_OBJS} ${PERF_OBJS} lb_plain_io.o escape.o events.o mini_http.o query_rec.o \
status.o xml_conversions.o xml_parse.o ulm_parse.o param.o \
events_parse.o il_string.o il_int.o notifid.o \
- il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o
+ il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o \
+ connpool.o
LOBJS:=${OBJS:.o=.lo}
TRIO_OBJS:=escape.o trio.o strio.o
HDRS:=context-int.h lb_plain_io.h mini_http.h authz.h xml_parse.h \
xml_conversions.h log_proto.h events_parse.h il_string.h il_msg.h \
- escape.h ulm_parse.h trio.h lb_maildir.h ${PERF_HDRS}
+ escape.h ulm_parse.h trio.h lb_maildir.h connpool.h ${PERF_HDRS}
STATICLIB:=libglite_lb_common_${nothrflavour}.a
THRSTATICLIB:=libglite_lb_common_${thrflavour}.a
${CXX} -c ${CFLAGS} ${TEST_INC} $<
%.thr.o: %.c
- ${COMPILE} ${GLOBUSTHRINC} ${CFLAGS} -o $@ -c $<
+ ${COMPILE} ${GLOBUSTHRINC} ${CFLAGS} -D_REENTRANT -DGLITE_LB_THREADED -o $@ -c $<
%.h: %.h.T
rm -f $@
--- /dev/null
+#include "glite/security/glite_gss.h"
+#include "glite/lb/consumer.h"
+#include "lb_plain_io.h"
+#include "authz.h"
+#include "glite/lb/log_proto.h"
+#ifdef GLITE_LB_THREADED
+ #include <pthread.h>
+#endif
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#ifndef EDG_WLL_CONNPOOL_DECLARED
+#define EDG_WLL_CONNPOOL_DECLARED 1
+
+#define GLITE_LB_COMMON_CONNPOOL_SIZE 50
+
+
+typedef struct _edg_wll_ConnPool {
+/* address and port where we are connected to */
+ char *peerName;
+ unsigned int peerPort;
+
+/* http(s) stream */
+ gss_cred_id_t gsiCred;
+ edg_wll_GssConnection gss;
+ char *buf;
+ int bufUse,bufSize;
+
+/* timestamp of usage of this entry in ctx.connPool */
+ struct timeval lastUsed;
+} edg_wll_ConnPool;
+#endif
+
+
+#ifndef EDG_WLL_CONNECIONS_DECLARED
+#define EDG_WLL_CONNECIONS_DECLARED 1
+typedef struct _edg_wll_Connections {
+
+/* connection pool array */
+ edg_wll_ConnPool *connPool;
+ edg_wll_ConnPool *serverConnection; /* this is in fact a single-item array intended for use
+ by the server (since the server only uses one connection)*/
+
+/* pool of connections from client */
+ int poolSize; /* number of connections in the pool */
+ int connOpened; /* number of opened connections */
+
+/* Connection pool locks & accessories.*/
+#ifdef GLITE_LB_THREADED
+ pthread_mutex_t poolLock; /* Global pool lock (to be used for pool-wide operations carried out locally) */
+ pthread_mutex_t *connectionLock; /* Per-connection lock (used to lock out connections that are in use) */
+#endif
+ edg_wll_Context *locked_by; /* identifies contexts that have been used to lock a connection since they
+ do probably have a connToUse value stored in them*/
+
+} edg_wll_Connections;
+#endif
+
+
+/* Connections Global Handle */
+extern edg_wll_Connections connectionsHandle;
+
+/** ** Locking Functions (wrappers) **/
+/* Introduced to simplify debugging and switching betwen single/multi-threaded behavior */
+
+/** Lock (try) the pool */
+int edg_wll_poolTryLock();
+
+/** Lock the pool (blocking) */
+int edg_wll_poolLock();
+
+/** Unlock the pool */
+int edg_wll_poolUnlock();
+
+
+/** Lock (try) a single connection */
+int edg_wll_connectionTryLock(edg_wll_Context ctx, int index);
+
+/** Lock a single connection (blocking) */
+int edg_wll_connectionLock(edg_wll_Context ctx, int index);
+
+/** Unlock a connection */
+int edg_wll_connectionUnlock(edg_wll_Context ctx, int index);
+
+
+/** Free memory used by the pool and lock array */
+void edg_wll_poolFree();
+
+/** Allocate memory for the edg_wll_Connections structure and its properties and return a pointer.
+ in case memory has been already allocated, just return a pointer */
+edg_wll_Connections* edg_wll_initConnections();
+
+#ifdef __cplusplus
+}
+#endif
+
#include "glite/lb/consumer.h"
#include "lb_plain_io.h"
#include "authz.h"
+#include "connpool.h"
#ifdef __cplusplus
extern "C" {
-typedef struct _edg_wll_ConnPool {
-/* address and port where we are connected to */
- char *peerName;
- unsigned int peerPort;
-
-/* http(s) stream */
- gss_cred_id_t gsiCred;
- edg_wll_GssConnection gss;
- char *buf;
- int bufUse,bufSize;
-
-/* timestamp of usage of this entry in ctx.connPool */
- struct timeval lastUsed;
-} edg_wll_ConnPool;
-
-
-
struct _edg_wll_Context {
/* Error handling */
int errCode; /* recent error code */
/* server part */
void *mysql;
- edg_wll_ConnPool *connPool;
+ edg_wll_Connections *connections;
edg_wll_ConnPool *connPoolNotif; /* hold _one_ connection from notif-interlogger */
edg_wll_ConnProxy *connProxy; /* holds one plain connection */
char *srvName;
unsigned int srvPort;
-/* pool of connections from client */
- int poolSize;
- int connOpened; /* number of opened connections */
- int connToUse; /* index of connection that will *
- * be used by low-level f-cions */
- // XXX similar variables will be needed for connPoolNotif
-
-
/* other client stuff */
int notifSock; /* default client socket *
* for receiving notifications */
/**
* default maximal number of simultaneously open connections from one client
*/
-#define EDG_WLL_LOG_CONNECTIONS_DEFAULT 4
+// XXX: not used? #define EDG_WLL_LOG_CONNECTIONS_DEFAULT 4
#ifdef __cplusplus
#ident "$Header$"
#include "glite/lb/consumer.h"
+#include "connpool.h"
/* XXX: not a good place for the folowing #def's but we ain't got better currently */
/** protocol version */
extern edg_wll_ErrorCode edg_wll_http_recv(
edg_wll_Context, /* INOUT: context */
- char **, /* OUT: first line */
- char ***, /* OUT: null terminated array of headers */
- char ** /* OUT: message body */
+ char **, /* OUT: first line */
+ char ***, /* OUT: null terminated array of headers */
+ char **, /* OUT: message body */
+ edg_wll_ConnPool *connPTR /* IN: Pointer to the connection to use */
);
extern edg_wll_ErrorCode edg_wll_http_send(
edg_wll_Context, /* INOUT: context */
const char *, /* IN: first line */
char const * const *, /* IN: headers */
- const char * /* IN: message body */
+ const char *, /* IN: message body */
+ edg_wll_ConnPool *connPTR /* IN: Pointer to the connection to use */
);
extern edg_wll_ErrorCode edg_wll_http_recv_proxy(
--- /dev/null
+#include "connpool.h"
+
+#ifdef GLITE_LB_THREADED
+edg_wll_Connections connectionsHandle =
+ { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , PTHREAD_MUTEX_INITIALIZER , NULL , NULL};
+#endif
+
+#ifndef GLITE_LB_THREADED
+edg_wll_Connections connectionsHandle =
+ { NULL , NULL , GLITE_LB_COMMON_CONNPOOL_SIZE , 0 , NULL};
+#endif
+
+/** Lock (try) the pool */
+int edg_wll_poolTryLock() {
+int RetVal;
+
+ #ifdef GLITE_LB_THREADED /* Threaded version */
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d trying to lock the pool ",pthread_self());
+ #endif
+ RetVal =
+ pthread_mutex_trylock(
+ &connectionsHandle.poolLock
+ );
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("- result %d\n",RetVal);
+ #endif
+ #endif
+
+
+ #ifndef GLITE_LB_THREADED /* Single-thread version */
+ RetVal = 0;
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Dummy - Trying to lock the pool (pthreads not included)\n");
+ #endif
+ #endif
+
+
+ return(RetVal);
+}
+
+
+/** Lock the pool (blocking) */
+int edg_wll_poolLock() {
+int RetVal;
+
+ #ifdef GLITE_LB_THREADED
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d trying to lock the pool\n",pthread_self());
+ #endif
+
+ RetVal = pthread_mutex_trylock(&connectionsHandle.poolLock);
+
+ if (RetVal) {
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d locking the pool\n",pthread_self());
+ #endif
+ RetVal = pthread_mutex_lock(&connectionsHandle.poolLock);
+ }
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d - lock result %d\n",pthread_self(),RetVal);
+ #endif
+ #endif
+
+ #ifndef GLITE_LB_THREADED
+ RetVal = 0;
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Dummy - Locking the pool (pthreads not included)\n");
+ #endif
+ #endif
+
+
+ return(RetVal);
+}
+
+
+/** Unlock the pool */
+int edg_wll_poolUnlock() {
+int RetVal;
+
+ #ifdef GLITE_LB_THREADED
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d unlocking the pool\n",pthread_self());
+ #endif
+ RetVal = pthread_mutex_unlock(&connectionsHandle.poolLock);
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d - unlock result %d\n",pthread_self(),RetVal);
+ #endif
+
+ #endif
+
+ #ifndef GLITE_LB_THREADED
+ RetVal = 0;
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Dummy - Unlocking the pool (pthreads not included)\n");
+ #endif
+
+ #endif
+
+
+ return(RetVal);
+}
+
+
+/** Lock (try) a single connection */
+int edg_wll_connectionTryLock(edg_wll_Context ctx, int index) {
+int RetVal;
+
+ #ifdef GLITE_LB_THREADED
+ RetVal = pthread_mutex_trylock(&connectionsHandle.connectionLock[index]); /* Try to lock the connection */
+ if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx; /* If lock succeeded, store the
+ locking context address */
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d trying to lock connection No. [%d] - result %d\n",pthread_self(),index,RetVal);
+ #endif
+
+ #endif
+
+ #ifndef GLITE_LB_THREADED
+ RetVal = 0;
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Dummy - Trying to lock connection No. [%d] (pthreads not included)\n",index);
+ #endif
+
+ #endif
+
+
+ return(RetVal);
+}
+
+
+/** Lock a single connection (blocking) */
+int edg_wll_connectionLock(edg_wll_Context ctx, int index) {
+int RetVal;
+
+ #ifdef GLITE_LB_THREADED
+ RetVal = pthread_mutex_lock(&connectionsHandle.connectionLock[index]); /* Lock the connection (wait if
+ not available)*/
+ if (!RetVal) connectionsHandle.locked_by[index] = (void*)ctx; /* If lock succeeded, store the
+ locking context address */
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d locking connection No. [%d] - result %d\n",pthread_self(),index,RetVal);
+ #endif
+
+ #endif
+
+ #ifndef GLITE_LB_THREADED
+ RetVal = 0;
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Dummy - locking connection No. [%d] (pthreads not included)\n",index);
+ #endif
+
+ #endif
+
+
+ return(RetVal);
+}
+
+
+/** Unlock a connection */
+int edg_wll_connectionUnlock(edg_wll_Context ctx, int index) {
+int RetVal;
+
+ #ifdef GLITE_LB_THREADED
+ RetVal = pthread_mutex_unlock(&connectionsHandle.connectionLock[index]);
+ if (!RetVal) connectionsHandle.locked_by[index] = NULL;
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Thread %d unlocking connection No. [%d] - result %d\n",pthread_self(),index,RetVal);
+ #endif
+
+ #endif
+
+ #ifndef GLITE_LB_THREADED
+ RetVal = 0;
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Dummy - unlocking connection No. [%d] (pthreads not included)\n",index);
+ #endif
+
+ #endif
+
+
+ return(RetVal);
+}
+
+
+/** Free all memory used by the pool and lock-related arrays */
+void edg_wll_poolFree() {
+ int i;
+ struct timeval close_timeout = {0, 50000}; /* Declarations taken over from edg_wll_FreeContext() */
+ OM_uint32 min_stat; /* Does not seem to have any use whatsoever - neither
+ here nor in edg_wll_FreeContext() */
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ #ifdef GLITE_LB_THREADED
+ printf("Thread %d ",pthread_self());
+ #endif
+ printf("Entering edg_wll_poolFree\n");
+ #endif
+
+
+ for (i=0; i<connectionsHandle.poolSize; i++) {
+ if (connectionsHandle.connPool[i].peerName) free(connectionsHandle.connPool[i].peerName);
+ edg_wll_gss_close(&connectionsHandle.connPool[i].gss,&close_timeout);
+ if (connectionsHandle.connPool[i].gsiCred)
+ gss_release_cred(&min_stat, &connectionsHandle.connPool[i].gsiCred);
+ if (connectionsHandle.connPool[i].buf) free(connectionsHandle.connPool[i].buf);
+ }
+
+ edg_wll_poolLock();
+#ifdef GLITE_LB_THREADED
+ free(connectionsHandle.connectionLock);
+ connectionsHandle.connectionLock = NULL;
+#endif
+ free(connectionsHandle.serverConnection);
+ free(connectionsHandle.connPool);
+ free(connectionsHandle.locked_by);
+ connectionsHandle.serverConnection = NULL;
+ connectionsHandle.connPool = NULL;
+ connectionsHandle.locked_by = NULL;
+
+
+}
+
+/** Allocate memory for arrays within the edg_wll_Connections structure */
+edg_wll_Connections* edg_wll_initConnections() {
+
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ #ifdef GLITE_LB_THREADED
+ printf("Thread %d ",pthread_self());
+ #endif
+ printf("Entering edg_wll_initConnections\n");
+ #endif
+
+ if((connectionsHandle.connPool == NULL) &&
+ (connectionsHandle.poolSize > 0)) { /* We need to allocate memory for the connPool and connectionLock arrays */
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ #ifdef GLITE_LB_THREADED
+ printf("Thread %d ",pthread_self());
+ #endif
+ printf("Initializng connections AND THE CONNECTION POOL\n");
+ #endif
+
+
+ connectionsHandle.connPool = (edg_wll_ConnPool *) calloc(connectionsHandle.poolSize, sizeof(edg_wll_ConnPool));
+
+ #ifdef GLITE_LB_THREADED
+ connectionsHandle.connectionLock = (pthread_mutex_t *) calloc(connectionsHandle.poolSize, sizeof(pthread_mutex_t));
+ #endif
+
+ connectionsHandle.locked_by = (edg_wll_Context) calloc(connectionsHandle.poolSize, sizeof(edg_wll_Context));
+
+ }
+ if(connectionsHandle.serverConnection == NULL) {
+ connectionsHandle.serverConnection = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool));
+ }
+
+
+ return (&connectionsHandle);
+}
+
+
/* XXX */
for (i=0; i<EDG_WLL_PARAM__LAST; i++) edg_wll_SetParam(out,i,NULL);
- out->connPool = (edg_wll_ConnPool *) calloc(out->poolSize, sizeof(edg_wll_ConnPool));
+ out->connections = edg_wll_initConnections();
+// out->connections->connPool = (edg_wll_ConnPool *) calloc(out->connections->poolSize, sizeof(edg_wll_ConnPool));
out->connPoolNotif = (edg_wll_ConnPool *) calloc(1, sizeof(edg_wll_ConnPool));
out->connProxy = (edg_wll_ConnProxy *) calloc(1, sizeof(edg_wll_ConnProxy));
out->connProxy->conn.sock = -1;
- out->connToUse = -1;
+// out->connToUse = -1;
*ctx = out;
}
#endif
if (ctx->errDesc) free(ctx->errDesc);
- if (ctx->connPool) {
+ if (ctx->connections->connPool) {
int i;
-
- for (i=0; i<ctx->poolSize; i++) {
- if (ctx->connPool[i].peerName) free(ctx->connPool[i].peerName);
- edg_wll_gss_close(&ctx->connPool[i].gss,&close_timeout);
- if (ctx->connPool[i].gsiCred)
- gss_release_cred(&min_stat, &ctx->connPool[i].gsiCred);
- if (ctx->connPool[i].buf) free(ctx->connPool[i].buf);
+
+#ifdef GLITE_LB_THREADED
+ /* Since the introduction of a shared connection pool, the pool
+ cannot be freed here. We only need to unlock connections that
+ may have been locked using this context. */
+
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Running edg_wll_FreeContext - checking for connections locked by the current context.\n");
+ #endif
+
+ edg_wll_poolLock();
+
+ for (i=0; i<ctx->connections->poolSize; i++) {
+
+ if (ctx->connections->locked_by[i]==ctx) {
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Unlocking connection No. %d...",i);
+ #endif
+ edg_wll_connectionUnlock(ctx, i);
+
+ }
+ }
+
+ edg_wll_poolUnlock();
+#endif
+
+/*
+ for (i=0; i<ctx->connections->poolSize; i++) {
+ if (ctx->connections->connPool[i].peerName) free(ctx->connections->connPool[i].peerName);
+ edg_wll_gss_close(&ctx->connections->connPool[i].gss,&close_timeout);
+ if (ctx->connections->connPool[i].gsiCred)
+ gss_release_cred(&min_stat, &ctx->connections->connPool[i].gsiCred);
+ if (ctx->connections->connPool[i].buf) free(ctx->connections->connPool[i].buf);
}
- free(ctx->connPool);
+ free(ctx->connections->connPool);*/
}
if (ctx->connPoolNotif) {
if (ctx->connPoolNotif[0].peerName) free(ctx->connPoolNotif[0].peerName);
#define min(x,y) ((x) < (y) ? (x) : (y))
#define CONTENT_LENGTH "Content-Length:"
-edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut)
+edg_wll_ErrorCode edg_wll_http_recv(edg_wll_Context ctx,char **firstOut,char ***hdrOut,char **bodyOut, edg_wll_ConnPool *connPTR)
{
char **hdr = NULL,*first = NULL,*body = NULL;
enum { FIRST, HEAD, BODY, DONE } pstat = FIRST;
edg_wll_GssStatus gss_code;
#define bshift(shift) {\
- memmove(ctx->connPool[ctx->connToUse].buf,ctx->connPool[ctx->connToUse].buf+(shift),ctx->connPool[ctx->connToUse].bufUse-(shift));\
- ctx->connPool[ctx->connToUse].bufUse -= (shift);\
+ memmove(connPTR->buf,connPTR->buf+(shift),connPTR->bufUse-(shift));\
+ connPTR->bufUse -= (shift);\
}
edg_wll_ResetError(ctx);
- if (ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT)
- sock = ctx->connPool[ctx->connToUse].gss.sock;
+ if (connPTR->gss.context != GSS_C_NO_CONTEXT)
+ sock = connPTR->gss.sock;
else {
edg_wll_SetError(ctx,ENOTCONN,NULL);
goto error;
}
- if (!ctx->connPool[ctx->connToUse].buf) ctx->connPool[ctx->connToUse].buf = malloc(ctx->connPool[ctx->connToUse].bufSize = BUFSIZ);
+ if (!connPTR->buf) connPTR->buf = malloc(connPTR->bufSize = BUFSIZ);
do {
- len = edg_wll_gss_read(&ctx->connPool[ctx->connToUse].gss,
- ctx->connPool[ctx->connToUse].buf+ctx->connPool[ctx->connToUse].bufUse,ctx->connPool[ctx->connToUse].bufSize-ctx->connPool[ctx->connToUse].bufUse,&ctx->p_tmp_timeout, &gss_code);
+ len = edg_wll_gss_read(&connPTR->gss,
+ connPTR->buf+connPTR->bufUse,connPTR->bufSize-connPTR->bufUse,&ctx->p_tmp_timeout, &gss_code);
switch (len) {
case EDG_WLL_GSS_OK:
}
- ctx->connPool[ctx->connToUse].bufUse += len;
+ connPTR->bufUse += len;
rdmore = 0;
while (!rdmore && pstat != DONE) switch (pstat) {
char *cr;
case FIRST:
- if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) &&
- ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n')
+ if ((cr = memchr(connPTR->buf,'\r',connPTR->bufUse)) &&
+ connPTR->bufUse >= cr-connPTR->buf+2 && cr[1] == '\n')
{
*cr = 0;
- first = strdup(ctx->connPool[ctx->connToUse].buf);
- bshift(cr-ctx->connPool[ctx->connToUse].buf+2);
+ first = strdup(connPTR->buf);
+ bshift(cr-connPTR->buf+2);
pstat = HEAD;
} else rdmore = 1;
break;
case HEAD:
- if ((cr = memchr(ctx->connPool[ctx->connToUse].buf,'\r',ctx->connPool[ctx->connToUse].bufUse)) &&
- ctx->connPool[ctx->connToUse].bufUse >= cr-ctx->connPool[ctx->connToUse].buf+2 && cr[1] == '\n')
+ if ((cr = memchr(connPTR->buf,'\r',connPTR->bufUse)) &&
+ connPTR->bufUse >= cr-connPTR->buf+2 && cr[1] == '\n')
{
- if (cr == ctx->connPool[ctx->connToUse].buf) {
+ if (cr == connPTR->buf) {
bshift(2);
pstat = clen ? BODY : DONE;
if (clen) body = malloc(clen+1);
*cr = 0;
hdr = realloc(hdr,(nhdr+2) * sizeof(*hdr));
- hdr[nhdr] = strdup(ctx->connPool[ctx->connToUse].buf);
+ hdr[nhdr] = strdup(connPTR->buf);
hdr[++nhdr] = NULL;
- if (!strncasecmp(ctx->connPool[ctx->connToUse].buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1))
- clen = atoi(ctx->connPool[ctx->connToUse].buf+sizeof(CONTENT_LENGTH)-1);
+ if (!strncasecmp(connPTR->buf,CONTENT_LENGTH,sizeof(CONTENT_LENGTH)-1))
+ clen = atoi(connPTR->buf+sizeof(CONTENT_LENGTH)-1);
- bshift(cr-ctx->connPool[ctx->connToUse].buf+2);
+ bshift(cr-connPTR->buf+2);
} else rdmore = 1;
break;
case BODY:
- if (ctx->connPool[ctx->connToUse].bufUse) {
- int m = min(ctx->connPool[ctx->connToUse].bufUse,clen-blen);
- memcpy(body+blen,ctx->connPool[ctx->connToUse].buf,m);
+ if (connPTR->bufUse) {
+ int m = min(connPTR->bufUse,clen-blen);
+ memcpy(body+blen,connPTR->buf,m);
blen += m;
bshift(m);
}
}
}
-edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const char * const *head,const char *body)
+edg_wll_ErrorCode edg_wll_http_send(edg_wll_Context ctx,const char *first,const char * const *head,const char *body, edg_wll_ConnPool *connPTR)
{
const char* const *h;
int len = 0, blen;
edg_wll_ResetError(ctx);
- if (ctx->connPool[ctx->connToUse].gss.context == GSS_C_NO_CONTEXT)
+ if (connPTR->gss.context == GSS_C_NO_CONTEXT)
return edg_wll_SetError(ctx,ENOTCONN,NULL);
- if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,first,strlen(first)) < 0 ||
- real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0)
+ if (real_write(ctx,&connPTR->gss,first,strlen(first)) < 0 ||
+ real_write(ctx,&connPTR->gss,"\r\n",2) < 0)
return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
if (head) for (h=head; *h; h++)
- if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,*h,strlen(*h)) < 0 ||
- real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0)
+ if (real_write(ctx,&connPTR->gss,*h,strlen(*h)) < 0 ||
+ real_write(ctx,&connPTR->gss,"\r\n",2) < 0)
return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
if (body) {
len = strlen(body);
blen = sprintf(buf,CONTENT_LENGTH " %d\r\n",len);
- if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,buf,blen) < 0)
+ if (real_write(ctx,&connPTR->gss,buf,blen) < 0)
return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
}
- if (real_write(ctx,&ctx->connPool[ctx->connToUse].gss,"\r\n",2) < 0)
+ if (real_write(ctx,&connPTR->gss,"\r\n",2) < 0)
return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
- if (body && real_write(ctx,&ctx->connPool[ctx->connToUse].gss,body,len) < 0)
+ if (body && real_write(ctx,&connPTR->gss,body,len) < 0)
return edg_wll_SetError(ctx,errno,"edg_wll_http_send()");
return edg_wll_Error(ctx,NULL,NULL);
"%sQUERY_JOBS_LIMIT",
"%sQUERY_EVENTS_LIMIT",
"%sQUERY_RESULTS",
- "%sQUERY_CONNECTIONS",
+ "%sCONNPOOL_SIZE",
"%sNOTIF_SERVER",
"%sNOTIF_SERVER",
"%sNOTIF_TIMEOUT",
return edg_wll_SetError(ctx,EINVAL,"can't parse query result parameter name");
}
break;
- case EDG_WLL_PARAM_QUERY_CONNECTIONS:
+ case EDG_WLL_PARAM_CONNPOOL_SIZE:
{
char *s = mygetenv(param);
if (!val && s) val = atoi(s);
- ctx->poolSize = val ? val : EDG_WLL_LOG_CONNECTIONS_DEFAULT;
+ connectionsHandle.poolSize = val ? val : GLITE_LB_COMMON_CONNPOOL_SIZE;
}
break;
case EDG_WLL_PARAM_SOURCE:
case EDG_WLL_PARAM_QUERY_JOBS_LIMIT:
case EDG_WLL_PARAM_QUERY_EVENTS_LIMIT:
case EDG_WLL_PARAM_QUERY_RESULTS:
- case EDG_WLL_PARAM_QUERY_CONNECTIONS:
+ case EDG_WLL_PARAM_CONNPOOL_SIZE:
case EDG_WLL_PARAM_SOURCE:
return edg_wll_SetParamInt(ctx,param,va_arg(ap,int));
case EDG_WLL_PARAM_HOST:
p_int = va_arg(ap, int *);
*p_int = ctx->p_query_results;
break;
- case EDG_WLL_PARAM_QUERY_CONNECTIONS:
+ case EDG_WLL_PARAM_CONNPOOL_SIZE:
p_int = va_arg(ap, int *);
- *p_int = ctx->poolSize;
+ *p_int = connectionsHandle.poolSize;
break;
case EDG_WLL_PARAM_SOURCE:
p_int = va_arg(ap, int *);
{
kick_client = KICK_HANDLER;
} else {
- req_cnt++;
first_request = 0;
to = set_request_to;
if ((rv = services[srv].on_request_hnd(conn,to.tv_sec>=0 ? &to : NULL,clnt_data)) == ENOTCONN) {
ctx->p_tmp_timeout.tv_sec = timeout->tv_sec;
ctx->p_tmp_timeout.tv_usec = timeout->tv_usec;
- ctx->poolSize = 1;
- ctx->connPool = calloc(1, sizeof(edg_wll_ConnPool));
- ctx->connToUse = 0;
+ edg_wll_initConnections();
alen = sizeof(a);
getpeername(conn, (struct sockaddr *)&a, &alen);
- ctx->connPool[ctx->connToUse].peerName = strdup(inet_ntoa(a.sin_addr));
- ctx->connPool[ctx->connToUse].peerPort = ntohs(a.sin_port);
+ ctx->connections->serverConnection->peerName = strdup(inet_ntoa(a.sin_addr));
+ ctx->connections->serverConnection->peerPort = ntohs(a.sin_port);
ctx->count_statistics = count_statistics;
ctx->serverIdentity = strdup(server_subject);
case NETDB_SUCCESS:
if (name) dprintf(("[%d] connection from %s:%d (%s)\n",
getpid(), inet_ntoa(a.sin_addr), ntohs(a.sin_port), name));
- free(ctx->connPool[ctx->connToUse].peerName);
- ctx->connPool[ctx->connToUse].peerName = name;
+ free(ctx->connections->serverConnection->peerName);
+ ctx->connections->serverConnection->peerName = name;
name = NULL;
break;
ctx->srvPort = ntohs(a.sin_port);
}
- if ( (ret = edg_wll_gss_accept(mycred, conn, timeout, &ctx->connPool[ctx->connToUse].gss, &gss_code)) )
+ if ( (ret = edg_wll_gss_accept(mycred, conn, timeout, &ctx->connections->serverConnection->gss, &gss_code)) )
{
if ( ret == EDG_WLL_GSS_ERROR_TIMEOUT )
{
return 1;
}
- maj_stat = gss_inquire_context(&min_stat, ctx->connPool[ctx->connToUse].gss.context,
+ maj_stat = gss_inquire_context(&min_stat, ctx->connections->serverConnection->gss.context,
&client_name, NULL, NULL, NULL, NULL, NULL, NULL);
if ( !GSS_ERROR(maj_stat) )
maj_stat = gss_display_name(&min_stat, client_name, &token, NULL);
if ( token.value )
gss_release_buffer(&min_stat, &token);
- if ( edg_wll_SetVomsGroups(ctx, &ctx->connPool[ctx->connToUse].gss, server_cert, server_key, vomsdir, cadir) )
+ if ( edg_wll_SetVomsGroups(ctx, &ctx->connections->serverConnection->gss, server_cert, server_key, vomsdir, cadir) )
{
char *errt, *errd;
soap_done(soap);
goto err;
}
- gsplugin_ctx->connection = &cdata->ctx->connPool[cdata->ctx->connToUse].gss;
+ gsplugin_ctx->connection = &cdata->ctx->connections->serverConnection->gss;
gsplugin_ctx->cred = mycred;
cdata->soap = soap;
edg_wll_Context ctx = ((struct clnt_data_t *) cdata)->ctx;
- if ( ctx->connPool[ctx->connToUse].gss.context != GSS_C_NO_CONTEXT)
- edg_wll_gss_close(&ctx->connPool[ctx->connToUse].gss, timeout);
+ if ( ctx->connections->serverConnection->gss.context != GSS_C_NO_CONTEXT)
+ edg_wll_gss_close(&ctx->connections->serverConnection->gss, timeout);
edg_wll_FreeContext(ctx);
ctx = NULL;
if ( ctx->isProxy ) err = edg_wll_http_recv_proxy(ctx,&req,&hdr,&body);
- else err = edg_wll_http_recv(ctx,&req,&hdr,&body);
+ else err = edg_wll_http_recv(ctx,&req,&hdr,&body,ctx->connections->serverConnection);
dprintf(("[%d] %s\n",getpid(),req));
if (body) dprintf(("\n%s\n\n",body));
if ( ctx->isProxy )
edg_wll_http_send_proxy(ctx,resp,(char const * const *)hdrOut,bodyOut);
else
- edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut);
+ edg_wll_http_send(ctx,resp,(char const * const *)hdrOut,bodyOut,ctx->connections->serverConnection);
}
}
goto cleanup;
}
if ( !strncmp(address_override, "0.0.0.0", aux-address_override) )
- trio_asprintf(&addr_s, "%s:%s", ctx->connPool[ctx->connToUse].peerName, aux+1);
+ trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
}
/* Format DB insert statement
goto cleanup;
}
if ( !strncmp(address_override, "0.0.0.0", aux-address_override) )
- trio_asprintf(&addr_s, "%s:%s", ctx->connPool[ctx->connToUse].peerName, aux+1);
+ trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
}
asprintf(&response, "HTTP/1.1 %d %s", ret, edg_wll_HTTPErrorMessage(ret));
- edg_wll_http_send(ctx, response, resp_headers, message);
+ edg_wll_http_send(ctx, response, resp_headers, message,ctx->connections->serverConnection);
return edg_wll_Error(ctx,NULL,NULL);
}
int ret, len;
edg_wll_GssStatus gss_code;
- ret = edg_wll_gss_read_full(&tmp_ctx->connPool[tmp_ctx->connToUse].gss,
+ ret = edg_wll_gss_read_full(&tmp_ctx->connections->serverConnection->gss,
buffer, max_len,
&tmp_ctx->p_tmp_timeout,
&len, &gss_code);
free(buf);
if ((len = create_reply(ctx,&buf)) > 0) {
- if ((ret = edg_wll_gss_write_full(&ctx->connPool[ctx->connToUse].gss,buf,len,&ctx->p_tmp_timeout,&total,&gss_code)) < 0)
+ if ((ret = edg_wll_gss_write_full(&ctx->connections->serverConnection->gss,buf,len,&ctx->p_tmp_timeout,&total,&gss_code)) < 0)
edg_wll_SetError(ctx,
ret == EDG_WLL_GSS_ERROR_TIMEOUT ?
ETIMEDOUT : EDG_WLL_ERROR_GSS,