Refresh feeds feature.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Mon, 18 Jun 2007 11:04:18 +0000 (11:04 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Mon, 18 Jun 2007 11:04:18 +0000 (11:04 +0000)
Update Provenance Challenge queries (second part will need some enhancements yet).
Use "launch time" timeout before start the JP IS in tests.

org.glite.jp.index/Makefile
org.glite.jp.index/examples/pch06/pch.pm
org.glite.jp.index/examples/pch06/query2.pl
org.glite.jp.index/examples/query-tests/run-test.sh
org.glite.jp.index/src/bones_server.c
org.glite.jp.index/src/context.h
org.glite.jp.index/src/db_ops.c
org.glite.jp.index/src/db_ops.h
org.glite.jp.index/src/soap_ps_calls.c
org.glite.jp.index/src/soap_ps_calls.h

index 940ddf1..36410a5 100644 (file)
@@ -63,8 +63,8 @@ SRCS:= conf.c bones_server.c soap_ops.c soap_ps_calls.c common.c \
        ${is_prefix}Server.c ${ps_prefix}Client.c ${ws_prefix}C.c \
        ws_ps_typeref.c ws_is_typeref.c db_ops.c context.c common_server.c 
 
-EXA_TEST_SRCS:=jpis-test.c ${is_prefix}Client.c ${is_prefix}C.c context.c db_ops.c conf.c ws_is_typeref.c
-EXA_DB_SRCS:=jpis-db-internal.c db_ops.c conf.c context.c ws_is_typeref.c
+EXA_TEST_SRCS:=jpis-test.c ${is_prefix}Client.c ${is_prefix}C.c context.c db_ops.c conf.c ws_is_typeref.c common.c
+EXA_DB_SRCS:=jpis-db-internal.c db_ops.c conf.c context.c ws_is_typeref.c common.c
 EXA_CLIENT_SRCS:=jpis-client.c ${is_prefix}Client.c ${is_prefix}C.c common.c
 
 OBJS:=${SRCS:.c=.o}
index 6f33f17..75c0250 100644 (file)
@@ -12,7 +12,7 @@ use warnings;
 use XML::Twig;
 use Data::Dumper;
 
-our $ps='https://skurut1,cesnet.cz:8901';
+our $ps='https://skurut1.cesnet.cz:8901';
 our $is='https://scientific.civ.zcu.cz:8904';
 
 our $lbattr='http://egee.cesnet.cz/en/Schema/LB/Attributes';
index c6d8527..43206c6 100644 (file)
@@ -14,8 +14,8 @@ use strict;
 use pch;
 use Data::Dumper;
 
-my $ps=$pch:ps;
-my $is=$pch:is;
+my $ps=$pch::ps;
+my $is=$pch::is;
 my $program_name = "softmean";
 
 my @according_jobs = (); # sequencially jobid list
index 0ee2d68..9c01ad0 100755 (executable)
@@ -161,6 +161,7 @@ run_is() {
                exit 1;
        fi
        echo -n "S "
+       sleep 1
 }
 
 kill_is() {
index bc3b397..57e645e 100644 (file)
@@ -19,6 +19,7 @@
 #include "db_ops.h"
 #include "soap_ps_calls.h"
 #include "context.h"
+#include "common.h"
 #include "common_server.h"
 
 #include "soap_version.h"
@@ -198,43 +199,89 @@ int main(int argc, char *argv[])
 }
 
 
+static int get_soap(struct soap *soap, glite_jpis_context_t ctx) {
+       glite_gsplugin_Context                  plugin_ctx;
+
+       glite_gsplugin_init_context(&plugin_ctx);
+       
+       soap_init(soap);
+       soap_set_namespaces(soap, jp__namespaces);
+       soap_set_omode(soap, SOAP_IO_BUFFER);   // set buffered response
+                                                // buffer set to SOAP_BUFLEN (default = 8k)    
+       if (soap_register_plugin_arg(soap,glite_gsplugin,plugin_ctx))
+               return glite_jpis_stack_error(ctx->jpctx, EIO, "can't register gsoap plugin");
+
+       return 0;
+}
+
+
 /* looking for some feed in DB */
-int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) {
-       char *PS_URL;
+static int feed_caller(struct soap *soap, glite_jpis_context_t isctx) {
+       char *PS_URL, *feedid, *errs;
        long int uniqueid;
-       int i, ok;
+       int i, ok, ret, status, initialized, result = 0;
 
        // dirty hack - try quicker several times first
        glite_jp_clear_error(isctx->jpctx);
-       switch (glite_jpis_lockUninitializedFeed(isctx,&uniqueid,&PS_URL)) {
+
+       feedid = NULL;
+       for (initialized = 0; initialized <= 1; initialized++) {
+               switch (glite_jpis_lockSearchFeed(isctx,initialized,&uniqueid,&PS_URL,&status,&feedid)) {
                case 0: 
                        ok = 0;
                        for (i = 0; i < 10; i++) {
-                               // contact PS server, ask for data, save
-                               // feedId and expiration to DB and unlock feed
-                               if (MyFeedIndex(isctx, conf, uniqueid, PS_URL) != 0) {
+                               if (!initialized) {
+                                       // contact PS server, ask for data, save
+                                       // feedId and expiration to DB and unlock the feed
+                                       ret = MyFeedIndex(soap, isctx, uniqueid, PS_URL);
+                               } else {
+                                       ret = MyFeedRefresh(soap, isctx, uniqueid, PS_URL, status, feedid);
+                               }
+                               if (ret) {
                                        // error when connecting to PS
-                                       printf("[%d] %s: %s (%s), reconnecting later\n", getpid(), __FUNCTION__, isctx->jpctx->error->desc, isctx->jpctx->error->source);
+                                       errs = glite_jp_error_chain(isctx->jpctx);
+                                       printf("[%d] %s: %s, reconnecting later\n", getpid(), __FUNCTION__, errs);
+                                       free(errs);
                                } else {
-                                       free(PS_URL);
+                                       lprintf("%s %s (%ld) ok\n", initialized ? "refresh" : "init", feedid, uniqueid);
                                        ok = 1;
                                        break;
                                }
                        }
-                       if (!ok) glite_jpis_tryReconnectFeed(isctx, uniqueid, time(NULL) + RECONNECT_TIME);
+                       if (!ok) {
+                               // when unintialized feed: always reconnect
+                               // when not refreshed feed: reconnect only once and two times quicker
+                               if (!initialized || (status & GLITE_JP_IS_STATE_ERROR) == 0) {
+                                       lprintf("reconnecting %s (%ld)\n", feedid, uniqueid);
+                                       glite_jpis_tryReconnectFeed(isctx, uniqueid, time(NULL) + RECONNECT_TIME / (initialized  + 1), status | GLITE_JP_IS_STATE_ERROR);
+                               } else {
+                                       lprintf("destroying %s (%ld)\n", feedid, uniqueid);
+                                       glite_jpis_destroyTryReconnectFeed(isctx, uniqueid, time(NULL) - 1);
+                               }
+                       }
+                       free(PS_URL); PS_URL = NULL;
+                       free(feedid); feedid = NULL;
+
                        sleep(RECONNECT_TIME_QUICK);
 
-                       return 1;
+                       result = 1;
+                       break;
                case ENOENT:
                        // no more feeds to initialize
-                       return 0;
+                       break;
                default:
                        // error during locking
                        printf("[%d] %s: Locking error: ", getpid(), __FUNCTION__);
-                       if (isctx->jpctx->error) printf("%s (%d)\n", isctx->jpctx->error->desc, isctx->jpctx->error->code);
-                       else printf("\n");
+                       if (isctx->jpctx->error) {
+                               errs = glite_jp_error_chain(isctx->jpctx);
+                               printf("%s\n", errs);
+                               free(errs);
+                       } else printf("(no detail)\n");
                        return -1;
+               }
        }
+
+       return result;
 }
 
 
@@ -242,6 +289,8 @@ int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) {
 int feed_loop_slave(void) {
        pid_t pid;
        glite_jpis_context_t isctx;
+       struct soap soap;
+       char *errs;
 
        if ( (pid = fork()) ) return pid;
 
@@ -251,16 +300,31 @@ int feed_loop_slave(void) {
                exit(1);
        }
 
+       if (get_soap(&soap, isctx) != 0) {
+               printf("[%d] %s: ", getpid(), __FUNCTION__);
+               if (isctx->jpctx->error) {
+                       errs = glite_jp_error_chain(isctx->jpctx);
+                       printf("%s\n", errs);
+                       free(errs);
+               } else printf("(no detail)\n");
+               exit(1);
+       }
+
        printf("[%d] %s: waiting before feed requests...\n", getpid(), __FUNCTION__);
        sleep(LAUNCH_TIME);
        printf("[%d] %s: feeder slave started\n", getpid(), __FUNCTION__);
        do {
-               switch (feed_caller(isctx, conf)) {
+               switch (feed_caller(&soap, isctx)) {
                        case 1: break;
                        case 0:
                                sleep(REACTION_TIME);
                                break;
                        default:
+                               if (isctx->jpctx->error) {
+                                       errs = glite_jp_error_chain(isctx->jpctx);
+                                       printf("[%d] %s: %s\n", getpid(), __FUNCTION__, errs);
+                                       free(errs);
+                               }
                                printf("[%d] %s: feed locking error, slave terminated\n", getpid(), __FUNCTION__);
                                exit(1);
                }
@@ -288,9 +352,19 @@ int data_init(void **data)
        private->soap = soap_new();
 
 #if ONETIME_FEEDS
+       if (get_soap(private->soap, ctx) != 0) {
+               printf("[%d] %s: ", getpid(), __FUNCTION__);
+               if (isctx->jpctx->error) {
+                       errs = glite_jp_error_chain(ctx->jpctx);
+                       printf("%s\n", errs);
+                       free(errs);
+               } else printf("(no error)\n");
+               exit(1);
+       }
+
        /* ask PS server for data */
        do {
-               switch (feed_caller(private->ctx, conf)) {
+               switch (feed_caller(private->soap, private->ctx)) {
                        case 1:
                                // one feed handled
                                break;
index d67445a..eb1edea 100644 (file)
@@ -10,7 +10,7 @@
 typedef struct _glite_jpis_context {
        glite_jp_context_t jpctx;
        glite_jp_is_conf *conf;
-       glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt, update_error_feed_stmt, select_info_attrs_indexed, select_jobid_stmt, select_user_stmt, insert_job_stmt, insert_user_stmt;
+       glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt, select_info_attrs_indexed, select_jobid_stmt, select_user_stmt, insert_job_stmt, insert_user_stmt;
        long int param_uniqueid, param_state;
        char param_feedid[33], param_ps[256], param_indexed[256], param_jobid[33], param_dg_jobid[256], param_ownerid[33], param_cert[256];
        unsigned long param_ps_len, param_feedid_len, param_indexed_len, param_jobid_len, param_dg_jobid_len, param_ownerid_len, param_cert_len;
index 798723f..0ac3d04 100644 (file)
@@ -18,6 +18,7 @@
 #include "conf.h"
 #include "context.h"
 #include "db_ops.h"
+#include "common.h"
 
 
 #ifndef LOG_SQL
@@ -322,13 +323,7 @@ int glite_jpis_initDatabase(glite_jpis_context_t ctx) {
                snprintf(sql, sizeof(sql) - 1, SQLCMD_CREATE_DATA_TABLE, attrid, type_index, type_full);
                llprintf(LOG_SQL, "creating table: '%s'\n", sql);
                if ((glite_jp_db_execstmt(jpctx, sql, NULL)) == -1) {
-                       glite_jp_error_t err;
-
-                       memset(&err,0,sizeof err);
-                       err.code = EAGAIN;
-                       err.source = __FUNCTION__;
-                       err.desc = "If the atribute table already exists, restart may help.";
-                       glite_jp_stack_error(ctx->jpctx, &err); 
+                       glite_jpis_stack_error(ctx->jpctx, EAGAIN, "if the atribute table already exists, restart may help");
                        goto fail;
                }
 
@@ -454,13 +449,6 @@ int glite_jpis_init_db(glite_jpis_context_t isctx) {
                GLITE_JP_DB_TYPE_INT, &isctx->param_uniqueid);
        if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=? WHERE (uniqueid=?)", &isctx->update_state_feed_stmt, myparam, NULL)) != 0) goto fail;
 
-       // sql command: set the error on feed
-       glite_jp_db_create_params(&myparam, 3, 
-               GLITE_JP_DB_TYPE_INT, &isctx->param_state,
-               GLITE_JP_DB_TYPE_DATETIME, &isctx->param_expires,
-               GLITE_JP_DB_TYPE_INT, &isctx->param_uniqueid);
-       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=?, expires=? WHERE (uniqueid=?)", &isctx->update_error_feed_stmt, myparam, NULL)) != 0) goto fail;
-
        // sql command: get info about indexed attributes
        glite_jp_db_create_results(&myres, 1,
                GLITE_JP_DB_TYPE_VARCHAR, NULL, isctx->param_indexed,  sizeof(isctx->param_indexed), &isctx->param_indexed_len);
@@ -504,7 +492,6 @@ void glite_jpis_free_db(glite_jpis_context_t ctx) {
        glite_jp_db_freestmt(&ctx->unlock_feed_stmt);
        glite_jp_db_freestmt(&ctx->select_info_feed_stmt);
        glite_jp_db_freestmt(&ctx->update_state_feed_stmt);
-       glite_jp_db_freestmt(&ctx->update_error_feed_stmt);
        glite_jp_db_freestmt(&ctx->select_info_attrs_indexed);
        glite_jp_db_freestmt(&ctx->select_jobid_stmt);
        glite_jp_db_freestmt(&ctx->select_user_stmt);
@@ -521,28 +508,32 @@ void glite_jpis_free_db(glite_jpis_context_t ctx) {
  *      ENOENT - no more feeds to initialize
  *      ENOLCK - error during locking */
 
-int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniqueid, char **PS_URL)
+int glite_jpis_lockSearchFeed(glite_jpis_context_t ctx, int initialized, long int *uniqueid, char **PS_URL, int *status, char **feedid)
 {
        int ret;
        static int uninit_msg = 1;
-       char *sql, *res[1], *t;
+       char *sql, *res[4], *t;
        glite_jp_db_stmt_t stmt;
 
+       if (feedid) *feedid = NULL;
        do {
                t = glite_jp_db_timetodb(time(NULL));
-               trio_asprintf(&sql, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= %s))", t);
+               if (initialized) {
+                       trio_asprintf(&sql, "SELECT uniqueid, source, state, feedid FROM feeds WHERE (locked=0) AND (feedid IS NOT NULL) AND (expires <= %s)", t);
+               } else
+                       trio_asprintf(&sql, "SELECT uniqueid, source, state, feedid FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= %s))", t);
                free(t);
                ret = glite_jp_db_execstmt(ctx->jpctx, sql, &stmt);
                free(sql);
                switch (ret) {
                case -1:
-                       lprintf("error selecting unlocked feed\n");
+                       glite_jpis_stack_error(ctx->jpctx, ENOLCK, "error selecting unlocked feed");
                        uninit_msg = 1;
                        glite_jp_db_freestmt(&stmt);
                        return ENOLCK;
                case 0:
                        if (uninit_msg) {
-                               lprintf("no more uninit. feeds unlocked\n");
+                               lprintf("no more %s feeds for now\n", initialized ? "not-refreshed" : "uninitialized");
                                uninit_msg = 0;
                        }
                        glite_jp_db_freestmt(&stmt);
@@ -551,16 +542,22 @@ int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniquei
                }
                uninit_msg = 1;
                if (glite_jp_db_fetchrow(stmt, res) <= 0) {
-                       lprintf("error fetching unlocked feed\n");
+                       glite_jpis_stack_error(ctx->jpctx, ENOLCK, "error fetching unlocked feed");
                        glite_jp_db_freestmt(&stmt);
                        return ENOLCK;
                }
                glite_jp_db_freestmt(&stmt);
                ctx->param_uniqueid = atol(res[0]);
                strncpy(ctx->param_ps, res[1], sizeof ctx->param_ps);
-               lprintf("selected uninit. feed, uniqueid='%s'\n", res[0]);
+               lprintf("selected feed, uniqueid=%s\n", res[0]);
+               if (status) *status = atoi(res[2]);
                free(res[0]);
                free(res[1]);
+               free(res[2]);
+               if (feedid) {
+                       free(*feedid);
+                       *feedid = res[3];
+               } else free(res[3]);
 
                ret = glite_jp_db_execute(ctx->lock_feed_stmt);
                lprintf("locked %d feeds (uniqueid=%ld)\n", ret, ctx->param_uniqueid);
@@ -575,17 +572,19 @@ int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniquei
 
 /* Store feed ID and expiration time returned by PS for locked feed. */
 
-int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires, int status)
+int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, const char *feedId, time_t feedExpires, int status)
 {
        int ret;
+       time_t tnow;
 
        GLITE_JPIS_PARAM(ctx->param_feedid, ctx->param_feedid_len, feedId);
-       glite_jp_db_set_time(ctx->param_expires, feedExpires);
+       tnow = time(NULL);
+       glite_jp_db_set_time(ctx->param_expires, tnow + (feedExpires - tnow) / 2);
        ctx->param_uniqueid = uniqueid;
        ctx->param_state = status;
 
        ret = glite_jp_db_execute(ctx->init_feed_stmt);
-       lprintf("initializing feed, uniqueid=%li, result=%d\n", uniqueid, ret);
+       lprintf("initializing feed, uniqueid=%ld, result=%d\n", uniqueid, ret);
 
        return ret == 1 ? 0 : ENOLCK;
 }
@@ -598,7 +597,7 @@ int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid) {
 
        ctx->param_uniqueid = uniqueid;
        ret = glite_jp_db_execute(ctx->unlock_feed_stmt);
-       lprintf("unlocking feed, uniqueid=%li, result=%d\n", uniqueid, ret);
+       lprintf("unlocking feed, uniqueid=%ld, result=%d\n", uniqueid, ret);
 
        return ret == 1 ? 0 : ENOLCK;
 }
@@ -606,13 +605,34 @@ int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid) {
 
 /* Saves TTL (when to reconnect if error occured) for given feed */
 
-int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time) {
-       lprintf("reconnect, un=%ld, %ld\n", uniqueid, reconn_time);
-       ctx->param_uniqueid = uniqueid;
-       ctx->param_state = GLITE_JP_IS_STATE_ERROR;
-       glite_jp_db_set_time(ctx->param_expires, reconn_time);
-       if (glite_jp_db_execute(ctx->update_error_feed_stmt) == -1) return ctx->jpctx->error->code;
-       return 0;
+int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time, int state) {
+       int ret;
+       char *sql, *t;
+
+       t = glite_jp_db_timetodb(reconn_time);
+       lprintf("reconnect, un=%ld, %s\n", uniqueid, t);
+       trio_asprintf(&sql, "UPDATE feeds SET state=%d, expires=%s WHERE (uniqueid=%ld)", state, t, uniqueid);
+       free(t);
+       if ((ret = glite_jp_db_execstmt(ctx->jpctx, sql, NULL)) != 1)
+               glite_jpis_stack_error(ctx->jpctx, EIO, "can't update feed no. %ld in DB", uniqueid);
+       free(sql);
+       return ret == -1 ? ctx->jpctx->error->code : 0;
+}
+
+
+// TODO: could be merged with initFeed
+int glite_jpis_destroyTryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time) {
+       int ret;
+       char *sql, *t;
+
+       t = glite_jp_db_timetodb(reconn_time);
+       lprintf("destroy not refreshed feed, un=%ld, %s\n", uniqueid, t);
+       trio_asprintf(&sql, "UPDATE feeds SET feedid=NULL, state=0, expires=%s WHERE (uniqueid=%ld)", t, uniqueid);
+       free(t);
+       if ((ret = glite_jp_db_execstmt(ctx->jpctx, sql, NULL)) != 1)
+               glite_jpis_stack_error(ctx->jpctx, EIO, "can't destroy non-refreshable feed no. %ld in DB", uniqueid);
+       free(sql);
+       return ret == -1 ? ctx->jpctx->error->code : 0;
 }
 
 
index 9105ba2..19343d8 100644 (file)
@@ -32,10 +32,11 @@ int glite_jpis_dropDatabase(glite_jpis_context_t ctx);
 int glite_jpis_init_db(glite_jpis_context_t isctx);
 void glite_jpis_free_db(glite_jpis_context_t ctx);
 
-int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uinqueid, char **PS_URL);
-int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires, int status);
+int glite_jpis_lockSearchFeed(glite_jpis_context_t ctx, int initialized, long int *uinqueid, char **PS_URL, int *status, char **feedid);
+int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, const char *feedId, time_t feedExpires, int status);
 int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid);
-int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time);
+int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time, int state);
+int glite_jpis_destroyTryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time);
 
 int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av);
 
