JP IS connections tuning:
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 18 Apr 2007 22:55:52 +0000 (22:55 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Wed, 18 Apr 2007 22:55:52 +0000 (22:55 +0000)
- refactor code to feed_caller() function
- recall FeedIndex() several times immediately before giving up
Switched feed SELECT to non-parametric SQL command (comparing of time as
parameter seems not working).
Fixes in DB layers.

org.glite.jp.index/src/bones_server.c
org.glite.jp.index/src/db_ops.c
org.glite.jp.server-common/src/db.c
org.glite.lb-utils.db/src/db.c

index fa483c1..55b487a 100644 (file)
@@ -36,6 +36,7 @@
                                        // # PS to conntact is << MAX_SLAVES_NUM
 
 #define RECONNECT_TIME         60*20   // when try reconnect to PS in case of error (in sec)
+#define RECONNECT_TIME_QUICK   1       // time between feed requests
 
 
 extern SOAP_NMAC struct Namespace jp__namespaces[],jpps__namespaces[];
@@ -163,7 +164,7 @@ int main(int argc, char *argv[])
                // add some slaves for user queries and PS responses
                conf->slaves = nfeeds + (USER_QUERY_SLAVES_NUM - 1);  
                if (conf->slaves > MAX_SLAVES_NUM) conf->slaves = MAX_SLAVES_NUM;
-       }
+       }
        //
        // SUM(PS, feeds(PS) - slaves(PS)) slaves would be blocked
        // when waited for all PS
@@ -176,7 +177,6 @@ int main(int argc, char *argv[])
                fprintf(stderr, "WARNING: %d slaves can be too low for %d feeds\n", conf->slaves, nfeeds);
        }
        glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, conf->slaves);
        glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug);
 
        glite_jpis_free_db(isctx);
@@ -187,12 +187,50 @@ int main(int argc, char *argv[])
        return 0;
 }
 
+
+/* looking for some feed in DB */
+int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) {
+       char *PS_URL;
+       long int uniqueid;
+       int i;
+
+       // dirty hack - try quicker several times first
+       glite_jp_clear_error(isctx->jpctx);
+       switch (glite_jpis_lockUninitializedFeed(isctx,&uniqueid,&PS_URL)) {
+               case 0: 
+                       for (i = 0; i < 10; i++) {
+                               // contact PS server, ask for data, save
+                               // feedId and expiration to DB and unlock feed
+                               if (MyFeedIndex(isctx, conf, uniqueid, PS_URL) != 0) {
+                                       // error when connecting to PS
+                                       printf("[%d] %s: %s (%s), reconnecting later\n", getpid(), __FUNCTION__, isctx->jpctx->error->desc, isctx->jpctx->error->source);
+                                       if (i + 1 < 10) glite_jpis_unlockFeed(isctx, uniqueid);
+                                       else glite_jpis_tryReconnectFeed(isctx, uniqueid, time(NULL) + RECONNECT_TIME);
+                               } else {
+                                       free(PS_URL);
+                                       break;
+                               }
+                       }
+                       sleep(RECONNECT_TIME_QUICK);
+
+                       return 1;
+               case ENOENT:
+                       // no more feeds to initialize
+                       return 0;
+               default:
+                       // error during locking
+                       printf("[%d] %s: Locking error: ", getpid(), __FUNCTION__);
+                       if (isctx->jpctx->error) printf("%s (%d)\n", isctx->jpctx->error->desc, isctx->jpctx->error->code);
+                       else printf("\n");
+                       return -1;
+       }
+}
+
+
 /* slave's init comes here */  
 int data_init(void **data)
 {
        slave_data_t    *private;
-       long int        uniqueid;
-       char            *PS_URL = NULL;
 
        private = calloc(sizeof(*private), 1);
        glite_jpis_init_context(&private->ctx, ctx, conf);
@@ -201,32 +239,21 @@ int data_init(void **data)
                return -1;
        }
 
-       private->soap = soap_new();
        printf("[%d] slave started\n",getpid());
+       private->soap = soap_new();
 
        /* ask PS server for data */
        do {
-               switch (glite_jpis_lockUninitializedFeed(private->ctx,&uniqueid,&PS_URL)) {
-                       case 0:
-                               // contact PS server, ask for data, save feedId and expiration
-                               // to DB and unlock feed
-                               if (MyFeedIndex(private->ctx, conf, uniqueid, PS_URL) != 0) {
-                                       printf("[%d] slave_init(): %s (%s), reconnecting later\n", getpid(), ctx->error->desc, ctx->error->source);
-                                       // error when connecting to PS
-                                       glite_jpis_tryReconnectFeed(private->ctx, uniqueid,
-                                               time(NULL) + RECONNECT_TIME);
-                               }
-                               free(PS_URL);
-                               PS_URL = NULL;
+               switch (feed_caller(private->ctx, conf)) {
+                       case 1:
+                               // one feed handled
                                break;
-                       case ENOENT:
+                       case 0:
                                // no more feeds to initialize
                                *data = (void *) private;
                                return 0;
                        default:
                                // error during locking
-                               printf("[%d] slave_init(): Locking error.\n",getpid());
-                               free(PS_URL);
                                glite_jpis_free_db(private->ctx);
                                glite_jpis_free_context(private->ctx);
                                return -1;
@@ -234,6 +261,7 @@ int data_init(void **data)
        } while (1);
 }
 
+
 int newconn(int conn,struct timeval *to,void *data)
 {
        slave_data_t     *private = (slave_data_t *)data;
index e6bc5b8..48a6bfb 100644 (file)
@@ -428,13 +428,6 @@ int glite_jpis_init_db(glite_jpis_context_t isctx) {
        if ((cs = isctx->conf->cs) == NULL) cs = GLITE_JP_IS_DEFAULTCS;
        if ((ret = glite_jp_db_connect(jpctx, cs)) != 0) goto fail;
 
-       // sql command: select an uninitialized unlocked feed
-       glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_DATETIME, &isctx->param_expires);
-       glite_jp_db_create_results(&myres, 2,
-               GLITE_JP_DB_TYPE_INT, NULL, &(isctx->param_uniqueid),
-               GLITE_JP_DB_TYPE_VARCHAR, NULL, isctx->param_ps, sizeof(isctx->param_ps), &isctx->param_ps_len);
-       if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= ?))", &isctx->select_unlocked_feed_stmt, myparam, myres)) != 0) goto fail;
-
        // sql command: lock the feed (via uniqueid)
        glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_INT, &isctx->param_uniqueid);
        if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=1 WHERE (locked = 0) AND (uniqueid = ?)", &isctx->lock_feed_stmt, myparam, NULL)) != 0) goto fail;
