Finalize init, DB locking functions. Particular functions works good.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 27 Sep 2005 17:07:44 +0000 (17:07 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 27 Sep 2005 17:07:44 +0000 (17:07 +0000)
org.glite.jp.index/examples/jpis-db-internal.c
org.glite.jp.index/src/bones_server.c
org.glite.jp.index/src/db_ops.c
org.glite.jp.index/src/db_ops.h
org.glite.jp.index/src/soap_ps_calls.c
org.glite.jp.index/src/soap_ps_calls.h

index dd88972..b890445 100644 (file)
-#include <stddef.h>
+#include <errno.h>
+#include <string.h>
+#include <stdlib.h>
+#include <stdio.h>
 
 #include <glite/jp/db.h>
 
 #include "db_ops.h"
 
 
-#define CS "jpis/@localhost:jpis1"
-
-
 static print_err(glite_jp_context_t ctx) {
        glite_jp_error_t *e;
 
        e = ctx->error;
        while(e) {
-               printf("%s(%s)\n", e->desc, e->source);
+               printf("%s (%s)\n", e->desc, e->source);
                e = e->reason;
        }
        printf("\n");
 }
 
 
+int glite_jpis_db_queries_serialize(void **blob, size_t *len, glite_jp_query_rec_t **queries);
+int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size);
+
 int main(int argc, char *argv[]) {
-       glite_jp_context_t ctx;
+#if 1
+       glite_jp_context_t jpctx;
        glite_jp_is_conf *conf;
+       glite_jpis_context_t isctx;
+       int ret;
+       long int uniqueid;
+       char *ps, *feedid;
 
-       glite_jp_init_context(&ctx);
-       if (glite_jp_db_connect(ctx, CS) != 0) goto fail;
+       glite_jp_init_context(&jpctx);
+       if (glite_jpis_init_context(&isctx, jpctx) != 0) goto fail;
 
        printf("dropping...\n");
-       if (glite_jpis_dropDatabase(ctx) != 0) goto faildb;
+       if (glite_jpis_dropDatabase(jpctx) != 0) goto faildb;
 
        printf("initializing...\n");
        if (glite_jp_get_conf(argc, argv, NULL, &conf) != 0) goto faildb;
-       if (glite_jpis_initDatabase(ctx, conf) != 0) goto failconf;
+       if (glite_jpis_initDatabase(jpctx, conf) != 0) goto failconf;
+
+       printf("locking...\n");
+       do {
+               if ((ret = glite_jpis_lockUninitializedFeed(isctx, &uniqueid, &ps)) == ENOLCK) goto faildb;
+               if (ret == 0) {
+                       printf("locked: uniqueid=%li, ps=%s\n", uniqueid, ps);
+                       free(ps);
+
+                       asprintf(&feedid, "feed://%d", uniqueid + 3);
+                       if (glite_jpis_initFeed(isctx, uniqueid, feedid, (time_t)10000) != 0) {
+                               free(feedid);
+                               goto faildb;
+                       }
+                       free(feedid);
+
+                       if (glite_jpis_unlockFeed(isctx, uniqueid) != 0) goto faildb;
+               }
+       } while (ret == 0);
 
        glite_jp_free_conf(conf);
-       glite_jp_db_close(ctx);
-       glite_jp_free_context(ctx);
+       glite_jpis_free_context(isctx);
+       glite_jp_free_context(jpctx);
 
        return 0;
 
 failconf:
        glite_jp_free_conf(conf);
 faildb:
-       glite_jp_db_close(ctx);
+       glite_jpis_free_context(isctx);
 fail:
        printf("failed\n");
-       print_err(ctx);
+       print_err(jpctx);
+       glite_jp_free_context(jpctx);
+
+       return 1;
+#endif
+#if 0
+       glite_jp_context_t ctx;
+       glite_jp_is_conf *conf;
+       void *blob;
+       size_t len;
+       int ret, i;
+       glite_jp_query_rec_t **queries;
+
+       ret = 0;
+       glite_jp_init_context(&ctx);
+
+       if (glite_jp_get_conf(argc, argv, NULL, &conf) != 0) goto fail_ctx;
+       if ((ret = glite_jpis_db_queries_serialize(&blob, &len, conf->feeds[0]->query)) != 0) goto fail;
+
+       if (write(1, blob, len) != len) {
+               ret = errno;
+               free(blob);
+               goto fail;
+       }
+
+       if ((ret = glite_jpis_db_queries_deserialize(&queries, blob, len)) != 0) goto fail_blob;
+       i = 0;
+       while (queries[i] && queries[i]->attr) {
+               printf("query: attr=%s, op=%d, value=%s, value2=%s, bin=%d\n", queries[i]->attr, queries[i]->op, queries[i]->value, queries[i]->value2, queries[i]->binary);
+               free(queries[i]->attr);
+               free(queries[i]->value);
+               free(queries[i]->value2);
+               free(queries[i]);
+               i++;
+       }
+       free(queries);
+
+       free(blob);
+       glite_jp_free_context(ctx);
+       return 0;
+
+fail_blob:
+       free(blob);
+fail:
+       fprintf(stderr, "fail: %s\n", strerror(ret));
+fail_ctx:
        glite_jp_free_context(ctx);
        return 1;
+#endif
 }
