(ev->regJob.jobtype == EDG_WLL_REGJOB_DAG ||
ev->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED ||
ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) &&
- ev->regJob.nsubjobs > 0)
+ ev->regJob.nsubjobs > 0) {
if (register_subjobs_embryonic(ctx,&ev->regJob)) goto rollback;
+ reg_to_JP |= REG_SUBJOBS_TO_JP;
+ }
commit:
rollback:;
/* Send regitration to JP
*/
-static int register_to_JP(edg_wll_Context ctx, edg_wll_Event *ev)
+static int register_to_JP(edg_wll_Context ctx, edg_wlc_JobId jobid, char *user)
{
char *jids, *msg;
- if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
+
+ if ( !(jids = edg_wlc_JobIdUnparse(jobid)) ) {
return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP");
}
- if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) {
+ if ( !(msg = calloc(strlen(jids)+strlen(user)+2, sizeof(char) )) ) {
free(jids);
return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP");
}
+ strcat(msg, jids);
+ free(jids);
strcat(msg, "\n");
- strcat(msg, ev->any.user);
+ strcat(msg, user);
if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) {
free(msg);
return edg_wll_SetError(ctx, errno, lbm_errdesc);
}
+static int register_subjobs_to_JP(edg_wll_Context ctx, edg_wll_Event *ev)
+{
+ edg_wlc_JobId *subjobs = NULL;
+ int i = 0, j;
+
+
+ if (edg_wll_GenerateSubjobIds(ctx, ev->regJob.jobId,
+ ev->regJob.nsubjobs, ev->regJob.seed, &subjobs))
+ goto err;
+
+ for (i=0; i<ev->regJob.nsubjobs; i++) {
+ if (register_to_JP(ctx, subjobs[i], ev->any.user))
+ goto err;
+ }
+
+err:
+ for (j=i; j<ev->regJob.nsubjobs; j++) edg_wlc_JobIdFree(subjobs[j]);
+ free(subjobs);
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
static int forward_event_to_server(edg_wll_Context ctx, char *event, edg_wll_Event *ev, int local_job)
{
if ( ctx->isProxy ) {
}
#endif
- if (reg_to_JP && ctx->jpreg_dir)
- if (register_to_JP(ctx,ev)) goto err;
+ if (ctx->jpreg_dir) {
+ if (reg_to_JP & REG_JOB_TO_JP)
+ if (register_to_JP(ctx,ev->any.jobId,ev->any.user)) goto err;
+ if (reg_to_JP & REG_SUBJOBS_TO_JP)
+ if (register_subjobs_to_JP(ctx,ev)) goto err;
+ }
if (forward_event_to_server(ctx, event, ev, local_job)) goto err;
if (store_user(ctx,userid,can_peername)) goto err;
if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
userid, ctx->isProxy, local_job, grey, 0 )) goto err;
- *register_to_JP = local_job;
+ *register_to_JP = (local_job) ? REG_JOB_TO_JP : 0;
}
else {
/* Job already registered */
if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
userid, (ctx->isProxy || !strcmp(res[0],"1")),
!strcmp(res[1],"1") || (local_job ? ctx->serverRunning : 0), 0, 1)) goto err;
- *register_to_JP = 1;
+ *register_to_JP = REG_JOB_TO_JP;
}
else {
free(icnames);
free(values);
/* free the rest of subjobs if DEADLOCK occurs */
- for (j=i; j<e->nsubjobs; j++) edg_wlc_JobIdFree(subjobs[i]);
+ for (j=i; j<e->nsubjobs; j++) edg_wlc_JobIdFree(subjobs[j]);
free(subjobs);
if (sh) glite_lbu_FreeStmt(&sh);
free(stmt);