merge 1.5
authorAleš Křenek <ljocha@ics.muni.cz>
Wed, 15 Mar 2006 17:59:04 +0000 (17:59 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Wed, 15 Mar 2006 17:59:04 +0000 (17:59 +0000)
19 files changed:
org.glite.lb.client/Makefile
org.glite.lb.client/examples/export.sh [new file with mode: 0644]
org.glite.lb.client/examples/gen_begin
org.glite.lb.client/examples/job_reg.c
org.glite.lb.client/examples/lbmon-db.c [new file with mode: 0644]
org.glite.lb.client/examples/resubmission.l
org.glite.lb.client/examples/shallow_and_full_resub.l
org.glite.lb.client/examples/shallow_resub_complex.l
org.glite.lb.client/examples/shallow_resub_simple.l
org.glite.lb.client/examples/shallow_resub_simple2.l
org.glite.lb.client/examples/stresslog.c [new file with mode: 0644]
org.glite.lb.client/project/build.number
org.glite.lb.client/project/configure.properties.xml
org.glite.lb.client/project/version.properties
org.glite.lb.client/src/connection.c
org.glite.lb.client/src/lb_dump_exporter.c
org.glite.lb.client/src/logevent.c.T
org.glite.lb.client/src/prod_proto.c
org.glite.lb.client/src/producer.c

index 14ed42a..544026d 100644 (file)
@@ -72,19 +72,20 @@ CFLAGS:=${DEBUG} \
        -I${glite_location}/include \
        -I${expat_prefix}/include \
        -I${ares_prefix}/include \
+       -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \
        ${COVERAGE_FLAGS} \
        -D_GNU_SOURCE
 
 
 CXXFLAGS:=${CFLAGS}
 
-GLOBUS_LIBS:=-L${globus_prefix}/lib \
-       -lglobus_common_${nothrflavour} \
-       -lglobus_gssapi_gsi_${nothrflavour} \
-
-GLOBUS_THRLIBS:=-L${globus_prefix}/lib \
-       -lglobus_common_${thrflavour} \
-       -lglobus_gssapi_gsi_${thrflavour}
+ifneq (${mysql_prefix},/usr)
+       ifeq ($(shell echo ${mysql_version} | cut -d. -f1,2),4.1)
+               mysqlib := -L${mysql_prefix}/lib/mysql
+       else
+               mysqlib := -L${mysql_prefix}/lib
+       endif
+endif
 
 ifneq (${expat_prefix},/usr)
        EXPAT_LIBS:=-L${expat_prefix}/lib
@@ -94,8 +95,9 @@ EXPAT_LIBS := ${EXPAT_LIBS} -lexpat
 ARES_LIBS:=-L${ares_prefix}/lib \
        -lares
 
-EXT_LIB:= ${EXPAT_LIBS} \
-    ${ARES_LIBS}
+EXT_LIB:= ${mysqlib} -lmysqlclient -lz\
+       ${EXPAT_LIBS} \
+       ${ARES_LIBS}
 
 TEST_LIBS:=-L${cppunit_prefix}/lib -lcppunit
 TEST_INC:=-I${cppunit_prefix}/include
@@ -140,9 +142,9 @@ PLUSLIB:=libglite_lb_clientpp_${nothrflavour}.la
 THRPLUSLIB:=libglite_lb_clientpp_${thrflavour}.la
 
 TOOLS:=dump load purge lb_dump_exporter
-EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl lbmon
+EXAMPLES:=log_usertag_proxy job_log job_reg feed_shark notify query_ext query_seq_code stats abort_job change_acl lbmon stresslog
 
-EXAMPLES_CL=user_jobs job_status
+EXAMPLES_CL=user_jobs job_status 
 FAKE_EXAMPLES:=job_log_fake
 
 MAN_GZ:=glite-lb-logevent.1.gz
@@ -183,16 +185,19 @@ ${THRPLUSLIB}: ${PLUSTHROBJS}
        ${LINK} ${version_info} -o $@ ${PLUSTHRLOBJS} -rpath ${glite_location}/lib ${THRLIB}
 
 logevent: logevent.o args.o
-       ${LINK} -o $@ logevent.o args.o ${LIB} ${EXT_LIB} ${GLOBUS_LIBS}
+       ${LINK} -o $@ logevent.o args.o ${LIB} ${EXT_LIB} 
+
+lbmon-db: lbmon-db.o
+       ${LINK} -o $@ lbmon-db.o -lmysqlclient -lglite_lb_bkserver ${LIB} ${EXT_LIB} 
 
 ${TOOLS} ${EXAMPLES}: %: %.o
-       ${LINK} -o $@ $< ${LIB} ${EXT_LIB} ${GLOBUS_LIBS}
+       ${LINK} -o $@ $< ${LIB} ${EXT_LIB} 
 
 ${EXAMPLES_CL}: %: %.o
-       ${LINK} -o $@ $< ${LIB} ${COMMON_LIB} ${EXT_LIB} ${GLOBUS_LIBS}
+       ${LINK} -o $@ $< ${LIB} ${COMMON_LIB} ${EXT_LIB} 
 
 ${FAKE_EXAMPLES}: %: %.o ${FAKELIB}
-       ${LINK} -o $@ $< ${FAKELIB} ${TEST_LIBS} ${EXT_LIB} ${GLOBUS_LIBS}
+       ${LINK} -o $@ $< ${FAKELIB} ${TEST_LIBS} ${EXT_LIB} 
 
 ${TOOLS}: ${LIB}
 
@@ -241,7 +246,7 @@ ${MAN_GZ}: ${MAN}
 default: all
 
 
-compile all: check_version ${LIB} ${THRLIB} ${TOOLS} logevent ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ}
+compile all: check_version ${LIB} ${THRLIB} ${TOOLS} logevent lbmon-db ${PLUSLIB} ${THRPLUSLIB} examples ${MAN_GZ}
 
 examples: ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS}
 