index 3cf4d7c..47e5db4 100644 (file)
@@ -6,6 +6,7 @@
 #include "soap_version.h"
 #include "glite/jp/types.h"
 #include "glite/jp/context.h"
+#include "glite/security/glite_gss.h"
 #include "glite/security/glite_gsplugin.h"
 #include "glite/security/glite_gscompat.h"
 
@@ -15,6 +16,7 @@
 #include "db_ops.h"
 #include "ws_ps_typeref.h"
 #include "context.h"
+#include "common.h"
 
 #include "stdsoap2.h"
 
@@ -46,8 +48,32 @@ static int find_dest_index(glite_jp_is_conf *conf, long int uniqueid)
 }
 
 
+static int refresh_gsoap(glite_jpis_context_t ctx, struct soap *soap) {
+       gss_cred_id_t           cred;
+       edg_wll_GssStatus       gss_code;
+       char                    *et;
+       // preventive very long timeout
+       static const struct timeval to = {tv_sec: 7200, tv_usec: 0};
+       glite_gsplugin_Context  plugin_ctx;
+
+       if (edg_wll_gss_acquire_cred_gsi(ctx->conf->server_cert, ctx->conf->server_key, &cred, NULL, &gss_code) != 0) {
+               edg_wll_gss_get_error(&gss_code,"",&et);
+               glite_jpis_stack_error(ctx->jpctx, EINVAL, "can't refresh certificates (%s)", et);
+               free(et);
+               return EINVAL;
+               //printf("[%d] %s: %s\n", getpid(), __FUNCTION__, err.desc);
+       }
+
+       plugin_ctx = glite_gsplugin_get_context(soap);
+       glite_gsplugin_set_timeout(plugin_ctx, &to);
+       glite_gsplugin_set_credential(plugin_ctx, cred);
+
+       return 0;
+}
+
+
 // call PS FeedIndex for a given destination