index 5315b04..dbe56a6 100644 (file)
@@ -34,7 +34,7 @@ extern SOAP_NMAC struct Namespace jpis__namespaces[],jpps__namespaces[];
 extern SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} };
 // namespaces[] not used here, but need to prevent linker to complain...
 
-extern void MyFeedIndex(glite_jp_context_t ctx, glite_jp_is_conf *conf, char *dest);
+extern void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest);
 
 static int newconn(int,struct timeval *,void *);
 static int request(int,struct timeval *,void *);
@@ -46,6 +46,11 @@ static struct glite_srvbones_service stab = {
        "JP Index Server", -1, newconn, request, reject, disconn
 };
 
+typedef struct {
+       glite_jpis_context_t ctx;
+       struct soap *soap;
+} slave_data_t;
+
 static time_t          cert_mtime;
 static char            *server_cert, *server_key, *cadir;
 static gss_cred_id_t   mycred = GSS_C_NO_CREDENTIAL;
@@ -65,9 +70,15 @@ int main(int argc, char *argv[])
        edg_wll_GssStatus       gss_code;
        struct sockaddr_in      a;
        char                    *config_file;
+       glite_jpis_context_t    isctx;
 
 
        glite_jp_init_context(&ctx);
+       if (glite_jpis_init_context(&isctx, ctx) != 0) {
+               fprintf(stderr, "Connect DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source);
+               glite_jp_free_context(ctx);
+               return 1;
+       }
 
        /* Read config options/file */
        // XXX: need add something meaningfull to src/conf.c !
@@ -80,6 +91,21 @@ int main(int argc, char *argv[])
        */
        
 
+       if (glite_jpis_dropDatabase(ctx) != 0) {
+               fprintf(stderr, "Drop DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source);
+               glite_jpis_free_context(isctx);
+               glite_jp_free_context(ctx);
+               return 1;
+       }
+
+       if (glite_jpis_initDatabase(ctx, conf) != 0) {
+               fprintf(stderr, "Init DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source);
+               glite_jpis_free_context(isctx);
+               glite_jp_free_context(ctx);
+               return 1;
+       }
+
+
 #if GSOAP_VERSION <= 20602
        for (i=0; jpis__namespaces[i].id && strcmp(jpis__namespaces[i].id,"ns1"); i++);
 #else
@@ -139,6 +165,8 @@ int main(int argc, char *argv[])
 
 
        glite_jp_free_conf(conf);
+       glite_jpis_free_context(isctx);
+       glite_jp_free_context(ctx);
 
        return 0;
 }
@@ -146,15 +174,21 @@ int main(int argc, char *argv[])
 /* slave's init comes here */  
 static int data_init(void **data)
 {
-       char    *PS_URL = NULL;
-
-
-       *data = (void *) soap_new();
+       slave_data_t    *private;
+       char            *PS_URL = NULL;
+       long int        uniqueid;
+
+       private = calloc(sizeof(*private), 1);
+       if (glite_jpis_init_context(&private->ctx, ctx) != 0) {
+               printf("[%d] slave_init(): DB error: %s (%s)\n",getpid(),ctx->error->desc,ctx->error->source);
+               return -1;
+       }
+       private->soap = soap_new();
        printf("[%d] slave started\n",getpid());
 
        /* ask PS server for data */
        do {
-               switch (glite_jpis_lockUninitializedFeed(ctx,&PS_URL)) {
+               switch (glite_jpis_lockUninitializedFeed(private->ctx,&uniqueid,&PS_URL)) {
                        case ENOENT:
                                // no more feeds to initialize
                                return 0;
@@ -162,11 +196,12 @@ static int data_init(void **data)
                                // error during locking
                                printf("[%d] slave_init(): Locking error.\n",getpid());
                                free(PS_URL);
+                               glite_jpis_free_context(private->ctx);
                                return -1;
                        default:
                                // contact PS server, ask for data, save feedId and expiration
                                // to DB and unlock feed
-                               MyFeedIndex(ctx, conf, PS_URL);
+                               MyFeedIndex(private->ctx, conf, uniqueid, PS_URL);
                                free(PS_URL);
                                PS_URL = NULL;
                                break;
@@ -176,7 +211,9 @@ static int data_init(void **data)
 
 static int newconn(int conn,struct timeval *to,void *data)
 {
-       struct soap             *soap = (struct soap *) data;
+       slave_data_t     *private = (slave_data_t *)data;
+       struct soap             *soap = private->soap;
+       glite_jp_context_t      ctx = private->ctx->jpctx;
        glite_gsplugin_Context  plugin_ctx;
 
        gss_cred_id_t           newcred = GSS_C_NO_CREDENTIAL;
@@ -189,7 +226,6 @@ static int newconn(int conn,struct timeval *to,void *data)
 
        soap_init2(soap,SOAP_IO_KEEPALIVE,SOAP_IO_KEEPALIVE);
        soap_set_namespaces(soap,jpis__namespaces);
-       soap->user = (void *) ctx; /* XXX: one instance per slave */
 
 /* not yet: client to JP Storage Server
  * probably wil come to other place, just not forget it....
@@ -252,6 +288,7 @@ static int newconn(int conn,struct timeval *to,void *data)
        return 0;
 
 cleanup:
+       glite_jpis_free_context(private->ctx);
        glite_gsplugin_free_context(plugin_ctx);
        soap_end(soap);
 
@@ -260,8 +297,9 @@ cleanup:
 
 static int request(int conn,struct timeval *to,void *data)
 {
-       struct soap             *soap = data;
-       glite_jp_context_t      ctx = soap->user;
+       slave_data_t            *private = (slave_data_t *)data;
+       struct soap             *soap = private->soap;
+       glite_jp_context_t      ctx = private->ctx->jpctx;
 
        glite_gsplugin_set_timeout(glite_gsplugin_get_context(soap),to);
 
@@ -311,7 +349,10 @@ static int reject(int conn)
 
 static int disconn(int conn,struct timeval *to,void *data)
 {
-       struct soap     *soap = (struct soap *) data;
+       slave_data_t            *private = (slave_data_t *)data;
+       struct soap             *soap = private->soap;
+
+       glite_jpis_free_context(private->ctx);
        soap_end(soap); // clean up everything and close socket
        
        return 0;
index 164f402..7cd56a5 100644 (file)
@@ -2,6 +2,10 @@
 
 #include <time.h>
 #include <errno.h>
+#include <stdlib.h>
+#include <inttypes.h>
+#include <string.h>
+#include <assert.h>
 
 #include <glite/jp/types.h>
 #include <glite/jp/context.h>
 #define lprintf
 //#define lprintf printf
 
+#define WORD_SWAP(X) ((((X) >> 8) & 0xFF) | (((X) & 0xFF) << 8))
+#define LONG_SWAP(X) (WORD_SWAP(((X) >> 16) & 0xFFFF) | ((WORD_SWAP(X) & 0xFFFF) << 16))
+#if __BYTE_ORDER == __LITTLE_ENDIAN
+#define LONG_LE(X) (X)
+#else
+#define LONG_LE(X) LONG_SWAP(X)
+#endif
+
+#define COND_MAGIC 0x444E4F43
 
 static is_indexed(glite_jp_is_conf *conf, const char *attr) {
        size_t i;
@@ -40,6 +53,191 @@ static is_indexed(glite_jp_is_conf *conf, const char *attr) {
 }
 
 
+static size_t db_arg2_length(glite_jp_query_rec_t *query) {
+       size_t len;
+
+       assert(query->op > GLITE_JP_QUERYOP_UNDEF && query->op <= GLITE_JP_QUERYOP__LAST);
+       len = 0;
+       switch (query->op) {
+               case GLITE_JP_QUERYOP_WITHIN:
+                       len = query->binary ? query->size2 : strlen(query->value2) + 1;
+               case GLITE_JP_QUERYOP_EQUAL:
+               case GLITE_JP_QUERYOP_UNEQUAL:
+               case GLITE_JP_QUERYOP_LESS:
+               case GLITE_JP_QUERYOP_GREATER:
+               case GLITE_JP_QUERYOP_EXISTS:
+                       len = 0;
+       }
+
+       return len;
+}
+
+
+static int array_init(void **data, size_t *len, size_t *maxlen, size_t initial_len) {
+       *len = 0;
+       if ((*data = malloc(initial_len)) != NULL) {
+               *maxlen = initial_len;
+               return 0;
+       } else {
+               *maxlen = 0;
+               return ENOMEM;
+       }
+}
+
+
+static int array_add(void **data, size_t *len, size_t *maxlen, void *new_data, size_t new_data_len) {
+       void *tmp;
+       size_t ptr;
+
+       ptr = *len;
+       (*len) += new_data_len;
+       while (*len > *maxlen) {
+               (*maxlen) *= 2;
+               if ((tmp = realloc(*data, *maxlen)) == NULL) return ENOMEM;
+               *data = tmp;
+       }
+       memcpy(((char *)(*data)) + ptr, new_data, new_data_len);
+
+       return 0;
+}
+
+
+static int array_add_long(void **data, size_t *len, size_t *maxlen, uint32_t l) {
+       uint32_t lel;
+
+       lel = LONG_LE(l);
+       return array_add(data, len, maxlen, &lel, sizeof(uint32_t));
+}
+
+
+static uint32_t array_get_long(void **data) {
+       uint32_t *lel;
+
+       lel = (uint32_t *)*data;
+       *data = ((char *)*data) + sizeof(uint32_t);
+
+       return LONG_LE(*lel);
+}
+
+
+static void *array_get(void **data, size_t data_len) {
+       void *res;
+
+       res = *data;
+       *data = ((char *)*data) + data_len;
+
+       return res;
+}
+
+
+static int glite_jpis_db_queries_serialize(void **blob, size_t *len, glite_jp_query_rec_t **queries) {
+       size_t maxlen;
+       glite_jp_query_rec_t *query;
+       int ret;
+       size_t datalen;
+
+       if ((ret = array_init(blob, len, &maxlen, 1024)) != 0) return ret;
+       query = *queries;
+       while(query && query->attr) {
+               if ((ret = array_add_long(blob, len, &maxlen, COND_MAGIC)) != 0) goto fail;
+               datalen = strlen(query->attr) + 1;
+               if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail;
+               if ((ret = array_add(blob, len, &maxlen, query->attr, datalen)) != 0) goto fail;
+               if ((ret = array_add_long(blob, len, &maxlen, query->op)) != 0) goto fail;
+               if ((ret = array_add_long(blob, len, &maxlen, query->binary ? 1 : 0)) != 0) goto fail;
+
+               datalen = query->binary ? query->size : strlen(query->value) + 1;
+               if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail;
+               if (datalen)
+                       if ((ret = array_add(blob, len, &maxlen, query->value, datalen)) != 0) goto fail;
+               
+               datalen = db_arg2_length(query);
+               if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail;
+               if (datalen)
+                       if ((ret = array_add(blob, len, &maxlen, query->value2, datalen)) != 0) goto fail;
+               
+               query++;
+       }
+
+       return 0;
+fail:
+       free(*blob);
+       *len = 0;
+       return ret;
+}
+
+
+static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size) {
+       size_t maxlen, len, datalen;
+       void *blob_ptr, *blob_end;
+       int ret;
+       uint32_t l;
+       glite_jp_query_rec_t *query;
+       int i;
+
+       if ((ret = array_init((void **)queries, &len, &maxlen, 512)) != 0) return ret;
+       blob_ptr = blob;
+       blob_end = (char *)blob + blob_size;
+       while (blob_end > blob_ptr) {
+               ret = ENOMEM;
+               if ((query = calloc(sizeof(*query), 1)) == NULL) goto fail;
+               l = array_get_long(&blob_ptr);
+               if (l != COND_MAGIC) {
+                       printf("blob=%p, blob_ptr=%p, 0x%08" PRIX32 "\n", blob, blob_ptr, l);
+                       ret = EINVAL;
+                       goto fail_query;
+               }
+
+               datalen = array_get_long(&blob_ptr);
+               if (datalen) {
+                       if ((query->attr = malloc(datalen)) == NULL) goto fail_query;
+                       memcpy(query->attr, array_get(&blob_ptr, datalen), datalen);
+               } else query->attr = NULL;
+
+               query->op = array_get_long(&blob_ptr);
+               query->binary = array_get_long(&blob_ptr);
+
+               datalen = array_get_long(&blob_ptr);
+               if (datalen) {
+                       if ((query->value = malloc(datalen)) == NULL) goto fail_query;
+                       memcpy(query->value, array_get(&blob_ptr, datalen), datalen);
+               } else query->value = NULL;
+               query->size = datalen;
+
+               datalen = array_get_long(&blob_ptr);
+               if (datalen) {
+                       if ((query->value2 = malloc(datalen)) == NULL) goto fail_query;
+                       memcpy(query->value2, array_get(&blob_ptr, datalen), datalen);
+               } else query->value2 = NULL;
+               query->size2 = datalen;
+
+               if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail_query;
+       }
+       assert(blob_end == blob_ptr);
+
+       query = NULL;
+       if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail;
+
+       return 0;
+
+fail_query:
+       free(query);
+fail:
+       i = 0;
+       query = (*queries)[i];
+       while (query && query->attr) {
+               free(query->attr);
+               free(query->value);
+               free(query->value2);
+               free(query);
+               i++;
+               query = (*queries)[i];
+       }
+       free(*queries);
+       return ret;
+}
+
+
 /* Init the database. 
  *
  * \retval 0        OK
@@ -52,11 +250,13 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) {
        const char *type_index, *type_full;
        size_t i;
        MYSQL_BIND param[4];
-       unsigned long attrid_len, name_len, type_len, source_len;
-       char attrid[33], name[256], type[33], source[256];
+       unsigned long attrid_len, name_len, type_len, source_len, dbconds_len;
+       char attrid[33], name[256], type[33], source[256], dbconds[1024];
        int indexed, state, locked;
+       size_t conds_len;
        char sql[512];
        glite_jp_is_feed **feeds;
+       void *conds;
 
        glite_jp_db_assign_param(&param[0], MYSQL_TYPE_VAR_STRING, attrid, &attrid_len);
        glite_jp_db_assign_param(&param[1], MYSQL_TYPE_VAR_STRING, name, &name_len);
@@ -101,7 +301,8 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) {
        glite_jp_db_assign_param(&param[0], MYSQL_TYPE_LONG, &state);
        glite_jp_db_assign_param(&param[1], MYSQL_TYPE_LONG, &locked);
        glite_jp_db_assign_param(&param[2], MYSQL_TYPE_VAR_STRING, source, &source_len);
-       if (glite_jp_db_prepare(ctx, "INSERT INTO feeds (state, locked, source) VALUES (?, ?, ?)", &stmt, param, NULL) != 0) goto fail;
+       glite_jp_db_assign_param(&param[3], MYSQL_TYPE_MEDIUM_BLOB, dbconds, &dbconds_len);
+       if (glite_jp_db_prepare(ctx, "INSERT INTO feeds (state, locked, source, condition) VALUES (?, ?, ?, ?)", &stmt, param, NULL) != 0) goto fail;
        feeds = conf->feeds;
        i = 0;
        memset(source, 0, sizeof(source));
@@ -111,7 +312,12 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) {
                locked = 0;
                strncpy(source, feeds[i]->PS_URL, sizeof(source) - 1);
                source_len = strlen(source);
-               if (glite_jp_db_execute(stmt) == -1) goto fail_stmt;
+               assert(glite_jpis_db_queries_serialize(&conds, &conds_len, feeds[i]->query) == 0);
+               assert(conds_len <= sizeof(dbconds));
+               dbconds_len = conds_len;
+               memcpy(dbconds, conds, conds_len);
+               free(conds);
+               if (glite_jp_db_execute(stmt) == -1) goto fail_conds;
 
                i++;
        }
@@ -119,6 +325,8 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) {
 
        return 0;
 
+fail_conds:
+       free(conds);
 fail_stmt:
        glite_jp_db_freestmt(&stmt);
 fail:
@@ -164,6 +372,61 @@ fail:
 }
 
 
+int glite_jpis_init_context(glite_jpis_context_t *isctx, glite_jp_context_t jpctx) {
+       int ret;
+       MYSQL_BIND myparam[3];
+       MYSQL_BIND myres[2];
+
+       *isctx = calloc(sizeof(**isctx), 1);
+       
+       (*isctx)->jpctx = jpctx;
+       if ((ret = glite_jp_db_connect(jpctx, GLITE_JP_IS_DEFAULTCS)) != 0) goto fail;
+
+       // sql command: select an uninitialized unlocked feed
+       glite_jp_db_assign_result(&myres[0], MYSQL_TYPE_LONG, NULL, &((*isctx)->param_uniqueid));
+       glite_jp_db_assign_result(&myres[1], MYSQL_TYPE_VAR_STRING, NULL, (*isctx)->param_ps, sizeof((*isctx)->param_ps), &(*isctx)->param_ps_len);
+       if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_feed_stmt, NULL, myres)) != 0) goto fail_connect;
+
+       // sql command: lock the feed (via uniqueid)
+       glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
+       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=1 WHERE (locked = 0) AND (uniqueid = ?)", &(*isctx)->lock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd;
+
+       // sql command: assign the feed (via uniqueid)
+       glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_VAR_STRING, (*isctx)->param_feedid, &(*isctx)->param_feedid_len);
+       glite_jp_db_assign_param(&myparam[1], MYSQL_TYPE_DATETIME, &(*isctx)->param_expires);
+       glite_jp_db_assign_param(&myparam[2], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
+       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET feedid=?, expires=?  WHERE (uniqueid=?)", &(*isctx)->init_feed_stmt, myparam, NULL)) != 0) goto fail_cmd2;
+
+       // sql command: unlock the feed (via uniqueid)
+       glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
+       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=0  WHERE (uniqueid=?)", &(*isctx)->unlock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd3;
+
+       return 0;
+
+fail_cmd3:
+       glite_jp_db_freestmt(&(*isctx)->init_feed_stmt);
+fail_cmd2:
+       glite_jp_db_freestmt(&(*isctx)->lock_feed_stmt);
+fail_cmd:
+       glite_jp_db_freestmt(&(*isctx)->select_feed_stmt);
+fail_connect:
+       glite_jp_db_close((*isctx)->jpctx);
+fail:
+       free(*isctx);
+       return ret;
+}
+
+
+int glite_jpis_free_context(glite_jpis_context_t ctx) {
+       glite_jp_db_freestmt(&ctx->select_feed_stmt);
+       glite_jp_db_freestmt(&ctx->lock_feed_stmt);
+       glite_jp_db_freestmt(&ctx->init_feed_stmt);
+       glite_jp_db_freestmt(&ctx->unlock_feed_stmt);
+       glite_jp_db_close(ctx->jpctx);
+       free(ctx);
+}
+
+
 /* Find first unitialized feed, lock it and return URL of corresponding PS 
  *
  * Return value:
@@ -171,21 +434,58 @@ fail:
  *      ENOENT - no more feeds to initialize
  *      ENOLCK - error during locking */
 
-int glite_jpis_lockUninitializedFeed(glite_jp_context_t ctx, char **PS_URL)
+int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniqueid, char **PS_URL)
 {
-       return ENOENT;
+       int ret;
+
+       do {
+               switch (glite_jp_db_execute(ctx->select_feed_stmt)) {
+               case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK;
+               case 0: lprintf("no more uninit. feeds unlocked\n"); return ENOENT;
+               default: break;
+               }
+               if (glite_jp_db_fetch(ctx->select_feed_stmt) != 0) return ENOLCK;
+               lprintf("selected uninit. feed %lu\n", ctx->param_uniqueid);
+
+               ret = glite_jp_db_execute(ctx->lock_feed_stmt);
+               lprintf("locked %d feeds (uniqueid=%lu)\n", ret, ctx->param_uniqueid);
+       } while (ret != 1);
+
+       *uniqueid = ctx->param_uniqueid;
+       if (PS_URL) *PS_URL = strdup(ctx->param_ps);
+
+       return 0;
 }
 
 
 /* Store feed ID and expiration time returned by PS for locked feed. */
 
-void glite_jpis_feedInit(glite_jp_context_t ctx, char *PS_URL, char *feedId, time_t feedExpires)
+int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires)
 {
+       int ret;
+
+       memset(ctx->param_feedid, 0, sizeof(ctx->param_feedid));
+       memset(&ctx->param_expires, 0, sizeof(ctx->param_expires));
+       strncpy(ctx->param_feedid, feedId, sizeof(ctx->param_feedid) - 1);
+       ctx->param_feedid_len = strlen(ctx->param_feedid) + 1;
+       glite_jp_db_assign_time(&ctx->param_expires, feedExpires);
+       ctx->param_uniqueid = uniqueid;
+
+       ret = glite_jp_db_execute(ctx->init_feed_stmt);
+       lprintf("initializing feed, uniqueid=%li, result=%d\n", uniqueid, ret);
+
+       return ret == 1 ? 0 : ENOLCK;
 }
 
+
 /* Unlock given feed */
 
-void glite_jpis_unlockFeed(glite_jp_context_t ctx, char *PS_URL)
-{
-}
+int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid) {
+       int ret;
 
+       ctx->param_uniqueid = uniqueid;
+       ret = glite_jp_db_execute(ctx->unlock_feed_stmt);
+       lprintf("unlocking feed, uniqueid=%li, result=%d\n", uniqueid, ret);
+
+       return ret == 1 ? 0 : ENOLCK;
+}
index 6c8b5f1..a93cc8e 100644 (file)
@@ -5,17 +5,34 @@
 
 #include <glite/jp/types.h>
 #include <glite/jp/context.h>
+#include <glite/jp/db.h>
 #include "conf.h"
 
 
+#define GLITE_JP_IS_DEFAULTCS "jpis/@localhost:jpis1"
+
 #define GLITE_JP_IS_STATE_HIST 1
 #define GLITE_JP_IS_STATE_CONT 2
 
 
+typedef struct _glite_jpis_context {
+       glite_jp_context_t jpctx;
+       glite_jp_db_stmt_t select_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt;
+       long int param_uniqueid;
+       char param_feedid[33], param_ps[256];
+       unsigned long param_ps_len, param_feedid_len;
+       MYSQL_TIME param_expires;
+} *glite_jpis_context_t;
+
+
 int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf);
 int glite_jpis_dropDatabase(glite_jp_context_t ctx);
-int glite_jpis_lockUninitializedFeed(glite_jp_context_t ctx, char **PS_URL);
-void glite_jpis_feedInit(glite_jp_context_t ctx, char *PS_URL, char *feedId, time_t feedExpires);
-void glite_jpis_unlockFeed(glite_jp_context_t ctx, char *PS_URL);
+
+int glite_jpis_init_context(glite_jpis_context_t *isctx, glite_jp_context_t jpctx);
+int glite_jpis_free_context(glite_jpis_context_t ctx);
+
+int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uinqueid, char **PS_URL);
+int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires);
+int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid);
 
 #endif
