From: Miloš Mulač Date: Fri, 14 Jul 2006 09:25:03 +0000 (+0000) Subject: buffered insert (under LB_BUF #ifdef) X-Git-Tag: gridsite-core_R_1_3_2~52 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=5f81017a61898bcdf1f597170dc4485307f7773a;p=jra1mw.git buffered insert (under LB_BUF #ifdef) - for sharing - need more testing/tuning --- diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index 5e4dda7..0216210 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -45,6 +45,10 @@ ifdef LB_DAG_EMBRIONIC LB_DAG_FLAGS:=-DLB_DAG_EMBRIONIC endif +ifdef LB_BUF + LB_BUF_FLAGS:=-DLB_BUF +endif + ifeq ($(GLITE_LB_SERVER_WITH_WS),yes) WS_CFLAGS=-DGLITE_LB_SERVER_WITH_WS NSMAP=LoggingAndBookkeeping.nsmap @@ -85,7 +89,8 @@ CFLAGS:= \ -I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \ -I${globus_prefix}/include/${nothrflavour} \ $(GRIDSITE_CFLAGS) \ - -D_GNU_SOURCE ${LB_STANDALONE_FLAGS} ${LB_PERF_FLAGS} ${LB_DAG_FLAGS} + -D_GNU_SOURCE ${LB_STANDALONE_FLAGS} ${LB_PERF_FLAGS} ${LB_DAG_FLAGS} \ + ${LB_BUF_FLAGS} ifdef LB_PROF SRVBONES_LIB:= ${stagedir}/lib/libglite_lb_server_bones.la diff --git a/org.glite.lb.server/interface/lbs_db.h b/org.glite.lb.server/interface/lbs_db.h index fc875fd..de477a3 100644 --- a/org.glite.lb.server/interface/lbs_db.h +++ b/org.glite.lb.server/interface/lbs_db.h @@ -30,15 +30,12 @@ edg_wll_ErrorCode edg_wll_DBConnect( typedef struct _edg_wll_bufInsert { edg_wll_Context ctx; char *table_name; - int num_cols; - char **columns; /* names of columns to be inserted into */ + char *columns; /* names of columns to be inserted into + * (values separated with commas) */ + char **rows; /* each row hold string of one row to be inserted + * (values separated with commas) */ long rec_num, /* actual number of rows in structure */ rec_size; /* approx. 
size of a real insert string */ - long *row_len; /* array holding length of each row */ - long *row_alloc; /* size of allocated space for each row */ - char **values; /* each row hold long string with values - * inserting into one column separated with - * commas */ long size_limit, /* size and # of records limit which trigger */ record_limit; /* real insert; zero means unlimitted */ } edg_wll_bufInsert; @@ -110,18 +107,18 @@ int edg_wll_Rollback(edg_wll_Context ctx); /** * Init data structure for buffered insert * - * takes num_cols string parameters as names of columns to be inserted into + * takes table_name and columns string for future multirow insert * when insert string oversize size_limit or number of rows to be inserted * overcome record_limit, the real insert is triggered */ -int edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, int num_cols, ...); +edg_wll_ErrorCode edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, char * columns); /** * adds row of n values into n columns into an insert buffer * if num. 
of rows or size of data oversteps the limits, real * multi-row insert is done */ -edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, ...); +edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row); /** * flush buffered data and free bi structure diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index d152dee..69d633d 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -18,11 +18,11 @@ #include "store.h" #include "lock.h" #include "index.h" +#include "lbs_db.h" #include "jobstat.h" #include "lb_authz.h" #include "stats.h" - #define DAG_ENABLE 1 /* TBD: share in whole logging or workload */ @@ -585,7 +585,7 @@ static edg_wll_ErrorCode states_values_embryonic( if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err; trio_asprintf(&stmt, - "('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)", + "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s", jobid_md5, stat->pub.state, 1, stat_enc, INTSTAT_VERSION, parent_md5, icvalues); free(icvalues); @@ -601,7 +601,8 @@ err: edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx, edg_wlc_JobId jobid, - const edg_wll_RegJobEvent *e) + const edg_wll_RegJobEvent *e, + edg_wll_bufInsert *bi) { char *values = NULL; char *stmt = NULL; @@ -614,15 +615,21 @@ edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx, edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub); if (ctx->rgma_export) write2rgma_status(&jobstat.pub); */ + +#ifdef LB_BUF + if (edg_wll_bufferedInsert(bi, &values)) + goto cleanup; +#else + trio_asprintf(&stmt, "insert into states" "(jobid,status,seq,int_status,version" ",parent_job%s) " - "values %s", + "values (%s)", icnames, values); if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup; - +#endif cleanup: free(icnames); diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index be29e0f..d424ed7 100644 --- a/org.glite.lb.server/src/jobstat.h +++ 
b/org.glite.lb.server/src/jobstat.h @@ -1,5 +1,7 @@ /* $Header$ */ +#include "lbs_db.h" + /* * Internal representation of job state * (includes edg_wll_JobStat API structure) @@ -56,7 +58,7 @@ edg_wll_ErrorCode edg_wll_IColumnsSQLPart(edg_wll_Context, void *, intJobStat *, edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context, void *); int edg_wll_intJobStatus( edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int); edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int); -edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e); +edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e, edg_wll_bufInsert *bi); edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **); diff --git a/org.glite.lb.server/src/jobstat_supp.c b/org.glite.lb.server/src/jobstat_supp.c index 54351ac..88ae42b 100644 --- a/org.glite.lb.server/src/jobstat_supp.c +++ b/org.glite.lb.server/src/jobstat_supp.c @@ -17,8 +17,8 @@ #include "store.h" #include "index.h" -#include "jobstat.h" #include "lbs_db.h" +#include "jobstat.h" #include "get_events.h" diff --git a/org.glite.lb.server/src/lbs_db.c b/org.glite.lb.server/src/lbs_db.c index 464fce4..37f8f41 100644 --- a/org.glite.lb.server/src/lbs_db.c +++ b/org.glite.lb.server/src/lbs_db.c @@ -17,7 +17,6 @@ #include "glite/lb/context-int.h" #include "glite/lb/trio.h" - #define DEFAULTCS "lbserver/@localhost:lbserver20" #define my_err() edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,mysql_error((MYSQL *) ctx->mysql)) @@ -329,35 +328,19 @@ err: } -int edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, int num_cols, ...) 
+edg_wll_ErrorCode edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, char *columns) { - va_list l; - long i; - - - assert(num_cols != 1); // XXX: I do not know one column multi-row insert :( - va_start(l, num_cols); - bi->ctx = ctx; bi->table_name = strdup(table_name); - bi->num_cols = num_cols; - bi->columns = calloc(num_cols, sizeof(*(bi->columns)) ); + bi->columns = strdup(columns); bi->rec_num = 0; bi->rec_size = 0; - bi->row_len = calloc(num_cols, sizeof(*(bi->row_len)) ); - bi->row_alloc = calloc(num_cols, sizeof(*(bi->row_alloc)) ); - bi->values = calloc(num_cols, sizeof(*(bi->values)) ); + bi->rows = calloc(record_limit, sizeof(*(bi->rows)) );; bi->size_limit = size_limit; bi->record_limit = record_limit; - for (i=0; i < num_cols; i++) { - bi->columns[i] = strdup(va_arg(l,char *)); - bi->row_len[i] = 0; - bi->row_alloc[i] = 0; - } - - va_end(l); - return 0; + return edg_wll_Error(bi->ctx,NULL,NULL); +; } @@ -386,24 +369,24 @@ static int string_add(char *what, long *used_size, long *alloc_size, char **wher static int flush_bufferd_insert(edg_wll_bufInsert *bi) { - char *stmt, *cols, *vals, *temp; + char *stmt, *vals, *temp; long i; - asprintf(&cols,"%s", bi->columns[0]); - asprintf(&vals,"%s", bi->values[0]); - for (i=1; i < bi->num_cols; i++) { - asprintf(&temp,"%s,%s", cols, bi->columns[i]); - free(cols); cols = temp; temp = NULL; - asprintf(&temp,"%s),(%s", vals, bi->values[i]); + + if (!bi->rec_num) + return edg_wll_Error(bi->ctx,NULL,NULL); + + asprintf(&vals,"(%s)", bi->rows[0]); + for (i=1; i < bi->rec_num; i++) { + // XXX: use string add (preallocated memory) + asprintf(&temp,"%s,(%s)", vals, bi->rows[i]); free(vals); vals = temp; temp = NULL; - + free(bi->rows[i]); + bi->rows[i] = NULL; } - trio_asprintf(&stmt, "insert into %|Ss(%|Ss) values (%|Ss);", - bi->table_name, cols, vals); - - // XXX: remove after testing - fprintf(stderr,"\n%s\n",stmt); + 
trio_asprintf(&stmt, "insert into %|Ss(%|Ss) values %s;", + bi->table_name, bi->columns, vals); if (edg_wll_ExecStmt(bi->ctx,stmt,NULL) < 0) { if (edg_wll_Error(bi->ctx,NULL,NULL) == EEXIST) @@ -413,10 +396,7 @@ static int flush_bufferd_insert(edg_wll_bufInsert *bi) /* reset bi counters */ bi->rec_size = 0; bi->rec_num = 0; - for (i=0; i < bi->num_cols; i++) - bi->row_len[i] = 0; - free(cols); free(vals); free(stmt); @@ -428,26 +408,13 @@ static int flush_bufferd_insert(edg_wll_bufInsert *bi) * adds row of n values into n columns into an insert buffer * if num. of rows or size of data oversteps the limits, real * multi-row insert is done + * Eats the row! No need to free it after the call :) */ -edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, ...) +edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row) { - va_list l; - long i; - - - va_start(l, bi); - - bi->rec_size = 0; - for (i=0; i < bi->num_cols; i++) { - if (bi->rec_num) { - if (string_add(",", &bi->row_len[i], &bi->row_alloc[i], &bi->values[i])) - return edg_wll_SetError(bi->ctx,ENOMEM,NULL);; - } - if (string_add(va_arg(l,char *), &bi->row_len[i], &bi->row_alloc[i], &bi->values[i])) - return edg_wll_SetError(bi->ctx,ENOMEM,NULL);; - bi->rec_size += bi->row_len[i]; - } - bi->rec_num++; + bi->rows[bi->rec_num++] = *row; + bi->rec_size += strlen(*row); + *row = NULL; // just to avoid freeing by caller function if ((bi->size_limit && bi->rec_size >= bi->size_limit) || (bi->record_limit && bi->rec_num >= bi->record_limit)) @@ -456,22 +423,18 @@ edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, ...) 
return edg_wll_Error(bi->ctx,NULL,NULL); } - va_end(l); return edg_wll_ResetError(bi->ctx); } static void free_buffered_insert(edg_wll_bufInsert *bi) { long i; - + free(bi->table_name); - for (i=0; i < bi->num_cols; i++) { - free(bi->columns[i]); - free(bi->values[i]); - } free(bi->columns); - free(bi->values); - free(bi->row_len); - free(bi->row_alloc); + for (i=0; i < bi->rec_num; i++) { + free(bi->rows[i]); + } + free(bi->rows); } edg_wll_ErrorCode edg_wll_bufferedInsertClose(edg_wll_bufInsert *bi) @@ -480,6 +443,7 @@ edg_wll_ErrorCode edg_wll_bufferedInsertClose(edg_wll_bufInsert *bi) return edg_wll_Error(bi->ctx,NULL,NULL); free_buffered_insert(bi); + return edg_wll_ResetError(bi->ctx); } diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index a0c213a..0a9a7c4 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -36,6 +36,7 @@ static int store_user(edg_wll_Context,const char *,const char *); static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *); +static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *); static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int); static int store_seq(edg_wll_Context,edg_wll_Event *,int); static int check_dup(edg_wll_Context,edg_wll_Event *); @@ -261,6 +262,27 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use return edg_wll_Error(ctx,NULL,NULL); } +static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi) +{ + char *jobstr = edg_wlc_JobIdUnparse(job); + char *jobid = edg_wlc_JobIdGetUnique(job); + char *row; + +/* debug Duplicate key on index: Duplicate entry '(nil)' for key 1 + */ + if (jobid == NULL || jobstr == NULL) + return edg_wll_SetError(ctx,EINVAL,"store_jobi_block()"); + + edg_wll_ResetError(ctx); + + trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid); + edg_wll_bufferedInsert(bi, 
&row); // no need to free row + + free(jobstr); + free(jobid); + return edg_wll_Error(ctx,NULL,NULL); +} + /* * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated * column in EVENTS. @@ -571,13 +593,32 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) { - int i, err = 0; - edg_wlc_JobId *subjobs; - struct timeval now; - char *userid = strdup(strmd5(e->user,NULL)); + int i, err = 0; + edg_wlc_JobId *subjobs; + struct timeval now; + char *userid = strdup(strmd5(e->user,NULL)); + edg_wll_bufInsert bi_j, bi_s; + edg_wll_bufInsert *bi_jobs = &bi_j, *bi_states = &bi_s; edg_wll_ResetError(ctx); + +#ifdef LB_BUF + /* init multirows insert mechanism for tables used here */ + if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 100000, 1000, + "jobid, dg_jobid, userid")) + { + return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()"); + } + if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 1000000, 1000, + "jobid, status, seq,int_status, version, parent_job, `STD_location`," + "`STD_owner`, `STD_destination`, `TIME_Cleared`, `TIME_Aborted`," + "`TIME_Cancelled`, `TIME_Submitted`")) + { + return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()"); + } +#endif + if (e->nsubjobs == 0) return 0; if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs"); @@ -595,15 +636,19 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv char *et,*ed,*job_s; /* save jobid-userid relation into jobs table */ +#ifdef LB_BUF + if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs))) +#else if ((err = store_job(ctx, subjobs[i], userid))) +#endif edg_wll_Error(ctx,&et,&ed); - if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e))) + if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e, bi_states))) edg_wll_Error(ctx,&et,&ed); 
-job_s = edg_wlc_JobIdUnparse(subjobs[i]); -printf("%s\n", job_s); -free(job_s); +//job_s = edg_wlc_JobIdUnparse(subjobs[i]); +//printf("%s\n", job_s); +//free(job_s); if (err) { job_s = edg_wlc_JobIdUnparse(subjobs[i]); @@ -617,6 +662,12 @@ free(job_s); free(subjobs); +#ifdef LB_BUF + /* commit the rest of multirows insert and clean structures */ + edg_wll_bufferedInsertClose(bi_jobs); + edg_wll_bufferedInsertClose(bi_states); +#endif + return edg_wll_Error(ctx,NULL,NULL); }