Implemented JP IS feeder from local file + updated documentation.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Fri, 2 May 2008 11:48:08 +0000 (11:48 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Fri, 2 May 2008 11:48:08 +0000 (11:48 +0000)
DB engine switched to InnoDB, enabled transactions for feeding.
Transaction functions wrappers in JP + one bugfix in error propagation.

org.glite.jp.index/config/glite-jp-index-dbsetup.sql
org.glite.jp.index/doc/glite-jp-indexd.sgml
org.glite.jp.index/src/bones_server.c
org.glite.jp.index/src/conf.c
org.glite.jp.index/src/conf.h
org.glite.jp.index/src/db_ops.c
org.glite.jp.index/src/db_ops.h
org.glite.jp.server-common/interface/db.h
org.glite.jp.server-common/src/db.c

index 97a5a2f..615b11d 100644 (file)
@@ -9,7 +9,7 @@ create table jobs (
        unique (dg_jobid),
        index (jobid),
        index (dg_jobid)
-);
+) engine=innodb;
 
 create table attrs (
        `attrid`                char(32)        binary not null,
@@ -20,7 +20,7 @@ create table attrs (
        primary key (attrid),
        index (attrid),
        index (name)
-);
+) engine=innodb;
 
 create table feeds (
        `uniqueid`      int             auto_increment not null,
@@ -35,7 +35,7 @@ create table feeds (
         index (uniqueid),
         index (feedid),
        index (state)
-);
+) engine=innodb;
 
 create table acls (
        `aclid`           char(32)        binary not null,
@@ -43,7 +43,7 @@ create table acls (
        `refcnt`          int             not null,
 
         primary key (aclid)
-);
+) engine=innodb;
 
 create table users (
        `userid`          char(32)        binary not null,
@@ -51,7 +51,7 @@ create table users (
 
         primary key (userid),
         unique (cert_subj)
-);
+) engine=innodb;
 
 
 # data tables - created one for each configured attribute, index on
