Purge support in RTM database:
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Fri, 15 Jan 2010 20:25:23 +0000 (20:25 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Fri, 15 Jan 2010 20:25:23 +0000 (20:25 +0000)
 - support in harvester, reshuffle the code
 - added option --no-purge
 - update docs
 - preliminary purge test
Requires recent LB server from HEAD.

org.glite.lb.harvester/Makefile
org.glite.lb.harvester/doc/glite-lb-harvester.sgml
org.glite.lb.harvester/examples/test.sh
org.glite.lb.harvester/src/harvester.c

index 5fb9fa4..15f0b92 100644 (file)
@@ -46,7 +46,7 @@ INSTALL:=libtool --mode=install install
 
 default: all
 
-compile all: harvester doc
+compile all: harvester doc debug
 
 check:
 
@@ -58,8 +58,10 @@ stage: compile
        $(MAKE) install PREFIX=${stagedir}
 
 install: compile
-       -mkdir -p ${PREFIX}/bin ${PREFIX}/share/doc/${package}-${version} ${PREFIX}/share/man/man8
+       -mkdir -p ${PREFIX}/bin ${PREFIX}/examples ${PREFIX}/share/doc/${package}-${version} ${PREFIX}/share/man/man8
        ${INSTALL} -m 755 harvester ${PREFIX}/bin/glite-lb-harvester
+       ${INSTALL} -m 755 harvester-dbg ${PREFIX}/examples/glite-lb-harvester-dbg
+       ${INSTALL} -m 755 ../examples/test.sh ${PREFIX}/examples/glite-lb-harvester-test.sh
        ${INSTALL} -m 444 ../doc/README ${PREFIX}/share/doc/${package}-${version}
        ${INSTALL} -m 444 glite-lb-harvester.8 ${PREFIX}/share/man/man8
 
@@ -75,7 +77,7 @@ harvester-dbg: harvester-dbg.o
        ${LINK} -o $@ $+ ${LIBS}
 
 harvester-dbg.o: harvester.c
-       ${COMPILE} -Werror -DLOG=1 -DWITH_RTM_SQL_STORAGE=1 -c $< -o $@
+       ${COMPILE} -DLOG=1 -DWITH_RTM_SQL_STORAGE=1 -c $< -o $@
 
 %.o: %.c
        ${COMPILE} -c $<
index 55f7330..44d758f 100644 (file)
                        </group></arg>
 
                        <arg><group choice='plain'>
+                               <arg>-u</arg>
+                               <arg>--no-purge</arg>
+                       </group></arg>
+
+                       <arg><group choice='plain'>
                                <arg>-w</arg>
                                <arg>--wlcg</arg>
                        </group></arg>
@@ -295,6 +300,15 @@ Each  notification automatically expires. But if you know, than notifications us
                        </varlistentry>
 
                        <varlistentry>
+                               <term><option>-u</option>|<option>--no-purge</option></term>
+                               <listitem><para>
+By default jobs are purged from local database when purged on L&amp;B server. This option forces keeping all jobs in database, only with changed state to 'Purged'.
+                               </para><para>
+For using together with <option>-m</option> (<option>--pg</option>).
+                               </para></listitem>
+                       </varlistentry>
+
+                       <varlistentry>
                                <term><option>-w</option>|<option>--wlcg</option></term>
                                <listitem><para>
 Enables delivery to MSG publish. Messages are sent by executing a binary with proper parameters.
index 7105ee1..4bfba32 100755 (executable)
@@ -100,7 +100,9 @@ init() {
 
        jobreg="$GLITE_LOCATION/examples/glite-lb-job_reg -m `hostname -f`:${GLITE_LB_TEST_SERVER_PORT} -s UserInterface"
        logev="$GLITE_LOCATION/bin/glite-lb-logevent -x -S `pwd`/LB/proxy.sockstore.sock -U localhost"
-       for dir in "$GLITE_LOCATION/bin" "`pwd`/../build" "`pwd`"; do
+       purge="$GLITE_LOCATION/bin/glite-lb-purge"
+       [ -x "$purge" ] || purge="$GLITE_LOCATION/sbin/glite-lb-purge"
+       for dir in "$GLITE_LOCATION/examlpes" "`pwd`/../build" "`pwd`"; do
                if [ -x "$dir/glite-lb-harvester-dbg" ]; then
                        rtm="$dir/glite-lb-harvester-dbg"
                fi
@@ -732,7 +734,7 @@ EOF
                echo "FAIL"
                return 0
        fi
-       echo -n "OK"
+       echo -n "OK "
 
        echo -n "changed after waiting..."
        ev -s WorkloadManager -e EnQueued --queue "very long and chaotic queue" --destination LogMonitor --dest_host localhost --dest_instance pid$$ --job "(car 'testing=true)"  --result=OK || return $?
@@ -754,6 +756,40 @@ EOF
 }
 
 
+test_purge() {
+       ok=0
+
+       echo -n "purge."
+       pg_get "SELECT jobid FROM jobs" || return $?
+       if [ -z "$lines" -o $lines -le 0  ]; then
+               echo "no jobs! FAIL"
+               return 0
+       fi
+       echo -n "P"
+       jobunique=`echo "$result" | head -n 1 | tr -d '\n'`
+       jobid="https://`hostname -f`:${GLITE_LB_TEST_SERVER_PORT}/$jobunique"
+       echo $jobid > jobs
+       echo "${purge} -a1 -c1 -n1 -e1 -o1 -m "`hostname -f`:${GLITE_LB_TEST_SERVER_PORT}" -j jobs" >> log
+       echo "  jobs = `cat jobs` | tr -d '\n'" >> log
+       X509_USER_KEY=${X509_USER_KEY} X509_USER_CERT=${X509_USER_CERT} ${purge} -l -a1 -c1 -n1 -e1 -o1 -m "`hostname -f`:${GLITE_LB_TEST_SERVER_PORT}" -j jobs 2> purge-err.tmp >purge.tmp
+       if [ $? -ne 0 ]; then
+               echo " FAIL!"
+               return 2;
+       fi
+       rm -f jobs
+       echo -n "R "
+
+       pg_wait 10 "SELECT * FROM jobs WHERE jobid='$jobunique'" 0 || return $?
+       if [ x"$lines" != x"0" ]; then
+               echo "FAIL"
+               return 0
+       fi
+
+       ok=1
+       echo "OK"
+}
+
+
 quit() {
        if [ x"$started" = x"" ]; then
                kill_daemons
@@ -804,6 +840,10 @@ test() {
        echo -n "JDL: "
        test_jdl || fatal
        if [ $ok != 1 ]; then quit; fi
+
+#      echo -n "Purge: "
+#      test_purge || fatal
+#      if [ $ok != 1]; then quit; fi
 }
 
 
index 171a247..4362b12 100644 (file)
@@ -159,7 +159,7 @@ typedef struct {
        int dash_fd;
 #ifdef WITH_LBU_DB
        glite_lbu_DBContext dbctx;
-       glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_mon;
+       glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_mon, deletecmd;
        int dbcaps;
 #endif
 } thread_t;
@@ -185,6 +185,7 @@ typedef struct {
        char *wlcg_topic;   // msg topic
        int wlcg_flush;     // send message for eachnotification
        int silly;          // old LB 1.9 mode
+       int no_purge;       // disabled reaction on purge state
 
        int nservers;
        notif_t *notifs;
@@ -234,10 +235,11 @@ static const struct option opts[] = {
        { "wlcg",       no_argument,            NULL,   'w'},
        { "old",        no_argument,            NULL,   'o'},
        { "cleanup",    no_argument,            NULL,   'l'},
+       { "no-purge",   no_argument,            NULL,   'u'},
        { NULL,         no_argument,            NULL,   0}
 };
 
-static const char *opts_line = "hvs:d:Di:t:H:c:n:p:m:C:K:wo";
+static const char *opts_line = "hvs:d:Di:t:H:c:n:p:m:C:K:wolu";
 
 config_t config = {
        local_address: NULL,
@@ -254,6 +256,7 @@ config_t config = {
        cleanup: 0,
        wlcg: 0,
        silly: 0,
+       no_purge: 0,
 
        nservers: 0,
        notifs: NULL,
@@ -934,104 +937,113 @@ static notif_t *db_search_notif_by_server(notif_t *notifs, int n, const char *se
 }
 
 
-static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __attribute((unused))int index, edg_wll_JobStat *stat) {
-       char *jobid_str = NULL, *state_str = NULL, *sql = NULL, *sql2 = NULL, *state_entered_str = NULL, *rtm_timestamp_str = NULL, *lbhost = NULL, *unique_str = NULL, *regtime_str = NULL,*vo = NULL;
-       unsigned int port;
-
-       jobid_str = stat->jobId ? glite_jobid_unparse(stat->jobId) : strdup("unknown");
-       glite_jobid_getServerParts(stat->jobId, &lbhost, &port);
-       unique_str = glite_jobid_getUnique(stat->jobId);
-       state_str = edg_wll_StatToString(stat->state);
-       vo = edg_wll_JDLField(stat,"VirtualOrganisation");
-       printf(RTM_TTY_GREEN "notifid: %s (%s), jobid: %s, state: %s, vo: %s, last time: %lf" RTM_TTY_RST "\n", notif->id_str, rtm_notiftype2str(notif->type), jobid_str, state_str, vo, notif->last_update);
-
-#ifdef WITH_LBU_DB
-       if (config.dbcs && t->dbctx) {
-               double state_entered, rtm_timestamp;
-               char *ce, *queue, *colon, *sql_part;
-               const char *rb, *ui, *state, *active, *state_changed, *lb;
-               time_t registered;
-
-               ce = stat->destination ? : "unknown";
-               queue = strchr(ce, '/');
-               if (queue) *queue++='\0';
-               else queue = "unknown";
-               colon = strchr(ce, ':');
-               if (colon) colon[0] = '\0';
-               rb = stat->network_server ? rtm_ns2hostname(stat->network_server) : "unknown";
-               ui = stat->ui_host ? : "unknown";
-               state = state_str ? : "unknown";
-               state_entered = stat->stateEnterTime.tv_sec + stat->stateEnterTime.tv_usec / 1000000.0;
-               rtm_timestamp = rtm_gettimeofday();
-               registered = stat->stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED];
-               lb = lbhost;
-               active = "true";
-               state_changed = "true";
-
+typedef struct {
+       char *lb;
+       char *jobid;
+       char *unique_str;
+       char *ce;
+       char *queue;
+       char *rb;
+       char *ui;
+       char *state;
+       double state_entered;
+       double rtm_timestamp;
+       int registered;
+       char * vo;
+} db_job_t;
+
+
+//
+// store state into dababase
+// on purged status deletes the record
+//
+static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state, db_job_t *rec) {
+       char *state_entered_str = NULL, *rtm_timestamp_str = NULL, *regtime_str = NULL;
+       char *sql = NULL, *sql2 = NULL, *sql_part = NULL;
+       const char *active = "true", *state_changed = "true";
+
+       if (state == EDG_WLL_JOB_PURGED) {
+               if (!config.no_purge) {
+                       lprintf(t, DBG, "purge %s", rec->jobid);
+                       if ((t->dbcaps & GLITE_LBU_DB_CAP_PREPARED) == 0) {
+                               trio_asprintf(&sql, "DELETE FROM " RTM_DB_TABLE_JOBS " WHERE jobid='%|Ss' AND lb='%|Ss'", rec->unique_str, rec->lb);
+                               lprintf(t, INS, "delete: %s", sql);
+                               if (glite_lbu_ExecSQL(t->dbctx, sql, NULL) == -1) {
+                                       lprintf_dbctx(t, WRN, "can't delete job %s", rec->jobid);
+                                       goto quit;
+                               }
+                       } else {
+                               if (glite_lbu_ExecPreparedStmt(t->deletecmd, 2,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->lb
+                               ) == -1) {
+                                       lprintf_dbctx(t, WRN, "can't delete job %s", rec->jobid);
+                                       goto quit;
+                               }
+                       }
+               }
+       } else {
                if ((t->dbcaps & GLITE_LBU_DB_CAP_PREPARED) == 0) {
-
-               glite_lbu_TimestampToDB(t->dbctx, state_entered, &state_entered_str);
-               glite_lbu_TimestampToDB(t->dbctx, rtm_timestamp, &rtm_timestamp_str);
-               glite_lbu_TimeToDB(t->dbctx, registered, &regtime_str);
-
-               if (vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", vo);
-               else sql_part = strdup("");
-               trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", ce, queue, rb, ui, state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, unique_str, lb);
-               free(sql_part);
-               lprintf(t, INS, "update: %s", sql);
-               switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) {
-               case -1:
-                       lprintf_dbctx(t, ERR, "can't get jobs");
-                       goto quit;
-               case 0:
-                       trio_asprintf(&sql2, "INSERT INTO " RTM_DB_TABLE_JOBS " "
-                               "(ce, queue, rb, ui, state, state_entered, rtm_timestamp, jobid, lb, active, state_changed, registered, vo) VALUES "
-                               "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", ce, queue, rb, ui, state, state_entered_str, rtm_timestamp_str, unique_str, lb, active, state_changed, regtime_str, vo ? : "unknown");
-                       lprintf(t, INS, "insert: %s", sql2);
-                       if (glite_lbu_ExecSQL(t->dbctx, sql2, NULL) == -1) {
-                               lprintf_dbctx(t, ERR, "can't insert job");
+                       glite_lbu_TimestampToDB(t->dbctx, rec->state_entered, &state_entered_str);
+                       glite_lbu_TimestampToDB(t->dbctx, rec->rtm_timestamp, &rtm_timestamp_str);
+                       glite_lbu_TimeToDB(t->dbctx, rec->registered, &regtime_str);
+
+                       if (rec->vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", rec->vo);
+                       else sql_part = strdup("");
+                       trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, rec->unique_str, rec->lb);
+                       lprintf(t, INS, "update: %s", sql);
+                       switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) {
+                       case -1:
+                               lprintf_dbctx(t, ERR, "can't get jobs");
                                goto quit;
+                       case 0:
+                               trio_asprintf(&sql2, "INSERT INTO " RTM_DB_TABLE_JOBS " "
+                                       "(ce, queue, rb, ui, state, state_entered, rtm_timestamp, jobid, lb, active, state_changed, registered, vo) VALUES "
+                                       "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, rec->unique_str, rec->lb, active, state_changed, regtime_str, rec->vo ? : "unknown");
+                               lprintf(t, INS, "insert: %s", sql2);
+                               if (glite_lbu_ExecSQL(t->dbctx, sql2, NULL) == -1) {
+                                       lprintf_dbctx(t, ERR, "can't insert job");
+                                       goto quit;
+                               }
+                               break;
+                       default:
+                               break;
                        }
-                       break;
-               default:
-                       break;
-               }
-
                } else { // prepared commands
                        int ret;
 
-                       if (vo) {
+                       if (rec->vo) {
                                ret = glite_lbu_ExecPreparedStmt(t->updatecmd_vo, 13,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, ce,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, queue,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rb,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, ui,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, state,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
                                        GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
                                        GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, vo, // VO
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->vo, // VO
 
-                                       GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid
-                                       GLITE_LBU_DB_TYPE_VARCHAR, lb // L&B server
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
                                );
                        } else {
                                ret = glite_lbu_ExecPreparedStmt(t->updatecmd, 12,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, ce,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, queue,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rb,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, ui,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, state,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
                                        GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
                                        GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered,
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
 
-                                       GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid
-                                       GLITE_LBU_DB_TYPE_VARCHAR, lb // L&B server
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
                                );
                        }
 
@@ -1041,19 +1053,19 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a
                                goto quit;
                        case 0:
                                if (glite_lbu_ExecPreparedStmt(t->insertcmd, 13,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, ce,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, queue,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rb,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, ui,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, state,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid
-                                       GLITE_LBU_DB_TYPE_VARCHAR, lb, // L&B server
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->lb, // L&B server
                                        GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
                                        GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, vo ? : "unknown" // VO
+                                       GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->vo ? : "unknown" // VO
                                ) == -1) {
                                        lprintf_dbctx(t, ERR, "can't insert to " RTM_DB_TABLE_JOBS " table");
                                        goto quit;
@@ -1063,7 +1075,70 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a
                                break;
                        }
                } // prepare commands
+       }
+
+quit:
+       free(sql);
+       free(sql2);
+       free(sql_part);
+       free(state_entered_str);
+       free(rtm_timestamp_str);
+       free(regtime_str);
+}
+
 
+static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int index, edg_wll_JobStat *stat) {
+       char *jobid_str = NULL, *state_str = NULL, *vo = NULL, *lbhost = NULL;
+       unsigned int port;
+
+       jobid_str = stat->jobId ? glite_jobid_unparse(stat->jobId) : strdup("unknown");
+       glite_jobid_getServerParts(stat->jobId, &lbhost, &port);
+       state_str = edg_wll_StatToString(stat->state);
+       vo = edg_wll_JDLField(stat, "VirtualOrganisation");
+       printf(RTM_TTY_GREEN "notifid: %s (%s), jobid: %s, state: %s, vo: %s, last time: %lf" RTM_TTY_RST "\n", notif->id_str, rtm_notiftype2str(notif->type), jobid_str, state_str, vo, notif->last_update);
+
+#ifdef WITH_LBU_DB
+       if (config.dbcs && t->dbctx) {
+               db_job_t rec;
+               char *colon;
+               char *unique_str = NULL, *network_server = NULL;
+
+               memset(&rec, 0, sizeof rec);
+               // L&B server
+               rec.lb = lbhost;
+               // jobid + uniqe
+               unique_str = glite_jobid_getUnique(stat->jobId);
+               rec.unique_str = unique_str;
+               rec.jobid = jobid_str;
+               // CE
+               rec.ce = stat->destination ? : "unknown";
+               // queue
+               rec.queue = strchr(rec.ce, '/');
+               if (rec.queue) *rec.queue++='\0';
+               else rec.queue = "unknown";
+               colon = strchr(rec.ce, ':');
+               if (colon) colon[0] = '\0';
+               // Virtual Organization
+               rec.vo = vo;
+               // Resource Broker
+               network_server = stat->network_server ? rtm_ns2hostname(stat->network_server) : strdup("unknown");
+               rec.rb = network_server;
+               // UI
+               rec.ui = stat->ui_host ? : "unknown";
+               // state
+               rec.state = state_str ? : "unknown";
+               // state time
+               rec.state_entered = stat->stateEnterTime.tv_sec + stat->stateEnterTime.tv_usec / 1000000.0;
+               // notification time
+               rec.rtm_timestamp = rtm_gettimeofday();
+               // registration time
+               rec.registered = stat->stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED];
+
+               // store!
+               db_store_change_perform_sql(t, stat->state, &rec);
+
+               free(unique_str);
+               free(network_server);
        }
 #endif
 
@@ -1076,13 +1151,7 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a
 quit:
        free(jobid_str);
        free(state_str);
-       free(sql);
-       free(sql2);
-       free(state_entered_str);
-       free(rtm_timestamp_str);
        free(lbhost);
-       free(unique_str);
-       free(regtime_str);
        free(vo);
 
        return 0;
@@ -1563,9 +1632,10 @@ void *notify_thread(void *thread_data) {
                            "SET ce=$1, queue=$2, rb=$3, ui=$4, state=$5, state_entered=$6, rtm_timestamp=$7, active=$8, state_changed=$9, registered=$10, vo=$11 WHERE jobid=$12 AND lb=$13", 
                        &t->updatecmd_vo) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_LBS DBAMP " "
                            "SET monitored=$1 WHERE ip=$2",
-                       &t->updatecmd_mon)) {
+                       &t->updatecmd_mon) != 0 || glite_lbu_PrepareStmt(t->dbctx, "DELETE FROM " DBAMP RTM_DB_TABLE_JOBS DBAMP " WHERE jobid=$1 AND lb=$2",
+                       &t->deletecmd) != 0) {
                                lprintf_dbctx(t, ERR, "can't create prepare commands");
-                               lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_mon=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_mon);
+                               lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_mon=%p, deletecmd=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_mon, t->deletecmd);
                                quit = RTM_QUIT_PRESERVE;
                        }
                }