index b3b1d55..8eae0e5 100644 (file)
@@ -77,7 +77,7 @@ static int find_dest_index(glite_jp_is_conf *conf, char *dest)
 
 
 // call PS FeedIndex for a given destination
-void MyFeedIndex(glite_jp_context_t ctx, glite_jp_is_conf *conf, char *dest)
+void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest)
 {
        struct _jpelem__FeedIndex               in;
        struct _jpelem__FeedIndexResponse       out;
@@ -119,13 +119,13 @@ printf("MyFeedIndex for %s called\n", dest);
        //if (!check_fault(soap,soap_call_jpsrv___FeedIndex(soap,dest,"",
        if (soap_call___jpsrv__FeedIndex(soap,dest,"", &in, &out)) {
                printf("soap_call___jpsrv__FeedIndex() returned error\n");
-               glite_jpis_unlockFeed(ctx, dest);
+               glite_jpis_unlockFeed(ctx, uniqueid);
                goto err;
        }
        else {
                printf("FeedId: %s\nExpires: %s\n",out.feedId,ctime(&out.feedExpires));
-               glite_jpis_feedInit(ctx, dest, out.feedId, out.feedExpires);
-               glite_jpis_unlockFeed(ctx, dest);
+               glite_jpis_initFeed(ctx, uniqueid, out.feedId, out.feedExpires);
+               glite_jpis_unlockFeed(ctx, uniqueid);
        }
        
 err:
index bcf6bbb..cb2c992 100644 (file)
@@ -5,6 +5,6 @@
 
 #include "conf.h"
 
-void MyFeedIndex(glite_jp_context_t ctx, glite_jp_is_conf *conf, char *dest);
+void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest);
 
 #endif