@@ -65,5 +65,5 @@ create table users (
 #
 #        index (jobid),
 #        index (value)
-#);
+#) engine=innodb;
 
index 4a64ff5..f428a6b 100644 (file)
@@ -1,4 +1,4 @@
-<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook V4.1//EN">
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook V4.1//EN" "http://docbook.org/xml/4.1/docbookx.dtd">
 
 <refentry id='glitejpindexd'>
 
                                <arg>--delete-db</arg>
                        </group></arg>
 
+                       <arg><group choice='plain'>
+                               <arg>-f</arg>
+                               <arg>--feeding</arg>
+                       </group> <replaceable>FILE</replaceable> </arg>
+
                </cmdsynopsis>
        </refsynopsisdiv>
 
@@ -224,6 +229,13 @@ Delete and refetch the data in the database. You need use this parameter when fe
                                </para></listitem>
                        </varlistentry>
 
+                       <varlistentry>
+                               <term><option>-f</option>|<option>--feeding</option></term>
+                               <listitem><para>
+Feed the index server from the local file. Format of the file is job per line with attribute values using ";" separator. Attributes go in order according to server configuration file (possible attributes with jobid and owner in configuration file are ignored).
+                               </para></listitem>
+                       </varlistentry>
+
                </variablelist>
 
        </refsect1>
index 0c3994b..b52ffa0 100644 (file)
@@ -74,7 +74,7 @@ int main(int argc, char *argv[])
        edg_wll_GssStatus       gss_code;
        struct sockaddr_in      a;
        glite_jpis_context_t    isctx;
-
+       int retval = 0;
 
        glite_jp_init_context(&ctx);
 
@@ -105,21 +105,28 @@ int main(int argc, char *argv[])
        if (conf->delete_db) {
                if (glite_jpis_dropDatabase(isctx) != 0) {
                        fprintf(stderr, "Drop DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source);
-                       glite_jpis_free_db(isctx);
-                       glite_jpis_free_context(isctx);
-                       glite_jp_free_context(ctx);
-                       glite_jp_free_conf(conf);
-                       return 1;
+                       retval = 1;
+                       goto quit;
                }
        }
 
        if (glite_jpis_initDatabase(isctx) != 0) {
                fprintf(stderr, "Init DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source);
-               glite_jpis_free_db(isctx);
-               glite_jpis_free_context(isctx);
-               glite_jp_free_context(ctx);
-               glite_jp_free_conf(conf);
-               return 1;
+               retval = 1;
+               goto quit;
+       }
+
+       if (conf->feeding) {
+               char *err;
+
+               fprintf(stderr, "%s: Feeding from '%s'\n", argv[0], conf->feeding);
+               retval = glite_jpis_feeding(isctx, conf->feeding);
+               if (retval) {
+                       err = glite_jp_error_chain(isctx->jpctx);
+                       fprintf(stderr, "%s: %s\n", argv[0], err);
+                       free(err);
+               }
+               goto quit;
        }
 
        stab.conn = socket(PF_INET, SOCK_STREAM, 0);
@@ -190,12 +197,13 @@ int main(int argc, char *argv[])
 #endif
        glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug);
 
+quit:
        glite_jpis_free_db(isctx);
        glite_jp_free_conf(conf);
        glite_jpis_free_context(isctx);
        glite_jp_free_context(ctx);
 
-       return 0;
+       return retval;
 }
 
 
@@ -227,7 +235,8 @@ static int feed_caller(struct soap *soap, glite_jpis_context_t isctx) {
        feedid = NULL;
        for (initialized = 0; initialized <= 1; initialized++) {
                switch (glite_jpis_lockSearchFeed(isctx,initialized,&uniqueid,&PS_URL,&status,&feedid)) {
-               case 0: 
+               case 0:
+                       // some locked feeds found
                        ok = 0;
                        for (i = 0; i < 10; i++) {
                                if (!initialized) {
index ac073fe..f1cbacb 100644 (file)
@@ -24,7 +24,7 @@
 
 extern SOAP_NMAC struct Namespace jp__namespaces[];
 
-static const char *get_opt_string = "dq:c:k:C:V:nm:p:i:o:x:s:D";
+static const char *get_opt_string = "dq:c:k:C:V:nm:p:i:o:x:s:Df:";
 
 static struct option opts[] = {
        {"debug",       0, NULL,        'd'},
@@ -41,6 +41,7 @@ static struct option opts[] = {
        {"config",      1, NULL,        'x'},
        {"slaves",      1, NULL,        's'},
        {"delete-db",   0, NULL,        'D'},
+       {"feeding",     1, NULL,        'f'},
        {NULL,          0, NULL,        0}
 };
 
@@ -65,7 +66,8 @@ static void usage(char *me)
                "\t-o, --logfile\t file to store logs\n"
                "\t-x, --config\t file with server configuration\n"
                "\t-s, --slaves\t number of slaves for responses\n"
-               "\t-D, --delete-db\t delete and restore data in the database"
+               "\t-D, --delete-db\t delete and restore data in the database\n"
+               "\t-f, --feeding\t feed index server from local file\n"
                "\n"
        ,me);
 }
@@ -95,6 +97,7 @@ int glite_jp_get_conf(int argc, char **argv, glite_jp_is_conf **configuration)
                case 'x': conf_file = optarg; break;
                case 's': conf->slaves = atoi(optarg); if (conf->slaves > 0) break;
                case 'D': conf->delete_db = 1; break;
+               case 'f': conf->feeding = optarg; break;
                default : usage(argv[0]); exit(0); break;
        }
 
index 4925e19..4cd6060 100644 (file)
@@ -50,6 +50,8 @@ typedef struct _glite_jp_is_conf {
                *server_key;
        int     slaves;
        int     delete_db;
+
+       char    *feeding;               // feed DB from local file
 } glite_jp_is_conf;
 
 
index fef6240..415400b 100644 (file)
@@ -8,13 +8,16 @@
 #include <assert.h>
 #include <stdio.h>
 #include <ctype.h>
+#include <string.h>
 
 #include <glite/lbu/trio.h>
 #include <glite/jobid/strmd5.h>
+#include <glite/jobid/cjobid.h>
 #include <glite/jp/types.h>
 #include <glite/jp/context.h>
 #include <glite/jp/db.h>
 #include <glite/jp/attr.h>
+#include "glite/jp/known_attr.h"
 
 #include "conf.h"
 #include "context.h"
@@ -36,7 +39,7 @@
 \n\
         INDEX (jobid),\n\
         INDEX (value)\n\
-);"
+) ENGINE=innodb;"
 #define SQLCMD_INSERT_ATTRVAL "INSERT INTO " TABLE_PREFIX_DATA "%|Ss (jobid, value, full_value, origin) VALUES (\n\
        '%|Ss',\n\
        '%|Ss',\n\
@@ -309,7 +312,7 @@ int glite_jpis_initDatabase(glite_jpis_context_t ctx) {
        free(num);
        glite_jp_db_FreeStmt(&stmt);
        if (nattrs != 0) {
-               lprintf("database with %d attributes keeped (use -D for delete)\n");
+               lprintf("database with %d attributes kept (use -D for delete)\n", nattrs);
                return 0;
        }
 
@@ -423,7 +426,7 @@ fail:
 
 
 int glite_jpis_init_db(glite_jpis_context_t isctx) {
-       int ret;
+       int ret, caps;
        const char *cs;
        glite_jp_context_t jpctx;
 
@@ -432,6 +435,15 @@ int glite_jpis_init_db(glite_jpis_context_t isctx) {
        if ((cs = isctx->conf->cs) == NULL) cs = GLITE_JP_IS_DEFAULTCS;
        if (glite_lbu_DBConnect(jpctx->dbhandle, cs) != 0) goto fail_db;
 
+       // try transaction for feeding
+       if (isctx->conf->feeding) {
+               caps = glite_lbu_DBQueryCaps(jpctx->dbhandle);
+               if (caps != -1) {
+                       glite_lbu_DBSetCaps(jpctx->dbhandle, caps);
+                       llprintf(LOG_SQL, "transactions %s\n", (caps & GLITE_LBU_DB_CAP_TRANSACTIONS) ? "supported" : "not supported");
+               }
+       }
+
        // sql command: lock the feed (via uniqueid)
        if ((ret = glite_jp_db_PrepareStmt(jpctx, "UPDATE feeds SET locked=1 WHERE (locked = 0) AND (uniqueid = ?)", &isctx->lock_feed_stmt)) != 0) goto fail;
 
@@ -633,6 +645,8 @@ int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_
        free(value);
        free(full_value);
        llprintf(LOG_SQL, "(%s) sql=%s\n", av->name, sql);
+//     if (ctx->conf->feeding) printf("FEED: %s\n", sql);
+//     else
        if (glite_jp_db_ExecSQL(ctx->jpctx, sql, NULL) != 1) {
                free(sql);
                return ctx->jpctx->error->code;
@@ -689,3 +703,89 @@ fail:
        free(md5_cert);
        return ctx->jpctx->error->code;
 }
+
+
+#define FEEDING_SEPARATORS ";"
+#define FEEDING_JOBID_BKSERVER "localhost-test"
+#define FEEDING_JOBID_PORT 0
+#define FEEDING_PRIMARY_STORAGE "localhost:8901"
+#define FEEDING_OWNER "God"
+int glite_jpis_feeding(glite_jpis_context_t ctx, const char *fname) {
+       FILE *f;
+       char line[1024], *token, *lasts, *jobid = NULL;
+       int nattrs, lno, i, iname, c;
+       glite_jp_attrval_t *avs;
+       glite_jobid_t j;
+
+       if ((f = fopen(fname, "rt")) == NULL) {
+               glite_jpis_stack_error(ctx->jpctx, errno, "can't open csv dump file");
+               return 1;
+       }
+
+       for (nattrs = 0; ctx->conf->attrs[nattrs]; nattrs++);
+       avs = malloc(nattrs * sizeof avs[0]);
+
+       lno = 0;
+       while(fgets(line, sizeof line, f) != NULL) {
+               if ((lno % 100) == 0) {
+                       if (lno) glite_jp_db_Commit(ctx->jpctx);
+                       glite_jp_db_Transaction(ctx->jpctx);
+               }
+               lno++;
+               if (line[0]) {
+                       c = strlen(line) - 1;
+                       if (line[c] != '\r' && line[c] != '\n' && !feof(f)) {
+                               glite_jpis_stack_error(ctx->jpctx, E2BIG, "line too large at %d (max. %d)", lno, sizeof line);
+                               goto err;
+                       }
+                       while (c >= 0 && (line[c] == '\r' || line[c] == '\n')) c--;
+                       line[c + 1] = 0;
+               }
+//             printf("'%s'\n", line);
+
+               memset(avs, 0, nattrs * sizeof avs[0]);
+               i = 0;
+               iname = 0;
+               token = strtok_r(line, FEEDING_SEPARATORS, &lasts);
+               while (token && iname < nattrs) {
+//                     printf("\t'%s'\n", token);
+                       do {
+                               avs[i].name = ctx->conf->attrs[iname];
+                               iname++;
+                       } while (strcasecmp(avs[i].name, GLITE_JP_ATTR_JOBID) == 0 || strcasecmp(avs[i].name, GLITE_JP_ATTR_OWNER) == 0);
+                       avs[i].value = token;
+                       avs[i].timestamp = time(NULL);
+                       fprintf(stderr, "\t %d: %s = '%s'\n", i, avs[i].name, avs[i].value);
+                       i++;
+
+                       token = strtok_r(NULL, FEEDING_SEPARATORS, &lasts);
+               }
+
+               if (glite_jobid_create(FEEDING_JOBID_BKSERVER, FEEDING_JOBID_PORT, &j) != 0) {
+                       glite_jpis_stack_error(ctx->jpctx, errno, "can't create jobid");
+                       goto err;
+               }
+               if ((jobid = glite_jobid_unparse(j)) == NULL) {
+                       glite_jobid_free(j);
+                       glite_jpis_stack_error(ctx->jpctx, ENOMEM, "can't unparse jobid");
+                       goto err;
+               }
+               glite_jobid_free(j);
+               if (glite_jpis_lazyInsertJob(ctx, FEEDING_PRIMARY_STORAGE, jobid, FEEDING_OWNER)) goto err;
+               for (i = 0; i < nattrs && avs[i].name; i++) {
+                       if (glite_jpis_insertAttrVal(ctx, jobid, &avs[i])) goto err;
+               }
+               free(jobid); jobid = NULL;
+       }
+       glite_jp_db_Commit(ctx->jpctx);
+
+       fclose(f);
+       free(avs);
+       return 0;
+err:
+       fclose(f);
+       free(avs);
+       free(jobid);
+       glite_jp_db_Rollback(ctx->jpctx);
+       return 1;
+}
index 19343d8..d95cda5 100644 (file)
@@ -42,4 +42,6 @@ int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_
 
 int glite_jpis_lazyInsertJob(glite_jpis_context_t ctx, const char *ps, const char *jobid, const char *owner);
 
+int glite_jpis_feeding(glite_jpis_context_t ctx, const char *fname);
+
 #endif
index 147719c..735b2b7 100644 (file)
@@ -16,6 +16,9 @@ int glite_jp_db_FetchRow(glite_jp_context_t ctx, glite_lbu_Statement stmt, unsig
 int glite_jp_db_PrepareStmt(glite_jp_context_t ctx, const char *sql, glite_lbu_Statement *stmt);
 int glite_jp_db_ExecPreparedStmt(glite_jp_context_t ctx, glite_lbu_Statement stmt, int n,...);
 void glite_jp_db_FreeStmt(glite_lbu_Statement *stmt);
+int glite_jp_db_Transaction(glite_jp_context_t ctx);
+int glite_jp_db_Commit(glite_jp_context_t ctx);
+int glite_jp_db_Rollback(glite_jp_context_t ctx);
 
 #ifdef __cplusplus
 }
index 80d02a3..57826b6 100644 (file)
@@ -60,7 +60,7 @@ int glite_jp_db_PrepareStmt(glite_jp_context_t ctx, const char *sql, glite_lbu_S
 
        ret = glite_lbu_PrepareStmt(ctx->dbhandle, sql, stmt);
        if (ret < 0) glite_jp_db_SetError(ctx, __FUNCTION__);
-       return glite_jp_db_SetError(ctx, __FUNCTION__);
+       return ret;
 }
 
 
@@ -71,6 +71,7 @@ int glite_jp_db_ExecPreparedStmt(glite_jp_context_t ctx, glite_lbu_Statement stm
        va_start(ap, n);
        ret = glite_lbu_ExecPreparedStmt_v(stmt, n, ap);
        va_end(ap);
+       if (ret < 0) glite_jp_db_SetError(ctx, __FUNCTION__);
        return ret;
 }
 
@@ -78,3 +79,33 @@ int glite_jp_db_ExecPreparedStmt(glite_jp_context_t ctx, glite_lbu_Statement stm
 void glite_jp_db_FreeStmt(glite_lbu_Statement *stmt) {
        glite_lbu_FreeStmt(stmt);
 }
+
+
+int glite_jp_db_Transaction(glite_jp_context_t ctx) {
+       int ret;
+
+       ret = glite_lbu_Transaction(ctx->dbhandle);
+       if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__);
+
+       return ret;
+}
+
+
+int glite_jp_db_Commit(glite_jp_context_t ctx) {
+       int ret;
+
+       ret = glite_lbu_Commit(ctx->dbhandle);
+       if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__);
+
+       return ret;
+}
+
+
+int glite_jp_db_Rollback(glite_jp_context_t ctx) {
+       int ret;
+
+       ret = glite_lbu_Rollback(ctx->dbhandle);
+       if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__);
+
+       return ret;
+}