Streaming notification interface implementation:
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Mon, 7 Feb 2011 18:56:41 +0000 (18:56 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Mon, 7 Feb 2011 18:56:41 +0000 (18:56 +0000)
- add new notification flags (+simplify flags string creation)
- setup the streaming on bootstrap flag
- perform streaming through deferred action in bkserverd

org.glite.lb.client/src/notify.c
org.glite.lb.common/interface/context-int.h
org.glite.lb.common/interface/jobstat.h.T
org.glite.lb.common/src/xml_conversions.c
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/il_notification.h
org.glite.lb.server/src/notif_match.c
org.glite.lb.server/src/notification.c

index 8cd78ac..df10e43 100644 (file)
@@ -55,6 +55,7 @@ static void usage(char *cmd)
                        "    owner              Match this owner DN\n"
                        "    requested_validity Validity of notification req. in seconds\n"
                        "    flags              0 - return basic status, 1 - return also JDL in status\n"
+                       "                       256 - bootstrap stream (send all existing jobs too)\n"
                        "    network_server     Match only this network server (WMS entry point)\n"
                        "    -O                 Match owner - credentials are retrieved from environment\n"
                        "    -c                 Match only on state change\n"
index 48c5d74..e348c14 100644 (file)
@@ -178,6 +178,9 @@ glite_lb_padded_struct(_edg_wll_Context,150,
        _edg_wll_authz_policy   authz_policy;
 
        int             exclusive_zombies;
+
+       int             (*processRequest_cb)(edg_wll_Context ctx);
+       void            *processRequestUserData;
 )
 
 /* to be used internally: set, update and and clear the error information in 
index d9fdc87..c870a84 100644 (file)
@@ -170,6 +170,8 @@ typedef struct _edg_wll_JobStat {
 #define EDG_WLL_STAT_CHILDSTAT 4       /**< apply the flags recursively to subjobs */
 #define EDG_WLL_STAT_CHILDHIST_FAST 8          /**< partially complete histogram of child job states */
 #define EDG_WLL_STAT_CHILDHIST_THOROUGH 16     /**< full and up-to date histogram of child job states */
+#define EDG_WLL_NOTIF_BOOTSTRAP 256    /**< send the state of the all already existing jobs too */
+#define EDG_WLL_NOTIF_VOLATILE 512     /**< (not used yet) send the notifications directly, without reliability and persistency */
 /* starting from bit 10 private flags begins - do not add 1024 and more! */
 
 
index 1a7ba37..f734bf5 100644 (file)
@@ -751,63 +751,30 @@ void *edg_wll_from_string_to_cclassad(edg_wll_XML_ctx *XMLCtx)
 /* various conversion functions                                        */
 
 
-char *edg_wll_stat_flags_to_string(int flags)
-{
-        char *cflags = NULL, *temp_cflags = NULL;
+static void append_flag(char **cflags, const char *cflag) {
+       char *temp_cflags;
 
+       if (*cflags) {
+               asprintf(&temp_cflags, "%s+%s", *cflags, cflag);
+               free(*cflags);
+               *cflags = temp_cflags;
+       } else
+               asprintf(cflags, "%s", cflag);
+}
 
-        if (flags & EDG_WLL_STAT_CLASSADS) asprintf(&cflags,"%s","classadd");
-        if (flags & EDG_WLL_STAT_CHILDREN) {
-                if (cflags) {
-                        asprintf(&temp_cflags,"%s+%s",cflags,"children");
-                        free(cflags);
-                        cflags=temp_cflags;
-                }
-                else asprintf(&cflags,"%s","children");
-        }
-        if (flags & EDG_WLL_STAT_CHILDSTAT) {
-                if (cflags) {
-                        asprintf(&temp_cflags,"%s+%s",cflags,"childstat");
-                        free(cflags);
-                        cflags=temp_cflags;
-                }
-                else asprintf(&cflags,"%s","childstat");
-        }
-        if (flags & EDG_WLL_STAT_NO_JOBS) {
-                if (cflags) {
-                        asprintf(&temp_cflags,"%s+%s",cflags,"no_jobs");
-                        free(cflags);
-                        cflags=temp_cflags;
-                }
-                else asprintf(&cflags,"%s","no_jobs");
-        }
-        if (flags & EDG_WLL_STAT_NO_STATES) {
-                if (cflags) {
-                        asprintf(&temp_cflags,"%s+%s",cflags,"no_states");
-                        free(cflags);
-                        cflags=temp_cflags;
-                }
-                else asprintf(&cflags,"%s","no_states");
-        }
-
-        if (flags & EDG_WLL_STAT_CHILDHIST_FAST) {
-                if (cflags) {
-                        asprintf(&temp_cflags,"%s+%s",cflags,"childhist_fast");
-                        free(cflags);
-                        cflags=temp_cflags;
-                }
-                else asprintf(&cflags,"%s","childhist_fast");
-        }
-
-        if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) {
-                if (cflags) {
-                        asprintf(&temp_cflags,"%s+%s",cflags,"childhist_thorough");
-                        free(cflags);
-                        cflags=temp_cflags;
-                }
-                else asprintf(&cflags,"%s","childhist_thorough");
-        }
-
+char *edg_wll_stat_flags_to_string(int flags)
+{
+       char *cflags = NULL;
+
+        if (flags & EDG_WLL_STAT_CLASSADS)       append_flag(&cflags, "classadd");
+        if (flags & EDG_WLL_STAT_CHILDREN)       append_flag(&cflags, "children");
+        if (flags & EDG_WLL_STAT_CHILDSTAT)      append_flag(&cflags, "childstat");
+        if (flags & EDG_WLL_STAT_NO_JOBS)        append_flag(&cflags, "no_jobs");
+        if (flags & EDG_WLL_STAT_NO_STATES)      append_flag(&cflags, "no_states");
+        if (flags & EDG_WLL_STAT_CHILDHIST_FAST) append_flag(&cflags, "childhist_fast");
+        if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) append_flag(&cflags, "childhist_thorough");
+        if (flags & EDG_WLL_NOTIF_BOOTSTRAP)     append_flag(&cflags, "bootstrap");
+        if (flags & EDG_WLL_NOTIF_VOLATILE)      append_flag(&cflags, "volatile");
         if (!cflags) cflags = strdup("");
 
         return(cflags);
@@ -830,6 +797,8 @@ int edg_wll_string_to_stat_flags(char *cflags)
                if (!strcmp(sflag,"no_states")) flags = flags | EDG_WLL_STAT_NO_STATES;
                 if (!strcmp(sflag,"childhist_fast")) flags = flags | EDG_WLL_STAT_CHILDHIST_FAST;
                 if (!strcmp(sflag,"childhist_thorough")) flags = flags | EDG_WLL_STAT_CHILDHIST_THOROUGH;
+               if (!strcmp(sflag,"bootstrap")) flags = flags | EDG_WLL_NOTIF_BOOTSTRAP;
+               if (!strcmp(sflag,"volatile")) flags = flags | EDG_WLL_NOTIF_VOLATILE;
                sflag = strtok_r(NULL, "+", &last);
        } 
 
