(with a tiny fix, and better error text on failed load).
edg_wlc_JobId p_jobid;
edg_wll_SeqCode p_seqcode;
int count_statistics;
+
+ /* TODO: belongs to database part */
+ int use_transactions;
};
/* to be used internally: set, update and and clear the error information in
primary key (jobid),
unique (dg_jobid),
index (userid)
-);
+) engine=innodb;
create table users (
userid char(32) binary not null,
primary key (userid),
unique (cert_subj)
-);
+) engine=innodb;
create table events (
jobid char(32) binary not null,
index (time_stamp),
index (host),
index (arrived)
-);
+) engine=innodb;
create table short_fields (
jobid char(32) binary not null,
value varchar(255) binary null,
primary key (jobid,event,name)
-);
+) engine=innodb;
create table long_fields (
jobid char(32) binary not null,
value mediumblob null,
primary key (jobid,event,name)
-);
+) engine=innodb;
create table states (
jobid char(32) binary not null,
primary key (jobid),
index (parent_job)
-);
+) engine=innodb;
create table status_tags (
jobid char(32) binary not null,
value varchar(255) binary null,
primary key (jobid,seq,name)
-);
+) engine=innodb;
create table server_state (
prefix varchar(100) not null,
value varchar(255) binary not null,
primary key (prefix,name)
-);
+) engine=innodb;
create table acls (
aclid char(32) binary not null,
refcnt int not null,
primary key (aclid)
-);
+) engine=innodb;
create table notif_registrations (
notifid char(32) binary not null,
conditions mediumblob not null,
primary key (notifid)
-);
+) engine=innodb;
create table notif_jobs (
notifid char(32) binary not null,
primary key (notifid,jobid),
index (jobid)
-);
+) engine=innodb;
*/
int edg_wll_DBCheckVersion(edg_wll_Context);
+int edg_wll_Transaction(edg_wll_Context ctx);
+int edg_wll_Commit(edg_wll_Context ctx);
+int edg_wll_Rollback(edg_wll_Context ctx);
#ifdef __cplusplus
}
#ifdef LB_PERF
{"perf-sink", 1, NULL, 'K'},
#endif
+ {"transactions", 0, NULL, 'b'},
{NULL,0,NULL,0}
};
-static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jz"
+static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb"
#ifdef GLITE_LB_SERVER_WITH_WS
"w:"
#endif
{
fprintf(stderr,"usage: %s [option]\n"
"\t-a, --address\t use this server address (may be faked for debugging)\n"
+ "\t-b, --transactions\t use transactions\n"
"\t-k, --key\t private key file\n"
"\t-c, --cert\t certificate file\n"
"\t-C, --CAdir\t trusted certificates directory\n"
"\t--notif-il-fprefix\t file prefix for notifications\n"
"\t--count-statistics=1\t count certain statistics on jobs\n"
"\t =2\t ... and allow anonymous access\n"
+ "\t-t, --request-timeout\t request timeout for one client\n"
"\t--silent\t don't print diagnostic, even if -d is on\n"
#ifdef LB_PERF
"\t--perf-sink\t where to sink events\n"
edg_wll_GssStatus gss_code;
struct timeval to;
int request_timeout = REQUEST_TIMEOUT;
- int silent = 0;
+ int silent = 0;
+ int transactions = 0;
while ((opt = getopt_long(argc,argv,get_opt_string,opts,NULL)) != EOF) switch (opt) {
case 'a': fake_host = strdup(optarg); break;
+ case 'b': transactions = 1; break;
case 'c': server_cert = optarg; break;
case 'k': server_key = optarg; break;
case 'C': cadir = optarg; break;
/* Just check the database and let it be. The slaves do the job. */
edg_wll_InitContext(&ctx);
+ ctx->use_transactions = transactions;
wait_for_open(ctx, dbstring);
if (edg_wll_DBCheckVersion(ctx))
nl = strchr(buf,'\n');
if (nl) *nl = 0;
- super_users = realloc(super_users, (cnt+1) * sizeof super_users[0]);
+ super_users = realloc(super_users, (cnt+2) * sizeof super_users[0]);
super_users[cnt] = strdup(buf);
super_users[++cnt] = NULL;
}
}
free(buf);
+#ifdef LBS_DB_PROFILE
+ fprintf(stderr, "[%d] use_transactions = %d\n", getpid(), ctx->use_transactions);
+#endif
return edg_wll_ResetError(ctx);
}
int err;
int retry_nr = 0;
int do_reconnect = 0;
+#ifdef LBS_DB_PROFILE
struct timeval start,end;
int pid;
-#ifdef LBS_DB_PROFILE
+
static struct timeval sum = {
tv_sec: 0,
tv_usec: 0
return edg_wll_ResetError(ctx);
}
+
+
+int edg_wll_Transaction(edg_wll_Context ctx) {
+ int err = 0;
+
+ if (ctx->use_transactions) {
+ err = edg_wll_ExecStmt(ctx, "set autocommit=0", NULL);
+ if (!err)
+ return edg_wll_ExecStmt(ctx, "begin", NULL);
+ }
+
+ return err;
+}
+
+
+int edg_wll_Commit(edg_wll_Context ctx) {
+ int err = 0;
+
+ if (ctx->use_transactions) {
+ err = edg_wll_ExecStmt(ctx, "commit", NULL);
+ if (!err)
+ return edg_wll_ExecStmt(ctx, "set autocommit=1", NULL);
+ }
+
+ return err;
+}
+
+
+int edg_wll_Rollback(edg_wll_Context ctx) {
+ int err = 0;
+
+ if (ctx->use_transactions) {
+ err = edg_wll_ExecStmt(ctx, "rollback", NULL);
+ if (!err)
+ return edg_wll_ExecStmt(ctx, "set autocommit=1", NULL);
+ }
+
+ return err;
+}
if ( (fd = open(req->server_file, O_RDONLY)) == -1 )
return edg_wll_SetError(ctx, errno, "Server can not open the file");
+ if (edg_wll_Transaction(ctx) != 0)
+ return edg_wll_Error(ctx, NULL, NULL);
+
memset(result,0,sizeof(*result));
i = 0;
while ( 1 )
{
/* Read one line
*/
- if ( (readret = read_line(&line, &maxsize, fd)) == -1 )
+ if ( (readret = read_line(&line, &maxsize, fd)) == -1 ) {
+ edg_wll_Rollback(ctx);
return edg_wll_SetError(ctx, errno, "reading dump file");
+ }
if ( readret == 0 )
break;
ctx->event_load = 1;
if ( edg_wll_StoreEvent(ctx, event, NULL) )
{
+ char *errdesc;
int len = strlen(line),
total = 0,
written;
- fprintf(stderr, "Can't store event\n");
+ edg_wll_Error(ctx, NULL, &errdesc);
+ fprintf(stderr, "Can't store event: %s\n", errdesc);
if ( reject_fd == -1 )
{
char *s, *s1;
if ( reject_fd != -1 )
close(reject_fd);
+ if (edg_wll_Commit(ctx) != 0)
+ return edg_wll_Error(ctx, NULL, NULL);
+
return edg_wll_Error(ctx,NULL,NULL);
}
#include "glite/lb/context-int.h"
#include "store.h"
+#include "lbs_db.h"
#ifdef __GNUC__
#define UNUSED_VAR __attribute__((unused))
return EDG_WLL_IL_PROTO;
}
+ if ((ret = edg_wll_Transaction(ctx) != 0)) goto err;
ret = db_store(ctx, "NOT USED", event.data);
+ if (ret == 0) {
+ if ((ret = edg_wll_Commit(ctx)) != 0) goto err;
+ } else {
+ edg_wll_Rollback(ctx);
+ }
+err:
if(event.data)
free(event.data);
next = max && *max ? atoi(max)+1 : 0;
- /* store an UNDEF event first in order to prevent race condition
- * with readers */
+ /*
+ * 1) when using transactions:
+ * Store the whole event right now.
+ *
+ * 2) when not using transaction:
+ * Store an UNDEF event first in order to prevent race condition
+ * with readers and update event code later.
+ */
trio_asprintf(&stmt,
"insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid) "
"values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss')",
- jobid,next,EDG_WLL_EVENT_UNDEF,ssrc,e->any.host,
+ jobid,next,
+ ctx->use_transactions ? (int) e->any.type : EDG_WLL_EVENT_UNDEF,
+ ssrc,e->any.host,
edg_wll_TimeToDB(e->any.timestamp.tv_sec),e->any.timestamp.tv_usec,
now_s, e->any.level,userid);
edg_wll_delete_event(ctx,jobid,next);
edg_wll_SetError(ctx,oerr,desc);
free(desc);
- }
- else {
+ } else
+ if (!ctx->use_transactions) {
/* emulate commit, i.e. swith to a real event type to make
* the record valid */
trio_asprintf(&stmt,
"more event records, what is that?");
break;
}
- }
+ } /* if !ctx->use_transactions */
if (err == 0 &&
e->any.type == EDG_WLL_EVENT_REGJOB &&
return (edg_wll_ErrorCode)0;
}
+int edg_wll_Transaction(edg_wll_Context ctx) { return 0; }
+int edg_wll_Commit(edg_wll_Context ctx) { return 0; }
+int edg_wll_Rollback(edg_wll_Context ctx) { return 0; }