- adding basics of 'maildir' fonctionality to LB
authorJiří Škrábal <nykolas@ics.muni.cz>
Thu, 14 Jul 2005 16:31:49 +0000 (16:31 +0000)
committerJiří Škrábal <nykolas@ics.muni.cz>
Thu, 14 Jul 2005 16:31:49 +0000 (16:31 +0000)
- used for JPPS job async. registration

org.glite.lb.common/Makefile
org.glite.lb.common/interface/context-int.h
org.glite.lb.common/interface/lb_maildir.h [new file with mode: 0644]
org.glite.lb.common/src/lb_maildir.c [new file with mode: 0644]
org.glite.lb.common/src/param.c

index b107021..8d7c5e9 100644 (file)
@@ -83,18 +83,21 @@ INSTALL:=libtool --mode=install install
 OBJS:=lb_plain_io.o escape.o events.o mini_http.o query_rec.o \
        status.o xml_conversions.o xml_parse.o ulm_parse.o param.o \
        events_parse.o il_string.o il_int.o notifid.o \
-       il_log.o il_msg.o log_msg.o context.o trio.o strio.o
+       il_log.o il_msg.o log_msg.o context.o trio.o strio.o lb_maildir.o
 LOBJS:=${OBJS:.o=.lo}
 
 TRIO_OBJS:=escape.o trio.o strio.o
 TRIO_LIB:=libglite_lb_trio.a
 
+MAILDIR_OBJS:=lb_maildir.o
+MAILDIR_LIB:=libglite_lb_maildir.a
+
 THROBJS:=${OBJS:.o=.thr.o}
 THRLOBJS:=${OBJS:.o=.thr.lo}
 
 HDRS:=context-int.h lb_plain_io.h mini_http.h authz.h xml_parse.h \
        xml_conversions.h log_proto.h events_parse.h il_string.h il_msg.h \
-       escape.h ulm_parse.h trio.h
+       escape.h ulm_parse.h trio.h lb_maildir.h
 
 STATICLIB:=libglite_lb_common_${nothrflavour}.a
 THRSTATICLIB:=libglite_lb_common_${thrflavour}.a
@@ -105,7 +108,7 @@ REPORTS:=${top_srcdir}/reports
 
 default: all
 
-all compile: check_version ${STATICLIB} ${LTLIB} ${THRSTATICLIB} ${THRLTLIB} ${TRIO_LIB}
+all compile: check_version ${STATICLIB} ${LTLIB} ${THRSTATICLIB} ${THRLTLIB} ${TRIO_LIB} ${MAILDIR_LIB}
 # all compile: ${STATICLIB} ${LTLIB} ${THRSTATICLIB} ${THRLTLIB}
 
 version_info=-version-info `echo ${version} | cut -d. -f1,2 | tr . :`
@@ -118,6 +121,10 @@ ${TRIO_LIB}: ${TRIO_OBJS}
        ar crv $@ ${TRIO_OBJS}
        ranlib $@
 
+${MAILDIR_LIB}: ${MAILDIR_OBJS}
+       ar crv $@ ${MAILDIR_OBJS}
+       ranlib $@
+
 ${THRSTATICLIB}: ${THROBJS}
        ar crv $@ ${THROBJS}
        ranlib $@
@@ -181,7 +188,7 @@ install:
        if [ x${DOSTAGE} = xyes ]; then \
                mkdir -p ${PREFIX}/include/${globalprefix}/${lbprefix} ; \
                (cd ${top_srcdir}/interface && install -m 644 ${HDRS} ${PREFIX}/include/${globalprefix}/${lbprefix}) ; \
-               install -m 644 ${STATICLIB} ${THRSTATICLIB} ${TRIO_LIB} ${PREFIX}/lib; \
+               install -m 644 ${STATICLIB} ${THRSTATICLIB} ${TRIO_LIB} ${MAILDIR_LIB} ${PREFIX}/lib; \
        fi
 
 clean:
