From adb590e0b0beaa84da7175580259beff4d465d56 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Jan=20Posp=C3=AD=C5=A1il?= Date: Sun, 10 Jun 2007 14:40:27 +0000 Subject: [PATCH] added purge/dump/load, no build yet --- org.glite.lb.utils/src/dump.c | 230 +++++++++++++++++++ org.glite.lb.utils/src/dump_exporter.c | 331 ++++++++++++++++++++++++++++ org.glite.lb.utils/src/load.c | 208 ++++++++++++++++++ org.glite.lb.utils/src/purge.c | 389 +++++++++++++++++++++++++++++++++ 4 files changed, 1158 insertions(+) create mode 100644 org.glite.lb.utils/src/dump.c create mode 100644 org.glite.lb.utils/src/dump_exporter.c create mode 100644 org.glite.lb.utils/src/load.c create mode 100644 org.glite.lb.utils/src/purge.c diff --git a/org.glite.lb.utils/src/dump.c b/org.glite.lb.utils/src/dump.c new file mode 100644 index 0000000..5e87516 --- /dev/null +++ b/org.glite.lb.utils/src/dump.c @@ -0,0 +1,230 @@ +#ident "$Header$" + + +#include +#include +#include +#include +#include +#include + +#include + +#define CLIENT_SBIN_PROG + +#include "glite/lb/context-int.h" +#include "glite/lb/ulm_parse.h" +#include "glite/lb/xml_parse.h" +#include "glite/lb/mini_http.h" + +#include "query.h" +#include "consumer.h" + +#define dprintf(x) { if (debug) printf x; } + +static const char rcsid[] = "@(#)$Id$"; + +static int debug=0; + +static void printerr(edg_wll_Context ctx); + +static struct option opts[] = { + { "from", required_argument, NULL, 'f'}, + { "to", required_argument, NULL, 't'}, + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, 'v' }, + { "debug", no_argument, NULL, 'd' }, + { "server", required_argument, NULL, 'm' }, + { NULL, no_argument, NULL, 0 } +}; + +static void usage(char *me) +{ + fprintf(stderr,"usage: %s [option]\n" + " -f, --from YYYYMMDDHHmmss beginning of the time interval for events to be dumped\n" + " -t, --to YYYYMMDDHHmmss end of the time interval for events to be dumped\n" + " -h, --help display this help\n" + " -v, --version display version\n" + " -d, --debug diagnostic output\n" + " -m, --server L&B server machine name\n", + me); +} + +int main(int argc,char *argv[]) +{ + edg_wll_DumpRequest *request; + edg_wll_DumpResult *result; + char *server = NULL; + char date[ULM_DATE_STRING_LENGTH+1]; + + char *me; + int opt; + edg_wll_Context ctx; + + /* initialize request to server defaults */ + request = (edg_wll_DumpRequest *) calloc(1,sizeof(edg_wll_DumpRequest)); + request->from = EDG_WLL_DUMP_LAST_END; + request->to = EDG_WLL_DUMP_NOW; + + /* initialize result */ + result = (edg_wll_DumpResult *) calloc(1,sizeof(edg_wll_DumpResult)); + + me = strrchr(argv[0],'/'); + if (me) me++; else me=argv[0]; + + /* get arguments */ + while ((opt = getopt_long(argc,argv,"f:t:m:dvh",opts,NULL)) != EOF) { + + switch (opt) { + + case 'f': request->from = (time_t) edg_wll_ULMDateToDouble(optarg); break; + case 't': request->to = (time_t) edg_wll_ULMDateToDouble(optarg); break; + case 'm': server = optarg; break; + case 'd': debug = 1; break; + case 'v': fprintf(stdout,"%s:\t%s\n",me,rcsid); exit(0); + case 'h': + case '?': usage(me); return 1; + } + } + + /* Initialize Globus common module */ + dprintf(("Initializing Globus common module...")); + if (globus_module_activate(GLOBUS_COMMON_MODULE) != GLOBUS_SUCCESS) { + dprintf(("no.\n")); + fprintf(stderr,"Unable to initialize Globus common module\n"); + } else { + dprintf(("yes.\n")); + } + + /* check request */ + if (debug) { + printf("Dump request:\n"); + if (request->from < 0) { + printf("- from: %ld.\n",(long) request->from); + } else { + if (edg_wll_ULMTimevalToDate(request->from,0,date) != 0) { + fprintf(stderr,"Error parsing 'from' argument.\n"); + goto main_end; + } + printf("- from: %ld (i.e. %s).\n",(long) request->from,date); + } + if (request->to < 0) { + printf("- to: %ld.\n",(long) request->to); + } else { + if (edg_wll_ULMTimevalToDate(request->to,0,date) != 0) { + fprintf(stderr,"Error parsing 'to' argument.\n"); + goto main_end; + } + printf("- to: %ld (i.e. %s).\n",(long) request->to,date); + } + } + + /* initialize context */ + edg_wll_InitContext(&ctx); + if ( server ) + { + char *p = strchr(server, ':'); + if ( p ) + { + edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT, atoi(p+1)); + *p = 0; + } + edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER, server); + } + + /* that is the DumpEvents */ + dprintf(("Running the edg_wll_DumpEvents...\n")); + if (edg_wll_DumpEvents(ctx, request, result) != 0) { + fprintf(stderr,"Error running the edg_wll_DumpEvents().\n"); + printerr(ctx); + switch ( edg_wll_Error(ctx, NULL, NULL) ) + { + case ENOENT: + case EPERM: + case EINVAL: + break; + default: + goto main_end; + } + } + + /* examine the result */ + dprintf(("Examining the result of edg_wll_DumpEvents...\n")); + printf("Dump result:\n"); + if (result->server_file) { + printf("- The jobs were dumped to the file '%s' at the server.\n",result->server_file); + } else { + printf("- The jobs were not dumped.\n"); + } + if (edg_wll_ULMTimevalToDate(result->from,0,date) != 0) { + fprintf(stderr,"Error parsing 'from' argument.\n"); + goto main_end; + } + printf("- from: %ld (i.e. %s).\n",(long) result->from,date); + if (edg_wll_ULMTimevalToDate(result->to,0,date) != 0) { + fprintf(stderr,"Error parsing 'to' argument.\n"); + goto main_end; + } + printf("- to: %ld (i.e. %s).\n",(long) result->to,date); + +main_end: + dprintf(("End.\n")); + if (request) free(request); + if (result) free(result); + edg_wll_FreeContext(ctx); + return 0; +} + + +static void printerr(edg_wll_Context ctx) +{ + char *errt,*errd; + + edg_wll_Error(ctx,&errt,&errd); + fprintf(stderr,"%s (%s)\n",errt,errd); +} + + +static const char* const request_headers[] = { + "Cache-Control: no-cache", + "Accept: application/x-dglb", + "User-Agent: edg_wll_Api/" PROTO_VERSION "/" COMP_PROTO, + "Content-Type: application/x-dglb", + NULL +}; + +int edg_wll_DumpEvents( + edg_wll_Context ctx, + const edg_wll_DumpRequest *request, + edg_wll_DumpResult *result) +{ + int error; + char *send_mess, + *response = NULL, + *recv_mess = NULL; + + edg_wll_DumpRequestToXML(ctx, request, &send_mess); + + ctx->p_tmp_timeout = ctx->p_query_timeout; + if (ctx->p_tmp_timeout.tv_sec < 600) ctx->p_tmp_timeout.tv_sec = 600; + + if (set_server_name_and_port(ctx, NULL)) + goto edg_wll_dumpevents_end; + + error = edg_wll_http_send_recv(ctx, + "POST /dumpRequest HTTP/1.1", request_headers, send_mess, + &response, NULL, &recv_mess); + if ( error != 0 ) + goto edg_wll_dumpevents_end; + + if (http_check_status(ctx, response, &recv_mess)) + goto edg_wll_dumpevents_end; + + edg_wll_ParseDumpResult(ctx, recv_mess, result); + +edg_wll_dumpevents_end: + if (response) free(response); + if (recv_mess) free(recv_mess); + if (send_mess) free(send_mess); + return edg_wll_Error(ctx,NULL,NULL); +} diff --git a/org.glite.lb.utils/src/dump_exporter.c b/org.glite.lb.utils/src/dump_exporter.c new file mode 100644 index 0000000..97a63ab --- /dev/null +++ b/org.glite.lb.utils/src/dump_exporter.c @@ -0,0 +1,331 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include "glite/lb/context.h" +#include "glite/lb/events.h" +#include "glite/lb/events_parse.h" +#include "glite/lb/lb_maildir.h" + + +#define DUMP_FILE_STORE_PREFIX "/tmp" +#define LB_MAILDIR_PATH "/tmp/dumpinfo_lbmd" + +#define KEYNAME_JOBID "jobid " +#define KEYNAME_FILE "file " +#define KEYNAME_JPPS "jpps " + +typedef struct _buffer_t { + char *str; + char *eol; + int sz; + int use; +} buffer_t; + +typedef struct _dump_storage_t { + char *job; + char *fname; + int fhnd; +} dump_storage_t; + +static const char *optstr = "d:s:j:m:h"; + +static struct option opts[] = { + { "help", 0, NULL, 'h'}, + { "dump", 0, NULL, 'd'}, + { "store", 0, NULL, 's'}, + { "jpps", 0, NULL, 'j'}, + { "lbmaildir", 0, NULL, 'm'}, + { NULL, 0, NULL, 0} +}; + +void usage(char *me) +{ + fprintf(stderr, "usage: %s [option]\n" + "\t-h, --help Shows this screen.\n" + "\t-d, --dump Dump file location.\n" + "\t-s, --store New dump files storage.\n" + "\t-j, --jpps Target JPPS.\n" + "\t-m, --lbmaildir LB maildir path.\n" + , me); +} + + +static int read_line(int, buffer_t *, char **); +static dump_storage_t *dump_storage_find(dump_storage_t *, char *); +static dump_storage_t *dump_storage_add(dump_storage_t **, char *, char *, int); +static void dump_storage_free(dump_storage_t *); + + +int main(int argc, char **argv) +{ + edg_wll_Context ctx; + edg_wll_Event *ev = NULL; + dump_storage_t *dstorage = NULL, + *st; + buffer_t buf; + char *store_pref = DUMP_FILE_STORE_PREFIX, + *lb_maildir = LB_MAILDIR_PATH, + *jpps = NULL, + *name, + *fname, + *ln; + int fhnd, + opt, ret, + msg_format_sz; + + + name = strrchr(argv[0], '/'); + if ( name ) name++; else name = argv[0]; + + fname = NULL; + while ( (opt = getopt_long(argc, argv, optstr, opts, NULL)) != EOF ) + switch ( opt ) { + case 'd': fname = optarg; break; + case 's': store_pref = optarg; break; + case 'j': jpps = optarg; break; + case 'm': lb_maildir = optarg; break; + case 'h': usage(name); return 0; + case '?': usage(name); return 1; + } + + msg_format_sz = sizeof(KEYNAME_JOBID) + 1 + + sizeof(KEYNAME_FILE) + 1 + + (jpps? sizeof(KEYNAME_JPPS) + 1: 0); + + if ( fname ) { + if ( (fhnd = open(fname, O_RDONLY)) < 0 ) { + perror("Opening input file"); + exit(1); + } + } else fhnd = 0; + + if ( edg_wll_MaildirInit(lb_maildir) ) { + perror(lbm_errdesc); + exit(1); + } + + +#ifndef cleanup +# define cleanup(r) { ret = (r); goto cleanup_lbl; } +#endif + + ret = 0; + memset(&buf, 0, sizeof(buf)); + edg_wll_InitContext(&ctx); + while ( 1 ) { + int rl_ret, + written, + lnsz, ct; + char *jobid, + *unique; + + if ( (rl_ret = read_line(fhnd, &buf, &ln)) < 0 ) { + perror("reading input file"); + cleanup(1); + } + if ( !ln ) break; + + if (*ln == 0) continue; + + if ( edg_wll_ParseEvent(ctx, ln, &ev) != 0 ) { + cleanup(1); + } + if ( !(jobid = edg_wlc_JobIdUnparse(ev->any.jobId)) ) { + perror("Can't unparse jobid from event"); + cleanup(1); + } + if ( !(unique = edg_wlc_JobIdGetUnique(ev->any.jobId)) ) { + perror("Can't unparse jobid from event"); + cleanup(1); + } + + if ( !(st = dump_storage_find(dstorage, jobid)) ) { + int fd, i; + char fname[PATH_MAX]; + struct timeval tv; + + i = 0; + while ( 1 ) { + if ( ++i > 10 ) { + errno = ECANCELED; + perror("Can't create dump file - max tries limit reached "); + cleanup(1); + } + gettimeofday(&tv, NULL); + snprintf(fname, PATH_MAX, "%s/%s.%ld_%ld", store_pref, unique, tv.tv_sec, tv.tv_usec); + if ( (fd = open(fname, O_CREAT|O_EXCL|O_RDWR, 00640)) < 0 ) { + if ( errno == EEXIST ) { usleep(1000); continue; } + perror(fname); + cleanup(1); + } + break; + } + + if ( !(st = dump_storage_add(&dstorage, jobid, fname, fd)) ) { + perror("Can't record dump informations"); + cleanup(1); + } + } + free(jobid); + free(unique); + + lnsz = strlen(ln); + ln[lnsz++] = '\n'; + written = 0; + while ( written < lnsz ) { + if ( (ct = write(st->fhnd, ln+written, lnsz-written)) < 0 ) { + if ( errno == EINTR ) { errno = 0; continue; } + perror(fname); + cleanup(1); + } + written += lnsz; + } + + + if ( !rl_ret ) break; + edg_wll_FreeEvent(ev); ev = NULL; + } + + /* store info in lb_maildir */ + for ( st = dstorage; st && st->job; st++ ) { + char *msg; + if ( !(msg = malloc(msg_format_sz + strlen(st->fname) + strlen(st->job))) ) { + perror("allocating message"); + cleanup(1); + } + if ( jpps ) + /* XXX: used to be 5x %s here, God knows why ... */ + sprintf(msg, "%s%s\n%s%s%s%s", + KEYNAME_JOBID, st->job, KEYNAME_FILE, st->fname, KEYNAME_JPPS, jpps); + else + sprintf(msg, "%s%s\n%s%s", + KEYNAME_JOBID, st->job, KEYNAME_FILE, st->fname); + if ( edg_wll_MaildirStoreMsg(lb_maildir, "localhost", msg) < 0 ) { + perror(lbm_errdesc); + exit(1); + } + free(msg); + } + +cleanup_lbl: + edg_wll_FreeContext(ctx); + if ( ev ) edg_wll_FreeEvent(ev); + for ( st = dstorage; st && st->job; st++ ) if ( st->fhnd > 0 ) close(st->fhnd); + dump_storage_free(dstorage); + close(fhnd); + free(buf.str); + + return (ret); +} + + +static dump_storage_t *dump_storage_find(dump_storage_t *st, char *job) +{ + while ( st && st->job ) { + if ( !strcmp(job, st->job) ) break; + st++; + } + if ( st && st->job ) return st; + return NULL; +} + +static dump_storage_t *dump_storage_add(dump_storage_t **st, char *job, char *fname, int fhnd) +{ + dump_storage_t *tmp; + int ct; + + for ( ct = 0, tmp = *st; tmp && tmp->job; ct++, tmp++ ) ; + if ( ct ) tmp = realloc(*st, (ct+2)*sizeof(*tmp)); + else tmp = calloc(2, sizeof(*tmp)); + if ( !tmp ) return NULL; + + *st = tmp; + while ( tmp && tmp->job ) tmp++; + + if ( !(tmp->job = strdup(job)) ) return NULL; + if ( !(tmp->fname = strdup(fname)) ) { free(tmp->job); return NULL; } + tmp->fhnd = fhnd; + (tmp+1)->job = NULL; + + return tmp; +} + +static void dump_storage_free(dump_storage_t *st) +{ + dump_storage_t *tmp; + for ( tmp = st; tmp && tmp->job; tmp++ ) { + free(tmp->job); + free(tmp->fname); + } + free(st); +} + +/* Reads line from file and returns pointer to that. + * Returned string is not mem allocated. It only points to the buffer! + * + * Buffer must be given - it is handled (allocated etc.) fully in the + * function. It has to be freed outside. + * + * returns: -1 on error + * 0 eof ('ln' could points to the last line in file) + * 1 line successfuly read + */ +static int read_line(int fhnd, buffer_t *buf, char **ln) +{ + int ct, toread; + + + if ( buf->eol ) { + int tmp = buf->eol - buf->str + 1; + + if ( tmp < buf->use ) { + char *aux; + if ( (aux = memchr(buf->eol+1, '\n', buf->use-tmp)) ) { + *ln = buf->eol+1; + *aux = 0; + buf->eol = aux; + return 1; + } + } + memmove(buf->str, buf->eol+1, buf->use - tmp); + buf->eol = NULL; + buf->use -= tmp; + } + + do { + if ( buf->use == buf->sz ) { + char *tmp = realloc(buf->str, buf->sz+BUFSIZ); + if ( !tmp ) return -1; + buf->str = tmp; + buf->sz += BUFSIZ; + } + + toread = buf->sz - buf->use; + if ( (ct = read(fhnd, buf->str+buf->use, toread)) < 0 ) { + if ( errno == EINTR ) continue; + return -1; + } + if ( ct == 0 ) break; + buf->eol = memchr(buf->str+buf->use, '\n', ct); + buf->use += ct; + } while ( ct == toread && !buf->eol ); + + *ln = buf->use? buf->str: NULL; + if ( buf->eol ) { + *buf->eol = 0; + return 1; + } + + return 0; +} diff --git a/org.glite.lb.utils/src/load.c b/org.glite.lb.utils/src/load.c new file mode 100644 index 0000000..efc453b --- /dev/null +++ b/org.glite.lb.utils/src/load.c @@ -0,0 +1,208 @@ +#ident "$Header$" + + +#include +#include +#include +#include +#include +#include + +#include + +#define CLIENT_SBIN_PROG + +#include "glite/lb/context-int.h" +#include "glite/lb/ulm_parse.h" +#include "glite/lb/xml_parse.h" +#include "glite/lb/mini_http.h" + +#include "query.h" +#include "consumer.h" + +#define dprintf(x) { if (debug) printf x; } + +static const char rcsid[] = "@(#)$Id$"; + +static int debug=0; + +static void printerr(edg_wll_Context ctx); + +static struct option opts[] = { + { "file", required_argument, NULL, 'f'}, + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, 'v' }, + { "debug", no_argument, NULL, 'd' }, + { "server", required_argument, NULL, 'm' }, + { NULL, no_argument, NULL, 0 } +}; + +static void usage(char *me) +{ + fprintf(stderr,"usage: %s [option]\n" + " -m, --server L&B server machine name\n" + " -f, --file filename file with dumped data to be loaded\n" + " -h, --help display this help\n" + " -v, --version display version\n" + " -d, --debug diagnostic output\n", + me); +} + +int main(int argc,char *argv[]) +{ + edg_wll_LoadRequest *request; + edg_wll_LoadResult *result; + char *server = NULL; + char date[ULM_DATE_STRING_LENGTH+1]; + + char *me; + int opt; + edg_wll_Context ctx; + + /* initialize request to server defaults */ + request = (edg_wll_LoadRequest *) calloc(1,sizeof(edg_wll_LoadRequest)); + request->server_file = NULL; + + /* initialize result */ + result = (edg_wll_LoadResult *) calloc(1,sizeof(edg_wll_LoadResult)); + + me = strrchr(argv[0],'/'); + if (me) me++; else me=argv[0]; + + /* get arguments */ + while ((opt = getopt_long(argc,argv,"f:t:m:dvh",opts,NULL)) != EOF) { + + switch (opt) { + + case 'f': request->server_file = optarg; break; + case 'm': server = optarg; break; + case 'd': debug = 1; break; + case 'v': fprintf(stdout,"%s:\t%s\n",me,rcsid); exit(0); + case 'h': + case '?': usage(me); return 1; + } + } + + /* Initialize Globus common module */ + dprintf(("Initializing Globus common module...")); + if (globus_module_activate(GLOBUS_COMMON_MODULE) != GLOBUS_SUCCESS) { + dprintf(("no.\n")); + fprintf(stderr,"Unable to initialize Globus common module\n"); + } else { + dprintf(("yes.\n")); + } + + /* initialize context */ + edg_wll_InitContext(&ctx); + if ( server ) + { + char *p = strchr(server, ':'); + if ( p ) + { + edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT, atoi(p+1)); + *p = 0; + } + edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER, server); + } + + /* check request */ + if (debug) { + printf("Load request:\n"); + if (request->server_file) { + printf("- server_file: %s.\n",request->server_file); + } else { + printf("- server_file: not specified.\n"); + } + } + + /* that is the LoadEvents */ + dprintf(("Running the edg_wll_LoadEvents...\n")); + if (edg_wll_LoadEvents(ctx, request, result) != 0) { + fprintf(stderr,"Error running the edg_wll_LoadEvents().\n"); + printerr(ctx); + if ( !result->server_file ) + goto main_end; + } + + /* examine the result */ + dprintf(("Examining the result of edg_wll_LoadEvents...\n")); + printf("Load result:\n"); + if (result->server_file) + printf("- Unloaded events were stored into the server file '%s'.\n", result->server_file); + if (edg_wll_ULMTimevalToDate(result->from,0,date) != 0) { + fprintf(stderr,"Error parsing 'from' argument.\n"); + goto main_end; + } + printf("- from: %ld (i.e. %s).\n",result->from,date); + if (edg_wll_ULMTimevalToDate(result->to,0,date) != 0) { + fprintf(stderr,"Error parsing 'to' argument.\n"); + goto main_end; + } + printf("- to: %ld (i.e. %s).\n",result->to,date); + +main_end: + dprintf(("End.\n")); + if (request) free(request); + if (result) + { + if (result->server_file) + free(result->server_file); + free(result); + } + edg_wll_FreeContext(ctx); + return 0; +} + + +static void printerr(edg_wll_Context ctx) +{ + char *errt,*errd; + + edg_wll_Error(ctx,&errt,&errd); + fprintf(stderr,"%s (%s)\n",errt,errd); +} + + +static const char* const request_headers[] = { + "Cache-Control: no-cache", + "Accept: application/x-dglb", + "User-Agent: edg_wll_Api/" PROTO_VERSION "/" COMP_PROTO, + "Content-Type: application/x-dglb", + NULL +}; + +int edg_wll_LoadEvents( + edg_wll_Context ctx, + const edg_wll_LoadRequest *request, + edg_wll_LoadResult *result) +{ + int error; + char *send_mess, + *response = NULL, + *recv_mess = NULL; + + edg_wll_LoadRequestToXML(ctx, request, &send_mess); + + ctx->p_tmp_timeout = ctx->p_query_timeout; + if (ctx->p_tmp_timeout.tv_sec < 600) ctx->p_tmp_timeout.tv_sec = 600; + + if (set_server_name_and_port(ctx, NULL)) + goto edg_wll_loadevents_end; + + error = edg_wll_http_send_recv(ctx, + "POST /loadRequest HTTP/1.1", request_headers, send_mess, + &response, NULL, &recv_mess); + if ( error != 0 ) + goto edg_wll_loadevents_end; + + if (http_check_status(ctx, response, &recv_mess)) + goto edg_wll_loadevents_end; + + edg_wll_ParseLoadResult(ctx, recv_mess, result); + +edg_wll_loadevents_end: + if (response) free(response); + if (recv_mess) free(recv_mess); + if (send_mess) free(send_mess); + return edg_wll_Error(ctx,NULL,NULL); +} diff --git a/org.glite.lb.utils/src/purge.c b/org.glite.lb.utils/src/purge.c new file mode 100644 index 0000000..e791d0a --- /dev/null +++ b/org.glite.lb.utils/src/purge.c @@ -0,0 +1,389 @@ +#ident "$Header$" + + +#include +#include +#include +#include +#include +#include + +#include + +#define CLIENT_SBIN_PROG + +#include "glite/lb/context-int.h" +#include "glite/lb/xml_parse.h" +#include "glite/lb/mini_http.h" + +#include "query.h" +#include "consumer.h" + +#define dprintf(x) { if (debug) printf x; } + +#define free_jobs(jobs) { \ + if (jobs) { \ + int i; \ + for ( i = 0; jobs[i]; i++ ) \ + free(jobs[i]); \ + free(jobs); \ + } \ +} + +static const char rcsid[] = "@(#)$Id$"; + +static int debug=0; +static char *file; + +static int read_jobIds(const char *file, char ***jobs_out); +static int get_timeout(const char *arg, int *timeout); +static void printerr(edg_wll_Context ctx); + +static struct option opts[] = { + { "aborted", required_argument, NULL, 'a'}, + { "cleared", required_argument, NULL, 'c'}, + { "cancelled", required_argument, NULL, 'n'}, + { "other", required_argument, NULL, 'o'}, + { "dry-run", no_argument, NULL, 'r'}, + { "jobs", required_argument, NULL, 'j'}, + { "return-list", no_argument, NULL, 'l'}, + { "server-dump", no_argument, NULL, 's'}, + { "client-dump", no_argument, NULL, 'i'}, + { "help", no_argument, NULL, 'h' }, + { "version", no_argument, NULL, 'v' }, + { "debug", no_argument, NULL, 'd' }, + { "server", required_argument, NULL, 'm' }, + { NULL, no_argument, NULL, 0 } +}; + +static void usage(char *me) +{ + fprintf(stderr,"usage: %s [option]\n" + " -a, --aborted NNN[smhd] purge ABORTED jobs older than NNN secs/mins/hours/days\n" + " -c, --cleared NNN[smhd] purge CLEARED jobs older than given time\n" + " -n, --cancelled NNN[smhd] purge CANCELLED jobs older than given time\n" + " -o, --other NNN[smhd] purge OTHER jobs older than given time\n" + " -r, --dry-run do not really purge\n" + " -j, --jobs input file with jobIds of jobs to purge\n" + " -l, --return-list return list of jobid matching the purge/dump criteria\n" + " -s, --server-dump dump jobs into any server file\n" + " -i, --client-dump receive stream of dumped jobs\n" + " -h, --help display this help\n" + " -v, --version display version\n" + " -d, --debug diagnostic output\n" + " -m, --server L&B server machine name\n", + me); +} + +int main(int argc,char *argv[]) +{ + edg_wll_PurgeRequest *request; + edg_wll_PurgeResult *result; + int i, timeout; + char *server = NULL; + + char *me; + int opt; + edg_wll_Context ctx; + + /* initialize request to server defaults */ + request = (edg_wll_PurgeRequest *) calloc(1,sizeof(edg_wll_PurgeRequest)); + request->jobs = NULL; + for (i=0; i < EDG_WLL_NUMBER_OF_STATCODES; i++) { + request->timeout[i]=-1; + } + request->flags = EDG_WLL_PURGE_REALLY_PURGE; + + /* initialize result */ + result = (edg_wll_PurgeResult *) calloc(1,sizeof(edg_wll_PurgeResult)); + + me = strrchr(argv[0],'/'); + if (me) me++; else me=argv[0]; + + /* get arguments */ + while ((opt = getopt_long(argc,argv,"a:c:n:o:j:m:rlsidhv",opts,NULL)) != EOF) { + timeout=-1; + + switch (opt) { + + case 'a': + if ((get_timeout(optarg,&timeout) != 0 )) { + printf("Wrong usage of timeout argument.\n"); + usage(me); + return 1; + } + if (timeout >= 0) { + request->timeout[EDG_WLL_JOB_ABORTED]=timeout; + } + break; + + case 'c': + if (get_timeout(optarg,&timeout) != 0 ) { + printf("Wrong usage of timeout argument.\n"); + usage(me); + return 1; + } + if (timeout >= 0) { + request->timeout[EDG_WLL_JOB_CLEARED]=timeout; + } + break; + + case 'n': + if (get_timeout(optarg,&timeout) != 0 ) { + printf("Wrong usage of timeout argument.\n"); + usage(me); + return 1; + } + if (timeout >= 0) { + request->timeout[EDG_WLL_JOB_CANCELLED]=timeout; + } + break; + case 'o': + if (get_timeout(optarg,&timeout) != 0 ) { + printf("Wrong usage of timeout argument.\n"); + usage(me); + return 1; + } + if (timeout >= 0) request->timeout[EDG_WLL_PURGE_JOBSTAT_OTHER]= timeout; + break; + + case 'm': server = optarg; break; + case 'j': file = optarg; break; + case 'r': request->flags &= (~EDG_WLL_PURGE_REALLY_PURGE); break; + case 'l': request->flags |= EDG_WLL_PURGE_LIST_JOBS; break; + case 's': request->flags |= EDG_WLL_PURGE_SERVER_DUMP; break; + case 'i': request->flags |= EDG_WLL_PURGE_CLIENT_DUMP; break; + case 'd': debug = 1; break; + case 'v': fprintf(stdout,"%s:\t%s\n",me,rcsid); exit(0); + case 'h': + case '?': usage(me); return 1; + } + } + + /* Initialize Globus common module */ + dprintf(("Initializing Globus common module...")); + if (globus_module_activate(GLOBUS_COMMON_MODULE) != GLOBUS_SUCCESS) { + dprintf(("no.\n")); + fprintf(stderr,"Unable to initialize Globus common module\n"); + } else { + dprintf(("yes.\n")); + } + + /* initialize context */ + edg_wll_InitContext(&ctx); + + /* read the jobIds from file, if wanted */ + if (file) { + char **jobs=NULL; + dprintf(("Reading jobIds form file \'%s\'...",file)); + if (read_jobIds(file,&jobs) != 0) { + dprintf(("no.\n")); + fprintf(stderr,"Unable to read jobIds from file \'%s\'\n",file); + goto main_end; + } else { + dprintf(("yes.\n")); + } + request->jobs = jobs; + } + + /* check request */ + if (debug) { + printf("Purge request:\n"); + printf("- flags: %d\n",request->flags); + printf("- %d timeouts:\n",EDG_WLL_NUMBER_OF_STATCODES); + for (i=0; i < EDG_WLL_NUMBER_OF_STATCODES; i++) { + char *stat=edg_wll_StatToString(i); + printf("\t%s: %ld\n",stat,request->timeout[i]); + if (stat) free(stat); + } + printf("- list of jobs:\n"); + if (!request->jobs) { + printf("Not specified.\n"); + } else { + for ( i = 0; request->jobs[i]; i++ ) + printf("%s\n", request->jobs[i]); + } + } + + if ( server ) + { + char *p = strchr(server, ':'); + if ( p ) + { + edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER_PORT, atoi(p+1)); + *p = 0; + } + edg_wll_SetParam(ctx, EDG_WLL_PARAM_QUERY_SERVER, server); + } + + /* that is the Purge */ + dprintf(("Running the edg_wll_Purge...\n")); + if (edg_wll_Purge(ctx, request, result) != 0) { + fprintf(stderr,"Error running the edg_wll_Purge().\n"); + printerr(ctx); + switch ( edg_wll_Error(ctx, NULL, NULL) ) + { + case ENOENT: + case EPERM: + case EINVAL: + break; + default: + goto main_end; + } + } + + /* examine the result */ + dprintf(("Examining the result of edg_wll_Purge...\n")); + if (result->server_file) { + printf("Server dump: %s\n",result->server_file); + } else { + printf("The jobs were not dumped.\n"); + } + if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { + printf("The following jobs %s purged:\n", + request->flags & EDG_WLL_PURGE_REALLY_PURGE ? "were" : "would be"); + if (!result->jobs) printf("None.\n"); + else { + int i; + for ( i = 0; result->jobs[i]; i++ ) + printf("%s\n",result->jobs[i]); + } + } + +main_end: + dprintf(("End.\n")); + if (request) + { + free_jobs(request->jobs); + free(request); + } + if (result) free(result); + edg_wll_FreeContext(ctx); + return 0; +} + + +static void printerr(edg_wll_Context ctx) +{ + char *errt,*errd; + + edg_wll_Error(ctx,&errt,&errd); + fprintf(stderr,"%s (%s)\n",errt,errd); +} + + +static int read_jobIds(const char *file, char ***jobs_out) +{ + FILE *jobIds = fopen(file,"r"); + char buf[256]; + char **jobs; + int cnt = 0; + + jobs = NULL; + + + if (!jobIds) { + perror(file); + return 1; + } + + while ( 1 ) { + char *nl; + if ( !fgets(buf,sizeof buf,jobIds) ) + { + if (feof(jobIds)) + break; + + free_jobs(jobs); + fprintf(stderr, "Error reading file\n"); + return 1; + } + nl = strchr(buf,'\n'); + if (nl) *nl = 0; + /* TODO: check if it is really jobId, e.g. by edg_wlc_JobIdParse() */ + + if ( !(jobs = realloc(jobs, (cnt+2)*sizeof(*jobs))) ) + { + perror("cond_parse()"); + return(1); + } + jobs[cnt++] = strdup(buf); + } + jobs[cnt] = NULL; + + fclose(jobIds); + *jobs_out = jobs; + + return 0; +} + +static int get_timeout(const char *arg, int *timeout) +{ + int t = -1; + char tunit = '\0'; + + if (sscanf(arg,"%d%c",&t,&tunit) > 0) { + if (tunit) { + switch (tunit) { + case 'd': t *= 86400; break; // 24*60*60 + case 'h': t *= 3600; break; // 60*60 + case 'm': t *= 60; break; + case 's': break; + default: fprintf(stderr,"Allowed time units are s,m,h,d\n"); + return -1; + } + } + } + if (t < 0) return -1; + *timeout = t; + return 0; +} + +static const char* const request_headers[] = { + "Cache-Control: no-cache", + "Accept: application/x-dglb", + "User-Agent: edg_wll_Api/" PROTO_VERSION "/" COMP_PROTO, + "Content-Type: application/x-dglb", + NULL +}; + +int edg_wll_Purge( + edg_wll_Context ctx, + edg_wll_PurgeRequest *request, + edg_wll_PurgeResult *result) +{ + char *send_mess, + *response = NULL, + *recv_mess = NULL; + + edg_wll_ResetError(ctx); + + if (request->flags & EDG_WLL_PURGE_CLIENT_DUMP) + return edg_wll_SetError(ctx,ENOSYS,"client dump"); + + if (edg_wll_PurgeRequestToXML(ctx, request, &send_mess)) + goto edg_wll_purge_end; + + ctx->p_tmp_timeout = ctx->p_query_timeout; + if (ctx->p_tmp_timeout.tv_sec < 600) ctx->p_tmp_timeout.tv_sec = 600; + + if (set_server_name_and_port(ctx, NULL)) + goto edg_wll_purge_end; + + if (edg_wll_http_send_recv(ctx, + "POST /purgeRequest HTTP/1.1", request_headers, send_mess, + &response, NULL, &recv_mess)) + goto edg_wll_purge_end; + + if (http_check_status(ctx, response, &recv_mess)) + goto edg_wll_purge_end; + + if (edg_wll_ParsePurgeResult(ctx, recv_mess, result)) + goto edg_wll_purge_end; + +edg_wll_purge_end: + if (response) free(response); + if (recv_mess) free(recv_mess); + if (send_mess) free(send_mess); + return edg_wll_Error(ctx,NULL,NULL); +} -- 1.8.2.3