buffered insert (under LB_BUF #ifdef)
authorMiloš Mulač <mulac@civ.zcu.cz>
Fri, 14 Jul 2006 09:25:03 +0000 (09:25 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Fri, 14 Jul 2006 09:25:03 +0000 (09:25 +0000)
- for sharing
- need more testing/tunning

org.glite.lb.server/Makefile
org.glite.lb.server/interface/lbs_db.h
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/jobstat.h
org.glite.lb.server/src/jobstat_supp.c
org.glite.lb.server/src/lbs_db.c
org.glite.lb.server/src/store.c.T

index 5e4dda7..0216210 100644 (file)
@@ -45,6 +45,10 @@ ifdef LB_DAG_EMBRIONIC
        LB_DAG_FLAGS:=-DLB_DAG_EMBRIONIC
 endif
 
+ifdef LB_BUF
+       LB_BUF_FLAGS:=-DLB_BUF
+endif
+
 ifeq ($(GLITE_LB_SERVER_WITH_WS),yes)
        WS_CFLAGS=-DGLITE_LB_SERVER_WITH_WS 
        NSMAP=LoggingAndBookkeeping.nsmap
@@ -85,7 +89,8 @@ CFLAGS:= \
        -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \
        -I${globus_prefix}/include/${nothrflavour} \
        $(GRIDSITE_CFLAGS) \
-       -D_GNU_SOURCE ${LB_STANDALONE_FLAGS} ${LB_PERF_FLAGS} ${LB_DAG_FLAGS}
+       -D_GNU_SOURCE ${LB_STANDALONE_FLAGS} ${LB_PERF_FLAGS} ${LB_DAG_FLAGS} \
+       ${LB_BUF_FLAGS}
 
 ifdef LB_PROF
        SRVBONES_LIB:= ${stagedir}/lib/libglite_lb_server_bones.la
index fc875fd..de477a3 100644 (file)
@@ -30,15 +30,12 @@ edg_wll_ErrorCode edg_wll_DBConnect(
 typedef struct _edg_wll_bufInsert {
        edg_wll_Context ctx;
        char    *table_name;
-       int     num_cols;
-       char    **columns;      /* names of columns to be inserted into */
+       char    *columns;       /* names of columns to be inserted into 
+                                * (values separated with commas) */
+       char    **rows;         /* each row hold string of one row to be inserted
+                                * (values separated with commas) */
        long    rec_num,        /* actual number of rows in structure */
                rec_size;       /* approx. size of a real insert string */
-       long    *row_len;       /* array holding length of each row */
-       long    *row_alloc;     /* size of allocated space for each row */
-       char    **values;       /* each row hold long string with values
-                                * inserting into one column separated with 
-                                * commas */
        long    size_limit,     /* size and # of records limit which trigger */
                record_limit;   /* real insert; zero means unlimitted */
 } edg_wll_bufInsert;
@@ -110,18 +107,18 @@ int edg_wll_Rollback(edg_wll_Context ctx);
 /**
  * Init data structure for buffered insert
  *
- * takes num_cols string parameters as names of columns to be inserted into
+ * takes table_name and columns string for future multirow insert
  * when insert string oversize size_limit or number of rows to be inserted
  * overcome record_limit, the real insert is triggered
  */
-int edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, int num_cols, ...);
+edg_wll_ErrorCode edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, char * columns);
 
 /**
  * adds row of n values into n columns into an insert buffer
  * if num. of rows or size of data oversteps the limits, real
  * multi-row insert is done
  */
-edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, ...);
+edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row);
 
 /**
  * flush buffered data and free bi structure
index d152dee..69d633d 100644 (file)
 #include "store.h"
 #include "lock.h"
 #include "index.h"
+#include "lbs_db.h"
 #include "jobstat.h"
 #include "lb_authz.h"
 #include "stats.h"
 
-
 #define DAG_ENABLE     1
 
 /* TBD: share in whole logging or workload */
