From 07ce665b9a5972cfab46c0e808555bd2a85eadd2 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ale=C5=A1=20K=C5=99enek?= Date: Fri, 23 Feb 2007 13:10:28 +0000 Subject: [PATCH] grey jobs --- org.glite.lb.server/config/glite-lb-dbsetup.sql | 9 ++++ org.glite.lb.server/src/bkserverd.c | 8 ++- org.glite.lb.server/src/jobstat.h | 1 + org.glite.lb.server/src/process_event.c | 1 + org.glite.lb.server/src/store.c.T | 67 +++++++++++++++++++++++-- 5 files changed, 81 insertions(+), 5 deletions(-) diff --git a/org.glite.lb.server/config/glite-lb-dbsetup.sql b/org.glite.lb.server/config/glite-lb-dbsetup.sql index 81266fe..6e512be 100644 --- a/org.glite.lb.server/config/glite-lb-dbsetup.sql +++ b/org.glite.lb.server/config/glite-lb-dbsetup.sql @@ -9,6 +9,15 @@ create table jobs ( index (userid) ) engine=innodb; +create table grey_jobs ( + jobid char(32) binary not null, + dg_jobid varchar(255) binary not null, + time_stamp datetime not null, + + primary key (jobid), + unique (dg_jobid) +) engine=innodb; + create table users ( userid char(32) binary not null, cert_subj varchar(255) binary not null, diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index 00ccabf..97d5484 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -117,6 +117,7 @@ static const int one = 1; static int noAuth = 0; static int noIndex = 0; static int strict_locking = 0; +static int greyjobs = 0; static int count_statistics = 0; static int hardJobsLimit = 0; static int hardEventsLimit = 0; @@ -182,10 +183,11 @@ static struct option opts[] = { {"perf-sink", 1, NULL, 'K'}, #endif {"transactions", 1, NULL, 'b'}, + {"greyjobs", 0, NULL, 'g'}, {NULL,0,NULL,0} }; -static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:" +static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:g" #ifdef GLITE_LB_SERVER_WITH_WS "w:" #endif @@ -234,6 +236,7 @@ static void usage(char *me) #ifdef LB_PERF "\t--perf-sink\t where to sink events\n" #endif + "\t-g,--greyjobs\t allow delayed registration (grey jobs), implies --strict-locking\n" ,me); } @@ -400,6 +403,8 @@ int main(int argc, char *argv[]) case 'K': sink_mode = atoi(optarg); break; #endif + case 'g': greyjobs = strict_locking = 1; + break; case '?': usage(name); return 1; } @@ -947,6 +952,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data) case 2: ctx->noIndex = 1; break; } ctx->strict_locking = strict_locking; + ctx->greyjobs = greyjobs; return 0; diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index 9fc07ab..f21bbdd 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -27,6 +27,7 @@ #define RET_BADBRANCH 7 #define RET_GOODBRANCH 8 #define RET_TOOOLD 9 +#define RET_UNREG 10 #define RET_INTERNAL 100 diff --git a/org.glite.lb.server/src/process_event.c b/org.glite.lb.server/src/process_event.c index 7133219..de0b038 100644 --- a/org.glite.lb.server/src/process_event.c +++ b/org.glite.lb.server/src/process_event.c @@ -52,6 +52,7 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char return processEvent_glite(js,e,ev_seq,strict,errstring); case EDG_WLL_STAT_PBS: return processEvent_PBS(js,e,ev_seq,strict,errstring); + case -1: return RET_UNREG; default: asprintf(errstring,"undefined job type %d",js->pub.jobtype); return RET_FAIL; diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index f24dd2f..cdaa0e4 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -36,12 +36,17 @@ static int store_user(edg_wll_Context,const char *,const char *); static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *); +#ifdef LB_BUF static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *); +#endif +static int store_job_grey(edg_wll_Context,const edg_wlc_JobId,time_t); static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int); static int store_seq(edg_wll_Context,edg_wll_Event *,int); static int check_dup(edg_wll_Context,edg_wll_Event *); static int check_auth(edg_wll_Context,edg_wll_Event *e); +#ifndef LB_DAG_EMBRIONIC static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *); +#endif static int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *); void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) { @@ -69,7 +74,16 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq) switch (err = check_auth(ctx,e)) { case 0: break; case ENOENT: - if ( !ctx->isProxy ) goto clean; + if ( !ctx->isProxy ) { + if (ctx->greyjobs) { + edg_wll_ResetError(ctx); + if (store_job_grey(ctx,e->any.jobId,e->any.timestamp.tv_sec)) + goto clean; + break; + } + else goto clean; + } + edg_wll_ResetError(ctx); lbproxy_notreg = 1; break; @@ -79,6 +93,8 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq) break; default: goto clean; } + +/* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */ if ((err = check_dup(ctx,e))) goto clean; userid = strdup(strmd5(e->any.user,NULL)); @@ -259,11 +275,19 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use edg_wll_ResetError(ctx); } free(stmt); + + if (ctx->greyjobs) { + trio_asprintf(&stmt,"delete from grey_jobs where jobid = '%|Ss'",jobid); + edg_wll_ExecStmt(ctx,stmt,NULL); /* XXX: error propagates */ + free(stmt); + } + free(jobstr); free(jobid); return edg_wll_Error(ctx,NULL,NULL); } +#ifdef LB_BUF static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi) { char *jobstr = edg_wlc_JobIdUnparse(job); @@ -272,6 +296,9 @@ static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const cha /* debug Duplicate key on index: Duplicate entry '(nil)' for key 1 */ + + assert(!ctx->greyjobs); /* XXX: should not happen */ + if (jobid == NULL || jobstr == NULL) return edg_wll_SetError(ctx,EINVAL,"store_jobi_block()"); @@ -284,6 +311,32 @@ static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const cha free(jobid); return edg_wll_Error(ctx,NULL,NULL); } +#endif + +static int store_job_grey(edg_wll_Context ctx,const edg_wlc_JobId job,time_t etime) +{ + char *jobstr = edg_wlc_JobIdUnparse(job); + char *jobid = edg_wlc_JobIdGetUnique(job); + char *stmt; + + if (jobid == NULL || jobstr == NULL) + return edg_wll_SetError(ctx,EINVAL,"store_job_grey()"); + + edg_wll_ResetError(ctx); + trio_asprintf(&stmt,"insert into grey_jobs(jobid,dg_jobid,time_stamp) " + "values ('%|Ss','%|Ss',%s)", + jobid,jobstr,edg_wll_TimeToDB(etime)); + + if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) { + if (edg_wll_Error(ctx,NULL,NULL) == EEXIST) + edg_wll_ResetError(ctx); + } + + free(stmt); + free(jobstr); + free(jobid); + return edg_wll_Error(ctx,NULL,NULL); +} /* * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated @@ -506,6 +559,7 @@ clean: return edg_wll_Error(ctx,NULL,NULL); } +#ifndef LB_DAG_EMBRIONIC static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) { int i,err; @@ -593,6 +647,7 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) free(subjobs); return edg_wll_Error(ctx,NULL,NULL); } +#endif /* @@ -648,10 +703,14 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv edg_wlc_JobId *subjobs; struct timeval now; char *userid = strdup(strmd5(e->user,NULL)); - char *jobid_md5, *jobid_md5_old, *states_cols; + char *jobid_md5, *jobid_md5_old; size_t jobid_len; - edg_wll_bufInsert bi_j, bi_s; - edg_wll_bufInsert *bi_jobs = &bi_j, *bi_states = &bi_s; +#ifdef LB_BUF + edg_wll_bufInsert bi_j; + edg_wll_bufInsert *bi_jobs = &bi_j; + char *states_cols; +#endif + edg_wll_bufInsert bi_s, *bi_states = &bi_s; char *icnames, *values; -- 1.8.2.3