#include <unistd.h>
#include <time.h>
-#include "glite/jp/types.h"
-#include "glite/jp/context.h"
+#include <glite/jp/types.h>
+#include <glite/jp/context.h>
-#include "glite/lb/srvbones.h"
-#include "glite/security/glite_gss.h"
+#include <glite/lb/srvbones.h>
+#include <glite/security/glite_gss.h>
#include <stdsoap2.h>
-#include "glite/security/glite_gsplugin.h"
+#include <glite/security/glite_gsplugin.h>
#include "conf.h"
#include "db_ops.h"
extern SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} };
// namespaces[] not used here, but need to prevent linker to complain...
-extern void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest);
-
static int newconn(int,struct timeval *,void *);
static int request(int,struct timeval *,void *);
static int reject(int);
static int data_init(void **data)
{
slave_data_t *private;
- char *PS_URL = NULL;
long int uniqueid;
+ char *PS_URL = NULL;
private = calloc(sizeof(*private), 1);
glite_jpis_init_context(&private->ctx, ctx, conf);
/* ask PS server for data */
do {
switch (glite_jpis_lockUninitializedFeed(private->ctx,&uniqueid,&PS_URL)) {
+ case 0:
+ // contact PS server, ask for data, save feedId and expiration
+ // to DB and unlock feed
+ if (MyFeedIndex(private->ctx, conf, uniqueid, PS_URL) != 0) {
+ printf("[%d] slave_init(): %s (%s), reconnecting later\n", getpid(), ctx->error->desc, ctx->error->source);
+ // error when connecting to PS
+ glite_jpis_tryReconnectFeed(private->ctx, uniqueid,
+ time(NULL) + RECONNECT_TIME);
+ }
+ free(PS_URL);
+ PS_URL = NULL;
+ break;
case ENOENT:
// no more feeds to initialize
return 0;
- case ENOLCK:
+ default:
// error during locking
printf("[%d] slave_init(): Locking error.\n",getpid());
free(PS_URL);
glite_jpis_free_db(private->ctx);
glite_jpis_free_context(private->ctx);
return -1;
- case ENOTCONN:
- // error when connecting to PS
- glite_jpis_tryReconnectFeed(private->ctx, uniqueid,
- time(NULL) + RECONNECT_TIME);
- break;
- default:
- // contact PS server, ask for data, save feedId and expiration
- // to DB and unlock feed
- MyFeedIndex(private->ctx, conf, uniqueid, PS_URL);
- free(PS_URL);
- PS_URL = NULL;
- break;
}
} while (1);
}
glite_jp_db_create_results(&myres, 2,
GLITE_JP_DB_TYPE_INT, NULL, &(isctx->param_uniqueid),
GLITE_JP_DB_TYPE_VARCHAR, NULL, isctx->param_ps, sizeof(isctx->param_ps), &isctx->param_ps_len);
- if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state <> " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires >= ?))", &isctx->select_unlocked_feed_stmt, myparam, myres)) != 0) goto fail;
+ if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state <> " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= ?))", &isctx->select_unlocked_feed_stmt, myparam, myres)) != 0) goto fail;
// sql command: lock the feed (via uniqueid)
glite_jp_db_create_params(&myparam, 1, GLITE_JP_DB_TYPE_INT, &isctx->param_uniqueid);
int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniqueid, char **PS_URL)
{
int ret;
+ time_t now;
- glite_jp_db_set_time(ctx->param_expires, time(NULL));
+ now = time(NULL);
+ glite_jp_db_set_time(ctx->param_expires, now);
do {
switch (glite_jp_db_execute(ctx->select_unlocked_feed_stmt)) {
case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK;
default: break;
}
if (glite_jp_db_fetch(ctx->select_unlocked_feed_stmt) != 0) return ENOLCK;
- lprintf("selected uninit. feed %lu\n", ctx->param_uniqueid);
+ lprintf("selected uninit. feed %ld\n", ctx->param_uniqueid);
ret = glite_jp_db_execute(ctx->lock_feed_stmt);
- lprintf("locked %d feeds (uniqueid=%lu)\n", ret, ctx->param_uniqueid);
+ lprintf("locked %d feeds (uniqueid=%ld, time=%ld)\n", ret, ctx->param_uniqueid, now);
} while (ret != 1);
*uniqueid = ctx->param_uniqueid;
/* Saves TTL (when to reconnect if error occured) for given feed */
int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time) {
+ lprintf("reconnect, un=%ld, %ld\n", uniqueid, reconn_time);
ctx->param_uniqueid = uniqueid;
ctx->param_state = GLITE_JP_IS_STATE_ERROR;
glite_jp_db_set_time(ctx->param_expires, reconn_time);
// call PS FeedIndex for a given destination
-void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest)
+int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest)
{
struct _jpelem__FeedIndex in;
struct _jpelem__FeedIndexResponse out;
// struct xsd__base64Binary blob;
int i, dest_index;
struct soap *soap = soap_new();
-
+ glite_jp_error_t err;
+ char *src;
printf("MyFeedIndex for %s called\n", dest);
soap_set_namespaces(soap,jpps__namespaces);
memset(&in, 0, sizeof(in));
+ memset(&err, 0, sizeof(err));
for (i=0; conf->attrs[i]; i++) ;
in.__sizeattributes = i;
for (i=0; conf->feeds[dest_index]->query[i]; i++);
in.__sizeconditions = i;
- in.conditions = malloc(in.__sizeconditions * sizeof(*in.conditions));
+ in.conditions = soap_malloc(soap, in.__sizeconditions * sizeof(*in.conditions));
for (i=0; conf->feeds[dest_index]->query[i]; i++) {
if (glite_jpis_QueryCondToSoap(soap, conf->feeds[dest_index]->query[i],
&(in.conditions[i])) != SOAP_OK) {
- printf("MyFeedIndex() - error during conds conversion\n");
+ err.code = EINVAL;
+ err.desc = "error during conds conversion";
+ asprintf(&src, "%s/%s():%d", __FILE__, __FUNCTION__, __LINE__);
+ printf("%s\n", src);
goto err;
}
}
//if (!check_fault(soap,soap_call_jpsrv___FeedIndex(soap,dest,"",
if (soap_call___jpsrv__FeedIndex(soap,dest,"", &in, &out)) {
- printf("soap_call___jpsrv__FeedIndex() returned error\n");
+ printf("\n");
glite_jpis_unlockFeed(ctx, uniqueid);
+ err.code = EIO;
+ err.desc = "soap_call___jpsrv__FeedIndex() returned error";
+ asprintf(&src, "%s/%s():%d", __FILE__, __FUNCTION__, __LINE__);
+ printf("%s\n", err.desc);
goto err;
}
else {
glite_jpis_initFeed(ctx, uniqueid, out.feedId, out.feedExpires);
glite_jpis_unlockFeed(ctx, uniqueid);
}
-
+
+ return 0;
err:
+ err.source = src;
+ glite_jp_stack_error(ctx->jpctx, &err);
+ free(src);
soap_end(soap);
+ return err.code;
}
#include "context.h"
#include "conf.h"
-void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest);
+int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest);
#endif