Prepare for streaming approach in QueryJobs.
authorFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 18 Nov 2010 16:15:13 +0000 (16:15 +0000)
committerFrantišek Dvořák <valtri@civ.zcu.cz>
Thu, 18 Nov 2010 16:15:13 +0000 (16:15 +0000)
org.glite.lb.server/src/get_events.h
org.glite.lb.server/src/query.c

index 024b3da..951b7a4 100644 (file)
@@ -27,6 +27,15 @@ limitations under the License.
 extern "C" {
 #endif
 
+/* queryjobs callback
+ *
+ * called for each found jobs, should free the structures if unused
+ * 
+ * after last job: called with jobid=NULL, stat=NULL
+ *                 current error status in ctx
+ */
+typedef int edg_wll_QueryJobs_cb(edg_wll_Context ctx, glite_jobid_t jobid, edg_wll_JobStat *stat, void *);
+
 #if 0  /* rel 1 */
 char *edg_wll_jobid_to_user( edg_wll_Context, char *);
 void edg_wll_set_event_field_warn( edg_wll_Event *, char *, char *);
@@ -44,6 +53,8 @@ int edg_wll_QueryEventsServer(edg_wll_Context,int,const edg_wll_QueryRec **,cons
 
 int edg_wll_QueryJobsServer(edg_wll_Context, const edg_wll_QueryRec **, int, edg_wlc_JobId **, edg_wll_JobStat **);
 
+int edg_wll_QueryJobsServerStream(edg_wll_Context, const edg_wll_QueryRec **, int, edg_wll_QueryJobs_cb *, void *);
+
 void edg_wll_SortEvents(edg_wll_Event *);
 
 void edg_wll_SortPEvents(edg_wll_Event **);
index 8664b24..132e9f4 100644 (file)
@@ -344,6 +344,41 @@ int jobid_only_query(const edg_wll_QueryRec **conditions) {
        return not_jobid_only;
 }
 
+
+typedef struct {
+       glite_jobid_t *jobs;
+       edg_wll_JobStat *states;
+       size_t maxn, n;
+} queryjobs_cb_data_t;
+
+
+static int queryjobs_cb(edg_wll_Context ctx, glite_jobid_t jobid, edg_wll_JobStat *status, void *store_gen) {
+       queryjobs_cb_data_t *store = (queryjobs_cb_data_t *)store_gen;
+       size_t n = store->n;
+       size_t maxn = store->maxn;
+       void *tmp;
+
+       if (n >= maxn) {
+               maxn = maxn ? maxn << 1 : 256;
+               if ((tmp = realloc(store->jobs, maxn * sizeof(*store->jobs))) == NULL)
+                       return edg_wll_SetError(ctx, errno ? : ENOMEM, NULL);
+               store->jobs = tmp;
+
+               if ((tmp = realloc(store->states, maxn * sizeof(*store->states))) == NULL)
+                       return edg_wll_SetError(ctx, errno ? : ENOMEM, NULL);
+               store->states = tmp;
+
+               store->maxn = maxn;
+       }
+       store->jobs[n] = jobid;
+       if (status) store->states[n] = *status;
+       else memset(&store->states[n], 0, sizeof(*store->states));
+       store->n++;
+
+       return 0;
+}
+
+
 int edg_wll_QueryJobsServer(
                edg_wll_Context ctx,
                const edg_wll_QueryRec **conditions,
@@ -351,6 +386,44 @@ int edg_wll_QueryJobsServer(
                edg_wlc_JobId **jobs,
                edg_wll_JobStat **states)
 {
+       queryjobs_cb_data_t store;
+       size_t i;
+
+       memset(&store, 0, sizeof store);
+       if (edg_wll_QueryJobsServerStream(ctx, conditions, flags, queryjobs_cb, &store))
+               goto cleanup;
+
+       if (jobs) {
+               *jobs = store.jobs;
+               store.jobs = NULL;
+       }
+       if (states) {
+               *states = store.states;
+               store.states = NULL;
+       }
+
+cleanup:
+       if (store.jobs) {
+               for ( i = 0; i < store.n; i++ )
+                       glite_jobid_free(store.jobs[i]);
+               free(store.jobs);
+       }
+
+       if (store.states) {
+               for (i=0; i < store.n; i++) edg_wll_FreeStatus(store.states+i);
+               free(store.states);
+       }
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
+int edg_wll_QueryJobsServerStream(
+               edg_wll_Context ctx,
+               const edg_wll_QueryRec **conditions,
+               int     flags,
+               edg_wll_QueryJobs_cb *cb, void *data)
+{
        char                       *job_where = NULL,
                                           *state_where = NULL,
                                           *tags_where = NULL,
@@ -361,8 +434,10 @@ int edg_wll_QueryJobsServer(
                                           *dbjob,
                                           *zomb_where = NULL,
                                           *zomb_where_temp = NULL;
-       edg_wlc_JobId      *jobs_out = NULL;
-       edg_wll_JobStat    *states_out = NULL;
+
+       glite_jobid_t                   jobid = NULL;
+       edg_wll_JobStat                 status;
+
        glite_lbu_Statement             sh;
        int                                     i = 0,
                                                j = 0,
@@ -372,8 +447,11 @@ int edg_wll_QueryJobsServer(
                                                limit_loop = 1,
                                                where_flags = 0,
                                                first_or;
+       size_t                          n = 0;
+
 
        memset(res,0,sizeof res);
+       memset(&status, 0, sizeof status);
        edg_wll_ResetError(ctx);
 
        if ( !conditions )
@@ -426,9 +504,7 @@ int edg_wll_QueryJobsServer(
        else
                limit = 0;
 
-       jobs_out        = calloc(1, sizeof(*jobs_out));
-       states_out      = calloc(1, sizeof(*states_out));
-       i = 0;
+       n = 0;
        do
        {
                if ( limit )
@@ -457,19 +533,19 @@ int edg_wll_QueryJobsServer(
                offset += ret;
                while ( (ret=edg_wll_FetchRow(ctx,sh,sizofa(res),NULL,res)) > 0 )
                {
-                       if ( (ret = edg_wlc_JobIdParse(res[0], jobs_out+i)) )
+                       if ( (ret = glite_jobid_parse(res[0], &jobid)) )
                        {       /* unlikely to happen, internal inconsistency */
                                char    buf[200];
                                snprintf(buf,sizeof buf,"JobIdParse(%s)",res[0]);
                                edg_wll_SetError(ctx,ret,buf);
                                free(res[0]); free(res[1]); free(res[2]);
-                               jobs_out[i]     = NULL;
+                               jobid = NULL;
                                goto cleanup;
                        }
 
-                       if ( check_strict_jobid(ctx, jobs_out[i]) )
+                       if ( check_strict_jobid(ctx, jobid) )
                        {
-                               edg_wlc_JobIdFree(jobs_out[i]);
+                               glite_jobid_free(jobid);
                                edg_wll_ResetError(ctx);        /* check_strict_jobid() sets it */
                                goto fetch_cycle_cleanup;
                        }
@@ -477,19 +553,19 @@ int edg_wll_QueryJobsServer(
                        // if some condition hits unindexed column or states of matching jobs wanted
 
                        if ((where_flags & FL_FILTER) || !(flags & EDG_WLL_STAT_NO_STATES)) {
-                               if ( edg_wll_JobStatusServer(ctx, jobs_out[i], (where_flags & FL_SEL_JDL)?(flags|EDG_WLL_STAT_CLASSADS):flags, &states_out[i]) )
+                               if ( edg_wll_JobStatusServer(ctx, jobid, (where_flags & FL_SEL_JDL)?(flags|EDG_WLL_STAT_CLASSADS):flags, &status) )
                                {
-                                       edg_wlc_JobIdFree(jobs_out[i]);
+                                       glite_jobid_free(jobid);
                                        if (edg_wll_Error(ctx,NULL,NULL) == EPERM) eperm = 1;
                                        goto fetch_cycle_cleanup;
                                }
 
                        }
                        if (where_flags & FL_FILTER) {
-                               if ( !match_status(ctx, NULL, states_out+i, conditions) )
+                               if ( !match_status(ctx, NULL, &status, conditions) )
                                {
-                                       edg_wlc_JobIdFree(jobs_out[i]);
-                                       edg_wll_FreeStatus(states_out+i);
+                                       glite_jobid_free(jobid);
+                                       edg_wll_FreeStatus(&status);
                                        edg_wll_ResetError(ctx);        /* check_strict_jobid() sets it */
                                        goto fetch_cycle_cleanup;
                                }
@@ -499,18 +575,20 @@ int edg_wll_QueryJobsServer(
                        if ( !ctx->noAuth && (!ctx->peerName || strcmp(res[1], strmd5(ctx->peerName, NULL))) )
                        {
                                eperm = 1;
-                               edg_wlc_JobIdFree(jobs_out[i]);
-                               edg_wll_FreeStatus(states_out+i);
+                               glite_jobid_free(jobid);
+                               edg_wll_FreeStatus(&status);
                                goto fetch_cycle_cleanup;
                        }
 #endif
 
-                       if ( (ctx->p_query_results != EDG_WLL_QUERYRES_ALL) && limit && (i+1 > limit) )
+                       if ( (ctx->p_query_results != EDG_WLL_QUERYRES_ALL) && limit && (n+1 > limit) )
                        {
                                edg_wll_SetError(ctx, E2BIG, "Query result size limit exceeded");
                                free(res[0]); free(res[1]); free(res[2]);
-                               memset(states_out+i, 0, sizeof(*states_out));
-                               jobs_out[i]     = NULL;
+                               edg_wll_FreeStatus(&status);
+                               glite_jobid_free(jobid);
+                               memset(&status, 0, sizeof status);
+                               jobid   = NULL;
                                if ( ctx->p_query_results == EDG_WLL_QUERYRES_LIMITED )
                                {
                                        limit_loop = 0;
@@ -519,20 +597,21 @@ int edg_wll_QueryJobsServer(
                                goto cleanup;
                        }
 
-                       i++;
-                       jobs_out        = (edg_wlc_JobId *) realloc(jobs_out, (i+1) * sizeof(*jobs_out));
-                       states_out      = (edg_wll_JobStat *) realloc(states_out, (i+1) * sizeof(*states_out));
+                       if (cb(ctx, jobid, &status, data) != 0)
+                               goto cleanup;
+
+                       n++;
 
 fetch_cycle_cleanup:
                        free(res[0]); free(res[1]); free(res[2]);
-                       memset(states_out+i, 0, sizeof(*states_out));
-                       jobs_out[i]     = NULL;
+                       memset(&status, 0, sizeof status);
+                       jobid   = NULL;
                }
 limit_cycle_cleanup:
                glite_lbu_FreeStmt(&sh);
        } while ( limit_loop );
 
-       if ( !*jobs_out ) {
+       if ( !n ) {
                if(!jobid_only_query(conditions)) {
                        i = 0;
                        while(conditions[i]) {
@@ -575,61 +654,36 @@ limit_cycle_cleanup:
                        j = edg_wll_ExecSQL(ctx,zquery,&sh);
 
                        if (j > 0) {
-                               jobs_out        = (edg_wlc_JobId *) calloc(j+1, sizeof(*jobs_out));
-                               states_out      = (edg_wll_JobStat *) calloc(j+1, sizeof(*states_out));
-
-                               i = 0; 
+                               n = 0;
                                while ( (ret=edg_wll_FetchRow(ctx,sh,sizofa(res),NULL,res)) > 0 ) {
-                                       edg_wlc_JobIdParse(res[0], jobs_out+i);
-                                       edg_wlc_JobIdParse(res[0], &(states_out[i].jobId));
-                                       states_out[i].state = EDG_WLL_JOB_PURGED;
+                                       edg_wlc_JobIdParse(res[0], &jobid);
+                                       edg_wlc_JobIdParse(res[0], &status.jobId);
+                                       status.state = EDG_WLL_JOB_PURGED;
                                        free(res[0]); free(res[1]); free(res[2]);
-
-                                       i++;
+                                       if (cb(ctx, jobid, &status, data) != 0)
+                                               goto cleanup;
+                                       n++;
                                }
                        }
                        glite_lbu_FreeStmt(&sh);
                }
        }
 
-       if ( !*jobs_out ) {
+       if ( !n ) {
                if (eperm) edg_wll_SetError(ctx, EPERM, "matching jobs found but authorization failed");
                else edg_wll_SetError(ctx, ENOENT, "no matching jobs found");
        }
 
-       if ( i && (ret == 0) )
-       {
-               if ( states )
-               {
-                       *states = states_out;
-                       states_out = NULL;
-               }
-               if ( jobs )
-               {
-                       *jobs = jobs_out;
-                       jobs_out = NULL;
-               }
-       }
-
+       // finish
+       cb(ctx, NULL, NULL, data);
 
 cleanup:
        free(qbase);
        free(state_where);
        free(tags_where);
        free(job_where);
-
-       if (jobs_out)
-       {
-               for ( i = 0; jobs_out[i]; i++ )
-                       edg_wlc_JobIdFree(jobs_out[i]);
-               free(jobs_out);
-       }
-
-       if (states_out)
-       {
-               for (i=0; states_out[i].state; i++) edg_wll_FreeStatus(states_out+i);
-               free(states_out);
-       }
+       if (jobid) glite_jobid_free(jobid);
+       if (status.state) edg_wll_FreeStatus(&status);
 
        return edg_wll_Error(ctx,NULL,NULL);
 }