From 982af7356f2674751e61df73d8236d4eef7fa810 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20=C5=A0kr=C3=A1bal?= Date: Thu, 14 Jul 2005 16:31:49 +0000 Subject: [PATCH] - adding basics of 'maildir' fonctionality to LB - used for JPPS job async. registration --- org.glite.lb.common/Makefile | 15 +- org.glite.lb.common/interface/context-int.h | 5 +- org.glite.lb.common/interface/lb_maildir.h | 22 +++ org.glite.lb.common/src/lb_maildir.c | 257 ++++++++++++++++++++++++++++ org.glite.lb.common/src/param.c | 11 ++ 5 files changed, 305 insertions(+), 5 deletions(-) create mode 100644 org.glite.lb.common/interface/lb_maildir.h create mode 100644 org.glite.lb.common/src/lb_maildir.c diff --git a/org.glite.lb.common/Makefile b/org.glite.lb.common/Makefile index b107021..8d7c5e9 100644 --- a/org.glite.lb.common/Makefile +++ b/org.glite.lb.common/Makefile @@ -83,18 +83,21 @@ INSTALL:=libtool --mode=install install OBJS:=lb_plain_io.o escape.o events.o mini_http.o query_rec.o \ status.o xml_conversions.o xml_parse.o ulm_parse.o param.o \ events_parse.o il_string.o il_int.o notifid.o \ - il_log.o il_msg.o log_msg.o context.o trio.o strio.o + il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o LOBJS:=${OBJS:.o=.lo} TRIO_OBJS:=escape.o trio.o strio.o TRIO_LIB:=libglite_lb_trio.a +MAILDIR_OBJS:=lb_maildir.o +MAILDIR_LIB:=libglite_lb_maildir.a + THROBJS:=${OBJS:.o=.thr.o} THRLOBJS:=${OBJS:.o=.thr.lo} HDRS:=context-int.h lb_plain_io.h mini_http.h authz.h xml_parse.h \ xml_conversions.h log_proto.h events_parse.h il_string.h il_msg.h \ - escape.h ulm_parse.h trio.h + escape.h ulm_parse.h trio.h lb_maildir.h STATICLIB:=libglite_lb_common_${nothrflavour}.a THRSTATICLIB:=libglite_lb_common_${thrflavour}.a @@ -105,7 +108,7 @@ REPORTS:=${top_srcdir}/reports default: all -all compile: check_version ${STATICLIB} ${LTLIB} ${THRSTATICLIB} ${THRLTLIB} ${TRIO_LIB} +all compile: check_version ${STATICLIB} ${LTLIB} ${THRSTATICLIB} ${THRLTLIB} ${TRIO_LIB} ${MAILDIR_LIB} # all compile: ${STATICLIB} ${LTLIB} ${THRSTATICLIB} ${THRLTLIB} version_info=-version-info `echo ${version} | cut -d. -f1,2 | tr . :` @@ -118,6 +121,10 @@ ${TRIO_LIB}: ${TRIO_OBJS} ar crv $@ ${TRIO_OBJS} ranlib $@ +${MAILDIR_LIB}: ${MAILDIR_OBJS} + ar crv $@ ${MAILDIR_OBJS} + ranlib $@ + ${THRSTATICLIB}: ${THROBJS} ar crv $@ ${THROBJS} ranlib $@ @@ -181,7 +188,7 @@ install: if [ x${DOSTAGE} = xyes ]; then \ mkdir -p ${PREFIX}/include/${globalprefix}/${lbprefix} ; \ (cd ${top_srcdir}/interface && install -m 644 ${HDRS} ${PREFIX}/include/${globalprefix}/${lbprefix}) ; \ - install -m 644 ${STATICLIB} ${THRSTATICLIB} ${TRIO_LIB} ${PREFIX}/lib; \ + install -m 644 ${STATICLIB} ${THRSTATICLIB} ${TRIO_LIB} ${MAILDIR_LIB} ${PREFIX}/lib; \ fi clean: diff --git a/org.glite.lb.common/interface/context-int.h b/org.glite.lb.common/interface/context-int.h index 973a2da..9d68ded 100644 --- a/org.glite.lb.common/interface/context-int.h +++ b/org.glite.lb.common/interface/context-int.h @@ -73,7 +73,7 @@ struct _edg_wll_Context { int is_V21; /* true if old (V21) request arrived */ int isProxy; /* LBProxy */ - + /* server limits */ int softLimit; int hardJobsLimit; @@ -85,6 +85,9 @@ struct _edg_wll_Context { char *dumpStorage; char *purgeStorage; +/* maildir location */ + char *jpreg_dir; + /* flag for function store_event * if set then event are loaded from dump file */ diff --git a/org.glite.lb.common/interface/lb_maildir.h b/org.glite.lb.common/interface/lb_maildir.h new file mode 100644 index 0000000..4c918d4 --- /dev/null +++ b/org.glite.lb.common/interface/lb_maildir.h @@ -0,0 +1,22 @@ +#ifndef LB_MAILDIR +#define LB_MAILDIR + +/* + * Functions for reading and writing messages via + * maildir protocol. + * Used when registering job to the JP, i.e. + */ + +enum { + LBMD_TRANS_OK, + LBMD_TRANS_FAILED +}; + +extern char lbm_errdesc[]; + +extern int edg_wll_MaildirInit(const char *); +extern int edg_wll_MaildirStoreMsg(const char *, const char *, const char *); +extern int edg_wll_MaildirTransStart(const char *, char **, char **); +extern int edg_wll_MaildirTransEnd(const char *, char *, int); + +#endif diff --git a/org.glite.lb.common/src/lb_maildir.c b/org.glite.lb.common/src/lb_maildir.c new file mode 100644 index 0000000..be4e180 --- /dev/null +++ b/org.glite.lb.common/src/lb_maildir.c @@ -0,0 +1,257 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "context-int.h" +#include "lb_maildir.h" + +#define DEFAULT_ROOT "/tmp/lb_maildir" + +enum { + LBMD_DIR_TMP = 0, + LBMD_DIR_NEW, + LBMD_DIR_WORK +}; + +static const char *dirs[] = { "tmp", "new", "work" }; + + +#define MAX_ERR_LEN 1024 +char lbm_errdesc[MAX_ERR_LEN]; + + +static int check_mkdir(const char *dir) +{ + struct stat sbuf; + + if ( stat(dir, &sbuf) ) { + if ( errno == ENOENT ) { + if ( mkdir(dir, S_IRWXU) ) return 1; + } + else return 1; + } + else if (S_ISDIR(sbuf.st_mode)) return 0; + else return 1; + + return 0; +} + + +int edg_wll_MaildirInit( + const char *dir) +{ + const char *root = dir? : DEFAULT_ROOT; + char dirname[PATH_MAX]; + int i; + + if ( check_mkdir(root) ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "%s: %s\n", root, strerror(errno)); + return 1; + } + + for ( i = 0; i < sizeof(dirs)/sizeof((dirs)[0]); i++ ) { + snprintf(dirname, PATH_MAX, "%s/%s", root, dirs[i]); + if ( check_mkdir(dirname) ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "%s: %s\n", dirname, strerror(errno)); + return 1; + } + } + + return 0; +} + + +int edg_wll_MaildirStoreMsg( + const char *root, + const char *srvname, + const char *msg) +{ + char fname[PATH_MAX], + newfname[PATH_MAX]; + int fhnd, + written, + msgsz, + ct, i; + + + if ( !root ) root = DEFAULT_ROOT; + + errno = 0; + i = 0; + while ( 1 ) { + if ( ++i > 10 ) { + errno = ECANCELED; + snprintf(lbm_errdesc, MAX_ERR_LEN, "Maximum tries limit reached with unsuccessful file creation"); + return -1; + } + if ( fname ) free(fname); + snprintf(fname, PATH_MAX, "%s/%s/%ld.%d.%s", root, dirs[LBMD_DIR_TMP], time(NULL), getpid(), srvname); + if ( (fhnd = open(fname, O_CREAT|O_EXCL|O_WRONLY, 00600)) < 0 ) { + if ( errno == EEXIST ) { sleep(2); continue; } + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't create file %s", fname); + return -1; + } + break; + } + + msgsz = strlen(msg); + written = 0; + while ( written < msgsz ) { + if ( (ct = write(fhnd, msg+written, msgsz-written)) < 0 ) { + if ( errno == EINTR ) { errno = 0; continue; } + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't write into file %s", fname); + return -1; + } + written += msgsz; + } + if ( fsync(fhnd) ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't fsync file %s", fname); + return -1; + } + if ( close(fhnd) ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't close file %s", fname); + return -1; + } + snprintf(newfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_NEW], strrchr(fname, '/')+1); + if ( link(fname, newfname) ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't link new file %s", newfname); + return -1; + } + + return 0; +} + + +int edg_wll_MaildirTransEnd( + const char *root, + char *fname, + int tstate) +{ + char workfname[PATH_MAX], + newfname[PATH_MAX], + origfname[PATH_MAX]; + struct stat st; + + + if ( !root ) root = DEFAULT_ROOT; + + snprintf(workfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_WORK], fname); + unlink(workfname); + + snprintf(origfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_TMP], fname); + if ( tstate == LBMD_TRANS_OK ) { + unlink(origfname); + return 0; + } + + if ( stat(origfname, &st) ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't stat file '%s'", origfname); + return -1; + } + + snprintf(newfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_NEW], fname); + if ( link(origfname, newfname) ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't link new file %s", newfname); + return -1; + } + + return 0; +} + + +int edg_wll_MaildirTransStart( + const char *root, + char **msg, + char **fname) +{ + static DIR *dir = NULL; + struct dirent *ent; + char newfname[PATH_MAX], + oldfname[PATH_MAX], + *buf = NULL; + int fhnd, + toread, ct, + bufsz, bufuse; + + + if ( !root ) root = DEFAULT_ROOT; + + if ( !dir ) { + char dirname[PATH_MAX]; + snprintf(dirname, PATH_MAX, "%s/%s", root, dirs[LBMD_DIR_NEW]); + if ( !(dir = opendir(dirname)) ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't open directory '%s'", root); + goto err; + } + } + + do { + errno = 0; + if ( !(ent = readdir(dir)) ) { + if ( errno == EBADF ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't read directory '%s'", root); + dir = NULL; + goto err; + } else { + closedir(dir); + dir = NULL; + return 0; + } + } + if ( ent->d_name[0] == '.' ) continue; + snprintf(newfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_WORK], ent->d_name); + snprintf(oldfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_NEW], ent->d_name); + if ( rename(oldfname, newfname) ) { + if ( errno == ENOENT ) { + /* maybe some other instance moved this file away... */ + continue; + } else { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't move file '%s'", oldfname); + goto err; + } + } else { + /* we have found and moved the file with which we will work now */ + break; + } + } while ( 1 ); + + if ( (fhnd = open(newfname, O_RDONLY)) < 0 ) { + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't open file '%s'", newfname); + goto err; + } + + bufuse = bufsz = toread = ct = 0; + do { + errno = 0; + if ( bufuse == bufsz ) { + char *tmp = realloc(buf, bufsz+BUFSIZ); + if ( !tmp ) goto err; + buf = tmp; + bufsz += BUFSIZ; + } + toread = bufsz - bufuse; + if ( (ct = read(fhnd, buf+bufuse, toread)) < 0 ) { + if ( errno == EINTR ) continue; + snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't read file '%s'", newfname); + goto err; + } + bufuse += ct; + } while ( ct == toread ); + close(fhnd); + + if ( !(*fname = strdup(ent->d_name)) ) goto err; + *msg = buf; + return 1; + + +err: + if ( buf ) free(buf); + + return -1; +} diff --git a/org.glite.lb.common/src/param.c b/org.glite.lb.common/src/param.c index 001261c..253e241 100644 --- a/org.glite.lb.common/src/param.c +++ b/org.glite.lb.common/src/param.c @@ -41,6 +41,7 @@ static const char *myenv[] = { "EDG_WL_LBPROXY_STORE_SOCK", "EDG_WL_LBPROXY_SERVE_SOCK", "EDG_WL_LBPROXY_USER", + "EDG_WL_JPREG_TMPDIR", }; /* XXX: does not parse URL, just hostname[:port] */ @@ -160,6 +161,11 @@ int edg_wll_SetParamString(edg_wll_Context ctx,edg_wll_ContextParam param,const free(ctx->p_user_lbproxy); ctx->p_user_lbproxy = val ? strdup(val) : NULL; break; + case EDG_WLL_PARAM_JPREG_TMPDIR: + if (!val) val = getenv(myenv[param]); + free(ctx->jpreg_dir); + ctx->jpreg_dir = val ? strdup(val) : NULL; + break; default: return edg_wll_SetError(ctx,EINVAL,"unknown parameter"); } @@ -299,6 +305,7 @@ int edg_wll_SetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...) case EDG_WLL_PARAM_LBPROXY_STORE_SOCK: case EDG_WLL_PARAM_LBPROXY_SERVE_SOCK: case EDG_WLL_PARAM_LBPROXY_USER: + case EDG_WLL_PARAM_JPREG_TMPDIR: return edg_wll_SetParamString(ctx,param,va_arg(ap,char *)); case EDG_WLL_PARAM_LOG_TIMEOUT: case EDG_WLL_PARAM_LOG_SYNC_TIMEOUT: @@ -408,6 +415,10 @@ int edg_wll_GetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...) p_string = va_arg(ap, char **); *p_string = estrdup(ctx->p_user_lbproxy); break; + case EDG_WLL_PARAM_JPREG_TMPDIR: + p_string = va_arg(ap, char **); + *p_string = estrdup(ctx->jpreg_dir); + break; case EDG_WLL_PARAM_LOG_TIMEOUT: p_tv = va_arg(ap,struct timeval *); *p_tv = ctx->p_log_timeout; -- 1.8.2.3