@@ -585,7 +585,7 @@ static edg_wll_ErrorCode states_values_embryonic(
 
        if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err;
        trio_asprintf(&stmt,
-               "('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)",
+               "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s",
                jobid_md5, stat->pub.state, 1, stat_enc,
                INTSTAT_VERSION, parent_md5, icvalues);
        free(icvalues);
@@ -601,7 +601,8 @@ err:
 
 edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
         edg_wlc_JobId jobid,
-        const edg_wll_RegJobEvent *e)
+        const edg_wll_RegJobEvent *e,
+       edg_wll_bufInsert *bi)
 {
        char *values = NULL;
        char *stmt = NULL;
@@ -614,15 +615,21 @@ edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
                edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
                if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
 */
+
+#ifdef LB_BUF
+       if (edg_wll_bufferedInsert(bi, &values))
+               goto cleanup;
+#else
+
        trio_asprintf(&stmt,
                "insert into states"
                "(jobid,status,seq,int_status,version"
                        ",parent_job%s) "
-               "values %s",
+               "values (%s)",
                icnames, values);
 
        if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup;
-
+#endif
 
 cleanup:
        free(icnames);
index be29e0f..d424ed7 100644 (file)
@@ -1,5 +1,7 @@
 /* $Header$ */
 
+#include "lbs_db.h"
+
 /*
  * Internal representation of job state
  * (includes edg_wll_JobStat API structure)
@@ -56,7 +58,7 @@ edg_wll_ErrorCode edg_wll_IColumnsSQLPart(edg_wll_Context, void *, intJobStat *,
 edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context, void *);
 int edg_wll_intJobStatus( edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int);
 edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
-edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e);
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e, edg_wll_bufInsert *bi);
 edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
 
 
index 54351ac..88ae42b 100644 (file)
@@ -17,8 +17,8 @@
 
 #include "store.h"
 #include "index.h"
-#include "jobstat.h"
 #include "lbs_db.h"
+#include "jobstat.h"
 #include "get_events.h"
 
 
index 464fce4..37f8f41 100644 (file)
@@ -17,7 +17,6 @@
 #include "glite/lb/context-int.h"
 #include "glite/lb/trio.h"
 
-
 #define DEFAULTCS      "lbserver/@localhost:lbserver20"
 
 #define my_err() edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,mysql_error((MYSQL *) ctx->mysql))
@@ -329,35 +328,19 @@ err:
 }
 
 
-int edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, int num_cols, ...)
+edg_wll_ErrorCode edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, char *columns)
 {
-       va_list l;
-       long    i;
-
-
-       assert(num_cols != 1);  // XXX: I do not know one column multi-row insert :(
-       va_start(l, num_cols);
-       
        bi->ctx = ctx;
        bi->table_name = strdup(table_name);
-       bi->num_cols = num_cols;
-       bi->columns = calloc(num_cols, sizeof(*(bi->columns)) );
+       bi->columns = strdup(columns);
        bi->rec_num = 0;
        bi->rec_size = 0;
-       bi->row_len = calloc(num_cols, sizeof(*(bi->row_len)) );
-       bi->row_alloc = calloc(num_cols, sizeof(*(bi->row_alloc)) );
-       bi->values = calloc(num_cols, sizeof(*(bi->values)) );
+       bi->rows = calloc(record_limit, sizeof(*(bi->rows)) );;
        bi->size_limit = size_limit;
        bi->record_limit = record_limit;
 
-       for (i=0; i < num_cols; i++) {
-               bi->columns[i] = strdup(va_arg(l,char *));      
-               bi->row_len[i] = 0;
-               bi->row_alloc[i] = 0;
-       }
-
-       va_end(l);
-       return 0;
+       return edg_wll_Error(bi->ctx,NULL,NULL);
+;
 }
 
 
@@ -386,24 +369,24 @@ static int string_add(char *what, long *used_size, long *alloc_size, char **wher
 
 static int flush_bufferd_insert(edg_wll_bufInsert *bi)
 {
-       char *stmt, *cols, *vals, *temp;
+       char *stmt, *vals, *temp;
        long i;
 
-       asprintf(&cols,"%s", bi->columns[0]);
-       asprintf(&vals,"%s", bi->values[0]);
-       for (i=1; i < bi->num_cols; i++) {
-               asprintf(&temp,"%s,%s", cols, bi->columns[i]);
-               free(cols); cols = temp; temp = NULL;
-               asprintf(&temp,"%s),(%s", vals, bi->values[i]);
+
+       if (!bi->rec_num)
+               return edg_wll_Error(bi->ctx,NULL,NULL);
+
+       asprintf(&vals,"(%s)", bi->rows[0]);
+       for (i=1; i < bi->rec_num; i++) {
+               // XXX:  use string add (preallocated memory)
+               asprintf(&temp,"%s,(%s)", vals, bi->rows[i]);
                free(vals); vals = temp; temp = NULL;
-               
+               free(bi->rows[i]);
+               bi->rows[i] = NULL;
        }
        
-       trio_asprintf(&stmt, "insert into %|Ss(%|Ss) values (%|Ss);",
-               bi->table_name, cols, vals);
-
-       // XXX: remove after testing
-       fprintf(stderr,"\n%s\n",stmt);
+       trio_asprintf(&stmt, "insert into %|Ss(%|Ss) values %s;",
+               bi->table_name, bi->columns, vals);
 
        if (edg_wll_ExecStmt(bi->ctx,stmt,NULL) < 0) {
                 if (edg_wll_Error(bi->ctx,NULL,NULL) == EEXIST)
@@ -413,10 +396,7 @@ static int flush_bufferd_insert(edg_wll_bufInsert *bi)
        /* reset bi counters */
        bi->rec_size = 0;
        bi->rec_num = 0;
-       for (i=0; i < bi->num_cols; i++) 
-               bi->row_len[i] = 0;
        
-       free(cols);
        free(vals);
        free(stmt);
 
@@ -428,26 +408,13 @@ static int flush_bufferd_insert(edg_wll_bufInsert *bi)
  * adds row of n values into n columns into an insert buffer
  * if num. of rows or size of data oversteps the limits, real
  * multi-row insert is done
+ * Eats the row! No need to free it after the call :)
  */
-edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, ...)
+edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row)
 {
-       va_list l;
-       long    i;
-
-
-       va_start(l, bi);
-
-       bi->rec_size = 0;
-       for (i=0; i < bi->num_cols; i++) {
-               if (bi->rec_num) { 
-                       if (string_add(",", &bi->row_len[i], &bi->row_alloc[i], &bi->values[i]))
-                               return edg_wll_SetError(bi->ctx,ENOMEM,NULL);;
-               }
-               if (string_add(va_arg(l,char *), &bi->row_len[i], &bi->row_alloc[i], &bi->values[i]))
-                       return edg_wll_SetError(bi->ctx,ENOMEM,NULL);;
-               bi->rec_size += bi->row_len[i];
-       }
-       bi->rec_num++;
+       bi->rows[bi->rec_num++] = *row;
+       bi->rec_size += strlen(*row);
+       *row = NULL;    // just to avoid freeing by caller function
 
        if ((bi->size_limit && bi->rec_size >= bi->size_limit) ||
                (bi->record_limit && bi->rec_num >= bi->record_limit))
@@ -456,22 +423,18 @@ edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, ...)
                        return edg_wll_Error(bi->ctx,NULL,NULL);
        }
 
-       va_end(l);
        return edg_wll_ResetError(bi->ctx);
 }
 
 static void free_buffered_insert(edg_wll_bufInsert *bi) {
        long i;
-       
+
        free(bi->table_name);
-       for (i=0; i < bi->num_cols; i++) {
-               free(bi->columns[i]);
-               free(bi->values[i]);
-       }
        free(bi->columns);
-       free(bi->values);
-       free(bi->row_len);
-       free(bi->row_alloc);
+       for (i=0; i < bi->rec_num; i++) {
+               free(bi->rows[i]);
+       }
+       free(bi->rows);
 }
 
 edg_wll_ErrorCode edg_wll_bufferedInsertClose(edg_wll_bufInsert *bi)
