VPATH=${top_srcdir}/src:${top_srcdir}/examples:${top_srcdir}/project$:${top_srcdir}/interface:${stagedir}/interface:${top_srcdir}/build
+ifdef JP_PERF
+ JP_PERF_CFLAGS:=-DJP_PERF=1
+endif
+
GLOBUS_LIBS:=-L${globus_prefix}/lib \
-lglobus_common_${nothrflavour} \
-lglobus_gssapi_gsi_${nothrflavour} \
DEBUG:=-g -O0 -DDEBUG
-CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include
+CFLAGS:=${DEBUG} -I. -I${top_srcdir}/interface -I${top_srcdir}/src -I${gsoap_prefix}/include -I${gsoap_prefix} -I${stagedir}/include ${GLOBUS_CFLAGS} -I${libtar}/include ${JP_PERF_CFLAGS}
LDFLAGS:=-L${stagedir}/lib -L${libtar}/lib
LINK:=libtool --mode=link ${CC} ${LDFLAGS}
STAGE_HDRS:=jpcl_ctx_int.h
HDRS:=jp_client.h jpimporter.h
-EXAMPLES:=jpps_upload_files
+EXAMPLES:=jpps_upload_files mill_feed
LIBOBJS:=jpcl_ctx.o jpimp_lib.o
LIBTHROBJS:=${LIBOBJS:.o=.thr.o}
${EXAMPLES}: ${LIB}
-${EXAMPLES}: %: %.o
+jpps_upload_files: %: %.o
${LINK} -o $@ $< ${LIB} ${LBMAILDIRLIB}
+mill_feed: %: %.o
+ ${LINK} -o $@ $< ${LBMAILDIRLIB} -lglite_lb_common_${nothrflavour}
+
JobProvenancePS.xh: %.xh: %.wsdl JobProvenanceTypes.wsdl typemap.dat
${INSTALL} -m 755 ${daemon} ${PREFIX}/bin
${INSTALL} -m 644 ${LIB} ${PREFIX}/lib
${INSTALL} -m 644 jpps_upload_files ${PREFIX}/examples/glite-jp-primary-upload_files
+ ${INSTALL} -m 755 mill_feed ${PREFIX}/examples/glite-jp-mill_feed
cd ${top_srcdir}/examples && ${INSTALL} -m 755 glite-jp-importer.sh ${PREFIX}/examples/
${INSTALL} -m 755 ${top_srcdir}/config/startup ${PREFIX}/etc/init.d/glite-jp-importer
cd ${top_srcdir}/interface && ${INSTALL} -m 644 ${HDRS} ${PREFIX}/${STAGETO}
--- /dev/null
+#include <signal.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <time.h>
+#include "jp_client.h"
+#include "jpimporter.h"
+#include "glite/lb/lb_maildir.h"
+#include "glite/wmsutils/jobid/cjobid.h"
+#include "glite/wmsutils/jobid/cjobid.h"
+
+
+#define USER "Job Generator Buddy"
+#define BKSERVER "funny.zcu.cz"
+#define BKPORT 9000
+#ifndef EDG_DUMP_STORAGE
+#define EDG_DUMP_STORAGE "/tmp/dump"
+#endif
+#ifndef EDG_PURGE_STORAGE
+#define EDG_PURGE_STORAGE "/tmp/purge"
+#endif
+
+
+char *jpreg_dir;
+char *dump_dir;
+char *user;
+int do_exit = 0;
+int perf_regs, perf_dumps;
+char perf_ts[100];
+
+
+static int register_init();
+static int register_add(const char *jobid);
+static void get_time(char *s, size_t maxs, double *t);
+static int dump_init();
+
+
+static void handler(int sig) {
+ do_exit = sig;
+ signal(sig, SIG_DFL);
+}
+
+int main(int argc, char *argv[]) {
+ char start_jobid[256], stop_jobid[256];
+ double ts, ts2;
+ int ret;
+
+ get_time(perf_ts, sizeof(perf_ts), &ts);
+ snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts);
+ snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts);
+
+ if ((ret = register_init()) != 0) return ret;
+ if ((ret = dump_init()) != 0) return ret;
+ if ((ret = register_add(start_jobid)) != 0) return ret;
+ if (signal(SIGINT, handler) == SIG_ERR) {
+ ret = errno;
+ fprintf(stderr, "%s: can't set signal handler: %s\n", __FUNCTION__, strerror(errno));
+ return ret;
+ }
+ printf("%s\n", start_jobid);
+ printf("start: %lf\n", ts);
+ while (!do_exit) {
+ if ((ret = register_add(NULL)) != 0) return ret;
+ if (argc > 1)
+ if ((ret = dump_add(argv[1])) != 0) return ret;
+ }
+ if ((ret = register_add(stop_jobid)) != 0) return ret;
+ get_time(NULL, -1, &ts2);
+ printf("stop: %lf\n", ts2);
+ printf("regs: %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (ts2-ts));
+ printf("dumps: %d (%lf jobs/day)\n", perf_dumps, 86400.0 * perf_dumps / (ts2-ts));
+ printf("%s\n", stop_jobid);
+
+ return 0;
+}
+
+
+static void get_time(char *s, size_t maxs, double *t) {
+ struct timeval tv;
+ struct tm tm;
+
+ gettimeofday(&tv, NULL);
+ if (t) *t = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+ gmtime_r(&tv.tv_sec, &tm);
+ if (s && maxs > 0) strftime(s, maxs, "%FT%TZ", &tm);
+}
+
+
+static int register_init() {
+ char *env;
+
+ env = getenv("GLITE_LB_EXPORT_JPREG_MAILDIR");
+ if (!env) env = GLITE_REG_IMPORTER_MDIR;
+ jpreg_dir = strdup(env);
+
+ // TODO: better from certificate
+ env = getenv("GLITE_USER");
+ if (!env) env = USER;
+ user = strdup(env);
+
+ if (edg_wll_MaildirInit(jpreg_dir) != 0) {
+ fprintf(stderr, "maildir init on %s failed\n", jpreg_dir);
+ return EIO;
+ }
+
+ perf_regs = 0;
+ return 0;
+}
+
+
+static int register_add(const char *jobid) {
+ edg_wlc_JobId j;
+ char *tmpjobid, *msg;
+
+ if (!jobid) {
+ if (edg_wlc_JobIdCreate(BKSERVER, BKPORT, &j) != 0 || (tmpjobid = edg_wlc_JobIdUnparse(j)) == NULL) {
+ fprintf(stderr, "Can't create jobid\n");
+ return EIO;
+ }
+ edg_wlc_JobIdFree(j);
+ } else tmpjobid = strdup(jobid);
+ asprintf(&msg, "%s\n%s", tmpjobid, user);
+ free(tmpjobid);
+ if (edg_wll_MaildirStoreMsg(jpreg_dir, BKSERVER, msg) != 0) {
+ fprintf(stderr, "Can't store message: %s\n", lbm_errdesc);
+ return EIO;
+ }
+ free(msg);
+
+ perf_regs++;
+ return 0;
+}
+
+
+static int dump_init() {
+ char *env;
+
+ // FIXME: is it OK? (probably different HEAD and branch)
+ env = getenv("GLITE_LB_EXPORT_DUMPDIR");
+ if (!env) env = EDG_DUMP_STORAGE;
+ dump_dir = strdup(env);
+ mkdir(dump_dir, 0755);
+ perf_dumps = 0;
+
+ return 0;
+}
+
+
+static int dump_add(const char *filename) {
+ char *fn;
+ int ret;
+
+ asprintf(&fn, "%s/mill-test-%s-%06d", dump_dir, perf_ts, perf_dumps);
+ if ((ret = link(filename, fn)) != 0) {
+ fprintf(stderr, "Can't link file: %s\n", strerror(errno));
+ }
+ free(fn);
+
+ perf_dumps++;
+ return ret;
+}
#include <string.h>
#include <assert.h>
#include <linux/limits.h>
+#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include "glite/security/glite_gscompat.h"
#include "globus_ftp_client.h"
+#include "jp_client.h"
+#include "jpimporter.h"
#if GSOAP_VERSION <= 20602
#define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob
#define GLITE_JPIMPORTER_PIDFILE "/var/run/glite-jpimporter.pid"
#endif
-#ifndef GLITE_REG_IMPORTER_MDIR
-#define GLITE_REG_IMPORTER_MDIR "/tmp/jpreg"
-#endif
-
-#ifndef GLITE_DUMP_IMPORTER_MDIR
-#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump"
-#endif
-
-#ifndef GLITE_SANDBOX_IMPORTER_MDIR
-#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox"
-#endif
-
#ifndef GLITE_JPPS
#define GLITE_JPPS "http://localhost:8901"
#endif
-
#define MAX_REG_CONNS 500
#define JPPS_NO_RESPONSE_TIMEOUT 120
-
static int debug = 0;
static int die = 0;
static int child_died = 0;
static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL;
static char *mysubj;
struct timeval to = {JPPS_NO_RESPONSE_TIMEOUT, 0};
+#ifdef JP_PERF
+int sink = 0;
+int perf_regs, perf_dumps, perf_sandboxes;
+char *perf_id = NULL;
+double perf_start, perf_end;
+#endif
static struct option opts[] = {
{ "pidfile", 1, NULL, 'i'},
{ "poll", 1, NULL, 't'},
{ "store", 1, NULL, 'S'},
+ { "store", 1, NULL, 'S'},
+#ifdef JP_PERF
+ { "perf-sink", 1, NULL, 'K'},
+#endif
{ NULL, 0, NULL, 0}
};
-static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:";
+static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"
+#ifdef JP_PERF
+ "K:"
+#endif
+;
#include "glite/jp/ws_fault.c"
"\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n"
"\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n"
"\t-i, --pidfile file to store master pid\n"
- "\t-t, --poll maildir polling interval (in seconds)\n",
- "\t-S, --store keep uploaded jobs in this directory\n",
- me);
+ "\t-t, --poll maildir polling interval (in seconds)\n"
+ "\t-S, --store keep uploaded jobs in this directory\n"
+#ifdef JP_PERF
+ "\t-K, --perf-sink 1=stats, 2=stats+run w/o WS calls\n"
+#endif
+ , me);
}
static void catchsig(int sig)
case 'd': strcpy(dump_mdir, optarg); break;
case 's': strcpy(sandbox_mdir, optarg); break;
case 'i': strcpy(pidfile, optarg); break;
+#ifdef JP_PERF
+ case 'K': sink = atoi(optarg); break;
+#endif
case '?': usage(name); return 1;
}
if ( optind < argc ) { usage(name); return 1; }
dprintf("[%s] slave started - pid [%d]\n", name, getpid());
- while ( !die && conn_cnt < MAX_REG_CONNS ) {
+#ifdef JP_PERF
+ while ( !die && (conn_cnt < MAX_REG_CONNS || (sink & 1)) ) {
+#else
+ while ( !die && (conn_cnt < MAX_REG_CONNS) ) {
+#endif
int ret = fn();
if ( ret > 0 ) conn_cnt++;
in.owner = aux;
dprintf("[%s] Registering '%s'\n", name, msg);
if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg);
+#ifdef JP_PERF
+ if (sink) {
+ struct timeval tv;
+
+ if (strncasecmp(msg, PERF_JOBID_START_PREFIX, sizeof(PERF_JOBID_START_PREFIX) - 1) == 0) {
+ perf_regs = 0;
+ perf_dumps = 0;
+ perf_sandboxes = 0;
+ free(perf_id);
+ perf_id = strdup(msg + sizeof(PERF_JOBID_START_PREFIX) - 1);
+ gettimeofday(&tv, NULL);
+ perf_start = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+ dprintf("[statistics] %s\n", perf_id);
+ } else if (strncasecmp(msg, PERF_JOBID_STOP_PREFIX, sizeof(PERF_JOBID_STOP_PREFIX) - 1) == 0) {
+ gettimeofday(&tv, NULL);
+ perf_end = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+ dprintf("[statistics] %s\n", perf_id);
+ dprintf("[statistics] start: %lf\n", perf_start);
+ dprintf("[statistics] stop: %lf\n", perf_end);
+ dprintf("[statistics] regs: %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (perf_end - perf_start));
+ } else perf_regs++;
+ }
+ if (!(sink & 2)) {
+#endif
ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty);
if ( (ret = check_soap_fault(soap, ret)) ) break;
+#ifdef JP_PERF
+ } else ret = 0;
+#endif
} while (0);
edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
free(fname);
su_in.contentType = "text/lb";
dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val);
if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg);
+#ifdef JP_PERF
+ if (sink) perf_dumps++;
+ if (!(sink & 2)) {
+#endif
ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
if ( (ret = check_soap_fault(soap, ret)) ) break;
dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore));
ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
if ( (ret = check_soap_fault(soap, ret)) ) break;
dprintf("[%s] Dump upload succesfull\n", name);
+#ifdef JP_PERF
+ } else ret = 0;
+#endif
if (store && *store) {
bname = strdup(tab[_file].val);
snprintf(fspec, sizeof fspec, "%s/%s", store, basename(bname));
su_in.contentType = "tar/lb";
dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val);
if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg);
+#ifdef JP_PERF
+ if (sink) perf_sandboxes++;
+ if (!(sink & 2)) {
+#endif
ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
ret = check_soap_fault(soap, ret);
/* XXX: grrrrrrr! test it!!!*/
ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
if ( (ret = check_soap_fault(soap, ret)) ) break;
dprintf("[%s] Dump upload succesfull\n", name);
+#ifdef JP_PERF
+ } else ret = 0;
+#endif
} while (0);
edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);