UpdateJob operation.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 29 Sep 2005 15:49:04 +0000 (15:49 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 29 Sep 2005 15:49:04 +0000 (15:49 +0000)
org.glite.jp.index/examples/jpis-test.c
org.glite.jp.index/src/bones_server.c
org.glite.jp.index/src/db_ops.c
org.glite.jp.index/src/db_ops.h
org.glite.jp.index/src/soap_ops.c
org.glite.jp.index/src/soap_ps_calls.c
org.glite.jp.index/src/ws_typeref.c
org.glite.jp.index/src/ws_typeref.h [new file with mode: 0644]

index 3116ad5..185a35a 100644 (file)
@@ -3,7 +3,7 @@
 #include <string.h>
 #include <assert.h>
 
-#include "glite/security/glite_gsplugin.h"
+#include <glite/security/glite_gsplugin.h>
 
 #include "jpis_H.h"
 #include "jpis_.nsmap"
@@ -67,8 +67,7 @@ int main(int argc,char *argv[])
        int     opt;
        struct soap     *soap = soap_new();
 
-
-       soap_init(soap);
+       soap_init(soap);        
        soap_set_namespaces(soap, jpis__namespaces);
 
        soap_register_plugin(soap,glite_gsplugin);
index dbe56a6..3f9280f 100644 (file)
@@ -3,6 +3,7 @@
 #include <sys/types.h>
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <unistd.h>
 
 #include "glite/jp/types.h"
 #include "glite/jp/context.h"
@@ -204,6 +205,8 @@ static int data_init(void **data)
                                MyFeedIndex(private->ctx, conf, uniqueid, PS_URL);
                                free(PS_URL);
                                PS_URL = NULL;
+// FIXME: infinite retrying on fail ==> fast increasing of used resources
+                               sleep(2); 
                                break;
                }
        } while (1);
@@ -226,6 +229,7 @@ static int newconn(int conn,struct timeval *to,void *data)
 
        soap_init2(soap,SOAP_IO_KEEPALIVE,SOAP_IO_KEEPALIVE);
        soap_set_namespaces(soap,jpis__namespaces);
