- call LB plugin after LB log upload to determine job owner
authorAleš Křenek <ljocha@ics.muni.cz>
Tue, 26 Apr 2005 15:12:37 +0000 (15:12 +0000)
committerAleš Křenek <ljocha@ics.muni.cz>
Tue, 26 Apr 2005 15:12:37 +0000 (15:12 +0000)
- miscelaneous plugin related stuff

org.glite.jp.primary/interface/file_plugin.h
org.glite.jp.primary/src/bones_server.c
org.glite.jp.primary/src/feed.c
org.glite.jp.primary/src/file_plugin.c
org.glite.jp.primary/src/soap_ops.c

index 18ef3a1..144c94c 100644 (file)
@@ -8,9 +8,10 @@ typedef struct _glite_jpps_fplug_op_t {
 /** Open a file.
 \param[in] fpctx       Context of the plugin, returned by its init.
 \param[in] bhandle     Handle of the file via JPPS backend.
+\param[in] type                Index of the type in this plugin's uri's.
 \param[out] handle     Handle to the opened file structure, to be passed to other plugin functions.
 */
-       int     (*open)(void *fpctx,void *bhandle,void **handle);
+       int     (*open)(void *fpctx,void *bhandle,int type,void **handle);
 
 /** Close the file. Free data associated to a handle */
        int     (*close)(void *fpctx,void *handle);
@@ -65,6 +66,6 @@ typedef int (*glite_jpps_fplug_init_t)(
 /* XXX: not really public interface follows */
 
 int glite_jpps_fplug_load(glite_jp_context_t ctx,int argc,char **argv);
-int glite_jpps_fplug_lookup(glite_jp_context_t ctx,const char *uri, glite_jpps_fplug_data_t **plugin_data);
+int glite_jpps_fplug_lookup(glite_jp_context_t ctx,const char *uri, glite_jpps_fplug_data_t ***plugin_data);
 
 #endif
index 0a54d8e..207ba86 100644 (file)
@@ -46,35 +46,52 @@ static int call_opts(glite_jp_context_t,char *,char *,int (*)(glite_jp_context_t
 
 int main(int argc, char *argv[])
 {
-       int     one = 1,opt,bend = 0;
+       int     one = 1,opt;
        edg_wll_GssStatus       gss_code;
        struct sockaddr_in      a;
-
-       /* XXX: read options */
+       char    *b_argv[20] = { "backend" },*p_argv[20] = { "plugins" },*com;
+       int     b_argc,p_argc;
 
        glite_jp_init_context(&ctx);
 
+       b_argc = p_argc = 1;
+
        while ((opt = getopt(argc,argv,"B:P:")) != EOF) switch (opt) {
                case 'B':
-                       if (call_opts(ctx,optarg,"backend",glite_jppsbe_init)) {
-                               /* XXX log */
-                               fputs(glite_jp_error_chain(ctx), stderr);
-                               exit(1);
-                       }
-                       bend = 1;
+                       assert(b_argc < 20);
+                       if (com = strchr(optarg,',')) *com = 0;
+                       
+                       /* XXX: memleak -- who cares for once */
+                       asprintf(&b_argv[b_argc++],"-%s",optarg);
+                       if (com) b_argv[b_argc++] = com+1;
+
                        break;
                case 'P':
-                       if (call_opts(ctx,optarg,"plugins",glite_jpps_fplug_load)) {
-                               /* XXX log */
-                               fputs(glite_jp_error_chain(ctx), stderr);
-                               exit(1);
-                       }
+                       assert(p_argc < 20);
+                       p_argv[p_argc++] = optarg;
+
+                       break;
+               case '?': fprintf(stderr,"usage: %s: -Bb,val ... -Pplugin.so ...\n"
+                                         "b is backend option\n",argv[0]);
+                         exit (1);
        }
 
-       if (!bend) {
+       if (b_argc == 1) {
                fputs("-B required\n",stderr);
                exit (1);
        }
+       
+       optind = 0; /* XXX: getopt used internally */
+       if (glite_jppsbe_init(ctx,b_argc,b_argv)) {
+               fputs(glite_jp_error_chain(ctx), stderr);
+               exit(1);
+       }
+
+       optind = 0; /* XXX: getopt used internally */
+       if (b_argc > 1 && glite_jpps_fplug_load(ctx,p_argc,p_argv)) {
+               fputs(glite_jp_error_chain(ctx), stderr);
+               exit(1);
+       }
 
        srand48(time(NULL)); /* feed id generation */
 
index 89ea840..414e7f1 100644 (file)
@@ -4,10 +4,13 @@
 #include <string.h>
 #include <errno.h>
 #include <assert.h>
+#include <fcntl.h>
 
 #include "glite/jp/types.h"
 #include "glite/jp/strmd5.h"
 #include "feed.h"
+#include "file_plugin.h"
+#include "builtin_plugins.h"
 
 
 /* 
@@ -113,6 +116,7 @@ static int match_feed(
 
                        if (glite_jppsbe_get_job_metadata(ctx,job,meta)) {
                                glite_jp_error_t        err;
+                               memset(&err,0,sizeof err);
                                err.code = EIO;
                                err.source = __FUNCTION__;
                                err.desc = "complete query";
@@ -148,6 +152,7 @@ int glite_jpps_match_attr(
                                attrs[i].attr.type <= 0)
                {
                        glite_jp_error_t        err;
+                       memset(&err,0,sizeof err);
                        err.code = EINVAL;
                        err.source = __FUNCTION__;
                        err.desc = "unknown attribute";
@@ -155,6 +160,7 @@ int glite_jpps_match_attr(
                }
                if (attri[attrs[i].attr.type] >= 0) {
                        glite_jp_error_t        err;
+                       memset(&err,0,sizeof err);
                        err.code = EINVAL;
                        err.source = __FUNCTION__;
                        err.desc = "double attribute change";
@@ -180,7 +186,63 @@ int glite_jpps_match_file(
        const char *name
 )
 {
-       fprintf(stderr,"%s: \n",__FUNCTION__);
+       glite_jpps_fplug_data_t **pd = NULL;
+       int     pi;
+       void    *bh = NULL;
+       int     ret;
+
+       fprintf(stderr,"%s: %s %s %s\n",__FUNCTION__,job,class,name);
+
+       switch (glite_jpps_fplug_lookup(ctx,class,&pd)) {
+               case ENOENT: return 0;  /* XXX: shall we complain? */
+               case 0: break;
+               default: return -1;
+       }
+
+       for (pi=0; pd[pi]; pi++) {
+               int     ci;
+               for (ci=0; pd[pi]->uris[ci]; ci++) if (!strcmp(pd[pi]->uris[ci],class)) {
+                               void    *ph;
+
+                               if (!bh && (ret = glite_jppsbe_open_file(ctx,job,pd[pi]->classes[ci],name,O_RDONLY,&bh))) {
+                                       free(pd);
+                                       return ret;
+                               }
+
+                               if (pd[pi]->ops.open(pd[pi]->fpctx,bh,ci,&ph)) {
+                                       /* XXX: complain more visibly */
+                                       fputs("plugin open failed\n",stderr);
+                                       continue;
+                               }
+
+                               /* XXX: does not belong here but I'd like to avoid opening the file twice */
+                               if (!strcmp(class,GLITE_JP_FILETYPE_LB)) {
+                                       glite_jp_attr_t         owner = { GLITE_JP_ATTR_OWNER, NULL };
+                                       glite_jp_attrval_t      *val;
+
+                                       switch (pd[pi]->ops.attr(pd[pi]->fpctx,ph,owner,&val)) {
+                                               case ENOENT:
+                                               case ENOSYS: abort();
+                                               case 0: printf("LB plugin: owner = %s\n",val[0].value.s);
+                                                       /* TODO: store it in backend */
+
+                                                       glite_jp_attrval_free(val,1);
+                                                       break;
+
+                                               default: /* TODO: complain */; break;
+                                       }
+                               }
+
+                               /* TODO: extract attributes for the feeds */
+
+
+                               pd[pi]->ops.close(pd[pi]->fpctx,ph);
+                       }
+       }
+
+       if (bh) glite_jppsbe_close_file(ctx,bh);
+       free(pd);
+
        return 0;
 }
 
index 25687ef..144a231 100644 (file)
@@ -18,11 +18,12 @@ static int loadit(glite_jp_context_t ctx,const char *so)
        void    *dl_handle = dlopen(so,RTLD_NOW);
 
        glite_jp_error_t        err;
-       char    *e;
+       const char      *e;
        glite_jpps_fplug_data_t *data,*dp;
        int     i;
 
        glite_jpps_fplug_init_t init;
+       memset(&err,0,sizeof err);
 
        if (!dl_handle) {
                err.source = "dlopen()";
@@ -53,6 +54,8 @@ static int loadit(glite_jp_context_t ctx,const char *so)
        ctx->plugins = realloc(ctx->plugins, (i+2) * sizeof *ctx->plugins);
        ctx->plugins[i] = data;
        ctx->plugins[i+1] = NULL;
+
+       /* TODO: check consistency of uri+class pairs wrt. previous plugins */
        
        return 0;
 }
@@ -63,6 +66,7 @@ int glite_jpps_fplug_load(glite_jp_context_t ctx,int argc,char **argv)
 
        for (i=1; i<argc; i++) if (loadit(ctx,argv[i])) {
                glite_jp_error_t        err;
+               memset(&err,0,sizeof err);
                err.source = __FUNCTION__;
                err.code = EINVAL;
                err.desc = argv[i];
@@ -72,11 +76,15 @@ int glite_jpps_fplug_load(glite_jp_context_t ctx,int argc,char **argv)
        return 0;
 }
 
-int glite_jpps_fplug_lookup(glite_jp_context_t ctx,const char *uri, glite_jpps_fplug_data_t **plugin_data)
+int glite_jpps_fplug_lookup(glite_jp_context_t ctx,const char *uri, glite_jpps_fplug_data_t ***plugin_data)
 {
        int     i;
 
+       glite_jpps_fplug_data_t **out = NULL;
+       int     matches = 0;
+
        glite_jp_error_t        err;
+       memset(&err,0,sizeof err);
        err.source = __FUNCTION__;
        err.code = ENOENT;
        err.desc = (char *) uri;        /* XXX: we don't modify it, believe me, gcc! */
@@ -92,11 +100,16 @@ int glite_jpps_fplug_lookup(glite_jp_context_t ctx,const char *uri, glite_jpps_f
 
                for (j=0; p->uris && p->uris[j]; j++)
                        if (!strcmp(p->uris[j],uri)) {
-                               *plugin_data = p;
-                               return 0;
+                               out = realloc(out, (matches+2) * sizeof *out);
+                               out[matches++] = p;
+                               out[matches] = NULL;
                        }
        }
 
-       return glite_jp_stack_error(ctx,&err);
+       if (matches) {
+               *plugin_data = out;
+               return 0;
+       }
+       else return glite_jp_stack_error(ctx,&err);
 }
 
index caf3bca..96f2101 100644 (file)
@@ -1,5 +1,6 @@
 #include <stdio.h>
 #include <fcntl.h>
+#include <assert.h>
 
 #include "glite/jp/types.h"
 #include "glite/jp/context.h"
@@ -119,9 +120,35 @@ SOAP_FMAC5 int SOAP_FMAC6 jpsrv__StartUpload(
 {
        CONTEXT_FROM_SOAP(soap,ctx);
        char    *destination;
+       glite_jp_error_t        err;
+       glite_jpps_fplug_data_t **pd = NULL;
+       int     i;
+
+       glite_jp_clear_error(ctx);
+       memset(&err,0,sizeof err);
+
+       switch (glite_jpps_fplug_lookup(ctx,class,&pd)) {
+               case ENOENT:
+                       err.code = ENOENT;
+                       err.source = __FUNCTION__;
+                       err.desc = "unknown file class";
+                       glite_jp_stack_error(ctx,&err);
+                       err2fault(ctx,soap);
+                       return SOAP_FAULT;
+               case 0: break;
+               default:
+                       err2fault(ctx,soap);
+                       return SOAP_FAULT;
+       }
+
+       for (i=0; pd[0]->uris[i] && strcmp(pd[0]->uris[i],class); i++);
+       assert(pd[0]->uris[i]);
 
-       if (glite_jppsbe_start_upload(ctx,job,class,name,content_type,&destination,&commit_before)) {
+       if (glite_jppsbe_start_upload(ctx,job,pd[0]->classes[i],name,content_type,
+                               &destination,&commit_before))
+       {
                err2fault(ctx,soap);
+               free(pd);
                return SOAP_FAULT;
        }
 
@@ -129,6 +156,7 @@ SOAP_FMAC5 int SOAP_FMAC6 jpsrv__StartUpload(
        free(destination);
        response->commitBefore = commit_before;
 
+       free(pd);
        return SOAP_OK;
 }
 
@@ -166,42 +194,49 @@ SOAP_FMAC5 int SOAP_FMAC6 jpsrv__RecordTag(
 {
        CONTEXT_FROM_SOAP(soap,ctx);
        void    *file_be,*file_p;
-       glite_jpps_fplug_data_t *pd;
+       glite_jpps_fplug_data_t **pd = NULL;
 
        glite_jp_tagval_t       mytag;
 
        file_be = file_p = NULL;
 
-       /* XXX: we assume that TAGS plugin handles just one uri/class */
+       /* XXX: we assume just one plugin and also that TAGS plugin handles
+        * just one uri/class */
+
        if (glite_jpps_fplug_lookup(ctx,GLITE_JP_FILETYPE_TAGS,&pd)
-               || glite_jppsbe_open_file(ctx,job,pd->classes[0],NULL,
+               || glite_jppsbe_open_file(ctx,job,pd[0]->classes[0],NULL,
                                                O_WRONLY|O_CREAT,&file_be)
        ) {
+               free(pd);
                err2fault(ctx,soap);
                return SOAP_FAULT;
        }
 
        s2jp_tag(tag,&mytag);
 
-       if (pd->ops.open(pd->fpctx,file_be,&file_p)
-               || pd->ops.generic(pd->fpctx,file_p,GLITE_JP_FPLUG_TAGS_APPEND,&mytag))
+       /* XXX: assuming tag plugin handles just one type */
+       if (pd[0]->ops.open(pd[0]->fpctx,file_be,0,&file_p)
+               || pd[0]->ops.generic(pd[0]->fpctx,file_p,GLITE_JP_FPLUG_TAGS_APPEND,&mytag))
        {
                err2fault(ctx,soap);
-               if (file_p) pd->ops.close(pd->fpctx,file_p);
+               if (file_p) pd[0]->ops.close(pd[0]->fpctx,file_p);
                glite_jppsbe_close_file(ctx,file_be);
+               free(pd);
                return SOAP_FAULT;
        }
 
-       if (pd->ops.close(pd->fpctx,file_p)
+       if (pd[0]->ops.close(pd[0]->fpctx,file_p)
                || glite_jppsbe_close_file(ctx,file_be))
        {
                err2fault(ctx,soap);
+               free(pd);
                return SOAP_FAULT;
        }
 
        /* XXX: ignore errors but don't fail silenty */
        glite_jpps_match_tag(ctx,job,&mytag);
 
+       free(pd);
        return SOAP_OK;
 }
 
@@ -302,6 +337,7 @@ SOAP_FMAC5 int SOAP_FMAC6 jpsrv__FeedIndex(
 
        if (!history && !continuous) {
                glite_jp_error_t        err;
+               memset(&err,0,sizeof err);
                err.code = EINVAL;
                err.source = __FUNCTION__;
                err.desc = "at least one of <history> and <continous> must be true";
@@ -347,6 +383,7 @@ SOAP_FMAC5 int SOAP_FMAC6 jpsrv__GetJob(
        struct jptype__Files    *files;
        struct jptype__File     **f = NULL;
 
+       memset(&err,0,sizeof err);
        files = response->files = soap_malloc(soap,sizeof *response->files);
        files->__sizefile = 0;