aclid char(32) binary null,
proxy bool not null,
server bool not null,
-
+ grey bool not null,
+ nevents int not null,
primary key (jobid),
unique (dg_jobid),
index (userid)
) engine=innodb;
-create table grey_jobs (
- jobid char(32) binary not null,
- dg_jobid varchar(255) binary not null,
- time_stamp datetime not null,
-
- primary key (jobid),
- unique (dg_jobid)
-) engine=innodb;
-
create table users (
userid char(32) binary not null,
cert_subj varchar(255) binary not null,
arrived datetime not null,
ulm mediumblob not null, -- testing (1)
+ seqcode varchar(255) binary not null,
primary key (jobid,event),
index (time_stamp),
int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *);
int handle_request(edg_wll_Context,char *);
int create_reply(const edg_wll_Context,char **);
-int is_job_local(edg_wll_Context, edg_wlc_JobId jobId);
-int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event);
+int is_job_local(edg_wll_Context, glite_jobid_const_t jobId);
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP);
+#ifndef LB_DAG_EMBRIONIC
+int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
+#endif
+int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *);
+
int edg_wll_delete_event(edg_wll_Context,const char *, int);
#include "glite/lb/context-int.h"
#include "db_calls.h"
-# include "db_supp.h"
+#include "db_supp.h"
/** Returns bitmask of job membership in common server/proxy database
*/
-int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job)
+int edg_wll_jobMembership(edg_wll_Context ctx, glite_jobid_const_t job)
{
char *dbjob;
char *stmt = NULL;
dbjob = edg_wlc_JobIdGetUnique(job);
- trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss'",dbjob);
+ trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss' for update",dbjob);
ret = edg_wll_ExecSQL(ctx,stmt,&q);
if (ret <= 0) {
if (ret == 0) {
free(stmt);
return(result);
}
+
+
+/* just lock one row corresponding to job in table jobs
+ * lock_mode: 0 = in share mode / 1 = for update
+ */
+int edg_wll_LockJobRow(edg_wll_Context ctx, glite_jobid_const_t job, int lock_mode)
+{
+ char *jobid_md5 = NULL;
+ char *stmt = NULL;
+ glite_lbu_Statement sh;
+ int nr;
+
+
+ edg_wll_ResetError(ctx);
+ jobid_md5 = edg_wlc_JobIdGetUnique(job);
+
+ if (lock_mode)
+ trio_asprintf(&stmt, "select count(*) from jobs where jobid='%|Ss' for update", jobid_md5);
+ else
+ trio_asprintf(&stmt, "select count(*) from jobs where jobid='%|Ss' in share mode", jobid_md5);
+
+ if ((nr = edg_wll_ExecSQL(ctx,stmt,&sh)) < 0) goto cleanup;
+ if (nr == 0) {
+ edg_wll_SetError(ctx,ENOENT,"no state in DB");
+ goto cleanup;
+ }
+
+cleanup:
+ if (sh) glite_lbu_FreeStmt(&sh);
+ free(stmt); stmt = NULL;
+ free(jobid_md5);
+
+ return edg_wll_Error(ctx, NULL, NULL);
+}
+
#define DB_PROXY_JOB 1
#define DB_SERVER_JOB 2
-int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job);
+int edg_wll_jobMembership(edg_wll_Context ctx, glite_jobid_const_t job);
+
+#define edg_wll_LockJobRowInShareMode(X,Y) edg_wll_LockJobRow(X,Y,0)
+#define edg_wll_LockJobRowForUpdate(X,Y) edg_wll_LockJobRow(X,Y,1)
+int edg_wll_LockJobRow(edg_wll_Context ctx, glite_jobid_const_t job, int lock_mode);
+
#endif /* GLITE_LB_LB_CALLS_H */
#include "glite/lb/lb_maildir.h"
#include "purge.h"
#include "store.h"
-#include "lock.h"
#include "il_lbproxy.h"
#include "jobstat.h"
+#include "db_supp.h"
#ifdef LB_PERF
#include "glite/lb/lb_perftest.h"
#endif
-/* XXX */
-#define use_db 1
-
extern int unset_proxy_flag(edg_wll_Context, edg_wlc_JobId);
extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *);
-static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP);
+static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int reg_to_JP);
int
db_store(edg_wll_Context ctx, char *event)
{
- edg_wll_Event *ev;
- int seq, reg_to_JP = 0;
- int err;
- int local_job;
+ edg_wll_Event *ev = NULL;
+ int seq, reg_to_JP = 0, local_job;
edg_wll_JobStat newstat;
- ev = NULL;
-
edg_wll_ResetError(ctx);
memset(&newstat,0,sizeof newstat);
- if(edg_wll_ParseEvent(ctx, event, &ev))
- goto err;
+ if(edg_wll_ParseEvent(ctx, event, &ev)) goto err;
+
local_job = is_job_local(ctx, ev->any.jobId);
#ifdef LB_PERF
}
#endif
- if(use_db) {
- char *ed;
- int code;
-
- if (edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
- store_job_server_proxy(ctx, ev);
- code = edg_wll_Error(ctx,NULL,&ed);
- edg_wll_UnlockJob(ctx,ev->any.jobId); /* XXX: ignore error */
- if (code) {
- edg_wll_SetError(ctx,code,ed);
- free(ed);
- goto err;
- }
- }
+ do {
+ if (edg_wll_Transaction(ctx)) goto err;
+
+ if (store_job_server_proxy(ctx, ev, ®_to_JP)) goto rollback;
+ /* events logged to proxy and server (DIRECT flag) may be ignored on proxy
+ * if jobid prefix hostname matches server hostname -> they will
+ * sooner or later arrive to server too and are stored in common DB
+ */
+ if (ctx->isProxy && local_job && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT)) {
+ goto commit;
+ }
- /* events logged to proxy and server (DIRECT flag) may be ignored on proxy
- * if jobid prefix hostname matches server hostname -> they will
- * sooner or later arrive to server too and are stored in common DB
- */
- if (ctx->isProxy && local_job) {
- if (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
- edg_wll_FreeEvent(ev);
- free(ev);
- return 0;
+ if (edg_wll_StoreEvent(ctx, ev, event, &seq)) goto rollback;
+
+ if ( ev->any.type == EDG_WLL_EVENT_CHANGEACL ) {
+ if (edg_wll_UpdateACL(ctx, ev->any.jobId,
+ ev->changeACL.user_id, ev->changeACL.user_id_type,
+ ev->changeACL.permission, ev->changeACL.permission_type,
+ ev->changeACL.operation)) goto rollback;
+
}
else {
- /* these are re-registrations of subjobs on proxy */
- /* embryonic registrations does not trigger registration in JP */
- reg_to_JP = 1;
+#ifdef LB_PERF
+ if(sink_mode == GLITE_LB_SINK_STATE) {
+ glite_wll_perftest_consumeEvent(ev);
+ goto commit;
+ }
+#endif
+
+ if ( newstat.state ) { /* prevent memleaks in case of transaction retry */
+ edg_wll_FreeStatus(&newstat);
+ newstat.state = EDG_WLL_JOB_UNDEF;
+ }
+ if (edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, &newstat)) goto rollback;
+
+ if (newstat.remove_from_proxy)
+ if (edg_wll_PurgeServerProxy(ctx, ev->any.jobId)) goto rollback;
}
- }
- /* XXX: if event type is user tag, convert the tag name to lowercase!
- * (not sure whether to convert a value too is reasonable
- * or keep it 'case sensitive')
- */
- if ( ev->any.type == EDG_WLL_EVENT_USERTAG )
- {
- int i;
- for ( i = 0; ev->userTag.name[i] != '\0'; i++ )
- ev->userTag.name[i] = tolower(ev->userTag.name[i]);
- }
-
- if(use_db) {
- if (ctx->strict_locking && edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
- if(edg_wll_StoreEvent(ctx, ev, event, &seq)) {
- edg_wll_UnlockJob(ctx,ev->any.jobId);
- goto err;
- }
- }
+commit:
+rollback:;
+ } while (edg_wll_TransNeedRetry(ctx));
+
+ if (edg_wll_Error(ctx, NULL, NULL)) goto err;
- if (!ctx->strict_locking && edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
- if ( ev->any.type == EDG_WLL_EVENT_CHANGEACL ) {
- err = edg_wll_UpdateACL(ctx, ev->any.jobId,
- ev->changeACL.user_id, ev->changeACL.user_id_type,
- ev->changeACL.permission, ev->changeACL.permission_type,
- ev->changeACL.operation);
+ do {
+ if (edg_wll_Transaction(ctx)) goto err;
- edg_wll_UnlockJob(ctx,ev->any.jobId);
- }
- else {
-#ifdef LB_PERF
- if(sink_mode == GLITE_LB_SINK_STATE) {
- glite_wll_perftest_consumeEvent(ev);
- edg_wll_UnlockJob(ctx,ev->any.jobId);
- goto err;
- }
+ if (ev->any.type == EDG_WLL_EVENT_REGJOB &&
+ (ev->regJob.jobtype == EDG_WLL_REGJOB_DAG ||
+ ev->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED ||
+ ev->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) &&
+ ev->regJob.nsubjobs > 0)
+
+#ifdef LB_DAG_EMBRIONIC
+ if (register_subjobs_embryonic(ctx,&ev->regJob)) goto rollback2;
+#else
+ if (register_subjobs(ctx,&ev->regJob)) goto rollback2;
#endif
- err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, &newstat);
- }
+rollback2:;
+ } while (edg_wll_TransNeedRetry(ctx));
- /* XXX: in edg_wll_StepIntState()
- * if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err;
- */
- if (err) goto err;
+ if (edg_wll_Error(ctx, NULL, NULL)) goto err;
- db_store_finalize(ctx, event, ev, &newstat, seq, reg_to_JP);
-err:
+ db_store_finalize(ctx, event, ev, &newstat, reg_to_JP);
- if(ev) {
- edg_wll_FreeEvent(ev);
- free(ev);
- }
+err:
+ if(ev) { edg_wll_FreeEvent(ev); free(ev); }
if ( newstat.state ) edg_wll_FreeStatus(&newstat);
-
return edg_wll_Error(ctx,NULL,NULL);
}
+
/* Called only when CollectionStateEvent generated */
int
db_parent_store(edg_wll_Context ctx, edg_wll_Event *ev, intJobStat *is)
edg_wll_ResetError(ctx);
memset(&newstat,0,sizeof newstat);
- /* Locked from load_parent_intJobStat() */
+ /* Transaction opened from db_store */
#ifdef LB_PERF
if (sink_mode == GLITE_LB_SINK_STORE) {
assert(ev->any.user);
- if(use_db) {
if(edg_wll_StoreEvent(ctx, ev, NULL, &seq))
goto err;
- }
#ifdef LB_PERF
if(sink_mode == GLITE_LB_SINK_STATE) {
assert(event);
}
- db_store_finalize(ctx, event, ev, &newstat, seq, 0);
+ db_store_finalize(ctx, event, ev, &newstat, 0);
err:
}
-
-static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int seq, int reg_to_JP)
+/* Send regitration to JP
+ */
+static int register_to_JP(edg_wll_Context ctx, edg_wll_Event *ev)
{
- int local_job = is_job_local(ctx, ev->any.jobId);
-
-
-#ifdef LB_PERF
- if( sink_mode == GLITE_LB_SINK_SEND ) {
- glite_wll_perftest_consumeEvent(ev);
- return edg_wll_Error(ctx,NULL,NULL);
- }
-#endif
+ char *jids, *msg;
- /* Send regitration to JP
- */
- if ( ctx->jpreg_dir && ev->any.type == EDG_WLL_EVENT_REGJOB && seq == 0 &&
- (!ctx->isProxy || reg_to_JP) ) {
- char *jids, *msg;
-
- if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
- return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP");
- }
- if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) {
- free(jids);
- return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP");
- }
- strcat(msg, "\n");
- strcat(msg, ev->any.user);
- if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) {
- free(msg);
- return edg_wll_SetError(ctx, errno, lbm_errdesc);
- }
+ if ( !(jids = edg_wlc_JobIdUnparse(ev->any.jobId)) ) {
+ return edg_wll_SetError(ctx, errno, "Can't unparse jobid when registering to JP");
+ }
+ if ( !(msg = realloc(jids, strlen(jids)+strlen(ev->any.user)+2)) ) {
+ free(jids);
+ return edg_wll_SetError(ctx, errno, "Can't allocate buffer when registering to JP");
+ }
+ strcat(msg, "\n");
+ strcat(msg, ev->any.user);
+ if ( edg_wll_MaildirStoreMsg(ctx->jpreg_dir, ctx->srvName, msg) ) {
free(msg);
+ return edg_wll_SetError(ctx, errno, lbm_errdesc);
}
+ free(msg);
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+static int forward_event_to_server(edg_wll_Context ctx, char *event, edg_wll_Event *ev, int local_job)
+{
if ( ctx->isProxy ) {
/*
* send event to the proper BK server
return edg_wll_SetError(ctx, EDG_WLL_IL_PROTO, "edg_wll_EventSendProxy() error.");
}
}
- else {
- /* event will not arrive to server, only flag was set */
- /* check whether some pending notifications are not triggered */
- if ( newstat->state ) {
+ }
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
+static int db_store_finalize(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat, int reg_to_JP)
+{
+ int local_job = is_job_local(ctx, ev->any.jobId);
+
+
+#ifdef LB_PERF
+ if( sink_mode == GLITE_LB_SINK_SEND ) {
+ glite_wll_perftest_consumeEvent(ev);
+ return edg_wll_Error(ctx,NULL,NULL);
+ }
+#endif
+
+ if (reg_to_JP)
+ if (register_to_JP(ctx,ev)) goto err;
+
+ if (forward_event_to_server(ctx, event, ev, local_job)) goto err;
+
+ if (newstat->state) {
+ if ( ctx->isProxy ) {
+ if ((ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) || local_job)
+ /* event will not arrive to server, only flag was set */
+ /* check whether some pending notifications are not triggered */
edg_wll_NotifMatch(ctx, newstat);
}
- }
-
- /* LB proxy purge */
- if (newstat->remove_from_proxy) {
- edg_wll_PurgeServerProxy(ctx, ev->any.jobId);
- }
- } else
- {
- /* Purge proxy flag */
- if ( newstat->remove_from_proxy && local_job ) {
- if (unset_proxy_flag(ctx, ev->any.jobId) < 0) {
- return(edg_wll_Error(ctx,NULL,NULL));
- }
- }
-
- if ( newstat->state ) {
- edg_wll_NotifMatch(ctx, newstat);
+ else {
+ edg_wll_NotifMatch(ctx, newstat);
}
}
-
+err:
return edg_wll_Error(ctx,NULL,NULL);
}
#include "lb_authz.h"
#include "stats.h"
#include "db_supp.h"
+#include "db_calls.h"
#define DAG_ENABLE 1
+#define DONT_LOCK 0
+#define LOCK 1
+
/* TBD: share in whole logging or workload */
#ifdef __GNUC__
#define UNUSED_VAR __attribute__((unused))
{
/* Local variables */
- char *string_jobid;
- char *md5_jobid;
+ char *string_jobid = NULL;
+ char *md5_jobid = NULL;
intJobStat jobstat;
intJobStat *ijsp;
- int intErr = 0;
- int lockErr;
+ int whole_cycle;
edg_wll_Acl acl = NULL;
#if DAG_ENABLE
char *stmt = NULL;
#endif
- char *errdesc = NULL;
- //The following declarations have originally been positioned in the funcion's code
- //That was rather messy and lead to redeclaratios :-(
char *stat_str, *s_out;
intJobStat *js;
char *out[1];
- glite_lbu_Statement sh;
+ glite_lbu_Statement sh = NULL;
int num_sub, num_f, i, ii;
+
edg_wll_ResetError(ctx);
+ jobstat.pub.owner = NULL;
string_jobid = edg_wlc_JobIdUnparse(job);
if (string_jobid == NULL || stat == NULL)
return edg_wll_SetError(ctx,EINVAL, NULL);
md5_jobid = edg_wlc_JobIdGetUnique(job);
- if ( !(jobstat.pub.owner = job_owner(ctx,md5_jobid)) ) {
- free(md5_jobid);
- free(string_jobid);
- return edg_wll_Error(ctx,NULL,NULL);
- }
+ do {
+ whole_cycle = 0;
- intErr = edg_wll_GetACL(ctx, job, &acl);
- if (intErr) {
- free(md5_jobid);
- free(string_jobid);
- free(jobstat.pub.owner);
- return edg_wll_Error(ctx,NULL,NULL);
- }
+ if (edg_wll_Transaction(ctx)) goto rollback;
+ if (edg_wll_LockJobRowInShareMode(ctx, job)) goto rollback;;
- /* authorization check */
- if ( !(ctx->noAuth) &&
- (!(ctx->peerName) || !edg_wll_gss_equal_subj(ctx->peerName, jobstat.pub.owner))) {
- intErr = (acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ);
- if (intErr) {
- free(string_jobid);
- free(md5_jobid);
- free(jobstat.pub.owner); jobstat.pub.owner = NULL;
- if (acl) {
- edg_wll_FreeAcl(acl);
- return edg_wll_Error(ctx, NULL, NULL);
- } else {
- return edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set");
- }
- }
- }
- intErr = edg_wll_LoadIntState(ctx, job, -1 /*all events*/, &ijsp);
- if (!intErr) {
- *stat = ijsp->pub;
- free(jobstat.pub.owner); jobstat.pub.owner = NULL;
- destroy_intJobStat_extension(ijsp);
- free(ijsp);
+ if (edg_wll_GetACL(ctx, job, &acl)) goto rollback;
- } else {
- lockErr = edg_wll_LockJob(ctx,job);
- intErr = edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store && !lockErr);
- if (intErr) edg_wll_Error(ctx, NULL, &errdesc);
- if (!lockErr) {
- edg_wll_UnlockJob(ctx,job);
+ /* authorization check */
+ if ( !(ctx->noAuth) &&
+ (!(ctx->peerName) || !edg_wll_gss_equal_subj(ctx->peerName, jobstat.pub.owner))) {
+ if ((acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ)) {
+ if (acl) {
+ goto rollback;
+ } else {
+ edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set");
+ goto rollback;
+ }
+ }
}
-
- *stat = jobstat.pub;
- if (intErr) edg_wll_FreeStatus(&jobstat.pub);
- destroy_intJobStat_extension(&jobstat);
- }
-
- if (intErr) {
- free(string_jobid);
- free(md5_jobid);
- if (acl) edg_wll_FreeAcl(acl);
- edg_wll_SetError(ctx, intErr, errdesc);
- free(errdesc);
- return edg_wll_UpdateError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, "Could not compute job status from events");
- }
- if (acl) {
- stat->acl = strdup(acl->string);
- edg_wll_FreeAcl(acl);
- }
+ if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, -1 /*all events*/, &ijsp)) {
+ *stat = ijsp->pub;
+ free(jobstat.pub.owner); jobstat.pub.owner = NULL;
+ destroy_intJobStat_extension(ijsp);
+ free(ijsp);
+ } else {
+ if (edg_wll_intJobStatus(ctx, job, flags,&jobstat, js_enable_store)) {
+ edg_wll_UpdateError(ctx, EDG_WLL_ERROR_SERVER_RESPONSE, "Could not compute job status from events");
+ goto rollback;
+ }
+ *stat = jobstat.pub;
+ }
+
+ if (acl) {
+ stat->acl = strdup(acl->string);
+ edg_wll_FreeAcl(acl);
+ }
- if ((flags & EDG_WLL_STAT_CLASSADS) == 0) {
- char *null = NULL;
+ if ((flags & EDG_WLL_STAT_CLASSADS) == 0) {
+ char *null = NULL;
- mov(stat->jdl, null);
- mov(stat->matched_jdl, null);
- mov(stat->condor_jdl, null);
- mov(stat->rsl, null);
- }
+ mov(stat->jdl, null);
+ mov(stat->matched_jdl, null);
+ mov(stat->condor_jdl, null);
+ mov(stat->rsl, null);
+ }
-#if DAG_ENABLE
- if (stat->jobtype == EDG_WLL_STAT_DAG || stat->jobtype == EDG_WLL_STAT_COLLECTION) {
+ #if DAG_ENABLE
+ if (stat->jobtype == EDG_WLL_STAT_DAG || stat->jobtype == EDG_WLL_STAT_COLLECTION) {
-// XXX: The users does not want any histogram. What do we do about it?
-// if ((!(flags & EDG_WLL_STAT_CHILDHIST_FAST))&&(!(flags & EDG_WLL_STAT_CHILDHIST_THOROUGH))) { /* No Histogram */
-// if (stat->children_hist != NULL) { /* No histogram will be sent even if there was one */
-//
-// printf("\nNo Histogram required\n\n");
-//
-// free(stat->children_hist);
-// }
-//
-// }
+ // XXX: The users does not want any histogram. What do we do about it?
+ // if ((!(flags & EDG_WLL_STAT_CHILDHIST_FAST))&&(!(flags & EDG_WLL_STAT_CHILDHIST_THOROUGH))) { /* No Histogram */
+ // if (stat->children_hist != NULL) { /* No histogram will be sent even if there was one */
+ //
+ // printf("\nNo Histogram required\n\n");
+ //
+ // free(stat->children_hist);
+ // }
+ //
+ // }
- if (flags & EDG_WLL_STAT_CHILDSTAT) {
+ if (flags & EDG_WLL_STAT_CHILDSTAT) {
- trio_asprintf(&stmt, "SELECT int_status FROM states WHERE parent_job='%|Ss'"
- " AND version='%|Ss'",
- md5_jobid, INTSTAT_VERSION);
- if (stmt != NULL) {
- num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
- if (num_sub >=0 ) {
- i = 0;
- stat->children_states = calloc(num_sub+1, sizeof(edg_wll_JobStat));
- if (stat->children_states == NULL) {
- glite_lbu_FreeStmt(&sh);
- goto dag_enomem;
- }
- while ((num_f = edg_wll_FetchRow(ctx, sh, 1, NULL, &stat_str)) == 1
- && i < num_sub) {
- js = dec_intJobStat(stat_str, &s_out);
- if (s_out != NULL && js != NULL) {
- stat->children_states[i] = js->pub;
- destroy_intJobStat_extension(js);
- free(js);
- i++; // Careful, this value will also be used further
+ trio_asprintf(&stmt, "SELECT int_status FROM states WHERE parent_job='%|Ss'"
+ " AND version='%|Ss'",
+ md5_jobid, INTSTAT_VERSION);
+ if (stmt != NULL) {
+ num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
+ if (num_sub >=0 ) {
+ i = 0;
+ stat->children_states = calloc(num_sub+1, sizeof(edg_wll_JobStat));
+ if (stat->children_states == NULL) {
+ edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_states failed!");
+ goto rollback;
}
- free(stat_str);
+ while ((num_f = edg_wll_FetchRow(ctx, sh, 1, NULL, &stat_str)) == 1
+ && i < num_sub) {
+ js = dec_intJobStat(stat_str, &s_out);
+ if (s_out != NULL && js != NULL) {
+ stat->children_states[i] = js->pub;
+ destroy_intJobStat_extension(js);
+ free(js);
+ i++; // Careful, this value will also be used further
+ }
+ free(stat_str);
+ }
+ if (num_f < 0) goto rollback;
+
+ glite_lbu_FreeStmt(&sh); sh = NULL;
}
- glite_lbu_FreeStmt(&sh);
+ else goto rollback;
+
+ free(stmt); stmt = NULL;
+ } else {
+ edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!");
+ goto rollback;
}
- free(stmt);
- } else goto dag_enomem;
- }
+ }
- if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) { /* Full (thorough) Histogram */
+ if (flags & EDG_WLL_STAT_CHILDHIST_THOROUGH) { /* Full (thorough) Histogram */
- if (stat->children_hist == NULL) {
- stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
- stat->children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES;
- }
- else {
- /* If hist is loaded, it probably contain only incomplete histogram
- * built in update_parent_status. Count it from scratch...*/
- for (ii=1; ii<=EDG_WLL_NUMBER_OF_STATCODES; ii++)
- stat->children_hist[ii] = 0;
- }
+ if (stat->children_hist == NULL) {
+ stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
+ if (stat->children_hist == NULL) {
+ edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_hist failed!");
+ goto rollback;
+ }
- if (flags & EDG_WLL_STAT_CHILDSTAT) { // Job states have already been loaded
- for ( ii = 0 ; ii < i ; ii++ ) {
- stat->children_hist[(stat->children_states[ii].state)+1]++;
+ stat->children_hist[0] = EDG_WLL_NUMBER_OF_STATCODES;
}
- }
- else {
- // Get child states from the database
- trio_asprintf(&stmt, "SELECT status FROM states WHERE parent_job='%|Ss' AND version='%|Ss'",
- md5_jobid, INTSTAT_VERSION);
- out[1] = NULL;
- if (stmt != NULL) {
- num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
- if (num_sub >=0 ) {
- while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) {
- num_f = atoi(out[0]);
- if (num_f > EDG_WLL_JOB_UNDEF && num_f < EDG_WLL_NUMBER_OF_STATCODES)
- stat->children_hist[num_f+1]++;
- free(out[0]);
+ else {
+ /* If hist is loaded, it probably contain only incomplete histogram
+ * built in update_parent_status. Count it from scratch...*/
+ for (ii=1; ii<=EDG_WLL_NUMBER_OF_STATCODES; ii++)
+ stat->children_hist[ii] = 0;
+ }
+
+ if (flags & EDG_WLL_STAT_CHILDSTAT) { // Job states have already been loaded
+ for ( ii = 0 ; ii < i ; ii++ ) {
+ stat->children_hist[(stat->children_states[ii].state)+1]++;
+ }
+ }
+ else {
+ // Get child states from the database
+ trio_asprintf(&stmt, "SELECT status FROM states WHERE parent_job='%|Ss' AND version='%|Ss'",
+ md5_jobid, INTSTAT_VERSION);
+ out[1] = NULL;
+ if (stmt != NULL) {
+ num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
+ if (num_sub >=0 ) {
+ while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) {
+ num_f = atoi(out[0]);
+ if (num_f > EDG_WLL_JOB_UNDEF && num_f < EDG_WLL_NUMBER_OF_STATCODES)
+ stat->children_hist[num_f+1]++;
+ free(out[0]);
+ }
+ if (num_f < 0) goto rollback;
+
+ glite_lbu_FreeStmt(&sh); sh = NULL;
}
- glite_lbu_FreeStmt(&sh);
+ else goto rollback;
+
+ free(stmt); stmt = NULL;
+ } else {
+ edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!");
+ goto rollback;
}
- free(stmt);
- } else goto dag_enomem;
- }
- }
- else {
- if (flags & EDG_WLL_STAT_CHILDHIST_FAST) { /* Fast Histogram */
-
- if (stat->children_hist == NULL) {
- // If the histogram exists, assume that it was already filled during job state retrieval
- stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
- edg_wll_GetSubjobHistogram(ctx, job, stat->children_hist);
}
}
else {
- if (stat->children_hist) {
- free (stat->children_hist);
- stat->children_hist = NULL;
+ if (flags & EDG_WLL_STAT_CHILDHIST_FAST) { /* Fast Histogram */
+
+ if (stat->children_hist == NULL) {
+ // If the histogram exists, assume that it was already filled during job state retrieval
+ stat->children_hist = (int*) calloc(1+EDG_WLL_NUMBER_OF_STATCODES, sizeof(int));
+ if (stat->children_hist == NULL) {
+ edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() calloc children_hist failed!");
+ goto rollback;
+ }
+
+ if (edg_wll_GetSubjobHistogram(ctx, job, stat->children_hist))
+ goto rollback;
+ }
+ }
+ else {
+ if (stat->children_hist) {
+ free (stat->children_hist);
+ stat->children_hist = NULL;
+ }
}
+
}
- }
+ if (flags & EDG_WLL_STAT_CHILDREN) {
- if (flags & EDG_WLL_STAT_CHILDREN) {
+ trio_asprintf(&stmt, "SELECT j.dg_jobid FROM states s,jobs j "
+ "WHERE s.parent_job='%|Ss' AND s.version='%|Ss' AND s.jobid=j.jobid",
+ md5_jobid, INTSTAT_VERSION);
+ if (stmt != NULL) {
+ num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
+ if (num_sub >=0 ) {
+ while ((num_f = edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out)) == 1 ) {
+ add_stringlist(&stat->children, out[0]);
+ free(out[0]);
+ }
+ if (num_f < 0) goto rollback;
- trio_asprintf(&stmt, "SELECT j.dg_jobid FROM states s,jobs j "
- "WHERE s.parent_job='%|Ss' AND s.version='%|Ss' AND s.jobid=j.jobid",
- md5_jobid, INTSTAT_VERSION);
- if (stmt != NULL) {
- num_sub = edg_wll_ExecSQL(ctx, stmt, &sh);
- if (num_sub >=0 ) {
- while (edg_wll_FetchRow(ctx, sh, sizeof(out)/sizeof(out[0]), NULL, out) == 1 ) {
- add_stringlist(&stat->children, out[0]);
- free(out[0]);
+ glite_lbu_FreeStmt(&sh); sh = NULL;
}
- glite_lbu_FreeStmt(&sh);
+ else goto rollback;
+
+ free(stmt); stmt = NULL;
+ } else {
+ edg_wll_SetError(ctx, ENOMEM, "edg_wll_JobStatusServer() trio_asprintf failed!");
+ goto rollback;
}
- free(stmt);
- } else goto dag_enomem;
+ }
+ }
+#endif
+ whole_cycle = 1;
+commit:
+rollback:
+ if (!whole_cycle) {
+ edg_wll_FreeStatus(&jobstat.pub);
+ jobstat.pub.owner = NULL;
+ destroy_intJobStat_extension(&jobstat);
}
+ if (jobstat.pub.owner) { free(jobstat.pub.owner); jobstat.pub.owner = NULL; }
+ if (acl) { edg_wll_FreeAcl(acl); acl = NULL; }
+ if (stmt) { free(stmt); stmt = NULL; }
+ if (sh) { glite_lbu_FreeStmt(&sh); sh = NULL; }
- }
-#endif
- free(string_jobid);
- free(md5_jobid);
- return edg_wll_Error(ctx, NULL, NULL);
+ } while (edg_wll_TransNeedRetry(ctx));
-#if DAG_ENABLE
-dag_enomem:
free(string_jobid);
free(md5_jobid);
- edg_wll_FreeStatus(stat);
- free(stmt);
- return edg_wll_SetError(ctx, ENOMEM, NULL);
-#endif
+
+ return edg_wll_Error(ctx, NULL, NULL);
}
int edg_wll_intJobStatus(
edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context ctx,
edg_wlc_JobId jobid,
+ int lock,
int seq,
intJobStat **stat)
{
edg_wll_ResetError(ctx);
jobid_md5 = edg_wlc_JobIdGetUnique(jobid);
+ if (lock) {
+ edg_wll_LockJobRowForUpdate(ctx,jobid);
+ }
+
if (seq == -1) {
/* any sequence number */
trio_asprintf(&stmt,
free(res);
cleanup:
free(jobid_md5);
- free(stmt); glite_lbu_FreeStmt(&sh);
+ free(stmt);
+ if (sh) glite_lbu_FreeStmt(&sh);
return edg_wll_Error(ctx,NULL,NULL);
}
{
if (*pis) return edg_wll_Error(ctx, NULL, NULL); // already loaded and locked
- if (edg_wll_LockJob(ctx,cis->pub.parent_job)) goto err;
-
- if (edg_wll_LoadIntState(ctx, cis->pub.parent_job, - 1, pis))
+ if (edg_wll_LoadIntState(ctx, cis->pub.parent_job, LOCK, - 1, pis))
goto err;
assert(*pis); // deadlock would happen with next call of this function
}
err:
- edg_wll_UnlockJob(ctx,cis->pub.parent_job);
-
if (pis)
destroy_intJobStat(pis);
memset(&oldstat,0,sizeof oldstat);
- if (!edg_wll_LoadIntState(ctx, job, seq - 1, &ijsp)) {
+ if (!edg_wll_LoadIntState(ctx, job, DONT_LOCK, seq - 1, &ijsp)) {
edg_wll_CpyStatus(&ijsp->pub,&oldstat);
if (ctx->rgma_export) oldstat_rgmaline = write2rgma_statline(&ijsp->pub);
res = processEvent(ijsp, e, seq, be_strict, &errstring);
if (res == RET_FATAL || res == RET_INTERNAL) { /* !strict */
edg_wll_FreeStatus(&oldstat);
- edg_wll_UnlockJob(ctx,job); /* XXX: error lost */
return edg_wll_SetError(ctx, EINVAL, errstring);
}
edg_wll_StoreIntState(ctx, ijsp, seq);
- if (edg_wll_UnlockJob(ctx,job)) goto err;
edg_wll_UpdateStatistics(ctx,&oldstat,e,&ijsp->pub);
Right approach is computing parent status from scratch.
*/
- if (edg_wll_UnlockJob(ctx,job)) goto err;
edg_wll_UpdateStatistics(ctx,NULL,e,&jobstat.pub);
if (ctx->rgma_export) write2rgma_status(&jobstat.pub);
}
else destroy_intJobStat(&jobstat);
}
- else edg_wll_UnlockJob(ctx,job);
+
err:
return edg_wll_Error(ctx, NULL, NULL);
}
int edg_wll_intJobStatus( edg_wll_Context, glite_jobid_const_t, int, intJobStat *, int);
edg_wll_ErrorCode edg_wll_StoreIntState(edg_wll_Context, intJobStat *, int);
edg_wll_ErrorCode edg_wll_StoreIntStateEmbryonic(edg_wll_Context, edg_wlc_JobId, char *icnames, char *values, glite_lbu_bufInsert *bi);
-edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, intJobStat **);
+edg_wll_ErrorCode edg_wll_LoadIntState(edg_wll_Context , edg_wlc_JobId , int, int, intJobStat **);
edg_wll_ErrorCode edg_wll_StepIntState(edg_wll_Context ctx, edg_wlc_JobId job, edg_wll_Event *e, int seq, edg_wll_JobStat *stat_out);
edg_wll_ErrorCode edg_wll_StepIntStateParent(edg_wll_Context,edg_wlc_JobId,edg_wll_Event *,int,intJobStat *,edg_wll_JobStat *);
{
int fd,
reject_fd = -1,
- readret, i;
+ readret, i, ret;
size_t maxsize;
char *line = NULL,
buff[30];
if ( (fd = open(req->server_file, O_RDONLY)) == -1 )
return edg_wll_SetError(ctx, errno, "Server can not open the file");
- if (edg_wll_Transaction(ctx) != 0)
- return edg_wll_Error(ctx, NULL, NULL);
-
memset(result,0,sizeof(*result));
i = 0;
while ( 1 )
/* Read one line
*/
if ( (readret = read_line(&line, &maxsize, fd)) == -1 ) {
- edg_wll_Rollback(ctx);
return edg_wll_SetError(ctx, errno, "reading dump file");
}
result->to = event->any.arrived.tv_sec;
}
ctx->event_load = 1;
- if ( edg_wll_StoreEvent(ctx, event, line, NULL) )
- {
+
+ do {
+ if (edg_wll_Transaction(ctx)) goto err;
+
+ ret = edg_wll_StoreEvent(ctx, event, line, NULL);
+
+ } while (edg_wll_TransNeedRetry(ctx));
+
+ if (ret) {
char *errdesc;
int len = strlen(line),
total = 0,
}
write(reject_fd,"\n",1);
}
- else
- {
+ else {
result->to = event->any.arrived.tv_sec;
if ( jobid )
{
edg_wll_FreeEvent(event);
}
+err:
if ( jobid )
{
edg_wll_JobStat st;
if ( reject_fd != -1 )
close(reject_fd);
- if (edg_wll_Commit(ctx) != 0)
- return edg_wll_Error(ctx, NULL, NULL);
-
return edg_wll_Error(ctx,NULL,NULL);
}
trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
}
- /* Format DB insert statement
- */
- trio_asprintf(&q,
- "insert into notif_registrations(notifid,destination,valid,userid,conditions) "
- "values ('%|Ss','%|Ss',%s,'%|Ss', '<and>%|Ss</and>')",
- nid_s, addr_s? addr_s: address_override, time_s, owner, xml_conds);
+ do {
+ if (edg_wll_Transaction(ctx) != 0) goto cleanup;
- if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
- goto cleanup;
+ /* Format DB insert statement
+ */
+ trio_asprintf(&q,
+ "insert into notif_registrations(notifid,destination,valid,userid,conditions) "
+ "values ('%|Ss','%|Ss',%s,'%|Ss', '<and>%|Ss</and>')",
+ nid_s, addr_s? addr_s: address_override, time_s, owner, xml_conds);
- if (get_indexed_cols(ctx,nid_s,nconds,&add_index) ||
- (add_index && edg_wll_ExecSQL(ctx,add_index,NULL) < 0)
- ) goto cleanup;
+ if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+ goto rollback;
+ if (get_indexed_cols(ctx,nid_s,nconds,&add_index) ||
+ (add_index && edg_wll_ExecSQL(ctx,add_index,NULL) < 0)
+ ) goto rollback;
- if (jobs) for ( i = 0; jobs[i]; i++ )
- {
- free(q);
- trio_asprintf(&q,
- "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
- nid_s, jobs[i]);
- if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+
+ if (jobs) for ( i = 0; jobs[i]; i++ )
{
- /* XXX: Remove uncoplete registration?
- * Which error has to be returned?
- */
free(q);
- trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s);
- edg_wll_ExecSQL(ctx, q, NULL);
- free(q);
- trio_asprintf(&q, "delete from notif_registrations where notifid='%|Ss'", nid_s);
- edg_wll_ExecSQL(ctx, q, NULL);
- goto cleanup;
+ trio_asprintf(&q,
+ "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
+ nid_s, jobs[i]);
+ if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+ goto rollback;
}
- }
- else {
- trio_asprintf(&q,"insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
- nid_s,NOTIF_ALL_JOBS);
- if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) goto cleanup;
+ else {
+ trio_asprintf(&q,"insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
+ nid_s,NOTIF_ALL_JOBS);
+ if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 ) goto rollback;
- }
+ }
+rollback:
+ free(q); q= NULL;
+ free(add_index); add_index = NULL;
+
+ } while (edg_wll_TransNeedRetry(ctx));
cleanup:
if ( q ) free(q);
const char *address_override,
time_t *valid)
{
- char *time_s = NULL,
- *addr_s = NULL;
+ char *time_s = NULL,
+ *addr_s = NULL;
if ( !address_override )
{
edg_wll_SetError(ctx, EINVAL, "Address parameter not given");
- goto cleanup;
+ goto err;
}
+
+ do {
+ if (edg_wll_Transaction(ctx) != 0) goto err;
- if ( check_notif_request(ctx, nid, NULL) )
- goto cleanup;
-
- /* Format time of validity
- */
- *valid = time(NULL);
- if ( ctx->peerProxyValidity
- && (ctx->peerProxyValidity - *valid) < ctx->notifDuration )
- *valid = ctx->peerProxyValidity;
- else
- *valid += ctx->notifDuration;
+ if ( check_notif_request(ctx, nid, NULL) )
+ goto rollback;
- glite_lbu_TimeToDB(*valid, &time_s);
- if ( !time_s )
- {
- edg_wll_SetError(ctx, errno, "Formating validity time");
- goto cleanup;
- }
+ /* Format time of validity
+ */
+ *valid = time(NULL);
+ if ( ctx->peerProxyValidity
+ && (ctx->peerProxyValidity - *valid) < ctx->notifDuration )
+ *valid = ctx->peerProxyValidity;
+ else
+ *valid += ctx->notifDuration;
- /* Format the address
- */
- if ( address_override )
- {
- char *aux;
+ glite_lbu_TimeToDB(*valid, &time_s);
+ if ( !time_s )
+ {
+ edg_wll_SetError(ctx, errno, "Formating validity time");
+ goto rollback;
+ }
- if ( !(aux = strchr(address_override, ':')) )
+ /* Format the address
+ */
+ if ( address_override )
{
- edg_wll_SetError(ctx, EINVAL, "Addres overrirde not in format host:port");
- goto cleanup;
+ char *aux;
+
+ if ( !(aux = strchr(address_override, ':')) )
+ {
+ edg_wll_SetError(ctx, EINVAL, "Addres overrirde not in format host:port");
+ goto rollback;
+ }
+ if ( !strncmp(address_override, "0.0.0.0", aux-address_override) )
+ trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
}
- if ( !strncmp(address_override, "0.0.0.0", aux-address_override) )
- trio_asprintf(&addr_s, "%s:%s", ctx->connections->serverConnection->peerName, aux+1);
- }
- update_notif(ctx, nid, NULL, addr_s? addr_s: address_override, (const char *)(time_s));
+ update_notif(ctx, nid, NULL, addr_s? addr_s: address_override, (const char *)(time_s));
-cleanup:
- if ( time_s ) free(time_s);
- if ( addr_s ) free(addr_s);
+rollback:
+ free(time_s); time_s = NULL;
+ free(addr_s); addr_s = NULL;
+
+ } while (edg_wll_TransNeedRetry(ctx));
+err:
return edg_wll_Error(ctx, NULL, NULL);
}
/* Format notification ID
*/
if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) )
- goto cleanup;
+ goto err;
- if ( check_notif_request(ctx, nid, NULL) )
- goto cleanup;
+ do {
+ if (edg_wll_Transaction(ctx) != 0) goto err;
- switch ( op )
- {
- case EDG_WLL_NOTIF_REPLACE:
- /* Format conditions
- * - separate all jobids
- * - format new condition list without jobids
- */
- if ( split_cond_list(ctx, conditions, &nconds, &jobs) )
- goto cleanup;
+ if ( check_notif_request(ctx, nid, NULL) )
+ goto rollback;
- /*
- * encode new cond. list into a XML string
- */
- if ( edg_wll_JobQueryRecToXML(ctx, (edg_wll_QueryRec const * const *) nconds, &xml_conds) )
+ switch ( op )
{
- /* XXX: edg_wll_JobQueryRecToXML() do not set errors in context!
- * can't get propper error number :(
+ case EDG_WLL_NOTIF_REPLACE:
+ /* Format conditions
+ * - separate all jobids
+ * - format new condition list without jobids
*/
- edg_wll_SetError(ctx, errno, "Can't encode data into xml");
- goto cleanup;
- }
+ if ( split_cond_list(ctx, conditions, &nconds, &jobs) )
+ goto rollback;
- /* Format DB insert statement
- */
- if ( update_notif(ctx, nid, xml_conds, NULL, NULL) )
- goto cleanup;
+ /*
+ * encode new cond. list into a XML string
+ */
+ if ( edg_wll_JobQueryRecToXML(ctx, (edg_wll_QueryRec const * const *) nconds, &xml_conds) )
+ {
+ /* XXX: edg_wll_JobQueryRecToXML() do not set errors in context!
+ * can't get propper error number :(
+ */
+ edg_wll_SetError(ctx, errno, "Can't encode data into xml");
+ goto rollback;
+ }
- if ( jobs )
- {
/* Format DB insert statement
*/
- trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s);
- if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
- goto cleanup;
+ if ( update_notif(ctx, nid, xml_conds, NULL, NULL) )
+ goto rollback;
- for ( i = 0; jobs[i]; i++ )
+ if ( jobs )
{
- free(q);
- trio_asprintf(&q,
- "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
- nid_s, jobs[i]);
+ /* Format DB insert statement
+ */
+ trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s);
if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+ goto rollback;
+
+ for ( i = 0; jobs[i]; i++ )
{
- /* XXX: Remove uncoplete registration?
- * Which error has to be returned?
- */
- free(q);
- trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s);
- edg_wll_ExecSQL(ctx, q, NULL);
free(q);
- trio_asprintf(&q,"delete from notif_registrations where notifid='%|Ss'", nid_s);
- edg_wll_ExecSQL(ctx, q, NULL);
- goto cleanup;
+ trio_asprintf(&q,
+ "insert into notif_jobs(notifid,jobid) values ('%|Ss','%|Ss')",
+ nid_s, jobs[i]);
+ if ( edg_wll_ExecSQL(ctx, q, NULL) < 0 )
+ {
+ /* XXX: Remove uncoplete registration?
+ * Which error has to be returned?
+ */
+ free(q);
+ trio_asprintf(&q, "delete from notif_jobs where notifid='%|Ss'", nid_s);
+ edg_wll_ExecSQL(ctx, q, NULL);
+ free(q);
+ trio_asprintf(&q,"delete from notif_registrations where notifid='%|Ss'", nid_s);
+ edg_wll_ExecSQL(ctx, q, NULL);
+ goto rollback;
+ }
}
}
+ break;
+
+ case EDG_WLL_NOTIF_ADD:
+ break;
+ case EDG_WLL_NOTIF_REMOVE:
+ break;
+ default:
+ break;
}
- break;
-
- case EDG_WLL_NOTIF_ADD:
- break;
- case EDG_WLL_NOTIF_REMOVE:
- break;
- default:
- break;
- }
-cleanup:
- if ( q ) free(q);
- if ( xml_conds ) free(xml_conds);
- if ( nid_s ) free(nid_s);
- if ( jobs )
- {
- for ( i = 0; jobs[i]; i++ )
- free(jobs[i]);
- free(jobs);
- }
- if ( nconds ) free(nconds);
+rollback:
+ free(q); q = NULL;
+ free(xml_conds); xml_conds = NULL;
+ free(nid_s); nid_s = NULL;
+ if ( jobs ) {
+ for ( i = 0; jobs[i]; i++ )
+ free(jobs[i]);
+ free(jobs); jobs = NULL;
+ }
+ free(nconds); nconds = NULL;
+
+ } while (edg_wll_TransNeedRetry(ctx));
+err:
return edg_wll_Error(ctx, NULL, NULL);
}
{
char *time_s = NULL;
+ do {
+ if (edg_wll_Transaction(ctx) != 0) goto err;
- if ( check_notif_request(ctx, nid, NULL) )
- goto cleanup;
+ if ( check_notif_request(ctx, nid, NULL) )
+ goto rollback;
- /* Format time of validity
- */
- *valid = time(NULL);
- if ( ctx->peerProxyValidity
- && (ctx->peerProxyValidity - *valid) < ctx->notifDuration )
- *valid = ctx->peerProxyValidity;
- else
- *valid += ctx->notifDuration;
+ /* Format time of validity
+ */
+ *valid = time(NULL);
+ if ( ctx->peerProxyValidity
+ && (ctx->peerProxyValidity - *valid) < ctx->notifDuration )
+ *valid = ctx->peerProxyValidity;
+ else
+ *valid += ctx->notifDuration;
- glite_lbu_TimeToDB(*valid, &time_s);
- if ( !time_s )
- {
- edg_wll_SetError(ctx, errno, "Formating validity time");
- goto cleanup;
- }
+ glite_lbu_TimeToDB(*valid, &time_s);
+ if ( !time_s )
+ {
+ edg_wll_SetError(ctx, errno, "Formating validity time");
+ goto rollback;
+ }
- update_notif(ctx, nid, NULL, NULL, time_s);
+ update_notif(ctx, nid, NULL, NULL, time_s);
-cleanup:
- if ( time_s ) free(time_s);
+rollback:
+ free(time_s); time_s = NULL;
+ } while (edg_wll_TransNeedRetry(ctx));
+
+err:
return edg_wll_Error(ctx, NULL, NULL);
}
{
char *nid_s = NULL,
*stmt = NULL;
- int ret;
+
+ do {
+ if (edg_wll_Transaction(ctx) != 0) goto err;
- if ( check_notif_request(ctx, nid, NULL) )
- goto cleanup;
+ if ( check_notif_request(ctx, nid, NULL) )
+ goto rollback;
- if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) )
- goto cleanup;
+ if ( !(nid_s = edg_wll_NotifIdGetUnique(nid)) )
+ goto rollback;
- trio_asprintf(&stmt, "delete from notif_registrations where notifid='%|Ss'", nid_s);
- if ( (ret = edg_wll_ExecSQL(ctx, stmt, NULL)) < 0 )
- goto cleanup;
- free(stmt);
- trio_asprintf(&stmt, "delete from notif_jobs where notifid='%|Ss'", nid_s);
- edg_wll_ExecSQL(ctx, stmt, NULL);
- edg_wll_NotifCancelRegId(ctx, nid);
+ trio_asprintf(&stmt, "delete from notif_registrations where notifid='%|Ss'", nid_s);
+ if ( edg_wll_ExecSQL(ctx, stmt, NULL) < 0 )
+ goto rollback;
+ free(stmt);
+ trio_asprintf(&stmt, "delete from notif_jobs where notifid='%|Ss'", nid_s);
+ if ( edg_wll_ExecSQL(ctx, stmt, NULL) < 0 )
+ goto rollback;
+ edg_wll_NotifCancelRegId(ctx, nid);
-cleanup:
- if ( nid_s ) free(nid_s);
- if ( stmt ) free(stmt);
+rollback:
+ free(nid_s); nid_s = NULL;
+ free(stmt); stmt = NULL;
+
+ } while (edg_wll_TransNeedRetry(ctx));
+err:
return edg_wll_Error(ctx, NULL, NULL);
}
trio_asprintf(&stmt,
"select notifid from notif_registrations "
- "where notifid='%|Ss' and userid='%|Ss'",
+ "where notifid='%|Ss' and userid='%|Ss' FOR UPDATE",
nid_s, user);
if ( (ret = edg_wll_ExecSQL(ctx, stmt, NULL)) < 0 )
*/
int edg_wll_PurgeServerProxy(
edg_wll_Context ctx,
- edg_wlc_JobId job
+ glite_jobid_const_t job
);
#define FILE_TYPE_ANY ""
}
}
- // Auth checked in edg_wll_JobStatus above
+ // Auth checked in edg_wll_JobStatusServer above
if ( !(where_flags & FL_FILTER) && !noAuth )
{
if (!ctx->peerName || (strcmp(res[1],peerid) && strcmp(res[1], can_peerid))) {
NULL
};
-static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int);
-int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job);
-static int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job);
+static int purge_one(edg_wll_Context ctx,glite_jobid_const_t,int,int,int);
+int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job);
+static int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job);
int edg_wll_CreateTmpFileStorage(edg_wll_Context ctx, char *prefix, char **fname)
return retfd;
}
-int edg_wll_PurgeServerProxy(edg_wll_Context ctx, edg_wlc_JobId job)
+int edg_wll_PurgeServerProxy(edg_wll_Context ctx, glite_jobid_const_t job)
{
- switch ( purge_one(ctx, job, -1, 1) ) {
+ switch ( purge_one(ctx, job, -1, 1, 1) ) {
case 0:
case ENOENT:
- edg_wll_ResetError(ctx);
- return 0;
-
+ return(edg_wll_ResetError(ctx));
+ break;
default:
- return -1;
+ return(edg_wll_Error(ctx,NULL,NULL));
+ break;
}
}
parse = 1;
}
else {
- switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE)) {
+ switch (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,0)) {
case 0: if (request->flags & EDG_WLL_PURGE_LIST_JOBS) {
result.jobs = realloc(result.jobs,(naffected_jobs+2) * sizeof(*result.jobs));
result.jobs[naffected_jobs] = strdup(request->jobs[i]);
}
naffected_jobs++;
break;
- case ENOENT: parse = 1;
+ case ENOENT: /* job does not exist, consider purged and ignore */
edg_wll_ResetError(ctx);
break;
default: goto abort;
memset(&stat,0,sizeof stat);
if (edg_wll_JobStatusServer(ctx,job,0,&stat)) { /* FIXME: replace by intJobStatus ?? */
+ if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+ /* job purged meanwhile, ignore */
+ edg_wll_ResetError(ctx);
+ continue;
+ }
edg_wll_FreeStatus(&stat);
goto abort;
}
if (now-stat.lastUpdateTime.tv_sec > timeout[i] && !check_strict_jobid(ctx,job))
{
- if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE)) {
+ if (purge_one(ctx,job,dumpfile,request->flags&EDG_WLL_PURGE_REALLY_PURGE,0)) {
edg_wll_FreeStatus(&stat);
+ if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+ /* job purged meanwhile, ignore */
+ edg_wll_ResetError(ctx);
+ continue;
+ }
goto abort;
}
}
}
+static int dump_events(edg_wll_Context ctx, glite_jobid_const_t job, int dump, char **res)
+{
+ edg_wll_Event e;
+ int event;
+
+
+ event = atoi(res[0]);
+ free(res[0]); res[0] = NULL;
+
+ res[0] = edg_wlc_JobIdUnparse(job);
+ if (convert_event_head(ctx,res,&e) || edg_wll_get_event_flesh(ctx,event,&e))
+ {
+ char *et,*ed, *dbjob;
+ int i;
+
-int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge)
+ /* Most likely sort of internal inconsistency.
+ * Must not be fatal -- just complain
+ */
+ edg_wll_Error(ctx,&et,&ed);
+ dbjob = edg_wlc_JobIdGetUnique(job);
+ fprintf(stderr,"%s event %d: %s (%s)\n",dbjob,event,et,ed);
+ syslog(LOG_WARNING,"%s event %d: %s (%s)",dbjob,event,et,ed);
+ free(et); free(ed); free(dbjob);
+ for (i=0; i<sizofa(res); i++) free(res[i]);
+ edg_wll_ResetError(ctx);
+ }
+ else {
+ char *event_s = edg_wll_UnparseEvent(ctx,&e);
+ char arr_s[100];
+ int len, written, total;
+
+ strcpy(arr_s, "DG.ARRIVED=");
+ edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec,
+ e.any.arrived.tv_usec,
+ arr_s+strlen("DG.ARRIVED="));
+
+ len = strlen(arr_s);
+ total = 0;
+ while (total != len) {
+ written = write(dump,arr_s+total,len-total);
+ if (written < 0 && errno != EAGAIN) {
+ edg_wll_SetError(ctx,errno,"writing dump file");
+ free(event_s);
+ return edg_wll_Error(ctx,NULL,NULL);
+ }
+ total += written;
+ }
+ write(dump, " ", 1);
+
+ len = strlen(event_s);
+ total = 0;
+ while (total != len) {
+ written = write(dump,event_s+total,len-total);
+ if (written < 0 && errno != EAGAIN) {
+ perror("dump to file");
+ syslog(LOG_ERR,"dump to file: %m");
+ dump = -1; /* XXX: likely to be a permanent error
+ * give up writing but do purge */
+ break;
+ }
+ total += written;
+ }
+ /* write(dump,"\n",1); edg_wll_UnparseEvent does so */
+ free(event_s);
+ }
+ edg_wll_FreeEvent(&e);
+
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+int purge_one(edg_wll_Context ctx,glite_jobid_const_t job,int dump, int purge, int purge_from_proxy_only)
{
char *dbjob;
char *stmt = NULL;
glite_lbu_Statement q;
int ret,dumped = 0;
+ char *res[9];
+
edg_wll_ResetError(ctx);
if ( !purge && dump < 0 ) return 0;
- switch (edg_wll_jobMembership(ctx, job)) {
- case DB_PROXY_JOB:
- if (!ctx->isProxy) {
- /* should not happen */
- return 0;
- }
- /* continue */
- break;
- case DB_SERVER_JOB:
- if (ctx->isProxy) {
- /* should not happen */
- return 0;
- }
- /* continue */
- break;
- case DB_PROXY_JOB+DB_SERVER_JOB:
- if (ctx->isProxy) {
- purge = 0;
- if (unset_proxy_flag(ctx, job) < 0) {
- return(edg_wll_Error(ctx,NULL,NULL));
+ do {
+ if (edg_wll_Transaction(ctx)) goto err;
+
+ switch (edg_wll_jobMembership(ctx, job)) {
+ case DB_PROXY_JOB:
+ if (!ctx->isProxy) {
+ /* should not happen */
+ goto commit;
}
- }
- else {
- purge = 0;
- if (unset_server_flag(ctx, job) < 0) {
- return(edg_wll_Error(ctx,NULL,NULL));
+ /* continue */
+ break;
+ case DB_SERVER_JOB:
+ if (ctx->isProxy) {
+ /* should not happen */
+ goto commit;
}
- }
- break;
- case 0:
- // Zombie job (server=0, proxy=0)? should not happen;
- // clear it to keep DB healthy
- break;
- default:
- return 0;
- break;
- }
+ /* continue */
+ break;
+ case DB_PROXY_JOB+DB_SERVER_JOB:
+ if (ctx->isProxy) {
+ purge = 0;
+ if (unset_proxy_flag(ctx, job) < 0) {
+ goto rollback;
+ }
+ }
+ else {
+ purge = 0;
+ /* if server&proxy DB is shared ... */
+ if (is_job_local(ctx,job) && purge_from_proxy_only) {
+ if (unset_proxy_flag(ctx, job) < 0) {
+ goto rollback;
+ }
+ }
+ else {
+ if (unset_server_flag(ctx, job) < 0) {
+ goto rollback;
+ }
+ }
+ }
+ break;
+ case 0:
+ // Zombie job (server=0, proxy=0)? should not happen;
+ // clear it to keep DB healthy
+ break;
+ default:
+ goto rollback;
+ break;
+ }
- dbjob = edg_wlc_JobIdGetUnique(job); /* XXX: strict jobid already checked */
- if (edg_wll_LockJob(ctx,job)) goto clean;
+ dbjob = edg_wlc_JobIdGetUnique(job); /* XXX: strict jobid already checked */
- if ( purge )
- {
- trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob);
- ret = edg_wll_ExecSQL(ctx,stmt,NULL);
- if (ret <= 0) {
- unlock_and_check(ctx,job);
- if (ret == 0) {
- fprintf(stderr,"%s: no such job\n",dbjob);
- edg_wll_SetError(ctx,ENOENT,dbjob);
- }
- goto clean;
- }
- free(stmt); stmt = NULL;
+ if ( purge )
+ {
+ trio_asprintf(&stmt,"delete from jobs where jobid = '%|Ss'",dbjob);
+ if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback;
+ free(stmt); stmt = NULL;
- trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob);
- if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
- unlock_and_check(ctx,job);
- goto clean;
+ trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob);
+ if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback;
+ free(stmt); stmt = NULL;
}
- free(stmt); stmt = NULL;
-/* Why on earth ?
- trio_asprintf(&stmt,"delete from states where jobid = '%|Ss'",dbjob);
- if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
- unlock_and_check(ctx,job);
- goto clean;
+ if ( purge )
+ {
+ trio_asprintf(&stmt,"delete from status_tags where jobid = '%|Ss'",dbjob);
+ if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto rollback;
+ free(stmt); stmt = NULL;
}
- free(stmt); stmt = NULL;
-*/
- }
-
- if (!ctx->strict_locking) unlock_and_check(ctx,job);
+ if (dump >= 0)
+ trio_asprintf(&stmt,
+ "select event,code,prog,host,u.cert_subj,time_stamp,usec,level,arrived "
+ "from events e,users u "
+ "where e.jobid='%|Ss' "
+ "and u.userid=e.userid "
+ "order by event", dbjob);
+ else
+ trio_asprintf(&stmt,"select event from events "
+ "where jobid='%|Ss' "
+ "order by event", dbjob);
- if ( purge )
- {
- trio_asprintf(&stmt,"delete from status_tags where jobid = '%|Ss'",dbjob);
- if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto unlock;
+ if (edg_wll_ExecSQL(ctx,stmt,&q) < 0) goto rollback;
free(stmt); stmt = NULL;
- }
-
- if (dump >= 0)
- trio_asprintf(&stmt,
- "select event,code,prog,host,u.cert_subj,time_stamp,usec,level,arrived "
- "from events e,users u "
- "where e.jobid='%|Ss' "
- "and u.userid=e.userid "
- "order by event", dbjob);
- else
- trio_asprintf(&stmt,"select event from events "
- "where jobid='%|Ss' "
- "order by event", dbjob);
-
-/* check for events repeatedly -- new one may have arrived in the meantime */
- while ((ret = edg_wll_ExecSQL(ctx,stmt,&q)) > 0) {
- char *res[9];
dumped = 1;
while ((ret = edg_wll_FetchRow(ctx,q,sizofa(res),NULL,res)) > 0) {
int event;
+
+ assert(ret == 9);
event = atoi(res[0]);
- free(res[0]); res[0] = NULL;
- if (dump >= 0) {
- edg_wll_Event e;
+ if (dump >= 0)
+ if (dump_events( ctx, job, dump, (char **) &res)) goto rollback;
- assert(ret == 9);
- res[0] = edg_wlc_JobIdUnparse(job);
- if (convert_event_head(ctx,res,&e) || edg_wll_get_event_flesh(ctx,event,&e))
- {
- char *et,*ed;
- int i;
-
- /* Most likely sort of internal inconsistency.
- * Must not be fatal -- just complain
- */
- edg_wll_Error(ctx,&et,&ed);
- fprintf(stderr,"%s event %d: %s (%s)\n",dbjob,event,et,ed);
- syslog(LOG_WARNING,"%s event %d: %s (%s)",dbjob,event,et,ed);
- free(et); free(ed);
- for (i=0; i<sizofa(res); i++) free(res[i]);
- edg_wll_ResetError(ctx);
- }
- else {
- char *event_s = edg_wll_UnparseEvent(ctx,&e);
- char arr_s[100];
- int len, written, total;
-
- strcpy(arr_s, "DG.ARRIVED=");
- edg_wll_ULMTimevalToDate(e.any.arrived.tv_sec,
- e.any.arrived.tv_usec,
- arr_s+strlen("DG.ARRIVED="));
-
- len = strlen(arr_s);
- total = 0;
- while (total != len) {
- written = write(dump,arr_s+total,len-total);
- if (written < 0 && errno != EAGAIN) {
- edg_wll_SetError(ctx,errno,"writing dump file");
- free(event_s);
- goto clean;
- }
- total += written;
- }
- write(dump, " ", 1);
-
- len = strlen(event_s);
- total = 0;
- while (total != len) {
- written = write(dump,event_s+total,len-total);
- if (written < 0 && errno != EAGAIN) {
- perror("dump to file");
- syslog(LOG_ERR,"dump to file: %m");
- dump = -1; /* XXX: likely to be a permanent error
- * give up writing but do purge */
- break;
- }
- total += written;
- }
- /* write(dump,"\n",1); edg_wll_UnparseEvent does so */
- free(event_s);
- }
- edg_wll_FreeEvent(&e);
- }
-
- if ( purge ) {
- if (edg_wll_delete_event(ctx,dbjob,event)) {
- char *et,*ed;
-
- /* XXX: just complain and carry on. Is it OK? */
- edg_wll_Error(ctx,&et,&ed);
- fprintf(stderr,"%s event %d: %s (%s)\n",dbjob,event,et,ed);
- syslog(LOG_WARNING,"%s event %d: %s (%s)",dbjob,event,et,ed);
- free(et); free(ed);
- edg_wll_ResetError(ctx);
- }
- }
+ if ( purge )
+ if (edg_wll_delete_event(ctx,dbjob,event)) goto rollback;
}
glite_lbu_FreeStmt(&q);
- if (ret < 0 || !purge) break;
- }
+ if (ret < 0) goto rollback;
- glite_lbu_FreeStmt(&q);
+commit:
+rollback:;
+ } while (edg_wll_TransNeedRetry(ctx));
-unlock:
- if (ctx->strict_locking) unlock_and_check(ctx,job);
-clean:
+err:
free(dbjob);
free(stmt);
return edg_wll_Error(ctx,NULL,NULL);
}
-int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job)
+int unset_proxy_flag(edg_wll_Context ctx, glite_jobid_const_t job)
{
char *stmt = NULL;
char *dbjob;
}
-int unset_server_flag(edg_wll_Context ctx, edg_wlc_JobId job)
+int unset_server_flag(edg_wll_Context ctx, glite_jobid_const_t job)
{
char *stmt = NULL;
char *dbjob;
#include <assert.h>
#include <errno.h>
#include <syslog.h>
+#include <ctype.h>
#include "glite/jobid/strmd5.h"
#include "glite/lbu/trio.h"
#include "jobstat.h"
#include "db_calls.h"
#include "db_supp.h"
+#include "index.h"
static int store_user(edg_wll_Context,const char *,const char *);
-static int store_job(edg_wll_Context,glite_jobid_const_t,const char *, int, int);
+static int store_job(edg_wll_Context,glite_jobid_const_t,const char *, int, int, int, int);
#ifdef LB_BUF
static int store_job_block(edg_wll_Context, glite_jobid_const_t, const char *, glite_lbu_bufInsert *, int, int);
#endif
-static int store_job_grey(edg_wll_Context,glite_jobid_const_t,time_t);
+static int set_job_grey(edg_wll_Context ctx, char *jobid);
static int store_flesh(edg_wll_Context,edg_wll_Event *,const char *ulm, char *,int);
-static int store_seq(edg_wll_Context,edg_wll_Event *,int);
static int check_dup(edg_wll_Context,edg_wll_Event *);
static int check_auth(edg_wll_Context,edg_wll_Event *e);
-#ifndef LB_DAG_EMBRIONIC
-static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
-#endif
-static int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *, const char *);
+static void lowercase_usertag(edg_wll_Event *ev);
void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) {
ctx->allowAnonymous = anon;
}
+
+/* !!! to be called from OPEN TRANSACTION only !!!
+ */
int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,const char *ulm,int *seq)
{
- edg_wll_ErrorCode err = 0;
- char *userid = NULL,*jobid,*stmt;
- char *select_max,*ssrc;
+ char *userid, *jobid, *stmt, *ssrc, *now_s, *stamp, *dummy, *max;
glite_lbu_Statement sh = NULL;
- int next = 0xDEAD;
- int lbproxy_notreg = 0;
- char *now_s = NULL;
+ int next = 0xDEAD, nr;
- ssrc = jobid = stmt = select_max = NULL;
+
+ userid = ssrc = jobid = stmt = now_s = stamp = dummy = max = NULL;
+
+ lowercase_usertag(e);
+ jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
+ glite_lbu_TimeToDB(e->any.timestamp.tv_sec, &stamp);
+ ssrc = edg_wll_SourceToString(e->any.source);
if ( ctx->event_load )
glite_lbu_TimeToDB(e->any.arrived.tv_sec, &now_s);
glite_lbu_TimeToDB(time(NULL), &now_s);
edg_wll_ResetError(ctx);
- switch (err = check_auth(ctx,e)) {
+ switch (check_auth(ctx,e)) {
case 0: break;
case ENOENT:
- if ( !ctx->isProxy ) {
- if (ctx->greyjobs) {
- edg_wll_ResetError(ctx);
- if (store_job_grey(ctx,e->any.jobId,e->any.timestamp.tv_sec))
- goto clean;
- break;
- }
- else goto clean;
- }
-
- edg_wll_ResetError(ctx);
- lbproxy_notreg = 1;
+ /* job not registered */
+ // should not happen, store_job_server_proxy() miscoded or going thu load?
+ goto clean;
break;
case EPERM:
if (!ctx->noAuth) goto clean;
default: goto clean;
}
-/* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */
- if ((err = check_dup(ctx,e))) goto clean;
-
- jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
-
trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid);
if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(ctx,sh,1,NULL,&userid) < 0) goto clean;
free(stmt); stmt = NULL;
-/* obtain next event sequence number */
- trio_asprintf(&select_max,
- "select max(event) from events "
- "where jobid = '%|Ss'",jobid);
+/* check duplicity */
+ trio_asprintf(&stmt,
+ "select arrived from events where jobid='%|Ss' and code='%d'"
+ " and prog='%|Ss' and host='%|Ss' and time_stamp='%s' and usec='%d'"
+ " and level='%d' and userid='%|Ss' and seqcode='%|Ss') ",
+ jobid, (int) e->any.type,
+ ssrc,e->any.host,
+ stamp,e->any.timestamp.tv_usec,
+ e->any.level,userid, e->any.seqcode);
+
+ if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean;
+ nr = edg_wll_FetchRow(ctx,sh,1,NULL,&dummy);
+ if (sh) glite_lbu_FreeStmt(&sh);
+ free(stmt); stmt = NULL;
+ free(dummy);
- ssrc = edg_wll_SourceToString(e->any.source);
+ if (nr < 0) goto clean;
+ if (nr > 0) {
+ /* possible duplicity (99%) */
+ // XXX: check event flesh to be 100% sure
+ edg_wll_SetError(ctx,EEXIST,"duplicate event");
+ goto clean;
+ }
+ /* else (nr == 0) -> unique event, continue */
+
-/* try to insert (someone else may be doing the same) */
- {
- char *max = NULL;
+/* obtain number of stored events */
+ trio_asprintf(&stmt,
+ "select nevents from jobs "
+ "where jobid = '%|Ss'",jobid);
+
+ if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 ||
+ edg_wll_FetchRow(ctx,sh,1,NULL,&max) < 0) goto clean;
+ glite_lbu_FreeStmt(&sh);
+
+ next = (max && *max) ? atoi(max)+1 : 0;
+ free(max);
- if (edg_wll_ExecSQL(ctx,select_max,&sh) < 0 ||
- edg_wll_FetchRow(ctx,sh,1,NULL,&max) < 0)
- {
- err = edg_wll_Error(ctx,NULL,NULL);
- goto clean;
- }
- glite_lbu_FreeStmt(&sh);
-
- next = max && *max ? atoi(max)+1 : 0;
- free(max);
- }
-
- while (1) {
- /*
- * 1) when using transactions:
- * Store the whole event right now.
- *
- * 2) when not using transactions:
- * Store an UNDEF event first in order to prevent race condition
- * with readers and update event code later.
- */
- char *stamp = NULL;
-
- glite_lbu_TimeToDB(e->any.timestamp.tv_sec, &stamp);
- trio_asprintf(&stmt,
- "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid) "
- "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss')",
- jobid,next,
- ctx->dbcaps & GLITE_LBU_DB_CAP_TRANSACTIONS ? (int) e->any.type : EDG_WLL_EVENT_UNDEF,
- ssrc,e->any.host,
- stamp,e->any.timestamp.tv_usec,
- now_s, e->any.level,userid);
- free(stamp);
-
- if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
- if ((err = edg_wll_Error(ctx,NULL,NULL)) != EEXIST)
- goto clean;
- } else break; /* successful insert */
-
- /* we were late -- try once again */
- next++;
- free(stmt);
- }
+/* store event */
+ trio_asprintf(&stmt,
+ "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid,seqcode) "
+ "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss','%|Ss')",
+ jobid,next,
+ (int) e->any.type,
+ ssrc,e->any.host,
+ stamp,e->any.timestamp.tv_usec,
+ now_s, e->any.level,userid, e->any.seqcode);
+
+ if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean;
free(stmt); stmt = NULL;
- if ((err = store_seq(ctx,e,next)) ||
- (err = store_flesh(ctx,e,ulm,jobid,next))) {
- /* attempt to cleanup, ignore new errors */
- char *desc;
- edg_wll_ErrorCode oerr = edg_wll_Error(ctx,NULL,&desc);
-
- edg_wll_delete_event(ctx,jobid,next);
- edg_wll_SetError(ctx,oerr,desc);
- free(desc);
- } else
- if (!(ctx->dbcaps & GLITE_LBU_DB_CAP_TRANSACTIONS)) {
- /* emulate commit, i.e. swith to a real event type to make
- * the record valid */
- trio_asprintf(&stmt,
- "update events set code=%d "
- "where jobid='%|Ss' and event=%d",
- (int) e->any.type,jobid,next);
- switch (edg_wll_ExecSQL(ctx,stmt,NULL)) {
- case 0: if (ctx->strict_locking)
- err = edg_wll_SetError(ctx,ENOENT,"event disappeared on store while strict locking");
- /* purge in progres: drop the garbage, ignore errors */
- else {
- edg_wll_delete_event(ctx,jobid,next);
- err = edg_wll_SetError(ctx,ENOENT,"job being purged");
- }
- break;
- case 1: if (ctx->strict_locking) err = 0;
- else {
- /* check whether the job is still there to prevent garbage
- * left while there is a concurrent purge
- */
- free(stmt);
- trio_asprintf(&stmt,
- "select 'x' from jobs where jobid='%|Ss'",
- jobid);
- switch (edg_wll_ExecSQL(ctx,stmt,NULL)) {
- case 1: break;
- case 0: /* purge in progres */
- edg_wll_delete_event(ctx,jobid,next);
- err = edg_wll_SetError(ctx,ENOENT,"job being purged");
- break;
- default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,
- "more job records, what is that?");
- break;
- }
- }
- break;
- case -1: err = edg_wll_Error(ctx,NULL,NULL);
- break;
-
- default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,
- "more event records, what is that?");
- break;
- }
- } /* if !transactions */
- if (err == 0 &&
- e->any.type == EDG_WLL_EVENT_REGJOB &&
- (e->regJob.jobtype == EDG_WLL_REGJOB_DAG ||
- e->regJob.jobtype == EDG_WLL_REGJOB_PARTITIONED ||
- e->regJob.jobtype == EDG_WLL_REGJOB_COLLECTION) &&
- e->regJob.nsubjobs > 0)
+/* increase number of stored events */
+ trio_asprintf(&stmt,
+ "update jobs set nevents='%d'"
+ "where jobid = '%|Ss'", next, jobid);
-#ifdef LB_DAG_EMBRIONIC
- err = register_subjobs_embryonic(ctx,&e->regJob,userid);
-#else
- err = register_subjobs(ctx,&e->regJob);
-#endif
+ if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) goto clean;
+ free(stmt); stmt = NULL;
+
+/* store event record */
+ if (store_flesh(ctx,e,ulm,jobid,next)) goto clean;
clean:
free(now_s);
free(jobid);
free(stmt);
free(ssrc);
- free(select_max);
if (sh) glite_lbu_FreeStmt(&sh);
- if (!err && seq) *seq = next;
- return err;
+ if (!edg_wll_Error(ctx,NULL,NULL) && seq) *seq = next;
+ free(stamp);
+ return edg_wll_Error(ctx,NULL,NULL);
}
static int store_user(edg_wll_Context ctx,const char *userid,const char *subj)
return edg_wll_Error(ctx,NULL,NULL);
}
-static int store_job(edg_wll_Context ctx,glite_jobid_const_t job,const char *userid, int proxy, int server)
+static int store_job(edg_wll_Context ctx,glite_jobid_const_t job,const char *userid, int proxy, int server,int grey, int update)
{
char *jobstr = edg_wlc_JobIdUnparse(job);
char *jobid = edg_wlc_JobIdGetUnique(job);
else {
server = 1;
}
-
- trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid,proxy,server) "
- "values ('%|Ss','%|Ss','%|Ss', '%|Sd', '%|Sd')",jobid,jobstr,userid,proxy,server);
- if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
- if (edg_wll_Error(ctx,NULL,NULL) == EEXIST)
- edg_wll_ResetError(ctx);
+ if (update) {
+ trio_asprintf(&stmt,"update jobs set userid='%|Ss', proxy='%|Sd', server='%|Sd', grey='%|Sd' where jobid='%|Ss'",
+ userid,proxy,server,grey,jobid);
+ }
+ else {
+ trio_asprintf(&stmt,"insert into jobs(jobid,dg_jobid,userid,proxy,server,grey) "
+ "values ('%|Ss','%|Ss','%|Ss', '%|Sd', '%|Sd', '%|Sd')",jobid,jobstr,userid,proxy,server,grey);
}
- free(stmt);
- if (ctx->greyjobs) {
- trio_asprintf(&stmt,"delete from grey_jobs where jobid = '%|Ss'",jobid);
- edg_wll_ExecSQL(ctx,stmt,NULL); /* XXX: error propagates */
- free(stmt);
+ if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
+ if (edg_wll_Error(ctx,NULL,NULL) == EEXIST && !update)
+ edg_wll_ResetError(ctx);
+ else
+ goto err;
}
+ free(stmt); stmt = NULL;
+err:
+ free(stmt);
free(jobstr);
free(jobid);
return edg_wll_Error(ctx,NULL,NULL);
}
#endif
-static int store_job_grey(edg_wll_Context ctx,glite_jobid_const_t job,time_t etime)
+static int set_job_grey(edg_wll_Context ctx, char *jobid)
{
- char *jobstr = edg_wlc_JobIdUnparse(job);
- char *jobid = edg_wlc_JobIdGetUnique(job);
- char *stmt, *s_etime;
-
- if (jobid == NULL || jobstr == NULL)
- return edg_wll_SetError(ctx,EINVAL,"store_job_grey()");
-
- edg_wll_ResetError(ctx);
- glite_lbu_TimeToDB(etime, &s_etime);
- trio_asprintf(&stmt,"insert into grey_jobs(jobid,dg_jobid,time_stamp) "
- "values ('%|Ss','%|Ss',%s)",
- jobid,jobstr,s_etime);
- free(s_etime);
+ char *stmt;
- if (edg_wll_ExecSQL(ctx,stmt,NULL) < 0) {
- if (edg_wll_Error(ctx,NULL,NULL) == EEXIST)
- edg_wll_ResetError(ctx);
- }
+ trio_asprintf(&stmt,"update jobs set grey='1' where jobid='%|Ss'", jobid);
+ edg_wll_ExecSQL(ctx,stmt,NULL);
free(stmt);
- free(jobstr);
- free(jobid);
return edg_wll_Error(ctx,NULL,NULL);
}
/* test whether job shares LB proxy and server DB or not */
-int is_job_local(edg_wll_Context ctx, edg_wlc_JobId jobId)
+int is_job_local(edg_wll_Context ctx, glite_jobid_const_t jobId)
{
char *srvName = NULL;
unsigned int srvPort;
return(ret);
}
-int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event)
-{
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, int *register_to_JP)
+{
char *unique = edg_wlc_JobIdGetUnique(event->any.jobId);
- char *q = NULL, *owner = NULL, *userid = NULL;
+ char *q = NULL, *userid = NULL, *subj;
glite_lbu_Statement stmt = NULL;
- int nar;
+ int nar, grey = 0;
char *can_peername = NULL;
- int local_job = is_job_local(ctx, event->any.jobId);
+ int local_job = is_job_local(ctx, event->any.jobId);
+ char *res[3] = {NULL, NULL, NULL};
+
+
+ /* check auth */
+ if (!ctx->isProxy && !ctx->peerName)
+ return edg_wll_SetError(ctx,EPERM,"LB server can't store using unauthenticated connection");
+ if (ctx->isProxy && (!event->any.user || !strcmp(event->any.user,EDG_WLL_LOG_USER_DEFAULT)) )
+ return edg_wll_SetError(ctx,EPERM,"LB proxy can't store using unauthenticated connection");
- edg_wll_ResetError(ctx);
-
can_peername = edg_wll_gss_normalize_subj(ctx->peerName, 0);
- if (ctx->isProxy) {
- /* event arrived on proxy socket */
- if (event->any.type == EDG_WLL_EVENT_REGJOB) {
- if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
- /* first synchronous registration */
- if (local_job) {
- /* we are both server and proxy for this job */
- trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'",
- unique);
-
- nar = edg_wll_ExecSQL(ctx, q, NULL);
-
- if (nar == 0) {
- /* job isn't registered yet */
- userid = strdup(strmd5("unknown_to_proxy", NULL));
- if (store_user(ctx,userid,"unknown_to_proxy")) goto err;
-
- if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
- userid, 1, ctx->serverRunning)) goto err;
-
- }
- else {} /* job was registered thru GSI, no further action needed */
- /* or error occured - and will go out via return() */
- }
- else {
- /* we are only proxy for this job, forward it to server */
-
- /* XXX - does it have any sence ??
- if (!strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT)) {
- edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously");
- goto err;
- }
- */
-
- userid = strdup(strmd5(event->any.user, NULL));
- if (store_user(ctx,userid,event->any.user)) goto err;
-
- if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
- userid, 1, 0)) goto err;
- }
- }
+ trio_asprintf(&q,"select proxy,server,grey from jobs where jobid='%|Ss' for update", event->any.jobId);
+
+ if ( (nar = edg_wll_ExecSQL(ctx,q,&stmt)) < 0 ) {
+ /* Job not registered yet */
+
+ if (!( (event->any.type == EDG_WLL_EVENT_REGJOB) &&
+ (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) ))
+ {
+ if (ctx->greyjobs) grey = 1;
else {
- /* supplementary re-registration (JDL of subjob, etc.) */
- if (local_job) {
- /* previous registration via GSI required */
- trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'",
- unique, unique);
- /* does the job exists ? */
- if (edg_wll_ExecSQL(ctx,q,NULL) < 0) {
- edg_wll_SetError(ctx, ENOENT, "job not registered");
- goto err;
- }
- }
- else {
- /* try to register job in case that first reistration */
- /* was sent to server only; ignore errors (EEXIST) */
- userid = strdup(strmd5(event->any.user, NULL));
- if (store_user(ctx,userid,event->any.user)) goto err;
-
- store_job(ctx,(glite_jobid_const_t) event->any.jobId,
- userid, 1, 0);
- edg_wll_ResetError(ctx);
-
- }
- }
- }
- else {
- /* any other event than JobReg */
- trio_asprintf(&q,"update jobs set jobid='%|Ss', proxy='1' where jobid='%|Ss'",
- unique, unique);
- /* does the job exists ? now we require registration on proxy too */
- if (edg_wll_ExecSQL(ctx,q,NULL) < 0) {
edg_wll_SetError(ctx, ENOENT, "job not registered");
- goto err;
+ goto err;
}
}
+
+ subj = strdup( ctx->isProxy ? event->any.user : can_peername);
+ userid = strdup(strmd5(subj, NULL));
+ if (store_user(ctx,userid,subj)) goto err;
+ if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
+ userid, ctx->isProxy, local_job ? ctx->serverRunning : 0, grey, 0 )) goto err;
+ *register_to_JP = 1;
}
else {
- /* event arrived on LB port */
- if (event->any.type == EDG_WLL_EVENT_REGJOB) {
- trio_asprintf(&q,"select cert_subj from jobs,users where jobs.jobid='%|Ss'"
- " AND jobs.userid=users.userid",unique);
- if ( (nar = edg_wll_ExecSQL(ctx,q,&stmt)) < 0 || edg_wll_FetchRow(ctx,stmt,1,NULL,&owner) < 0 ) {
- goto err;
- }
- free(q); q = NULL;
-
- if (nar) {
- /* job is already registered */
- if (!strcmp(owner,"unknown_to_proxy")) {
- /* proxy registration was already done */
- userid = strdup(strmd5(can_peername, NULL));
- if (store_user(ctx,userid,can_peername)) goto err;
-
- trio_asprintf(&q,"update jobs set server=1, userid='%|Ss' where jobid='%|Ss'",
- userid, unique);
-
- edg_wll_ExecSQL(ctx, q, NULL);
- }
- else { } /* re-registration, no action needed */
- }
- else {
- userid = strdup(strmd5(can_peername, NULL));
- if (store_user(ctx,userid,can_peername)) goto err;
+ /* Job already registered */
- if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
- userid, 0, 1)) goto err;
- }
+ if (edg_wll_FetchRow(ctx,stmt,sizeof(res)/sizeof(res[0]),NULL,res) < 0) goto err;
+ if (ctx->greyjobs && !strcmp(res[2],"1") &&
+ (event->any.type == EDG_WLL_EVENT_REGJOB) &&
+ (event->any.priority & EDG_WLL_LOGFLAG_DIRECT))
+ {
+ subj = strdup(ctx->isProxy ? event->any.user : can_peername);
+ userid = strdup(strmd5(subj, NULL));
+ if (store_user(ctx,userid,subj)) goto err;
+ if (store_job(ctx,(glite_jobid_const_t) event->any.jobId,
+ userid, (ctx->isProxy || !strcmp(res[0],"1")),
+ !strcmp(res[1],"1") || (local_job ? ctx->serverRunning : 0), 0, 1)) goto err;
+ *register_to_JP = 1;
+
}
else {
- /* any other event than JobReg */
- /* no action needed */
+ // if (!strcmp(res[0],"1") && !strcmp(res[1],"1") ) /*nothing to do */;
+ if ( (!strcmp(res[0],"0") && ctx->isProxy) || (!strcmp(res[1],"0") && !ctx->isProxy) ) {
+ trio_asprintf(&q,"update jobs set server='1', proxy='1' where jobid='%|Ss'",
+ unique);
+ }
}
+
+ /* ??? test whether user from proxy is the same as user from server ??? */
}
-
+
err:
+ free(res[0]); free(res[1]); free(res[3]);
if (stmt) glite_lbu_FreeStmt(&stmt);
- free(unique);
+ free(subj);
free(userid);
free(q);
return edg_wll_Error(ctx,NULL,NULL);
-}
+}
-/*
- * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated
- * column in EVENTS.
- *
- * don't want to change the database structure now, will be done anyway
- * soon
- */
-static int store_seq(edg_wll_Context ctx,edg_wll_Event *e,int no)
-{
- int ret;
- char *stmt;
- char *jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
-
- edg_wll_ResetError(ctx);
- trio_asprintf(&stmt,"insert into short_fields(jobid,event,name,value) "
- "values ('%|Ss',%d,'SEQCODE','%|Ss')",
- jobid,no,e->any.seqcode);
-
- ret = edg_wll_ExecSQL(ctx,stmt,NULL);
- free(stmt);
- free(jobid);
- return ret>=0 ? 0 : edg_wll_Error(ctx,NULL,NULL);
-}
#define SHORT_LEN 255 /* short_fiels.value db column lenght */
char *jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
char *q = NULL,*owner = NULL;
glite_lbu_Statement stmt = NULL;
- char *user;
edg_wll_ResetError(ctx);
if (!ctx->isProxy && !ctx->peerName)
return edg_wll_SetError(ctx,EPERM,"can't store using unauthenticated connection");
-#if 0
- if (e->type == EDG_WLL_EVENT_REGJOB)
- return strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT) ?
- 0 : edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously");
-#endif
-
trio_asprintf(&q,"select u.cert_subj from jobs j, users u "
"where j.jobid='%|Ss' and u.userid=j.userid",jobid);
) goto clean;
if (!owner) {
- if ( ctx->isProxy )
- edg_wll_SetError(ctx, EINVAL, "Job not registered");
- else
- /* We have to let the calling function know what happened here
- * even if it hapens inside the LB Proxy which shouldn't consider
- * this as an error
- */
- edg_wll_SetError(ctx, ENOENT, "job not registered");
+ edg_wll_SetError(ctx, ENOENT, "job not registered");
goto clean;
}
}
#ifndef LB_DAG_EMBRIONIC
-static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
+int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
{
int i,err;
edg_wlc_JobId *subjobs;
static edg_wll_ErrorCode states_values_embryonic(
edg_wll_Context ctx,
- edg_wlc_JobId jobid,
+ glite_jobid_const_t jobid,
const edg_wll_RegJobEvent *e,
char **icnames,
char **values)
return edg_wll_Error(ctx,NULL,NULL);
}
-static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e, const char *userid)
+int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
{
- int i, err = 0;
- edg_wlc_JobId *subjobs;
+ int i, j, err = 0;
+ edg_wlc_JobId *subjobs = NULL;
struct timeval now;
- char *jobid_md5, *jobid_md5_old;
+ char *jobid = NULL, *jobid_md5 = NULL, *jobid_md5_old = NULL;
size_t jobid_len;
#ifdef LB_BUF
glite_lbu_bufInsert bi_j;
glite_lbu_bufInsert *bi_jobs = &bi_j;
- char *states_cols;
+ char *states_cols = NULL;
#endif
glite_lbu_bufInsert bi_s, *bi_states = &bi_s;
- char *icnames, *values;
+ char *icnames = NULL, *values = NULL, *userid = NULL, *stmt = NULL;
int server, proxy, membership = 0;
+ glite_lbu_Statement sh = NULL;
edg_wll_ResetError(ctx);
ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
- membership = edg_wll_jobMembership(ctx, e->jobId);
+ if ((membership = edg_wll_jobMembership(ctx, e->jobId)) < 0) goto err;
+
proxy = membership & DB_PROXY_JOB;
server = membership & DB_SERVER_JOB;
+ /* get userid of parent job */
+ jobid = edg_wlc_JobIdGetUnique(e->jobId);
+ trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid);
+ if (edg_wll_ExecSQL(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(ctx,sh,1,NULL,&userid) < 0) goto err;
+
for (i=0; i<e->nsubjobs; i++) {
char *et,*ed,*job_s,*p,*p1;
#ifdef LB_BUF
if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs, proxy, server)))
#else
- if ((err = store_job(ctx, subjobs[i], userid, proxy, server)))
+ if ((err = store_job(ctx, subjobs[i], userid, proxy, server, 0, 0)))
#endif
- edg_wll_Error(ctx,&et,&ed);
+ if (edg_wll_Error(ctx,&et,&ed) == EDEADLOCK) goto err;
/* interchange variable parts (jobids) in values */
/* there are only two occurences of subjob jobid */
if (!err && (err = edg_wll_StoreIntStateEmbryonic(ctx, subjobs[i], icnames, values, bi_states)))
edg_wll_Error(ctx,&et,&ed);
-//job_s = edg_wlc_JobIdUnparse(subjobs[i]);
-//printf("%s\n", job_s);
-//free(job_s);
-
if (err) {
job_s = edg_wlc_JobIdUnparse(subjobs[i]);
fprintf(stderr,"%s: %s (%s)\n",job_s,et,ed);
edg_wlc_JobIdFree(subjobs[i]);
}
+err:
free(jobid_md5_old); //free the last one
free(icnames);
free(values);
+ /* free the rest of subjobs if DEADLOCK occurs */
+ for (j=i; j<e->nsubjobs; j++) edg_wlc_JobIdFree(subjobs[i]);
free(subjobs);
+ if (sh) glite_lbu_FreeStmt(&sh);
+ free(stmt);
#ifdef LB_BUF
/* commit the rest of multirows insert and clean structures */
return edg_wll_Error(ctx,NULL,NULL);
}
+
+
+/* XXX: if event type is user tag, convert the tag name to lowercase!
+ * (not sure whether to convert a value too is reasonable
+ * or keep it 'case sensitive')
+ */
+static void lowercase_usertag(edg_wll_Event *ev)
+{
+ int i;
+
+ if ( ev->any.type == EDG_WLL_EVENT_USERTAG ) {
+
+ for ( i = 0; ev->userTag.name[i] != '\0'; i++ )
+ ev->userTag.name[i] = tolower(ev->userTag.name[i]);
+ }
+}
char *userid, *stmt = NULL,
*res = NULL;
char *can_peername;
- int njobs = 0,ret,i,j;
+ int njobs = 0,ret,i,j,idx;
edg_wlc_JobId *out = NULL;
glite_lbu_Statement sth = NULL;
edg_wll_ErrorCode err = 0;
free(stmt); stmt = NULL;
free(res); res = NULL;
- trio_asprintf(&stmt,"select dg_jobid from jobs where userid = '%|Ss'",userid);
+ trio_asprintf(&stmt,"select dg_jobid from jobs where userid = '%|Ss' and grey='0'",userid);
switch (njobs = edg_wll_ExecSQL(ctx,stmt,&sth)) {
case 0: edg_wll_SetError(ctx,ENOENT,ctx->peerName);
case -1: goto err;
}
*states = calloc(njobs, sizeof(**states));
+ idx = 0;
for (i = 0; i < njobs; i++) {
- if (edg_wll_JobStatusServer(ctx, out[i], -1, &(*states)[i]) != 0) {
+ if (edg_wll_JobStatusServer(ctx, out[idx], -1, &(*states)[idx]) != 0) {
for (j = 0; j < i; j++) edg_wll_FreeStatus(&(*states)[j]);
*states = NULL;
- break;
+ if (edg_wll_Error(ctx, NULL, NULL) == ENOENT) {
+ /* some jobs may be purged meanwhile, ignore */
+ continue;
+ }
+ else break;
}
+ idx++;
}
err:
free(res);