embriotic clon of "register DAGs' subjobs" function chain
authorMiloš Mulač <mulac@civ.zcu.cz>
Mon, 19 Jun 2006 16:02:32 +0000 (16:02 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Mon, 19 Jun 2006 16:02:32 +0000 (16:02 +0000)
- not really used yet, just for share and discussion

org.glite.lb.server/interface/store.h
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/store.c.T

index 3466d05..f2d7dbf 100644 (file)
@@ -33,6 +33,13 @@ edg_wll_ErrorCode edg_wll_StepIntState(
        edg_wll_JobStat *
 );
 
+/* create embriotic job state for DAGs' subjob */
+
+edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic(
+       edg_wll_Context ctx,    /* INOUT */
+        edg_wll_Event *e       /* IN */
+);
+
 int db_store(edg_wll_Context,char *,char *);
 int handle_request(edg_wll_Context,char *);
 int create_reply(const edg_wll_Context,char **);
index c485bd6..57f72f6 100644 (file)
@@ -40,6 +40,7 @@ static char *job_owner(edg_wll_Context,char *);
 
 int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int);
 edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
+static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context, intJobStat *);
 edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
 
 int js_enable_store = 1;
@@ -69,6 +70,18 @@ static char* matched_substr(char *in, regmatch_t match)
 }
 
 
+/*
+ * Returns encoded instatus for embrio DAG subjob
+ */
+
+static char *states_values_embryotic(
+       edg_wlc_JobId jobid, 
+       edg_wlc_JobId parent_job)
+{
+       return 0;
+}
+
+
 int edg_wll_JobStatus(
        edg_wll_Context ctx,
        const edg_wlc_JobId             job,
@@ -358,6 +371,65 @@ int edg_wll_intJobStatus(
 
 }
 
+static int edg_wll_intJobStatusEmbriotic(
+       edg_wll_Context         ctx,
+       edg_wll_Event           *e,
+       intJobStat              *intstat,
+       int                     update_db)
+{
+
+/* Local variables */
+       char            *string_jobid;
+
+       int             intErr = 0;
+       int             res;
+       int             be_strict = 0;
+       char            *errstring = NULL;
+
+
+/* Processing */
+       edg_wll_ResetError(ctx);
+       init_intJobStat(intstat);
+
+/* XXX: integrity check ?? can we skip? */
+       string_jobid = edg_wlc_JobIdUnparse(e->any.jobId);
+       if (string_jobid == NULL || intstat == NULL)
+               return edg_wll_SetError(ctx,EINVAL, NULL);
+       free(string_jobid);
+
+       intstat->pub.owner = strdup(e->any.user); 
+       
+       res = processEvent(intstat, e, 0, be_strict, &errstring);
+       if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
+               intErr = 1;
+       }
+
+       if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
+               intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
+       }
+
+       if (intErr) {
+               destroy_intJobStat(intstat);
+               return edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, NULL);
+       } else {
+               /* XXX intstat->pub.expectUpdate = eval_expect_update(intstat, &intstat->pub.expectFrom); */
+               intErr = edg_wlc_JobIdDup(e->any.jobId, &intstat->pub.jobId);
+               if (!intErr) {
+                       if (update_db) {
+                               edg_wll_StoreIntStateEmbriotic(ctx, intstat);
+                               /* recheck
+                                * intJobStat *reread;
+                                * edg_wll_LoadIntState(ctx, job, tsq, &reread);
+                                * destroy_intJobStat(reread);
+                               */
+                       }
+               }
+               return edg_wll_SetError(ctx, intErr, NULL);
+       }
+
+}
+
+
 /*
  * Helper for warning printouts
  */
