From 6dea4fa17b0b4a9ae0a5ed0d3e859c6e0793ea41 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Wed, 18 Apr 2007 22:55:52 +0000 Subject: [PATCH] JP IS connections tuning: - refactor code to feed_caller() function - recall FeedIndex() several times immediately before giving up Switched feed SELECT to non-parametric SQL command (comparing of time as parameter seems not working). Fixes in DB layers. --- org.glite.jp.index/src/bones_server.c | 68 ++++++++++++++++++++++++----------- org.glite.jp.index/src/db_ops.c | 51 +++++++++++++++++--------- org.glite.jp.server-common/src/db.c | 2 ++ org.glite.lb-utils.db/src/db.c | 1 + 4 files changed, 85 insertions(+), 37 deletions(-) diff --git a/org.glite.jp.index/src/bones_server.c b/org.glite.jp.index/src/bones_server.c index fa483c1..55b487a 100644 --- a/org.glite.jp.index/src/bones_server.c +++ b/org.glite.jp.index/src/bones_server.c @@ -36,6 +36,7 @@ // # PS to conntact is << MAX_SLAVES_NUM #define RECONNECT_TIME 60*20 // when try reconnect to PS in case of error (in sec) +#define RECONNECT_TIME_QUICK 1 // time between feed requests extern SOAP_NMAC struct Namespace jp__namespaces[],jpps__namespaces[]; @@ -163,7 +164,7 @@ int main(int argc, char *argv[]) // add some slaves for user queries and PS responses conf->slaves = nfeeds + (USER_QUERY_SLAVES_NUM - 1); if (conf->slaves > MAX_SLAVES_NUM) conf->slaves = MAX_SLAVES_NUM; - } + } // // SUM(PS, feeds(PS) - slaves(PS)) slaves would be blocked // when waited for all PS @@ -176,7 +177,6 @@ int main(int argc, char *argv[]) fprintf(stderr, "WARNING: %d slaves can be too low for %d feeds\n", conf->slaves, nfeeds); } glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, conf->slaves); - glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug); glite_jpis_free_db(isctx); @@ -187,12 +187,50 @@ int main(int argc, char *argv[]) return 0; } + +/* looking for some feed in DB */ +int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) { + char *PS_URL; + long int uniqueid; + int i; + + // dirty hack - try quicker several times first + glite_jp_clear_error(isctx->jpctx); + switch (glite_jpis_lockUninitializedFeed(isctx,&uniqueid,&PS_URL)) { + case 0: + for (i = 0; i < 10; i++) { + // contact PS server, ask for data, save + // feedId and expiration to DB and unlock feed + if (MyFeedIndex(isctx, conf, uniqueid, PS_URL) != 0) { + // error when connecting to PS + printf("[%d] %s: %s (%s), reconnecting later\n", getpid(), __FUNCTION__, isctx->jpctx->error->desc, isctx->jpctx->error->source); + if (i + 1 < 10) glite_jpis_unlockFeed(isctx, uniqueid); + else glite_jpis_tryReconnectFeed(isctx, uniqueid, time(NULL) + RECONNECT_TIME); + } else { + free(PS_URL); + break; + } + } + sleep(RECONNECT_TIME_QUICK); + + return 1; + case ENOENT: + // no more feeds to initialize + return 0; + default: + // error during locking + printf("[%d] %s: Locking error: ", getpid(), __FUNCTION__); + if (isctx->jpctx->error) printf("%s (%d)\n", isctx->jpctx->error->desc, isctx->jpctx->error->code); + else printf("\n"); + return -1; + } +} + + /* slave's init comes here */ int data_init(void **data) { slave_data_t *private; - long int uniqueid; - char *PS_URL = NULL; private = calloc(sizeof(*private), 1); glite_jpis_init_context(&private->ctx, ctx, conf); @@ -201,32 +239,21 @@ int data_init(void **data) return -1; } - private->soap = soap_new(); printf("[%d] slave started\n",getpid()); + private->soap = soap_new(); /* ask PS server for data */ do { - switch (glite_jpis_lockUninitializedFeed(private->ctx,&uniqueid,&PS_URL)) { - case 0: - // contact PS server, ask for data, save feedId and expiration - // to DB and unlock feed - if (MyFeedIndex(private->ctx, conf, uniqueid, PS_URL) != 0) { - printf("[%d] slave_init(): %s (%s), reconnecting later\n", getpid(), ctx->error->desc, ctx->error->source); - // error when connecting to PS - glite_jpis_tryReconnectFeed(private->ctx, uniqueid, - time(NULL) + RECONNECT_TIME); - } - free(PS_URL); - PS_URL = NULL; + switch (feed_caller(private->ctx, conf)) { + case 1: + // one feed handled break; - case ENOENT: + case 0: // no more feeds to initialize *data = (void *) private; return 0; default: // error during locking - printf("[%d] slave_init(): Locking error.\n",getpid()); - free(PS_URL); glite_jpis_free_db(private->ctx); glite_jpis_free_context(private->ctx); return -1; @@ -234,6 +261,7 @@ int data_init(void **data) } while (1); } + int newconn(int conn,struct timeval *to,void *data) { slave_data_t *private = (slave_data_t *)data; diff --git a/org.glite.jp.index/src/db_ops.c b/org.glite.jp.index/src/db_ops.c index e6bc5b8..48a6bfb 100644 --- a/org.glite.jp.index/src/db_ops.c +++ b/org.glite.jp.index/src/db_ops.c @@ -428,13 +428,6 @@ int glite_jpis_init_db(glite_jpis_context_t isctx) { if ((cs = isctx->conf->cs) == NULL) cs = GLITE_JP_IS_DEFAULTCS; if ((ret = glite_jp_db_connect(jpctx, cs)) != 0) goto fail; - // sql command: select an uninitialized unlocked feed - glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_DATETIME, &isctx->param_expires); - glite_jp_db_create_results(&myres, 2, - GLITE_JP_DB_TYPE_INT, NULL, &(isctx->param_uniqueid), - GLITE_JP_DB_TYPE_VARCHAR, NULL, isctx->param_ps, sizeof(isctx->param_ps), &isctx->param_ps_len); - if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= ?))", &isctx->select_unlocked_feed_stmt, myparam, myres)) != 0) goto fail; - // sql command: lock the feed (via uniqueid) glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_INT, &isctx->param_uniqueid); if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=1 WHERE (locked = 0) AND (uniqueid = ?)", &isctx->lock_feed_stmt, myparam, NULL)) != 0) goto fail; @@ -509,7 +502,6 @@ fail: void glite_jpis_free_db(glite_jpis_context_t ctx) { - glite_jp_db_freestmt(&ctx->select_unlocked_feed_stmt); glite_jp_db_freestmt(&ctx->lock_feed_stmt); glite_jp_db_freestmt(&ctx->init_feed_stmt); glite_jp_db_freestmt(&ctx->unlock_feed_stmt); @@ -535,21 +527,46 @@ void glite_jpis_free_db(glite_jpis_context_t ctx) { int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniqueid, char **PS_URL) { int ret; - time_t now; + static int uninit_msg = 1; + char *sql, *res[1], *t; + glite_jp_db_stmt_t stmt; - now = time(NULL); - glite_jp_db_set_time(ctx->param_expires, now); do { - switch (glite_jp_db_execute(ctx->select_unlocked_feed_stmt)) { - case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK; - case 0: lprintf("no more uninit. feeds unlocked\n"); return ENOENT; + t = glite_jp_db_timetodb(time(NULL)); + trio_asprintf(&sql, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= %s))", t); + free(t); + ret = glite_jp_db_execstmt(ctx->jpctx, sql, &stmt); + free(sql); + switch (ret) { + case -1: + lprintf("error selecting unlocked feed\n"); + uninit_msg = 1; + glite_jp_db_freestmt(&stmt); + return ENOLCK; + case 0: + if (uninit_msg) { + lprintf("no more uninit. feeds unlocked\n"); + uninit_msg = 0; + } + glite_jp_db_freestmt(&stmt); + return ENOENT; default: break; } - if (glite_jp_db_fetch(ctx->select_unlocked_feed_stmt) != 0) return ENOLCK; - lprintf("selected uninit. feed %ld\n", ctx->param_uniqueid); + uninit_msg = 1; + if (glite_jp_db_fetchrow(stmt, res) <= 0) { + lprintf("error fetching unlocked feed\n"); + glite_jp_db_freestmt(&stmt); + return ENOLCK; + } + glite_jp_db_freestmt(&stmt); + ctx->param_uniqueid = atol(res[0]); + strncpy(ctx->param_ps, res[1], sizeof ctx->param_ps); + lprintf("selected uninit. feed, uniqueid='%s'\n", res[0]); + free(res[0]); + free(res[1]); ret = glite_jp_db_execute(ctx->lock_feed_stmt); - lprintf("locked %d feeds (uniqueid=%ld, time=%ld)\n", ret, ctx->param_uniqueid, now); + lprintf("locked %d feeds (uniqueid=%ld)\n", ret, ctx->param_uniqueid); } while (ret != 1); *uniqueid = ctx->param_uniqueid; diff --git a/org.glite.jp.server-common/src/db.c b/org.glite.jp.server-common/src/db.c index 380daa3..5f1b9f4 100644 --- a/org.glite.jp.server-common/src/db.c +++ b/org.glite.jp.server-common/src/db.c @@ -525,6 +525,7 @@ void glite_jp_db_set_time(void *buffer, const time_t time) { mybuffer = (MYSQL_TIME *)buffer; gmtime_r(&time, &tm); + memset(mybuffer, 0, sizeof *mybuffer); mybuffer->year = tm.tm_year + 1900; mybuffer->month = tm.tm_mon + 1; mybuffer->day = tm.tm_mday; @@ -540,6 +541,7 @@ time_t glite_jp_db_get_time(const void *buffer) { mybuffer = (MYSQL_TIME *)buffer; memset(&tm, 0, sizeof(tm)); + setenv("TZ","UTC",1); tzset(); tm.tm_year = mybuffer->year - 1900; tm.tm_mon = mybuffer->month - 1; tm.tm_mday = mybuffer->day; diff --git a/org.glite.lb-utils.db/src/db.c b/org.glite.lb-utils.db/src/db.c index a8adc57..ba54dfb 100644 --- a/org.glite.lb-utils.db/src/db.c +++ b/org.glite.lb-utils.db/src/db.c @@ -1012,6 +1012,7 @@ void set_time(MYSQL_TIME *mtime, const time_t time) { struct tm tm; gmtime_r(&time, &tm); + memset(mtime, 0, sizeof *mtime); mtime->year = tm.tm_year + 1900; mtime->month = tm.tm_mon + 1; mtime->day = tm.tm_mday; -- 1.8.2.3