@@ -1906,6 +1976,7 @@ exit:
        if (t->updatecmd) glite_lbu_FreeStmt(&t->updatecmd);
        if (t->updatecmd_vo) glite_lbu_FreeStmt(&t->updatecmd_vo);
        if (t->updatecmd_mon) glite_lbu_FreeStmt(&t->updatecmd_mon);
+       if (t->deletecmd) glite_lbu_FreeStmt(&t->deletecmd);
        db_free(t, t->dbctx);
 #endif
        if (ctx) edg_wll_FreeContext(ctx);
@@ -2035,6 +2106,7 @@ void usage(const char *prog) {
                "       -K, --key          X509 key file\n"
                "       -o, --old          \"silly\" mode for old L&B 1.9 servers\n"
                "       -l, --cleanup      clean up the notifications and exit\n"
+               "       -u, --no-purge     disable purging from RTM database\n"
                "       -w, --wlcg         enable messaging for dashboard\n"
                "       --wlcg-binary      full path to msg-publish binary\n"
                "       --wlcg-topic       topic for msg-publish\n"
@@ -2131,6 +2203,9 @@ int config_preload(int argn, char *argv[]) {
                case 'o':
                        config.silly = 1;
                        break;
+               case 'u':
+                       config.no_purge = 1;
+                       break;
                case 0:
                        switch(index) {
                        case 0:
@@ -2183,6 +2258,7 @@ int config_preload(int argn, char *argv[]) {
                lprintf(NULL, INF, "daemonize: %s", config.daemonize ? "enabled" : "disabled");
                lprintf(NULL, INF, "fork guard: %s", config.guard ? "enabled" : "disabled");
                lprintf(NULL, INF, "silly compatibility mode: %s", config.silly ? "enabled" : "disabled");
+               lprintf(NULL, INF, "purge: %s", !config.no_purge ? "enabled" : "disabled");
        }
 
        return 0;