Transaction patch for sharing. Default switched off so it shouldn't break anything.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 20 Jun 2006 15:12:53 +0000 (15:12 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Tue, 20 Jun 2006 15:12:53 +0000 (15:12 +0000)
(with a tiny fix, and better error text on failed load).

org.glite.lb.common/interface/context-int.h
org.glite.lb.server/config/glite-lb-dbsetup.sql
org.glite.lb.server/interface/lbs_db.h
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/lbs_db.c
org.glite.lb.server/src/load.c
org.glite.lb.server/src/request.c
org.glite.lb.server/src/store.c.T
org.glite.lb.server/test/test_query_events.cpp

index 84cbedf..91c13cc 100644 (file)
@@ -137,6 +137,9 @@ struct _edg_wll_Context {
        edg_wlc_JobId   p_jobid;
        edg_wll_SeqCode p_seqcode;
        int             count_statistics;
+
+       /* TODO: belongs to database part */
+       int use_transactions;
 };
 
 /* to be used internally: set, update and and clear the error information in 
index af04974..81266fe 100644 (file)
@@ -7,7 +7,7 @@ create table jobs (
        primary key (jobid),
        unique (dg_jobid),
        index (userid)
-);
+) engine=innodb;
 
 create table users (
        userid          char(32)        binary not null,
@@ -15,7 +15,7 @@ create table users (
 
        primary key (userid),
        unique (cert_subj)
-);
+) engine=innodb;
 
 create table events (
        jobid           char(32)        binary not null,
@@ -35,7 +35,7 @@ create table events (
        index (time_stamp),
        index (host),
        index (arrived)
-);
+) engine=innodb;
 
 create table short_fields (
        jobid           char(32)        binary not null,
@@ -44,7 +44,7 @@ create table short_fields (
        value           varchar(255)    binary null,
 
        primary key (jobid,event,name)
-);
+) engine=innodb;
 
 create table long_fields (
        jobid           char(32)        binary not null,
@@ -53,7 +53,7 @@ create table long_fields (
        value           mediumblob      null,
 
        primary key (jobid,event,name)
-);
+) engine=innodb;
 
 create table states (
        jobid           char(32)        binary not null,
@@ -66,7 +66,7 @@ create table states (
        primary key (jobid),
        index (parent_job)
        
-);
+) engine=innodb;
 
 create table status_tags (
        jobid           char(32)        binary not null,
@@ -75,7 +75,7 @@ create table status_tags (
        value           varchar(255)    binary null,
 
        primary key (jobid,seq,name)
-);
+) engine=innodb;
 
 create table server_state (
        prefix          varchar(100)    not null,
@@ -83,7 +83,7 @@ create table server_state (
        value           varchar(255)    binary not null,
 
        primary key (prefix,name)
-);
+) engine=innodb;
 
 create table acls (
        aclid           char(32)        binary not null,
@@ -91,7 +91,7 @@ create table acls (
        refcnt          int             not null,
 
        primary key (aclid)
-);
+) engine=innodb;
 
 create table notif_registrations (
        notifid         char(32)        binary not null,
@@ -101,7 +101,7 @@ create table notif_registrations (
        conditions      mediumblob      not null,
 
        primary key (notifid)
-);
+) engine=innodb;
 
 create table notif_jobs (
        notifid         char(32)        binary not null,
@@ -109,4 +109,4 @@ create table notif_jobs (
 
        primary key (notifid,jobid),
        index (jobid)
-);
+) engine=innodb;
index 8b236f3..14dbad8 100644 (file)
@@ -80,6 +80,9 @@ extern edg_wll_ErrorCode edg_wll_Open(edg_wll_Context ctx, char *cs);
  */
 int edg_wll_DBCheckVersion(edg_wll_Context);
 
+int edg_wll_Transaction(edg_wll_Context ctx);
+int edg_wll_Commit(edg_wll_Context ctx);
+int edg_wll_Rollback(edg_wll_Context ctx);
 
 #ifdef __cplusplus
 }
index 243e395..de33b6b 100644 (file)
@@ -177,10 +177,11 @@ static struct option opts[] = {
 #ifdef LB_PERF
        {"perf-sink",           1, NULL,        'K'},
 #endif
+       {"transactions",        0,      NULL,   'b'},
        {NULL,0,NULL,0}
 };
 
-static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jz"
+static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb"
 #ifdef GLITE_LB_SERVER_WITH_WS
        "w:"
 #endif