@@ -253,7 +258,7 @@ check.producer: producer_test
        ./producer_test
 
 producer_test: producer_test.o prod_proto_test.o
-       ${LINKXX} -o $@ ${LIB} ${TEST_LIBS} $+ ${EXT_LIB} ${GLOBUS_LIBS}
+       ${LINKXX} -o $@ ${LIB} ${TEST_LIBS} $+ ${EXT_LIB}
 
 producer_test.o: %.o: %.cpp
        ${CXX} -c ${CXXFLAGS} ${TEST_INC} ${GLOBUSINC} $<
@@ -287,15 +292,16 @@ install:
        ${INSTALL} -m 644 ${top_srcdir}/LICENSE ${PREFIX}/share/doc/${package}-${version}
        ${INSTALL} -m 644 ${top_srcdir}/doc/README-fake ${top_srcdir}/doc/README-notify ${PREFIX}/share/doc/${package}-${version}
        mkdir -p ${PREFIX}/examples
-       for p in logevent; do \
+       for p in logevent ; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/bin/glite-lb-$$p"; \
        done
        for p in ${TOOLS} ; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/sbin/glite-lb-$$p"; \
        done
-       for p in ${EXAMPLES} ${EXAMPLES_CL} ${sh_PROGS}; do \
+       for p in ${EXAMPLES} lbmon-db ${EXAMPLES_CL} ${sh_PROGS} ; do \
                ${INSTALL} -m 755 "$$p" "${PREFIX}/examples/glite-lb-$$p"; \
        done
+       ${INSTALL} -m 755 ${top_srcdir}/examples/export.sh "${PREFIX}/examples/glite-lb-export.sh"
        ${INSTALL} -m 644 ${MAN_GZ} ${PREFIX}/share/man/man1
 
 clean:
