#include "glite/lb/events_parse.h"
#include "glite/lb/escape.h"
#include "glite/lb/log_proto.h"
+#include "glite/lb/lb_plain_io.h"
#include "il_lbproxy.h"
#include "lb_xml_parse.h"
}\
}
-int edg_wll_SendEventProxy(edg_wll_Context ctx, const char *owner)
+static int event_save_to_file_proxy(
+ edg_wll_Context ctx,
+ const char *event_file,
+ const char *ulm_data,
+ long *filepos)
{
- return 0;
-}
-
-static
-int
-notif_create_ulm(
- edg_wll_Context context,
- edg_wll_NotifId reg_id,
- const char *host,
- const uint16_t port,
- const char *owner,
- const char *notif_data,
- char **ulm_data,
- char **reg_id_s)
-{
- int ret;
- edg_wll_Event *event=NULL;
-
- *ulm_data = NULL;
- *reg_id_s = NULL;
-
- event = edg_wll_InitEvent(EDG_WLL_EVENT_NOTIFICATION);
-
- gettimeofday(&event->any.timestamp,0);
- if (context->p_host) event->any.host = strdup(context->p_host);
- event->any.level = context->p_level;
- event->any.source = context->p_source;
- if (context->p_instance) event->notification.src_instance = strdup(context->p_instance);
- event->notification.notifId = reg_id;
- if (owner) event->notification.owner = strdup(owner);
- if (host) event->notification.dest_host = strdup(host);
- event->notification.dest_port = port;
- if (notif_data) event->notification.jobstat = strdup(notif_data);
-
- if ((*ulm_data = edg_wll_UnparseNotifEvent(context,event)) == NULL) {
- edg_wll_SetError(context, ret = ENOMEM, "edg_wll_UnparseNotifEvent()");
- goto out;
- }
-
- if((*reg_id_s = edg_wll_NotifIdGetUnique(reg_id)) == NULL) {
- edg_wll_SetError(context, ret = ENOMEM, "edg_wll_NotifIdGetUnique()");
- goto out;
- }
-
- ret = 0;
-
-out:
- if(event) {
- edg_wll_FreeEvent(event);
- free(event);
- }
- if(ret) edg_wll_UpdateError(context, ret, "notif_create_ulm()");
- return(ret);
-}
+ FILE *outfile;
+ struct flock filelock;
+ int filedesc,
+ i, filelock_status=-1;
-static
-int
-notif_save_to_file(edg_wll_Context context,
- const char *event_file,
- const char *ulm_data,
- long *filepos)
-{
- int ret;
- FILE *outfile;
- int filedesc;
- struct flock filelock;
- int i, filelock_status=-1;
-
- for(i=0; i < FCNTL_ATTEMPTS; i++) {
+ for( i = 0; i < FCNTL_ATTEMPTS; i++ ) {
/* fopen and properly handle the filelock */
- if ((outfile = fopen(event_file,"a")) == NULL) {
- edg_wll_SetError(context, ret = errno, "fopen()");
+ if ( (outfile = fopen(event_file, "a")) == NULL ) {
+ edg_wll_SetError(ctx, errno, "fopen()");
goto out;
}
- if ((filedesc = fileno(outfile)) == -1) {
- edg_wll_SetError(context, ret = errno, "fileno()");
+ if ( (filedesc = fileno(outfile)) == -1 ) {
+ edg_wll_SetError(ctx, errno, "fileno()");
goto out1;
}
filelock.l_type = F_WRLCK;
break;
default:
/* other error */
- edg_wll_SetError(context, ret=errno, "fcntl()");
+ edg_wll_SetError(ctx, errno, "fcntl()");
goto out1;
}
} else {
break;
}
}
- if (fseek(outfile, 0, SEEK_END) == -1) {
- edg_wll_SetError(context, ret = errno, "fseek()");
- goto out1;
- }
- if ((*filepos=ftell(outfile)) == -1) {
- edg_wll_SetError(context, ret = errno, "ftell()");
- goto out1;
- }
- /* write, flush and sync */
- if (fputs(ulm_data, outfile) == EOF) {
- edg_wll_SetError(context, ret = errno, "fputs()");
- goto out1;
- }
- if (fflush(outfile) == EOF) {
- edg_wll_SetError(context, ret = errno, "fflush()");
- goto out1;
- }
- if (fsync(filedesc) < 0) { /* synchronize */
- edg_wll_SetError(context, ret = errno, "fsync()");
- goto out1;
- }
+ if (fseek(outfile, 0, SEEK_END) == -1) { edg_wll_SetError(ctx, errno, "fseek()"); goto out1; }
+ if ((*filepos=ftell(outfile)) == -1) { edg_wll_SetError(ctx, errno, "ftell()"); goto out1; }
+ if (fputs(ulm_data, outfile) == EOF) { edg_wll_SetError(ctx, errno, "fputs()"); goto out1; }
+ if (fflush(outfile) == EOF) { edg_wll_SetError(ctx, errno, "fflush()"); goto out1; }
+ if (fsync(filedesc) < 0) { edg_wll_SetError(ctx, errno, "fsync()"); goto out1; }
- ret = 0;
out1:
- /* close and unlock */
fclose(outfile);
-out:
- if(ret) edg_wll_UpdateError(context, ret, "notif_save_to_file()");
- return(ret);
-}
-
-static
-ssize_t
-socket_write_full(edg_wll_Context context,
- int sock,
- void *buf,
- size_t bufsize,
- struct timeval *timeout,
- ssize_t *total)
-{
- int ret = 0;
- ssize_t len;
-
- *total = 0;
- while (bufsize > 0) {
-
- fd_set fds;
- struct timeval to,before,after;
-
- if (timeout) {
- memcpy(&to, timeout, sizeof(to));
- gettimeofday(&before, NULL);
- }
-
- len = write(sock, buf, bufsize);
- while (len <= 0) {
- FD_ZERO(&fds);
- FD_SET(sock, &fds);
- if (select(sock+1, &fds, NULL, NULL, timeout ? &to : NULL) < 0) {
- edg_wll_SetError(context, ret = errno, "select()");
- goto out;
- }
- len = write(sock, buf, bufsize);
- }
- if (timeout) {
- gettimeofday(&after,NULL);
- tv_sub(after, before);
- tv_sub(*timeout,after);
- if (timeout->tv_sec < 0) {
- timeout->tv_sec = 0;
- timeout->tv_usec = 0;
- }
- }
-
- if (len < 0) {
- edg_wll_SetError(context, ret = errno, "write()");
- goto out;
- }
-
- bufsize -= len;
- buf += len;
- *total += len;
- }
-
- ret = 0;
out:
- if(ret) edg_wll_UpdateError(context, ret, "socket_write_full()");
- return ret;
+ return edg_wll_Error(ctx, NULL, NULL)?
+ edg_wll_UpdateError(ctx, 0, "event_save_to_file_proxy()"): 0;
}
-static
-int
-notif_send_socket(edg_wll_Context context,
- long filepos,
- const char *ulm_data)
+static int event_send_socket_proxy(
+ edg_wll_Context ctx,
+ long filepos,
+ const char *ulm_data)
{
- int ret;
- struct sockaddr_un saddr;
- int msg_sock, flags;
- size_t count;
- struct timeval timeout;
+ edg_wll_PlainConnection conn;
+ struct sockaddr_un saddr;
+ int flags;
+ struct timeval timeout;
+
timeout.tv_sec = EDG_WLL_LOG_TIMEOUT_MAX;
- timeout.tv_usec = 0;
+ timeout.tv_usec = 0;
- msg_sock = socket(PF_UNIX, SOCK_STREAM, 0);
- if(msg_sock < 0) {
- edg_wll_SetError(context, ret = errno, "socket()");
- goto out;
- }
+ if ( (conn.sock = socket(PF_UNIX, SOCK_STREAM, 0)) < 0 )
+ edg_wll_SetError(ctx, errno, "socket()");
memset(&saddr, 0, sizeof(saddr));
saddr.sun_family = AF_UNIX;
strcpy(saddr.sun_path, lbproxy_ilog_socket_path);
- if ((flags = fcntl(msg_sock, F_GETFL, 0)) < 0 ||
- fcntl(msg_sock, F_SETFL, flags | O_NONBLOCK) < 0) {
- edg_wll_SetError(context, ret = errno, "fcntl()");
- goto out1;
- }
-
- if(connect(msg_sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
- if(errno != EISCONN) {
- edg_wll_SetError(context, ret = errno, "connect()");
- goto out1;
- }
+ if ( (flags = fcntl(conn.sock, F_GETFL, 0)) < 0
+ || fcntl(conn.sock, F_SETFL, flags | O_NONBLOCK) < 0) {
+ edg_wll_SetError(ctx, errno, "fcntl()");
+ goto out;
}
- if (socket_write_full(context, msg_sock, &filepos, sizeof(filepos), &timeout, &count) < 0) {
- ret = errno;
- goto out1;
+ if ( connect(conn.sock, (struct sockaddr *)&saddr, sizeof(saddr)) < 0) {
+ if(errno != EISCONN) { edg_wll_SetError(ctx, errno, "connect()"); goto out; }
}
- if (socket_write_full(context, msg_sock, (void*)ulm_data, strlen(ulm_data), &timeout, &count) < 0) {
- ret = errno;
- goto out1;
+ if ( edg_wll_plain_write_full(&conn, &filepos, sizeof(filepos), &timeout) < 0
+ || edg_wll_plain_write_full(&conn, (void*)ulm_data, strlen(ulm_data), &timeout) < 0 ) {
+ edg_wll_SetError(ctx, errno, "edg_wll_plain_write_full()");
+ goto out;
}
- ret = 0;
-
-out1:
- close(msg_sock);
out:
- if(ret) edg_wll_UpdateError(context, ret, "notif_send_socket()");
- return(ret);
+ close(conn.sock);
+ return edg_wll_Error(ctx, NULL, NULL)?
+ edg_wll_UpdateError(ctx, 0, "event_send_socket_proxy()"): 0;
}
-int
-edg_wll_NotifSend_a(edg_wll_Context context,
- edg_wll_NotifId reg_id,
- const char *host,
- int port,
- const char *owner,
- const char *notif_data)
+int edg_wll_EventSendProxy(edg_wll_Context ctx, const edg_wlc_JobId jobid, const char *event)
{
- int ret;
- long filepos;
- char *ulm_data, *reg_id_s, *event_file;
+ long filepos;
+ char *jobid_s, *event_file = NULL;
- if((ret=notif_create_ulm(context,
- reg_id,
- host,
- port,
- owner,
- notif_data,
- &ulm_data,
- ®_id_s))) {
- goto out;
- }
- asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, reg_id_s);
- if(event_file == NULL) {
- edg_wll_SetError(context, ret=ENOMEM, "asprintf()");
- goto out;
- }
+ edg_wll_ResetError(ctx);
+ jobid_s = edg_wlc_JobIdGetUnique(jobid);
+ if ( !jobid_s ) { edg_wll_SetError(ctx, ENOMEM, "edg_wlc_JobIdGetUnique()"); goto out; }
- if((ret=notif_save_to_file(context,
- event_file,
- ulm_data,
- &filepos))) {
- goto out;
- }
+ asprintf(&event_file, "%s.%s", lbproxy_ilog_file_prefix, jobid_s);
+ if ( !event_file ) { edg_wll_SetError(ctx, ENOMEM, "asprintf()"); goto out; }
- if((ret=notif_send_socket(context,
- filepos,
- ulm_data))) {
- goto out;
- }
- ret = 0;
+ if ( event_save_to_file_proxy(ctx, event_file, event, &filepos)
+ || event_send_socket_proxy(ctx, filepos, event) ) goto out;
out:
- if(ulm_data) free(ulm_data);
- if(reg_id_s) free(reg_id_s);
- if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifSend()");
- return(ret);
-}
-
+ if ( jobid_s ) free(jobid_s);
+ if ( event_file ) free(event_file);
-int
-edg_wll_NotifJobStatus_a(edg_wll_Context context,
- edg_wll_NotifId reg_id,
- const char *host,
- int port,
- const char *owner,
- const edg_wll_JobStat notif_job_stat)
-{
- int ret=0;
- char *xml_data, *xml_esc_data=NULL;
-
- if(edg_wll_JobStatusToXML(context, notif_job_stat, &xml_data))
- goto out;
-
- if((xml_esc_data = edg_wll_EscapeXML(xml_data)) == NULL) {
- edg_wll_SetError(context, ret=ENOMEM, "edg_wll_EscapeXML()");
- goto out;
- }
-
- ret=edg_wll_NotifSend(context, reg_id, host, port, owner, xml_esc_data);
-
-out:
- if(xml_data) free(xml_data);
- if(xml_esc_data) free(xml_esc_data);
- if(ret) edg_wll_UpdateError(context, ret, "edg_wll_NotifJobStatus()");
- return(ret);
+ return edg_wll_Error(ctx, NULL, NULL)? edg_wll_UpdateError(ctx, 0, "edg_wll_EventSendProxy()"): 0;
}