index efff832..43f8be3 100644 (file)
@@ -1573,6 +1573,12 @@ int bk_accept_serve(int conn, struct timeval *timeout, void *cdata)
                return err;
        }
 
+       // additional actions (notification stream)
+       if (ctx->processRequest_cb) {
+               ctx->processRequest_cb(ctx);
+               ctx->processRequest_cb = NULL;
+       }
+
        if (httpErr == HTTP_BADREQ && body)
                err = try_accept_ws(conn, timeout, cdata, body, strlen(body) + 1);
        if (httpErr != HTTP_BADREQ || err)
@@ -1648,6 +1654,12 @@ int bk_accept_ws(int conn, struct timeval *timeout, void *cdata)
                return ECANCELED;
        }
 
+       // additional actions (notification stream)
+       if (ctx->processRequest_cb) {
+               ctx->processRequest_cb(ctx);
+               ctx->processRequest_cb = NULL;
+       }
+
        return ENOTCONN;
 }
 #endif /* GLITE_LB_SERVER_WITH_WS */
index a1fcd5a..5490193 100644 (file)
@@ -117,6 +117,11 @@ edg_wll_NotifCancelRegId(edg_wll_Context context,
  */
 int edg_wll_NotifMatch(edg_wll_Context context, const edg_wll_JobStat *oldstat, const edg_wll_JobStat *stat);
 
+/**
+ * Check permissions on job status.
+ */
+int edg_wll_NotifCheckACL(edg_wll_Context ctx,const edg_wll_JobStat *stat,const char *recip, int *authz_flags);
+
 #ifdef __cplusplus
 }
 #endif