diff --git a/org.glite.lb.client/examples/export.sh b/org.glite.lb.client/examples/export.sh
new file mode 100644 (file)
index 0000000..a4a8dc6
--- /dev/null
@@ -0,0 +1,39 @@
+#! /bin/bash
+
+#
+# script for exporing jobs from bkserver which should be periodically run
+# together with running jp-importer
+#
+# it uses configuration from enviroment ==> may require a configuration wrapper
+#
+
+#autodetect the prefix
+PREFIX=${GLITE_LOCATION:-`dirname $0`/..}
+
+# dump directory of bkserver
+GLITE_LB_EXPORT_DUMPDIR=${GLITE_LB_EXPORT_DUMPDIR:-/tmp/dump}
+GLITE_LB_EXPORT_DUMPDIR_OLD=${GLITE_LB_EXPORT_DUMPDIR_OLD:-$GLITE_LB_EXPORT_DUMPDIR.old}
+# maildir dump directory for jp importer
+GLITE_LB_EXPORT_JPDUMP_MAILDIR=${GLITE_LB_EXPORT_JPDUMP_MAILDIR:-/tmp/jpdump}
+# directory with exported data (file per job)
+GLITE_LB_EXPORT_EXPORTDIR=${GLITE_LB_EXPORT_EXPORTDIR:-/tmp/lbexport}
+# purge args (timeouts)
+GLITE_LB_EXPORT_PURGE_ARGS=${GLITE_LB_EXPORT_PURGE_ARGS:--a 1h -c 1h -n 1h -o 1d}
+# Book Keeping Server
+GLITE_LB_EXPORT_BKSERVER=${GLITE_LB_EXPORT_BKSERVER:-localhost:9000}
+
+[ -d $GLITE_LB_EXPORT_JPDUMP_MAILDIR ] || mkdir -p $GLITE_LB_EXPORT_JPDUMP_MAILDIR
+[ -d $GLITE_LB_EXPORT_DUMPDIR ] || mkdir -p $GLITE_LB_EXPORT_DUMPDIR
+[ -d $GLITE_LB_EXPORT_DUMPDIR_OLD ] || mkdir -p $GLITE_LB_EXPORT_DUMPDIR_OLD
+[ -d $GLITE_LB_EXPORT_EXPORTDIR ] || mkdir -p $GLITE_LB_EXPORT_EXPORTDIR
+
+$PREFIX/sbin/glite-lb-purge $GLITE_LB_EXPORT_PURGE_ARGS -l -m $GLITE_LB_EXPORT_BKSERVER
+
+for file in $GLITE_LB_EXPORT_DUMPDIR/*; do
+  if [ -s $file ]; then
+    $PREFIX/sbin/glite-lb-lb_dump_exporter -d $file -s $GLITE_LB_EXPORT_EXPORTDIR -m $GLITE_LB_EXPORT_JPDUMP_MAILDIR
+    mv $file $GLITE_LB_EXPORT_DUMPDIR_OLD
+  else
+    rm $file
+  fi
+done
index ae9f981..bcc5083 100755 (executable)
@@ -8,7 +8,12 @@ if [ `echo $(basename $0) | grep 'glite-lb-' | wc -l` = "1" ]; then
        PREFIX=glite-lb-
 fi
 LOGFD=${LOGFD:-2}
-LOGEV=${LOGEV:-`dirname $0`/${PREFIX}logevent}${SUFFIX}
+if [ -d `dirname $0`/../bin ]; then
+       BINDIR=`dirname $0`/../bin
+else
+       BINDIR=`dirname $0`
+fi
+LOGEV=${LOGEV:-${BINDIR}/${PREFIX}logevent}${SUFFIX}
 JOB_REG=${JOB_REG:-`dirname $0`/${PREFIX}job_reg}${SUFFIX}
 EDG_JOBID=
 LBPROXY=
index ad57239..2512ef8 100644 (file)
@@ -3,6 +3,7 @@
 #include <unistd.h>
 #include <string.h>
 #include <errno.h>
+#include <fcntl.h>
 
 #include "glite/wmsutils/jobid/cjobid.h"
 #include "glite/lb/producer.h"
@@ -18,7 +19,7 @@ static void usage(char *me)
 
 int main(int argc, char *argv[])
 {
-       char *src = NULL,*job = NULL,*server = NULL,*seq;
+       char *src = NULL,*job = NULL,*server = NULL,*seq,*jdl = NULL;
        int lbproxy = 0;
        int done = 0,num_subjobs = 0,reg_subjobs = 0,i;
        edg_wll_Context ctx;
@@ -29,13 +30,14 @@ int main(int argc, char *argv[])
        opterr = 0;
 
        do {
-               switch (getopt(argc,argv,"xs:j:m:n:S")) {
+               switch (getopt(argc,argv,"xs:j:m:n:Sl:")) {
                        case 'x': lbproxy = 1; break;
                        case 's': src = (char *) strdup(optarg); break;
                        case 'j': job = (char *) strdup(optarg); break;
                        case 'm': server = strdup(optarg); break;
                        case 'n': num_subjobs = atoi(optarg); break;
                        case 'S': if (num_subjobs>0) { reg_subjobs = 1; break; }
+                       case 'l': jdl = (char *) strdup(optarg); break;
                        case '?': usage(argv[0]); exit(EINVAL);
                        case -1: done = 1; break;
                }
@@ -63,11 +65,29 @@ int main(int argc, char *argv[])
                exit(1);
        }
 
+       if (jdl) {
+               int     f = open(jdl,O_RDONLY,0);
+               off_t   l,p,c;
+
+               if (f<0) { perror(jdl); exit(1); }
+               l = lseek(f,0,SEEK_END);
+               lseek(f,0,SEEK_SET);
+
+               jdl = malloc(l+1);
+
+               for (p=0; p < l && (c = read(f,jdl+p,l-p)) > 0; p += c);
+               if (c<0) {
+                       perror("read()");
+                       exit (1);
+               }
+               jdl[p] = 0;
+       }
+
        edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,edg_wll_StringToSource(src));
        if (lbproxy) {
                if (edg_wll_RegisterJobProxy(ctx,jobid,
                        num_subjobs?EDG_WLL_REGJOB_DAG:EDG_WLL_REGJOB_SIMPLE,
-                       "blabla", "NNNSSSS",
+                       jdl ? jdl : "blabla", "NNNSSSS",
                        num_subjobs,NULL,&subjobs))
                {
                        char    *et,*ed;
@@ -78,7 +98,7 @@ int main(int argc, char *argv[])
        } else {
                if (edg_wll_RegisterJobSync(ctx,jobid,
                        num_subjobs?EDG_WLL_REGJOB_DAG:EDG_WLL_REGJOB_SIMPLE,
-                       "blabla", "NNNSSSS",
+                       jdl ? jdl : "blabla", "NNNSSSS",
                        num_subjobs,NULL,&subjobs))
                {
                        char    *et,*ed;
@@ -103,20 +123,24 @@ int main(int argc, char *argv[])
        if (reg_subjobs) {
                char ** jdls = (char**) malloc(num_subjobs*sizeof(char*));
 
-               if (lbproxy) {
-                       fprintf(stderr,"edg_wll_RegisterSubjobsProxy(): not implemented yet.\n");
-                       exit(1);
-               }
-
                for (i=0; subjobs[i]; i++) {
                        asprintf(jdls+i, "JDL of subjob #%d\n", i+1);
                }
 
-               if (edg_wll_RegisterSubjobs(ctx, jobid, (const char **) jdls, NULL, subjobs)) {
-                       char    *et,*ed;
-                       edg_wll_Error(ctx,&et,&ed);
-                       fprintf(stderr,"edg_wll_RegisterSubjobs: %s (%s)\n", et, ed);
-                       exit(1);
+               if (lbproxy) {
+                       if (edg_wll_RegisterSubjobsProxy(ctx, jobid, (const char **) jdls, NULL, subjobs)) {
+                               char    *et,*ed;
+                               edg_wll_Error(ctx,&et,&ed);
+                               fprintf(stderr,"edg_wll_RegisterSubjobsProxy: %s (%s)\n", et, ed);
+                               exit(1);
+                       }
+               } else {
+                       if (edg_wll_RegisterSubjobs(ctx, jobid, (const char **) jdls, NULL, subjobs)) {
+                               char    *et,*ed;
+                               edg_wll_Error(ctx,&et,&ed);
+                               fprintf(stderr,"edg_wll_RegisterSubjobs: %s (%s)\n", et, ed);
+                               exit(1);
+                       }
                }
 
                for (i=0; subjobs[i]; i++) free(jdls[i]);
diff --git a/org.glite.lb.client/examples/lbmon-db.c b/org.glite.lb.client/examples/lbmon-db.c
new file mode 100644 (file)
index 0000000..fee46cd
--- /dev/null
@@ -0,0 +1,90 @@
+#ident "$Header$"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <getopt.h>
+#include <sysexits.h>
+#include <assert.h>
+
+#include "glite/wmsutils/jobid/strmd5.h"
+#include "glite/lb/consumer.h"
+#include "glite/lb/context-int.h"
+#include "glite/lb/lbs_db.h"
+#include "glite/lb/jobstat.h"
+
+static struct option opts[] = {
+       { "mysql",1,NULL,'m' },
+       { "verbose",0,NULL,'v' },
+       { NULL, 0, NULL, 0 }
+};
+
+static void usage();
+static void do_exit(edg_wll_Context,int);
+static const char *me;
+
+int main(int argc,char **argv)
+{
+       int     opt;
+       char    *dbstring = getenv("LBDB");
+       int     verbose = 0, rows = 0, fields = 0, jobs = 0, i;
+       edg_wll_Context ctx;
+       char    *stmt = NULL, *status = NULL;
+       char    *str[2];
+       edg_wll_Stmt sh;
+
+       me = strdup(argv[0]);
+
+       while ((opt = getopt_long(argc,argv,"m:v",opts,NULL)) != EOF) switch (opt) {
+               case 'm': dbstring = optarg; break;
+               case 'v': verbose++; break;
+               case '?': usage(); exit(EX_USAGE);
+       }
+
+       edg_wll_InitContext(&ctx);
+       if (edg_wll_Open(ctx,dbstring)) do_exit(ctx,EX_UNAVAILABLE);
+       if (edg_wll_DBCheckVersion(ctx)) do_exit(ctx,EX_SOFTWARE);
+       if (asprintf(&stmt,"SELECT status,count(status) FROM states GROUP BY status;") < 0) do_exit(ctx,EX_OSERR);
+       if (verbose) fprintf(stderr,"mysql query: %s\n",stmt);
+       if ((rows = edg_wll_ExecStmt(ctx,stmt,&sh)) < 0) do_exit(ctx,EX_SOFTWARE);
+       if (verbose) fprintf(stderr,"number of states returned: %d\n",rows);
+       if (rows > 0) fprintf(stdout,"Number of jobs in each state: \n");
+       for (i = 0; i < rows; i++) {
+               fields = edg_wll_FetchRow(sh, str);
+               if (fields != 2) {
+                       edg_wll_FreeStmt(&sh);
+                       do_exit(ctx,EX_SOFTWARE);
+               }
+               status = edg_wll_StatToString((edg_wll_JobStatCode) atoi(str[0]));
+               jobs += atoi(str[1]);
+               fprintf(stdout,"%s: %s\n",status,str[1]);
+               if (str[0]) free(str[0]);
+               if (str[1]) free(str[1]);
+               if (status) free(status);
+       }
+       fprintf(stdout,"Total number of jobs: %d\n",jobs);
+
+       if (stmt) free(stmt);
+       edg_wll_FreeStmt(&sh);
+       edg_wll_FreeContext(ctx);
+
+       return 0;
+}
+
+static void do_exit(edg_wll_Context ctx,int code)
+{
+       char    *et,*ed;
+
+       edg_wll_Error(ctx,&et,&ed);
+       fprintf(stderr,"%s: %s (%s)\n",me,et,ed);
+       exit(code);
+}
+
+static void usage()
+{
+       fprintf(stderr,"usage: %s <options>\n"
+                       "       -m,--mysql <dbstring>   use non-default database connection\n"
+                       "       -v,--verbose            be verbose\n",
+                       me);
+}
index 3b8765c..ed6fdcb 100644 (file)
@@ -3,6 +3,7 @@
 -s LogMonitor,-e EnQueued, --queue="wm_input.fl", --job="job description in receiver language", --result=OK, --reason="detailed description of transfer"
 
 -s WorkloadManager,-e DeQueued, --queue="wm_input.fl", --local_jobid="WM_ID" 
+-s WorkloadManager, -e Resubmission, --result=${RESUB:-WILLRESUB}, --reason="let's try again", --tag=none
 -s WorkloadManager,-e HelperCall, --helper_name="name of the called component",--helper_params="parameters of the call", --src_role=CALLING
 -s WorkloadManager,-e Match,--dest_id="${DESTINATION:-destination CE/queue}"
 -s WorkloadManager,-e HelperReturn, --helper_name="name of the called component",--retval="returned data", --src_role=CALLING
index 50f3718..c171348 100644 (file)
@@ -9,14 +9,16 @@
 
 ! DESTINATION=CE2/queue2
 ! CE_NODE=worker2
-:resubmission_shallow:
+! RESUB=SHALLOW
+:resubmission:
 
 -s LogMonitor, -e Done, --status_code=FAILED, --reason="reason for the change", --exit_code=141
 -s LogMonitor, -e Resubmission, --result=WILLRESUB, --reason="let's try again", --tag=none
 
 ! DESTINATION=CE3/queue3
 ! CE_NODE=worker3
-:resubmission_shallow:
+! RESUB=SHALLOW
+:resubmission:
 
 -s LogMonitor, -e ReallyRunning, --wn_seq="$EDG_WL_SEQUENCE"
 
@@ -27,7 +29,8 @@
 
 ! DESTINATION=CE2/queue2
 ! CE_NODE=worker2
-:resubmission_deep:
+! RESUB=DEEP
+:resubmission:
 
 #       payload_running=false
 #       destination=CE2/queue2
index 9932d29..ba1201b 100644 (file)
@@ -2,6 +2,7 @@
 
 ! DESTINATION=CE1/queue1
 ! CE_NODE=worker1
+! RESUB=SHALLOW
 :running:
 
 -s LogMonitor, -e Done, --status_code=FAILED, --reason="reason for the change", --exit_code=141
@@ -9,7 +10,8 @@
 
 ! DESTINATION=CE2/queue2
 ! CE_NODE=worker2
-:resubmission_shallow:
+! RESUB=SHALLOW
+:resubmission:
 ! BRANCH_SEQUENCE="$EDG_WL_SEQUENCE"
 
 -s LogMonitor, -e Done, --status_code=FAILED, --reason="reason for the change", --exit_code=141
@@ -17,7 +19,8 @@
 
 ! DESTINATION=CE3/queue3
 ! CE_NODE=worker3
-:resubmission_shallow:
+! RESUB=SHALLOW
+:resubmission:
 
 -s LogMonitor, -e ReallyRunning, --wn_seq="$BRANCH_SEQUENCE"
 
@@ -34,7 +37,8 @@
 
 ! DESTINATION=CE4/queue4
 ! CE_NODE=worker4
-:resubmission_deep:
+! RESUB=DEEP
+:resubmission:
 
 #       payload_running=false
 #       destination=CE4/queue4
index a0e04d3..134368f 100644 (file)
@@ -9,14 +9,16 @@
 
 ! DESTINATION=CE2/queue2
 ! CE_NODE=worker2
-:resubmission_shallow:
+! RESUB=SHALLOW
+:resubmission:
 
 -s LogMonitor, -e Done, --status_code=FAILED, --reason="reason for the change", --exit_code=141
 -s LogMonitor, -e Resubmission, --result=WILLRESUB, --reason="let's try again", --tag=none
 
 ! DESTINATION=CE3/queue3
 ! CE_NODE=worker3
-:resubmission_shallow:
+! RESUB=SHALLOW
+:resubmission:
 
 -s LogMonitor, -e ReallyRunning, --wn_seq="$EDG_WL_SEQUENCE"
 
index a1e17b4..1cb74c2 100644 (file)
@@ -9,7 +9,8 @@
 
 ! DESTINATION=CE2/queue2
 ! CE_NODE=worker2
-:resubmission_shallow:
+! RESUB=SHALLOW
+:resubmission:
 ! BRANCH_SEQUENCE="$EDG_WL_SEQUENCE"
 
 -s LogMonitor, -e Done, --status_code=FAILED, --reason="reason for the change", --exit_code=141
@@ -17,7 +18,8 @@
 
 ! DESTINATION=CE3/queue3
 ! CE_NODE=worker3
-:resubmission_shallow:
+! RESUB=SHALLOW
+:resubmission:
 
 -s LogMonitor, -e ReallyRunning, --wn_seq="$BRANCH_SEQUENCE"
 
diff --git a/org.glite.lb.client/examples/stresslog.c b/org.glite.lb.client/examples/stresslog.c
new file mode 100644 (file)
index 0000000..8f47414
--- /dev/null
@@ -0,0 +1,165 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+#include <fcntl.h>
+
+#include "glite/wmsutils/jobid/cjobid.h"
+#include "glite/lb/producer.h"
+#include "glite/lb/events.h"
+
+#define        MAXMSGSIZE      10240
+
+extern char *optarg;
+extern int opterr,optind;
+
+extern int edg_wll_DoLogEvent(edg_wll_Context context, edg_wll_LogLine logline);
+extern int edg_wll_DoLogEventProxy(edg_wll_Context context, edg_wll_LogLine logline);
+
+static const char *me;
+
+static void usage()
+{
+       fprintf(stderr,"usage: %s [-m bkserver] [-x] [-n jobs] [-f file_name]\n", me);
+}
+
+int main(int argc, char *argv[])
+{
+       char    *job = NULL,*server = NULL,*seq = NULL,*filename = NULL;
+       char    buf[MAXMSGSIZE];
+       int     lbproxy = 0, num_subjobs = 0;
+       int     done = 0, njobs = 1,i;
+       edg_wll_Context ctx;
+       edg_wlc_JobId   jobid,*subjobs;
+       FILE    *f;
+
+       edg_wll_InitContext(&ctx);
+       opterr = 0;
+
+       me = strdup(argv[0]);
+
+       do {
+               switch (getopt(argc,argv,"m:xn:f:")) {
+                       case 'm': server = strdup(optarg); break;
+                       case 'x': lbproxy = 1; break;
+                       case 'n': 
+                               njobs = atoi(optarg); 
+                               fprintf(stderr,"WARNING: -n option not implemented yet\n");
+                               break;
+                       case 'f': filename = (char *) strdup(optarg); break;
+                       case '?': usage(); exit(EINVAL);
+                       case -1: done = 1; break;
+               }
+       } while (!done);
+
+       if (!server) {
+               fprintf(stderr,"%s: -m required\n",me);
+               usage();
+               exit(1);
+       }
+
+       if (njobs <= 0) {
+               fprintf(stderr,"%s: wrong number of jobs\n",me);
+               usage();
+               exit(1);
+       }
+
+       if (!filename) {
+               fprintf(stderr,"%s: -f required\n",me);
+               usage();
+               exit(1);
+       }
+
+       if ( (f = fopen(filename,"r")) == NULL) {
+               perror(filename);
+               exit(1);
+       }
+       
+       /* create jobid */
+       if (!job) {
+               char *p = strchr(server,':');
+               if (p) *p=0;
+               edg_wlc_JobIdCreate(server,p?atoi(p+1):0,&jobid);
+               job = edg_wlc_JobIdUnparse(jobid);
+               // fprintf(stdout,"new jobid: %s\n",job);
+       }
+       else if ((errno = edg_wlc_JobIdParse(job,&jobid))) {
+               perror(job);
+               exit(1);
+       }
+
+       /* register */
+       edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,EDG_WLL_SOURCE_USER_INTERFACE);
+       // edg_wll_SetParam(ctx,EDG_WLL_PARAM_SOURCE,edg_wll_StringToSource(src));
+       if (lbproxy) {
+               if (edg_wll_RegisterJobProxy(ctx,jobid,
+                       num_subjobs?EDG_WLL_REGJOB_DAG:EDG_WLL_REGJOB_SIMPLE,
+                       "JDL: blabla", "NNNSSSS",
+                       num_subjobs,NULL,&subjobs))
+               {
+                       char    *et,*ed;
+                       edg_wll_Error(ctx,&et,&ed);
+                       fprintf(stderr,"edg_wll_RegisterJobProxy(%s): %s (%s)\n",job,et,ed);
+                       exit(1);
+               }
+       } else {
+               if (edg_wll_RegisterJobSync(ctx,jobid,
+                       num_subjobs?EDG_WLL_REGJOB_DAG:EDG_WLL_REGJOB_SIMPLE,
+                       "JDL: blabla", "NNNSSSS",
+                       num_subjobs,NULL,&subjobs))
+               {
+                       char    *et,*ed;
+                       edg_wll_Error(ctx,&et,&ed);
+                       fprintf(stderr,"edg_wll_RegisterJobSync(%s): %s (%s)\n",job,et,ed);
+                       exit(1);
+               }
+       }
+
+       /* log events */
+       i = 1;
+       while (!feof(f)) {
+               edg_wll_LogLine logline;
+
+               if (!fgets(buf,sizeof(buf),f)) break;
+               if (strcmp(buf,"\n")) {
+                       // fprintf(stdout,"%d: %s\n",i,buf);
+                       asprintf(&logline,"DG.JOBID=\"%s\" %s",job,buf);
+                       if (lbproxy) {
+                               if (edg_wll_DoLogEventProxy(ctx,logline)) {
+                                       char    *et,*ed;
+                                       edg_wll_Error(ctx,&et,&ed);
+                                       fprintf(stderr,"edg_wll_DoLogEventProxy(): %s (%s)\n",et,ed);
+                                       exit(1);
+                               }
+                       } else {
+                               if (edg_wll_DoLogEvent(ctx,logline)) {
+                                       char    *et,*ed;
+                                       edg_wll_Error(ctx,&et,&ed);
+                                       fprintf(stderr,"edg_wll_DoLogEvent(): %s (%s)\n",et,ed);
+                                       exit(1);
+                               }
+                       }
+                       if (logline) free(logline);
+               }
+               i++;
+       }
+       fclose(f);
+
+       /* seq. code */
+       seq = edg_wll_GetSequenceCode(ctx);
+       fprintf(stdout,"\n%s=\"%s\"\n",num_subjobs?"EDG_WL_DAG_JOBID":"EDG_JOBID",job);
+       fprintf(stdout,"EDG_WL_SEQUENCE=\"%s\"\n",seq);
+       free(seq);
+       free(job);
+
+       if (num_subjobs) for (i=0; subjobs[i]; i++) {
+               char    *job_s = edg_wlc_JobIdUnparse(subjobs[i]);
+               fprintf(stdout,"EDG_WL_SUB_JOBID[%d]=\"%s\"\n",i,job_s);
+               free(job_s);
+       }
+
+       edg_wll_FreeContext(ctx);
+
+       return 0;
+}
index 557b922..45d44a1 100644 (file)
@@ -1,2 +1,2 @@
-#Sat Oct 15 06:41:11 CEST 2005
-module.build=153
+#Wed Mar 15 05:03:30 CET 2006
+module.build=0233
index 17c4c40..b6a2803 100644 (file)
 
        Revision history:
        $Log$
