#include <string.h>
#include <stdlib.h>
#include <stdio.h>
+#include <time.h>
#include <glite/jp/db.h>
asprintf(&feedid, "feed://%lu", uniqueid + 3);
if (glite_jpis_initFeed(isctx, uniqueid, feedid, (time_t)10000) != 0) {
free(feedid);
- goto faildb;
+ goto failconf;
}
free(feedid);
- if (glite_jpis_unlockFeed(isctx, uniqueid) != 0) goto faildb;
+ if (glite_jpis_unlockFeed(isctx, uniqueid) != 0) goto failconf;
}
} while (ret == 0);
+ if (glite_jpis_tryReconnectFeed(isctx, uniqueid, time(NULL) + 10) != 0) goto failconf;
+
glite_jp_free_conf(conf);
glite_jpis_free_context(isctx);
glite_jp_free_context(jpctx);
conf = calloc(1, sizeof(*conf));
- conf->attrs = calloc(5, sizeof(*conf->attrs));
+ conf->attrs = calloc(8, sizeof(*conf->attrs));
conf->attrs[0] = strdup("owner");
conf->attrs[1] = strdup("status");
conf->attrs[2] = strdup("location");
conf->attrs[3] = strdup("jobid");
+ conf->attrs[4] = strdup("user");
+ conf->attrs[5] = strdup("tag");
+ conf->attrs[6] = strdup("host");
- conf->indexed_attrs = calloc(3, sizeof(*conf->indexed_attrs));
+ conf->indexed_attrs = calloc(4, sizeof(*conf->indexed_attrs));
conf->indexed_attrs[0] = strdup("owner");
- conf->indexed_attrs[1] = strdup("location");
+ conf->indexed_attrs[1] = strdup("user");
+ conf->indexed_attrs[2] = strdup("location");
// XXX: some plugin names should come here in future
conf->plugins = NULL;
conf->feeds = calloc(2, sizeof(*(conf->feeds)));
conf->feeds[0] = calloc(1, sizeof(**(conf->feeds)));
- conf->feeds[0]->PS_URL = strdup("http://localhost:8901");
+ conf->feeds[0]->PS_URL = strdup("http://umbar-test.ics.muni.cz:8901");
// all job since Epoche
conf->feeds[0]->query = calloc(2,sizeof(*conf->feeds[0]->query));
if ((ret = glite_jp_db_connect(jpctx, cs)) != 0) goto fail;
// sql command: select an uninitialized unlocked feed
+ glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_DATETIME, &(*isctx)->param_expires);
glite_jp_db_create_results(&myres, 2,
GLITE_JP_DB_TYPE_INT, NULL, &((*isctx)->param_uniqueid),
GLITE_JP_DB_TYPE_VARCHAR, NULL, (*isctx)->param_ps, sizeof((*isctx)->param_ps), &(*isctx)->param_ps_len);
- if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_unlocked_feed_stmt, NULL, myres)) != 0) goto fail_connect;
+ if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state <> " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires >= ?))", &(*isctx)->select_unlocked_feed_stmt, myparam, myres)) != 0) goto fail_connect;
// sql command: lock the feed (via uniqueid)
glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_INT, &(*isctx)->param_uniqueid);
GLITE_JP_DB_TYPE_INT, &(*isctx)->param_uniqueid);
if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=? WHERE (uniqueid=?)", &(*isctx)->update_state_feed_stmt, myparam, NULL)) != 0) goto fail_cmd5;
+ // sql command: set the error on feed
+ glite_jp_db_create_params(&myparam, 3,
+ GLITE_JP_DB_TYPE_INT, &(*isctx)->param_state,
+ GLITE_JP_DB_TYPE_DATETIME, &(*isctx)->param_expires,
+ GLITE_JP_DB_TYPE_INT, &(*isctx)->param_uniqueid);
+ if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=?, expires=? WHERE (uniqueid=?)", &(*isctx)->update_error_feed_stmt, myparam, NULL)) != 0) goto fail_cmd6;
+
return 0;
+ glite_jp_db_freestmt(&(*isctx)->update_error_feed_stmt);
+fail_cmd6:
glite_jp_db_freestmt(&(*isctx)->update_state_feed_stmt);
fail_cmd5:
glite_jp_db_freestmt(&(*isctx)->select_info_feed_stmt);
void glite_jpis_free_context(glite_jpis_context_t ctx) {
+ glite_jp_db_freestmt(&ctx->update_error_feed_stmt);
glite_jp_db_freestmt(&ctx->select_unlocked_feed_stmt);
glite_jp_db_freestmt(&ctx->lock_feed_stmt);
glite_jp_db_freestmt(&ctx->init_feed_stmt);
{
int ret;
+ glite_jp_db_set_time(ctx->param_expires, time(NULL));
do {
switch (glite_jp_db_execute(ctx->select_unlocked_feed_stmt)) {
case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK;
/* Saves TTL (when to reconnect if error occured) for given feed */
-void glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time) {
-
+int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time) {
+ ctx->param_uniqueid = uniqueid;
+ ctx->param_state = GLITE_JP_IS_STATE_ERROR;
+ glite_jp_db_set_time(ctx->param_expires, reconn_time);
+ if (glite_jp_db_execute(ctx->update_error_feed_stmt) == -1) return ctx->jpctx->error->code;
+ return 0;
}
#define GLITE_JP_IS_STATE_HIST 1
#define GLITE_JP_IS_STATE_CONT 2
#define GLITE_JP_IS_STATE_DONE 4
+#define GLITE_JP_IS_STATE_ERROR 8
+#define GLITE_JP_IS_STATE_ERROR_STR "8"
typedef struct _glite_jpis_context {
glite_jp_context_t jpctx;
- glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt;
+ glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt, update_error_feed_stmt;
long int param_uniqueid, param_state;
char param_feedid[33], param_ps[256];
unsigned long param_ps_len, param_feedid_len;
int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uinqueid, char **PS_URL);
int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires);
int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid);
-void glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time);
+int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time);
int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av);