From 68379b6b5b0e1ae132111ae4acb889e5f2f90f99 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Mon, 19 Jun 2006 16:02:32 +0000 Subject: [PATCH] embriotic clon of "register DAGs' subjobs" function chain - not really used yet, just for share and discussion --- org.glite.lb.server/interface/store.h | 7 ++ org.glite.lb.server/src/jobstat.c | 129 ++++++++++++++++++++++++++++++++++ org.glite.lb.server/src/store.c.T | 63 +++++++++++++++++ 3 files changed, 199 insertions(+) diff --git a/org.glite.lb.server/interface/store.h b/org.glite.lb.server/interface/store.h index 3466d05..f2d7dbf 100644 --- a/org.glite.lb.server/interface/store.h +++ b/org.glite.lb.server/interface/store.h @@ -33,6 +33,13 @@ edg_wll_ErrorCode edg_wll_StepIntState( edg_wll_JobStat * ); +/* create embriotic job state for DAGs' subjob */ + +edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic( + edg_wll_Context ctx, /* INOUT */ + edg_wll_Event *e /* IN */ +); + int db_store(edg_wll_Context,char *,char *); int handle_request(edg_wll_Context,char *); int create_reply(const edg_wll_Context,char **); diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index c485bd6..57f72f6 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -40,6 +40,7 @@ static char *job_owner(edg_wll_Context,char *); int edg_wll_intJobStatus(edg_wll_Context, const edg_wlc_JobId, int, intJobStat *, int); edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int); +static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context, intJobStat *); edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **); int js_enable_store = 1; @@ -69,6 +70,18 @@ static char* matched_substr(char *in, regmatch_t match) } +/* + * Returns encoded instatus for embrio DAG subjob + */ + +static char *states_values_embryotic( + edg_wlc_JobId jobid, + edg_wlc_JobId parent_job) +{ + return 0; +} + + int edg_wll_JobStatus( edg_wll_Context ctx, const edg_wlc_JobId job, @@ -358,6 +371,65 @@ int edg_wll_intJobStatus( } +static int edg_wll_intJobStatusEmbriotic( + edg_wll_Context ctx, + edg_wll_Event *e, + intJobStat *intstat, + int update_db) +{ + +/* Local variables */ + char *string_jobid; + + int intErr = 0; + int res; + int be_strict = 0; + char *errstring = NULL; + + +/* Processing */ + edg_wll_ResetError(ctx); + init_intJobStat(intstat); + +/* XXX: integrity check ?? can we skip? */ + string_jobid = edg_wlc_JobIdUnparse(e->any.jobId); + if (string_jobid == NULL || intstat == NULL) + return edg_wll_SetError(ctx,EINVAL, NULL); + free(string_jobid); + + intstat->pub.owner = strdup(e->any.user); + + res = processEvent(intstat, e, 0, be_strict, &errstring); + if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */ + intErr = 1; + } + + if (intstat->pub.state == EDG_WLL_JOB_UNDEF) { + intstat->pub.state = EDG_WLL_JOB_UNKNOWN; + } + + if (intErr) { + destroy_intJobStat(intstat); + return edg_wll_SetError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, NULL); + } else { + /* XXX intstat->pub.expectUpdate = eval_expect_update(intstat, &intstat->pub.expectFrom); */ + intErr = edg_wlc_JobIdDup(e->any.jobId, &intstat->pub.jobId); + if (!intErr) { + if (update_db) { + edg_wll_StoreIntStateEmbriotic(ctx, intstat); + /* recheck + * intJobStat *reread; + * edg_wll_LoadIntState(ctx, job, tsq, &reread); + * destroy_intJobStat(reread); + */ + } + } + return edg_wll_SetError(ctx, intErr, NULL); + } + +} + + /* * Helper for warning printouts */ @@ -553,6 +625,42 @@ cleanup: return edg_wll_Error(ctx,NULL,NULL); } + +static edg_wll_ErrorCode edg_wll_StoreIntStateEmbriotic(edg_wll_Context ctx, + intJobStat *stat) +{ + char *jobid_md5, *stat_enc, *parent_md5 = NULL; + char *stmt; + char *icnames, *icvalues; + + jobid_md5 = edg_wlc_JobIdGetUnique(stat->pub.jobId); + stat_enc = states_values_embryotic(stat->pub.jobId, stat->pub.parent_job);; + + parent_md5 = edg_wlc_JobIdGetUnique(stat->pub.parent_job); + if (parent_md5 == NULL) parent_md5 = strdup("*no parent job*"); + + + edg_wll_IColumnsSQLPart(ctx, ctx->job_index_cols, stat, 1, &icnames, &icvalues); + trio_asprintf(&stmt, + "insert into states" + "(jobid,status,seq,int_status,version" + ",parent_job%s) " + "values ('%|Ss',%d,%d,'%|Ss','%|Ss','%|Ss'%s)", + icnames, + jobid_md5, stat->pub.state, 0, stat_enc, + INTSTAT_VERSION, parent_md5, icvalues); + free(icnames); free(icvalues); + + if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) goto cleanup; + + +cleanup: + free(stmt); + free(jobid_md5); free(stat_enc); + free(parent_md5); + return edg_wll_Error(ctx,NULL,NULL); +} + /* * Retrieve stored job state from states and status_tags DB tables. * Should be called with the job locked. @@ -674,3 +782,24 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, } return edg_wll_Error(ctx, NULL, NULL); } + + +/* + * store embriotic state of DAGs' subjob + */ + +edg_wll_ErrorCode edg_wll_StepIntStateEmbriotic(edg_wll_Context ctx, + edg_wll_Event *e) +{ + intJobStat jobstat; + + + if (!edg_wll_intJobStatusEmbriotic(ctx, e, &jobstat, js_enable_store)) + { + edg_wll_UpdateStatistics(ctx, NULL, e, &jobstat.pub); + if (ctx->rgma_export) write2rgma_status(&jobstat.pub); + destroy_intJobStat(&jobstat); + } + + return edg_wll_Error(ctx, NULL, NULL); +} diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index dd49e07..cc131ed 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -40,6 +40,7 @@ static int store_seq(edg_wll_Context,edg_wll_Event *,int); static int check_dup(edg_wll_Context,edg_wll_Event *); static int check_auth(edg_wll_Context,edg_wll_Event *e); static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *); +static int register_subjobs_embriotic(edg_wll_Context,const edg_wll_RegJobEvent *); void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) { ctx->allowAnonymous = anon; @@ -552,6 +553,68 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) return edg_wll_Error(ctx,NULL,NULL); } +static int register_subjobs_embriotic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) +{ + int i,err; + edg_wlc_JobId *subjobs; + struct timeval now; + + edg_wll_ResetError(ctx); + if (e->nsubjobs == 0) return 0; + if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs"); + + if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs))) + return err; + + gettimeofday(&now,NULL); + +/* XXX: increase the overall request timeout. */ +/* XXX: smaller incerease may be added - e->nsubjobs/100 ? */ + ctx->p_tmp_timeout.tv_sec += e->nsubjobs; + if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400; + + for (i=0; insubjobs; i++) { + edg_wll_Event e2; + int seq; + char *et,*ed,*job_s; + +/* XXX: shloud be possible to move all static stuff out of the cycle + * and free event after the whole loop */ + memset(&e2,0,sizeof e2); + e2.type = EDG_WLL_EVENT_REGJOB; + e2.any.jobId = subjobs[i]; subjobs[i] = NULL; + memcpy(&e2.regJob.timestamp,&now,sizeof now); + e2.any.host = strdup(ctx->srvName); + e2.any.level = e->level; + e2.any.priority = e->priority; + e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL); + e2.any.user = strdup(e->user); + e2.any.source = e->source; + e2.any.src_instance = strdup(ctx->isProxy ? + "L&B proxy" : "L&B server"); + e2.regJob.ns = strdup(e->ns); + edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent); + e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE; + e2.regJob.jdl = strdup(""); + + + if ((err = edg_wll_StepIntStateEmbriotic(ctx, &e2))) + edg_wll_Error(ctx,&et,&ed); + + if (err) { + job_s = edg_wlc_JobIdUnparse(e2.any.jobId); + fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed); + syslog(LOG_ERR,"%s: %s (%s)",job_s,et,ed); + free(job_s); free(et); free(ed); + edg_wll_ResetError(ctx); + } + + edg_wll_FreeEvent(&e2); + } + + return edg_wll_Error(ctx,NULL,NULL); +} + int edg_wll_delete_event(edg_wll_Context ctx,const char *jobid,int event) { char *stmt; -- 1.8.2.3