Slight refactoring of notifications loading:
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 1 Dec 2009 12:06:11 +0000 (12:06 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 1 Dec 2009 12:06:11 +0000 (12:06 +0000)
- preserve error counter (save not registered notifs too)
- more robust reconciliation
- distribution to threads in reconciliation

Different config DB schema for RTM.

org.glite.lb.harvester/examples/test.sql
org.glite.lb.harvester/src/harvester.c

index 7bf81b5..15736f6 100644 (file)
@@ -1,3 +1,19 @@
+--
+-- Inicialization (replace pgsql by actual postgres superuser):
+--
+-- 1) grant privileges, someting like this in $data/pg_hba.conf:
+--     local all all trust
+--
+-- 2) create user:
+--     createuser -U pgsql rtm
+--
+-- 3) crate database:
+--     createuser -U pgsql rtm
+--
+-- 4) create tables:
+--     psql -U rtm rtm < test.sql
+--
+
 CREATE TABLE "jobs" (
        jobid VARCHAR PRIMARY KEY,
        lb VARCHAR,
@@ -15,10 +31,14 @@ CREATE TABLE "jobs" (
 );
 
 CREATE TABLE "lb20" (
-       lb VARCHAR,
-       port INTEGER,
+       ip TEXT NOT NULL,
+       branch TEXT NOT NULL,
+       serv_version TEXT NOT NULL,
+       monitored BOOLEAN DEFAULT FALSE,
+       last_seen DATE,
+       first_seen DATE,
 
-       PRIMARY KEY(lb, port)
+       PRIMARY KEY(ip)
 );
 
 CREATE TABLE "notifs" (
index 7af1b5e..9fe52a6 100644 (file)
 #endif
 
 #define RTM_FILE_NOTIFS "/var/tmp/notifs.txt"
+#define RTM_FILE_NOTIF_PRINTF "%s\t%s\t%s\t%s\t%s\t%d\n"
+#define RTM_FILE_NOTIF_SCANF "%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t\r\n]\n"
+#define RTM_FILE_NOTIF_NUM 6
+
 #define WLCG_FILENAME_TEMPLATE "/tmp/wlcg_%02d_XXXXXX"
 #define WLCG_COMMAND_MESSAGE "/opt/lcg/bin/msg-publish -c /opt/lcg/etc/msg-publish.conf org.wlcg.usage.jobStatus %s"
 #define WLCG_BINARY "/opt/lcg/bin/msg-publish"
 #define glite_jobid_getServer edg_wlc_JobIdGetServer
 #define glite_jobid_getServerParts edg_wlc_JobIdGetServerParts
 #define glite_jobid_getUnique edg_wlc_JobIdGetUnique
-#endif
-#ifndef GLITE_JOBID_DEFAULT_PORT
-#define GLITE_JOBID_DEFAULT_PORT GLITE_WMSC_JOBID_DEFAULT_PORT
 #define edg_wll_NotifNew(CTX, CONDS, FLAGS, SOCK, LADDR, ID, VALID) edg_wll_NotifNew((CTX), (CONDS), (SOCK), (LADDR), (ID), (VALID))
 #define edg_wll_JDLField(STAT, NAME) NULL
+#ifndef GLITE_JOBID_DEFAULT_PORT
+#define GLITE_JOBID_DEFAULT_PORT GLITE_WMSC_JOBID_DEFAULT_PORT
+#endif
 #endif
 
 // TODO: ipv6? :-)
@@ -149,7 +153,7 @@ typedef struct {
        pthread_t thread;
        notif_t *notifs;
        int nservers;
-       time_t first_refresh;
+       time_t next_refresh;
        char time_s[100];
        char *dash_filename;
        int dash_fd;
@@ -429,6 +433,32 @@ void rtm_timestamp2str(double t, char **str) {
 }
 
 
+int rtm_str2time(const char *s) {
+       time_t t;
+
+       if (s && memcmp(s, "-", 2) != 0) {
+               t = glite_lbu_StrToTime(s);
+               if (t == (time_t)-1) return 0;
+       } else
+               t = 0;
+
+       return t;
+}
+
+
+int rtm_str2timestamp(const char *s) {
+       double t;
+
+       if (s && memcmp(s, "-", 2) != 0) {
+               t = glite_lbu_StrToTimestamp(s);
+               if (t <= 0.5) return 0.0;
+       } else
+               t = 0.0;
+
+       return t;
+}
+
+
 int rtm_str2notiftype(const char *str) {
        if (strcasecmp(str, "STATUS") == 0) return RTM_NOTIF_TYPE_STATUS;
        if (strcasecmp(str, "DONE") == 0) return RTM_NOTIF_TYPE_DONE;
@@ -515,7 +545,7 @@ int wlcg_send_message(thread_t *t) {
        char *command;
 
        // WLCG message
-       if (t->dash_fd) {
+       if (t->dash_fd) { // send only if anything to send
                close(t->dash_fd);
                asprintf(&command, "'%s' -c '%s' '%s' '%s'", config.wlcg_binary, config.wlcg_config, config.wlcg_topic, t->dash_filename);
                lprintf(t, DBG, "calling %s", command);
@@ -634,7 +664,7 @@ static void db_free(__attribute((unused))thread_t *t, glite_lbu_DBContext dbctx)
 #endif
 
 
-static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refresh, double last_update, char *server, int port, int active) {
+static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refresh, double last_update, char *server, int port, int active, int errors) {
        void *tmp;
        notif_t *notif;
 
@@ -653,6 +683,7 @@ static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refre
        notif->server = server;
        notif->port = port;
        notif->active = active;
+       notif->error = errors;
        db.n++;
 
        return notif;
@@ -665,7 +696,7 @@ static int db_save_notifs_file(thread_t *t) {
        int retval = 1;
        notif_t *notif;
        int i;
-       char *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL;
+       char *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL, *id_str = NULL;
 
        asprintf(&filename, "%s-new", config.notif_file);
        if ((f = fopen(filename, "wt")) == NULL) {
@@ -679,17 +710,22 @@ static int db_save_notifs_file(thread_t *t) {
                        lprintf(t, DBG, "not saving inactive notif %s (%s), server %s:%d", notif->id_str, rtm_notiftype2str(notif->type), notif->server, notif->port);
                        continue;
                }
-               if (notif->id_str) {
+
+               if (notif->id_str) id_str = strdup(notif->id_str);
+               else if (notif->error) asprintf(&id_str, "%s:%d", notif->server, notif->port);
+               if (id_str) {
                        rtm_time2str(notif->valid, &valid_str);
                        rtm_time2str(notif->refresh, &refresh_str);
                        rtm_timestamp2str(notif->last_update, &last_update_str);
 
-                       fprintf(f, "%s\t%s\t%s\t%s\t%s\n", notif->id_str, rtm_notiftype2str(notif->type), valid_str, refresh_str, last_update_str);
+                       fprintf(f, RTM_FILE_NOTIF_PRINTF, id_str, rtm_notiftype2str(notif->type), valid_str, refresh_str, last_update_str, notif->error);
 
                        free(valid_str); valid_str = NULL;
                        free(refresh_str); refresh_str = NULL;
                        free(last_update_str); last_update_str = NULL;
                }
+               free(id_str);
+               id_str = NULL;
        }
        fclose(f);
        if (rename(filename, config.notif_file) != 0) {
@@ -712,7 +748,7 @@ static int db_save_notifs_sql(thread_t *t) {
        notif_t *notif;
        int i;
        char *sql = NULL, *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL;
-       const char *type_str;
+       const char *type_str, *amp;
 
        for (i = 0; i < db.n; i++) {
                notif = db.notifs + i;
@@ -723,20 +759,21 @@ static int db_save_notifs_sql(thread_t *t) {
                }
 */
                type_str = rtm_notiftype2str(notif->type);
-               if (notif->id_str) {
+               if (notif->id_str || notif->error) {
                        if (notif->valid) glite_lbu_TimeToDB(db.dbctx, notif->valid, &valid_str);
                        else valid_str = strdup("NULL");
                        if (notif->refresh) glite_lbu_TimeToDB(db.dbctx, notif->refresh, &refresh_str);
                        else refresh_str = strdup("NULL");
                        if (notif->last_update) glite_lbu_TimestampToDB(db.dbctx, notif->last_update, &last_update_str);
                        else last_update_str = strdup("NULL");
-                       trio_asprintf(&sql, "UPDATE notifs SET notifid='%|Ss', valid=%s, refresh=%s, last_update=%s WHERE lb='%|Ss' AND port=%d AND notiftype='%|Ss'", notif->id_str, valid_str, refresh_str, last_update_str, notif->server, notif->port, type_str);
+                       amp = notif->id_str ? "'" : " ";
+                       trio_asprintf(&sql, "UPDATE notifs SET notifid=%s%|Ss%s, valid=%s, refresh=%s, last_update=%s, errors=%d WHERE lb='%|Ss' AND port=%d AND notiftype='%|Ss'", amp, notif->id_str ? : "NULL", amp, valid_str, refresh_str, last_update_str, notif->error, notif->server, notif->port, type_str);
                        switch (glite_lbu_ExecSQL(db.dbctx, sql, NULL)) {
                        case 0:
                                // not found - insert
                                // can be handy when using file as input of LBs
                                free(sql);
-                               trio_asprintf(&sql, "INSERT INTO notifs (lb, port, notifid, notiftype, valid, refresh, last_update) VALUES ('%|Ss', %d, '%|Ss', '%|Ss', %s, %s, %s)", notif->server, notif->port, notif->id_str, type_str, valid_str, refresh_str, last_update_str);
+                               trio_asprintf(&sql, "INSERT INTO notifs (lb, port, notifid, notiftype, valid, refresh, last_update, errors) VALUES ('%|Ss', %d, %s%|Ss%s, '%|Ss', %s, %s, %s, %d)", notif->server, notif->port, amp, notif->id_str ? : "NULL", amp, type_str, valid_str, refresh_str, last_update_str, notif->error);
                                switch (glite_lbu_ExecSQL(db.dbctx, sql, NULL)) {
                                case -1:
                                        lprintf_dbctx(t, ERR, "notif '%s' (%s) insert failed", notif->id_str, type_str);
@@ -1137,7 +1174,7 @@ static int rtm_update_error_state(thread_t *t, notif_t *notif, int index, int is
 
        old_error = notif->error;
        if (is_error) {
-               if (!notif->error++) notif->refresh = time(NULL);
+               if (!notif->error++ || !notif->refresh) notif->refresh = time(NULL);
                max_count = config.dive / RTM_ERROR_REPEAT_RATE / 2;
                if (max_count <= 0) max_count = 1;
                notif->refresh += (notif->error <= max_count ? notif->error : max_count) * RTM_ERROR_REPEAT_RATE;
@@ -1169,7 +1206,7 @@ int rtm_update_notif(thread_t *t, notif_t *new_notif, int store) {
 
        if (new_notif) {        
                if ((notif = db_search_notif_by_server(db.notifs, db.n, new_notif->server, new_notif->port, new_notif->type)) == NULL) {
-                       if (db_add_notif(strdup(new_notif->id_str), new_notif->type, new_notif->valid, new_notif->refresh, new_notif->last_update, strdup(new_notif->server), new_notif->port, 1) == NULL) {
+                       if (db_add_notif(strdup(new_notif->id_str), new_notif->type, new_notif->valid, new_notif->refresh, new_notif->last_update, strdup(new_notif->server), new_notif->port, 1, 0) == NULL) {
                                lprintf(t, ERR, "can't realloc");
                                goto quit;
                        }
@@ -1218,7 +1255,7 @@ int load_notifs_file() {
        time_t valid, refresh;
        double last_update;
        edg_wll_NotifId id;
-       int type;
+       int type, i, errcnt, port;
        int retval = 1;
 
        if ((f = fopen(config.notif_file, "rt")) == NULL) {
@@ -1226,45 +1263,47 @@ int load_notifs_file() {
                return 0;
        }
 
-       results[0] = malloc(5 * 512);
-       results[1] = results[0] + 512;
-       results[2] = results[0] + 1024;
-       results[3] = results[0] + 1536;
-       results[4] = results[0] + 2048;
-       while ((err = fscanf(f, "%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t\r\n]\n", results[0], results[1], results[2], results[3], results[4])) == 5) {
+       results[0] = malloc(RTM_FILE_NOTIF_NUM * 512);
+       for (i = 1; i < RTM_FILE_NOTIF_NUM; i++) {
+               results[i] = results[0] + i * 512;
+       }
+       while ((err = fscanf(f, RTM_FILE_NOTIF_SCANF, results[0], results[1], results[2], results[3], results[4], results[5])) == RTM_FILE_NOTIF_NUM) {
                notifidstr = results[0];
                if ((type = rtm_str2notiftype(results[1])) == -1) {
                        lprintf(NULL, ERR, "unknown notification type '%s' in '%s'", results[1], notifidstr);
                        continue;
                }
 
-               valid = 0;
-               if (results[2] && strcasecmp(results[2], "-") != 0) {
-                       valid = glite_lbu_StrToTime(results[2]);
-               }
-
-               refresh = 0;
-               if (results[3] && strcasecmp(results[3], "-") != 0) {
-                       refresh = glite_lbu_StrToTime(results[2]);
-               }
+               valid = rtm_str2time(results[2]);
+               refresh = rtm_str2time(results[3]);
+               last_update = rtm_str2timestamp(results[4]);
 
-               last_update = 0;
-               if (results[4] && strcasecmp(results[4], "-") != 0) {
-                       last_update = glite_lbu_StrToTimestamp(results[4]);
+               errcnt = 0;
+               if (results[5] && strcasecmp(results[5], "-") != 0) {
+                       errcnt = atoi(results[5]);
                }
 
-               if ((new_notif = db_add_notif(strdup(notifidstr), type, valid, refresh, last_update, NULL, 0, 0)) == NULL) {
-                       lprintf(NULL, ERR, "can't alloc");
-                       goto quit;
-               }
-               if (edg_wll_NotifIdParse(notifidstr, &id) != 0) {
-                       lprintf(NULL, WRN, "can't parse notification ID '%s'", notifidstr);
-                       notif_free(new_notif);
-                       db.n--;
-                       continue;
+               if (errcnt) {
+                       if (sscanf(notifidstr, "%511[^:]:%d", results[1], &port) != 2) {
+                               lprintf(NULL, WRN, "can't parse server specification '%s'", notifidstr);
+                               continue;
+                       }
+                       if ((new_notif = db_add_notif(NULL, type, valid, refresh, last_update, strdup(results[1]), port, 0, errcnt)) == NULL) {
+                               lprintf(NULL, ERR, "can't alloc");
+                               goto quit;
+                       }
+               } else {
+                       if (edg_wll_NotifIdParse(notifidstr, &id) != 0) {
+                               lprintf(NULL, WRN, "can't parse notification ID '%s'", notifidstr);
+                               continue;
+                       }
+                       if ((new_notif = db_add_notif(strdup(notifidstr), type, valid, refresh, last_update, NULL, 0, 0, errcnt)) == NULL) {
+                               lprintf(NULL, ERR, "can't alloc");
+                               goto quit;
+                       }
+                       edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port);
+                       edg_wll_NotifIdFree(id);
                }
-               edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port);
-               edg_wll_NotifIdFree(id);
        }
        if (err == EOF) retval = 0;
        else lprintf(NULL, ERR, "can't parse notification file '%s'", config.notif_file);
@@ -1283,24 +1322,26 @@ int load_notifs_sql() {
        time_t valid, refresh;
        double last_update;
        edg_wll_NotifId id;
-       int type;
+       int type, i, errcnt;
        int retval = 1;
        glite_lbu_Statement stmt = NULL;
-       char *results[5];
+       char *results[8];
 
-       if (glite_lbu_ExecSQL(db.dbctx, "SELECT notifid, notiftype, valid, refresh, last_update FROM notifs WHERE notifid IS NOT NULL", &stmt) == -1) {
+       if (glite_lbu_ExecSQL(db.dbctx, "SELECT notifid, notiftype, valid, refresh, last_update, errors, lb, port FROM notifs", &stmt) == -1) {
                lprintf_dbctx(NULL, ERR, "fetching notification failed");
                goto quit;
        }
-       while ((err = glite_lbu_FetchRow(stmt, 5, NULL, results)) > 0) {
-               notifidstr = results[0];
+       while ((err = glite_lbu_FetchRow(stmt, 8, NULL, results)) > 0) {
+               if (results[0] && results[0][0]) notifidstr = results[0];
+               else {
+                       notifidstr = NULL;
+                       free(results[0]);
+               }
                results[0] = NULL;
+
                if ((type = rtm_str2notiftype(results[1])) == -1) {
                        lprintf(NULL, ERR, "unknown notification type '%s' in '%s'", results[1], notifidstr);
-                       free(results[1]);
-                       free(results[2]);
-                       free(results[3]);
-                       free(results[4]);
+                       for (i = 0; i < 8; i++) free(results[i]);
                        continue;
                }
                free(results[1]);
@@ -1323,19 +1364,29 @@ int load_notifs_sql() {
                }
                free(results[4]);
 
-               if ((new_notif = db_add_notif(notifidstr, type, valid, refresh, last_update, NULL, 0, 0)) == NULL) {
+               errcnt = 0;
+               if (results[5] && results[5][0]) errcnt = atoi(results[5]);
+               free(results[5]);
+
+               if ((new_notif = db_add_notif(notifidstr, type, valid, refresh, last_update, results[6], atoi(results[7]), 0, errcnt)) == NULL) {
                        free(notifidstr);
+                       free(results[6]);
+                       free(results[7]);
                        lprintf(NULL, ERR, "can't alloc");
                        goto quit;
                }
-               if (edg_wll_NotifIdParse(notifidstr, &id) != 0) {
-                       lprintf(NULL, WRN, "can't parse notification IDs '%s'", notifidstr);
-                       notif_free(new_notif);
-                       db.n--;
-                       continue;
+               free(results[6]);
+               free(results[7]);
+               if (notifidstr) {
+                       if (edg_wll_NotifIdParse(notifidstr, &id) != 0) {
+                               lprintf(NULL, WRN, "can't parse notification IDs '%s'", notifidstr);
+                               notif_free(new_notif);
+                               db.n--;
+                               continue;
+                       }
+                       edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port);
+                       edg_wll_NotifIdFree(id);
                }
-               edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port);
-               edg_wll_NotifIdFree(id);
        }
        if (err == 0) retval = 0;
        else lprintf_dbctx(NULL, ERR, "fetching failed");
@@ -1347,7 +1398,7 @@ quit:
 
 
 int load_notifs() {
-       int ret;
+       int i, ret;
 
        pthread_mutex_lock(&db.lock);
 
@@ -1357,6 +1408,9 @@ int load_notifs() {
 #else
        ret = load_notifs_file();
 #endif
+       // try to reconnect on bad notifications immediately
+       for (i = 0; i < db.n; i++)
+               if (db.notifs[i].error) db.notifs[i].refresh = 0;
 
        pthread_mutex_unlock(&db.lock);
 
@@ -1383,7 +1437,7 @@ void *notify_thread(void *thread_data) {
        edg_wll_JobStat jobstat, *jobstates;
        notif_t *notif, *notif_jdl;
        edg_wll_QueryRec *conditions[3] = { NULL, NULL, NULL }, condition[2], condition2[2];
-       int sock = -1, updated = 0, received = 0;
+       int sock = -1, updated = 0, error = 0, received = 0;
        thread_t *t = (thread_t *)thread_data;
        edg_wll_Context ctx = NULL;
        int flags = 0;
@@ -1450,7 +1504,7 @@ void *notify_thread(void *thread_data) {
        //
        while (!quit) {
                now = time(NULL);
-               t->first_refresh = now + RTM_NOTIF_LOOP_MAX_TIME;
+               t->next_refresh = now + RTM_NOTIF_LOOP_MAX_TIME;
                for (i = 0; i < t->nservers; i++) {
                        notif = t->notifs + i;
                        if (!notif->active) {
@@ -1461,7 +1515,7 @@ void *notify_thread(void *thread_data) {
                        if (notif->error) {
                                if (notif->refresh > now) {
                                        lprintf(t, INS, "not planned to retry previously failed %d. notification '%s' (%s), plan %s", i, notif->id_str, rtm_notiftype2str(notif->type), time2str(t, notif->refresh));
-                                       if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
+                                       if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh;
                                        continue;
                                }
                                lprintf(t, DBG, "retry previously failed %d. notification '%s' (%s)", i, notif->id_str, rtm_notiftype2str(notif->type));
@@ -1483,7 +1537,7 @@ void *notify_thread(void *thread_data) {
                                                // next retry of STATUS stright before the JDL
                                                if (notif_jdl) {
                                                        notif->refresh = notif_jdl->refresh;
-                                                       if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
+                                                       if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh;
                                                }
                                                continue;
                                        }
@@ -1553,8 +1607,8 @@ void *notify_thread(void *thread_data) {
                                        notif->valid = 0;
                                        notif->id = NULL;
                                        rtm_update_error_state(t, notif, i, 1);
-                                       if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
-                                       continue;
+                                       error = 1;
+                                       goto cont;
                                } 
                                notif->id_str = edg_wll_NotifIdUnparse(notif->id);
                                lprintf(t, INF, "created %d. notification '%s' (%s), valid: %s", i, notif->id_str, rtm_notiftype2str(notif->type), time2str(t, notif->valid));
@@ -1608,8 +1662,9 @@ void *notify_thread(void *thread_data) {
                                        notif->id_str = NULL;
                                        notif->valid = 0;
                                        rtm_update_error_state(t, notif, i, 1);
-                                       if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
-                                       continue;
+                                       error = 1;
+                                       if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh;
+                                       goto cont;
                                } else {
                                        for (j = 0; jobstates[j].state != EDG_WLL_JOB_UNDEF; j++) {
                                                notif->last_update = jobstates[j].lastUpdateTime.tv_sec + jobstates[j].lastUpdateTime.tv_usec / 1000000.0;
@@ -1667,11 +1722,17 @@ void *notify_thread(void *thread_data) {
                                lprintf(t, INS, "no change in %d. notification '%s' (%s)", i, notif->id_str, rtm_notiftype2str(notif->type));
                        }
 
-                       if (updated) {
-                               assert(notif->valid);
+cont:
+                       if (updated || error) {
+                               if (!error) {
+                                       assert(notif->valid);
+                                       notif->refresh = notif->valid ? (now + ((notif->valid - now) >> 1)) : 0;
+                               
+                               }
                                // create or refresh OK, bootstrap if needed OK, store the new notification
                                updated = 0;
-                               notif->refresh = notif->valid ? (now + ((notif->valid - now) >> 1)) : 0;
+                               error = 0;
+
                                // quicker refresh (or recreate) if needed
                                now = time(NULL);
                                if (notif->valid && now >= notif->refresh) {
@@ -1683,7 +1744,7 @@ void *notify_thread(void *thread_data) {
                        }
 
                        // compute time of the next event from the new refresh on notification
-                       if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh;
+                       if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh;
                }
 
                // receive
@@ -1691,9 +1752,9 @@ void *notify_thread(void *thread_data) {
                // cycle here locally around NotifReceive, we know about next
                // refresh time
                //
-               lprintf(t, DBG, "waiting for the notifications up to %s...", t->first_refresh ? time2str(t, t->first_refresh) : "0 (no wait)");
-               while (t->first_refresh > now && !quit) {
-                       to.tv_sec = t->first_refresh - now;
+               lprintf(t, DBG, "waiting for the notifications up to %s...", t->next_refresh ? time2str(t, t->next_refresh) : "0 (no wait)");
+               while (t->next_refresh > now && !quit) {
+                       to.tv_sec = t->next_refresh - now;
                        if (to.tv_sec > RTM_NOTIF_READ_TIMEOUT) to.tv_sec = RTM_NOTIF_READ_TIMEOUT;
                        to.tv_usec = 0;
                        memset(&jobstat, 0, sizeof(jobstat));
@@ -1779,31 +1840,68 @@ exit:
 }
 
 
-int reconcile_config_db() {
-       int i, j, n, type, typestart, typeend;
+int reconcile_threads() {
+       int iserver, ithread, inotif, gran, mod, nnotifs;
+       int j, oldn, type, typestart, typeend;
        notif_t *a, *b;
        edg_wll_Context ctx = NULL;
        edg_wll_NotifId notifid;
+       thread_t *t;
 
        if (!config.cleanup) {
                if (config.silly) {
                        typestart = RTM_NOTIF_TYPE_OLD;
                        typeend = RTM_NOTIF_TYPE_OLD;
+                       nnotifs = 1;
                } else {
                        typestart = RTM_NOTIF_TYPE_STATUS;
                        typeend = RTM_NOTIF_TYPE_JDL;
+                       nnotifs = 2;
                }
-               n = db.n;
-               for (i = 0; i < config.nservers; i++) {
-                       a = config.notifs + i;
-                       for (type = typestart; type <= typeend; type++)
-                       {
-                               b = db_search_notif_by_server(db.notifs, n, a->server, a->port, type);
-                               if (!b) b = db_add_notif(NULL, type, 0, 0, 0, strdup(a->server), a->port, 1);
-                               else lprintf(NULL, INF, "found previous notification '%s' (%s)", b->id_str, rtm_notiftype2str(b->type));
-                               b->active = 1;
+
+               oldn = db.n;
+
+               // distribute LB servers between threads
+               // (always use existing loaded notification when found)
+               threads = (thread_t *)calloc(config.nthreads, sizeof(thread_t));
+               gran = config.nservers / config.nthreads, mod = config.nservers % config.nthreads;
+               t = NULL;
+               ithread = 0;
+               inotif = 0;
+               for (iserver = 0; iserver < config.nservers; iserver++) {
+                       // new thread
+                       if (!t || inotif + nnotifs > t->nservers) {
+                               assert(ithread < config.nthreads);   // proper number of threads
+                               assert(!t || inotif == t->nservers); // start or exactly distributed
+                               t = threads + ithread;
+                               t->nservers = nnotifs * ((ithread < mod) ? gran + 1 : gran);
+                               t->notifs = (notif_t *)calloc(t->nservers, sizeof(notif_t));
+                               lprintf(NULL, DBG, "%d. thread: %d notifications", ithread, t->nservers);
+                               ithread++;
+                               inotif = 0;
+                       }
+
+                       // next configured server
+                       a = config.notifs + iserver;
+                       for (type = typestart; type <= typeend; type++) {
+                               // find or create all notification types
+                               b = db_search_notif_by_server(db.notifs, oldn, a->server, a->port, type);
+                               if (!b) b = db_add_notif(NULL, type, 0, 0, 0, strdup(a->server), a->port, 1, 0);
+                               else {
+                                       if (b->id_str) {
+                                               lprintf(NULL, INF, "found previous notification '%s' (%s)", b->id_str, rtm_notiftype2str(b->type));
+                                       } else {
+                                               lprintf(NULL, INF, "found previous server %s:%d (%s), %d errors", b->server, b->port, rtm_notiftype2str(b->type), b->error);
+                                       }
+                                       b->active = 1;
+                               }
+                               // and add each to the thread
+                               notif_copy(t->notifs + inotif, b);
+                               lprintf(NULL, INS, "thread[%d][%d] <- %s:%d (%s), id %s", ithread-1, inotif, b->server, b->port, rtm_notiftype2str(b->type), b->id_str);
+                               inotif++;
                        }
                }
+               assert(db.n % nnotifs == 0); // each server all notifs
        }
 
        if (edg_wll_InitContext(&ctx) != 0) {
@@ -2066,14 +2164,13 @@ int config_load() {
 
                config.notifs = calloc(n, sizeof(notif_t));
                config.nservers = 0;
-               if ((err = glite_lbu_ExecSQL(db.dbctx, "SELECT DISTINCT lb, port FROM " RTM_DB_TABLE_LBS, &stmt)) < 0) {
+               if ((err = glite_lbu_ExecSQL(db.dbctx, "SELECT DISTINCT ip FROM " RTM_DB_TABLE_LBS, &stmt)) < 0) {
                        goto err;
                }
                while (config.nservers < n && (err = glite_lbu_FetchRow(stmt, 2, NULL, results)) > 0) {
                        config.notifs[config.nservers].server = strdup(results[0]);
-                       config.notifs[config.nservers++].port = atoi(results[1]);
+                       config.notifs[config.nservers++].port = GLITE_JOBID_DEFAULT_PORT;
                        free(results[0]);
-                       free(results[1]);
                }
                if (err < 0) goto err;
                glite_lbu_FreeStmt(&stmt);
@@ -2130,7 +2227,7 @@ void handle_signal(int num) {
 int main(int argn, char *argv[]) {
        struct sigaction sa;
        sigset_t sset;
-       int i, j, k, gran, mod, nnotifs;
+       int i, j;
        double t1, t2, last_summary = 0, start_time;
        thread_t *t;
        struct stat pstat;
@@ -2240,7 +2337,12 @@ int main(int argn, char *argv[]) {
                                lprintf(NULL, ERR, "caught signal %d from process %d, resurrecting...", WTERMSIG(status), watched);
                                // slow down the core generator ;-)
                                // disabled signals and ended child in pidfile, live with it
+                               pthread_sigmask(SIG_UNBLOCK, &sset, NULL);
+                               if (config.pidfile) {
+                                       if (remove(config.pidfile) == -1) lprintf(NULL, WRN, "can't remove pidfile '%s': %s", config.pidfile, strerror(errno));
+                               }
                                sleep(2);
+                               pthread_sigmask(SIG_BLOCK, &sset, NULL);
                                break;
                        default:
                                lprintf(NULL, WRN, "ended with signal %d", WTERMSIG(status));
@@ -2265,6 +2367,8 @@ int main(int argn, char *argv[]) {
                }
        }
 
+       // child continues...
+
        // threads && Globus
        if (edg_wll_gss_initialize()) {
                lprintf(NULL, ERR, "can't initialize GSS");
@@ -2300,7 +2404,7 @@ int main(int argn, char *argv[]) {
        if (load_notifs()) goto quit;
        // compare lb servers from configuration and notifications,
        // or clean up and exit if specified
-       if (reconcile_config_db()) goto quit;
+       if (reconcile_threads()) goto quit;
        if (config.cleanup) {
                retval = RTM_EXIT_OK;
                goto quit;
@@ -2319,33 +2423,12 @@ int main(int argn, char *argv[]) {
        // enable signals in main
        pthread_sigmask(SIG_UNBLOCK, &sset, NULL);
 
-       // distribution LB servers between threads
-       nnotifs = config.silly ? 1 : 2;
-       threads = (thread_t *)calloc(config.nthreads, sizeof(thread_t));
-       assert(db.n % nnotifs == 0); // each server RTM_NOTIF_TYPE_LAST notification types
-       gran = (db.n / nnotifs) / config.nthreads, mod = (db.n / nnotifs) % config.nthreads;
-       i = 0;
-       j = 0;
-       do {
-               assert(j < config.nthreads);
-               t = threads + j;
-               t->nservers = nnotifs * ((j < mod) ? gran + 1 : gran);
-               lprintf(NULL, DBG, "%d thread: %d notifications", j, t->nservers);
-               if (t->nservers) {
-                       t->notifs = (notif_t *)calloc(t->nservers, sizeof(notif_t));
-                       for (k = 0; k < t->nservers; k++) {
-                               notif_copy(t->notifs + k, db.notifs + i);
-                               i++;
-                       }
-               }
-               j++;
-       } while (i < db.n);
        // launch the threads
-       for (j = 0; j < config.nthreads; j++) {
-               t = threads + j;
-               t->id = j;
-               if (pthread_create(&threads[j].thread, NULL, notify_thread, t) != 0) {
-                       lprintf(NULL, ERR, "[main] can't create %d. thread: %s\n", j, strerror(errno));
+       for (i = 0; i < config.nthreads; i++) {
+               t = threads + i;
+               t->id = i;
+               if (pthread_create(&threads[i].thread, NULL, notify_thread, t) != 0) {
+                       lprintf(NULL, ERR, "[main] can't create %d. thread: %s\n", i, strerror(errno));
                        goto quit;
                }
        }