From 5016dbbe8ec7bc4f4a7107420f359074cdbe8f92 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Fri, 15 Jan 2010 20:25:23 +0000 Subject: [PATCH] Purge support in RTM database: - support in harvester, reshuffle the code - added option --no-purge - update docs - preliminary purge test Requires recent LB server from HEAD. --- org.glite.lb.harvester/Makefile | 8 +- org.glite.lb.harvester/doc/glite-lb-harvester.sgml | 14 + org.glite.lb.harvester/examples/test.sh | 44 +++- org.glite.lb.harvester/src/harvester.c | 282 +++++++++++++-------- 4 files changed, 240 insertions(+), 108 deletions(-) diff --git a/org.glite.lb.harvester/Makefile b/org.glite.lb.harvester/Makefile index 5fb9fa4..15f0b92 100644 --- a/org.glite.lb.harvester/Makefile +++ b/org.glite.lb.harvester/Makefile @@ -46,7 +46,7 @@ INSTALL:=libtool --mode=install install default: all -compile all: harvester doc +compile all: harvester doc debug check: @@ -58,8 +58,10 @@ stage: compile $(MAKE) install PREFIX=${stagedir} install: compile - -mkdir -p ${PREFIX}/bin ${PREFIX}/share/doc/${package}-${version} ${PREFIX}/share/man/man8 + -mkdir -p ${PREFIX}/bin ${PREFIX}/examples ${PREFIX}/share/doc/${package}-${version} ${PREFIX}/share/man/man8 ${INSTALL} -m 755 harvester ${PREFIX}/bin/glite-lb-harvester + ${INSTALL} -m 755 harvester-dbg ${PREFIX}/examples/glite-lb-harvester-dbg + ${INSTALL} -m 755 ../examples/test.sh ${PREFIX}/examples/glite-lb-harvester-test.sh ${INSTALL} -m 444 ../doc/README ${PREFIX}/share/doc/${package}-${version} ${INSTALL} -m 444 glite-lb-harvester.8 ${PREFIX}/share/man/man8 @@ -75,7 +77,7 @@ harvester-dbg: harvester-dbg.o ${LINK} -o $@ $+ ${LIBS} harvester-dbg.o: harvester.c - ${COMPILE} -Werror -DLOG=1 -DWITH_RTM_SQL_STORAGE=1 -c $< -o $@ + ${COMPILE} -DLOG=1 -DWITH_RTM_SQL_STORAGE=1 -c $< -o $@ %.o: %.c ${COMPILE} -c $< diff --git a/org.glite.lb.harvester/doc/glite-lb-harvester.sgml b/org.glite.lb.harvester/doc/glite-lb-harvester.sgml index 55f7330..44d758f 100644 --- a/org.glite.lb.harvester/doc/glite-lb-harvester.sgml +++ b/org.glite.lb.harvester/doc/glite-lb-harvester.sgml @@ -98,6 +98,11 @@ + -u + --no-purge + + + -w --wlcg @@ -295,6 +300,15 @@ Each notification automatically expires. But if you know, than notifications us + | + +By default jobs are purged from local database when purged on L&B server. This option forces keeping all jobs in database, only with changed state to 'Purged'. + +For using together with (). + + + + | Enables delivery to MSG publish. Messages are sent by executing a binary with proper parameters. diff --git a/org.glite.lb.harvester/examples/test.sh b/org.glite.lb.harvester/examples/test.sh index 7105ee1..4bfba32 100755 --- a/org.glite.lb.harvester/examples/test.sh +++ b/org.glite.lb.harvester/examples/test.sh @@ -100,7 +100,9 @@ init() { jobreg="$GLITE_LOCATION/examples/glite-lb-job_reg -m `hostname -f`:${GLITE_LB_TEST_SERVER_PORT} -s UserInterface" logev="$GLITE_LOCATION/bin/glite-lb-logevent -x -S `pwd`/LB/proxy.sockstore.sock -U localhost" - for dir in "$GLITE_LOCATION/bin" "`pwd`/../build" "`pwd`"; do + purge="$GLITE_LOCATION/bin/glite-lb-purge" + [ -x "$purge" ] || purge="$GLITE_LOCATION/sbin/glite-lb-purge" + for dir in "$GLITE_LOCATION/examlpes" "`pwd`/../build" "`pwd`"; do if [ -x "$dir/glite-lb-harvester-dbg" ]; then rtm="$dir/glite-lb-harvester-dbg" fi @@ -732,7 +734,7 @@ EOF echo "FAIL" return 0 fi - echo -n "OK" + echo -n "OK " echo -n "changed after waiting..." ev -s WorkloadManager -e EnQueued --queue "very long and chaotic queue" --destination LogMonitor --dest_host localhost --dest_instance pid$$ --job "(car 'testing=true)" --result=OK || return $? @@ -754,6 +756,40 @@ EOF } +test_purge() { + ok=0 + + echo -n "purge." + pg_get "SELECT jobid FROM jobs" || return $? + if [ -z "$lines" -o $lines -le 0 ]; then + echo "no jobs! FAIL" + return 0 + fi + echo -n "P" + jobunique=`echo "$result" | head -n 1 | tr -d '\n'` + jobid="https://`hostname -f`:${GLITE_LB_TEST_SERVER_PORT}/$jobunique" + echo $jobid > jobs + echo "${purge} -a1 -c1 -n1 -e1 -o1 -m "`hostname -f`:${GLITE_LB_TEST_SERVER_PORT}" -j jobs" >> log + echo " jobs = `cat jobs` | tr -d '\n'" >> log + X509_USER_KEY=${X509_USER_KEY} X509_USER_CERT=${X509_USER_CERT} ${purge} -l -a1 -c1 -n1 -e1 -o1 -m "`hostname -f`:${GLITE_LB_TEST_SERVER_PORT}" -j jobs 2> purge-err.tmp >purge.tmp + if [ $? -ne 0 ]; then + echo " FAIL!" + return 2; + fi + rm -f jobs + echo -n "R " + + pg_wait 10 "SELECT * FROM jobs WHERE jobid='$jobunique'" 0 || return $? + if [ x"$lines" != x"0" ]; then + echo "FAIL" + return 0 + fi + + ok=1 + echo "OK" +} + + quit() { if [ x"$started" = x"" ]; then kill_daemons @@ -804,6 +840,10 @@ test() { echo -n "JDL: " test_jdl || fatal if [ $ok != 1 ]; then quit; fi + +# echo -n "Purge: " +# test_purge || fatal +# if [ $ok != 1]; then quit; fi } diff --git a/org.glite.lb.harvester/src/harvester.c b/org.glite.lb.harvester/src/harvester.c index 171a247..4362b12 100644 --- a/org.glite.lb.harvester/src/harvester.c +++ b/org.glite.lb.harvester/src/harvester.c @@ -159,7 +159,7 @@ typedef struct { int dash_fd; #ifdef WITH_LBU_DB glite_lbu_DBContext dbctx; - glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_mon; + glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_mon, deletecmd; int dbcaps; #endif } thread_t; @@ -185,6 +185,7 @@ typedef struct { char *wlcg_topic; // msg topic int wlcg_flush; // send message for eachnotification int silly; // old LB 1.9 mode + int no_purge; // disabled reaction on purge state int nservers; notif_t *notifs; @@ -234,10 +235,11 @@ static const struct option opts[] = { { "wlcg", no_argument, NULL, 'w'}, { "old", no_argument, NULL, 'o'}, { "cleanup", no_argument, NULL, 'l'}, + { "no-purge", no_argument, NULL, 'u'}, { NULL, no_argument, NULL, 0} }; -static const char *opts_line = "hvs:d:Di:t:H:c:n:p:m:C:K:wo"; +static const char *opts_line = "hvs:d:Di:t:H:c:n:p:m:C:K:wolu"; config_t config = { local_address: NULL, @@ -254,6 +256,7 @@ config_t config = { cleanup: 0, wlcg: 0, silly: 0, + no_purge: 0, nservers: 0, notifs: NULL, @@ -934,104 +937,113 @@ static notif_t *db_search_notif_by_server(notif_t *notifs, int n, const char *se } -static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __attribute((unused))int index, edg_wll_JobStat *stat) { - char *jobid_str = NULL, *state_str = NULL, *sql = NULL, *sql2 = NULL, *state_entered_str = NULL, *rtm_timestamp_str = NULL, *lbhost = NULL, *unique_str = NULL, *regtime_str = NULL,*vo = NULL; - unsigned int port; - - jobid_str = stat->jobId ? glite_jobid_unparse(stat->jobId) : strdup("unknown"); - glite_jobid_getServerParts(stat->jobId, &lbhost, &port); - unique_str = glite_jobid_getUnique(stat->jobId); - state_str = edg_wll_StatToString(stat->state); - vo = edg_wll_JDLField(stat,"VirtualOrganisation"); - printf(RTM_TTY_GREEN "notifid: %s (%s), jobid: %s, state: %s, vo: %s, last time: %lf" RTM_TTY_RST "\n", notif->id_str, rtm_notiftype2str(notif->type), jobid_str, state_str, vo, notif->last_update); - -#ifdef WITH_LBU_DB - if (config.dbcs && t->dbctx) { - double state_entered, rtm_timestamp; - char *ce, *queue, *colon, *sql_part; - const char *rb, *ui, *state, *active, *state_changed, *lb; - time_t registered; - - ce = stat->destination ? : "unknown"; - queue = strchr(ce, '/'); - if (queue) *queue++='\0'; - else queue = "unknown"; - colon = strchr(ce, ':'); - if (colon) colon[0] = '\0'; - rb = stat->network_server ? rtm_ns2hostname(stat->network_server) : "unknown"; - ui = stat->ui_host ? : "unknown"; - state = state_str ? : "unknown"; - state_entered = stat->stateEnterTime.tv_sec + stat->stateEnterTime.tv_usec / 1000000.0; - rtm_timestamp = rtm_gettimeofday(); - registered = stat->stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED]; - lb = lbhost; - active = "true"; - state_changed = "true"; - +typedef struct { + char *lb; + char *jobid; + char *unique_str; + char *ce; + char *queue; + char *rb; + char *ui; + char *state; + double state_entered; + double rtm_timestamp; + int registered; + char * vo; +} db_job_t; + + +// +// store state into dababase +// on purged status deletes the record +// +static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state, db_job_t *rec) { + char *state_entered_str = NULL, *rtm_timestamp_str = NULL, *regtime_str = NULL; + char *sql = NULL, *sql2 = NULL, *sql_part = NULL; + const char *active = "true", *state_changed = "true"; + + if (state == EDG_WLL_JOB_PURGED) { + if (!config.no_purge) { + lprintf(t, DBG, "purge %s", rec->jobid); + if ((t->dbcaps & GLITE_LBU_DB_CAP_PREPARED) == 0) { + trio_asprintf(&sql, "DELETE FROM " RTM_DB_TABLE_JOBS " WHERE jobid='%|Ss' AND lb='%|Ss'", rec->unique_str, rec->lb); + lprintf(t, INS, "delete: %s", sql); + if (glite_lbu_ExecSQL(t->dbctx, sql, NULL) == -1) { + lprintf_dbctx(t, WRN, "can't delete job %s", rec->jobid); + goto quit; + } + } else { + if (glite_lbu_ExecPreparedStmt(t->deletecmd, 2, + GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, + GLITE_LBU_DB_TYPE_VARCHAR, rec->lb + ) == -1) { + lprintf_dbctx(t, WRN, "can't delete job %s", rec->jobid); + goto quit; + } + } + } + } else { if ((t->dbcaps & GLITE_LBU_DB_CAP_PREPARED) == 0) { - - glite_lbu_TimestampToDB(t->dbctx, state_entered, &state_entered_str); - glite_lbu_TimestampToDB(t->dbctx, rtm_timestamp, &rtm_timestamp_str); - glite_lbu_TimeToDB(t->dbctx, registered, ®time_str); - - if (vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", vo); - else sql_part = strdup(""); - trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", ce, queue, rb, ui, state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, unique_str, lb); - free(sql_part); - lprintf(t, INS, "update: %s", sql); - switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) { - case -1: - lprintf_dbctx(t, ERR, "can't get jobs"); - goto quit; - case 0: - trio_asprintf(&sql2, "INSERT INTO " RTM_DB_TABLE_JOBS " " - "(ce, queue, rb, ui, state, state_entered, rtm_timestamp, jobid, lb, active, state_changed, registered, vo) VALUES " - "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", ce, queue, rb, ui, state, state_entered_str, rtm_timestamp_str, unique_str, lb, active, state_changed, regtime_str, vo ? : "unknown"); - lprintf(t, INS, "insert: %s", sql2); - if (glite_lbu_ExecSQL(t->dbctx, sql2, NULL) == -1) { - lprintf_dbctx(t, ERR, "can't insert job"); + glite_lbu_TimestampToDB(t->dbctx, rec->state_entered, &state_entered_str); + glite_lbu_TimestampToDB(t->dbctx, rec->rtm_timestamp, &rtm_timestamp_str); + glite_lbu_TimeToDB(t->dbctx, rec->registered, ®time_str); + + if (rec->vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", rec->vo); + else sql_part = strdup(""); + trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, rec->unique_str, rec->lb); + lprintf(t, INS, "update: %s", sql); + switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) { + case -1: + lprintf_dbctx(t, ERR, "can't get jobs"); goto quit; + case 0: + trio_asprintf(&sql2, "INSERT INTO " RTM_DB_TABLE_JOBS " " + "(ce, queue, rb, ui, state, state_entered, rtm_timestamp, jobid, lb, active, state_changed, registered, vo) VALUES " + "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, rec->unique_str, rec->lb, active, state_changed, regtime_str, rec->vo ? : "unknown"); + lprintf(t, INS, "insert: %s", sql2); + if (glite_lbu_ExecSQL(t->dbctx, sql2, NULL) == -1) { + lprintf_dbctx(t, ERR, "can't insert job"); + goto quit; + } + break; + default: + break; } - break; - default: - break; - } - } else { // prepared commands int ret; - if (vo) { + if (rec->vo) { ret = glite_lbu_ExecPreparedStmt(t->updatecmd_vo, 13, - GLITE_LBU_DB_TYPE_VARCHAR, ce, - GLITE_LBU_DB_TYPE_VARCHAR, queue, - GLITE_LBU_DB_TYPE_VARCHAR, rb, - GLITE_LBU_DB_TYPE_VARCHAR, ui, - GLITE_LBU_DB_TYPE_VARCHAR, state, - GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered, - GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp, + GLITE_LBU_DB_TYPE_VARCHAR, rec->ce, + GLITE_LBU_DB_TYPE_VARCHAR, rec->queue, + GLITE_LBU_DB_TYPE_VARCHAR, rec->rb, + GLITE_LBU_DB_TYPE_VARCHAR, rec->ui, + GLITE_LBU_DB_TYPE_VARCHAR, rec->state, + GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered, + GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp, GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed - GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered, - GLITE_LBU_DB_TYPE_VARCHAR, vo, // VO + GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered, + GLITE_LBU_DB_TYPE_VARCHAR, rec->vo, // VO - GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid - GLITE_LBU_DB_TYPE_VARCHAR, lb // L&B server + GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid + GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server ); } else { ret = glite_lbu_ExecPreparedStmt(t->updatecmd, 12, - GLITE_LBU_DB_TYPE_VARCHAR, ce, - GLITE_LBU_DB_TYPE_VARCHAR, queue, - GLITE_LBU_DB_TYPE_VARCHAR, rb, - GLITE_LBU_DB_TYPE_VARCHAR, ui, - GLITE_LBU_DB_TYPE_VARCHAR, state, - GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered, - GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp, + GLITE_LBU_DB_TYPE_VARCHAR, rec->ce, + GLITE_LBU_DB_TYPE_VARCHAR, rec->queue, + GLITE_LBU_DB_TYPE_VARCHAR, rec->rb, + GLITE_LBU_DB_TYPE_VARCHAR, rec->ui, + GLITE_LBU_DB_TYPE_VARCHAR, rec->state, + GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered, + GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp, GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed - GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered, + GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered, - GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid - GLITE_LBU_DB_TYPE_VARCHAR, lb // L&B server + GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid + GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server ); } @@ -1041,19 +1053,19 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a goto quit; case 0: if (glite_lbu_ExecPreparedStmt(t->insertcmd, 13, - GLITE_LBU_DB_TYPE_VARCHAR, ce, - GLITE_LBU_DB_TYPE_VARCHAR, queue, - GLITE_LBU_DB_TYPE_VARCHAR, rb, - GLITE_LBU_DB_TYPE_VARCHAR, ui, - GLITE_LBU_DB_TYPE_VARCHAR, state, - GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered, - GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp, - GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid - GLITE_LBU_DB_TYPE_VARCHAR, lb, // L&B server + GLITE_LBU_DB_TYPE_VARCHAR, rec->ce, + GLITE_LBU_DB_TYPE_VARCHAR, rec->queue, + GLITE_LBU_DB_TYPE_VARCHAR, rec->rb, + GLITE_LBU_DB_TYPE_VARCHAR, rec->ui, + GLITE_LBU_DB_TYPE_VARCHAR, rec->state, + GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered, + GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp, + GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid + GLITE_LBU_DB_TYPE_VARCHAR, rec->lb, // L&B server GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed - GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered, - GLITE_LBU_DB_TYPE_VARCHAR, vo ? : "unknown" // VO + GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered, + GLITE_LBU_DB_TYPE_VARCHAR, rec->vo ? : "unknown" // VO ) == -1) { lprintf_dbctx(t, ERR, "can't insert to " RTM_DB_TABLE_JOBS " table"); goto quit; @@ -1063,7 +1075,70 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a break; } } // prepare commands + } + +quit: + free(sql); + free(sql2); + free(sql_part); + free(state_entered_str); + free(rtm_timestamp_str); + free(regtime_str); +} + +static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int index, edg_wll_JobStat *stat) { + char *jobid_str = NULL, *state_str = NULL, *vo = NULL, *lbhost = NULL; + unsigned int port; + + jobid_str = stat->jobId ? glite_jobid_unparse(stat->jobId) : strdup("unknown"); + glite_jobid_getServerParts(stat->jobId, &lbhost, &port); + state_str = edg_wll_StatToString(stat->state); + vo = edg_wll_JDLField(stat, "VirtualOrganisation"); + printf(RTM_TTY_GREEN "notifid: %s (%s), jobid: %s, state: %s, vo: %s, last time: %lf" RTM_TTY_RST "\n", notif->id_str, rtm_notiftype2str(notif->type), jobid_str, state_str, vo, notif->last_update); + +#ifdef WITH_LBU_DB + if (config.dbcs && t->dbctx) { + db_job_t rec; + char *colon; + char *unique_str = NULL, *network_server = NULL; + + memset(&rec, 0, sizeof rec); + // L&B server + rec.lb = lbhost; + // jobid + uniqe + unique_str = glite_jobid_getUnique(stat->jobId); + rec.unique_str = unique_str; + rec.jobid = jobid_str; + // CE + rec.ce = stat->destination ? : "unknown"; + // queue + rec.queue = strchr(rec.ce, '/'); + if (rec.queue) *rec.queue++='\0'; + else rec.queue = "unknown"; + colon = strchr(rec.ce, ':'); + if (colon) colon[0] = '\0'; + // Virtual Organization + rec.vo = vo; + // Resource Broker + network_server = stat->network_server ? rtm_ns2hostname(stat->network_server) : strdup("unknown"); + rec.rb = network_server; + // UI + rec.ui = stat->ui_host ? : "unknown"; + // state + rec.state = state_str ? : "unknown"; + // state time + rec.state_entered = stat->stateEnterTime.tv_sec + stat->stateEnterTime.tv_usec / 1000000.0; + // notification time + rec.rtm_timestamp = rtm_gettimeofday(); + // registration time + rec.registered = stat->stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED]; + + // store! + db_store_change_perform_sql(t, stat->state, &rec); + + free(unique_str); + free(network_server); } #endif @@ -1076,13 +1151,7 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a quit: free(jobid_str); free(state_str); - free(sql); - free(sql2); - free(state_entered_str); - free(rtm_timestamp_str); free(lbhost); - free(unique_str); - free(regtime_str); free(vo); return 0; @@ -1563,9 +1632,10 @@ void *notify_thread(void *thread_data) { "SET ce=$1, queue=$2, rb=$3, ui=$4, state=$5, state_entered=$6, rtm_timestamp=$7, active=$8, state_changed=$9, registered=$10, vo=$11 WHERE jobid=$12 AND lb=$13", &t->updatecmd_vo) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_LBS DBAMP " " "SET monitored=$1 WHERE ip=$2", - &t->updatecmd_mon)) { + &t->updatecmd_mon) != 0 || glite_lbu_PrepareStmt(t->dbctx, "DELETE FROM " DBAMP RTM_DB_TABLE_JOBS DBAMP " WHERE jobid=$1 AND lb=$2", + &t->deletecmd) != 0) { lprintf_dbctx(t, ERR, "can't create prepare commands"); - lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_mon=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_mon); + lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_mon=%p, deletecmd=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_mon, t->deletecmd); quit = RTM_QUIT_PRESERVE; } } @@ -1906,6 +1976,7 @@ exit: if (t->updatecmd) glite_lbu_FreeStmt(&t->updatecmd); if (t->updatecmd_vo) glite_lbu_FreeStmt(&t->updatecmd_vo); if (t->updatecmd_mon) glite_lbu_FreeStmt(&t->updatecmd_mon); + if (t->deletecmd) glite_lbu_FreeStmt(&t->deletecmd); db_free(t, t->dbctx); #endif if (ctx) edg_wll_FreeContext(ctx); @@ -2035,6 +2106,7 @@ void usage(const char *prog) { " -K, --key X509 key file\n" " -o, --old \"silly\" mode for old L&B 1.9 servers\n" " -l, --cleanup clean up the notifications and exit\n" + " -u, --no-purge disable purging from RTM database\n" " -w, --wlcg enable messaging for dashboard\n" " --wlcg-binary full path to msg-publish binary\n" " --wlcg-topic topic for msg-publish\n" @@ -2131,6 +2203,9 @@ int config_preload(int argn, char *argv[]) { case 'o': config.silly = 1; break; + case 'u': + config.no_purge = 1; + break; case 0: switch(index) { case 0: @@ -2183,6 +2258,7 @@ int config_preload(int argn, char *argv[]) { lprintf(NULL, INF, "daemonize: %s", config.daemonize ? "enabled" : "disabled"); lprintf(NULL, INF, "fork guard: %s", config.guard ? "enabled" : "disabled"); lprintf(NULL, INF, "silly compatibility mode: %s", config.silly ? "enabled" : "disabled"); + lprintf(NULL, INF, "purge: %s", !config.no_purge ? "enabled" : "disabled"); } return 0; -- 1.8.2.3