INDEX (jobid),\n\
INDEX (value)\n\
);"
+#define SQLCMD_INSERT_ATTRVAL "INSERT INTO " TABLE_PREFIX_DATA "%s (jobid, value, full_value) VALUES (\n\
+ '%s',\n\
+ '%s',\n\
+ '%s'\n\
+)"
+#define INDEX_LENGTH 255
-#define lprintf
-//#define lprintf printf
+//#define lprintf
+#define lprintf printf
#define WORD_SWAP(X) ((((X) >> 8) & 0xFF) | (((X) & 0xFF) << 8))
#define LONG_SWAP(X) (WORD_SWAP(((X) >> 16) & 0xFFFF) | ((WORD_SWAP(X) & 0xFFFF) << 16))
}
+/**
+ * Convert attribute name to attribute id.
+ */
+char *glite_jpis_attr_name2id(const char *name) {
+ return str2md5(name);
+}
+
+
/* Init the database.
*
* \retval 0 OK
i = 0;
while (attrs[i]) {
type_full = glite_jp_attrval_db_type_full(ctx, attrs[i]);
- type_index = glite_jp_attrval_db_type_index(ctx, attrs[i], 255);
+ type_index = glite_jp_attrval_db_type_index(ctx, attrs[i], INDEX_LENGTH);
// attrid column
- tmp = str2md5(attrs[i]);
+ tmp = glite_jpis_attr_name2id(attrs[i]);
strncpy(attrid, tmp, sizeof(attrid) - 1);
free(tmp);
attrid_len = strlen(attrid);
// sql command: select an uninitialized unlocked feed
glite_jp_db_assign_result(&myres[0], MYSQL_TYPE_LONG, NULL, &((*isctx)->param_uniqueid));
glite_jp_db_assign_result(&myres[1], MYSQL_TYPE_VAR_STRING, NULL, (*isctx)->param_ps, sizeof((*isctx)->param_ps), &(*isctx)->param_ps_len);
- if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_feed_stmt, NULL, myres)) != 0) goto fail_connect;
+ if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, source FROM feeds WHERE (locked=0) AND (feedid IS NULL)", &(*isctx)->select_unlocked_feed_stmt, NULL, myres)) != 0) goto fail_connect;
// sql command: lock the feed (via uniqueid)
glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_VAR_STRING, (*isctx)->param_feedid, &(*isctx)->param_feedid_len);
glite_jp_db_assign_param(&myparam[1], MYSQL_TYPE_DATETIME, &(*isctx)->param_expires);
glite_jp_db_assign_param(&myparam[2], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
- if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET feedid=?, expires=? WHERE (uniqueid=?)", &(*isctx)->init_feed_stmt, myparam, NULL)) != 0) goto fail_cmd2;
+ if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET feedid=?, expires=? WHERE (uniqueid=?)", &(*isctx)->init_feed_stmt, myparam, NULL)) != 0) goto fail_cmd2;
// sql command: unlock the feed (via uniqueid)
glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
- if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=0 WHERE (uniqueid=?)", &(*isctx)->unlock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd3;
+ if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET locked=0 WHERE (uniqueid=?)", &(*isctx)->unlock_feed_stmt, myparam, NULL)) != 0) goto fail_cmd3;
+
+ // sql command: get info about the feed (via feedid)
+ glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_STRING, (*isctx)->param_feedid, &(*isctx)->param_feedid_len);
+ glite_jp_db_assign_result(&myres[0], MYSQL_TYPE_LONG, NULL, &(*isctx)->param_uniqueid);
+ glite_jp_db_assign_result(&myres[1], MYSQL_TYPE_LONG, NULL, &(*isctx)->param_state);
+ if ((ret = glite_jp_db_prepare(jpctx, "SELECT uniqueid, state FROM feeds WHERE (feedid=?)", &(*isctx)->select_info_feed_stmt, myparam, myres)) != 0) goto fail_cmd4;
+
+ // sql command: update state of the feed (via uniqueid)
+ glite_jp_db_assign_param(&myparam[0], MYSQL_TYPE_LONG, &(*isctx)->param_state);
+ glite_jp_db_assign_param(&myparam[1], MYSQL_TYPE_LONG, &(*isctx)->param_uniqueid);
+ if ((ret = glite_jp_db_prepare(jpctx, "UPDATE feeds SET state=? WHERE (uniqueid=?)", &(*isctx)->update_state_feed_stmt, myparam, NULL)) != 0) goto fail_cmd5;
return 0;
+ glite_jp_db_freestmt(&(*isctx)->update_state_feed_stmt);
+fail_cmd5:
+ glite_jp_db_freestmt(&(*isctx)->select_info_feed_stmt);
+fail_cmd4:
+ glite_jp_db_freestmt(&(*isctx)->unlock_feed_stmt);
fail_cmd3:
glite_jp_db_freestmt(&(*isctx)->init_feed_stmt);
fail_cmd2:
glite_jp_db_freestmt(&(*isctx)->lock_feed_stmt);
fail_cmd:
- glite_jp_db_freestmt(&(*isctx)->select_feed_stmt);
+ glite_jp_db_freestmt(&(*isctx)->select_unlocked_feed_stmt);
fail_connect:
glite_jp_db_close((*isctx)->jpctx);
fail:
int glite_jpis_free_context(glite_jpis_context_t ctx) {
- glite_jp_db_freestmt(&ctx->select_feed_stmt);
+ glite_jp_db_freestmt(&ctx->select_unlocked_feed_stmt);
glite_jp_db_freestmt(&ctx->lock_feed_stmt);
glite_jp_db_freestmt(&ctx->init_feed_stmt);
glite_jp_db_freestmt(&ctx->unlock_feed_stmt);
+ glite_jp_db_freestmt(&ctx->select_info_feed_stmt);
+ glite_jp_db_freestmt(&ctx->update_state_feed_stmt);
glite_jp_db_close(ctx->jpctx);
free(ctx);
}
int ret;
do {
- switch (glite_jp_db_execute(ctx->select_feed_stmt)) {
+ switch (glite_jp_db_execute(ctx->select_unlocked_feed_stmt)) {
case -1: lprintf("error selecting unlocked feed\n"); return ENOLCK;
case 0: lprintf("no more uninit. feeds unlocked\n"); return ENOENT;
default: break;
}
- if (glite_jp_db_fetch(ctx->select_feed_stmt) != 0) return ENOLCK;
+ if (glite_jp_db_fetch(ctx->select_unlocked_feed_stmt) != 0) return ENOLCK;
lprintf("selected uninit. feed %lu\n", ctx->param_uniqueid);
ret = glite_jp_db_execute(ctx->lock_feed_stmt);
return ret == 1 ? 0 : ENOLCK;
}
+
+
+int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av) {
+ char *sql, *table, *value, *full_value;
+
+ table = glite_jpis_attr_name2id(av->name);
+ value = glite_jp_attrval_to_db_index(ctx->jpctx, av, INDEX_LENGTH);
+ full_value = glite_jp_attrval_to_db_full(ctx->jpctx, av);
+ asprintf(&sql, SQLCMD_INSERT_ATTRVAL, table, jobid, value, full_value);
+ free(table);
+ free(value);
+ free(full_value);
+ lprintf("%s: sql=%s\n", __FUNCTION__, sql);
+ if (glite_jp_db_execstmt(ctx->jpctx, sql, NULL) != 1) {
+ free(sql);
+ return ctx->jpctx->error->code;
+ }
+ free(sql);
+
+ return 0;
+}
#define GLITE_JP_IS_STATE_HIST 1
#define GLITE_JP_IS_STATE_CONT 2
+#define GLITE_JP_IS_STATE_DONE 4
typedef struct _glite_jpis_context {
glite_jp_context_t jpctx;
- glite_jp_db_stmt_t select_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt;
- long int param_uniqueid;
+ glite_jp_db_stmt_t select_unlocked_feed_stmt, lock_feed_stmt, init_feed_stmt, unlock_feed_stmt, select_info_feed_stmt, update_state_feed_stmt;
+ long int param_uniqueid, param_state;
char param_feedid[33], param_ps[256];
unsigned long param_ps_len, param_feedid_len;
MYSQL_TIME param_expires;
} *glite_jpis_context_t;
+char *glite_jpis_attr_name2id(const char *name);
int glite_jpis_initDatabase(glite_jp_context_t ctx, glite_jp_is_conf *conf);
int glite_jpis_dropDatabase(glite_jp_context_t ctx);
int glite_jpis_initFeed(glite_jpis_context_t ctx, long int uniqueid, char *feedId, time_t feedExpires);
int glite_jpis_unlockFeed(glite_jpis_context_t ctx, long int uniqueid);
+int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_jp_attrval_t *av);
+
#endif
#include <stdio.h>
#include <fcntl.h>
+#include <assert.h>
-#include "glite/jp/types.h"
-#include "glite/jp/context.h"
+#include <glite/jp/types.h>
+#include <glite/jp/context.h>
#include "jpis_H.h"
#include "jpis_.nsmap"
#include "soap_version.h"
-
-
+#include "db_ops.h"
/*-----------------------------------------*/
-#define CONTEXT_FROM_SOAP(soap,ctx) glite_jp_context_t ctx = (glite_jp_context_t) ((soap)->user)
+#define CONTEXT_FROM_SOAP(soap,ctx) glite_jpis_context_t ctx = (glite_jpis_context_t) ((soap)->user)
+
+
+static int updateJob(glite_jpis_context_t isctx, struct jptype__jobRecord *jobAttrs) {
+ glite_jp_attrval_t av;
+ struct jptype__attrValue *attr;
+ int ret, iattrs;
+
+ printf("%s: jobid='%s', attrs=%d\n", __FUNCTION__, jobAttrs->jobid, jobAttrs->__sizeattributes);
+
+ if (jobAttrs->remove) assert(*(jobAttrs->remove) == 0);
+
+ for (iattrs = 0; iattrs < jobAttrs->__sizeattributes; iattrs++) {
+ attr = jobAttrs->attributes[iattrs];
+ SoapToAttrVal(&av, attr);
+ if ((ret = glite_jpis_insertAttrVal(isctx, jobAttrs->jobid, &av)) != 0) return ret;
+ }
+
+ return 0;
+}
+
SOAP_FMAC5 int SOAP_FMAC6 __jpsrv__UpdateJobs(
struct soap *soap,
struct _jpelem__UpdateJobs *jpelem__UpdateJobs,
struct _jpelem__UpdateJobsResponse *jpelem__UpdateJobsResponse)
{
+ int ret, ijobs;
+ const char *feedid;
+ int status, done;
+ CONTEXT_FROM_SOAP(soap, isctx);
+ glite_jp_context_t jpctx = isctx->jpctx;
+
// XXX: test client in examples/jpis-test
// sends to this function some data for testing
puts(__FUNCTION__);
+
+ // get info about the feed
+ feedid = jpelem__UpdateJobs->feedId;
+ memset(isctx->param_feedid, 0, sizeof(isctx->param_feedid));
+ strncpy(isctx->param_feedid, feedid, sizeof(isctx->param_feedid) - 1);
+ if ((ret = glite_jp_db_execute(isctx->select_info_feed_stmt)) != 1) {
+ fprintf(stderr, "can't get info about '%s', returned %d records: %s (%s)\n", feedid, ret, jpctx->error->desc, jpctx->error->source);
+ return SOAP_FAULT;
+ }
+ // update status, if needed (only oring)
+ status = isctx->param_state;
+ done = jpelem__UpdateJobs->feedDone ? GLITE_JP_IS_STATE_DONE : 0;
+ if ((done != (status & GLITE_JP_IS_STATE_DONE)) && done) {
+ isctx->param_state != done;
+ if ((ret = glite_jp_db_execute(isctx->update_state_feed_stmt)) != 1) {
+ fprintf(stderr, "can't update state of '%s', returned %d records: %s (%s)\n", feedid, ret, jpctx->error->desc, jpctx->error->source);
+ return SOAP_FAULT;
+ }
+ }
+
+ // insert all attributes
+ for (ijobs = 0; ijobs < jpelem__UpdateJobs->__sizejobAttributes; ijobs++) {
+ if (updateJob(isctx, jpelem__UpdateJobs->jobAttributes[ijobs]) != 0) return SOAP_FAULT;
+ }
+
return SOAP_OK;
}