@@ -553,6 +625,42 @@ cleanup:
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+
+static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context ctx,
+                                    intJobStat *stat)
+{
+       char *jobid_md5, *stat_enc, *parent_md5 = NULL;
+       char *stmt;
+       char *icnames, *icvalues;
+
+       jobid_md5 = edg_wlc_JobIdGetUnique(stat->pub.jobId);
+       stat_enc = states_values_embryotic(stat->pub.jobId, stat->pub.parent_job);;
+
+       parent_md5 = edg_wlc_JobIdGetUnique(stat->pub.parent_job);
+       if (parent_md5 == NULL) parent_md5 = strdup("*no parent job*");
+
+
+       edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, &icnames, &icvalues);
+       trio_asprintf(&stmt,
+               "insert into states"
+               "(jobid,status,seq,int_status,version"
+                       ",parent_job%s) "
+               "values ('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)",
+               icnames,
+               jobid_md5, stat->pub.state, 0, stat_enc,
+               INTSTAT_VERSION, parent_md5, icvalues);
+       free(icnames); free(icvalues);
+
+       if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup;
+
+
+cleanup:
+       free(stmt); 
+       free(jobid_md5); free(stat_enc);
+       free(parent_md5);
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
 /*
  * Retrieve stored job state from states and status_tags DB tables.
  * Should be called with the job locked.
@@ -674,3 +782,24 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx,
        }
        return edg_wll_Error(ctx, NULL, NULL);
 }
+
+
+/* 
+ * store embriotic state of DAGs' subjob 
+ */
+
+edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic(edg_wll_Context ctx,
+                                       edg_wll_Event *e)
+{
+       intJobStat      jobstat;
+
+       
+       if (!edg_wll_intJobStatusEmbriotic(ctx, e, &jobstat, js_enable_store))
+       {
+               edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
+               if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
+               destroy_intJobStat(&jobstat);
+       }
+
+       return edg_wll_Error(ctx, NULL, NULL);
+}
index dd49e07..cc131ed 100644 (file)
@@ -40,6 +40,7 @@ static int store_seq(edg_wll_Context,edg_wll_Event *,int);
 static int check_dup(edg_wll_Context,edg_wll_Event *);
 static int check_auth(edg_wll_Context,edg_wll_Event *e); 
 static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
+static int register_subjobs_embriotic(edg_wll_Context,const edg_wll_RegJobEvent *);
 
 void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) {
        ctx->allowAnonymous = anon;
@@ -552,6 +553,68 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+static int register_subjobs_embriotic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
+{
+       int     i,err;
+       edg_wlc_JobId   *subjobs;
+       struct timeval  now;
+
+       edg_wll_ResetError(ctx);
+       if (e->nsubjobs == 0) return 0;
+       if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
+
+       if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs)))
+               return err;
+
+       gettimeofday(&now,NULL);
+
+/* XXX: increase the overall request timeout. */
+/* XXX: smaller incerease may be added - e->nsubjobs/100 ? */
+       ctx->p_tmp_timeout.tv_sec += e->nsubjobs;
+       if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
+
+       for (i=0; i<e->nsubjobs; i++) {
+               edg_wll_Event   e2;
+               int             seq;
+               char            *et,*ed,*job_s;
+
+/* XXX: shloud be possible to move all static stuff out of the cycle
+ *     and free event after the whole loop */
+               memset(&e2,0,sizeof e2);
+               e2.type = EDG_WLL_EVENT_REGJOB;
+               e2.any.jobId = subjobs[i]; subjobs[i] = NULL;
+               memcpy(&e2.regJob.timestamp,&now,sizeof now);
+               e2.any.host = strdup(ctx->srvName);
+               e2.any.level = e->level;
+               e2.any.priority = e->priority;
+               e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL);
+               e2.any.user = strdup(e->user);
+               e2.any.source = e->source;
+               e2.any.src_instance = strdup(ctx->isProxy ? 
+                       "L&B proxy" : "L&B server");
+               e2.regJob.ns = strdup(e->ns);
+               edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent);
+               e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE;
+               e2.regJob.jdl = strdup("");
+
+
+               if ((err = edg_wll_StepIntStateEmbriotic(ctx, &e2)))
+                       edg_wll_Error(ctx,&et,&ed);
+
+               if (err) {
+                       job_s = edg_wlc_JobIdUnparse(e2.any.jobId);
+                       fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed);
+                       syslog(LOG_ERR,"%s: %s (%s)",job_s,et,ed);
+                       free(job_s); free(et); free(ed);
+                       edg_wll_ResetError(ctx);
+               }
+
+               edg_wll_FreeEvent(&e2);
+       }
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
 int edg_wll_delete_event(edg_wll_Context ctx,const char *jobid,int event)
 {
        char    *stmt;