#endif
#define RTM_FILE_NOTIFS "/var/tmp/notifs.txt"
+#define RTM_FILE_NOTIF_PRINTF "%s\t%s\t%s\t%s\t%s\t%d\n"
+#define RTM_FILE_NOTIF_SCANF "%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t\r\n]\n"
+#define RTM_FILE_NOTIF_NUM 6
+
#define WLCG_FILENAME_TEMPLATE "/tmp/wlcg_%02d_XXXXXX"
#define WLCG_COMMAND_MESSAGE "/opt/lcg/bin/msg-publish -c /opt/lcg/etc/msg-publish.conf org.wlcg.usage.jobStatus %s"
#define WLCG_BINARY "/opt/lcg/bin/msg-publish"
#define glite_jobid_getServer edg_wlc_JobIdGetServer
#define glite_jobid_getServerParts edg_wlc_JobIdGetServerParts
#define glite_jobid_getUnique edg_wlc_JobIdGetUnique
-#endif
-#ifndef GLITE_JOBID_DEFAULT_PORT
-#define GLITE_JOBID_DEFAULT_PORT GLITE_WMSC_JOBID_DEFAULT_PORT
#define edg_wll_NotifNew(CTX, CONDS, FLAGS, SOCK, LADDR, ID, VALID) edg_wll_NotifNew((CTX), (CONDS), (SOCK), (LADDR), (ID), (VALID))
#define edg_wll_JDLField(STAT, NAME) NULL
+#ifndef GLITE_JOBID_DEFAULT_PORT
+#define GLITE_JOBID_DEFAULT_PORT GLITE_WMSC_JOBID_DEFAULT_PORT
+#endif
#endif
// TODO: ipv6? :-)
pthread_t thread;
notif_t *notifs;
int nservers;
- time_t first_refresh;
+ time_t next_refresh;
char time_s[100];
char *dash_filename;
int dash_fd;
}
+int rtm_str2time(const char *s) {
+ time_t t;
+
+ if (s && memcmp(s, "-", 2) != 0) {
+ t = glite_lbu_StrToTime(s);
+ if (t == (time_t)-1) return 0;
+ } else
+ t = 0;
+
+ return t;
+}
+
+
+int rtm_str2timestamp(const char *s) {
+ double t;
+
+ if (s && memcmp(s, "-", 2) != 0) {
+ t = glite_lbu_StrToTimestamp(s);
+ if (t <= 0.5) return 0.0;
+ } else
+ t = 0.0;
+
+ return t;
+}
+
+
int rtm_str2notiftype(const char *str) {
if (strcasecmp(str, "STATUS") == 0) return RTM_NOTIF_TYPE_STATUS;
if (strcasecmp(str, "DONE") == 0) return RTM_NOTIF_TYPE_DONE;
char *command;
// WLCG message
- if (t->dash_fd) {
+ if (t->dash_fd) { // send only if anything to send
close(t->dash_fd);
asprintf(&command, "'%s' -c '%s' '%s' '%s'", config.wlcg_binary, config.wlcg_config, config.wlcg_topic, t->dash_filename);
lprintf(t, DBG, "calling %s", command);
#endif
-static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refresh, double last_update, char *server, int port, int active) {
+static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refresh, double last_update, char *server, int port, int active, int errors) {
void *tmp;
notif_t *notif;
notif->server = server;
notif->port = port;
notif->active = active;
+ notif->error = errors;
db.n++;
return notif;
int retval = 1;
notif_t *notif;
int i;
- char *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL;
+ char *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL, *id_str = NULL;
asprintf(&filename, "%s-new", config.notif_file);
if ((f = fopen(filename, "wt")) == NULL) {
lprintf(t, DBG, "not saving inactive notif %s (%s), server %s:%d", notif->id_str, rtm_notiftype2str(notif->type), notif->server, notif->port);
continue;
}
- if (notif->id_str) {
+
+ if (notif->id_str) id_str = strdup(notif->id_str);
+ else if (notif->error) asprintf(&id_str, "%s:%d", notif->server, notif->port);
+ if (id_str) {
rtm_time2str(notif->valid, &valid_str);
rtm_time2str(notif->refresh, &refresh_str);
rtm_timestamp2str(notif->last_update, &last_update_str);
- fprintf(f, "%s\t%s\t%s\t%s\t%s\n", notif->id_str, rtm_notiftype2str(notif->type), valid_str, refresh_str, last_update_str);
+ fprintf(f, RTM_FILE_NOTIF_PRINTF, id_str, rtm_notiftype2str(notif->type), valid_str, refresh_str, last_update_str, notif->error);
free(valid_str); valid_str = NULL;
free(refresh_str); refresh_str = NULL;
free(last_update_str); last_update_str = NULL;
}
+ free(id_str);
+ id_str = NULL;
}
fclose(f);
if (rename(filename, config.notif_file) != 0) {
notif_t *notif;
int i;
char *sql = NULL, *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL;
- const char *type_str;
+ const char *type_str, *amp;
for (i = 0; i < db.n; i++) {
notif = db.notifs + i;
}
*/
type_str = rtm_notiftype2str(notif->type);
- if (notif->id_str) {
+ if (notif->id_str || notif->error) {
if (notif->valid) glite_lbu_TimeToDB(db.dbctx, notif->valid, &valid_str);
else valid_str = strdup("NULL");
if (notif->refresh) glite_lbu_TimeToDB(db.dbctx, notif->refresh, &refresh_str);
else refresh_str = strdup("NULL");
if (notif->last_update) glite_lbu_TimestampToDB(db.dbctx, notif->last_update, &last_update_str);
else last_update_str = strdup("NULL");
- trio_asprintf(&sql, "UPDATE notifs SET notifid='%|Ss', valid=%s, refresh=%s, last_update=%s WHERE lb='%|Ss' AND port=%d AND notiftype='%|Ss'", notif->id_str, valid_str, refresh_str, last_update_str, notif->server, notif->port, type_str);
+ amp = notif->id_str ? "'" : " ";
+ trio_asprintf(&sql, "UPDATE notifs SET notifid=%s%|Ss%s, valid=%s, refresh=%s, last_update=%s, errors=%d WHERE lb='%|Ss' AND port=%d AND notiftype='%|Ss'", amp, notif->id_str ? : "NULL", amp, valid_str, refresh_str, last_update_str, notif->error, notif->server, notif->port, type_str);
switch (glite_lbu_ExecSQL(db.dbctx, sql, NULL)) {
case 0:
// not found - insert
// can be handy when using file as input of LBs
free(sql);
- trio_asprintf(&sql, "INSERT INTO notifs (lb, port, notifid, notiftype, valid, refresh, last_update) VALUES ('%|Ss', %d, '%|Ss', '%|Ss', %s, %s, %s)", notif->server, notif->port, notif->id_str, type_str, valid_str, refresh_str, last_update_str);
+ trio_asprintf(&sql, "INSERT INTO notifs (lb, port, notifid, notiftype, valid, refresh, last_update, errors) VALUES ('%|Ss', %d, %s%|Ss%s, '%|Ss', %s, %s, %s, %d)", notif->server, notif->port, amp, notif->id_str ? : "NULL", amp, type_str, valid_str, refresh_str, last_update_str, notif->error);
switch (glite_lbu_ExecSQL(db.dbctx, sql, NULL)) {
case -1:
lprintf_dbctx(t, ERR, "notif '%s' (%s) insert failed", notif->id_str, type_str);
old_error = notif->error;
if (is_error) {
- if (!notif->error++) notif->refresh = time(NULL);
+ if (!notif->error++ || !notif->refresh) notif->refresh = time(NULL);
max_count = config.dive / RTM_ERROR_REPEAT_RATE / 2;
if (max_count <= 0) max_count = 1;
notif->refresh += (notif->error <= max_count ? notif->error : max_count) * RTM_ERROR_REPEAT_RATE;
if (new_notif) {
if ((notif = db_search_notif_by_server(db.notifs, db.n, new_notif->server, new_notif->port, new_notif->type)) == NULL) {
- if (db_add_notif(strdup(new_notif->id_str), new_notif->type, new_notif->valid, new_notif->refresh, new_notif->last_update, strdup(new_notif->server), new_notif->port, 1) == NULL) {
+ if (db_add_notif(strdup(new_notif->id_str), new_notif->type, new_notif->valid, new_notif->refresh, new_notif->last_update, strdup(new_notif->server), new_notif->port, 1, 0) == NULL) {
lprintf(t, ERR, "can't realloc");
goto quit;
}
time_t valid, refresh;
double last_update;
edg_wll_NotifId id;
- int type;
+ int type, i, errcnt, port;
int retval = 1;
if ((f = fopen(config.notif_file, "rt")) == NULL) {
return 0;
}
- results[0] = malloc(5 * 512);
- results[1] = results[0] + 512;
- results[2] = results[0] + 1024;
- results[3] = results[0] + 1536;
- results[4] = results[0] + 2048;
- while ((err = fscanf(f, "%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t\r\n]\n", results[0], results[1], results[2], results[3], results[4])) == 5) {
+ results[0] = malloc(RTM_FILE_NOTIF_NUM * 512);
+ for (i = 1; i < RTM_FILE_NOTIF_NUM; i++) {
+ results[i] = results[0] + i * 512;
+ }
+ while ((err = fscanf(f, RTM_FILE_NOTIF_SCANF, results[0], results[1], results[2], results[3], results[4], results[5])) == RTM_FILE_NOTIF_NUM) {
notifidstr = results[0];
if ((type = rtm_str2notiftype(results[1])) == -1) {
lprintf(NULL, ERR, "unknown notification type '%s' in '%s'", results[1], notifidstr);
continue;
}
- valid = 0;
- if (results[2] && strcasecmp(results[2], "-") != 0) {
- valid = glite_lbu_StrToTime(results[2]);
- }
-
- refresh = 0;
- if (results[3] && strcasecmp(results[3], "-") != 0) {
- refresh = glite_lbu_StrToTime(results[2]);
- }
+ valid = rtm_str2time(results[2]);
+ refresh = rtm_str2time(results[3]);
+ last_update = rtm_str2timestamp(results[4]);
- last_update = 0;
- if (results[4] && strcasecmp(results[4], "-") != 0) {
- last_update = glite_lbu_StrToTimestamp(results[4]);
+ errcnt = 0;
+ if (results[5] && strcasecmp(results[5], "-") != 0) {
+ errcnt = atoi(results[5]);
}
- if ((new_notif = db_add_notif(strdup(notifidstr), type, valid, refresh, last_update, NULL, 0, 0)) == NULL) {
- lprintf(NULL, ERR, "can't alloc");
- goto quit;
- }
- if (edg_wll_NotifIdParse(notifidstr, &id) != 0) {
- lprintf(NULL, WRN, "can't parse notification ID '%s'", notifidstr);
- notif_free(new_notif);
- db.n--;
- continue;
+ if (errcnt) {
+ if (sscanf(notifidstr, "%511[^:]:%d", results[1], &port) != 2) {
+ lprintf(NULL, WRN, "can't parse server specification '%s'", notifidstr);
+ continue;
+ }
+ if ((new_notif = db_add_notif(NULL, type, valid, refresh, last_update, strdup(results[1]), port, 0, errcnt)) == NULL) {
+ lprintf(NULL, ERR, "can't alloc");
+ goto quit;
+ }
+ } else {
+ if (edg_wll_NotifIdParse(notifidstr, &id) != 0) {
+ lprintf(NULL, WRN, "can't parse notification ID '%s'", notifidstr);
+ continue;
+ }
+ if ((new_notif = db_add_notif(strdup(notifidstr), type, valid, refresh, last_update, NULL, 0, 0, errcnt)) == NULL) {
+ lprintf(NULL, ERR, "can't alloc");
+ goto quit;
+ }
+ edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port);
+ edg_wll_NotifIdFree(id);
}
- edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port);
- edg_wll_NotifIdFree(id);
}
if (err == EOF) retval = 0;
else lprintf(NULL, ERR, "can't parse notification file '%s'", config.notif_file);
time_t valid, refresh;
double last_update;
edg_wll_NotifId id;
- int type;
+ int type, i, errcnt;
int retval = 1;
glite_lbu_Statement stmt = NULL;
- char *results[5];
+ char *results[8];
- if (glite_lbu_ExecSQL(db.dbctx, "SELECT notifid, notiftype, valid, refresh, last_update FROM notifs WHERE notifid IS NOT NULL", &stmt) == -1) {
+ if (glite_lbu_ExecSQL(db.dbctx, "SELECT notifid, notiftype, valid, refresh, last_update, errors, lb, port FROM notifs", &stmt) == -1) {
lprintf_dbctx(NULL, ERR, "fetching notification failed");
goto quit;
}
- while ((err = glite_lbu_FetchRow(stmt, 5, NULL, results)) > 0) {
- notifidstr = results[0];
+ while ((err = glite_lbu_FetchRow(stmt, 8, NULL, results)) > 0) {
+ if (results[0] && results[0][0]) notifidstr = results[0];
+ else {
+ notifidstr = NULL;
+ free(results[0]);
+ }
results[0] = NULL;
+
if ((type = rtm_str2notiftype(results[1])) == -1) {
lprintf(NULL, ERR, "unknown notification type '%s' in '%s'", results[1], notifidstr);
- free(results[1]);
- free(results[2]);
- free(results[3]);
- free(results[4]);
+ for (i = 0; i < 8; i++) free(results[i]);
continue;
}
free(results[1]);
}
free(results[4]);
- if ((new_notif = db_add_notif(notifidstr, type, valid, refresh, last_update, NULL, 0, 0)) == NULL) {
+ errcnt = 0;
+ if (results[5] && results[5][0]) errcnt = atoi(results[5]);
+ free(results[5]);
+
+ if ((new_notif = db_add_notif(notifidstr, type, valid, refresh, last_update, results[6], atoi(results[7]), 0, errcnt)) == NULL) {
free(notifidstr);
+ free(results[6]);
+ free(results[7]);
lprintf(NULL, ERR, "can't alloc");
goto quit;
}
- if (edg_wll_NotifIdParse(notifidstr, &id) != 0) {
- lprintf(NULL, WRN, "can't parse notification IDs '%s'", notifidstr);
- notif_free(new_notif);
- db.n--;
- continue;
+ free(results[6]);
+ free(results[7]);
+ if (notifidstr) {
+ if (edg_wll_NotifIdParse(notifidstr, &id) != 0) {
+ lprintf(NULL, WRN, "can't parse notification IDs '%s'", notifidstr);
+ notif_free(new_notif);
+ db.n--;
+ continue;
+ }
+ edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port);
+ edg_wll_NotifIdFree(id);
}
- edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port);
- edg_wll_NotifIdFree(id);
}
if (err == 0) retval = 0;
else lprintf_dbctx(NULL, ERR, "fetching failed");
int load_notifs() {
- int ret;
+ int i, ret;
pthread_mutex_lock(&db.lock);
#else
ret = load_notifs_file();
#endif
+ // try to reconnect on bad notifications immediately
+ for (i = 0; i < db.n; i++)
+ if (db.notifs[i].error) db.notifs[i].refresh = 0;
pthread_mutex_unlock(&db.lock);
edg_wll_JobStat jobstat, *jobstates;
notif_t *notif, *notif_jdl;
edg_wll_QueryRec *conditions[3] = { NULL, NULL, NULL }, condition[2], condition2[2];
- int sock = -1, updated = 0, received = 0;
+ int sock = -1, updated = 0, error = 0, received = 0;
thread_t *t = (thread_t *)thread_data;
edg_wll_Context ctx = NULL;
int flags = 0;
//
while (!quit) {
now = time(NULL);
- t->first_refresh = now + RTM_NOTIF_LOOP_MAX_TIME;
+ t->next_refresh = now + RTM_NOTIF_LOOP_MAX_TIME;
for (i = 0; i < t->nservers; i++) {
notif = t->notifs + i;
if (!notif->active) {
if (notif->error) {
if (notif->refresh > now) {
lprintf(t, INS, "not planned to retry previously failed %d. notification '%s' (%s), plan %s", i, notif->id_str, rtm_notiftype2str(notif->type), time2str(t, notif->refresh));
- if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
+ if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh;
continue;
}
lprintf(t, DBG, "retry previously failed %d. notification '%s' (%s)", i, notif->id_str, rtm_notiftype2str(notif->type));
// next retry of STATUS stright before the JDL
if (notif_jdl) {
notif->refresh = notif_jdl->refresh;
- if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
+ if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh;
}
continue;
}
notif->valid = 0;
notif->id = NULL;
rtm_update_error_state(t, notif, i, 1);
- if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
- continue;
+ error = 1;
+ goto cont;
}
notif->id_str = edg_wll_NotifIdUnparse(notif->id);
lprintf(t, INF, "created %d. notification '%s' (%s), valid: %s", i, notif->id_str, rtm_notiftype2str(notif->type), time2str(t, notif->valid));
notif->id_str = NULL;
notif->valid = 0;
rtm_update_error_state(t, notif, i, 1);
- if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
- continue;
+ error = 1;
+ if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh;
+ goto cont;
} else {
for (j = 0; jobstates[j].state != EDG_WLL_JOB_UNDEF; j++) {
notif->last_update = jobstates[j].lastUpdateTime.tv_sec + jobstates[j].lastUpdateTime.tv_usec / 1000000.0;
lprintf(t, INS, "no change in %d. notification '%s' (%s)", i, notif->id_str, rtm_notiftype2str(notif->type));
}
- if (updated) {
- assert(notif->valid);
+cont:
+ if (updated || error) {
+ if (!error) {
+ assert(notif->valid);
+ notif->refresh = notif->valid ? (now + ((notif->valid - now) >> 1)) : 0;
+
+ }
// create or refresh OK, bootstrap if needed OK, store the new notification
updated = 0;
- notif->refresh = notif->valid ? (now + ((notif->valid - now) >> 1)) : 0;
+ error = 0;
+
// quicker refresh (or recreate) if needed
now = time(NULL);
if (notif->valid && now >= notif->refresh) {
}
// compute time of the next event from the new refresh on notification
- if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
+ if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh;
}
// receive
// cycle here locally around NotifReceive, we know about next
// refresh time
//
- lprintf(t, DBG, "waiting for the notifications up to %s...", t->first_refresh ? time2str(t, t->first_refresh) : "0 (no wait)");
- while (t->first_refresh > now && !quit) {
- to.tv_sec = t->first_refresh - now;
+ lprintf(t, DBG, "waiting for the notifications up to %s...", t->next_refresh ? time2str(t, t->next_refresh) : "0 (no wait)");
+ while (t->next_refresh > now && !quit) {
+ to.tv_sec = t->next_refresh - now;
if (to.tv_sec > RTM_NOTIF_READ_TIMEOUT) to.tv_sec = RTM_NOTIF_READ_TIMEOUT;
to.tv_usec = 0;
memset(&jobstat, 0, sizeof(jobstat));
}
-int reconcile_config_db() {
- int i, j, n, type, typestart, typeend;
+int reconcile_threads() {
+ int iserver, ithread, inotif, gran, mod, nnotifs;
+ int j, oldn, type, typestart, typeend;
notif_t *a, *b;
edg_wll_Context ctx = NULL;
edg_wll_NotifId notifid;
+ thread_t *t;
if (!config.cleanup) {
if (config.silly) {
typestart = RTM_NOTIF_TYPE_OLD;
typeend = RTM_NOTIF_TYPE_OLD;
+ nnotifs = 1;
} else {
typestart = RTM_NOTIF_TYPE_STATUS;
typeend = RTM_NOTIF_TYPE_JDL;
+ nnotifs = 2;
}
- n = db.n;
- for (i = 0; i < config.nservers; i++) {
- a = config.notifs + i;
- for (type = typestart; type <= typeend; type++)
- {
- b = db_search_notif_by_server(db.notifs, n, a->server, a->port, type);
- if (!b) b = db_add_notif(NULL, type, 0, 0, 0, strdup(a->server), a->port, 1);
- else lprintf(NULL, INF, "found previous notification '%s' (%s)", b->id_str, rtm_notiftype2str(b->type));
- b->active = 1;
+
+ oldn = db.n;
+
+ // distribute LB servers between threads
+ // (always use existing loaded notification when found)
+ threads = (thread_t *)calloc(config.nthreads, sizeof(thread_t));
+ gran = config.nservers / config.nthreads, mod = config.nservers % config.nthreads;
+ t = NULL;
+ ithread = 0;
+ inotif = 0;
+ for (iserver = 0; iserver < config.nservers; iserver++) {
+ // new thread
+ if (!t || inotif + nnotifs > t->nservers) {
+ assert(ithread < config.nthreads); // proper number of threads
+ assert(!t || inotif == t->nservers); // start or exactly distributed
+ t = threads + ithread;
+ t->nservers = nnotifs * ((ithread < mod) ? gran + 1 : gran);
+ t->notifs = (notif_t *)calloc(t->nservers, sizeof(notif_t));
+ lprintf(NULL, DBG, "%d. thread: %d notifications", ithread, t->nservers);
+ ithread++;
+ inotif = 0;
+ }
+
+ // next configured server
+ a = config.notifs + iserver;
+ for (type = typestart; type <= typeend; type++) {
+ // find or create all notification types
+ b = db_search_notif_by_server(db.notifs, oldn, a->server, a->port, type);
+ if (!b) b = db_add_notif(NULL, type, 0, 0, 0, strdup(a->server), a->port, 1, 0);
+ else {
+ if (b->id_str) {
+ lprintf(NULL, INF, "found previous notification '%s' (%s)", b->id_str, rtm_notiftype2str(b->type));
+ } else {
+ lprintf(NULL, INF, "found previous server %s:%d (%s), %d errors", b->server, b->port, rtm_notiftype2str(b->type), b->error);
+ }
+ b->active = 1;
+ }
+ // and add each to the thread
+ notif_copy(t->notifs + inotif, b);
+ lprintf(NULL, INS, "thread[%d][%d] <- %s:%d (%s), id %s", ithread-1, inotif, b->server, b->port, rtm_notiftype2str(b->type), b->id_str);
+ inotif++;
}
}
+ assert(db.n % nnotifs == 0); // each server all notifs
}
if (edg_wll_InitContext(&ctx) != 0) {
config.notifs = calloc(n, sizeof(notif_t));
config.nservers = 0;
- if ((err = glite_lbu_ExecSQL(db.dbctx, "SELECT DISTINCT lb, port FROM " RTM_DB_TABLE_LBS, &stmt)) < 0) {
+ if ((err = glite_lbu_ExecSQL(db.dbctx, "SELECT DISTINCT ip FROM " RTM_DB_TABLE_LBS, &stmt)) < 0) {
goto err;
}
while (config.nservers < n && (err = glite_lbu_FetchRow(stmt, 2, NULL, results)) > 0) {
config.notifs[config.nservers].server = strdup(results[0]);
- config.notifs[config.nservers++].port = atoi(results[1]);
+ config.notifs[config.nservers++].port = GLITE_JOBID_DEFAULT_PORT;
free(results[0]);
- free(results[1]);
}
if (err < 0) goto err;
glite_lbu_FreeStmt(&stmt);
int main(int argn, char *argv[]) {
struct sigaction sa;
sigset_t sset;
- int i, j, k, gran, mod, nnotifs;
+ int i, j;
double t1, t2, last_summary = 0, start_time;
thread_t *t;
struct stat pstat;
lprintf(NULL, ERR, "caught signal %d from process %d, resurrecting...", WTERMSIG(status), watched);
// slow down the core generator ;-)
// disabled signals and ended child in pidfile, live with it
+ pthread_sigmask(SIG_UNBLOCK, &sset, NULL);
+ if (config.pidfile) {
+ if (remove(config.pidfile) == -1) lprintf(NULL, WRN, "can't remove pidfile '%s': %s", config.pidfile, strerror(errno));
+ }
sleep(2);
+ pthread_sigmask(SIG_BLOCK, &sset, NULL);
break;
default:
lprintf(NULL, WRN, "ended with signal %d", WTERMSIG(status));
}
}
+ // child continues...
+
// threads && Globus
if (edg_wll_gss_initialize()) {
lprintf(NULL, ERR, "can't initialize GSS");
if (load_notifs()) goto quit;
// compare lb servers from configuration and notifications,
// or clean up and exit if specified
- if (reconcile_config_db()) goto quit;
+ if (reconcile_threads()) goto quit;
if (config.cleanup) {
retval = RTM_EXIT_OK;
goto quit;
// enable signals in main
pthread_sigmask(SIG_UNBLOCK, &sset, NULL);
- // distribution LB servers between threads
- nnotifs = config.silly ? 1 : 2;
- threads = (thread_t *)calloc(config.nthreads, sizeof(thread_t));
- assert(db.n % nnotifs == 0); // each server RTM_NOTIF_TYPE_LAST notification types
- gran = (db.n / nnotifs) / config.nthreads, mod = (db.n / nnotifs) % config.nthreads;
- i = 0;
- j = 0;
- do {
- assert(j < config.nthreads);
- t = threads + j;
- t->nservers = nnotifs * ((j < mod) ? gran + 1 : gran);
- lprintf(NULL, DBG, "%d thread: %d notifications", j, t->nservers);
- if (t->nservers) {
- t->notifs = (notif_t *)calloc(t->nservers, sizeof(notif_t));
- for (k = 0; k < t->nservers; k++) {
- notif_copy(t->notifs + k, db.notifs + i);
- i++;
- }
- }
- j++;
- } while (i < db.n);
// launch the threads
- for (j = 0; j < config.nthreads; j++) {
- t = threads + j;
- t->id = j;
- if (pthread_create(&threads[j].thread, NULL, notify_thread, t) != 0) {
- lprintf(NULL, ERR, "[main] can't create %d. thread: %s\n", j, strerror(errno));
+ for (i = 0; i < config.nthreads; i++) {
+ t = threads + i;
+ t->id = i;
+ if (pthread_create(&threads[i].thread, NULL, notify_thread, t) != 0) {
+ lprintf(NULL, ERR, "[main] can't create %d. thread: %s\n", i, strerror(errno));
goto quit;
}
}