Optimized registration of DAG subjobs (still one-by-one).
authorZdeněk Salvet <salvet@ics.muni.cz>
Thu, 29 Jun 2006 14:25:32 +0000 (14:25 +0000)
committerZdeněk Salvet <salvet@ics.muni.cz>
Thu, 29 Jun 2006 14:25:32 +0000 (14:25 +0000)
org.glite.lb.server/src/jobstat.c
org.glite.lb.server/src/store.c.T

index 57f72f6..fc91a86 100644 (file)
@@ -40,7 +40,8 @@ static char *job_owner(edg_wll_Context,char *);
 
 int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int);
 edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
-static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context, intJobStat *);
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, edg_wlc_JobId);
+static edg_wll_ErrorCode states_values_embryonic(edg_wll_Context, edg_wlc_JobId, edg_wlc_JobId, char **, char**);
 edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
 
 int js_enable_store = 1;
@@ -70,18 +71,6 @@ static char* matched_substr(char *in, regmatch_t match)
 }
 
 
-/*
- * Returns encoded instatus for embrio DAG subjob
- */
-
-static char *states_values_embryotic(
-       edg_wlc_JobId jobid, 
-       edg_wlc_JobId parent_job)
-{
-       return 0;
-}
-
-
 int edg_wll_JobStatus(
        edg_wll_Context ctx,
        const edg_wlc_JobId             job,
@@ -371,64 +360,6 @@ int edg_wll_intJobStatus(
 
 }
 
-static int edg_wll_intJobStatusEmbriotic(
-       edg_wll_Context         ctx,
-       edg_wll_Event           *e,
-       intJobStat              *intstat,
-       int                     update_db)
-{
-
-/* Local variables */
-       char            *string_jobid;
-
-       int             intErr = 0;
-       int             res;
-       int             be_strict = 0;
-       char            *errstring = NULL;
-
-
-/* Processing */
-       edg_wll_ResetError(ctx);
-       init_intJobStat(intstat);
-
-/* XXX: integrity check ?? can we skip? */
-       string_jobid = edg_wlc_JobIdUnparse(e->any.jobId);
-       if (string_jobid == NULL || intstat == NULL)
-               return edg_wll_SetError(ctx,EINVAL, NULL);
-       free(string_jobid);
-
-       intstat->pub.owner = strdup(e->any.user); 
-       
-       res = processEvent(intstat, e, 0, be_strict, &errstring);
-       if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
-               intErr = 1;
-       }
-
-       if (intstat->pub.state == EDG_WLL_JOB_UNDEF) {
-               intstat->pub.state = EDG_WLL_JOB_UNKNOWN;
-       }
-
-       if (intErr) {
-               destroy_intJobStat(intstat);
-               return edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, NULL);
-       } else {
-               /* XXX intstat->pub.expectUpdate = eval_expect_update(intstat, &intstat->pub.expectFrom); */
-               intErr = edg_wlc_JobIdDup(e->any.jobId, &intstat->pub.jobId);
-               if (!intErr) {
-                       if (update_db) {
-                               edg_wll_StoreIntStateEmbriotic(ctx, intstat);
-                               /* recheck
-                                * intJobStat *reread;
-                                * edg_wll_LoadIntState(ctx, job, tsq, &reread);
-                                * destroy_intJobStat(reread);
-                               */
-                       }
-               }
-               return edg_wll_SetError(ctx, intErr, NULL);
-       }
-
-}
-
 
 /*
  * Helper for warning printouts
@@ -626,29 +557,70 @@ cleanup:
 }
 
 
-static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context ctx,
-                                    intJobStat *stat)
+/*
+ * Returns encoded SQL table states record for embryonic DAG subjob
+ */
+
+static edg_wll_ErrorCode states_values_embryonic(
+       edg_wll_Context ctx,
+       edg_wlc_JobId jobid,
+       edg_wlc_JobId parent_job,
+       char **icnames,
+       char **values)
 {
-       char *jobid_md5, *stat_enc, *parent_md5 = NULL;
-       char *stmt;
-       char *icnames, *icvalues;
+       char *jobid_md5, *stat_enc, *parent_md5;
+       char *stmt = NULL;
+       char *icvalues;
+       intJobStat stat_rec;
+       intJobStat *stat = &stat_rec;
 
-       jobid_md5 = edg_wlc_JobIdGetUnique(stat->pub.jobId);
-       stat_enc = states_values_embryotic(stat->pub.jobId, stat->pub.parent_job);;
+       init_intJobStat(stat);
+       if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) ||
+               edg_wlc_JobIdDup(parent_job, &stat->pub.parent_job)) goto err;
+       stat->pub.state = EDG_WLL_JOB_SUBMITTED;
 
