static int noAuth = 0;
static int noIndex = 0;
static int strict_locking = 0;
+static int greyjobs = 0;
static int count_statistics = 0;
static int hardJobsLimit = 0;
static int hardEventsLimit = 0;
{"perf-sink", 1, NULL, 'K'},
#endif
{"transactions", 1, NULL, 'b'},
+ {"greyjobs", 0, NULL, 'g'},
{NULL,0,NULL,0}
};
-static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:"
+static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:g"
#ifdef GLITE_LB_SERVER_WITH_WS
"w:"
#endif
#ifdef LB_PERF
"\t--perf-sink\t where to sink events\n"
#endif
+ "\t-g,--greyjobs\t allow delayed registration (grey jobs), implies --strict-locking\n"
,me);
}
case 'K': sink_mode = atoi(optarg);
break;
#endif
+ case 'g': greyjobs = strict_locking = 1;
+ break;
case '?': usage(name); return 1;
}
case 2: ctx->noIndex = 1; break;
}
ctx->strict_locking = strict_locking;
+ ctx->greyjobs = greyjobs;
return 0;
static int store_user(edg_wll_Context,const char *,const char *);
static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *);
+#ifdef LB_BUF
static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *);
+#endif
+static int store_job_grey(edg_wll_Context,const edg_wlc_JobId,time_t);
static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int);
static int store_seq(edg_wll_Context,edg_wll_Event *,int);
static int check_dup(edg_wll_Context,edg_wll_Event *);
static int check_auth(edg_wll_Context,edg_wll_Event *e);
+#ifndef LB_DAG_EMBRIONIC
static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
+#endif
static int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *);
void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) {
switch (err = check_auth(ctx,e)) {
case 0: break;
case ENOENT:
- if ( !ctx->isProxy ) goto clean;
+ if ( !ctx->isProxy ) {
+ if (ctx->greyjobs) {
+ edg_wll_ResetError(ctx);
+ if (store_job_grey(ctx,e->any.jobId,e->any.timestamp.tv_sec))
+ goto clean;
+ break;
+ }
+ else goto clean;
+ }
+
edg_wll_ResetError(ctx);
lbproxy_notreg = 1;
break;
break;
default: goto clean;
}
+
+/* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */
if ((err = check_dup(ctx,e))) goto clean;
userid = strdup(strmd5(e->any.user,NULL));
edg_wll_ResetError(ctx);
}
free(stmt);
+
+ if (ctx->greyjobs) {
+ trio_asprintf(&stmt,"delete from grey_jobs where jobid = '%|Ss'",jobid);
+ edg_wll_ExecStmt(ctx,stmt,NULL); /* XXX: error propagates */
+ free(stmt);
+ }
+
free(jobstr);
free(jobid);
return edg_wll_Error(ctx,NULL,NULL);
}
+#ifdef LB_BUF
static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi)
{
char *jobstr = edg_wlc_JobIdUnparse(job);
/* debug Duplicate key on index: Duplicate entry '(nil)' for key 1
*/
+
+ assert(!ctx->greyjobs); /* XXX: should not happen */
+
if (jobid == NULL || jobstr == NULL)
return edg_wll_SetError(ctx,EINVAL,"store_jobi_block()");
free(jobid);
return edg_wll_Error(ctx,NULL,NULL);
}
+#endif
+
+static int store_job_grey(edg_wll_Context ctx,const edg_wlc_JobId job,time_t etime)
+{
+ char *jobstr = edg_wlc_JobIdUnparse(job);
+ char *jobid = edg_wlc_JobIdGetUnique(job);
+ char *stmt;
+
+ if (jobid == NULL || jobstr == NULL)
+ return edg_wll_SetError(ctx,EINVAL,"store_job_grey()");
+
+ edg_wll_ResetError(ctx);
+ trio_asprintf(&stmt,"insert into grey_jobs(jobid,dg_jobid,time_stamp) "
+ "values ('%|Ss','%|Ss',%s)",
+ jobid,jobstr,edg_wll_TimeToDB(etime));
+
+ if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) {
+ if (edg_wll_Error(ctx,NULL,NULL) == EEXIST)
+ edg_wll_ResetError(ctx);
+ }
+
+ free(stmt);
+ free(jobstr);
+ free(jobid);
+ return edg_wll_Error(ctx,NULL,NULL);
+}
/*
* XXX: store it in SHORT_FIELDS for now despite it should go to dedicated
return edg_wll_Error(ctx,NULL,NULL);
}
+#ifndef LB_DAG_EMBRIONIC
static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
{
int i,err;
free(subjobs);
return edg_wll_Error(ctx,NULL,NULL);
}
+#endif
/*
edg_wlc_JobId *subjobs;
struct timeval now;
char *userid = strdup(strmd5(e->user,NULL));
- char *jobid_md5, *jobid_md5_old, *states_cols;
+ char *jobid_md5, *jobid_md5_old;
size_t jobid_len;
- edg_wll_bufInsert bi_j, bi_s;
- edg_wll_bufInsert *bi_jobs = &bi_j, *bi_states = &bi_s;
+#ifdef LB_BUF
+ edg_wll_bufInsert bi_j;
+ edg_wll_bufInsert *bi_jobs = &bi_j;
+ char *states_cols;
+#endif
+ edg_wll_bufInsert bi_s, *bi_states = &bi_s;
char *icnames, *values;