From eaf0df11be8fc4efb3dae4029913fd2bdee508ac Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Tue, 24 Apr 2007 13:38:51 +0000 Subject: [PATCH] Fixed queries for JP PS (and unused DB content), simplified conf structure. --- org.glite.jp.index/src/bones_server.c | 49 +++++++++++++++++++++++++++ org.glite.jp.index/src/conf.c | 7 ++-- org.glite.jp.index/src/conf.h | 2 +- org.glite.jp.index/src/db_ops.c | 60 ++++++++++++++++------------------ org.glite.jp.index/src/soap_ps_calls.c | 10 ++++-- org.glite.jp.index/src/ws_is_typeref.c | 13 ++++---- org.glite.jp.index/src/ws_is_typeref.h | 4 ++- 7 files changed, 97 insertions(+), 48 deletions(-) diff --git a/org.glite.jp.index/src/bones_server.c b/org.glite.jp.index/src/bones_server.c index 5f81290..1c95143 100644 --- a/org.glite.jp.index/src/bones_server.c +++ b/org.glite.jp.index/src/bones_server.c @@ -37,6 +37,8 @@ #define RECONNECT_TIME 60*20 // when try reconnect to PS in case of error (in sec) #define RECONNECT_TIME_QUICK 1 // time between feed requests +#define REACTION_TIME 60*2 // when try reconnect to PS in case of new feeds (in sec) +#define LAUNCH_TIME 2 // wait (for starting slaves) before requesting feeds extern SOAP_NMAC struct Namespace jp__namespaces[],jpps__namespaces[]; @@ -46,6 +48,9 @@ int request(int,struct timeval *,void *); static int reject(int); static int disconn(int,struct timeval *,void *); int data_init(void **data); +#ifndef ONETIME_FEEDS +int feed_loop_slave(void); +#endif static struct glite_srvbones_service stab = { @@ -177,6 +182,11 @@ int main(int argc, char *argv[]) fprintf(stderr, "WARNING: %d slaves can be too low for %d feeds\n", conf->slaves, nfeeds); } glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, conf->slaves); +#ifndef ONETIME_FEEDS + if (feed_loop_slave() < 0) { + fprintf(stderr, "forking feed_loop_slave failed!\n"); + } else +#endif glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug); glite_jpis_free_db(isctx); @@ -228,6 +238,40 @@ int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) { } +#ifndef ONETIME_FEEDS +int feed_loop_slave(void) { + pid_t pid; + glite_jpis_context_t isctx; + + if ( (pid = fork()) ) return pid; + + glite_jpis_init_context(&isctx, ctx, conf); + if (glite_jpis_init_db(isctx) != 0) { + printf("[%d] %s: DB error: %s (%s)\n", getpid(), __FUNCTION__, ctx->error->desc, ctx->error->source); + exit(1); + } + + printf("[%d] %s: waiting before feed requests...\n", getpid(), __FUNCTION__); + sleep(LAUNCH_TIME); + printf("[%d] %s: feeder slave started\n", getpid(), __FUNCTION__); + do { + switch (feed_caller(isctx, conf)) { + case 1: break; + case 0: + sleep(REACTION_TIME); + break; + default: + printf("[%d] %s: feed locking error, slave terminated\n", getpid(), __FUNCTION__); + exit(1); + } + } while (1); + + printf("[%d] %s: slave terminated\n", getpid(), __FUNCTION__); + exit(0); +} +#endif + + /* slave's init comes here */ int data_init(void **data) { @@ -243,6 +287,7 @@ int data_init(void **data) printf("[%d] slave started\n",getpid()); private->soap = soap_new(); +#if ONETIME_FEEDS /* ask PS server for data */ do { switch (feed_caller(private->ctx, conf)) { @@ -260,6 +305,10 @@ int data_init(void **data) return -1; } } while (1); +#else + *data = (void *) private; + return 0; +#endif } diff --git a/org.glite.jp.index/src/conf.c b/org.glite.jp.index/src/conf.c index 0600bdb..767a120 100644 --- a/org.glite.jp.index/src/conf.c +++ b/org.glite.jp.index/src/conf.c @@ -121,7 +121,7 @@ int glite_jp_get_conf(int argc, char **argv, glite_jp_is_conf **configuration) void glite_jp_free_conf(glite_jp_is_conf *conf) { - size_t i, j, k; + size_t i, j; glite_jp_is_feed *feed; if (!conf) return; @@ -132,10 +132,7 @@ void glite_jp_free_conf(glite_jp_is_conf *conf) if (conf->feeds) for (i = 0; conf->feeds[i]; i++) { feed = conf->feeds[i]; free(feed->PS_URL); - for (j = 0; feed->query[j]; j++) { - for (k = 0; feed->query[j][k].attr; k++) glite_jp_free_query_rec(&feed->query[j][k]); - free(feed->query[j]); - } + for (j = 0; feed->query[j].attr; j++) glite_jp_free_query_rec(&feed->query[j]); free(feed->query); free(feed); } diff --git a/org.glite.jp.index/src/conf.h b/org.glite.jp.index/src/conf.h index cfdf588..4c64a79 100644 --- a/org.glite.jp.index/src/conf.h +++ b/org.glite.jp.index/src/conf.h @@ -24,7 +24,7 @@ typedef struct _glite_jp_is_feed { char *PS_URL; //URLs of Primary Storage servers - glite_jp_query_rec_t **query; // query to Primary Server (aka filter) + glite_jp_query_rec_t *query; // query to Primary Server (aka filter) int history, // type of query continuous; long int uniqueid; // internal ID diff --git a/org.glite.jp.index/src/db_ops.c b/org.glite.jp.index/src/db_ops.c index d8bd345..798723f 100644 --- a/org.glite.jp.index/src/db_ops.c +++ b/org.glite.jp.index/src/db_ops.c @@ -54,7 +54,7 @@ #define COND_MAGIC 0x444E4F43 -static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size) UNUSED; +static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t **queries, void *blob, size_t blob_size) UNUSED; static int is_indexed(glite_jp_is_conf *conf, const char *attr) { @@ -151,14 +151,14 @@ static void *array_get(void **data, size_t data_len) { } -static int glite_jpis_db_queries_serialize(glite_jpis_context_t isctx, void **blob, size_t *len, glite_jp_query_rec_t **queries) { +static int glite_jpis_db_queries_serialize(glite_jpis_context_t isctx, void **blob, size_t *len, glite_jp_query_rec_t *queries) { size_t maxlen; glite_jp_query_rec_t *query; int ret; size_t datalen; if ((ret = array_init(blob, len, &maxlen, 1024)) != 0) return ret; - query = queries ? *queries : NULL; + query = queries; while(query && query->attr) { if ((ret = array_add_long(blob, len, &maxlen, COND_MAGIC)) != 0) goto fail; datalen = strlen(query->attr) + 1; @@ -176,7 +176,7 @@ static int glite_jpis_db_queries_serialize(glite_jpis_context_t isctx, void **bl if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail; if (datalen) if ((ret = array_add(blob, len, &maxlen, query->value2, datalen)) != 0) goto fail; - + query++; } @@ -188,20 +188,20 @@ fail: } -static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size) { +static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t **queries, void *blob, size_t blob_size) { size_t maxlen, len, datalen; void *blob_ptr, *blob_end; int ret; uint32_t l; - glite_jp_query_rec_t *query; + glite_jp_query_rec_t query; int i; - if ((ret = array_init((void **)queries, &len, &maxlen, 512)) != 0) return ret; + if ((ret = array_init((void *)queries, &len, &maxlen, 512)) != 0) return ret; blob_ptr = blob; blob_end = (char *)blob + blob_size; while (blob_end > blob_ptr) { ret = ENOMEM; - if ((query = calloc(sizeof(*query), 1)) == NULL) goto fail; + memset(&query, 0, sizeof query); l = array_get_long(&blob_ptr); if (l != COND_MAGIC) { lprintf("blob=%p, blob_ptr=%p, 0x%08" PRIX32 "\n", blob, blob_ptr, l); @@ -211,48 +211,44 @@ static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, vo datalen = array_get_long(&blob_ptr); if (datalen) { - if ((query->attr = malloc(datalen)) == NULL) goto fail_query; - memcpy(query->attr, array_get(&blob_ptr, datalen), datalen); - } else query->attr = NULL; + if ((query.attr = malloc(datalen)) == NULL) goto fail_query; + memcpy(query.attr, array_get(&blob_ptr, datalen), datalen); + } else query.attr = NULL; - query->op = array_get_long(&blob_ptr); - query->binary = array_get_long(&blob_ptr); + query.op = array_get_long(&blob_ptr); + query.binary = array_get_long(&blob_ptr); datalen = array_get_long(&blob_ptr); if (datalen) { - if ((query->value = malloc(datalen)) == NULL) goto fail_query; - memcpy(query->value, array_get(&blob_ptr, datalen), datalen); - } else query->value = NULL; - query->size = datalen; + if ((query.value = malloc(datalen)) == NULL) goto fail_query; + memcpy(query.value, array_get(&blob_ptr, datalen), datalen); + } else query.value = NULL; + query.size = datalen; datalen = array_get_long(&blob_ptr); if (datalen) { - if ((query->value2 = malloc(datalen)) == NULL) goto fail_query; - memcpy(query->value2, array_get(&blob_ptr, datalen), datalen); - } else query->value2 = NULL; - query->size2 = datalen; + if ((query.value2 = malloc(datalen)) == NULL) goto fail_query; + memcpy(query.value2, array_get(&blob_ptr, datalen), datalen); + } else query.value2 = NULL; + query.size2 = datalen; - if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail_query; + if ((ret = array_add((void *)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail_query; } assert(blob_end == blob_ptr); - query = NULL; - if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail; + memset(&query, 0, sizeof query); + if ((ret = array_add((void *)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail; return 0; fail_query: - free(query); fail: i = 0; - query = (*queries)[i]; - while (query && query->attr) { - free(query->attr); - free(query->value); - free(query->value2); - free(query); + while ((*queries)[i].attr) { + free((*queries)[i].attr); + free((*queries)[i].value); + free((*queries)[i].value2); i++; - query = (*queries)[i]; } free(*queries); return ret; diff --git a/org.glite.jp.index/src/soap_ps_calls.c b/org.glite.jp.index/src/soap_ps_calls.c index 2c77271..0f9eed2 100644 --- a/org.glite.jp.index/src/soap_ps_calls.c +++ b/org.glite.jp.index/src/soap_ps_calls.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -58,10 +59,13 @@ int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqu glite_gsplugin_Context plugin_ctx; glite_jp_error_t err; char *src, *desc = NULL; + // preventive very long timeout + static const struct timeval to = {tv_sec: 7200, tv_usec: 0}; lprintf("(%ld) for %s called\n", uniqueid, dest); glite_gsplugin_init_context(&plugin_ctx); + glite_gsplugin_set_timeout(plugin_ctx, &to); if (ctx->conf->server_key) plugin_ctx->key_filename = strdup(ctx->conf->server_key); if (ctx->conf->server_cert) plugin_ctx->cert_filename = strdup(ctx->conf->server_cert); @@ -80,11 +84,11 @@ int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqu if ((dest_index = find_dest_index(conf, uniqueid)) < 0) goto err; - for (i=0; conf->feeds[dest_index]->query[i]; i++); + for (i=0; conf->feeds[dest_index]->query[i].attr; i++); GLITE_SECURITY_GSOAP_LIST_CREATE(soap, &in, conditions, struct jptype__primaryQuery, i); - for (i=0; conf->feeds[dest_index]->query[i]; i++) { - if (glite_jpis_QueryCondToSoap(soap, conf->feeds[dest_index]->query[i], + for (i=0; conf->feeds[dest_index]->query[i].attr; i++) { + if (glite_jpis_QueryCondToSoap(soap, &conf->feeds[dest_index]->query[i], GLITE_SECURITY_GSOAP_LIST_GET(in.conditions, i)) != SOAP_OK) { err.code = EINVAL; err.desc = "error during conds conversion"; diff --git a/org.glite.jp.index/src/ws_is_typeref.c b/org.glite.jp.index/src/ws_is_typeref.c index 42afedb..17e315e 100644 --- a/org.glite.jp.index/src/ws_is_typeref.c +++ b/org.glite.jp.index/src/ws_is_typeref.c @@ -75,6 +75,7 @@ static int SoapToQueryRecordVal( } +#if 0 static int SoapToQueryCond( struct jptype__indexQuery *in, glite_jp_query_rec_t **out) @@ -143,17 +144,18 @@ int glite_jpis_SoapToQueryConds( return 0; } +#endif static int SoapToPrimaryQueryCond( struct jptype__primaryQuery *in, - glite_jp_query_rec_t **out) + glite_jp_query_rec_t *out) { glite_jp_query_rec_t *qr; assert(in && in->attr); assert(out); - *out = qr = calloc(2, sizeof(*qr)); + qr = out; qr[0].attr = strdup(in->attr); glite_jpis_SoapToQueryOp(in->op, &(qr[0].op)); @@ -169,7 +171,6 @@ static int SoapToPrimaryQueryCond( default: if ( SoapToQueryRecordVal(in->value, &(qr[0].binary), &(qr[0].size), &(qr[0].value)) ) { - *out = NULL; return 1; } break; @@ -191,16 +192,16 @@ static int SoapToPrimaryQueryCond( int glite_jpis_SoapToPrimaryQueryConds( int size, GLITE_SECURITY_GSOAP_LIST_TYPE(jptype, primaryQuery) in, - glite_jp_query_rec_t ***out) + glite_jp_query_rec_t **out) { - glite_jp_query_rec_t **qr; + glite_jp_query_rec_t *qr; int i; assert(in || !size); assert(out); qr = calloc(size+1, sizeof(*qr)); for (i=0; i