From: Zdeněk Salvet Date: Thu, 29 Jun 2006 14:25:32 +0000 (+0000) Subject: Optimized registration of DAG subjobs (still one-by-one). X-Git-Tag: gridsite-core_R_1_3_2~70 X-Git-Url: http://scientific.zcu.cz/git/?a=commitdiff_plain;h=a1b79eebf08de1b9b622ba90c607fd46c51f5900;p=jra1mw.git Optimized registration of DAG subjobs (still one-by-one). --- diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index 57f72f6..fc91a86 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -40,7 +40,8 @@ static char *job_owner(edg_wll_Context,char *); int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int); edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int); -static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context, intJobStat *); +edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, edg_wlc_JobId); +static edg_wll_ErrorCode states_values_embryonic(edg_wll_Context, edg_wlc_JobId, edg_wlc_JobId, char **, char**); edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **); int js_enable_store = 1; @@ -70,18 +71,6 @@ static char* matched_substr(char *in, regmatch_t match) } -/* - * Returns encoded instatus for embrio DAG subjob - */ - -static char *states_values_embryotic( - edg_wlc_JobId jobid, - edg_wlc_JobId parent_job) -{ - return 0; -} - - int edg_wll_JobStatus( edg_wll_Context ctx, const edg_wlc_JobId job, @@ -371,64 +360,6 @@ int edg_wll_intJobStatus( } -static int edg_wll_intJobStatusEmbriotic( - edg_wll_Context ctx, - edg_wll_Event *e, - intJobStat *intstat, - int update_db) -{ - -/* Local variables */ - char *string_jobid; - - int intErr = 0; - int res; - int be_strict = 0; - char *errstring = NULL; - - -/* Processing */ - edg_wll_ResetError(ctx); - init_intJobStat(intstat); - -/* XXX: integrity check ?? can we skip? */ - string_jobid = edg_wlc_JobIdUnparse(e->any.jobId); - if (string_jobid == NULL || intstat == NULL) - return edg_wll_SetError(ctx,EINVAL, NULL); - free(string_jobid); - - intstat->pub.owner = strdup(e->any.user); - - res = processEvent(intstat, e, 0, be_strict, &errstring); - if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */ - intErr = 1; - } - - if (intstat->pub.state == EDG_WLL_JOB_UNDEF) { - intstat->pub.state = EDG_WLL_JOB_UNKNOWN; - } - - if (intErr) { - destroy_intJobStat(intstat); - return edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, NULL); - } else { - /* XXX intstat->pub.expectUpdate = eval_expect_update(intstat, &intstat->pub.expectFrom); */ - intErr = edg_wlc_JobIdDup(e->any.jobId, &intstat->pub.jobId); - if (!intErr) { - if (update_db) { - edg_wll_StoreIntStateEmbriotic(ctx, intstat); - /* recheck - * intJobStat *reread; - * edg_wll_LoadIntState(ctx, job, tsq, &reread); - * destroy_intJobStat(reread); - */ - } - } - return edg_wll_SetError(ctx, intErr, NULL); - } - -} - /* * Helper for warning printouts @@ -626,29 +557,70 @@ cleanup: } -static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context ctx, - intJobStat *stat) +/* + * Returns encoded SQL table states record for embryonic DAG subjob + */ + +static edg_wll_ErrorCode states_values_embryonic( + edg_wll_Context ctx, + edg_wlc_JobId jobid, + edg_wlc_JobId parent_job, + char **icnames, + char **values) { - char *jobid_md5, *stat_enc, *parent_md5 = NULL; - char *stmt; - char *icnames, *icvalues; + char *jobid_md5, *stat_enc, *parent_md5; + char *stmt = NULL; + char *icvalues; + intJobStat stat_rec; + intJobStat *stat = &stat_rec; - jobid_md5 = edg_wlc_JobIdGetUnique(stat->pub.jobId); - stat_enc = states_values_embryotic(stat->pub.jobId, stat->pub.parent_job);; + init_intJobStat(stat); + if (edg_wlc_JobIdDup(jobid, &stat->pub.jobId) || + edg_wlc_JobIdDup(parent_job, &stat->pub.parent_job)) goto err; + stat->pub.state = EDG_WLL_JOB_SUBMITTED; - parent_md5 = edg_wlc_JobIdGetUnique(stat->pub.parent_job); - if (parent_md5 == NULL) parent_md5 = strdup("*no parent job*"); + jobid_md5 = edg_wlc_JobIdGetUnique(jobid); + parent_md5 = edg_wlc_JobIdGetUnique(parent_job); + stat_enc = enc_intJobStat(strdup(""), stat); + if (jobid_md5 || parent_md5 == NULL || stat_enc == NULL) goto err; + + + if (edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, icnames, &icvalues)) goto err; + trio_asprintf(&stmt, + "('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)", + jobid_md5, stat->pub.state, 1, stat_enc, + INTSTAT_VERSION, parent_md5, icvalues); + free(icvalues); + +err: + destroy_intJobStat(stat); + free(jobid_md5); free(stat_enc); + free(parent_md5); + *values = stmt; + return edg_wll_Error(ctx,NULL,NULL); +} +edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context ctx, + edg_wlc_JobId jobid, + edg_wlc_JobId parent_job) +{ + char *values = NULL; + char *stmt; + char *icnames, *icvalues; - edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, &icnames, &icvalues); + if (states_values_embryonic(ctx, jobid, parent_job, &icnames, &values)) + goto cleanup; + +/* TODO + edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub); + if (ctx->rgma_export) write2rgma_status(&jobstat.pub); +*/ trio_asprintf(&stmt, "insert into states" "(jobid,status,seq,int_status,version" ",parent_job%s) " - "values ('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)", - icnames, - jobid_md5, stat->pub.state, 0, stat_enc, - INTSTAT_VERSION, parent_md5, icvalues); + "values %s", + icnames, values); free(icnames); free(icvalues); if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup; @@ -656,8 +628,7 @@ static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context ctx, cleanup: free(stmt); - free(jobid_md5); free(stat_enc); - free(parent_md5); + free(values); return edg_wll_Error(ctx,NULL,NULL); } @@ -783,23 +754,3 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, return edg_wll_Error(ctx, NULL, NULL); } - -/* - * store embriotic state of DAGs' subjob - */ - -edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic(edg_wll_Context ctx, - edg_wll_Event *e) -{ - intJobStat jobstat; - - - if (!edg_wll_intJobStatusEmbriotic(ctx, e, &jobstat, js_enable_store)) - { - edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub); - if (ctx->rgma_export) write2rgma_status(&jobstat.pub); - destroy_intJobStat(&jobstat); - } - - return edg_wll_Error(ctx, NULL, NULL); -} diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index 8258d26..1894ce8 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -567,7 +567,6 @@ static int register_subjobs_embriotic(edg_wll_Context ctx,const edg_wll_RegJobEv int i,err; edg_wlc_JobId *subjobs; struct timeval now; - edg_wll_Event e2; edg_wll_ResetError(ctx); @@ -584,43 +583,23 @@ static int register_subjobs_embriotic(edg_wll_Context ctx,const edg_wll_RegJobEv ctx->p_tmp_timeout.tv_sec += e->nsubjobs; if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400; - memset(&e2,0,sizeof e2); - e2.type = EDG_WLL_EVENT_REGJOB; - memcpy(&e2.regJob.timestamp,&now,sizeof now); - e2.any.host = strdup(ctx->srvName); - e2.any.level = e->level; - e2.any.priority = e->priority; - e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL); - e2.any.user = strdup(e->user); - e2.any.source = e->source; - e2.any.src_instance = strdup(ctx->isProxy ? - "L&B proxy" : "L&B server"); - e2.regJob.ns = strdup(e->ns); - edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent); - e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE; - e2.regJob.jdl = strdup(""); - for (i=0; insubjobs; i++) { char *et,*ed,*job_s; - e2.any.jobId = subjobs[i]; subjobs[i] = NULL; - // XXX: timespamp each subjob with actual time?? - - if ((err = edg_wll_StepIntStateEmbriotic(ctx, &e2))) + if ((err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], e->jobId))) edg_wll_Error(ctx,&et,&ed); if (err) { - job_s = edg_wlc_JobIdUnparse(e2.any.jobId); + job_s = edg_wlc_JobIdUnparse(subjobs[i]); fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed); syslog(LOG_ERR,"%s: %s (%s)",job_s,et,ed); free(job_s); free(et); free(ed); edg_wll_ResetError(ctx); } - edg_wlc_JobIdFree(e2.any.jobId); + edg_wlc_JobIdFree(subjobs[i]); } - edg_wll_FreeEvent(&e2); free(subjobs); return edg_wll_Error(ctx,NULL,NULL);