DB engine switched to InnoDB, enabled transactions for feeding.
Transaction functions wrappers in JP + one bugfix in error propagation.
unique (dg_jobid),
index (jobid),
index (dg_jobid)
-);
+) engine=innodb;
create table attrs (
`attrid` char(32) binary not null,
primary key (attrid),
index (attrid),
index (name)
-);
+) engine=innodb;
create table feeds (
`uniqueid` int auto_increment not null,
index (uniqueid),
index (feedid),
index (state)
-);
+) engine=innodb;
create table acls (
`aclid` char(32) binary not null,
`refcnt` int not null,
primary key (aclid)
-);
+) engine=innodb;
create table users (
`userid` char(32) binary not null,
primary key (userid),
unique (cert_subj)
-);
+) engine=innodb;
# data tables - created one for each configured attribute, index on
#
# index (jobid),
# index (value)
-#);
+#) engine=innodb;
-<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook V4.1//EN">
+<!DOCTYPE refentry PUBLIC "-//OASIS//DTD DocBook V4.1//EN" "http://docbook.org/xml/4.1/docbookx.dtd">
<refentry id='glitejpindexd'>
<arg>--delete-db</arg>
</group></arg>
+ <arg><group choice='plain'>
+ <arg>-f</arg>
+ <arg>--feeding</arg>
+ </group> <replaceable>FILE</replaceable> </arg>
+
</cmdsynopsis>
</refsynopsisdiv>
</para></listitem>
</varlistentry>
+ <varlistentry>
+ <term><option>-f</option>|<option>--feeding</option></term>
+ <listitem><para>
+Feed the index server from the local file. Format of the file is job per line with attribute values using ";" separator. Attributes go in order according to server configuration file (possible attributes with jobid and owner in configuration file are ignored).
+ </para></listitem>
+ </varlistentry>
+
</variablelist>
</refsect1>
edg_wll_GssStatus gss_code;
struct sockaddr_in a;
glite_jpis_context_t isctx;
-
+ int retval = 0;
glite_jp_init_context(&ctx);
if (conf->delete_db) {
if (glite_jpis_dropDatabase(isctx) != 0) {
fprintf(stderr, "Drop DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source);
- glite_jpis_free_db(isctx);
- glite_jpis_free_context(isctx);
- glite_jp_free_context(ctx);
- glite_jp_free_conf(conf);
- return 1;
+ retval = 1;
+ goto quit;
}
}
if (glite_jpis_initDatabase(isctx) != 0) {
fprintf(stderr, "Init DB failed: %s (%s)\n", ctx->error->desc, ctx->error->source);
- glite_jpis_free_db(isctx);
- glite_jpis_free_context(isctx);
- glite_jp_free_context(ctx);
- glite_jp_free_conf(conf);
- return 1;
+ retval = 1;
+ goto quit;
+ }
+
+ if (conf->feeding) {
+ char *err;
+
+ fprintf(stderr, "%s: Feeding from '%s'\n", argv[0], conf->feeding);
+ retval = glite_jpis_feeding(isctx, conf->feeding);
+ if (retval) {
+ err = glite_jp_error_chain(isctx->jpctx);
+ fprintf(stderr, "%s: %s\n", argv[0], err);
+ free(err);
+ }
+ goto quit;
}
stab.conn = socket(PF_INET, SOCK_STREAM, 0);
#endif
glite_srvbones_run(data_init,&stab,1 /* XXX: entries in stab */,debug);
+quit:
glite_jpis_free_db(isctx);
glite_jp_free_conf(conf);
glite_jpis_free_context(isctx);
glite_jp_free_context(ctx);
- return 0;
+ return retval;
}
feedid = NULL;
for (initialized = 0; initialized <= 1; initialized++) {
switch (glite_jpis_lockSearchFeed(isctx,initialized,&uniqueid,&PS_URL,&status,&feedid)) {
- case 0:
+ case 0:
+ // some locked feeds found
ok = 0;
for (i = 0; i < 10; i++) {
if (!initialized) {
extern SOAP_NMAC struct Namespace jp__namespaces[];
-static const char *get_opt_string = "dq:c:k:C:V:nm:p:i:o:x:s:D";
+static const char *get_opt_string = "dq:c:k:C:V:nm:p:i:o:x:s:Df:";
static struct option opts[] = {
{"debug", 0, NULL, 'd'},
{"config", 1, NULL, 'x'},
{"slaves", 1, NULL, 's'},
{"delete-db", 0, NULL, 'D'},
+ {"feeding", 1, NULL, 'f'},
{NULL, 0, NULL, 0}
};
"\t-o, --logfile\t file to store logs\n"
"\t-x, --config\t file with server configuration\n"
"\t-s, --slaves\t number of slaves for responses\n"
- "\t-D, --delete-db\t delete and restore data in the database"
+ "\t-D, --delete-db\t delete and restore data in the database\n"
+ "\t-f, --feeding\t feed index server from local file\n"
"\n"
,me);
}
case 'x': conf_file = optarg; break;
case 's': conf->slaves = atoi(optarg); if (conf->slaves > 0) break;
case 'D': conf->delete_db = 1; break;
+ case 'f': conf->feeding = optarg; break;
default : usage(argv[0]); exit(0); break;
}
*server_key;
int slaves;
int delete_db;
+
+ char *feeding; // feed DB from local file
} glite_jp_is_conf;
#include <assert.h>
#include <stdio.h>
#include <ctype.h>
+#include <string.h>
#include <glite/lbu/trio.h>
#include <glite/jobid/strmd5.h>
+#include <glite/jobid/cjobid.h>
#include <glite/jp/types.h>
#include <glite/jp/context.h>
#include <glite/jp/db.h>
#include <glite/jp/attr.h>
+#include "glite/jp/known_attr.h"
#include "conf.h"
#include "context.h"
\n\
INDEX (jobid),\n\
INDEX (value)\n\
-);"
+) ENGINE=innodb;"
#define SQLCMD_INSERT_ATTRVAL "INSERT INTO " TABLE_PREFIX_DATA "%|Ss (jobid, value, full_value, origin) VALUES (\n\
'%|Ss',\n\
'%|Ss',\n\
free(num);
glite_jp_db_FreeStmt(&stmt);
if (nattrs != 0) {
- lprintf("database with %d attributes keeped (use -D for delete)\n");
+ lprintf("database with %d attributes kept (use -D for delete)\n", nattrs);
return 0;
}
int glite_jpis_init_db(glite_jpis_context_t isctx) {
- int ret;
+ int ret, caps;
const char *cs;
glite_jp_context_t jpctx;
if ((cs = isctx->conf->cs) == NULL) cs = GLITE_JP_IS_DEFAULTCS;
if (glite_lbu_DBConnect(jpctx->dbhandle, cs) != 0) goto fail_db;
+ // try transaction for feeding
+ if (isctx->conf->feeding) {
+ caps = glite_lbu_DBQueryCaps(jpctx->dbhandle);
+ if (caps != -1) {
+ glite_lbu_DBSetCaps(jpctx->dbhandle, caps);
+ llprintf(LOG_SQL, "transactions %s\n", (caps & GLITE_LBU_DB_CAP_TRANSACTIONS) ? "supported" : "not supported");
+ }
+ }
+
// sql command: lock the feed (via uniqueid)
if ((ret = glite_jp_db_PrepareStmt(jpctx, "UPDATE feeds SET locked=1 WHERE (locked = 0) AND (uniqueid = ?)", &isctx->lock_feed_stmt)) != 0) goto fail;
free(value);
free(full_value);
llprintf(LOG_SQL, "(%s) sql=%s\n", av->name, sql);
+// if (ctx->conf->feeding) printf("FEED: %s\n", sql);
+// else
if (glite_jp_db_ExecSQL(ctx->jpctx, sql, NULL) != 1) {
free(sql);
return ctx->jpctx->error->code;
free(md5_cert);
return ctx->jpctx->error->code;
}
+
+
+#define FEEDING_SEPARATORS ";"
+#define FEEDING_JOBID_BKSERVER "localhost-test"
+#define FEEDING_JOBID_PORT 0
+#define FEEDING_PRIMARY_STORAGE "localhost:8901"
+#define FEEDING_OWNER "God"
+int glite_jpis_feeding(glite_jpis_context_t ctx, const char *fname) {
+ FILE *f;
+ char line[1024], *token, *lasts, *jobid = NULL;
+ int nattrs, lno, i, iname, c;
+ glite_jp_attrval_t *avs;
+ glite_jobid_t j;
+
+ if ((f = fopen(fname, "rt")) == NULL) {
+ glite_jpis_stack_error(ctx->jpctx, errno, "can't open csv dump file");
+ return 1;
+ }
+
+ for (nattrs = 0; ctx->conf->attrs[nattrs]; nattrs++);
+ avs = malloc(nattrs * sizeof avs[0]);
+
+ lno = 0;
+ while(fgets(line, sizeof line, f) != NULL) {
+ if ((lno % 100) == 0) {
+ if (lno) glite_jp_db_Commit(ctx->jpctx);
+ glite_jp_db_Transaction(ctx->jpctx);
+ }
+ lno++;
+ if (line[0]) {
+ c = strlen(line) - 1;
+ if (line[c] != '\r' && line[c] != '\n' && !feof(f)) {
+ glite_jpis_stack_error(ctx->jpctx, E2BIG, "line too large at %d (max. %d)", lno, sizeof line);
+ goto err;
+ }
+ while (c >= 0 && (line[c] == '\r' || line[c] == '\n')) c--;
+ line[c + 1] = 0;
+ }
+// printf("'%s'\n", line);
+
+ memset(avs, 0, nattrs * sizeof avs[0]);
+ i = 0;
+ iname = 0;
+ token = strtok_r(line, FEEDING_SEPARATORS, &lasts);
+ while (token && iname < nattrs) {
+// printf("\t'%s'\n", token);
+ do {
+ avs[i].name = ctx->conf->attrs[iname];
+ iname++;
+ } while (strcasecmp(avs[i].name, GLITE_JP_ATTR_JOBID) == 0 || strcasecmp(avs[i].name, GLITE_JP_ATTR_OWNER) == 0);
+ avs[i].value = token;
+ avs[i].timestamp = time(NULL);
+ fprintf(stderr, "\t %d: %s = '%s'\n", i, avs[i].name, avs[i].value);
+ i++;
+
+ token = strtok_r(NULL, FEEDING_SEPARATORS, &lasts);
+ }
+
+ if (glite_jobid_create(FEEDING_JOBID_BKSERVER, FEEDING_JOBID_PORT, &j) != 0) {
+ glite_jpis_stack_error(ctx->jpctx, errno, "can't create jobid");
+ goto err;
+ }
+ if ((jobid = glite_jobid_unparse(j)) == NULL) {
+ glite_jobid_free(j);
+ glite_jpis_stack_error(ctx->jpctx, ENOMEM, "can't unparse jobid");
+ goto err;
+ }
+ glite_jobid_free(j);
+ if (glite_jpis_lazyInsertJob(ctx, FEEDING_PRIMARY_STORAGE, jobid, FEEDING_OWNER)) goto err;
+ for (i = 0; i < nattrs && avs[i].name; i++) {
+ if (glite_jpis_insertAttrVal(ctx, jobid, &avs[i])) goto err;
+ }
+ free(jobid); jobid = NULL;
+ }
+ glite_jp_db_Commit(ctx->jpctx);
+
+ fclose(f);
+ free(avs);
+ return 0;
+err:
+ fclose(f);
+ free(avs);
+ free(jobid);
+ glite_jp_db_Rollback(ctx->jpctx);
+ return 1;
+}
int glite_jpis_lazyInsertJob(glite_jpis_context_t ctx, const char *ps, const char *jobid, const char *owner);
+int glite_jpis_feeding(glite_jpis_context_t ctx, const char *fname);
+
#endif
int glite_jp_db_PrepareStmt(glite_jp_context_t ctx, const char *sql, glite_lbu_Statement *stmt);
int glite_jp_db_ExecPreparedStmt(glite_jp_context_t ctx, glite_lbu_Statement stmt, int n,...);
void glite_jp_db_FreeStmt(glite_lbu_Statement *stmt);
+int glite_jp_db_Transaction(glite_jp_context_t ctx);
+int glite_jp_db_Commit(glite_jp_context_t ctx);
+int glite_jp_db_Rollback(glite_jp_context_t ctx);
#ifdef __cplusplus
}
ret = glite_lbu_PrepareStmt(ctx->dbhandle, sql, stmt);
if (ret < 0) glite_jp_db_SetError(ctx, __FUNCTION__);
- return glite_jp_db_SetError(ctx, __FUNCTION__);
+ return ret;
}
va_start(ap, n);
ret = glite_lbu_ExecPreparedStmt_v(stmt, n, ap);
va_end(ap);
+ if (ret < 0) glite_jp_db_SetError(ctx, __FUNCTION__);
return ret;
}
void glite_jp_db_FreeStmt(glite_lbu_Statement *stmt) {
glite_lbu_FreeStmt(stmt);
}
+
+
+int glite_jp_db_Transaction(glite_jp_context_t ctx) {
+ int ret;
+
+ ret = glite_lbu_Transaction(ctx->dbhandle);
+ if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__);
+
+ return ret;
+}
+
+
+int glite_jp_db_Commit(glite_jp_context_t ctx) {
+ int ret;
+
+ ret = glite_lbu_Commit(ctx->dbhandle);
+ if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__);
+
+ return ret;
+}
+
+
+int glite_jp_db_Rollback(glite_jp_context_t ctx) {
+ int ret;
+
+ ret = glite_lbu_Rollback(ctx->dbhandle);
+ if (ret != 0) glite_jp_db_SetError(ctx, __FUNCTION__);
+
+ return ret;
+}