@@ -480,6 +443,7 @@ edg_wll_ErrorCode edg_wll_bufferedInsertClose(edg_wll_bufInsert *bi)
                return edg_wll_Error(bi->ctx,NULL,NULL);
        free_buffered_insert(bi);
 
+
        return edg_wll_ResetError(bi->ctx);
 }
 
index a0c213a..0a9a7c4 100644 (file)
@@ -36,6 +36,7 @@
 
 static int store_user(edg_wll_Context,const char *,const char *); 
 static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *);
+static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *);
 static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int);
 static int store_seq(edg_wll_Context,edg_wll_Event *,int);
 static int check_dup(edg_wll_Context,edg_wll_Event *);
@@ -261,6 +262,27 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi)
+{
+       char *jobstr = edg_wlc_JobIdUnparse(job);
+       char *jobid = edg_wlc_JobIdGetUnique(job);
+       char *row;
+
+/* debug Duplicate key on index: Duplicate entry '(nil)' for key 1
+ */
+       if (jobid == NULL || jobstr == NULL) 
+               return edg_wll_SetError(ctx,EINVAL,"store_jobi_block()");
+
+       edg_wll_ResetError(ctx);
+
+       trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid);
+       edg_wll_bufferedInsert(bi, &row); // no need to free row
+
+       free(jobstr);
+       free(jobid);
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
 /*
  * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated
  *     column in EVENTS.
@@ -571,13 +593,32 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 
 static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 {
-       int             i, err = 0;
-       edg_wlc_JobId   *subjobs;
-       struct timeval  now;
-       char            *userid = strdup(strmd5(e->user,NULL));
+       int                     i, err = 0;
+       edg_wlc_JobId           *subjobs;
+       struct timeval          now;
+       char                    *userid = strdup(strmd5(e->user,NULL));
+       edg_wll_bufInsert       bi_j, bi_s;
+       edg_wll_bufInsert       *bi_jobs = &bi_j, *bi_states = &bi_s;
 
 
        edg_wll_ResetError(ctx);
+
+#ifdef LB_BUF
+       /* init multirows insert mechanism for tables used here */
+       if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 100000, 1000,
+               "jobid, dg_jobid, userid"))
+       {
+               return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
+       }
+       if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 1000000, 1000, 
+               "jobid, status, seq,int_status, version, parent_job, `STD_location`,"
+               "`STD_owner`, `STD_destination`, `TIME_Cleared`, `TIME_Aborted`,"
+               "`TIME_Cancelled`, `TIME_Submitted`"))
+       {
+                return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
+       }
+#endif         
+
        if (e->nsubjobs == 0) return 0;
        if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
 
@@ -595,15 +636,19 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
                char            *et,*ed,*job_s;
 
                /* save jobid-userid relation into jobs table */
+#ifdef LB_BUF
+               if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs)))
+#else
                if ((err = store_job(ctx, subjobs[i], userid)))
+#endif
                        edg_wll_Error(ctx,&et,&ed);
 
-               if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e)))
+               if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e, bi_states)))
                        edg_wll_Error(ctx,&et,&ed);
 
-job_s = edg_wlc_JobIdUnparse(subjobs[i]);
-printf("%s\n", job_s);
-free(job_s);
+//job_s = edg_wlc_JobIdUnparse(subjobs[i]);
+//printf("%s\n", job_s);
+//free(job_s);
 
                if (err) {
                        job_s = edg_wlc_JobIdUnparse(subjobs[i]);
@@ -617,6 +662,12 @@ free(job_s);
 
        free(subjobs);
 
+#ifdef LB_BUF
+       /* commit the rest of multirows insert and clean structures */
+       edg_wll_bufferedInsertClose(bi_jobs);
+       edg_wll_bufferedInsertClose(bi_states);
+#endif
+
        return edg_wll_Error(ctx,NULL,NULL);
 }