--- /dev/null
+#include <sys/time.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <errno.h>
+#include <string.h>
+#include <time.h>
+#include <getopt.h>
+#include "jp_client.h"
+#include "jpimporter.h"
+#include "glite/lb/lb_maildir.h"
+#include "glite/jobid/cjobid.h"
+
+
+#define USER "Job Generator Buddy"
+#define BKSERVER "funny.zcu.cz"
+#define BKPORT 9000
+#ifndef EDG_DUMP_STORAGE
+#define EDG_DUMP_STORAGE "/tmp/dump"
+#endif
+#ifndef EDG_PURGE_STORAGE
+#define EDG_PURGE_STORAGE "/tmp/purge"
+#endif
+
+
+char *jpreg_dir;
+char *dump_dir;
+char *user;
+int do_exit = 0;
+int perf_regs, perf_dumps;
+char perf_ts[100];
+char *dump; char **dump_index; size_t dump_tokens;
+int speed = 0;
+double duration = 0.0;
+
+static struct option opts[] = {
+ { "help", 0, NULL, 'h'},
+ { "reg-mdir", 1, NULL, 'R'},
+ { "dump-mdir", 1, NULL, 'D'},
+ { "break", 1, NULL, 'b'},
+ { "dump", 1, NULL, 'd'},
+// { "sandbox-mdir",1, NULL, 's'},
+ { NULL, 0, NULL, 0}
+};
+static const char *get_opt_string = "hR:D:b:d:";
+
+static int register_init();
+static int register_add(const char *jobid, char **new_jobid);
+static void get_time(char *s, size_t maxs, double *t);
+static int dump_init(const char *start_jobid, const char *filenmae);
+static int dump_add(const char *filename, const char *jobid);
+static void dump_done();
+
+
+static void handler(int sig) {
+ do_exit = sig;
+ signal(sig, SIG_DFL);
+}
+
+
+static void usage(const char *program) {
+ fprintf(stderr, "Usage: %s [OPTIONS]\n"
+ "\t-R,--reg-mdir\n"
+ "\t-D,--dump-mdir\n"
+// "\t-s,--sandbox-mdir\n"
+ "\t-b,--break speed (jobs/day)\n"
+ "\t-d,--dump dump file\n"
+ , program);
+}
+
+
+int main(int argc, char *argv[]) {
+ char start_jobid[256], stop_jobid[256], *fn;
+ double ts, ts2, last, now;
+ int ret, opt;
+ FILE *f;
+ char *jobid, *dumpfile = NULL;
+
+ while ((opt = getopt_long(argc, argv, get_opt_string, opts, NULL)) != EOF)
+ switch (opt) {
+ case 'h': usage(argv[0]); return 0;
+ case 'R': jpreg_dir = strdup(optarg); break;
+ case 'D': dump_dir = strdup(optarg); break;
+ case 'b': speed = atoi(optarg); if (speed) duration = 24.0*3600.0*1000000.0/speed; break;
+ case 'd': dumpfile = optarg; break;
+ default: printf("opt: %c\n", opt); usage(argv[0]); return 1;
+ }
+
+ get_time(perf_ts, sizeof(perf_ts), &ts);
+ snprintf(start_jobid, sizeof(start_jobid), PERF_JOBID_START_PREFIX "%s", perf_ts);
+ snprintf(stop_jobid, sizeof(stop_jobid), PERF_JOBID_STOP_PREFIX "%s", perf_ts);
+
+ if ((ret = register_init()) != 0) return ret;
+ if ((ret = dump_init(start_jobid, dumpfile)) != 0) return ret;
+ if ((ret = register_add(start_jobid, NULL)) != 0) return ret;
+ if (signal(SIGINT, handler) == SIG_ERR) {
+ ret = errno;
+ fprintf(stderr, "%s: can't set signal handler: %s\n", __FUNCTION__, strerror(errno));
+ return ret;
+ }
+ if (speed) printf("speed: %d jobs/day (delay %lf)\n", speed, duration / 1000000.0);
+ else printf("speed: unlimited\n");
+ printf("dump: %s\n", dumpfile ? dumpfile : "(none)");
+ printf("reg-mdir: %s\n", jpreg_dir);
+ printf("dump-mdir: %s\n", dump_dir);
+ printf("start: %lf\n", ts);
+ printf("%s\n", start_jobid);
+ last = ts;
+ while (!do_exit) {
+ struct timeval tv;
+
+ if ((ret = register_add(NULL, &jobid)) != 0) return ret;
+// printf("%s\n", jobid);
+ if (dumpfile) {
+ if ((ret = dump_add(dumpfile, jobid)) != 0) return ret;
+// printf(" dumped %s\n", dumpfile);
+ }
+ free(jobid);
+ gettimeofday(&tv, NULL);
+ now = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+ if (now < last + duration) usleep(last + duration - now);
+ last = now;
+ }
+ if ((ret = register_add(stop_jobid, NULL)) != 0) return ret;
+ asprintf(&fn, PERF_STOP_FILE_FORMAT, perf_ts);
+ if ((f = fopen(fn, "wt")) == NULL) {
+ ret = errno;
+ free(fn);
+ fprintf(stderr, "Can' create file '%s': %s\n", fn, strerror(errno));
+ return ret;
+ }
+ free(fn);
+ fprintf(f, "reg-imp\t%d\n", perf_regs);
+ fprintf(f, "dump-imp\t%d\n", perf_dumps);
+ fclose(f);
+ dump_done();
+
+ get_time(NULL, -1, &ts2);
+ printf("stop: %lf\n", ts2);
+ printf("regs: %d (%lf jobs/day)\n", perf_regs, 86400.0 * perf_regs / (ts2-ts));
+ printf("dumps: %d (%lf jobs/day)\n", perf_dumps, 86400.0 * perf_dumps / (ts2-ts));
+ printf("%s\n", stop_jobid);
+
+ return 0;
+}
+
+
+static void get_time(char *s, size_t maxs, double *t) {
+ struct timeval tv;
+ struct tm tm;
+
+ gettimeofday(&tv, NULL);
+ if (t) *t = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+ gmtime_r(&tv.tv_sec, &tm);
+ if (s && maxs > 0) strftime(s, maxs, "%FT%TZ", &tm);
+}
+
+
+static int register_init() {
+ char *env;
+
+ if (!jpreg_dir) {
+ env = getenv("GLITE_LB_EXPORT_JPREG_MAILDIR");
+ if (env) jpreg_dir = strdup(env);
+ else jpreg_dir = strdup(GLITE_REG_IMPORTER_MDIR);
+ }
+
+
+ // TODO: better from certificate
+ env = getenv("GLITE_USER");
+ if (!env) env = USER;
+ user = strdup(env);
+
+ if (edg_wll_MaildirInit(jpreg_dir) != 0) {
+ fprintf(stderr, "maildir init on %s failed\n", jpreg_dir);
+ return EIO;
+ }
+
+ perf_regs = 0;
+ return 0;
+}
+
+
+static int register_add(const char *jobid, char **new_jobid) {
+ glite_jobid_t j;
+ char *tmpjobid, *msg;
+
+ if (!jobid) {
+ if (glite_jobid_create(BKSERVER, BKPORT, &j) != 0 || (tmpjobid = glite_jobid_unparse(j)) == NULL) {
+ fprintf(stderr, "Can't create jobid\n");
+ return EIO;
+ }
+ glite_jobid_free(j);
+ } else tmpjobid = strdup(jobid);
+ asprintf(&msg, "%s\n%s", tmpjobid, user);
+ if (new_jobid) *new_jobid = tmpjobid;
+ else free(tmpjobid);
+ if (edg_wll_MaildirStoreMsg(jpreg_dir, BKSERVER, msg) != 0) {
+ fprintf(stderr, "Can't store message: %s\n", lbm_errdesc);
+ return EIO;
+ }
+ free(msg);
+
+ perf_regs++;
+ return 0;
+}
+
+
+static int dump_init(const char *start_jobid, const char *filename) {
+ char *env, *ptr, *delim;
+ FILE *f;
+ long ssize;
+ size_t i, dump_maxtokens, size;
+ int ret;
+
+ unlink(PERF_START_FILE);
+
+ dump = NULL;
+ dump_index = NULL;
+ dump_tokens = 0;
+ if (filename) {
+ if ((f = fopen(filename, "rt")) == NULL) {
+ fprintf(stderr, "Can't open '%s': %s\n", filename, strerror(errno));
+ return EIO;
+ }
+ if (fseek(f, 0, SEEK_END) == -1 || (ssize = ftell(f)) == -1 || fseek(f, 0, SEEK_SET) == -1) {
+ fprintf(stderr, "Can't get position in '%s': %s\n", filename, strerror(errno));
+ return EIO;
+ }
+ dump = malloc(size = ssize);
+ if (fread(dump, size, 1, f) != 1) {
+ ret = errno;
+ fprintf(stderr, "Error reading %ld bytes from file: %s\n", ssize, strerror(errno));
+ return ret;
+ }
+ fclose(f);
+
+ dump_maxtokens = 1024;
+ dump_index = malloc(sizeof(char *) * dump_maxtokens);
+ i = 0;
+ ptr = dump;
+ do {
+ if (dump_tokens >= dump_maxtokens) {
+ dump_maxtokens *= 2;
+ dump_index = realloc(dump_index, sizeof(char *) * dump_maxtokens);
+ }
+ delim = strstr(ptr, "DG.JOBID=\"");
+ if (delim != ptr) {
+ dump_index[dump_tokens++] = ptr;
+ if (delim) {
+ delim[10] = '\0';
+ ptr = delim + 11;
+ } else ptr = NULL;
+ }
+ if (ptr) ptr = strchr(ptr, '\"');
+ } while (ptr && ptr[0]);
+ }
+//for (i = 0; i < dump_tokens; i++) printf("####%s\n", dump_index[i]);
+
+ if (!dump_dir) {
+ // wrong purging to GLITE_LB_EXPORT_DUMPDIR on older versions
+ env = getenv("GLITE_LB_EXPORT_PURGEDIR");
+ if (env) dump_dir = strdup(env);
+ else dump_dir = strdup(EDG_DUMP_STORAGE);
+ }
+ mkdir(dump_dir, 0755);
+ perf_dumps = 0;
+
+ if ((f = fopen(PERF_START_FILE, "wt")) == NULL) {
+ fprintf(stderr, "Can't create file '" PERF_START_FILE "': %s\n", strerror(errno));
+ return EIO;
+ }
+ if (start_jobid) fprintf(f, "%s\n", start_jobid);
+ fclose(f);
+
+ return 0;
+}
+
+
+static int dump_add(const char *filename, const char *jobid) {
+ char *fn;
+ int ret;
+ size_t i;
+ FILE *f;
+
+ ret = 0;
+ asprintf(&fn, "%s/mill-test-%s-%06d", dump_dir, perf_ts, perf_dumps);
+ if ((f = fopen(fn , "wt")) == NULL) {
+ ret = errno;
+ fprintf(stderr, "Can't create file '%s': %s\n", fn, strerror(errno));
+ goto err;
+ }
+ for (i = 0; i < dump_tokens; i++) {
+ if (fputs(dump_index[i], f) == EOF || (i + 1 < dump_tokens && (fputs(jobid, f) == EOF))) {
+ ret = errno;
+ fprintf(stderr, "Can't write to '%s': %s\n", fn, strerror(errno));
+ goto err_close;
+ }
+ }
+
+ perf_dumps++;
+err_close:
+ fclose(f);
+err:
+ free(fn);
+ return ret;
+}
+
+
+static void dump_done() {
+ free(dump_index);
+ free(dump);
+ dump_tokens = 0;
+}
#include <string.h>
#include <assert.h>
#include <linux/limits.h>
+#include <sys/time.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <syslog.h>
#include <fcntl.h>
#include <libgen.h>
+#include <ctype.h>
#include "glite/lb/lb_maildir.h"
#include "glite/security/glite_gscompat.h"
#include "globus_ftp_client.h"
+#include "jp_client.h"
+#include "jpimporter.h"
#if GSOAP_VERSION <= 20602
#define soap_call___jpsrv__RegisterJob soap_call___ns1__RegisterJob
char *val;
} msg_pattern_t;
-
#ifndef dprintf
#define dprintf(FMT, ARGS...) { if (debug) printf(FMT, ##ARGS); }
#endif
#define GLITE_JPIMPORTER_PIDFILE "/var/run/glite-jpimporter.pid"
#endif
-#ifndef GLITE_REG_IMPORTER_MDIR
-#define GLITE_REG_IMPORTER_MDIR "/tmp/jpreg"
-#endif
-
-#ifndef GLITE_DUMP_IMPORTER_MDIR
-#define GLITE_DUMP_IMPORTER_MDIR "/tmp/jpdump"
-#endif
-
-#ifndef GLITE_SANDBOX_IMPORTER_MDIR
-#define GLITE_SANDBOX_IMPORTER_MDIR "/tmp/jpsandbox"
-#endif
-
#ifndef GLITE_JPPS
#define GLITE_JPPS "http://localhost:8901"
#endif
-
#define MAX_REG_CONNS 500
#define JPPS_NO_RESPONSE_TIMEOUT 120
-
+#define JPREG_REPEAT_TIMEOUT 300
+#define JPREG_GIVUP_TIMEOUT 3000
+#define JP_REPEAT_TIMEOUT 360
+#define JP_GIVUP_TIMEOUT 3600
static int debug = 0;
static int die = 0;
*server_key = NULL,
*cadir = NULL;
static edg_wll_GssCred mycred = NULL;
-static char *mysubj;
-struct timeval to = {JPPS_NO_RESPONSE_TIMEOUT, 0};
+#ifdef JP_PERF
+typedef struct {
+ char *id, *name;
+ long int count, limit;
+ double start, end;
+} perf_t;
+
+int sink = 0;
+perf_t perf = {name:NULL,};
+#endif
static struct option opts[] = {
{ "pidfile", 1, NULL, 'i'},
{ "poll", 1, NULL, 't'},
{ "store", 1, NULL, 'S'},
+ { "store", 1, NULL, 'S'},
+#ifdef JP_PERF
+ { "perf-sink", 1, NULL, 'K'},
+#endif
{ NULL, 0, NULL, 0}
};
-static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:";
+static const char *get_opt_string = "hgp:r:d:s:i:t:c:k:C:"
+#ifdef JP_PERF
+ "K:"
+#endif
+;
#include "glite/jp/ws_fault.c"
+#ifdef JP_PERF
+static void stats_init(perf_t *perf, const char *name);
+static void stats_set_jobid(perf_t *perf, const char *jobid);
+static void stats_get_limit(perf_t *perf, const char *name);
+static void stats_done(perf_t *perf);
+#endif
+
static void usage(char *me)
{
fprintf(stderr,"usage: %s [option]\n"
"\t-d, --dump-mdir path to the 'LB maildir' subtree for LB dumps\n"
"\t-s, --sandbox-mdir path to the 'LB maildir' subtree for input/output sandboxes\n"
"\t-i, --pidfile file to store master pid\n"
- "\t-t, --poll maildir polling interval (in seconds)\n",
- "\t-S, --store keep uploaded jobs in this directory\n",
- me);
+ "\t-t, --poll maildir polling interval (in seconds)\n"
+ "\t-S, --store keep uploaded jobs in this directory\n"
+#ifdef JP_PERF
+ "\t-K, --perf-sink 1=stats, 2=without WS calls, 3=stats+without WS\n"
+#endif
+ , me);
}
static void catchsig(int sig)
die = sig;
}
-static void catch_chld(int sig)
+static void catch_chld(int sig __attribute__((unused)))
{
child_died = 1;
}
static int sandbox_importer(void);
static int parse_msg(char *, msg_pattern_t []);
static int gftp_put_file(const char *, int);
-
+static int refresh_connection(struct soap *soap);
int main(int argc, char *argv[])
char *name,
pidfile[PATH_MAX] = GLITE_JPIMPORTER_PIDFILE;
glite_gsplugin_Context plugin_ctx;
- edg_wll_GssCred cred;
name = strrchr(argv[0],'/');
if (name) name++; else name = argv[0];
case 'd': strcpy(dump_mdir, optarg); break;
case 's': strcpy(sandbox_mdir, optarg); break;
case 'i': strcpy(pidfile, optarg); break;
+#ifdef JP_PERF
+ case 'K': sink = atoi(optarg); break;
+#endif
case '?': usage(name); return 1;
}
if ( optind < argc ) { usage(name); return 1; }
" - unable to watch them for changes!\n", argv[0]);
if ( cadir ) setenv("X509_CERT_DIR", cadir, 1);
edg_wll_gss_watch_creds(server_cert, &cert_mtime);
- if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &mysubj, &gss_code) ) {
- dprintf("[master] Server identity: %s\n", mysubj);
+ if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &mycred, &gss_code) ) {
+ dprintf("[master] Server identity: %s\n", mycred->name);
} else {
char *errmsg;
edg_wll_gss_get_error(&gss_code, "edg_wll_gss_acquire_cred_gsi()", &errmsg);
soap_set_namespaces(soap, jpps__namespaces);
glite_gsplugin_init_context(&plugin_ctx);
- if (edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &cred, NULL, NULL) != 0) {
- perror("can't acquire credentials");
- exit(1);
- }
- glite_gsplugin_set_timeout(plugin_ctx, &to);
- glite_gsplugin_set_credential(plugin_ctx, mycred);
-
soap_register_plugin_arg(soap, glite_gsplugin,plugin_ctx);
if ( (reg_pid = slave(reg_importer, "reg-imp")) < 0 ) {
dprintf("[%s] slave started - pid [%d]\n", name, getpid());
- while ( !die && conn_cnt < MAX_REG_CONNS ) {
+#ifdef JP_PERF
+ while ( !die && (conn_cnt < MAX_REG_CONNS || (sink & 1)) ) {
+#else
+ while ( !die && (conn_cnt < MAX_REG_CONNS) ) {
+#endif
int ret = fn();
if ( ret > 0 ) conn_cnt++;
}
if ( die ) {
- dprintf("[%s] Terminating on signal %d\n", name, getpid(), die);
+ dprintf("[%s] %d: Terminating on signal %d\n", name, getpid(), die);
if ( !debug ) syslog(LOG_INFO, "Terminating on signal %d", die);
}
dprintf("[%s] Terminating after %d connections\n", name, conn_cnt);
*aux;
if ( readnew ) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname);
- else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)30, (time_t)500, &msg, &fname);
+ else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)JPREG_REPEAT_TIMEOUT, (time_t)JPREG_GIVUP_TIMEOUT, &msg, &fname);
if ( !ret ) {
readnew = !readnew;
if ( readnew ) ret = edg_wll_MaildirTransStart(reg_mdir, &msg, &fname);
- else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)30, (time_t)500, &msg, &fname);
+ else ret = edg_wll_MaildirRetryTransStart(reg_mdir, (time_t)JPREG_REPEAT_TIMEOUT, (time_t)JPREG_GIVUP_TIMEOUT, &msg, &fname);
if ( !ret ) {
readnew = !readnew;
return 0;
dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc);
if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
return -1;
- }
-
- if ( ret < 0 ) {
- dprintf("[%s] edg_wll_MaildirTransStart: %s (%s)\n", name, strerror(errno), lbm_errdesc);
- if ( !debug ) syslog(LOG_ERR, "edg_wll_MaildirTransStart: %s (%s)", strerror(errno), lbm_errdesc);
- return -1;
} else if ( ret > 0 ) {
dprintf("[%s] JP registration request received\n", name);
if ( !debug ) syslog(LOG_INFO, "JP registration request received\n");
in.owner = aux;
dprintf("[%s] Registering '%s'\n", name, msg);
if ( !debug ) syslog(LOG_INFO, "Registering '%s'\n", msg);
+#ifdef JP_PERF
+ if ((sink & 1)) {
+ if (strncasecmp(msg, PERF_JOBID_START_PREFIX, sizeof(PERF_JOBID_START_PREFIX) - 1) == 0) {
+ stats_init(&perf, name);
+ stats_set_jobid(&perf, msg);
+ }
+ if (perf.name && !perf.limit) stats_get_limit(&perf, name);
+ }
+ if (!(sink & 2)) {
+#endif
+ refresh_connection(soap);
ret = soap_call___jpsrv__RegisterJob(soap, jpps, "", &in, &empty);
if ( (ret = check_soap_fault(soap, ret)) ) break;
+#ifdef JP_PERF
+ } else ret = 0;
+ if (perf.name && ret == 0) {
+ perf.count++;
+ if (perf.limit) {
+ dprintf("[%s statistics] done %ld/%ld\n", name, perf.count, perf.limit);
+ if (perf.count >= perf.limit) stats_done(&perf);
+ } else
+ dprintf("[%s statistics] done %ld/no limit\n", name, perf.count);
+ }
+#endif
} while (0);
edg_wll_MaildirTransEnd(reg_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
free(fname);
static int readnew = 1;
char *msg = NULL,
*fname = NULL,
- *aux,
*bname;
char fspec[PATH_MAX];
int ret;
if ( readnew ) ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname);
- else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, (time_t)600, &msg, &fname);
+ else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT, &msg, &fname);
if ( !ret ) {
readnew = !readnew;
if ( readnew ) ret = edg_wll_MaildirTransStart(dump_mdir, &msg, &fname);
- else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)60, (time_t)600, &msg, &fname);
+ else ret = edg_wll_MaildirRetryTransStart(dump_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT, &msg, &fname);
if ( !ret ) {
readnew = !readnew;
return 0;
su_in.contentType = "text/lb";
dprintf("[%s] Importing LB dump file '%s'\n", name, tab[_file].val);
if ( !debug ) syslog(LOG_INFO, "Importing LB dump file '%s'\n", msg);
+#ifdef JP_PERF
+ if ((sink & 1)) {
+ /* statistics started by file, ended by count limit (from the appropriate result fikle) */
+ FILE *f;
+ char item[200];
+
+ /* starter */
+ if (!perf.name) {
+ f = fopen(PERF_START_FILE, "rt");
+ if (f) {
+ stats_init(&perf, name);
+ fscanf(f, "%s", item);
+ fclose(f);
+ unlink(PERF_START_FILE);
+ stats_set_jobid(&perf, item);
+ } else
+ dprintf("[%s statistics]: not started/too much dumps: %s\n", name, strerror(errno));
+ }
+ if (perf.name && !perf.limit) stats_get_limit(&perf, name);
+ }
+ if (!(sink & 2)) {
+#endif
+ refresh_connection(soap);
ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
if ( (ret = check_soap_fault(soap, ret)) ) break;
dprintf("[%s] Destination: %s\n\tCommit before: %s\n", name, su_out.destination, ctime(&su_out.commitBefore));
close(fhnd);
dprintf("[%s] File sent, commiting the upload\n", name);
cu_in.destination = su_out.destination;
+ refresh_connection(soap);
ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
if ( (ret = check_soap_fault(soap, ret)) ) break;
dprintf("[%s] Dump upload succesfull\n", name);
+#ifdef JP_PERF
+ } else ret = 0;
+ if (perf.name && ret == 0) {
+ perf.count++;
+ if (perf.limit) {
+ dprintf("[%s statistics] done %ld/%ld\n", name, perf.count, perf.limit);
+ if (perf.count >= perf.limit) stats_done(&perf);
+ } else
+ dprintf("[%s statistics] done %ld/no limit\n", name, perf.count);
+ }
+#endif
if (store && *store) {
bname = strdup(tab[_file].val);
snprintf(fspec, sizeof fspec, "%s/%s", store, basename(bname));
struct _jpelem__CommitUploadResponse empty;
static int readnew = 1;
char *msg = NULL,
- *fname = NULL,
- *aux;
+ *fname = NULL;
int ret;
int fhnd;
msg_pattern_t tab[] = {
if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname);
- else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, (time_t) 600,&msg, &fname);
+ else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT ,&msg, &fname);
if ( !ret ) {
readnew = !readnew;
if ( readnew ) ret = edg_wll_MaildirTransStart(sandbox_mdir, &msg, &fname);
- else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)60, (time_t) 600,&msg, &fname);
+ else ret = edg_wll_MaildirRetryTransStart(sandbox_mdir, (time_t)JP_REPEAT_TIMEOUT, (time_t)JP_GIVUP_TIMEOUT ,&msg, &fname);
if ( !ret ) {
readnew = !readnew;
return 0;
su_in.contentType = "tar/lb";
dprintf("[%s] Importing LB sandbox tar file '%s'\n", name, tab[_file].val);
if ( !debug ) syslog(LOG_INFO, "Importing LB sandbox tar file '%s'\n", msg);
+#ifdef JP_PERF
+ if (!(sink & 2)) {
+#endif
+ refresh_connection(soap);
ret = soap_call___jpsrv__StartUpload(soap, tab[_jpps].val?:jpps, "", &su_in, &su_out);
ret = check_soap_fault(soap, ret);
/* XXX: grrrrrrr! test it!!!*/
close(fhnd);
dprintf("[%s] File sent, commiting the upload\n", name);
cu_in.destination = su_out.destination;
+ refresh_connection(soap);
ret = soap_call___jpsrv__CommitUpload(soap, tab[_jpps].val?:jpps, "", &cu_in, &empty);
if ( (ret = check_soap_fault(soap, ret)) ) break;
dprintf("[%s] Dump upload succesfull\n", name);
+#ifdef JP_PERF
+ } else ret = 0;
+#endif
} while (0);
edg_wll_MaildirTransEnd(sandbox_mdir, fname, ret? LBMD_TRANS_FAILED_RETRY: LBMD_TRANS_OK);
put_file_err("Could not initialise operation attributes");
if ( globus_ftp_client_operationattr_set_authorization(
- &op_attr, server_cert? mycred: GSS_C_NO_CREDENTIAL,
+ &op_attr, server_cert? mycred->gss_cred: GSS_C_NO_CREDENTIAL,
NULL, "", 0, NULL) != GLOBUS_SUCCESS )
put_file_err("Could not set authorization procedure");
return (gError == GLOBUS_TRUE)? 1: 0;
}
+
+static int refresh_connection(struct soap *soap) {
+ struct timeval to = {JPPS_NO_RESPONSE_TIMEOUT, 0};
+ edg_wll_GssCred newcred;
+ edg_wll_GssStatus gss_code;
+ glite_gsplugin_Context gp_ctx;
+
+ gp_ctx = glite_gsplugin_get_context(soap);
+ glite_gsplugin_set_timeout(gp_ctx, &to);
+
+ switch ( edg_wll_gss_watch_creds(server_cert, &cert_mtime) ) {
+ case 0: break;
+ case 1:
+ if ( !edg_wll_gss_acquire_cred_gsi(server_cert, server_key, &newcred, &gss_code) ) {
+ dprintf("[%s] reloading credentials successful\n", name);
+ edg_wll_gss_release_cred(&mycred, &gss_code);
+ mycred = newcred;
+ glite_gsplugin_set_credential(gp_ctx, newcred);
+ } else { dprintf("[%s] reloading credentials failed, using old ones\n", name); }
+ break;
+ case -1: dprintf("[%s] edg_wll_gss_watch_creds failed\n", name); break;
+ }
+
+ return 0;
+}
+
+
+#ifdef JP_PERF
+static void stats_init(perf_t *perf, const char *name) {
+ struct timeval tv;
+
+ memset(perf, 0, sizeof *perf);
+ perf->count = 0;
+ perf->name = strdup(name);
+ gettimeofday(&tv, NULL);
+ perf->start = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+ dprintf("[%s statistics] start detected\n", name);
+}
+
+static void stats_set_jobid(perf_t *perf, const char *jobid) {
+ perf->id = strdup(jobid + sizeof(PERF_JOBID_START_PREFIX) - 1);
+ dprintf("[%s statistics] ID %s\n", perf->name, perf->id);
+}
+
+static void stats_get_limit(perf_t *perf, const char *name) {
+ FILE *f;
+ char *fn, item[200];
+ int count;
+
+ /* stopper */
+ asprintf(&fn, PERF_STOP_FILE_FORMAT, perf->id);
+ f = fopen(fn, "rt");
+ free(fn);
+ if (f) {
+ fscanf(f, "%s\t%d", item, &count);
+ if (strcasecmp(item, name) != 0) fscanf(f, "%s\t%d", item, &count);
+ dprintf("[%s statistics] expected %d %s\n", name, count, item);
+ fclose(f);
+ perf->limit = count;
+ }
+}
+
+static void stats_done(perf_t *perf) {
+ struct timeval tv;
+
+ gettimeofday(&tv, NULL);
+ perf->end = tv.tv_sec + (double)tv.tv_usec / 1000000.0;
+ dprintf("[%s statistics] %s\n", perf->name, perf->id);
+ dprintf("[%s statistics] start: %lf\n", perf->name, perf->start);
+ dprintf("[%s statistics] stop: %lf\n", perf->name, perf->end);
+ dprintf("[%s statistics] count: %ld (%lf job/day)\n", perf->name, perf->count, 86400.0 * perf->count / (perf->end - perf->start));
+ free(perf->id);
+ free(perf->name);
+ memset(perf, 0, sizeof *perf);
+}
+#endif
+
/* XXX: we don't use it */
SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} };