TOOLS:=dump load purge lb_dump_exporter ${LB_PERF_TOOLS}
EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl stresslog
-EXAMPLES_CL=user_jobs job_status user_jobs_threaded
+EXAMPLES_CL=user_jobs job_status
+EXAMPLES_CL_THR=user_jobs_threaded
FAKE_EXAMPLES:=job_log_fake
MAN_GZ:=glite-lb-logevent.1.gz
${EXAMPLES_CL}: %: %.o
${LINK} -o $@ $< ${LIB} ${COMMON_LIB} ${EXT_LIB}
+${EXAMPLES_CL_THR}: %: %.o
+ ${LINK} -o $@ $< ${THRLIB} ${COMMON_LIB_THR} ${EXT_LIB}
+
${FAKE_EXAMPLES}: %: %.o ${FAKELIB}
${LINK} -o $@ $< ${FAKELIB} ${TEST_LIBS} ${EXT_LIB}
compile all: ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ}
endif
-examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS}
+examples: ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS}
fake: ${FAKE_EXAMPLES}
for p in ${TOOLS} ; do \
${INSTALL} -m 755 "$$p" "${PREFIX}/sbin/glite-lb-$$p"; \
done
- for p in ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS} ; do \
+ for p in ${EXAMPLES} ${EXAMPLES_CL} ${EXAMPLES_CL_THR} ${sh_PROGS} ; do \
${INSTALL} -m 755 "$$p" "${PREFIX}/examples/glite-lb-$$p"; \
done
${INSTALL} -m 755 ${top_srcdir}/examples/export.sh "${PREFIX}/examples/glite-lb-export.sh"
#include <expat.h>
+#include <pthread.h>
+
#include "glite/lb/context.h"
#include "glite/lb/xml_conversions.h"
#include "glite/lb/consumer.h"
fprintf(stderr,"usage: %s [-h] [-x] [userid]\n", me);
}
-int main(int argc,char **argv)
-{
+typedef struct {
+ char *owner;
+ int proxy;
+ char *argv_0;} thread_code_args;
+
+void *thread_code(thread_code_args *arguments) {
+
edg_wll_Context ctx;
char *errt,*errd;
edg_wlc_JobId *jobs = NULL;
edg_wll_JobStat *states = NULL;
int i,j;
- char *owner = NULL;
user_jobs = edg_wll_UserJobs;
- for ( i = 1; i < argc; i++ ) {
- if ( !strcmp(argv[i], "-h") || !strcmp(argv[i], "--help") ) {
- usage(argv[0]);
- exit(0);
- } else if ( !strcmp(argv[i], "-x") ) {
- user_jobs = edg_wll_UserJobsProxy;
- continue;
- }
- owner = strdup(argv[i]);
- break;
- }
+ if (arguments->proxy) {
+ user_jobs = edg_wll_UserJobsProxy;
+ }
edg_wll_InitContext(&ctx);
- if ( user_jobs == edg_wll_UserJobsProxy && owner )
- edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, owner);
+ if ( user_jobs == edg_wll_UserJobsProxy && arguments->owner )
+ edg_wll_SetParam(ctx, EDG_WLL_PARAM_LBPROXY_USER, arguments->owner);
if (user_jobs(ctx,&jobs,&states)) goto err;
for (i=0; states[i].state != EDG_WLL_JOB_UNDEF; i++) {
printf("\nFound %d jobs\n",i);
err:
- free(owner);
+ free(arguments->owner);
if (jobs) {
for (i=0; jobs[i]; i++) edg_wlc_JobIdFree(jobs[i]);
free(jobs);
}
if (edg_wll_Error(ctx,&errt,&errd)) {
- fprintf(stderr,"%s: %s (%s)\n",argv[0],errt,errd);
+ fprintf(stderr,"%s: %s (%s)\n",arguments->argv_0,errt,errd);
edg_wll_FreeContext(ctx);
- return 1;
+ pthread_exit(NULL);
}
edg_wll_FreeContext(ctx);
- return 0;
+
+ printf("Thread %d exitting\n",(int)pthread_self());
+
+ pthread_exit(NULL);
+
+ printf("Thread %d out !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n",(int)pthread_self());
+// return 0;
+}
+
+int main(int argc,char **argv)
+{
+
+ #define NUM_THREADS 10
+
+ thread_code_args arguments = { NULL , 0 , NULL };
+ int i,rc,status;
+ pthread_t threads[NUM_THREADS];
+ pthread_attr_t attr;
+
+ pthread_attr_init(&attr);
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
+
+ for ( i = 1; i < argc; i++ ) {
+ if ( !strcmp(argv[i], "-h") || !strcmp(argv[i], "--help") ) {
+ usage(argv[0]);
+ exit(0);
+ }
+ else if ( !strcmp(argv[i], "-x") ) {
+ arguments.proxy = 1;
+ }
+
+ arguments.owner = strdup(argv[i]);
+ }
+
+ arguments.argv_0 = argv[0];
+
+
+ for (i=0;i<NUM_THREADS;i++) {
+ printf("Running thread %d\n",i);
+ rc = pthread_create(&threads[i], &attr, (void*)thread_code, &arguments );
+ if (rc){
+ printf("ERROR; return code from pthread_create() is %d\n", rc);
+ exit(-1);
+ }
+ }
+
+
+
+// thread_code(&arguments);
+
+// pthread_exit(NULL)
+
+ pthread_attr_destroy(&attr);
+
+ for (i=0;i<NUM_THREADS;i++) {
+ printf("*** Joining thread %d.\n",i);
+ rc=pthread_join(threads[i],(void **)&status);
+ printf("*** Thread %d joined. (status %d ret. code %d)\n",i,status,rc);
+
+ }
+
+ printf("Threaded example main() exitting\n");
+
+
+// exit(0);
+// pthread_exit(NULL);
}
OM_uint32 min_stat;
int cIndex;
- cIndex = conn_index[0];
+ cIndex = *conn_index;
assert(ctx->connections->connOpened);
assert(cIndex < ctx->connections->connOpened);
/* if deleted conn was not the last one -> there is a 'hole' and then */
/* 'shake' together connections in pool, no holes are allowed */
- if (cIndex < ctx->connections->connOpened - 1) {
+ /* */
+ /* This principle is unsuitable for multi-threaded applications. Too much waiting for connections */
+ /* to unlock. We need to allow "holes" in the pool. */
+
+/* if (cIndex < ctx->connections->connOpened - 1) {
ctx->connections->connPool[cIndex] = ctx->connections->connPool[ctx->connections->connOpened - 1];
memset(ctx->connections->connPool + ctx->connections->connOpened - 1 , 0, sizeof(edg_wll_ConnPool));
- }
+ }*/
+
ctx->connections->connOpened--;
- conn_index[0] = cIndex;
+ *conn_index = cIndex;
}
{
int i;
- for (i=0; i<ctx->connections->connOpened;i++) {
- if (!strcmp(name, ctx->connections->connPool[i].peerName) &&
+ for (i=0; i<ctx->connections->poolSize;i++) {
+ if ((ctx->connections->connPool[i].peerName != NULL) &&
+ !strcmp(name, ctx->connections->connPool[i].peerName) &&
(port == ctx->connections->connPool[i].peerPort)) {
/* TryLock (next line) is in fact used only
static int AddConnection(edg_wll_Context ctx, char *name, int port)
{
- int index = ctx->connections->connOpened;
-
+ int i,index = -1;
+
+ for (i = 0; i < ctx->connections->poolSize; i++) {
+ if (ctx->connections->connPool[i].peerName == NULL) {
+ if (!edg_wll_connectionTryLock(ctx, i)) {
+ index = i; // This connection is free and was not locked. We may lock it and use it.
+ break;
+ }
+ }
+ }
+
+ if (index < 0) return -1;
+
free(ctx->connections->connPool[index].peerName); // should be empty; just to be sure
ctx->connections->connPool[index].peerName = strdup(ctx->srvName);
ctx->connections->connPool[index].peerPort = ctx->srvPort;
if ((index = ConnectionIndex(ctx, name, port)) >= 0)
CloseConnection(ctx, &index);
}
- else { /* free the oldest connection*/
+ else { /* free the oldest (unlocked) connection */
+ assert(ctx->connections->connPool[0].peerName); // Full pool expected - accept non-NULL values only
min = ctx->connections->connPool[0].lastUsed.tv_sec;
- for (i=0; i<ctx->connections->connOpened; i++) {
+ for (i=0; i<ctx->connections->poolSize; i++) {
+ assert(ctx->connections->connPool[i].peerName); // Full pool expected - accept non-NULL values only
if (ctx->connections->connPool[i].lastUsed.tv_sec < min) {
min = ctx->connections->connPool[i].lastUsed.tv_sec;
index = i;
ReleaseConnection(ctx, NULL, 0);
index = AddConnection(ctx, ctx->srvName, ctx->srvPort);
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ printf("Connection to %s:%d opened as No. %d in the pool\n",ctx->srvName,ctx->srvPort,index);
+ #endif
}
/* else - there is cached open connection, reuse it */
-
+ #ifdef EDG_WLL_CONNPOOL_DEBUG
+ else printf("Connection to %s:%d exists (No. %d in the pool) - reusing\n",ctx->srvName,ctx->srvPort,index);
+ #endif
+
*connToUse = index;
/* XXX support anonymous connections, perhaps add a flag to the connPool
*connToUse = -1;
ok:
- if (*connToUse>-1) edg_wll_connectionLock(ctx, *connToUse); /* Lock the connection */
+ if (*connToUse>-1) edg_wll_connectionTryLock(ctx, *connToUse); /* Just to be sure we have not forgotten to lock it */
edg_wll_poolUnlock(); /* One way or the other, there are no more pool-wide operations */
assert(connToUse >= 0);
gettimeofday(&ctx->connections->connPool[connToUse].lastUsed, NULL);
+ edg_wll_connectionUnlock(ctx, connToUse);
return 0;
err: