From aa06d7da69d2b3b7fb6cebd3132448c73d8ef1f4 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Mon, 18 Jun 2007 11:04:18 +0000 Subject: [PATCH] Refresh feeds feature. Update Provenance Challenge queries (second part will need some enhancements yet). Use "launch time" timeout before start the JP IS in tests. --- org.glite.jp.index/Makefile | 4 +- org.glite.jp.index/examples/pch06/pch.pm | 2 +- org.glite.jp.index/examples/pch06/query2.pl | 4 +- .../examples/query-tests/run-test.sh | 1 + org.glite.jp.index/src/bones_server.c | 106 +++++++++++++++--- org.glite.jp.index/src/context.h | 2 +- org.glite.jp.index/src/db_ops.c | 86 +++++++++------ org.glite.jp.index/src/db_ops.h | 7 +- org.glite.jp.index/src/soap_ps_calls.c | 118 ++++++++++++--------- org.glite.jp.index/src/soap_ps_calls.h | 3 +- 10 files changed, 226 insertions(+), 107 deletions(-) diff --git a/org.glite.jp.index/Makefile b/org.glite.jp.index/Makefile index 940ddf1..36410a5 100644 --- a/org.glite.jp.index/Makefile +++ b/org.glite.jp.index/Makefile @@ -63,8 +63,8 @@ SRCS:= conf.c bones_server.c soap_ops.c soap_ps_calls.c common.c \ ${is_prefix}Server.c ${ps_prefix}Client.c ${ws_prefix}C.c \ ws_ps_typeref.c ws_is_typeref.c db_ops.c context.c common_server.c -EXA_TEST_SRCS:=jpis-test.c ${is_prefix}Client.c ${is_prefix}C.c context.c db_ops.c conf.c ws_is_typeref.c -EXA_DB_SRCS:=jpis-db-internal.c db_ops.c conf.c context.c ws_is_typeref.c +EXA_TEST_SRCS:=jpis-test.c ${is_prefix}Client.c ${is_prefix}C.c context.c db_ops.c conf.c ws_is_typeref.c common.c +EXA_DB_SRCS:=jpis-db-internal.c db_ops.c conf.c context.c ws_is_typeref.c common.c EXA_CLIENT_SRCS:=jpis-client.c ${is_prefix}Client.c ${is_prefix}C.c common.c OBJS:=${SRCS:.c=.o} diff --git a/org.glite.jp.index/examples/pch06/pch.pm b/org.glite.jp.index/examples/pch06/pch.pm index 6f33f17..75c0250 100644 --- a/org.glite.jp.index/examples/pch06/pch.pm +++ b/org.glite.jp.index/examples/pch06/pch.pm @@ -12,7 +12,7 @@ use warnings; use XML::Twig; use Data::Dumper; -our $ps='https://skurut1,cesnet.cz:8901'; +our $ps='https://skurut1.cesnet.cz:8901'; our $is='https://scientific.civ.zcu.cz:8904'; our $lbattr='http://egee.cesnet.cz/en/Schema/LB/Attributes'; diff --git a/org.glite.jp.index/examples/pch06/query2.pl b/org.glite.jp.index/examples/pch06/query2.pl index c6d8527..43206c6 100644 --- a/org.glite.jp.index/examples/pch06/query2.pl +++ b/org.glite.jp.index/examples/pch06/query2.pl @@ -14,8 +14,8 @@ use strict; use pch; use Data::Dumper; -my $ps=$pch:ps; -my $is=$pch:is; +my $ps=$pch::ps; +my $is=$pch::is; my $program_name = "softmean"; my @according_jobs = (); # sequencially jobid list diff --git a/org.glite.jp.index/examples/query-tests/run-test.sh b/org.glite.jp.index/examples/query-tests/run-test.sh index 0ee2d68..9c01ad0 100755 --- a/org.glite.jp.index/examples/query-tests/run-test.sh +++ b/org.glite.jp.index/examples/query-tests/run-test.sh @@ -161,6 +161,7 @@ run_is() { exit 1; fi echo -n "S " + sleep 1 } kill_is() { diff --git a/org.glite.jp.index/src/bones_server.c b/org.glite.jp.index/src/bones_server.c index bc3b397..57e645e 100644 --- a/org.glite.jp.index/src/bones_server.c +++ b/org.glite.jp.index/src/bones_server.c @@ -19,6 +19,7 @@ #include "db_ops.h" #include "soap_ps_calls.h" #include "context.h" +#include "common.h" #include "common_server.h" #include "soap_version.h" @@ -198,43 +199,89 @@ int main(int argc, char *argv[]) } +static int get_soap(struct soap *soap, glite_jpis_context_t ctx) { + glite_gsplugin_Context plugin_ctx; + + glite_gsplugin_init_context(&plugin_ctx); + + soap_init(soap); + soap_set_namespaces(soap, jp__namespaces); + soap_set_omode(soap, SOAP_IO_BUFFER); // set buffered response + // buffer set to SOAP_BUFLEN (default = 8k) + if (soap_register_plugin_arg(soap,glite_gsplugin,plugin_ctx)) + return glite_jpis_stack_error(ctx->jpctx, EIO, "can't register gsoap plugin"); + + return 0; +} + + /* looking for some feed in DB */ -int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) { - char *PS_URL; +static int feed_caller(struct soap *soap, glite_jpis_context_t isctx) { + char *PS_URL, *feedid, *errs; long int uniqueid; - int i, ok; + int i, ok, ret, status, initialized, result = 0; // dirty hack - try quicker several times first glite_jp_clear_error(isctx->jpctx); - switch (glite_jpis_lockUninitializedFeed(isctx,&uniqueid,&PS_URL)) { + + feedid = NULL; + for (initialized = 0; initialized <= 1; initialized++) { + switch (glite_jpis_lockSearchFeed(isctx,initialized,&uniqueid,&PS_URL,&status,&feedid)) { case 0: ok = 0; for (i = 0; i < 10; i++) { - // contact PS server, ask for data, save - // feedId and expiration to DB and unlock feed - if (MyFeedIndex(isctx, conf, uniqueid, PS_URL) != 0) { + if (!initialized) { + // contact PS server, ask for data, save + // feedId and expiration to DB and unlock the feed + ret = MyFeedIndex(soap, isctx, uniqueid, PS_URL); + } else { + ret = MyFeedRefresh(soap, isctx, uniqueid, PS_URL, status, feedid); + } + if (ret) { // error when connecting to PS - printf("[%d] %s: %s (%s), reconnecting later\n", getpid(), __FUNCTION__, isctx->jpctx->error->desc, isctx->jpctx->error->source); + errs = glite_jp_error_chain(isctx->jpctx); + printf("[%d] %s: %s, reconnecting later\n", getpid(), __FUNCTION__, errs); + free(errs); } else { - free(PS_URL); + lprintf("%s %s (%ld) ok\n", initialized ? "refresh" : "init", feedid, uniqueid); ok = 1; break; } } - if (!ok) glite_jpis_tryReconnectFeed(isctx, uniqueid, time(NULL) + RECONNECT_TIME); + if (!ok) { + // when unintialized feed: always reconnect + // when not refreshed feed: reconnect only once and two times quicker + if (!initialized || (status & GLITE_JP_IS_STATE_ERROR) == 0) { + lprintf("reconnecting %s (%ld)\n", feedid, uniqueid); + glite_jpis_tryReconnectFeed(isctx, uniqueid, time(NULL) + RECONNECT_TIME / (initialized + 1), status | GLITE_JP_IS_STATE_ERROR); + } else { + lprintf("destroying %s (%ld)\n", feedid, uniqueid); + glite_jpis_destroyTryReconnectFeed(isctx, uniqueid, time(NULL) - 1); + } + } + free(PS_URL); PS_URL = NULL; + free(feedid); feedid = NULL; + sleep(RECONNECT_TIME_QUICK); - return 1; + result = 1; + break; case ENOENT: // no more feeds to initialize - return 0; + break; default: // error during locking printf("[%d] %s: Locking error: ", getpid(), __FUNCTION__); - if (isctx->jpctx->error) printf("%s (%d)\n", isctx->jpctx->error->desc, isctx->jpctx->error->code); - else printf("\n"); + if (isctx->jpctx->error) { + errs = glite_jp_error_chain(isctx->jpctx); + printf("%s\n", errs); + free(errs); + } else printf("(no detail)\n"); return -1; + } } + + return result; } @@ -242,6 +289,8 @@ int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) { int feed_loop_slave(void) { pid_t pid; glite_jpis_context_t isctx; + struct soap soap; + char *errs; if ( (pid = fork()) ) return pid; @@ -251,16 +300,31 @@ int feed_loop_slave(void) { exit(1); } + if (get_soap(&soap, isctx) != 0) { + printf("[%d] %s: ", getpid(), __FUNCTION__); + if (isctx->jpctx->error) { + errs = glite_jp_error_chain(isctx->jpctx); + printf("%s\n", errs); + free(errs); + } else printf("(no detail)\n"); + exit(1); + } + printf("[%d] %s: waiting before feed requests...\n", getpid(), __FUNCTION__); sleep(LAUNCH_TIME); printf("[%d] %s: feeder slave started\n", getpid(), __FUNCTION__); do { - switch (feed_caller(isctx, conf)) { + switch (feed_caller(&soap, isctx)) { case 1: break; case 0: sleep(REACTION_TIME); break; default: + if (isctx->jpctx->error) { + errs = glite_jp_error_chain(isctx->jpctx); + printf("[%d] %s: %s\n", getpid(), __FUNCTION__, errs); + free(errs); + } printf("[%d] %s: feed locking error, slave terminated\n", getpid(), __FUNCTION__); exit(1); } @@ -288,9 +352,19 @@ int data_init(void **data) private->soap = soap_new(); #if ONETIME_FEEDS + if (get_soap(private->soap, ctx) != 0) { + printf("[%d] %s: ", getpid(), __FUNCTION__); + if (isctx->jpctx->error) { + errs = glite_jp_error_chain(ctx->jpctx); + printf("%s\n", errs); + free(errs); + } else printf("(no error)\n"); + exit(1); + } + /* ask PS server for data */ do { - switch (feed_caller(private->ctx, conf)) { + switch (feed_caller(private->soap, private->ctx)) { case 1: // one feed handled break; diff --git a/org.glite.jp.index/src/context.h b/org.glite.jp.index/src/context.h index d67445a..eb1edea 100644 --- a/org.glite.jp.index/src/context.h +++ b/org.glite.jp.index/src/context.h @@ -10,7 +10,7 @@ typedef struct _glite_jpis_context { glite_jp_context_t jpctx; glite_jp_is_conf *conf; - glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt, update_error_feed_stmt, select_info_attrs_indexed, select_jobid_stmt, select_user_stmt, insert_job_stmt, insert_user_stmt; + glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt, select_info_attrs_indexed, select_jobid_stmt, select_user_stmt, insert_job_stmt, insert_user_stmt; long int param_uniqueid, param_state; char param_feedid[33], param_ps[256], param_indexed[256], param_jobid[33], param_dg_jobid[256], param_ownerid[33], param_cert[256]; unsigned long param_ps_len, param_feedid_len, param_indexed_len, param_jobid_len, param_dg_jobid_len, param_ownerid_len, param_cert_len; diff --git a/org.glite.jp.index/src/db_ops.c b/org.glite.jp.index/src/db_ops.c index 798723f..0ac3d04 100644 --- a/org.glite.jp.index/src/db_ops.c +++ b/org.glite.jp.index/src/db_ops.c @@ -18,6 +18,7 @@ #include "conf.h" #include "context.h" #include "db_ops.h" +#include "common.h" #ifndef LOG_SQL @@ -322,13 +323,7 @@ int glite_jpis_initDatabase(glite_jpis_context_t ctx) { snprintf(sql, sizeof(sql) - 1, SQLCMD_CREATE_DATA_TABLE, attrid, type_index, type_full); llprintf(LOG_SQL, "creating table: '%s'\n", sql); if ((glite_jp_db_execstmt(jpctx, sql, NULL)) == -1) { - glite_jp_error_t err; - - memset(&err,0,sizeof err); - err.code = EAGAIN; - err.source = __FUNCTION__; - err.desc = "If the atribute table already exists, restart may help."; - glite_jp_stack_error(ctx->jpctx, &err); + glite_jpis_stack_error(ctx->jpctx, EAGAIN, "if the atribute table already exists, restart may help"); goto fail; } @@ -454,13 +449,6 @@ int glite_jpis_init_db(glite_jpis_context_t isctx) { GLITE_JP_DB_TYPE_INT, &isctx->param_uniqueid); if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=? WHERE (uniqueid=?)", &isctx->update_state_feed_stmt, myparam, NULL)) != 0) goto fail; - // sql command: set the error on feed - glite_jp_db_create_params(&myparam, 3, - GLITE_JP_DB_TYPE_INT, &isctx->param_state, - GLITE_JP_DB_TYPE_DATETIME, &isctx->param_expires, - GLITE_JP_DB_TYPE_INT, &isctx->param_uniqueid); - if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=?, expires=? WHERE (uniqueid=?)", &isctx->update_error_feed_stmt, myparam, NULL)) != 0) goto fail; - // sql command: get info about indexed attributes glite_jp_db_create_results(&myres, 1, GLITE_JP_DB_TYPE_VARCHAR, NULL, isctx->param_indexed, sizeof(isctx->param_indexed), &isctx->param_indexed_len); @@ -504,7 +492,6 @@ void glite_jpis_free_db(glite_jpis_context_t ctx) { glite_jp_db_freestmt(&ctx->unlock_feed_stmt); glite_jp_db_freestmt(&ctx->select_info_feed_stmt); glite_jp_db_freestmt(&ctx->update_state_feed_stmt); - glite_jp_db_freestmt(&ctx->update_error_feed_stmt); glite_jp_db_freestmt(&ctx->select_info_attrs_indexed); glite_jp_db_freestmt(&ctx->select_jobid_stmt); glite_jp_db_freestmt(&ctx->select_user_stmt); @@ -521,28 +508,32 @@ void glite_jpis_free_db(glite_jpis_context_t ctx) { * ENOENT - no more feeds to initialize * ENOLCK - error during locking */ -int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniqueid, char **PS_URL) +int glite_jpis_lockSearchFeed(glite_jpis_context_t ctx, int initialized, long int *uniqueid, char **PS_URL, int *status, char **feedid) { int ret; static int uninit_msg = 1; - char *sql, *res[1], *t; + char *sql, *res[4], *t; glite_jp_db_stmt_t stmt; + if (feedid) *feedid = NULL; do { t = glite_jp_db_timetodb(time(NULL)); - trio_asprintf(&sql, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= %s))", t); + if (initialized) { + trio_asprintf(&sql, "SELECT uniqueid, source, state, feedid FROM feeds WHERE (locked=0) AND (feedid IS NOT NULL) AND (expires <= %s)", t); + } else + trio_asprintf(&sql, "SELECT uniqueid, source, state, feedid FROM feeds WHERE (locked=0) AND (feedid IS NULL) AND ((state < " GLITE_JP_IS_STATE_ERROR_STR ") OR (expires <= %s))", t); free(t); ret = glite_jp_db_execstmt(ctx->jpctx, sql, &stmt); free(sql); switch (ret) { case -1: - lprintf("error selecting unlocked feed\n"); + glite_jpis_stack_error(ctx->jpctx, ENOLCK, "error selecting unlocked feed"); uninit_msg = 1; glite_jp_db_freestmt(&stmt); return ENOLCK; case 0: if (uninit_msg) { - lprintf("no more uninit. feeds unlocked\n"); + lprintf("no more %s feeds for now\n", initialized ? "not-refreshed" : "uninitialized"); uninit_msg = 0; } glite_jp_db_freestmt(&stmt); @@ -551,16 +542,22 @@ int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniquei } uninit_msg = 1; if (glite_jp_db_fetchrow(stmt, res) <= 0) { - lprintf("error fetching unlocked feed\n"); + glite_jpis_stack_error(ctx->jpctx, ENOLCK, "error fetching unlocked feed"); glite_jp_db_freestmt(&stmt); return ENOLCK; } glite_jp_db_freestmt(&stmt); ctx->param_uniqueid = atol(res[0]); strncpy(ctx->param_ps, res[1], sizeof ctx->param_ps); - lprintf("selected uninit. feed, uniqueid='%s'\n", res[0]); + lprintf("selected feed, uniqueid=%s\n", res[0]); + if (status) *status = atoi(res[2]); free(res[0]); free(res[1]); + free(res[2]); + if (feedid) { + free(*feedid); + *feedid = res[3]; + } else free(res[3]); ret = glite_jp_db_execute(ctx->lock_feed_stmt); lprintf("locked %d feeds (uniqueid=%ld)\n", ret, ctx->param_uniqueid); @@ -575,17 +572,19 @@ int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniquei /* Store feed ID and expiration time returned by PS for locked feed. */ -int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires, int status) +int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, const char *feedId, time_t feedExpires, int status) { int ret; + time_t tnow; GLITE_JPIS_PARAM(ctx->param_feedid, ctx->param_feedid_len, feedId); - glite_jp_db_set_time(ctx->param_expires, feedExpires); + tnow = time(NULL); + glite_jp_db_set_time(ctx->param_expires, tnow + (feedExpires - tnow) / 2); ctx->param_uniqueid = uniqueid; ctx->param_state = status; ret = glite_jp_db_execute(ctx->init_feed_stmt); - lprintf("initializing feed, uniqueid=%li, result=%d\n", uniqueid, ret); + lprintf("initializing feed, uniqueid=%ld, result=%d\n", uniqueid, ret); return ret == 1 ? 0 : ENOLCK; } @@ -598,7 +597,7 @@ int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid) { ctx->param_uniqueid = uniqueid; ret = glite_jp_db_execute(ctx->unlock_feed_stmt); - lprintf("unlocking feed, uniqueid=%li, result=%d\n", uniqueid, ret); + lprintf("unlocking feed, uniqueid=%ld, result=%d\n", uniqueid, ret); return ret == 1 ? 0 : ENOLCK; } @@ -606,13 +605,34 @@ int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid) { /* Saves TTL (when to reconnect if error occured) for given feed */ -int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time) { - lprintf("reconnect, un=%ld, %ld\n", uniqueid, reconn_time); - ctx->param_uniqueid = uniqueid; - ctx->param_state = GLITE_JP_IS_STATE_ERROR; - glite_jp_db_set_time(ctx->param_expires, reconn_time); - if (glite_jp_db_execute(ctx->update_error_feed_stmt) == -1) return ctx->jpctx->error->code; - return 0; +int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time, int state) { + int ret; + char *sql, *t; + + t = glite_jp_db_timetodb(reconn_time); + lprintf("reconnect, un=%ld, %s\n", uniqueid, t); + trio_asprintf(&sql, "UPDATE feeds SET state=%d, expires=%s WHERE (uniqueid=%ld)", state, t, uniqueid); + free(t); + if ((ret = glite_jp_db_execstmt(ctx->jpctx, sql, NULL)) != 1) + glite_jpis_stack_error(ctx->jpctx, EIO, "can't update feed no. %ld in DB", uniqueid); + free(sql); + return ret == -1 ? ctx->jpctx->error->code : 0; +} + + +// TODO: could be merged with initFeed +int glite_jpis_destroyTryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time) { + int ret; + char *sql, *t; + + t = glite_jp_db_timetodb(reconn_time); + lprintf("destroy not refreshed feed, un=%ld, %s\n", uniqueid, t); + trio_asprintf(&sql, "UPDATE feeds SET feedid=NULL, state=0, expires=%s WHERE (uniqueid=%ld)", t, uniqueid); + free(t); + if ((ret = glite_jp_db_execstmt(ctx->jpctx, sql, NULL)) != 1) + glite_jpis_stack_error(ctx->jpctx, EIO, "can't destroy non-refreshable feed no. %ld in DB", uniqueid); + free(sql); + return ret == -1 ? ctx->jpctx->error->code : 0; } diff --git a/org.glite.jp.index/src/db_ops.h b/org.glite.jp.index/src/db_ops.h index 9105ba2..19343d8 100644 --- a/org.glite.jp.index/src/db_ops.h +++ b/org.glite.jp.index/src/db_ops.h @@ -32,10 +32,11 @@ int glite_jpis_dropDatabase(glite_jpis_context_t ctx); int glite_jpis_init_db(glite_jpis_context_t isctx); void glite_jpis_free_db(glite_jpis_context_t ctx); -int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uinqueid, char **PS_URL); -int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires, int status); +int glite_jpis_lockSearchFeed(glite_jpis_context_t ctx, int initialized, long int *uinqueid, char **PS_URL, int *status, char **feedid); +int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, const char *feedId, time_t feedExpires, int status); int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid); -int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time); +int glite_jpis_tryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time, int state); +int glite_jpis_destroyTryReconnectFeed(glite_jpis_context_t ctx, long int uniqueid, time_t reconn_time); int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av); diff --git a/org.glite.jp.index/src/soap_ps_calls.c b/org.glite.jp.index/src/soap_ps_calls.c index 3cf4d7c..47e5db4 100644 --- a/org.glite.jp.index/src/soap_ps_calls.c +++ b/org.glite.jp.index/src/soap_ps_calls.c @@ -6,6 +6,7 @@ #include "soap_version.h" #include "glite/jp/types.h" #include "glite/jp/context.h" +#include "glite/security/glite_gss.h" #include "glite/security/glite_gsplugin.h" #include "glite/security/glite_gscompat.h" @@ -15,6 +16,7 @@ #include "db_ops.h" #include "ws_ps_typeref.h" #include "context.h" +#include "common.h" #include "stdsoap2.h" @@ -46,8 +48,32 @@ static int find_dest_index(glite_jp_is_conf *conf, long int uniqueid) } +static int refresh_gsoap(glite_jpis_context_t ctx, struct soap *soap) { + gss_cred_id_t cred; + edg_wll_GssStatus gss_code; + char *et; + // preventive very long timeout + static const struct timeval to = {tv_sec: 7200, tv_usec: 0}; + glite_gsplugin_Context plugin_ctx; + + if (edg_wll_gss_acquire_cred_gsi(ctx->conf->server_cert, ctx->conf->server_key, &cred, NULL, &gss_code) != 0) { + edg_wll_gss_get_error(&gss_code,"",&et); + glite_jpis_stack_error(ctx->jpctx, EINVAL, "can't refresh certificates (%s)", et); + free(et); + return EINVAL; + //printf("[%d] %s: %s\n", getpid(), __FUNCTION__, err.desc); + } + + plugin_ctx = glite_gsplugin_get_context(soap); + glite_gsplugin_set_timeout(plugin_ctx, &to); + glite_gsplugin_set_credential(plugin_ctx, cred); + + return 0; +} + + // call PS FeedIndex for a given destination -int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest) +int MyFeedIndex(struct soap *soap, glite_jpis_context_t ctx, long int uniqueid, const char *dest) { struct _jpelem__FeedIndex in; struct _jpelem__FeedIndexResponse out; @@ -55,54 +81,31 @@ int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqu // struct jptype__stringOrBlob value; // struct xsd__base64Binary blob; int i, dest_index, status; - struct soap *soap = soap_new(); - glite_gsplugin_Context plugin_ctx; - gss_cred_id_t cred; - glite_jp_error_t err; - char *src, *desc = NULL; - // preventive very long timeout - static const struct timeval to = {tv_sec: 7200, tv_usec: 0}; + glite_jp_is_conf *conf = ctx->conf; lprintf("(%ld) for %s called\n", uniqueid, dest); - glite_gsplugin_init_context(&plugin_ctx); - glite_gsplugin_set_timeout(plugin_ctx, &to); - if (edg_wll_gss_acquire_cred_gsi(ctx->conf->server_cert, ctx->conf->server_key, &cred, NULL, NULL) != 0) { - - err.code = EINVAL; - err.desc = "can't set credentials"; - asprintf(&src, "%s/%s():%d", __FILE__, __FUNCTION__, __LINE__); - fprintf(stderr, "%s\n", src); - goto err; - } - glite_gsplugin_set_credential(plugin_ctx, cred); - - soap_init(soap); - soap_set_namespaces(soap, jp__namespaces); - soap_set_omode(soap, SOAP_IO_BUFFER); // set buffered response - // buffer set to SOAP_BUFLEN (default = 8k) - soap_register_plugin_arg(soap,glite_gsplugin,plugin_ctx); + if (refresh_gsoap(ctx, soap) != 0) + return glite_jpis_stack_error(ctx->jpctx, EINVAL, "can't refresh credentials"); memset(&in, 0, sizeof(in)); - memset(&err, 0, sizeof(err)); for (i=0; conf->attrs[i]; i++) ; in.__sizeattributes = i; in.attributes = conf->attrs; - if ((dest_index = find_dest_index(conf, uniqueid)) < 0) goto err; + if ((dest_index = find_dest_index(conf, uniqueid)) < 0) + return glite_jpis_stack_error(ctx->jpctx, EINVAL, "internal error (feed index %ld not found)", uniqueid); + soap_begin(soap); for (i=0; conf->feeds[dest_index]->query[i].attr; i++); GLITE_SECURITY_GSOAP_LIST_CREATE(soap, &in, conditions, struct jptype__primaryQuery, i); for (i=0; conf->feeds[dest_index]->query[i].attr; i++) { if (glite_jpis_QueryCondToSoap(soap, &conf->feeds[dest_index]->query[i], GLITE_SECURITY_GSOAP_LIST_GET(in.conditions, i)) != SOAP_OK) { - err.code = EINVAL; - err.desc = "error during conds conversion"; - asprintf(&src, "%s/%s():%d", __FILE__, __FUNCTION__, __LINE__); - fprintf(stderr, "%s\n", src); - goto err; + soap_end(soap); + return glite_jpis_stack_error(ctx->jpctx, EINVAL, "error during conds conversion"); } } @@ -114,35 +117,54 @@ int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqu if (check_fault(soap,soap_call___jpsrv__FeedIndex(soap,dest,"", &in, &out)) != 0) { fprintf(stderr, "\n"); glite_jpis_unlockFeed(ctx, uniqueid); - err.code = EIO; - asprintf(&desc, "soap_call___jpsrv__FeedIndex() returned error %d", soap->error); - err.desc = desc; - asprintf(&src, "%s/%s():%d", __FILE__, __FUNCTION__, __LINE__); - fprintf(stderr, "%s\n", err.desc); - goto err; + glite_jpis_stack_error(ctx->jpctx, EIO, "soap_call___jpsrv__FeedIndex() returned error %d", soap->error); + soap_end(soap); + return EIO; } else { status = (conf->feeds[dest_index]->history ? GLITE_JP_IS_STATE_HIST : 0) | (conf->feeds[dest_index]->continuous ? GLITE_JP_IS_STATE_CONT : 0); lprintf("(%ld) FeedId: %s\n", uniqueid, out.feedId); lprintf("(%ld) Expires: %s", uniqueid, ctime(&out.feedExpires)); - glite_jpis_initFeed(ctx, uniqueid, out.feedId, out.feedExpires, status); + glite_jpis_initFeed(ctx, uniqueid, out.feedId, time(NULL) + (out.feedExpires - time(NULL)) / 2, status); glite_jpis_unlockFeed(ctx, uniqueid); } soap_end(soap); - soap_done(soap); return 0; +} -err: - err.source = src; - glite_jp_stack_error(ctx->jpctx, &err); - free(src); - free(desc); - soap_end(soap); - soap_done(soap); - return err.code; +int MyFeedRefresh(struct soap *soap, glite_jpis_context_t ctx, long int uniqueid, const char *dest, int status, const char *feedid) +{ + struct _jpelem__FeedIndexRefresh in; + struct _jpelem__FeedIndexRefreshResponse out; + + lprintf("(%ld) for %s called, status = %d\n", uniqueid, feedid, status); + + if (refresh_gsoap(ctx, soap) != 0) + return glite_jpis_stack_error(ctx->jpctx, EINVAL, "can't refresh credentials"); + + soap_begin(soap); + memset(&in, 0, sizeof(in)); + in.feedId = soap_strdup(soap, feedid); + if (check_fault(soap,soap_call___jpsrv__FeedIndexRefresh(soap,dest,"", &in, &out)) != 0) { + fprintf(stderr, "\n"); + glite_jpis_unlockFeed(ctx, uniqueid); + glite_jpis_stack_error(ctx->jpctx, EIO, "soap_call___jpsrv__FeedRefresh() returned error %d", soap->error); + soap_end(soap); + return EIO; + } + else { + status &= (~GLITE_JP_IS_STATE_ERROR); + lprintf("(%ld) FeedId: %s\n", uniqueid, feedid); + lprintf("(%ld) Expires: %s", uniqueid, ctime(&out.feedExpires)); + glite_jpis_initFeed(ctx, uniqueid, feedid, time(NULL) + (out.feedExpires - time(NULL)) / 2, status); + glite_jpis_unlockFeed(ctx, uniqueid); + } + + soap_end(soap); + return 0; } diff --git a/org.glite.jp.index/src/soap_ps_calls.h b/org.glite.jp.index/src/soap_ps_calls.h index cf167ef..070fd4f 100644 --- a/org.glite.jp.index/src/soap_ps_calls.h +++ b/org.glite.jp.index/src/soap_ps_calls.h @@ -6,6 +6,7 @@ #include "context.h" #include "conf.h" -int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest); +int MyFeedIndex(struct soap *soap, glite_jpis_context_t ctx, long int uniqueid, const char *dest); +int MyFeedRefresh(struct soap *soap, glite_jpis_context_t ctx, long int uniqueid, const char *dest, int status, const char *feedid); #endif -- 1.8.2.3