+       soap->user = (void *) ctx;
 
 /* not yet: client to JP Storage Server
  * probably wil come to other place, just not forget it....
index 7cd56a5..38ae666 100644 (file)
         INDEX (jobid),\n\
         INDEX (value)\n\
 );"
+#define SQLCMD_INSERT_ATTRVAL "INSERT INTO " TABLE_PREFIX_DATA "%s (jobid, value, full_value) VALUES (\n\
+       '%s',\n\
+       '%s',\n\
+       '%s'\n\
+)"
+#define INDEX_LENGTH 255
 
-#define lprintf
-//#define lprintf printf
+//#define lprintf
+#define lprintf printf
 
 #define WORD_SWAP(X) ((((X) >> 8) & 0xFF) | (((X) & 0xFF) << 8))
 #define LONG_SWAP(X) (WORD_SWAP(((X) >> 16) & 0xFFFF) | ((WORD_SWAP(X) & 0xFFFF) << 16))
@@ -238,6 +244,14 @@ fail:
 }
 
 
+/**
+ * Convert attribute name to attribute id.
+ */
+char *glite_jpis_attr_name2id(const char *name) {
+       return str2md5(name);
+}
+
+
 /* Init the database. 
  *
  * \retval 0        OK
@@ -271,10 +285,10 @@ int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf) {
        i = 0;
        while (attrs[i]) {
                type_full = glite_jp_attrval_db_type_full(ctx, attrs[i]);
-               type_index = glite_jp_attrval_db_type_index(ctx, attrs[i], 255);
+               type_index = glite_jp_attrval_db_type_index(ctx, attrs[i], INDEX_LENGTH);
 
                // attrid column
-               tmp = str2md5(attrs[i]);
+               tmp = glite_jpis_attr_name2id(attrs[i]);
                strncpy(attrid, tmp, sizeof(attrid) - 1);
                free(tmp);
                attrid_len = strlen(attrid);
@@ -385,7 +399,7 @@ int glite_jpis_init_context(glite_jpis_context_t *isctx, glite_jp_context_t jpct
        // sql command: select an uninitialized unlocked feed
        glite_jp_db_assign_result(&myres[0], MYSQL_TYPE_LONG, NULL, &((*isctx)->param_uniqueid));
        glite_jp_db_assign_result(&myres[1], MYSQL_TYPE_VAR_STRING, NULL, (*isctx)->param_ps, sizeof((*isctx)->param_ps), &(*isctx)->param_ps_len);
-       if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_feed_stmt, NULL, myres)) != 0) goto fail_connect;
+       if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_unlocked_feed_stmt, NULL, myres)) != 0) goto fail_connect;
 
        // sql command: lock the feed (via uniqueid)
        glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
@@ -395,20 +409,36 @@ int glite_jpis_init_context(glite_jpis_context_t *isctx, glite_jp_context_t jpct
        glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_VAR_STRING, (*isctx)->param_feedid, &(*isctx)->param_feedid_len);
        glite_jp_db_assign_param(&myparam[1], MYSQL_TYPE_DATETIME, &(*isctx)->param_expires);
        glite_jp_db_assign_param(&myparam[2], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
-       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET feedid=?, expires=?  WHERE (uniqueid=?)", &(*isctx)->init_feed_stmt, myparam, NULL)) != 0) goto fail_cmd2;
+       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET feedid=?, expires=? WHERE (uniqueid=?)", &(*isctx)->init_feed_stmt, myparam, NULL)) != 0) goto fail_cmd2;
 
        // sql command: unlock the feed (via uniqueid)
        glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
-       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=0  WHERE (uniqueid=?)", &(*isctx)->unlock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd3;
+       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=0 WHERE (uniqueid=?)", &(*isctx)->unlock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd3;
+
+       // sql command: get info about the feed (via feedid)
+       glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_STRING, (*isctx)->param_feedid, &(*isctx)->param_feedid_len);
+       glite_jp_db_assign_result(&myres[0], MYSQL_TYPE_LONG, NULL, &(*isctx)->param_uniqueid);
+       glite_jp_db_assign_result(&myres[1], MYSQL_TYPE_LONG, NULL, &(*isctx)->param_state);
+       if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, state FROM feeds WHERE (feedid=?)", &(*isctx)->select_info_feed_stmt, myparam, myres)) != 0) goto fail_cmd4;
+
+       // sql command: update state of the feed (via uniqueid)
+       glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_state);
+       glite_jp_db_assign_param(&myparam[1], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
+       if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=? WHERE (uniqueid=?)", &(*isctx)->update_state_feed_stmt, myparam, NULL)) != 0) goto fail_cmd5;
 
        return 0;
 
+       glite_jp_db_freestmt(&(*isctx)->update_state_feed_stmt);
+fail_cmd5:
+       glite_jp_db_freestmt(&(*isctx)->select_info_feed_stmt);
+fail_cmd4:
+       glite_jp_db_freestmt(&(*isctx)->unlock_feed_stmt);
 fail_cmd3:
        glite_jp_db_freestmt(&(*isctx)->init_feed_stmt);
 fail_cmd2:
        glite_jp_db_freestmt(&(*isctx)->lock_feed_stmt);
 fail_cmd:
-       glite_jp_db_freestmt(&(*isctx)->select_feed_stmt);
+       glite_jp_db_freestmt(&(*isctx)->select_unlocked_feed_stmt);
 fail_connect:
        glite_jp_db_close((*isctx)->jpctx);
 fail:
@@ -418,10 +448,12 @@ fail:
 
 
 int glite_jpis_free_context(glite_jpis_context_t ctx) {
-       glite_jp_db_freestmt(&ctx->select_feed_stmt);
+       glite_jp_db_freestmt(&ctx->select_unlocked_feed_stmt);
        glite_jp_db_freestmt(&ctx->lock_feed_stmt);
        glite_jp_db_freestmt(&ctx->init_feed_stmt);
        glite_jp_db_freestmt(&ctx->unlock_feed_stmt);
+       glite_jp_db_freestmt(&ctx->select_info_feed_stmt);
+       glite_jp_db_freestmt(&ctx->update_state_feed_stmt);
        glite_jp_db_close(ctx->jpctx);
        free(ctx);
 }
@@ -439,12 +471,12 @@ int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uniquei
        int ret;
 
        do {
-               switch (glite_jp_db_execute(ctx->select_feed_stmt)) {
+               switch (glite_jp_db_execute(ctx->select_unlocked_feed_stmt)) {
                case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK;
                case 0: lprintf("no more uninit. feeds unlocked\n"); return ENOENT;
                default: break;
                }
-               if (glite_jp_db_fetch(ctx->select_feed_stmt) != 0) return ENOLCK;
+               if (glite_jp_db_fetch(ctx->select_unlocked_feed_stmt) != 0) return ENOLCK;
                lprintf("selected uninit. feed %lu\n", ctx->param_uniqueid);
 
                ret = glite_jp_db_execute(ctx->lock_feed_stmt);
@@ -489,3 +521,24 @@ int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid) {
 
        return ret == 1 ? 0 : ENOLCK;
 }
+
+
+int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av) {
+       char *sql, *table, *value, *full_value;
+
+       table = glite_jpis_attr_name2id(av->name);
+       value = glite_jp_attrval_to_db_index(ctx->jpctx, av, INDEX_LENGTH);
+       full_value = glite_jp_attrval_to_db_full(ctx->jpctx, av);
+       asprintf(&sql, SQLCMD_INSERT_ATTRVAL, table, jobid, value, full_value);
+       free(table);
+       free(value);
+       free(full_value);
+       lprintf("%s: sql=%s\n", __FUNCTION__, sql);
+       if (glite_jp_db_execstmt(ctx->jpctx, sql, NULL) != 1) {
+               free(sql);
+               return ctx->jpctx->error->code;
+       }
+       free(sql);
+
+       return 0;
+}
index a93cc8e..3dbca3c 100644 (file)
 
 #define GLITE_JP_IS_STATE_HIST 1
 #define GLITE_JP_IS_STATE_CONT 2
+#define GLITE_JP_IS_STATE_DONE 4
 
 
 typedef struct _glite_jpis_context {
        glite_jp_context_t jpctx;
-       glite_jp_db_stmt_t select_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt;
-       long int param_uniqueid;
+       glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt;
+       long int param_uniqueid, param_state;
        char param_feedid[33], param_ps[256];
        unsigned long param_ps_len, param_feedid_len;
        MYSQL_TIME param_expires;
 } *glite_jpis_context_t;
 
+char *glite_jpis_attr_name2id(const char *name);
 
 int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf);
 int glite_jpis_dropDatabase(glite_jp_context_t ctx);
@@ -35,4 +37,6 @@ int glite_jpis_lockUninitializedFeed(glite_jpis_context_t ctx, long int *uinquei
 int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires);
 int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid);
 
+int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av);
+
 #endif
index 2b9d7a6..e1a17ad 100644 (file)
@@ -1,14 +1,14 @@
 #include <stdio.h>
 #include <fcntl.h>
+#include <assert.h>
 
-#include "glite/jp/types.h"
-#include "glite/jp/context.h"
+#include <glite/jp/types.h>
+#include <glite/jp/context.h>
 
 #include "jpis_H.h"
 #include "jpis_.nsmap"
 #include "soap_version.h"
-
-
+#include "db_ops.h"
 
 
 
@@ -64,16 +64,67 @@ static void err2fault(const glite_jp_context_t ctx,struct soap *soap)
 /*-----------------------------------------*/
 
 
