From e1bc02faecebaa9a467280ac710bc08e304378d0 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Wed, 7 Apr 2010 16:22:12 +0000 Subject: [PATCH] Get Resource Broker from JDL for CREAM jobs (two fields not to reset to 'unknown' now: vo, rb). --- org.glite.lb.harvester/src/harvester.c | 102 +++++++++++++++++---------------- 1 file changed, 54 insertions(+), 48 deletions(-) diff --git a/org.glite.lb.harvester/src/harvester.c b/org.glite.lb.harvester/src/harvester.c index 14901d5..658d021 100644 --- a/org.glite.lb.harvester/src/harvester.c +++ b/org.glite.lb.harvester/src/harvester.c @@ -179,7 +179,7 @@ typedef struct { int dash_fd; #ifdef WITH_LBU_DB glite_lbu_DBContext dbctx; - glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_mon, deletecmd; + glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_rb, updatecmd_mon, deletecmd; int dbcaps; #endif } thread_t; @@ -1036,8 +1036,9 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state, glite_lbu_TimeToDB(t->dbctx, rec->registered, ®time_str); if (rec->vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", rec->vo); - else sql_part = strdup(""); - trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, rec->unique_str, rec->lb); + if (rec->rb) trio_asprintf(&sql_part, ", rb='%|Ss' ", rec->rb); + if (!sql_part) sql_part = strdup(" "); + trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", rec->ce, rec->queue, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, rec->unique_str, rec->lb); lprintf(t, INS, "update: %s", sql); switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) { case -1: @@ -1046,7 +1047,7 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state, case 0: trio_asprintf(&sql2, "INSERT INTO " RTM_DB_TABLE_JOBS " " "(ce, queue, rb, ui, state, state_entered, rtm_timestamp, jobid, lb, active, state_changed, registered, vo) VALUES " - "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, rec->unique_str, rec->lb, active, state_changed, regtime_str, rec->vo ? : "unknown"); + "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", rec->ce, rec->queue, rec->rb ? : "unknown", rec->ui, rec->state, state_entered_str, rtm_timestamp_str, rec->unique_str, rec->lb, active, state_changed, regtime_str, rec->vo ? : "unknown"); lprintf(t, INS, "insert: %s", sql2); if (glite_lbu_ExecSQL(t->dbctx, sql2, NULL) == -1) { lprintf_dbctx(t, ERR, "can't insert job"); @@ -1059,40 +1060,20 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state, } else { // prepared commands int ret; - if (rec->vo) { - ret = glite_lbu_ExecPreparedStmt(t->updatecmd_vo, 13, - GLITE_LBU_DB_TYPE_VARCHAR, rec->ce, - GLITE_LBU_DB_TYPE_VARCHAR, rec->queue, - GLITE_LBU_DB_TYPE_VARCHAR, rec->rb, - GLITE_LBU_DB_TYPE_VARCHAR, rec->ui, - GLITE_LBU_DB_TYPE_VARCHAR, rec->state, - GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered, - GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp, - GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active - GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed - GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered, - GLITE_LBU_DB_TYPE_VARCHAR, rec->vo, // VO - - GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid - GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server - ); - } else { - ret = glite_lbu_ExecPreparedStmt(t->updatecmd, 12, - GLITE_LBU_DB_TYPE_VARCHAR, rec->ce, - GLITE_LBU_DB_TYPE_VARCHAR, rec->queue, - GLITE_LBU_DB_TYPE_VARCHAR, rec->rb, - GLITE_LBU_DB_TYPE_VARCHAR, rec->ui, - GLITE_LBU_DB_TYPE_VARCHAR, rec->state, - GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered, - GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp, - GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active - GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed - GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered, - - GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid - GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server - ); - } + ret = glite_lbu_ExecPreparedStmt(t->updatecmd, 11, + GLITE_LBU_DB_TYPE_VARCHAR, rec->ce, + GLITE_LBU_DB_TYPE_VARCHAR, rec->queue, + GLITE_LBU_DB_TYPE_VARCHAR, rec->ui, + GLITE_LBU_DB_TYPE_VARCHAR, rec->state, + GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered, + GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp, + GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active + GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed + GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered, + + GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid + GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server + ); switch (ret) { case -1: @@ -1102,7 +1083,7 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state, if (glite_lbu_ExecPreparedStmt(t->insertcmd, 13, GLITE_LBU_DB_TYPE_VARCHAR, rec->ce, GLITE_LBU_DB_TYPE_VARCHAR, rec->queue, - GLITE_LBU_DB_TYPE_VARCHAR, rec->rb, + GLITE_LBU_DB_TYPE_VARCHAR, rec->rb ? : "unknown", GLITE_LBU_DB_TYPE_VARCHAR, rec->ui, GLITE_LBU_DB_TYPE_VARCHAR, rec->state, GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered, @@ -1119,6 +1100,24 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state, } break; default: + if (rec->vo) { + if (glite_lbu_ExecPreparedStmt(t->updatecmd_vo, 3, + GLITE_LBU_DB_TYPE_VARCHAR, rec->vo, // VO + + GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid + GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server + ) == -1) + lprintf(t, ERR, "can't update VO in " RTM_DB_TABLE_JOBS " table"); + } + if (rec->rb) { + if (glite_lbu_ExecPreparedStmt(t->updatecmd_rb, 3, + GLITE_LBU_DB_TYPE_VARCHAR, rec->rb, // RB + + GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid + GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server + ) == -1) + lprintf(t, ERR, "can't update RB in " RTM_DB_TABLE_JOBS " table"); + } break; } } // prepare commands @@ -1149,7 +1148,7 @@ static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int if (config.dbcs && t->dbctx) { db_job_t rec; char *colon; - char *unique_str = NULL, *network_server = NULL, *destination = NULL; + char *unique_str = NULL, *resource_broker = NULL, *destination = NULL; memset(&rec, 0, sizeof rec); // L&B server @@ -1169,9 +1168,13 @@ static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int if (colon) colon[0] = '\0'; // Virtual Organization rec.vo = vo; - // Resource Broker - network_server = stat->network_server ? rtm_url2hostname(stat->network_server) : strdup("unknown"); - rec.rb = network_server; + // Resource Broker (CREAM jobs from JDL, gLite jobs hostname from Network Server URL) + if (stat->jobtype == EDG_WLL_STAT_CREAM) { + resource_broker = edg_wll_JDLField(stat, "SubmitterService"); + } else { + resource_broker = stat->network_server ? rtm_url2hostname(stat->network_server) : NULL; + } + rec.rb = resource_broker; // UI rec.ui = stat->ui_host ? : "unknown"; // state @@ -1187,7 +1190,7 @@ static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int db_store_change_perform_sql(t, stat->state, &rec); free(unique_str); - free(network_server); + free(resource_broker); free(destination); } #endif @@ -1677,15 +1680,17 @@ void *notify_thread(void *thread_data) { " VALUES " "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)", &t->insertcmd) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_JOBS DBAMP " " - "SET ce=$1, queue=$2, rb=$3, ui=$4, state=$5, state_entered=$6, rtm_timestamp=$7, active=$8, state_changed=$9, registered=$10 WHERE jobid=$11 AND lb=$12", + "SET ce=$1, queue=$2, ui=$3, state=$4, state_entered=$5, rtm_timestamp=$6, active=$7, state_changed=$8, registered=$9 WHERE jobid=$10 AND lb=$11", &t->updatecmd) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_JOBS DBAMP " " - "SET ce=$1, queue=$2, rb=$3, ui=$4, state=$5, state_entered=$6, rtm_timestamp=$7, active=$8, state_changed=$9, registered=$10, vo=$11 WHERE jobid=$12 AND lb=$13", + "SET vo=$1 WHERE jobid=$2 AND lb=$3", &t->updatecmd_vo) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_LBS DBAMP " " + "SET rb=$1 WHERE jobid=$2 AND lb=$3", + &t->updatecmd_rb) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_LBS DBAMP " " "SET monitored=$1 WHERE ip=$2", &t->updatecmd_mon) != 0 || glite_lbu_PrepareStmt(t->dbctx, "DELETE FROM " DBAMP RTM_DB_TABLE_JOBS DBAMP " WHERE jobid=$1 AND lb=$2", &t->deletecmd) != 0) { lprintf_dbctx(t, ERR, "can't create prepare commands"); - lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_mon=%p, deletecmd=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_mon, t->deletecmd); + lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_rb=%p, updatecmd_mon=%p, deletecmd=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_rb, t->updatecmd_mon, t->deletecmd); quit = RTM_QUIT_PRESERVE; } } @@ -1800,7 +1805,7 @@ void *notify_thread(void *thread_data) { } if (edg_wll_NotifNew(ctx, (edg_wll_QueryRec const * const *) conditions, flags, sock, config.local_address, ¬if->id, ¬if->valid)) { memset(condition,0,sizeof condition); - lprintf_ctx(t, ERR, ctx, "can't create notification on %s:%d", notif->server, notif->port); + lprintf_ctx(t, ERR, ctx, "can't create %d. notification for %s:%d (%s)", i, notif->server, notif->port, rtm_notiftype2str(notif->type)); notif->valid = 0; notif->id = NULL; rtm_update_error_state(t, notif, i, 1); @@ -2028,6 +2033,7 @@ exit: if (t->insertcmd) glite_lbu_FreeStmt(&t->insertcmd); if (t->updatecmd) glite_lbu_FreeStmt(&t->updatecmd); if (t->updatecmd_vo) glite_lbu_FreeStmt(&t->updatecmd_vo); + if (t->updatecmd_rb) glite_lbu_FreeStmt(&t->updatecmd_rb); if (t->updatecmd_mon) glite_lbu_FreeStmt(&t->updatecmd_mon); if (t->deletecmd) glite_lbu_FreeStmt(&t->deletecmd); db_free(t, t->dbctx); -- 1.8.2.3