static int store_user(edg_wll_Context,const char *,const char *);
static int store_job(edg_wll_Context,glite_jobid_const_t,const char *, int, int, int, int);
-#ifdef LB_BUF
-static int store_job_block(edg_wll_Context, glite_jobid_const_t, const char *, glite_lbu_bufInsert *, int, int);
-#endif
static int set_job_grey(edg_wll_Context ctx, char *jobid);
static int store_flesh(edg_wll_Context,edg_wll_Event *,const char *ulm, char *,int);
static int check_dup(edg_wll_Context,edg_wll_Event *);
return edg_wll_Error(ctx,NULL,NULL);
}
-#ifdef LB_BUF
-static int store_job_block(edg_wll_Context ctx,glite_jobid_const_t job,const char *userid, glite_lbu_bufInsert *bi, int proxy, int server)
-{
- char *jobstr = edg_wlc_JobIdUnparse(job);
- char *jobid = edg_wlc_JobIdGetUnique(job);
- char *row;
-
-/* debug Duplicate key on index: Duplicate entry '(nil)' for key 1
- */
-
- assert(!ctx->greyjobs); /* XXX: should not happen */
-
- if (jobid == NULL || jobstr == NULL)
- return edg_wll_SetError(ctx,EINVAL,"store_jobi_block()");
-
- edg_wll_ResetError(ctx);
-
- trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid,proxy,server);
- edg_wll_bufferedInsert(bi, row); // no need to free row
-
- free(jobstr);
- free(jobid);
- return edg_wll_Error(ctx,NULL,NULL);
-}
-#endif
static int set_job_grey(edg_wll_Context ctx, char *jobid)
{
return edg_wll_Error(ctx,NULL,NULL);
}
-#ifndef LB_DAG_EMBRIONIC
-int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
-{
- int i,err;
- edg_wlc_JobId *subjobs;
- struct timeval now;
-
- edg_wll_ResetError(ctx);
- if (e->nsubjobs == 0) return 0;
- if (e->nsubjobs < 0) return edg_wll_SetError(ctx,EINVAL,"negative number of subjobs");
-
- if ((err = edg_wll_GenerateSubjobIds(ctx,e->jobId,e->nsubjobs,e->seed,&subjobs)))
- return err;
-
- gettimeofday(&now,NULL);
-
-/* XXX: increase the overall request timeout. */
- ctx->p_tmp_timeout.tv_sec += e->nsubjobs;
- if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
-
- for (i=0; i<e->nsubjobs; i++) {
- edg_wll_Event e2;
- int seq;
- char *et,*ed,*job_s;
-
- memset(&e2,0,sizeof e2);
- e2.type = EDG_WLL_EVENT_REGJOB;
- e2.any.jobId = subjobs[i]; subjobs[i] = NULL;
- memcpy(&e2.regJob.timestamp,&now,sizeof now);
- e2.any.host = strdup(ctx->srvName);
- e2.any.level = e->level;
- e2.any.priority = e->priority;
- e2.any.seqcode = strdup(EDG_WLL_SEQ_NULL);
- e2.any.user = strdup(e->user);
- e2.any.source = e->source;
- e2.any.src_instance = strdup(ctx->isProxy ?
- "L&B proxy" : "L&B server");
- e2.regJob.ns = strdup(e->ns);
- edg_wlc_JobIdDup(e->jobId,&e2.regJob.parent);
- e2.regJob.jobtype = EDG_WLL_REGJOB_SIMPLE;
- e2.regJob.jdl = strdup("");
-
- switch (edg_wll_StoreEvent(ctx,&e2,NULL,&seq)) {
-
- case 0: break;
- /* maybe some non-ignorable errors should be handled here */
-
- default:
- edg_wll_Error(ctx,&et,&ed);
- job_s = edg_wlc_JobIdUnparse(e2.any.jobId);
- fprintf(stderr,"register subjob %s: %s (%s)\n",job_s,et,ed);
- syslog(LOG_ERR,"register subjob %s: %s (%s)",job_s,et,ed);
- free(job_s); free(et); free(ed);
- edg_wll_FreeEvent(&e2);
- edg_wll_ResetError(ctx);
- continue;
- }
-
- if (edg_wll_LockJob(ctx,e2.any.jobId)) {
- job_s = edg_wlc_JobIdUnparse(e2.any.jobId);
- fprintf(stderr,"lock job %s: %s (%s)\n",job_s,et,ed);
- syslog(LOG_ERR,"lock job %s: %s (%s)",job_s,et,ed);
- free(job_s); free(et); free(ed);
- edg_wll_FreeEvent(&e2);
- edg_wll_ResetError(ctx);
- continue;
- }
-
- if ((err = edg_wll_StepIntState(ctx,e2.any.jobId,&e2,seq,NULL)))
- edg_wll_Error(ctx,&et,&ed);
-
- edg_wll_UnlockJob(ctx,e2.any.jobId);
- edg_wll_ResetError(ctx);
-
- if (err) {
- job_s = edg_wlc_JobIdUnparse(e2.any.jobId);
- fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed);
- syslog(LOG_ERR,"%s: %s (%s)",job_s,et,ed);
- free(job_s); free(et); free(ed);
- edg_wll_ResetError(ctx);
- }
-
- edg_wll_FreeEvent(&e2);
- }
-
- free(subjobs);
- return edg_wll_Error(ctx,NULL,NULL);
-}
-#endif
/*
struct timeval now;
char *jobid = NULL, *jobid_md5 = NULL, *jobid_md5_old = NULL;
size_t jobid_len;
-#ifdef LB_BUF
- glite_lbu_bufInsert bi_j;
- glite_lbu_bufInsert *bi_jobs = &bi_j;
- char *states_cols = NULL;
-#endif
glite_lbu_bufInsert bi_s, *bi_states = &bi_s;
char *icnames = NULL, *values = NULL, *userid = NULL, *stmt = NULL;
int server, proxy, membership = 0;
jobid_len = strlen(jobid_md5_old);
-#ifdef LB_BUF
- /* init multirows insert mechanism for tables used here */
- if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 4000, 1000,
- "jobid, dg_jobid, userid, proxy, server"))
- {
- return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
- }
-
- asprintf(&states_cols,"jobid, status, seq,int_status, version, parent_job%s", icnames);
- if (edg_wll_bufferedInsertInit(ctx, bi_states, NULL, "states", 4000, 1000, states_cols))
- {
- return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
- }
- free(states_cols);
-#endif
-
-
gettimeofday(&now,NULL);
/* increase the overall request timeout. */
char *et,*ed,*job_s,*p,*p1;
/* save jobid-userid relation into jobs table */
-#ifdef LB_BUF
- if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs, proxy, server)))
-#else
if ((err = store_job(ctx, subjobs[i], userid, proxy, server, 0, 0)))
-#endif
if (edg_wll_Error(ctx,&et,&ed) == EDEADLOCK) goto err;
/* interchange variable parts (jobids) in values */
if (sh) glite_lbu_FreeStmt(&sh);
free(stmt);
-#ifdef LB_BUF
- /* commit the rest of multirows insert and clean structures */
- edg_wll_bufferedInsertClose(bi_jobs);
- edg_wll_bufferedInsertClose(bi_states);
-#endif
-
return edg_wll_Error(ctx,NULL,NULL);
}