Fixed queries for JP PS (and unused DB content), simplified conf structure.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 24 Apr 2007 13:38:51 +0000 (13:38 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 24 Apr 2007 13:38:51 +0000 (13:38 +0000)
org.glite.jp.index/src/bones_server.c
org.glite.jp.index/src/conf.c
org.glite.jp.index/src/conf.h
org.glite.jp.index/src/db_ops.c
org.glite.jp.index/src/soap_ps_calls.c
org.glite.jp.index/src/ws_is_typeref.c
org.glite.jp.index/src/ws_is_typeref.h

index 5f81290..1c95143 100644 (file)
@@ -37,6 +37,8 @@
 
 #define RECONNECT_TIME         60*20   // when try reconnect to PS in case of error (in sec)
 #define RECONNECT_TIME_QUICK   1       // time between feed requests
+#define REACTION_TIME          60*2    // when try reconnect to PS in case of new feeds (in sec)
+#define LAUNCH_TIME            2       // wait (for starting slaves) before requesting feeds
 
 
 extern SOAP_NMAC struct Namespace jp__namespaces[],jpps__namespaces[];
@@ -46,6 +48,9 @@ int request(int,struct timeval *,void *);
 static int reject(int);
 static int disconn(int,struct timeval *,void *);
 int data_init(void **data);
+#ifndef ONETIME_FEEDS
+int feed_loop_slave(void);
+#endif
 
 
 static struct glite_srvbones_service stab = {
@@ -177,6 +182,11 @@ int main(int argc, char *argv[])
                fprintf(stderr, "WARNING: %d slaves can be too low for %d feeds\n", conf->slaves, nfeeds);
        }
        glite_srvbones_set_param(GLITE_SBPARAM_SLAVES_COUNT, conf->slaves);
+#ifndef ONETIME_FEEDS
+       if (feed_loop_slave() < 0) {
+               fprintf(stderr, "forking feed_loop_slave failed!\n");
+       } else
+#endif
        glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug);
 
        glite_jpis_free_db(isctx);
@@ -228,6 +238,40 @@ int feed_caller(glite_jpis_context_t isctx, glite_jp_is_conf *conf) {
 }
 
 
+#ifndef ONETIME_FEEDS
+int feed_loop_slave(void) {
+       pid_t pid;
+       glite_jpis_context_t isctx;
+
+       if ( (pid = fork()) ) return pid;
+
+       glite_jpis_init_context(&isctx, ctx, conf);
+       if (glite_jpis_init_db(isctx) != 0) {
+               printf("[%d] %s: DB error: %s (%s)\n", getpid(), __FUNCTION__, ctx->error->desc, ctx->error->source);
+               exit(1);
+       }
+
+       printf("[%d] %s: waiting before feed requests...\n", getpid(), __FUNCTION__);
+       sleep(LAUNCH_TIME);
+       printf("[%d] %s: feeder slave started\n", getpid(), __FUNCTION__);
+       do {
+               switch (feed_caller(isctx, conf)) {
+                       case 1: break;
+                       case 0:
+                               sleep(REACTION_TIME);
+                               break;
+                       default:
+                               printf("[%d] %s: feed locking error, slave terminated\n", getpid(), __FUNCTION__);
+                               exit(1);
+               }
+       } while (1);
+
+       printf("[%d] %s: slave terminated\n", getpid(), __FUNCTION__);
+       exit(0);
+}
+#endif
+
+
 /* slave's init comes here */  
 int data_init(void **data)
 {
@@ -243,6 +287,7 @@ int data_init(void **data)
        printf("[%d] slave started\n",getpid());
        private->soap = soap_new();
 
+#if ONETIME_FEEDS
        /* ask PS server for data */
        do {
                switch (feed_caller(private->ctx, conf)) {
@@ -260,6 +305,10 @@ int data_init(void **data)
                                return -1;
                }
        } while (1);
+#else
+       *data = (void *) private;
+       return 0;
+#endif
 }
 
 