@@ -193,6 +194,7 @@ static void usage(char *me)
 {
        fprintf(stderr,"usage: %s [option]\n"
                "\t-a, --address\t use this server address (may be faked for debugging)\n"
+               "\t-b, --transactions\t use transactions\n"
                "\t-k, --key\t private key file\n"
                "\t-c, --cert\t certificate file\n"
                "\t-C, --CAdir\t trusted certificates directory\n"
@@ -223,6 +225,7 @@ static void usage(char *me)
                "\t--notif-il-fprefix\t file prefix for notifications\n"
                "\t--count-statistics=1\t count certain statistics on jobs\n"
                "\t                  =2\t ... and allow anonymous access\n"
+               "\t-t, --request-timeout\t request timeout for one client\n"
                "\t--silent\t don't print diagnostic, even if -d is on\n"
 #ifdef LB_PERF
                "\t--perf-sink\t where to sink events\n"
@@ -308,7 +311,8 @@ int main(int argc, char *argv[])
        edg_wll_GssStatus       gss_code;
        struct timeval          to;
        int                     request_timeout = REQUEST_TIMEOUT;
-       int     silent = 0;
+       int                     silent = 0;
+       int                     transactions = 0;
 
 
 
@@ -332,6 +336,7 @@ int main(int argc, char *argv[])
 
        while ((opt = getopt_long(argc,argv,get_opt_string,opts,NULL)) != EOF) switch (opt) {
                case 'a': fake_host = strdup(optarg); break;
+               case 'b': transactions = 1; break;
                case 'c': server_cert = optarg; break;
                case 'k': server_key = optarg; break;
                case 'C': cadir = optarg; break;
@@ -559,6 +564,7 @@ a.sin_addr.s_addr = INADDR_ANY;
 
        /* Just check the database and let it be. The slaves do the job. */
        edg_wll_InitContext(&ctx);
+       ctx->use_transactions = transactions;
        wait_for_open(ctx, dbstring);
 
        if (edg_wll_DBCheckVersion(ctx))
@@ -1394,7 +1400,7 @@ static int read_roots(const char *file)
                nl = strchr(buf,'\n');
                if (nl) *nl = 0;
 
-               super_users = realloc(super_users, (cnt+1) * sizeof super_users[0]);
+               super_users = realloc(super_users, (cnt+2) * sizeof super_users[0]);
                super_users[cnt] = strdup(buf);
                super_users[++cnt] = NULL;
        }
index 04e5111..0aa72d2 100644 (file)
@@ -64,6 +64,9 @@ edg_wll_ErrorCode edg_wll_DBConnect(edg_wll_Context ctx,char *cs)
        }
 
        free(buf);
+#ifdef LBS_DB_PROFILE
+       fprintf(stderr, "[%d] use_transactions = %d\n", getpid(), ctx->use_transactions);
+#endif
        return edg_wll_ResetError(ctx);
 }
 
@@ -78,9 +81,10 @@ int edg_wll_ExecStmt(edg_wll_Context ctx,char *txt,edg_wll_Stmt *stmt)
        int     err;
        int     retry_nr = 0;
        int     do_reconnect = 0;
+#ifdef LBS_DB_PROFILE
        struct timeval  start,end;
        int     pid;
-#ifdef LBS_DB_PROFILE
+
        static struct timeval sum = {
                tv_sec: 0,
                tv_usec: 0
@@ -223,3 +227,42 @@ int edg_wll_DBCheckVersion(edg_wll_Context ctx)
 
        return edg_wll_ResetError(ctx);
 }
