From: František Dvořák Date: Tue, 1 Dec 2009 12:06:11 +0000 (+0000) Subject: Slight refactoring of notifications loading: X-Git-Tag: glite-lb-harvester_R_1_0_1_1~32 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=0c701344eab48d0c759fd32e9374bb5749704fc7;p=jra1mw.git Slight refactoring of notifications loading: - preserve error counter (save not registered notifs too) - more robust reconciliation - distribution to threads in reconciliation Different config DB schema for RTM. --- diff --git a/org.glite.lb.harvester/examples/test.sql b/org.glite.lb.harvester/examples/test.sql index 7bf81b5..15736f6 100644 --- a/org.glite.lb.harvester/examples/test.sql +++ b/org.glite.lb.harvester/examples/test.sql @@ -1,3 +1,19 @@ +-- +-- Inicialization (replace pgsql by actual postgres superuser): +-- +-- 1) grant privileges, someting like this in $data/pg_hba.conf: +-- local all all trust +-- +-- 2) create user: +-- createuser -U pgsql rtm +-- +-- 3) crate database: +-- createuser -U pgsql rtm +-- +-- 4) create tables: +-- psql -U rtm rtm < test.sql +-- + CREATE TABLE "jobs" ( jobid VARCHAR PRIMARY KEY, lb VARCHAR, @@ -15,10 +31,14 @@ CREATE TABLE "jobs" ( ); CREATE TABLE "lb20" ( - lb VARCHAR, - port INTEGER, + ip TEXT NOT NULL, + branch TEXT NOT NULL, + serv_version TEXT NOT NULL, + monitored BOOLEAN DEFAULT FALSE, + last_seen DATE, + first_seen DATE, - PRIMARY KEY(lb, port) + PRIMARY KEY(ip) ); CREATE TABLE "notifs" ( diff --git a/org.glite.lb.harvester/src/harvester.c b/org.glite.lb.harvester/src/harvester.c index 7af1b5e..9fe52a6 100644 --- a/org.glite.lb.harvester/src/harvester.c +++ b/org.glite.lb.harvester/src/harvester.c @@ -102,6 +102,10 @@ #endif #define RTM_FILE_NOTIFS "/var/tmp/notifs.txt" +#define RTM_FILE_NOTIF_PRINTF "%s\t%s\t%s\t%s\t%s\t%d\n" +#define RTM_FILE_NOTIF_SCANF "%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t\r\n]\n" +#define RTM_FILE_NOTIF_NUM 6 + #define WLCG_FILENAME_TEMPLATE "/tmp/wlcg_%02d_XXXXXX" #define WLCG_COMMAND_MESSAGE "/opt/lcg/bin/msg-publish -c /opt/lcg/etc/msg-publish.conf org.wlcg.usage.jobStatus %s" #define WLCG_BINARY "/opt/lcg/bin/msg-publish" @@ -120,11 +124,11 @@ #define glite_jobid_getServer edg_wlc_JobIdGetServer #define glite_jobid_getServerParts edg_wlc_JobIdGetServerParts #define glite_jobid_getUnique edg_wlc_JobIdGetUnique -#endif -#ifndef GLITE_JOBID_DEFAULT_PORT -#define GLITE_JOBID_DEFAULT_PORT GLITE_WMSC_JOBID_DEFAULT_PORT #define edg_wll_NotifNew(CTX, CONDS, FLAGS, SOCK, LADDR, ID, VALID) edg_wll_NotifNew((CTX), (CONDS), (SOCK), (LADDR), (ID), (VALID)) #define edg_wll_JDLField(STAT, NAME) NULL +#ifndef GLITE_JOBID_DEFAULT_PORT +#define GLITE_JOBID_DEFAULT_PORT GLITE_WMSC_JOBID_DEFAULT_PORT +#endif #endif // TODO: ipv6? :-) @@ -149,7 +153,7 @@ typedef struct { pthread_t thread; notif_t *notifs; int nservers; - time_t first_refresh; + time_t next_refresh; char time_s[100]; char *dash_filename; int dash_fd; @@ -429,6 +433,32 @@ void rtm_timestamp2str(double t, char **str) { } +int rtm_str2time(const char *s) { + time_t t; + + if (s && memcmp(s, "-", 2) != 0) { + t = glite_lbu_StrToTime(s); + if (t == (time_t)-1) return 0; + } else + t = 0; + + return t; +} + + +int rtm_str2timestamp(const char *s) { + double t; + + if (s && memcmp(s, "-", 2) != 0) { + t = glite_lbu_StrToTimestamp(s); + if (t <= 0.5) return 0.0; + } else + t = 0.0; + + return t; +} + + int rtm_str2notiftype(const char *str) { if (strcasecmp(str, "STATUS") == 0) return RTM_NOTIF_TYPE_STATUS; if (strcasecmp(str, "DONE") == 0) return RTM_NOTIF_TYPE_DONE; @@ -515,7 +545,7 @@ int wlcg_send_message(thread_t *t) { char *command; // WLCG message - if (t->dash_fd) { + if (t->dash_fd) { // send only if anything to send close(t->dash_fd); asprintf(&command, "'%s' -c '%s' '%s' '%s'", config.wlcg_binary, config.wlcg_config, config.wlcg_topic, t->dash_filename); lprintf(t, DBG, "calling %s", command); @@ -634,7 +664,7 @@ static void db_free(__attribute((unused))thread_t *t, glite_lbu_DBContext dbctx) #endif -static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refresh, double last_update, char *server, int port, int active) { +static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refresh, double last_update, char *server, int port, int active, int errors) { void *tmp; notif_t *notif; @@ -653,6 +683,7 @@ static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refre notif->server = server; notif->port = port; notif->active = active; + notif->error = errors; db.n++; return notif; @@ -665,7 +696,7 @@ static int db_save_notifs_file(thread_t *t) { int retval = 1; notif_t *notif; int i; - char *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL; + char *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL, *id_str = NULL; asprintf(&filename, "%s-new", config.notif_file); if ((f = fopen(filename, "wt")) == NULL) { @@ -679,17 +710,22 @@ static int db_save_notifs_file(thread_t *t) { lprintf(t, DBG, "not saving inactive notif %s (%s), server %s:%d", notif->id_str, rtm_notiftype2str(notif->type), notif->server, notif->port); continue; } - if (notif->id_str) { + + if (notif->id_str) id_str = strdup(notif->id_str); + else if (notif->error) asprintf(&id_str, "%s:%d", notif->server, notif->port); + if (id_str) { rtm_time2str(notif->valid, &valid_str); rtm_time2str(notif->refresh, &refresh_str); rtm_timestamp2str(notif->last_update, &last_update_str); - fprintf(f, "%s\t%s\t%s\t%s\t%s\n", notif->id_str, rtm_notiftype2str(notif->type), valid_str, refresh_str, last_update_str); + fprintf(f, RTM_FILE_NOTIF_PRINTF, id_str, rtm_notiftype2str(notif->type), valid_str, refresh_str, last_update_str, notif->error); free(valid_str); valid_str = NULL; free(refresh_str); refresh_str = NULL; free(last_update_str); last_update_str = NULL; } + free(id_str); + id_str = NULL; } fclose(f); if (rename(filename, config.notif_file) != 0) { @@ -712,7 +748,7 @@ static int db_save_notifs_sql(thread_t *t) { notif_t *notif; int i; char *sql = NULL, *valid_str = NULL, *refresh_str = NULL, *last_update_str = NULL; - const char *type_str; + const char *type_str, *amp; for (i = 0; i < db.n; i++) { notif = db.notifs + i; @@ -723,20 +759,21 @@ static int db_save_notifs_sql(thread_t *t) { } */ type_str = rtm_notiftype2str(notif->type); - if (notif->id_str) { + if (notif->id_str || notif->error) { if (notif->valid) glite_lbu_TimeToDB(db.dbctx, notif->valid, &valid_str); else valid_str = strdup("NULL"); if (notif->refresh) glite_lbu_TimeToDB(db.dbctx, notif->refresh, &refresh_str); else refresh_str = strdup("NULL"); if (notif->last_update) glite_lbu_TimestampToDB(db.dbctx, notif->last_update, &last_update_str); else last_update_str = strdup("NULL"); - trio_asprintf(&sql, "UPDATE notifs SET notifid='%|Ss', valid=%s, refresh=%s, last_update=%s WHERE lb='%|Ss' AND port=%d AND notiftype='%|Ss'", notif->id_str, valid_str, refresh_str, last_update_str, notif->server, notif->port, type_str); + amp = notif->id_str ? "'" : " "; + trio_asprintf(&sql, "UPDATE notifs SET notifid=%s%|Ss%s, valid=%s, refresh=%s, last_update=%s, errors=%d WHERE lb='%|Ss' AND port=%d AND notiftype='%|Ss'", amp, notif->id_str ? : "NULL", amp, valid_str, refresh_str, last_update_str, notif->error, notif->server, notif->port, type_str); switch (glite_lbu_ExecSQL(db.dbctx, sql, NULL)) { case 0: // not found - insert // can be handy when using file as input of LBs free(sql); - trio_asprintf(&sql, "INSERT INTO notifs (lb, port, notifid, notiftype, valid, refresh, last_update) VALUES ('%|Ss', %d, '%|Ss', '%|Ss', %s, %s, %s)", notif->server, notif->port, notif->id_str, type_str, valid_str, refresh_str, last_update_str); + trio_asprintf(&sql, "INSERT INTO notifs (lb, port, notifid, notiftype, valid, refresh, last_update, errors) VALUES ('%|Ss', %d, %s%|Ss%s, '%|Ss', %s, %s, %s, %d)", notif->server, notif->port, amp, notif->id_str ? : "NULL", amp, type_str, valid_str, refresh_str, last_update_str, notif->error); switch (glite_lbu_ExecSQL(db.dbctx, sql, NULL)) { case -1: lprintf_dbctx(t, ERR, "notif '%s' (%s) insert failed", notif->id_str, type_str); @@ -1137,7 +1174,7 @@ static int rtm_update_error_state(thread_t *t, notif_t *notif, int index, int is old_error = notif->error; if (is_error) { - if (!notif->error++) notif->refresh = time(NULL); + if (!notif->error++ || !notif->refresh) notif->refresh = time(NULL); max_count = config.dive / RTM_ERROR_REPEAT_RATE / 2; if (max_count <= 0) max_count = 1; notif->refresh += (notif->error <= max_count ? notif->error : max_count) * RTM_ERROR_REPEAT_RATE; @@ -1169,7 +1206,7 @@ int rtm_update_notif(thread_t *t, notif_t *new_notif, int store) { if (new_notif) { if ((notif = db_search_notif_by_server(db.notifs, db.n, new_notif->server, new_notif->port, new_notif->type)) == NULL) { - if (db_add_notif(strdup(new_notif->id_str), new_notif->type, new_notif->valid, new_notif->refresh, new_notif->last_update, strdup(new_notif->server), new_notif->port, 1) == NULL) { + if (db_add_notif(strdup(new_notif->id_str), new_notif->type, new_notif->valid, new_notif->refresh, new_notif->last_update, strdup(new_notif->server), new_notif->port, 1, 0) == NULL) { lprintf(t, ERR, "can't realloc"); goto quit; } @@ -1218,7 +1255,7 @@ int load_notifs_file() { time_t valid, refresh; double last_update; edg_wll_NotifId id; - int type; + int type, i, errcnt, port; int retval = 1; if ((f = fopen(config.notif_file, "rt")) == NULL) { @@ -1226,45 +1263,47 @@ int load_notifs_file() { return 0; } - results[0] = malloc(5 * 512); - results[1] = results[0] + 512; - results[2] = results[0] + 1024; - results[3] = results[0] + 1536; - results[4] = results[0] + 2048; - while ((err = fscanf(f, "%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t]\t%511[^\t\r\n]\n", results[0], results[1], results[2], results[3], results[4])) == 5) { + results[0] = malloc(RTM_FILE_NOTIF_NUM * 512); + for (i = 1; i < RTM_FILE_NOTIF_NUM; i++) { + results[i] = results[0] + i * 512; + } + while ((err = fscanf(f, RTM_FILE_NOTIF_SCANF, results[0], results[1], results[2], results[3], results[4], results[5])) == RTM_FILE_NOTIF_NUM) { notifidstr = results[0]; if ((type = rtm_str2notiftype(results[1])) == -1) { lprintf(NULL, ERR, "unknown notification type '%s' in '%s'", results[1], notifidstr); continue; } - valid = 0; - if (results[2] && strcasecmp(results[2], "-") != 0) { - valid = glite_lbu_StrToTime(results[2]); - } - - refresh = 0; - if (results[3] && strcasecmp(results[3], "-") != 0) { - refresh = glite_lbu_StrToTime(results[2]); - } + valid = rtm_str2time(results[2]); + refresh = rtm_str2time(results[3]); + last_update = rtm_str2timestamp(results[4]); - last_update = 0; - if (results[4] && strcasecmp(results[4], "-") != 0) { - last_update = glite_lbu_StrToTimestamp(results[4]); + errcnt = 0; + if (results[5] && strcasecmp(results[5], "-") != 0) { + errcnt = atoi(results[5]); } - if ((new_notif = db_add_notif(strdup(notifidstr), type, valid, refresh, last_update, NULL, 0, 0)) == NULL) { - lprintf(NULL, ERR, "can't alloc"); - goto quit; - } - if (edg_wll_NotifIdParse(notifidstr, &id) != 0) { - lprintf(NULL, WRN, "can't parse notification ID '%s'", notifidstr); - notif_free(new_notif); - db.n--; - continue; + if (errcnt) { + if (sscanf(notifidstr, "%511[^:]:%d", results[1], &port) != 2) { + lprintf(NULL, WRN, "can't parse server specification '%s'", notifidstr); + continue; + } + if ((new_notif = db_add_notif(NULL, type, valid, refresh, last_update, strdup(results[1]), port, 0, errcnt)) == NULL) { + lprintf(NULL, ERR, "can't alloc"); + goto quit; + } + } else { + if (edg_wll_NotifIdParse(notifidstr, &id) != 0) { + lprintf(NULL, WRN, "can't parse notification ID '%s'", notifidstr); + continue; + } + if ((new_notif = db_add_notif(strdup(notifidstr), type, valid, refresh, last_update, NULL, 0, 0, errcnt)) == NULL) { + lprintf(NULL, ERR, "can't alloc"); + goto quit; + } + edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port); + edg_wll_NotifIdFree(id); } - edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port); - edg_wll_NotifIdFree(id); } if (err == EOF) retval = 0; else lprintf(NULL, ERR, "can't parse notification file '%s'", config.notif_file); @@ -1283,24 +1322,26 @@ int load_notifs_sql() { time_t valid, refresh; double last_update; edg_wll_NotifId id; - int type; + int type, i, errcnt; int retval = 1; glite_lbu_Statement stmt = NULL; - char *results[5]; + char *results[8]; - if (glite_lbu_ExecSQL(db.dbctx, "SELECT notifid, notiftype, valid, refresh, last_update FROM notifs WHERE notifid IS NOT NULL", &stmt) == -1) { + if (glite_lbu_ExecSQL(db.dbctx, "SELECT notifid, notiftype, valid, refresh, last_update, errors, lb, port FROM notifs", &stmt) == -1) { lprintf_dbctx(NULL, ERR, "fetching notification failed"); goto quit; } - while ((err = glite_lbu_FetchRow(stmt, 5, NULL, results)) > 0) { - notifidstr = results[0]; + while ((err = glite_lbu_FetchRow(stmt, 8, NULL, results)) > 0) { + if (results[0] && results[0][0]) notifidstr = results[0]; + else { + notifidstr = NULL; + free(results[0]); + } results[0] = NULL; + if ((type = rtm_str2notiftype(results[1])) == -1) { lprintf(NULL, ERR, "unknown notification type '%s' in '%s'", results[1], notifidstr); - free(results[1]); - free(results[2]); - free(results[3]); - free(results[4]); + for (i = 0; i < 8; i++) free(results[i]); continue; } free(results[1]); @@ -1323,19 +1364,29 @@ int load_notifs_sql() { } free(results[4]); - if ((new_notif = db_add_notif(notifidstr, type, valid, refresh, last_update, NULL, 0, 0)) == NULL) { + errcnt = 0; + if (results[5] && results[5][0]) errcnt = atoi(results[5]); + free(results[5]); + + if ((new_notif = db_add_notif(notifidstr, type, valid, refresh, last_update, results[6], atoi(results[7]), 0, errcnt)) == NULL) { free(notifidstr); + free(results[6]); + free(results[7]); lprintf(NULL, ERR, "can't alloc"); goto quit; } - if (edg_wll_NotifIdParse(notifidstr, &id) != 0) { - lprintf(NULL, WRN, "can't parse notification IDs '%s'", notifidstr); - notif_free(new_notif); - db.n--; - continue; + free(results[6]); + free(results[7]); + if (notifidstr) { + if (edg_wll_NotifIdParse(notifidstr, &id) != 0) { + lprintf(NULL, WRN, "can't parse notification IDs '%s'", notifidstr); + notif_free(new_notif); + db.n--; + continue; + } + edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port); + edg_wll_NotifIdFree(id); } - edg_wll_NotifIdGetServerParts(id, &new_notif->server, &new_notif->port); - edg_wll_NotifIdFree(id); } if (err == 0) retval = 0; else lprintf_dbctx(NULL, ERR, "fetching failed"); @@ -1347,7 +1398,7 @@ quit: int load_notifs() { - int ret; + int i, ret; pthread_mutex_lock(&db.lock); @@ -1357,6 +1408,9 @@ int load_notifs() { #else ret = load_notifs_file(); #endif + // try to reconnect on bad notifications immediately + for (i = 0; i < db.n; i++) + if (db.notifs[i].error) db.notifs[i].refresh = 0; pthread_mutex_unlock(&db.lock); @@ -1383,7 +1437,7 @@ void *notify_thread(void *thread_data) { edg_wll_JobStat jobstat, *jobstates; notif_t *notif, *notif_jdl; edg_wll_QueryRec *conditions[3] = { NULL, NULL, NULL }, condition[2], condition2[2]; - int sock = -1, updated = 0, received = 0; + int sock = -1, updated = 0, error = 0, received = 0; thread_t *t = (thread_t *)thread_data; edg_wll_Context ctx = NULL; int flags = 0; @@ -1450,7 +1504,7 @@ void *notify_thread(void *thread_data) { // while (!quit) { now = time(NULL); - t->first_refresh = now + RTM_NOTIF_LOOP_MAX_TIME; + t->next_refresh = now + RTM_NOTIF_LOOP_MAX_TIME; for (i = 0; i < t->nservers; i++) { notif = t->notifs + i; if (!notif->active) { @@ -1461,7 +1515,7 @@ void *notify_thread(void *thread_data) { if (notif->error) { if (notif->refresh > now) { lprintf(t, INS, "not planned to retry previously failed %d. notification '%s' (%s), plan %s", i, notif->id_str, rtm_notiftype2str(notif->type), time2str(t, notif->refresh)); - if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh; + if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh; continue; } lprintf(t, DBG, "retry previously failed %d. notification '%s' (%s)", i, notif->id_str, rtm_notiftype2str(notif->type)); @@ -1483,7 +1537,7 @@ void *notify_thread(void *thread_data) { // next retry of STATUS stright before the JDL if (notif_jdl) { notif->refresh = notif_jdl->refresh; - if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh; + if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh; } continue; } @@ -1553,8 +1607,8 @@ void *notify_thread(void *thread_data) { notif->valid = 0; notif->id = NULL; rtm_update_error_state(t, notif, i, 1); - if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh; - continue; + error = 1; + goto cont; } notif->id_str = edg_wll_NotifIdUnparse(notif->id); lprintf(t, INF, "created %d. notification '%s' (%s), valid: %s", i, notif->id_str, rtm_notiftype2str(notif->type), time2str(t, notif->valid)); @@ -1608,8 +1662,9 @@ void *notify_thread(void *thread_data) { notif->id_str = NULL; notif->valid = 0; rtm_update_error_state(t, notif, i, 1); - if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh; - continue; + error = 1; + if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh; + goto cont; } else { for (j = 0; jobstates[j].state != EDG_WLL_JOB_UNDEF; j++) { notif->last_update = jobstates[j].lastUpdateTime.tv_sec + jobstates[j].lastUpdateTime.tv_usec / 1000000.0; @@ -1667,11 +1722,17 @@ void *notify_thread(void *thread_data) { lprintf(t, INS, "no change in %d. notification '%s' (%s)", i, notif->id_str, rtm_notiftype2str(notif->type)); } - if (updated) { - assert(notif->valid); +cont: + if (updated || error) { + if (!error) { + assert(notif->valid); + notif->refresh = notif->valid ? (now + ((notif->valid - now) >> 1)) : 0; + + } // create or refresh OK, bootstrap if needed OK, store the new notification updated = 0; - notif->refresh = notif->valid ? (now + ((notif->valid - now) >> 1)) : 0; + error = 0; + // quicker refresh (or recreate) if needed now = time(NULL); if (notif->valid && now >= notif->refresh) { @@ -1683,7 +1744,7 @@ void *notify_thread(void *thread_data) { } // compute time of the next event from the new refresh on notification - if (t->first_refresh > notif->refresh) t->first_refresh = notif->refresh; + if (t->next_refresh > notif->refresh) t->next_refresh = notif->refresh; } // receive @@ -1691,9 +1752,9 @@ void *notify_thread(void *thread_data) { // cycle here locally around NotifReceive, we know about next // refresh time // - lprintf(t, DBG, "waiting for the notifications up to %s...", t->first_refresh ? time2str(t, t->first_refresh) : "0 (no wait)"); - while (t->first_refresh > now && !quit) { - to.tv_sec = t->first_refresh - now; + lprintf(t, DBG, "waiting for the notifications up to %s...", t->next_refresh ? time2str(t, t->next_refresh) : "0 (no wait)"); + while (t->next_refresh > now && !quit) { + to.tv_sec = t->next_refresh - now; if (to.tv_sec > RTM_NOTIF_READ_TIMEOUT) to.tv_sec = RTM_NOTIF_READ_TIMEOUT; to.tv_usec = 0; memset(&jobstat, 0, sizeof(jobstat)); @@ -1779,31 +1840,68 @@ exit: } -int reconcile_config_db() { - int i, j, n, type, typestart, typeend; +int reconcile_threads() { + int iserver, ithread, inotif, gran, mod, nnotifs; + int j, oldn, type, typestart, typeend; notif_t *a, *b; edg_wll_Context ctx = NULL; edg_wll_NotifId notifid; + thread_t *t; if (!config.cleanup) { if (config.silly) { typestart = RTM_NOTIF_TYPE_OLD; typeend = RTM_NOTIF_TYPE_OLD; + nnotifs = 1; } else { typestart = RTM_NOTIF_TYPE_STATUS; typeend = RTM_NOTIF_TYPE_JDL; + nnotifs = 2; } - n = db.n; - for (i = 0; i < config.nservers; i++) { - a = config.notifs + i; - for (type = typestart; type <= typeend; type++) - { - b = db_search_notif_by_server(db.notifs, n, a->server, a->port, type); - if (!b) b = db_add_notif(NULL, type, 0, 0, 0, strdup(a->server), a->port, 1); - else lprintf(NULL, INF, "found previous notification '%s' (%s)", b->id_str, rtm_notiftype2str(b->type)); - b->active = 1; + + oldn = db.n; + + // distribute LB servers between threads + // (always use existing loaded notification when found) + threads = (thread_t *)calloc(config.nthreads, sizeof(thread_t)); + gran = config.nservers / config.nthreads, mod = config.nservers % config.nthreads; + t = NULL; + ithread = 0; + inotif = 0; + for (iserver = 0; iserver < config.nservers; iserver++) { + // new thread + if (!t || inotif + nnotifs > t->nservers) { + assert(ithread < config.nthreads); // proper number of threads + assert(!t || inotif == t->nservers); // start or exactly distributed + t = threads + ithread; + t->nservers = nnotifs * ((ithread < mod) ? gran + 1 : gran); + t->notifs = (notif_t *)calloc(t->nservers, sizeof(notif_t)); + lprintf(NULL, DBG, "%d. thread: %d notifications", ithread, t->nservers); + ithread++; + inotif = 0; + } + + // next configured server + a = config.notifs + iserver; + for (type = typestart; type <= typeend; type++) { + // find or create all notification types + b = db_search_notif_by_server(db.notifs, oldn, a->server, a->port, type); + if (!b) b = db_add_notif(NULL, type, 0, 0, 0, strdup(a->server), a->port, 1, 0); + else { + if (b->id_str) { + lprintf(NULL, INF, "found previous notification '%s' (%s)", b->id_str, rtm_notiftype2str(b->type)); + } else { + lprintf(NULL, INF, "found previous server %s:%d (%s), %d errors", b->server, b->port, rtm_notiftype2str(b->type), b->error); + } + b->active = 1; + } + // and add each to the thread + notif_copy(t->notifs + inotif, b); + lprintf(NULL, INS, "thread[%d][%d] <- %s:%d (%s), id %s", ithread-1, inotif, b->server, b->port, rtm_notiftype2str(b->type), b->id_str); + inotif++; } } + assert(db.n % nnotifs == 0); // each server all notifs } if (edg_wll_InitContext(&ctx) != 0) { @@ -2066,14 +2164,13 @@ int config_load() { config.notifs = calloc(n, sizeof(notif_t)); config.nservers = 0; - if ((err = glite_lbu_ExecSQL(db.dbctx, "SELECT DISTINCT lb, port FROM " RTM_DB_TABLE_LBS, &stmt)) < 0) { + if ((err = glite_lbu_ExecSQL(db.dbctx, "SELECT DISTINCT ip FROM " RTM_DB_TABLE_LBS, &stmt)) < 0) { goto err; } while (config.nservers < n && (err = glite_lbu_FetchRow(stmt, 2, NULL, results)) > 0) { config.notifs[config.nservers].server = strdup(results[0]); - config.notifs[config.nservers++].port = atoi(results[1]); + config.notifs[config.nservers++].port = GLITE_JOBID_DEFAULT_PORT; free(results[0]); - free(results[1]); } if (err < 0) goto err; glite_lbu_FreeStmt(&stmt); @@ -2130,7 +2227,7 @@ void handle_signal(int num) { int main(int argn, char *argv[]) { struct sigaction sa; sigset_t sset; - int i, j, k, gran, mod, nnotifs; + int i, j; double t1, t2, last_summary = 0, start_time; thread_t *t; struct stat pstat; @@ -2240,7 +2337,12 @@ int main(int argn, char *argv[]) { lprintf(NULL, ERR, "caught signal %d from process %d, resurrecting...", WTERMSIG(status), watched); // slow down the core generator ;-) // disabled signals and ended child in pidfile, live with it + pthread_sigmask(SIG_UNBLOCK, &sset, NULL); + if (config.pidfile) { + if (remove(config.pidfile) == -1) lprintf(NULL, WRN, "can't remove pidfile '%s': %s", config.pidfile, strerror(errno)); + } sleep(2); + pthread_sigmask(SIG_BLOCK, &sset, NULL); break; default: lprintf(NULL, WRN, "ended with signal %d", WTERMSIG(status)); @@ -2265,6 +2367,8 @@ int main(int argn, char *argv[]) { } } + // child continues... + // threads && Globus if (edg_wll_gss_initialize()) { lprintf(NULL, ERR, "can't initialize GSS"); @@ -2300,7 +2404,7 @@ int main(int argn, char *argv[]) { if (load_notifs()) goto quit; // compare lb servers from configuration and notifications, // or clean up and exit if specified - if (reconcile_config_db()) goto quit; + if (reconcile_threads()) goto quit; if (config.cleanup) { retval = RTM_EXIT_OK; goto quit; @@ -2319,33 +2423,12 @@ int main(int argn, char *argv[]) { // enable signals in main pthread_sigmask(SIG_UNBLOCK, &sset, NULL); - // distribution LB servers between threads - nnotifs = config.silly ? 1 : 2; - threads = (thread_t *)calloc(config.nthreads, sizeof(thread_t)); - assert(db.n % nnotifs == 0); // each server RTM_NOTIF_TYPE_LAST notification types - gran = (db.n / nnotifs) / config.nthreads, mod = (db.n / nnotifs) % config.nthreads; - i = 0; - j = 0; - do { - assert(j < config.nthreads); - t = threads + j; - t->nservers = nnotifs * ((j < mod) ? gran + 1 : gran); - lprintf(NULL, DBG, "%d thread: %d notifications", j, t->nservers); - if (t->nservers) { - t->notifs = (notif_t *)calloc(t->nservers, sizeof(notif_t)); - for (k = 0; k < t->nservers; k++) { - notif_copy(t->notifs + k, db.notifs + i); - i++; - } - } - j++; - } while (i < db.n); // launch the threads - for (j = 0; j < config.nthreads; j++) { - t = threads + j; - t->id = j; - if (pthread_create(&threads[j].thread, NULL, notify_thread, t) != 0) { - lprintf(NULL, ERR, "[main] can't create %d. thread: %s\n", j, strerror(errno)); + for (i = 0; i < config.nthreads; i++) { + t = threads + i; + t->id = i; + if (pthread_create(&threads[i].thread, NULL, notify_thread, t) != 0) { + lprintf(NULL, ERR, "[main] can't create %d. thread: %s\n", i, strerror(errno)); goto quit; } }