From 67a8bdeb0c55b2d250f5dc3247d91f351ac003fc Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Wed, 26 Mar 2008 11:35:15 +0000 Subject: [PATCH] propagate registrations of subjobs to JP --- org.glite.lb.server/interface/store.h | 5 ++++ org.glite.lb.server/src/db_store.c | 46 +++++++++++++++++++++++++++++------ org.glite.lb.server/src/db_supp.c | 4 +++ org.glite.lb.server/src/store.c.T | 6 ++--- 4 files changed, 51 insertions(+), 10 deletions(-) diff --git a/org.glite.lb.server/interface/store.h b/org.glite.lb.server/interface/store.h index 03a7b49..6f0ebe0 100644 --- a/org.glite.lb.server/interface/store.h +++ b/org.glite.lb.server/interface/store.h @@ -55,6 +55,11 @@ int edg_wll_delete_event(edg_wll_Context,const char *, int); #define USER_UNKNOWN "unknown" +/* flags for JP registrations */ +#define REG_JOB_TO_JP 1 +#define REG_SUBJOBS_TO_JP 2 + + #ifdef __cplusplus } #endif diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index cddcdd3..87e3ca3 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -97,9 +97,11 @@ db_store(edg_wll_Context ctx, char *event) (ev->regJob.jobtype == EDG_WLL_REGJOB_DAG || ev->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED || ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) && - ev->regJob.nsubjobs > 0) + ev->regJob.nsubjobs > 0) { if (register_subjobs_embryonic(ctx,&ev->regJob)) goto rollback; + reg_to_JP |= REG_SUBJOBS_TO_JP; + } commit: rollback:; @@ -179,19 +181,22 @@ err: /* Send regitration to JP */ -static int register_to_JP(edg_wll_Context ctx, edg_wll_Event *ev) +static int register_to_JP(edg_wll_Context ctx, edg_wlc_JobId jobid, char *user) { char *jids, *msg; - if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) { + + if ( !(jids = edg_wlc_JobIdUnparse(jobid)) ) { return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP"); } - if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) { + if ( !(msg = calloc(strlen(jids)+strlen(user)+2, sizeof(char) )) ) { free(jids); return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP"); } + strcat(msg, jids); + free(jids); strcat(msg, "\n"); - strcat(msg, ev->any.user); + strcat(msg, user); if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) { free(msg); return edg_wll_SetError(ctx, errno, lbm_errdesc); @@ -202,6 +207,29 @@ static int register_to_JP(edg_wll_Context ctx, edg_wll_Event *ev) } +static int register_subjobs_to_JP(edg_wll_Context ctx, edg_wll_Event *ev) +{ + edg_wlc_JobId *subjobs = NULL; + int i = 0, j; + + + if (edg_wll_GenerateSubjobIds(ctx, ev->regJob.jobId, + ev->regJob.nsubjobs, ev->regJob.seed, &subjobs)) + goto err; + + for (i=0; iregJob.nsubjobs; i++) { + if (register_to_JP(ctx, subjobs[i], ev->any.user)) + goto err; + } + +err: + for (j=i; jregJob.nsubjobs; j++) edg_wlc_JobIdFree(subjobs[j]); + free(subjobs); + + return edg_wll_Error(ctx,NULL,NULL); +} + + static int forward_event_to_server(edg_wll_Context ctx, char *event, edg_wll_Event *ev, int local_job) { if ( ctx->isProxy ) { @@ -239,8 +267,12 @@ static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev } #endif - if (reg_to_JP && ctx->jpreg_dir) - if (register_to_JP(ctx,ev)) goto err; + if (ctx->jpreg_dir) { + if (reg_to_JP & REG_JOB_TO_JP) + if (register_to_JP(ctx,ev->any.jobId,ev->any.user)) goto err; + if (reg_to_JP & REG_SUBJOBS_TO_JP) + if (register_subjobs_to_JP(ctx,ev)) goto err; + } if (forward_event_to_server(ctx, event, ev, local_job)) goto err; diff --git a/org.glite.lb.server/src/db_supp.c b/org.glite.lb.server/src/db_supp.c index 132ac20..c246e09 100644 --- a/org.glite.lb.server/src/db_supp.c +++ b/org.glite.lb.server/src/db_supp.c @@ -62,6 +62,8 @@ int edg_wll_Transaction(edg_wll_Context ctx) { int retval; if ((retval = glite_lbu_Transaction(ctx->dbctx)) != 0) edg_wll_SetErrorDB(ctx); + +printf("edg_wll_Transaction(%d)\n", retval); return retval; } @@ -69,6 +71,7 @@ int edg_wll_Commit(edg_wll_Context ctx) { int retval; if ((retval = glite_lbu_Commit(ctx->dbctx)) != 0) edg_wll_SetErrorDB(ctx); +printf("edg_wll_Commit(%d)\n", retval); return retval; } @@ -76,6 +79,7 @@ int edg_wll_Rollback(edg_wll_Context ctx) { int retval; if ((retval = glite_lbu_Rollback(ctx->dbctx)) != 0) edg_wll_SetErrorDB(ctx); +printf("edg_wll_Rollback(%d)\n", retval); return retval; } diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index 40bd32b..c3bf931 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -308,7 +308,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *regis if (store_user(ctx,userid,can_peername)) goto err; if (store_job(ctx,(glite_jobid_const_t) event->any.jobId, userid, ctx->isProxy, local_job, grey, 0 )) goto err; - *register_to_JP = local_job; + *register_to_JP = (local_job) ? REG_JOB_TO_JP : 0; } else { /* Job already registered */ @@ -330,7 +330,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *regis if (store_job(ctx,(glite_jobid_const_t) event->any.jobId, userid, (ctx->isProxy || !strcmp(res[0],"1")), !strcmp(res[1],"1") || (local_job ? ctx->serverRunning : 0), 0, 1)) goto err; - *register_to_JP = 1; + *register_to_JP = REG_JOB_TO_JP; } else { @@ -698,7 +698,7 @@ err: free(icnames); free(values); /* free the rest of subjobs if DEADLOCK occurs */ - for (j=i; jnsubjobs; j++) edg_wlc_JobIdFree(subjobs[i]); + for (j=i; jnsubjobs; j++) edg_wlc_JobIdFree(subjobs[j]); free(subjobs); if (sh) glite_lbu_FreeStmt(&sh); free(stmt); -- 1.8.2.3