-#define CONTEXT_FROM_SOAP(soap,ctx) glite_jp_context_t ctx = (glite_jp_context_t) ((soap)->user)
+#define CONTEXT_FROM_SOAP(soap,ctx) glite_jpis_context_t       ctx = (glite_jpis_context_t) ((soap)->user)
+
+
+static int updateJob(glite_jpis_context_t isctx, struct jptype__jobRecord *jobAttrs) {
+       glite_jp_attrval_t av;
+       struct jptype__attrValue *attr;
+       int ret, iattrs;
+
+       printf("%s: jobid='%s', attrs=%d\n", __FUNCTION__, jobAttrs->jobid, jobAttrs->__sizeattributes);
+
+       if (jobAttrs->remove) assert(*(jobAttrs->remove) == 0);
+
+       for (iattrs = 0; iattrs < jobAttrs->__sizeattributes; iattrs++) {
+               attr = jobAttrs->attributes[iattrs];
+               SoapToAttrVal(&av, attr);
+               if ((ret = glite_jpis_insertAttrVal(isctx, jobAttrs->jobid, &av)) != 0) return ret;
+       }
+
+       return 0;
+}
+
 
 SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__UpdateJobs(
        struct soap *soap,
        struct _jpelem__UpdateJobs *jpelem__UpdateJobs,
        struct _jpelem__UpdateJobsResponse *jpelem__UpdateJobsResponse)
 {
+       int ret, ijobs;
+       const char *feedid;
+       int status, done;
+       CONTEXT_FROM_SOAP(soap, isctx);
+       glite_jp_context_t jpctx = isctx->jpctx;
+
        // XXX: test client in examples/jpis-test
        //      sends to this function some data for testing
        puts(__FUNCTION__);
+
+       // get info about the feed
+       feedid = jpelem__UpdateJobs->feedId;
+       memset(isctx->param_feedid, 0, sizeof(isctx->param_feedid));
+       strncpy(isctx->param_feedid, feedid, sizeof(isctx->param_feedid) - 1);
+       if ((ret = glite_jp_db_execute(isctx->select_info_feed_stmt)) != 1) {
+               fprintf(stderr, "can't get info about '%s', returned %d records: %s (%s)\n", feedid, ret, jpctx->error->desc, jpctx->error->source);
+               return SOAP_FAULT;
+       }
+       // update status, if needed (only oring)
+       status = isctx->param_state;
+       done = jpelem__UpdateJobs->feedDone ? GLITE_JP_IS_STATE_DONE : 0;
+       if ((done != (status & GLITE_JP_IS_STATE_DONE)) && done) {
+               isctx->param_state != done;
+               if ((ret = glite_jp_db_execute(isctx->update_state_feed_stmt)) != 1) {
+                       fprintf(stderr, "can't update state of '%s', returned %d records: %s (%s)\n", feedid, ret, jpctx->error->desc, jpctx->error->source);
+                       return SOAP_FAULT;
+               }
+       }
+
+       // insert all attributes
+       for (ijobs = 0; ijobs < jpelem__UpdateJobs->__sizejobAttributes; ijobs++) {
+               if (updateJob(isctx, jpelem__UpdateJobs->jobAttributes[ijobs]) != 0) return SOAP_FAULT;
+       }
+
        return SOAP_OK;
 }
 