+
+
+int edg_wll_Transaction(edg_wll_Context ctx) {
+       int err = 0;
+
+       if (ctx->use_transactions) {
+               err = edg_wll_ExecStmt(ctx, "set autocommit=0", NULL);
+               if (!err)
+                       return edg_wll_ExecStmt(ctx, "begin", NULL);
+       }
+
+       return err;
+}
+
+
+int edg_wll_Commit(edg_wll_Context ctx) {
+       int err = 0;
+
+       if (ctx->use_transactions) {
+               err = edg_wll_ExecStmt(ctx, "commit", NULL);
+               if (!err)
+                       return edg_wll_ExecStmt(ctx, "set autocommit=1", NULL);
+       }
+
+       return err;
+}
+
+
+int edg_wll_Rollback(edg_wll_Context ctx) {
+       int err = 0;
+
+       if (ctx->use_transactions) { 
+               err = edg_wll_ExecStmt(ctx, "rollback", NULL);
+               if (!err)
+                       return edg_wll_ExecStmt(ctx, "set autocommit=1", NULL);
+       }
+
+       return err;
+}
index 7c6f266..f1610c1 100644 (file)
@@ -51,14 +51,19 @@ int edg_wll_LoadEvents(edg_wll_Context ctx,const edg_wll_LoadRequest *req,edg_wl
        if ( (fd = open(req->server_file, O_RDONLY)) == -1 )
                return edg_wll_SetError(ctx, errno, "Server can not open the file");
 
+       if (edg_wll_Transaction(ctx) != 0) 
+               return edg_wll_Error(ctx, NULL, NULL);
+
        memset(result,0,sizeof(*result));
        i = 0;
        while ( 1 )
        {
                /*      Read one line
                 */
-               if ( (readret = read_line(&line, &maxsize, fd)) == -1 )
+               if ( (readret = read_line(&line, &maxsize, fd)) == -1 ) {
+                       edg_wll_Rollback(ctx);
                        return edg_wll_SetError(ctx, errno, "reading dump file");
+               }
 
                if ( readret == 0 )
                        break;
@@ -84,11 +89,13 @@ int edg_wll_LoadEvents(edg_wll_Context ctx,const edg_wll_LoadRequest *req,edg_wl
                ctx->event_load = 1;
                if ( edg_wll_StoreEvent(ctx, event, NULL) )
                {
+                       char            *errdesc;
                        int             len = strlen(line),
                                        total = 0,
                                        written;
 
-                       fprintf(stderr, "Can't store event\n");
+                       edg_wll_Error(ctx, NULL, &errdesc);
+                       fprintf(stderr, "Can't store event: %s\n", errdesc);
                        if ( reject_fd == -1 )
                        {
                                char   *s, *s1;
@@ -164,6 +171,9 @@ cycle_clean:
        if ( reject_fd != -1 )
                close(reject_fd);
 
+       if (edg_wll_Commit(ctx) != 0)
+               return edg_wll_Error(ctx, NULL, NULL);
+
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
index be1885a..55ce6df 100644 (file)
@@ -9,6 +9,7 @@
 #include "glite/lb/context-int.h"
 
 #include "store.h"
+#include "lbs_db.h"
 
 #ifdef __GNUC__
 #define UNUSED_VAR __attribute__((unused))
@@ -30,8 +31,15 @@ handle_request(edg_wll_Context ctx,char *buf)
     return EDG_WLL_IL_PROTO;
   }
 
+  if ((ret = edg_wll_Transaction(ctx) != 0)) goto err;
   ret = db_store(ctx, "NOT USED", event.data);
+  if (ret == 0) {
+    if ((ret = edg_wll_Commit(ctx)) != 0) goto err;
+  } else {
+    edg_wll_Rollback(ctx);
+  }
 
+err:
   if(event.data)
     free(event.data);
 
index 8403c9c..8258d26 100644 (file)
@@ -111,12 +111,20 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
                
                next = max && *max ? atoi(max)+1 : 0;
                
-       /* store an UNDEF event first in order to prevent race condition
-        * with readers */
+       /* 
+        * 1) when using transactions:
+        *   Store the whole event right now.
+        *
+        * 2) when not using transaction:
+        *   Store an UNDEF event first in order to prevent race condition
+        *   with readers and update event code later.
+        */
                trio_asprintf(&stmt,
                        "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid) "
                        "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss')",
-                       jobid,next,EDG_WLL_EVENT_UNDEF,ssrc,e->any.host,
+                       jobid,next,
+                       ctx->use_transactions ? (int) e->any.type : EDG_WLL_EVENT_UNDEF,
+                       ssrc,e->any.host,
                        edg_wll_TimeToDB(e->any.timestamp.tv_sec),e->any.timestamp.tv_usec,
                        now_s, e->any.level,userid);
 
@@ -139,8 +147,8 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
                edg_wll_delete_event(ctx,jobid,next);
                edg_wll_SetError(ctx,oerr,desc);
                free(desc);
-       }
-       else {
+       } else
+       if (!ctx->use_transactions) {
        /* emulate commit, i.e. swith to a real event type to make
         * the record valid */
                trio_asprintf(&stmt,
@@ -184,7 +192,7 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
                                "more event records, what is that?");
                                break;
                }
-       }
+       } /* if !ctx->use_transactions */
 
        if (err == 0 && 
                e->any.type == EDG_WLL_EVENT_REGJOB &&
index 4277872..8234f99 100644 (file)
@@ -135,6 +135,9 @@ edg_wll_ErrorCode edg_wll_DBConnect(edg_wll_Context ctx, char*str) {
   return (edg_wll_ErrorCode)0;
 }
 
+int edg_wll_Transaction(edg_wll_Context ctx) { return 0; }
+int edg_wll_Commit(edg_wll_Context ctx) { return 0; }
+int edg_wll_Rollback(edg_wll_Context ctx) { return 0; }