static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int);
int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job);
static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job);
+static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime);
int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname)
edg_wlc_JobId job;
char *tmpfname = NULL;
int naffected_jobs = 0, ret;
+ double now, time_per_job, purge_end;
+ struct timeval tp;
+ int jobs_to_exa;
+ time_t target_runtime;
if (!ctx->noAuth) {
}
*/
- if (request->jobs) for (i=0; request->jobs[i]; i++) {
+ /* throttle parameters */
+ gettimeofday(&tp, NULL);
+ now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
+ purge_end = now + request->target_runtime;
+ target_runtime = request->target_runtime;
+ time_per_job = -1.0;
+
+ if (request->jobs) {
+
+ for (jobs_to_exa=0; request->jobs[jobs_to_exa]; jobs_to_exa++);
+ for (i=0; request->jobs[i]; i++) {
if (edg_wlc_JobIdParse(request->jobs[i],&job)) {
fprintf(stderr,"%s: parse error\n",request->jobs[i]);
parse = 1;
parse = 1;
}
else {
+ purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
+
switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,ctx->isProxy)) {
case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
result->jobs = realloc(result->jobs,(naffected_jobs+2) * sizeof(*(result->jobs)));
}
edg_wlc_JobIdFree(job);
}
+ jobs_to_exa--;
+ }
}
else {
glite_lbu_Statement s;
char *job_s;
- int jobs_to_exa;
time_t timeout[EDG_WLL_NUMBER_OF_STATCODES],
start = time(NULL);
- double now, time_per_job, target_this, purge_end;
- struct timeval tp;
for (i=0; i<EDG_WLL_NUMBER_OF_STATCODES; i++)
timeout[i] = request->timeout[i] < 0 ?
if ((jobs_to_exa = edg_wll_ExecSQL(ctx, (ctx->isProxy) ? "select dg_jobid from jobs where proxy='1'" :
"select dg_jobid from jobs where server='1'", &s)) < 0) goto abort;
-
- gettimeofday(&tp, NULL);
- now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
- purge_end = now + request->target_runtime;
- time_per_job = jobs_to_exa ? (double)request->target_runtime / jobs_to_exa : 0.0;
- //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n",
- // getpid(), request->target_runtime, purge_end, jobs_to_exa, time_per_job);
while (edg_wll_FetchRow(ctx,s,1,NULL,&job_s) > 0) {
if (edg_wlc_JobIdParse(job_s,&job)) {
continue;
}
- /* throttle purging according to the required target_runtime */
- if (request->target_runtime) {
- target_this = purge_end - time_per_job * jobs_to_exa;
- gettimeofday(&tp, NULL);
- now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
- if (target_this > now) { /* enough time */
- //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now);
- usleep(1e6*(target_this - now));
- }
- if (target_this < now) { /* speed up */
- time_per_job = (purge_end-now)/jobs_to_exa;
- }
- //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), time_per_job);
- }
+ purge_throttle(jobs_to_exa, purge_end, &time_per_job, &target_runtime);
memset(&stat,0,sizeof stat);
if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */
{
char *dbjob = NULL;
char *stmt = NULL;
- glite_lbu_Statement q;
+ glite_lbu_Statement q = NULL;
int ret,dumped = 0;
char *res[10];
char *prefix = NULL, *suffix = NULL, *root = NULL;
- char *prefix_id, *suffix_id;
+ char *prefix_id = NULL, *suffix_id = NULL;
int sql_retval;
edg_wll_ResetError(ctx);
return(edg_wll_ExecSQL(ctx,stmt,NULL));
}
+
+/* throttle purging according to the required target_runtime */
+static void purge_throttle(int jobs_to_exa, double purge_end, double *time_per_job, time_t *target_runtime) {
+ struct timeval tp;
+ double target_this, now;
+
+
+ if (*time_per_job < 0.0) {
+ *time_per_job = jobs_to_exa ? (double)*target_runtime / jobs_to_exa : 0.0;
+ //fprintf(stderr, "[%d] target runtime: %ld, end: %lf, jobs: %d, tpj: %lf\n",
+ // getpid(), *target_runtime, purge_end, jobs_to_exa, *time_per_job);
+ }
+
+ if (*target_runtime) {
+ target_this = purge_end - *time_per_job * jobs_to_exa;
+ gettimeofday(&tp, NULL);
+ now = tp.tv_sec + (double)tp.tv_usec / 1000000.0;
+ if (target_this > now) { /* enough time */
+ //fprintf(stderr, "[%d] sleeping for %lf second...\n", getpid(), target_this - now);
+ usleep(1e6*(target_this - now));
+ }
+ if (target_this < now) { /* speed up */
+ *time_per_job = (purge_end-now)/jobs_to_exa;
+ /* limit not catched, maximal speed */
+ if (*time_per_job <= 0) {
+ *time_per_job = 0.0;
+ *target_runtime = 0;
+ }
+ }
+ //fprintf(stderr, "[%d] tpj: %lf\n", getpid(), *time_per_job);
+ }
+
+}