From: František Dvořák Date: Tue, 27 Sep 2005 17:07:44 +0000 (+0000) Subject: Finalize init, DB locking functions. Particular functions works good. X-Git-Tag: gridsite-core_R_1_1_12~80 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=2d275d00bcddaf2c8cf0264d39c7f9d3a2345d44;p=jra1mw.git Finalize init, DB locking functions. Particular functions works good. --- diff --git a/org.glite.jp.index/examples/jpis-db-internal.c b/org.glite.jp.index/examples/jpis-db-internal.c index dd88972..b890445 100644 --- a/org.glite.jp.index/examples/jpis-db-internal.c +++ b/org.glite.jp.index/examples/jpis-db-internal.c @@ -1,52 +1,124 @@ -#include +#include +#include +#include +#include #include #include "db_ops.h" -#define CS "jpis/@localhost:jpis1" - - static print_err(glite_jp_context_t ctx) { glite_jp_error_t *e; e = ctx->error; while(e) { - printf("%s(%s)\n", e->desc, e->source); + printf("%s (%s)\n", e->desc, e->source); e = e->reason; } printf("\n"); } +int glite_jpis_db_queries_serialize(void **blob, size_t *len, glite_jp_query_rec_t **queries); +int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size); + int main(int argc, char *argv[]) { - glite_jp_context_t ctx; +#if 1 + glite_jp_context_t jpctx; glite_jp_is_conf *conf; + glite_jpis_context_t isctx; + int ret; + long int uniqueid; + char *ps, *feedid; - glite_jp_init_context(&ctx); - if (glite_jp_db_connect(ctx, CS) != 0) goto fail; + glite_jp_init_context(&jpctx); + if (glite_jpis_init_context(&isctx, jpctx) != 0) goto fail; printf("dropping...\n"); - if (glite_jpis_dropDatabase(ctx) != 0) goto faildb; + if (glite_jpis_dropDatabase(jpctx) != 0) goto faildb; printf("initializing...\n"); if (glite_jp_get_conf(argc, argv, NULL, &conf) != 0) goto faildb; - if (glite_jpis_initDatabase(ctx, conf) != 0) goto failconf; + if (glite_jpis_initDatabase(jpctx, conf) != 0) goto failconf; + + printf("locking...\n"); + do { + if ((ret = glite_jpis_lockUninitializedFeed(isctx, &uniqueid, &ps)) == ENOLCK) goto faildb; + if (ret == 0) { + printf("locked: uniqueid=%li, ps=%s\n", uniqueid, ps); + free(ps); + + asprintf(&feedid, "feed://%d", uniqueid + 3); + if (glite_jpis_initFeed(isctx, uniqueid, feedid, (time_t)10000) != 0) { + free(feedid); + goto faildb; + } + free(feedid); + + if (glite_jpis_unlockFeed(isctx, uniqueid) != 0) goto faildb; + } + } while (ret == 0); glite_jp_free_conf(conf); - glite_jp_db_close(ctx); - glite_jp_free_context(ctx); + glite_jpis_free_context(isctx); + glite_jp_free_context(jpctx); return 0; failconf: glite_jp_free_conf(conf); faildb: - glite_jp_db_close(ctx); + glite_jpis_free_context(isctx); fail: printf("failed\n"); - print_err(ctx); + print_err(jpctx); + glite_jp_free_context(jpctx); + + return 1; +#endif +#if 0 + glite_jp_context_t ctx; + glite_jp_is_conf *conf; + void *blob; + size_t len; + int ret, i; + glite_jp_query_rec_t **queries; + + ret = 0; + glite_jp_init_context(&ctx); + + if (glite_jp_get_conf(argc, argv, NULL, &conf) != 0) goto fail_ctx; + if ((ret = glite_jpis_db_queries_serialize(&blob, &len, conf->feeds[0]->query)) != 0) goto fail; + + if (write(1, blob, len) != len) { + ret = errno; + free(blob); + goto fail; + } + + if ((ret = glite_jpis_db_queries_deserialize(&queries, blob, len)) != 0) goto fail_blob; + i = 0; + while (queries[i] && queries[i]->attr) { + printf("query: attr=%s, op=%d, value=%s, value2=%s, bin=%d\n", queries[i]->attr, queries[i]->op, queries[i]->value, queries[i]->value2, queries[i]->binary); + free(queries[i]->attr); + free(queries[i]->value); + free(queries[i]->value2); + free(queries[i]); + i++; + } + free(queries); + + free(blob); + glite_jp_free_context(ctx); + return 0; + +fail_blob: + free(blob); +fail: + fprintf(stderr, "fail: %s\n", strerror(ret)); +fail_ctx: glite_jp_free_context(ctx); return 1; +#endif } diff --git a/org.glite.jp.index/src/bones_server.c b/org.glite.jp.index/src/bones_server.c index 5315b04..dbe56a6 100644 --- a/org.glite.jp.index/src/bones_server.c +++ b/org.glite.jp.index/src/bones_server.c @@ -34,7 +34,7 @@ extern SOAP_NMAC struct Namespace jpis__namespaces[],jpps__namespaces[]; extern SOAP_NMAC struct Namespace namespaces[] = { {NULL,NULL} }; // namespaces[] not used here, but need to prevent linker to complain... -extern void MyFeedIndex(glite_jp_context_t ctx, glite_jp_is_conf *conf, char *dest); +extern void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest); static int newconn(int,struct timeval *,void *); static int request(int,struct timeval *,void *); @@ -46,6 +46,11 @@ static struct glite_srvbones_service stab = { "JP Index Server", -1, newconn, request, reject, disconn }; +typedef struct { + glite_jpis_context_t ctx; + struct soap *soap; +} slave_data_t; + static time_t cert_mtime; static char *server_cert, *server_key, *cadir; static gss_cred_id_t mycred = GSS_C_NO_CREDENTIAL; @@ -65,9 +70,15 @@ int main(int argc, char *argv[]) edg_wll_GssStatus gss_code; struct sockaddr_in a; char *config_file; + glite_jpis_context_t isctx; glite_jp_init_context(&ctx); + if (glite_jpis_init_context(&isctx, ctx) != 0) { + fprintf(stderr, "Connect DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source); + glite_jp_free_context(ctx); + return 1; + } /* Read config options/file */ // XXX: need add something meaningfull to src/conf.c ! @@ -80,6 +91,21 @@ int main(int argc, char *argv[]) */ + if (glite_jpis_dropDatabase(ctx) != 0) { + fprintf(stderr, "Drop DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source); + glite_jpis_free_context(isctx); + glite_jp_free_context(ctx); + return 1; + } + + if (glite_jpis_initDatabase(ctx, conf) != 0) { + fprintf(stderr, "Init DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source); + glite_jpis_free_context(isctx); + glite_jp_free_context(ctx); + return 1; + } + + #if GSOAP_VERSION <= 20602 for (i=0; jpis__namespaces[i].id && strcmp(jpis__namespaces[i].id,"ns1"); i++); #else @@ -139,6 +165,8 @@ int main(int argc, char *argv[]) glite_jp_free_conf(conf); + glite_jpis_free_context(isctx); + glite_jp_free_context(ctx); return 0; } @@ -146,15 +174,21 @@ int main(int argc, char *argv[]) /* slave's init comes here */ static int data_init(void **data) { - char *PS_URL = NULL; - - - *data = (void *) soap_new(); + slave_data_t *private; + char *PS_URL = NULL; + long int uniqueid; + + private = calloc(sizeof(*private), 1); + if (glite_jpis_init_context(&private->ctx, ctx) != 0) { + printf("[%d] slave_init(): DB error: %s (%s)\n",getpid(),ctx->error->desc,ctx->error->source); + return -1; + } + private->soap = soap_new(); printf("[%d] slave started\n",getpid()); /* ask PS server for data */ do { - switch (glite_jpis_lockUninitializedFeed(ctx,&PS_URL)) { + switch (glite_jpis_lockUninitializedFeed(private->ctx,&uniqueid,&PS_URL)) { case ENOENT: // no more feeds to initialize return 0; @@ -162,11 +196,12 @@ static int data_init(void **data) // error during locking printf("[%d] slave_init(): Locking error.\n",getpid()); free(PS_URL); + glite_jpis_free_context(private->ctx); return -1; default: // contact PS server, ask for data, save feedId and expiration // to DB and unlock feed - MyFeedIndex(ctx, conf, PS_URL); + MyFeedIndex(private->ctx, conf, uniqueid, PS_URL); free(PS_URL); PS_URL = NULL; break; @@ -176,7 +211,9 @@ static int data_init(void **data) static int newconn(int conn,struct timeval *to,void *data) { - struct soap *soap = (struct soap *) data; + slave_data_t *private = (slave_data_t *)data; + struct soap *soap = private->soap; + glite_jp_context_t ctx = private->ctx->jpctx; glite_gsplugin_Context plugin_ctx; gss_cred_id_t newcred = GSS_C_NO_CREDENTIAL; @@ -189,7 +226,6 @@ static int newconn(int conn,struct timeval *to,void *data) soap_init2(soap,SOAP_IO_KEEPALIVE,SOAP_IO_KEEPALIVE); soap_set_namespaces(soap,jpis__namespaces); - soap->user = (void *) ctx; /* XXX: one instance per slave */ /* not yet: client to JP Storage Server * probably wil come to other place, just not forget it.... @@ -252,6 +288,7 @@ static int newconn(int conn,struct timeval *to,void *data) return 0; cleanup: + glite_jpis_free_context(private->ctx); glite_gsplugin_free_context(plugin_ctx); soap_end(soap); @@ -260,8 +297,9 @@ cleanup: static int request(int conn,struct timeval *to,void *data) { - struct soap *soap = data; - glite_jp_context_t ctx = soap->user; + slave_data_t *private = (slave_data_t *)data; + struct soap *soap = private->soap; + glite_jp_context_t ctx = private->ctx->jpctx; glite_gsplugin_set_timeout(glite_gsplugin_get_context(soap),to); @@ -311,7 +349,10 @@ static int reject(int conn) static int disconn(int conn,struct timeval *to,void *data) { - struct soap *soap = (struct soap *) data; + slave_data_t *private = (slave_data_t *)data; + struct soap *soap = private->soap; + + glite_jpis_free_context(private->ctx); soap_end(soap); // clean up everything and close socket return 0; diff --git a/org.glite.jp.index/src/db_ops.c b/org.glite.jp.index/src/db_ops.c index 164f402..7cd56a5 100644 --- a/org.glite.jp.index/src/db_ops.c +++ b/org.glite.jp.index/src/db_ops.c @@ -2,6 +2,10 @@ #include #include +#include +#include +#include +#include #include #include @@ -27,6 +31,15 @@ #define lprintf //#define lprintf printf +#define WORD_SWAP(X) ((((X) >> 8) & 0xFF) | (((X) & 0xFF) << 8)) +#define LONG_SWAP(X) (WORD_SWAP(((X) >> 16) & 0xFFFF) | ((WORD_SWAP(X) & 0xFFFF) << 16)) +#if __BYTE_ORDER == __LITTLE_ENDIAN +#define LONG_LE(X) (X) +#else +#define LONG_LE(X) LONG_SWAP(X) +#endif + +#define COND_MAGIC 0x444E4F43 static is_indexed(glite_jp_is_conf *conf, const char *attr) { size_t i; @@ -40,6 +53,191 @@ static is_indexed(glite_jp_is_conf *conf, const char *attr) { } +static size_t db_arg2_length(glite_jp_query_rec_t *query) { + size_t len; + + assert(query->op > GLITE_JP_QUERYOP_UNDEF && query->op <= GLITE_JP_QUERYOP__LAST); + len = 0; + switch (query->op) { + case GLITE_JP_QUERYOP_WITHIN: + len = query->binary ? query->size2 : strlen(query->value2) + 1; + case GLITE_JP_QUERYOP_EQUAL: + case GLITE_JP_QUERYOP_UNEQUAL: + case GLITE_JP_QUERYOP_LESS: + case GLITE_JP_QUERYOP_GREATER: + case GLITE_JP_QUERYOP_EXISTS: + len = 0; + } + + return len; +} + + +static int array_init(void **data, size_t *len, size_t *maxlen, size_t initial_len) { + *len = 0; + if ((*data = malloc(initial_len)) != NULL) { + *maxlen = initial_len; + return 0; + } else { + *maxlen = 0; + return ENOMEM; + } +} + + +static int array_add(void **data, size_t *len, size_t *maxlen, void *new_data, size_t new_data_len) { + void *tmp; + size_t ptr; + + ptr = *len; + (*len) += new_data_len; + while (*len > *maxlen) { + (*maxlen) *= 2; + if ((tmp = realloc(*data, *maxlen)) == NULL) return ENOMEM; + *data = tmp; + } + memcpy(((char *)(*data)) + ptr, new_data, new_data_len); + + return 0; +} + + +static int array_add_long(void **data, size_t *len, size_t *maxlen, uint32_t l) { + uint32_t lel; + + lel = LONG_LE(l); + return array_add(data, len, maxlen, &lel, sizeof(uint32_t)); +} + + +static uint32_t array_get_long(void **data) { + uint32_t *lel; + + lel = (uint32_t *)*data; + *data = ((char *)*data) + sizeof(uint32_t); + + return LONG_LE(*lel); +} + + +static void *array_get(void **data, size_t data_len) { + void *res; + + res = *data; + *data = ((char *)*data) + data_len; + + return res; +} + + +static int glite_jpis_db_queries_serialize(void **blob, size_t *len, glite_jp_query_rec_t **queries) { + size_t maxlen; + glite_jp_query_rec_t *query; + int ret; + size_t datalen; + + if ((ret = array_init(blob, len, &maxlen, 1024)) != 0) return ret; + query = *queries; + while(query && query->attr) { + if ((ret = array_add_long(blob, len, &maxlen, COND_MAGIC)) != 0) goto fail; + datalen = strlen(query->attr) + 1; + if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail; + if ((ret = array_add(blob, len, &maxlen, query->attr, datalen)) != 0) goto fail; + if ((ret = array_add_long(blob, len, &maxlen, query->op)) != 0) goto fail; + if ((ret = array_add_long(blob, len, &maxlen, query->binary ? 1 : 0)) != 0) goto fail; + + datalen = query->binary ? query->size : strlen(query->value) + 1; + if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail; + if (datalen) + if ((ret = array_add(blob, len, &maxlen, query->value, datalen)) != 0) goto fail; + + datalen = db_arg2_length(query); + if ((ret = array_add_long(blob, len, &maxlen, datalen)) != 0) goto fail; + if (datalen) + if ((ret = array_add(blob, len, &maxlen, query->value2, datalen)) != 0) goto fail; + + query++; + } + + return 0; +fail: + free(*blob); + *len = 0; + return ret; +} + + +static int glite_jpis_db_queries_deserialize(glite_jp_query_rec_t ***queries, void *blob, size_t blob_size) { + size_t maxlen, len, datalen; + void *blob_ptr, *blob_end; + int ret; + uint32_t l; + glite_jp_query_rec_t *query; + int i; + + if ((ret = array_init((void **)queries, &len, &maxlen, 512)) != 0) return ret; + blob_ptr = blob; + blob_end = (char *)blob + blob_size; + while (blob_end > blob_ptr) { + ret = ENOMEM; + if ((query = calloc(sizeof(*query), 1)) == NULL) goto fail; + l = array_get_long(&blob_ptr); + if (l != COND_MAGIC) { + printf("blob=%p, blob_ptr=%p, 0x%08" PRIX32 "\n", blob, blob_ptr, l); + ret = EINVAL; + goto fail_query; + } + + datalen = array_get_long(&blob_ptr); + if (datalen) { + if ((query->attr = malloc(datalen)) == NULL) goto fail_query; + memcpy(query->attr, array_get(&blob_ptr, datalen), datalen); + } else query->attr = NULL; + + query->op = array_get_long(&blob_ptr); + query->binary = array_get_long(&blob_ptr); + + datalen = array_get_long(&blob_ptr); + if (datalen) { + if ((query->value = malloc(datalen)) == NULL) goto fail_query; + memcpy(query->value, array_get(&blob_ptr, datalen), datalen); + } else query->value = NULL; + query->size = datalen; + + datalen = array_get_long(&blob_ptr); + if (datalen) { + if ((query->value2 = malloc(datalen)) == NULL) goto fail_query; + memcpy(query->value2, array_get(&blob_ptr, datalen), datalen); + } else query->value2 = NULL; + query->size2 = datalen; + + if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail_query; + } + assert(blob_end == blob_ptr); + + query = NULL; + if ((ret = array_add((void **)queries, &len, &maxlen, &query, sizeof(query))) != 0) goto fail; + + return 0; + +fail_query: + free(query); +fail: + i = 0; + query = (*queries)[i]; + while (query && query->attr) { + free(query->attr); + free(query->value); + free(query->value2); + free(query); + i++; + query = (*queries)[i]; + } + free(*queries); + return ret; +} + + /* Init the database. * * \retval 0 OK @@ -52,11 +250,13 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) { const char *type_index, *type_full; size_t i; MYSQL_BIND param[4]; - unsigned long attrid_len, name_len, type_len, source_len; - char attrid[33], name[256], type[33], source[256]; + unsigned long attrid_len, name_len, type_len, source_len, dbconds_len; + char attrid[33], name[256], type[33], source[256], dbconds[1024]; int indexed, state, locked; + size_t conds_len; char sql[512]; glite_jp_is_feed **feeds; + void *conds; glite_jp_db_assign_param(¶m[0], MYSQL_TYPE_VAR_STRING, attrid, &attrid_len); glite_jp_db_assign_param(¶m[1], MYSQL_TYPE_VAR_STRING, name, &name_len); @@ -101,7 +301,8 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) { glite_jp_db_assign_param(¶m[0], MYSQL_TYPE_LONG, &state); glite_jp_db_assign_param(¶m[1], MYSQL_TYPE_LONG, &locked); glite_jp_db_assign_param(¶m[2], MYSQL_TYPE_VAR_STRING, source, &source_len); - if (glite_jp_db_prepare(ctx, "INSERT INTO feeds (state, locked, source) VALUES (?, ?, ?)", &stmt, param, NULL) != 0) goto fail; + glite_jp_db_assign_param(¶m[3], MYSQL_TYPE_MEDIUM_BLOB, dbconds, &dbconds_len); + if (glite_jp_db_prepare(ctx, "INSERT INTO feeds (state, locked, source, condition) VALUES (?, ?, ?, ?)", &stmt, param, NULL) != 0) goto fail; feeds = conf->feeds; i = 0; memset(source, 0, sizeof(source)); @@ -111,7 +312,12 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) { locked = 0; strncpy(source, feeds[i]->PS_URL, sizeof(source) - 1); source_len = strlen(source); - if (glite_jp_db_execute(stmt) == -1) goto fail_stmt; + assert(glite_jpis_db_queries_serialize(&conds, &conds_len, feeds[i]->query) == 0); + assert(conds_len <= sizeof(dbconds)); + dbconds_len = conds_len; + memcpy(dbconds, conds, conds_len); + free(conds); + if (glite_jp_db_execute(stmt) == -1) goto fail_conds; i++; } @@ -119,6 +325,8 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) { return 0; +fail_conds: + free(conds); fail_stmt: glite_jp_db_freestmt(&stmt); fail: @@ -164,6 +372,61 @@ fail: } +int glite_jpis_init_context(glite_jpis_context_t *isctx, glite_jp_context_t jpctx) { + int ret; + MYSQL_BIND myparam[3]; + MYSQL_BIND myres[2]; + + *isctx = calloc(sizeof(**isctx), 1); + + (*isctx)->jpctx = jpctx; + if ((ret = glite_jp_db_connect(jpctx, GLITE_JP_IS_DEFAULTCS)) != 0) goto fail; + + // sql command: select an uninitialized unlocked feed + glite_jp_db_assign_result(&myres[0], MYSQL_TYPE_LONG, NULL, &((*isctx)->param_uniqueid)); + glite_jp_db_assign_result(&myres[1], MYSQL_TYPE_VAR_STRING, NULL, (*isctx)->param_ps, sizeof((*isctx)->param_ps), &(*isctx)->param_ps_len); + if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_feed_stmt, NULL, myres)) != 0) goto fail_connect; + + // sql command: lock the feed (via uniqueid) + glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid); + if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=1 WHERE (locked = 0) AND (uniqueid = ?)", &(*isctx)->lock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd; + + // sql command: assign the feed (via uniqueid) + glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_VAR_STRING, (*isctx)->param_feedid, &(*isctx)->param_feedid_len); + glite_jp_db_assign_param(&myparam[1], MYSQL_TYPE_DATETIME, &(*isctx)->param_expires); + glite_jp_db_assign_param(&myparam[2], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid); + if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET feedid=?, expires=? WHERE (uniqueid=?)", &(*isctx)->init_feed_stmt, myparam, NULL)) != 0) goto fail_cmd2; + + // sql command: unlock the feed (via uniqueid) + glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid); + if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=0 WHERE (uniqueid=?)", &(*isctx)->unlock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd3; + + return 0; + +fail_cmd3: + glite_jp_db_freestmt(&(*isctx)->init_feed_stmt); +fail_cmd2: + glite_jp_db_freestmt(&(*isctx)->lock_feed_stmt); +fail_cmd: + glite_jp_db_freestmt(&(*isctx)->select_feed_stmt); +fail_connect: + glite_jp_db_close((*isctx)->jpctx); +fail: + free(*isctx); + return ret; +} + + +int glite_jpis_free_context(glite_jpis_context_t ctx) { + glite_jp_db_freestmt(&ctx->select_feed_stmt); + glite_jp_db_freestmt(&ctx->lock_feed_stmt); + glite_jp_db_freestmt(&ctx->init_feed_stmt); + glite_jp_db_freestmt(&ctx->unlock_feed_stmt); + glite_jp_db_close(ctx->jpctx); + free(ctx); +} + + /* Find first unitialized feed, lock it and return URL of corresponding PS * * Return value: @@ -171,21 +434,58 @@ fail: * ENOENT - no more feeds to initialize * ENOLCK - error during locking */ -int glite_jpis_lockUninitializedFeed(glite_jp_context_t ctx, char **PS_URL) +int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniqueid, char **PS_URL) { - return ENOENT; + int ret; + + do { + switch (glite_jp_db_execute(ctx->select_feed_stmt)) { + case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK; + case 0: lprintf("no more uninit. feeds unlocked\n"); return ENOENT; + default: break; + } + if (glite_jp_db_fetch(ctx->select_feed_stmt) != 0) return ENOLCK; + lprintf("selected uninit. feed %lu\n", ctx->param_uniqueid); + + ret = glite_jp_db_execute(ctx->lock_feed_stmt); + lprintf("locked %d feeds (uniqueid=%lu)\n", ret, ctx->param_uniqueid); + } while (ret != 1); + + *uniqueid = ctx->param_uniqueid; + if (PS_URL) *PS_URL = strdup(ctx->param_ps); + + return 0; } /* Store feed ID and expiration time returned by PS for locked feed. */ -void glite_jpis_feedInit(glite_jp_context_t ctx, char *PS_URL, char *feedId, time_t feedExpires) +int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires) { + int ret; + + memset(ctx->param_feedid, 0, sizeof(ctx->param_feedid)); + memset(&ctx->param_expires, 0, sizeof(ctx->param_expires)); + strncpy(ctx->param_feedid, feedId, sizeof(ctx->param_feedid) - 1); + ctx->param_feedid_len = strlen(ctx->param_feedid) + 1; + glite_jp_db_assign_time(&ctx->param_expires, feedExpires); + ctx->param_uniqueid = uniqueid; + + ret = glite_jp_db_execute(ctx->init_feed_stmt); + lprintf("initializing feed, uniqueid=%li, result=%d\n", uniqueid, ret); + + return ret == 1 ? 0 : ENOLCK; } + /* Unlock given feed */ -void glite_jpis_unlockFeed(glite_jp_context_t ctx, char *PS_URL) -{ -} +int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid) { + int ret; + ctx->param_uniqueid = uniqueid; + ret = glite_jp_db_execute(ctx->unlock_feed_stmt); + lprintf("unlocking feed, uniqueid=%li, result=%d\n", uniqueid, ret); + + return ret == 1 ? 0 : ENOLCK; +} diff --git a/org.glite.jp.index/src/db_ops.h b/org.glite.jp.index/src/db_ops.h index 6c8b5f1..a93cc8e 100644 --- a/org.glite.jp.index/src/db_ops.h +++ b/org.glite.jp.index/src/db_ops.h @@ -5,17 +5,34 @@ #include #include +#include #include "conf.h" +#define GLITE_JP_IS_DEFAULTCS "jpis/@localhost:jpis1" + #define GLITE_JP_IS_STATE_HIST 1 #define GLITE_JP_IS_STATE_CONT 2 +typedef struct _glite_jpis_context { + glite_jp_context_t jpctx; + glite_jp_db_stmt_t select_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt; + long int param_uniqueid; + char param_feedid[33], param_ps[256]; + unsigned long param_ps_len, param_feedid_len; + MYSQL_TIME param_expires; +} *glite_jpis_context_t; + + int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf); int glite_jpis_dropDatabase(glite_jp_context_t ctx); -int glite_jpis_lockUninitializedFeed(glite_jp_context_t ctx, char **PS_URL); -void glite_jpis_feedInit(glite_jp_context_t ctx, char *PS_URL, char *feedId, time_t feedExpires); -void glite_jpis_unlockFeed(glite_jp_context_t ctx, char *PS_URL); + +int glite_jpis_init_context(glite_jpis_context_t *isctx, glite_jp_context_t jpctx); +int glite_jpis_free_context(glite_jpis_context_t ctx); + +int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uinqueid, char **PS_URL); +int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires); +int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid); #endif diff --git a/org.glite.jp.index/src/soap_ps_calls.c b/org.glite.jp.index/src/soap_ps_calls.c index b3b1d55..8eae0e5 100644 --- a/org.glite.jp.index/src/soap_ps_calls.c +++ b/org.glite.jp.index/src/soap_ps_calls.c @@ -77,7 +77,7 @@ static int find_dest_index(glite_jp_is_conf *conf, char *dest) // call PS FeedIndex for a given destination -void MyFeedIndex(glite_jp_context_t ctx, glite_jp_is_conf *conf, char *dest) +void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest) { struct _jpelem__FeedIndex in; struct _jpelem__FeedIndexResponse out; @@ -119,13 +119,13 @@ printf("MyFeedIndex for %s called\n", dest); //if (!check_fault(soap,soap_call_jpsrv___FeedIndex(soap,dest,"", if (soap_call___jpsrv__FeedIndex(soap,dest,"", &in, &out)) { printf("soap_call___jpsrv__FeedIndex() returned error\n"); - glite_jpis_unlockFeed(ctx, dest); + glite_jpis_unlockFeed(ctx, uniqueid); goto err; } else { printf("FeedId: %s\nExpires: %s\n",out.feedId,ctime(&out.feedExpires)); - glite_jpis_feedInit(ctx, dest, out.feedId, out.feedExpires); - glite_jpis_unlockFeed(ctx, dest); + glite_jpis_initFeed(ctx, uniqueid, out.feedId, out.feedExpires); + glite_jpis_unlockFeed(ctx, uniqueid); } err: diff --git a/org.glite.jp.index/src/soap_ps_calls.h b/org.glite.jp.index/src/soap_ps_calls.h index bcf6bbb..cb2c992 100644 --- a/org.glite.jp.index/src/soap_ps_calls.h +++ b/org.glite.jp.index/src/soap_ps_calls.h @@ -5,6 +5,6 @@ #include "conf.h" -void MyFeedIndex(glite_jp_context_t ctx, glite_jp_is_conf *conf, char *dest); +void MyFeedIndex(glite_jpis_context_t ctx, glite_jp_is_conf *conf, long int uniqueid, char *dest); #endif