Dependency on DB module optional.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 12 Nov 2009 14:53:34 +0000 (14:53 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 12 Nov 2009 14:53:34 +0000 (14:53 +0000)
org.glite.lb.harvester/Makefile
org.glite.lb.harvester/examples/test.sh
org.glite.lb.harvester/src/harvester.c

index 5431f1a..6045dbf 100644 (file)
@@ -25,9 +25,13 @@ LDFLAGS:=${LDFLAGS}
 LIBS:=-L${stagedir}/${archlib} -L${stagedir}/lib \
        -lglite_lb_common_${thrflavour} \
        -lglite_lb_client_${thrflavour} \
-       -lglite_lbu_db \
        -lpthread
 
+ifneq ($(GLITE_LB_HARVESTER_WITH_LBU_DB),no)
+CPPFLAGS:=$(CPPFLAGS) -DWITH_LBU_DB=1
+LIBS:=$(LIBS) -lglite_lbu_db
+endif
+
 COMPILE:=libtool --mode=compile ${CC} ${CPPFLAGS} ${CFLAGS}
 LINK:=libtool --mode=link ${CC} ${LDFLAGS}
 INSTALL:=libtool --mode=install install
@@ -61,7 +65,7 @@ harvester-dbg: harvester-dbg.o
        ${LINK} -o $@ $+ ${LDFLAGS} ${LIBS}
 
 harvester-dbg.o: harvester.c
-       ${COMPILE} -Werror -DLOG=1 -DRTM_SQL_STORAGE_ENABLED=1 -c $< -o $@
+       ${COMPILE} -Werror -DLOG=1 -DWITH_RTM_SQL_STORAGE=1 -c $< -o $@
 
 %.o: %.c
        ${COMPILE} -c $<
index 5158688..b6d5753 100755 (executable)
@@ -100,10 +100,13 @@ init() {
 
        jobreg="$GLITE_LOCATION/examples/glite-lb-job_reg -m `hostname -f`:${GLITE_LB_TEST_SERVER_PORT} -s UserInterface"
        logev="$GLITE_LOCATION/bin/glite-lb-logevent -x -S `pwd`/LB/proxy.sockstore.sock -U localhost"
-       for dir in "`pwd`" "`pwd`/../build" "$GLITE_LOCATION/bin"; do
+       for dir in "$GLITE_LOCATION/bin" "`pwd`/../build" "`pwd`"; do
                if [ -x "$dir/glite-lb-harvester-dbg" ]; then
                        rtm="$dir/glite-lb-harvester-dbg"
                fi
+               if [ -x "$dir/harvester-dbg" ]; then
+                       rtm="$dir/harvester-dbg"
+               fi
        done
        if [ -z "$rtm" ]; then
                echo "glite-lb-harvester-dbg not found"
index 8473e45..e7ee7ad 100644 (file)
 #include <fcntl.h>
 #include <time.h>
 #include <glite/security/glite_gss.h>
+#ifdef WITH_LBU_DB
 #include <glite/lbu/trio.h>
 #include <glite/lbu/db.h>
+#endif
 #include <glite/lb/context.h>
 #include <glite/lb/connpool.h>
 #include <glite/lb/notification.h>
@@ -128,11 +130,13 @@ typedef struct {
        int nservers;
        time_t first_refresh;
        char time_s[100];
+       char *dash_filename;
+       int dash_fd;
+#ifdef WITH_LBU_DB
        glite_lbu_DBContext dbctx;
        glite_lbu_Statement insertcmd, updatecmd, updatecmd_vo;
        int dbcaps;
-       char *dash_filename;
-       int dash_fd;
+#endif
 } thread_t;
 
 typedef struct {
@@ -166,7 +170,9 @@ typedef struct {
        pthread_mutex_t lock;
        double last_check;
        int was_summary; // flag for debugging
+#ifdef WITH_LBU_DB
        glite_lbu_DBContext dbctx;
+#endif
 } db_t;
 
 
@@ -229,7 +235,9 @@ db_t db = {
        n: 0,
        maxn: 0,
        lock: PTHREAD_MUTEX_INITIALIZER,
+#ifdef WITH_LBU_DB
        dbctx: NULL
+#endif
 };
 thread_t *threads = NULL;
 volatile sig_atomic_t quit = RTM_QUIT_RUN;
@@ -293,6 +301,7 @@ void lprintf_ctx_func(thread_t *t, edg_wll_Context ctx, int level, const char *f
 }
 
 
+#ifdef WITH_LBU_DB
 void lprintf_dbctx_func(thread_t *t, int level, const char *fmt, ...) {
        va_list ap;
        char *errText = NULL, *errDesc = NULL, *s = NULL;
@@ -309,6 +318,39 @@ void lprintf_dbctx_func(thread_t *t, int level, const char *fmt, ...) {
        free(s);
        va_end(ap);
 }
+#endif
+
+#ifndef WITH_LBU_DB
+time_t glite_lbu_StrToTime(const char *str) {
+       struct tm       tm;
+
+       memset(&tm,0,sizeof(tm));
+       putenv("TZ=UTC"); tzset();
+       sscanf(str,"%4d-%02d-%02d %02d:%02d:%02d",
+               &tm.tm_year,&tm.tm_mon,&tm.tm_mday,
+               &tm.tm_hour,&tm.tm_min,&tm.tm_sec);
+       tm.tm_year -= 1900;
+       tm.tm_mon--;
+
+       return mktime(&tm);
+}
+
+double glite_lbu_StrToTimestamp(const char *str) {
+       struct tm       tm;
+       double  sec;
+
+       memset(&tm,0,sizeof(tm));
+       putenv("TZ=UTC"); tzset();
+       sscanf(str,"%4d-%02d-%02d %02d:%02d:%lf",
+               &tm.tm_year,&tm.tm_mon,&tm.tm_mday,
+               &tm.tm_hour,&tm.tm_min,&sec);
+       tm.tm_year -= 1900;
+       tm.tm_mon--;
+       tm.tm_sec = sec;
+
+       return (sec - tm.tm_sec) + mktime(&tm);
+}
+#endif
 
 
 // hacky time->string conversion
@@ -518,6 +560,7 @@ int notif_copy(notif_t *dest, notif_t *src) {
 }
 
 
+#ifdef WITH_LBU_DB
 static int db_init(thread_t *t, glite_lbu_DBContext *dbctx) {
        int err, dbcaps;
 
@@ -559,6 +602,7 @@ static void db_free(__attribute((unused))thread_t *t, glite_lbu_DBContext dbctx)
                glite_lbu_FreeDBContext(dbctx);
        }
 }
+#endif
 
 
 static notif_t *db_add_notif(char *notifid, int type, time_t valid, time_t refresh, double last_update, char *server, int port, int active) {
@@ -633,7 +677,7 @@ quit:
 }
 
 
-#ifdef RTM_SQL_STORAGE_ENABLED
+#if defined(WITH_RTM_SQL_STORAGE) && defined(WITH_LBU_DB)
 static int db_save_notifs_sql(thread_t *t) {
        int retval = 1;
        notif_t *notif;
@@ -723,7 +767,7 @@ static int db_save_notifs(thread_t *t) {
        }
 #endif
 
-#if RTM_SQL_STORAGE_ENABLED
+#if defined(WITH_RTM_SQL_STORAGE) && defined(WITH_LBU_DB)
        if (!db.dbctx) return db_save_notifs_file(t);
        else return db_save_notifs_sql(t);
 #else
@@ -753,11 +797,7 @@ static notif_t *db_search_notif_by_server(notif_t *notifs, int n, const char *se
 
 static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __attribute((unused))int index, edg_wll_JobStat *stat) {
        char *jobid_str = NULL, *state_str = NULL, *sql = NULL, *sql2 = NULL, *state_entered_str = NULL, *rtm_timestamp_str = NULL, *lbhost = NULL, *unique_str = NULL, *regtime_str = NULL,*vo = NULL;
-       const char *rb, *ui, *state, *active, *state_changed, *lb;
-       char *ce, *queue, *colon, *sql_part = NULL;
        unsigned int port;
-       time_t registered;
-       double state_entered, rtm_timestamp;
 
        jobid_str = stat->jobId ? glite_jobid_unparse(stat->jobId) : strdup("unknown");
        glite_jobid_getServerParts(stat->jobId, &lbhost, &port);
@@ -766,7 +806,13 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a
        vo = edg_wll_JDLField(stat,"VirtualOrganisation");
        printf(RTM_TTY_GREEN "notifid: %s (%s), jobid: %s, state: %s, vo: %s, last time: %lf" RTM_TTY_RST "\n", notif->id_str, rtm_notiftype2str(notif->type), jobid_str, state_str, vo, notif->last_update);
 
+#ifdef WITH_LBU_DB
        if (config.dbcs && t->dbctx) {
+               double state_entered, rtm_timestamp;
+               char *ce, *queue, *colon, *sql_part;
+               const char *rb, *ui, *state, *active, *state_changed, *lb;
+               time_t registered;
+
                ce = stat->destination ? : "unknown";
                queue = strchr(ce, '/');
                if (queue) *queue++='\0';
@@ -792,6 +838,7 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a
                if (vo) trio_asprintf(&sql_part, ", vo='%|Ss' ", vo);
                else sql_part = strdup("");
                trio_asprintf(&sql, "UPDATE " RTM_DB_TABLE_JOBS " SET ce='%|Ss', queue='%|Ss', rb='%|Ss', ui='%|Ss', state='%|Ss', state_entered=%s, rtm_timestamp=%s, active=%s, state_changed=%s, registered=%s%sWHERE jobid='%|Ss' AND lb='%|Ss'", ce, queue, rb, ui, state, state_entered_str, rtm_timestamp_str, active, state_changed, regtime_str, sql_part, unique_str, lb);
+               free(sql_part);
                lprintf(t, INS, "update: %s", sql);
                switch (glite_lbu_ExecSQL(t->dbctx, sql, NULL)) {
                case -1:
@@ -879,6 +926,7 @@ static int db_store_change(__attribute((unused))thread_t *t, notif_t *notif, __a
                } // prepare commands
 
        }
+#endif
 
        // store message
        if (config.wlcg) {
@@ -1198,7 +1246,7 @@ quit:
 }
 
 
-#ifdef RTM_SQL_STORAGE_ENABLED
+#if defined(WITH_RTM_SQL_STORAGE) && defined(WITH_LBU_DB)
 int load_notifs_sql() {
        notif_t *new_notif;
        int err;
@@ -1274,7 +1322,7 @@ int load_notifs() {
 
        pthread_mutex_lock(&db.lock);
 
-#ifdef RTM_SQL_STORAGE_ENABLED
+#if defined(WITH_RTM_SQL_STORAGE) && defined(WITH_LBU_DB)
        if (!db.dbctx) ret = load_notifs_file();
        else ret = load_notifs_sql();
 #else
@@ -1347,6 +1395,7 @@ void *notify_thread(void *thread_data) {
                goto exit;
        }
 
+#ifdef WITH_LBU_DB
        if (db_init(t, &t->dbctx) == 0)
                if ((t->dbcaps & GLITE_LBU_DB_CAP_PREPARED) != 0) {
                        if (glite_lbu_PrepareStmt(t->dbctx, "INSERT INTO " DBAMP RTM_DB_TABLE_JOBS DBAMP " "
@@ -1363,6 +1412,7 @@ void *notify_thread(void *thread_data) {
                                quit = RTM_QUIT_PRESERVE;
                        }
                }
+#endif
 
        //
        // notifications loop:
@@ -1685,11 +1735,13 @@ exit:
                }
                rtm_update_notif(t, NULL, 1);
        }
+#ifdef WITH_LBU_DB
        if (t->insertcmd) glite_lbu_FreeStmt(&t->insertcmd);
        if (t->updatecmd) glite_lbu_FreeStmt(&t->updatecmd);
        if (t->updatecmd_vo) glite_lbu_FreeStmt(&t->updatecmd_vo);
-       if (ctx) edg_wll_FreeContext(ctx);
        db_free(t, t->dbctx);
+#endif
+       if (ctx) edg_wll_FreeContext(ctx);
        lprintf(t, DBG, "thread ended");
        pthread_exit(NULL);
        return NULL;
@@ -1766,7 +1818,9 @@ void usage(const char *prog) {
                "       -c, --config       config file name (list of LB servers), precedence before " RTM_DB_TABLE_LBS " table\n"
                "       -n, --notifs       file for persistent information about active\n"
                "                          notifications\n"
+#ifdef WITH_LBU_DB
                "       -m, --pg           db connection string (user/pwd@server:dbname)\n"
+#endif
                "       -C, --cert         X509 certificate file\n"
                "       -K, --key          X509 key file\n"
                "       -o, --old          \"silly\" mode for old L&B 3.1 servers\n"
@@ -1918,10 +1972,12 @@ int config_load() {
        FILE *f;
        void *tmp;
        int i, n;
+#ifdef WITH_LBU_DB
        char *results[2];
-       int err = 0;
        char *result = NULL;
        glite_lbu_Statement stmt = NULL;
+       int err = 0;
+#endif
 
        if (config.config_file) {
                if ((f = fopen(config.config_file, "rt")) == NULL) {
@@ -1948,7 +2004,9 @@ int config_load() {
                }
 
                fclose(f);
-       } else if (db.dbctx) {
+       } else
+#ifdef WITH_LBU_DB
+       if (db.dbctx) {
                if ((err = glite_lbu_ExecSQL(db.dbctx, "SELECT COUNT(*) FROM " RTM_DB_TABLE_LBS, &stmt)) < 0 ||
                    (err = glite_lbu_FetchRow(stmt, 1, NULL, &result)) < 0) {
                        goto err;
@@ -1975,6 +2033,7 @@ int config_load() {
                if (err < 0) goto err;
                glite_lbu_FreeStmt(&stmt);
        }
+#endif
 
        if (INF <= config.debug) {
                lprintf(NULL, INF, "servers: %d", config.nservers);
@@ -1982,10 +2041,12 @@ int config_load() {
        }
 
        return 0;
+#ifdef WITH_LBU_DB
 err:
        if (err) lprintf_dbctx(NULL, ERR, "can't get LB servers");
        if (stmt) glite_lbu_FreeStmt(&stmt);
        if (result) free(result);
+#endif
        return 1;
 }
 
@@ -2177,6 +2238,7 @@ int main(int argn, char *argv[]) {
                goto quit_guard;
        }
 
+#ifdef WITH_LBU_DB
        // database
        switch(db_init(NULL, &db.dbctx)) {
        case 0:
@@ -2188,6 +2250,7 @@ int main(int argn, char *argv[]) {
                // error
                goto quit;
        }
+#endif
 
        // load configurations
        if (config_load()) goto quit;
@@ -2292,7 +2355,9 @@ quit:
                if (remove(config.pidfile) == -1) lprintf(NULL, WRN, "can't remove pidfile '%s': %s", config.pidfile, strerror(errno));
        }
 
+#ifdef WITH_LBU_DB
        db_free(NULL, db.dbctx);
+#endif
        edg_wll_FreeContext(ctx);
        db_free_notifs();
        config_free();