Get Resource Broker from JDL for CREAM jobs (two fields not to reset to 'unknown...
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 7 Apr 2010 16:22:12 +0000 (16:22 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 7 Apr 2010 16:22:12 +0000 (16:22 +0000)
org.glite.lb.harvester/src/harvester.c

index 14901d5..658d021 100644 (file)
@@ -179,7 +179,7 @@ typedef struct {
        int dash_fd;
 #ifdef WITH_LBU_DB
        glite_lbu_DBContext dbctx;
-       glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_mon, deletecmd;
+       glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo, updatecmd_rb, updatecmd_mon, deletecmd;
        int dbcaps;
 #endif
 } thread_t;
@@ -1036,8 +1036,9 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state,
                        glite_lbu_TimeToDB(t->dbctx, rec->registered, &regtime_str);
 
                        if (rec->vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", rec->vo);
-                       else sql_part = strdup("");
-                       trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, rec->unique_str, rec->lb);
+                       if (rec->rb) trio_asprintf(&sql_part, ", rb='%|Ss' ", rec->rb);
+                       if (!sql_part) sql_part = strdup(" ");
+                       trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", rec->ce, rec->queue, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, rec->unique_str, rec->lb);
                        lprintf(t, INS, "update: %s", sql);
                        switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) {
                        case -1:
@@ -1046,7 +1047,7 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state,
                        case 0:
                                trio_asprintf(&sql2, "INSERT INTO " RTM_DB_TABLE_JOBS " "
                                        "(ce, queue, rb, ui, state, state_entered, rtm_timestamp, jobid, lb, active, state_changed, registered, vo) VALUES "
-                                       "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", rec->ce, rec->queue, rec->rb, rec->ui, rec->state, state_entered_str, rtm_timestamp_str, rec->unique_str, rec->lb, active, state_changed, regtime_str, rec->vo ? : "unknown");
+                                       "('%|Ss', '%|Ss', '%|Ss', '%|Ss', '%|Ss', %s, %s, '%|Ss', '%|Ss', %s, %s, %s, '%|Ss')", rec->ce, rec->queue, rec->rb ? : "unknown", rec->ui, rec->state, state_entered_str, rtm_timestamp_str, rec->unique_str, rec->lb, active, state_changed, regtime_str, rec->vo ? : "unknown");
                                lprintf(t, INS, "insert: %s", sql2);
                                if (glite_lbu_ExecSQL(t->dbctx, sql2, NULL) == -1) {
                                        lprintf_dbctx(t, ERR, "can't insert job");
@@ -1059,40 +1060,20 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state,
                } else { // prepared commands
                        int ret;
 
-                       if (rec->vo) {
-                               ret = glite_lbu_ExecPreparedStmt(t->updatecmd_vo, 13,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
-                                       GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
-                                       GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->vo, // VO
-
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
-                               );
-                       } else {
-                               ret = glite_lbu_ExecPreparedStmt(t->updatecmd, 12,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
-                                       GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
-                                       GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
-                                       GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
-
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
-                               );
-                       }
+                       ret = glite_lbu_ExecPreparedStmt(t->updatecmd, 11,
+                               GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
+                               GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
+                               GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
+                               GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
+                               GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
+                               GLITE_LBU_DB_TYPE_TIMESTAMP, rec->rtm_timestamp,
+                               GLITE_LBU_DB_TYPE_BOOLEAN, 1, // active
+                               GLITE_LBU_DB_TYPE_BOOLEAN, 1, // state_changed
+                               GLITE_LBU_DB_TYPE_TIMESTAMP, (double)rec->registered,
+
+                               GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+                               GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
+                       );
 
                        switch (ret) {
                        case -1:
@@ -1102,7 +1083,7 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state,
                                if (glite_lbu_ExecPreparedStmt(t->insertcmd, 13,
                                        GLITE_LBU_DB_TYPE_VARCHAR, rec->ce,
                                        GLITE_LBU_DB_TYPE_VARCHAR, rec->queue,
-                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->rb,
+                                       GLITE_LBU_DB_TYPE_VARCHAR, rec->rb ? : "unknown",
                                        GLITE_LBU_DB_TYPE_VARCHAR, rec->ui,
                                        GLITE_LBU_DB_TYPE_VARCHAR, rec->state,
                                        GLITE_LBU_DB_TYPE_TIMESTAMP, rec->state_entered,
@@ -1119,6 +1100,24 @@ static void db_store_change_perform_sql(thread_t *t, edg_wll_JobStatCode state,
                                }
                                break;
                        default:
+                               if (rec->vo) {
+                                       if (glite_lbu_ExecPreparedStmt(t->updatecmd_vo, 3,
+                                               GLITE_LBU_DB_TYPE_VARCHAR, rec->vo, // VO
+
+                                               GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+                                               GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
+                                       ) == -1)
+                                               lprintf(t, ERR, "can't update VO in "  RTM_DB_TABLE_JOBS " table");
+                               }
+                               if (rec->rb) {
+                                       if (glite_lbu_ExecPreparedStmt(t->updatecmd_rb, 3,
+                                               GLITE_LBU_DB_TYPE_VARCHAR, rec->rb, // RB
+
+                                               GLITE_LBU_DB_TYPE_VARCHAR, rec->unique_str, // jobid
+                                               GLITE_LBU_DB_TYPE_VARCHAR, rec->lb // L&B server
+                                       ) == -1)
+                                               lprintf(t, ERR, "can't update RB in "  RTM_DB_TABLE_JOBS " table");
+                               }
                                break;
                        }
                } // prepare commands
@@ -1149,7 +1148,7 @@ static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int
        if (config.dbcs && t->dbctx) {
                db_job_t rec;
                char *colon;
-               char *unique_str = NULL, *network_server = NULL, *destination = NULL;
+               char *unique_str = NULL, *resource_broker = NULL, *destination = NULL;
 
                memset(&rec, 0, sizeof rec);
                // L&B server
@@ -1169,9 +1168,13 @@ static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int
                if (colon) colon[0] = '\0';
                // Virtual Organization
                rec.vo = vo;
-               // Resource Broker
-               network_server = stat->network_server ? rtm_url2hostname(stat->network_server) : strdup("unknown");
-               rec.rb = network_server;
+               // Resource Broker (CREAM jobs from JDL, gLite jobs hostname from Network Server URL)
+               if (stat->jobtype == EDG_WLL_STAT_CREAM) {
+                       resource_broker = edg_wll_JDLField(stat, "SubmitterService");
+               } else {
+                       resource_broker = stat->network_server ? rtm_url2hostname(stat->network_server) : NULL;
+               }
+               rec.rb = resource_broker;
                // UI
                rec.ui = stat->ui_host ? : "unknown";
                // state
@@ -1187,7 +1190,7 @@ static int db_store_change(thread_t *t, notif_t *notif, __attribute((unused))int
                db_store_change_perform_sql(t, stat->state, &rec);
 
                free(unique_str);
-               free(network_server);
+               free(resource_broker);
                free(destination);
        }
 #endif
@@ -1677,15 +1680,17 @@ void *notify_thread(void *thread_data) {
                            " VALUES "
                            "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
                        &t->insertcmd) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_JOBS DBAMP " "
-                           "SET ce=$1, queue=$2, rb=$3, ui=$4, state=$5, state_entered=$6, rtm_timestamp=$7, active=$8, state_changed=$9, registered=$10 WHERE jobid=$11 AND lb=$12", 
+                           "SET ce=$1, queue=$2, ui=$3, state=$4, state_entered=$5, rtm_timestamp=$6, active=$7, state_changed=$8, registered=$9 WHERE jobid=$10 AND lb=$11", 
                        &t->updatecmd) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_JOBS DBAMP " "
-                           "SET ce=$1, queue=$2, rb=$3, ui=$4, state=$5, state_entered=$6, rtm_timestamp=$7, active=$8, state_changed=$9, registered=$10, vo=$11 WHERE jobid=$12 AND lb=$13", 
+                           "SET vo=$1 WHERE jobid=$2 AND lb=$3", 
                        &t->updatecmd_vo) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_LBS DBAMP " "
+                           "SET rb=$1 WHERE jobid=$2 AND lb=$3",
+                       &t->updatecmd_rb) != 0 || glite_lbu_PrepareStmt(t->dbctx, "UPDATE " DBAMP RTM_DB_TABLE_LBS DBAMP " "
                            "SET monitored=$1 WHERE ip=$2",
                        &t->updatecmd_mon) != 0 || glite_lbu_PrepareStmt(t->dbctx, "DELETE FROM " DBAMP RTM_DB_TABLE_JOBS DBAMP " WHERE jobid=$1 AND lb=$2",
                        &t->deletecmd) != 0) {
                                lprintf_dbctx(t, ERR, "can't create prepare commands");
-                               lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_mon=%p, deletecmd=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_mon, t->deletecmd);
+                               lprintf(t, DBG, "insertcmd=%p, updatecmd=%p, updatecmd_vo=%p, updatecmd_rb=%p, updatecmd_mon=%p, deletecmd=%p", t->insertcmd, t->updatecmd, t->updatecmd_vo, t->updatecmd_rb, t->updatecmd_mon, t->deletecmd);
                                quit = RTM_QUIT_PRESERVE;
                        }
                }