+       Revision 1.3.14.1  2006/03/08 11:49:31  jpospi
+       added mysql dependency
+       
+       Revision 1.3  2004/08/31 16:32:31  jpospi
+       First attempt to producer test
+       
        Revision 1.2  2004/07/06 20:47:11  flammer
        Moved to configure.properties.xml
        
@@ -51,6 +57,8 @@ thrflavour=${with.globus.thr.flavor}
 nothrflavour=${with.globus.nothr.flavor}
 expat_prefix=${with.expat.prefix}
 ares_prefix=${with.ares.prefix}
+mysql_prefix=${with.mysql.prefix}
+mysql_version=${ext.mysql.version}
 cppunit_prefix=${with.cppunit.prefix}
                        </echo>
            </target>
index 951cf76..8754ea7 100644 (file)
@@ -1,6 +1,4 @@
 #Fri Sep 02 14:17:41 CEST 2005
-module.version=3.0.0
 # glite-lb-client_branch_3_0_0_RC15 tag is taken!
-
-module.build=2
-module.age=1
+module.version=2.1.4
+module.age=0
index 33fdf78..232be5b 100644 (file)
@@ -204,6 +204,10 @@ int edg_wll_open_proxy(edg_wll_Context ctx)
        int                     flags;
        
 
+       if (ctx->connProxy->conn.sock > -1) {
+               // XXX: test path socket here?
+               return edg_wll_ResetError(ctx);
+       }
        ctx->connProxy->conn.sock = socket(PF_UNIX, SOCK_STREAM, 0);
        if (ctx->connProxy->conn.sock < 0) {
                edg_wll_SetError(ctx, errno, "socket() error");
@@ -253,7 +257,7 @@ int http_check_status(
        char *response)
 
 {
-       int     code,len;
+       int     code = HTTP_INTERNAL,len = 0;
 
        edg_wll_ResetError(ctx);
        sscanf(response,"HTTP/%*f %n%d",&len,&code);
index ce74d36..2eb1850 100644 (file)
@@ -134,6 +134,8 @@ int main(int argc, char **argv)
                }
                if ( !ln ) break;
 
+               if (*ln == 0) continue;
+
                if ( edg_wll_ParseEvent(ctx, ln, &ev) != 0 ) {
                        cleanup(1);
                }
@@ -158,7 +160,7 @@ int main(int argc, char **argv)
                                        cleanup(1);
                                }
                                snprintf(fname, PATH_MAX, "%s/%s.%ld", store_pref, unique, (long) time(NULL));
