void *notif_index_cols;
time_t notifDurationMax;
- char **super_users;
+ char **msg_prefixes;
time_t rssTime;
_edg_wll_authz_policy authz_policy;
int (*processRequest_cb)(edg_wll_Context ctx);
void *processRequestUserData;
+
+ char **msg_brokers;
)
/* to be used internally: set, update and and clear the error information in
extern void edg_wll_FreeParams(edg_wll_Context context);
+extern int edg_wll_ParseMSGConf(char *msg_conf, char ***brokers, char ***prefixes);
+
#ifdef __cplusplus
}
#endif
if (ctx->jpreg_dir) free(ctx->jpreg_dir);
if (ctx->serverIdentity) free(ctx->serverIdentity);
+ if (ctx->msg_prefixes) {
+ char **fm;
+ for (fm = ctx->msg_prefixes; fm && *fm; fm++)
+ free(*fm);
+ free(ctx->msg_prefixes);
+ ctx->msg_prefixes = NULL;
+ }
+ if (ctx->msg_brokers) {
+ char **fm;
+ for (fm = ctx->msg_brokers; fm && *fm; fm++)
+ free(*fm);
+ free(ctx->msg_brokers);
+ ctx->msg_brokers = NULL;
+ }
edg_wll_FreeParams(ctx);
return 0;
}
+
+int
+edg_wll_ParseMSGConf(char *msg_conf, char ***brokers, char ***prefixes) {
+ FILE *conf;
+ char l[512];
+ char *data, *d_to_parse;
+ int inmsg = 0, ntoks;
+ char *tok_r = NULL;
+ char *token;
+ char **tokens;
+
+
+ conf = fopen (msg_conf, "r");
+ if (conf == NULL) return -1; //Cannot open file
+
+ while( 1 ) {
+ fgets(l, 512, conf);
+ if ( feof(conf) ) break;
+
+ if (l[0] == '[') { // Detect section [msg]
+ if (!strncasecmp(l, "[msg]", 5)) inmsg = 1;
+ else inmsg = 0;
+ }
+ else if (inmsg) {
+ if ((!strncasecmp(l, "prefix", 6)) || (!strncasecmp(l, "broker", 6))) {
+ data=strchr(l, '=');
+ if (!data) return -2; // No '='
+// data = data[1];
+ if (strlen(data) < 1) return -2; // No text after '='
+
+ tokens = NULL; ntoks = 0;
+ for (d_to_parse = data+1; ; d_to_parse = NULL) {
+ token = strtok_r(d_to_parse, " \t\n", &tok_r);
+ if (token == NULL) break;
+
+ tokens = (char**) realloc (tokens, sizeof(char**) * (ntoks + 2));
+ asprintf(&(tokens[ntoks]), "%s", token);
+ tokens[++ntoks] = NULL;
+ }
+
+ if (!strncasecmp(l, "prefix", 6)) *prefixes=tokens;
+ else *brokers=tokens;
+ }
+
+ }
+
+ }
+
+ return 0;
+}
\end{verbatim}
\end{quote}
-Alongside the broker address and port, \texttt{msg.conf} also specifies the messaging plugin to be used by the notification interlogger. Plugin settings should be correct \emph{ab initio} and do not require modification by administrators. Broker settings may require an adaptive change in case the currently configured broker disapears and automatic checks fail to switch the settings to another one on time.
+Overall, \texttt{msg.conf} specifies the following information:
+
+\begin{itemize}
+\item The messaging plugin to be used by the notification interlogger. Plugin settings should be correct \emph{ab initio} and do not require modification by administrators.
+\item Broker settings (attribute \texttt{broker}). They may require an adaptive change in case the currently configured broker disappears and automatic checks fail to switch the settings to another one on time.
+\item Permissible topic title prefixes (attribute \texttt{prefix}). Registrations for delivery to topics not matching the prefix will be rejected. In case no prefix is specified, \LB applies the default EGI prefix: \texttt{grid.emi.}
+\end{itemize}
\subsubsection{Index configuration}
-O -a x-msg://grid.emi.lbexample
\end{verbatim}
-Rather than using the \LB notification API to receive messages, tap to the given messaging topic (\texttt{grid.emi.lbexample} in our case) in the messaging infrastructure.
+Rather than using the \LB notification API to receive messages, access the messaging infrastructure and tap into the given messaging topic (\texttt{grid.emi.lbexample} in our case).
-Note that production environments can impose restrictions on topic names. In the context of EGI, for instance, the ``\texttt{grid.emi.}'' prefix is mandatory.
+Note that production environments can impose restrictions on topic names. In the context of EGI, for instance, prefix ``\texttt{grid.emi.}'' is mandatory.
\subsubsection{Example: Waiting for more notifications on one socket}
char * policy_file = NULL;
struct _edg_wll_authz_policy authz_policy = { NULL, 0};
static int exclusive_zombies = 1;
-
-
+static char **msg_brokers = NULL;
+static char **msg_prefixes = NULL;
static struct option opts[] = {
{"jpreg-dir", 1, NULL, 'J'},
{"enable-jpreg-export", 1, NULL, 'j'},
{"super-user", 1, NULL, 'R'},
- {"super-users-file", 1, NULL,'F'},
+// {"super-users-file", 1, NULL,'F'},
+ {"msg-conf", 1, NULL,'F'},
{"no-index", 1, NULL, 'x'},
{"strict-locking",0, NULL, 'O'},
{"limits", 1, NULL, 'L'},
"\t-J, --jpreg-dir\t JP registration temporary files prefix (implies '-j')\n"
"\t-j, --enable-jpreg-export\t enable JP registration export (disabled by default)\n"
"\t--super-user\t user allowed to bypass authorization and indexing\n"
- "\t--super-users-file (deprecated)\t the same but read the subjects from a file\n"
"\t--no-index=1\t don't enforce indices for superusers\n"
"\t =2\t don't enforce indices at all\n"
"\t--strict-locking=1\t lock jobs also on storing events (may be slow)\n"
"\t-I,--rss-time\t age (in seconds) of job states published via RSS\n"
"\t-l,--policy\tauthorization policy file\n"
"\t-E,--exclusive-zombies-off\twith 'exclusive' flag, allow reusing IDs of purged jobs\n"
+ "\t-F,--msg-conf\t path to configuration file with messaging settings\n"
,me);
}
struct timeval to;
int request_timeout = REQUEST_TIMEOUT;
char socket_path_prefix[PATH_MAX] = GLITE_LBPROXY_SOCK_PREFIX;
+ char * msg_conf = NULL;
name = strrchr(argv[0],'/');
case 'Y': notif_ilog_file_prefix = strdup(optarg); break;
case 'i': strcpy(pidfile,optarg); pidfile_forced = 1; break;
case 'R': add_root(ctx, optarg, ADMIN_ACCESS); break;
- case 'F': glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_FATAL,
- "%s: Option --super-users-file is deprecated, specify policy using --policy instead", argv[0]);
- return 1;
+ case 'F': msg_conf = strdup(optarg); break;
case 'x': noIndex = atoi(optarg);
if (noIndex < 0 || noIndex > 2) { usage(name); return 1; }
break;
return 1;
}
+ if (msg_conf) {
+ int retv_msg;
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "Parsing MSG conf file: %s", msg_conf);
+ retv_msg = edg_wll_ParseMSGConf(msg_conf, &msg_brokers, &msg_prefixes);
+ if (retv_msg) {
+ switch(retv_msg) {
+ case -1: glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "Error opening MSG conf file: %s", msg_conf); break;
+ case -2: glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_WARN, "Error parsing MSG conf file: %s", msg_conf); break;
+ }
+ }
+ }
+ if (!msg_prefixes) { // Prefixes not extracted from file, put in defaults
+ msg_prefixes = (char**) calloc(sizeof(char**), 2);
+ asprintf(&(msg_prefixes[0]), "grid.emi.");
+ }
+
if (enable_lcas) {
char s[3];
*name_num = NULL,
*name = NULL;
int h_errno, ret;
+ int npref, totpref;
ctx->greyjobs = greyjobs;
ctx->exclusive_zombies = exclusive_zombies;
+ for (totpref = 0; msg_prefixes[totpref]; totpref++);
+ ctx->msg_prefixes = (char**) calloc(sizeof(char*), totpref);
+ for (npref = 0; npref<totpref; npref++)
+ ctx->msg_prefixes[npref]=strdup(msg_prefixes[npref]);
+
+ for (totpref = 0; msg_brokers[totpref]; totpref++);
+ ctx->msg_brokers = (char**) calloc(sizeof(char*), totpref);
+ for (npref = 0; npref<totpref; npref++)
+ ctx->msg_brokers[npref]=strdup(msg_brokers[npref]);
+
return 0;
}
edg_wll_QueryRec **nconds = NULL;
char *add_index = NULL;
notif_stream_t *arg = NULL;
+ int npref, okpref;
+ char *msgpref;
/* Format notification ID
edg_wll_SetError(ctx, EINVAL, "Addres override not in format host:port");
goto cleanup;
}
- if ( strstr(address_override, "x-msg")) {
- // XXX: Quick ugly hack. This will be made configurable soon
- if ( !strstr(address_override,"x-msg://grid.emi.")) {
- edg_wll_SetError(ctx, EINVAL, "This site requires that all topic names start with prefix 'grid.emi.'");
+ if ( !strncmp(address_override, "x-msg", 5)) {
+ npref = 0; okpref = 0;
+ while (ctx->msg_prefixes[npref]) {
+ asprintf(&msgpref, "x-msg://%s", ctx->msg_prefixes[npref++]);
+ if ( !strncmp(address_override, msgpref, strlen(msgpref))) {
+ okpref = 1;
+ free(msgpref);
+ break;
+ }
+ free(msgpref);
+ }
+ if (!okpref) {
+ char *prefmsg, *prefmsg2;
+ asprintf(&prefmsg,"This site requires that all topic names start with prefix %s", ctx->msg_prefixes[0]);
+ npref = 1;
+ while (ctx->msg_prefixes[npref]) {
+ prefmsg2 = prefmsg;
+ asprintf(&prefmsg,"%s or %s", prefmsg2, ctx->msg_prefixes[npref++]);
+ free(prefmsg2);
+ }
+
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "Rejecting notification with disallowed prefix (%s)", address_override);
+ edg_wll_SetError(ctx, EINVAL, prefmsg);
+ free(prefmsg);
goto cleanup;
}
}
{
char *time_s = NULL,
*addr_s = NULL;
+ int npref, okpref;
+ char *msgpref;
if ( !address_override )
edg_wll_SetError(ctx, EINVAL, "Addres override not in format host:port");
goto rollback;
}
- if ( strstr(address_override, "x-msg")) {
- // XXX: Quick ugly hack. This will be made configurable soon
- if ( !strstr(address_override,"x-msg://grid.emi.")) {
- edg_wll_SetError(ctx, EINVAL, "This site requires that all topic names start with prefix 'grid.emi.'");
+ if ( !strncmp(address_override, "x-msg", 5)) {
+
+ npref = 0; okpref = 0;
+ while (ctx->msg_prefixes[npref]) {
+ asprintf(&msgpref, "x-msg://%s", ctx->msg_prefixes[npref++]);
+ if ( !strncmp(address_override, msgpref, strlen(msgpref))) {
+ okpref = 1;
+ free(msgpref);
+ break;
+ }
+ free(msgpref);
+ }
+ if (!okpref) {
+ char *prefmsg, *prefmsg2;
+ asprintf(&prefmsg,"This site requires that all topic names start with prefix %s", ctx->msg_prefixes[0]);
+ npref = 1;
+ while (ctx->msg_prefixes[npref]) {
+ prefmsg2 = prefmsg;
+ asprintf(&prefmsg,"%s or %s", prefmsg2, ctx->msg_prefixes[npref++]);
+ free(prefmsg2);
+ }
+
+ glite_common_log(LOG_CATEGORY_CONTROL, LOG_PRIORITY_DEBUG, "Rejecting notification with disallowed prefix (%s)", address_override);
+ edg_wll_SetError(ctx, EINVAL, prefmsg);
+ free(prefmsg);
goto rollback;
}
}