return not_jobid_only;
}
+
+typedef struct {
+ glite_jobid_t *jobs;
+ edg_wll_JobStat *states;
+ size_t maxn, n;
+} queryjobs_cb_data_t;
+
+
+static int queryjobs_cb(edg_wll_Context ctx, glite_jobid_t jobid, edg_wll_JobStat *status, void *store_gen) {
+ queryjobs_cb_data_t *store = (queryjobs_cb_data_t *)store_gen;
+ size_t n = store->n;
+ size_t maxn = store->maxn;
+ void *tmp;
+
+ if (n >= maxn) {
+ maxn = maxn ? maxn << 1 : 256;
+ if ((tmp = realloc(store->jobs, maxn * sizeof(*store->jobs))) == NULL)
+ return edg_wll_SetError(ctx, errno ? : ENOMEM, NULL);
+ store->jobs = tmp;
+
+ if ((tmp = realloc(store->states, maxn * sizeof(*store->states))) == NULL)
+ return edg_wll_SetError(ctx, errno ? : ENOMEM, NULL);
+ store->states = tmp;
+
+ store->maxn = maxn;
+ }
+ store->jobs[n] = jobid;
+ if (status) store->states[n] = *status;
+ else memset(&store->states[n], 0, sizeof(*store->states));
+ store->n++;
+
+ return 0;
+}
+
+
int edg_wll_QueryJobsServer(
edg_wll_Context ctx,
const edg_wll_QueryRec **conditions,
edg_wlc_JobId **jobs,
edg_wll_JobStat **states)
{
+ queryjobs_cb_data_t store;
+ size_t i;
+
+ memset(&store, 0, sizeof store);
+ if (edg_wll_QueryJobsServerStream(ctx, conditions, flags, queryjobs_cb, &store))
+ goto cleanup;
+
+ if (jobs) {
+ *jobs = store.jobs;
+ store.jobs = NULL;
+ }
+ if (states) {
+ *states = store.states;
+ store.states = NULL;
+ }
+
+cleanup:
+ if (store.jobs) {
+ for ( i = 0; i < store.n; i++ )
+ glite_jobid_free(store.jobs[i]);
+ free(store.jobs);
+ }
+
+ if (store.states) {
+ for (i=0; i < store.n; i++) edg_wll_FreeStatus(store.states+i);
+ free(store.states);
+ }
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
+int edg_wll_QueryJobsServerStream(
+ edg_wll_Context ctx,
+ const edg_wll_QueryRec **conditions,
+ int flags,
+ edg_wll_QueryJobs_cb *cb, void *data)
+{
char *job_where = NULL,
*state_where = NULL,
*tags_where = NULL,
*dbjob,
*zomb_where = NULL,
*zomb_where_temp = NULL;
- edg_wlc_JobId *jobs_out = NULL;
- edg_wll_JobStat *states_out = NULL;
+
+ glite_jobid_t jobid = NULL;
+ edg_wll_JobStat status;
+
glite_lbu_Statement sh;
int i = 0,
j = 0,
limit_loop = 1,
where_flags = 0,
first_or;
+ size_t n = 0;
+
memset(res,0,sizeof res);
+ memset(&status, 0, sizeof status);
edg_wll_ResetError(ctx);
if ( !conditions )
else
limit = 0;
- jobs_out = calloc(1, sizeof(*jobs_out));
- states_out = calloc(1, sizeof(*states_out));
- i = 0;
+ n = 0;
do
{
if ( limit )
offset += ret;
while ( (ret=edg_wll_FetchRow(ctx,sh,sizofa(res),NULL,res)) > 0 )
{
- if ( (ret = edg_wlc_JobIdParse(res[0], jobs_out+i)) )
+ if ( (ret = glite_jobid_parse(res[0], &jobid)) )
{ /* unlikely to happen, internal inconsistency */
char buf[200];
snprintf(buf,sizeof buf,"JobIdParse(%s)",res[0]);
edg_wll_SetError(ctx,ret,buf);
free(res[0]); free(res[1]); free(res[2]);
- jobs_out[i] = NULL;
+ jobid = NULL;
goto cleanup;
}
- if ( check_strict_jobid(ctx, jobs_out[i]) )
+ if ( check_strict_jobid(ctx, jobid) )
{
- edg_wlc_JobIdFree(jobs_out[i]);
+ glite_jobid_free(jobid);
edg_wll_ResetError(ctx); /* check_strict_jobid() sets it */
goto fetch_cycle_cleanup;
}
// if some condition hits unindexed column or states of matching jobs wanted
if ((where_flags & FL_FILTER) || !(flags & EDG_WLL_STAT_NO_STATES)) {
- if ( edg_wll_JobStatusServer(ctx, jobs_out[i], (where_flags & FL_SEL_JDL)?(flags|EDG_WLL_STAT_CLASSADS):flags, &states_out[i]) )
+ if ( edg_wll_JobStatusServer(ctx, jobid, (where_flags & FL_SEL_JDL)?(flags|EDG_WLL_STAT_CLASSADS):flags, &status) )
{
- edg_wlc_JobIdFree(jobs_out[i]);
+ glite_jobid_free(jobid);
if (edg_wll_Error(ctx,NULL,NULL) == EPERM) eperm = 1;
goto fetch_cycle_cleanup;
}
}
if (where_flags & FL_FILTER) {
- if ( !match_status(ctx, NULL, states_out+i, conditions) )
+ if ( !match_status(ctx, NULL, &status, conditions) )
{
- edg_wlc_JobIdFree(jobs_out[i]);
- edg_wll_FreeStatus(states_out+i);
+ glite_jobid_free(jobid);
+ edg_wll_FreeStatus(&status);
edg_wll_ResetError(ctx); /* check_strict_jobid() sets it */
goto fetch_cycle_cleanup;
}
if ( !ctx->noAuth && (!ctx->peerName || strcmp(res[1], strmd5(ctx->peerName, NULL))) )
{
eperm = 1;
- edg_wlc_JobIdFree(jobs_out[i]);
- edg_wll_FreeStatus(states_out+i);
+ glite_jobid_free(jobid);
+ edg_wll_FreeStatus(&status);
goto fetch_cycle_cleanup;
}
#endif
- if ( (ctx->p_query_results != EDG_WLL_QUERYRES_ALL) && limit && (i+1 > limit) )
+ if ( (ctx->p_query_results != EDG_WLL_QUERYRES_ALL) && limit && (n+1 > limit) )
{
edg_wll_SetError(ctx, E2BIG, "Query result size limit exceeded");
free(res[0]); free(res[1]); free(res[2]);
- memset(states_out+i, 0, sizeof(*states_out));
- jobs_out[i] = NULL;
+ edg_wll_FreeStatus(&status);
+ glite_jobid_free(jobid);
+ memset(&status, 0, sizeof status);
+ jobid = NULL;
if ( ctx->p_query_results == EDG_WLL_QUERYRES_LIMITED )
{
limit_loop = 0;
goto cleanup;
}
- i++;
- jobs_out = (edg_wlc_JobId *) realloc(jobs_out, (i+1) * sizeof(*jobs_out));
- states_out = (edg_wll_JobStat *) realloc(states_out, (i+1) * sizeof(*states_out));
+ if (cb(ctx, jobid, &status, data) != 0)
+ goto cleanup;
+
+ n++;
fetch_cycle_cleanup:
free(res[0]); free(res[1]); free(res[2]);
- memset(states_out+i, 0, sizeof(*states_out));
- jobs_out[i] = NULL;
+ memset(&status, 0, sizeof status);
+ jobid = NULL;
}
limit_cycle_cleanup:
glite_lbu_FreeStmt(&sh);
} while ( limit_loop );
- if ( !*jobs_out ) {
+ if ( !n ) {
if(!jobid_only_query(conditions)) {
i = 0;
while(conditions[i]) {
j = edg_wll_ExecSQL(ctx,zquery,&sh);
if (j > 0) {
- jobs_out = (edg_wlc_JobId *) calloc(j+1, sizeof(*jobs_out));
- states_out = (edg_wll_JobStat *) calloc(j+1, sizeof(*states_out));
-
- i = 0;
+ n = 0;
while ( (ret=edg_wll_FetchRow(ctx,sh,sizofa(res),NULL,res)) > 0 ) {
- edg_wlc_JobIdParse(res[0], jobs_out+i);
- edg_wlc_JobIdParse(res[0], &(states_out[i].jobId));
- states_out[i].state = EDG_WLL_JOB_PURGED;
+ edg_wlc_JobIdParse(res[0], &jobid);
+ edg_wlc_JobIdParse(res[0], &status.jobId);
+ status.state = EDG_WLL_JOB_PURGED;
free(res[0]); free(res[1]); free(res[2]);
-
- i++;
+ if (cb(ctx, jobid, &status, data) != 0)
+ goto cleanup;
+ n++;
}
}
glite_lbu_FreeStmt(&sh);
}
}
- if ( !*jobs_out ) {
+ if ( !n ) {
if (eperm) edg_wll_SetError(ctx, EPERM, "matching jobs found but authorization failed");
else edg_wll_SetError(ctx, ENOENT, "no matching jobs found");
}
- if ( i && (ret == 0) )
- {
- if ( states )
- {
- *states = states_out;
- states_out = NULL;
- }
- if ( jobs )
- {
- *jobs = jobs_out;
- jobs_out = NULL;
- }
- }
-
+ // finish
+ cb(ctx, NULL, NULL, data);
cleanup:
free(qbase);
free(state_where);
free(tags_where);
free(job_where);
-
- if (jobs_out)
- {
- for ( i = 0; jobs_out[i]; i++ )
- edg_wlc_JobIdFree(jobs_out[i]);
- free(jobs_out);
- }
-
- if (states_out)
- {
- for (i=0; states_out[i].state; i++) edg_wll_FreeStatus(states_out+i);
- free(states_out);
- }
+ if (jobid) glite_jobid_free(jobid);
+ if (status.state) edg_wll_FreeStatus(&status);
return edg_wll_Error(ctx,NULL,NULL);
}