From: František Dvořák Date: Mon, 7 Feb 2011 18:56:41 +0000 (+0000) Subject: Streaming notification interface implementation: X-Git-Tag: merge_21_head_round3_dst~15 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=9e47a6d893134196a2148df3ccc72a0aa7cf1114;p=jra1mw.git Streaming notification interface implementation: - add new notification flags (+simplify flags string creation) - setup the streaming on bootstrap flag - perform streaming through deferred action in bkserverd --- diff --git a/org.glite.lb.client/src/notify.c b/org.glite.lb.client/src/notify.c index 8cd78ac..df10e43 100644 --- a/org.glite.lb.client/src/notify.c +++ b/org.glite.lb.client/src/notify.c @@ -55,6 +55,7 @@ static void usage(char *cmd) " owner Match this owner DN\n" " requested_validity Validity of notification req. in seconds\n" " flags 0 - return basic status, 1 - return also JDL in status\n" + " 256 - bootstrap stream (send all existing jobs too)\n" " network_server Match only this network server (WMS entry point)\n" " -O Match owner - credentials are retrieved from environment\n" " -c Match only on state change\n" diff --git a/org.glite.lb.common/interface/context-int.h b/org.glite.lb.common/interface/context-int.h index 48c5d74..e348c14 100644 --- a/org.glite.lb.common/interface/context-int.h +++ b/org.glite.lb.common/interface/context-int.h @@ -178,6 +178,9 @@ glite_lb_padded_struct(_edg_wll_Context,150, _edg_wll_authz_policy authz_policy; int exclusive_zombies; + + int (*processRequest_cb)(edg_wll_Context ctx); + void *processRequestUserData; ) /* to be used internally: set, update and and clear the error information in diff --git a/org.glite.lb.common/interface/jobstat.h.T b/org.glite.lb.common/interface/jobstat.h.T index d9fdc87..c870a84 100644 --- a/org.glite.lb.common/interface/jobstat.h.T +++ b/org.glite.lb.common/interface/jobstat.h.T @@ -170,6 +170,8 @@ typedef struct _edg_wll_JobStat { #define EDG_WLL_STAT_CHILDSTAT 4 /**< apply the flags recursively to subjobs */ #define EDG_WLL_STAT_CHILDHIST_FAST 8 /**< partially complete histogram of child job states */ #define EDG_WLL_STAT_CHILDHIST_THOROUGH 16 /**< full and up-to date histogram of child job states */ +#define EDG_WLL_NOTIF_BOOTSTRAP 256 /**< send the state of the all already existing jobs too */ +#define EDG_WLL_NOTIF_VOLATILE 512 /**< (not used yet) send the notifications directly, without reliability and persistency */ /* starting from bit 10 private flags begins - do not add 1024 and more! */ diff --git a/org.glite.lb.common/src/xml_conversions.c b/org.glite.lb.common/src/xml_conversions.c index 1a7ba37..f734bf5 100644 --- a/org.glite.lb.common/src/xml_conversions.c +++ b/org.glite.lb.common/src/xml_conversions.c @@ -751,63 +751,30 @@ void *edg_wll_from_string_to_cclassad(edg_wll_XML_ctx *XMLCtx) /* various conversion functions */ -char *edg_wll_stat_flags_to_string(int flags) -{ - char *cflags = NULL, *temp_cflags = NULL; +static void append_flag(char **cflags, const char *cflag) { + char *temp_cflags; + if (*cflags) { + asprintf(&temp_cflags, "%s+%s", *cflags, cflag); + free(*cflags); + *cflags = temp_cflags; + } else + asprintf(cflags, "%s", cflag); +} - if (flags & EDG_WLL_STAT_CLASSADS) asprintf(&cflags,"%s","classadd"); - if (flags & EDG_WLL_STAT_CHILDREN) { - if (cflags) { - asprintf(&temp_cflags,"%s+%s",cflags,"children"); - free(cflags); - cflags=temp_cflags; - } - else asprintf(&cflags,"%s","children"); - } - if (flags & EDG_WLL_STAT_CHILDSTAT) { - if (cflags) { - asprintf(&temp_cflags,"%s+%s",cflags,"childstat"); - free(cflags); - cflags=temp_cflags; - } - else asprintf(&cflags,"%s","childstat"); - } - if (flags & EDG_WLL_STAT_NO_JOBS) { - if (cflags) { - asprintf(&temp_cflags,"%s+%s",cflags,"no_jobs"); - free(cflags); - cflags=temp_cflags; - } - else asprintf(&cflags,"%s","no_jobs"); - } - if (flags & EDG_WLL_STAT_NO_STATES) { - if (cflags) { - asprintf(&temp_cflags,"%s+%s",cflags,"no_states"); - free(cflags); - cflags=temp_cflags; - } - else asprintf(&cflags,"%s","no_states"); - } - - if (flags & EDG_WLL_STAT_CHILDHIST_FAST) { - if (cflags) { - asprintf(&temp_cflags,"%s+%s",cflags,"childhist_fast"); - free(cflags); - cflags=temp_cflags; - } - else asprintf(&cflags,"%s","childhist_fast"); - } - - if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) { - if (cflags) { - asprintf(&temp_cflags,"%s+%s",cflags,"childhist_thorough"); - free(cflags); - cflags=temp_cflags; - } - else asprintf(&cflags,"%s","childhist_thorough"); - } - +char *edg_wll_stat_flags_to_string(int flags) +{ + char *cflags = NULL; + + if (flags & EDG_WLL_STAT_CLASSADS) append_flag(&cflags, "classadd"); + if (flags & EDG_WLL_STAT_CHILDREN) append_flag(&cflags, "children"); + if (flags & EDG_WLL_STAT_CHILDSTAT) append_flag(&cflags, "childstat"); + if (flags & EDG_WLL_STAT_NO_JOBS) append_flag(&cflags, "no_jobs"); + if (flags & EDG_WLL_STAT_NO_STATES) append_flag(&cflags, "no_states"); + if (flags & EDG_WLL_STAT_CHILDHIST_FAST) append_flag(&cflags, "childhist_fast"); + if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) append_flag(&cflags, "childhist_thorough"); + if (flags & EDG_WLL_NOTIF_BOOTSTRAP) append_flag(&cflags, "bootstrap"); + if (flags & EDG_WLL_NOTIF_VOLATILE) append_flag(&cflags, "volatile"); if (!cflags) cflags = strdup(""); return(cflags); @@ -830,6 +797,8 @@ int edg_wll_string_to_stat_flags(char *cflags) if (!strcmp(sflag,"no_states")) flags = flags | EDG_WLL_STAT_NO_STATES; if (!strcmp(sflag,"childhist_fast")) flags = flags | EDG_WLL_STAT_CHILDHIST_FAST; if (!strcmp(sflag,"childhist_thorough")) flags = flags | EDG_WLL_STAT_CHILDHIST_THOROUGH; + if (!strcmp(sflag,"bootstrap")) flags = flags | EDG_WLL_NOTIF_BOOTSTRAP; + if (!strcmp(sflag,"volatile")) flags = flags | EDG_WLL_NOTIF_VOLATILE; sflag = strtok_r(NULL, "+", &last); } diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index efff832..43f8be3 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -1573,6 +1573,12 @@ int bk_accept_serve(int conn, struct timeval *timeout, void *cdata) return err; } + // additional actions (notification stream) + if (ctx->processRequest_cb) { + ctx->processRequest_cb(ctx); + ctx->processRequest_cb = NULL; + } + if (httpErr == HTTP_BADREQ && body) err = try_accept_ws(conn, timeout, cdata, body, strlen(body) + 1); if (httpErr != HTTP_BADREQ || err) @@ -1648,6 +1654,12 @@ int bk_accept_ws(int conn, struct timeval *timeout, void *cdata) return ECANCELED; } + // additional actions (notification stream) + if (ctx->processRequest_cb) { + ctx->processRequest_cb(ctx); + ctx->processRequest_cb = NULL; + } + return ENOTCONN; } #endif /* GLITE_LB_SERVER_WITH_WS */ diff --git a/org.glite.lb.server/src/il_notification.h b/org.glite.lb.server/src/il_notification.h index a1fcd5a..5490193 100644 --- a/org.glite.lb.server/src/il_notification.h +++ b/org.glite.lb.server/src/il_notification.h @@ -117,6 +117,11 @@ edg_wll_NotifCancelRegId(edg_wll_Context context, */ int edg_wll_NotifMatch(edg_wll_Context context, const edg_wll_JobStat *oldstat, const edg_wll_JobStat *stat); +/** + * Check permissions on job status. + */ +int edg_wll_NotifCheckACL(edg_wll_Context ctx,const edg_wll_JobStat *stat,const char *recip, int *authz_flags); + #ifdef __cplusplus } #endif diff --git a/org.glite.lb.server/src/notif_match.c b/org.glite.lb.server/src/notif_match.c index f516592..829c54b 100644 --- a/org.glite.lb.server/src/notif_match.c +++ b/org.glite.lb.server/src/notif_match.c @@ -38,7 +38,6 @@ limitations under the License. #include "authz_policy.h" static int notif_match_conditions(edg_wll_Context,const edg_wll_JobStat *,const edg_wll_JobStat *,const char *); -static int notif_check_acl(edg_wll_Context,const edg_wll_JobStat *,const char *, int *); int edg_wll_NotifExpired(edg_wll_Context,const char *); @@ -115,7 +114,7 @@ int edg_wll_NotifMatch(edg_wll_Context ctx, const edg_wll_JobStat *oldstat, cons getpid(),jobc[0],asctime(gmtime(&expires))); } else if (notif_match_conditions(ctx,oldstat,stat,jobc[4]) && - notif_check_acl(ctx,stat,jobc[3], &authz_flags)) + edg_wll_NotifCheckACL(ctx,stat,jobc[3], &authz_flags)) { char *dest; @@ -208,7 +207,7 @@ static int notif_match_conditions(edg_wll_Context ctx,const edg_wll_JobStat *old * effective VOMS groups of the recipient are not available here, should be * probably stored along with the registration. */ -static int notif_check_acl(edg_wll_Context ctx,const edg_wll_JobStat *stat,const char *recip, int *authz_flags) +int edg_wll_NotifCheckACL(edg_wll_Context ctx,const edg_wll_JobStat *stat,const char *recip, int *authz_flags) { int ret; struct _edg_wll_GssPrincipal_data princ; diff --git a/org.glite.lb.server/src/notification.c b/org.glite.lb.server/src/notification.c index fb7b813..329de31 100644 --- a/org.glite.lb.server/src/notification.c +++ b/org.glite.lb.server/src/notification.c @@ -34,9 +34,20 @@ limitations under the License. #include "query.h" #include "db_supp.h" #include "index.h" +#include "lb_xml_parse.h" +#include "get_events.h" -static char *get_user(edg_wll_Context ctx, int create); +typedef struct { + edg_wll_NotifId *nid; + char *nid_s; + char *xml_conds; + glite_lbu_Statement stmt; + int count; +} notif_stream_t; + + +static char *get_user(edg_wll_Context ctx, int create, char **subj); static int check_notif_request(edg_wll_Context, const edg_wll_NotifId, char **, char **); static int split_cond_list(edg_wll_Context, edg_wll_QueryRec const * const *, edg_wll_QueryRec ***, char ***); @@ -47,6 +58,7 @@ static int get_indexed_cols(edg_wll_Context,char const *,edg_wll_QueryRec **,cha static void adjust_validity(edg_wll_Context,time_t *); static int drop_notif_request(edg_wll_Context, const edg_wll_NotifId); static int check_notif_age(edg_wll_Context, const edg_wll_NotifId); +static int notif_streaming(edg_wll_Context); int edg_wll_NotifNewServer( @@ -64,9 +76,11 @@ int edg_wll_NotifNewServer( *addr_s = NULL, *xml_conds = NULL, *owner = NULL, + *subj = NULL, **jobs = NULL; edg_wll_QueryRec **nconds = NULL; char *add_index = NULL; + notif_stream_t *arg = NULL; /* Format notification ID @@ -76,7 +90,7 @@ int edg_wll_NotifNewServer( /* Get notification owner */ - if ( !(owner = get_user(ctx, 1)) ) + if ( !(owner = get_user(ctx, 1, &subj)) ) goto cleanup; /* Format conditions @@ -176,6 +190,30 @@ rollback: } while (edg_wll_TransNeedRetry(ctx)); + // set-up the streaming + if ((flags & EDG_WLL_NOTIF_BOOTSTRAP)) { + char *tmp; + + arg = calloc(1, sizeof(*arg)); + + if ( edg_wll_JobQueryRecToXML(ctx, conditions, &tmp) ) + { + edg_wll_SetError(ctx, errno, "Can't encode data into xml"); + goto cleanup; + } + asprintf(&arg->xml_conds, "%s", tmp); + free(tmp); + arg->nid = edg_wll_NotifIdDup(nid); + arg->nid_s = strdup(nid_s); + + ctx->processRequest_cb = notif_streaming; + ctx->processRequestUserData = arg; + + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] streaming set-up, notif '%s', initial destination '%s'", getpid(), arg->nid_s, addr_s ? : address_override); + arg = NULL; + } + + cleanup: if ( q ) free(q); if ( nid_s ) free(nid_s); @@ -191,6 +229,8 @@ cleanup: } if ( nconds ) free(nconds); free(add_index); + free(arg); + free(subj); return edg_wll_Error(ctx, NULL, NULL); } @@ -452,7 +492,7 @@ err: } -static char *get_user(edg_wll_Context ctx, int create) +static char *get_user(edg_wll_Context ctx, int create, char **subj) { glite_lbu_Statement stmt = NULL; char *userid = NULL, @@ -466,6 +506,7 @@ static char *get_user(edg_wll_Context ctx, int create) goto cleanup; } can_peername = edg_wll_gss_normalize_subj(ctx->peerName, 0); + if (subj) *subj = strdup(can_peername); trio_asprintf(&q, "select userid from users where cert_subj='%|Ss'", can_peername); glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, q); if ( edg_wll_ExecSQL(ctx, q, &stmt) < 0 ) @@ -521,7 +562,7 @@ static int check_notif_request( /* XXX: rewrite select below in order to handle cert_subj format changes */ - if ( !(user = get_user(ctx, 0)) ) + if ( !(user = get_user(ctx, 0, NULL)) ) { if ( !edg_wll_Error(ctx, NULL, NULL) ) edg_wll_SetError(ctx, EPERM, "Unknown user"); @@ -876,3 +917,138 @@ cleanup: return edg_wll_Error(ctx, NULL, NULL); } +// refresh notification info from DB +static int notif_streaming_refresh(edg_wll_Context ctx, glite_lbu_Statement stmt, const char *nid_s, char **row, size_t n) { + int ret; + + if (glite_lbu_ExecPreparedStmt(stmt, 1, GLITE_LBU_DB_TYPE_CHAR, nid_s) == -1 + || (ret = glite_lbu_FetchRow(stmt, n, NULL, row)) == -1) { + glite_common_log(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_FATAL, "[%d] NOTIFY stream: %s, refresh DB query failed", getpid(), nid_s); + return edg_wll_SetErrorDB(ctx); + } + + if (ret == 0) { + return edg_wll_SetError(ctx, ENOENT, "notification not registered"); + } + + return 0; +} + +static int notif_streaming_event_cb(edg_wll_Context ctx, glite_jobid_t jobid, edg_wll_JobStat *stat, void *arg) { + char *dest; + const char *owner; + int flags; + time_t expire; + int authz_flags; + size_t i; + char *ju; + char *row[4] = {}; + notif_stream_t *lctx = (notif_stream_t *)arg; + + if (!stat || !jobid) { + // last job mark + goto cleanup; + } + + if (notif_streaming_refresh(ctx, lctx->stmt, lctx->nid_s, row, sizeof(row)/sizeof(row[0])) != 0) { + if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "[%d] NOTIFY stream: %s, bootstrap interrupted (notification expired?)", getpid(), lctx->nid_s); + } + goto cleanup; + } + + expire = glite_lbu_DBToTime(ctx->dbctx, row[1]); + flags = atoi(row[2]); + owner = row[3]; + + if (!edg_wll_NotifCheckACL(ctx, (const edg_wll_JobStat *)stat, owner, &authz_flags)) { + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, authorization failed, job %s", getpid(), lctx->nid_s, ju = glite_jobid_getUnique(stat->jobId)); + free(ju); ju = NULL; + edg_wll_ResetError(ctx); + goto cleanup; + } + + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, job %s, destination '%s'", getpid(), lctx->nid_s, ju = glite_jobid_getUnique(stat->jobId), row[0]); + free(ju); ju = NULL; + + dest = strdup(row[0]); + /*XXX: ??? copied from notif_match.c */ free(ctx->p_instance); ctx->p_instance = strdup(""); + if (edg_wll_NotifJobStatus(ctx, lctx->nid, dest, owner, flags, authz_flags, expire, (const edg_wll_JobStat)(*stat)) != 0) { + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "[%d] NOTIFY stream: %s, error", getpid(), lctx->nid_s); + goto cleanup; + } + lctx->count++; + +cleanup: + for (i = 0; i < sizeof(row)/sizeof(row[0]); i++) free(row[i]); + glite_jobid_free(jobid); + edg_wll_FreeStatus(stat); + return edg_wll_Error(ctx, NULL, NULL); +} + +static int notif_streaming(edg_wll_Context ctx) { + notif_stream_t *arg = (notif_stream_t *)ctx->processRequestUserData; + edg_wlc_JobId *jobs = NULL; + edg_wll_JobStat *states = NULL; + size_t i; + char *row[4]; + int flags; + edg_wll_QueryRec **qs = NULL, *q = NULL; + char *err = NULL, *desc = NULL; + + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, START", getpid(), arg->nid_s); + + if (glite_lbu_PrepareStmt(ctx->dbctx, "SELECT destination, valid, flags, cert_subj FROM notif_registrations nr, users u WHERE nr.userid = u.userid AND notifid = ?", &arg->stmt) != 0) { + edg_wll_SetErrorDB(ctx); + goto quit; + } + if (notif_streaming_refresh(ctx, arg->stmt, arg->nid_s, row, sizeof(row)/sizeof(row[0])) != 0) goto quit; + flags = atoi(row[2]); + for (i = 0; i < sizeof(row)/sizeof(row[0]); i++) free(row[i]); + + if (parseJobQueryRec(ctx, arg->xml_conds, strlen(arg->xml_conds), &qs) != 0) { + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, + "[%d] NOTIFY stream: %s, parseJobQueryRec failed", getpid(), arg->nid_s); + goto quit; + } + + // perform the streaming + if (edg_wll_QueryJobsServerStream(ctx, (edg_wll_QueryRec const **)qs, flags, notif_streaming_event_cb, arg) != 0) + goto quit; + + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, END, %d jobs sent", getpid(), arg->nid_s, arg->count); + +quit: + if (edg_wll_Error(ctx, &err, &desc)) + glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "[%d] NOTIFY stream: %s, %s: %s", getpid(), arg->nid_s, err, desc); + free(err); + free(desc); + + ctx->processRequest_cb = NULL; + ctx->processRequestUserData = NULL; + edg_wll_NotifIdFree(arg->nid); + free(arg->nid_s); + free(arg->xml_conds); + glite_lbu_FreeStmt(&arg->stmt); + free(arg); + + if (qs) { + for (i = 0; qs[i]; i++) { + for (q = qs[i]; q->attr; q++) { + edg_wll_QueryRecFree(q); + } + free(qs[i]); + } + free(qs); + } + if (jobs) { + for (i = 0; jobs[i]; i++) glite_jobid_free(jobs[i]); + free(jobs); + } + if (states) { + for (i = 0; states[i].jobId; i++) edg_wll_FreeStatus(states+i); + free(states); + } + + return edg_wll_Error(ctx, NULL, NULL); +}