int dash_fd;
#ifdef WITH_LBU_DB
glite_lbu_DBContext dbctx;
- glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_mon;
+ glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_mon, deletecmd;
int dbcaps;
#endif
} thread_t;
char *wlcg_topic; // msg topic
int wlcg_flush; // send message for eachnotification
int silly; // old LB 1.9 mode
+ int no_purge; // disabled reaction on purge state
int nservers;
notif_t *notifs;
{ "wlcg", no_argument, NULL, 'w'},
{ "old", no_argument, NULL, 'o'},
{ "cleanup", no_argument, NULL, 'l'},
+ { "no-purge", no_argument, NULL, 'u'},
{ NULL, no_argument, NULL, 0}
};
-static const char *opts_line = "hvs:d:Di:t:H:c:n:p:m:C:K:wo";
+static const char *opts_line = "hvs:d:Di:t:H:c:n:p:m:C:K:wolu";
config_t config = {
local_address: NULL,
cleanup: 0,
wlcg: 0,
silly: 0,
+ no_purge: 0,
nservers: 0,
notifs: NULL,
}
-static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __attribute((unused))int index, edg_wll_JobStat *stat) {
- char *jobid_str = NULL, *state_str = NULL, *sql = NULL, *sql2 = NULL, *state_entered_str = NULL, *rtm_timestamp_str = NULL, *lbhost = NULL, *unique_str = NULL, *regtime_str = NULL,*vo = NULL;
- unsigned int port;
-
- jobid_str = stat->jobId ? glite_jobid_unparse(stat->jobId) : strdup("unknown");
- glite_jobid_getServerParts(stat->jobId, &lbhost, &port);
- unique_str = glite_jobid_getUnique(stat->jobId);
- state_str = edg_wll_StatToString(stat->state);
- vo = edg_wll_JDLField(stat,"VirtualOrganisation");
- printf(RTM_TTY_GREEN "notifid: %s (%s), jobid: %s, state: %s, vo: %s, last time: %lf" RTM_TTY_RST "\n", notif->id_str, rtm_notiftype2str(notif->type), jobid_str, state_str, vo, notif->last_update);
-
-#ifdef WITH_LBU_DB
- if (config.dbcs && t->dbctx) {
- double state_entered, rtm_timestamp;
- char *ce, *queue, *colon, *sql_part;
- const char *rb, *ui, *state, *active, *state_changed, *lb;
- time_t registered;
-
- ce = stat->destination ? : "unknown";
- queue = strchr(ce, '/');
- if (queue) *queue++='\0';
- else queue = "unknown";
- colon = strchr(ce, ':');
- if (colon) colon[0] = '\0';
- rb = stat->network_server ? rtm_ns2hostname(stat->network_server) : "unknown";
- ui = stat->ui_host ? : "unknown";
- state = state_str ? : "unknown";
- state_entered = stat->stateEnterTime.tv_sec + stat->stateEnterTime.tv_usec / 1000000.0;
- rtm_timestamp = rtm_gettimeofday();
- registered = stat->stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED];
- lb = lbhost;
- active = "true";
- state_changed = "true";
-
+typedef struct {
+ char *lb;
+ char *jobid;
+ char *unique_str;
+ char *ce;
+ char *queue;
+ char *rb;
+ char *ui;
+ char *state;
+ double state_entered;
+ double rtm_timestamp;
+ int registered;
+ char * vo;
+} db_job_t;
+
+
+//
+// store state into dababase
+// on purged status deletes the record
+//
+static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state, db_job_t *rec) {
+ char *state_entered_str = NULL, *rtm_timestamp_str = NULL, *regtime_str = NULL;
+ char *sql = NULL, *sql2 = NULL, *sql_part = NULL;
+ const char *active = "true", *state_changed = "true";
+
+ if (state == EDG_WLL_JOB_PURGED) {
+ if (!config.no_purge) {
+ lprintf(t, DBG, "purge %s", rec->jobid);
+ if ((t->dbcaps & GLITE_LBU_DB_CAP_PREPARED) == 0) {
+ trio_asprintf(&sql, "DELETE FROM " RTM_DB_TABLE_JOBS " WHERE jobid='%|Ss' AND lb='%|Ss'", rec->unique_str, rec->lb);
+ lprintf(t, INS, "delete: %s", sql);
+ if (glite_lbu_ExecSQL(t->dbctx, sql, NULL) == -1) {
+ lprintf_dbctx(t, WRN, "can't delete job %s", rec->jobid);
+ goto quit;
+ }
+ } else {
+ if (glite_lbu_ExecPreparedStmt(t->deletecmd, 2,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->lb
+ ) == -1) {
+ lprintf_dbctx(t, WRN, "can't delete job %s", rec->jobid);
+ goto quit;
+ }
+ }
+ }
+ } else {
if ((t->dbcaps & GLITE_LBU_DB_CAP_PREPARED) == 0) {
-
- glite_lbu_TimestampToDB(t->dbctx, state_entered, &state_entered_str);
- glite_lbu_TimestampToDB(t->dbctx, rtm_timestamp, &rtm_timestamp_str);
- glite_lbu_TimeToDB(t->dbctx, registered, ®time_str);
-
- if (vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", vo);
- else sql_part = strdup("");
- trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", ce, queue, rb, ui, state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, unique_str, lb);
- free(sql_part);
- lprintf(t, INS, "update: %s", sql);
- switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) {
- case -1:
- lprintf_dbctx(t, ERR, "can't get jobs");
- goto quit;
- case 0:
- trio_asprintf(&sql2, "INSERT INTO " RTM_DB_TABLE_JOBS " "
- "(ce, queue, rb, ui, state, state_entered, rtm_timestamp, jobid, lb, active, state_changed, registered, vo) VALUES "
- "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", ce, queue, rb, ui, state, state_entered_str, rtm_timestamp_str, unique_str, lb, active, state_changed, regtime_str, vo ? : "unknown");
- lprintf(t, INS, "insert: %s", sql2);
- if (glite_lbu_ExecSQL(t->dbctx, sql2, NULL) == -1) {
- lprintf_dbctx(t, ERR, "can't insert job");
+ glite_lbu_TimestampToDB(t->dbctx, rec->state_entered, &state_entered_str);
+ glite_lbu_TimestampToDB(t->dbctx, rec->rtm_timestamp, &rtm_timestamp_str);
+ glite_lbu_TimeToDB(t->dbctx, rec->registered, ®time_str);
+
+ if (rec->vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", rec->vo);
+ else sql_part = strdup("");
+ trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, rec->unique_str, rec->lb);
+ lprintf(t, INS, "update: %s", sql);
+ switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) {
+ case -1:
+ lprintf_dbctx(t, ERR, "can't get jobs");
goto quit;
+ case 0:
+ trio_asprintf(&sql2, "INSERT INTO " RTM_DB_TABLE_JOBS " "
+ "(ce, queue, rb, ui, state, state_entered, rtm_timestamp, jobid, lb, active, state_changed, registered, vo) VALUES "
+ "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, rec->unique_str, rec->lb, active, state_changed, regtime_str, rec->vo ? : "unknown");
+ lprintf(t, INS, "insert: %s", sql2);
+ if (glite_lbu_ExecSQL(t->dbctx, sql2, NULL) == -1) {
+ lprintf_dbctx(t, ERR, "can't insert job");
+ goto quit;
+ }
+ break;
+ default:
+ break;
}
- break;
- default:
- break;
- }
-
} else { // prepared commands
int ret;
- if (vo) {
+ if (rec->vo) {
ret = glite_lbu_ExecPreparedStmt(t->updatecmd_vo, 13,
- GLITE_LBU_DB_TYPE_VARCHAR, ce,
- GLITE_LBU_DB_TYPE_VARCHAR, queue,
- GLITE_LBU_DB_TYPE_VARCHAR, rb,
- GLITE_LBU_DB_TYPE_VARCHAR, ui,
- GLITE_LBU_DB_TYPE_VARCHAR, state,
- GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered,
- GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
+ GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
+ GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
- GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered,
- GLITE_LBU_DB_TYPE_VARCHAR, vo, // VO
+ GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->vo, // VO
- GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid
- GLITE_LBU_DB_TYPE_VARCHAR, lb // L&B server
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
);
} else {
ret = glite_lbu_ExecPreparedStmt(t->updatecmd, 12,
- GLITE_LBU_DB_TYPE_VARCHAR, ce,
- GLITE_LBU_DB_TYPE_VARCHAR, queue,
- GLITE_LBU_DB_TYPE_VARCHAR, rb,
- GLITE_LBU_DB_TYPE_VARCHAR, ui,
- GLITE_LBU_DB_TYPE_VARCHAR, state,
- GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered,
- GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
+ GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
+ GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
- GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered,
+ GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
- GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid
- GLITE_LBU_DB_TYPE_VARCHAR, lb // L&B server
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
);
}
goto quit;
case 0:
if (glite_lbu_ExecPreparedStmt(t->insertcmd, 13,
- GLITE_LBU_DB_TYPE_VARCHAR, ce,
- GLITE_LBU_DB_TYPE_VARCHAR, queue,
- GLITE_LBU_DB_TYPE_VARCHAR, rb,
- GLITE_LBU_DB_TYPE_VARCHAR, ui,
- GLITE_LBU_DB_TYPE_VARCHAR, state,
- GLITE_LBU_DB_TYPE_TIMESTAMP, state_entered,
- GLITE_LBU_DB_TYPE_TIMESTAMP, rtm_timestamp,
- GLITE_LBU_DB_TYPE_VARCHAR, unique_str, // jobid
- GLITE_LBU_DB_TYPE_VARCHAR, lb, // L&B server
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
+ GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
+ GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->lb, // L&B server
GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
- GLITE_LBU_DB_TYPE_TIMESTAMP, (double)registered,
- GLITE_LBU_DB_TYPE_VARCHAR, vo ? : "unknown" // VO
+ GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
+ GLITE_LBU_DB_TYPE_VARCHAR, rec->vo ? : "unknown" // VO
) == -1) {
lprintf_dbctx(t, ERR, "can't insert to " RTM_DB_TABLE_JOBS " table");
goto quit;
break;
}
} // prepare commands
+ }
+
+quit:
+ free(sql);
+ free(sql2);
+ free(sql_part);
+ free(state_entered_str);
+ free(rtm_timestamp_str);
+ free(regtime_str);
+}
+
+static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int index, edg_wll_JobStat *stat) {
+ char *jobid_str = NULL, *state_str = NULL, *vo = NULL, *lbhost = NULL;
+ unsigned int port;
+
+ jobid_str = stat->jobId ? glite_jobid_unparse(stat->jobId) : strdup("unknown");
+ glite_jobid_getServerParts(stat->jobId, &lbhost, &port);
+ state_str = edg_wll_StatToString(stat->state);
+ vo = edg_wll_JDLField(stat, "VirtualOrganisation");
+ printf(RTM_TTY_GREEN "notifid: %s (%s), jobid: %s, state: %s, vo: %s, last time: %lf" RTM_TTY_RST "\n", notif->id_str, rtm_notiftype2str(notif->type), jobid_str, state_str, vo, notif->last_update);
+
+#ifdef WITH_LBU_DB
+ if (config.dbcs && t->dbctx) {
+ db_job_t rec;
+ char *colon;
+ char *unique_str = NULL, *network_server = NULL;
+
+ memset(&rec, 0, sizeof rec);
+ // L&B server
+ rec.lb = lbhost;
+ // jobid + uniqe
+ unique_str = glite_jobid_getUnique(stat->jobId);
+ rec.unique_str = unique_str;
+ rec.jobid = jobid_str;
+ // CE
+ rec.ce = stat->destination ? : "unknown";
+ // queue
+ rec.queue = strchr(rec.ce, '/');
+ if (rec.queue) *rec.queue++='\0';
+ else rec.queue = "unknown";
+ colon = strchr(rec.ce, ':');
+ if (colon) colon[0] = '\0';
+ // Virtual Organization
+ rec.vo = vo;
+ // Resource Broker
+ network_server = stat->network_server ? rtm_ns2hostname(stat->network_server) : strdup("unknown");
+ rec.rb = network_server;
+ // UI
+ rec.ui = stat->ui_host ? : "unknown";
+ // state
+ rec.state = state_str ? : "unknown";
+ // state time
+ rec.state_entered = stat->stateEnterTime.tv_sec + stat->stateEnterTime.tv_usec / 1000000.0;
+ // notification time
+ rec.rtm_timestamp = rtm_gettimeofday();
+ // registration time
+ rec.registered = stat->stateEnterTimes[1 + EDG_WLL_JOB_SUBMITTED];
+
+ // store!
+ db_store_change_perform_sql(t, stat->state, &rec);
+
+ free(unique_str);
+ free(network_server);
}
#endif
quit:
free(jobid_str);
free(state_str);
- free(sql);
- free(sql2);
- free(state_entered_str);
- free(rtm_timestamp_str);
free(lbhost);
- free(unique_str);
- free(regtime_str);
free(vo);
return 0;
"SET ce=$1, queue=$2, rb=$3, ui=$4, state=$5, state_entered=$6, rtm_timestamp=$7, active=$8, state_changed=$9, registered=$10, vo=$11 WHERE jobid=$12 AND lb=$13",
&t->updatecmd_vo) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_LBS DBAMP " "
"SET monitored=$1 WHERE ip=$2",
- &t->updatecmd_mon)) {
+ &t->updatecmd_mon) != 0 || glite_lbu_PrepareStmt(t->dbctx, "DELETE FROM " DBAMP RTM_DB_TABLE_JOBS DBAMP " WHERE jobid=$1 AND lb=$2",
+ &t->deletecmd) != 0) {
lprintf_dbctx(t, ERR, "can't create prepare commands");
- lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_mon=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_mon);
+ lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_mon=%p, deletecmd=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_mon, t->deletecmd);
quit = RTM_QUIT_PRESERVE;
}
}
if (t->updatecmd) glite_lbu_FreeStmt(&t->updatecmd);
if (t->updatecmd_vo) glite_lbu_FreeStmt(&t->updatecmd_vo);
if (t->updatecmd_mon) glite_lbu_FreeStmt(&t->updatecmd_mon);
+ if (t->deletecmd) glite_lbu_FreeStmt(&t->deletecmd);
db_free(t, t->dbctx);
#endif
if (ctx) edg_wll_FreeContext(ctx);
" -K, --key X509 key file\n"
" -o, --old \"silly\" mode for old L&B 1.9 servers\n"
" -l, --cleanup clean up the notifications and exit\n"
+ " -u, --no-purge disable purging from RTM database\n"
" -w, --wlcg enable messaging for dashboard\n"
" --wlcg-binary full path to msg-publish binary\n"
" --wlcg-topic topic for msg-publish\n"
case 'o':
config.silly = 1;
break;
+ case 'u':
+ config.no_purge = 1;
+ break;
case 0:
switch(index) {
case 0:
lprintf(NULL, INF, "daemonize: %s", config.daemonize ? "enabled" : "disabled");
lprintf(NULL, INF, "fork guard: %s", config.guard ? "enabled" : "disabled");
lprintf(NULL, INF, "silly compatibility mode: %s", config.silly ? "enabled" : "disabled");
+ lprintf(NULL, INF, "purge: %s", !config.no_purge ? "enabled" : "disabled");
}
return 0;