From: František Dvořák Date: Thu, 18 Nov 2010 16:15:13 +0000 (+0000) Subject: Prepare for streaming approach in QueryJobs. X-Git-Tag: gridsite-core_R_1_7_9~2 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=608ae6668f17f0c04100c03d96e323b8dff1432d;p=jra1mw.git Prepare for streaming approach in QueryJobs. --- diff --git a/org.glite.lb.server/src/get_events.h b/org.glite.lb.server/src/get_events.h index 024b3da..951b7a4 100644 --- a/org.glite.lb.server/src/get_events.h +++ b/org.glite.lb.server/src/get_events.h @@ -27,6 +27,15 @@ limitations under the License. extern "C" { #endif +/* queryjobs callback + * + * called for each found jobs, should free the structures if unused + * + * after last job: called with jobid=NULL, stat=NULL + * current error status in ctx + */ +typedef int edg_wll_QueryJobs_cb(edg_wll_Context ctx, glite_jobid_t jobid, edg_wll_JobStat *stat, void *); + #if 0 /* rel 1 */ char *edg_wll_jobid_to_user( edg_wll_Context, char *); void edg_wll_set_event_field_warn( edg_wll_Event *, char *, char *); @@ -44,6 +53,8 @@ int edg_wll_QueryEventsServer(edg_wll_Context,int,const edg_wll_QueryRec **,cons int edg_wll_QueryJobsServer(edg_wll_Context, const edg_wll_QueryRec **, int, edg_wlc_JobId **, edg_wll_JobStat **); +int edg_wll_QueryJobsServerStream(edg_wll_Context, const edg_wll_QueryRec **, int, edg_wll_QueryJobs_cb *, void *); + void edg_wll_SortEvents(edg_wll_Event *); void edg_wll_SortPEvents(edg_wll_Event **); diff --git a/org.glite.lb.server/src/query.c b/org.glite.lb.server/src/query.c index 8664b24..132e9f4 100644 --- a/org.glite.lb.server/src/query.c +++ b/org.glite.lb.server/src/query.c @@ -344,6 +344,41 @@ int jobid_only_query(const edg_wll_QueryRec **conditions) { return not_jobid_only; } + +typedef struct { + glite_jobid_t *jobs; + edg_wll_JobStat *states; + size_t maxn, n; +} queryjobs_cb_data_t; + + +static int queryjobs_cb(edg_wll_Context ctx, glite_jobid_t jobid, edg_wll_JobStat *status, void *store_gen) { + queryjobs_cb_data_t *store = (queryjobs_cb_data_t *)store_gen; + size_t n = store->n; + size_t maxn = store->maxn; + void *tmp; + + if (n >= maxn) { + maxn = maxn ? maxn << 1 : 256; + if ((tmp = realloc(store->jobs, maxn * sizeof(*store->jobs))) == NULL) + return edg_wll_SetError(ctx, errno ? : ENOMEM, NULL); + store->jobs = tmp; + + if ((tmp = realloc(store->states, maxn * sizeof(*store->states))) == NULL) + return edg_wll_SetError(ctx, errno ? : ENOMEM, NULL); + store->states = tmp; + + store->maxn = maxn; + } + store->jobs[n] = jobid; + if (status) store->states[n] = *status; + else memset(&store->states[n], 0, sizeof(*store->states)); + store->n++; + + return 0; +} + + int edg_wll_QueryJobsServer( edg_wll_Context ctx, const edg_wll_QueryRec **conditions, @@ -351,6 +386,44 @@ int edg_wll_QueryJobsServer( edg_wlc_JobId **jobs, edg_wll_JobStat **states) { + queryjobs_cb_data_t store; + size_t i; + + memset(&store, 0, sizeof store); + if (edg_wll_QueryJobsServerStream(ctx, conditions, flags, queryjobs_cb, &store)) + goto cleanup; + + if (jobs) { + *jobs = store.jobs; + store.jobs = NULL; + } + if (states) { + *states = store.states; + store.states = NULL; + } + +cleanup: + if (store.jobs) { + for ( i = 0; i < store.n; i++ ) + glite_jobid_free(store.jobs[i]); + free(store.jobs); + } + + if (store.states) { + for (i=0; i < store.n; i++) edg_wll_FreeStatus(store.states+i); + free(store.states); + } + + return edg_wll_Error(ctx,NULL,NULL); +} + + +int edg_wll_QueryJobsServerStream( + edg_wll_Context ctx, + const edg_wll_QueryRec **conditions, + int flags, + edg_wll_QueryJobs_cb *cb, void *data) +{ char *job_where = NULL, *state_where = NULL, *tags_where = NULL, @@ -361,8 +434,10 @@ int edg_wll_QueryJobsServer( *dbjob, *zomb_where = NULL, *zomb_where_temp = NULL; - edg_wlc_JobId *jobs_out = NULL; - edg_wll_JobStat *states_out = NULL; + + glite_jobid_t jobid = NULL; + edg_wll_JobStat status; + glite_lbu_Statement sh; int i = 0, j = 0, @@ -372,8 +447,11 @@ int edg_wll_QueryJobsServer( limit_loop = 1, where_flags = 0, first_or; + size_t n = 0; + memset(res,0,sizeof res); + memset(&status, 0, sizeof status); edg_wll_ResetError(ctx); if ( !conditions ) @@ -426,9 +504,7 @@ int edg_wll_QueryJobsServer( else limit = 0; - jobs_out = calloc(1, sizeof(*jobs_out)); - states_out = calloc(1, sizeof(*states_out)); - i = 0; + n = 0; do { if ( limit ) @@ -457,19 +533,19 @@ int edg_wll_QueryJobsServer( offset += ret; while ( (ret=edg_wll_FetchRow(ctx,sh,sizofa(res),NULL,res)) > 0 ) { - if ( (ret = edg_wlc_JobIdParse(res[0], jobs_out+i)) ) + if ( (ret = glite_jobid_parse(res[0], &jobid)) ) { /* unlikely to happen, internal inconsistency */ char buf[200]; snprintf(buf,sizeof buf,"JobIdParse(%s)",res[0]); edg_wll_SetError(ctx,ret,buf); free(res[0]); free(res[1]); free(res[2]); - jobs_out[i] = NULL; + jobid = NULL; goto cleanup; } - if ( check_strict_jobid(ctx, jobs_out[i]) ) + if ( check_strict_jobid(ctx, jobid) ) { - edg_wlc_JobIdFree(jobs_out[i]); + glite_jobid_free(jobid); edg_wll_ResetError(ctx); /* check_strict_jobid() sets it */ goto fetch_cycle_cleanup; } @@ -477,19 +553,19 @@ int edg_wll_QueryJobsServer( // if some condition hits unindexed column or states of matching jobs wanted if ((where_flags & FL_FILTER) || !(flags & EDG_WLL_STAT_NO_STATES)) { - if ( edg_wll_JobStatusServer(ctx, jobs_out[i], (where_flags & FL_SEL_JDL)?(flags|EDG_WLL_STAT_CLASSADS):flags, &states_out[i]) ) + if ( edg_wll_JobStatusServer(ctx, jobid, (where_flags & FL_SEL_JDL)?(flags|EDG_WLL_STAT_CLASSADS):flags, &status) ) { - edg_wlc_JobIdFree(jobs_out[i]); + glite_jobid_free(jobid); if (edg_wll_Error(ctx,NULL,NULL) == EPERM) eperm = 1; goto fetch_cycle_cleanup; } } if (where_flags & FL_FILTER) { - if ( !match_status(ctx, NULL, states_out+i, conditions) ) + if ( !match_status(ctx, NULL, &status, conditions) ) { - edg_wlc_JobIdFree(jobs_out[i]); - edg_wll_FreeStatus(states_out+i); + glite_jobid_free(jobid); + edg_wll_FreeStatus(&status); edg_wll_ResetError(ctx); /* check_strict_jobid() sets it */ goto fetch_cycle_cleanup; } @@ -499,18 +575,20 @@ int edg_wll_QueryJobsServer( if ( !ctx->noAuth && (!ctx->peerName || strcmp(res[1], strmd5(ctx->peerName, NULL))) ) { eperm = 1; - edg_wlc_JobIdFree(jobs_out[i]); - edg_wll_FreeStatus(states_out+i); + glite_jobid_free(jobid); + edg_wll_FreeStatus(&status); goto fetch_cycle_cleanup; } #endif - if ( (ctx->p_query_results != EDG_WLL_QUERYRES_ALL) && limit && (i+1 > limit) ) + if ( (ctx->p_query_results != EDG_WLL_QUERYRES_ALL) && limit && (n+1 > limit) ) { edg_wll_SetError(ctx, E2BIG, "Query result size limit exceeded"); free(res[0]); free(res[1]); free(res[2]); - memset(states_out+i, 0, sizeof(*states_out)); - jobs_out[i] = NULL; + edg_wll_FreeStatus(&status); + glite_jobid_free(jobid); + memset(&status, 0, sizeof status); + jobid = NULL; if ( ctx->p_query_results == EDG_WLL_QUERYRES_LIMITED ) { limit_loop = 0; @@ -519,20 +597,21 @@ int edg_wll_QueryJobsServer( goto cleanup; } - i++; - jobs_out = (edg_wlc_JobId *) realloc(jobs_out, (i+1) * sizeof(*jobs_out)); - states_out = (edg_wll_JobStat *) realloc(states_out, (i+1) * sizeof(*states_out)); + if (cb(ctx, jobid, &status, data) != 0) + goto cleanup; + + n++; fetch_cycle_cleanup: free(res[0]); free(res[1]); free(res[2]); - memset(states_out+i, 0, sizeof(*states_out)); - jobs_out[i] = NULL; + memset(&status, 0, sizeof status); + jobid = NULL; } limit_cycle_cleanup: glite_lbu_FreeStmt(&sh); } while ( limit_loop ); - if ( !*jobs_out ) { + if ( !n ) { if(!jobid_only_query(conditions)) { i = 0; while(conditions[i]) { @@ -575,61 +654,36 @@ limit_cycle_cleanup: j = edg_wll_ExecSQL(ctx,zquery,&sh); if (j > 0) { - jobs_out = (edg_wlc_JobId *) calloc(j+1, sizeof(*jobs_out)); - states_out = (edg_wll_JobStat *) calloc(j+1, sizeof(*states_out)); - - i = 0; + n = 0; while ( (ret=edg_wll_FetchRow(ctx,sh,sizofa(res),NULL,res)) > 0 ) { - edg_wlc_JobIdParse(res[0], jobs_out+i); - edg_wlc_JobIdParse(res[0], &(states_out[i].jobId)); - states_out[i].state = EDG_WLL_JOB_PURGED; + edg_wlc_JobIdParse(res[0], &jobid); + edg_wlc_JobIdParse(res[0], &status.jobId); + status.state = EDG_WLL_JOB_PURGED; free(res[0]); free(res[1]); free(res[2]); - - i++; + if (cb(ctx, jobid, &status, data) != 0) + goto cleanup; + n++; } } glite_lbu_FreeStmt(&sh); } } - if ( !*jobs_out ) { + if ( !n ) { if (eperm) edg_wll_SetError(ctx, EPERM, "matching jobs found but authorization failed"); else edg_wll_SetError(ctx, ENOENT, "no matching jobs found"); } - if ( i && (ret == 0) ) - { - if ( states ) - { - *states = states_out; - states_out = NULL; - } - if ( jobs ) - { - *jobs = jobs_out; - jobs_out = NULL; - } - } - + // finish + cb(ctx, NULL, NULL, data); cleanup: free(qbase); free(state_where); free(tags_where); free(job_where); - - if (jobs_out) - { - for ( i = 0; jobs_out[i]; i++ ) - edg_wlc_JobIdFree(jobs_out[i]); - free(jobs_out); - } - - if (states_out) - { - for (i=0; states_out[i].state; i++) edg_wll_FreeStatus(states_out+i); - free(states_out); - } + if (jobid) glite_jobid_free(jobid); + if (status.state) edg_wll_FreeStatus(&status); return edg_wll_Error(ctx,NULL,NULL); }