index 0600bdb..767a120 100644 (file)
@@ -121,7 +121,7 @@ int glite_jp_get_conf(int argc, char **argv, glite_jp_is_conf **configuration)
 
 void glite_jp_free_conf(glite_jp_is_conf *conf)
 {
-       size_t i, j, k;
+       size_t i, j;
        glite_jp_is_feed *feed;
 
        if (!conf) return;
@@ -132,10 +132,7 @@ void glite_jp_free_conf(glite_jp_is_conf *conf)
        if (conf->feeds) for (i = 0; conf->feeds[i]; i++) {
                feed = conf->feeds[i];
                free(feed->PS_URL);
-               for (j = 0; feed->query[j]; j++) {
-                       for (k = 0; feed->query[j][k].attr; k++) glite_jp_free_query_rec(&feed->query[j][k]);
-                       free(feed->query[j]);
-               }
+               for (j = 0; feed->query[j].attr; j++) glite_jp_free_query_rec(&feed->query[j]);
                free(feed->query);
                free(feed);
        }
index cfdf588..4c64a79 100644 (file)
@@ -24,7 +24,7 @@
 
 typedef struct _glite_jp_is_feed {
        char                    *PS_URL;        //URLs of Primary Storage servers
-       glite_jp_query_rec_t    **query;        // query to Primary Server (aka filter)
+       glite_jp_query_rec_t    *query;         // query to Primary Server (aka filter)
        int                     history,        // type of query
                                continuous;
        long int                uniqueid;       // internal ID
index d8bd345..798723f 100644 (file)
@@ -54,7 +54,7 @@
 #define COND_MAGIC 0x444E4F43
 
 
-static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size) UNUSED;
+static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t **queries, void *blob, size_t blob_size) UNUSED;
 
 
 static int is_indexed(glite_jp_is_conf *conf, const char *attr) {
@@ -151,14 +151,14 @@ static void *array_get(void **data, size_t data_len) {
 }
 
 
-static int glite_jpis_db_queries_serialize(glite_jpis_context_t isctx, void **blob, size_t *len, glite_jp_query_rec_t **queries) {
+static int glite_jpis_db_queries_serialize(glite_jpis_context_t isctx, void **blob, size_t *len, glite_jp_query_rec_t *queries) {
        size_t maxlen;
        glite_jp_query_rec_t *query;
        int ret;
        size_t datalen;
 
        if ((ret = array_init(blob, len, &maxlen, 1024)) != 0) return ret;
-       query = queries ? *queries : NULL;
+       query = queries;
        while(query && query->attr) {
                if ((ret = array_add_long(blob, len, &maxlen, COND_MAGIC)) != 0) goto fail;
                datalen = strlen(query->attr) + 1;
@@ -176,7 +176,7 @@ static int glite_jpis_db_queries_serialize(glite_jpis_context_t isctx, void **bl
                if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail;
                if (datalen)
                        if ((ret = array_add(blob, len, &maxlen, query->value2, datalen)) != 0) goto fail;
-               
+
                query++;
        }
 
@@ -188,20 +188,20 @@ fail:
 }
 
 
-static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size) {
+static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t **queries, void *blob, size_t blob_size) {
        size_t maxlen, len, datalen;
        void *blob_ptr, *blob_end;
        int ret;
        uint32_t l;
-       glite_jp_query_rec_t *query;
+       glite_jp_query_rec_t query;
        int i;
 
-       if ((ret = array_init((void **)queries, &len, &maxlen, 512)) != 0) return ret;
+       if ((ret = array_init((void *)queries, &len, &maxlen, 512)) != 0) return ret;
        blob_ptr = blob;
        blob_end = (char *)blob + blob_size;
        while (blob_end > blob_ptr) {
                ret = ENOMEM;
-               if ((query = calloc(sizeof(*query), 1)) == NULL) goto fail;
+               memset(&query, 0, sizeof query);
                l = array_get_long(&blob_ptr);
                if (l != COND_MAGIC) {
                        lprintf("blob=%p, blob_ptr=%p, 0x%08" PRIX32 "\n", blob, blob_ptr, l);
@@ -211,48 +211,44 @@ static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, vo
 
                datalen = array_get_long(&blob_ptr);
                if (datalen) {
-                       if ((query->attr = malloc(datalen)) == NULL) goto fail_query;
-                       memcpy(query->attr, array_get(&blob_ptr, datalen), datalen);
-               } else query->attr = NULL;
+                       if ((query.attr = malloc(datalen)) == NULL) goto fail_query;
+                       memcpy(query.attr, array_get(&blob_ptr, datalen), datalen);
+               } else query.attr = NULL;
 
-               query->op = array_get_long(&blob_ptr);
-               query->binary = array_get_long(&blob_ptr);
+               query.op = array_get_long(&blob_ptr);
+               query.binary = array_get_long(&blob_ptr);
 
                datalen = array_get_long(&blob_ptr);
                if (datalen) {
-                       if ((query->value = malloc(datalen)) == NULL) goto fail_query;
-                       memcpy(query->value, array_get(&blob_ptr, datalen), datalen);
-               } else query->value = NULL;
-               query->size = datalen;
+                       if ((query.value = malloc(datalen)) == NULL) goto fail_query;
+                       memcpy(query.value, array_get(&blob_ptr, datalen), datalen);
+               } else query.value = NULL;
+               query.size = datalen;
 
                datalen = array_get_long(&blob_ptr);
                if (datalen) {
-                       if ((query->value2 = malloc(datalen)) == NULL) goto fail_query;
-                       memcpy(query->value2, array_get(&blob_ptr, datalen), datalen);
-               } else query->value2 = NULL;
-               query->size2 = datalen;
+                       if ((query.value2 = malloc(datalen)) == NULL) goto fail_query;
+                       memcpy(query.value2, array_get(&blob_ptr, datalen), datalen);
+               } else query.value2 = NULL;
+               query.size2 = datalen;
 
-               if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail_query;
+               if ((ret = array_add((void *)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail_query;
        }
        assert(blob_end == blob_ptr);
 
-       query = NULL;
-       if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail;
+       memset(&query, 0, sizeof query);
+       if ((ret = array_add((void *)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail;
 
        return 0;
 
 fail_query:
-       free(query);
 fail:
        i = 0;
-       query = (*queries)[i];
-       while (query && query->attr) {
-               free(query->attr);
-               free(query->value);
-               free(query->value2);
-               free(query);
+       while ((*queries)[i].attr) {
+               free((*queries)[i].attr);
+               free((*queries)[i].value);
+               free((*queries)[i].value2);
                i++;
-               query = (*queries)[i];
        }
        free(*queries);
        return ret;
index 2c77271..0f9eed2 100644 (file)
@@ -1,3 +1,4 @@
+#include <sys/time.h>
 #include <stdio.h>
 #include <fcntl.h>
 #include <assert.h>
@@ -58,10 +59,13 @@ int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqu
        glite_gsplugin_Context                  plugin_ctx;
        glite_jp_error_t err;
        char *src, *desc = NULL;
+       // preventive very long timeout
+       static const struct timeval to = {tv_sec: 7200, tv_usec: 0};
 
        lprintf("(%ld) for %s called\n", uniqueid, dest);
 
        glite_gsplugin_init_context(&plugin_ctx);
+       glite_gsplugin_set_timeout(plugin_ctx, &to);
        if (ctx->conf->server_key) plugin_ctx->key_filename = strdup(ctx->conf->server_key);
        if (ctx->conf->server_cert) plugin_ctx->cert_filename = strdup(ctx->conf->server_cert);
        
@@ -80,11 +84,11 @@ int MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqu
 
        if ((dest_index = find_dest_index(conf, uniqueid)) < 0) goto err;
 
-       for (i=0; conf->feeds[dest_index]->query[i]; i++);
+       for (i=0; conf->feeds[dest_index]->query[i].attr; i++);
        GLITE_SECURITY_GSOAP_LIST_CREATE(soap, &in, conditions, struct jptype__primaryQuery, i);
 
-       for (i=0; conf->feeds[dest_index]->query[i]; i++) {
-               if (glite_jpis_QueryCondToSoap(soap, conf->feeds[dest_index]->query[i], 
+       for (i=0; conf->feeds[dest_index]->query[i].attr; i++) {
+               if (glite_jpis_QueryCondToSoap(soap, &conf->feeds[dest_index]->query[i], 
                                GLITE_SECURITY_GSOAP_LIST_GET(in.conditions, i)) != SOAP_OK) {
                        err.code = EINVAL;
                        err.desc = "error during conds conversion";
index 42afedb..17e315e 100644 (file)
@@ -75,6 +75,7 @@ static int SoapToQueryRecordVal(
 }
 
 
+#if 0
 static int SoapToQueryCond(
         struct jptype__indexQuery      *in,
         glite_jp_query_rec_t           **out)
@@ -143,17 +144,18 @@ int glite_jpis_SoapToQueryConds(
        
        return 0;
 }
+#endif
 
 
 static int SoapToPrimaryQueryCond(
         struct jptype__primaryQuery    *in,
-        glite_jp_query_rec_t           **out)
+        glite_jp_query_rec_t           *out)
 {
        glite_jp_query_rec_t    *qr;    
 
 
        assert(in && in->attr); assert(out);
-       *out = qr = calloc(2, sizeof(*qr));     
+       qr = out;       
 
        qr[0].attr = strdup(in->attr);
        glite_jpis_SoapToQueryOp(in->op, &(qr[0].op));
@@ -169,7 +171,6 @@ static int SoapToPrimaryQueryCond(
        default:
                if ( SoapToQueryRecordVal(in->value, &(qr[0].binary), 
                                &(qr[0].size),  &(qr[0].value)) ) {
-                       *out = NULL;
                        return 1;
                }
                break;
@@ -191,16 +192,16 @@ static int SoapToPrimaryQueryCond(
 int glite_jpis_SoapToPrimaryQueryConds(
        int                             size,
         GLITE_SECURITY_GSOAP_LIST_TYPE(jptype, primaryQuery)   in,
-        glite_jp_query_rec_t           ***out)
+        glite_jp_query_rec_t           **out)
 {
-       glite_jp_query_rec_t    **qr;
+       glite_jp_query_rec_t    *qr;
        int                     i;
 
        assert(in || !size); assert(out);
         qr = calloc(size+1, sizeof(*qr));
 
        for (i=0; i<size; i++) {
-               if ( SoapToPrimaryQueryCond(GLITE_SECURITY_GSOAP_LIST_GET(in, i), &(qr[i])) ) {
+               if ( SoapToPrimaryQueryCond(GLITE_SECURITY_GSOAP_LIST_GET(in, i), &qr[i]) ) {
                        *out = NULL;
                        free(qr);
                        return 1;
index acada7a..0bb0430 100644 (file)
@@ -5,7 +5,9 @@
 
 void glite_jpis_SoapToQueryOp(const enum jptype__queryOp in, glite_jp_queryop_t *out);
 void glite_jpis_SoapToAttrOrig(const enum jptype__attrOrig *in, glite_jp_attr_orig_t *out);
+#if 0
 int glite_jpis_SoapToQueryConds(int size, struct jptype__indexQuery **in, glite_jp_query_rec_t ***out);
-int glite_jpis_SoapToPrimaryQueryConds(int size, GLITE_SECURITY_GSOAP_LIST_TYPE(jptype, primaryQuery) in, glite_jp_query_rec_t ***out);
+#endif
+int glite_jpis_SoapToPrimaryQueryConds(int size, GLITE_SECURITY_GSOAP_LIST_TYPE(jptype, primaryQuery) in, glite_jp_query_rec_t **out);
 
 #endif