From a70ccf29b481153acaed3eba6a0daa735364586f Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Thu, 14 Feb 2008 10:32:08 +0000 Subject: [PATCH] switch to transactions - compiles, but not completed yet - no tests, only for sharing --- org.glite.lb.server/config/glite-lb-dbsetup.sql | 13 +- org.glite.lb.server/interface/store.h | 9 +- org.glite.lb.server/src/db_calls.c | 41 +- org.glite.lb.server/src/db_calls.h | 7 +- org.glite.lb.server/src/db_store.c | 260 +++++------ org.glite.lb.server/src/jobstat.c | 394 ++++++++--------- org.glite.lb.server/src/jobstat.h | 2 +- org.glite.lb.server/src/load.c | 24 +- org.glite.lb.server/src/notification.c | 353 +++++++-------- org.glite.lb.server/src/purge.h | 2 +- org.glite.lb.server/src/query.c | 2 +- org.glite.lb.server/src/srv_purge.c | 362 ++++++++-------- org.glite.lb.server/src/store.c.T | 546 +++++++++--------------- org.glite.lb.server/src/userjobs.c | 14 +- 14 files changed, 963 insertions(+), 1066 deletions(-) diff --git a/org.glite.lb.server/config/glite-lb-dbsetup.sql b/org.glite.lb.server/config/glite-lb-dbsetup.sql index 530353b..7ff36cb 100644 --- a/org.glite.lb.server/config/glite-lb-dbsetup.sql +++ b/org.glite.lb.server/config/glite-lb-dbsetup.sql @@ -5,22 +5,14 @@ create table jobs ( aclid char(32) binary null, proxy bool not null, server bool not null, - + grey bool not null, + nevents int not null, primary key (jobid), unique (dg_jobid), index (userid) ) engine=innodb; -create table grey_jobs ( - jobid char(32) binary not null, - dg_jobid varchar(255) binary not null, - time_stamp datetime not null, - - primary key (jobid), - unique (dg_jobid) -) engine=innodb; - create table users ( userid char(32) binary not null, cert_subj varchar(255) binary not null, @@ -42,6 +34,7 @@ create table events ( arrived datetime not null, ulm mediumblob not null, -- testing (1) + seqcode varchar(255) binary not null, primary key (jobid,event), index (time_stamp), diff --git a/org.glite.lb.server/interface/store.h b/org.glite.lb.server/interface/store.h index d3ec61c..9518e00 100644 --- a/org.glite.lb.server/interface/store.h +++ b/org.glite.lb.server/interface/store.h @@ -45,8 +45,13 @@ int db_store(edg_wll_Context, char *); int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *); int handle_request(edg_wll_Context,char *); int create_reply(const edg_wll_Context,char **); -int is_job_local(edg_wll_Context, edg_wlc_JobId jobId); -int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event); +int is_job_local(edg_wll_Context, glite_jobid_const_t jobId); +int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP); +#ifndef LB_DAG_EMBRIONIC +int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *); +#endif +int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *); + int edg_wll_delete_event(edg_wll_Context,const char *, int); diff --git a/org.glite.lb.server/src/db_calls.c b/org.glite.lb.server/src/db_calls.c index 203ac2b..970ce55 100644 --- a/org.glite.lb.server/src/db_calls.c +++ b/org.glite.lb.server/src/db_calls.c @@ -9,11 +9,11 @@ #include "glite/lb/context-int.h" #include "db_calls.h" -# include "db_supp.h" +#include "db_supp.h" /** Returns bitmask of job membership in common server/proxy database */ -int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job) +int edg_wll_jobMembership(edg_wll_Context ctx, glite_jobid_const_t job) { char *dbjob; char *stmt = NULL; @@ -25,7 +25,7 @@ int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job) dbjob = edg_wlc_JobIdGetUnique(job); - trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss'",dbjob); + trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss' for update",dbjob); ret = edg_wll_ExecSQL(ctx,stmt,&q); if (ret <= 0) { if (ret == 0) { @@ -53,3 +53,38 @@ clean: free(stmt); return(result); } + + +/* just lock one row corresponding to job in table jobs + * lock_mode: 0 = in share mode / 1 = for update + */ +int edg_wll_LockJobRow(edg_wll_Context ctx, glite_jobid_const_t job, int lock_mode) +{ + char *jobid_md5 = NULL; + char *stmt = NULL; + glite_lbu_Statement sh; + int nr; + + + edg_wll_ResetError(ctx); + jobid_md5 = edg_wlc_JobIdGetUnique(job); + + if (lock_mode) + trio_asprintf(&stmt, "select count(*) from jobs where jobid='%|Ss' for update", jobid_md5); + else + trio_asprintf(&stmt, "select count(*) from jobs where jobid='%|Ss' in share mode", jobid_md5); + + if ((nr = edg_wll_ExecSQL(ctx,stmt,&sh)) < 0) goto cleanup; + if (nr == 0) { + edg_wll_SetError(ctx,ENOENT,"no state in DB"); + goto cleanup; + } + +cleanup: + if (sh) glite_lbu_FreeStmt(&sh); + free(stmt); stmt = NULL; + free(jobid_md5); + + return edg_wll_Error(ctx, NULL, NULL); +} + diff --git a/org.glite.lb.server/src/db_calls.h b/org.glite.lb.server/src/db_calls.h index 1adcbcc..71a19da 100644 --- a/org.glite.lb.server/src/db_calls.h +++ b/org.glite.lb.server/src/db_calls.h @@ -6,6 +6,11 @@ #define DB_PROXY_JOB 1 #define DB_SERVER_JOB 2 -int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job); +int edg_wll_jobMembership(edg_wll_Context ctx, glite_jobid_const_t job); + +#define edg_wll_LockJobRowInShareMode(X,Y) edg_wll_LockJobRow(X,Y,0) +#define edg_wll_LockJobRowForUpdate(X,Y) edg_wll_LockJobRow(X,Y,1) +int edg_wll_LockJobRow(edg_wll_Context ctx, glite_jobid_const_t job, int lock_mode); + #endif /* GLITE_LB_LB_CALLS_H */ diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index a0d0a2f..2654e34 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -12,9 +12,9 @@ #include "glite/lb/lb_maildir.h" #include "purge.h" #include "store.h" -#include "lock.h" #include "il_lbproxy.h" #include "jobstat.h" +#include "db_supp.h" #ifdef LB_PERF #include "glite/lb/lb_perftest.h" @@ -22,32 +22,25 @@ #endif -/* XXX */ -#define use_db 1 - extern int unset_proxy_flag(edg_wll_Context, edg_wlc_JobId); extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *); -static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP); +static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int reg_to_JP); int db_store(edg_wll_Context ctx, char *event) { - edg_wll_Event *ev; - int seq, reg_to_JP = 0; - int err; - int local_job; + edg_wll_Event *ev = NULL; + int seq, reg_to_JP = 0, local_job; edg_wll_JobStat newstat; - ev = NULL; - edg_wll_ResetError(ctx); memset(&newstat,0,sizeof newstat); - if(edg_wll_ParseEvent(ctx, event, &ev)) - goto err; + if(edg_wll_ParseEvent(ctx, event, &ev)) goto err; + local_job = is_job_local(ctx, ev->any.jobId); #ifdef LB_PERF @@ -59,101 +52,86 @@ db_store(edg_wll_Context ctx, char *event) } #endif - if(use_db) { - char *ed; - int code; - - if (edg_wll_LockJob(ctx,ev->any.jobId)) goto err; - store_job_server_proxy(ctx, ev); - code = edg_wll_Error(ctx,NULL,&ed); - edg_wll_UnlockJob(ctx,ev->any.jobId); /* XXX: ignore error */ - if (code) { - edg_wll_SetError(ctx,code,ed); - free(ed); - goto err; - } - } + do { + if (edg_wll_Transaction(ctx)) goto err; + + if (store_job_server_proxy(ctx, ev, ®_to_JP)) goto rollback; + /* events logged to proxy and server (DIRECT flag) may be ignored on proxy + * if jobid prefix hostname matches server hostname -> they will + * sooner or later arrive to server too and are stored in common DB + */ + if (ctx->isProxy && local_job && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT)) { + goto commit; + } - /* events logged to proxy and server (DIRECT flag) may be ignored on proxy - * if jobid prefix hostname matches server hostname -> they will - * sooner or later arrive to server too and are stored in common DB - */ - if (ctx->isProxy && local_job) { - if (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) { - edg_wll_FreeEvent(ev); - free(ev); - return 0; + if (edg_wll_StoreEvent(ctx, ev, event, &seq)) goto rollback; + + if ( ev->any.type == EDG_WLL_EVENT_CHANGEACL ) { + if (edg_wll_UpdateACL(ctx, ev->any.jobId, + ev->changeACL.user_id, ev->changeACL.user_id_type, + ev->changeACL.permission, ev->changeACL.permission_type, + ev->changeACL.operation)) goto rollback; + } else { - /* these are re-registrations of subjobs on proxy */ - /* embryonic registrations does not trigger registration in JP */ - reg_to_JP = 1; +#ifdef LB_PERF + if(sink_mode == GLITE_LB_SINK_STATE) { + glite_wll_perftest_consumeEvent(ev); + goto commit; + } +#endif + + if ( newstat.state ) { /* prevent memleaks in case of transaction retry */ + edg_wll_FreeStatus(&newstat); + newstat.state = EDG_WLL_JOB_UNDEF; + } + if (edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, &newstat)) goto rollback; + + if (newstat.remove_from_proxy) + if (edg_wll_PurgeServerProxy(ctx, ev->any.jobId)) goto rollback; } - } - /* XXX: if event type is user tag, convert the tag name to lowercase! - * (not sure whether to convert a value too is reasonable - * or keep it 'case sensitive') - */ - if ( ev->any.type == EDG_WLL_EVENT_USERTAG ) - { - int i; - for ( i = 0; ev->userTag.name[i] != '\0'; i++ ) - ev->userTag.name[i] = tolower(ev->userTag.name[i]); - } - - if(use_db) { - if (ctx->strict_locking && edg_wll_LockJob(ctx,ev->any.jobId)) goto err; - if(edg_wll_StoreEvent(ctx, ev, event, &seq)) { - edg_wll_UnlockJob(ctx,ev->any.jobId); - goto err; - } - } +commit: +rollback:; + } while (edg_wll_TransNeedRetry(ctx)); + + if (edg_wll_Error(ctx, NULL, NULL)) goto err; - if (!ctx->strict_locking && edg_wll_LockJob(ctx,ev->any.jobId)) goto err; - if ( ev->any.type == EDG_WLL_EVENT_CHANGEACL ) { - err = edg_wll_UpdateACL(ctx, ev->any.jobId, - ev->changeACL.user_id, ev->changeACL.user_id_type, - ev->changeACL.permission, ev->changeACL.permission_type, - ev->changeACL.operation); + do { + if (edg_wll_Transaction(ctx)) goto err; - edg_wll_UnlockJob(ctx,ev->any.jobId); - } - else { -#ifdef LB_PERF - if(sink_mode == GLITE_LB_SINK_STATE) { - glite_wll_perftest_consumeEvent(ev); - edg_wll_UnlockJob(ctx,ev->any.jobId); - goto err; - } + if (ev->any.type == EDG_WLL_EVENT_REGJOB && + (ev->regJob.jobtype == EDG_WLL_REGJOB_DAG || + ev->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED || + ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) && + ev->regJob.nsubjobs > 0) + +#ifdef LB_DAG_EMBRIONIC + if (register_subjobs_embryonic(ctx,&ev->regJob)) goto rollback2; +#else + if (register_subjobs(ctx,&ev->regJob)) goto rollback2; #endif - err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, &newstat); - } +rollback2:; + } while (edg_wll_TransNeedRetry(ctx)); - /* XXX: in edg_wll_StepIntState() - * if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err; - */ - if (err) goto err; + if (edg_wll_Error(ctx, NULL, NULL)) goto err; - db_store_finalize(ctx, event, ev, &newstat, seq, reg_to_JP); -err: + db_store_finalize(ctx, event, ev, &newstat, reg_to_JP); - if(ev) { - edg_wll_FreeEvent(ev); - free(ev); - } +err: + if(ev) { edg_wll_FreeEvent(ev); free(ev); } if ( newstat.state ) edg_wll_FreeStatus(&newstat); - return edg_wll_Error(ctx,NULL,NULL); } + /* Called only when CollectionStateEvent generated */ int db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is) @@ -167,7 +145,7 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is) edg_wll_ResetError(ctx); memset(&newstat,0,sizeof newstat); - /* Locked from load_parent_intJobStat() */ + /* Transaction opened from db_store */ #ifdef LB_PERF if (sink_mode == GLITE_LB_SINK_STORE) { @@ -181,10 +159,8 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is) assert(ev->any.user); - if(use_db) { if(edg_wll_StoreEvent(ctx, ev, NULL, &seq)) goto err; - } #ifdef LB_PERF if(sink_mode == GLITE_LB_SINK_STATE) { @@ -202,7 +178,7 @@ db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is) assert(event); } - db_store_finalize(ctx, event, ev, &newstat, seq, 0); + db_store_finalize(ctx, event, ev, &newstat, 0); err: @@ -213,42 +189,33 @@ err: } - -static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP) +/* Send regitration to JP + */ +static int register_to_JP(edg_wll_Context ctx, edg_wll_Event *ev) { - int local_job = is_job_local(ctx, ev->any.jobId); - - -#ifdef LB_PERF - if( sink_mode == GLITE_LB_SINK_SEND ) { - glite_wll_perftest_consumeEvent(ev); - return edg_wll_Error(ctx,NULL,NULL); - } -#endif + char *jids, *msg; - /* Send regitration to JP - */ - if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0 && - (!ctx->isProxy || reg_to_JP) ) { - char *jids, *msg; - - if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) { - return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP"); - } - if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) { - free(jids); - return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP"); - } - strcat(msg, "\n"); - strcat(msg, ev->any.user); - if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) { - free(msg); - return edg_wll_SetError(ctx, errno, lbm_errdesc); - } + if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) { + return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP"); + } + if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) { + free(jids); + return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP"); + } + strcat(msg, "\n"); + strcat(msg, ev->any.user); + if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) { free(msg); + return edg_wll_SetError(ctx, errno, lbm_errdesc); } + free(msg); + + return edg_wll_Error(ctx,NULL,NULL); +} +static int forward_event_to_server(edg_wll_Context ctx, char *event, edg_wll_Event *ev, int local_job) +{ if ( ctx->isProxy ) { /* * send event to the proper BK server @@ -266,32 +233,41 @@ static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error."); } } - else { - /* event will not arrive to server, only flag was set */ - /* check whether some pending notifications are not triggered */ - if ( newstat->state ) { + } + + return edg_wll_Error(ctx,NULL,NULL); +} + + +static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int reg_to_JP) +{ + int local_job = is_job_local(ctx, ev->any.jobId); + + +#ifdef LB_PERF + if( sink_mode == GLITE_LB_SINK_SEND ) { + glite_wll_perftest_consumeEvent(ev); + return edg_wll_Error(ctx,NULL,NULL); + } +#endif + + if (reg_to_JP) + if (register_to_JP(ctx,ev)) goto err; + + if (forward_event_to_server(ctx, event, ev, local_job)) goto err; + + if (newstat->state) { + if ( ctx->isProxy ) { + if ((ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) || local_job) + /* event will not arrive to server, only flag was set */ + /* check whether some pending notifications are not triggered */ edg_wll_NotifMatch(ctx, newstat); } - } - - /* LB proxy purge */ - if (newstat->remove_from_proxy) { - edg_wll_PurgeServerProxy(ctx, ev->any.jobId); - } - } else - { - /* Purge proxy flag */ - if ( newstat->remove_from_proxy && local_job ) { - if (unset_proxy_flag(ctx, ev->any.jobId) < 0) { - return(edg_wll_Error(ctx,NULL,NULL)); - } - } - - if ( newstat->state ) { - edg_wll_NotifMatch(ctx, newstat); + else { + edg_wll_NotifMatch(ctx, newstat); } } - +err: return edg_wll_Error(ctx,NULL,NULL); } diff --git a/org.glite.lb.server/src/jobstat.c b/org.glite.lb.server/src/jobstat.c index 42192dd..b02e79c 100644 --- a/org.glite.lb.server/src/jobstat.c +++ b/org.glite.lb.server/src/jobstat.c @@ -22,9 +22,13 @@ #include "lb_authz.h" #include "stats.h" #include "db_supp.h" +#include "db_calls.h" #define DAG_ENABLE 1 +#define DONT_LOCK 0 +#define LOCK 1 + /* TBD: share in whole logging or workload */ #ifdef __GNUC__ #define UNUSED_VAR __attribute__((unused)) @@ -74,245 +78,256 @@ int edg_wll_JobStatusServer( { /* Local variables */ - char *string_jobid; - char *md5_jobid; + char *string_jobid = NULL; + char *md5_jobid = NULL; intJobStat jobstat; intJobStat *ijsp; - int intErr = 0; - int lockErr; + int whole_cycle; edg_wll_Acl acl = NULL; #if DAG_ENABLE char *stmt = NULL; #endif - char *errdesc = NULL; - //The following declarations have originally been positioned in the funcion's code - //That was rather messy and lead to redeclaratios :-( char *stat_str, *s_out; intJobStat *js; char *out[1]; - glite_lbu_Statement sh; + glite_lbu_Statement sh = NULL; int num_sub, num_f, i, ii; + edg_wll_ResetError(ctx); + jobstat.pub.owner = NULL; string_jobid = edg_wlc_JobIdUnparse(job); if (string_jobid == NULL || stat == NULL) return edg_wll_SetError(ctx,EINVAL, NULL); md5_jobid = edg_wlc_JobIdGetUnique(job); - if ( !(jobstat.pub.owner = job_owner(ctx,md5_jobid)) ) { - free(md5_jobid); - free(string_jobid); - return edg_wll_Error(ctx,NULL,NULL); - } + do { + whole_cycle = 0; - intErr = edg_wll_GetACL(ctx, job, &acl); - if (intErr) { - free(md5_jobid); - free(string_jobid); - free(jobstat.pub.owner); - return edg_wll_Error(ctx,NULL,NULL); - } + if (edg_wll_Transaction(ctx)) goto rollback; + if (edg_wll_LockJobRowInShareMode(ctx, job)) goto rollback;; - /* authorization check */ - if ( !(ctx->noAuth) && - (!(ctx->peerName) || !edg_wll_gss_equal_subj(ctx->peerName, jobstat.pub.owner))) { - intErr = (acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ); - if (intErr) { - free(string_jobid); - free(md5_jobid); - free(jobstat.pub.owner); jobstat.pub.owner = NULL; - if (acl) { - edg_wll_FreeAcl(acl); - return edg_wll_Error(ctx, NULL, NULL); - } else { - return edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set"); - } - } - } - intErr = edg_wll_LoadIntState(ctx, job, -1 /*all events*/, &ijsp); - if (!intErr) { - *stat = ijsp->pub; - free(jobstat.pub.owner); jobstat.pub.owner = NULL; - destroy_intJobStat_extension(ijsp); - free(ijsp); + if (edg_wll_GetACL(ctx, job, &acl)) goto rollback; - } else { - lockErr = edg_wll_LockJob(ctx,job); - intErr = edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store && !lockErr); - if (intErr) edg_wll_Error(ctx, NULL, &errdesc); - if (!lockErr) { - edg_wll_UnlockJob(ctx,job); + /* authorization check */ + if ( !(ctx->noAuth) && + (!(ctx->peerName) || !edg_wll_gss_equal_subj(ctx->peerName, jobstat.pub.owner))) { + if ((acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ)) { + if (acl) { + goto rollback; + } else { + edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set"); + goto rollback; + } + } } - - *stat = jobstat.pub; - if (intErr) edg_wll_FreeStatus(&jobstat.pub); - destroy_intJobStat_extension(&jobstat); - } - - if (intErr) { - free(string_jobid); - free(md5_jobid); - if (acl) edg_wll_FreeAcl(acl); - edg_wll_SetError(ctx, intErr, errdesc); - free(errdesc); - return edg_wll_UpdateError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, "Could not compute job status from events"); - } - if (acl) { - stat->acl = strdup(acl->string); - edg_wll_FreeAcl(acl); - } + if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, -1 /*all events*/, &ijsp)) { + *stat = ijsp->pub; + free(jobstat.pub.owner); jobstat.pub.owner = NULL; + destroy_intJobStat_extension(ijsp); + free(ijsp); + } else { + if (edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store)) { + edg_wll_UpdateError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, "Could not compute job status from events"); + goto rollback; + } + *stat = jobstat.pub; + } + + if (acl) { + stat->acl = strdup(acl->string); + edg_wll_FreeAcl(acl); + } - if ((flags & EDG_WLL_STAT_CLASSADS) == 0) { - char *null = NULL; + if ((flags & EDG_WLL_STAT_CLASSADS) == 0) { + char *null = NULL; - mov(stat->jdl, null); - mov(stat->matched_jdl, null); - mov(stat->condor_jdl, null); - mov(stat->rsl, null); - } + mov(stat->jdl, null); + mov(stat->matched_jdl, null); + mov(stat->condor_jdl, null); + mov(stat->rsl, null); + } -#if DAG_ENABLE - if (stat->jobtype == EDG_WLL_STAT_DAG || stat->jobtype == EDG_WLL_STAT_COLLECTION) { + #if DAG_ENABLE + if (stat->jobtype == EDG_WLL_STAT_DAG || stat->jobtype == EDG_WLL_STAT_COLLECTION) { -// XXX: The users does not want any histogram. What do we do about it? -// if ((!(flags & EDG_WLL_STAT_CHILDHIST_FAST))&&(!(flags & EDG_WLL_STAT_CHILDHIST_THOROUGH))) { /* No Histogram */ -// if (stat->children_hist != NULL) { /* No histogram will be sent even if there was one */ -// -// printf("\nNo Histogram required\n\n"); -// -// free(stat->children_hist); -// } -// -// } + // XXX: The users does not want any histogram. What do we do about it? + // if ((!(flags & EDG_WLL_STAT_CHILDHIST_FAST))&&(!(flags & EDG_WLL_STAT_CHILDHIST_THOROUGH))) { /* No Histogram */ + // if (stat->children_hist != NULL) { /* No histogram will be sent even if there was one */ + // + // printf("\nNo Histogram required\n\n"); + // + // free(stat->children_hist); + // } + // + // } - if (flags & EDG_WLL_STAT_CHILDSTAT) { + if (flags & EDG_WLL_STAT_CHILDSTAT) { - trio_asprintf(&stmt, "SELECT int_status FROM states WHERE parent_job='%|Ss'" - " AND version='%|Ss'", - md5_jobid, INTSTAT_VERSION); - if (stmt != NULL) { - num_sub = edg_wll_ExecSQL(ctx, stmt, &sh); - if (num_sub >=0 ) { - i = 0; - stat->children_states = calloc(num_sub+1, sizeof(edg_wll_JobStat)); - if (stat->children_states == NULL) { - glite_lbu_FreeStmt(&sh); - goto dag_enomem; - } - while ((num_f = edg_wll_FetchRow(ctx, sh, 1, NULL, &stat_str)) == 1 - && i < num_sub) { - js = dec_intJobStat(stat_str, &s_out); - if (s_out != NULL && js != NULL) { - stat->children_states[i] = js->pub; - destroy_intJobStat_extension(js); - free(js); - i++; // Careful, this value will also be used further + trio_asprintf(&stmt, "SELECT int_status FROM states WHERE parent_job='%|Ss'" + " AND version='%|Ss'", + md5_jobid, INTSTAT_VERSION); + if (stmt != NULL) { + num_sub = edg_wll_ExecSQL(ctx, stmt, &sh); + if (num_sub >=0 ) { + i = 0; + stat->children_states = calloc(num_sub+1, sizeof(edg_wll_JobStat)); + if (stat->children_states == NULL) { + edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_states failed!"); + goto rollback; } - free(stat_str); + while ((num_f = edg_wll_FetchRow(ctx, sh, 1, NULL, &stat_str)) == 1 + && i < num_sub) { + js = dec_intJobStat(stat_str, &s_out); + if (s_out != NULL && js != NULL) { + stat->children_states[i] = js->pub; + destroy_intJobStat_extension(js); + free(js); + i++; // Careful, this value will also be used further + } + free(stat_str); + } + if (num_f < 0) goto rollback; + + glite_lbu_FreeStmt(&sh); sh = NULL; } - glite_lbu_FreeStmt(&sh); + else goto rollback; + + free(stmt); stmt = NULL; + } else { + edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!"); + goto rollback; } - free(stmt); - } else goto dag_enomem; - } + } - if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) { /* Full (thorough) Histogram */ + if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) { /* Full (thorough) Histogram */ - if (stat->children_hist == NULL) { - stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int)); - stat->children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES; - } - else { - /* If hist is loaded, it probably contain only incomplete histogram - * built in update_parent_status. Count it from scratch...*/ - for (ii=1; ii<=EDG_WLL_NUMBER_OF_STATCODES; ii++) - stat->children_hist[ii] = 0; - } + if (stat->children_hist == NULL) { + stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int)); + if (stat->children_hist == NULL) { + edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_hist failed!"); + goto rollback; + } - if (flags & EDG_WLL_STAT_CHILDSTAT) { // Job states have already been loaded - for ( ii = 0 ; ii < i ; ii++ ) { - stat->children_hist[(stat->children_states[ii].state)+1]++; + stat->children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES; } - } - else { - // Get child states from the database - trio_asprintf(&stmt, "SELECT status FROM states WHERE parent_job='%|Ss' AND version='%|Ss'", - md5_jobid, INTSTAT_VERSION); - out[1] = NULL; - if (stmt != NULL) { - num_sub = edg_wll_ExecSQL(ctx, stmt, &sh); - if (num_sub >=0 ) { - while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) { - num_f = atoi(out[0]); - if (num_f > EDG_WLL_JOB_UNDEF && num_f < EDG_WLL_NUMBER_OF_STATCODES) - stat->children_hist[num_f+1]++; - free(out[0]); + else { + /* If hist is loaded, it probably contain only incomplete histogram + * built in update_parent_status. Count it from scratch...*/ + for (ii=1; ii<=EDG_WLL_NUMBER_OF_STATCODES; ii++) + stat->children_hist[ii] = 0; + } + + if (flags & EDG_WLL_STAT_CHILDSTAT) { // Job states have already been loaded + for ( ii = 0 ; ii < i ; ii++ ) { + stat->children_hist[(stat->children_states[ii].state)+1]++; + } + } + else { + // Get child states from the database + trio_asprintf(&stmt, "SELECT status FROM states WHERE parent_job='%|Ss' AND version='%|Ss'", + md5_jobid, INTSTAT_VERSION); + out[1] = NULL; + if (stmt != NULL) { + num_sub = edg_wll_ExecSQL(ctx, stmt, &sh); + if (num_sub >=0 ) { + while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) { + num_f = atoi(out[0]); + if (num_f > EDG_WLL_JOB_UNDEF && num_f < EDG_WLL_NUMBER_OF_STATCODES) + stat->children_hist[num_f+1]++; + free(out[0]); + } + if (num_f < 0) goto rollback; + + glite_lbu_FreeStmt(&sh); sh = NULL; } - glite_lbu_FreeStmt(&sh); + else goto rollback; + + free(stmt); stmt = NULL; + } else { + edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!"); + goto rollback; } - free(stmt); - } else goto dag_enomem; - } - } - else { - if (flags & EDG_WLL_STAT_CHILDHIST_FAST) { /* Fast Histogram */ - - if (stat->children_hist == NULL) { - // If the histogram exists, assume that it was already filled during job state retrieval - stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int)); - edg_wll_GetSubjobHistogram(ctx, job, stat->children_hist); } } else { - if (stat->children_hist) { - free (stat->children_hist); - stat->children_hist = NULL; + if (flags & EDG_WLL_STAT_CHILDHIST_FAST) { /* Fast Histogram */ + + if (stat->children_hist == NULL) { + // If the histogram exists, assume that it was already filled during job state retrieval + stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int)); + if (stat->children_hist == NULL) { + edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_hist failed!"); + goto rollback; + } + + if (edg_wll_GetSubjobHistogram(ctx, job, stat->children_hist)) + goto rollback; + } + } + else { + if (stat->children_hist) { + free (stat->children_hist); + stat->children_hist = NULL; + } } + } - } + if (flags & EDG_WLL_STAT_CHILDREN) { - if (flags & EDG_WLL_STAT_CHILDREN) { + trio_asprintf(&stmt, "SELECT j.dg_jobid FROM states s,jobs j " + "WHERE s.parent_job='%|Ss' AND s.version='%|Ss' AND s.jobid=j.jobid", + md5_jobid, INTSTAT_VERSION); + if (stmt != NULL) { + num_sub = edg_wll_ExecSQL(ctx, stmt, &sh); + if (num_sub >=0 ) { + while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) { + add_stringlist(&stat->children, out[0]); + free(out[0]); + } + if (num_f < 0) goto rollback; - trio_asprintf(&stmt, "SELECT j.dg_jobid FROM states s,jobs j " - "WHERE s.parent_job='%|Ss' AND s.version='%|Ss' AND s.jobid=j.jobid", - md5_jobid, INTSTAT_VERSION); - if (stmt != NULL) { - num_sub = edg_wll_ExecSQL(ctx, stmt, &sh); - if (num_sub >=0 ) { - while (edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out) == 1 ) { - add_stringlist(&stat->children, out[0]); - free(out[0]); + glite_lbu_FreeStmt(&sh); sh = NULL; } - glite_lbu_FreeStmt(&sh); + else goto rollback; + + free(stmt); stmt = NULL; + } else { + edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!"); + goto rollback; } - free(stmt); - } else goto dag_enomem; + } + } +#endif + whole_cycle = 1; +commit: +rollback: + if (!whole_cycle) { + edg_wll_FreeStatus(&jobstat.pub); + jobstat.pub.owner = NULL; + destroy_intJobStat_extension(&jobstat); } + if (jobstat.pub.owner) { free(jobstat.pub.owner); jobstat.pub.owner = NULL; } + if (acl) { edg_wll_FreeAcl(acl); acl = NULL; } + if (stmt) { free(stmt); stmt = NULL; } + if (sh) { glite_lbu_FreeStmt(&sh); sh = NULL; } - } -#endif - free(string_jobid); - free(md5_jobid); - return edg_wll_Error(ctx, NULL, NULL); + } while (edg_wll_TransNeedRetry(ctx)); -#if DAG_ENABLE -dag_enomem: free(string_jobid); free(md5_jobid); - edg_wll_FreeStatus(stat); - free(stmt); - return edg_wll_SetError(ctx, ENOMEM, NULL); -#endif + + return edg_wll_Error(ctx, NULL, NULL); } int edg_wll_intJobStatus( @@ -667,6 +682,7 @@ cleanup: edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context ctx, edg_wlc_JobId jobid, + int lock, int seq, intJobStat **stat) { @@ -679,6 +695,10 @@ edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context ctx, edg_wll_ResetError(ctx); jobid_md5 = edg_wlc_JobIdGetUnique(jobid); + if (lock) { + edg_wll_LockJobRowForUpdate(ctx,jobid); + } + if (seq == -1) { /* any sequence number */ trio_asprintf(&stmt, @@ -712,7 +732,8 @@ edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context ctx, free(res); cleanup: free(jobid_md5); - free(stmt); glite_lbu_FreeStmt(&sh); + free(stmt); + if (sh) glite_lbu_FreeStmt(&sh); return edg_wll_Error(ctx,NULL,NULL); } @@ -755,9 +776,7 @@ static edg_wll_ErrorCode load_parent_intJobStat(edg_wll_Context ctx, intJobStat { if (*pis) return edg_wll_Error(ctx, NULL, NULL); // already loaded and locked - if (edg_wll_LockJob(ctx,cis->pub.parent_job)) goto err; - - if (edg_wll_LoadIntState(ctx, cis->pub.parent_job, - 1, pis)) + if (edg_wll_LoadIntState(ctx, cis->pub.parent_job, LOCK, - 1, pis)) goto err; assert(*pis); // deadlock would happen with next call of this function @@ -903,8 +922,6 @@ static edg_wll_ErrorCode update_parent_status(edg_wll_Context ctx, edg_wll_JobSt } err: - edg_wll_UnlockJob(ctx,cis->pub.parent_job); - if (pis) destroy_intJobStat(pis); @@ -986,7 +1003,7 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, memset(&oldstat,0,sizeof oldstat); - if (!edg_wll_LoadIntState(ctx, job, seq - 1, &ijsp)) { + if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, seq - 1, &ijsp)) { edg_wll_CpyStatus(&ijsp->pub,&oldstat); if (ctx->rgma_export) oldstat_rgmaline = write2rgma_statline(&ijsp->pub); @@ -994,11 +1011,9 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, res = processEvent(ijsp, e, seq, be_strict, &errstring); if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */ edg_wll_FreeStatus(&oldstat); - edg_wll_UnlockJob(ctx,job); /* XXX: error lost */ return edg_wll_SetError(ctx, EINVAL, errstring); } edg_wll_StoreIntState(ctx, ijsp, seq); - if (edg_wll_UnlockJob(ctx,job)) goto err; edg_wll_UpdateStatistics(ctx,&oldstat,e,&ijsp->pub); @@ -1028,7 +1043,6 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, Right approach is computing parent status from scratch. */ - if (edg_wll_UnlockJob(ctx,job)) goto err; edg_wll_UpdateStatistics(ctx,NULL,e,&jobstat.pub); if (ctx->rgma_export) write2rgma_status(&jobstat.pub); @@ -1039,7 +1053,7 @@ edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, } else destroy_intJobStat(&jobstat); } - else edg_wll_UnlockJob(ctx,job); + err: return edg_wll_Error(ctx, NULL, NULL); } diff --git a/org.glite.lb.server/src/jobstat.h b/org.glite.lb.server/src/jobstat.h index 964aa7a..7fba0fb 100644 --- a/org.glite.lb.server/src/jobstat.h +++ b/org.glite.lb.server/src/jobstat.h @@ -103,7 +103,7 @@ void destroy_intJobStat_extension(intJobStat *p); int edg_wll_intJobStatus( edg_wll_Context, glite_jobid_const_t, int, intJobStat *, int); edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int); edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, char *icnames, char *values, glite_lbu_bufInsert *bi); -edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **); +edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, int, intJobStat **); edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, edg_wlc_JobId job, edg_wll_Event *e, int seq, edg_wll_JobStat *stat_out); edg_wll_ErrorCode edg_wll_StepIntStateParent(edg_wll_Context,edg_wlc_JobId,edg_wll_Event *,int,intJobStat *,edg_wll_JobStat *); diff --git a/org.glite.lb.server/src/load.c b/org.glite.lb.server/src/load.c index 21d8e48..68073bb 100644 --- a/org.glite.lb.server/src/load.c +++ b/org.glite.lb.server/src/load.c @@ -32,7 +32,7 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, { int fd, reject_fd = -1, - readret, i; + readret, i, ret; size_t maxsize; char *line = NULL, buff[30]; @@ -48,9 +48,6 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, if ( (fd = open(req->server_file, O_RDONLY)) == -1 ) return edg_wll_SetError(ctx, errno, "Server can not open the file"); - if (edg_wll_Transaction(ctx) != 0) - return edg_wll_Error(ctx, NULL, NULL); - memset(result,0,sizeof(*result)); i = 0; while ( 1 ) @@ -58,7 +55,6 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, /* Read one line */ if ( (readret = read_line(&line, &maxsize, fd)) == -1 ) { - edg_wll_Rollback(ctx); return edg_wll_SetError(ctx, errno, "reading dump file"); } @@ -84,8 +80,15 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, result->to = event->any.arrived.tv_sec; } ctx->event_load = 1; - if ( edg_wll_StoreEvent(ctx, event, line, NULL) ) - { + + do { + if (edg_wll_Transaction(ctx)) goto err; + + ret = edg_wll_StoreEvent(ctx, event, line, NULL); + + } while (edg_wll_TransNeedRetry(ctx)); + + if (ret) { char *errdesc; int len = strlen(line), total = 0, @@ -127,8 +130,7 @@ int edg_wll_LoadEventsServer(edg_wll_Context ctx,const edg_wll_LoadRequest *req, } write(reject_fd,"\n",1); } - else - { + else { result->to = event->any.arrived.tv_sec; if ( jobid ) { @@ -156,6 +158,7 @@ cycle_clean: edg_wll_FreeEvent(event); } +err: if ( jobid ) { edg_wll_JobStat st; @@ -168,9 +171,6 @@ cycle_clean: if ( reject_fd != -1 ) close(reject_fd); - if (edg_wll_Commit(ctx) != 0) - return edg_wll_Error(ctx, NULL, NULL); - return edg_wll_Error(ctx,NULL,NULL); } diff --git a/org.glite.lb.server/src/notification.c b/org.glite.lb.server/src/notification.c index 6138881..074260c 100644 --- a/org.glite.lb.server/src/notification.c +++ b/org.glite.lb.server/src/notification.c @@ -107,48 +107,45 @@ int edg_wll_NotifNewServer( trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1); } - /* Format DB insert statement - */ - trio_asprintf(&q, - "insert into notif_registrations(notifid,destination,valid,userid,conditions) " - "values ('%|Ss','%|Ss',%s,'%|Ss', '%|Ss')", - nid_s, addr_s? addr_s: address_override, time_s, owner, xml_conds); + do { + if (edg_wll_Transaction(ctx) != 0) goto cleanup; - if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) - goto cleanup; + /* Format DB insert statement + */ + trio_asprintf(&q, + "insert into notif_registrations(notifid,destination,valid,userid,conditions) " + "values ('%|Ss','%|Ss',%s,'%|Ss', '%|Ss')", + nid_s, addr_s? addr_s: address_override, time_s, owner, xml_conds); - if (get_indexed_cols(ctx,nid_s,nconds,&add_index) || - (add_index && edg_wll_ExecSQL(ctx,add_index,NULL) < 0) - ) goto cleanup; + if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) + goto rollback; + if (get_indexed_cols(ctx,nid_s,nconds,&add_index) || + (add_index && edg_wll_ExecSQL(ctx,add_index,NULL) < 0) + ) goto rollback; - if (jobs) for ( i = 0; jobs[i]; i++ ) - { - free(q); - trio_asprintf(&q, - "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')", - nid_s, jobs[i]); - if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) + + if (jobs) for ( i = 0; jobs[i]; i++ ) { - /* XXX: Remove uncoplete registration? - * Which error has to be returned? - */ free(q); - trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s); - edg_wll_ExecSQL(ctx, q, NULL); - free(q); - trio_asprintf(&q, "delete from notif_registrations where notifid='%|Ss'", nid_s); - edg_wll_ExecSQL(ctx, q, NULL); - goto cleanup; + trio_asprintf(&q, + "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')", + nid_s, jobs[i]); + if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) + goto rollback; } - } - else { - trio_asprintf(&q,"insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')", - nid_s,NOTIF_ALL_JOBS); - if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) goto cleanup; + else { + trio_asprintf(&q,"insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')", + nid_s,NOTIF_ALL_JOBS); + if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) goto rollback; - } + } +rollback: + free(q); q= NULL; + free(add_index); add_index = NULL; + + } while (edg_wll_TransNeedRetry(ctx)); cleanup: if ( q ) free(q); @@ -176,57 +173,63 @@ int edg_wll_NotifBindServer( const char *address_override, time_t *valid) { - char *time_s = NULL, - *addr_s = NULL; + char *time_s = NULL, + *addr_s = NULL; if ( !address_override ) { edg_wll_SetError(ctx, EINVAL, "Address parameter not given"); - goto cleanup; + goto err; } + + do { + if (edg_wll_Transaction(ctx) != 0) goto err; - if ( check_notif_request(ctx, nid, NULL) ) - goto cleanup; - - /* Format time of validity - */ - *valid = time(NULL); - if ( ctx->peerProxyValidity - && (ctx->peerProxyValidity - *valid) < ctx->notifDuration ) - *valid = ctx->peerProxyValidity; - else - *valid += ctx->notifDuration; + if ( check_notif_request(ctx, nid, NULL) ) + goto rollback; - glite_lbu_TimeToDB(*valid, &time_s); - if ( !time_s ) - { - edg_wll_SetError(ctx, errno, "Formating validity time"); - goto cleanup; - } + /* Format time of validity + */ + *valid = time(NULL); + if ( ctx->peerProxyValidity + && (ctx->peerProxyValidity - *valid) < ctx->notifDuration ) + *valid = ctx->peerProxyValidity; + else + *valid += ctx->notifDuration; - /* Format the address - */ - if ( address_override ) - { - char *aux; + glite_lbu_TimeToDB(*valid, &time_s); + if ( !time_s ) + { + edg_wll_SetError(ctx, errno, "Formating validity time"); + goto rollback; + } - if ( !(aux = strchr(address_override, ':')) ) + /* Format the address + */ + if ( address_override ) { - edg_wll_SetError(ctx, EINVAL, "Addres overrirde not in format host:port"); - goto cleanup; + char *aux; + + if ( !(aux = strchr(address_override, ':')) ) + { + edg_wll_SetError(ctx, EINVAL, "Addres overrirde not in format host:port"); + goto rollback; + } + if ( !strncmp(address_override, "0.0.0.0", aux-address_override) ) + trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1); } - if ( !strncmp(address_override, "0.0.0.0", aux-address_override) ) - trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1); - } - update_notif(ctx, nid, NULL, addr_s? addr_s: address_override, (const char *)(time_s)); + update_notif(ctx, nid, NULL, addr_s? addr_s: address_override, (const char *)(time_s)); -cleanup: - if ( time_s ) free(time_s); - if ( addr_s ) free(addr_s); +rollback: + free(time_s); time_s = NULL; + free(addr_s); addr_s = NULL; + + } while (edg_wll_TransNeedRetry(ctx)); +err: return edg_wll_Error(ctx, NULL, NULL); } @@ -248,89 +251,94 @@ int edg_wll_NotifChangeServer( /* Format notification ID */ if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) ) - goto cleanup; + goto err; - if ( check_notif_request(ctx, nid, NULL) ) - goto cleanup; + do { + if (edg_wll_Transaction(ctx) != 0) goto err; - switch ( op ) - { - case EDG_WLL_NOTIF_REPLACE: - /* Format conditions - * - separate all jobids - * - format new condition list without jobids - */ - if ( split_cond_list(ctx, conditions, &nconds, &jobs) ) - goto cleanup; + if ( check_notif_request(ctx, nid, NULL) ) + goto rollback; - /* - * encode new cond. list into a XML string - */ - if ( edg_wll_JobQueryRecToXML(ctx, (edg_wll_QueryRec const * const *) nconds, &xml_conds) ) + switch ( op ) { - /* XXX: edg_wll_JobQueryRecToXML() do not set errors in context! - * can't get propper error number :( + case EDG_WLL_NOTIF_REPLACE: + /* Format conditions + * - separate all jobids + * - format new condition list without jobids */ - edg_wll_SetError(ctx, errno, "Can't encode data into xml"); - goto cleanup; - } + if ( split_cond_list(ctx, conditions, &nconds, &jobs) ) + goto rollback; - /* Format DB insert statement - */ - if ( update_notif(ctx, nid, xml_conds, NULL, NULL) ) - goto cleanup; + /* + * encode new cond. list into a XML string + */ + if ( edg_wll_JobQueryRecToXML(ctx, (edg_wll_QueryRec const * const *) nconds, &xml_conds) ) + { + /* XXX: edg_wll_JobQueryRecToXML() do not set errors in context! + * can't get propper error number :( + */ + edg_wll_SetError(ctx, errno, "Can't encode data into xml"); + goto rollback; + } - if ( jobs ) - { /* Format DB insert statement */ - trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s); - if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) - goto cleanup; + if ( update_notif(ctx, nid, xml_conds, NULL, NULL) ) + goto rollback; - for ( i = 0; jobs[i]; i++ ) + if ( jobs ) { - free(q); - trio_asprintf(&q, - "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')", - nid_s, jobs[i]); + /* Format DB insert statement + */ + trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s); if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) + goto rollback; + + for ( i = 0; jobs[i]; i++ ) { - /* XXX: Remove uncoplete registration? - * Which error has to be returned? - */ - free(q); - trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s); - edg_wll_ExecSQL(ctx, q, NULL); free(q); - trio_asprintf(&q,"delete from notif_registrations where notifid='%|Ss'", nid_s); - edg_wll_ExecSQL(ctx, q, NULL); - goto cleanup; + trio_asprintf(&q, + "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')", + nid_s, jobs[i]); + if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) + { + /* XXX: Remove uncoplete registration? + * Which error has to be returned? + */ + free(q); + trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s); + edg_wll_ExecSQL(ctx, q, NULL); + free(q); + trio_asprintf(&q,"delete from notif_registrations where notifid='%|Ss'", nid_s); + edg_wll_ExecSQL(ctx, q, NULL); + goto rollback; + } } } + break; + + case EDG_WLL_NOTIF_ADD: + break; + case EDG_WLL_NOTIF_REMOVE: + break; + default: + break; } - break; - - case EDG_WLL_NOTIF_ADD: - break; - case EDG_WLL_NOTIF_REMOVE: - break; - default: - break; - } -cleanup: - if ( q ) free(q); - if ( xml_conds ) free(xml_conds); - if ( nid_s ) free(nid_s); - if ( jobs ) - { - for ( i = 0; jobs[i]; i++ ) - free(jobs[i]); - free(jobs); - } - if ( nconds ) free(nconds); +rollback: + free(q); q = NULL; + free(xml_conds); xml_conds = NULL; + free(nid_s); nid_s = NULL; + if ( jobs ) { + for ( i = 0; jobs[i]; i++ ) + free(jobs[i]); + free(jobs); jobs = NULL; + } + free(nconds); nconds = NULL; + + } while (edg_wll_TransNeedRetry(ctx)); +err: return edg_wll_Error(ctx, NULL, NULL); } @@ -341,31 +349,36 @@ int edg_wll_NotifRefreshServer( { char *time_s = NULL; + do { + if (edg_wll_Transaction(ctx) != 0) goto err; - if ( check_notif_request(ctx, nid, NULL) ) - goto cleanup; + if ( check_notif_request(ctx, nid, NULL) ) + goto rollback; - /* Format time of validity - */ - *valid = time(NULL); - if ( ctx->peerProxyValidity - && (ctx->peerProxyValidity - *valid) < ctx->notifDuration ) - *valid = ctx->peerProxyValidity; - else - *valid += ctx->notifDuration; + /* Format time of validity + */ + *valid = time(NULL); + if ( ctx->peerProxyValidity + && (ctx->peerProxyValidity - *valid) < ctx->notifDuration ) + *valid = ctx->peerProxyValidity; + else + *valid += ctx->notifDuration; - glite_lbu_TimeToDB(*valid, &time_s); - if ( !time_s ) - { - edg_wll_SetError(ctx, errno, "Formating validity time"); - goto cleanup; - } + glite_lbu_TimeToDB(*valid, &time_s); + if ( !time_s ) + { + edg_wll_SetError(ctx, errno, "Formating validity time"); + goto rollback; + } - update_notif(ctx, nid, NULL, NULL, time_s); + update_notif(ctx, nid, NULL, NULL, time_s); -cleanup: - if ( time_s ) free(time_s); +rollback: + free(time_s); time_s = NULL; + } while (edg_wll_TransNeedRetry(ctx)); + +err: return edg_wll_Error(ctx, NULL, NULL); } @@ -375,27 +388,33 @@ int edg_wll_NotifDropServer( { char *nid_s = NULL, *stmt = NULL; - int ret; + + do { + if (edg_wll_Transaction(ctx) != 0) goto err; - if ( check_notif_request(ctx, nid, NULL) ) - goto cleanup; + if ( check_notif_request(ctx, nid, NULL) ) + goto rollback; - if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) ) - goto cleanup; + if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) ) + goto rollback; - trio_asprintf(&stmt, "delete from notif_registrations where notifid='%|Ss'", nid_s); - if ( (ret = edg_wll_ExecSQL(ctx, stmt, NULL)) < 0 ) - goto cleanup; - free(stmt); - trio_asprintf(&stmt, "delete from notif_jobs where notifid='%|Ss'", nid_s); - edg_wll_ExecSQL(ctx, stmt, NULL); - edg_wll_NotifCancelRegId(ctx, nid); + trio_asprintf(&stmt, "delete from notif_registrations where notifid='%|Ss'", nid_s); + if ( edg_wll_ExecSQL(ctx, stmt, NULL) < 0 ) + goto rollback; + free(stmt); + trio_asprintf(&stmt, "delete from notif_jobs where notifid='%|Ss'", nid_s); + if ( edg_wll_ExecSQL(ctx, stmt, NULL) < 0 ) + goto rollback; + edg_wll_NotifCancelRegId(ctx, nid); -cleanup: - if ( nid_s ) free(nid_s); - if ( stmt ) free(stmt); +rollback: + free(nid_s); nid_s = NULL; + free(stmt); stmt = NULL; + + } while (edg_wll_TransNeedRetry(ctx)); +err: return edg_wll_Error(ctx, NULL, NULL); } @@ -478,7 +497,7 @@ static int check_notif_request( trio_asprintf(&stmt, "select notifid from notif_registrations " - "where notifid='%|Ss' and userid='%|Ss'", + "where notifid='%|Ss' and userid='%|Ss' FOR UPDATE", nid_s, user); if ( (ret = edg_wll_ExecSQL(ctx, stmt, NULL)) < 0 ) diff --git a/org.glite.lb.server/src/purge.h b/org.glite.lb.server/src/purge.h index 697acdf..37e534e 100644 --- a/org.glite.lb.server/src/purge.h +++ b/org.glite.lb.server/src/purge.h @@ -19,7 +19,7 @@ int edg_wll_PurgeServer( */ int edg_wll_PurgeServerProxy( edg_wll_Context ctx, - edg_wlc_JobId job + glite_jobid_const_t job ); #define FILE_TYPE_ANY "" diff --git a/org.glite.lb.server/src/query.c b/org.glite.lb.server/src/query.c index 2ec8236..e0279de 100644 --- a/org.glite.lb.server/src/query.c +++ b/org.glite.lb.server/src/query.c @@ -211,7 +211,7 @@ int edg_wll_QueryEventsServer( } } - // Auth checked in edg_wll_JobStatus above + // Auth checked in edg_wll_JobStatusServer above if ( !(where_flags & FL_FILTER) && !noAuth ) { if (!ctx->peerName || (strcmp(res[1],peerid) && strcmp(res[1], can_peerid))) { diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index 47f1227..674b725 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -42,9 +42,9 @@ static const char* const resp_headers[] = { NULL }; -static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int); -int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job); -static int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job); +static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int); +int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job); +static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job); int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname) @@ -184,16 +184,16 @@ int edg_wll_CreateFileStorage(edg_wll_Context ctx, char *file_type, char *prefix return retfd; } -int edg_wll_PurgeServerProxy(edg_wll_Context ctx, edg_wlc_JobId job) +int edg_wll_PurgeServerProxy(edg_wll_Context ctx, glite_jobid_const_t job) { - switch ( purge_one(ctx, job, -1, 1) ) { + switch ( purge_one(ctx, job, -1, 1, 1) ) { case 0: case ENOENT: - edg_wll_ResetError(ctx); - return 0; - + return(edg_wll_ResetError(ctx)); + break; default: - return -1; + return(edg_wll_Error(ctx,NULL,NULL)); + break; } } @@ -252,7 +252,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request) parse = 1; } else { - switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE)) { + switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,0)) { case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) { result.jobs = realloc(result.jobs,(naffected_jobs+2) * sizeof(*result.jobs)); result.jobs[naffected_jobs] = strdup(request->jobs[i]); @@ -260,7 +260,7 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request) } naffected_jobs++; break; - case ENOENT: parse = 1; + case ENOENT: /* job does not exist, consider purged and ignore */ edg_wll_ResetError(ctx); break; default: goto abort; @@ -298,6 +298,11 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request) memset(&stat,0,sizeof stat); if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */ + if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { + /* job purged meanwhile, ignore */ + edg_wll_ResetError(ctx); + continue; + } edg_wll_FreeStatus(&stat); goto abort; } @@ -314,8 +319,13 @@ int edg_wll_PurgeServer(edg_wll_Context ctx,const edg_wll_PurgeRequest *request) if (now-stat.lastUpdateTime.tv_sec > timeout[i] && !check_strict_jobid(ctx,job)) { - if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE)) { + if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,0)) { edg_wll_FreeStatus(&stat); + if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { + /* job purged meanwhile, ignore */ + edg_wll_ResetError(ctx); + continue; + } goto abort; } @@ -424,214 +434,204 @@ static void unlock_and_check(edg_wll_Context ctx,edg_wlc_JobId job) } } +static int dump_events(edg_wll_Context ctx, glite_jobid_const_t job, int dump, char **res) +{ + edg_wll_Event e; + int event; + + + event = atoi(res[0]); + free(res[0]); res[0] = NULL; + + res[0] = edg_wlc_JobIdUnparse(job); + if (convert_event_head(ctx,res,&e) || edg_wll_get_event_flesh(ctx,event,&e)) + { + char *et,*ed, *dbjob; + int i; + -int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge) + /* Most likely sort of internal inconsistency. + * Must not be fatal -- just complain + */ + edg_wll_Error(ctx,&et,&ed); + dbjob = edg_wlc_JobIdGetUnique(job); + fprintf(stderr,"%s event %d: %s (%s)\n",dbjob,event,et,ed); + syslog(LOG_WARNING,"%s event %d: %s (%s)",dbjob,event,et,ed); + free(et); free(ed); free(dbjob); + for (i=0; iisProxy) { - /* should not happen */ - return 0; - } - /* continue */ - break; - case DB_SERVER_JOB: - if (ctx->isProxy) { - /* should not happen */ - return 0; - } - /* continue */ - break; - case DB_PROXY_JOB+DB_SERVER_JOB: - if (ctx->isProxy) { - purge = 0; - if (unset_proxy_flag(ctx, job) < 0) { - return(edg_wll_Error(ctx,NULL,NULL)); + do { + if (edg_wll_Transaction(ctx)) goto err; + + switch (edg_wll_jobMembership(ctx, job)) { + case DB_PROXY_JOB: + if (!ctx->isProxy) { + /* should not happen */ + goto commit; } - } - else { - purge = 0; - if (unset_server_flag(ctx, job) < 0) { - return(edg_wll_Error(ctx,NULL,NULL)); + /* continue */ + break; + case DB_SERVER_JOB: + if (ctx->isProxy) { + /* should not happen */ + goto commit; } - } - break; - case 0: - // Zombie job (server=0, proxy=0)? should not happen; - // clear it to keep DB healthy - break; - default: - return 0; - break; - } + /* continue */ + break; + case DB_PROXY_JOB+DB_SERVER_JOB: + if (ctx->isProxy) { + purge = 0; + if (unset_proxy_flag(ctx, job) < 0) { + goto rollback; + } + } + else { + purge = 0; + /* if server&proxy DB is shared ... */ + if (is_job_local(ctx,job) && purge_from_proxy_only) { + if (unset_proxy_flag(ctx, job) < 0) { + goto rollback; + } + } + else { + if (unset_server_flag(ctx, job) < 0) { + goto rollback; + } + } + } + break; + case 0: + // Zombie job (server=0, proxy=0)? should not happen; + // clear it to keep DB healthy + break; + default: + goto rollback; + break; + } - dbjob = edg_wlc_JobIdGetUnique(job); /* XXX: strict jobid already checked */ - if (edg_wll_LockJob(ctx,job)) goto clean; + dbjob = edg_wlc_JobIdGetUnique(job); /* XXX: strict jobid already checked */ - if ( purge ) - { - trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob); - ret = edg_wll_ExecSQL(ctx,stmt,NULL); - if (ret <= 0) { - unlock_and_check(ctx,job); - if (ret == 0) { - fprintf(stderr,"%s: no such job\n",dbjob); - edg_wll_SetError(ctx,ENOENT,dbjob); - } - goto clean; - } - free(stmt); stmt = NULL; + if ( purge ) + { + trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob); + if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback; + free(stmt); stmt = NULL; - trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob); - if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) { - unlock_and_check(ctx,job); - goto clean; + trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob); + if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback; + free(stmt); stmt = NULL; } - free(stmt); stmt = NULL; -/* Why on earth ? - trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob); - if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) { - unlock_and_check(ctx,job); - goto clean; + if ( purge ) + { + trio_asprintf(&stmt,"delete from status_tags where jobid = '%|Ss'",dbjob); + if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback; + free(stmt); stmt = NULL; } - free(stmt); stmt = NULL; -*/ - } - - if (!ctx->strict_locking) unlock_and_check(ctx,job); + if (dump >= 0) + trio_asprintf(&stmt, + "select event,code,prog,host,u.cert_subj,time_stamp,usec,level,arrived " + "from events e,users u " + "where e.jobid='%|Ss' " + "and u.userid=e.userid " + "order by event", dbjob); + else + trio_asprintf(&stmt,"select event from events " + "where jobid='%|Ss' " + "order by event", dbjob); - if ( purge ) - { - trio_asprintf(&stmt,"delete from status_tags where jobid = '%|Ss'",dbjob); - if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto unlock; + if (edg_wll_ExecSQL(ctx,stmt,&q) < 0) goto rollback; free(stmt); stmt = NULL; - } - - if (dump >= 0) - trio_asprintf(&stmt, - "select event,code,prog,host,u.cert_subj,time_stamp,usec,level,arrived " - "from events e,users u " - "where e.jobid='%|Ss' " - "and u.userid=e.userid " - "order by event", dbjob); - else - trio_asprintf(&stmt,"select event from events " - "where jobid='%|Ss' " - "order by event", dbjob); - -/* check for events repeatedly -- new one may have arrived in the meantime */ - while ((ret = edg_wll_ExecSQL(ctx,stmt,&q)) > 0) { - char *res[9]; dumped = 1; while ((ret = edg_wll_FetchRow(ctx,q,sizofa(res),NULL,res)) > 0) { int event; + + assert(ret == 9); event = atoi(res[0]); - free(res[0]); res[0] = NULL; - if (dump >= 0) { - edg_wll_Event e; + if (dump >= 0) + if (dump_events( ctx, job, dump, (char **) &res)) goto rollback; - assert(ret == 9); - res[0] = edg_wlc_JobIdUnparse(job); - if (convert_event_head(ctx,res,&e) || edg_wll_get_event_flesh(ctx,event,&e)) - { - char *et,*ed; - int i; - - /* Most likely sort of internal inconsistency. - * Must not be fatal -- just complain - */ - edg_wll_Error(ctx,&et,&ed); - fprintf(stderr,"%s event %d: %s (%s)\n",dbjob,event,et,ed); - syslog(LOG_WARNING,"%s event %d: %s (%s)",dbjob,event,et,ed); - free(et); free(ed); - for (i=0; istrict_locking) unlock_and_check(ctx,job); -clean: +err: free(dbjob); free(stmt); return edg_wll_Error(ctx,NULL,NULL); } -int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job) +int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job) { char *stmt = NULL; char *dbjob; @@ -646,7 +646,7 @@ int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job) } -int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job) +int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job) { char *stmt = NULL; char *dbjob; diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index 1bb3623..5610b8e 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -16,6 +16,7 @@ #include #include #include +#include #include "glite/jobid/strmd5.h" #include "glite/lbu/trio.h" @@ -31,37 +32,39 @@ #include "jobstat.h" #include "db_calls.h" #include "db_supp.h" +#include "index.h" static int store_user(edg_wll_Context,const char *,const char *); -static int store_job(edg_wll_Context,glite_jobid_const_t,const char *, int, int); +static int store_job(edg_wll_Context,glite_jobid_const_t,const char *, int, int, int, int); #ifdef LB_BUF static int store_job_block(edg_wll_Context, glite_jobid_const_t, const char *, glite_lbu_bufInsert *, int, int); #endif -static int store_job_grey(edg_wll_Context,glite_jobid_const_t,time_t); +static int set_job_grey(edg_wll_Context ctx, char *jobid); static int store_flesh(edg_wll_Context,edg_wll_Event *,const char *ulm, char *,int); -static int store_seq(edg_wll_Context,edg_wll_Event *,int); static int check_dup(edg_wll_Context,edg_wll_Event *); static int check_auth(edg_wll_Context,edg_wll_Event *e); -#ifndef LB_DAG_EMBRIONIC -static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *); -#endif -static int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *, const char *); +static void lowercase_usertag(edg_wll_Event *ev); void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) { ctx->allowAnonymous = anon; } + +/* !!! to be called from OPEN TRANSACTION only !!! + */ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int *seq) { - edg_wll_ErrorCode err = 0; - char *userid = NULL,*jobid,*stmt; - char *select_max,*ssrc; + char *userid, *jobid, *stmt, *ssrc, *now_s, *stamp, *dummy, *max; glite_lbu_Statement sh = NULL; - int next = 0xDEAD; - int lbproxy_notreg = 0; - char *now_s = NULL; + int next = 0xDEAD, nr; - ssrc = jobid = stmt = select_max = NULL; + + userid = ssrc = jobid = stmt = now_s = stamp = dummy = max = NULL; + + lowercase_usertag(e); + jobid = edg_wlc_JobIdGetUnique(e->any.jobId); + glite_lbu_TimeToDB(e->any.timestamp.tv_sec, &stamp); + ssrc = edg_wll_SourceToString(e->any.source); if ( ctx->event_load ) glite_lbu_TimeToDB(e->any.arrived.tv_sec, &now_s); @@ -69,21 +72,12 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int glite_lbu_TimeToDB(time(NULL), &now_s); edg_wll_ResetError(ctx); - switch (err = check_auth(ctx,e)) { + switch (check_auth(ctx,e)) { case 0: break; case ENOENT: - if ( !ctx->isProxy ) { - if (ctx->greyjobs) { - edg_wll_ResetError(ctx); - if (store_job_grey(ctx,e->any.jobId,e->any.timestamp.tv_sec)) - goto clean; - break; - } - else goto clean; - } - - edg_wll_ResetError(ctx); - lbproxy_notreg = 1; + /* job not registered */ + // should not happen, store_job_server_proxy() miscoded or going thu load? + goto clean; break; case EPERM: if (!ctx->noAuth) goto clean; @@ -92,11 +86,6 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int default: goto clean; } -/* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */ - if ((err = check_dup(ctx,e))) goto clean; - - jobid = edg_wlc_JobIdGetUnique(e->any.jobId); - trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid); if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(ctx,sh,1,NULL,&userid) < 0) goto clean; @@ -104,131 +93,69 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int free(stmt); stmt = NULL; -/* obtain next event sequence number */ - trio_asprintf(&select_max, - "select max(event) from events " - "where jobid = '%|Ss'",jobid); +/* check duplicity */ + trio_asprintf(&stmt, + "select arrived from events where jobid='%|Ss' and code='%d'" + " and prog='%|Ss' and host='%|Ss' and time_stamp='%s' and usec='%d'" + " and level='%d' and userid='%|Ss' and seqcode='%|Ss') ", + jobid, (int) e->any.type, + ssrc,e->any.host, + stamp,e->any.timestamp.tv_usec, + e->any.level,userid, e->any.seqcode); + + if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean; + nr = edg_wll_FetchRow(ctx,sh,1,NULL,&dummy); + if (sh) glite_lbu_FreeStmt(&sh); + free(stmt); stmt = NULL; + free(dummy); - ssrc = edg_wll_SourceToString(e->any.source); + if (nr < 0) goto clean; + if (nr > 0) { + /* possible duplicity (99%) */ + // XXX: check event flesh to be 100% sure + edg_wll_SetError(ctx,EEXIST,"duplicate event"); + goto clean; + } + /* else (nr == 0) -> unique event, continue */ + -/* try to insert (someone else may be doing the same) */ - { - char *max = NULL; +/* obtain number of stored events */ + trio_asprintf(&stmt, + "select nevents from jobs " + "where jobid = '%|Ss'",jobid); + + if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 || + edg_wll_FetchRow(ctx,sh,1,NULL,&max) < 0) goto clean; + glite_lbu_FreeStmt(&sh); + + next = (max && *max) ? atoi(max)+1 : 0; + free(max); - if (edg_wll_ExecSQL(ctx,select_max,&sh) < 0 || - edg_wll_FetchRow(ctx,sh,1,NULL,&max) < 0) - { - err = edg_wll_Error(ctx,NULL,NULL); - goto clean; - } - glite_lbu_FreeStmt(&sh); - - next = max && *max ? atoi(max)+1 : 0; - free(max); - } - - while (1) { - /* - * 1) when using transactions: - * Store the whole event right now. - * - * 2) when not using transactions: - * Store an UNDEF event first in order to prevent race condition - * with readers and update event code later. - */ - char *stamp = NULL; - - glite_lbu_TimeToDB(e->any.timestamp.tv_sec, &stamp); - trio_asprintf(&stmt, - "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid) " - "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss')", - jobid,next, - ctx->dbcaps & GLITE_LBU_DB_CAP_TRANSACTIONS ? (int) e->any.type : EDG_WLL_EVENT_UNDEF, - ssrc,e->any.host, - stamp,e->any.timestamp.tv_usec, - now_s, e->any.level,userid); - free(stamp); - - if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) { - if ((err = edg_wll_Error(ctx,NULL,NULL)) != EEXIST) - goto clean; - } else break; /* successful insert */ - - /* we were late -- try once again */ - next++; - free(stmt); - } +/* store event */ + trio_asprintf(&stmt, + "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid,seqcode) " + "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss','%|Ss')", + jobid,next, + (int) e->any.type, + ssrc,e->any.host, + stamp,e->any.timestamp.tv_usec, + now_s, e->any.level,userid, e->any.seqcode); + + if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean; free(stmt); stmt = NULL; - if ((err = store_seq(ctx,e,next)) || - (err = store_flesh(ctx,e,ulm,jobid,next))) { - /* attempt to cleanup, ignore new errors */ - char *desc; - edg_wll_ErrorCode oerr = edg_wll_Error(ctx,NULL,&desc); - - edg_wll_delete_event(ctx,jobid,next); - edg_wll_SetError(ctx,oerr,desc); - free(desc); - } else - if (!(ctx->dbcaps & GLITE_LBU_DB_CAP_TRANSACTIONS)) { - /* emulate commit, i.e. swith to a real event type to make - * the record valid */ - trio_asprintf(&stmt, - "update events set code=%d " - "where jobid='%|Ss' and event=%d", - (int) e->any.type,jobid,next); - switch (edg_wll_ExecSQL(ctx,stmt,NULL)) { - case 0: if (ctx->strict_locking) - err = edg_wll_SetError(ctx,ENOENT,"event disappeared on store while strict locking"); - /* purge in progres: drop the garbage, ignore errors */ - else { - edg_wll_delete_event(ctx,jobid,next); - err = edg_wll_SetError(ctx,ENOENT,"job being purged"); - } - break; - case 1: if (ctx->strict_locking) err = 0; - else { - /* check whether the job is still there to prevent garbage - * left while there is a concurrent purge - */ - free(stmt); - trio_asprintf(&stmt, - "select 'x' from jobs where jobid='%|Ss'", - jobid); - switch (edg_wll_ExecSQL(ctx,stmt,NULL)) { - case 1: break; - case 0: /* purge in progres */ - edg_wll_delete_event(ctx,jobid,next); - err = edg_wll_SetError(ctx,ENOENT,"job being purged"); - break; - default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL, - "more job records, what is that?"); - break; - } - } - break; - case -1: err = edg_wll_Error(ctx,NULL,NULL); - break; - - default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL, - "more event records, what is that?"); - break; - } - } /* if !transactions */ - if (err == 0 && - e->any.type == EDG_WLL_EVENT_REGJOB && - (e->regJob.jobtype == EDG_WLL_REGJOB_DAG || - e->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED || - e->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) && - e->regJob.nsubjobs > 0) +/* increase number of stored events */ + trio_asprintf(&stmt, + "update jobs set nevents='%d'" + "where jobid = '%|Ss'", next, jobid); -#ifdef LB_DAG_EMBRIONIC - err = register_subjobs_embryonic(ctx,&e->regJob,userid); -#else - err = register_subjobs(ctx,&e->regJob); -#endif + if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean; + free(stmt); stmt = NULL; + +/* store event record */ + if (store_flesh(ctx,e,ulm,jobid,next)) goto clean; clean: free(now_s); @@ -236,10 +163,10 @@ clean: free(jobid); free(stmt); free(ssrc); - free(select_max); if (sh) glite_lbu_FreeStmt(&sh); - if (!err && seq) *seq = next; - return err; + if (!edg_wll_Error(ctx,NULL,NULL) && seq) *seq = next; + free(stamp); + return edg_wll_Error(ctx,NULL,NULL); } static int store_user(edg_wll_Context ctx,const char *userid,const char *subj) @@ -258,7 +185,7 @@ static int store_user(edg_wll_Context ctx,const char *userid,const char *subj) return edg_wll_Error(ctx,NULL,NULL); } -static int store_job(edg_wll_Context ctx,glite_jobid_const_t job,const char *userid, int proxy, int server) +static int store_job(edg_wll_Context ctx,glite_jobid_const_t job,const char *userid, int proxy, int server,int grey, int update) { char *jobstr = edg_wlc_JobIdUnparse(job); char *jobid = edg_wlc_JobIdGetUnique(job); @@ -288,22 +215,26 @@ static int store_job(edg_wll_Context ctx,glite_jobid_const_t job,const char *use else { server = 1; } - - trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid,proxy,server) " - "values ('%|Ss','%|Ss','%|Ss', '%|Sd', '%|Sd')",jobid,jobstr,userid,proxy,server); - if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) { - if (edg_wll_Error(ctx,NULL,NULL) == EEXIST) - edg_wll_ResetError(ctx); + if (update) { + trio_asprintf(&stmt,"update jobs set userid='%|Ss', proxy='%|Sd', server='%|Sd', grey='%|Sd' where jobid='%|Ss'", + userid,proxy,server,grey,jobid); + } + else { + trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid,proxy,server,grey) " + "values ('%|Ss','%|Ss','%|Ss', '%|Sd', '%|Sd', '%|Sd')",jobid,jobstr,userid,proxy,server,grey); } - free(stmt); - if (ctx->greyjobs) { - trio_asprintf(&stmt,"delete from grey_jobs where jobid = '%|Ss'",jobid); - edg_wll_ExecSQL(ctx,stmt,NULL); /* XXX: error propagates */ - free(stmt); + if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) { + if (edg_wll_Error(ctx,NULL,NULL) == EEXIST && !update) + edg_wll_ResetError(ctx); + else + goto err; } + free(stmt); stmt = NULL; +err: + free(stmt); free(jobstr); free(jobid); return edg_wll_Error(ctx,NULL,NULL); @@ -335,35 +266,19 @@ static int store_job_block(edg_wll_Context ctx,glite_jobid_const_t job,const cha } #endif -static int store_job_grey(edg_wll_Context ctx,glite_jobid_const_t job,time_t etime) +static int set_job_grey(edg_wll_Context ctx, char *jobid) { - char *jobstr = edg_wlc_JobIdUnparse(job); - char *jobid = edg_wlc_JobIdGetUnique(job); - char *stmt, *s_etime; - - if (jobid == NULL || jobstr == NULL) - return edg_wll_SetError(ctx,EINVAL,"store_job_grey()"); - - edg_wll_ResetError(ctx); - glite_lbu_TimeToDB(etime, &s_etime); - trio_asprintf(&stmt,"insert into grey_jobs(jobid,dg_jobid,time_stamp) " - "values ('%|Ss','%|Ss',%s)", - jobid,jobstr,s_etime); - free(s_etime); + char *stmt; - if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) { - if (edg_wll_Error(ctx,NULL,NULL) == EEXIST) - edg_wll_ResetError(ctx); - } + trio_asprintf(&stmt,"update jobs set grey='1' where jobid='%|Ss'", jobid); + edg_wll_ExecSQL(ctx,stmt,NULL); free(stmt); - free(jobstr); - free(jobid); return edg_wll_Error(ctx,NULL,NULL); } /* test whether job shares LB proxy and server DB or not */ -int is_job_local(edg_wll_Context ctx, edg_wlc_JobId jobId) +int is_job_local(edg_wll_Context ctx, glite_jobid_const_t jobId) { char *srvName = NULL; unsigned int srvPort; @@ -377,169 +292,88 @@ int is_job_local(edg_wll_Context ctx, edg_wlc_JobId jobId) return(ret); } -int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event) -{ +int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP) +{ char *unique = edg_wlc_JobIdGetUnique(event->any.jobId); - char *q = NULL, *owner = NULL, *userid = NULL; + char *q = NULL, *userid = NULL, *subj; glite_lbu_Statement stmt = NULL; - int nar; + int nar, grey = 0; char *can_peername = NULL; - int local_job = is_job_local(ctx, event->any.jobId); + int local_job = is_job_local(ctx, event->any.jobId); + char *res[3] = {NULL, NULL, NULL}; + + + /* check auth */ + if (!ctx->isProxy && !ctx->peerName) + return edg_wll_SetError(ctx,EPERM,"LB server can't store using unauthenticated connection"); + if (ctx->isProxy && (!event->any.user || !strcmp(event->any.user,EDG_WLL_LOG_USER_DEFAULT)) ) + return edg_wll_SetError(ctx,EPERM,"LB proxy can't store using unauthenticated connection"); - edg_wll_ResetError(ctx); - can_peername = edg_wll_gss_normalize_subj(ctx->peerName, 0); - if (ctx->isProxy) { - /* event arrived on proxy socket */ - if (event->any.type == EDG_WLL_EVENT_REGJOB) { - if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) { - /* first synchronous registration */ - if (local_job) { - /* we are both server and proxy for this job */ - trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'", - unique); - - nar = edg_wll_ExecSQL(ctx, q, NULL); - - if (nar == 0) { - /* job isn't registered yet */ - userid = strdup(strmd5("unknown_to_proxy", NULL)); - if (store_user(ctx,userid,"unknown_to_proxy")) goto err; - - if (store_job(ctx,(glite_jobid_const_t) event->any.jobId, - userid, 1, ctx->serverRunning)) goto err; - - } - else {} /* job was registered thru GSI, no further action needed */ - /* or error occured - and will go out via return() */ - } - else { - /* we are only proxy for this job, forward it to server */ - - /* XXX - does it have any sence ?? - if (!strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT)) { - edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously"); - goto err; - } - */ - - userid = strdup(strmd5(event->any.user, NULL)); - if (store_user(ctx,userid,event->any.user)) goto err; - - if (store_job(ctx,(glite_jobid_const_t) event->any.jobId, - userid, 1, 0)) goto err; - } - } + trio_asprintf(&q,"select proxy,server,grey from jobs where jobid='%|Ss' for update", event->any.jobId); + + if ( (nar = edg_wll_ExecSQL(ctx,q,&stmt)) < 0 ) { + /* Job not registered yet */ + + if (!( (event->any.type == EDG_WLL_EVENT_REGJOB) && + (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) )) + { + if (ctx->greyjobs) grey = 1; else { - /* supplementary re-registration (JDL of subjob, etc.) */ - if (local_job) { - /* previous registration via GSI required */ - trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'", - unique, unique); - /* does the job exists ? */ - if (edg_wll_ExecSQL(ctx,q,NULL) < 0) { - edg_wll_SetError(ctx, ENOENT, "job not registered"); - goto err; - } - } - else { - /* try to register job in case that first reistration */ - /* was sent to server only; ignore errors (EEXIST) */ - userid = strdup(strmd5(event->any.user, NULL)); - if (store_user(ctx,userid,event->any.user)) goto err; - - store_job(ctx,(glite_jobid_const_t) event->any.jobId, - userid, 1, 0); - edg_wll_ResetError(ctx); - - } - } - } - else { - /* any other event than JobReg */ - trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'", - unique, unique); - /* does the job exists ? now we require registration on proxy too */ - if (edg_wll_ExecSQL(ctx,q,NULL) < 0) { edg_wll_SetError(ctx, ENOENT, "job not registered"); - goto err; + goto err; } } + + subj = strdup( ctx->isProxy ? event->any.user : can_peername); + userid = strdup(strmd5(subj, NULL)); + if (store_user(ctx,userid,subj)) goto err; + if (store_job(ctx,(glite_jobid_const_t) event->any.jobId, + userid, ctx->isProxy, local_job ? ctx->serverRunning : 0, grey, 0 )) goto err; + *register_to_JP = 1; } else { - /* event arrived on LB port */ - if (event->any.type == EDG_WLL_EVENT_REGJOB) { - trio_asprintf(&q,"select cert_subj from jobs,users where jobs.jobid='%|Ss'" - " AND jobs.userid=users.userid",unique); - if ( (nar = edg_wll_ExecSQL(ctx,q,&stmt)) < 0 || edg_wll_FetchRow(ctx,stmt,1,NULL,&owner) < 0 ) { - goto err; - } - free(q); q = NULL; - - if (nar) { - /* job is already registered */ - if (!strcmp(owner,"unknown_to_proxy")) { - /* proxy registration was already done */ - userid = strdup(strmd5(can_peername, NULL)); - if (store_user(ctx,userid,can_peername)) goto err; - - trio_asprintf(&q,"update jobs set server=1, userid='%|Ss' where jobid='%|Ss'", - userid, unique); - - edg_wll_ExecSQL(ctx, q, NULL); - } - else { } /* re-registration, no action needed */ - } - else { - userid = strdup(strmd5(can_peername, NULL)); - if (store_user(ctx,userid,can_peername)) goto err; + /* Job already registered */ - if (store_job(ctx,(glite_jobid_const_t) event->any.jobId, - userid, 0, 1)) goto err; - } + if (edg_wll_FetchRow(ctx,stmt,sizeof(res)/sizeof(res[0]),NULL,res) < 0) goto err; + if (ctx->greyjobs && !strcmp(res[2],"1") && + (event->any.type == EDG_WLL_EVENT_REGJOB) && + (event->any.priority & EDG_WLL_LOGFLAG_DIRECT)) + { + subj = strdup(ctx->isProxy ? event->any.user : can_peername); + userid = strdup(strmd5(subj, NULL)); + if (store_user(ctx,userid,subj)) goto err; + if (store_job(ctx,(glite_jobid_const_t) event->any.jobId, + userid, (ctx->isProxy || !strcmp(res[0],"1")), + !strcmp(res[1],"1") || (local_job ? ctx->serverRunning : 0), 0, 1)) goto err; + *register_to_JP = 1; + } else { - /* any other event than JobReg */ - /* no action needed */ + // if (!strcmp(res[0],"1") && !strcmp(res[1],"1") ) /*nothing to do */; + if ( (!strcmp(res[0],"0") && ctx->isProxy) || (!strcmp(res[1],"0") && !ctx->isProxy) ) { + trio_asprintf(&q,"update jobs set server='1', proxy='1' where jobid='%|Ss'", + unique); + } } + + /* ??? test whether user from proxy is the same as user from server ??? */ } - + err: + free(res[0]); free(res[1]); free(res[3]); if (stmt) glite_lbu_FreeStmt(&stmt); - free(unique); + free(subj); free(userid); free(q); return edg_wll_Error(ctx,NULL,NULL); -} +} -/* - * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated - * column in EVENTS. - * - * don't want to change the database structure now, will be done anyway - * soon - */ -static int store_seq(edg_wll_Context ctx,edg_wll_Event *e,int no) -{ - int ret; - char *stmt; - char *jobid = edg_wlc_JobIdGetUnique(e->any.jobId); - - edg_wll_ResetError(ctx); - trio_asprintf(&stmt,"insert into short_fields(jobid,event,name,value) " - "values ('%|Ss',%d,'SEQCODE','%|Ss')", - jobid,no,e->any.seqcode); - - ret = edg_wll_ExecSQL(ctx,stmt,NULL); - free(stmt); - free(jobid); - return ret>=0 ? 0 : edg_wll_Error(ctx,NULL,NULL); -} #define SHORT_LEN 255 /* short_fiels.value db column lenght */ @@ -705,19 +539,12 @@ static int check_auth(edg_wll_Context ctx,edg_wll_Event *e) char *jobid = edg_wlc_JobIdGetUnique(e->any.jobId); char *q = NULL,*owner = NULL; glite_lbu_Statement stmt = NULL; - char *user; edg_wll_ResetError(ctx); if (!ctx->isProxy && !ctx->peerName) return edg_wll_SetError(ctx,EPERM,"can't store using unauthenticated connection"); -#if 0 - if (e->type == EDG_WLL_EVENT_REGJOB) - return strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT) ? - 0 : edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously"); -#endif - trio_asprintf(&q,"select u.cert_subj from jobs j, users u " "where j.jobid='%|Ss' and u.userid=j.userid",jobid); @@ -726,14 +553,7 @@ static int check_auth(edg_wll_Context ctx,edg_wll_Event *e) ) goto clean; if (!owner) { - if ( ctx->isProxy ) - edg_wll_SetError(ctx, EINVAL, "Job not registered"); - else - /* We have to let the calling function know what happened here - * even if it hapens inside the LB Proxy which shouldn't consider - * this as an error - */ - edg_wll_SetError(ctx, ENOENT, "job not registered"); + edg_wll_SetError(ctx, ENOENT, "job not registered"); goto clean; } @@ -760,7 +580,7 @@ clean: } #ifndef LB_DAG_EMBRIONIC -static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) +int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) { int i,err; edg_wlc_JobId *subjobs; @@ -856,7 +676,7 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) static edg_wll_ErrorCode states_values_embryonic( edg_wll_Context ctx, - edg_wlc_JobId jobid, + glite_jobid_const_t jobid, const edg_wll_RegJobEvent *e, char **icnames, char **values) @@ -898,21 +718,22 @@ err: return edg_wll_Error(ctx,NULL,NULL); } -static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e, const char *userid) +int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e) { - int i, err = 0; - edg_wlc_JobId *subjobs; + int i, j, err = 0; + edg_wlc_JobId *subjobs = NULL; struct timeval now; - char *jobid_md5, *jobid_md5_old; + char *jobid = NULL, *jobid_md5 = NULL, *jobid_md5_old = NULL; size_t jobid_len; #ifdef LB_BUF glite_lbu_bufInsert bi_j; glite_lbu_bufInsert *bi_jobs = &bi_j; - char *states_cols; + char *states_cols = NULL; #endif glite_lbu_bufInsert bi_s, *bi_states = &bi_s; - char *icnames, *values; + char *icnames = NULL, *values = NULL, *userid = NULL, *stmt = NULL; int server, proxy, membership = 0; + glite_lbu_Statement sh = NULL; edg_wll_ResetError(ctx); @@ -952,10 +773,16 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10; if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400; - membership = edg_wll_jobMembership(ctx, e->jobId); + if ((membership = edg_wll_jobMembership(ctx, e->jobId)) < 0) goto err; + proxy = membership & DB_PROXY_JOB; server = membership & DB_SERVER_JOB; + /* get userid of parent job */ + jobid = edg_wlc_JobIdGetUnique(e->jobId); + trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid); + if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(ctx,sh,1,NULL,&userid) < 0) goto err; + for (i=0; insubjobs; i++) { char *et,*ed,*job_s,*p,*p1; @@ -963,9 +790,9 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv #ifdef LB_BUF if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs, proxy, server))) #else - if ((err = store_job(ctx, subjobs[i], userid, proxy, server))) + if ((err = store_job(ctx, subjobs[i], userid, proxy, server, 0, 0))) #endif - edg_wll_Error(ctx,&et,&ed); + if (edg_wll_Error(ctx,&et,&ed) == EDEADLOCK) goto err; /* interchange variable parts (jobids) in values */ /* there are only two occurences of subjob jobid */ @@ -985,10 +812,6 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states))) edg_wll_Error(ctx,&et,&ed); -//job_s = edg_wlc_JobIdUnparse(subjobs[i]); -//printf("%s\n", job_s); -//free(job_s); - if (err) { job_s = edg_wlc_JobIdUnparse(subjobs[i]); fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed); @@ -999,10 +822,15 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv edg_wlc_JobIdFree(subjobs[i]); } +err: free(jobid_md5_old); //free the last one free(icnames); free(values); + /* free the rest of subjobs if DEADLOCK occurs */ + for (j=i; jnsubjobs; j++) edg_wlc_JobIdFree(subjobs[i]); free(subjobs); + if (sh) glite_lbu_FreeStmt(&sh); + free(stmt); #ifdef LB_BUF /* commit the rest of multirows insert and clean structures */ @@ -1050,3 +878,19 @@ int edg_wll_delete_event(edg_wll_Context ctx,const char *jobid,int event) return edg_wll_Error(ctx,NULL,NULL); } + + +/* XXX: if event type is user tag, convert the tag name to lowercase! + * (not sure whether to convert a value too is reasonable + * or keep it 'case sensitive') + */ +static void lowercase_usertag(edg_wll_Event *ev) +{ + int i; + + if ( ev->any.type == EDG_WLL_EVENT_USERTAG ) { + + for ( i = 0; ev->userTag.name[i] != '\0'; i++ ) + ev->userTag.name[i] = tolower(ev->userTag.name[i]); + } +} diff --git a/org.glite.lb.server/src/userjobs.c b/org.glite.lb.server/src/userjobs.c index 61063a3..799b0de 100644 --- a/org.glite.lb.server/src/userjobs.c +++ b/org.glite.lb.server/src/userjobs.c @@ -21,7 +21,7 @@ int edg_wll_UserJobsServer( char *userid, *stmt = NULL, *res = NULL; char *can_peername; - int njobs = 0,ret,i,j; + int njobs = 0,ret,i,j,idx; edg_wlc_JobId *out = NULL; glite_lbu_Statement sth = NULL; edg_wll_ErrorCode err = 0; @@ -52,7 +52,7 @@ int edg_wll_UserJobsServer( free(stmt); stmt = NULL; free(res); res = NULL; - trio_asprintf(&stmt,"select dg_jobid from jobs where userid = '%|Ss'",userid); + trio_asprintf(&stmt,"select dg_jobid from jobs where userid = '%|Ss' and grey='0'",userid); switch (njobs = edg_wll_ExecSQL(ctx,stmt,&sth)) { case 0: edg_wll_SetError(ctx,ENOENT,ctx->peerName); case -1: goto err; @@ -71,12 +71,18 @@ int edg_wll_UserJobsServer( } *states = calloc(njobs, sizeof(**states)); + idx = 0; for (i = 0; i < njobs; i++) { - if (edg_wll_JobStatusServer(ctx, out[i], -1, &(*states)[i]) != 0) { + if (edg_wll_JobStatusServer(ctx, out[idx], -1, &(*states)[idx]) != 0) { for (j = 0; j < i; j++) edg_wll_FreeStatus(&(*states)[j]); *states = NULL; - break; + if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) { + /* some jobs may be purged meanwhile, ignore */ + continue; + } + else break; } + idx++; } err: free(res); -- 1.8.2.3