LB_DAG_FLAGS:=-DLB_DAG_EMBRIONIC
endif
+ifdef LB_BUF
+ LB_BUF_FLAGS:=-DLB_BUF
+endif
+
ifeq ($(GLITE_LB_SERVER_WITH_WS),yes)
WS_CFLAGS=-DGLITE_LB_SERVER_WITH_WS
NSMAP=LoggingAndBookkeeping.nsmap
-I${mysql_prefix}/include -I${mysql_prefix}/include/mysql \
-I${globus_prefix}/include/${nothrflavour} \
$(GRIDSITE_CFLAGS) \
- -D_GNU_SOURCE ${LB_STANDALONE_FLAGS} ${LB_PERF_FLAGS} ${LB_DAG_FLAGS}
+ -D_GNU_SOURCE ${LB_STANDALONE_FLAGS} ${LB_PERF_FLAGS} ${LB_DAG_FLAGS} \
+ ${LB_BUF_FLAGS}
ifdef LB_PROF
SRVBONES_LIB:= ${stagedir}/lib/libglite_lb_server_bones.la
typedef struct _edg_wll_bufInsert {
edg_wll_Context ctx;
char *table_name;
- int num_cols;
- char **columns; /* names of columns to be inserted into */
+ char *columns; /* names of columns to be inserted into
+ * (values separated with commas) */
+ char **rows; /* each row hold string of one row to be inserted
+ * (values separated with commas) */
long rec_num, /* actual number of rows in structure */
rec_size; /* approx. size of a real insert string */
- long *row_len; /* array holding length of each row */
- long *row_alloc; /* size of allocated space for each row */
- char **values; /* each row hold long string with values
- * inserting into one column separated with
- * commas */
long size_limit, /* size and # of records limit which trigger */
record_limit; /* real insert; zero means unlimitted */
} edg_wll_bufInsert;
/**
* Init data structure for buffered insert
*
- * takes num_cols string parameters as names of columns to be inserted into
+ * takes table_name and columns string for future multirow insert
* when insert string oversize size_limit or number of rows to be inserted
* overcome record_limit, the real insert is triggered
*/
-int edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, int num_cols, ...);
+edg_wll_ErrorCode edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, char * columns);
/**
* adds row of n values into n columns into an insert buffer
* if num. of rows or size of data oversteps the limits, real
* multi-row insert is done
*/
-edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, ...);
+edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row);
/**
* flush buffered data and free bi structure
#include "store.h"
#include "lock.h"
#include "index.h"
+#include "lbs_db.h"
#include "jobstat.h"
#include "lb_authz.h"
#include "stats.h"
-
#define DAG_ENABLE 1
/* TBD: share in whole logging or workload */
if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err;
trio_asprintf(&stmt,
- "('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)",
+ "'%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s",
jobid_md5, stat->pub.state, 1, stat_enc,
INTSTAT_VERSION, parent_md5, icvalues);
free(icvalues);
edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
edg_wlc_JobId jobid,
- const edg_wll_RegJobEvent *e)
+ const edg_wll_RegJobEvent *e,
+ edg_wll_bufInsert *bi)
{
char *values = NULL;
char *stmt = NULL;
edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
*/
+
+#ifdef LB_BUF
+ if (edg_wll_bufferedInsert(bi, &values))
+ goto cleanup;
+#else
+
trio_asprintf(&stmt,
"insert into states"
"(jobid,status,seq,int_status,version"
",parent_job%s) "
- "values %s",
+ "values (%s)",
icnames, values);
if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup;
-
+#endif
cleanup:
free(icnames);
/* $Header$ */
+#include "lbs_db.h"
+
/*
* Internal representation of job state
* (includes edg_wll_JobStat API structure)
edg_wll_ErrorCode edg_wll_RefreshIColumns(edg_wll_Context, void *);
int edg_wll_intJobStatus( edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int);
edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
-edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e);
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, const edg_wll_RegJobEvent *e, edg_wll_bufInsert *bi);
edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
#include "store.h"
#include "index.h"
-#include "jobstat.h"
#include "lbs_db.h"
+#include "jobstat.h"
#include "get_events.h"
#include "glite/lb/context-int.h"
#include "glite/lb/trio.h"
-
#define DEFAULTCS "lbserver/@localhost:lbserver20"
#define my_err() edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,mysql_error((MYSQL *) ctx->mysql))
}
-int edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, int num_cols, ...)
+edg_wll_ErrorCode edg_wll_bufferedInsertInit(edg_wll_Context ctx, edg_wll_bufInsert *bi, void *mysql, char *table_name, long size_limit, long record_limit, char *columns)
{
- va_list l;
- long i;
-
-
- assert(num_cols != 1); // XXX: I do not know one column multi-row insert :(
- va_start(l, num_cols);
-
bi->ctx = ctx;
bi->table_name = strdup(table_name);
- bi->num_cols = num_cols;
- bi->columns = calloc(num_cols, sizeof(*(bi->columns)) );
+ bi->columns = strdup(columns);
bi->rec_num = 0;
bi->rec_size = 0;
- bi->row_len = calloc(num_cols, sizeof(*(bi->row_len)) );
- bi->row_alloc = calloc(num_cols, sizeof(*(bi->row_alloc)) );
- bi->values = calloc(num_cols, sizeof(*(bi->values)) );
+ bi->rows = calloc(record_limit, sizeof(*(bi->rows)) );;
bi->size_limit = size_limit;
bi->record_limit = record_limit;
- for (i=0; i < num_cols; i++) {
- bi->columns[i] = strdup(va_arg(l,char *));
- bi->row_len[i] = 0;
- bi->row_alloc[i] = 0;
- }
-
- va_end(l);
- return 0;
+ return edg_wll_Error(bi->ctx,NULL,NULL);
+;
}
static int flush_bufferd_insert(edg_wll_bufInsert *bi)
{
- char *stmt, *cols, *vals, *temp;
+ char *stmt, *vals, *temp;
long i;
- asprintf(&cols,"%s", bi->columns[0]);
- asprintf(&vals,"%s", bi->values[0]);
- for (i=1; i < bi->num_cols; i++) {
- asprintf(&temp,"%s,%s", cols, bi->columns[i]);
- free(cols); cols = temp; temp = NULL;
- asprintf(&temp,"%s),(%s", vals, bi->values[i]);
+
+ if (!bi->rec_num)
+ return edg_wll_Error(bi->ctx,NULL,NULL);
+
+ asprintf(&vals,"(%s)", bi->rows[0]);
+ for (i=1; i < bi->rec_num; i++) {
+ // XXX: use string add (preallocated memory)
+ asprintf(&temp,"%s,(%s)", vals, bi->rows[i]);
free(vals); vals = temp; temp = NULL;
-
+ free(bi->rows[i]);
+ bi->rows[i] = NULL;
}
- trio_asprintf(&stmt, "insert into %|Ss(%|Ss) values (%|Ss);",
- bi->table_name, cols, vals);
-
- // XXX: remove after testing
- fprintf(stderr,"\n%s\n",stmt);
+ trio_asprintf(&stmt, "insert into %|Ss(%|Ss) values %s;",
+ bi->table_name, bi->columns, vals);
if (edg_wll_ExecStmt(bi->ctx,stmt,NULL) < 0) {
if (edg_wll_Error(bi->ctx,NULL,NULL) == EEXIST)
/* reset bi counters */
bi->rec_size = 0;
bi->rec_num = 0;
- for (i=0; i < bi->num_cols; i++)
- bi->row_len[i] = 0;
- free(cols);
free(vals);
free(stmt);
* adds row of n values into n columns into an insert buffer
* if num. of rows or size of data oversteps the limits, real
* multi-row insert is done
+ * Eats the row! No need to free it after the call :)
*/
-edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, ...)
+edg_wll_ErrorCode edg_wll_bufferedInsert(edg_wll_bufInsert *bi, char **row)
{
- va_list l;
- long i;
-
-
- va_start(l, bi);
-
- bi->rec_size = 0;
- for (i=0; i < bi->num_cols; i++) {
- if (bi->rec_num) {
- if (string_add(",", &bi->row_len[i], &bi->row_alloc[i], &bi->values[i]))
- return edg_wll_SetError(bi->ctx,ENOMEM,NULL);;
- }
- if (string_add(va_arg(l,char *), &bi->row_len[i], &bi->row_alloc[i], &bi->values[i]))
- return edg_wll_SetError(bi->ctx,ENOMEM,NULL);;
- bi->rec_size += bi->row_len[i];
- }
- bi->rec_num++;
+ bi->rows[bi->rec_num++] = *row;
+ bi->rec_size += strlen(*row);
+ *row = NULL; // just to avoid freeing by caller function
if ((bi->size_limit && bi->rec_size >= bi->size_limit) ||
(bi->record_limit && bi->rec_num >= bi->record_limit))
return edg_wll_Error(bi->ctx,NULL,NULL);
}
- va_end(l);
return edg_wll_ResetError(bi->ctx);
}
static void free_buffered_insert(edg_wll_bufInsert *bi) {
long i;
-
+
free(bi->table_name);
- for (i=0; i < bi->num_cols; i++) {
- free(bi->columns[i]);
- free(bi->values[i]);
- }
free(bi->columns);
- free(bi->values);
- free(bi->row_len);
- free(bi->row_alloc);
+ for (i=0; i < bi->rec_num; i++) {
+ free(bi->rows[i]);
+ }
+ free(bi->rows);
}
edg_wll_ErrorCode edg_wll_bufferedInsertClose(edg_wll_bufInsert *bi)
return edg_wll_Error(bi->ctx,NULL,NULL);
free_buffered_insert(bi);
+
return edg_wll_ResetError(bi->ctx);
}
static int store_user(edg_wll_Context,const char *,const char *);
static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *);
+static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *);
static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int);
static int store_seq(edg_wll_Context,edg_wll_Event *,int);
static int check_dup(edg_wll_Context,edg_wll_Event *);
return edg_wll_Error(ctx,NULL,NULL);
}
+static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi)
+{
+ char *jobstr = edg_wlc_JobIdUnparse(job);
+ char *jobid = edg_wlc_JobIdGetUnique(job);
+ char *row;
+
+/* debug Duplicate key on index: Duplicate entry '(nil)' for key 1
+ */
+ if (jobid == NULL || jobstr == NULL)
+ return edg_wll_SetError(ctx,EINVAL,"store_jobi_block()");
+
+ edg_wll_ResetError(ctx);
+
+ trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid);
+ edg_wll_bufferedInsert(bi, &row); // no need to free row
+
+ free(jobstr);
+ free(jobid);
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
/*
* XXX: store it in SHORT_FIELDS for now despite it should go to dedicated
* column in EVENTS.
static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
{
- int i, err = 0;
- edg_wlc_JobId *subjobs;
- struct timeval now;
- char *userid = strdup(strmd5(e->user,NULL));
+ int i, err = 0;
+ edg_wlc_JobId *subjobs;
+ struct timeval now;
+ char *userid = strdup(strmd5(e->user,NULL));
+ edg_wll_bufInsert bi_j, bi_s;
+ edg_wll_bufInsert *bi_jobs = &bi_j, *bi_states = &bi_s;
edg_wll_ResetError(ctx);
+
+#ifdef LB_BUF
+ /* init multirows insert mechanism for tables used here */
+ if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 100000, 1000,
+ "jobid, dg_jobid, userid"))
+ {
+ return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
+ }
+ if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 1000000, 1000,
+ "jobid, status, seq,int_status, version, parent_job, `STD_location`,"
+ "`STD_owner`, `STD_destination`, `TIME_Cleared`, `TIME_Aborted`,"
+ "`TIME_Cancelled`, `TIME_Submitted`"))
+ {
+ return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
+ }
+#endif
+
if (e->nsubjobs == 0) return 0;
if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
char *et,*ed,*job_s;
/* save jobid-userid relation into jobs table */
+#ifdef LB_BUF
+ if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs)))
+#else
if ((err = store_job(ctx, subjobs[i], userid)))
+#endif
edg_wll_Error(ctx,&et,&ed);
- if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e)))
+ if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e, bi_states)))
edg_wll_Error(ctx,&et,&ed);
-job_s = edg_wlc_JobIdUnparse(subjobs[i]);
-printf("%s\n", job_s);
-free(job_s);
+//job_s = edg_wlc_JobIdUnparse(subjobs[i]);
+//printf("%s\n", job_s);
+//free(job_s);
if (err) {
job_s = edg_wlc_JobIdUnparse(subjobs[i]);
free(subjobs);
+#ifdef LB_BUF
+ /* commit the rest of multirows insert and clean structures */
+ edg_wll_bufferedInsertClose(bi_jobs);
+ edg_wll_bufferedInsertClose(bi_states);
+#endif
+
return edg_wll_Error(ctx,NULL,NULL);
}