#include <regex.h>
#include <syslog.h>
+#ifndef _GNU_SOURCE
+#define _GNU_SOURCE
+#endif
+
+#include <search.h>
+
#include "glite/lb/producer.h"
#include "glite/lb/context-int.h"
#include "glite/lb/trio.h"
edg_wll_Stmt sh;
int num_sub, num_f, i, ii;
+
edg_wll_ResetError(ctx);
string_jobid = edg_wlc_JobIdUnparse(job);
return edg_wll_Error(ctx,NULL,NULL);
}
+
+
/* authorization check */
if ( !(ctx->noAuth) &&
- (!(ctx->peerName) || strcmp(ctx->peerName, jobstat.pub.owner))) {
- intErr = (acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ);
- if (intErr) {
- free(string_jobid);
- free(md5_jobid);
- free(jobstat.pub.owner); jobstat.pub.owner = NULL;
- if (acl) {
- edg_wll_FreeAcl(acl);
- return edg_wll_Error(ctx, NULL, NULL);
- } else {
- return edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set");
- }
- }
+ (!(ctx->peerName) || strcmp(ctx->peerName, jobstat.pub.owner) )
+ ) {
+ ENTRY gmap_search, *gmap_return;
+
+ gmap_search.key = ctx->peerName;
+ gmap_search.data = NULL;
+
+ if (!ctx->gridmap ||
+ !hsearch_r(gmap_search,FIND,&gmap_return,ctx->gridmap) ||
+ strcmp(jobstat.pub.owner,(const char *) gmap_return->data)
+ ) {
+ intErr = (acl == NULL) || edg_wll_CheckACL(ctx, acl, EDG_WLL_PERM_READ);
+ if (intErr) {
+ free(string_jobid);
+ free(md5_jobid);
+ free(jobstat.pub.owner); jobstat.pub.owner = NULL;
+ if (acl) {
+ edg_wll_FreeAcl(acl);
+ return edg_wll_Error(ctx, NULL, NULL);
+ } else {
+ return edg_wll_SetError(ctx,EPERM, "not owner, no ACL is set");
+ }
+ }
+ }
}
intErr = edg_wll_LoadIntState(ctx, job, -1 /*all events*/, &ijsp);
#include "glite/lb/trio.h"
#include "store.h"
+#include "store_int.h"
#include "get_events.h"
#include "lbs_db.h"
#include "lock.h"
#include "jobstat.h"
static int store_user(edg_wll_Context,const char *,const char *);
-static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *);
+static int store_job(edg_wll_Context,const edg_wll_Event *,struct glite_lb_StoreAux *);
+static int store_job_grey(edg_wll_Context,const edg_wll_Event *,struct glite_lb_StoreAux *);
#ifdef LB_BUF
static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *);
#endif
-static int store_job_grey(edg_wll_Context,const edg_wlc_JobId,time_t);
static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int);
static int store_seq(edg_wll_Context,edg_wll_Event *,int);
static int check_dup(edg_wll_Context,edg_wll_Event *);
#endif
static int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *);
+static int store_event(edg_wll_Context ctx,edg_wll_Event *e,struct glite_lb_StoreAux *aux);
+
void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) {
ctx->allowAnonymous = anon;
}
int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
{
+ int err = 0;
+ struct glite_lb_StoreAux aux;
+
+ edg_wll_ResetError(ctx);
+ memset(&aux,0,sizeof aux);
+
+ aux.now_s = edg_wll_TimeToDB(ctx->event_load ? e->any.arrived.tv_sec : time(NULL));
+ aux.jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
+
+ if (e->type == EDG_WLL_EVENT_REGJOB) switch(err = store_job(ctx,e,&aux)) {
+ case 0: break;
+ case EEXIST:
+ if ((err = glite_lb_AuthEvent(ctx,e,&aux))) goto clean;
+ default: goto clean;
+ }
+ else switch ((err = glite_lb_AuthEvent(ctx,e,&aux))) {
+ case 0: break;
+ case ENOENT:
+ if ((err = store_job_grey(ctx,e,&aux))) goto clean;
+ break;
+ default: goto clean;
+ }
+
+ if ((err = store_event(ctx,e,&aux)) ||
+ (err = glite_lb_AfterRegistration(ctx,e,&aux))
+ ) goto clean;
+
+ *seq = aux.seq;
+clean:
+ /* TODO: free aux */
+ free(aux.now_s);
+ free(aux.jobid);
+ return err;
+}
+
+static int store_event(edg_wll_Context ctx,edg_wll_Event *e,struct glite_lb_StoreAux *aux)
+{
+ char *select_max,*ssrc,*stmt;
+ int next,err = 0;
+ edg_wll_Stmt sh = NULL;
+
+ edg_wll_ResetError(ctx);
+
+/* obtain next event sequence number */
+ trio_asprintf(&select_max,
+ "select max(event) from events "
+ "where jobid = '%|Ss'",aux->jobid);
+
+ ssrc = edg_wll_SourceToString(e->any.source);
+
+/* try to insert (someone else may be doing the same) */
+ while (1) {
+ char *max;
+
+ if (edg_wll_ExecStmt(ctx,select_max,&sh) < 0 ||
+ edg_wll_FetchRow(sh,&max) < 0)
+ {
+ err = edg_wll_Error(ctx,NULL,NULL);
+ goto clean;
+ }
+ edg_wll_FreeStmt(&sh);
+
+ next = max && *max ? atoi(max)+1 : 0;
+
+ /*
+ * 1) when using transactions:
+ * Store the whole event right now.
+ *
+ * 2) when not using transaction:
+ * Store an UNDEF event first in order to prevent race condition
+ * with readers and update event code later.
+ */
+ trio_asprintf(&stmt,
+ "insert into events(jobid,event,code,prog,host,time_stamp,usec,arrived,level,userid) "
+ "values ('%|Ss',%d,%d,'%|Ss','%|Ss',%s,%d,%s,%d,'%|Ss')",
+ aux->jobid,next,
+ ctx->use_transactions ? (int) e->any.type : EDG_WLL_EVENT_UNDEF,
+ ssrc,e->any.host,
+ edg_wll_TimeToDB(e->any.timestamp.tv_sec),e->any.timestamp.tv_usec,
+ aux->now_s, e->any.level,aux->userid);
+
+ if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) {
+ if ((err = edg_wll_Error(ctx,NULL,NULL)) != EEXIST)
+ goto clean;
+ } else break; /* successful insert */
+
+ /* we were late -- try once again */
+ free(stmt); stmt = NULL;
+ }
+
+ free(ssrc); ssrc = NULL;
+ free(select_max); select_max = NULL;
+ free(stmt); stmt = NULL;
+
+ if ((err = store_seq(ctx,e,next)) ||
+ (err = store_flesh(ctx,e,aux->jobid,next))) {
+ /* attempt to cleanup, ignore new errors */
+ char *desc;
+ edg_wll_ErrorCode oerr = edg_wll_Error(ctx,NULL,&desc);
+
+ edg_wll_delete_event(ctx,aux->jobid,next);
+ edg_wll_SetError(ctx,oerr,desc);
+ free(desc);
+ } else
+ if (!ctx->use_transactions) {
+ /* emulate commit, i.e. swith to a real event type to make
+ * the record valid */
+ trio_asprintf(&stmt,
+ "update events set code=%d "
+ "where jobid='%|Ss' and event=%d",
+ (int) e->any.type,aux->jobid,next);
+ switch (edg_wll_ExecStmt(ctx,stmt,NULL)) {
+ case 0: if (ctx->strict_locking)
+ err = edg_wll_SetError(ctx,ENOENT,"event disappeared on store while strict locking");
+ /* purge in progres: drop the garbage, ignore errors */
+ else {
+ edg_wll_delete_event(ctx,aux->jobid,next);
+ err = edg_wll_SetError(ctx,ENOENT,"job being purged");
+ }
+ break;
+ case 1: if (ctx->strict_locking) err = 0;
+ else {
+ /* check whether the job is still there to prevent garbage
+ * left while there is a concurrent purge
+ */
+ free(stmt);
+ trio_asprintf(&stmt,
+ "select 'x' from jobs where jobid='%|Ss'",
+ aux->jobid);
+ switch (edg_wll_ExecStmt(ctx,stmt,NULL)) {
+ case 1: break;
+ case 0: /* purge in progres */
+ edg_wll_delete_event(ctx,aux->jobid,next);
+ err = edg_wll_SetError(ctx,ENOENT,"job being purged");
+ break;
+ default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,
+ "more job records, what is that?");
+ break;
+ }
+ }
+ break;
+ case -1: err = edg_wll_Error(ctx,NULL,NULL);
+ break;
+
+ default: err = edg_wll_SetError(ctx,EDG_WLL_ERROR_DB_CALL,
+ "more event records, what is that?");
+ break;
+ }
+ } /* if !ctx->use_transactions */
+
+ aux->seq = next;
+
+clean:
+ free(ssrc);
+ free(select_max);
+ free(stmt);
+ edg_wll_FreeStmt(&sh);
+ return err;
+}
+
+int OLD_edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
+{
edg_wll_ErrorCode err = 0;
char *userid = NULL,*jobid,*stmt;
char *select_max,*ssrc;