index f516592..829c54b 100644 (file)
@@ -38,7 +38,6 @@ limitations under the License.
 #include "authz_policy.h"
 
 static int notif_match_conditions(edg_wll_Context,const edg_wll_JobStat *,const edg_wll_JobStat *,const char *);
-static int notif_check_acl(edg_wll_Context,const edg_wll_JobStat *,const char *, int *);
 
 int edg_wll_NotifExpired(edg_wll_Context,const char *);
 
@@ -115,7 +114,7 @@ int edg_wll_NotifMatch(edg_wll_Context ctx, const edg_wll_JobStat *oldstat, cons
                                getpid(),jobc[0],asctime(gmtime(&expires)));
                }
                else if (notif_match_conditions(ctx,oldstat,stat,jobc[4]) &&
-                               notif_check_acl(ctx,stat,jobc[3], &authz_flags))
+                               edg_wll_NotifCheckACL(ctx,stat,jobc[3], &authz_flags))
                {
                        char                       *dest;
 
@@ -208,7 +207,7 @@ static int notif_match_conditions(edg_wll_Context ctx,const edg_wll_JobStat *old
  * effective VOMS groups of the recipient are not available here, should be 
  * probably stored along with the registration.
  */
-static int notif_check_acl(edg_wll_Context ctx,const edg_wll_JobStat *stat,const char *recip, int *authz_flags)
+int edg_wll_NotifCheckACL(edg_wll_Context ctx,const edg_wll_JobStat *stat,const char *recip, int *authz_flags)
 {
        int             ret;
        struct _edg_wll_GssPrincipal_data princ;
index fb7b813..329de31 100644 (file)
@@ -34,9 +34,20 @@ limitations under the License.
 #include "query.h"
 #include "db_supp.h"
 #include "index.h"
+#include "lb_xml_parse.h"
+#include "get_events.h"
 
 
-static char *get_user(edg_wll_Context ctx, int create);
+typedef struct {
+       edg_wll_NotifId *nid;
+       char *nid_s;
+       char *xml_conds;
+       glite_lbu_Statement stmt;
+       int count;
+} notif_stream_t;
+
+
+static char *get_user(edg_wll_Context ctx, int create, char **subj);
 static int check_notif_request(edg_wll_Context, const edg_wll_NotifId, char **, char **);
 static int split_cond_list(edg_wll_Context, edg_wll_QueryRec const * const *,
                                                edg_wll_QueryRec ***, char ***);
@@ -47,6 +58,7 @@ static int get_indexed_cols(edg_wll_Context,char const *,edg_wll_QueryRec **,cha
 static void adjust_validity(edg_wll_Context,time_t *);
 static int drop_notif_request(edg_wll_Context, const edg_wll_NotifId);
 static int check_notif_age(edg_wll_Context, const edg_wll_NotifId);
+static int notif_streaming(edg_wll_Context);
 
 
 int edg_wll_NotifNewServer(
@@ -64,9 +76,11 @@ int edg_wll_NotifNewServer(
                                           *addr_s              = NULL,
                                           *xml_conds   = NULL,
                                           *owner               = NULL,
+                                          *subj                = NULL,
                                          **jobs                = NULL;
        edg_wll_QueryRec  **nconds              = NULL;
        char            *add_index = NULL;
+       notif_stream_t  *arg = NULL;
 
 
        /*      Format notification ID
@@ -76,7 +90,7 @@ int edg_wll_NotifNewServer(
 
        /*      Get notification owner
         */
-       if ( !(owner = get_user(ctx, 1)) )
+       if ( !(owner = get_user(ctx, 1, &subj)) )
                goto cleanup;
 
        /*      Format conditions
@@ -176,6 +190,30 @@ rollback:
 
        } while (edg_wll_TransNeedRetry(ctx));
 
+       // set-up the streaming
+       if ((flags & EDG_WLL_NOTIF_BOOTSTRAP)) {
+               char *tmp;
+
+               arg = calloc(1, sizeof(*arg));
+
+               if ( edg_wll_JobQueryRecToXML(ctx, conditions, &tmp) )
+               {
+                       edg_wll_SetError(ctx, errno, "Can't encode data into xml");
+                       goto cleanup;
+               }
+               asprintf(&arg->xml_conds, "<and>%s</and>", tmp);
+               free(tmp);
+               arg->nid = edg_wll_NotifIdDup(nid);
+               arg->nid_s = strdup(nid_s);
+
+               ctx->processRequest_cb = notif_streaming;
+               ctx->processRequestUserData = arg;
+
+               glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] streaming set-up, notif '%s', initial destination '%s'", getpid(), arg->nid_s, addr_s ? : address_override);
+               arg = NULL;
+       }
+
+
 cleanup:
        if ( q ) free(q);
        if ( nid_s ) free(nid_s);
@@ -191,6 +229,8 @@ cleanup:
        }
        if ( nconds ) free(nconds);
        free(add_index);
+       free(arg);
+       free(subj);
 
        return edg_wll_Error(ctx, NULL, NULL);
 }
@@ -452,7 +492,7 @@ err:
 }
 
 
-static char *get_user(edg_wll_Context ctx, int create)
+static char *get_user(edg_wll_Context ctx, int create, char **subj)
 {
        glite_lbu_Statement     stmt    = NULL;
        char               *userid      = NULL,
@@ -466,6 +506,7 @@ static char *get_user(edg_wll_Context ctx, int create)
                goto cleanup;
        }
        can_peername = edg_wll_gss_normalize_subj(ctx->peerName, 0);
+       if (subj) *subj = strdup(can_peername);
        trio_asprintf(&q, "select userid from users where cert_subj='%|Ss'", can_peername);
        glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, q);
        if ( edg_wll_ExecSQL(ctx, q, &stmt) < 0 )
@@ -521,7 +562,7 @@ static int check_notif_request(
 
 
        /* XXX: rewrite select below in order to handle cert_subj format changes */
-       if ( !(user = get_user(ctx, 0)) )
+       if ( !(user = get_user(ctx, 0, NULL)) )
        {
                if ( !edg_wll_Error(ctx, NULL, NULL) )
                        edg_wll_SetError(ctx, EPERM, "Unknown user");
@@ -876,3 +917,138 @@ cleanup:
        return edg_wll_Error(ctx, NULL, NULL);
 }
 
+// refresh notification info from DB
+static int notif_streaming_refresh(edg_wll_Context ctx, glite_lbu_Statement stmt, const char *nid_s, char **row, size_t n) {
+       int ret;
+
+       if (glite_lbu_ExecPreparedStmt(stmt, 1, GLITE_LBU_DB_TYPE_CHAR, nid_s) == -1
+        || (ret = glite_lbu_FetchRow(stmt, n, NULL, row)) == -1) {
+               glite_common_log(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_FATAL, "[%d] NOTIFY stream: %s, refresh DB query failed", getpid(), nid_s);
+               return edg_wll_SetErrorDB(ctx);
+       }
+
+       if (ret == 0) {
+               return edg_wll_SetError(ctx, ENOENT, "notification not registered");
+       }
+
+       return 0;
+}
+
+static int notif_streaming_event_cb(edg_wll_Context ctx, glite_jobid_t jobid, edg_wll_JobStat *stat, void *arg) {
+       char                       *dest;
+       const char *owner;
+       int flags;
+       time_t expire;
+       int authz_flags;
+       size_t i;
+       char *ju;
+       char *row[4] = {};
+       notif_stream_t *lctx = (notif_stream_t *)arg;
+
+       if (!stat || !jobid) {
+       // last job mark
+               goto cleanup;
+       }
+
+       if (notif_streaming_refresh(ctx, lctx->stmt, lctx->nid_s, row, sizeof(row)/sizeof(row[0])) != 0) {
+               if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+                       glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "[%d] NOTIFY stream: %s, bootstrap interrupted (notification expired?)", getpid(), lctx->nid_s);
+               }
+               goto cleanup;
+       }
+
+       expire = glite_lbu_DBToTime(ctx->dbctx, row[1]);
+       flags = atoi(row[2]);
+       owner = row[3];
+
+       if (!edg_wll_NotifCheckACL(ctx, (const edg_wll_JobStat *)stat, owner, &authz_flags)) {
+               glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, authorization failed, job %s", getpid(), lctx->nid_s, ju = glite_jobid_getUnique(stat->jobId));
+               free(ju); ju = NULL;
+               edg_wll_ResetError(ctx);
+               goto cleanup;
+       }
+
+       glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, job %s, destination '%s'", getpid(), lctx->nid_s, ju = glite_jobid_getUnique(stat->jobId), row[0]);
+       free(ju); ju = NULL;
+
+       dest = strdup(row[0]);
+       /*XXX: ??? copied from notif_match.c */ free(ctx->p_instance); ctx->p_instance = strdup("");
+       if (edg_wll_NotifJobStatus(ctx, lctx->nid, dest, owner, flags, authz_flags, expire, (const edg_wll_JobStat)(*stat)) != 0) {
+               glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "[%d] NOTIFY stream: %s, error", getpid(), lctx->nid_s);
+               goto cleanup;
+       }
+       lctx->count++;
+
+cleanup:
+       for (i = 0; i < sizeof(row)/sizeof(row[0]); i++) free(row[i]);
+       glite_jobid_free(jobid);
+       edg_wll_FreeStatus(stat);
+       return edg_wll_Error(ctx, NULL, NULL);
+}
+
+static int notif_streaming(edg_wll_Context ctx) {
+       notif_stream_t *arg = (notif_stream_t *)ctx->processRequestUserData;
+       edg_wlc_JobId *jobs = NULL;
+       edg_wll_JobStat *states = NULL;
+       size_t i;
+       char *row[4];
+       int flags;
+       edg_wll_QueryRec **qs = NULL, *q = NULL;
+       char *err = NULL, *desc = NULL;
+
+       glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, START", getpid(), arg->nid_s);
+
+       if (glite_lbu_PrepareStmt(ctx->dbctx, "SELECT destination, valid, flags, cert_subj FROM notif_registrations nr, users u WHERE nr.userid = u.userid AND notifid = ?", &arg->stmt) != 0) {
+               edg_wll_SetErrorDB(ctx);
+               goto quit;
+       }
+       if (notif_streaming_refresh(ctx, arg->stmt, arg->nid_s, row, sizeof(row)/sizeof(row[0])) != 0) goto quit;
+       flags = atoi(row[2]);
+       for (i = 0; i < sizeof(row)/sizeof(row[0]); i++) free(row[i]);
+
+       if (parseJobQueryRec(ctx, arg->xml_conds, strlen(arg->xml_conds), &qs) != 0) {
+               glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR,
+                       "[%d] NOTIFY stream: %s, parseJobQueryRec failed", getpid(), arg->nid_s);
+               goto quit;
+       }
+
+       // perform the streaming
+       if (edg_wll_QueryJobsServerStream(ctx, (edg_wll_QueryRec const **)qs, flags, notif_streaming_event_cb, arg) != 0)
+               goto quit;
+
+       glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, END, %d jobs sent", getpid(), arg->nid_s, arg->count);
+
+quit:
+       if (edg_wll_Error(ctx, &err, &desc))
+               glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "[%d] NOTIFY stream: %s, %s: %s", getpid(), arg->nid_s, err, desc);
+       free(err);
+       free(desc);
+
+       ctx->processRequest_cb = NULL;
+       ctx->processRequestUserData = NULL;
+       edg_wll_NotifIdFree(arg->nid);
+       free(arg->nid_s);
+       free(arg->xml_conds);
+       glite_lbu_FreeStmt(&arg->stmt);
+       free(arg);
+
+       if (qs) {
+               for (i = 0; qs[i]; i++) {
+                       for (q = qs[i]; q->attr; q++) {
+                               edg_wll_QueryRecFree(q);
+                       }
+                       free(qs[i]);
+               }
+               free(qs);
+       }
+       if (jobs) {
+               for (i = 0; jobs[i]; i++) glite_jobid_free(jobs[i]);
+               free(jobs);
+       }
+       if (states) {
+               for (i = 0; states[i].jobId; i++) edg_wll_FreeStatus(states+i);
+               free(states);
+       }
+
+       return edg_wll_Error(ctx, NULL, NULL);
+}