grey jobs
author Aleš Křenek <ljocha@ics.muni.cz>
Fri, 23 Feb 2007 13:10:28 +0000 (13:10 +0000)
committer Aleš Křenek <ljocha@ics.muni.cz>
Fri, 23 Feb 2007 13:10:28 +0000 (13:10 +0000)
org.glite.lb.server/config/glite-lb-dbsetup.sql
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/jobstat.h
org.glite.lb.server/src/process_event.c
org.glite.lb.server/src/store.c.T

index 81266fe..6e512be 100644 (file)
@@ -9,6 +9,15 @@ create table jobs (
        index (userid)
 ) engine=innodb;
 
+create table grey_jobs (                                -- jobs for which an event arrived before the job was registered
+       jobid           char(32)        binary not null, -- unique part of the job id (edg_wlc_JobIdGetUnique)
+       dg_jobid        varchar(255)    binary not null, -- full job id string (edg_wlc_JobIdUnparse)
+       time_stamp      datetime        not null,        -- timestamp of the event that caused the row to be inserted
+
+       primary key (jobid),
+       unique (dg_jobid)
+) engine=innodb;
+
 create table users (
        userid          char(32)        binary not null,
        cert_subj       varchar(255)    binary not null,
index 00ccabf..97d5484 100644 (file)
@@ -117,6 +117,7 @@ static const int            one = 1;
 static int                             noAuth = 0;
 static int                             noIndex = 0;
 static int                             strict_locking = 0;
+static int greyjobs = 0;
 static int count_statistics = 0;
 static int                             hardJobsLimit = 0;
 static int                             hardEventsLimit = 0;
@@ -182,10 +183,11 @@ static struct option opts[] = {
        {"perf-sink",           1, NULL,        'K'},
 #endif
        {"transactions",        1,      NULL,   'b'},
+       {"greyjobs",    0,      NULL,   'g'},
        {NULL,0,NULL,0}
 };
 
-static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:"
+static const char *get_opt_string = "a:c:k:C:V:p:drm:ns:l:L:N:i:S:D:X:Y:T:t:J:jzb:g"
 #ifdef GLITE_LB_SERVER_WITH_WS
        "w:"
 #endif
@@ -234,6 +236,7 @@ static void usage(char *me)
 #ifdef LB_PERF
                "\t--perf-sink\t where to sink events\n"
 #endif
+               "\t-g,--greyjobs\t allow delayed registration (grey jobs), implies --strict-locking\n"
 
        ,me);
 }
@@ -400,6 +403,8 @@ int main(int argc, char *argv[])
                case 'K': sink_mode = atoi(optarg);
                          break;
 #endif
+               case 'g': greyjobs = strict_locking = 1;
+                         break;
                case '?': usage(name); return 1;
        }
 
@@ -947,6 +952,7 @@ int bk_handle_connection(int conn, struct timeval *timeout, void *data)
        case 2: ctx->noIndex = 1; break;
        }
        ctx->strict_locking = strict_locking;
+       ctx->greyjobs = greyjobs;
 
 
        return 0;
index 9fc07ab..f21bbdd 100644 (file)
@@ -27,6 +27,7 @@
 #define RET_BADBRANCH   7
 #define RET_GOODBRANCH  8
 #define RET_TOOOLD      9
+#define RET_UNREG      10
 #define RET_INTERNAL    100
 
 
index 7133219..de0b038 100644 (file)
@@ -52,6 +52,7 @@ int processEvent(intJobStat *js, edg_wll_Event *e, int ev_seq, int strict, char
                        return processEvent_glite(js,e,ev_seq,strict,errstring);
                case EDG_WLL_STAT_PBS: 
                        return processEvent_PBS(js,e,ev_seq,strict,errstring);
+               case -1: return RET_UNREG;
                default: 
                        asprintf(errstring,"undefined job type %d",js->pub.jobtype);
                        return RET_FAIL;
index f24dd2f..cdaa0e4 100644 (file)
 
 static int store_user(edg_wll_Context,const char *,const char *); 
 static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *);
+#ifdef LB_BUF
 static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *);
+#endif
+static int store_job_grey(edg_wll_Context,const edg_wlc_JobId,time_t);
 static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int);
 static int store_seq(edg_wll_Context,edg_wll_Event *,int);
 static int check_dup(edg_wll_Context,edg_wll_Event *);
 static int check_auth(edg_wll_Context,edg_wll_Event *e); 
+#ifndef LB_DAG_EMBRIONIC
 static int register_subjobs(edg_wll_Context,const edg_wll_RegJobEvent *);
+#endif
 static int register_subjobs_embryonic(edg_wll_Context,const edg_wll_RegJobEvent *);
 
 void edg_wll_StoreAnonymous(edg_wll_Context ctx,int anon) {
@@ -69,7 +74,16 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
        switch (err = check_auth(ctx,e)) {
                case 0: break;
                case ENOENT: 
-                       if ( !ctx->isProxy ) goto clean;
+                       if ( !ctx->isProxy ) {
+                               if (ctx->greyjobs) {
+                                       edg_wll_ResetError(ctx);
+                                       if (store_job_grey(ctx,e->any.jobId,e->any.timestamp.tv_sec))
+                                               goto clean;
+                                       break;
+                               } 
+                               else goto clean;
+                       }
+
                        edg_wll_ResetError(ctx);
                        lbproxy_notreg = 1;
                        break;
@@ -79,6 +93,8 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
                        break;
                default: goto clean;
        }
+
+/* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */
        if ((err = check_dup(ctx,e))) goto clean;
 
        userid = strdup(strmd5(e->any.user,NULL));
@@ -259,11 +275,19 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use
                        edg_wll_ResetError(ctx);
        }
        free(stmt);
+
+       if (ctx->greyjobs) {
+               trio_asprintf(&stmt,"delete from grey_jobs where jobid = '%|Ss'",jobid);
+               edg_wll_ExecStmt(ctx,stmt,NULL); /* XXX: error propagates */
+               free(stmt);
+       }
+
        free(jobstr);
        free(jobid);
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+#ifdef LB_BUF
 static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi)
 {
        char *jobstr = edg_wlc_JobIdUnparse(job);
@@ -272,6 +296,9 @@ static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const cha
 
 /* debug Duplicate key on index: Duplicate entry '(nil)' for key 1
  */
+
+       assert(!ctx->greyjobs); /* XXX: should not happen */
+
        if (jobid == NULL || jobstr == NULL) 
                return edg_wll_SetError(ctx,EINVAL,"store_jobi_block()");
 
@@ -284,6 +311,32 @@ static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const cha
        free(jobid);
        return edg_wll_Error(ctx,NULL,NULL);
 }
+#endif
+
+/*
+ * Record a "grey" job, i.e. a job for which an event is being stored
+ * although the job itself is not registered yet (delayed registration).
+ *
+ * Inserts one row (jobid, dg_jobid, time_stamp) into the grey_jobs table.
+ * An EEXIST error from the insert is not treated as a failure: the error
+ * is reset and the call succeeds.
+ *
+ *   ctx    context used for the DB statement and for error reporting
+ *   job    id of the job to record
+ *   etime  time stored in the time_stamp column
+ *
+ * Returns the current error code of ctx (0 on success); EINVAL is set when
+ * the job id cannot be converted to its string or unique form.
+ */
+static int store_job_grey(edg_wll_Context ctx,const edg_wlc_JobId job,time_t etime)
+{
+       char *jobstr = edg_wlc_JobIdUnparse(job);
+       char *jobid = edg_wlc_JobIdGetUnique(job);
+       char *stmt;
+
+       if (jobid == NULL || jobstr == NULL) {
+               /* only one of the conversions may have failed; do not leak the other */
+               free(jobstr);
+               free(jobid);
+               return edg_wll_SetError(ctx,EINVAL,"store_job_grey()");
+       }
+
+       edg_wll_ResetError(ctx);
+       /* NOTE(review): if edg_wll_TimeToDB() returns allocated memory, its
+        * result is never freed here -- confirm ownership against its definition */
+       trio_asprintf(&stmt,"insert into grey_jobs(jobid,dg_jobid,time_stamp) "
+                       "values ('%|Ss','%|Ss',%s)",
+                       jobid,jobstr,edg_wll_TimeToDB(etime));
+
+       if (edg_wll_ExecStmt(ctx,stmt,NULL) < 0) {
+               /* the job is already recorded as grey -- nothing more to do */
+               if (edg_wll_Error(ctx,NULL,NULL) == EEXIST)
+                       edg_wll_ResetError(ctx);
+       }
+
+       free(stmt);
+       free(jobstr);
+       free(jobid);
+       return edg_wll_Error(ctx,NULL,NULL);
+}
 
 /*
  * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated
@@ -506,6 +559,7 @@ clean:
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+#ifndef LB_DAG_EMBRIONIC
 static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
 {
        int     i,err;
@@ -593,6 +647,7 @@ static int register_subjobs(edg_wll_Context ctx,const edg_wll_RegJobEvent *e)
        free(subjobs);
        return edg_wll_Error(ctx,NULL,NULL);
 }
+#endif
 
 
 /*
@@ -648,10 +703,14 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
        edg_wlc_JobId           *subjobs;
        struct timeval          now;
        char                    *userid = strdup(strmd5(e->user,NULL));
-       char                    *jobid_md5, *jobid_md5_old, *states_cols;
+       char                    *jobid_md5, *jobid_md5_old;
        size_t                  jobid_len;
-       edg_wll_bufInsert       bi_j, bi_s;
-       edg_wll_bufInsert       *bi_jobs = &bi_j, *bi_states = &bi_s;
+#ifdef LB_BUF
+       edg_wll_bufInsert       bi_j;
+       edg_wll_bufInsert       *bi_jobs = &bi_j;
+               char                    *states_cols;
+#endif
+       edg_wll_bufInsert       bi_s, *bi_states = &bi_s;
        char                    *icnames, *values;