index 8eae0e5..85532b3 100644 (file)
 
 #include "conf.h"
 #include "db_ops.h"
+#include "ws_typeref.h"
 
 #include "stdsoap2.h"
 
-extern int glite_jpis_QueryCondToSoap(struct soap *soap, glite_jp_query_rec_t *in, struct jptype__primaryQuery **out);
-
 
 /*------------------*/
 /* Helper functions */
index e25b66b..8f5db7f 100644 (file)
@@ -7,6 +7,7 @@
 
 #include "jpps_H.h"
 #include "ws_typemap.h"
+#include "ws_typeref.h"
 
 
 
@@ -110,6 +111,29 @@ int glite_jpis_QueryCondToSoap(
        return SOAP_OK;
 }
 
+static void SoapToAttrOrig(glite_jp_attr_orig_t *out, const enum jptype__attrOrig in)
+{
+        switch ( in )
+        {
+       case jptype__attrOrig__SYSTEM: *out = GLITE_JP_ATTR_ORIG_SYSTEM; break;
+       case jptype__attrOrig__USER: *out = GLITE_JP_ATTR_ORIG_USER; break;
+       case jptype__attrOrig__FILE_: *out = GLITE_JP_ATTR_ORIG_FILE; break;
+       default: assert(0); break;
+        }
+}
 
-
-
+void SoapToAttrVal(glite_jp_attrval_t *av, const struct jptype__attrValue *attr) {
+       memset(av, 0, sizeof(*av));
+       av->name = attr->name;
+       av->binary = attr->value->blob ? 1 : 0;
+       assert(av->binary || attr->value->string);
+       if (av->binary) {
+               av->value = attr->value->blob->__ptr;
+               av->size =attr->value->blob->__size ;
+       } else {
+               av->size = -1;
+       }
+       SoapToAttrOrig(&av->origin, attr->origin);
+       av->origin_detail = attr->originDetail;
+       av->timestamp = attr->timestamp;
+}
diff --git a/org.glite.jp.index/src/ws_typeref.h b/org.glite.jp.index/src/ws_typeref.h
new file mode 100644 (file)
index 0000000..9aae027
--- /dev/null
@@ -0,0 +1,8 @@
+#ifndef GLITE_JPIS_TYPEREF_H
+#define GLITE_JPIS_TYPEREF_H
+
+int glite_jpis_QueryCondToSoap(struct soap *soap, glite_jp_query_rec_t *in, struct jptype__primaryQuery **out);
+
+void SoapToAttrVal(glite_jp_attrval_t *av, const struct jptype__attrValue *attr);
+
+#endif