/* various conversion functions */
-char *edg_wll_stat_flags_to_string(int flags)
-{
- char *cflags = NULL, *temp_cflags = NULL;
+static void append_flag(char **cflags, const char *cflag) {
+ char *temp_cflags;
+ if (*cflags) {
+ asprintf(&temp_cflags, "%s+%s", *cflags, cflag);
+ free(*cflags);
+ *cflags = temp_cflags;
+ } else
+ asprintf(cflags, "%s", cflag);
+}
- if (flags & EDG_WLL_STAT_CLASSADS) asprintf(&cflags,"%s","classadd");
- if (flags & EDG_WLL_STAT_CHILDREN) {
- if (cflags) {
- asprintf(&temp_cflags,"%s+%s",cflags,"children");
- free(cflags);
- cflags=temp_cflags;
- }
- else asprintf(&cflags,"%s","children");
- }
- if (flags & EDG_WLL_STAT_CHILDSTAT) {
- if (cflags) {
- asprintf(&temp_cflags,"%s+%s",cflags,"childstat");
- free(cflags);
- cflags=temp_cflags;
- }
- else asprintf(&cflags,"%s","childstat");
- }
- if (flags & EDG_WLL_STAT_NO_JOBS) {
- if (cflags) {
- asprintf(&temp_cflags,"%s+%s",cflags,"no_jobs");
- free(cflags);
- cflags=temp_cflags;
- }
- else asprintf(&cflags,"%s","no_jobs");
- }
- if (flags & EDG_WLL_STAT_NO_STATES) {
- if (cflags) {
- asprintf(&temp_cflags,"%s+%s",cflags,"no_states");
- free(cflags);
- cflags=temp_cflags;
- }
- else asprintf(&cflags,"%s","no_states");
- }
-
- if (flags & EDG_WLL_STAT_CHILDHIST_FAST) {
- if (cflags) {
- asprintf(&temp_cflags,"%s+%s",cflags,"childhist_fast");
- free(cflags);
- cflags=temp_cflags;
- }
- else asprintf(&cflags,"%s","childhist_fast");
- }
-
- if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) {
- if (cflags) {
- asprintf(&temp_cflags,"%s+%s",cflags,"childhist_thorough");
- free(cflags);
- cflags=temp_cflags;
- }
- else asprintf(&cflags,"%s","childhist_thorough");
- }
-
+char *edg_wll_stat_flags_to_string(int flags)
+{
+ char *cflags = NULL;
+
+ if (flags & EDG_WLL_STAT_CLASSADS) append_flag(&cflags, "classadd");
+ if (flags & EDG_WLL_STAT_CHILDREN) append_flag(&cflags, "children");
+ if (flags & EDG_WLL_STAT_CHILDSTAT) append_flag(&cflags, "childstat");
+ if (flags & EDG_WLL_STAT_NO_JOBS) append_flag(&cflags, "no_jobs");
+ if (flags & EDG_WLL_STAT_NO_STATES) append_flag(&cflags, "no_states");
+ if (flags & EDG_WLL_STAT_CHILDHIST_FAST) append_flag(&cflags, "childhist_fast");
+ if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) append_flag(&cflags, "childhist_thorough");
+ if (flags & EDG_WLL_NOTIF_BOOTSTRAP) append_flag(&cflags, "bootstrap");
+ if (flags & EDG_WLL_NOTIF_VOLATILE) append_flag(&cflags, "volatile");
if (!cflags) cflags = strdup("");
return(cflags);
if (!strcmp(sflag,"no_states")) flags = flags | EDG_WLL_STAT_NO_STATES;
if (!strcmp(sflag,"childhist_fast")) flags = flags | EDG_WLL_STAT_CHILDHIST_FAST;
if (!strcmp(sflag,"childhist_thorough")) flags = flags | EDG_WLL_STAT_CHILDHIST_THOROUGH;
+ if (!strcmp(sflag,"bootstrap")) flags = flags | EDG_WLL_NOTIF_BOOTSTRAP;
+ if (!strcmp(sflag,"volatile")) flags = flags | EDG_WLL_NOTIF_VOLATILE;
sflag = strtok_r(NULL, "+", &last);
}
#include "query.h"
#include "db_supp.h"
#include "index.h"
+#include "lb_xml_parse.h"
+#include "get_events.h"
-static char *get_user(edg_wll_Context ctx, int create);
+typedef struct {
+ edg_wll_NotifId *nid;
+ char *nid_s;
+ char *xml_conds;
+ glite_lbu_Statement stmt;
+ int count;
+} notif_stream_t;
+
+
+static char *get_user(edg_wll_Context ctx, int create, char **subj);
static int check_notif_request(edg_wll_Context, const edg_wll_NotifId, char **, char **);
static int split_cond_list(edg_wll_Context, edg_wll_QueryRec const * const *,
edg_wll_QueryRec ***, char ***);
static void adjust_validity(edg_wll_Context,time_t *);
static int drop_notif_request(edg_wll_Context, const edg_wll_NotifId);
static int check_notif_age(edg_wll_Context, const edg_wll_NotifId);
+static int notif_streaming(edg_wll_Context);
int edg_wll_NotifNewServer(
*addr_s = NULL,
*xml_conds = NULL,
*owner = NULL,
+ *subj = NULL,
**jobs = NULL;
edg_wll_QueryRec **nconds = NULL;
char *add_index = NULL;
+ notif_stream_t *arg = NULL;
/* Format notification ID
/* Get notification owner
*/
- if ( !(owner = get_user(ctx, 1)) )
+ if ( !(owner = get_user(ctx, 1, &subj)) )
goto cleanup;
/* Format conditions
} while (edg_wll_TransNeedRetry(ctx));
+ // set-up the streaming
+ if ((flags & EDG_WLL_NOTIF_BOOTSTRAP)) {
+ char *tmp;
+
+ arg = calloc(1, sizeof(*arg));
+
+ if ( edg_wll_JobQueryRecToXML(ctx, conditions, &tmp) )
+ {
+ edg_wll_SetError(ctx, errno, "Can't encode data into xml");
+ goto cleanup;
+ }
+ asprintf(&arg->xml_conds, "<and>%s</and>", tmp);
+ free(tmp);
+ arg->nid = edg_wll_NotifIdDup(nid);
+ arg->nid_s = strdup(nid_s);
+
+ ctx->processRequest_cb = notif_streaming;
+ ctx->processRequestUserData = arg;
+
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] streaming set-up, notif '%s', initial destination '%s'", getpid(), arg->nid_s, addr_s ? : address_override);
+ arg = NULL;
+ }
+
+
cleanup:
if ( q ) free(q);
if ( nid_s ) free(nid_s);
}
if ( nconds ) free(nconds);
free(add_index);
+ free(arg);
+ free(subj);
return edg_wll_Error(ctx, NULL, NULL);
}
}
-static char *get_user(edg_wll_Context ctx, int create)
+static char *get_user(edg_wll_Context ctx, int create, char **subj)
{
glite_lbu_Statement stmt = NULL;
char *userid = NULL,
goto cleanup;
}
can_peername = edg_wll_gss_normalize_subj(ctx->peerName, 0);
+ if (subj) *subj = strdup(can_peername);
trio_asprintf(&q, "select userid from users where cert_subj='%|Ss'", can_peername);
glite_common_log_msg(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_DEBUG, q);
if ( edg_wll_ExecSQL(ctx, q, &stmt) < 0 )
/* XXX: rewrite select below in order to handle cert_subj format changes */
- if ( !(user = get_user(ctx, 0)) )
+ if ( !(user = get_user(ctx, 0, NULL)) )
{
if ( !edg_wll_Error(ctx, NULL, NULL) )
edg_wll_SetError(ctx, EPERM, "Unknown user");
return edg_wll_Error(ctx, NULL, NULL);
}
+// refresh notification info from DB
+static int notif_streaming_refresh(edg_wll_Context ctx, glite_lbu_Statement stmt, const char *nid_s, char **row, size_t n) {
+ int ret;
+
+ if (glite_lbu_ExecPreparedStmt(stmt, 1, GLITE_LBU_DB_TYPE_CHAR, nid_s) == -1
+ || (ret = glite_lbu_FetchRow(stmt, n, NULL, row)) == -1) {
+ glite_common_log(LOG_CATEGORY_LB_SERVER_DB, LOG_PRIORITY_FATAL, "[%d] NOTIFY stream: %s, refresh DB query failed", getpid(), nid_s);
+ return edg_wll_SetErrorDB(ctx);
+ }
+
+ if (ret == 0) {
+ return edg_wll_SetError(ctx, ENOENT, "notification not registered");
+ }
+
+ return 0;
+}
+
+static int notif_streaming_event_cb(edg_wll_Context ctx, glite_jobid_t jobid, edg_wll_JobStat *stat, void *arg) {
+ char *dest;
+ const char *owner;
+ int flags;
+ time_t expire;
+ int authz_flags;
+ size_t i;
+ char *ju;
+ char *row[4] = {};
+ notif_stream_t *lctx = (notif_stream_t *)arg;
+
+ if (!stat || !jobid) {
+ // last job mark
+ goto cleanup;
+ }
+
+ if (notif_streaming_refresh(ctx, lctx->stmt, lctx->nid_s, row, sizeof(row)/sizeof(row[0])) != 0) {
+ if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "[%d] NOTIFY stream: %s, bootstrap interrupted (notification expired?)", getpid(), lctx->nid_s);
+ }
+ goto cleanup;
+ }
+
+ expire = glite_lbu_DBToTime(ctx->dbctx, row[1]);
+ flags = atoi(row[2]);
+ owner = row[3];
+
+ if (!edg_wll_NotifCheckACL(ctx, (const edg_wll_JobStat *)stat, owner, &authz_flags)) {
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, authorization failed, job %s", getpid(), lctx->nid_s, ju = glite_jobid_getUnique(stat->jobId));
+ free(ju); ju = NULL;
+ edg_wll_ResetError(ctx);
+ goto cleanup;
+ }
+
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, job %s, destination '%s'", getpid(), lctx->nid_s, ju = glite_jobid_getUnique(stat->jobId), row[0]);
+ free(ju); ju = NULL;
+
+ dest = strdup(row[0]);
+ /*XXX: ??? copied from notif_match.c */ free(ctx->p_instance); ctx->p_instance = strdup("");
+ if (edg_wll_NotifJobStatus(ctx, lctx->nid, dest, owner, flags, authz_flags, expire, (const edg_wll_JobStat)(*stat)) != 0) {
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "[%d] NOTIFY stream: %s, error", getpid(), lctx->nid_s);
+ goto cleanup;
+ }
+ lctx->count++;
+
+cleanup:
+ for (i = 0; i < sizeof(row)/sizeof(row[0]); i++) free(row[i]);
+ glite_jobid_free(jobid);
+ edg_wll_FreeStatus(stat);
+ return edg_wll_Error(ctx, NULL, NULL);
+}
+
+static int notif_streaming(edg_wll_Context ctx) {
+ notif_stream_t *arg = (notif_stream_t *)ctx->processRequestUserData;
+ edg_wlc_JobId *jobs = NULL;
+ edg_wll_JobStat *states = NULL;
+ size_t i;
+ char *row[4];
+ int flags;
+ edg_wll_QueryRec **qs = NULL, *q = NULL;
+ char *err = NULL, *desc = NULL;
+
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, START", getpid(), arg->nid_s);
+
+ if (glite_lbu_PrepareStmt(ctx->dbctx, "SELECT destination, valid, flags, cert_subj FROM notif_registrations nr, users u WHERE nr.userid = u.userid AND notifid = ?", &arg->stmt) != 0) {
+ edg_wll_SetErrorDB(ctx);
+ goto quit;
+ }
+ if (notif_streaming_refresh(ctx, arg->stmt, arg->nid_s, row, sizeof(row)/sizeof(row[0])) != 0) goto quit;
+ flags = atoi(row[2]);
+ for (i = 0; i < sizeof(row)/sizeof(row[0]); i++) free(row[i]);
+
+ if (parseJobQueryRec(ctx, arg->xml_conds, strlen(arg->xml_conds), &qs) != 0) {
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR,
+ "[%d] NOTIFY stream: %s, parseJobQueryRec failed", getpid(), arg->nid_s);
+ goto quit;
+ }
+
+ // perform the streaming
+ if (edg_wll_QueryJobsServerStream(ctx, (edg_wll_QueryRec const **)qs, flags, notif_streaming_event_cb, arg) != 0)
+ goto quit;
+
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_INFO, "[%d] NOTIFY stream: %s, END, %d jobs sent", getpid(), arg->nid_s, arg->count);
+
+quit:
+ if (edg_wll_Error(ctx, &err, &desc))
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_ERROR, "[%d] NOTIFY stream: %s, %s: %s", getpid(), arg->nid_s, err, desc);
+ free(err);
+ free(desc);
+
+ ctx->processRequest_cb = NULL;
+ ctx->processRequestUserData = NULL;
+ edg_wll_NotifIdFree(arg->nid);
+ free(arg->nid_s);
+ free(arg->xml_conds);
+ glite_lbu_FreeStmt(&arg->stmt);
+ free(arg);
+
+ if (qs) {
+ for (i = 0; qs[i]; i++) {
+ for (q = qs[i]; q->attr; q++) {
+ edg_wll_QueryRecFree(q);
+ }
+ free(qs[i]);
+ }
+ free(qs);
+ }
+ if (jobs) {
+ for (i = 0; jobs[i]; i++) glite_jobid_free(jobs[i]);
+ free(jobs);
+ }
+ if (states) {
+ for (i = 0; states[i].jobId; i++) edg_wll_FreeStatus(states+i);
+ free(states);
+ }
+
+ return edg_wll_Error(ctx, NULL, NULL);
+}