From: Miloš Mulač Date: Thu, 26 Oct 2006 11:44:11 +0000 (+0000) Subject: separate function for storing CollectionState events X-Git-Tag: merge_connpool_dst~22 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=d2a0bb37e01b4111d54407a8fd5b2fc3c54acb1e;p=jra1mw.git separate function for storing CollectionState events - needs some optimalizations --- diff --git a/org.glite.lb.server/interface/store.h b/org.glite.lb.server/interface/store.h index f2d7dbf..1a66bf0 100644 --- a/org.glite.lb.server/interface/store.h +++ b/org.glite.lb.server/interface/store.h @@ -40,7 +40,8 @@ edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic( edg_wll_Event *e /* IN */ ); -int db_store(edg_wll_Context,char *,char *); +int db_store(edg_wll_Context,char *, char *); +int db_parent_store(edg_wll_Context, edg_wll_Event *); int handle_request(edg_wll_Context,char *); int create_reply(const edg_wll_Context,char **); diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index 2a085f8..2e3769d 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -175,3 +175,141 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) return edg_wll_Error(ctx,NULL,NULL); } + +int +db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev) +{ + char *event = NULL; + int seq; + int err; + edg_wll_JobStat newstat; + + + edg_wll_ResetError(ctx); + memset(&newstat,0,sizeof newstat); + + +#ifdef LB_PERF + if (sink_mode == GLITE_LB_SINK_STORE) { + glite_wll_perftest_consumeEvent(ev); + edg_wll_FreeEvent(ev); + free(ev); + return 0; + } +#endif + + + /* XXX: if event type is user tag, convert the tag name to lowercase! + * (not sure whether to convert a value too is reasonable + * or keep it 'case sensitive') + */ + if ( ev->any.type == EDG_WLL_EVENT_USERTAG ) + { + int i; + for ( i = 0; ev->userTag.name[i] != '\0'; i++ ) + ev->userTag.name[i] = tolower(ev->userTag.name[i]); + } + + assert(ev->any.user); + + if(use_db) { + if(edg_wll_StoreEvent(ctx, ev,&seq)) + goto err; + } + + if ( ev->any.type == EDG_WLL_EVENT_CHANGEACL ) + err = edg_wll_UpdateACL(ctx, ev->any.jobId, + ev->changeACL.user_id, ev->changeACL.user_id_type, + ev->changeACL.permission, ev->changeACL.permission_type, + ev->changeACL.operation); + else { +#ifdef LB_PERF + if(sink_mode == GLITE_LB_SINK_STATE) { + glite_wll_perftest_consumeEvent(ev); + goto err; + } +#endif + + err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, ctx->isProxy? NULL: &newstat); + } + + if (err) goto err; + + if ( ctx->isProxy ) { + /* + * send event to the proper BK server + */ + /* XXX: RegJob events, which were logged also directly, are duplicated at server, + but it should not harm */ + +#ifdef LB_PERF + if( sink_mode == GLITE_LB_SINK_SEND ) { + glite_wll_perftest_consumeEvent(ev); + } else +#endif + event = edg_wll_UnparseEvent(ctx, ev); + assert(event); + + if (edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) { + edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error."); + goto err; + } + + /* LB proxy purge + * XXX: Set propper set of states! + * TODO: Do the set of states configurable? + */ + switch ( ev->any.type ) { + case EDG_WLL_EVENT_CLEAR: + case EDG_WLL_EVENT_ABORT: + edg_wll_PurgeServerProxy(ctx, ev->any.jobId); + break; + case EDG_WLL_EVENT_CANCEL: + if (ev->cancel.status_code == EDG_WLL_CANCEL_DONE) + edg_wll_PurgeServerProxy(ctx, ev->any.jobId); + break; + default: break; + } + } else +#ifdef LB_PERF + if( sink_mode == GLITE_LB_SINK_SEND ) { + glite_wll_perftest_consumeEvent(ev); + } else +#endif + { + if ( newstat.state ) { + edg_wll_NotifMatch(ctx, &newstat); + edg_wll_FreeStatus(&newstat); + } + if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB ) { + char *jids, *msg; + + if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) { + edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP"); + goto err; + } + if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) { + free(jids); + edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP"); + goto err; + } + strcat(msg, "\n"); + strcat(msg, ev->any.user); + if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) { + free(msg); + edg_wll_SetError(ctx, errno, lbm_errdesc); + goto err; + } + free(msg); + } + } + + free(event); + + return(0); + + err: + free(event); + + return edg_wll_Error(ctx,NULL,NULL); +} diff --git a/org.glite.lb.server/src/request.c b/org.glite.lb.server/src/request.c index 55ce6df..c2b4e1e 100644 --- a/org.glite.lb.server/src/request.c +++ b/org.glite.lb.server/src/request.c @@ -16,6 +16,27 @@ #else #define UNUSED_VAR #endif + + +int +trans_db_store(edg_wll_Context ctx, char *event_data, edg_wll_Event *e) +{ + int ret; + + if ((ret = edg_wll_Transaction(ctx) != 0)) goto err; + + if (e) ret = db_parent_store(ctx, e); + else ret = db_store(ctx, "NOT USED", event_data); + + if (ret == 0) { + if ((ret = edg_wll_Commit(ctx)) != 0) goto err; + } else { + edg_wll_Rollback(ctx); + } + +err: + return(ret); +} int handle_request(edg_wll_Context ctx,char *buf) @@ -31,15 +52,8 @@ handle_request(edg_wll_Context ctx,char *buf) return EDG_WLL_IL_PROTO; } - if ((ret = edg_wll_Transaction(ctx) != 0)) goto err; - ret = db_store(ctx, "NOT USED", event.data); - if (ret == 0) { - if ((ret = edg_wll_Commit(ctx)) != 0) goto err; - } else { - edg_wll_Rollback(ctx); - } + ret = trans_db_store(ctx, event.data, NULL); -err: if(event.data) free(event.data);