From 37f2be8bc68ed35fe3eb37bd4bfeece6613b3796 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Franti=C5=A1ek=20Dvo=C5=99=C3=A1k?= Date: Tue, 20 Jun 2006 15:12:53 +0000 Subject: [PATCH] Transaction patch for sharing. Default switched off so it shouldn't break anything. (with a tiny fix, and better error text on failed load). --- org.glite.lb.common/interface/context-int.h | 3 ++ org.glite.lb.server/config/glite-lb-dbsetup.sql | 22 ++++++------ org.glite.lb.server/interface/lbs_db.h | 3 ++ org.glite.lb.server/src/bkserverd.c | 12 +++++-- org.glite.lb.server/src/lbs_db.c | 45 ++++++++++++++++++++++++- org.glite.lb.server/src/load.c | 14 ++++++-- org.glite.lb.server/src/request.c | 8 +++++ org.glite.lb.server/src/store.c.T | 20 +++++++---- org.glite.lb.server/test/test_query_events.cpp | 3 ++ 9 files changed, 107 insertions(+), 23 deletions(-) diff --git a/org.glite.lb.common/interface/context-int.h b/org.glite.lb.common/interface/context-int.h index 84cbedf..91c13cc 100644 --- a/org.glite.lb.common/interface/context-int.h +++ b/org.glite.lb.common/interface/context-int.h @@ -137,6 +137,9 @@ struct _edg_wll_Context { edg_wlc_JobId p_jobid; edg_wll_SeqCode p_seqcode; int count_statistics; + + /* TODO: belongs to database part */ + int use_transactions; }; /* to be used internally: set, update and and clear the error information in diff --git a/org.glite.lb.server/config/glite-lb-dbsetup.sql b/org.glite.lb.server/config/glite-lb-dbsetup.sql index af04974..81266fe 100644 --- a/org.glite.lb.server/config/glite-lb-dbsetup.sql +++ b/org.glite.lb.server/config/glite-lb-dbsetup.sql @@ -7,7 +7,7 @@ create table jobs ( primary key (jobid), unique (dg_jobid), index (userid) -); +) engine=innodb; create table users ( userid char(32) binary not null, @@ -15,7 +15,7 @@ create table users ( primary key (userid), unique (cert_subj) -); +) engine=innodb; create table events ( jobid char(32) binary not null, @@ -35,7 +35,7 @@ create table events ( index (time_stamp), index (host), index (arrived) -); +) engine=innodb; create table short_fields ( jobid char(32) binary not null, @@ -44,7 +44,7 @@ create table short_fields ( value varchar(255) binary null, primary key (jobid,event,name) -); +) engine=innodb; create table long_fields ( jobid char(32) binary not null, @@ -53,7 +53,7 @@ create table long_fields ( value mediumblob null, primary key (jobid,event,name) -); +) engine=innodb; create table states ( jobid char(32) binary not null, @@ -66,7 +66,7 @@ create table states ( primary key (jobid), index (parent_job) -); +) engine=innodb; create table status_tags ( jobid char(32) binary not null, @@ -75,7 +75,7 @@ create table status_tags ( value varchar(255) binary null, primary key (jobid,seq,name) -); +) engine=innodb; create table server_state ( prefix varchar(100) not null, @@ -83,7 +83,7 @@ create table server_state ( value varchar(255) binary not null, primary key (prefix,name) -); +) engine=innodb; create table acls ( aclid char(32) binary not null, @@ -91,7 +91,7 @@ create table acls ( refcnt int not null, primary key (aclid) -); +) engine=innodb; create table notif_registrations ( notifid char(32) binary not null, @@ -101,7 +101,7 @@ create table notif_registrations ( conditions mediumblob not null, primary key (notifid) -); +) engine=innodb; create table notif_jobs ( notifid char(32) binary not null, @@ -109,4 +109,4 @@ create table notif_jobs ( primary key (notifid,jobid), index (jobid) -); +) engine=innodb; diff --git a/org.glite.lb.server/interface/lbs_db.h b/org.glite.lb.server/interface/lbs_db.h index 8b236f3..14dbad8 100644 --- a/org.glite.lb.server/interface/lbs_db.h +++ b/org.glite.lb.server/interface/lbs_db.h @@ -80,6 +80,9 @@ extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs); */ int edg_wll_DBCheckVersion(edg_wll_Context); +int edg_wll_Transaction(edg_wll_Context ctx); +int edg_wll_Commit(edg_wll_Context ctx); +int edg_wll_Rollback(edg_wll_Context ctx); #ifdef __cplusplus } diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index 243e395..de33b6b 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -177,10 +177,11 @@ static struct option opts[] = { #ifdef LB_PERF {"perf-sink", 1, NULL, 'K'}, #endif + {"transactions", 0, NULL, 'b'}, {NULL,0,NULL,0} }; -static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jz" +static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb" #ifdef GLITE_LB_SERVER_WITH_WS "w:" #endif @@ -193,6 +194,7 @@ static void usage(char *me) { fprintf(stderr,"usage: %s [option]\n" "\t-a, --address\t use this server address (may be faked for debugging)\n" + "\t-b, --transactions\t use transactions\n" "\t-k, --key\t private key file\n" "\t-c, --cert\t certificate file\n" "\t-C, --CAdir\t trusted certificates directory\n" @@ -223,6 +225,7 @@ static void usage(char *me) "\t--notif-il-fprefix\t file prefix for notifications\n" "\t--count-statistics=1\t count certain statistics on jobs\n" "\t =2\t ... and allow anonymous access\n" + "\t-t, --request-timeout\t request timeout for one client\n" "\t--silent\t don't print diagnostic, even if -d is on\n" #ifdef LB_PERF "\t--perf-sink\t where to sink events\n" @@ -308,7 +311,8 @@ int main(int argc, char *argv[]) edg_wll_GssStatus gss_code; struct timeval to; int request_timeout = REQUEST_TIMEOUT; - int silent = 0; + int silent = 0; + int transactions = 0; @@ -332,6 +336,7 @@ int main(int argc, char *argv[]) while ((opt = getopt_long(argc,argv,get_opt_string,opts,NULL)) != EOF) switch (opt) { case 'a': fake_host = strdup(optarg); break; + case 'b': transactions = 1; break; case 'c': server_cert = optarg; break; case 'k': server_key = optarg; break; case 'C': cadir = optarg; break; @@ -559,6 +564,7 @@ a.sin_addr.s_addr = INADDR_ANY; /* Just check the database and let it be. The slaves do the job. */ edg_wll_InitContext(&ctx); + ctx->use_transactions = transactions; wait_for_open(ctx, dbstring); if (edg_wll_DBCheckVersion(ctx)) @@ -1394,7 +1400,7 @@ static int read_roots(const char *file) nl = strchr(buf,'\n'); if (nl) *nl = 0; - super_users = realloc(super_users, (cnt+1) * sizeof super_users[0]); + super_users = realloc(super_users, (cnt+2) * sizeof super_users[0]); super_users[cnt] = strdup(buf); super_users[++cnt] = NULL; } diff --git a/org.glite.lb.server/src/lbs_db.c b/org.glite.lb.server/src/lbs_db.c index 04e5111..0aa72d2 100644 --- a/org.glite.lb.server/src/lbs_db.c +++ b/org.glite.lb.server/src/lbs_db.c @@ -64,6 +64,9 @@ edg_wll_ErrorCode edg_wll_DBConnect(edg_wll_Context ctx,char *cs) } free(buf); +#ifdef LBS_DB_PROFILE + fprintf(stderr, "[%d] use_transactions = %d\n", getpid(), ctx->use_transactions); +#endif return edg_wll_ResetError(ctx); } @@ -78,9 +81,10 @@ int edg_wll_ExecStmt(edg_wll_Context ctx,char *txt,edg_wll_Stmt *stmt) int err; int retry_nr = 0; int do_reconnect = 0; +#ifdef LBS_DB_PROFILE struct timeval start,end; int pid; -#ifdef LBS_DB_PROFILE + static struct timeval sum = { tv_sec: 0, tv_usec: 0 @@ -223,3 +227,42 @@ int edg_wll_DBCheckVersion(edg_wll_Context ctx) return edg_wll_ResetError(ctx); } + + +int edg_wll_Transaction(edg_wll_Context ctx) { + int err = 0; + + if (ctx->use_transactions) { + err = edg_wll_ExecStmt(ctx, "set autocommit=0", NULL); + if (!err) + return edg_wll_ExecStmt(ctx, "begin", NULL); + } + + return err; +} + + +int edg_wll_Commit(edg_wll_Context ctx) { + int err = 0; + + if (ctx->use_transactions) { + err = edg_wll_ExecStmt(ctx, "commit", NULL); + if (!err) + return edg_wll_ExecStmt(ctx, "set autocommit=1", NULL); + } + + return err; +} + + +int edg_wll_Rollback(edg_wll_Context ctx) { + int err = 0; + + if (ctx->use_transactions) { + err = edg_wll_ExecStmt(ctx, "rollback", NULL); + if (!err) + return edg_wll_ExecStmt(ctx, "set autocommit=1", NULL); + } + + return err; +} diff --git a/org.glite.lb.server/src/load.c b/org.glite.lb.server/src/load.c index 7c6f266..f1610c1 100644 --- a/org.glite.lb.server/src/load.c +++ b/org.glite.lb.server/src/load.c @@ -51,14 +51,19 @@ int edg_wll_LoadEvents(edg_wll_Context ctx,const edg_wll_LoadRequest *req,edg_wl if ( (fd = open(req->server_file, O_RDONLY)) == -1 ) return edg_wll_SetError(ctx, errno, "Server can not open the file"); + if (edg_wll_Transaction(ctx) != 0) + return edg_wll_Error(ctx, NULL, NULL); + memset(result,0,sizeof(*result)); i = 0; while ( 1 ) { /* Read one line */ - if ( (readret = read_line(&line, &maxsize, fd)) == -1 ) + if ( (readret = read_line(&line, &maxsize, fd)) == -1 ) { + edg_wll_Rollback(ctx); return edg_wll_SetError(ctx, errno, "reading dump file"); + } if ( readret == 0 ) break; @@ -84,11 +89,13 @@ int edg_wll_LoadEvents(edg_wll_Context ctx,const edg_wll_LoadRequest *req,edg_wl ctx->event_load = 1; if ( edg_wll_StoreEvent(ctx, event, NULL) ) { + char *errdesc; int len = strlen(line), total = 0, written; - fprintf(stderr, "Can't store event\n"); + edg_wll_Error(ctx, NULL, &errdesc); + fprintf(stderr, "Can't store event: %s\n", errdesc); if ( reject_fd == -1 ) { char *s, *s1; @@ -164,6 +171,9 @@ cycle_clean: if ( reject_fd != -1 ) close(reject_fd); + if (edg_wll_Commit(ctx) != 0) + return edg_wll_Error(ctx, NULL, NULL); + return edg_wll_Error(ctx,NULL,NULL); } diff --git a/org.glite.lb.server/src/request.c b/org.glite.lb.server/src/request.c index be1885a..55ce6df 100644 --- a/org.glite.lb.server/src/request.c +++ b/org.glite.lb.server/src/request.c @@ -9,6 +9,7 @@ #include "glite/lb/context-int.h" #include "store.h" +#include "lbs_db.h" #ifdef __GNUC__ #define UNUSED_VAR __attribute__((unused)) @@ -30,8 +31,15 @@ handle_request(edg_wll_Context ctx,char *buf) return EDG_WLL_IL_PROTO; } + if ((ret = edg_wll_Transaction(ctx) != 0)) goto err; ret = db_store(ctx, "NOT USED", event.data); + if (ret == 0) { + if ((ret = edg_wll_Commit(ctx)) != 0) goto err; + } else { + edg_wll_Rollback(ctx); + } +err: if(event.data) free(event.data); diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index 8403c9c..8258d26 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -111,12 +111,20 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq) next = max && *max ? atoi(max)+1 : 0; - /* store an UNDEF event first in order to prevent race condition - * with readers */ + /* + * 1) when using transactions: + * Store the whole event right now. + * + * 2) when not using transaction: + * Store an UNDEF event first in order to prevent race condition + * with readers and update event code later. + */ trio_asprintf(&stmt, "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid) " "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss')", - jobid,next,EDG_WLL_EVENT_UNDEF,ssrc,e->any.host, + jobid,next, + ctx->use_transactions ? (int) e->any.type : EDG_WLL_EVENT_UNDEF, + ssrc,e->any.host, edg_wll_TimeToDB(e->any.timestamp.tv_sec),e->any.timestamp.tv_usec, now_s, e->any.level,userid); @@ -139,8 +147,8 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq) edg_wll_delete_event(ctx,jobid,next); edg_wll_SetError(ctx,oerr,desc); free(desc); - } - else { + } else + if (!ctx->use_transactions) { /* emulate commit, i.e. swith to a real event type to make * the record valid */ trio_asprintf(&stmt, @@ -184,7 +192,7 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq) "more event records, what is that?"); break; } - } + } /* if !ctx->use_transactions */ if (err == 0 && e->any.type == EDG_WLL_EVENT_REGJOB && diff --git a/org.glite.lb.server/test/test_query_events.cpp b/org.glite.lb.server/test/test_query_events.cpp index 4277872..8234f99 100644 --- a/org.glite.lb.server/test/test_query_events.cpp +++ b/org.glite.lb.server/test/test_query_events.cpp @@ -135,6 +135,9 @@ edg_wll_ErrorCode edg_wll_DBConnect(edg_wll_Context ctx, char*str) { return (edg_wll_ErrorCode)0; } +int edg_wll_Transaction(edg_wll_Context ctx) { return 0; } +int edg_wll_Commit(edg_wll_Context ctx) { return 0; } +int edg_wll_Rollback(edg_wll_Context ctx) { return 0; } -- 1.8.2.3