-       parent_md5 = edg_wlc_JobIdGetUnique(stat->pub.parent_job);
-       if (parent_md5 == NULL) parent_md5 = strdup("*no parent job*");
+       jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
+       parent_md5 = edg_wlc_JobIdGetUnique(parent_job);
+       stat_enc = enc_intJobStat(strdup(""), stat);
+       if (jobid_md5 || parent_md5 == NULL || stat_enc == NULL) goto err;
+
+
+       if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err;
+       trio_asprintf(&stmt,
+               "('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)",
+               jobid_md5, stat->pub.state, 1, stat_enc,
+               INTSTAT_VERSION, parent_md5, icvalues);
+       free(icvalues);
+
+err:
+       destroy_intJobStat(stat);
+       free(jobid_md5); free(stat_enc);
+       free(parent_md5);
+       *values = stmt;
+       return edg_wll_Error(ctx,NULL,NULL);
+}
 
+edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx,
+        edg_wlc_JobId jobid,
+        edg_wlc_JobId parent_job)
+{
+       char *values = NULL;
+       char *stmt;
+       char *icnames, *icvalues;
 
-       edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, &icnames, &icvalues);
+       if (states_values_embryonic(ctx, jobid, parent_job, &icnames, &values))
+               goto cleanup;
+
+/* TODO
+               edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
+               if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
+*/
        trio_asprintf(&stmt,
                "insert into states"
                "(jobid,status,seq,int_status,version"
                        ",parent_job%s) "
-               "values ('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)",
-               icnames,
-               jobid_md5, stat->pub.state, 0, stat_enc,
-               INTSTAT_VERSION, parent_md5, icvalues);
+               "values %s",
+               icnames, values);
        free(icnames); free(icvalues);
 
        if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup;
@@ -656,8 +628,7 @@ static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context ctx,
 
 cleanup:
        free(stmt); 
-       free(jobid_md5); free(stat_enc);
-       free(parent_md5);
+       free(values);
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
@@ -783,23 +754,3 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx,
        return edg_wll_Error(ctx, NULL, NULL);
 }
 
-
-/* 
- * store embriotic state of DAGs' subjob 
- */
-
-edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic(edg_wll_Context ctx,
-                                       edg_wll_Event *e)
-{
-       intJobStat      jobstat;
-
-       
-       if (!edg_wll_intJobStatusEmbriotic(ctx, e, &jobstat, js_enable_store))
-       {
-               edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub);
-               if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
-               destroy_intJobStat(&jobstat);
-       }
-
-       return edg_wll_Error(ctx, NULL, NULL);
-}
index 8258d26..1894ce8 100644 (file)
@@ -567,7 +567,6 @@ static int register_subjobs_embriotic(edg_wll_Context ctx,const edg_wll_RegJobEv
        int     i,err;
        edg_wlc_JobId   *subjobs;
        struct timeval  now;
-       edg_wll_Event   e2;
 
 
        edg_wll_ResetError(ctx);
@@ -584,43 +583,23 @@ static int register_subjobs_embriotic(edg_wll_Context ctx,const edg_wll_RegJobEv
        ctx->p_tmp_timeout.tv_sec += e->nsubjobs;
        if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
 
-       memset(&e2,0,sizeof e2);
-       e2.type = EDG_WLL_EVENT_REGJOB;
-       memcpy(&e2.regJob.timestamp,&now,sizeof now);
-       e2.any.host = strdup(ctx->srvName);
-       e2.any.level = e->level;
-       e2.any.priority = e->priority;
-       e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL);
-       e2.any.user = strdup(e->user);
-       e2.any.source = e->source;
-       e2.any.src_instance = strdup(ctx->isProxy ? 
-               "L&B proxy" : "L&B server");
-       e2.regJob.ns = strdup(e->ns);
-       edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent);
-       e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE;
-       e2.regJob.jdl = strdup("");
-
        for (i=0; i<e->nsubjobs; i++) {
                char            *et,*ed,*job_s;
 
 
-               e2.any.jobId = subjobs[i]; subjobs[i] = NULL;
-               // XXX: timespamp each subjob with actual time??
-
-               if ((err = edg_wll_StepIntStateEmbriotic(ctx, &e2)))
+               if ((err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e->jobId)))
                        edg_wll_Error(ctx,&et,&ed);
 
                if (err) {
-                       job_s = edg_wlc_JobIdUnparse(e2.any.jobId);
+                       job_s = edg_wlc_JobIdUnparse(subjobs[i]);
                        fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed);
                        syslog(LOG_ERR,"%s: %s (%s)",job_s,et,ed);
                        free(job_s); free(et); free(ed);
                        edg_wll_ResetError(ctx);
                }
-               edg_wlc_JobIdFree(e2.any.jobId);
+               edg_wlc_JobIdFree(subjobs[i]);
        }
 
-       edg_wll_FreeEvent(&e2);
        free(subjobs);
 
        return edg_wll_Error(ctx,NULL,NULL);