@@ -1800,7 +1805,7 @@ void *notify_thread(void *thread_data) {
                                }
                                if (edg_wll_NotifNew(ctx, (edg_wll_QueryRec const * const *) conditions, flags, sock, config.local_address, &notif->id, &notif->valid)) {
                                        memset(condition,0,sizeof condition);
-                                       lprintf_ctx(t, ERR, ctx, "can't create notification on %s:%d", notif->server, notif->port);
+                                       lprintf_ctx(t, ERR, ctx, "can't create %d. notification for %s:%d (%s)", i, notif->server, notif->port, rtm_notiftype2str(notif->type));
                                        notif->valid = 0;
                                        notif->id = NULL;
                                        rtm_update_error_state(t, notif, i, 1);
@@ -2028,6 +2033,7 @@ exit:
        if (t->insertcmd) glite_lbu_FreeStmt(&t->insertcmd);
        if (t->updatecmd) glite_lbu_FreeStmt(&t->updatecmd);
        if (t->updatecmd_vo) glite_lbu_FreeStmt(&t->updatecmd_vo);
+       if (t->updatecmd_rb) glite_lbu_FreeStmt(&t->updatecmd_rb);
        if (t->updatecmd_mon) glite_lbu_FreeStmt(&t->updatecmd_mon);
        if (t->deletecmd) glite_lbu_FreeStmt(&t->deletecmd);
        db_free(t, t->dbctx);