From 8b1475cc82843f59f2ebd843a4a953a30e0a032b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Thu, 29 Sep 2005 15:49:04 +0000 Subject: [PATCH] UpdateJob operation. --- org.glite.jp.index/examples/jpis-test.c | 5 +-- org.glite.jp.index/src/bones_server.c | 4 ++ org.glite.jp.index/src/db_ops.c | 75 ++++++++++++++++++++++++++++----- org.glite.jp.index/src/db_ops.h | 8 +++- org.glite.jp.index/src/soap_ops.c | 61 ++++++++++++++++++++++++--- org.glite.jp.index/src/soap_ps_calls.c | 3 +- org.glite.jp.index/src/ws_typeref.c | 28 +++++++++++- org.glite.jp.index/src/ws_typeref.h | 8 ++++ 8 files changed, 167 insertions(+), 25 deletions(-) create mode 100644 org.glite.jp.index/src/ws_typeref.h diff --git a/org.glite.jp.index/examples/jpis-test.c b/org.glite.jp.index/examples/jpis-test.c index 3116ad5..185a35a 100644 --- a/org.glite.jp.index/examples/jpis-test.c +++ b/org.glite.jp.index/examples/jpis-test.c @@ -3,7 +3,7 @@ #include #include -#include "glite/security/glite_gsplugin.h" +#include #include "jpis_H.h" #include "jpis_.nsmap" @@ -67,8 +67,7 @@ int main(int argc,char *argv[]) int opt; struct soap *soap = soap_new(); - - soap_init(soap); + soap_init(soap); soap_set_namespaces(soap, jpis__namespaces); soap_register_plugin(soap,glite_gsplugin); diff --git a/org.glite.jp.index/src/bones_server.c b/org.glite.jp.index/src/bones_server.c index dbe56a6..3f9280f 100644 --- a/org.glite.jp.index/src/bones_server.c +++ b/org.glite.jp.index/src/bones_server.c @@ -3,6 +3,7 @@ #include #include #include +#include #include "glite/jp/types.h" #include "glite/jp/context.h" @@ -204,6 +205,8 @@ static int data_init(void **data) MyFeedIndex(private->ctx, conf, uniqueid, PS_URL); free(PS_URL); PS_URL = NULL; +// FIXME: infinite retrying on fail ==> fast increasing of used resources + sleep(2); break; } } while (1); @@ -226,6 +229,7 @@ static int newconn(int conn,struct timeval *to,void *data) soap_init2(soap,SOAP_IO_KEEPALIVE,SOAP_IO_KEEPALIVE); soap_set_namespaces(soap,jpis__namespaces); + soap->user = (void *) ctx; /* not yet: client to JP Storage Server * probably wil come to other place, just not forget it.... diff --git a/org.glite.jp.index/src/db_ops.c b/org.glite.jp.index/src/db_ops.c index 7cd56a5..38ae666 100644 --- a/org.glite.jp.index/src/db_ops.c +++ b/org.glite.jp.index/src/db_ops.c @@ -27,9 +27,15 @@ INDEX (jobid),\n\ INDEX (value)\n\ );" +#define SQLCMD_INSERT_ATTRVAL "INSERT INTO " TABLE_PREFIX_DATA "%s (jobid, value, full_value) VALUES (\n\ + '%s',\n\ + '%s',\n\ + '%s'\n\ +)" +#define INDEX_LENGTH 255 -#define lprintf -//#define lprintf printf +//#define lprintf +#define lprintf printf #define WORD_SWAP(X) ((((X) >> 8) & 0xFF) | (((X) & 0xFF) << 8)) #define LONG_SWAP(X) (WORD_SWAP(((X) >> 16) & 0xFFFF) | ((WORD_SWAP(X) & 0xFFFF) << 16)) @@ -238,6 +244,14 @@ fail: } +/** + * Convert attribute name to attribute id. + */ +char *glite_jpis_attr_name2id(const char *name) { + return str2md5(name); +} + + /* Init the database. * * \retval 0 OK @@ -271,10 +285,10 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) { i = 0; while (attrs[i]) { type_full = glite_jp_attrval_db_type_full(ctx, attrs[i]); - type_index = glite_jp_attrval_db_type_index(ctx, attrs[i], 255); + type_index = glite_jp_attrval_db_type_index(ctx, attrs[i], INDEX_LENGTH); // attrid column - tmp = str2md5(attrs[i]); + tmp = glite_jpis_attr_name2id(attrs[i]); strncpy(attrid, tmp, sizeof(attrid) - 1); free(tmp); attrid_len = strlen(attrid); @@ -385,7 +399,7 @@ int glite_jpis_init_context(glite_jpis_context_t *isctx, glite_jp_context_t jpct // sql command: select an uninitialized unlocked feed glite_jp_db_assign_result(&myres[0], MYSQL_TYPE_LONG, NULL, &((*isctx)->param_uniqueid)); glite_jp_db_assign_result(&myres[1], MYSQL_TYPE_VAR_STRING, NULL, (*isctx)->param_ps, sizeof((*isctx)->param_ps), &(*isctx)->param_ps_len); - if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_feed_stmt, NULL, myres)) != 0) goto fail_connect; + if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_unlocked_feed_stmt, NULL, myres)) != 0) goto fail_connect; // sql command: lock the feed (via uniqueid) glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid); @@ -395,20 +409,36 @@ int glite_jpis_init_context(glite_jpis_context_t *isctx, glite_jp_context_t jpct glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_VAR_STRING, (*isctx)->param_feedid, &(*isctx)->param_feedid_len); glite_jp_db_assign_param(&myparam[1], MYSQL_TYPE_DATETIME, &(*isctx)->param_expires); glite_jp_db_assign_param(&myparam[2], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid); - if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET feedid=?, expires=? WHERE (uniqueid=?)", &(*isctx)->init_feed_stmt, myparam, NULL)) != 0) goto fail_cmd2; + if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET feedid=?, expires=? WHERE (uniqueid=?)", &(*isctx)->init_feed_stmt, myparam, NULL)) != 0) goto fail_cmd2; // sql command: unlock the feed (via uniqueid) glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid); - if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=0 WHERE (uniqueid=?)", &(*isctx)->unlock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd3; + if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=0 WHERE (uniqueid=?)", &(*isctx)->unlock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd3; + + // sql command: get info about the feed (via feedid) + glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_STRING, (*isctx)->param_feedid, &(*isctx)->param_feedid_len); + glite_jp_db_assign_result(&myres[0], MYSQL_TYPE_LONG, NULL, &(*isctx)->param_uniqueid); + glite_jp_db_assign_result(&myres[1], MYSQL_TYPE_LONG, NULL, &(*isctx)->param_state); + if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, state FROM feeds WHERE (feedid=?)", &(*isctx)->select_info_feed_stmt, myparam, myres)) != 0) goto fail_cmd4; + + // sql command: update state of the feed (via uniqueid) + glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_state); + glite_jp_db_assign_param(&myparam[1], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid); + if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=? WHERE (uniqueid=?)", &(*isctx)->update_state_feed_stmt, myparam, NULL)) != 0) goto fail_cmd5; return 0; + glite_jp_db_freestmt(&(*isctx)->update_state_feed_stmt); +fail_cmd5: + glite_jp_db_freestmt(&(*isctx)->select_info_feed_stmt); +fail_cmd4: + glite_jp_db_freestmt(&(*isctx)->unlock_feed_stmt); fail_cmd3: glite_jp_db_freestmt(&(*isctx)->init_feed_stmt); fail_cmd2: glite_jp_db_freestmt(&(*isctx)->lock_feed_stmt); fail_cmd: - glite_jp_db_freestmt(&(*isctx)->select_feed_stmt); + glite_jp_db_freestmt(&(*isctx)->select_unlocked_feed_stmt); fail_connect: glite_jp_db_close((*isctx)->jpctx); fail: @@ -418,10 +448,12 @@ fail: int glite_jpis_free_context(glite_jpis_context_t ctx) { - glite_jp_db_freestmt(&ctx->select_feed_stmt); + glite_jp_db_freestmt(&ctx->select_unlocked_feed_stmt); glite_jp_db_freestmt(&ctx->lock_feed_stmt); glite_jp_db_freestmt(&ctx->init_feed_stmt); glite_jp_db_freestmt(&ctx->unlock_feed_stmt); + glite_jp_db_freestmt(&ctx->select_info_feed_stmt); + glite_jp_db_freestmt(&ctx->update_state_feed_stmt); glite_jp_db_close(ctx->jpctx); free(ctx); } @@ -439,12 +471,12 @@ int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniquei int ret; do { - switch (glite_jp_db_execute(ctx->select_feed_stmt)) { + switch (glite_jp_db_execute(ctx->select_unlocked_feed_stmt)) { case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK; case 0: lprintf("no more uninit. feeds unlocked\n"); return ENOENT; default: break; } - if (glite_jp_db_fetch(ctx->select_feed_stmt) != 0) return ENOLCK; + if (glite_jp_db_fetch(ctx->select_unlocked_feed_stmt) != 0) return ENOLCK; lprintf("selected uninit. feed %lu\n", ctx->param_uniqueid); ret = glite_jp_db_execute(ctx->lock_feed_stmt); @@ -489,3 +521,24 @@ int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid) { return ret == 1 ? 0 : ENOLCK; } + + +int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av) { + char *sql, *table, *value, *full_value; + + table = glite_jpis_attr_name2id(av->name); + value = glite_jp_attrval_to_db_index(ctx->jpctx, av, INDEX_LENGTH); + full_value = glite_jp_attrval_to_db_full(ctx->jpctx, av); + asprintf(&sql, SQLCMD_INSERT_ATTRVAL, table, jobid, value, full_value); + free(table); + free(value); + free(full_value); + lprintf("%s: sql=%s\n", __FUNCTION__, sql); + if (glite_jp_db_execstmt(ctx->jpctx, sql, NULL) != 1) { + free(sql); + return ctx->jpctx->error->code; + } + free(sql); + + return 0; +} diff --git a/org.glite.jp.index/src/db_ops.h b/org.glite.jp.index/src/db_ops.h index a93cc8e..3dbca3c 100644 --- a/org.glite.jp.index/src/db_ops.h +++ b/org.glite.jp.index/src/db_ops.h @@ -13,17 +13,19 @@ #define GLITE_JP_IS_STATE_HIST 1 #define GLITE_JP_IS_STATE_CONT 2 +#define GLITE_JP_IS_STATE_DONE 4 typedef struct _glite_jpis_context { glite_jp_context_t jpctx; - glite_jp_db_stmt_t select_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt; - long int param_uniqueid; + glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt; + long int param_uniqueid, param_state; char param_feedid[33], param_ps[256]; unsigned long param_ps_len, param_feedid_len; MYSQL_TIME param_expires; } *glite_jpis_context_t; +char *glite_jpis_attr_name2id(const char *name); int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf); int glite_jpis_dropDatabase(glite_jp_context_t ctx); @@ -35,4 +37,6 @@ int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uinquei int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires); int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid); +int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av); + #endif diff --git a/org.glite.jp.index/src/soap_ops.c b/org.glite.jp.index/src/soap_ops.c index 2b9d7a6..e1a17ad 100644 --- a/org.glite.jp.index/src/soap_ops.c +++ b/org.glite.jp.index/src/soap_ops.c @@ -1,14 +1,14 @@ #include #include +#include -#include "glite/jp/types.h" -#include "glite/jp/context.h" +#include +#include #include "jpis_H.h" #include "jpis_.nsmap" #include "soap_version.h" - - +#include "db_ops.h" @@ -64,16 +64,67 @@ static void err2fault(const glite_jp_context_t ctx,struct soap *soap) /*-----------------------------------------*/ -#define CONTEXT_FROM_SOAP(soap,ctx) glite_jp_context_t ctx = (glite_jp_context_t) ((soap)->user) +#define CONTEXT_FROM_SOAP(soap,ctx) glite_jpis_context_t ctx = (glite_jpis_context_t) ((soap)->user) + + +static int updateJob(glite_jpis_context_t isctx, struct jptype__jobRecord *jobAttrs) { + glite_jp_attrval_t av; + struct jptype__attrValue *attr; + int ret, iattrs; + + printf("%s: jobid='%s', attrs=%d\n", __FUNCTION__, jobAttrs->jobid, jobAttrs->__sizeattributes); + + if (jobAttrs->remove) assert(*(jobAttrs->remove) == 0); + + for (iattrs = 0; iattrs < jobAttrs->__sizeattributes; iattrs++) { + attr = jobAttrs->attributes[iattrs]; + SoapToAttrVal(&av, attr); + if ((ret = glite_jpis_insertAttrVal(isctx, jobAttrs->jobid, &av)) != 0) return ret; + } + + return 0; +} + SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__UpdateJobs( struct soap *soap, struct _jpelem__UpdateJobs *jpelem__UpdateJobs, struct _jpelem__UpdateJobsResponse *jpelem__UpdateJobsResponse) { + int ret, ijobs; + const char *feedid; + int status, done; + CONTEXT_FROM_SOAP(soap, isctx); + glite_jp_context_t jpctx = isctx->jpctx; + // XXX: test client in examples/jpis-test // sends to this function some data for testing puts(__FUNCTION__); + + // get info about the feed + feedid = jpelem__UpdateJobs->feedId; + memset(isctx->param_feedid, 0, sizeof(isctx->param_feedid)); + strncpy(isctx->param_feedid, feedid, sizeof(isctx->param_feedid) - 1); + if ((ret = glite_jp_db_execute(isctx->select_info_feed_stmt)) != 1) { + fprintf(stderr, "can't get info about '%s', returned %d records: %s (%s)\n", feedid, ret, jpctx->error->desc, jpctx->error->source); + return SOAP_FAULT; + } + // update status, if needed (only oring) + status = isctx->param_state; + done = jpelem__UpdateJobs->feedDone ? GLITE_JP_IS_STATE_DONE : 0; + if ((done != (status & GLITE_JP_IS_STATE_DONE)) && done) { + isctx->param_state != done; + if ((ret = glite_jp_db_execute(isctx->update_state_feed_stmt)) != 1) { + fprintf(stderr, "can't update state of '%s', returned %d records: %s (%s)\n", feedid, ret, jpctx->error->desc, jpctx->error->source); + return SOAP_FAULT; + } + } + + // insert all attributes + for (ijobs = 0; ijobs < jpelem__UpdateJobs->__sizejobAttributes; ijobs++) { + if (updateJob(isctx, jpelem__UpdateJobs->jobAttributes[ijobs]) != 0) return SOAP_FAULT; + } + return SOAP_OK; } diff --git a/org.glite.jp.index/src/soap_ps_calls.c b/org.glite.jp.index/src/soap_ps_calls.c index 8eae0e5..85532b3 100644 --- a/org.glite.jp.index/src/soap_ps_calls.c +++ b/org.glite.jp.index/src/soap_ps_calls.c @@ -10,11 +10,10 @@ #include "conf.h" #include "db_ops.h" +#include "ws_typeref.h" #include "stdsoap2.h" -extern int glite_jpis_QueryCondToSoap(struct soap *soap, glite_jp_query_rec_t *in, struct jptype__primaryQuery **out); - /*------------------*/ /* Helper functions */ diff --git a/org.glite.jp.index/src/ws_typeref.c b/org.glite.jp.index/src/ws_typeref.c index e25b66b..8f5db7f 100644 --- a/org.glite.jp.index/src/ws_typeref.c +++ b/org.glite.jp.index/src/ws_typeref.c @@ -7,6 +7,7 @@ #include "jpps_H.h" #include "ws_typemap.h" +#include "ws_typeref.h" @@ -110,6 +111,29 @@ int glite_jpis_QueryCondToSoap( return SOAP_OK; } +static void SoapToAttrOrig(glite_jp_attr_orig_t *out, const enum jptype__attrOrig in) +{ + switch ( in ) + { + case jptype__attrOrig__SYSTEM: *out = GLITE_JP_ATTR_ORIG_SYSTEM; break; + case jptype__attrOrig__USER: *out = GLITE_JP_ATTR_ORIG_USER; break; + case jptype__attrOrig__FILE_: *out = GLITE_JP_ATTR_ORIG_FILE; break; + default: assert(0); break; + } +} - - +void SoapToAttrVal(glite_jp_attrval_t *av, const struct jptype__attrValue *attr) { + memset(av, 0, sizeof(*av)); + av->name = attr->name; + av->binary = attr->value->blob ? 1 : 0; + assert(av->binary || attr->value->string); + if (av->binary) { + av->value = attr->value->blob->__ptr; + av->size =attr->value->blob->__size ; + } else { + av->size = -1; + } + SoapToAttrOrig(&av->origin, attr->origin); + av->origin_detail = attr->originDetail; + av->timestamp = attr->timestamp; +} diff --git a/org.glite.jp.index/src/ws_typeref.h b/org.glite.jp.index/src/ws_typeref.h new file mode 100644 index 0000000..9aae027 --- /dev/null +++ b/org.glite.jp.index/src/ws_typeref.h @@ -0,0 +1,8 @@ +#ifndef GLITE_JPIS_TYPEREF_H +#define GLITE_JPIS_TYPEREF_H + +int glite_jpis_QueryCondToSoap(struct soap *soap, glite_jp_query_rec_t *in, struct jptype__primaryQuery **out); + +void SoapToAttrVal(glite_jp_attrval_t *av, const struct jptype__attrValue *attr); + +#endif -- 1.8.2.3