From 8d20e7ef44ccb7cd08802d050aea29b63546bbc8 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Ji=C5=99=C3=AD=20=C5=A0kr=C3=A1bal?= Date: Thu, 9 Dec 2004 14:35:23 +0000 Subject: [PATCH] - forwarding events prom LB Proxy to LB server added --- org.glite.lb.server/Makefile | 4 +- org.glite.lb.server/src/db_store.c | 13 +- org.glite.lb.server/src/il_lbproxy.c | 327 +++++++---------------------------- org.glite.lb.server/src/il_lbproxy.h | 4 +- 4 files changed, 74 insertions(+), 274 deletions(-) diff --git a/org.glite.lb.server/Makefile b/org.glite.lb.server/Makefile index c4d2927..ac6cd76 100644 --- a/org.glite.lb.server/Makefile +++ b/org.glite.lb.server/Makefile @@ -117,8 +117,8 @@ LB_PROXY_OBJS:= lbproxy.o il_lbproxy.o get_events.o index.o jobstat.o seqcode.o stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ notification.o il_notification.o notif_match.o -SERVER_OBJS:= bkserverd.o get_events.o index.o jobstat.o jobstat_supp.o seqcode.o \ - write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \ +SERVER_OBJS:= bkserverd.o il_lbproxy.o get_events.o index.o jobstat.o jobstat_supp.o \ + seqcode.o write2rgma.o lbs_db.o lb_html.o lb_http.o lb_proto.o lb_xml_parse.o \ lb_xml_parse_V21.o \ lock.o openserver.o query.o userjobs.o db_store.o request.o store.o \ stored_master.o srv_purge.o server_state.o dump.o lb_authz.o load.o \ diff --git a/org.glite.lb.server/src/db_store.c b/org.glite.lb.server/src/db_store.c index b4a1c1c..8ff5fb5 100644 --- a/org.glite.lb.server/src/db_store.c +++ b/org.glite.lb.server/src/db_store.c @@ -11,6 +11,7 @@ #include "store.h" #include "lbs_db.h" #include "lock.h" +#include "il_lbproxy.h" extern int debug; @@ -65,13 +66,19 @@ db_store(edg_wll_Context ctx,char *ucs, char *event) ev->changeACL.permission, ev->changeACL.permission_type, ev->changeACL.operation); else - err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq,&newstat); + err = edg_wll_StepIntState(ctx,ev->any.jobId, ev, seq, ctx->isProxy? NULL: &newstat); if (edg_wll_UnlockJob(ctx,ev->any.jobId)) goto err; if (err) goto err; - if (!ctx->isProxy && newstat.state) { - edg_wll_NotifMatch(ctx,&newstat); + if ( ctx->isProxy ) { + /* + * send event to the proper BK server + */ + if ( ev->any.type != EDG_WLL_EVENT_REGJOB + && edg_wll_EventSendProxy(ctx, ev->any.jobId, event) ) goto err; + } else if ( newstat.state ) { + edg_wll_NotifMatch(ctx, &newstat); edg_wll_FreeStatus(&newstat); } diff --git a/org.glite.lb.server/src/il_lbproxy.c b/org.glite.lb.server/src/il_lbproxy.c index e4c228b..0d768f6 100644 --- a/org.glite.lb.server/src/il_lbproxy.c +++ b/org.glite.lb.server/src/il_lbproxy.c @@ -16,6 +16,7 @@ #include "glite/lb/events_parse.h" #include "glite/lb/escape.h" #include "glite/lb/log_proto.h" +#include "glite/lb/lb_plain_io.h" #include "il_lbproxy.h" #include "lb_xml_parse.h" @@ -39,85 +40,26 @@ char *lbproxy_ilog_file_prefix = FILE_PREFIX; }\ } -int edg_wll_SendEventProxy(edg_wll_Context ctx, const char *owner) +static int event_save_to_file_proxy( + edg_wll_Context ctx, + const char *event_file, + const char *ulm_data, + long *filepos) { - return 0; -} - -static -int -notif_create_ulm( - edg_wll_Context context, - edg_wll_NotifId reg_id, - const char *host, - const uint16_t port, - const char *owner, - const char *notif_data, - char **ulm_data, - char **reg_id_s) -{ - int ret; - edg_wll_Event *event=NULL; - - *ulm_data = NULL; - *reg_id_s = NULL; - - event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION); - - gettimeofday(&event->any.timestamp,0); - if (context->p_host) event->any.host = strdup(context->p_host); - event->any.level = context->p_level; - event->any.source = context->p_source; - if (context->p_instance) event->notification.src_instance = strdup(context->p_instance); - event->notification.notifId = reg_id; - if (owner) event->notification.owner = strdup(owner); - if (host) event->notification.dest_host = strdup(host); - event->notification.dest_port = port; - if (notif_data) event->notification.jobstat = strdup(notif_data); - - if ((*ulm_data = edg_wll_UnparseNotifEvent(context,event)) == NULL) { - edg_wll_SetError(context, ret = ENOMEM, "edg_wll_UnparseNotifEvent()"); - goto out; - } - - if((*reg_id_s = edg_wll_NotifIdGetUnique(reg_id)) == NULL) { - edg_wll_SetError(context, ret = ENOMEM, "edg_wll_NotifIdGetUnique()"); - goto out; - } - - ret = 0; - -out: - if(event) { - edg_wll_FreeEvent(event); - free(event); - } - if(ret) edg_wll_UpdateError(context, ret, "notif_create_ulm()"); - return(ret); -} + FILE *outfile; + struct flock filelock; + int filedesc, + i, filelock_status=-1; -static -int -notif_save_to_file(edg_wll_Context context, - const char *event_file, - const char *ulm_data, - long *filepos) -{ - int ret; - FILE *outfile; - int filedesc; - struct flock filelock; - int i, filelock_status=-1; - - for(i=0; i < FCNTL_ATTEMPTS; i++) { + for( i = 0; i < FCNTL_ATTEMPTS; i++ ) { /* fopen and properly handle the filelock */ - if ((outfile = fopen(event_file,"a")) == NULL) { - edg_wll_SetError(context, ret = errno, "fopen()"); + if ( (outfile = fopen(event_file, "a")) == NULL ) { + edg_wll_SetError(ctx, errno, "fopen()"); goto out; } - if ((filedesc = fileno(outfile)) == -1) { - edg_wll_SetError(context, ret = errno, "fileno()"); + if ( (filedesc = fileno(outfile)) == -1 ) { + edg_wll_SetError(ctx, errno, "fileno()"); goto out1; } filelock.l_type = F_WRLCK; @@ -135,7 +77,7 @@ notif_save_to_file(edg_wll_Context context, break; default: /* other error */ - edg_wll_SetError(context, ret=errno, "fcntl()"); + edg_wll_SetError(ctx, errno, "fcntl()"); goto out1; } } else { @@ -143,231 +85,84 @@ notif_save_to_file(edg_wll_Context context, break; } } - if (fseek(outfile, 0, SEEK_END) == -1) { - edg_wll_SetError(context, ret = errno, "fseek()"); - goto out1; - } - if ((*filepos=ftell(outfile)) == -1) { - edg_wll_SetError(context, ret = errno, "ftell()"); - goto out1; - } - /* write, flush and sync */ - if (fputs(ulm_data, outfile) == EOF) { - edg_wll_SetError(context, ret = errno, "fputs()"); - goto out1; - } - if (fflush(outfile) == EOF) { - edg_wll_SetError(context, ret = errno, "fflush()"); - goto out1; - } - if (fsync(filedesc) < 0) { /* synchronize */ - edg_wll_SetError(context, ret = errno, "fsync()"); - goto out1; - } + if (fseek(outfile, 0, SEEK_END) == -1) { edg_wll_SetError(ctx, errno, "fseek()"); goto out1; } + if ((*filepos=ftell(outfile)) == -1) { edg_wll_SetError(ctx, errno, "ftell()"); goto out1; } + if (fputs(ulm_data, outfile) == EOF) { edg_wll_SetError(ctx, errno, "fputs()"); goto out1; } + if (fflush(outfile) == EOF) { edg_wll_SetError(ctx, errno, "fflush()"); goto out1; } + if (fsync(filedesc) < 0) { edg_wll_SetError(ctx, errno, "fsync()"); goto out1; } - ret = 0; out1: - /* close and unlock */ fclose(outfile); -out: - if(ret) edg_wll_UpdateError(context, ret, "notif_save_to_file()"); - return(ret); -} - -static -ssize_t -socket_write_full(edg_wll_Context context, - int sock, - void *buf, - size_t bufsize, - struct timeval *timeout, - ssize_t *total) -{ - int ret = 0; - ssize_t len; - - *total = 0; - while (bufsize > 0) { - - fd_set fds; - struct timeval to,before,after; - - if (timeout) { - memcpy(&to, timeout, sizeof(to)); - gettimeofday(&before, NULL); - } - - len = write(sock, buf, bufsize); - while (len <= 0) { - FD_ZERO(&fds); - FD_SET(sock, &fds); - if (select(sock+1, &fds, NULL, NULL, timeout ? &to : NULL) < 0) { - edg_wll_SetError(context, ret = errno, "select()"); - goto out; - } - len = write(sock, buf, bufsize); - } - if (timeout) { - gettimeofday(&after,NULL); - tv_sub(after, before); - tv_sub(*timeout,after); - if (timeout->tv_sec < 0) { - timeout->tv_sec = 0; - timeout->tv_usec = 0; - } - } - - if (len < 0) { - edg_wll_SetError(context, ret = errno, "write()"); - goto out; - } - - bufsize -= len; - buf += len; - *total += len; - } - - ret = 0; out: - if(ret) edg_wll_UpdateError(context, ret, "socket_write_full()"); - return ret; + return edg_wll_Error(ctx, NULL, NULL)? + edg_wll_UpdateError(ctx, 0, "event_save_to_file_proxy()"): 0; } -static -int -notif_send_socket(edg_wll_Context context, - long filepos, - const char *ulm_data) +static int event_send_socket_proxy( + edg_wll_Context ctx, + long filepos, + const char *ulm_data) { - int ret; - struct sockaddr_un saddr; - int msg_sock, flags; - size_t count; - struct timeval timeout; + edg_wll_PlainConnection conn; + struct sockaddr_un saddr; + int flags; + struct timeval timeout; + timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX; - timeout.tv_usec = 0; + timeout.tv_usec = 0; - msg_sock = socket(PF_UNIX, SOCK_STREAM, 0); - if(msg_sock < 0) { - edg_wll_SetError(context, ret = errno, "socket()"); - goto out; - } + if ( (conn.sock = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 ) + edg_wll_SetError(ctx, errno, "socket()"); memset(&saddr, 0, sizeof(saddr)); saddr.sun_family = AF_UNIX; strcpy(saddr.sun_path, lbproxy_ilog_socket_path); - if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 || - fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) { - edg_wll_SetError(context, ret = errno, "fcntl()"); - goto out1; - } - - if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { - if(errno != EISCONN) { - edg_wll_SetError(context, ret = errno, "connect()"); - goto out1; - } + if ( (flags = fcntl(conn.sock, F_GETFL, 0)) < 0 + || fcntl(conn.sock, F_SETFL, flags | O_NONBLOCK) < 0) { + edg_wll_SetError(ctx, errno, "fcntl()"); + goto out; } - if (socket_write_full(context, msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) { - ret = errno; - goto out1; + if ( connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) { + if(errno != EISCONN) { edg_wll_SetError(ctx, errno, "connect()"); goto out; } } - if (socket_write_full(context, msg_sock, (void*)ulm_data, strlen(ulm_data), &timeout, &count) < 0) { - ret = errno; - goto out1; + if ( edg_wll_plain_write_full(&conn, &filepos, sizeof(filepos), &timeout) < 0 + || edg_wll_plain_write_full(&conn, (void*)ulm_data, strlen(ulm_data), &timeout) < 0 ) { + edg_wll_SetError(ctx, errno, "edg_wll_plain_write_full()"); + goto out; } - ret = 0; - -out1: - close(msg_sock); out: - if(ret) edg_wll_UpdateError(context, ret, "notif_send_socket()"); - return(ret); + close(conn.sock); + return edg_wll_Error(ctx, NULL, NULL)? + edg_wll_UpdateError(ctx, 0, "event_send_socket_proxy()"): 0; } -int -edg_wll_NotifSend_a(edg_wll_Context context, - edg_wll_NotifId reg_id, - const char *host, - int port, - const char *owner, - const char *notif_data) +int edg_wll_EventSendProxy(edg_wll_Context ctx, const edg_wlc_JobId jobid, const char *event) { - int ret; - long filepos; - char *ulm_data, *reg_id_s, *event_file; + long filepos; + char *jobid_s, *event_file = NULL; - if((ret=notif_create_ulm(context, - reg_id, - host, - port, - owner, - notif_data, - &ulm_data, - ®_id_s))) { - goto out; - } - asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, reg_id_s); - if(event_file == NULL) { - edg_wll_SetError(context, ret=ENOMEM, "asprintf()"); - goto out; - } + edg_wll_ResetError(ctx); + jobid_s = edg_wlc_JobIdGetUnique(jobid); + if ( !jobid_s ) { edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()"); goto out; } - if((ret=notif_save_to_file(context, - event_file, - ulm_data, - &filepos))) { - goto out; - } + asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s); + if ( !event_file ) { edg_wll_SetError(ctx, ENOMEM, "asprintf()"); goto out; } - if((ret=notif_send_socket(context, - filepos, - ulm_data))) { - goto out; - } - ret = 0; + if ( event_save_to_file_proxy(ctx, event_file, event, &filepos) + || event_send_socket_proxy(ctx, filepos, event) ) goto out; out: - if(ulm_data) free(ulm_data); - if(reg_id_s) free(reg_id_s); - if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifSend()"); - return(ret); -} - + if ( jobid_s ) free(jobid_s); + if ( event_file ) free(event_file); -int -edg_wll_NotifJobStatus_a(edg_wll_Context context, - edg_wll_NotifId reg_id, - const char *host, - int port, - const char *owner, - const edg_wll_JobStat notif_job_stat) -{ - int ret=0; - char *xml_data, *xml_esc_data=NULL; - - if(edg_wll_JobStatusToXML(context, notif_job_stat, &xml_data)) - goto out; - - if((xml_esc_data = edg_wll_EscapeXML(xml_data)) == NULL) { - edg_wll_SetError(context, ret=ENOMEM, "edg_wll_EscapeXML()"); - goto out; - } - - ret=edg_wll_NotifSend(context, reg_id, host, port, owner, xml_esc_data); - -out: - if(xml_data) free(xml_data); - if(xml_esc_data) free(xml_esc_data); - if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifJobStatus()"); - return(ret); + return edg_wll_Error(ctx, NULL, NULL)? edg_wll_UpdateError(ctx, 0, "edg_wll_EventSendProxy()"): 0; } diff --git a/org.glite.lb.server/src/il_lbproxy.h b/org.glite.lb.server/src/il_lbproxy.h index 1f586e6..4107304 100644 --- a/org.glite.lb.server/src/il_lbproxy.h +++ b/org.glite.lb.server/src/il_lbproxy.h @@ -8,9 +8,7 @@ extern char *lbproxy_ilog_socket_path; extern char *lbproxy_ilog_file_prefix; -int edg_wll_SendEventProxy( - edg_wll_Context context, - const char *owner); +int edg_wll_EventSendProxy(edg_wll_Context ctx, const edg_wlc_JobId jobid, const char *event); #ifdef __cplusplus } -- 1.8.2.3