#include "lock.h"
#include "lb_authz.h"
#include "jobstat.h"
+#include "db_calls.h"
static int store_user(edg_wll_Context,const char *,const char *);
-static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *);
+static int store_job(edg_wll_Context,const edg_wlc_JobId,const char *, int, int);
#ifdef LB_BUF
-static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *);
+static int store_job_block(edg_wll_Context, const edg_wlc_JobId, const char *, edg_wll_bufInsert *, int, int);
#endif
static int store_job_grey(edg_wll_Context,const edg_wlc_JobId,time_t);
static int store_flesh(edg_wll_Context,edg_wll_Event *,char *,int);
/* FIXME: does not work for grey jobs due to "select from jobs" -- I don't care for the time being */
if ((err = check_dup(ctx,e))) goto clean;
- userid = strdup(strmd5(e->any.user,NULL));
-
- if ((e->type == EDG_WLL_EVENT_REGJOB || lbproxy_notreg)) {
- /* Register the job and owner. For LBproxy, contant "lbproxy"
- is used as the name - it's harmless as the job is already
- registered with LBserver */
- char *username;
-
- username = (ctx->isProxy) ? "lbproxy" : ctx->peerName;
- userid = strdup(strmd5(username, NULL));
- if ((err = store_user(ctx,userid, username))) goto clean;
- if ((err = store_job(ctx,e->any.jobId,userid))) goto clean;
- } else {
- /* for other events just make sure user record is there */
- userid = strdup(strmd5(e->any.user,NULL));
- if ((err = store_user(ctx,userid,e->any.user))) goto clean;
- }
-
jobid = edg_wlc_JobIdGetUnique(e->any.jobId);
+ trio_asprintf(&stmt,"select userid from jobs where jobid='%|Ss'", jobid);
+
+ if (edg_wll_ExecStmt(ctx,stmt,&sh) < 0 || edg_wll_FetchRow(sh,&userid) < 0) goto clean;
+ if (sh) edg_wll_FreeStmt(&sh);
+
+
/* obtain next event sequence number */
trio_asprintf(&select_max,
"select max(event) from events "
return edg_wll_Error(ctx,NULL,NULL);
}
-static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid)
+static int store_job(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, int proxy, int server)
{
char *jobstr = edg_wlc_JobIdUnparse(job);
char *jobid = edg_wlc_JobIdGetUnique(job);
char *stmt;
char *srvName;
unsigned int srvPort;
- int proxy=0, server=0;
/* debug Duplicate key on index: Duplicate entry '(nil)' for key 1
*/
}
#ifdef LB_BUF
-static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi)
+static int store_job_block(edg_wll_Context ctx,const edg_wlc_JobId job,const char *userid, edg_wll_bufInsert *bi, int proxy, int server)
{
char *jobstr = edg_wlc_JobIdUnparse(job);
char *jobid = edg_wlc_JobIdGetUnique(job);
edg_wll_ResetError(ctx);
- trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid);
+ trio_asprintf(&row, "'%|Ss','%|Ss','%|Ss','%|Ss','%|Ss'", jobid,jobstr,userid,proxy,server);
edg_wll_bufferedInsert(bi, row); // no need to free row
free(jobstr);
return edg_wll_Error(ctx,NULL,NULL);
}
+
+int store_job_server_proxy(edg_wll_Context ctx, edg_wll_Event *event, char *jobIdHost, unsigned int jobIdPort)
+{
+ char *unique = edg_wlc_JobIdGetUnique(event->any.jobId);
+ char *q = NULL, *owner = NULL, *userid = NULL;
+ edg_wll_Stmt stmt = NULL;
+ int nar;
+
+
+ edg_wll_ResetError(ctx);
+
+ if (ctx->isProxy) {
+ /* event arrived on proxy socket */
+ if (event->any.type == EDG_WLL_EVENT_REGJOB) {
+ if (event->any.priority & EDG_WLL_LOGFLAG_DIRECT) {
+ /* first synchronous registration */
+ if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) {
+ /* we are both server and proxy for this job */
+ trio_asprintf(&q,"update jobs set proxy=1 where jobid='%|Ss'",
+ unique);
+
+ nar = edg_wll_ExecStmt(ctx, q, NULL);
+
+ if (nar == 0) {
+ /* job isn't registered yet */
+ userid = strdup(strmd5("unknown_to_proxy", NULL));
+ if (store_user(ctx,userid,"unknown_to_proxy")) goto err;
+
+ if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId,
+ userid, 1, ctx->serverRunning)) goto err;
+
+ }
+ else {} /* job was registered thru GSI, no further action needed */
+ /* or error occured - and will go out via return() */
+ }
+ else {
+ /* we are only proxy for this job, forward it to server */
+
+ /* XXX - does it have any sence ??
+ if (!strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT)) {
+ edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously");
+ goto err;
+ }
+ */
+
+ userid = strdup(strmd5(event->any.user, NULL));
+ if (store_user(ctx,userid,event->any.user)) goto err;
+
+ if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId,
+ userid, 1, 0)) goto err;
+ }
+ }
+ else {
+ /* supplementary re-registration (JDL of subjob, etc.) */
+ if ((ctx->srvPort == jobIdPort) && !strcmp(jobIdHost,ctx->srvName)) {
+ /* previous registration via GSI required */
+ trio_asprintf(&q,"update jobs set jobid='%|Ss' where jobid='%|Ss'",
+ unique, unique);
+ /* does the job exists ? */
+ if (edg_wll_ExecStmt(ctx,q,NULL) < 0) {
+ edg_wll_SetError(ctx, ENOENT, "job not registered");
+ goto err;
+ }
+ }
+ else {
+ /* try to register job in case that first reistration */
+ /* was sent to server only; ignore errors (EEXIST) */
+ userid = strdup(strmd5(event->any.user, NULL));
+ if (store_user(ctx,userid,event->any.user)) goto err;
+
+ store_job(ctx,(const edg_wlc_JobId) event->any.jobId,
+ userid, 1, 0);
+ edg_wll_ResetError(ctx);
+
+ }
+ }
+ }
+ else {
+ /* any other event than JobReg */
+ trio_asprintf(&q,"update jobs set jobid='%|Ss' where jobid='%|Ss'",
+ unique, unique);
+ /* does the job exists ? now we require registration on proxy too */
+ if (edg_wll_ExecStmt(ctx,q,NULL) < 0) {
+ edg_wll_SetError(ctx, ENOENT, "job not registered");
+ goto err;
+ }
+ }
+ }
+ else {
+ /* event arrived on LB port */
+ if (event->any.type == EDG_WLL_EVENT_REGJOB) {
+ trio_asprintf(&q,"select cert_subj from jobs,users where jobs.jobid='%|Ss'"
+ " AND jobs.userid=users.userid",unique);
+ if ( (nar = edg_wll_ExecStmt(ctx,q,&stmt)) < 0 || edg_wll_FetchRow(stmt,&owner) < 0 )
+ goto err;
+
+ if (nar) {
+ /* job is already registered */
+ if (!strcmp(owner,"unknown_to_proxy")) {
+ /* proxy registration was already done */
+ userid = strdup(strmd5(ctx->peerName, NULL));
+ if (store_user(ctx,userid,ctx->peerName)) goto err;
+
+ trio_asprintf(&q,"update jobs set server=1, userid='%|Ss' where jobid='%|Ss'",
+ userid, unique);
+
+ edg_wll_ExecStmt(ctx, q, NULL);
+ }
+ else { } /* re-registration, no action needed */
+ }
+ else {
+ userid = strdup(strmd5(ctx->peerName, NULL));
+ if (store_user(ctx,userid,ctx->peerName)) goto err;
+
+ if (store_job(ctx,(const edg_wlc_JobId) event->any.jobId,
+ userid, 0, 1)) goto err;
+ }
+ }
+ else {
+ /* any other event than JobReg */
+ /* no action needed */
+ }
+ }
+
+err:
+ if (stmt) edg_wll_FreeStmt(&stmt);
+ free(unique);
+ free(userid);
+ free(q);
+
+ return edg_wll_Error(ctx,NULL,NULL);
+}
+
+
/*
* XXX: store it in SHORT_FIELDS for now despite it should go to dedicated
* column in EVENTS.
edg_wll_ResetError(ctx);
- if (e->type == EDG_WLL_EVENT_REGJOB)
- return strcmp(e->any.user,EDG_WLL_LOG_USER_DEFAULT) ?
- 0 : edg_wll_SetError(ctx,EPERM,"can't register jobs anonymously");
-
trio_asprintf(&q,"select userid from jobs where jobid='%|Ss'",jobid);
if (edg_wll_ExecStmt(ctx,q,&stmt) < 0
) goto clean;
if (!owner) {
- if ( ctx->isProxy && !e->any.seqcode )
- edg_wll_SetError(ctx, EINVAL, "Job not registered - sequence code needed");
+ if ( ctx->isProxy )
+ edg_wll_SetError(ctx, EINVAL, "Job not registered");
else
/* We have to let the calling function know what happened here
* even if it hapens inside the LB Proxy which shouldn't consider
#endif
edg_wll_bufInsert bi_s, *bi_states = &bi_s;
char *icnames, *values;
+ int server, proxy, membership = 0;
edg_wll_ResetError(ctx);
#ifdef LB_BUF
/* init multirows insert mechanism for tables used here */
if (edg_wll_bufferedInsertInit(ctx, bi_jobs, NULL, "jobs", 4000, 1000,
- "jobid, dg_jobid, userid"))
+ "jobid, dg_jobid, userid, proxy, server"))
{
return edg_wll_SetError(ctx, EINVAL, "edg_wll_bufferedInsertInit()");
}
ctx->p_tmp_timeout.tv_sec += e->nsubjobs/10;
if (ctx->p_tmp_timeout.tv_sec > 86400) ctx->p_tmp_timeout.tv_sec = 86400;
+ membership = edg_wll_jobMembership(ctx, e->jobId);
+ proxy = membership & DB_PROXY_JOB;
+ server = membership & DB_SERVER_JOB;
+
for (i=0; i<e->nsubjobs; i++) {
char *et,*ed,*job_s,*p,*p1;
/* save jobid-userid relation into jobs table */
#ifdef LB_BUF
- if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs)))
+ if ((err = store_job_block(ctx, subjobs[i], userid, bi_jobs, proxy, server)))
#else
- if ((err = store_job(ctx, subjobs[i], userid)))
+ if ((err = store_job(ctx, subjobs[i], userid, proxy, server)))
#endif
edg_wll_Error(ctx,&et,&ed);