-int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest)
+int MyFeedIndex(struct soap *soap, glite_jpis_context_t ctx, long int uniqueid, const char *dest)
 {
        struct _jpelem__FeedIndex               in;
        struct _jpelem__FeedIndexResponse       out;
@@ -55,54 +81,31 @@ int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqu
 //     struct jptype__stringOrBlob             value;
 //     struct xsd__base64Binary                blob;
        int                                     i, dest_index, status;
-       struct soap                             *soap = soap_new();
-       glite_gsplugin_Context                  plugin_ctx;
-       gss_cred_id_t                           cred;
-       glite_jp_error_t err;
-       char *src, *desc = NULL;
-       // preventive very long timeout
-       static const struct timeval to = {tv_sec: 7200, tv_usec: 0};
+       glite_jp_is_conf *conf = ctx->conf;
 
        lprintf("(%ld) for %s called\n", uniqueid, dest);
 
-       glite_gsplugin_init_context(&plugin_ctx);
-       glite_gsplugin_set_timeout(plugin_ctx, &to);
-       if (edg_wll_gss_acquire_cred_gsi(ctx->conf->server_cert, ctx->conf->server_key, &cred, NULL, NULL) != 0) {
-
-               err.code = EINVAL;
-               err.desc = "can't set credentials";
-               asprintf(&src, "%s/%s():%d", __FILE__, __FUNCTION__, __LINE__);
-               fprintf(stderr, "%s\n", src);
-               goto err;
-       }
-       glite_gsplugin_set_credential(plugin_ctx, cred);
-       
-       soap_init(soap);
-        soap_set_namespaces(soap, jp__namespaces);
-       soap_set_omode(soap, SOAP_IO_BUFFER);   // set buffered response
-                                                // buffer set to SOAP_BUFLEN (default = 8k)    
-       soap_register_plugin_arg(soap,glite_gsplugin,plugin_ctx);
+       if (refresh_gsoap(ctx, soap) != 0)
+               return glite_jpis_stack_error(ctx->jpctx, EINVAL, "can't refresh credentials");
 
        memset(&in, 0, sizeof(in));
-       memset(&err, 0, sizeof(err));
 
        for (i=0; conf->attrs[i]; i++) ;
        in.__sizeattributes = i;
        in.attributes = conf->attrs;
 
-       if ((dest_index = find_dest_index(conf, uniqueid)) < 0) goto err;
+       if ((dest_index = find_dest_index(conf, uniqueid)) < 0)
+               return glite_jpis_stack_error(ctx->jpctx, EINVAL, "internal error (feed index %ld not found)", uniqueid);
 
+       soap_begin(soap);
        for (i=0; conf->feeds[dest_index]->query[i].attr; i++);
        GLITE_SECURITY_GSOAP_LIST_CREATE(soap, &in, conditions, struct jptype__primaryQuery, i);
 
        for (i=0; conf->feeds[dest_index]->query[i].attr; i++) {
                if (glite_jpis_QueryCondToSoap(soap, &conf->feeds[dest_index]->query[i], 
                                GLITE_SECURITY_GSOAP_LIST_GET(in.conditions, i)) != SOAP_OK) {
-                       err.code = EINVAL;
-                       err.desc = "error during conds conversion";
-                       asprintf(&src, "%s/%s():%d", __FILE__, __FUNCTION__, __LINE__);
-                       fprintf(stderr, "%s\n", src);
-                       goto err;
+                       soap_end(soap);
+                       return glite_jpis_stack_error(ctx->jpctx, EINVAL, "error during conds conversion");
                }
        }
 
@@ -114,35 +117,54 @@ int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqu
        if (check_fault(soap,soap_call___jpsrv__FeedIndex(soap,dest,"", &in, &out)) != 0) {
                fprintf(stderr, "\n");
                glite_jpis_unlockFeed(ctx, uniqueid);
-               err.code = EIO;
-               asprintf(&desc, "soap_call___jpsrv__FeedIndex() returned error %d", soap->error);
-               err.desc = desc;
-               asprintf(&src, "%s/%s():%d", __FILE__, __FUNCTION__, __LINE__);
-               fprintf(stderr, "%s\n", err.desc);
-               goto err;
+               glite_jpis_stack_error(ctx->jpctx, EIO, "soap_call___jpsrv__FeedIndex() returned error %d", soap->error);
+               soap_end(soap);
+               return EIO;
        }
        else {
                status = (conf->feeds[dest_index]->history ? GLITE_JP_IS_STATE_HIST : 0) | (conf->feeds[dest_index]->continuous ? GLITE_JP_IS_STATE_CONT : 0);
                lprintf("(%ld) FeedId: %s\n", uniqueid, out.feedId);
                lprintf("(%ld) Expires: %s", uniqueid, ctime(&out.feedExpires));
-               glite_jpis_initFeed(ctx, uniqueid, out.feedId, out.feedExpires, status);
+               glite_jpis_initFeed(ctx, uniqueid, out.feedId, time(NULL) + (out.feedExpires - time(NULL)) / 2, status);
                glite_jpis_unlockFeed(ctx, uniqueid);
        }
 
        soap_end(soap);
-       soap_done(soap);
 
        return 0;
+}
 
