propagate registrations of subjobs to JP
authorMiloš Mulač <mulac@civ.zcu.cz>
Wed, 26 Mar 2008 11:35:15 +0000 (11:35 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Wed, 26 Mar 2008 11:35:15 +0000 (11:35 +0000)
org.glite.lb.server/interface/store.h
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/db_supp.c
org.glite.lb.server/src/store.c.T

index 03a7b49..6f0ebe0 100644 (file)
@@ -55,6 +55,11 @@ int edg_wll_delete_event(edg_wll_Context,const char *, int);
 
 #define USER_UNKNOWN   "unknown"
 
+/* flags for JP registrations */
+#define REG_JOB_TO_JP          1
+#define REG_SUBJOBS_TO_JP      2
+
+
 #ifdef __cplusplus
 }
 #endif
index cddcdd3..87e3ca3 100644 (file)
@@ -97,9 +97,11 @@ db_store(edg_wll_Context ctx, char *event)
                (ev->regJob.jobtype == EDG_WLL_REGJOB_DAG ||
                 ev->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED ||
                 ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) &&
-               ev->regJob.nsubjobs > 0)  
+               ev->regJob.nsubjobs > 0) { 
 
                        if (register_subjobs_embryonic(ctx,&ev->regJob)) goto rollback;
+                       reg_to_JP |= REG_SUBJOBS_TO_JP;
+       }
 
 commit:
 rollback:;
@@ -179,19 +181,22 @@ err:
 
 /* Send regitration to JP 
  */
-static int register_to_JP(edg_wll_Context ctx, edg_wll_Event *ev)
+static int register_to_JP(edg_wll_Context ctx, edg_wlc_JobId jobid, char *user)
 {
        char *jids, *msg;
        
-       if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
+       
+       if ( !(jids = edg_wlc_JobIdUnparse(jobid)) ) {
                return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP");
        }
-       if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) {
+       if ( !(msg = calloc(strlen(jids)+strlen(user)+2, sizeof(char) )) ) {
                free(jids);
                return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP");
        }
+       strcat(msg, jids);
+       free(jids);
        strcat(msg, "\n");
-       strcat(msg, ev->any.user);
+       strcat(msg, user);
        if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) {
                free(msg);
                return edg_wll_SetError(ctx, errno, lbm_errdesc);
@@ -202,6 +207,29 @@ static int register_to_JP(edg_wll_Context ctx, edg_wll_Event *ev)
 }
 
 
+static int register_subjobs_to_JP(edg_wll_Context ctx, edg_wll_Event *ev)
+{
+       edg_wlc_JobId   *subjobs = NULL;
+       int             i = 0, j;
+
+
+       if (edg_wll_GenerateSubjobIds(ctx, ev->regJob.jobId, 
+                       ev->regJob.nsubjobs, ev->regJob.seed, &subjobs)) 
+               goto err;
+
+       for (i=0; i<ev->regJob.nsubjobs; i++) {
+               if (register_to_JP(ctx, subjobs[i], ev->any.user))
+                       goto err;
+       }
+
+err:
+       for (j=i; j<ev->regJob.nsubjobs; j++) edg_wlc_JobIdFree(subjobs[j]);
+       free(subjobs);
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
 static int forward_event_to_server(edg_wll_Context ctx, char *event, edg_wll_Event *ev, int local_job)
 {
        if ( ctx->isProxy ) {
@@ -239,8 +267,12 @@ static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev
        }
 #endif
        
-       if (reg_to_JP && ctx->jpreg_dir) 
-               if (register_to_JP(ctx,ev)) goto err;
+       if (ctx->jpreg_dir) {
+               if (reg_to_JP & REG_JOB_TO_JP) 
+                       if (register_to_JP(ctx,ev->any.jobId,ev->any.user)) goto err;
+               if (reg_to_JP & REG_SUBJOBS_TO_JP)
+                       if (register_subjobs_to_JP(ctx,ev)) goto err;
+       }
 
        if (forward_event_to_server(ctx, event, ev, local_job)) goto err;
        
index 132ac20..c246e09 100644 (file)
@@ -62,6 +62,8 @@ int edg_wll_Transaction(edg_wll_Context ctx) {
        int retval;
 
        if ((retval = glite_lbu_Transaction(ctx->dbctx)) != 0) edg_wll_SetErrorDB(ctx);
+
+printf("edg_wll_Transaction(%d)\n", retval);
        return retval;
 }
 
@@ -69,6 +71,7 @@ int edg_wll_Commit(edg_wll_Context ctx) {
        int retval;
 
        if ((retval = glite_lbu_Commit(ctx->dbctx)) != 0) edg_wll_SetErrorDB(ctx);
+printf("edg_wll_Commit(%d)\n", retval);
        return retval;
 }
 
@@ -76,6 +79,7 @@ int edg_wll_Rollback(edg_wll_Context ctx) {
        int retval;
 
        if ((retval = glite_lbu_Rollback(ctx->dbctx)) != 0) edg_wll_SetErrorDB(ctx);
+printf("edg_wll_Rollback(%d)\n", retval);
        return retval;
 }
 
index 40bd32b..c3bf931 100644 (file)
@@ -308,7 +308,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *regis
                if (store_user(ctx,userid,can_peername)) goto err;
                if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
                        userid, ctx->isProxy, local_job, grey, 0 )) goto err;
-               *register_to_JP = local_job;
+               *register_to_JP = (local_job) ? REG_JOB_TO_JP : 0;
        }
        else {
                /* Job already registered */
@@ -330,7 +330,7 @@ int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *regis
                        if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
                                userid, (ctx->isProxy || !strcmp(res[0],"1")), 
                                !strcmp(res[1],"1") || (local_job ? ctx->serverRunning : 0), 0, 1)) goto err;
-                       *register_to_JP = 1;
+                       *register_to_JP = REG_JOB_TO_JP;
                                
                }
                else {
@@ -698,7 +698,7 @@ err:
        free(icnames);
        free(values);
        /* free the rest of subjobs if DEADLOCK occurs */
-       for (j=i; j<e->nsubjobs; j++) edg_wlc_JobIdFree(subjobs[i]);
+       for (j=i; j<e->nsubjobs; j++) edg_wlc_JobIdFree(subjobs[j]);
        free(subjobs);
        if (sh) glite_lbu_FreeStmt(&sh);
        free(stmt);