// # PS to conntact is << MAX_SLAVES_NUM
#define RECONNECT_TIME 60*20 // when try reconnect to PS in case of error (in sec)
+#define RECONNECT_TIME_QUICK 1 // time between feed requests
extern SOAP_NMAC struct Namespace jp__namespaces[],jpps__namespaces[];
// add some slaves for user queries and PS responses
conf->slaves = nfeeds + (USER_QUERY_SLAVES_NUM - 1);
if (conf->slaves > MAX_SLAVES_NUM) conf->slaves = MAX_SLAVES_NUM;
- }
+ }
//
// SUM(PS, feeds(PS) - slaves(PS)) slaves would be blocked
// when waited for all PS
fprintf(stderr, "WARNING: %d slaves can be too low for %d feeds\n", conf->slaves, nfeeds);
}
glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, conf->slaves);
-
glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug);
glite_jpis_free_db(isctx);
return 0;
}
+
+/* looking for some feed in DB */
+int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) {
+ char *PS_URL;
+ long int uniqueid;
+ int i;
+
+ // dirty hack - try quicker several times first
+ glite_jp_clear_error(isctx->jpctx);
+ switch (glite_jpis_lockUninitializedFeed(isctx,&uniqueid,&PS_URL)) {
+ case 0:
+ for (i = 0; i < 10; i++) {
+ // contact PS server, ask for data, save
+ // feedId and expiration to DB and unlock feed
+ if (MyFeedIndex(isctx, conf, uniqueid, PS_URL) != 0) {
+ // error when connecting to PS
+ printf("[%d] %s: %s (%s), reconnecting later\n", getpid(), __FUNCTION__, isctx->jpctx->error->desc, isctx->jpctx->error->source);
+ if (i + 1 < 10) glite_jpis_unlockFeed(isctx, uniqueid);
+ else glite_jpis_tryReconnectFeed(isctx, uniqueid, time(NULL) + RECONNECT_TIME);
+ } else {
+ free(PS_URL);
+ break;
+ }
+ }
+ sleep(RECONNECT_TIME_QUICK);
+
+ return 1;
+ case ENOENT:
+ // no more feeds to initialize
+ return 0;
+ default:
+ // error during locking
+ printf("[%d] %s: Locking error: ", getpid(), __FUNCTION__);
+ if (isctx->jpctx->error) printf("%s (%d)\n", isctx->jpctx->error->desc, isctx->jpctx->error->code);
+ else printf("\n");
+ return -1;
+ }
+}
+
+
/* slave's init comes here */
int data_init(void **data)
{
slave_data_t *private;
- long int uniqueid;
- char *PS_URL = NULL;
private = calloc(sizeof(*private), 1);
glite_jpis_init_context(&private->ctx, ctx, conf);
return -1;
}
- private->soap = soap_new();
printf("[%d] slave started\n",getpid());
+ private->soap = soap_new();
/* ask PS server for data */
do {
- switch (glite_jpis_lockUninitializedFeed(private->ctx,&uniqueid,&PS_URL)) {
- case 0:
- // contact PS server, ask for data, save feedId and expiration
- // to DB and unlock feed
- if (MyFeedIndex(private->ctx, conf, uniqueid, PS_URL) != 0) {
- printf("[%d] slave_init(): %s (%s), reconnecting later\n", getpid(), ctx->error->desc, ctx->error->source);
- // error when connecting to PS
- glite_jpis_tryReconnectFeed(private->ctx, uniqueid,
- time(NULL) + RECONNECT_TIME);
- }
- free(PS_URL);
- PS_URL = NULL;
+ switch (feed_caller(private->ctx, conf)) {
+ case 1:
+ // one feed handled
break;
- case ENOENT:
+ case 0:
// no more feeds to initialize
*data = (void *) private;
return 0;
default:
// error during locking
- printf("[%d] slave_init(): Locking error.\n",getpid());
- free(PS_URL);
glite_jpis_free_db(private->ctx);
glite_jpis_free_context(private->ctx);
return -1;
} while (1);
}
+
int newconn(int conn,struct timeval *to,void *data)
{
slave_data_t *private = (slave_data_t *)data;
if ((cs = isctx->conf->cs) == NULL) cs = GLITE_JP_IS_DEFAULTCS;
if ((ret = glite_jp_db_connect(jpctx, cs)) != 0) goto fail;
- // sql command: select an uninitialized unlocked feed
- glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_DATETIME, &isctx->param_expires);
- glite_jp_db_create_results(&myres, 2,
- GLITE_JP_DB_TYPE_INT, NULL, &(isctx->param_uniqueid),
- GLITE_JP_DB_TYPE_VARCHAR, NULL, isctx->param_ps, sizeof(isctx->param_ps), &isctx->param_ps_len);
- if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= ?))", &isctx->select_unlocked_feed_stmt, myparam, myres)) != 0) goto fail;
-
// sql command: lock the feed (via uniqueid)
glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_INT, &isctx->param_uniqueid);
if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=1 WHERE (locked = 0) AND (uniqueid = ?)", &isctx->lock_feed_stmt, myparam, NULL)) != 0) goto fail;
void glite_jpis_free_db(glite_jpis_context_t ctx) {
- glite_jp_db_freestmt(&ctx->select_unlocked_feed_stmt);
glite_jp_db_freestmt(&ctx->lock_feed_stmt);
glite_jp_db_freestmt(&ctx->init_feed_stmt);
glite_jp_db_freestmt(&ctx->unlock_feed_stmt);
int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniqueid, char **PS_URL)
{
int ret;
- time_t now;
+ static int uninit_msg = 1;
+ char *sql, *res[1], *t;
+ glite_jp_db_stmt_t stmt;
- now = time(NULL);
- glite_jp_db_set_time(ctx->param_expires, now);
do {
- switch (glite_jp_db_execute(ctx->select_unlocked_feed_stmt)) {
- case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK;
- case 0: lprintf("no more uninit. feeds unlocked\n"); return ENOENT;
+ t = glite_jp_db_timetodb(time(NULL));
+ trio_asprintf(&sql, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= %s))", t);
+ free(t);
+ ret = glite_jp_db_execstmt(ctx->jpctx, sql, &stmt);
+ free(sql);
+ switch (ret) {
+ case -1:
+ lprintf("error selecting unlocked feed\n");
+ uninit_msg = 1;
+ glite_jp_db_freestmt(&stmt);
+ return ENOLCK;
+ case 0:
+ if (uninit_msg) {
+ lprintf("no more uninit. feeds unlocked\n");
+ uninit_msg = 0;
+ }
+ glite_jp_db_freestmt(&stmt);
+ return ENOENT;
default: break;
}
- if (glite_jp_db_fetch(ctx->select_unlocked_feed_stmt) != 0) return ENOLCK;
- lprintf("selected uninit. feed %ld\n", ctx->param_uniqueid);
+ uninit_msg = 1;
+ if (glite_jp_db_fetchrow(stmt, res) <= 0) {
+ lprintf("error fetching unlocked feed\n");
+ glite_jp_db_freestmt(&stmt);
+ return ENOLCK;
+ }
+ glite_jp_db_freestmt(&stmt);
+ ctx->param_uniqueid = atol(res[0]);
+ strncpy(ctx->param_ps, res[1], sizeof ctx->param_ps);
+ lprintf("selected uninit. feed, uniqueid='%s'\n", res[0]);
+ free(res[0]);
+ free(res[1]);
ret = glite_jp_db_execute(ctx->lock_feed_stmt);
- lprintf("locked %d feeds (uniqueid=%ld, time=%ld)\n", ret, ctx->param_uniqueid, now);
+ lprintf("locked %d feeds (uniqueid=%ld)\n", ret, ctx->param_uniqueid);
} while (ret != 1);
*uniqueid = ctx->param_uniqueid;
mybuffer = (MYSQL_TIME *)buffer;
gmtime_r(&time, &tm);
+ memset(mybuffer, 0, sizeof *mybuffer);
mybuffer->year = tm.tm_year + 1900;
mybuffer->month = tm.tm_mon + 1;
mybuffer->day = tm.tm_mday;
mybuffer = (MYSQL_TIME *)buffer;
memset(&tm, 0, sizeof(tm));
+ setenv("TZ","UTC",1); tzset();
tm.tm_year = mybuffer->year - 1900;
tm.tm_mon = mybuffer->month - 1;
tm.tm_mday = mybuffer->day;
struct tm tm;
gmtime_r(&time, &tm);
+ memset(mtime, 0, sizeof *mtime);
mtime->year = tm.tm_year + 1900;
mtime->month = tm.tm_mon + 1;
mtime->day = tm.tm_mday;