implementation of events processing on shared LB server&proxy database
authorMiloš Mulač <mulac@civ.zcu.cz>
Wed, 3 Oct 2007 11:31:04 +0000 (11:31 +0000)
committerMiloš Mulač <mulac@civ.zcu.cz>
Wed, 3 Oct 2007 11:31:04 +0000 (11:31 +0000)
- compiles, but needs audit and extensive testing

org.glite.lb.server/Makefile
org.glite.lb.server/interface/lbs_db.h
org.glite.lb.server/interface/store.h
org.glite.lb.server/src/bkserverd.c
org.glite.lb.server/src/db_calls.c [new file with mode: 0644]
org.glite.lb.server/src/db_calls.h [new file with mode: 0644]
org.glite.lb.server/src/db_store.c
org.glite.lb.server/src/srv_purge.c
org.glite.lb.server/src/store.c.T

index 8097e0d..e6fc47a 100644 (file)
@@ -180,7 +180,7 @@ BKSERVER_BASE_OBJS:= \
        lb_xml_parse_V21.o \
        lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \
        stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \
-       notification.o il_notification.o notif_match.o stats.o 
+       notification.o il_notification.o notif_match.o stats.o db_calls.o
 
 dotless_gsoap_ver:=${shell echo ${gsoap_version} | tr -d . }
 GSOAP_LIB:=-L${stagedir}/lib -lglite_security_gsoap_plugin_${dotless_gsoap_ver}_${nothrflavour}
@@ -209,7 +209,7 @@ endif
 INDEX_OBJS:= index.o index_parse.o jobstat_supp.o lbs_db.o lbs_db_supp.o openserver.o \
        jobstat.o process_event.o process_event_pbs.o process_event_condor.o query.o lock.o get_events.o write2rgma.o index_lex.o \
        lb_authz.o store.o bkindex.o stats.o\
-       request.o db_store.o srv_purge.o notif_match.o il_lbproxy.o dump.o lb_xml_parse.o il_notification.o lb_proto.o server_state.o lb_xml_parse_V21.o lb_html.o notification.o seqcode.o userjobs.o load.o
+       request.o db_store.o srv_purge.o notif_match.o il_lbproxy.o dump.o lb_xml_parse.o il_notification.o lb_proto.o server_state.o lb_xml_parse_V21.o lb_html.o notification.o seqcode.o userjobs.o load.o db_calls.o
 
 INDEX_LIBS:= ${SRVBONES_LIB} ${COMMON_LIBS} ${EXT_LIBS} 
 
@@ -236,7 +236,8 @@ LIB_OBJS_BK:= \
        lb_xml_parse_V21.o \
        lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \
        stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \
-       notification.o il_notification.o notif_match.o stats.o write2rgma.o
+       notification.o il_notification.o notif_match.o stats.o write2rgma.o \
+       db_calls.o
 
 MONDB_OBJS:=mon-db.o ${LIB_OBJS_BK}
 MONDB_LIBS:=${COMMON_LIBS} ${EXT_LIBS} 
index a9e6099..7e212d9 100644 (file)
@@ -15,9 +15,6 @@ extern "C" {
 #define EDG_WLL_MYSQL_VERSION          40001
 #define BUF_INSERT_ROW_ALLOC_BLOCK     1000
 
-#define        DB_PROXY_JOB    1
-#define        DB_SERVER_JOB   2
-
 
 #define DEFAULTCS      "lbserver/@localhost:lbserver20"
 
index ec08892..a9b50c8 100644 (file)
@@ -45,7 +45,7 @@ int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *);
 int handle_request(edg_wll_Context,char *);
 int create_reply(const edg_wll_Context,char **);
 int trans_db_store(edg_wll_Context,char *,edg_wll_Event *,intJobStat *);
-
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort);
 
 int edg_wll_delete_event(edg_wll_Context,const char *, int);
 
index 82bf077..13503db 100644 (file)
@@ -57,6 +57,7 @@ enum lb_srv_perf_sink sink_mode;
 #include "lb_authz.h"
 #include "il_notification.h"
 #include "stats.h"
+#include "db_calls.h"
 
 #ifdef GLITE_LB_SERVER_WITH_WS
 #  if GSOAP_VERSION < 20700
@@ -490,8 +491,10 @@ int main(int argc, char *argv[])
        setlinebuf(stdout);
        setlinebuf(stderr);
 
-       if (mode & SERVICE_PROXY) dprintf(("\nStaring LB proxy service\n"));
-       if (mode & SERVICE_SERVER) dprintf(("\nStaring LB server service\n"));
+       dprintf(("\n"));
+       if (mode & SERVICE_PROXY) dprintf(("Staring LB proxy service\n"));
+       if (mode & SERVICE_SERVER) dprintf(("Staring LB server service\n"));
+       dprintf(("\n"));
 
        if (geteuid()) snprintf(pidfile,sizeof pidfile, "%s/edg-bkserverd.pid", getenv("HOME"));
 
diff --git a/org.glite.lb.server/src/db_calls.c b/org.glite.lb.server/src/db_calls.c
new file mode 100644 (file)
index 0000000..d3f3b9d
--- /dev/null
@@ -0,0 +1,58 @@
+#ident "$Header: "
+
+#include <string.h>
+#include <errno.h>
+
+#include "glite/jobid/cjobid.h"
+
+#include "glite/lb/trio.h"
+#include "glite/lb/context-int.h"
+
+#include "lbs_db.h"
+#include "db_calls.h"
+
+/** Returns bitmask of job membership in common server/proxy database 
+ */
+int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job)
+{
+        char            *dbjob;
+        char            *stmt = NULL;
+        edg_wll_Stmt    q;
+        int             ret, result = -1;
+        char            *res[2] = { NULL, NULL};
+
+        edg_wll_ResetError(ctx);
+
+        dbjob = edg_wlc_JobIdGetUnique(job);
+
+        trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss'",dbjob);
+        ret = edg_wll_ExecStmt(ctx,stmt,&q);
+        if (ret <= 0) {
+                if (ret == 0) {
+                        fprintf(stderr,"%s: no such job\n",dbjob);
+                        edg_wll_SetError(ctx,ENOENT,dbjob);
+                }
+                goto clean;
+        }
+        free(stmt); stmt = NULL;
+
+        if ((ret = edg_wll_FetchRow(q,res)) > 0) {
+               result = 0;
+                if (strcmp(res[0],"0")) result += DB_PROXY_JOB;
+                if (strcmp(res[1],"0")) result += DB_SERVER_JOB;
+        }
+        else {
+                if (ret == 0) result = 0;
+                else {
+                        fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n");
+                        edg_wll_SetError(ctx,ENOENT,dbjob);
+                }
+        }
+        edg_wll_FreeStmt(&q);
+
+clean:
+       free(res[0]); free(res[1]);
+        free(dbjob);
+        free(stmt);
+        return(result);
+}
diff --git a/org.glite.lb.server/src/db_calls.h b/org.glite.lb.server/src/db_calls.h
new file mode 100644 (file)
index 0000000..1adcbcc
--- /dev/null
@@ -0,0 +1,11 @@
+#ifndef GLITE_LB_LB_CALLS_H
+#define GLITE_LB_LB_CALLS_H
+
+#ident "$Header:"
+
+#define DB_PROXY_JOB    1
+#define DB_SERVER_JOB   2
+
+int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job);
+
+#endif /* GLITE_LB_LB_CALLS_H */
index 23b1a8d..b2fdafb 100644 (file)
@@ -28,6 +28,7 @@
 
 extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *);
 
+
 static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat);
 
 
@@ -38,6 +39,9 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
   int  seq;
   int   err;
   edg_wll_JobStat      newstat;
+  char                         *srvName;
+  unsigned int         srvPort;
+
 
   ev = NULL;
 
@@ -56,16 +60,22 @@ db_store(edg_wll_Context ctx,char *ucs, char *event)
   }
 #endif
 
+  edg_wlc_JobIdGetServerParts(ev->any.jobId, &srvName, &srvPort);
+
+  if(use_db) {
+    if (edg_wll_LockJob(ctx,ev->any.jobId)) goto err;
+    if(store_job_server_proxy(ctx, ev, srvName, srvPort))
+
+      goto err;
+  }
+  if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err;
+
+
   /* events logged to proxy and server (DIRECT flag) may be ignored on proxy
    * if jobid prefix hostname matches server hostname -> they will
    * sooner or later arrive to server too and are stored in common DB 
    */
   if (ctx->isProxy && ctx->serverRunning && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) ) {
-       char            *srvName;
-       unsigned int    srvPort;
-
-       
-       edg_wlc_JobIdGetServerParts(ev->any.jobId, &srvName, &srvPort);
        if (!strcmp(ctx->srvName, srvName)) {
                return 0;
        }
index 5e1c323..7e9bb7f 100644 (file)
@@ -27,6 +27,7 @@
 #include "get_events.h"
 #include "purge.h"
 #include "lb_xml_parse.h"
+#include "db_calls.h"
 
 
 #define DUMP_FILE_STORAGE                                      "/tmp/"
@@ -599,50 +600,6 @@ clean:
 }
 
 
-int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job)
-{
-        char            *dbjob;
-        char            *stmt = NULL;
-        edg_wll_Stmt    q;
-        int             ret, result = -1;
-        char            *res[2] = { NULL, NULL};
-
-        edg_wll_ResetError(ctx);
-
-        dbjob = edg_wlc_JobIdGetUnique(job);
-
-        trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss'",dbjob);
-        ret = edg_wll_ExecStmt(ctx,stmt,&q);
-        if (ret <= 0) {
-                if (ret == 0) {
-                        fprintf(stderr,"%s: no such job\n",dbjob);
-                        edg_wll_SetError(ctx,ENOENT,dbjob);
-                }
-                goto clean;
-        }
-        free(stmt); stmt = NULL;
-
-        if ((ret = edg_wll_FetchRow(q,res)) > 0) {
-               result = 0;
-                if (strcmp(res[0],"0")) result += DB_PROXY_JOB;
-                if (strcmp(res[1],"0")) result += DB_SERVER_JOB;
-        }
-        else {
-                if (ret == 0) result = 0;
-                else {
-                        fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n");
-                        edg_wll_SetError(ctx,ENOENT,dbjob);
-                }
-        }
-        edg_wll_FreeStmt(&q);
-
-clean:
-       free(res[0]); free(res[1]);
-        free(dbjob);
-        free(stmt);
-        return(result);
-}
-
 int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job)
 {
        char    *stmt = NULL;
index 12241a7..a9114a3 100644 (file)
 #include "lock.h"
 #include "lb_authz.h"
 #include "jobstat.h"
+#include "db_calls.h"
 
 static int store_user(edg_wll_Context,const char *,const char *); 
-static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *);
+static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *, int, int);
 #ifdef LB_BUF
-static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *);
+static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *, int, int);
 #endif
 static int store_job_grey(edg_wll_Context,const edg_wlc_JobId,time_t);
 static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int);
@@ -94,26 +95,14 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq)
 /* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */
        if ((err = check_dup(ctx,e))) goto clean;
 
-       userid = strdup(strmd5(e->any.user,NULL));
-
-       if ((e->type == EDG_WLL_EVENT_REGJOB || lbproxy_notreg)) {
-               /* Register the job and owner. For LBproxy, contant "lbproxy"
-                   is used as the name - it's harmless as the job is already
-                  registered with LBserver */
-               char *username;
-
-               username = (ctx->isProxy) ? "lbproxy" : ctx->peerName;
-               userid = strdup(strmd5(username, NULL));
-               if ((err = store_user(ctx,userid, username))) goto clean;
-               if ((err = store_job(ctx,e->any.jobId,userid))) goto clean;
-       } else {
-               /* for other events just make sure user record is there */
-               userid = strdup(strmd5(e->any.user,NULL));
-               if ((err = store_user(ctx,userid,e->any.user))) goto clean;
-       }
-
        jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
 
+       trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid);
+       
+       if (edg_wll_ExecStmt(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(sh,&userid) < 0) goto clean;
+       if (sh) edg_wll_FreeStmt(&sh);
+
+
 /* obtain next event sequence number */
        trio_asprintf(&select_max,
                "select max(event) from events "
@@ -260,14 +249,13 @@ static int store_user(edg_wll_Context ctx,const char *userid,const char *subj)
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
-static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid)
+static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, int proxy, int server)
 {
        char *jobstr = edg_wlc_JobIdUnparse(job);
        char *jobid = edg_wlc_JobIdGetUnique(job);
        char *stmt;
        char *srvName;
        unsigned int    srvPort;
-       int proxy=0, server=0;
 
 /* debug Duplicate key on index: Duplicate entry '(nil)' for key 1
  */
@@ -313,7 +301,7 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use
 }
 
 #ifdef LB_BUF
-static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi)
+static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi, int proxy, int server)
 {
        char *jobstr = edg_wlc_JobIdUnparse(job);
        char *jobid = edg_wlc_JobIdGetUnique(job);
@@ -329,7 +317,7 @@ static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const cha
 
        edg_wll_ResetError(ctx);
 
-       trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid);
+       trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid,proxy,server);
        edg_wll_bufferedInsert(bi, row); // no need to free row
 
        free(jobstr);
@@ -363,6 +351,140 @@ static int store_job_grey(edg_wll_Context ctx,const edg_wlc_JobId job,time_t eti
        return edg_wll_Error(ctx,NULL,NULL);
 }
 
+
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort)
+{      
+       char            *unique = edg_wlc_JobIdGetUnique(event->any.jobId);
+       char            *q = NULL, *owner = NULL, *userid = NULL;
+       edg_wll_Stmt    stmt = NULL;
+       int             nar;
+
+
+       edg_wll_ResetError(ctx);
+
+       if (ctx->isProxy) {
+               /* event arrived on proxy socket */
+               if (event->any.type == EDG_WLL_EVENT_REGJOB) {
+                       if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
+                        /* first synchronous registration */
+                               if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) {
+                                       /* we are both server and proxy for this job */
+                                       trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'",
+                                               unique);
+
+                                       nar = edg_wll_ExecStmt(ctx, q, NULL);
+
+                                       if (nar == 0) {
+                                               /* job isn't registered yet */
+                                               userid = strdup(strmd5("unknown_to_proxy", NULL));
+                                               if (store_user(ctx,userid,"unknown_to_proxy")) goto err;
+
+                                               if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId,
+                                                       userid, 1, ctx->serverRunning)) goto err;
+                                                       
+                                       }
+                                       else {} /* job was registered thru GSI, no further action needed */
+                                               /* or error occured - and will go out via return() */
+                               }
+                               else {
+                                       /* we are only proxy for this job, forward it to server */
+
+                                       /* XXX - does it have any sence ??
+                                       if (!strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT)) {
+                                               edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously");
+                                               goto err;
+                                       }
+                                       */
+
+                                       userid = strdup(strmd5(event->any.user, NULL));
+                                       if (store_user(ctx,userid,event->any.user)) goto err;
+
+                                       if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId,
+                                                        userid, 1, 0)) goto err;
+                               }
+                       }
+                       else {
+                               /* supplementary re-registration (JDL of subjob, etc.) */
+                               if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) {
+                                       /* previous registration via GSI required */
+                                       trio_asprintf(&q,"update jobs set jobid='%|Ss' where jobid='%|Ss'",
+                                               unique, unique);
+                                       /* does the job exists ? */
+                                       if (edg_wll_ExecStmt(ctx,q,NULL) < 0) {
+                                               edg_wll_SetError(ctx, ENOENT, "job not registered");
+                                               goto err;
+                                       }
+                               }
+                               else {
+                                       /* try to register job in case that first reistration   */
+                                       /* was sent to server only; ignore errors (EEXIST)      */
+                                       userid = strdup(strmd5(event->any.user, NULL));
+                                       if (store_user(ctx,userid,event->any.user)) goto err;
+
+                                       store_job(ctx,(const edg_wlc_JobId) event->any.jobId,
+                                                        userid, 1, 0);
+                                       edg_wll_ResetError(ctx);
+                                       
+                               }
+                       }
+               }
+               else {
+                       /* any other event than JobReg */
+                       trio_asprintf(&q,"update jobs set jobid='%|Ss' where jobid='%|Ss'",
+                               unique, unique);
+                       /* does the job exists ? now we require registration on proxy too */
+                       if (edg_wll_ExecStmt(ctx,q,NULL) < 0) {
+                               edg_wll_SetError(ctx, ENOENT, "job not registered");
+                                goto err;
+                       }
+               }
+       }
+       else {
+               /* event arrived on LB port */
+               if (event->any.type == EDG_WLL_EVENT_REGJOB) {
+                       trio_asprintf(&q,"select cert_subj from jobs,users where jobs.jobid='%|Ss'"
+                                       " AND jobs.userid=users.userid",unique);
+                       if ( (nar = edg_wll_ExecStmt(ctx,q,&stmt)) < 0 || edg_wll_FetchRow(stmt,&owner) < 0 ) 
+                               goto err;
+                       
+                       if (nar) {
+                               /* job is already registered */
+                               if (!strcmp(owner,"unknown_to_proxy")) {
+                                       /* proxy registration was already done */
+                                       userid = strdup(strmd5(ctx->peerName, NULL));
+                                       if (store_user(ctx,userid,ctx->peerName)) goto err;
+
+                                       trio_asprintf(&q,"update jobs set server=1, userid='%|Ss' where jobid='%|Ss'",
+                                               userid, unique);
+                                       
+                                       edg_wll_ExecStmt(ctx, q, NULL);
+                               }
+                               else { } /* re-registration, no action needed */
+                       }
+                       else {
+                               userid = strdup(strmd5(ctx->peerName, NULL));
+                               if (store_user(ctx,userid,ctx->peerName)) goto err;
+
+                               if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId,
+                                               userid, 0, 1)) goto err;
+                       }
+               }
+               else {
+                       /* any other event than JobReg  */
+                       /* no action needed             */
+               }
+       }
+       
+err:
+       if (stmt) edg_wll_FreeStmt(&stmt);
+       free(unique);
+       free(userid);
+       free(q);
+
+       return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
 /*
  * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated
  *     column in EVENTS.
@@ -541,10 +663,6 @@ static int check_auth(edg_wll_Context ctx,edg_wll_Event *e)
 
        edg_wll_ResetError(ctx);
 
-       if (e->type == EDG_WLL_EVENT_REGJOB) 
-               return strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT) ?
-                       0 : edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously");
-
        trio_asprintf(&q,"select userid from jobs where jobid='%|Ss'",jobid);
 
        if (edg_wll_ExecStmt(ctx,q,&stmt) < 0
@@ -552,8 +670,8 @@ static int check_auth(edg_wll_Context ctx,edg_wll_Event *e)
        ) goto clean;
 
        if (!owner) {
-               if ( ctx->isProxy && !e->any.seqcode )
-                       edg_wll_SetError(ctx, EINVAL, "Job not registered - sequence code needed");
+               if ( ctx->isProxy )
+                       edg_wll_SetError(ctx, EINVAL, "Job not registered");
                else
                        /* We have to let the calling function know what happened here
                         * even if it hapens inside the LB Proxy which shouldn't consider
@@ -738,6 +856,7 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
 #endif
        edg_wll_bufInsert       bi_s, *bi_states = &bi_s;
        char                    *icnames, *values;
+       int                     server, proxy, membership = 0;
 
 
        edg_wll_ResetError(ctx);
@@ -757,7 +876,7 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
 #ifdef LB_BUF
        /* init multirows insert mechanism for tables used here */
        if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 4000, 1000,
-               "jobid, dg_jobid, userid"))
+               "jobid, dg_jobid, userid, proxy, server"))
        {
                return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
        }
@@ -777,14 +896,18 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv
        ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
        if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
 
+       membership = edg_wll_jobMembership(ctx, e->jobId);
+       proxy = membership & DB_PROXY_JOB;
+       server = membership & DB_SERVER_JOB;
+
        for (i=0; i<e->nsubjobs; i++) {
                char            *et,*ed,*job_s,*p,*p1;
 
                /* save jobid-userid relation into jobs table */
 #ifdef LB_BUF
-               if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs)))
+               if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs, proxy, server)))
 #else
-               if ((err = store_job(ctx, subjobs[i], userid)))
+               if ((err = store_job(ctx, subjobs[i], userid, proxy, server)))
 #endif
                        edg_wll_Error(ctx,&et,&ed);