first attempt at asynchronous event registration
authorMichal Voců <michal@ruk.cuni.cz>
Mon, 18 Feb 2013 15:00:37 +0000 (15:00 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 26 Feb 2013 14:18:04 +0000 (15:18 +0100)
org.glite.lb.server/src/authz_policy.c
org.glite.lb.server/src/authz_policy.h
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/store.c.T

index 2d03e88..51f513f 100644 (file)
@@ -38,6 +38,7 @@ struct action_name action_names[] = {
     { PURGE, "PURGE" },
     { GRANT_OWNERSHIP, "GRANT_OWNERSHIP" },
     { READ_ANONYMIZED, "READ_ANONYMIZED" },
+    { REGISTER_FOREIGN, "REGISTER_FOREIGN" },
 };
 
 static int num_actions =
index a37d87a..42b817f 100644 (file)
@@ -35,6 +35,7 @@ typedef enum {
     PURGE              = 1 << 8,
     GRANT_OWNERSHIP    = 1 << 9,
     READ_ANONYMIZED    = 1 << 10,
+    REGISTER_FOREIGN    = 1 << 11,
 } authz_action;
 
 typedef struct action_name {
index b1f94d0..ddd8cc8 100644 (file)
@@ -144,6 +144,7 @@ static int                          noAuth = 0;
 static int                             noIndex = 0;
 static int                             strict_locking = 0;
 static int                             greyjobs = 0;
+static int                              async_registrations = 0;
 static int                             count_statistics = 1;
 static int                             count_server_stats = 1;
 static char                            *stats_file_prefix = NULL;
@@ -231,6 +232,7 @@ static struct option opts[] = {
 #endif
        {"transactions",        1,      NULL,   'b'},
        {"greyjobs",    0,      NULL,   'g'},
+       {"async-registrations", 0, NULL, 'Q'},
        {"withproxy",   0,      NULL,   'B'},
        {"proxyonly",   0,      NULL,   'P'},
        {"sock",        1,      NULL,   'o'},
@@ -245,7 +247,7 @@ static struct option opts[] = {
        {NULL,0,NULL,0}
 };
 
-static const char *get_opt_string = "Ac:k:C:V:p:a:drm:M:ns:i:S:D:J:jR:F:xOL:N:X:Y:T:t:e:f:zb:gPBo:q:W:Z:GI:l:EH:"
+static const char *get_opt_string = "Ac:k:C:V:p:a:drm:M:ns:i:S:D:J:jR:F:xOL:N:X:Y:T:t:e:f:zb:gQPBo:q:W:Z:GI:l:EH:"
 #ifdef GLITE_LB_SERVER_WITH_WS
        "w:"
 #endif
@@ -298,6 +300,7 @@ static void usage(char *me)
                "\t-K, --perf-sink\t where to sink events\n"
 #endif
                "\t-g,--greyjobs\t allow delayed registration (grey jobs), implies --strict-locking\n"
+               "\t-Q,--async-registrations\t allow asynchronous registrations (weaker form of grey jobs) \n"
                "\t-P,--proxyonly\t     run only proxy service\n"
                "\t-B,--withproxy\t     run both server and proxy service\n"
                "\t-o,--sock\t path-name to the local socket for communication with LB proxy\n"
@@ -508,8 +511,10 @@ int main(int argc, char *argv[])
                case 'K': sink_mode = atoi(optarg);
                          break;
 #endif
-               case 'g': greyjobs = strict_locking = 1;
+               case 'g': greyjobs = strict_locking = 1; 
                          break;
+               case 'Q': async_registrations = 1; /* XXX should set strict locking too? */
+                         break;
                case 'P': mode = SERVICE_PROXY;
                          break;
                case 'B': mode = SERVICE_PROXY_SERVER;
@@ -1291,6 +1296,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
        }
        ctx->strict_locking = strict_locking;
        ctx->greyjobs = greyjobs;
+       ctx->async_registrations = async_registrations;
        ctx->exclusive_zombies = exclusive_zombies;
 
        for (totpref = 0; msg_prefixes[totpref]; totpref++);
index 05246d4..87573b1 100644 (file)
@@ -46,6 +46,7 @@ limitations under the License.
 #include "store.h"
 #include "get_events.h"
 #include "lb_authz.h"
+#include "authz_policy.h"
 #include "jobstat.h"
 #include "db_calls.h"
 #include "db_supp.h"
@@ -284,7 +285,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *regis
        char            *unique = edg_wlc_JobIdGetUnique(event->any.jobId);
        char            *q = NULL, *userid = NULL, *subj = NULL, *owner = NULL;
        glite_lbu_Statement    stmt = NULL, stmt_zomb = NULL;
-       int             nar, grey = 0;
+       int             nar, grey = 0, async = 0;
        char            *can_peername = NULL;
        int             local_job = is_job_local(ctx, event->any.jobId);
        char            *res[3] = {NULL, NULL, NULL};
@@ -316,9 +317,16 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *regis
                /* XXX: directness is checked by any.user == peerName. Not perfect but better than event flags. */
 
                        if (!ctx->isProxy && !edg_wll_gss_equal_subj(event->any.user, ctx->peerName) && !ctx->greyjobs) {
-                               edg_wll_SetError(ctx,EPERM,"job registration must go directly");
-                               goto err;
-
+                               if(ctx->async_registrations) {
+                                       /* check authorization to register job as different user */
+                                       if(check_authz_policy_ctx(ctx, REGISTER_FOREIGN)) {
+                                               edg_wll_SetError(ctx, EPERM, "not allowed to register jobs as different user");
+                                               goto err;
+                                       }
+                               } else {
+                                       edg_wll_SetError(ctx,EPERM,"job registration must go directly");
+                                       goto err;
+                               }
                        }
                        if ((event->any.priority & EDG_WLL_LOGLFLAG_EXCL) && ctx->exclusive_zombies) {
                                trio_asprintf(&q,"select jobid from zombie_jobs "
@@ -336,37 +344,66 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *regis
                }
                else {
                        if (ctx->greyjobs) grey = 1;
+                       else if(ctx->async_registrations) { 
+                               /* TODO: possibly add another permission */
+                               if(check_authz_policy_ctx(ctx, REGISTER_FOREIGN)) {
+                                       edg_wll_SetError(ctx, EPERM, "not allowed to log events to unregistered job");
+                                       goto err;
+                               }
+                               async = 1;
+                       }
                        else {
                                edg_wll_SetError(ctx, ENOENT, "job not registered");
                                goto err;
                        }
                }
 
-               can_peername = grey ?  strdup("GREY JOB") : edg_wll_gss_normalize_subj(event->any.user, 0);
+               /* first event is not registration:
+                *   - when grey jobs are allowed, job is owned by "GREY JOB"
+                *   - when asynchronous registrations are allowed, job is owned by the logging component
+                * when none of the above applies, we should not get here at all.
+                */
+               can_peername = grey ?  strdup("GREY JOB") : ( async ? edg_wll_gss_normalize_subj(ctx->peerName, 0) : edg_wll_gss_normalize_subj(event->any.user, 0) );
                userid = strdup(strmd5(can_peername, NULL));
                if (store_user(ctx,userid,can_peername)) goto err;
+               /* store job as unregistered (grey) when either grey jobs or async registrations are allowed */
                if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
-                       userid, ctx->isProxy, local_job, grey, 0 )) goto err;
-               if (event->any.type == EDG_WLL_EVENT_REGJOB && event->regJob.wms_dn) {
-                       char *names = strdup(event->regJob.wms_dn);
-                       char *p, *name;
-                       int ret;
-
-                       name = names;
-                       while ((p = strchr(name, '\n'))) {
-                               *p = '\0';
-                               ret = edg_wll_UpdateACL(ctx, event->any.jobId, name,
+                       userid, ctx->isProxy, local_job, grey | async, 0 )) goto err;
+               if (event->any.type == EDG_WLL_EVENT_REGJOB) {
+                       if(event->regJob.wms_dn) {
+                               char *names = strdup(event->regJob.wms_dn);
+                               char *p, *name;
+                               int ret;
+                               
+                               name = names;
+                               while ((p = strchr(name, '\n'))) {
+                                       *p = '\0';
+                                       ret = edg_wll_UpdateACL(ctx, event->any.jobId, name,
+                                                               EDG_WLL_CHANGEACL_DN,
+                                                               EDG_WLL_CHANGEACL_READ,
+                                                               EDG_WLL_CHANGEACL_ALLOW,
+                                                               EDG_WLL_CHANGEACL_ADD);
+                                       if (ret) { 
+                                               free(names);
+                                               goto err;
+                                       }
+                                       name = p+1;
+                               }
+                               free(names);
+                       } else if(async) {
+                               int ret;
+                               
+                               /* allow logging component to modify job */
+                               ret = edg_wll_UpdateACL(ctx, event->any.jobId, 
+                                                       strdup(edg_wll_gss_normalize_subj(ctx->peerName, 0)),
                                                        EDG_WLL_CHANGEACL_DN,
                                                        EDG_WLL_CHANGEACL_READ,
                                                        EDG_WLL_CHANGEACL_ALLOW,
                                                        EDG_WLL_CHANGEACL_ADD);
                                if (ret) { 
-                                       free(names);
                                        goto err;
                                }
-                               name = p+1;
                        }
-                       free(names);
                }
 
                *register_to_JP = (local_job) ? REG_JOB_TO_JP : 0;
@@ -378,9 +415,17 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *regis
                if (stmt) { glite_lbu_FreeStmt(&stmt); stmt = NULL; }
 
 
-               if (ctx->greyjobs && !strcmp(res[2],"1") && 
+               if ((ctx->greyjobs || ctx->async_registrations) && !strcmp(res[2],"1") && 
                        (event->any.type == EDG_WLL_EVENT_REGJOB)) 
                {
+                       /* late registration event for already existing job */
+
+                       /* check permission to change owner if not doing greyjobs (ie. in async registration) */
+                       if(!ctx->greyjobs) {
+                               
+                       }
+
+                       /* set proper job owner */
                        can_peername = edg_wll_gss_normalize_subj(event->any.user, 0);
                        userid = strdup(strmd5(can_peername, NULL));
                        if (store_user(ctx,userid,can_peername)) goto err;