-err:
-       err.source = src;
-       glite_jp_stack_error(ctx->jpctx, &err);
-       free(src);
-       free(desc);
-       soap_end(soap);
-       soap_done(soap);
 
-       return err.code;
+int MyFeedRefresh(struct soap *soap, glite_jpis_context_t ctx, long int uniqueid, const char *dest, int status, const char *feedid)
+{
+       struct _jpelem__FeedIndexRefresh                in;
+       struct _jpelem__FeedIndexRefreshResponse        out;
+
+       lprintf("(%ld) for %s called, status = %d\n", uniqueid, feedid, status);
+
+       if (refresh_gsoap(ctx, soap) != 0)
+               return glite_jpis_stack_error(ctx->jpctx, EINVAL, "can't refresh credentials");
+
+       soap_begin(soap);
+       memset(&in, 0, sizeof(in));
+       in.feedId = soap_strdup(soap, feedid);
+       if (check_fault(soap,soap_call___jpsrv__FeedIndexRefresh(soap,dest,"", &in, &out)) != 0) {
+               fprintf(stderr, "\n");
+               glite_jpis_unlockFeed(ctx, uniqueid);
+               glite_jpis_stack_error(ctx->jpctx, EIO, "soap_call___jpsrv__FeedRefresh() returned error %d", soap->error);
+               soap_end(soap);
+               return EIO;
+       }
+       else {
+               status &= (~GLITE_JP_IS_STATE_ERROR);
+               lprintf("(%ld) FeedId: %s\n", uniqueid, feedid);
+               lprintf("(%ld) Expires: %s", uniqueid, ctime(&out.feedExpires));
+               glite_jpis_initFeed(ctx, uniqueid, feedid, time(NULL) + (out.feedExpires - time(NULL)) / 2, status);
+               glite_jpis_unlockFeed(ctx, uniqueid);
+       }
+
+       soap_end(soap);
+       return 0;
 }
 
 
index cf167ef..070fd4f 100644 (file)
@@ -6,6 +6,7 @@
 #include "context.h"
 #include "conf.h"
 
-int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest);
+int MyFeedIndex(struct soap *soap, glite_jpis_context_t ctx, long int uniqueid, const char *dest);
+int MyFeedRefresh(struct soap *soap, glite_jpis_context_t ctx, long int uniqueid, const char *dest, int status, const char *feedid);
 
 #endif