From 9f7469fb1f40b3c3e3004d82cc38e3028916648b Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Fri, 2 May 2008 11:48:08 +0000 Subject: [PATCH] Implemented JP IS feeder from local file + updated documentation. DB engine switched to InnoDB, enabled transactions for feeding. Transaction functions wrappers in JP + one bugfix in error propagation. --- .../config/glite-jp-index-dbsetup.sql | 12 +-- org.glite.jp.index/doc/glite-jp-indexd.sgml | 14 ++- org.glite.jp.index/src/bones_server.c | 35 ++++--- org.glite.jp.index/src/conf.c | 7 +- org.glite.jp.index/src/conf.h | 2 + org.glite.jp.index/src/db_ops.c | 106 ++++++++++++++++++++- org.glite.jp.index/src/db_ops.h | 2 + org.glite.jp.server-common/interface/db.h | 3 + org.glite.jp.server-common/src/db.c | 33 ++++++- 9 files changed, 188 insertions(+), 26 deletions(-) diff --git a/org.glite.jp.index/config/glite-jp-index-dbsetup.sql b/org.glite.jp.index/config/glite-jp-index-dbsetup.sql index 97a5a2f..615b11d 100644 --- a/org.glite.jp.index/config/glite-jp-index-dbsetup.sql +++ b/org.glite.jp.index/config/glite-jp-index-dbsetup.sql @@ -9,7 +9,7 @@ create table jobs ( unique (dg_jobid), index (jobid), index (dg_jobid) -); +) engine=innodb; create table attrs ( `attrid` char(32) binary not null, @@ -20,7 +20,7 @@ create table attrs ( primary key (attrid), index (attrid), index (name) -); +) engine=innodb; create table feeds ( `uniqueid` int auto_increment not null, @@ -35,7 +35,7 @@ create table feeds ( index (uniqueid), index (feedid), index (state) -); +) engine=innodb; create table acls ( `aclid` char(32) binary not null, @@ -43,7 +43,7 @@ create table acls ( `refcnt` int not null, primary key (aclid) -); +) engine=innodb; create table users ( `userid` char(32) binary not null, @@ -51,7 +51,7 @@ create table users ( primary key (userid), unique (cert_subj) -); +) engine=innodb; # data tables - created one for each configured attribute, index on @@ -65,5 +65,5 @@ create table users ( # # index (jobid), # index (value) -#); +#) engine=innodb; diff --git a/org.glite.jp.index/doc/glite-jp-indexd.sgml b/org.glite.jp.index/doc/glite-jp-indexd.sgml index 4a64ff5..f428a6b 100644 --- a/org.glite.jp.index/doc/glite-jp-indexd.sgml +++ b/org.glite.jp.index/doc/glite-jp-indexd.sgml @@ -1,4 +1,4 @@ - + @@ -93,6 +93,11 @@ --delete-db + + -f + --feeding + FILE + @@ -224,6 +229,13 @@ Delete and refetch the data in the database. You need use this parameter when fe + + | + +Feed the index server from the local file. Format of the file is job per line with attribute values using ";" separator. Attributes go in order according to server configuration file (possible attributes with jobid and owner in configuration file are ignored). + + + diff --git a/org.glite.jp.index/src/bones_server.c b/org.glite.jp.index/src/bones_server.c index 0c3994b..b52ffa0 100644 --- a/org.glite.jp.index/src/bones_server.c +++ b/org.glite.jp.index/src/bones_server.c @@ -74,7 +74,7 @@ int main(int argc, char *argv[]) edg_wll_GssStatus gss_code; struct sockaddr_in a; glite_jpis_context_t isctx; - + int retval = 0; glite_jp_init_context(&ctx); @@ -105,21 +105,28 @@ int main(int argc, char *argv[]) if (conf->delete_db) { if (glite_jpis_dropDatabase(isctx) != 0) { fprintf(stderr, "Drop DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source); - glite_jpis_free_db(isctx); - glite_jpis_free_context(isctx); - glite_jp_free_context(ctx); - glite_jp_free_conf(conf); - return 1; + retval = 1; + goto quit; } } if (glite_jpis_initDatabase(isctx) != 0) { fprintf(stderr, "Init DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source); - glite_jpis_free_db(isctx); - glite_jpis_free_context(isctx); - glite_jp_free_context(ctx); - glite_jp_free_conf(conf); - return 1; + retval = 1; + goto quit; + } + + if (conf->feeding) { + char *err; + + fprintf(stderr, "%s: Feeding from '%s'\n", argv[0], conf->feeding); + retval = glite_jpis_feeding(isctx, conf->feeding); + if (retval) { + err = glite_jp_error_chain(isctx->jpctx); + fprintf(stderr, "%s: %s\n", argv[0], err); + free(err); + } + goto quit; } stab.conn = socket(PF_INET, SOCK_STREAM, 0); @@ -190,12 +197,13 @@ int main(int argc, char *argv[]) #endif glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug); +quit: glite_jpis_free_db(isctx); glite_jp_free_conf(conf); glite_jpis_free_context(isctx); glite_jp_free_context(ctx); - return 0; + return retval; } @@ -227,7 +235,8 @@ static int feed_caller(struct soap *soap, glite_jpis_context_t isctx) { feedid = NULL; for (initialized = 0; initialized <= 1; initialized++) { switch (glite_jpis_lockSearchFeed(isctx,initialized,&uniqueid,&PS_URL,&status,&feedid)) { - case 0: + case 0: + // some locked feeds found ok = 0; for (i = 0; i < 10; i++) { if (!initialized) { diff --git a/org.glite.jp.index/src/conf.c b/org.glite.jp.index/src/conf.c index ac073fe..f1cbacb 100644 --- a/org.glite.jp.index/src/conf.c +++ b/org.glite.jp.index/src/conf.c @@ -24,7 +24,7 @@ extern SOAP_NMAC struct Namespace jp__namespaces[]; -static const char *get_opt_string = "dq:c:k:C:V:nm:p:i:o:x:s:D"; +static const char *get_opt_string = "dq:c:k:C:V:nm:p:i:o:x:s:Df:"; static struct option opts[] = { {"debug", 0, NULL, 'd'}, @@ -41,6 +41,7 @@ static struct option opts[] = { {"config", 1, NULL, 'x'}, {"slaves", 1, NULL, 's'}, {"delete-db", 0, NULL, 'D'}, + {"feeding", 1, NULL, 'f'}, {NULL, 0, NULL, 0} }; @@ -65,7 +66,8 @@ static void usage(char *me) "\t-o, --logfile\t file to store logs\n" "\t-x, --config\t file with server configuration\n" "\t-s, --slaves\t number of slaves for responses\n" - "\t-D, --delete-db\t delete and restore data in the database" + "\t-D, --delete-db\t delete and restore data in the database\n" + "\t-f, --feeding\t feed index server from local file\n" "\n" ,me); } @@ -95,6 +97,7 @@ int glite_jp_get_conf(int argc, char **argv, glite_jp_is_conf **configuration) case 'x': conf_file = optarg; break; case 's': conf->slaves = atoi(optarg); if (conf->slaves > 0) break; case 'D': conf->delete_db = 1; break; + case 'f': conf->feeding = optarg; break; default : usage(argv[0]); exit(0); break; } diff --git a/org.glite.jp.index/src/conf.h b/org.glite.jp.index/src/conf.h index 4925e19..4cd6060 100644 --- a/org.glite.jp.index/src/conf.h +++ b/org.glite.jp.index/src/conf.h @@ -50,6 +50,8 @@ typedef struct _glite_jp_is_conf { *server_key; int slaves; int delete_db; + + char *feeding; // feed DB from local file } glite_jp_is_conf; diff --git a/org.glite.jp.index/src/db_ops.c b/org.glite.jp.index/src/db_ops.c index fef6240..415400b 100644 --- a/org.glite.jp.index/src/db_ops.c +++ b/org.glite.jp.index/src/db_ops.c @@ -8,13 +8,16 @@ #include #include #include +#include #include #include +#include #include #include #include #include +#include "glite/jp/known_attr.h" #include "conf.h" #include "context.h" @@ -36,7 +39,7 @@ \n\ INDEX (jobid),\n\ INDEX (value)\n\ -);" +) ENGINE=innodb;" #define SQLCMD_INSERT_ATTRVAL "INSERT INTO " TABLE_PREFIX_DATA "%|Ss (jobid, value, full_value, origin) VALUES (\n\ '%|Ss',\n\ '%|Ss',\n\ @@ -309,7 +312,7 @@ int glite_jpis_initDatabase(glite_jpis_context_t ctx) { free(num); glite_jp_db_FreeStmt(&stmt); if (nattrs != 0) { - lprintf("database with %d attributes keeped (use -D for delete)\n"); + lprintf("database with %d attributes kept (use -D for delete)\n", nattrs); return 0; } @@ -423,7 +426,7 @@ fail: int glite_jpis_init_db(glite_jpis_context_t isctx) { - int ret; + int ret, caps; const char *cs; glite_jp_context_t jpctx; @@ -432,6 +435,15 @@ int glite_jpis_init_db(glite_jpis_context_t isctx) { if ((cs = isctx->conf->cs) == NULL) cs = GLITE_JP_IS_DEFAULTCS; if (glite_lbu_DBConnect(jpctx->dbhandle, cs) != 0) goto fail_db; + // try transaction for feeding + if (isctx->conf->feeding) { + caps = glite_lbu_DBQueryCaps(jpctx->dbhandle); + if (caps != -1) { + glite_lbu_DBSetCaps(jpctx->dbhandle, caps); + llprintf(LOG_SQL, "transactions %s\n", (caps & GLITE_LBU_DB_CAP_TRANSACTIONS) ? "supported" : "not supported"); + } + } + // sql command: lock the feed (via uniqueid) if ((ret = glite_jp_db_PrepareStmt(jpctx, "UPDATE feeds SET locked=1 WHERE (locked = 0) AND (uniqueid = ?)", &isctx->lock_feed_stmt)) != 0) goto fail; @@ -633,6 +645,8 @@ int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_ free(value); free(full_value); llprintf(LOG_SQL, "(%s) sql=%s\n", av->name, sql); +// if (ctx->conf->feeding) printf("FEED: %s\n", sql); +// else if (glite_jp_db_ExecSQL(ctx->jpctx, sql, NULL) != 1) { free(sql); return ctx->jpctx->error->code; @@ -689,3 +703,89 @@ fail: free(md5_cert); return ctx->jpctx->error->code; } + + +#define FEEDING_SEPARATORS ";" +#define FEEDING_JOBID_BKSERVER "localhost-test" +#define FEEDING_JOBID_PORT 0 +#define FEEDING_PRIMARY_STORAGE "localhost:8901" +#define FEEDING_OWNER "God" +int glite_jpis_feeding(glite_jpis_context_t ctx, const char *fname) { + FILE *f; + char line[1024], *token, *lasts, *jobid = NULL; + int nattrs, lno, i, iname, c; + glite_jp_attrval_t *avs; + glite_jobid_t j; + + if ((f = fopen(fname, "rt")) == NULL) { + glite_jpis_stack_error(ctx->jpctx, errno, "can't open csv dump file"); + return 1; + } + + for (nattrs = 0; ctx->conf->attrs[nattrs]; nattrs++); + avs = malloc(nattrs * sizeof avs[0]); + + lno = 0; + while(fgets(line, sizeof line, f) != NULL) { + if ((lno % 100) == 0) { + if (lno) glite_jp_db_Commit(ctx->jpctx); + glite_jp_db_Transaction(ctx->jpctx); + } + lno++; + if (line[0]) { + c = strlen(line) - 1; + if (line[c] != '\r' && line[c] != '\n' && !feof(f)) { + glite_jpis_stack_error(ctx->jpctx, E2BIG, "line too large at %d (max. %d)", lno, sizeof line); + goto err; + } + while (c >= 0 && (line[c] == '\r' || line[c] == '\n')) c--; + line[c + 1] = 0; + } +// printf("'%s'\n", line); + + memset(avs, 0, nattrs * sizeof avs[0]); + i = 0; + iname = 0; + token = strtok_r(line, FEEDING_SEPARATORS, &lasts); + while (token && iname < nattrs) { +// printf("\t'%s'\n", token); + do { + avs[i].name = ctx->conf->attrs[iname]; + iname++; + } while (strcasecmp(avs[i].name, GLITE_JP_ATTR_JOBID) == 0 || strcasecmp(avs[i].name, GLITE_JP_ATTR_OWNER) == 0); + avs[i].value = token; + avs[i].timestamp = time(NULL); + fprintf(stderr, "\t %d: %s = '%s'\n", i, avs[i].name, avs[i].value); + i++; + + token = strtok_r(NULL, FEEDING_SEPARATORS, &lasts); + } + + if (glite_jobid_create(FEEDING_JOBID_BKSERVER, FEEDING_JOBID_PORT, &j) != 0) { + glite_jpis_stack_error(ctx->jpctx, errno, "can't create jobid"); + goto err; + } + if ((jobid = glite_jobid_unparse(j)) == NULL) { + glite_jobid_free(j); + glite_jpis_stack_error(ctx->jpctx, ENOMEM, "can't unparse jobid"); + goto err; + } + glite_jobid_free(j); + if (glite_jpis_lazyInsertJob(ctx, FEEDING_PRIMARY_STORAGE, jobid, FEEDING_OWNER)) goto err; + for (i = 0; i < nattrs && avs[i].name; i++) { + if (glite_jpis_insertAttrVal(ctx, jobid, &avs[i])) goto err; + } + free(jobid); jobid = NULL; + } + glite_jp_db_Commit(ctx->jpctx); + + fclose(f); + free(avs); + return 0; +err: + fclose(f); + free(avs); + free(jobid); + glite_jp_db_Rollback(ctx->jpctx); + return 1; +} diff --git a/org.glite.jp.index/src/db_ops.h b/org.glite.jp.index/src/db_ops.h index 19343d8..d95cda5 100644 --- a/org.glite.jp.index/src/db_ops.h +++ b/org.glite.jp.index/src/db_ops.h @@ -42,4 +42,6 @@ int glite_jpis_insertAttrVal(glite_jpis_context_t ctx, const char *jobid, glite_ int glite_jpis_lazyInsertJob(glite_jpis_context_t ctx, const char *ps, const char *jobid, const char *owner); +int glite_jpis_feeding(glite_jpis_context_t ctx, const char *fname); + #endif diff --git a/org.glite.jp.server-common/interface/db.h b/org.glite.jp.server-common/interface/db.h index 147719c..735b2b7 100644 --- a/org.glite.jp.server-common/interface/db.h +++ b/org.glite.jp.server-common/interface/db.h @@ -16,6 +16,9 @@ int glite_jp_db_FetchRow(glite_jp_context_t ctx, glite_lbu_Statement stmt, unsig int glite_jp_db_PrepareStmt(glite_jp_context_t ctx, const char *sql, glite_lbu_Statement *stmt); int glite_jp_db_ExecPreparedStmt(glite_jp_context_t ctx, glite_lbu_Statement stmt, int n,...); void glite_jp_db_FreeStmt(glite_lbu_Statement *stmt); +int glite_jp_db_Transaction(glite_jp_context_t ctx); +int glite_jp_db_Commit(glite_jp_context_t ctx); +int glite_jp_db_Rollback(glite_jp_context_t ctx); #ifdef __cplusplus } diff --git a/org.glite.jp.server-common/src/db.c b/org.glite.jp.server-common/src/db.c index 80d02a3..57826b6 100644 --- a/org.glite.jp.server-common/src/db.c +++ b/org.glite.jp.server-common/src/db.c @@ -60,7 +60,7 @@ int glite_jp_db_PrepareStmt(glite_jp_context_t ctx, const char *sql, glite_lbu_S ret = glite_lbu_PrepareStmt(ctx->dbhandle, sql, stmt); if (ret < 0) glite_jp_db_SetError(ctx, __FUNCTION__); - return glite_jp_db_SetError(ctx, __FUNCTION__); + return ret; } @@ -71,6 +71,7 @@ int glite_jp_db_ExecPreparedStmt(glite_jp_context_t ctx, glite_lbu_Statement stm va_start(ap, n); ret = glite_lbu_ExecPreparedStmt_v(stmt, n, ap); va_end(ap); + if (ret < 0) glite_jp_db_SetError(ctx, __FUNCTION__); return ret; } @@ -78,3 +79,33 @@ int glite_jp_db_ExecPreparedStmt(glite_jp_context_t ctx, glite_lbu_Statement stm void glite_jp_db_FreeStmt(glite_lbu_Statement *stmt) { glite_lbu_FreeStmt(stmt); } + + +int glite_jp_db_Transaction(glite_jp_context_t ctx) { + int ret; + + ret = glite_lbu_Transaction(ctx->dbhandle); + if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__); + + return ret; +} + + +int glite_jp_db_Commit(glite_jp_context_t ctx) { + int ret; + + ret = glite_lbu_Commit(ctx->dbhandle); + if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__); + + return ret; +} + + +int glite_jp_db_Rollback(glite_jp_context_t ctx) { + int ret; + + ret = glite_lbu_Rollback(ctx->dbhandle); + if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__); + + return ret; +} -- 1.8.2.3