index 973a2da..9d68ded 100644 (file)
@@ -73,7 +73,7 @@ struct _edg_wll_Context {
 
        int             is_V21;         /* true if old (V21) request arrived */
        int             isProxy;        /* LBProxy */
-       
+
 /* server limits */
        int             softLimit;
        int             hardJobsLimit;
@@ -85,6 +85,9 @@ struct _edg_wll_Context {
        char            *dumpStorage;
        char            *purgeStorage;
 
+/* maildir location */
+       char    *jpreg_dir;
+
 /* flag for function store_event
  * if set then event are loaded from dump file
  */
diff --git a/org.glite.lb.common/interface/lb_maildir.h b/org.glite.lb.common/interface/lb_maildir.h
new file mode 100644 (file)
index 0000000..4c918d4
--- /dev/null
@@ -0,0 +1,22 @@
+#ifndef LB_MAILDIR
+#define LB_MAILDIR
+
+/*
+ * Functions for reading and writing messages via
+ * maildir protocol.
+ * Used when registering job to the JP, i.e.
+ */
+
+enum {
+       LBMD_TRANS_OK,
+       LBMD_TRANS_FAILED
+};
+       
+extern char lbm_errdesc[];
+
+extern int edg_wll_MaildirInit(const char *);
+extern int edg_wll_MaildirStoreMsg(const char *, const char *, const char *);
+extern int edg_wll_MaildirTransStart(const char *, char **, char **);
+extern int edg_wll_MaildirTransEnd(const char *, char *, int);
+
+#endif
diff --git a/org.glite.lb.common/src/lb_maildir.c b/org.glite.lb.common/src/lb_maildir.c
new file mode 100644 (file)
index 0000000..be4e180
--- /dev/null
@@ -0,0 +1,257 @@
+#include <malloc.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <time.h>
+#include <fcntl.h>
+
+#include "context-int.h"
+#include "lb_maildir.h"
+
+#define DEFAULT_ROOT   "/tmp/lb_maildir"
+
+enum {
+       LBMD_DIR_TMP = 0,
+       LBMD_DIR_NEW,
+       LBMD_DIR_WORK
+};
+
+static const char *dirs[] = { "tmp", "new", "work" };
+
+
+#define MAX_ERR_LEN            1024
+char lbm_errdesc[MAX_ERR_LEN];
+
+
+static int check_mkdir(const char *dir)
+{
+       struct stat sbuf;
+
+       if ( stat(dir, &sbuf) ) {
+               if ( errno == ENOENT ) {
+                       if ( mkdir(dir, S_IRWXU) ) return 1;
+               }
+               else return 1;
+       }
+       else if (S_ISDIR(sbuf.st_mode)) return 0;
+       else return 1;
+
+       return 0;
+}
+
+
+int edg_wll_MaildirInit(
+       const char                 *dir)
+{
+       const char *root = dir? : DEFAULT_ROOT;
+       char            dirname[PATH_MAX];
+       int                     i;
+
+       if ( check_mkdir(root) ) {
+               snprintf(lbm_errdesc, MAX_ERR_LEN, "%s: %s\n", root, strerror(errno));
+               return 1;
+       }
+       
+       for ( i = 0; i < sizeof(dirs)/sizeof((dirs)[0]); i++ ) {
+               snprintf(dirname, PATH_MAX, "%s/%s", root, dirs[i]);
+               if ( check_mkdir(dirname) ) {
+                       snprintf(lbm_errdesc, MAX_ERR_LEN, "%s: %s\n", dirname, strerror(errno));
+                       return 1;
+               }
+       }
+
+       return 0;
+}
+
+
+int edg_wll_MaildirStoreMsg(
+       const char                 *root,
+       const char                 *srvname,
+       const char         *msg)
+{
+       char            fname[PATH_MAX],
+                               newfname[PATH_MAX];
+       int                     fhnd,
+                               written,
+                               msgsz,
+                               ct, i;
+
+
+       if ( !root ) root = DEFAULT_ROOT;
+
+       errno = 0;
+       i = 0;
+       while ( 1 ) {
+               if ( ++i > 10 ) {
+                       errno = ECANCELED;
+                       snprintf(lbm_errdesc, MAX_ERR_LEN, "Maximum tries limit reached with unsuccessful file creation");
+                       return -1;
+               }
+               if ( fname ) free(fname);
+               snprintf(fname, PATH_MAX, "%s/%s/%ld.%d.%s", root, dirs[LBMD_DIR_TMP], time(NULL), getpid(), srvname);
+               if ( (fhnd = open(fname, O_CREAT|O_EXCL|O_WRONLY, 00600)) < 0 ) {
+                       if ( errno == EEXIST ) { sleep(2); continue; }
+                       snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't create file %s", fname);
+                       return -1;
+               }
+               break;
+       }
+
+       msgsz = strlen(msg);
+       written = 0;
+       while ( written < msgsz ) {
+               if ( (ct = write(fhnd, msg+written, msgsz-written)) < 0 ) {
+                       if ( errno == EINTR ) { errno = 0; continue; }
+                       snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't write into file %s", fname);
+                       return -1;
+               }
+               written += msgsz;
+       }
+       if ( fsync(fhnd) ) {
+               snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't fsync file %s", fname);
+               return -1;
+       }
+       if ( close(fhnd) ) {
+               snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't close file %s", fname);
+               return -1;
+       }
+       snprintf(newfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_NEW], strrchr(fname, '/')+1);
+       if ( link(fname, newfname) ) {
+               snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't link new file %s", newfname);
+               return -1;
+       }
+
+       return 0;
+}
+
+
+int edg_wll_MaildirTransEnd(
+       const char                 *root,
+       char                       *fname,
+       int                                     tstate)
+{
+       char            workfname[PATH_MAX],
+                               newfname[PATH_MAX],
+                               origfname[PATH_MAX];
+       struct stat     st;
+
+
+       if ( !root ) root = DEFAULT_ROOT;
+
+       snprintf(workfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_WORK], fname);
+       unlink(workfname);
+
+       snprintf(origfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_TMP], fname);
+       if ( tstate == LBMD_TRANS_OK ) {
+               unlink(origfname);
+               return 0;
+       }
+
+       if ( stat(origfname, &st) ) {
+               snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't stat file '%s'", origfname);
+               return -1;
+       }
+
+       snprintf(newfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_NEW], fname);
+       if ( link(origfname, newfname) ) {
+               snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't link new file %s", newfname);
+               return -1;
+       }
+
+       return 0;
+}
+
+
+int edg_wll_MaildirTransStart(
+       const char                 *root,
+       char              **msg,
+       char                      **fname)
+{
+       static DIR         *dir = NULL;
+       struct dirent  *ent;
+       char                    newfname[PATH_MAX],
+                                       oldfname[PATH_MAX],
+                                  *buf = NULL;
+       int                             fhnd,
+                                       toread, ct,
+                                       bufsz, bufuse;
+
+
+       if ( !root ) root = DEFAULT_ROOT;
+
+       if ( !dir ) {
+               char    dirname[PATH_MAX];
+               snprintf(dirname, PATH_MAX, "%s/%s", root, dirs[LBMD_DIR_NEW]);
+               if ( !(dir = opendir(dirname)) ) {
+                       snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't open directory '%s'", root);
+                       goto err;
+               }
+       }
+
+       do {
+               errno = 0;
+               if ( !(ent = readdir(dir)) ) {
+                       if ( errno == EBADF ) {
+                               snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't read directory '%s'", root);
+                               dir = NULL;
+                               goto err;
+                       } else {
+                               closedir(dir);
+                               dir = NULL;
+                               return 0;
+                       }
+               }
+               if ( ent->d_name[0] == '.' ) continue;
+               snprintf(newfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_WORK], ent->d_name);
+               snprintf(oldfname, PATH_MAX, "%s/%s/%s", root, dirs[LBMD_DIR_NEW], ent->d_name);
+               if ( rename(oldfname, newfname) ) {
+                       if ( errno == ENOENT ) {
+                               /* maybe some other instance moved this file away... */
+                               continue;
+                       } else {
+                               snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't move file '%s'", oldfname);
+                               goto err;
+                       }
+               } else {
+                       /* we have found and moved the file with which we will work now */
+                       break;
+               }
+       } while ( 1 );
+
+       if ( (fhnd = open(newfname, O_RDONLY)) < 0 ) {
+               snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't open file '%s'", newfname);
+               goto err;
+       }
+
+       bufuse = bufsz = toread = ct = 0;
+       do {
+               errno = 0;
+               if ( bufuse == bufsz ) {
+                       char *tmp = realloc(buf, bufsz+BUFSIZ);
+                       if ( !tmp ) goto err;
+                       buf = tmp;
+                       bufsz += BUFSIZ;
+               }
+               toread = bufsz - bufuse;
+               if ( (ct = read(fhnd, buf+bufuse, toread)) < 0 ) {
+                       if ( errno == EINTR ) continue;
+                       snprintf(lbm_errdesc, MAX_ERR_LEN, "Can't read file '%s'", newfname);
+                       goto err;
+               }
+               bufuse += ct;
+       } while ( ct == toread );
+       close(fhnd);
+
+       if ( !(*fname = strdup(ent->d_name)) ) goto err;
+       *msg = buf;
+       return 1;
+
+
+err:
+       if ( buf ) free(buf);
+
+       return -1;
+}
index 001261c..253e241 100644 (file)
@@ -41,6 +41,7 @@ static const char *myenv[] = {
        "EDG_WL_LBPROXY_STORE_SOCK",
        "EDG_WL_LBPROXY_SERVE_SOCK",
        "EDG_WL_LBPROXY_USER",
+       "EDG_WL_JPREG_TMPDIR",
 };
 
 /* XXX: does not parse URL, just hostname[:port] */