@@ -509,7 +502,6 @@ fail:
 
 
 void glite_jpis_free_db(glite_jpis_context_t ctx) {
-       glite_jp_db_freestmt(&ctx->select_unlocked_feed_stmt);
        glite_jp_db_freestmt(&ctx->lock_feed_stmt);
        glite_jp_db_freestmt(&ctx->init_feed_stmt);
        glite_jp_db_freestmt(&ctx->unlock_feed_stmt);
@@ -535,21 +527,46 @@ void glite_jpis_free_db(glite_jpis_context_t ctx) {
 int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniqueid, char **PS_URL)
 {
        int ret;
-       time_t now;
+       static int uninit_msg = 1;
+       char *sql, *res[1], *t;
+       glite_jp_db_stmt_t stmt;
 
-       now = time(NULL);
-       glite_jp_db_set_time(ctx->param_expires, now);
        do {
-               switch (glite_jp_db_execute(ctx->select_unlocked_feed_stmt)) {
-               case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK;
-               case 0: lprintf("no more uninit. feeds unlocked\n"); return ENOENT;
+               t = glite_jp_db_timetodb(time(NULL));
+               trio_asprintf(&sql, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= %s))", t);
+               free(t);
+               ret = glite_jp_db_execstmt(ctx->jpctx, sql, &stmt);
+               free(sql);
+               switch (ret) {
+               case -1:
+                       lprintf("error selecting unlocked feed\n");
+                       uninit_msg = 1;
+                       glite_jp_db_freestmt(&stmt);
+                       return ENOLCK;
+               case 0:
+                       if (uninit_msg) {
+                               lprintf("no more uninit. feeds unlocked\n");
+                               uninit_msg = 0;
+                       }
+                       glite_jp_db_freestmt(&stmt);
+                       return ENOENT;
                default: break;
                }
-               if (glite_jp_db_fetch(ctx->select_unlocked_feed_stmt) != 0) return ENOLCK;
-               lprintf("selected uninit. feed %ld\n", ctx->param_uniqueid);
+               uninit_msg = 1;
+               if (glite_jp_db_fetchrow(stmt, res) <= 0) {
+                       lprintf("error fetching unlocked feed\n");
+                       glite_jp_db_freestmt(&stmt);
+                       return ENOLCK;
+               }
+               glite_jp_db_freestmt(&stmt);
+               ctx->param_uniqueid = atol(res[0]);
+               strncpy(ctx->param_ps, res[1], sizeof ctx->param_ps);
+               lprintf("selected uninit. feed, uniqueid='%s'\n", res[0]);
+               free(res[0]);
+               free(res[1]);
 
                ret = glite_jp_db_execute(ctx->lock_feed_stmt);
-               lprintf("locked %d feeds (uniqueid=%ld, time=%ld)\n", ret, ctx->param_uniqueid, now);
+               lprintf("locked %d feeds (uniqueid=%ld)\n", ret, ctx->param_uniqueid);
        } while (ret != 1);
 
        *uniqueid = ctx->param_uniqueid;
index 380daa3..5f1b9f4 100644 (file)
@@ -525,6 +525,7 @@ void glite_jp_db_set_time(void *buffer, const time_t time) {
 
        mybuffer = (MYSQL_TIME *)buffer;
        gmtime_r(&time, &tm);
+       memset(mybuffer, 0, sizeof *mybuffer);
        mybuffer->year = tm.tm_year + 1900;
        mybuffer->month = tm.tm_mon + 1;
        mybuffer->day = tm.tm_mday;
@@ -540,6 +541,7 @@ time_t glite_jp_db_get_time(const void *buffer) {
 
        mybuffer = (MYSQL_TIME *)buffer;
        memset(&tm, 0, sizeof(tm));
+       setenv("TZ","UTC",1); tzset();
        tm.tm_year = mybuffer->year - 1900;
        tm.tm_mon = mybuffer->month - 1;
        tm.tm_mday = mybuffer->day;
index a8adc57..ba54dfb 100644 (file)
@@ -1012,6 +1012,7 @@ void set_time(MYSQL_TIME *mtime, const time_t time) {
        struct tm tm;
 
        gmtime_r(&time, &tm);
+       memset(mtime, 0, sizeof *mtime);
        mtime->year = tm.tm_year + 1900;
        mtime->month = tm.tm_mon + 1;
        mtime->day = tm.tm_mday;