From: Miloš Mulač Date: Tue, 18 Dec 2007 10:40:27 +0000 (+0000) Subject: reshuffle of db_store_finalize X-Git-Tag: merge_313_3_dst~7 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=807c84153d2423ad49e3f6c87b54a146331024b7;p=jra1mw.git reshuffle of db_store_finalize - enable registration of local subjobs - propagate regitrations of subjobs to JP - correct trigerring of notifications --- diff --git a/org.glite.lb.server/interface/store.h b/org.glite.lb.server/interface/store.h index a9b50c8..bb0bb38 100644 --- a/org.glite.lb.server/interface/store.h +++ b/org.glite.lb.server/interface/store.h @@ -45,7 +45,8 @@ int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *); int handle_request(edg_wll_Context,char *); int create_reply(const edg_wll_Context,char **); int trans_db_store(edg_wll_Context,char *,edg_wll_Event *,intJobStat *); -int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort); +int is_job_local(edg_wll_Context, edg_wlc_JobId jobId); +int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event); int edg_wll_delete_event(edg_wll_Context,const char *, int); diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index cc4bf3b..8af2ed8 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -1214,6 +1214,8 @@ int bk_handle_connection_proxy(int conn, struct timeval *timeout, void *data) /* set globals */ + ctx->notifDuration = notif_duration; + if ( jpreg ) ctx->jpreg_dir = strdup(jpregDir); else ctx->jpreg_dir = NULL; ctx->allowAnonymous = 1; ctx->isProxy = 1; ctx->noAuth = 1; @@ -1221,6 +1223,7 @@ int bk_handle_connection_proxy(int conn, struct timeval *timeout, void *data) ctx->semset = semset; ctx->semaphores = semaphores; + if (fake_host) { ctx->srvName = strdup(fake_host); diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index 2032bfa..6102b4f 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -28,18 +28,17 @@ extern int unset_proxy_flag(edg_wll_Context, edg_wlc_JobId); extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *); -static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq); +static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP); int db_store(edg_wll_Context ctx,char *ucs, char *event) { edg_wll_Event *ev; - int seq; + int seq, reg_to_JP = 0; int err; + int local_job; edg_wll_JobStat newstat; - char *srvName = NULL; - unsigned int srvPort; ev = NULL; @@ -49,6 +48,7 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) if(edg_wll_ParseEvent(ctx, event, &ev)) goto err; + local_job = is_job_local(ctx, ev->any.jobId); #ifdef LB_PERF if (sink_mode == GLITE_LB_SINK_STORE) { @@ -59,14 +59,12 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) } #endif - edg_wlc_JobIdGetServerParts(ev->any.jobId, &srvName, &srvPort); - if(use_db) { char *ed; int code; if (edg_wll_LockJob(ctx,ev->any.jobId)) goto err; - store_job_server_proxy(ctx, ev, srvName, srvPort); + store_job_server_proxy(ctx, ev); code = edg_wll_Error(ctx,NULL,&ed); edg_wll_UnlockJob(ctx,ev->any.jobId); /* XXX: ignore error */ if (code) { @@ -81,12 +79,15 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) * if jobid prefix hostname matches server hostname -> they will * sooner or later arrive to server too and are stored in common DB */ - if (ctx->isProxy && ctx->serverRunning && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) ) { - if (!strcmp(ctx->srvName, srvName)) { - free(srvName); + if (ctx->isProxy && local_job) { + if (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) { return 0; } - + else { + /* these are re-registrations of subjobs on proxy */ + /* embryonic registrations does not trigger registration in JP */ + reg_to_JP = 1; + } } /* XXX: if event type is user tag, convert the tag name to lowercase! @@ -138,7 +139,7 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) */ if (err) goto err; - db_store_finalize(ctx, event, ev, &newstat, seq); + db_store_finalize(ctx, event, ev, &newstat, seq, reg_to_JP); err: @@ -147,8 +148,6 @@ err: free(ev); } - if (srvName) free(srvName); - if ( newstat.state ) edg_wll_FreeStatus(&newstat); @@ -204,7 +203,7 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is) assert(event); } - db_store_finalize(ctx, event, ev, &newstat, seq); + db_store_finalize(ctx, event, ev, &newstat, seq, 0); err: @@ -216,63 +215,22 @@ err: -static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq) { -printf("%d\n", seq); - if ( ctx->isProxy ) { - /* - * send event to the proper BK server - * event with priority flag EDG_WLL_LOGFLAG_DIRECT (typically RegJob) is not sent - */ - -#ifdef LB_PERF - if( sink_mode == GLITE_LB_SINK_SEND ) { - glite_wll_perftest_consumeEvent(ev); - } else -#endif - - /* XXX: ending here may break the backward compatibility */ - if (!(ev->any.priority & EDG_WLL_LOGFLAG_PROXY)) { - edg_wll_UpdateError(ctx, 0, "db_actual_store() WARNING: the event is not PROXY"); - //return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "db_actual_store() ERROR: the event is not PROXY"); - } +static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP) +{ + int local_job = is_job_local(ctx, ev->any.jobId); - if (!(ev->any.priority & EDG_WLL_LOGFLAG_DIRECT)) { - if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) { - return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error."); - } - } - /* LB proxy purge */ - if (newstat->remove_from_proxy) { - edg_wll_PurgeServerProxy(ctx, ev->any.jobId); - } - } else #ifdef LB_PERF if( sink_mode == GLITE_LB_SINK_SEND ) { glite_wll_perftest_consumeEvent(ev); - } else -#endif - { - char *jobIdHost = NULL; - unsigned int jobIdPort; - - - /* Purge proxy flag */ - edg_wlc_JobIdGetServerParts(ev->any.jobId, &jobIdHost, &jobIdPort); - if ( newstat->remove_from_proxy && (ctx->srvPort == jobIdPort) && - !strcmp(jobIdHost,ctx->srvName) ) - { - if (unset_proxy_flag(ctx, ev->any.jobId) < 0) { - free(jobIdHost); - return(edg_wll_Error(ctx,NULL,NULL)); - } + return edg_wll_Error(ctx,NULL,NULL); } - free(jobIdHost); - - if ( newstat->state ) { - edg_wll_NotifMatch(ctx, newstat); - } - if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0) { +#endif + + /* Send regitration to JP + */ + if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0 && + (!ctx->isProxy || reg_to_JP) ) { char *jids, *msg; if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) { @@ -290,6 +248,51 @@ printf("%d\n", seq); } free(msg); } - } - return edg_wll_Error(ctx,NULL,NULL); + + + if ( ctx->isProxy ) { + /* + * send event to the proper BK server + * event with priority flag EDG_WLL_LOGFLAG_DIRECT (typically RegJob) is not sent + */ + + /* XXX: ending here may break the backward compatibility */ + if (!(ev->any.priority & EDG_WLL_LOGFLAG_PROXY)) { + edg_wll_UpdateError(ctx, 0, "db_actual_store() WARNING: the event is not PROXY"); + //return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "db_actual_store() ERROR: the event is not PROXY"); + } + + if (!(ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) && !local_job) { + if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) { + return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error."); + } + } + else { + /* event will not arrive to server, only flag was set */ + /* check whether some pending notifications are not triggered */ + if ( newstat->state ) { + edg_wll_NotifMatch(ctx, newstat); + } + } + + /* LB proxy purge */ + if (newstat->remove_from_proxy) { + edg_wll_PurgeServerProxy(ctx, ev->any.jobId); + } + } else + { + /* Purge proxy flag */ + if ( newstat->remove_from_proxy && local_job ) { + if (unset_proxy_flag(ctx, ev->any.jobId) < 0) { + return(edg_wll_Error(ctx,NULL,NULL)); + } + } + + if ( newstat->state ) { + edg_wll_NotifMatch(ctx, newstat); + } + } + + + return edg_wll_Error(ctx,NULL,NULL); } diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index c409918..2be77e3 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -355,14 +355,29 @@ static int store_job_grey(edg_wll_Context ctx,const edg_wlc_JobId job,time_t eti return edg_wll_Error(ctx,NULL,NULL); } +/* test whether job shares LB proxy and server DB or not */ +int is_job_local(edg_wll_Context ctx, edg_wlc_JobId jobId) +{ + char *srvName = NULL; + unsigned int srvPort; + int ret; + + + edg_wlc_JobIdGetServerParts(jobId, &srvName, &srvPort); + ret = ((ctx->srvPort == srvPort) && !strcmp(srvName,ctx->srvName)); + free(srvName); + + return(ret); +} -int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort) +int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event) { char *unique = edg_wlc_JobIdGetUnique(event->any.jobId); char *q = NULL, *owner = NULL, *userid = NULL; glite_lbu_Statement stmt = NULL; int nar; char *can_peername = NULL; + int local_job = is_job_local(ctx, event->any.jobId); edg_wll_ResetError(ctx); @@ -374,7 +389,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobI if (event->any.type == EDG_WLL_EVENT_REGJOB) { if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) { /* first synchronous registration */ - if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) { + if (local_job) { /* we are both server and proxy for this job */ trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'", unique); @@ -412,7 +427,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobI } else { /* supplementary re-registration (JDL of subjob, etc.) */ - if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) { + if (local_job) { /* previous registration via GSI required */ trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'", unique, unique);