-                               if ( (fd = open(fname, O_CREAT|O_EXCL|O_RDWR, 00600)) < 0 ) {
+                               if ( (fd = open(fname, O_CREAT|O_EXCL|O_RDWR, 00640)) < 0 ) {
                                        if ( errno == EEXIST ) { sleep(2); continue; }
                                        perror(fname);
                                        cleanup(1);
index c58f72b..b939e05 100644 (file)
@@ -35,7 +35,7 @@ int main(int argc, char *argv[])
        char    *ff = NULL, *jobid_s = NULL;
        char    *src_instance = NULL;
        int     err = 0/*,i,done = 0,fmtlen*/;
-       int     pri = 0;
+       int     pri = 0, noinc = 0;
        int             use_lbproxy = 0;
        char    *lbproxy_user = NULL;
        char    *lbproxy_store_sock = NULL;
@@ -91,6 +91,7 @@ int main(int argc, char *argv[])
            { EDG_WLL_ARGS_BOOL, "d", "debug mode", "enable debug mode", &deb },
 #ifdef ENABLE_REASON_LENGTH
            { EDG_WLL_ARGS_INT, "l", "reason-length", "extend 'reason' string to length (debug only)", &elength, 0, 1000000000 },
+           { EDG_WLL_ARGS_BOOL, "I", "dont-increment", "don't increment initial seqence code", &noinc },
 #endif
 @@@{
     my %typetab = (
@@ -187,6 +188,8 @@ int main(int argc, char *argv[])
                }
        }
 
+       if (noinc) edg_wll_SetSequenceCode(ctx,code,EDG_WLL_SEQ_NORMAL);
+
 #ifdef ENABLE_REASON_LENGTH
        if (elength > 200000000) {
                fprintf(stderr,"%s: usupported reason message length: %d\n", argv[0], elength);
index 0eb6948..19822e6 100644 (file)
@@ -81,7 +81,7 @@ plain_reader(void *user_data, char *buffer, int max_len)
 
        len = edg_wll_plain_read_full(data->conn, buffer, max_len, &data->ctx->p_tmp_timeout);
        if(len < 0) 
-               edg_wll_SetError(data->ctx, LB_PROTO, "get_reply_plain(): error reading message data");
+               edg_wll_SetError(data->ctx, EDG_WLL_IL_PROTO, "plain_reader(): error reading message data");
 
        return(len);
 }
@@ -96,23 +96,22 @@ plain_reader(void *user_data, char *buffer, int max_len)
  */
 static
 int
-get_reply_plain(edg_wll_Context context, edg_wll_PlainConnection *conn, char **buf, int *code_min)
+get_reply_plain(edg_wll_Context context, edg_wll_PlainConnection *conn, char **buf, int *code_maj, int *code_min)
 {
        char *msg=NULL;
-       int len, code;
+       int len;
        struct reader_data data;
 
        data.ctx = context;
        data.conn = conn;
-       code = 0;
        len = read_il_data(&data, &msg, plain_reader);
        if(len < 0) {
-               edg_wll_SetError(context, LB_PROTO, "get_reply_plain(): error reading message");
+               edg_wll_SetError(context, EDG_WLL_IL_PROTO, "get_reply_plain(): error reading message");
                goto get_reply_plain_end;
        }
 
-       if(decode_il_reply(&code, code_min, buf, msg) < 0) {
-               edg_wll_SetError(context, LB_PROTO, "get_reply_plain(): error decoding message");
+       if(decode_il_reply(code_maj, code_min, buf, msg) < 0) {
+               edg_wll_SetError(context, EDG_WLL_IL_PROTO, "get_reply_plain(): error decoding message");
                goto get_reply_plain_end;
        }
 
@@ -134,7 +133,7 @@ gss_reader(void *user_data, char *buffer, int max_len)
                                    &len, &gss_code);
        if(ret < 0) {
                edg_wll_log_proto_handle_gss_failures(data->ctx, ret, &gss_code, "edg_wll_gss_read_full");
-               edg_wll_UpdateError(data->ctx, LB_PROTO, "get_reply_gss(): error reading message");
+               edg_wll_UpdateError(data->ctx, EDG_WLL_IL_PROTO, "gss_reader(): error reading message");
        }
 
        return(ret);
@@ -143,7 +142,7 @@ gss_reader(void *user_data, char *buffer, int max_len)
 
 static
 int
-get_reply_gss(edg_wll_Context context, edg_wll_GssConnection *conn, char **buf, int *code_min)
+get_reply_gss(edg_wll_Context context, edg_wll_GssConnection *conn, char **buf, int *code_maj, int *code_min)
 {
        char *msg = NULL;
        int code;
@@ -153,12 +152,12 @@ get_reply_gss(edg_wll_Context context, edg_wll_GssConnection *conn, char **buf,
        data.conn = conn;
        code = read_il_data(&data, &msg, gss_reader);
        if(code < 0) {
-               edg_wll_SetError(context, LB_PROTO, "get_reply_gss(): error reading reply");
+               edg_wll_SetError(context, EDG_WLL_IL_PROTO, "get_reply_gss(): error reading reply");
                goto get_reply_gss_end;
        }
 
-       if(decode_il_reply(&code, code_min, buf, msg) < 0) {
-               edg_wll_SetError(context, LB_PROTO, "get_reply_gss(): error decoding reply");
+       if(decode_il_reply(code_maj, code_min, buf, msg) < 0) {
+               edg_wll_SetError(context, EDG_WLL_IL_PROTO, "get_reply_gss(): error decoding reply");
                goto get_reply_gss_end;
        }
 
@@ -282,9 +281,11 @@ int edg_wll_log_proto_client_proxy(edg_wll_Context context, edg_wll_PlainConnect
        static char et[256];
        int     err;
        int     code;
+       int     lbproto_code;
        int     count;
 
        errno = err = code = count = 0;
+       lbproto_code = 0;
        edg_wll_ResetError(context);
 
        len = encode_il_msg(&buffer, logline);
@@ -298,7 +299,7 @@ int edg_wll_log_proto_client_proxy(edg_wll_Context context, edg_wll_PlainConnect
        fprintf(stderr,"log_proto_client_proxy: sending message...\n");
 #endif
        if (( count = edg_wll_plain_write_full(conn, buffer, len, &context->p_tmp_timeout)) < 0) {
-               edg_wll_SetError(context,LB_PROTO,"edg_wll_log_proto_client_proxy(): error sending message to socket");
+               edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_proxy(): error sending message to socket");
                goto edg_wll_log_proto_client_proxy_end;
        }
 
@@ -306,14 +307,28 @@ int edg_wll_log_proto_client_proxy(edg_wll_Context context, edg_wll_PlainConnect
 #ifdef EDG_WLL_LOG_STUB
        fprintf(stderr,"log_proto_client_proxy: reading answer from server...\n");
 #endif
-       if ((err = get_reply_plain(context, conn, &answer, &code)) < 0 ) {
-               edg_wll_SetError(context,LB_PROTO,"edg_wll_log_proto_client_proxy(): error reading answer from L&B Proxy server");
+       if ((err = get_reply_plain(context, conn, &answer, &lbproto_code, &code)) != 0 ) {
+               edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_proxy(): error reading answer from L&B Proxy server");
        } else {
 #ifdef EDG_WLL_LOG_STUB
-               fprintf(stderr,"log_proto_client_proxy: read answer \"%d: %s\"\n",code,answer);
+               fprintf(stderr,"log_proto_client_proxy: read answer \"%d:%d: %s\"\n",lbproto_code,code,answer);
 #endif
-               snprintf(et, sizeof(et), "answer read from L&B Proxy server: %s", answer);
-               edg_wll_SetError(context,code,et);
+               switch (lbproto_code) {
+                       case LB_OK: break;
+                       case LB_NOMEM: 
+                               edg_wll_SetError(context, ENOMEM, "edg_wll_log_proto_client_proxy(): proxy out of memory"); 
+                               break;
+                       case LB_PROTO:
+                               edg_wll_SetError(context, EDG_WLL_IL_PROTO, "edg_wll_log_proto_client_proxy(): received protocol error response"); 
+                               break;
+                       case LB_DBERR:
+                               snprintf(et, sizeof(et), "error details from L&B Proxy server: %s", answer);
+                               edg_wll_SetError(context, code, et);
+                               break;
+                       default:
+                               edg_wll_SetError(context, EDG_WLL_IL_PROTO, "edg_wll_log_proto_client_proxy(): received unknown protocol response"); 
+                               break;
+               }
        }
 
 edg_wll_log_proto_client_proxy_end:
@@ -341,7 +356,7 @@ int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnecti
        char *buffer,*answer = NULL;
        static char et[256];
        int     err;
-       int     code;
+       int     code, lbproto_code;
        int     count;
        edg_wll_GssStatus gss_code;
 
@@ -361,7 +376,7 @@ int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnecti
        count = 0;
        if (( err = edg_wll_gss_write_full(con, buffer, len, &context->p_tmp_timeout,  &count, &gss_code)) < 0) {
                edg_wll_log_proto_handle_gss_failures(context,err,&gss_code,"edg_wll_gss_write_full()");
-               edg_wll_UpdateError(context,LB_PROTO,"edg_wll_log_proto_client_direct(): error sending message");
+               edg_wll_UpdateError(context, EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_direct(): error sending message");
                goto edg_wll_log_proto_client_direct_end;
        }
 
@@ -369,14 +384,28 @@ int edg_wll_log_proto_client_direct(edg_wll_Context context, edg_wll_GssConnecti
 #ifdef EDG_WLL_LOG_STUB
        fprintf(stderr,"log_proto_client_direct: reading answer from server...\n");
 #endif
-       if ((err = get_reply_gss(context, con, &answer, &code)) < 0 ) {
-               edg_wll_SetError(context,LB_PROTO,"edg_wll_log_proto_client_direct(): error reading answer from L&B direct server");
+       if ((err = get_reply_gss(context, con, &answer, &lbproto_code, &code)) != 0 ) {
+               edg_wll_SetError(context, EDG_WLL_IL_PROTO,"edg_wll_log_proto_client_direct(): error reading answer from L&B direct server");
        } else {
 #ifdef EDG_WLL_LOG_STUB
-               fprintf(stderr,"log_proto_client_direct: read answer \"%d: %s\"\n",code,answer);
+               fprintf(stderr,"log_proto_client_direct: read answer \"%d:%d: %s\"\n",lbproto_code,code,answer);
 #endif
-               snprintf(et, sizeof(et), "answer read from bkserver: %s", answer);
-               edg_wll_SetError(context,code,et);
+               switch (lbproto_code) {
+                       case LB_OK: break;
+                       case LB_NOMEM: 
+                               edg_wll_SetError(context, ENOMEM, "log_proto_client_direct(): server out of memory"); 
+                               break;
+                       case LB_PROTO:
+                               edg_wll_SetError(context, EDG_WLL_IL_PROTO, "log_proto_client_direct(): received protocol error response"); 
+                               break;
+                       case LB_DBERR:
+                               snprintf(et, sizeof(et), "error details from L&B server: %s", answer);
+                               edg_wll_SetError(context, code, et);
+                               break;
+                       default:
+                               edg_wll_SetError(context, EDG_WLL_IL_PROTO, "log_proto_client_direct(): received unknown protocol response"); 
+                               break;
+               }
        }
 
 edg_wll_log_proto_client_direct_end:
index 0922d0b..1374d90 100644 (file)
@@ -88,7 +88,7 @@ int handle_answers(edg_wll_Context context, int code, const char *text)
  * \param logline      IN formated ULM string
  *----------------------------------------------------------------------
  */
-static int edg_wll_DoLogEvent(
+int edg_wll_DoLogEvent(
        edg_wll_Context context,
        edg_wll_LogLine logline)
 {
@@ -151,7 +151,7 @@ edg_wll_DoLogEvent_end:
  * \param logline      IN formated ULM string
  *----------------------------------------------------------------------
  */
-static int edg_wll_DoLogEventProxy(
+int edg_wll_DoLogEventProxy(
        edg_wll_Context context,
        edg_wll_LogLine logline)
 {
@@ -227,7 +227,7 @@ edg_wll_DoLogEventProxy_end:
  * \param logline      IN formated ULM string
  *----------------------------------------------------------------------
  */
-static int edg_wll_DoLogEventDirect(
+int edg_wll_DoLogEventDirect(
        edg_wll_Context context,
        edg_wll_LogLine logline)
 {
@@ -942,6 +942,7 @@ int edg_wll_RegisterSubjobsProxy(
                pjdl++; psubjob++;
        }
 
+edg_wll_registersubjobsproxy_end:
        edg_wll_SetLoggingJobProxy(ctx, oldctxjob, oldctxseq, NULL, EDG_WLL_SEQ_NORMAL);
 
 edg_wll_registersubjobsproxy_end: