From a0572f11061b4c399896e9b57993eccd524497ae Mon Sep 17 00:00:00 2001 From: =?utf8?q?Milo=C5=A1=20Mula=C4=8D?= Date: Wed, 3 Oct 2007 11:31:04 +0000 Subject: [PATCH] implementation of events processing on shared LB server&proxy database - compiles, but needs audit and extensive testing --- org.glite.lb.server/Makefile | 7 +- org.glite.lb.server/interface/lbs_db.h | 3 - org.glite.lb.server/interface/store.h | 2 +- org.glite.lb.server/src/bkserverd.c | 7 +- org.glite.lb.server/src/db_calls.c | 58 ++++++++++ org.glite.lb.server/src/db_calls.h | 11 ++ org.glite.lb.server/src/db_store.c | 20 +++- org.glite.lb.server/src/srv_purge.c | 45 +------- org.glite.lb.server/src/store.c.T | 189 +++++++++++++++++++++++++++------ 9 files changed, 251 insertions(+), 91 deletions(-) create mode 100644 org.glite.lb.server/src/db_calls.c create mode 100644 org.glite.lb.server/src/db_calls.h diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index 8097e0d..e6fc47a 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -180,7 +180,7 @@ BKSERVER_BASE_OBJS:= \ lb_xml_parse_V21.o \ lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \ stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ - notification.o il_notification.o notif_match.o stats.o + notification.o il_notification.o notif_match.o stats.o db_calls.o dotless_gsoap_ver:=${shell echo ${gsoap_version} | tr -d . } GSOAP_LIB:=-L${stagedir}/lib -lglite_security_gsoap_plugin_${dotless_gsoap_ver}_${nothrflavour} @@ -209,7 +209,7 @@ endif INDEX_OBJS:= index.o index_parse.o jobstat_supp.o lbs_db.o lbs_db_supp.o openserver.o \ jobstat.o process_event.o process_event_pbs.o process_event_condor.o query.o lock.o get_events.o write2rgma.o index_lex.o \ lb_authz.o store.o bkindex.o stats.o\ - request.o db_store.o srv_purge.o notif_match.o il_lbproxy.o dump.o lb_xml_parse.o il_notification.o lb_proto.o server_state.o lb_xml_parse_V21.o lb_html.o notification.o seqcode.o userjobs.o load.o + request.o db_store.o srv_purge.o notif_match.o il_lbproxy.o dump.o lb_xml_parse.o il_notification.o lb_proto.o server_state.o lb_xml_parse_V21.o lb_html.o notification.o seqcode.o userjobs.o load.o db_calls.o INDEX_LIBS:= ${SRVBONES_LIB} ${COMMON_LIBS} ${EXT_LIBS} @@ -236,7 +236,8 @@ LIB_OBJS_BK:= \ lb_xml_parse_V21.o \ lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \ stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ - notification.o il_notification.o notif_match.o stats.o write2rgma.o + notification.o il_notification.o notif_match.o stats.o write2rgma.o \ + db_calls.o MONDB_OBJS:=mon-db.o ${LIB_OBJS_BK} MONDB_LIBS:=${COMMON_LIBS} ${EXT_LIBS} diff --git a/org.glite.lb.server/interface/lbs_db.h b/org.glite.lb.server/interface/lbs_db.h index a9e6099..7e212d9 100644 --- a/org.glite.lb.server/interface/lbs_db.h +++ b/org.glite.lb.server/interface/lbs_db.h @@ -15,9 +15,6 @@ extern "C" { #define EDG_WLL_MYSQL_VERSION 40001 #define BUF_INSERT_ROW_ALLOC_BLOCK 1000 -#define DB_PROXY_JOB 1 -#define DB_SERVER_JOB 2 - #define DEFAULTCS "lbserver/@localhost:lbserver20" diff --git a/org.glite.lb.server/interface/store.h b/org.glite.lb.server/interface/store.h index ec08892..a9b50c8 100644 --- a/org.glite.lb.server/interface/store.h +++ b/org.glite.lb.server/interface/store.h @@ -45,7 +45,7 @@ int db_parent_store(edg_wll_Context, edg_wll_Event *, intJobStat *); int handle_request(edg_wll_Context,char *); int create_reply(const edg_wll_Context,char **); int trans_db_store(edg_wll_Context,char *,edg_wll_Event *,intJobStat *); - +int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort); int edg_wll_delete_event(edg_wll_Context,const char *, int); diff --git a/org.glite.lb.server/src/bkserverd.c b/org.glite.lb.server/src/bkserverd.c index 82bf077..13503db 100644 --- a/org.glite.lb.server/src/bkserverd.c +++ b/org.glite.lb.server/src/bkserverd.c @@ -57,6 +57,7 @@ enum lb_srv_perf_sink sink_mode; #include "lb_authz.h" #include "il_notification.h" #include "stats.h" +#include "db_calls.h" #ifdef GLITE_LB_SERVER_WITH_WS # if GSOAP_VERSION < 20700 @@ -490,8 +491,10 @@ int main(int argc, char *argv[]) setlinebuf(stdout); setlinebuf(stderr); - if (mode & SERVICE_PROXY) dprintf(("\nStaring LB proxy service\n")); - if (mode & SERVICE_SERVER) dprintf(("\nStaring LB server service\n")); + dprintf(("\n")); + if (mode & SERVICE_PROXY) dprintf(("Staring LB proxy service\n")); + if (mode & SERVICE_SERVER) dprintf(("Staring LB server service\n")); + dprintf(("\n")); if (geteuid()) snprintf(pidfile,sizeof pidfile, "%s/edg-bkserverd.pid", getenv("HOME")); diff --git a/org.glite.lb.server/src/db_calls.c b/org.glite.lb.server/src/db_calls.c new file mode 100644 index 0000000..d3f3b9d --- /dev/null +++ b/org.glite.lb.server/src/db_calls.c @@ -0,0 +1,58 @@ +#ident "$Header: " + +#include +#include + +#include "glite/jobid/cjobid.h" + +#include "glite/lb/trio.h" +#include "glite/lb/context-int.h" + +#include "lbs_db.h" +#include "db_calls.h" + +/** Returns bitmask of job membership in common server/proxy database + */ +int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job) +{ + char *dbjob; + char *stmt = NULL; + edg_wll_Stmt q; + int ret, result = -1; + char *res[2] = { NULL, NULL}; + + edg_wll_ResetError(ctx); + + dbjob = edg_wlc_JobIdGetUnique(job); + + trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss'",dbjob); + ret = edg_wll_ExecStmt(ctx,stmt,&q); + if (ret <= 0) { + if (ret == 0) { + fprintf(stderr,"%s: no such job\n",dbjob); + edg_wll_SetError(ctx,ENOENT,dbjob); + } + goto clean; + } + free(stmt); stmt = NULL; + + if ((ret = edg_wll_FetchRow(q,res)) > 0) { + result = 0; + if (strcmp(res[0],"0")) result += DB_PROXY_JOB; + if (strcmp(res[1],"0")) result += DB_SERVER_JOB; + } + else { + if (ret == 0) result = 0; + else { + fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n"); + edg_wll_SetError(ctx,ENOENT,dbjob); + } + } + edg_wll_FreeStmt(&q); + +clean: + free(res[0]); free(res[1]); + free(dbjob); + free(stmt); + return(result); +} diff --git a/org.glite.lb.server/src/db_calls.h b/org.glite.lb.server/src/db_calls.h new file mode 100644 index 0000000..1adcbcc --- /dev/null +++ b/org.glite.lb.server/src/db_calls.h @@ -0,0 +1,11 @@ +#ifndef GLITE_LB_LB_CALLS_H +#define GLITE_LB_LB_CALLS_H + +#ident "$Header:" + +#define DB_PROXY_JOB 1 +#define DB_SERVER_JOB 2 + +int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job); + +#endif /* GLITE_LB_LB_CALLS_H */ diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index 23b1a8d..b2fdafb 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -28,6 +28,7 @@ extern int edg_wll_NotifMatch(edg_wll_Context, const edg_wll_JobStat *); + static int db_actual_store(edg_wll_Context ctx, char *event, edg_wll_Event *ev, edg_wll_JobStat *newstat); @@ -38,6 +39,9 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) int seq; int err; edg_wll_JobStat newstat; + char *srvName; + unsigned int srvPort; + ev = NULL; @@ -56,16 +60,22 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) } #endif + edg_wlc_JobIdGetServerParts(ev->any.jobId, &srvName, &srvPort); + + if(use_db) { + if (edg_wll_LockJob(ctx,ev->any.jobId)) goto err; + if(store_job_server_proxy(ctx, ev, srvName, srvPort)) + + goto err; + } + if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err; + + /* events logged to proxy and server (DIRECT flag) may be ignored on proxy * if jobid prefix hostname matches server hostname -> they will * sooner or later arrive to server too and are stored in common DB */ if (ctx->isProxy && ctx->serverRunning && (ev->any.priority & EDG_WLL_LOGFLAG_DIRECT) ) { - char *srvName; - unsigned int srvPort; - - - edg_wlc_JobIdGetServerParts(ev->any.jobId, &srvName, &srvPort); if (!strcmp(ctx->srvName, srvName)) { return 0; } diff --git a/org.glite.lb.server/src/srv_purge.c b/org.glite.lb.server/src/srv_purge.c index 5e1c323..7e9bb7f 100644 --- a/org.glite.lb.server/src/srv_purge.c +++ b/org.glite.lb.server/src/srv_purge.c @@ -27,6 +27,7 @@ #include "get_events.h" #include "purge.h" #include "lb_xml_parse.h" +#include "db_calls.h" #define DUMP_FILE_STORAGE "/tmp/" @@ -599,50 +600,6 @@ clean: } -int edg_wll_jobMembership(edg_wll_Context ctx, edg_wlc_JobId job) -{ - char *dbjob; - char *stmt = NULL; - edg_wll_Stmt q; - int ret, result = -1; - char *res[2] = { NULL, NULL}; - - edg_wll_ResetError(ctx); - - dbjob = edg_wlc_JobIdGetUnique(job); - - trio_asprintf(&stmt,"select proxy,server from jobs where jobid = '%|Ss'",dbjob); - ret = edg_wll_ExecStmt(ctx,stmt,&q); - if (ret <= 0) { - if (ret == 0) { - fprintf(stderr,"%s: no such job\n",dbjob); - edg_wll_SetError(ctx,ENOENT,dbjob); - } - goto clean; - } - free(stmt); stmt = NULL; - - if ((ret = edg_wll_FetchRow(q,res)) > 0) { - result = 0; - if (strcmp(res[0],"0")) result += DB_PROXY_JOB; - if (strcmp(res[1],"0")) result += DB_SERVER_JOB; - } - else { - if (ret == 0) result = 0; - else { - fprintf(stderr,"Error retrieving proxy&server fields of jobs table. Missing column?\n"); - edg_wll_SetError(ctx,ENOENT,dbjob); - } - } - edg_wll_FreeStmt(&q); - -clean: - free(res[0]); free(res[1]); - free(dbjob); - free(stmt); - return(result); -} - int unset_proxy_flag(edg_wll_Context ctx, edg_wlc_JobId job) { char *stmt = NULL; diff --git a/org.glite.lb.server/src/store.c.T b/org.glite.lb.server/src/store.c.T index 12241a7..a9114a3 100644 --- a/org.glite.lb.server/src/store.c.T +++ b/org.glite.lb.server/src/store.c.T @@ -30,11 +30,12 @@ #include "lock.h" #include "lb_authz.h" #include "jobstat.h" +#include "db_calls.h" static int store_user(edg_wll_Context,const char *,const char *); -static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *); +static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *, int, int); #ifdef LB_BUF -static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *); +static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *, int, int); #endif static int store_job_grey(edg_wll_Context,const edg_wlc_JobId,time_t); static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int); @@ -94,26 +95,14 @@ int edg_wll_StoreEvent(edg_wll_Context ctx,edg_wll_Event *e,int *seq) /* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */ if ((err = check_dup(ctx,e))) goto clean; - userid = strdup(strmd5(e->any.user,NULL)); - - if ((e->type == EDG_WLL_EVENT_REGJOB || lbproxy_notreg)) { - /* Register the job and owner. For LBproxy, contant "lbproxy" - is used as the name - it's harmless as the job is already - registered with LBserver */ - char *username; - - username = (ctx->isProxy) ? "lbproxy" : ctx->peerName; - userid = strdup(strmd5(username, NULL)); - if ((err = store_user(ctx,userid, username))) goto clean; - if ((err = store_job(ctx,e->any.jobId,userid))) goto clean; - } else { - /* for other events just make sure user record is there */ - userid = strdup(strmd5(e->any.user,NULL)); - if ((err = store_user(ctx,userid,e->any.user))) goto clean; - } - jobid = edg_wlc_JobIdGetUnique(e->any.jobId); + trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid); + + if (edg_wll_ExecStmt(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(sh,&userid) < 0) goto clean; + if (sh) edg_wll_FreeStmt(&sh); + + /* obtain next event sequence number */ trio_asprintf(&select_max, "select max(event) from events " @@ -260,14 +249,13 @@ static int store_user(edg_wll_Context ctx,const char *userid,const char *subj) return edg_wll_Error(ctx,NULL,NULL); } -static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid) +static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, int proxy, int server) { char *jobstr = edg_wlc_JobIdUnparse(job); char *jobid = edg_wlc_JobIdGetUnique(job); char *stmt; char *srvName; unsigned int srvPort; - int proxy=0, server=0; /* debug Duplicate key on index: Duplicate entry '(nil)' for key 1 */ @@ -313,7 +301,7 @@ static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *use } #ifdef LB_BUF -static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi) +static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi, int proxy, int server) { char *jobstr = edg_wlc_JobIdUnparse(job); char *jobid = edg_wlc_JobIdGetUnique(job); @@ -329,7 +317,7 @@ static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const cha edg_wll_ResetError(ctx); - trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid); + trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid,proxy,server); edg_wll_bufferedInsert(bi, row); // no need to free row free(jobstr); @@ -363,6 +351,140 @@ static int store_job_grey(edg_wll_Context ctx,const edg_wlc_JobId job,time_t eti return edg_wll_Error(ctx,NULL,NULL); } + +int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort) +{ + char *unique = edg_wlc_JobIdGetUnique(event->any.jobId); + char *q = NULL, *owner = NULL, *userid = NULL; + edg_wll_Stmt stmt = NULL; + int nar; + + + edg_wll_ResetError(ctx); + + if (ctx->isProxy) { + /* event arrived on proxy socket */ + if (event->any.type == EDG_WLL_EVENT_REGJOB) { + if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) { + /* first synchronous registration */ + if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) { + /* we are both server and proxy for this job */ + trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'", + unique); + + nar = edg_wll_ExecStmt(ctx, q, NULL); + + if (nar == 0) { + /* job isn't registered yet */ + userid = strdup(strmd5("unknown_to_proxy", NULL)); + if (store_user(ctx,userid,"unknown_to_proxy")) goto err; + + if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId, + userid, 1, ctx->serverRunning)) goto err; + + } + else {} /* job was registered thru GSI, no further action needed */ + /* or error occured - and will go out via return() */ + } + else { + /* we are only proxy for this job, forward it to server */ + + /* XXX - does it have any sence ?? + if (!strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT)) { + edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously"); + goto err; + } + */ + + userid = strdup(strmd5(event->any.user, NULL)); + if (store_user(ctx,userid,event->any.user)) goto err; + + if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId, + userid, 1, 0)) goto err; + } + } + else { + /* supplementary re-registration (JDL of subjob, etc.) */ + if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) { + /* previous registration via GSI required */ + trio_asprintf(&q,"update jobs set jobid='%|Ss' where jobid='%|Ss'", + unique, unique); + /* does the job exists ? */ + if (edg_wll_ExecStmt(ctx,q,NULL) < 0) { + edg_wll_SetError(ctx, ENOENT, "job not registered"); + goto err; + } + } + else { + /* try to register job in case that first reistration */ + /* was sent to server only; ignore errors (EEXIST) */ + userid = strdup(strmd5(event->any.user, NULL)); + if (store_user(ctx,userid,event->any.user)) goto err; + + store_job(ctx,(const edg_wlc_JobId) event->any.jobId, + userid, 1, 0); + edg_wll_ResetError(ctx); + + } + } + } + else { + /* any other event than JobReg */ + trio_asprintf(&q,"update jobs set jobid='%|Ss' where jobid='%|Ss'", + unique, unique); + /* does the job exists ? now we require registration on proxy too */ + if (edg_wll_ExecStmt(ctx,q,NULL) < 0) { + edg_wll_SetError(ctx, ENOENT, "job not registered"); + goto err; + } + } + } + else { + /* event arrived on LB port */ + if (event->any.type == EDG_WLL_EVENT_REGJOB) { + trio_asprintf(&q,"select cert_subj from jobs,users where jobs.jobid='%|Ss'" + " AND jobs.userid=users.userid",unique); + if ( (nar = edg_wll_ExecStmt(ctx,q,&stmt)) < 0 || edg_wll_FetchRow(stmt,&owner) < 0 ) + goto err; + + if (nar) { + /* job is already registered */ + if (!strcmp(owner,"unknown_to_proxy")) { + /* proxy registration was already done */ + userid = strdup(strmd5(ctx->peerName, NULL)); + if (store_user(ctx,userid,ctx->peerName)) goto err; + + trio_asprintf(&q,"update jobs set server=1, userid='%|Ss' where jobid='%|Ss'", + userid, unique); + + edg_wll_ExecStmt(ctx, q, NULL); + } + else { } /* re-registration, no action needed */ + } + else { + userid = strdup(strmd5(ctx->peerName, NULL)); + if (store_user(ctx,userid,ctx->peerName)) goto err; + + if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId, + userid, 0, 1)) goto err; + } + } + else { + /* any other event than JobReg */ + /* no action needed */ + } + } + +err: + if (stmt) edg_wll_FreeStmt(&stmt); + free(unique); + free(userid); + free(q); + + return edg_wll_Error(ctx,NULL,NULL); +} + + /* * XXX: store it in SHORT_FIELDS for now despite it should go to dedicated * column in EVENTS. @@ -541,10 +663,6 @@ static int check_auth(edg_wll_Context ctx,edg_wll_Event *e) edg_wll_ResetError(ctx); - if (e->type == EDG_WLL_EVENT_REGJOB) - return strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT) ? - 0 : edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously"); - trio_asprintf(&q,"select userid from jobs where jobid='%|Ss'",jobid); if (edg_wll_ExecStmt(ctx,q,&stmt) < 0 @@ -552,8 +670,8 @@ static int check_auth(edg_wll_Context ctx,edg_wll_Event *e) ) goto clean; if (!owner) { - if ( ctx->isProxy && !e->any.seqcode ) - edg_wll_SetError(ctx, EINVAL, "Job not registered - sequence code needed"); + if ( ctx->isProxy ) + edg_wll_SetError(ctx, EINVAL, "Job not registered"); else /* We have to let the calling function know what happened here * even if it hapens inside the LB Proxy which shouldn't consider @@ -738,6 +856,7 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv #endif edg_wll_bufInsert bi_s, *bi_states = &bi_s; char *icnames, *values; + int server, proxy, membership = 0; edg_wll_ResetError(ctx); @@ -757,7 +876,7 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv #ifdef LB_BUF /* init multirows insert mechanism for tables used here */ if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 4000, 1000, - "jobid, dg_jobid, userid")) + "jobid, dg_jobid, userid, proxy, server")) { return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()"); } @@ -777,14 +896,18 @@ static int register_subjobs_embryonic(edg_wll_Context ctx,const edg_wll_RegJobEv ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10; if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400; + membership = edg_wll_jobMembership(ctx, e->jobId); + proxy = membership & DB_PROXY_JOB; + server = membership & DB_SERVER_JOB; + for (i=0; insubjobs; i++) { char *et,*ed,*job_s,*p,*p1; /* save jobid-userid relation into jobs table */ #ifdef LB_BUF - if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs))) + if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs, proxy, server))) #else - if ((err = store_job(ctx, subjobs[i], userid))) + if ((err = store_job(ctx, subjobs[i], userid, proxy, server))) #endif edg_wll_Error(ctx,&et,&ed); -- 1.8.2.3