@@ -160,6 +161,11 @@ int edg_wll_SetParamString(edg_wll_Context ctx,edg_wll_ContextParam param,const
                        free(ctx->p_user_lbproxy);
                        ctx->p_user_lbproxy = val ? strdup(val) : NULL;
                        break;
+               case EDG_WLL_PARAM_JPREG_TMPDIR:
+                       if (!val) val = getenv(myenv[param]);
+                       free(ctx->jpreg_dir);
+                       ctx->jpreg_dir = val ? strdup(val) : NULL;
+                       break;
                default:
                        return edg_wll_SetError(ctx,EINVAL,"unknown parameter");
        }
@@ -299,6 +305,7 @@ int edg_wll_SetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...)
                case EDG_WLL_PARAM_LBPROXY_STORE_SOCK:
                case EDG_WLL_PARAM_LBPROXY_SERVE_SOCK:
                case EDG_WLL_PARAM_LBPROXY_USER:
+               case EDG_WLL_PARAM_JPREG_TMPDIR:
                        return edg_wll_SetParamString(ctx,param,va_arg(ap,char *));
                case EDG_WLL_PARAM_LOG_TIMEOUT:      
                case EDG_WLL_PARAM_LOG_SYNC_TIMEOUT: 
@@ -408,6 +415,10 @@ int edg_wll_GetParam(edg_wll_Context ctx,edg_wll_ContextParam param,...)
                        p_string = va_arg(ap, char **);
                        *p_string = estrdup(ctx->p_user_lbproxy);
                        break;
+               case EDG_WLL_PARAM_JPREG_TMPDIR:
+                       p_string = va_arg(ap, char **);
+                       *p_string = estrdup(ctx->jpreg_dir);
+                       break;
                case EDG_WLL_PARAM_LOG_TIMEOUT:      
                        p_tv = va_arg(ap,struct timeval *);
                        *p_tv = ctx->p_log_timeout;