From b51067f32de36b8af358f27c39e6db8c5444bd1e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Thu, 29 Nov 2007 14:33:59 +0000 Subject: [PATCH] purge jobs or they flags from DB --- org.glite.lb.server/src/db_calls.c | 7 +--- org.glite.lb.server/src/db_store.c | 24 +++++++++++ org.glite.lb.server/src/srv_purge.c | 84 +++++++++++++++++++++++++++---------- 3 files changed, 87 insertions(+), 28 deletions(-) diff --git a/org.glite.lb.server/src/db_calls.c b/org.glite.lb.server/src/db_calls.c index 941abdb..203ac2b 100644 --- a/org.glite.lb.server/src/db_calls.c +++ b/org.glite.lb.server/src/db_calls.c @@ -42,11 +42,8 @@ int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job) if (strcmp(res[1],"0")) result += DB_SERVER_JOB; } else { - if (ret == 0) result = 0; - else { - fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n"); - edg_wll_SetError(ctx,ENOENT,dbjob); - } + fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n"); + edg_wll_SetError(ctx,ENOENT,dbjob); } glite_lbu_FreeStmt(&q); diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index 7ac1a70..0479e2c 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -25,6 +25,7 @@ /* XXX */ #define use_db 1 +extern int unset_proxy_flag(edg_wll_Context, edg_wlc_JobId); extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *); @@ -271,6 +272,29 @@ static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev, } else #endif { + /* Purge proxy flag + * XXX: Workaround - if these events arrive on server with shared DB + * it reaches terminal state from proxy point of view and so + * proxy flag have to be removed (cannot call edg_wll_PurgeServerProxy) + * - these triggers on events should be replaced by triggers on state change + * (remove extern unset_proxy_flag, set it static in srv_purge.c) + */ + switch ( ev->any.type ) { + case EDG_WLL_EVENT_CLEAR: + case EDG_WLL_EVENT_ABORT: + if (unset_proxy_flag(ctx, ev->any.jobId) < 0) { + return(edg_wll_Error(ctx,NULL,NULL)); + } + break; + case EDG_WLL_EVENT_CANCEL: + if (ev->cancel.status_code == EDG_WLL_CANCEL_DONE) + if (unset_proxy_flag(ctx, ev->any.jobId) < 0) { + return(edg_wll_Error(ctx,NULL,NULL)); + } + break; + default: break; + } + if ( newstat->state ) { edg_wll_NotifMatch(ctx, newstat); edg_wll_FreeStatus(newstat); diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index d303f4a..2478ad9 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -43,7 +43,8 @@ static const char* const resp_headers[] = { }; static int purge_one(edg_wll_Context ctx,const edg_wlc_JobId,int,int); -static int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job); +int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job); +static int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job); int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname) @@ -185,31 +186,15 @@ int edg_wll_CreateFileStorage(edg_wll_Context ctx, char *file_type, char *prefix int edg_wll_PurgeServerProxy(edg_wll_Context ctx, edg_wlc_JobId job) { - switch (edg_wll_jobMembership(ctx, job)) { - case DB_PROXY_JOB: - switch ( purge_one(ctx, job, -1, 1) ) { - case 0: - case ENOENT: - edg_wll_ResetError(ctx); - return 0; - - default: - return -1; - } - break; - case DB_SERVER_JOB: - // should not happen, however, no action needed - // proxy flag is unset already + switch ( purge_one(ctx, job, -1, 1) ) { + case 0: + case ENOENT: edg_wll_ResetError(ctx); return 0; - break; - case DB_PROXY_JOB+DB_SERVER_JOB: - return(unset_proxy_flag(ctx, job)); - break; + default: return -1; - break; - } + } } int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request) @@ -295,7 +280,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request) for (i=0; itimeout[i] < 0 ? ctx->purge_timeout[i] : request->timeout[i]; - if (edg_wll_ExecSQL(ctx,"select dg_jobid from jobs",&s) < 0) goto abort; + if (edg_wll_ExecSQL(ctx,"select dg_jobid from jobs where server='1'",&s) < 0) goto abort; while ((res = edg_wll_FetchRow(ctx,s,1,NULL,&job_s)) > 0) { if (edg_wlc_JobIdParse(job_s,&job)) { fprintf(stderr,"%s: parse error (internal inconsistency !)\n",job_s); @@ -443,6 +428,44 @@ int purge_one(edg_wll_Context ctx,const edg_wlc_JobId job,int dump, int purge) edg_wll_ResetError(ctx); if ( !purge && dump < 0 ) return 0; + switch (edg_wll_jobMembership(ctx, job)) { + case DB_PROXY_JOB: + if (!ctx->isProxy) { + /* should not happen */ + return 0; + } + /* continue */ + break; + case DB_SERVER_JOB: + if (ctx->isProxy) { + /* should not happen */ + return 0; + } + /* continue */ + break; + case DB_PROXY_JOB+DB_SERVER_JOB: + if (ctx->isProxy) { + purge = 0; + if (unset_proxy_flag(ctx, job) < 0) { + return(edg_wll_Error(ctx,NULL,NULL)); + } + } + else { + purge = 0; + if (unset_server_flag(ctx, job) < 0) { + return(edg_wll_Error(ctx,NULL,NULL)); + } + } + break; + case 0: + // Zombie job (server=0, proxy=0)? should not happen; + // clear it to keep DB healthy + break; + default: + return 0; + break; + } + dbjob = edg_wlc_JobIdGetUnique(job); /* XXX: strict jobid already checked */ if (edg_wll_LockJob(ctx,job)) goto clean; @@ -615,3 +638,18 @@ int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job) return(edg_wll_ExecSQL(ctx,stmt,NULL)); } + +int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job) +{ + char *stmt = NULL; + char *dbjob; + + edg_wll_ResetError(ctx); + + dbjob = edg_wlc_JobIdGetUnique(job); + + trio_asprintf(&stmt,"update jobs set server='0' where jobid='%|Ss'", dbjob); + + return(edg_wll_ExecSQL(ctx,stmt,NULL)); +} + -- 1.8.2.3